部分代码:
implicit val formats = Serialization.formats(NoTypeHints)
case class DataClass(id: String, name: String)
val dataSource = env
.addSource(new FlinkKinesisConsumer[String](s"data-stream-$stage", new SimpleStringSchema, consumerConfig))
.uid(s"data-stream-$stage-source-id").name("dataSource")
.map(json => read[DataClass](json))
在这里,我从驱动蛋白流中获取数据,并将其序列化到我的数据类中。一切都很好,但现在需要增加以一种额外格式(例如DataClassSecond
(接收数据的能力
其中一个选项是,添加一个额外的数据源,并在您自己的流中处理它们。
但这需要一个额外的驱动器队列。我不确定这是否是个好办法有没有任何方法可以从驱动蛋白中接收不同的数据,然后根据类型划分流?
您可以尝试基于字段filter
和DataStream[String]
,这样您将获得两个或多个仅包含具有正确JSON格式的元素的流。
因此,最简单的方法是:
val streamDataClass = sourceStream.filter(_.contains("name"))
val streamDataClassSecond = sourceStream.filter(_.contains("surname"))
只有当name
和surname
对于每个DataClass
是唯一的时,这才会起作用。一个更有效的方法可能是先将map
和DataStream
转换为某种常见格式,或者使用类似Either
的东西作为反序列化结果,然后检查它是否成功。