如何在来自 SBT(本地)的数据流上运行 Scio 管道



我正在尝试在数据流上运行我的第一个Scio管道。

有问题的代码可以在这里找到。但是,我认为这并不太重要。
我的第一个实验是读取一些本地CSV文件,并使用DirecRunner编写另一个本地CSV文件。这符合预期。

现在,我正在尝试从GCS读取文件,将输出写入BigQuery并使用DataflowRunner运行管道。我已经进行了所有必要的更改(或者这就是我相信的(。但我无法让它运行。

我已经gcloud auth application-default login了,当我这样做时

sbt run --runner=DataflowRunner --project=project-id --input-path=gs://path/to/data --output-table=dataset.table

我可以看到 Jb 是在数据流中提交的。但是,一小时后,作业将失败,并显示以下错误消息。

工作流失败。原因:数据流作业似乎停滞不前,因为在过去 1 小时内未看到辅助角色活动。

(请注意,这项工作在这段时间里什么也没做,而且由于这是一个实验,数据太小了,不能超过几分钟(。

检查堆栈驱动程序我可以找到以下错误:

java.lang.ClassNotFoundException: scala.collection.Seq

一些杰克逊的事情有关:

java.util.ServiceConfigurationError: com.fasterxml.jackson.databind.Module: Provider com.fasterxml.jackson.module.scala.DefaultScalaModule 无法实例化

这就是一开始就杀死每个执行者的原因。我真的不明白为什么我找不到 Scala 标准库。

我还尝试先创建一个模板,然后用以下命令运行它:

sbt run --runner=DataflowRunner --project=project-id --input-path=gs://path/to/data --output-table=dataset.table --stagingLocation=gs://path/to/staging --templateLocation=gs://path/to/templates/template-1

但是,运行模板后,我收到相同的错误。
另外,我注意到在暂存文件夹中有很多罐子,但scala-library.jar不在那里。

我错过了一些明显的东西?

这是 sbt 1.3.0 的一个已知问题,它引入了一些 w.r.t.class 加载程序的重大更改。试试1.2.8?

此外,杰克逊问题可能与Java 11或更高版本有关。暂时使用 Java 8。

通过设置 sbtclassLoaderLayeringStrategy来修复:

run / classLoaderLayeringStrategy := ClassLoaderLayeringStrategy.Flat

SBT 对使用run运行的应用程序使用新的类装入器。这会导致 JVM 已加载的其他类(例如 Predef(被重用,从而缩短启动时间。有关详细信息,请参阅进程内类装入器。

这不适用于 Beam DataflowRunner,因为它明确地不从父类加载器暂存类,请参阅 PipelineResources.java#L51:

尝试检测类装入器有权访问的所有资源。这不会递归于类装入器父级,阻止它从系统类装入器拉入资源。

因此,解决方法是强制应用程序使用的所有类加载到同一个类加载器中,以便 DataflowRunner 暂存所有内容。

希望有帮助

最新更新