Bubble 2016.08.16

bubble.functions

Contents

Source code for bubble.functions

# -*- coding: utf-8 -*-
# Part of bubble. See LICENSE file for full copyright and licensing details.

'''rule functions, these can be attached to rules and run by the rule'''
import os
import sys
from functools import wraps
import six
import arrow

from . import Bubble

##########################################################################
FUN_TYPE = 'system'
# FUN_TYPE='USER'


[docs]class RuleFunction(Bubble): name = None fun = None fun_type = None stats = None def __init__(self, name=None, fun=None, fun_type=None, verbose=1): Bubble.__init__(self, name='Rule Function', verbose=verbose) self.name = name self.fun = fun self.fun_type = fun_type # self.say('RuleFunction present:%s %s'%(str(name),str(fun))) # self.set_verbose(0)
[docs] def run(self, *a): self.say('running function:' + str(self.name) + '(' + str(a) + ')', verbosity=100) try: res = self.fun(*a) # res = self.fun(**k) except Exception as e: self.cry(e) res = 'cannot run:' + self.name + ' with:' + str(a) return res
def __repr__(self): return '<RuleFunction 0x%d %s %s %s>' % (id(self), self.name, self.fun, self.fun_type)
[docs]class NoRuleFunction(Bubble): name = None fun = None fun_type = None stats = None def __init__(self, name=None): Bubble.__init__(self, name='NoRule Function') self.name = name self.fun = self.no_fun # self.say('NoRuleFunction present:%s %s'%(str(name),str(fun))) # self.set_verbose(0)
[docs] def run(self, *a): res = self.fun(*a) # k return res
[docs] def no_fun(self, *a): if len(a) == 1: res = str(a[0]) else: # TODO move joiner to config, rule?? joiner = ' ' # join args with space string_values = [str(item) for item in a] res = joiner.join(string_values) self.say('no function, return joined items as:' + res, verbosity=100) return res
[docs]class RuleFunctions(Bubble): _rule_functions = {} def __init__(self): Bubble.__init__(self, name='Rule Functions Manager')
[docs] def get_function(self, fun=None): """get function as RuleFunction or return a NoRuleFunction function""" sfun = str(fun) self.say('get_function:' + sfun, verbosity=100) if not fun: return NoRuleFunction() # dummy to execute via no_fun if sfun in self._rule_functions: return self._rule_functions[sfun] else: self.add_function(name=sfun, fun=self.rule_function_not_found(fun)) self.cry('fun(%s) not found, returning dummy' % (sfun), verbosity=10) if sfun in self._rule_functions: return self._rule_functions[sfun] else: self.rule_function_not_found(fun)
[docs] def add_custom_function(self, fun=None, name=None, fun_type="custom"): return self.add_function(fun=fun, name=name, fun_type=fun_type)
[docs] def add_function(self, fun=None, name=None, fun_type=FUN_TYPE): """actually replace function""" if not name: if six.PY2: name = fun.func_name else: name = fun.__name__ self.say('adding fun(%s)' % name, verbosity=50) self.say('adding fun_type:%s' % fun_type, verbosity=50) if self.function_exists(name): self.cry('overwriting :fun(%s)' % name, verbosity=10) self.say('added :' + name, verbosity=10) self._rule_functions[name] = RuleFunction(name, fun, fun_type) return True
[docs] def function_exists(self, fun): """ get function's existense """ res = fun in self._rule_functions self.say('function exists:' + str(fun) + ':' + str(res), verbosity=10) return res
[docs] def functions_count(self): """ get number of functions""" return len(self._rule_functions)
[docs] def rule_function_not_found(self, fun=None): """ any function that does not exist will be added as a dummy function that will gather inputs for easing into the possible future implementation """ sfun = str(fun) self.cry('rule_function_not_found:' + sfun) def not_found(*a, **k): return(sfun + ':rule_function_not_found', k.keys()) return not_found
[docs] def get_rule_functions(self): return self._rule_functions
def __len__(self): return len(self._rule_functions) def __iter__(self): self.say('acting as a list rule functions', verbosity=101) for item in self._rule_functions: self.say('fun item:', stuff=item, verbosity=101) yield item
########################################################################## # register ########################################################################## # rule functions list manager rule_functions = RuleFunctions()
[docs]def get_registered_rule_functions(): """ get the function registry """ return rule_functions
register_system_function = rule_functions.add_function register = rule_functions.add_custom_function ##########################################################################
[docs]def trace(fun, *a, **k): """ define a tracer for a rule function for log and statistic purposes """ @wraps(fun) def tracer(*a, **k): ret = fun(*a, **k) print('trace:fun: %s\n ret=%s\n a=%s\nk%s\n' % (str(fun), str(ret), str(a), str(k))) return ret return tracer
[docs]def timer(fun, *a, **k): """ define a timer for a rule function for log and statistic purposes """ @wraps(fun) def timer(*a, **k): start = arrow.now() ret = fun(*a, **k) end = arrow.now() print('timer:fun: %s\n start:%s,end:%s, took [%s]' % ( str(fun), str(start), str(end), str(end - start))) return ret return timer
# TODO make all importing absolute or relative "./" inside a bubble # and move loading to something like util.loader # "python import" -> "bubble loading"
[docs]def load_custom_functions(ctx, custom_rule_functions_py='./custom_rule_functions.py'): ctx.say('functions:load_custom_rule_functions:'+custom_rule_functions_py, verbosity=100) # this "feels" wrong, functions system should be loaded from . import functions_system ctx.say('imported functions_system:'+functions_system.__name__, verbosity=100) abs_path = ctx.home custom_functions_py = ctx.home + os.sep + custom_rule_functions_py ctx.gbc.say("trying to load:" + custom_functions_py, verbosity=100) if os.path.exists(custom_functions_py) and \ os.path.isfile(custom_functions_py): sys.path.insert(0, abs_path) import custom_rule_functions ctx.say('imported custom rule functions:'+custom_rule_functions.__name__, verbosity=100)

Contents