浮士德出版到Kafka主题的示例



我很好奇您应该如何表达您想要在浮士德中传递给Kafka主题的消息。他们的读书中的示例似乎没有写入一个主题:

import faust
class Greeting(faust.Record):
    from_name: str
    to_name: str
app = faust.App('hello-app', broker='kafka://localhost')
topic = app.topic('hello-topic', value_type=Greeting)
@app.agent(topic)
async def hello(greetings):
    async for greeting in greetings:
        print(f'Hello from {greeting.from_name} to {greeting.to_name}')
@app.timer(interval=1.0)
async def example_sender(app):
    await hello.send(
        value=Greeting(from_name='Faust', to_name='you'),
    )
if __name__ == '__main__':
    app.main()

我希望上述代码中的hello.send将消息发布到主题,但似乎没有。

有许多从主题阅读的示例,以及使用CLI来推动临时消息的许多示例。梳理文档后,我看不出任何明确的发布有关代码主题的示例。我只是疯了,上述代码应该起作用吗?

send()函数是要打电话给主题的正确的函数。您甚至可以指定特定分区,就像等效的Java API调用一样。

这是send()方法的参考:

https://faust.readthedocs.io/en/latest/reference/faust.topics.html#faust.topics.topic.send.send

您可以使用sink告诉FAUST在哪里传递代理功能的结果。如果需要,您也可以一次使用多个主题作为下沉。

@app.agent(topic_to_read_from, sink=[destination_topic])
async def fetch(records):
    async for record in records:
        result = do_something(record)
        yield result

如果您只想要浮士德生产者(不与消费者/接收器结合使用(,则原始问题实际上具有正确的代码,这是一个功能齐全的脚本,将消息发布给'faust_test'Kafka主题是任何Kafka/Faust消费者所消耗的。

这样的代码如下:python faust_producer.py worker

"""Simple Faust Producer"""
import faust
if __name__ == '__main__':
    """Simple Faust Producer"""
    # Create the Faust App
    app = faust.App('faust_test_app', broker='localhost:9092')
    topic = app.topic('faust_test')
    # Send messages
    @app.timer(interval=1.0)
    async def send_message(message):
        await topic.send(value='my message')
    # Start the Faust App
    app.main()

,因此我们遇到了将消息发送到sink主题以外的主题的需要。

我们发现的最简单方法是: foo = await my_topic.send_soon(value="wtfm8")

您也可以使用Asyncio事件循环直接使用send

loop = asyncio.get_event_loop()
foo = await ttopic.send(value="wtfm8??")
loop.run_until_complete(foo)

不知道这有多相关,但是我在尝试学习浮士德时遇到了这个问题。根据我的阅读,这是发生的事情:

topic = app.topic('hello-topic', value_type=Greeting)

这里的误解是您创建的主题是您试图从中消费/阅读的主题。您创建的主题当前无需做任何事情。

await hello.send(
        value=Greeting(from_name='Faust', to_name='you'),
    )

这实质上创建了一个中间的KStream,该Kstream将值发送到您的Hello(问候(函数。def hello(...(将在流到流的新消息时被调用,并处理要发送的消息。

@app.agent(topic)
async def hello(greetings):
    async for greeting in greetings:
        print(f'Hello from {greeting.from_name} to {greeting.to_name}')

这是从Hello.Send(...(接收Kafka流,然后将其打印到控制台(未对创建的"主题"输出输出(。在这里,您可以向新主题发送消息。因此,代替打印可以做:

topic.send(value = "my message!")

或者:

这是您在做的:

  1. example_sender((将消息发送给Hello(...((通过InterMediate KStream(
  2. 你好(...(拿起消息并打印注意:没有将消息发送到正确的主题

这是您可以做的:

  1. example_sender((向Hello(...(发送一条消息(通过InterMediate KStream(

  2. 你好(...(拿起消息和打印

  3. 你好(...(也向创建的主题发送了一条新消息(假设您正在尝试转换原始数据(

     app = faust.App('hello-app', broker='kafka://localhost')
     topic = app.topic('hello-topic', value_type=Greeting)
     output_topic = app.topic('test_output_faust', value_type=str)
     @app.agent(topic)
     async def hello(greetings):
         async for greeting in greetings:
             new_message = f'Hello from {greeting.from_name} to {greeting.to_name}'
             print(new_message)
             await output_topic.send(value=new_message)
    

我找到了如何使用Faust将数据发送到Kafka主题的解决方案,但我不太了解它的工作原理。

在Faust:send(), cast(), ask_nowait(), ask()中有几种方法。在文档中,它们称为RPC操作。

创建发送任务后,您需要以仅限端客户端模式运行浮士德应用程序。(start_client(), maybe_start_client()(

以下代码(prodices((函数(演示了他们的应用(请注意评论(:

import asyncio
import faust

class Greeting(faust.Record):
    from_name: str
    to_name: str

app = faust.App('hello-app', broker='kafka://localhost')
topic = app.topic('hello-topic', value_type=Greeting)
result_topic = app.topic('result-topic', value_type=str)

@app.agent(topic)
async def hello(greetings):
    async for greeting in greetings:
        s = f'Hello from {greeting.from_name} to {greeting.to_name}'
        print(s)
        yield s

async def produce(to_name):
    # send - universal method for sending data to a topic
    await hello.send(value=Greeting(from_name='SEND', to_name=to_name), force=True)
    await app.maybe_start_client()
    print('SEND')
    # cast - allows you to send data without waiting for a response from the agent
    await hello.cast(value=Greeting(from_name='CAST', to_name=to_name))
    await app.maybe_start_client()
    print('CAST')
    # ask_nowait - it seems to be similar to cast
    p = await hello.ask_nowait(
        value=Greeting(from_name='ASK_NOWAIT', to_name=to_name),
        force=True,
        reply_to=result_topic
    )
    # without this line, ask_nowait will not work; taken from the ask implementation
    await app._reply_consumer.add(p.correlation_id, p)
    await app.maybe_start_client()
    print(f'ASK_NOWAIT: {p.correlation_id}')
    # blocks the execution flow
    # p = await hello.ask(value=Greeting(from_name='ASK', to_name=to_name), reply_to=result_topic)
    # print(f'ASK: {p.correlation_id}')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(produce('Faust'))

使用命令faust -A <example> worker

开始快速工作者

然后,我们可以启动应用程序的客户端部分,并检查所有内容是否有效:python <example.py>

&lt; example.py&gt;输出:

SEND
CAST
ASK_NOWAIT: bbbe6795-5a99-40e5-a7ad-a9af544efd55

值得注意的是,您还会看到交货后发生的某些错误的追溯,这不会干扰该程序(似乎如此(

浮士德工人输出:

[2022-07-19 12:06:27,959] [1140] [WARNING] Hello from SEND to Faust 
[2022-07-19 12:06:27,960] [1140] [WARNING] Hello from CAST to Faust 
[2022-07-19 12:06:27,962] [1140] [WARNING] Hello from ASK_NOWAIT to Faust 

我不明白为什么它是这样起作用的,为什么如此困难,为什么在文档中写的很少。

最新更新