Net-RPC: armor the thread, provide stats and inf. timeout.
This fixes near-persistent connections where the client will take long to respond. It is better to keep the connection open, than to launch more threads and use ports. bzr revid: p_christ@hol.gr-20091028004002-8y5iprfizzsktlkv
This commit is contained in:
parent
51a2cf93d0
commit
1956e79494
|
@ -35,27 +35,37 @@ class TinySocketClientThread(threading.Thread, netsvc.OpenERPDispatcher):
|
|||
def __init__(self, sock, threads):
|
||||
threading.Thread.__init__(self)
|
||||
self.sock = sock
|
||||
# Only at the server side, use a big timeout: close the
|
||||
# clients connection when they're idle for 20min.
|
||||
self.sock.settimeout(1200)
|
||||
self.threads = threads
|
||||
|
||||
def __del__(self):
|
||||
if self.sock:
|
||||
self.sock.close()
|
||||
try:
|
||||
if hasattr(socket, 'SHUT_RDWR'):
|
||||
self.socket.shutdown(socket.SHUT_RDWR)
|
||||
else:
|
||||
self.socket.shutdown(2)
|
||||
except: pass
|
||||
# That should garbage-collect and close it, too
|
||||
self.sock = None
|
||||
|
||||
def run(self):
|
||||
import select
|
||||
# import select
|
||||
self.running = True
|
||||
try:
|
||||
ts = tiny_socket.mysocket(self.sock)
|
||||
except:
|
||||
self.threads.remove(self)
|
||||
self.running = False
|
||||
return False
|
||||
while self.running:
|
||||
try:
|
||||
msg = ts.myreceive()
|
||||
except:
|
||||
self.sock.close()
|
||||
self.sock = None
|
||||
self.threads.remove(self)
|
||||
self.running = False
|
||||
return False
|
||||
try:
|
||||
result = self.dispatch(msg[0], msg[1], msg[2:])
|
||||
|
@ -65,15 +75,15 @@ class TinySocketClientThread(threading.Thread, netsvc.OpenERPDispatcher):
|
|||
new_e = Exception(tools.exception_to_unicode(e.exception)) # avoid problems of pickeling
|
||||
ts.mysend(new_e, exception=True, traceback=e.traceback)
|
||||
except:
|
||||
pass
|
||||
self.running = False
|
||||
break
|
||||
except Exception, e:
|
||||
# this code should not be reachable, therefore we warn
|
||||
netsvc.Logger().notifyChannel("net-rpc", netsvc.LOG_WARNING, "exception: %" % str(e))
|
||||
break
|
||||
|
||||
self.sock.close()
|
||||
self.sock = None
|
||||
self.threads.remove(self)
|
||||
self.running = False
|
||||
return True
|
||||
|
||||
def stop(self):
|
||||
|
@ -82,7 +92,7 @@ class TinySocketClientThread(threading.Thread, netsvc.OpenERPDispatcher):
|
|||
|
||||
class TinySocketServerThread(threading.Thread,netsvc.Server):
|
||||
def __init__(self, interface, port, secure=False):
|
||||
threading.Thread.__init__(self)
|
||||
threading.Thread.__init__(self, name="Net-RPC socket")
|
||||
netsvc.Server.__init__(self)
|
||||
self.__port = port
|
||||
self.__interface = interface
|
||||
|
@ -101,6 +111,7 @@ class TinySocketServerThread(threading.Thread,netsvc.Server):
|
|||
while self.running:
|
||||
(clientsocket, address) = self.socket.accept()
|
||||
ct = TinySocketClientThread(clientsocket, self.threads)
|
||||
clientsocket = None
|
||||
self.threads.append(ct)
|
||||
ct.start()
|
||||
lt = len(self.threads)
|
||||
|
@ -129,6 +140,19 @@ class TinySocketServerThread(threading.Thread,netsvc.Server):
|
|||
except:
|
||||
return False
|
||||
|
||||
def stats(self):
|
||||
res = "Net-RPC: " + ( (self.running and "running") or "stopped")
|
||||
i = 0
|
||||
for t in self.threads:
|
||||
i += 1
|
||||
res += "\nNet-RPC #%d: %s " % (i, t.name)
|
||||
if t.isAlive():
|
||||
res += "running"
|
||||
else:
|
||||
res += "finished"
|
||||
if t.sock:
|
||||
res += ", socket"
|
||||
return res
|
||||
|
||||
netrpcd = None
|
||||
|
||||
|
|
|
@ -33,11 +33,13 @@ class Myexception(Exception):
|
|||
class mysocket:
|
||||
def __init__(self, sock=None):
|
||||
if sock is None:
|
||||
self.sock = socket.socket(
|
||||
socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
else:
|
||||
self.sock = sock
|
||||
self.sock.settimeout(120)
|
||||
# self.sock.settimeout(120)
|
||||
# prepare this socket for long operations: it may block for infinite
|
||||
# time, but should exit as soon as the net is down
|
||||
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
|
||||
def connect(self, host, port=False):
|
||||
if not port:
|
||||
protocol, buf = host.split('//')
|
||||
|
@ -61,8 +63,8 @@ class mysocket:
|
|||
buf=''
|
||||
while len(buf) < 8:
|
||||
chunk = self.sock.recv(8 - len(buf))
|
||||
if chunk == '':
|
||||
raise RuntimeError, "socket connection broken"
|
||||
if not chunk:
|
||||
raise socket.timeout
|
||||
buf += chunk
|
||||
size = int(buf)
|
||||
buf = self.sock.recv(1)
|
||||
|
@ -73,8 +75,8 @@ class mysocket:
|
|||
msg = ''
|
||||
while len(msg) < size:
|
||||
chunk = self.sock.recv(size-len(msg))
|
||||
if chunk == '':
|
||||
raise RuntimeError, "socket connection broken"
|
||||
if not chunk:
|
||||
raise socket.timeout
|
||||
msg = msg + chunk
|
||||
msgio = cStringIO.StringIO(msg)
|
||||
unpickler = cPickle.Unpickler(msgio)
|
||||
|
|
Loading…
Reference in New Issue