spark结构化流媒体-触发器.使用Azure事件中心一次



我读到关于使用带触发器的流式处理(https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html)。我使用Azure事件中心作为源,每个触发器有10000个项目的限制,我必须处理几百万个事件。这将导致只处理10.000个项目,并且当您启用Trigger.Once时,流将停止。有人知道如何使用流和Trigger做到这一点吗?类似于一次性功能,当流为空或处理到设定时间时停止?

最近有机会实现类似的东西:

我们的需要是

  1. Databricks每小时启动一次作业并从上次运行的eventhub偏移量中读取一次
  2. 将数据持久化到位置
  3. 结束作业,集群返回空闲/终止模式

我们就是这样做到的:

设置eventhub-conf:

val eventHubsConf: EventHubsConf = EventHubsConf(connStr).setMaxEventsPerTrigger(<someNumber>).setStartingPosition(EventPosition.fromOffset("@latest"))  

从事件中心读取:

val incomingStream = spark.readStream.format("<your_evenhub_name>").options(eventHubsConf.toMap).load()

使用触发器写入。一次

incomingStream.writeStream
.format("parquet").outputMode("append").trigger(Trigger.Once())
.partitionBy("<yourpartitionColumn>").option("truncate", false)
.option("checkpointLocation", "<yourcheckpointlocation>")
.option("path", "<youroutputdataDir>")
.start()

最新更新