init
__all__ = ['BaseResultHandler', 'ListResultHandler', 'OnlineMeanResultHandler']
BaseResultHandler
Bases: ABC
Abstract base class for result handlers that accumulate results from processed chunks.
This class defines the interface for custom result-handling strategies, such as collecting results in a list, computing running statistics (e.g., mean/variance), or applying any other aggregation approach. Implementations must override the abstract methods to provide concrete behavior.
add_chunk(chunk_results, chunk_index=None, *args, **kwargs)
Incorporates a new chunk of results into the handler.
This method is called when a chunk of results is ready, such as after a parallel task completes. It should update the handler's state with the new results. If the results need to be ordered (e.g., based on chunk_index), the implementation should ensure that the order is maintained.

| PARAMETER | DESCRIPTION |
|---|---|
| `chunk_results` | A list containing the results of the current chunk. |
| `chunk_index` | The index of the chunk, used to maintain order. If None, the handler may assume that the results are already in the correct order. |
| `*args` | Additional positional arguments that may be used by specific implementations. |
| `**kwargs` | Additional keyword arguments that may be used by specific implementations. |
finalize(saver)
Finalizes the result collection process by flushing any remaining data and saving it if required.
This method is called once after all chunks have been processed. It provides an opportunity to persist any remaining results or perform any necessary cleanup. The implementation should ensure that the final state of the accumulated results is complete and ready for retrieval.

| PARAMETER | DESCRIPTION |
|---|---|
| `saver` | An instance of a Saver used to persist the final accumulated results. If None, no saving operation is performed. |
get_results()
Retrieves the accumulated results.
Depending on the implementation, this method may return a list, a dictionary, or another data structure representing the collected results. For instance, a list aggregator may return all individual results in order, while a statistical aggregator might return computed metrics like mean and variance.

| RETURNS | DESCRIPTION |
|---|---|
| `list[Any] \| dict[str, Any]` | The accumulated results in their appropriate format. |
periodic_save_if_needed(saver, save_interval)
Optionally saves partial results based on a defined save interval.
This method is called periodically (e.g., after processing each chunk) by the TaskManager to determine whether the currently accumulated results should be persisted to a storage medium. Implementations may override this method to incorporate saving logic, or leave it as a no-op if saving is handled differently.

| PARAMETER | DESCRIPTION |
|---|---|
| `saver` | An instance of a Saver responsible for persisting results. If None, no saving operation is performed. |
| `save_interval` | The number of results that should trigger a save operation. |
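To make the interface concrete, here is a minimal sketch of a custom subclass. The `BaseResultHandler` stand-in below only mirrors the documented method signatures to keep the example self-contained, and `CountResultHandler` is a hypothetical handler invented for illustration:

```python
from abc import ABC, abstractmethod

# Stand-in mirroring the documented interface (the real BaseResultHandler
# would be imported from the library; it is redefined here only so the
# sketch runs on its own).
class BaseResultHandler(ABC):
    @abstractmethod
    def add_chunk(self, chunk_results, chunk_index=None, *args, **kwargs): ...

    @abstractmethod
    def finalize(self, saver): ...

    @abstractmethod
    def get_results(self): ...

    def periodic_save_if_needed(self, saver, save_interval):
        pass  # no-op by default; subclasses may override

# Hypothetical handler that only tracks how many results were produced.
class CountResultHandler(BaseResultHandler):
    def __init__(self):
        self.count = 0

    def add_chunk(self, chunk_results, chunk_index=None, *args, **kwargs):
        self.count += len(chunk_results)  # order does not matter for a count

    def finalize(self, saver):
        if saver is not None:
            saver.save({"count": self.count})

    def get_results(self):
        return {"count": self.count}

handler = CountResultHandler()
handler.add_chunk([1, 2, 3], chunk_index=0)
handler.add_chunk([4, 5], chunk_index=1)
print(handler.get_results())  # {'count': 5}
```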
ListResultHandler()
Bases: BaseResultHandler
A result handler that preserves the submission (chunk) order when collecting results in parallel. This handler uses an internal dictionary to store chunks keyed by their chunk index, and flushes completed chunks in ascending order (0, 1, 2, ...) into a final list. This ensures that even if chunks complete out of order, the final results are returned in the correct submission order.

_buffered = {}
A temporary storage for chunks that are not yet flushed, keyed by their chunk index.
_collected = []
A list containing the final, ordered results after flushing.
_next_index = 0
The index of the next chunk expected to be flushed from the buffered dictionary.
add_chunk(chunk_results, chunk_index=None, *args, **kwargs)
Adds a chunk of results to the handler. This method is typically called when a parallel task (such as a Ray task) completes execution.
If a `chunk_index` is provided, the chunk is buffered until all prior chunks have been processed, to maintain the original submission order. If no `chunk_index` is provided (i.e., when running sequentially), the chunk results are appended directly to the final results.

| PARAMETER | DESCRIPTION |
|---|---|
| `chunk_results` | The list of results produced by a chunk. |
| `chunk_index` | The index of the chunk. This value is used to maintain the correct order. |
| `*args` | Additional positional arguments (unused). |
| `**kwargs` | Additional keyword arguments (unused). |
finalize(saver)
Finalizes the result collection process by flushing any remaining buffered chunks and persisting any unsaved results if a saver is provided.
This method should be called after all chunks have been submitted and completed. It ensures that any remaining results in the buffer are appended to the final results and, if a Saver is available, saves the unsaved results.

| PARAMETER | DESCRIPTION |
|---|---|
| `saver` | An instance of a Saver used to persist the final results. If None, no saving is performed. |
get_results()
Returns the final, ordered list of collected results.
periodic_save_if_needed(saver, save_interval)
Persists a slice of the currently collected results if the saving interval has been met.
This method is typically called by a TaskManager after each chunk is added. If a Saver is provided and the number of collected results meets or exceeds the specified save_interval, a slice of results is saved using the saver, and the saved results are removed from the in-memory collection.

| PARAMETER | DESCRIPTION |
|---|---|
| `saver` | An instance of a Saver responsible for persisting results. If None, no saving is performed. |
| `save_interval` | The minimum number of results required before triggering a save. |
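The ordered-flush behavior described above can be sketched independently of the library. The `OrderedCollector` class below is a simplified model of the documented `_buffered`/`_collected`/`_next_index` mechanics, not the actual implementation:

```python
# Simplified model of ListResultHandler's ordered flushing (an illustration
# of the documented algorithm only).
class OrderedCollector:
    def __init__(self):
        self._buffered = {}    # chunk_index -> chunk_results, awaiting flush
        self._collected = []   # final, ordered results
        self._next_index = 0   # next chunk index expected to flush

    def add_chunk(self, chunk_results, chunk_index=None):
        if chunk_index is None:
            # Sequential mode: results already arrive in order.
            self._collected.extend(chunk_results)
            return
        self._buffered[chunk_index] = chunk_results
        # Flush every consecutive chunk starting from _next_index.
        while self._next_index in self._buffered:
            self._collected.extend(self._buffered.pop(self._next_index))
            self._next_index += 1

    def get_results(self):
        return self._collected

c = OrderedCollector()
c.add_chunk(["b1", "b2"], chunk_index=1)  # arrives early, stays buffered
c.add_chunk(["a1"], chunk_index=0)        # triggers flush of chunks 0 and 1
print(c.get_results())  # ['a1', 'b1', 'b2']
```

Note that chunk 1 completes first but is held back until chunk 0 arrives, which is exactly how out-of-order parallel completions are reconciled with submission order.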
OnlineMeanResultHandler()
Bases: BaseResultHandler
OnlineMeanResultHandler provides a numerically stable, online (incremental) algorithm to compute the arithmetic mean of large, streaming, or distributed datasets represented as NumPy arrays. In many real-world applications, such as distributed computing or real-time data processing, data is processed in chunks, with each chunk yielding a partial mean and its corresponding count. This class merges these partial results into a global mean without needing to store all the raw data, thus avoiding issues such as numerical overflow and precision loss.
Suppose the overall dataset is divided into \(k\) chunks. For each chunk \(i\) (where \(1 \leq i \leq k\)), let:
- \(m_i\) be the partial mean computed over \(n_i\) data points.
- \(M_i\) be the global mean computed after processing \(i\) chunks.
- \(N_i = n_1 + n_2 + \dots + n_i\) be the cumulative count after \(i\) chunks.
The arithmetic mean of all data points is defined as:
\[
M_{total} = \frac{\sum_{i=1}^{k} n_i \, m_i}{\sum_{i=1}^{k} n_i}
\]
Rather than computing \(M_{total}\) from scratch after processing all data, the class uses an iterative update rule. When merging a new partial result \((m_{partial}, n_{partial})\) with the current global mean \(M_{old}\) (with count \(n_{old}\)), the updated mean is given by:
\[
M_{new} = M_{old} + \frac{n_{partial}}{n_{old} + n_{partial}} \left( m_{partial} - M_{old} \right)
\]
This update is mathematically equivalent to the weighted average:
\[
M_{new} = \frac{n_{old} \, M_{old} + n_{partial} \, m_{partial}}{n_{old} + n_{partial}}
\]
but is rearranged to enhance numerical stability. By focusing on the difference \((m_{partial} - M_{old})\) and scaling it by the relative weight \(n_{partial} / (n_{old} + n_{partial})\), the algorithm minimizes the round-off errors that can occur when summing large numbers or when processing many chunks sequentially.
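The update rule above can be exercised with a few lines of plain NumPy. The `merge_mean` helper is an illustration of the formula itself, independent of the handler class:

```python
import numpy as np

def merge_mean(M_old, n_old, m_partial, n_partial):
    """Merge a partial mean into a running global mean using the
    numerically stable incremental update rule."""
    if M_old is None:  # the first chunk defines the global mean
        return np.array(m_partial, dtype=float), n_partial
    n_new = n_old + n_partial
    M_new = M_old + (m_partial - M_old) * (n_partial / n_new)
    return M_new, n_new

# Three 2-dimensional data points split across two chunks.
chunk1 = np.array([[1.0, 10.0], [3.0, 30.0]])  # partial mean [2, 20], n=2
chunk2 = np.array([[8.0, 80.0]])               # partial mean [8, 80], n=1

M, n = merge_mean(None, 0, chunk1.mean(axis=0), len(chunk1))
M, n = merge_mean(M, n, chunk2.mean(axis=0), len(chunk2))
# Matches the mean of all three points: [4.0, 40.0]
print(M, n)
```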
The handler starts with no accumulated data. The global mean (`global_mean`) is initially set to None and will be defined by the first partial result received. The total number of observations (`total_count`) is initialized to zero.

global_mean = None
The current global mean of all processed observations.
total_count = 0
The total number of observations processed.
add_chunk(chunk_results, chunk_index=None, *args, **kwargs)
Processes one or more chunks of partial results to update the global mean.

| PARAMETER | DESCRIPTION |
|---|---|
| `chunk_results` | Either a single tuple `(partial_mean, count)` or a list of such tuples. Each partial result must be a tuple consisting of a NumPy array representing the partial mean of a data chunk and an integer representing the count of observations in that chunk. |
| `chunk_index` | An optional index identifier for the chunk (kept for interface consistency; not used in calculations). |
finalize(saver)
Finalizes the accumulation process and persists the final global mean if a saver is provided.
This method should be called when no more data is expected. It ensures that the final computed mean and the total observation count are saved.

| PARAMETER | DESCRIPTION |
|---|---|
| `saver` | An instance of a Saver to save the final result, or None. |
get_results()
Retrieves the final computed global mean along with the total number of observations.

| RETURNS | DESCRIPTION |
|---|---|
| `dict[str, NDArray[float64] \| int]` | A dictionary containing the final global mean and the total observation count. |

| RAISES | DESCRIPTION |
|---|---|
| `ValueError` | If no data has been processed (i.e., `global_mean` is still None). |
periodic_save_if_needed(saver, save_interval)
Persists the current global mean at periodic intervals if a saver is provided.
This method is useful in long-running or distributed applications where saving intermediate results is necessary for fault tolerance. It checks whether the total number of observations processed meets or exceeds the save_interval and, if so, invokes the save method of the provided saver.

| PARAMETER | DESCRIPTION |
|---|---|
| `saver` | An instance of a Saver, which has a save method to persist data, or None. |
| `save_interval` | The threshold for the total number of observations to trigger a save. |
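Putting the pieces together, the behavior documented above can be modeled with a simplified stand-in. This is illustrative only (the real class lives in the library and also integrates with Saver and the TaskManager), and the result-dictionary key names `"mean"` and `"count"` are assumptions, not the library's actual keys:

```python
import numpy as np

# Simplified stand-in for OnlineMeanResultHandler (illustration of the
# documented behavior, not the library's actual implementation).
class OnlineMean:
    def __init__(self):
        self.global_mean = None  # defined by the first partial result
        self.total_count = 0

    def add_chunk(self, chunk_results, chunk_index=None):
        # Accept a single (partial_mean, count) tuple or a list of them.
        if isinstance(chunk_results, tuple):
            chunk_results = [chunk_results]
        for m_partial, n_partial in chunk_results:
            if self.global_mean is None:
                self.global_mean = np.array(m_partial, dtype=float)
                self.total_count = n_partial
            else:
                n_new = self.total_count + n_partial
                # Numerically stable incremental update rule.
                self.global_mean = self.global_mean + (
                    m_partial - self.global_mean
                ) * (n_partial / n_new)
                self.total_count = n_new

    def get_results(self):
        if self.global_mean is None:
            raise ValueError("No data has been processed.")
        # Key names are illustrative, not the library's actual keys.
        return {"mean": self.global_mean, "count": self.total_count}

h = OnlineMean()
h.add_chunk((np.array([2.0, 20.0]), 2))    # a single (mean, count) tuple
h.add_chunk([(np.array([8.0, 80.0]), 1)])  # a list of such tuples
print(h.get_results())  # mean [4.0, 40.0] over 3 observations
```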