我有两个数据文件,每个数据文件包含大量的3维点(文件A存储约50,000点,文件B存储约500,000点(。我的目标是在文件A中的每个点(a(中查找文件B中的点(b(,该文件B(b(的距离最小到(a(。我将积分存储在这样的两个列表中:
列表a 节点:
(ID X Y Z)
[ ['478277', -107.0, 190.5674, 128.1634],
['478279', -107.0, 190.5674, 134.0172],
['478282', -107.0, 190.5674, 131.0903],
['478283', -107.0, 191.9798, 124.6807],
... ]
列表b 数据:
(X Y Z Data)
[ [-28.102, 173.657, 229.744, 14.318],
[-28.265, 175.549, 227.824, 13.648],
[-27.695, 175.925, 227.133, 13.142],
...]
我的第一种方法是简单地用嵌套循环迭代第一个和第二个列表,然后计算每个点之间的距离:
outfile = open(job[0] + '/' + output, 'wb');
dist_min = float(job[5]);
dist_max = float(job[6]);
dists = [];
for node in nodes:
shortest_distance = 1000.0;
shortest_data = 0.0;
for entry in data:
dist = math.sqrt((node[1] - entry[0])**2 + (node[2] - entry[1])**2 + (node[3] - entry[2])**2);
if (dist_min <= dist <= dist_max) and (dist < shortest_distance):
shortest_distance = dist;
shortest_data = entry[3];
outfile.write(node[0] + ', ' + str('%10.5f' % shortest_data + 'n'));
outfile.close();
我认识到,python必须运行的循环数量太大(约25,000,000,000(,因此我不得不固定我的代码。我尝试首先通过列表综合来计算所有距离,但是代码仍然太慢:
p_x = [row[1] for row in nodes];
p_y = [row[2] for row in nodes];
p_z = [row[3] for row in nodes];
q_x = [row[0] for row in data];
q_y = [row[1] for row in data];
q_z = [row[2] for row in data];
dx = [[(px - qx) for px in p_x] for qx in q_x];
dy = [[(py - qy) for py in p_y] for qy in q_y];
dz = [[(pz - qz) for pz in p_z] for qz in q_z];
dx = [[dxxx * dxxx for dxxx in dxx] for dxx in dx];
dy = [[dyyy * dyyy for dyyy in dyy] for dyy in dy];
dz = [[dzzz * dzzz for dzzz in dzz] for dzz in dz];
D = [[(dx[i][j] + dy[i][j] + dz[i][j]) for j in range(len(dx[0]))] for i in range(len(dx))];
D = [[(DDD**(0.5)) for DDD in DD] for DD in D];
说实话,在这一点上,我不知道这两种方法中的哪一种更好,无论如何,这两种方法似乎都不可行。我什至不确定是否可以编写一个代码,该代码在可接受的时间内计算所有距离。是否有另一种方法可以解决我的问题而不计算所有距离?
编辑:我忘了提到我在Python 2.5.1上运行,不允许安装或添加任何新库...
以防万一有人在解决方案中遇到:
我找到了一种通过不计算所有距离来加快整个过程的方法:
i创建了一个3D列表,代表给定的3D空间中的网格,在给定的步长以x,y和z为单位(例如(最大 - 最小(/1,000(。然后,我在每个3D点上迭代以将其放入网格中。之后,我再次迭代设置A的点,查看是否有同一立方体中的B点,如果没有,我会增加搜索半径,因此该过程正在相邻的26个立方体中查看点。半径一直在增加,直到至少发现一个点为止。结果列表相对较小,可以在短时间内排序,并找到最近的点。
处理时间下降到几分钟,而且工作正常。
p_x = [row[1] for row in nodes];
p_y = [row[2] for row in nodes];
p_z = [row[3] for row in nodes];
q_x = [row[0] for row in data];
q_y = [row[1] for row in data];
q_z = [row[2] for row in data];
min_x = min(p_x + q_x);
min_y = min(p_y + q_y);
min_z = min(p_z + q_z);
max_x = max(p_x + q_x);
max_y = max(p_y + q_y);
max_z = max(p_z + q_z);
max_n = max(max_x, max_y, max_z);
min_n = min(min_x, min_y, max_z);
gridcount = 1000;
step = (max_n - min_n) / gridcount;
ruler_x = [min_x + (i * step) for i in range(gridcount + 1)];
ruler_y = [min_y + (i * step) for i in range(gridcount + 1)];
ruler_z = [min_z + (i * step) for i in range(gridcount + 1)];
grid = [[[0 for i in range(gridcount)] for j in range(gridcount)] for k in range(gridcount)];
for node in nodes:
loc_x = self.abatemp_get_cell(node[1], ruler_x);
loc_y = self.abatemp_get_cell(node[2], ruler_y);
loc_z = self.abatemp_get_cell(node[3], ruler_z);
if grid[loc_x][loc_y][loc_z] is 0:
grid[loc_x][loc_y][loc_z] = [[node[1], node[2], node[3], node[0]]];
else:
grid[loc_x][loc_y][loc_z].append([node[1], node[2], node[3], node[0]]);
for entry in data:
loc_x = self.abatemp_get_cell(entry[0], ruler_x);
loc_y = self.abatemp_get_cell(entry[1], ruler_y);
loc_z = self.abatemp_get_cell(entry[2], ruler_z);
if grid[loc_x][loc_y][loc_z] is 0:
grid[loc_x][loc_y][loc_z] = [[entry[0], entry[1], entry[2], entry[3]]];
else:
grid[loc_x][loc_y][loc_z].append([entry[0], entry[1], entry[2], entry[3]]);
out = [];
outfile = open(job[0] + '/' + output, 'wb');
for node in nodes:
neighbours = [];
radius = -1;
loc_nx = self.abatemp_get_cell(node[1], ruler_x);
loc_ny = self.abatemp_get_cell(node[2], ruler_y);
loc_nz = self.abatemp_get_cell(node[3], ruler_z);
reloop = True;
while reloop:
if neighbours:
reloop = False;
radius += 1;
start_x = 0 if ((loc_nx - radius) < 0) else (loc_nx - radius);
start_y = 0 if ((loc_ny - radius) < 0) else (loc_ny - radius);
start_z = 0 if ((loc_nz - radius) < 0) else (loc_nz - radius);
end_x = (len(ruler_x) - 1) if ((loc_nx + radius + 1) > (len(ruler_x) - 1)) else (loc_nx + radius + 1);
end_y = (len(ruler_y) - 1) if ((loc_ny + radius + 1) > (len(ruler_y) - 1)) else (loc_ny + radius + 1);
end_z = (len(ruler_z) - 1) if ((loc_nz + radius + 1) > (len(ruler_z) - 1)) else (loc_nz + radius + 1);
for i in range(start_x, end_x):
for j in range(start_y, end_y):
for k in range(start_z, end_z):
if not grid[i][j][k] is 0:
for grid_entry in grid[i][j][k]:
if not isinstance(grid_entry[3], basestring):
neighbours.append(grid_entry);
dists = [];
for n in neighbours:
d = math.sqrt((node[1] - n[0])**2 + (node[2] - n[1])**2 + (node[3] - n[2])**2);
dists.append([d, n[3]]);
dists = sorted(dists);
outfile.write(node[0] + ', ' + str(dists[0][-1]) + 'n');
outfile.close();
功能以获取一个点的位置:
def abatemp_get_cell(self, n, ruler):
for i in range(len(ruler)):
if i >= len(ruler):
return False;
if ruler[i] <= n <= ruler[i + 1]:
return i;
gridcount 变量使一个机会固定过程,用一个小的 gridcount 将点分解为网格的过程非常快,但是列表的列表搜索循环中的邻居变得更大,并且该过程的这一部分需要更多的时间。一开始需要大的 gridcount 需要更多的时间,但是循环运行速度更快。
我现在唯一的问题是事实是,在某些情况下,该过程找到了邻居,但是还有其他要点尚未找到,但尚未发现该点(请参阅图片(。到目前为止,我通过将搜索半径递增了已经存在Neigbours的时间来解决此问题。那时,我仍然有更接近的点,但在邻居名单中没有,尽管这只是很少的数量(在约100,000个中,有92个(。找到邻居后,我可以通过两次增加半径来解决这个问题,但是这个解决方案似乎不是很聪明。也许你们有一个主意...
这是该过程的第一个工作草案,我认为可以进一步改进它,只是为了让您了解它的工作原理...
这花了我一些思考,但最后我想我为您找到了一个解决方案。您的问题不在您编写的代码中,而是在其实施的算法中。有一种算法称为Dijkstra的算法,这是其中的要点:https://en.wikipedia.org/wiki/dijkstra'S_ALGORITHM。
现在您需要做的是以一种巧妙的方式使用此算法:
创建一个节点S(源代表源(。现在,链接从s到B组的所有节点。完成后,您应该将B中的每个点B的边缘链接到A中的每个点A。您应该将链接的成本从源源设置为0,而另一个链接的成本为2点之间的距离(仅在3D中(。现在,如果我们使用Dijkstra的算法,我们将获得的输出将是图表中从s到每个点的成本(我们只对A组中的点到点的距离感兴趣(。因此,由于b和s中的每个点的成本为0,仅与B中的点相连线(。
我不确定这是否会固定您的代码,但据我所知,在不计算所有距离的情况下解决此问题的一种方法,并且该算法是人们希望的最佳时间复杂性。
看这个通用3D数据结构:
https://github.com/m4nh/skimap_ros
它具有非常快的radiussearch功能,就可以使用。此解决方案(类似于OCTREE,但更快(避免了您首先创建常规网格(您不必沿每个轴固定最大/分钟尺寸(,并且可以节省大量内存