[REF] Moved to RST docstrings, leave out well known parameters

[FIX] Unauthorized login with empty password introduced in previous commit
[REF] Further refactored out duplicate code around authorization
[FIX] Filter length is too short (64)

lp bug: https://launchpad.net/bugs/806914 fixed

bzr revid: stefan@therp.nl-20110727220338-n1u19efzubngu0qt
This commit is contained in:
Stefan Rijnhart 2011-07-28 00:03:38 +02:00
parent 6437267e23
commit 6adef2a895
1 changed files with 89 additions and 80 deletions

View File

@ -33,11 +33,13 @@ class CompanyLDAP(osv.osv):
def get_ldap_dicts(self, cr, ids=None):
"""
Return res_company_ldap resources in the database as a list of
dictionaries.
Retrieve res_company_ldap resources from the database in dictionary
format.
@param cr : database cursor
@param ids : optional list of res_company_ldap ids
:param list ids: Valid ids of model res_company_ldap. If not \
specified, process all resources (unlike other ORM methods).
:return: ldap configurations
:rtype: list of dictionaries
"""
if ids:
@ -56,39 +58,73 @@ class CompanyLDAP(osv.osv):
def connect(self, conf):
"""
Compose an LDAP uri from an ldap configuration dictionary
and return a connection to it.
Connect to an LDAP server specified by an ldap
configuration dictionary.
@param conf : LDAP configuration dictionary
:param dict conf: LDAP configuration
:return: an LDAP object
"""
uri = 'ldap://%s:%d' % (conf['ldap_server'],
conf['ldap_server_port'])
return ldap.initialize(uri)
def authenticate(self, conf, dn, password):
def authenticate(self, conf, login, password):
"""
Perform an atomic LDAP authentication.
Authenticate a user against the specified LDAP server.
In order to prevent an unintended 'unauthenticated authentication',
which is an anonymous bind with a valid dn and a blank password,
check for empty passwords explicitely (:rfc:`4513#section-6.3.1`)
@param conf : LDAP configuration dictionary
@param dn : LDAP dn
@param password : Password for the LDAP user
:param dict conf: LDAP configuration
:param login: username
:param password: Password for the LDAP user
:return: LDAP entry of authenticated user or False
:rtype: dictionary of attributes
"""
conn = self.connect(conf)
conn.simple_bind_s(dn, password)
conn.unbind()
if not password:
return False
entry = False
filter = filter_format(conf['ldap_filter'], (login,))
try:
results = self.query(conf, filter)
if results and len(results) == 1:
dn = results[0][0]
conn = self.connect(conf)
conn.simple_bind_s(dn, password)
conn.unbind()
entry = results[0]
except ldap.INVALID_CREDENTIALS:
return False
except ldap.LDAPError, e:
logger = logging.getLogger('orm.ldap')
logger.error('An LDAP exception occurred: %s', e)
return entry
def query(self, conf, filter, retrieve_attributes=None):
"""
Query an LDAP server with the filter argument and scope subtree.
Return the results in native python-ldap format, ie. a list of
tuples (dn, attrs).
@param conf : LDAP configuration dictionary
@param filter : valid LDAP filter
@param retrieve_attributes : optional list of LDAP attributes to be
retrieved. If not specified, return all attributes.
Allow for all authentication methods of the simple authentication
method:
- authenticated bind (non-empty binddn + valid password)
- anonymous bind (empty binddn + empty password)
- unauthenticated authentication (non-empty binddn + empty password)
.. seealso::
:rfc:`4513#section-5.1` - LDAP: Simple Authentication Method.
:param dict conf: LDAP configuration
:param filter: valid LDAP filter
:param list retrieve_attributes: LDAP attributes to be retrieved. \
If not specified, return all attributes.
:return: ldap entries
:rtype: list of tuples (dn, attrs)
"""
results = []
@ -100,21 +136,22 @@ class CompanyLDAP(osv.osv):
results = conn.search_st(conf['ldap_base'], ldap.SCOPE_SUBTREE,
filter, retrieve_attributes, timeout=60)
conn.unbind()
except ldap.INVALID_CREDENTIALS:
logger.error('LDAP bind failed.')
except ldap.LDAPError, e:
logger.warning('An LDAP exception occurred: %s' % e)
pass
logger.error('An LDAP exception occurred: %s', e)
return results
def map_ldap_attributes(self, cr, uid, login, conf, ldap_entry):
def map_ldap_attributes(self, cr, uid, conf, login, ldap_entry):
"""
Compose a list of field values for creating a new resource of model
res_users, based upon the retrieved ldap entry and the LDAP settings.
Compose values for a new resource of model res_users,
based upon the retrieved ldap entry and the LDAP settings.
@param cr : the database cursor
@param uid : the OpenERP user id
@param login : the new user's login
@param conf : LDAP configuration dictionary
@param ldap_entry : single LDAP result in (dn, attrs) format
:param dict conf: LDAP configuration
:param login: the new user's login
:param tuple ldap_entry: single LDAP result (dn, attrs)
:return: parameters for a new resource of model res_users
:rtype: dict
"""
values = { 'name': ldap_entry[1]['cn'][0],
@ -123,18 +160,17 @@ class CompanyLDAP(osv.osv):
}
return values
def get_or_create_user(self, cr, uid, login, conf, ldap_entry,
def get_or_create_user(self, cr, uid, conf, login, ldap_entry,
context=None):
"""
Return the id of an active res_users resource with the specified
Retrieve an active resource of model res_users with the specified
login. Create the user if it is not initially found.
@param cr : the database cursor
@param uid : the OpenERP user id
@param login : the user's login
@param conf : LDAP configuration dictionary
@param ldap_entry : single LDAP result in (dn, attrs) format
@param context : the OpenERP context
:param dict conf: LDAP configuration
:param login: the user's login
:param tuple ldap_entry: single LDAP result (dn, attrs)
:return: res_users id
:rtype: int
"""
user_id = False
@ -148,7 +184,7 @@ class CompanyLDAP(osv.osv):
logger = logging.getLogger('orm.ldap')
logger.debug("Creating new OpenERP user \"%s\" from LDAP" % login)
user_obj = self.pool.get('res.users')
values = self.map_ldap_attributes(cr, uid, login, conf, ldap_entry)
values = self.map_ldap_attributes(cr, uid, conf, login, ldap_entry)
if conf['user']:
user_id = user_obj.copy(cr, 1, conf['user'],
default={'active': True})
@ -169,7 +205,7 @@ class CompanyLDAP(osv.osv):
'ldap_password': fields.char('LDAP password', size=64,
help=("The password of the user account on the LDAP server that is "
"used to query the directory.")),
'ldap_filter': fields.char('LDAP filter', size=64, required=True),
'ldap_filter': fields.char('LDAP filter', size=256, required=True),
'ldap_base': fields.char('LDAP base', size=64, required=True),
'user': fields.many2one('res.users', 'Model User',
help="Model used for user creation"),
@ -204,27 +240,15 @@ class users(osv.osv):
cr = pooler.get_db(db).cursor()
ldap_obj = pooler.get_pool(db).get('res.company.ldap')
for conf in ldap_obj.get_ldap_dicts(cr):
filter = filter_format(conf['ldap_filter'], (login,))
results = ldap_obj.query(conf, filter)
if results and len(results) == 1:
entry = results[0]
dn = entry[0]
name = entry[1]['cn'][0]
try:
ldap_obj.authenticate(conf, dn, password)
user_id = ldap_obj.get_or_create_user(
cr, 1, login, conf, entry)
if user_id:
cr.execute('UPDATE res_users SET date=now() WHERE '
'login=%s', (tools.ustr(login),))
cr.commit()
entry = ldap_obj.authenticate(conf, login, password)
if entry:
user_id = ldap_obj.get_or_create_user(
cr, 1, conf, login, entry)
if user_id:
cr.execute('UPDATE res_users SET date=now() WHERE '
'login=%s', (tools.ustr(login),))
cr.commit()
break
except ldap.INVALID_CREDENTIALS:
pass
except ldap.LDAPError, e:
logger = logging.getLogger('orm.ldap')
logger.warning('An LDAP exception occurred: %s' % e)
pass
cr.close()
return user_id
@ -234,10 +258,6 @@ class users(osv.osv):
except security.ExceptionNoTb: # AccessDenied
pass
if not passwd:
# empty passwords disallowed for obvious security reasons
raise security.ExceptionNoTb('AccessDenied')
cr = pooler.get_db(db).cursor()
cr.execute('SELECT login FROM res_users WHERE id=%s AND active=TRUE',
(int(uid),))
@ -245,21 +265,10 @@ class users(osv.osv):
if res:
ldap_obj = pooler.get_pool(db).get('res.company.ldap')
for conf in ldap_obj.get_ldap_dicts(cr):
filter = filter_format(conf['ldap_filter'], (res[0],))
results = ldap_obj.query(conf, filter)
if results and len(results) == 1:
dn = results[0][0]
# some LDAP servers allow anonymous binding with blank passwords,
# but these have been rejected above, so we're safe to use bind()
try:
ldap_obj.authenticate(conf, dn, passwd)
self._uid_cache.setdefault(db, {})[uid] = passwd
cr.close()
return True
except ldap.LDAPError, e:
logger = logging.getLogger('orm.ldap')
logger.warning('An LDAP exception occurred: %s' % e)
pass
if ldap_obj.authenticate(conf, res[0], passwd):
self._uid_cache.setdefault(db, {})[uid] = passwd
cr.close()
return True
cr.close()
raise security.ExceptionNoTb('AccessDenied')