Task
Bases: ABC
Abstract base class for executing computational tasks on collections of data.
The Task class provides a flexible framework for processing data items either individually or in batches. It serves as the core computational unit in the raygent framework, defining how work is performed on data items.
This class implements the Template Method pattern, where the base class (Task) defines the skeleton of an algorithm in its run method, while deferring some steps to subclasses through the abstract process_item and process_items methods.
Subclasses must implement at least one of the processing methods:

- process_item: for item-by-item processing (more flexible, better error isolation).
- process_items: for batch processing (potentially more efficient for shared resources).
The choice between these methods depends on the specific requirements of the task, such as performance needs, error handling requirements, and interdependencies between items.
Examples:
Basic implementation example:
class TextAnalyzerTask(Task):
    def __init__(self):
        self.nlp = load_nlp_model()  # Load model once per task instance

    def process_item(self, item, **kwargs):
        # Process each text individually with options from kwargs
        min_word_length = kwargs.get('min_word_length', 3)
        return {
            'text': item,
            'word_count': len([w for w in item.split() if len(w) >= min_word_length]),
            'sentiment': self.nlp.analyze_sentiment(item)
        }

# Usage
analyzer = TextAnalyzerTask()
results = analyzer.run(
    ["Hello world", "Goodbye wonderful world"], min_word_length=4
)
Implementing both processing methods:
class DataProcessorTask(Task):
    def process_item(self, item, **kwargs):
        # For processing single items with detailed error handling
        try:
            return self._transform_data(item, **kwargs)
        except ValueError as e:
            return {"item": item, "error": str(e), "status": "failed"}

    def process_items(self, items, **kwargs):
        # For batch processing when efficiency is critical
        import numpy as np

        # Convert to numpy array for vectorized operations
        data = np.array(items)
        processed = self._batch_transform(data, **kwargs)
        return processed.tolist()

    def _transform_data(self, item, **kwargs):
        # Helper method with common transformation logic
        pass

    def _batch_transform(self, data_array, **kwargs):
        # Helper method with vectorized transformation logic
        pass
Working with TaskManager:
from raygent.manager import TaskManager

# Create a custom task
class MyTask(Task):
    def process_item(self, item, **kwargs):
        return item * 2

# Process items using TaskManager (handles parallelization)
manager = TaskManager(MyTask, use_ray=True, n_cores=4)
manager.submit_tasks([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
results = manager.get_results()
Notes
- The Task class is designed to be used with the TaskManager for parallel execution across multiple cores or machines.
- When used with Ray (via TaskManager), each Task instance will be created on the worker node, so any initialization in __init__ will happen once per worker (see the sketch after this list).
- Error handling is automatically provided for individual item processing, but must be manually implemented for batch processing.
- The design allows for flexible task implementation while maintaining a consistent interface for the execution framework.
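Because __init__ runs once per worker, loading an expensive resource there amortizes its cost across every item that worker processes. A minimal sketch of this pattern, where load_model and embed are hypothetical stand-ins for whatever loader and inference API your task needs:

class EmbeddingTask(Task):
    def __init__(self):
        # With Ray (via TaskManager), __init__ runs once per worker process,
        # so the model is loaded once per worker rather than once per item
        self.model = load_model()  # hypothetical loader

    def process_item(self, item, **kwargs):
        # hypothetical embed() API on the loaded model
        return self.model.embed(item)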
process_item(item, **kwargs)
Processes a single item independently.
This method defines the computation logic for processing an individual item. It is called by the run method when at_once=False (the default), allowing items to be processed one at a time.
Subclasses must implement this method to define specific task logic, especially when items can be processed independently without shared context or resources.
| PARAMETER | DESCRIPTION |
| --- | --- |
| item | A single data item to be processed. TYPE: Any |
| **kwargs | Additional keyword arguments that customize processing behavior. These arguments are passed directly from the run method. |

| RETURNS | DESCRIPTION |
| --- | --- |
| Any | The processed result for the single item. |

| RAISES | DESCRIPTION |
| --- | --- |
| NotImplementedError | If the subclass does not implement this method. |
Notes
Choose this implementation approach when:
- Items can be processed completely independently.
- There is minimal shared setup or resource loading.
- You want fine-grained error handling per item.
- Processing can be easily parallelized.
This method is typically more memory-efficient for large datasets as it processes one item at a time, but may incur repeated setup costs if each item requires loading the same resources.
Example
class TextClassifierTask(Task):
    def __init__(self):
        # This setup happens once per task instance
        self.model = load_classification_model()

    def process_item(self, item, **kwargs):
        # Each text is processed independently
        return {
            "text": item,
            "category": self.model.predict(item),
            "confidence": self.model.confidence(item)
        }
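Usage then follows the standard pattern (load_classification_model is assumed to be defined elsewhere; the input strings are illustrative):

task = TextClassifierTask()
results = task.run(["great service", "not worth the price"])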
process_items(items, **kwargs)
Processes multiple items at once in a batch operation.
This method defines the computation logic for batch processing a collection of items together. It is called by the run method when at_once=True, allowing for optimized batch operations.
Subclasses must implement this method to define specific batch processing logic, especially when there are substantial efficiency gains from processing items together or when items share common setup or resources.
| PARAMETER | DESCRIPTION |
| --- | --- |
| items | A list of data items to be processed together. |
| **kwargs | Additional keyword arguments that customize processing behavior. These arguments are passed directly from the run method. |

| RETURNS | DESCRIPTION |
| --- | --- |
| list[Any] | The processed results for all items, typically a list matching the input length, but could be any structure depending on the implementation. |

| RAISES | DESCRIPTION |
| --- | --- |
| NotImplementedError | If the subclass does not implement this method. |
Notes
Choose this implementation approach when:
- Substantial shared setup or resource loading is required.
- Vectorized operations can improve performance.
- Items benefit from being processed in context with each other.
- Memory overhead of loading all items at once is acceptable.
This method can be significantly more efficient when:
- Loading common files, models, or resources that would otherwise be loaded repeatedly in process_item.
- Using libraries that have optimized batch operations (like NumPy, PyTorch, or TensorFlow).
- Items have interdependencies in their processing logic.
Example
class DocumentVectorizerTask(Task):
    def process_items(self, items, **kwargs):
        # Load model once for all documents
        vectorizer = load_large_language_model()

        # Process all documents in an optimized batch operation,
        # which is much faster than processing one at a time
        embeddings = vectorizer.encode_batch(items)

        # Return results paired with original items
        return [
            {"document": doc, "vector": vec}
            for doc, vec in zip(items, embeddings)
        ]
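A caller opts into the batch path explicitly with at_once=True (load_large_language_model is assumed to be defined elsewhere):

task = DocumentVectorizerTask()
results = task.run(["first document", "second document"], at_once=True)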
run(items, at_once=False, **kwargs)
Processes a list of items using either individual or batch processing.
This method serves as the primary entry point for task execution. It determines whether to process items individually or as a batch based on the at_once parameter, and then delegates to either process_item or process_items accordingly.
The method handles error management, ensuring that failures in processing individual items don't halt the entire execution. When operating in individual mode (at_once=False), errors for specific items are captured and included in the results list.
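Conceptually, the dispatch works like the sketch below. This is an illustration rather than the exact implementation, with error capture shown as ("error", message) tuples to match the error-handling example at the end of this section:

def run(self, items, at_once=False, **kwargs):
    if at_once:
        # Batch mode: hand the whole list to process_items
        return self.process_items(items, **kwargs)
    results = []
    for item in items:
        try:
            results.append(self.process_item(item, **kwargs))
        except Exception as e:
            # Capture the failure so one bad item does not halt the run
            results.append(("error", str(e)))
    return results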
| PARAMETER | DESCRIPTION |
| --- | --- |
| items | A list of input data items to be processed. |
| at_once | If True, all items are processed in a single batch via process_items; if False (the default), items are processed one at a time via process_item. TYPE: bool |
| **kwargs | Additional keyword arguments passed to either process_item or process_items. |

| RETURNS | DESCRIPTION |
| --- | --- |
| list[Any] | A list containing the results of processing each item. When at_once=False, errors for individual items are captured and included in the list rather than aborting the run. |

| RAISES | DESCRIPTION |
| --- | --- |
| TypeError | If items is not a list. |
| NotImplementedError | Indirectly, if the appropriate processing method (process_item or process_items) is not implemented by the subclass. |
Notes
Choose at_once=False (the default) when:
- Items are independent and can be processed separately
- You need granular error handling for each item
- Memory constraints require processing one item at a time
- You want to easily parallelize across items
Choose at_once=True when:
- There's significant shared setup or resource loading
- Batch operations are more efficient (e.g., with vectorized libraries)
- Items have interdependencies in their processing
- The overhead of processing each item separately is high
Examples:
Individual processing (default):
class NumberSquarerTask(Task):
    def process_item(self, item, **kwargs):
        return item ** 2

task = NumberSquarerTask()
results = task.run([1, 2, 3, 4, 5])
# results = [1, 4, 9, 16, 25]
Batch processing:
class VectorMultiplierTask(Task):
    def process_items(self, items, **kwargs):
        import numpy as np

        # Convert to numpy array for efficient vector operations
        arr = np.array(items)
        # Apply scaling factor from kwargs if provided
        scale = kwargs.get('scale', 1.0)
        return (arr * scale).tolist()

task = VectorMultiplierTask()
results = task.run([1, 2, 3, 4, 5], at_once=True, scale=2.0)
# results = [2.0, 4.0, 6.0, 8.0, 10.0]
Error handling with individual processing:
class DivisionTask(Task):
    def process_item(self, item, **kwargs):
        divisor = kwargs.get('divisor', 2)
        return item / divisor

task = DivisionTask()
results = task.run([4, 6, 0, 8], divisor=0)
# results may contain error tuples for division by zero
# e.g., [('error', 'division by zero'), ('error', 'division by zero'),
#        ('error', 'division by zero'), ('error', 'division by zero')]
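Downstream code can then separate successful results from captured errors. A minimal sketch, assuming the ("error", message) tuple convention shown above:

def is_error(result):
    # An error result is a 2-tuple of the form ("error", message)
    return isinstance(result, tuple) and len(result) == 2 and result[0] == "error"

failures = [r for r in results if is_error(r)]
successes = [r for r in results if not is_error(r)]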