我有一个函数,在一个大的如下所示的未排序的数字数组
import java.util.*;
public class program {
// Linear-search function to find the index of an element
public static int findIndex(int arr[], int t)
{
// if array is Null
if (arr == null) {
return -1;
}
// find length of array
int len = arr.length;
int i = 0;
// traverse in the array
while (i < len) {
// if the i-th element is t
// then return the index
if (arr[i] > t) {
return i;
}
else {
i = i + 1;
}
}
return -1;
}
// Driver Code
public static void main(String[] args)
{
int[] my_array = { 5, 4, 6, 1, 3, 2, 7, 8, 9 };
int i = findIndex(my_array, 7);
// find the index of 5
System.out.println("Index position of 5 is: "
+ my_array[i]);
}
}
但是我必须找到一种方法来并行地实现它。我不确定如何开始或做什么,因为我是并行编程领域的新手。
任何帮助都将不胜感激。
最直接的方法是使用并行流,@Govinda sakhaare很好地说明了这一点。
然而,如果你想用这个例子来学习如何使用线程,那么按照下面的步骤并行化你的代码:
- 创建线程
- 将工作分配给线程,即每个线程将尝试找到一个比作为参数传递的值更大的值,但只适用于数组的一部分;
- 第一个发现该值的线程通知其他线程,以便每个线程都退出。
创建线程的方法如下:
Thread[] threads = new Thread[total_threads];
for(int t = 0; t < threads.length; t++) {
threads[t] = new Thread(/** the parallel work **/);
threads[t].start();
}
要将工作分配给线程,需要在线程之间拆分数组。最简单的方法实际上是拆分迭代,而不是拆分数组本身。线程接收整个数组,但只处理其中的一些位置,例如:
private final static int NO_FOUND = -1;
// Linear-search function to find the index of an element
public static int findIndex(int[] arr, int t, int threadId, int total_threads){
for (int i = threadId; i < arr.length; i += total_threads)
if ( arr[i] > t)
return i;
return NO_FOUND;
}
对于每个线程,我们从0 to N-1
中分配一个ID
范围,N
为线程总数。
public class program {
private final static int NO_FOUND = -1;
// Linear-search function to find the index of an element
public static int findIndex(int[] arr, int t, int threadId, int total_threads, AtomicInteger shared_index){
for (int i = threadId; i < arr.length && shared_index.get() == -1; i += total_threads)
if ( arr[i] > t)
return i;
return NO_FOUND;
}
public static void main(String[] args) throws InterruptedException {
final int N = 8;
int[] my_array = { 5, 4, 6, 1, 3, 2, 7, 8, 9 };
int total_threads = 4;
AtomicInteger shared_index = new AtomicInteger(-1);
Thread[] threads = new Thread[total_threads];
for(int t = 0; t < threads.length; t++) {
final int thread_id = t;
threads[t] = new Thread(() ->parallel_work(N, my_array, total_threads, shared_index, thread_id));
threads[t].start();
}
for (Thread thread : threads)
thread.join();
System.out.println("Index of value bigger than " + N + " : " + shared_index.get());
}
private static void parallel_work(int n, int[] my_array, int total_threads, AtomicInteger shared_index, int thread_id) {
int index_found = findIndex(my_array, n, thread_id, total_threads, shared_index);
shared_index.compareAndExchange(NO_FOUND, index_found);
}
}
输出:
Index of value bigger than 8 : 8
您可以使用并行流来并行运行操作。
public static int findIndex(int arr[], int t)
{
// if array is Null
if (arr == null) {
return -1;
}
return IntStream.rangeClosed(0, arr.length)
.parallel()
.filter(index -> arr[index] > t)
.findFirst()
.orElse(-1);
}
如果你这样做是为了学习,你可以把数组分成子数组,然后把每一部分提交给一个单独的线程,然后求值并返回结果。
首先,实现一个可以接受子数组并返回索引的可调用对象。
class RangeFinder implements Callable<Integer> {
private final int[] arr;
private final int startIndex;
private final int t;
RangeFinder(int[] arr, int startIndex, int t) {
this.arr = arr;
this.startIndex = startIndex;
this.t = t;
}
@Override
public Integer call() throws Exception {
for (int i = 0; i < this.arr.length; i++) {
if (arr[i] > t) {
return startIndex + i;
}
}
return -1;
}
}
现在你可以把数组分成块并提交给并行处理。
public static int findIndex(int arr[], int t) throws ExecutionException, InterruptedException {
if (arr == null) {
return -1;
}
int numberOfThreads = 4;
return findIndexInParallel(arr, t, numberOfThreads);
}
private static int findIndexInParallel(int[] arr, int t, int threadCount) throws ExecutionException, InterruptedException {
if(threadCount > arr.length) {
threadCount = arr.length;
}
int startIndex = 0;
int range = (int) Math.ceil(arr.length / (double) threadCount);
int endIndex = range;
// step 1: create threads using Executor FrameWork
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
List<Future> futures = new ArrayList<>();
for (int i = 0; i < threadCount; i++) {
// step 2: create object of callable
RangeFinder rangeFinder = new RangeFinder(Arrays.copyOfRange(arr, startIndex, endIndex), startIndex, t);
// Step 3: submit the task to thread pool
Future<Integer> submit = executorService.submit(rangeFinder);
// Step 4: recalculate array indexes for the next iteration
startIndex = startIndex + range;
int newEndIndex = endIndex + range;
endIndex = newEndIndex < arr.length ? newEndIndex : arr.length;
futures.add(submit);
}
// step 5: evaluate and return the results
for (Future future : futures) {
int index = (int) future.get();
if(index != -1) {
executorService.shutdownNow();
return index;
}
}
return -1;
}
上面的代码可以进一步重构,当然,你还需要处理异常。