>我正在开发将数据发送到zeromq
的应用程序。以下是我的应用程序的作用:
- 我有一个将数据发送到 zeromq 的类
SendToZeroMQ
。 - 将相同的数据添加到同一类中的
retryQueue
,以便在未收到确认时可以重试。它使用具有最大大小限制的番石榴缓存。 - 有一个单独的线程,该线程从 zeromq 接收之前发送的数据的确认,如果未收到确认,则
SendToZeroMQ
将重试发送相同的数据。如果收到确认,那么我们将从retryQueue
中删除它,以便无法再次重试。
想法非常简单,我必须确保我的重试策略正常工作,以免丢失数据。这是非常罕见的,但以防万一我们没有得到楔形。
我正在考虑构建两种类型的RetryPolicies
但我无法理解如何在这里构建与我的程序相对应的内容:
RetryNTimes:
在这种情况下,它将重试 N 次,每次重试之间有一个特定的睡眠,之后,它将删除记录。ExponentialBackoffRetry:
在这种情况下,它将成倍地不断重试。我们可以设置一些最大重试限制,之后它不会重试并会删除记录。
下面是我的SendToZeroMQ
类,它将数据发送到zeromq,也每30秒从后台线程重试一次,并启动ResponsePoller
可运行,该类永远运行:
public class SendToZeroMQ {
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
private final Cache<Long, byte[]> retryQueue =
CacheBuilder
.newBuilder()
.maximumSize(10000000)
.concurrencyLevel(200)
.removalListener(
RemovalListeners.asynchronous(new CustomListener(), executorService)).build();
private static class Holder {
private static final SendToZeroMQ INSTANCE = new SendToZeroMQ();
}
public static SendToZeroMQ getInstance() {
return Holder.INSTANCE;
}
private SendToZeroMQ() {
executorService.submit(new ResponsePoller());
// retry every 30 seconds for now
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
for (Entry<Long, byte[]> entry : retryQueue.asMap().entrySet()) {
sendTo(entry.getKey(), entry.getValue());
}
}
}, 0, 30, TimeUnit.SECONDS);
}
public boolean sendTo(final long address, final byte[] encodedRecords) {
Optional<ZMQSocketInfo> liveSockets = PoolManager.getInstance().getNextSocket();
if (!liveSockets.isPresent()) {
return false;
}
return sendTo(address, encodedRecords, liveSockets.get().getSocket());
}
public boolean sendTo(final long address, final byte[] encodedByteArray, final Socket socket) {
ZMsg msg = new ZMsg();
msg.add(encodedByteArray);
boolean sent = msg.send(socket);
msg.destroy();
// adding to retry queue
retryQueue.put(address, encodedByteArray);
return sent;
}
public void removeFromRetryQueue(final long address) {
retryQueue.invalidate(address);
}
}
下面是我的ResponsePoller
类,它轮询了来自 zeromq 的所有确认。如果我们从 zeromq 获得确认,那么我们将从重试队列中删除该记录,这样它就不会重试,否则它将被重试。
public class ResponsePoller implements Runnable {
private static final Random random = new Random();
@Override
public void run() {
ZContext ctx = new ZContext();
Socket client = ctx.createSocket(ZMQ.PULL);
String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
client.setIdentity(identity.getBytes(ZMQ.CHARSET));
client.bind("tcp://" + TestUtils.getIpaddress() + ":8076");
PollItem[] items = new PollItem[] {new PollItem(client, Poller.POLLIN)};
while (!Thread.currentThread().isInterrupted()) {
// Tick once per second, pulling in arriving messages
for (int centitick = 0; centitick < 100; centitick++) {
ZMQ.poll(items, 10);
if (items[0].isReadable()) {
ZMsg msg = ZMsg.recvMsg(client);
Iterator<ZFrame> it = msg.iterator();
while (it.hasNext()) {
ZFrame frame = it.next();
try {
long address = TestUtils.getAddress(frame.getData());
// remove from retry queue since we got the acknowledgment for this record
SendToZeroMQ.getInstance().removeFromRetryQueue(address);
} catch (Exception ex) {
// log error
} finally {
frame.destroy();
}
}
msg.destroy();
}
}
}
ctx.destroy();
}
}
问题:
正如您在上面看到的,我使用类将encodedRecords
发送到 zeromqSendToZeroMQ
然后每 30 秒重试一次,具体取决于我们是否从ResponsePoller
类中得到了回复。
对于每个encodedRecords
都有一个称为address
的唯一键,这就是我们将从 zeromq 返回
的确认。 我如何继续扩展此示例以构建我上面提到的两个重试策略,然后我可以选择要在发送数据时使用的重试策略。我想出了下面的接口,但后来我无法理解我应该如何继续实施这些重试策略并在上面的代码中使用它。
public interface RetryPolicy {
/**
* Called when an operation has failed for some reason. This method should return
* true to make another attempt.
*/
public boolean allowRetry(int retryCount, long elapsedTimeMs);
}
我可以在这里使用 guava 重试或故障保护吗,因为这些库已经有许多我可以使用的重试策略?
我无法弄清楚有关如何使用相关 API-s 的所有细节,但至于算法,您可以尝试:
- 重试策略需要将某种状态附加到每条消息(至少是当前消息重试的次数,可能是当前延迟的次数)。您需要决定 RetryPolicy 是应该保留它本身,还是要将其存储在邮件中。
- 代替 allowRetry,您可以使用一种方法计算下一次重试何时发生(绝对时间或将来的毫秒数),这将是上述状态的函数 重试
- 队列应包含有关何时应重试每条消息的信息。
- 不要使用
scheduleAtFixedRate
,而是在重试队列中找到when_is_next_retry
最低的消息(可能通过对绝对重试时间戳进行排序并选择第一个),并让 executorService 使用schedule
和time_to_next_retry
重新调度自身 - 对于每次重试,将其从重试队列中拉出,发送消息,使用 RetryPolicy 计算下一次重试的时间(如果要重试),并使用新的
when_is_next_retry
值插入回重试队列(如果 RetryPolicy 返回 -1,则可能意味着不再重试该消息)
不是一个完美的方法,但也可以通过下面的方法实现。
public interface RetryPolicy {
public boolean allowRetry();
public void decreaseRetryCount();
}
创建两个实现。对于重试
public class RetryNTimes implements RetryPolicy {
private int maxRetryCount;
public RetryNTimes(int maxRetryCount) {
this.maxRetryCount = maxRetryCount;
}
public boolean allowRetry() {
return maxRetryCount > 0;
}
public void decreaseRetryCount()
{
maxRetryCount = maxRetryCount-1;
}}
对于指数退避重试
public class ExponentialBackoffRetry implements RetryPolicy {
private int maxRetryCount;
private final Date retryUpto;
public ExponentialBackoffRetry(int maxRetryCount, Date retryUpto) {
this.maxRetryCount = maxRetryCount;
this.retryUpto = retryUpto;
}
public boolean allowRetry() {
Date date = new Date();
if(maxRetryCount <= 0 || date.compareTo(retryUpto)>=0)
{
return false;
}
return true;
}
public void decreaseRetryCount() {
maxRetryCount = maxRetryCount-1;
}}
您需要在 SendToZeroMQ 类中进行一些更改
public class SendToZeroMQ {
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
private final Cache<Long,RetryMessage> retryQueue =
CacheBuilder
.newBuilder()
.maximumSize(10000000)
.concurrencyLevel(200)
.removalListener(
RemovalListeners.asynchronous(new CustomListener(), executorService)).build();
private static class Holder {
private static final SendToZeroMQ INSTANCE = new SendToZeroMQ();
}
public static SendToZeroMQ getInstance() {
return Holder.INSTANCE;
}
private SendToZeroMQ() {
executorService.submit(new ResponsePoller());
// retry every 30 seconds for now
executorService.scheduleAtFixedRate(new Runnable() {
public void run() {
for (Map.Entry<Long, RetryMessage> entry : retryQueue.asMap().entrySet()) {
RetryMessage retryMessage = entry.getValue();
if(retryMessage.getRetryPolicy().allowRetry())
{
retryMessage.getRetryPolicy().decreaseRetryCount();
entry.setValue(retryMessage);
sendTo(entry.getKey(), retryMessage.getMessage(),retryMessage);
}else
{
retryQueue.asMap().remove(entry.getKey());
}
}
}
}, 0, 30, TimeUnit.SECONDS);
}
public boolean sendTo(final long address, final byte[] encodedRecords, RetryMessage retryMessage) {
Optional<ZMQSocketInfo> liveSockets = PoolManager.getInstance().getNextSocket();
if (!liveSockets.isPresent()) {
return false;
}
if(null==retryMessage)
{
RetryPolicy retryPolicy = new RetryNTimes(10);
retryMessage = new RetryMessage(retryPolicy,encodedRecords);
retryQueue.asMap().put(address,retryMessage);
}
return sendTo(address, encodedRecords, liveSockets.get().getSocket());
}
public boolean sendTo(final long address, final byte[] encodedByteArray, final ZMQ.Socket socket) {
ZMsg msg = new ZMsg();
msg.add(encodedByteArray);
boolean sent = msg.send(socket);
msg.destroy();
return sent;
}
public void removeFromRetryQueue(final long address) {
retryQueue.invalidate(address);
}}
下面是一个对环境的小模拟,展示了如何做到这一点。请注意,Guava缓存在这里是错误的数据结构,因为您对逐出不感兴趣(我认为)。所以我正在使用并发哈希图:
package experimental;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
class Experimental {
/** Return the desired backoff delay in millis for the given retry number, which is 1-based. */
interface RetryStrategy {
long getDelayMs(int retry);
}
enum ConstantBackoff implements RetryStrategy {
INSTANCE;
@Override
public long getDelayMs(int retry) {
return 1000L;
}
}
enum ExponentialBackoff implements RetryStrategy {
INSTANCE;
@Override
public long getDelayMs(int retry) {
return 100 + (1L << retry);
}
}
static class Sender {
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
private final ConcurrentMap<Long, Retrier> pending = new ConcurrentHashMap<>();
/** Send the given data with given address on the given socket. */
void sendTo(long addr, byte[] data, int socket) {
System.err.println("Sending " + Arrays.toString(data) + "@" + addr + " on " + socket);
}
private class Retrier implements Runnable {
private final RetryStrategy retryStrategy;
private final long addr;
private final byte[] data;
private final int socket;
private int retry;
private Future<?> future;
Retrier(RetryStrategy retryStrategy, long addr, byte[] data, int socket) {
this.retryStrategy = retryStrategy;
this.addr = addr;
this.data = data;
this.socket = socket;
this.retry = 0;
}
synchronized void start() {
if (future == null) {
future = executorService.submit(this);
pending.put(addr, this);
}
}
synchronized void cancel() {
if (future != null) {
future.cancel(true);
future = null;
}
}
private synchronized void reschedule() {
if (future != null) {
future = executorService.schedule(this, retryStrategy.getDelayMs(++retry), MILLISECONDS);
}
}
@Override
synchronized public void run() {
sendTo(addr, data, socket);
reschedule();
}
}
long getVerifiedAddr() {
System.err.println("Pending messages: " + pending.size());
Iterator<Long> i = pending.keySet().iterator();
long addr = i.hasNext() ? i.next() : 0;
return addr;
}
class CancellationPoller implements Runnable {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
long addr = getVerifiedAddr();
if (addr == 0) {
continue;
}
System.err.println("Verified message (to be cancelled) " + addr);
Retrier retrier = pending.remove(addr);
if (retrier != null) {
retrier.cancel();
}
}
}
}
Sender initialize() {
executorService.submit(new CancellationPoller());
return this;
}
void sendWithRetriesTo(RetryStrategy retryStrategy, long addr, byte[] data, int socket) {
new Retrier(retryStrategy, addr, data, socket).start();
}
}
public static void main(String[] args) {
Sender sender = new Sender().initialize();
for (long i = 1; i <= 10; i++) {
sender.sendWithRetriesTo(ConstantBackoff.INSTANCE, i, null, 42);
}
for (long i = -1; i >= -10; i--) {
sender.sendWithRetriesTo(ExponentialBackoff.INSTANCE, i, null, 37);
}
}
}
你可以使用阿帕奇骆驼。它为 zeromq 提供了一个组件,并且原生提供了诸如 errohandler、redeliverypolicy、死信通道等工具。