从带有头的postgres使用TextIO.Write()写入GCS



我正在GCP数据流上运行一个管道,在那里我从SQL实例中读取数据,收集PCollection中的数据,然后将该PCollection写入CSV文件。在写入CSV时,我似乎无法在运行时传递标头(作为值提供程序(,因为这里给出了标头必须是字符串参数。

我尝试过给出一个空字符串并在运行时更新该字符串,但它不起作用。我只将第一个空字符串作为标题。

有没有任何方法可以在内部生成标头并将该字符串作为标头,或者我可以将标头作为运行时参数传递?

在下方附加textio代码

String header = /*header*/;
PCollection<String> output = /*jdbc result*/;
output
.apply(
"Write File(s)",
TextIO.write()
.to(options.getFilePath())
.withSuffix(".csv")
.withHeader(header)
.withShardNameTemplate("-S-of-N")
.withTempDirectory(options.getTempDirectory()))

我不明白这个问题,我认为你可以将程序参数传递为String:

--header=test

Java代码中的选项:

public interface MyOptions extends PipelineOptions {
@Description("Header")
String getHeader();
void setHeader(String value);
}

然后在withHeader(header)方法中传递:

output
.apply(
"Write File(s)",
TextIO.write()
.to(options.getFilePath())
.withSuffix(".csv")
.withHeader(options.getHeader())
.withShardNameTemplate("-S-of-N")
.withTempDirectory(options.getTempDirectory()))

如果需要,也可以在代码外部配置header

当前withHeader是一个必须在构造时指定的参数,因此无法使用PCollection元素值提供它。

您可以通过将管道分解为两个管道,或者从Beam管道启动的位置生成/发现程序中的头值来实现这一点。

最新更新