Announcement

Data-driven DevOps workflows with query triggers and steps

Watch a database, and run a pipeline when a query yields changed results.

Turbot Team
5 min. read - Mar 05, 2024

It's handy to run Flowpipe pipelines directly in your console, but pipelines also need to respond to events. Triggers are the way to do that. An http trigger relays an inbound webhook call so that, for example, a Slack integration can run a Flowpipe pipeline. A schedule trigger runs a pipeline periodically. And a query trigger watches a database, firing in response to data changes.

A query trigger that watches for new AWS IAM access keys

Here's a workflow that's driven by a query trigger that periodically asks AWS if any new IAM access keys have appeared. If there are any, the trigger calls a notification pipeline.

Here's the HCL definition of the trigger.

trigger "query" "new_aws_iam_access_key" {
description = "Fire when a new key appears, call a notifier"
database = "postgres://steampipe@localhost:9193/steampipe"
schedule = "* * * * *"
primary_key = "access_key_id"
sql = <<EOQ
select access_key_id, account_id, user_name, create_date
from aws_iam_access_key
where create_date > now() - interval '2 weeks'
EOQ
capture "insert" {
pipeline = pipeline.notify_new_access_key
args = {
rows = self.inserted_rows
}
}
}

In this case we're using Steampipe, along with the AWS plugin, to query the Amazon API — so the database argument is a Postgres connection string. But pipelines can watch other databases including SQLite, MySQL, and DuckDB. And you could even use another distribution of the AWS plugin: a foreign-data-wrapper extension in your own Postgres database, or a virtual-table extension in SQLite.

We're using a cron expression to run every minute, but could alternatively use a named interval like "5m".

The primary key is optional, if omitted Flowpipe will use a hash of the row to detect change. In this case we're using the access_key_id; it's also possible to create a composite key by concatenating columns.

The query finds one or more new access keys. The capture block controls what happens when the query returns new rows. In this case it relays captured rows to a pipeline that sends a notification. You can also have capture blocks that watch for updates and deletes, and you can run a different pipeline for each set of affected rows.

Notify on new access keys

Here's the pipeline called by the trigger. The notifier doesn't say anything about which communication medium to use, it could as easily be an email or Slack integration, that's purely a configuration detail.

pipeline "notify_new_access_key" {
description = "Loop over new access keys, send each to notifier."
param "rows" {
type = list
}
step "message" "notify" {
notifier = notifier["new_access_key"]
for_each = param.rows
text = <<EOQ
New access key for ${each.value.user_name}, ${each.value.account_id}:
${each.value.access_key_id}, ${each.value.create_date}
EOQ
}
}

Run the workflow

As with any scheduled Flowpipe activity, we'll start Flowpipe in server mode

flowpipe server

Within one minute we can see that Flowpipe detected new data and fired to run the notification pipeline.

Note that Flowpipe watches your code for changes. If you alter the code while the pipeline is running, your changes take effect immediately.

See it in action

Tap into the Flowpipe ecosystem

Flowpipe's query trigger enables powerful workflows that connect to databases and take action when data changes. You can use any of Steampipe's 140 plugins to query cloud platforms like AWS and Azure, SaaS tools like Jira, security scanners, network tools, and more. Or you can connect to your own data sources. Likewise you can use any of Flowpipe's library mods to take actions in Slack, Teams, Email, Jira, AWS, Azure, and elsewhere, or create your own library pipelines that compose with one another and with published mods.

To get started with Flowpipe: download the tool, follow the tutorial, peruse the library mods, and check out the samples. Then create a workflow that alerts on security events, enforces policies, gathers and publishes metrics, or does something else your DevOps team requires, and let us know how it goes!