BigQuery Python:google.api_core.exceptions.BadRequest。读取数据时出错,错误消息:Schema 不匹配:被引用的变量 'ro_sub_ros.$is_not_null' 的数组层级为 1,而对应的 Parquet 列的字段路径中包含 0 个重复(REPEATED)字段。
我的原始数据是这样的:
# The single entry placed in "ro_sub_ros". Scalar values are plain
# strings/floats/bools, single selections are {"label", "value"} dicts, and
# multi-selections are lists of such dicts.
_sub_ro = {
    "sub_ro_id": "2323",
    "valid": False,
    "sub_ro_name": "Testing",
    "sub_ro_dates": {"from": now, "to": now},
    "sub_ro_budget": 1203302.22,
    "sub_ro_revenue_price": 1202302.22,
    "sub_ro_revenue_selected": {
        "label": "Fortnightly",
        "value": "Fortnightly",
    },
    "sub_ro_revenue_model_selected": {
        "label": "Fortnightly",
        "value": "Fortnightly",
    },
    "sub_ro_campaigns_selected": [
        {"label": "Fortnightly", "value": "Fortnightly"},
    ],
    "sub_ro_ios_selected": [
        {"label": "Fortnightly", "value": "Fortnightly"},
    ],
    "sub_ro_client_id": [
        {"label": "Fortnightly", "value": "Fortnightly"},
    ],
    "sub_ro_ids_selected": [
        {"label": "Fortnightly", "value": "Fortnightly"},
    ],
    "sub_ro_pixels_selected": [
        {"label": "Fortnightly", "value": "Fortnightly"},
    ],
    "kpi_1_metric_selected": {
        "label": "Fortnightly",
        "value": "Fortnightly",
    },
    "attribution_model_selected": {
        "label": "Fortnightly",
        "value": "Fortnightly",
    },
    "kpi_window_selected": {
        "label": "Fortnightly",
        "value": "Fortnightly",
    },
    "deepMetrics_selected": {
        "label": "Fortnightly",
        "value": "Fortnightly",
    },
    "sub_ro_kpi_goal": "ROI",
}

# Sample row: top-level scalar fields, a few nested dicts, and a list of
# sub-RO dicts under "ro_sub_ros".
testData = {
    "ro_user_email": "tech@techietech.com",
    "ro_account_id": "23402042",
    "ro_sub_account_id": "34020334",
    "ro_name": "Test RO",
    "ro_number": "1304340",
    "ro_currency": {"label": "USD", "value": "USD"},
    "ro_dates": {"from": now, "to": now},
    "ro_status": "draft",
    "ro_operation_timestamp": pd.Timestamp(now),
    "ro_billing_cycle": {"label": "Fortnightly", "value": "Fortnightly"},
    "ro_sub_ros": [_sub_ro],
}
我是这样创建我的BQ Schema的:
def _label_value_fields():
    """Return fresh required ``label``/``value`` STRING sub-fields."""
    return [
        bigquery.SchemaField("label", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("value", "STRING", mode="REQUIRED"),
    ]


def _date_range_fields():
    """Return fresh required ``from``/``to`` DATE sub-fields."""
    return [
        bigquery.SchemaField("from", "DATE", mode="REQUIRED"),
        bigquery.SchemaField("to", "DATE", mode="REQUIRED"),
    ]


def _option_struct(name):
    """Return a required STRUCT column holding one ``{"label", "value"}``."""
    return bigquery.SchemaField(
        name, "STRUCT", mode="REQUIRED", fields=_label_value_fields()
    )


def _option_list(name):
    """Return a REPEATED RECORD column of ``{"label", "value"}`` elements.

    The sample rows store each list element as ``{"label": ..., "value": ...}``
    directly, so ``label``/``value`` must be the record's own fields. Wrapping
    them in an extra ``model_list`` STRUCT describes a shape the data does not
    have.
    """
    return bigquery.SchemaField(
        name, "RECORD", mode="REPEATED", fields=_label_value_fields()
    )


# Load schema for one RO row; field order follows the keys of the sample data.
schema = [
    bigquery.SchemaField("ro_user_email", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("ro_account_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("ro_sub_account_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("ro_name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("ro_number", "STRING", mode="REQUIRED"),
    _option_struct("ro_currency"),
    bigquery.SchemaField(
        "ro_dates", "STRUCT", mode="REQUIRED", fields=_date_range_fields()
    ),
    bigquery.SchemaField("ro_status", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("ro_operation_timestamp", "TIMESTAMP", mode="REQUIRED"),
    _option_struct("ro_billing_cycle"),
    # Each row carries a list of sub-RO records.
    bigquery.SchemaField(
        "ro_sub_ros",
        "RECORD",
        mode="REPEATED",
        fields=[
            bigquery.SchemaField("sub_ro_id", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("valid", "BOOL", mode="REQUIRED"),
            bigquery.SchemaField("sub_ro_name", "STRING", mode="REQUIRED"),
            bigquery.SchemaField(
                "sub_ro_dates",
                "STRUCT",
                mode="REQUIRED",
                fields=_date_range_fields(),
            ),
            bigquery.SchemaField("sub_ro_budget", "FLOAT", mode="REQUIRED"),
            bigquery.SchemaField("sub_ro_revenue_price", "FLOAT", mode="REQUIRED"),
            _option_struct("sub_ro_revenue_selected"),
            _option_struct("sub_ro_revenue_model_selected"),
            # Multi-select values: lists of {"label", "value"} records.
            _option_list("sub_ro_campaigns_selected"),
            _option_list("sub_ro_ios_selected"),
            _option_list("sub_ro_client_id"),
            _option_list("sub_ro_ids_selected"),
            _option_list("sub_ro_pixels_selected"),
            _option_struct("kpi_1_metric_selected"),
            _option_struct("attribution_model_selected"),
            _option_struct("kpi_window_selected"),
            _option_struct("deepMetrics_selected"),
            bigquery.SchemaField("sub_ro_kpi_goal", "STRING", mode="REQUIRED"),
        ],
    ),
]
当我尝试使用 BigQuery 客户端库(代码如下)上传这些数据时,出现了错误:
job_config = bigquery.LoadJobConfig(schema=schema)
return bq.client.load_table_from_dataframe(
df, tablename, job_config=job_config
).result()
抛出:
google.api_core.exceptions.BadRequest: 400 Error while reading data, error message: Schema mismatch: referenced variable 'ro_sub_ros.$is_not_null' has array levels of 1, while the corresponding field path to Parquet column has 0 repeated
fields.
我不清楚问题出在哪里。如果我的 schema 太大、太复杂而难以分析,有人能给出一个最小示例,演示如何使用客户端库和 pandas DataFrame 向 Google BigQuery 上传 REPEATED RECORD 吗?
您可以逐一检查以下几点。
确认 BigQuery 的 schema 是否正确。下面是一个使用重复记录(REPEATED RECORD)的示例,更多细节可参阅官方文档。
from google.cloud import bigquery

# Client and dataset reference used below. "my_dataset" is a placeholder:
# replace it with an existing dataset in your project.
client = bigquery.Client()
project = client.project
dataset_ref = bigquery.DatasetReference(project, "my_dataset")

# "addresses" is a REPEATED RECORD: each row holds a list of address structs
# whose sub-fields are declared directly in ``fields``.
schema = [
    bigquery.SchemaField("id", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("first_name", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("last_name", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("dob", "DATE", mode="NULLABLE"),
    bigquery.SchemaField(
        "addresses",
        "RECORD",
        mode="REPEATED",
        fields=[
            bigquery.SchemaField("status", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("address", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("city", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("state", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("zip", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("numberOfYears", "STRING", mode="NULLABLE"),
        ],
    ),
]

table_ref = dataset_ref.table("my_table")
table = bigquery.Table(table_ref, schema=schema)
table = client.create_table(table)  # API request
print(f"Created table {table.full_table_id}")
确认记录(数据行)的语法是否正确。下面是一组符合该 schema 的示例数据(每行一个 JSON 对象)。
{"id":"1","first_name":"John","last_name":"Doe","dob":"1968-01-22","addresses":[{"status":"current","address":"123 First Avenue","city":"Seattle","state":"WA","zip":"11111","numberOfYears":"1"},{"status":"previous","address":"456 Main Street","city":"Portland","state":"OR","zip":"22222","numberOfYears":"5"}]}
{"id":"2","first_name":"Jane","last_name":"Doe","dob":"1980-10-16","addresses":[{"status":"current","address":"789 Any Avenue","city":"New York","state":"NY","zip":"33333","numberOfYears":"2"},{"status":"previous","address":"321 Main Street","city":"Hoboken","state":"NJ","zip":"44444","numberOfYears":"3"}]}
可以考虑在 Python 代码中使用 schema 自动检测(autodetect),如下例所示。更多信息请参阅官方文档。
from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
table_id = "your-project.your_dataset.your_table_name"

# With autodetect=True no explicit schema is passed; BigQuery infers it from
# the newline-delimited JSON source.
job_config = bigquery.LoadJobConfig(
    autodetect=True,
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
)
uri = "gs://cloud-samples-data/bigquery/us-states/us-states.json"

load_job = client.load_table_from_uri(
    uri, table_id, job_config=job_config
)  # Make an API request.
load_job.result()  # Waits for the job to complete.

destination_table = client.get_table(table_id)
print(f"Loaded {destination_table.num_rows} rows.")
您可以在此页面验证JSON格式。