在数据流管道中动态设置bigquery表id



我有数据流管道,它在Python中,这就是它正在做的:

  1. 从PubSub读取消息。消息是压缩的协议缓冲区。PubSub上接收的一条消息包含多种类型的消息。请参阅下面的协议父级消息规范:

    message BatchEntryPoint {
    /**
    * EntryPoint
    * 
    * Description: Encapsulation message
    */
    message EntryPoint {
    // Proto Message
    google.protobuf.Any proto = 1;
    // Timestamp
    google.protobuf.Timestamp timestamp = 4;
    }
    // Array of EntryPoint messages
    repeated EntryPoint entrypoints = 1;
    }
    

因此,为了更好地解释,我有几个protobuf消息。每个消息都必须打包在EntryPoint消息的proto字段中,由于MQTT的限制,我们一次发送多个消息,这就是为什么我们在BatchEntryPint上使用一个指向EntryPoint消息的重复字段。

  1. 分析收到的消息

这里没有什么特别之处,只是解压缩并取消序列化我们刚刚从PubSub中读取的消息。以获得"人类可读"的数据。

  1. 对于BatchEntryPoint上的循环,以评估每个EntryPint消息

由于BatchEntryPoint上的每条消息可能有不同的类型,我们需要以不同的处理它们

  1. 解析的消息数据

执行不同的过程来获得我需要的所有信息,并将其格式化为BigQuery可读格式

  1. 将数据写入bigQuery

这就是我的"麻烦"开始的地方,所以我的代码可以工作,但在我看来,它非常脏,很难维护。有两件事需要注意
每条消息的类型可以发送到3个不同的数据集,一个r&d数据集、dev数据集和production数据集。假设我有一条名为System的消息。它可以去:

  • 我的项目:rd_dataset.system
  • 我的项目:dev_dataset.system
  • 我的项目:prod_dataset.system

所以这就是我现在正在做的:

console_records | 'Write to Console BQ' >> beam.io.WriteToBigQuery(
lambda e: 'my-project:rd_dataset.table1' if dataset_is_rd_table1(e) else (
'my-project:dev_dataset.table1' if dataset_is_dev_table1(e) else (
'my-project:prod_dataset.table1' if dataset_is_prod_table1(e) else (
'my-project:rd_dataset.table2' if dataset_is_rd_table2(e) else (
'my-project:dev_dataset.table2' if dataset_is_dev_table2(e) else (
...) else 0

我有30多种不同类型的消息,其中90多行用于向大查询插入数据。

以下是数据集_is__tableX方法看起来像:

def dataset_is_rd_messagestype(element) -> bool:
""" check if env is rd for message's type message """
valid: bool = False
is_type = check_element_type(element, 'MessagesType')
if is_type:
valid = dataset_is_rd(element)
return valid

check_element_type检查消息是否具有正确的类型(例如:系统(
dataset_is_rd如下所示:

def dataset_is_rd(element) -> bool:
""" Check if dataset should be RD from registry id """
if element['device_registry_id'] == 'rd':
del element['device_registry_id']
del element['bq_type']
return True
return False

作为键的元素指示我们必须在哪个数据集上发送消息。

所以这是按预期工作的,但我希望我可以做更干净的代码,也许可以减少在添加或删除某种类型的消息时要更改的代码量。

有什么想法吗?

如何使用TaggedOutput。

你能写这样的东西吗:

def dataset_type(element) -> bool:
""" Check if dataset should be RD from registry id """
dev_registry = element['device_registry_id']
del element['device_registry_id']
del element['bq_type']
table_type = get_element_type(element, 'MessagesType')
return 'my-project:%s_dataset.table%d' % (dev_registry, table_type)

用它作为传递给BQ的tablelambda?

因此,我设法创建代码,通过动态创建表名来将数据插入到动态表中。

这并不完美,因为我必须修改我传递给方法的元素,但我仍然对结果很满意,它已经清理了我的数百行代码。如果我有一个新表,添加它将在数组上花费一行,而之前在管道中需要6行。

这是我的解决方案:

def batch_pipeline(pipeline):
console_message = (
pipeline
| 'Get console's message from pub/sub' >> beam.io.ReadFromPubSub(
subscription='sub1',
with_attributes=True)
)
common_message = (
pipeline
| 'Get common's message from pub/sub' >> beam.io.ReadFromPubSub(
subscription='sub2',
with_attributes=True)
)
jetson_message = (
pipeline
| 'Get jetson's message from pub/sub' >> beam.io.ReadFromPubSub(
subscription='sub3',
with_attributes=True)
)

message = (console_message, common_message, jetson_message) | beam.Flatten()
clear_message = message | beam.ParDo(GetClearMessage())
console_bytes = clear_message | beam.ParDo(SetBytesData())
console_bytes | 'Write to big query back up table' >> beam.io.WriteToBigQuery(
lambda e: write_to_backup(e)
)
records = clear_message | beam.ParDo(GetProtoData())
gps_records = clear_message | 'Get GPS Data' >> beam.ParDo(GetProtoData())
parsed_gps = gps_records | 'Parse GPS Data' >> beam.ParDo(ParseGps())
if parsed_gps:
parsed_gps | 'Write to big query gps table' >> beam.io.WriteToBigQuery(
lambda e: write_gps(e)
)
records | 'Write to big query table' >> beam.io.WriteToBigQuery(
lambda e: write_to_bq(e)
)

因此,管道从3个不同的pub-sub中读取数据,提取数据并写入大查询。

WriteToBigQuery使用的元素结构如下:

obj = {
'data': data_to_write_on_bq,
'registry_id': data_needed_to_craft_table_name,
'gcloud_id': data_to_write_on_bq,
'proto_type': data_needed_to_craft_table_name
}

然后我在WriteToBigQuery上的lambda上使用的一个方法看起来是这样的:

def write_to_bq(e):
logging.info(e)
element = copy(e)
registry = element['registry_id']
logging.info(registry)
dataset = set_dataset(registry) # set dataset name, knowing the registry, this is to set the environment (dev/prod/rd/...)
proto_type = element['proto_type']
logging.info('Proto Type %s', proto_type)
table_name = reduce(lambda x, y: x + ('_' if y.isupper() else '') + y, proto_type).lower()
full_table_name = f'my_project:{dataset}.{table_name}'
logging.info(full_table_name)
del e['registry_id']
del e['proto_type']
return full_table_name

就是这样,在经历了3天的麻烦之后!!

最新更新