我在databricks工作,并有一个Pyspark数据框架,我正在转换为熊猫,然后到json行文件,并希望将其上传到Azure容器(ADLS gen2)。文件太大了,我想先压缩一下再上传。
我首先将pyspark数据框转换为pandas。
pandas_df = df.select("*").toPandas()
然后将其转换为换行分隔的json:
json_lines_data = pandas_df.to_json(orient='records', lines=True)
然后用以下函数写入blob存储:
def upload_blob(json_lines_data, connection_string, container_name, blob_name):
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
try:
blob_client.get_blob_properties()
blob_client.delete_blob()
# except if no delete necessary
except:
pass
blob_client.upload_blob(json_lines_data)
这工作得很好,但是每个文件的数据大约是3gb,下载需要很长时间,所以我宁愿压缩文件。这里有人可以帮助如何压缩json行文件,并将其上传到azure容器?我试了很多不同的方法,但都没用。
如果有更好的方法在数据块中做到这一点,我可以改变它。我没有使用databricks编写,因为我需要输出1个文件并控制文件名。有一个方法可以在上传到blob存储之前压缩JSON文件。
下面是将数据转换为JSON和转换为二进制代码(utf-8),最后压缩它的代码。
建议您在上传功能之前添加此代码。
import json
import gzip
def compress_data(data):
# Convert to JSON
json_data = json.dumps(data, indent=2)
# Convert to bytes
encoded = json_data.encode('utf-8')
# Compress
compressed = gzip.compress(encoded)
参考: https://gist.github.com/LouisAmon/4bd79b8ab80d3851601f3f9016300ac4 file-json_to_gzip-py