为了账号安全,请及时绑定邮箱和手机立即绑定

如何根据json中的特定键将接收器从一个数据流添加到不同的路径?

如何根据json中的特定键将接收器从一个数据流添加到不同的路径?

哆啦的时光机 2022-10-20 17:24:25
我有类似的json,{  "name":"someone",  "job":"doctor",  "etc":"etc"}在每个 json 中,“工作”都有不同的值,比如医生、飞行员、司机、守望者等。我想根据“工作”值分离每个 json,并将其存储在不同的位置,如,/home/doctor等。/home/pilot/home/driver我已经尝试过 SplitStream 函数来执行此操作,但我必须指定这些值以匹配条件。public class MyFlinkJob {       private static JsonParser jsonParser = new JsonParser();    private static String key_1 = "doctor";    private static String key_2 = "driver";    private static String key_3 = "pilot";    private static String key_default = "default";    public static void main(String args[]) throws Exception {        Properties prop = new Properties();        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        Properties props = new Properties();        props.setProperty("bootstrap.servers", kafka);        props.setProperty("group.id", "myjob");        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props);        DataStream<String> record = env.addSource(myConsumer).rebalance()        SplitStream<String> split = record.split(new OutputSelector<String>() {            @Override            public Iterable<String> select(String val) {                JsonObject json = (JsonObject)jsonParser.parse(val);                String jsonValue = CommonFields.getFieldValue(json, "job");                List<String> output = new ArrayList<String>();                if (key_1.equalsIgnoreCase(jsonValue)) {            }                    output.add("doctor");                } else if (key_2.equalsIgnoreCase(jsonValue)) {                    output.add("driver");                } else if (key_3.equalsIgnoreCase(jsonValue)) {                    output.add("pilot");                } else {                    output.add("default");                }                return output;            }});}假设如果任何其他值出现在“job”中,比如工程师或其他东西,并且我没有在类中指定,那么它会转到默认文件夹有没有办法根据“job”的值自动拆分这些 json 事件而不指定它和创建一个包含值名称的路径,例如 /home/enginerr?
查看完整描述

1 回答

?
守候你守候我

TA贡献1802条经验 获得超10个赞

您想使用BucketingSink,它支持根据字段的值将记录写入单独的存储桶。我可能有一个 map 函数,它接收 JSON 字符串,对其进行解析并发出 a Tuple2<String, String>,其中第一个元素是jobJSON 中字段的值,第二个元素是完整的 JSON 字符串。



查看完整回答
反对 回复 2022-10-20
  • 1 回答
  • 0 关注
  • 90 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信