我有Django应用程序,它应该使用Kafka消息,并用我的处理程序和现有模型处理它。我使用https://kafka-python.readthedocs.io/en/master/usage.html图书馆
将KafkaConsumer连接到Django应用程序的正确方法是什么。我应该使用一个新的守护进程线程吗?还是一个新的流程?或者一个单独的码头集装箱?把代码放在哪里(新的Django应用程序?(,以及如何在Django程序准备好后自动启动它。以及如何更新它动态监听的主题:我应该杀死旧的消费者,每次在新线程中启动新的消费者吗?
也有类似的问题,我所做的是创建一个自定义Django命令,然后继续为您的功能添加一个处理程序方法。在部署中,您可以将其作为sidecar容器启动。
class Command(BaseCommand):
def handle(self, *args, **options):
consumer = kafka.KafkaConsumer(KAFKA_TOPIC_NAME,bootstrap_server=["localhost:9092"],group_id=KAFKA_CONSUMER_GROUP)
for message in consumer:
handler_method(message)
作为一个sidecar,它会在启动时拾取消费者中的任何消息。
我知道这是一个迟来的答案。
关于这个问题,你可以试试Faust,它是一个流处理库,可以与Django集成。
在Docker中,启动两个共享相同代码库的不同容器,除了其中一个容器运行启动Faust worker 的命令
version: '3.3'
services:
backend_service: &backend_service
container_name: your_container_name
build:
context: .
dockerfile: Dockerfile
image: backend_service_image
volumes:
- ./your_codebase:/your_container_codebase_path
ports:
- "8000:8000"
env_file: .env
kafka_consumer:
<<: *backend_service
image: kafka_consumer_image
container_name: kafka_consumer
command: faust -A <your_project_root>.kafka:app worker -l info
请注意,在kafka_consumer
容器中,会运行一个命令:faust -A <your_project_root>.kafka:app worker -l info
,其中<your_project_root>
是包含settings.py
文件的文件夹的名称空间。
在此文件夹中创建一个文件kafka.py
,如下所示
import os
import faust
from django.conf import settings
# eventlet is used as a bridge to communicate with asyncio
os.environ.setdefault('FAUST_LOOP', 'eventlet')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', '<your_root_namespace>.settings')
app = faust.App('set_a_name_here', broker=f"kafka://{settings.KAFKA_URL}")
# Specify a topic name
new_topic = app.topic("topic_to_process")
# define a method to process the above topic
@app.agent(new_topic)
async def process_topic(stream):
async for event in stream:
... # process event
请记住,这个过程是以异步方式执行的,因此在使用Django ORM时可能会遇到问题,为了正确实现使用ORM的使用者,可以使用sync_to_async包装器。
或者将其用作装饰器:
# kafka.py
from asgiref.sync import sync_to_async
@sync_to_async
def _do_something(event):
random_model_id = event.get('random_model_id')
random_model = RandomModel.objects.get(id=random_model_id)
@app.agent(new_topic)
async def process_topic(stream):
async for event in stream:
await _do_something(event)
然后您可以调试kafka_consumer
容器来检查发生了什么。为了反映更改,您需要重新启动容器。
如果您没有使用Docker,则需要安装一个supervisor,并将此命令faust -A <your_project_root>.kafka:app worker -l info
配置为在supervisord.conf
文件上运行。