How to send nested proto messages with the BigQuery Storage Write API Python client



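The approach in https://github.com/googleapis/python-bigquery-storage/issues/398, which uses the proto-plus package to define protobuf messages directly in Python, is very useful and works well, but it does not work for nested messages.
When the message is nested, the adapted code below throws the following error at the call to await bq_write_client.append_rows(iter([append_row_request])): google.api_core.exceptions.InvalidArgument: 400 Invalid proto schema: BqMessage.proto: Message.nested: "._default_package.Team" is not defined.

P.S.: I know that the google-cloud-bigquery-storage library can handle nested messages in general, because the official snippet https://github.com/googleapis/python-bigquery-storage/blob/main/samples/snippets/append_rows_proto2.py works and uses a nested message. However, that message lives in a separate .proto file, which requires a compilation step and is less practical than defining the message directly in Python.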

# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import asyncio

import proto
from google.oauth2.service_account import Credentials
from google.protobuf.descriptor_pb2 import DescriptorProto
from google.cloud.bigquery_storage_v1beta2.types.storage import AppendRowsRequest
from google.cloud.bigquery_storage_v1beta2.types.protobuf import ProtoSchema, ProtoRows
from google.cloud.bigquery_storage_v1beta2.services.big_query_write import BigQueryWriteAsyncClient


class Team(proto.Message):
    name = proto.Field(proto.STRING, number=1)


class UserSchema(proto.Message):
    username = proto.Field(proto.STRING, number=1)
    email = proto.Field(proto.STRING, number=2)
    team = proto.Field(Team, number=3)


async def main():
    write_stream_path = BigQueryWriteAsyncClient.write_stream_path(
        "yolocommon", "test", "t_test_data", "_default")
    credentials = Credentials.from_service_account_file(filename="bigquery_config_file.json")
    bq_write_client = BigQueryWriteAsyncClient(credentials=credentials)

    # Copy the message descriptor into the schema sent with the request.
    proto_descriptor = DescriptorProto()
    UserSchema.pb().DESCRIPTOR.CopyToProto(proto_descriptor)
    proto_schema = ProtoSchema(proto_descriptor=proto_descriptor)

    serialized_rows = []
    # The nested key must match the field name declared on UserSchema ("team").
    data = [
        {
            "username": "Jack",
            "email": "jack@google.com",
            "team": {
                "name": "Jack Jack"
            }
        },
        {
            "username": "mary",
            "email": "mary@google.com",
            "team": {
                "name": "Mary Mary"
            }
        }
    ]
    for item in data:
        instance = UserSchema.from_json(payload=json.dumps(item))
        serialized_rows.append(UserSchema.serialize(instance))

    proto_data = AppendRowsRequest.ProtoData(
        rows=ProtoRows(serialized_rows=serialized_rows),
        writer_schema=proto_schema
    )
    append_row_request = AppendRowsRequest(
        write_stream=write_stream_path,
        proto_rows=proto_data
    )
    result = await bq_write_client.append_rows(iter([append_row_request]))
    async for item in result:
        print(item)


if __name__ == "__main__":
    asyncio.run(main())

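Update: From the ProtoSchema documentation:

"Descriptor for input message. The provided descriptor must be self contained, such that data rows sent can be fully decoded using only the single descriptor. For data rows that are compositions of multiple independent messages, this means the descriptor may need to be transformed to only use nested types: https://developers.google.com/protocol-buffers/docs/proto#nested"

So the correct way to write the message description is: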

class UserSchema(proto.Message):
    class Team(proto.Message):
        name = proto.Field(proto.STRING, number=1)

    username = proto.Field(proto.STRING, number=1)
    email = proto.Field(proto.STRING, number=2)
    team = proto.Field(Team, number=3)

But it still throws the same kind of error: google.api_core.exceptions.InvalidArgument: 400 Invalid proto schema: BqMessage.proto: Message.nested: "._default_package.UserSchema.Team" is not defined.
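The error names a type that the submitted descriptor does not define. A rough local check for this can be sketched as below. It assumes the service resolves type names against the single submitted descriptor with no package, which is inferred from the error message rather than documented behavior. `defined_names`, `unresolved_types`, and `user_schema` are hypothetical helpers, and the hand-built descriptor only stands in for what CopyToProto produces.

```python
from google.protobuf.descriptor_pb2 import DescriptorProto, FieldDescriptorProto


def defined_names(message, scope=""):
    """Yield fully qualified names of `message`, its nested messages and enums."""
    full_name = f"{scope}.{message.name}"
    yield full_name
    for enum in message.enum_type:
        yield f"{full_name}.{enum.name}"
    for nested in message.nested_type:
        yield from defined_names(nested, full_name)


def unresolved_types(message):
    """Return referenced type names that are not defined inside `message`."""
    known = set(defined_names(message))
    missing = set()
    stack = [message]
    while stack:
        current = stack.pop()
        missing.update(
            f.type_name for f in current.field
            if f.type_name and f.type_name not in known
        )
        stack.extend(current.nested_type)
    return sorted(missing)


def user_schema(team_type_name):
    """Hypothetical stand-in for the copied UserSchema descriptor."""
    message = DescriptorProto(name="UserSchema")
    message.nested_type.add(name="Team")
    message.field.add(
        name="team",
        number=3,
        type=FieldDescriptorProto.TYPE_MESSAGE,
        label=FieldDescriptorProto.LABEL_OPTIONAL,
        type_name=team_type_name,
    )
    return message


# With the package prefix, the reference points outside the descriptor.
print(unresolved_types(user_schema("._default_package.UserSchema.Team")))
# Without a package, the reference resolves to the nested Team type.
print(unresolved_types(user_schema(".UserSchema.Team")))
```

The first call reports the prefixed name as unresolved, which matches the name quoted in the error. The second call returns an empty list.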

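Update 2: The root of the problem is that proto-plus uses _default_package as the package name when the package name is empty, because an empty package name causes a different error. https://github.com/googleapis/proto-plus-python/blob/main/proto/_package_info.py#L40

The comment at that line reads: "TODO: Revert to empty string as a package value after protobuf fix. When package is empty, upb based protobuf fails with an "TypeError: Couldn't build proto file into descriptor pool: invalid name: empty part ()' means" during an attempt to add to descriptor pool."

So apparently it is currently not possible to use proto.Message to represent a BigQuery table that has nested fields (STRUCT).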

protobuf has since been fixed, so you can fork the project and change the line at https://github.com/googleapis/proto-plus-python/blob/main/proto/_package_info.py#L40 to:

package = getattr(
    proto_module, "package", module_name if module_name else ""
)

With that change it will work.
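If forking proto-plus is not an option, a possible alternative is to remove the package prefix from the copied DescriptorProto before sending it. The following is a minimal sketch that has not been tested against BigQuery. `strip_package` and `example_descriptor` are hypothetical helpers, and the hand-built descriptor only stands in for what UserSchema.pb().DESCRIPTOR.CopyToProto(...) is expected to produce for the nested definition. Whether the service then accepts the schema is an assumption. The rewritten type name does have the same shape as one produced with an empty package.

```python
from google.protobuf.descriptor_pb2 import DescriptorProto, FieldDescriptorProto


def strip_package(descriptor: DescriptorProto, package: str) -> None:
    """Rewrite '.<package>.X' type references to '.X', in place and recursively."""
    prefix = f".{package}."
    for field in descriptor.field:
        if field.type_name.startswith(prefix):
            field.type_name = "." + field.type_name[len(prefix):]
    for nested in descriptor.nested_type:
        strip_package(nested, package)


def example_descriptor() -> DescriptorProto:
    """Hand-built stand-in (assumption) for the copied nested UserSchema descriptor."""
    optional = FieldDescriptorProto.LABEL_OPTIONAL
    string = FieldDescriptorProto.TYPE_STRING
    message = DescriptorProto(name="UserSchema")
    team = message.nested_type.add(name="Team")
    team.field.add(name="name", number=1, type=string, label=optional)
    message.field.add(name="username", number=1, type=string, label=optional)
    message.field.add(name="email", number=2, type=string, label=optional)
    message.field.add(
        name="team",
        number=3,
        type=FieldDescriptorProto.TYPE_MESSAGE,
        label=optional,
        type_name="._default_package.UserSchema.Team",
    )
    return message


descriptor = example_descriptor()
strip_package(descriptor, "_default_package")
print(descriptor.field[2].type_name)  # prints .UserSchema.Team
```

In the question's code, the call would go between CopyToProto and the construction of ProtoSchema. The serialized rows do not carry package names, so they would not need to change.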

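The following module helps you bypass either the precompilation step or the message class definitions in proto-plus: https://pypi.org/project/xia-easy-proto/1.0.0/

It lets you parse a Python object and convert it to protobuf. Hope this helps.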
