xMsg supports publishing a message and receiving a response, with the following method:
xMsgMessage syncPublish(xMsgConnection connection, xMsgMessage msg, int timeout)
throws xMsgException, TimeoutException
This publishes the message just like the publish method,
but this time the metadata is modified with a unique replyTo field.
Then the method will block until a response message is received
or the timeout occurs, whichever happens first.
In order to receive a response, the subscription callback must support sync-publication and publish response messages to the expected topic. xMsg does not publish a response automatically.
As with normal publication, the xMsg actor can sync-publish messages on multiple threads, but each thread must obtain its own connection.
executor.submit(() -> {
    // each thread obtains (and closes) its own connection
    try (xMsgConnection con = actor.getConnection()) {
        xMsgMessage msg = createMessage();
        xMsgMessage res = actor.syncPublish(con, msg, 10000);
        process(res);
    } catch (xMsgException | TimeoutException e) {
        e.printStackTrace();
    }
});
Receiving responses
When a message is sync-published,
its metadata will be modified to contain a unique replyTo field.
This value is generated by the actor for each sync-published message,
and corresponds to the topic that the subscription can use
to publish a response message.
The format of the replyTo topic is: ret:<ID>:LDDDDDD.
The <ID> is the unique identifier of the actor,
generated in the constructor.
L is the language (1 for Java, 2 for C++, 3 for Python),
and DDDDDD is a 6-digit serial number between 0 and 999999,
different for each message.
When 999999 is reached, it starts from 0 again.
This unique replyTo value per message ensures that the response can be
matched with the sync-publication call that published the request.
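The topic format described above can be sketched in a few lines. This is only an illustration, not the xMsg implementation: the class `ReplyTopicGenerator` and its method `nextTopic` are hypothetical names, and the actor identifier is passed in as a plain string.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper (not part of xMsg): builds topics of the form ret:<ID>:LDDDDDD. */
final class ReplyTopicGenerator {
    private static final int JAVA_LANG = 1;          // L: 1 = Java, per the format above
    private static final int SEQ_LIMIT = 1_000_000;  // serial numbers run from 0 to 999999

    private final String actorId;
    private final AtomicInteger next = new AtomicInteger(0);

    ReplyTopicGenerator(String actorId) {
        this.actorId = actorId;
    }

    /** Returns the next reply topic; the serial wraps to 0 after 999999. */
    String nextTopic() {
        // getAndUpdate is atomic, so concurrent publishers never share a serial
        int seq = next.getAndUpdate(n -> (n + 1) % SEQ_LIMIT);
        return String.format("ret:%s:%d%06d", actorId, JAVA_LANG, seq);
    }
}
```

With an assumed identifier of a1b2, the first two calls would return ret:a1b2:1000000 and ret:a1b2:1000001.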
In order to receive the response message,
the actor must have a subscription
to the proxy where the response will be published.
To avoid creating a new subscription every time a sync message is sent,
only a single subscription per proxy will be created,
with topic: ret:<ID>.
This subscription runs in the background because
it is reused to receive the responses
of all sync-publication requests to that proxy:
if no response socket to address:
    create socket to address
    subscribe socket to "ret:<ID>"
set reply topic to "ret:<ID>:<SEQ>"
publish msg
wait response
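Because several threads may sync-publish to the same proxy at once, the "create if missing" step has to be atomic. One possible way to get that in Java is `ConcurrentHashMap.computeIfAbsent`, sketched below. The classes `ResponseSocket` and `ResponseSockets` are hypothetical stand-ins, and the sketch does not claim to match how xMsg manages its sockets internally.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a subscribed response socket; not an xMsg class. */
final class ResponseSocket {
    final String address;
    final String topic;

    ResponseSocket(String address, String topic) {
        this.address = address;
        this.topic = topic;
    }
}

/** Keeps at most one response socket per proxy address. */
final class ResponseSockets {
    private final String actorId;
    private final ConcurrentMap<String, ResponseSocket> sockets = new ConcurrentHashMap<>();
    final AtomicInteger created = new AtomicInteger(); // creation count, for illustration only

    ResponseSockets(String actorId) {
        this.actorId = actorId;
    }

    /** Returns the socket for the proxy, creating and "subscribing" it on first use. */
    ResponseSocket forProxy(String address) {
        return sockets.computeIfAbsent(address, addr -> {
            created.incrementAndGet();
            return new ResponseSocket(addr, "ret:" + actorId); // subscribe to ret:<ID>
        });
    }
}
```

Calling `forProxy` twice with the same address returns the same object, so later sync-publications to that proxy reuse the existing subscription.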
Since response messages are received in a different thread,
a concurrent map is used to pass messages
to the waiting threads that sync-published those requests,
with the unique replyTo topic as the key:
ConcurrentMap<Topic, Message> responses
Waiting for a response is just checking the map periodically
for a message with topic equal to replyTo,
until the map contains the expected message or the timeout occurs.
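The waiting step can be sketched as a loop over the map with a deadline. This is a minimal illustration under stated assumptions: `ResponseWaiter` and `waitResponse` are hypothetical names, topics and messages are modeled as plain strings, and the 1 ms check interval is an arbitrary choice rather than the value xMsg uses.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch of the waiting side; topics and messages are plain strings here. */
final class ResponseWaiter {
    private static final long CHECK_INTERVAL_MS = 1; // assumed polling period

    final ConcurrentMap<String, String> responses = new ConcurrentHashMap<>();

    /** Blocks until a message keyed by replyTo appears, or timeoutMs elapses. */
    String waitResponse(String replyTo, long timeoutMs)
            throws TimeoutException, InterruptedException {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            // remove() both fetches the message and frees the map entry
            String msg = responses.remove(replyTo);
            if (msg != null) {
                return msg;
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException("no response for " + replyTo);
            }
            Thread.sleep(CHECK_INTERVAL_MS);
        }
    }
}
```

Removing the entry on success keeps the map from growing, which matters because serial numbers are eventually reused.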
The actor may have multiple response subscriptions, to many proxies. Unlike user-defined subscriptions (each one on its own thread), only a single background poller thread checks response messages in all subscribed sockets:
while true:
    poll all sockets
    for each socket:
        if socket contains message:
            put message on responses map
This poller thread is started in the xMsg constructor, but each socket is created and subscribed only the first time a message is sync-published to its proxy.
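The single-poller design can be illustrated without any networking by modeling each response socket as a queue. Everything in this sketch is an assumption made for illustration: the class `ResponsePoller`, the use of `Queue<String[]>` holding {topic, data} pairs in place of real sockets, and the 1 ms idle sleep. Unlike the pseudocode, each pass drains every pending message from a socket rather than taking one.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical sketch: each "socket" is modeled as a queue of {topic, data} pairs. */
final class ResponsePoller implements Runnable {
    // sockets are added later by publishing threads, so the list must be thread-safe
    final List<Queue<String[]>> sockets = new CopyOnWriteArrayList<>();
    final ConcurrentMap<String, String> responses = new ConcurrentHashMap<>();

    /** One pass over every socket; returns how many messages were moved to the map. */
    int pollOnce() {
        int moved = 0;
        for (Queue<String[]> socket : sockets) {
            String[] msg;
            while ((msg = socket.poll()) != null) {
                responses.put(msg[0], msg[1]); // key: the full replyTo topic
                moved++;
            }
        }
        return moved;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            if (pollOnce() == 0) {
                try {
                    Thread.sleep(1); // a real poller would block on the sockets instead
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag and exit the loop
                }
            }
        }
    }
}
```

The poller could be started once with `new Thread(poller).start()`, while sockets are appended to the list as new proxies are used.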
Publishing responses
To reply to sync-publication messages,
the user-defined callback must explicitly support publication of responses.
xMsg will not reply to synchronous requests automatically.
If the callback does not send a response,
the actor doing the syncPublish call will time out.
A received message is a synchronous request if the replyTo metadata is set.
To reply to this message,
the response must be published to the topic defined by the value of replyTo.
The xMsgMessage class provides methods to quickly access this metadata field:
boolean hasReplyTopic()
xMsgTopic getReplyTopic()
Finally, the response message must be published to the same proxy used to start the subscription. The xMsg convention is to subscribe to the default proxy. If the wrong topic or proxy is used, the response will not be received.
xMsgConnection connection = actor.getConnection(); // to default proxy
xMsgTopic topic = xMsgTopic.build("data", "power");
xMsgSubscription sub = actor.subscribe(connection, topic, msg -> {
    try {
        byte[] data = processMessage(msg);
        // check if message is a sync request
        if (msg.hasReplyTopic()) {
            xMsgTopic resTopic = msg.getReplyTopic();
            xMsgMessage resMsg = new xMsgMessage(resTopic, "binary/data", data);
            // publish response to default proxy (the same as the subscription)
            try (xMsgConnection resCon = actor.getConnection()) {
                actor.publish(resCon, resMsg);
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
});
To quickly create response messages, for example to return the same input data or data of a primitive type, the following static methods are also provided:
xMsgMessage createResponse(xMsgMessage msg)
xMsgMessage createResponse(xMsgMessage msg, Object data)
The response topic and mime-type will be set to the proper values.