我正在尝试创建一个Windows服务来启动Celery。我偶然发现一篇文章使用任务调度程序来实现这一点。然而,它似乎启动了许多芹菜实例,并不断消耗内存,直到机器死亡。有什么方法可以将其作为Windows服务启动吗?
我从另一个网站得到了答案。Celeryd(Celery的守护进程服务)作为粘贴应用程序运行,搜索"粘贴Windows服务"将引导我来到这里。它描述了如何将Pylons应用程序作为Windows服务运行。作为paster框架和托管python web服务的新手,我一开始并没有想到要检查它。但这个解决方案适用于Celery,只是在脚本中做了一些细微的更改。
我修改了脚本,使其更容易修改Celery设置。主要变化是:
- 使用Celery服务的设置创建一个INI文件(如下所示)
- 创建一个python脚本来创建一个Windows服务
INI文件设置(celeryd.INI):
[celery:service]
service_name = CeleryService
service_display_name = Celery Service
service_description = WSCGI Windows Celery Service
service_logfile = celeryd.log
创建Windows服务的Python脚本(CeleryService.py):
"""
The most basic (working) Windows service possible.
Requires Mark Hammond's pywin32 package.
Most of the code was taken from a CherryPy 2.2 example of how to set up a service
"""
import pkg_resources
import win32serviceutil
from paste.script.serve import ServeCommand as Server
import os, sys
import ConfigParser
import win32service
import win32event
SCRIPT_DIR = os.path.abspath(os.path.dirname(__file__))
INI_FILE = 'celeryd.ini'
SERV_SECTION = 'celery:service'
SERV_NAME = 'service_name'
SERV_DISPLAY_NAME = 'service_display_name'
SERV_DESC = 'service_description'
SERV_LOG_FILE = 'service_logfile'
SERV_APPLICATION = 'celeryd'
SERV_LOG_FILE_VAR = 'CELERYD_LOG_FILE'
# Default Values
SERV_NAME_DEFAULT = 'CeleryService'
SERV_DISPLAY_NAME_DEFAULT = 'Celery Service'
SERV_DESC_DEFAULT = 'WSCGI Windows Celery Service'
SERV_LOG_FILE_DEFAULT = r'D:logscelery.log'
class DefaultSettings(object):
def __init__(self):
if SCRIPT_DIR:
os.chdir(SCRIPT_DIR)
# find the ini file
self.ini = os.path.join(SCRIPT_DIR,INI_FILE)
# create a config parser opject and populate it with the ini file
c = ConfigParser.SafeConfigParser()
c.read(self.ini)
self.c = c
def getDefaults(self):
'''
Check for and get the default settings
'''
if (
(not self.c.has_section(SERV_SECTION)) or
(not self.c.has_option(SERV_SECTION, SERV_NAME)) or
(not self.c.has_option(SERV_SECTION, SERV_DISPLAY_NAME)) or
(not self.c.has_option(SERV_SECTION, SERV_DESC)) or
(not self.c.has_option(SERV_SECTION, SERV_LOG_FILE))
):
print 'setting defaults'
self.setDefaults()
service_name = self.c.get(SERV_SECTION, SERV_NAME)
service_display_name = self.c.get(SERV_SECTION, SERV_DISPLAY_NAME)
service_description = self.c.get(SERV_SECTION, SERV_DESC)
iniFile = self.ini
service_logfile = self.c.get(SERV_SECTION, SERV_LOG_FILE)
return service_name, service_display_name, service_description, iniFile, service_logfile
def setDefaults(self):
'''
set and add the default setting to the ini file
'''
if not self.c.has_section(SERV_SECTION):
self.c.add_section(SERV_SECTION)
self.c.set(SERV_SECTION, SERV_NAME, SERV_NAME_DEFAULT)
self.c.set(SERV_SECTION, SERV_DISPLAY_NAME, SERV_DISPLAY_NAME_DEFAULT)
self.c.set(SERV_SECTION, SERV_DESC, SERV_DESC_DEFAULT)
self.c.set(SERV_SECTION, SERV_LOG_FILE, SERV_LOG_FILE_DEFAULT)
cfg = file(self.ini, 'wr')
self.c.write(cfg)
cfg.close()
print '''
you must set the celery:service section service_name, service_display_name,
and service_description options to define the service
in the %s file
''' % self.ini
sys.exit()
class CeleryService(win32serviceutil.ServiceFramework):
"""NT Service."""
d = DefaultSettings()
service_name, service_display_name, service_description, iniFile, logFile = d.getDefaults()
_svc_name_ = service_name
_svc_display_name_ = service_display_name
_svc_description_ = service_description
def __init__(self, args):
win32serviceutil.ServiceFramework.__init__(self, args)
# create an event that SvcDoRun can wait on and SvcStop
# can set.
self.stop_event = win32event.CreateEvent(None, 0, 0, None)
def SvcDoRun(self):
os.chdir(SCRIPT_DIR)
s = Server(SERV_APPLICATION)
os.environ[SERV_LOG_FILE_VAR] = self.logFile
s.run([self.iniFile])
win32event.WaitForSingleObject(self.stop_event, win32event.INFINITE)
def SvcStop(self):
self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
#win32event.SetEvent(self.stop_event)
self.ReportServiceStatus(win32service.SERVICE_STOPPED)
sys.exit()
if __name__ == '__main__':
win32serviceutil.HandleCommandLine(CeleryService)
要安装服务,请运行python CeleryService.py install
,然后运行python CeleryService.py start
以启动服务注意:这些命令应该在具有管理员权限的命令行中运行。
如果需要删除服务,请运行python CeleryService.py remove
。
我试图托管Celery,作为增强我的RhodeCode安装的一部分。这个解决方案似乎奏效了。希望这能帮助到别人。
接受的答案不适用于使用Django应用程序运行celeni。但它启发了我想出一个解决方案,用Django将芹菜作为Windows服务运行。请注意,以下内容仅适用于Django项目。经过一些修改,它可以与其他应用程序配合使用。
以下讨论假设已经安装了Python>=3.6和RabbitMQ,并且rabbitmq-server
正在localhost
上运行。
在Django项目的顶级文件夹(与manage.py级别相同)中创建一个文件celey_service.py(或任何您喜欢的文件),内容如下:
'''Usage : python celery_service.py install (start / stop / remove)
Run celery as a Windows service
'''
import win32service
import win32serviceutil
import win32api
import win32con
import win32event
import subprocess
import sys
import os
from pathlib import Path
import shlex
import logging
import time
# The directory for celery.log and celery_service.log
# Default: the directory of this script
INSTDIR = Path(__file__).parent
# The path of python Scripts
# Usually it is in path_to/venv/Scripts.
# If it is already in system PATH, then it can be set as ''
PYTHONSCRIPTPATH = INSTDIR / 'venvcelery/Scripts'
# The directory name of django project
# Note: it is the directory at the same level of manage.py
# not the parent directory
PROJECTDIR = 'proj'
logging.basicConfig(
filename = INSTDIR / 'celery_service.log',
level = logging.DEBUG,
format = '[%(asctime)-15s: %(levelname)-7.7s] %(message)s'
)
class CeleryService(win32serviceutil.ServiceFramework):
_svc_name_ = "Celery"
_svc_display_name_ = "Celery Distributed Task Queue Service"
def __init__(self, args):
win32serviceutil.ServiceFramework.__init__(self, args)
self.hWaitStop = win32event.CreateEvent(None, 0, 0, None)
def SvcStop(self):
logging.info('Stopping {name} service ...'.format(name=self._svc_name_))
self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
win32event.SetEvent(self.hWaitStop)
self.ReportServiceStatus(win32service.SERVICE_STOPPED)
sys.exit()
def SvcDoRun(self):
logging.info('Starting {name} service ...'.format(name=self._svc_name_))
os.chdir(INSTDIR) # so that proj worker can be found
logging.info('cwd: ' + os.getcwd())
self.ReportServiceStatus(win32service.SERVICE_RUNNING)
command = '"{celery_path}" -A {proj_dir} worker -f "{log_path}" -l info -P eventlet'.format(
celery_path=PYTHONSCRIPTPATH / 'celery.exe',
proj_dir=PROJECTDIR,
log_path=INSTDIR / 'celery.log')
logging.info('command: ' + command)
args = shlex.split(command)
proc = subprocess.Popen(args)
logging.info('pid: {pid}'.format(pid=proc.pid))
self.timeout = 3000
while True:
rc = win32event.WaitForSingleObject(self.hWaitStop, self.timeout)
if rc == win32event.WAIT_OBJECT_0:
# stop signal encountered
# terminate process 'proc'
PROCESS_TERMINATE = 1
handle = win32api.OpenProcess(PROCESS_TERMINATE, False, proc.pid)
win32api.TerminateProcess(handle, -1)
win32api.CloseHandle(handle)
break
if __name__ == '__main__':
win32serviceutil.HandleCommandLine(CeleryService)
在脚本可以运行之前,您需要
可选择创建一个python虚拟环境,例如"venvcelery"。
安装以下要求:
django>=2.0.0sqlalchemy>=1.0.14芹菜>=4.3.0,<5pywin32>=227eventlet>=0.25
修复pywin32
pywintypes36.dll
位置。参考在celey_service.py 中正确设置PYTHONSTRIPTPATH和PROJECTDIR
PYTHONSTRIPTPATH通常是python安装路径或当前虚拟环境下的"脚本"文件夹
PROJECTDIR是Django项目的目录名。
它是manage.py的同一级别的目录,而不是父目录。
现在,您可以使用安装/启动/停止/删除服务
python celery_service.py install
python celery_service.py start
python celery_service.py stop
python celery_service.py remove
我创建了一个Django演示项目,其中芹菜作为Windows服务运行:
https://github.com/azalea/django_celery_windows_service
如果您对正在运行的示例感兴趣。
注意:这是一个更新版本,假设Python>=3.6、Django 2.2和Celery 4。
Python 2.7、Django 1.6和Celery 3的旧版本可以在编辑历史中查看。
@azalea的回答帮了我很多忙,但我想在这里强调的一点是,服务(celey_service.py)需要使用您的用户/密码安装,否则,当您运行subprocess.Popen(args) in SvcDoRun()
函数时,不会发生任何事情,因为会出现权限问题。要设置用户/密码,您可以选择两种方法之一:
-
使用命令行:
python33 .celeryService1.py --username .USERNAME --password PASSWORD
-
转到计算机管理(本地)>服务和应用程序>服务,找到您的服务器(在@azalea的示例中,它是"Celery Distributed Task Queue Service"),右键单击打开属性页面,在登录选项卡中输入"This account"
这里是一个很好的项目,但没有成功使用它:django-windows工具的GitHub链接。它在最后一个命令行给了我一个超时。没有足够的时间搜索原因。
该包允许在IIS上设置Django项目的FastCGI、Celery和静态文件。
感谢Azalea,因为这让我走上了在windows上使用Celery 4创建2个windows服务的道路。
一个能够启动/停止多个工人T其次,能够启动/停止beat服务,并使用Celery 4整理pid。
我没有解决方案的唯一警告是,您不能重新启动工作进程,因为您需要确保在启动备份之前停止多个进程的派生进程。
Workers.py:
'''Usage : python celery_service.py install (start / stop / remove)
Run celery as a Windows service
'''
import win32service
import win32serviceutil
import win32api
import win32con
import win32event
import subprocess
import sys
import os
import shlex
import logging
import time
# The directory for celery_worker.log and celery_worker_service.log
# Default: the directory of this script
INSTDIR = 'X:ApplicationProject'
LOGDIR = 'X:ApplicationLogFiles'
# The path of python Scripts
# Usually it is in PYTHON_INSTALL_DIR/Scripts. e.g.
# r'C:Python27Scripts'
# If it is already in system PATH, then it can be set as ''
PYTHONSCRIPTPATH = 'C:Python36Scripts'
# The directory name of django project
# Note: it is the directory at the same level of manage.py
# not the parent directory
PROJECTDIR = 'Project'
logging.basicConfig(
filename = os.path.join(LOGDIR, 'celery_worker_service.log'),
level = logging.DEBUG,
format = '[%(asctime)-15s: %(levelname)-7.7s] %(message)s'
)
class CeleryService(win32serviceutil.ServiceFramework):
_svc_name_ = "CeleryWorkers"
_svc_display_name_ = "CeleryWorkers"
def __init__(self, args):
win32serviceutil.ServiceFramework.__init__(self, args)
self.hWaitStop = win32event.CreateEvent(None, 0, 0, None)
def SvcStop(self):
logging.info('Stopping {name} service ...'.format(name=self._svc_name_))
self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
win32event.SetEvent(self.hWaitStop)
logging.info('Stopped1 {name} service ...'.format(name=self._svc_name_))
logging.info('Stopped3 {name} service ...'.format(name=self._svc_name_))
command = '"{celery_path}" -A {proj_dir} --workdir=X:/Application/Project control shutdown --timeout=10'.format(
celery_path=os.path.join(PYTHONSCRIPTPATH, 'celery.exe'),
proj_dir=PROJECTDIR,
log_path=os.path.join(LOGDIR,'celery_worker.log'))
logging.info('command: ' + command)
args = shlex.split(command)
proc = subprocess.Popen(args)
logging.info('Stopped celery shutdown ...')
self.ReportServiceStatus(win32service.SERVICE_STOPPED)
logging.info('Stopped2 {name} service ...'.format(name=self._svc_name_))
sys.exit()
def SvcDoRun(self):
logging.info('Starting {name} service ...'.format(name=self._svc_name_))
os.chdir(INSTDIR) # so that proj worker can be found
logging.info('cwd: ' + os.getcwd())
self.ReportServiceStatus(win32service.SERVICE_RUNNING)
command = '"{celery_path}" -A {proj_dir} -c 8 worker --workdir=X:/Application/Project --pidfile=celeryservice.pid -f "{log_path}" -l info'.format(
celery_path=os.path.join(PYTHONSCRIPTPATH, 'celery.exe'),
proj_dir=PROJECTDIR,
log_path=os.path.join(LOGDIR,'celery_worker.log'))
logging.info('command: ' + command)
args = shlex.split(command)
proc = subprocess.Popen(args)
logging.info('pid: {pid}'.format(pid=proc.pid))
self.timeout = 3000
while True:
rc = win32event.WaitForSingleObject(self.hWaitStop, self.timeout)
if rc == win32event.WAIT_OBJECT_0:
# stop signal encountered
# terminate process 'proc'
PROCESS_TERMINATE = 1
handle = win32api.OpenProcess(PROCESS_TERMINATE, False, proc.pid)
win32api.TerminateProcess(handle, -1)
win32api.CloseHandle(handle)
break
if __name__ == '__main__':
win32serviceutil.HandleCommandLine(CeleryService)
Beatservice.py:
'''Usage : python celery_service.py install (start / stop / remove)
Run celery as a Windows service
'''
import win32service
import win32serviceutil
import win32api
import win32con
import win32event
import subprocess
import sys
import os
import shlex
import logging
import time
import signal
# The directory for celery_beat.log and celery_beat_service.log
# Default: the directory of this script
INSTDIR = os.path.dirname(os.path.realpath(__file__))
LOGPATH = 'X:ApplicationLogs'
# The path of python Scripts
# Usually it is in PYTHON_INSTALL_DIR/Scripts. e.g.
# r'C:Python27Scripts'
# If it is already in system PATH, then it can be set as ''
PYTHONSCRIPTPATH = 'C:Python36Scripts'
# The directory name of django project
# Note: it is the directory at the same level of manage.py
# not the parent directory
PROJECTDIR = 'PROJECT'
logging.basicConfig(
filename = os.path.join(LOGPATH, 'celery_beat_service.log'),
level = logging.DEBUG,
format = '[%(asctime)-15s: %(levelname)-7.7s] %(message)s'
)
class CeleryService(win32serviceutil.ServiceFramework):
_svc_name_ = "CeleryBeat"
_svc_display_name_ = "CeleryBeat"
def __init__(self, args):
win32serviceutil.ServiceFramework.__init__(self, args)
self.hWaitStop = win32event.CreateEvent(None, 0, 0, None)
def SvcStop(self):
logging.info('Stopping 1 {name} service ...'.format(name=self._svc_name_))
self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
win32event.SetEvent(self.hWaitStop)
pidno = open("X:AplicationProjectcelerybeat.pid", "r")
_pid_id_ = pidid=pidno.read()
pidno.close()
logging.info(_pid_id_)
logging.info('taskkill /F /PID {pidid} ..'.format(pidid=_pid_id_))
cmdcom = 'taskkill /F /PID {pidid}'.format(pidid=_pid_id_)
logging.info(cmdcom)
killargs = shlex.split(cmdcom)
process = subprocess.Popen(killargs)
output, error = process.communicate()
logging.info(output)
logging.info('Stopping 2 {name} service ...'.format(name=self._svc_name_))
os.remove("X:ApplicationPROJECTcelerybeat.pid")
logging.info('X:ApplicationPROJECTcelerybeat.pid file removed')
self.ReportServiceStatus(win32service.SERVICE_STOPPED)
sys.exit()
def SvcDoRun(self):
logging.info('Starting {name} service ...'.format(name=self._svc_name_))
os.chdir(INSTDIR) # so that proj worker can be found
logging.info('cwd: ' + os.getcwd())
self.ReportServiceStatus(win32service.SERVICE_RUNNING)
command = '"{celery_path}" -A {proj_dir} beat --workdir=X:/Application/Project -f X:/Application/logs/beat.log -l info'.format(
celery_path=os.path.join(PYTHONSCRIPTPATH, 'celery.exe'),
proj_dir=PROJECTDIR,
log_path=os.path.join(LOGPATH,'celery_beat.log'))
logging.info('command: ' + command)
args = shlex.split(command)
proc = subprocess.Popen(args)
logging.info('pid: {pid}'.format(pid=proc.pid))
self.timeout = 3000
while True:
rc = win32event.WaitForSingleObject(self.hWaitStop, self.timeout)
if rc == win32event.WAIT_OBJECT_0:
# stop signal encountered
# terminate process 'proc'
PROCESS_TERMINATE = 1
handle = win32api.OpenProcess(PROCESS_TERMINATE, False, proc.pid)
win32api.TerminateProcess(handle, -1)
win32api.CloseHandle(handle)
break
if __name__ == '__main__':
win32serviceutil.HandleCommandLine(CeleryService)