Sunday, May 24, 2020

Thermistors_Triple Python Program on a Raspberry Pi

Here's a video of one of my home projects:

Note: to view it in high resolution, after starting it, click on the YouTube logo to watch it on a YouTube page.




In this video, we measured how the surface temperature of a ceramic coffee mug changes after hot coffee is poured in. Three temperature sensors are taped to the mug in three different places. Sensor 0 is on the side of the mug near the bottom, Sensor 1 is near the rim, and Sensor 2 is on the handle. Each sensor is a thermistor (https://en.wikipedia.org/wiki/Thermistor), a tiny resistor whose resistance changes strongly with temperature. Measurement accuracy and resolution are plus or minus 0.01 degree Celsius.

The hardware in the video is a Raspberry Pi 4B credit-card-sized computer running Raspbian Linux, a Pi-16ADC voltage-measuring HAT board screwed on top, and a few resistors and capacitors that I soldered onto the HAT that form the sensor circuit. I bought the Raspberry Pi and HAT at PiShop.us.

I wrote a Python program that commands and reads from the ADC chip on the HAT via the I2C communication bus. I used a very cool and powerful Python user interface (UI) library and live-updating web-server called REMI (https://github.com/dddomodossola/remi). The smartphone in the video, using its web browser app, displays the live measurements on a numeric display and on a strip chart drawn using the Matplotlib library (https://matplotlib.org). Black is Sensor 0, Red is Sensor 1, and Blue is Sensor 2. The display update rate is once per second.

Here's the Python code:


# THERMISTORS_TRIPLE PROGRAM FOR MEASURING AND DISPLAYING TEMPERATURE
# Version 0.5, 30-May-2020
# Copyright © 2020 Joe Czapski
# Contact: xejgyczlionautomeasuretigercom replace lion with
# an at sign and tiger with a dot.
# 
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# 
#     http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This program was written to run on a Raspberry Pi 4B computer running
# Raspbian Linux, with a Pi-16ADC voltage-measuring HAT board screwed on top,
# and with resistors and capacitors soldered onto the HAT to form
# the sensor circuit. The program commands and reads from the LTC2497
# ADC chip on the HAT via the I2C communication bus.
#
# This program uses a UI library and live-updating web-server engine
# called REMI (https://github.com/dddomodossola/remi).
# The program's structure is based on usage examples from the
# REMI codebase. The strip chart is drawn using the Matplotlib library
# (https://matplotlib.org).

# Comments that begin with [DEV] describe potential improvements that can be made.

import math
import threading
from smbus import SMBus
import io
import time
from time import sleep
import remi
from remi.gui import *
from matplotlib.figure import Figure
from matplotlib.backends.backend_agg import FigureCanvasAgg

#SET CONSTANTS
#--- ADC board and I2C bus ---
Nsensors = 3   #number of sensors polled continuously by the DAQ thread
i2caddr = 0x76   #I2C address of the ADC, as selected by the board's jumper positions
adc_ch = [0xB0, 0xB8, 0xB1]   #ADC voltage input channel address for each sensor
#One reference resistor per sensor. They are metal-film resistors
#with TC<0.5ohm/degC; values are in ohms.
Rref = [4976.4, 5003.9, 4994.8]
vplus = 5.0   #nominal +5V supply to the ADC board (deliberately not the actual measured value)
max_counts = float(2**24 - 1)   #16777215.0; raw counts are divided by this to scale them to vplus
readlenbytes = 6   #number of bytes requested in each I2C block read
buswait = 0.17   #seconds to wait between I2C actions

#--- Display and strip chart ---
update_interval_sec = 1.0   #passed to REMI as the display update interval
xspan = 30.0   #strip chart X axis span in seconds
buffer_length = 250   #capacity, in samples, of each circular data buffer
yAuto = True   #autorange the Y axis?
#Initial Y axis range in deg C; also the fixed range when yAuto is False.
ymin = 20.0
ymax = 70.0
plotColors = ['black', 'red', 'blue']   #trace color for sensor 0, 1, 2

#--- Steinhart-Hart equation coefficients for TDK NTC Thermistor B57863S0103F040 ---
coeffA = 1.12532e-3
coeffB = 2.34873e-4
coeffC = 8.59509e-8

#FUNCTION CtoF() takes a temperature in degrees Celsius and returns
#the same temperature expressed in degrees Fahrenheit.
def CtoF(degrees):
    fahrenheit = degrees * 1.8 + 32.0
    return fahrenheit

#CLASS CircBuff
#This class implements a continuous wrap-around buffer to hold and serve out
#the temperature and time data for the strip chart.
#Two instances of CircBuff are spawned for each ADC channel used.
#If for example Nsensors = 3, then 6 CircBuff instances are spawned.
#Every method takes a per-instance lock, so a reader in one thread always sees
#a consistent (buffer, ptr, full) state while another thread is adding or clearing.
#The lock only protects a single buffer; it does not keep two separate buffers
#(e.g. a time buffer and its temperature buffer) in step with each other.
class CircBuff:
    #length is the maximum number of samples held; older samples are overwritten.
    #Raises ValueError if length is less than 1.
    def __init__(self, length):
        if length < 1:
            raise ValueError('CircBuff length must be at least 1')
        self.buflen = length
        self.buffer = [0.0] * self.buflen
        self.ptr = 0   #index of the slot that the next add() will write
        self.full = False   #becomes True once the buffer has wrapped around
        #Re-entrant so that a method holding the lock may call another method.
        self._lock = threading.RLock()

    #Append one value, overwriting the oldest value once the buffer is full.
    def add(self, value):
        with self._lock:
            self.buffer[self.ptr] = value
            self.ptr += 1
            if self.ptr >= self.buflen:
                self.ptr = 0
                self.full = True

    #Return the number of valid samples currently held.
    def size(self):
        with self._lock:
            return self.buflen if self.full else self.ptr

    #Return the stored samples as a new list ordered oldest to newest.
    #If 0 <= count < buflen, only the newest count samples are returned
    #(fewer if fewer are stored); any other count returns all stored samples.
    def read(self, count=-1):
        with self._lock:
            p = self.ptr
            if self.full:
                #Unwrap: the oldest sample sits at ptr, the newest just before it.
                rlist = self.buffer[p:] + self.buffer[:p]
            else:
                rlist = self.buffer[:p]
        if 0 <= count < self.buflen:
            return rlist[max(len(rlist) - count, 0):]
        return rlist

    #Treat the stored values as timestamps and return the trailing run that
    #starts at the first value greater than (newest value - tspan).
    #If no value qualifies, only the newest value is returned.
    #An empty buffer returns an empty list.
    def readtimespan(self, tspan):
        tlist = self.read()
        if not tlist:
            return []
        tstart = tlist[-1] - tspan
        for i, t in enumerate(tlist):
            if t > tstart:
                return tlist[i:]
        return tlist[-1:]

    #Return the most recently added value. The caller should check size() > 0
    #first: on an empty buffer this returns whatever is in the last slot
    #(0.0 initially, or a stale value after clear()).
    def readlast(self):
        with self._lock:
            if self.ptr == 0:
                return self.buffer[self.buflen - 1]
            return self.buffer[self.ptr - 1]

    #Mark the buffer as empty. Slot contents are not erased, only the
    #write pointer and the wrap-around flag are reset.
    def clear(self):
        with self._lock:
            self.ptr = 0
            self.full = False

#CLASS MatplotImage
#This class is adapted from REMI example programs for drawing graphs.
#It contains the interface functions for displaying a Matplotlib graph image
#onto the web page. This latest version is flicker-free.
#The figure is drawn through self.ax; calling update() makes the browser
#fetch a freshly rendered PNG from get_image_data().
class MatplotImage(Image):
    ax = None   #the Matplotlib axes that callers plot onto
    app_instance = None   #the application instance used to send updates

    def __init__(self, **kwargs):
        super(MatplotImage, self).__init__("/%s/get_image_data?index=0" % str(id(self)), **kwargs)
        self._fig = Figure(figsize=(8, 4))
        self.ax = self._fig.add_subplot(111)

    #Walk up the chain of parents starting at node and return the first one
    #that is a remi.server.App, or None if the chain ends without finding one.
    def search_app_instance(self, node):
        if issubclass(node.__class__, remi.server.App):
            return node
        if not hasattr(node, "get_parent"):
            return None
        return self.search_app_instance(node.get_parent())

    #Ask the browser to reload the image. The App instance is looked up on the
    #first successful call and cached; if none can be found the call does nothing.
    def update(self, *args):
        if self.app_instance is None:
            self.app_instance = self.search_app_instance(self)
            if self.app_instance is None:
                return
        #The script downloads the new image as a blob and swaps it into the
        #existing element, revoking the previous object URL.
        #time.time() in the query string makes every request URL unique.
        self.app_instance.execute_javascript("""
            url = '/%(id)s/get_image_data?index=%(frame_index)s';
            xhr = null;
            xhr = new XMLHttpRequest();
            xhr.open('GET', url, true);
            xhr.responseType = 'blob'
            xhr.onload = function(e){
                urlCreator = window.URL || window.webkitURL;
                urlCreator.revokeObjectURL(document.getElementById('%(id)s').src);
                imageUrl = urlCreator.createObjectURL(this.response);
                document.getElementById('%(id)s').src = imageUrl;
            }
            xhr.send();
            """ % {'id': self.identifier, 'frame_index': str(time.time())})

    #Render the current figure to PNG and return [data, headers].
    #If rendering fails, the error is logged and (None, None) is returned
    #so that a bad frame does not take the server down.
    def get_image_data(self, index=0):
        Image.set_image(self, '/%(id)s/get_image_data?index=%(frame_index)s'% {'id': self.identifier, 'frame_index': str(time.time())})
        self._set_updated()
        try:
            canv = FigureCanvasAgg(self._fig)
            buf = io.BytesIO()
            canv.print_figure(buf, format='png')
            headers = {'Content-type': 'image/png', 'Cache-Control': 'no-cache'}
            return [buf.getvalue(), headers]
        except Exception:
            #Still best-effort, but leave a traceback instead of failing silently.
            import logging
            logging.getLogger(__name__).exception('MatplotImage: failed to render the figure to PNG')
        return None, None


#CLASS Thermistors
#This class is the main program structured as a REMI App object.
class Thermistors(remi.App):

    #FUNCTION read_temperature() selects one ADC channel over the I2C bus, reads back
    #its raw counts, and converts counts -> volts -> thermistor resistance -> deg C.
    #ch_index selects the entry of adc_ch and Rref to use. Returns the temperature in deg C.
    #This call blocks for at least buswait seconds.
    def read_temperature(self, ch_index):
        channel = adc_ch[ch_index]
        self.i2cbus.write_byte(i2caddr, channel)
        sleep(buswait)   #wait between the channel-select write and the read
        raw = self.i2cbus.read_i2c_block_data(i2caddr, channel, readlenbytes)
        #Only the first three bytes of the block are used, most significant first.
        byte_hi, byte_mid, byte_lo = raw[0], raw[1], raw[2]
        counts = byte_hi*65536.0 + byte_mid*256.0 + byte_lo
        volts = vplus * counts / max_counts
        #Thermistor resistance from the measured voltage and this channel's reference resistor.
        #vplus cancels out of this ratio, which is why its nominal value is good enough.
        r_thermistor = volts * Rref[ch_index] / (vplus - volts)
        log_r = math.log(r_thermistor)
        #Steinhart-Hart equation gives the reciprocal of absolute temperature.
        inv_kelvin = coeffA + coeffB*log_r + coeffC*log_r*log_r*log_r
        #NOTE(review): the offset is 273.0, not 273.15. With the coefficients above,
        #10 kohm evaluates to about 298.00 K, so the coefficients appear to be fitted
        #to this offset -- confirm before changing either one.
        return 1.0/inv_kelvin - 273.0
    
    #FUNCTION daq_process() is the data acquisition loop; main() runs it in its own thread.
    #It cycles through the sensors for as long as self.daq_running is True and
    #idles in place while self.daq_paused is True.
    #Each ADC channel takes 300 to 400 milliseconds to access and read.
    #Each reading and its timestamp go into the circular buffers allocated at start-up.
    #[DEV] Provide an option for this function to also write readings to a file.
    def daq_process(self):
        while self.daq_running:
            for ch in range(Nsensors):
                sleep(buswait)
                #Hold here for as long as a pause is requested.
                while self.daq_paused:
                    sleep(buswait)
                #A stop request may have arrived during the waits above.
                if not self.daq_running:
                    break
                #The timestamp is taken before the (slow) reading starts,
                #but stored only after the reading has been stored.
                stamp = time.time() - self.reftime
                reading = self.read_temperature(ch)
                self.ydata[ch].add(reading)
                self.xdata[ch].add(stamp)

    #The base class is initialized only when no 'editing_mode' keyword is given.
    #Other keyword arguments are not forwarded; static files are served from './res/'.
    #NOTE(review): 'editing_mode' is presumably supplied by the REMI UI Editor -- confirm.
    def __init__(self, *args, **kwargs):
        if 'editing_mode' in kwargs:
            return
        super(Thermistors, self).__init__(*args, static_file_path={'my_res': './res/'})

    #FUNCTION idle() is called by the REMI engine once per display update interval.
    #This function's job is to update the display of the temperature values:
    #first the numeric readouts, then (once every sensor has data) the strip chart.
    #The DAQ thread and the Clear Chart button change the buffers while this runs,
    #so a time buffer can briefly be empty or shorter than its temperature buffer.
    #In that case the chart is left as it was and redrawn on the next call.
    def idle(self):
        allChannelsIn = True
        for i in range(Nsensors):
            if self.ydata[i].size() > 0:
                T = self.ydata[i].readlast()   #get the latest T reading from the DAQ thread
                if not self.unitIsCelcius:
                    T = CtoF(T)   #convert Celsius to Fahrenheit
                Tstr = f'{T:.2f}'   #display value to 2 decimal places
            else:
                allChannelsIn = False
                Tstr = '-.--'
            self.mainContainer.children[self.textbox[i]].set_text(Tstr)

        #Generate the strip chart only if all sensors have at least one measured sample in.
        if not allChannelsIn:
            return

        #Collect every trace BEFORE touching the axes, so that bailing out
        #does not leave a blank chart on the page.
        traces = []
        for i in range(Nsensors):
            try:
                xlist = self.xdata[i].readtimespan(xspan + 1.0)  #read the X data (time) from the buffer
            except IndexError:
                return   #the time buffer is still empty
            ylist = self.ydata[i].read(len(xlist))  #read the Y data (temperature) from the buffer
            npts = min(len(xlist), len(ylist))
            if npts == 0:
                return
            #Keep the newest npts points of each list so X and Y always have equal length.
            xlist = xlist[-npts:]
            ylist = ylist[-npts:]
            if not self.unitIsCelcius:
                ylist = [CtoF(y) for y in ylist]
            traces.append((xlist, ylist))

        self.mpl.ax.clear()  #Clear the previous plots. Unfortunately, this clears the axis labels, too.
        for i, (xlist, ylist) in enumerate(traces):
            self.mpl.ax.plot(xlist, ylist, color=plotColors[i])  #plot the data using this sensor's color

        #Calculate the X and Y axes limits, (x1, x2) and (y1, y2)
        x2 = max(xlist[-1] for xlist, _ in traces)
        if x2 < xspan:
            x2 = xspan
            x1 = 0.0
        else:
            x1 = x2 - xspan
        if yAuto:
            y1 = math.floor(min(min(ylist) for _, ylist in traces))
            y2 = math.ceil(max(max(ylist) for _, ylist in traces))
            if y2 <= y1:
                y2 = y1 + 1   #all readings equal one whole number; avoid a zero-height axis
        elif self.unitIsCelcius:
            y1 = ymin; y2 = ymax
        else:
            y1 = math.floor(CtoF(ymin)); y2 = math.ceil(CtoF(ymax))

        #Generate the new graph and update the display
        self.mpl.ax.set(xlabel='Time (s)',
            ylabel='Temperature (deg ' + ('C' if self.unitIsCelcius else 'F') + ')',
            autoscale_on=False, xlim=(x1, x2), ylim=(y1, y2),
            title='Three Temperatures vs. Time')
        self.mpl.update()

    #FUNCTION main() is called by the REMI engine once at program start.
    #It allocates the data buffers, initializes the run-time state, opens the I2C bus,
    #starts the DAQ thread, and returns the root widget built by construct_ui().
    def main(self):
        #One time buffer (xdata) and one temperature buffer (ydata) per sensor.
        self.xdata = [CircBuff(buffer_length) for _ in range(Nsensors)]
        self.ydata = [CircBuff(buffer_length) for _ in range(Nsensors)]
        #Keys of the numeric readout labels in mainContainer, indexed by sensor.
        self.textbox = ['textTvalue0', 'textTvalue1', 'textTvalue2']
        self.daq_running = True
        self.daq_paused = False
        self.unitIsCelcius = True
        self.reftime = time.time()   #time zero of the strip chart's X axis
        self.i2cbus = SMBus(1)   #initialize connection to the I2C bus
        #Run the DAQ loop as a daemon thread. A non-daemon thread would keep the
        #process alive if the server exits by any path other than the
        #'End Server Process' button (e.g. Ctrl+C), because daq_running stays True.
        daq_thread = threading.Thread(target=self.daq_process, daemon=True)
        daq_thread.start()   #start the DAQ Process thread
        return Thermistors.construct_ui(self)

    #FUNCTION construct_ui() consists mostly of Python code generated by the REMI Editor,
    #which you can use to layout the web page and its indicators, text, images, and controls.
    #It is declared as a staticmethod that takes the app instance explicitly; main() calls it
    #as Thermistors.construct_ui(self). It builds all widgets, stores the chart in self.mpl
    #and the root container in self.mainContainer, and returns the root container.
    @staticmethod
    def construct_ui(self):
        #Root container; every other widget is appended to it with absolute positioning.
        mainContainer = Container()
        mainContainer.attr_editor_newclass = False
        mainContainer.css_background_color = "rgb(236,246,204)"
        mainContainer.css_border_color = "rgb(25,31,37)"
        mainContainer.css_border_style = "solid"
        mainContainer.css_border_width = "5%"
        mainContainer.css_height = "240.0px"
        mainContainer.css_left = "10.0px"
        mainContainer.css_margin = "0px"
        mainContainer.css_position = "absolute"
        mainContainer.css_top = "15px"
        mainContainer.css_width = "315.0px"
        mainContainer.variable_name = "mainContainer"
        #Numeric readout for sensor 0 (black text, matching plotColors[0]).
        textTvalue0 = Label()
        textTvalue0.attr_editor_newclass = False
        textTvalue0.css_align_content = "stretch"
        textTvalue0.css_align_items = "center"
        textTvalue0.css_align_self = "center"
        textTvalue0.css_color = "rgb(0,0,0)"
        textTvalue0.css_font_size = "50px"
        textTvalue0.css_font_weight = "bold"
        textTvalue0.css_height = "75px"
        textTvalue0.css_justify_content = "center"
        textTvalue0.css_left = "15.0px"
        textTvalue0.css_margin = "0px"
        textTvalue0.css_position = "absolute"
        textTvalue0.css_text_align = "center"
        textTvalue0.css_top = "15px"
        textTvalue0.css_width = "180.0px"
        textTvalue0.text = "-.--"
        textTvalue0.variable_name = "textTvalue0"
        mainContainer.append(textTvalue0,'textTvalue0')
        #Button that toggles the display unit; its text shows the current unit, starting at "C".
        btnDegUnit = Button()
        btnDegUnit.attr_editor_newclass = False
        btnDegUnit.css_font_size = "40px"
        btnDegUnit.css_height = "75.0px"
        btnDegUnit.css_left = "225px"
        btnDegUnit.css_margin = "0px"
        btnDegUnit.css_position = "absolute"
        btnDegUnit.css_top = "15px"
        btnDegUnit.css_width = "75.0px"
        btnDegUnit.text = "C"
        btnDegUnit.variable_name = "btnDegUnit"
        mainContainer.append(btnDegUnit,'btnDegUnit')
        #Numeric readout for sensor 1 (red text, matching plotColors[1]).
        textTvalue1 = Label()
        textTvalue1.attr_editor_newclass = False
        textTvalue1.css_align_items = "center"
        textTvalue1.css_align_self = "center"
        textTvalue1.css_color = "rgb(255,0,0)"
        textTvalue1.css_font_size = "50px"
        textTvalue1.css_font_weight = "bold"
        textTvalue1.css_height = "75.0px"
        textTvalue1.css_justify_content = "center"
        textTvalue1.css_left = "15.0px"
        textTvalue1.css_margin = "0px"
        textTvalue1.css_position = "absolute"
        textTvalue1.css_text_align = "center"
        textTvalue1.css_top = "90px"
        textTvalue1.css_width = "180.0px"
        textTvalue1.text = "-.--"
        textTvalue1.variable_name = "textTvalue1"
        mainContainer.append(textTvalue1,'textTvalue1')
        #Container that will hold the strip chart image.
        graphContainer = Container()
        graphContainer.attr_editor_newclass = False
        graphContainer.css_border_color = "rgb(25,31,37)"
        graphContainer.css_border_style = "solid"
        graphContainer.css_border_width = "1%"
        graphContainer.css_height = "215.0px"
        graphContainer.css_left = "0.0px"
        graphContainer.css_margin = "0px"
        graphContainer.css_position = "absolute"
        graphContainer.css_top = "255px"
        graphContainer.css_width = "340.0px"
        graphContainer.variable_name = "graphContainer"
        mainContainer.append(graphContainer,'graphContainer')
        #Numeric readout for sensor 2 (blue text, matching plotColors[2]).
        textTvalue2 = Label()
        textTvalue2.attr_editor_newclass = False
        textTvalue2.css_align_items = "center"
        textTvalue2.css_align_self = "center"
        textTvalue2.css_color = "rgb(0,0,255)"
        textTvalue2.css_font_size = "50px"
        textTvalue2.css_font_weight = "bold"
        textTvalue2.css_height = "75px"
        textTvalue2.css_justify_content = "center"
        textTvalue2.css_left = "15px"
        textTvalue2.css_margin = "0px"
        textTvalue2.css_position = "absolute"
        textTvalue2.css_text_align = "center"
        textTvalue2.css_top = "165.0px"
        textTvalue2.css_width = "180px"
        textTvalue2.text = "-.--"
        textTvalue2.variable_name = "textTvalue2"
        mainContainer.append(textTvalue2,'textTvalue2')
        #Button that stops the DAQ thread and ends the program.
        btnEndProcess = Button()
        btnEndProcess.attr_editor_newclass = False
        btnEndProcess.css_height = "45.0px"
        btnEndProcess.css_left = "210.0px"
        btnEndProcess.css_margin = "0px"
        btnEndProcess.css_position = "absolute"
        btnEndProcess.css_top = "495.0px"
        btnEndProcess.css_width = "90.0px"
        btnEndProcess.text = "End Server Process"
        btnEndProcess.variable_name = "btnEndProcess"
        mainContainer.append(btnEndProcess,'btnEndProcess')
        #Button that empties the data buffers and restarts the chart's time axis.
        btnClearChart = Button()
        btnClearChart.attr_editor_newclass = False
        btnClearChart.css_height = "45px"
        btnClearChart.css_left = "15.0px"
        btnClearChart.css_margin = "0px"
        btnClearChart.css_position = "absolute"
        btnClearChart.css_top = "495.0px"
        btnClearChart.css_width = "90px"
        btnClearChart.text = "Clear Chart"
        btnClearChart.variable_name = "btnClearChart"
        mainContainer.append(btnClearChart,'btnClearChart')
        
        #Hand-written part: create the strip chart with its initial (empty) axes
        #and place it inside graphContainer.
        self.mpl = MatplotImage(width=340, height=200)
        self.mpl.style['margin'] = '0px'
        self.mpl.ax.set(xlabel='Time (s)', ylabel='Temperature (deg C)',
            autoscale_on=False, xlim=(0.0, xspan), ylim=(ymin, ymax),
            title='Three Temperatures vs. Time')
        self.mpl.update()
        graphContainer.append(self.mpl)
        #graphContainer was already appended above under this same key.
        #NOTE(review): this second append is presumably redundant -- confirm how REMI
        #handles a repeated key before removing it, since it may affect render order.
        mainContainer.append(graphContainer,'graphContainer')
        
        #Connect each button's click event to its handler method.
        mainContainer.children['btnDegUnit'].onclick.do(self.onclick_btnDegUnit)
        mainContainer.children['btnEndProcess'].onclick.do(self.onclick_btnEndProcess)
        mainContainer.children['btnClearChart'].onclick.do(self.onclick_btnClearChart)
        
        #Keep a reference for idle() and the click handlers, which look widgets up by key.
        self.mainContainer = mainContainer
        return self.mainContainer
    
    #Click handler for the unit button: flip between Celsius and Fahrenheit
    #and show the newly selected unit's letter on the button.
    def onclick_btnDegUnit(self, emitter):
        showCelsius = not self.unitIsCelcius
        self.unitIsCelcius = showCelsius
        self.mainContainer.children['btnDegUnit'].set_text('C' if showCelsius else 'F')
        
    #Click handler for 'Clear Chart': empty every data buffer and restart
    #the chart's time axis from zero.
    def onclick_btnClearChart(self, emitter):
        #Request a pause, then give the DAQ thread one bus-wait period to reach its pause loop.
        #NOTE(review): a reading already in progress may take longer than buswait to finish
        #and would then be stored after the clear -- confirm whether that matters.
        self.daq_paused = True
        sleep(buswait)
        self.reftime = time.time()   #new time zero for subsequent timestamps
        for ch in range(Nsensors):
            for buffers in (self.xdata, self.ydata):
                buffers[ch].clear()
        #Buffers, pointers, and flags are fully cleared; let the DAQ thread continue.
        self.daq_paused = False
    
    #Click handler for 'End Server Process': tell the DAQ thread to stop, release it
    #from any pause so it can see the stop request, wait two bus-wait periods,
    #then end the program. It might be a good idea depending on future usage
    #to require an admin password in order to end the program.
    def onclick_btnEndProcess(self, emitter):
        self.daq_running = False
        self.daq_paused = False
        settle_time = 2.0*buswait
        sleep(settle_time)
        self.close()

#The configuration dictionary below is only read by the REMI UI Editor.
configuration = {'config_project_name': 'Thermistors', 'config_resourcepath': './res/'}

#Choose where the web page is served.
#localmode True:  serve on the loopback address and open a browser on this machine.
#localmode False: serve on the fixed LAN address and do not open a browser.
localmode = True
ipaddr, show_browser = ('127.0.0.1', True) if localmode else ('192.168.1.193', False)

#The REMI start() function is called below to start serving the web page.
if __name__ == "__main__":
    remi.start(
        Thermistors,
        address=ipaddr,
        port=8081,
        update_interval=update_interval_sec,
        multiple_instance=False,
        enable_file_cache=False,
        start_browser=show_browser,
    )