diff --git a/bin/netsvc_socket.py b/bin/netsvc_socket.py new file mode 100644 index 00000000000..c4454381d74 --- /dev/null +++ b/bin/netsvc_socket.py @@ -0,0 +1,275 @@ +#!/usr/bin/python + +import time +import threading + +import SimpleXMLRPCServer,signal,sys,xmlrpclib +import SocketServer +import socket +import logging +import os + +try: + from ssl import * + HAS_SSL = True +except ImportError: + HAS_SSL = False + +_service={} +_group={} +_res_id=1 +_res={} + +class ServiceEndPointCall(object): + def __init__(self,id,method): + self._id=id + self._meth=method + def __call__(self,*args): + _res[self._id]=self._meth(*args) + return self._id + +class ServiceEndPoint(object): + def __init__(self, name, id): + self._id = id + self._meth={} + s=_service[name] + for m in s._method: + self._meth[m]=s._method[m] + def __getattr__(self, name): + return ServiceEndPointCall(self._id, self._meth[name]) + +class Service(object): + _serviceEndPointID = 0 + def __init__(self, name, audience=''): + _service[name]=self + self.__name=name + self._method={} + self.exportedMethods=None + self._response_process=None + self._response_process_id=None + self._response=None + + def joinGroup(self,name): + if not name in _group: + _group[name]={} + _group[name][self.__name]=self + + def exportMethod(self, m): + if callable(m): + self._method[m.__name__]=m + + def serviceEndPoint(self,s): + if Service._serviceEndPointID >= 2**16: + Service._serviceEndPointID = 0 + Service._serviceEndPointID += 1 + return ServiceEndPoint(s, self._serviceEndPointID) + + def conversationId(self): + return 1 + + def processResponse(self,s,id): + self._response_process, self._response_process_id = s, id + + def processFailure(self,s,id): + pass + + def resumeResponse(self,s): + pass + + def cancelResponse(self,s): + pass + + def suspendResponse(self,s): + if self._response_process: + self._response_process(self._response_process_id, + _res[self._response_process_id]) + self._response_process=None + self._response=s(self._response_process_id) + + def abortResponse(self,error, description, origin, details): + import tools + if not tools.config['debug_mode']: + raise Exception("%s -- %s\n\n%s"%(origin,description,details)) + else: + raise + + def currentFailure(self,s): + pass + +class LocalService(Service): + def __init__(self, name): + self.__name=name + s=_service[name] + self._service=s + for m in s._method: + setattr(self,m,s._method[m]) + +class ServiceUnavailable(Exception): + pass + +LOG_DEBUG='debug' +LOG_INFO='info' +LOG_WARNING='warn' +LOG_ERROR='error' +LOG_CRITICAL='critical' + +def init_logger(): + from tools import config + import os + + if config['logfile']: + logf = config['logfile'] + # test if the directories exist, else create them + try: + if not os.path.exists(os.path.dirname(logf)): + os.makedirs(os.path.dirname(logf)) + try: + fd = open(logf, 'a') + handler = logging.StreamHandler(fd) + except IOError: + sys.stderr.write("ERROR: couldn't open the logfile\n") + handler = logging.StreamHandler(sys.stdout) + except OSError: + sys.stderr.write("ERROR: couldn't create the logfile directory\n") + handler = logging.StreamHandler(sys.stdout) + else: + handler = logging.StreamHandler(sys.stdout) + + # create a format for log messages and dates + formatter = logging.Formatter('%(asctime)s %(levelname)s:%(name)s:%(message)s', '%a, %d %b %Y %H:%M:%S') + + # tell the handler to use this format + handler.setFormatter(formatter) + + # add the handler to the root logger + logging.getLogger().addHandler(handler) + logging.getLogger().setLevel(logging.INFO) + + +class Logger(object): + def notifyChannel(self,name,level,msg): + log = logging.getLogger(name) + getattr(log,level)(msg) + +class Agent(object): + _timers = [] + _logger = Logger() + + def setAlarm(self, fn, dt, args=[], kwargs={}): + wait = dt - time.time() + if wait > 0: + self._logger.notifyChannel( + 'timers', LOG_DEBUG, + "Job scheduled in %s seconds for %s.%s" % (wait, + fn.im_class.__name__, + fn.func_name)) + timer = threading.Timer(wait, fn, args, kwargs) + timer.start() + self._timers.append(timer) + for timer in self._timers[:]: + if not timer.isAlive(): + self._timers.remove(timer) + + def quit(cls): + for timer in cls._timers: + timer.cancel() + quit=classmethod(quit) + +class RpcGateway(object): + def __init__(self, name): + self.name=name + +class Dispatcher(object): + def __init__(self): + pass + def monitor(self,signal): + pass + def run(self): + pass + +class xmlrpc(object): + class RpcGateway(object): + def __init__(self, name): + self.name=name + +class GenericXMLRPCRequestHandler: + def _dispatch(self, method, params): + import traceback + try: + n=self.path.split("/")[-1] +# print "TERP-CALLING:",n,method,params + s=LocalService(n) + m=getattr(s,method) + s._service._response=None + r=m(*params) + res=s._service._response + if res!=None: +# print "RESPONSE FOUND" + r=res +# print "TERP-RETURN :",r + return r + except Exception,e: + print "Exception in call:" + print '-'*60 + traceback.print_exc(file=sys.stdout) + print '-'*60 + s=str(e) + import tools + if tools.config['debug_mode']: + import pdb + tb = sys.exc_info()[2] + pdb.post_mortem(tb) + raise xmlrpclib.Fault(1,s) + +import tiny_socket + +class HttpDaemon(object): + def __init__(self, interface,port, secure=False): + self.__port=port + self.__interface=interface + self.server = socket.socket( socket.AF_INET, socket.SOCK_STREAM) + self.server.bind((socket.gethostname(), 8085)) + self.server.listen(5) + + def attach(self,path,gw): + pass + + def handler(self,signum, frame): + from tools import config + self.server.socket.close() + self.server.socket.close() + Agent.quit() + if config['pidfile']: + os.unlink(config['pidfile']) + del self.server + sys.exit(0) + + def start(self): + signal.signal(signal.SIGINT, self.handler) + signal.signal(signal.SIGTERM, self.handler) + class client_thread(threading.Thread): + def run(self, sock): + ts = tiny_socket.mysocket(sock) + msg = ts.myreceive() + + s=LocalService(msg[0]) + m=getattr(s,msg[1]) + s._service._response=None + r=m(*msg[2:]) + res=s._service._response + if res!=None: + r=res + result = r + + ts.mysend(result) + + return msg + + while 1: + #accept connections from outside + (clientsocket, address) = self.server.accept() + #now do something with the clientsocket + #in this case, we'll pretend this is a threaded server + ct = client_thread() + ct.run(clientsocket) + diff --git a/bin/netsvc_xmlrpc.py b/bin/netsvc_xmlrpc.py new file mode 100644 index 00000000000..643918b3d3a --- /dev/null +++ b/bin/netsvc_xmlrpc.py @@ -0,0 +1,292 @@ +#!/usr/bin/python + +import time +import threading + +import SimpleXMLRPCServer,signal,sys,xmlrpclib +import SocketServer +import socket +import logging +import os + +try: + from ssl import * + HAS_SSL = True +except ImportError: + HAS_SSL = False + +_service={} +_group={} +_res_id=1 +_res={} + +class ServiceEndPointCall(object): + def __init__(self,id,method): + self._id=id + self._meth=method + def __call__(self,*args): + _res[self._id]=self._meth(*args) + return self._id + +class ServiceEndPoint(object): + def __init__(self, name, id): + self._id = id + self._meth={} + print _service + s=_service[name] + for m in s._method: + self._meth[m]=s._method[m] + def __getattr__(self, name): + return ServiceEndPointCall(self._id, self._meth[name]) + +class Service(object): + _serviceEndPointID = 0 + def __init__(self, name, audience=''): + _service[name]=self + self.__name=name + self._method={} + self.exportedMethods=None + self._response_process=None + self._response_process_id=None + self._response=None + + def joinGroup(self,name): + if not name in _group: + _group[name]={} + _group[name][self.__name]=self + + def exportMethod(self, m): + if callable(m): + self._method[m.__name__]=m + + def serviceEndPoint(self,s): + if Service._serviceEndPointID >= 2**16: + Service._serviceEndPointID = 0 + Service._serviceEndPointID += 1 + return ServiceEndPoint(s, self._serviceEndPointID) + + def conversationId(self): + return 1 + + def processResponse(self,s,id): + self._response_process, self._response_process_id = s, id + + def processFailure(self,s,id): + pass + + def resumeResponse(self,s): + pass + + def cancelResponse(self,s): + pass + + def suspendResponse(self,s): + if self._response_process: + self._response_process(self._response_process_id, + _res[self._response_process_id]) + self._response_process=None + self._response=s(self._response_process_id) + + def abortResponse(self,error, description, origin, details): + import tools + if not tools.config['debug_mode']: + raise Exception("%s -- %s\n\n%s"%(origin,description,details)) + else: + raise + + def currentFailure(self,s): + pass + +class LocalService(Service): + def __init__(self, name): + self.__name=name + s=_service[name] + self._service=s + for m in s._method: + setattr(self,m,s._method[m]) + +class ServiceUnavailable(Exception): + pass + +LOG_DEBUG='debug' +LOG_INFO='info' +LOG_WARNING='warn' +LOG_ERROR='error' +LOG_CRITICAL='critical' + +def init_logger(): + from tools import config + import os + + if config['logfile']: + logf = config['logfile'] + # test if the directories exist, else create them + try: + if not os.path.exists(os.path.dirname(logf)): + os.makedirs(os.path.dirname(logf)) + try: + fd = open(logf, 'a') + handler = logging.StreamHandler(fd) + except IOError: + sys.stderr.write("ERROR: couldn't open the logfile\n") + handler = logging.StreamHandler(sys.stdout) + except OSError: + sys.stderr.write("ERROR: couldn't create the logfile directory\n") + handler = logging.StreamHandler(sys.stdout) + else: + handler = logging.StreamHandler(sys.stdout) + + # create a format for log messages and dates + formatter = logging.Formatter('%(asctime)s %(levelname)s:%(name)s:%(message)s', '%a, %d %b %Y %H:%M:%S') + + # tell the handler to use this format + handler.setFormatter(formatter) + + # add the handler to the root logger + logging.getLogger().addHandler(handler) + logging.getLogger().setLevel(logging.INFO) + + +class Logger(object): + def notifyChannel(self,name,level,msg): + log = logging.getLogger(name) + getattr(log,level)(msg) + +class Agent(object): + _timers = [] + _logger = Logger() + + def setAlarm(self, fn, dt, args=[], kwargs={}): + wait = dt - time.time() + if wait > 0: + self._logger.notifyChannel( + 'timers', LOG_DEBUG, + "Job scheduled in %s seconds for %s.%s" % (wait, + fn.im_class.__name__, + fn.func_name)) + timer = threading.Timer(wait, fn, args, kwargs) + timer.start() + self._timers.append(timer) + for timer in self._timers[:]: + if not timer.isAlive(): + self._timers.remove(timer) + + def quit(cls): + for timer in cls._timers: + timer.cancel() + quit=classmethod(quit) + +class RpcGateway(object): + def __init__(self, name): + self.name=name + +class Dispatcher(object): + def __init__(self): + pass + def monitor(self,signal): + pass + def run(self): + pass + +class xmlrpc(object): + class RpcGateway(object): + def __init__(self, name): + self.name=name + +class GenericXMLRPCRequestHandler: + def _dispatch(self, method, params): + import traceback + try: + n=self.path.split("/")[-1] +# print "TERP-CALLING:",n,method,params + s=LocalService(n) + m=getattr(s,method) + s._service._response=None + r=m(*params) + res=s._service._response + if res!=None: +# print "RESPONSE FOUND" + r=res +# print "TERP-RETURN :",r + return r + except Exception,e: + print "Exception in call:" + print '-'*60 + traceback.print_exc(file=sys.stdout) + print '-'*60 + s=str(e) + import tools + if tools.config['debug_mode']: + import pdb + tb = sys.exc_info()[2] + pdb.post_mortem(tb) + raise xmlrpclib.Fault(1,s) + +class SimpleXMLRPCRequestHandler(GenericXMLRPCRequestHandler, SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): + pass + +if HAS_SSL: + class SecureXMLRPCRequestHandler(GenericXMLRPCRequestHandler, SecureXMLRPCServer.SecureXMLRPCRequestHandler): + pass +else: + pass + +class SimpleThreadedXMLRPCServer(SocketServer.ThreadingMixIn, SimpleXMLRPCServer.SimpleXMLRPCServer): + def server_bind(self): + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + SimpleXMLRPCServer.SimpleXMLRPCServer.server_bind(self) + +if HAS_SSL: + class SecureThreadedXMLRPCServer(SocketServer.ThreadingMixIn, SecureXMLRPCServer.SecureXMLRPCServer): + def server_bind(self): + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + SecureXMLRPCServer.SecureXMLRPCServer.server_bind(self) +else: + pass + +class HttpDaemon(object): + def __init__(self, interface,port, secure=False): + self.__port=port + self.__interface=interface + if secure and HAS_SSL: + self.server = SecureThreadedXMLRPCServer((interface, port), SecureXMLRPCRequestHandler,0) + else: + self.server = SimpleThreadedXMLRPCServer((interface, port), SimpleXMLRPCRequestHandler,0) + + def attach(self,path,gw): + pass + + def handler(self,signum, frame): + from tools import config + self.server.socket.close() + self.server.socket.close() + Agent.quit() + if config['pidfile']: + os.unlink(config['pidfile']) + del self.server + sys.exit(0) + + def start(self): + from tools import config + if config['pidfile']: + fd=open(config['pidfile'], 'w') + pidtext="%d" % (os.getpid()) + fd.write(pidtext) + fd.close() + signal.signal(signal.SIGINT, self.handler) + signal.signal(signal.SIGTERM, self.handler) + self.server.register_introspection_functions() + + self.server.serve_forever() + + # If the server need to be run recursively + # + #signal.signal(signal.SIGALRM, self.my_handler) + #signal.alarm(6) + #while True: + # self.server.handle_request() + #signal.alarm(0) # Disable the alarm + +# vim:noexpandtab: + + diff --git a/bin/tiny_socket.py b/bin/tiny_socket.py new file mode 100644 index 00000000000..3585ccae295 --- /dev/null +++ b/bin/tiny_socket.py @@ -0,0 +1,32 @@ +import socket +import cPickle +import marshal + +class mysocket: + def __init__(self, sock=None): + if sock is None: + self.sock = socket.socket( + socket.AF_INET, socket.SOCK_STREAM) + else: + self.sock = sock + def connect(self, host, port): + self.sock.connect((host, port)) + def mysend(self, msg): + msg = cPickle.dumps(msg) + size = len(msg) + self.sock.send('%8d' % size) + totalsent = 0 + while totalsent < size: + sent = self.sock.send(msg[totalsent:]) + if sent == 0: + raise RuntimeError, "socket connection broken" + totalsent = totalsent + sent + def myreceive(self): + size = int(self.sock.recv(8)) + msg = '' + while len(msg) < size: + chunk = self.sock.recv(size-len(msg)) + if chunk == '': + raise RuntimeError, "socket connection broken" + msg = msg + chunk + return cPickle.loads(msg) diff --git a/bin/tiny_socket.pyc b/bin/tiny_socket.pyc new file mode 100644 index 00000000000..967b65016a4 Binary files /dev/null and b/bin/tiny_socket.pyc differ