在 Beam Spark 运行器中为 Kryo 序列化注册自定义类



我已经看到Beam Spark runner使用BeamSparkRunnerRegistrator进行kryo注册。有没有办法注册自定义用户类?

有一种方法可以做到这一点,但首先,请问你为什么要这样做?

一般来说,Beam的Spark运行器使用Beam编码器来序列化用户数据。

我们目前有一个错误,其中缓存的DStream正在使用 Kryo 序列化,如果用户类不可序列化 Kryo 这将失败。光束-2669。我们目前正在尝试解决此问题。

如果这是您面临的问题,您目前可以使用 Kryo 的注册器解决此问题。这是您面临的问题吗?或者您这样做还有其他原因,请告诉我。

无论如何,以下是如何使用SparkContextOptions向 Beam 的 Spark 运行器提供自己的自定义JavaSparkContext实例

SparkConf conf = new SparkConf();
conf.set("spark.serializer", KryoSerializer.class.getName());
conf.set("spark.kryo.registrator", "my.custom.KryoRegistrator");
JavaSparkContext jsc = new JavaSparkContext(..., conf);
SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
options.setRunner(SparkRunner.class);
options.setUsesProvidedSparkContext(true);
options.setProvidedSparkContext(jsc);
Pipeline p = Pipeline.create(options);

有关详细信息,请参阅:

光束火花流道文档

示例:提供SparkContextTest.java

使用此自定义序列化程序创建自己的KryoRegistrator

package Mypackage
class MyRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo) {
kryo.register(classOf[A], new CustomASerializer())
}}

然后,使用注册人的完全限定名称添加有关它的配置条目,例如 Mypackage.MyRegistrator:

val conf = new SparkConf()
conf.set("spark.kryo.registrator", "Mypackage.KryoRegistrator")

请参阅文档:数据序列化 Spark

如果你不想注册你的类,Kryo 序列化仍然有效,但它必须为每个对象存储完整的类名,这是浪费。

最新更新