Django + Celery 调用 Ansible API 返回 None



在 Python 中通过 Celery 调用 Ansible API 时返回 None,我已经搜索了好几天。不经过 Celery、直接调用 deploy 函数时一切正常;但通过 Celery 调用时,我的代码调用 Ansible API 得到的结果却是 None。

复现步骤:

1.tasks.py

from celery import shared_task
from .deploy_tomcat2 import django_process

@shared_task
def deploy(jira_num):
    """Celery task: run the deployment for the given JIRA ticket number.

    Delegates to ``django_process`` and returns its result unchanged as
    the task result.
    """
    outcome = django_process(jira_num)
    return outcome

2.deploy_tomcat2.py

from .AnsibleApi import CallApi
def django_process(jira_num):
    """Trigger the Tomcat deployment through ``CallApi`` for a ticket number.

    Args:
        jira_num: Ticket number as a string. The deployment only runs when
            the string consists solely of digits.

    Returns:
        The value returned by ``CallApi.run_task()`` when ``jira_num`` is
        all digits, otherwise ``None``.
    """
    # Guard clause: anything that is not a purely numeric string is ignored.
    if not str.isdigit(jira_num):
        return None

    # Fixed deployment parameters, in CallApi's positional order:
    # (server ip, name, port, code, jdk, jvm).
    deploy_args = ('10.10.10.30', 'abc', 11011, 'efs', '1.12.13', 'xxxx')
    return CallApi(*deploy_args).run_task()

3.AnsibleApi.py

#!/usr/bin/env python
import logging
from .Logger import Logger
from django.conf import settings
from collections import namedtuple
from ansible.parsing.dataloader import DataLoader
from ansible.vars import VariableManager
from ansible.inventory import Inventory
from ansible.playbook.play import Play
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.plugins.callback import CallbackBase
# Module-level logger shared by the code in this file. It is constructed with
# the log file path and logging.INFO (presumably the minimum level to record;
# confirm against the project's Logger class).
Log = Logger('/tmp/auto_deploy_tomcat.log',logging.INFO)

class ResultCallback(CallbackBase):
    """Ansible callback that remembers task results per host.

    Results are sorted into three dicts keyed by host name: ``host_ok``,
    ``host_failed`` and ``host_unreachable``. A later result for the same
    host in the same bucket overwrites the earlier one.
    """

    def __init__(self, *args, **kwargs):
        super(ResultCallback, self).__init__(*args, **kwargs)
        self.host_ok, self.host_unreachable, self.host_failed = {}, {}, {}

    def v2_runner_on_ok(self, result, *args, **kwargs):
        """Store the result of a task that succeeded on its host."""
        hostname = result._host.get_name()
        self.host_ok[hostname] = result

    def v2_runner_on_failed(self, result, *args, **kwargs):
        """Store the result of a task that failed on its host."""
        hostname = result._host.get_name()
        self.host_failed[hostname] = result

    def v2_runner_on_unreachable(self, result):
        """Store the result for a host that could not be reached."""
        hostname = result._host.get_name()
        self.host_unreachable[hostname] = result

class CallApi(object):
    """Run the Tomcat deploy play against a single host via the Ansible API.

    The play copies ``autodeploy/tomcat_deploy.sh`` to the target host and
    prints the registered copy result through a ``debug`` task. Per-host
    results are collected by a ``ResultCallback`` and returned by
    ``run_task`` as plain dicts.

    Class attributes:
        user: SSH / remote user, read from ``settings.SSH_USER``.
        ssh_private_key_file: Key path from ``settings.SSH_PRIVATE_KEY_FILE``.
        Options: namedtuple type used to build the Ansible options object.
    """

    user = settings.SSH_USER
    ssh_private_key_file = settings.SSH_PRIVATE_KEY_FILE
    # Class-level default kept so ``CallApi.results_callback`` still exists;
    # every instance replaces it with its own callback in ``__init__``.
    results_callback = ResultCallback()
    Options = namedtuple(
        'Options',
        ['connection', 'module_path', 'private_key_file', 'forks', 'become',
         'become_method', 'become_user', 'check'],
    )

    def __init__(self, ip, name, port, code, jdk, jvm):
        """Remember the target host and the Tomcat deployment parameters.

        Args:
            ip: Address of the host to deploy to; used as inventory and
                as the play's ``hosts`` pattern.
            name, port, code, jdk, jvm: Deployment parameters stored on the
                instance (not yet consumed by the generated tasks).
        """
        self.ip = ip
        self.name = name
        self.port = port
        self.code = code
        self.jdk = jdk
        self.jvm = jvm
        # Per-instance callback so results are never shared between calls
        # made through different CallApi objects.
        self.results_callback = ResultCallback()
        self.results_raw = {}

    def _gen_user_task(self):
        """Build the task list for the play and store it in ``self.tasks``."""
        deploy_script = 'autodeploy/tomcat_deploy.sh'
        dst_script = '/tmp/tomcat_deploy.sh'
        copy_args = dict(src=deploy_script, dest=dst_script,
                         owner=self.user, group=self.user, mode='0755')
        # NOTE: only the copy + debug tasks are queued. Running the copied
        # script (a ``command`` task taking name/port/code/jdk/jvm) is not
        # enabled yet.
        self.tasks = [
            dict(action=dict(module='copy', args=copy_args),
                 register='shell_out'),
            dict(action=dict(module='debug',
                             args=dict(msg='{{shell_out}}'))),
        ]

    def _set_option(self):
        """Create the Ansible objects (loader, inventory, options, play)."""
        self._gen_user_task()
        self.variable_manager = VariableManager()
        self.loader = DataLoader()
        self.options = self.Options(
            connection='smart',
            module_path=None,
            private_key_file=self.ssh_private_key_file,
            forks=None,
            become=True,
            become_method='sudo',
            become_user='root',
            check=False,
        )
        self.inventory = Inventory(
            loader=self.loader,
            variable_manager=self.variable_manager,
            host_list=[self.ip],
        )
        self.variable_manager.set_inventory(self.inventory)
        play_source = dict(
            name="auto deploy tomcat",
            hosts=self.ip,
            remote_user=self.user,
            gather_facts='no',
            tasks=self.tasks,
        )
        self.play = Play().load(
            play_source,
            variable_manager=self.variable_manager,
            loader=self.loader,
        )

    def run_task(self):
        """Execute the play and return the collected per-host results.

        Returns:
            dict: ``{'success': {...}, 'failed': {...}, 'unreachable': {...}}``
            where each inner dict maps a host name to that host's raw
            result dict as recorded by the callback.

        NOTE(review): when this runs inside a Celery prefork worker, the
        worker processes that Ansible starts may hit multiprocessing's
        "daemonic processes are not allowed to have children" assertion;
        confirm the worker setup if the result comes back empty.
        """
        self.results_raw = {'success': {}, 'failed': {}, 'unreachable': {}}
        tqm = None
        # No debugger breakpoint here: a remote set_trace() would block the
        # worker until someone attaches to it.
        self._set_option()
        try:
            tqm = TaskQueueManager(
                inventory=self.inventory,
                variable_manager=self.variable_manager,
                loader=self.loader,
                options=self.options,
                passwords=None,
                stdout_callback=self.results_callback,
            )
            # Outcomes are delivered through the callback; the return code
            # of run() is not used.
            tqm.run(self.play)
        finally:
            # Always release the queue manager's resources, even on error.
            if tqm is not None:
                tqm.cleanup()

        buckets = (
            ('success', self.results_callback.host_ok),
            ('failed', self.results_callback.host_failed),
            ('unreachable', self.results_callback.host_unreachable),
        )
        for status, per_host in buckets:
            for host, task_result in per_host.items():
                self.results_raw[status][host] = task_result._result

        Log.info("result is :%s" % self.results_raw)
        return self.results_raw

4.celery Worker

celery -A jira worker -Q queue.ops.deploy -n "deploy.%h" -l info

5.发送消息(produce msg):

deploy.apply_async(args=['150'], queue='queue.ops.deploy', routing_key='ops.deploy')

代码看起来没有问题。
唯一的疑问是:None 确实是 deploy 任务的返回值吗?
如果您能贴出 Celery worker 的日志就更好了。

有两种方法可以解决此问题,思路都是禁用下面这条断言:1. 在启动 Celery 之前设置 export PYTHONOPTIMIZE=1,或者使用 -O 优化参数启动 Celery;2. 禁用(注释掉)Python multiprocessing 包 process 模块中的如下断言。

assert not _current_process._config.get('daemon'), 
               'daemonic processes are not allowed to have children'

相关内容

  • 没有找到相关文章

最新更新