
Using the Training Quickstart example🔗

This guide walks you through using and modifying the apheris/quickstart-training repository to build your own federated learning model on the Apheris platform.

Getting Started: Clone the Repository🔗

Before you begin, you'll need to clone the quickstart-training repository to your local machine.

git clone https://github.com/apheris/quickstart-training.git
cd quickstart-training

This will create a local copy of the repository in a directory called quickstart-training and navigate into it. You can now explore the code, modify it for your use case, and test it locally before deploying to the Apheris platform.

Overview🔗

The quickstart-training repository is a boilerplate template for building federated learning models using NVIDIA FLARE on Apheris. It includes:

  • A simple federated logistic regression example
  • Secure Runtime configuration for parameter passing
  • Data download utilities using the Apheris Data Access Layer (DAL)
  • A local simulator for testing before deployment

This guide will show you how to adapt this template for your own machine learning models.

Repository Structure🔗

quickstart-training/
├── src/logistic_regression_quickstart/
│   ├── model/
│   │   ├── executor.py              # Main training executor (YOUR MODEL GOES HERE)
│   │   └── utils/                   # Simulator utilities
│   ├── secure_runtime/
│   │   ├── payload.py               # Job parameters definition
│   │   ├── loaders.py               # FLARE config generation
│   │   └── secure_runtime_service.py # Secure runtime service
│   └── run_simulator.py             # Local testing script
├── tests/                           # Unit tests
├── docs/                            # Documentation (simulator, custom model workflow, etc.)
├── example_data/                    # Example datasets for local testing
├── Dockerfile                       # Container image
└── requirements.txt                 # Python dependencies

Part 1: Understanding Parameter Passing with Secure Runtime🔗

The Secure Runtime translates job parameters into FLARE configuration files. Let's walk through how parameters flow from job submission to your executor.

Step 1: Define Your Job Parameters🔗

The payload defines what parameters your job accepts. Open src/logistic_regression_quickstart/secure_runtime/payload.py:

from typing import Literal
from pydantic import BaseModel, Field

TaskID = Literal["train"]

class LogisticRegressionPayload(BaseModel):
    task_id: TaskID
    cohorts: dict[str, str] = Field(default_factory=dict)
    num_rounds: int = 2

Note

This is the baseline payload in the repository. You'll extend this with your own parameters.

What's happening here:

  • LogisticRegressionPayload is a Pydantic model that validates incoming job parameters
  • task_id specifies which task to run (in this case, only "train")
  • cohorts maps dataset IDs to cohort names
  • num_rounds controls how many federated learning rounds to perform
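
These fields can be exercised locally before anything is wired into FLARE. A minimal sketch (the raw dict below is an illustrative job submission, not output from the platform):

```python
from typing import Literal

from pydantic import BaseModel, Field

TaskID = Literal["train"]

class LogisticRegressionPayload(BaseModel):
    task_id: TaskID
    cohorts: dict[str, str] = Field(default_factory=dict)
    num_rounds: int = 2

# A raw parameter dict, as it might arrive with a job submission
raw = {"task_id": "train", "cohorts": {"ds-1": "cohort-a"}, "num_rounds": 5}
payload = LogisticRegressionPayload(**raw)

print(payload.num_rounds)  # 5
print(payload.cohorts)     # {'ds-1': 'cohort-a'}
```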

Step 2: Add Your Own Parameters🔗

Let's say you want to add a learning rate and batch size. Modify the payload:

class LogisticRegressionPayload(BaseModel):
    task_id: TaskID
    cohorts: dict[str, str] = Field(default_factory=dict)
    num_rounds: int = 2

    # Add your new parameters here
    learning_rate: float = 0.01  # Default value 0.01
    batch_size: int = 32  # Default value 32

Key points:

  • Provide sensible default values
  • Use type hints for validation
  • Pydantic will automatically validate these when a job is submitted
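
You can check this validation behaviour locally. A small sketch: omitted parameters fall back to their defaults, and a value that cannot be parsed as a float is rejected with a ValidationError:

```python
from typing import Literal

from pydantic import BaseModel, Field, ValidationError

TaskID = Literal["train"]

class LogisticRegressionPayload(BaseModel):
    task_id: TaskID
    cohorts: dict[str, str] = Field(default_factory=dict)
    num_rounds: int = 2
    learning_rate: float = 0.01
    batch_size: int = 32

# Defaults apply when optional parameters are omitted
payload = LogisticRegressionPayload(task_id="train")
print(payload.learning_rate)  # 0.01

# An invalid value is rejected before the job ever runs
try:
    LogisticRegressionPayload(task_id="train", learning_rate="fast")
    rejected = False
except ValidationError:
    rejected = True
print(rejected)  # True
```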

Step 3: Pass Parameters to the Executor🔗

Now we need to connect these parameters to the executor. Open src/logistic_regression_quickstart/secure_runtime/loaders.py.

The load_client_config function creates the FLARE configuration for each gateway (client):

def load_client_config(payload: LogisticRegressionPayload) -> dict:
    """
    Load the FLARE client config template and populate with data from the job payload
    """
    return {
        "format_version": 2,
        "executors": [
            {
                "tasks": ["train"],
                "executor": {
                    "path": "logistic_regression_quickstart.model.executor.TrainingExecutor",
                    "args": {"cohorts": payload.cohorts},  # Parameters go here
                },
            }
        ],
        "task_result_filters": [],
        "task_data_filters": [],
        "components": [],
    }

Update the args dictionary to include your new parameters:

def load_client_config(payload: LogisticRegressionPayload) -> dict:
    return {
        "format_version": 2,
        "executors": [
            {
                "tasks": ["train"],
                "executor": {
                    "path": "logistic_regression_quickstart.model.executor.TrainingExecutor",
                    "args": {
                        "cohorts": payload.cohorts,
                        "learning_rate": payload.learning_rate,  # Add this
                        "batch_size": payload.batch_size,        # Add this
                    },
                },
            }
        ],
        # ... rest stays the same
    }

Step 4: Update the Executor Constructor🔗

Finally, update your executor to accept these parameters. Open src/logistic_regression_quickstart/model/executor.py:

class TrainingExecutor(GatewayLogSenderMixin, Executor):

    def __init__(self, cohorts: dict[str, str], learning_rate: float = 0.01, 
                 batch_size: int = 32):
        """
        Arguments are populated from the secure runtime configuration. 

        Args:
            cohorts:  Mapping of dataset IDs to cohort names
            learning_rate: Learning rate for training
            batch_size: Batch size for training
        """
        super().__init__()

        self.cohorts = cohorts
        self.learning_rate = learning_rate
        self.batch_size = batch_size

        self.model = MockLogisticRegression()
        self.dataset_id = None

That's it! The parameter flow is complete:

  1. User submits job with parameters
  2. Secure Runtime validates with LogisticRegressionPayload
  3. load_client_config creates FLARE config
  4. FLARE instantiates TrainingExecutor with parameters
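
The four steps above can be sketched end to end without FLARE in the loop (the loader here is trimmed to the fields relevant for parameter passing):

```python
from typing import Literal

from pydantic import BaseModel, Field

TaskID = Literal["train"]

class LogisticRegressionPayload(BaseModel):
    task_id: TaskID
    cohorts: dict[str, str] = Field(default_factory=dict)
    num_rounds: int = 2
    learning_rate: float = 0.01
    batch_size: int = 32

def load_client_config(payload: LogisticRegressionPayload) -> dict:
    # Trimmed version of loaders.py: only the executor args matter here
    return {
        "executors": [{
            "tasks": ["train"],
            "executor": {
                "path": "logistic_regression_quickstart.model.executor.TrainingExecutor",
                "args": {
                    "cohorts": payload.cohorts,
                    "learning_rate": payload.learning_rate,
                    "batch_size": payload.batch_size,
                },
            },
        }],
    }

# 1. User submits parameters; 2. the payload model validates them
payload = LogisticRegressionPayload(task_id="train", learning_rate=0.05)
# 3. load_client_config turns them into FLARE config
config = load_client_config(payload)
# 4. FLARE would pass these args to TrainingExecutor.__init__
args = config["executors"][0]["executor"]["args"]
print(args["learning_rate"])  # 0.05
```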

Part 2: Downloading Data Using the DAL🔗

The Apheris Data Access Layer (DAL) provides secure access to datasets. Let's walk through how to download data in your executor.

Understanding the DAL🔗

The DAL provides these key functions from apheris_utils.data:

  • download_dataset(dataset_id, folder) - Download a specific dataset
  • download_all(folder) - Download all available datasets
  • list_dataset_ids() - List all available dataset IDs
  • get_settings() - Get DAL configuration

Step 1: Set Up Data Download Location🔗

In your executor's execute method, determine where to download data:

from pathlib import Path
from apheris_utils.data.primitives import get_settings
from nvflare.apis.fl_constant import FLContextKey

def execute(self, task_name: str, shareable: Shareable, 
            fl_ctx: FLContext, abort_signal: Signal) -> Shareable:

    # Get the app root directory (provided by FLARE)
    app_root = Path(fl_ctx.get_prop(FLContextKey.APP_ROOT))

    # Create a datasets subdirectory
    data_download_root = app_root / "datasets"

    # ...  continue with download

Step 2: Identify Which Dataset to Download🔗

The DAL settings contain information about available datasets:

# Get DAL settings (configured by Apheris platform)
dal_settings = get_settings()

# List all available datasets
available_datasets = list(dal_settings.data.keys())
print(f"Available datasets: {available_datasets}")

# For this simple example, use the first dataset
self.dataset_id = available_datasets[0]

In a real scenario, you might:

  • Use the cohorts parameter to select specific datasets
  • Download multiple datasets
  • Match dataset IDs to specific cohorts
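
For instance, selecting datasets via the cohorts parameter could look like this (the matching rule here, dataset IDs appearing as keys in cohorts, is an assumption to adapt to your setup):

```python
# cohorts maps dataset IDs to cohort names, as submitted in the job payload
cohorts = {"ds-heart-1": "cohort-a", "ds-heart-2": "cohort-b"}

# Dataset IDs reported by the DAL settings (illustrative values)
available_datasets = ["ds-heart-1", "ds-heart-2", "ds-unrelated"]

# Keep only the datasets that the job's cohorts mapping refers to
selected = [d for d in available_datasets if d in cohorts]
print(selected)  # ['ds-heart-1', 'ds-heart-2']
```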

Step 3: Download the Dataset🔗

Now use download_dataset to fetch the data:

from apheris_utils.data import download_dataset

# Download returns a dict:  {dataset_id: Path}
data_dict = download_dataset(self.dataset_id, data_download_root)

# Extract the path
data_path = data_dict.get(self.dataset_id)

if not data_path:
    raise RuntimeError(
        f"Failed to download dataset {self.dataset_id} from DAL."
    )

print(f"Data downloaded to: {data_path}")

Step 4: Create a Helper Method🔗

Let's package this into a reusable method. Add this to your TrainingExecutor class:

def _download_dataset(self, fl_ctx: FLContext) -> Path:
    """
    Download dataset from the Apheris DAL. 

    Returns: 
        Path:  Local path to the downloaded dataset
    """
    # Determine download location
    data_download_root = Path(fl_ctx.get_prop(FLContextKey.APP_ROOT)) / "datasets"

    # Download the dataset
    data_dict = download_dataset(self.dataset_id, data_download_root)

    # Get the path
    data_path = data_dict.get(self.dataset_id)

    if not data_path: 
        raise RuntimeError(
            f"Failed to download dataset {self.dataset_id} from DAL.  "
            "Please check the DAL settings and ensure the dataset is available."
        )

    return data_path

Step 5: Use It in Your Execute Method🔗

Now call this helper in your main execution flow:

@safe_error_catchall_decorator
def execute(self, task_name: str, shareable: Shareable, 
            fl_ctx: FLContext, abort_signal: Signal) -> Shareable:

    # Get dataset ID from DAL settings (take only the first ID)
    dal_settings = get_settings()
    self.dataset_id = next(iter(dal_settings.data.keys()))

    # Download the data
    data_path = self._download_dataset(fl_ctx)

    # Log success
    self._send_to_server_log(
        fl_ctx,
        f"Downloaded dataset {self.dataset_id} to {data_path}",
        "INFO"
    )

    # Now you can load and process the data
    # ... (continue to Part 3)

What happens:

  1. FLARE calls execute() with a task
  2. We identify which dataset to use from DAL settings
  3. We download it to a local directory
  4. We get back the path to the downloaded files
  5. We can now load and process the data

Part 3: Adding a Real PyTorch Model🔗

Now let's replace the mock model with a real PyTorch implementation. We'll build a simple linear classifier for binary classification step by step.

Step 1: Define the Model Architecture🔗

Create a simple PyTorch model. Add this to the top of your executor.py file:

import torch
import torch.nn as nn

class SimpleLinearClassifier(nn.Module):
    """
    A simple linear classifier for binary classification. 

    Architecture:  Input → Linear → Sigmoid → Output
    """

    def __init__(self, input_dim: int):
        """
        Args:
            input_dim: Number of input features
        """
        super().__init__()
        self.linear = nn.Linear(input_dim, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Forward pass through the network"""
        return self.sigmoid(self.linear(x))

What this model does:

  • Takes input features of size input_dim
  • Applies a linear transformation (weights + bias)
  • Applies sigmoid activation to get probability (0-1)
  • Outputs binary classification probability
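
A quick local smoke test of the class (the feature count of 4 and batch size of 8 are arbitrary):

```python
import torch
import torch.nn as nn

class SimpleLinearClassifier(nn.Module):
    def __init__(self, input_dim: int):
        super().__init__()
        self.linear = nn.Linear(input_dim, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        return self.sigmoid(self.linear(x))

model = SimpleLinearClassifier(input_dim=4)
batch = torch.randn(8, 4)  # 8 samples with 4 features each
probs = model(batch)

print(tuple(probs.shape))  # (8, 1)
# Sigmoid keeps every output strictly between 0 and 1
print(bool(((probs > 0) & (probs < 1)).all()))  # True
```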

Step 2: Understanding State Management for Federated Learning🔗

For federated learning, we need to extract and load model parameters. PyTorch provides built-in methods for this via state_dict() and load_state_dict():

Why we need state management:

  • state_dict(): After local training, extract all model parameters to send to the server for aggregation
  • load_state_dict(): Before training, load global parameters received from the server

The SimpleLinearClassifier we defined already inherits these methods from nn.Module, so no additional code is needed. We'll use them directly in the training code.
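
A round-trip sketch of the two calls, reusing the SimpleLinearClassifier from Step 1:

```python
import torch
import torch.nn as nn

class SimpleLinearClassifier(nn.Module):
    def __init__(self, input_dim: int):
        super().__init__()
        self.linear = nn.Linear(input_dim, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        return self.sigmoid(self.linear(x))

local_model = SimpleLinearClassifier(input_dim=4)
fresh_model = SimpleLinearClassifier(input_dim=4)

# Extract parameters, e.g. after local training, ready for aggregation
params = local_model.state_dict()
print(list(params.keys()))  # ['linear.weight', 'linear.bias']

# Load parameters, e.g. the aggregated global weights, before the next round
fresh_model.load_state_dict(params)
print(torch.equal(fresh_model.linear.weight, local_model.linear.weight))  # True
```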

Step 3: Update Executor Initialization🔗

Replace the mock model with your PyTorch model:

class TrainingExecutor(GatewayLogSenderMixin, Executor):

    def __init__(self, cohorts: dict[str, str], learning_rate: float = 0.01, 
                 batch_size: int = 32, num_epochs: int = 5):
        super().__init__()

        self.cohorts = cohorts
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.num_epochs = num_epochs

        # Model will be initialized once we know input dimensions
        self.model = None
        self.input_dim = None

        self.dataset_id = None

Note

We can't initialize the model yet because we don't know the input dimension until we load the data.

Step 4: Implement Data Loading🔗

Now let's load actual data from the downloaded dataset. Replace the placeholder _load_data_from_dataset:

import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler

def _load_data_from_dataset(self, dataset_path: Path):
    """
    Load and preprocess data from the dataset.

    This example assumes CSV format with features and a 'target' column.
    Adjust based on your actual data structure.

    Args:
        dataset_path: Path to downloaded dataset directory

    Returns:
        tuple:  (X_tensor, y_tensor) - Features and labels as PyTorch tensors
    """
    # Find CSV files in the dataset
    csv_files = list(dataset_path.rglob("*.csv"))

    if not csv_files: 
        raise FileNotFoundError(f"No CSV files found in {dataset_path}")

    # Load the first CSV file
    df = pd.read_csv(csv_files[0])

    # Separate features and target
    target_column = 'target'  # Change this to match your data

    if target_column not in df.columns:
        raise ValueError(f"Target column '{target_column}' not found in dataset")

    # Extract features (all columns except target)
    X = df.drop(columns=[target_column]).to_numpy().astype(np.float32)

    # Extract target (reshape to column vector)
    y = df[target_column].to_numpy().astype(np.float32).reshape(-1, 1)

    # Normalize features so all columns are on a comparable scale
    scaler = StandardScaler()
    X = scaler.fit_transform(X).astype(np.float32)

    # Convert to PyTorch tensors
    X_tensor = torch.tensor(X)
    y_tensor = torch.tensor(y)

    return X_tensor, y_tensor

Key steps:

  1. Find CSV files in the downloaded dataset
  2. Load with pandas
  3. Separate features (X) and target (y)
  4. Normalize features with StandardScaler
  5. Convert to PyTorch tensors

Customize this for your data:

  • Change target_column to match your label column name
  • Adjust file loading if you use different formats (Parquet, JSON, etc.)
  • Add custom preprocessing (handling missing values, encoding, etc.)

Step 5: Implement Training Logic🔗

Now implement the actual training. Replace the _train method:

import torch.optim as optim

def _train(self, input_shareable: Shareable, fl_ctx: FLContext, 
           dataset_path: Path) -> Shareable:
    """
    Perform local training on this gateway's data.

    Args:
        input_shareable: Global model parameters from server
        fl_ctx:  FLARE context
        dataset_path: Path to local dataset

    Returns: 
        Shareable: Updated local model parameters
    """
    # Load local data
    X, y = self._load_data_from_dataset(dataset_path)

    # Initialize model on first round (when we know input size)
    if self.model is None:
        self.input_dim = X.shape[1]  # Number of features
        self.model = SimpleLinearClassifier(self.input_dim)

        self._send_to_server_log(
            fl_ctx,
            f"Initialized model with {self.input_dim} input features",
            "INFO"
        )

    # Load global model weights from server (if not first round)
    dxo = from_shareable(input_shareable)
    if dxo.data:   # Check if data exists (not first round)
        self.model.load_state_dict(dxo.data)
        self._send_to_server_log(fl_ctx, "Loaded global model weights", "INFO")

    # ...  (continue below)

Step 6: Training Loop🔗

Continue the _train method with the actual training loop:

    # ... (previous code) ...

    # Set up loss function and optimizer
    criterion = nn.BCELoss()  # Binary Cross Entropy for binary classification
    optimizer = optim.SGD(self.model.parameters(), lr=self.learning_rate)

    # Training loop
    self.model.train()  # Set model to training mode

    for epoch in range(self.num_epochs):
        epoch_loss = 0.0
        num_batches = 0

        # Mini-batch training
        for i in range(0, len(X), self.batch_size):
            # Get batch
            batch_X = X[i:i + self.batch_size]
            batch_y = y[i: i + self.batch_size]

            # Forward pass
            outputs = self.model(batch_X)
            loss = criterion(outputs, batch_y)

            # Backward pass and optimization
            optimizer.zero_grad()  # Clear gradients
            loss.backward()        # Compute gradients
            optimizer.step()       # Update weights

            epoch_loss += loss.item()
            num_batches += 1

        # Log progress
        avg_loss = epoch_loss / num_batches
        self._send_to_server_log(
            fl_ctx,
            f"Epoch {epoch + 1}/{self.num_epochs}, Loss: {avg_loss:.4f}",
            "INFO"
        )

    # ... (continue below)

What's happening:

  • We use Binary Cross Entropy loss (appropriate for binary classification)
  • SGD optimizer updates weights based on gradients
  • We iterate through data in mini-batches
  • We log progress after each epoch

Step 7: Return Updated Parameters🔗

Complete the _train method by returning the updated model:

    # ... (previous code) ...

    # Extract trained model parameters using PyTorch's state_dict
    params = self.model.state_dict()

    self._send_to_server_log(
        fl_ctx,
        "Training complete. Sending updated model to server.",
        "INFO"
    )

    # Package parameters for sending to server
    output_dxo = DXO(DataKind.WEIGHTS, params)
    output_shareable = output_dxo.to_shareable()
    output_shareable.set_return_code(ReturnCode.OK)

    return output_shareable

What happens next:

  1. Local model parameters are sent to the FLARE server
  2. Server aggregates parameters from all gateways (average, weighted average, etc.)
  3. Next round begins with aggregated global model
  4. Repeat until num_rounds is complete
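
Step 2, the aggregation, can be illustrated with plain unweighted averaging (FLARE's actual aggregator is configurable and typically weights by sample count; the values below are made up):

```python
import numpy as np

# Parameter updates from two gateways (illustrative values)
site_1 = {"linear.weight": np.array([[0.2, 0.4]]), "linear.bias": np.array([0.1])}
site_2 = {"linear.weight": np.array([[0.6, 0.0]]), "linear.bias": np.array([0.3])}

# Unweighted federated averaging: the mean of each parameter across sites
updates = [site_1, site_2]
global_params = {
    key: np.mean([u[key] for u in updates], axis=0)
    for key in site_1
}

print(global_params["linear.weight"])  # [[0.4 0.2]]
print(global_params["linear.bias"])    # [0.2]
```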

Step 8: Update Requirements🔗

Add PyTorch and other dependencies to requirements.txt. The repository starts with minimal requirements:

nvflare==2.6.0
apheris-utils[nvflare,simulator]==0.25.0

Add your model-specific dependencies:

nvflare==2.6.0
apheris-utils[nvflare,simulator]==0.25.0
torch>=2.0.0
scikit-learn>=1.0.0
pandas>=1.5.0
numpy>=1.24.0  # numpy is included with nvflare, but explicitly list if you need a specific version

That's it! You now have a complete PyTorch model integrated into the federated learning pipeline.

Part 4: Testing with the Simulator🔗

Before deploying to Apheris, test your model locally using the simulator. The simulator mimics the federated learning environment without requiring real data or infrastructure.

Understanding the Simulator🔗

The simulator:

  • ✅ Runs your model locally on your machine
  • ✅ Uses development data (non-sensitive test data)
  • ✅ Simulates multiple sites/gateways
  • ✅ Tests the complete FLARE workflow
  • ✅ Validates your secure runtime configuration
  • ❌ Does NOT use real sensitive data
  • ❌ Does NOT require cloud infrastructure

Prerequisites🔗

1. Install dependencies:

pip install -r requirements.txt
pip install -r requirements-dev.txt  # If running tests

2. Login to Apheris:

The simulator needs to download development data from Apheris:

apheris login

3. Ensure development datasets exist:

Your datasets must have development data registered (coordinate with your Data Custodian). Verify:

apheris datasets list

Testing without Apheris login

The repository includes example datasets in the example_data/ folder (whas1_gateway-1_org-1 and whas2_gateway-2_org-2) that you can use for local testing without needing Apheris credentials. These demonstrate the expected data structure.

Running the Simulator - Basic Usage🔗

Example 1: Single dataset (single site)🔗

python src/logistic_regression_quickstart/run_simulator.py \
    --dataset-ids my_dataset_id

This simulates a single gateway with one dataset.

Example 2: Multiple datasets (multiple sites)🔗

python src/logistic_regression_quickstart/run_simulator.py \
    --dataset-ids dataset1 dataset2 dataset3

This simulates three gateways, each with its own dataset. Perfect for testing federated aggregation!

Example 3: Custom output location🔗

python src/logistic_regression_quickstart/run_simulator.py \
    --dataset-ids my_dataset_id \
    --simulator-output-path ./my_test_run

By default, output goes to ./simulator_output/.

What Happens During a Run🔗

When you run the simulator:

  1. Setup Phase:

    • Creates job configuration from your secure runtime
    • Downloads development datasets
    • Sets up FLARE server and client workspaces
  2. Execution Phase:

    • FLARE server starts and waits for clients
    • Each simulated site connects and receives initial model
    • Sites train locally on their development data
    • Sites send updated parameters to server
    • Server aggregates parameters
    • Repeat for num_rounds
  3. Completion:

    • Final aggregated model saved
    • Logs written to workspace
    • Exit code returned (0 = success)

Understanding Simulator Output🔗

After running, you'll see a directory structure like this:

simulator_output/
├── jobs/                          # Generated FLARE configs
│   ├── app/config/
│   │   ├── config_fed_client.json  # Client configuration
│   │   └── config_fed_server.json  # Server configuration
│   └── meta.json                   # Job metadata

└── workspace/                      # FLARE execution workspace
    ├── server/                     # Server side
    │   ├── log.txt                 # Server logs (check here first!)
    │   └── simulate_job/
    │       └── models/
    │           └── server.npy      # FINAL TRAINED MODEL

    └── site-1/                     # Client side (one per dataset)
        ├── log.txt                 # Client logs
        └── simulate_job/
            ├── app_site-1/
            │   ├── config/         # Actual config used
            │   └── datasets/       # Downloaded development data
            └── ... 

Important files:

  • workspace/server/simulate_job/models/server.npy - Your trained model!
  • workspace/server/log.txt - Server logs (aggregation, workflow)
  • workspace/site-1/log.txt - Client logs (training, errors)
  • jobs/app/config/ - Generated configs (debug parameter passing)

Success Indicators🔗

Successful run:

INFO: SimulatorRunner:return_code from process. exitcode: 0
Simulator run completed successfully.  Workspace output stored in /path/to/simulator_output/workspace

Failed run:

ERROR: ...  (error message)
RuntimeError:  Simulator run failed.  See logs in /path/to/simulator_output/workspace/simulate_job/log.txt

Debugging Failed Runs🔗

Step 1: Check server logs🔗

cat simulator_output/workspace/server/log.txt | grep ERROR

Look for:

  • Configuration errors
  • Component initialization failures
  • Aggregation issues

Step 2: Check client logs🔗

cat simulator_output/workspace/site-1/log.txt | grep ERROR

Look for:

  • Data loading errors
  • Training failures
  • Parameter passing issues

Step 3: Verify configurations🔗

cat simulator_output/jobs/app/config/config_fed_client.json

Check:

  • Is your executor path correct?
  • Are parameters in the args section?
  • Do parameter names match your executor?

Step 4: Inspect downloaded data🔗

ls -la simulator_output/workspace/site-1/simulate_job/app_site-1/datasets/

  • Is development data present?
  • Does it have the expected structure?
  • Can you manually load it?

Step 5: Test data loading separately🔗

Create a test script:

from pathlib import Path
import pandas as pd

dataset_path = Path("simulator_output/workspace/site-1/simulate_job/app_site-1/datasets/my_dataset")

# Try to load
csv_files = list(dataset_path.rglob("*.csv"))
print(f"Found {len(csv_files)} CSV files")

if csv_files:
    df = pd.read_csv(csv_files[0])
    print(df.head())
    print(df.columns)

Examining the Trained Model🔗

Load and inspect your trained model:

import numpy as np

# Load the model
weights = np.load(
    'simulator_output/workspace/server/simulate_job/models/server.npy',
    allow_pickle=True
).item()

print("Model weights:")
print(weights)

# For our linear classifier
print(f"Weight shape: {weights['linear.weight'].shape}")
print(f"Bias shape: {weights['linear.bias'].shape}")
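
To reuse the aggregated weights in PyTorch, convert them back to tensors and load them into the model. A hedged sketch, assuming server.npy holds a dict of NumPy arrays keyed like the model's state_dict (zeros stand in for the real file here):

```python
import numpy as np
import torch
import torch.nn as nn

class SimpleLinearClassifier(nn.Module):
    def __init__(self, input_dim: int):
        super().__init__()
        self.linear = nn.Linear(input_dim, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        return self.sigmoid(self.linear(x))

# Stand-in for the dict loaded from server.npy (shapes for a 4-feature model)
weights = {
    "linear.weight": np.zeros((1, 4), dtype=np.float32),
    "linear.bias": np.zeros((1,), dtype=np.float32),
}

model = SimpleLinearClassifier(input_dim=4)
model.load_state_dict({k: torch.from_numpy(v) for k, v in weights.items()})
model.eval()

# With all-zero weights, sigmoid(0) = 0.5 for every input
probs = model(torch.randn(2, 4))
print(float(probs[0, 0]))  # 0.5
```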

Common Issues and Solutions🔗

Issue: "No datasets found"🔗

Solution: Ensure you're logged in (apheris login) and datasets have development data registered

Issue: "Dataset column 'target' not found"🔗

Solution: Update your _load_data_from_dataset() to use the correct column name

Issue: "Model shape mismatch"🔗

Solution: Check input dimensions. Print X.shape in _load_data_from_dataset()

Issue: "Loss is NaN"🔗

Solution: Check the learning rate (too high?), normalize inputs, and check for missing or infinite values

Tips for Effective Testing🔗

  1. Start with one dataset - Debug issues before scaling
  2. Use small num_rounds - Quick iterations during development
  3. Add logging - Use self._send_to_server_log() liberally
  4. Test data loading separately - Write unit tests for _load_data_from_dataset()
  5. Verify configs - Check generated configs match expectations
  6. Clean between runs - rm -rf simulator_output for fresh start

Part 5: Deploying to Apheris🔗

Once your model works in the simulator, you're ready to deploy to the Apheris platform.

Step 1: Commit Your Code🔗

First, commit all your changes to Git:

# Stage all changes
git add .

# Commit with a descriptive message
git commit -m "Implement PyTorch linear classifier with custom parameters"

# Push to remote
git push origin main

Important

Note the commit hash from the output:

[main abc1234] Implement PyTorch linear classifier with custom parameters

The commit hash is abc1234... (full hash will be longer). You'll need this later.

Tip

You can also get it with:

git rev-parse HEAD

Step 2: Build Docker Image🔗

Build the Docker container that will run on Apheris infrastructure:

docker build -t quay.io/apheris/my-model:0.1.0 . 

Important

For Mac users:

Mac uses ARM architecture, but Apheris uses x86. Cross-compile with:

docker build --platform="linux/amd64" \
    -t quay.io/apheris/my-model:0.1.0 . 

Naming convention:

  • quay.io/apheris/ - Apheris Quay registry
  • my-model - Your model name (must match registry name)
  • 0.1.0 - Semantic version (see below)

Version numbering (Semantic Versioning):

Valid versions:

  • 0.1.0 - Major.Minor.Patch
  • 1.2.3 - Standard release
  • 1.0.0-alpha.1 - Pre-release
  • 2.1.0-beta.5 - Beta version

Invalid versions:

  • new-model-0.1.0 - No prefix allowed
  • v1.0.0 - No 'v' prefix
  • latest - Must be specific version
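
A quick local check of a tag against this format (a simplified pattern covering the cases above, not the full SemVer grammar):

```python
import re

# Major.Minor.Patch with an optional pre-release suffix (simplified SemVer)
SEMVER = re.compile(r"^\d+\.\d+\.\d+(-[0-9A-Za-z.-]+)?$")

tags = ["0.1.0", "1.0.0-alpha.1", "2.1.0-beta.5",  # valid
        "new-model-0.1.0", "v1.0.0", "latest"]     # invalid

for tag in tags:
    print(tag, "->", bool(SEMVER.match(tag)))
```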

Verify the build:

docker images | grep my-model

You should see your newly built image.

Step 3: Push to Quay Registry🔗

Push your image to the Apheris Quay registry:

docker push quay.io/apheris/my-model:0.1.0

Sample output:

The push refers to repository [quay.io/apheris/my-model]
5f70bf18a086: Pushed
... 
0.1.0: digest: sha256:abc123def456...  size: 2417

Important

Note the digest value sha256:abc123def456... - you'll need this in the next step!

Troubleshooting:

If push fails with authentication error:

# Login to Quay
docker login quay.io

# Enter your Apheris Quay credentials when prompted

Step 4: Register Model Version in Apheris🔗

Now register this version in the Apheris Model Registry:

apheris models add-version \
    --version 0.1.0 \
    --engine nvflare:2.6.0 \
    --digest sha256:abc123def456... \
    --commit-hash abc1234567890... \
    my-model-id

Parameter breakdown:

  • --version 0.1.0 - Must match your Docker tag
  • --engine nvflare:2.6.0 - FLARE version (must match the nvflare version in requirements.txt)
  • --digest sha256:... - From the docker push output (include the sha256: prefix!)
  • --commit-hash abc123... - Git commit hash from Step 1
  • my-model-id - Your model's ID in the Apheris registry

Verify the registration:

apheris models list-versions my-model-id | grep 0.1.0

You should see your version details including digest and commit hash.

Step 5: Create and Run a Job🔗

Now you can run federated training jobs!

Submit the job:

apheris job schedule \
  --dataset_ids dataset-1,dataset-2 \
  --model_id my-model-id \
  --model_version 0.1.0 \
  --payload '{"task_id": "train", "cohorts": {"dataset-1": "cohort-a", "dataset-2": "cohort-b"}, "num_rounds": 10, "learning_rate": 0.01, "batch_size": 32, "num_epochs": 5}'

Payload structure:

The --payload parameter is a JSON string containing your job parameters:

  • task_id: Which task to run (e.g., "train")
  • cohorts: Maps dataset IDs to cohort names
  • num_rounds: Number of federated learning rounds
  • learning_rate, batch_size, num_epochs: Your custom parameters
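
Hand-writing JSON inside a shell command is error-prone; a small sketch that builds the payload string in Python first:

```python
import json

payload = {
    "task_id": "train",
    "cohorts": {"dataset-1": "cohort-a", "dataset-2": "cohort-b"},
    "num_rounds": 10,
    "learning_rate": 0.01,
    "batch_size": 32,
    "num_epochs": 5,
}

# json.dumps produces the exact string to paste after --payload
payload_str = json.dumps(payload)
print(payload_str)
```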

Monitor the job:

# List all jobs
apheris job list

# Get job status
apheris job status

# View job logs
apheris job logs

Download results:

apheris job download-results ./results/

The trained model will be in results/models/server.npy.

Complete Workflow Summary🔗

1. Code changes → git commit
2. Build Docker image → docker build
3. Push to registry → docker push (note digest)
4. Register version → apheris models add-version
5. Submit job → apheris job schedule
6. Monitor → apheris job status/logs
7. Download results → apheris job download-results

Updating Your Model🔗

When you make changes:

# Commit changes
git commit -m "Improve model architecture"
git push

# Build with NEW version
docker build --platform="linux/amd64" \
    -t quay.io/apheris/my-model:0.2.0 . 

# Push
docker push quay.io/apheris/my-model:0.2.0

# Register new version
apheris models add-version \
    --version 0.2.0 \
    --engine nvflare:2.6.0 \
    --digest sha256:new_digest... \
    --commit-hash new_commit... \
    my-model-id

Quick Reference🔗

Checklist: Adapting This Template🔗

When creating your own model, work through these steps:

  • [ ] Define parameters - Update payload.py with job parameters
  • [ ] Pass parameters - Update loaders.py to pass to executor
  • [ ] Update executor - Add parameters to __init__
  • [ ] Implement data loading - Customize _load_data_from_dataset()
  • [ ] Replace model - Create your PyTorch/TensorFlow/sklearn model
  • [ ] Implement training - Write training logic in _train()
  • [ ] Update dependencies - Add to requirements.txt
  • [ ] Test with simulator - Run locally with development data
  • [ ] Debug issues - Check logs and configs
  • [ ] Commit code - Push to Git (note commit hash)
  • [ ] Build Docker image - With proper platform flag
  • [ ] Push to Quay - Note the digest
  • [ ] Register version - Add to model registry
  • [ ] Run job - Submit federated training job

Key Files to Modify🔗

File Purpose What to Change
secure_runtime/payload.py Job parameters Add your parameters
secure_runtime/loaders.py Config generation Pass parameters to executor
model/executor.py Training logic Your model and training code
requirements.txt Dependencies Your ML libraries

Common Commands🔗

# Local testing
python src/logistic_regression_quickstart/run_simulator.py --dataset-ids <id>

# Build and deploy
docker build --platform="linux/amd64" -t quay.io/apheris/my-model:0.1.0 .
docker push quay.io/apheris/my-model:0.1.0
apheris models add-version --version 0.1.0 --engine nvflare:2.6.0 \
    --digest <digest> --commit-hash <hash> my-model-id

# Job management
apheris job schedule --dataset_ids <ids> --model_id <id> --model_version <version> --payload '<json>'
apheris job list
apheris job status
apheris job logs
apheris job download-results <output-path>

Next Steps🔗

You're now ready to build federated learning models on Apheris!