Connected Cloud Challenge

Adventure of stream the sound clip to AWS #8

saicheong
2 May 2020

Introduction

In the last blog we successfully recorded a sound clip to the FRAM on the Cypress kit. Here we would like to upload it to AWS for further processing, such as sound recognition or feeding it to an AI engine.

To stream the sound data to the AWS cloud, the traditional way is UDP or HTTP streaming. To avoid opening additional sockets, we instead publish over MQTT to AWS and save the data in S3 storage; for many IoT devices, MQTT is the only transfer channel available. The MQTT channel is designed for small, continuous data such as machine status or sensor readings, but with some technique it can also carry other kinds of payload.

 

Setup of AWS IoT Core

In order to re-route the received data to AWS storage (or another application), we add a "Rule".

 

Rules

Under Act > Rules, Click Create

Name: Voice

Rule query is:

SELECT * FROM '+/sound/send'

so the rule receives any topic of the form [anything]/sound/send; by design the device publishes the data to "ThingName"/sound/send.
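The `+` in the rule query is a single-level topic wildcard, so the rule fires for `MyThing/sound/send` but not for `a/b/sound/send`. A minimal, illustrative re-implementation of that matching (not AWS's own code; the function name is ours):

```python
def topic_matches(filter_str: str, topic: str) -> bool:
    """Single-level '+' wildcard matching, MQTT-style: '+' matches exactly
    one topic level, so the filter and topic must have the same depth."""
    f_parts = filter_str.split('/')
    t_parts = topic.split('/')
    if len(f_parts) != len(t_parts):
        return False
    return all(f == '+' or f == t for f, t in zip(f_parts, t_parts))
```

So `topic_matches('+/sound/send', 'MyThing/sound/send')` is true, while a topic with extra levels is rejected.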

 

Select "Add action", then select "Send a message to an Amazon Kinesis Stream"

(not Kinesis Firehose: we first chose a Kinesis Firehose stream, which is much easier for storage but did not work well, as we will explain later.)

image

Stream Name: voice

Partition key: ${topic()}

 

Create a new IAM Role and use it.

image

 

 

Inside the Thing's policy document, add

"arn:aws:iot:us-west-2:161xxxxxxx:topic/${iot:Connection.Thing.ThingName}/sound/*",
"arn:aws:iot:us-west-2:161xxxxxxx:topic/$aws/rules/Voice/*"

under the Action iot:Publish.

 

The second line prepares for our use of Basic Ingest, which reduces our messaging costs; ref:

https://docs.aws.amazon.com/iot/latest/developerguide/iot-basic-ingest.html
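With Basic Ingest, the device publishes straight to the rule's reserved topic, `$aws/rules/<RuleName>/<topic>`, so the message bypasses normal broker pub/sub billing. A small helper (hypothetical name, and "MyThing" below is a placeholder Thing name) sketching how that topic is built:

```python
def basic_ingest_topic(rule_name: str, device_topic: str) -> str:
    """Build the reserved Basic Ingest topic for an AWS IoT rule.

    Publishing to $aws/rules/<RuleName>/<topic> delivers the payload
    directly to the named rule, avoiding normal messaging charges.
    """
    return f"$aws/rules/{rule_name}/{device_topic}"
```

For this rule the device would publish to `basic_ingest_topic("Voice", "MyThing/sound/send")`, i.e. `$aws/rules/Voice/MyThing/sound/send`, which is exactly what the second ARN in the policy permits.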

 

S3

Other platforms may be more efficient; however, for simplicity, we choose S3 instead of a more advanced database or storage platform.

Open Amazon S3 and simply add a new bucket, "voicerecognise".

image

Keeping the bucket non-public is fine; we save and retrieve through a private channel.

Amazon Kinesis

Amazon Kinesis provides Data Streams and Data Firehose. Either can suit continuously received machine data, depending on usage: Data Streams offers the more flexible solution, while Firehose is a simple pipeline for delivery into a few popular stores such as S3 and Redshift.

Following Amazon's instructions, you can easily build up the Kinesis Firehose:

https://docs.aws.amazon.com/firehose/latest/dev/writing-with-iot.html

If you choose the default delivery to S3, the data is partitioned by date. That default is good for storing or backing up IoT machine status such as temperature: no programming is required, and the default settings work well for most cases.

However, in our case the result was not good, for three reasons:

 

1. The minimum buffer time is 60 s, so the data cannot be observed until Firehose flushes its cache, and that timing is under Firehose's control.

2. Firehose simply concatenates the data in arrival order, and since messages do not arrive in the order they were sent, the sequence of our sound data is destroyed. (This is not a problem for most IoT cases, which transfer JSON with a timestamp.)

3. Firehose offers two points where data can be processed by Lambda. The first is on incoming data, but unfortunately Firehose hands Lambda the data in a BASE64 form that is not suitable for reconstructing the PCM stream. The other is after Firehose has built the S3 file, but at that stage all the data has already been concatenated, which is bad for repacking.

 

So we use Kinesis Data Streams instead of Data Firehose, mainly because of its faster response: our Lambda function can process the incoming data within seconds of it being received.

image

Data stream name: voice

Number of open shards: 1
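Since the partition key is the MQTT topic, every message from one Thing carries the same key and therefore lands in the same shard, which keeps a device's records together. Kinesis derives the shard from the MD5 of the partition key; a simplified, illustrative model of that mapping (the real service uses explicit hash-key ranges per shard):

```python
import hashlib

def shard_for_key(partition_key: str, num_shards: int) -> int:
    """Simplified model of Kinesis shard selection: MD5 of the partition
    key, read as a 128-bit integer, mapped onto evenly split hash ranges."""
    h = int.from_bytes(hashlib.md5(partition_key.encode()).digest(), 'big')
    return min(h // ((2 ** 128) // num_shards), num_shards - 1)
```

With a single open shard, as configured here, every key maps to shard 0.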

 

 

The IoT Hardware Mbed Code

 

Our awsiot loop continuously monitors for new sound data and streams it (publishing batches of MQTT messages) to AWS IoT:

 

/* Loop sending MQTT messages; the last sample is dismissed if its size is not aligned */
while (sound_stream(sound_buffer) >= SOUND_STREAM_BUFFER_SIZE) {

    /* Timestamp: seconds * 1000 plus a per-second counter gives unique ms values */
    time_t current_time = time(NULL);

    if (current_time == last_time)
        ++count;
    else {
        last_time = current_time;
        count = 0;
    }

    current_time = current_time * 1000 + count;

    time_t now = current_time;

    /* Write the time_t into the last 8 bytes of the buffer (big-endian) */
    for (int i = buffer_size * 2 - 1; i > buffer_size * 2 - 9; i--) {
        ((char *)sound_buffer)[i] = current_time & 0xff;
        current_time >>= 8;
    }

    /* Length of the data after BASE64 encoding */
    size_t len = 0;

    int result = mbedtls_base64_encode((unsigned char *)base64buffer, base64buffer_size, &len,
                                       (const unsigned char *)sound_buffer, buffer_size * 2);

    if (result == MBEDTLS_ERR_BASE64_BUFFER_TOO_SMALL) {
        APP_INFO(("BASE64 Error %d\r\n", len));
    }

    /* Publish the stream */
    awsiot_publish_stream(base64buffer, rules, len);

    APP_INFO(("%" PRId64 "\n", now));

    wait_us(50 * 1000);
}

 

In brief, the code above:

1. queries how much sound data needs to be sent to AWS

2. reads the data from FRAM into a buffer

3. appends a timestamp at the end of the data

4. encodes the data to BASE64

5. publishes the data to AWS

6. reads the next lot and waits 50 ms before the next publish

 

Each MQTT publish carries 1.5 kB of sound data, so 12 s of audio takes 256 MQTT messages, which can usually be uploaded in under 5 seconds.
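A sketch of that payload format in Python (1536 bytes of sound plus an 8-byte big-endian millisecond timestamp, BASE64-encoded for MQTT; the sizes match the Lambda constants used later, and the function names here are ours):

```python
import base64

RECORD_SIZE = 1536             # 1.5 kB of PCM sound data per MQTT message
SAMPLE_SIZE = RECORD_SIZE + 8  # plus an 8-byte big-endian ms timestamp

def pack_record(sound: bytes, stamp_ms: int) -> bytes:
    """Append the timestamp as the last 8 bytes (big-endian, matching the
    Mbed loop, which writes the LSB at the highest address) and BASE64-encode."""
    assert len(sound) == RECORD_SIZE
    return base64.b64encode(sound + stamp_ms.to_bytes(8, 'big', signed=True))

def unpack_record(payload_b64: bytes):
    """Reverse: BASE64-decode, then split off the trailing timestamp."""
    raw = base64.b64decode(payload_b64)
    stamp = int.from_bytes(raw[SAMPLE_SIZE - 8:], byteorder='big', signed=True)
    return raw[:RECORD_SIZE], stamp
```

`unpack_record` is exactly the split the Lambda function performs on each decoded Kinesis record.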

 

AWS Lambda function

We build a custom function in Python to receive the stream and save it to S3 storage.

 

Under AWS Lambda, create a new function named voice_consumption.

 

image

Select Python 3.8 and "Create a new role with basic Lambda permissions".

 

Open the created function, select Permissions > Execution role, then click the Role name to open IAM.

image

Attach the AmazonS3FullAccess policy.

 

Under Lambda > Configuration, open the Designer and click + Add trigger.

 

Select "Kinesis"

Kinesis stream > voice

 

Leave the rest at the defaults.

 

image

Here is the Lambda code that does the work.

import base64
import boto3
from datetime import datetime
import time
import pickle


# Temporary int64 array used for indexing.
# The timestamp is the device's MQTT send time in ms, not the sound record time:
#   first - start timestamp
#   ..... - sequence of timestamps
#   last  - last timestamp

TEMP_S3_INDEX = "income/sound_temp_index.tmp"
TEMP_S3_LAST_SOUND_KEY = "income/sound_temp_last_key.tmp"
NEXT_FILE_SECOND = 2
BATCH_SIZE = 6
RECORD_SIZE = BATCH_SIZE * 256
SAMPLE_SIZE = RECORD_SIZE + 8


s3_client = boto3.client('s3')
s3 = boto3.resource('s3')

def lambda_handler(event, context):
    soundclip = bytearray()
    key = ''
    
    soundclip_index = []
    
    soundclip_obj:s3.Object
    
    s3soundlast_sound_key_obj:s3.Object
    s3soundclip_index_obj:s3.Object
    
    
    try:
        s3soundlast_sound_key_obj = s3.Object('voicerecognise',TEMP_S3_LAST_SOUND_KEY)
        s3soundlast_sound_key = s3soundlast_sound_key_obj.get()['Body'].read()
        
        key = pickle.loads(s3soundlast_sound_key)
        
        soundclip_obj = s3.Object('voicerecognise',key)
        soundclip = bytearray(soundclip_obj.get()['Body'].read())
        
        print ("num of record",len(soundclip)/RECORD_SIZE)
        
    except s3_client.exceptions.NoSuchKey as e:
        print ("no previous temp soundclip found")
    
     
    try:    
        s3soundclip_index_obj = s3.Object('voicerecognise',TEMP_S3_INDEX)
        s3soundclip_index = s3soundclip_index_obj.get()['Body'].read()
        soundclip_index = pickle.loads (s3soundclip_index)
        
    except s3_client.exceptions.NoSuchKey as e:
        print ("no previous index found")
        
        
        
   
    for record in event['Records']:
        # Kinesis data is base64 encoded, so decode here
        payload = base64.b64decode(record["kinesis"]["data"])
        # Decode again: the MQTT payload itself is base64
        payload = base64.b64decode(payload)
        recordsize = len(payload)
        # Check that the sample size matches
        if recordsize != SAMPLE_SIZE:
            print("recordsize mismatch %d %d" % (recordsize, SAMPLE_SIZE))
            break
        
         
        
        # Timestamp in ms, decoded from the back of the payload
        stamp = int.from_bytes(payload[SAMPLE_SIZE-8:], byteorder='big', signed=True)

        print("Decoded time", stamp, "size:", len(payload), " payload: " + str(payload))
        
        # If the timestamp is more than 2.5 s past the last index entry, simply start a new file
        if len(soundclip_index) > 0:
            last = soundclip_index[-1]
            if stamp - last > 2500:
                key = ''
                soundclip_index.clear()
                soundclip = bytearray()
        
           
        # If the index is empty, just append the data
        if len(soundclip_index) == 0:
            soundclip_index.append(stamp)
            soundclip += payload
            continue
        
        # Seek from the back for the position, insert into soundclip, and update the index
        for i in range(len(soundclip_index)-1, -1, -1):
            if soundclip_index[i] > stamp:
                continue
            # Insert the sound data into soundclip
            soundclip[(i+1)*RECORD_SIZE:(i+1)*RECORD_SIZE] = payload[0:RECORD_SIZE]
            # Insert the timestamp into the index
            soundclip_index.insert(i+1, stamp)
            break

       
    print("Decoded soundclip: " + str(soundclip))
    
    
    if len(soundclip_index) == 0:
        return

    if key == '':
        key = datetime.fromtimestamp(soundclip_index[0]/1000).strftime("income/sound %m%d%Y-%H%M%S")
        s3soundlast_sound_key_obj.put(Body=pickle.dumps(key))
        soundclip_obj = s3.Object('voicerecognise', key)
        
    s3soundclip_index_obj.put(Body=pickle.dumps(soundclip_index))
    soundclip_obj.put(Body=soundclip)

 

The Kinesis stream triggers this function whenever new records arrive.

In brief, the function:

1. keeps a temporary list in S3 storing the sound clip timestamps

2. keeps a temporary string in S3 storing the key of the file being written

3. decodes the data from BASE64 to raw, twice, because AWS Kinesis adds its own BASE64 layer

4. retrieves the timestamp from the last 8 bytes of the payload

5. seeks the position and inserts the sound data into the correct sequence

6. starts another file if no sound data arrives for more than 2.5 seconds

7. saves the data to S3 storage
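The timestamp-ordered insertion at the heart of the handler can be reduced to a small pure function. A stripped-down sketch (record size shortened for illustration; note that, as in the handler, a record older than every indexed one is silently dropped):

```python
RECORD_SIZE = 4  # shortened for illustration; 1536 in the real Lambda

def insert_in_order(soundclip: bytearray, index: list, stamp: int, record: bytes) -> None:
    """Insert `record` into `soundclip` so records stay sorted by timestamp,
    mirroring the back-to-front scan in the Lambda handler."""
    if not index:
        index.append(stamp)
        soundclip += record
        return
    # Scan from the back for the first entry not newer than `stamp`
    for i in range(len(index) - 1, -1, -1):
        if index[i] > stamp:
            continue
        soundclip[(i + 1) * RECORD_SIZE:(i + 1) * RECORD_SIZE] = record
        index.insert(i + 1, stamp)
        break
    # If every indexed entry is newer, the record is dropped (as in the handler)
```

Feeding it records out of order still yields a clip sorted by timestamp, which is why the destroyed arrival order from Kinesis does not matter here.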

 

image

 

Conclusion

 

MQTT is a lightweight protocol based on publish/subscribe, which makes it good for IoT: small keep-alive overhead and low power requirements.

MQTT can carry more than text such as sensor readings or JSON data: with careful implementation it can transfer other kinds of data such as firmware, pictures or, as in this case, sound clips. For example, we could push the latest company logo to a device, or just update its help file, without implementing any other protocol.
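The same trick generalises: any binary asset can be split into fixed-size, sequence-stamped MQTT payloads and reassembled on the other side. A minimal sketch with hypothetical helper names:

```python
import base64

CHUNK = 1536  # payload size per MQTT message, as used for the sound clips

def split_blob(blob: bytes) -> list:
    """Split a binary blob into base64 MQTT payloads, each prefixed with
    a 4-byte big-endian sequence number so order can be restored."""
    chunks = [blob[i:i + CHUNK] for i in range(0, len(blob), CHUNK)]
    return [base64.b64encode(n.to_bytes(4, 'big') + c) for n, c in enumerate(chunks)]

def join_blob(payloads) -> bytes:
    """Reassemble: sorting the decoded chunks lexicographically also sorts
    them numerically, because the 4-byte prefix is fixed-width big-endian."""
    raw = sorted(base64.b64decode(p) for p in payloads)
    return b''.join(r[4:] for r in raw)
```

Even if the payloads arrive shuffled, `join_blob` restores the original blob, just as the Lambda restores the sound clip from its timestamps.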

 

The AWS Alexa Voice Service also uses MQTT for sound transfer.

 

Although Greengrass would let the device process data locally, our challenge target, an intelligent mailbox for consumers, is well served by a simple serverless design: the device talks directly to the AWS cloud, and the data is processed in the cloud.

 

The advantage of AWS IoT is that it is fully integrated with many AWS tools that open up more IoT possibilities: for example, we can very easily implement sensor logging with backup to S3 or DynamoDB, or run custom code seamlessly through Lambda.
