Post by Jason on Aug 8, 2022 8:13:51 GMT -8
Good afternoon all,
After having the desire to do so for a LONG time, I finally sat my lazy self down and wrote an MQTT class wrapper in Python3 for paho.mqtt. Included below is my current source code and a test script.
Class file mqtt.py
Test script testScript.py
This is an alpha version at best that appears to work with subscribing to topics and capturing the messages. I've not yet written the publish functionality.
Thanks,
Jason
After having the desire to do so for a LONG time, I finally sat my lazy self down and wrote an MQTT class wrapper in Python3 for paho.mqtt. Included below is my current source code and a test script.
Class file mqtt.py
from paho.mqtt.client import Client as mqttClient
from queue import Queue
class mqtt:
def __init__(self, client_id, host, port, auth):
self._client_id = client_id
self._host = host
self._port = port
self.connected = False
self._reconnect = True
# setup queue to capture messages from topic subscription
self.q = Queue()
# what to do with auth?!?
# instantiate client from paho.mqtt library
print('instantiating mqtt client...')
self.client = mqttClient(client_id=self._client_id)
# register callbacks
print('registering callbacks...')
self.client.on_connect = self._on_connect
self.client.on_disconnect = self._on_disconnect
self.client.on_subscribe = self._on_subscribe
self.client.on_publish = self._on_publish
self.client.on_unsubscribe = self._on_unsubscribe
self.client.on_message = self._on_message
def _on_connect(self, client, userdata, flags, rc):
print('on connect')
print('rc = {0}'.format(rc))
if rc == 0:
print('connection established...')
else:
print('connection failed...')
self.connected = True
def _on_reconnect(self):
print('on reconnect')
self.connected = True
def _on_disconnect(self, client, userdata, rc):
print('on disconnect')
self.connected = False
self.client.reconnect()
def _on_subscribe(self, client, userdata, mid, granted_qos):
print('on subscribe')
def _on_publish(self, client, userdata, mid):
print('on publish')
def _on_unsubscribe(self, client, userdata, mid):
print('on unsubscribe')
def _on_message(self, client, userdata, message):
print('message received...')
self.q.put(message.payload.decode('utf-8'))
def begin(self):
# connect the client to the mqtt broker
print('establishing connection...')
self.client.connect(
host=self._host,
port=self._port
)
self.client.loop_start()
def end(self):
print('end')
self._reconnect = False
self.client.loop_stop()
self.client.disconnect()
def subscribe(self, topic):
print('subscribing to topic {0}...'.format(topic))
self.client.subscribe(topic)
def get_message(self):
print('getting message...')
return self.q.get()
def queue_empty(self):
return self.q.empty()
Test script testScript.py
from mqtt import mqtt
from time import sleep
mqttClient = mqtt('testClient', BROKER_IP_ADDRESS, BROKER_PORT, '')
mqttClient.begin()
mqttClient.subscribe(TOPIC)
while True:
while not mqttClient.queue_empty():
msg = mqttClient.get_message()
print(msg)
sleep(5)
# except KeyboardInterruption as ki:
# mqttClient.end()
This is an alpha version at best that appears to work with subscribing to topics and capturing the messages. I've not yet written the publish functionality.
Thanks,
Jason