Composer/Airflow在可抢占的虚拟机上运行Dataflow作业时遇到困难



我在GCP中有一个Composer/Airflow实例(版本Composer-1.12.2-Airflow 1.10.10(,它运行其当前作业是可以接受的(大约40个DAG,每个DAG中有6个Dataflow作业,还有18个其他较短的任务(。数据流作业是从数据流模板(普通模板,而不是弹性模板(运行的,通常需要大约20分钟才能完成,部分是并行的。我用python DataflowTemplatedJobStartOperator 启动它们

如果我使用Dataflow选项--flexRSGoal=COST_OPTIMIZED,实例就会阻塞。在高CPU使用率的情况下,它最多能在前20分钟内安排每分钟一个数据流作业。这会导致任务不断累积,使其速度减慢,直到几乎停止调度。

flexRSGoal设置是工作设置和有问题设置之间的唯一区别。

我预计DataflowTemplatedJobStartOperator不能正确地支持Dataflow作业处于状态"的情况;排队的";在它们开始之前的一段时间内,或者我需要设置/调整一些其他参数来实现这一点。有人有主意吗?非常感谢。

Airflow版本1.10.0未将QUEUED识别为Dataflow的可能状态。您可以通过遵循模板操作符的执行路径来看到这一点:

  • 这里调用下划线DataflowHook来启动模板
  • CCD_ 3方法调用一个";私人的";_start_template_dataflow()方法
  • _start_template_dataflow()中,有一个对_DataflowJob类中wait_for_done()方法的调用
  • 最后,在wait_for_done()方法中,我们可以看到有一个if/else块处理预期的作业状态,其中不考虑QUEUED

Composer支持的其他Airflow版本也是如此,即1.10.6、1.10.9,甚至是最新的1.10.12。

作为一种变通方法,我的建议是使用monkey补丁来处理QUEUED状态。例如,您可以将以下代码添加到DAG文件中,以在运行时替换_DataflowJob中的wait_for_done()方法:

from airflow.contrib.hooks.gcp_dataflow_hook import _DataflowJob
import time
def wait_for_done(self):
while True:
if self._job and 'currentState' in self._job:
if 'JOB_STATE_DONE' == self._job['currentState']:
return True
elif 'JOB_STATE_RUNNING' == self._job['currentState'] and 
'JOB_TYPE_STREAMING' == self._job['type']:
return True
elif 'JOB_STATE_FAILED' == self._job['currentState']:
raise Exception("Google Cloud Dataflow job {} has failed.".format(
self._job['name']))
elif 'JOB_STATE_CANCELLED' == self._job['currentState']:
raise Exception("Google Cloud Dataflow job {} was cancelled.".format(
self._job['name']))
elif 'JOB_STATE_RUNNING' == self._job['currentState']:
time.sleep(self._poll_sleep)
elif 'JOB_STATE_PENDING' == self._job['currentState']:
time.sleep(15)
elif 'JOB_STATE_QUEUED' == self._job['currentState']:
# Uncomment here the behavior desired
# time.sleep(15) # As if QUEUED was a PENDING state
# time.sleep(self._poll_sleep) # As if QUEUED was a RUNNING state
# return True # As if QUEUED was a final state
else:
self.log.debug(str(self._job))
raise Exception(
"Google Cloud Dataflow job {} was unknown state: {}".format(
self._job['name'], self._job['currentState']))
else:
time.sleep(15)
self._job = self._get_job()
_DataflowJob.wait_for_done = wait_for_done

与原始代码的区别在于elif语句,我们在其中查找QUEUED状态:

elif 'JOB_STATE_QUEUED' == self._job['currentState']:
# Uncomment here the behavior desired
# time.sleep(15) # As if QUEUED was a PENDING state
# time.sleep(self._poll_sleep) # As if QUEUED was a RUNNING state
# return True # As if QUEUED was a final state

注意,我留下了三种最自然的处理这种状态的方法,即睡眠(15秒或poll_sleep时间(以等待作业执行和完成,或者简单地返回true而不等待执行。您可以取消对要执行的行的注释,甚至可以在此处添加自己的逻辑。

最新更新