"""
Basic framework module
This is the main module implementing the framework.
@author: skwok
"""
import datetime
import threading
import signal
import traceback
import time
import importlib
import sys
import os
from keckdrpframework.core import queues
# Server Task import
from keckdrpframework.core.server_task import DRPFServerHandler
from keckdrpframework.models.processing_context import ProcessingContext
from keckdrpframework.models.arguments import Arguments
from keckdrpframework.models.action import Action
from keckdrpframework.models.event import Event
from keckdrpframework.models.data_set import DataSet
from keckdrpframework.utils.drpf_logger import getLogger
from keckdrpframework.config.framework_config import ConfigClass
class Framework(object):
    """
    This class implements the core of the framework.

    The processing is event driven.
    An event can be defined as a change in a set of items of interest, for example files or directories,
    or something in memory.
    Events are appended to a queue. Events are associated with arguments, such as time, name of files,
    or new values of some variables.
    In a loop, an event is taken out from the queue and translated into an action.
    An action is a call to a regular function, a method in the pipeline class or in a regular class.
    If desired, a DataSet() can be created to keep a list of files in memory.

    Attributes
    ----------
    config : ConfigClass
        Instance of ConfigClass that uses the configuration file to create a set of configuration parameters
    logger : log
        Logger obtained from getLogger() with the name "DRPF"
    pipeline : pipeline
        pipeline can be a string, a module, a class or an object of subclass base_pipeline
        The pipeline that will be used in the framework
    context : ProcessingContext
        Instance of ProcessingContext, which is passed along to all processing steps.
    """

    def __init__(self, pipeline_name, configFile):
        """
        Creates the event queues, the processing context and the pipeline.

        Parameters
        ----------
        pipeline_name : str, module, class or object
            Handed to find_pipeline() to locate and instantiate the pipeline.
        configFile : None, str or configuration object
            None creates a default ConfigClass(), a string is passed to ConfigClass(),
            anything else is used as the configuration object as is.

        Raises
        ------
        Exception
            If find_pipeline() returns None.
        """
        if configFile is None:
            self.config = ConfigClass()
        elif isinstance(configFile, str):
            self.config = ConfigClass(configFile)
        else:
            self.config = configFile

        self.logger = getLogger(self.config.logger_config_file, name="DRPF")
        self.logger.info("")
        self.logger.info("Initialization Framework cwd={}".format(os.getcwd()))

        self.wait_for_event = False

        # The high priority event queue is local to the process
        self.event_queue_hi = queues.SimpleEventQueue()

        # The regular event queue can be local or shared via queue manager
        self.queue_manager = None
        self.event_queue = self._get_event_queue()

        # The done_queue
        self.done_queue = None

        self.context = ProcessingContext(self.event_queue, self.event_queue_hi, self.logger, self.config)

        pipeline = find_pipeline(pipeline_name, self.config.pipeline_path, self.context, self.logger)
        if pipeline is None:
            raise Exception(f"Failed to initialize pipeline {pipeline_name}")
        self.pipeline = pipeline

        self.keep_going = True
        self.init_signal()
        self.store_arguments = Arguments()

    def _get_queue_manager(self, cfg):
        """
        Tries to get the shared event queue.

        If no queue can be obtained, a new queue manager is started (and kept in
        self.queue_manager) and the queue is requested again.

        Returns the queue, or None if it still could not be obtained.
        """
        hostname = cfg.queue_manager_hostname
        portnr = cfg.queue_manager_portnr
        auth_code = cfg.queue_manager_auth_code

        self.logger.debug(f"Getting shared event queue from {hostname}:{portnr}")
        queue = queues.get_event_queue(hostname, portnr, auth_code)
        if queue is None:
            self.logger.debug("Starting Queue Manager")
            self.queue_manager = queues.start_queue_manager(hostname, portnr, auth_code, self.logger)
            queue = queues.get_event_queue(hostname, portnr, auth_code)

        if queue is not None:
            self.logger.debug("Got event queue from Queue Manager")
        return queue

    def _get_event_queue(self):
        """
        If multiprocessing is desired then returns the shared queue,
        otherwise returns a SimpleEventQueue, which will only work within a single process.
        """
        cfg = self.config
        if cfg.getValue("want_multiprocessing", False):
            return self._get_queue_manager(cfg)
        return queues.SimpleEventQueue()

    def get_event(self):
        """
        Retrieves and returns an event from the queues.

        First it checks the high priority queue without blocking. If that fails, it waits on the
        regular event queue for up to config.event_timeout.
        If that fails as well, it returns a "no_event" event when wait_for_event is set, otherwise
        the no_event_event defined in the configuration, which may be None.
        """
        try:
            return self.event_queue_hi.get(block=False)
        except Exception:
            # Nothing available in the high priority queue, fall back to the regular queue
            pass

        try:
            ev = self.event_queue.get(block=True, timeout=self.config.event_timeout)
        except Exception:
            if self.wait_for_event:
                ev = Event("no_event", None)
            else:
                ev = self.config.no_event_event
            if ev is None:
                return None
            # The substitute event gets fresh arguments carrying its name and the current time
            ev.args = Arguments(name=ev.name, time=datetime.datetime.now().ctime())
            return ev

        # A real event arrived, so stop substituting "no_event" events
        self.wait_for_event = False
        return ev

    def _push_event(self, event_name, args):
        """
        Pushes high priority events
        Normal events go to the lower priority queue
        This method is only used in execute.
        """
        self.logger.debug(f"Push event {event_name}, {args.name}")
        self.event_queue_hi.put(Event(event_name, args))

    def append_event(self, event_name, args, recurrent=False):
        """
        Appends low priority event to the end of the queue.

        If args is None, the arguments stored by the last executed action are used.
        """
        if args is None:
            args = self.store_arguments
        self.event_queue.put(Event(event_name, args, recurrent))

    def event_to_action(self, event, context):
        """
        Returns an Action()
        Passes event.args to action.args.
        Note that event.args comes from previous action.output.

        This method is called in the action loop.
        The actual event_to_action method is defined in the pipeline and it depends on the incoming
        event and context.state.
        """
        event_info = self.pipeline.event_to_action(event, context)
        self.logger.debug(f"Event to action {event_info}")
        return Action(event, event_info, args=event.args)

    def execute(self, action, context):
        """
        Executes one action.

        The input for the action is in action.args.
        The pre-condition, the action and the post-condition are looked up in the pipeline by action name.
        The action output is stored in action.output; it (or action.args when the output is None)
        is kept in store_arguments and passed to the next event if the post-condition holds.

        context.state is set to "stop" when the post-condition fails, when the pre-condition fails
        and config.pre_condition_failed_stop is set, or when any step raises an exception.
        """
        pipeline = self.pipeline
        action_name = action.name
        try:
            # Pre condition
            if not pipeline.get_pre_action(action_name)(action, context):
                if self.config.pre_condition_failed_stop:
                    context.state = "stop"
                else:
                    self.store_arguments = action.args
                return

            if self.config.print_trace:
                self.logger.debug("Executing action " + action.name)

            # Run action
            action_output = pipeline.get_action(action_name)(action, context)
            action.output = action_output
            self.store_arguments = action.args if action_output is None else action_output

            # Post condition
            if not pipeline.get_post_action(action_name)(action, context):
                context.state = "stop"
                return

            if action.new_event is not None:
                # Post new event
                self._push_event(action.new_event, self.store_arguments)

            if action.next_state is not None:
                # New state
                context.state = action.next_state

            if self.config.print_trace:
                self.logger.debug("Action " + action.name + " done")
        except Exception:
            self.logger.error("Exception while invoking {}".format(action_name))
            context.state = "stop"
            if self.config.print_trace:
                traceback.print_exc()

    def _action_completed(self, successful, action):
        """
        Logs the outcome of an action and removes its event from the in-progress list of the
        queue it came from. Recurrent events from the regular queue are appended again.
        """
        event = action.event
        event_id = event.id
        try:
            argname = event.args.name
        except Exception:
            # event.args may be None or have no name
            argname = "Undef"

        outcome = "completed" if successful else "failed"
        self.logger.info(
            f"Event {outcome}: name {event.name}, action {action.name}, arg name {argname}, recurr {event._recurrent}"
        )

        try:
            if self.event_queue.get_in_progress().get(event_id):
                self.event_queue.discard(event_id)
                if event._recurrent:
                    self.append_event(event.name, event.args, event._recurrent)
            elif self.event_queue_hi.get_in_progress().get(event_id):
                self.event_queue_hi.discard(event_id)
        except Exception as e:
            self.logger.error(f"Exception occured while in _action_completed, {e}")

    def main_loop(self):
        """
        This is the main action loop.
        This method can be called directly to run in the main thread.
        To run in a thread, use start_action_loop().

        The loop ends when keep_going is cleared, when no event is available and both queues are
        empty, when the state remains "stop" after on_state(), or on a BrokenPipeError.
        keep_going is always False when this method returns or raises.
        """
        self.keep_going = True
        try:
            while self.keep_going:
                action = None
                success = False
                try:
                    event = self.get_event()
                    if event is None:
                        self.logger.debug("No new events - do nothing")
                        if self.event_queue.qsize() == 0 and self.event_queue_hi.qsize() == 0:
                            self.logger.debug("No pending events or actions, terminating")
                            self.keep_going = False
                            if self.queue_manager is not None:
                                try:
                                    self.event_queue.terminate()
                                except Exception:
                                    # Best effort, the queue may already be gone
                                    pass
                        continue

                    action = self.event_to_action(event, self.context)
                    self.execute(action, self.context)
                    success = action.output is not None
                    self.context.state = self.on_state(self.context.state)
                    if self.context.state == "stop":
                        break
                except BrokenPipeError:
                    self.logger.error("Failed to get retrieve events. Queue may be closed.")
                    break
                except Exception as e:
                    self.logger.error(f"Exception while processing action {action}, {e}")
                    if self.config.print_trace:
                        traceback.print_exc()

                # Without an action (failure before event_to_action returned) there is nothing to complete
                if action is not None:
                    self._action_completed(success, action)
        finally:
            # Also reached on unexpected errors, so that wait_for_ever() does not block for ever
            self.keep_going = False
            self.logger.info("Exiting main loop")

    def _main_loop_helper(self):
        """
        Thread target: runs the main loop and then calls the on_exit hook with status 0.
        """
        self.main_loop()
        self.on_exit(0)

    def start_action_loop(self):
        """
        Starts a daemon thread running the action loop.
        """
        thr = threading.Thread(name="action_loop", target=self._main_loop_helper, daemon=True)
        thr.start()

    def init_signal(self):
        """
        Captures keyboard interrupt.

        The handler logs the signal and clears keep_going. It is installed for SIGINT and,
        where the platform defines it, for SIGBREAK (Ctrl-Break).
        """

        def handler(signum, *args):
            self.logger.error(f"Signal {signum} received")
            self.keep_going = False

        # signal.signal() expects a signal number such as SIGBREAK, not the CTRL_BREAK_EVENT constant
        sigbreak = getattr(signal, "SIGBREAK", None)
        if sigbreak is not None:
            signal.signal(sigbreak, handler)
        signal.signal(signal.SIGINT, handler)

    def _start(self):
        """
        Starts the action loop in its thread.
        """
        self.start_action_loop()

    def end(self):
        """
        Releases the event_queue.
        Needed when a client calls ingest_data and then quits.
        """
        try:
            self.event_queue.close()
        except Exception as e:
            # Best effort: failing to close the queue is not fatal
            self.logger.debug(f"Could not close event queue, {e}")

    def wait_for_ever(self):
        """
        Because the action loop runs in a thread, this method waits until keep_going is false.
        """
        while self.keep_going:
            time.sleep(1)

    def get_pending_events(self):
        """
        Returns the pending events of the regular queue and of the high priority queue, in this order.
        """
        return self.event_queue.get_pending(), self.event_queue_hi.get_pending()

    #
    # Methods to handle data set
    #

    def ingest_data(self, path=None, files=None, monitor=False):
        """
        Adds files to the data_set.
        The data_set resides in the framework context.

        Parameters
        ----------
        path :
            Passed to DataSet() when the context has no data set yet.
        files : iterable, optional
            Items appended to the data set one by one.
        monitor : bool
            If True, start_monitor() is called on the data set.
        """
        ds = self.context.data_set
        if ds is None:
            # Data_set will scan and import the content of the directory
            ds = DataSet(path, self.logger, self.config, self.context.event_queue)

        if files is not None:
            for f in files:
                ds.append_item(f)

        self.context.data_set = ds
        if monitor:
            self.context.data_set.start_monitor()

    def start(self, qm_only=False, ingest_data_only=False, wait_for_event=False, continuous=False):
        """
        Starts the framework in one of three modes.

        qm_only : no processing, only waits until keep_going is cleared.
        ingest_data_only : releases the event queue and returns.
        otherwise : starts the action loop thread and waits until keep_going is cleared.
            With continuous, the configured no_event_event is replaced by a "no_event" event.
            wait_for_event is stored and used by get_event().
        """
        if qm_only:
            self.logger.info("Queue manager only mode, no processing")
            self.wait_for_ever()
        elif ingest_data_only:
            # Release the queue
            self.end()
        else:
            if continuous:
                self.config.no_event_event = Event("no_event", None)
            self.wait_for_event = wait_for_event
            self._start()
            self.logger.info("Framework main loop started")
            self.wait_for_ever()

    def on_exit(self, status=0):
        """
        Hook for exit.
        The default terminates the process with os._exit(status).
        Subclasses can override to continue in the main_loop or call exit(status)
        """
        os._exit(status)

    def on_state(self, state):
        """
        Hook to change context state.
        Default is to ignore state = 'stop'.
        To terminate, override this method to return 'stop'.
        """
        if state == "stop":
            state = "ready"
        return state
def find_pipeline(pipeline_name, pipeline_path, context, logger):
    """
    Finds the class called pipeline_name and instantiates an object of that class.

    Parameters
    ----------
    pipeline_name : str, module, class or object
        class  : instantiated with context.
        module : the class named after the camel-cased last part of the module name
                 (a.a_b_c -> ABC) is instantiated with context.
        str    : "a.a_b_c.a_b_c" is tried as module a.a_b_c with class ABC first, then the name is
                 imported as a module relative to each entry of pipeline_path.
        any other object is assumed to be a pipeline instance already and is returned as is.
    pipeline_path : None, str or iterable of str
        Package prefixes to search. None means no prefix. A single string is treated as one prefix.
    context :
        Passed to the pipeline constructor.
    logger :
        Object providing debug() and error().

    Returns
    -------
    The pipeline object, or None if no pipeline class could be found.
    """

    def to_camel_case(instr):
        # a_b_c -> ABC: the first character and every character following "_" are upper-cased,
        # the "_" itself is dropped
        out = []
        upper_next = True
        for c in instr:
            if upper_next:
                out.append(c.upper())
                upper_next = False
            elif c == "_":
                upper_next = True
            else:
                out.append(c)
        return "".join(out)

    if not isinstance(pipeline_name, str):  # not string
        if isinstance(pipeline_name, type):  # is a class, ie. a.b
            return pipeline_name(context)

        if isinstance(pipeline_name, type(sys)):  # is a module, ie. a.a_b_c
            last_name = pipeline_name.__name__.split(".")[-1]
            klass = getattr(pipeline_name, to_camel_case(last_name), None)  # try a.a_b_c.ABC
            if klass is not None:
                return klass(context)
            logger.error(f"{pipeline_name} must be a module, a class, an object or a string")
            return None

        # object, already an instance of a class
        return pipeline_name

    #
    # pipeline_name is a string
    #
    parts = pipeline_name.split(".")
    module_name = ".".join(parts[:-1])
    last_name = parts[-1]
    class_name = to_camel_case(last_name)

    klass = None
    try:
        if len(parts) > 1:
            # Converts a.a_b_c.a_b_c to a.a_b_c.ABC
            # and instantiates an object of the class ABC
            module = importlib.import_module(module_name)
            klass = getattr(module, class_name)
            if klass is not None:
                return klass(context)
    except Exception:
        logger.debug(f"Failed loading as string {pipeline_name}")
        logger.debug(f"Trying {pipeline_path} next")

    if pipeline_path is None:
        pipeline_path = ("",)
    elif isinstance(pipeline_path, str):
        # A bare string would otherwise be searched character by character
        pipeline_path = (pipeline_path,)

    for prefix in pipeline_path:
        full_name = f"{prefix}.{pipeline_name}" if prefix else pipeline_name
        try:
            module = importlib.import_module(full_name)
        except ModuleNotFoundError as me:
            logger.debug(f"Failed loading pipeline {full_name}, {me}")
            continue
        except Exception as e:
            logger.error(f"Exception {e}")
            break

        candidate = getattr(module, class_name, None)
        if candidate is not None:
            klass = candidate
            logger.debug(f"Found {class_name} in {full_name}")
            break
        logger.debug(f"Class {class_name} not in {full_name}")

    if klass is not None:
        return klass(context)

    logger.error(f"Could not find pipeline {pipeline_name} in {pipeline_path}")
    return None
def create_context(event_queue=None, event_queue_hi=None, logger=None, config=None):
    """
    Convenience function to create a context for working without the framework.
    Useful in Jupyter notebooks for example.

    Every argument left as None is replaced by a default: a ConfigClass(), the "DRPF" logger
    configured from config.logger_config_file, and a SimpleEventQueue() for each queue.
    Returns the resulting ProcessingContext.
    """
    # The configuration comes first because the default logger depends on it
    config = ConfigClass() if config is None else config
    logger = getLogger(config.logger_config_file, name="DRPF") if logger is None else logger
    event_queue_hi = queues.SimpleEventQueue() if event_queue_hi is None else event_queue_hi
    event_queue = queues.SimpleEventQueue() if event_queue is None else event_queue

    return ProcessingContext(event_queue, event_queue_hi, logger, config)