Rick.Brown

MQTT test steps in DevTest

Blog Post created by Rick.Brown Employee on Aug 4, 2018

Over the past couple of years, we've solicited opinion in the DevTest community about MQTT, but there's been little customer traction as yet. I use MQTT at home (see My Current State of Home Automation  for an introduction to my setup), and I'm sure I would make use of DevTest with MQTT, so I've been wondering whether to enable support for a few months.

 

I'm not a member of the Product organisation at CA, so nothing I write is ever going to form any official CA position on any subject - I'm just a hacker who happens to work in Pre-sales, and has lots of experience with DevTest (since LISA v4.x).

 

The final impetus to creating this blog entry was a question on the DevTest community Does DevTest support pub/sub for MQTT?  and so I wondered whether my interest would be more than personal.

The first thing to investigate is whether there's an open source MQTT client presented as a Java library. An instant response from Google pointed me at the Apache Paho project, as explained and described in the link, The link also includes a simp publisher, pasted from a Paho example.

 

Subscribing is a little more tricky. as message reading uses a callback. Fortunately, talented developers around the world share their source code, and I could nab this to re-use in DevTest. I based my subscriber on solace-samples-mqtt/TopicSubscriber.java at master · SolaceSamples/solace-samples-mqtt · GitHub 

 

So I have a pair of simple MQTT test steps

My Publish step is what I pasted in the previous link, but here is it for completeness:

 

 

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

 

String topic = "MQTT Examples";
String content = "Message from MqttPublishSample";
int qos = 2;
String broker = "tcp://192.168.1.21:1883";
String clientId = "JavaSample";
MemoryPersistence persistence = new MemoryPersistence();

 

MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
_logger.info("Connecting to broker: {}", broker);
sampleClient.connect(connOpts);
_logger.info("Connected");
_logger.info("Publishing message: {}", content);
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
sampleClient.publish(topic, message);
_logger.info("Message published");
sampleClient.disconnect();
_logger.info("Disconnected");
return "Message sent";

My Subscribe step is based on that Solace code:

 


import java.sql.Timestamp;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

 

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.paho.client.mqttv3.MqttCallback;

 


String topic = "MQTT Examples";
String content = "";
int qos = 2;
String broker = "tcp://192.168.1.21:1883";
String clientId = "DevTest_subscribe2"; // this needs to be unique. If previously failed, you 'll need a new one
long timeout = 30000; // timeout in milliseconds


MqttClient sampleClient = new MqttClient(broker, clientId);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
_logger.info("Connecting to broker: {}", broker);
sampleClient.connect(connOpts);
_logger.info("Connected");
//_logger.info("Publishing message: "+content);
//MqttMessage message = new MqttMessage(content.getBytes());
// Latch used for synchronizing b/w threads
final CountDownLatch latch = new CountDownLatch(1);

 

// Topic filter the client will subscribe to
final String subTopic = topic;

 

// Callback - Anonymous inner-class for receiving messages
sampleClient.setCallback(new MqttCallback() {

 

  public void messageArrived(String topic, MqttMessage message) throws Exception {
    // Called when a message arrives from the server that
    // matches any subscription made by the client
    String time = new Timestamp(System.currentTimeMillis()).toString();
    content = new String(message.getPayload());
    _logger.info("\nReceived a Message!\n\tTime: {}\n\tTopic: {}\n\tMessage: {}\n\tQoS: {}", time, topic, content, message.getQos() + "\n");
    latch.countDown(); // unblock main thread
  }

 

  public void connectionLost(Throwable cause) {
    _logger.info("Connection to broker lost! {}", cause.getMessage());
    latch.countDown();
  }

 

  public void deliveryComplete(IMqttDeliveryToken token) {
  }

 

});

 

// Connection Details
var connection_properties = connOpts.getDebug();
_logger.info("MQTT connection properties: {}", connection_properties);

 

// Subscribe client to the topic filter and a QoS level of 0
_logger.info("Subscribing client to topic: {}", topic);
sampleClient.subscribe(topic, qos);
_logger.info("Subscribed");

 

// Wait for the message to be received
try {
  latch.await(timeout, TimeUnit.MILLISECONDS); // block here until message received, and latch will flip
                                                                               // If there are no arguments, this will listen forever
} catch (InterruptedException e) {
  _logger.info("I was awoken while waiting");
}

 

// Disconnect the client
sampleClient.disconnect();
_logger.info("Exiting");

 

return content;

And so now I can send and receive simple messages over MQTT. I could expand the scripts with MqttConnectOptions to add things like SSL support, but I don't currently have a requirement to do that.

 

I scanned through the MQTT specifications for some of the more advanced messaging concepts, but I didn't see information about things like correlation IDs. I also didn't notice anything regarding message structure, and the only thing I found about topic naming was to avoid Unicode U+0000 (null). Enforced metadata is also lacking.

 

So, this will do for now, as a simple driver for MQTT. Any implementation I do with this will include the generation and encoding of data in different steps, along with abstracting the MQTT configuration options. These steps (with minimal changes, such as the subscriber not timing out) can also be the foundation for a half-bridge virtual service, much like the Tibrv one I blogged about.

Outcomes