ETL data from BigQuery to Redshift using Python



I have this script in Python that sets a variable to the results of a query run in Google BigQuery (some of the libraries imported here aren't used yet, but I'm testing converting the JSON to a CSV file):

import httplib2
import datetime
import json
import csv
import sys
from oauth2client.service_account import ServiceAccountCredentials
from bigquery import get_client

# Set DAY - 1: the query window covers all of yesterday
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
today = datetime.datetime.now()
# Format as YYYY-MM-DD date strings
yesterday = '{:%Y-%m-%d}'.format(yesterday)
today = '{:%Y-%m-%d}'.format(today)

# BigQuery project id as listed in the Google Developers Console.
project_id = 'project'
# Service account email address as listed in the Google Developers Console.
service_account = 'email@email.com'

# Authorize an HTTP client with the service account's JSON key file
scope = 'https://www.googleapis.com/auth/bigquery'
credentials = ServiceAccountCredentials.from_json_keyfile_name('/path/to/file/.json', scope)
http = httplib2.Http()
http = credentials.authorize(http)

client = get_client(project_id, credentials=credentials, service_account=service_account)

# Synchronous query: blocks until the job finishes or the timeout elapses
try:
    _job_id, results = client.query(
        "SELECT * FROM dataset.table "
        "WHERE CreatedAt >= PARSE_UTC_USEC('" + yesterday + "') "
        "and CreatedAt < PARSE_UTC_USEC('" + today + "') limit 1",
        timeout=1000)
except Exception as e:
    print(e)
print(results)
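
For larger result sets, the same bigquery library (BigQuery-Python) also documents an asynchronous query pattern. A minimal sketch, assuming check_job and get_query_rows behave as described in that library's README:

# Asynchronous variant: submit the job, poll it, then fetch the rows
job_id, _results = client.query("SELECT * FROM dataset.table LIMIT 1000")
complete, row_count = client.check_job(job_id)   # has the job finished?
if complete:
    results = client.get_query_rows(job_id)      # retrieve all result rows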

The results variable comes back looking like this:

[
{u'Field1': u'Msn', u'Field2': u'00000000000000', u'Field3': u'jsdksf422552d32', u'Field4': u'00000000000000', u'Field5': 1476004363.421,
u'Field6': u'message', u'Field7': u'msn',
u'Field8': None,
u'Field9': u'{"user":{"field":"j23h4sdfsf345","field":"Msn","field":"000000000000000000","field":true,"field":"000000000000000000000","field":"2016-10-09T09:12:43.421Z"}}', u'Field10': 1476004387.016}
]

I need to load this into Amazon Redshift, but in this format I can't run a COPY from S3 with the .json it generates…

Is there a way I can modify this JSON so Redshift can load it? Or return a .csv directly from BigQuery or Python? This is one of my first scripts and I don't know much about these libraries.

Thanks a lot!

To remove the u'' prefix from the fields:

results = json.dumps(results)
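
This works because json.dumps serializes the list of row dicts into a plain JSON string, and JSON text has no u'' prefix notation. For example (key order may vary):

>>> import json
>>> json.dumps([{u'Field1': u'Msn', u'Field2': None}])
'[{"Field1": "Msn", "Field2": null}]'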

Then, to turn the json variable into a CSV file, I wrote:

# Transform the json variable into a CSV file.
# Round-trip through json to normalize the rows into plain dicts.
results = json.loads(json.dumps(results))

with open("file.csv", "w") as out:
    writer = csv.writer(out, delimiter='|')
    # Header row (field names redacted here)
    writer.writerow(["field", "field", "field", "field", "field",
                     "field", "field", "field", "field", "field"])
    for row in results:
        # One pipe-delimited line per result row
        writer.writerow([row["field"], row["field"], row["field"],
                         row["field"], row["field"], row["field"],
                         row["field"], row["field"], row["field"],
                         row["field"]])

After this, I can load the file into Redshift.
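
For completeness, a minimal sketch of that final load step, using boto3 for the S3 upload and psycopg2 for the COPY; the bucket, table, and connection details below are hypothetical, and neither library appears in the original script:

import boto3
import psycopg2

# Upload the pipe-delimited file to S3 (bucket and key are hypothetical)
boto3.client('s3').upload_file('file.csv', 'my-bucket', 'etl/file.csv')

# Issue COPY on Redshift (host, credentials and table are hypothetical)
conn = psycopg2.connect(host='redshift-host', port=5439, dbname='db',
                        user='user', password='password')
with conn, conn.cursor() as cur:
    cur.execute("""
        COPY schema.table
        FROM 's3://my-bucket/etl/file.csv'
        CREDENTIALS 'aws_access_key_id=...;aws_secret_access_key=...'
        DELIMITER '|'
        IGNOREHEADER 1;
    """)
conn.close()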
