Trigger functions from Postgres¶
Trigger functions from PostgreSQL tables efficiently without the overhead of database triggers.
Note: This feature is included for OpenFaaS Standard and OpenFaaS for Enterprises customers.
From the blog: Trigger OpenFaaS functions from PostgreSQL with AWS Aurora
Installation¶
There are two options for triggering functions from Postgres:
- Using logical replication and the Write Ahead Log (WAL)
- Using LISTEN/NOTIFY and a series of table-level triggers
Configure your PostgreSQL database¶
You can configure a cloud or managed PostgreSQL database, or install Postgres locally. The following settings are required in postgresql.conf:
wal_level = logical
max_replication_slots = 5
max_wal_senders = 10
You can set up a self-hosted Postgres database to test out this feature:
docker run --rm --name pg -e POSTGRES_PASSWORD=passwd -p 5432:5432 -ti postgres:latest
docker cp pg:/var/lib/postgresql/data/postgresql.conf postgresql.conf
echo "wal_level = logical" | tee -a postgresql.conf
echo "max_wal_senders = 10" | tee -a postgresql.conf
echo "max_replication_slots = 5" | tee -a postgresql.conf
docker cp postgresql.conf pg:/var/lib/postgresql/data/postgresql.conf
docker restart pg
Connect and test it:
PGPASSWORD=passwd psql -U postgres -h 127.0.0.1
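The commands above append the three settings to the end of postgresql.conf rather than editing existing entries. That works because PostgreSQL uses the last entry when a parameter appears more than once. The following is a minimal sketch, not part of the connector, that checks a copy of the file for the required settings; the function names parse_conf and check_replication_settings are hypothetical, and the parser is deliberately simplified (it does not handle a "#" inside a quoted value).

```python
def parse_conf(text):
    """Parse postgresql.conf-style text into a dict; the last entry for a key wins."""
    settings = {}
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()  # drop comments (simplified)
        if not line:
            continue
        if "=" in line:
            key, value = line.split("=", 1)
        else:
            # The "=" between name and value is optional in postgresql.conf.
            parts = line.split(None, 1)
            if len(parts) != 2:
                continue
            key, value = parts
        settings[key.strip().lower()] = value.strip().strip("'")
    return settings


def check_replication_settings(text):
    """Return a list of problems with the settings listed in this guide."""
    settings = parse_conf(text)
    problems = []
    if settings.get("wal_level") != "logical":
        problems.append("wal_level must be 'logical'")
    for key in ("max_replication_slots", "max_wal_senders"):
        value = settings.get(key, "")
        if not value.isdigit() or int(value) < 1:
            problems.append(f"{key} must be a positive integer")
    return problems


if __name__ == "__main__":
    # Assumes postgresql.conf was copied to the current directory as shown above.
    with open("postgresql.conf") as f:
        for problem in check_replication_settings(f.read()):
            print(problem)
```

An empty list means all three settings are present. The check only inspects the file, so the database still needs a restart before the values take effect.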
On start-up, the connector will create a database schema named openfaas and a function named notify_event which emits events using pg_notify().
If you are following least-privilege principles, you'll need to define a PostgreSQL user with permission to create that schema and function.
To receive events on table changes, a trigger needs to be created for one or more specific tables. The usage section explains how to create these.
Install the connector with Helm¶
- Set up the connector
  You can install the Postgres connector using its Helm chart.
  The values.yaml file can be customised to suit your needs. Provide the connection string in a Kubernetes secret as directed.
- Tune the connector for your needs
  filters - set this to filter which tables and events to be notified about, e.g. customers:insert,customers:delete,subscriptions:update
Usage¶
You can deploy our tester function which prints out the HTTP headers and body of any invocation.
A function can subscribe to a subset of the values you set in the filters field in values.yaml when installing with Helm.
Separate multiple topics with commas, as follows, or give just one if that's all you need.
faas-cli store deploy printer \
--annotation topic=customer:insert,customer:update
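Each topic has the form table:action, which matches the X-Topic value delivered with an event. The following sketch illustrates that matching rule only; it is not the connector's implementation, and the helper names are hypothetical.

```python
def parse_topics(annotation):
    """Split a comma-separated topic annotation into a set of topics."""
    return {topic.strip() for topic in annotation.split(",") if topic.strip()}


def topic_for(table, action):
    """Build the topic for a table change, e.g. customer:insert."""
    return f"{table}:{action}"


def is_subscribed(annotation, table, action):
    """Report whether a function with this annotation would match the event."""
    return topic_for(table, action) in parse_topics(annotation)


annotation = "customer:insert,customer:update"
print(is_subscribed(annotation, "customer", "insert"))  # True
print(is_subscribed(annotation, "customer", "delete"))  # False
```

With the annotation used above, the printer function matches inserts and updates on the customer table, but not deletes.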
Create a table and insert data.¶
With logical replication (WAL):
- Create a table, e.g. customer:
  CREATE TABLE customer (id integer primary key generated always as identity, name text, created_at timestamp);
- Insert some data into the table:
  insert into customer (name, created_at) values ('Alex Ellis', now());
With LISTEN/NOTIFY:
- Create a table, e.g. customer:
  CREATE TABLE customer (id integer primary key generated always as identity, name text, created_at timestamp);
- Create a trigger for the customer table:
  CREATE TRIGGER customer_notify_event AFTER INSERT OR UPDATE OR DELETE ON customer FOR EACH ROW EXECUTE PROCEDURE openfaas.notify_event();
- Insert some data into the table:
  insert into customer (name, created_at) values ('Alex Ellis', now());
Take a look at the logs of the printer function to inspect the invocations made by the connector. You should see output similar to the following.
With logical replication (WAL):
faas-cli logs printer
2023-01-23T17:13:50Z X-Event-Id=[e6b7da06-775a-4220-becb-f483508e1eb9]
2023-01-23T17:13:50Z X-Message-Id=[0]
2023-01-23T17:13:50Z X-Connector=[connector-sdk openfaasltd/postgres-connector]
2023-01-23T17:13:50Z X-Forwarded-Uri=[/function/printer.openfaas-fn]
2023-01-23T17:13:50Z X-Start-Time=[1674494030262922184]
2023-01-23T17:13:50Z Accept-Encoding=[gzip]
2023-01-23T17:13:50Z X-Event-Action=[insert]
2023-01-23T17:13:50Z X-Event-Table=[customer]
2023-01-23T17:13:50Z Content-Type=[application/json]
2023-01-23T17:13:50Z X-Call-Id=[9a7ad051-1038-4a04-bd16-e1a9d478855f]
2023-01-23T17:13:50Z X-Topic=[customer:insert]
2023-01-23T17:13:50Z 2023/01/23 17:13:50 POST / - 202 Accepted - ContentLength: 0B (0.0007s)
2023-01-23T17:13:50Z User-Agent=[Go-http-client/1.1]
2023-01-23T17:13:50Z
2023-01-23T17:13:50Z {"id":"e6b7da06-775a-4220-becb-f483508e1eb9","schema":"public","table":"customer","action":"insert","data":{"created_at":"2023-01-23T17:13:50.24367Z","id":2,"name":"Alex Ellis"},"dataOld":{},"commitTime":"2023-01-23T17:13:50.243871Z"}
With LISTEN/NOTIFY:
faas-cli logs printer
2023-01-23T17:32:31Z Accept-Encoding=[gzip]
2023-01-23T17:32:31Z X-Event-Action=[insert]
2023-01-23T17:32:31Z X-Call-Id=[93050c37-faa6-4e69-bc00-c0e15f17bea0]
2023-01-23T17:32:31Z X-Forwarded-Uri=[/function/printer.openfaas-fn]
2023-01-23T17:32:31Z X-Start-Time=[1674495151266155144]
2023-01-23T17:32:31Z User-Agent=[Go-http-client/1.1]
2023-01-23T17:32:31Z X-Event-Table=[customer]
2023-01-23T17:32:31Z X-Topic=[customer:insert]
2023-01-23T17:32:31Z X-Message-Id=[1]
2023-01-23T17:32:31Z Content-Type=[application/json]
2023-01-23T17:32:31Z X-Connector=[connector-sdk openfaasltd/postgres-connector]
2023-01-23T17:32:31Z 2023/01/23 17:32:31 POST / - 202 Accepted - ContentLength: 0B (0.0006s)
2023-01-23T17:32:31Z
2023-01-23T17:32:31Z {"schema":"public","table":"customer","action":"insert","data":{"created_at":"2023-01-23T17:32:31.248531","id":2,"name":"Alex Ellis"}}
Reference¶
The content-type is set to application/json and the body is a JSON object.
The body will contain JSON as follows for an insert.
With logical replication (WAL):
{
  "id": "e6b7da06-775a-4220-becb-f483508e1eb9",
  "schema": "public",
  "table": "customer",
  "action": "insert",
  "data": {
    "created_at": "2023-01-23T17:13:50.24367Z",
    "id": 2,
    "name": "Alex Ellis"
  },
  "dataOld": {},
  "commitTime": "2023-01-23T17:13:50.243871Z"
}
With LISTEN/NOTIFY:
{
  "schema": "public",
  "table": "customer",
  "action": "insert",
  "data": {
    "created_at": "2023-01-23T17:32:31.248531",
    "id": 2,
    "name": "Alex Ellis"
  }
}
For a delete.
With logical replication (WAL):
{
  "id": "da83108d-9421-443e-a4ca-d8516598f82d",
  "schema": "public",
  "table": "customer",
  "action": "delete",
  "data": {},
  "dataOld": {
    "created_at": null,
    "id": 2,
    "name": null
  },
  "commitTime": "2023-01-23T17:28:20.552425Z"
}
With LISTEN/NOTIFY:
{
  "schema": "public",
  "table": "customer",
  "action": "delete",
  "data": {
    "created_at": "2023-01-23T17:32:31.248531",
    "id": 2,
    "name": "Alex Ellis"
  }
}
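The two payload shapes above differ in where a deleted row appears: the payload that carries dataOld and commitTime puts it in dataOld, while the shorter payload puts it in data. A function that needs to handle both can normalise them first. This is a minimal sketch based only on the sample payloads shown here; changed_row is a hypothetical helper name, and other fields or actions may behave differently.

```python
import json


def changed_row(body):
    """Return (table, action, row) for either payload shape shown above."""
    event = json.loads(body)
    action = event["action"]
    data = event.get("data") or {}
    old = event.get("dataOld") or {}
    # For a delete, prefer "dataOld" when it is populated; otherwise the
    # removed row is carried in "data".
    row = old if (action == "delete" and old) else data
    return event["table"], action, row


body = '{"schema": "public", "table": "customer", "action": "insert", "data": {"id": 2, "name": "Alex Ellis"}}'
table, action, row = changed_row(body)
print(table, action, row["id"])  # customer insert 2
```

Note that in the delete sample with dataOld, only the id column is populated and the other columns are null, so downstream code should not rely on the full row being present.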
Additional headers¶
Additional headers are also made available, which means you can efficiently filter out events that you do not need to process, without parsing JSON.
- X-Event-Action - the action that was performed on the table, e.g. insert, update, delete
- X-Event-Table - the table that was affected
- X-Topic - the topic that was used to subscribe to the event, e.g. customer:insert
- X-Event-Id - a UUID for the delivery of this event
- X-Message-Id - the message ID of the event, which increments from 0 to N based upon the number of events received by the connector
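As an illustration of filtering on headers before touching the body, here is a minimal handler sketch. It assumes a function built with the python3-http template, where handle receives an event with headers and body attributes and returns a dict with statusCode and body; the WANTED_TABLE and WANTED_ACTIONS values are hypothetical.

```python
import json

WANTED_TABLE = "customer"    # hypothetical: the only table this function handles
WANTED_ACTIONS = {"insert"}  # hypothetical: the only actions this function handles


def handle(event, context):
    # HTTP header names are case-insensitive, so normalise before lookup.
    headers = {key.lower(): value for key, value in event.headers.items()}
    action = headers.get("x-event-action", "")
    table = headers.get("x-event-table", "")

    # Skip unwanted events without paying the cost of parsing the JSON body.
    if table != WANTED_TABLE or action not in WANTED_ACTIONS:
        return {"statusCode": 202, "body": "ignored"}

    payload = json.loads(event.body)
    name = payload.get("data", {}).get("name")
    return {"statusCode": 200, "body": f"processed {action} for {name}"}
```

Narrowing the topic annotation is usually the first line of filtering, since it stops unwanted invocations entirely; a header check like this is useful when one function subscribes to several topics and handles them differently.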
Would you like a demo?¶
Feel free to reach out to us for a demo or to ask any questions you may have.