Skip to content

Trigger functions from Kafka

Trigger function invocations from messages received on Kafka topics.

Note: This feature is included for OpenFaaS Standard & For Enterprises customers.

Feature blog posts

Walkthrough video

Installation

You can install the Kafka connector using its helm chart, or by using arkade.

Installation with Helm

See the kafka-connector helm chart

Usage

The topics you need to connect to functions are set on a connector.

A single connector can have either a single topic, or a comma-separated list of topics.

It is recommended to deploy one connector per topic, so that it can be scaled to match the number of consumers set on the partition. I.e. a topic named payment.created which has a partition size of 3, should have three replicas of the configured connector deployed.

But it's also possible to use a single connector and pass in multiple topics i.e. payment.created,customer.onboarded,invoice.generate.

Your function(s) can then subscribe to one or more topics by setting the topic annotation with the name of the topic, or a comma separated list of topics.

Create a new function:

export OPENFAAS_PREFIX=docker.io/alexellis2
faas-cli new --lang golang-middleware provision-customer

Now add an annotation for the payment.created topic, so that the provision-customer function is invoked for any message received:

version: 1.0
provider:
  name: openfaas
  gateway: http://127.0.0.1:8080

functions:
  provision-customer:
    annotations:
      topic: payment.created
    lang: golang-middleware
    handler: ./provision-customer
    image: docker.io/alexellis2/provision-customer:latest

Now deploy your function, and publish an event to your Kafka broker on the payment.created topic.

Message body, headers and metadata

The body of the message in binary format will be received as the body of the function or incoming HTTP request.

For headers and metadata:

  • X-Topic - the Kafka topic
  • X-Kafka-* - each header on the Kafka message is outputted with the form: X-Kafka-Key: Value
  • X-Kafka-Partition - the partition in Kafka that the message was received on
  • X-Kafka-Offset - the offset in Kafka that the message was received on
  • X-Kafka-Key - if set on the message, the key of the message in Kafka

If a binary message key is submitted to a Kafka topic, then the value will be base64-encoded and passed to the function as the X-Kafka-Key header, along with an additional header X-Kafka-Key-Enc with a value of b64 to indicate that the value is base64-encoded.

The connector will invoke functions using the default content type of text/plain, however this can be overridden in the Helm chart to i.e. `application/json or whatever is required.

Most templates make HTTP headers sent by the connector available through their request or context object, for example:

For detailed examples with Node.js, see: Serverless For Everyone Else

For detailed examples with Go, see: Everyday Golang (Premium Edition only)

Message lifecycle and retries

Default synchronous invocation

By default, the Kafka connector will invoke functions using the Gateway's synchronous invocation endpoint. If a response is returned by a function, then the message will be considered as processed and will be acknowledged on the topic.

Automatic retries via the queue-worker

If you would like to retry messages then you can switch the connector to use asynchronous invocations. Asynchronous invocations are executed using the queue-worker.

The queue-worker has a set of default retry values set via the Helm chart. They can also be overridden for each function using the documentation on the Retries page.

See also

Would you like a demo?

Feel free to reach out to us for a demo or to ask any questions you may have.