Hazelcast从iQueue中删除,跳过一些元素



我有一个榛子cast客户端,它正在将通用消息类放入iQueue中,榛子cast成员通过Listener使用该通用消息,执行逻辑并从队列中删除对象。但它并没有删除所有对象。在mancenter上,我可以看到队列中仍然有项目(不是所有的项目,例如队列中的100个对象,它从中删除了大约80个(,我不知道为什么它没有删除一些对象。目前在mancentre,它在队列中显示了12个项目(大约100个请求(,但不应该有任何项目。尽管如此,代码仍在工作并返回结果。唯一的问题是,我可以在mancenter中看到这些项目越来越多地进入队列,直到我停止hazelcast服务器。

我的通用消息类:

public class GenericMessage<T> implements Message<T>, Serializable {
private static final long serialVersionUID = -1927585972068115172L;
private final T payload;
private MessageHeaders headers;
public GenericMessage(T payload) {
Assert.notNull(payload, "payload must not be null");
HashMap<Object, Object> headers = new HashMap<>();
this.headers = new MessageHeaders(headers);
this.payload = payload;
}
@Override
public MessageHeaders getHeaders() {
return this.headers;
}
@Override
public T getPayload() {
return this.payload;
}
@Override
public String toString() {
return "[Payload=" + this.payload + "][Headers=" + this.headers + "]";
}
@Override
public int hashCode() {
return this.headers.hashCode() * 23 + ObjectUtils.nullSafeHashCode(this.payload);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj != null && obj instanceof GenericMessage<?>) {
GenericMessage<?> other = (GenericMessage<?>) obj;
if (this.headers.getKey() != null && other.headers.getKey() != null) {
return this.headers.getKey().equals(other.headers.getKey());
} else {
return false;
}
}
return false;
}
}

MessageHeaders类:

public class MessageHeaders implements Map<Object, Object>, Serializable {
private static final long serialVersionUID = 4469807275189880042L;
protected Map<Object, Object> headers;
public static final String KEY = "key";
public MessageHeaders(Map<Object, Object> headers) {
this.headers = (headers != null) ? headers : new HashMap<>();
}
@SuppressWarnings("unchecked")
public <T> T get(Object key, Class<T> type) {
Object value = this.headers.get(key);
if (value == null) {
return null;
}
if (!type.isAssignableFrom(value.getClass())) {
throw new IllegalArgumentException("Incorrect type specified for header '"
+ key
+ "'. Expected ["
+ type
+ "] but actual type is ["
+ value.getClass()
+ "]");
}
return (T) value;
}
@Override
public int hashCode() {
return this.headers.hashCode();
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj != null && obj instanceof MessageHeaders) {
MessageHeaders other = (MessageHeaders) obj;
return this.headers.equals(other.headers);
}
return false;
}
@Override
public boolean containsKey(Object key) {
return this.headers.containsKey(key);
}
@Override
public boolean containsValue(Object value) {
return this.headers.containsValue(value);
}
@Override
public Set<Map.Entry<Object, Object>> entrySet() {
return Collections.unmodifiableSet(this.headers.entrySet());
}
@Override
public Object get(Object key) {
return this.headers.get(key);
}
@Override
public boolean isEmpty() {
return this.headers.isEmpty();
}
@Override
public Set<Object> keySet() {
return Collections.unmodifiableSet(this.headers.keySet());
}
@Override
public int size() {
return this.headers.size();
}
@Override
public Collection<Object> values() {
return Collections.unmodifiableCollection(this.headers.values());
}
@Override
public Object put(Object key, Object value) {
throw new UnsupportedOperationException("MessageHeaders is immutable.");
}
@Override
public void putAll(Map<? extends Object, ? extends Object> t) {
throw new UnsupportedOperationException("MessageHeaders is immutable.");
}
@Override
public Object remove(Object key) {
throw new UnsupportedOperationException("MessageHeaders is immutable.");
}
@Override
public void clear() {
throw new UnsupportedOperationException("MessageHeaders is immutable.");
}
private void writeObject(ObjectOutputStream out) throws IOException {
List<String> keysToRemove = new ArrayList<>();
for (Map.Entry<Object, Object> entry : this.headers.entrySet()) {
if (!(entry.getValue() instanceof Serializable)) {
keysToRemove.add(String.valueOf(entry.getKey()));
}
}
for (String key : keysToRemove) {
// if (logger.isInfoEnabled()) {
// logger.info("removing non-serializable header: " +
// key);
// }
this.headers.remove(key);
}
out.defaultWriteObject();
}
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
}
public String getKey() {
return this.get(KEY, String.class);
}
public void setKey(String key) {
this.headers.put(KEY, key);
}
}

放入队列实现:

User user = new User();
GenericMessage<User> message = new GenericMessage<User>(user);
String key="123";
message.getHeaders().setKey(key);
IQueue<Object> queue = hazelcastInstance.getQueue("user_queue");
queue.add(message);

Hazelcast侦听器配置:

IQueue<Object> userQueue = hazelcastInstance.getQueue("user_queue");
UserListener userListener = context.getBean(UserListener.class);
userQueue.addItemListener(userListener, true);

侦听器:

public class UserListener implements ItemListener<Object> {
@Autowired
private UserService service;
@Override
public void itemAdded(ItemEvent<Object> arg0) {
service.process(arg0);
}
}

服务:

public class UserService {
@Async("userTaskExecutor")
public void process(ItemEvent<Object> item) {
GenericMessage<User> message = (GenericMessage<User>) item.getItem();
hazelcastInstance.getQueue("user_queue").remove(message);
}

经过大量的测试和调试,我发现了这个问题。事实证明,remove(object(方法的文档具有误导性。文档中说这个方法依赖于.equals((类方法,但事实证明,hazelcast将序列化的对象与每个序列化的对象进行比较。所以我实现了一个自定义比较:

GenericMessage<?> incomeMessage = (GenericMessage<?>) object;
boolean removed = hazelcastInstance.getQueue(queueId).remove(object);
if (!removed) {
Iterator<Object> iterator = hazelcastInstance.getQueue(queueId).iterator();
while (iterator.hasNext()) {
Object next = iterator.next();
GenericMessage<?> message = (GenericMessage<?>) next;
if (incomeMessage.getHeaders().getKey()
.equals(message.getHeaders().getKey())) {
object = next;
removed = hazelcastInstance.getQueue(queueId).remove(object);
break;
}
}
}

最新更新