所以我通过在附加到EMR集群的jupyter笔记本中玩DMOZ数据集来学习PySpark。我试图实现的过程如下:
- 在PySpark DataFrame中加载一个包含s3公共数据集文件位置的csv文件(~130k行)
- 用一个函数映射DF,该函数检索文件内容(html)并撕裂文本
- 将输出与原始DF合并为新列
- 将加入的DF写入s3(问题:它似乎永远挂起,它不是一个大的工作,输出json应该只有几个gigs)
所有的写入都是在一个叫做run_job()的函数中完成的
我让它在一个有10个m5.8xlarge实例的集群上放置了大约2小时,应该就足够了。除了df.write()之外,所有其他步骤都可以独立执行。我已经测试过了更小的子集,它写入s3没有问题,但当我去做整个文件时,它似乎挂起在&;0/n作业完成。&;
我是PySpark和分布式计算的新手,所以这可能是一个简单的"最佳实践"。我想念的东西。(编辑:也许它在笔记本的配置?我目前没有使用任何魔法来配置spark,我需要吗?)
下面的代码…
import html2text
import boto3
import botocore
import os
import re
import zlib
import gzip
from bs4 import BeautifulSoup as bs
from bs4 import Comment
# from pyspark import SparkContext, SparkConf
# from pyspark.sql import SQLContext, SparkSession
# from pyspark.sql.types import StructType, StructField, StringType, LongType
import logging
def load_index():
input_file='s3://cc-stuff/uploads/DMOZ_bussineses_ccindex.csv'
df = spark.read.option("header",True)
.csv(input_file)
#df = df.select('url_surtkey','warc_filename', 'warc_record_offset', 'warc_record_length','content_charset','content_languages','fetch_time','fetch_status','content_mime_type')
return df
def process_warcs(id_,iterator):
html_textract = html2text.HTML2Text()
html_textract.ignore_links = True
html_textract.ignore_images = True
no_sign_request = botocore.client.Config(signature_version=botocore.UNSIGNED)
s3client = boto3.client('s3', config=no_sign_request)
text = None
s3pattern = re.compile('^s3://([^/]+)/(.+)')
PREFIX = "s3://commoncrawl/"
for row in iterator:
try:
start_byte = int(row['warc_record_offset'])
stop_byte = (start_byte + int(row['warc_record_length']))
s3match = s3pattern.match((PREFIX + row['warc_filename']))
bucketname = s3match.group(1)
path = s3match.group(2)
#print('Bucketname: ',bucketname,'nPath: ',path)
resp = s3client.get_object(Bucket=bucketname, Key=path, Range='bytes={}-{}'.format(start_byte, stop_byte))
content = resp['Body'].read()#.decode()
data = zlib.decompress(content, wbits = zlib.MAX_WBITS | 16).decode('utf-8',errors='ignore')
data = data.split('rnrn',2)[2]
soup = bs(data,'html.parser')
for x in soup.findAll(text=lambda text:isinstance(text, Comment)):
x.extract()
for x in soup.find_all(["head","script","button","form","noscript","style"]):
x.decompose()
text = html_textract.handle(str(soup))
except Exception as e:
pass
yield (id_,text)
def run_job(write_out=True):
df = load_index()
df2 = df.rdd.repartition(200).mapPartitionsWithIndex(process_warcs).toDF()
df2 = df2.withColumnRenamed('_1','idx').withColumnRenamed('_2','page_md')
df = df.join(df2.select('page_md'))
if write_out:
output = "s3://cc-stuff/emr-out/DMOZ_bussineses_ccHTML"
df.coalesce(4).write.json(output)
return df
df = run_job(write_out=True)
所以我设法使它工作。我将此归因于以下两个变化之一。我还更改了硬件配置,选择了更多数量的小型实例。天哪,我就是喜欢这样,当我花了一整天的时间在极度困惑的状态中,而我所需要做的只是添加一个"/"到保存位置......
- 我添加了尾部"/"到s3中的输出文件位置
1岁:
output = "s3://cc-stuff/emr-out/DMOZ_bussineses_ccHTML"
1新:
output = "s3://cc-stuff/emr-out/DMOZ_bussineses_ccHTML/"
- 我删除了"coalesce"在run_job()"函数,我现在有200个输出文件,但它工作,它是超级快(不到1分钟)。
2岁:
df.coalesce(4).write.json(output)
2新:
df.write.mode('overwrite').json(output)