Apache Beam: loading dictionaries into BigQuery



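I'm having trouble loading data into BigQuery with Apache Beam. The code makes an API call that returns rows which should be dictionaries (shown below). My understanding is that I should then use json.dumps() and json.loads() to turn the response into something iterable that can be passed to BigQuery. However, whenever I try this approach I get an error saying a str can't be iterated. It shouldn't be a string at that point, which makes me think something isn't being handled correctly, but I'm not sure what. I don't have much Python experience, so I'm not sure how to proceed.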

{"id":"1234","source":"example","country":"Example Country","region":"Example Region","exampleKey":"example","name":"Test","code":"null","currency":"EUR","status":1},"Detail":{"id":"1234","name":"example","code":"example","currency":"EUR"},"dateDetail":{"date":"2021-04-24","itemId":"1234"},"cost":[{"Type":"1","TypeName":"example","price":0.0}]}
{"id":"1234","source":"example","country":"Example Country","region":"Example Region","exampleKey":"example","name":"Test","code":"null","currency":"EUR","status":1},"Detail":{"id":"1234","name":"example","code":"example","currency":"EUR"},"dateDetail":{"date":"2021-04-24","itemId":"5678"},"cost":[{"Type":"1","TypeName":"example","price":0.0}]}
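To illustrate the json.dumps()/json.loads() point, here is a minimal sketch using a simplified, hypothetical line (not the exact payload above). It shows that round-tripping a `str` through `json.dumps()` and then `json.loads()` gives back the same `str`, whereas calling `json.loads()` directly on the JSON text yields a `dict`.

```python
import json

# Hypothetical, simplified response line; not the exact payload from the API.
raw = '{"id": "1234", "currency": "EUR", "status": 1}'

# json.dumps() encodes the str as a JSON string literal, and
# json.loads() decodes that literal straight back into the original str.
round_tripped = json.loads(json.dumps(raw))
print(type(round_tripped).__name__)  # str

# json.loads() on the JSON text itself parses the object into a dict.
parsed = json.loads(raw)
print(type(parsed).__name__)  # dict
print(parsed["currency"])  # EUR
```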

The pipeline code is as follows:

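import json
import logging

import apache_beam as beam
import requests
from requests.exceptions import HTTPError


class callAPI(beam.DoFn):
    def __init__(self, input_header):
        self.headers = input_header
        # self.remote_url = input_uri

    def process(self, input_uri):
        try:
            res = requests.get(input_uri, headers=self.headers)
            res.raise_for_status()
        except HTTPError as message:
            logging.error(message)
            return

        # json.dumps() turns the response text into a JSON string literal and
        # json.loads() turns it straight back, so `data` is still a str here.
        data = json.loads(json.dumps(res.text))
        yield data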

def run():
    # REMOTE_URI, HEADER, table_name1 and table_schema are defined elsewhere in the script.
    with beam.Pipeline() as p:
        data = (
            p
            | beam.Create([REMOTE_URI])
            | 'Call API ' >> beam.ParDo(callAPI(HEADER))
            | 'Write to BQ ' >> beam.io.WriteToBigQuery(
                table=table_name1,
                schema=table_schema,
                method="STREAMING_INSERTS",
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
        )


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
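`table_schema` is not shown in the question. As a hedged sketch: `WriteToBigQuery` also accepts the schema as a Python dict in BigQuery's JSON schema shape (a `fields` list of `name`/`type`/`mode` entries), which can describe nested `RECORD` and `REPEATED` fields. The field names below are a subset taken from the sample rows; the types, the modes and the flat top-level layout are assumptions about the real data.

```python
# Hypothetical schema sketch. Field names are taken from the sample rows;
# the BigQuery types and modes are assumptions about the real data.
table_schema = {
    "fields": [
        {"name": "id", "type": "STRING", "mode": "NULLABLE"},
        {"name": "source", "type": "STRING", "mode": "NULLABLE"},
        {"name": "currency", "type": "STRING", "mode": "NULLABLE"},
        {"name": "status", "type": "INTEGER", "mode": "NULLABLE"},
        {
            # Nested JSON object -> RECORD with its own sub-fields.
            "name": "dateDetail",
            "type": "RECORD",
            "mode": "NULLABLE",
            "fields": [
                {"name": "date", "type": "DATE", "mode": "NULLABLE"},
                {"name": "itemId", "type": "STRING", "mode": "NULLABLE"},
            ],
        },
        {
            # JSON array of objects -> REPEATED RECORD.
            "name": "cost",
            "type": "RECORD",
            "mode": "REPEATED",
            "fields": [
                {"name": "Type", "type": "STRING", "mode": "NULLABLE"},
                {"name": "TypeName", "type": "STRING", "mode": "NULLABLE"},
                {"name": "price", "type": "FLOAT", "mode": "NULLABLE"},
            ],
        },
    ]
}

# Print the top-level column names as a quick sanity check.
print([field["name"] for field in table_schema["fields"]])
```

With a schema like this, each row `dict` would need to use the same keys, with `dateDetail` as a nested `dict` and `cost` as a list of `dict`s.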

Error:

AttributeError: 'str' object has no attribute 'items' [while running 'Write to BQ /_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)']
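The message indicates that the BigQuery write step calls `.items()` on each element, i.e. it expects every element to be a `dict` mapping column names to values, while `callAPI` yields a `str`. The function below is a hypothetical stand-in, not Beam's actual implementation; it only reproduces the same `AttributeError` for a `str` and shows that a parsed `dict` works.

```python
import json


def to_columns(row):
    """Hypothetical stand-in for a sink that reads a row as column/value pairs."""
    return [name for name, _ in row.items()]


# A JSON string, like the str that callAPI currently yields.
row_text = '{"id": "1234", "status": 1}'

try:
    to_columns(row_text)
except AttributeError as err:
    print(err)  # 'str' object has no attribute 'items'

# After json.loads() the row is a dict, so .items() works.
print(to_columns(json.loads(row_text)))  # ['id', 'status']
```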

I tried the following snippet to split up the JSON:

class Transaction(beam.DoFn):
    def process(self, element):
        result = json.loads(element)

        data_id = result['id']
        data_source = result['source']
        data_country = result['country']
        data_region = result['region']
        data_exampleKey = result['exampleKey']
        data_name = result['name']
        data_code = result['code']
        data_currency = result['currency']
        data_status = result['status']

        return [{
            "id": data_id,
            "source": data_source,
            "country": data_country,
            "region": data_region,
            "exampleKey": data_exampleKey,
            "name": data_name,
            "code": data_code,
            "currency": data_currency,
            "status": data_status,
        }]
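The field-by-field extraction in `Transaction` can be written more compactly with a tuple of column names and a dict comprehension. `FIELDS` and `pick_fields` are hypothetical names; this sketch assumes each element is one JSON object with these keys at the top level, and uses `.get()` so a missing key becomes `None` instead of raising `KeyError`.

```python
import json

# Hypothetical list of top-level columns to keep (taken from the sample rows).
FIELDS = ("id", "source", "country", "region", "exampleKey",
          "name", "code", "currency", "status")


def pick_fields(element):
    """Parse one JSON line and keep only FIELDS; missing keys become None."""
    result = json.loads(element)
    return {key: result.get(key) for key in FIELDS}


line = '{"id": "1234", "source": "example", "currency": "EUR", "status": 1}'
row = pick_fields(line)
print(row["currency"], row["region"])  # EUR None
```

Because it returns exactly one row per input element, a function like this could be applied with `beam.Map(pick_fields)` rather than a `DoFn` that returns a one-item list.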

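I took a closer look at the structure I was working with, because I realized I needed to put the data into a list before processing it. I added the following, which lets me process the data to be loaded into BigQuery.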

class convertJson(beam.DoFn):
    def process(self, res):
        # Split the response body into one JSON string per line.
        data_list = []
        for i in res.splitlines():
            data_list.append(i)

        # Parse each line into a dict.
        data = []
        for i in data_list:
            data.append(json.loads(i))

        yield data
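The `splitlines()` step is needed if the body is newline-delimited JSON (an assumption based on the sample rows), because `json.loads()` on the whole body stops after the first object. Note also that `yield data` in `convertJson` emits the entire list as a single element. The plain-Python sketch below uses hypothetical function names and a hypothetical two-line body to contrast that with yielding one `dict` per line; in Beam, the per-row version corresponds to yielding each `dict` inside `process`, or wrapping such a generator in `beam.FlatMap`.

```python
import json


def convert_as_list(res):
    """Mirrors convertJson.process: yields ONE element holding the full list."""
    yield [json.loads(line) for line in res.splitlines()]


def convert_per_row(res):
    """Yields one dict per non-empty line, i.e. one element per BigQuery row."""
    for line in res.splitlines():
        if line.strip():
            yield json.loads(line)


# Hypothetical two-line response body (newline-delimited JSON).
body = '{"id": "1234"}\n{"id": "5678"}'

# The whole body is not a single JSON document, hence the need to split lines.
try:
    json.loads(body)
except json.JSONDecodeError as err:
    print(err.msg)  # Extra data

print(len(list(convert_as_list(body))))  # 1
print(len(list(convert_per_row(body))))  # 2
```

`WriteToBigQuery` writes one table row per element, so the per-row shape is the one it expects.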

Latest update