Demo entry 2933721

MonWin

   

Submitted by Bill Waggoner on Nov 03, 2015 at 15:56
Language: Python 3. Code size: 13.3 kB.

# New Scanmon window based on urwid

import logging
import urwid
import queue
from urwid import Columns, Text, Padding, Frame, WidgetPlaceholder, Pile, Filler, Divider, ListBox, SimpleListWalker, BoxAdapter, SimpleFocusListWalker, Edit, AttrMap
import time
import threading
import sys
import serial
from uuid import uuid4

class MonWin(urwid.MainLoop):

    class CallbackQueue(object):
        '''Registry of callbacks keyed by response code.

        Each registered entry is a tuple of (handle, callback, repeat).
        Entries are kept in lists stored in a dict under their response code.
        The response code is always upper-cased before it is used as a key,
        so callers may pass it in either case.
        '''
        def __init__(self):
            # Maps upper-cased response code -> list of (handle, callback, repeat)
            self.queue = dict()

        def put(self, code, callback, repeat = False):
            '''Register callback under code and return its unique handle.

            code -- response code (any case)
            callback -- callable invoked with no arguments by doCalls()
            repeat -- when False the entry is removed after its first call
            '''
            handle = uuid4()
            self.queue.setdefault(code.upper(), []).append((handle, callback, repeat))
            return handle

        def drop(self, code, handle):
            '''Remove the entry identified by handle from code's list.

            Returns True when an entry was removed, False when the code or
            the handle is unknown.
            '''
            clist = self.queue.get(code.upper(), [])
            for n, entry in enumerate(clist):
                if entry[0] == handle:
                    # Safe to delete here: we leave the loop immediately.
                    del clist[n]
                    return True
            return False

        def get(self, code):
            '''Return the list of entries registered for code.

            The stored list itself is returned (not a copy); an unknown code
            yields a fresh empty list.
            '''
            return self.queue.get(code.upper(), [])

        def doCalls(self, code):
            '''Invoke every callback registered for code, in registration order.

            Entries registered with repeat=False are dropped after being called.
            '''
            # Iterate over a copy because drop() mutates the stored list.
            for handle, callback, repeat in self.get(code)[:]:
                callback()
                if not repeat:
                    self.drop(code, handle)

    class CmdLine(urwid.WidgetWrap):
        '''Single-line command prompt.

        Wraps an Edit widget captioned "Cmd> ". When the wrapped widget hands
        back 'enter', the current text is pushed onto cmdQueue and the edit
        line is cleared.
        '''

        def __init__(self, cmdQueue):
            '''cmdQueue -- object with a put() method that receives each command string.'''
            self.logger = logging.getLogger(__name__)
            self.cmdQueue = cmdQueue
            super().__init__(Edit(caption=('green', "Cmd> "), wrap = 'clip'))

        def keypress(self, size, key):
            '''Forward key to the Edit widget and submit the line on 'enter'.

            Returns whatever the Edit widget left unhandled, or None when the
            key was consumed (including a submitted command).
            '''
            # Pre-bind the result so a failure inside the wrapped keypress()
            # cannot leave it undefined at the return below; the key is then
            # reported as consumed rather than raising UnboundLocalError.
            keyret = None
            try:
                keyret = self._w.keypress(size, key)
                if keyret == 'enter':
                    cmd = self._w.edit_text
                    self.logger.debug('keypress: Command: "{}"'.format(cmd))
                    self._w.set_edit_text('')
                    self.cmdQueue.put(cmd)
                    keyret = None
            except Exception:
                # Best effort: log and keep the UI alive.
                self.logger.exception('keypress error')

            return keyret

    class ScrollWin(urwid.WidgetWrap):
        '''A ListBox inside a Frame, with an optional title line as header.

        New widgets are added with append(); the view follows the newest
        line only while the last line is the one in focus.
        '''

        def __init__(self, title = None):
            '''title -- optional text shown in the frame header; no header when falsy.'''
            self.logger = logging.getLogger(__name__)
            self.scroller = ListBox(SimpleFocusListWalker([]))
            if title:
                self.frameTitle = Columns([('pack', Text('--')),('pack', Text(('wintitle', title))), Divider('-')])
            else:
                self.frameTitle = None
            super().__init__(Frame(self.scroller, header = self.frameTitle))

        def append(self, wid):
            '''Add wid at the end of the list, scrolling to it if the view was at the bottom.'''
            try:
                focusNow = self.scroller.focus_position
            except IndexError:
                # An empty ListBox has no focus position; -1 makes the
                # "bottom in focus" test below succeed for the first line.
                focusNow = -1
            bottom = focusNow == len(self.scroller.body) - 1  # Bottom widget in focus?
            self.scroller.body.append(wid)
            if bottom:
                self.logger.debug('scrolling')
                self.scroller.set_focus(focusNow + 1, coming_from = 'above')
            else:
                self.logger.debug('skipping scrolling')

        def mouse_event(self, size, event, button, col, row, focus):
            '''Move the focus one line on mouse buttons 4/5; delegate everything else.

            Returns True for buttons 4 and 5, otherwise the wrapped widget's result.
            '''
            # NOTE(review): urwid's documentation lists button 4 as wheel-up and
            # 5 as wheel-down; the log labels below say the opposite - confirm
            # whether they are meant to describe the direction the content moves.
            focus_dir = None

            if button == 4:     # moves focus to the previous line
                focus_dir = -1
                from_dir = 'below'
                self.logger.debug('Scroll wheel down')
            elif button == 5:   # moves focus to the next line
                focus_dir = 1
                from_dir = 'above'
                self.logger.debug('Scroll wheel up')

            if focus_dir is None:
                return self._w.mouse_event(size, event, button, col, row, focus)

            try:
                newpos = self.scroller.focus_position + focus_dir
                if 0 <= newpos < len(self.scroller.body):
                    # NOTE(review): size is the size handed to this wrapper, which
                    # includes the Frame header - confirm that is what the ListBox expects.
                    self.scroller.change_focus(size, newpos, coming_from = from_dir)
                    self.logger.debug('newpos=%d', newpos)
            except IndexError:
                ouch = 'OUCH! size={}, event={}, button={}, col={}, row={}, focus={}, dir={}'.format(size, event, button, col, row, focus, focus_dir)
                self.logger.exception(ouch)
                self.append(Text(ouch))

            return True

    def show_or_exit(self, key):
        '''Handler for input nothing else consumed.

        'q' or 'Q' ends the main loop; any other input is echoed (as its
        repr) into the response window and focus is put back on the footer.
        '''
        quit_keys = ('q', 'Q')
        if key in quit_keys:
            raise urwid.ExitMainLoop()

        echo = Text(repr(key))
        self.resp.append(echo)

        frame = self.widget
        if frame.focus_position != 'footer':  # readjust focus if needed
            frame.focus_position = 'footer'

    def sendGLG(self):
        '''Thread body: every 5 seconds, write GLG to the scanner while runGLG is set.

        Never returns.
        '''
        request = b'GLG\r'
        while True:
            polling = self.runGLG
            if polling:
                self.logger.debug('Sending')
                self.scanner.write(request)
            time.sleep(5)

    def readScanner(self):
        '''Drain pending scanner bytes into readBuf and emit each complete line.

        Lines are terminated by a carriage return; every complete line is
        decoded (undecodable bytes ignored) and written to the 'glg' window.
        Any trailing partial line stays in readBuf for the next call.
        '''
        self.logger.debug('Reading')
        port = self.scanner
        while port.inWaiting() > 0:
            chunk = port.read(port.inWaiting())
            self.readBuf += chunk
            self.logger.debug('Scanner sent: ' + repr(self.readBuf))

        terminator = b'\r'
        while terminator in self.readBuf:
            raw, _sep, remainder = self.readBuf.partition(terminator)
            self.readBuf = remainder
            text = raw.decode(errors='ignore')
            self.putLine('glg', text)
            self.logger.debug('Read scanner: ' + repr(text))

    def __init__(self, doCmdQueue, port = '/dev/ttyUSB0', baudrate = 115200):
        '''Build the UI, open the scanner serial port and start the GLG thread.

        Positional parameters:
        doCmdQueue -- queue that receives each command line typed at the prompt

        Keyword parameters:
        port -- serial device the scanner is attached to
        baudrate -- serial speed used to open the port

        The ident of the constructing thread is stored in threadID; putLine,
        setText and doQuit compare against it to decide whether to act
        directly or to reschedule themselves through set_alarm_in.
        '''
        self.logger = logging.getLogger(__name__)
        self.mdl = Text('BCD996XT')
        self.ver = Text('Ver [checking...]')
        self.doCmdQueue = doCmdQueue
        self.threadID = threading.current_thread().ident
        header = Columns([
            ('weight', 60, Divider('-')),
            ('weight', 30, Padding(AttrMap(self.mdl, 'wintitle'), min_width = 10, align = 'left')),
            ('weight', 20, Divider('=')),
            ('weight', 30, Padding(AttrMap(self.ver, 'wintitle'), min_width = 10, align = 'right', width = 'pack')),
            ('weight', 60, Divider('-'))
            ])

        footer = MonWin.CmdLine(self.doCmdQueue)

        self.msg = MonWin.ScrollWin('Messages')
        self.glg = MonWin.ScrollWin('Channel Monitor')
        self.resp = MonWin.ScrollWin('Command Response')
        # Name -> window lookup used by putLine()
        self.windows = {'msg': self.msg, 'glg': self.glg, 'resp': self.resp}

        body = Pile([
            ('weight', 5, self.msg),
            ('weight', 33, self.glg),
            ('weight', 11, self.resp)
            ])
        frame = Frame(body, header = header, footer = footer, focus_part = 'footer')
        palette = [
            ('wintitle', 'yellow', 'default', 'bold'),
            ('green', 'dark green', 'default', 'bold'),
            ('NORM', 'white', 'default', 'default'),
            ('ALERT', 'light red', 'default', 'standout'),
            ('WARN', 'light magenta', 'default', 'underline')
            ]
        super().__init__(frame, unhandled_input=self.show_or_exit, palette=palette)
        (screenCols, screenRows) = self.screen.get_cols_rows()
        # NOTE(review): 5 and 11 mirror the msg/resp pile weights above and 2 is
        # presumably header + footer; this figure is only reported, not used for layout.
        glgRows = screenRows - (2 + 5 + 11)

        self.msg.append(Text('Screen has {:d} rows and {:d} columns'.format(screenRows, screenCols)))
        self.msg.append(Text('glg has {:d} rows'.format(glgRows)))

        # State read by readScanner() and sendGLG(); set it up before the
        # watcher is registered and the thread is started.
        self.readBuf = b''
        self.runGLG = False

        self.scanner = serial.Serial(port=port, baudrate=baudrate)
        self.scanner.nonblocking()
        self.watch_file(self.scanner.fileno(), self.readScanner)
        self.logger.info("Watching scaner at file #%d", self.scanner.fileno())

        # Daemon thread so it never keeps the process alive at exit.
        glgThread = threading.Thread(target=self.sendGLG, name = 'sendGLG')
        glgThread.daemon = True
        glgThread.start()


    def _aPutLine(self, mainLoop, userData):
        """Alarm callback that replays a putLine request.

        userData is (window, message) optionally followed by a color;
        the color falls back to 'NORM' when absent.
        """
        window, message, *extra = userData
        if extra:
            color = extra[0]
        else:
            color = 'NORM'
        self.putLine(window, message, color)

    def putLine(self, window, message, color = 'NORM'):
        """Scroll the window and write the message on the bottom line.

        When called from a thread other than the one recorded in threadID,
        the request is rescheduled through set_alarm_in instead of touching
        the widgets directly.

        Positional parameters:
        window -- The name of the window("msg", "glg", "resp"); an unknown
                  name falls back to the "msg" window
        message -- The message (a string, a tuple[attributed string(s)] or a Widget)

        Keyword parameters:
        color -- Palette entry applied to a bare string
                 ("NORM", "ALERT", "WARN", "green")
        """
        if self.threadID != threading.current_thread().ident:
            self.logger.debug('redirect: "{}", "{}", "{}"'.format(window, message, color))
            self.set_alarm_in(0.0, self._aPutLine, user_data = (window, message, color))
            return

        # Unknown window names go to the message window rather than raising.
        win = self.windows.get(window)
        if win is None:
            win = self.windows['msg']

        try:
            self.logger.debug('window={}, message={}, color={}'.format(window, repr(message), color))
            if isinstance(message, urwid.Widget): # a Widget?
                wid = message
            elif isinstance(message, str):        # a bare string gets color
                wid = Text((color, message))
            else:
                wid = Text(message)               # probably a tuple
            win.append(wid)
        except Exception:
            # Best effort: a bad message must not take the UI down.
            self.logger.exception('exception:')

    def _aSetText(self, mainLoop, userData):
        '''Alarm callback that replays a setText request; userData is (widget, text).'''
        widget, text = userData
        self.setText(widget, text)

    def setText(self, wid, txt):
        '''Set the text of widget wid to txt.

        On the thread recorded in threadID the widget is updated directly;
        from any other thread the update is rescheduled through set_alarm_in.
        '''
        on_loop_thread = threading.current_thread().ident == self.threadID
        if not on_loop_thread:
            self.set_alarm_in(0.0, self._aSetText, (wid, txt))
            return

        self.logger.debug(txt)
        wid.set_text(txt)

    def doIt(self, f, data = None, time = 0.0):
        '''Schedule f for execution by the main loop; intended for use by other threads.

        f -- alarm callback handed to set_alarm_in
        data -- user data passed along to f
        time -- delay in seconds (this parameter shadows the time module inside this method)
        '''
        delay = time
        self.set_alarm_in(delay, f, user_data = data)

    def doQuit(self, mainLoop = None, user_data = None):
        '''Request that the main loop exit.

        On the thread recorded in threadID this raises urwid.ExitMainLoop;
        from any other thread it reschedules itself as an alarm 0.1 s later.
        The parameters exist so the method can serve as an alarm callback.
        '''
        self.logger.warning('QUIT requested')
        if threading.current_thread().ident != self.threadID:
            self.set_alarm_in(0.1, self.doQuit)
            return
        raise urwid.ExitMainLoop

    def doNothing(self, mainLoop, user_data):
        '''Self-rearming alarm that fires every 0.5 s and does no work.

        Its only purpose is to wake the main loop so the screen refreshes;
        the original author believed this works around a bug in urwid.
        '''
        rearm = mainLoop.doNothing
        mainLoop.set_alarm_in(0.5, rearm)

def loopTest():
    '''Test thread body: post a message, sleep 5 seconds, then report the elapsed time.

    Uses the module-level monwin and publishes its status text in the
    global loopTestTime.
    '''
    global loopTestTime
    dologger = logging.getLogger(__name__)
    dologger.info('starting')

    loopTestTime = "I'm not done yet!"
    dologger.debug(loopTestTime)
    monwin.putLine('msg', loopTestTime, 'WARN')

    started = time.time()
    time.sleep(5)
    elapsed = time.time() - started

    loopTestTime = "loopTest took {:5.3f} seconds".format(elapsed)
    monwin.putLine('msg', loopTestTime, 'ALERT')
    monwin.setText(monwin.ver, 'Ver 2.3.4')

    dologger.info(loopTestTime)

def doCommands(monwin, queue):
    '''Command-processing thread body.

    Blocks on queue.get(), echoes every command to the 'resp' window and
    acts on it. Recognised commands are 'quit', 'test', 'stop' and 'go';
    anything else is reported as unknown. A None entry ends the loop.

    monwin -- the MonWin instance to drive
    queue -- queue of command strings (this parameter shadows the queue
             module inside this function)
    '''
    dologger = logging.getLogger(__name__)
    dologger.info('starting')
    while True:
        try:
            cmd = queue.get()
            dologger.info(repr(cmd))
            monwin.putLine('resp', Text('CMD: {}'.format(cmd)))

            if cmd is None:
                dologger.info('quitting')
                break
            elif cmd == 'quit':
                monwin.doQuit()
            elif cmd == 'test':
                monwin.putLine('resp', 'Testing ... 1 ... 2 ... 3 ...', 'WARN')
            elif cmd == 'stop':
                monwin.runGLG = False
            elif cmd == 'go':
                monwin.runGLG = True
            else:
                monwin.putLine('resp', "I don't know how to " + repr(cmd), 'ALERT')
                dologger.error('Unknown command: ' + repr(cmd))
        except Exception:
            # Keep serving commands after an ordinary failure, but let
            # KeyboardInterrupt/SystemExit propagate instead of looping forever.
            dologger.exception('Exception')

# Run simulation for testing
if __name__ == '__main__':
    # Log everything, tagged with thread and function name, to MonWin.log.
    fmt = '%(asctime)s -%(levelname)s- *%(threadName)s* %%%(funcName)s%% %(message)s'
    logging.basicConfig(filename='MonWin.log', filemode = 'w', level=logging.DEBUG, format = fmt)
    logger = logging.getLogger(__name__)

    doCmdQueue = queue.Queue()
    now1 = time.time()

    monwin = MonWin(doCmdQueue)

    # Worker threads: the timing test and the command processor.
    lt = threading.Thread(name = 'LoopTest', target=loopTest)
    dc = threading.Thread(name = 'DoCommands', target=doCommands, args = (monwin, doCmdQueue))

    # Arm the periodic wake-up alarm before any worker can post to the UI.
    monwin.doNothing(monwin, None)
    lt.start()
    dc.start()

    # Blocks until the main loop exits; the None sentinel then stops doCommands.
    monwin.run()
    doCmdQueue.put(None)

    now2 = time.time()
    logger.info("That took {:5.3f} seconds".format(now2 - now1))
    dc.join()
    logging.shutdown()

This snippet took 0.03 seconds to highlight.

Back to the Entry List or Home.

Delete this entry (admin only).