Avro消息-InvalidNumberEncodingException反序列化logicalType日期



当我用定义为logicalType date的字段反序列化消息时,我遇到了一个异常。作为文档,该字段定义为:

{"name": "startDate", "type": {"type": "int", "logicalType": "date"}}

我使用";avro maven插件"(1.9.2(生成java类,并且我可以将字段startDate设置为java.time.LocalDate.now();avro对象序列化消息并将其发送到kafka主题。到目前为止,一切都很好。

然而,当我读到消息时,我得到了异常:

Caused by: org.apache.avro.InvalidNumberEncodingException: Invalid int encoding
at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:166)
at org.apache.avro.io.ValidatingDecoder.readInt(ValidatingDecoder.java:83)
at org.apache.avro.generic.GenericDatumReader.readInt(GenericDatumReader.java:551)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:195)
at org.apache.avro.generic.GenericDatumReader.readWithConversion(GenericDatumReader.java:173)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:134)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)

更奇怪的是,如果我设置了一个不同的日期,比如LocalDate.of(1970, 1, 1),就不会出现错误。

换言之,如果表示自1970年1月1日以来的天数的序列化int值足够小,则一切正常。我在查看了引发异常的代码后尝试了该测试,这让我认为如果int日低于127,则可以避免错误:

public int readInt() throws IOException {
this.ensureBounds(5);
int len = 1;
int b = this.buf[this.pos] & 255;
int n = b & 127;
if (b > 127) {
b = this.buf[this.pos + len++] & 255;
n ^= (b & 127) << 7;
if (b > 127) {
b = this.buf[this.pos + len++] & 255;
n ^= (b & 127) << 14;
if (b > 127) {
b = this.buf[this.pos + len++] & 255;
n ^= (b & 127) << 21;
if (b > 127) {
b = this.buf[this.pos + len++] & 255;
n ^= (b & 127) << 28;
if (b > 127) {
throw new InvalidNumberEncodingException("Invalid int encoding");
}
}
}
}
}
....

当然,我不能只使用1970年1月1日的生产日期。欢迎任何帮助:-(

TL:DR

您发布的代码不仅可以反序列化127以下的数字,还可以反序列化Javaint的整个范围,因此最多可以反序列化几十亿,对应于1970年后的几百万年。

详细信息

ApacheAvro的BinaryDecoder.readInt方法将1到5个字节反序列化为Javaint。它将每个字节的最后7位用于int,而不是符号位。相反,符号位用于确定要读取的字节数。符号位0表示这是最后一个字节。符号位为1意味着在该符号位之后还有更多字节。如果读取了5个字节,并且它们所有的符号位都设置为1,则会引发异常。5个字节可以提供35个比特,而int可以保持32个比特,因此将超过5个字节视为错误是公平的。

因此,从你发布的代码来看,我合理地期望在应用程序中使用的日期不会带来任何问题。

测试代码

我把你的方法放在TestBinaryDecoder类中试用(最后是完整的代码(。让我们首先看看异常是如何来自5个字节的,所有字节的符号位都设置为1:

try {
System.out.println(new TestBinaryDecoder(-1, -1, -1, -1, -1).readInt());
} catch (IOException ioe) {
System.out.println(ioe);
}

输出:

ovv.so.binary.misc.InvalidNumberEncodingException: Invalid int encoding

正如你所说,127没有问题:

System.out.println(new TestBinaryDecoder(127, -1, -1, -1, -1).readInt());
127

当我们在int的保持位中放入更多字节时,就会出现有趣的部分。这里,第一个字节的符号位为1,下一个字节为0,所以我希望使用这两个字节:

System.out.println(new TestBinaryDecoder(255, 127, -1, -1, -1).readInt());
16383

我们已经接近今天日期所需的数字。今天是2021-06-04,在我的时区,纪元后的第18782天,或者二进制:100100101011110。因此,让我们尝试将这15个二进制数字放入解码器的三个字节中:

int epochDay = new TestBinaryDecoder(0b11011110, 0b10010010, 0b1, -1, -1).readInt();
System.out.println(epochDay);
System.out.println(LocalDate.ofEpochDay(epochDay));
18782
2021-06-04

所以我不知道你是如何得到异常的。来源肯定不仅仅是一个大的int值。问题一定出在别的地方。

完整代码

public class TestBinaryDecoder {

private byte[] buf;
private int pos;

/** Convenience constructor */
public TestBinaryDecoder(int... buf) {
this(toByteArray(buf));
}

private static byte[] toByteArray(int[] intArray) {
byte[] byteArray = new byte[intArray.length];
IntStream.range(0, intArray.length).forEach(ix -> byteArray[ix] = (byte) intArray[ix]);
return byteArray;
}
public TestBinaryDecoder(byte[] buf) {
this.buf = buf;
pos = 0;
}
public int readInt() throws IOException {
this.ensureBounds(5);
int len = 1;
int b = this.buf[this.pos] & 255;
int n = b & 127;
if (b > 127) {
b = this.buf[this.pos + len++] & 255;
n ^= (b & 127) << 7;
if (b > 127) {
b = this.buf[this.pos + len++] & 255;
n ^= (b & 127) << 14;
if (b > 127) {
b = this.buf[this.pos + len++] & 255;
n ^= (b & 127) << 21;
if (b > 127) {
b = this.buf[this.pos + len++] & 255;
n ^= (b & 127) << 28;
if (b > 127) {
throw new InvalidNumberEncodingException("Invalid int encoding");
}
}
}
}
}
return n;
}

private void ensureBounds(int bounds) {
System.out.println("Ensuring bounds " + bounds);
}
}
class InvalidNumberEncodingException extends IOException {
public InvalidNumberEncodingException(String message) {
super(message);
}

}

相关内容

  • 没有找到相关文章

最新更新