我需要通过ftp下载一个文件,修改后再上传回来。我正在使用芹菜来做到这一点,但我在尝试使用链接时遇到了问题,我得到:
TypeError: upload_ftp_image()取5个参数
另外,我可以使用链并确保步骤是连续的吗?如果不是,还有什么替代方案?
res = chain(download_ftp_image.s(server, username , password, "/test_app_2/model.dae" ,"tmp/test_app_2/"), upload_ftp_image.s(server, username , password, "tmp/test_app_2/model.dae" ,"tmp/test_app_2/")).apply_async()
print res.get()
任务:
@task()
def download_ftp_image(ftp_server, username , password , filename, directory):
try:
ftp = FTP(ftp_server)
ftp.login(username, password)
if not os.path.exists(directory):
os.makedirs(directory)
ftp.retrbinary("RETR /default_app/model.dae" , open(directory + 'model.dae', 'wb').write)
else:
ftp.retrbinary("RETR /default_app/model.dae" , open(directory + 'model.dae', 'wb').write)
ftp.quit()
except error_perm, resp:
raise download_ftp_image.retry(countdown=15)
return "SUCCESS: "
@task()
def upload_ftp_image(ftp_server, username , password , file , directory):
try:
ftp = FTP(ftp_server)
ftp.login(username, password)
new_file= file.replace(directory, "")
directory = directory.replace("tmp","")
try:
ftp.storbinary("STOR " + directory + new_file , open(file, "rb"))
except:
ftp.mkd(directory)
ftp.storbinary("STOR " + directory + new_file, open(file, "rb"))
ftp.quit()
except error_perm, resp:
raise upload_ftp_image.retry(countdown=15)
return "SUCCESS: "
,这是一个好还是坏的做法,为我的具体情况?:
result = download_ftp_image.apply_async((server, username , password, "/test_app_2/model.dae" ,"tmp/test_app_2/",), queue='rep_data')
result.get()
result = upload_ftp_image.apply_async((server, username , password, "tmp/test_app_2/model.dae" ,"tmp/test_app_2/",), queue='rep_data')
#result.get()
如果您不希望使用前一个任务的返回值作为参数,另一个选项是使用'不变性'。
http://docs.celeryproject.org/en/latest/userguide/canvas.html不变性
而不是将子任务定义为:
download_ftp_image.s(...) and upload_ftp_image.s(...)
定义为:
download_ftp_image.si(...) and upload_ftp_image.si(...)
您现在可以在一个链中使用具有通常数量的参数的任务。
始终将前一个结果作为第一个参数传递给链。从链文档:
链接的任务将应用其父任务的结果作为第一个参数,在上面的情况下,结果将是
mul(4, 16)
,因为结果是4。
你的upload_ftp_image
任务不接受这个额外的参数,因此它失败了。
你有一个很好的用例链接;第二个任务是保证在完成第一个任务后被调用(否则无论如何都不能传递结果)。
只需为前一个任务的结果添加一个参数:
def upload_ftp_image(download_result, ftp_server, username , password , file , directory):
你可以利用这个结果值;也许让下载方法返回下载文件的路径这样上传方法就知道要上传什么了?