玩rust
迭代器和lambdas,我已经实现了N
素数生成器(原始游乐场/改进的代码)。
与我在同一框中更精通的其他语言的等效功能代码相比,它的性能非常差(慢 10 倍)。
一些数字:
测试calc_primes...板凳: 7,754,795 纳秒/意大利特级 (+/- 1,887,591)
#[macro_use]
extern crate bencher;
use bencher::Bencher;
use nth_prime::*;
fn calc_primes(b: &mut Bencher) {
b.iter(|| primes(10000));
}
benchmark_group!(benches, calc_primes);
benchmark_main!(benches);
我希望 rust 会胜过它,但我很难摆脱下面代码中的缺陷。因此,这里呼吁帮助优化此特定实现。
- 我不喜欢那些
Vec
分配,有些肯定不需要...... - 可能会在这里和那里传递更多引用
- 其他实现在 FFEC 上使用逻辑
OR
bitarray
这非常快。下面的 rust 中的逻辑在fold
调用中表达了相同的方法,但这使用的是Vec<bool>
它必须慢而不是位数组等效...... - 使用不同的数据类型而不是
Vec<bool>
,不介意分配大量字节数组并使用位移等进行操作,但无法想象将它们放在一起......
pub fn primes(n: u64) -> Vec<usize> {
if n < 4 {
vec![2]
} else {
base(primes((n as f64).sqrt().ceil() as u64), n)
}
}
fn base(r: Vec<usize>, p: u64) -> Vec<usize> {
let fvec = (0..p).map(|_| false).collect::<Vec<bool>>();
let z = r
.iter()
.map(|&x| (0..x).map(|y| y > 0).collect::<Vec<bool>>())
.map(|x| {
(0..p)
.map(|n| !x[(n % (x.len() as u64)) as usize])
.collect::<Vec<bool>>()
})
.fold(fvec, |acc, x| {
acc.iter().zip(x).map(|(a, b)| *a || b).collect()
});
let yy: Vec<usize> = z
.iter()
.enumerate()
.filter_map(|(i, x)| if !*x { Some(i) } else { None })
.skip(1)
.collect();
r.iter().chain(yy.iter()).map(|x| *x).collect()
}
关于素数生成算法,在这个线程中有很好的答案,所以这不是一个关于优化算法的问题,而是这个没有外部板条箱的特定实现(这是一个学习练习)。
编辑:
经过评论部分的一些提示,对其进行了显着优化:
test calc_primes ... bench: 4,776,400 ns/iter (+/- 1,427,534)
test calc_primes_sieve ... bench: 644,220 ns/iter (+/- 1,655,581)
test calc_primes_2 ... bench: 268,598 ns/iter (+/- 46,440)
- 将"低效"
Vec<bool>
@fold
位图替换为惯用的迭代器方式:step_by
放弃不可变 - 类型以支持状态(有没有办法保持不可变的方法?
fn base2(head: Vec<u64>, p: u64) -> Vec<u64> {
let mut fvec = (0..p).map(|_| false).collect::<Vec<bool>>();
head.iter()
.map(|&x| {
(0..p)
.step_by(x as usize)
.for_each(|y| fvec[y as usize] = true)
})
.for_each(drop);
let tail: Vec<u64> = fvec.iter().enumerate()
.filter_map(|(i, x)| if !*x { Some(i as u64) } else { None })
.skip(1)
.collect();
head.iter().chain(tail.iter()).cloned().collect()
}
编辑2:
作为参考,以下是"其他"语言实现:
q)p:{$[x<4; enlist 2; r, 1_ where not any x#'not til each r:p ceiling sqrt x]}
q)ts do[1000;p 10000]
474 443088
=
474,000 ns/iter
对于 64 位 q v4.0:
q)t do[1000;p 10000]
273
=
273,000 ns/iter
对于解释性语言来说还不错。
编辑 3
从答案中为新实现添加了更多基准。
(按时间顺序出现)
能够在没有skip(1)
和差异fvec
分配的情况下剃掉 tad 位 新的游乐场代码。
test ssqrt_p_1__vec_bool_n_mod ... bench: 6,838,150 ns/iter (+/- 627,389)
test sieve_p_2__mut_step_by ... bench: 367,229 ns/iter (+/- 38,279)
test ssqrt_p_3__mut_step_by ... bench: 266,189 ns/iter (+/- 56,215)
test ssqrt_p_4__push_step_by_mod ... bench: 1,255,206 ns/iter (+/- 262,107)
test sieve_p_5__for_loop_mut ... bench: 441,397 ns/iter (+/- 47,077)
test ssqrt_p_6__no_skip ... bench: 176,186 ns/iter (+/- 24,765)
StdDev现在扮演着重要的角色...运行方式:
任务集 -C 0 货物工作台 --作业 1
fn base3(head: Vec<u64>, p: u64) -> Vec<u64> {
let mut fvec = vec![false; p as usize]; fvec[1] = true;
head.iter()
.map(|&x| {
(0..p)
.step_by(x as usize)
.for_each(|y| fvec[y as usize] = true)
})
.for_each(drop);
let tail: Vec<u64> = fvec
.iter()
.enumerate()
.filter_map(|(i, x)| if !*x { Some(i as u64) } else { None })
.collect();
head.iter().chain(tail.iter()).cloned().collect()
}
编辑 5
令人惊讶的是,低于no_chain
比base3:no_skip
版本慢。猜猜这就是编译器的魔力(递归函数中的尾部调用优化???)。
fn base4(head: Vec<u64>, p: u64) -> Vec<u64> {
let mut fvec = vec![false; p as usize]; fvec[1] = true;
head.iter()
.map(|&x| {
(0..p)
.step_by(x as usize)
.for_each(|y| fvec[y as usize] = true);
fvec[x as usize] = false;
})
.for_each(drop);
fvec.iter().enumerate()
.filter_map(|(i, x)| if !*x { Some(i as u64) } else { None })
.collect()
}
你的代码有一些非常奇怪的开销,比如
(0..x).map(|y| y > 0).collect::<Vec<bool>>()
你只像x[(n % (x.len() as u64)) as usize]
一样使用...即。作为一种非常缓慢的写作方式n % x == 0
.
这是您的代码的一个几乎直接的变体,我只需删除您添加的这些开销,并在有意义的地方内联代码。
pub fn primes(n: usize) -> Vec<usize> {
if n < 4 {
vec![2]
} else {
let mut r = primes((n as f64).sqrt().ceil() as usize);
let mut fvec: Vec<_> = (0..n).map(|i| r.iter().any(|x| i % x == 0)).collect();
fvec[1] = true;
for (i, x) in fvec.into_iter().enumerate() {
if !x { r.push(i) }
}
r
}
}
最大的实际变化是用fvec[1] = true
替换您的skip(1)
,因为这样要简单得多。
当然,埃拉托色尼的实际筛子更简单,开销更低,并且具有更好的时间复杂度。
pub fn primes(n: usize) -> Vec<usize> {
let mut primes = Vec::new();
let mut sieve = vec![true; n];
for p in 2..n {
if sieve[p] {
primes.push(p);
let mut i = p * p;
while i < n {
sieve[i] = false;
i += p;
}
}
}
primes
}
这是一个轻度优化的筛子,避免了不必要的工作和内存使用。
pub fn primes6(mut n: usize) -> Vec<usize> {
let mut primes = vec![2];
n >>= 1;
// 0 1 2 3 4
// 2, 3, 5, 7, 9, ...
let mut sieve = vec![true; n];
let end = (n as f64).sqrt().ceil() as usize;
for p in 1..end {
if sieve[p] {
let prime = p * 2 + 1;
primes.push(prime);
let mut i = ((prime * prime) - 1) / 2;
while i < n {
sieve[i] = false;
i += prime; // skip by 2·prime
}
}
}
for p in end..n {
if sieve[p] {
let prime = p * 2 + 1;
primes.push(prime);
}
}
primes
}
我发现了两个更有希望的优化:
经典算法:
pub fn primes_erathosthenes(upper_bound: u64) -> Vec<u64> {
let sieve_size = (upper_bound + 1) as u64;
let mut sieve = vec![false; sieve_size as usize];
let mut p = 2;
'next: loop {
if p * p >= sieve_size {
break 'next;
}
for f in p..=(upper_bound / p) {
let index = p * f;
sieve[index as usize] = true;
}
p += 1;
while sieve[p as usize] {
p += 1;
}
}
sieve
.into_iter()
.enumerate()
.skip(2)
.filter_map(|(i, p)| if p { None } else { Some(i as u64) })
.collect()
}
具有改进循环的最终算法
fn base_improved(head: Vec<u64>, p: u64) -> Vec<u64> {
let mut fvec = vec![false; p as usize];
fvec[0] = true;
fvec[1] = true;
head
.iter()
.flat_map(|&x| (2u64..p).map(move |i| i * x).take_while(|y| *y < p))
.for_each(|y| fvec[y as usize] = true);
fvec
.iter()
.enumerate()
.filter_map(|(i, x)| if !*x { Some(i as u64) } else { None })
.collect()
}
和更快的筛分算法(Sundarams Sive,如果感兴趣,我可以发布代码)
基准:
test calc_erathosthenes ... bench: 34,192 ns/iter (+/- 2,902)
test calc_exercism ... bench: 79,314 ns/iter (+/- 16,276)
test calc_stackoverflow ... bench: 35,431 ns/iter (+/- 5,674)
test calc_stackoverflow_improved ... bench: 32,508 ns/iter (+/- 6,971)
test calc_sundaram ... bench: 17,930 ns/iter (+/- 3,182)
test calc_veedrac ... bench: 27,110 ns/iter (+/- 3,870)
test calc_veedrac_6 ... bench: 12,178 ns/iter (+/- 4,113)
tl;博士
- bitvecs 可能有效,但需要大量特殊情况优化才能提高性能
- 体面算法的干净实现是最重要的。
长版本
需要注意的一点是,不同方法的性能可能会随着输入大小而变化。也可能不是,关键是当基准测试可能会产生不会推广到所有大小的结果时,仅使用一种输入大小。
这是问题中的原始代码:
pub fn primes(n: u64) -> Vec<usize> {
if n < 4 {
vec![2]
} else {
base(primes((n as f64).sqrt().ceil() as u64), n)
}
}
fn base(r: Vec<usize>, p: u64) -> Vec<usize> {
let fvec = (0..p).map(|_| false).collect::<Vec<bool>>();
let z = r
.iter()
.map(|&x| (0..x).map(|y| y > 0).collect::<Vec<bool>>())
.map(|x| {
(0..p)
.map(|n| !x[(n % (x.len() as u64)) as usize])
.collect::<Vec<bool>>()
})
.fold(fvec, |acc, x| {
acc.iter().zip(x).map(|(a, b)| *a || b).collect()
});
let yy: Vec<usize> = z
.iter()
.enumerate()
.filter_map(|(i, x)| if !*x { Some(i) } else { None })
.skip(1)
.collect();
r.iter().chain(yy.iter()).map(|x| *x).collect()
}
我发现上面的代码很难阅读。它似乎有很多不必要的噪音,比如不需要的收集。此外,变量名称的描述性可能要强得多。
下面是我对筛子的天真实现(calc_primes_mine_naive,在下面的基准上)。它没有可变性,尽管不可否认地有一个不必要的集合。它比上述快 2 到 3 倍。
pub fn primes_less_than(n: u64) -> Vec<usize> {
let is_not_prime: Vec<bool> = (0..n).map(|num| {
let sieve_upper_bound = (num as f64).sqrt().ceil() as u64 + 1;
(2..sieve_upper_bound).any(|divisor| {
(num % divisor) == 0
})
}).collect();
is_not_prime.iter().enumerate().filter(|(_, this_one_not_prime)| {
!**this_one_not_prime
}).map(|(definitely_prime, _)| { definitely_prime }).collect()
}
以下是优化的第一次尝试。一般的 sqrt 很慢,所以摆脱上面重复的 sqrt 应该是一个绝对的胜利(似乎没有太大区别 irl)。
pub fn primes_less_than_naive(n: u64) -> Vec<usize> {
let mut sqrt = 1;
let is_not_prime: Vec<bool> = (0..n).map(|num| {
if sqrt * sqrt <= num {
sqrt += 1;
}
let sieve_upper_bound = sqrt + 1;
(2..sieve_upper_bound).any(|divisor| {
(num % divisor) == 0
})
}).collect();
is_not_prime.iter().enumerate().filter(|(_, this_one_not_prime)| {
!**this_one_not_prime
}).map(|(definitely_prime, _)| { definitely_prime }).collect()
}
我查看了组件并在vtune中对其进行了分析。Vtune 抱怨"高 MS 开关"开销。我从本质上假设这是由浮子上的铸件引起的,但在移除所有浮子操作后它仍然存在。看着组件似乎暗示由于collect
可能仍然有一些开销,所以我摆脱了额外的收集。
pub fn primes_less_than_naive2(n: u64) -> Vec<usize> {
let mut sqrt = 1;
(0..n).map(|num| {
if sqrt * sqrt <= num {
sqrt += 1;
}
let sieve_upper_bound = sqrt + 1;
(2..sieve_upper_bound).any(|divisor| {
(num % divisor) == 0
})
}).enumerate().filter(|(_, this_one_not_prime)| {
!*this_one_not_prime
}).map(|(definitely_prime, _)| { definitely_prime }).collect()
}
没有可变性似乎对性能造成了相当大的影响。 可能有一种方法可以在不使用mut
的情况下表达这一点,但可能是相当笨拙的代码。
pub fn primes_less_than_naive_with_mut(n: usize) -> Vec<usize> {
let mut is_not_prime = vec![false; n as usize];
let sieve_upper_bound = (n as f64).sqrt().ceil() as usize + 1;
(1..sieve_upper_bound).for_each(|step_size| {
(0..n).step_by(step_size).for_each(|i| {
is_not_prime[i] = true;
})
});
is_not_prime.iter().enumerate().filter(|(_, this_one_not_prime)| {
!**this_one_not_prime
}).map(|(definitely_prime, _)| { definitely_prime }).collect()
}
在这一点上,vtune 抱怨上述部分是核心绑定的,而某些部分是内存绑定的。这很烦人,b/c 解决内存绑定问题的方法是使用 bitvecs,修复核心绑定的方法是不使用 bitvecs。为了减轻内存限制,我尝试交替循环方向,其想法是这样可以更好地利用缓存。当我第一次写这篇文章时,我发现了一些证据,但从那以后我就没有了。
pub fn primes_less_than_naive_alternating(n: usize) -> Vec<usize> {
let mut is_not_prime = vec![false; n as usize];
let sieve_upper_bound = (n as f64).sqrt().ceil() as usize + 1;
let mut forward = true;
(1..sieve_upper_bound).for_each(|step_size| {
if forward {
(0..n).step_by(step_size).for_each(|i| {
is_not_prime[i] = true;
})
} else {
(0..n).step_by(step_size).rev().for_each(|i| {
is_not_prime[i] = true;
})
};
forward = !forward;
});
is_not_prime.iter().enumerate().filter(|(_, this_one_not_prime)| {
!**this_one_not_prime
}).map(|(definitely_prime, _)| { definitely_prime }).collect()
}
比特维克版本。这很慢(不出所料)。 通过为小step_size提供某种面具查找表,也许可以使其快速,但我并不乐观。 Vtune 没有什么有趣的可说的,除了可能有很多数据依赖关系搞砸了。(又名它不能乱序 b/c 下一个字节取决于前一个字节[b/c 在幕后加载和存储是字大小?
pub unsafe fn primes_less_than_naive_bitvec(n: usize) -> Vec<usize> {
let num_bytes = n / 8 + 1;
let mut is_not_prime = libc::calloc(size_of::<u8>(), num_bytes) as *mut u8;
let sieve_upper_bound = (n as f64).sqrt().ceil() as usize + 1;
(1..sieve_upper_bound).for_each(|step_size| {
(0..n).step_by(step_size).for_each(|i| {
let address = i >> 3;
let bit_within_the_byte = i & 0b111;
let cur = is_not_prime.offset(address as isize);
*cur |= ((0x1 as u8) << bit_within_the_byte);
});
});
(0..n).filter(|i|{
let address = i / 8;
let bit_within_the_byte = i % 8;
let cur = is_not_prime.offset(address as isize).read();
let is_not_prime = (cur | ((0x1 as u8) << bit_within_the_byte)) > 0;
!is_not_prime
}).collect::<Vec<usize>>()
}
在 n=10 时:
test calc_primes ... bench: 752 ns/iter (+/- 61)
test calc_primes_mine ... bench: 314 ns/iter (+/- 20)
test calc_primes_mine_alternating_loop_direction ... bench: 110 ns/iter (+/- 6)
test calc_primes_mine_bitvec ... bench: 165 ns/iter (+/- 888)
test calc_primes_mine_naive ... bench: 304 ns/iter (+/- 29)
test calc_primes_mine_naive2 ... bench: 250 ns/iter (+/- 16)
test calc_primes_mine_naive_with_mut ... bench: 70 ns/iter (+/- 4)
在 n=100 时:
test calc_primes ... bench: 5,924 ns/iter (+/- 2,210)
test calc_primes_mine ... bench: 3,384 ns/iter (+/- 88)
test calc_primes_mine_alternating_loop_direction ... bench: 673 ns/iter (+/- 23)
test calc_primes_mine_bitvec ... bench: 1,194 ns/iter (+/- 36)
test calc_primes_mine_naive ... bench: 3,141 ns/iter (+/- 195)
test calc_primes_mine_naive2 ... bench: 2,919 ns/iter (+/- 107)
test calc_primes_mine_naive_with_mut ... bench: 521 ns/iter (+/- 31)
在 n=1000 时:
test calc_primes ... bench: 120,828 ns/iter (+/- 18,018)
test calc_primes_mine ... bench: 59,269 ns/iter (+/- 6,993)
test calc_primes_mine_alternating_loop_direction ... bench: 7,080 ns/iter (+/- 2,795)
test calc_primes_mine_bitvec ... bench: 16,186 ns/iter (+/- 470)
test calc_primes_mine_naive ... bench: 56,572 ns/iter (+/- 1,849)
test calc_primes_mine_naive2 ... bench: 54,526 ns/iter (+/- 9,773)
test calc_primes_mine_naive_with_mut ... bench: 5,873 ns/iter (+/- 531)
在 n=10,000 时
test calc_primes ... bench: 2,672,089 ns/iter (+/- 117,553)
test calc_primes_mine ... bench: 1,222,262 ns/iter (+/- 24,242)
test calc_primes_mine_alternating_loop_direction ... bench: 77,348 ns/iter (+/- 4,845)
test calc_primes_mine_bitvec ... bench: 201,894 ns/iter (+/- 4,446)
test calc_primes_mine_naive ... bench: 1,213,264 ns/iter (+/- 160,097)
test calc_primes_mine_naive2 ... bench: 1,174,753 ns/iter (+/- 88,116)
test calc_primes_mine_naive_with_mut ... bench: 66,283 ns/iter (+/- 6,358)
在 = 100,000:
test calc_primes ... bench: 71,953,394 ns/iter (+/- 11,795,923)
test calc_primes_mine ... bench: 27,542,006 ns/iter (+/- 5,158,828)
test calc_primes_mine_alternating_loop_direction ... bench: 1,103,863 ns/iter (+/- 135,770)
test calc_primes_mine_bitvec ... bench: 2,547,044 ns/iter (+/- 180,308)
test calc_primes_mine_naive ... bench: 27,399,779 ns/iter (+/- 3,121,742)
test calc_primes_mine_naive2 ... bench: 26,430,825 ns/iter (+/- 1,818,277)
test calc_primes_mine_naive_with_mut ... bench: 1,045,803 ns/iter (+/- 103,090)
在 n=1000000 时:
test calc_primes ... bench: 1,827,068,877 ns/iter (+/- 131,368,352)
test calc_primes_mine ... bench: 658,940,900 ns/iter (+/- 45,559,627)
test calc_primes_mine_alternating_loop_direction ... bench: 18,112,057 ns/iter (+/- 1,839,472)
test calc_primes_mine_bitvec ... bench: 30,189,983 ns/iter (+/- 2,649,749)
test calc_primes_mine_naive ... bench: 658,018,301 ns/iter (+/- 39,323,294)
test calc_primes_mine_naive2 ... bench: 651,249,390 ns/iter (+/- 22,800,036)
test calc_primes_mine_naive_with_mut ... bench: 18,212,183 ns/iter (+/- 2,936,353)