PySpark speed Ubuntu vs Windows



我有一个PySpark的示例作业,它是PageRank算法的一个版本。代码如下:

from __future__ import print_function
from operator import add
import timeit
from pyspark.sql import SparkSession
# Normalize a list of pairs(url, rank) to 1
def normalize(ranks):
    norm = sum([rank for u, rank in ranks])
    ranks = [(u, rank / norm) for (u, rank) in ranks ]
    return sorted(ranks, key=lambda x: x[1], reverse=True)
def pagerank_2(edgeList, n, niter):
    # Loads all URLs from input file and initialize their neighbors.
    m = edgeList.groupByKey().cache()
    s = 0.85
    # Loads all URLs with other URL(s) link to from input file 
    # and initialize ranks of them to one.
    q = spark.sparkContext.range(n).map(lambda x: (x, 1.0)).cache()
    r = spark.sparkContext.range(n).map(lambda x: (x, 0.0)).cache()
    # Calculates and updates URL ranks continuously 
    # using PageRank algorithm.
    for iteration in range(niter):
        # Calculates URL contributions to the rank of other URLs.
        # Add URL ranks based on neighbor contributions.
        # Do not forget to add missing values in q and set to 0.0
        q = q.fullOuterJoin(m)
             .flatMap(lambda x: (x[1][1] and [(u, x[1][0]/len(x[1][1])) for u in x[1][1]]) or [])
             .reduceByKey(add)
             .rightOuterJoin(r)
             .mapValues(lambda x: (x[0] or 0)*s + (1-s))
        print("iteration = ", iteration)
    # Collects all URL ranks and dump them to console after normalization
    ranks = normalize(q.collect())
    print(ranks[0:10])

if __name__ == "__main__":
    spark = SparkSession
            .builder
            .master('local[*]')
            .appName("SparkPageRank")
            .config('spark.driver.allowMultipleContexts', 'true')
            .config('spark.sql.warehouse.dir', 'file:///C:/Home/Org/BigData/python/BE4/') 
            .config('spark.sql.shuffle.partitions', '10')
            .getOrCreate()
    spark.sparkContext.setLogLevel('WARN')
    g = [(0, 1), (0, 5), (1, 2), (1, 3), (2, 3),
         (2, 4), (2, 5), (3, 0), (5, 0), (5, 2)]
    n = 6
    edgeList = spark.sparkContext.parallelize(g)
    print(timeit.timeit('pagerank_2(edgeList, 6, 10)', number=1, globals=globals()))

节点编号从 0 到 n-1。edgeList 参数是一个 RDD,其中包含节点对(也称为边)的列表。

我在本地模式下在Windows 10(Anaconda,Spark 2.1.0,winutils)上运行它。这项工作被分配为 2896 个任务,这些任务都非常轻。

我的问题是运行时间。使用上面的例子:

  • 视窗 10:>40mn !
  • Windows Subsystem for Linux (Ubuntu 14.04): 30s

该计算机是笔记本电脑核心i7-4702HQ,16Gb内存,512Gb SSD。在启动过程方面,Windows比Linux慢,但慢50倍? 肯定有办法缩小这种差距吗?

我已经为所有处于危险之中的文件禁用了Windows Defender:java目录,python目录等。关于看什么还有其他想法吗?

感谢您提供任何线索。

也许键是本地的[*],这意味着

在本地运行 Spark,工作线程数与逻辑内核数相同 机器。

尝试使用本地[10]

最新更新