Using Python-compiled protobuf pb2 modules as key and value serializers



I am trying to read data from a Kafka topic that has been serialized with Google's protobuf.

I compiled the proto files with protoc, which generated the pb2 files.
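(For context: a pb2 module is generated with something along the lines of `protoc --python_out=. topic.proto`, which yields a `topic_pb2.py` module; the file names here are placeholders.)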

Now I am trying to use Faust to create a stream processor, but I cannot find the correct way to use the pb2 files as the key_serializer and value_serializer.

Here is what I have tried:

import faust
from proto.topic_pb2 import topic

app = faust.App(
    'faust-consumer',
    broker='kafka://',
    store="memory://",
    cache="memory://",
)

schema = faust.Schema(
    # key_type=topic.PK,
    # value_type=topic,
    key_serializer=topic.PK,
    value_serializer=topic,
)

topic = app.topic(
    'topic',
    schema=schema
)

@app.agent(topic)
async def consume(topic):
    async for event in topic:
        print(event)

if __name__ == "__main__":
    app.main()

Does anybody know how to use the pb2 modules in the serializers?

Man, I have also been trying to do this for the past week. After some struggle, I finally got something working. It is not the best approach, but it is good enough.

So initially I used this Python compiler: https://github.com/danielgtaylor/python-betterproto to generate the *.py files with dataclasses/type hints.
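For anyone reproducing this: betterproto ships as a protoc plugin, so generation looks roughly like `pip install "betterproto[compiler]"` followed by `protoc -I . --python_betterproto_out=out your_model.proto` (the paths and file names here are placeholders).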

Then, I was able to create Faust.Record classes dynamically by using a helper:

import abc
import inspect
from typing import Type

import betterproto
import faust

GENERATED_SUFFIX = "__FaustRecord_Auto"

def _import_relative_class(module: str, klass_name: str):
    # Resolve a forward-referenced annotation (a string) to the actual class.
    resolved_import = __import__(module, fromlist=[klass_name])
    klass = getattr(resolved_import, klass_name)
    return klass

def _is_record(attype: Type):
    # Nested betterproto messages become nested Faust records.
    return (
        inspect.isclass(attype)
        and isinstance(attype, betterproto.Message)
        or isinstance(attype, abc.ABCMeta)
    )

def _build_record_annotations(klass: Type):
    annotations = {}
    for atname, attype in klass.__annotations__.items():
        if _is_record(attype):
            annotations[atname] = make_faust_record(attype)
        elif isinstance(attype, str):
            subklass = _import_relative_class(klass.__module__, attype)
            annotations[atname] = make_faust_record(subklass)
        else:
            annotations[atname] = attype
    return annotations

def make_faust_record(klass: Type):
    # Build a new type inheriting from both faust.Record and the
    # betterproto message, then rebuild its annotations recursively.
    type_name = f"{klass.__name__}{GENERATED_SUFFIX}"
    record_type = type(type_name, (faust.Record, klass), {})
    record_type.__annotations__ = _build_record_annotations(klass)
    record_type._init_subclass()
    return record_type

Now you can use it like this:

import faust
from proto.your_models import YourModel  # Import your generated proto here
from faust_converter import make_faust_record

app = faust.App(
    'faust-consumer',
    broker='kafka://',
    store="memory://",
    cache="memory://",
)

model_record = make_faust_record(YourModel)

topic = app.topic(
    'topic',
    value_type=model_record
)

@app.agent(topic)
async def consume(topic):
    async for event in topic:
        print(event)

if __name__ == "__main__":
    app.main()

I have also been experimenting with using Protobuf together with Faust.

Below is a solution using Faust serializer codecs; the full example lives in the faust-protobuf repo: https://github.com/hemantkashniyal/faust-protobuf

proto_serializer.py

from typing import Any

from faust.serializers import codecs

class ProtobufSerializer(codecs.Codec):
    def __init__(self, pb_type: Any):
        self.pb_type = pb_type
        super().__init__()

    def _dumps(self, pb: Any) -> bytes:
        # protobuf message -> wire-format bytes
        return pb.SerializeToString()

    def _loads(self, s: bytes) -> Any:
        # wire-format bytes -> protobuf message instance
        pb = self.pb_type()
        pb.ParseFromString(s)
        return pb
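As a side note, a codec like this can also be registered globally under a string name, after which topics can refer to it the same way they refer to the built-in "json" or "raw" codecs. A minimal sketch, assuming the Greeting message used below (the codec name "protobuf_greeting" is arbitrary):

from faust.serializers import codecs

from .proto.greetings_pb2 import Greeting
from .proto_serializer import ProtobufSerializer

# Register under a string name; any name works, it just has to match
# what is passed to key_serializer/value_serializer later.
codecs.register("protobuf_greeting", ProtobufSerializer(pb_type=Greeting))

# topic = app.topic('greetings',
#                   key_serializer="protobuf_greeting",
#                   value_serializer="protobuf_greeting")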

app.py

import faust
from google.protobuf.json_format import MessageToJson

from .proto.greetings_pb2 import Greeting
from .proto_serializer import ProtobufSerializer

app = faust.App(
    'faust-consumer',
    broker='kafka://',  # TODO: update kafka endpoint
    store="memory://",
    cache="memory://",
)

greetings_schema = faust.Schema(
    key_serializer=ProtobufSerializer(pb_type=Greeting),
    value_serializer=ProtobufSerializer(pb_type=Greeting),
)

topic = app.topic(
    'greetings',
    schema=greetings_schema
)

@app.agent(topic)
async def consume(topic):
    async for event in topic:
        print(MessageToJson(event))

@app.timer(5)
async def produce():
    for i in range(10):
        data = Greeting(hello="world", message=i)
        await consume.send(value=data)

if __name__ == "__main__":
    app.main()
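Since `app.main()` is the entry point, the worker is started with something like `python app.py worker -l info` (assuming the second file is saved as app.py).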

I was able to do it by creating a Serializer class like so:

import logging
from abc import ABCMeta, abstractmethod
from importlib import import_module

from faust.serializers.codecs import Codec
from google.protobuf.json_format import MessageToDict

logger = logging.getLogger(__name__)

def get_proto(topic_name, only_pk=False):
    # Cache the dynamically imported pb2 message classes on the function itself.
    if not hasattr(get_proto, "topics"):
        setattr(get_proto, "topics", dict())
    get_proto.topics[topic_name] = import_module(
        "protodef.{}_pb2".format(topic_name)
    ).__getattribute__(topic_name.split(".")[-1])
    if only_pk:
        return getattr(get_proto, "topics").get(topic_name).PK
    else:
        return getattr(get_proto, "topics").get(topic_name)

class ProtoSerializer(Codec, metaclass=ABCMeta):
    @abstractmethod
    def only_key(self):
        ...

    def as_proto(self, topic_name):
        self._proto = get_proto(topic_name, self.only_key())
        return self

    def _loads(self, b):
        data = MessageToDict(
            self._proto.FromString(b),
            preserving_proto_field_name=True,
            including_default_value_fields=True,
        )
        # remove the key object from the deserialized message
        data.pop("key", None)
        return data

    def _dumps(self, o):
        # for deletes (tombstones)
        if not o:
            return None
        obj = self._proto()
        # add the key object to the message before serializing
        if hasattr(obj, "PK"):
            for k in obj.PK.DESCRIPTOR.fields_by_name.keys():
                if k not in o:
                    raise Exception(
                        "Invalid object `{}` for proto `{}`".format(o, self._proto)
                    )
                setattr(obj.key, k, o[k])
        for k, v in o.items():
            if hasattr(obj, k):
                setattr(obj, k, v)
            else:
                logger.debug(
                    "Invalid value-attribute `%s` for proto `%s`", k, self._proto
                )
        return obj.SerializeToString()

class ProtoValue(ProtoSerializer):
    def only_key(self):
        return False

class ProtoKey(ProtoSerializer):
    def only_key(self):
        return True

Then use it as follows:

import faust
from utils.serializer import ProtoKey, ProtoValue

app = faust.App(
    'faust-consumer',
    broker='kafka://',
    store="memory://",
    cache="memory://",
)

topic = app.topic(
    'topic',
    key_serializer=ProtoKey().as_proto('topic'),
    value_serializer=ProtoValue().as_proto('topic')
)

@app.agent(topic)
async def consume(topic):
    async for event in topic:
        print(event)

if __name__ == "__main__":
    app.main()
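If you want to sanity-check these codecs outside of a running worker, the public `dumps`/`loads` methods of the Codec base class can be exercised directly. A minimal sketch, assuming a protodef package containing a topic_pb2 module whose `topic` message nests a `PK` message under a `key` field, as the serializer above expects (the `id` and `name` fields are hypothetical):

from utils.serializer import ProtoValue

# Round trip: dict -> protobuf wire bytes -> dict.
value_codec = ProtoValue().as_proto('topic')
payload = value_codec.dumps({"id": 1, "name": "example"})
print(value_codec.loads(payload))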
