Requirement

We are trying to run our pipeline from an executable JAR file.

Steps followed:

- Created an application following the instructions on this page, using SDK 2.2.0.
- Generated a JAR file with the Maven command: mvn package
- Executed the JAR file with this command:

java -jar DataFlow-jobs-0.1.jar --tempLocation=gs://events-dataflow/tmp --gcpTempLocation=gs://events-dataflow/tmp --project=google-project-id --runner=DataflowRunner --BQQuery='select t1.user_id google-project-id.deve.user_info t1'
Output:
Exception in thread "main" java.lang.IllegalArgumentException: Class interface org.apache.beam.sdk.options.PipelineOptions missing a property named 'gcpTempLocation'.
at org.apache.beam.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1579)
at org.apache.beam.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:104)
at org.apache.beam.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:291)
at org.apache.beam.sdk.options.PipelineOptionsFactory$Builder.create(PipelineOptionsFactory.java:270)
at org.customerlabs.beam.WriteFromBQtoES.main(WriteFromBQtoES.java:98)
Code:

pom.xml:

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<archive>
<manifest>
<mainClass>org.customerlabs.beam.WriteFromBQtoES</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-executable-jar</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
WriteFromBQtoES.java:

public class WriteFromBQtoES {
private static DateTimeFormatter fmt =
DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
private static final Logger LOG = LoggerFactory.getLogger(WriteFromBQtoES.class);
private static final ObjectMapper mapper = new ObjectMapper();
public interface Options extends PipelineOptions {
@Description("Bigquery query to fetch data")
@Required
String getBQQuery();
void setBQQuery(String value);
}
public static void main(String[] args) throws IOException{
PipelineOptionsFactory.register(Options.class);
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().create().as(Options.class);
Pipeline p = Pipeline.create(options);
PCollection<TableRow> tableRows = p.apply(BigQueryIO.read().fromQuery(options.getBQQuery()).usingStandardSql());
tableRows.apply("WriteToCSV", ParDo.of(new DoFn<TableRow, String>() {
// process WriteToCSV
}));
p.run();
}
}
We are not sure what we are missing: we do pass the gcpTempLocation argument on the command line, yet we still get this error. Please help us figure out the issue. Thanks in advance.
I think that instead of PipelineOptions you want:

public interface Options extends DataflowPipelineOptions { ... }

gcpTempLocation is defined in GcpOptions.java and is pulled in through DataflowPipelineOptions.java, which extends it.
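As a minimal sketch of that change (reusing the getter/setter names from the question), the Options interface would then look like this; it is not runnable on its own, since it assumes Beam and the Dataflow runner are on the classpath:

```java
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Validation.Required;

// Extending DataflowPipelineOptions (rather than the bare PipelineOptions)
// transitively pulls in GcpOptions, which declares gcpTempLocation, so
// --gcpTempLocation is recognised when parsing command-line arguments.
public interface Options extends DataflowPipelineOptions {
    @Description("BigQuery query to fetch data")
    @Required
    String getBQQuery();
    void setBQQuery(String value);
}
```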
I ran into the same problem, except that I used the Maven Shade plugin to build an uber JAR containing all the dependencies the application needs. Executing that JAR with the parameters Apache Beam requires produced the same error, with --gcpTempLocation not being recognised. Add the following block to your pom.xml: with it, Maven Shade packages the uber JAR in a way that resolves the missing-parameter problem.
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven-shade-plugin.version}</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<!-- Required to ensure Beam Pipeline options can be passed properly. Without this, pipeline options will not be recognised -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer>
<!-- add Main-Class to manifest file -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>NAME-OF-YOUR-MAIN-CLASS</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
The ServicesResourceTransformer line is what ensures that Beam pipeline options can be passed as command-line arguments. After adding it to your pom.xml, run mvn package, which generates an uber JAR file under root/target. You can then execute the JAR file with the following command:
java -jar target/[your-jar-name].jar \
--runner=org.apache.beam.runners.dataflow.DataflowRunner \
--tempLocation=[GCS temp folder path] \
--stagingLocation=[GCS staging folder path]