有什么方法可以定义应用程序吗.表没有使用浮士德记录



我目前正在使用模式注册表和faust来处理流数据。

我尽量避免使用faust.Record的原因是模式可以动态更改,而且我不喜欢每次更改代码(继承faust.Record的类(。

但如果没有faust.Record,看起来会有很多限制。例如,app.Tablerelative_to_field需要FieldDescriptorT,但这个类看起来只与faust.Record耦合

这是代码:

import faust
from datetime import timedelta
from pydantic_avro.base import AvroBase
from schema_registry.client import SchemaRegistryClient, schema
from schema_registry.serializers.faust import FaustSerializer

topic_name = "practice4"
subject_name = f"{topic_name}-value"
serializer_name = f"{topic_name}_serializer"
bootstrap_server = "192.168.59.100:30887"
sr_server = "http://localhost:8081"
client = SchemaRegistryClient({"url": sr_server})
topic_schema = client.get_schema(subject_name)
fp_avro_schema = schema.AvroSchema(topic_schema.schema.raw_schema)
avro_fp_serializer = FaustSerializer(client, serializer_name, fp_avro_schema)
faust.serializers.codecs.register(name=serializer_name, codec=avro_fp_serializer)
app = faust.App('sample_app', broker=bootstrap_server)
faust_topic = app.topic(topic_name, value_serializer=serializer_name)

count_table = app.Table(
'count_table', default=int,
).hopping(
timedelta(minutes=10),
timedelta(minutes=5),
expires=timedelta(minutes=10)
).relative_to_field(??????)

@app.agent(faust_topic)
async def process_fp(fps):
async for fp in fps.group_by(lambda fp: fp["job_id"], name=f"{subject_name}.job_id"):     
print(fp)

幸运的是,流的group_by可以用callable对象调用,所以我可以用lambda处理它,但表的relative_to_field没有这样的选项。

简短回答:不,你是对的,没有FieldDescriptor就不能定义relative_to_field。您可以在此处查看relative_to_field的定义。然后,这个字段在这里用getattr提取,您需要一个faust.Record来进行此操作。

但是在使用Avro时,可以使用库数据类avroschema将faust.RecordAvro组合在同一类中。因此,你们将能够两全其美。

正如您在doc中看到的,这个库与faust集成得很好。

最新更新