I need to write data from Pub/Sub into BigQuery with apache-beam. The code looks like this:
import argparse
import base64
import logging
import json
from datetime import datetime
from ast import literal_eval

from google.cloud import bigquery

import apache_beam as beam


class DataIngestion(beam.DoFn):
    @classmethod
    def parse_method(cls, string_input):
        """
        :param string_input:
        :return:
        """
        try:
            pubsub_message = literal_eval(string_input.data.decode('utf8'))
            process_data = pubsub_message['data']
            print('- ' * 20)
            regex = 'n'
            for i in process_data.split(regex)[:-1]:
                d = eval(i)
                d['dt'] = "{}".format(datetime.utcnow().strftime('%Y-%m-%d'))
                return json.loads(d)
        except Exception as e:
            logger.exception(e)


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_subscription', required=False,
                        help='Input PubSub subscription of the form "projects/<project>/subscriptions/<subscription_name>".',
                        default='projects/subscriptions/client_sub')
    parser.add_argument('--output', dest='output', required=False,
                        help='Output BQ table to write results to.',
                        default='project:fm.client_event')
    known_args, pipeline_args = parser.parse_known_args(argv)
    with beam.Pipeline(argv=pipeline_args) as p:
        lines = p | beam.io.ReadFromPubSub(subscription=known_args.input_subscription, with_attributes=True)
        table_info = 'project:fm.client_event'
        table_schema = 'app_version:STRING, build:INTEGER, channel:STRING, client_ip:STRING, client_ip:STRING...'
        transformed = (
            lines
            | 'String to BigQuery Row' >> beam.Map(lambda s: DataIngestion.parse_method(s))
            | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
                table_info,
                schema=table_schema,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
        )
        transformed.run().wait_until_finish()


if __name__ == '__main__':
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    run()
The job tries to split the payload and parse it into individual rows, then write them with beam.io.WriteToBigQuery, which produces the following stack trace:
Traceback (most recent call last):
File "./bigquery_io_write.py", line 68, in <module>
run()
File "./bigquery_io_write.py", line 57, in run
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
TypeError: unhashable type: 'dict'
The strange thing is that I'm not passing any dictionary to WriteToBigQuery, and I've tried changing the schema of the pub/sub data and even removing create_disposition and write_disposition. It looks like some argument is being mishandled somewhere along the way. I've verified that it has nothing to do with the data format produced by the parse_method function, since the job fails before that step even runs.
parse_method returns the result of calling json.loads(d). According to the Python documentation, json.loads returns a dictionary, and dictionaries are unhashable, which is exactly what the TypeError is complaining about.
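You can see both facts in isolation, with a toy JSON string standing in for your payload:

```python
import json

# json.loads on a JSON object string produces a Python dict
row = json.loads('{"app_version": "1.0", "build": 42}')
print(type(row))  # <class 'dict'>

# dicts are unhashable, so anything that tries to hash the value
# (a set, a dict key, certain Beam internals) raises the same error
try:
    hash(row)
except TypeError as e:
    print(e)  # unhashable type: 'dict'
```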
Also, the return statement sits inside the for loop, so you only parse the first line of each message. Did you mean to use a generator instead?
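Here is a sketch of what a generator-based version of the parsing step could look like. It assumes (hypothetically, adjust to your real messages) that the payload is a JSON object whose 'data' field holds newline-separated JSON rows; note that it yields instead of returning, so every line is emitted, and it never calls json.loads on a value that is already a dict:

```python
import json
import logging
from datetime import datetime


def parse_lines(payload):
    """Yield one BigQuery row dict per newline-separated JSON line.

    `payload` is the raw bytes of a Pub/Sub message. The message format
    here is an assumption: a JSON object with a 'data' field containing
    newline-separated JSON rows.
    """
    try:
        message = json.loads(payload.decode('utf8'))
        for line in message['data'].split('\n'):
            if not line:
                continue
            row = json.loads(line)  # already a dict now: don't json.loads it again
            row['dt'] = datetime.utcnow().strftime('%Y-%m-%d')
            yield row  # yield, so every line becomes a row
    except Exception:
        logging.exception('failed to parse Pub/Sub message')
```

In the pipeline you would then apply it with beam.FlatMap(parse_lines) (or move it into the DoFn's process method, which may also yield) instead of beam.Map, so each dict reaches WriteToBigQuery as its own row. One more thing worth checking: the `with beam.Pipeline(...) as p:` block already runs the pipeline on exit, so the trailing `transformed.run().wait_until_finish()` call is likely unnecessary and should be removed.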