wip
bzr revid: nicolas.vanhoren@openerp.com-20130129101052-jeuyybi6mjbiue6x
This commit is contained in:
parent
33da7330ed
commit
66c7f18810
|
@ -28,73 +28,57 @@ from osv import osv, fields
|
|||
import time
|
||||
import logging
|
||||
import json
|
||||
import select
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
def listen_channel(db_name, channel_name, handle_message, check_stop=(lambda: False), check_stop_timer=60., error_delay=10.):
|
||||
_logger.info("Begin watching on channel "+ channel_name +" for database " + db_name)
|
||||
stopping = False
|
||||
while not stopping:
|
||||
try:
|
||||
registry = openerp.modules.registry.RegistryManager.get(db_name)
|
||||
with registry.cursor() as c:
|
||||
conn = c._cnx
|
||||
try:
|
||||
c.execute("listen " + channel_name + ";")
|
||||
c.commit();
|
||||
while not stopping:
|
||||
if check_stop():
|
||||
stopping = True
|
||||
break
|
||||
if select.select([conn], [], [], check_stop_timer) == ([],[],[]):
|
||||
pass
|
||||
else:
|
||||
conn.poll()
|
||||
while conn.notifies:
|
||||
message = json.loads(conn.notifies.pop().payload)
|
||||
handle_message(message)
|
||||
finally:
|
||||
try:
|
||||
c.execute("unlisten " + channel_name + ";")
|
||||
c.commit()
|
||||
except:
|
||||
pass # can't do anything if that fails
|
||||
except:
|
||||
# if something crash, we wait some time then try again
|
||||
_logger.exception("Exception during watcher activity")
|
||||
time.sleep(error_delay)
|
||||
_logger.info("End watching on channel "+ channel_name +" for database " + db_name)
|
||||
|
||||
def notify_channel(cr, channel_name, message):
|
||||
cr.commit()
|
||||
cr.execute("notify " + channel_name + ", %s", [json.dumps(message)])
|
||||
cr.commit()
|
||||
|
||||
POLL_TIMER = 30
|
||||
DISCONNECTION_TIMER = POLL_TIMER + 5
|
||||
|
||||
if openerp.tools.config.options["gevent"]:
|
||||
import gevent
|
||||
import gevent.event
|
||||
import select
|
||||
|
||||
WATCHER_TIMER = 60
|
||||
WATCHER_ERROR_DELAY = 10
|
||||
|
||||
class Watcher(object):
|
||||
def __init__(self, channel_name, db_name):
|
||||
self.channel_name = channel_name
|
||||
self.db_name = db_name
|
||||
|
||||
def loop(self):
|
||||
_logger.info("Begin watching on channel "+ self.channel_name +" for database " + self.db_name)
|
||||
stopping = False
|
||||
while not stopping:
|
||||
try:
|
||||
registry = openerp.modules.registry.RegistryManager.get(self.db_name)
|
||||
with registry.cursor() as c:
|
||||
conn = c._cnx
|
||||
try:
|
||||
c.execute("listen " + self.channel_name + ";")
|
||||
c.commit();
|
||||
while not stopping:
|
||||
if self.check_stop():
|
||||
stopping = True
|
||||
break
|
||||
if select.select([conn], [], [], WATCHER_TIMER) == ([],[],[]):
|
||||
pass
|
||||
else:
|
||||
conn.poll()
|
||||
while conn.notifies:
|
||||
message = json.loads(conn.notifies.pop().payload)
|
||||
self.handle_message(message)
|
||||
finally:
|
||||
try:
|
||||
c.execute("unlisten " + self.channel_name + ";")
|
||||
c.commit()
|
||||
except:
|
||||
pass # can't do anything if that fails
|
||||
except:
|
||||
# if something crash, we wait some time then try again
|
||||
_logger.exception("Exception during watcher activity")
|
||||
time.sleep(WATCHER_ERROR_DELAY)
|
||||
del ImWatcher.watchers[self.db_name]
|
||||
_logger.info("End watching on channel "+ self.channel_name +" for database " + self.db_name)
|
||||
|
||||
def handle_message(self, message):
|
||||
pass
|
||||
|
||||
def check_stop(self):
|
||||
return False
|
||||
|
||||
def post_on_channel(cr, channel_name, message):
|
||||
cr.commit()
|
||||
cr.execute("notify " + channel_name + ", %s", [json.dumps(message)])
|
||||
cr.commit()
|
||||
|
||||
|
||||
class ImWatcher(Watcher):
|
||||
class ImWatcher(object):
|
||||
watchers = {}
|
||||
|
||||
@staticmethod
|
||||
|
@ -104,7 +88,7 @@ if openerp.tools.config.options["gevent"]:
|
|||
return ImWatcher.watchers[db_name]
|
||||
|
||||
def __init__(self, db_name):
|
||||
super(ImWatcher, self).__init__("im_channel", db_name)
|
||||
self.db_name = db_name
|
||||
ImWatcher.watchers[db_name] = self
|
||||
self.waiting = 0
|
||||
self.wait_id = 0
|
||||
|
@ -112,6 +96,10 @@ if openerp.tools.config.options["gevent"]:
|
|||
self.users_watch = {}
|
||||
gevent.spawn(self.loop)
|
||||
|
||||
def loop(self):
|
||||
listen_channel(self.db_name, "im_channel", self.handle_message, self.check_stop)
|
||||
del ImWatcher.watchers[self.db_name]
|
||||
|
||||
def handle_message(self, message):
|
||||
if message["type"] == "message":
|
||||
for waiter in self.users.get(message["receiver"], {}).values():
|
||||
|
@ -208,7 +196,7 @@ class im_message(osv.osv):
|
|||
|
||||
def post(self, cr, uid, message, to_user_id, context=None):
|
||||
self.create(cr, uid, {"message": message, 'from': uid, 'to': to_user_id}, context=context)
|
||||
post_on_channel(cr, "im_channel", {'type': 'message', 'receiver': to_user_id})
|
||||
notify_channel(cr, "im_channel", {'type': 'message', 'receiver': to_user_id})
|
||||
return False
|
||||
|
||||
class im_user(osv.osv):
|
||||
|
@ -256,7 +244,7 @@ class im_user(osv.osv):
|
|||
"im_last_status_update": datetime.datetime.now().strftime(DEFAULT_SERVER_DATETIME_FORMAT)}, context=context)
|
||||
cr.commit()
|
||||
if current_status != new_one:
|
||||
cr.execute("notify im_channel, %s", [json.dumps({'type': 'status', 'user': uid})])
|
||||
notify_channel(cr, "im_channel", {'type': 'status', 'user': uid})
|
||||
cr.commit()
|
||||
return True
|
||||
|
||||
|
|
Loading…
Reference in New Issue