4. Hardware-Software Communication

In this tutorial, you will learn how to send commands to and receive sensor data from a microcontroller using the MQTT protocol.

../../_images/mqtt-workflow.png

Workflow diagram of using a MQTT broker with the “publish/subscribe” model to pass temperature data from one client to another.

Onboard LED using MQTT

In an earlier module (blink and read), you learned how to blink the onboard LED every two seconds. In this section, you will control the onboard LED using MQTT, a standard protocol for internet-of-things communication. By definition, hardware/software communication implies that you are interfacing multiple devices (in this case, your microcontroller and a Jupyter notebook running on e.g., Google Colab). Let’s start with a hands-on example.

✅ Create a new file on your Pico W microcontroller called mqtt_led.py and copy the script below (based on JayPalm’s gist and mqtt_as README). Ensure that your microcontroller has been set up according to the instructions in the “Running the demo” module earlier in the course (only netman.py [permalink] and your filled-out secrets.py are required). You will also need to upload the mqtt_as.py module to either the main directory or the lib folder. Make sure to replace <your_id_here> with your course ID (without the brackets). If you’ve forgotten your GitHub Classroom course ID, you can refer back to your quiz responses from the GitHub starter tutorial assignment. Once you’ve updated the file, save the file and click play.

from mqtt_as import MQTTClient, config
from machine import Pin, ADC
import asyncio
from netman import connectWiFi
import ussl
import ntptime
from time import time

from secrets import (
    HIVEMQ_HOST,
    HIVEMQ_PASSWORD,
    HIVEMQ_USERNAME,
    PASSWORD,
    SSID,
)

connectWiFi(SSID, PASSWORD, country="US")

# usually would be a device-specific ID, but using course ID for now
COURSE_ID = "<your_id_here>"  # UPDATE THIS TO YOUR ID

# To validate certificates, a valid time is required
ntptime.timeout = 30  # type: ignore
ntptime.host = "pool.ntp.org"
ntptime.settime()

print("Obtaining CA Certificate")
# generated via https://colab.research.google.com/github/sparks-baird/self-driving-lab-demo/blob/main/notebooks/7.2.1-hivemq-openssl-certificate.ipynb # noqa: E501
with open("hivemq-com-chain.der", "rb") as f:
    cacert = f.read()
f.close()

# Local configuration
config.update(
    {
        "ssid": SSID,
        "wifi_pw": PASSWORD,
        "server": HIVEMQ_HOST,
        "user": HIVEMQ_USERNAME,
        "password": HIVEMQ_PASSWORD,
        "ssl": True,
        "ssl_params": {
            "server_side": False,
            "key": None,
            "cert": None,
            "cert_reqs": ussl.CERT_REQUIRED,
            "cadata": cacert,
            "server_hostname": HIVEMQ_HOST,
        },
        "keepalive": 30,
    }
)

onboard_led = Pin("LED", Pin.OUT)  # Pico W is slightly different than Pico

command_topic = f"{COURSE_ID}/onboard_led"
sensor_data_topic = f"{COURSE_ID}/onboard_temp"

adcpin = 4
sensor = ADC(adcpin)


def ReadTemperature():
    adc_value = sensor.read_u16()
    volt = (3.3 / 65535) * adc_value
    temperature = 27 - (volt - 0.706) / 0.001721
    # internal temp sensor has low precision, so round to 1 decimal place
    return round(temperature, 1)


async def messages(client):  # Respond to incoming messages
    async for topic, msg, retained in client.queue:
        try:
            topic = topic.decode()
            msg = msg.decode()
            retained = str(retained)
            print((topic, msg, retained))

            if topic == command_topic:
                if msg == "on":
                    onboard_led.on()
                elif msg == "off":
                    onboard_led.off()
                elif msg == "toggle":
                    onboard_led.toggle()
                temperature = ReadTemperature()
                print(f"Publish {temperature} to {sensor_data_topic}")
                # If WiFi is down the following will pause for the duration.
                await client.publish(sensor_data_topic, f"{temperature}", qos=1)
        except Exception as e:
            print(e)


async def up(client):  # Respond to connectivity being (re)established
    while True:
        await client.up.wait()  # Wait on an Event
        client.up.clear()
        await client.subscribe(command_topic, 1)  # renew subscriptions


async def main(client):
    await client.connect()
    for coroutine in (up, messages):
        asyncio.create_task(coroutine(client))

    start_time = time()
    # must have the while True loop to keep the program running
    while True:
        await asyncio.sleep(5)
        elapsed_time = round(time() - start_time)
        print(f"Elapsed: {elapsed_time}s")


config["queue_len"] = 2  # Use event interface with specified queue length
MQTTClient.DEBUG = True  # Optional: print diagnostic messages
client = MQTTClient(config)
del cacert  # to free memory
try:
    asyncio.run(main(client))
finally:
    client.close()  # Prevent LmacRxBlk:1 errors

After running the above script, you should see a message something like the following printed to the Thonny command line:

MPY: soft reboot
MAC address: <...>
connected
ip = <...>
Obtaining CA Certificate
Checking WiFi integrity.
Got reliable connection
Connecting to broker.
Connected to broker.
Elapsed: 10s
Elapsed: 20s
RAM free 119776 alloc 57504
Elapsed: 30s
...

Then, navigate to the companion Jupyter notebook and open it in a Jupyter IDE of your choice. For your convenience, an “Open in Colab” link is provided. If you are running it elsewhere, you will need to manually install the paho-mqtt and matplotlib packages (i.e., pip install paho-mqtt matplotlib). Update it with your course ID and run the notebook. You should see a plot of the temperature data being sent from your microcontroller to the Jupyter notebook. You should also see the onboard LED on your microcontroller blinking every few seconds in response to the commands sent by the Jupyter notebook.

While there are simpler implementations of MQTT on a Pico W microcontroller, the current setup which uses micropython-mqtt is secure, robust, resilient, and asynchronous. What this means is that:

  1. All messages you send are encrypted and private (assuming you created your own HiveMQ instance)

  2. Message delivery can be guaranteed

  3. Receiving multiple messages in a short time frame is handled smoothly

  4. The device automatically reconnects if WiFi gets spotty

  5. The microcontroller’s CPU resources are used efficiently via multi-tasking

For a hobbyist project, these might not be large concerns; however, when implementing an autonomous experimentation setup for research purposes, these are very important. Also keep in mind that much of this is “boilerplate code” that you can copy and paste into your own projects.

How MQTT Works

✅ Read the following three pages from this MQTT basics course:

✅ Read How to Use The Paho MQTT Python Client for Beginners from the MQTT and Python For Beginners course

✅ Review the micropython-mqtt repository README and its Asynchronous MQTT docs

✅ Read Receiving Messages with the Paho MQTT Python Client, paying special attention to the “Use the Queue object” section. Then read the Basic first-in first-out (FIFO) Queue section from this page

Additional Resources

Alternative Frameworks

MQTT is only one of many good protocols that can be used for hardware/software communication. Other popular protocols, especially in the context of laboratory information, include:

Workflow orchestration packages for laboratory automation are generally based on a standard communication protocol like MQTT and the ones mentioned above.

See also https://github.com/AccelerationConsortium/awesome-self-driving-labs#software.

Once you’ve successfully finished the example from above and completed the reading material, you are done with this tutorial 🎉. Return to the course website to do a knowledge check.