How to use joda.time in Flink (or how to use typeutils.runtime.kryo)



In a Flink project I use a case class named click:

case class click( date: LocalDateTime, stbId:String, channelId :Int)

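DataSets are filled with instances of this class, and this worked fine while date was a Java 8 java.time.LocalDateTime. After switching to org.joda.time (version 2.9) in a Java 7 environment, calls on the click objects in the DataSets no longer behave as before. Some methods on the date field of a click object, for example getHourOfDay and toString, throw NullPointerExceptions. I have verified that the date field itself is not null.

I suspect that the Joda-Time library does not interact well with Kryo serialization. See the related Spark questions about a Joda DateTime format causing a null pointer error in Spark RDD functions, and about an NPE in Spark with Joda DateTime.

The Flink API contains org.apache.flink.api.java.typeutils.runtime.kryo.Serializers, which has a static method registerJodaTime. This seems relevant. I naively tried: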

import  org.apache.flink.api.common._
import org.apache.flink.api.java.typeutils.runtime.kryo._
Serializers.registerJodaTime(new ExecutionConfig)

That was not enough. Am I on the right track? How do I use java.typeutils.runtime.kryo?

Versions used: Flink 0.9.1, Scala 2.10, joda.time 2.9

Follow-up: here is the exact code I added, as suggested (thanks to Fabian and Robert):

val env = ExecutionEnvironment.getExecutionEnvironment
//import  org.apache.flink.api.common._
import org.apache.flink.api.java.typeutils.runtime.kryo._
Serializers.registerJodaTime(env.getConfig)

In the log file of the embedded execution I can find the following relevant parts:

16:44:53,998 INFO  org.apache.flink.api.java.ExecutionEnvironment                - The job has 2 registered types and 0 default Kryo serializers
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo types: 
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo with Serializers types: 
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo with Serializer Classes types: Entry{k=class org.joda.time.DateTime, v=class de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer},Entry{k=class org.joda.time.Interval, v=class de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer}
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo default Serializers: 
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo default Serializers Classes 
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered POJO types: 
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Static code analysis mode: DISABLE
16:44:54,545 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
16:44:54,560 DEBUG akka.event.EventStream                                        - logger log1-Slf4jLogger started
....
16:44:57,103 DEBUG org.apache.flink.api.java.typeutils.TypeExtractor             - class org.joda.time.LocalDateTime does not contain a getter for field iLocalMillis
16:44:57,103 DEBUG org.apache.flink.api.java.typeutils.TypeExtractor             - class org.joda.time.LocalDateTime does not contain a setter for field iLocalMillis
16:44:57,103 INFO  org.apache.flink.api.java.typeutils.TypeExtractor                 - class org.joda.time.LocalDateTime is not a valid POJO type
16:44:57,275 DEBUG org.apache.flink.api.scala.ClosureCleaner$                        - accessedFields: Map()
16:44:57,369 INFO  org.apache.flink.api.java.ExecutionEnvironment                - The job has 2 registered types and 0 default Kryo serializers
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo types: 
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo with Serializers types: 
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo with Serializer Classes types: Entry{k=class org.joda.time.DateTime, v=class de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer},Entry{k=class org.joda.time.Interval, v=class de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer}
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo default Serializers: 
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo default Serializers Classes 
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered POJO types: 
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Static code analysis mode: DISABLE

Nevertheless, I still see the following:

Exception in thread "main" java.lang.NullPointerException
    at org.joda.time.LocalDateTime.isSupported(LocalDateTime.java:625)
    at org.joda.time.format.DateTimeFormatterBuilder$PaddedNumber.printTo(DateTimeFormatterBuilder.java:1435)
    at org.joda.time.format.DateTimeFormatterBuilder$Composite.printTo(DateTimeFormatterBuilder.java:2474)
    at org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:655)
    at org.joda.time.format.DateTimeFormatter.print(DateTimeFormatter.java:709)
    at org.joda.time.LocalDateTime.toString(LocalDateTime.java:2087)
    at java.lang.String.valueOf(Unknown Source)
    at scala.runtime.StringAdd$.$plus$extension(StringAdd.scala:13)
    at myflink.click.toString(Ingestor.scala:20)
    ...

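Flink falls back to Kryo for types it cannot serialize itself. LocalDateTime is such a class.

Unfortunately, Kryo cannot serialize it correctly either, so we have to tell Kryo how to do it by providing a dedicated serializer for this class.

  1. Add de.javakaffee:kryo-serializers as a dependency: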
<dependency>
    <groupId>de.javakaffee</groupId>
    <artifactId>kryo-serializers</artifactId>
    <version>0.30</version>
</dependency>

(Note that adding this dependency might cause problems when running Flink on a cluster. Please let me know if it does.)

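  2. Register the serializer with the ExecutionEnvironment:
import org.joda.time.LocalDateTime
import de.javakaffee.kryoserializers.jodatime.JodaLocalDateTimeSerializer
val env = ExecutionEnvironment.getExecutionEnvironment
env.registerTypeWithKryoSerializer(classOf[LocalDateTime], classOf[JodaLocalDateTimeSerializer])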
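
To put the registration in context: the log in the question shows that registerJodaTime registered serializers only for org.joda.time.DateTime and org.joda.time.Interval, so LocalDateTime has to be registered explicitly. Below is a minimal sketch of a complete job, assuming the Flink 0.9.x Scala API and kryo-serializers on the classpath. The object name ClickJob, the upper-case class name Click and the sample values are hypothetical and only serve as illustration; I have not run this against a cluster.

```scala
import org.apache.flink.api.scala._
import org.joda.time.LocalDateTime
import de.javakaffee.kryoserializers.jodatime.JodaLocalDateTimeSerializer

// Same shape as the case class from the question (name capitalised here).
case class Click(date: LocalDateTime, stbId: String, channelId: Int)

// Hypothetical job object, for illustration only.
object ClickJob {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Register the dedicated serializer before any DataSet is created,
    // so that Kryo uses it for every LocalDateTime field.
    env.registerTypeWithKryoSerializer(
      classOf[LocalDateTime], classOf[JodaLocalDateTimeSerializer])

    // Made-up sample data.
    val clicks = env.fromElements(
      Click(new LocalDateTime(2015, 10, 1, 16, 44), "stb-1", 7),
      Click(new LocalDateTime(2015, 10, 1, 17, 5), "stb-2", 3))

    // Accessing the date field is what failed with an NPE in the question.
    clicks
      .map(c => (c.channelId, c.date.getHourOfDay))
      .print()
  }
}
```

If the registration is picked up, the "Registered Kryo with Serializer Classes types" DEBUG line should list an entry for org.joda.time.LocalDateTime.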

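I hope this helps (I am keeping the old answer below for reference).


Some general notes on debugging Kryo/serializer issues in Flink:

When executing a job locally (this should also work with the ./bin/flink frontend, but the output may end up in the log/ directory), you should see the following: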

14:05:52,863 INFO  org.apache.flink.api.java.ExecutionEnvironment                - The job has 15 registered types and 2 default Kryo serializers 
14:05:52,943 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster         - Starting FlinkMiniCluster. 
14:05:53,150 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started

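The number of registered types and Kryo serializers should be greater than 0.

With the DEBUG log level (replace INFO with DEBUG in log4j.properties), you get more detailed information about the registered serializers: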

14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo types: 
14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo with Serializers types: 
14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo with Serializer Classes types: 
14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo default Serializers: 
14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo default Serializers Classes 
14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered POJO types: 
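
For a local run, the DEBUG output above can also be enabled for just the logger that prints it, instead of switching the whole root logger. A minimal log4j.properties sketch, assuming log4j 1.x is the logging backend and that the file is on the classpath of the job; the appender name console is an arbitrary choice:

```properties
# Keep everything else at INFO and log to the console.
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Only the logger that prints the Kryo registration details goes to DEBUG.
log4j.logger.org.apache.flink.api.java.ExecutionEnvironment=DEBUG
```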

You should register the Joda serializers in the ExecutionConfig of the ExecutionEnvironment:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Serializers.registerJodaTime(env.getConfig());

Hope this helps.
