我正在研究一个Flink项目,并希望将源JSON字符串数据解析为Json对象。 我正在使用jackson-module-scala进行JSON解析。但是,我在 Flink API 中使用 JSON 解析器时遇到了一些问题(例如map
)。
以下是代码的一些示例,我无法理解它为什么会这样运行的原因。
情况1:
在这种情况下,我正在做 jackson-module-scala 的官方 exmaple 代码告诉我做的事情:
- 创建新
ObjectMapper
- 注册
DefaultScalaModule
DefaultScalaModule
是一个 Scala 对象,它包括对所有当前支持的 Scala 数据类型的支持。 - 调用
readValue
以解析要Map
的 JSON
我得到的错误是:org.apache.flink.api.common.InvalidProgramException:
Task not serializable
。
object JsonProcessing {
def main(args: Array[String]) {
// set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// get input data
val text = env.readTextFile("xxx")
val mapper = new ObjectMapper
mapper.registerModule(DefaultScalaModule)
val counts = text.map(mapper.readValue(_, classOf[Map[String, String]]))
// execute and print result
counts.print()
env.execute("JsonProcessing")
}
}
情况2:
然后我做了一些谷歌,并提出了以下解决方案,其中registerModule
被移动到map
函数中。
val mapper = new ObjectMapper
val counts = text.map(l => {
mapper.registerModule(DefaultScalaModule)
mapper.readValue(l, classOf[Map[String, String]])
})
但是,我无法理解的是:为什么这会起作用,调用外部定义的对象mapper
的方法?是因为ObjectMapper
本身是可序列化的,如 ObjectMapper.java#L114 中所述?
现在,JSON 解析工作正常,但每次我都必须调用我认为可能会导致一些性能问题的mapper.registerModule(DefaultScalaModule)
(是吗?我还尝试了以下另一种解决方案。
情况3:
我创建了一个新的case class Jsen
,并将其用作相应的解析类,注册了 Scala 模块。而且它工作正常。
但是,如果您的输入 JSON 经常变化,则这并不那么灵活。管理类Jsen
是不可维护的。
case class Jsen(
@JsonProperty("a") a: String,
@JsonProperty("c") c: String,
@JsonProperty("e") e: String
)
object JsonProcessing {
def main(args: Array[String]) {
...
val mapper = new ObjectMapper
val counts = text.map(mapper.readValue(_, classOf[Jsen]))
...
}
此外,我还尝试在不调用registerModule
的情况下使用JsonNode
,如下所示:
...
val mapper = new ObjectMapper
val counts = text.map(mapper.readValue(_, classOf[JsonNode]))
...
它工作正常。
我的主要问题是:实际上是什么导致了任务无法在registerModule(DefaultScalaModule)
的引擎盖下序列化的问题?
如何确定您的代码在编码过程中是否可能导致此不可序列化的问题?
问题是 Apache Flink 被设计为分布式的。这意味着它需要能够远程运行您的代码。因此,这意味着所有处理函数都应该是可序列化的。在当前实现中,即使您不会在任何分布式模式下运行流式处理,也可以在构建流式处理时尽早确保这一点。这是一种权衡,具有明显的好处,即为您提供反馈,直到破坏此合同的行(通过异常堆栈跟踪)。
所以当你写的时候
val counts = text.map(mapper.readValue(_, classOf[Map[String, String]]))
你实际写的是这样的
val counts = text.map(new Function1[String, Map[String, String]] {
val capturedMapper = mapper
override def apply(param: String) = capturedMapper.readValue(param, classOf[Map[String, String]])
})
这里重要的是,从外部上下文中捕获mapper
,并将其存储为必须序列化的Function1
对象的一部分。这意味着mapper
必须是可序列化的。杰克逊图书馆的设计者认识到了这种需求,并且由于映射器中没有任何根本上不可保护的东西,因此他们将其ObjectMapper
和默认Module
可序列化。不幸的是,Scala Jackson Module 的设计者错过了这一点,并通过使ScalaTypeModifier
和所有子类不可序列化来使他们的DefaultScalaModule
变得非常不可序列化。这就是为什么您的第二个代码有效而第一个代码无效的原因:"原始"ObjectMapper
是可序列化的,而具有预注册DefaultScalaModule
的ObjectMapper
则不是。
有几种可能的解决方法。可能最简单的一种是包装ObjectMapper
object MapperWrapper extends java.io.Serializable {
// this lazy is the important trick here
// @transient adds some safety in current Scala (see also Update section)
@transient lazy val mapper = {
val mapper = new ObjectMapper
mapper.registerModule(DefaultScalaModule)
mapper
}
def readValue[T](content: String, valueType: Class[T]): T = mapper.readValue(content, valueType)
}
然后将其用作
val counts = text.map(MapperWrapper.readValue(_, classOf[Map[String, String]]))
这个lazy
技巧之所以有效,是因为尽管DefaultScalaModule
的实例不可序列化,但创建DefaultScalaModule
实例的函数是可序列化的。
更新:@transient呢?
如果我添加
lazy val
与@transient lazy val
?
这实际上是一个棘手的问题。lazy val
被编译成的内容实际上是这样的:
object MapperWrapper extends java.io.Serializable {
// @transient is set or not set for both fields depending on its presence at "lazy val"
[@transient] private var mapperValue: ObjectMapper = null
[@transient] @volatile private var mapperInitialized = false
def mapper: ObjectMapper = {
if (!mapperInitialized) {
this.synchronized {
val mapper = new ObjectMapper
mapper.registerModule(DefaultScalaModule)
mapperValue = mapper
mapperInitialized = true
}
}
mapperValue
}
def readValue[T](content: String, valueType: Class[T]): T = mapper.readValue(content, valueType)
}
其中lazy val
上的@transient
会影响两个支持字段。所以现在你可以看到为什么lazy val
技巧有效:
在本地它之所以有效,是因为它延迟了
mapperValue
字段的初始化,直到第一次访问mapper
方法,因此在执行序列化检查时,该字段null
是安全的远程它可以工作,因为
MapperWrapper
是完全可序列化的,并且应该如何初始化lazy val
的逻辑被放入同一类的方法中(请参阅def mapper
)。
但请注意,AFAIK 这种lazy val
编译行为是当前 Scala 编译器的实现细节,而不是 Scala 规范的一部分。如果在以后的某个时候,类似于.NetLazy
的类将被添加到Java标准库中,Scala编译器可能会开始生成不同的代码。这很重要,因为它为@transient
提供了一种权衡。现在添加@transient
的好处是,它可以确保这样的代码也能正常工作:
val someJson:String = "..."
val something:Something = MapperWrapper.readValue(someJson:String, ...)
val counts = text.map(MapperWrapper.readValue(_, classOf[Map[String, String]]))
如果不@transient
上面的代码将失败,因为我们强制初始化lazy
支持字段,现在它包含一个不可序列化的值。有了@transient
这不是问题,因为该字段根本不会被序列化。
@transient
的一个潜在缺点是,如果 Scala 更改了生成lazy val
代码的方式并将字段标记为@transient
,则在远程工作场景中实际上可能不会反序列化。
object
还有一个技巧,因为对于object
来说,Scala编译器会生成自定义反序列化逻辑(覆盖readResolve
)以返回相同的单例对象。这意味着包括lazy val
的对象并没有真正反序列化,而是使用object
本身的值。这意味着object
内部@transient lazy val
比远程场景中的内部class
更面向未来。