Merge files and insert them into a BigQuery table



I have a folder where I receive many JSON files, but each JSON file contains only one record. Example of a JSON file record:

{"ID":"3193559","Title":"Una Familia de Diez - El secreto","Description":"Martina escucha que la Nena tiene novio y la amenaza con decirles a todos si no hace todo lo que le pida, pero despuu00e9s la familia descubre su gran secreto.","Program":"Una Familia de Diez","Season":"1","Episode":"16","Source":"Televisa","Category":"Comedy","Syndicator":"CSv2","[CSv2] external_id":"UFDD100023004","[CSv2] pub_win_US_begin":"1657166400","[CSv2] pub_win_US_end":"1924923600","[CSv2] language":"es","[CSv2] title":"Una Familia de Diez - El secreto","[CSv2] descriptive_title":"El secreto","[CSv2] description":"Martina escucha que la Nena tiene novio y la amenaza con decirles a todos si no hace todo lo que le pida, pero despuu00e9s la familia descubre su gran secreto.","[CSv2] supplier":"Televisa","[CSv2] categories":"Comedy","[CSv2] rating":"TV-14","[CSv2] subratings":"D,L","[CSv2] program_type":"SERIES","[CSv2] entity":"","[CSv2] exception_countries":"US ,tUM ,PR ,tMX ,tAR ,tCL ,tCO ,tPE ,tEC ,tCR ,tSV ,tHN ,tBO ,tPA ,tDO ,tNI ,tPY ,tVE ,tUY ,tGT","[CSv2] episode_type":"","TMS ID":"EP009112420015","external_id":"UFDD100023004","Content Type":"Entertainment","Release Year":"2007","sports_event_ID":""}

I am new to Python and GCP. I need help with the following: how do I merge all the JSON files in Python and insert their data into a staging BigQuery table that I need to create in a DAG, and then move those files to another folder once they have been inserted into the BQ table? I also need to merge the staging table data into the final table based on the ID. Once that is done, do I need to delete the staging table? The whole process should repeat whenever new files arrive.

I tried reading the JSON files in Python, but it doesn't work:

import json
import logging
import os

from google.cloud import storage


def map_keys(bucket_name, file_path, list_of_files):  # pass the folder as an argument
    logging.info(f"bucket_name: {bucket_name}")
    logging.info(f"file_path: {file_path}")
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    logging.info(f"list_of_files from the DAG: {list_of_files}")
    blobs = storage_client.list_blobs(
        bucket_or_name=bucket_name,
        prefix=file_path,  # folder ("prefix") that holds the JSON files
    )
    blobs = [blob for blob in blobs if "json" in blob.name]
    logging.info(f"The process found {len(blobs)} files to insert")
    if not os.path.exists("unprocessed"):
        os.makedirs("unprocessed")
    for blob in blobs:
        json_content = blob.download_as_string()
        mcp_data = json.loads(json_content)
        if isinstance(mcp_data, dict):
            # each file holds a single record, so wrap it in a list
            mcp_data = [mcp_data]
        file_name = blob.name.split("/")[-1]
        logging.info(f"file to store: {file_name} with {len(mcp_data)} rows")
        path_unprocessed_file = f"unprocessed/{file_name}"
        unprocessed_blob = bucket.blob(path_unprocessed_file)
        with open(path_unprocessed_file, "w") as unprocessed_file:
            for datum in mcp_data:
                # McpModel is a pydantic model defined elsewhere in the project
                model_datum = McpModel.parse_obj(datum)
                unprocessed_file.write(json.dumps(model_datum.dict()))
                unprocessed_file.write("\n")  # newline-delimited JSON
        unprocessed_blob.upload_from_filename(path_unprocessed_file)

I can suggest a solution that I hope will help.

You can use the following steps:

  • Truncate the staging table (it is kept between runs and simply emptied at the start of each run, so there is no need to delete it after the merge)
  • Load all the input JSON files from GCS into the BigQuery staging table with an Airflow operator
  • Launch a MERGE query with Airflow between your staging table and the final table, based on the ID
  • Move the processed input JSON files to another folder

Example:

import airflow
from datetime import datetime

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator

args = {
    "start_date": datetime(2023, 1, 1),
}

with airflow.DAG(
        "your_dag",
        default_args=args,
        schedule_interval=None) as dag:

    truncate_staging_table = BigQueryInsertJobOperator(
        task_id='truncate_staging_table',
        configuration={
            "query": {
                "query": "TRUNCATE TABLE `project.dataset.staging_table`",
                "useLegacySql": False,
            }
        },
        location='EU'
    )

    load_json_to_staging_table = GCSToBigQueryOperator(
        task_id='load_json_to_staging_table',
        bucket='your_bucket',
        source_objects=['your_folder/*.json'],
        destination_project_dataset_table='your_project:your_dataset.your_staging_table',
        source_format='NEWLINE_DELIMITED_JSON',
        compression='NONE',
        create_disposition='CREATE_NEVER',
        write_disposition='WRITE_APPEND',
        autodetect=True
    )
    # In this example I used schema autodetection for the load into BigQuery,
    # but you can also pass an explicit BigQuery JSON schema with schema_fields
    # (see the sketch after this DAG).

    merge_query = """
    MERGE `project.dataset.final_table` T
    USING `project.dataset.staging_table` S
    ON T.ID = S.ID
    WHEN MATCHED THEN
      UPDATE SET
        episode_type = S.episode_type,
        sports_event_ID = S.sports_event_ID
    WHEN NOT MATCHED THEN
      INSERT ROW;
    """

    merge_final_table = BigQueryInsertJobOperator(
        task_id='merge_staging_to_final_table',
        configuration={
            "query": {
                "query": merge_query,
                "useLegacySql": False,
            }
        },
        location='EU'
    )

    move_processed_files = GCSToGCSOperator(
        task_id='move_processed_files',
        source_bucket='your_source_bucket',
        source_object='source_folder/*',
        destination_bucket='your_dest_bucket',
        destination_object='dest_folder/',
        move_object=True
    )

    truncate_staging_table >> load_json_to_staging_table >> merge_final_table >> move_processed_files
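
As the comment in the DAG notes, you can also pass an explicit schema instead of relying on autodetect. Below is a minimal, hypothetical variant of the load task using schema_fields; the column names and types are only an illustrative subset taken from the sample record, not a complete schema:

load_json_to_staging_table = GCSToBigQueryOperator(
    task_id='load_json_to_staging_table',
    bucket='your_bucket',
    source_objects=['your_folder/*.json'],
    destination_project_dataset_table='your_project:your_dataset.your_staging_table',
    source_format='NEWLINE_DELIMITED_JSON',
    create_disposition='CREATE_NEVER',
    write_disposition='WRITE_APPEND',
    autodetect=False,
    schema_fields=[
        # illustrative subset of the columns in the sample record
        {"name": "ID", "type": "STRING", "mode": "NULLABLE"},
        {"name": "Title", "type": "STRING", "mode": "NULLABLE"},
        {"name": "Season", "type": "STRING", "mode": "NULLABLE"},
        {"name": "Episode", "type": "STRING", "mode": "NULLABLE"},
    ],
)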

I wrote an article on Medium that gives a complete use case of deduplication in a BigQuery batch pipeline with an Airflow DAG:

Remove duplicates in a BigQuery batch pipeline with Airflow and Dataflow

This article shows different approaches based on the BigQuery MERGE query.

If you already have one JSON record per line, you are lucky. You then have two solutions:

  • Either use the BigQuery load job feature with a wildcard to select all the files (for example with a common prefix in Cloud Storage, also known as a "directory", even though directories don't really exist)
  • Or take inspiration from one of my articles to call the Cloud Storage API, loop over the objects, and use the compose feature to merge all the files into a single one. Then run a BigQuery load job on that single file (see the sketch below this list)
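
Here is a minimal sketch of the second option (not the article's exact code), assuming the google-cloud-storage and google-cloud-bigquery client libraries; the bucket, prefix, object and table names are placeholders, and compose() accepts at most 32 source objects per call, so larger folders need batching:

from google.cloud import bigquery, storage

BUCKET = "your_bucket"                     # placeholder bucket name
PREFIX = "your_folder/"                    # common prefix ("directory") of the small JSON files
MERGED_OBJECT = "merged/all_records.json"  # placeholder name for the composed object
STAGING_TABLE = "your_project.your_dataset.your_staging_table"

storage_client = storage.Client()
bucket = storage_client.bucket(BUCKET)

# Loop over the objects and merge them into a single file with compose().
# Each source file should end with a newline so the result stays valid NDJSON.
blobs = [b for b in storage_client.list_blobs(BUCKET, prefix=PREFIX) if b.name.endswith(".json")]
merged_blob = bucket.blob(MERGED_OBJECT)
merged_blob.compose(blobs[:32])  # compose() takes at most 32 sources per call

# Then run a single BigQuery load job on the composed file.
# (For the first option, a wildcard URI such as gs://your_bucket/your_folder/*.json works here too.)
bq_client = bigquery.Client()
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    autodetect=True,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)
load_job = bq_client.load_table_from_uri(
    f"gs://{BUCKET}/{MERGED_OBJECT}", STAGING_TABLE, job_config=job_config
)
load_job.result()  # wait for the load to finish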
