From 8efd16f25a07f142dc78e509cd3172124cd15591 Mon Sep 17 00:00:00 2001 From: Olivier Dony Date: Mon, 15 Apr 2013 19:17:32 +0200 Subject: [PATCH] [FIX] res.partner, orm._check_recursion: prevent recursive partner structures + fix false positives in check_recursion() - res.partner must prevent creating loops in partner hierarchies, and this can be done easily with an extra _constraint using the ORM's builtin _check_recursion - _check_recursion's implementation incorrectly assumed that the provided 'ids' were unrelated (not part of a common hierarchy). - add tests for _check_recursion via extra tests on res.partner structure (explains why both patches are in the same commit) bzr revid: odo@openerp.com-20130415171732-aj3j2e2mycvzf4kp --- openerp/addons/base/res/res_partner.py | 4 +++ openerp/addons/base/tests/test_base.py | 39 +++++++++++++++++++++++++- openerp/osv/orm.py | 22 +++++++-------- 3 files changed, 52 insertions(+), 13 deletions(-) diff --git a/openerp/addons/base/res/res_partner.py b/openerp/addons/base/res/res_partner.py index 72c221c5534..be7434ff3d8 100644 --- a/openerp/addons/base/res/res_partner.py +++ b/openerp/addons/base/res/res_partner.py @@ -307,6 +307,10 @@ class res_partner(osv.osv, format_address): 'image': False, } + _constraints = [ + (osv.osv._check_recursion, 'You cannot create recursive Partner hierarchies.', ['parent_id']), + ] + def copy(self, cr, uid, id, default=None, context=None): if default is None: default = {} diff --git a/openerp/addons/base/tests/test_base.py b/openerp/addons/base/tests/test_base.py index 285af8fbf6e..a0c0b2eadb1 100644 --- a/openerp/addons/base/tests/test_base.py +++ b/openerp/addons/base/tests/test_base.py @@ -1,6 +1,7 @@ import unittest2 import openerp.tests.common as common +from openerp.osv.orm import except_orm class test_base(common.TransactionCase): @@ -39,5 +40,41 @@ class test_base(common.TransactionCase): self.assertTrue(new_id2 > new_id, 'find_or_create failed - should have created new one again') +class test_partner_recursion(common.TransactionCase): + + def setUp(self): + super(test_partner_recursion,self).setUp() + self.res_partner = self.registry('res.partner') + cr, uid = self.cr, self.uid + self.p1 = self.res_partner.name_create(cr, uid, 'Elmtree')[0] + self.p2 = self.res_partner.create(cr, uid, {'name': 'Elmtree Child 1', 'parent_id': self.p1}) + self.p3 = self.res_partner.create(cr, uid, {'name': 'Elmtree Grand-Child 1.1', 'parent_id': self.p2}) + + # split 101, 102, 103 tests to force SQL rollback between them + + def test_101_res_partner_recursion(self): + cr, uid, p1, p3 = self.cr, self.uid, self.p1, self.p3 + self.assertRaises(except_orm, self.res_partner.write, cr, uid, [p1], {'parent_id': p3}) + + def test_102_res_partner_recursion(self): + cr, uid, p2, p3 = self.cr, self.uid, self.p2, self.p3 + self.assertRaises(except_orm, self.res_partner.write, cr, uid, [p2], {'parent_id': p3}) + + def test_103_res_partner_recursion(self): + cr, uid, p3 = self.cr, self.uid, self.p3 + self.assertRaises(except_orm, self.res_partner.write, cr, uid, [p3], {'parent_id': p3}) + + def test_104_res_partner_recursion_indirect_cycle(self): + """ Indirect hacky write to create cycle in children """ + cr, uid, p2, p3 = self.cr, self.uid, self.p2, self.p3 + p3b = self.res_partner.create(cr, uid, {'name': 'Elmtree Grand-Child 1.2', 'parent_id': self.p2}) + self.assertRaises(except_orm, self.res_partner.write, cr, uid, [p2], + {'child_ids': [(1, p3, {'parent_id': p3b}), (1, p3b, {'parent_id': p3})]}) + + def test_110_res_partner_recursion_multi_update(self): + """ multi-write on several partners in same hierarchy must not trigger a false cycle detection """ + cr, uid, p1, p2, p3 = self.cr, self.uid, self.p1, self.p2, self.p3 + self.assertTrue(self.res_partner.write(cr, uid, [p1,p2,p3], {'phone': '123456'})) + if __name__ == '__main__': - unittest2.main() \ No newline at end of file + unittest2.main() diff --git a/openerp/osv/orm.py b/openerp/osv/orm.py index 63892297578..1296472fd95 100644 --- a/openerp/osv/orm.py +++ b/openerp/osv/orm.py @@ -5063,20 +5063,18 @@ class BaseModel(object): :param parent: optional parent field name (default: ``self._parent_name = parent_id``) :return: **True** if the operation can proceed safely, or **False** if an infinite loop is detected. """ - if not parent: parent = self._parent_name - ids_parent = ids[:] - query = 'SELECT distinct "%s" FROM "%s" WHERE id IN %%s' % (parent, self._table) - while ids_parent: - ids_parent2 = [] - for i in range(0, len(ids), cr.IN_MAX): - sub_ids_parent = ids_parent[i:i+cr.IN_MAX] - cr.execute(query, (tuple(sub_ids_parent),)) - ids_parent2.extend(filter(None, map(lambda x: x[0], cr.fetchall()))) - ids_parent = ids_parent2 - for i in ids_parent: - if i in ids: + + # must ignore 'active' flag, ir.rules, etc. => direct SQL query + query = 'SELECT "%s" FROM "%s" WHERE id = %%s' % (parent, self._table) + for id in ids: + current_id = id + while current_id is not None: + cr.execute(query, (current_id,)) + result = cr.fetchone() + current_id = result[0] if result else None + if current_id == id: return False return True