Keras multi-worker training: how to make it work on an actual cluster, not just in a notebook



I am trying to run the Keras multi-worker example with MultiWorkerMirroredStrategy on an actual cluster of 3 machines, following the documentation page here.

On each box, TensorFlow 2.4.1 was installed with:

pip3 install --user tensorflow==2.4.1
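As a quick sanity check (not part of the example, just my own one-off), each box can confirm it picked up the expected build:

# Sanity check: confirm each box picked up the expected TensorFlow build.
import tensorflow as tf
print(tf.__version__)  # expected: 2.4.1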

The TF_CONFIG variable was set as follows on each box, and each box was then bounced:

>> TF_CONFIG - for xx.x.xxx.xx:
export TF_CONFIG='{"cluster": {"worker": ["xx.x.xxx.xx:2121", "yy.y.yyy.yy:2121", "zz.z.zzz.zz:2121"]}, "task": {"type": "worker", "index": 0}}'
>> TF_CONFIG - for yy.y.yyy.yy:
export TF_CONFIG='{"cluster": {"worker": ["xx.x.xxx.xx:2121", "yy.y.yyy.yy:2121", "zz.z.zzz.zz:2121"]}, "task": {"type": "worker", "index": 1}}'
>> TF_CONFIG - for zz.z.zzz.zz:
export TF_CONFIG='{"cluster": {"worker": ["xx.x.xxx.xx:2121", "yy.y.yyy.yy:2121", "zz.z.zzz.zz:2121"]}, "task": {"type": "worker", "index": 2}}'
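Before launching anything, a quick check on each box can confirm that TF_CONFIG parses and that the task index really points at the local machine. This is a minimal sketch of my own, not from the docs; the socket lookup is just one way to eyeball the local address:

# Sanity check, run on each box: parse TF_CONFIG and show which cluster entry
# this machine claims to be, next to its own hostname/IP.
import json
import os
import socket

tf_config = json.loads(os.environ["TF_CONFIG"])
my_index = tf_config["task"]["index"]
print("This worker's cluster entry:", tf_config["cluster"]["worker"][my_index])
print("Local host:", socket.gethostname(), socket.gethostbyname(socket.gethostname()))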

The Python code is included below. When I launch it with "python tf_multi_worker_mnist.py" on the "chief" node (the worker with index 0), all I get is the output below. How do I actually get the code to run, train the model, and so on?

Output:

2021-04-02 18:57:14.168712: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
>>
>> Running the prototype...
>> TF_CONFIG: {'cluster': {'worker': ['xx.x.xxx.xx:2121', 'yy.y.yyy.yy:2121', 'zz.z.zzz.zz:2121']}, 'task': {'type': 'worker', 'index': 0}}
>>
2021-04-02 18:57:19.524282: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-04-02 18:57:19.556744: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-04-02 18:57:19.772575: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-04-02 18:57:19.772638: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: ip-10-2-248-96.*******.acme.com
2021-04-02 18:57:19.772660: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: ip-xx-x-xxx-xx.*******.acme.com
2021-04-02 18:57:19.773805: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 450.80.2
2021-04-02 18:57:19.773865: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 450.80.2
2021-04-02 18:57:19.773886: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 450.80.2
2021-04-02 18:57:19.776615: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-04-02 18:57:19.778581: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-04-02 18:57:19.784246: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-04-02 18:57:19.818028: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> xx.x.xxx.xx:2121, 1 -> yy.y.yyy.yy:2121, 2 -> zz.z.zzz.zz:2121}
2021-04-02 18:57:19.818895: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://xx.x.xxx.xx:2121

Code:

import json
import os
import sys
import time
import numpy as np
import tensorflow as tf

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
if "." not in sys.path:
    sys.path.insert(0, ".")

def mnist_dataset(batch_size):
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    # The `x` arrays are in uint8 and have values in the range [0, 255].
    # You need to convert them to float32 with values in the range [0, 1].
    x_train = x_train / np.float32(255)
    y_train = y_train.astype(np.int64)
    train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
    return train_dataset

def build_and_compile_cnn_model():
    model = tf.keras.Sequential(
        [
            tf.keras.Input(shape=(28, 28)),
            tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
            tf.keras.layers.Conv2D(32, 3, activation="relu"),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(128, activation="relu"),
            tf.keras.layers.Dense(10),
        ]
    )
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
        metrics=["accuracy"],
    )
    return model

def main(args):
    start_time = time.time()
    tf_config = json.loads(os.environ["TF_CONFIG"])
    print(">>")
    print(">> Running the prototype...")
    print(">> TF_CONFIG: {}".format(tf_config))
    print(">>")
    strategy = tf.distribute.MultiWorkerMirroredStrategy()
    # per_worker_batch_size = 64
    # num_workers = len(tf_config["cluster"]["worker"])
    # global_batch_size = per_worker_batch_size * num_workers
    # multi_worker_dataset = mnist_dataset(global_batch_size)
    # turn on sharding
    global_batch_size = 64
    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
    multi_worker_dataset = mnist_dataset(global_batch_size)
    multi_worker_dataset_with_shrd = multi_worker_dataset.with_options(options)
    with strategy.scope():
        # Model building/compiling need to be within `strategy.scope()`.
        multi_worker_model = build_and_compile_cnn_model()
    multi_worker_model.fit(multi_worker_dataset_with_shrd, epochs=3, steps_per_epoch=70)
    elapsed_time = time.time() - start_time
    str_elapsed_time = time.strftime("%H : %M : %S", time.gmtime(elapsed_time))
    print(">>")
    print(">> Prototype run: finished. Duration: {}.".format(str_elapsed_time))
    print(">>")

if __name__ == "__main__":
    main(sys.argv)

Whew, just found this:

"You need to launch your program on each worker."

It's in the MultiWorkerMirroredStrategy documentation.

The Keras notebook is also worth a quick look.

Also, the notebook is fine, but if you are starting from actual machines, a cluster-specific walkthrough would be really helpful. Basically, it would end up pretty close to what I wrote in this post.
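Since the same script has to be launched on every box, one variation that could make that easier (my own sketch, not from the docs; the WORKERS list and the --index flag are assumptions) is to build TF_CONFIG inside the script and pass only the worker index on the command line, then start the file on all three machines at roughly the same time:

# Hypothetical launcher-style variant: build TF_CONFIG in code so the only
# per-box difference is the worker index given on the command line.
import argparse
import json
import os

WORKERS = ["xx.x.xxx.xx:2121", "yy.y.yyy.yy:2121", "zz.z.zzz.zz:2121"]  # same list as above

parser = argparse.ArgumentParser()
parser.add_argument("--index", type=int, required=True, help="this worker's position in WORKERS")
args = parser.parse_args()

# TF_CONFIG only has to be in the environment before the strategy is created.
os.environ["TF_CONFIG"] = json.dumps(
    {"cluster": {"worker": WORKERS}, "task": {"type": "worker", "index": args.index}}
)

import tensorflow as tf

strategy = tf.distribute.MultiWorkerMirroredStrategy()
# ... build/compile the model under strategy.scope() and call fit() as in the script above ...

Then it would be something like "python3 tf_multi_worker_mnist.py --index 0" on the first box, "--index 1" and "--index 2" on the others; the processes should only proceed into training once all three have started and found each other.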
