My Django依赖Celery和Celeybeat来生成任务。我有3个系统服务/单元:
- myapp.service(gunicorn守护进程(
- 芹菜-myapp.service(芹菜工人(
- celeybeat-myapp.service(celeybeat守护进程(
我在主"myapp.service"服务(systemd单元(中定义了一个环境变量MY_SECRET。
我无法从settings.py或直接在我的任务(tasks.py(中的环境变量中获取此值,但我可以直接使用settings.py或环境变量从我的视图(views.py(顺利地检索此值。
我是否需要将myapp.service中定义的环境变量MY_SECRET复制到celery-myapp.service和celerybeat-myapp.seervice,以便从我的celery任务中获取此值?如何检索MY_SECRET值I,我的Celery任务?
myapp.服务
[Unit]
Description=My App
After=network.target
Wants=celery-myapp.service
Wants=celerybeat-myapp.service
[Service]
User=myapp
Group=nginx
Environment=MY_SECRET=1234
WorkingDirectory=/opt/myproject/
ExecStart=/opt/myproject/env/bin/gunicorn --workers 3 --log-level debug --bind unix://opt/myproject/myapp.sock myaproject.wsgi:application
[Install]
WantedBy=multi-user.target
环境变量在我的Django项目"settings.py"中被引用,还有我打算在应用程序中使用的其他设置:
设置.py
# Populated with environment variable defined by systemd unit
MY_SECRET = os.environ.get('MY_SECRET')
# Static value
MY_URL= 'http://127.0.0.1'
在我看来,我可以从settings.py和直接从environment:中获得定义为环境变量my_SECRET的值
视图.py
def my_view(request):
from django.conf import settings as project_settings
my_secret_env = os.environ.get('MY_SECRET')
my_secret_setting = project_settings.MY_SECRET
my_url_setting = project_settings.MY_URL
return render(request,'mypage.html',{
'my_secret_env': my_secret_env,
'my_secret_setting': my_secret_setting,
'my_url_setting': my_url_setting,
)
mypage.html
{% block page_content %}
<!-- Value displayed properly -->
<p><strong>my_secret_env:</strong> {{ my_secret_env }}</p>
<!-- Value displayed properly -->
<p><strong>my_secret_setting:</strong> {{ my_secret_setting }}</p>
<!-- Value displayed properly -->
<p><strong>my_url_setting:</strong> {{ my_url_setting }}</p>
{% endblock %}
从我的任务中,我无法从settings.py或直接从环境中获得定义为环境变量my_SECRET的值。
另一方面,settings.py的静态值被正确检索:
任务.py
@shared_task
def task_mytask():
from django.conf import settings as project_settings
my_secret_env = os.environ.get('MY_SECRET')
my_secret_setting = project_settings.MY_SECRET
my_url_setting = project_settings.MY_URL
# No value displayed
print(my_secret_env)
# No value displayed
print(my_secret_setting)
# Value displayed properly
print(my_url_setting)
@periodic_task(run_every=timedelta(seconds=60))
def task_myperiodictask():
task_mytask.delay()
不确定你的目标是什么,但你永远不应该随意传递你的秘密。但是,如果您愿意,您可能必须将它们作为args从调用任务的位置传递。像这样:
任务.py
@shared_task()
def task_mytask(my_secret_env, my_secret_setting):
...
视图.py
task_mytask.apply_async(args=[my_secret_env, my_secret_setting])
此外,您可以尝试设置一个环境变量并尝试调用它。
export MY_SETTING = 1234