优化生锈 N 素数发生器



rust迭代器和lambdas,我已经实现了N素数生成器(原始游乐场/改进的代码)。
与我在同一框中更精通的其他语言的等效功能代码相比,它的性能非常差(慢 10 倍)。
一些数字:

测试calc_primes...板凳: 7,754,795 纳秒/意大利特级 (+/- 1,887,591)

#[macro_use]
extern crate bencher;
use bencher::Bencher; 
use nth_prime::*;                                                                                                   
fn calc_primes(b: &mut Bencher) {
b.iter(|| primes(10000)); 
} 
benchmark_group!(benches, calc_primes); 
benchmark_main!(benches);                           

我希望 rust 会胜过它,但我很难摆脱下面代码中的缺陷。因此,这里呼吁帮助优化此特定实现。

  • 我不喜欢那些Vec分配,有些肯定不需要......
  • 可能会在这里和那里传递更多引用
  • 其他实现在 FFEC 上使用逻辑ORbitarray这非常快。下面的 rust 中的逻辑在fold调用中表达了相同的方法,但这使用的是Vec<bool>它必须慢而不是位数组等效......
  • 使用不同的数据类型而不是Vec<bool>,不介意分配大量字节数组并使用位移等进行操作,但无法想象将它们放在一起......
pub fn primes(n: u64) -> Vec<usize> {                                                                                                                                             
if n < 4 {                                                                                                                                                                    
vec![2]                                                                                                                                                                   
} else {                                                                                                                                                                      
base(primes((n as f64).sqrt().ceil() as u64), n)                                                                                                                          
}                                                                                                                                                                             
}                                                                                                                                                                                 
fn base(r: Vec<usize>, p: u64) -> Vec<usize> {                                                                                                                                    
let fvec = (0..p).map(|_| false).collect::<Vec<bool>>();                                                                                                                      
let z = r                                                                                                                                                                     
.iter()                                                                                                                                                                   
.map(|&x| (0..x).map(|y| y > 0).collect::<Vec<bool>>())                                                                                                                   
.map(|x| {                                                                                                                                                                
(0..p)                                                                                                                                                                
.map(|n| !x[(n % (x.len() as u64)) as usize])                                                                                                                     
.collect::<Vec<bool>>()                                                                                                                                           
})                                                                                                                                                                        
.fold(fvec, |acc, x| {                                                                                                                                                    
acc.iter().zip(x).map(|(a, b)| *a || b).collect()                                                                                                                     
});                                                                                                                                                                       
let yy: Vec<usize> = z                                                                                                                                                        
.iter()                                                                                                                                                                   
.enumerate()                                                                                                                                                              
.filter_map(|(i, x)| if !*x { Some(i) } else { None })                                                                                                                    
.skip(1)                                                                                                                                                                  
.collect();
r.iter().chain(yy.iter()).map(|x| *x).collect()                                                                                                                               
}                                                                                          

关于素数生成算法,在这个线程中有很好的答案,所以这不是一个关于优化算法的问题,而是这个没有外部板条箱的特定实现(这是一个学习练习)。

编辑:

经过评论部分的一些提示,对其进行了显着优化:

test calc_primes       ... bench:   4,776,400 ns/iter (+/- 1,427,534)
test calc_primes_sieve ... bench:     644,220 ns/iter (+/- 1,655,581)
test calc_primes_2     ... bench:     268,598 ns/iter (+/- 46,440)
  1. 将"低效"Vec<bool>@fold位图替换为惯用的迭代器方式:step_by
  2. 放弃不可变
  3. 类型以支持状态(有没有办法保持不可变的方法?
fn base2(head: Vec<u64>, p: u64) -> Vec<u64> {                                                                                                                                    
let mut fvec = (0..p).map(|_| false).collect::<Vec<bool>>();                                                                                                                  
head.iter()                                                                                                                                                                   
.map(|&x| {                                                                                                                                                               
(0..p)                                                                                                                                                                
.step_by(x as usize)                                                                                                                                              
.for_each(|y| fvec[y as usize] = true)                                                                                                                            
})                                                                                                                                                                        
.for_each(drop);                                                                                                                                                          
let tail: Vec<u64> = fvec.iter().enumerate()
.filter_map(|(i, x)| if !*x { Some(i as u64) } else { None })                                                                     
.skip(1)                                              
.collect();                                     
head.iter().chain(tail.iter()).cloned().collect()                                                                                                                             
}                                                         

编辑2:

作为参考,以下是"其他"语言实现:

q)p:{$[x<4; enlist 2; r, 1_ where not any x#'not til each r:p ceiling sqrt x]}

q)ts do[1000;p 10000]
474 443088  
=
474,000 ns/iter 

对于 64 位 q v4.0:

q)t do[1000;p 10000]
273  
=
273,000 ns/iter 

对于解释性语言来说还不错。

编辑 3

从答案中为新实现添加了更多基准。
(按时间顺序出现)
能够在没有skip(1)和差异fvec分配的情况下剃掉 tad 位 新的游乐场代码。

test ssqrt_p_1__vec_bool_n_mod   ... bench:   6,838,150 ns/iter (+/- 627,389)
test sieve_p_2__mut_step_by      ... bench:     367,229 ns/iter (+/- 38,279)
test ssqrt_p_3__mut_step_by      ... bench:     266,189 ns/iter (+/- 56,215)
test ssqrt_p_4__push_step_by_mod ... bench:   1,255,206 ns/iter (+/- 262,107)
test sieve_p_5__for_loop_mut     ... bench:     441,397 ns/iter (+/- 47,077)
test ssqrt_p_6__no_skip          ... bench:     176,186 ns/iter (+/- 24,765)

StdDev现在扮演着重要的角色...运行方式:

任务集 -C 0 货物工作台 --作业 1

fn base3(head: Vec<u64>, p: u64) -> Vec<u64> {                                                                                                                                    
let mut fvec = vec![false; p as usize]; fvec[1] = true;                                                                                                                       
head.iter()                                                                                                                                                                   
.map(|&x| {                                                                                                                                                               
(0..p)                                                                                                                                                                
.step_by(x as usize)                                                                                                                                              
.for_each(|y| fvec[y as usize] = true)                                                                                                                            
})                                                                                                                                                                        
.for_each(drop);                                                                                                                                                          
let tail: Vec<u64> = fvec                                                                                                                                                     
.iter()                                                                                                                                                                   
.enumerate()                                                                                                                                                              
.filter_map(|(i, x)| if !*x { Some(i as u64) } else { None })                                                                                                             
.collect();                                                                                                                                                               
head.iter().chain(tail.iter()).cloned().collect()                                                                                                                             
} 

编辑 5

令人惊讶的是,低于no_chainbase3:no_skip版本慢。猜猜这就是编译器的魔力(递归函数中的尾部调用优化???)。

fn base4(head: Vec<u64>, p: u64) -> Vec<u64> {                                                                                                                                    
let mut fvec = vec![false; p as usize]; fvec[1] = true;                                                                                                                       
head.iter()                                                                                                                                                                   
.map(|&x| {                                                                                                                                                                           
(0..p)                                                                                                                                                                
.step_by(x as usize)                                                                                                                                              
.for_each(|y| fvec[y as usize] = true);                                                                                                                           
fvec[x as usize] = false;                                                                                                                                             
})                                                                                                                                                                        
.for_each(drop);                                                                                                                                                          
fvec.iter().enumerate()                                                                                                                                                       
.filter_map(|(i, x)| if !*x { Some(i as u64) } else { None })                                                                                                             
.collect()                                                                                                                                                                
}                                              

你的代码有一些非常奇怪的开销,比如

(0..x).map(|y| y > 0).collect::<Vec<bool>>()

你只像x[(n % (x.len() as u64)) as usize]一样使用...即。作为一种非常缓慢的写作方式n % x == 0.

这是您的代码的一个几乎直接的变体,我只需删除您添加的这些开销,并在有意义的地方内联代码。

pub fn primes(n: usize) -> Vec<usize> {
if n < 4 {
vec![2]
} else {
let mut r = primes((n as f64).sqrt().ceil() as usize);
let mut fvec: Vec<_> = (0..n).map(|i| r.iter().any(|x| i % x == 0)).collect();
fvec[1] = true;
for (i, x) in fvec.into_iter().enumerate() {
if !x { r.push(i) }
}
r
}
}

最大的实际变化是用fvec[1] = true替换您的skip(1),因为这样要简单得多。

当然,埃拉托色尼的实际筛子更简单,开销更低,并且具有更好的时间复杂度。

pub fn primes(n: usize) -> Vec<usize> {
let mut primes = Vec::new();
let mut sieve = vec![true; n];
for p in 2..n {
if sieve[p] {
primes.push(p);
let mut i = p * p;
while i < n {
sieve[i] = false;
i += p;
}
}
}
primes
}

这是一个轻度优化的筛子,避免了不必要的工作和内存使用。

pub fn primes6(mut n: usize) -> Vec<usize> {
let mut primes = vec![2];
n >>= 1;
// 0  1  2  3  4
// 2, 3, 5, 7, 9, ...
let mut sieve = vec![true; n];
let end = (n as f64).sqrt().ceil() as usize;
for p in 1..end {
if sieve[p] {
let prime = p * 2 + 1;
primes.push(prime);
let mut i = ((prime * prime) - 1) / 2;
while i < n {
sieve[i] = false;
i += prime; // skip by 2·prime
}
}
}
for p in end..n {
if sieve[p] {
let prime = p * 2 + 1;
primes.push(prime);
}
}
primes
}

我发现了两个更有希望的优化:

经典算法:

pub fn primes_erathosthenes(upper_bound: u64) -> Vec<u64> {
let sieve_size = (upper_bound + 1) as u64;
let mut sieve = vec![false; sieve_size as usize];
let mut p = 2;
'next: loop {
if p * p >= sieve_size {
break 'next;
}
for f in p..=(upper_bound / p) {
let index = p * f;
sieve[index as usize] = true;
}
p += 1;
while sieve[p as usize] {
p += 1;
}
}
sieve
.into_iter()
.enumerate()
.skip(2)
.filter_map(|(i, p)| if p { None } else { Some(i as u64) })
.collect()
}

具有改进循环的最终算法

fn base_improved(head: Vec<u64>, p: u64) -> Vec<u64> {
let mut fvec = vec![false; p as usize];
fvec[0] = true;
fvec[1] = true;
head
.iter()
.flat_map(|&x| (2u64..p).map(move |i| i * x).take_while(|y| *y < p))
.for_each(|y| fvec[y as usize] = true);
fvec
.iter()
.enumerate()
.filter_map(|(i, x)| if !*x { Some(i as u64) } else { None })
.collect()
}

和更快的筛分算法(Sundarams Sive,如果感兴趣,我可以发布代码)

基准:

test calc_erathosthenes          ... bench:      34,192 ns/iter (+/- 2,902)
test calc_exercism               ... bench:      79,314 ns/iter (+/- 16,276)
test calc_stackoverflow          ... bench:      35,431 ns/iter (+/- 5,674)
test calc_stackoverflow_improved ... bench:      32,508 ns/iter (+/- 6,971)
test calc_sundaram               ... bench:      17,930 ns/iter (+/- 3,182)
test calc_veedrac                ... bench:      27,110 ns/iter (+/- 3,870)
test calc_veedrac_6              ... bench:      12,178 ns/iter (+/- 4,113)

tl;博士

  • bitvecs 可能有效,但需要大量特殊情况优化才能提高性能
  • 体面算法的干净实现是最重要的。

长版本

需要注意的一点是,不同方法的性能可能会随着输入大小而变化。也可能不是,关键是当基准测试可能会产生不会推广到所有大小的结果时,仅使用一种输入大小。

这是问题中的原始代码:


pub fn primes(n: u64) -> Vec<usize> {
if n < 4 {
vec![2]
} else {
base(primes((n as f64).sqrt().ceil() as u64), n)
}
}
fn base(r: Vec<usize>, p: u64) -> Vec<usize> {
let fvec = (0..p).map(|_| false).collect::<Vec<bool>>();
let z = r
.iter()
.map(|&x| (0..x).map(|y| y > 0).collect::<Vec<bool>>())
.map(|x| {
(0..p)
.map(|n| !x[(n % (x.len() as u64)) as usize])
.collect::<Vec<bool>>()
})
.fold(fvec, |acc, x| {
acc.iter().zip(x).map(|(a, b)| *a || b).collect()
});
let yy: Vec<usize> = z
.iter()
.enumerate()
.filter_map(|(i, x)| if !*x { Some(i) } else { None })
.skip(1)
.collect();
r.iter().chain(yy.iter()).map(|x| *x).collect()
}

我发现上面的代码很难阅读。它似乎有很多不必要的噪音,比如不需要的收集。此外,变量名称的描述性可能要强得多。

下面是我对筛子的天真实现(calc_primes_mine_naive,在下面的基准上)。它没有可变性,尽管不可否认地有一个不必要的集合。它比上述快 2 到 3 倍。

pub fn primes_less_than(n: u64) -> Vec<usize> {
let is_not_prime: Vec<bool> = (0..n).map(|num| {
let sieve_upper_bound = (num as f64).sqrt().ceil() as u64 + 1;
(2..sieve_upper_bound).any(|divisor| {
(num % divisor) == 0
})
}).collect();
is_not_prime.iter().enumerate().filter(|(_, this_one_not_prime)| {
!**this_one_not_prime
}).map(|(definitely_prime, _)| { definitely_prime }).collect()
}

以下是优化的第一次尝试。一般的 sqrt 很慢,所以摆脱上面重复的 sqrt 应该是一个绝对的胜利(似乎没有太大区别 irl)。

pub fn primes_less_than_naive(n: u64) -> Vec<usize> {
let mut sqrt = 1;
let is_not_prime: Vec<bool> = (0..n).map(|num| {
if sqrt * sqrt <= num {
sqrt += 1;
}
let sieve_upper_bound = sqrt + 1;
(2..sieve_upper_bound).any(|divisor| {
(num % divisor) == 0
})
}).collect();
is_not_prime.iter().enumerate().filter(|(_, this_one_not_prime)| {
!**this_one_not_prime
}).map(|(definitely_prime, _)| { definitely_prime }).collect()
}

我查看了组件并在vtune中对其进行了分析。Vtune 抱怨"高 MS 开关"开销。我从本质上假设这是由浮子上的铸件引起的,但在移除所有浮子操作后它仍然存在。看着组件似乎暗示由于collect可能仍然有一些开销,所以我摆脱了额外的收集。

pub fn primes_less_than_naive2(n: u64) -> Vec<usize> {
let mut sqrt = 1;
(0..n).map(|num| {
if sqrt * sqrt <= num {
sqrt += 1;
}
let sieve_upper_bound = sqrt + 1;
(2..sieve_upper_bound).any(|divisor| {
(num % divisor) == 0
})
}).enumerate().filter(|(_, this_one_not_prime)| {
!*this_one_not_prime
}).map(|(definitely_prime, _)| { definitely_prime }).collect()
}

没有可变性似乎对性能造成了相当大的影响。 可能有一种方法可以在不使用mut的情况下表达这一点,但可能是相当笨拙的代码。

pub fn primes_less_than_naive_with_mut(n: usize) -> Vec<usize> {
let mut is_not_prime = vec![false; n as usize];
let sieve_upper_bound = (n as f64).sqrt().ceil() as usize + 1;
(1..sieve_upper_bound).for_each(|step_size| {
(0..n).step_by(step_size).for_each(|i| {
is_not_prime[i] = true;
})
});
is_not_prime.iter().enumerate().filter(|(_, this_one_not_prime)| {
!**this_one_not_prime
}).map(|(definitely_prime, _)| { definitely_prime }).collect()
}

在这一点上,vtune 抱怨上述部分是核心绑定的,而某些部分是内存绑定的。这很烦人,b/c 解决内存绑定问题的方法是使用 bitvecs,修复核心绑定的方法是不使用 bitvecs。为了减轻内存限制,我尝试交替循环方向,其想法是这样可以更好地利用缓存。当我第一次写这篇文章时,我发现了一些证据,但从那以后我就没有了。

pub fn primes_less_than_naive_alternating(n: usize) -> Vec<usize> {
let mut is_not_prime = vec![false; n as usize];
let sieve_upper_bound = (n as f64).sqrt().ceil() as usize + 1;
let mut forward = true;
(1..sieve_upper_bound).for_each(|step_size| {
if forward {
(0..n).step_by(step_size).for_each(|i| {
is_not_prime[i] = true;
})
} else {
(0..n).step_by(step_size).rev().for_each(|i| {
is_not_prime[i] = true;
})
};
forward = !forward;
});
is_not_prime.iter().enumerate().filter(|(_, this_one_not_prime)| {
!**this_one_not_prime
}).map(|(definitely_prime, _)| { definitely_prime }).collect()
}

比特维克版本。这很慢(不出所料)。 通过为小step_size提供某种面具查找表,也许可以使其快速,但我并不乐观。 Vtune 没有什么有趣的可说的,除了可能有很多数据依赖关系搞砸了。(又名它不能乱序 b/c 下一个字节取决于前一个字节[b/c 在幕后加载和存储是字大小?

pub unsafe fn primes_less_than_naive_bitvec(n: usize) -> Vec<usize> {
let num_bytes = n / 8 + 1;
let mut is_not_prime = libc::calloc(size_of::<u8>(), num_bytes) as *mut u8;
let sieve_upper_bound = (n as f64).sqrt().ceil() as usize + 1;
(1..sieve_upper_bound).for_each(|step_size| {
(0..n).step_by(step_size).for_each(|i| {
let address = i >> 3;
let bit_within_the_byte = i & 0b111;
let cur = is_not_prime.offset(address as isize);
*cur |= ((0x1 as u8) << bit_within_the_byte);
});
});
(0..n).filter(|i|{
let address = i / 8;
let bit_within_the_byte = i % 8;
let cur = is_not_prime.offset(address as isize).read();
let is_not_prime = (cur | ((0x1 as u8) << bit_within_the_byte)) > 0;
!is_not_prime
}).collect::<Vec<usize>>()
}

在 n=10 时:

test calc_primes                                 ... bench:         752 ns/iter (+/- 61)
test calc_primes_mine                            ... bench:         314 ns/iter (+/- 20)
test calc_primes_mine_alternating_loop_direction ... bench:         110 ns/iter (+/- 6)
test calc_primes_mine_bitvec                     ... bench:         165 ns/iter (+/- 888)
test calc_primes_mine_naive                      ... bench:         304 ns/iter (+/- 29)
test calc_primes_mine_naive2                     ... bench:         250 ns/iter (+/- 16)
test calc_primes_mine_naive_with_mut             ... bench:          70 ns/iter (+/- 4)

在 n=100 时:

test calc_primes                                 ... bench:       5,924 ns/iter (+/- 2,210)
test calc_primes_mine                            ... bench:       3,384 ns/iter (+/- 88)
test calc_primes_mine_alternating_loop_direction ... bench:         673 ns/iter (+/- 23)
test calc_primes_mine_bitvec                     ... bench:       1,194 ns/iter (+/- 36)
test calc_primes_mine_naive                      ... bench:       3,141 ns/iter (+/- 195)
test calc_primes_mine_naive2                     ... bench:       2,919 ns/iter (+/- 107)
test calc_primes_mine_naive_with_mut             ... bench:         521 ns/iter (+/- 31)

在 n=1000 时:

test calc_primes                                 ... bench:     120,828 ns/iter (+/- 18,018)
test calc_primes_mine                            ... bench:      59,269 ns/iter (+/- 6,993)
test calc_primes_mine_alternating_loop_direction ... bench:       7,080 ns/iter (+/- 2,795)
test calc_primes_mine_bitvec                     ... bench:      16,186 ns/iter (+/- 470)
test calc_primes_mine_naive                      ... bench:      56,572 ns/iter (+/- 1,849)
test calc_primes_mine_naive2                     ... bench:      54,526 ns/iter (+/- 9,773)
test calc_primes_mine_naive_with_mut             ... bench:       5,873 ns/iter (+/- 531)

在 n=10,000 时

test calc_primes                                 ... bench:   2,672,089 ns/iter (+/- 117,553)
test calc_primes_mine                            ... bench:   1,222,262 ns/iter (+/- 24,242)
test calc_primes_mine_alternating_loop_direction ... bench:      77,348 ns/iter (+/- 4,845)
test calc_primes_mine_bitvec                     ... bench:     201,894 ns/iter (+/- 4,446)
test calc_primes_mine_naive                      ... bench:   1,213,264 ns/iter (+/- 160,097)
test calc_primes_mine_naive2                     ... bench:   1,174,753 ns/iter (+/- 88,116)
test calc_primes_mine_naive_with_mut             ... bench:      66,283 ns/iter (+/- 6,358)

在 = 100,000:

test calc_primes                                 ... bench:  71,953,394 ns/iter (+/- 11,795,923)
test calc_primes_mine                            ... bench:  27,542,006 ns/iter (+/- 5,158,828)
test calc_primes_mine_alternating_loop_direction ... bench:   1,103,863 ns/iter (+/- 135,770)
test calc_primes_mine_bitvec                     ... bench:   2,547,044 ns/iter (+/- 180,308)
test calc_primes_mine_naive                      ... bench:  27,399,779 ns/iter (+/- 3,121,742)
test calc_primes_mine_naive2                     ... bench:  26,430,825 ns/iter (+/- 1,818,277)
test calc_primes_mine_naive_with_mut             ... bench:   1,045,803 ns/iter (+/- 103,090)

在 n=1000000 时:

test calc_primes                                 ... bench: 1,827,068,877 ns/iter (+/- 131,368,352)
test calc_primes_mine                            ... bench: 658,940,900 ns/iter (+/- 45,559,627)
test calc_primes_mine_alternating_loop_direction ... bench:  18,112,057 ns/iter (+/- 1,839,472)
test calc_primes_mine_bitvec                     ... bench:  30,189,983 ns/iter (+/- 2,649,749)
test calc_primes_mine_naive                      ... bench: 658,018,301 ns/iter (+/- 39,323,294)
test calc_primes_mine_naive2                     ... bench: 651,249,390 ns/iter (+/- 22,800,036)
test calc_primes_mine_naive_with_mut             ... bench:  18,212,183 ns/iter (+/- 2,936,353)

最新更新