如何发送 Map to Kafka 主题,使 ProducerRecord 键与相应的 Map 键相同



我正在使用Spark流,数据正在发送到Kafka。我正在向卡夫卡发送地图。假设我有一个 20 的地图(在流式批处理持续时间内可能会增长到 1000(元素,如下所示:

HashMap<Integer,String> input = new HashMap<Integer,String>();
input.put(11,"One");
input.put(312,"two");
input.put(33,"One");
input.put(24,"One");
input.put(35,"One");
input.put(612,"One");
input.put(7,"One");
input.put(128,"One");
input.put(9,"One");
input.put(10,"One");
input.put(11,"One1");
input.put(12,"two1");
input.put(13,"One1");
input.put(14,"One1");
input.put(15,"One1");
input.put(136,"One1");
input.put(137,"One1");
input.put(158,"One1");
input.put(159,"One1");
input.put(120,"One1");

Set<Integer> inputKeys = input.keySet();
Iterator<Integer> inputKeysIterator = inputKeys.iterator();
while (inputKeysIterator.hasNext()) {
Integer key = inputKeysIterator.next();
ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(topic,
key%10, input.get(key));
KafkaProducer.send(record);
}

我的 Kafka 主题有 10 个分区。在这里,我调用 kafkaProducer.send(( 20 次,因此调用 20 次 Kafka。我怎样才能批量发送整个数据,即在一个 Kafka 调用中,但我想再次确保每条记录都进入由公式键 %10驱动的特定分区,如

制作人记录 = 新的制作人记录(主题,key%10, input.get(key((;

我看到的选项:linger.ms=1可以确保这一点,但延迟为 1 毫秒。 如何避免这种延迟并避免 20 个网络 (Kafka( 调用或最小化 Kafka 调用?

Kafka Producer API 已经批量发送消息,即使您逐个调用也是如此

请参阅文档中的batch.size,它是按字节而不是消息,但您可以通过在生产者上调用 flush 来强制实际的网络事件

关于分区,您需要创建代码分区程序。简单地将 mod 值作为键传递并不能保证在默认分区程序中不会发生哈希冲突

最新更新