In order to publish or subscribe messages, a connection to an xMsg proxy must be obtained. Connections are managed by the xMsg actor, that keeps an internal connection pool to cache and reuse connections.
The global ZContext
wrapper is not used to keep a list of created sockets.
Thus, in order to destroy the context, all connections must be already
closed and the actor destroyed too,
otherwise the context will hang because some sockets are not closed yet.
Connection fields
Each connection contains the address of the proxy and three ZMQ sockets:
pubSocket
(PUB): socket used for publication of messagessubSocket
(SUB): socket used to received subscribed messagesctrlSocket
(DEALER): socket used internally to send and receive control messages
The ctrlSocket
is used to verify that the connection to the proxy is
established.
A 9-digit unique ID is generated for each new connection,
with the following format: LPPPRRRRR
.
The first digit is the language (1 for Java, 2 for C++, 3 for Python),
PPP
is a 3-digit prefix unique to the node,
and RRRRR
is a 5-digit random number between 0 and 99999.
This ID is required to register the ctrlSocket
with the proxy
(ZMQ uses an identity per socket for REQ/REP communications).
Connection pool
To get a connection from the connection pool, a proxy address is required. The connection pool keeps a set of cached connections. If a connection to the proxy already exists, it will be returned. Otherwise, a new one will be created. Multiple threads can access the pool at the same time. Each thread will receive its own connection.
Connections should be closed in order to return them to the connection pool,
so it can be reused by other publishing threads.
The try-with-resources
block is the preferred way to obtain and use a
connection:
try (xMsgConnection connection = actor.getConnection(proxyAddress)) {
// use the connection
} catch (xMsgException e) {
e.printStacktrace();
}
The actor must be destroyed in order to close all connections cached in the connection pool.
Creating connections
When the connection pool does not have a cached connection to the given proxy, a new connection will be created.
The three sockets will be connected to the proxy address using the TCP protocol.
To check the connection, the pubSocket
will publish a control message to the proxy,
with the following format:
If the request was successfully published,
the proxy will send to the ctrlSocket
a message with this format
(note that the first frame will be stripped):
If no response is received after 100 ms, the request will be published again. After 10 unsuccessful requests, an exception will be thrown because the proxy could not be connected.
New connections can be customized by providing a connection setup:
public class CustomSetup implements xMsgConnectionSetup {
@override
public void preConnection(Socket socket) {
// set options before the ZMQ socket is connected
}
@override
public void postConnection() {
System.out.println("Successfully connected");
}
}
actor.setConnectionSetup(new CustomSetup());
The setup will be used each time a new connection is created.