[IMP] res.users,web_services: new authenticate() API method for testing credentials

Provides a way to login for a given environment, passing
      user agent environment parameters.
      Currently used to initialize the web.base.url config
      parameter based on the web host whenever the admin user
      logs in.

bzr revid: odo@openerp.com-20111013104750-y0natm9v3wapn10l
This commit is contained in:
Olivier Dony 2011-10-13 12:47:50 +02:00
parent c18e1fc203
commit 97ba7be42d
2 changed files with 44 additions and 10 deletions

View File

@ -35,6 +35,7 @@ from osv import fields,osv
from osv.orm import browse_record
from service import security
from tools.translate import _
import openerp
import openerp.exceptions
class groups(osv.osv):
@ -417,6 +418,32 @@ class users(osv.osv):
data_id = dataobj._get_id(cr, 1, 'base', 'action_res_users_my')
return dataobj.browse(cr, uid, data_id, context=context).res_id
def authenticate(self, db, login, password, user_agent_env):
"""Verifies and returns the user ID corresponding to the given
``login`` and ``password`` combination, or False if there was
no matching user.
:param str db: the database on which user is trying to authenticate
:param str login: username
:param str password: user password
:param dict user_agent_env: environment dictionary describing any
relevant environment attributes
"""
uid = self.login(db, login, password)
if uid == openerp.SUPERUSER_ID:
# Successfully logged in as admin!
# Attempt to guess the web base url...
if user_agent_env and 'host' in user_agent_env:
cr = pooler.get_db(db).cursor()
try:
self.pool.get('ir.config_parameter').set_param(cr, uid, 'web.base.url',
user_agent_env['host'])
cr.commit()
except Exception:
logging.getLogger('res.users').exception("Failed to update web.base.url configuration parameter")
finally:
cr.close()
return uid
def login(self, db, login, password):
if not password:

View File

@ -366,20 +366,15 @@ class db(netsvc.ExportService):
return True
class common(netsvc.ExportService):
_logger = logging.getLogger('web-services')
def __init__(self,name="common"):
netsvc.ExportService.__init__(self,name)
def dispatch(self, method, params):
logger = netsvc.Logger()
if method == 'login':
res = security.login(params[0], params[1], params[2])
msg = res and 'successful login' or 'bad login or password'
# TODO log the client ip address..
logger.notifyChannel("web-service", netsvc.LOG_INFO, "%s from '%s' using database '%s'" % (msg, params[1], params[0].lower()))
return res or False
elif method in ['about', 'timezone_get', 'get_server_environment',
'login_message','get_stats', 'check_connectivity',
'list_http_services', 'version']:
if method in ['login', 'about', 'timezone_get', 'get_server_environment',
'login_message','get_stats', 'check_connectivity',
'list_http_services', 'version', 'authenticate']:
pass
elif method in ['get_available_updates', 'get_migration_scripts', 'set_loglevel', 'get_os_time', 'get_sqlcount']:
passwd = params[0]
@ -391,6 +386,18 @@ class common(netsvc.ExportService):
fn = getattr(self, 'exp_'+method)
return fn(*params)
def exp_login(self, db, login, password):
# TODO: legacy indirection through 'security', should use directly
# the res.users model
res = security.login(db, login, password)
msg = res and 'successful login' or 'bad login or password'
self._logger.info("%s from '%s' using database '%s'", msg, login, db.lower())
return res or False
def exp_authenticate(self, db, login, password, user_agent_env):
res_users = pooler.get_pool(db).get('res.users')
return res_users.authenticate(db, login, password, user_agent_env)
def exp_version(self):
return RPC_VERSION_1