# Stages

Stages are the smallest building blocks in Flowdapt. They are Python functions that are chained together to form a Workflow. Stages are declared in the Workflow definition and are configured to run in a specific order.
## Defining Stages

When defining your Workflow, you declare your Stages under the `stages` key of the Workflow definition. Each stage requires a `name` and a `target` key. The `name` is a unique identifier for the stage, and the `target` is the Python function to be called. For example:
```yaml
kind: workflow
metadata:
  name: test_workflow
  annotations:
    group: test
spec:
  stages:
    - name: test_stage
      target: user_modules.testing_stuff.test_stage
```
where `user_modules.testing_stuff.test_stage` is a Python function defined as:

```python
def test_stage() -> list:
    return list(range(10))
```
You can then specify which Stages depend on each other via the `depends_on` key. For example:
```yaml
kind: workflow
metadata:
  name: test_workflow
  annotations:
    group: test
spec:
  stages:
    - name: test_stage
      target: user_modules.testing_stuff.test_stage
    - name: next_stage
      target: user_modules.testing_stuff.next_stage
      depends_on:
        - test_stage
```
where `user_modules.testing_stuff.next_stage` is a Python function defined as:

```python
import logging

logger = logging.getLogger(__name__)


def next_stage(rand_array: list) -> None:
    logger.info(f"Got list from test_stage: {rand_array}")
```
By specifying `depends_on` in the `next_stage` definition, we are telling Flowdapt to run `next_stage` after `test_stage` has completed. The `rand_array` argument in `next_stage` will be the value returned by `test_stage`.
Defining Stages this way lets users chain together Python functions in a modular way, while Flowdapt handles the orchestration of their execution. This makes it easy to re-use pre-defined or custom Stages as building blocks in your Workflows. Additionally, defining two stages with the same `depends_on` tells Flowdapt that those stages can run in parallel. If a subsequent stage depends on the two parallelized stages, it can define its `depends_on` as:
```yaml
    - name: funnel_stage
      target: user_modules.testing_stuff.funnel_stage
      depends_on:
        - parallel_stage_1
        - parallel_stage_2
```
!!! danger
    If your stages create multiple discrete branches, the best practice is to add a final stage that depends on the last stage of each branch. This ensures that the full workflow runs and that your workflow returns are concatenated into a single return value.
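As a sketch of that practice, a two-branch workflow could end with a final joining stage. The stage names and targets below are illustrative, not part of the example above:

```yaml
spec:
  stages:
    - name: start_stage
      target: user_modules.testing_stuff.start_stage
    # Branch A
    - name: branch_a_end
      target: user_modules.testing_stuff.branch_a_end
      depends_on:
        - start_stage
    # Branch B
    - name: branch_b_end
      target: user_modules.testing_stuff.branch_b_end
      depends_on:
        - start_stage
    # Final stage joins both branches so the full workflow runs
    - name: collect_results
      target: user_modules.testing_stuff.collect_results
      depends_on:
        - branch_a_end
        - branch_b_end
```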
## Stage Arguments

Under the hood, Flowdapt packs up the Stage (a Python function) and sends it to a worker in the Executor to run. This enables massive parallelization, but it also means that each Stage does not share a scope with any other functions or stages. Instead, data is passed to the Stage via function arguments.
When a Workflow is executed, you can give it an input. That input is passed as the parameters for the first stage in the Workflow. For example, if your first stage takes a parameter called `n_vals`:

```python
def test_stage(n_vals: int) -> list:
    return list(range(n_vals))
```
!!! note "Use the Object Store"
    While passing data directly between stages is easy and helpful, many other cases require more complex object handling. This is why Flowdapt exposes the Object Store, which allows you to save an object in one stage/workflow and fetch it in another by name. For more information, see the Object Store documentation.
When running the Workflow, you can pass the input if the first Stage takes any parameters, for example when running a Workflow with `flowctl`:

```bash
flowctl run test_workflow --n_vals 5
```
This passes the value `5` to the `n_vals` parameter in the `test_stage` function. The return value of `test_stage` is then passed as the parameter to the next Stage in the Workflow, and so on until the last Stage finishes and the output is returned to the Driver.
!!! note "config.input"
    The input to the workflow can also be obtained via the `get_run_context().config.input` dictionary. This works in any function that is part of a workflow, including functions called by stage functions. In the present example, it would be equivalent to `n_vals = get_run_context().config.input["n_vals"]`.
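For illustration, here is a self-contained sketch of that equivalence. The `get_run_context` below is a local stand-in for Flowdapt's real helper, pre-filled with the input that a run would carry:

```python
from types import SimpleNamespace


def get_run_context():
    # Stand-in for Flowdapt's get_run_context(); in a real run, Flowdapt
    # populates config.input with the payload passed to the workflow.
    return SimpleNamespace(config=SimpleNamespace(input={"n_vals": 5}))


def test_stage() -> list:
    # Equivalent to receiving n_vals as a function argument
    n_vals = get_run_context().config.input["n_vals"]
    return list(range(n_vals))
```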
## Parameterizing Stages

The previous section showed Stages that run once per Workflow. However, since Flowdapt is geared for massive parallelization, it includes a special stage type that is *parameterized*. The Workflow defines a Parameterized Stage by adding a `type` key to the Stage definition. For example:
```yaml
kind: workflow
metadata:
  name: build_features
  annotations:
    group: nowcast
spec:
  stages:
    - name: create_city_list
      target: flowdapt_nowcast_plugin.stages.create_city_list
    - name: fetch_city_data
      target: flowdapt_nowcast_plugin.stages.fetch_city_data
      type: parameterized
      depends_on:
        - create_city_list
    - name: process_city_data
      target: flowdapt_nowcast_plugin.stages.process_city_data
      type: parameterized
      depends_on:
        - fetch_city_data
```
where we set `type` to `parameterized`. Each parameterized stage will now run once for each item in the iterable returned by the stage it depends on. In our example, we add the stage `create_city_list`, which would be defined as:
```python
from typing import Any

from flowdapt.compute.resources.workflow.context import get_run_context


def create_city_list() -> list[dict[str, Any]]:
    """
    STAGE
    Creates a city list for the subsequent stage to use for parameterization
    """
    # Note: Stages can access the Workflow information from the Run Context
    data_config = get_run_context().config.data_config

    # get_city_grid is a helper defined in the plugin
    df_cities = get_city_grid(
        data_config["n_cities"],
        data_config["neighbors"],
        data_config["city_data_path"]
    )

    # Convert the DataFrame to a list of dictionaries
    # schema: [{"city": "city_name", "lat": latitude, "long": longitude}, ...]
    cities_list = df_cities.to_dict(orient="records")
    return cities_list
```
!!! note "Mapping on Values"
    You can pass lists in the Workflow payload with key names that can be used in the `map_on` key. For example, if you pass a list with the key `payload_cities`, you can use `map_on: payload_cities` in the Stage definition to run the Stage once for each item in the list. This payload-based `map_on` functionality is less common, since it is not as dynamic as using one stage to define the mapping of a subsequent stage (as shown in the example above).
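As an illustrative sketch (the stage name, target, and payload key here are hypothetical), a payload-mapped stage definition might look like:

```yaml
    - name: greet_city
      target: user_modules.testing_stuff.greet_city
      type: parameterized
      map_on: payload_cities
```

where the workflow is then run with a payload that contains a `payload_cities` list.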
Now, the parameterized stage `fetch_city_data` will run once for each city in `cities_list`. When we define the Parameterized Stage in Python, we assume the first argument of the function will come in as a single entry from `cities_list`:

```python
def fetch_city_data(city_dict: dict):
    city = city_dict["city"]
    print(f"Fetching data for {city}")
```

So the first argument of the `fetch_city_data` function is a single entry from `cities_list`. This means the function should operate on a single item of the iterable rather than the entire iterable, because the iterable is split up and sent to different workers to run in parallel.
!!! note "The Mapped Iterable"
    If no `map_on` key is specified, the iterable to be mapped on is the value returned by the previous stage. The previous stage may return a list or a tuple.
## Stage Resources

In Flowdapt, the resource utilization of a Stage can be dictated by adding a `resources` key to the Stage definition. This enables users to define specific resource requirements for each individual Stage. Here's an example of how to implement this:

```yaml
kind: workflow
metadata:
  name: test_workflow
  annotations:
    group: test
spec:
  stages:
    - name: test_stage
      target: user_modules.testing_stuff.test_stage
      resources:
        cpus: 2
        memory: 8GB
        gpus: 1
        custom_resource: 1
```
The fields under `resources` act as labels within the compute environment, guiding the Executor on where to run the Stages based on available resources. These labels aren't tied to any particular hardware, but rather describe the resources required for the Stage to execute.
For instance, if a Stage has a GPU requirement, the Executor will schedule the Stage to run in an environment that has access to a GPU, as long as the Executor itself supports GPU usage and is configured accordingly.
Another common use of resource management is assigning a custom named resource: you can define any custom resource name in your Executor config, at any amount, and then request that resource in your stage. For a detailed understanding of how Executors manage these resources, refer to the respective Executor documentation.
!!! note "Resource Labels"
    The Local Executor does not support resource labels. If you are using the Local Executor, you can still define resource labels in your Workflow, but they will be ignored.
## Accessing the Workflow definition inside Stages
When a Workflow is executed, Flowdapt creates a `WorkflowRunContext` object that holds information about the current execution, including the Workflow definition, and it is accessible inside individual Stages (as well as from any function called within a stage). Taking the example from earlier, the `fetch_city_data()` function could look something like this:
```python
from flowdapt.compute.resources.workflow.context import get_run_context


def fetch_city_data():
    context = get_run_context()
    config = context.config
    print(config["study_identifier"])
```
The `get_run_context()` function returns the `WorkflowRunContext` object for the current execution when called from within a Stage. For more information, see the Workflow Run Context documentation.