我想定时调度一个把 Spark 数据写入 PostgreSQL 的任务,代码附在下面。问题是我的 Airflow 任务实例会针对过去的每个小时被触发(回填)。我该怎么做才能让它只按当前小时运行,并且每个 DAG 同时只有一个任务实例在运行?
# Read the source table from PostgreSQL over JDBC.
# NOTE(review): the original chained .option() calls across bare newlines,
# which is a Python syntax error — the chain must be wrapped in parentheses.
# SECURITY: credentials were hard-coded here; reuse the module-level
# USER/PW/DRIVER constants, as the write path below already does.
df = (
    spark.read
    .format("jdbc")
    .option("url", URL)
    .option("dbtable", "tagpool_with_tag_raw")
    .option("user", USER)
    .option("password", PW)
    .option("driver", DRIVER)
    .load()
)

# Current UTC time rounded to the nearest whole minute, as a Unix epoch.
# datetime.now(timezone.utc) is already timezone-aware, so the original's
# extra replace(tzinfo=timezone.utc) was redundant.
utc_timestamp = datetime.datetime.now(timezone.utc).timestamp()
epoch = round(utc_timestamp / 60) * 60
print("epoch ", epoch)

df.createOrReplaceTempView("tagpool_with_tag_raw")

# Debug: dump the whole temp view (kept from the original for inspection).
x = spark.sql("""select * from tagpool_with_tag_raw""")
x.show()

# Keep only the rows whose input_time matches the rounded epoch.
# Built with str.format instead of string concatenation; epoch is an int
# computed locally, so no injection risk here.
query = spark.sql(
    "select * from tagpool_with_tag_raw WHERE input_time = {}".format(epoch)
)
query.show()
df.printSchema()

# Append the filtered rows to the target table over JDBC.
(
    query.write
    .format("jdbc")
    .option("url", URL)
    .option("dbtable", "tagpool_tag_raw")
    .option("user", USER)
    .option("password", PW)
    .option("driver", DRIVER)
    .mode("append")
    .save()
)
我的DAG如下:
# Default arguments shared by all tasks in the DAG.
default_args = {
    'owner': 'lns',
    'depends_on_past': True,
    # NOTE(review): a start_date computed from "now" is an Airflow
    # anti-pattern — it moves on every scheduler parse. Prefer a fixed
    # date, e.g. datetime(2022, 1, 1); kept as-is pending confirmation.
    "start_date": datetime(now.year, now.month, now.day),
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 0,
}

# Hourly batch-ingestion DAG that submits the Spark job.
with DAG(dag_id='kafka_stream',
         default_args=default_args,
         schedule_interval='@hourly',
         max_active_runs=1,   # at most one concurrent run of this DAG
         catchup=False,       # don't backfill every past hourly interval
         ) as dag:
    SparkSubmitOperator(task_id='batch_ingestion',
                        conn_id='spark_default',
                        # plain string: the original f-string had no placeholders
                        application='/usr/local/spark/app/demo.py',
                        conf={"spark.master": spark_master},
                        total_executor_cores=1,
                        packages="org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.4",
                        executor_cores=2,
                        executor_memory='6g',
                        name='data_processor',
                        jars=postgres_driver_jar,
                        driver_class_path=postgres_driver_jar,
                        )
我最终通过设置 catchup=False 解决了这个问题:
# Fixed DAG: catchup=False stops Airflow from backfilling a run for every
# hourly interval between start_date and now, so only the current interval
# is scheduled; max_active_runs=1 guarantees a single concurrent DAG run.
DAG(dag_id='kafka_stream',
default_args=default_args,
schedule_interval= '@hourly',
max_active_runs=1,
catchup = False
)