Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 39 additions & 29 deletions leakpro/attacks/utils/model_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,40 +35,16 @@ def __init__(

caller_configs = getattr(handler.configs, caller) if caller is not None else None
self.use_target_model_setup = caller_configs is None
optimizer_name, criterion_name = self._load_model_setup(caller_configs)

# get the blueprint for the model
if self.use_target_model_setup:
self.model_class = handler.target_model_blueprint.__name__
self.model_blueprint = handler.target_model_blueprint
else:
try:
self.model_path = caller_configs.module_path
self.model_class = caller_configs.model_class
except AttributeError as e:
raise ValueError("Model path or class not provided in shadow model config") from e
try:
self.model_blueprint = self._import_model_from_path(self.model_path, self.model_class)
except Exception as e:
raise ValueError(f"Failed to create model blueprint from {self.model_class} in {self.model_path}") from e

# Pick either target config or caller config
setup_config = handler.target_model_metadata if self.use_target_model_setup else caller_configs
# extract the init params
self.init_params = setup_config.init_params
self.optimizer_name = optimizer_name.lower()
self.criterion_name = criterion_name.lower()

# Get optimizer class
optimizer_name = setup_config.optimizer.name
self.optimizer_class = self._get_optimizer_class(optimizer_name)
# copy to only have parameters left
self.optimizer_config = setup_config.optimizer.params

# Get criterion class
criterion_class = setup_config.criterion.name
self.criterion_class = self._get_criterion_class(criterion_class)
# copy to only have parameters left
self.loss_config = setup_config.criterion.params

self.epochs = setup_config.epochs
self.criterion_class = self._get_criterion_class(criterion_name)

# Set the storage paths for objects created by the handler
storage_path = handler.configs.audit.output_dir
Expand All @@ -91,6 +67,41 @@ def __init__(
criterion = self.handler.get_criterion()
self.cache_logits(PytorchModel(self.handler.target_model, criterion), name="target")

def _load_model_setup(self:Self, caller_configs) -> Tuple[str, str]: # noqa: ANN001
    """Load the effective model, optimizer, and criterion setup.

    Populates model/optimizer/criterion attributes on ``self`` from either
    the target-model metadata (when ``caller_configs`` is None) or from the
    caller's shadow-model config, inheriting any unset field from the target.

    Args:
    ----
        caller_configs: Caller-specific (shadow) model configuration, or
            None to reuse the target model's setup verbatim.

    Returns:
    -------
        Tuple[str, str]: The effective optimizer name and criterion name.
            Note: the caller (``__init__``) lower-cases both before use.

    """
    target_setup = self.handler.target_model_metadata

    if self.use_target_model_setup:
        # No caller config: mirror the target model's setup one-to-one.
        self.model_path = self.handler.configs.target.module_path
        self.model_class = self.handler.target_model_blueprint.__name__
        self.model_blueprint = self.handler.target_model_blueprint
        # Copy so later mutations don't leak back into the target metadata.
        self.init_params = (target_setup.init_params or {}).copy()
        self.optimizer_config = (target_setup.optimizer.params or {}).copy()
        self.loss_config = (target_setup.criterion.params or {}).copy()
        self.epochs = target_setup.epochs
        # presumably the data_loader params always carry "batch_size"; .get
        # falls back to None if not — TODO confirm downstream handles None.
        self.batch_size = target_setup.data_loader.params.get("batch_size")
        return target_setup.optimizer.name, target_setup.criterion.name

    # Allow partial shadow model config by inheriting from the target setup.
    self.model_path = caller_configs.module_path or self.handler.configs.target.module_path
    self.model_class = caller_configs.model_class or self.handler.configs.target.model_class
    try:
        self.model_blueprint = self._import_model_from_path(self.model_path, self.model_class)
    except Exception as e:
        raise ValueError(f"Failed to create model blueprint from {self.model_class} in {self.model_path}") from e

    # Inherit defaults from target and apply caller overrides when present.
    # NOTE(review): target init_params are merged even when the caller names a
    # different model class — assumes the param names stay compatible; verify.
    self.init_params = (target_setup.init_params or {}).copy()
    self.init_params.update(caller_configs.init_params or {})
    # Optimizer/criterion are inherited whole (name + params), not per-field.
    optimizer_cfg = caller_configs.optimizer or target_setup.optimizer
    criterion_cfg = caller_configs.criterion or target_setup.criterion
    self.optimizer_config = (optimizer_cfg.params or {}).copy()
    self.loss_config = (criterion_cfg.params or {}).copy()
    # Explicit None-checks (not truthiness) so e.g. an explicit epoch count of
    # a small value is still honored; schema enforces ge=1 when set.
    self.epochs = caller_configs.epochs if caller_configs.epochs is not None else target_setup.epochs
    target_batch_size = target_setup.data_loader.params.get("batch_size")
    self.batch_size = caller_configs.batch_size if caller_configs.batch_size is not None else target_batch_size
    return optimizer_cfg.name, criterion_cfg.name


def cache_logits(self:Self, model:Union[Module, list[Module]], name:str) -> None:
"""Cache the target model logits."""
Expand Down Expand Up @@ -214,4 +225,3 @@ def _load_metadata(self:Self, metadata_path:str) -> dict:
return joblib.load(f)
except FileNotFoundError as e:
raise FileNotFoundError(f"Metadata at {metadata_path} not found") from e

121 changes: 110 additions & 11 deletions leakpro/attacks/utils/shadow_model_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from leakpro.input_handler.mia_handler import MIAHandler
from leakpro.schemas import ShadowModelTrainingSchema, TrainingOutput
from leakpro.signals.signal_extractor import PytorchModel
from leakpro.utils.import_helper import Self, Tuple
from leakpro.utils.import_helper import Any, Dict, List, Self, Tuple, Union
from leakpro.utils.logger import logger


Expand Down Expand Up @@ -67,7 +67,105 @@ def __init__(self:Self, handler: MIAHandler) -> None: # noqa: PLR0912

self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def _filter(self:Self, data_size:int)->list[int]:
def _freeze_value(self:Self, value: Union[List, Dict, Any]) -> Union[tuple, Any]:
"""Convert nested config values to a stable, comparable structure.

Args:
----
value: Arbitrary config value taken from shadow-model settings or metadata.

Returns:
-------
A recursively normalized representation that can be compared safely
when deciding whether cached shadow models were trained with the
same effective configuration.

"""
if isinstance(value, dict):
return tuple((key, self._freeze_value(val)) for key, val in sorted(value.items()))
if isinstance(value, list):
return tuple(self._freeze_value(item) for item in value)
return value

def _current_training_signature(self:Self, data_size:int, online:bool) -> tuple:
"""Build the effective training signature for the current shadow setup.

Args:
----
data_size (int): Number of samples that will be used to train a
shadow model.
online (bool): Whether the shadow models are created in online mode.

Returns:
-------
tuple: Normalized description of the active shadow-model training
configuration, including architecture, optimizer, criterion,
initialization parameters, batch size, and target-model identity.

"""
return (
data_size,
self.model_class,
self.model_path,
self.target_model_hash,
self.optimizer_name,
self._freeze_value(self.optimizer_config),
self.criterion_name,
self._freeze_value(self.loss_config),
self.epochs,
self.batch_size,
online,
self._freeze_value(self.init_params),
)

def _metadata_training_signature(self:Self, metadata: ShadowModelTrainingSchema) -> tuple:
"""Build a normalized training signature from stored shadow metadata.

Args:
----
metadata (ShadowModelTrainingSchema): Metadata loaded for a cached
shadow model.

Returns:
-------
tuple: Normalized representation of the stored training setup. If
older metadata files do not contain newly added fields, they will
naturally fail to match the current signature and therefore be
retrained instead of being silently reused.

"""
return (
metadata.num_train,
metadata.model_class,
getattr(metadata, "model_module_path", None),
metadata.target_model_hash,
metadata.optimizer.lower(),
self._freeze_value(getattr(metadata, "optimizer_params", None)),
metadata.criterion.lower(),
self._freeze_value(getattr(metadata, "criterion_params", None)),
metadata.epochs,
getattr(metadata, "batch_size", None),
metadata.online,
self._freeze_value(metadata.init_params),
)

def _filter(self:Self, data_size:int, online:bool) -> tuple[list[int], list[int]]:
"""Find cached shadow models compatible with the current configuration.

Args:
----
data_size (int): Number of samples that should be used for each
shadow-model training run.
online (bool): Whether the requested shadow models are for online
or offline use.

Returns:
-------
tuple[list[int], list[int]]: Two lists packed in a tuple-like return value:
all discovered metadata indices and the subset whose stored
signature matches the current effective training configuration.

"""
# Get the metadata for the shadow models
entries = os.listdir(self.storage_path)
pattern = re.compile(rf"^{self.metadata_storage_name}_\d+\.pkl$")
Expand All @@ -76,17 +174,14 @@ def _filter(self:Self, data_size:int)->list[int]:
# Extract the index of the metadata
all_indices = [int(re.search(r"\d+", f).group()) for f in files]

# Setup checks
# TODO: we use the same shadow models for all targets!
filter_checks = [data_size, self.model_class]
expected_signature = self._current_training_signature(data_size, online)

# Filter out indices to only keep the ones that passes the checks
filtered_indices = []
for i in all_indices:
metadata = self._load_shadow_metadata(i)
assert isinstance(metadata, ShadowModelTrainingSchema), "Shadow Model metadata is not of the correct type"
meta_check_values = [metadata.num_train, metadata.model_class]
if all(a == b for a, b in zip(filter_checks, meta_check_values)):
if self._metadata_training_signature(metadata) == expected_signature:
filtered_indices.append(i)

return all_indices, filtered_indices
Expand Down Expand Up @@ -151,7 +246,7 @@ def create_shadow_models(

# Get the size of the dataset
data_size = int(len(shadow_population)*training_fraction)
all_indices, filtered_indices = self._filter(data_size)
all_indices, filtered_indices = self._filter(data_size, online)

# Create a list of indices to use for the new shadow models
n_existing_models = len(filtered_indices)
Expand All @@ -173,7 +268,7 @@ def create_shadow_models(
for i, indx in enumerate(indices_to_use):
# Get dataloader
data_indices = shadow_population[np.where(A[i,:] == 1)]
data_loader = self.handler.get_dataloader(data_indices, params=None)
data_loader = self.handler.get_dataloader(data_indices, params=None, batch_size=self.batch_size)

# Get shadow model blueprint
model, criterion, optimizer = self._get_model_criterion_optimizer()
Expand Down Expand Up @@ -206,13 +301,17 @@ def create_shadow_models(
init_params=self.init_params,
train_indices = data_indices,
num_train = len(data_indices),
optimizer = optimizer.__class__.__name__,
criterion = criterion.__class__.__name__,
optimizer = self.optimizer_name,
optimizer_params = self.optimizer_config,
criterion = self.criterion_name,
criterion_params = self.loss_config,
epochs = self.epochs,
batch_size = data_loader.batch_size,
train_result = training_results.metrics,
test_result = test_result,
online = online,
model_class = self.model_class,
model_module_path = self.model_path,
target_model_hash= self.target_model_hash
)

Expand Down
16 changes: 10 additions & 6 deletions leakpro/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,13 @@ class TargetConfig(BaseModel):
class ShadowModelConfig(BaseModel):
    """Configuration for the Shadow models.

    Every field is optional: unset fields are inherited from the target
    model's setup when the shadow-model handler resolves its configuration.
    """

    # Fix: the rendered text defined several fields twice (pre-/post-change
    # diff residue); keep one definition per field, all defaulting to None
    # so a partial shadow config can fall back to the target setup.
    model_class: Optional[str] = Field(default=None, description="Class name of the shadow model")
    module_path: Optional[str] = Field(default=None, description="Path to the shadow model module")
    init_params: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Model initialization parameters")
    optimizer: Optional[OptimizerConfig] = Field(default=None, description="Optimizer configuration")
    criterion: Optional[LossConfig] = Field(default=None, description="Loss function configuration")
    batch_size: Optional[int] = Field(default=None, ge=1, description="Batch size used during training")
    epochs: Optional[int] = Field(default=None, ge=1, description="Number of training epochs")

    model_config = ConfigDict(extra="forbid")  # Prevent extra fields

Expand Down Expand Up @@ -179,12 +179,16 @@ class ShadowModelTrainingSchema(BaseModel):
train_indices: List[int] = Field(..., description="Indices of training samples")
num_train: int = Field(..., ge=0, description="Number of training samples")
optimizer: str = Field(..., description="Optimizer name")
optimizer_params: Dict[str, Any] = Field(default_factory=dict, description="Optimizer parameters")
criterion: str = Field(..., description="Criterion (loss function) name")
criterion_params: Dict[str, Any] = Field(default_factory=dict, description="Criterion parameters")
epochs: int = Field(..., ge=1, description="Number of training epochs")
batch_size: Optional[int] = Field(default=None, ge=1, description="Batch size used during training")
train_result: EvalOutput = Field(..., description="Evaluation output for the training set")
test_result: EvalOutput = Field(..., description="Evaluation output for the test set")
online: bool = Field(..., description="Online vs. offline training")
model_class: str = Field(..., description="Model class name")
model_module_path: Optional[str] = Field(default=None, description="Path to the model module")
target_model_hash: str = Field(..., description="Hash of target model")

model_config = ConfigDict(extra="forbid") # Prevent extra fields
Expand Down
Loading
Loading