The following code is run using:
# For flask
gunicorn --bind 0.0.0.0:5000 project:app --access-logfile -
# For celery worker
celery -A project.celery worker --loglevel=info
Both reference the same file (shown below):
import os
import time
import threading
from flask import (
    Flask,
    jsonify,
)
from flask_sqlalchemy import SQLAlchemy
from greenlet import getcurrent
from celery import Celery
import logging
logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
app = Flask(__name__)
app.config.from_object("project.config.Config")
db = SQLAlchemy(app)
celery = Celery(__name__)
celery.conf.broker_url = os.environ.get("CELERY_BROKER_URL", "redis://redis:6379")
celery.conf.result_backend = os.environ.get("CELERY_RESULT_BACKEND", "redis://redis:6379")
@app.route("/update", methods=['GET'])
def update():
    # Queue four tasks so that several pool workers run at once
    for i in range(4):
        celery_worker.delay()
    return jsonify(result="success")
@celery.task(name="celery_worker")
def celery_worker():
    time.sleep(1)
    print(getcurrent())
    time.sleep(1)
    print(getcurrent().__hash__())
    time.sleep(1)
    print(threading.local())
    time.sleep(1)
    print(threading.current_thread())
    time.sleep(1)
    print(os.getpid())
This produces logs suggesting that all the workers, despite being in different processes, are on the same thread?
Logs:
celery_worker_1 | [2022-08-15 09:50:16,721: INFO/MainProcess] Task celery_worker[f38b203c-51e8-4242-9b3e-e466a208a395] received
celery_worker_1 | [2022-08-15 09:50:16,722: INFO/MainProcess] Task celery_worker[60afd98f-1139-4962-9aaf-4049261dae06] received
web_1 | 172.20.0.1 - - [15/Aug/2022:09:50:16 +0000] "GET /update HTTP/1.1" 200 21 "-" "curl/7.68.0"
celery_worker_1 | [2022-08-15 09:50:16,724: INFO/MainProcess] Task celery_worker[06feb5ee-4ece-4dfa-9d73-acf396be5161] received
celery_worker_1 | [2022-08-15 09:50:16,725: INFO/MainProcess] Task celery_worker[545068a2-8b28-454d-a8dc-47809adea845] received
celery_worker_1 | [2022-08-15 09:50:17,724: WARNING/ForkPoolWorker-1] <greenlet.greenlet object at 0x7f7102ff25c0 (otid=0x7f71038996c0) current active started main>
celery_worker_1 | [2022-08-15 09:50:17,724: WARNING/ForkPoolWorker-8] <greenlet.greenlet object at 0x7f7102ff25c0 (otid=0x7f71038996c0) current active started main>
celery_worker_1 | [2022-08-15 09:50:17,727: WARNING/ForkPoolWorker-3] <greenlet.greenlet object at 0x7f7102ff25c0 (otid=0x7f71038996c0) current active started main>
celery_worker_1 | [2022-08-15 09:50:17,727: WARNING/ForkPoolWorker-2] <greenlet.greenlet object at 0x7f7102ff25c0 (otid=0x7f71038996c0) current active started main>
celery_worker_1 | [2022-08-15 09:50:18,727: WARNING/ForkPoolWorker-1] 8757709894236
celery_worker_1 | [2022-08-15 09:50:18,727: WARNING/ForkPoolWorker-8] 8757709894236
celery_worker_1 | [2022-08-15 09:50:18,729: WARNING/ForkPoolWorker-3] 8757709894236
celery_worker_1 | [2022-08-15 09:50:18,730: WARNING/ForkPoolWorker-2] 8757709894236
celery_worker_1 | [2022-08-15 09:50:19,729: WARNING/ForkPoolWorker-1] <_thread._local object at 0x7f7101191c20>
celery_worker_1 | [2022-08-15 09:50:19,729: WARNING/ForkPoolWorker-8] <_thread._local object at 0x7f7101191c20>
celery_worker_1 | [2022-08-15 09:50:19,731: WARNING/ForkPoolWorker-3] <_thread._local object at 0x7f7101191c20>
celery_worker_1 | [2022-08-15 09:50:19,731: WARNING/ForkPoolWorker-2] <_thread._local object at 0x7f7101191c20>
celery_worker_1 | [2022-08-15 09:50:20,731: WARNING/ForkPoolWorker-1] <_MainThread(MainThread, started 140123376166720)>
celery_worker_1 | [2022-08-15 09:50:20,731: WARNING/ForkPoolWorker-8] <_MainThread(MainThread, started 140123376166720)>
celery_worker_1 | [2022-08-15 09:50:20,732: WARNING/ForkPoolWorker-2] <_MainThread(MainThread, started 140123376166720)>
celery_worker_1 | [2022-08-15 09:50:20,732: WARNING/ForkPoolWorker-3] <_MainThread(MainThread, started 140123376166720)>
celery_worker_1 | [2022-08-15 09:50:21,733: WARNING/ForkPoolWorker-8] 16
celery_worker_1 | [2022-08-15 09:50:21,733: WARNING/ForkPoolWorker-1] 9
celery_worker_1 | [2022-08-15 09:50:21,733: WARNING/ForkPoolWorker-2] 10
celery_worker_1 | [2022-08-15 09:50:21,734: WARNING/ForkPoolWorker-3] 11
celery_worker_1 | [2022-08-15 09:50:21,743: INFO/ForkPoolWorker-8] Task celery_worker[f38b203c-51e8-4242-9b3e-e466a208a395] succeeded in 5.019944864034187s: None
celery_worker_1 | [2022-08-15 09:50:21,743: INFO/ForkPoolWorker-2] Task celery_worker[06feb5ee-4ece-4dfa-9d73-acf396be5161] succeeded in 5.016862046031747s: None
celery_worker_1 | [2022-08-15 09:50:21,743: INFO/ForkPoolWorker-1] Task celery_worker[60afd98f-1139-4962-9aaf-4049261dae06] succeeded in 5.02064540295396s: None
celery_worker_1 | [2022-08-15 09:50:21,743: INFO/ForkPoolWorker-3] Task celery_worker[545068a2-8b28-454d-a8dc-47809adea845] succeeded in 5.017592608986888s: None
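The operating system is Linux. The Celery worker and Flask each run in their own Docker container built from python:3.9.5-slim-buster.
This matters because some tools in the larger project are scoped by thread, and from their perspective everything is running on the same thread.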
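To make "scoped by thread" concrete, here is a minimal sketch of the kind of tool I mean. `ThreadScopedRegistry` and `demo` are hypothetical names used only for illustration, not the API of any library in the project; the sketch assumes the tool keys its objects on the thread identifier.

```python
import threading


class ThreadScopedRegistry:
    """Hypothetical thread-scoped registry: one object per calling thread."""

    def __init__(self, factory):
        self._factory = factory
        self._objects = {}
        self._lock = threading.Lock()

    def get(self):
        # The scope key is the identifier of the calling thread.
        key = threading.get_ident()
        with self._lock:
            if key not in self._objects:
                self._objects[key] = self._factory()
            return self._objects[key]


def demo():
    registry = ThreadScopedRegistry(factory=object)
    seen = {}
    barrier = threading.Barrier(2)

    def worker(name):
        first = registry.get()
        # Keep both threads alive together so their identifiers cannot be reused.
        barrier.wait()
        seen[name] = (first, registry.get())

    threads = [threading.Thread(target=worker, args=(n,)) for n in ("a", "b")]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return seen


if __name__ == "__main__":
    result = demo()
    print("same object within a thread:", result["a"][0] is result["a"][1])
    print("different objects across threads:", result["a"][0] is not result["b"][0])
```

If every worker reports the same thread identifier, a registry like this would hand all of them the same key, which is why the logs above worried me.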
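The answer is that this has nothing to do with Celery: it is the behavior you observe when a process is forked and greenlet.getcurrent() is called in each child. A forked child starts with a copy of its parent's memory layout, so the same objects appear at the same virtual addresses in every child, and the addresses and identifiers shown in their reprs are only unique within a single process.
It can even be observed in this sample code, which uses the multiprocessing library: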
from greenlet import getcurrent
import time
import os
from multiprocessing import Process
def printit():
    # Same repr in every child: the address is only unique within one process
    print(getcurrent())
    time.sleep(3)
    # Different in every child: the PID is what tells the processes apart
    print(os.getpid())
    time.sleep(3)
tasks = [Process(target=printit) for i in range(4)]
for t in tasks:
    t.start()
for t in tasks:
    t.join()
Output:
<greenlet.greenlet object at 0x7fb5c9649930 (otid=0x7fb5c9627ec0) current active started main>
<greenlet.greenlet object at 0x7fb5c9649930 (otid=0x7fb5c9627ec0) current active started main>
<greenlet.greenlet object at 0x7fb5c9649930 (otid=0x7fb5c9627ec0) current active started main>
<greenlet.greenlet object at 0x7fb5c9649930 (otid=0x7fb5c9627ec0) current active started main>
713408
713407
713409
713410
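The same effect can be sketched without greenlet at all, using only the standard library. This is a minimal illustration, assuming a platform where the fork start method is available (such as Linux); `marker`, `report` and `collect` are names made up for this example. Each forked child reports its PID, the address of an object inherited from the parent, and its thread identifier.

```python
import multiprocessing as mp
import os
import threading

# Created before forking, so every child inherits a copy at the same address.
marker = object()


def report(queue):
    # id() is the object's address inside this process's own address space.
    queue.put((os.getpid(), id(marker), threading.get_ident()))


def collect(n=4):
    # Assumption: the fork start method exists on this platform.
    ctx = mp.get_context("fork")
    queue = ctx.Queue()
    procs = [ctx.Process(target=report, args=(queue,)) for _ in range(n)]
    for p in procs:
        p.start()
    # Read one result per child before joining.
    results = [queue.get() for _ in procs]
    for p in procs:
        p.join()
    return results


if __name__ == "__main__":
    results = collect()
    for pid, addr, ident in results:
        print(pid, hex(addr), ident)
    print("distinct pids:", len({r[0] for r in results}))
    print("distinct (pid, ident) keys:", len({(r[0], r[2]) for r in results}))
```

Because each process has its own memory, a thread-scoped registry in one worker is never shared with another worker, so the matching identifiers do not cause collisions by themselves. If you need a key that is unique across processes, for example in logs, pairing the identifier with os.getpid() as above is one option.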