# `__init__`

`__all__ = ['Saver', 'HDF5Saver', 'NumpySaver', 'ZarrSaver']`
## HDF5Saver(file_path, dataset_name='dataset', approach='append')
Bases: `Saver`

A saver that writes data to an HDF5 file.

`HDF5Saver` provides functionality to persist computational results in the HDF5 format, which is designed for storing and managing large, complex data collections. HDF5 offers a hierarchical structure (similar to a file system) and supports efficient I/O operations for both small and large datasets.
This implementation supports three approaches to saving data:

- `append`: Add new data to the existing dataset (creates it if it does not exist).
- `overwrite`: Replace the existing dataset with new data.
- `update`: Update specific indices in the existing dataset with new values.

The HDF5 format offers several advantages:
- Efficient storage of large, heterogeneous datasets
- Partial I/O operations (reading/writing subsets of data)
- Built-in compression
- Self-describing format with rich metadata support
- Cross-platform compatibility
Examples:

Basic usage:

```python
# Create an HDF5Saver for storing results
saver = HDF5Saver("results.h5", dataset_name="experiment_1")

# Use with TaskManager
task_manager = TaskManager(MyTask)
task_manager.submit_tasks(items, saver=saver, save_interval=100)
```

Overwriting existing data:

```python
# Create a saver that overwrites existing data
saver = HDF5Saver(
    "daily_metrics.h5",
    dataset_name="day_20240306",
    approach="overwrite",
)

# Save new results, replacing any existing dataset
results = process_batch(today_data)
saver.save(results)
```

Updating specific indices:

```python
# Create a saver for updating existing data
saver = HDF5Saver(
    "time_series.h5",
    dataset_name="sensor_readings",
    approach="update",
)

# Update specific time indices with new values
new_data = [99.5, 98.3, 97.8]
indices = [10, 20, 30]  # Positions to update
saver.save(new_data, indices=indices)
```

Multiple datasets in one file:

```python
# Create multiple savers with different dataset names
saver1 = HDF5Saver("project_data.h5", dataset_name="raw_data")
saver2 = HDF5Saver("project_data.h5", dataset_name="processed_data")
saver3 = HDF5Saver("project_data.h5", dataset_name="metadata")

# Save different types of data to the same file
saver1.save(raw_measurements)
saver2.save(processed_results)
saver3.save(experiment_metadata)
```
Notes
- HDF5 is well-suited for scientific computing and applications that need to store large numerical arrays efficiently.
- Unlike .npy files, HDF5 allows for efficient partial I/O operations, making it suitable for datasets that are too large to fit in memory.
- HDF5 files can store multiple datasets with different names, allowing related data to be kept in a single file.
- HDF5 has limitations with parallel writes from multiple processes. For highly parallel workloads, consider ZarrSaver instead.
Initialize an HDF5Saver instance.
| PARAMETER | DESCRIPTION |
|---|---|
| `file_path` | The path to the HDF5 file where data will be saved. |
| `dataset_name` | Name of the dataset within the HDF5 file. This allows multiple datasets to be stored in a single HDF5 file. Default is `"dataset"`. |
| `approach` | One of `'append'`, `'overwrite'`, or `'update'`, determining how data is saved when the dataset already exists. Default is `"append"`. |
Notes
- The file_path should have the .h5 or .hdf5 extension by convention.
- Dataset names can use a path-like syntax with forward slashes to create groups (similar to directories) within the HDF5 file, e.g., "experiments/trial_1/data".
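As a sketch of that group-style naming (the `raygent.savers` import path is an assumption based on this module's `__all__`; adjust to the actual package layout):

```python
# Hypothetical import path; adjust to wherever raygent exposes its savers.
from raygent.savers import HDF5Saver

# Each saver targets its own group path inside the same file, so related
# trials live together under the "experiments" group.
trial_1 = HDF5Saver("experiments.h5", dataset_name="experiments/trial_1/data")
trial_2 = HDF5Saver("experiments.h5", dataset_name="experiments/trial_2/data")
```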
Instance attributes:

- `approach = approach.strip().lower()`
- `dataset_name = dataset_name`
- `file_path = file_path`

### save(data, indices=None, **kwargs)
Saves the data to an HDF5 dataset according to the specified approach.

This method implements the abstract `save` method from the `Saver` base class. It persists the provided data to an HDF5 dataset using the configured approach (`append`, `overwrite`, or `update`).

The method handles creating new datasets, extending existing datasets, or updating specific indices in existing datasets. It automatically converts the input data to a numpy array before saving.
| PARAMETER | DESCRIPTION |
|---|---|
| `data` | A list of results to save. The data will be converted to a numpy array before saving. |
| `indices` | Required when approach is `'update'`; specifies the indices where data should be written in the existing dataset. Must be compatible with the shape of the input data. |
| `**kwargs` | Additional keyword arguments for configuring the save operation. Common options include `dtype` (data type for the numpy array conversion), `compression` (compression filter, e.g. `"gzip"` or `"lzf"`), `compression_opts` (options for the compression filter), and `chunks` (chunk shape for the dataset). |
| RAISES | DESCRIPTION |
|---|---|
| `ImportError` | If the h5py library is not installed. |
| `ValueError` | If approach is `'update'` but the dataset does not exist. |
| `ValueError` | If an unknown approach is specified. |
| `TypeError` | If the data cannot be converted to a numpy array. |
Examples:

Saving data with the append approach:

```python
saver = HDF5Saver("results.h5", dataset_name="measurements")

# First save creates the dataset
saver.save([1, 2, 3, 4, 5])

# Subsequent saves append to it
saver.save([6, 7, 8, 9, 10])
```

Saving with compression:

```python
saver = HDF5Saver("compressed_results.h5", dataset_name="large_dataset")

# Save with gzip compression
saver.save(
    large_data_array,
    dtype="float32",
    compression="gzip",
    compression_opts=9,
    chunks=(100,),
)
```

Updating specific indices:

```python
saver = HDF5Saver("values.h5", dataset_name="sensor_data", approach="update")

# Update specific positions in an existing dataset
new_values = [99.5, 98.3, 97.8]
indices = slice(10, 13)  # Update positions 10, 11, 12
saver.save(new_values, indices=indices)
```
Notes
- The append operation efficiently extends the dataset without loading the entire existing data into memory, making it suitable for large datasets.
- When using compression, consider the tradeoff between storage space and read/write performance.
- HDF5 supports resizable datasets with the maxshape parameter, which is automatically configured for append operations.
- For optimal performance with large datasets, configure appropriate chunk sizes based on expected access patterns.
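To make the partial I/O point concrete, here is a short read-back sketch using h5py directly; the file and dataset names reuse the append example above:

```python
import h5py

# Read only rows 100-199 of the dataset; HDF5 fetches just this slice
# from disk instead of loading the whole dataset into memory.
with h5py.File("results.h5", "r") as h5file:
    window = h5file["measurements"][100:200]
```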
## NumpySaver(file_path, approach='append')
Bases: `Saver`

A saver that writes data to a .npy file.

`NumpySaver` provides functionality to persist computational results in NumPy's .npy format, which is optimized for storing and loading numpy arrays. This format preserves shape, data type, and other array information, making it ideal for numerical data.
This implementation supports three approaches to saving data:

- `append`: Add new data to the existing array (creates it if it does not exist).
- `overwrite`: Replace existing data with new data.
- `update`: Update specific indices in the existing array with new values.

NumPy's .npy format offers specific advantages:
- Fast load and save operations
- Preservation of data types and array structures
- Compact binary storage
- Native integration with NumPy's ecosystem
Examples:

Basic usage:

```python
# Create a NumpySaver for storing results
saver = NumpySaver("results.npy")

# Use with TaskManager
task_manager = TaskManager(MyTask)
task_manager.submit_tasks(items, saver=saver, save_interval=100)
```

Overwriting existing data:

```python
# Create a saver that overwrites existing data
saver = NumpySaver("daily_metrics.npy", approach="overwrite")

# Save new results, replacing any existing file
results = process_batch(today_data)
saver.save(results)
```

Updating specific indices:

```python
# Create a saver for updating existing data
saver = NumpySaver("time_series.npy", approach="update")

# Update specific indices with new values
new_data = [99.5, 98.3, 97.8]
indices = [10, 20, 30]  # Positions to update
saver.save(new_data, indices=indices)
```
Notes
- NumPy's .npy format is best suited for numerical data where the entire array structure needs to be preserved.
- For very large datasets where memory is a concern, consider using HDF5Saver or ZarrSaver instead, as .npy files are loaded entirely into memory.
- The append operation loads the entire existing array into memory before appending, which may be inefficient for very large arrays.
- For multidimensional arrays, shape compatibility is important when using append or update approaches.
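A hedged sketch of what the append approach implies for memory, per the note above; this is illustrative, not the library's actual code:

```python
import os

import numpy as np

def append_npy(file_path, data):
    """Illustrative append: the existing array is loaded in full first."""
    arr = np.asarray(data)
    if os.path.exists(file_path):
        existing = np.load(file_path)          # entire array read into memory
        arr = np.concatenate([existing, arr])  # plus the combined copy
    np.save(file_path, arr)
```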
Initialize a NumpySaver instance.
| PARAMETER | DESCRIPTION |
|---|---|
| `file_path` | The path to the .npy file where data will be saved. |
| `approach` | One of `'append'`, `'overwrite'`, or `'update'`, determining how data is saved when the file already exists. Default is `"append"`. |
Notes

- The file_path should have the .npy extension for compatibility with NumPy's load and save functions.
- The approach parameter determines the behavior when saving data to an existing file:
    - `append`: Concatenates new data to existing data.
    - `overwrite`: Replaces the entire file with new data.
    - `update`: Modifies specific indices in the existing data.
Instance attributes:

- `approach = approach.strip().lower()`
- `file_path = file_path`

### save(data, indices=None, **kwargs)
Saves the data to a .npy file according to the specified approach.
This method implements the abstract save method from the Saver base class. It persists the provided data to a NumPy .npy file using the configured approach (append, overwrite, or update).
The method handles creating new files, appending to existing files, or updating specific indices in existing files. It automatically converts the input data to a numpy array before saving.
| PARAMETER | DESCRIPTION |
|---|---|
| `data` | A list of results to save. The data will be converted to a numpy array before saving. |
| `indices` | Required when approach is `'update'`; specifies the indices where data should be written in the existing array. Must be compatible with the shape of the input data. |
| `**kwargs` | Additional keyword arguments. The current implementation does not use these parameters, but they are accepted for compatibility with the `Saver` interface. |
| RAISES | DESCRIPTION |
|---|---|
| `ImportError` | If the numpy library is not installed. |
| `ValueError` | If approach is `'update'` but indices is None. |
| `ValueError` | If an unknown approach is specified. |
| `FileNotFoundError` | If attempting to update a non-existent file. |
| `TypeError` | If the data cannot be converted to a numpy array. |
Examples:

Saving data with the append approach:

```python
saver = NumpySaver("results.npy")

# First save creates the file
saver.save([1, 2, 3, 4, 5])

# Subsequent saves append to it
saver.save([6, 7, 8, 9, 10])
```

Overwriting existing data:

```python
saver = NumpySaver("metrics.npy", approach="overwrite")

# Save data, replacing any existing file
saver.save([10, 20, 30, 40, 50])
```

Updating specific indices:

```python
saver = NumpySaver("values.npy", approach="update")

# First create the file
saver.approach = "overwrite"
saver.save([0, 0, 0, 0, 0, 0, 0, 0, 0, 0])

# Then update specific positions
saver.approach = "update"
new_values = [99, 88, 77]
indices = [2, 5, 8]  # Positions to update
saver.save(new_values, indices=indices)

# Result: [0, 0, 99, 0, 0, 88, 0, 0, 77, 0]
```
Notes
- The append operation loads the entire existing file into memory, concatenates the new data, and saves the combined result. This may be inefficient for very large arrays.
- When updating, the indices and data must have compatible shapes.
- For large datasets, consider using HDF5Saver or ZarrSaver which have more efficient append and update operations.
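Reading results back is likewise a single full load, which is why the notes above steer very large datasets toward HDF5Saver or ZarrSaver:

```python
import numpy as np

# np.load pulls the entire array into memory; .npy offers no partial I/O.
results = np.load("results.npy")
```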
## Saver

Bases: `ABC`
Abstract base class for saving data in various formats or destinations.
The Saver class provides a standardized interface for persisting computational results across the raygent framework. It abstracts away the details of how and where data is stored, allowing TaskManager to work with different storage backends without modification.
This design follows the Strategy pattern, where different concrete Saver implementations (strategies) can be interchanged to save data to files, databases, cloud storage, or other destinations using various formats and approaches.
Custom Savers can be created by subclassing and implementing the save method.
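A minimal sketch of that contract, assuming the save signature documented below; the real base class may carry additional helpers:

```python
from abc import ABC, abstractmethod

class Saver(ABC):
    """Abstract interface every concrete saver must implement."""

    @abstractmethod
    def save(self, data, indices=None, **kwargs):
        """Persist data to the configured destination."""
        ...
```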
Examples:

Using a Saver with TaskManager:

```python
# Create a TaskManager with a Saver
task_manager = TaskManager(MyTask, use_ray=True)
saver = HDF5Saver("results.h5", dataset_name="experiment_1")

# Submit tasks with periodic saving
task_manager.submit_tasks(items, saver=saver, save_interval=100)
```

Creating a custom Saver:

```python
import os

import pandas as pd

class CSVSaver(Saver):
    def __init__(self, file_path):
        self.file_path = file_path
        self.file_exists = os.path.exists(file_path)

    def save(self, data, indices=None, **kwargs):
        # Append if the file already exists; otherwise create it with a header.
        mode = "a" if self.file_exists else "w"
        header = not self.file_exists
        df = pd.DataFrame(data)
        df.to_csv(self.file_path, mode=mode, header=header, index=False)
        self.file_exists = True
```
Notes
- Savers are used by TaskManager to persist intermediate results during long-running computations, reducing memory pressure.
- A single Saver instance may be called multiple times during task execution, so implementations should handle appending or updating existing data.
- Error handling within save methods is important to prevent data loss.
### save(data, indices=None, **kwargs)
Saves the provided data to the configured destination.
This abstract method must be implemented by all concrete Saver subclasses to define how data is written to the destination (file, database, cloud storage, etc.).
The method should handle details such as format conversion, creating or opening the destination, writing the data, and handling any errors that may occur during the saving process.
| PARAMETER | DESCRIPTION |
|---|---|
| `data` | A list of results to save. Each element can be of any type, though specific Saver implementations may have type requirements. |
| `indices` | Optional indices or locations where data should be written when using an update approach. The meaning of this parameter depends on the specific Saver implementation. Default is None. |
| `**kwargs` | Additional keyword arguments that may be used by specific saver implementations. These arguments are typically passed from TaskManager.save_kwargs and can include parameters like data types, compression options, or format-specific settings. |
| RETURNS | DESCRIPTION |
|---|---|
| `None` | The effect of this method is to persist data to the configured destination. |
Examples:

Implementation in HDF5Saver (append approach):

```python
# Assumes numpy (as np) and h5py are imported at module level.
def save(self, data, indices=None, **kwargs):
    arr = np.array(data, dtype=kwargs.get("dtype"))
    with h5py.File(self.file_path, "a") as h5file:
        if self.dataset_name not in h5file:
            # Create the dataset with an unlimited first axis so that
            # later appends can resize it.
            h5file.create_dataset(
                self.dataset_name, data=arr, maxshape=(None,) + arr.shape[1:]
            )
        else:
            # Extend the dataset and write the new rows at the end.
            dset = h5file[self.dataset_name]
            old_size = dset.shape[0]
            new_size = old_size + arr.shape[0]
            dset.resize(new_size, axis=0)
            dset[old_size:new_size] = arr
```
Notes
- Implementations should be robust to repeated calls with new data.
- They should appropriately handle the case where the destination does not yet exist vs. already exists.
- The indices parameter is primarily used for update operations, and its interpretation varies between Saver implementations.
## ZarrSaver(file_path, dataset_name='dataset', approach='append')
Bases: `Saver`

A saver that writes data to a Zarr array.

`ZarrSaver` provides functionality to persist computational results in Zarr format, which offers efficient chunked, compressed, N-dimensional array storage. Zarr is particularly well-suited for large arrays that don't fit in memory and for cloud-based storage, supporting both local and remote persistence.
This implementation supports three approaches to saving data:
- `append`: Add new data to the existing array (creates it if it does not exist).
- `overwrite`: Replace existing data with new data.
- `update`: Update specific indices in the existing array with new values.
Zarr offers advantages over other formats for specific use cases:
- Parallel read/write access for distributed computing.
- Efficient access to subsets of large arrays.
- Support for cloud storage backends (S3, GCS, etc.).
- Good compression options for numerical data.
Examples:

Basic usage with local storage:

```python
# Create a ZarrSaver for appending data
saver = ZarrSaver("results.zarr", dataset_name="experiment_1")

# Use with TaskManager
task_manager = TaskManager(MyTask)
task_manager.submit_tasks(items, saver=saver, save_interval=100)
```

Overwriting existing data:

```python
# Create a saver that overwrites existing data
saver = ZarrSaver(
    "results.zarr",
    dataset_name="daily_metrics",
    approach="overwrite",
)

# Save a batch of results directly
results = process_batch(today_data)
saver.save(results)
```

Updating specific indices:

```python
# Create a saver for updating existing data
saver = ZarrSaver(
    "results.zarr",
    dataset_name="time_series",
    approach="update",
)

# Update specific time indices with new values
new_data = calculate_corrections(raw_data)
indices = [5, 10, 15, 20]  # Indices to update
saver.save(new_data, indices=indices)
```

Using a cloud storage backend (requires appropriate zarr plugins):

```python
# Using with AWS S3 (requires s3fs)
import s3fs

s3 = s3fs.S3FileSystem(anon=False)
store = s3fs.S3Map(root="mybucket/zarr-data", s3=s3, check=False)

# Create a ZarrSaver with the S3 store
saver = ZarrSaver(
    store,
    dataset_name="remote_dataset",
    approach="append",
)
```
Notes

- Zarr is particularly well-suited for large-scale numerical data and distributed computing workloads.
- For optimal performance, consider chunk size carefully based on access patterns.
- Unlike HDF5, Zarr allows concurrent reads and writes from multiple processes or machines, making it ideal for distributed computing.
- The `update` approach requires that the dataset already exists and that valid indices are provided.
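As a sketch of the subset access mentioned above, reusing names from the basic-usage example (addressing the dataset inside the store via `path` is an assumption about the produced layout):

```python
import zarr

# Open the array read-only and pull one slice; only the chunks that
# overlap the slice are fetched from storage.
z = zarr.open_array("results.zarr", mode="r", path="experiment_1")
window = z[1000:2000]
```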
Initialize a ZarrSaver instance.
| PARAMETER | DESCRIPTION |
|---|---|
| `file_path` | The path to the Zarr container to create or open. This can be a local path or a URL to a supported remote storage backend. |
| `dataset_name` | Name of the dataset within the Zarr store. Default is `"dataset"`. |
| `approach` | One of `'append'`, `'overwrite'`, or `'update'`, determining how data is saved when the dataset already exists. Default is `"append"`. |
Notes
- The file_path parameter can accept various types of storage locations depending on the zarr plugins installed. This includes local file paths, S3 URLs, etc.
- For cloud storage options, you may need to install additional dependencies such as s3fs for Amazon S3 access.
Instance attributes:

- `approach = approach.strip().lower()`
- `dataset_name = dataset_name`
- `file_path = file_path`

### save(data, indices=None, **kwargs)
Saves the data to a Zarr array according to the specified approach.
This method implements the abstract save method from the Saver base class. It persists the provided data to a Zarr array using the configured approach (append, overwrite, or update).
The method handles creating new arrays if they don't exist (for append and overwrite approaches) or modifying existing arrays. It automatically converts the input data to a numpy array before saving.
| PARAMETER | DESCRIPTION |
|---|---|
| `data` | A list of results to save. The data will be converted to a numpy array before saving to Zarr. |
| `indices` | Required when approach is `'update'`; specifies the indices where data should be written in the existing array. Must match the shape and dimensionality of the input data. |
| `**kwargs` | Additional keyword arguments passed to zarr.create_array or zarr.open_array. Common options include `chunks` and `compressor` (see the examples below). |
| RAISES | DESCRIPTION |
|---|---|
| `ImportError` | If the zarr library is not installed. |
| `ValueError` | If approach is `'update'` but indices is None. |
| `FileNotFoundError` | If attempting to update a non-existent array. |
| `TypeError` | If the data cannot be converted to a numpy array. |
Examples:

Saving data with the append approach:

```python
saver = ZarrSaver("results.zarr")

# First save creates the array
saver.save([1, 2, 3, 4, 5])

# Subsequent saves append to it
saver.save([6, 7, 8, 9, 10])
```

Saving with custom chunk size and compression:

```python
import numcodecs

saver = ZarrSaver("compressed_results.zarr")

# Save with customized storage parameters
compressor = numcodecs.Blosc(cname="zstd", clevel=9)
saver.save(
    large_dataset,
    chunks=(1000,),
    compressor=compressor,
)
```

Updating specific indices:

```python
saver = ZarrSaver("timeseries.zarr", approach="update")

# Update values at specific positions
new_values = [99.5, 98.3, 97.8]
indices = [10, 20, 30]  # Positions to update
saver.save(new_values, indices=indices)
```
Notes
- The append operation is optimized for adding new data to existing arrays without reading the entire array into memory.
- For large datasets, consider specifying appropriate chunk sizes in kwargs when creating the array for the first time.
- When updating, the indices and data must have compatible shapes.
- Unlike HDF5, zarr supports concurrent reads and writes from multiple processes, making it suitable for distributed computing environments.
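A hedged sketch of what the update approach presumably amounts to under the hood; this is not the library's actual code, and it assumes the default dataset name `'dataset'`:

```python
import numpy as np
import zarr

# Open the existing array read-write and assign over a slice;
# zarr rewrites only the chunks the slice touches.
z = zarr.open_array("timeseries.zarr", mode="r+", path="dataset")
z[10:13] = np.asarray([99.5, 98.3, 97.8])
```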