PCGrad: An Elegant TensorFlow 2 Implementation

Published on 2023-06-23



Although machine learning and deep learning have achieved remarkable results in many fields, data efficiency remains a major challenge. To learn more efficiently, multi-task learning has emerged as a very promising approach, since it can share structure across multiple tasks. However, the multi-task setting comes with a number of optimization difficulties, so its efficiency gains over learning the tasks independently are often small. To date, the reasons why multi-task learning is more challenging than single-task learning are still not fully understood.

Gradient projection has great potential for tackling these optimization problems in multi-task learning: it reduces conflicts between task gradients and thereby improves both learning efficiency and generalization. This is the idea behind PCGrad (projecting conflicting gradients), which helps us understand the challenges of multi-task learning, offers a practical remedy, and paves the way for more efficient learning algorithms.
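To make the projection concrete before diving into the implementation, here is a minimal, self-contained sketch of the rule PCGrad applies to a pair of task gradients (the function name pcgrad_project is purely illustrative and not part of the code below): if two gradients conflict, i.e. their inner product is negative, the component of one along the other is removed.

import tensorflow as tf

def pcgrad_project(g_i, g_j):
    # Inner product between the two task gradients.
    inner = tf.reduce_sum(g_i * g_j)
    # Projection coefficient of g_i onto g_j.
    coef = inner / tf.reduce_sum(g_j * g_j)
    # Only subtract the projection when the gradients conflict (inner < 0).
    return g_i - tf.minimum(coef, 0.) * g_j

g1 = tf.constant([1.0, 1.0])
g2 = tf.constant([-1.0, 0.5])   # conflicts with g1 (negative inner product)
g1_proj = pcgrad_project(g1, g2)
print(g1_proj.numpy())                      # [0.6 1.2]
print(tf.reduce_sum(g1_proj * g2).numpy())  # ~0: the conflicting component is gone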

To implement PCGrad in TensorFlow, we first need to understand what model.fit does in each training step. The logic of the default Model.train_step is as follows.

def train_step(self, data):
    # Unpack the training batch
    x, y, sample_weight = data_adapter.unpack_x_y_sample_weight(data)
    # Run forward pass.
    with tf.GradientTape() as tape:
        y_pred = self(x, training=True)
        # With multiple outputs / multiple losses, this is the weighted sum of
        # the individual losses, using the loss weights given in model.compile
        loss = self.compute_loss(x, y, y_pred, sample_weight)

    self._validate_target_and_loss(y, loss)
    # Run backwards pass: compute the gradients and apply them,
    # by default with the optimizer passed to model.compile
    self.optimizer.minimize(loss, self.trainable_variables, tape=tape)
    # Update the metrics
    return self.compute_metrics(x, y, y_pred, sample_weight)

We therefore only need to replace the self.optimizer.minimize line, as shown below. (The trick relies on graph execution: loss.op.inputs and tf.gradients only work when train_step runs inside a tf.function, i.e. as long as the model is not compiled with run_eagerly=True.)

# Custom gradient logic: loss is the final weighted-sum loss, and the list of
# underlying per-task losses can be recovered as loss.op.inputs
grads_and_vars = self.compute_gradients(loss.op.inputs, var_list)
# Apply the projected gradients with the compiled optimizer
self.optimizer.apply_gradients(grads_and_vars)
def compute_gradients(self, loss, var_list):
    # loss is the list of per-task loss tensors
    num_tasks = len(loss)
    loss = tf.stack(loss)
    tf.random.shuffle(loss)

    # Compute per-task gradients.
    grads_task = tf.vectorized_map(lambda x: tf.concat([tf.reshape(grad, [-1, ])
                                                        for grad in tf.gradients(x, var_list)
                                                        if grad is not None], axis=0), loss)

    # Compute gradient projections.
    def proj_grad(grad_task):
        for k in range(num_tasks):
            inner_product = tf.reduce_sum(grad_task * grads_task[k])
            proj_direction = inner_product / tf.reduce_sum(grads_task[k] * grads_task[k])
            grad_task = grad_task - tf.minimum(proj_direction, 0.) * grads_task[k]
        return grad_task

    proj_grads_flatten = tf.vectorized_map(proj_grad, grads_task)
    # Slice each task's flat projected gradient back into per-variable shapes
    # and sum the contributions of all tasks. (This assumes no gradient was
    # dropped by the `if grad is not None` filter above, i.e. every task's
    # loss depends on every variable in var_list.)
    proj_grads = []
    for j in range(num_tasks):
        start_idx = 0
        for idx, var in enumerate(var_list):
            grad_shape = var.get_shape()
            flatten_dim = tf.math.reduce_prod([grad_shape.dims[i].value for i in range(len(grad_shape.dims))])
            proj_grad = proj_grads_flatten[j][start_idx:start_idx + flatten_dim]
            proj_grad = tf.reshape(proj_grad, grad_shape)
            if len(proj_grads) < len(var_list):
                proj_grads.append(proj_grad)
            else:
                proj_grads[idx] += proj_grad
            start_idx += flatten_dim
    grads_and_vars = list(zip(proj_grads, var_list))
    return grads_and_vars
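The loss.op.inputs trick used above can be illustrated in isolation. The sketch below uses stand-in tensors rather than a real Keras loss, but shows the mechanism: inside a tf.function the summed loss is a symbolic tensor, and the tensors that were added together can be read back from the inputs of the op that produced it.

import tensorflow as tf

@tf.function
def demo():
    # Stand-ins for two weighted per-task losses.
    loss_a = tf.constant(0.3) * 1.0
    loss_b = tf.constant(0.7) * 1.0
    total = tf.add_n([loss_a, loss_b])
    # total.op.inputs recovers (loss_a, loss_b) from the graph;
    # this is what the custom train_step passes to compute_gradients.
    return [t for t in total.op.inputs]

print(demo())   # [0.3, 0.7]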

This way, by overriding the Model class's train_step method, the gradient surgery is implemented elegantly: it does not interfere with metric computation, and you are still free to choose any optimizer. When using the functional API, simply change

model = Model(inputs=input, outputs=output)

to

model = PCGradWarp(inputs=input, outputs=output)

If you build the model by subclassing instead, inherit from PCGradWarp rather than Model.
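As a minimal usage sketch with the functional API (the layer names, losses, and random data below are made up for illustration, and PCGradWarp is the class defined in the complete code further down), a two-output model is built, compiled, and fit exactly as usual:

import numpy as np
import tensorflow as tf

inp = tf.keras.Input(shape=(16,))
shared = tf.keras.layers.Dense(32, activation="relu")(inp)
out_a = tf.keras.layers.Dense(1, name="task_a")(shared)
out_b = tf.keras.layers.Dense(1, name="task_b")(shared)

# Only this line differs from a normal functional model.
model = PCGradWarp(inputs=inp, outputs=[out_a, out_b])
model.compile(optimizer="adam",
              loss={"task_a": "mse", "task_b": "mse"},
              loss_weights={"task_a": 1.0, "task_b": 1.0})

x = np.random.rand(256, 16).astype("float32")
y_a = np.random.rand(256, 1).astype("float32")
y_b = np.random.rand(256, 1).astype("float32")
model.fit(x, {"task_a": y_a, "task_b": y_b}, epochs=1, batch_size=32)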

The complete code is as follows.

import tensorflow as tf
from keras.engine import data_adapter

class PCGradWarp(tf.keras.Model):
    def compute_gradients(self, loss, var_list):
        # loss is the list of per-task loss tensors
        num_tasks = len(loss)
        loss = tf.stack(loss)
        tf.random.shuffle(loss)

        # Compute per-task gradients.
        grads_task = tf.vectorized_map(lambda x: tf.concat([tf.reshape(grad, [-1, ])
                                                            for grad in tf.gradients(x, var_list)
                                                            if grad is not None], axis=0), loss)

        # Compute gradient projections.
        def proj_grad(grad_task):
            for k in range(num_tasks):
                inner_product = tf.reduce_sum(grad_task * grads_task[k])
                proj_direction = inner_product / tf.reduce_sum(grads_task[k] * grads_task[k])
                grad_task = grad_task - tf.minimum(proj_direction, 0.) * grads_task[k]
            return grad_task

        proj_grads_flatten = tf.vectorized_map(proj_grad, grads_task)
        # Slice each task's flat projected gradient back into per-variable shapes
        # and sum the contributions of all tasks. (This assumes no gradient was
        # dropped by the `if grad is not None` filter above, i.e. every task's
        # loss depends on every variable in var_list.)
        proj_grads = []
        for j in range(num_tasks):
            start_idx = 0
            for idx, var in enumerate(var_list):
                grad_shape = var.get_shape()
                flatten_dim = tf.math.reduce_prod([grad_shape.dims[i].value for i in range(len(grad_shape.dims))])
                proj_grad = proj_grads_flatten[j][start_idx:start_idx + flatten_dim]
                proj_grad = tf.reshape(proj_grad, grad_shape)
                if len(proj_grads) < len(var_list):
                    proj_grads.append(proj_grad)
                else:
                    proj_grads[idx] += proj_grad
                start_idx += flatten_dim
        grads_and_vars = list(zip(proj_grads, var_list))
        return grads_and_vars


    def train_step(self, data):
        x, y, sample_weight = data_adapter.unpack_x_y_sample_weight(data)
        # Run forward pass.
        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)
            loss = self.compute_loss(x, y, y_pred, sample_weight)

        var_list = self.trainable_variables

        # Recover the per-task losses from the sum op and compute PCGrad gradients
        grads_and_vars = self.compute_gradients(loss.op.inputs, var_list)
        self.optimizer.apply_gradients(grads_and_vars)

        return self.compute_metrics(x, y, y_pred, sample_weight)
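If you prefer model subclassing, as mentioned above the only change is the base class. A hypothetical two-head example (class and layer names are made up for illustration) would look like this; training then proceeds via model.fit as usual:

class TwoTaskModel(PCGradWarp):
    # A normal Keras subclassed model; only the base class changes.
    def __init__(self):
        super().__init__()
        self.shared = tf.keras.layers.Dense(32, activation="relu")
        self.head_a = tf.keras.layers.Dense(1)
        self.head_b = tf.keras.layers.Dense(1)

    def call(self, inputs, training=False):
        h = self.shared(inputs)
        return self.head_a(h), self.head_b(h)

model = TwoTaskModel()
model.compile(optimizer="adam", loss=["mse", "mse"])
# model.fit(x, [y_a, y_b], ...) then trains with PCGrad-projected gradients.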
