我们要解决的问题
每天一次将存储在GCP的CloudStorage中的Parquet类型数据写入BigQuery表(批处理)。
这个实现有问题。
问题到目前为止,我们一直在GCS中存储以下CSV文件,并运行一个作业,使用数据流"云存储上的文本文件到BigQuery">将其写入BigQuery模板。
如果你想使用Dataflow,没有Cloud Storage Parquet to BigQuery
谷歌提供的模板可用。您可以通过本页的链接提交特性请求,以便创建这样的模板。我从这里检查了一下,到今天为止,没有关于它的现有FR。
如果你想自己编写一个Dataflow/Beam管道,你可以参考的最接近的例子是谷歌提供的模板Cloud Storage Parquet to Bigtable
和Cloud Storage Text to BigQuery
(源代码是Java的),你必须使用Apache Beam Python SDK来编写一个管道来完成它。
或者,如果您遵循这个BigQuery文档从云存储加载Parquet数据,那么有这个代码片段可以在不使用Dataflow的情况下完成这项工作:
import io
from google.cloud import bigquery
# Construct a BigQuery client object.
client = bigquery.Client()
# TODO(developer): Set table_id to the ID of the table to create.
# table_id = "your-project.your_dataset.your_table_name
job_config = bigquery.LoadJobConfig(
schema=[
bigquery.SchemaField("name", "STRING"),
bigquery.SchemaField("post_abbr", "STRING"),
],
)
body = io.BytesIO(b"Washington,WA")
client.load_table_from_file(body, table_id, job_config=job_config).result()
previous_rows = client.get_table(table_id).num_rows
assert previous_rows > 0
job_config = bigquery.LoadJobConfig(
write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
source_format=bigquery.SourceFormat.PARQUET,
)
uri = "gs://cloud-samples-data/bigquery/us-states/us-states.parquet"
load_job = client.load_table_from_uri(
uri, table_id, job_config=job_config
) # Make an API request.
load_job.result() # Waits for the job to complete.
destination_table = client.get_table(table_id)
print("Loaded {} rows.".format(destination_table.num_rows))
相关内容
- Windows批处理脚本中的URL解码
- 如何在春季批处理中使用项读取器读取mdb文件结果
- 批处理文件:选择多个新上传的文件
- r语言 - 在readOGR批处理读取中包含文件名
- 按候选键分组并更新记录以进行批处理
- Firebase函数批处理/时间限制
- 如何将GCS Parquet数据写入BigQuery(批处理)
- 如何使用python在数据流批处理管道中写入bigquery后执行独立函数
- 无法创建批处理管道以使用 http 插件 1.2.1 从 ZohoCRM 获取数据到 BigQuery。重新调整火花程序'phase-1'失败
- 批处理作业在多次成功查询后偶尔会收到"does not have bigquery.jobs.create permission"错误
- 为什么我们不能在一个批处理文件中执行 BigQuery 的多个语句?
- 数据流:我可以使用批处理作业连续写入/流式写入到 BigQuery 吗?
- 使用批处理数据流作业中的数据中存在的日期写入已分区的日期 bigQuery
- Bigquery 异步作业与批处理查询有何不同
- BigQuery更新失败,但仅在使用Python API批处理时
- 数据流批处理或流式插入到 BigQuery 说明
- Bigquery 取消或停止尚未启动的批处理查询作业 (Status.State = "PENDING" )
- BigQuery加载批处理文件夹错误
- 如何在批处理数据流作业中使用基于数据本身的日期写入日期分区的BigQuery表?
- 使用Google Bigquery和Python进行批处理
最新更新
- 如何在 JavaScript 中检查"is not defined" eval(var)?
- 错误NETSDK1152在WinUI3应用程序:发现多个发布输出文件具有相同的相对路径:Microsoft.Web.We
- 当我在Ubuntu16中使用字符串命令查找一些东西时,我没有得到任何输出
- Apache Spark: parse PT2H5M (duration ISO-8601) duration以分钟为单
- 如何在Parent还没有Id时添加Child
- 无法记录android应用程序的jmeter脚本
- 使用AsyncRabbitTemplate::sendAndReceive - RabbitMQ时的应答超时
- 是自定义文件扩展名吗?
- 选择远程docker容器(没有kubernetes)后, JVM列表为空
- c# TwinCAT数据类型的等效枚举
- 传递地址时,函数与函数模板的语法限制
- 如何将flutter riverpod ref.read()函数传递给另一个小部件
- Pug/Node.js中的循环优化
- Postgresql函数在删除t_providers的行之前作为触发器
- 在Python中替换字符串中类似模式的正则表达式
- Azure DevOps YAML:带有模板变量的If-else条件不起作用
- 我有麻烦管理我的循环while loo[,反之亦然
- 我怎么能使最后一行从图在PHP?
- 无法使用引导折叠折叠
- 当cell1的值大于cell2的值时显示模态对话框
- Vue模板使用JavaScript内置函数
- 使用python中除"in"以外的其他运算符"match"函数
- Powershell Where-Object returning null
- 在React中通过ClassName Prop扩展样式
- 如何缩小Firestore字段值
- 导入谷歌地图模块时反应错误
- 按顺序切换Class
- Blazor语言 - 提交表单不重新加载(没有JS)
- Curl impersonate for OkHttp
- 第一个单元格为空白的行,不复制到另一个工作表
热门标签:
javascript python java c# php android html jquery c++ css ios sql mysql arrays asp.net json python-3.x ruby-on-rails .net sql-server django objective-c excel regex ruby linux ajax iphone xml vba spring asp.net-mvc database wordpress string postgresql wpf windows xcode bash git oracle list vb.net multithreading eclipse algorithm macos powershell visual-studio image forms numpy scala function api selenium