import json, time, os, getpass, socket, base64, traceback
import splunklib.client as client
import Queue as Q
from sw.const import *
class Report:
"""Report handles all the reporting events sent from the Pool, currently it exclusively sends these upstream to a
Splunk server which then does further handling. The events are sent from the pool via calls to this instance's functions
that in turn call :func:`send`.
Reporting can be disabled putting None (or a blank field) in the initial settings page. This has the
same effect as not including the #report="url" option within the options header of a script before conversion.
The reporting class also takes in the custom ``id=""``, ``run=""``, and ``project=""`` from kwargs / initial settings. It
uses these settings when communicating unstream with the reporting server.
:param pool: Reference to our owning pool. This is primarily to access pool.options and not used much elsewhere.
:return: Report (self)
.. seealso::
:ref:`reporting-terms`
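
    A minimal sketch of the option keys this class reads (all values shown are hypothetical
    placeholders, not defaults)::

        options = {
            'id': None,                      # machine identifier; generated as user@machine when None
            'run': 'nightly-01',             # some sort of unique name for the current run
            'project': 'checkout-flow',
            'script': 'login_test',
            'report': 'splunk.example.com',  # reporting is disabled when this is None or blank
            'report_port': 8089,
            'report_user': 'admin',
            'report_pass': 'changeme',
            'report_index': 'py_events',
        }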
"""
def __init__( self, pool ):
self.pool = pool
# This machine's identifier
self.ID = pool.options.get( 'id', None )
# The name for the current run, some sort of unique identifier
self.run = pool.options.get( 'run', None )
self.project = pool.options.get( 'project', None )
self.script = pool.options.get( 'script', None )
self.site = pool.options.get( 'report', None )
self.port = pool.options.get( 'report_port', 8089 )
self.user = pool.options.get( 'report_user', None )
self.password = pool.options.get( 'report_pass', None )
self.index = pool.options.get( 'report_index', None )
self.enabled = self.site is not None
if not self.enabled:
return
        # Placeholders; these are assigned once we get a response from the server
        self.rid = None
        self.cid = None
        # Retry state, updated when a transmission error occurs
        self.nextSend = 0
        self.tries = 5
# Our queue of things to submit to our server
self.queue = Q.Queue( )
self.func = self.pool.func.__name__
pool.logMsg( ''.join( [ "Reporting to URL: ", self.site ] ) )
    def send( self, payload, type ):
"""Send enables a payload to be parsed and added into our local queue for transmission
reporting server, where applicable. It is called as a sort of wrapper function from all
of the individual reporting functions (such as :func:`endchild`) to facilitate
standard transmission of the data upstream.
Currently (to be fixed) it encodes repeat information in every single payload including
all identifying characteristics about the run, pool, and our id. There's major issue for this
except for further stressing network bandwidth and the JSON parser.
:param payload: A hash of information to send upstream to our reporting server.
:param type: The R_* constant identifier for the type of payload included.
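
        As an illustration, a :func:`jobFinish` payload leaves for the queue looking
        roughly like this (values are hypothetical)::

            {
                'timetaken': 12.4, 'childID': 3,  # supplied by the caller
                'id': 'alice@buildbox',           # added here by send()
                'Project': 'checkout-flow', 'Script': 'login_test', 'Run': 'nightly-01',
                'func': 'runTest', 'time': 1401638400.0, 'type': R_JOB_COMPLETE
            }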
"""
if not self.enabled:
return
# Encode identifying information and the time
payload['id'] = self.id( )
payload['Project'] = self.project
payload['Script'] = self.script
payload['Run'] = self.run
payload['func'] = self.func
payload['time'] = time.time( )
payload['type'] = type
# Log payload
self.pool.logMsg( "Sending payload to queue: " + str( payload ), DEBUG )
self.queue.put( payload )
    def think( self ):
"""The think function called by our pool periodically. It handles the transmission of
our payload (many individual reports) to the Splunk server.
If our report.queue has content in it, it prepares to send everything upstream. The payload is transmitted
in JSON with the following general format::
{
"cid" => #, // Our client ID assigned by the server, nil if we don't have one yet.
"rid" => #, // Our run ID assined by the server, ^
"type" => R_START, // For example, this is the type of payload here.
"time" => UNIX_EPOCH, // Since sometimes the Queue has a delay in transmission, each
// payload contains it own timestamp.
// Some miscellaneous identifying information goes in here that's deprecated
// and unused.
// If the type involves a child:
"ChildID" => #, // The index of our child in pool.children / pool.data
// If the type involves a job:
"timetaken" => UNIX_EPOCH //The time the job took to complete.
}
This module attempts to send the payload upstream 5 times, waiting 5 seconds between reattempts. On the fifth failure
it never tries again. It is noted in the log when it gives up.
:returns: None
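
        A minimal sketch of how the owning pool might drive this method (the loop shape is an
        assumption for illustration, not the Pool's actual implementation)::

            while pool.running:
                pool.report.think( )  # drain the queue and transmit any pending payloads
                time.sleep( 1 )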
"""
t = time.time( )
if not self.enabled:
return
if self.nextSend != 0 and t < self.nextSend:
return
if self.queue.qsize( ) == 0:
return
        # Drain the queue into a single list so that we can transmit everything
        # gathered so far in one pass.
data = [ ]
while self.queue.qsize( ) > 0:
m = None
try:
m = self.queue.get( False )
except Q.Empty as e:
break
data.append( m )
if len( data ) > 0:
self.pool.logMsg( ' '.join( [ 'Sending', str( len( data ) ), 'payload(s) to server.' ] ), NOTICE )
            # Fire each queued payload off to Splunk as its own event
            while data:
                r = None
                d = data.pop( )
try:
r = self.sendSplunk( json.dumps( d ) )
except Exception as e:
self.pool.logMsg( "Fatal error with reporting, probably failed to connect: ", CRITICAL )
self.pool.logMsg( traceback.format_exc( ), CRITICAL )
if self.tries > 0:
self.tries -= 1
                        self.pool.logMsg( ''.join( [ "Will disable reporting after ", str( self.tries ), " more failed attempts." ] ), CRITICAL )
self.nextSend = t + 5
else:
self.pool.logMsg( "Disabling reporting.", CRITICAL )
self.enabled = False
return
# Put our data back in the send queue
self.queue.put( d )
for m in data:
self.queue.put( m )
return
    def sendSplunk( self, data ):
"""Sends data to a Splunk sever encoded in JSON.
:param data: JSON of data to send to the splunk server.
:returns: The parsed splunk event on success.
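
        For reference, the equivalent standalone splunklib calls look roughly like this
        (host, credentials, and index name are placeholder values)::

            import splunklib.client as client

            service = client.connect( host='splunk.example.com', port=8089,
                                      username='admin', password='changeme' )
            index = service.indexes['py_events']  # the index must already exist on the server
            index.submit( '{"type": 1}', sourcetype='py-event' )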
"""
splunk = client.connect( host=self.site,
port=self.port,
username=self.user,
password=self.password )
index = splunk.indexes[self.index]
return index.submit( data, sourcetype='py-event' )
    def start( self ):
"""Sends a start notification payload.
:returns: None
"""
self.send( { }, R_START )
    def stop( self ):
"""Sends a stop notification payload.
:returns: None
"""
self.send( { }, R_STOP )
    def jobStart( self, child ):
"""Sends a job start notification payload.
:param child: The index of the child reporting in pool.children/pool.data.
:returns: None
"""
self.send( { "childID": child }, R_JOB_START )
    def jobFinish( self, timetaken, child ):
"""Sends a job finish notification payload.
:param timetaken: The time it took for the job to finish, recorded
internally.
:param child: The index of the child reporting in pool.children/pool.data.
:returns: None
"""
data = { 'timetaken': timetaken, 'childID': child }
self.send( data, R_JOB_COMPLETE )
    def jobFail( self, error, child, screenshot=None ):
"""Sends a job failed notification payload.
:param error: The error text that was included with the error.
:param child: The index of the child reporting in pool.children/pool.data.
        :param screenshot: Path to an image from Selenium's screenshot facility, to be
            base64-encoded for sending (optional).
:returns: None
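
        With a screenshot attached, the outgoing payload looks roughly like this
        (values are hypothetical)::

            { 'error': 'TimeoutException: page load timed out', 'childID': 3,
              'screenshot': 'iVBORw0KGgoAAAANSUhEUg...' }  # base64 of the image bytes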
"""
data = { 'error': error, 'childID': child }
        # NOTE: screenshot transmission is currently disabled by the "and False" guard.
        if screenshot is not None and False:
            self.pool.logMsg( "Screenshot: " + screenshot )
with open( screenshot, "rb") as img:
data['screenshot'] = base64.b64encode( img.read( ) )
self.send( data, R_JOB_FAIL )
    def newChild( self, child ):
"""Sends a new child notification payload. This is called even when a child
is restarted.
        :param child: The index of the child reporting in pool.children/pool.data.
:returns: None
"""
self.send( { 'childID': child }, R_NEW_CHILD )
    def endChild( self, child ):
"""Sends a child process killed notification payload.
        :param child: The index of the child reporting in pool.children/pool.data.
:returns: None
"""
self.send( { 'childID': child }, R_END_CHILD )
    # Generators: helpers that derive values not supplied by the user
    def id( self ):
"""If self.id was not already set by the user, it generates a deterministic ID
based on the user running the script and the machine's name.
:returns: None
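
        For example (hypothetical user and machine)::

            report.id( )  # -> 'alice@buildbox'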
"""
# If we have a prespecified id, use that
if not self.ID:
# Generate one, user@machinename
user = getpass.getuser( )
machine = socket.gethostname( )
self.ID = ''.join( [ user, '@', machine ] )
return self.ID