Amazon Kinesis Data Analytics for Java Applications: deserializing incoming messages



I am trying to deploy my Flink application to AWS Kinesis Data Analytics. The application uses Apache Avro to deserialize/serialize incoming messages. It works fine on my local machine, but when I deploy it to AWS I get the following exception (in the CloudWatch logs):

Caused by: java.io.InvalidClassException: org.apache.avro.specific.SpecificRecordBase; local class incompatible: stream classdesc serialVersionUID = 4445917349737100331, local class serialVersionUID = -1463700717714793795

Log details:

{
    "locationInformation": "org.apache.flink.runtime.taskmanager.Task.transitionState(Task.java:913)",
    "logger": "org.apache.flink.runtime.taskmanager.Task",
    "message": "Source: Custom Source -> Sink: Unnamed (1/1) (a72ff69f9dc0f9e56d1104ce21456a5d) switched from RUNNING to FAILED.",
    "throwableInformation": [
        "org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate serializer.",
        "\tat org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn1(StreamConfig.java:160)",
        "\tat org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:380)",
        "\tat org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:296)",
        "\tat org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:133)",
        "\tat org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:275)",
        "\tat org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)",
        "\tat java.lang.Thread.run(Thread.java:748)",
        "Caused by: java.io.InvalidClassException: org.apache.avro.specific.SpecificRecordBase; local class incompatible: stream classdesc serialVersionUID = 4445917349737100331, local class serialVersionUID = -1463700717714793795",
        "\tat java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)",
        "\tat java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)",
        "\tat java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)",
        "\tat java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)",
        "\tat java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)",
        "\tat java.io.ObjectInputStream.readClass(ObjectInputStream.java:1716)",
        "\tat java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1556)",
        "\tat java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)",
        "\tat org.apache.flink.formats.avro.typeutils.AvroSerializer.readCurrentLayout(AvroSerializer.java:465)",
        "\tat org.apache.flink.formats.avro.typeutils.AvroSerializer.readObject(AvroSerializer.java:432)",
        "\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)",
        "\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)",
        "\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)",
        "\tat java.lang.reflect.Method.invoke(Method.java:498)",
        "\tat java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)",
        "\tat java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)",
        "\tat java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)",
        "\tat java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)",
        "\tat java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)",
        "\tat org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:566)",
        "\tat org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:552)",
        "\tat org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:540)",
        "\tat org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:501)",
        "\tat org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn1(StreamConfig.java:158)",
        "\t... 6 more"
    ],
    "threadName": "Source: Custom Source -> Sink: Unnamed (1/1)",
    "applicationARN": "arn:aws:kinesisanalytics:us-east-1:829044228870:application/poc-kda",
    "applicationVersionId": "8",
    "messageSchemaVersion": "1",
    "messageType": "INFO"
}

Library versions I am using:

  • Apache Avro - 1.9.1
  • Apache Flink - 1.9.1
  • Kinesis Producer Library - 0.13.1
  • AWS Flink - 1.8

Note that I get the same problem if I use Apache Flink 1.8 or 1.6.

KDA Flink code:

public class KinesisExampleKDA {

    private static final String REGION = "us-east-1";

    public static void main(String[] args) throws Exception {
        Properties consumerConfig = new Properties();
        consumerConfig.put(AWSConfigConstants.AWS_REGION, REGION);
        consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(50000);

        DataStream<EventAttributes> consumerStream = env.addSource(new FlinkKinesisConsumer<>(
                "dev-events", new KinesisSerializer(), consumerConfig));
        consumerStream
                .addSink(getProducer());
        env.execute("kinesis-example");
    }

    private static FlinkKinesisProducer<EventAttributes> getProducer() {
        Properties outputProperties = new Properties();
        outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, REGION);
        outputProperties.setProperty("AggregationEnabled", "false");

        FlinkKinesisProducer<EventAttributes> sink =
                new FlinkKinesisProducer<>(new KinesisSerializer(), outputProperties);
        sink.setDefaultStream("dev-result");
        sink.setDefaultPartition("0");
        return sink;
    }
}

class KinesisSerializer implements DeserializationSchema<EventAttributes>, SerializationSchema<EventAttributes> {

    @Override
    public EventAttributes deserialize(byte[] bytes) throws IOException {
        return EventAttributes.fromByteBuffer(ByteBuffer.wrap(bytes));
    }

    @Override
    public boolean isEndOfStream(EventAttributes eventAttributes) {
        return false;
    }

    @Override
    public byte[] serialize(EventAttributes eventAttributes) {
        try {
            return eventAttributes.toByteBuffer().array();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return new byte[1];
    }

    @Override
    public TypeInformation<EventAttributes> getProducedType() {
        return TypeInformation.of(EventAttributes.class);
    }
}

Kinesis producer code:

public class KinesisProducer {

    private static String streamName = "dev-events";

    public static void main(String[] args) throws InterruptedException, JsonMappingException {
        AmazonKinesis kinesisClient = getAmazonKinesisClient("us-east-1");
        try {
            sendData(kinesisClient, streamName);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static AmazonKinesis getAmazonKinesisClient(String regionName) {
        AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
        clientBuilder.setEndpointConfiguration(
                new AwsClientBuilder.EndpointConfiguration("kinesis.us-east-1.amazonaws.com",
                        regionName));
        clientBuilder.withCredentials(DefaultAWSCredentialsProviderChain.getInstance());
        clientBuilder.setClientConfiguration(new ClientConfiguration());
        return clientBuilder.build();
    }

    private static void sendData(AmazonKinesis kinesisClient, String streamName) throws IOException {
        PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
        putRecordsRequest.setStreamName(streamName);
        List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
            EventAttributes eventAttributes = EventAttributes.newBuilder().setName("Jon.Doe").build();
            putRecordsRequestEntry.setData(eventAttributes.toByteBuffer());
            putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i));
            putRecordsRequestEntryList.add(putRecordsRequestEntry);
        }
        putRecordsRequest.setRecords(putRecordsRequestEntryList);
        PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest);
        System.out.println("Put Result" + putRecordsResult);
    }
}

Avro schema in .avdl format:

@version("0.1.0")
@namespace("com.naya.avro")
protocol UBXEventProtocol {
    record EventAttributes {
        union { null, string } name = null;
    }
}

Entity class auto-generated by Avro:

@org.apache.avro.specific.AvroGenerated
public class EventAttributes extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
    private static final long serialVersionUID = 2780976157169751219L;
    public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"EventAttributes\",\"namespace\":\"com.naya.avro\",\"fields\":[{\"name\":\"name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null}]}");
    public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
    private static SpecificData MODEL$ = new SpecificData();
    private static final BinaryMessageEncoder<EventAttributes> ENCODER =
            new BinaryMessageEncoder<EventAttributes>(MODEL$, SCHEMA$);
    private static final BinaryMessageDecoder<EventAttributes> DECODER =
            new BinaryMessageDecoder<EventAttributes>(MODEL$, SCHEMA$);
    …

GitHub links:

  • KDA application example
  • KDS producer example

Could someone add more details on this? Why doesn't it work on AWS?

Thanks in advance.

Looking at the stack trace, this does not appear to happen while reading a message, but during the initialization phase of the operator itself.

This is how Flink works: it serializes (using Java serialization) every operator that needs to be executed and then distributes them across the cluster in serialized form. This means that the KinesisSerializer itself will be serialized (as a class) to be sent over the network. A rough sketch of that step follows, using Flink's InstantiationUtil (the same utility that appears in the stack trace above); it illustrates the mechanism, not KDA's exact code path.
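
import org.apache.flink.util.InstantiationUtil;

// Sketch of the ship-an-operator step: Flink Java-serializes the operator
// (and everything it references) into the job graph on the client, then
// deserializes it on a task manager with the user classloader.
public class OperatorShippingSketch {
    public static void main(String[] args) throws Exception {
        // KinesisSerializer is the serializer from the question; it is
        // Serializable because DeserializationSchema extends Serializable.
        byte[] shipped = InstantiationUtil.serializeObject(new KinesisSerializer());

        // On a real task manager this runs in a different JVM against the
        // KDA-provided classpath, which is where the mismatch surfaces.
        KinesisSerializer restored = InstantiationUtil.deserializeObject(
                shipped, Thread.currentThread().getContextClassLoader());
        System.out.println("Deserialized " + restored.getClass().getName());
    }
}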

Now, the problem is that the KinesisSerializer references the EventAttributes model, which means a reference to EventAttributes (the class itself, not any particular instance) will be serialized along with it, including, as part of the serialization metadata, what it extends/implements. In your case that is SpecificRecordBase, which is not part of your deliverable but part of the Avro library.

So the full serialization chain for the operator itself is KinesisConsumer -> KinesisSerializer -> EventAttributes -> SpecificRecordBase (part of the Avro lib).
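
To make the failure mode concrete, here is a small self-contained sketch. Writing a Class object with Java serialization embeds the class descriptors, including the serialVersionUIDs, of EventAttributes and of its superclass SpecificRecordBase into the stream; reading it back validates every descriptor in that chain (compare readClass, readNonProxyDesc, and initNonProxy in the stack trace). The cross-classpath failure described in the comments is an assumption about the KDA setup, since a single JVM naturally reads back what it just wrote.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import com.naya.avro.EventAttributes;

public class SerialVersionSketch {
    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            // Writes the descriptors of EventAttributes AND SpecificRecordBase,
            // as compiled against Avro 1.9.1, into the stream.
            oos.writeObject(EventAttributes.class);
        }
        try (ObjectInputStream ois =
                     new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            // Succeeds here because the same classpath is used for reading.
            // On KDA the runtime resolves SpecificRecordBase from Avro 1.8.x,
            // its serialVersionUID differs, and readObject() throws the
            // InvalidClassException shown in the CloudWatch log.
            Class<?> restored = (Class<?>) ois.readObject();
            System.out.println("Restored: " + restored.getName());
        }
    }
}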

However, AWS runs Flink 1.8, which uses Avro 1.8.2, and all the base Avro classes there come from 1.8.2 as well. You compiled and linked your application against the Avro 1.9 binaries. So when Flink serializes your operators to send them to the cluster, it serializes a reference to the 1.9 version of SpecificRecordBase. But when Flink actually tries to deserialize it, it finds that the version does not match the class it actually has available (1.8.2), and linking fails.
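
If you want to confirm which Avro build the runtime actually loads, a quick diagnostic along these lines (a hypothetical helper, not part of the original application) can be logged from the job's main():

import java.io.ObjectStreamClass;

import org.apache.avro.specific.SpecificRecordBase;

// Hypothetical probe: prints the Avro build on the classpath and the
// serialVersionUID that the JVM will compare during deserialization.
public class AvroVersionProbe {
    public static void main(String[] args) {
        System.out.println("Avro implementation version: "
                + SpecificRecordBase.class.getPackage().getImplementationVersion());
        System.out.println("SpecificRecordBase serialVersionUID = "
                + ObjectStreamClass.lookup(SpecificRecordBase.class).getSerialVersionUID());
        // Where the class was loaded from; useful for spotting classloader
        // precedence problems between your jar and the platform's jars.
        System.out.println("Loaded from: " + SpecificRecordBase.class
                .getProtectionDomain().getCodeSource().getLocation());
    }
}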

You have 2 options here:

  1. Don't use KDA. Instead, move to EMR (which packages Flink 1.9.1 as of January 2020) or standalone Flink (which requires manual deployment on EMR or bare instances).
  2. Write the application entirely against Flink 1.8. You mentioned that "the application does not compile with version 1.8.2" - try to work through that.

We solved this issue. In our application we were using Avro 1.9.1, but AWS KDA uses Avro 1.8.1. Downgrading from 1.9.1 to 1.8.1 fixed the problem.

This was probably caused by a classloading issue in KDA (we ran into the same thing with Jackson), where the classes provided by your jar did not take the correct precedence. But I believe this has since been fixed in KDA (at least for IAD and DUB).
