Creating messages

To construct a message, the topic, the metadata and the serialized data are needed:

xMsgMessage(xMsgTopic topic, xMsgMeta.Builder metaData, byte[] data)
xMsgMessage(xMsgTopic topic, String mimeType, byte[] data)

An overload with just the mime-type is provided for simple messages when there is no need to set other metadata fields. Without the mime-type the message will be malformed and it will not be published.

The data in the message is always a byte array. xMsg does not do serialization of user defined data.

// Set the topic
xMsgTopic topic = xMsgTopic.build("domain", "subject", "type");

// Set the metadata
xMsgMeta.Builder meta = xMsgMeta.newBuilder();
meta.setDataType("binary/type");
meta.setByteOrder(xMsgMeta.Endian.Little);
meta.setCommunicationId(666);

// The data of the message
SomeType value = new SomeType();
byte[] data = SomeType.serialize(value);

// Create the message
xMsgMessage msg = new xMsgMessage(topic, meta, data);

For primitives and arrays of primitives, a protobuffer container class is provided to store and serialize the data:

xMsgData.Builder builder = xMsgData.newBuilder();
builder.setFLSINT32(100);
builder.setDOUBLE(4.5);
builder.addAllDOUBLEA(Arrays.asList(4.4, 5.6, 2.1));
byte data = builder.build().toByteArray();

To help creating simple messages, a static method can serialize primitives, arrays of primitives or Java objects:

xMsgMessage createFrom(xMsgTopic topic, Object data)

In this case, the data will be stored and serialized in protobuffer format (see xMsgData), and the mime-type will be set to the proper predefined value (see xMsgMimeType).

xMsgMessage msg1 = xMsgMessage.createFrom(topic, 200});
xMsgMessage msg2 = xMsgMessage.createFrom(topic, new Double[] { 3, 4, 5});
xMsgMessage msg3 = xMsgMessage.createFrom(topic, "string data");

assert msg1.getMimeType().equals(xMsgMimeType.SFIXED32);
assert msg2.getMimeType().equals(xMsgMimeType.ARRAY_DOUBLE);
assert msg3.getMimeType().equals(xMsgMimeType.STRING);

Reading messages

To read the data of a message, the mime-type must be checked first. If the type is known, the data can be deserialized:

Type data = null;
if (msg.getMimeType().equals("binary/type")) {
    byte[] bb = msg.getData();
    data = Type.deserialize(bb);
}

Or if the byte order matters:

Type data = null;
if (msg.getMimeType().equals("binary/type")) {
    byte[] bb = msg.getData();
    ByteOrder order = msg.getDataOrder();
    data = Type.deserialize(bb, order);
}

When the data type is a primitive, arrays of primitives or Java object, a static helper method can parse the data from the message:

<T> T parseData(xMsgMessage msg, Class<T> dataType)

The mime-type will be used to check if the message contains data of the expected type. Primitives and arrays of primitives should have been serialized as protobuffer format. The createFrom method can help with that.

Integer intData = xMsgMessage.parseData(msg, Integer.class);
Double[] arrayData = xMsgMessage.parseData(msg, Double[].class);
String stringData = xMsgMessage.parseData(msg, String.class);
JavaType objectData = (JavaType) xMsgMessage.parseData(msg);

Creating connections

The try-with-resources block is the preferred way to obtain and use a connection:

try (xMsgConnection connection = actor.getConnection(proxyAddress)) {
    // use the connection
} catch (xMsgException e) {
    e.printStacktrace();
}

The actor must be destroyed in order to close all connections cached in the connection pool.

New connections can be customized by providing a connection setup:

public class CustomSetup implements xMsgConnectionSetup {
    @override
    public void preConnection(Socket socket) {
        // set options before the ZMQ socket is connected
    }
    @override
    public void postConnection() {
        System.out.println("Successfully connected");
    }
}

actor.setConnectionSetup(new CustomSetup());

The setup will be used each time a new connection is created.

Publication

The xMsg actor presents a single method to publish messages:

void publish(xMsgConnection connection, xMsgMessage message) throws xMsgException

For short publication tasks, the connection should be returned to the pool, to be reused by others threads:

try (xMsgConnection connection = actor.getConnection(proxyAddress)) {
    xMsgMessage message = createMessage();
    actor.publish(connection, message);
} catch (xMsgException e) {
    e.printStacktrace();
}

If the connections are never returned to the pool, new connections will be created each time getConnection is called, which can affect performance.

The xMsg actor can publish messages on multiples threads, but each thread must obtain its own connection.

try (xMsg actor = new xMsg("multithread-publisher")) {
    xMsgTopic topic = xMsgTopic.build("report");
    ExecutorService es = Executors.newCachedThreadPool();
    for (int i = 0; i < 8; i++) {
        es.submit(() -> {
            try (xMsgConnection connection = actor.getConnection()) {
                String data = longRunningTask();
                xMsgMessage msg = xMsgMessage.createFrom(topic, data);
                actor.publish(connection, msg);
            } catch (xMsgException e) {
                e.printStacktrace();
            }
        });
    }
    es.shutdown();
    es.awaitTermination(2, TimeUnit.MINUTES);
}

If the actor is just doing a few long-running publication tasks, each one to the same proxy, there is no need to return the connections to the pool:

try (xMsgConnection connection = actor.getConnection(proxyAddress)) {
    while (keepRunning) {
        actor.publish(connection, generateMessage());
    }
}

Closing the actor and exiting the JVM will not send all messages still on queue. If those messages should be delivered, the global ZMQ context should be destroyed.

public static void main(String[] argv) {
    try (xMsg publisher = new xMsg("publisher");
         xMsgConnection con = publisher.getConnection()) {
        xMsgTopic topic = xMsgTopic.build("report", "sports");
        for (int i = 0; i < 100000; i++) {
            xMsgMessage msg = createReport(topic);
            publisher.publish(con, msg);
        }
    } catch (xMsgException e) {
        e.printStackTrace();
    }
    // wait until all messages are published by ZMQ
    xMsgContext.destroyContext();
}

Subscriptions

The xMsg actor presents a single call to start a subscription:

xMsgSubscription subscribe(xMsgConnection connection,
                           xMsgTopic topic,
                           xMsgCallBack callback) throws xMsgException

User-defined callbacks

The callback interface presents a single method, that receives a message that matches the topic of the subscription:

public interface xMsgCallBack {
    void callback(xMsgMessage msg);
}

Lambda functions can be used to write simple callbacks:

xMsgConnection connection = actor.getConnection();
xMsgTopic topic = xMsgTopic.build("data", "cars");
xMsgSubscription sub = actor.subscribe(connection, topic, msg -> {
    System.out.println("Received: " + xMsgMessage.parseData(msg, String.class));
});

For each received message on the subscription, the callback will run with the message as the argument. Callbacks do not run as soon as the messages are received; they are submitted to be executed by the worker threads of the internal threadpool, when a thread is available.

Since the actual callback object is created once per subscription, the same callback may be executed simultaneously by many worker threads to process multiple received messages. Therefore, any user-defined callback shall be thread-safe:

class ThreadSafeAccumulator implements xMsgCallBack {

    private AtomicInteger sum = new AtomicInteger();

    @Override
    public void callback(xMsgMessage msg) {
        sum.addAndGet(xMsgMessage.parseData(msg, Integer.class));
    }

    public int getSum() {
        return sum.get();
    }
}

xMsgConnection connection = actor.getConnection();
xMsgTopic topic = xMsgTopic.build("data", "numbers", "integers");
ThreadSafeAccumulator callback = new ThreadSafeAccumulator();
xMsgSubscription sub = actor.subscribe(connection, topic, callback);

The actor can also be used inside the callback to publish new messages. This allows writing complex interactions between actors — like service-oriented architectures, where services send data to other services to process a request. The connections must be obtained inside the callback, and closed after publishing:

xMsgConnection connection = actor.getConnection();
xMsgTopic topic = xMsgTopic.build("data", "power");

xMsgSubscription sub = actor.subscribe(connection, topic, msg -> {
    try {
        Object result = processMessage(msg);

        xMsgTopic pubTopic = xMsgTopic.build("result", "data");
        xMsgTopic logTopic = xMsgTopic.build("result", "log");
        xMsgProxyAddress pubAddr = selectAddress(result);
        xMsgProxyAddress logAddr = getLogAddress();

        try (xMsgConnection pubCon = actor.getConnection(pubAddr);
            xMsgConnection logCon = actor.getConnection(logAddr)) {
            xMsgMessage pubMsg = createMessage(pubTopic, result);
            xMsgMessage logMsg = createLogMessage(logTopic, result);
            actor.publish(pubCon, pubMsg); // publish to proxy 1
            actor.publish(logCon, logMsg); // publish to proxy 2
        }
    } catch (Exception e) {
        e.printStacktrace();
    }
});

Stopping subscriptions

To stop a subscription, the subscription handler is required:

void unsubscribe(xMsgSubscription handler)

The background thread will stop receiving messages, the subSocket will be unsubscribed to the topic, and the connection will be closed.

Stopping the subscription will not remove or interrupt the callbacks of the subscription that are still pending or running in the internal threadpool.

All active subscriptions will also be closed when the actor is destroyed.

Since the subscriptions run in background threads, there must be a main thread that is kept alive while subscriptions are active. Otherwise the actor will be closed, all subscriptions will be stopped and the program will finish.

private static volatile boolean keepRunning = true;

public static void main(String[] argv) {
    try (xMsg subscriber = new xMsg("subscriber")) {
        xMsgTopic topic = xMsgTopic.build("report", "sports");
        xMsgConnection connection = subscriber.getConnection();
        subscriber.subscribe(connection, topic, msg -> processMessage(msg));
        // keep subscription running until another threads cancels it
        while (keepRunning) {
            xMsgUtil.sleep(100);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

Synchronous Publication

xMsg supports publishing a message and receiving a response, with the following method:

xMsgMessage syncPublish(xMsgConnection connection, xMsgMessage msg, int timeout)
        throws xMsgException, TimeoutException

This publishes the message just like the publish method, but this time the metadata is modified with a unique replyTo field. Then the method will block until a response message is received or the timeout occurs, whichever happens first.

In order to receive a response, the subscription callback must support sync-publication and publish response messages to the expected topic. xMsg does not publish a response automatically.

As with normal publication, the xMsg actor can sync-publish messages on multiples threads, but each thread must obtain its own connection.

executor.submit(() -> {
    try (xMsgConnection con = actor.getConnection()) {
        xMsgMessage msg = createMessage();
        xMsgMessage res = actor.syncPublish(con, msg, 10000);
        process(res);
    } catch (xMsgException | TimeoutException e) {
        e.printStacktrace();
    }
});

Receiving responses

When a message is sync-published, its metadata will be modified to contain a unique replyTo field. This value is generated by the actor for each sync-published message, and correspond to the topic that can be used by the subscription to publish a response message.

The format of the replyTo topic is: ret:<ID>:LDDDDDD. The <ID> is the unique identifier of the actor, generated on the constructor. L is the language (1 for Java, 2 for C++, 3 for Python), and DDDDDD is a 6-digit serial number between 0 and 999999, different for each message. When 999999 is reached, it starts from 0 again. This unique replyTo value per message ensures that the response can be matched with the sync-publication call that published the request.

In order to receive the response message, the actor must have a subscription to the proxy where the response will be published. To avoid creating a new subscription every time a sync message is sent, only a single subscription per proxy will be created, with topic: ret:<ID>. This subscription will be running on background because it will be reused to receive the responses of all sync-publication requests to that proxy:

if no response socket to address:
    create socket to address
    subscribe socket to "ret:<ID>"
set reply topic to "ret:<ID>:<SEQ>"
publish msg
wait response

Since response messages are received in a different thread, a concurrent map is used to pass messages to the waiting threads that sync-published those requests, with the unique replyTo topic as the key:

ConcurrentMap<Topic, Message> responses

Waiting a response is just checking the map periodically for a message with topic equals to replyTo, until the map contains the expected message or timeout occurs.

The actor may have multiple response subscriptions, to many proxies. Unlike user-defined subscriptions (each one on its own thread), only a single background poller thread checks response messages in all subscribed sockets:

while true:
    poll all sockets
    for each socket:
        if socket contains message
            put message on responses map

This poller thread is started on the xMsg constructor, but every socket is created and subscribed the first time a message is sync-published to a proxy.