|
| 1 | +# Copyright 2022 MosaicML Composer authors |
| 2 | +# SPDX-License-Identifier: Apache-2.0 |
| 3 | + |
| 4 | +"""Monitor gradients during training.""" |
| 5 | + |
| 6 | +import torch |
| 7 | + |
| 8 | +from composer.core import Callback, State |
| 9 | +from composer.loggers import Logger |
| 10 | +from composer.utils import dist |
| 11 | +import collections |
| 12 | + |
| 13 | +__all__ = ['LossSpikeIntervention'] |
| 14 | + |
| 15 | + |
| 16 | + |
| 17 | +class MetricSpikeDetector: |
| 18 | + |
| 19 | + def __init__(self, |
| 20 | + window_moving_average=25, |
| 21 | + increase_factor=5, |
| 22 | + increase_lookback=500, |
| 23 | + plateau_min_duration=100, |
| 24 | + end_spike_factor=1.10): |
| 25 | + |
| 26 | + self.window_moving_average=window_moving_average |
| 27 | + self.increase_factor=increase_factor |
| 28 | + self.plateau_min_duration=plateau_min_duration |
| 29 | + self.increase_lookback = increase_lookback |
| 30 | + self.fast_moving_average = collections.deque(maxlen=window_moving_average) |
| 31 | + self.intermediate_data_queue = collections.deque(maxlen=increase_lookback-window_moving_average) |
| 32 | + self.slow_moving_average = collections.deque(maxlen=increase_lookback) |
| 33 | + self.end_spike_factor = end_spike_factor |
| 34 | + self.in_spike = False |
| 35 | + self.mva_before_spike = None |
| 36 | + self.spike_batch_idx_start = None |
| 37 | + |
| 38 | + |
| 39 | + |
| 40 | + def insert_observation(self, obs, batch_idx): |
| 41 | + if len(self.fast_moving_average) >= self.fast_moving_average.maxlen: |
| 42 | + # move the oldest obs out of the fast moving average into the |
| 43 | + # intermediate data queue |
| 44 | + fast_obs = self.fast_moving_average.popleft() |
| 45 | + |
| 46 | + if len(self.intermediate_data_queue) >= self.intermediate_data_queue.maxlen: |
| 47 | + # move data from intermediate quque to slow MCVA queue |
| 48 | + intermediate_obs = self.intermediate_data_queue.popleft() |
| 49 | + self.slow_moving_average.append(intermediate_obs) |
| 50 | + |
| 51 | + self.intermediate_data_queue.append(fast_obs) |
| 52 | + |
| 53 | + self.fast_moving_average.append(obs) |
| 54 | + |
| 55 | + fast_mva = sum(self.fast_moving_average) / len(self.fast_moving_average) |
| 56 | + if not self.in_spike: |
| 57 | + if len(self.slow_moving_average) > self.window_moving_average: |
| 58 | + if self.mva_before_spike is None: |
| 59 | + slow_mva = sum(self.slow_moving_average) / len(self.slow_moving_average) |
| 60 | + else: |
| 61 | + slow_mva = self.mva_before_spike |
| 62 | + |
| 63 | + |
| 64 | + if fast_mva >= self.increase_factor * slow_mva: |
| 65 | + self.in_spike = True |
| 66 | + self.mva_before_spike = slow_mva |
| 67 | + self.spike_batch_idx_start = batch_idx |
| 68 | + else: |
| 69 | + if batch_idx - self.spike_batch_idx_start > self.plateau_min_duration: |
| 70 | + # kill the layer! |
| 71 | + return True |
| 72 | + else: |
| 73 | + if fast_mva <= self.mva_before_spike * self.end_spike_factor: |
| 74 | + self.in_spike = False |
| 75 | + self.spike_batch_idx_start = None |
| 76 | + |
| 77 | + return False |
| 78 | + |
| 79 | + |
| 80 | + |
| 81 | +class LossSpikeIntervention(Callback): |
| 82 | + |
| 83 | + def __init__(self, |
| 84 | + metric = 'l2_norm/moment', |
| 85 | + window_moving_average=25, |
| 86 | + increase_factor=5, |
| 87 | + increase_lookback=500, |
| 88 | + plateau_min_duration=100, |
| 89 | + end_spike_factor=1.10, |
| 90 | + lr_scale=0.0 |
| 91 | + ): |
| 92 | + self.metric = metric |
| 93 | + self.lr_scale = lr_scale |
| 94 | + self.window_moving_average = window_moving_average |
| 95 | + self.increase_factor = increase_factor |
| 96 | + self.increase_lookback = increase_lookback |
| 97 | + self.plateau_min_duration = plateau_min_duration |
| 98 | + self.end_spike_factor = end_spike_factor |
| 99 | + |
| 100 | + self.metric_spike_detectors = {} |
| 101 | + self.frozen_layers = set() |
| 102 | + self.all_layers = set() |
| 103 | + |
| 104 | + def fit_start(self, state: State, logger: Logger) -> None: |
| 105 | + for name, p in state.model.named_parameters(): |
| 106 | + if p.requires_grad: |
| 107 | + self.all_layers.add(name) |
| 108 | + full_metric_name = f"{self.metric}/{name}" |
| 109 | + self.metric_spike_detectors[full_metric_name] = MetricSpikeDetector( |
| 110 | + self.window_moving_average, |
| 111 | + self.increase_factor, |
| 112 | + self.increase_lookback, |
| 113 | + self.plateau_min_duration, |
| 114 | + self.end_spike_factor, |
| 115 | + ) |
| 116 | + |
| 117 | + def batch_end(self, state: State, logger: Logger): |
| 118 | + norm = 0.0 |
| 119 | + optimizer_metrics = {} |
| 120 | + |
| 121 | + for name, p in state.model.named_parameters(): |
| 122 | + if p.grad is not None and p.requires_grad: |
| 123 | + |
| 124 | + metric_reporter = getattr(state.optimizers[0], 'report_per_parameter_metrics', None) |
| 125 | + if callable(metric_reporter): |
| 126 | + optimizer_metrics = metric_reporter(p, name, optimizer_metrics) |
| 127 | + |
| 128 | + if f'l2_norm/grad/{name}' not in optimizer_metrics: |
| 129 | + param_grad_norm = torch.linalg.vector_norm(p.grad) |
| 130 | + optimizer_metrics[f'l2_norm/grad/{name}'] = param_grad_norm |
| 131 | + |
| 132 | + if state.fsdp_enabled and dist.get_world_size() > 0 : |
| 133 | + pre_reduce_metrics = getattr(state.optimizers[0], 'pre_reduce_metrics', None) |
| 134 | + if callable(pre_reduce_metrics): |
| 135 | + optimizer_metrics = pre_reduce_metrics(optimizer_metrics) |
| 136 | + |
| 137 | + dist_reduce_metrics = getattr(state.optimizers[0], 'dist_reduce_metrics', None) |
| 138 | + if callable(dist_reduce_metrics): |
| 139 | + optimizer_metrics = dist_reduce_metrics(optimizer_metrics) |
| 140 | + |
| 141 | + for metric in optimizer_metrics: |
| 142 | + if metric.startswith('l2_norm/grad'): |
| 143 | + norm += optimizer_metrics[metric]**2 |
| 144 | + |
| 145 | + optimizer_metrics['l2_norm/grad/global'] = norm**0.5 |
| 146 | + |
| 147 | + for metric in optimizer_metrics: |
| 148 | + if isinstance(optimizer_metrics[metric], torch.Tensor): |
| 149 | + optimizer_metrics[metric] = optimizer_metrics[metric].item() |
| 150 | + |
| 151 | + batch_idx = state.timestamp.batch.value |
| 152 | + newly_failed_layers = self.detect_failed_layers(optimizer_metrics, batch_idx) |
| 153 | + |
| 154 | + if len(newly_failed_layers) > 0: |
| 155 | + self.freeze_layers(newly_failed_layers, state) |
| 156 | + for optimizer in state.optimizers: |
| 157 | + for group in optimizer.param_groups: |
| 158 | + group['lr'] *= self.lr_scale |
| 159 | + |
| 160 | + for scheduler in state.schedulers: |
| 161 | + scheduler.base_lrs = [self.lr_scale * lr for lr in scheduler.base_lrs] |
| 162 | + |
| 163 | + |
| 164 | + optimizer_metrics['num_frozen_layers'] = len(self.frozen_layers) |
| 165 | + logger.log_metrics(optimizer_metrics) |
| 166 | + |
| 167 | + if len(self.all_layers) == 0: |
| 168 | + state.stop_training() |
| 169 | + |
| 170 | + |
| 171 | + def freeze_layers(self, newly_failed_layers, state): |
| 172 | + for layer in newly_failed_layers: |
| 173 | + self.all_layers.remove(layer) |
| 174 | + if layer not in self.frozen_layers: |
| 175 | + self.frozen_layers.add(layer) |
| 176 | + |
| 177 | + for name, p in state.model.named_parameters(): |
| 178 | + if name in self.frozen_layers: |
| 179 | + p.requires_grad = False |
| 180 | + |
| 181 | + |
| 182 | + def detect_failed_layers(self, optimizer_metrics, batch_idx): |
| 183 | + newly_failed = [] |
| 184 | + for logger_name, value in optimizer_metrics.items(): |
| 185 | + if logger_name.startswith(self.metric): |
| 186 | + layer_name = logger_name.split('/')[-1] |
| 187 | + if layer_name in self.frozen_layers: |
| 188 | + continue |
| 189 | + if self.metric_spike_detectors[logger_name].insert_observation(value, batch_idx): |
| 190 | + newly_failed.append(layer_name) |
| 191 | + |
| 192 | + return newly_failed |
0 commit comments