Introduction
After the ESP8266 web server of the door opener responds correctly to the APIs it is the ideal candidate to implement the fundamentals of the Control Center architecture. Until now we saw how the Control Center UI has been created but it should be activated.
The short screen recording below shows how the Door Opener APIs work in a browser during a test.
The full sources of the control center are available in the SuperSmartHome GitHub repository. The application architecture described in this post will be upgraded as the project continues, always following the same approach.
Text to Speech
One of the roles of the Control Center is managing notifications and alarms when a critical situation happens. To do this only the graphical UI maybe not sufficient so I have implemented the text to speech feature with the well-tested script Trans. One of the minor issues of using this command as a signaling message from the Python application is that the program execution is blocked until the command has not completed the audio streaming of the sentence. For this reason, I have created a specific class based on a set of pre-built sentences played in a separate thread. In this way, the main program can execute the Trans command in a subprocess and immediately continue the execution while the sentence is played.
The Sentences File
The sentences are saved in a simple Json file that is updated as the development of the Control Center features goes ahead. The code below shows the first series of messages and alarms I have added in the file.
{ "phrases": 11, "list": [ "Super Smart Home started", "Alert!", "node is not responding", "System is healthy", "Log history restarted", "Checking nodes status", "System is idle", "node is healthy", "Door opened", "Wrong password. Door not opened", "Super Smart Home version 0.1 Alpha" ] }
The Speak Python Class
Many of the tasks executed by the Control Center are concurrent and some of these are time-critical. To optimize the process I have used intensively a multithreading approach. To do this, a good way is to organize the features of the same group of function in classes; it is just the case of the Speak class.
''' Plays sentences accoringly with the sentences.json file Uses the trans bash script to spek audio sentences and optionally also provide the textual format of the desired sentence. This call access to the sentences json file for the strings content. ''' import time import subprocess import json # List with the messages for voice comments text_messages = [''] # Number of text messages num_messages = 0 class Speak(): ''' Class to speak voice comments ''' def __init__(self): ''' Initialization function. Load the sentences list from json file ''' global num_messages global text_messages # Better put the sentences file full path sentences_file = "/media/pi/PiDesktop/SuperSmartHome/control_center/classes/sentences.json" # Loads the message tracks with open(sentences_file) as file: dictionary = json.load(file) num_messages = dictionary['phrases'] text_messages = dictionary['list'] def announce(self, message_id): ''' Speak a message from the standard sentences list ''' # Announce the initial message txt = text_messages[message_id] self.speak(txt) def speak(self, message): ''' Speak a generic message ''' # Text-to-speech command and parameters # Parameters: -sp = speak, -n = narrator voice (not used) TTS = [ '/usr/bin/trans', '-sp' ] self.runCmd([TTS[0], TTS[1], message]) def runCmd(self, cmd): ''' Execute a subprocess command ignoring the return value. :param cmd: The bash command with the parameters ''' proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) stdout, stderr = proc.communicate() def getString(self, message_id): ''' Return the corresponding sting in text format ''' return text_messages[message_id]
The principle I followed in every class is to create a series of self-referenced objects that can act from any point of the application, including when launched in a separate thread. The __init__(self) initialization function creates the global variable (inside the class instance) for the number of messages and a list with the messages. In this version of the Python classes, the only limitation is that the external files (included in the same "classes" Python folder have an absolute path. A better solution will be a configuration Json file with all the paths used by the system, available traversal to all the classes.
The class includes the method speak(self, message) to be used from the main application; it prepares the text message to be executed by the Bash Shell subprocess Trans in the runCmd(self, cmd) low-level call, never used by the external. The method speak accepts any string message to be spoken and it can be used also to talk messages not included in the sentencese.json file. The second method announce(self, message_id) instead, only needs the sentence id (the positional number in the sentences.json file) to talk.
The low-level function runCmd(self, cmd) executes the call to the Bash Shell subprocess to call.
In many cases, the same spoken sentence should also be shown in text format on the UI, so I have added the method getString(self, message_id) to retrieve the text of a corresponding sentence instead of sending it to Talk.
Creating Secure, Randomized, OTP
Some of the critical data exchange between nodes is mediated by the Control Center Raspberry Pi Desktop using a two-way 4-digit PIN method (the same used by many sites including Google to secure email login and other sensitive information). Using the simple Python random library the pin generation is weak and it is almost frequent a repetition of the same sequence. As a matter of fact, what I want in this case is cryptographic strong randomization, with very low repeatability. We should distinguish between two kinds of randomization:
- PRNG Pseudo-Random Number Generation
- CSPRING Cryptographically Strong Pseudo-Random Number Generator
While the PRNG approach after a finite series of numbers it is possible to predict what is the next, the CSPRING approach avoids the prediction. Based on some articles on the cryptographic random generation I found a python article that using the system randomization instead of the Python standard random library. Based on the code example and the explained theory (see the Python PEP 506) I wrote the set of methods to create a token of the desired length. I tested the efficiency generating a four-digits toke every three minutes for a day without repetition.
""" Generate cryptographically strong pseudo-random numbers suitable for managing secrets such as account authentication, tokens, and similar. See PEP 506 for more information. https://www.python.org/dev/peps/pep-0506/ """ __all__ = ['choice', 'randbelow', 'randbits', 'SystemRandom', 'token_bytes', 'token_hex', 'token_urlsafe', 'compare_digest', ] import base64 import binascii import os from hmac import compare_digest from random import SystemRandom _sysrand = SystemRandom() randbits = _sysrand.getrandbits choice = _sysrand.choice def randbelow(exclusive_upper_bound): """ Return a random int in the range [0, n). """ if exclusive_upper_bound <= 0: raise ValueError("Upper bound must be positive.") return _sysrand._randbelow(exclusive_upper_bound) DEFAULT_ENTROPY = 32 # number of bytes to return by default def token_bytes(nbytes=None): """ Return a random byte string containing *nbytes* bytes. If *nbytes* is ``None`` or not supplied, a reasonable default is used. >>> token_bytes(16) #doctest:+SKIP b'\\xebr\\x17D*t\\xae\\xd4\\xe3S\\xb6\\xe2\\xebP1\\x8b' """ if nbytes is None: nbytes = DEFAULT_ENTROPY return os.urandom(nbytes) def token_hex(nbytes=None): """ Return a random text string, in hexadecimal. The string has *nbytes* random bytes, each byte converted to two hex digits. If *nbytes* is ``None`` or not supplied, a reasonable default is used. >>> token_hex(16) #doctest:+SKIP 'f9bf78b9a18ce6d46a0cd2b0b86df9da' """ return binascii.hexlify(token_bytes(nbytes)).decode('ascii') def token_urlsafe(nbytes=None): """ Return a random URL-safe text string, in Base64 encoding. The string has *nbytes* random bytes. If *nbytes* is ``None`` or not supplied, a reasonable default is used. >>> token_urlsafe(16) #doctest:+SKIP 'Drmhze6EPcv0fN_81Bj-nA' """ tok = token_bytes(nbytes) return base64.urlsafe_b64encode(tok).rstrip(b'=').decode('ascii')
As shown in the code above, from the random library it is imported the SystemRandom class that uses the system cryptography and randomization based on hardware inputs instead of only software. I have left all the three methods to create different tokens but only the token_bytes() method is used.
The NodesAPI Class
Not only the first Door Open node will use the web-server HTTPS_GET method to exchange information with the Control Center, as well as receive commands; while the communication approach remains the same, only the API and parameters will change. For this reason, I have created the nodes.json configuration file.
Nodes.json
{ "nodes": 7, "nodes_info": [ { "name": "Doorbell", "URL": "https://192.168.1.151", "active": 0 }, { "name": "Door Opener", "URL": "https://192.168.1.150", "active": 1 }, { "name": "Environment", "URL": "https://192.168.1.152", "active": 0 }, { "name": "Lighting", "URL": "https://192.168.1.153", "active": 0 }, { "name": "Apppliances", "URL": "https://192.168.1.154", "active": 0 }, { "name": "Alarms", "URL": "https://192.168.1.154", "active": 0 }, { "name": "Cloud", "URL": "https://192.168.1.155", "active": 0 } ] }
The file is a list of Json objects, one every node with the node name as it will appear on the UI, the static IP address assigned, according to the network topography, and the node status. Until a node is not implemented the active status is set to 0 (False) to avoid unwanted alarms.
The Class
''' Manages the network URL of the nodes and calls the API specific for every node. The URL parameters for the connection are defined in the nodes json file. The URL access to the nodes is secured through the HTTPS access with OpenSSL certificate and a double-check data exchange through the https GET and an OTP password generated by the system. ''' import subprocess import json num_nodes = 0 nodes_dic = [''] class NodesAPI(): ''' Manages the API calls to the nodes end the connection status ''' def __init__(self): ''' Initialization function. Load the configuration from json file ''' global num_nodes global nodes_dic # Better put the sentences file full path nodes_file = "/media/pi/PiDesktop/SuperSmartHome/control_center/classes/nodes.json" # Loads the nodes info with open(nodes_file) as file: dictionary = json.load(file) num_nodes = dictionary['nodes'] nodes_dic = dictionary['nodes_info'] def get_url(self, node_id): ''' Get the URL of the requested node ''' global nodes_dic node_content = nodes_dic[node_id] return node_content['URL'] def get_status(self, node_id): ''' Get the current status of the requested node. If the status is 0 return False as it is not enabled, else return true ''' global nodes_dic node_content = nodes_dic[node_id] if(node_content['active'] == 1): return True else: return False def get_nodename(self, node_id): ''' Get the name of the requested node. ''' global nodes_dic node_content = nodes_dic[node_id] return node_content['name'] def get_nnodes(self): ''' Get the number of nodes in the json file ''' return num_nodes def url_get(self, url): ''' Executes a get to the node with error checking. Note that the URL should be the complete requrest including the API name and parameters, if any Return the URL response (stdout) of False if the remote server is unreachable. ''' # Fixed command name and parameters CURL = [ 'curl', '--insecure'] # Compose the command cmd = [CURL[0], CURL[1], url] # Executes the call to the subprocess proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) stdout, stderr = proc.communicate() # Check if the request as been completed if(proc.returncode is 0): return stdout else: return 'No' def url_check(self, url): ''' Check for the requested server url. if there is not response (timeout) returns False else returns True. ''' # Fixed command name and parameters CURL = [ 'curl', '--insecure'] # Compose the command cmd = [CURL[0], CURL[1], url] proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) stdout, stderr = proc.communicate() if(proc.returncode is 0): return True else: return False
Also for this case, the class is self-inclusive and generalized to be able to work with all the nodes (but some other methods will be added further). Following the same approach of the other classes, the method __init__(self) access the nodes.json file and load a dictionary with the node objects, using a fixed path for the configuration file identification.
The three methods get_url(self, node_id), get_status(self, node_id), and get_nodename(self, node_id) return the corresponding value of the selected not object (the if is the numerical position of the node in the dictionary). The methods url_get(self, URL) and url_check(self, URL) instead, execute respectively an API call to the node and check that the node is up and running.
To make the class as much general as possible and avoiding to set complex HTTPS calls mechanisms inside Python the URL access by the API is done using a call to the Linux command curl. We should note that the call includes the parameter "--insecure". The reason is that the x509 certificate set to the ESP8266 is limited to 512 bytes while at least a 2048 bytes certificate is considered secure.
The Control Center Main Application
The control_center.py file – the main application – is divided into four sections:
- Global Parameters
- User Interface
- Timeloop Jobs
- Local functions
Global Parameters
This section defines the application control parameters accessed locally by the user interface class and the other functions, including also
# Temporized recursive jobs based on Timeloop TIME_TICK = 1 # Clock step (sec) OTP_TICK = 60 # OTP Generation (sec) NODES_RETRY = 1 # Number of session retries before raising an error NODES_TIMEOUT = 3 # Number of seconds waiting for a node response
These parameters define the frequency the timed events will occur while the program runs, executed as jobs in separate threads. The whole application act as an event-driven state-machine.
User Interface
We saw that the UI_MainWindow class is automatically generated from the ui Qt file by PySide. The UI_MainWindow class only includes the Python translation of what has been done with Qt Designer (the generated XML file) and should not be changed, because its content is rewritten every time it is recreated by the PySide pyside-ucf command. The MainWindow class we find in the main application instead interfaces this class and manages the user-generated events, as well as updating the content depending on the external state changes of the nodes.
class MainWindow(QMainWindow, Ui_MainWindow): ''' Main window class. Manages the Qt environment and the Pyside converted UI design. ''' def __init__(self): ''' Initialization function. Setup the user interface and show it. ''' global time_start global narrator global nodes_api super(MainWindow, self).__init__() self.setupUi(self) self.labelUpTime.setText(time_start) self.labelAlertDoorOpener.setText(strings.S_NODE_OK) self.assign_widgets() self.show() # Notify the program has started self.update_message_from_id(0) self.t_play_sentence(0) # And initialize the log self.log_clean() def assign_widgets(self): ''' Define the interactive elements and the associated methods ''' self.toolButtonOpenDoor.clicked.connect(self.button_open_door) self.toolButtonNodesHealth.clicked.connect(self.nodes_health) self.toolButtonResetLog.clicked.connect(self.log_clean) self.toolButtonAbout.clicked.connect(self.about) def t_play_sentence(self, sentence_id): ''' Speak a sentence via a separate thread calling the narrator class method announce. ''' global narrator # Create a thread from a function with arguments th = threading.Thread(target=self.play_sentence, args=(str(sentence_id))) # Start the thread th.start() def play_sentence(self, sentence_id): ''' Function associated by the threaded call t_play_sentence() ''' global narrator narrator.announce(int(sentence_id)) def time_otp_set(self): ''' Update the clock timer and last OTP password ''' global otp timeLabel = datetime.now().strftime("%H:%M:%S") timeLabel += strings.S_OTP + otp self.labelTime.setText(timeLabel) def log_update(self, log_string): ''' Update the log list with a new sentence with timestamp ''' timestamp = "[" + datetime.now().strftime("%a, %d %b %Y - %H:%M:%S") + "] " timestamp += log_string # Write the log to the screen self.textBrowserEventsLog.append(timestamp) def log_clean(self): ''' Clean the log list with a new sentence with timestamp ''' global narrator timestamp = "[" + datetime.now().strftime("%a, %d %b %Y - %H:%M:%S") + "] " timestamp += narrator.getString(4) # Write the log to the screen self.textBrowserEventsLog.setText(timestamp) def update_message(self, message, ts = True): ''' Update the realtime message with the specified string prepended with the timestamp. If ts is False the message is shown without timestamp. ''' if ts: timestamp = datetime.now().strftime("%H:%M:%S - ") else: timestamp = "" timestamp += message self.labelRealTimeMessage.setText(timestamp) def update_message_from_id(self, string_id, ts = True): ''' Call the method update_message after retrieving one of the prebuilt strings from the sentences json file ''' global narrator self.update_message(narrator.getString(string_id), ts) def button_open_door(self): ''' Execute the procedure to open the door on button call ''' global nodesAPI global narrator global otp # Temporary disable the buttons until the action is not completed self.toolButtonOpenDoor.setEnabled(False) self.toolButtonNodesHealth.setEnabled(False) # Saves locally the current otp password to be reusable during # the two calls to avoid an unwanted error if the global otp # automatically changes between the two calls.0 door_otp = otp # prepare the first HTTP GET api call door_get = nodes_api.get_url(1) + "/otp?pass=" + str(door_otp) ###### For testing only!!! Comment to enable the opener # door_otp=0 # Executes the call api_return = nodes_api.url_get(door_get) # Check the return code if(api_return is 'No'): # The node is offline or not connected self.open_door_alert() else: # Prepare the second HTTP_GET api call door_get = nodes_api.get_url(1) + "/opendoor?pass=" + str(door_otp) # Executes the call api_return = nodes_api.url_get(door_get) # Check the return code print(api_return) if(api_return is 'No'): # The node is offline or not connected self.open_door_alert() else: if('esp8266' in str(api_return)): self.open_door_opened() else: self.open_door_not_open(str(api_return)) # Enable the buttons self.toolButtonOpenDoor.setEnabled(True) self.toolButtonNodesHealth.setEnabled(True) def open_door_opened(self): ''' Signals to the control center window that the door has been opened ''' global nodes_api global narrator # Create the log message msg = narrator.getString(8) self.log_update(msg) # Remove the alert status if it was set before self.labelAlertDoorOpener.setText(strings.S_NODE_OK) self.t_play_sentence(8) self.update_message(msg) def open_door_alert(self): ''' Create the alerts on the control center window when there is a problem opening the door ''' global nodes_api global narrator # Create the log message msg = nodes_api.get_nodename(1) + " " + narrator.getString(2) self.log_update(msg) # Set the alert message beside the node list self.labelAlertDoorOpener.setText(narrator.getString(1)) self.t_play_sentence(1) self.update_message(msg) def open_door_not_open(self, api_return): ''' The server is up but the door is not open ''' global nodes_api global narrator # Create the log message msg = narrator.getString(9) self.log_update(msg) self.log_update(api_return) # Set the alert message beside the node list self.t_play_sentence(9) self.update_message(msg) def nodes_health(self): ''' Check the LAN nodes health status and manage alarms if some node is unreachable ''' global nodes_api global narrator # Temporary disable the buttons until the action is not completed self.toolButtonOpenDoor.setEnabled(False) self.toolButtonNodesHealth.setEnabled(False) # Update the log self.log_update(narrator.getString(5)) # Loop on all the nodes n = 0 while n < nodes_api.get_nnodes(): # Check if the node is active, else ignore the node if(nodes_api.get_status(n)): url = nodes_api.get_url(n) if(nodes_api.url_check(url) is False): # The node is offline or not connected # Create the log message msg = nodes_api.get_nodename(n) + " " + narrator.getString(2) self.log_update(msg) # Set the alert message beside the node list self.labelAlertDoorOpener.setText(narrator.getString(1)) self.t_play_sentence(1) else: # Create the log message msg = nodes_api.get_nodename(n) + " " + narrator.getString(7) self.log_update(msg) # Set the alert message beside the node list self.labelAlertDoorOpener.setText("") n += 1 # Enable the buttons self.toolButtonOpenDoor.setEnabled(True) self.toolButtonNodesHealth.setEnabled(True) def about(self): ''' Shows the current application version info ''' self.update_message_from_id(10)
As shown in the above code, every user interaction event like pressing a button corresponds to an event managed by a method in this class.
Timeloop Jobs
The most efficient way I have found to precise run timed tasks like the periodical renewal of the OTP password is using the Timeloop library. With this library, every function associated with a periodic event is scheduled as a job that runs in a separate thread. As shown the piece of code below the implementation is easy and the result is very efficient.
@tl.job(interval = timedelta(seconds = OTP_TICK)) def OTP_update(): ''' Update the otp pin ''' global otp otp_create()
Local Functions
This section includes local functions, mostly related to the temporized jobs, that are called during the initialization of the main function, as well as the Timeloop jobs.
Full Content
Already Posted (until now)
Super Smart Home #1 The project
Super Smart Home #2 The Door Opener
Super Smart Home #3 Designing the Control Center
Super Smart Home #4 Activating the Door Opener
Super Smart Home #5 Connecting the First Node to the Control Center
Super Smart Home #6 PSoC6 On the Cloud
Super Smart Home #7 From AWS IoT Core to AWS SiteWise
Super Smart Home #8 The Kitchen Node: Parts, Design and Components
Super Smart Home #9 The Kitchen Node: Circuit and Software
Super Smart Home #10 Building IoT nodes with PSoC6-WiFi-Bt and Mbed OS
Super Smart Home #11 Project Summary, Highlights, and More...
Super Smart Home #12 PSoC6 Local Node: Application Skeleton
Super Smart Home #13 PSoC6 Protection Case
Sources, Circuits, and Documentation
All the software sources, scripts, circuits schematics, and more are available as Open Source material on the SuperSmartHome GitHub repository.
The video episodes of this challenge are repurposed on the blog posts of the site we-are-borg.com
Thanks to
Element14, AWS, and Cypress, main sponsors
Elegoo for 3D printers ad printing material
Digitspace for sensors, actuators, and boards
The friends and community members Jan Cumps and shabaz always available with advice and suggestions.
Top Comments