如何在ApacheBeam中序列化运行时创建的类



我有一个apachebeam应用程序,它使用direct runner在本地运行管道,也使用dataflow runner在谷歌云中运行管道。它在本地工作,但无法通过谷歌数据流运行程序。

以下是错误跟踪:

(9938ce94c0752c7):java.lang.RuntimeException:com.google.cloud.dataflow.worker.repackaged.comgoogle.common.util.courrent.UncheckedExecutionException:java.lang.IollegalArgumentException:无法反序列化序列化的DoFnInfo网址:com.google.cloud.dataflow.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:283)网址:com.google.cloud.dataflow.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:253)网址:com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:55)网址:com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:43)网址:com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:78)网址:com.google.cloud.dataflow.worker.MapTaskExecutorFactory.create(MapTaskExecutiorFactory.java:142)网址:com.google.cloud.dataflow.worker.DataflowWorker.doWork(DataflowWorker.java:271)网址:com.google.cloud.dataflow.worker.DataflowWorker.getAndPerformWork(DataflowWorker:java:244)网址:com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:135)在com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:115)在com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:102)位于java.util.concurrent.FFutureTask.run(FutureTask.java:266)位于java.util.concurrent.ThreadPoolExecutiator.runWorker(ThreadPoolExecutiator.java:1142)位于java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)在java.lang.Thread.run(Thread.java:745)
由以下原因引起:com.google.cloud.dataflow.worker.repackaged.comgoogle.common.util.concurrent.UncheckedExecutionException:java.lang.IllegalArgumentException:无法反序列化序列化的DoFnInfo在com.google.cloud.dataflow.worker.repackaged.com.google.commmon.cache.LocalCache$Segon.get(LocalCache.java:2214)在com.google.cloud.dataflow.worker.repackaged.com.google.commmon.cache.LocaCache.get(LocalCache.java:4053)在com.google.cloud.dataflow.worker.repackaged.com.google.commmon.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899)网址:com.google.cloud.dataflow.worker.UserParDoFnFactory.create(UserParDoF工厂.java:95)网址:com.google.cloud.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoF工厂.java:66)网址:com.google.cloud.dataflow.worker.MapTaskExecutorFactory.createParDoOperation(MapTaskExecutitorFactory.java:360)网址:com.google.cloud.dataflow.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:271)…14更多
由以下原因引起:java.lang.IollegalArgumentException:无法反序列化序列化的DoFnInfo位于org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:75)网址:com.google.cloud.dataflow.worker.UserParDoFnFactory$UserDoFnExtractor.getDoFnInfo(UserParDofFnFactory.java:64)网址:com.google.cloud.dataflow.worker.UserParDoFnFactory$1.call(UserParDoFnFactory.java:100)网址:com.google.cloud.dataflow.worker.UserParDoFnFactory$1.call(UserParDoFnFactory.java:97)在com.google.cloud.dataflow.worker.repackaged.com.google.commmon.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4904)网址:com.google.cloud.dataflow.worker.repackaged.com.google.commmon.cache.LocaCache$LoadingValueReference.loadFuture(LocalCache.java:3628)在com.google.cloud.dataflow.worker.repackaged.com.google.commmon.cache.LocalCache$Seage.loadSync(LocalCache.java:2336)在com.google.cloud.dataflow.worker.repackaged.com.google.commmon.cache.LocalCache$Segon.lockedGetOrLoad(LocalCache.java:2295)在com.google.cloud.dataflow.worker.repackaged.com.google.commmon.cache.LocalCache$Segon.get(LocalCache.java:2208)…还有20个
原因:java.lang.ClassNotFoundException:Header_H位于java.net.URLClassLoader.findClass(URLClassLoader.java:381)位于java.lang.ClassLoader.loadClass(ClassLoader.java:424)在sun.mic.Launcher$AppClassLoader.loadClass(Launcher.java:331)位于java.lang.ClassLoader.loadClass(ClassLoader.java:357)位于java.lang.Class.forName0(本机方法)位于java.lang.Class.forName(Class.java:348)位于java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:628)位于java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)位于java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)位于java.io.ObjectInputStream.readClass(ObjectInputStream.java:1486)位于java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1336)位于java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)位于java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)位于java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)位于java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)位于java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)位于java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)位于java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)位于java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)位于java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)位于org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:72)…还有28个

它指向

"…无法反序列化序列化的DoFnInfo">

"…java.lang.ClassNotFoundException:Header_H">

我怀疑这与我使用字节码创建类Header_H有关。我使用bytebuddy在现有源代码中的some.class和运行时配置文件中的额外用户输入的基础上构建了一个子类,即Header_H仅在运行时可用。

我的字节码有点像这样:

builder = new ByteBuddy().subclass(some.class).name("Header_H").modifiers(PUBLIC);
.defineField("serialVersionUID", long.class, STATIC, PRIVATE, FINAL).value(37L)
.implement(Serializable.class);
Class <?> clazz = builder.make().load(getClass().getClassLoader()).getLoaded();

然后clazz(在本例中为Header_H)将被传递到数据流中的管道。当我在临时谷歌云后台位置检查jar文件的内容时,我看到了some.class,但没有看到Header_H.class,这可能会导致错误"ClassNotFoundException"。

所以,如果我的推理是正确的,那么考虑到我在类创建中有implement(Serializable.class),我如何让Beam将运行时创建的类放置在要发送到数据流运行程序的jar文件中?

Byte Buddy可以通过:在jar文件中注入一个类

DynamicType.Unloaded<?> type = builder.make();
builder.inject(someJar);

这将更改现有的jar文件以包含动态生成的类。通过这种方式,您可以更改已经在系统类路径上的现有jar。

这个API还允许您创建一个新的jar,并且您可以使用InstrumentationAPI(通过Java代理),它允许您将这个类作为一个新jar文件附加到类路径。为了避免附加代理,您还可以尝试使用字节好友代理项目进行动态附件。

这将通过以下方式工作:

File someFolder = ...
File jar = builder.saveIn(someFolder);
ByteBuddyAgent.install().appendToSystemClassLoaderSearch(new JarFile(jar));

如果谷歌云上不允许动态附件,您可以通过命令行上的常规附件来解决这个问题。

数据流运行程序不控制JAR文件的内容,它只解析程序的类路径,从磁盘读取JAR,并将它们复制到GCS上管道的暂存目录。目前,Beam没有提供一种方法来运送类路径上的JAR中不包含的类。

您可能需要找到一种方法,在管道规范中只使用那些JAR中的类,但是您当然仍然可以在DoFn或其他在本地工作程序上运行的代码中使用ByteBuddy。但请注意,将在工作进程之间传送的任何内容(例如PCollection的内容)仍然必须是可序列化的(在一个工作进程上可序列化,在另一个工作程序上可反序列化)或具有Coder。

或者,可能有一种方法可以让ByteBuddy生成一个JAR,并将其动态添加到程序的类路径中。这可能会奏效,但这是一个特定于ByteBuddy的问题,我对ByteBuddy不够熟悉,无法告诉我如何做到这一点。

最新更新