Module nats

ballerinax/nats
Overview
This module provides the capability to send and receive messages by connecting to the NATS server.
NATS messaging enables the communication of data that is segmented into messages among computer applications and services. Data is encoded and framed as a message and sent by a publisher. The message is received, decoded, and processed by one or more subscribers. NATS makes it easy for programs to communicate across different environments, languages, cloud providers, and on-premises systems. Clients usually connect to the NATS system via a single URL and then subscribe or publish messages to a subject.
Basic usage
Set up the connection
First, set up the connection with the NATS basic server. You can connect to a NATS basic server in any of the following ways.
- Connect to a server using the default URL:
- Connect to a server using the URL:
- Connect to one or more servers with custom configurations:
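The three connection options listed above can be sketched as follows. This is a minimal illustration: the server addresses (nats://serverone:4222, nats://servertwo:4222) and the connection name are placeholders chosen for this example, not values defined by the module.

```ballerina
import ballerinax/nats;

public function main() returns error? {
    // Connect to a server using the default URL.
    nats:Client defaultClient = check new (nats:DEFAULT_URL);

    // Connect to a server using an explicit URL (placeholder address).
    nats:Client urlClient = check new ("nats://serverone:4222");

    // Connect to one or more servers with custom configurations.
    // The field values below are illustrative assumptions.
    nats:ConnectionConfiguration config = {
        connectionName: "my-nats",
        noEcho: true
    };
    nats:Client clusterClient = check new (["nats://serverone:4222", "nats://servertwo:4222"], config);
}
```

Passing a string array lets the client try each listed server in turn, which suits a clustered deployment.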
Publish messages
Publish messages to the NATS basic server
Once connected, publishing is accomplished via one of the three methods below.
- Publish with the subject and the message content:
- Publish as a request that expects a reply:
- Publish messages with a replyTo subject:
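The three publishing options above might look like the sketch below. The subject names (demo.nats.basic, demo.reply) and the 5-second request timeout are assumptions made for illustration, and the request only succeeds if some subscriber replies on that subject.

```ballerina
import ballerinax/nats;

public function main() returns error? {
    nats:Client natsClient = check new (nats:DEFAULT_URL);
    string message = "hello world";

    // Publish with the subject and the message content.
    check natsClient->publishMessage({
        content: message.toBytes(),
        subject: "demo.nats.basic"
    });

    // Publish as a request and wait up to 5 seconds for a reply.
    nats:AnydataMessage reply = check natsClient->requestMessage({
        content: message.toBytes(),
        subject: "demo.nats.basic"
    }, 5);

    // Publish with a replyTo subject on which subscribers can respond.
    check natsClient->publishMessage({
        content: message.toBytes(),
        subject: "demo.nats.basic",
        replyTo: "demo.reply"
    });
}
```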
Listen to incoming messages
Listen to messages from a NATS server
- Listen to incoming messages with the onMessage remote method:
- Listen to incoming messages and reply directly with the onRequest remote method:
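Both listening styles above can be sketched as services bound to a nats:Listener. The subject patterns (demo.example.*, demo.request.*) and the reply text are placeholders for this example; the logging call uses the standard ballerina/log module.

```ballerina
import ballerina/log;
import ballerinax/nats;

// Receives messages published to subjects matching the configured pattern.
@nats:ServiceConfig {
    subject: "demo.example.*"
}
service nats:Service on new nats:Listener(nats:DEFAULT_URL) {
    remote function onMessage(nats:Message message) returns error? {
        // The message content arrives as a byte array.
        string content = check string:fromBytes(message.content);
        log:printInfo("Received message: " + content);
    }
}

// Receives requests and replies directly with the returned value.
@nats:ServiceConfig {
    subject: "demo.request.*"
}
service nats:Service on new nats:Listener(nats:DEFAULT_URL) {
    remote function onRequest(nats:Message message) returns string? {
        return "Reply Message";
    }
}
```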
Advanced usage
Set up TLS
The Ballerina NATS module allows the use of TLS in communication. This setting expects a secure socket to be set in the connection configuration as shown below.
Configure TLS in the nats:Listener
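A minimal sketch of a TLS-enabled listener, assuming the server's certificate is available as a single certificate file. The file path and the server address are hypothetical placeholders.

```ballerina
import ballerinax/nats;

public function main() returns error? {
    // Trust a single certificate file (hypothetical path).
    nats:SecureSocket secured = {
        cert: "/path/to/server-cert.crt"
    };

    // Pass the secure socket through the connection configuration.
    nats:Listener natsListener = check new ("nats://serverone:4222", secureSocket = secured);
}
```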
Configure TLS in the nats:Client
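The client accepts the same secure socket setting. The sketch below assumes a single trusted certificate file; the file path and the server address are hypothetical placeholders.

```ballerina
import ballerinax/nats;

public function main() returns error? {
    // Trust a single certificate file (hypothetical path).
    nats:SecureSocket secured = {
        cert: "/path/to/server-cert.crt"
    };

    // Pass the secure socket through the connection configuration.
    nats:Client natsClient = check new ("nats://serverone:4222", secureSocket = secured);
}
```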
Listeners
nats: Listener
Represents the NATS listener to which a subscription service should be bound in order to receive messages.
Constructor
Initializes the NATS listener.
nats:Listener natsListener = check new(nats:DEFAULT_URL);
init (string|string[] url, *ConnectionConfiguration config)
- config *ConnectionConfiguration - The connection configurations
attach
Binds a service to the nats:Listener.
Parameters
- s Service - The type descriptor of the service
Return Type
- error? - () or else a nats:Error upon failure to attach
detach
Stops consuming messages and detaches the service from the nats:Listener.
Parameters
- s Service - The type descriptor of the service
Return Type
- error? - () or else a nats:Error upon failure to detach
'start
function 'start() returns error?
Starts the nats:Listener.
Return Type
- error? - () or else a nats:Error upon failure to start the listener
gracefulStop
function gracefulStop() returns error?
Stops the nats:Listener gracefully.
Return Type
- error? - () or else a nats:Error upon failure to stop the listener
immediateStop
function immediateStop() returns error?
Stops the nats:Listener forcefully.
Return Type
- error? - () or else a nats:Error upon failure to stop the listener
Clients
nats: Client
The client provides the capability to publish messages to the NATS server.
Constructor
Initializes the NATS client.
nats:Client natsClient = check new(nats:DEFAULT_URL);
init (string|string[] url, *ConnectionConfiguration config)
- config *ConnectionConfiguration - The connection configurations
publishMessage
function publishMessage(AnydataMessage message) returns Error?
Publishes data to a given subject.
Parameters
- message AnydataMessage - The message to be published
Return Type
- Error? - () or else a nats:Error if an error occurred
requestMessage
function requestMessage(AnydataMessage message, decimal? duration, typedesc<AnydataMessage> T) returns T|Error
Publishes data to a given subject and waits for a response.
Parameters
- message AnydataMessage - The message to be published
- duration decimal? (default ()) - The time (in seconds) to wait for the response
- T typedesc<AnydataMessage> (default <>) - Type of AnydataMessage to be returned
Return Type
- T|Error - The response or else a nats:Error if an error occurred
close
function close() returns Error?
Closes the NATS client connection.
Return Type
- Error? - () or else a nats:Error if an error occurred
nats: JetStreamCaller
The client that provides functionality related to acknowledging a JetStream message.
ack
function ack()
Acknowledges a JetStream message received from a consumer, indicating that the message should not be received again later.
nak
function nak()
Acknowledges that a JetStream message has been received but indicates that the message was not completely processed and should be sent again later.
inProgress
function inProgress()
Indicates that this message is being worked on and resets the redelivery timer on the server.
nats: JetStreamClient
The client provides the capability to publish messages to the NATS JetStream server and manage streams.
Constructor
Initializes the NATS JetStream client.
init (Client natsClient)
- natsClient Client - NATS client object to create the streaming client
publishMessage
function publishMessage(JetStreamMessage message) returns Error?
Publishes data to a given subject.
Parameters
- message JetStreamMessage - The JetStream message to be published
Return Type
- Error? - () or else a nats:Error if an error occurred
consumeMessage
function consumeMessage(string subject, decimal timeout) returns JetStreamMessage|Error
Retrieves a message synchronously from the given subject.
Return Type
- JetStreamMessage|Error - A nats:JetStreamMessage or else a nats:Error if an error occurred
ack
function ack(JetStreamMessage message)
Acknowledges a JetStream message received from a consumer, indicating that the message should not be received again later.
Parameters
- message JetStreamMessage - The message to be acknowledged
nak
function nak(JetStreamMessage message)
Acknowledges that a JetStream message has been received but indicates that the message was not completely processed and should be sent again later.
Parameters
- message JetStreamMessage - The message to be acknowledged
inProgress
function inProgress(JetStreamMessage message)
Indicates that this message is being worked on and resets the redelivery timer on the server.
Parameters
- message JetStreamMessage - The message to be acknowledged
addStream
function addStream(StreamConfiguration streamConfig) returns Error?
Loads or creates a stream.
Parameters
- streamConfig StreamConfiguration - Configurations required to load or create a stream
Return Type
- Error? - () or else a nats:Error if an error occurred
updateStream
function updateStream(StreamConfiguration streamConfig) returns Error?
Updates an existing stream.
Parameters
- streamConfig StreamConfiguration - Configurations required to update a stream
Return Type
- Error? - () or else a nats:Error if an error occurred
deleteStream
Deletes an existing stream.
Parameters
- streamName string - Name of the stream to be deleted
Return Type
- Error? - () or else a nats:Error if an error occurred
purgeStream
Purges the messages of an existing stream.
Parameters
- streamName string - Name of the stream to be purged
Return Type
- Error? - () or else a nats:Error if an error occurred
Classes
nats: JetStreamListener
Represents the NATS JetStream listener to which a subscription service should be bound in order to receive messages. Receives messages from a NATS JetStream server.
Constructor
Initializes the NATS JetStream listener.
init (Client natsClient)
- natsClient Client - NATS client object to create the streaming client.
attach
function attach(JetStreamService s, string[]|string? name) returns error?
Binds a service to the nats:JetStreamListener.
Parameters
- s JetStreamService - The type descriptor of the service
Return Type
- error? - () or else a nats:Error upon failure to attach
detach
function detach(JetStreamService s) returns error?
Stops consuming messages and detaches the service from the nats:JetStreamListener.
Parameters
- s JetStreamService - The type descriptor of the service
Return Type
- error? - () or else a nats:Error upon failure to detach
'start
function 'start() returns error?
Starts the nats:JetStreamListener.
Return Type
- error? - () or else a nats:Error upon failure to start the listener
gracefulStop
function gracefulStop() returns error?
Stops the nats:JetStreamListener gracefully.
Return Type
- error? - () or else a nats:Error upon failure to stop the listener
immediateStop
function immediateStop() returns error?
Stops the nats:JetStreamListener forcefully.
Return Type
- error? - () or else a nats:Error upon failure to stop the listener
nats: TypeChecker
Represents the TypeChecker, which the runtime uses to check whether a type is a subtype of nats:AnydataMessage.
Object types
nats: JetStreamService
The NATS JetStream service type.
nats: Service
The NATS service type.
Records
nats: AnydataMessage
Represents the anydata message, which a NATS server sends to its subscribed services.
Fields
- content anydata - The message content, which can be of type anydata
- subject string - The subject to which the message was sent
- replyTo string? - The replyTo subject of the message
nats: BytesMessage
Represents the subtype of the AnydataMessage record where the message content is a byte array.
Fields
- Fields Included from * AnydataMessage
- content byte[] - Message content in bytes
nats: CertKey
Represents a combination of a certificate, a private key, and the private key password if the key is encrypted.
Fields
- certFile string - A file containing the certificate
- keyFile string - A file containing the private key in PKCS8 format
- keyPassword string? - Password of the private key if it is encrypted
nats: ConnectionConfiguration
Configurations related to initializing the NATS client and listener.
Fields
- connectionName string(default "ballerina-nats") - The name of the connection
- retryConfig RetryConfig? - The configurations related to connection reconnect attempts
- ping Ping? - The configurations related to pinging the server
- auth Credentials|Tokens? - The configurations related to authentication
- inboxPrefix string(default "_INBOX.") - The connection's inbox prefix, which all inboxes will start with
- noEcho boolean(default false) - Turns off echoing. This prevents the server from echoing messages back to the connection if it has subscriptions on the subject being published to
- secureSocket SecureSocket? - The configurations related to SSL/TLS
- validation boolean(default true) - Configuration related to constraint validation check
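As an illustration of how these fields combine, the sketch below builds a nats:ConnectionConfiguration with retry, ping, and basic authentication settings. Every value (connection name, intervals, credentials) is an assumption chosen for the example.

```ballerina
import ballerinax/nats;

public function main() returns error? {
    // Hypothetical credentials for basic authentication.
    nats:Credentials credentials = {
        username: "alice",
        password: "secret"
    };

    nats:ConnectionConfiguration config = {
        connectionName: "orders-service",
        // Try up to 10 reconnects, waiting 5 seconds between attempts.
        retryConfig: {
            maxReconnect: 10,
            reconnectWait: 5,
            connectionTimeout: 3
        },
        // Ping the server every 60 seconds.
        ping: {
            pingInterval: 60,
            maxPingsOut: 2
        },
        auth: credentials,
        noEcho: true
    };

    nats:Client natsClient = check new (nats:DEFAULT_URL, config);
}
```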
nats: Credentials
Configurations related to basic authentication.
Fields
- username string - The username for basic authentication
- password string - The password for basic authentication
nats: JetStreamMessage
A message consumed from a stream.
Fields
- subject string - Subject of the message
- content byte[] - Payload of the message
nats: JetStreamServiceConfigData
The configurations for the NATS JetStream subscription.
Fields
- subject string - Name of the subject
- queueName string? - Name of the queue group
- autoAck boolean(default true) -
nats: Message
Represents the message, which a NATS server sends to its subscribed services.
Fields
- content byte[] - The message content
- subject string - The subject to which the message was sent
- replyTo string? - The replyTo subject of the message
nats: NatsPayload
Defines the Payload remote function parameter.
nats: PendingLimits
The configurations to set limits on the maximum number of messages or maximum size of messages this consumer will hold before it starts to drop new messages waiting for the resource functions to drain the queue. Setting a value less than or equal to 0 will disable this check.
Fields
- maxMessages int - Maximum number of pending messages retrieved and held by the consumer service. The default value is 65536
- maxBytes int - Total size of pending messages in bytes retrieved and held by the consumer service. The default value is 67108864
nats: Ping
Configurations related to pinging the server.
Fields
- pingInterval decimal(default 120) - The interval (in seconds) between the attempts of pinging the server
- maxPingsOut int(default 2) - The maximum number of pings the client can have in flight
nats: RetryConfig
Configurations related to connection reconnect attempts.
Fields
- maxReconnect int(default 60) - Maximum number of reconnect attempts. The reconnect state is triggered when an already established connection is lost. During the initial connection attempt, the client will cycle over its server list one time regardless of the maxReconnect value that is set. Use 0 to turn off auto reconnecting. Use -1 to turn on infinite reconnects.
- reconnectWait decimal(default 2) - The time (in seconds) to wait between reconnect attempts to the same server
- connectionTimeout decimal(default 2) - The timeout (in seconds) for the connection attempts
nats: SecureSocket
Configurations related to facilitating a secure communication.
Fields
- cert TrustStore|string - Configurations associated with crypto:TrustStore or a single certificate file that the client trusts
- protocol record {| name Protocol |}? - SSL/TLS protocol related options
nats: ServiceConfigData
The configurations for the NATS basic subscription.
Fields
- subject string - Name of the subject
- queueName string? - Name of the queue group
- pendingLimits PendingLimits? - Parameters to set limits on the maximum number of pending messages or maximum size of pending messages
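These fields are supplied through the nats:ServiceConfig annotation on a service. The sketch below shows a queue-group subscription with explicit pending limits; the subject, the queue name, and the limit values are assumptions for illustration only.

```ballerina
import ballerina/log;
import ballerinax/nats;

@nats:ServiceConfig {
    subject: "orders.*",
    // Services that share a queue name form a queue group.
    queueName: "order-workers",
    // Hold at most 1000 pending messages or 1 MiB of pending data.
    pendingLimits: {
        maxMessages: 1000,
        maxBytes: 1048576
    }
}
service nats:Service on new nats:Listener(nats:DEFAULT_URL) {
    remote function onMessage(nats:Message message) {
        log:printInfo("Received a message on subject: " + message.subject);
    }
}
```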
nats: StreamConfiguration
Determines the properties for a stream.
Fields
- name string? - A name for the stream
- description string? - A short description of the purpose of this stream
- retentionPolicy RetentionPolicy? - How message retention is considered: Limits (default), Interest, or WorkQueue
- maxConsumers float? - How many consumers can be defined for the stream, -1 for unlimited
- maxMsgs float? - How large the stream may become in total messages before the configured discard policy takes action
- maxMsgsPerSubject float? - Maximum number of messages to retain per subject
- maxBytes float? - How large the stream may become in total bytes before the configured discard policy takes action
- maxAge decimal? - Maximum age of any message in the stream, expressed in seconds
- maxMsgSize float? - The largest message that will be accepted by the Stream
- storageType StorageType? - The type of storage backend: File or Memory
- replicas int? - The number of replicas to store this message on
- noAck boolean? -
- discardPolicy DiscardPolicy? - When a stream has reached its configured maxBytes or maxMsgs, this policy takes action
nats: Tokens
Configurations related to token based authentication.
Fields
- token string - The token for token-based authentication
Enums
nats: DiscardPolicy
The policy that takes effect when a stream has reached its configured maxMsgs or maxBytes. New refuses new messages, while Old (default) deletes old messages to make space.
Members
nats: Protocol
Represents protocol options.
Members
nats: RetentionPolicy
How message retention is considered: Limits (default), Interest, or WorkQueue.
Members
nats: StorageType
The type of storage backend: File or Memory.
Members
Constants
nats: DEFAULT_URL
Default URL for NATS connections.
Annotations
nats: Payload
The annotation, which is used to define the payload parameter in the onMessage service method.
nats: ServiceConfig
The annotation, which is used to configure the basic subscription.
nats: StreamServiceConfig
The annotation, which is used to configure the JetStream subscription.
Errors
nats: Error
Represents the NATS module related errors.
nats: PayloadBindingError
Represents an error, which occurred due to payload binding.
nats: PayloadValidationError
Represents an error, which occurred due to payload constraint validation.