Using the Training Quickstart example🔗
This guide walks you through using and modifying the apheris/quickstart-training repository to build your own federated learning model on the Apheris platform.
Getting Started: Clone the Repository🔗
Before you begin, you'll need to clone the quickstart-training repository to your local machine.
git clone https://github.com/apheris/quickstart-training.git
cd quickstart-training
This will create a local copy of the repository in a directory called quickstart-training and navigate into it. You can now explore the code, modify it for your use case, and test it locally before deploying to the Apheris platform.
Overview🔗
The quickstart-training repository is a boilerplate template for building federated learning models using NVIDIA FLARE on Apheris. It includes:
- A simple federated logistic regression example
- Secure Runtime configuration for parameter passing
- Data download utilities using the Apheris Data Access Layer (DAL)
- A local simulator for testing before deployment
This guide will show you how to adapt this template for your own machine learning models.
Repository Structure🔗
quickstart-training/
├── src/logistic_regression_quickstart/
│ ├── model/
│ │ ├── executor.py # Main training executor (YOUR MODEL GOES HERE)
│ │ └── utils/ # Simulator utilities
│ ├── secure_runtime/
│ │ ├── payload.py # Job parameters definition
│ │ ├── loaders.py # FLARE config generation
│ │ └── secure_runtime_service.py # Secure runtime service
│ └── run_simulator.py # Local testing script
├── tests/ # Unit tests
├── docs/ # Documentation (simulator, custom model workflow, etc.)
├── example_data/ # Example datasets for local testing
├── Dockerfile # Container image
└── requirements.txt # Python dependencies
Part 1: Understanding Parameter Passing with Secure Runtime🔗
The Secure Runtime translates job parameters into FLARE configuration files. Let's walk through how parameters flow from job submission to your executor.
Step 1: Define Your Job Parameters🔗
The payload defines what parameters your job accepts. Open src/logistic_regression_quickstart/secure_runtime/payload.py:
from typing import Literal
from pydantic import BaseModel, Field
TaskID = Literal["train"]
class LogisticRegressionPayload(BaseModel):
task_id: TaskID
cohorts: dict[str, str] = Field(default_factory=dict)
num_rounds: int = 2
Note
This is the baseline payload in the repository. You'll extend this with your own parameters.
What's happening here:
- `LogisticRegressionPayload` is a Pydantic model that validates incoming job parameters
- `task_id` specifies which task to run (in this case, only "train")
- `cohorts` maps dataset IDs to cohort names
- `num_rounds` controls how many federated learning rounds to perform
Step 2: Add Your Own Parameters🔗
Let's say you want to add a learning rate and batch size. Modify the payload:
class LogisticRegressionPayload(BaseModel):
task_id: TaskID
cohorts: dict[str, str] = Field(default_factory=dict)
num_rounds: int = 2
# Add your new parameters here
learning_rate: float = 0.01 # Default value 0.01
batch_size: int = 32 # Default value 32
Key points:
- Provide sensible default values
- Use type hints for validation
- Pydantic will automatically validate these when a job is submitted
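To see that validation in action, here is a self-contained sketch of the extended payload. It mirrors the fields defined above; the concrete values are illustrative.

```python
from typing import Literal

from pydantic import BaseModel, Field, ValidationError

TaskID = Literal["train"]

class LogisticRegressionPayload(BaseModel):
    task_id: TaskID
    cohorts: dict[str, str] = Field(default_factory=dict)
    num_rounds: int = 2
    learning_rate: float = 0.01
    batch_size: int = 32

# Valid payload: unspecified fields fall back to their defaults
payload = LogisticRegressionPayload(task_id="train", learning_rate=0.05)
print(payload.learning_rate)  # 0.05
print(payload.batch_size)     # 32

# Invalid payload: a non-numeric learning rate is rejected at submission time
try:
    LogisticRegressionPayload(task_id="train", learning_rate="fast")
except ValidationError as err:
    print("rejected:", type(err).__name__)
```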
Step 3: Pass Parameters to the Executor🔗
Now we need to connect these parameters to the executor. Open src/logistic_regression_quickstart/secure_runtime/loaders.py.
The load_client_config function creates the FLARE configuration for each gateway (client):
def load_client_config(payload: LogisticRegressionPayload) -> dict:
"""
Load the FLARE client config template and populate with data from the job payload
"""
return {
"format_version": 2,
"executors": [
{
"tasks": ["train"],
"executor": {
"path": "logistic_regression_quickstart.model.executor.TrainingExecutor",
"args": {"cohorts": payload.cohorts}, # Parameters go here
},
}
],
"task_result_filters": [],
"task_data_filters": [],
"components": [],
}
Update the args dictionary to include your new parameters:
def load_client_config(payload: LogisticRegressionPayload) -> dict:
return {
"format_version": 2,
"executors": [
{
"tasks": ["train"],
"executor": {
"path": "logistic_regression_quickstart.model.executor.TrainingExecutor",
"args": {
"cohorts": payload.cohorts,
"learning_rate": payload.learning_rate, # Add this
"batch_size": payload.batch_size, # Add this
},
},
}
],
# ... rest stays the same
}
Step 4: Update the Executor Constructor🔗
Finally, update your executor to accept these parameters. Open src/logistic_regression_quickstart/model/executor.py:
class TrainingExecutor(GatewayLogSenderMixin, Executor):
def __init__(self, cohorts: dict[str, str], learning_rate: float = 0.01,
batch_size: int = 32):
"""
Arguments are populated from the secure runtime configuration.
Args:
cohorts: Mapping of dataset IDs to cohort names
learning_rate: Learning rate for training
batch_size: Batch size for training
"""
super().__init__()
self.cohorts = cohorts
self.learning_rate = learning_rate
self.batch_size = batch_size
self.model = MockLogisticRegression()
self.dataset_id = None
That's it! The parameter flow is complete:
- User submits a job with parameters
- Secure Runtime validates with `LogisticRegressionPayload`
- `load_client_config` creates the FLARE config
- FLARE instantiates `TrainingExecutor` with the parameters
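The whole chain can be sketched outside of FLARE: a payload populates the `args` dict of a client config shaped like `load_client_config`'s output, and those args become the executor's constructor keyword arguments. `DummyExecutor` below is an illustrative stand-in for `TrainingExecutor`, not repository code.

```python
# Illustrative parameter-flow sketch; the config dict mirrors load_client_config
payload = {"cohorts": {"ds-1": "cohort-a"}, "learning_rate": 0.05, "batch_size": 64}

client_config = {
    "format_version": 2,
    "executors": [
        {
            "tasks": ["train"],
            "executor": {
                "path": "logistic_regression_quickstart.model.executor.TrainingExecutor",
                "args": {
                    "cohorts": payload["cohorts"],
                    "learning_rate": payload["learning_rate"],
                    "batch_size": payload["batch_size"],
                },
            },
        }
    ],
}

class DummyExecutor:
    """Stand-in for TrainingExecutor: receives the args as keyword arguments."""
    def __init__(self, cohorts, learning_rate=0.01, batch_size=32):
        self.cohorts = cohorts
        self.learning_rate = learning_rate
        self.batch_size = batch_size

# FLARE instantiates the class named in "path" with the "args" dict
args = client_config["executors"][0]["executor"]["args"]
executor = DummyExecutor(**args)
print(executor.learning_rate, executor.batch_size)  # 0.05 64
```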
Part 2: Downloading Data Using the DAL🔗
The Apheris Data Access Layer (DAL) provides secure access to datasets. Let's walk through how to download data in your executor.
Understanding the DAL🔗
The DAL provides these key functions from apheris_utils.data:
- `download_dataset(dataset_id, folder)` - Download a specific dataset
- `download_all(folder)` - Download all available datasets
- `list_dataset_ids()` - List all available dataset IDs
- `get_settings()` - Get DAL configuration
Step 1: Set Up Data Download Location🔗
In your executor's execute method, determine where to download data:
from pathlib import Path
from apheris_utils.data.primitives import get_settings
from nvflare.apis.fl_constant import FLContextKey
def execute(self, task_name: str, shareable: Shareable,
fl_ctx: FLContext, abort_signal: Signal) -> Shareable:
# Get the app root directory (provided by FLARE)
app_root = Path(fl_ctx.get_prop(FLContextKey.APP_ROOT))
# Create a datasets subdirectory
data_download_root = app_root / "datasets"
# ... continue with download
Step 2: Identify Which Dataset to Download🔗
The DAL settings contain information about available datasets:
# Get DAL settings (configured by Apheris platform)
dal_settings = get_settings()
# List all available datasets
available_datasets = list(dal_settings.data.keys())
print(f"Available datasets: {available_datasets}")
# For this simple example, use the first dataset
self.dataset_id = available_datasets[0]
In a real scenario, you might:
- Use the `cohorts` parameter to select specific datasets
- Download multiple datasets
- Match dataset IDs to specific cohorts
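A hedged sketch of cohort-based selection (the dataset IDs below are illustrative; the repository's example datasets use similar names):

```python
# Hypothetical dataset IDs as they might appear in the DAL settings
available_datasets = ["whas1_gateway-1_org-1", "whas2_gateway-2_org-2", "scratch_ds"]

# The cohorts mapping from the job payload: dataset ID -> cohort name
cohorts = {"whas1_gateway-1_org-1": "cohort-a", "whas2_gateway-2_org-2": "cohort-b"}

# Keep only the datasets that were explicitly assigned a cohort
selected = {ds: cohorts[ds] for ds in available_datasets if ds in cohorts}
print(selected)
```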
Step 3: Download the Dataset🔗
Now use download_dataset to fetch the data:
from apheris_utils.data import download_dataset
# Download returns a dict: {dataset_id: Path}
data_dict = download_dataset(self.dataset_id, data_download_root)
# Extract the path
data_path = data_dict.get(self.dataset_id)
if not data_path:
raise RuntimeError(
f"Failed to download dataset {self.dataset_id} from DAL."
)
print(f"Data downloaded to: {data_path}")
Step 4: Create a Helper Method🔗
Let's package this into a reusable method. Add this to your TrainingExecutor class:
def _download_dataset(self, fl_ctx: FLContext) -> Path:
"""
Download dataset from the Apheris DAL.
Returns:
Path: Local path to the downloaded dataset
"""
# Determine download location
data_download_root = Path(fl_ctx.get_prop(FLContextKey.APP_ROOT)) / "datasets"
# Download the dataset
data_dict = download_dataset(self.dataset_id, data_download_root)
# Get the path
data_path = data_dict.get(self.dataset_id)
if not data_path:
raise RuntimeError(
f"Failed to download dataset {self.dataset_id} from DAL. "
"Please check the DAL settings and ensure the dataset is available."
)
return data_path
Step 5: Use It in Your Execute Method🔗
Now call this helper in your main execution flow:
@safe_error_catchall_decorator
def execute(self, task_name: str, shareable: Shareable,
fl_ctx: FLContext, abort_signal: Signal) -> Shareable:
# Get dataset ID from DAL settings (take only the first ID)
dal_settings = get_settings()
self.dataset_id = next(iter(dal_settings.data.keys()))
# Download the data
data_path = self._download_dataset(fl_ctx)
# Log success
self._send_to_server_log(
fl_ctx,
f"Downloaded dataset {self.dataset_id} to {data_path}",
"INFO"
)
# Now you can load and process the data
# ... (continue to Part 3)
What happens:
- FLARE calls `execute()` with a task
- We identify which dataset to use from DAL settings
- We download it to a local directory
- We get back the path to the downloaded files
- We can now load and process the data
Part 3: Adding a Real PyTorch Model🔗
Now let's replace the mock model with a real PyTorch implementation. We'll build a simple linear classifier for binary classification step by step.
Step 1: Define the Model Architecture🔗
Create a simple PyTorch model. Add this to the top of your executor.py file:
import torch
import torch.nn as nn
class SimpleLinearClassifier(nn.Module):
"""
A simple linear classifier for binary classification.
Architecture: Input → Linear → Sigmoid → Output
"""
def __init__(self, input_dim: int):
"""
Args:
input_dim: Number of input features
"""
super().__init__()
self.linear = nn.Linear(input_dim, 1)
self.sigmoid = nn.Sigmoid()
def forward(self, x):
"""Forward pass through the network"""
return self.sigmoid(self.linear(x))
What this model does:
- Takes input features of size `input_dim`
- Applies a linear transformation (weights + bias)
- Applies sigmoid activation to get a probability (0-1)
- Outputs a binary classification probability
Step 2: Understanding State Management for Federated Learning🔗
For federated learning, we need to extract and load model parameters. PyTorch provides built-in methods for this via state_dict() and load_state_dict():
Why we need state management:
- `state_dict()`: After local training, extract all model parameters to send to the server for aggregation
- `load_state_dict()`: Before training, load the global parameters received from the server

The `SimpleLinearClassifier` we defined already inherits these methods from `nn.Module`, so no additional code is needed. We'll use them directly in the training code.
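A minimal round trip of these two methods, using the `SimpleLinearClassifier` from Step 1. After `load_state_dict`, both models produce identical outputs for the same input:

```python
import torch
import torch.nn as nn

class SimpleLinearClassifier(nn.Module):
    def __init__(self, input_dim: int):
        super().__init__()
        self.linear = nn.Linear(input_dim, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        return self.sigmoid(self.linear(x))

# "Server" model whose weights we pretend were aggregated globally
global_model = SimpleLinearClassifier(input_dim=4)
global_params = global_model.state_dict()  # OrderedDict of parameter tensors

# "Client" model picks up the global weights before local training
local_model = SimpleLinearClassifier(input_dim=4)
local_model.load_state_dict(global_params)

# Both models now agree on any input
x = torch.randn(3, 4)
assert torch.equal(global_model(x), local_model(x))
print(sorted(global_params.keys()))  # ['linear.bias', 'linear.weight']
```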
Step 3: Update Executor Initialization🔗
Replace the mock model with your PyTorch model:
class TrainingExecutor(GatewayLogSenderMixin, Executor):
def __init__(self, cohorts: dict[str, str], learning_rate: float = 0.01,
batch_size: int = 32, num_epochs: int = 5):
super().__init__()
self.cohorts = cohorts
self.learning_rate = learning_rate
self.batch_size = batch_size
self.num_epochs = num_epochs
# Model will be initialized once we know input dimensions
self.model = None
self.input_dim = None
self.dataset_id = None
Note
We can't initialize the model yet because we don't know the input dimension until we load the data.
Step 4: Implement Data Loading🔗
Now let's load actual data from the downloaded dataset. Replace the placeholder _load_data_from_dataset:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
def _load_data_from_dataset(self, dataset_path: Path):
"""
Load and preprocess data from the dataset.
This example assumes CSV format with features and a 'target' column.
Adjust based on your actual data structure.
Args:
dataset_path: Path to downloaded dataset directory
Returns:
tuple: (X_tensor, y_tensor) - Features and labels as PyTorch tensors
"""
# Find CSV files in the dataset
csv_files = list(dataset_path.rglob("*.csv"))
if not csv_files:
raise FileNotFoundError(f"No CSV files found in {dataset_path}")
# Load the first CSV file
df = pd.read_csv(csv_files[0])
# Separate features and target
target_column = 'target' # Change this to match your data
if target_column not in df.columns:
raise ValueError(f"Target column '{target_column}' not found in dataset")
# Extract features (all columns except target)
X = df.drop(columns=[target_column]).to_numpy().astype(np.float32)
# Extract target (reshape to column vector)
y = df[target_column].to_numpy().astype(np.float32).reshape(-1, 1)
# Normalize features so they are on a comparable scale
X = StandardScaler().fit_transform(X).astype(np.float32)
# Convert to PyTorch tensors
X_tensor = torch.tensor(X)
y_tensor = torch.tensor(y)
return X_tensor, y_tensor
Key steps:
- Find CSV files in the downloaded dataset
- Load with pandas
- Separate features (X) and target (y)
- Normalize features with StandardScaler
- Convert to PyTorch tensors
Customize this for your data:
- Change `target_column` to match your label column name
- Adjust file loading if you use different formats (Parquet, JSON, etc.)
- Add custom preprocessing (handling missing values, encoding, etc.)
Step 5: Implement Training Logic🔗
Now implement the actual training. Replace the _train method:
import torch.optim as optim
def _train(self, input_shareable: Shareable, fl_ctx: FLContext,
dataset_path: Path) -> Shareable:
"""
Perform local training on this gateway's data.
Args:
input_shareable: Global model parameters from server
fl_ctx: FLARE context
dataset_path: Path to local dataset
Returns:
Shareable: Updated local model parameters
"""
# Load local data
X, y = self._load_data_from_dataset(dataset_path)
# Initialize model on first round (when we know input size)
if self.model is None:
self.input_dim = X.shape[1] # Number of features
self.model = SimpleLinearClassifier(self.input_dim)
self._send_to_server_log(
fl_ctx,
f"Initialized model with {self.input_dim} input features",
"INFO"
)
# Load global model weights from server (if not first round)
dxo = from_shareable(input_shareable)
if dxo.data: # Check if data exists (not first round)
self.model.load_state_dict(dxo.data)
self._send_to_server_log(fl_ctx, "Loaded global model weights", "INFO")
# ... (continue below)
Step 6: Training Loop🔗
Continue the _train method with the actual training loop:
# ... (previous code) ...
# Set up loss function and optimizer
criterion = nn.BCELoss() # Binary Cross Entropy for binary classification
optimizer = optim.SGD(self.model.parameters(), lr=self.learning_rate)
# Training loop
self.model.train() # Set model to training mode
for epoch in range(self.num_epochs):
epoch_loss = 0.0
num_batches = 0
# Mini-batch training
for i in range(0, len(X), self.batch_size):
# Get batch
batch_X = X[i:i + self.batch_size]
batch_y = y[i: i + self.batch_size]
# Forward pass
outputs = self.model(batch_X)
loss = criterion(outputs, batch_y)
# Backward pass and optimization
optimizer.zero_grad() # Clear gradients
loss.backward() # Compute gradients
optimizer.step() # Update weights
epoch_loss += loss.item()
num_batches += 1
# Log progress
avg_loss = epoch_loss / num_batches
self._send_to_server_log(
fl_ctx,
f"Epoch {epoch + 1}/{self.num_epochs}, Loss: {avg_loss:.4f}",
"INFO"
)
# ... (continue below)
What's happening:
- We use Binary Cross Entropy loss (appropriate for binary classification)
- SGD optimizer updates weights based on gradients
- We iterate through data in mini-batches
- We log progress after each epoch
Step 7: Return Updated Parameters🔗
Complete the _train method by returning the updated model:
# ... (previous code) ...
# Extract trained model parameters using PyTorch's state_dict
params = self.model.state_dict()
self._send_to_server_log(
fl_ctx,
"Training complete. Sending updated model to server.",
"INFO"
)
# Package parameters for sending to server
output_dxo = DXO(DataKind.WEIGHTS, params)
output_shareable = output_dxo.to_shareable()
output_shareable.set_return_code(ReturnCode.OK)
return output_shareable
What happens next:
- Local model parameters are sent to the FLARE server
- Server aggregates parameters from all gateways (average, weighted average, etc.)
- Next round begins with the aggregated global model
- Repeat until `num_rounds` is complete
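As a sketch of what that aggregation step might look like, here is an illustrative FedAvg-style weighted average over parameter dicts (this is not the actual FLARE server aggregator, and the sample counts and weights are made up):

```python
import numpy as np

def weighted_average(updates, sample_counts):
    """FedAvg-style aggregation: average each parameter across sites,
    weighted by the number of local samples each gateway trained on."""
    total = sum(sample_counts)
    aggregated = {}
    for key in updates[0]:
        aggregated[key] = sum(
            params[key] * (n / total) for params, n in zip(updates, sample_counts)
        )
    return aggregated

# Two gateways report weights after local training (illustrative values)
site_1 = {"linear.weight": np.array([[1.0, 2.0]]), "linear.bias": np.array([0.0])}
site_2 = {"linear.weight": np.array([[3.0, 4.0]]), "linear.bias": np.array([1.0])}

# Site 1 trained on 100 samples, site 2 on 300, so site 2 counts 3x as much
global_params = weighted_average([site_1, site_2], sample_counts=[100, 300])
print(global_params["linear.weight"])  # [[2.5 3.5]]
print(global_params["linear.bias"])    # [0.75]
```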
Step 8: Update Requirements🔗
Add PyTorch and other dependencies to requirements.txt. The repository starts with minimal requirements:
nvflare==2.6.0
apheris-utils[nvflare,simulator]==0.25.0
Add your model-specific dependencies:
nvflare==2.6.0
apheris-utils[nvflare,simulator]==0.25.0
torch>=2.0.0
scikit-learn>=1.0.0
pandas>=1.5.0
numpy>=1.24.0 # numpy is included with nvflare, but explicitly list if you need a specific version
That's it! You now have a complete PyTorch model integrated into the federated learning pipeline.
Part 4: Testing with the Simulator🔗
Before deploying to Apheris, test your model locally using the simulator. The simulator mimics the federated learning environment without requiring real data or infrastructure.
Understanding the Simulator🔗
The simulator:
- ✅ Runs your model locally on your machine
- ✅ Uses development data (non-sensitive test data)
- ✅ Simulates multiple sites/gateways
- ✅ Tests the complete FLARE workflow
- ✅ Validates your secure runtime configuration
- ❌ Does NOT use real sensitive data
- ❌ Does NOT require cloud infrastructure
Prerequisites🔗
1. Install dependencies:
pip install -r requirements.txt
pip install -r requirements-dev.txt # If running tests
2. Login to Apheris:
The simulator needs to download development data from Apheris:
apheris login
3. Ensure development datasets exist:
Your datasets must have development data registered (coordinate with your Data Custodian). Verify:
apheris datasets list
Testing without Apheris login
The repository includes example datasets in the example_data/ folder (whas1_gateway-1_org-1 and whas2_gateway-2_org-2) that you can use for local testing without needing Apheris credentials. These demonstrate the expected data structure.
Running the Simulator - Basic Usage🔗
Example 1: Single dataset (single site)🔗
python src/logistic_regression_quickstart/run_simulator.py \
--dataset-ids my_dataset_id
This simulates a single gateway with one dataset.
Example 2: Multiple datasets (multiple sites)🔗
python src/logistic_regression_quickstart/run_simulator.py \
--dataset-ids dataset1 dataset2 dataset3
This simulates three gateways, each with their own dataset. Perfect for testing federated aggregation!
Example 3: Custom output location🔗
python src/logistic_regression_quickstart/run_simulator.py \
--dataset-ids my_dataset_id \
--simulator-output-path ./my_test_run
By default, output goes to ./simulator_output/.
What Happens During a Run🔗
When you run the simulator:
- Setup Phase:
    - Creates job configuration from your secure runtime
    - Downloads development datasets
    - Sets up FLARE server and client workspaces
- Execution Phase:
    - FLARE server starts and waits for clients
    - Each simulated site connects and receives the initial model
    - Sites train locally on their development data
    - Sites send updated parameters to the server
    - Server aggregates parameters
    - Repeat for `num_rounds`
- Completion:
    - Final aggregated model saved
    - Logs written to workspace
    - Exit code returned (0 = success)
Understanding Simulator Output🔗
After running, you'll see a directory structure like this:
simulator_output/
├── jobs/ # Generated FLARE configs
│ ├── app/config/
│ │ ├── config_fed_client.json # Client configuration
│ │ └── config_fed_server.json # Server configuration
│ └── meta.json # Job metadata
│
└── workspace/ # FLARE execution workspace
├── server/ # Server side
│ ├── log.txt # Server logs (check here first!)
│ └── simulate_job/
│ └── models/
│ └── server.npy # FINAL TRAINED MODEL
│
└── site-1/ # Client side (one per dataset)
├── log.txt # Client logs
└── simulate_job/
├── app_site-1/
│ ├── config/ # Actual config used
│ └── datasets/ # Downloaded development data
└── ...
Important files:
- `workspace/server/simulate_job/models/server.npy` - Your trained model!
- `workspace/server/log.txt` - Server logs (aggregation, workflow)
- `workspace/site-1/log.txt` - Client logs (training, errors)
- `jobs/app/config/` - Generated configs (debug parameter passing)
Success Indicators🔗
Successful run:
INFO: SimulatorRunner:return_code from process. exitcode: 0
Simulator run completed successfully. Workspace output stored in /path/to/simulator_output/workspace
Failed run:
ERROR: ... (error message)
RuntimeError: Simulator run failed. See logs in /path/to/simulator_output/workspace/simulate_job/log.txt
Debugging Failed Runs🔗
Step 1: Check server logs🔗
cat simulator_output/workspace/server/log.txt | grep ERROR
Look for:
- Configuration errors
- Component initialization failures
- Aggregation issues
Step 2: Check client logs🔗
cat simulator_output/workspace/site-1/log.txt | grep ERROR
Look for:
- Data loading errors
- Training failures
- Parameter passing issues
Step 3: Verify configurations🔗
cat simulator_output/jobs/app/config/config_fed_client.json
Check:
- Is your executor path correct?
- Are parameters in the `args` section?
- Do parameter names match your executor?
Step 4: Inspect downloaded data🔗
ls -la simulator_output/workspace/site-1/simulate_job/app_site-1/datasets/
- Is development data present?
- Does it have the expected structure?
- Can you manually load it?
Step 5: Test data loading separately🔗
Create a test script:
from pathlib import Path
import pandas as pd
dataset_path = Path("simulator_output/workspace/site-1/simulate_job/app_site-1/datasets/my_dataset")
# Try to load
csv_files = list(dataset_path.rglob("*.csv"))
print(f"Found {len(csv_files)} CSV files")
if csv_files:
df = pd.read_csv(csv_files[0])
print(df.head())
print(df.columns)
Examining the Trained Model🔗
Load and inspect your trained model:
import numpy as np
# Load the model
weights = np.load(
'simulator_output/workspace/server/simulate_job/models/server.npy',
allow_pickle=True
).item()
print("Model weights:")
print(weights)
# For our linear classifier
print(f"Weight shape: {weights['linear.weight'].shape}")
print(f"Bias shape: {weights['linear.bias'].shape}")
Common Issues and Solutions🔗
Issue: "No datasets found"🔗
Solution: Ensure you're logged in (apheris login) and datasets have development data registered
Issue: "Dataset column 'target' not found"🔗
Solution: Update your _load_data_from_dataset() to use the correct column name
Issue: "Model shape mismatch"🔗
Solution: Check input dimensions. Print X.shape in _load_data_from_dataset()
Issue: "Loss is NaN"🔗
Solution: Check the learning rate (too high?), normalize inputs, and check for missing/infinite values
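A quick, illustrative pre-flight check you could run on your features before training (this helper is not part of the repository):

```python
import numpy as np

def check_features(X: np.ndarray) -> None:
    """Fail fast on inputs that commonly produce NaN losses."""
    if np.isnan(X).any():
        raise ValueError("Input contains NaN values - impute or drop them first")
    if np.isinf(X).any():
        raise ValueError("Input contains infinite values")
    # Large, unnormalized feature scales also destabilize training
    if np.abs(X).max() > 1e3:
        print("Warning: large feature magnitudes - consider normalizing")

X = np.array([[0.5, -1.2], [2.0, 0.1]], dtype=np.float32)
check_features(X)  # passes silently

try:
    check_features(np.array([[float("nan"), 1.0]]))
except ValueError as err:
    print(f"caught: {err}")
```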
Tips for Effective Testing🔗
- Start with one dataset - Debug issues before scaling
- Use small `num_rounds` - Quick iterations during development
- Add logging - Use `self._send_to_server_log()` liberally
- Test data loading separately - Write unit tests for `_load_data_from_dataset()`
- Verify configs - Check generated configs match expectations
- Clean between runs - `rm -rf simulator_output` for a fresh start
Part 5: Deploying to Apheris🔗
Once your model works in the simulator, you're ready to deploy to the Apheris platform.
Step 1: Commit Your Code🔗
First, commit all your changes to Git:
# Stage all changes
git add .
# Commit with a descriptive message
git commit -m "Implement PyTorch linear classifier with custom parameters"
# Push to remote
git push origin main
Important
Note the commit hash from the output:
[main abc1234] Implement PyTorch linear classifier with custom parameters
The commit hash is abc1234... (full hash will be longer). You'll need this later.
Tip
You can also get it with:
git rev-parse HEAD
Step 2: Build Docker Image🔗
Build the Docker container that will run on Apheris infrastructure:
docker build -t quay.io/apheris/my-model:0.1.0 .
Important
For Mac users:
Mac uses ARM architecture, but Apheris uses x86. Cross-compile with:
docker build --platform="linux/amd64" \
-t quay.io/apheris/my-model:0.1.0 .
Naming convention:
- `quay.io/apheris/` - Apheris Quay registry
- `my-model` - Your model name (must match registry name)
- `0.1.0` - Semantic version (see below)
Version numbering (Semantic Versioning):
Valid versions:

- ✅ `0.1.0` - Major.Minor.Patch
- ✅ `1.2.3` - Standard release
- ✅ `1.0.0-alpha.1` - Pre-release
- ✅ `2.1.0-beta.5` - Beta version

Invalid versions:

- ❌ `new-model-0.1.0` - No prefix allowed
- ❌ `v1.0.0` - No 'v' prefix
- ❌ `latest` - Must be a specific version
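If you want to sanity-check a tag before building, these rules can be sketched in a few lines. The regex below approximates Semantic Versioning (major.minor.patch plus an optional pre-release suffix); it is illustrative, not the registry's actual validator:

```python
import re

# Simplified Semantic Versioning check: major.minor.patch + optional pre-release
SEMVER = re.compile(r"^\d+\.\d+\.\d+(-[0-9A-Za-z.-]+)?$")

def is_valid_version(tag: str) -> bool:
    """Return True if the tag looks like a valid semantic version."""
    return SEMVER.match(tag) is not None

for tag in ["0.1.0", "1.0.0-alpha.1", "2.1.0-beta.5", "v1.0.0", "latest", "new-model-0.1.0"]:
    print(f"{tag}: {is_valid_version(tag)}")
```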
Verify the build:
docker images | grep my-model
You should see your newly built image.
Step 3: Push to Quay Registry🔗
Push your image to the Apheris Quay registry:
docker push quay.io/apheris/my-model:0.1.0
Sample output:
The push refers to repository [quay.io/apheris/my-model]
5f70bf18a086: Pushed
...
0.1.0: digest: sha256:abc123def456... size: 2417
Important
Note the digest value sha256:abc123def456... - you'll need this in the next step!
Troubleshooting:
If push fails with authentication error:
# Login to Quay
docker login quay.io
# Enter your Apheris Quay credentials when prompted
Step 4: Register Model Version in Apheris🔗
Now register this version in the Apheris Model Registry:
apheris models add-version \
--version 0.1.0 \
--engine nvflare:2.6.0 \
--digest sha256:abc123def456... \
--commit-hash abc1234567890... \
my-model-id
Parameter breakdown:
- `--version 0.1.0` - Must match your Docker tag
- `--engine nvflare:...` - FLARE engine version (must match the `nvflare` pin in `requirements.txt`)
- `--digest sha256:...` - From the `docker push` output (include the `sha256:` prefix!)
- `--commit-hash abc123...` - Git commit hash from Step 1
- `my-model-id` - Your model's ID in the Apheris registry
Verify the registration:
apheris models list-versions my-model-id | grep 0.1.0
You should see your version details including digest and commit hash.
Step 5: Create and Run a Job🔗
Now you can run federated training jobs!
Submit the job:
apheris job schedule \
--dataset_ids dataset-1,dataset-2 \
--model_id my-model-id \
--model_version 0.1.0 \
--payload '{"task_id": "train", "cohorts": {"dataset-1": "cohort-a", "dataset-2": "cohort-b"}, "num_rounds": 10, "learning_rate": 0.01, "batch_size": 32, "num_epochs": 5}'
Payload structure:
The --payload parameter is a JSON string containing your job parameters:
- `task_id`: Which task to run (e.g., "train")
- `cohorts`: Maps dataset IDs to cohort names
- `num_rounds`: Number of federated learning rounds
- `learning_rate`, `batch_size`, `num_epochs`: Your custom parameters
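To avoid shell-quoting mistakes in that JSON string, you can build the payload in Python and paste the result (an illustrative convenience, not a required step):

```python
import json

# Build the --payload argument from a Python dict to avoid JSON quoting mistakes
payload = {
    "task_id": "train",
    "cohorts": {"dataset-1": "cohort-a", "dataset-2": "cohort-b"},
    "num_rounds": 10,
    "learning_rate": 0.01,
    "batch_size": 32,
    "num_epochs": 5,
}
payload_json = json.dumps(payload)
print(payload_json)

# Round-trips cleanly, so the CLI receives exactly these parameters
assert json.loads(payload_json) == payload
```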
Monitor the job:
# List all jobs
apheris job list
# Get job status
apheris job status
# View job logs
apheris job logs
Download results:
apheris job download-results ./results/
The trained model will be in results/models/server.npy.
Complete Workflow Summary🔗
1. Code changes → git commit
2. Build Docker image → docker build
3. Push to registry → docker push (note digest)
4. Register version → apheris models add-version
5. Submit job → apheris job schedule
6. Monitor → apheris job status/logs
7. Download results → apheris job download-results
Updating Your Model🔗
When you make changes:
# Commit changes
git commit -m "Improve model architecture"
git push
# Build with NEW version
docker build --platform="linux/amd64" \
-t quay.io/apheris/my-model:0.2.0 .
# Push
docker push quay.io/apheris/my-model:0.2.0
# Register new version
apheris models add-version \
--version 0.2.0 \
--engine nvflare:2.6.0 \
--digest sha256:new_digest... \
--commit-hash new_commit... \
my-model-id
Quick Reference🔗
Checklist: Adapting This Template🔗
When creating your own model, work through these steps:
- [ ] Define parameters - Update `payload.py` with job parameters
- [ ] Pass parameters - Update `loaders.py` to pass them to the executor
- [ ] Update executor - Add parameters to `__init__`
- [ ] Implement data loading - Customize `_load_data_from_dataset()`
- [ ] Replace model - Create your PyTorch/TensorFlow/sklearn model
- [ ] Implement training - Write training logic in `_train()`
- [ ] Update dependencies - Add to `requirements.txt`
- [ ] Test with simulator - Run locally with development data
- [ ] Debug issues - Check logs and configs
- [ ] Commit code - Push to Git (note commit hash)
- [ ] Build Docker image - With proper platform flag
- [ ] Push to Quay - Note the digest
- [ ] Register version - Add to model registry
- [ ] Run job - Submit federated training job
Key Files to Modify🔗
| File | Purpose | What to Change |
|---|---|---|
| `secure_runtime/payload.py` | Job parameters | Add your parameters |
| `secure_runtime/loaders.py` | Config generation | Pass parameters to executor |
| `model/executor.py` | Training logic | Your model and training code |
| `requirements.txt` | Dependencies | Your ML libraries |
Common Commands🔗
# Local testing
python src/logistic_regression_quickstart/run_simulator.py --dataset-ids <id>
# Build and deploy
docker build --platform="linux/amd64" -t quay.io/apheris/my-model:0.1.0 .
docker push quay.io/apheris/my-model:0.1.0
apheris models add-version --version 0.1.0 --engine nvflare:2.6.0 \
--digest <digest> --commit-hash <hash> my-model-id
# Job management
apheris job schedule --dataset_ids <ids> --model_id <id> --model_version <version> --payload '<json>'
apheris job list
apheris job status
apheris job logs
apheris job download-results <output-path>
Getting Help🔗
- Apheris Documentation: https://www.apheris.com/docs/gateway/latest/index.html
- NVIDIA FLARE Docs: https://nvflare.readthedocs.io/
- Existing docs in the quickstart-training repo's `docs/` folder:
    - `adding-a-new-parameter.md` - Parameter passing details
    - `simulator.md` - Simulator usage and output structure
    - `custom-model-workflow.md` - Deployment workflow to the Apheris Model Registry
- Apheris Support: Contact your Apheris representative
You're now ready to build federated learning models on Apheris!