Python多处理池映射-Databricks中的内存问题



我正在Databricks环境中运行一个python组件,该组件创建一组JSON消息,每个JSON消息都用Avro模式编码。编码需要更长的时间(对具有复杂JSON结构的10K消息进行编码需要8分钟(,因此我尝试使用带有池映射功能的多处理。该进程在第一次执行时似乎运行良好,但在随后的运行中,性能会下降,最终会因oom错误而失败。我正在确保在执行结束时发出pool.close((和pool.join((,但不确定它是否真的释放了内存。当我看到Databricks Ganglia UI时,它显示交换内存和CPU利用率在每次运行中都在增加。我还试图减少池的数量(驱动程序节点有8个核心,所以尝试了6个和4个池(,以及maxtasksperchild=1,但仍然没有帮助。我在想我是不是做错了什么。以下是我现在使用的代码。想知道这里的问题是什么。如有任何建议,我们将不胜感激。

from multiprocessing import Pool
import multiprocessing
import json
from avro.io import *
import avro.schema
from avro_json_serializer import AvroJsonSerializer, AvroJsonDeserializer
import pyspark.sql.functions as F
def create_json_avro_encoding(row):
row_dict = row.asDict(True)
json_data = json.loads(avro_serializer.to_json(row_dict))
#print(f"JSON created { multiprocessing.current_process().name }")
return json_data 
avro_schema = avro.schema.SchemaFromJSONData(avro_schema_dict, avro.schema.Names())
avro_serializer = AvroJsonSerializer(avro_schema)
records = df.collect()
pool_cnt = int(multiprocessing.cpu_count()*0.5)
print(f"No of records: {len(records)}")
print(f"starting timestamp {datetime.now().isoformat(sep=' ')}")
with Pool(pool_cnt, maxtasksperchild=1) as pool:
json_data_ret = pool.map(create_json_avro_encoding, records)
pool.close()
pool.join()

加入之前不应该关闭池。事实上,在with块中使用池时根本不应该关闭池,退出with块时池会自动关闭。

相关内容

  • 没有找到相关文章

最新更新