Apheris Utils

The apheris-utils package provides utilities for working with the Apheris platform, specifically for handling data access, Derived Datasets, artifact management, and logging integrations.

Derived Datasets

Apheris empowers data scientists to create Derived Datasets, facilitating the reuse of pre-processed information. This optional feature requires Data Persistence to be enabled on the Gateway.

Derived Datasets are created from either original datasets or existing Derived Datasets. Original datasets are created and managed by Data Custodians. As the owner of an original dataset, you can view but not edit Derived Datasets that are created from it.

Working with Derived Datasets using apheris_utils.data

The apheris_utils.data module provides functions to interact with datasets, including downloading and persisting files to create Derived Datasets.

Downloading Datasets

from apheris_utils.data import download_dataset, download_all, list_dataset_ids, get_asset_policies

# List all available dataset IDs
dataset_ids = list_dataset_ids()

# Download a specific dataset to a folder
# Returns a dict with dataset ID as key and local path as value
result = download_dataset('my-dataset-id', '/path/to/destination/folder')
# result = {'my-dataset-id': '/path/to/destination/folder/my-dataset-id'}

# Download all datasets to a folder
# Returns a dict with dataset IDs as keys and local paths as values
results = download_all('/path/to/destination/folder')
# results = {
#     'dataset-1': '/path/to/destination/folder/dataset-1',
#     'dataset-2': '/path/to/destination/folder/dataset-2',
#     'dataset-3': '/path/to/destination/folder/dataset-3'
# }

Using Primitives for Advanced Operations

from apheris_utils.data.primitives import get, list_remote_files, to_dict, to_folder
import pandas as pd
import io

# List all files in a dataset
remote_files = list_remote_files('my-dataset-id')

# Download files to a folder
file_paths = get(remote_files, to_folder('/path/to/destination/folder'))

# Load files into a dictionary (in-memory)
data_dict = {}
get(remote_files, to_dict(data_dict))

# Process CSV files from the dictionary
dataframes = [pd.read_csv(io.BytesIO(content)) for content in data_dict.values()]
combined_df = pd.concat(dataframes)

Getting Asset Policies

You can retrieve policy information for datasets using the get_asset_policies function:

from apheris_utils.data import get_asset_policies

# Get policies for a single dataset
policies = get_asset_policies('my-dataset-id')

# Get policies for multiple datasets
policies = get_asset_policies(['dataset-1', 'dataset-2', 'dataset-3'])

Creating Derived Datasets

To create a Derived Dataset, you can use the persist_file and persist_bytes functions:

from apheris_utils.data import persist_file, persist_bytes
from pathlib import Path

# Persist a file to create a Derived Dataset
persist_file(
    dataset_name='my-derived-dataset', 
    file_to_persist=Path('/path/to/local/file.csv'),
    file_name='processed_data.csv'  # Optional: specify a different name
)

# Persist bytes directly
with open('/path/to/local/file.csv', 'rb') as f:
    content = f.read()

persist_bytes(
    dataset_name='my-derived-dataset',
    contents=content,
    file_name='processed_data.csv'
)

Important

  • Dataset names are scoped to a user and should be unique
  • It's recommended to use a unique dataset name for each model or experiment (see the sketch after this list)
  • External data from outside the Gateway must not be stored in Derived Datasets
  • Derived Datasets inherit the Asset Policies of their parent datasets
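
For example, here is a minimal sketch of keeping dataset names unique per run, reusing the persist_file call shown above; the suffix scheme and file paths are purely illustrative:

from pathlib import Path
from uuid import uuid4

from apheris_utils.data import persist_file

# Illustrative only: derive a per-run dataset name so repeated experiments
# do not collide on the same Derived Dataset name.
run_suffix = uuid4().hex[:8]

persist_file(
    dataset_name=f"my-preprocessing-experiment-{run_suffix}",
    file_to_persist=Path("/path/to/local/processed.csv"),
    file_name="processed_data.csv"
)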

NVIDIA FLARE Integration

The apheris_utils package includes optional utilities for NVIDIA FLARE federated learning workflows. These are available in the extras_nvflare module and provide logging, job management, and error handling capabilities.

Installation: NVIDIA FLARE utilities

To use the NVIDIA FLARE utilities, install the package with the NVIDIA FLARE extra:

pip install apheris-utils[nvflare]

This will install the necessary NVIDIA FLARE dependencies.

Available Components

  1. GatewayLogSenderMixin: Used by clients/gateways to send logs to the orchestrator
  2. OrchestratorLogReceiver: Used by the server/orchestrator to receive and process logs from gateways
  3. Error Handling: Configurable safe error handling and utility functions for log formatting
  4. Job Management: Functions for retrieving job information from NVIDIA FLARE contexts

Logging Components

By default, the Apheris Gateway is a closed system: logs that occur on the Gateway stay there and can only be accessed by the Data Custodian.

Important

Data shared with the orchestrator is no longer considered private and can be viewed by the user executing the computation. It is very important to ensure that only non-sensitive strings are passed through this mechanism. If in doubt, contact your Data Custodian for permission.

Gateway-side Logging

The GatewayLogSenderMixin can be added to your FLARE executors and uses the FLARE federated events system to send messages out:

from nvflare.apis.executor import Executor
from apheris_utils.extras_nvflare.logging.gateway_log_sender import GatewayLogSenderMixin

class MyExecutor(Executor, GatewayLogSenderMixin):
    def execute(self, task_name: str, shareable, fl_ctx, abort_signal):
        try:
            # Your execution code
            result = process_data()

            # Send log information to the orchestrator
            self._send_to_server_log(
                fl_ctx=fl_ctx, 
                message="Data processing successful", 
                level="INFO"
            )

            return result

        except Exception as e:
            # Log errors to the orchestrator
            self._send_to_server_log(
                fl_ctx=fl_ctx, 
                message=f"Error during execution: {str(e)}", 
                level="ERROR"
            )
            raise

Orchestrator-side Logging

The OrchestratorLogReceiver is implemented as a standalone component, which just needs to be added to your FLARE job configuration.

For example:

{
    "format_version": 2,
    "server": {
        "heart_beat_timeout": 600
    },
    "task_data_filters": [],
    "task_result_filters": [],
    "components": [
        {
            "id": "log_writer",
            "path": "apheris_utils.extras_nvflare.logging.OrchestratorLogReceiver",
            "args": {}
        }
    ],
    "workflows": [
        {
            "id": "inference",
            "path": "server.inference_workflow.InferenceWorkflow",
            "args": {
                "min_clients": 2,
                "use_dal": true
            }
        }
    ]
}

It will then capture events sent by the Gateway and log them directly to the Orchestrator's text logs.

Error Handling and Safety Controls

It is common for Python libraries to log not only the line of code that caused an error but also the values of the variables on that line. This can easily leak sensitive information into the Gateway logs.

To mitigate this, you can use the safe_error_catchall_decorator, which catches any uncaught exception, extracts key information from the stacktrace, and writes it to the text log.

If the component is an executor, and the function has access to an FLContext object, the decorator will also use the log forwarding functions described above to send a sanitised error message to the Orchestrator.

from nvflare.apis.executor import Executor

from apheris_utils.extras_nvflare.logging.util import sanitised_trace
from apheris_utils.extras_nvflare.logging.gateway_log_sender import (
    GatewayLogSenderMixin,
    safe_error_catchall_decorator,
    set_safe_error_handling_enabled
)

# Control safe error handling at runtime
set_safe_error_handling_enabled(True)  # Enable safe error handling
set_safe_error_handling_enabled(False)  # Disable for debugging

# Use the decorator for automatic error handling
class MyExecutor(Executor, GatewayLogSenderMixin):

    @safe_error_catchall_decorator
    def execute(self, task_name: str, shareable, fl_ctx, abort_signal):
        # Your code here - exceptions will be safely caught and sanitized
        pass

# Manual exception handling
try:
    ...  # Your code
except Exception:
    error_trace = sanitised_trace()
    # Log the sanitised trace

Environment Variables:

  • APH_DISABLE_SAFE_ERROR_HANDLING: Set to "1", "true", "yes", or "y" to disable safe error handling
  • APH_MESSAGE_TO_SERVER_DELAY: Configures the delay applied after dispatching a message; this is used to prevent the client from killing the server before it handles the dispatched message in the case of errors (default: 0.01 seconds). See the sketch after this list.
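
As a rough sketch, and assuming these variables are read when the logging utilities initialise (in a deployed job they would typically be set in the Gateway's container environment rather than in code), they can be set as follows:

import os

# Illustrative only: disable safe error handling, e.g. while debugging locally
os.environ["APH_DISABLE_SAFE_ERROR_HANDLING"] = "1"

# Illustrative only: wait a little longer after dispatching a log message
os.environ["APH_MESSAGE_TO_SERVER_DELAY"] = "0.05"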

These components facilitate better debugging and monitoring of federated learning workflows across multiple gateways and the orchestrator. The error handling system provides both automatic safety measures and fine-grained control for development and production environments.

Job Management

You can retrieve job information from NVIDIA FLARE contexts:

from apheris_utils.extras_nvflare.job import get_job_id

def my_executor_method(self, task_name: str, shareable, fl_ctx, abort_signal):
    # Get the current job ID
    job_id = get_job_id(fl_ctx)

    # Use the job_id for artifact creation or logging
    print(f"Processing task for job: {job_id}")

Local Simulator Support

The extras_simulator module provides functionality for local development and testing without requiring a full Apheris environment.

Installation: Simulator Utilities

To use the simulator utilities, install its optional dependencies with the simulator extra:

pip install apheris-utils[simulator]

Using the Simulator Utilities

Using the data package, you can switch the DAL client's behaviour to use the Dummy Data endpoint on the orchestrator. This gives you transparent access to DAL functions, so you can test your application without needing access to real, sensitive data in an Apheris Gateway.

To switch to the Dummy Data endpoint, you simply need to run configure_env, which configures the local environment with the credentials and endpoints of the Dummy DAL endpoint and sets the APH_DATA dictionary, which describes the datasets available to the user.

Important

You must be logged in to an Apheris environment to download data from the Dummy DAL. Please ensure you've logged in using apheris.login() first.

import apheris

from apheris_utils.data import download_all
from apheris_utils.extras_simulator import data

apheris.login()  # log in to an Apheris environment first (see the note above)

data.configure_env(["whas1_gateway-1_org-1"])
download_all("./data")

Note

Asset policies and multi-file datasets are not currently implemented in the Dummy DAL endpoint.

Artifacts Management

The apheris_utils.artifacts module provides comprehensive functionality for managing machine learning artifacts, including models, datasets, checkpoints, and logs. This module supports creating, uploading, downloading, and managing files associated with ML workflows.

The Artifacts API can only be used from the Orchestrator compute pod, so it should be integrated with your NVIDIA FLARE Controller.

Once stored, artifacts can be downloaded by any compute job run by the same organization as the user that originally stored them. There is currently no standalone access to stored Artifacts outside of the Orchestrator. To download an Artifact on the Orchestrator, you just need its ID; you can then use the Artifact object to retrieve it, as shown below.

Note

The access model for Artifacts is intentionally different to the Derived Dataset functionality, where new datasets inherit the Asset Policies of the parent datasets.

For full implementation details of the underlying API, please see Data Artifacts API.

Artifact Types

The system supports several types of artifacts:

  • CHECKPOINT: Model training checkpoint files
  • METRIC: Metric files for trained machine learning models
  • RESULT: Result files from model training or inference
  • LOG: Log files from training or inference

Creating and Managing Artifacts

from apheris_utils.artifacts import Artifact, ArtifactType
from apheris_utils.extras_nvflare.job import get_job_id

# Create an artifact with metadata (get the fl_ctx from the current FLARE job)
artifact = Artifact(
    job_id=get_job_id(fl_ctx),
    type=ArtifactType.CHECKPOINT,
    name="my-trained-model",
    metadata={"version": "1.0", "accuracy": 0.95, "framework": "pytorch"},
    created_by={"user": "my@name.com"}
)

# Add files to the artifact
artifact.add_file(local_path="model.pt")
artifact.add_file(local_path="config.json") 

# Add file from bytes data
with open("metrics.json", "rb") as f:
    artifact.add_file(data=f.read(), filename="metrics.json")

# Save the artifact (uploads files and creates the artifact record)
artifact.save()

We recommend outputting the ID of the artifact to the Orchestrator logs, as you will need the ID to use the artifact in future computations.
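
For example, here is a minimal sketch of logging the ID from inside your FLARE Controller, assuming the component subclasses FLComponent (as FLARE Controllers do, giving it log_info), that fl_ctx is in scope, and that artifact is the object saved above:

# Illustrative only: record the artifact ID in the Orchestrator's text logs so
# it can be reused as an input to later computations.
self.log_info(fl_ctx, f"Saved artifact with ID {artifact.id}")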

Convenience Functions

For simple use cases, you can use the convenience functions:

from apheris_utils.artifacts import ArtifactType, create_artifact_from_file, create_artifact_from_bytes
from apheris_utils.extras_nvflare.job import get_job_id

# Create artifact directly from a file
artifact = create_artifact_from_file(
    job_id=get_job_id(fl_ctx),
    artifact_type=ArtifactType.CHECKPOINT,
    name="checkpoint-epoch-10",
    file_path="checkpoint.pth",
    metadata={"epoch": 10, "loss": 0.23}
)

# Create artifact from bytes
with open("results.json", "rb") as f:
    artifact = create_artifact_from_bytes(
        job_id=get_job_id(fl_ctx),
        artifact_type=ArtifactType.RESULT,
        name="final-results",
        data=f.read(),
        filename="results.json"
    )

Downloading and Managing Existing Artifacts

To download an existing artifact, you just need its ID; you can then enumerate the file objects in the artifact.files list, fetching each one as you go:

# Retrieve the artifact by its ID (e.g. the ID logged when it was saved)
artifact = Artifact.get(artifact_id)

for f in artifact.files:
    with open(f.name, "wb") as fh:
        fh.write(f.fetch())