我有以下方法:
void store(SomeObject o) {
}
该方法的思想是将o
存储到永久存储器,但是函数不应该阻塞。也就是说,我不能/一定不能在称为store
的同一线程中进行实际存储。
我也不能启动一个线程并存储来自另一个线程的对象,因为store
可能被称为"巨大"的次数,我不想开始生成线程。所以我选择了一些我不知道如何运作的选项:
1)使用线程池(Executor
族)
2)在store
中,将对象存储在数组列表中并返回。当数组列表达到1000(随机数)时,启动另一个线程将数组列表"刷新"到存储中。但我仍然可能有太多的线程(线程池?)的问题
因此,在这两种情况下,我唯一的要求是,我以与传递给store
的完全相同的顺序持久地存储对象。使用多个线程会把事情搞混。
如何解决这个问题?
我如何确保:
1)非阻塞store
2)准确的插入顺序
我不关心任何存储保证。例如,如果某些东西崩溃,我不关心丢失数据,例如在存储它们之前缓存在数组列表中。
我将使用SingleThreadExecutor和BlockingQueue。
SingleThreadExecutor顾名思义只有一个线程。使用它从Queue中轮询和持久化对象,如果为空则阻塞。
你可以在你的store方法中为队列添加not blocking。
编辑实际上,您甚至不需要额外的Queue - newSingleThreadExecutor的JavaDoc说:
创建一个Executor,该Executor使用单个工作线程在无界队列上操作。(但是请注意,如果这个单线程在关闭之前的执行过程中由于失败而终止,如果需要执行后续任务,将会有一个新的线程取代它。)任务保证按顺序执行,并且在任何给定时间活动的任务不超过一个。与newFixedThreadPool(1)不同,返回的执行器保证不能被重新配置以使用其他线程。
所以我认为这正是你所需要的。
private final ExecutorService persistor = Executors.newSingleThreadExecutor();
public void store( final SomeObject o ){
persistor.submit( new Runnable(){
@Override public void run(){
// your persist-code here.
}
} );
}
使用具有准无限循环和使用额外队列的Runnable的优点是可以编写一些"Burst"功能。例如,你可以让它等待持久化,只有当队列中有10个元素或最老的元素至少在1分钟前被添加…
我建议使用我设计的一个库——Chronicle-Queue。
允许你在不阻塞的情况下写入当前线程。它最初是为低延迟交易系统设计的。对于小消息,写一条消息大约需要300ns。
您不需要使用后台线程或堆上队列,并且默认情况下它不会等待数据写入磁盘。它还确保所有读者的顺序一致。如果程序在调用finish()
之后的任何时间点死亡,则消息不会丢失。(除非操作系统崩溃或断电)还支持复制,避免数据丢失
有一个单独的线程从队列的末尾获取项目(阻塞在空队列上),并将它们写入磁盘。主线程的store()
函数只是将项目添加到队列的开头。
这是一个粗略的想法(尽管我认为在生产代码中会有更干净或更快的方法来做这件事,这取决于你需要的东西有多快):
import java.util.*;
import java.io.*;
import java.util.concurrent.*;
class ObjectWriter implements Runnable {
private final Object END = new Object();
BlockingQueue<Object> queue = new LinkedBlockingQueue();
public void store(Object o) throws InterruptedException {
queue.put(o);
}
public ObjectWriter() {
new Thread(this).start();
}
public void close() throws InterruptedException {
queue.put(END);
}
public void run() {
while (true) {
try {
Object o = queue.take();
if (o == END) {
// close output file.
return;
}
System.out.println(o.toString()); // serialize as appropriate
} catch (InterruptedException e) {
}
}
}
}
public class Test {
public static void main(String[] args) throws Exception {
ObjectWriter w = new ObjectWriter();
w.store("hello");
w.store("world");
w.close();
}
}
你问题中的评论让你听起来好像不熟悉多线程,但实际上并没有那么难。
你只需要另一个线程负责写存储,从队列中取出项目。-你的store
函数只是将对象添加到内存队列中,并继续它的方式。
一些伪代码:
final List<SomeObject> queue = new List<SomeObject>();
void store(SomeObject o) {
// add it to the queue - note that modifying o after this will also alter the
// instance in the queue
synchronized(queue) {
queue.add(queue);
queue.notify(); // tell the storage thread there's something in the queue
}
}
void storageThread() {
SomeObject item;
while (notfinished) {
synchronized(queue) {
if (queue.length > 0) {
item = queue.get(0); // get from start to ensure same order
queue.removeAt(0);
} else {
// wait for something
queue.wait();
continue;
}
}
writeToStorage(item);
}
}