[REF] [IMP] expression.parse: refactored the main parsing loop, now using source and result stacks. Leaf elements are encapsulated into a specific object, allowing to delegate the join condition formation and aliases generation, as well as working_table management to them. The main parsing loop is now a loop taking a leaf, doing one action on it, and putting the result back to be analyzed or in the results. This allows to avoid having while loops inside a main while loop with some weird corner effects.

bzr revid: tde@openerp.com-20121204142650-gkhmjdeu5upi25kp
This commit is contained in:
Thibault Delavallée 2012-12-04 15:26:50 +01:00
parent 638fc20618
commit 9e6c2805ab
6 changed files with 650 additions and 394 deletions

View File

@ -52,7 +52,7 @@ class ir_rule(osv.osv):
eval_context = self._eval_context(cr, uid)
for rule in self.browse(cr, uid, ids, context):
if rule.domain_force:
res[rule.id] = expression.normalize(eval(rule.domain_force, eval_context))
res[rule.id] = expression.normalize_domain(eval(rule.domain_force, eval_context))
else:
res[rule.id] = []
return res
@ -130,7 +130,7 @@ class ir_rule(osv.osv):
for rule in self.browse(cr, SUPERUSER_ID, rule_ids):
# read 'domain' as UID to have the correct eval context for the rule.
rule_domain = self.read(cr, uid, rule.id, ['domain'])['domain']
dom = expression.normalize(rule_domain)
dom = expression.normalize_domain(rule_domain)
for group in rule.groups:
if group in user.groups_id:
group_domains.setdefault(group, []).append(dom)

View File

@ -92,7 +92,30 @@ class test_expression(common.TransactionCase):
# self.assertTrue(a not in with_any_other_than_a, "Search for category_id with any other than cat_a failed (1).")
# self.assertTrue(ab in with_any_other_than_a, "Search for category_id with any other than cat_a failed (2).")
def test_10_auto_join(self):
def test_10_expression_parse(self):
# TDE note: those tests have been added when refactoring the expression.parse() method.
# They come in addition to the already existing test_osv_expression.yml; maybe some tests
# will be a bit redundant
registry, cr, uid = self.registry, self.cr, self.uid
users_obj = registry('res.users')
# Create users
a = users_obj.create(cr, uid, {'name': 'test_A', 'login': 'test_A'})
b1 = users_obj.create(cr, uid, {'name': 'test_B', 'login': 'test_B'})
b1_user = users_obj.browse(cr, uid, [b1])[0]
b2 = users_obj.create(cr, uid, {'name': 'test_B2', 'login': 'test_B2', 'parent_id': b1_user.partner_id.id})
# Test1: simple inheritance
user_ids = users_obj.search(cr, uid, [('name', 'like', 'test')])
self.assertEqual(set(user_ids), set([a, b1, b2]), 'searching through inheritance failed')
user_ids = users_obj.search(cr, uid, [('name', '=', 'test_B')])
self.assertEqual(set(user_ids), set([b1]), 'searching through inheritance failed')
# Test2: inheritance + relational fields
user_ids = users_obj.search(cr, uid, [('child_ids.name', 'like', 'test_B')])
self.assertEqual(set(user_ids), set([b1]), 'searching through inheritance failed')
def test_20_auto_join(self):
registry, cr, uid = self.registry, self.cr, self.uid
# Get models
@ -132,6 +155,9 @@ class test_expression(common.TransactionCase):
# Do: one2many without _auto_join
self._reinit_mock()
partner_ids = partner_obj.search(cr, uid, [('bank_ids.name', 'like', name_test)])
for query in self.query_list:
print query
print '----------------------'
# Test result
self.assertEqual(set(partner_ids), set([p_aa]),
"_auto_join off: ('bank_ids.name', 'like', '..'): incorrect result")
@ -141,14 +167,14 @@ class test_expression(common.TransactionCase):
sql_query = self.query_list[0].get_sql()
self.assertIn('res_partner_bank', sql_query[0],
"_auto_join off: ('bank_ids.name', 'like', '..') first query incorrect main table")
self.assertIn('res_partner_bank."name" like %s', sql_query[1],
self.assertIn('"res_partner_bank"."name" like %s', sql_query[1],
"_auto_join off: ('bank_ids.name', 'like', '..') first query incorrect where condition")
self.assertEqual(set(['%' + name_test + '%']), set(sql_query[2]),
"_auto_join off: ('bank_ids.name', 'like', '..') first query incorrect parameter")
sql_query = self.query_list[2].get_sql()
self.assertIn('res_partner', sql_query[0],
"_auto_join off: ('bank_ids.name', 'like', '..') third query incorrect main table")
self.assertIn('res_partner."id" in (%s)', sql_query[1],
self.assertIn('"res_partner"."id" in (%s)', sql_query[1],
"_auto_join off: ('bank_ids.name', 'like', '..') third query incorrect where condition")
self.assertEqual(set([p_aa]), set(sql_query[2]),
"_auto_join off: ('bank_ids.name', 'like', '..') third query incorrect parameter")
@ -156,6 +182,9 @@ class test_expression(common.TransactionCase):
# Do: cascaded one2many without _auto_join
self._reinit_mock()
partner_ids = partner_obj.search(cr, uid, [('child_ids.bank_ids.id', 'in', [b_aa, b_ba])])
for query in self.query_list:
print query
print '----------------------'
# Test result
self.assertEqual(set(partner_ids), set([p_a, p_b]),
"_auto_join off: ('child_ids.bank_ids.id', 'in', [..]): incorrect result")
@ -167,6 +196,8 @@ class test_expression(common.TransactionCase):
partner_bank_ids_col._auto_join = True
self._reinit_mock()
partner_ids = partner_obj.search(cr, uid, [('bank_ids.name', 'like', 'test_a')])
# for query in self.query_list:
# print query
# Test result
self.assertEqual(set(partner_ids), set([p_aa]),
"_auto_join on: ('bank_ids.name', 'like', '..') incorrect result")
@ -178,9 +209,9 @@ class test_expression(common.TransactionCase):
"_auto_join on: ('bank_ids.name', 'like', '..') query incorrect main table")
self.assertIn('"res_partner_bank" as res_partner__bank_ids', sql_query[0],
"_auto_join on: ('bank_ids.name', 'like', '..') query incorrect join")
self.assertIn('res_partner__bank_ids."name" like %s', sql_query[1],
self.assertIn('"res_partner__bank_ids"."name" like %s', sql_query[1],
"_auto_join on: ('bank_ids.name', 'like', '..') query incorrect where condition")
self.assertIn('res_partner__bank_ids."partner_id"=res_partner."id"', sql_query[1],
self.assertIn('res_partner."id"=res_partner__bank_ids."partner_id"', sql_query[1],
"_auto_join on: ('bank_ids.name', 'like', '..') query incorrect join condition")
self.assertEqual(set(['%' + name_test + '%']), set(sql_query[2]),
"_auto_join on: ('bank_ids.name', 'like', '..') query incorrect parameter")
@ -188,6 +219,9 @@ class test_expression(common.TransactionCase):
# Do: one2many with _auto_join, test final leaf is an id
self._reinit_mock()
partner_ids = partner_obj.search(cr, uid, [('bank_ids.id', 'in', [b_aa, b_ab])])
for query in self.query_list:
print query
print '----------------------'
# Test result
self.assertEqual(set(partner_ids), set([p_aa, p_ab]),
"_auto_join on: ('bank_ids.id', 'in', [..]) incorrect result")
@ -199,9 +233,9 @@ class test_expression(common.TransactionCase):
"_auto_join on: ('bank_ids.id', 'in', [..]) query incorrect main table")
self.assertIn('"res_partner_bank" as res_partner__bank_ids', sql_query[0],
"_auto_join on: ('bank_ids.id', 'in', [..]) query incorrect join")
self.assertIn('res_partner__bank_ids."id" in (%s,%s)', sql_query[1],
self.assertIn('"res_partner__bank_ids"."id" in (%s,%s)', sql_query[1],
"_auto_join on: ('bank_ids.id', 'in', [..]) query incorrect where condition")
self.assertIn('res_partner__bank_ids."partner_id"=res_partner."id"', sql_query[1],
self.assertIn('res_partner."id"=res_partner__bank_ids."partner_id"', sql_query[1],
"_auto_join on: ('bank_ids.id', 'in', [..]) query incorrect join condition")
self.assertEqual(set([b_aa, b_ab]), set(sql_query[2]),
"_auto_join on: ('bank_ids.id', 'in', [..]) query incorrect parameter")
@ -210,6 +244,9 @@ class test_expression(common.TransactionCase):
partner_child_ids_col._auto_join = True
self._reinit_mock()
partner_ids = partner_obj.search(cr, uid, [('child_ids.bank_ids.id', 'in', [b_aa, b_ba])])
for query in self.query_list:
print query
print '----------------------'
# Test result
self.assertEqual(set(partner_ids), set([p_a, p_b]),
"_auto_join on: ('child_ids.bank_ids.id', 'not in', [..]): incorrect result")
@ -223,11 +260,11 @@ class test_expression(common.TransactionCase):
"_auto_join on: ('child_ids.bank_ids.id', 'in', [..]) query incorrect join")
self.assertIn('"res_partner_bank" as res_partner__child_ids__bank_ids', sql_query[0],
"_auto_join on: ('child_ids.bank_ids.id', 'in', [..]) query incorrect join")
self.assertIn('res_partner__child_ids__bank_ids."id" in (%s,%s)', sql_query[1],
self.assertIn('"res_partner__child_ids__bank_ids"."id" in (%s,%s)', sql_query[1],
"_auto_join on: ('child_ids.bank_ids.id', 'in', [..]) query incorrect where condition")
self.assertIn('res_partner__child_ids."parent_id"=res_partner."id"', sql_query[1],
self.assertIn('res_partner."id"=res_partner__child_ids."parent_id"', sql_query[1],
"_auto_join on: ('child_ids.bank_ids.id', 'in', [..]) query incorrect join condition")
self.assertIn('res_partner__child_ids__bank_ids."partner_id"=res_partner__child_ids."id"', sql_query[1],
self.assertIn('res_partner__child_ids."id"=res_partner__child_ids__bank_ids."partner_id"', sql_query[1],
"_auto_join on: ('child_ids.bank_ids.id', 'in', [..]) query incorrect join condition")
self.assertEqual(set([b_aa, b_ba]), set(sql_query[2]),
"_auto_join on: ('child_ids.bank_ids.id', 'in', [..]) query incorrect parameter")
@ -241,6 +278,9 @@ class test_expression(common.TransactionCase):
# Do: many2one without _auto_join
self._reinit_mock()
partner_ids = partner_obj.search(cr, uid, [('state_id.country_id.code', 'like', name_test)])
for query in self.query_list:
print query
print '----------------------'
# Test result: at least our added data + demo data
self.assertTrue(set([p_a, p_b, p_aa, p_ab, p_ba]).issubset(set(partner_ids)),
"_auto_join off: ('state_id.country_id.code', 'like', '..') incorrect result")
@ -252,6 +292,9 @@ class test_expression(common.TransactionCase):
partner_state_id_col._auto_join = True
self._reinit_mock()
partner_ids = partner_obj.search(cr, uid, [('state_id.country_id.code', 'like', name_test)])
for query in self.query_list:
print query
print '----------------------'
# Test result: at least our added data + demo data
self.assertTrue(set([p_a, p_b, p_aa, p_ab, p_ba]).issubset(set(partner_ids)),
"_auto_join on for state_id: ('state_id.country_id.code', 'like', '..') incorrect result")
@ -261,7 +304,7 @@ class test_expression(common.TransactionCase):
sql_query = self.query_list[0].get_sql()
self.assertIn('"res_country"', sql_query[0],
"_auto_join on for state_id: ('state_id.country_id.code', 'like', '..') query 1 incorrect main table")
self.assertIn('res_country."code" like %s', sql_query[1],
self.assertIn('"res_country"."code" like %s', sql_query[1],
"_auto_join on for state_id: ('state_id.country_id.code', 'like', '..') query 1 incorrect where condition")
self.assertEqual(['%' + name_test + '%'], sql_query[2],
"_auto_join on for state_id: ('state_id.country_id.code', 'like', '..') query 1 incorrect parameter")
@ -270,9 +313,9 @@ class test_expression(common.TransactionCase):
"_auto_join on for state_id: ('state_id.country_id.code', 'like', '..') query 2 incorrect main table")
self.assertIn('"res_country_state" as res_partner__state_id', sql_query[0],
"_auto_join on for state_id: ('state_id.country_id.code', 'like', '..') query 2 incorrect join")
self.assertIn('res_partner__state_id."country_id" in (%s)', sql_query[1],
self.assertIn('"res_partner__state_id"."country_id" in (%s)', sql_query[1],
"_auto_join on for state_id: ('state_id.country_id.code', 'like', '..') query 2 incorrect where condition")
self.assertIn('(res_partner__state_id."id"=res_partner."state_id"', sql_query[1],
self.assertIn('res_partner."state_id"=res_partner__state_id."id"', sql_query[1],
"_auto_join on for state_id: ('state_id.country_id.code', 'like', '..') query 2 incorrect join condition")
# Do: many2one with 1 _auto_join on the second many2one
@ -280,6 +323,9 @@ class test_expression(common.TransactionCase):
state_country_id_col._auto_join = True
self._reinit_mock()
partner_ids = partner_obj.search(cr, uid, [('state_id.country_id.code', 'like', name_test)])
for query in self.query_list:
print query
print '----------------------'
# Test result: at least our added data + demo data
self.assertTrue(set([p_a, p_b, p_aa, p_ab, p_ba]).issubset(set(partner_ids)),
"_auto_join on for country_id: ('state_id.country_id.code', 'like', '..') incorrect result")
@ -292,9 +338,9 @@ class test_expression(common.TransactionCase):
"_auto_join on for country_id: ('state_id.country_id.code', 'like', '..') query 1 incorrect main table")
self.assertIn('"res_country" as res_country_state__country_id', sql_query[0],
"_auto_join on for country_id: ('state_id.country_id.code', 'like', '..') query 1 incorrect join")
self.assertIn('res_country_state__country_id."code" like %s', sql_query[1],
self.assertIn('"res_country_state__country_id"."code" like %s', sql_query[1],
"_auto_join on for country_id: ('state_id.country_id.code', 'like', '..') query 1 incorrect where condition")
self.assertIn('res_country_state__country_id."id"=res_country_state."country_id"', sql_query[1],
self.assertIn('res_country_state."country_id"=res_country_state__country_id."id"', sql_query[1],
"_auto_join on for country_id: ('state_id.country_id.code', 'like', '..') query 1 incorrect join condition")
self.assertEqual(['%' + name_test + '%'], sql_query[2],
"_auto_join on for country_id: ('state_id.country_id.code', 'like', '..') query 1 incorrect parameter")
@ -302,7 +348,7 @@ class test_expression(common.TransactionCase):
sql_query = self.query_list[1].get_sql()
self.assertIn('"res_partner"', sql_query[0],
"_auto_join on for country_id: ('state_id.country_id.code', 'like', '..') query 2 incorrect main table")
self.assertIn('res_partner."state_id" in', sql_query[1],
self.assertIn('"res_partner"."state_id" in', sql_query[1],
"_auto_join on for country_id: ('state_id.country_id.code', 'like', '..') query 2 incorrect where condition")
# Do: many2one with 2 _auto_join
@ -310,6 +356,8 @@ class test_expression(common.TransactionCase):
state_country_id_col._auto_join = True
self._reinit_mock()
partner_ids = partner_obj.search(cr, uid, [('state_id.country_id.code', 'like', name_test)])
for query in self.query_list:
print query
# Test result: at least our added data + demo data
self.assertTrue(set([p_a, p_b, p_aa, p_ab, p_ba]).issubset(set(partner_ids)),
"_auto_join on: ('state_id.country_id.code', 'like', '..') incorrect result")
@ -323,11 +371,11 @@ class test_expression(common.TransactionCase):
"_auto_join on: ('state_id.country_id.code', 'like', '..') query incorrect join")
self.assertIn('"res_country" as res_partner__state_id__country_id', sql_query[0],
"_auto_join on: ('state_id.country_id.code', 'like', '..') query incorrect join")
self.assertIn('res_partner__state_id__country_id."code" like %s', sql_query[1],
self.assertIn('"res_partner__state_id__country_id"."code" like %s', sql_query[1],
"_auto_join on: ('state_id.country_id.code', 'like', '..') query incorrect where condition")
self.assertIn('res_partner__state_id."id"=res_partner."state_id"', sql_query[1],
self.assertIn('res_partner."state_id"=res_partner__state_id."id"', sql_query[1],
"_auto_join on: ('state_id.country_id.code', 'like', '..') query incorrect join condition")
self.assertIn('res_partner__state_id__country_id."id"=res_partner__state_id."country_id"', sql_query[1],
self.assertIn('res_partner__state_id."country_id"=res_partner__state_id__country_id."id"', sql_query[1],
"_auto_join on: ('state_id.country_id.code', 'like', '..') query incorrect join condition")
self.assertEqual(['%' + name_test + '%'], sql_query[2],
"_auto_join on: ('state_id.country_id.code', 'like', '..') query incorrect parameter")
@ -345,6 +393,9 @@ class test_expression(common.TransactionCase):
# Do: ('child_ids.state_id.country_id.code', 'like', '..') without _auto_join
self._reinit_mock()
partner_ids = partner_obj.search(cr, uid, [('child_ids.state_id.country_id.code', 'like', name_test)])
for query in self.query_list:
print query
print '----------------------'
# Test result: at least our added data + demo data
self.assertTrue(set([p_a, p_b]).issubset(set(partner_ids)),
"_auto_join off: ('child_ids.state_id.country_id.code', 'like', '..') incorrect result")
@ -358,6 +409,9 @@ class test_expression(common.TransactionCase):
state_country_id_col._auto_join = True
self._reinit_mock()
partner_ids = partner_obj.search(cr, uid, [('child_ids.state_id.country_id.code', 'like', name_test)])
for query in self.query_list:
print query
print '----------------------'
# Test result: at least our added data + demo data
self.assertTrue(set([p_a, p_b]).issubset(set(partner_ids)),
"_auto_join on: ('child_ids.state_id.country_id.code', 'like', '..') incorrect result")
@ -365,12 +419,12 @@ class test_expression(common.TransactionCase):
self.assertEqual(len(self.query_list), 1,
"_auto_join on: ('child_ids.state_id.country_id.code', 'like', '..') number of queries incorrect")
# Remove mocks and modifications
partner_bank_ids_col._auto_join = False
partner_child_ids_col._auto_join = False
partner_state_id_col._auto_join = False
state_country_id_col._auto_join = False
BaseModel._where_calc = self._base_model_where_calc
# # Remove mocks and modifications
# partner_bank_ids_col._auto_join = False
# partner_child_ids_col._auto_join = False
# partner_state_id_col._auto_join = False
# state_country_id_col._auto_join = False
# BaseModel._where_calc = self._base_model_where_calc
if __name__ == '__main__':
unittest2.main()

File diff suppressed because it is too large Load Diff

View File

@ -2721,11 +2721,10 @@ class BaseModel(object):
parent_table_name = parent_model._table
quoted_parent_table_name = '"%s"' % parent_table_name
if quoted_parent_table_name not in query.tables:
print '--> _inheratis_join_add adding %s' % (quoted_parent_table_name)
query.tables.append(quoted_parent_table_name)
query.where_clause.append('(%s.%s = %s.id)' % (current_table._table, inherits_field, parent_table_name))
def _inherits_join_calc(self, field, query):
"""
Adds missing table select and join clause(s) to ``query`` for reaching
@ -4658,8 +4657,10 @@ class BaseModel(object):
query.where_clause += added_clause
query.where_clause_params += added_params
for table in added_tables:
if table not in query.tables:
query.tables.append(table)
quoted_table_name = '%s' % (table)
if quoted_table_name not in query.tables:
print '--> apply_rule adding %s' % quoted_table_name
query.tables.append(quoted_table_name)
return True
return False

View File

@ -102,7 +102,7 @@ class Query(object):
tables_to_process = list(self.tables)
def add_joins_for_table(table, query_from):
for (dest_table, lhs_col, col, join) in self.joins.get(table,[]):
for (dest_table, lhs_col, col, join) in self.joins.get(table, []):
tables_to_process.remove(dest_table)
query_from += ' %s %s ON (%s."%s" = %s."%s")' % \
(join, dest_table, table, lhs_col, dest_table, col)
@ -114,7 +114,7 @@ class Query(object):
if table in self.joins:
query_from = add_joins_for_table(table, query_from)
query_from += ','
query_from = query_from[:-1] # drop last comma
query_from = query_from[:-1] # drop last comma
return (query_from, " AND ".join(self.where_clause), self.where_clause_params)
def __str__(self):

View File

@ -2,11 +2,12 @@ import unittest2
import openerp
class test_domain_normalization(unittest2.TestCase):
def test_normalize_domain(self):
expression = openerp.osv.expression
norm_domain = domain = ['&',(1,'=',1),('a','=','b')]
assert norm_domain == expression.normalize(domain), "Normalized domains should be left untouched"
domain = [('x','in',['y','z']),('a.v','=','e'),'|','|',('a','=','b'),'!',('c','>','d'),('e','!=','f'),('g','=','h')]
norm_domain = ['&','&','&'] + domain
assert norm_domain == expression.normalize(domain), "Non-normalized domains should be properly normalized"
norm_domain = domain = ['&', (1, '=', 1), ('a', '=', 'b')]
assert norm_domain == expression.normalize_domain(domain), "Normalized domains should be left untouched"
domain = [('x', 'in', ['y', 'z']), ('a.v', '=', 'e'), '|', '|', ('a', '=', 'b'), '!', ('c', '>', 'd'), ('e', '!=', 'f'), ('g', '=', 'h')]
norm_domain = ['&', '&', '&'] + domain
assert norm_domain == expression.normalize_domain(domain), "Non-normalized domains should be properly normalized"