如何将自定义编码器同时应用于多个客户端?如何在runone_round中使用自定义编码器



所以我的目标基本上是实现全局top-k子采样。梯度稀疏化非常简单,我已经在有状态客户端的例子上完成了这个构建,但现在我想使用编码器,正如您在第28页推荐的那样。此外,我只想对非零梯度取平均值,假设我们有10个客户端,但在一轮通信中,在给定位置只有4个具有非零梯度,那么我想将这些梯度的总和除以4,而不是10。我希望通过对分子处的梯度和分母处的掩码(1和0(求和来实现这一点。此外,我将为渐变选择添加随机性,因此我必须在渐变选择的同时创建这些遮罩。我现在的代码是

import tensorflow as tf
from tensorflow_model_optimization.python.core.internal import tensor_encoding as te

@te.core.tf_style_adaptive_encoding_stage
class GrandienrSparsificationEncodingStage(te.core.AdaptiveEncodingStageInterface):
"""An example custom implementation of an `EncodingStageInterface`.
Note: This is likely not what one would want to use in practice. Rather, this
serves as an illustration of how a custom compression algorithm can be
provided to `tff`.
This encoding stage is expected to be run in an iterative manner, and
alternatively zeroes out values corresponding to odd and even indices. Given
the determinism of the non-zero indices selection, the encoded structure does
not need to be represented as a sparse vector, but only the non-zero values
are necessary. In the decode mehtod, the state (i.e., params derived from the
state) is used to reconstruct the corresponding indices.
Thus, this example encoding stage can realize representation saving of 2x.
"""
ENCODED_VALUES_KEY = 'stateful_topk_values'
INDICES_KEY = 'indices'
SHAPES_KEY = 'shapes'
ERROR_COMPENSATION_KEY = 'error_compensation'
def encode(self, x, encode_params):
shapes_list = [tf.shape(y) for y in x]
flattened = tf.nest.map_structure(lambda y: tf.reshape(y, [-1]), x)
gradients = tf.concat(flattened, axis=0)
error_compensation = encode_params[self.ERROR_COMPENSATION_KEY]

gradients_and_error_compensation = tf.math.add(gradients, error_compensation)
percentage = tf.constant(0.1, dtype=tf.float32)
k_float = tf.multiply(percentage, tf.cast(tf.size(gradients_and_error_compensation), tf.float32))
k_int = tf.cast(tf.math.round(k_float), dtype=tf.int32)
values, indices = tf.math.top_k(tf.math.abs(gradients_and_error_compensation), k = k_int, sorted = False)
indices = tf.expand_dims(indices, 1)
sparse_gradients_and_error_compensation = tf.scatter_nd(indices, values, tf.shape(gradients_and_error_compensation))
new_error_compensation = tf.math.subtract(gradients_and_error_compensation, sparse_gradients_and_error_compensation)
state_update_tensors = {self.ERROR_COMPENSATION_KEY: new_error_compensation}

encoded_x = {self.ENCODED_VALUES_KEY: values,
self.INDICES_KEY: indices,
self.SHAPES_KEY: shapes_list}
return encoded_x, state_update_tensors
def decode(self,
encoded_tensors,
decode_params,
num_summands=None,
shape=None):
del num_summands, decode_params, shape  # Unused.
flat_shape = tf.math.reduce_sum([tf.math.reduce_prod(shape) for shape in encoded_tensors[self.SHAPES_KEY]])
sizes_list = [tf.math.reduce_prod(shape) for shape in encoded_tensors[self.SHAPES_KEY]]
scatter_tensor = tf.scatter_nd(
indices=encoded_tensors[self.INDICES_KEY],
updates=encoded_tensors[self.ENCODED_VALUES_KEY],
shape=[flat_shape])
nonzero_locations = tf.nest.map_structure(lambda x: tf.cast(tf.where(tf.math.greater(x, 0), 1, 0), tf.float32) , scatter_tensor)
reshaped_tensor = [tf.reshape(flat_tensor, shape=shape) for flat_tensor, shape in
zip(tf.split(scatter_tensor, sizes_list), encoded_tensors[self.SHAPES_KEY])]
reshaped_nonzero = [tf.reshape(flat_tensor, shape=shape) for flat_tensor, shape in
zip(tf.split(nonzero_locations, sizes_list), encoded_tensors[self.SHAPES_KEY])]
return  reshaped_tensor, reshaped_nonzero

def initial_state(self):
return {self.ERROR_COMPENSATION_KEY: tf.constant(0, dtype=tf.float32)}
def update_state(self, state, state_update_tensors):
return {self.ERROR_COMPENSATION_KEY: state_update_tensors[self.ERROR_COMPENSATION_KEY]}
def get_params(self, state):
encode_params = {self.ERROR_COMPENSATION_KEY: state[self.ERROR_COMPENSATION_KEY]}
decode_params = {}
return encode_params, decode_params
@property
def name(self):
return 'gradient_sparsification_encoding_stage'
@property
def compressible_tensors_keys(self):
return False
@property
def commutes_with_sum(self):
return False
@property
def decode_needs_input_shape(self):
return False
@property
def state_update_aggregation_modes(self):
return {}

我已经按照您在第45页概述的步骤手动运行了一些简单的测试。它有效,但我有一些问题。

  1. 当我使用相同形状的张量列表(例如:2 2x25张量(作为编码的输入x时,它可以毫无问题地工作,但当我尝试使用不同形状的张量(2x20和6x10(列表时,它会给出并错误地说

InvalidArgumentError:所有输入的形状必须匹配:values[0]。shape=[2,20]!=values1.shape=[6,10][操作:包装]名称:包装

如何解决此问题?正如我所说,我想使用全局top-k,所以我必须一次对整个可训练模型权重进行编码。以这里使用的cnn模型为例,所有的张量都有不同的形状。

  1. 如何进行开头所述的平均?例如,您已经完成

mean_factory=tff.aggregators.MeanFactory(tff.aggregators.EncodedSumFactory(mean_encoder_fn(,#numeratortff.aggregators.EncodedSumFactory(mean_encoder_fn(,#分母(

有没有一种方法可以重复这一点,解码的一个输出到分子,另一个到分母?如何处理0除以0?tensorflow有divide_no_nan函数,我可以以某种方式使用它吗?还是需要向每个函数添加eps?

  1. 使用编码器时如何处理分区?每个客户端是否都有一个唯一的编码器,为其保存一个唯一状态?正如您在第6页所讨论的,客户端状态用于跨思洛存储器设置,但如果客户端排序发生变化,会发生什么?

  2. 在这里,您建议使用有状态客户端示例。你能进一步解释一下吗?我的意思是,在run_one_round中,编码器到底去了哪里?它们是如何与客户端更新和聚合结合使用的?

  3. 我有一些额外的信息,比如稀疏性,我想传递给编码。建议的方法是什么?

以下是一些答案,希望能有所帮助:

  1. 如果要将所有聚合结构视为单个张量,请使用concat_factory作为最外层的聚合器。这将在客户端将整个结构连接到一个秩为1的张量,然后在最后解压缩回原始结构。示例用法:tff.aggregators.concat_factory(tff.aggregators.MeanFactory(...))

请注意,编码阶段对象旨在使用单个张量,因此使用相同张量描述的内容可能只是偶然发生的。

  1. 有两个选项。

    a。修改客户端训练代码,使传递给加权聚合器的权重已经是您希望的值(零/一蒙版(。在您链接的有状态客户端示例中,它就在这里。然后,默认情况下(通过对分子求和(,您将获得所需的内容。

    b。修改UnweightedMeanFactory,使其与您描述并使用的平均值完全相同。开始将修改这个

  2. (和4.(我认为这就是你需要执行的。与这里的示例中初始化现有客户端状态的方式相同,您需要扩展它以包含聚合器状态,并确保这些状态与客户端一起采样,就像这里所做的那样。然后,要集成示例中的聚合器,您需要替换这个硬编码的tff.federated_mean。这种集成的一个例子是tff.learning.build_federated_averaging_process的实现,这里主要是

  3. 我不确定问题是什么。也许先做之前的工作(对我来说似乎是一个先决条件(,然后在新的帖子中澄清和提问?

最新更新