In Kafka Connect HDFS we have the following SequenceFileWriterProvider class, which is used to write Kafka messages in SequenceFile format.
import java.io.IOException;
import io.confluent.connect.avro.AvroData;
import io.confluent.connect.hdfs.RecordWriter;
import io.confluent.connect.hdfs.RecordWriterProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.kafka.connect.sink.SinkRecord;
/**
* Provider of a Sequence File record writer.
*/
public class SequenceFileWriterProvider implements RecordWriterProvider {

  @Override
  public String getExtension() {
    return "";
  }

  @Override
  public RecordWriter<SinkRecord> getRecordWriter(Configuration conf, String fileName, SinkRecord record, AvroData avroData) throws IOException {
    Path path = new Path(fileName);
    SequenceFile.Writer.Option optPath = SequenceFile.Writer.file(path);
    SequenceFile.Writer.Option optKey = SequenceFile.Writer.keyClass(LongWritable.class);
    SequenceFile.Writer.Option optVal = SequenceFile.Writer.valueClass(Text.class);
    SequenceFile.Writer.Option optCodec = SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new BZip2Codec());
    final SequenceFile.Writer writer = SequenceFile.createWriter(conf, optPath, optKey, optVal, optCodec);

    return new RecordWriter<SinkRecord>() {
      @Override
      public void write(SinkRecord record) throws IOException {
        // Key is the write timestamp, value is the raw record bytes.
        writer.append(
            new LongWritable(System.currentTimeMillis()),
            new Text((byte[]) record.value())
        );
      }

      @Override
      public void close() throws IOException {
        writer.close();
      }
    };
  }
}
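For context, a writer provider like this is normally wired into the HDFS sink connector through its format.class setting. The snippet below is only a sketch of such a configuration: the topic, HDFS URL, and the com.example.SequenceFileFormat wrapper class named here are hypothetical, and the wrapper would have to expose this SequenceFileWriterProvider through the connector's Format interface.

# Hypothetical HDFS sink configuration wiring in the custom writer.
# com.example.SequenceFileFormat is an assumed wrapper class that returns SequenceFileWriterProvider.
name=hdfs-sequencefile-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
topics=my-topic
hdfs.url=hdfs://namenode:8020
flush.size=1000
format.class=com.example.SequenceFileFormat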
We run Confluent 5.0.0 in Docker containers managed by Kubernetes. We have observed that when we delete the replication controller in k8s that runs the Kafka connector and then recreate it, some sequence files become corrupted. We have a Spark job that reads this data using SequenceFileReader and gets the EOFException below. We also observed two extra bytes at the end of the corrupted files. We suspect there is a problem with the SequenceFileWriter and need help validating the writer. Any help would be appreciated. Thanks.
java.io.EOFException
at java.io.DataInputStream.readByte(DataInputStream.java:267)
at org.apache.hadoop.io.WritableUtils.readVLong(WritableUtils.java:308)
at org.apache.hadoop.io.WritableUtils.readVInt(WritableUtils.java:329)
at org.apache.hadoop.io.SequenceFile$Reader.readBuffer(SequenceFile.java:2160)
at org.apache.hadoop.io.SequenceFile$Reader.seekToCurrentValue(SequenceFile.java:2227)
at org.apache.hadoop.io.SequenceFile$Reader.getCurrentValue(SequenceFile.java:2263)
at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2394)
at badSequenceFile.readSequenceFile(badSequenceFile.java:27)
at badSequenceFile.main(badSequenceFile.java:345)
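To narrow the problem down to the writer, a minimal standalone check along the lines below can be pointed at one of the suspect files. This is only a sketch, assuming a Hadoop 2.x client and the key/value classes declared by the writer above; on a truncated file the EOFException shown above surfaces from reader.next().

import java.io.EOFException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileCheck {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path(args[0]);
    // Key/value types must match what the writer declared above.
    LongWritable key = new LongWritable();
    Text value = new Text();
    long records = 0;
    try (SequenceFile.Reader reader =
             new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
      while (reader.next(key, value)) {
        records++;
      }
      System.out.println("OK: " + records + " records in " + path);
    } catch (EOFException e) {
      // A file left without a proper close/EOF marker fails here.
      System.err.println("Corrupt after " + records + " records: " + e);
    }
  }
}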
Note: if we delete the connector temporary files (+tmp) before starting the k8s replication controller, the connector starts cleanly and does not create bad files.
Modifying writer.append to handle exceptions appears to have resolved the problem of bad sequence files being written without the end-of-file (EOF) marker. In addition, the record value is now converted from bytes to a String before being written.
return new RecordWriter<SinkRecord>() {
  @Override
  public void write(SinkRecord record) {
    if (record != null) {
      byte[] text = (byte[]) record.value();
      try {
        writer.append(
            new LongWritable(System.currentTimeMillis()),
            new Text(new String(text))
        );
      } catch (Exception e) {
        logger.error("Exception encountered: " + e + " for text: " + new String(text));
      }
    }
  }

  @Override
  public void close() throws IOException {
    writer.close();
  }
};