如何为spark结构化流媒体应用程序获得kafka消费者滞后



我正在为我的spark结构化流媒体应用程序构建监控,需要获得spark应用程序所使用的特定主题的消费者滞后。我相信火花驱动程序必须意识到这种滞后,因为它拥有执行器的所有元数据。我看不出有任何方法可以从任何现有的spark文档或资源中获得这些指标。我检查了streaminQueryListener接口,但它的功能也有限,因为我们只能从中获得每个查询的度量

跟踪结构化流式处理作业的消费者滞后的挑战在于,结构化流式传输不会将任何偏移提交回Kafka(有关更多详细信息,请参阅此处(。因此,Kafka并没有意识到结构化流工作的实际进展。

另一方面,Spark对当前位于Kafka主题中的消息/偏移量没有深入了解。

为了监控消费者的滞后,你需要把这些信息放在一起:

  • 持续请求TopicPartition中的最新偏移
  • 持续检查结构化流应用程序处理的当前偏移

例如,您可以创建一个KafkaAdminClient,并在StreamingQueryListeneronQueryProgress调用期间从Kafka获取所需信息。在该方法中,您需要将提到的最新事件的偏移量与Kafka中实际可用的最高偏移量进行比较。

这里有一种获取执行器节点请求信息的方法。信息是为每条消息提取的,您可以以最适合您需求的方式(计数、时间等(减少请求量。

下面我将监控信息发送到另一个Kafka主题。

我经常在每一批流式消息上打开Kafka消费者连接(以获取关于最大偏移量的信息(。也许你无法接受。

final JavaInputDStream<ConsumerRecord<String, byte[]>> stream = KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, byte[]>Subscribe(topics, kafkaParams));

JavaPairDStream<String, Income> streamPair = stream
.mapPartitionsToPair(new PairFlatMapFunction<Iterator<ConsumerRecord<String, byte[]>>, String, Income>() {
private Map<String, Object> getProps() {
Map<String, Object> kafkaParams2 = new HashMap<>();
kafkaParams2.put("bootstrap.servers", ApiConsts.BOOTSTRAP_SERVERS);
kafkaParams2.put("key.deserializer", StringDeserializer.class);
kafkaParams2.put("value.deserializer", ByteArrayDeserializer.class);
kafkaParams2.put("group.id", "ta_calc_spark" + UUID.randomUUID().toString());
kafkaParams2.put("auto.offset.reset", "latest");
kafkaParams2.put("enable.auto.commit", false);
kafkaParams2.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
kafkaParams2.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000);
return kafkaParams2;
}
@Override
public Iterator<Tuple2<String, Income>> call(Iterator<ConsumerRecord<String, byte[]>> t) throws Exception {
KafkaConsumer consumer = new KafkaConsumer<>(getProps());
ArrayList<TopicPartition> partitions0 = new ArrayList<TopicPartition>();
IntStream.range(0, consumer.partitionsFor(ApiConsts.TOPIC_TA_CALC_SPARK_TASK).size())
.forEach(i -> partitions0.add(new TopicPartition(ApiConsts.TOPIC_TA_CALC_SPARK_TASK, i)));
consumer.assign(partitions0);
KafkaProducer producerMonitoring = getKafkaProducer();
List<Tuple2<String, Income>> result = new ArrayList<Tuple2<String, Income>>();
try {
t.forEachRemaining(t2 -> {
// business logic - message handling
try {
Set<TopicPartition> partitions = new HashSet<TopicPartition>();
TopicPartition actualTopicPartition = new TopicPartition(ApiConsts.TOPIC_TA_CALC_SPARK_TASK, t2.partition());
partitions.add(actualTopicPartition);
Long actualEndOffset = (Long) consumer.endOffsets(partitions).get(actualTopicPartition);
long actualPosition = consumer.position(actualTopicPartition);
String monitorValue = String.format(
"diff: %s   (partition:%s; actualEndOffsetStreaming:%s; actualEndOffset:%s; actualPosition=%s)",
actualEndOffset - actualPosition, t2.partition(), t2.offset(), actualEndOffset, actualPosition);
ProducerRecord<String, String> pRecord = new ProducerRecord<String, String>(ApiConsts.TOPIC_TA_CALC_SPARK_TEMP_RESULT,
UUID.randomUUID().toString(), monitorValue);
producerMonitoring.send(pRecord);
} catch (Exception ex) {
log.error("################# mapPartitionsToPair.call() ERROR", ex);
ex.printStackTrace();
}
});
} finally {
producerMonitoring.close();
consumer.close();
}
return result.iterator();
}
});

输出:

Consumer Record:(f45cd24b-6232-45b2-b8f2-814753ae89bf, diff: 0   (partition:4; actualEndOffsetStreaming:1177; actualEndOffset:1178; actualPosition=1178), 2, 109)
Consumer Record:(3ec4f576-1fff-4c91-885f-fc709f7f4531, diff: 0   (partition:4; actualEndOffsetStreaming:1176; actualEndOffset:1178; actualPosition=1178), 3, 105)

最新更新