Spring Executorservice Error-Handling



我实现了 Spring-TaskExecutor(相当于 JDK 1.5 的 Executor(来处理从外部系统接收的通知通知。

仅使用一种方法进行接口:

public interface AsynchronousService {
void executeAsynchronously(Runnable task);
}

以及相应的实现:

public class AsynchronousServiceImpl implements AsynchronousService {
private TaskExecutor taskExecutor;
@Override
public void executeAsynchronously(Runnable task) {
taskExecutor.execute(task);
}
@Required
public void setTaskExecutor(TaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}
}

任务执行器(旧版应用程序(的 XML 配置:

<bean id="taskExecutor" class="org.example.impl.NotificationPool">
<property name="corePoolSize" value="1"/>
<property name="maxPoolSize" value="1"/>
<property name="queueCapacity" value="100"/>
<property name="WaitForTasksToCompleteOnShutdown" value="true"/>
</bean>

为corePoolSize和maxPoolSize都设置了1,因为我希望任务按顺序执行(处理任务的池只创建1个线程(。

我想根据收到通知的日期对任务进行排序,因此我需要覆盖此函数以允许优先级排序:

public class NotificationPool extends ThreadPoolTaskExecutor {
@Override
protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
return new PriorityBlockingQueue<>(queueCapacity);
}
}

下面是通知任务类:

public class NotificationTask implements Runnable, Comparable<NotificationTask> {
private final NotificationService notificationService;
private final Notification notification;
public NotificationService(NotificationService notificationService, 
Notification notification) {
this.notificationService = notificationService;
this.notification = notification;
}
@Override
public int compareTo(NotificationTask task) {
return notification.getTimestamp().compareTo(task.getTimestamp());
}
@Override
public void run() {
notificationService.processNotification(notification);
}
}

这就是我执行它的方式:

asynchronousService.executeAsynchronously(new NotificationTask (notificationService, notification));

现在,如果出现问题或您如何知道出了问题怎么办?例如,如果其中一个任务引发异常?你是怎么处理的?如果抛出异常,我想记录

。我找到了一篇文章(https://ewirch.github.io/2013/12/a-executor-is-not-a-thread.html(,建议覆盖afterExecute()-类的方法:ThreadPoolExecutor。但是,我目前正在使用Spring的ThreadPoolTaskExecutor,它没有JavaThreadPoolExecutorbeforeExecute()afterExecute()回调方法。

我可以扩展ThreadPoolTaskExecutor并覆盖initializeExecutor()方法并创建我的自定义ThreadPoolExecutor的实例。但问题是initializeExecutor方法使用ThreadPoolTaskExecutor的私有字段。

是否有人有更好的想法或更好的方法。

Spring 文档说

对于替代方法,可以使用构造函数注入直接设置 ThreadPoolExecutor 实例,或使用指向 Executors类的工厂方法定义。要将这样的原始执行器公开为 Spring TaskExecutor,只需使用 ConcurrentTaskExecutor 适配器包装它即可。

但是我没有看到任何与我们可以注入ThreadPoolExecutor相关的构造函数,所以这可能是一个神话,或者他们删除了该功能。

幸运的是,我们有ThreadPoolExecutorFactoryBean可以拯救:

如果您更喜欢本机 ExecutorService 公开,请考虑将 ThreadPoolExecutorFactoryBean 作为此类的替代方法。

这个执行器公开了一个我们可以自定义线程池的方法:

public class NotificationPool extends ThreadPoolExecutorFactoryBean {
@Override
protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
return new PriorityBlockingQueue<>(queueCapacity);
}
@Override
protected ThreadPoolExecutor createExecutor(int corePoolSize, int maxPoolSize, int keepAliveSeconds, BlockingQueue<Runnable> queue, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
return new YourCustomThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler);
}
}

并覆盖默认的afterExecute回调:

public class YourCustomThreadPoolExecutor extends ThreadPoolExecutor {
public YourCustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
// Here do something with your exception
}
}

您可以像这样轻松使用。

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor() {
@Override
public void execute(@NotNull Runnable task) {
try {
super.execute(task);
} catch (Throwable e) {
log.error(e.getMessage(), e);
}
}
};

您可以使用正式的 Spring 异步机制。

@EnableAsync
@Configuration
public class AsyncConfiguration implements AsyncConfigurer {
@Bean("taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
return new ThreadPoolTaskExecutor();
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new SimpleAsyncUncaughtExceptionHandler(); // U can customize
}

用法:

public class BlaBla {
@Async("taskExecutor")
public void doSomething() {
...
}
}

最新更新