4. Hardware-Software Communication
In this tutorial, you will learn how to send commands to and receive sensor data from a microcontroller using the MQTT protocol.
Workflow diagram of using an MQTT broker with the “publish/subscribe” model to pass temperature data from one client to another.
Onboard LED using MQTT
In an earlier module (blink and read), you learned how to blink the onboard LED every two seconds. In this section, you will control the onboard LED using MQTT, a standard protocol for internet-of-things communication. By definition, hardware/software communication means interfacing multiple devices (in this case, your microcontroller and a Jupyter notebook running on, e.g., Google Colab). Let’s start with a hands-on example.
✅ Create a new file on your Pico W microcontroller called mqtt_led.py and copy the script below (based on JayPalm’s gist and the mqtt_as README). Ensure that your microcontroller has been set up according to the instructions in the “Running the demo” module earlier in the course (only netman.py [permalink] and your filled-out secrets.py are required). You will also need to upload the mqtt_as.py module to either the main directory or the lib folder. Make sure to replace <your_id_here> with your course ID (without the brackets). If you’ve forgotten your GitHub Classroom course ID, you can refer back to your quiz responses from the GitHub starter tutorial assignment. Once you’ve updated the file, save it and click play.
from mqtt_as import MQTTClient, config
from machine import Pin, ADC
import asyncio
from netman import connectWiFi
import ussl
import ntptime
from time import time
from secrets import (
    HIVEMQ_HOST,
    HIVEMQ_PASSWORD,
    HIVEMQ_USERNAME,
    PASSWORD,
    SSID,
)
connectWiFi(SSID, PASSWORD, country="US")
# usually would be a device-specific ID, but using course ID for now
COURSE_ID = "<your_id_here>" # UPDATE THIS TO YOUR ID
# To validate certificates, a valid time is required
ntptime.timeout = 30 # type: ignore
ntptime.host = "pool.ntp.org"
ntptime.settime()
print("Obtaining CA Certificate")
# generated via https://colab.research.google.com/github/sparks-baird/self-driving-lab-demo/blob/main/notebooks/7.2.1-hivemq-openssl-certificate.ipynb # noqa: E501
with open("hivemq-com-chain.der", "rb") as f:
    cacert = f.read()  # the with block closes the file automatically
# Local configuration
config.update(
    {
        "ssid": SSID,
        "wifi_pw": PASSWORD,
        "server": HIVEMQ_HOST,
        "user": HIVEMQ_USERNAME,
        "password": HIVEMQ_PASSWORD,
        "ssl": True,
        "ssl_params": {
            "server_side": False,
            "key": None,
            "cert": None,
            "cert_reqs": ussl.CERT_REQUIRED,
            "cadata": cacert,
            "server_hostname": HIVEMQ_HOST,
        },
        "keepalive": 30,
    }
)
onboard_led = Pin("LED", Pin.OUT) # Pico W is slightly different than Pico
command_topic = f"{COURSE_ID}/onboard_led"
sensor_data_topic = f"{COURSE_ID}/onboard_temp"
adcpin = 4
sensor = ADC(adcpin)
def ReadTemperature():
    adc_value = sensor.read_u16()
    volt = (3.3 / 65535) * adc_value
    temperature = 27 - (volt - 0.706) / 0.001721
    # internal temp sensor has low precision, so round to 1 decimal place
    return round(temperature, 1)
async def messages(client):  # Respond to incoming messages
    async for topic, msg, retained in client.queue:
        try:
            topic = topic.decode()
            msg = msg.decode()
            retained = str(retained)
            print((topic, msg, retained))
            if topic == command_topic:
                if msg == "on":
                    onboard_led.on()
                elif msg == "off":
                    onboard_led.off()
                elif msg == "toggle":
                    onboard_led.toggle()
                temperature = ReadTemperature()
                print(f"Publish {temperature} to {sensor_data_topic}")
                # If WiFi is down the following will pause for the duration.
                await client.publish(sensor_data_topic, f"{temperature}", qos=1)
        except Exception as e:
            print(e)
async def up(client):  # Respond to connectivity being (re)established
    while True:
        await client.up.wait()  # Wait on an Event
        client.up.clear()
        await client.subscribe(command_topic, 1)  # renew subscriptions
async def main(client):
    await client.connect()
    for coroutine in (up, messages):
        asyncio.create_task(coroutine(client))
    start_time = time()
    # must have the while True loop to keep the program running
    while True:
        await asyncio.sleep(5)
        elapsed_time = round(time() - start_time)
        print(f"Elapsed: {elapsed_time}s")
config["queue_len"] = 2 # Use event interface with specified queue length
MQTTClient.DEBUG = True # Optional: print diagnostic messages
client = MQTTClient(config)
del cacert # to free memory
try:
    asyncio.run(main(client))
finally:
    client.close()  # Prevent LmacRxBlk:1 errors
After running the above script, you should see a message something like the following printed to the Thonny command line:
MPY: soft reboot
MAC address: <...>
connected
ip = <...>
Obtaining CA Certificate
Checking WiFi integrity.
Got reliable connection
Connecting to broker.
Connected to broker.
Elapsed: 10s
Elapsed: 20s
RAM free 119776 alloc 57504
Elapsed: 30s
...
Then, navigate to the companion Jupyter notebook and open it in a Jupyter IDE of your choice. For your convenience, an “Open in Colab” link is provided. If you are running it elsewhere, you will need to manually install the paho-mqtt and matplotlib packages (i.e., pip install paho-mqtt matplotlib). Update it with your course ID and run the notebook. You should see a plot of the temperature data being sent from your microcontroller to the Jupyter notebook. You should also see the onboard LED on your microcontroller blinking every few seconds in response to the commands sent by the Jupyter notebook.
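To make the notebook side of this exchange more concrete, here is a minimal sketch of the two pieces of logic it needs: publishing an LED command and decoding a temperature reading. This is not the companion notebook's code. The helper names send_command and parse_temperature are hypothetical, and client is assumed to be an already-connected paho-mqtt client (or any object with a compatible publish method). Only the topic names, the command strings, and the payload format are taken from mqtt_led.py.

```python
# Hypothetical helper names; only the topics, the command strings, and the
# payload format come from mqtt_led.py.
COURSE_ID = "<your_id_here>"  # replace with your course ID, as in mqtt_led.py
command_topic = f"{COURSE_ID}/onboard_led"
sensor_data_topic = f"{COURSE_ID}/onboard_temp"

VALID_COMMANDS = ("on", "off", "toggle")


def send_command(client, command):
    """Publish an LED command with QoS 1.

    `client` is assumed to be an already-connected paho-mqtt Client (or any
    object with a compatible publish(topic, payload, qos=...) method).
    """
    if command not in VALID_COMMANDS:
        raise ValueError(f"expected one of {VALID_COMMANDS}, got {command!r}")
    return client.publish(command_topic, command, qos=1)


def parse_temperature(payload):
    """Convert the payload bytes published by the Pico W (e.g. b"23.5") to a float."""
    return float(payload.decode())
```

Rejecting unknown commands before publishing is a design choice of this sketch: the script on the microcontroller ignores anything other than on, off, and toggle for the LED, so a typo would otherwise go unnoticed.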
While there are simpler implementations of MQTT on a Pico W microcontroller, the current setup, which uses micropython-mqtt, is secure, robust, resilient, and asynchronous. This means that:
All messages you send are encrypted and private (assuming you created your own HiveMQ instance)
Message delivery can be guaranteed (the script publishes and subscribes with QoS 1, i.e., at-least-once delivery)
Receiving multiple messages in a short time frame is handled smoothly
The device automatically reconnects if WiFi gets spotty
The microcontroller’s CPU resources are used efficiently via multi-tasking
For a hobbyist project, these might not be large concerns; however, when implementing an autonomous experimentation setup for research purposes, these are very important. Also keep in mind that much of this is “boilerplate code” that you can copy and paste into your own projects.
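The multi-tasking point can be illustrated on a regular computer with standard Python asyncio. The sketch below is not part of mqtt_led.py. The function names, the None sentinel, and the timing values are assumptions made for illustration. It mirrors the structure of the script: one task waits for incoming messages while another keeps running on a timer, and neither blocks the other.

```python
import asyncio


async def handle_messages(queue, log):
    # Plays the role of messages() in mqtt_led.py: waiting for the next
    # message suspends this task without blocking the others.
    while True:
        msg = await queue.get()
        if msg is None:  # sentinel used only in this sketch to stop the task
            break
        log.append(f"handled {msg}")


async def heartbeat(log, ticks, period):
    # Plays the role of the "Elapsed: ..." loop in main().
    for i in range(ticks):
        await asyncio.sleep(period)
        log.append(f"tick {i}")


async def demo():
    log = []
    queue = asyncio.Queue()
    handler = asyncio.create_task(handle_messages(queue, log))
    beat = asyncio.create_task(heartbeat(log, ticks=2, period=0.01))
    for msg in ("on", "off"):
        await queue.put(msg)
    await beat             # both tasks make progress while we wait here
    await queue.put(None)  # ask the handler to finish
    await handler
    return log


log = asyncio.run(demo())
print(log)
```

Both tasks share a single thread, which is why this style suits a microcontroller: a task only gives up control at an await, so no locks are needed around shared state such as the log list.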
How MQTT Works
✅ Read the following three pages from this MQTT basics course:
✅ Read How to Use The Paho MQTT Python Client for Beginners from the MQTT and Python For Beginners course
✅ Review the micropython-mqtt repository README and its Asynchronous MQTT docs
✅ Read Receiving Messages with the Paho MQTT Python Client, paying special attention to the “Use the Queue object” section. Then read the Basic first-in first-out (FIFO) Queue section from this page
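As a companion to the queue readings above, here is a minimal sketch of the pattern: the on_message callback hands each payload to a FIFO queue and returns immediately, and the main code drains the queue when convenient. The callback signature follows paho-mqtt's on_message(client, userdata, message) convention. The incoming messages are simulated with SimpleNamespace objects so the example runs without a broker, and the topic name, readings, and drain helper are made up for illustration.

```python
from queue import Empty, Queue
from types import SimpleNamespace

sensor_data_queue = Queue()  # first-in, first-out


def on_message(client, userdata, msg):
    """paho-mqtt style callback: store the message and return quickly."""
    sensor_data_queue.put((msg.topic, msg.payload.decode()))


def drain(q):
    """Return everything currently in the queue, oldest first."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except Empty:
            return items


# Simulate two incoming messages; the topic and readings are made up.
for reading in (b"23.1", b"23.4"):
    fake_msg = SimpleNamespace(topic="demo/onboard_temp", payload=reading)
    on_message(None, None, fake_msg)

readings = drain(sensor_data_queue)
print(readings)
```

With a real client, the callback would be registered via client.on_message = on_message. Keeping the callback this small means slow work such as plotting happens outside the network loop.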
Additional Resources
How to Send and Receive Data Using Raspberry Pi Pico W and MQTT
To better understand the async and await expressions in the code above, refer to Make Your Microcontroller Multi-task With Asynchronous Programming
A simpler, less robust alternative to mqtt_as: micropython-umqtt.simple2 repository
HiveMQ’s websocket client for MQTT testing (note that one is also available on your HiveMQ instance, which can be used for your instance without as much setup)
HiveMQ has a free public broker and browser client that can be useful for debugging and troubleshooting
Another popular MQTT framework is Mosquitto, which also has a public (non-secure) broker
Alternative Frameworks
MQTT is only one of many good protocols that can be used for hardware/software communication. Other popular protocols, especially in the context of laboratory information, include:
Workflow orchestration packages for laboratory automation are generally based on a standard communication protocol like MQTT and the ones mentioned above.
See also https://github.com/AccelerationConsortium/awesome-self-driving-labs#software.
Once you’ve successfully finished the example from above and completed the reading material, you are done with this tutorial 🎉. Return to the course website to do a knowledge check.