在我的处理器API中,我将消息存储在一个键值存储中,每100条消息我发出一个POST
请求。如果在尝试发送消息时出现故障(api没有响应等(,我想停止处理消息。直到有证据表明API调用了工作。这是我的代码:
public class BulkProcessor implements Processor<byte[], UserEvent> {
private KeyValueStore<Integer, ArrayList<UserEvent>> keyValueStore;
private BulkAPIClient bulkClient;
private String storeName;
private ProcessorContext context;
private int count;
@Autowired
public BulkProcessor(String storeName, BulkClient bulkClient) {
this.storeName = storeName;
this.bulkClient = bulkClient;
}
@Override
public void init(ProcessorContext context) {
this.context = context;
keyValueStore = (KeyValueStore<Integer, ArrayList<UserEvent>>) context.getStateStore(storeName);
count = 0;
// to check every 15 minutes if there are any remainders in the store that are not sent yet
this.context.schedule(Duration.ofMinutes(15), PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
if (count > 0) {
sendEntriesFromStore();
}
});
}
@Override
public void process(byte[] key, UserEvent value) {
int userGroupId = Integer.valueOf(value.getUserGroupId());
ArrayList<UserEvent> userEventArrayList = keyValueStore.get(userGroupId);
if (userEventArrayList == null) {
userEventArrayList = new ArrayList<>();
}
userEventArrayList.add(value);
keyValueStore.put(userGroupId, userEventArrayList);
if (count == 100) {
sendEntriesFromStore();
}
}
private void sendEntriesFromStore() {
KeyValueIterator<Integer, ArrayList<UserEvent>> iterator = keyValueStore.all();
while (iterator.hasNext()) {
KeyValue<Integer, ArrayList<UserEvent>> entry = iterator.next();
BulkRequest bulkRequest = new BulkRequest(entry.key, entry.value);
if (bulkRequest.getLocation() != null) {
URI url = bulkClient.buildURIPath(bulkRequest);
try {
bulkClient.postRequestBulkApi(url, bulkRequest);
keyValueStore.delete(entry.key);
} catch (BulkApiException e) {
logger.warn(e.getMessage(), e.fillInStackTrace());
}
}
}
iterator.close();
count = 0;
}
@Override
public void close() {
}
}
目前,在我的代码中,如果对API的调用失败,它将迭代下一个100(只要失败,这种情况就会一直发生(,并将它们添加到keyValueStore
。我不想发生这种事。相反,我更愿意停止流,并在keyValueStore
被清空后继续。这可能吗
我可以扔一个StreamsException
吗?
try {
bulkClient.postRequestBulkApi(url, bulkRequest);
keyValueStore.delete(entry.key);
} catch (BulkApiException e) {
throw new StreamsException(e);
}
这会杀死我的流媒体应用程序,从而导致进程终止吗?
- 只有在确保API成功处理记录后,才能从状态存储中删除记录,因此删除第一个
keyValueStore.delete(entry.key);
并保留第二个。如果没有,那么当keyValueStore.delete
被提交到底层变更日志主题时,您可能会丢失一些消息,但您的消息还没有成功处理,所以这最多只是一个保证 - 只需将调用API代码包裹在一个无限循环中并继续尝试,直到记录成功处理,您的处理器将不会消耗来自处理器节点上方的新消息,因为它运行在同一StreamThread中:
private void sendEntriesFromStore() {
KeyValueIterator<Integer, ArrayList<UserEvent>> iterator = keyValueStore.all();
while (iterator.hasNext()) {
KeyValue<Integer, ArrayList<UserEvent>> entry = iterator.next();
//remove this state store delete code : keyValueStore.delete(entry.key);
BulkRequest bulkRequest = new BulkRequest(entry.key, entry.value);
if (bulkRequest.getLocation() != null) {
URI url = bulkClient.buildURIPath(bulkRequest);
while (true) {
try {
bulkClient.postRequestBulkApi(url, bulkRequest);
keyValueStore.delete(entry.key);//only delete after successfully process the message to achieve at least one processing guarantee
break;
} catch (BulkApiException e) {
logger.warn(e.getMessage(), e.fillInStackTrace());
}
}
}
}
iterator.close();
count = 0;
}
- 是的,您可以抛出StreamsException,此
StreamTask
将在重新平衡期间迁移到另一个StreamThread,可能是在示例应用程序实例上。如果API一直导致异常,直到所有StreamThread死亡,您的应用程序将不会自动退出并接收以下异常,您应该添加自定义StreamsException处理程序,以便在所有流线程使用KafkaStreams#setUncaughtExceptionHandler
死亡时退出应用程序,或听流状态更改(错误状态(:
All stream threads have died. The instance will be in error state and should be closed.
最后我使用了一个简单的KafkaConsumer
而不是KafkaStreams
,但最重要的是我更改了BulkApiException
以扩展RuntimeException
,在我记录它之后我再次抛出它
} catch (BulkApiException bae) {
logger.error(bae.getMessage(), bae.fillInStackTrace());
throw new BulkApiException();
} finally {
consumer.close();
int exitCode = SpringApplication.exit(ctx, () -> 1);
System.exit(exitCode);
}
通过这种方式,应用程序将退出,k8s将重新启动pod。这是因为如果我试图转发请求的api坏了,那么继续读取消息就没有意义了。因此,在其他api恢复之前,k8s将重新启动一个pod。