气流Python操作员返回类型



我的dag中有一个python操作员。Python可呼叫功能正在返回Bool值。但是,当我运行DAG时,我会收到以下错误。

typeError:'bool'对象不是可叫

我修改了该功能以返回,但我又一次获得以下错误

错误 - 'nontype'对象不可callable

下面是我的dag

def check_poke(threshold,sleep_interval):
flag=snowflake_poke(1000,10).poke()
#print(flag)
return flag
dependency = PythonOperator(
task_id='poke_check',
#python_callable=check_poke(129600,600),
provide_context=True,
python_callable=check_poke(129600,600),
dag=dag)
end = BatchEndOperator(
queue=QUEUE,
dag=dag)
start.set_downstream(dependency)
dependency.set_downstream(end)

无法弄清楚我缺少的是什么。有人可以帮助我解决这个问题。

我在DAG中编辑了python操作员,如下

dependency = PythonOperator(
task_id='poke_check',
provide_context=True,
python_callable=check_poke(129600,600),
dag=dag)

但是现在,我有一个不同的错误。

Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1245, in run
    result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/dist-packages/airflow/operators/python_operator.py", line 66, in execute
    return_value = self.python_callable(*self.op_args, **self.op_kwargs)
TypeError: () takes no arguments (25 given)
[2019-02-15 05:30:25,375] {models.py:1298} INFO - Marking task as UP_FOR_RETRY
[2019-02-15 05:30:25,393] {models.py:1327} ERROR - () takes no arguments (25 given)

参数名称将其放弃。您正在传递通话的结果,而不是可呼叫。

python_callable=check_poke(129600,600)

第二个错误指出,可呼叫带有25个参数。因此,lambda:无法正常工作。以下是有效的,但是忽略25个论点确实值得怀疑。

python_callable=lambda *args, **kwargs: check_poke(129600,600)

对于 @dan d。对于问题而言;但这令人困惑为什么他的解决方案不起作用(当然可以在python shell 中起作用)

看看这是否找到了任何运气(它只是 @dan d。的解决方案的简称变体)

from typing import Callable
# your original check_poke function
def check_poke(arg_1: int, arg_2: int) -> bool:
    # do something
    # somehow returns a bool
    return arg_1 < arg_2
# a function that returns a callable, that in turn invokes check_poke
# with the supplied params
def check_poke_wrapper_creator(arg_1: int, arg_2: int) -> Callable[[], bool]:
    def check_poke_wrapper() -> bool:
        return check_poke(arg_1=arg_1, arg_2=arg_2)
    return check_poke_wrapper
..
# usage
python_callable=check_poke_wrapper_creator(129600, 600)

代码期望可召唤,而不是结果(如已经指出的那样)。
您可以使用functool.Sartial来填写参数:

from functools import partial
def check_poke(threshold,sleep_interval):
   flag=snowflake_poke(1000,10).poke()
   return flag
func = partial(check_poke, 129600, 600)
dependency = PythonOperator(
    task_id='poke_check',
    provide_context=True,
    python_callable=func,
    dag=dag)

最新更新