Skip to content

Weather Nowcasting with Flowdapt

Introduction

Emergent Methods performed a large-scale nowcasting experiment in real-time to study the relationship between forecast accuracy and computational efficiency. The full report with all results are detailed in the companion paper. The experimental code is simplified in the Nowcast repository as an example for how flowdapt can be used to build a real-time adaptive modeling system. Here we provide an overview of how the components are constructed.

Nowcast

Train workflow

The train workflow gets the city iterable to parameterize the train_pipeline across the desired number of cities. The train workflow does not worry about getting data, it pulls the latest data available, which is already prepped by the data update workflow.

kind: workflow
metadata:
  name: train
  annotations:
    group: nowcast
spec:
  stages:
    - name: get_city_model_iterable
      target: flowdapt_nowcast_plugin.stages.get_city_model_iterable
      resources:
        cpus: 0.5

    - name: train_pipeline
      target: flowdapt_nowcast_plugin.stages.train_pipeline
      type: parameterized
      depends_on:
        - get_city_model_iterable
      resources:
        gpus: 1

Inference workflow

The inference workflow gets the city iterable to parameterize the predict_pipeline across the desired number of cities. The inference workflow does not worry about getting data, it pulls the latest data available, which is already prepped by the data update workflow.

kind: workflow
metadata:
  name: predict
  annotations:
    group: nowcast
spec:
  stages:
    - name: get_city_model_iterable
      target: flowdapt_nowcast_plugin.stages.get_city_model_iterable
      resources:
        cpus: 0.5

    - name: predict_pipeline
      target: flowdapt_nowcast_plugin.stages.predict_pipeline
      type: parameterized
      depends_on:
        - get_city_model_iterable
      resources:
        cpus: 4

Data update workflow

The data update workflow fetches data for the target cities, as well as auxiliary cities that may be used to help improve accuracy on the target cities. It stores the data and prepares the features so that the train and inference workflows are focused entirely on training or inferencing at low-latency.

kind: workflow
metadata:
  name: create_features
  annotations:
    group: nowcast
spec:
  stages:
    - name: create_parameterization_lists
      target: flowdapt_nowcast_plugin.stages.create_parameterization_lists
      resources:
        cpus: 0.5

    - name: get_unique_city_iterable
      target: flowdapt_nowcast_plugin.stages.get_unique_city_iterable
      depends_on:
        - create_parameterization_lists
      resources:
        cpus: 0.5

    - name: get_and_publish_data
      target: flowdapt_nowcast_plugin.stages.get_and_publish_data
      type: parameterized
      depends_on:
        - get_unique_city_iterable
      resources:
        cpus: 1

    - name: get_target_city_iterable
      target: flowdapt_nowcast_plugin.stages.get_target_city_iterable
      depends_on:
        - get_and_publish_data
      resources:
        cpus: 0.5

    - name: construct_dataframes
      target: flowdapt_nowcast_plugin.stages.construct_dataframes
      type: parameterized
      depends_on:
        - get_target_city_iterable
      resources:
        cpus: 1

Configuration

kind: config
metadata:
  name: main
  annotations:
    group: nowcast
spec:
  selector:
    type: annotation
    value:
      group: nowcast
  data:
    study_identifier: "openmeteo_test"
    # Study control parameters
    n_days: 50
    neighbors: 2
    radius: 150
    prediction_points: 1
    target_horizon: 6
    city_data_path: "flowdapt_nowcast_plugin/data/uscities.csv"
    model: "XGBoostRegressor"
    cities: ['Los Angeles', 'Miami']
    targets: ["temperature_2m", "windspeed_10m", "cloudcover"]
    # training parameters
    model_train_parameters:
      # xgboost parameters
      n_jobs: 4
      n_estimators: 200
      # device: "cuda"
      # tree_method: "hist"
      alpha: 0.5
      min_child_weight: 5
      learning_rate: 0.1
      eval_metric: "rmse"
      max_depth: 6
      verbosity: 1
      lookback: 6
      # # neural net parameters for PyTorch variants
      # epochs: 10  # 35
      # batch_size: 64
      # lookback: 6
      # hidden_dim: 2048
      # shuffle: True
      # num_threads: 4
    extras:
      weight_factor: 0.9
      di_threshold: 5.0
      num_points: 1200  # 3500
      artifact_only: true
      data_split_parameters:
        test_size: 256
        shuffle: false

Running Nowcast

After you installed flowdapt with pip install flowdapt and flowctl with pip install flowctl, you can clone the Nowcast Plugin repository and install:

git clone https://gitlab.com/emergentmethods/flowdapt-nowcast-plugin.git
cd flowdapt-nowcast-plugin
poetry install

Now you can apply your resources:

flowctl apply -p flowdapt_nowcast_plugin/workflows
flowctl apply -p flowdapt_nowcast_plugin/configs

You can run the workflows manually if you wish with flowctl run create_features and flowctl run train_pipeline and flowctl run predict_pipeline. However, these are typically scheduled with the included triggers:

flowctl apply -p flowdapt_nowcast_plugin/triggers

Further, you can use the flowdapt_sdk to automate these processes, or to ask for the latest predictions. This functionality is exemplified in the python_driver.py file.

Details about the train_pipeline, predict_pipeline, and create_features stages can be found in the Nowcast Plugin repository.