From 2cb66e21da405c14234d7cef388d4c61734674f6 Mon Sep 17 00:00:00 2001 From: niv-openerp Date: Tue, 29 Jan 2013 14:42:54 +0100 Subject: [PATCH] wip bzr revid: nicolas.vanhoren@openerp.com-20130129134254-22hnl55pjttf5znl --- addons/web_im/im.py | 68 +++++++++++++++++++++++---------------------- 1 file changed, 35 insertions(+), 33 deletions(-) diff --git a/addons/web_im/im.py b/addons/web_im/im.py index 1c83c701c74..0bbe41b891a 100644 --- a/addons/web_im/im.py +++ b/addons/web_im/im.py @@ -32,7 +32,7 @@ import select _logger = logging.getLogger(__name__) -def listen_channel(db_name, channel_name, handle_message, check_stop=(lambda: False), check_stop_timer=60., error_delay=10.): +def listen_channel(cr, channel_name, handle_message, check_stop=(lambda: False), check_stop_timer=60.): """ Begin a loop, listening on a PostgreSQL channel. This method does reserver its own cursor, so it should always be called when outside a transaction. This method does never terminate by default, you need to provide a check_stop @@ -49,40 +49,29 @@ def listen_channel(db_name, channel_name, handle_message, check_stop=(lambda: Fa :type check_stop: function (no arguments) :param check_stop_timer: The maximum amount of time between calls to check_stop_timer (can be shorter if messages are received). - :param error_delay: The amount of time to wait when a connection error occurs before querying a new connexion. """ - _logger.info("Begin watching on channel "+ channel_name +" for database " + db_name) - stopping = False - while not stopping: + try: + conn = cr._cnx + cr.execute("listen " + channel_name + ";") + cr.commit(); + stopping = False + while not stopping: + if check_stop(): + stopping = True + break + if select.select([conn], [], [], check_stop_timer) == ([],[],[]): + pass + else: + conn.poll() + while conn.notifies: + message = json.loads(conn.notifies.pop().payload) + handle_message(message) + finally: try: - registry = openerp.modules.registry.RegistryManager.get(db_name) - with registry.cursor() as c: - conn = c._cnx - try: - c.execute("listen " + channel_name + ";") - c.commit(); - while not stopping: - if check_stop(): - stopping = True - break - if select.select([conn], [], [], check_stop_timer) == ([],[],[]): - pass - else: - conn.poll() - while conn.notifies: - message = json.loads(conn.notifies.pop().payload) - handle_message(message) - finally: - try: - c.execute("unlisten " + channel_name + ";") - c.commit() - except: - pass # can't do anything if that fails + cr.execute("unlisten " + channel_name + ";") + cr.commit() except: - # if something crash, we wait some time then try again - _logger.exception("Exception during watcher activity") - time.sleep(error_delay) - _logger.info("End watching on channel "+ channel_name +" for database " + db_name) + pass # can't do anything if that fails def notify_channel(cr, channel_name, message): """ @@ -100,6 +89,7 @@ def notify_channel(cr, channel_name, message): POLL_TIMER = 30 DISCONNECTION_TIMER = POLL_TIMER + 5 +WATCHER_ERROR_DELAY = 10 if openerp.tools.config.options["gevent"]: import gevent @@ -124,7 +114,19 @@ if openerp.tools.config.options["gevent"]: gevent.spawn(self.loop) def loop(self): - listen_channel(self.db_name, "im_channel", self.handle_message, self.check_stop) + _logger.info("Begin watching on channel im_channel for database " + self.db_name) + stop = False + while not stop: + try: + registry = openerp.modules.registry.RegistryManager.get(self.db_name) + with registry.cursor() as cr: + listen_channel(cr, "im_channel", self.handle_message, self.check_stop) + stop = True + except: + # if something crash, we wait some time then try again + _logger.exception("Exception during watcher activity") + time.sleep(WATCHER_ERROR_DELAY) + _logger.info("End watching on channel im_channel for database " + self.db_name) del ImWatcher.watchers[self.db_name] def handle_message(self, message):