我必须将数据作为单个JPG文件(数百万(从PySpark写入S3存储桶。
我尝试了多种选择:
设置:AWS EMR集群和Jupyter笔记本电脑。
- 在"foreach"方法中创建一个boto3客户端并写入S3==>当我们为每个任务打开客户端时,速度太慢,效率太低
def get_image(y):
res = requests.get(img_url, stream=True)
file_name = "./" +str(cid) + ".jpg"
client = boto3.client('s3')
file_name = str(cid) + ".jpg"
client.put_object(Body=res.content, Bucket='test', Key='out_images/'+file_name)
myRdd.foreach(get_image)
- 写入本地文件系统并运行;aws S3拷贝";到S3=>不清楚如果将这些数据写入每个工作节点的卷,如何访问这些数据。在作业运行时登录到工作节点,但无法准确找到JPG的写入位置
def get_image(y):
res = requests.get(img_url, stream=True)
file_name = "./" +str(cid) + ".jpg"
with open(file_name, 'wb') as f:
f.write(res.content)
myRdd.foreach(get_image)
- 写入HDFS,稍后运行s3 dist cp。可能是最高效的,但在代码方面还没有取得成功。
I get path cannot be found exceptions
def get_image(y):
res = requests.get(img_url, stream=True)
file_name = "hdfs://" +str(cid) + ".jpg"
with open(file_name, 'wb') as f:
f.write(res.content)
myRdd.foreach(get_image)
有人能提出一个实现这一目标的好方法吗?
如果将foreach
替换为foreachPartition,则解决方案1效果良好。更改后,每个分区只创建一个客户端:
def get_image(y_it):
client = boto3.client('s3')
for y in y_it:
img_url = ...
cid = ...
res = requests.get(img_url, stream=True)
file_name = str(cid) + ".jpg"
client.put_object(Body=res.content, Bucket='test', Key='out_images/'+file_name)
myRdd.foreachPartition(get_image)
在y_it
上的循环中,相同的客户端被重用。
如果有要求,事情甚至会变得更快。会话用于http调用,如本应答中所述。在这种情况下,在y_it
上的循环外部创建一个http会话(就像客户端一样(,然后在循环中重用。