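I'm trying to create a second thread that processes data while the main thread does more work. The main thread must wait until the other thread has finished doStuff for all elements. My implementation is pretty straightforward.
Please take a look at processData and tell me whether there is a more Java-like way to do this.
I've read about Phaser, but I still can't picture how to use it. What else could I try?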
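import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

public class MyClass {
    private final NodeQueue queue;

    MyClass() {
        queue = new NodeQueue();
    }

    public void processData(Set<String> dataSet) {
        // allow transfer
        queue.transferEnable();
        // the transfer thread drains the queue and calls doStuff on each element
        Thread transfer = new Thread(() -> {
            queue.transferData();
        });
        transfer.start();
        // feed the elements; doStuff runs in the other thread
        for (String element : dataSet) {
            queue.add(element);
            // do something more
        }
        // stop transfer
        queue.waitTillEmptyQueue();
        queue.transferDisable();
        try {
            transfer.join();
        } catch (InterruptedException e) {
            // restore the interrupt flag so callers can see it
            Thread.currentThread().interrupt();
        }
    }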
    public class NodeQueue {
        private final ConcurrentLinkedQueue<String> queue;
        // volatile: written by the main thread, read by the transfer thread
        private volatile boolean transferEnabled;

        protected NodeQueue() {
            queue = new ConcurrentLinkedQueue<>();
            transferEnabled = true;
        }

        // drain everything that is currently queued
        protected void transfer() {
            String s;
            while ((s = queue.poll()) != null) {
                doStuff(s); // per-element work, not shown in the question
            }
        }

        // keep draining until transfer is disabled
        public void transferData() {
            while (transferEnabled) {
                transfer();
            }
        }

        public void transferEnable() {
            transferEnabled = true;
        }

        public void transferDisable() {
            transferEnabled = false;
        }

        public void add(String s) {
            queue.add(s);
        }

        // busy-wait until the transfer thread has polled every element
        public void waitTillEmptyQueue() {
            while (!queue.isEmpty()) {
                Thread.yield();
            }
        }
    }
}
Let me copy the Phaser example from my own post.
Main thread
// Register the producer as the first party
Phaser phaser = new Phaser(1);
for (int i = 0; i < 10000; ++i) {
    // Register each task as a party before queuing it
    phaser.register();
    queue.put(new Task());
}
// The producer arrives and waits for all tasks to complete
phaser.arriveAndAwaitAdvance();
// At the end, only one party is left: the producer itself
Consumer
while (true) {
    Task task = queue.take();
    processTask(task);
    // The task is complete, so it arrives and deregisters itself as a party
    phaser.arriveAndDeregister();
}
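To see how the two fragments fit together, here is a minimal self-contained sketch, not a drop-in replacement for processData. The class name PhaserSketch, the helper runTasks, and the counter standing in for doStuff are all hypothetical names introduced for illustration. It assumes a single consumer thread and an unbounded LinkedBlockingQueue, and it stops the consumer by interrupting it once the phase has advanced.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

class PhaserSketch {

    // Hypothetical helper: queues taskCount elements, waits until one consumer
    // thread has processed all of them, and returns how many were processed.
    // A Phaser supports at most 65535 parties, so taskCount must stay below that.
    static int runTasks(int taskCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicInteger processed = new AtomicInteger();
        Phaser phaser = new Phaser(1); // the producer is the first party

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String element = queue.take();  // blocks instead of spinning
                    processed.incrementAndGet();    // stand-in for doStuff(element)
                    phaser.arriveAndDeregister();   // this element is done
                }
            } catch (InterruptedException e) {
                // interrupted by the producer once all work is finished
            }
        });
        consumer.start();

        for (int i = 0; i < taskCount; i++) {
            phaser.register();          // one party per element, registered before queuing
            queue.add("element-" + i);  // unbounded queue, so add does not block
        }

        // The producer arrives and blocks until every registered element has arrived.
        phaser.arriveAndAwaitAdvance();

        // All elements are processed; stop the consumer and wait for it to exit.
        consumer.interrupt();
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(1000));
    }
}
```

Compared with the flag-and-busy-wait version in the question, the blocking take() keeps the consumer idle while the queue is empty, and arriveAndAwaitAdvance() replaces both waitTillEmptyQueue() and the enabled flag.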