Publishing an Experiment via MQTT#

This notebook is part of a proof-of-concept study regarding the use of the Internet of Things (IoT) as infrastructure for engineering teaching laboratories.

This notebook uses the SimPy library to create a real-time simulation of a hypothetical experiment. The experiment responds to inputs and publishes data to a remote user via an MQTT broker. In actual use, the content of this notebook would be implemented in an attached device such as an Arduino, Particle, or Raspberry Pi, with attached sensors.

The companion notebook ??? demonstrates how a remote client could interact with the experiment via MQTT.

Installations#

The following installations are required for use on Google Colab.

!pip install paho-mqtt
!pip install simpy
Requirement already satisfied: paho-mqtt in /Users/jeff/opt/anaconda3/lib/python3.7/site-packages (1.5.1)
Requirement already satisfied: simpy in /Users/jeff/opt/anaconda3/lib/python3.7/site-packages (4.0.1)

Publishing a real-time simulation via MQTT#

Topics:

topic

messages

cbe-virtual-lab/command

start and stop experiments

cbe-virtual-lab/expt-name/data

topic

Proof of Concept#

Here we experiment with encapsulating the experiment as a standalone class. This is set up so that upon receiving an appropriate message from the remote user, a new instance of the experiment is created and run.

%matplotlib inline
import json
import time

import matplotlib.pyplot as plt
import numpy as np
import paho.mqtt.client as mqtt
import paho.mqtt.publish as publish
from IPython import display

# select experiment duration

# set up client to interact with cbe-virtual-laboratory

class CBEClient(mqtt.Client):
    """MQTT client for the cbe-virtual-laboratory topics, usable as a context manager.

    Entering the context connects to the broker and starts the background
    network loop; leaving it disconnects from the broker and stops the loop.

    Args:
        recv: Topic filter to subscribe to once connected. The default empty
            string means no subscription is made.
        send: Topic that ``publish`` sends to. The default empty string means
            ``publish`` only prints a notice.
    """

    def __init__(self, recv="", send=""):
        super().__init__()
        self.host = "mqtt.eclipse.org"
        self.recv = recv
        self.send = send

    def on_connect(self, client, userdata, flags, rc):
        """Report the connection result and subscribe to ``recv`` if one was given."""
        print(f"Connected: {self.host} with return code {rc}")
        if self.recv:
            self.subscribe(self.recv, qos=2)
            print(f"Subscribed: {self.recv}")

    def on_message(self, client, userdata, msg):
        """Decode the payload as UTF-8 JSON and print it."""
        payload = json.loads(msg.payload.decode("utf-8"))
        print(f"Receieved: {payload} from {self.recv}")

    def connect(self):
        """Connect to ``self.host`` on port 1883 with a 60 second keepalive.

        Unlike the base class method, this override takes no arguments.
        """
        super().connect(host=self.host, port=1883, keepalive=60)

    def publish(self, payload):
        """Publish ``payload`` to the ``send`` topic, or print a notice if none is set.

        Unlike the base class method, this override takes only the payload.
        """
        if self.send:
            super().publish(self.send, payload=payload)
            print(f"Sent: {payload} to {self.send}")
        else:
            print("No send topic has been specified.")

    def __enter__(self):
        """Connect, start the background network loop, and return the client."""
        self.connect()
        self.loop_start()
        # brief pause after starting the loop, presumably to let the
        # connection handshake complete before the caller proceeds
        time.sleep(0.5)
        print(f"Loop Started: {self}")
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Disconnect from the broker and stop the background network loop."""
        try:
            # close the broker connection; stopping the loop alone would
            # leave the connection open
            self.disconnect()
        finally:
            # always stop the loop thread, even if disconnecting failed
            self.loop_stop()
            print(f"Loop Stopped: {self}")
        
def on_message(client, userdata, msg):
    """Run a placeholder experiment in response to a JSON command message.

    The payload is expected to be a UTF-8 JSON object with a ``client_id``
    and an integer ``duration``. One sample per second, ``duration`` samples
    in total, is published to ``cbe-virtual-laboratory/<client_id>``.
    Malformed commands are reported and ignored rather than raising inside
    the MQTT callback.

    Args:
        client: Client that received the message (unused).
        userdata: User data supplied by the MQTT client (unused).
        msg: Received message; only ``msg.payload`` is read.
    """
    try:
        payload = json.loads(msg.payload.decode("utf-8"))
        client_id = str(payload["client_id"])
        duration = int(payload["duration"])
    except (UnicodeDecodeError, ValueError, KeyError, TypeError) as err:
        # json.JSONDecodeError is a ValueError; a non-dict payload gives TypeError
        print(f"Ignored malformed command: {err!r}")
        return

    # wildcard characters are not allowed in a topic used for publishing
    if "+" in client_id or "#" in client_id:
        print(f"Ignored command with invalid client_id: {client_id!r}")
        return

    send = "/".join(["cbe-virtual-laboratory", client_id])
    print(send)
    print(f"Message: {payload}")
    # NOTE: the experiment runs inside this callback and sleeps between
    # samples, so the callback does not return until the experiment ends.
    with CBEClient(send=send) as expt:
        for k in range(duration):
            # placeholder measurements
            x = 0
            y = 0
            expt.publish(json.dumps({"time": k, "x" : x, "y" : y}))
            time.sleep(1)

# Listen for commands on the command channel for 30 seconds.
# Entering the context connects and starts the background loop; the
# module-level on_message handler is installed only after that, so a message
# arriving during startup is presumably handled by CBEClient.on_message instead.
with CBEClient(recv = "cbe-virtual-laboratory/command/#") as cbe:
    cbe.on_message = on_message
    # keep the context (and its background loop) open for 30 seconds
    time.sleep(30)
Connected: mqtt.eclipse.org with return code 0
Subscribed: cbe-virtual-laboratory/command/#
Loop Started: <__main__.CBEClient object at 0x7fe572670e10>
cbe-virtual-laboratory/12
Message: {'client_id': '12', 'duration': 10}
Connected: mqtt.eclipse.org with return code 0
Loop Started: <__main__.CBEClient object at 0x7fe5733a2910>
Sent: {"time": 0, "x": 0, "y": 0} to cbe-virtual-laboratory/12
Sent: {"time": 1, "x": 0, "y": 0} to cbe-virtual-laboratory/12
Sent: {"time": 2, "x": 0, "y": 0} to cbe-virtual-laboratory/12
Sent: {"time": 3, "x": 0, "y": 0} to cbe-virtual-laboratory/12
Sent: {"time": 4, "x": 0, "y": 0} to cbe-virtual-laboratory/12
Sent: {"time": 5, "x": 0, "y": 0} to cbe-virtual-laboratory/12
Sent: {"time": 6, "x": 0, "y": 0} to cbe-virtual-laboratory/12
Sent: {"time": 7, "x": 0, "y": 0} to cbe-virtual-laboratory/12
Sent: {"time": 8, "x": 0, "y": 0} to cbe-virtual-laboratory/12
Sent: {"time": 9, "x": 0, "y": 0} to cbe-virtual-laboratory/12
Loop Stopped: <__main__.CBEClient object at 0x7fe5733a2910>
cbe-virtual-laboratory/2485378613250
Message: {'client_id': '2485378613250', 'duration': 10}
Connected: mqtt.eclipse.org with return code 0
Loop Started: <__main__.CBEClient object at 0x7fe5733a2bd0>
Sent: {"time": 0, "x": 0, "y": 0} to cbe-virtual-laboratory/2485378613250
Sent: {"time": 1, "x": 0, "y": 0} to cbe-virtual-laboratory/2485378613250
Sent: {"time": 2, "x": 0, "y": 0} to cbe-virtual-laboratory/2485378613250
Sent: {"time": 3, "x": 0, "y": 0} to cbe-virtual-laboratory/2485378613250
Sent: {"time": 4, "x": 0, "y": 0} to cbe-virtual-laboratory/2485378613250
Sent: {"time": 5, "x": 0, "y": 0} to cbe-virtual-laboratory/2485378613250
Sent: {"time": 6, "x": 0, "y": 0} to cbe-virtual-laboratory/2485378613250
Sent: {"time": 7, "x": 0, "y": 0} to cbe-virtual-laboratory/2485378613250
Sent: {"time": 8, "x": 0, "y": 0} to cbe-virtual-laboratory/2485378613250
Sent: {"time": 9, "x": 0, "y": 0} to cbe-virtual-laboratory/2485378613250
Loop Stopped: <__main__.CBEClient object at 0x7fe5733a2bd0>
Loop Stopped: <__main__.CBEClient object at 0x7fe572670e10>

Version 1#

T

import time
import simpy
import paho.mqtt.publish as publish
import paho.mqtt.client as mqtt
import json

class Experiment():
    """Real-time simulated experiment that publishes its readings over MQTT.

    A SimPy real-time environment drives a process that publishes one
    ``"t,y"`` message roughly every second, where ``y`` starts at 2.0 and
    decays by 10% per step.

    Args:
        topic: MQTT topic the readings are published to.
        duration: Simulation time at which ``run`` stops the environment.
    """

    def __init__(self, topic, duration):
        self.topic = topic
        self.duration = duration
        # factor=1: one unit of simulation time per second of wall-clock time
        self.env = simpy.rt.RealtimeEnvironment(factor=1)
        self.proc = self.env.process(self.process())
        self.client = mqtt.Client()
        self.client.on_connect = self.on_connect
        self.client.on_publish = self.on_publish

    def on_connect(self, client, userdata, flags, rc):
        """Report the connection result and subscribe to the experiment topic."""
        print(f"Connected with result code {rc}")
        self.client.subscribe(self.topic)

    def on_publish(self, client, userdata, result):
        """Report a completed publish.

        NOTE(review): paho-mqtt passes the message id as the third argument,
        so the value printed as "result code" is presumably a message id.
        """
        print(f"{client} published with result code {result}")

    def process(self):
        """SimPy process: publish ``t,y`` and wait about one second, forever."""
        t_start = time.perf_counter()
        t = 0
        y = 2.0
        while True:
            msg = f"{round(t,2)},{y:5.2f}"
            self.client.publish(self.topic, msg)
            # wait one second, corrected by how far the elapsed wall-clock
            # time has drifted from the nearest whole second
            yield self.env.timeout(1 - (t - round(t, 0)))
            t = time.perf_counter() - t_start
            y -= 0.1*y

    def run(self, client):
        """Connect to the broker, run the simulation, then disconnect.

        Args:
            client: Identifies who started the experiment; only printed.
        """
        print(f"Experiment started by {client}")
        self.client.connect("mqtt.eclipse.org", 1883, 60)
        # NOTE(review): no network loop is started for self.client, so
        # on_connect presumably never fires - confirm whether the
        # subscription made there is needed.
        try:
            # Re-anchor the real-time clock: the environment was created
            # before the blocking connect, and in strict mode a late start
            # raises "Simulation too slow for real time".
            self.env.sync()
            self.env.run(until=self.duration)
        finally:
            # close the broker connection even if the simulation fails
            self.client.disconnect()
        print("End experiment.")

# set up client to wait for messages on 
#     cbe-virtual-laboratory/command/#
# expect a message with a specified experiment duration

def on_connect(client, userdata, flags, rc):
    """Report the connection result, then subscribe to the command channel.

    Args:
        client: Connected MQTT client; used to make the subscription.
        userdata: User data supplied by the MQTT client (unused).
        flags: Connection flags supplied by the MQTT client (unused).
        rc: Connection result code; only printed.
    """
    print(f"Connected with result code {rc}")
    command_filter = "cbe-virtual-laboratory/command/#"
    client.subscribe(command_filter)

def on_message(client, userdata, msg):
    """Start an experiment in response to a JSON command message.

    The payload is expected to be a UTF-8 JSON object whose ``duration`` is
    a positive number. Malformed commands are reported and ignored rather
    than raising inside the MQTT callback.

    Args:
        client: Client that received the message; passed on to
            ``Experiment.run``.
        userdata: User data supplied by the MQTT client (unused).
        msg: Received message; ``msg.payload`` and ``msg.topic`` are read.
    """
    print(f"Received {msg.payload} from {msg.topic}")
    try:
        data = json.loads(msg.payload.decode("utf-8"))
        duration = data['duration']
    except (UnicodeDecodeError, ValueError, KeyError, TypeError) as err:
        # json.JSONDecodeError is a ValueError; a non-dict payload gives TypeError
        print(f"Ignored malformed command: {err!r}")
        return

    # The simulation end time must lie after the start, so reject anything
    # that is not a positive number (bool is excluded explicitly).
    is_number = isinstance(duration, (int, float)) and not isinstance(duration, bool)
    if not is_number or not duration > 0:
        print(f"Ignored command with invalid duration: {duration!r}")
        return

    expt = Experiment("cbe-virtual-laboratory/expt", duration)
    expt.run(client)

# setup client
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

# connect client to broker
client.connect("mqtt.eclipse.org", 1883, 60)

# start a non-blocking thread to wait for messages
client.loop_start()

try:
    # prove the loop is non-blocking
    for k in range(20):
        print(k)
        time.sleep(1)
finally:
    # don't leave a zombie thread or an open connection behind,
    # even if the loop above is interrupted
    client.disconnect()
    client.loop_stop()
0
Connected with result code 0
1
2
3
Received b'{"duration": 10}' from cbe-virtual-laboratory/command
Experiment started by <paho.mqtt.client.Client object at 0x7fe57268dd10>
<paho.mqtt.client.Client object at 0x7fe5726a36d0> published with result code 1
4
<paho.mqtt.client.Client object at 0x7fe5726a36d0> published with result code 2
5
<paho.mqtt.client.Client object at 0x7fe5726a36d0> published with result code 3
6
<paho.mqtt.client.Client object at 0x7fe5726a36d0> published with result code 4
7
<paho.mqtt.client.Client object at 0x7fe5726a36d0> published with result code 5
8
<paho.mqtt.client.Client object at 0x7fe5726a36d0> published with result code 6
9
<paho.mqtt.client.Client object at 0x7fe5726a36d0> published with result code 7
10
<paho.mqtt.client.Client object at 0x7fe5726a36d0> published with result code 8
11
<paho.mqtt.client.Client object at 0x7fe5726a36d0> published with result code 9
12
<paho.mqtt.client.Client object at 0x7fe5726a36d0> published with result code 10
13
End experiment.
14
15
16
17
18
19