我使用以下代码在独立模式下运行Apache Spark:
from __future__ import division
from pyspark import SparkContext
import sys
sc = SparkContext()
sc.setLogLevel("ERROR")
data = sc.textFile(sys.argv[1])
words = data.flatMap(lambda x: x.split())
word_cant = words.map(lambda x: (x,1))
total = words.count()
cant_by_word = word_cant.reduceByKey(lambda x, y: x + y)
freq = cant_by_word.map(lambda x: (x[0],x[1]/total))
sortedFreq = freq.sortBy(lambda x: x[1], False)
out = freq.takeOrdered(5, lambda s: -1*s)
print('output', out)
此示例的运行时间大于 8 秒,无论输入有多小。我尝试在创建 Spark 后立即停止程序上下文和运行时间约为 5 秒。这是我使用 ptime 来衡量的运行示例:
ptime python freq_words.py sample.txt
ptime 1.0 for Win32, Freeware - http://www.pc-tools.net/
Copyright(C) 2002, Jem Berkes <jberkes@pc-tools.net>
=== python freq_words.py sample.txt ===
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
('output', [(u'tiene', 0.06666666666666667), (u'este', 0.06666666666666667), (u'veces', 0.13333333333333333), (u'la', 0.13333333333333333), (u'texto', 0.13333333333333333)])
Execution time: 9.046 s
C:UsersuserDesktoptest>SUCCESS: The process with PID 5620 (child process of PID 6092) has been terminated.
SUCCESS: The process with PID 6092 (child process of PID 5776) has been terminated.
SUCCESS: The process with PID 5776 (child process of PID 1440) has been terminated.
有没有办法加快速度? 或者也许在第二次运行时重用 Spark 上下文?
Spark 是重量级的处理引擎,它不是为低延迟处理而设计的。恒定的8s ovherhead对于典型的Spark工作来说真的不是问题。
但要回答你的问题:
一种加快速度的方法?
不要使用 MS Windows。Spark,尤其是PySpark,必须使用非常低效的逻辑来解决操作系统的限制。