Flink:从kafka获取byte[]数据



我使用flink-1.0-SNAPSHOT从kafka消费数据。数据以Snappy压缩的字节[]的形式传入,并传递给thrift以供以后使用。

当我使用flink来检索数据时,它会被损坏或处理不当,从而无法解压缩。代码来自这个示例,如下所示:

DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer081<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));
messageStream.rebalance().map(new MapFunction<String, String>() {
    @Override public String map(String value) throws Exception {
    boolean bvalid = Snappy.isValidCompressedBuffer(value.getBytes());
 });

isValidCompressedBuffer每次返回false。

当通过其他途径使用时,数据是已知的好的。

我错过了什么?


解决方案:

我发这个是因为我找不到任何使用RawSchema的例子。

public static void main(String[] args) throws Exception {
    // create execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // parse user parameters
    ParameterTool parameterTool = ParameterTool.fromArgs(args);
    DataStream<byte[]> dataStream = env.addSource(new FlinkKafkaConsumer081<>(parameterTool.getRequired("topic"), new RawSchema(), parameterTool.getProperties()));
    dataStream.map(new MapFunction<byte[], Object>() {
        @Override
        public Object map(byte[] bytes) throws Exception {
            boolean bvali = Snappy.isValidCompressedBuffer(bytes);
            });
            return 0;
        }
    }).print();
    env.execute();
}

将字节消息读取为String是不正确的。您应该按原样读取字节,然后解压缩:

public Object map(byte[] bytes) throws Exception {
    boolean bvalid = Snappy.isValidCompressedBuffer(bytes);
    ...

相关内容

  • 没有找到相关文章