在火花结构化流中处理二进制数据



我正在使用kafka和火花结构化流。我正在以以下格式收到kafka消息。

{"deviceId":"001","sNo":1,"data":"aaaaa"}
{"deviceId":"002","sNo":1,"data":"bbbbb"}
{"deviceId":"001","sNo":2,"data":"ccccc"}
{"deviceId":"002","sNo":2,"data":"ddddd"}

我像下面一样阅读它。

Dataset<String> data = spark
      .readStream()
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option(subscribeType, topics)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as(Encoders.STRING());
Dataset<DeviceData> ds = data.as(ExpressionEncoder.javaBean(DeviceData.class)).orderBy("deviceId","sNo"); 
ds.foreach(event -> 
      processData(event.getDeviceId(),event.getSNo(),event.getData().getBytes())
);}
private void processData(String deviceId,int SNo, byte[] data) 
{
  //How to check previous processed Dataset???
} 

在我的JSON消息中,"数据"是字节的字符串形式[]。我有一个要求我需要按" sno"顺序为给定的" deviceID"处理二进制"数据"。因此,对于" DeviceID" =" 001",我必须处理" SNO" = 1,然后" SNO" = 2等的二进制数据,依此类推。如何在结构化流中检查以前处理的数据集的状态?

如果您正在寻找诸如dstream.mapwithstate之类的状态管理,则在结构化流中不支持它。工作正在进行中。请检查https://issues.apache.org/jira/browse/spark-19067。

最新更新