Apache Airflow - HttpHook 与直接 Python 的请求库使用及其与 DAG 刷新的关系



我最近作为DAG开发人员加入了一个团队。我可以看到,我们目前直接使用Python的请求,而不是在代码中使用HttpHooks。我们创建一个请求。因为min_file_process_interval默认设置为30秒,所以会话每30秒重新创建一次,这没有多大意义。

在这种情况下使用HttpHook会有帮助吗?在DAG刷新过程中是否遗漏了钩子?它们还创建一个请求。

同样,我们调用的api需要一个访问令牌,该令牌在一段时间后过期。目前,我们每次进行API调用时都会获取一个新的访问令牌,但最好仅在前一个已过期时获取令牌。但是,dag每30秒刷新一次。那么,如何防止在刷新dag时清除令牌呢?

令牌检索和请求。会话对象的创建是在utils.py模块中完成的,该模块用作气流dag中的插件。

您可以使用HttpHook,也可以直接使用request。两者都很好,由你决定。一般来说,使用HttpHook应该会使您的工作更轻松(您也可以子类化并增强它)。在任何情况下,您应该使用PythonOperator内部的代码,而不是作为顶级代码,因此min_file_process_interval是不相关的。

用例子解释

OK to do

def func():
HttpHook(...).run(...) # or requests.get(...)

with DAG('my_dag', default_args=default_args, catchup=False, schedule=None):
PythonOperator(
task_id='places',
python_callable=func,
)

在这个例子中,HttpHook(或requests.get)将只在操作符运行时被调用。

不做:

with DAG('my_dag', default_args=default_args, catchup=False, schedule=None):

HttpHook(...).run(...)  # or requests.get(...) 

在本例中,每次解析DAG (min_file_process_interval)时都调用HttpHook(或requests.get),这意味着每30秒调用一次终点。