Kafka流将JSON数组转换为JSON对象



我有一个普通Java中的代码,用于将JSON数组转换为JSON对象,我需要将此普通Java转换为Kafka流...下面是我的

import java.io.*;
import java.util.*;
import java.lang.*;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
public class JsonParseTest {
    public static void main(String[] args) {
        try {
            JSONParser parser = new JSONParser();
            JSONArray jsonArray = (JSONArray) parser.parse(new FileReader("/root/jsonTestFile.json"));
             for (Object o : jsonArray) {
//to get the Json object
              JSONObject snap = (JSONObject) o;
              System.out.println(snap);
             }
}
 catch (FileNotFoundException ex) {
            ex.printStackTrace();
        } catch (IOException ex) {
            ex.printStackTrace();
        } catch (ParseException ex) {
            ex.printStackTrace();
        } catch (NullPointerException ex) {
            ex.printStackTrace();
        }
    }
}

如果有人帮助我编写逻辑部分的代码,我可以与之相关,以下是我的逻辑部分,我需要此

的帮助
public class JsonParseTest {
    public static void main(String[] args) {
        try {
            JSONParser parser = new JSONParser();
            JSONArray jsonArray = (JSONArray) parser.parse(new FileReader("/root/jsonTestFile.json"));
             for (Object o : jsonArray) {
//to get the Json object
              JSONObject snap = (JSONObject) o;
              System.out.println(snap);
             }
}

如何在Kafka流中编写相同的代码?有人可以帮助这样做吗?

我会推荐两个选项:

  • 您实现了自己的序列化器/求职者(这是Javadoc)
  • 您可以将流作为(字符串,字符串)的流处理,并使用flatmap解析流的每个元素,然后将其转换为(字符串,jsonobject)的流:

    JSONParser parser = new JSONParser();
    stringStream.flatMap((k,v) -> {
        List<KeyValue<String,JSONOBject>> tmp = new ArrayList<KeyValue<String,JSONOBject>>();
        JSONArray jsonArray = (JSONArray) parser.parse(v);
        for (Object o : jsonArray) {
             JSONObject snap = (JSONObject) o;
             tmp.add(new KeyValue(k, snap));
        }     
        return tmp;
    });
    

在这里,我没有将lambda的代码包装为尝试/捕获。

相关内容

  • 没有找到相关文章

最新更新