我希望使用python api将以下文件附加到一个具有以下定义架构的bigquery表中:
[
{
"name": "batsman",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "batting_team",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "bowler",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "city",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "date",
"type": "DATE",
"mode": "NULLABLE"
},
{
"name": "delivery",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "extras",
"type": "INTEGER",
"mode": "NULLABLE"
},
{
"name": "extras_type",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "inning",
"type": "INTEGER",
"mode": "NULLABLE"
},
{
"name": "match_code",
"type": "INTEGER",
"mode": "NULLABLE"
},
{
"name": "non_striker",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "player_out",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "runs",
"type": "INTEGER",
"mode": "NULLABLE"
},
{
"name": "team1",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "team2",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "toss_decision",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "toss_winner",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "total",
"type": "INTEGER",
"mode": "NULLABLE"
},
{
"name": "venue",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "wicket_fielders",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "wicket_kind",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "win_margin",
"type": "INTEGER",
"mode": "NULLABLE"
},
{
"name": "win_type",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "winner",
"type": "STRING",
"mode": "NULLABLE"
}
]
我用来添加到bigquery的代码如下:
def insert_data_in_bq(bucketname, csv_filepath, table_id='ipl'):
"""Appends a csv to a BigQuery table."""
client = bigquery.Client()
dataset_id = 'cric'
dataset_ref = client.dataset(dataset_id)
job_config = bigquery.LoadJobConfig()
job_config.autodetect = True
job_config.skip_leading_rows = 1
job_config.source_format = bigquery.SourceFormat.CSV
# job_config.null_marker = 'NULL'
uri = 'gs://' + bucketname + '/' + csv_filepath
load_job = client.load_table_from_uri(uri, dataset_ref.table(table_id),
job_config=job_config) # API request
print('Starting job {}'.format(load_job.job_id))
load_job.result() # Waits for table load to complete.
print('Job finished.')
print('Loaded {} rows.'.format(load_job.output_rows))
但是,每当我加载文件时,我都会发现一个错误说:
BadRequest: 400 Invalid schema update. Field win_margin has changed type from INTEGER to STRING
正常文件看起来像这样。
我应该做什么,以便我可以将win_margin
列作为INTEGER
,但仍然可以加载此文件,该文件包含列的所有空行?
,您看到的是BigQuery将不允许您在整数字段中添加空值,因此您需要在创建文件或上传期间填写此字段,例如:
- 构建文件时,请确保
win_margin
不是空put 0或null - 如果不可能,您需要将Python代码更新为在上传之前更新字段值
- 在表格本身中创建一个公式以填充字段
- 将文件上传到BQ中的另一个表,然后运行SQL命令将数据从1个表移动到另一个表
您需要指定表模式,其中明确指定了win_margin
列的类型。您可以通过设置job_config.schema
字段并将job_config.autodetect
设置为False
来做到这一点。
以下是您可以使用文件读取模式的功能:
def read_bigquery_schema_from_file(filepath):
file_content = open(filepath).read()
json_content = json.loads(file_content)
return read_bigquery_schema_from_json_recursive(json_content)
def read_bigquery_schema_from_json_recursive(json_schema):
"""
CAUTION: Recursive function
This method can generate BQ schemas for nested records
"""
result = []
for field in json_schema:
if field.get('type').lower() == 'record' and field.get('fields'):
schema = SchemaField(
name=field.get('name'),
field_type=field.get('type', 'STRING'),
mode=field.get('mode', 'NULLABLE'),
description=field.get('description'),
fields=read_bigquery_schema_from_json_recursive(field.get('fields'))
)
else:
schema = SchemaField(
name=field.get('name'),
field_type=field.get('type', 'STRING'),
mode=field.get('mode', 'NULLABLE'),
description=field.get('description')
)
result.append(schema)
return result