如何使用Spark查找10亿条记录的最近邻居



给定10亿条包含以下信息的记录:

    ID  x1  x2  x3  ... x100
    1   0.1  0.12  1.3  ... -2.00
    2   -1   1.2    2   ... 3
    ...

对于上面的每个ID,我想根据它们的向量(x1,x2,…,x100)的欧几里得距离找到前10个最接近的ID。

计算这个的最佳方法是什么?

碰巧,我有一个解决方案,包括将sklearn与Spark相结合:https://adventuresindatascience.wordpress.com/2016/04/02/integrating-spark-with-scikit-learn-visualizing-eigenvectors-and-fun/

其要点是:

  • 集中使用sklearn的k-NN fit()方法
  • 然后分布式使用sklearn的k-NN kneighbors()方法

对所有记录进行暴力比较是一场失败的战斗。我的建议是采用k最近邻算法的现成实现,例如scikit-learn提供的算法,然后广播索引和距离的结果数组,然后再进一步。

这种情况下的步骤是:

1-按照Bryce的建议对特性进行矢量化,并让矢量化方法返回一个浮动列表(或numpy数组),其中包含与特性一样多的元素

2-将您的scikit学习nn与您的数据相匹配:

nbrs = NearestNeighbors(n_neighbors=10, algorithm='auto').fit(vectorized_data)

3-在矢量化数据上运行经过训练的算法(在您的情况下,训练和查询数据相同)

distances, indices = nbrs.kneighbors(qpa)

步骤2和3将在pyspark节点上运行,在这种情况下是不可并行的。您需要在此节点上有足够的内存。在我有150万张唱片和4个功能的情况下,它花了一两秒钟的时间。

在我们为spark很好地实现NN之前,我想我们必须坚持这些变通方法。如果你想尝试一些新的东西,那就去吧http://spark-packages.org/package/saurfang/spark-knn

您没有提供太多细节,但我对这个问题的一般方法是:

  1. 将记录转换为类似LabeledPoint的数据结构,其中(ID,x1..x100)作为标签和特征
  2. 映射每个记录,并将该记录与所有其他记录进行比较(此处有很大的优化空间)
  3. 创建一些截止逻辑,这样一旦您开始比较ID=5和ID=1,就会中断计算,因为您已经比较了ID=1和ID=5
  4. 一些简化步骤可以得到类似{id_pair: [1,5], distance: 123}的数据结构
  5. 另一个查找每个记录的10个最近邻居的映射步骤

您已经确定了pyspark,我通常使用scala来完成这类工作,但每个步骤的一些伪代码可能看起来像:

# 1. vectorize the features
def vectorize_raw_data(record)
    arr_of_features = record[1..99]
    LabeledPoint( record[0] , arr_of_features)
# 2,3 + 4 map over each record for comparison
broadcast_var = [] 
def calc_distance(record, comparison)
    # here you want to keep a broadcast variable with a list or dictionary of
    # already compared IDs and break if the key pair already exists
    # then, calc the euclidean distance by mapping over the features of
    # the record and subtracting the values then squaring the result, keeping 
    # a running sum of those squares and square rooting that sum
    return {"id_pair" : [1,5], "distance" : 123}    
for record in allRecords:
  for comparison in allRecords:
    broadcast_var.append( calc_distance(record, comparison) )
# 5. map for 10 closest neighbors
def closest_neighbors(record, n=10)
     broadcast_var.filter(x => x.id_pair.include?(record.id) ).takeOrdered(n, distance)

伪代码很糟糕,但我认为它传达了意图。当你将所有记录与所有其他记录进行比较时,这里会有很多洗牌和排序。IMHO,您希望将关键点对/距离存储在中心位置(就像更新的广播变量一样,尽管这很危险),以减少您执行的总欧几里得距离计算。

@xenocyon的博客遗漏了很多关于格式和用法的信息,为了更好地理解,我在下面写了一个片段。

df = df.withColumn('vector_list', F.array('x1', 'x2', 'x3', ... , 'x100'))
vectors_collected = df.select(df['x1'],df['x2'], ... , df['x100']).rdd.map(list).collect()
knn = NearestNeighbors(n_neighbors=5).fit(vectors_collected)
broadcast_knn = spark.sparkContext.broadcast(knn)
knn_results = df.select(df['vector_list']).rdd.map(lambda x: broadcast_knn.value.kneighbors(x))

下面的代码用于复制类似于sklearn的距离和索引。

numpy_knn_results = np.array(knn_results.collect())
# k is 5, hence reshape with 5 each row.
distance = numpy_knn_results.reshape(numpy_knn_results.shape[0]*2, 5)[0::2]
indices = numpy_knn_results.reshape(numpy_knn_results.shape[0]*2, 5)[1::2]

相关内容

  • 没有找到相关文章

最新更新