Problem copying emails to a folder in JavaMail using threads



I have a problem using threads to copy some emails to other folders: the code does not wait for the tasks to finish.

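I want to move the messages using threads to speed up the job, but I need to wait until all the messages have been moved. How can I do that?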

private static void moveMessagesToFolders(List<Message> listMessages, Store store, Set<String> setSender) throws MessagingException {
    HashMap<String, List<Message>> mapMessages = separeteMessagesBySender(listMessages, setSender);
    for (Entry<String, List<Message>> mapMessage : mapMessages.entrySet()) {
        Message[] messageArray = mapMessage.getValue().toArray(new Message[mapMessage.getValue().size()]);
        moveMessagesThread(messageArray, mapMessage, store);
    }
}
private static void moveMessagesThread(Message[] messageArray, Entry<String, List<Message>> mapMessage, Store store) {
    Set<Thread> setThread = createMovimentSetThread(messageArray, mapMessage, store);
    for (Thread thread : setThread) {
        thread.start();
    }
}
private static Set<Thread> createMovimentSetThread(Message[] messageArray, Entry<String, List<Message>> mapMessage, Store store) {
    int [] threadIndexs = MathUtil.generateIndex(messageArray);
    Set<Thread> setThread = new HashSet<>(threadIndexs.length);
    for (int i = 0; i < threadIndexs.length; i++) {
        setThread.add(new ThreadMoveMessages(messageArray, mapMessage, store, threadIndexs[i]));
    }
    return setThread;
}

Afterwards I changed the method to this implementation, which uses an Executor.

private static void moveMessagesThread(Message[] messageArray, Entry<String, List<Message>> mapMessage, Store store) {
        int [] threadIndexs = MathUtil.generateIndex(messageArray);
        ExecutorService executor = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 4; i++) {
            executor.execute(new ThreadMoveMessages(messageArray, mapMessage, store, threadIndexs[i]));
        }
        executor.shutdown();
    }

The thread implementation class:

public class ThreadMoveMessages implements Callable<Boolean> {
    private Entry<String, List<Message>> mapMessage;
    private Store store;
    private Message[] messageArray;
    private static int indexControler;
    private static int indexLimit;
    public ThreadMoveMessages(Message[] messageArray, Entry<String, List<Message>> mapMessage, Store store, int indexEnd) {
        this.messageArray = Arrays.copyOf(messageArray, indexEnd);
        this.indexControler += indexEnd;
        this.indexLimit = indexControler;
        this.mapMessage = mapMessage;
    }
    @Override
    public Boolean call() throws Exception {
        Folder folder = null;
        try {
            folder = this.store.getDefaultFolder().getFolder(this.mapMessage.getKey());
            folder.open(Folder.READ_WRITE);
            folder.appendMessages(this.messageArray);
            EmailUtil.deleteListMessage(this.mapMessage.getValue());
        } catch (MessagingException e) {
            e.printStackTrace();
        }                       
        return true;
    }
}

If you want to wait for an asynchronous computation and get its result, you should use Futures and Callables.

Implement the Callable interface:

class MoveMessages implements Callable<Boolean> {
    @Override
    public Boolean call() throws Exception {
        boolean success = true;
        // Your implementation here
        return success;
    }
}

Next, submit it to an Executor and retrieve a Future. Calling get() on the Future blocks until the Callable's computation has completed.

ExecutorService executor = Executors.newFixedThreadPool(5);
MoveMessages moveMessages = new MoveMessages();
Future<Boolean> submit = executor.submit(moveMessages);
Boolean success = submit.get(); // Blocks until the task is finished
executor.shutdown();

Of course, you can submit more tasks, collect all their Futures in a list, and then wait until every task has completed.
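As a rough sketch of that idea, ExecutorService.invokeAll accepts a collection of Callables and returns only once all of them have completed. The class name WaitForAllTasks and the helper moveBatch are made up for illustration, and the sleep merely stands in for the real work of moving messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class WaitForAllTasks {

    // Hypothetical stand-in for a task that moves one batch of messages.
    static Callable<Boolean> moveBatch(int batchNumber) {
        return () -> {
            Thread.sleep(20L + batchNumber); // simulate time spent talking to the mail server
            return true;
        };
    }

    // Runs taskCount tasks on a 4-thread pool and returns how many reported success.
    static int runAll(int taskCount) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Boolean>> tasks = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                tasks.add(moveBatch(i));
            }
            // invokeAll blocks until every task has completed.
            List<Future<Boolean>> futures = executor.invokeAll(tasks);
            int succeeded = 0;
            for (Future<Boolean> future : futures) {
                if (future.get()) {
                    succeeded++;
                }
            }
            return succeeded;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A task failed", e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Succeeded: " + runAll(10));
    }
}
```

Because invokeAll already waits, the get() calls afterwards return immediately; they are only there to read each task's result.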

Edit:

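OK, first of all you said you need to wait until all messages have been moved, so one way to do that is to use Future and Callable together with an ExecutorService. With an ExecutorService you don't need to create and start lots of new threads. Remember that creating a new thread has a cost. In your code you create 4 new threads for every sender; with an ExecutorService you create only a fixed number of threads and reuse them for every sender. Here is your example using Executors and Futures. Note that the ExecutorService is created once per call to moveMessagesToFolders: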

private static ExecutorService executor;

private static void moveMessagesToFolders(List<Message> listMessages, Store store, Set<String> setSender) throws MessagingException {
    executor = Executors.newFixedThreadPool(4);
    HashMap<String, List<Message>> mapMessages = separeteMessagesBySender(listMessages, setSender);
    for (Map.Entry<String, List<Message>> mapMessage : mapMessages.entrySet()) {
        Message[] messageArray = mapMessage.getValue().toArray(new Message[mapMessage.getValue().size()]);
        moveMessagesThread(messageArray, mapMessage, store);
    }
    executor.shutdown();
}
private static void moveMessagesThread(Message[] messageArray, Map.Entry<String, List<Message>> mapMessage, Store store) {
    List<Future<Boolean>> futures = createMovimentSetThread(messageArray, mapMessage, store);
    for (Future<Boolean> future : futures) {
        try {
            Boolean success = future.get(); // Blocks until this Callable has finished
            if (!success) { // Fail if any submitted Callable did not succeed
                throw new RuntimeException("Something went wrong while moving messages");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Restore the interrupt flag
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}
private static List<Future<Boolean>> createMovimentSetThread(Message[] messageArray, Map.Entry<String, List<Message>> mapMessage, Store store) {
    int [] threadIndexs = MathUtil.generateIndex(messageArray);
    List<Future<Boolean>> futures = new ArrayList<>();
    for (int i = 0; i < threadIndexs.length; i++) {
        Future<Boolean> submit = executor.submit(new ThreadMoveMessages(messageArray, mapMessage, store, threadIndexs[i]));
        futures.add(submit);
    }
    return futures;
}
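If you don't need the individual results, another way to wait is to call awaitTermination after shutdown, since shutdown() on its own only stops the pool from accepting new tasks and returns immediately. The following is a minimal sketch under that assumption; the class name AwaitTerminationDemo, the counter, and the one-minute timeout are arbitrary choices for illustration, not part of the code above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AwaitTerminationDemo {

    // Submits taskCount tasks, then blocks until the pool has drained them.
    // Returns the number of tasks that finished before the timeout.
    static int runAndWait(int taskCount) {
        AtomicInteger finished = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(4);
        for (int i = 0; i < taskCount; i++) {
            executor.execute(() -> {
                // Hypothetical placeholder for moving one batch of messages.
                finished.incrementAndGet();
            });
        }
        executor.shutdown(); // stop accepting new tasks; this call does not wait
        try {
            // Block until the queued tasks finish or the (arbitrary) timeout expires.
            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                executor.shutdownNow(); // give up on tasks that are still running
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("Finished tasks: " + runAndWait(8));
    }
}
```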

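Based on your comment, using the Fork/Join framework to split the array into smaller pieces may be a better solution. Look it up on Google for more information. Some links: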

  • http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html
  • Java 7: Fork/Join Framework
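To give a rough idea of what splitting the array with Fork/Join could look like, here is a small sketch. It uses a String array as a stand-in for the Message array, and the names ForkJoinMoveDemo and MoveTask as well as the threshold of 10 are assumptions made for illustration; the real task would append its slice to the target folder instead of just counting.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinMoveDemo {

    // Splits the range [from, to) in half until a chunk is small enough to handle directly.
    static class MoveTask extends RecursiveTask<Integer> {
        private static final int THRESHOLD = 10; // arbitrary chunk size for this sketch
        private final String[] messages;
        private final int from;
        private final int to;

        MoveTask(String[] messages, int from, int to) {
            this.messages = messages;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Integer compute() {
            if (to - from <= THRESHOLD) {
                // Hypothetical placeholder: real code would move messages[from..to) here.
                return to - from;
            }
            int mid = from + (to - from) / 2;
            MoveTask left = new MoveTask(messages, from, mid);
            MoveTask right = new MoveTask(messages, mid, to);
            left.fork();                      // schedule the left half asynchronously
            int rightCount = right.compute(); // handle the right half in this thread
            return left.join() + rightCount;  // wait for the left half and combine
        }
    }

    // Returns the number of messages handled; invoke() blocks until the whole tree is done.
    static int moveAll(String[] messages) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            return pool.invoke(new MoveTask(messages, 0, messages.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Handled: " + moveAll(new String[95]));
    }
}
```

Fork/Join is designed mainly for CPU-bound work, so whether it actually helps with blocking mail-server calls is something you would have to measure.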
