如何从非 Python 语言(如 Java)调用芹菜任务延迟函数



我已经在 3 台集群机器上设置了芹菜 + 兔子。我还创建了一个任务,该任务根据文件中的数据生成正则表达式,并使用这些信息来解析文本。

from celery import Celery
celery = Celery('tasks', broker='amqp://localhost//')
import re
@celery.task
def add(x, y):
     return x + y

def get_regular_expression():
    with open("text") as fp:
        data = fp.readlines()
    str_re = "|".join([x.split()[2] for x in data ])
    return str_re    

@celery.task
def analyse_json(tw):
    str_re = get_regular_expression()
    re.match(str_re,tw.text) 

我可以使用以下python代码非常轻松地调用此任务:-

from tasks import analyse_tweet_json
x = tweet ## load from a file (x is a json)
analyse_tweet_json.delay(x) 

但是,现在我想从Java而不是python进行相同的调用。我不确定做同样事情的最简单方法是什么。

我编写了此代码以向 AMQP 代理发送消息。代码运行良好,但任务未执行。我不确定如何指定应执行的任务的名称。

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
class try1 {
public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "celery", "celery");
    String messageBody = "{"text":"i am good"}" ;
    byte[] msgBytes = messageBody.getBytes("ASCII") ;
    channel.basicPublish(queueName, queueName,
            new AMQP.BasicProperties
            ("application/json", null, null, null,
                    null, null, null, null,
                    null, null, null, "guest",
                    null, null),messageBody.getBytes("ASCII")) ;
    connection.close();    

}}

这是 rabbitMq 错误日志中的输出:-

connection <0.14627.0>, channel 1 - error:
{amqp_error,not_found,
"no exchange 'amq.gen-gEV47GX9pF_oZ-0bEnOazE' in vhost '/'",
'basic.publish'}

任何帮助将不胜感激。

谢谢阿米特

有几个问题。

1) 字符串 queueName = channel.queueDeclare().getQueue() 命令返回错误的队列名称。我将队列名称更改为"芹菜",它工作正常。2)json的格式必须是这种类型的:- {"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77", "task": "celery.task.PingTask", "参数": [], "kwargs": {}, "重试次数":0, "eta": "2009-11-17T12:30:56.527191"}

如 http://docs.celeryproject.org/en/latest/internals/protocol.html 所见

在这两个更改之后,它工作正常。

-阿米特

celery 隐式声明交换,使用 Java 你必须自己声明一个交换。

参见 与 Java 中的 Django/Celery 互操作

相关内容

  • 没有找到相关文章

最新更新