100万卡夫卡制作人录制了一秒钟的唱片



是否可以通过kafka每秒生成一百万条记录?需要多少台服务器?目前,我每秒发送7000条来自卡夫卡生产商的信息,并努力超越这个

我注意到一些消息来源说卡夫卡一秒钟可以发送数百万条信息

我创建了一个使用@Autowired kafkaTemplate的作业,并制作了一个while循环,发送一个短文本字符串";asdf";我已经设置了1000毫秒的徘徊时间,并以7000条为一组查看消息。生产者、消费者经纪人、动物园管理员在同一台机器上,经纪人和动物园管理员以及具有默认配置的非常简单的docker映像

我在第二次中最多收到7000个请求

application.props

spring.kafka.bootstrap-servers=PLAINTEXT://localhost:9092,PLAINTEXT://localhost:9093
host.name=localhost

拨打电话的作业

@Async
@Scheduled(fixedDelay = 15000)
public void scheduleTaskUsingCronExpression() {
generateCalls();
}
private void generateCalls() {
try{
int i = 0;
System.out.println("start");
long startTime = System.currentTimeMillis();
while(i <= 1000000){

String message = "Test Message sadg sad-";
kafkaTemplate.send(TOPIC, message + i);
i++;
}
long endTime = System.currentTimeMillis();
System.out.println((endTime - startTime));
System.out.println("done");
}
catch(Exception e){
e.printStackTrace();
}
System.out.println("RUNNING");
}

Kaffa分区配置

@Bean
public KafkaAdmin kafkaAdmin() {
//String bootstrapAddress = "localhost:29092";
String bootstrapAddress = "localhost:9092";
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic testTopic() {
return new NewTopic("test-topic", 6, (short) 1);
}

卡夫卡消费者消费信息

@KafkaListener(topics = "test-topic", groupId = "one", concurrency = "6" )
public void listenGroupFoo(String message) {
if(message.indexOf("-0") != -1){
startTime = new Date().getTime();
System.out.println("Starting Message in group foo: " + message);
}
else if(message.indexOf("-100000") != -1){
endTime = new Date().getTime();
System.out.println("Received Message in group foo: " + message);
System.out.println(endTime - startTime);
}
}

对于硬件,我有一个10900k和64gb内存5ghz时钟速度970 Evo单nvme磁盘10芯20线

所有请求都来自同一台机器到同一台

有没有更好的方法来组织/优化代码以进行大量请求?理论:

  1. 多线程
  2. 更改服务器的配置,如tomcat配置(接收端或发送端)
  3. 不使用自动连接或创建多个的kafkaTemplate
  4. 是否修改硬件以拥有多个磁盘
  5. 不为制片人工作
  6. 还有什么人能想到的帮助吗

KafkaProducers吞吐量调整过程中的关键属性有:

生产者配置:

  • linger.ms
  • 批量大小
  • 压缩类型
  • acks启用幂等性

主题配置:

  • min.insync.副本

因此,在研究了链接的文档后,您将看到两组属性耦合:

1.在Kafka生产者上进行有效的批处理和压缩

卡夫卡生产商内置了批处理和压缩机制。当生产者将许多消息分组,然后将其压缩为一个批时,该机制会获得最佳结果,因此简单地为批大小设置一个高值并启用压缩听起来非常好,实际上确实如此,但要想了解最后一块拼图是什么,我们需要通过内部Kafka生产者实现。Kafka生产者生成了一个单独的线程,负责从内部队列中获取批,并将其发送到Kafka代理,我们可以将其命名为Sender线程。在默认配置中,无论批大小有多大,Sender都会在可能的情况下从队列中获取批,因此您可以将批大小设置为1024MB,并且仍然不会像预期的那样对消息进行分组和压缩。为了解决这个问题,并将许多记录分组和压缩成一批,Kafka提供了linger.ms属性,该属性告诉Sender线程等待一段时间,直到它向Kafka服务器发送一批。在您的特定情况下,理论上最有效的方法是在一批中发送一百万条消息,但这当然取决于消息的大小。

2.高吞吐量,但复制和确认可能不一致

Kafka生产者有3种不同的策略,说明生产者需要等待多长时间,直到它认为批处理已正确发送到服务器。最一致的策略等待消息被引导服务器保存,并且是复制服务器的主题配置号(min.in.sync.re副本)所要求的。与最低的策略相反,它更喜欢可用性而不是一致性(正如CAP定理所说),它不等待来自服务器的任何确认,它以推送模式工作,您可以将其与通过UDP协议发送数据进行比较。所以在你的情况下,你需要使用第二个。为了实现这一点,您需要通过在生产者上设置enable.idempotence=falseack=0来禁用Kafka生产者的独立功能。最后一部分是在主题创建过程中设置min.insync.replicas=1,该属性与生产者端的acks属性耦合,告诉生产者在向生产者发送确认之前需要保留多少数据集群副本,这在您的情况下似乎并不重要,因为设置了acks=0的生产者不会等待任何确认,但是min.insync.replicas=1在领导者选举过程中也会被集群解释-这个过程不会为单个主题启动,直到没有数量大于或等于min.insync.replicas的可用Kafka服务器,所以为了看到全貌,我不得不提到它。与我们的教育相关的最后一个参数是主题属性unclan.leader.election.enabled,但它更多地与可用性有关,而不是与性能有关,但imho还是很高兴知道这一点。

如何测试卡夫卡制作人的表现

为了检查上述属性(生产者属性和主题属性)是如何协同工作的,Kafka发行版附带了一个名为Kafka-productor-perf-test的整洁脚本。此脚本将帮助您检查配置是否符合您的需要。下面你可以找到具有非常有用命令的注册表。

Spring Boot将生产者确认默认配置为"all"。

请参阅https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_acks

  • 对于最高吞吐量,请将值设置为0
  • 如果没有数据丢失,请将ack值设置为all(或-1)
  • 对于高但不是最大耐用性,以及高但不是最高吞吐量,请将ack值设置为1。Ack值1可以被看作是上述两者之间的中间值

我认为这可能是此类测试的主要瓶颈。

您可以通过Spring属性(或在YAML中)覆盖它:

spring.kafka.producer.acks=0

OTOH,这可能会导致发送1M条记录,但99%的消息都会丢失,所以这取决于你想要实现什么:-)

最新更新