无法从数据流中的GCS读取我的配置文本文件(列名称)



我在GCS中有一个源CSV文件(没有标题(以及标题配置CSV文件(仅包含列名(。我在 Bigquery 中也有静态表。我想使用列标题映射(配置文件(将源文件加载到静态表中。

我之前尝试过使用不同的方法(我维护在同一文件中包含标头和数据的源文件,然后尝试将标头与源文件拆分,然后使用标题列映射将这些数据插入 Bigquery。我注意到这种方法是不可能的,因为数据流将数据随机排列到多个工作节点中。所以我放弃了这种方法。

下面的代码我使用了硬编码的列名。我正在寻找从外部配置文件读取列名的方法(我想让我的代码动态(。

package com.coe.cog;
import java.io.BufferedReader;
import java.util.*;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
public class SampleTest {
private static final Logger LOG = LoggerFactory.getLogger(SampleTest.class);
public static TableReference getGCDSTableReference() {
TableReference ref = new TableReference();
ref.setProjectId("myownproject");
ref.setDatasetId("DS_Employee");
ref.setTableId("tLoad14");
return ref;
}
static class TransformToTable extends DoFn<String, TableRow> {
@ProcessElement
public void processElement(ProcessContext c) {
String csvSplitBy = ",";
String lineHeader = "ID,NAME,AGE,SEX"; // Hard code column name but i want to read these header from GCS file.
String[] colmnsHeader = lineHeader.split(csvSplitBy); //Only Header array
String[] split = c.element().split(csvSplitBy); //Data section
TableRow row = new TableRow();

for (int i = 0; i < split.length; i++) {                                 
row.set(colmnsHeader[i], split[i]);
}
c.output(row);
// }
}
}
public interface MyOptions extends PipelineOptions {
/*
* Param
*
*/
}
public static void main(String[] args) {
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
options.setTempLocation("gs://demo-bucket-data/temp");
Pipeline p = Pipeline.create(options);
PCollection<String> lines = p.apply("Read From Storage", TextIO.read().from("gs://demo-bucket-data/Demo/Test/SourceFile_WithOutHeader.csv"));
PCollection<TableRow> rows = lines.apply("Transform To Table",ParDo.of(new TransformToTable()));
rows.apply("Write To Table",BigQueryIO.writeTableRows().to(getGCDSTableReference())                                              
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
p.run();
}
}

源文件:

1,John,25,M 
2,Smith,30,M
3,Josephine,20,F

配置文件(仅限标头(:

ID,NAME,AGE,SEX

您有几个选择:

  1. 使用数据流/Beamside input将配置/头文件读取到某种集合中,例如 aArrayList。它将可供群集中的所有工作人员使用。然后,您可以使用side input通过DynamicDestinations将架构动态分配给 BigQuery 表。
  2. 在放入数据流管道之前,请直接调用 GCS API 以获取配置/头文件,对其进行解析,然后分析结果以设置管道。

使用Beam的FileSystemsAPI从GCS读取配置文件是另一种方法。

优势:

  • 不需要额外的依赖项,它包含在 beam API 中。
  • 使用 GCP 的客户端库可能会导致依赖项版本问题。
  • 我们可以在任何转换中使用 beam 的FileSystemsAPI。

这是读取文件的代码片段。

//filePath format: gs://bucket/file
public static String loadSchema(String filePath) {
MatchResult.Metadata metadata;
try {
metadata = FileSystems.matchSingleFileSpec(filePath);  // searching 
} catch (IOException e) {
throw new RuntimeException(e);
}
String schema;

try {
// reading file
schema = CharStreams.toString(
Channels.newReader(
FileSystems.open(metadata.resourceId()),
StandardCharsets.UTF_8.name()
)
);
} catch (IOException e) {
throw new RuntimeException(e);
}
// returning content as string. We can process it now. 
return schema;
} 

侧输入的缺点

  • 文件的方向更改。
  • 很难像 Json 和其他人那样解析多行文件。

侧输入可用于单行静态值。