我读到关于使用带触发器的流式处理(https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html)。我使用Azure事件中心作为源,每个触发器有10000个项目的限制,我必须处理几百万个事件。这将导致只处理10.000个项目,并且当您启用Trigger.Once时,流将停止。有人知道如何使用流和Trigger做到这一点吗?类似于一次性功能,当流为空或处理到设定时间时停止?
最近有机会实现类似的东西:
我们的需要是
- Databricks每小时启动一次作业并从上次运行的eventhub偏移量中读取一次
- 将数据持久化到位置
- 结束作业,集群返回空闲/终止模式
我们就是这样做到的:
设置eventhub-conf:
val eventHubsConf: EventHubsConf = EventHubsConf(connStr).setMaxEventsPerTrigger(<someNumber>).setStartingPosition(EventPosition.fromOffset("@latest"))
从事件中心读取:
val incomingStream = spark.readStream.format("<your_evenhub_name>").options(eventHubsConf.toMap).load()
使用触发器写入。一次
incomingStream.writeStream
.format("parquet").outputMode("append").trigger(Trigger.Once())
.partitionBy("<yourpartitionColumn>").option("truncate", false)
.option("checkpointLocation", "<yourcheckpointlocation>")
.option("path", "<youroutputdataDir>")
.start()