Schedule a Spark job that inserts data into PostgresDB in an Airflow environment



I want to schedule a Spark write operation to PostgresDB. I have attached my code below. My Airflow task instances are being triggered for past hours. What can I do so that the job runs once per hour, with only one task instance running per DAG?

import datetime
from datetime import timezone

df = spark.read \
    .format("jdbc") \
    .option("url", URL) \
    .option("dbtable", "tagpool_with_tag_raw") \
    .option("user", "tsdbadmin") \
    .option("password", "cgqu5zy3i1") \
    .option("driver", "org.postgresql.Driver") \
    .load()
# Getting the current date and time
dt = datetime.datetime.now(timezone.utc)
utc_time = dt.replace(tzinfo=timezone.utc)
utc_timestamp = utc_time.timestamp()
epoch = round(utc_timestamp / 60) * 60
# epoch = epoch-3600
print("epoch ", epoch)
df.createOrReplaceTempView("tagpool_with_tag_raw")
x = spark.sql("""select *  from tagpool_with_tag_raw""")
x.show()
query = spark.sql("select *  from tagpool_with_tag_raw WHERE input_time = " + str(epoch))  # .format()
# query = spark.sql("select CAST(input_time AS bigint), CAST(orig_time AS bigint) ,  from tagpool_with_tag_raw WHERE input_time = "+ epoch) #.format()
query.show()
# df.selectExpr(("SELECT * FROM public.tagpool_raw WHERE input_time<= %s".format(epoch)))
df.printSchema()
query.write \
    .format("jdbc") \
    .option("url", URL) \
    .option("dbtable", "tagpool_tag_raw") \
    .option("user", USER) \
    .option("password", PW) \
    .option("driver", DRIVER) \
    .save(mode='append')

My DAG is as follows:

default_args = {
    'owner': 'lns',
    'depends_on_past': True,
    'start_date': datetime(now.year, now.month, now.day),
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 0}

with DAG(dag_id='kafka_stream',
         default_args=default_args,
         schedule_interval='@hourly',
         max_active_runs=1,
         ) as dag:
    SparkSubmitOperator(task_id='batch_ingestion',
                        conn_id='spark_default',
                        application='/usr/local/spark/app/demo.py',
                        conf={"spark.master": spark_master},
                        total_executor_cores=1,
                        packages="org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.4",
                        executor_cores=2,
                        executor_memory='6g',
                        name='data_processor',
                        jars=postgres_driver_jar,
                        driver_class_path=postgres_driver_jar)

I solved this by setting catchup=False:

DAG(dag_id='kafka_stream',
    default_args=default_args,
    schedule_interval='@hourly',
    max_active_runs=1,
    catchup=False
    )
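Why this works: with the default catchup=True and a start_date in the past, the scheduler creates one run for every missed hourly interval, which is why task instances kept firing for past hours. catchup=False skips that backlog and only schedules the most recent interval, while max_active_runs=1 caps the DAG at one concurrent run. Here is a consolidated sketch, assuming a recent Airflow with the Spark provider installed; the import path and the static start_date value are assumptions (Airflow's docs recommend a fixed start_date over one derived from now()).

from datetime import datetime
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

default_args = {
    'owner': 'lns',
    'depends_on_past': True,
    'start_date': datetime(2022, 1, 1),  # fixed date instead of now()-based
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 0}

with DAG(dag_id='kafka_stream',
         default_args=default_args,
         schedule_interval='@hourly',
         max_active_runs=1,   # at most one run in flight
         catchup=False,       # do not backfill missed intervals
         ) as dag:
    SparkSubmitOperator(task_id='batch_ingestion',
                        conn_id='spark_default',
                        application='/usr/local/spark/app/demo.py',
                        name='data_processor')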
