Migrating TensorFlow Serving from 1 to 2 with gRPC calls, RESOURCE_EXHAUSTED



We are trying to migrate from tensorflow/serving:1.12 to tensorflow/serving:2.8.2. We use gRPC calls to get recommendations, and we have two types of calls: one per member, where the gRPC call asks for recommendations for a single member, and one where we send a batch of members. With TF1 everything works fine. With TF2 the first type still works, but for the batch calls we get an error on the gRPC side:

[2022-10-19 16:22:01.454] [      Thread-53] [batch-slave:e4ccd2a7-e0d7-4cc0-9481-b3d98fec3df8] ERROR AgoraRecSysBatchService - Failed while processing : e4ccd2a7-e0d7-4cc0-9481-b3d98fec3df8
io.grpc.StatusRuntimeException: RESOURCE_EXHAUSTED
at io.grpc.Status.asRuntimeException(Status.java:535)
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:534)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:562)
at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:743)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:722)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
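
For reference, one batch call to TF Serving looks roughly like this from Python (the model name, signature and input key below are illustrative, not our real ones; our actual client is Java, as the stack trace shows):

import grpc
import tensorflow as tf
from tensorflow_serving.apis import predict_pb2, prediction_service_pb2_grpc

# Illustrative sketch only: "recsys", "serving_default" and "member_ids"
# are placeholder names, not our real model/signature/input.
channel = grpc.insecure_channel("tf-serving:8500")
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)

request = predict_pb2.PredictRequest()
request.model_spec.name = "recsys"
request.model_spec.signature_name = "serving_default"
# One request carries a batch of members (100 per request in production).
request.inputs["member_ids"].CopyFrom(
    tf.make_tensor_proto([101, 102, 103], dtype=tf.int64, shape=(3,)))

response = stub.Predict(request, 5.0)  # the batch variant of this call is what fails

The per-member call is the same request with a single member, and that one still works on 2.8.2.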

Here is our base image:

FROM tensorflow/serving:2.8.2
COPY entry.py /usr/bin/entry.py
COPY requirements.txt /requirements.txt
RUN apt update && \
    apt install -y python3 python3-pip wget && \
    mkdir /bucket && \
    chmod +x /usr/bin/entry.py && \
    apt autoremove -y && \
    rm -rf /var/lib/apt/lists/* && \
    pip3 install -r /requirements.txt
RUN wget -P /bin https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/v0.2.0/grpc_health_probe-linux-amd64 && \
    mv /bin/grpc_health_probe-linux-amd64 /bin/grpc_health_probe && \
    chmod +x /bin/grpc_health_probe
# ENTRYPOINT ["/bin/bash"]
ENTRYPOINT ["/usr/bin/entry.py"]

In production we are configured with a batch size of 100 members per request, and a single call can contain up to 30K members.

The client calls us over HTTP -> we receive the request in one pod and make small batched gRPC requests to TF Serving until we have processed all of them, then we send back the HTTP response for the whole 30K.
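
In pseudocode, the fan-out inside the pod looks roughly like this (predict_batch is a hypothetical stand-in for the gRPC call sketched above; the batch size matches our production setting of 100):

BATCH_SIZE = 100

def chunks(members, size=BATCH_SIZE):
    # Split the full member list (up to ~30K ids) into fixed-size batches.
    for i in range(0, len(members), size):
        yield members[i:i + size]

def recommend_all(members, predict_batch):
    # predict_batch issues one gRPC Predict per chunk (see the sketch above).
    results = []
    for chunk in chunks(members):
        results.extend(predict_batch(chunk))
    return results  # aggregated into the single HTTP response for the whole 30K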

Do you have any idea what the problem could be? The RESOURCE_EXHAUSTED status in particular comes with no information we could use as a clue. We have enough pods and enough resources, we are running on GKE, and there are no spikes in the pods.
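
For what it's worth, one client-side limit that gRPC reports as RESOURCE_EXHAUSTED is the per-message size cap (4 MB by default on most clients). On a Python client the options look like the sketch below; our real client is Java, where the equivalent knob is maxInboundMessageSize on the channel builder. The endpoint and the 32 MB value are placeholders, not our production settings.

import grpc

# Illustrative only: raising the default 4 MB per-message limits on the channel.
MAX_MESSAGE_BYTES = 32 * 1024 * 1024

channel = grpc.insecure_channel(
    "tf-serving:8500",
    options=[
        ("grpc.max_send_message_length", MAX_MESSAGE_BYTES),
        ("grpc.max_receive_message_length", MAX_MESSAGE_BYTES),
    ],
)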

Edit:

The code we use to train with TF2:

def train_step(self, batch):
    with tf.GradientTape() as tape:
        scores = self(batch, training=True)['similarity']
        scores = tf.keras.activations.sigmoid(scores)
        pos_scores, neg_scores = self.split_pos_neg(scores)
        labels = self.get_classification_labels()
        loss = self.compiled_loss(labels, scores)
    trainable_vars = self.trainable_variables
    gradients = tape.gradient(loss, trainable_vars)
    self.optimizer.apply_gradients(zip(gradients, trainable_vars))
    self.histogram(pos_scores, neg_scores)
    self.compiled_metrics.update_state(labels, scores)
    results = {m.name: m.result() for m in self.metrics}
    results['pos_neg_score'] = pos_better_neg(pos_scores, neg_scores)
    return results
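
For context, this train_step overrides tf.keras.Model.train_step and is driven by model.fit. A minimal, self-contained stand-in (a toy model, not the real one) would look like this:

import tensorflow as tf

class TinyModel(tf.keras.Model):
    # Toy model: the point is only how a custom train_step plugs into fit().
    def __init__(self):
        super().__init__()
        self.dense = tf.keras.layers.Dense(1, activation="sigmoid")

    def call(self, inputs, training=False):
        return self.dense(inputs)

    def train_step(self, batch):
        x, y = batch
        with tf.GradientTape() as tape:
            scores = self(x, training=True)
            loss = self.compiled_loss(y, scores)
        gradients = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
        self.compiled_metrics.update_state(y, scores)
        return {m.name: m.result() for m in self.metrics}

model = TinyModel()
model.compile(optimizer="adam", loss="binary_crossentropy", metrics=["accuracy"])
x = tf.random.normal((256, 8))
y = tf.cast(tf.random.uniform((256, 1)) > 0.5, tf.float32)
model.fit(x, y, batch_size=32, epochs=1)  # Keras calls train_step once per batch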

TF1 works in graph mode and TF2 works in eager mode; using TF2 the way TF1 remote execution worked consumes bandwidth, but you can use GradientTape.
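
As a toy illustration of the eager vs. graph difference (illustrative code, not from the model above): the same GradientTape step can be run eagerly or traced into a graph with tf.function, and both produce the same gradients.

import tensorflow as tf

w = tf.Variable(2.0)

def eager_step(x):
    # TF2 default: runs op by op (eager execution).
    with tf.GradientTape() as tape:
        loss = tf.reduce_mean(tf.square(w * x - 1.0))
    return tape.gradient(loss, w)

@tf.function
def graph_step(x):
    # Same computation, traced once into a graph (TF1-style execution).
    with tf.GradientTape() as tape:
        loss = tf.reduce_mean(tf.square(w * x - 1.0))
    return tape.gradient(loss, w)

x = tf.constant([1.0, 2.0, 3.0])
print(eager_step(x).numpy(), graph_step(x).numpy())  # identical gradients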

I load all of my values onto the tape when the changes are not determined on their own but are accumulated.

A TF1 session requires placeholders or tf.Variable:

X = tf.compat.v1.placeholder(tf.float32, shape=(1, 28, 28))
y = tf.compat.v1.placeholder(tf.float32, shape=(1,))
loss = tf.reduce_mean(input_tensor=tf.square(X - y))
optimizer = tf.compat.v1.train.AdamOptimizer(learning_rate)
training_op = optimizer.minimize(loss)

with tf.compat.v1.Session() as sess:
    saver = tf.compat.v1.train.Saver()
    train_loss, _ = sess.run([loss, training_op], feed_dict={X: list_image, y: list_label})

GradientTape watches the variables and computes the accumulated values.

with tf.GradientTape() as tape:
    result = self.model(inputs=tf.constant(content_batch[0], shape=(1, WIDTH, HEIGHT, CHANNEL)))
    result = tf.constant(result, shape=(2, 1))
    predict_label = tf.Variable(tf.constant(self.model.trainable_weights[len(self.model.trainable_weights) - 1], shape=(2, 1)))
    loss_value = self.loss(result.numpy(), current_label)
    loss_value = tf.Variable(tf.constant(loss_value, shape=(1,)).numpy())
    tape.watch(loss_value)

gradients = tape.gradient(loss_value, loss_value)
self.optimizer.apply_gradients(zip(gradients, self.model.trainable_weights))

Latest update