我需要一种非常快速的方法来查找NxN数组中M个最大元素的2D位置和值。
现在我正在做这个:
struct SourcePoint {
Point point;
float value;
}
SourcePoint* maxValues = new SourcePoint[ M ];
maxCoefficients = new SourcePoint*[
for (int j = 0; j < rows; j++) {
for (int i = 0; i < cols; i++) {
float sample = arr[i][j];
if (sample > maxValues[0].value) {
int q = 1;
while ( sample > maxValues[q].value && q < M ) {
maxValues[q-1] = maxValues[q]; // shuffle the values back
q++;
}
maxValues[q-1].value = sample;
maxValues[q-1].point = Point(i,j);
}
}
}
Point结构体只有两个int型——x和y。
这段代码基本上是对输入的值进行插入排序。maxValues[0]总是包含最低值的SourcePoint,它仍然保持在到目前为止遇到的前M个值之内。如果sample <= maxValues,我们不做任何事情,这给了我们一个快速而简单的救助。我遇到的问题是每次找到一个新的更好的值时都会进行洗牌。它一直沿着maxValues向下工作,直到找到它的位置,打乱maxValues中的所有元素,为自己腾出空间。
我已经到了准备研究SIMD解决方案或缓存优化的地步,因为看起来有相当多的缓存抖动正在发生。降低这个操作的成本将极大地影响我的整个算法的性能,因为它被调用了很多次,占我的总成本的60-80%。
我尝试过使用std::vector和make_heap,但我认为创建堆的开销超过了堆操作的节省。这可能是因为M和N通常都不大。M通常为10-20,n10 -30 (nxn100 - 900)。问题是这个操作被反复调用,它不能被预先计算。
我只是想预加载maxValues的前M个元素,这可能会提供一些小的节省。在当前的算法中,前M个元素保证会一直洗牌下去,只是为了最初填充maxValues。
任何来自优化专家的帮助将是非常感激的:)
你可以尝试一些想法。在一些N=100和M=15的快速测试中,我能够在vc++ 2010中获得大约25%的速度,但你自己测试一下,看看它们是否对你的情况有所帮助。根据实际使用/数据和编译器优化,其中一些更改可能没有甚至是负面影响。
- 除非你需要,否则不要每次都分配一个新的
- 将
g_Source[i][j]
更改为g_Source[j][i]
会使您获得一点点收益(不像我想象的那么多)。 - 使用底部列出的结构
SourcePoint1
,我又得到了几个百分点。 - 最大的增益约为+15%,是用
g_Source[j][i]
代替局部变量sample
。编译器可能足够聪明,可以优化出对数组的多次读取,如果你使用局部变量,它就无法做到这一点。 - 尝试一个简单的二分搜索使我损失了几个百分点。对于较大的M/n,您可能会看到好处。
- 如果可能的话,尽量保持
arr[][]
中的源数据排序,即使只是部分。理想情况下,您希望在创建源数据的同时生成maxValues[]
。 - 查看如何创建/存储/组织数据可能会为您提供模式或信息,以减少生成
maxValues[]
数组的时间。例如,在最好的情况下,你可以想出一个公式,不需要迭代和排序就能给出最上面的M个坐标。
maxValues
数组。使用堆栈变量而不是动态分配使我获得+5%。以上代码:
struct SourcePoint1 {
int x;
int y;
float value;
int test; //Play with manual/compiler padding if needed
};
如果您想在此时进行微优化,那么简单的第一步应该是摆脱Point
s并将两个维度都塞进一个int中。这减少了你需要移动的数据量,和将SourcePoint降低到2的幂,这简化了对它的索引。
另外,你确定保持列表的排序比每次移出旧的最低值后重新计算哪个元素是新的最低值更好吗?
(更新22:37 UTC时间2011-08-20)
我建议一个固定大小的二进制最小堆,包含M个最大的元素(但仍然按照最小堆的顺序!)。在实践中可能不会更快,因为我认为OPs插入排序可能具有体面的现实世界性能(至少当考虑到本线程中其他海报的建议时)。
在失败的情况下查找应该是常数时间:如果当前元素小于堆的最小元素(包含最大M个元素),我们可以直接拒绝它。
如果有一个元素大于当前堆的最小值(第m大的元素),则提取(丢弃)之前的最小值并插入新元素。
如果元素需要按顺序排序,则可以在之后对堆进行排序。
第一次尝试最小c++实现:
template<unsigned size, typename T>
class m_heap {
private:
T nodes[size];
static const unsigned last = size - 1;
static unsigned parent(unsigned i) { return (i - 1) / 2; }
static unsigned left(unsigned i) { return i * 2; }
static unsigned right(unsigned i) { return i * 2 + 1; }
void bubble_down(unsigned int i) {
for (;;) {
unsigned j = i;
if (left(i) < size && nodes[left(i)] < nodes[i])
j = left(i);
if (right(i) < size && nodes[right(i)] < nodes[j])
j = right(i);
if (i != j) {
swap(nodes[i], nodes[j]);
i = j;
} else {
break;
}
}
}
void bubble_up(unsigned i) {
while (i > 0 && nodes[i] < nodes[parent(i)]) {
swap(nodes[parent(i)], nodes[i]);
i = parent(i);
}
}
public:
m_heap() {
for (unsigned i = 0; i < size; i++) {
nodes[i] = numeric_limits<T>::min();
}
}
void add(const T& x) {
if (x < nodes[0]) {
// reject outright
return;
}
nodes[0] = x;
swap(nodes[0], nodes[last]);
bubble_down(0);
}
};
小测试/用例:
#include <iostream>
#include <limits>
#include <algorithm>
#include <vector>
#include <stdlib.h>
#include <assert.h>
#include <math.h>
using namespace std;
// INCLUDE TEMPLATED CLASS FROM ABOVE
typedef vector<float> vf;
bool compare(float a, float b) { return a > b; }
int main()
{
int N = 2000;
vf v;
for (int i = 0; i < N; i++) v.push_back( rand()*1e6 / RAND_MAX);
static const int M = 50;
m_heap<M, float> h;
for (int i = 0; i < N; i++) h.add( v[i] );
sort(v.begin(), v.end(), compare);
vf heap(h.get(), h.get() + M); // assume public in m_heap: T* get() { return nodes; }
sort(heap.begin(), heap.end(), compare);
cout << "RealtFake" << endl;
for (int i = 0; i < M; i++) {
cout << v[i] << "t" << heap[i] << endl;
if (fabs(v[i] - heap[i]) > 1e-5) abort();
}
}
您正在寻找一个优先级队列:
template < class T, class Container = vector<T>,
class Compare = less<typename Container::value_type> >
class priority_queue;
您需要找出要使用的最佳底层容器,并可能定义一个Compare
函数来处理您的Point
类型。
如果你想优化它,你可以在你的矩阵的每一行上运行一个队列,在它自己的工作线程中,然后运行一个算法来选择队列前面最大的项,直到你有M个元素。
一个快速的优化方法是向maxValues
数组添加一个哨兵值。如果maxValues[M].value
等于std::numeric_limits<float>::max()
,则可以在while循环条件中消除q < M
测试。
一种想法是对NxN数组中的普通一维引用序列使用std::partial_sort
算法。您可能还可以缓存此引用序列以供后续调用。我不知道它的性能有多好,但值得一试——如果它运行得足够好,你就不会有那么多的"魔力"。特别是,你不能求助于微优化。
考虑这个展示:
#include <algorithm>
#include <iostream>
#include <vector>
#include <stddef.h>
static const int M = 15;
static const int N = 20;
// Represents a reference to a sample of some two-dimensional array
class Sample
{
public:
Sample( float *arr, size_t row, size_t col )
: m_arr( arr ),
m_row( row ),
m_col( col )
{
}
inline operator float() const {
return m_arr[m_row * N + m_col];
}
bool operator<( const Sample &rhs ) const {
return (float)other < (float)*this;
}
int row() const {
return m_row;
}
int col() const {
return m_col;
}
private:
float *m_arr;
size_t m_row;
size_t m_col;
};
int main()
{
// Setup a demo array
float arr[N][N];
memset( arr, 0, sizeof( arr ) );
// Put in some sample values
arr[2][1] = 5.0;
arr[9][11] = 2.0;
arr[5][4] = 4.0;
arr[15][7] = 3.0;
arr[12][19] = 1.0;
// Setup the sequence of references into this array; you could keep
// a copy of this sequence around to reuse it later, I think.
std::vector<Sample> samples;
samples.reserve( N * N );
for ( size_t row = 0; row < N; ++row ) {
for ( size_t col = 0; col < N; ++col ) {
samples.push_back( Sample( (float *)arr, row, col ) );
}
}
// Let partial_sort find the M largest entry
std::partial_sort( samples.begin(), samples.begin() + M, samples.end() );
// Print out the row/column of the M largest entries.
for ( std::vector<Sample>::size_type i = 0; i < M; ++i ) {
std::cout << "#" << (i + 1) << " is " << (float)samples[i] << " at " << samples[i].row() << "/" << samples[i].col() << std::endl;
}
}
首先,您在数组中行进的顺序是错误的!
你总是,总是,总是想要线性扫描内存。这意味着数组的最后一个索引需要以最快的速度变化。所以不用这个:
for (int j = 0; j < rows; j++) {
for (int i = 0; i < cols; i++) {
float sample = arr[i][j];
试试这个:
for (int i = 0; i < cols; i++) {
for (int j = 0; j < rows; j++) {
float sample = arr[i][j];
我预测这将比任何其他单一的改变产生更大的影响。
接下来,我将使用堆而不是排序数组。标准的<algorithm>
头已经有push_heap
和pop_heap
函数来使用向量作为堆。(除非M
相当大,否则这可能不会有太大的帮助。对于较小的M
和随机数组,您平均不需要执行那么多插入……大概是O(log N)吧
之后是使用SSE2。但与以正确的顺序在记忆中行进相比,这是微不足道的。
您应该能够通过并行处理获得接近线性的加速。
使用N
CPU,您可以使用每个CPU处理rows/N
行(和所有列)的频带,查找每个频带中的前M
项。然后做一个选择排序来找到整个顶部的M
。
您可能也可以使用SIMD来做这件事(但是这里您将通过交错列而不是将行分开来划分任务)。不要试图让SIMD更快地执行插入排序,而是让它一次执行更多的插入排序,最后使用一个非常快的步骤将这些排序组合在一起。
当然,您可以同时执行多线程和SIMD,但对于只有30x30的问题,这可能不值得。
我尝试用double
替换float
,有趣的是,这给了我大约20%的速度提高(使用vc++ 2008)。这有点违反直觉,但现代处理器或编译器似乎针对双值处理进行了优化。
使用链表存储最佳的M值。你仍然需要遍历它来找到正确的位置,但是插入是O(1)。它甚至可能比二分查找和插入O(N)+O(1)比O(lg(N))+O(N)更好。交换fors,这样您就不会访问内存中的每个N个元素并破坏缓存。
LE:抛出另一个可能适用于均匀分布值的想法。
在3/2*O(N^2)次比较中找到最小,最大。
创建N到N^2个均匀分布的桶,最好更接近N^2而不是N
对于NxN矩阵中的每个元素,将其放入bucket[(int)(value-min)/range]中,range=max-min。
最后创建一个从最高桶到最低桶的集合,当|当前集| + |下一个桶| <= m时,将其他桶中的元素添加到集合中。
如果你得到M个元素,你就完成了。你可能会得到比M更少的元素,比如p
对剩下的桶应用你的算法,从中得到最大的M-P元素。
如果元素是统一的,你使用N^2个桶,它的复杂性大约是3.5*(N^2),而你目前的解决方案大约是O(N^2)*ln(M)。