浮士德代理中"encoding without a string argument"将数据发送到接收器的错误?



我试图使用Faust代理中的sink将一些数据从一个Kafka主题发送到另一个主题,但我一直收到这个错误:

File "/.../venv/lib/python3.8/site-packages/schema_registry/serializers/message_serializer.py", line 49, in <lambda>
return lambda record, fp: schemaless_writer(fp, avro_schema.schema, record)
Crashed reason=TypeError('encoding without a string argument')

在调试中,我可以看到它发生在fastavro中schemales_writer函数的某个地方,但它的代码在源代码中并不清楚,所以我不确定那里到底发生了什么:https://github.com/fastavro/fastavro/blob/357543aebb94a4fd593b035f8501b20ac66d3b17/fastavro/_write.pyi

我的想法是,它试图对发送数据中的每个字段进行"编码",但它不适用于每个字段,因为它不是字符串(我有一些数字、布尔值、日期时间和dict(。在发送之前,我尝试将每个字段转换为字符串,但它不起作用,因为它会与我的模式冲突,例如:

Crashed reason=ValueError("'True' (type <class 'str'>) do not match ['null', 'boolean']") 

与也有关联

await converted_payment_topic.send(value=result)

但是得到了相同的结果。

这是我当前的代码:

app = faust.App(f'testConvertedPayments',
version=1,
key_serializer='raw',
value_serializer='raw',
broker=KAFKA_URL,
web_port=9090)
payment_topic = app.topic(payment_topic)
converted_payment_topic = app.topic(converted_payment_topic)
@app.agent(payment_topic, sink=[converted_payment_topic], concurrency=3)
async def payment_stream(payment):
async for part in payment.take(1000, within=30):
for x in part:
result = make_payment_row(x) #just some data manipulation and making a dict
result = {k: v for k, v in result.items() if v is not None}
yield result

与我在文档中看到的所有示例类似,不同之处在于,我的数据中不仅有字符串,有没有办法解决这个问题,而不将我的模式和所有数据类型更改为字符串?

谢谢!

我想好了,我需要发送一个faust。记录模型,而不是dict。来自浮士德的ct方法。Record类可用于从dict生成记录。

最新更新