Tasks

A task in raygent is a unit of computation: a user-defined class that extends Task and processes a batch of items. Each task handles a subset of the input data. Tasks run in parallel when Ray is enabled and sequentially when it is disabled.

Task execution follows a fixed workflow: the run method orchestrates the computation by applying process_item to each item in the batch, handling errors and logging along the way. Because every task goes through the same pipeline, the same code can be scaled across multiple cores or nodes, depending on the hardware available.
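The workflow above (split the input into batches, apply a per-item function to each batch, combine the results) can be sketched without Ray. This is not raygent's implementation: it substitutes Python's `concurrent.futures.ThreadPoolExecutor` for Ray workers, and the helper names `run_batch` and `run_in_parallel`, the `batch_size` and `max_workers` parameters, and the error policy (log the failure and record `None`) are all hypothetical choices made for illustration.

```python
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Optional

logger = logging.getLogger(__name__)


def run_batch(process_item: Callable[[Any], Any], batch: List[Any]) -> List[Optional[Any]]:
    """Hypothetical helper: apply process_item to each item, logging failures
    instead of aborting the whole batch (an assumed error policy)."""
    results: List[Optional[Any]] = []
    for item in batch:
        try:
            results.append(process_item(item))
        except Exception:
            logger.exception("Failed to process item %r", item)
            results.append(None)
    return results


def run_in_parallel(
    process_item: Callable[[Any], Any],
    items: List[Any],
    batch_size: int = 2,
    max_workers: int = 4,
) -> List[Optional[Any]]:
    """Hypothetical helper: split items into batches and process them concurrently.
    Threads stand in for Ray workers here."""
    batches = [items[i : i + batch_size] for i in range(0, len(items), batch_size)]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves batch order, so the flattened output lines up with the input.
        batch_results = pool.map(lambda b: run_batch(process_item, b), batches)
        return [result for batch in batch_results for result in batch]


def reciprocal(x: float) -> float:
    return 1 / x


print(run_in_parallel(reciprocal, [1, 2, 0, 4]))
```

Here the item `0` raises `ZeroDivisionError`; in this sketch the error is logged and the other items are still processed.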

run

The run method processes a batch of items. By default it calls process_item on each item individually. When at_once=True, it instead passes the whole batch to process_items in a single call. This lets users avoid repeating work that would otherwise be redone for every item.
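The dispatch described above can be illustrated with a small stand-in class. `SketchTask` and `SumOrDouble` are hypothetical names, and this is not raygent's actual `Task` source; it only mirrors the documented behavior of choosing between `process_item` and `process_items` based on `at_once`, so it runs without raygent installed.

```python
from typing import Any, List


class SketchTask:
    """Hypothetical stand-in for raygent's Task, showing only the at_once dispatch."""

    def process_item(self, item: Any) -> Any:
        raise NotImplementedError

    def process_items(self, items: Any) -> Any:
        raise NotImplementedError

    def run(self, items: Any, at_once: bool = False) -> Any:
        if at_once:
            # A single call receives the whole batch.
            return self.process_items(items)
        # Otherwise process_item is called once per element.
        return [self.process_item(item) for item in items]


class SumOrDouble(SketchTask):
    def process_item(self, item: int) -> int:
        return item * 2

    def process_items(self, items: List[int]) -> int:
        return sum(items)


task = SumOrDouble()
print(task.run([1, 2, 3]))                # [2, 4, 6]
print(task.run([1, 2, 3], at_once=True))  # 6
```

The same subclass can implement both methods; the caller decides per call which path is used.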

process_item

The process_item method defines the computation performed on a single item. Overriding it is how a task is customized, whether the work is simple arithmetic or a more complex data transformation.
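As an example of a data transformation rather than arithmetic, the sketch below parses a hypothetical `"sensor_id, fahrenheit"` record into a small dataclass. To keep it runnable without raygent, `ParseReadingTask` does not subclass `Task`; in practice you would subclass `Task` and let `run` call `process_item`. The list comprehension at the end only imitates the per-item path of `run`. The record format and the `Reading` type are invented for illustration.

```python
from dataclasses import dataclass


@dataclass
class Reading:
    sensor: str
    celsius: float


class ParseReadingTask:  # with raygent, this class would extend Task
    def process_item(self, item: str) -> Reading:
        # Split "sensor_id, fahrenheit" and convert the temperature to Celsius.
        sensor, fahrenheit = item.split(",")
        return Reading(sensor.strip(), (float(fahrenheit) - 32) * 5 / 9)


task = ParseReadingTask()
records = ["a1, 212", "b2, 32"]
print([task.process_item(line) for line in records])
```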

Example: Basic Numeric Computation

from raygent import Task


class SquareTask(Task):
    def process_item(self, item: float) -> float:
        return item ** 2

task = SquareTask()
print(task.run([1, 2, 3, 4]))  # Output: [1, 4, 9, 16]

process_items

The process_items method processes all items in a single call, avoiding redundant work that would otherwise be repeated in every process_item call. It is particularly useful when preprocessing steps can be shared across items.
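To make "shared preprocessing" concrete, consider scaling every value by the batch maximum. The two hypothetical functions below contrast the approaches: `scale_per_item` recomputes the shared statistic for every element (quadratic work overall), while `scale_batch` computes it once and applies it with a vectorized division, which is the kind of logic a process_items override could contain. The scaling task itself is an invented example, not taken from raygent.

```python
import numpy as np
import numpy.typing as npt


def scale_per_item(items: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]:
    # Naive: the batch maximum is recomputed for every item (O(n^2) overall).
    return np.array([x / np.max(items) for x in items])


def scale_batch(items: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]:
    # Batched: compute the shared statistic once, then divide element-wise.
    peak = np.max(items)
    return items / peak


data = np.array([2.0, 4.0, 8.0])
print(scale_batch(data))
```

Both functions return the same values; only the amount of repeated work differs.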

Example: Computing the Mean of an Array with NumPy

import numpy as np
import numpy.typing as npt
from raygent import Task


class MeanTask(Task):
    def process_items(self, items: npt.NDArray[np.float64]) -> np.float64:
        return np.mean(items)

task = MeanTask()
data = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]], dtype=np.float64)  # matches the type hint
print(task.run(data, at_once=True))  # Output: 5.0

Because process_items receives the whole batch, it can use vectorized NumPy operations instead of a Python-level loop over items, which typically improves performance for large numerical workloads.
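One way to check this claim on your own machine is to time both paths with `timeit`. In the sketch below, `mean_per_item` and `mean_at_once` are hypothetical functions that mimic the per-item and at-once strategies without raygent; the array shape and repeat count are arbitrary. Absolute timings depend on hardware, so none are shown, but the two functions must agree numerically because every row has the same length.

```python
import timeit

import numpy as np

rng = np.random.default_rng(0)
data = rng.random((1_000, 100))


def mean_per_item(rows: np.ndarray) -> float:
    # Per-item path: a Python-level loop computes each row mean, then combines them.
    return sum(float(np.mean(row)) for row in rows) / len(rows)


def mean_at_once(rows: np.ndarray) -> float:
    # At-once path: a single vectorized reduction over the whole array.
    return float(np.mean(rows))


loop_s = timeit.timeit(lambda: mean_per_item(data), number=10)
vec_s = timeit.timeit(lambda: mean_at_once(data), number=10)
print(f"per-item: {loop_s:.4f}s  at-once: {vec_s:.4f}s")
```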