如何将消息从套接字流源转换为自定义域对象?



我对火花流很陌生。我有一个Spark Standalone 2.2,运行在一个工作人员身上。我正在使用套接字源并尝试将传入流读取到名为 MicroserviceMessage 的对象中。

val message = spark.readStream
.format("socket")
.option("host", host)
.option("port", port)
.load()
val df = message.as[MicroserviceMessage].flatMap(microserviceMessage =>
microserviceMessage.DataPoints.map(datapoint => (datapoint, microserviceMessage.ServiceProperties, datapoint.EpochUTC)))
.toDF("datapoint", "properties", "timestamp")

我希望这将是一个带有"数据点"、"属性"和"时间戳"列的数据帧

我粘贴到我的netcat终端中的数据如下所示(这是我尝试作为微服务消息读取的数据):

{
"SystemType": "mytype",
"SystemGuid": "6c84fb90-12c4-11e1-840d-7b25c5ee775a",
"TagType": "Raw Tags",
"ServiceType": "FILTER",
"DataPoints": [
{
"TagName": "013FIC003.PV",
"EpochUTC": 1505247956001,
"ItemValue": 25.47177,
"ItemValueStr": "NORMAL",
"Quality": "Good",
"TimeOffset": "P0000"
},
{
"TagName": "013FIC003.PV",
"EpochUTC": 1505247956010,
"ItemValue": 26.47177,
"ItemValueStr": "NORMAL",
"Quality": "Good",
"TimeOffset": "P0000"
}
],
"ServiceProperties": [
{
"Key": "OutputTagName",
"Value": "FI12102.PV_CL"
},
{
"Key": "OutputTagType",
"Value": "Cleansing Flow Tags"
}
]
}

相反,我看到的是:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`SystemType`' given input columns: [value];

微服务消息案例类如下所示:

case class DataPoints
(
TagName: String,
EpochUTC: Double,
ItemValue: Double,
ItemValueStr: String,
Quality: String,
TimeOffset: String
)
case class ServiceProperties
(
Key: String,
Value: String
)
case class MicroserviceMessage
(
SystemType: String,
SystemGuid: String,
TagType: String,
ServiceType: String,
DataPoints: List[DataPoints],
ServiceProperties: List[ServiceProperties]
)

编辑: 阅读这篇文章后,我能够通过做

val messageEncoder = Encoders.bean(classOf[MicroserviceMessage])
val df = message.select($"value").as(messageEncoder).map(
msmg => (msmg.ServiceType, msmg.SystemGuid)
).toDF("service", "guid")

但是当我开始发送数据时,这会导致问题。

Caused by: java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: scala/runtime/LambdaDeserialize

全栈跟踪

这个:

message.as[MicroserviceMessage]

不正确,如错误消息所述:

无法解析给定输入列的 'SystemType':[值];

来自SocketStream的数据只是字符串(或字符串和时间戳)。要使其可用于强类型Dataset您必须解析它,例如使用org.apache.spark.sql.functions.from_json.

异常的原因

由:java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: scala/runtime/LambdaDeserialize

是您使用 Scala 2.12.4(或 2.12 流中的任何其他流)编译了 Spark 结构化流应用程序,这在 Spark 2.2 中不受支持。

来自scala.runtime.LambdaDeserializer的scaladoc:

此类仅用于通过合成 $deserializeLambda$ 方法调用,Scala 2.12 编译器将该方法添加到托管 lambda 的类中。

Spark 2.2 最多支持并包括 Scala 2.11.12,其中 2.11.8 是最"受祝福"的版本。

相关内容

  • 没有找到相关文章

最新更新