非阻塞功能,保持秩序



我有以下方法:

void store(SomeObject o) {  
}  

该方法的思想是将o存储到永久存储器,但是函数不应该阻塞。也就是说,我不能/一定不能在称为store的同一线程中进行实际存储。
我也不能启动一个线程并存储来自另一个线程的对象,因为store可能被称为"巨大"的次数,我不想开始生成线程。所以我选择了一些我不知道如何运作的选项:
1)使用线程池(Executor族)
2)在store中,将对象存储在数组列表中并返回。当数组列表达到1000(随机数)时,启动另一个线程将数组列表"刷新"到存储中。但我仍然可能有太多的线程(线程池?)的问题
因此,在这两种情况下,我唯一的要求是,我以与传递给store的完全相同的顺序持久地存储对象。使用多个线程会把事情搞混。
如何解决这个问题?
我如何确保:
1)非阻塞store
2)准确的插入顺序
我不关心任何存储保证。例如,如果某些东西崩溃,我不关心丢失数据,例如在存储它们之前缓存在数组列表中。

我将使用SingleThreadExecutor和BlockingQueue。

SingleThreadExecutor顾名思义只有一个线程。使用它从Queue中轮询和持久化对象,如果为空则阻塞。

你可以在你的store方法中为队列添加not blocking。

编辑实际上,您甚至不需要额外的Queue - newSingleThreadExecutor的JavaDoc说:

创建一个Executor,该Executor使用单个工作线程无界队列上操作。(但是请注意,如果这个单线程在关闭之前的执行过程中由于失败而终止,如果需要执行后续任务,将会有一个新的线程取代它。)任务保证按顺序执行,并且在任何给定时间活动的任务不超过一个。与newFixedThreadPool(1)不同,返回的执行器保证不能被重新配置以使用其他线程。

所以我认为这正是你所需要的。

private final ExecutorService persistor = Executors.newSingleThreadExecutor();
public void store( final SomeObject o ){
    persistor.submit( new Runnable(){
            @Override public void run(){
                // your persist-code here.
            }
        } );
}

使用具有准无限循环和使用额外队列的Runnable的优点是可以编写一些"Burst"功能。例如,你可以让它等待持久化,只有当队列中有10个元素或最老的元素至少在1分钟前被添加…

我建议使用我设计的一个库——Chronicle-Queue。

允许你在不阻塞的情况下写入当前线程。它最初是为低延迟交易系统设计的。对于小消息,写一条消息大约需要300ns。

您不需要使用后台线程或堆上队列,并且默认情况下它不会等待数据写入磁盘。它还确保所有读者的顺序一致。如果程序在调用finish()之后的任何时间点死亡,则消息不会丢失。(除非操作系统崩溃或断电)还支持复制,避免数据丢失

有一个单独的线程从队列的末尾获取项目(阻塞在空队列上),并将它们写入磁盘。主线程的store()函数只是将项目添加到队列的开头。

这是一个粗略的想法(尽管我认为在生产代码中会有更干净或更快的方法来做这件事,这取决于你需要的东西有多快):

import java.util.*;
import java.io.*;
import java.util.concurrent.*;
class ObjectWriter implements Runnable {
    private final Object END = new Object();
    BlockingQueue<Object> queue = new LinkedBlockingQueue();
    public void store(Object o) throws InterruptedException {
        queue.put(o);
    }
    public ObjectWriter() {
        new Thread(this).start();
    }
    public void close() throws InterruptedException {
        queue.put(END);
    }
    public void run() {
        while (true) {
            try {
                Object o = queue.take();
                if (o == END) {
                    // close output file.
                    return;
                }
                System.out.println(o.toString()); // serialize as appropriate
            } catch (InterruptedException e) {
            }
        }
    }
}
public class Test {
    public static void main(String[] args) throws Exception {
        ObjectWriter w = new ObjectWriter();
        w.store("hello");
        w.store("world");
        w.close();
    }
}

你问题中的评论让你听起来好像不熟悉多线程,但实际上并没有那么难。

你只需要另一个线程负责写存储,从队列中取出项目。-你的store函数只是将对象添加到内存队列中,并继续它的方式。

一些伪代码:

final List<SomeObject> queue = new List<SomeObject>();
void store(SomeObject o) {
    // add it to the queue - note that modifying o after this will also alter the
    // instance in the queue
    synchronized(queue) {
       queue.add(queue);
       queue.notify();  // tell the storage thread there's something in the queue
    }
}
void storageThread() {
    SomeObject item;
    while (notfinished) {
       synchronized(queue) {
          if (queue.length > 0) {
             item = queue.get(0);  // get from start to ensure same order
             queue.removeAt(0);
          } else {
             // wait for something
             queue.wait();
             continue;
          }
       }
       writeToStorage(item);
    }
}

相关内容

  • 没有找到相关文章

最新更新