Running a Dataflow pipeline from a jar file fails: PipelineOptions missing a property named 'gcpTempLocation'



Requirement

We are trying to launch a Dataflow batch process from an executable jar file.

Steps followed

  • Used SDK 2.2.0
  • Created an application by following the instructions on this page
  • Built it with the maven command mvn package
  • A JAR file was generated
  • Executed the JAR file with this command: java -jar DataFlow-jobs-0.1.jar --tempLocation=gs://events-dataflow/tmp --gcpTempLocation=gs://events-dataflow/tmp --project=google-project-id --runner=DataflowRunner --BQQuery='select t1.user_id google-project-id.deve.user_info t1'

Output

Exception in thread "main" java.lang.IllegalArgumentException: Class interface org.apache.beam.sdk.options.PipelineOptions missing a property named 'gcpTempLocation'.
at org.apache.beam.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1579)
at org.apache.beam.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:104)
at org.apache.beam.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:291)
at org.apache.beam.sdk.options.PipelineOptionsFactory$Builder.create(PipelineOptionsFactory.java:270)
at org.customerlabs.beam.WriteFromBQtoES.main(WriteFromBQtoES.java:98)

Code

pom.xml
  <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <appendAssemblyId>false</appendAssemblyId>
        <archive>
            <manifest>
                <mainClass>org.customerlabs.beam.WriteFromBQtoES</mainClass>
            </manifest>
        </archive>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
    <executions>
        <execution>
            <id>make-executable-jar</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
  </plugin>
WriteFromBQtoES.java
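package org.customerlabs.beam;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.services.bigquery.model.TableRow;
import java.io.IOException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WriteFromBQtoES {
    private static DateTimeFormatter fmt =
        DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
    private static final Logger LOG = LoggerFactory.getLogger(WriteFromBQtoES.class);
    private static final ObjectMapper mapper = new ObjectMapper();
    // Custom options: only declares BQQuery, on top of the base PipelineOptions.
    public interface Options extends PipelineOptions {
        @Description("BigQuery query to fetch data")
        @Required
        String getBQQuery();
        void setBQQuery(String value);
    }
    public static void main(String[] args) throws IOException {
        PipelineOptionsFactory.register(Options.class);
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().create().as(Options.class);
        Pipeline p = Pipeline.create(options);
        PCollection<TableRow> tableRows = p.apply(BigQueryIO.read().fromQuery(options.getBQQuery()).usingStandardSql());
        tableRows.apply("WriteToCSV", ParDo.of(new DoFn<TableRow, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                // process WriteToCSV (body elided in the original)
            }
        }));
        p.run();
    }
}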
public static void main(String[] args) throws IOException{
   PipelineOptionsFactory.register(Options.class);
   Options options = PipelineOptionsFactory.fromArgs(args).withValidation().create().as(Options.class);
   String query = options.getBQQuery();
   Pipeline p = Pipeline.create(options);
   // ... pipeline operations ...
}

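We are not sure what we are missing. We do pass the gcpTempLocation argument on the command line, yet we still get this error. Please help us track down the problem. Thanks in advance.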

I think you want this instead of PipelineOptions:

public interface Options extends DataflowPipelineOptions { ... }

gcpTempLocation is defined in GcpOptions.java, which DataflowPipelineOptions.java extends.
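To see why extending DataflowPipelineOptions makes the flag visible, here is a simplified, stdlib-only model of the lookup, assuming that a command-line property is accepted only if some registered options interface, or one of its superinterfaces, declares a matching getter. The interfaces BaseOptions, GcpLikeOptions and DataflowLikeOptions are hypothetical stand-ins that only mimic the shape of Beam's hierarchy, and checkArg is a hypothetical helper; this is not Beam's PipelineOptionsFactory code.

```java
import java.beans.Introspector;
import java.lang.reflect.Method;
import java.util.Set;
import java.util.TreeSet;

// Simplified model, NOT Beam's code: the stand-in interfaces below only mimic the
// shape of PipelineOptions -> GcpOptions -> DataflowPipelineOptions.
public class OptionsHierarchyDemo {
    interface BaseOptions {}                        // stand-in for PipelineOptions

    interface GcpLikeOptions extends BaseOptions {  // stand-in for GcpOptions
        String getGcpTempLocation();
        void setGcpTempLocation(String value);
    }

    interface DataflowLikeOptions extends GcpLikeOptions {} // stand-in for DataflowPipelineOptions

    // Shaped like the question's Options: extends only the base interface.
    interface QuestionOptions extends BaseOptions {
        String getBQQuery();
        void setBQQuery(String value);
    }

    // Shaped like the answer's Options: inherits gcpTempLocation through the hierarchy.
    interface AnswerOptions extends DataflowLikeOptions {
        String getBQQuery();
        void setBQQuery(String value);
    }

    /** Property names derived from no-arg getters on the given interfaces and their superinterfaces. */
    static Set<String> knownProperties(Class<?>... registered) {
        Set<String> names = new TreeSet<>();
        for (Class<?> iface : registered) {
            // For an interface, getMethods() also returns methods inherited from superinterfaces.
            for (Method m : iface.getMethods()) {
                if (m.getName().startsWith("get") && m.getParameterCount() == 0) {
                    // JavaBeans naming: "GcpTempLocation" -> "gcpTempLocation", "BQQuery" stays "BQQuery".
                    names.add(Introspector.decapitalize(m.getName().substring(3)));
                }
            }
        }
        return names;
    }

    /** Hypothetical check: rejects a --name=value argument that no registered interface declares. */
    static void checkArg(String arg, Class<?>... registered) {
        String name = arg.substring(2, arg.indexOf('='));
        if (!knownProperties(registered).contains(name)) {
            throw new IllegalArgumentException("unknown option: " + name);
        }
    }

    public static void main(String[] args) {
        System.out.println(knownProperties(QuestionOptions.class)); // [BQQuery]
        System.out.println(knownProperties(AnswerOptions.class));   // [BQQuery, gcpTempLocation]
        checkArg("--gcpTempLocation=gs://bucket/tmp", AnswerOptions.class); // accepted
        try {
            checkArg("--gcpTempLocation=gs://bucket/tmp", QuestionOptions.class);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage()); // rejected: unknown option: gcpTempLocation
        }
    }
}
```

In this model the only difference between the two cases is which interfaces are reachable from the registered one, which mirrors the suggestion above of changing the parent interface of Options.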

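I ran into the same problem, except that I had built an uber jar containing all of the application's dependencies with the maven-shade-plugin. Running that jar with the arguments Apache Beam requires produced the same error: --gcpTempLocation could not be found. Adding the following block to your pom.xml packages the uber jar with maven-shade and fixes the missing-argument problem.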

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>${maven-shade-plugin.version}</version>
  <executions>
    <!-- Run shade goal on package phase -->
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <!-- Required to ensure Beam Pipeline options can be passed properly. Without this, pipeline options will not be recognised -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer>
          <!-- add Main-Class to manifest file -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
            <mainClass>NAME-OF-YOUR-MAIN-CLASS</mainClass>
          </transformer>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
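Why the ServicesResourceTransformer matters can be modelled in a few lines. Beam discovers option interfaces contributed by other modules (such as the GCP and Dataflow ones) through service files at META-INF/services/org.apache.beam.sdk.options.PipelineOptionsRegistrar, and several dependency jars ship a file at that same path. The sketch below contrasts a packager that lets one copy overwrite the others (a simplification of a non-merging fat-jar build; which copy survives depends on packaging order) with one that concatenates them, which is the idea behind the transformer. The registrar class names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Simplified model of packaging service files into one fat jar. Each map is one
 * dependency jar's service files: resource path -> lines of that file.
 */
public class ServiceFileMerge {
    /** Non-merging packaging: a later file with the same path replaces the earlier one. */
    static Map<String, List<String>> overwrite(List<Map<String, List<String>>> jars) {
        Map<String, List<String>> out = new LinkedHashMap<>();
        for (Map<String, List<String>> jar : jars) {
            out.putAll(jar); // same path => the earlier jar's entries are lost
        }
        return out;
    }

    /** Merging packaging (the idea behind ServicesResourceTransformer): same-path files are concatenated. */
    static Map<String, List<String>> mergeServices(List<Map<String, List<String>>> jars) {
        Map<String, List<String>> out = new LinkedHashMap<>();
        for (Map<String, List<String>> jar : jars) {
            for (Map.Entry<String, List<String>> e : jar.entrySet()) {
                out.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).addAll(e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String path = "META-INF/services/org.apache.beam.sdk.options.PipelineOptionsRegistrar";
        // Hypothetical registrar names; in this toy the GCP jar happens to be packaged first.
        List<Map<String, List<String>>> jars = List.of(
            Map.of(path, List.of("example.gcp.GcpRegistrar")),
            Map.of(path, List.of("example.core.CoreRegistrar")));
        System.out.println("overwrite: " + overwrite(jars).get(path));
        System.out.println("merge:     " + mergeServices(jars).get(path));
    }
}
```

With overwriting, only example.core.CoreRegistrar survives, so in the real build any options that only the lost registrar would have contributed (gcpTempLocation, in this thread) are never registered; with merging, both entries are kept.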

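The transformer line ensures that Beam pipeline options can be passed as command-line arguments. After adding it to pom.xml, run mvn package, which generates the uber jar in the target/ directory under the project root. You can then execute the jar with the following command: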

java -jar target/[your-jar-name].jar \
  --runner=org.apache.beam.runners.dataflow.DataflowRunner \
  --tempLocation=[GCS temp folder path] \
  --stagingLocation=[GCS staging folder path]
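To check whether a particular jar is affected, a small stdlib-only program can list every entry in the PipelineOptionsRegistrar service files visible on the classpath, i.e. the files ServiceLoader would read. In a correctly merged jar, the GCP/Dataflow modules' registrars should appear alongside the core SDK's; if entries from only one file show up, the service files were probably clobbered during packaging. RegistrarCheck is a hypothetical helper, not part of Beam.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

/** Hypothetical diagnostic: prints every PipelineOptionsRegistrar entry visible on the classpath. */
public class RegistrarCheck {
    static final String SERVICE_FILE =
        "META-INF/services/org.apache.beam.sdk.options.PipelineOptionsRegistrar";

    /** Returns the class names listed in every copy of the service file the loader can see. */
    static List<String> registrarEntries(ClassLoader loader) throws IOException {
        List<String> entries = new ArrayList<>();
        Enumeration<URL> urls = loader.getResources(SERVICE_FILE);
        while (urls.hasMoreElements()) {
            URL url = urls.nextElement();
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = in.readLine()) != null) {
                    // Service-file format: '#' starts a comment; surrounding whitespace is ignored.
                    int hash = line.indexOf('#');
                    String entry = (hash >= 0 ? line.substring(0, hash) : line).trim();
                    if (!entry.isEmpty()) {
                        entries.add(entry);
                    }
                }
            }
        }
        return entries;
    }

    public static void main(String[] args) throws IOException {
        List<String> entries = registrarEntries(RegistrarCheck.class.getClassLoader());
        System.out.println("Registrar entries found: " + entries.size());
        entries.forEach(e -> System.out.println("  " + e));
    }
}
```

One way to run it is to compile it next to the jar and put the jar on the classpath, for example javac RegistrarCheck.java && java -cp target/[your-jar-name].jar:. RegistrarCheck (use ; instead of : as the separator on Windows).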
