Source code for byzfl.fed_framework.robust_aggregator

from byzfl.aggregators import aggregators
from byzfl.aggregators import preaggregators
import inspect

[docs] class RobustAggregator: """ Initialization Parameters ------------------------- aggregator_info : dict A dictionary specifying the aggregation method and its parameters. - **Keys**: - `"name"`: str Name of the aggregation method (e.g., `"TrMean"`). - `"parameters"`: dict A dictionary of parameters required by the specified aggregation method. pre_agg_list : list, optional (default: []) A list of dictionaries, each specifying a pre-aggregation method and its parameters. - **Keys**: - `"name"`: str Name of the pre-aggregation method (e.g., `"NNM"`). - `"parameters"`: dict A dictionary of parameters required by the specified pre-aggregation method. Methods ------- aggregate_vectors(vectors) Applies the specified pre-aggregation and aggregation methods to the input vectors, returning the aggregated result. Calling the Instance -------------------- Input Parameters ---------------- vectors : numpy.ndarray, torch.Tensor, list of numpy.ndarray, or list of torch.Tensor A collection of input vectors, matrices, or tensors to process. These vectors conceptually correspond to gradients submitted by honest and Byzantine participants during a training iteration. Returns ------- numpy.ndarray or torch.Tensor The aggregated output vector with the same data type as the input. Examples -------- Initialize the `RobustAggregator` with both pre-aggregation and aggregation methods: >>> from byzfl import RobustAggregator >>> # Define pre-aggregation methods >>> pre_aggregators = [ >>> {"name": "Clipping", "parameters": {"c": 2.0}}, >>> {"name": "NNM", "parameters": {"f": 1}}, >>> ] >>> # Define an aggregation method >>> aggregator_info = {"name": "TrMean", "parameters": {"f": 1}} >>> # Create the RobustAggregator instance >>> rob_agg = RobustAggregator(aggregator_info, pre_agg_list=pre_aggregators) Apply the RobustAggregator to various types of input data: Using NumPy arrays: >>> import numpy as np >>> vectors = np.array([[1., 2., 3.], [4., 5., 6.], [7., 8., 9.]]) >>> rob_agg.aggregate_vectors(vectors) array([0.95841302, 1.14416941, 1.3299258]) Using PyTorch tensors: >>> import torch >>> vectors = torch.tensor([[1., 2., 3.], [4., 5., 6.], [7., 8., 9.]]) >>> rob_agg.aggregate_vectors(vectors) tensor([0.9584, 1.1442, 1.3299]) Using a list of NumPy arrays: >>> import numpy as np >>> vectors = [np.array([1., 2., 3.]), np.array([4., 5., 6.]), np.array([7., 8., 9.])] >>> rob_agg.aggregate_vectors(vectors) array([0.95841302, 1.14416941, 1.3299258]) Using a list of PyTorch tensors: >>> import torch >>> vectors = [torch.tensor([1., 2., 3.]), torch.tensor([4., 5., 6.]), torch.tensor([7., 8., 9.])] >>> rob_agg.aggregate_vectors(vectors) tensor([0.9584, 1.1442, 1.3299]) """ def __init__(self, aggregator_info, pre_agg_list=[]): """ Initializes the RobustAggregator with the specified pre-aggregation and aggregation configurations. Parameters ---------- aggregator_info : dict Dictionary specifying the aggregation method and its parameters. pre_agg_list : list, optional List of dictionaries specifying pre-aggregation methods and their parameters. """ # Check for correct types and values in params if not isinstance(aggregator_info, dict): raise TypeError(f"'aggregator_info' must be of type dict, but got {type(aggregator_info).__name__}") if not isinstance(pre_agg_list, list) or not all(isinstance(pre_agg, dict) for pre_agg in pre_agg_list): raise TypeError(f"'pre_agg_list' must be must be a list of dict") # Initialize the RobustAggregator instance self.aggregator = getattr(aggregators, aggregator_info["name"]) signature_agg = inspect.signature(self.aggregator.__init__) agg_parameters = { param.name: aggregator_info["parameters"].get(param.name, param.default) for param in signature_agg.parameters.values() if param.name in aggregator_info["parameters"] } self.aggregator = self.aggregator(**agg_parameters) self.pre_agg_list = [] for pre_agg_info in pre_agg_list: pre_agg = getattr(preaggregators, pre_agg_info["name"]) signature_pre_agg = inspect.signature(pre_agg.__init__) pre_agg_parameters = { param.name: pre_agg_info["parameters"].get(param.name, param.default) for param in signature_pre_agg.parameters.values() if param.name in pre_agg_info["parameters"] } self.pre_agg_list.append(pre_agg(**pre_agg_parameters))
[docs] def aggregate_vectors(self, vectors): """ Applies the configured pre-aggregations and robust aggregation method to the input vectors. Parameters ---------- vectors : numpy.ndarray, torch.Tensor, list of numpy.ndarray, or list of torch.Tensor A collection of input vectors to process. Returns ------- numpy.ndarray or torch.Tensor The aggregated output vector with the same data type as the input. """ for pre_agg in self.pre_agg_list: vectors = pre_agg(vectors) return self.aggregator(vectors)