Worker

ray_worker(task_class, chunk, at_once, *args, **kwargs)

Remote Ray worker function that processes tasks in parallel.

This function wraps a Task object: it obtains a task instance from task_class and calls that task's run method on the provided chunk of data. It is the core execution unit when raygent uses Ray for parallel processing.

While primarily used internally by TaskManager's _submit_ray method, it can be called directly for custom Ray deployments if needed.

PARAMETER DESCRIPTION
task_class

A callable that returns a Task instance with run and process_item or process_items methods.

TYPE: Callable[[], Any]

chunk

A list of items to be processed by the task.

TYPE: list[Any]

at_once

If True, processes all items at once using process_items; otherwise, processes each item individually using process_item.

TYPE: bool

*args

Additional positional arguments passed to the task's run method.

TYPE: tuple[Any, ...] DEFAULT: ()

**kwargs

Additional keyword arguments passed to the task's run method. These can include task-specific parameters that customize execution.

TYPE: dict[str, Any] DEFAULT: {}

RETURNS DESCRIPTION
Any

The results from executing the task's run method on the provided chunk. Typically this is a list of processed items or results.
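
The parameter descriptions above can be illustrated with a plain-Python sketch that needs no Ray cluster. This is not raygent's implementation: `SquareTask`, `local_worker`, and the `scale` keyword are hypothetical names introduced here, and the dispatch inside `run` is assumed from the description of `at_once`.

```python
from typing import Any, Callable


class SquareTask:
    """Hypothetical task for illustration only (not part of raygent)."""

    def process_item(self, item: int, scale: int = 1) -> int:
        # Handle a single item.
        return item * item * scale

    def process_items(self, items: list[int], scale: int = 1) -> list[int]:
        # Handle the whole chunk in one call.
        return [item * item * scale for item in items]

    def run(self, chunk: list[int], at_once: bool, *args: Any, **kwargs: Any) -> list[int]:
        # Assumed dispatch: one batch call when at_once is True,
        # otherwise one process_item call per element of the chunk.
        if at_once:
            return self.process_items(chunk, *args, **kwargs)
        return [self.process_item(item, *args, **kwargs) for item in chunk]


def local_worker(
    task_class: Callable[[], Any],
    chunk: list[Any],
    at_once: bool,
    *args: Any,
    **kwargs: Any,
) -> Any:
    """Local stand-in for ray_worker, without the @ray.remote decorator."""
    task = task_class()  # task_class is a callable that returns a task instance
    return task.run(chunk, at_once, *args, **kwargs)


per_item = local_worker(SquareTask, [1, 2, 3], False)
batched = local_worker(SquareTask, [1, 2, 3], True, scale=2)
```

In this sketch, extra keyword arguments such as `scale` travel through `run` to the processing methods, mirroring how `*args` and `**kwargs` are described as being forwarded to the task's run method.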

Examples:

Basic usage through TaskManager (recommended):

# This is handled automatically by TaskManager when use_ray=True
manager = TaskManager(MyTask, use_ray=True)
manager.submit_tasks(items)

Direct usage (advanced):

import ray

# Initialize Ray if it is not already running
if not ray.is_initialized():
    ray.init()

# Submit one chunk to a remote worker that reserves 2 CPUs
future = ray_worker.options(num_cpus=2).remote(
    MyTask,
    items_chunk,
    at_once=False,
    custom_param="value"
)

# Get results
results = ray.get(future)
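
When calling the worker directly, the caller has to split the input into chunks itself. A minimal sketch of one way to do that follows; `chunked` and `chunk_size` are hypothetical names introduced here, and this is not necessarily how TaskManager divides work internally.

```python
from typing import Any, Iterator, Sequence


def chunked(items: Sequence[Any], chunk_size: int) -> Iterator[list[Any]]:
    """Yield consecutive slices of `items`, each at most `chunk_size` long."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    for start in range(0, len(items), chunk_size):
        yield list(items[start:start + chunk_size])


# Seven items split into chunks of three; the last chunk is shorter.
chunks = list(chunked(list(range(7)), chunk_size=3))
```

Each resulting chunk could then be passed as the second argument to `ray_worker.remote(...)`, with the returned futures collected in a list and resolved together by a single `ray.get` call, which accepts a list of object references.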

Advanced configuration:

# Configure worker with custom resources and retries
future = ray_worker.options(
    num_cpus=4,
    num_gpus=1,
    max_retries=3,
    resources={"custom_resource": 1}
).remote(
    ComplexTask,
    large_chunk,
    at_once=True,
    preprocessing_steps=["normalize", "filter"],
    batch_size=64
)

# Block until the task finishes and fetch its results
results = ray.get(future)

Note

All Ray options (num_cpus, num_gpus, etc.) should be specified via the options() method on the ray_worker function, not as direct arguments.

This function is decorated with @ray.remote, making it a Ray remote function that can be executed on any worker in the Ray cluster.