Skip to content

Commit

Permalink
Support MQTT subscribe as data source
Browse files Browse the repository at this point in the history
  • Loading branch information
aerialist committed May 27, 2016
1 parent 67fe33c commit feb371d
Show file tree
Hide file tree
Showing 4 changed files with 269 additions and 30 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Not working/
log/

# Byte-compiled / optimized / DLL files
__pycache__/
Expand Down Expand Up @@ -87,4 +88,4 @@ ENV/
.spyderproject

# Rope project settings
.ropeproject
.ropeproject
109 changes: 95 additions & 14 deletions serialLogger.pyw
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from __future__ import (absolute_import, division,
print_function, unicode_literals)
from builtins import (
bytes, dict, int, list, object, range, str,
bytes, dict, int, list, object, range, #str, #paho uses isinstance of str and doesn't like future's str
ascii, chr, hex, input, next, oct, open,
pow, round, super,
filter, map, zip)
Expand All @@ -43,6 +43,7 @@ import pyqtgraph as pg

import numpy as np
import serial
import paho.mqtt.client as mqtt

from ui_serialLogger import Ui_MainWindow

Expand Down Expand Up @@ -105,6 +106,61 @@ class SerialWorker(QtCore.QObject):
if self.port:
self.port.close()

class MqttWorker(QtCore.QObject):
# http://stackoverflow.com/questions/6783194/background-thread-with-qthread-in-pyqt
finished = QtCore.pyqtSignal()
dataReady = QtCore.pyqtSignal(str)

def __init__(self):
super(MqttWorker, self).__init__()
self.host = "192.168.10.110"
self.port = 1883
self.running = False
self.fname = "magi_log.txt"

@QtCore.pyqtSlot()
def processA(self):
print("MqttWorker.processA")
self.client = mqtt.Client()
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
print("Connecting to MQTT broker at {}".format(self.settings['host']))
self.client.connect(**self.settings)
self.client.loop_start()
while self.running:
time.sleep(1)
print("MqttWorker finished processA")
self.client.loop_stop()
self.finished.emit()

def on_connect(self, client, userdata, flags, rc):
print("Connected with result code "+str(rc))

# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
#client.subscribe("$SYS/#")
#self.client.subscribe("feeds/currentsensors/#")
# subscribe requires string not unicode ...
print("Topic: {}".format(self.topic))
self.client.subscribe(str(self.topic))

# The callback for when a PUBLISH message is received from the server.
def on_message(self, client, userdata, msg):
self.dataReady.emit(msg.payload)

def startRunning(self, topic, host, port):
self.settings = {}
self.settings['host'] = host
self.settings['port'] = port
self.topic = topic
self.running = True

def stopRunning(self):
self.running = False

def __del__(self):
self.running = False

class dataObject(object):
def __init__(self, name, plotwdg, pen=pg.mkPen('r', width=1.0, style=QtCore.Qt.SolidLine)):
self.pen = pen
Expand Down Expand Up @@ -163,6 +219,12 @@ class MainWindow(QtGui.QMainWindow, Ui_MainWindow):
self.serialreader.dataReady.connect(self.processPayload)
self.thread.started.connect(self.serialreader.processA)

self.thread_mqtt = QtCore.QThread() # no parent!
self.mqttreader = MqttWorker() # no parent!
self.mqttreader.moveToThread(self.thread_mqtt)
self.mqttreader.dataReady.connect(self.processPayload)
self.thread_mqtt.started.connect(self.mqttreader.processA)

# if you want the thread to stop after the worker is done
# you can always call thread.start() again later
# obj.finished.connect(thread.quit)
Expand All @@ -178,10 +240,10 @@ class MainWindow(QtGui.QMainWindow, Ui_MainWindow):
#self._serialReader = serialreader()
#self._serialReader.updated.connect(self.processPayload)
self.pushButton.clicked.connect(self.start)
self.pushButton_mqtt_connect.clicked.connect(self.start_mqtt)
self.pushButton_update.clicked.connect(self.populatePort)
self.pushButtonUpdateFileName.clicked.connect(self.populateFileName)
self.pushButton_AutoRange.clicked.connect(self.onAutoRange)
self.checkBox_autoscroll.toggled.connect(self.onAutoScroll)

self.populatePort()
self.populateFileName()
Expand Down Expand Up @@ -215,13 +277,6 @@ class MainWindow(QtGui.QMainWindow, Ui_MainWindow):
def onAutoRange(self):
self.plotwdg.enableAutoRange()

def onAutoScroll(self):
if self.raw2box:
if self.checkBox_autoscroll.isChecked():
self.raw2box.end = QtGui.QTextCursor.End
else:
self.raw2box.end = None

def populatePort(self):
self.comboBox.clear()
serials = [('FILE', '', ''),]
Expand All @@ -244,30 +299,54 @@ class MainWindow(QtGui.QMainWindow, Ui_MainWindow):
def populateCheckBox(self, data):
pass


def start(self):
if not self.running:
print("Start running!")
#self._serialReader.start()
if str(self.comboBox.currentText()) == 'FILE':
self.serialreader.setFilename(str(QtGui.QFileDialog.getOpenFileName(self, 'Open log file', 'log/', '*.txt')))
self.serialreader.startRunning(str(self.comboBox.currentText()))
self.thread.start()
self.running = True
self.pushButton.setText("STOP")
self.pushButton_mqtt_connect.setEnabled(False)
if self.checkBox_csv.isChecked():
#TODO: except IOError. happens when log directory does not exist.
self.logfileh = open(str(self.lineEdit.text()), 'w')

else:
print("Stop running.")
# self._serialReader.quit()
# self._serialReader.exit()
# self._serialReader.wait()
self.serialreader.stopRunning()
self.thread.quit()
self.running = False
self.pushButton.setText("START")
self.pushButton_mqtt_connect.setEnabled(True)
if self.logfileh:
self.logfileh.close()
self.logfileh = None

def start_mqtt(self):
if not self.running:
print("Start running!")
#str(self.comboBox.currentText())
topic = str(self.lineEdit_mqtt_topic.text())
host = str(self.lineEdit_mqtt_host.text())
port = int(self.lineEdit_mqtt_port.text())
self.mqttreader.startRunning(topic, host, port)
self.thread_mqtt.start()
self.running = True
self.pushButton_mqtt_connect.setText("STOP")
self.pushButton.setEnabled(False)
if self.checkBox_csv.isChecked():
#TODO: except IOError. happens when log directory does not exist.
self.logfileh = open(str(self.lineEdit.text()), 'w')

else:
print("Stop running.")
self.mqttreader.stopRunning()
self.thread_mqtt.quit()
self.running = False
self.pushButton_mqtt_connect.setText("START")
self.pushButton.setEnabled(True)
if self.logfileh:
self.logfileh.close()
self.logfileh = None
Expand All @@ -280,6 +359,8 @@ class MainWindow(QtGui.QMainWindow, Ui_MainWindow):
#payload = unicode(payloadQs).encode('latin-1')
payload = payloadQs
self.textBrowser_log.append(payload.strip())
if self.checkBox_autoscroll.isChecked():
self.textBrowser_log.moveCursor(QtGui.QTextCursor.End)
if self.logfileh:
self.logfileh.write(payload)

Expand Down
103 changes: 99 additions & 4 deletions serialLogger.ui
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@
<rect>
<x>0</x>
<y>0</y>
<width>708</width>
<width>681</width>
<height>584</height>
</rect>
</property>
<property name="windowTitle">
<string>MainWindow</string>
</property>
<widget class="QWidget" name="centralwidget">
<layout class="QHBoxLayout" name="horizontalLayout">
<layout class="QHBoxLayout" name="horizontalLayout_2">
<item>
<widget class="QSplitter" name="splitter_2">
<property name="orientation">
Expand All @@ -25,7 +25,7 @@
<enum>Qt::Vertical</enum>
</property>
<widget class="QWidget" name="">
<layout class="QVBoxLayout" name="verticalLayout_upperleft">
<layout class="QVBoxLayout" name="verticalLayout">
<item>
<layout class="QHBoxLayout" name="horizontalLayout_serial">
<item>
Expand All @@ -39,6 +39,12 @@
</item>
<item row="0" column="1">
<widget class="QComboBox" name="comboBox">
<property name="sizePolicy">
<sizepolicy hsizetype="Expanding" vsizetype="Fixed">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<property name="minimumSize">
<size>
<width>150</width>
Expand All @@ -51,6 +57,12 @@
</item>
<item>
<widget class="QPushButton" name="pushButton_update">
<property name="sizePolicy">
<sizepolicy hsizetype="Preferred" vsizetype="Fixed">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<property name="text">
<string>Update</string>
</property>
Expand Down Expand Up @@ -111,13 +123,96 @@
</item>
<item>
<widget class="QPushButton" name="pushButton">
<property name="sizePolicy">
<sizepolicy hsizetype="Fixed" vsizetype="Fixed">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<property name="text">
<string>Open</string>
</property>
</widget>
</item>
</layout>
</item>
<item>
<layout class="QHBoxLayout" name="horizontalLayout">
<item>
<widget class="QLabel" name="label_2">
<property name="text">
<string>MQTT host</string>
</property>
</widget>
</item>
<item>
<widget class="QLineEdit" name="lineEdit_mqtt_host">
<property name="sizePolicy">
<sizepolicy hsizetype="Preferred" vsizetype="Fixed">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<property name="text">
<string>192.168.10.110</string>
</property>
</widget>
</item>
<item>
<widget class="QLabel" name="label_3">
<property name="text">
<string>port</string>
</property>
</widget>
</item>
<item>
<widget class="QLineEdit" name="lineEdit_mqtt_port">
<property name="sizePolicy">
<sizepolicy hsizetype="Preferred" vsizetype="Fixed">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<property name="maximumSize">
<size>
<width>70</width>
<height>16777215</height>
</size>
</property>
<property name="text">
<string>1883</string>
</property>
</widget>
</item>
<item>
<widget class="QLabel" name="label_4">
<property name="text">
<string>Topic</string>
</property>
</widget>
</item>
<item>
<widget class="QLineEdit" name="lineEdit_mqtt_topic">
<property name="sizePolicy">
<sizepolicy hsizetype="Expanding" vsizetype="Fixed">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<property name="text">
<string>currentsensors/washingmachine</string>
</property>
</widget>
</item>
<item>
<widget class="QPushButton" name="pushButton_mqtt_connect">
<property name="text">
<string>Connect</string>
</property>
</widget>
</item>
</layout>
</item>
<item>
<layout class="QHBoxLayout" name="horizontalLayout_save_file">
<item>
Expand Down Expand Up @@ -559,7 +654,7 @@
<rect>
<x>0</x>
<y>0</y>
<width>708</width>
<width>681</width>
<height>22</height>
</rect>
</property>
Expand Down
Loading

0 comments on commit feb371d

Please sign in to comment.