Run tasks or subflows in parallel, create loops, and add conditional branching.
Add parallelism using Flowable tasks
One of the most common orchestration requirements is to execute independent processes in parallel. For example, you can process each data partition in parallel, which can significantly reduce the total processing time.
The flow below uses the EachParallel flowable task to execute a list of tasks in parallel.
- The `value` property defines the list of items to iterate over.
- The `tasks` property defines the list of tasks to execute for each item in the list. You can access the iteration value using the `{{ taskrun.value }}` variable.
```yaml
id: python_partitions
namespace: company.team
description: Process partitions in parallel
tasks:
  - id: getPartitions
    type: io.kestra.plugin.scripts.python.Script
    taskRunner:
      type: io.kestra.plugin.scripts.runner.docker.Docker
    containerImage: ghcr.io/kestra-io/pydata:latest
    script: |
      from kestra import Kestra
      partitions = [f"file_{nr}.parquet" for nr in range(1, 10)]
      Kestra.outputs({'partitions': partitions})
  - id: processPartitions
    type: io.kestra.plugin.core.flow.EachParallel
    value: '{{ outputs.getPartitions.vars.partitions }}'
    tasks:
      - id: partition
        type: io.kestra.plugin.scripts.python.Script
        taskRunner:
          type: io.kestra.plugin.scripts.runner.docker.Docker
        containerImage: ghcr.io/kestra-io/pydata:latest
        script: |
          import random
          import time
          from kestra import Kestra
          filename = '{{ taskrun.value }}'
          print(f"Reading and processing partition {filename}")
          nr_rows = random.randint(1, 1000)
          processing_time = random.randint(1, 20)
          time.sleep(processing_time)
          Kestra.counter('nr_rows', nr_rows, {'partition': filename})
          Kestra.timer('processing_time', processing_time, {'partition': filename})
```
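
To build intuition for what the flow above does, here is a rough plain-Python analogy. It is only a sketch and not how Kestra runs tasks internally: the list returned by `get_partitions` plays the role of `value`, the body of `process_partition` stands in for the child `tasks`, and its `filename` argument corresponds to `{{ taskrun.value }}`. The function names and the returned dictionary are hypothetical and exist only for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def get_partitions() -> list[str]:
    # Same list comprehension as the getPartitions task. range(1, 10)
    # stops before 10, so this yields nine file names (file_1 to file_9).
    return [f"file_{nr}.parquet" for nr in range(1, 10)]


def process_partition(filename: str) -> dict:
    # Hypothetical stand-in for the per-item script; `filename` plays
    # the role of {{ taskrun.value }} in the flow.
    return {"partition": filename, "status": "processed"}


def process_all(partitions: list[str]) -> list[dict]:
    # Run one call per item concurrently. map() returns results in the
    # same order as the input list, regardless of completion order.
    workers = max(1, len(partitions))  # ThreadPoolExecutor rejects 0 workers
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(process_partition, partitions))


results = process_all(get_partitions())
print(len(results))  # prints 9
```

Because each partition is independent, the total wall-clock time in this analogy is governed by the slowest item rather than the sum of all items, which is the same benefit the parallel flow aims for.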