在使用python多处理模块时,有没有一种方法可以不使用Pickling



我很难弄清楚这一点。我正在尝试创建一个实时卫星跟踪器,并使用天域python模块。它读取TLE数据,然后给出相对于地球的LAT和LON位置。sky-field模块创建了无法腌制的satrec对象(甚至尝试使用dill(。我使用for循环在所有卫星上循环,但这非常慢,所以我想使用池方法的多处理来加快速度,但如上所述,这不起作用,因为多处理使用pickle。有什么方法可以解决这个问题吗?或者有人对使用多处理的其他方法有建议吗?

from skyfield.api import load, wgs84, EarthSatellite
import numpy as np
import pandas as pd
import time
import os 
from pyspark import SparkContext
from multiprocessing import Pool
import dill
data = pd.read_json('tempSatelliteData.json')
print(data.head())
newData = data.filter(['tle_line0', 'tle_line1', 'tle_line2'])
newData.to_csv('test.txt', sep='n', index=False)
stations_file = 'test.txt'
satellites = load.tle_file(stations_file)
ts = load.timescale()
t = ts.now()
#print(satellites)
#data = pd.DataFrame(data=satellites)
#data = data.to_numpy()
def normal_for():
# this for loop takes 9 seconds to comeplete TOO SLOW
ty = time.time()
for satellite in satellites:
geocentric = satellite.at(t)
lat,lon = wgs84.latlon_of(geocentric)
print('Latitude:', lat)
print('Longitude:', lon)
print(np.round_(time.time()-ty,3),'sec')
def sat_lat_lon(satellite):
geocentric = satellite.at(t)
lat,lon = wgs84.latlon_of(geocentric)
p = Pool()
result = p.map(sat_lat_lon, satellites)
p.close()
p.join()

我是dillmultiprocessppftpathos的作者。您可以尝试multiprocess,它使用dill而不是pickle,但如果您说对象不能通过dill序列化,那么这将不起作用。备选方案是multiprocess.dummy,它使用线程,并且不需要像multiprocess中那样的对象序列化(如注释中所建议的(。还有pathos.pools.ParallelPool(或者只使用底层ppft(。。。它将对象转换为源代码,以便跨进程运送它们。还有一些其他代码提供并行映射,但大多数代码都需要某种序列化。如果上面的代码都不起作用,则可能需要更加努力地使对象可序列化。例如,您可以为对象注册序列化函数,该函数可以通知dill如何pickle对象。dilldill.settings中也有序列化变体,使您能够尝试可能有效的不同序列化。有时,只需更改代码构造或导入位置就可以使对象序列化。

如果是序列化的速度,而不是序列化对象的能力。。。那么您可以尝试mpi4py(或pyina来获得MPI映射(。MPI更多地用于重载(昂贵的代码(。然而,如果是大型序列化对象的序列化和运输使您的速度减慢。。。那么使用线程或添加自定义序列化程序可能是您的最佳选择

通过将tle线和时间刻度传递给构造函数,可以直接创建EarthSatellite对象。因此,使用Pool.map()或类似方法将tle线和时间刻度传递给进程,并让它们自己创建satrec对象。

您可能可以直接从json数据中获取tle行,并跳过read_jsonwrite_csv的步骤。但是您没有提供一个示例json文件。

我没有任何样本数据,所以这是未经测试的:

from skyfield.api import load, wgs84, EarthSatellite
import pandas as pd
from multiprocessing import Pool
ts = load.timescale()
t = ts.now()
# load the .json data and convert it to a list of
# lists containing tle data for a satellite
data = pd.read_json('tempSatelliteData.json')
tle_data = [(row.tle_line0, row.tle_line1, row.tle_line2, ts, t)
for row in data.itertuple()]

def sat_lat_lon(line0, line1, line2, ts, t):
satellite = EarthSatellite(line1, line2, line0, ts)
geocentric = satellite.at(t)
lat,lon = wgs84.latlon_of(geocentric)
return satellite.satnum, lat, lon
with Pool() as p:
result = p.starmap(sat_lat_lon, tle_data)
p.close()
p.join()

相关内容