JetStream for OpenFaaS

The OpenFaaS async system used to be powered by NATS Streaming. The new generation of the OpenFaaS async system is backed by NATS JetStream.

Note: This feature is included for OpenFaaS Pro customers.

Async use cases

Async can be used for any OpenFaaS function invocation where the response is not required immediately, but is either discarded or made available at a later time. Some use cases include:

  • Batch processing and machine learning
  • Resilient data pipelines
  • Receiving webhooks
  • Long running jobs
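
As a quick illustration, an asynchronous invocation is made by calling the gateway's /async-function path instead of /function. The gateway queues the request in NATS and returns immediately. The function name my-fn, the gateway address and the callback URL below are placeholders for your own values.

# Queue an invocation instead of calling the function synchronously.
# The gateway replies with 202 Accepted once the message has been queued.
# The optional X-Callback-Url header tells the queue-worker where to POST
# the function's response when the invocation completes.
curl -i http://127.0.0.1:8080/async-function/my-fn \
  -H "X-Callback-Url: https://example.com/hooks/result" \
  -d '{"input": "value"}'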

On our blog we demo and explore architectural patterns for these use cases.

Terminology

In JetStream ("js" for short), there are some new terms that will help when running and debugging the product.

  1. A JetStream Server is the original NATS Core project, running in "jetstream mode"
  2. A Stream is a set of messages, and works in a similar way to Kafka
  3. A Consumer is a group that various Subscribers can join to read messages and keep track of where they left off
  4. A Subscriber is what the queue-worker creates; if max_inflight is set to 25, the queue-worker will create 25 subscribers

You can learn more about JetStream here: Docs for JetStream

Installation

For staging and development environments OpenFaaS can be deployed with an embedded version of the NATS server which uses an in-memory store.

To enable JetStream for OpenFaaS, set jetstream as the queue mode in the values.yaml file of the OpenFaaS Helm chart:

queueMode: jetstream
nats:
  streamReplication: 1

If the NATS pod restarts, you will lose all messages that it contains. In your development or staging environment, this shouldn't happen very often.

For production environments you will need to install NATS separately using its Helm chart with at least 3 server replicas, so that if a pod crashes, the data can be recovered automatically.

queueMode: jetstream
nats:
  streamReplication: 3
  external:
    enabled: true
    host: "nats.nats"
    port: "4222"
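
For reference, a minimal sketch of installing NATS with its upstream Helm chart is shown below. The values keys for enabling clustering and JetStream are assumptions that differ between chart versions, so check the NATS Helm chart documentation for the version you deploy.

# Sketch only: install a 3-replica NATS cluster with JetStream enabled
# into the "nats" namespace, matching the "nats.nats" host used above.
# Verify the values keys against the chart version you are installing.
helm repo add nats https://nats-io.github.io/k8s/helm/charts/
helm repo update

helm upgrade --install nats nats/nats \
  --namespace nats \
  --create-namespace \
  --set config.cluster.enabled=true \
  --set config.cluster.replicas=3 \
  --set config.jetstream.enabled=true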

Features

Metrics and monitoring

Get insight into the behaviour of your queues with built-in metrics.

Prometheus metrics are available for monitoring things like the number of messages that have been submitted to the queue over a period of time, how many messages are waiting to be completed and the total number of messages that were processed.

An overview of all the available metrics can be found in the metrics reference.

Grafana dashboard for the queue-worker

Multiple queues

OpenFaaS ships with a “mixed queue”, where all invocations run in the same queue. If you have special requirements, you can set up your own separate queue and queue-worker using the queue-worker helm chart.

See: multiple queues
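
Functions select a dedicated queue through an annotation in their stack.yml. The sketch below assumes the com.openfaas.queue annotation covered in the multiple queues guide; the function and queue names are hypothetical.

# Sketch: route a function's async invocations to a dedicated queue
# named "slow-queue" (hypothetical names).
version: 1.0
provider:
  name: openfaas
  gateway: http://127.0.0.1:8080

functions:
  slow-fn:
    image: ghcr.io/example/slow-fn:latest
    annotations:
      com.openfaas.queue: slow-queue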

If the capacity of your queue does not fit within the default limits described here, you will need to follow these steps to create a Stream and Consumer manually for each queue.

Retries

Users can specify a list of HTTP status codes that should be retried a number of times, using an exponential back-off algorithm to mitigate the impact of retrying messages.

See: retries
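
As a hedged sketch, retry behaviour can also be overridden per function with annotations in stack.yml. The annotation names below are assumptions based on the retries guide, and the values are examples only, so confirm both against that page.

# Sketch only: per-function retry overrides via annotations.
# Annotation names and values are assumptions - see the retries guide.
functions:
  flaky-fn:
    image: ghcr.io/example/flaky-fn:latest
    annotations:
      com.openfaas.retry.attempts: "10"
      com.openfaas.retry.codes: "429,502"
      com.openfaas.retry.min_wait: "5s"
      com.openfaas.retry.max_wait: "1m"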

Structured JSON logging

Logs from the queue-worker can be formatted for readability, during development, or in JSON for a log aggregator like ELK or Grafana Loki.

You can change the logging format by editing the values.yaml file for the OpenFaaS chart:

jetstreamQueueWorker:
  logs:
    format: json

Structured logs formatted for the console

Configure JetStream

Every OpenFaaS async queue requires a Stream and Consumer to be created on the JetStream server. The queue-worker creates these on first startup if they do not exist. The default Stream and Consumer have some limits:

  • The maximum concurrency of each queue is limited to 512.
  • There is a limit on the number of messages that can be queued for retries. NATS will suspend the delivery of messages when this limit is reached. Work will resume once requests are completed or dropped.

To extend these limits, Streams and Consumers can be created manually.

Configure Streams and Consumers manually

Streams and Consumers can be defined manually, typically using the NATS CLI tool.

A Kubernetes controller is available for managing Streams and Consumers declaratively. This can be used if you are using a CD system like ArgoCD or Flux.

You can use arkade to install the NATS CLI.

arkade get nats

Port-forward the NATS server to your localhost so you can use the CLI to interact with it.

kubectl port-forward -n openfaas svc/nats 4222:4222

Create a Stream

The Stream will need to be created first. In this example we will create the Stream for the shared queue, faas-request.

nats stream create faas-request \
  --subjects=faas-request \
  --replicas=1 \
  --retention=limits \
  --discard=old

Messages intended for a queue are published to a NATS subject: faas-request by default, or the queue name for a dedicated queue. A Stream should only consume the subject for the queue it will be associated with. This can be configured with the --subjects flag.

The --replicas flag is used to configure the stream replication factor. This should be at least 3 for production environments.

The command above includes the required settings for using the Stream with a queue-worker. We want to retain messages based on limits and discard old messages if these limits are reached. You will get prompted interactively for the remaining stream information. Use the defaults or configure your own limits for the stream.

? Storage file
? Stream Messages Limit -1
? Per Subject Messages Limit -1
? Total Stream Size -1
? Message TTL -1
? Max Message Size -1
? Duplicate tracking time window 2m0s
? Allow message Roll-ups No
? Allow message deletion Yes
? Allow purging subjects or the entire stream Yes
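
For a dedicated queue, run the same command with the queue's own name and subject. Here slow-queue is a hypothetical queue name, and the replication factor is set to 3 as recommended for production:

# Stream for a hypothetical dedicated queue named "slow-queue".
# The Stream only consumes the subject belonging to that queue.
nats stream create slow-queue \
  --subjects=slow-queue \
  --replicas=3 \
  --retention=limits \
  --discard=old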

Create a Consumer

Once the Stream has been created the Consumer can be added. We will create a consumer faas-workers for the faas-request stream.

nats consumer \
  create faas-request faas-workers \
  --pull \
  --deliver=all \
  --filter="" \
  --ack=explicit \
  --replay=instant \
  --no-headers-only \
  --backoff=none \
  --max-deliver=-1 \
  --wait=3m \
  --max-waiting=900 \
  --max-pending=4000

This command creates a pull consumer that makes available all messages for every subject on the stream. We require that each message is acknowledged explicitly.

Important configuration flags:

  • The queue-worker controls how many times a message can be redelivered, so --max-deliver has to be set to -1 to allow unlimited deliveries.

  • The queue-worker automatically extends the ack window for functions that require more time to complete. To prevent having to extend the ack window too often, we recommend configuring a default acknowledgement waiting time of 3 minutes. This can be configured with the --wait flag.

  • The --max-waiting flag limits the number of subscribers a queue-worker can create. The value should be at least max_inflight * queue-worker-replicas.

    If you are running 3 replicas of the queue-worker with a max_inflight setting of 300 this value should be 900.

  • The --max-pending flag limits the number of messages that can have a pending status. Messages that are queued for retries are also considered pending. The value of this flag should be at least max_inflight * queue-worker-replicas + buffer. The size of the buffer depends on the number of retries your queue needs to be able to handle. Set this to -1 to allow any number of pending messages.

    This value can always be updated later:

    nats consumer edit --max-pending 6000
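
Once the Stream and Consumer exist, you can check their configuration and state with the NATS CLI before wiring up the queue-worker:

# Show the Stream's configuration and current message counts.
nats stream info faas-request

# Show the Consumer's configuration, including the ack wait, max waiting
# and max ack pending values, along with any unacknowledged messages.
nats consumer info faas-request faas-workers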
    

Configure the queue-worker

As a final step the queue-worker needs to be configured to use the externally created Stream and Consumer.

For the shared OpenFaaS queue, edit the values.yaml file of the OpenFaaS chart. The name of the Consumer used by the queue-worker is set with jetstreamQueueWorker.durableName. The name of the Stream needs to be set with nats.channel.

jetstreamQueueWorker:
  durableName: faas-workers

nats:
  streamReplication: 1
  channel: faas-request

A dedicated queue using the queue-worker Helm chart can be configured by setting the nats.stream.name and nats.consumer.durableName parameters.
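
For example, a values.yaml sketch for the queue-worker chart, reusing the hypothetical slow-queue Stream and a Consumer named slow-queue-workers, could look like this (check the chart's README for the full list of parameters):

# Sketch only: point a dedicated queue-worker at a manually created
# Stream and Consumer. The names are hypothetical examples.
nats:
  stream:
    name: slow-queue
  consumer:
    durableName: slow-queue-workers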

See also