如何将Kafka消费者连接到Django应用程序?我应该为使用者、新进程或新docker容器使用新线程吗



我有Django应用程序,它应该使用Kafka消息,并用我的处理程序和现有模型处理它。我使用https://kafka-python.readthedocs.io/en/master/usage.html图书馆

将KafkaConsumer连接到Django应用程序的正确方法是什么。我应该使用一个新的守护进程线程吗?还是一个新的流程?或者一个单独的码头集装箱?把代码放在哪里(新的Django应用程序?(,以及如何在Django程序准备好后自动启动它。以及如何更新它动态监听的主题:我应该杀死旧的消费者,每次在新线程中启动新的消费者吗?

也有类似的问题,我所做的是创建一个自定义Django命令,然后继续为您的功能添加一个处理程序方法。在部署中,您可以将其作为sidecar容器启动。

class Command(BaseCommand):
def handle(self, *args, **options):

consumer = kafka.KafkaConsumer(KAFKA_TOPIC_NAME,bootstrap_server=["localhost:9092"],group_id=KAFKA_CONSUMER_GROUP)
for message in consumer:
handler_method(message)

作为一个sidecar,它会在启动时拾取消费者中的任何消息。

我知道这是一个迟来的答案。

关于这个问题,你可以试试Faust,它是一个流处理库,可以与Django集成。

在Docker中,启动两个共享相同代码库的不同容器,除了其中一个容器运行启动Faust worker 的命令

version: '3.3'
services:
backend_service: &backend_service
container_name: your_container_name
build:
context: .
dockerfile: Dockerfile
image: backend_service_image
volumes:
- ./your_codebase:/your_container_codebase_path
ports:
- "8000:8000"
env_file: .env
kafka_consumer:
<<: *backend_service
image: kafka_consumer_image
container_name: kafka_consumer
command: faust -A <your_project_root>.kafka:app worker -l info

请注意,在kafka_consumer容器中,会运行一个命令:faust -A <your_project_root>.kafka:app worker -l info,其中<your_project_root>是包含settings.py文件的文件夹的名称空间。

在此文件夹中创建一个文件kafka.py,如下所示

import os
import faust
from django.conf import settings
# eventlet is used as a bridge to communicate with asyncio
os.environ.setdefault('FAUST_LOOP', 'eventlet')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', '<your_root_namespace>.settings')
app = faust.App('set_a_name_here', broker=f"kafka://{settings.KAFKA_URL}")
# Specify a topic name
new_topic = app.topic("topic_to_process")
# define a method to process the above topic
@app.agent(new_topic)
async def process_topic(stream):
async for event in stream:
...  # process event

请记住,这个过程是以异步方式执行的,因此在使用Django ORM时可能会遇到问题,为了正确实现使用ORM的使用者,可以使用sync_to_async包装器。

或者将其用作装饰器:

# kafka.py
from asgiref.sync import sync_to_async
@sync_to_async
def _do_something(event):
random_model_id = event.get('random_model_id')
random_model = RandomModel.objects.get(id=random_model_id)

@app.agent(new_topic)
async def process_topic(stream):
async for event in stream:
await _do_something(event)

然后您可以调试kafka_consumer容器来检查发生了什么。为了反映更改,您需要重新启动容器。

如果您没有使用Docker,则需要安装一个supervisor,并将此命令faust -A <your_project_root>.kafka:app worker -l info配置为在supervisord.conf文件上运行。

最新更新