将 Kafka 与应用程序分离



>我有一个应用程序接收了大量的GET请求(5分钟内大约250000个(。应用程序分析查询参数并发布到 Kafka。要发布的代码如下:

public class KafkaProcessor {
private static final String BATCH_SIZE = "batch.size";
private static final String REQUEST_REQUIRED_ACKS = "request.required.acks";
private static final String PRODUCER_TYPE = "producer.type";
private static final String VALUE_SERIALIZER = "value.serializer";
private static final String KEY_SERIALIZER = "key.serializer";
private static final String METADATA_BROKER_LIST = "bootstrap.servers";
private static final String MAX_BLOCK_MS = "max.block.ms";
private static final String KAFKA_ENABLED = "enabled";
private static Properties props = new Properties();
private static KafkaProducer<String, String> producer;
private static ProducerRecord<String, String> producerRecord;
private static String topic;

static {
boolean isEnabled = Boolean.parseBoolean(ResourceProps.INSTANCE.getKafkaProps(KAFKA_ENABLED));
if (isEnabled) {
//Setting up a producer configuration.
props.put(METADATA_BROKER_LIST, "x.x.x.x:9092,y.y.y.y:9092");
props.put(KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer");
props.put(VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer");
props.put(PRODUCER_TYPE, "async");
props.put(REQUEST_REQUIRED_ACKS, "1");
props.put(BATCH_SIZE, "1000");
props.put(MAX_BLOCK_MS, "10000");
producer = new KafkaProducer<>(props);
topic = "pixel-server";
}
}

private static void publishToKafka(JSONObject data) {
producerRecord = new ProducerRecord<String, String>(topic, data.toString());
producer.send(producerRecord, new Callback() {
@Override public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();
}
}
});
}
}

我的应用程序托管在 AWS 实例中。Kafka 服务器也在另一台 AWS 机器中。

但是,如果 kafka 关闭或 kafka 由于任何原因响应缓慢,那么我的应用程序将冻结并且无法进一步处理任何请求。我想知道如何使我的应用程序独立于 Kafka,这意味着,如果 kafka 出现故障(或响应缓慢(,那么它应该不会影响我的应用程序。

我尝试了几种方法,例如如果 kafka 给出超时,然后计算超时异常的数量并停止发布到 kafka,但由于请求量非常大,所以到那时,我收到任何超时异常,我的应用程序冻结。

任何帮助或指针将不胜感激。

我正在使用卡夫卡 0.8.2。我的服务器在 Vertx 中。Ubuntu 中使用的操作系统。限制设置为最大值。

假设您的 Kafka 集群中有三个或更多节点(这对于任何高负载应用程序都至关重要(,您可以尝试一些技巧:

  1. 尝试将acks生产者配置设置为0。这将影响应用程序的一致性(某些消息可能会在生产者端丢弃,并将永远丢失(。文档 说:

    如果设置为零,则生产者根本不会等待来自服务器的任何确认。记录将立即添加到套接字缓冲区并被视为已发送。在这种情况下,无法保证服务器已收到记录

  2. max.block.ms生产者配置设置为0。这将导致您的应用程序在每次发送到 Kafka 集群时立即引发 TimeoutException,而不会造成任何阻塞,而仅在内存缓冲区溢出时引发。请注意,它只影响客户端阻止,而不影响网络调用!

  3. request.timeout.ms减少到较小的值(如10100(。这将导致 Kafka 客户端在任何需要时间超过request.timeout.ms的网络操作上抛出超时异常。

还有一些建议:

  1. 将您的 Kafka 实例更新到最新版本,以获得更稳定的集群;

  2. 要实现高可用性,Kafka 集群必须包含至少三个节点(并且始终包含奇数个节点以避免出现裂脑情况(

  3. 您应该尝试使用max.batch.sizelinger.ms生产者配置,以达到应用程序的最佳延迟吞吐量比

最新更新