我正在尝试将函数参数从烧瓶路由中的函数调用传递到芹菜任务。我尝试了很多变量,但我似乎无法弄清楚。呸。我确定我只是错过了一些明显的东西。这是我正在尝试的一项非常简单的任务。不疯。我只想在后台调用一个函数并重定向到新页面。下面的代码
烧瓶路线:
@application.route("/vrops-connect", methods=["GET","POST"])
def vrops_connect():
if request.method == "POST":
vropshost = request.form.get('vropshost')
vropsuser = request.form.get('vropsuser')
vropspass = request.form.get('vropspass')
customer_id = request.form.get('customer_id')
# test_call()
task = call_vrops_connect.delay(vropshost, vropsuser, vropspass, customer_id)
return redirect('get-json')
芹菜任务:
@celery.task(name='call.vrops.connect')
def call_vrops_connect(vropshost, vropsuser, vropspass, customer_id):
test_connect = 'call vrops connect was called '
with open('test_connect.log', 'a') as j:
j.write(test_connect)
vhost = vropshost
vuser = vropsuser
vpass = vropspass
cust_id = customer_id
pull_data_from_vrops(vhost, vuser, vpass, cust_id)
我有一个没有参数的 test_call() 工作得很好。当我通过参数时,什么都没有被调用。我有一种笨拙的方式来创建一个日志进行测试。
我已经尝试了几种延迟和apply_async的变化。
编辑:
这些是我迄今为止尝试过的一些变体。
test_call() # This works
call_vrops_connect.delay(vropshost, vropsuser, vropspass, customer_id) # This doesn't
task = call_vrops_connect.delay(vropshost, vropsuser, vropspass, customer_id) # This doesn't
call_vrops_connect.apply_async(args=[vropshost, vropsuser, vropspass, customer_id]) # This doesn't
call_vrops_connect.apply_async(kwargs={'vropshost': vropshost, 'vropsuser': vropsuser, 'vropspass': vropspass, 'customer_id': customer_id}) # This doesn't
task = call_vrops_connect.apply_async(args=[vropshost, vropsuser, vropspass, customer_id]) # This doesn't
task = call_vrops_connect.apply_async(kwargs={'vropshost': vropshost, 'vropsuser': vropsuser, 'vropspass': vropspass, 'customer_id': customer_id}) # This doesn't
我想我找到了你要找的东西。
delay() 方法很方便,因为它看起来像调用一个常规函数:
task.delay(arg1, arg2, kwarg1='x', kwarg2='y')
使用 apply_async() 代替,您必须编写:
task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})
在这里我找到了