[FIX] res.partner, orm._check_recursion: prevent recursive partner structures + fix false positives in check_recursion()

- res.partner must prevent creating loops in partner
hierarchies, and this can be done easily with an
extra _constraint using the ORM's builtin _check_recursion

- _check_recursion's implementation incorrectly
assumed that the provided 'ids' were unrelated
(not part of a common hierarchy).

- add tests for _check_recursion via extra
tests on res.partner structure
(explains why both patches are in the same
commit)

bzr revid: odo@openerp.com-20130415171732-aj3j2e2mycvzf4kp
This commit is contained in:
Olivier Dony 2013-04-15 19:17:32 +02:00
parent ef062c086b
commit 8efd16f25a
3 changed files with 52 additions and 13 deletions

View File

@ -307,6 +307,10 @@ class res_partner(osv.osv, format_address):
'image': False,
}
_constraints = [
(osv.osv._check_recursion, 'You cannot create recursive Partner hierarchies.', ['parent_id']),
]
def copy(self, cr, uid, id, default=None, context=None):
if default is None:
default = {}

View File

@ -1,6 +1,7 @@
import unittest2
import openerp.tests.common as common
from openerp.osv.orm import except_orm
class test_base(common.TransactionCase):
@ -39,5 +40,41 @@ class test_base(common.TransactionCase):
self.assertTrue(new_id2 > new_id, 'find_or_create failed - should have created new one again')
class test_partner_recursion(common.TransactionCase):
def setUp(self):
super(test_partner_recursion,self).setUp()
self.res_partner = self.registry('res.partner')
cr, uid = self.cr, self.uid
self.p1 = self.res_partner.name_create(cr, uid, 'Elmtree')[0]
self.p2 = self.res_partner.create(cr, uid, {'name': 'Elmtree Child 1', 'parent_id': self.p1})
self.p3 = self.res_partner.create(cr, uid, {'name': 'Elmtree Grand-Child 1.1', 'parent_id': self.p2})
# split 101, 102, 103 tests to force SQL rollback between them
def test_101_res_partner_recursion(self):
cr, uid, p1, p3 = self.cr, self.uid, self.p1, self.p3
self.assertRaises(except_orm, self.res_partner.write, cr, uid, [p1], {'parent_id': p3})
def test_102_res_partner_recursion(self):
cr, uid, p2, p3 = self.cr, self.uid, self.p2, self.p3
self.assertRaises(except_orm, self.res_partner.write, cr, uid, [p2], {'parent_id': p3})
def test_103_res_partner_recursion(self):
cr, uid, p3 = self.cr, self.uid, self.p3
self.assertRaises(except_orm, self.res_partner.write, cr, uid, [p3], {'parent_id': p3})
def test_104_res_partner_recursion_indirect_cycle(self):
""" Indirect hacky write to create cycle in children """
cr, uid, p2, p3 = self.cr, self.uid, self.p2, self.p3
p3b = self.res_partner.create(cr, uid, {'name': 'Elmtree Grand-Child 1.2', 'parent_id': self.p2})
self.assertRaises(except_orm, self.res_partner.write, cr, uid, [p2],
{'child_ids': [(1, p3, {'parent_id': p3b}), (1, p3b, {'parent_id': p3})]})
def test_110_res_partner_recursion_multi_update(self):
""" multi-write on several partners in same hierarchy must not trigger a false cycle detection """
cr, uid, p1, p2, p3 = self.cr, self.uid, self.p1, self.p2, self.p3
self.assertTrue(self.res_partner.write(cr, uid, [p1,p2,p3], {'phone': '123456'}))
if __name__ == '__main__':
unittest2.main()
unittest2.main()

View File

@ -5063,20 +5063,18 @@ class BaseModel(object):
:param parent: optional parent field name (default: ``self._parent_name = parent_id``)
:return: **True** if the operation can proceed safely, or **False** if an infinite loop is detected.
"""
if not parent:
parent = self._parent_name
ids_parent = ids[:]
query = 'SELECT distinct "%s" FROM "%s" WHERE id IN %%s' % (parent, self._table)
while ids_parent:
ids_parent2 = []
for i in range(0, len(ids), cr.IN_MAX):
sub_ids_parent = ids_parent[i:i+cr.IN_MAX]
cr.execute(query, (tuple(sub_ids_parent),))
ids_parent2.extend(filter(None, map(lambda x: x[0], cr.fetchall())))
ids_parent = ids_parent2
for i in ids_parent:
if i in ids:
# must ignore 'active' flag, ir.rules, etc. => direct SQL query
query = 'SELECT "%s" FROM "%s" WHERE id = %%s' % (parent, self._table)
for id in ids:
current_id = id
while current_id is not None:
cr.execute(query, (current_id,))
result = cr.fetchone()
current_id = result[0] if result else None
if current_id == id:
return False
return True