Demo entry 6634505

ECG

   

Submitted by anonymous on Aug 11, 2017 at 00:29
Language: Python. Code size: 5.4 kB.

#!/usr/bin/python
import subprocess   # for running linux commands
import bluetooth    # bluetooth functions
from shutil import make_archive # to ZIP ECG files
import os   # for getting correct file paths to ZIP
import spidev   # for communicating with MCP3008
import time     # delay funcionality
from datetime import datetime # Time stamping ECG files
import re   # regex library to match MAC address

def Acquire_Target(filepath):
    """Read and validate the target Bluetooth MAC address stored in a file.

    The file is expected to contain a single colon-separated MAC address,
    e.g. ``AA:BB:CC:DD:EE:FF``.  Leading/trailing whitespace (typically the
    newline an editor appends) is ignored, so the returned value is exactly
    the 17-character address and can be passed on without slicing.

    Args:
        filepath: path of the text file holding the MAC address.

    Returns:
        The MAC address as a string with its original letter case, or None
        when the file content is not a valid MAC address.  In the latter case
        the file is deleted.
    """
    with open(filepath, "r") as macFile:
        MAC = macFile.read().strip()    # drop the trailing newline / stray blanks
    # Six pairs of hexadecimal digits separated by colons and nothing else.
    # \Z (unlike $) does not tolerate a newline after the address.
    if re.match(r"[0-9a-f]{2}(:[0-9a-f]{2}){5}\Z", MAC.lower()):
        return MAC
    os.remove(filepath) # delete file that is not a MAC address
    return None

def Services(target):
    """Send the recorded ECG logs to the target PC over Bluetooth, if possible.

    Starts the pairing script, then queries the Bluetooth services advertised
    by ``target``.  If one of them is named "OBEX Object Push" (the PC is in
    range and ready to receive files) the ECG log directory is zipped, pushed
    to the PC with ``ussp-push``, the ZIP is moved to the backup directory and
    the CSV logs are deleted.  Nothing is transferred or deleted when no such
    service is found.

    Args:
        target: MAC address of the PC, e.g. "AA:BB:CC:DD:EE:FF".
    """
    # Feed the pairing script's output to bluetoothctl (shell-style pipeline).
    blu0 = subprocess.Popen("/home/pi/BT.sh", stdout=subprocess.PIPE) # script to pair device to user computer
    blupipe0 = subprocess.Popen("bluetoothctl", stdin=blu0.stdout, stdout=subprocess.PIPE) # pass script to bluetoothctl
    blu0.stdout.close() # only bluetoothctl should hold the read end of the pipe
    # NOTE(review): the pairing processes are not waited on, so the service
    # scan below may start before pairing has finished -- confirm intended.
    services = bluetooth.find_service(address=target) # get list of BT services

    for svc in services: # empty list when the PC is out of range
        print(svc["name"])
        if svc["name"] != "OBEX Object Push": # service name that indicates PC ready to receive files
            continue

        channel = str(svc["port"]) # get channel of service
        dateNow = time.strftime("%Y%m%d") # current date, used in the backup file name
        archive_base = os.path.join('/home/pi', 'ECG_archive') # make_archive appends ".zip"
        root_dir = os.path.join('/home/pi', 'ECGlogs') # directory with ECG csv files
        # Use the path make_archive reports so the push and the rename below
        # always refer to the file that was actually created.
        archive_path = make_archive(archive_base, 'zip', root_dir) # ZIP ECG logfile directory

        # connect to target PC: same input the former "echo ... | bluetoothctl" supplied,
        # but without passing the address through a shell
        connect = subprocess.Popen(["bluetoothctl"], stdin=subprocess.PIPE)
        connect.communicate(("connect " + target + " \nquit\n").encode("ascii"))

        # transfer ECG ZIP file to target PC
        status = subprocess.call(["ussp-push", target + "@" + channel, archive_path, "ECG_archive.zip"])
        if status != 0:
            print("ussp-push failed with exit status " + str(status))

        # Keep a dated copy of the archive regardless of the transfer result.
        # NOTE(review): a second run on the same day replaces the earlier backup.
        os.rename(archive_path, '/home/pi/ECGBackUps/ECG_'+dateNow+'.zip')
        subprocess.call('sudo rm /home/pi/ECGlogs/*', shell=True)   # delete old ECG csv files
        # Stop here: the logs are gone, so handling another matching service
        # would overwrite the backup with an empty archive.
        return

def readadc(adcnum):
    """Read one conversion from the MCP3008 ADC over SPI.

    Uses the module-level ``spi`` object, which must already be open.

    Args:
        adcnum: ADC channel to read, 0 to 7 (the MCP3008 has 8 channels).

    Returns:
        The ADC level as an integer from 0 to 1023, or -1 if ``adcnum`` is
        not a valid channel number.
    """
    if adcnum < 0 or adcnum > 7:
        return -1   # not one of the 8 channels

    # Three bytes are sent and three come back (24 bits each way):
    #   byte 0: 1
    #   byte 1: (8 + channel) shifted into the upper four bits
    #   byte 2: 0, padding that clocks out the rest of the reply
    command = [1, (8 + adcnum) << 4, 0]
    reply = spi.xfer2(command)

    # Only the last 10 bits of the reply carry the result:
    #   ???? ???? ???? ??XX XXXX XXXX
    # i.e. the low two bits of the second byte followed by the whole third byte.
    upper_two_bits = reply[1] & 3
    lower_byte = reply[2]
    return (upper_two_bits << 8) + lower_byte

if __name__=='__main__':
    # Before recording, try to offload previously recorded logs to the PC
    # whose MAC address is stored in MAC.txt.  A missing or invalid MAC file
    # must not prevent the ECG recording from starting.
    try:
        target = Acquire_Target('/home/pi/MAC.txt') # read target address
    except (IOError, OSError):
        target = None   # MAC file missing or unreadable: skip the transfer
    if target:  # None when the file did not hold a valid MAC address
        Services(target[0:17])  # a MAC address is 17 characters; drop anything after it (e.g. a newline)

    delay = 0.001   # 1 millisecond delay
    ldr_channel = 0 # set channel to read ADC from
    recTime = 60*60*18 # how long main loop should run (18 hours)
    samples = 10    # number of samples to average
    row = 0 # X axis values

    timestr = time.strftime("%Y%m%d-%H%M%S") # filename timestamp
    filename = "/home/pi/ECGlogs/"+timestr+".csv"   # create full filename

    spi = spidev.SpiDev()   # kept at module level on purpose: readadc() reads this global
    spi.open(0, 0)  # open SPI bus 0, device 0
    try:
        with open(filename, "a") as logfile:    # closed automatically, even on error
            logfile.write("Time,ECG\n")    # write column headers
            timeout = time.time() + recTime
            while time.time() < timeout:    # stop after 18 hours
                sumv = 0
                for _ in range(samples): # get 10 readings and average them
                    sumv = sumv + readadc(ldr_channel)
                    time.sleep(delay) # 1 millisecond delay between readings
                avg = sumv / float(samples) # float division: integer division would truncate the average
                ECGval = avg * (3.3/1023)  # 3.3/1023 converts from levels to volts
                logfile.write(str(row)+","+str(ECGval)+"\n") # write ECG value to Y axis
                logfile.flush()    # make sure data gets from program buffer to the actual file
                row = row + 1 # increment row (X axis)
    finally:
        spi.close() # release the SPI device however the recording ends

This snippet took 0.01 seconds to highlight.

Back to the Entry List or Home.

Delete this entry (admin only).