从Dataflow python作业写入bigquery中的分区表



当我从数据流写入bigquery中的分区表时,我会收到以下错误-有人能帮我吗?

无效的表ID\"test$20181126\"。表ID必须是字母数字(加下划线(,并且长度不得超过1024个字符。此外,不能使用表装饰器。

这是我用于编写的python片段

import apache_beam as beam

class bqwriter(beam.PTransform):
def __init__(self, table, schema):
super(BQWriter, self).__init__()
self.table = table
self.schema = schema
def expand(self, pcoll):
pcoll | beam.io.Write(beam.io.BigQuerySink(
self.table,
schema=self.schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
))

我正在创建类似的选项卡

a | 'BQWrite' >> BQWriter("test-123:test.test$20181126", table_schema)

我也遇到了同样的问题。我的解决方案是:

  • 在数据中添加一个日期列,然后将BQ表设置为在上分区

  • 或者在BQ中的_PARTITIONTIME上设置默认分区。

这两个选项都意味着您只能插入test-123:test.test

至于我们是否应该做你想做的事,似乎是的。Beam JIRA声明他们为Java修复了它,但我找不到python的状态。

最好的方法是将函数传递给本机beam.io.WriteToBigQuery类:

def table_fn(element):
current_date = date.fromtimestamp(element['timestamp']).strftime("%Y%m%d")
return f"{bq_output_table}${current_date}"
user_parent_user_watchever_pcol | "Write to BigQuery" >> 
beam.io.Write(
beam.io.WriteToBigQuery(
table_fn,
schema=schemas.VIDEOCATALOG_SCHEMA,
method="STREAMING_INSERTS",
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
)
)

最新更新