如何使用气流导出bigquery到bigtable ?模式的问题



我正在使用Airflow以Avro格式提取BigQuery行到谷歌云存储。

with models.DAG(
"bigquery_to_bigtable",
default_args=default_args,
schedule_interval=None,
start_date=datetime.now(),
catchup=False,
tags=["test"],
) as dag:

data_to_gcs = BigQueryInsertJobOperator(
task_id="data_to_gcs",
project_id=project_id,
location=location,
configuration={
"extract": {
"destinationUri": gcs_uri, "destinationFormat": "AVRO",
"sourceTable": {
"projectId": project_id, "datasetId": dataset_id, 
"tableId": table_id}}})
gcs_to_bt = DataflowTemplatedJobStartOperator(
task_id="gcs_to_bt",
template="gs://dataflow-templates/latest/GCS_Avro_to_Cloud_Bigtable",
location=location,
parameters={
'bigtableProjectId': project_id,
'bigtableInstanceId': bt_instance_id,
'bigtableTableId': bt_table_id,
'inputFilePattern': 'gs://export/test.avro-*'
},
)
data_to_gcs >> gcs_to_bt

bigquery行包含

row_key      | 1_cnt | 2_cnt | 3_cnt
1#2021-08-03 |   1   |   2   |   2 
2#2021-08-02 |   5   |   1   |   5 
.
.
.

我想在bigtable中使用row_key列作为行键,在bigtable中使用my_cf列作为特定列族中的列。

然而,当使用数据流加载avro文件到bigtable时,我得到了错误消息

"java.io.IOException: Failed to start reading from source: gs://export/test.avro-"
Caused by: org.apache.avro.AvroTypeException: Found Root, expecting com.google.cloud.teleport.bigtable.BigtableRow, missing required field key

我读的文档告诉我:

Bigtable表必须存在并且具有相同的列族

如何在Avro中导出相同列族的BigQuery ?

我认为您必须将AVRO转换为适当的模式。你提到的文件还说:

  • Bigtable期望从输入的Avro文件中获得一个特定的模式。

有一个指向特殊数据模式的链接,必须使用。

如果我理解正确的话,您只是从表中导入数据,虽然是AVRO模式,但不会太多的需求模式,所以您需要将数据转换为适合您的BigTable模式的适当模式。

最新更新