195 lines
7.2 KiB
Python
195 lines
7.2 KiB
Python
# -*- coding: utf-8 -*-
|
|
import datetime
|
|
import json
|
|
import logging
|
|
import select
|
|
import threading
|
|
import time
|
|
import random
|
|
|
|
import simplejson
|
|
import openerp
|
|
from openerp.osv import osv, fields
|
|
from openerp.http import request
|
|
from openerp.tools.misc import DEFAULT_SERVER_DATETIME_FORMAT
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
TIMEOUT = 50
|
|
|
|
#----------------------------------------------------------
|
|
# Bus
|
|
#----------------------------------------------------------
|
|
def json_dump(v):
|
|
return simplejson.dumps(v, separators=(',', ':'))
|
|
|
|
def hashable(key):
|
|
if isinstance(key, list):
|
|
key = tuple(key)
|
|
return key
|
|
|
|
class ImBus(osv.Model):
|
|
_name = 'bus.bus'
|
|
_columns = {
|
|
'id' : fields.integer('Id'),
|
|
'create_date' : fields.datetime('Create date'),
|
|
'channel' : fields.char('Channel'),
|
|
'message' : fields.char('Message'),
|
|
}
|
|
|
|
def gc(self, cr, uid):
|
|
timeout_ago = datetime.datetime.utcnow()-datetime.timedelta(seconds=TIMEOUT*2)
|
|
domain = [('create_date', '<', timeout_ago.strftime(DEFAULT_SERVER_DATETIME_FORMAT))]
|
|
ids = self.search(cr, openerp.SUPERUSER_ID, domain)
|
|
self.unlink(cr, openerp.SUPERUSER_ID, ids)
|
|
|
|
def sendmany(self, cr, uid, notifications):
|
|
channels = set()
|
|
for channel, message in notifications:
|
|
channels.add(channel)
|
|
values = {
|
|
"channel" : json_dump(channel),
|
|
"message" : json_dump(message)
|
|
}
|
|
self.pool['bus.bus'].create(cr, openerp.SUPERUSER_ID, values)
|
|
cr.commit()
|
|
if random.random() < 0.01:
|
|
self.gc(cr, uid)
|
|
if channels:
|
|
with openerp.sql_db.db_connect('postgres').cursor() as cr2:
|
|
cr2.execute("notify imbus, %s", (json_dump(list(channels)),))
|
|
|
|
def sendone(self, cr, uid, channel, message):
|
|
self.sendmany(cr, uid, [[channel, message]])
|
|
|
|
def poll(self, cr, uid, channels, last=0):
|
|
# first poll return the notification in the 'buffer'
|
|
if last == 0:
|
|
timeout_ago = datetime.datetime.utcnow()-datetime.timedelta(seconds=TIMEOUT)
|
|
domain = [('create_date', '>', timeout_ago.strftime(DEFAULT_SERVER_DATETIME_FORMAT))]
|
|
else:
|
|
# else returns the unread notifications
|
|
domain = [('id','>',last)]
|
|
channels = [json_dump(c) for c in channels]
|
|
domain.append(('channel','in',channels))
|
|
notifications = self.search_read(cr, openerp.SUPERUSER_ID, domain)
|
|
return [{"id":notif["id"], "channel": simplejson.loads(notif["channel"]), "message":simplejson.loads(notif["message"])} for notif in notifications]
|
|
|
|
class ImDispatch(object):
|
|
def __init__(self):
|
|
self.channels = {}
|
|
|
|
def poll(self, dbname, channels, last, timeout=TIMEOUT):
|
|
# Dont hang ctrl-c for a poll request, we need to bypass private
|
|
# attribute access because we dont know before starting the thread that
|
|
# it will handle a longpolling request
|
|
if not openerp.evented:
|
|
current = threading.current_thread()
|
|
current._Thread__daemonic = True
|
|
# rename the thread to avoid tests waiting for a longpolling
|
|
current.setName("openerp.longpolling.request.%s" % current.ident)
|
|
|
|
registry = openerp.registry(dbname)
|
|
|
|
# immediatly returns if past notifications exist
|
|
with registry.cursor() as cr:
|
|
notifications = registry['bus.bus'].poll(cr, openerp.SUPERUSER_ID, channels, last)
|
|
# or wait for future ones
|
|
if not notifications:
|
|
event = self.Event()
|
|
for c in channels:
|
|
self.channels.setdefault(hashable(c), []).append(event)
|
|
try:
|
|
event.wait(timeout=timeout)
|
|
with registry.cursor() as cr:
|
|
notifications = registry['bus.bus'].poll(cr, openerp.SUPERUSER_ID, channels, last)
|
|
except Exception:
|
|
# timeout
|
|
pass
|
|
return notifications
|
|
|
|
def loop(self):
|
|
""" Dispatch postgres notifications to the relevant polling threads/greenlets """
|
|
_logger.info("Bus.loop listen imbus on db postgres")
|
|
with openerp.sql_db.db_connect('postgres').cursor() as cr:
|
|
conn = cr._cnx
|
|
cr.execute("listen imbus")
|
|
cr.commit();
|
|
while True:
|
|
if select.select([conn], [], [], TIMEOUT) == ([],[],[]):
|
|
pass
|
|
else:
|
|
conn.poll()
|
|
channels = []
|
|
while conn.notifies:
|
|
channels.extend(json.loads(conn.notifies.pop().payload))
|
|
# dispatch to local threads/greenlets
|
|
events = set()
|
|
for c in channels:
|
|
events.update(self.channels.pop(hashable(c),[]))
|
|
for e in events:
|
|
e.set()
|
|
|
|
def run(self):
|
|
while True:
|
|
try:
|
|
self.loop()
|
|
except Exception, e:
|
|
_logger.exception("Bus.loop error, sleep and retry")
|
|
time.sleep(TIMEOUT)
|
|
|
|
def start(self):
|
|
if openerp.evented:
|
|
# gevent mode
|
|
import gevent
|
|
self.Event = gevent.event.Event
|
|
gevent.spawn(self.run)
|
|
elif openerp.multi_process:
|
|
# disabled in prefork mode
|
|
return
|
|
else:
|
|
# threaded mode
|
|
self.Event = threading.Event
|
|
t = threading.Thread(name="%s.Bus" % __name__, target=self.run)
|
|
t.daemon = True
|
|
t.start()
|
|
return self
|
|
|
|
dispatch = ImDispatch().start()
|
|
|
|
#----------------------------------------------------------
|
|
# Controller
|
|
#----------------------------------------------------------
|
|
class Controller(openerp.http.Controller):
|
|
""" Examples:
|
|
openerp.jsonRpc('/longpolling/poll','call',{"channels":["c1"],last:0}).then(function(r){console.log(r)});
|
|
openerp.jsonRpc('/longpolling/send','call',{"channel":"c1","message":"m1"});
|
|
openerp.jsonRpc('/longpolling/send','call',{"channel":"c2","message":"m2"});
|
|
"""
|
|
|
|
@openerp.http.route('/longpolling/send', type="json", auth="public")
|
|
def send(self, channel, message):
|
|
if not isinstance(channel, basestring):
|
|
raise Exception("bus.Bus only string channels are allowed.")
|
|
registry, cr, uid, context = request.registry, request.cr, request.session.uid, request.context
|
|
return registry['bus.bus'].sendone(cr, uid, channel, message)
|
|
|
|
# override to add channels
|
|
def _poll(self, dbname, channels, last, options):
|
|
request.cr.close()
|
|
request._cr = None
|
|
return dispatch.poll(dbname, channels, last)
|
|
|
|
@openerp.http.route('/longpolling/poll', type="json", auth="public")
|
|
def poll(self, channels, last, options=None):
|
|
if options is None:
|
|
options = {}
|
|
if not dispatch:
|
|
raise Exception("bus.Bus unavailable")
|
|
if [c for c in channels if not isinstance(c, basestring)]:
|
|
print channels
|
|
raise Exception("bus.Bus only string channels are allowed.")
|
|
return self._poll(request.db, channels, last, options)
|
|
|
|
# vim:et:
|