The topics you need to connect to functions are set on a connector.
A single connector can have either a single topic, or a comma-separated list of topics.
It is recommended to deploy one connector per topic, so that it can be scaled to match the number of consumers set on the partition. I.e. a topic named payment.created which has a partition size of 3, should have three replicas of the configured connector deployed.
But it's also possible to use a single connector and pass in multiple topics i.e. payment.created,customer.onboarded,invoice.generate.
Your function(s) can then subscribe to one or more topics by setting the topic annotation with the name of the topic, or a comma separated list of topics.
The body of the message in binary format will be received as the body of the function or incoming HTTP request.
For headers and metadata:
X-Topic - the Kafka topic
X-Kafka-* - each header on the Kafka message is outputted with the form: X-Kafka-Key: Value
X-Kafka-Partition - the partition in Kafka that the message was received on
X-Kafka-Offset - the offset in Kafka that the message was received on
X-Kafka-Key - if set on the message, the key of the message in Kafka
If a binary message key is submitted to a Kafka topic, then the value will be base64-encoded and passed to the function as the X-Kafka-Key header, along with an additional header X-Kafka-Key-Enc with a value of b64 to indicate that the value is base64-encoded.
The connector will invoke functions using the default content type of text/plain, however this can be overridden in the Helm chart to i.e. `application/json or whatever is required.
Most templates make HTTP headers sent by the connector available through their request or context object, for example:
By default, the Kafka connector will invoke functions using the Gateway's synchronous invocation endpoint. If a response is returned by a function, then the message will be considered as processed and will be acknowledged on the topic.
Automatic retries via the queue-worker
If you would like to retry messages then you can switch the connector to use asynchronous invocations. Asynchronous invocations are executed using the queue-worker.
The queue-worker has a set of default retry values set via the Helm chart. They can also be overridden for each function using the documentation on the Retries page.