Task
task.Task()
Bases: ABC, Generic[BatchType, OutputType]
Protocol for executing computational tasks on collections of data.
The `Task` class provides a flexible framework for processing data items and serves as the core computational unit in the raygent framework.
This class implements the Template Method pattern, where the base class (`Task`) defines the protocol of an algorithm in its `run_batch` method, while deferring some steps to subclasses through the `do` method.
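The Template Method shape can be illustrated with a minimal, self-contained sketch. `SketchTask`, `SketchResult`, and `DoubleTask` are hypothetical stand-ins written for this illustration, not raygent's implementation; the only details taken from this page are that `run_batch(index, batch, ...)` calls `do(batch, ...)` and that the result exposes a `value` attribute.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Generic, TypeVar

BatchType = TypeVar("BatchType")
OutputType = TypeVar("OutputType")


@dataclass
class SketchResult(Generic[OutputType]):
    """Hypothetical stand-in for raygent's Result."""

    index: int
    value: OutputType


class SketchTask(ABC, Generic[BatchType, OutputType]):
    """Hypothetical stand-in for Task showing the Template Method shape."""

    def run_batch(self, index: int, batch: BatchType) -> SketchResult[OutputType]:
        # The base class fixes the overall algorithm ...
        value = self.do(batch)
        return SketchResult(index=index, value=value)

    @abstractmethod
    def do(self, batch: BatchType) -> OutputType:
        # ... and subclasses supply only this step.
        ...


class DoubleTask(SketchTask[list[int], list[int]]):
    def do(self, batch: list[int]) -> list[int]:
        return [2 * x for x in batch]


result = DoubleTask().run_batch(0, [1, 2, 3])
print(result.value)  # [2, 4, 6]
```

Because `do` is abstract, instantiating `SketchTask` directly raises `TypeError`; only subclasses that implement `do` can be run.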
Types
To write a new `Task`, you first need to decide what your `BatchType` and `OutputType` will be. These types specify the data that `do` receives in `batch` and the data it is expected to return. Note that `batch` is assumed to hold multiple values. If your task is to square numbers, then you would provide something like:
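A minimal sketch of such a declaration, assuming the batch is a plain `list[float]`; the concrete types and the `SquareTask` name are assumptions. The small `Task` stand-in is defined only so the snippet runs without raygent installed; with raygent you would use `from raygent import Task` instead.

```python
from abc import ABC, abstractmethod
from typing import Generic, TypeVar

BatchType = TypeVar("BatchType")
OutputType = TypeVar("OutputType")


class Task(ABC, Generic[BatchType, OutputType]):
    """Stand-in for raygent.Task so this sketch is self-contained."""

    @abstractmethod
    def do(self, batch: BatchType) -> OutputType: ...


# A batch of numbers goes in, a batch of numbers comes out.
class SquareTask(Task[list[float], list[float]]):
    def do(self, batch: list[float]) -> list[float]:
        return [x**2 for x in batch]


squares = SquareTask().do([1.0, 2.0, 3.0])
print(squares)  # [1.0, 4.0, 9.0]
```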
If your task squeezes the data into a scalar (e.g., taking the sum), then you would specify the following.
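A minimal sketch of a reducing task, assuming a `list[float]` batch that collapses to a single `float`; the concrete types and the `SumTask` name are assumptions. The `Task` stand-in exists only to keep the snippet runnable on its own; with raygent you would import `Task` from the package.

```python
from abc import ABC, abstractmethod
from typing import Generic, TypeVar

BatchType = TypeVar("BatchType")
OutputType = TypeVar("OutputType")


class Task(ABC, Generic[BatchType, OutputType]):
    """Stand-in for raygent.Task so this sketch is self-contained."""

    @abstractmethod
    def do(self, batch: BatchType) -> OutputType: ...


# A batch of numbers goes in, one scalar comes out.
class SumTask(Task[list[float], float]):
    def do(self, batch: list[float]) -> float:
        return sum(batch)


total = SumTask().do([1.0, 2.0, 3.0])
print(total)  # 6.0
```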
Operations on NumPy arrays can be specified the same way, except that `BatchType` and `OutputType` are now arrays.
Implementation
The only required implementation is `do`, which specifies how to process a batch of items. For example, a `Task` that computes the mean value across rows of a NumPy array could be implemented like so.
```python
from raygent import Task
import numpy as np
import numpy.typing as npt


class MeanTask(Task[npt.NDArray[np.float64], npt.NDArray[np.float64]]):
    def do(self, batch: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]:
        # Average each row of the batch (axis=1 collapses the columns).
        mean = np.mean(batch, axis=1)
        return mean
```
We can call `do` directly if we want to use this task.
```python
arr = np.array([[1, 2, 3], [4, 5, 6]], dtype=np.float64)
task = MeanTask()
mean = task.do(arr)  # Returns: np.array([2., 5.])
```
Note
`TaskManager` calls `run_batch()` to handle any setup, teardown, and errors and to produce a `Result`. You can get the same data from our task by accessing `Result.value`.
do(batch, *args, **kwargs) abstractmethod
Processes multiple items at once in a batch operation.
This method defines the computation logic for processing a collection of items together. It is called by `run_batch`.
PARAMETER | DESCRIPTION |
---|---|
`batch` | Data to be processed together. |
`**kwargs` | Additional keyword arguments that customize processing behavior. These arguments are passed directly from the |
RETURNS | DESCRIPTION |
---|---|
`OutputType \| Exception` | The processed results for all items, typically a list matching the input length, but could be any structure depending on the implementation. |
RAISES | DESCRIPTION |
---|---|
`NotImplementedError` | If the class does not implement this method. |
Example
```python
from typing import Any

from raygent import Task


class DocumentVectorizerTask(Task[list[str], list[dict[str, Any]]]):
    def do(self, batch, **kwargs):
        # Load the model once for all documents.
        # load_large_language_model() is a placeholder for your own loader.
        vectorizer = load_large_language_model()
        # Process all documents in one optimized batch operation,
        # which is much faster than processing one at a time.
        embeddings = vectorizer.encode_batch(batch)
        # Return results paired with the original documents.
        return [
            {"document": doc, "vector": vec}
            for doc, vec in zip(batch, embeddings)
        ]
```
run_batch(index, batch, *args, **kwargs)
This method serves as the primary entry point for task execution.
PARAMETER | DESCRIPTION |
---|---|
`index` | A unique integer used to specify ordering for |
`batch` | Data to process using |
`*args` | |
`**kwargs` | |
RETURNS | DESCRIPTION |
---|---|
`Result[OutputType]` | Results of processing all data in |
setup(*args, **kwargs)
Optional setup method called once before processing begins.
teardown(*args, **kwargs)
Optional teardown method called once after all processing is complete.
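A minimal sketch of how the `setup` and `teardown` hooks might be used, for example to build and release a resource around processing. Everything here is an assumption made for illustration: the `Task` stand-in, its simplified `run_batch` body (which returns the raw output instead of a `Result`), the hypothetical `LookupTask`, and the order in which the hooks are invoked may all differ from what raygent does.

```python
from abc import ABC, abstractmethod
from typing import Generic, TypeVar

BatchType = TypeVar("BatchType")
OutputType = TypeVar("OutputType")


class Task(ABC, Generic[BatchType, OutputType]):
    """Stand-in for raygent.Task; the hook ordering below is assumed."""

    def setup(self) -> None:  # optional hook, no-op by default
        pass

    def teardown(self) -> None:  # optional hook, no-op by default
        pass

    @abstractmethod
    def do(self, batch: BatchType) -> OutputType: ...

    def run_batch(self, index: int, batch: BatchType) -> OutputType:
        self.setup()
        try:
            return self.do(batch)
        finally:
            # Runs even if do() raises, so resources are always released.
            self.teardown()


class LookupTask(Task[list[str], list[int]]):
    def setup(self) -> None:
        # Build an expensive resource once instead of once per item.
        self.table = {"a": 1, "b": 2}
        self.events = ["setup"]

    def do(self, batch: list[str]) -> list[int]:
        self.events.append("do")
        return [self.table[key] for key in batch]

    def teardown(self) -> None:
        self.events.append("teardown")
        self.table = {}


task = LookupTask()
values = task.run_batch(0, ["a", "b", "a"])
print(values)       # [1, 2, 1]
print(task.events)  # ['setup', 'do', 'teardown']
```

Placing `teardown` in a `finally` clause is one way to guarantee cleanup when `do` fails; whether raygent takes the same approach is not stated on this page.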