ballerinax/kafka Ballerina library
Package Overview
This package is used to interact with Kafka brokers via the Kafka Consumer and Kafka Producer clients. It supports Kafka versions 1.x.x and 2.0.0.
For information on the operations you can perform with this package, see the Functions below. For examples of the usage of the operations, see the following.
- Producer Example
- Consumer Service Example
- Consumer Client Example
- Transactional Producer Example
- Consumer with SASL Authentication Example
- Producer with SASL Authentication Example
Basic Usages
Publishing Messages
- Initialize the Kafka message producer.
kafka:ProducerConfiguration producerConfiguration = {
    clientId: "basic-producer",
    acks: "all",
    retryCount: 3
};
kafka:Producer kafkaProducer = check new (kafka:DEFAULT_URL, producerConfiguration);
- Use the
kafka:Producer
to publish messages.
string message = "Hello World, Ballerina";
check kafkaProducer->send({ topic: "test-kafka-topic", value: message.toBytes() });
Consuming Messages
- Initialize the Kafka message consumer.
kafka:ConsumerConfiguration consumerConfiguration = {
    groupId: "group-id",
    offsetReset: "earliest",
    topics: ["kafka-topic"]
};
kafka:Consumer consumer = check new (kafka:DEFAULT_URL, consumerConfiguration);
- Use the
kafka:Consumer
as a simple record consumer.
kafka:ConsumerRecord[]|kafka:Error result = consumer->poll(1);
- Use the
kafka:Listener
as a listener.
listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, consumerConfiguration);

service kafka:Service on kafkaListener {
    remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] records) {
    }
}
Clients
kafka: Caller
Represents a Kafka caller, which can be used to commit the offsets consumed by the service.
'commit
function 'commit() returns Error?
Commits the current consumed offsets for the service.
kafka:Error? result = caller->commit();
Return Type
- Error? - A kafka:Error if an error is encountered or else ()
commitOffset
function commitOffset(PartitionOffset[] offsets, decimal duration) returns Error?
Commits the given offsets of specific topic partitions for the service.
Parameters
- offsets PartitionOffset[] - Offsets to be committed
- duration decimal (default -1) - Timeout duration (in seconds) for the commit operation execution
Return Type
- Error? - A kafka:Error if an error is encountered or else ()
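For instance, committing an explicit offset from within a service (the topic name, partition index, and offset value here are illustrative):

```ballerina
kafka:PartitionOffset[] offsets = [{
    partition: {topic: "test-kafka-topic", partition: 0},
    offset: 10
}];
kafka:Error? result = caller->commitOffset(offsets);
```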
kafka: Consumer
Represents a Kafka consumer endpoint.
Constructor
Creates a new Kafka Consumer.
init (string|string[] bootstrapServers, *ConsumerConfiguration config)
- bootstrapServers string|string[] - List of remote server endpoints of the Kafka brokers
- config *ConsumerConfiguration - Configurations related to the consumer endpoint
assign
function assign(TopicPartition[] partitions) returns Error?
Assigns consumer to a set of topic partitions.
Parameters
- partitions TopicPartition[] - Topic partitions to be assigned
Return Type
- Error? - A kafka:Error if an error is encountered or else ()
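As a sketch, manually assigning the consumer to partition 0 of a topic (the topic name is illustrative):

```ballerina
kafka:TopicPartition topicPartition = {
    topic: "kafka-topic",
    partition: 0
};
kafka:Error? result = consumer->assign([topicPartition]);
```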
close
function close(decimal duration) returns Error?
Closes the consumer connection to the external Kafka broker.
kafka:Error? result = consumer->close();
Parameters
- duration decimal (default -1) - Timeout duration (in seconds) for the close operation execution
Return Type
- Error? - A kafka:Error if an error is encountered or else ()
'commit
function 'commit() returns Error?
Commits the current consumed offsets for the consumer.
kafka:Error? result = consumer->commit();
Return Type
- Error? - A kafka:Error if an error is encountered or else ()
commitOffset
function commitOffset(PartitionOffset[] offsets, decimal duration) returns Error?
Commits the given offsets of specific topic partitions for the consumer.
Parameters
- offsets PartitionOffset[] - Offsets to be committed
- duration decimal (default -1) - Timeout duration (in seconds) for the commit operation execution
Return Type
- Error? - A kafka:Error if an error is encountered or else ()
getAssignment
function getAssignment() returns TopicPartition[]|Error
Retrieves the currently-assigned partitions for the consumer.
kafka:TopicPartition[]|kafka:Error result = consumer->getAssignment();
Return Type
- TopicPartition[]|Error - Array of assigned partitions for the consumer if the operation executes successfully or else a kafka:Error
getAvailableTopics
function getAvailableTopics(decimal duration) returns string[]|Error
Retrieves the available list of topics for a particular consumer.
string[]|kafka:Error result = consumer->getAvailableTopics();
Parameters
- duration decimal (default -1) - Timeout duration (in seconds) for the get available topics operation to execute
Return Type
- string[]|Error - Array of topics currently available to the consumer if the operation executes successfully or else a kafka:Error
getBeginningOffsets
function getBeginningOffsets(TopicPartition[] partitions, decimal duration) returns PartitionOffset[]|Error
Retrieves the start offsets for a given set of partitions.
Parameters
- partitions TopicPartition[] - Array of topic partitions to get the starting offsets
- duration decimal (default -1) - Timeout duration (in seconds) for the get beginning offsets execution
Return Type
- PartitionOffset[]|Error - Starting offsets for the given partitions if the operation executes successfully or else a kafka:Error
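For example, a minimal sketch that fetches the starting offset of a single partition (the topic name and partition index are illustrative):

```ballerina
kafka:TopicPartition topicPartition = {topic: "kafka-topic", partition: 0};
kafka:PartitionOffset[]|kafka:Error result = consumer->getBeginningOffsets([topicPartition]);
```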
getCommittedOffset
function getCommittedOffset(TopicPartition partition, decimal duration) returns PartitionOffset|Error?
Retrieves the last committed offsets for the given topic partitions.
Parameters
- partition TopicPartition - The TopicPartition in which the committed offset is returned for the consumer
- duration decimal (default -1) - Timeout duration (in seconds) for the get committed offset operation to execute
Return Type
- PartitionOffset|Error? - The last committed offset for the consumer for the given partition if a committed offset is present, () if there are no committed offsets, or else a kafka:Error
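A minimal sketch (the topic name and partition index are illustrative); note that the result can be () when no offset has been committed for the partition:

```ballerina
kafka:TopicPartition topicPartition = {topic: "kafka-topic", partition: 0};
kafka:PartitionOffset|kafka:Error? result = consumer->getCommittedOffset(topicPartition);
```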
getEndOffsets
function getEndOffsets(TopicPartition[] partitions, decimal duration) returns PartitionOffset[]|Error
Retrieves the last offsets for a given set of partitions.
Parameters
- partitions TopicPartition[] - Set of partitions to get the last offsets
- duration decimal (default -1) - Timeout duration (in seconds) for the get end offsets operation to execute
Return Type
- PartitionOffset[]|Error - End offsets for the given partitions if the operation executes successfully or else a kafka:Error
getPausedPartitions
function getPausedPartitions() returns TopicPartition[]|Error
Retrieves the partitions, which are currently paused.
kafka:TopicPartition[]|kafka:Error result = consumer->getPausedPartitions();
Return Type
- TopicPartition[]|Error - Set of partitions paused from message retrieval if the operation executes successfully or else a kafka:Error
getPositionOffset
function getPositionOffset(TopicPartition partition, decimal duration) returns int|Error
Retrieves the offset of the next record that will be fetched if a record exists in that position.
Parameters
- partition TopicPartition - The TopicPartition in which the position is required
- duration decimal (default -1) - Timeout duration (in seconds) for the get position offset operation to execute
Return Type
- int|Error - Offset of the next record to be fetched if the operation executes successfully or else a kafka:Error
getSubscription
function getSubscription() returns string[]|Error
Retrieves the set of topics to which the consumer is currently subscribed.
string[]|kafka:Error result = consumer->getSubscription();
Return Type
- string[]|Error - Array of subscribed topics if the operation executes successfully or else a kafka:Error
getTopicPartitions
function getTopicPartitions(string topic, decimal duration) returns TopicPartition[]|Error
Retrieves the set of partitions to which the topic belongs.
kafka:TopicPartition[]|kafka:Error result = consumer->getTopicPartitions("kafka-topic");
Parameters
- topic string - The topic for which the partition information is needed
- duration decimal (default -1) - Timeout duration (in seconds) for the get topic partitions operation to execute
Return Type
- TopicPartition[]|Error - Array of partitions for the given topic if the operation executes successfully or else a kafka:Error
pause
function pause(TopicPartition[] partitions) returns Error?
Pauses retrieving messages from a set of partitions.
Parameters
- partitions TopicPartition[] - Partitions to pause the retrieval of messages
Return Type
- Error? - A kafka:Error if an error is encountered or else ()
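As a sketch, pausing a single partition and later resuming it with the resume operation (the topic name is illustrative):

```ballerina
kafka:TopicPartition topicPartition = {topic: "kafka-topic", partition: 0};
kafka:Error? pauseResult = consumer->pause([topicPartition]);
// ... process the backlog, then resume message retrieval ...
kafka:Error? resumeResult = consumer->resume([topicPartition]);
```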
poll
function poll(decimal timeout) returns ConsumerRecord[]|Error
Polls the external broker to retrieve records.
kafka:ConsumerRecord[]|kafka:Error result = consumer->poll(1);
Parameters
- timeout decimal - Polling time in seconds
Return Type
- ConsumerRecord[]|Error - Array of consumer records if the operation executes successfully or else a kafka:Error
resume
function resume(TopicPartition[] partitions) returns Error?
Resumes retrieving messages from a set of partitions that were paused earlier.
Parameters
- partitions TopicPartition[] - Partitions to resume the retrieval of messages
Return Type
- Error? - A kafka:Error if an error is encountered or else ()
seek
function seek(PartitionOffset offset) returns Error?
Seeks to a given offset in a topic partition.
Parameters
- offset PartitionOffset - The PartitionOffset to seek to
Return Type
- Error? - A kafka:Error if an error is encountered or else ()
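For example, rewinding a partition to offset 0 (the topic name, partition index, and offset are illustrative):

```ballerina
kafka:PartitionOffset partitionOffset = {
    partition: {topic: "kafka-topic", partition: 0},
    offset: 0
};
kafka:Error? result = consumer->seek(partitionOffset);
```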
seekToBeginning
function seekToBeginning(TopicPartition[] partitions) returns Error?
Seeks to the beginning of the offsets for the given set of topic partitions.
Parameters
- partitions TopicPartition[] - The set of topic partitions to seek
Return Type
- Error? - A kafka:Error if an error is encountered or else ()
seekToEnd
function seekToEnd(TopicPartition[] partitions) returns Error?
Seeks to the end of the offsets for the given set of topic partitions.
Parameters
- partitions TopicPartition[] - The set of topic partitions to seek
Return Type
- Error? - A kafka:Error if an error is encountered or else ()
subscribe
function subscribe(string[] topics) returns Error?
Subscribes the consumer to the provided set of topics.
kafka:Error? result = consumer->subscribe(["kafka-topic-1", "kafka-topic-2"]);
Parameters
- topics string[] - Array of topics to be subscribed to
Return Type
- Error? - A kafka:Error if an error is encountered or else ()
subscribeWithPattern
function subscribeWithPattern(string regex) returns Error?
Subscribes the consumer to the topics that match the provided pattern.
kafka:Error? result = consumer->subscribeWithPattern("kafka.*");
Parameters
- regex string - Pattern, which should be matched with the topics to be subscribed to
Return Type
- Error? - A kafka:Error if an error is encountered or else ()
unsubscribe
function unsubscribe() returns Error?
Unsubscribes from all the topic subscriptions.
kafka:Error? result = consumer->unsubscribe();
Return Type
- Error? - A kafka:Error if an error is encountered or else ()
kafka: Listener
Represents a Kafka consumer listener endpoint.
Constructor
Creates a new Kafka Listener.
init (string|string[] bootstrapServers, *ConsumerConfiguration config)
- bootstrapServers string|string[] - List of remote server endpoints of the Kafka brokers
- config *ConsumerConfiguration - Configurations related to the consumer endpoint
'start
function 'start() returns error?
Starts the registered services.
Return Type
- error? - A kafka:Error if an error is encountered while starting the server or else ()
gracefulStop
function gracefulStop() returns error?
Stops the Kafka listener gracefully.
Return Type
- error? - A kafka:Error if an error is encountered while stopping the listener or else ()
immediateStop
function immediateStop() returns error?
Stops the Kafka listener immediately.
Return Type
- error? - A kafka:Error if an error is encountered while stopping the listener or else ()
attach
Attaches a consumer service to the listener.
Parameters
- s Service - The service to be attached
Return Type
- error? - A kafka:Error if an error is encountered while attaching the service or else ()
detach
Detaches a consumer service from the listener.
Parameters
- s Service - The service to be detached
Return Type
- error? - A kafka:Error if an error is encountered while detaching the service or else ()
kafka: Producer
Represents a Kafka producer endpoint.
Constructor
Creates a new Kafka Producer.
init (string|string[] bootstrapServers, *ProducerConfiguration config)
- bootstrapServers string|string[] - List of remote server endpoints of the Kafka brokers
- config *ProducerConfiguration - Configurations related to initializing a Kafka Producer
close
function close() returns Error?
Closes the producer connection to the external Kafka broker.
kafka:Error? result = producer->close();
Return Type
- Error? - A kafka:Error if closing the producer failed or else ()
'flush
function 'flush() returns Error?
Flushes the batch of records already sent to the broker by the producer.
kafka:Error? result = producer->'flush();
Return Type
- Error? - A kafka:Error if the records couldn't be flushed or else ()
getTopicPartitions
function getTopicPartitions(string topic) returns TopicPartition[]|Error
Retrieves the topic partition information for the provided topic.
kafka:TopicPartition[]|kafka:Error result = producer->getTopicPartitions("kafka-topic");
Parameters
- topic string - The topic for which the partition information is retrieved
Return Type
- TopicPartition[]|Error - A kafka:TopicPartition array for the given topic or else a kafka:Error if the operation fails
send
function send(ProducerRecord producerRecord) returns Error?
Produces records to the Kafka server.
kafka:Error? result = producer->send({ topic: "kafka-topic", value: "Hello World, Ballerina".toBytes() });
Parameters
- producerRecord ProducerRecord - Record to be produced
Return Type
- Error? - A kafka:Error if the send action fails to send data or else ()
Constants
kafka: ACKS_ALL
Producer acknowledgement type is 'all'. This will guarantee that the record will not be lost as long as at least one in-sync replica is alive.
kafka: ACKS_NONE
Producer acknowledgement type '0'. If the acknowledgement type is set to this, the producer will not wait for any acknowledgement from the server.
kafka: ACKS_SINGLE
Producer acknowledgement type '1'. If the acknowledgement type is set to this, the leader will write the record to its local log and respond without waiting for full acknowledgement from all the followers.
kafka: AUTH_SASL_PLAIN
Kafka SASL_PLAIN authentication mechanism
kafka: COMPRESSION_GZIP
Kafka GZIP compression type.
kafka: COMPRESSION_LZ4
Kafka LZ4 compression type.
kafka: COMPRESSION_NONE
No compression.
kafka: COMPRESSION_SNAPPY
Kafka Snappy compression type.
kafka: COMPRESSION_ZSTD
Kafka ZSTD compression type.
kafka: DEFAULT_URL
The default server URL.
kafka: DES_AVRO
Apache Avro deserializer.
kafka: DES_BYTE_ARRAY
In-built Kafka byte array deserializer.
kafka: DES_CUSTOM
User-defined deserializer.
kafka: DES_FLOAT
In-built Kafka float deserializer.
kafka: DES_INT
In-built Kafka int deserializer.
kafka: DES_STRING
In-built Kafka string deserializer.
kafka: ISOLATION_COMMITTED
Configures the consumer to read the committed messages only in the transactional mode when poll() is called.
kafka: ISOLATION_UNCOMMITTED
Configures the consumer to read all the messages including the aborted ones.
kafka: OFFSET_RESET_EARLIEST
Automatically reset the consumer offset to the earliest offset
kafka: OFFSET_RESET_LATEST
Automatically reset the consumer offset to the latest offset
kafka: OFFSET_RESET_NONE
If the offsetReset is set to OFFSET_RESET_NONE, the consumer will return an error if no previous offset is found for the consumer group.
kafka: PROTOCOL_PLAINTEXT
Represents Kafka un-authenticated, non-encrypted channel
kafka: PROTOCOL_SASL_PLAINTEXT
Represents Kafka authenticated, non-encrypted channel
kafka: PROTOCOL_SASL_SSL
Represents Kafka SASL authenticated, SSL channel
kafka: PROTOCOL_SSL
Represents Kafka SSL channel
kafka: SER_AVRO
Apache Avro serializer.
kafka: SER_BYTE_ARRAY
In-built Kafka Byte Array serializer.
kafka: SER_CUSTOM
User-defined serializer.
kafka: SER_FLOAT
In-built Kafka float serializer.
kafka: SER_INT
In-built Kafka int serializer.
kafka: SER_STRING
In-built Kafka string serializer.
Enums
kafka: Protocol
Represents protocol options.
Members
Records
kafka: AuthenticationConfiguration
Configurations related to Kafka authentication mechanisms.
Fields
- mechanism AuthenticationMechanism(default AUTH_SASL_PLAIN) - Type of the authentication mechanism. Currently, SASL_PLAIN and SCRAM are supported. See kafka:AuthenticationMechanism for more information
- username string - The username to use to authenticate the Kafka producer/consumer
- password string - The password to use to authenticate the Kafka producer/consumer
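A sketch of wiring these fields into a consumer configuration (the credentials, group ID, and topic are placeholders):

```ballerina
kafka:AuthenticationConfiguration authConfig = {
    mechanism: kafka:AUTH_SASL_PLAIN,
    username: "user",
    password: "password"
};
kafka:ConsumerConfiguration consumerConfiguration = {
    groupId: "sasl-group",
    topics: ["kafka-topic"],
    auth: authConfig,
    securityProtocol: kafka:PROTOCOL_SASL_PLAINTEXT
};
```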
kafka: AvroGenericRecord
Represents a generic Avro record. This is the type of the value returned from an Avro deserializer consumer.
kafka: AvroRecord
Defines a record to send data using Avro serialization.
Fields
- schemaString string - The string, which defines the Avro schema
- dataRecord anydata - Records, which should be serialized using Avro
kafka: ConsumerConfiguration
Configurations related to consumer endpoint.
Fields
- groupId string? - Unique string that identifies the consumer
- topics string[]? - Topics to be subscribed by the consumer
- offsetReset OffsetResetMethod? - Offset reset strategy if no initial offset
- partitionAssignmentStrategy string? - Strategy class for handling the partition assignment among consumers
- metricsRecordingLevel string? - Metrics recording level
- metricsReporterClasses string? - Metrics reporter classes
- clientId string? - Identifier to be used for server side logging
- interceptorClasses string? - Interceptor classes to be used before sending records
- isolationLevel IsolationLevel? - Transactional message reading method
- schemaRegistryUrl string? - Avro schema registry url. Use this field to specify schema registry url, if Avro serializer is used
- sessionTimeout decimal? - Timeout (in seconds) used to detect consumer failures when the heartbeat threshold is reached
- heartBeatInterval decimal? - Expected time between heartbeats in seconds
- metadataMaxAge decimal? - Maximum time to force a refresh of metadata in seconds
- autoCommitInterval decimal? - Auto committing interval (in seconds) for commit offset, when auto-commit is enabled
- maxPartitionFetchBytes int? - The maximum amount of data per-partition the server returns
- sendBuffer int? - Size of the TCP send buffer (SO_SNDBUF)
- receiveBuffer int? - Size of the TCP receive buffer (SO_RCVBUF)
- fetchMinBytes int? - Minimum amount of data the server should return for a fetch request
- fetchMaxBytes int? - Maximum amount of data the server should return for a fetch request
- fetchMaxWaitTime decimal? - Maximum amount of time (in seconds) the server will block before answering the fetch request
- reconnectBackoffTimeMax decimal? - Maximum amount of time in seconds to wait when reconnecting
- retryBackoff decimal? - Time (in seconds) to wait before attempting to retry a failed request
- metricsSampleWindow decimal? - Window of time (in seconds) a metrics sample is computed over
- metricsNumSamples int? - Number of samples maintained to compute metrics
- requestTimeout decimal? - Wait time (in seconds) for response of a request
- connectionMaxIdleTime decimal? - Close idle connections after this number of seconds
- maxPollRecords int? - Maximum number of records returned in a single call to poll
- maxPollInterval int? - Maximum delay between invocations of poll
- reconnectBackoffTime decimal? - Time (in seconds) to wait before attempting to reconnect
- pollingTimeout decimal? - Timeout interval for polling in seconds
- pollingInterval decimal? - Polling interval for the consumer in seconds
- concurrentConsumers int? - Number of concurrent consumers
- defaultApiTimeout decimal? - Default API timeout value (in seconds) for APIs with duration
- autoCommit boolean(default true) - Enables auto committing offsets
- checkCRCS boolean(default true) - Check the CRC32 of the records consumed. This ensures that no on-the-wire or on-disk corruption of the messages has occurred. This may add some overhead and might need to be set to false if extreme performance is required
- excludeInternalTopics boolean(default true) - Whether records from internal topics should be exposed to the consumer
- decoupleProcessing boolean(default false) - Decouples processing
- secureSocket SecureSocket? - Configurations related to SSL/TLS encryption
- auth AuthenticationConfiguration? - Authentication-related configurations for the Kafka consumer
- securityProtocol SecurityProtocol(default PROTOCOL_PLAINTEXT) - Type of the security protocol to use in the broker connection
kafka: ConsumerRecord
Type related to consumer record.
Fields
- key byte[]? - Key that is included in the record
- value byte[] - Record content
- timestamp int - Timestamp of the record, in milliseconds since epoch
- offset PartitionOffset - Topic partition position in which the consumed record is stored
kafka: PartitionOffset
Represents the topic partition position in which the consumed record is stored.
Fields
- partition TopicPartition - The kafka:TopicPartition to which the record is related
- offset int - Offset in which the record is stored in the partition
kafka: ProducerConfiguration
Represents the Kafka Producer configuration.
Fields
- acks ProducerAcks(default ACKS_SINGLE) - Number of acknowledgments
- compressionType CompressionType(default COMPRESSION_NONE) - Compression type to be used for messages
- clientId string? - Identifier to be used for server side logging
- metricsRecordingLevel string? - Metrics recording level
- metricReporterClasses string? - Metrics reporter classes
- partitionerClass string? - Partitioner class to be used to select the partition to which the message is sent
- interceptorClasses string? - Interceptor classes to be used before sending records
- transactionalId string? - Transactional ID to be used in transactional delivery
- schemaRegistryUrl string? - Avro schema registry URL. Use this field to specify the schema registry URL if the Avro serializer is used
- bufferMemory int? - Total bytes of memory the producer can use to buffer records
- retryCount int? - Number of retries to resend a record
- batchSize int? - Maximum number of bytes to be batched together when sending records. Records exceeding this limit will not be batched. Setting this to 0 will disable batching.
- linger decimal? - Delay (in seconds) to allow other records to be batched before sending them to the Kafka server
- sendBuffer int? - Size of the TCP send buffer (SO_SNDBUF)
- receiveBuffer int? - Size of the TCP receive buffer (SO_RCVBUF)
- maxRequestSize int? - The maximum size of a request in bytes
- reconnectBackoffTime decimal? - Time (in seconds) to wait before attempting to reconnect
- reconnectBackoffMaxTime decimal? - Maximum amount of time in seconds to wait when reconnecting
- retryBackoffTime decimal? - Time (in seconds) to wait before attempting to retry a failed request
- maxBlock decimal? - Maximum block time (in seconds) during which the sending is blocked when the buffer is full
- requestTimeout decimal? - Wait time (in seconds) for the response of a request
- metadataMaxAge decimal? - Maximum time (in seconds) to force a refresh of metadata
- metricsSampleWindow decimal? - Time (in seconds) window for a metrics sample to compute over
- metricsNumSamples int? - Number of samples maintained to compute the metrics
- maxInFlightRequestsPerConnection int? - Maximum number of unacknowledged requests on a single connection
- connectionsMaxIdleTime decimal? - Close the idle connections after this number of seconds
- transactionTimeout decimal? - Timeout (in seconds) for transaction status update from the producer
- enableIdempotence boolean(default false) - Exactly one copy of each message is written to the stream when enabled
- secureSocket SecureSocket? - Configurations related to SSL/TLS encryption
- auth AuthenticationConfiguration? - Authentication-related configurations for the Kafka producer
- securityProtocol SecurityProtocol(default PROTOCOL_PLAINTEXT) - Type of the security protocol to use in the broker connection
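As a sketch, the transactionalId and enableIdempotence fields enable transactional delivery; the send can then be wrapped in a Ballerina transaction block. The client ID, transactional ID, and topic below are placeholders, and the block must run inside a transactional context:

```ballerina
kafka:ProducerConfiguration producerConfiguration = {
    clientId: "transactional-producer",
    transactionalId: "test-transactional-id",
    enableIdempotence: true
};
kafka:Producer producer = check new (kafka:DEFAULT_URL, producerConfiguration);
transaction {
    check producer->send({topic: "kafka-topic", value: "message".toBytes()});
    check commit;
}
```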
kafka: ProducerRecord
Details related to the producer record.
Fields
- topic string - Topic to which the record will be appended
- key byte[]? - Key that is included in the record
- value byte[] - Record content
- timestamp int? - Timestamp of the record, in milliseconds since epoch
- partition int? - Partition to which the record should be sent
kafka: SecureSocket
Configurations for secure communication with the Kafka server.
Fields
- cert TrustStore - Configurations associated with crypto:TrustStore
- key record {| crypto:KeyStore keyStore; string keyPassword?; |}? - Configurations associated with crypto:KeyStore
- protocol record {| Protocol name; string[] versions?; |}? - SSL/TLS protocol related options
- ciphers string[]? - List of ciphers to be used. By default, all the available cipher suites are supported
- provider string? - Name of the security provider used for SSL connections. Default value is the default security provider of the JVM
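A sketch of an SSL-enabled consumer configuration using these fields (the truststore path, password, group ID, and topic are placeholders):

```ballerina
kafka:ConsumerConfiguration consumerConfiguration = {
    groupId: "secure-group",
    topics: ["kafka-topic"],
    secureSocket: {
        cert: {path: "./resources/truststore.p12", password: "password"}
    },
    securityProtocol: kafka:PROTOCOL_SSL
};
```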
kafka: TopicPartition
Represents a topic partition.
Fields
- topic string - Topic to which the partition is related
- partition int - Index for the partition
Errors
kafka: Error
Represents a Kafka-related error.
Object types
kafka: Service
The Kafka service type
Union types
kafka: CompressionType
Kafka compression types to compress the messages.
kafka: IsolationLevel
Kafka consumer isolation level type.
kafka: OffsetResetMethod
Represents the different types of offset-reset methods for the Kafka consumer.
kafka: ProducerAcks
Kafka producer acknowledgement types.
kafka: SecurityProtocol
Represents the supported security protocols for Kafka clients.