
__init__

__all__ = ['BaseResultHandler', 'ListResultHandler', 'OnlineMeanResultHandler']

BaseResultHandler

Bases: ABC

Abstract base class for result handlers that accumulate results from processed chunks.

This class defines the interface for custom result handling strategies, such as collecting results in a list, computing running statistics (e.g., mean/variance), or any other aggregation approach. Implementations must override the abstract methods to provide concrete behavior.
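The subclassing pattern can be sketched as follows. This is a minimal illustration, not the package's actual code: since the import path is not shown on this page, a stand-in `BaseResultHandler` with the interface documented below is defined inline, and `CountingResultHandler` is a hypothetical strategy that counts results instead of storing them.

```python
from abc import ABC, abstractmethod

# Stand-in for BaseResultHandler; the real class lives in this package.
class BaseResultHandler(ABC):
    @abstractmethod
    def add_chunk(self, chunk_results, chunk_index=None, *args, **kwargs): ...
    @abstractmethod
    def finalize(self, saver): ...
    @abstractmethod
    def get_results(self): ...
    def periodic_save_if_needed(self, saver, save_interval):
        pass  # optional hook; no-op unless a subclass overrides it

# Hypothetical strategy: count results rather than collecting them.
class CountingResultHandler(BaseResultHandler):
    def __init__(self):
        self.count = 0

    def add_chunk(self, chunk_results, chunk_index=None, *args, **kwargs):
        self.count += len(chunk_results)

    def finalize(self, saver):
        if saver is not None:
            saver.save({"count": self.count})

    def get_results(self):
        return {"count": self.count}

handler = CountingResultHandler()
handler.add_chunk([1, 2, 3])
handler.add_chunk([4, 5])
print(handler.get_results())  # {'count': 5}
```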

add_chunk(chunk_results, chunk_index=None, *args, **kwargs)

Incorporates a new chunk of results into the handler.

This method is called when a chunk of results is ready, such as after a parallel task completes. The method should implement the logic to update the handler's state with the new results. If the results need to be ordered (e.g., based on chunk_index), the implementation should ensure that the order is maintained.

PARAMETER DESCRIPTION
chunk_results

A list containing the results of the current chunk.

TYPE: list[Any]

chunk_index

The index of the chunk, used to maintain order. If None, the handler may assume that the results are already in the correct order.

TYPE: int | None DEFAULT: None

*args

Additional positional arguments that may be used by specific implementations.

TYPE: tuple[Any] DEFAULT: ()

**kwargs

Additional keyword arguments that may be used by specific implementations.

TYPE: dict[str, Any] DEFAULT: {}

finalize(saver)

Finalizes the result collection process by flushing any remaining data and saving it if required.

This method is called once after all chunks have been processed. It provides an opportunity to persist any remaining results or perform any necessary cleanup. The implementation should ensure that the final state of the accumulated results is complete and ready for retrieval.

PARAMETER DESCRIPTION
saver

An instance of a Saver used to persist the final accumulated results. If None, no saving operation is performed.

TYPE: Saver | None

get_results()

Retrieves the accumulated results.

Depending on the implementation, this method may return a list, a dictionary, or another data structure representing the collected results. For instance, a list aggregator may return all individual results in order, while a statistical aggregator might return computed metrics like mean and variance.

RETURNS DESCRIPTION
list[Any] | dict[str, Any]

The accumulated results in their appropriate format.

periodic_save_if_needed(saver, save_interval)

Optionally saves partial results based on a defined save interval.

This method is called periodically (e.g., after processing each chunk) by the TaskManager to determine if the current accumulated results should be persisted to a storage medium. Implementations may override this method to incorporate saving logic, or leave it as a no-op if saving is handled differently.

PARAMETER DESCRIPTION
saver

An instance of a Saver responsible for persisting results. If None, no saving operation is performed.

TYPE: Saver | None

save_interval

The number of results that should trigger a save operation.

TYPE: int

ListResultHandler()

Bases: BaseResultHandler

A result handler that preserves the submission (chunk) order when collecting results in parallel. This handler uses an internal dictionary to store chunks keyed by their chunk index, and flushes completed chunks in ascending order (0, 1, 2, ...) into a final list. This ensures that even if chunks complete out of order, the final results are returned in the correct submission order.
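The buffer-and-flush idea described above can be sketched as a standalone function (a simplified model of the behavior, not the class's actual implementation):

```python
def flush_in_order(buffered, collected, next_index):
    """Move consecutively indexed chunks from the buffer into the
    ordered result list, starting at next_index."""
    while next_index in buffered:
        collected.extend(buffered.pop(next_index))
        next_index += 1
    return next_index

# Chunks complete out of order: index 1 first, then 0, then 2.
buffered, collected, next_index = {}, [], 0
for idx, results in [(1, ["b1", "b2"]), (0, ["a1"]), (2, ["c1"])]:
    buffered[idx] = results
    next_index = flush_in_order(buffered, collected, next_index)

print(collected)  # ['a1', 'b1', 'b2', 'c1']
```

Chunk 1 sits in the buffer until chunk 0 arrives; the flush then drains both in ascending order, so the final list matches the submission order.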
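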

_buffered = {}

A temporary storage for chunks that are not yet flushed, keyed by their chunk index.

_collected = []

A list containing the final, ordered results after flushing.

_next_index = 0

The index of the next expected chunk to flush from the buffered dictionary.

add_chunk(chunk_results, chunk_index=None, *args, **kwargs)

Adds a chunk of results to the handler. This method is typically called when a parallel task (such as a Ray task) completes execution.

If a chunk_index is provided, the chunk is buffered until all prior chunks have been processed to maintain the original submission order. If no chunk_index is provided (i.e., when running sequentially), the chunk results are appended directly to the final results.

PARAMETER DESCRIPTION
chunk_results

The list of results produced by a chunk.

TYPE: list[Any]

chunk_index

The index of the chunk. This value is used to maintain the correct order.

TYPE: int | None DEFAULT: None

*args

Additional positional arguments (unused).

TYPE: tuple[Any] DEFAULT: ()

**kwargs

Additional keyword arguments (unused).

TYPE: dict[str, Any] DEFAULT: {}

finalize(saver)

Finalizes the result collection process by flushing any remaining buffered chunks and persisting any unsaved results if a saver is provided.

This method should be called after all chunks have been submitted and completed. It ensures that any remaining results in the buffer are appended to the final results, and if a Saver is available, it saves the unsaved results.

PARAMETER DESCRIPTION
saver

An instance of a Saver used to persist the final results. If None, no saving is performed.

TYPE: Saver | None

get_results()

Retrieves all results collected by the handler in their original submission order.

RETURNS DESCRIPTION
list[Any]

A list containing all the results in order.

periodic_save_if_needed(saver, save_interval)

Persists a slice of the currently collected results if a saving interval has been met.

This method is typically called by a TaskManager after each chunk is added. If a Saver is provided and the number of collected results meets or exceeds the specified save_interval, a slice of results is saved using the saver, and the saved results are removed from the in-memory collection.

PARAMETER DESCRIPTION
saver

An instance of a Saver responsible for persisting results. If None, no saving is performed.

TYPE: Saver | None

save_interval

The minimum number of results required before triggering a save.

TYPE: int
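The save-a-slice-and-trim behavior described for this method can be modeled as below. `MemorySaver` is a hypothetical stand-in (the real `Saver` API is not documented here beyond having a `save` method), and the function mirrors the documented semantics rather than the actual implementation:

```python
class MemorySaver:
    """Hypothetical stand-in for Saver: records whatever it is given."""
    def __init__(self):
        self.saved = []

    def save(self, results):
        self.saved.extend(results)

def periodic_save_if_needed(collected, saver, save_interval):
    """Persist a slice of collected results once the interval is met,
    then drop the persisted slice from memory."""
    if saver is not None and len(collected) >= save_interval:
        saver.save(collected[:save_interval])
        del collected[:save_interval]

saver = MemorySaver()
collected = list(range(7))
periodic_save_if_needed(collected, saver, save_interval=5)
print(saver.saved)  # [0, 1, 2, 3, 4]
print(collected)    # [5, 6]
```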

OnlineMeanResultHandler()

Bases: BaseResultHandler

OnlineMeanResultHandler provides a numerically stable, online (incremental) algorithm for computing the arithmetic mean of large, streaming, or distributed datasets represented as NumPy arrays. In many real-world applications, such as distributed computing or real-time data processing, data is processed in chunks, with each chunk yielding a partial mean and its corresponding count. This class merges these partial results into a global mean without needing to store the raw data, thus avoiding issues such as numerical overflow and precision loss.

Suppose the overall dataset is divided into k chunks. For each chunk \(i\) (where \(1 \leq i \leq k\)), let:

  • \(m_i\) be the partial mean computed over \(n_i\) data points.
  • \(M_i\) be the global mean computed after processing \(i\) chunks.
  • \(N_i = n_1 + n_2 + \ldots + n_i\) be the cumulative count after \(i\) chunks.

The arithmetic mean of all data points is defined as:

\[ M_\text{total} = \frac{n_1 m_1 + n_2 m_2 + \ldots + n_k m_k}{n_1 + n_2 + \ldots + n_k} \]

Rather than computing \(M_\text{total}\) from scratch after processing all data, the class uses an iterative update rule. When merging a new partial result \((m_\text{partial}, n_\text{partial})\) with the current global mean \(M_\text{old}\) (accumulated over \(n_\text{old}\) observations), the updated mean is given by:

\[ M_\text{new} = M_\text{old} + \left( m_\text{partial} - M_\text{old} \right) \cdot \frac{n_\text{partial}}{n_\text{old} + n_\text{partial}} \]

This update is mathematically equivalent to the weighted average:

\[ M_\text{new} = \frac{n_\text{old} M_\text{old} + n_\text{partial} m_\text{partial}}{n_\text{old} + n_\text{partial}} \]

but is rearranged to enhance numerical stability. By focusing on the difference \(m_\text{partial} - M_\text{old}\) and scaling it by the relative weight \(n_\text{partial} / (n_\text{old} + n_\text{partial})\), the algorithm minimizes the round-off errors that can occur when summing large numbers or when processing many chunks sequentially.
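The update rule above can be sketched as a standalone NumPy function and checked against a direct mean over all the data (an illustrative model of the merge step, not the class's actual code; `merge_mean` is a hypothetical helper name):

```python
import numpy as np

def merge_mean(global_mean, total_count, partial_mean, partial_count):
    """Merge a partial (mean, count) into the running global mean
    using the numerically stable update rule above."""
    if global_mean is None:  # the first chunk defines the mean
        return partial_mean.copy(), partial_count
    new_count = total_count + partial_count
    new_mean = global_mean + (partial_mean - global_mean) * (partial_count / new_count)
    return new_mean, new_count

# Verify against a direct mean over the concatenated data.
rng = np.random.default_rng(0)
chunks = [rng.normal(size=(n, 3)) for n in (4, 7, 5)]

mean, count = None, 0
for chunk in chunks:
    mean, count = merge_mean(mean, count, chunk.mean(axis=0), len(chunk))

assert count == 16
assert np.allclose(mean, np.concatenate(chunks).mean(axis=0))
```

Because each step only touches the difference between the incoming partial mean and the running mean, the merge stays accurate even after many sequential chunks.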

The handler starts with no accumulated data. The global mean (global_mean) is initially set to None, and it will be defined by the first partial result received. The total number of observations (total_count) is initialized to zero.

global_mean = None

The current global mean of all processed observations.

total_count = 0

The total number of observations processed.

add_chunk(chunk_results, chunk_index=None, *args, **kwargs)

Processes one or more chunks of partial results to update the global mean.

PARAMETER DESCRIPTION
chunk_results

Either a single tuple (partial_mean, count) or a list of such tuples. Each partial result must be a tuple consisting of:

  • A NumPy array representing the partial mean of a data chunk.
  • An integer representing the count of observations in that chunk.

TYPE: list[tuple[NDArray[float64], int]] | tuple[NDArray[float64], int]

chunk_index

An optional index identifier for the chunk (for interface consistency, not used in calculations).

TYPE: int | None DEFAULT: None

finalize(saver)

Finalizes the accumulation process and persists the final global mean if a saver is provided.

This method should be called when no more data is expected. It ensures that the final computed mean and the total observation count are saved.

PARAMETER DESCRIPTION
saver

An instance of a Saver to save the final result, or None.

TYPE: Saver | None

get_results()

Retrieves the final computed global mean along with the total number of observations.

RETURNS DESCRIPTION
dict[str, NDArray[float64] | int]

A dictionary with the following keys:

  • "mean": A NumPy array representing the computed global mean.
  • "n": An integer representing the total number of observations processed.
RAISES DESCRIPTION
ValueError

If no data has been processed (i.e., global_mean is None or total_count is zero).

periodic_save_if_needed(saver, save_interval)

Persists the current global mean at periodic intervals if a saver is provided.

This method is useful in long-running or distributed applications where saving intermediate results is necessary for fault tolerance. It checks if the total number of observations processed meets or exceeds the save_interval and, if so, invokes the save method of the provided saver.

PARAMETER DESCRIPTION
saver

An instance of a Saver, which has a save method to persist data, or None.

TYPE: Saver | None

save_interval

The threshold for the total number of observations to trigger a save.

TYPE: int