python call ansibleapi with celery返回无,我已经搜索了几天。它可以与无芹菜的呼叫部署功能搭配,但是使用芹菜,我的代码我的代码呼叫ansibleapi and ansibleapi none none。
复制步骤。
1.tasks.py
from celery import shared_task
from .deploy_tomcat2 import django_process
@shared_task
def deploy(jira_num):
#return 'hello world {0}'.format(jira_num)
#rdb.set_trace()
return django_process(jira_num)
2.Deploy_tomcat2.py
from .AnsibleApi import CallApi
def django_process(jira_num):
server = '10.10.10.30'
name = 'abc'
port = 11011
code = 'efs'
jdk = '1.12.13'
jvm = 'xxxx'
if str.isdigit(jira_num):
# import pdb
# pdb.set_trace()
call = CallApi(server,name,port,code,jdk,jvm)
return call.run_task()
3.Ansibleapi.py
#!/usr/bin/env python
import logging
from .Logger import Logger
from django.conf import settings
from collections import namedtuple
from ansible.parsing.dataloader import DataLoader
from ansible.vars import VariableManager
from ansible.inventory import Inventory
from ansible.playbook.play import Play
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.plugins.callback import CallbackBase
Log = Logger('/tmp/auto_deploy_tomcat.log',logging.INFO)
class ResultCallback(CallbackBase):
def __init__(self, *args, **kwargs):
super(ResultCallback ,self).__init__(*args, **kwargs)
self.host_ok = {}
self.host_unreachable = {}
self.host_failed = {}
def v2_runner_on_unreachable(self, result):
self.host_unreachable[result._host.get_name()] = result
def v2_runner_on_ok(self, result, *args, **kwargs):
self.host_ok[result._host.get_name()] = result
def v2_runner_on_failed(self, result, *args, **kwargs):
self.host_failed[result._host.get_name()] = result
class CallApi(object):
user = settings.SSH_USER
ssh_private_key_file = settings.SSH_PRIVATE_KEY_FILE
results_callback = ResultCallback()
Options = namedtuple('Options',
['connection', 'module_path', 'private_key_file', 'forks', 'become', 'become_method',
'become_user', 'check'])
def __init__(self,ip,name,port,code,jdk,jvm):
self.ip = ip
self.name = name
self.port = port
self.code = code
self.jdk = jdk
self.jvm = jvm
self.results_callback = ResultCallback()
self.results_raw = {}
def _gen_user_task(self):
tasks = []
deploy_script = 'autodeploy/tomcat_deploy.sh'
dst_script = '/tmp/tomcat_deploy.sh'
cargs = dict(src=deploy_script, dest=dst_script, owner=self.user, group=self.user, mode='0755')
args = "%s %s %d %s %s '%s'" % (dst_script, self.name, self.port, self.code, self.jdk, self.jvm)
tasks.append(dict(action=dict(module='copy', args=cargs),register='shell_out'))
tasks.append(dict(action=dict(module='debug', args=dict(msg='{{shell_out}}'))))
# tasks.append(dict(action=dict(module='command', args=args)))
# tasks.append(dict(action=dict(module='command', args=args), register='result'))
# tasks.append(dict(action=dict(module='debug', args=dict(msg='{{result.stdout}}'))))
self.tasks = tasks
def _set_option(self):
self._gen_user_task()
self.variable_manager = VariableManager()
self.loader = DataLoader()
self.options = self.Options(connection='smart', module_path=None, private_key_file=self.ssh_private_key_file, forks=None,
become=True, become_method='sudo', become_user='root', check=False)
self.inventory = Inventory(loader=self.loader, variable_manager=self.variable_manager, host_list=[self.ip])
self.variable_manager.set_inventory(self.inventory)
play_source = dict(
name = "auto deploy tomcat",
hosts = self.ip,
remote_user = self.user,
gather_facts='no',
tasks = self.tasks
)
self.play = Play().load(play_source, variable_manager=self.variable_manager, loader=self.loader)
def run_task(self):
self.results_raw = {'success':{}, 'failed':{}, 'unreachable':{}}
tqm = None
from celery.contrib import rdb;rdb.set_trace()
#import pdb;pdb.set_trace()
self._set_option()
try:
tqm = TaskQueueManager(
inventory=self.inventory,
variable_manager=self.variable_manager,
loader=self.loader,
options=self.options,
passwords=None,
stdout_callback=self.results_callback,
)
result = tqm.run(self.play)
finally:
if tqm is not None:
tqm.cleanup()
for host, result in self.results_callback.host_ok.items():
self.results_raw['success'][host] = result._result
for host, result in self.results_callback.host_failed.items():
self.results_raw['failed'][host] = result._result
for host, result in self.results_callback.host_unreachable.items():
self.results_raw['unreachable'][host]= result._result
Log.info("result is :%s" % self.results_raw)
return self.results_raw
4.celery Worker
celery -A jira worker -Q queue.ops.deploy -n "deploy.%h" -l info
5.生产味精:
deploy.apply_async(args=['150'], queue='queue.ops.deploy', routing_key='ops.deploy')
似乎还可以。
唯一的问题是None
确实是deploy
任务返回吗?
更好的是,如果您可以发布芹菜工人日志。
有两种解决此问题的方法,请禁用:1.芹菜开始设置导出pythonoptimize = 1或使用此参数-O优化启动芹菜2.可见的python数据包多处理过程。
assert not _current_process._config.get('daemon'),
'daemonic processes are not allowed to have children'