我正在处理气流,我试图将数据从mysql数据库传输到csv文件。以下是代码和函数
from airflow import DAG
from datetime import datetime,timedelta
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.mysql_operator import MySqlOperator
from dbextract import extract_data
from dbpost_processing import dbpost_process
default_args = {"owner":"airflow","start_date":datetime(2021,7,10)}
with DAG(dag_id="dbworkflow2",default_args=default_args,schedule_interval=None) as dag:
extract = MySqlOperator(
task_id='extract',
mysql_conn_id="mysql_db1",
sql = extract_data
)
dbpost_process = PythonOperator(
task_id = "dbpost_process",
python_callable = dbpost_process
)
extract >> dbpost_process
import pandas as pd
def extract_data():
df=pd.read_sql('SELECT * FROM new_table', mysql_conn_id)
import pandas as pd
def dbpost_process():
df.to_csv('~/op_files/sample3.csv', index=False)
在extract_data步骤中获取函数对象不可迭代的错误
MySQL文件中的sql不是可调用的,它应该是字符串或字符串列表。您还试图在任务之间传递熊猫数据框,但它不会工作,因为任务可能(很可能)在不同的进程中运行在不同的机器上。
任务之间数据交换的方式通常是通过XCom(对于少量数据应该通过DB,对于大量数据可以添加自定义XCom后端并通过S3或GCS传递数据)。
然而,在您的情况下,您不需要有两个单独的任务/操作符。相反,您应该在Python操作符中使用MySQL Hook来读取数据并在相同的任务中处理它。将该任务拆分为两个独立的任务是没有意义的——MySQL操作符实际上是执行DDL或DML操作,而不是提取数据(正是因为气流操作符是隔离工作的)
Airflow采用Hooks的概念,提供API,您可以使用该API在同一个Python Operator可调用对象中运行查询和处理数据。即使是最近,它也可以使用@task装饰器来完成,所以它非常简单,更容易编写-特别是如果你习惯编写函数式Python。
见https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html
但是在你的情况下,你甚至不需要这样做,因为你想使用pandas
与MySQL数据库通信,所以你甚至不需要使用钩子。这样就足够了。在这种情况下不应该使用conn_id,但需要在那里传递一个SQL-Alchemy兼容的连接字符串。不确定是否气流连接url将工作,但如果它是,那么你可以使用Connection.get_uri()
(也许你将不得不适应的URI一点)。
类似的东西(这是一个灵感,不是可编译的代码,所以你需要解决的细节)应该工作:
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2))
def my_dag():
@task()
def my_extraction():
df = pd.read_sql('SELECT *', Connnection.get("my_connection_id").get_uri())
file = post_process(df)
现在-你将不得不对该文件做一些事情,因为一旦任务完成,本地数据将不可用(除非你使用LocalExecutor) -你可以将CSV发送到某个地方(为此你可以使用任何钩子-例如S3Hook)。