芹菜和定制消费者



据我所知,Celery既是消息的生产者,也是消息的消费者。这不是我想要实现的。我希望Celery只充当消费者,根据我发送给我选择的AMQP代理的消息来启动某些任务。这可能吗?

或者我需要在我的菜堆里加胡萝卜来做汤吗?

Celery代理充当消息存储,并将它们发布给订阅这些消息的一个或多个工作人员

所以:cerele通过django-db将消息推送到broker(rabbitmq、redist、cerele本身等)。这些消息由工作人员按照broker的协议检索,并存储它们(通常它们是持久的,但可能取决于您的broker),然后由您的工作人员执行。

任务结果在执行的辅助任务上可用,您可以配置将这些结果存储在何处,也可以使用此方法检索这些结果。

您可以通过将参数传递给"接收器函数"(您定义的任务,文档中有一些示例,通常您不想在这里传递重要信息(比如查询集),而只传递允许您在执行任务时检索所需信息的最小信息。

一个简单的例子可能是:

您注册了任务

@task
def add(x,x):
    return x+y

然后您从另一个模块调用:

from mytasks import add
metadata1 = 1
metadata2 = 2
myasyncresult = add.delay(1,2)
myasyncresult.get() == 3

编辑

在您编辑后,我看到您可能想要从除芹菜之外的其他来源构建消息,您可以在这里看到消息格式,它们默认为尊重该格式的pickle对象,所以您将这些消息发布在您的rabbitmqbroker的正确队列中,您可以从您的工作人员那里检索它们。

Celery使用消息代理体系结构模式。许多实现/代理传输可以与Celery一起使用,包括RabbitMQ和Django数据库。

来自维基百科:

消息代理是一种用于消息验证、消息转换和消息路由的体系结构模式。它调解了应用程序之间的通信,最大限度地减少了应用程序为了能够交换消息而应该相互了解的情况,从而有效地实现了解耦。

保留结果是可选的,并且需要一个结果后端。您可以使用不同的代理和结果后端。《芹菜入门》指南包含更多信息。

问题的答案是您可以在不添加Carrot的情况下通过参数来启动特定任务。

Celery Custom Consumer将在3.1v中发布,目前正在开发中,您可以阅读http://docs.celeryproject.org/en/master/userguide/extending.html关于它。

为了使用来自芹菜的消息,您需要创建芹菜可以使用的消息。您可以如下创建芹菜消息:-

def get_celery_worker_message(task_name,args,kwargs,routing_key,id,exchange=None,exchange_type=None):
    
    message=(args, kwargs, None)
    application_headers={
        'lang': 'py',
        'task': task_name,
        'id':id,
        'argsrepr': repr(args),
        'kwargsrepr': repr(kwargs)
        #, 'origin': '@'.join([os.getpid(), socket.gethostname()])
    }
    properties={
        'correlation_id':id,
        'content_type': 'application/json',
        'content_encoding': 'utf-8',
    }

    body, content_type, content_encoding = prepare(
            message, 'json', 'application/json', 'utf-8',None, application_headers)
    prep_message = prepare_message(body,None,content_type,content_encoding,application_headers,properties)
    
    inplace_augment_message(prep_message, exchange, exchange_type, routing_key,id)
    # dump_json = json.dumps(prep_message)
    # print(f"json encoder:- {dump_json}")
    return prep_message

您需要首先根据使用者定义serializer、content_type、content_encoding、compression和headers来准备消息。

def prepare( body, serializer=None, content_type=None,
                 content_encoding=None, compression=None, headers=None):
        # No content_type? Then we're serializing the data internally.
        if not content_type:
            serializer = serializer
            (content_type, content_encoding,
             body) = dumps(body, serializer=serializer)
        else:
            # If the programmer doesn't want us to serialize,
            # make sure content_encoding is set.
            if isinstance(body, str):
                if not content_encoding:
                    content_encoding = 'utf-8'
                body = body.encode(content_encoding)
            # If they passed in a string, we can't know anything
            # about it. So assume it's binary data.
            elif not content_encoding:
                content_encoding = 'binary'
        if compression:
            body, headers['compression'] = compress(body, compression)
        return body, content_type, content_encoding
def prepare_message( body, priority=None, content_type=None,
                        content_encoding=None, headers=None, properties=None):
        """Prepare message data."""
        properties = properties or {}
        properties.setdefault('delivery_info', {})
        properties.setdefault('priority', priority )
        return {'body': body,
                'content-encoding': content_encoding,
                'content-type': content_type,
                'headers': headers or {},
                'properties': properties or {}}

一旦创建了消息,您就需要添加参数,以便芹菜消费者能够阅读它。

def inplace_augment_message(message, exchange,exchange_type, routing_key,next_delivery_tag):
    body_encoding_64 = 'base64'
    message['body'], body_encoding = encode_body(
        str(json.dumps(message['body'])), body_encoding_64
    )
    props = message['properties']
    props.update(
        body_encoding=body_encoding,
        delivery_tag=next_delivery_tag,
    )
    if exchange and exchange_type:
        props['delivery_info'].update(
            exchange=exchange,
            exchange_type=exchange_type,
            routing_key=routing_key,
        )
    elif exchange:
        props['delivery_info'].update(
            exchange=exchange,
            routing_key=routing_key,
        )
    else:
        props['delivery_info'].update(
            exchange=None,
            routing_key=routing_key,
        )
class Base64:
    """Base64 codec."""
    def encode(self, s):
        return bytes_to_str(base64.b64encode(str_to_bytes(s)))
    def decode(self, s):
        return base64.b64decode(str_to_bytes(s))
def encode_body( body, encoding=None):
    codecs = {'base64': Base64()}
    if encoding:
        return codecs.get(encoding).encode(body), encoding
    return body, encoding

相关内容

  • 没有找到相关文章

最新更新