How do I partition a BigQuery table using Apache Beam in Python?

I am writing the output of a pipeline to a BigQuery table. The table has a date column, and I want to partition the table on that date. However, I could not find an option for specifying which field to partition on.

I tried the following:

additional_bq_parameters={'timePartitioning': {'type': 'DAY'}}

However, instead of partitioning on dob, it partitions by _PARTITIONTIME.

I also tried:

additional_bq_parameters={'timePartitioning': {
    'type': 'DAY',
    'field': '_time'
}}

But that throws an error.

My code:

from apache_beam.io.gcp.internal.clients import bigquery
import apache_beam as beam

def retTuple(element):
    # Key each CSV row by its first column (the employee id).
    thisTuple = element.split(',')
    return (thisTuple[0], thisTuple[1:])

def jstr(cstr):
    import datetime

    # cstr is (key, {'dep_data': [...], 'loc_data': [...]}) from CoGroupByKey.
    left_dict = cstr[1]['dep_data']
    right_dict = cstr[1]['loc_data']
    for i in left_dict:
        for j in right_dict:
            id, name, rank, dept, dob, loc, city = ([cstr[0]] + i + j)
            # Convert dob from DD-MM-YYYY to the YYYY-MM-DD format of a BigQuery DATE.
            json_str = {"id": id, "name": name, "rank": rank, "dept": dept,
                        "dob": datetime.datetime.strptime(dob, "%d-%m-%Y").strftime("%Y-%m-%d").strip("'"),
                        "loc": loc, "city": city}
    return json_str

table_spec = 'dotted-transit-351803:test_dataflow.inner_join'
table_schema = 'id:STRING,name:STRING,rank:INTEGER,dept:STRING,dob:DATE,loc:INTEGER,city:STRING'   
gcs='gs://dataflow4bigquery/temp/'

p1 = beam.Pipeline()

# Read both input files and key every record by employee id.
dep_rows = (
    p1
    | "Reading File 1" >> beam.io.ReadFromText('dept_data.txt')
    | 'Pair each employee with key' >> beam.Map(retTuple)    # {149633CM : [Marco,10,Accounts,1-01-2019]}
)

loc_rows = (
    p1
    | "Reading File 2" >> beam.io.ReadFromText('location.txt')
    | 'Pair each loc with key' >> beam.Map(retTuple)         # {149633CM : [9876843261,New York]}
)

results = (
    {'dep_data': dep_rows, 'loc_data': loc_rows}
    | beam.CoGroupByKey()
    | beam.Map(jstr)
    | beam.io.WriteToBigQuery(
        custom_gcs_temp_location=gcs,
        table=table_spec,
        schema=table_schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        additional_bq_parameters={'timePartitioning': {
            'type': 'DAY',
            'field': 'dob'
        }}
    )
)

p1.run().wait_until_finish()

The error is:

RuntimeError: BigQuery job beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_378_c4a563c648634e9dbbf7be3a56578b6d_bfcd0c33602b47deae6a351f72edc0cb failed. Error Result: <ErrorProto
message: 'Incompatible table partitioning specification. Expects partitioning specification none, but input partitioning specification is interval(type:day,field:dob)'
reason: 'invalid'> [while running 'WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs']

When you pass additional_bq_parameters={'timePartitioning': {'type': 'DAY'}} without specifying a field, the default _PARTITIONTIME pseudo-column is used. That is exactly what the documentation at https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TimePartitioning (linked from Beam's pydoc) describes.
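
Side by side, the two variants from the question map onto that spec as follows (per the linked TimePartitioning reference, field must name a top-level DATE or TIMESTAMP column):

# No 'field': ingestion-time partitioning on the _PARTITIONTIME pseudo-column.
additional_bq_parameters={'timePartitioning': {'type': 'DAY'}}

# 'field' set: partitions on that column instead (dob is a DATE in the schema above).
additional_bq_parameters={'timePartitioning': {'type': 'DAY', 'field': 'dob'}}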

When you pass {'timePartitioning': {'type': 'DAY', 'field': 'dob'}}, the error message shows that the parameter is passed through as intended: the load job tries to partition on the dob field with DAY granularity. However, the table already exists without any partitioning ("none"), and a load job cannot change the partitioning of an existing table, hence the error.
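
One way forward is therefore to drop the unpartitioned table and let the pipeline recreate it via CREATE_IF_NEEDED. A minimal sketch using the google-cloud-bigquery client (the project, dataset, and table names are taken from the question):

from google.cloud import bigquery

client = bigquery.Client(project='dotted-transit-351803')

# Inspect the current spec; an unpartitioned table prints None, matching
# the "Expects partitioning specification none" part of the error.
table = client.get_table('dotted-transit-351803.test_dataflow.inner_join')
print(table.time_partitioning)

# Drop the table so the next pipeline run recreates it with
# DAY partitioning on dob.
client.delete_table('dotted-transit-351803.test_dataflow.inner_join', not_found_ok=True)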
