如何在Python中使用并行处理生成随机素数?



我正在用python实现RSA,最初我使用python Multiprocessing包并行处理生成素数随机数。我正在用米勒·拉宾算法来检查质数。这种并行实现应该运行得更快,但我不明白为什么不是这样。并行处理耗费了大量时间,即使是像50位这样较小的素数大小。有人能帮忙解决这个问题吗?(注:我是并行处理的初学者)

这是我的python代码(实现函数+ RSA算法实现)

所有必需的功能:

# Prime Generation using Parallel Processing (Miller Rabin)
import time
from multiprocessing import Pool
import random
# Prime-Generating-Functions start---
from random import randrange, getrandbits
def is_prime(n, k=128):
# Test if n is not even.
# But care, 2 is prime !
if n == 2 or n == 3:
return True
if n <= 1 or n % 2 == 0:
return False
# find r and s
s = 0
r = n - 1
while r & 1 == 0:
s += 1
r //= 2
flag=1
p=Pool()
results = [p.apply_async(primetest, args=(s,r,n,)) for x in range(k)]
for k in results:
output = [k.get()]
#print("VALUE OF OUTPUT",k.get())
if k.get()==False:
flag=0
break
if flag==0:
p.close()
p.join()
return False
else:
p.close()
p.join()
return True
def primetest(s,r,n):
#for _ in range(k):
#print("Test.value in primetest", test.value)
a = randrange(2, n - 1)
x = pow(a, r, n)
if x != 1 and x != n - 1:
j = 1
while j < s and x != n - 1:
x = pow(x, 2, n)
if x == 1:
return False
j += 1
if x != n - 1:
return False
return True
def generate_prime_candidate(length):
# generate random bits
p = getrandbits(length)
# apply a mask to set MSB and LSB to 1
p |= (1 << length - 1) | 1
return p
def generate_prime_number(length=1024):
p = 4
# keep generating while the primality test fail
while not is_prime(p, 128):
p = generate_prime_candidate(length)
#print("value of p is",p)
return p
# Prime-Generating-Functions end---

# Miscellaneous Functions
def gcd(a, b):
while b != 0:
a, b = b, a % b
return a
def modInverse(a, m):
m0 = m
y = 0
x = 1
if (m == 1):
return 0
while (a > 1):
# q is quotient
q = a // m
t = m
# m is remainder now, process
# same as Euclid's algo
m = a % m
a = t
t = y
# Update x and y
y = x - q * y
x = t
# Make x positive
if (x < 0):
x = x + m0
return x

RSA算法从这里开始:

import time    
#key gen start----------------------------------------
start_key_gen_time = time.time()
# print("Key Generation:n");
p = generate_prime_number(50) #Takes the size (eg 50) in bits, and generates random prime of that size
q = generate_prime_number(50)
# print("Is p=",p, "prime ? ", is_prime(p));
# print("Is q=",q,"prime ?",is_prime(q));
n = p*q; # evaluating n
# print("nn=",n);
phi = (p-1)*(q-1); # evaluating phi
# print("phi=", phi)
e = ZZ.random_element(phi)
while (gcd(e, phi) != 1):
e = ZZ.random_element(phi) #choosing e
# print("ne=",e);
# print("*e < phi=",e < phi) #checking conditions for e
# print("*gcd(e,phi)=",gcd(e,phi))
# print("*Is e prime ?", is_prime(e))
d = inverse_mod(e, phi) #finding corresponding d, s.t. e*d = 1*mod(phi)
# print("nd=",d);
# print("*Is d prime ?", is_prime(d))
# print("npublic key = (",n,", ",e,")"); #Generated Key pairs
# print("private key = (",n,", ",d,")n");
final_key_gen_time = time.time() 
#key gen finish------------------------------------------------
total_key_gen_time = final_key_gen_time - start_key_gen_time
print("nTotal Key generation time taken in seconds: ", total_key_gen_time )

#Encryption start--------------------------------------------
start_encrypt_time = time.time()
# print("Encryption & Decryption:n"); 
m = 59; #print("original msg=",m) #original message m
c = power_mod(m,e,n); #print("encrypted msg=",c) #encrypted message c
finish_encrypt_time = time.time()
#Encryption finish---------------------------------------------
total_encrypt_time = finish_encrypt_time - start_encrypt_time
print("nTotal encrypt taken in seconds: ", total_encrypt_time )
#Decryption start----------------------------------------------
start_decrypt_time = time.time() 
decrypt_msg = power_mod(c,d,n) # decrypting 
# print("decrypted msg=",decrypt_msg)# so m was correctly decrypted
finish_decrypt_time = time.time()
#Decryption finish----------------------------------------------
total_decrypt_time = finish_decrypt_time - start_decrypt_time
print("nTotal decrypt time taken in seconds: ", total_decrypt_time )

total_time_taken = total_key_gen_time + total_encrypt_time + total_decrypt_time
print("nnTotal algorithm time taken in seconds: ", total_time_taken)
print("nIs decrypted msg & original msg same?", (decrypt_msg==m))

最佳解决方案

我不确定在这种情况下我是否理解Time complexity: O(sqrt(n))Space complexity: O(1)的概念,但是功能prime(n)可能是fastest way (least iterations)计算一个数是否为任意大小的素数。

这可能是今天11号互联网上最好的解决方案2022年3月。欢迎反馈和使用。

相同的代码可以应用于任何语言,如C, c++, Go Lang,Java, .NET, Python, Rust等具有相同的逻辑和性能的好处。它非常快。我以前从未见过这样的实现

这也可以帮助您避免并行处理。这大大减少了迭代次数。

如果你正在考虑速度和性能,这里是"""BEST"""有希望的解决方案,我可以给出:

对于n == 100000,而不是常规的100000,最大迭代次数为16666

function prime(n) {

if ((n === 2 || n === 3 || n === 5 || n === 7)) {
return true
}
if (n === 1 || ((n > 7) && (n % 5 == 0 || n % 7 == 0 || n % 2 == 0 || n % 3 == 0))) {
return false
}
if ((Number.isInteger(((n - 1) / 6))) || (Number.isInteger((n + 1) / 6))) {
for (let i = 1; i < n; i++) {
let five = (5 + (i * 6)), seven = (7 + (i * 6))
if ((((n / five) > 1) && Number.isInteger((n / five))) || (((n / seven) > 1) && (Number.isInteger(n / seven)))) {
return false;
}
if ((i * 6) > n) {
break;
}
}
return true
}
return false
}

下面是对所有计算方法的分析:

检验素数的常规方法:

def isPrimeConventionalWay(n):
count = 0
if (n <= 1):
return False;
# Check from 2 to n-1
# Max iterations 99998 for n == 100000 
for i in range(2,n):
# Counting Iterations
count += 1
if (n % i == 0):
print("count: Prime Conventional way", count)
return False;
print("count: Prime Conventional way", count)
return True;

平方根质数检测方法:

def isPrimeSquarerootWay(num):
count = 0
# if not is_number num return False
if (num < 2):
print("count: Prime Squareroot way", count)
return False

s = math.sqrt(num)
for  i in range(2, s):
# Counting Iterations
count += 1
if (num % i == 0):
print("count: Prime Squareroot way", count)
return False
print("count: Prime Squareroot way", count)
return True

其他方式:

def isprimeAKSWay(n):
"""Returns True if n is prime."""
count = 0
if n == 2:
return True
if n == 3:
return True
if n % 2 == 0:
return False
if n % 3 == 0:
return False
i = 5
w = 2
while i * i <= n:
count += 1
if n % i == 0:
print("count: Prime AKS - Mersenne primes - Fermat's little theorem or whatever way", count)
return False
i += w
w = 6 - w
print("count: Prime AKS - Mersenne primes - Fermat's little theorem or whatever way", count)
return True

建议检查素数的方法:

def prime(n):
count = 0
if ((n == 2 or n == 3 or n == 5 or n == 7)):
print("count: Prime Unconventional way", count)
return True

if (n == 1 or ((n > 7) and (n % 5 == 0 or n % 7 == 0 or n % 2 == 0 or n % 3 == 0))):
print("count: Prime Unconventional way", count)
return False

if (((n - 1) / 6).is_integer()) or (((n + 1) / 6).is_integer()):
for i in range(1, n):
# Counting Iterations
count += 1
five = 5 + (i * 6)
seven = 7 + (i * 6)
if ((((n / five) > 1) and (n / five).is_integer()) or (((n / seven) > 1) and ((n / seven).is_integer()))):
print("count: Prime Unconventional way", count)
return False;

if ((i * 6) > n):
# Max iterations 16666 for n == 100000 instead of 100000
break;

print("count: Prime Unconventional way", count)
return True

print("count: Prime Unconventional way", count)
return False

与传统的质数检测方法进行比较的测试。

def test_primecalculations():
count = 0
iterations = 100000
arr = []
for i in range(1, iterations):
traditional = isPrimeConventionalWay(i)
newer = prime(i)
if (traditional == newer):
count = count + 1
else:
arr.push([traditional, newer, i])
print("[count, iterations, arr] list: ", count, iterations, arr)
if (count == iterations):
return True
return False

# print("Tests Passed: ", test_primecalculations())

您将看到check of prime number: 100007的迭代次数计数结果如下:

print("Is Prime 100007: ", isPrimeConventionalWay(100007))
print("Is Prime 100007: ", isPrimeSquarerootWay(100007))
print("Is Prime 100007: ", prime(100007))
print("Is Prime 100007: ", isprimeAKSWay(100007))
count: Prime Conventional way 96
Is Prime 100007:  False
count: Prime Squareroot way 96
Is Prime 100007:  False
count: Prime Unconventional way 15
Is Prime 100007:  False
count: Prime AKS - Mersenne primes - Fermat's little theorem or whatever way 32
Is Prime 100007:  False

最新更新