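I have a 2D matrix of integers (say 5000 rows x 8000 columns). I want to use Python's multiprocessing to multiply every element of the matrix by 2. Each process should handle a block of rows, and a target function "array_mult" should do the work on the partition of the matrix that was sent to it.
The array is partitioned by rows, and each partition is sent to a (child) process: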
import time, os
import multiprocessing as mp

A = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
# I am trying to use a global variable to write the output of the function so that
# the storage is persistent and the output doesn't vanish when the process ends.
# (A bare `global arr` at module level does nothing and leaves arr undefined,
# so the list has to be created explicitly.)
arr = []

def array_mult(a):
    '''This is the function which is supposed to
    multiply each element of input matrix a'''
    print("array is =", a)
    for i in range(len(a)):
        print("counter is", i)
        a[i] = a[i] * 2
    print(a, os.getpid())
    arr.append(a)

if __name__ == '__main__':
    starttime = time.time()
    array_proc = list()
    for i in range(3):
        p = mp.Process(target=array_mult, args=(A[i], ))  ### I am trying to send partitions of the list as the arg to the function array_mult
        array_proc.append(p)
        p.start()
    for process in array_proc:
        process.join()
    print(time.time() - starttime)
    print(A)    # unchanged: each child doubled its own copy of A[i]
    print(arr)  # still []: each child appended to its own copy of arr
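Each child process works on its own copy of `A[i]` and of `arr`, so nothing it writes reaches the parent. One way around this that uses only core modules is to return the doubled row and let `multiprocessing.Pool` collect the results. This is a minimal sketch that assumes copying (pickling) every row to a worker and back is acceptable. `double_row` is a hypothetical name:

```python
import multiprocessing as mp

def double_row(row):
    # Runs in a worker process; the returned list is pickled and sent back to the parent.
    return [2 * x for x in row]

if __name__ == '__main__':
    A = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
    with mp.Pool() as pool:
        # Pool.map returns the results in the same order as the input rows.
        A = pool.map(double_row, A, chunksize=1)
    print(A)  # [[2, 4, 6], [8, 10, 12], [14, 16, 18]]
```

For a 5000 x 8000 matrix, the cost of pickling rows in both directions may outweigh the gain from parallelism. A larger `chunksize` reduces the per-task overhead but not the copying itself.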
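**Constraint:** I cannot use anything outside the Python core modules, or any feature below Python 3.6.
Would using the ctypes library and RawArray help? If so, how do I use it? Are there any other ideas for holding a 2D matrix? (I don't want to use numpy, because it is not a core package.)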
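`multiprocessing.sharedctypes.RawArray` allocates a ctypes array in shared memory, so writes made by a child process are visible to the parent. This is a minimal sketch of how it could be used here. It assumes a flat, row-major layout in which element (i, j) sits at index `i * COLS + j`, instead of nested ctypes structures. `ROWS`, `COLS`, `double_rows` and `n_proc` are hypothetical names chosen for illustration:

```python
import multiprocessing as mp
from multiprocessing.sharedctypes import RawArray

ROWS, COLS = 6, 4  # hypothetical small sizes for illustration

def double_rows(shared, cols, row_start, row_stop):
    # Doubles rows row_start .. row_stop-1 (stop is exclusive).
    # Row-major layout: element (i, j) lives at index i * cols + j.
    for k in range(row_start * cols, row_stop * cols):
        shared[k] *= 2

if __name__ == '__main__':
    # 'i' is the typecode for a C int. RawArray has no lock, which is fine here
    # because every process writes to a disjoint block of rows.
    shared = RawArray('i', ROWS * COLS)
    for i in range(ROWS):
        for j in range(COLS):
            shared[i * COLS + j] = i * j
    n_proc = 3
    step = -(-ROWS // n_proc)  # ceiling division: rows per process
    procs = []
    for start in range(0, ROWS, step):
        p = mp.Process(target=double_rows,
                       args=(shared, COLS, start, min(start + step, ROWS)))
        procs.append(p)
        p.start()
    for p in procs:
        p.join()
    # The parent sees the children's writes because the buffer is shared memory.
    print([shared[i * COLS:(i + 1) * COLS] for i in range(ROWS)])
```

Assuming a 4-byte C int, a 5000 x 8000 matrix stored this way needs 5000 * 8000 * 4 = 160,000,000 bytes (about 160 MB) of shared memory.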
'''Created on 12-Jun-2020
@author: Shouvik
'''
import time, os
import multiprocessing as mp
from multiprocessing import sharedctypes
from ctypes import Structure, c_int

num_of_columns = 10000
num_of_rows = 10000
# os.sched_getaffinity is only available on some platforms (e.g. Linux);
# fall back to os.cpu_count() elsewhere
num_of_cpu = len(os.sched_getaffinity(0)) if hasattr(os, "sched_getaffinity") else os.cpu_count()

class Row_Vector(Structure):
    _fields_ = [("column", c_int * num_of_columns)]

class array_2d(Structure):
    _fields_ = [("Rectangular_Matrix", Row_Vector * num_of_rows)]
def create_row_boundaries(num_of_rows):
    '''create_row_boundaries returns a list with the row-numbers which partition the matrix row-wise:
    consecutive pairs (row_boundary[0], row_boundary[1]), (row_boundary[2], row_boundary[3]), ...
    are the first and last rows (both inclusive) of each partition'''
    # The first boundary is clamped to the last row, so that a single CPU (or a
    # single row) does not produce a partition that runs past the end of the matrix
    row_boundary = [0, min(int(num_of_rows/num_of_cpu), num_of_rows-1)]
    # we add num_of_rows/num_of_cpu to each partition
    index = 2
    while row_boundary[index-1] < (num_of_rows-1):
        row_boundary.append(int(row_boundary[index-1]) + 1)
        # After adding an integer number of 'num_of_rows/num_of_cpu' rows we can be left with any of
        # 0, 1, 2, ... (num_of_cpu-1) rows, which should be added to the last partition
        if num_of_rows-num_of_cpu < row_boundary[index-1] + int(num_of_rows/num_of_cpu) <= num_of_rows:
            #print("num of rows- num of cpu is",num_of_rows-num_of_cpu,"row_boundary[index]+int(num_of_rows/num_of_cpu)", row_boundary[index]+int(num_of_rows/num_of_cpu))
            row_boundary.append(num_of_rows-1)
        else:
            row_boundary.append(int(row_boundary[index-1]) + int(num_of_rows/num_of_cpu))
        index = index + 2
    return row_boundary
def matrix_operation(a, row_initial, row_final, column_initial, column_final):
    '''matrix_operation operates on each element of the matrix (type RawArray) that is passed to it.
    Rows row_initial..row_final are inclusive; columns run from column_initial to column_final-1'''
    print("this instance of 'matrix_operation' is going to work from row number {} to row number {}".format(row_initial, row_final))
    for i in range(row_initial, row_final+1, 1):
        for j in range(column_initial, column_final):
            a[i].column[j] = 2 * a[i].column[j]
    # print("Row_initial{} to Row_final{} done by process id {}".format(row_initial,row_final,os.getpid()))
    # print("The process which operates on below matrix is",os.getpid())
    # for i in range(row_initial,row_final+1,1):
    #     print([a[i].column[j] for j in range(column_initial,column_final)],"pid is {}".format(os.getpid()))
    return
if __name__ == '__main__':
    # We create a matrix of type RawArray having num_of_rows rows, where each row
    # holds num_of_columns columns, and operate on it using multiprocessing
    m1 = sharedctypes.RawArray(Row_Vector, num_of_rows)
    # We create another matrix, operate on it sequentially on a single CPU core, and time the operation
    m2 = sharedctypes.RawArray(Row_Vector, num_of_rows)
    # The two nested for loops below simply populate the matrices m1 and m2 with some values
    for i in range(num_of_rows):
        for j in range(num_of_columns):
            m1[i].column[j] = i*j
            m2[i].column[j] = i*j
    # The for loop and the print statement below print the matrix row-wise
    # for i in range(num_of_rows):
    #     print([m1[i].column[j] for j in range(num_of_columns)])
    matrix_partition_by_row_num = create_row_boundaries(num_of_rows)
    index = 0
    array_proc = list()
    starttime = time.time()
    for i in range(int(len(matrix_partition_by_row_num)/2)):  # originally range had num_of_cpu as the argument
        # We pass the matrix m1 and its partitioning rows to the 'matrix_operation' function
        p = mp.Process(target=matrix_operation, args=(m1, matrix_partition_by_row_num[index], matrix_partition_by_row_num[index+1], 0, num_of_columns))
        array_proc.append(p)
        p.start()
        index = index + 2
    for process in array_proc:
        process.join()
    print("Time taken for concurrent operation is {:e}".format(time.time() - starttime))
    # for i in range(num_of_rows):
    #     print([m1[i].column[j] for j in range(num_of_columns)])
    print("no of cpu", num_of_cpu, "matrix partition values", matrix_partition_by_row_num)
    # We will simply input the entire matrix to the matrix_operation function and time the process
    sequential_process_starttime = time.time()
    matrix_operation(m2, 0, num_of_rows-1, 0, num_of_columns)
    print("time taken for sequential operation is {:e}".format(time.time()-sequential_process_starttime))
    Is_matrix_operation_correct = True
    for i in range(num_of_rows):
        for j in range(num_of_columns):
            if m1[i].column[j] != m2[i].column[j]:
                Is_matrix_operation_correct = False
                break
        if not Is_matrix_operation_correct:
            break  # the inner break only leaves the column loop
    print("Is matrix operation correct: {}".format(Is_matrix_operation_correct))