[ADD] osv: initial basic implementation of Query object:
- added basic Query object to support more complex ORM use cases (LEFT OUTER JOINS at the moment) - added trivial unit tests for query object + run_test.py executable to launch them Warning: this is very basic and was only done as a low-risk fix to introduce support for OUTER joins in some cases. This will need to be improved and extended a lot to be really useful -> targeted for after v6.0... bzr revid: odo@openerp.com-20100930132403-2dx0k9znzromsig7
This commit is contained in:
parent
54fb35b34b
commit
109f51e125
|
@ -137,7 +137,8 @@ class ir_rule(osv.osv):
|
|||
def domain_get(self, cr, uid, model_name, mode='read', context={}):
|
||||
dom = self._compute_domain(cr, uid, model_name, mode=mode)
|
||||
if dom:
|
||||
return self.pool.get(model_name)._where_calc(cr, uid, dom, active_test=False)
|
||||
query = self.pool.get(model_name)._where_calc(cr, uid, dom, active_test=False)
|
||||
return query.where_clause, query.where_clause_params, query.tables
|
||||
return [], [], ['"'+self.pool.get(model_name)._table+'"']
|
||||
|
||||
def unlink(self, cr, uid, ids, context=None):
|
||||
|
|
204
bin/osv/orm.py
204
bin/osv/orm.py
|
@ -54,6 +54,7 @@ from tools.config import config
|
|||
from tools.translate import _
|
||||
|
||||
import fields
|
||||
from query import Query
|
||||
import tools
|
||||
from tools.safe_eval import safe_eval as eval
|
||||
|
||||
|
@ -2203,35 +2204,15 @@ class orm(orm_template):
|
|||
if not fields:
|
||||
fields = self._columns.keys()
|
||||
|
||||
# compute the where, order by, limit and offset clauses
|
||||
(where_clause, where_clause_params, tables) = self._where_calc(cr, uid, domain, context=context)
|
||||
|
||||
# apply direct ir.rules from current model
|
||||
self._apply_ir_rules(cr, uid, where_clause, where_clause_params, tables, 'read', context=context)
|
||||
|
||||
# then apply the ir.rules from the parents (through _inherits), adding the appropriate JOINs if needed
|
||||
for inherited_model in self._inherits:
|
||||
previous_tables = list(tables)
|
||||
if self._apply_ir_rules(cr, uid, where_clause, where_clause_params, tables, 'read', model_name=inherited_model, context=context):
|
||||
# if some rules were applied, need to add the missing JOIN for them to make sense, passing the previous
|
||||
# list of table in case the inherited table was not in the list before (as that means the corresponding
|
||||
# JOIN(s) was(were) not present)
|
||||
self._inherits_join_add(inherited_model, previous_tables, where_clause)
|
||||
tables = list(set(tables).union(set(previous_tables)))
|
||||
query = self._where_calc(cr, uid, domain, context=context)
|
||||
self._apply_ir_rules(cr, uid, query, 'read', context=context)
|
||||
|
||||
# Take care of adding join(s) if groupby is an '_inherits'ed field
|
||||
groupby_list = groupby
|
||||
if groupby:
|
||||
if isinstance(groupby, list):
|
||||
groupby = groupby[0]
|
||||
tables, where_clause, qfield = self._inherits_join_calc(groupby, tables, where_clause)
|
||||
|
||||
if len(where_clause):
|
||||
where_clause = ' where ' + ' and '.join(where_clause)
|
||||
else:
|
||||
where_clause = ''
|
||||
limit_str = limit and ' limit %d' % limit or ''
|
||||
offset_str = offset and ' offset %d' % offset or ''
|
||||
self._inherits_join_calc(groupby, query)
|
||||
|
||||
assert not groupby or groupby in fields, "Fields in 'groupby' must appear in the list of fields to read (perhaps it's missing in the list view?)"
|
||||
|
||||
|
@ -2263,8 +2244,13 @@ class orm(orm_template):
|
|||
flist += ','
|
||||
flist += operator+'('+f+') as '+f
|
||||
|
||||
gb = groupby and (' group by '+groupby) or ''
|
||||
cr.execute('select min(%s.id) as id,' % self._table + flist + ' from ' + ','.join(tables) + where_clause + gb + limit_str + offset_str, where_clause_params)
|
||||
gb = groupby and (' GROUP BY '+groupby) or ''
|
||||
|
||||
from_clause, where_clause, where_clause_params = query.get_sql()
|
||||
where_clause = where_clause and ' WHERE ' + where_clause
|
||||
limit_str = limit and ' limit %d' % limit or ''
|
||||
offset_str = offset and ' offset %d' % offset or ''
|
||||
cr.execute('SELECT min(%s.id) AS id,' % self._table + flist + ' FROM ' + from_clause + where_clause + gb + limit_str + offset_str, where_clause_params)
|
||||
alldata = {}
|
||||
groupby = group_by
|
||||
for r in cr.dictfetchall():
|
||||
|
@ -2303,43 +2289,37 @@ class orm(orm_template):
|
|||
del d['id']
|
||||
return data
|
||||
|
||||
def _inherits_join_add(self, parent_model_name, tables, where_clause):
|
||||
def _inherits_join_add(self, parent_model_name, query):
|
||||
"""
|
||||
Add missing table SELECT and JOIN clause for reaching the parent table (no duplicates)
|
||||
Add missing table SELECT and JOIN clause to ``query`` for reaching the parent table (no duplicates)
|
||||
|
||||
:param parent_model_name: name of the parent model for which the clauses should be added
|
||||
:param tables: list of table._table names enclosed in double quotes as returned
|
||||
by _where_calc()
|
||||
:param where_clause: current list of WHERE clause params
|
||||
:param query: query object on which the JOIN should be added
|
||||
"""
|
||||
inherits_field = self._inherits[parent_model_name]
|
||||
parent_model = self.pool.get(parent_model_name)
|
||||
parent_table_name = parent_model._table
|
||||
quoted_parent_table_name = '"%s"' % parent_table_name
|
||||
if quoted_parent_table_name not in tables:
|
||||
tables.append(quoted_parent_table_name)
|
||||
where_clause.append('("%s".%s = %s.id)' % (self._table, inherits_field, parent_table_name))
|
||||
return (tables, where_clause)
|
||||
if quoted_parent_table_name not in query.tables:
|
||||
query.tables.append(quoted_parent_table_name)
|
||||
query.where_clause.append('("%s".%s = %s.id)' % (self._table, inherits_field, parent_table_name))
|
||||
|
||||
def _inherits_join_calc(self, field, tables, where_clause):
|
||||
def _inherits_join_calc(self, field, query):
|
||||
"""
|
||||
Adds missing table select and join clause(s) for reaching
|
||||
Adds missing table select and join clause(s) to ``query`` for reaching
|
||||
the field coming from an '_inherits' parent table (no duplicates).
|
||||
|
||||
:param tables: list of table._table names enclosed in double quotes as returned
|
||||
by _where_calc()
|
||||
:param where_clause: current list of WHERE clause params
|
||||
:return: (tables, where_clause, qualified_field) where ``tables`` and ``where_clause`` are the updated
|
||||
versions of the parameters, and ``qualified_field`` is the qualified name of ``field``
|
||||
in the form ``table.field``, to be referenced in queries.
|
||||
:param field: name of inherited field to reach
|
||||
:param query: query object on which the JOIN should be added
|
||||
:return: qualified name of field, to be used in SELECT clause
|
||||
"""
|
||||
current_table = self
|
||||
while field in current_table._inherit_fields and not field in current_table._columns:
|
||||
parent_model_name = current_table._inherit_fields[field][0]
|
||||
parent_table = self.pool.get(parent_model_name)
|
||||
self._inherits_join_add(parent_model_name, tables, where_clause)
|
||||
self._inherits_join_add(parent_model_name, query)
|
||||
current_table = parent_table
|
||||
return (tables, where_clause, '"%s".%s' % (current_table._table, field))
|
||||
return '"%s".%s' % (current_table._table, field)
|
||||
|
||||
def _parent_store_compute(self, cr):
|
||||
if not self._parent_store:
|
||||
|
@ -3899,84 +3879,87 @@ class orm(orm_template):
|
|||
raise NotImplementedError(_('This method does not exist anymore'))
|
||||
|
||||
# TODO: ameliorer avec NULL
|
||||
def _where_calc(self, cr, user, args, active_test=True, context=None):
|
||||
def _where_calc(self, cr, user, domain, active_test=True, context=None):
|
||||
"""Computes the WHERE clause needed to implement an OpenERP domain.
|
||||
:param args: the domain to compute
|
||||
:type args: list
|
||||
:param domain: the domain to compute
|
||||
:type domain: list
|
||||
:param active_test: whether the default filtering of records with ``active``
|
||||
field set to ``False`` should be applied.
|
||||
:return: tuple with 3 elements: (where_clause, where_clause_params, tables) where
|
||||
``where_clause`` contains a list of where clause elements (to be joined with 'AND'),
|
||||
``where_clause_params`` is a list of parameters to be passed to the db layer
|
||||
for the where_clause expansion, and ``tables`` is the list of double-quoted
|
||||
table names that need to be included in the FROM clause.
|
||||
:rtype: tuple
|
||||
:return: the query expressing the given domain as provided in domain
|
||||
:rtype: osv.query.Query
|
||||
"""
|
||||
if not context:
|
||||
context = {}
|
||||
args = args[:]
|
||||
domain = domain[:]
|
||||
# if the object has a field named 'active', filter out all inactive
|
||||
# records unless they were explicitely asked for
|
||||
if 'active' in self._columns and (active_test and context.get('active_test', True)):
|
||||
if args:
|
||||
if domain:
|
||||
active_in_args = False
|
||||
for a in args:
|
||||
for a in domain:
|
||||
if a[0] == 'active':
|
||||
active_in_args = True
|
||||
if not active_in_args:
|
||||
args.insert(0, ('active', '=', 1))
|
||||
domain.insert(0, ('active', '=', 1))
|
||||
else:
|
||||
args = [('active', '=', 1)]
|
||||
domain = [('active', '=', 1)]
|
||||
|
||||
if args:
|
||||
if domain:
|
||||
import expression
|
||||
e = expression.expression(args)
|
||||
e = expression.expression(domain)
|
||||
e.parse(cr, user, self, context)
|
||||
tables = e.get_tables()
|
||||
qu1, qu2 = e.to_sql()
|
||||
qu1 = qu1 and [qu1] or []
|
||||
where_clause, where_params = e.to_sql()
|
||||
where_clause = where_clause and [where_clause] or []
|
||||
else:
|
||||
qu1, qu2, tables = [], [], ['"%s"' % self._table]
|
||||
where_clause, where_params, tables = [], [], ['"%s"' % self._table]
|
||||
|
||||
return (qu1, qu2, tables)
|
||||
return Query(tables, where_clause, where_params)
|
||||
|
||||
def _check_qorder(self, word):
|
||||
if not regex_order.match(word):
|
||||
raise except_orm(_('AccessError'), _('Invalid "order" specified. A valid "order" specification is a comma-separated list of valid field names (optionally followed by asc/desc for the direction)'))
|
||||
return True
|
||||
|
||||
def _apply_ir_rules(self, cr, uid, where_clause, where_clause_params, tables, mode='read', model_name=None, context=None):
|
||||
"""Add what's missing in ``where_clause``, ``where_params``, ``tables`` to implement
|
||||
all appropriate ir.rules (on the current object but also from it's _inherits parents)
|
||||
def _apply_ir_rules(self, cr, uid, query, mode='read', context=None):
|
||||
"""Add what's missing in ``query`` to implement all appropriate ir.rules
|
||||
(using the ``model_name``'s rules or the current model's rules if ``model_name`` is None)
|
||||
|
||||
:param where_clause: list with current elements of the WHERE clause (strings)
|
||||
:param where_clause_params: list with parameters for ``where_clause``
|
||||
:param tables: list with double-quoted names of the tables that are joined
|
||||
in ``where_clause``
|
||||
:param model_name: optional name of the model whose ir.rules should be applied (default:``self._name``)
|
||||
This could be useful for inheritance for example, but there is no provision to include
|
||||
the appropriate JOIN for linking the current model to the one referenced in model_name.
|
||||
:return: True if additional clauses where applied.
|
||||
:param query: the current query object
|
||||
"""
|
||||
added_clause, added_params, added_tables = self.pool.get('ir.rule').domain_get(cr, uid, model_name or self._name, mode, context=context)
|
||||
if added_clause:
|
||||
where_clause += added_clause
|
||||
where_clause_params += added_params
|
||||
for table in added_tables:
|
||||
if table not in tables:
|
||||
tables.append(table)
|
||||
return True
|
||||
return False
|
||||
def apply_rule(added_clause, added_params, added_tables):
|
||||
if added_clause:
|
||||
query.where_clause += added_clause
|
||||
query.where_clause_params += added_params
|
||||
for table in added_tables:
|
||||
if table not in query.tables:
|
||||
query.tables.append(table)
|
||||
return True
|
||||
return False
|
||||
|
||||
# apply main rules on the object
|
||||
rule_obj = self.pool.get('ir.rule')
|
||||
apply_rule(*rule_obj.domain_get(cr, uid, self._name, mode, context=context))
|
||||
|
||||
def _generate_m2o_order_by(self, order_field, tables, where_clause):
|
||||
# apply ir.rules from the parents (through _inherits), adding the appropriate JOINs if needed
|
||||
for inherited_model in self._inherits:
|
||||
if apply_rule(*rule_obj.domain_get(cr, uid, inherited_model, mode, context=context)):
|
||||
# if some rules were applied, need to add the missing JOIN for them to make sense, passing the previous
|
||||
# list of table in case the inherited table was not in the list before (as that means the corresponding
|
||||
# JOIN(s) was(were) not present)
|
||||
self._inherits_join_add(inherited_model, query)
|
||||
|
||||
def _generate_m2o_order_by(self, order_field, query):
|
||||
"""
|
||||
Add possibly missing JOIN and generate the ORDER BY clause for m2o fields,
|
||||
Add possibly missing JOIN to ``query`` and generate the ORDER BY clause for m2o fields,
|
||||
either native m2o fields or function/related fields that are stored, including
|
||||
intermediate JOINs for inheritance if required.
|
||||
|
||||
:return: the qualified field name to use in an ORDER BY clause to sort by ``order_field``
|
||||
"""
|
||||
if order_field not in self._columns and order_field in self._inherit_fields:
|
||||
# also add missing joins for reaching the table containing the m2o field
|
||||
tables, where_clause, qualified_field = self._inherits_join_calc(order_field, tables, where_clause)
|
||||
qualified_field = self._inherits_join_calc(order_field, query)
|
||||
order_field_column = self._inherit_fields[order_field][2]
|
||||
else:
|
||||
qualified_field = '"%s"."%s"' % (self._table, order_field)
|
||||
|
@ -3995,16 +3978,14 @@ class orm(orm_template):
|
|||
# extract the first field name, to be able to qualify it and add desc/asc
|
||||
m2o_order = m2o_order.split(",",1)[0].strip().split(" ",1)[0]
|
||||
|
||||
# the perhaps missing join:
|
||||
quoted_model_table = '"%s"' % dest_model._table
|
||||
if quoted_model_table not in tables:
|
||||
tables.append(quoted_model_table)
|
||||
where_clause.append('%s = %s.id' % (qualified_field, quoted_model_table))
|
||||
|
||||
return ('%s.%s' % (quoted_model_table, m2o_order), tables, where_clause)
|
||||
# Join the dest m2o table if it's not joined yet. We use [LEFT] OUTER join here
|
||||
# as we don't want to exclude results that have NULL values for the m2o
|
||||
src_table, src_field = qualified_field.replace('"','').split('.', 1)
|
||||
query.join((src_table, dest_model._table, src_field, 'id'), outer=True)
|
||||
return '"%s"."%s"' % (dest_model._table, m2o_order)
|
||||
|
||||
|
||||
def _generate_order_by(self, order_spec, tables, where_clause):
|
||||
def _generate_order_by(self, order_spec, query):
|
||||
"""
|
||||
Attempt to consruct an appropriate ORDER BY clause based on order_spec, which must be
|
||||
a comma-separated list of valid field names, optionally followed by an ASC or DESC direction.
|
||||
|
@ -4024,16 +4005,16 @@ class orm(orm_template):
|
|||
if order_column._classic_read:
|
||||
order_by_clause = '"%s"."%s"' % (self._table, order_field)
|
||||
elif order_column._type == 'many2one':
|
||||
order_by_clause, tables, where_clause = self._generate_m2o_order_by(order_field, tables, where_clause)
|
||||
order_by_clause = self._generate_m2o_order_by(order_field, query)
|
||||
else:
|
||||
continue # ignore non-readable or "non-joignable" fields
|
||||
elif order_field in self._inherit_fields:
|
||||
parent_obj = self.pool.get(self._inherit_fields[order_field][0])
|
||||
order_column = parent_obj._columns[order_field]
|
||||
if order_column._classic_read:
|
||||
tables, where_clause, order_by_clause = self._inherits_join_calc(order_field, tables, where_clause)
|
||||
order_by_clause = self._inherits_join_calc(order_field, query)
|
||||
elif order_column._type == 'many2one':
|
||||
order_by_clause, tables, where_clause = self._generate_m2o_order_by(order_field, tables, where_clause)
|
||||
order_by_clause = self._generate_m2o_order_by(order_field, query)
|
||||
else:
|
||||
continue # ignore non-readable or "non-joignable" fields
|
||||
order_by_elements.append("%s %s" % (order_by_clause, order_direction))
|
||||
|
@ -4054,34 +4035,21 @@ class orm(orm_template):
|
|||
if context is None:
|
||||
context = {}
|
||||
self.pool.get('ir.model.access').check(cr, access_rights_uid or user, self._name, 'read', context=context)
|
||||
# compute the where, order by, limit and offset clauses
|
||||
(where_clause, where_clause_params, tables) = self._where_calc(cr, user, args, context=context)
|
||||
|
||||
# apply direct ir.rules from current model
|
||||
self._apply_ir_rules(cr, user, where_clause, where_clause_params, tables, 'read', context=context)
|
||||
query = self._where_calc(cr, user, args, context=context)
|
||||
self._apply_ir_rules(cr, user, query, 'read', context=context)
|
||||
order_by = self._generate_order_by(order, query)
|
||||
from_clause, where_clause, where_clause_params = query.get_sql()
|
||||
|
||||
# then apply the ir.rules from the parents (through _inherits), adding the appropriate JOINs if needed
|
||||
for inherited_model in self._inherits:
|
||||
previous_tables = list(tables)
|
||||
if self._apply_ir_rules(cr, user, where_clause, where_clause_params, tables, 'read', model_name=inherited_model, context=context):
|
||||
# if some rules were applied, need to add the missing JOIN for them to make sense, passing the previous
|
||||
# list of table in case the inherited table was not in the list before (as that means the corresponding
|
||||
# JOIN(s) was(were) not present)
|
||||
self._inherits_join_add(inherited_model, previous_tables, where_clause)
|
||||
tables = list(set(tables).union(set(previous_tables)))
|
||||
|
||||
where = where_clause
|
||||
order_by = self._generate_order_by(order, tables, where_clause)
|
||||
limit_str = limit and ' limit %d' % limit or ''
|
||||
offset_str = offset and ' offset %d' % offset or ''
|
||||
where_str = where and (" WHERE %s" % " AND ".join(where)) or ''
|
||||
where_str = where_clause and (" WHERE %s" % where_clause) or ''
|
||||
|
||||
if count:
|
||||
cr.execute('select count(%s.id) from ' % self._table +
|
||||
','.join(tables) + where_str + limit_str + offset_str, where_clause_params)
|
||||
cr.execute('SELECT count("%s".id) FROM ' % self._table + from_clause + where_str + limit_str + offset_str, where_clause_params)
|
||||
res = cr.fetchall()
|
||||
return res[0][0]
|
||||
cr.execute('select %s.id from ' % self._table + ','.join(tables) + where_str + order_by + limit_str+offset_str, where_clause_params)
|
||||
cr.execute('SELECT "%s".id FROM ' % self._table + from_clause + where_str + order_by + limit_str + offset_str, where_clause_params)
|
||||
res = cr.fetchall()
|
||||
return [x[0] for x in res]
|
||||
|
||||
|
|
|
@ -0,0 +1,116 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
##############################################################################
|
||||
#
|
||||
# OpenERP, Open Source Management Solution
|
||||
# Copyright (C) 2010 OpenERP S.A. http://www.openerp.com
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
##############################################################################
|
||||
|
||||
|
||||
def _quote(to_quote):
|
||||
if '"' not in to_quote:
|
||||
return '"%s"' % to_quote
|
||||
return to_quote
|
||||
|
||||
|
||||
class Query(object):
|
||||
"""
|
||||
Dumb implementation of a Query object, using 3 string lists so far
|
||||
for backwards compatibility with the (table, where_clause, where_params) previously used.
|
||||
|
||||
TODO: To be improved after v6.0 to rewrite part of the ORM and add support for:
|
||||
- auto-generated multiple table aliases
|
||||
- multiple joins to the same table with different conditions
|
||||
- dynamic right-hand-side values in domains (e.g. a.name = a.description)
|
||||
- etc.
|
||||
"""
|
||||
|
||||
def __init__(self, tables=None, where_clause=None, where_clause_params=None, joins=None):
|
||||
|
||||
# holds the list of tables joined using default JOIN.
|
||||
# the table names are stored double-quoted (backwards compatibility)
|
||||
self.tables = tables or []
|
||||
|
||||
# holds the list of WHERE clause elements, to be joined with
|
||||
# 'AND' when generating the final query
|
||||
self.where_clause = where_clause or []
|
||||
|
||||
# holds the parameters for the formatting of `where_clause`, to be
|
||||
# passed to psycopg's execute method.
|
||||
self.where_clause_params = where_clause_params or []
|
||||
|
||||
# holds table joins done explicitly, supporting outer joins. The JOIN
|
||||
# condition should not be in `where_clause`. The dict is used as follows:
|
||||
# self.joins = {
|
||||
# 'table_a': [
|
||||
# ('table_b', 'table_a_col1', 'table_b_col', 'LEFT JOIN'),
|
||||
# ('table_c', 'table_a_col2', 'table_c_col', 'LEFT JOIN'),
|
||||
# ('table_d', 'table_a_col3', 'table_d_col', 'JOIN'),
|
||||
# ]
|
||||
# }
|
||||
# which should lead to the following SQL:
|
||||
# SELECT ... FROM "table_a" LEFT JOIN "table_b" ON ("table_a"."table_a_col1" = "table_b"."table_b_col")
|
||||
# LEFT JOIN "table_c" ON ("table_a"."table_a_col2" = "table_c"."table_c_col")
|
||||
self.joins = joins or {}
|
||||
|
||||
def join(self, connection, outer=False):
|
||||
"""Adds the JOIN specified in ``connection``.
|
||||
|
||||
:param connection: a tuple ``(lhs, table, lhs_col, col)``.
|
||||
The join corresponds to the SQL equivalent of::
|
||||
|
||||
``(lhs.lhs_col = table.col)``
|
||||
|
||||
:param outer: True if a LEFT OUTER JOIN should be used, if possible
|
||||
(no promotion to OUTER JOIN is supported in case the JOIN
|
||||
was already present in the query, as for the moment
|
||||
implicit INNER JOINs are only connected from NON-NULL
|
||||
columns so it would not be correct (e.g. for
|
||||
``_inherits`` or when a domain criterion explicitly
|
||||
adds filtering)
|
||||
"""
|
||||
(lhs, table, lhs_col, col) = connection
|
||||
lhs = _quote(lhs)
|
||||
table = _quote(table)
|
||||
assert lhs in self.tables, "Left-hand-side table must already be part of the query!"
|
||||
if table in self.tables:
|
||||
# already joined, must ignore (promotion to outer and multiple joins not supported yet)
|
||||
pass
|
||||
else:
|
||||
# add JOIN
|
||||
self.tables.append(table)
|
||||
self.joins.setdefault(lhs, []).append((table, lhs_col, col, outer and 'LEFT JOIN' or 'JOIN'))
|
||||
return self
|
||||
|
||||
def get_sql(self):
|
||||
"""Returns (query_from, query_where, query_params)"""
|
||||
query_from = ''
|
||||
tables_to_process = list(self.tables)
|
||||
for table in tables_to_process:
|
||||
query_from += ' %s ' % table
|
||||
if table in self.joins:
|
||||
for (dest_table, lhs_col, col, join) in self.joins[table]:
|
||||
tables_to_process.remove(dest_table)
|
||||
query_from += '%s %s ON (%s."%s" = %s."%s")' % \
|
||||
(join, dest_table, table, lhs_col, dest_table, col)
|
||||
query_from += ','
|
||||
query_from = query_from[:-1] # drop last comma
|
||||
return (query_from, " AND ".join(self.where_clause), self.where_clause_params)
|
||||
|
||||
def __str__(self):
|
||||
return '<osv.Query: "SELECT ... FROM %s WHERE %s" with params: %r>' % self.get_sql()
|
||||
|
||||
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
|
|
@ -0,0 +1,27 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
##############################################################################
|
||||
#
|
||||
# OpenERP, Open Source Management Solution
|
||||
# Copyright (C) 2010 OpenERP S.A. http://www.openerp.com
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
##############################################################################
|
||||
|
||||
import unittest
|
||||
|
||||
import test
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.TextTestRunner(verbosity=2).run(unittest.defaultTestLoader.loadTestsFromModule(test))
|
|
@ -0,0 +1,22 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
##############################################################################
|
||||
#
|
||||
# OpenERP, Open Source Management Solution
|
||||
# Copyright (C) 2010 OpenERP S.A. http://www.openerp.com
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
##############################################################################
|
||||
|
||||
from test_osv import *
|
|
@ -0,0 +1,40 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
##############################################################################
|
||||
#
|
||||
# OpenERP, Open Source Management Solution
|
||||
# Copyright (C) 2010 OpenERP S.A. http://www.openerp.com
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
##############################################################################
|
||||
|
||||
import unittest
|
||||
from osv.query import Query
|
||||
|
||||
class QueryTestCase(unittest.TestCase):
|
||||
|
||||
def test_basic_query(self):
|
||||
query = Query()
|
||||
query.tables.extend(['"product_product"','"product_template"'])
|
||||
query.where_clause.append("product_product.template_id = product_template.id")
|
||||
query.join(("product_template", "product_category", "categ_id", "id"), outer=False) # add normal join
|
||||
query.join(("product_product", "res_user", "user_id", "id"), outer=True) # outer join
|
||||
self.assertEquals(query.get_sql()[0].strip(),
|
||||
""""product_product" LEFT JOIN "res_user" ON ("product_product"."user_id" = "res_user"."id"), "product_template" JOIN "product_category" ON ("product_template"."categ_id" = "product_category"."id") """.strip(), "Incorrect query")
|
||||
self.assertEquals(query.get_sql()[1].strip(), """product_product.template_id = product_template.id""".strip(), "Incorrect where clause")
|
||||
|
||||
def test_raise_missing_lhs(self):
|
||||
query = Query()
|
||||
query.tables.append('"product_product"')
|
||||
self.assertRaises(AssertionError, query.join, ("product_template", "product_category", "categ_id", "id"), outer=False)
|
Loading…
Reference in New Issue