

__all__ = ['Saver', 'HDF5Saver', 'NumpySaver', 'ZarrSaver']

HDF5Saver(file_path, dataset_name='dataset', approach='append')

Bases: Saver

A saver that writes data to an HDF5 file.

HDF5Saver provides functionality to persist computational results in the HDF5 format, which is designed for storing and managing large, complex data collections. HDF5 offers a hierarchical structure (similar to a file system) and supports efficient I/O operations for both small and large datasets.

This implementation supports three approaches to saving data:

  • append: Add new data to the existing dataset (creates if not exists)
  • overwrite: Replace existing dataset with new data
  • update: Update specific indices in the existing dataset with new values

HDF5 format offers several advantages:

  • Efficient storage of large, heterogeneous datasets
  • Partial I/O operations (reading/writing subsets of data)
  • Built-in compression
  • Self-describing format with rich metadata support
  • Cross-platform compatibility
Requirements

This saver requires the h5py package to be installed:

pip install h5py

Examples:

Basic usage:

# Create an HDF5Saver for storing results
saver = HDF5Saver("results.h5", dataset_name="experiment_1")

# Use with TaskManager
task_manager = TaskManager(MyTask)
task_manager.submit_tasks(items, saver=saver, save_interval=100)

Overwriting existing data:

# Create a saver that overwrites existing data
saver = HDF5Saver(
    "daily_metrics.h5",
    dataset_name="day_20240306",
    approach="overwrite"
)

# Save new results, replacing any existing dataset
results = process_batch(today_data)
saver.save(results)

Updating specific indices:

# Create a saver for updating existing data
saver = HDF5Saver(
    "time_series.h5",
    dataset_name="sensor_readings",
    approach="update"
)

# Update specific time indices with new values
new_data = [99.5, 98.3, 97.8]
indices = [10, 20, 30]  # Positions to update
saver.save(new_data, indices=indices)

Multiple datasets in one file:

# Create multiple savers with different dataset names
saver1 = HDF5Saver("project_data.h5", dataset_name="raw_data")
saver2 = HDF5Saver("project_data.h5", dataset_name="processed_data")
saver3 = HDF5Saver("project_data.h5", dataset_name="metadata")

# Save different types of data to the same file
saver1.save(raw_measurements)
saver2.save(processed_results)
saver3.save(experiment_metadata)
Notes
  • HDF5 is well-suited for scientific computing and applications that need to store large numerical arrays efficiently.
  • Unlike .npy files, HDF5 allows for efficient partial I/O operations, making it suitable for datasets that are too large to fit in memory (see the read-back sketch after these notes).
  • HDF5 files can store multiple datasets with different names, allowing related data to be kept in a single file.
  • HDF5 has limitations with parallel writes from multiple processes. For highly parallel workloads, consider ZarrSaver instead.
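
A quick read-back sketch using h5py's partial I/O (assumes the results.h5 file and experiment_1 dataset from the basic-usage example above):

import h5py

# Only the requested slice is read from disk, not the whole dataset.
with h5py.File("results.h5", "r") as h5file:
    dset = h5file["experiment_1"]
    first_rows = dset[:100]
    print(dset.shape, dset.dtype)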

Initialize an HDF5Saver instance.

PARAMETER DESCRIPTION
file_path

The path to the HDF5 file where data will be saved.

TYPE: str

dataset_name

Name of the dataset within the HDF5 file. This allows multiple datasets to be stored in a single HDF5 file. Default is "dataset".

TYPE: str DEFAULT: 'dataset'

approach

One of 'append', 'overwrite', or 'update', determining how data is saved when the dataset already exists. Default is "append".

TYPE: str DEFAULT: 'append'

Notes
  • The file_path should have the .h5 or .hdf5 extension by convention.
  • Dataset names can use a path-like syntax with forward slashes to create groups (similar to directories) within the HDF5 file, e.g., "experiments/trial_1/data".
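
For example (a minimal sketch; the file and group names are illustrative):

# A path-like dataset name creates nested groups inside the file,
# so related trials can live in one organized hierarchy.
saver = HDF5Saver("experiments.h5", dataset_name="experiments/trial_1/data")
saver.save([0.1, 0.2, 0.3])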

approach = approach.strip().lower()

dataset_name = dataset_name

file_path = file_path

save(data, indices=None, **kwargs)

Saves the data to an HDF5 dataset according to the specified approach.

This method implements the abstract save method from the Saver base class. It persists the provided data to an HDF5 dataset using the configured approach (append, overwrite, or update).

The method handles creating new datasets, extending existing datasets, or updating specific indices in existing datasets. It automatically converts the input data to a numpy array before saving.

PARAMETER DESCRIPTION
data

A list of results to save. The data will be converted to a numpy array before saving.

TYPE: list[Any]

indices

Required when approach is 'update', specifies the indices where data should be written in the existing dataset. Must be compatible with the shape of the input data.

TYPE: Any | None DEFAULT: None

**kwargs

Additional keyword arguments for configuring the save operation. Common options include:

  • dtype: Data type for the numpy array conversion
  • compression: Compression filter to use (e.g., "gzip", "lzf")
  • compression_opts: Options for the compression filter
  • chunks: Chunk shape for the dataset

TYPE: dict[str, Any] DEFAULT: {}

RAISES DESCRIPTION
ImportError

If the h5py library is not installed.

ValueError

If approach is 'update' but the dataset does not exist.

ValueError

If an unknown approach is specified.

TypeError

If the data cannot be converted to a numpy array.

Examples:

Saving data with the append approach:

saver = HDF5Saver("results.h5", dataset_name="measurements")

# First save creates the dataset
saver.save([1, 2, 3, 4, 5])

# Subsequent saves append to it
saver.save([6, 7, 8, 9, 10])

Saving with compression:

saver = HDF5Saver("compressed_results.h5", dataset_name="large_dataset")

# Save with gzip compression
saver.save(
    large_data_array,
    dtype="float32",
    compression="gzip",
    compression_opts=9,
    chunks=(100,)
)

Updating specific indices:

saver = HDF5Saver("values.h5", dataset_name="sensor_data", approach="update")

# Update specific positions in an existing dataset
new_values = [99.5, 98.3, 97.8]
indices = slice(10, 13)  # Update positions 10, 11, 12
saver.save(new_values, indices=indices)
Notes
  • The append operation efficiently extends the dataset without loading the entire existing data into memory, making it suitable for large datasets.
  • When using compression, consider the tradeoff between storage space and read/write performance.
  • HDF5 supports resizable datasets with the maxshape parameter, which is automatically configured for append operations.
  • For optimal performance with large datasets, configure appropriate chunk sizes based on expected access patterns.
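
For instance, if downstream consumers typically read long contiguous runs, the chunk shape can be aligned with that pattern on the first save (a sketch; first_batch and the file name are hypothetical):

saver = HDF5Saver("streamed_results.h5", dataset_name="stream")

# chunks only takes effect when the dataset is first created; align it
# with an expected read pattern of ~10,000 contiguous rows.
saver.save(first_batch, dtype="float32", chunks=(10_000,))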

NumpySaver(file_path, approach='append')

Bases: Saver

A saver that writes data to a .npy file.

NumpySaver provides functionality to persist computational results in NumPy's .npy format, which is optimized for storing and loading numpy arrays. This format preserves shape, data type, and other array information, making it ideal for numerical data.

This implementation supports three approaches to saving data:

  • append: Add new data to the existing array (creates if not exists)
  • overwrite: Replace existing data with new data
  • update: Update specific indices in the existing array with new values

NumPy's .npy format offers specific advantages:

  • Fast load and save operations
  • Preservation of data types and array structures
  • Compact binary storage
  • Native integration with NumPy's ecosystem
Requirements

This saver requires the numpy package to be installed:

pip install numpy

Examples:

Basic usage:

# Create a NumpySaver for storing results
saver = NumpySaver("results.npy")

# Use with TaskManager
task_manager = TaskManager(MyTask)
task_manager.submit_tasks(items, saver=saver, save_interval=100)

Overwriting existing data:

# Create a saver that overwrites existing data
saver = NumpySaver("daily_metrics.npy", approach="overwrite")

# Save new results, replacing any existing file
results = process_batch(today_data)
saver.save(results)

Updating specific indices:

# Create a saver for updating existing data
saver = NumpySaver("time_series.npy", approach="update")

# Update specific indices with new values
new_data = [99.5, 98.3, 97.8]
indices = [10, 20, 30]  # Positions to update
saver.save(new_data, indices=indices)
Notes
  • NumPy's .npy format is best suited for numerical data where the entire array structure needs to be preserved.
  • For very large datasets where memory is a concern, consider using HDF5Saver or ZarrSaver instead, as .npy files are loaded entirely into memory.
  • The append operation loads the entire existing array into memory before appending, which may be inefficient for very large arrays.
  • For multidimensional arrays, shape compatibility is important when using append or update approaches.
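
For completeness, data written by NumpySaver can be loaded back with NumPy directly (a minimal sketch, assuming the results.npy file from the basic-usage example):

import numpy as np

# np.load reads the whole array into memory, preserving shape and dtype.
arr = np.load("results.npy")
print(arr.shape, arr.dtype)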

Initialize a NumpySaver instance.

PARAMETER DESCRIPTION
file_path

The path to the .npy file where data will be saved.

TYPE: str

approach

One of 'append', 'overwrite', or 'update', determining how data is saved when the file already exists.

TYPE: Literal['append', 'overwrite', 'update'] DEFAULT: 'append'

Notes
  • The file_path should have the .npy extension for compatibility with NumPy's load and save functions.
  • The approach parameter determines the behavior when saving data to an existing file:
    • append: Concatenates new data to existing data
    • overwrite: Replaces the entire file with new data
    • update: Modifies specific indices in the existing data

approach = approach.strip().lower()

file_path = file_path

save(data, indices=None, **kwargs)

Saves the data to a .npy file according to the specified approach.

This method implements the abstract save method from the Saver base class. It persists the provided data to a NumPy .npy file using the configured approach (append, overwrite, or update).

The method handles creating new files, appending to existing files, or updating specific indices in existing files. It automatically converts the input data to a numpy array before saving.

PARAMETER DESCRIPTION
data

A list of results to save. The data will be converted to a numpy array before saving.

TYPE: list[Any]

indices

Required when approach is 'update', specifies the indices where data should be written in the existing array. Must be compatible with the shape of the input data.

TYPE: Any | None DEFAULT: None

**kwargs

Additional keyword arguments. The current implementation does not use these parameters, but they are accepted for compatibility with the Saver interface.

TYPE: dict[str, Any] DEFAULT: {}

RAISES DESCRIPTION
ImportError

If the numpy library is not installed.

ValueError

If approach is 'update' but indices is None.

ValueError

If an unknown approach is specified.

FileNotFoundError

If attempting to update a non-existent file.

TypeError

If the data cannot be converted to a numpy array.

Examples:

Saving data with the append approach:

saver = NumpySaver("results.npy")

# First save creates the file
saver.save([1, 2, 3, 4, 5])

# Subsequent saves append to it
saver.save([6, 7, 8, 9, 10])

Overwriting existing data:

saver = NumpySaver("metrics.npy", approach="overwrite")

# Save data, replacing any existing file
saver.save([10, 20, 30, 40, 50])

Updating specific indices:

saver = NumpySaver("values.npy", approach="update")

# First create the file
saver.approach = "overwrite"
saver.save([0, 0, 0, 0, 0, 0, 0, 0, 0, 0])

# Then update specific positions
saver.approach = "update"
new_values = [99, 88, 77]
indices = [2, 5, 8]  # Positions to update
saver.save(new_values, indices=indices)
# Result would be [0, 0, 99, 0, 0, 88, 0, 0, 77, 0]
Notes
  • The append operation loads the entire existing file into memory, concatenates the new data, and saves the combined result (sketched after these notes). This may be inefficient for very large arrays.
  • When updating, the indices and data must have compatible shapes.
  • For large datasets, consider using HDF5Saver or ZarrSaver which have more efficient append and update operations.
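
A rough sketch of what the append approach amounts to for a 1-D array (the actual implementation may differ in details such as dtype handling):

import numpy as np

# Conceptual equivalent of approach="append":
# load everything, concatenate, and rewrite the file.
new_data = [6, 7, 8, 9, 10]  # stand-in for the batch being saved
existing = np.load("results.npy")
combined = np.concatenate([existing, np.asarray(new_data)])
np.save("results.npy", combined)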

Saver

Bases: ABC

Abstract base class for saving data in various formats or destinations.

The Saver class provides a standardized interface for persisting computational results across the raygent framework. It abstracts away the details of how and where data is stored, allowing TaskManager to work with different storage backends without modification.

This design follows the Strategy pattern, where different concrete Saver implementations (strategies) can be interchanged to save data to files, databases, cloud storage, or other destinations using various formats and approaches.

Custom Savers can be created by subclassing and implementing the save method.

Examples:

Using a Saver with TaskManager:

# Create a TaskManager with a Saver
task_manager = TaskManager(MyTask, use_ray=True)
saver = HDF5Saver("results.h5", dataset_name="experiment_1")

# Submit tasks with periodic saving
task_manager.submit_tasks(items, saver=saver, save_interval=100)

Creating a custom Saver:

import os

import pandas as pd


class CSVSaver(Saver):
    def __init__(self, file_path):
        self.file_path = file_path
        self.file_exists = os.path.exists(file_path)

    def save(self, data, indices=None, **kwargs):
        # Append on later calls; write the header only on the first write.
        mode = 'a' if self.file_exists else 'w'
        header = not self.file_exists

        df = pd.DataFrame(data)
        df.to_csv(self.file_path, mode=mode, header=header, index=False)
        self.file_exists = True
Notes
  • Savers are used by TaskManager to persist intermediate results during long-running computations, reducing memory pressure.
  • A single Saver instance may be called multiple times during task execution, so implementations should handle appending or updating existing data.
  • Error handling within save methods is important to prevent data loss.

save(data, indices=None, **kwargs)

Saves the provided data to the configured destination.

This abstract method must be implemented by all concrete Saver subclasses to define how data is written to the destination (file, database, cloud storage, etc.).

The method should handle details such as format conversion, creating or opening the destination, writing the data, and handling any errors that may occur during the saving process.

PARAMETER DESCRIPTION
data

A list of results to save. Each element can be of any type, though specific Saver implementations may have type requirements.

TYPE: list[Any] | dict[str, Any]

indices

Optional indices or locations where data should be written when using an update approach. The meaning of this parameter depends on the specific Saver implementation. Default is None.

TYPE: Any | None DEFAULT: None

**kwargs

Additional keyword arguments that may be used by specific saver implementations. These arguments are typically passed from TaskManager.save_kwargs and can include parameters like data types, compression options, or format-specific settings.

TYPE: dict[str, Any] DEFAULT: {}

RETURNS DESCRIPTION
None

None. The effect of this method is to persist data to the configured destination.

Examples:

Implementation in HDF5Saver:

def save(self, data, indices=None, **kwargs):
    arr = np.array(data, dtype=kwargs.get("dtype"))

    with h5py.File(self.file_path, "a") as h5file:
        if self.dataset_name not in h5file:
            # maxshape=(None, ...) makes the first axis resizable,
            # which the append branch below relies on.
            h5file.create_dataset(
                self.dataset_name,
                data=arr,
                maxshape=(None,) + arr.shape[1:],
            )
        else:
            dset = h5file[self.dataset_name]
            old_size = dset.shape[0]
            new_size = old_size + arr.shape[0]
            dset.resize(new_size, axis=0)
            dset[old_size:new_size] = arr
Notes
  • Implementations should be robust to repeated calls with new data.
  • They should appropriately handle the case where the destination does not yet exist vs. already exists.
  • The indices parameter is primarily used for update operations, and its interpretation varies between Saver implementations.
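
As an illustration of these points, a hypothetical JSONLinesSaver (not part of the package) handles repeated calls and a missing destination in one stroke by appending one JSON object per line:

import json

class JSONLinesSaver(Saver):
    def __init__(self, file_path):
        self.file_path = file_path

    def save(self, data, indices=None, **kwargs):
        # Opening in append mode creates the file if it does not
        # exist and is naturally robust to repeated calls.
        with open(self.file_path, "a") as f:
            for item in data:
                f.write(json.dumps(item) + "\n")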

ZarrSaver(file_path, dataset_name='dataset', approach='append')

Bases: Saver

A saver that writes data to a Zarr array.

ZarrSaver provides functionality to persist computational results in Zarr format, which offers efficient chunked, compressed, N-dimensional array storage. Zarr is particularly well-suited for large arrays that don't fit in memory and for cloud-based storage, supporting both local and remote persistence.

This implementation supports three approaches to saving data:

  • append: Add new data to the existing array (creates if not exists).
  • overwrite: Replace existing data with new data.
  • update: Update specific indices in the existing array with new values.

Zarr offers advantages over other formats for specific use cases:

  • Parallel read/write access for distributed computing.
  • Efficient access to subsets of large arrays.
  • Support for cloud storage backends (S3, GCS, etc.).
  • Good compression options for numerical data.
Requirements

This saver requires the zarr package to be installed:

pip install zarr

Examples:

Basic usage with local storage:

# Create a ZarrSaver for appending data
saver = ZarrSaver("results.zarr", dataset_name="experiment_1")

# Use with TaskManager
task_manager = TaskManager(MyTask)
task_manager.submit_tasks(items, saver=saver, save_interval=100)

Overwriting existing data:

# Create a saver that overwrites existing data
saver = ZarrSaver(
    "results.zarr",
    dataset_name="daily_metrics",
    approach="overwrite"
)

# Save a batch of results directly
results = process_batch(today_data)
saver.save(results)

Updating specific indices:

# Create a saver for updating existing data
saver = ZarrSaver(
    "results.zarr",
    dataset_name="time_series",
    approach="update"
)

# Update specific time indices with new values
new_data = calculate_corrections(raw_data)
indices = [5, 10, 15, 20]  # Indices to update
saver.save(new_data, indices=indices)

Using with a cloud storage backend (requires appropriate zarr plugins):

# Using with AWS S3 (requires s3fs)
import s3fs
s3 = s3fs.S3FileSystem(anon=False)
store = s3fs.S3Map(root="mybucket/zarr-data", s3=s3, check=False)

# Create a ZarrSaver with the S3 location
saver = ZarrSaver(
    store,
    dataset_name="remote_dataset",
    approach="append"
)
Notes
  • Zarr is particularly well-suited for large-scale numerical data and distributed computing workloads.
  • For optimal performance, consider chunk size carefully based on access patterns.
  • Unlike HDF5, Zarr allows concurrent reads and writes from multiple processes or machines, making it ideal for distributed computing.
  • The update approach requires that the dataset already exists and that valid indices are provided.
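
For instance, arrays written by ZarrSaver can be opened lazily for reading (a sketch assuming the results.zarr store and experiment_1 dataset from the basic-usage example; the exact store layout depends on the saver's implementation):

import zarr

# Open the store read-only and pull one slice; only the chunks
# covering that slice are fetched from storage.
root = zarr.open_group("results.zarr", mode="r")
arr = root["experiment_1"]
subset = arr[100:200]
print(arr.shape, arr.chunks)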

Initialize a ZarrSaver instance.

PARAMETER DESCRIPTION
file_path

The path to the Zarr container to create or open. This can be a local path or a URL to a supported remote storage backend.

TYPE: str

dataset_name

Name of the dataset within the Zarr store.

TYPE: str DEFAULT: 'dataset'

approach

One of 'append', 'overwrite', or 'update', determining how data is saved when the dataset already exists.

TYPE: Literal['append', 'overwrite', 'update'] DEFAULT: 'append'

Notes
  • The file_path parameter can accept various types of storage locations depending on the zarr plugins installed. This includes local file paths, S3 URLs, etc.
  • For cloud storage options, you may need to install additional dependencies such as s3fs for Amazon S3 access.

approach = approach.strip().lower()

dataset_name = dataset_name

file_path = file_path

save(data, indices=None, **kwargs)

Saves the data to a Zarr array according to the specified approach.

This method implements the abstract save method from the Saver base class. It persists the provided data to a Zarr array using the configured approach (append, overwrite, or update).

The method handles creating new arrays if they don't exist (for append and overwrite approaches) or modifying existing arrays. It automatically converts the input data to a numpy array before saving.

PARAMETER DESCRIPTION
data

A list of results to save. The data will be converted to a numpy array before saving to Zarr.

TYPE: list[Any]

indices

Required when approach is 'update', specifies the indices where data should be written in the existing array. Must match the shape and dimensionality of the input data.

TYPE: Any | None DEFAULT: None

**kwargs

Additional keyword arguments passed to zarr.create_array or zarr.open_array. Common options include:

  • chunks: Chunk shape
  • dtype: Data type
  • compressor: Compression method (default: Blosc)
  • filters: Pre-compression filters

TYPE: dict[str, Any] DEFAULT: {}

RAISES DESCRIPTION
ImportError

If the zarr library is not installed.

ValueError

If approach is 'update' but indices is None.

FileNotFoundError

If attempting to update a non-existent array.

TypeError

If the data cannot be converted to a numpy array.

Examples:

Saving data with append approach:

saver = ZarrSaver("results.zarr")

# First save creates the array
saver.save([1, 2, 3, 4, 5])

# Subsequent saves append to it
saver.save([6, 7, 8, 9, 10])

Saving with custom chunk size and compression:

import numcodecs

saver = ZarrSaver("compressed_results.zarr")

# Save with customized storage parameters
compressor = numcodecs.Blosc(cname='zstd', clevel=9)
saver.save(
    large_dataset,
    chunks=(1000,),
    compressor=compressor
)

Updating specific indices:

saver = ZarrSaver("timeseries.zarr", approach="update")

# Update values at specific positions
new_values = [99.5, 98.3, 97.8]
indices = [10, 20, 30]  # Positions to update
saver.save(new_values, indices=indices)
Notes
  • The append operation is optimized for adding new data to existing arrays without reading the entire array into memory (see the sketch after these notes).
  • For large datasets, consider specifying appropriate chunk sizes in kwargs when creating the array for the first time.
  • When updating, the indices and data must have compatible shapes.
  • Unlike HDF5, zarr supports concurrent reads and writes from multiple processes, making it suitable for distributed computing environments.
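
A rough sketch of what the append approach maps onto, using zarr's Array.append (the store name is illustrative, and zarr-python API details vary between major versions; the actual implementation may differ):

import numpy as np
import zarr

# Open (or create) a resizable 1-D array, then extend it in place;
# append writes new chunks without rewriting existing data.
arr = zarr.open_array("append_demo.zarr", mode="a", shape=(0,), chunks=(1000,), dtype="f8")
arr.append(np.asarray([1.0, 2.0, 3.0]))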