Spark:如何将单个行作为JPG写入S3/HDFS



我必须将数据作为单个JPG文件(数百万(从PySpark写入S3存储桶。

我尝试了多种选择:

设置:AWS EMR集群和Jupyter笔记本电脑。

  1. 在"foreach"方法中创建一个boto3客户端并写入S3==>当我们为每个任务打开客户端时,速度太慢,效率太低

def get_image(y):
res = requests.get(img_url, stream=True)
file_name = "./" +str(cid) + ".jpg"
client = boto3.client('s3')
file_name = str(cid) + ".jpg"
client.put_object(Body=res.content, Bucket='test',  Key='out_images/'+file_name)
myRdd.foreach(get_image)

  1. 写入本地文件系统并运行;aws S3拷贝";到S3=>不清楚如果将这些数据写入每个工作节点的卷,如何访问这些数据。在作业运行时登录到工作节点,但无法准确找到JPG的写入位置

def get_image(y):
res = requests.get(img_url, stream=True)
file_name = "./" +str(cid) + ".jpg"
with open(file_name, 'wb') as f:
f.write(res.content)
myRdd.foreach(get_image)

  1. 写入HDFS,稍后运行s3 dist cp。可能是最高效的,但在代码方面还没有取得成功。I get path cannot be found exceptions

def get_image(y):
res = requests.get(img_url, stream=True)
file_name = "hdfs://" +str(cid) + ".jpg"
with open(file_name, 'wb') as f:
f.write(res.content)
myRdd.foreach(get_image)

有人能提出一个实现这一目标的好方法吗?

如果将foreach替换为foreachPartition,则解决方案1效果良好。更改后,每个分区只创建一个客户端:

def get_image(y_it):
client = boto3.client('s3')
for y in y_it:
img_url = ...
cid = ...
res = requests.get(img_url, stream=True)
file_name = str(cid) + ".jpg"
client.put_object(Body=res.content, Bucket='test',  Key='out_images/'+file_name)
myRdd.foreachPartition(get_image)

y_it上的循环中,相同的客户端被重用。

如果有要求,事情甚至会变得更快。会话用于http调用,如本应答中所述。在这种情况下,在y_it上的循环外部创建一个http会话(就像客户端一样(,然后在循环中重用。

最新更新