xMsg supports publishing a message and receiving a response, with the following method:

xMsgMessage syncPublish(xMsgConnection connection, xMsgMessage msg, int timeout)
        throws xMsgException, TimeoutException

This publishes the message just like the publish method, except that the metadata is first modified to carry a unique replyTo field. The method then blocks until a response message is received or the timeout expires, whichever happens first.

In order to receive a response, the subscription callback must support sync-publication and publish response messages to the expected topic. xMsg does not publish a response automatically.

As with normal publication, the xMsg actor can sync-publish messages on multiple threads, but each thread must obtain its own connection.

executor.submit(() -> {
    try (xMsgConnection con = actor.getConnection()) {
        xMsgMessage msg = createMessage();
        xMsgMessage res = actor.syncPublish(con, msg, 10000);
        process(res);
    } catch (xMsgException | TimeoutException e) {
        e.printStackTrace();
    }
});

Receiving responses

When a message is sync-published, its metadata is modified to contain a unique replyTo field. This value is generated by the actor for each sync-published message, and corresponds to the topic that the subscription can use to publish a response message.

The format of the replyTo topic is: ret:<ID>:LDDDDDD. The <ID> is the unique identifier of the actor, generated in the constructor. L is the language (1 for Java, 2 for C++, 3 for Python), and DDDDDD is a 6-digit serial number between 0 and 999999, different for each message. When 999999 is reached, it starts from 0 again. This unique replyTo value per message ensures that the response can be matched with the sync-publication call that published the request.
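The topic format above can be sketched in a few lines. This is only an illustration, not the xMsg implementation: the class name ReplyTopicSketch, the actor ID "a1b2c3", and the choice to start the serial at 0 are all assumptions made for the example.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative generator for replyTo topics of the form ret:<ID>:LDDDDDD. */
final class ReplyTopicSketch {
    private static final int JAVA_LANG = 1;          // L: 1 = Java
    private static final int SEQ_LIMIT = 1_000_000;  // serial wraps to 0 after 999999

    private final String actorId;                    // hypothetical actor ID
    private final AtomicInteger nextSeq = new AtomicInteger(0);  // assumed to start at 0

    ReplyTopicSketch(String actorId) {
        this.actorId = actorId;
    }

    /** Returns the next replyTo topic; safe to call from several threads. */
    String next() {
        int seq = nextSeq.getAndUpdate(n -> (n + 1) % SEQ_LIMIT);
        return format(actorId, JAVA_LANG, seq);
    }

    /** Builds the topic: the language digit followed by a zero-padded 6-digit serial. */
    static String format(String id, int lang, int seq) {
        return String.format("ret:%s:%d%06d", id, lang, seq);
    }
}
```

With the hypothetical ID "a1b2c3", the first two calls to next() yield ret:a1b2c3:1000000 and ret:a1b2c3:1000001.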

In order to receive the response message, the actor must have a subscription to the proxy where the response will be published. To avoid creating a new subscription every time a sync message is sent, only a single subscription per proxy is created, with topic: ret:<ID>. This subscription runs in the background because it is reused to receive the responses of all sync-publication requests to that proxy:

if no response socket to address:
    create socket to address
    subscribe socket to "ret:<ID>"
set reply topic to "ret:<ID>:<SEQ>"
publish msg
wait response

Since response messages are received in a different thread, a concurrent map is used to pass messages to the waiting threads that sync-published those requests, with the unique replyTo topic as the key:

ConcurrentMap<Topic, Message> responses

Waiting for a response consists of checking the map periodically for a message whose topic equals replyTo, until the map contains the expected message or the timeout occurs.
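The waiting step can be sketched as follows. This is a minimal illustration under stated assumptions, not the actual xMsg code: the class ResponseWaitSketch and its methods are hypothetical, plain String keys and byte[] payloads stand in for Topic and Message, the 1 ms polling interval is arbitrary, and removing the entry once it is found is a design choice of the sketch.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Illustrative hand-off of responses from a receiver thread to waiting publishers. */
final class ResponseWaitSketch {
    // Keyed by the unique replyTo topic; String/byte[] stand in for Topic/Message.
    private final ConcurrentMap<String, byte[]> responses = new ConcurrentHashMap<>();

    /** Called by the receiving thread when a response arrives. */
    void deliver(String replyTo, byte[] payload) {
        responses.put(replyTo, payload);
    }

    /** Polls the map until a response for replyTo appears or the timeout expires. */
    byte[] await(String replyTo, long timeoutMillis)
            throws TimeoutException, InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            // Assumption of this sketch: take the entry out so the map does not grow.
            byte[] response = responses.remove(replyTo);
            if (response != null) {
                return response;
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException("no response for " + replyTo);
            }
            Thread.sleep(1);  // arbitrary polling interval
        }
    }
}
```

Because every request has its own replyTo key, several threads can wait on the same map at once without seeing each other's responses.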

The actor may have multiple response subscriptions, one for each proxy it has sync-published to. Unlike user-defined subscriptions (each one on its own thread), only a single background poller thread checks for response messages in all subscribed sockets:

while true:
    poll all sockets
    for each socket:
        if socket contains message:
            put message on responses map

This poller thread is started in the xMsg constructor, but each socket is created and subscribed only the first time a message is sync-published to its proxy.
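The single-poller design can be illustrated with a small sketch. It is not the xMsg implementation: in-memory queues stand in for the real response sockets, and the names ResponsePollerSketch, addSocket, and stop are hypothetical. It only shows one daemon thread, started in the constructor, draining every registered source into the shared responses map, with sources added later on demand.

```java
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Illustrative single poller thread serving several response "sockets". */
final class ResponsePollerSketch {
    // Each queue stands in for one response socket; entries are (topic, payload).
    private final List<Queue<Map.Entry<String, byte[]>>> sockets =
            new CopyOnWriteArrayList<>();
    final ConcurrentMap<String, byte[]> responses = new ConcurrentHashMap<>();
    private final Thread poller;

    ResponsePollerSketch() {
        // The poller starts with the actor, before any socket exists.
        poller = new Thread(this::pollLoop, "response-poller");
        poller.setDaemon(true);
        poller.start();
    }

    /** Registers a new source, as happens on the first sync-publish to a proxy. */
    Queue<Map.Entry<String, byte[]>> addSocket() {
        Queue<Map.Entry<String, byte[]>> socket = new ConcurrentLinkedQueue<>();
        sockets.add(socket);
        return socket;
    }

    private void pollLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            for (Queue<Map.Entry<String, byte[]>> socket : sockets) {
                Map.Entry<String, byte[]> msg;
                while ((msg = socket.poll()) != null) {
                    responses.put(msg.getKey(), msg.getValue());
                }
            }
            try {
                Thread.sleep(1);  // arbitrary pause between polling rounds
            } catch (InterruptedException e) {
                return;  // stop() was called
            }
        }
    }

    void stop() {
        poller.interrupt();
    }
}
```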

Publishing responses

To reply to sync-publication messages, the user-defined callback must explicitly support publication of responses. xMsg will not reply to synchronous requests automatically. If the callback does not send a response, the actor doing the syncPublish call will time out.

A received message is a synchronous request if the replyTo metadata is set. To reply to this message, the response must be published to the topic defined by the value of replyTo. The xMsgMessage class provides methods to quickly access this metadata field:

boolean     hasReplyTopic()
xMsgTopic   getReplyTopic()

Finally, the response message shall be published to the same proxy used to start the subscription. The xMsg convention is to subscribe to the default proxy. If the wrong topic or proxy is used, the response will not be received.

xMsgConnection connection = actor.getConnection(); // to default proxy
xMsgTopic topic = xMsgTopic.build("data", "power");
xMsgSubscription sub = actor.subscribe(connection, topic, msg -> {
    try {
        byte[] data = processMessage(msg);
        // check if message is a sync request
        if (msg.hasReplyTopic()) {
            xMsgTopic resTopic = msg.getReplyTopic();
            xMsgMessage resMsg = new xMsgMessage(resTopic, "binary/data", data);
            // publish response to the default proxy (the same one used by the subscription)
            try (xMsgConnection resCon = actor.getConnection()) {
                actor.publish(resCon, resMsg);
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
});

To quickly create response messages, for example when returning the same input data or data of a primitive type, the following static methods are also provided:

xMsgMessage createResponse(xMsgMessage msg)
xMsgMessage createResponse(xMsgMessage msg, Object data)

The response topic and mime-type will be set to the proper values.