我们尝试将protobuf与Akka一起使用,并通过protobuf序列化所有Akka消息。对于 Scala,我们有一个名为 ScalaPB 的库,它帮助我们生成类,其中包括用于序列化或反序列化数据的 parseFrom
、toByteArray
等方法。但是,当我们尝试运行该程序时,出现以下异常:
akka.actor.dungeon.SerializationCheckFailedException: Failed to serialize and deserialize message of type com.knoldus.akkaserialization.msg.example.Bang$ for testing. To avoid this error, either disable 'akka.actor.serialize-messages', mark the message with 'akka.actor.NoSerializationVerificationNeeded', or configure serialization to support this message
application.conf 文件包含以下配置:
akka {
actor {
allow-java-serialization = off
serialize-messages = on
serializers {
proto = "akka.remote.serialization.ProtobufSerializer"
}
serialization-bindings {
"com.knoldus.akkaserialization.msg.example.Bang" = proto
"com.knoldus.akkaserialization.msg.example.Message" = proto
}
}
}
这些类com.knoldus.akkaserialization.msg.example.Bang
,com.knoldus.akkaserialization.msg.example.Message
通过 ScalaPB 生成,并包含所有 require 方法。
akka.remote.serialization.ProtobufSerializer
定义的源代码,
This Serializer serializes `akka.protobuf.Message` and `com.google.protobuf.Message` It is using reflection to find the `parseFrom` and `toByteArray` methods to avoid dependency to `com.google.protobuf`
因此,我们期望,这会自动读取我们的案例类Bang
并Message
并执行序列化,但不幸的是出现序列化异常。
你能帮忙弄清楚ScalaPB和ProtoBuff到底有什么问题吗?
您使用的序列化程序旨在与Java protobufs一起使用,而不是与ScalaPB一起使用。您需要包含自己的序列化程序。这是一个你可以使用的:
package com.example.protoser
import java.util.concurrent.atomic.AtomicReference
import akka.actor.ExtendedActorSystem
import akka.serialization.BaseSerializer
import scalapb.GeneratedMessageCompanion
class ScalaPbSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
private val classToCompanionMapRef = new AtomicReference[Map[Class[_], GeneratedMessageCompanion[_]]](Map.empty)
override def toBinary(o: AnyRef): Array[Byte] = o match {
case e: scalapb.GeneratedMessage => e.toByteArray
case _ => throw new IllegalArgumentException("Need a subclass of scalapb.GeneratedMessage")
}
override def includeManifest: Boolean = true
override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {
manifest match {
case Some(clazz) =>
@scala.annotation.tailrec
def messageCompanion(companion: GeneratedMessageCompanion[_] = null): GeneratedMessageCompanion[_] = {
val classToCompanion = classToCompanionMapRef.get()
classToCompanion.get(clazz) match {
case Some(cachedCompanion) => cachedCompanion
case None =>
val uncachedCompanion =
if (companion eq null) Class.forName(clazz.getName + "$", true, clazz.getClassLoader)
.getField("MODULE$").get().asInstanceOf[GeneratedMessageCompanion[_]]
else companion
if (classToCompanionMapRef.compareAndSet(classToCompanion, classToCompanion.updated(clazz, uncachedCompanion)))
uncachedCompanion
else
messageCompanion(uncachedCompanion)
}
}
messageCompanion().parseFrom(bytes).asInstanceOf[AnyRef]
case _ => throw new IllegalArgumentException("Need a ScalaPB companion class to be able to deserialize.")
}
}
}
配置应该是这样的:
akka {
actor {
serializers {
scalapb = "com.example.protoser.ScalaPbSerializer"
}
serialization-bindings {
"scalapb.GeneratedMessage" = scalapb
}
serialization-identifiers {
"com.example.protoser.ScalaPbSerializer" = 10000
}
}
}
以上内容改编自旧代码,欢迎编辑和建议!
这是一种简单的方法,只需在配置中添加以下行即可。
https://doc.akka.io/docs/akka/current/serialization.html
Akka默认为几种基元类型和protobuf com.google.protobuf.GeneratedMessage (protobuf2(ScalaPB 生成的和com.google.protobuf.GeneratedMessageV3(protobuf3(提供了序列化程序(后者仅在依赖于akka-remote模块时(,因此通常如果您将原始protobuf消息作为参与者消息发送,则不需要为此添加配置。
代码也可以序列化为 protobuf,所以我们只需要将 ScalaPB 生成的案例类 trait 绑定到序列化程序即可。
akka {
actor {
serialization-bindings {
"com.google.protobuf.Message" = proto
"scalapb.GeneratedMessage" = proto
"scalapb.GeneratedEnum" = proto
}
}
}
这对我有用。我的环境是:
- AKKA-GRPC:2.1.4
- 阿卡:2.6.19
- 斯卡拉:2.13.8