Error: unhashable type 'dict' when writing to BigQuery using Cloud Dataflow



I need to write data from Pub/Sub to BigQuery via apache-beam. The code looks like this:

import argparse
import base64
import logging
import json
from datetime import datetime
from ast import literal_eval
from google.cloud import bigquery
import apache_beam as beam

class DataIngestion(beam.DoFn):
    @classmethod
    def parse_method(cls, string_input):
        """Parse one Pub/Sub message into a BigQuery row.

        :param string_input: PubsubMessage read from the subscription
        :return: the parsed row
        """
        try:
            # The message payload is a Python-literal string wrapping the real data.
            pubsub_message = literal_eval(string_input.data.decode('utf8'))
            process_data = pubsub_message['data']
            print('- ' * 20)
            regex = '\n'
            for i in process_data.split(regex)[:-1]:
                d = eval(i)
                d['dt'] = "{}".format(datetime.utcnow().strftime('%Y-%m-%d'))
                return json.loads(d)
        except Exception as e:
            logger.exception(e)

def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_subscription', required=False,
                        help='Input PubSub subscription of the form '
                             '"projects/<project>/subscriptions/<subscription_name>".',
                        default='projects/subscriptions/client_sub')
    parser.add_argument('--output', dest='output', required=False,
                        help='Output BQ table to write results to.',
                        default='project:fm.client_event')
    known_args, pipeline_args = parser.parse_known_args(argv)
    with beam.Pipeline(argv=pipeline_args) as p:
        lines = p | beam.io.ReadFromPubSub(subscription=known_args.input_subscription,
                                           with_attributes=True)
        table_info = 'project:fm.client_event'
        table_schema = 'app_version:STRING, build:INTEGER, channel:STRING, client_ip:STRING, client_ip:STRING...'
        transformed = (
            lines
            | 'String to BigQuery Row' >> beam.Map(lambda s: DataIngestion.parse_method(s))
            | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
                table_info,
                schema=table_schema,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
        )
        transformed.run().wait_until_finish()

if __name__ == '__main__':
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    run()

The job tries to split the data, parse it into individual rows, and write them with beam.io.WriteToBigQuery, which produces the following stack trace:

Traceback (most recent call last):
  File "./bigquery_io_write.py", line 68, in <module>
    run()
  File "./bigquery_io_write.py", line 57, in run
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
TypeError: unhashable type: 'dict'

The strange thing is that I am not passing any dict to WriteToBigQuery. I have tried changing the schema of the Pub/Sub data and even removing create_disposition and write_disposition. It looks as if some argument is being mangled in transit somewhere. I have also verified that it is unrelated to the format of the data produced by parse_method: the job fails before that step even executes.
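One quick way to confirm that (a hedged debugging sketch, not from the original post): temporarily swap the WriteToBigQuery step for a plain logging step. If this variant builds and runs, the TypeError is raised while constructing the BigQuery write transform rather than by the parsed data:

# Hypothetical debugging variant: replace the BigQuery sink with a logging map.
transformed = (
    lines
    | 'String to BigQuery Row' >> beam.Map(lambda s: DataIngestion.parse_method(s))
    # logging.info returns None, so "or row" passes each element through unchanged
    | 'Debug log rows' >> beam.Map(lambda row: logging.info('row: %r', row) or row)
)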

parse_method returns the result of calling json.loads(d). Per the Python documentation, json.loads returns a dict for a JSON object; note, too, that d is already a dict at that point (it comes from eval(i)), while json.loads expects a string.
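As a quick illustration of that mismatch (the field names below are made up), json.loads only accepts str, bytes or bytearray, so feeding the dict d straight back into it fails:

import json

d = {'app_version': '1.0', 'build': 42}  # d is already a dict after eval(i)
json.loads(d)                # raises TypeError: the JSON object must be str, bytes or bytearray
json.loads(json.dumps(d))    # round-tripping through a string works and returns an equal dict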

Also, the return statement sits inside the for loop, so you only ever parse the first line of each message. Did you mean to use a generator instead?
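A minimal sketch of that change (keeping the question's eval-based parsing as-is): make parse_method a generator and apply it with beam.FlatMap, so every line of a message becomes its own pipeline element:

from ast import literal_eval
from datetime import datetime
import apache_beam as beam

class DataIngestion(beam.DoFn):
    @classmethod
    def parse_method(cls, string_input):
        pubsub_message = literal_eval(string_input.data.decode('utf8'))
        for i in pubsub_message['data'].split('\n')[:-1]:
            d = eval(i)  # d is already a dict; no json.loads needed
            d['dt'] = datetime.utcnow().strftime('%Y-%m-%d')
            yield d      # yield each row instead of returning only the first

# FlatMap flattens the generator, emitting one row dict per parsed line:
# lines | 'String to BigQuery Row' >> beam.FlatMap(DataIngestion.parse_method)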

Latest update