如何使用 Flink 流式传输 JSON



我实际上正在处理一个流,接收一堆字符串,需要对所有字符串进行计数。 总和被加重,这意味着对于第二条记录,总和被添加到前一天输出必须是看起来像

{
"aggregationType" : "day",
"days before" : 2,
"aggregates" : [
    {"date" : "2018-03-03",
    "sum" : 120},
  {"date" :"2018-03-04",
  "sum" : 203}
  ]
}

我创建了一个看起来像这样的流:

val eventStream : DataStream [String] = 
eventStream
    .addSource(source)
    .keyBy("")
    .TimeWindow(Time.days(1), Time.days(1))
    .trigger(new MyTriggerFunc)
    .aggregation(new MyAggregationFunc)
    .addSink(sink)

提前感谢您的帮助:)

关于在 Flink 中使用 JSON 的说明:

使用 JSONDeserializationSchema 反序列化事件,这将产生ObjectNode。为方便起见,您可以将ObjectNode映射到YourObject,也可以继续使用ObjectNode

使用ObjectNode教程:http://www.baeldung.com/jackson-json-node-tree-model

回到您的案例,您可以像下面这样操作:

val eventStream : DataStream [ObjectNode] = 
oneMinuteAgg
    .addSource(source)
    .windowAll()
    .TimeWindow(Time.minutes(1))
    .trigger(new MyTriggerFunc)
    .aggregation(new MyAggregationFunc)

将输出 1 分钟聚合的流

[     
      {
      "date" :2018-03-03
      "sum" : 120
      }, 
      {
      "date" :2018-03-03
      "sum" : 120
      }
]

然后将另一个运算符链接到"oneMinuteAgg",该运算符会将 1 分钟聚合添加到 1 天聚合中:

[...]
oneMinuteAgg
        .windowAll()
        .TimeWindow(Time.days(1))
        .trigger(new Whatever)
        .aggregation(new YourDayAggF)

这将输出您需要的内容

{
    "aggregationType" : "day"
    "days before" : 4
    "aggregates : [{
      "date" :2018-03-03
      "sum" : 120
      }, 
      {
      "date" :2018-03-03
      "sum" : 120
      }]
}

我曾经windowAll()假设您不需要对流进行键控。

相关内容

  • 没有找到相关文章

最新更新