Java 中的多个线程用于 ConcurrentHashMap



>我做了一个两个线程,一个是获取数据,另一个是保存数据。我的问题是在存储从 Thread1 读取的数据的过程中没有处理它。

我想提取 1,000,000 个元素并将它们创建为文件。元素大小如此之大,所以我将元素大小除以 100,000。然后,循环将运行 10 次。一个线程从另一个服务器读取的数据 100,000。另一个线程从第一个线程获取数据并将其写入文件。

我的原始场景如下:

第一个线程读取总键,值大小。 它将是100,000~1,000,000。我假设我将处理 1,000,000 个数据。然后计数设置为 1,000,000。第一个线程除以 100,000,从服务器读取数据 100,000。然后,First Thread 调用 setData(Key,Value map(。 它将循环 10 次。

第二个线程将循环 10 次。 首先,通过调用 getMap(( 方法获取数据。它调用writeSeq(hashmap(方法。它将数据写入写入器流。它还没有冲洗。这里有一个问题。它通过调用 getMap(( 成功获取数据大小。但是,writeSeq方法不能处理所有大小的值。当我得到 100,000 的大小时,它会随机处理。它将是 100, 1500, 0, 8203 ...

第一个线程如下:

public void run() {
getValueCount(); //initialize value.
while (this.jobFlag) {
getSortedMap(this.count); //count starts the number of all elements size.
//For example, Total size is 1,000,000. Then count will sets a 1,000,000 and it is decreased as 100,000.
// Also setMap() is called in this method.
if (!jobFlag) //If all processing is done, jobFlag is set as false.
break;
}
resetValue();
}

第二个线程如下:

public void run() {
setWriter(); //Writer Stream creates;
double count  = 10; //the number of loop. 
ConcurrentHashMap<String, String> hash = new ConcurrentHashMap<String,String>();
for (int i = 0; i <= count - 1; i++) {
hash = share.getMap();
writeSeq(hash);
}
closeWriter(); //close Writer stream
}

这是共享源:

import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
public class ShareData {
ConcurrentHashMap<String, String> map;
public synchronized ConcurrentHashMap<String, String> getMap(){
if (this.map == null) {
try {
wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
ConcurrentHashMap<String, String> hashmap = map;
this.map = null;
return hashmap;
}
public synchronized void setMap(ConcurrentHashMap<String, String> KV) {
if (this.map != null) {
try {
wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
this.map = KV;
notify();
}
}

之后,保存数据的第二个线程被盯着。KV 的大小很好,但是在处理 foreach 时不会处理所有值。另外,每次创建文件时,大小都不同。是同步的问题吗?

public synchronized void writeSeq(ConcurrentHashMap<String, String> KV) {
AtomicInteger a = new AtomicInteger(0);
System.out.println(KV.size()); //ex) 65300
redisKV.entrySet().parallelStream().forEach(
entry -> { 
try {
a.incrementAndGet();
writer.append(new Text(entry.getKey()), new Text(entry.getValue()));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
System.out.println(a.get()); //ex) 1300
i = 0;
notify();
}

KV 的大小很好,但在处理 foreach 时不会处理所有值。另外,每次创建文件时,大小都不同。是同步的问题吗?

清楚。 我可以看到一个小问题,但它不太可能导致您描述的问题。

  • if (map == null) wait();代码应该是while循环。

  • if (map != null) wait();代码应为while循环。

问题是,如果一个线程收到虚假通知,它可能会以错误的状态继续map。 您需要重试测试。 (如果您阅读 javadoc forObject,您将看到一个正确实现条件变量的示例。

除此之外,问题的根本原因似乎不在您向我们展示的代码中。


但是,如果我猜测一下,我的猜测是一个线程在ConcurrentHashMap中添加或删除条目,而第二个线程正在处理它1。 您必须适当地使用您向我们展示的getMap/setMap方法(即在适当的点使用适当的参数调用(,以避免两个线程相互干扰。 您尚未向我们展示代码。

因此,如果我的猜测是正确的,那么您的问题是逻辑错误而不是低级同步问题。 但是,如果您需要更好的答案,则需要编写并发布适当的MCVE。


1 - ConcurrentHashMap 的迭代器是弱一致性的。 这意味着,如果在迭代时更新地图,则可能会错过迭代中的条目,或者可能会多次看到它们。

更好的方法是使用BlockingQueue,一个线程放置队列,另一个线程从队列中获取。

i++;不是线程安全的。您将获得比更新更少的计数。请改用AtomicInteger及其incrementAndGet()方法。

最新更新