Source code for sw.ui

import time, curses, datetime
from math import *
from sw.formatting import *

[docs]class Ui: """The UI class houses the curses instance which in turn enables all user-requested actions while displaying relevant information. Only one instance of UI should ever be initialized. Initialization handles all the color constants generation along with defining all subwindows statically. :param screen: Used with the wrapping function provided by curses. This is a curses screen that has just been created. :param pool: Reference to our owning pool. This pool will be monitored and displayed on the main status window. :return: UI (self) """ def __init__( self, screen, pool ): # The curses screen self.scr = screen self.scr.clear( ) # The pool we're a UI for. self.pool = pool # Options for buttons to press self.options = [ "c+- - Children", "j+- - Jobs", "p - Pause", "q - Quit" ] # Toggled options self.altopts = [ "c+- - Children", "j+- - Jobs", "p - Unpause", "s - Start" ] # Options bits, true if alternate self.optbits = [ False for i in range(len(self.options)) ] self.bits = [ "c", "j", "p", "q" ] # Key buffer self.keys = [ ] self.last = [ None for i in range(4) ] # Next time we update the screen self.nextUpdate = time.time( ) self.screenUpdateTime = 0.1 # Dimensions and subwindow for statistics section self.STATS_HEIGHT = 7 self.STATS_WIDTH = self.x( )-2 self.stats = self.scr.subwin( self.STATS_HEIGHT-1, self.STATS_WIDTH, self.y( )-self.STATS_HEIGHT, 1 ) # Dimensions and subwindow for options section self.OPTIONS_HEIGHT = self.y( ) - self.STATS_HEIGHT self.OPTIONS_WIDTH = 20 self.opts = self.scr.subwin( self.OPTIONS_HEIGHT, self.OPTIONS_WIDTH, 1, self.x( ) - self.OPTIONS_WIDTH ) # Main area self.MAIN_HEIGHT = self.y( ) - 2 - self.STATS_HEIGHT self.MAIN_WIDTH = self.x( ) - 2 - self.OPTIONS_WIDTH self.main = self.scr.subwin( self.MAIN_HEIGHT, self.MAIN_WIDTH, 1, 1 ) # Colors curses.init_pair( DISP_LOAD, curses.COLOR_BLACK, curses.COLOR_YELLOW ) curses.init_pair( DISP_START, curses.COLOR_BLACK, curses.COLOR_YELLOW ) curses.init_pair( DISP_GOOD, curses.COLOR_WHITE, curses.COLOR_BLACK ) curses.init_pair( DISP_ERROR, curses.COLOR_BLACK, curses.COLOR_RED ) curses.init_pair( DISP_WAIT, curses.COLOR_WHITE, curses.COLOR_BLUE ) curses.init_pair( DISP_DONE, curses.COLOR_BLACK, curses.COLOR_WHITE ) curses.init_pair( DISP_FINISH, curses.COLOR_BLACK, curses.COLOR_GREEN ) self.scr.nodelay( True ) # Don't wait on key presses curses.curs_set( 0 ) # Invisible Cursor self.scr.border( ) # Draws a pretty border around the window self.scr.refresh( )
[docs] def drawMainScreen( self, first=False ): """Renders the initial screen and sets up options for curses. Draws the border around the screen and the separators for the various sections with the title. :param False first: Whether this is the call by the UI initialization. :returns: None """ if first: self.scr.addstr( 0, 3, "Selenium Wrapper Console" ) # Puts a message up top self.scr.vline( 1, self.x( )-self.OPTIONS_WIDTH, 0, self.y( )-self.STATS_HEIGHT ) # Line for stats window self.scr.hline( self.y( )-self.STATS_HEIGHT-1, 1, 0, self.STATS_WIDTH ) # Draw spaces for l in range(len(self.options)*2): self.opts.addstr( l, 2, ( self.OPTIONS_WIDTH-3 )*" " ) i = 0 for l in self.options: if not self.optbits[i/2]: self.opts.addstr( i, 2, l ) else: self.opts.addstr( i, 2, self.altopts[i/2] ) i += 2 self.scr.refresh( )
[docs] def think( self ): """Calls several other functions which need to be checked constantly. Includes :func:`updateStats`, :func:`updateKeys`, and :func:`updateMain`. The last one is called every time think is, while the other two are called every self.nextUpdate seconds. :returns: None """ self.updateMain( ) if time.time( ) >= self.nextUpdate: self.nextUpdate = time.time( ) + self.screenUpdateTime self.updateStats( ) self.updateKeys( )
[docs] def sleep( self, amount ): """Handles sleeping while listening for button presses. The hardcoded subsleep (0.01s) amount is an appropriate resolution that allows for seemingly instant button press responses without consuming an entire core (when constantly listening). :param amount: Float for amount of seconds to wait while listening to button presses. This is accurately followed to a resolution of 0.01s. :returns: None """ end = amount + time.time( ) while time.time( ) < end: time.sleep( 0.01 ) key = self.scr.getch( ) # Catch keys which do nothing if key == -1 and len( self.keys ) > 0: clear = [ "p", "q", "s", "+", "-" ] for c in clear: if c in self.keys: del self.keys[:] break continue elif key == curses.KEY_ENTER: del self.keys[:] continue elif key == curses.KEY_BACKSPACE: del self.keys[-1] continue # Flip between all our accepted keys if key == ord( "q" ) or key == ord( "s" ): self.keys = [ chr( key ) ] curses.flash( ) self.toggleKey( "q" ) if self.pool.status >= STOPPED: # Start self.pool.start( ) else: # Quit self.pool.stop( ) elif key == ord( "p" ) and not self.pool.status >= STOPPED: self.keys = [ chr( key ) ] curses.flash( ) self.toggleKey( chr( key ) ) if self.pool.status == PAUSED: # Unpause self.pool.start( ) else: # Pause self.pool.stop( PAUSED ) else: # This is either an unaccepted key, or a command key self.handleCommandKeys( key )
[docs] def handleCommandKeys( self, key ): """Part of our :func:`sleep` function which checks if any of our accepted keys are pressed. It handles the logic for interpreting presses while recording and discarding them. The render of keys in the bottom right of the screen is done by :func:`updateKeys`. :param key: The key to process / store. If it's +/-, the all recorded keys that are relevant are executed. :returns: None """ # If we have more than 8 keys and this key isn't +/-, clear everything and return if len( self.keys ) >= 8 and key != ord( "+" ) and key != ord( "-" ): self.keys = [ ] return if key == ord( "j" ) or key == ord( "c" ): # Check we have no other commands queued, else clear. if "j" in self.keys or "c" in self.keys: self.keys = [ ] self.keys.append( chr( key ) ) elif key == ord( "+" ) or key == ord( "-" ): self.keys.append( chr( key ) ) # This key acts as an executor if not "j" in self.keys and not "c" in self.keys: self.keys = [ ] else: # Look for numbers for c in self.keys: if c.isdigit( ): break else: self.keys.append( "1" ) #assume 1 # Execute command ##Get the number we're increasing num = [ ] for c in self.keys: if c.isdigit( ): num.append( c ) num = int( ''.join( num ) ) ##Now do operations specific to each command if "j" in self.keys: if "+" in self.keys: for i in range(num): self.pool.workQueue.put( self.pool.func ) if "-" in self.keys: for i in range(num): if not self.pool.workQueue.empty( ): self.pool.workQueue.get( False, False ) elif "c" in self.keys: if "+" in self.keys: for i in range(num): self.pool.newChild( ) if "-" in self.keys: for i in range(num): self.pool.endChild( ) # Clear keys for next command self.keys = [ ] elif key >= ord( "0" ) and key <= ord( "9" ): self.keys.append( chr( key ) )
[docs] def updateMain( self ): """Prints out a number for each of our children with an appropriate color for each corresponding with the last status message they reported. Refreshes the screen when done. :return: None """ # Draw each child with an appropriate background color / anim y = 1 # Our cursor's y position x = 1 # ^ but x for c in self.pool.children: if c is None: # Children are None for a while, these are ignored continue s = ''.join( [ "#", str( c.num + 1 ) ] ) if type( self.pool.data[c.num][DISPLAY] ) == int: self.main.addstr( y, x, s, curses.color_pair( self.pool.data[c.num][DISPLAY] ) ) else: self.main.addstr( y, x ,s ) y += 2 # Scoot down two lines for each number if y > self.y( ) - self.STATS_HEIGHT - 4: x += 3 # Over three, an extra character to space y = 0 self.main.refresh( )
[docs] def updateStats( self ): """Updates the statistics field within our window. Completely clears all of it initally then slowly goes through and reads from each individual pool value to rebuild it. This is called from :func:`think` and refreshes several times per second. :return: None """ statstrs = [ ] jpstr = None t = time.time( ) this = [ len( self.pool.children ), self.pool.successful( ) + self.pool.failed( ), self.pool.workQueue.qsize( ) ] if self.last[0] != this[0] or self.last[1] != this[1] or self.last[2] != this[2]: # Store this for next time self.last = this # Clear our window self.stats.clear( ) # Number of Children statstrs.append( ''.join( [ "Children: ", str( len( self.pool.children ) ) ] ) ) # Number of Active Children numactive = 0 for c in self.pool.children: if c is not None and c.status( ) == RUNNING: numactive += 1 statstrs.append( ''.join( [ "Act: ", str( numactive ) ] ) ) # Number of Jobs Left statstrs.append( ''.join( [ "Jobs Left: ", str( self.pool.workQueue.qsize( ) ) ] ) ) # Number of Jobs Successful statstrs.append( ''.join( [ "Successful: ", str( self.pool.successful( ) ) ] ) ) # Number of Failed Jobs statstrs.append( ''.join( [ "Failed: ", str( self.pool.failed( ) ) ] ) ) # Average Job Time times = self.pool.timeTaken( ) avgtime = avg( times ) statstrs.append( ''.join( [ "Avg Job: ", format( avgtime ), "s" ] ) ) totaltime = sum( times ) # Jobs per minute if len( times ) > 0: jps_ideal = ( 1 / avgtime ) * numactive if jps_ideal < 0.25: jpstr = ''.join( [ "Ideal JPM: ", format( jps_ideal * 60 ) ] ) else: jpstr = ''.join( [ "Ideal JPS: ", format( jps_ideal ) ] ) if len( times ) > 5 and totaltime > 0: jps_true = self.pool.successful( ) / ( time.time( ) - self.pool.started ) if jps_true < 0.25: statstrs.append( ''.join( [ "True JPM: ", format( jps_true * 60 ) ] ) ) else: statstrs.append( ''.join( [ "True JPS: ", format( jps_true ) ] ) ) if jpstr is not None: statstrs.append( jpstr ) adj = 2 # Amount we are shifting right in characters k = 0 # Amount we are shifting vertically for st in statstrs: stl = len( st ) # String length if ( adj + stl ) >= self.STATS_WIDTH: k += 1 adj = 2 self.stats.addstr( k, adj, st ) adj += stl + 3 self.stats.refresh( )
[docs] def updateKeys( self ): """Handles rendering any relevant keys recorded from user input. It reads from self.keys to do this. :return: None """ k = 1 y = self.y( ) - self.STATS_HEIGHT-3 self.opts.addstr( y, k, " "*(self.OPTIONS_WIDTH-2) ) if len( self.keys ) > 0: for c in self.keys: self.opts.addstr( y, k, c ) k += 1 self.opts.refresh( )
[docs] def toggleKey( self, key ): """Toggles a key display on the right hand side. :param key: A key that's listed on the right hand pane to toggle to an alternate option. :return: None """ i = self.bits.index(key) self.optbits[i] = not self.optbits[i] self.drawMainScreen( )
[docs] def x( self ): """Accessor for returning the x size of the curses screen. :return: Integer of the number of characters along the x axis of the screen. """ return self.scr.getmaxyx( )[1]
[docs] def y( self ): """Accessor for returning the y size of the curses screen. :return: Integer of the number of characters along the y axis of the screen. """ return self.scr.getmaxyx( )[0]