如何控制子线程的生命周期并将其与主线程同步



我正在尝试创建另一个处理数据的线程,而主线程正在做更多的工作。主线程必须等待,直到另一个线程完成具有所有元素的doStuff。我的实现非常直接。

请看一下processData,然后告诉我还有其他类似Java的方法吗?

我读过Phaser,但仍然无法想象如何使用它,或者我还能尝试什么?

public class MyClass {
private final NodeQueue queue;

MyClass() {
queue = new NodeQueue();
}

public void processData(Set<String> dataSet) {
// allow transfer
queue.transferEnable()
Thread transfer = new Thread(() -> {
queue.transferData();
})
transfer.start();

// doStuff in another thread
for (String element : dataSet) {
queue.add(element);
// do something more
}

// stop transfer
queue.waitTillEmptyQueue();
queue.transferDisable();
try {
transfer.join();
} catch (...) {
// catch
}
}



public class NodeQueue {
private final ConcurrentLinkedQueue<String> queue;

private boolean transferEnabled;

protected NodeQueue() {
queue = new ConcurrentLinkedQueue<>();
transferEnabled = true;
}

protected void transfer() {
while (!queue.isEmpty()) {
doStuff(queue.poll());
}
}

public void transferData() {
while (tranfserEnabled) {
transfer();
}
}

public synchronized void transferEnable() {
transferEnabled = true;
}

public synchronized void transferDisable() {
transferEnabled = false;
}

public void add(String s) {
queue.add(s);
}

public synchronized void waitTillEmptyQueue() {
while (!queue.isEmpty()) {
if (queue.isEmpty()) {
break;
}
}
}
}
}

让我从我自己的后中复制Phaser示例

主线程

// Add producer as a party
Phaser phaser = new Phaser(1);
for (int i=0; i<10000; ++i) {
// Add each task as a party
phaser.register();
queue.put(new Task());
}
// Producer arrived and wait for completion of all tasks
phaser.arriveAndAwaitAdvance();
// At the end, there is only 1 party left which is the producer itself

消费者

while (true) {
Task task = queue.take();
processTask(task);
// Task completed and remove itself as a party
phaser.arriveAndDeregister();
}

最新更新