我有一个普通Java中的代码,用于将JSON数组转换为JSON对象,我需要将此普通Java转换为Kafka流...下面是我的
import java.io.*;
import java.util.*;
import java.lang.*;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
public class JsonParseTest {
public static void main(String[] args) {
try {
JSONParser parser = new JSONParser();
JSONArray jsonArray = (JSONArray) parser.parse(new FileReader("/root/jsonTestFile.json"));
for (Object o : jsonArray) {
//to get the Json object
JSONObject snap = (JSONObject) o;
System.out.println(snap);
}
}
catch (FileNotFoundException ex) {
ex.printStackTrace();
} catch (IOException ex) {
ex.printStackTrace();
} catch (ParseException ex) {
ex.printStackTrace();
} catch (NullPointerException ex) {
ex.printStackTrace();
}
}
}
如果有人帮助我编写逻辑部分的代码,我可以与之相关,以下是我的逻辑部分,我需要此
的帮助public class JsonParseTest {
public static void main(String[] args) {
try {
JSONParser parser = new JSONParser();
JSONArray jsonArray = (JSONArray) parser.parse(new FileReader("/root/jsonTestFile.json"));
for (Object o : jsonArray) {
//to get the Json object
JSONObject snap = (JSONObject) o;
System.out.println(snap);
}
}
如何在Kafka流中编写相同的代码?有人可以帮助这样做吗?
我会推荐两个选项:
- 您实现了自己的序列化器/求职者(这是Javadoc)
-
您可以将流作为(字符串,字符串)的流处理,并使用flatmap解析流的每个元素,然后将其转换为(字符串,jsonobject)的流:
JSONParser parser = new JSONParser(); stringStream.flatMap((k,v) -> { List<KeyValue<String,JSONOBject>> tmp = new ArrayList<KeyValue<String,JSONOBject>>(); JSONArray jsonArray = (JSONArray) parser.parse(v); for (Object o : jsonArray) { JSONObject snap = (JSONObject) o; tmp.add(new KeyValue(k, snap)); } return tmp; });
在这里,我没有将lambda的代码包装为尝试/捕获。