如何多线程默克尔树哈希



我有一个很大的列表,我希望能够在java中获取Merkle根。它足够大,能够多线程处理会显着加快它的速度,因此,我一直在尝试这样做。

这是我到目前为止的代码:

public static byte[] multiMerkleRoot(ArrayList<byte[]> temp) {
int count = temp.size();
List<byte[]> hashList = new ArrayList<>();
for(byte[] o : temp) {
hashList.add(merkleHash(o));
}
if (count % 2 == 0) {
return getRoot(hashList);
} else {
return merkleHash(concat(getRoot(hashList.subList(0, hashList.size() - 1)), hashList.get(hashList.size() - 1)));
}
}
private static byte[] getRoot(List<byte[]> temp) {
if(temp.size() % 2 != 0) {
return merkleHash(concat(getRoot(temp.subList(0, temp.size() - 1)), temp.get(temp.size() - 1)));
} else {
if (temp.size() > 2) {
List<List<byte[]>> subsets = Lists.partition(temp, temp.size() / 2);
return merkleHash(concat(getRoot(subsets.get(0)), getRoot(subsets.get(1))));
} else {
return merkleHash(concat(temp.get(0), temp.get(1)));
}
}
}
public static byte[] trueMultiMerkleRoot(ArrayList<byte[]> temp, int threads) {
try {
int count = temp.size();
List<byte[]> hashList = new ArrayList<>();
for(byte[] o : temp) {
hashList.add(merkleHash(o));
}
if(count % 2 == 0) {
byte[] chunk1 = null;
switch(threads) {
case 1: chunk1 = getRoot(hashList);
break;
case 2: chunk1 = twoThreadMerkle(hashList);
break;
default: System.out.println("You can only have the following threadcounts: 1, 2, 4, 8.");
break;
}
return chunk1;
} else {
byte[] chunk1 = null;
byte[] chunk2 = hashList.get(hashList.size() - 1);
switch(threads) {
case 1: chunk1 = getRoot(hashList.subList(0, hashList.size() - 1));
break;
case 2: chunk1 = twoThreadMerkle(hashList.subList(0, hashList.size() - 1));
break;
default: System.out.println("You can only have the following threadcounts: 1, 2, 4, 8.");
break;
}
return chunk1;
}
} catch(Exception e) {
return null;
}
}
private static byte[] twoThreadMerkle(List<byte[]> temp) throws Exception {
if (!(temp.size() >= 2)) {
return twoThreadMerkle(temp);
} else {
if(temp.size() % 2 != 0) {
return getRoot(temp);
} else {
List<List<byte[]>> subsets = Lists.partition(temp, temp.size() / 2);
Executor exe1 = Executors.newSingleThreadExecutor();
Executor exe2 = Executors.newSingleThreadExecutor();
Future<byte[]> fut1 = ((ExecutorService) exe1).submit(() -> getRoot(subsets.get(0)));
Future<byte[]> fut2 = ((ExecutorService) exe2).submit(() -> getRoot(subsets.get(1)));
while ((!fut1.isDone()) || (!fut2.isDone())) {
Thread.sleep(500);
}
return merkleHash(concat(fut1.get(), fut2.get()));
}
}
}

multiMerkleRoot是单线程版本,trueMultiMerkleRoot是多线程版本的尝试。

这是我的问题:无论我使用什么大小的列表(我尝试过使用 2 的精确幂、奇数、偶数、小数和大(,我总是从这两种方法中得到两个不同的答案,我一辈子都无法弄清楚如何解决这个问题。

在这个实现中,merkleHash(( 只是 Keccak 256 的包装器,我用它来散列我正在连接的两个字节数组。

如果有人能以任何方式帮助我解决这个问题,无论是向我展示我的代码哪里出了问题以及如何修复它,还是只是让我的代码着火并告诉我如何正确做到这一点,我将不胜感激。

编辑:在我意识到以前的方法存在一些问题后,我尝试了另一种方法。但是,即使我认为它更接近,这个仍然不会多线程。

这是我的新代码:

package crypto;
import org.bouncycastle.util.encoders.Hex;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.*;
import static crypto.Hash.keccak256;
import static util.ByteUtil.concat;
public class Merkle {
private Queue<byte[]> data;
public Merkle() {
this.data = new LinkedList<>();
}
public Merkle(ArrayList<byte[]> in) {
this.data = new LinkedList<>();
this.data.addAll(in);
}
public void add(List<byte[]> in) {
data.addAll(in);
}
public void add(byte[] in) {
data.add(in);
}
public byte[] hash() {
Queue<byte[]> nextLevel = new LinkedList<>();
while((data.size() > 1) || (nextLevel.size() > 1)) {
while(data.size() > 0) {
if(data.size() > 1) {
nextLevel.add(merkleHash(data.remove(), data.remove()));
} else {
nextLevel.add(data.remove());
}
}
data.addAll(nextLevel);
nextLevel.clear();
}
return data.remove();
}
private byte[] hash(Queue<byte[]> data) {
Queue<byte[]> nextLevel = new LinkedList<>();
while((data.size() > 1) || (nextLevel.size() > 1)) {
while(data.size() > 0) {
if(data.size() > 1) {
nextLevel.add(merkleHash(data.remove(), data.remove()));
} else {
nextLevel.add(data.remove());
}
}
data.addAll(nextLevel);
nextLevel.clear();
}
return data.remove();
}
public byte[] dualHash() throws Exception {
Queue<byte[]> temp1 = new LinkedList<>();
Queue<byte[]> temp2 = new LinkedList<>();
if(data.size() == Math.pow(2, log2(data.size()))) return hash();
int temponesize = (int)Math.pow(2, log2(data.size()) + 1) / 2;
while(temp1.size() < temponesize) {
temp1.add(data.remove());
}
while(!data.isEmpty()) {
temp2.add(data.remove());
}
/*
ExecutorService exe1 = Executors.newSingleThreadExecutor();
ExecutorService exe2 = Executors.newSingleThreadExecutor();
Callable<byte[]> call1 = new Callable<byte[]>() {
@Override
public byte[] call() throws Exception {
return hash(temp1);
}
};
Callable<byte[]> call2 = new Callable<byte[]>() {
@Override
public byte[] call() throws Exception {
return hash(temp2);
}
};
Future<byte[]> fut1 = exe1.submit(call1);
Future<byte[]> fut2 = exe2.submit(call2);
*/
byte[] tem1 = hash(temp1);
byte[] tem2 = hash(temp2);

return merkleHash(tem1, tem2);
}
public int size() {
return data.size();
}
private byte[] merkleHash(byte[] a, byte[] b) {
return keccak256(concat(a, b));
}
private byte[] merkleHash(byte[] a) {
return keccak256(a);
}
private int log2(int x) {
return (int)Math.floor((Math.log(x))/(Math.log(2)));
}
}

如果我们专门研究 dualHash 方法,在这种情况下,它可以工作并给我与哈希方法相同的结果。但是,当我尝试将其委托给两个线程时,如下所示:

public byte[] dualHash() throws Exception {
Queue<byte[]> temp1 = new LinkedList<>();
Queue<byte[]> temp2 = new LinkedList<>();
if(data.size() == Math.pow(2, log2(data.size()))) return hash();
int temponesize = (int)Math.pow(2, log2(data.size()) + 1) / 2;
while(temp1.size() < temponesize) {
temp1.add(data.remove());
}
while(!data.isEmpty()) {
temp2.add(data.remove());
}
ExecutorService exe1 = Executors.newSingleThreadExecutor();
ExecutorService exe2 = Executors.newSingleThreadExecutor();
Callable<byte[]> call1 = new Callable<byte[]>() {
@Override
public byte[] call() throws Exception {
return hash(temp1);
}
};
Callable<byte[]> call2 = new Callable<byte[]>() {
@Override
public byte[] call() throws Exception {
return hash(temp2);
}
};
Future<byte[]> fut1 = exe1.submit(call1);
Future<byte[]> fut2 = exe2.submit(call2);
byte[] tem1 = fut1.get();
byte[] tem2 = fut2.get();

return merkleHash(tem1, tem2);
}

它不再给我预期的结果。 知道为什么吗?

谢谢!

找到解决方案!

事实证明,我的代码不是问题所在(至少编辑中的代码,我 100% 确定第一个代码块是完全错误的(。问题是,两个线程都在尝试哈希结果,同时都使用一个 MessageDigest 实例。现在我已经强制它们使用单独的实例,代码运行良好。

最新更新