我有类似的json,{ "name":"someone", "job":"doctor", "etc":"etc"}在每个 json 中,“工作”都有不同的值,比如医生、飞行员、司机、守望者等。我想根据“工作”值分离每个 json,并将其存储在不同的位置,如,/home/doctor等。/home/pilot/home/driver我已经尝试过 SplitStream 函数来执行此操作,但我必须指定这些值以匹配条件。public class MyFlinkJob { private static JsonParser jsonParser = new JsonParser(); private static String key_1 = "doctor"; private static String key_2 = "driver"; private static String key_3 = "pilot"; private static String key_default = "default"; public static void main(String args[]) throws Exception { Properties prop = new Properties(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties props = new Properties(); props.setProperty("bootstrap.servers", kafka); props.setProperty("group.id", "myjob"); FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props); DataStream<String> record = env.addSource(myConsumer).rebalance() SplitStream<String> split = record.split(new OutputSelector<String>() { @Override public Iterable<String> select(String val) { JsonObject json = (JsonObject)jsonParser.parse(val); String jsonValue = CommonFields.getFieldValue(json, "job"); List<String> output = new ArrayList<String>(); if (key_1.equalsIgnoreCase(jsonValue)) { } output.add("doctor"); } else if (key_2.equalsIgnoreCase(jsonValue)) { output.add("driver"); } else if (key_3.equalsIgnoreCase(jsonValue)) { output.add("pilot"); } else { output.add("default"); } return output; }});}假设如果任何其他值出现在“job”中,比如工程师或其他东西,并且我没有在类中指定,那么它会转到默认文件夹有没有办法根据“job”的值自动拆分这些 json 事件而不指定它和创建一个包含值名称的路径,例如 /home/enginerr?
1 回答
守候你守候我
TA贡献1802条经验 获得超10个赞
您想使用BucketingSink,它支持根据字段的值将记录写入单独的存储桶。我可能有一个 map 函数,它接收 JSON 字符串,对其进行解析并发出 a Tuple2<String, String>
,其中第一个元素是job
JSON 中字段的值,第二个元素是完整的 JSON 字符串。
添加回答
举报
0/150
提交
取消