element14 Community
element14 Community
    Register Log In
  • Site
  • Search
  • Log In Register
  • Community Hub
    Community Hub
    • What's New on element14
    • Feedback and Support
    • Benefits of Membership
    • Personal Blogs
    • Members Area
    • Achievement Levels
  • Learn
    Learn
    • Ask an Expert
    • eBooks
    • element14 presents
    • Learning Center
    • Tech Spotlight
    • STEM Academy
    • Webinars, Training and Events
    • Learning Groups
  • Technologies
    Technologies
    • 3D Printing
    • FPGA
    • Industrial Automation
    • Internet of Things
    • Power & Energy
    • Sensors
    • Technology Groups
  • Challenges & Projects
    Challenges & Projects
    • Design Challenges
    • element14 presents Projects
    • Project14
    • Arduino Projects
    • Raspberry Pi Projects
    • Project Groups
  • Products
    Products
    • Arduino
    • Avnet Boards Community
    • Dev Tools
    • Manufacturers
    • Multicomp Pro
    • Product Groups
    • Raspberry Pi
    • RoadTests & Reviews
  • Store
    Store
    • Visit Your Store
    • Choose another store...
      • Europe
      •  Austria (German)
      •  Belgium (Dutch, French)
      •  Bulgaria (Bulgarian)
      •  Czech Republic (Czech)
      •  Denmark (Danish)
      •  Estonia (Estonian)
      •  Finland (Finnish)
      •  France (French)
      •  Germany (German)
      •  Hungary (Hungarian)
      •  Ireland
      •  Israel
      •  Italy (Italian)
      •  Latvia (Latvian)
      •  
      •  Lithuania (Lithuanian)
      •  Netherlands (Dutch)
      •  Norway (Norwegian)
      •  Poland (Polish)
      •  Portugal (Portuguese)
      •  Romania (Romanian)
      •  Russia (Russian)
      •  Slovakia (Slovak)
      •  Slovenia (Slovenian)
      •  Spain (Spanish)
      •  Sweden (Swedish)
      •  Switzerland(German, French)
      •  Turkey (Turkish)
      •  United Kingdom
      • Asia Pacific
      •  Australia
      •  China
      •  Hong Kong
      •  India
      •  Korea (Korean)
      •  Malaysia
      •  New Zealand
      •  Philippines
      •  Singapore
      •  Taiwan
      •  Thailand (Thai)
      • Americas
      •  Brazil (Portuguese)
      •  Canada
      •  Mexico (Spanish)
      •  United States
      Can't find the country/region you're looking for? Visit our export site or find a local distributor.
  • Translate
  • Profile
  • Settings
Eye On Intelligence Challenge
  • Challenges & Projects
  • Design Challenges
  • Eye On Intelligence Challenge
  • More
  • Cancel
Eye On Intelligence Challenge
Blog ADAS and Vehicle Monitoring System - 4G, Video Streaming, Canbus Integration
  • Blog
  • Forum
  • Documents
  • Polls
  • Files
  • Members
  • Mentions
  • Sub-Groups
  • Tags
  • More
  • Cancel
  • New
Join Eye On Intelligence Challenge to participate - click to join for free!
  • Share
  • More
  • Cancel
Group Actions
  • Group RSS
  • More
  • Cancel
Engagement
  • Author Author: vmate
  • Date Created: 31 Oct 2024 6:32 PM Date Created
  • Views 892 views
  • Likes 6 likes
  • Comments 2 comments
  • video processing
  • python
  • 4g
  • Eye on Intelligence Challenge
  • embedded
  • raspberry pi
  • gps
  • canbus
  • pi pico
  • can
Related
Recommended

ADAS and Vehicle Monitoring System - 4G, Video Streaming, Canbus Integration

vmate
vmate
31 Oct 2024

In this blog, I'll go through setting up the communications hardware, and software setup for video streaming and data logging.

Internet Connectivity

I decided on using an EM7455 WWAN card, typically used in laptops, to provide the system with internet connectivity. This module also has a built-in GPS receiver, so I won’t have to add a separate one later.

image

Newer WWAN cards use an M.2 connector, which is typically associated with PCI-e or SATA connectivity, but these modules only utilize USB.
Initially I thought I’ll have to make a custom PCB for this too, but I found several ones on Amazon that already do exactly what I need. Their purpose is to provide the module with 3.3V, have a SIM card slot that directly connects to the M.2 slot, and a male USB port also directly going to the M.2 connector.
The RF connectors on the module at first look like u.FL, but they are MHF4. Thankfully their PCB footprints are similar enough that I was able to solder on some u.FL connectors instead, and use cheap SMA pigtail cables.

image

I also added a DC injector for the GPS port, to power an active patch antenna. (there are 3 SMA connectors total, the bottom one is obscured in this image - two for 4G with diversity, one for GPS).

Software setup

Setting up the module to work with Linux was really confusing at first. I couldn’t find any guides that worked, some of them are simply wrong(or don't apply to my module, even though it's the same model), some of them are really outdated. The modules can also have different OEM configurations that lock them down and/or need different magic values.

Here’s a quick overview:

These modules can expose multiple different kinds of network interfaces and serial ports. First step is to enable everything we need. In the datasheet for my module, this is referred to as “USB composition”.  I ended up with 3 serial ports(two for 4G control related communication, one for GPS data) and one network interface.

Afterwards, use ModemManager to actually set up the modem. I got stuck here for a while, turns out I needed to do an “FCC unlock”, which is done by using this command:

sudo ln -sft /etc/ModemManager/fcc-unlock.d /usr/share/ModemManager/fcc-unlock.available.d/*

Then I was able to set everything up and have internet access. This setup wasn’t persistent though, so instead of manually configuring the modem each boot, I used NetworkManager to handle the WWAN connection. nmtui provides a graphical setup to configure all required parameters, so this part was simple enough. This approach still uses ModemManager in the background, just managed by NetworkManager, with the added benefits of automatically setting up the module on boot, and also handling reconfiguring everything if the module gets unplugged and plugged back in.

Sending AT commands to the modem for configuration purposes is a bit tricky though, as ModemManager "takes" the control serial port for itself. The solution is to use ModemManager in debug mode, by creating a systemd service override.

sudo systemctl edit ModemManager

Then clearing the previous ExecStart value by setting it to nothing, and afterwards adding our new value:

[Service]
ExecStart=
ExecStart=/usr/sbin/ModemManager --debug

If the blank "ExecStart=" line is omitted, the value configured underneath will be appended to the original value instead of overwritten.

Now it is possible to send AT commands through ModemManager, and get the output back too. For example, enabling the DC output on the GPS port looks like this:

sudo mmcli -m 0 --command='AT+WANT=1'

If someone finds this blog post looking for help with setting a modem up, here are some resources that turned out to be super useful:

https://wiki.archlinux.org/title/Mobile_broadband_modem

https://neilzone.co.uk/2024/01/getting-the-sierra-wireless-em7455-lte-modem-working-in-a-thinkpad-with-debian-12-linux-with-gps/

https://github.com/danielewood/sierra-wireless-modems

https://florian.sesser.at/work/fcc-unlocking-sierra-wireless-em7455-and-others-on-linux/

GPS

Setting up GPS was similarly painful. Even after enabling everything through AT commands, nothing appeared in the serial port that should be outputting NMEA data.

The issue turned out to be a magic command needs to be sent to the serial port. It looks like this might also depend on firmware, because a lot of posts mention a different command than what works for me. I had to send “$GPS_START” to /dev/ttyUSB1 at 9600 baud for the NMEA output to start.

Here are all the AT commands I issued to configure GPS:

Enable "hidden" commands:

sudo mmcli -m 0 --command='AT!ENTERCND="A710"'

Enable GPS:

sudo mmcli -m 0 --command='AT!CUSTOM="GPSENABLE",1'

Use dedicated GPS antenna port instead of sharing with the 4G port:

sudo mmcli -m 0 --command='AT!CUSTOM="GPSSEL",0'

Enable GPS autostart, use standalone fix, 255 second max wait time for fix, no requested min accuracy, 1 seconds between fixes

sudo mmcli -m 0 --command='AT!GPSAUTOSTART=1,1,60,4294967280,1'

Set A-GPS to use User Plane instead of Control Plane:

sudo mmcli -m 0 --command='AT!GPSMOMETHOD=1'

Enable NMEA output and set output rate to 1hz:

sudo mmcli -m 0 --command='AT!GPSNMEACONFIG=1,1'

Enable all NMEA sentences:

sudo mmcli -m 0 --command='AT!GPSNMEASENTENCE=7B9B'

Enable all positioning modes:

sudo mmcli -m 0 --command='AT!GPSPOSMODE=7F'

Enable DC power output for the GPS antenna port:

sudo mmcli -m 0 --command='AT+WANT=1'

And then send the magic string $GPS_START to the NMEA serial port.

After I got GPS configured on the modem side, I installed GPSD, and changed a line in its configuration file at /etc/default/gpsd to point it to the correct serial port.

I only discovered the option for enabling 3.3V output on the GPS port while trying to get the module to work under Linux, meaning my bulky DC injector was redundant. I didn’t really like the enclosure I made anyways, and the 4G module interfered with the GPS receiver, so I made a new, significantly smaller enclosure, featuring lots of shielding:

image

image

One annoyance about using aluminium tape is that the glue prevents layers of tape from making electrical contact with each other. This means the shielding either has to be one continuous piece of tape, or somehow they need to be connected together.

A good solution I found is to use a very thin wire (I just pulled out a strand from a cable), and zig-zag it around the shielding, taping it down every once in a while to hold it down and force it to make contact with the shielding. This also provides an easy way to connect all the shielding to a grounding point, by just soldering the “leftover” wire. Here are some pictures from shielding a Pi HQ Camera the same way, that show it much better:

image

image

VPN

To securely communicate with the server used for video storage, and to remotely access the Pi over SSH, I opted to use OpenVPN.

Basically, there’s a server, and any clients that connect to it will be part of a network, where all the devices can communicate with each other as if they were on a local network. It works just as if there was a new ethernet interface added to each device, and all of those interfaces plugged into a common Ethernet switch.

This is great, because all the encryption and cryptography are handled by OpenVPN, and unsecured connections can be used between all the clients, as if they were on a local network, without having to worry about security. I also won’t need to open any ports for any of the devices, with the exception of a single port on the server, and static IP (or dynamic DNS) is also only required on the server.

To SSH into the Pi, all I’ll have to do is connect to the VPN, and then SSH to the "vpn internal" IP of the Pi.

There are many great guides about setting up an OpenVPN server, they go into much more detail than I have time for in this blog, here is a good one for example:

https://www.digitalocean.com/community/tutorials/how-to-set-up-and-configure-an-openvpn-server-on-ubuntu-20-04

Along with some extra options and troubleshooting:

https://wiki.archlinux.org/title/OpenVPN

I configured a static IP for each of the devices that will be connecting, by adding this line to each client’s configuration file on the server:

ifconfig-push *ip* *netmask*

(see the Arch Linux wiki for setting up these per-client configuration files)

Now all connected devices can directly reach each other, as if they were on the same local network.

Data Logging

For storing and displaying all sorts of numeric data, I decided on using InfluxDB and Grafana. InfluxDB is a database for storing time-series data, basically just numbers that are logged over time. Grafana is a web UI that can visualize data from many kinds of databases.

The idea is to install InfluxDB both on the Pi and server, and tell the Pi to replicate to the server. InfluxDB should automatically handle synchronization while making sure everything goes smoothly.

I wrote the following code to act as a communications hub for the Pi Pico (see the previous blog post, the short version is that it handles charging mostly):

import zmq
import json
import serial
import threading
import os

# Read serial port and forward to all subscribers
def serial_to_pub():
    ser = serial.Serial('/dev/ttyACM0', 115200)
    context = zmq.Context()
    pub_socket = context.socket(zmq.PUB)
    pub_socket.bind("tcp://*:5050")
    
    while True:
        line = ser.readline().decode('utf-8').strip()
        try:
            data = json.loads(line)
            pub_socket.send_json(data)
        except json.JSONDecodeError:
            print("Received invalid JSON:", line)

# Take data from clients and forward to serial port
def req_to_serial():
    context = zmq.Context()
    rep_socket = context.socket(zmq.REP)
    rep_socket.bind("tcp://*:5051")
    
    ser = serial.Serial('/dev/ttyACM0', 115200)
    
    while True:
        message = rep_socket.recv_string().strip()
        ser.write((message + '\n').encode('utf-8'))
        print("Forwarded to serial:", message)
        rep_socket.send_string("OK")

# Start threads
threading.Thread(target=serial_to_pub).start()
threading.Thread(target=req_to_serial).start()

I am using ZeroMQ for communication between local scripts, here’s a quick overview of the two modes I utilize:

- REQ/REP: There’s a server and a client. A client can send a request to the server, which then has to reply. Exactly one request, then exactly one reply. One server can have many clients connected to it. The server cannot send a message to the client unsolicited.

- PUB/SUB: A server broadcasts data to its clients, no need for requests to reply to.

 The PUB/SUB mode is used to broadcast data coming in from the Pi Pico over serial, which is mostly just status reporting from the chargers. Any other script will be able to connect to this ZeroMQ socket, and receive data from the charger.

REQ/REP mode is used for sending data to the Pi Pico. If another script wants to talk to the Pico, it sends a request to the communication hub containing the data to send to the Pico. The communication hub then forwards that to the serial port, and replies back with an “OK”.

For the actual data logging, the following code handles writing data to InfluxDB from various sources. The first one is the ZeroMQ socket where the charger data is available from, the second one is GPSD.

import influxdb_client
from influxdb_client.client.write_api import ASYNCHRONOUS, SYNCHRONOUS
import json
import zmq
import time
import gps
import queue
import threading

GPS_STANDBY_WRITE_INTERVAL_SECS = 5
GPS_ACTIVE_WRITE_INTERVAL_SECS = 1
CHG_STANDBY_WRITE_INTERVAL_SECS = 10
CHG_ACTIVE_WRITE_INTERVAL_SECS = 5

MAX_WRITE_BATCH = 20

API_TOKEN="token"
client = influxdb_client.InfluxDBClient(url="127.0.0.1:8086", token=API_TOKEN, org="org")
write_api = client.write_api(write_options=SYNCHRONOUS)
point_queue = queue.Queue()
gps_write_interval_secs = 5
chg_write_interval_secs = 10

gps_last_write_time = 0
def gps_thread():
    global gps_last_write_time

    session = gps.gps(mode=gps.WATCH_ENABLE)
    while 0 == session.read():
        try:
            if not (gps.MODE_SET & session.valid):
                continue

            if time.time() - gps_last_write_time < gps_write_interval_secs:
                continue
            gps_last_write_time = time.time()

            point = influxdb_client.Point("gps")

            if gps.isfinite(session.fix.latitude) and gps.isfinite(session.fix.longitude):
                point = point.field("latitude", session.fix.latitude).field("longitude", session.fix.longitude)

            if gps.isfinite(session.fix.speed):
                point = point.field("speed", session.fix.speed)

            if gps.isfinite(session.fix.altMSL):
                point = point.field("altitude", session.fix.altMSL)

            if gps.isfinite(session.fix.mode):
                point = point.field("fixType", session.fix.mode)

            if gps.isfinite(session.fix.track):
                point = point.field("track", session.fix.track)

            if gps.isfinite(session.fix.epd):
                point = point.field("track_error", session.fix.epd)

            if gps.isfinite(session.fix.eps):
                point = point.field("speed_error", session.fix.eps)

            if gps.isfinite(session.fix.epx):
                point = point.field("latitude_error", session.fix.epx)

            if gps.isfinite(session.fix.epy):
                point = point.field("longitude_error", session.fix.epy)

            if gps.isfinite(session.fix.epv):
                point = point.field("altitude_error", session.fix.epv)

            point_queue.put(point)

        except Exception as e:
            print(f"Failed to get GPS data: {e}")
            continue
    

zmq_last_write_time = 0
def zmq_thread():
    global zmq_last_write_time
    global chg_write_interval_secs
    global gps_write_interval_secs
    # Set up ZMQ
    context = zmq.Context()
    sub_socket = context.socket(zmq.SUB)
    sub_socket.connect("tcp://localhost:5050")
    sub_socket.setsockopt_string(zmq.SUBSCRIBE, "")  # Subscribe to all messages

    while True:
        try:
            # Check for messages from the MCU
            line = sub_socket.recv_string().strip()

            # Skip writing to try and keep up write_interval_secs rate
            if time.time() - zmq_last_write_time < chg_write_interval_secs:
                continue
            zmq_last_write_time = time.time()

            data = json.loads(line)
            chargers = data.get("chargers", [])
            charger_points = []
            for charger in chargers:
                id = f"bq{charger['id']}"
                inputVoltage = float(charger.get("inputVoltage"))
                inputCurrent = float(charger.get("inputCurrent"))
                batteryVoltage = float(charger.get("batteryVoltage"))
                chargeCurrent = float(charger.get("chargeCurrent"))
                temperature = float(charger.get("temperature"))

                if batteryVoltage > 10:
                    chg_write_interval_secs = CHG_ACTIVE_WRITE_INTERVAL_SECS
                    gps_write_interval_secs = GPS_ACTIVE_WRITE_INTERVAL_SECS
                else:
                    chg_write_interval_secs = CHG_STANDBY_WRITE_INTERVAL_SECS
                    gps_write_interval_secs = GPS_STANDBY_WRITE_INTERVAL_SECS

                charger_points.append(
                    influxdb_client.Point(id)
                    .field("inputVoltage", inputVoltage)
                    .field("inputCurrent", inputCurrent)
                    .field("batteryVoltage", batteryVoltage)
                    .field("chargeCurrent", chargeCurrent)
                    .field("temperature", temperature)
                )

            battery = data.get("battery", {})
            batteryVoltage = float(battery.get("voltage"))
            batteryCurrent = float(battery.get("current"))
            batteryWh = float(battery.get("remainingWh"))

            battery_points = [
                influxdb_client.Point("battery")
                .field("voltage", batteryVoltage)
                .field("current", batteryCurrent)
                .field("remainingWh", batteryWh)
            ]

            generatorVoltage = float(data.get("generatorVoltage"))
            fanSpeed = int(data.get("fanSpeed"))
            forceBackup = bool(data.get("forceBackupCharging"))

            misc_points = [
                influxdb_client.Point("misc")
                .field("generatorVoltage", generatorVoltage)
                .field("fanSpeed", fanSpeed)
                .field("forceBackupCharging", forceBackup)
            ]

            all_points = charger_points + battery_points + misc_points
            for point in all_points:
                point_queue.put(point)
        except Exception as e:
            print(f"Failed to fetch ZMQ data: {e}")
            continue

if __name__ == "__main__":
    gps_thread = threading.Thread(target=gps_thread)
    gps_thread.start()

    zmq_thread = threading.Thread(target=zmq_thread)
    zmq_thread.start()
    
    while True:
        try:
            write_api.write(bucket="mvpp", org="org", record=point_queue.get())
        except Exception as e:
            print(f"Failed to get/write point: {e}")
            continue

The logging interval depends on the car generator voltage measured by the Pi Pico, as there is no need for frequent data logging when the car is parked.

Grafana

Grafana is a web UI used for visualizing information from databases. The basic idea is to pick a visualization type, like a line chart or bar graph, and write a query, and the returned data will be displayed (with some extra configuration usually required, to make sure everything looks nice). These visualizations then can be added to a “Dashboard”.

After spending some time configuring charts and writing queries, I ended up with this dashboard:

image

The dashboards can also be set up to auto update as often as once every 5 seconds, and many other types of visualizations are also available, so Grafana is suitable for live monitoring too, not just browsing historical data.

Here’s some data from the GPS:

image

Setting up replication to the remote server was relatively easy, just had to create an API key on the server with write permissions, and configure InfluxDB on the Pi to replicate to the database on the server, while giving it the API key generated earlier.

InfluxDB has some issues unfortunately, one of them being that replication takes forever if datapoints are written one-by-one on the Pi, instead of in groups.

It seems like replication is just "replaying" the data insertion requests as they came in originally, instead of grouping a bunch of them into a single request. This is the reason for collecting all the datapoints into one big list, and writing them at once in the logging python script. This puts many values in a single request (which would be irrelevant between the python script and local InfluxDB database as bandwidth between local processes is essentially unlimited) and therefore speeds up replication to the remote server significantly.

Video Streaming

After doing some research, I couldn’t find any suitable way of streaming video from the Pi to the server with minimal latency, while also being able to handle the random dropouts and slowdowns of the 4G network connection, so I started making my own.

I used ZeroMQ for this as well, in the REQ/REP mode. The Pi sends a chunk to the server, the server replies if everything arrived correctly. If not, the chunk is sent again.

For now, I’m using a Pi HQ camera, compressing the video stream with H264 utilizing the hardware encoder of the Pi4, putting the encoded video chunks into a deque (double ended queue, a doubly linked list basically) at one end, and taking the chunks out at the other end to send to the server. If successfully transferred, the chunk is deleted from the deque.

A deque is great for this, because it doesn’t need continuous memory and therefore doesn't require reallocation. Each element can be allocated anywhere in memory, with no regard to order either, and each element stores a pointer to the previous and next elements. Whenever adding an element, just allocate it anywhere, update the previous element’s “next” pointer to point to this new element, and set the new element’s “prev” pointer to point to the previous element. This is better than a regular linked list, because it can be traversed in both directions instead of just forward, while having basically no overhead (well, 8 bytes per element).

If network conditions are good, the deque will almost always be empty, since every chunk put in at the beginning will be immediately taken out and transferred to the server. If network conditions deteriorate, syncing might slow down, and the deque begins to grow. As sync speeds improve, the built-up data is synced to the server oldest-first.

I set a limit of 512MB for the deque size, which can fit roughly 12 minutes of video in case there’s no internet connection at all. In the future, I’m planning to implement a feature to start saving this video to an external SSD, and sync it to the server later, but for now, the end of the deque gets discarded if it grows past 512MB.

The H264 encode bitrate is adjusted based on generator voltage(which depends on whether the engine is running or not), like with the charger logging interval. I’ve settled on 6Mbps when driving, 1Mbps when parked.

Deque size and average sync speed are also logged to InfluxDB.

To ensure chunks are successfully transmitted, each chunk of data is SHA256 hashed, and the last 4 bytes of this hash is appended to the data. The receiver will use this to verify that everything arrived correctly.

This is the sender part:

import io, time, zmq, collections, hashlib, threading, struct
from typing import Deque

MAX_BUF_SIZE = 512_000_000 # 512MB


class SyncbufSource(io.BufferedIOBase):
    bytes_buffer: Deque[bytes] = collections.deque()
    server_address = None
    zmq_context = None
    zmq_socket = None
    sync_thread = None
    sync_thread_run = True
    write_callback = None
    sync_callback = None
    transmit_id: int = 0

    def __init__(self, server_address, write_callback = None, sync_callback = None):
        # Socket initialization
        self.zmq_context = zmq.Context()
        self.server_address = server_address

        # Sync thread initialization
        self.sync_thread_run = True
        self.sync_thread = threading.Thread(target=self.sync_loop)
        self.sync_thread.setDaemon(True)
        self.sync_thread.start()
        
        self.write_callback = write_callback
        self.sync_callback = sync_callback
    
    def buffer_size(self):
        byte_count = 0
        buf_len = len(self.bytes_buffer)
        try:
            for i in range(buf_len):
                byte_count += len(self.bytes_buffer[i])
        except IndexError:
            pass
        return byte_count
    
    def stop_sync(self):
        self.sync_thread_run = False
        self.sync_thread.join()
    
    def start_sync(self):
        self.sync_thread_run = True
        self.sync_thread = threading.Thread(target=self.sync_loop)
        self.sync_thread.start()

    def write(self, buf: bytes):
        self.bytes_buffer.append(buf)
        if self.write_callback:
            self.write_callback(len(buf))
        #print(f"Wrote {len(buf)} bytes to buffer, buf len: {len(self.bytes_buffer)}, total size: {self.buffer_size()} bytes")

    def trim_buffer(self):
        while self.buffer_size() > MAX_BUF_SIZE:
            trimmed_size = len(self.bytes_buffer.popleft())
            print(f"Trimmed {trimmed_size} bytes from buffer")
    
    def init_zmq(self):
        self.zmq_socket = self.zmq_context.socket(zmq.REQ)
        self.zmq_socket.connect(self.server_address)
        self.zmq_socket.setsockopt(zmq.SNDTIMEO, 0)
        self.zmq_socket.setsockopt(zmq.LINGER, 0)

    def get_buffer_stats(self):
        return {
            "buffer_size": self.buffer_size(),
            "buffer_len": len(self.bytes_buffer)
        }
    
    def zmq_socket_reset(self):
        self.zmq_socket.setsockopt(zmq.LINGER, 0)
        self.zmq_socket.close()
        self.init_zmq()

    def zmq_reliable_communicate(self, message):
        # Initialize socket if not already initialized
        if not self.zmq_socket:
            self.init_zmq()

        # Send message and save start time
        self.zmq_socket.send(message)
        start_time = time.time()

        while True:
            # Check if response is available, blocks for 1 second max
            if self.zmq_socket.poll(1000) & zmq.POLLIN != 0:
                return self.zmq_socket.recv()

            # If still no response after 10 seconds, reset and reconnect socket
            if start_time + 10 < time.time():
                self.trim_buffer() # Since the most likely scenario for buffer growing too big is a disconnected socket, we should do this here too
                print("No ZMQ response received after 10 seconds, resetting socket")
                self.zmq_socket_reset()

                # Try sending message again
                self.zmq_socket.send(message)
                start_time = time.time()


    def sync_loop(self, target_size: int = 128_000, min_size: int = 16_000):
        while self.sync_thread_run:
            # Nothing to sync
            if not self.bytes_buffer:
                time.sleep(0.05)
                continue

            # Only sync if there's at least min_size bytes in buffer
            if self.buffer_size() < min_size:
                time.sleep(0.05)
                continue

            self.trim_buffer()

            # Gather chunks to send to server, until target_size is reached at maximum
            transmit_id_bytes = struct.pack('<I', self.transmit_id)
            chunks = bytearray(transmit_id_bytes)
            while self.bytes_buffer:
                new_chunk = self.bytes_buffer.popleft()
                chunks.extend(new_chunk)
                if(len(chunks) >= target_size):
                    break
            
            # Append sha256 hash of chunks to the end of chunks
            sha = hashlib.sha256()
            sha.update(chunks)
            digest = sha.digest()
            chunks.extend(digest[:4]) # First 32bits of sha256 hash is more than enough

            # Continuously try sending chunks to server until sent successfully
            while True:
                resp = self.zmq_reliable_communicate(chunks)

                # Check for length
                if len(resp) != 4:
                    print(f"Invalid response length: {len(resp)}")
                    continue 

                # Magic bytes for mismatching hash(should never happen)
                if resp == b'\xFF\xFF\xFF\xFF':
                    print("Server received mismatching hash")
                    continue

                # Check if sequence number is correct
                receive_id = struct.unpack('<I', resp[:4])[0]
                if receive_id != self.transmit_id:
                    print(f"Transmit ID and Receive ID mismatch. Transmit ID: {self.transmit_id}, Receive ID: {receive_id}")
                    continue

                # Everything looks in order
                else:
                    break
            
            self.transmit_id += 1
            
            if self.sync_callback:
                self.sync_callback(len(chunks) - 4)

    def detect_nal_units(self, buf: bytes):
        start_code_1 = b'\x00\x00\x01'
        start_code_2 = b'\x00\x00\x00\x01'
        start_codes = [start_code_1, start_code_2]

        start = 0
        while start < len(buf):
            for start_code in start_codes:
                start_pos = buf.find(start_code, start)
                if start_pos != -1:
                    # Find end of NAL unit
                    next_start_pos = len(buf)
                    for next_start_code in start_codes:
                        next_start_pos = buf.find(next_start_code, start_pos + len(start_code))
                        if next_start_pos != -1:
                            break
                    
                    # Calculate NAL unit size
                    if next_start_pos == -1:
                        nal_unit_size = len(buf) - start_pos - len(start_code)
                    else:
                        nal_unit_size = next_start_pos - start_pos - len(start_code)

                    nal_unit_type = buf[start_pos + len(start_code)] & 0x1F
                    print(f"Found NAL unit type: {nal_unit_type}, Size: {nal_unit_size} bytes")

                    start = start_pos + len(start_code)
                    break
            else:
                break

It can be used with anything that tries to write to a file-like object, for example picamera2's capture functions. Here's the code handling video capture, streaming to the server, and logging related parameters to InfluxDB:

from picamera2 import Picamera2
from picamera2.encoders import H264Encoder, Quality
from picamera2.outputs import FileOutput
from libcamera import Transform, controls
import time
from syncbuf_source import SyncbufSource
import signal
import sys
import zmq
import json
import influxdb_client
import collections
from influxdb_client.client.write_api import ASYNCHRONOUS, SYNCHRONOUS
from typing import Deque

DB_LOG_INTERVAL = 5
STATUS_PRINT_INTERVAL = 30
SYNC_SPEED_AVG_WINDOW_SECS = 10
API_TOKEN="token"

def signal_handler(sig, frame):
    print("Stopping camera")
    picam2.stop()
    picam2.stop_encoder()
    print("Exiting")
    sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)

# Keep track of the last time picamera gave us data
last_write = time.time()
def write_callback(write_size):
    global last_write
    if write_size > 0:
        last_write = time.time()

# Measure sync speed
sync_stats = collections.deque()
def sync_callback(synced_bytes):
    sync_stats.append((time.time(), synced_bytes))

tuning = Picamera2.load_tuning_file("imx477.json")
algo = Picamera2.find_tuning_algo(tuning, "rpi.agc")

for x in range(len(algo["channels"])):
    algo["channels"][x]["metering_modes"]["custom"] = {"weights": [1,1,1,0,1,1,1,1,1,0,1,0,0,1,1]}

picam2 = Picamera2(tuning=tuning)

config = picam2.create_video_configuration(
    raw = None,
    lores = None,
    main = {
        "size": (1440, 1080)
    },
    buffer_count = 8,
    transform=Transform(hflip=True, vflip=True)
)

picam2.align_configuration(config)
picam2.configure(config)
picam2.set_controls({"FrameDurationLimits": (62_500, 62_500)}) # 16 FPS default
picam2.set_controls({"AeMeteringMode": controls.AeMeteringModeEnum.Custom})

print(picam2.controls)

syncbuf = SyncbufSource("tcp://10.255.3.2:5555", write_callback=write_callback, sync_callback=sync_callback)
output = FileOutput(syncbuf)
encoder = H264Encoder(repeat=True, iperiod=8, enable_sps_framerate=True, framerate=16, bitrate=6_000_000)
picam2.start_encoder(encoder, output)
picam2.start()

context = zmq.Context()
sub_socket = context.socket(zmq.SUB)
sub_socket.connect("tcp://localhost:5050")
sub_socket.setsockopt_string(zmq.SUBSCRIBE, "")  # Subscribe to all messages

client = influxdb_client.InfluxDBClient(url="127.0.0.1:8086", token=API_TOKEN, org="org")
write_api = client.write_api(write_options=ASYNCHRONOUS)

hq_mode = True # Whether we are running in high-FPS high-bitrate mode, or low-FPS low-bitrate mode

time.sleep(1) # Don't immediately switch from high to low bitrate mode if the generator voltage is low on startup

db_log_timer = time.time()
status_print_timer = time.time()

while True:
    try: # This is ugly, but this loop really shouldn't crash. The main priority is to keep the camera running in the thread managed by picamera2, and crashing the main loop will stop the picamera2 thread.
        time.sleep(0.05)
        # Check for messages from the MCU
        if sub_socket.poll(0) & zmq.POLLIN != 0:
            line = sub_socket.recv_string().strip()
            try:
                data = json.loads(line)
                generatorVoltage = float(data.get("generatorVoltage"))
                
                if generatorVoltage < 10 and hq_mode:
                    print("Switching to low bitrate mode")
                    hq_mode = False
                    picam2.stop()
                    picam2.stop_encoder()
                    encoder = H264Encoder(repeat=True, iperiod=8, enable_sps_framerate=True, framerate=4, bitrate=1_000_000)
                    picam2.set_controls({"FrameDurationLimits": (250_000, 250_000)}) # 4 FPS
                    picam2.start_encoder(encoder, output)
                    picam2.start()
                
                if generatorVoltage >= 10 and not hq_mode:
                    print("Switching to high bitrate mode")
                    hq_mode = True
                    picam2.stop()
                    picam2.stop_encoder()
                    encoder = H264Encoder(repeat=True, iperiod=8, enable_sps_framerate=True, framerate=16, bitrate=6_000_000)
                    picam2.set_controls({"FrameDurationLimits": (62_500, 62_500)}) # 16 FPS
                    picam2.start_encoder(encoder, output)
                    picam2.start()
                
            except json.JSONDecodeError:
                print("Failed to decode JSON")
                continue
        
        if last_write + 10 < time.time():
            print("Camera has stopped writing data, attempting restart")
            picam2.stop()
            picam2.stop_encoder()
            time.sleep(1)
            picam2.start_encoder(encoder, output)
            picam2.start()
            last_write = time.time()
        
        if db_log_timer + DB_LOG_INTERVAL < time.time():
            db_log_timer = time.time()

            # Calculate average sync speed in the last 5 seconds
            # Remove stats older than 5 seconds
            threshold = time.time() - SYNC_SPEED_AVG_WINDOW_SECS
            while sync_stats and sync_stats[0][0] < threshold:
                sync_stats.popleft()
            
            # Sum up transferred bytes
            transferred_bytes = 0
            stat_count = len(sync_stats)
            for x in range(stat_count):
                transferred_bytes += sync_stats[x][1]
            
            # Calculate average sync speed
            transferred_bytes /= SYNC_SPEED_AVG_WINDOW_SECS

            # Log buffer stats
            buf_stats = syncbuf.get_buffer_stats()

            points = [
                influxdb_client.Point("misc")
                .field("bufferSize", buf_stats['buffer_size'])
                .field("bufferCount", buf_stats['buffer_len'])
                .field("syncSpeed", transferred_bytes)
            ]
            write_api.write(bucket="mvpp", org="org", record=points)
        
        if status_print_timer + STATUS_PRINT_INTERVAL < time.time():
            status_print_timer = time.time()
            
            buf_stats = syncbuf.get_buffer_stats()
            print(f"Buffer length: {buf_stats['buffer_len']}, total size: {buf_stats['buffer_size']} bytes")
    except Exception as e:
        print(f"Main loop exception: {e}")

With all of this data logged, I’ve put together another Grafana dashboard, made for real-time monitoring:

image

The dashboard can be set to auto-update, with the default configuration allowing 5 seconds per refresh at the fastest.

By setting min_refresh_interval in the Grafana configuration file, this can be overridden to be more frequent. After restarting Grafana, in the dashboard settings, add new custom values for "Auto Refresh", and afterwards they will be visible in the drop-down box in the top right:

image

Receiving Video

Receiving the video stream on the server is quite simple, just receive from the ZeroMQ socket, extract the checksum in the last 4 bytes of the data, hash the rest, compare with the extracted checksum, and reply back to the sender, either confirming a successful transfer, or asking for a resend. A sequence number is also used so out-of-order transfers can be detected (although in the current implementation, they are impossible, since a new chunk is only sent once the previous one arrived successfully).

import zmq
import hashlib
import struct
import datetime

FILE_SIZE_SPLIT = 100_000_000 # 100MB

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")

current_time = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
file = open(f"output_{current_time}.h264", "wb")
file_size = 0
wait_count = 0

while True:
    if socket.poll(1000, zmq.POLLIN) != 0:
        wait_count = 0
        message = socket.recv()

        if message == b'ping':
            socket.send(b'pong')
            print("Received ping, sent pong")
            continue

        checksum = message[-4:]
        sequence_raw = message[:4]
        sequence = struct.unpack('<I', sequence_raw)[0]

        payload = message[4:-4]

        print(f"Received seq {sequence} with checksum {checksum.hex()}, {len(message)} bytes")

        sha = hashlib.sha256()
        sha.update(message[:-4])
        digest = sha.digest()

        if digest[:4] != checksum:
            print(f"Checksum mismatch for frame {sequence}, expected {checksum.hex()} but got {digest.hex()}")
            socket.send(b'\xFF\xFF\xFF\xFF')
        else:
            socket.send(sequence_raw)
            file.write(payload)
            file_size += len(payload)
            if(file_size >= FILE_SIZE_SPLIT):
                print(f"Splitting file, size {file_size} bytes")
                file.close()
                current_time = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
                file = open(f"output_{current_time}.h264", "wb")
                file_size = 0
    else:
        print("Waiting for message...")
        wait_count += 1
        if wait_count > 10 and file_size > 0:
            print(f"Splitting file, size {file_size} bytes")
            file.close()
            current_time = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
            file = open(f"output_{current_time}.h264", "wb")
            file_size = 0
            wait_count = 0

The stream is written into .h264 files, split into 100MB chunks to be more easily manageable. A tool like ffmpeg can be used to convert these .h264 files into more common formats later.

Canbus

I decided to hook up the two canbus networks in my car to the Pi, to allow me to access information like vehicle speed and engine parameters.

The board I used is a Waveshare 2-CH CAN HAT, and it uses two MCP2515 SPI canbus controllers, along with two SN65HVD230 transceivers. The transceiver side is fully isolated, which is nice, although not useful in my case.

image

I did not want to use the board as a HAT, as I'd have to redesign a lot of already existing hardware, so I decided to make it an external box, much like how the 4G/GPS module is done.

image

A DB9 connector is used for power + SPI + two chip select signals + two interrupts, and a GX12 6-pin connector for the 2 pairs of canbus signals along with ground. A custom cable connects to the GX12 connector, that has an OBD2 plug on the other end.

I added the corresponding DB9 connector on the Raspberry Pi side, making an even larger mess with the wiring now. The fan has also been upgraded, to a quieter model that supports a PWM input for speed control, and I also replaced one of the barrel jack charger inputs with a GX20 two pin connector for higher power charging since the last blog.

image

A custom PCB would’ve made this look much more professional while occupying significantly less space, but at this point I’m constantly changing things around, and this approach gives me the flexibility to swap parts around quickly without having to redo everything.

Both the canbus and 4G modules hooked up:

image

Setting up the canbus interface on the Pi was really simple, all I had to do was add two lines to config.txt to load and configure the mcp2515 device tree overlay.

dtoverlay=mcp2515-can1,oscillator=16000000,interrupt=24
dtoverlay=mcp2515-can0,oscillator=16000000,interrupt=23

After a reboot, a can0 and can1 interface appeared. To initialize the interfaces at the correct bitrate, I used these commands:

sudo ip link set can0 up type can bitrate 50000 listen-only on
sudo ip link set can1 up type can bitrate 500000 listen-only on

Using cansniffer, it is now possible to see all the canbus messages the car is using to communicate, all that’s left is to figure out which message contains what data.

This is what the output looks like while driving around:

You don't have permission to edit metadata of this video.
Edit media
x
image
Upload Preview
image

I haven’t had time to decipher all of the messages yet, but when I’m done with them, I’ll write a python script that parses and exposes the values on a ZeroMQ pub/sub socket similar to the communications hub script shown before, and update the InfluxDB logging script to also write these values to the database.

Conclusion

With most of the sensors and data synchronization now functional, the largest remaining task is setting up the Arty-Z7 to do object recognition, connected to the Raspberry Pi over ethernet, and set up a display with a user interface.

  • Sign in to reply
  • vmate
    vmate 8 months ago in reply to DAB

    Thanks!

    • Cancel
    • Vote Up 0 Vote Down
    • Sign in to reply
    • More
    • Cancel
  • DAB
    DAB 8 months ago

    Very nice update.

    • Cancel
    • Vote Up 0 Vote Down
    • Sign in to reply
    • More
    • Cancel
element14 Community

element14 is the first online community specifically for engineers. Connect with your peers and get expert answers to your questions.

  • Members
  • Learn
  • Technologies
  • Challenges & Projects
  • Products
  • Store
  • About Us
  • Feedback & Support
  • FAQs
  • Terms of Use
  • Privacy Policy
  • Legal and Copyright Notices
  • Sitemap
  • Cookies

An Avnet Company © 2025 Premier Farnell Limited. All Rights Reserved.

Premier Farnell Ltd, registered in England and Wales (no 00876412), registered office: Farnell House, Forge Lane, Leeds LS12 2NE.

ICP 备案号 10220084.

Follow element14

  • X
  • Facebook
  • linkedin
  • YouTube