以有状态的方式在spark中处理网络数据包



我想使用Spark来解析网络消息,并以有状态的方式将它们分组为逻辑实体。

问题描述

假设每条消息都在输入数据帧的一行中,如下所示。

| row   | time | raw payload   |
+-------+------+---------------+
|  1    | 10   | TEXT1;        |
|  2    | 20   | TEXT2;TEXT3;  |
|  3    | 30   | LONG-         |
|  4    | 40   | TEXT1;        |
|  5    | 50   | TEXT4;TEXT5;L |
|  6    | 60   | ONG           |
|  7    | 70   | -TEX          |
|  8    | 80   | T2;           | 

任务是解析原始负载中的逻辑消息,并在新的输出数据帧中提供它们。在本例中,负载中的每个逻辑消息都以分号(分隔符)结尾。

所需的输出数据帧可以如下所示:

| row   | time | message       |
+-------+------+---------------+
|  1    | 10   | TEXT1;        |
|  2    | 20   | TEXT2;        |
|  3    | 20   | TEXT3;        |
|  4    | 30   | LONG-TEXT1;   |
|  5    | 50   | TEXT4;        |
|  6    | 50   | TEXT5;        |
|  7    | 50   | LONG-TEXT2;   |

请注意,一些消息行不会在结果中产生新行(例如,行4、6、7、8),而一些消息行甚至会产生多行(例如,第2、5行)

我的问题:

  • 这是UDAF的用例吗?如果是,例如,我应该如何实现merge函数?我不知道它的目的是什么
  • 由于消息顺序很重要(如果不考虑消息顺序,我就无法正确处理LONGTEXT-1和LONGTEXT-2),我可以告诉spark可能在更高的级别上并行化(例如,每个日历日的消息),但不能在一天内并行化(例如,时间50、60、70、80的事件需要按顺序处理)
  • 后续问题:是否可以想象,该解决方案不仅适用于传统的spark,还适用于spark结构化流媒体?还是后者需要自己的有状态处理方法

通常,您可以使用flatMapGroupsWithStatemapGroupsWithState在spark流上运行任意有状态聚合。你可以在这里找到一些例子。但是,这些都不能保证流的处理将按事件时间排序。

如果需要强制执行数据排序,则应尝试在事件时间使用窗口操作。在这种情况下,您需要运行无状态操作,但如果每个窗口组中的元素数量足够少,则可以使用collectList,然后在每个列表上应用UDF(可以在其中管理每个窗口组的状态)。

好的,我同时弄清楚了如何使用UDAF来实现这一点。

class TagParser extends UserDefinedAggregateFunction {
override def inputSchema: StructType = StructType(StructField("value", StringType) :: Nil)
override def bufferSchema: StructType = StructType(
StructField("parsed", ArrayType(StringType)) ::
StructField("rest", StringType)
:: Nil)
override def dataType: DataType = ArrayType(StringType)
override def deterministic: Boolean = true
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = IndexedSeq[String]()
buffer(1) = null
}
def doParse(str: String, buffer: MutableAggregationBuffer): Unit = {
buffer(0) = IndexedSeq[String]()
val prevRest = buffer(1)
var idx = -1
val strToParse = if (prevRest != null) prevRest + str else str
do {
val oldIdx = idx;
idx = strToParse.indexOf(';', oldIdx + 1)
if (idx == -1) {
buffer(1) = strToParse.substring(oldIdx + 1)
} else {
val newlyParsed = strToParse.substring(oldIdx + 1, idx)
buffer(0) = buffer(0).asInstanceOf[IndexedSeq[String]] :+ newlyParsed
buffer(1) = null
}
} while (idx != -1)
}
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (buffer == null) {
return
}
doParse(input.getAs[String](0), buffer)
}
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = throw new UnsupportedOperationException
override def evaluate(buffer: Row): Any = buffer(0)
}

这里有一个演示应用程序使用上面的UDAF来解决上面的问题:

case class Packet(time: Int, payload: String)
object TagParserApp extends App {
val spark, sc = ... // kept out for brevity
val df = sc.parallelize(List(
Packet(10, "TEXT1;"),
Packet(20, "TEXT2;TEXT3;"),
Packet(30, "LONG-"),
Packet(40, "TEXT1;"),
Packet(50, "TEXT4;TEXT5;L"),
Packet(60, "ONG"),
Packet(70, "-TEX"),
Packet(80, "T2;")
)).toDF()
val tp = new TagParser
val window = Window.rowsBetween(Window.unboundedPreceding, Window.currentRow)
val df2 = df.withColumn("msg", tp.apply(df.col("payload")).over(window))
df2.show()
}

这就产生了:

+----+-------------+--------------+
|time|      payload|           msg|
+----+-------------+--------------+
|  10|       TEXT1;|       [TEXT1]|
|  20| TEXT2;TEXT3;|[TEXT2, TEXT3]|
|  30|        LONG-|            []|
|  40|       TEXT1;|  [LONG-TEXT1]|
|  50|TEXT4;TEXT5;L|[TEXT4, TEXT5]|
|  60|          ONG|            []|
|  70|         -TEX|            []|
|  80|          T2;|  [LONG-TEXT2]|
+----+-------------+--------------+

对我来说,主要的问题是弄清楚如何实际应用这个UDAF,即使用这个:

df.withColumn("msg", tp.apply(df.col("payload")).over(window))

我现在唯一需要弄清楚的是并行化的各个方面(我只想在不依赖排序的情况下实现),但这对我来说是一个单独的问题。

相关内容

  • 没有找到相关文章

最新更新