我最近作为DAG开发人员加入了一个团队。我可以看到,我们目前直接使用Python的请求,而不是在代码中使用HttpHooks。我们创建一个请求。因为min_file_process_interval默认设置为30秒,所以会话每30秒重新创建一次,这没有多大意义。
在这种情况下使用HttpHook会有帮助吗?在DAG刷新过程中是否遗漏了钩子?它们还创建一个请求。
同样,我们调用的api需要一个访问令牌,该令牌在一段时间后过期。目前,我们每次进行API调用时都会获取一个新的访问令牌,但最好仅在前一个已过期时获取令牌。但是,dag每30秒刷新一次。那么,如何防止在刷新dag时清除令牌呢?
令牌检索和请求。会话对象的创建是在utils.py模块中完成的,该模块用作气流dag中的插件。
您可以使用HttpHook
,也可以直接使用request。两者都很好,由你决定。一般来说,使用HttpHook
应该会使您的工作更轻松(您也可以子类化并增强它)。在任何情况下,您应该使用PythonOperator
内部的代码,而不是作为顶级代码,因此min_file_process_interval
是不相关的。
用例子解释
OK to do
def func():
HttpHook(...).run(...) # or requests.get(...)
with DAG('my_dag', default_args=default_args, catchup=False, schedule=None):
PythonOperator(
task_id='places',
python_callable=func,
)
在这个例子中,HttpHook
(或requests.get
)将只在操作符运行时被调用。
不做:
with DAG('my_dag', default_args=default_args, catchup=False, schedule=None):
HttpHook(...).run(...) # or requests.get(...)
在本例中,每次解析DAG (min_file_process_interval
)时都调用HttpHook
(或requests.get
),这意味着每30秒调用一次终点。