了解张量流中的设备分配、并行性(tf.while_loop)和 tf.function



我正在尝试理解 GPU 在张量流中的并行性,因为我需要将其应用于更丑陋的图形。

import tensorflow as tf
from datetime import datetime
with tf.device('/device:GPU:0'):
var = tf.Variable(tf.ones([100000], dtype=tf.dtypes.float32), dtype=tf.dtypes.float32)
@tf.function
def foo():
return tf.while_loop(c, b, [i], parallel_iterations=1000)      #tweak
@tf.function
def b(i):
var.assign(tf.tensor_scatter_nd_update(var, tf.reshape(i, [-1,1]), tf.constant([0], dtype=tf.dtypes.float32)))
return tf.add(i,1)
with tf.device('/device:GPU:0'):
i = tf.constant(0)
c = lambda i: tf.less(i,100000)
start = datetime.today()
with tf.device('/device:GPU:0'):
foo()
print(datetime.today()-start)

在上面的代码中,var 是一个长度为 100000 的张量,其元素如上所示更新。当我将parallel_iterations值从 10、100、1000、10000 更改为 10、10000 时。几乎没有任何时差(均为 9.8 秒),即使明确提到了parallel_iterations变量。

我希望这些在 GPU 上并行发生。我该如何实现它?

首先,请注意,您的tensor_scatter_nd_update只是递增单个索引,因此您只能测量循环本身的开销。

我修改了您的代码以更大的批量大小来完成它。 在 GPU 下的 Colab 中运行,我需要 batch=10000 来隐藏循环延迟。 低于此值的任何值都会测量(或支付)延迟开销。

另外,问题是,var.assign(tensor_scatter_nd_update(...))实际上会阻止tensor_scatter_nd_update制作的额外副本吗? 使用批量大小表明我们确实没有为额外的副本付费,因此似乎可以很好地防止额外的副本。

然而,事实证明,在这种情况下,显然,张量流只是认为迭代是相互依赖的,因此如果你增加循环迭代,它没有任何区别(至少在我的测试中)。 有关 TF 功能的进一步讨论,请参阅此处:https://github.com/tensorflow/tensorflow/issues/1984

只有当它们是独立的(操作)时,它才会并行执行操作。

顺便说一句,任意散射操作在 GPU 上不会非常有效,但如果 TF 认为它们独立,您仍然可能(应该)能够并行执行多个操作。

import tensorflow as tf
from datetime import datetime
size = 1000000
index_count = size
batch = 10000
iterations = 10
with tf.device('/device:GPU:0'):
var = tf.Variable(tf.ones([size], dtype=tf.dtypes.float32), dtype=tf.dtypes.float32)
indexes = tf.Variable(tf.range(index_count, dtype=tf.dtypes.int32), dtype=tf.dtypes.int32)
var2 = tf.Variable(tf.range([index_count], dtype=tf.dtypes.float32), dtype=tf.dtypes.float32)
@tf.function
def foo():
return tf.while_loop(c, b, [i], parallel_iterations = iterations)      #tweak
@tf.function
def b(i):
var.assign(tf.tensor_scatter_nd_update(var, tf.reshape(indexes, [-1,1])[i:i+batch], var2[i:i+batch]))
return tf.add(i, batch)
with tf.device('/device:GPU:0'):
i = tf.constant(0)
c = lambda i: tf.less(i,index_count)
start = datetime.today()
with tf.device('/device:GPU:0'):
foo()
print(datetime.today()-start)

一种技术是使用分布策略和范围:

strategy = tf.distribute.MirroredStrategy()
with strategy.scope():
inputs = tf.keras.layers.Input(shape=(1,))
predictions = tf.keras.layers.Dense(1)(inputs)
model = tf.keras.models.Model(inputs=inputs, outputs=predictions)
model.compile(loss='mse',
optimizer=tf.train.GradientDescentOptimizer(learning_rate=0.2))

另一种选择是在每个设备上复制操作:

# Replicate your computation on multiple GPUs
c = []
for d in ['/device:GPU:2', '/device:GPU:3']:
with tf.device(d):
a = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], shape=[2, 3])
b = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], shape=[3, 2])
c.append(tf.matmul(a, b))
with tf.device('/cpu:0'):
sum = tf.add_n(c)

有关更多详细信息,请参阅本指南

最新更新