niv-openerp 2013-01-29 14:42:54 +01:00
parent 4baed85c90
commit 2cb66e21da
1 changed files with 35 additions and 33 deletions

View File

@ -32,7 +32,7 @@ import select
_logger = logging.getLogger(__name__)
def listen_channel(db_name, channel_name, handle_message, check_stop=(lambda: False), check_stop_timer=60., error_delay=10.):
def listen_channel(cr, channel_name, handle_message, check_stop=(lambda: False), check_stop_timer=60.):
"""
Begin a loop, listening on a PostgreSQL channel. This method does reserver its own cursor, so it should always
be called when outside a transaction. This method does never terminate by default, you need to provide a check_stop
@ -49,40 +49,29 @@ def listen_channel(db_name, channel_name, handle_message, check_stop=(lambda: Fa
:type check_stop: function (no arguments)
:param check_stop_timer: The maximum amount of time between calls to check_stop_timer (can be shorter if messages
are received).
:param error_delay: The amount of time to wait when a connection error occurs before querying a new connexion.
"""
_logger.info("Begin watching on channel "+ channel_name +" for database " + db_name)
stopping = False
while not stopping:
try:
conn = cr._cnx
cr.execute("listen " + channel_name + ";")
cr.commit();
stopping = False
while not stopping:
if check_stop():
stopping = True
break
if select.select([conn], [], [], check_stop_timer) == ([],[],[]):
pass
else:
conn.poll()
while conn.notifies:
message = json.loads(conn.notifies.pop().payload)
handle_message(message)
finally:
try:
registry = openerp.modules.registry.RegistryManager.get(db_name)
with registry.cursor() as c:
conn = c._cnx
try:
c.execute("listen " + channel_name + ";")
c.commit();
while not stopping:
if check_stop():
stopping = True
break
if select.select([conn], [], [], check_stop_timer) == ([],[],[]):
pass
else:
conn.poll()
while conn.notifies:
message = json.loads(conn.notifies.pop().payload)
handle_message(message)
finally:
try:
c.execute("unlisten " + channel_name + ";")
c.commit()
except:
pass # can't do anything if that fails
cr.execute("unlisten " + channel_name + ";")
cr.commit()
except:
# if something crash, we wait some time then try again
_logger.exception("Exception during watcher activity")
time.sleep(error_delay)
_logger.info("End watching on channel "+ channel_name +" for database " + db_name)
pass # can't do anything if that fails
def notify_channel(cr, channel_name, message):
"""
@ -100,6 +89,7 @@ def notify_channel(cr, channel_name, message):
POLL_TIMER = 30
DISCONNECTION_TIMER = POLL_TIMER + 5
WATCHER_ERROR_DELAY = 10
if openerp.tools.config.options["gevent"]:
import gevent
@ -124,7 +114,19 @@ if openerp.tools.config.options["gevent"]:
gevent.spawn(self.loop)
def loop(self):
listen_channel(self.db_name, "im_channel", self.handle_message, self.check_stop)
_logger.info("Begin watching on channel im_channel for database " + self.db_name)
stop = False
while not stop:
try:
registry = openerp.modules.registry.RegistryManager.get(self.db_name)
with registry.cursor() as cr:
listen_channel(cr, "im_channel", self.handle_message, self.check_stop)
stop = True
except:
# if something crash, we wait some time then try again
_logger.exception("Exception during watcher activity")
time.sleep(WATCHER_ERROR_DELAY)
_logger.info("End watching on channel im_channel for database " + self.db_name)
del ImWatcher.watchers[self.db_name]
def handle_message(self, message):