I am trying to read JSON data from Kafka and process it in Scala. I am new to Flink and Kafka streaming, so please try to answer by providing solution code. I want to convert each message into a map containing all of its key-value pairs.
map1.get("FC196") should give me "Dormant", where map1 is the map holding the key-value pairs.
The challenge I am facing is converting the DataStream[ObjectNode] (the st variable in the code) into a map of key-value pairs. I am using JSONDeserializationSchema; if I use SimpleStringSchema instead, I get a DataStream[String]. I am open to alternative suggestions.
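Conceptually, this is what I am after (a hypothetical sketch; toMap is not a real API, it is exactly the conversion I am missing):
// desired end state: turn each ObjectNode into a plain Scala map
val maps: DataStream[Map[String, String]] = st.map(node => toMap(node))
maps.map(map1 => map1.get("FC196")) // should yield Some("Dormant") for the sample record below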
Input format from Kafka:
{"FC196":"Dormant","FC174":"A262210940","FC195":"","FC176":"40","FC198":"BANKING","FC175":"AHMED","FC197":"2017/04/04","FC178":"1","FC177":"CBS","FC199":"INDIVIDUAL","FC179":"SYSTEM","FC190":"OK","FC192":"osName","FC191":"Completed","FC194":"125","FC193":"7","FC203":"A10SBPUB000000000004439900053575","FC205":"1","FC185":"20","FC184":"Transfer","FC187":"2","FC186":"2121","FC189":"abcdef","FC200":"afs","FC188":"BR08","FC202":"INDIVIDUAL","FC201":"","FC181":"7:00PM","FC180":"2007/04/01","FC183":"11000000","FC182":"INR"}
Code:
import java.util.Properties
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema

object WordCount {
  def main(args: Array[String]) {
    // Kafka properties
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "***.**.*.*:9092")
    properties.setProperty("zookeeper.connect", "***.**.*.*:2181")
    properties.setProperty("group.id", "afs")
    properties.setProperty("auto.offset.reset", "latest")
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val st = env
      .addSource(new FlinkKafkaConsumer09("new", new JSONDeserializationSchema(), properties))
    st.print()
    env.execute()
  }
}
My code after the changes:
import java.util.Properties
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.json4s.DefaultFormats
import org.json4s._
import org.json4s.native.JsonMethods
import scala.util.Try

object WordCount {
  def main(args: Array[String]) {
    case class CC(key: String)
    implicit val formats = org.json4s.DefaultFormats
    // Kafka properties
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "***.**.*.***:9093")
    properties.setProperty("zookeeper.connect", "***.**.*.***:2181")
    properties.setProperty("group.id", "afs")
    properties.setProperty("auto.offset.reset", "earliest")
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val st = env
      .addSource(new FlinkKafkaConsumer09("new", new SimpleStringSchema(), properties))
      .flatMap(raw => JsonMethods.parse(raw).toOption)
      .map(_.extract[CC])
    st.print()
    env.execute()
  }
}
For some reason I cannot use Try in the flatMap the way you described. The error:
INFO [main] (TypeExtractor.java:1804) - No fields detected for class org.json4s.JsonAST$JValue. Cannot be used as a PojoType. Will be handled as GenericType
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:666)
at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:994)
at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:519)
at org.apache.flink.quickstart.WordCount$.main(WordCount.scala:36)
at org.apache.flink.quickstart.WordCount.main(WordCount.scala)
Caused by: java.io.NotSerializableException: org.json4s.DefaultFormats$$anon$4
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:317)
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170)
... 6 more
Process finished with exit code 1
There are two tasks that need to be taken care of here:
- parsing the raw JSON payload into some form of AST
- converting the AST into a format you can work with
If you go with SimpleStringSchema, you can pick a JSON parser of your choice and deserialize the JSON payload in a simple flatMap operator.
Some dependencies for your build.sbt:
"org.json4s" %% "json4s-core" % "3.5.1",
"org.json4s" %% "json4s-native" % "3.5.1"
There are a dozen JSON libraries to choose from in Scala; a nice overview can be found here: https://manuel.bernhardt.io/2015/11/06/a-quick-tour-of-json-libraries-in-scala/
Then do some parsing:
scala> import org.json4s.native.JsonMethods._
import org.json4s.native.JsonMethods._
scala> val raw = """{"key":"value"}"""
raw: String = {"key":"value"}
scala> parse(raw)
res0: org.json4s.JValue = JObject(List((key,JString(value))))
At this stage the AST is available. It can be converted into a Map like this:
scala> res0.values
res1: res0.Values = Map(key -> value)
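Note that .values is typed as the path-dependent res0.Values, so to use it as the Map[String, String] from the question you need a cast; a sketch (the asInstanceOf assumes a flat JSON object with only string values, which matches your sample record):
scala> res1.asInstanceOf[Map[String, String]].get("key")
res2: Option[String] = Some(value)
With your Kafka payload, map1.get("FC196") would give Some(Dormant) in exactly the same way.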
Keep in mind that json4s does no exception handling for you, so parsing can throw (something you want to guard against when consuming data from Kafka, since an uncaught exception will eventually kill your job).
In Flink this would look something like this:
env
  .addSource(new FlinkKafkaConsumer09("new", new SimpleStringSchema(), properties))
  .flatMap(raw => Try(JsonMethods.parse(raw)).toOption) // this discards failed parses; you should handle them better, e.g. log them
  .map(_.values)
However, I would recommend representing your data as a case class. That takes one more step, mapping the AST onto the case class. Continuing the example above:
scala> implicit val formats = org.json4s.DefaultFormats
formats: org.json4s.DefaultFormats.type = org.json4s.DefaultFormats$@341621da
scala> case class CC(key: String)
defined class CC
scala> parse(raw).extract[CC]
res20: CC = CC(value)
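For your payload the case class fields just need to match the JSON keys; a partial sketch (Transaction is a made-up name and only a few of the FC fields are shown; json4s ignores JSON keys that have no matching field):
case class Transaction(FC196: String, FC174: String, FC176: String, FC198: String)

scala> parse(record).extract[Transaction] // record being your sample JSON string
res21: Transaction = Transaction(Dormant,A262210940,40,BANKING)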
Or in Flink:
env
  .addSource(new FlinkKafkaConsumer09("new", new SimpleStringSchema(), properties))
  .flatMap(raw => Try(JsonMethods.parse(raw)).toOption)
  .map(_.extract[CC])
Update:
Just move the implicit formats outside of your main method:
object WordCount {
  implicit val formats = org.json4s.DefaultFormats
  def main(args: Array[String]) = {...}
}
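Putting it all together, the job would then look roughly like this (an untested sketch; I have also moved the case class to the top level, which Flink's type analysis generally prefers):
import java.util.Properties
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.json4s._
import org.json4s.native.JsonMethods
import scala.util.Try

// top-level case class so Flink can derive TypeInformation for it
case class CC(key: String)

object WordCount {
  // resolved through the singleton object, so the closure no longer captures a non-serializable instance
  implicit val formats = org.json4s.DefaultFormats

  def main(args: Array[String]) {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "***.**.*.***:9093")
    properties.setProperty("zookeeper.connect", "***.**.*.***:2181")
    properties.setProperty("group.id", "afs")
    properties.setProperty("auto.offset.reset", "earliest")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env
      .addSource(new FlinkKafkaConsumer09("new", new SimpleStringSchema(), properties))
      .flatMap(raw => Try(JsonMethods.parse(raw)).toOption) // drop unparseable records; log them in a real job
      .map(_.extract[CC])
      .print()
    env.execute()
  }
}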