Ray Executor

flowdapt.compute.executor.ray.RayExecutor

Bases: Executor

An Executor built on top of Ray. This is the recommended Executor because it is the most robust and scalable option. It supports both local and distributed execution and can be configured to use GPUs.

To use this Executor, set the target key under services > compute > executor in the configuration, for example:

services:
  compute:
    executor:
      target: flowdapt.compute.executor.ray.RayExecutor
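The target value is a dotted import path. As a rough illustration of how such a path is commonly turned into a class, the sketch below splits it at the last dot and imports the module. The helper name resolve_target is hypothetical, and this is not flowdapt's actual loader, which this page does not describe. A standard-library class stands in for RayExecutor so the snippet runs without flowdapt installed.

```python
import importlib


def resolve_target(path: str):
    """Hypothetical helper: import the object named by a dotted path."""
    # Split "package.module.ClassName" into module path and attribute name.
    module_name, _, attr_name = path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected a dotted path, got {path!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


# Stand-in target; in a flowdapt config this would be
# "flowdapt.compute.executor.ray.RayExecutor".
cls = resolve_target("collections.OrderedDict")
print(cls.__name__)
```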

Parameters:

cluster_address (str | None, default: None)
    The address of the Ray cluster to connect to. If not specified, a local cluster is started.

cpus (int | str, default: 'auto')
    The number of CPUs to use. If set to "auto", uses the number of CPUs on the machine. Ignored if connecting to an external cluster.

gpus (int | str | None, default: None)
    The number of GPUs to use. If set to "auto", uses the number of GPUs on the machine. Ignored if connecting to an external cluster.

resources (dict[str, float] | None, default: None)
    A dictionary of custom resource labels to use for the cluster. These labels are logical, not physical, and are used to determine whether a stage can run on a worker. Base resources such as CPUs and memory are added automatically. Defaults to an empty dictionary. Ignored if connecting to an external cluster.

object_store_memory (int | None, default: None)
    The amount of memory to use for the Ray object store, in bytes. Defaults to 1/3 of available memory. Ignored if connecting to an external cluster.

dashboard_host (str, default: '127.0.0.1')
    The host to bind the Ray dashboard to. Ignored if connecting to an external cluster.

dashboard_port (int, default: 9969)
    The port to bind the Ray dashboard to. Ignored if connecting to an external cluster.

log_to_driver (bool, default: True)
    Whether to log to the driver.

logging_level (str | None, default: None)
    The logging level to use.

storage_dir (str | None, default: None)
    The directory to use for storing Ray data. Defaults to the app directory. Ignored if connecting to an external cluster.

working_dir (str | None, default: None)
    The working directory to use in the runtime environment.

py_modules (list[str] | None, default: None)
    A list of Python modules to pass to the worker.

pip (list[str] | None, default: None)
    A list of pip packages to install on the worker.

conda (dict[str, str] | str | None, default: None)
    A dictionary of conda packages to install on the worker.

env_vars (dict[str, str] | None, default: None)
    A dictionary of environment variables to pass to the worker.

container (dict[str, str] | None, default: None)
    A container image to use for the worker.

cluster_memory_actor (dict[str, Any] | None, default: None)
    A dictionary of options to use for the cluster memory actor. Requires a "name" key to be set. The rest of the options are passed to Ray. Defaults to a RayClusterMemoryActor with 1 CPU and a max concurrency of 1000.

upload_plugins (bool, default: True)
    Whether to upload plugins to the worker.

runtime_env_config (dict[str, Any] | None, default: None)
    A dictionary of options to use for the runtime environment config. For available options, see https://docs.ray.io/en/latest/ray-core/api/doc/ray.runtime_env.RuntimeEnvConfig.html.

connection_monitor_interval (int, default: 15)
    The interval, in seconds, at which to check the connection to the Ray cluster. Only applies if connecting to an external cluster.

max_reconnect_retries (int, default: 2048)
    The maximum number of times to retry reconnecting to the Ray cluster if the connection is lost.

base_reconnect_delay (float, default: 5.0)
    The base delay, in seconds, for the exponential backoff when reconnecting to the Ray cluster.

max_reconnect_delay (float, default: 60.0)
    The maximum delay, in seconds, for the exponential backoff when reconnecting to the Ray cluster.

ping_timeout (float, default: 10.0)
    The timeout, in seconds, to use when pinging the Ray cluster to check whether it is still alive.

kwargs (default: {})
    Additional keyword arguments to pass to ray.init().
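
The reconnect parameters describe an exponential backoff. One common reading, assuming the delay doubles on each attempt and is capped at max_reconnect_delay, is sketched below. The doubling factor and the helper name reconnect_delays are assumptions, since this page gives only the base delay, the cap, and the retry limit.

```python
from itertools import islice
from typing import Iterator


def reconnect_delays(
    base_reconnect_delay: float = 5.0,
    max_reconnect_delay: float = 60.0,
    max_reconnect_retries: int = 2048,
) -> Iterator[float]:
    """Yield one delay (in seconds) per reconnect attempt.

    Assumption: the delay doubles each attempt; the multiplier actually
    used by RayExecutor is not stated in the documentation.
    """
    delay = base_reconnect_delay
    for _ in range(max_reconnect_retries):
        yield min(delay, max_reconnect_delay)
        # Cap the running value as well so it never grows without bound.
        delay = min(delay * 2, max_reconnect_delay)


# First six delays with the documented defaults.
print(list(islice(reconnect_delays(), 6)))
# -> [5.0, 10.0, 20.0, 40.0, 60.0, 60.0]
```

Under these assumptions the delay reaches the 60-second cap on the fifth attempt and stays there for every later retry.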