Getting topic and partition offsets



Exception in thread "main" java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
    at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:91)
    at kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:68)
    at cmb.SparkStream.kafka.kafkaOffsetTool.getTopicOffsets(kafkaOffsetTool.java:47)
    at cmb.SparkStream.LogClassify.main(LogClassify.java:95)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

My code is:

public static Map<TopicAndPartition, Long> getTopicOffsets(String zkServers, String topic) {
    Map<TopicAndPartition, Long> retVals = new HashMap<TopicAndPartition, Long>();
    for (String zkserver : zkServers.split(",")) {
        SimpleConsumer simpleConsumer = new SimpleConsumer(zkserver.split(":")[0],
                Integer.valueOf(zkserver.split(":")[1]), Consts.getKafkaConfigBean().getDuration(), 1024,
                "consumser");
        TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(Arrays.asList(topic));
        TopicMetadataResponse topicMetadataResponse = simpleConsumer.send(topicMetadataRequest);
        for (TopicMetadata metadata : topicMetadataResponse.topicsMetadata()) {
            for (PartitionMetadata part : metadata.partitionsMetadata()) {
                Broker leader = part.leader();
                if (leader != null) {
                    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, part.partitionId());
                    PartitionOffsetRequestInfo partitionOffsetRequestInfo = new PartitionOffsetRequestInfo(
                            kafka.api.OffsetRequest.LatestTime(), 10000);
                    OffsetRequest offsetRequest = new OffsetRequest(
                            ImmutableMap.of(topicAndPartition, partitionOffsetRequestInfo),
                            kafka.api.OffsetRequest.CurrentVersion(), simpleConsumer.clientId());
                    OffsetResponse offsetResponse = simpleConsumer.getOffsetsBefore(offsetRequest);
                    if (!offsetResponse.hasError()) {
                        long[] offsets = offsetResponse.offsets(topic, part.partitionId());
                        retVals.put(topicAndPartition, offsets[0]);
                    }
                }
            }
        }
        simpleConsumer.close();
    }
    return retVals;
}
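
The loop above derives each connection's host and port by splitting the comma-separated server list twice (first on "," and then on ":"). That parsing step, isolated as a standalone sketch (the class and method names here are made up for illustration, not from the original code):

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class BrokerList {
    // Split a comma-separated "host:port,host:port" list into (host, port) pairs,
    // mirroring the split(",") / split(":") calls in the loop above.
    static List<Map.Entry<String, Integer>> parse(String servers) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String s : servers.split(",")) {
            String[] parts = s.split(":");
            out.add(new AbstractMap.SimpleEntry<>(parts[0], Integer.valueOf(parts[1])));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(parse("broker1:9092,broker2:9092"));
        // prints [broker1=9092, broker2=9092]
    }
}
```

Note there is no validation here (or in the original loop): a malformed entry without a ":" throws ArrayIndexOutOfBoundsException.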

I think you may be overcomplicating this. Use org.apache.kafka.clients.consumer.KafkaConsumer (consumer here) and do something like:

val partitions = consumer.partitionsFor(topic).map { TopicPartition(topic, it.partition()) }
consumer.assign(partitions)
consumer.seekToEnd(partitions)
val offsets = partitions.map { it to consumer.position(it) }
println(offsets)
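
Since the question's code is Java, the same approach looks roughly like this. This is a sketch, not the answerer's exact code: the bootstrap server list, group id, and deserializers are placeholder assumptions to adjust for your cluster, and it needs the kafka-clients new-consumer API plus a reachable broker to run.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class LatestOffsets {

    public static Map<TopicPartition, Long> latestOffsets(String bootstrapServers, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers); // Kafka brokers, not ZooKeeper
        props.put("group.id", "offset-lookup"); // hypothetical group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // One TopicPartition per partition of the topic.
            List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                    .map(info -> new TopicPartition(topic, info.partition()))
                    .collect(Collectors.toList());
            consumer.assign(partitions);
            consumer.seekToEnd(partitions);
            Map<TopicPartition, Long> offsets = new HashMap<>();
            for (TopicPartition tp : partitions) {
                // After seekToEnd, position() is the latest offset of the partition.
                offsets.put(tp, consumer.position(tp));
            }
            return offsets;
        }
    }
}
```

Note that bootstrap.servers must point at Kafka brokers (typically port 9092), not at ZooKeeper. That distinction may also be behind the ClosedChannelException in the question: the code there passes ZooKeeper addresses (zkServers) to SimpleConsumer, which expects a broker host and port.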

and you'll get results like:

[topicname-8->1917258, topicname-2->1876810, topicname-5->1857012, topicname-4->3844, topicname-7->4043972, topicname-1->1811078, topicname-9->12217819, topicname-3->3844]
