ballerinax/kafka

Overview

This module provides an implementation to interact with Kafka Brokers via Kafka Consumer and Kafka Producer clients.

Apache Kafka is an open-source distributed event streaming platform used for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

This module supports Kafka 1.x.x and 2.0.0 versions.

Consumer and producer

Kafka producer

A Kafka producer is a Kafka client that publishes records to the Kafka cluster. The producer is thread-safe and sharing a single producer instance across threads will generally be faster than having multiple instances. When working with a Kafka producer, the first thing to do is to initialize the producer. For the producer to execute successfully, an active Kafka broker should be available.

The code snippet given below initializes a producer with the basic configuration.

import ballerinax/kafka;

kafka:ProducerConfiguration producerConfiguration = {
    clientId: "basic-producer",
    acks: "all",
    retryCount: 3
};

kafka:Producer kafkaProducer = check new (kafka:DEFAULT_URL, producerConfiguration);
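
Once initialized, the producer can publish records and should be closed when it is no longer needed. The snippet below is a minimal sketch of that lifecycle; the 'flush and close remote functions are assumed to be available on the kafka:Producer (they are not shown elsewhere in this overview).

import ballerinax/kafka;

public function main() returns error? {
    kafka:ProducerConfiguration producerConfiguration = {
        clientId: "basic-producer",
        acks: "all",
        retryCount: 3
    };
    kafka:Producer kafkaProducer = check new (kafka:DEFAULT_URL, producerConfiguration);

    // publishes a record with a byte array value to the given topic
    check kafkaProducer->send({topic: "test-kafka-topic", value: "Hello World, Ballerina".toBytes()});

    // assumed API: flushes any buffered records and closes the connections to the brokers
    check kafkaProducer->'flush();
    check kafkaProducer->close();
}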

Kafka consumer

A Kafka consumer is a subscriber responsible for reading records from one or more topics and one or more partitions of a topic. When working with a Kafka consumer, the first thing to do is initialize the consumer. For the consumer to execute successfully, an active Kafka broker should be available.

The code snippet given below initializes a consumer with the basic configuration.

import ballerinax/kafka;

kafka:ConsumerConfiguration consumerConfiguration = {
    groupId: "group-id", // Unique string that identifies the consumer
    offsetReset: "earliest", // Offset reset strategy if no initial offset
    topics: ["kafka-topic"]
};

kafka:Consumer kafkaConsumer = check new (kafka:DEFAULT_URL, consumerConfiguration);
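
After initialization, the consumer can poll the subscribed topics for records and should be closed once it is done. A minimal sketch of that lifecycle is given below; the close remote function is assumed to be available on the kafka:Consumer, and the poll timeout is taken to be in seconds.

import ballerina/io;
import ballerinax/kafka;

public function main() returns error? {
    kafka:ConsumerConfiguration consumerConfiguration = {
        groupId: "group-id",
        offsetReset: "earliest",
        topics: ["kafka-topic"]
    };
    kafka:Consumer kafkaConsumer = check new (kafka:DEFAULT_URL, consumerConfiguration);

    // polls the subscribed topics for up to 1 second and returns the records read
    kafka:ConsumerRecord[] records = check kafkaConsumer->poll(1);
    io:println("Number of records received: ", records.length());

    // assumed API: closes the connection to the brokers
    check kafkaConsumer->close();
}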

Listener

The Kafka consumer can be used as a listener to a set of topics without the need to manually poll the messages.

You can use the Caller to manually commit the offsets of the messages that are read by the service. The following code snippet shows how to initialize and define the listener and how to commit the offsets manually.

import ballerina/log;
import ballerinax/kafka;

kafka:ConsumerConfiguration consumerConfigs = {
    groupId: "group-id",
    topics: ["kafka-topic-1"],
    pollingInterval: 1,
    autoCommit: false
};

listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, consumerConfigs);

service kafka:Service on kafkaListener {
    remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] records) {
        // processes the records
        ...
        // commits the offsets manually
        kafka:Error? commitResult = caller->'commit();

        if commitResult is kafka:Error {
            log:printError("Error occurred while committing the offsets for the consumer", 'error = commitResult);
        }
    }
}
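
Instead of committing everything that has been polled, a service can also commit up to a particular record. The snippet below is a minimal sketch of this, assuming the kafka:Caller additionally exposes a commitOffset remote function that accepts an array of kafka:PartitionOffset values and that each consumer record carries its position in its offset field; these API details are assumptions and are not stated above.

import ballerina/log;
import ballerinax/kafka;

kafka:ConsumerConfiguration manualCommitConfigs = {
    groupId: "group-id",
    topics: ["kafka-topic-1"],
    autoCommit: false
};

listener kafka:Listener manualCommitListener = new (kafka:DEFAULT_URL, manualCommitConfigs);

service kafka:Service on manualCommitListener {
    remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] records) {
        foreach kafka:ConsumerRecord consumedRecord in records {
            // processes the record and then commits only up to this record;
            // the committed value is the next offset to be read, hence the `+ 1`
            kafka:PartitionOffset consumedOffset = consumedRecord.offset;
            kafka:Error? result = caller->commitOffset([{
                partition: consumedOffset.partition,
                offset: consumedOffset.offset + 1
            }]);
            if result is kafka:Error {
                log:printError("Error occurred while committing the offset", 'error = result);
            }
        }
    }
}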

Data serialization

Serialization is the process of converting data into a stream of bytes for transmission. Kafka stores and transmits these byte arrays in its queue. Deserialization does the opposite of serialization, converting the byte arrays back into the desired data type.

Currently, this module supports only the byte array data type for both keys and values. The following code snippets show how to produce a message to Kafka and how to read it back.

string message = "Hello World, Ballerina";
string key = "my-key";
// converts the message and key to a byte array
check kafkaProducer->send({ topic: "test-kafka-topic", key: key.toBytes(), value: message.toBytes() });

kafka:ConsumerRecord[] records = check kafkaConsumer->poll(1);

foreach var kafkaRecord in records {
    byte[] messageContent = kafkaRecord.value;
    // tries to generate the string value from the byte array
    string result = check string:fromBytes(messageContent);
    io:println("The result is : ", result);
}
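
Because only byte arrays are supported, structured values have to be serialized by the application itself. The sketch below uses a hypothetical Order record and order-topic topic: it serializes the value to a JSON string and then to bytes before producing, and rebuilds the value after consuming.

import ballerina/io;
import ballerinax/kafka;

type Order record {
    int id;
    string item;
};

public function main() returns error? {
    kafka:Producer kafkaProducer = check new (kafka:DEFAULT_URL, {clientId: "order-producer", acks: "all", retryCount: 3});

    // serializes the structured value to a JSON string and then to a byte array before producing it
    Order newOrder = {id: 1, item: "laptop"};
    check kafkaProducer->send({topic: "order-topic", value: newOrder.toJsonString().toBytes()});

    kafka:Consumer kafkaConsumer = check new (kafka:DEFAULT_URL, {
        groupId: "order-group",
        offsetReset: "earliest",
        topics: ["order-topic"]
    });
    kafka:ConsumerRecord[] records = check kafkaConsumer->poll(5);
    foreach var kafkaRecord in records {
        // rebuilds the original value from the consumed byte array
        string jsonString = check string:fromBytes(kafkaRecord.value);
        Order receivedOrder = check jsonString.fromJsonStringWithType();
        io:println("Received order: ", receivedOrder);
    }
}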

Concurrency

In Kafka, records are grouped into smaller units called partitions. These can be processed independently without compromising the correctness of the results, which lays the foundation for parallel processing. Such parallelism can be achieved by using multiple consumers within the same group, each reading and processing data from a subset of topic partitions and each running in a single thread.

Topic partitions are assigned to consumers automatically or you can manually assign topic partitions.

The following code snippet joins a consumer to the consumer group and manually assigns a topic partition to it.

kafka:ConsumerConfiguration consumerConfigs = {
    // `groupId` determines the consumer group
    groupId: "consumer-group",
    pollingInterval: 1,
    autoCommit: false
};

kafka:Consumer kafkaConsumer = check new (kafka:DEFAULT_URL, consumerConfigs);
// creates a topic partition
kafka:TopicPartition topicPartition = {
    topic: "kafka-topic-1",
    partition: 1
};
// passes the topic partitions to the assign function as an array
check kafkaConsumer->assign([topicPartition]);
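
As a sketch of the automatic alternative, the snippet below creates two consumers that share the same group ID and subscribe to the same topic, so the broker divides the topic's partitions between them and they can poll in parallel; the topic and group names are placeholders.

import ballerina/io;
import ballerinax/kafka;

public function main() returns error? {
    // both consumers share the same `groupId` and subscribe to the same topic,
    // so each is automatically assigned a disjoint subset of the topic's partitions
    kafka:ConsumerConfiguration sharedGroupConfigs = {
        groupId: "consumer-group",
        topics: ["kafka-topic-1"]
    };
    kafka:Consumer firstConsumer = check new (kafka:DEFAULT_URL, sharedGroupConfigs);
    kafka:Consumer secondConsumer = check new (kafka:DEFAULT_URL, sharedGroupConfigs);

    kafka:ConsumerRecord[] firstBatch = check firstConsumer->poll(1);
    kafka:ConsumerRecord[] secondBatch = check secondConsumer->poll(1);
    io:println("Records received by the first consumer: ", firstBatch.length());
    io:println("Records received by the second consumer: ", secondBatch.length());
}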

Clients

Caller

Represents a Kafka caller, which can be used to commit the offsets consumed by the service.

Consumer

Represents a Kafka consumer endpoint.

Listener

Represents a Kafka consumer endpoint.

Producer

Represents a Kafka producer endpoint.

Classes

TypeChecker

Represents the TypeChecker which is used by the runtime to check whether a type is a subtype of kafka:AnydataConsumerRecord

Object types

Service

The Kafka service type.

Records

AnydataConsumerRecord

Type related to anydata consumer record.

AnydataProducerRecord

Details related to the anydata producer record.

AuthenticationConfiguration

Configurations related to Kafka authentication mechanisms.

BytesConsumerRecord

Subtype related to kafka:AnydataConsumerRecord record.

BytesProducerRecord

Subtype related to kafka:AnydataProducerRecord record.

CertKey

Represents a combination of certificate, private key, and private key password if encrypted.

ConsumerConfiguration

Configurations related to consumer endpoint.

ConsumerRecord (Deprecated)

Type related to consumer record.

KafkaPayload

Defines the Payload remote function parameter.

PartitionOffset

Represents the topic partition position in which the consumed record is stored.

ProducerConfiguration

Represents the kafka:Producer configuration.

ProducerRecord (Deprecated)

Details related to the producer record.

SecureSocket

Configurations for secure communication with the Kafka server.

TopicPartition

Represents a topic partition.

Constants

ACKS_ALL

Producer acknowledgement type is 'all'.

ACKS_NONE

Producer acknowledgement type is '0'.

ACKS_SINGLE

Producer acknowledgement type is '1'.

AUTH_SASL_PLAIN

Kafka SASL_PLAIN authentication mechanism

COMPRESSION_GZIP

Kafka GZIP compression type.

COMPRESSION_LZ4

Kafka LZ4 compression type.

COMPRESSION_NONE

No compression.

COMPRESSION_SNAPPY

Kafka Snappy compression type.

COMPRESSION_ZSTD

Kafka ZSTD compression type.

DEFAULT_URL

The default server URL.

DES_AVRO

Apache Avro deserializer.

DES_BYTE_ARRAY

In-built Kafka byte array deserializer.

DES_CUSTOM

User-defined deserializer.

DES_FLOAT

In-built Kafka float deserializer.

DES_INT

In-built Kafka int deserializer.

DES_STRING

In-built Kafka string deserializer.

ISOLATION_COMMITTED

Configures the consumer to read the committed messages only in the transactional mode when poll() is called.

ISOLATION_UNCOMMITTED

Configures the consumer to read all the messages including the aborted ones.

OFFSET_RESET_EARLIEST

Automatically reset the consumer offset to the earliest offset

OFFSET_RESET_LATEST

Automatically reset the consumer offset to the latest offset

OFFSET_RESET_NONE

If the offsetReset is set to OFFSET_RESET_NONE, the consumer will give an error if no previous offset is found for the consumer group

PROTOCOL_PLAINTEXT

Represents Kafka un-authenticated, non-encrypted channel

PROTOCOL_SASL_PLAINTEXT

Represents Kafka authenticated, non-encrypted channel

PROTOCOL_SASL_SSL

Represents Kafka SASL authenticated, SSL channel

PROTOCOL_SSL

Represents Kafka SSL channel

SER_AVRO

Apache Avro serializer.

SER_BYTE_ARRAY

In-built Kafka Byte Array serializer.

SER_CUSTOM

User-defined serializer.

SER_FLOAT

In-built Kafka float serializer.

SER_INT

In-built Kafka int serializer.

SER_STRING

In-built Kafka string serializer.

Enums

Protocol

Represents protocol options.

Annotations

Payload

The annotation which is used to define the payload parameter in the onConsumerRecord service method.

Types

AuthenticationMechanism

Represents the supported Kafka SASL authentication mechanisms.

CompressionType

Kafka compression types to compress the messages.

DeserializerType

Kafka in-built deserializer type.

IsolationLevel

kafka:Consumer isolation level type.

OffsetResetMethod

Represents the different types of offset-reset methods of the Kafka consumer.

ProducerAcks

kafka:Producer acknowledgement types.

SecurityProtocol

Represents the supported security protocols for Kafka clients.

SerializerType

Kafka in-built serializer types.

Errors

Error