Rick Brown's Blog, August 2018

Over the past couple of years, we've solicited opinion in the DevTest community about MQTT, but there's been little customer traction as yet. I use MQTT at home (see My Current State of Home Automation for an introduction to my setup), and I'm sure I would make use of DevTest with MQTT, so for a few months I've been wondering whether to enable support.

 

I'm not a member of the Product organisation at CA, so nothing I write is ever going to form any official CA position on any subject - I'm just a hacker who happens to work in Pre-sales, and has lots of experience with DevTest (since LISA v4.x).

 

The final impetus for creating this blog entry was a question on the DevTest community, "Does DevTest support pub/sub for MQTT?", which made me wonder whether the interest might be more than just my own.

The first thing to investigate is whether there's an open source MQTT client presented as a Java library. An instant response from Google pointed me at the Eclipse Paho project, as explained and described in the link. The link also includes a simple publisher, pasted from a Paho example.

 

Subscribing is a little more tricky, as message reading uses a callback. Fortunately, talented developers around the world share their source code, and I could nab this to re-use in DevTest. I based my subscriber on solace-samples-mqtt/TopicSubscriber.java at master · SolaceSamples/solace-samples-mqtt · GitHub

 

So I have a pair of simple MQTT test steps.

My Publish step is what I pasted in the previous link, but here it is for completeness:

 

 

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

 

String topic = "MQTT Examples";
String content = "Message from MqttPublishSample";
int qos = 2;
String broker = "tcp://192.168.1.21:1883";
String clientId = "JavaSample";
MemoryPersistence persistence = new MemoryPersistence();

 

MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
_logger.info("Connecting to broker: {}", broker);
sampleClient.connect(connOpts);
_logger.info("Connected");
_logger.info("Publishing message: {}", content);
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
sampleClient.publish(topic, message);
_logger.info("Message published");
sampleClient.disconnect();
_logger.info("Disconnected");
return "Message sent";

My Subscribe step is based on that Solace code:

 


import java.sql.Timestamp;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

 

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.paho.client.mqttv3.MqttCallback;

 


String topic = "MQTT Examples";
String content = "";
int qos = 2;
String broker = "tcp://192.168.1.21:1883";
String clientId = "DevTest_subscribe2"; // this needs to be unique. If a previous run failed, you'll need a new one
long timeout = 30000; // timeout in milliseconds


MqttClient sampleClient = new MqttClient(broker, clientId);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
_logger.info("Connecting to broker: {}", broker);
sampleClient.connect(connOpts);
_logger.info("Connected");
// Latch used for synchronizing between threads
final CountDownLatch latch = new CountDownLatch(1);

 

// Topic filter the client will subscribe to
final String subTopic = topic;

 

// Callback - Anonymous inner-class for receiving messages
sampleClient.setCallback(new MqttCallback() {

 

  public void messageArrived(String topic, MqttMessage message) throws Exception {
    // Called when a message arrives from the server that
    // matches any subscription made by the client
    String time = new Timestamp(System.currentTimeMillis()).toString();
    content = new String(message.getPayload());
    _logger.info("\nReceived a Message!\n\tTime: {}\n\tTopic: {}\n\tMessage: {}\n\tQoS: {}", time, topic, content, message.getQos() + "\n");
    latch.countDown(); // unblock main thread
  }

 

  public void connectionLost(Throwable cause) {
    _logger.info("Connection to broker lost! {}", cause.getMessage());
    latch.countDown();
  }

 

  public void deliveryComplete(IMqttDeliveryToken token) {
  }

 

});

 

// Connection Details
var connection_properties = connOpts.getDebug();
_logger.info("MQTT connection properties: {}", connection_properties);

 

// Subscribe client to the topic filter at the configured QoS level (2 here)
_logger.info("Subscribing client to topic: {}", topic);
sampleClient.subscribe(topic, qos);
_logger.info("Subscribed");

 

// Wait for the message to be received
try {
  // Block here until a message is received (the latch flips) or the timeout expires.
  // Calling await() with no arguments would listen forever.
  latch.await(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
  _logger.info("I was awoken while waiting");
}

 

// Disconnect the client
sampleClient.disconnect();
_logger.info("Exiting");

 

return content;

And so now I can send and receive simple messages over MQTT. I could expand the scripts with MqttConnectOptions to add things like SSL support, but I don't currently have a requirement to do that.

 

I scanned through the MQTT specifications for some of the more advanced messaging concepts, but I didn't see information about things like correlation IDs. I also didn't notice anything regarding message structure, and the only thing I found about topic naming was to avoid Unicode U+0000 (null). Enforced metadata is also lacking.
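The null-character rule is easy to check before publishing. What follows is a minimal sketch, not part of Paho or DevTest: `TopicNameCheck` and `isValidPublishTopic` are hypothetical names, and as well as U+0000 it checks a few further constraints that, as far as I can tell, MQTT 3.1.1 places on topic names used for publishing (at least one character, no `#` or `+` wildcards, no more than 65535 bytes when UTF-8 encoded). Treat those extra rules as assumptions to verify against the specification.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of Paho or DevTest.
final class TopicNameCheck {
    private TopicNameCheck() {}

    // Returns true if the topic looks usable as a publish topic name.
    static boolean isValidPublishTopic(String topic) {
        // Must contain at least one character
        if (topic == null || topic.isEmpty()) {
            return false;
        }
        // Must not contain the null character (U+0000)
        if (topic.indexOf((char) 0) >= 0) {
            return false;
        }
        // Wildcards belong in subscription filters, not publish topics
        if (topic.indexOf('#') >= 0 || topic.indexOf('+') >= 0) {
            return false;
        }
        // Assumed size limit: 65535 bytes once UTF-8 encoded
        return topic.getBytes(StandardCharsets.UTF_8).length <= 65535;
    }
}
```

In a publish step, a call such as `TopicNameCheck.isValidPublishTopic(topic)` could run before `sampleClient.publish(topic, message)`, so a bad topic fails the step early rather than at the broker.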

 

So, this will do for now, as a simple driver for MQTT. Any implementation I do with this will include the generation and encoding of data in different steps, along with abstracting the MQTT configuration options. These steps (with minimal changes, such as the subscriber not timing out) can also be the foundation for a half-bridge virtual service, much like the Tibrv one I blogged about.
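As a rough idea of what abstracting the MQTT configuration options might look like, here is a minimal sketch. The class name `MqttStepConfig` and its fields are hypothetical, chosen only to mirror the variables in the two scripts above. It also generates a fresh client ID on each construction, since the subscriber needs a unique one; the 23-character cap is an assumption based on the length that MQTT 3.1.1 brokers are required to accept, and many brokers allow longer IDs.

```java
import java.util.UUID;

// Hypothetical holder for the settings both steps share; not a DevTest or Paho class.
final class MqttStepConfig {
    static final int MAX_CLIENT_ID_LENGTH = 23; // assumed safe length for MQTT 3.1.1 brokers

    final String broker;
    final String topic;
    final int qos;
    final String clientId;
    final long timeoutMillis;

    MqttStepConfig(String broker, String topic, int qos, String clientIdPrefix, long timeoutMillis) {
        if (qos < 0 || qos > 2) {
            throw new IllegalArgumentException("QoS must be 0, 1 or 2");
        }
        this.broker = broker;
        this.topic = topic;
        this.qos = qos;
        this.clientId = uniqueClientId(clientIdPrefix);
        this.timeoutMillis = timeoutMillis;
    }

    // Appends random hex characters to the prefix, then trims to the assumed maximum length.
    static String uniqueClientId(String prefix) {
        if (prefix.length() > 11) {
            throw new IllegalArgumentException("Prefix too long to leave room for a random suffix");
        }
        String random = UUID.randomUUID().toString().replace("-", "");
        String id = prefix + random;
        return id.substring(0, Math.min(id.length(), MAX_CLIENT_ID_LENGTH));
    }
}
```

A step could then start with something like `new MqttStepConfig("tcp://192.168.1.21:1883", "MQTT Examples", 2, "DevTest", 30000)` and pass `clientId` to the `MqttClient` constructor, so a failed run never leaves a stale ID behind.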

I was going to write about some MQTT work with DevTest here, but I noticed that I haven't posted an update about my home automation setup for a couple of years, since https://communities.ca.com/blogs/rickTesting/2016/07/15/iot-testing-next-small-steps so I should do that first, as a precursor to why I'm interested in MQTT.

 

Since then, we’ve had some home renovation work done, and I took the opportunity to add more smart devices.

I integrated Hue strip lights and downlighters on my top floor, so I’ve added a Hue gateway (it would be great if a Zigbee controller hub would control all my Zigbee devices, but Hue and Hive don’t interoperate).

I added a Broadlink RM2 Pro Plus (and a RM Mini) for replication of IR and RF remote control functionality.

I bought some simple RF sockets, so I can start to have control of electrical items. I use one for a standard lamp, and others for Christmas lights.

I installed Node-RED to complement my OpenHAB installation. I migrated much of my IoT functionality to Node-RED, as its user experience is more positive to me. I know that OpenHAB is written around IBM’s Eclipse SmartHome, and Node-RED was written by IBM in Hursley (if it had been possible to have all this stuff when I worked for IBM, I might not have left there!), and it’s obvious that they have origins in different decades. As an occasional hacker, the new model-based approach to home automation in Node-RED matches the CA Service Virtualization approach to model-based virtual services and API tests, and the CA Agile Requirements Designer approach to model-based testing, so it all synchronises nicely.

I upgraded my home wifi by installing BT Whole Home Wifi, so I get better wifi on the upper floors in my house. I upgraded from SKY+ to SKY Q. So I now have two wifi mesh networks; one for networking and one dedicated to TV.

I bought my wife a gaming PC, so I added some AV1300 Powerline boxes to give her fast, reliable wired networking.

I implemented the TICK stack for monitoring my home network. This uses Telegraf to capture server stats, InfluxDB to hold time series data, and Chronograf for metrics visualisation.

After implementing all the above, I realised that I needed a broker to store simple numeric data, so I wouldn’t need to hard-code any point-to-point integrations. I know and understand MQ, JMS and AMQP, but none of these is designed for the level of chaos that I would undoubtedly reach with all my disparate systems, so I did a quick test installation of the Mosquitto MQTT broker, and when I realised how easy it would be to read-and-write metric data in topics there, I decided that this would be my preference.

So, all my monitoring devices output to MQTT, my control devices read from MQTT, and I can visualise what’s happening.

I upgraded my broadband with BT, from “Superfast 2” (80Mbps download, 20Mbps upload) to “Ultrafast 2” (314Mbps download, 45Mbps upload). BT wanted to sell me “Ultrafast” (100Mbps download), but I work with BT, so I know the packages they have recently started providing, and I requested the fastest package possible. The BT sales person said that this was the first sale of Ultrafast 2, and when the BT Openreach engineer turned up, he said that mine was only the third Ultrafast 2 installation in the UK, so I feel like a guinea pig for this. I asked about the theoretical maximum performance that BT could support using this technology, and the engineer said that he’s been told that it can support 825Mbps, so there’s somewhere for me to upgrade to, when BT work out how to package this service. This is starting to expose performance issues elsewhere on my LAN, as the broadband link is no longer the bottleneck!

This is what my default monitoring page looks like:

Hmm, it looks (from the first graph above) as though my Broadlink isn’t reporting the current temperature in the lounge. After a quick investigation …

The IP address for the RM2 Pro Plus is different (I power-cycled it a week ago so I could add a USB extension cable to it).

I changed the IP address from .68 to .157, and that section of the metrics graph (showing the past 6 hours instead of 30 days, to help visualise the change) became:

So now I’m getting lounge temperature reporting again.

The other thing to note from the main metric graph is that I’m seeing about 270Mbps download and 50Mbps upload. This is being measured by Node-RED on my monitoring server, which uses hard-wired networking in my server farm. I check connection to OpenDNS every few seconds. If it’s successful, I do a speed test every 5 minutes. I implemented that functionality by using this Node-RED flow:

I expect to see a lower “real-world” performance than the link speed as measured on the BT SmartHubX, so I’m happy that I’m getting more than 300/50 from the hub to the exchange.
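The gating logic described above (check connectivity every few seconds, and only run a speed test if the check succeeded and five minutes have passed) can be sketched in Java, the language of the DevTest scripts. This is only an illustration of the decision, not a translation of the actual Node-RED flow: `SpeedTestGate` is a hypothetical name, and the connectivity check and the speed test themselves are left to the caller.

```java
// Hypothetical helper showing only the decision logic; the real implementation is a Node-RED flow.
final class SpeedTestGate {
    private final long intervalMillis;
    private long lastRunMillis;
    private boolean hasRun = false;

    SpeedTestGate(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    // Call after every connectivity check; returns true when a speed test is due.
    boolean shouldRunSpeedTest(boolean dnsReachable, long nowMillis) {
        // No point measuring throughput while the connectivity check is failing
        if (!dnsReachable) {
            return false;
        }
        // Too soon since the previous speed test
        if (hasRun && nowMillis - lastRunMillis < intervalMillis) {
            return false;
        }
        hasRun = true;
        lastRunMillis = nowMillis;
        return true;
    }
}
```

With `new SpeedTestGate(5 * 60 * 1000)`, the first successful check triggers a speed test, and later checks return false until five minutes have elapsed.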

What each of my flows does

I don't intend to upload JSON depictions of my flows, as everyone's flows will be different.

Hive flow

This gets the current temperature (as measured by the Hive controller, which is located in the hallway) and the target temperature using Hive APIs, and writes those numbers to MQTT.

Nest flow

This flow checks the status of all the smoke alarms.

Hue flow

I have a Hue motion sensor, and this device writes the current temperature at its location (top floor) to MQTT.

I can turn the lights on or off on-demand.

Scan LAN flow

This scans the network on deploy, and scans the Broadlink devices on demand.

Public Services flow

As well as getting broadband performance, this flow also gets the weather forecast, storing the current temperature to a topic that Chronograf will read and add to visualisation.

Broadlink flow

This flow enables me to learn RF codes as well as checking status and sending temperatures.

It also determines whether to switch any of the lights on, through the RF sockets.

Lights Schedule flow

This determines when to turn the RF lights on or off. It will also report what it does using text-to-speech. I’m a fan of cheesy ’70s sci-fi TV programmes, and there was one called Blake’s Seven, which had three talking computers, all of which would be classed as AI these days. One of the computers was called Slave, and responded obsequiously, and this is how the flow responds to a change of state in the “Standard Lamp” node:

Dashboard Console flow

Node-RED can have a dashboard, so a web page can be built to monitor and control nodes and flows. This flow determines what will be shown there.

The web page looks like this:

 

Persistent Storage flow

This flow integrates MQTT with InfluxDB.
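To give a feel for what such an integration involves, here is a minimal Java sketch that turns one MQTT message into a line of InfluxDB line protocol. It assumes each topic carries a plain numeric payload. The measurement name `mqtt`, the `topic` tag and the `value` field are illustrative assumptions, not the schema used by this flow, and `MqttToInflux` is a hypothetical class.

```java
import java.math.BigDecimal;

// Hypothetical converter for illustration; the real integration is a Node-RED flow.
final class MqttToInflux {
    private MqttToInflux() {}

    // Escapes the characters that are special inside a line-protocol tag value.
    static String escapeTag(String value) {
        return value.replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ ");
    }

    // Builds "mqtt,topic=<topic> value=<number> <timestamp>" from one message.
    static String toLine(String topic, String payload, long timestampNanos) {
        double value = Double.parseDouble(payload.trim());
        if (Double.isNaN(value) || Double.isInfinite(value)) {
            throw new IllegalArgumentException("Payload is not a finite number: " + payload);
        }
        // toPlainString avoids exponent notation in the written value
        String number = BigDecimal.valueOf(value).toPlainString();
        return "mqtt,topic=" + escapeTag(topic) + " value=" + number + " " + timestampNanos;
    }
}
```

Keeping the topic as a tag means one measurement can hold every numeric topic, and a dashboard query can filter or group by topic.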

Alexa flow

I don’t own an Amazon Echo device, but there are third-party ways to connect this device to Node-RED, so this is implemented in case I decide to purchase one.

Tasker MQTT flow

I’m not a fan of Tasker on Android, because my phone always feels sluggish after installing it. It can, however, have a direct connection to MQTT.

 

So, this is how my home is configured. You can see that most of my flows use MQTT, and you can see from the first metrics screenshot that I'm writing and reading perhaps 300 messages per second. I only have my implementation, so I don't know if 300 messages per second is small or large; I just know that it's what I'm storing so I can visualise (and debug) what I need.

Now I've introduced you to my setup, I suppose I should get back to DevTest with MQTT, as a different follow-up to IoT Testing - next small steps, so I'll do that in my next blog post.