我正在尝试使用 concurrent.futures 实现一些并行作业。每个 worker 需要一个 TensorFlow 模型的副本和一些数据。我以如下方式实现它(最小工作示例,MWE):
import tensorflow as tf
from tensorflow import keras
import numpy as np
import concurrent.futures
import time
def simple_model():
    """Build and compile a tiny two-layer Keras model for demonstration."""
    layer_stack = [
        keras.layers.Dense(units=10, input_shape=[1]),
        keras.layers.Dense(units=1, activation='sigmoid'),
    ]
    net = keras.models.Sequential(layer_stack)
    net.compile(optimizer='sgd', loss='mean_squared_error')
    return net
def clone_model(model):
    """Return a structural copy of `model` carrying the same weights."""
    duplicate = tf.keras.models.clone_model(model)
    duplicate.set_weights(model.get_weights())
    return duplicate
def work(model, seq):
    """Run inference with `model` on `seq` and return the predictions."""
    predictions = model.predict(seq)
    return predictions
def worker(model, num_of_seq = 4):
    """Fan `num_of_seq` prediction jobs out to a process pool.

    NOTE(review): this is the broken variant from the question.  It
    submits live Keras model objects to ProcessPoolExecutor; Keras
    models carry runtime state that does not pickle cleanly, so the
    child processes never produce results and the call appears to hang.
    The second script in this file fixes it by saving the model to disk
    and sending only the *path* to each worker.
    """
    # 100 values reshaped into (num_of_seq, 100 / num_of_seq) rows;
    # assumes num_of_seq divides 100 evenly — TODO confirm.
    seqences = np.arange(0,100).reshape(num_of_seq, -1)
    with concurrent.futures.ProcessPoolExecutor(max_workers=None) as executor:
        t0 = time.perf_counter()
        # One clone per sequence — pointless across processes, since each
        # child would receive its own pickled copy anyway.
        model_list = [clone_model(model) for _ in range(num_of_seq)]
        # BUG: pickling `model` for the child process is what hangs here.
        future_to_samples = {executor.submit(work, model, seq): seq for model, seq in zip(model_list, seqences)}
        Seq_out = []
        # as_completed yields in completion order, not submission order.
        for future in concurrent.futures.as_completed(future_to_samples):
            out = future.result()
            Seq_out.append(out)
        t1 = time.perf_counter()
        print(t1-t0)
        return np.reshape(Seq_out, (-1, )), t1-t0
if __name__ == '__main__':
    # Build the model once, then dispatch parallel prediction jobs.
    base_model = simple_model()
    result = worker(base_model, num_of_seq=4)
    print(result)
其中:`simple_model()` 创建模型;`clone_model()` 克隆一个 TensorFlow 模型;`work()` 代表示例中实际要执行的任务;`worker()` 负责把 `work` 并行地分发出去。
这段代码不能工作:它只是卡住,不产生任何输出。但是,如果我用 `ThreadPoolExecutor` 替换 `ProcessPoolExecutor`,上面的代码就可以运行了,只是没有提供任何加速(可能是因为它并没有真正并行地运行各个 worker)。根据我的理解,问题出在传给 `executor.submit` 的 `model` 参数上:
`future_to_samples = {executor.submit(work, model, seq): seq for model, seq in zip(model_list, seqences)}`
我修改了代码,这样它将模型的路径而不是模型本身发送给子进程。
import tensorflow as tf
from tensorflow import keras
import numpy as np
import concurrent.futures
import time
# gpus = tf.config.experimental.list_physical_devices('GPU')
# if len(gpus) > 0:
# print(f'GPUs {gpus}')
# try: tf.config.experimental.set_memory_growth(gpus[0], True)
# except RuntimeError: pass
def simple_model():
    """Create a minimal Dense->Dense Keras model compiled with SGD/MSE."""
    net = keras.models.Sequential()
    net.add(keras.layers.Dense(units=10, input_shape=[1]))
    net.add(keras.layers.Dense(units=1, activation='sigmoid'))
    net.compile(optimizer='sgd', loss='mean_squared_error')
    return net
def clone_model(model):
    """Clone `model`'s architecture and copy its weights into the clone."""
    copied = tf.keras.models.clone_model(model)
    copied.set_weights(model.get_weights())
    return copied
def work(model_path, seq):
    """Load the model saved at `model_path` and return its predictions on `seq`.

    Re-loading from disk inside the child process avoids pickling the
    Keras model object — which is what made the original
    ProcessPoolExecutor version hang.
    """
    model = tf.keras.models.load_model(model_path)
    return model.predict(seq)
def worker(model, num_of_seq = 4):
    """Save `model` once, then fan `num_of_seq` prediction jobs out to a
    process pool; each child re-loads the model from the saved file.

    Sending a file path instead of the model object sidesteps the fact
    that Keras models are not picklable.

    Returns:
        (flattened predictions, elapsed seconds).

    NOTE(review): as_completed yields results in completion order, so the
    concatenated output is not guaranteed to match the input row order —
    confirm whether callers rely on ordering.
    """
    sequences = np.arange(0, num_of_seq * 10).reshape(num_of_seq, -1)
    model_savepath = './simple_model.h5'
    model.save(model_savepath)
    with concurrent.futures.ProcessPoolExecutor(max_workers=None) as executor:
        t0 = time.perf_counter()
        # Every job loads the same saved file; no per-job clone needed.
        future_to_samples = {
            executor.submit(work, model_savepath, seq): seq
            for seq in sequences
        }
        seq_out = []
        for future in concurrent.futures.as_completed(future_to_samples):
            seq_out.append(future.result())
        t1 = time.perf_counter()
        print(t1 - t0)
        return np.reshape(seq_out, (-1,)), t1 - t0
if __name__ == '__main__':
    # Build the model once, then dispatch 400 prediction jobs in parallel.
    base_model = simple_model()
    num_of_seq = 400
    result = worker(base_model, num_of_seq=num_of_seq)
    print(result)