step

A pipeline is composed of one or more steps. Each step has a type and a name, and the arguments are dependent on the type. The step types are sometimes referred to as "primitives".

Each step type has its own distinct set of attributes, though there are some common step arguments and attributes that all steps implement.

Step Types

TypeDescription
containerRun a Docker container.
email[DEPRECATED] Send an email.
functionRun an AWS Lambda-compatible function.
httpMake an HTTP request.
inputPrompt a user for input
messageSend a message to an integration.
pipelineRun another Flowpipe pipeline.
queryRun a SQL query.
sleepWait for a defined time period.
transformUse HCL functions to transform data .

Common Step Arguments

ArgumentTypeOptional?Description
depends_onList of StepsOptionalA list of steps that this step depends on.
descriptionStringOptionalA description of the step.
errorBlockOptionalAn error block to handle errors from the step.
for_eachMap or ListOptionalA map or list used as a step iterator. A step instance will be created for each item in the map or list.
ifConditionOptionalAn if condition to evaluate to determine whether to run this step.
loopBlockOptionalA loop block to run the step in a sequential loop.
max_concurrencyNumberOptionalThe maximum number of instances of the step that can be run at a time. By default, there is no limit but note the step is also subject to the per-step-type limits (FLOWPIPE_MAX_CONCURRENCY_CONTAINER, FLOWPIPE_MAX_CONCURRENCY_FUNCTION, FLOWPIPE_MAX_CONCURRENCY_HTTP, FLOWPIPE_MAX_CONCURRENCY_QUERY, etc).
outputBlockOptionalOne or more output blocks to return custom values from the step.
retryBlockOptionalA retry block to retry the step when an error occurs.
throwBlockOptionalOne or more throw blocks to raise an error from the step.
timeoutNumber or StringOptionalAmount of time this step has to run before an error is raised.
titleStringOptionalA display title for the step.

depends_on

The depends_on argument allows you to define explicit dependencies to make steps run in a specific order. Note that Flowpipe will create implicit dependencies based on references to other steps so you only need depends_on when you don't have an implicit reference.

The depends_on argument accepts a list of steps that this step depends on:

step "sleep" "sleep_10_seconds" {
depends_on = [ step.http.http_1 ]
duration = "10s"
}

for_each

The for_each argument is used to run a step in a parallel loop. This argument accepts a map or list used as a step iterator. A step instance will be created for each item in the map or list:

step "http" "add_a_user" {
for_each = ["Jerry","Elaine", "Newman"]
url = "https://myapi.local/api/v1/user"
method = "post"
request_body = jsonencode({
user_name = "${each.value}"
})
}

if

The if argument accepts a condition to evaluate to determine whether to run this step. The step will only run when if evaluates to true:

step "email" "send_it" {
if = step.pipeline.order.output.order_count > 0
to = ["darin@kramerica.com"]
from = "elaine@jpetermancatalog.com"
host = "smtp.example.com"
subject = "Order Shipped"
body = "Your order has shipped"
}

timeout

Most steps accept a timeout argument which specifies the amount of time to wait for the step to complete before raising an error.

The timeout argument may be an integer or a Go duration string. If the duration is an integer, it will be interpreted as the number of milliseconds:

step "http" "whos_in_space" {
url = "http://api.open-notify.org/astros"
method = "get"
timeout = 5000
}

You may instead pass a string that specifies the number and type of units. Valid time units are ns, us (or µs), ms, s, m, h. Note that the granularity varies by step type, and fractional amounts will be rounded up to the appropriate granularity. For instance, The timeout for a container has a granularity of 1 second, so if you set the timeout to 500ms it will be rounded up to 1 second.

step "http" "whos_in_space" {
url = "http://api.open-notify.org/astros"
method = "get"
timeout = "5s"
}

You can even include multiple units:

step "http" "whos_in_space" {
url = "http://api.open-notify.org/astros"
method = "get"
timeout = "1m5s"
}

error

By default, all errors are fatal and are not retried - When a step encounters an error, it causes the step the fail. A failed step results in a failed pipeline - Any step instances that are already running will complete (but will not be retried) but then the pipeline will stop with a failed status.

The error block allows you to ignore the error and continue execution. You can then "handle" the error in subsequent steps.

step "http" "my_request" {
url = "https://myapi.local/subscribe"
method = "post"
body = jsonencode({
name = param.subscriber
})
error {
ignore = true
}
}
step "email" "send_it" {
to = param.subscriber
subject = "You have been subscribed"
body = step.http.my_request.response_body
if = !is_error(step.http.my_request)
}

Arguments

ArgumentTypeOptional?Description
ifBooleanOptionalA condition to evaluate to determine whether to evaluate this error block. The error block will only be evaluated when the if condition evaluates to true.
ignoreBooleanOptionalIf true, the error will be ignored.

loop

The loop block will run a step in a sequential loop, changing the arguments with each iteration. This is useful for handling HTTP pagination, for example.

step "http" "list_workspaces" {
url = "https://latestpipe.turbot.io/api/v1/org/latesttank/workspace/?limit=3"
method = "get"
request_headers = {
Content-Type = "application/json"
Authorization = "Bearer ${param.pipes_token}"
}
loop {
until = result.response_body.next_token == null
url = "https://latestpipe.turbot.io/api/v1/org/latesttank/workspace/?limit=3&next_token=${result.response_body.next_token}"
}
}

The loop is evaluated last after the step instance has finished executing and all retries have been completed. You can use the special value result to evaluate the attributes of the completed step instance. result is essentially a self-reference to "this" step after it has run (e.g. the attributes are populated).

You can also use the special attribute called loop inside the block. loop has a single attribute index which is the (zero-based) index of the loop count.

Arguments

ArgumentTypeOptional?Description
untilBooleanOptionalA condition to evaluate to determine whether to run this step again. The step will loop until this condition is true.
{any argument}anyOptionalA step argument to override for the next iteration. When the step runs again due to the loop, it inherits all of the arguments from the step, but you can override them inside the loop block if desired, allowing you to pass data from the step execution into the next step execution

retry

The retry block allows you to retry the step when an error occurs.

step "http" "my_request" {
url = "https://myapi.local/subscribe"
method = "post"
body = jsonencode({
name = param.subscriber
})
retry {
max_attempts = 5
strategy = "exponential"
min_interval = 100
max_interval = 10000
}
}

Arguments

ArgumentDefaultOptional?Description
iftrueOptionalA condition to evaluate to determine whether to retry the step. The step will be retried only if the if condition evaluates to true.
max_attempts3OptionalSpecifies the maximum number attempts to run the step.
strategyconstantOptionalThe backoff strategy. One of exponential, linear, constant.
min_interval1000OptionalThe first interval between retries, in milliseconds. If the strategy is exponential or linear, subsequent intervals will be scaled based on this value.
max_interval10000OptionalThe maximum interval between retries, in milliseconds.

throw

You can also explicitly raise an exception with a throw block to raise an error when the step would usually succeed. You can include as many throw blocks as you want. Thrown errors are not retried, though they can be ignored.

step "http" "my_request" {
url = "https://myapi.local/subscribe"
method = "post"
body = jsonencode({
name = param.subscriber
})
throw {
if = length(result.response_body.errors) > 1
message = result.response_body.errors[0]
}
}

Arguments

ArgumentTypeOptional?Description
messageStringRequiredThe error string for the thrown error.
ifBooleanOptionalA condition to evaluate to determine whether to throw an error. The step will only raise an error when the if condition evaluates to true.

output

You may include one or more output blocks to arbitrary values in the output attribute of the step.

pipeline "get_astros" {
step "http" "whos_in_space" {
url = "http://api.open-notify.org/astros"
method = "get"
}
step "transform" "parse_astros" {
value = step.http.whos_in_space.response_body.people
output "astronauts" {
value = step.http.whos_in_space.response_body.people[*].name
}
output "spacecraft" {
value = distinct(step.http.whos_in_space.response_body.people[*].craft)
}
}
output "people_in_space" {
value = step.transform.parse_astros.output.astronauts
}
output "ships_in_space" {
value = step.transform.parse_astros.output.spacecraft
}
}

Arguments

ArgumentTypeOptional?Description
valueAnyRequiredThe output value.

Common Step Attributes (Read-Only)

AttributeTypeDescription
errorsListList of errors from the step
flowpipeMapA map of Flowpipe metadata about the step instance
outputMapA map of the step outputs defined in output blocks

errors (Read-Only)

Each step has an errors attribute that contains a list of errors that occurred. Unhandled errors will cause the pipeline run to fail and will be returned in the pipeline errors list.

To simplify common error-handling cases, Flowpipe provides some helper functions:

  • is_error: Given a reference to a step, is_error returns a boolean true if there are 1 or more errors, or false if there are no errors. is_error(step.http.my_request) is equivalent to length(step.http.my_request.errors) > 0
  • error_message: Given a reference to a step, error_message will return a string containing the first error message, if any. If there are no errors, then it will return an empty string. This is useful for simple step primitives.

output (Read-Only)

You can access custom step outputs using the output attribute of a step. The output attribute contains a map of outputs for the step, with an entry for each output block.

flowpipe (Read-Only)

The flowpipe attribute includes metadata about the step instance:

AttributeDescription
finished_atTimestamp when the step instance finished executing
started_atTimestamp when the step instance started executing