niv-openerp 2013-01-29 11:27:56 +01:00
parent 66c7f18810
commit 4baed85c90
1 changed files with 58 additions and 31 deletions

View File

@ -33,40 +33,67 @@ import select
_logger = logging.getLogger(__name__)
def listen_channel(db_name, channel_name, handle_message, check_stop=(lambda: False), check_stop_timer=60., error_delay=10.):
_logger.info("Begin watching on channel "+ channel_name +" for database " + db_name)
stopping = False
while not stopping:
try:
registry = openerp.modules.registry.RegistryManager.get(db_name)
with registry.cursor() as c:
conn = c._cnx
"""
Begin a loop, listening on a PostgreSQL channel. This method does reserver its own cursor, so it should always
be called when outside a transaction. This method does never terminate by default, you need to provide a check_stop
callback to do so. This method also assume that all notifications will include a message formated using JSON (see the
corresponding notify_channel() method).
:param db_name: database name
:param channel_name: the name of the PostgreSQL channel to listen
:param handle_message: function that will be called when a message is received. It takes one argument, the message
attached to the notification.
:type handle_message: function (one argument)
:param check_stop: function that will be called periodically (see the check_stop_timer argument). If it returns True
this function will stop to watch the channel.
:type check_stop: function (no arguments)
:param check_stop_timer: The maximum amount of time between calls to check_stop_timer (can be shorter if messages
are received).
:param error_delay: The amount of time to wait when a connection error occurs before querying a new connexion.
"""
_logger.info("Begin watching on channel "+ channel_name +" for database " + db_name)
stopping = False
while not stopping:
try:
registry = openerp.modules.registry.RegistryManager.get(db_name)
with registry.cursor() as c:
conn = c._cnx
try:
c.execute("listen " + channel_name + ";")
c.commit();
while not stopping:
if check_stop():
stopping = True
break
if select.select([conn], [], [], check_stop_timer) == ([],[],[]):
pass
else:
conn.poll()
while conn.notifies:
message = json.loads(conn.notifies.pop().payload)
handle_message(message)
finally:
try:
c.execute("listen " + channel_name + ";")
c.commit();
while not stopping:
if check_stop():
stopping = True
break
if select.select([conn], [], [], check_stop_timer) == ([],[],[]):
pass
else:
conn.poll()
while conn.notifies:
message = json.loads(conn.notifies.pop().payload)
handle_message(message)
finally:
try:
c.execute("unlisten " + channel_name + ";")
c.commit()
except:
pass # can't do anything if that fails
except:
# if something crash, we wait some time then try again
_logger.exception("Exception during watcher activity")
time.sleep(error_delay)
_logger.info("End watching on channel "+ channel_name +" for database " + db_name)
c.execute("unlisten " + channel_name + ";")
c.commit()
except:
pass # can't do anything if that fails
except:
# if something crash, we wait some time then try again
_logger.exception("Exception during watcher activity")
time.sleep(error_delay)
_logger.info("End watching on channel "+ channel_name +" for database " + db_name)
def notify_channel(cr, channel_name, message):
"""
Send a message through a PostgreSQL channel. The message will be formatted using JSON. This method will
commit the given transaction because the notify command in Postgresql seems to work correctly when executed in
a separate transaction (despite what is written in the documentation).
:param cr: The cursor.
:param channel_name: The name of the PostgreSQL channel.
:param message: The message, must be JSON-compatible data.
"""
cr.commit()
cr.execute("notify " + channel_name + ", %s", [json.dumps(message)])
cr.commit()