将CSV文件加载到BigQuery中,其中包含所有空值的一列



我希望使用python api将以下文件附加到一个具有以下定义架构的bigquery表中:

[
   {
    "name": "batsman",
    "type": "STRING",
    "mode": "NULLABLE"
   },
   {
    "name": "batting_team",
    "type": "STRING",
    "mode": "NULLABLE"
   },
   {
    "name": "bowler",
    "type": "STRING",
    "mode": "NULLABLE"
   },
   {
    "name": "city",
    "type": "STRING",
    "mode": "NULLABLE"
   },
   {
    "name": "date",
    "type": "DATE",
    "mode": "NULLABLE"
   },
   {
    "name": "delivery",
    "type": "FLOAT",
    "mode": "NULLABLE"
   },
   {
    "name": "extras",
    "type": "INTEGER",
    "mode": "NULLABLE"
   },
   {
    "name": "extras_type",
    "type": "STRING",
    "mode": "NULLABLE"
   },
   {
    "name": "inning",
    "type": "INTEGER",
    "mode": "NULLABLE"
   },
   {
    "name": "match_code",
    "type": "INTEGER",
    "mode": "NULLABLE"
   },
   {
    "name": "non_striker",
    "type": "STRING",
    "mode": "NULLABLE"
   },
   {
    "name": "player_out",
    "type": "STRING",
    "mode": "NULLABLE"
   },
   {
    "name": "runs",
    "type": "INTEGER",
    "mode": "NULLABLE"
   },
   {
    "name": "team1",
    "type": "STRING",
    "mode": "NULLABLE"
   },
   {
    "name": "team2",
    "type": "STRING",
    "mode": "NULLABLE"
   },
   {
    "name": "toss_decision",
    "type": "STRING",
    "mode": "NULLABLE"
   },
   {
    "name": "toss_winner",
    "type": "STRING",
    "mode": "NULLABLE"
   },
   {
    "name": "total",
    "type": "INTEGER",
    "mode": "NULLABLE"
   },
   {
    "name": "venue",
    "type": "STRING",
    "mode": "NULLABLE"
   },
   {
    "name": "wicket_fielders",
    "type": "STRING",
    "mode": "NULLABLE"
   },
   {
    "name": "wicket_kind",
    "type": "STRING",
    "mode": "NULLABLE"
   },
   {
    "name": "win_margin",
    "type": "INTEGER",
    "mode": "NULLABLE"
   },
   {
    "name": "win_type",
    "type": "STRING",
    "mode": "NULLABLE"
   },
   {
    "name": "winner",
    "type": "STRING",
    "mode": "NULLABLE"
   }
  ]

我用来添加到bigquery的代码如下:

def insert_data_in_bq(bucketname, csv_filepath, table_id='ipl'):
    """Appends a csv to a BigQuery table."""
    client = bigquery.Client()
    dataset_id = 'cric'
    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    job_config.skip_leading_rows = 1
    job_config.source_format = bigquery.SourceFormat.CSV
    # job_config.null_marker = 'NULL'
    uri = 'gs://' + bucketname + '/' + csv_filepath
    load_job = client.load_table_from_uri(uri, dataset_ref.table(table_id), 
    job_config=job_config)  # API request
    print('Starting job {}'.format(load_job.job_id))
    load_job.result()  # Waits for table load to complete.
    print('Job finished.')
    print('Loaded {} rows.'.format(load_job.output_rows))

但是,每当我加载文件时,我都会发现一个错误说:

BadRequest: 400 Invalid schema update. Field win_margin has changed type from INTEGER to STRING

正常文件看起来像这样。

我应该做什么,以便我可以将win_margin列作为INTEGER,但仍然可以加载此文件,该文件包含列的所有空行?

,您看到的是BigQuery将不允许您在整数字段中添加空值,因此您需要在创建文件或上传期间填写此字段,例如:

  1. 构建文件时,请确保win_margin不是空put 0或null
  2. 如果不可能,您需要将Python代码更新为在上传之前更新字段值
  3. 在表格本身中创建一个公式以填充字段
  4. 将文件上传到BQ中的另一个表,然后运行SQL命令将数据从1个表移动到另一个表

您需要指定表模式,其中明确指定了win_margin列的类型。您可以通过设置job_config.schema字段并将job_config.autodetect设置为False来做到这一点。

以下是您可以使用文件读取模式的功能:

def read_bigquery_schema_from_file(filepath):
    file_content = open(filepath).read()
    json_content = json.loads(file_content)
    return read_bigquery_schema_from_json_recursive(json_content)
def read_bigquery_schema_from_json_recursive(json_schema):
    """
    CAUTION: Recursive function
    This method can generate BQ schemas for nested records
    """
    result = []
    for field in json_schema:
        if field.get('type').lower() == 'record' and field.get('fields'):
            schema = SchemaField(
                name=field.get('name'),
                field_type=field.get('type', 'STRING'),
                mode=field.get('mode', 'NULLABLE'),
                description=field.get('description'),
                fields=read_bigquery_schema_from_json_recursive(field.get('fields'))
            )
        else:
            schema = SchemaField(
                name=field.get('name'),
                field_type=field.get('type', 'STRING'),
                mode=field.get('mode', 'NULLABLE'),
                description=field.get('description')
            )
        result.append(schema)
    return result

最新更新