When I run a Storm topology with a KafkaSpout in Heron, I get the following exception:
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.HeronInstance: Starting instance container_2_ads_2 for topology AdvertisingTopology and topologyId AdvertisingTopologyf7b4acbe-bdbc-4772-aaa4-9dd2f113f405 for component ads with taskId 2 and componentIndex 0 and stmgrId stmgr-2 and stmgrPort 31162 and metricsManagerPort 31067
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.HeronInstance: System Config: {heron.streammgr.network.backpressure.lowwatermark.mb=50, heron.streammgr.connection.write.batch.size.mb=1, heron.streammgr.stateful.buffer.size.mb=100, heron.instance.internal.bolt.write.queue.capacity=128, heron.instance.tuning.expected.spout.read.queue.size=512, heron.metricsmgr.network.write.batch.size.bytes=ByteAmount{32768 bytes}, heron.instance.reconnect.streammgr.interval.sec=PT5S, heron.instance.tuning.interval.ms=PT0.1S, heron.instance.emit.batch.size.bytes=ByteAmount{32768 bytes}, heron.logging.directory=log-files, heron.check.tmaster.location.interval.sec=120, heron.instance.reconnect.metricsmgr.interval.sec=PT5S, heron.streammgr.client.reconnect.tmaster.max.attempts=30, heron.streammgr.network.backpressure.highwatermark.mb=100, heron.instance.network.read.batch.size.bytes=ByteAmount{32768 bytes}, heron.instance.tuning.expected.metrics.write.queue.size=8, heron.instance.internal.spout.write.queue.capacity=128, heron.instance.force.exit.timeout.ms=PT2S, heron.tmaster.network.stats.options.maximum.packet.mb=1, heron.streammgr.xormgr.rotatingmap.nbuckets=3, heron.instance.set.control.tuple.capacity=1024, heron.metricsmgr.network.read.batch.size.bytes=ByteAmount{32768 bytes}, heron.streammgr.client.reconnect.tmaster.interval.sec=10, heron.instance.execute.batch.time.ms=PT0.016S, heron.metrics.export.interval.sec=PT1M, heron.streammgr.connection.read.batch.size.mb=1, heron.streammgr.cache.drain.size.mb=100, heron.tmaster.network.master.options.maximum.packet.mb=16, heron.tmaster.establish.retry.interval.sec=1, heron.metrics.max.exceptions.per.message.count=1024, heron.tmaster.stmgr.state.timeout.sec=60, heron.instance.network.write.batch.size.bytes=ByteAmount{32768 bytes}, heron.logging.err.threshold=3, heron.tmaster.network.controller.options.maximum.packet.mb=1, heron.tmaster.metrics.collector.maximum.exception=256, 
heron.instance.network.write.batch.time.ms=PT0.016S, heron.instance.network.options.socket.send.buffer.size.bytes=ByteAmount{6 MB (6553600 bytes)}, heron.streammgr.mempool.max.message.number=512, heron.logging.maximum.size.mb=100, heron.streammgr.tmaster.heartbeat.interval.sec=10, heron.instance.network.read.batch.time.ms=PT0.016S, heron.tmaster.metrics.network.bindallinterfaces=false, heron.streammgr.network.options.maximum.packet.mb=10, heron.instance.tuning.expected.bolt.write.queue.size=8, heron.metricsmgr.network.options.socket.received.buffer.size.bytes=ByteAmount{8 MB (8738000 bytes)}, heron.logging.maximum.files=5, heron.instance.network.options.socket.received.buffer.size.bytes=ByteAmount{8 MB (8738000 bytes)}, heron.instance.execute.batch.size.bytes=ByteAmount{32768 bytes}, heron.instance.acknowledgement.nbuckets=10, heron.metricsmgr.network.read.batch.time.ms=PT0.016S, heron.metricsmgr.network.options.socket.send.buffer.size.bytes=ByteAmount{6 MB (6553600 bytes)}, heron.metricsmgr.network.options.maximum.packetsize.bytes=ByteAmount{1 MB (1048576 bytes)}, heron.instance.tuning.expected.bolt.read.queue.size=8, heron.logging.flush.interval.sec=10, heron.streammgr.cache.drain.frequency.ms=10, heron.tmaster.establish.retry.times=30, heron.instance.network.options.maximum.packetsize.bytes=ByteAmount{10 MB (10485760 bytes)}, heron.instance.tuning.current.sample.weight=0.8, heron.instance.reconnect.streammgr.times=60, heron.logging.prune.interval.sec=300, heron.instance.reconnect.metricsmgr.times=60, heron.tmaster.metrics.collector.maximum.interval.min=PT3H, heron.tmaster.metrics.collector.purge.interval.sec=PT1M, heron.streammgr.client.reconnect.interval.sec=1, heron.instance.internal.spout.read.queue.capacity=1024, heron.instance.ack.batch.time.ms=PT0.128S, heron.instance.set.data.tuple.size.bytes=ByteAmount{8 MB (8388608 bytes)}, heron.instance.tuning.expected.spout.write.queue.size=8, heron.instance.internal.bolt.read.queue.capacity=128, 
heron.instance.set.data.tuple.capacity=1024, heron.instance.metrics.system.sample.interval.sec=PT10S, heron.streammgr.network.backpressure.threshold=3, heron.instance.emit.batch.time.ms=PT0.016S, heron.metricsmgr.network.write.batch.time.ms=PT0.016S, heron.instance.internal.metrics.write.queue.capacity=128}
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.network.HeronClient: Connecting to endpoint: /127.0.0.1:31162
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.network.HeronClient: Connecting to endpoint: /127.0.0.1:31067
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: Connected to Stream Manager. Ready to send register request
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.MetricsManagerClient: Connected to Metrics Manager. Ready to send register request
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: Stop writing due to not yet connected to Stream Manager.
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: Stop writing due to not yet connected to Stream Manager.
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: We registered ourselves to the Stream Manager
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: Handling assignment message from response
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: We received a new Physical Plan.
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: Push to Slave
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.MetricsManagerClient: We registered ourselves to the Metrics Manager
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.utils.misc.PhysicalPlanHelper: Building configs for component: ads
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.utils.misc.PhysicalPlanHelper: Added topology-level configs: {topology.acker.executors=2, topology.workers=3, topology.skip.missing.kryo.registrations=false, topology.enable.message.timeouts=true, topology.serializer.classname=org.apache.storm.serialization.HeronPluggableSerializerDelegate, topology.debug=false, topology.max.spout.pending=100, topology.kryo.factory=org.apache.storm.serialization.DefaultKryoFactory, topology.fall.back.on.java.serialization=false, topology.name=AdvertisingTopology, topology.component.parallelism=1, topology.stmgrs=3, topology.reliability.mode=ATLEAST_ONCE, topology.message.timeout.secs=30}
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.utils.misc.PhysicalPlanHelper: Added component-specific configs: {topology.acker.executors=2, config.zkRoot=/ad-events/6647e83d-6bd8-454e-ad91-d3ec0a012e62, topology.workers=3, topology.skip.missing.kryo.registrations=false, topology.enable.message.timeouts=true, topology.serializer.classname=org.apache.storm.serialization.HeronPluggableSerializerDelegate, topology.debug=false, topology.max.spout.pending=100, topology.kryo.factory=org.apache.storm.serialization.DefaultKryoFactory, topology.fall.back.on.java.serialization=false, topology.name=AdvertisingTopology, topology.component.parallelism=1, config.topics=ad-events, topology.stmgrs=3, topology.reliability.mode=ATLEAST_ONCE, topology.message.timeout.secs=30, config.zkNodeBrokers=/brokers}
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.Slave: Incarnating ourselves as ads with task id 2
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.spout.SpoutInstance: Is this topology stateful: false
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.spout.SpoutInstance: Enable Ack: true
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.spout.SpoutInstance: EnableMessageTimeouts: true
[2018-11-01 22:43:49 +0800] [SEVERE] com.twitter.heron.instance.HeronInstance: Exception caught in thread: SlaveThread with id: 12
java.lang.NullPointerException
at org.apache.storm.kafka.KafkaSpout.open(KafkaSpout.java:80)
at org.apache.storm.topology.IRichSpoutDelegate.open(IRichSpoutDelegate.java:53)
at com.twitter.heron.instance.spout.SpoutInstance.init(SpoutInstance.java:173)
at com.twitter.heron.instance.Slave.startInstanceIfNeeded(Slave.java:222)
at com.twitter.heron.instance.Slave.handleNewAssignment(Slave.java:173)
at com.twitter.heron.instance.Slave.handleNewPhysicalPlan(Slave.java:349)
at com.twitter.heron.instance.Slave.access$300(Slave.java:49)
at com.twitter.heron.instance.Slave$1.run(Slave.java:118)
at com.twitter.heron.common.basics.WakeableLooper.executeTasksOnWakeup(WakeableLooper.java:160)
at com.twitter.heron.common.basics.WakeableLooper.runOnce(WakeableLooper.java:89)
at com.twitter.heron.common.basics.WakeableLooper.loop(WakeableLooper.java:79)
at com.twitter.heron.instance.Slave.run(Slave.java:180)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.HeronInstance: Waiting for process exit in PT2S
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.Slave: Closing the Slave Thread
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.utils.metrics.MetricsCollector: Forcing to gather all metrics and flush out.
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.Slave: Shutting down the instance
[2018-11-01 22:43:49 +0800] [WARNING] com.twitter.heron.common.basics.SysUtils: Failed to close com.twitter.heron.instance.Slave@4bef4d93
java.lang.NullPointerException
at org.apache.storm.kafka.KafkaSpout.close(KafkaSpout.java:136)
at org.apache.storm.topology.IRichSpoutDelegate.close(IRichSpoutDelegate.java:58)
at com.twitter.heron.instance.spout.SpoutInstance.clean(SpoutInstance.java:195)
at com.twitter.heron.instance.spout.SpoutInstance.shutdown(SpoutInstance.java:204)
at com.twitter.heron.instance.Slave.close(Slave.java:238)
at com.twitter.heron.common.basics.SysUtils.closeIgnoringExceptions(SysUtils.java:66)
at com.twitter.heron.instance.HeronInstance$SlaveExitTask.run(HeronInstance.java:428)
at com.twitter.heron.instance.HeronInstance$DefaultExceptionHandler.handleException(HeronInstance.java:396)
at com.twitter.heron.instance.HeronInstance$DefaultExceptionHandler.uncaughtException(HeronInstance.java:360)
at java.lang.ThreadGroup.uncaughtException(ThreadGroup.java:1057)
at java.lang.ThreadGroup.uncaughtException(ThreadGroup.java:1052)
at java.lang.Thread.dispatchUncaughtException(Thread.java:1959)
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.Gateway: Closing the Gateway thread
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.utils.metrics.MetricsCollector: Forcing to gather all metrics and flush out.
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.MetricsManagerClient: Flushing all pending data in MetricsManagerClient
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: Flushing all pending data in StreamManagerClient
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.network.SocketChannelHelper: Forcing to flush data to socket with best effort.
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.network.HeronClient: To stop the HeronClient.
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.MetricsManagerClient: MetricsManagerClient exits
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.network.SocketChannelHelper: Forcing to flush data to socket with best effort.
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.network.HeronClient: To stop the HeronClient.
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: StreamManagerClient exits.
[2018-11-01 22:43:49 +0800] [SEVERE] com.twitter.heron.instance.HeronInstance: Instance Process exiting.
The topology code is as follows:
String zkServerHosts = "MY_ZK_IP:2181";
ZkHosts hosts = new ZkHosts(zkServerHosts);
SpoutConfig spoutConfig = new SpoutConfig(hosts, kafkaTopic, "/" + kafkaTopic, UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
The NPE is reported at line 80 of the open method in the KafkaSpout class:
public Object getValueAndReset() {
    List<PartitionManager> pms = KafkaSpout.this.coordinator.getMyManagedPartitions();
    Set<Partition> latestPartitions = new HashSet();
    Iterator var3 = pms.iterator();
    PartitionManager pm;
    while(var3.hasNext()) { // the NPE happens on this line
        pm = (PartitionManager)var3.next();
        latestPartitions.add(pm.getPartition());
    }
    this.kafkaOffsetMetric.refreshPartitions(latestPartitions);
    var3 = pms.iterator();
    while(var3.hasNext()) {
        pm = (PartitionManager)var3.next();
        this.kafkaOffsetMetric.setOffsetData(pm.getPartition(), pm.getOffsetData());
    }
    return this.kafkaOffsetMetric.getValueAndReset();
}
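I don't know what causes this problem or how to fix it. Any help is appreciated.
Edit: all imports now point to the heron-storm classes, but the NPE still occurs.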
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
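The Storm-based Kafka spout does not work with the native Heron topology API. You need to use the heron-storm API in compatibility mode (add this dependency to your pom file) to build your topology and to interface with the storm-kafka spout. This should just be a matter of swapping the Heron imports for heron-storm imports in your bolts.
Some examples that use the heron-storm API are shown here.
Storm and Heron activate their bolts/spouts in different ways, which can cause problems for Storm-specific code in a native Heron topology.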
This issue has been resolved with the following workaround:
final KafkaSpout<byte[], byte[]> spout =
    new KafkaSpout<byte[], byte[]>(kafkaSpoutConfig) {
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            super.open(conf, context, collector);
            super.activate();
        }
    };
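To make the activation point concrete, here is a toy model of the lifecycle mismatch. It is only a sketch: ToySpout, EagerToySpout, and runWithoutActivate are hypothetical names invented for illustration, and nothing here is Heron or Storm code. It assumes a spout that creates its resource in activate() and a runtime that never calls activate() before using the spout. The thread does not confirm that this is the exact cause of the NPE in the question.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleSketch {

    /** Hypothetical spout: its data source is created in activate(), not in open(). */
    static class ToySpout {
        protected List<String> source; // stays null until activate() runs

        public void open() {
            // In this toy model open() does no resource setup.
        }

        public void activate() {
            source = new ArrayList<>();
            source.add("tuple-1");
        }

        public String nextTuple() {
            // Throws NullPointerException if activate() was never called.
            return source.get(0);
        }
    }

    /** Same shape as the workaround above: activate eagerly at the end of open(). */
    static class EagerToySpout extends ToySpout {
        @Override
        public void open() {
            super.open();
            super.activate();
        }
    }

    /** Hypothetical runtime that calls open() and then nextTuple(), but never activate(). */
    static String runWithoutActivate(ToySpout spout) {
        spout.open();
        try {
            return spout.nextTuple();
        } catch (NullPointerException e) {
            return "NPE";
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithoutActivate(new ToySpout()));      // prints "NPE"
        System.out.println(runWithoutActivate(new EagerToySpout())); // prints "tuple-1"
    }
}
```

The trade-off of the eager variant is that activate() may run twice if the runtime does call it later, so the real spout's activate() would need to tolerate repeated calls.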