Spark flatMapGroupsWithState randomly losing events



I have a Spark job that consists of:

1- Reading a static DataFrame from Delta Lake

2- Reading a streaming DataFrame from Delta Lake

3- Joining the stream with the static DataFrame

4- Doing a flatMapGroupsWithState

5- Writing the output

The problem is that my output is different from what I expect, as if I were losing events in flatMapGroupsWithState. On top of that, the output is random: when I re-run with the same input, I get a different output.

But when I add .coalesce(1) to the write operation, I always get the expected output in local mode, though not in cluster mode.

Here is the code I am using:

val entityScheduleSlots = data
  .withColumn("products", concat(col("batteries"), col("photovoltaics")))
  .drop("photovoltaics", "batteries", "labels")
  .join(
    entities,
    array_contains(entities("entity_delivery_points"), col("delivery_point_id")))
  .withColumn("now", current_timestamp())
  .withWatermark("now", "5 minutes")
  .as(Encoders.product[enrichedDeliveryPointSchedule])
  .groupByKey(e => e.timestamp.toString + e.entity_id.toString + e.schedule_id)(
    Encoders.STRING)
  .flatMapGroupsWithState(
    outputMode = OutputMode.Append,
    timeoutConf = GroupStateTimeout.EventTimeTimeout)(
    Function.computeExplodedEntityScheduleSlots)(
    Encoders.kryo[Function.State],
    Encoders.product[EntityScheduleSlot])

entityScheduleSlots is my output. I ran my tests in local mode.

object Function {
  case class ProductState(
      var count: Int,
      var quantity: Double,
      var price: Double,
      val sellable: Boolean)

  case class State(var delivery_points_count: Int, var products: mutable.Map[Long, ProductState])

  def computeExplodedEntityScheduleSlots(
      uid: String,
      ss: Iterator[enrichedDeliveryPointSchedule],
      state: GroupState[State]): Iterator[EntityScheduleSlot] = {
    if (state.hasTimedOut) {
      state.remove()
      return Iterator.empty
    }
    val schedules = ss.toList
    val newState: State =
      state.getOption.getOrElse(State(0, mutable.Map()))
    schedules.foreach(s => {
      newState.delivery_points_count = newState.delivery_points_count + 1
      val qualificationsProductsIDs =
        if (s.entity_qualifications != null) s.entity_qualifications.map(q => q.product)
        else List()
      if (s.products != null) {
        s.products.foreach(p => {
          if (qualificationsProductsIDs.contains(p.product)) {
            val productState =
              newState.products.getOrElse(p.product, ProductState(0, 0.0, 0.0, p.sellable))
            val factor =
              if (productState.count == 0) 1
              else p.quantity / (productState.quantity / productState.count)
            productState.quantity += p.quantity
            productState.price =
              (productState.price * productState.count + p.price * factor) / (productState.count + 1)
            productState.count += 1
            newState.products.update(p.product, productState)
          }
        })
      }
    })
    if (newState.delivery_points_count == schedules.head.entity_delivery_points.length) {
      state.remove()
      return Iterator(
        EntityScheduleSlot(
          timestamp = schedules.head.timestamp,
          entity = schedules.head.entity_id,
          schedule_timestamp = schedules.head.schedule_timestamp,
          schedule_id = schedules.head.schedule_id,
          products =
            if (schedules.head.entity_qualifications != null)
              schedules.head.entity_qualifications
                .map(q => {
                  val product =
                    newState.products.getOrElse(q.product, ProductState(0, 0.0, 0.0, false))
                  EntityScheduleSlotProduct(
                    q.product,
                    product.quantity,
                    product.price,
                    product.sellable)
                })
            else List()))
    }
    state.update(newState)
    val currentWatermarkMs =
      if (state.getCurrentWatermarkMs() > 0) state.getCurrentWatermarkMs()
      else System.currentTimeMillis()
    state.setTimeoutTimestamp(currentWatermarkMs, "2 minutes")
    Iterator.empty
  }
}
case class enrichedDeliveryPointSchedule(
    timestamp: java.sql.Timestamp,
    schedule_timestamp: java.sql.Timestamp,
    schedule_id: String,
    delivery_point_id: Long,
    products: List[DeliveryPointScheduleSlotProduct],
    entity_id: Long,
    entity_delivery_points: List[Long],
    entity_qualifications: List[EntityQualification])

Thanks in advance.

With the little information you have provided it is hard to understand the problem, but I can give you some hints:

flatMapGroupsWithState is a function used in stateful Structured Streaming to store partial results in Spark's internal state:

def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U])

Besides the initial state, the parameters include the state-update function:

func: (K, Iterator[V], GroupState[S]) => Iterator[U]
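To make the shape of that function concrete, here is a minimal, Spark-free sketch of one invocation per key. `SimpleState` and `updateFn` are hypothetical names invented for illustration; in real Spark, `GroupState[S]` wraps the state object and persists it between micro-batches, which is emulated here by reusing the same instance:

```scala
// A running count per key, kept as mutable "state" between calls.
final case class SimpleState(var count: Int)

// Emulates one invocation of func: (K, Iterator[V], GroupState[S]) => Iterator[U]
// for a single key: fold the batch's values into the state, emit one row.
def updateFn(key: String, values: Iterator[Int], state: SimpleState): Iterator[(String, Int)] = {
  values.foreach(v => state.count += v)
  Iterator((key, state.count))
}

val state = SimpleState(0)
val out1 = updateFn("k", Iterator(1, 2, 3), state).toList // first micro-batch
val out2 = updateFn("k", Iterator(4), state).toList       // later batch, state persists
// out1 == List(("k", 6)); out2 == List(("k", 10))
```

The key point is that each call only sees the rows of the current micro-batch for its key; everything it needs from earlier batches must already be in the state.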
  • It may simply be that you are aggregating results, and that is why you get different outputs

  • Another possibility is that you are checking a single executor and, given the distributed nature of the Spark framework, receiving only partial output
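The first hint can be checked against the question's own code. A plain-Scala reduction of the quantity/price update inside computeExplodedEntityScheduleSlots (the names `Acc` and `fold` are invented for this sketch) suggests the aggregation is order-sensitive: folding the same rows in a different order yields a different price. In a cluster, the order in which rows reach the state function is not deterministic, which would match the random output:

```scala
// Simplified model of the per-product state from the question.
final case class Acc(var count: Int, var quantity: Double, var price: Double)

// Mirrors the factor/price/quantity update in the question's code.
def fold(acc: Acc, q: Double, p: Double): Acc = {
  val factor = if (acc.count == 0) 1.0 else q / (acc.quantity / acc.count)
  acc.quantity += q
  acc.price = (acc.price * acc.count + p * factor) / (acc.count + 1)
  acc.count += 1
  acc
}

val items = List((1.0, 10.0), (3.0, 10.0)) // (quantity, price) pairs

val forward  = items.foldLeft(Acc(0, 0.0, 0.0)) { case (a, (q, p)) => fold(a, q, p) }
val reversed = items.reverse.foldLeft(Acc(0, 0.0, 0.0)) { case (a, (q, p)) => fold(a, q, p) }

// forward.price == 20.0, but reversed.price differs: the result depends on
// the order in which rows are folded into the state.
```

An aggregation used inside flatMapGroupsWithState generally needs to be order-insensitive (commutative and associative) to be deterministic across partitions.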

Add your code in a future edit so the problem can be understood better
