Google数据流:在流媒体管道中的BigQuery中插入+更新



主对象

一个python流式管道,我在其中读取pub/sub的输入。

分析输入后,有两个选项可用:

  • 如果x=1->插入
  • 如果x=2->更新

测试

  • 这是使用apache beam函数无法实现的,因此您需要使用BigQuery的0.25 API来开发它(目前这是Google Dataflow支持的版本(

问题

  • 插入的记录仍在BigQuery缓冲区中,因此更新语句失败:

    UPDATE or DELETE statement over table table would affect rows in the streaming buffer, which is not supported
    

代码

插入

def insertCanonicalBQ(input):
from google.cloud import bigquery
client = bigquery.Client(project='project')
dataset = client.dataset('dataset')
table = dataset.table('table' )
table.reload()
table.insert_data(
rows=[[values]])

更新

def UpdateBQ(input):
from google.cloud import bigquery
import uuid
import time
client = bigquery.Client()
STD= "#standardSQL"
QUERY= STD + "n" + """UPDATE table SET field1 = 'XXX' WHERE field2=  'YYY'"""
client.use_legacy_sql = False    
query_job = client.run_async_query(query=QUERY, job_name='temp-query-job_{}'.format(uuid.uuid4()))  # API request
query_job.begin()
while True:
query_job.reload()  # Refreshes the state via a GET request.
if query_job.state == 'DONE':
if query_job.error_result:
raise RuntimeError(query_job.errors)
print "done"
return input
time.sleep(1)

即使行不在流缓冲区中,这仍然不是BigQuery中解决此问题的方法。BigQuery存储更适合批量突变,而不是像这样通过UPDATE突变单个实体。您的模式与我所期望的事务性用例而非分析性用例相一致。

考虑一个基于追加的模式。每次处理实体消息时,都会通过流插入将其写入BigQuery。然后,在需要时,您可以通过查询获得所有实体的最新版本。

举个例子,让我们假设一个任意模式:idfield是您唯一的实体密钥/标识符,message_time表示消息发出的时间。您的实体可能有许多其他字段。为了获得实体的最新版本,我们可以运行以下查询(并可能将其写入另一个表(:

#standardSQL
SELECT
idfield,
ARRAY_AGG(
t ORDER BY message_time DESC LIMIT 1
)[OFFSET(0)].* EXCEPT (idfield)
FROM `myproject.mydata.mytable` AS t
GROUP BY idfield

这种方法的另一个优点是,它还允许您在任意时间点执行分析。要对一小时前的实体状态进行分析,只需添加WHERE子句:WHERE message_time <= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)

最新更新