使用IScheduledExecutiorService实现集群协同定时器(仅在Payara Micro Cluster



我正在努力实现集群协调事件的以下行为:

  1. 定时器(事件)仅在Payara Micro集群中的一个线程\JVM中执行
  2. 如果节点出现故障,将在集群中的另一个节点上执行定时器(事件)

来自Payara Micro指南:

持久定时器不会在Payara Micro集群中进行协调。它们总是在具有相同名称的实例上执行创建了计时器。

如果该实例发生故障,将在另一个实例上重新创建计时器实例加入集群后具有相同名称直到时间,计时器将变为非活动状态

根据定义,持久定时器在Payara Micro集群中似乎无法按要求工作。

因此,我正在尝试使用Hazelcast的IScheduledExecutiorService,这似乎是一个完美的匹配。

基本上,IScheduledExecutiorService的实现工作良好,除了新的Payara Micro节点正在启动的场景&加入集群(其中一些事件已经使用IScheduledExecutiorService进行了调度的集群)。在此期间发生以下异常:

异常1:java.lang.RuntimeException:ConcurrentRuntime未初始化

[2021-02-15T23:00:31.870+0800] [] [INFO] [] [fish.payara.nucleus.cluster.PayaraCluster] [tid: _ThreadID=63 _ThreadName=hz.angry_yalow.event-5] [timeMillis: 1613401231870] [levelValue: 800] [[
Data Grid Status 
Payara Data Grid State: DG Version: 4 DG Name: testClusterDev DG Size: 2
Instances: {
DataGrid: testClusterDev Name: testNode0 Lite: false This: true UUID: 493b19ed-a58d-4508-b9ef-f5c58e05b859 Address: /10.41.0.7:6900
DataGrid: testClusterDev Lite: false This: false UUID: f12342bf-a37e-452a-8c67-1d36dd4dbac7 Address: /10.41.0.7:6901
}]]
[2021-02-15T23:00:32.290+0800] [] [WARNING] [] [com.hazelcast.internal.partition.operation.MigrationRequestOperation] [tid: _ThreadID=160 _ThreadName=ForkJoinPool.commonPool-worker-6] [timeMillis: 1613401232290] [levelValue: 900] [[
[10.41.0.7]:6900 [testClusterDev] [4.1] Failure while executing MigrationInfo{uuid=fc68e9ac-1081-4f9b-a70a-6fb0aae19016, partitionId=27, source=[10.41.0.7]:6900 - 493b19ed-a58d-4508-b9ef-f5c58e05b859, sourceCurrentReplicaIndex=0, sourceNewReplicaIndex=1, destination=[10.41.0.7]:6901 - f12342bf-a37e-452a-8c67-1d36dd4dbac7, destinationCurrentReplicaIndex=-1, destinationNewReplicaIndex=0, master=[10.41.0.7]:6900, initialPartitionVersion=1, partitionVersionIncrement=2, status=ACTIVE}
com.hazelcast.nio.serialization.HazelcastSerializationException: java.lang.RuntimeException: ConcurrentRuntime not initialized
at com.hazelcast.internal.serialization.impl.SerializationUtil.handleException(SerializationUtil.java:103)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.readObject(AbstractSerializationService.java:292)
at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataInput.readObject(ByteArrayObjectDataInput.java:567)
at com.hazelcast.scheduledexecutor.impl.ScheduledRunnableAdapter.readData(ScheduledRunnableAdapter.java:106)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.readInternal(DataSerializableSerializer.java:160)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:106)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:51)
at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.readObject(AbstractSerializationService.java:286)
at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataInput.readObject(ByteArrayObjectDataInput.java:567)
at com.hazelcast.scheduledexecutor.impl.TaskDefinition.readData(TaskDefinition.java:144)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.readInternal(DataSerializableSerializer.java:160)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:106)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:51)
at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.readObject(AbstractSerializationService.java:286)
at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataInput.readObject(ByteArrayObjectDataInput.java:567)
at com.hazelcast.scheduledexecutor.impl.ScheduledTaskDescriptor.readData(ScheduledTaskDescriptor.java:208)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.readInternal(DataSerializableSerializer.java:160)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:106)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:51)
at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.readObject(AbstractSerializationService.java:286)
at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataInput.readObject(ByteArrayObjectDataInput.java:567)
at com.hazelcast.scheduledexecutor.impl.operations.ReplicationOperation.readInternal(ReplicationOperation.java:87)
at com.hazelcast.spi.impl.operationservice.Operation.readData(Operation.java:750)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.readInternal(DataSerializableSerializer.java:160)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:106)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:51)
at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.readObject(AbstractSerializationService.java:286)
at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataInput.readObject(ByteArrayObjectDataInput.java:567)
at com.hazelcast.internal.partition.ReplicaFragmentMigrationState.readData(ReplicaFragmentMigrationState.java:97)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.readInternal(DataSerializableSerializer.java:160)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:106)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:51)
at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.readObject(AbstractSerializationService.java:286)
at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataInput.readObject(ByteArrayObjectDataInput.java:567)
at com.hazelcast.internal.partition.operation.MigrationOperation.readInternal(MigrationOperation.java:249)
at com.hazelcast.spi.impl.operationservice.Operation.readData(Operation.java:750)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.readInternal(DataSerializableSerializer.java:160)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:106)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:51)
at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:205)
at com.hazelcast.spi.impl.NodeEngineImpl.toObject(NodeEngineImpl.java:346)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:437)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:166)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:136)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
Caused by: java.lang.RuntimeException: ConcurrentRuntime not initialized
at org.glassfish.concurrent.runtime.ConcurrentRuntime.getRuntime(ConcurrentRuntime.java:121)
at org.glassfish.concurrent.runtime.InvocationContext.readObject(InvocationContext.java:214)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2296)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
at com.hazelcast.internal.serialization.impl.defaultserializers.JavaDefaultSerializers$JavaSerializer.read(JavaDefaultSerializers.java:83)
at com.hazelcast.internal.serialization.impl.defaultserializers.JavaDefaultSerializers$JavaSerializer.read(JavaDefaultSerializers.java:76)
at fish.payara.nucleus.hazelcast.PayaraHazelcastSerializer.read(PayaraHazelcastSerializer.java:84)
at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.readObject(AbstractSerializationService.java:286)
... 50 more
]]
[2021-02-15T23:00:32.304+0800] [] [WARNING] [] [com.hazelcast.internal.partition.impl.MigrationManager] [tid: _ThreadID=160 _ThreadName=ForkJoinPool.commonPool-worker-6] [timeMillis: 1613401232304] [levelValue: 900] [10.41.0.7]:6900 [testClusterDev] [4.1] Migration failed: MigrationInfo{uuid=fc68e9ac-1081-4f9b-a70a-6fb0aae19016, partitionId=27, source=[10.41.0.7]:6900 - 493b19ed-a58d-4508-b9ef-f5c58e05b859, sourceCurrentReplicaIndex=0, sourceNewReplicaIndex=1, destination=[10.41.0.7]:6901 - f12342bf-a37e-452a-8c67-1d36dd4dbac7, destinationCurrentReplicaIndex=-1, destinationNewReplicaIndex=0, master=[10.41.0.7]:6900, initialPartitionVersion=1, partitionVersionIncrement=2, status=ACTIVE}

这似乎是因为新节点没有完全初始化(因为它刚刚启动)。与下一个异常相比,此异常似乎不那么重要。

异常2:java.lang.NullPointerException:无法执行java.util.concurrent.ScheduledThreadPoolExecutitor$ScheduledFutureTask

[2021-02-15T23:44:19.544+0800] [] [SEVERE] [] [com.hazelcast.spi.impl.executionservice.ExecutionService] [tid: _ThreadID=35 _ThreadName=hz.elated_murdock.scheduled.thread-] [timeMillis: 1613403859544] [levelValue: 1000] [[
[10.4.0.7]:6901 [testClusterDev] [4.1] Failed to execute java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@55a27ce3
java.lang.NullPointerException
at org.glassfish.concurrent.runtime.ContextSetupProviderImpl.isApplicationEnabled(ContextSetupProviderImpl.java:326)
at org.glassfish.concurrent.runtime.ContextSetupProviderImpl.setup(ContextSetupProviderImpl.java:194)
at org.glassfish.enterprise.concurrent.internal.ContextProxyInvocationHandler.invoke(ContextProxyInvocationHandler.java:94)
at com.sun.proxy.$Proxy154.run(Unknown Source)
at com.hazelcast.scheduledexecutor.impl.ScheduledRunnableAdapter.call(ScheduledRunnableAdapter.java:56)
at com.hazelcast.scheduledexecutor.impl.TaskRunner.call(TaskRunner.java:78)
at com.hazelcast.scheduledexecutor.impl.TaskRunner.run(TaskRunner.java:104)
at com.hazelcast.spi.impl.executionservice.impl.DelegateAndSkipOnConcurrentExecutionDecorator$DelegateDecorator.run(DelegateAndSkipOnConcurrentExecutionDecorator.java:77)
at com.hazelcast.internal.util.executor.CachedExecutorServiceDelegate$Worker.run(CachedExecutorServiceDelegate.java:217)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
at com.hazelcast.internal.util.executor.HazelcastManagedThread.executeRun(HazelcastManagedThread.java:76)
at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
]]

此异常发生在正在加入群集的新节点上。这种情况并不总是发生,可能Hazelcast试图在正在启动的新节点上执行事件,但由于环境仍未完全初始化而失败。在两次这样的尝试失败后,Hazelcast卸载了事件。


实施见解:

使用IScheduledExecutorService(驻留在主应用程序WAR中应用程序范围的bean中)调度事件的方法:

@Resource
ContextService _ctxService;
public void sheduleClusteredEvent() {
IScheduledExecutorService executorService = _instance.getScheduledExecutorService("default");
ClusteredEvent ce = new ClusteredEvent(new DiagEvent(null, "TestEvent1"));
Object ceProxy = _ctxService.createContextualProxy(ce, Runnable.class, Serializable.class);
executorService.scheduleAtFixedRate((Runnable) ceProxy, 0, 3, TimeUnit.SECONDS);
}

ClusteredEvent类(位于一个单独的JAR中,并通过--addLibs-param添加到Payara Micro的类路径中)。它需要以某种方式通知主应用程序要触发的事件,因此使用BeanManager.fireEvent()。

public class ClusteredEvent implements Runnable, Serializable {
private final DiagEvent _event;
public ClusteredEvent(DiagEvent event) {
_event = event;
}
@Override
public void run() {
// For sake of shortness - all check for nulls etc. were removed
((BeanManager) ic.lookup("java:comp/BeanManager")).fireEvent(_event);
}
}

所以我的问题:

  1. 如何解决上述异常/问题
  2. 我在Payara Micro集群中实现协同集群事件行为的方向是否正确?我希望这是一项开箱即用的琐碎任务,但它需要一些自定义实现,因为持久定时器无法按要求工作。Payara Micro Cluster(>=v5.2021.1)是否还有其他更优雅的方式可以实现协调的集群事件行为

非常感谢您!


更新1:

请记住,本练习的主要目的是在Payara Micro Cluster中提供协调的计时器(事件)功能,因此非常欢迎关于更优雅解决方案的建议。

回答评论中的问题/建议:

Q1:

为什么需要为偶数对象创建上下文代理?

A1:事实上,用普通的ClusteredEvent()对象制作上下文代理增加了这里的主要复杂性,并导致了上述异常(意思是:在不制作上下文代理的情况下调度ClusteredEvent()-工作正常,不会导致异常,但有一点需要注意)。

之所以使用上下文代理,是因为我需要从IScheduledExecutorService启动的非托管线程以某种方式触发Payara Micro上运行的主应用程序。到目前为止,我还没有找到任何其他可行的方法来从非托管线程触发主应用程序中的任何CDI/EJB bean。仅使其具有上下文-例如,允许ClusteredEvent.run()通过BeanManger与主应用程序通信。

欢迎就如何在非托管线程和在单独应用程序中运行的CDI/EJB bean之间建立通信(两者都在同一Payara Micro实例上运行)提出任何建议。

Q2:

例如,您可以将ceProxy包装到Runnable,该Runnable在try-catch块中执行ceProxy.run()

A2:我试过了,它确实有助于处理";例外2";如上所述。我在下面发布ClusteredEventWrapper类的实现,try/catch在run()方法内部处理";例外2";。

第三季度:

第一个异常来自试图反序列化新实例上的代理,由于代理需要初始化了要反序列化的环境。要解决此问题,您需要包装ceProxy对象并自定义包装器,等待ContextService初始化。

A3:ClusteredEventWrapper的序列化/反序列化添加自定义实现确实允许处理";例外1";,但在这里,我仍在努力寻找最好的处理方法。通过Thread.sleep()-延迟反序列化会导致新的(不同的)异常。抑制异常-需要检查,但在这种情况下,我担心ClusteredEventWrapper不会在新的(启动)节点上正确反序列化,因为Hazelcast会认为同步很好,不会再次尝试同步(我可能错了-我仍然需要检查)。正如目前看来的那样,Hazelcast尝试多次同步;例外1";走了。

封装ClusteredEvent:的ClusteredEventWrapper的实现

public class ClusteredEventWrapper implements Runnable, Serializable {
private static final long serialVersionUID = 5878537035999797427L;
private static final Logger LOG = Logger.getLogger(ClusteredEventWrapper.class.getName());
private final Runnable _clusteredEvent;
public ClusteredEventWrapper(Runnable clusteredEvent) {
_clusteredEvent = clusteredEvent;
}
@Override
public void run() {
try {
_clusteredEvent.run();
} catch (Throwable e) {
if (e instanceof NullPointerException
&& e.getStackTrace() != null && e.getStackTrace().length > 0
&& "org.glassfish.concurrent.runtime.ContextSetupProviderImpl".equals(e.getStackTrace()[0].getClassName())
&& "isApplicationEnabled".equals(e.getStackTrace()[0].getMethodName())) {
// Means we got the "Exception 2" (posted above)
LOG.log(Level.WARNING, "Skipping scheduled event execution on this node as this node is still being initialized...");
} else {
LOG.log(Level.SEVERE, "Error executing scheduled event", e);
}
}
}

private void writeObject(ObjectOutputStream out) throws IOException {
LOG.log(Level.INFO, "1_WRITE_OBJECT...");
out.defaultWriteObject();
}

private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
LOG.log(Level.INFO, "2_READ_OBJECT...");
int retry = 0;
while (readObjectInner(in) != true && retry < 5) { // This doesn't work good, need to think of some other way on handling it
retry++;
LOG.log(Level.INFO, "2_READ_OBJECT: retry {0}", retry);
try {
// We need to wait 
Thread.sleep(15000);
} catch (InterruptedException ex) {
}
}
}

private boolean readObjectInner(ObjectInputStream in) throws IOException, ClassNotFoundException {
try {
in.defaultReadObject();
return true;
} catch (Throwable e) {
if (e instanceof RuntimeException && "ConcurrentRuntime not initialized".equals(e.getMessage())) {
// This means node which is trying to desiarialize this objet is not ready yet
return false;
} else {
// For all other exceptions - we throw error
throw e;
}
}
}
}

所以现在事件按以下方式安排:

@Resource
ContextService _ctxService;
public void sheduleClusteredEvent() {
IScheduledExecutorService executorService = _instance.getScheduledExecutorService("default");
ClusteredEvent ce = new ClusteredEvent(new DiagEvent(null, "PersistentEvent1"));
Object ceProxy = _ctxService.createContextualProxy(ce, Runnable.class, Serializable.class);
executorService.scheduleAtFixedRate(new ClusteredEventWrapper((Runnable) ceProxy), 0, 3, TimeUnit.SECONDS);
}

下面我在评论中发布了基于@OndroMih建议的实施解决方案:

摘录1:

。。。更好的方法是避免将对象包装成上下文,而是将BeanManager注册到全局变量中(singleton)。在ClusteredEvent.run()中从静态方法(例如Registry.getBeanManager())中检索它。这方法必须等到应用程序启动并保存其BeanManager实例与Registry.setBeanManager()

还有这个:

摘录2:

或者,如果您存储对ManagedExecutorService而不是BeanManager,执行运行方法,然后注入所需的任何内容。

@OndroMih,请将其作为回复发布-我会将其标记为已接受的答案!


在详细介绍实现之前-在我们的应用程序包上只需几句话:它包括:

  1. war文件作为Uber jar绑定到Payara Micro中,因此我们不重新部署应用程序战争,我们启动和停止整个Payara Micro,并在其上部署战争
  2. 以及主要在Hazelcast中使用并通过CCD_ 21 arg提供给Payara Micro Uber jar的具有少量类的微小CCD_

现在关于为我们提供集群定时器/事件所需行为的实现(请参阅第一篇文章):

I) 按照上面的建议使用ManagedExecutorService看起来确实更加灵活,因为它允许将任何想要的对象注入集群事件中,所以我从这种方法开始。但由于某种原因,我无法注射任何东西。由于时间有限,我把这个留给了未来的调查,并转向了下一种方法。在这篇文章的末尾,我还提供了这个案例的示例代码。

II) 所以我切换到BeanManager的场景。

  1. 我得到了如下实现的Registrysignleton(为了简洁起见,删除了所有注释)。此类驻留在通过--addLibsarg添加到Payara Micro的小罐子中:
public final class Registry {
private ManagedExecutorService _executorService;
private BeanManager _beanManager;
private Registry() {
}
public ManagedExecutorService getExecutorService() {
return _executorService;
}
public void setExecutorService(ManagedExecutorService executorService) {
_executorService = executorService;
}
public BeanManager getBeanManager() {
return _beanManager;
}
public void setBeanManager(BeanManager beanManager) {
_beanManager = beanManager;
}
public static Registry getInstance() {
return InstanceHolder._instance;
}
private static class InstanceHolder {
private static final Registry _instance = new Registry();
}
}
  1. 在主应用程序大战中,我们已经有了一个AppListener类,它在部署应用程序时侦听事件,因此我们在其中添加了Registry填充逻辑:
public class AppListener implements SystemEventListener {
...
@Resource
private ManagedExecutorService _managedExecutorService;
@Resource
private BeanManager _beanManager;
@Override
public void processEvent(SystemEvent event) throws AbortProcessingException {
try {
if (event instanceof PostConstructApplicationEvent) {
LOG.log(Level.INFO, ">> Application started");
...
// Once app marked as started - populate global objects in the Registry
Registry.getInstance().setExecutorService(_managedExecutorService);
Registry.getInstance().setBeanManager(_beanManager);
}

...

} catch (Exception e) {
LOG.log(Level.SEVERE, ">> Error processing event: " + event, e);
}
}
}
  1. ClusteredEvent类,按计划通过IScheduledExecutorService.scheduleAtFixedRate()也驻留在小jar中,并具有以下实现:
public final class ClusteredEvent implements NamedTask, Runnable, Serializable {
...
private final MultiTenantEvent _event;
public ClusteredEvent(MultiTenantEvent event) {
if (event == null) {
throw new NullPointerException("Event can not be null");
}
_event = event;
}
@Override
public void run() {
try {
if (Registry.getInstance().getBeanManager() == null) {
LOG.log(Level.WARNING, "Skipping timer execution - application not initialized yet...");
return;
}
Registry.getInstance().getBeanManager().fireEvent(_event);
} catch (Throwable e) {
LOG.log(Level.SEVERE, "Error executing timer: " + _event, e);
}
}
@Override
public final String getName() {
return _event.getName();
}
}
  1. 基本上就是这些。日程安排使用以下简单步骤完成:
@Resource(lookup = "payara/Hazelcast")
private HazelcastInstance _instance;
_instance.getScheduledExecutorService("default").scheduleAtFixedRate(new ClusteredEvent(event), initialDelaySec, invocationPeriodSec, TimeUnit.SECONDS);

到目前为止,所有测试都很好。我担心Registry.getBeanManager()在一段时间后会因为某个地方的一些封闭上下文而被"破坏"(我不确定BeanManager引用的性质),但测试表明,对BeanManager的引用在1天后仍然有效,所以希望它能正常工作。

另一个问题(即使不是问题,但需要注意)是,无法控制IScheduledExecutorService将触发哪个节点事件,例如当事件在尚未初始化(仍在启动)的节点上触发时,该事件将被跳过。但对于我们的使用场景来说,这是可以接受的,所以目前我们可以接受这些考虑。


回到ManagedExecutorService:ClusteredEvent的使用问题,实现如下:

public class ClusteredEvent implements Runnable, Serializable {
private final MultiTenantEvent _event;
public ClusteredEvent(MultiTenantEvent event) {
_event = event;
}
@Override
public void run() {
try {
LOG.log(Level.INFO, "TIMER THREAD NAME: {0}", Thread.currentThread().getName());
if (Registry.getInstance().getExecutorService() == null) {
LOG.log(Level.WARNING, "Skipping timer execution - application not initialized yet...");
return;
}

Registry.getInstance().getExecutorService().submit(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
LOG.log(Level.INFO, "Timer.Run() THREAD NAME: {0}", Thread.currentThread().getName());                    
String beanManagerJndiName = "java:comp/BeanManager";
try {
Context ic = new InitialContext();
BeanManager beanManager = (BeanManager) ic.lookup(beanManagerJndiName);
beanManager.fireEvent(_event);
return true;
} catch (NullPointerException | NamingException ex) {
LOG.log(Level.SEVERE, "ERROR: no BeanManager resource could be located by JNDI name: " + beanManagerJndiName, ex);
return false;
}
}
}).get();
} catch (Throwable e) {
LOG.log(Level.SEVERE, "Error executing timer: " + _event, e);
}
}
}

输出如下:

[2021-02-24 07:56:07] [INFO] [ua.appName.model.event.ClusteredEvent run]
TIMER THREAD NAME: hz.competent_mccarthy.cached.thread-11
[2021-02-24 07:56:07] [INFO] [ua.appName.model.event.ClusteredEvent$1 call]
Timer.Run() THREAD NAME: concurrent/__defaultManagedExecutorService-managedThreadFactory-Thread-1
[2021-02-24 07:56:07] [SEVERE] [ua.appName.model.event.ClusteredEvent$1 call]
ERROR: no BeanManager resource could be located by JNDI name: java:comp/BeanManager
javax.naming.NamingException: Lookup failed for 'java:comp/BeanManager' in SerialContext[myEnv={java.naming.factory.initial=com.sun.enterprise.naming.impl.SerialInitContextFactory, java.naming.factory.url.pkgs=com.sun.enterprise.naming, java.naming.factory.state=com.sun.corba.ee.impl.presentation.rmi.JNDIStateFactoryImpl} [Root exception is javax.naming.NamingException: Invocation exception: Got null ComponentInvocation ]
at com.sun.enterprise.naming.impl.SerialContext.lookup(SerialContext.java:496)
at com.sun.enterprise.naming.impl.SerialContext.lookup(SerialContext.java:442)
at javax.naming.InitialContext.lookup(InitialContext.java:417)
at javax.naming.InitialContext.lookup(InitialContext.java:417)
at ua.appName.model.event.ClusteredEvent$1.call(ClusteredEvent.java:70)
at ua.appName.model.event.ClusteredEvent$1.call(ClusteredEvent.java:63)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.glassfish.enterprise.concurrent.internal.ManagedFutureTask.run(ManagedFutureTask.java:143)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
at org.glassfish.enterprise.concurrent.ManagedThreadFactoryImpl$ManagedThread.run(ManagedThreadFactoryImpl.java:250)
Caused by: javax.naming.NamingException: Invocation exception: Got null ComponentInvocation 
at com.sun.enterprise.naming.impl.GlassfishNamingManagerImpl.getComponentId(GlassfishNamingManagerImpl.java:870)
at com.sun.enterprise.naming.impl.GlassfishNamingManagerImpl.lookup(GlassfishNamingManagerImpl.java:737)
at com.sun.enterprise.naming.impl.JavaURLContext.lookup(JavaURLContext.java:167)
at com.sun.enterprise.naming.impl.SerialContext.lookup(SerialContext.java:476)
... 11 more

因此,第Timer.Run() THREAD NAME: concurrent/__defaultManagedExecutorService-managedThreadFactory-Thread-1行确认代码已经在托管线程中运行,但我仍然无法注入或查找任何内容。这次我把调查留给了未来。


再次感谢@OndroMih对实施的建议!

谢谢!


截至2023年6月6日的更新:


因此,在我将应用程序迁移到Payara 6上的Jakarta EE 10后,必须更改以下行:发件人:

Registry.getInstance().getBeanManager().fireEvent(_event);

收件人:

Registry.getInstance().getBeanManager().getEvent().fire(_event);

根据这些弃用通知:https://github.com/jakartaee/cdi/issues/472

相关内容

  • 没有找到相关文章

最新更新