I am new to TensorFlow and wrote the following distributed training code. The code runs fine.
import multiprocessing
import os
import portpicker
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow_hub as hub
import tensorflow.python.keras.backend as K
#1. Define Workers
def create_in_process_cluster(num_workers, num_ps):
    """Creates and starts local servers and returns the cluster_resolver."""
    worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
    ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]
    cluster_dict = {}
    cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
    if num_ps > 0:
        cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]
    cluster_spec = tf.train.ClusterSpec(cluster_dict)
    # Workers need some inter_ops threads to work properly.
    worker_config = tf.compat.v1.ConfigProto()
    if multiprocessing.cpu_count() < num_workers + 1:
        worker_config.inter_op_parallelism_threads = num_workers + 1
    for i in range(num_workers):
        tf.distribute.Server(
            cluster_spec, job_name="worker", task_index=i, config=worker_config,
            protocol="grpc")
    for i in range(num_ps):
        tf.distribute.Server(
            cluster_spec, job_name="ps", task_index=i, protocol="grpc")
    cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
        cluster_spec, task_id=0, task_type="worker", rpc_layer="grpc")
    return cluster_resolver
NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
variable_partitioner = (
    tf.distribute.experimental.partitioners.FixedShardsPartitioner(
        num_shards=NUM_PS))
strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)
word = "Elephant"
sentence = "I am a sentence for which I would like to get its embedding."
paragraph = (
"Universal Sentence Encoder embeddings also support short paragraphs. "
"There is no hard limit on how long the paragraph is. Roughly, the longer "
"the more 'diluted' the embedding will be.")
messages = [word, sentence, paragraph]
#labels=["1","2","3"]
reviews = [[1,0,0],[0,1,0],[0,0,1]]
encoder=hub.load("https://tfhub.dev/google/universal-sentence-encoder/4")
X_train=encoder(messages)
BUFFER_SIZE = len(X_train)
BATCH_SIZE_PER_REPLICA = 2
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
EPOCHS = 4
with strategy.scope():
    model = keras.Sequential()
    model.add(
        keras.layers.Dense(
            units=256,
            input_shape=(X_train.shape[1],),
            activation='relu'
        )
    )
    model.add(
        keras.layers.Dropout(rate=0.5)
    )
    model.add(
        keras.layers.Dense(
            units=128,
            activation='relu'
        )
    )
    model.add(
        keras.layers.Dropout(rate=0.5)
    )
    model.add(keras.layers.Dense(3, activation='softmax'))
    # model.compile(
    #     loss='categorical_crossentropy',
    #     optimizer=keras.optimizers.Adam(0.001),
    #     metrics=['accuracy']
    # )
    # history = model.fit(
    #     np.array(X_train), np.array(reviews),
    #     epochs=10,
    #     batch_size=16,
    #     verbose=1,
    #     shuffle=True
    # )
    optimizer = keras.optimizers.Adam(0.001)
    accuracy = keras.metrics.Accuracy()
def step_fn(x_train_slice):
    x_train, y_train = next(x_train_slice)
    with tf.GradientTape() as tape:
        pred = model(x_train, training=True)
        # tf.print(x_train)
        # tf.print(pred)
        # tf.print(y_train)
        per_example_loss = keras.losses.CategoricalCrossentropy(
            reduction=tf.keras.losses.Reduction.NONE)(y_train, pred)
        loss = tf.nn.compute_average_loss(per_example_loss)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    tf.print("train values are", x_train)
    tf.print(" pred Values are : ", pred)
    tf.print(" ArgMAx Values are ", tf.math.argmax(pred, axis=0)) #problem
    tf.print(" actual_pred Values are : ", actual_pred)
    tf.print(" Labels are : ", y_train)
    tf.print(" Labels Max Values are : ", tf.argmax(y_train))
    accuracy.update_state(y_train, actual_pred)
    tf.print("Accuracy is : ", accuracy.result())
    return loss
@tf.function
def distributed_train_step(x_train_slice):
    losses = strategy.run(step_fn, args=(x_train_slice,))
    return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)
@tf.function
def per_worker_dataset_fn():
    train_dataset = tf.data.Dataset.from_tensor_slices(
        (X_train, reviews)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE)
    # test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
    train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
    # test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)
    return train_dist_dataset
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)
per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
num_epoches = 5
steps_per_epoch = 1
for i in range(num_epoches):
    accuracy.reset_states()
    for _ in range(steps_per_epoch):
        coordinator.schedule(distributed_train_step, args=(per_worker_iterator,))
    # Wait at epoch boundaries.
    coordinator.join()
    print("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
The problem is that in step_fn, once I have the predicted values I want to get the corresponding labels, and for that I used this line: tf.print(" ArgMAx Values are ", tf.math.argmax(pred, axis=0)) #problem
argmax gives an array of the indices with the highest probability. I want to extract it as a numpy array and use it to index into the reviews array (the one-hot encoded values) so that I can build a confusion matrix.
But I am unable to convert the tf.math.argmax(pred, axis=0)
tensor into a numpy array. I tried many things such as eval(K.get_session()), but nothing worked. Any help is appreciated.
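Roughly, this is what I would like to do inside step_fn (purely illustrative, variable names made up); it does not work because step_fn runs inside a tf.function, so pred there is a symbolic tensor with no .numpy():

pred_indices = tf.math.argmax(pred, axis=0).numpy()     # fails in graph mode
predicted_one_hot = [reviews[i] for i in pred_indices]  # index into the one-hot reviews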
Thanks
OK, I found two solutions for you.
Here is what you could do:
Add more Keras metrics after the accuracy metric; these can be used to compute the confusion matrix:
accuracy = keras.metrics.Accuracy()
tp = keras.metrics.TruePositives()
tn = keras.metrics.TrueNegatives()
fp = keras.metrics.FalsePositives()
fn = keras.metrics.FalseNegatives()
Now, also update these in step_fn, alongside the existing accuracy update:
accuracy.update_state(y_train, actual_pred)
argmax_pred = tf.one_hot(tf.math.argmax(pred,axis=1),depth=pred.shape[1])
tp.update_state(y_train, argmax_pred)
tn.update_state(y_train, argmax_pred)
fp.update_state(y_train, argmax_pred)
fn.update_state(y_train, argmax_pred)
Now you can access the results in the same place where you access the accuracy result:
    coordinator.join()
    print("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
    print("TP=%f TN=%f FP=%f FN=%f" % (tp.result().numpy(), tn.result().numpy(),
                                       fp.result().numpy(), fn.result().numpy()))
That should work for you.
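If you want the full 3x3 confusion matrix rather than the aggregate TP/TN/FP/FN counts, here is a rough sketch of the same idea (not from the code above, and it assumes a variable created under strategy.scope() can be updated from step_fn just like the metric variables):

with strategy.scope():
    # Accumulator for a 3x3 confusion matrix: rows = true class, cols = predicted class.
    confusion = tf.Variable(tf.zeros((3, 3), dtype=tf.int32), trainable=False)

Then inside step_fn, right after pred is computed:

    confusion.assign_add(tf.math.confusion_matrix(
        labels=tf.math.argmax(y_train, axis=1),
        predictions=tf.math.argmax(pred, axis=1),
        num_classes=3))

And after coordinator.join() at the end of each epoch:

    print("Confusion matrix:\n", confusion.numpy())
    confusion.assign(tf.zeros((3, 3), dtype=tf.int32))  # reset for the next epoch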
There is also another way:
The trick is to keep returning your argmax values all the way back up until they reach your main loop, where they will arrive as RemoteValue objects, and then fetch() their values.
For example, in step_fn, send the argmax values back to the calling function:
return (loss, tf.math.argmax(pred,axis=0))
Then, in distributed_train_step, unpack the returned tuple and keep passing the argmaxes up to the next level, perhaps like this:
def distributed_train_step(x_train_slice):
    (losses, argmaxes) = strategy.run(step_fn, args=(x_train_slice,))
    strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)
    return argmaxes
Note that I moved your strategy.reduce from the return line onto its own line here. You were not using its return value anyway, since your coordinator.schedule line had no lvalue, but now you can add one to capture the returned argmaxes:
argmaxes = coordinator.schedule(distributed_train_step, args=(per_worker_iterator,))
print ("Back at home, argmaxes=",argmaxes.fetch())
Make sure you use fetch(), because once the argmaxes come back this way they are RemoteValue objects rather than plain tensors. The documentation for the RemoteValue class is here: https://www.tensorflow.org/api_docs/python/tf/distribute/experimental/coordinator/RemoteValue
You would need to extend this solution by also returning whatever other values you want to use for computing TP/FP/TN/FN yourself.
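For instance, a rough sketch of that extension (hypothetical names, argmax taken over axis=1 so you get one class index per example, and assuming fetch() hands you back the same tuple structure as numpy values):

import numpy as np  # only used on the coordinator side to accumulate the matrix

# In step_fn, return the per-example class indices as well:
#     return (loss, tf.math.argmax(pred, axis=1), tf.math.argmax(y_train, axis=1))

@tf.function
def distributed_train_step(x_train_slice):
    (losses, pred_classes, true_classes) = strategy.run(step_fn, args=(x_train_slice,))
    strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)
    return (pred_classes, true_classes)

confusion_matrix = np.zeros((3, 3), dtype=int)  # rows = true class, cols = predicted class
result = coordinator.schedule(distributed_train_step, args=(per_worker_iterator,))
coordinator.join()
pred_classes, true_classes = result.fetch()
for t, p in zip(true_classes, pred_classes):
    confusion_matrix[int(t)][int(p)] += 1
print("Confusion matrix:\n", confusion_matrix)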