query
The query trigger will execute an SQL query on a schedule and can pass row changes as input to the defined pipeline. The query trigger supports the same database engines as the query step.
trigger "query" "expired_access_keys" { database = "postgres://steampipe@localhost:9193/steampipe" primary_key = "access_key_id" schedule = "daily"
sql = <<EOQ select access_key_id, user_name, create_date, ctx ->> 'connection_name' as connection from aws_iam_access_key where create_date < now() - interval '90 days'; EOQ
capture "insert" { pipeline = pipeline.delete_expired_access_keys
args = { access_keys = self.inserted_rows } }}
You may define a capture block for each type of change that you wish to handle. The block label must match the CRUD operation name that you wish to handle (insert
, update
, or delete
). When the query runs, if there are any new, updated, or missing rows it will trigger a pipeline run for the relevant capture
operations. The inserted_rows
, updated_rows
, and deleted_rows
attributes will contain the details about which rows were added, updated, or deleted since the last query run, and you will usually want to pass them as arguments to the pipeline. You must define at least one capture
block in a query trigger.
Flowpipe saves the data from the query trigger in a SQLite database so that it can determine which items are new, changed, or deleted.
The first time a query trigger runs, all items are considered new. The pipeline defined in the insert
capture will be run (if it is defined), and Flowpipe will store each row's primary_key
and a hash of the full row data for comparison on subsequent trigger executions.
On subsequent query trigger runs:
- Any rows with a new
primary_key
are considered inserts. The pipeline defined in theinsert
capture will be run (if it is defined). Flowpipe will store each row'sprimary_key
and a hash of the full row data for comparison on subsequent trigger executions. - Any rows with an existing
primary_key
whose hashed row data does not match what is stored are considered updates. The pipeline defined in theupdate
capture will be run (if it is defined). Flowpipe will save the new row data and hash, overwriting the previous row data and hash for that key. - If a
primary_key
that was previously stored is no longer in the result set, then the row is determined to be deleted. The pipeline defined in thedelete
capture will be run (if it is defined). Flowpipe will delete the item with thatprimary_key
; if an item with that key is returned in a future trigger run, it will be considered an insert.
Arguments
Argument | Type | Optional? | Description |
---|---|---|---|
sql | String | Required | A SQL query string. |
description | String | Optional | A description of the trigger. |
database | String | Optional | The database to query. This may be a connection reference (connection.steampipe.default ), a connection string (postgres://steampipe@127.0.0.1:9193/steampipe ), or a Pipes workspace (acme/anvils ). If not set, the default set in the mod database will be used. |
enabled | Boolean | Optional | Enable or disable the trigger. A disabled trigger will not fire, but it will retain its history and configuration. Default is true . |
param | Block | Optional | A param block that defines the parameters that can be passed into the trigger. |
primary_key | String | Optional | Primary key to use for update vs insert detection. If no primary key is defined, a hash of the row will be used as the key. |
schedule | String | Optional | Schedule to run the query. This may be a named interval (hourly , daily , weekly , 5m , 10m , 15m , 30m , 60m , 1h , 2h , 4h , 6h , 12h , 24h ) or a custom schedule in cron syntax. The default is 15m (every 15 minutes). |
title | String | Optional | Display title for the trigger. |
Attributes (Read-Only)
Attribute | Type | Description |
---|---|---|
deleted_rows | List | A list of rows that were deleted since the last time the trigger ran. deleted_rows does not return all the data for the deleted row, only its primary key. |
inserted_rows | List | A list of rows that were inserted since the last time the trigger ran. |
updated_rows | List | A list of rows that were updated since the last time the trigger ran. updated_rows contains the new row data (after it was updated). |
Parameters
One or more param
blocks may optionally be used in a trigger to define parameters that the trigger accepts.
param "url" { type = string default = "http://api.open-notify.org/astros"}
Arguments
Name | Type | Description |
---|---|---|
default | Any | A value to use if no argument is passed for this parameter when the query is run. |
description | String | A description of the parameter. |
type | String | The data type of the parameter: string , number , bool , list , map , any (default any ). |
capture
You must define at least one capture
block in a query trigger.
You may define a capture
block for each type of change that you wish to handle. The block label must match the CRUD operation name that you wish to handle (insert
, update
, or delete
). When the query runs, if there are any new, updated, or missing rows it will trigger a pipeline run for the relevant capture
operations.
Arguments
Argument | Type | Optional? | Description |
---|---|---|---|
pipeline | Pipeline Reference | Required | A reference to a pipeline resource to start when this trigger runs. |
args | Map | Optional | A map of arguments to pass to the pipeline. |
More Examples
Composite Key
The query trigger does not explicitly support composite keys, however you can concatenate multiple columns to create one.
trigger "query" "dns_changes" { database = "postgres://steampipe@localhost:9193/steampipe" primary_key = "composite_key"
sql = <<EOQ select concat(domain, '_', type, '_', ip, '_', target) as composite_key, domain, type, ip, target, value from net_dns_record where domain = 'flowpipe.io'; EOQ
capture "insert" { pipeline = pipeline.notify_new_dns_record args = { records = self.inserted_rows } }}
All capture types
You may run a different pipeline for each capture block:
trigger "query" "my_query_trigger" { database = "postgres://steampipe@localhost:9193/steampipe" primary_key = "arn"
sql = <<EOQ select arn, instance_id, instance_state, instance_type, region, account_id from aws_ec2_instance; EOQ
capture "insert" { pipeline = pipeline.instance_added
args = { rows = self.inserted_rows } }
capture "update" { pipeline = pipeline.instance_changed
args = { rows = self.updated_rows } }
capture "delete" { pipeline = pipeline.instance_terminated
args = { rows = self.deleted_rows } }}
Alternatively, you may call the same pipeline from multiple capture blocks:
trigger "query" "my_query_trigger" { database = "postgres://steampipe@localhost:9193/steampipe" primary_key = "arn"
sql = <<EOQ select arn, instance_id, instance_state, instance_type, region, account_id from aws_ec2_instance; EOQ
capture "insert" { pipeline = pipeline.instance_add_change_delete
args = { inserted_rows = self.inserted_rows } }
capture "update" { pipeline = pipeline.instance_add_change_delete
args = { updated_rows = self.updated_rows } }
capture "delete" { pipeline = pipeline.instance_add_change_delete
args = { deleted_rows = self.deleted_rows } }}