连接两个Kafka KTable会导致RocksDB中出现一个Nullpointer



我正在尝试使用加入两个Kafka Stream DSL KTable

KTable<String, String> source = builder.table("stream-source");
KTable<String, String> target = builder.table("stream-target");
source.join(target, new ValueJoiner<String, String, String>() {
    public String apply(String value1, String value2) {
        return value1 + ":" + value2;
    }
});

我已经确保密钥和值都不是null:

Producer<String, String> producer = new KafkaProducer<String, String>(props);
for(int i = 0; i < PERSONS_SOURCE.length; i++) {
    producer.send(new ProducerRecord<String, String>("stream-source",     Long.toString(i + 1L), PERSONS_SOURCE[i]));
}
for(int i = 0; i < PERSONS_TARGET.length; i++) {
    producer.send(new ProducerRecord<String, String>("stream-target", Long.toString(i + 1L), PERSONS_TARGET[i]));
}
producer.close();

但是应用程序报告在RocksDB层中有一个与分区有关的空指针。

〔2016-07-17 21:58:04682〕错误用户提供的侦听器org.apache.kafka.streams.processer.internals.StreamThread组streams-persons2在分配分区时失败(org.apache.kaf ka.clients.consumer.internals.ConsumerCoordinator)java.lang.NullPointerException网址:org.rocksdb.rocksdb.put(rocksdb.java:432)网址:org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:299)网址:org.apache.kafka.streams.state.internals.RocksDBStore.access$200(RocksDBStore.java:62)网址:org.apache.kafka.streams.state.internals.RocksDBStore$3.restore(RocksDBStore.java:206)网址:org.apache.kafka.streams.processer.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:245)网址:org.apache.kafka.streams.processer.internals.ProcessorStateManager.register(ProcessorStateManager.java:210)网址:org.apache.kafka.stream.processer.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:116)网址:org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:202)

发现问题是由于在应用程序代码中创建流而不是使用命令:-

kafka-topics --create --topic stream-a --replication-factor 1 --partitions 1

似乎连接需要分区信息才能工作。

最新更新