SDK
courier
import "github.com/gojek/courier-go"
Package courier contains the client that can be used to interact with the courier infrastructure to publish/subscribe to messages from other clients.
Index
- Variables
- func ExponentialStartStrategy(ctx context.Context, c interface{ Start() error }, opts ...StartOption)
- func Version() string
- func WaitForConnection(c ConnectionInformer, waitFor time.Duration, tick time.Duration) bool
- type Client
- func NewClient(opts ...ClientOption) (*Client, error)
- func (c *Client) InfoHandler() http.Handler
- func (c *Client) IsConnected() bool
- func (c *Client) Publish(ctx context.Context, topic string, message interface{}, opts ...Option) error
- func (c *Client) Run(ctx context.Context) error
- func (c *Client) Start() error
- func (c *Client) Stop()
- func (c *Client) Subscribe(ctx context.Context, topic string, callback MessageHandler, opts ...Option) error
- func (c *Client) SubscribeMultiple(ctx context.Context, topicsWithQos map[string]QOSLevel, callback MessageHandler) error
- func (c *Client) Unsubscribe(ctx context.Context, topics ...string) error
- func (c *Client) UsePublisherMiddleware(mwf ...PublisherMiddlewareFunc)
- func (c *Client) UseSubscriberMiddleware(mwf ...SubscriberMiddlewareFunc)
- func (c *Client) UseUnsubscriberMiddleware(mwf ...UnsubscriberMiddlewareFunc)
- type ClientInfoEmitter
- type ClientInfoEmitterConfig
- type ClientMeta
- type ClientOption
- func WithAddress(host string, port uint16) ClientOption
- func WithAutoReconnect(autoReconnect bool) ClientOption
- func WithCleanSession(cleanSession bool) ClientOption
- func WithClientID(clientID string) ClientOption
- func WithConnectTimeout(duration time.Duration) ClientOption
- func WithCredentialFetcher(fetcher CredentialFetcher) ClientOption
- func WithCustomDecoder(decoderFunc DecoderFunc) ClientOption
- func WithCustomEncoder(encoderFunc EncoderFunc) ClientOption
- func WithExponentialStartOptions(options ...StartOption) ClientOption
- func WithGracefulShutdownPeriod(duration time.Duration) ClientOption
- func WithKeepAlive(duration time.Duration) ClientOption
- func WithLogger(l Logger) ClientOption
- func WithMaintainOrder(maintainOrder bool) ClientOption
- func WithMaxReconnectInterval(duration time.Duration) ClientOption
- func WithOnConnect(handler OnConnectHandler) ClientOption
- func WithOnConnectionLost(handler OnConnectionLostHandler) ClientOption
- func WithOnReconnect(handler OnReconnectHandler) ClientOption
- func WithPassword(password string) ClientOption
- func WithPersistence(store Store) ClientOption
- func WithResolver(resolver Resolver) ClientOption
- func WithTCPAddress(host string, port uint16) ClientOption
- func WithTLS(tlsConfig *tls.Config) ClientOption
- func WithUseBase64Decoder() ClientOption
- func WithUsername(username string) ClientOption
- func WithWriteTimeout(duration time.Duration) ClientOption
- type ConnectRetryInterval
- type ConnectionInformer
- type Credential
- type CredentialFetcher
- type Decoder
- type DecoderFunc
- type Encoder
- type EncoderFunc
- type KeepAlive
- type Logger
- type MQTTClientInfo
- type Message
- type MessageHandler
- type OnConnectHandler
- type OnConnectionLostHandler
- type OnReconnectHandler
- type Option
- type PubSub
- type Publisher
- type PublisherFunc
- type PublisherMiddlewareFunc
- type QOSLevel
- type Resolver
- type Retained
- type SharedSubscriptionPredicate
- type StartOption
- type Store
- type Subscriber
- type SubscriberFuncs
- func NewSubscriberFuncs(subscribeFunc func(context.Context, string, MessageHandler, ...Option) error, subscribeMultipleFunc func(context.Context, map[string]QOSLevel, MessageHandler) error) SubscriberFuncs
- func (s SubscriberFuncs) Subscribe(ctx context.Context, topic string, callback MessageHandler, opts ...Option) error
- func (s SubscriberFuncs) SubscribeMultiple(ctx context.Context, topicsWithQos map[string]QOSLevel, callback MessageHandler) error
- type SubscriberMiddlewareFunc
- type TCPAddress
- type Unsubscriber
- type UnsubscriberFunc
- type UnsubscriberMiddlewareFunc
Variables
var (
// ErrConnectTimeout indicates connection timeout while connecting to broker.
ErrConnectTimeout = errors.New("client timed out while trying to connect to the broker")
// ErrPublishTimeout indicates publish timeout.
ErrPublishTimeout = errors.New("publish timeout")
// ErrSubscribeTimeout indicates subscribe timeout.
ErrSubscribeTimeout = errors.New("subscribe timeout")
// ErrUnsubscribeTimeout indicates unsubscribe timeout.
ErrUnsubscribeTimeout = errors.New("unsubscribe timeout")
// ErrSubscribeMultipleTimeout indicates multiple subscribe timeout.
ErrSubscribeMultipleTimeout = errors.New("subscribe multiple timeout")
)
var ErrClientNotInitialized = errors.New("courier: client not initialized")
var ResumeSubscriptions = resumeSubscriptions{}
UseMultiConnectionMode is useful when working with shared subscriptions, where multiple connections can be created to subscribe on the same application.
var UseMultiConnectionMode = multiConnMode{}
func ExponentialStartStrategy(ctx context.Context, c interface{ Start() error }, opts ...StartOption)
ExponentialStartStrategy will keep attempting to call Client.Start in the background and retry on error; it will never exit unless the context used to invoke it is cancelled. This will NOT stop the client; that is the responsibility of the caller.
## func [Version](https://github.com/gojek/courier-go/blob/main/version.go#L4)func Version() string
Version can be used to get the current courier library version
## func [WaitForConnection](https://github.com/gojek/courier-go/blob/main/utils.go#L10)func WaitForConnection(c ConnectionInformer, waitFor time.Duration, tick time.Duration) bool
WaitForConnection checks if the Client is connected. It calls ConnectionInformer.IsConnected after every tick, and waitFor is the maximum duration it can block. Returns true only when ConnectionInformer.IsConnected returns true.
## type [Client](https://github.com/gojek/courier-go/blob/main/client.go#L22-L43)
Client allows to communicate with an MQTT broker.
type Client struct {
// contains filtered or unexported fields
}
func NewClient(opts ...ClientOption) (*Client, error)
NewClient creates the Client struct with the clientOptions provided. It can return an error when prometheus.DefaultRegisterer has already been used to register the collected metrics.
Example
c, err := courier.NewClient(
courier.WithUsername("username"),
courier.WithPassword("password"),
courier.WithAddress("localhost", 1883),
)
if err != nil {
panic(err)
}
if err := c.Start(); err != nil {
panic(err)
}
stopCh := make(chan os.Signal, 1)
signal.Notify(stopCh, []os.Signal{os.Interrupt, syscall.SIGTERM}...)
go func() {
tick := time.NewTicker(time.Second)
for {
select {
case t := <-tick.C:
msg := map[string]interface{}{
"time": t.UnixNano(),
}
if err := c.Publish(context.Background(), "topic", msg, courier.QOSOne); err != nil {
fmt.Printf("Publish() error = %s\n", err)
} else {
fmt.Println("Publish() success")
}
case <-stopCh:
tick.Stop()
return
}
}
}()
<-stopCh
c.Stop()
func (c *Client) InfoHandler() http.Handler
InfoHandler returns a http.Handler that exposes the connected clients information
### func \(\*Client\) [IsConnected](https://github.com/gojek/courier-go/blob/main/client.go#L84)func (c *Client) IsConnected() bool
IsConnected checks whether the client is connected to the broker
### func \(\*Client\) [Publish](https://github.com/gojek/courier-go/blob/main/client_publish.go#L11)func (c *Client) Publish(ctx context.Context, topic string, message interface{}, opts ...Option) error
Publish allows to publish messages to an MQTT broker
### func \(\*Client\) [Run](https://github.com/gojek/courier-go/blob/main/client.go#L112)func (c *Client) Run(ctx context.Context) error
Run will start running the Client. This makes Client compatible with github.com/gojekfarm/xrun package. https://pkg.go.dev/github.com/gojekfarm/xrun
### func \(\*Client\) [Start](https://github.com/gojek/courier-go/blob/main/client.go#L97)func (c *Client) Start() error
Start will attempt to connect to the broker.
### func \(\*Client\) [Stop](https://github.com/gojek/courier-go/blob/main/client.go#L108)func (c *Client) Stop()
Stop will disconnect from the broker and finish up any pending work on internal communication workers. This blocks for at most the period configured with the ClientOption WithGracefulShutdownPeriod.
### func \(\*Client\) [Subscribe](https://github.com/gojek/courier-go/blob/main/client_subscribe.go#L13)func (c *Client) Subscribe(ctx context.Context, topic string, callback MessageHandler, opts ...Option) error
Subscribe allows to subscribe to messages from an MQTT broker
### func \(\*Client\) [SubscribeMultiple](https://github.com/gojek/courier-go/blob/main/client_subscribe.go#L30-L34)func (c *Client) SubscribeMultiple(ctx context.Context, topicsWithQos map[string]QOSLevel, callback MessageHandler) error
SubscribeMultiple allows to subscribe to messages on multiple topics from an MQTT broker
### func \(\*Client\) [Unsubscribe](https://github.com/gojek/courier-go/blob/main/client_unsubscribe.go#L10)func (c *Client) Unsubscribe(ctx context.Context, topics ...string) error
Unsubscribe removes any subscription to messages from an MQTT broker
### func \(\*Client\) [UsePublisherMiddleware](https://github.com/gojek/courier-go/blob/main/client_publish.go#L18)func (c *Client) UsePublisherMiddleware(mwf ...PublisherMiddlewareFunc)
UsePublisherMiddleware appends a PublisherMiddlewareFunc to the chain. Middleware can be used to intercept or otherwise modify, process or skip messages. They are executed in the order that they are applied to the Client.
### func \(\*Client\) [UseSubscriberMiddleware](https://github.com/gojek/courier-go/blob/main/client_subscribe.go#L57)func (c *Client) UseSubscriberMiddleware(mwf ...SubscriberMiddlewareFunc)
UseSubscriberMiddleware appends a SubscriberMiddlewareFunc to the chain. Middleware can be used to intercept or otherwise modify, process or skip subscriptions. They are executed in the order that they are applied to the Client.
### func \(\*Client\) [UseUnsubscriberMiddleware](https://github.com/gojek/courier-go/blob/main/client_unsubscribe.go#L27)func (c *Client) UseUnsubscriberMiddleware(mwf ...UnsubscriberMiddlewareFunc)
UseUnsubscriberMiddleware appends an UnsubscriberMiddlewareFunc to the chain. Middleware can be used to intercept or otherwise modify, process or skip unsubscribe calls. They are executed in the order that they are applied to the Client.
## type [ClientInfoEmitter](https://github.com/gojek/courier-go/blob/main/metrics.go#L17-L19)
ClientInfoEmitter emits broker info. This can be called concurrently, implementations should be concurrency safe.
type ClientInfoEmitter interface {
Emit(ctx context.Context, meta ClientMeta)
}
ClientInfoEmitterConfig is used to configure the broker info emitter.
type ClientInfoEmitterConfig struct {
// Interval is the interval at which the broker info emitter emits broker info.
Interval time.Duration
Emitter ClientInfoEmitter
}
ClientMeta contains information about the internal MQTT client(s)
type ClientMeta struct {
MultiConnMode bool
Clients []MQTTClientInfo
Subscriptions map[string]QOSLevel
}
ClientOption allows to configure the behaviour of a Client.
type ClientOption interface {
// contains filtered or unexported methods
}
func WithAddress(host string, port uint16) ClientOption
WithAddress sets the broker address to be used. To establish a TLS connection, use the WithTLS option along with this. The default hostname is "127.0.0.1" and the default port is 1883.
### func [WithAutoReconnect](https://github.com/gojek/courier-go/blob/main/client_options.go#L50)func WithAutoReconnect(autoReconnect bool) ClientOption
WithAutoReconnect sets whether the automatic reconnection logic should be used when the connection is lost. Even if it is disabled, the WithOnConnectionLost handler is still called.
### func [WithCleanSession](https://github.com/gojek/courier-go/blob/main/client_options.go#L62)func WithCleanSession(cleanSession bool) ClientOption
WithCleanSession will set the "clean session" flag in the connect message when this client connects to an MQTT broker. By setting this flag, you are indicating that no messages saved by the broker for this client should be delivered. Any messages that were going to be sent by this client before disconnecting but didn't, will not be sent upon connecting to the broker.
### func [WithClientID](https://github.com/gojek/courier-go/blob/main/client_options.go#L21)func WithClientID(clientID string) ClientOption
WithClientID sets the clientID to be used while connecting to an MQTT broker. According to the MQTT v3.1 specification, a client id must be no longer than 23 characters.
### func [WithConnectTimeout](https://github.com/gojek/courier-go/blob/main/client_options.go#L151)func WithConnectTimeout(duration time.Duration) ClientOption
WithConnectTimeout limits how long the client will wait when trying to open a connection to an MQTT server before timing out. A duration of 0 never times out. Default 15 seconds.
### func [WithCredentialFetcher](https://github.com/gojek/courier-go/blob/main/client_credentials.go#L17)func WithCredentialFetcher(fetcher CredentialFetcher) ClientOption
WithCredentialFetcher sets the specified CredentialFetcher.
### func [WithCustomDecoder](https://github.com/gojek/courier-go/blob/main/client_options.go#L193)func WithCustomDecoder(decoderFunc DecoderFunc) ClientOption
WithCustomDecoder allows to decode message bytes into the desired object.
### func [WithCustomEncoder](https://github.com/gojek/courier-go/blob/main/client_options.go#L190)func WithCustomEncoder(encoderFunc EncoderFunc) ClientOption
WithCustomEncoder allows to transform objects into the desired message bytes.
### func [WithExponentialStartOptions](https://github.com/gojek/courier-go/blob/main/client_options.go#L205)func WithExponentialStartOptions(options ...StartOption) ClientOption
WithExponentialStartOptions configures the client to use ExponentialStartStrategy along with the passed StartOption(s) when using the Client.Run method.
### func [WithGracefulShutdownPeriod](https://github.com/gojek/courier-go/blob/main/client_options.go#L175)func WithGracefulShutdownPeriod(duration time.Duration) ClientOption
WithGracefulShutdownPeriod sets the limit that is allowed for existing work to be completed.
### func [WithKeepAlive](https://github.com/gojek/courier-go/blob/main/client_options.go#L142)func WithKeepAlive(duration time.Duration) ClientOption
WithKeepAlive will set the amount of time (in seconds) that the client should wait before sending a PING request to the broker. This will allow the client to know that a connection has not been lost with the server. Deprecated: Use KeepAlive instead.
### func [WithLogger](https://github.com/gojek/courier-go/blob/main/log.go#L6)func WithLogger(l Logger) ClientOption
WithLogger sets the Logger to use for the client.
### func [WithMaintainOrder](https://github.com/gojek/courier-go/blob/main/client_options.go#L76)func WithMaintainOrder(maintainOrder bool) ClientOption
WithMaintainOrder will set the message routing to guarantee order within each QoS level. By default, this value is true. If set to false (recommended), this flag indicates that messages can be delivered asynchronously from the client to the application and possibly arrive out of order. Specifically, the message handler is called in its own goroutine. Note that setting this to true does not guarantee in-order delivery (this is subject to broker settings like "max_inflight_messages=1"), and if true then the MessageHandler callback must not block.
### func [WithMaxReconnectInterval](https://github.com/gojek/courier-go/blob/main/client_options.go#L168)func WithMaxReconnectInterval(duration time.Duration) ClientOption
WithMaxReconnectInterval sets the maximum time that will be waited between reconnection attempts when the connection is lost.
### func [WithOnConnect](https://github.com/gojek/courier-go/blob/main/client_options.go#L84)func WithOnConnect(handler OnConnectHandler) ClientOption
WithOnConnect will set the OnConnectHandler callback to be called when the client is connected. Both at initial connection time and upon automatic reconnect.
### func [WithOnConnectionLost](https://github.com/gojek/courier-go/blob/main/client_options.go#L92)func WithOnConnectionLost(handler OnConnectionLostHandler) ClientOption
WithOnConnectionLost will set the OnConnectionLostHandler callback to be executed in the case where the client unexpectedly loses connection with the MQTT broker.
### func [WithOnReconnect](https://github.com/gojek/courier-go/blob/main/client_options.go#L100)func WithOnReconnect(handler OnReconnectHandler) ClientOption
WithOnReconnect sets the OnReconnectHandler callback to be executed prior to the client attempting a reconnect to the MQTT broker.
### func [WithPassword](https://github.com/gojek/courier-go/blob/main/client_options.go#L35)func WithPassword(password string) ClientOption
WithPassword sets the password to be used while connecting to an MQTT broker.
### func [WithPersistence](https://github.com/gojek/courier-go/blob/main/client_options.go#L183)func WithPersistence(store Store) ClientOption
WithPersistence allows to configure the store to be used by the broker. Default persistence is in-memory persistence with mqtt.MemoryStore.
### func [WithResolver](https://github.com/gojek/courier-go/blob/main/client_resolver.go#L37)func WithResolver(resolver Resolver) ClientOption
WithResolver sets the specified Resolver.
### func [WithTCPAddress](https://github.com/gojek/courier-go/blob/main/client_options.go#L111)func WithTCPAddress(host string, port uint16) ClientOption
WithTCPAddress sets the broker address to be used. The default hostname is "127.0.0.1" and the default port is 1883.
Deprecated: This Option used to work with plain TCP connections, it's now possible to use TLS with WithAddress and WithTLS combination.
### func [WithTLS](https://github.com/gojek/courier-go/blob/main/client_options.go#L42)func WithTLS(tlsConfig *tls.Config) ClientOption
WithTLS sets the TLS configuration to be used while connecting to an MQTT broker.
### func [WithUseBase64Decoder](https://github.com/gojek/courier-go/blob/main/client_options.go#L197)func WithUseBase64Decoder() ClientOption
WithUseBase64Decoder configures a json decoder with a base64.StdEncoding wrapped decoder which decodes base64 encoded message bytes into the passed object.
### func [WithUsername](https://github.com/gojek/courier-go/blob/main/client_options.go#L28)func WithUsername(username string) ClientOption
WithUsername sets the username to be used while connecting to an MQTT broker.
### func [WithWriteTimeout](https://github.com/gojek/courier-go/blob/main/client_options.go#L160)func WithWriteTimeout(duration time.Duration) ClientOption
WithWriteTimeout limits how long the client will wait when trying to publish, subscribe or unsubscribe on topic when a context deadline is not set while calling Publisher.Publish, Subscriber.Subscribe, Subscriber.SubscribeMultiple or Unsubscriber.Unsubscribe.
## type [ConnectRetryInterval](https://github.com/gojek/courier-go/blob/main/client_options.go#L216)
ConnectRetryInterval allows to configure the interval between connection retries. Default value is 10 seconds.
type ConnectRetryInterval time.Duration
ConnectionInformer can be used to get information about the connection
type ConnectionInformer interface {
// IsConnected checks whether the client is connected to the broker
IsConnected() bool
}
Credential is a \<username,password> pair.
type Credential struct {
Username string
Password string
}
CredentialFetcher is an interface that allows to fetch credentials for a client.
type CredentialFetcher interface {
Credentials(context.Context) (*Credential, error)
}
Decoder helps to decode message bytes into the desired object
type Decoder interface {
// Decode decodes message bytes into the passed object
Decode(v interface{}) error
}
func DefaultDecoderFunc(_ context.Context, r io.Reader) Decoder
DefaultDecoderFunc is a DecoderFunc that uses a json.Decoder as the Decoder.
## type [DecoderFunc](https://github.com/gojek/courier-go/blob/main/decoder.go#L13)
DecoderFunc is used to create a Decoder from io.Reader stream of message bytes before calling MessageHandler; the context.Context value may be used to select appropriate Decoder.
type DecoderFunc func(context.Context, io.Reader) Decoder
Encoder helps in transforming objects to message bytes
type Encoder interface {
// Encode takes any object and encodes it into bytes
Encode(v interface{}) error
}
func DefaultEncoderFunc(_ context.Context, w io.Writer) Encoder
DefaultEncoderFunc is an EncoderFunc that uses a json.Encoder as the Encoder.
## type [EncoderFunc](https://github.com/gojek/courier-go/blob/main/encoder.go#L11)
EncoderFunc is used to create an Encoder from io.Writer; the context.Context value may be used to select appropriate Encoder.
type EncoderFunc func(context.Context, io.Writer) Encoder
KeepAlive will set the amount of time that the client should wait before sending a PING request to the broker. This will allow the client to know that a connection has not been lost with the server. Default value is 60 seconds. Note: Practically, when KeepAlive >= 10s, the client will check every 5s, if it needs to send a PING. In other cases, the client will check every KeepAlive/2.
type KeepAlive time.Duration
Logger is the interface that wraps the Info and Error methods.
type Logger interface {
Info(ctx context.Context, msg string, attrs map[string]any)
Error(ctx context.Context, err error, attrs map[string]any)
}
MQTTClientInfo contains information about the internal MQTT client
type MQTTClientInfo struct {
Addresses []TCPAddress `json:"addresses"`
ClientID string `json:"client_id"`
Username string `json:"username"`
ResumeSubs bool `json:"resume_subs"`
CleanSession bool `json:"clean_session"`
AutoReconnect bool `json:"auto_reconnect"`
Connected bool `json:"connected"`
// Subscriptions contains the topics the client is subscribed to
// Note: Currently, this field only holds shared subscriptions.
Subscriptions []string `json:"subscriptions,omitempty"`
}
Message represents the entity that is being relayed via the courier MQTT brokers from Publisher(s) to Subscriber(s).
type Message struct {
ID int
Topic string
Duplicate bool
Retained bool
QoS QOSLevel
// contains filtered or unexported fields
}
func NewMessageWithDecoder(payloadDecoder Decoder) *Message
NewMessageWithDecoder is a helper to create Message, ideally payloadDecoder should not be mutated once created.
### func \(\*Message\) [DecodePayload](https://github.com/gojek/courier-go/blob/main/message.go#L24)func (m *Message) DecodePayload(v interface{}) error
DecodePayload can decode the message payload bytes into the desired object.
## type [MessageHandler](https://github.com/gojek/courier-go/blob/main/types.go#L24)
MessageHandler is the type that all callbacks being passed to Subscriber must satisfy.
type MessageHandler func(context.Context, PubSub, *Message)
OnConnectHandler is a callback that is called when the client state changes from disconnected to connected. Both at initial connection and on reconnection
type OnConnectHandler func(PubSub)
OnConnectionLostHandler is a callback type which can be set to be executed upon an unintended disconnection from the MQTT broker. Disconnects caused by calling Disconnect or ForceDisconnect will not cause a WithOnConnectionLost callback to execute.
type OnConnectionLostHandler func(error)
OnReconnectHandler is invoked prior to reconnecting after the initial connection is lost
type OnReconnectHandler func(PubSub)
Option changes behaviour of Publisher.Publish, Subscriber.Subscribe calls.
type Option interface {
// contains filtered or unexported methods
}
PubSub exposes all the operational functionalities of Client with Publisher, Subscriber, Unsubscriber and ConnectionInformer
type PubSub interface {
Publisher
Subscriber
Unsubscriber
ConnectionInformer
}
Publisher defines behaviour of an MQTT publisher that can send messages.
type Publisher interface {
// Publish allows to publish messages to an MQTT broker
Publish(ctx context.Context, topic string, message interface{}, options ...Option) error
}
PublisherFunc defines signature of a Publish function.
type PublisherFunc func(context.Context, string, interface{}, ...Option) error
func (f PublisherFunc) Publish(ctx context.Context, topic string, message interface{}, opts ...Option) error
Publish implements Publisher interface on PublisherFunc.
## type [PublisherMiddlewareFunc](https://github.com/gojek/courier-go/blob/main/publisher.go#L32)
PublisherMiddlewareFunc functions are closures that intercept Publisher.Publish calls.
type PublisherMiddlewareFunc func(Publisher) Publisher
func (pmw PublisherMiddlewareFunc) Middleware(publisher Publisher) Publisher
Middleware allows PublisherMiddlewareFunc to implement the publishMiddleware interface.
## type [QOSLevel](https://github.com/gojek/courier-go/blob/main/options.go#L10)
QOSLevel is an agreement between the sender of a message and the receiver of a message that defines the guarantee of delivery for a specific message.
type QOSLevel uint8
const (
// QOSZero denotes at most once message delivery
QOSZero QOSLevel = 0
// QOSOne denotes at least once message delivery
QOSOne QOSLevel = 1
// QOSTwo denotes exactly once message delivery
QOSTwo QOSLevel = 2
)
Resolver sends TCPAddress updates on the channel returned by UpdateChan().
type Resolver interface {
// UpdateChan returns a channel where TCPAddress updates can be received.
UpdateChan() <-chan []TCPAddress
// Done returns a channel which is closed when the Resolver is no longer running.
Done() <-chan struct{}
}
Retained is an option used with Publisher.Publish call
type Retained bool
SharedSubscriptionPredicate allows to configure the predicate function that determines whether a topic is a shared subscription topic.
type SharedSubscriptionPredicate func(topic string) bool
StartOption can be used to customise behaviour of ExponentialStartStrategy
type StartOption func(*startOptions)
func WithMaxInterval(interval time.Duration) StartOption
WithMaxInterval sets the maximum interval the retry logic will wait before attempting another Client.Start. Default is 30 seconds.
### func [WithOnRetry](https://github.com/gojek/courier-go/blob/main/exp_starter.go#L23)func WithOnRetry(retryFunc func(error)) StartOption
WithOnRetry sets the func which is called when there is an error in the previous Client.Start attempt
## type [Store](https://github.com/gojek/courier-go/blob/main/alias.go#L16)
Store is an interface which can be used to provide implementations for message persistence.
[IMPORTANT] When implementing a store with a shared storage (e.g. redis) across multiple application instances, ensure that the keys are namespaced for each application instance; otherwise there will be collisions. The messages are identified by the message id from the MQTT packets, which takes values in the range (0, 2^16); this coincides with the maximum number of in-flight messages.
type Store = mqtt.Store
func NewMemoryStore() Store
NewMemoryStore returns a pointer to a new instance of mqtt.MemoryStore. The instance is not initialized and ready to use until Open() has been called on it.
## type [Subscriber](https://github.com/gojek/courier-go/blob/main/subscriber.go#L8-L14)
Subscriber defines behaviour of an MQTT subscriber that can create subscriptions.
type Subscriber interface {
// Subscribe allows to subscribe to messages from an MQTT broker
Subscribe(ctx context.Context, topic string, callback MessageHandler, opts ...Option) error
// SubscribeMultiple allows to subscribe to messages on multiple topics from an MQTT broker
SubscribeMultiple(ctx context.Context, topicsWithQos map[string]QOSLevel, callback MessageHandler) error
}
SubscriberFuncs defines signature of a Subscribe function.
type SubscriberFuncs struct {
// contains filtered or unexported fields
}
func NewSubscriberFuncs(subscribeFunc func(context.Context, string, MessageHandler, ...Option) error, subscribeMultipleFunc func(context.Context, map[string]QOSLevel, MessageHandler) error) SubscriberFuncs
NewSubscriberFuncs is a helper function to create SubscriberFuncs
### func \(SubscriberFuncs\) [Subscribe](https://github.com/gojek/courier-go/blob/main/subscriber.go#L44)func (s SubscriberFuncs) Subscribe(ctx context.Context, topic string, callback MessageHandler, opts ...Option) error
Subscribe implements Subscriber interface on SubscriberFuncs.
### func \(SubscriberFuncs\) [SubscribeMultiple](https://github.com/gojek/courier-go/blob/main/subscriber.go#L49-L53)func (s SubscriberFuncs) SubscribeMultiple(ctx context.Context, topicsWithQos map[string]QOSLevel, callback MessageHandler) error
SubscribeMultiple implements Subscriber interface on SubscriberFuncs.
## type [SubscriberMiddlewareFunc](https://github.com/gojek/courier-go/blob/main/subscriber.go#L22)
SubscriberMiddlewareFunc functions are closures that intercept Subscriber.Subscribe calls.
type SubscriberMiddlewareFunc func(Subscriber) Subscriber
func (smw SubscriberMiddlewareFunc) Middleware(subscriber Subscriber) Subscriber
Middleware allows SubscriberMiddlewareFunc to implement the subscribeMiddleware interface.
## type [TCPAddress](https://github.com/gojek/courier-go/blob/main/client_resolver.go#L21-L24)
TCPAddress specifies Host and Port for remote broker.
type TCPAddress struct {
Host string `json:"host"`
Port uint16 `json:"port"`
}
func (t TCPAddress) String() string
Unsubscriber defines behaviour of an MQTT client that can remove subscriptions.
type Unsubscriber interface {
// Unsubscribe removes any subscription to messages from an MQTT broker
Unsubscribe(ctx context.Context, topics ...string) error
}
UnsubscriberFunc defines signature of an Unsubscribe function.
type UnsubscriberFunc func(context.Context, ...string) error
func (f UnsubscriberFunc) Unsubscribe(ctx context.Context, topics ...string) error
Unsubscribe implements Unsubscriber interface on UnsubscriberFunc.
## type [UnsubscriberMiddlewareFunc](https://github.com/gojek/courier-go/blob/main/unsubscriber.go#L19)
UnsubscriberMiddlewareFunc functions are closures that intercept Unsubscriber.Unsubscribe calls.
type UnsubscriberMiddlewareFunc func(Unsubscriber) Unsubscriber
func (usmw UnsubscriberMiddlewareFunc) Middleware(unsubscriber Unsubscriber) Unsubscriber
Middleware allows UnsubscriberMiddlewareFunc to implement the unsubscribeMiddleware interface.
Generated by gomarkdoc