我是PySpark的初学者,最近我试图提交一个简单的python应用程序(批量调整图片大小)到我的spark集群。我可以通过pycharm成功运行应用程序,当我将应用程序提交给spark时,图像也会调整大小。
这是我的原始Python代码:
import os
from PIL import Image
size_64 = (64,64)
for f in os.listdir('.')
if f.endswith('.jpg'):
i = Image.open(f)
fn, fext = os.path.splitext(f)
i.thumbnail(size_64)
i.save('resize/{}_64'.format(fn, fext))
然后我把它转换成一种我认为可以正确提交python应用程序的方式:
import os
from PIL import Image
from pyspark import SparkContext, SparkConf
APP_NAME = "ImageResizer"
def main(sc):
size_64 = (64,64)
for f in os.listdir('.')
if f.endswith('.jpg'):
i = Image.open(f)
fn, fext = os.path.splitext(f)
i.thumbnail(size_64)
i.save('resize/{}_64'.format(fn, fext))
print 'done'
if __name__ == "__main__":
conf = SparkConf().setAppName(APP_NAME)
conf = conf.setMaster("spark://10.233.70.48:7077")
sc = SparkContext(conf=conf)
main(sc)
然而,我被告知我实际上根本没有使用spark(我也这么认为,但我只是不知道如何)。我想知道如何正确地将我的原始代码转换为Pyspark方式。
有熟悉pyspark的人可以帮助我吗?有没有什么建议,我可以在哪里正确和系统地学习如何编写PySpark应用程序?谢谢你
现在你根本没有使用spark。您只是简单地使用SparkContext作为传递给主函数的变量(然后不做任何事情)。为了使用PySpark,您需要重新考虑您的应用程序。像os.listdir('.')
这样的命令在单个机器上运行良好,但是如果您在计算机集群上运行它,那么.
指向哪个目录呢?提交作业的机器?每台机器上的本地目录?共享网络驱动器?如果您只是在一台机器上运行(用于测试,那就足够了)。您可以通过简单地并行化列表(将其转换为RDD)来开始使用Spark。然后,您可以在RDD上应用map
, filter
和reduce
等操作
s_list = sc.parallelize(os.listdir('.'))
s_jpg_list = s_list.filter(lambda f: f.endswith('.jpg'))
def resize_image(f):
i = Image.open(f)
size_64 = (64,64)
fn, fext = os.path.splitext(f)
i.thumbnail(size_64)
out_path = 'resize/{}_64'.format(fn, fext)
i.save(out_path)
return out_path
s_jpg_files = s_jpg_list.map(resize_image)
print('Converted Images:', s_jpg_files.collect())
但图像没有调整大小—这与应用程序失败不同。当应用程序提交时,它使用应用程序特定的工作目录。那里不会有任何文件要处理,它不做任何工作就存在。