Manager

TaskManager(task_class, result_handler=None, n_cores=-1, use_ray=False, n_cores_worker=1)

A manager class for handling task submission and result collection, with optional parallel execution via Ray task parallelism.

Initializes the TaskManager.

Creates a new TaskManager instance configured for either sequential or parallel task execution using the specified task class.

PARAMETER DESCRIPTION
task_class

A callable that returns an instance with a run method for processing each item.

TYPE: Callable[[], Any]
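
result_handler

An optional handler used to collect task results. If None, a ListResultHandler is created (see the result_handler attribute below).

DEFAULT: None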

n_cores

Total number of CPU cores to use for parallel execution. If <= 0, all available CPUs are used. Default is -1 (use all available cores).

TYPE: int DEFAULT: -1

use_ray

Flag to determine if Ray should be used for parallel execution. If False, runs tasks sequentially. Default is False.

TYPE: bool DEFAULT: False

n_cores_worker

The number of cores allocated for each worker. Default is 1.

TYPE: int DEFAULT: 1

futures = []

A list of Ray object references for currently submitted tasks.

This attribute tracks all actively running but not yet completed tasks when using Ray. It is used internally to manage the pool of workers and to ensure that the number of concurrent tasks does not exceed max_concurrent_tasks.

This list is empty when not using Ray or when no tasks are currently running. Users typically do not need to interact with this attribute directly.

max_concurrent_tasks

Maximum number of tasks that can run concurrently.

This property calculates the maximum number of tasks that can be executed concurrently based on the total available CPU cores (n_cores) and the number of cores allocated per worker (n_cores_worker).

The value represents the task parallelism limit when using Ray for distributed execution. It ensures efficient resource utilization by preventing over-subscription of CPU resources.

RETURNS DESCRIPTION
int

An integer representing the maximum number of concurrent tasks. Always returns at least 1, even if calculations result in a lower value.

Notes
  • This property is primarily used internally by the _submit_ray method to manage the worker pool size.
  • The value is computed by floor-dividing the total cores (n_cores) by the cores per worker (n_cores_worker); see the sketch below.
  • Setting n_cores_worker appropriately is important for tasks with different computational profiles:
      • CPU-bound tasks benefit from higher n_cores_worker values.
      • I/O-bound tasks typically work better with n_cores_worker=1 and a higher max_concurrent_tasks.
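
A minimal sketch of the calculation described in the notes, written as a standalone function for illustration (the actual property body is assumed to match):

def max_concurrent_tasks(n_cores, n_cores_worker):
    # Floor-divide total cores by cores per worker; never below 1.
    return max(1, n_cores // n_cores_worker)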

Examples:

# With 8 total cores and 2 cores per worker
manager = TaskManager(MyTask, n_cores=8, n_cores_worker=2, use_ray=True)
print(manager.max_concurrent_tasks)  # Output: 4

# With 16 total cores and 4 cores per worker
manager = TaskManager(MyTask, n_cores=16, n_cores_worker=4, use_ray=True)
print(manager.max_concurrent_tasks)  # Output: 4

# With 24 total cores and 1 core per worker (for I/O-bound tasks)
manager = TaskManager(IOBoundTask, n_cores=24, n_cores_worker=1, use_ray=True)
print(manager.max_concurrent_tasks)  # Output: 24

n_cores = n_cores

The total number of CPU cores available for parallel execution.

This value determines the overall parallelism level when use_ray=True. A value of -1 (or any value <= 0) uses all available CPU cores on the system. For specific resource allocation, set it to a positive integer.

For cluster environments, this represents the total cores available across all nodes.

Example
# Use all available cores
manager = TaskManager(MyTask, use_ray=True, n_cores=-1)

# Use exactly 4 cores
manager = TaskManager(MyTask, use_ray=True, n_cores=4)

n_cores_worker = n_cores_worker

The number of CPU cores allocated to each worker process.

This controls how many cores each task instance can utilize. Increase this value for compute-intensive tasks that can leverage multiple cores per task, or keep at 1 for maximum parallelism across tasks.

The effective parallelism is determined by n_cores // n_cores_worker.

Example
# Each task gets 1 core (maximum task parallelism)
manager = TaskManager(SimpleTask, use_ray=True, n_cores_worker=1)

# Each task gets 2 cores (good for moderately parallel tasks)
manager = TaskManager(ComputeTask, use_ray=True, n_cores_worker=2)

# Each task gets 4 cores (for tasks with internal parallelism)
manager = TaskManager(ParallelTask, use_ray=True, n_cores_worker=4)

result_handler = result_handler or ListResultHandler()

The handler used to collect task results. Defaults to a ListResultHandler when no result_handler is supplied at construction.

results = []

A list storing the results of all completed tasks.

This attribute accumulates the outputs from all Task.run calls. When running with Ray, completed chunks are reassembled by chunk index so that the final order matches the submission order (see _submit_ray). Results are added here after tasks complete, optionally after being processed by a saver.

Access this list using the get_results() method after task submission.

Example
manager = TaskManager(MyTask)
manager.submit_tasks(items)
results = manager.get_results()  # Access the contents of this attribute

save_interval = 1

The number of results to accumulate before invoking the saver.

This controls how frequently results are saved when a saver is provided. Lower values reduce memory usage but may increase I/O overhead, while higher values can improve I/O efficiency at the cost of increased memory usage.

This attribute is initialized to 1 and is updated each time submit_tasks() is called (where the parameter defaults to 100); it is not set through the constructor.

Example
# Save every 100 results (good balance)
manager.submit_tasks(items, saver=my_saver, save_interval=100)

# Save every 1000 results (reduce I/O, more memory usage)
manager.submit_tasks(items, saver=my_saver, save_interval=1000)

save_kwargs = dict()

Additional keyword arguments passed to the saver's save method.

This dictionary contains any extra parameters needed by the saver when saving results. It can include file paths, database connections, or other configuration options specific to the saver implementation.

Example
# Set up a manager with save options
manager = TaskManager(MyTask)
manager.save_kwargs = {
    "compression": "gzip",
    "append": True,
    "dtype": "float32"
}
manager.submit_tasks(items, saver=my_saver)

saver = None

An optional Saver instance for persisting results.

When provided, this Saver is used to save results at intervals specified by save_interval. This allows for checkpointing and persistence of intermediate results during long-running computations.

The saver must implement the Saver interface with a save(data) method. Set during submit_tasks() and not during initialization.

Example
manager = TaskManager(MyTask)
saver = HDF5Saver("results.h5")
manager.submit_tasks(items, saver=saver, save_interval=100)
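
For illustration, a minimal saver consistent with the interface described above. JSONLinesSaver is a hypothetical example, not part of the library; the extra **kwargs mirror the save_kwargs attribute documented earlier:

import json

class JSONLinesSaver:
    # Hypothetical Saver: appends each batch of results to a JSON Lines file.
    def __init__(self, path):
        self.path = path

    def save(self, data, **kwargs):
        # data is the batch of accumulated results handed over by the manager.
        with open(self.path, "a") as f:
            for item in data:
                f.write(json.dumps(item) + "\n")

manager.submit_tasks(items, saver=JSONLinesSaver("results.jsonl"), save_interval=100)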

task_class = task_class

A callable that returns a Task instance.

This callable must return an object that implements the Task interface, with run, process_item, and/or process_items methods. It is invoked to create a new Task instance for each worker when using Ray, or once for sequential processing.

Example
def create_analyzer_task():
    return TextAnalyzerTask()

manager = TaskManager(create_analyzer_task)
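
As a sketch of the interface itself, here is a hypothetical task consistent with the squares example used elsewhere on this page. The exact run signature is an assumption; per the _submit description below, run dispatches to process_items or process_item depending on at_once:

class NumberSquareTask:
    def process_item(self, item, **kwargs):
        return item ** 2

    def process_items(self, items, **kwargs):
        return [self.process_item(item, **kwargs) for item in items]

    def run(self, items, at_once=False, **kwargs):
        # Dispatch to batch or per-item processing depending on at_once.
        if at_once:
            return self.process_items(items, **kwargs)
        return [self.process_item(item, **kwargs) for item in items]

manager = TaskManager(NumberSquareTask)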

use_ray = use_ray

Boolean flag controlling whether to use Ray for parallel execution.

When True, tasks are distributed across multiple cores or machines using Ray. When False, tasks are executed sequentially in the current process.

Set this to True for computationally intensive workloads that can benefit from parallelization, and False for debugging or when the overhead of distributing tasks outweighs the benefits.

Example
# Sequential processing (for debugging)
manager = TaskManager(MyTask, use_ray=False)

# Parallel processing (for production)
manager = TaskManager(MyTask, use_ray=True, n_cores=8)

_submit(task_gen, at_once=False, kwargs_task=dict())

Handles task submission and ordered result collection in sequential mode.

This method processes task chunks one at a time in the current process (i.e., without parallelization via Ray). For each chunk produced by the task_gen generator, a new task instance is created from self.task_class and its run method is invoked. Depending on the at_once flag, the chunk is processed either as a batch (using process_items) or item by item (using process_item).

As each chunk is processed, the resulting items are appended to a temporary list. If a Saver (self.saver) is provided and the number of accumulated results meets or exceeds save_interval, the Saver's save method is called to persist a portion of the results, and the saved results are moved into the final results list (self.results).

After processing all chunks, any remaining results are saved (if applicable) and then appended to the final results list; a sketch of this flow follows the return description below.

PARAMETER DESCRIPTION
task_gen

Generator yielding chunks of items to process.

TYPE: Generator[Any, None, None]

at_once

If True, processes the entire chunk at once using the task's batch processing method; otherwise, processes each item individually.

TYPE: bool DEFAULT: False

kwargs_task

Additional keyword arguments to pass to the task's run method.

TYPE: dict[str, Any] DEFAULT: dict()

RETURNS DESCRIPTION
None

None. The processed results are stored in the instance attribute self.results.
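
The flow described above, as a minimal sketch; the buffering details and the run call signature are assumptions, not the exact implementation:

def _submit(self, task_gen, at_once=False, kwargs_task=dict()):
    buffer = []
    for chunk in task_gen:
        # One fresh task instance per chunk; run processes the whole chunk.
        task = self.task_class()
        buffer.extend(task.run(chunk, at_once=at_once, **kwargs_task))
        # Persist and flush once save_interval results have accumulated.
        while self.saver is not None and len(buffer) >= self.save_interval:
            batch, buffer = buffer[:self.save_interval], buffer[self.save_interval:]
            self.saver.save(batch, **self.save_kwargs)
            self.results.extend(batch)
    # Save and flush whatever remains after the last chunk.
    if buffer:
        if self.saver is not None:
            self.saver.save(buffer, **self.save_kwargs)
        self.results.extend(buffer)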

_submit_ray(task_gen, at_once=False, kwargs_task=dict(), kwargs_remote=dict())

Handles task submission and ordered result collection using Ray for parallel execution.

This method iterates over a generator of task chunks and submits each chunk as a separate Ray task.

To ensure that the final results are in the same order as the input items, each submitted task is tagged with a unique chunk index. A mapping is maintained between the string representation of each Ray ObjectRef and its corresponding chunk index. As tasks complete, their results (which may be lists of items) are stored in a temporary dictionary keyed by chunk index.

Once contiguous chunks are available (starting from the first chunk), they are flushed into a final results list in order. Additionally, if a Saver is provided, results are periodically saved when the number of accumulated results reaches the specified interval; a sketch of this mechanism follows the return description below.

PARAMETER DESCRIPTION
task_gen

A generator that yields chunks of items to process.

TYPE: Generator[Any, None, None]

at_once

If True, each task processes the entire chunk at once using the batch processing method; otherwise, each item in the chunk is processed individually. Defaults to False.

TYPE: bool DEFAULT: False

kwargs_task

Additional keyword arguments to pass to the task's processing function. Defaults to an empty dictionary.

TYPE: dict[str, Any] DEFAULT: dict()

kwargs_remote

Additional keyword arguments to pass to Ray's remote function options. Do not include 'num_cpus' here; set that using n_cores_worker instead. Defaults to an empty dictionary.

TYPE: dict[str, Any] DEFAULT: dict()

RETURNS DESCRIPTION
None

None. The final, ordered results are stored in the instance attribute results.
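
The ordering and throttling mechanics described above, as a standalone sketch (assumed internals; the real method also threads through kwargs_remote and periodic saving):

import ray

@ray.remote
def _run_chunk(task_class, chunk, at_once, kwargs_task):
    # Each Ray task builds a fresh Task instance and processes one chunk.
    return task_class().run(chunk, at_once=at_once, **kwargs_task)

def submit_ordered(task_class, task_gen, max_concurrent_tasks,
                   at_once=False, kwargs_task=dict()):
    futures, ref_to_index = [], {}
    completed, results, next_index = {}, [], 0

    def collect_one():
        nonlocal futures
        done, futures = ray.wait(futures, num_returns=1)
        completed[ref_to_index.pop(str(done[0]))] = ray.get(done[0])

    def flush():
        nonlocal next_index
        # Move contiguous chunks, starting from the first, into the results.
        while next_index in completed:
            results.extend(completed.pop(next_index))
            next_index += 1

    for index, chunk in enumerate(task_gen):
        if len(futures) >= max_concurrent_tasks:
            collect_one()  # throttle the worker pool before submitting more
        ref = _run_chunk.remote(task_class, chunk, at_once, kwargs_task)
        ref_to_index[str(ref)] = index
        futures.append(ref)
        flush()
    while futures:
        collect_one()
        flush()
    return results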

get_results()

Retrieves all collected results from completed tasks.

This method provides access to the accumulated results collected from all tasks submitted and completed through the TaskManager. Results are stored in submission order; even when running in parallel with Ray, completed chunks are reassembled by chunk index (see _submit_ray).

If a saver was provided during task submission, the results returned by this method will be the same as those that were passed to the saver.

RETURNS DESCRIPTION
list[Any] | dict[str, Any]

The collected results from all completed tasks; depending on the configured result handler, this is a list or a dict. The structure of individual results depends on what the Task's process_item or process_items methods returned.

Notes
  • This method simply returns the internal results list attribute and does not perform any additional processing or computation.
  • Results are available immediately after tasks complete, whether run sequentially or in parallel via Ray.
  • If no tasks have been submitted or completed, an empty list is returned.

Examples:

# Create a task manager and submit tasks
manager = TaskManager(NumberSquareTask)
manager.submit_tasks([1, 2, 3, 4, 5])

# Retrieve and use results
results = manager.get_results()
# results = [1, 4, 9, 16, 25]

# Results can be processed further
sum_of_squares = sum(results)  # 55

submit_tasks(items, chunk_size=100, saver=None, at_once=False, save_interval=100, kwargs_task=dict(), kwargs_remote=dict())

Submits and processes tasks in parallel or serial mode with optional periodic saving.

This method is the primary entry point for task execution in the TaskManager. It takes a list of items to process, chunks them into manageable batches, and either processes them sequentially or distributes them across workers using Ray, depending on the TaskManager's configuration.

The method also supports periodic saving of results through a provided Saver instance, allowing for checkpointing and persistence of intermediate results during long-running computations.

PARAMETER DESCRIPTION
items

A list of items to process. Each item will be passed to the task's process_item method individually or as part of a batch if at_once=True.

TYPE: list[Any]

chunk_size

Number of items per processing chunk. Larger values may improve performance but increase memory usage per worker.

TYPE: int DEFAULT: 100

saver

An optional Saver instance that implements the save method for persisting results. If provided, results will be saved according to save_interval.

TYPE: Saver | None DEFAULT: None

at_once

If True, each chunk is processed as a batch by the task's process_items method; otherwise, items are processed individually with process_item.

TYPE: bool DEFAULT: False

save_interval

The number of results to accumulate before invoking the saver. Has no effect if saver is None.

TYPE: int DEFAULT: 100

kwargs_task

Keyword arguments to pass to the task's run method. These can be used to customize task execution behavior.

TYPE: dict[str, Any] DEFAULT: dict()

kwargs_remote

Keyword arguments to pass to Ray's remote function options. Only used when use_ray=True. Can include options like max_retries, num_gpus, etc., but not num_cpus (use n_cores_worker and n_cores instead).

TYPE: dict[str, Any] DEFAULT: dict()

RETURNS DESCRIPTION
None

None. Results are stored internally and can be retrieved using get_results().

RAISES DESCRIPTION
ValueError

If the saver's save method raises an exception.

ImportError

If Ray is requested but not installed.

Notes
  • When use_ray=True in the TaskManager, this method leverages Ray for parallel execution across cores or machines.
  • When use_ray=False, tasks are processed sequentially in the current process.
  • When a saver is provided, results are saved periodically according to save_interval, reducing memory usage for long-running tasks.

Examples:

Sequential processing:

# Create a task manager for sequential processing
manager = TaskManager(NumberSquareTask, use_ray=False)

# Process items without saving results
manager.submit_tasks([1, 2, 3, 4, 5])
results = manager.get_results()  # [1, 4, 9, 16, 25]

Parallel processing with Ray:

# Create a task manager with Ray for parallel processing
manager = TaskManager(TextProcessorTask, use_ray=True, n_cores=8)

# Process a large list of items in chunks of 50
manager.submit_tasks(
    text_documents,
    chunk_size=50,
    kwargs_task={"min_length": 10, "language": "en"}
)
results = manager.get_results()

Processing with periodic saving:

# Create a task manager and a saver for results
manager = TaskManager(DataAnalysisTask, use_ray=True)
saver = HDF5Saver("results.h5", dataset_name="analysis_results")

# Process items with saving every 1000 results
manager.submit_tasks(
    large_dataset,
    chunk_size=200,
    saver=saver,
    save_interval=1000,
    kwargs_remote={"max_retries": 3}
)

task_generator(items, chunk_size)

Splits a list of items into smaller chunks and yields each chunk for processing.

This generator takes a list of items and partitions it into sublists (chunks) where each chunk contains up to chunk_size items. This is useful for processing large datasets in smaller, manageable batches, whether processing sequentially or in parallel using Ray.

PARAMETER DESCRIPTION
items

The complete list of items to be processed.

TYPE: list[Any]

chunk_size

The number of items to include in each chunk. If the total number of items is not evenly divisible by chunk_size, the final chunk will contain the remaining items.

TYPE: int

YIELDS DESCRIPTION
Any

Each yielded value is a sublist of items containing up to chunk_size elements.

Example
>>> manager = TaskManager(MyTask)
>>> items = list(range(10))
>>> list(manager.task_generator(items, 3))
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
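
A minimal implementation consistent with the behavior above (a sketch, not necessarily the exact method body):

def task_generator(self, items, chunk_size):
    # Yield successive slices of at most chunk_size items each.
    for start in range(0, len(items), chunk_size):
        yield items[start:start + chunk_size]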