如何从Java EE批处理工作发送电子邮件



我需要每天处理大量用户列表,以根据某些情况将电子邮件和短信通知发送。我正在使用Java EE批处理处理模型。我的工作XML如下:

<step id="sendNotification">
    <chunk item-count="10" retry-limit="3">
        <reader ref="myItemReader"></reader>
        <processor ref="myItemProcessor"></processor>
        <writer ref="myItemWriter"></writer>
        <retryable-exception-classes>
            <include class="java.lang.IllegalArgumentException"/>
        </retryable-exception-classes>
    </chunk>
</step>

myitemreader的onopen方法读取数据库中的所有用户,并且readitem()一次使用列表迭代器一次读取一个用户。在MyitemPropersesor中,实际的电子邮件通知发送给用户,然后将用户持续在MyitemWriter类的数据库中,以供该片段。

@Named
public class MyItemReader extends AbstractItemReader {
    private Iterator<User> iterator = null;
    private User lastUser;
    @Inject
    private MyService service;
    @Override
    public void open(Serializable checkpoint) throws Exception {
        super.open(checkpoint);
        List<User> users = service.getUsers();
        iterator = users.iterator();
        if(checkpoint != null) {
            User checkpointUser = (User) checkpoint;
            System.out.println("Checkpoint Found: " + checkpointUser.getUserId());
            while(iterator.hasNext() && !iterator.next().getUserId().equals(checkpointUser.getUserId())) {
                System.out.println("skipping already read users ... ");
            }
        }
    }
    @Override
    public Object readItem() throws Exception {
        User user=null;
        if(iterator.hasNext()) {
            user = iterator.next();
            lastUser = user;
        }
        return user;
    }
    @Override
    public Serializable checkpointInfo() throws Exception {
        return lastUser;
    }
}

我的问题是,CheckPoint存储了上一个块中执行的最后一个记录。如果我有一个与接下来的10个用户的块,并且在第五用户的MyitemProcessor中抛出了例外,那么重试将执行整个Chunck,并且将再次处理所有10个用户。我不希望通知再次发送给已经处理过的用户。

有办法处理此问题吗?这应该如何有效地完成?

任何帮助将不胜感激。谢谢。

我将基于@cheng的评论。我在这里对他的荣誉,希望我的答案在组织和展示选项方面提供了更多的价值。

答案:排队邮件以送出另一个MDB发送电子邮件

背景:

正如@cheng指出的那样,故障意味着整个交易已回滚,检查点没有前进。

那么,如何处理您的块已向某些用户发送电子邮件但不是全部的事实?(您可能会说它回滚,但带有"副作用"。)

因此,我们可以重述您的问题,然后以:如何从批处理步骤发送电子邮件?

好吧,假设您有一种通过交易API发送电子邮件(实现 Xaresource 等),则可以使用该API。

假设您不这样做,我会写交易给JMS队列,然后用单独的MDB发送电子邮件(正如@Cheng在他的一篇评论中所建议的那样)。

建议的替代方案:使用ItemWriter将消息发送给JMS队列,然后使用单独的MDB实际发送电子邮件

使用这种方法,您仍然可以通过批处理处理和DB的更新来提高效率(无论如何您一次只发送电子邮件),并且您可以从简单的检查点和重新启动中受益,而无需编写复杂的错误处理。

这也可能可以作为跨批处理作业和批处理外的模式重复使用。

其他替代方案

我认为不太好的其他一些想法是为了讨论的目的:

添加批处理应用程序逻辑跟踪用户通过电子邮件(使用itemProcessListener)

您可以使用 itemProcessListener 方法构建自己的任一/成功/失败电子邮件的列表:efterProcess和onprocesserror。

在重新启动时,您可以知道当前块中的哪些用户已通过电子邮件进行了电子邮件,因为整个块已经退回,即使已经发送了一些电子邮件。

这当然会使您的批处理逻辑复杂化,您还必须以某种方式坚持此成功或失败列表。再加上这种方法可能是高度特定的(而不是排队进行MDB进行处理)。

但是,这更简单,因为您有一个批处理作业,而无需消息传递提供商和单独的应用程序组件。

如果您走这条路线,则可能需要使用可跳动和"无滚动"的可重试例外的组合。

单项目块

如果您用 item-count =" 1" 定义了块,则避免复杂的检查点和错误处理代码。但是,您牺牲效率,因此,只有批处理的其他方面非常引人注目,这才有意义:例如通过通用界面安排和管理作业,即在作业中重新启动步骤

的失败步骤的能力

如果您要去此路线,则可能需要考虑将套接字和超时例外定义为"无滚动"异常(使用),因为没有什么可从回滚中获得的,您可能想在网络超时问题上重试。

由于您特别提到效率,我猜这对您来说是不好的。

使用交易同步

这也许可以起作用,但是批处理API并不是特别简单,您仍然可以使用块完成,但发送一封或多封电子邮件失败。

您的当前项目处理器正在块交易范围之外做某事,这导致应用程序状态不同步。如果您的要求仅在块中的所有项目成功完成后发送电子邮件,那么您可以将电子邮件零件移至itemWriterListener.afterwrite(项目)。

最新更新