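On our production cluster we are seeing that some events are not being inserted or updated. The application reports no errors.
To investigate this, I want to listen for cache put events in the Ignite cluster. I followed the documentation at https://ignite.apache.org/docs/latest/events/listening-to-events and also borrowed from this example: https://github.com/apache/ignite/blob/master/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheEventsExample.java
I tried running essentially the same code as in the linked example. It looks like this: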
try (Ignite ignite = Ignition.start("C:\\Users\\example\\ignite-client-config-local.xml")) {
    System.out.println();
    System.out.println(">>> Cache events example started.");

    EntityKey entityKey = new EntityKey("T123");
    Entity entity = new Entity(entityKey);
    entity.setProductType("prodct1");
    entity.setSubProductType("subproduct1");

    try (IgniteCache<EntityKey, Entity> cache = ignite.getOrCreateCache("ProductsCache")) {
        // Local listener: called on this node for every event that passes the remote filter.
        IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() {
            @Override public boolean apply(UUID uuid, CacheEvent evt) {
                System.out.println("Received event [evt=" + evt.name() + ", key=" + evt.key() +
                    ", oldVal=" + evt.oldValue() + ", newVal=" + evt.newValue() + ']');
                return true; // Continue listening.
            }
        };

        // Remote filter: sent to and evaluated on the nodes where the events occur.
        IgnitePredicate<CacheEvent> rmtLsnr = new IgnitePredicate<CacheEvent>() {
            @Override public boolean apply(CacheEvent evt) {
                System.out.println("Cache event [name=" + evt.name() + ", key=" + evt.key() + ']');
                ignite.affinity("ProductsCache").isPrimary(ignite.cluster().localNode(), evt.key());
                return true;
            }
        };

        ignite.events(ignite.cluster().forCacheNodes("ProductsCache")).remoteListen(locLsnr, rmtLsnr,
            EVT_CACHE_OBJECT_PUT);

        // Generate cache events.
        cache.put(entityKey, entity);

        // Wait a while so the callback can be notified about the put.
        Thread.sleep(2000);

        System.out.println(cache.get(entityKey));
    }
}
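But the listener does not work. The data is put into the cache correctly and can be read back. This is my first attempt, running on my local machine. I have set peerClassLoadingEnabled to true in both the server and the client configuration XML. I have also added EVT_CACHE_OBJECT_PUT to the included event types in both configurations. On the Ignite server I can see the error below.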
[16:56:07,469][SEVERE][sys-#109%personal.local%][GridContinuousProcessor] Failed to unmarshal continuous routine handler [routineId=1c49245f-238c-49a0-ae48-c4da3085052a, srcNodeId=89b3117f-43aa-428d-a5cf-647a518d089e]
class org.apache.ignite.internal.IgniteDeploymentCheckedException: Failed to obtain deployment for class: com.igniteadmin.LocalIgniteEventFilter
at org.apache.ignite.internal.GridEventConsumeHandler.p2pUnmarshal(GridEventConsumeHandler.java:418)
at org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.lambda$null$2(GridContinuousProcessor.java:707)
at org.apache.ignite.internal.util.IgniteUtils.wrapThreadLoader(IgniteUtils.java:7133)
at org.apache.ignite.internal.processors.closure.GridClosureProcessor$1.body(GridClosureProcessor.java:827)
at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:120)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
[16:56:07,481][SEVERE][sys-#108%personal.local%][] JVM will be halted immediately due to the failure: [failureCtx=FailureContext [type=CRITICAL_ERROR, err=class o.a.i.i.IgniteDeploymentCheckedException: Failed to obtain deployment for class: com.igniteadmin.LocalIgniteEventFilter]]
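For reference, here is a minimal sketch of how the two settings mentioned above (peer class loading and the put event type) are usually expressed in an Ignite Spring XML file. It is not a copy of the actual configuration files: the bean id is a placeholder, and discovery settings and the client-mode flag are left out.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util.xsd">

    <!-- Placeholder bean id; the real files may name it differently. -->
    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <!-- Lets nodes request classes (such as event filters) from each other. -->
        <property name="peerClassLoadingEnabled" value="true"/>

        <!-- Events are disabled by default; only the types listed here are recorded. -->
        <property name="includeEventTypes">
            <list>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
            </list>
        </property>
    </bean>
</beans>
```

The same two properties would need to appear in both the server file and the client file, since the flag is expected to match across nodes.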
I also tried injecting the Ignite instance via @IgniteInstanceResource so that it can be used inside the remote filter:
IgnitePredicate<CacheEvent> rmtLsnr = new IgnitePredicate<CacheEvent>() {
    // Injected by Ignite on the node where the filter is deployed.
    @IgniteInstanceResource
    private IgniteEx ignite;

    @Override public boolean apply(CacheEvent evt) {
        System.out.println("Cache event [name=" + evt.name() + ", key=" + evt.key() + ']');
        ignite.affinity("ProductsCache").isPrimary(ignite.cluster().localNode(), evt.key());
        return true;
    }
};