In my previous entry, I demonstrated how the Debug probe revealed the Request and Response data flowing to and from the RSL10-Sense BTLE device, along with a Python script that I wrote to visualize the available services and characteristics. In this entry, I'm going to show you how to request sensor data with Python and how I read the Beacon response.
Before we start, it is important to understand how the Requests (cyan) and Responses (purple) are structured - example below:
Custom Service Request
Using the feedback from the Debug Probe, and with some digging into the RSL10 example firmware, we can see that the requested command (cyan color) is split into 3 parts, separated by '/' (the ASCII slash character).
Token
What seems to be a random ASCII character between '0' and '~' (0x30 to 0x7E)
Node
Name of the sensor that will provide the data, e.g.: AL (Ambient Light), PB (Push Button), EV (Environmental - Temp, Humidity, Pressure, Air Quality)
Handler
Defines the property, or in other words, what kind of measurement is requested from the sensor.
e.g. For the "EV" (Environmental Sensor) these are some of the options
- T: Temperature in Celsius
- TF: Temperature in Fahrenheit
- H: Humidity
Custom Service Response
Once more, we will take a look at the debug probe feedback (purple color) to see how the response packet "generally" looks:
Token
The RSL10 device will respond with the same token that was sent in the request; an ASCII character between '0' and '~' (0x30 to 0x7E)
Data Type
When returning numbers, the RSL10 will respond with the following:
- f: when the value of the sensor is a float (or contains decimals)
- i: for integer values
Sensor Value
And finally the value of the sensor
It is important to mention that not all the sensors will return this kind of packet but you get a general idea of how it may work with most sensors.
Collecting data from the Beacon with Python
To collect data from the RSL10 Beacon we will need to do the following (how to retrieve these details was explained in my previous blog):
- Connect to the BTLE device (the RSL10 Beacon)
- Write a request to the Characteristic with the WRITE property (described as 'RX_VALUE - Command for BDK'), which is identified by the Handle 0x0010
- If successful, read the Beacon response from the Characteristic identified with the Handle 0x000b (described as 'TX_VALUE - Response from BDK')
- Disconnect or repeat steps 2 and 3
Source code
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Luis Ortiz - luislab.com
# March 8, 2020
"""Poll an RSL10 sensor beacon over BLE and append the readings to a CSV file.

Requests are written to the beacon as ``<token>/<node>/<property>`` and the
response is read back as ``<token>/<type>/<value>``.
"""

import binascii
import re
import subprocess
import sys
import time
from datetime import datetime

from bluepy import btle
from textcolor import textColor

# Matches ANSI/VT100 escape sequences so they can be stripped from log text.
_ANSI_ESCAPE_RE = re.compile(r'(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]')


class console_log:
    """Minimal console logger with optional ANSI colour stripping."""

    def __init__(self, debug=False, ansi=False):
        """Store the logger settings.

        Args:
            debug: when false, ``Sys_Info`` prints nothing.
            ansi: when false, ANSI escape sequences are removed before printing.
        """
        self.debug = debug
        self.ansi = ansi

    def Sys_Info(self, text):
        """Print ``text`` if debugging is enabled.

        ``text`` may be any object (exceptions are passed in by the callers);
        it is converted with ``str`` first so the escape stripping cannot fail
        on a non-string argument.
        """
        if not self.debug:
            return
        text = str(text)
        if not self.ansi:
            text = _ANSI_ESCAPE_RE.sub('', text)
        print(text)


def elapsed_time(elapsed):
    """Format a duration given in milliseconds as e.g. ``'1h 2m 3s 4ms'``.

    Components that are zero are omitted, so a duration of 0 yields ``''``.
    """
    s, ms = divmod(elapsed, 1000)
    m, s = divmod(s, 60)
    h, m = divmod(m, 60)
    parts = []
    if h > 0:
        parts.append("%dh" % h)
    if m > 0:
        parts.append("%dm" % m)
    if s > 0:
        parts.append("%ds" % s)
    if ms > 0:
        parts.append("%dms" % ms)
    return " ".join(parts)


class beacon:
    """BLE connection to one beacon plus helpers to query its sensor nodes."""

    # Handle of the characteristic the request string is written to.
    WRITE_HANDLE = 0x0010
    # Seconds to wait for a notification after writing a request.
    NOTIFY_TIMEOUT_S = 2
    # Properties accepted for the 'EV' (environmental) node.
    EV_PROPS = ('T', 'TF', 'H', 'PP', 'P', 'A')
    # Response layout: <token>/<type>/<value>.  The optional leading minus
    # lets sub-zero readings through instead of being rejected as malformed.
    RESPONSE_RE = re.compile(r'(?P<token>.)/(?P<vtype>.?)/(?P<value>-?[0-9.]+)$')
    # (node, property) -> (label, numeric format) used when logging a reading.
    LOG_FORMATS = {
        ('EV', 'T'): ("temp", "%2.1f°C"),
        ('EV', 'TF'): ("temp", "%2.1f°F"),
        ('EV', 'H'): ("humidity", "%2.f%%"),
        ('AL', 'L'): ("illuminance", "%2.2f lux"),
    }

    def __init__(self, macAddr: str):
        """Connect to the peripheral at ``macAddr`` and locate its characteristics.

        Raises:
            btle.BTLEException: if the connection cannot be established.
        """
        self.mac = macAddr
        self.minOrd = ord('0')
        self.maxOrd = ord('~')
        # CS.c if (request.token[0] < '0' || request.token[0] > '~')
        self.token_limit = (self.maxOrd - self.minOrd) + 1
        self.rToken = 0  # Request token
        self.color = textColor()
        self.cl = console_log(True, True)
        # Default to None so a missing characteristic is reported cleanly
        # later instead of surfacing as an AttributeError.
        self.rxC = None
        self.txC = None
        self.battC = None
        color = self.color
        cl = self.cl

        cl.Sys_Info("\nConnecting...")
        self.p = btle.Peripheral(macAddr, btle.ADDR_TYPE_PUBLIC)
        cl.Sys_Info("\nConnected [" + color.green(self.p.addr) + "]")

        for gs in self.p.getServices():
            for gc in gs.getCharacteristics():
                props = gc.propertiesToString()
                # A common name equal to the raw UUID string is used here as
                # the marker for the custom (non-assigned) characteristics.
                is_custom = gc.uuid.getCommonName() == str(gc.uuid)
                if gc.supportsRead() and is_custom and 'NOTIFY' in props:
                    self.rxC = gc
                    cl.Sys_Info('__init__ rxC %s' % self.rxC)
                elif is_custom and 'WRITE' in props:
                    self.txC = gc
                    cl.Sys_Info('__init__ txC %s' % self.txC)
                elif gc.uuid == btle.AssignedNumbers.batteryLevel and 'NOTIFY' in props:
                    self.battC = gc
                    cl.Sys_Info('__init__ battC %s' % self.battC)

    def disconnect(self):
        """Close the BLE connection."""
        self.p.disconnect()
        self.cl.Sys_Info("\nDisconnected")

    def getBatteryLevel(self):
        """Read the battery level characteristic.

        Returns:
            The raw value as a hex string (e.g. ``'64'``), or ``None`` when
            the characteristic is missing, empty, or the read fails.
        """
        color = self.color
        cl = self.cl
        if self.battC is None:
            cl.Sys_Info("Battery level characteristic not found")
            return None
        try:
            bValue = self.battC.read()
        except btle.BTLEException as exc:
            cl.Sys_Info(exc)
            return None
        sValue = binascii.b2a_hex(bValue).decode('utf-8')
        if not sValue:
            cl.Sys_Info("Battery level characteristic returned no data")
            return None
        cl.Sys_Info("\nBattery Level: %s (%s)"
                    % (color.cyan("0x%s" % sValue),
                       color.yellow("%d%%" % int(sValue, 16))))
        return sValue

    def readNode(self, node: str, prop: str):
        """Send ``<token>/<node>/<prop>`` and parse the beacon's response.

        Returns:
            ``(value_type, value_string)`` on success, ``(None, None)`` when
            the request fails or the response is malformed or carries a
            different token than the request.
        """
        color = self.color
        cl = self.cl
        if self.rxC is None:
            cl.Sys_Info("Response characteristic not found")
            return None, None

        self.rToken = (self.rToken + 1) % self.token_limit
        cl.Sys_Info("\nrToken %d" % self.rToken)
        token = chr(self.rToken + self.minOrd)
        sReq = token + '/' + node + '/' + prop
        cl.Sys_Info("Requested %s" % color.cyan('\'' + sReq + '\''))

        try:
            startTime = time.time()
            self.p.writeCharacteristic(self.WRITE_HANDLE, sReq.encode('utf-8'), False)
            success = self.p.waitForNotifications(self.NOTIFY_TIMEOUT_S)
            endTime = time.time()
            if not success:
                cl.Sys_Info("Request timeout.")
            else:
                cl.Sys_Info("Notification received in %.2f seconds" % (endTime - startTime))
            # The value is read even after a timeout; the token comparison
            # below rejects a stale response left over from an earlier request.
            sResp = self.rxC.read().decode('utf-8')
        except ValueError as exc:  # includes UnicodeDecodeError
            cl.Sys_Info(exc)
            return None, None
        except btle.BTLEException as exc:
            cl.Sys_Info(exc)
            return None, None

        cl.Sys_Info("Read sReq %s" % color.pink(sResp))
        result = self.RESPONSE_RE.match(sResp)
        if not result:
            cl.Sys_Info("No \"re\" match")
            return None, None
        if result.group('token') != token:
            cl.Sys_Info("Response token does not match request token")
            return None, None
        return result.group('vtype'), result.group('value')

    @staticmethod
    def _to_float(text):
        """Return ``text`` as a float, or ``None`` if it is not a valid number."""
        try:
            return float(text)
        except (TypeError, ValueError):
            return None

    def _read_and_log(self, node, prop):
        """Query one node property, log it, and return the parsed reading."""
        color = self.color
        cl = self.cl
        valueType, nodeStrValue = self.readNode(node, prop)
        if nodeStrValue is None:
            return None
        shownType = color.purple("\'" + valueType + "\'")

        nodeValue = self._to_float(nodeStrValue)
        if nodeValue is None:
            # Not a number (e.g. "1.2.3"): hand the raw text back to the caller.
            cl.Sys_Info("valueType: %s, nodeValue %s"
                        % (shownType, color.purple("\'" + nodeStrValue + "\'")))
            return nodeStrValue

        fmt = self.LOG_FORMATS.get((node, prop))
        if fmt is not None:
            label, numFmt = fmt
            cl.Sys_Info("valueType: %s, %s: %s"
                        % (shownType, label, color.yellow(numFmt % nodeValue)))
        else:
            cl.Sys_Info("valueType: %s, value: %s, property: %s"
                        % (shownType, color.yellow("%2.2f" % nodeValue), color.cyan(prop)))
        return nodeValue

    def getNodeValue(self, node: str, prop: str = ''):
        """Return the current value of a sensor node property.

        Args:
            node: ``'BL'`` (battery level), ``'EV'`` (environmental sensor)
                or ``'AL'`` (light sensor).
            prop: property of the node; one of ``EV_PROPS`` for ``'EV'``,
                ``'L'`` for ``'AL'``, ignored for ``'BL'``.

        Returns:
            An ``int`` for ``'BL'``, a ``float`` for numeric sensor readings,
            the raw response string when it is not numeric, or ``None`` when
            the node/property is unsupported or the reading failed.
        """
        if node == 'BL':  # Battery Level
            level = self.getBatteryLevel()
            return int(level, 16) if level is not None else None
        if node == 'EV' and prop in self.EV_PROPS:  # Environmental sensor
            return self._read_and_log(node, prop)
        if node == 'AL' and prop == 'L':  # Light sensor, lux
            return self._read_and_log(node, prop)
        return None


class beaconCSV:
    """Append-only CSV log of temperature and humidity readings."""

    def __init__(self, fileName):
        """Create ``fileName`` with a header row unless it already exists."""
        self.fileName = fileName
        # Own logger so write() does not depend on a module-level global.
        self.cl = console_log(True, True)
        try:
            # Mode 'x' fails if the file exists, so the header is written once.
            with open(self.fileName, mode='xt', encoding='utf-8') as f:
                f.write("\"Date\",\"TempC\",\"TempF\",\"Humidity\"\n")
        except FileExistsError:
            pass

    def write(self, tempC, tempF, humidity):
        """Append one timestamped row; all three values must be numbers."""
        dt_string = datetime.now().strftime('%Y%m%d %H:%M:%S')
        fileStr = "\"" + dt_string + "\",%2.1f" % tempC + ",%2.1f" % tempF + ",%2.f" % humidity
        self.cl.Sys_Info(fileStr)
        with open(self.fileName, mode='at', encoding='utf-8') as f:
            f.write(fileStr + "\n")


def main():
    """Poll the beacon every 3 minutes and log each complete reading."""
    cl = console_log(True, True)
    beacon1 = beacon("60:C0:BF:29:EF:50")
    beacon1CSV = beaconCSV('beacon1.csv')
    try:
        try:
            while True:
                tempC = beacon1.getNodeValue('EV', 'T')
                humidity = beacon1.getNodeValue('EV', 'H')
                beacon1.getNodeValue('BL')
                if isinstance(tempC, (int, float)) and isinstance(humidity, (int, float)):
                    tempF = tempC * 1.8 + 32
                    beacon1CSV.write(tempC, tempF, humidity)
                else:
                    # A failed read must not abort the whole logging session.
                    cl.Sys_Info("Incomplete reading, row skipped")
                time.sleep(180)
        finally:
            # Always release the BLE link, whatever ended the loop.
            beacon1.disconnect()
    except KeyboardInterrupt:
        print("Bye.")


if __name__ == "__main__":
    main()
Below is what the output looks like
The data collected is also stored in a .CSV (example attached)
Blogs in this series
- ON Sweet Home - Introduction
- ON Sweet Home - Getting to know your RSL10
- ON Sweet Home - Collecting the Beacon data
- ON Sweet Home - 3D printed parts
- ON Sweet Home - GUI is alive and the project is complete!