请花时间阅读完整的问题以了解确切的问题。谢谢。
我有一个runner/driver程序,它监听Kafka主题,并在收到关于该主题的新消息时使用ThreadPoolExecuter
调度任务(如下所示(:
consumer = KafkaConsumer(CONSUMER_TOPIC, group_id='ME2',
bootstrap_servers=[f"{KAFKA_SERVER_HOST}:{KAFKA_SERVER_PORT}"],
value_deserializer=lambda x: json.loads(x.decode('utf-8')),
enable_auto_commit=False,
auto_offset_reset='latest',
max_poll_records=1,
max_poll_interval_ms=300000)
with ThreadPoolExecutor(max_workers=10) as executor:
futures = []
for message in consumer:
futures.append(executor.submit(SOME_FUNCTION, ARG1, ARG2))
中间有一堆代码,但这些代码在这里并不重要,所以我跳过了它
现在,SOME_FUNCTION来自另一个导入的python脚本(事实上,有一个在稍后阶段发生的导入层次结构(。重要的是,在这些脚本中的某个时刻,我调用Multiprocessing
池,因为我需要对数据(SIMD-单指令多数据(进行并行处理,并使用apply_async函数来执行此操作
for loop_message_chunk in loop_message_chunks:
res_list.append(self.pool.apply_async(self.one_matching.match, args=(hash_set, loop_message_chunk, fields)))
现在,我有两个版本的跑步者/驱动程序:
基于卡夫卡的(如上所示(
- 此版本生成启动多处理的线程
听卡夫卡的歌->启动线程->启动多处理器
基于REST(使用flask通过REST调用实现相同的任务(
- 此版本不启动任何线程,并立即调用多处理
侦听REST端点->启动多处理器
为什么要问2个运行程序/驱动程序脚本?-该微服务将由多个团队使用,有些团队希望基于同步REST,而有些团队则希望基于KAFKA的实时异步系统
当我从并行函数(上例中的self.one_matching.match
(进行日志记录时,它在通过REST版本调用时工作,但在使用KAFKA版本调用时不工作(基本上是在线程启动多处理时,它不工作(。
还要注意,只有来自并行函数的日志记录不起作用。层次结构中的其余脚本从runner到调用apply_async的脚本,其中包括从线程日志中成功调用的脚本
其他详细信息:
- 我使用yaml文件配置记录器
- 我在runner脚本本身中为KAFKA或REST版本配置记录器
- 我在runner脚本之后调用的每个其他脚本中执行
logging.getLogger
,以使特定的记录器记录到不同的文件
Logger Config(值被替换为泛型,因为我无法查询确切的名称(:
version: 1
formatters:
simple:
format: '%(asctime)s | %(name)s | %(filename)s : %(funcName)s : %(lineno)d | %(levelname)s :: %(message)s'
custom1:
format: '%(asctime)s | %(filename)s :: %(message)s'
time-message:
format: '%(asctime)s | %(message)s'
handlers:
console:
class: logging.StreamHandler
level: DEBUG
formatter: simple
stream: ext://sys.stdout
handler1:
class: logging.handlers.TimedRotatingFileHandler
when: midnight
backupCount: 5
formatter: simple
level: DEBUG
filename: logs/logfile1.log
handler2:
class: logging.handlers.TimedRotatingFileHandler
when: midnight
backupCount: 30
formatter: custom1
level: INFO
filename: logs/logfile2.log
handler3:
class: logging.handlers.TimedRotatingFileHandler
when: midnight
backupCount: 30
formatter: time-message
level: DEBUG
filename: logs/logfile3.log
handler4:
class: logging.handlers.TimedRotatingFileHandler
when: midnight
backupCount: 30
formatter: time-message
level: DEBUG
filename: logs/logfile4.log
handler5:
class: logging.handlers.TimedRotatingFileHandler
when: midnight
backupCount: 5
formatter: simple
level: DEBUG
filename: logs/logfile5.log
loggers:
logger1:
level: DEBUG
handlers: [console, handler1]
propagate: no
logger2:
level: DEBUG
handlers: [console, handler5]
propagate: no
logger3:
level: INFO
handlers: [handler2]
propagate: no
logger4:
level: DEBUG
handlers: [console, handler3]
propagate: no
logger5:
level: DEBUG
handlers: [console, handler4]
propagate: no
kafka:
level: WARNING
handlers: [console]
propogate: no
root:
level: INFO
handlers: [console]
propogate: no
可能的答案:去掉线程并使用异步
示例伪代码结构(由这些示例拼凑而成(
#pseudocode example structure: probably has bugs...
from aiokafka import AIOKafkaConsumer
import asyncio
from concurrent.futures import ProcessPoolExecutor
from functools import partial
async def SOME_FUNCTION_CO(executor, **kwargs):
res_list = []
for loop_message_chunk in loop_message_chunks:
res_list.append(executor.submit(self.one_matching.match, hash_set, loop_message_chunk, fields))
#call concurrent.futures.wait on res_list later, and cancel unneeded futures (regarding one of your prior questions)
return res_list
async def consume():
consumer = AIOKafkaConsumer(
'my_topic', 'my_other_topic',
bootstrap_servers='localhost:9092',
group_id="my-group")
# Get cluster layout and join group `my-group`
await consumer.start()
#Global executor:
#I would also suggest using a "spawn" context unless you really need the
#performance of "fork".
ctx = multiprocessing.get_context("spawn")
tasks = [] #similar to futures in your example (Task subclasses asyncio.Future which is similar to concurrent.futures.Future as well)
with ProcessPoolExecutor(mp_context=ctx) as executor:
try:
# Consume messages
async for msg in consumer:
tasks.append(asyncio.create_task(SOME_FUNCTION_CO(executor, **kwargs)))
finally:
# Will leave consumer group; perform autocommit if enabled.
await consumer.stop()
if __name__ == "__main__":
asyncio.run(consume())
我一直在反复讨论我认为应该如何在这个例子中表示SOME_FUNCTION
,但这里的关键点是,在msg in consumer
上的循环中,您正在计划任务最终完成。如果这些任务中的任何一个需要很长时间,它可能会阻塞主循环(它也在运行async for msg in consumer
行(。相反这些可能需要很长时间的任务中的任何一个都应该快速返回某种类型的future,这样一旦结果准备好,您就可以简单地访问它。
首先,我没有使用完全相同的堆栈。我使用的是fastaapi和Redis-pubsub,现在为flask和Kafka复制它会很乏味。我认为原则上它应该以同样的方式工作。至少它可能会让你在代码中发现一些错误配置。此外,我正在对记录器配置进行硬编码。
很抱歉粘贴了很多代码,但我想提供一个完整的工作示例,也许我在你的描述中遗漏了一些东西,你还没有提供一个最小的工作示例。
我有四个文件:
app.py (fastapi application)
config.py (setup config variables and logger)
redis_ps (redis consumer/listener)
utils (processing function (some_function), redis publish function)
和redis集装箱
docker pull redis
运行
docker run --restart unless-stopped --publish 6379:6379 --name redis -d redis
python3 app.py (will run server and pubsub listener)
python3 utils.py (will publish message over pubsub)
curl -X 'POST'
'http://0.0.0.0:5000/sync'
-H 'accept: application/json'
-H 'Content-Type: application/json'
-d '[[2,4],[6, 8]]'
输出
[2021-12-08 17:54:32,688] DEBUG in utils: Run some_function, caller: pubsub
[2021-12-08 17:54:32,688] DEBUG in utils: Run some_function, caller: pubsub
[2021-12-08 17:54:32,698] DEBUG in utils: caller: pubsub, Processing 1, result 1
[2021-12-08 17:54:32,698] DEBUG in utils: caller: pubsub, Processing 3, result 9
[2021-12-08 17:54:32,698] DEBUG in utils: caller: pubsub, Processing 5, result 25
[2021-12-08 17:54:32,698] DEBUG in utils: caller: pubsub, Processing 7, result 49
[2021-12-08 17:54:39,519] DEBUG in utils: Run some_function, caller: rest api
[2021-12-08 17:54:39,520] DEBUG in utils: Run some_function, caller: rest api
[2021-12-08 17:54:39,531] DEBUG in utils: caller: rest api, Processing 8, result 64
[2021-12-08 17:54:39,531] DEBUG in utils: caller: rest api, Processing 6, result 36
[2021-12-08 17:54:39,531] DEBUG in utils: caller: rest api, Processing 2, result 4
[2021-12-08 17:54:39,531] DEBUG in utils: caller: rest api, Processing 4, result 16
源代码
应用程序
from concurrent import futures
from typing import List
import uvicorn
from fastapi import FastAPI, APIRouter
from redis_ps import PubSubWorkerThreadListen
from utils import some_function
router = APIRouter()
@router.post("/sync")
def sync_process(data: List[List[int]]):
with futures.ThreadPoolExecutor(max_workers=2) as executor:
future_all = [executor.submit(some_function, loop_message_chunks=d, caller="rest api") for d in data]
return [future.result() for future in future_all]
def create_app():
app = FastAPI(title="app", openapi_url="/openapi.json", docs_url="/")
app.include_router(router)
thread = PubSubWorkerThreadListen()
thread.start()
return app
if __name__ == "__main__":
_app = create_app()
uvicorn.run(_app, host="0.0.0.0", port=5000, debug=True, log_level="debug")
配置
import sys
import logging
COMPONENT_NAME = "test_logger"
REDIS_URL = "redis://localhost:6379"
def setup_logger(logger_name: str, log_level=logging.DEBUG, fmt: logging.Formatter = None):
fmt = fmt or logging.Formatter("[%(asctime)s] %(levelname)s in %(module)s: %(message)s")
handler = logging.StreamHandler(sys.stdout)
handler.name = "h_console"
handler.setFormatter(fmt)
handler.setLevel(log_level)
logger_ = logging.getLogger(logger_name)
logger_.addHandler(handler)
logger_.setLevel(log_level)
return logger_
setup_logger(COMPONENT_NAME)
redis.ps
import json
import logging
import threading
import time
from concurrent import futures
from typing import Dict, List, Union
import redis
from config import COMPONENT_NAME, REDIS_URL
from utils import some_function
logger = logging.getLogger(COMPONENT_NAME)
class PubSubWorkerThreadListen(threading.Thread):
def __init__(self):
super().__init__()
self._running = threading.Event()
@staticmethod
def connect_pubsub() -> redis.client.PubSub:
while True:
try:
r = redis.Redis.from_url(REDIS_URL)
p = r.pubsub()
p.psubscribe(["*:*:*"])
logger.info("Connected to Redis")
return p
except Exception:
time.sleep(0.1)
def run(self):
if self._running.is_set():
return
self._running.set()
while self._running.is_set():
p = self.connect_pubsub()
try:
listen(p)
except Exception as e:
logger.error(f"Failed to process Redis message or failed to connect: {e}")
time.sleep(0.1)
def stop(self):
self._running.clear()
def get_data(msg) -> Union[Dict, List]:
data = msg.get("data")
if isinstance(data, int):
# the first message has {'data': 1}
return []
try:
return json.loads(data)
except Exception as e:
logger.warning("Failed to parse data in the message (%s) with error %s", msg, e)
return []
def listen(p_):
logger.debug("Start listening")
while True:
for msg_ in p_.listen():
data = get_data(msg_)
if data:
with futures.ThreadPoolExecutor(max_workers=2) as executor:
future_all = [executor.submit(some_function, loop_message_chunks=d, caller="pubsub") for d in data]
[future.result() for future in future_all]
utils.py
import json
import logging
from multiprocessing import Pool
from typing import List
import redis
from config import COMPONENT_NAME, REDIS_URL
logger = logging.getLogger(COMPONENT_NAME)
def one_matching(v, caller: str = ""):
logger.debug(f"caller: {caller}, Processing {v}, result {v*v}")
return v * v
def some_function(loop_message_chunks: List[int], caller: str):
logger.debug(f"Run some_function, caller: {caller}")
with Pool(2) as pool:
v = [pool.apply_async(one_matching, args=(i, caller)) for i in loop_message_chunks]
res_list = [res.get(timeout=1) for res in v]
return res_list
def publish():
data = [[1, 3], [5, 7]]
r_ = redis.Redis.from_url(REDIS_URL)
logger.debug("Published message %s %s", "test", data)
r_.publish("test:test:test", json.dumps(data).encode())
if __name__ == "__main__":
publish()