Storm KafkaSpout KryoSerialization issue for Java bean from Kafka topic



Hi, I am new to Storm and Kafka. I am using Storm 1.0.1 and Kafka 0.10.0. We have a KafkaSpout that receives a Java bean from a Kafka topic. I have spent several hours digging to find the right approach. I found a few helpful articles, but so far none of the approaches has worked for me.

Below is my code:

Storm topology:

public class StormTopology {
    public static void main(String[] args) throws Exception {
        // expected args: topologyName kafkaTopic zkRoot id
        if (args.length == 4) {
            System.out.println("started");
            BrokerHosts hosts = new ZkHosts("localhost:2181");
            SpoutConfig kafkaConf1 = new SpoutConfig(hosts, args[1], args[2], args[3]);
            kafkaConf1.zkRoot = args[2];
            kafkaConf1.useStartOffsetTimeIfOffsetOutOfRange = true;
            kafkaConf1.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
            kafkaConf1.scheme = new SchemeAsMultiScheme(new KryoScheme());
            KafkaSpout kafkaSpout1 = new KafkaSpout(kafkaConf1);
            System.out.println("started");
            ShuffleBolt shuffleBolt = new ShuffleBolt(args[1]);
            AnalysisBolt analysisBolt = new AnalysisBolt(args[1]);
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            topologyBuilder.setSpout("kafkaspout", kafkaSpout1, 1);
            // Fields grouping needs a separate bolt between the spout and
            // the grouped bolt, or it won't work.
            topologyBuilder.setBolt("shuffleBolt", shuffleBolt, 3).shuffleGrouping("kafkaspout");
            topologyBuilder.setBolt("analysisBolt", analysisBolt, 5).fieldsGrouping("shuffleBolt", new Fields("trip"));
            Config config = new Config();
            config.registerSerialization(VehicleTrip.class, VehicleTripKyroSerializer.class);
            config.setDebug(true);
            config.setNumWorkers(1);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(args[0], config, topologyBuilder.createTopology());
            // StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
        } else {
            System.out.println("Insufficient arguments - topologyName kafkaTopic zkRoot id");
        }
    }
}

I am serializing the data into Kafka using Kryo.

Kafka producer:

public class StreamKafkaProducer {
    private static Producer<String, VehicleTrip> producer;
    private final Properties props = new Properties();
    private static final StreamKafkaProducer KAFKA_PRODUCER = new StreamKafkaProducer();

    private StreamKafkaProducer() {
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "com.abc.serializer.MySerializer");
        producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
    }

    public static StreamKafkaProducer getStreamKafkaProducer() {
        return KAFKA_PRODUCER;
    }

    public void produce(String topic, VehicleTrip vehicleTrip) {
        ProducerRecord<String, VehicleTrip> producerRecord = new ProducerRecord<>(topic, vehicleTrip);
        producer.send(producerRecord);
    }

    public static void closeProducer() {
        producer.close();
    }
}
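
The value serializer registered above, com.abc.serializer.MySerializer, is not shown in the post. A Kryo-backed implementation of Kafka's Serializer interface would look roughly like the following sketch (the class body is an assumption, reusing the VehicleTripKyroSerializer registered in the topology):

import java.io.ByteArrayOutputStream;
import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Output;

public class MySerializer implements Serializer<VehicleTrip> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    @Override
    public byte[] serialize(String topic, VehicleTrip trip) {
        // A fresh Kryo per call keeps this sketch trivially thread safe;
        // a pooled instance would be faster in production.
        Kryo kryo = new Kryo();
        kryo.addDefaultSerializer(VehicleTrip.class, new VehicleTripKyroSerializer());
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        Output output = new Output(bytes);
        kryo.writeObject(output, trip);
        output.close(); // flushes Kryo's buffer into the stream
        return bytes.toByteArray();
    }

    @Override
    public void close() {
    }
}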

Kryo serializer:

public class DataKyroSerializer extends Serializer<Data> implements Serializable {
    @Override
    public void write(Kryo kryo, Output output, Data data) {
        output.writeLong(data.getStartedOn().getTime());
        output.writeLong(data.getEndedOn().getTime());
    }

    @Override
    public Data read(Kryo kryo, Input input, Class<Data> aClass) {
        Data data = new Data();
        data.setStartedOn(new Date(input.readLong()));
        data.setEndedOn(new Date(input.readLong()));
        return data;
    }
}

I need to get the data back into the Data bean.

According to several articles, I need to provide a custom Scheme and make it part of the topology, but so far I have had no luck.

Bolt and Scheme code

Scheme:

public class KryoScheme implements Scheme {
    private ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() {
        protected Kryo initialValue() {
            Kryo kryo = new Kryo();
            kryo.addDefaultSerializer(Data.class, new DataKyroSerializer());
            return kryo;
        };
    };
    @Override
    public List<Object> deserialize(ByteBuffer ser) {
        return Utils.tuple(kryos.get().readObject(new ByteBufferInput(ser.array()), Data.class));
    }
    @Override
    public Fields getOutputFields( ) {
        return new Fields( "data" );
    }
}

And the bolt:

public class AnalysisBolt implements IBasicBolt {
    private static final long serialVersionUID = 1L;
    private String topicname = null;

    public AnalysisBolt(String topicname) {
        this.topicname = topicname;
    }

    public void prepare(Map stormConf, TopologyContext topologyContext) {
        System.out.println("prepare");
    }

    public void execute(Tuple input, BasicOutputCollector collector) {
        System.out.println("execute");
        Fields fields = input.getFields();
        try {
            JSONObject eventJson = (JSONObject) JSONSerializer.toJSON(
                    (String) input.getValueByField(fields.get(1)));
            String startTime = (String) eventJson.get("startedOn");
            String endTime = (String) eventJson.get("endedOn");
            String oid = (String) eventJson.get("_id");
            int vId = (Integer) eventJson.get("vehicleId");
            // call getEventForVehicleWithinTime(Long vehicleId, Date startTime, Date endTime)
            System.out.println("===========" + oid + "| " + vId + "| " + startTime + "| " + endTime);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

But when I submit the Storm topology, I get this error:

java.lang.IllegalStateException: Spout 'kafkaspout' contains a
non-serializable field of type com.abc.topology.KryoScheme$1, which
was instantiated prior to topology creation.
com.minda.iconnect.topology.KryoScheme$1 should be instantiated within
the prepare method of 'kafkaspout at the earliest.

Appreciate any help debugging this issue and pointing me down the right path.

Thanks

Your ThreadLocal is not serializable. The preferable solution would be to make your serializer both serializable and thread safe. If that is not possible, then I see two alternatives, since a scheme has no prepare method the way your bolt does:

  1. Declare it static, which is inherently transient.
  2. Declare it transient and access it through a private get method that initializes the variable on first access, as in the sketch below.
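
Applied to your KryoScheme, the second option might look like this minimal sketch (it assumes DataKyroSerializer is itself Serializable, as in your code, and that each spout executor works with its own Scheme instance, so one Kryo per instance is safe):

import java.nio.ByteBuffer;
import java.util.List;

import org.apache.storm.spout.Scheme;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.ByteBufferInput;

public class KryoScheme implements Scheme {
    // transient: skipped when the topology is serialized into ZooKeeper
    private transient Kryo kryo;

    // Lazily builds the Kryo instance on first access, which happens on
    // the worker after the topology has been deserialized.
    private Kryo getKryo() {
        if (kryo == null) {
            kryo = new Kryo();
            kryo.addDefaultSerializer(Data.class, new DataKyroSerializer());
        }
        return kryo;
    }

    @Override
    public List<Object> deserialize(ByteBuffer ser) {
        return Utils.tuple(getKryo().readObject(new ByteBufferInput(ser.array()), Data.class));
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("data");
    }
}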

Within the Storm lifecycle, the topology is instantiated and then serialized to byte format and stored in ZooKeeper before it is executed. During this step, if a spout or bolt in the topology holds an initialized non-serializable property, serialization fails.

If you need a field that is not serializable, initialize it within the bolt's or spout's prepare method, which runs after the topology has been delivered to the worker; the sketch below shows the pattern.
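
For a bolt, the pattern looks like this (a minimal sketch; SomeNonSerializableClient is a stand-in for whatever resource cannot be serialized):

import java.util.Map;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

public class ExampleBolt extends BaseBasicBolt {
    // transient and left null, so topology serialization never sees
    // an initialized instance of the non-serializable type
    private transient SomeNonSerializableClient client;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        // runs on the worker, after the topology bytes leave ZooKeeper
        client = new SomeNonSerializableClient();
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // 'client' is safe to use here
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // this sketch emits nothing
    }
}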

Source: Best Practices for implementing Apache Storm
