# Data-driven DevOps workflows with query triggers and steps

> Watch a database, and run a pipeline when a query yields changed results.

By Turbot Team
Published: 2024-03-05


It's handy to run Flowpipe pipelines directly in your console, but pipelines also need to respond to events. <a href="https://flowpipe.io/docs/flowpipe-hcl/trigger">Triggers</a> are the way to do that. An <a href="https://flowpipe.io/docs/flowpipe-hcl/trigger/http">http</a> trigger relays an inbound webhook call so that, for example, a Slack integration can run a Flowpipe pipeline. A <a href="https://flowpipe.io/docs/flowpipe-hcl/trigger/schedule">schedule</a> trigger runs a pipeline periodically. And a <a href="https://flowpipe.io/docs/flowpipe-hcl/trigger/query">query</a> trigger watches a database, firing in response to data changes.

## A query trigger that watches for new AWS IAM access keys

Here's a workflow that's driven by a query trigger that periodically asks AWS if any new IAM access keys have appeared. If there are any, the trigger calls a notification pipeline.

<div>
<img width="40%" alt="access key workflow graph" src="/images/blog/2024-03-query-trigger/query_trigger_iam.png"/>
</div>

Here's the HCL definition of the trigger.

```hcl
trigger "query" "new_aws_iam_access_key" {
  description       = "Fire when a new key appears, call a notifier"
  database          = "postgres://steampipe@localhost:9193/steampipe"
  schedule          = "* * * * *"
  primary_key       = "access_key_id"
  sql               = <<EOQ
    select access_key_id, account_id, user_name, create_date
    from aws_iam_access_key
    where create_date > now() - interval '2 weeks'
  EOQ

  capture "insert" {
    pipeline = pipeline.notify_new_access_key
    args = {
      rows = self.inserted_rows
    }
  }

}
```

In this case we're using Steampipe, along with the AWS plugin, to query the Amazon API, so the `database` argument is a Postgres connection string. But query triggers can watch other databases including SQLite, MySQL, and DuckDB. And you could even use another distribution of the AWS plugin: a [foreign-data-wrapper extension](https://steampipe.io/blog/2023-12-postgres-extensions) in your own Postgres database, or a [virtual-table extension](https://steampipe.io/blog/2023-12-sqlite-extensions) in SQLite.

We're using a cron expression to run every minute, but could alternatively use a <a href="https://flowpipe.io/docs/flowpipe-hcl/trigger/schedule#arguments">named interval</a> like `"5m"`.
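As a minimal sketch of that alternative, the same trigger could swap the cron expression for a named interval. The trigger name `new_aws_iam_access_key_5m` is hypothetical; everything else is copied from the definition above.

```hcl
# Hypothetical variant of the trigger above: it polls on a five-minute
# named interval instead of the every-minute cron expression.
trigger "query" "new_aws_iam_access_key_5m" {
  description = "Fire when a new key appears, call a notifier"
  database    = "postgres://steampipe@localhost:9193/steampipe"
  schedule    = "5m" # named interval in place of "* * * * *"
  primary_key = "access_key_id"
  sql         = <<EOQ
    select access_key_id, account_id, user_name, create_date
    from aws_iam_access_key
    where create_date > now() - interval '2 weeks'
  EOQ

  capture "insert" {
    pipeline = pipeline.notify_new_access_key
    args = {
      rows = self.inserted_rows
    }
  }
}
```

A longer interval means fewer calls to the AWS API, at the cost of slower detection.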

The primary key is optional; if it's omitted, Flowpipe will use a hash of the row to detect change. In this case we're using the `access_key_id`; it's also possible to create a <a href="https://flowpipe.io/docs/flowpipe-hcl/trigger/query#composite-key">composite key</a> by concatenating columns.
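One way to sketch a composite key is to build it in the query itself and point `primary_key` at the resulting column. The trigger name, the column alias `key_id`, and the choice of columns to concatenate are assumptions for illustration; the linked documentation describes the recommended form.

```hcl
# Hypothetical variant: the primary key is a concatenation of two columns,
# computed in SQL and exposed under the assumed alias "key_id".
trigger "query" "new_aws_iam_access_key_composite" {
  description = "Fire when a new account/key pair appears"
  database    = "postgres://steampipe@localhost:9193/steampipe"
  schedule    = "* * * * *"
  primary_key = "key_id"
  sql         = <<EOQ
    select
      concat(account_id, '/', access_key_id) as key_id,
      access_key_id, account_id, user_name, create_date
    from aws_iam_access_key
    where create_date > now() - interval '2 weeks'
  EOQ

  capture "insert" {
    pipeline = pipeline.notify_new_access_key
    args = {
      rows = self.inserted_rows
    }
  }
}
```

The extra `key_id` column travels with each captured row, so the notification pipeline can ignore it or include it in the message.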

On each run the query returns the access keys created in the last two weeks. The <a href="https://flowpipe.io/docs/flowpipe-hcl/trigger/query#all-capture-types">capture</a> block controls what happens when the query returns new rows. In this case it relays captured rows to a pipeline that sends a notification. You can also have capture blocks that watch for updates and deletes, and you can run a different pipeline for each set of affected rows.
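A trigger with all three capture types might look like the following sketch. It assumes that the `update` and `delete` captures expose `self.updated_rows` and `self.deleted_keys`, by analogy with `self.inserted_rows`; check the linked capture documentation before relying on those names. The pipelines `notify_changed_access_key` and `notify_deleted_access_key`, their parameter names, and the use of a `status` column are hypothetical and not defined in this post.

```hcl
# Hypothetical trigger that routes inserts, updates, and deletes
# to three different pipelines.
trigger "query" "aws_iam_access_key_changes" {
  description = "Watch access keys for additions, changes, and removals"
  database    = "postgres://steampipe@localhost:9193/steampipe"
  schedule    = "* * * * *"
  primary_key = "access_key_id"
  sql         = <<EOQ
    select access_key_id, account_id, user_name, status, create_date
    from aws_iam_access_key
  EOQ

  # Rows whose primary key was not in the previous result set.
  capture "insert" {
    pipeline = pipeline.notify_new_access_key
    args = {
      rows = self.inserted_rows
    }
  }

  # Rows whose primary key is unchanged but whose other columns differ
  # (assumed attribute name: updated_rows).
  capture "update" {
    pipeline = pipeline.notify_changed_access_key
    args = {
      rows = self.updated_rows
    }
  }

  # Primary keys that were in the previous result set but are now gone
  # (assumed attribute name: deleted_keys).
  capture "delete" {
    pipeline = pipeline.notify_deleted_access_key
    args = {
      keys = self.deleted_keys
    }
  }
}
```

This sketch queries all keys rather than a two-week window. With a time window, a key that ages out of the window would drop from the result set and look like a delete.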

## Notify on new access keys

Here's the pipeline called by the trigger. It refers to a [notifier](/docs/reference/config-files/notifier) by name and says nothing about which communication medium to use. That could as easily be an email or Slack [integration](/docs/reference/config-files/integration); it's purely a configuration detail.

```hcl
pipeline "notify_new_access_key" {
  description = "Loop over new access keys, send each to notifier."

  param "rows" {
    type = list
  }

  step "message" "notify" {
    notifier = notifier["new_access_key"]
    for_each = param.rows
    text = <<EOQ
      New access key for ${each.value.user_name}, ${each.value.account_id}: 
        ${each.value.access_key_id},  ${each.value.create_date}
      EOQ
  }  
  
}
```
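For context, a notifier with that name might be defined in a Flowpipe configuration file along these lines. This is only a sketch: it assumes a Slack integration named `my_slack_app` has already been configured, and the integration names, channel, and email address are placeholders. The linked notifier and integration references are the authority on the exact arguments.

```hcl
# Hypothetical notifier definition; lives in configuration, not in the mod.
notifier "new_access_key" {
  # Route messages to a Slack channel through an existing integration.
  notify {
    integration = integration.slack.my_slack_app # placeholder name
    channel     = "#security-alerts"             # placeholder channel
  }

  # Switching to email would mean replacing the block above with
  # something like this (assumed arguments, placeholder values):
  # notify {
  #   integration = integration.email.my_email_app
  #   to          = ["secops@example.com"]
  # }
}
```

Because the pipeline only names the notifier, changing the delivery medium requires no change to the pipeline or the trigger.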

## Run the workflow

As with any scheduled Flowpipe activity, we'll start Flowpipe in server mode:

```
flowpipe server
```

Within one minute we can see that Flowpipe detected new data and fired the trigger, which ran the notification pipeline.

<div>
<img alt="access key console" src="/images/blog/2024-03-query-trigger/access_key_console.png"/>
</div>


Note that Flowpipe watches your code for changes. If you alter the code while the server is running, your changes take effect immediately.

## See it in action

<div className="flex justify-center">
<iframe
    class="youtube-video"
    src="https://www.youtube-nocookie.com/embed/TJZ5NCMohYo"
    frameBorder="0"
    allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share"
    allowFullScreen
    title="Data-driven DevOps workflows with query triggers and steps"
>
</iframe>
</div>


## Tap into the Flowpipe ecosystem

Flowpipe's query trigger enables powerful workflows that connect to databases and take action when data changes. You can use any of Steampipe's [140 plugins](https://hub.steampipe.io) to query cloud platforms like AWS and Azure, SaaS tools like Jira, security scanners, network tools, and more. Or you can connect to your own data sources. Likewise you can use any of Flowpipe's [library mods](https://hub.flowpipe.io/?type=library) to take actions in Slack, Teams, Email, Jira, AWS, Azure, and elsewhere, or create your own library pipelines that compose with one another and with published mods.

To get started with Flowpipe: [download](/downloads) the tool, follow the [tutorial](/docs), peruse the [library mods](https://hub.flowpipe.io/?type=library), and check out the [samples](https://hub.flowpipe.io/?type=sample). Then create a workflow that alerts on security events, enforces policies, gathers and publishes metrics, or does something else your DevOps team requires, and [let us know](https://turbot.com/community/join) how it goes!


