我首次使用Apache Flink和AWS Kineses。基本上,我的目标是从运动流进行转换传入数据,以使我可以执行简单的转换,例如过滤和聚集。
我使用以下内容添加源:
return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
最终,当我打印传入流时,我会按预期获得JSON数据:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> input = createSourceFromStaticConfig(env);
input.print();
这是印刷品的示例结果:
{" event_num":" 5530"," timestmap":" 2019-03-04 14:29:44.8882376", "量":" 80.4","类型":"购买"} {" event_num":" 5531"," timestmap":" 2019-03-04 14:29:44.8881379","量":" 11.98"," type":" service"}
有人可以启发我如何以一种可以执行简单转换的方式访问这些JSON元素,例如,仅选择包含"服务"的记录作为类型?
当您使用SimpleStringSchema
时,所得的事件流是String
类型。因此,您需要先解析字符串,然后可以应用过滤器等。
您可能想看看jsonnodedeserializationschema,它将产生ObjectNode
。