[MERGE] Perform auto-join and mailboxes+needaction speed udpate. Server-side branch, holding a refactoring of expression.parse and the addition of _auto_join feature on relational fields.

expression.parse: refactored the main parsing loop, now using source and result stacks. Leaf elements are encapsulated into a specific object, allowing to delegate the join condition formation, working table management and table aliases generation to them. The parsing is now a simple loop taking a leaf, doing one action on it, and putting the result back to be analyzed or in the results. This allows to avoid having nested loops and uncontrolled results.

Other modifications :
- updated the generation of order_by to take into account the table now referred using aliases
- updated Query object to use the aliases
- fixed an outstanding bug about _order, now handled as order. This means that putting direct SQL into _order of models will not be supported anymore, because of the computing complexity now that we have multiple joins in queries. The branch holds the patch from Paulius Sladkevičius
- updated and added tests
- added a basic documentation for the freature

lp bug: https://launchpad.net/bugs/1070757 fixed

bzr revid: tde@openerp.com-20121210154652-6sab9usb38i7adlh
This commit is contained in:
Thibault Delavallée 2012-12-10 16:46:52 +01:00
commit bd0bae6625
15 changed files with 1307 additions and 394 deletions

View File

@ -10,3 +10,4 @@ Miscellanous
06_misc_need_action_specs.rst
06_misc_user_img_specs.rst
06_misc_import.rst
06_misc_auto_join.rst

75
doc/06_misc_auto_join.rst Normal file
View File

@ -0,0 +1,75 @@
.. _performing_joins_in_select:
Perfoming joins in select
=========================
.. versionadded:: 7.0
Starting with OpenERP 7.0, an ``auto_join`` attribute is added on *many2one* and
*one2many* fields. The purpose is to allow the automatic generation of joins in
select queries. This attribute is set to False by default, therefore not changing
the default behavior. Please note that we consider this feature as still experimental
and should be used only if you understand its limitations and targets.
Without ``_auto_join``, the behavior of expression.parse() is the same as before.
Leafs holding a path beginning with many2one or one2many fields perform a search
on the relational table. The result is then used to replace the leaf content.
For example, if you have on res.partner a domain like ``[('bank_ids.name',
'like', 'foo')]`` with bank_ids linking to res.partner.bank, 3 queries will be
performed :
- 1 on res_partner_bank, with domain ``[('name', '=', 'foo')]``, that returns a
list of res.partner.bank ids (bids)
- 1 on res_partner, with a domain ``['bank_ids', 'in', bids)]``, that returns a
list of res.partner ids (pids)
- 1 on res_partner, with a domain ``[('id', 'in', pids)]``
When the ``auto_join`` attribute is True on a relational field, the destination
table will be joined to produce only one query.
- the relational table is accessed using an alias: ``'"res_partner_bank"
as res_partner__bank_ids``. The alias is generated using the relational field
name. This allows to have multiple joins with different join conditions on the
same table, depending on the domain.
- there is a join condition between the destination table and the main table:
``res_partner__bank_ids."partner_id"=res_partner."id"``
- the condition is then written on the relational table:
``res_partner__bank_ids."name" = 'foo'``
This manipulation is performed in expression.parse(). It checks leafs that
contain a path, i.e. any domain containing a '.'. It then checks whether the
first item of the path is a *many2one* or *one2many* field with the ``auto_join``
attribute set. If set, it adds a join query and recursively analyzes the
remaining of the leaf, using the same behavior. If the remaining path also holds
a path with auto_join fields, it will add all tables and add every necessary
join conditions.
Chaining joins allows to reduce the number of queries performed, and to avoid
having too long equivalent leaf replacement in domains. Indeed, the internal
queries produced by this behavior can be very costly, because they were generally
select queries without limit that could lead to huge ('id', 'in', [...])
leafs to analyze and execute.
Some limitations exist on this feature that limits its current use as of version
7.0. **This feature is therefore considered as experimental, and used
to speedup some precise bottlenecks in OpenERP**.
List of known issues and limitations:
- using ``auto_join`` bypasses the business logic; no name search is performed,
only direct matches between ids using join conditions
- ir.rules are not taken into account when analyzing and adding the join
conditions
List of already-supported corner cases :
- one2many fields having a domain attribute. Static domains as well as dynamic
domain are supported
- auto_join leading to functional searchable fields
Typical use in OpenERP 7.0:
- in mail module: notification_ids field on mail_message, allowing to speedup
the display of the various mailboxes
- in mail module: message_ids field on mail_thread, allowing to speedup the
display of needaction counters and documents having unread messages

View File

@ -21,6 +21,7 @@
from osv import osv
class ir_needaction_mixin(osv.AbstractModel):
'''Mixin class for objects using the need action feature.
@ -60,4 +61,5 @@ class ir_needaction_mixin(osv.AbstractModel):
dom = self._needaction_domain_get(cr, uid, context=context)
if not dom:
return 0
return self.search(cr, uid, (domain or []) +dom, context=context, count=True)
res = self.search(cr, uid, (domain or []) + dom, limit=100, order='id DESC', context=context)
return len(res)

View File

@ -52,7 +52,7 @@ class ir_rule(osv.osv):
eval_context = self._eval_context(cr, uid)
for rule in self.browse(cr, uid, ids, context):
if rule.domain_force:
res[rule.id] = expression.normalize(eval(rule.domain_force, eval_context))
res[rule.id] = expression.normalize_domain(eval(rule.domain_force, eval_context))
else:
res[rule.id] = []
return res
@ -130,7 +130,7 @@ class ir_rule(osv.osv):
for rule in self.browse(cr, SUPERUSER_ID, rule_ids):
# read 'domain' as UID to have the correct eval context for the rule.
rule_domain = self.read(cr, uid, rule.id, ['domain'])['domain']
dom = expression.normalize(rule_domain)
dom = expression.normalize_domain(rule_domain)
for group in rule.groups:
if group in user.groups_id:
group_domains.setdefault(group, []).append(dom)

View File

@ -1,11 +1,33 @@
import unittest2
from openerp.osv.orm import BaseModel
import openerp.tests.common as common
class test_expression(common.TransactionCase):
def test_in_not_in_m2m(self):
def _reinit_mock(self):
self.query_list = list()
def _mock_base_model_where_calc(self, model, *args, **kwargs):
""" Mock build_email to be able to test its values. Store them into
some internal variable for latter processing. """
self.query_list.append(self._base_model_where_calc(model, *args, **kwargs))
# return the lastly stored query, the one the ORM wants to perform
return self.query_list[-1]
def setUp(self):
super(test_expression, self).setUp()
# Mock BaseModel._where_calc(), to be able to proceed to some tests about generated expression
self._reinit_mock()
self._base_model_where_calc = BaseModel._where_calc
BaseModel._where_calc = lambda model, cr, uid, args, context: self._mock_base_model_where_calc(model, cr, uid, args, context)
def tearDown(self):
# Remove mocks
BaseModel._where_calc = self._base_model_where_calc
super(test_expression, self).tearDown()
def test_00_in_not_in_m2m(self):
registry, cr, uid = self.registry, self.cr, self.uid
# Create 4 partners with no category, or one or two categories (out of two categories).
@ -70,3 +92,346 @@ class test_expression(common.TransactionCase):
# self.assertTrue(a not in with_any_other_than_a, "Search for category_id with any other than cat_a failed (1).")
# self.assertTrue(ab in with_any_other_than_a, "Search for category_id with any other than cat_a failed (2).")
def test_10_expression_parse(self):
# TDE note: those tests have been added when refactoring the expression.parse() method.
# They come in addition to the already existing test_osv_expression.yml; maybe some tests
# will be a bit redundant
registry, cr, uid = self.registry, self.cr, self.uid
users_obj = registry('res.users')
# Create users
a = users_obj.create(cr, uid, {'name': 'test_A', 'login': 'test_A'})
b1 = users_obj.create(cr, uid, {'name': 'test_B', 'login': 'test_B'})
b1_user = users_obj.browse(cr, uid, [b1])[0]
b2 = users_obj.create(cr, uid, {'name': 'test_B2', 'login': 'test_B2', 'parent_id': b1_user.partner_id.id})
# Test1: simple inheritance
user_ids = users_obj.search(cr, uid, [('name', 'like', 'test')])
self.assertEqual(set(user_ids), set([a, b1, b2]), 'searching through inheritance failed')
user_ids = users_obj.search(cr, uid, [('name', '=', 'test_B')])
self.assertEqual(set(user_ids), set([b1]), 'searching through inheritance failed')
# Test2: inheritance + relational fields
user_ids = users_obj.search(cr, uid, [('child_ids.name', 'like', 'test_B')])
self.assertEqual(set(user_ids), set([b1]), 'searching through inheritance failed')
def test_20_auto_join(self):
registry, cr, uid = self.registry, self.cr, self.uid
# Get models
partner_obj = registry('res.partner')
state_obj = registry('res.country.state')
bank_obj = registry('res.partner.bank')
# Get test columns
partner_state_id_col = partner_obj._columns.get('state_id') # many2one on res.partner to res.country.state
partner_parent_id_col = partner_obj._columns.get('parent_id') # many2one on res.partner to res.partner
state_country_id_col = state_obj._columns.get('country_id') # many2one on res.country.state on res.country
partner_child_ids_col = partner_obj._columns.get('child_ids') # one2many on res.partner to res.partner
partner_bank_ids_col = partner_obj._columns.get('bank_ids') # one2many on res.partner to res.partner.bank
category_id_col = partner_obj._columns.get('category_id') # many2many on res.partner to res.partner.category
# Get the first bank account type to be able to create a res.partner.bank
bank_type = bank_obj._bank_type_get(cr, uid)[0]
# Get country/state data
country_us_id = registry('res.country').search(cr, uid, [('code', 'like', 'US')])[0]
state_ids = registry('res.country.state').search(cr, uid, [('country_id', '=', country_us_id)], limit=2)
# Create demo data: partners and bank object
p_a = partner_obj.create(cr, uid, {'name': 'test__A', 'state_id': state_ids[0]})
p_b = partner_obj.create(cr, uid, {'name': 'test__B', 'state_id': state_ids[1]})
p_aa = partner_obj.create(cr, uid, {'name': 'test__AA', 'parent_id': p_a, 'state_id': state_ids[0]})
p_ab = partner_obj.create(cr, uid, {'name': 'test__AB', 'parent_id': p_a, 'state_id': state_ids[1]})
p_ba = partner_obj.create(cr, uid, {'name': 'test__BA', 'parent_id': p_b, 'state_id': state_ids[0]})
b_aa = bank_obj.create(cr, uid, {'name': '__bank_test_a', 'state': bank_type[0], 'partner_id': p_aa, 'acc_number': '1234'})
b_ab = bank_obj.create(cr, uid, {'name': '__bank_test_b', 'state': bank_type[0], 'partner_id': p_ab, 'acc_number': '5678'})
b_ba = bank_obj.create(cr, uid, {'name': '__bank_test_b', 'state': bank_type[0], 'partner_id': p_ba, 'acc_number': '9876'})
# --------------------------------------------------
# Test1: basics about the attribute
# --------------------------------------------------
category_id_col._auto_join = True
self.assertRaises(NotImplementedError, partner_obj.search, cr, uid, [('category_id.name', '=', 'foo')])
category_id_col._auto_join = False
# --------------------------------------------------
# Test2: one2many
# --------------------------------------------------
name_test = 'test_a'
# Do: one2many without _auto_join
self._reinit_mock()
partner_ids = partner_obj.search(cr, uid, [('bank_ids.name', 'like', name_test)])
# Test result
self.assertEqual(set(partner_ids), set([p_aa]),
"_auto_join off: ('bank_ids.name', 'like', '..'): incorrect result")
# Test produced queries
self.assertEqual(len(self.query_list), 3,
"_auto_join off: ('bank_ids.name', 'like', '..') should produce 3 queries (1 in res_partner_bank, 2 on res_partner)")
sql_query = self.query_list[0].get_sql()
self.assertIn('res_partner_bank', sql_query[0],
"_auto_join off: ('bank_ids.name', 'like', '..') first query incorrect main table")
self.assertIn('"res_partner_bank"."name" like %s', sql_query[1],
"_auto_join off: ('bank_ids.name', 'like', '..') first query incorrect where condition")
self.assertEqual(set(['%' + name_test + '%']), set(sql_query[2]),
"_auto_join off: ('bank_ids.name', 'like', '..') first query incorrect parameter")
sql_query = self.query_list[2].get_sql()
self.assertIn('res_partner', sql_query[0],
"_auto_join off: ('bank_ids.name', 'like', '..') third query incorrect main table")
self.assertIn('"res_partner"."id" in (%s)', sql_query[1],
"_auto_join off: ('bank_ids.name', 'like', '..') third query incorrect where condition")
self.assertEqual(set([p_aa]), set(sql_query[2]),
"_auto_join off: ('bank_ids.name', 'like', '..') third query incorrect parameter")
# Do: cascaded one2many without _auto_join
self._reinit_mock()
partner_ids = partner_obj.search(cr, uid, [('child_ids.bank_ids.id', 'in', [b_aa, b_ba])])
# Test result
self.assertEqual(set(partner_ids), set([p_a, p_b]),
"_auto_join off: ('child_ids.bank_ids.id', 'in', [..]): incorrect result")
# Test produced queries
self.assertEqual(len(self.query_list), 5,
"_auto_join off: ('child_ids.bank_ids.id', 'in', [..]) should produce 5 queries (1 in res_partner_bank, 4 on res_partner)")
# Do: one2many with _auto_join
partner_bank_ids_col._auto_join = True
self._reinit_mock()
partner_ids = partner_obj.search(cr, uid, [('bank_ids.name', 'like', 'test_a')])
# Test result
self.assertEqual(set(partner_ids), set([p_aa]),
"_auto_join on: ('bank_ids.name', 'like', '..') incorrect result")
# Test produced queries
self.assertEqual(len(self.query_list), 1,
"_auto_join on: ('bank_ids.name', 'like', '..') should produce 1 query")
sql_query = self.query_list[0].get_sql()
self.assertIn('"res_partner"', sql_query[0],
"_auto_join on: ('bank_ids.name', 'like', '..') query incorrect main table")
self.assertIn('"res_partner_bank" as "res_partner__bank_ids"', sql_query[0],
"_auto_join on: ('bank_ids.name', 'like', '..') query incorrect join")
self.assertIn('"res_partner__bank_ids"."name" like %s', sql_query[1],
"_auto_join on: ('bank_ids.name', 'like', '..') query incorrect where condition")
self.assertIn('"res_partner"."id"="res_partner__bank_ids"."partner_id"', sql_query[1],
"_auto_join on: ('bank_ids.name', 'like', '..') query incorrect join condition")
self.assertEqual(set(['%' + name_test + '%']), set(sql_query[2]),
"_auto_join on: ('bank_ids.name', 'like', '..') query incorrect parameter")
# Do: one2many with _auto_join, test final leaf is an id
self._reinit_mock()
partner_ids = partner_obj.search(cr, uid, [('bank_ids.id', 'in', [b_aa, b_ab])])
# Test result
self.assertEqual(set(partner_ids), set([p_aa, p_ab]),
"_auto_join on: ('bank_ids.id', 'in', [..]) incorrect result")
# Test produced queries
self.assertEqual(len(self.query_list), 1,
"_auto_join on: ('bank_ids.id', 'in', [..]) should produce 1 query")
sql_query = self.query_list[0].get_sql()
self.assertIn('"res_partner"', sql_query[0],
"_auto_join on: ('bank_ids.id', 'in', [..]) query incorrect main table")
self.assertIn('"res_partner__bank_ids"."id" in (%s,%s)', sql_query[1],
"_auto_join on: ('bank_ids.id', 'in', [..]) query incorrect where condition")
self.assertEqual(set([b_aa, b_ab]), set(sql_query[2]),
"_auto_join on: ('bank_ids.id', 'in', [..]) query incorrect parameter")
# Do: 2 cascaded one2many with _auto_join, test final leaf is an id
partner_child_ids_col._auto_join = True
self._reinit_mock()
partner_ids = partner_obj.search(cr, uid, [('child_ids.bank_ids.id', 'in', [b_aa, b_ba])])
# Test result
self.assertEqual(set(partner_ids), set([p_a, p_b]),
"_auto_join on: ('child_ids.bank_ids.id', 'not in', [..]): incorrect result")
# # Test produced queries
self.assertEqual(len(self.query_list), 1,
"_auto_join on: ('child_ids.bank_ids.id', 'in', [..]) should produce 1 query")
sql_query = self.query_list[0].get_sql()
self.assertIn('"res_partner"', sql_query[0],
"_auto_join on: ('child_ids.bank_ids.id', 'in', [..]) incorrect main table")
self.assertIn('"res_partner" as "res_partner__child_ids"', sql_query[0],
"_auto_join on: ('child_ids.bank_ids.id', 'in', [..]) query incorrect join")
self.assertIn('"res_partner_bank" as "res_partner__child_ids__bank_ids"', sql_query[0],
"_auto_join on: ('child_ids.bank_ids.id', 'in', [..]) query incorrect join")
self.assertIn('"res_partner__child_ids__bank_ids"."id" in (%s,%s)', sql_query[1],
"_auto_join on: ('child_ids.bank_ids.id', 'in', [..]) query incorrect where condition")
self.assertIn('"res_partner"."id"="res_partner__child_ids"."parent_id"', sql_query[1],
"_auto_join on: ('child_ids.bank_ids.id', 'in', [..]) query incorrect join condition")
self.assertIn('"res_partner__child_ids"."id"="res_partner__child_ids__bank_ids"."partner_id"', sql_query[1],
"_auto_join on: ('child_ids.bank_ids.id', 'in', [..]) query incorrect join condition")
self.assertEqual(set([b_aa, b_ba]), set(sql_query[2]),
"_auto_join on: ('child_ids.bank_ids.id', 'in', [..]) query incorrect parameter")
# --------------------------------------------------
# Test3: many2one
# --------------------------------------------------
name_test = 'US'
# Do: many2one without _auto_join
self._reinit_mock()
partner_ids = partner_obj.search(cr, uid, [('state_id.country_id.code', 'like', name_test)])
# Test result: at least our added data + demo data
self.assertTrue(set([p_a, p_b, p_aa, p_ab, p_ba]).issubset(set(partner_ids)),
"_auto_join off: ('state_id.country_id.code', 'like', '..') incorrect result")
# Test produced queries
self.assertEqual(len(self.query_list), 3,
"_auto_join off: ('state_id.country_id.code', 'like', '..') should produce 3 queries (1 on res_country, 1 on res_country_state, 1 on res_partner)")
# Do: many2one with 1 _auto_join on the first many2one
partner_state_id_col._auto_join = True
self._reinit_mock()
partner_ids = partner_obj.search(cr, uid, [('state_id.country_id.code', 'like', name_test)])
# Test result: at least our added data + demo data
self.assertTrue(set([p_a, p_b, p_aa, p_ab, p_ba]).issubset(set(partner_ids)),
"_auto_join on for state_id: ('state_id.country_id.code', 'like', '..') incorrect result")
# Test produced queries
self.assertEqual(len(self.query_list), 2,
"_auto_join on for state_id: ('state_id.country_id.code', 'like', '..') should produce 2 query")
sql_query = self.query_list[0].get_sql()
self.assertIn('"res_country"', sql_query[0],
"_auto_join on for state_id: ('state_id.country_id.code', 'like', '..') query 1 incorrect main table")
self.assertIn('"res_country"."code" like %s', sql_query[1],
"_auto_join on for state_id: ('state_id.country_id.code', 'like', '..') query 1 incorrect where condition")
self.assertEqual(['%' + name_test + '%'], sql_query[2],
"_auto_join on for state_id: ('state_id.country_id.code', 'like', '..') query 1 incorrect parameter")
sql_query = self.query_list[1].get_sql()
self.assertIn('"res_partner"', sql_query[0],
"_auto_join on for state_id: ('state_id.country_id.code', 'like', '..') query 2 incorrect main table")
self.assertIn('"res_country_state" as "res_partner__state_id"', sql_query[0],
"_auto_join on for state_id: ('state_id.country_id.code', 'like', '..') query 2 incorrect join")
self.assertIn('"res_partner__state_id"."country_id" in (%s)', sql_query[1],
"_auto_join on for state_id: ('state_id.country_id.code', 'like', '..') query 2 incorrect where condition")
self.assertIn('"res_partner"."state_id"="res_partner__state_id"."id"', sql_query[1],
"_auto_join on for state_id: ('state_id.country_id.code', 'like', '..') query 2 incorrect join condition")
# Do: many2one with 1 _auto_join on the second many2one
partner_state_id_col._auto_join = False
state_country_id_col._auto_join = True
self._reinit_mock()
partner_ids = partner_obj.search(cr, uid, [('state_id.country_id.code', 'like', name_test)])
# Test result: at least our added data + demo data
self.assertTrue(set([p_a, p_b, p_aa, p_ab, p_ba]).issubset(set(partner_ids)),
"_auto_join on for country_id: ('state_id.country_id.code', 'like', '..') incorrect result")
# Test produced queries
self.assertEqual(len(self.query_list), 2,
"_auto_join on for country_id: ('state_id.country_id.code', 'like', '..') should produce 2 query")
# -- first query
sql_query = self.query_list[0].get_sql()
self.assertIn('"res_country_state"', sql_query[0],
"_auto_join on for country_id: ('state_id.country_id.code', 'like', '..') query 1 incorrect main table")
self.assertIn('"res_country" as "res_country_state__country_id"', sql_query[0],
"_auto_join on for country_id: ('state_id.country_id.code', 'like', '..') query 1 incorrect join")
self.assertIn('"res_country_state__country_id"."code" like %s', sql_query[1],
"_auto_join on for country_id: ('state_id.country_id.code', 'like', '..') query 1 incorrect where condition")
self.assertIn('"res_country_state"."country_id"="res_country_state__country_id"."id"', sql_query[1],
"_auto_join on for country_id: ('state_id.country_id.code', 'like', '..') query 1 incorrect join condition")
self.assertEqual(['%' + name_test + '%'], sql_query[2],
"_auto_join on for country_id: ('state_id.country_id.code', 'like', '..') query 1 incorrect parameter")
# -- second query
sql_query = self.query_list[1].get_sql()
self.assertIn('"res_partner"', sql_query[0],
"_auto_join on for country_id: ('state_id.country_id.code', 'like', '..') query 2 incorrect main table")
self.assertIn('"res_partner"."state_id" in', sql_query[1],
"_auto_join on for country_id: ('state_id.country_id.code', 'like', '..') query 2 incorrect where condition")
# Do: many2one with 2 _auto_join
partner_state_id_col._auto_join = True
state_country_id_col._auto_join = True
self._reinit_mock()
partner_ids = partner_obj.search(cr, uid, [('state_id.country_id.code', 'like', name_test)])
# Test result: at least our added data + demo data
self.assertTrue(set([p_a, p_b, p_aa, p_ab, p_ba]).issubset(set(partner_ids)),
"_auto_join on: ('state_id.country_id.code', 'like', '..') incorrect result")
# Test produced queries
self.assertEqual(len(self.query_list), 1,
"_auto_join on: ('state_id.country_id.code', 'like', '..') should produce 1 query")
sql_query = self.query_list[0].get_sql()
self.assertIn('"res_partner"', sql_query[0],
"_auto_join on: ('state_id.country_id.code', 'like', '..') query incorrect main table")
self.assertIn('"res_country_state" as "res_partner__state_id"', sql_query[0],
"_auto_join on: ('state_id.country_id.code', 'like', '..') query incorrect join")
self.assertIn('"res_country" as "res_partner__state_id__country_id"', sql_query[0],
"_auto_join on: ('state_id.country_id.code', 'like', '..') query incorrect join")
self.assertIn('"res_partner__state_id__country_id"."code" like %s', sql_query[1],
"_auto_join on: ('state_id.country_id.code', 'like', '..') query incorrect where condition")
self.assertIn('"res_partner"."state_id"="res_partner__state_id"."id"', sql_query[1],
"_auto_join on: ('state_id.country_id.code', 'like', '..') query incorrect join condition")
self.assertIn('"res_partner__state_id"."country_id"="res_partner__state_id__country_id"."id"', sql_query[1],
"_auto_join on: ('state_id.country_id.code', 'like', '..') query incorrect join condition")
self.assertEqual(['%' + name_test + '%'], sql_query[2],
"_auto_join on: ('state_id.country_id.code', 'like', '..') query incorrect parameter")
# --------------------------------------------------
# Test4: domain attribute on one2many fields
# --------------------------------------------------
partner_child_ids_col._auto_join = True
partner_bank_ids_col._auto_join = True
partner_child_ids_col._domain = lambda self: ['!', ('name', '=', self._name)]
partner_bank_ids_col._domain = [('acc_number', 'like', '1')]
# Do: 2 cascaded one2many with _auto_join, test final leaf is an id
self._reinit_mock()
partner_ids = partner_obj.search(cr, uid, ['&', (1, '=', 1), ('child_ids.bank_ids.id', 'in', [b_aa, b_ba])])
# Test result: at least one of our added data
self.assertTrue(set([p_a]).issubset(set(partner_ids)),
"_auto_join on one2many with domains incorrect result")
self.assertTrue(set([p_ab, p_ba]) not in set(partner_ids),
"_auto_join on one2many with domains incorrect result")
# Test produced queries that domains effectively present
sql_query = self.query_list[0].get_sql()
self.assertIn('"res_partner__child_ids__bank_ids"."acc_number" like %s', sql_query[1],
"_auto_join on one2many with domains incorrect result")
# TDE TODO: check first domain has a correct table name
self.assertIn('"res_partner__child_ids"."name" = %s', sql_query[1],
"_auto_join on one2many with domains incorrect result")
partner_child_ids_col._domain = lambda self: [('name', '=', '__%s' % self._name)]
self._reinit_mock()
partner_ids = partner_obj.search(cr, uid, ['&', (1, '=', 1), ('child_ids.bank_ids.id', 'in', [b_aa, b_ba])])
# Test result: no one
self.assertFalse(partner_ids,
"_auto_join on one2many with domains incorrect result")
# ----------------------------------------
# Test5: result-based tests
# ----------------------------------------
partner_bank_ids_col._auto_join = False
partner_child_ids_col._auto_join = False
partner_state_id_col._auto_join = False
partner_parent_id_col._auto_join = False
state_country_id_col._auto_join = False
partner_child_ids_col._domain = []
partner_bank_ids_col._domain = []
# Do: ('child_ids.state_id.country_id.code', 'like', '..') without _auto_join
self._reinit_mock()
partner_ids = partner_obj.search(cr, uid, [('child_ids.state_id.country_id.code', 'like', name_test)])
# Test result: at least our added data + demo data
self.assertTrue(set([p_a, p_b]).issubset(set(partner_ids)),
"_auto_join off: ('child_ids.state_id.country_id.code', 'like', '..') incorrect result")
# Test produced queries
self.assertEqual(len(self.query_list), 5,
"_auto_join off: ('child_ids.state_id.country_id.code', 'like', '..') number of queries incorrect")
# Do: ('child_ids.state_id.country_id.code', 'like', '..') with _auto_join
partner_child_ids_col._auto_join = True
partner_state_id_col._auto_join = True
state_country_id_col._auto_join = True
self._reinit_mock()
partner_ids = partner_obj.search(cr, uid, [('child_ids.state_id.country_id.code', 'like', name_test)])
# Test result: at least our added data + demo data
self.assertTrue(set([p_a, p_b]).issubset(set(partner_ids)),
"_auto_join on: ('child_ids.state_id.country_id.code', 'like', '..') incorrect result")
# Test produced queries
self.assertEqual(len(self.query_list), 1,
"_auto_join on: ('child_ids.state_id.country_id.code', 'like', '..') number of queries incorrect")
# Remove mocks and modifications
partner_bank_ids_col._auto_join = False
partner_child_ids_col._auto_join = False
partner_state_id_col._auto_join = False
partner_parent_id_col._auto_join = False
state_country_id_col._auto_join = False
if __name__ == '__main__':
unittest2.main()

View File

@ -2,16 +2,17 @@ import unittest2
import openerp.tests.common as common
class test_expression(common.TransactionCase):
def test_search_order(self):
class test_search(common.TransactionCase):
def test_00_search_order(self):
registry, cr, uid = self.registry, self.cr, self.uid
# Create 6 partners with a given name, and a given creation order to
# ensure the order of their ID. Some are set as unactive to verify they
# are by default excluded from the searches and to provide a second
# `order` argument.
# Create 6 partners with a given name, and a given creation order to
# ensure the order of their ID. Some are set as unactive to verify they
# are by default excluded from the searches and to provide a second
# `order` argument.
partners = registry('res.partner')
c = partners.create(cr, uid, {'name': 'test_search_order_C'})
@ -23,9 +24,9 @@ class test_expression(common.TransactionCase):
# The tests.
# The basic searches should exclude records that have active = False.
# The order of the returned ids should be given by the `order`
# parameter of search().
# The basic searches should exclude records that have active = False.
# The order of the returned ids should be given by the `order`
# parameter of search().
name_asc = partners.search(cr, uid, [('name', 'like', 'test_search_order%')], order="name asc")
self.assertEqual([a, ab, b, c], name_asc, "Search with 'NAME ASC' order failed.")
@ -36,9 +37,9 @@ class test_expression(common.TransactionCase):
id_desc = partners.search(cr, uid, [('name', 'like', 'test_search_order%')], order="id desc")
self.assertEqual([ab, b, a, c], id_desc, "Search with 'ID DESC' order failed.")
# The inactive records shouldn't be excluded as soon as a condition on
# that field is present in the domain. The `order` parameter of
# search() should support any legal coma-separated values.
# The inactive records shouldn't be excluded as soon as a condition on
# that field is present in the domain. The `order` parameter of
# search() should support any legal coma-separated values.
active_asc_id_asc = partners.search(cr, uid, [('name', 'like', 'test_search_order%'), '|', ('active', '=', True), ('active', '=', False)], order="active asc, id asc")
self.assertEqual([d, e, c, a, b, ab], active_asc_id_asc, "Search with 'ACTIVE ASC, ID ASC' order failed.")
@ -57,4 +58,52 @@ class test_expression(common.TransactionCase):
id_desc_active_desc = partners.search(cr, uid, [('name', 'like', 'test_search_order%'), '|', ('active', '=', True), ('active', '=', False)], order="id desc, active desc")
self.assertEqual([e, ab, b, a, d, c], id_desc_active_desc, "Search with 'ID DESC, ACTIVE DESC' order failed.")
def test_10_inherits_m2order(self):
registry, cr, uid = self.registry, self.cr, self.uid
users_obj = registry('res.users')
# Find Employee group
group_employee_ref = self.registry('ir.model.data').get_object_reference(cr, uid, 'base', 'group_user')
group_employee_id = group_employee_ref and group_employee_ref[1] or False
# Get country/state data
country_us_id = registry('res.country').search(cr, uid, [('code', 'like', 'US')])[0]
state_ids = registry('res.country.state').search(cr, uid, [('country_id', '=', country_us_id)], limit=2)
country_be_id = registry('res.country').search(cr, uid, [('code', 'like', 'BE')])[0]
# Create test users
search_user = users_obj.create(cr, uid, {'name': '__search', 'login': '__search', 'groups_id': [(6, 0, [group_employee_id])]})
a = users_obj.create(cr, uid, {'name': '__test_A', 'login': '__test_A', 'country_id': country_be_id, 'state_id': country_be_id})
b = users_obj.create(cr, uid, {'name': '__test_B', 'login': '__a_test_B', 'country_id': country_us_id, 'state_id': state_ids[1]})
c = users_obj.create(cr, uid, {'name': '__test_B', 'login': '__z_test_B', 'country_id': country_us_id, 'state_id': state_ids[0]})
# Do: search on res.users, order on a field on res.partner to try inherits'd fields, then res.users
user_ids = users_obj.search(cr, search_user, [], order='name asc, login desc')
expected_ids = [search_user, a, c, b]
test_user_ids = filter(lambda x: x in expected_ids, user_ids)
self.assertEqual(test_user_ids, expected_ids, 'search on res_users did not provide expected ids or expected order')
# Do: order on many2one and inherits'd fields
user_ids = users_obj.search(cr, search_user, [], order='state_id asc, country_id desc, name asc, login desc')
expected_ids = [c, b, a, search_user]
test_user_ids = filter(lambda x: x in expected_ids, user_ids)
self.assertEqual(test_user_ids, expected_ids, 'search on res_users did not provide expected ids or expected order')
# Do: order on many2one and inherits'd fields
user_ids = users_obj.search(cr, search_user, [], order='country_id desc, state_id desc, name asc, login desc')
expected_ids = [search_user, b, c, a]
test_user_ids = filter(lambda x: x in expected_ids, user_ids)
self.assertEqual(test_user_ids, expected_ids, 'search on res_users did not provide expected ids or expected order')
# Do: order on many2one, but not by specifying in order parameter of search, but by overriding _order of res_users
old_order = users_obj._order
users_obj._order = 'country_id desc, name asc, login desc'
user_ids = users_obj.search(cr, search_user, [])
expected_ids = [search_user, c, b, a]
test_user_ids = filter(lambda x: x in expected_ids, user_ids)
self.assertEqual(test_user_ids, expected_ids, 'search on res_users did not provide expected ids or expected order')
users_obj._order = old_order
if __name__ == '__main__':
unittest2.main()

File diff suppressed because it is too large Load Diff

View File

@ -27,6 +27,9 @@
Fields Attributes:
* _classic_read: is a classic sql fields
* _type : field type
* _auto_join: for one2many and many2one fields, tells whether select
queries will join the relational table instead of replacing the
field condition by an equivalent-one based on a search.
* readonly
* required
* size
@ -67,6 +70,7 @@ class _column(object):
"""
_classic_read = True
_classic_write = True
_auto_join = False
_prefetch = True
_properties = False
_type = 'unknown'
@ -427,9 +431,10 @@ class many2one(_column):
_symbol_f = lambda x: x or None
_symbol_set = (_symbol_c, _symbol_f)
def __init__(self, obj, string='unknown', **args):
def __init__(self, obj, string='unknown', auto_join=False, **args):
_column.__init__(self, string=string, **args)
self._obj = obj
self._auto_join = auto_join
def get(self, cr, obj, ids, name, user=None, context=None, values=None):
if context is None:
@ -496,11 +501,12 @@ class one2many(_column):
_prefetch = False
_type = 'one2many'
def __init__(self, obj, fields_id, string='unknown', limit=None, **args):
def __init__(self, obj, fields_id, string='unknown', limit=None, auto_join=False, **args):
_column.__init__(self, string=string, **args)
self._obj = obj
self._fields_id = fields_id
self._limit = limit
self._auto_join = auto_join
#one2many can't be used as condition for defaults
assert(self.change_default != True)

View File

@ -58,7 +58,6 @@ import types
import psycopg2
from lxml import etree
import warnings
import fields
import openerp
@ -2721,22 +2720,17 @@ class BaseModel(object):
return result
def _inherits_join_add(self, current_table, parent_model_name, query):
def _inherits_join_add(self, current_model, parent_model_name, query):
"""
Add missing table SELECT and JOIN clause to ``query`` for reaching the parent table (no duplicates)
:param current_table: current model object
:param current_model: current model object
:param parent_model_name: name of the parent model for which the clauses should be added
:param query: query object on which the JOIN should be added
"""
inherits_field = current_table._inherits[parent_model_name]
inherits_field = current_model._inherits[parent_model_name]
parent_model = self.pool.get(parent_model_name)
parent_table_name = parent_model._table
quoted_parent_table_name = '"%s"' % parent_table_name
if quoted_parent_table_name not in query.tables:
query.tables.append(quoted_parent_table_name)
query.where_clause.append('(%s.%s = %s.id)' % (current_table._table, inherits_field, parent_table_name))
parent_alias, parent_alias_statement = query.add_join((current_model._table, parent_model._table, inherits_field, 'id', inherits_field), implicit=True)
return parent_alias
def _inherits_join_calc(self, field, query):
"""
@ -2748,12 +2742,13 @@ class BaseModel(object):
:return: qualified name of field, to be used in SELECT clause
"""
current_table = self
parent_alias = '"%s"' % current_table._table
while field in current_table._inherit_fields and not field in current_table._columns:
parent_model_name = current_table._inherit_fields[field][0]
parent_table = self.pool.get(parent_model_name)
self._inherits_join_add(current_table, parent_model_name, query)
parent_alias = self._inherits_join_add(current_table, parent_model_name, query)
current_table = parent_table
return '"%s".%s' % (current_table._table, field)
return '%s."%s"' % (parent_alias, field)
def _parent_store_compute(self, cr):
if not self._parent_store:
@ -4662,11 +4657,27 @@ class BaseModel(object):
:param query: the current query object
"""
def apply_rule(added_clause, added_params, added_tables, parent_model=None, child_object=None):
""" :param string parent_model: string of the parent model
:param model child_object: model object, base of the rule application
"""
if added_clause:
if parent_model and child_object:
# as inherited rules are being applied, we need to add the missing JOIN
# to reach the parent table (if it was not JOINed yet in the query)
child_object._inherits_join_add(child_object, parent_model, query)
parent_alias = child_object._inherits_join_add(child_object, parent_model, query)
# inherited rules are applied on the external table -> need to get the alias and replace
parent_table = self.pool.get(parent_model)._table
added_clause = [clause.replace('"%s"' % parent_table, '"%s"' % parent_alias) for clause in added_clause]
# change references to parent_table to parent_alias, because we now use the alias to refer to the table
new_tables = []
for table in added_tables:
# table is just a table name -> switch to the full alias
if table == '"%s"' % (parent_table):
new_tables.append('"%s" as "%s"' % (parent_table, parent_alias))
# table is already a full statement -> replace reference to the table to its alias, is correct with the way aliases are generated
else:
new_tables.append(table.replace('"%s"' % parent_table, '"%s"' % parent_alias))
added_tables = new_tables
query.where_clause += added_clause
query.where_clause_params += added_params
for table in added_tables:
@ -4677,12 +4688,14 @@ class BaseModel(object):
# apply main rules on the object
rule_obj = self.pool.get('ir.rule')
apply_rule(*rule_obj.domain_get(cr, uid, self._name, mode, context=context))
rule_where_clause, rule_where_clause_params, rule_tables = rule_obj.domain_get(cr, uid, self._name, mode, context=context)
apply_rule(rule_where_clause, rule_where_clause_params, rule_tables)
# apply ir.rules from the parents (through _inherits)
for inherited_model in self._inherits:
kwargs = dict(parent_model=inherited_model, child_object=self) #workaround for python2.5
apply_rule(*rule_obj.domain_get(cr, uid, inherited_model, mode, context=context), **kwargs)
rule_where_clause, rule_where_clause_params, rule_tables = rule_obj.domain_get(cr, uid, inherited_model, mode, context=context)
apply_rule(rule_where_clause, rule_where_clause_params, rule_tables,
parent_model=inherited_model, child_object=self)
def _generate_m2o_order_by(self, order_field, query):
"""
@ -4717,17 +4730,16 @@ class BaseModel(object):
# extract the field names, to be able to qualify them and add desc/asc
m2o_order_list = []
for order_part in m2o_order.split(","):
m2o_order_list.append(order_part.strip().split(" ",1)[0].strip())
m2o_order_list.append(order_part.strip().split(" ", 1)[0].strip())
m2o_order = m2o_order_list
# Join the dest m2o table if it's not joined yet. We use [LEFT] OUTER join here
# as we don't want to exclude results that have NULL values for the m2o
src_table, src_field = qualified_field.replace('"','').split('.', 1)
query.join((src_table, dest_model._table, src_field, 'id'), outer=True)
qualify = lambda field: '"%s"."%s"' % (dest_model._table, field)
src_table, src_field = qualified_field.replace('"', '').split('.', 1)
dst_alias, dst_alias_statement = query.add_join((src_table, dest_model._table, src_field, 'id', src_field), implicit=False, outer=True)
qualify = lambda field: '"%s"."%s"' % (dst_alias, field)
return map(qualify, m2o_order) if isinstance(m2o_order, list) else qualify(m2o_order)
def _generate_order_by(self, order_spec, query):
"""
Attempt to consruct an appropriate ORDER BY clause based on order_spec, which must be
@ -4735,7 +4747,8 @@ class BaseModel(object):
:raise" except_orm in case order_spec is malformed
"""
order_by_clause = self._order
order_by_clause = ''
order_spec = order_spec or self._order
if order_spec:
order_by_elements = []
self._check_qorder(order_spec)
@ -4753,7 +4766,7 @@ class BaseModel(object):
elif order_column._type == 'many2one':
inner_clause = self._generate_m2o_order_by(order_field, query)
else:
continue # ignore non-readable or "non-joinable" fields
continue # ignore non-readable or "non-joinable" fields
elif order_field in self._inherit_fields:
parent_obj = self.pool.get(self._inherit_fields[order_field][3])
order_column = parent_obj._columns[order_field]
@ -4762,7 +4775,7 @@ class BaseModel(object):
elif order_column._type == 'many2one':
inner_clause = self._generate_m2o_order_by(order_field, query)
else:
continue # ignore non-readable or "non-joinable" fields
continue # ignore non-readable or "non-joinable" fields
if inner_clause:
if isinstance(inner_clause, list):
for clause in inner_clause:

View File

@ -21,6 +21,7 @@
#.apidoc title: Query object
def _quote(to_quote):
if '"' not in to_quote:
return '"%s"' % to_quote
@ -67,15 +68,35 @@ class Query(object):
# LEFT JOIN "table_c" ON ("table_a"."table_a_col2" = "table_c"."table_c_col")
self.joins = joins or {}
def join(self, connection, outer=False):
"""Adds the JOIN specified in ``connection``.
def _get_table_aliases(self):
from openerp.osv.expression import get_alias_from_query
return [get_alias_from_query(from_statement)[1] for from_statement in self.tables]
:param connection: a tuple ``(lhs, table, lhs_col, col)``.
The join corresponds to the SQL equivalent of::
def _get_alias_mapping(self):
from openerp.osv.expression import get_alias_from_query
mapping = {}
for table in self.tables:
alias, statement = get_alias_from_query(table)
mapping[statement] = table
return mapping
(lhs.lhs_col = table.col)
def add_join(self, connection, implicit=True, outer=False):
""" Join a destination table to the current table.
:param outer: True if a LEFT OUTER JOIN should be used, if possible
:param implicit: False if the join is an explicit join. This allows
to fall back on the previous implementation of ``join`` before
OpenERP 7.0. It therefore adds the JOIN specified in ``connection``
If True, the join is done implicitely, by adding the table alias
in the from clause and the join condition in the where clause
of the query. Implicit joins do not handle outer parameter.
:param connection: a tuple ``(lhs, table, lhs_col, col, link)``.
The join corresponds to the SQL equivalent of::
(lhs.lhs_col = table.col)
Note that all connection elements are strings. Please refer to expression.py for more details about joins.
:param outer: True if a LEFT OUTER JOIN should be used, if possible
(no promotion to OUTER JOIN is supported in case the JOIN
was already present in the query, as for the moment
implicit INNER JOINs are only connected from NON-NULL
@ -83,38 +104,53 @@ class Query(object):
``_inherits`` or when a domain criterion explicitly
adds filtering)
"""
(lhs, table, lhs_col, col) = connection
lhs = _quote(lhs)
table = _quote(table)
assert lhs in self.tables, "Left-hand-side table must already be part of the query!"
if table in self.tables:
# already joined, must ignore (promotion to outer and multiple joins not supported yet)
pass
from openerp.osv.expression import generate_table_alias
(lhs, table, lhs_col, col, link) = connection
alias, alias_statement = generate_table_alias(lhs, [(table, link)])
if implicit:
if alias_statement not in self.tables:
self.tables.append(alias_statement)
condition = '("%s"."%s" = "%s"."%s")' % (lhs, lhs_col, alias, col)
self.where_clause.append(condition)
else:
# already joined
pass
return alias, alias_statement
else:
# add JOIN
self.tables.append(table)
self.joins.setdefault(lhs, []).append((table, lhs_col, col, outer and 'LEFT JOIN' or 'JOIN'))
return self
aliases = self._get_table_aliases()
assert lhs in aliases, "Left-hand-side table %s must already be part of the query tables %s!" % (lhs, str(self.tables))
if alias_statement in self.tables:
# already joined, must ignore (promotion to outer and multiple joins not supported yet)
pass
else:
# add JOIN
self.tables.append(alias_statement)
self.joins.setdefault(lhs, []).append((alias, lhs_col, col, outer and 'LEFT JOIN' or 'JOIN'))
return alias, alias_statement
def get_sql(self):
"""Returns (query_from, query_where, query_params)"""
""" Returns (query_from, query_where, query_params). """
from openerp.osv.expression import get_alias_from_query
query_from = ''
tables_to_process = list(self.tables)
alias_mapping = self._get_alias_mapping()
def add_joins_for_table(table, query_from):
for (dest_table, lhs_col, col, join) in self.joins.get(table,[]):
tables_to_process.remove(dest_table)
query_from += ' %s %s ON (%s."%s" = %s."%s")' % \
(join, dest_table, table, lhs_col, dest_table, col)
for (dest_table, lhs_col, col, join) in self.joins.get(table, []):
tables_to_process.remove(alias_mapping[dest_table])
query_from += ' %s %s ON ("%s"."%s" = "%s"."%s")' % \
(join, alias_mapping[dest_table], table, lhs_col, dest_table, col)
query_from = add_joins_for_table(dest_table, query_from)
return query_from
for table in tables_to_process:
query_from += table
if table in self.joins:
query_from = add_joins_for_table(table, query_from)
table_alias = get_alias_from_query(table)[1]
if table_alias in self.joins:
query_from = add_joins_for_table(table_alias, query_from)
query_from += ','
query_from = query_from[:-1] # drop last comma
query_from = query_from[:-1] # drop last comma
return (query_from, " AND ".join(self.where_clause), self.where_clause_params)
def __str__(self):

View File

@ -1,25 +0,0 @@
# -*- coding: utf-8 -*-
##############################################################################
#
# OpenERP, Open Source Management Solution
# Copyright (C) 2010 OpenERP S.A. http://www.openerp.com
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
from test_osv import *
from test_translate import *
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -8,9 +8,10 @@ Tests can be explicitely added to the `fast_suite` or `checks` lists or not.
See the :ref:`test-framework` section in the :ref:`features` list.
"""
from . import test_expression, test_mail, test_ir_sequence, test_orm,\
from . import test_expression, test_mail, test_ir_sequence, test_orm, \
test_fields, test_basecase, \
test_view_validation, test_uninstall, test_misc, test_db_cursor
test_view_validation, test_uninstall, test_misc, test_db_cursor, \
test_osv, test_translate
from . import test_ir_filters
fast_suite = [
@ -27,6 +28,8 @@ checks = [
test_basecase,
test_view_validation,
test_misc,
test_osv,
test_translate,
]
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -2,11 +2,12 @@ import unittest2
import openerp
class test_domain_normalization(unittest2.TestCase):
def test_normalize_domain(self):
expression = openerp.osv.expression
norm_domain = domain = ['&',(1,'=',1),('a','=','b')]
assert norm_domain == expression.normalize(domain), "Normalized domains should be left untouched"
domain = [('x','in',['y','z']),('a.v','=','e'),'|','|',('a','=','b'),'!',('c','>','d'),('e','!=','f'),('g','=','h')]
norm_domain = ['&','&','&'] + domain
assert norm_domain == expression.normalize(domain), "Non-normalized domains should be properly normalized"
norm_domain = domain = ['&', (1, '=', 1), ('a', '=', 'b')]
assert norm_domain == expression.normalize_domain(domain), "Normalized domains should be left untouched"
domain = [('x', 'in', ['y', 'z']), ('a.v', '=', 'e'), '|', '|', ('a', '=', 'b'), '!', ('c', '>', 'd'), ('e', '!=', 'f'), ('g', '=', 'h')]
norm_domain = ['&', '&', '&'] + domain
assert norm_domain == expression.normalize_domain(domain), "Non-normalized domains should be properly normalized"

View File

@ -22,45 +22,45 @@
import unittest
from openerp.osv.query import Query
class QueryTestCase(unittest.TestCase):
def test_basic_query(self):
query = Query()
query.tables.extend(['"product_product"','"product_template"'])
query.tables.extend(['"product_product"', '"product_template"'])
query.where_clause.append("product_product.template_id = product_template.id")
query.join(("product_template", "product_category", "categ_id", "id"), outer=False) # add normal join
query.join(("product_product", "res_user", "user_id", "id"), outer=True) # outer join
query.add_join(("product_template", "product_category", "categ_id", "id", "categ_id"), implicit=False, outer=False) # add normal join
query.add_join(("product_product", "res_user", "user_id", "id", "user_id"), implicit=False, outer=True) # outer join
self.assertEquals(query.get_sql()[0].strip(),
""""product_product" LEFT JOIN "res_user" ON ("product_product"."user_id" = "res_user"."id"),"product_template" JOIN "product_category" ON ("product_template"."categ_id" = "product_category"."id") """.strip())
""""product_product" LEFT JOIN "res_user" as "product_product__user_id" ON ("product_product"."user_id" = "product_product__user_id"."id"),"product_template" JOIN "product_category" as "product_template__categ_id" ON ("product_template"."categ_id" = "product_template__categ_id"."id") """.strip())
self.assertEquals(query.get_sql()[1].strip(), """product_product.template_id = product_template.id""".strip())
def test_query_chained_explicit_joins(self):
query = Query()
query.tables.extend(['"product_product"','"product_template"'])
query.tables.extend(['"product_product"', '"product_template"'])
query.where_clause.append("product_product.template_id = product_template.id")
query.join(("product_template", "product_category", "categ_id", "id"), outer=False) # add normal join
query.join(("product_category", "res_user", "user_id", "id"), outer=True) # CHAINED outer join
query.add_join(("product_template", "product_category", "categ_id", "id", "categ_id"), implicit=False, outer=False) # add normal join
query.add_join(("product_template__categ_id", "res_user", "user_id", "id", "user_id"), implicit=False, outer=True) # CHAINED outer join
self.assertEquals(query.get_sql()[0].strip(),
""""product_product","product_template" JOIN "product_category" ON ("product_template"."categ_id" = "product_category"."id") LEFT JOIN "res_user" ON ("product_category"."user_id" = "res_user"."id")""".strip())
""""product_product","product_template" JOIN "product_category" as "product_template__categ_id" ON ("product_template"."categ_id" = "product_template__categ_id"."id") LEFT JOIN "res_user" as "product_template__categ_id__user_id" ON ("product_template__categ_id"."user_id" = "product_template__categ_id__user_id"."id")""".strip())
self.assertEquals(query.get_sql()[1].strip(), """product_product.template_id = product_template.id""".strip())
def test_mixed_query_chained_explicit_implicit_joins(self):
query = Query()
query.tables.extend(['"product_product"','"product_template"'])
query.tables.extend(['"product_product"', '"product_template"'])
query.where_clause.append("product_product.template_id = product_template.id")
query.join(("product_template", "product_category", "categ_id", "id"), outer=False) # add normal join
query.join(("product_category", "res_user", "user_id", "id"), outer=True) # CHAINED outer join
query.add_join(("product_template", "product_category", "categ_id", "id", "categ_id"), implicit=False, outer=False) # add normal join
query.add_join(("product_template__categ_id", "res_user", "user_id", "id", "user_id"), implicit=False, outer=True) # CHAINED outer join
query.tables.append('"account.account"')
query.where_clause.append("product_category.expense_account_id = account_account.id") # additional implicit join
query.where_clause.append("product_category.expense_account_id = account_account.id") # additional implicit join
self.assertEquals(query.get_sql()[0].strip(),
""""product_product","product_template" JOIN "product_category" ON ("product_template"."categ_id" = "product_category"."id") LEFT JOIN "res_user" ON ("product_category"."user_id" = "res_user"."id"),"account.account" """.strip())
""""product_product","product_template" JOIN "product_category" as "product_template__categ_id" ON ("product_template"."categ_id" = "product_template__categ_id"."id") LEFT JOIN "res_user" as "product_template__categ_id__user_id" ON ("product_template__categ_id"."user_id" = "product_template__categ_id__user_id"."id"),"account.account" """.strip())
self.assertEquals(query.get_sql()[1].strip(), """product_product.template_id = product_template.id AND product_category.expense_account_id = account_account.id""".strip())
def test_raise_missing_lhs(self):
query = Query()
query.tables.append('"product_product"')
self.assertRaises(AssertionError, query.join, ("product_template", "product_category", "categ_id", "id"), outer=False)
self.assertRaises(AssertionError, query.add_join, ("product_template", "product_category", "categ_id", "id", "categ_id"), implicit=False, outer=False)
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: