6099 lines
277 KiB
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# OpenERP, Open Source Management Solution
# Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Object Relational Mapping module:
    * Hierarchical structure
    * Constraints consistency and validation
    * Object metadata depends on its status
    * Optimised processing by complex query (multiple actions at once)
    * Default field values
    * Permissions optimisation
    * Persistent object: DB postgresql
    * Data conversion
    * Multi-level caching system
    * Two different inheritance mechanisms
    * Rich set of field types:
        - classical (varchar, integer, boolean, ...)
        - relational (one2many, many2one, many2many)
        - functional
"""
import datetime
import functools
import itertools
import logging
import operator
import pickle
import pytz
import re
import time
from collections import defaultdict, MutableMapping
from inspect import getmembers, currentframe
from operator import itemgetter
import babel.dates
import dateutil.relativedelta
import psycopg2
from lxml import etree
import openerp
from . import SUPERUSER_ID
from . import api
from . import tools
from .api import Environment
from .exceptions import except_orm, AccessError, MissingError, ValidationError
from .osv import fields
from .osv.query import Query
from .tools import frozendict, lazy_property, ormcache
from .tools.config import config
from .tools.func import frame_codeinfo
from .tools.safe_eval import safe_eval as eval
from .tools.translate import _
# module logger, plus a child logger (``<module>.schema``)
_logger = logging.getLogger(__name__)
_schema = logging.getLogger(__name__ + '.schema')
# matches a comma-separated list of identifiers (bare or double-quoted), each
# optionally followed by ``asc``/``desc``; case-insensitive
regex_order = re.compile('^( *([a-z0-9:_]+|"[a-z0-9:_]+")( *desc| *asc)?( *, *|))+$', re.I)
# accepted model names: lowercase letters, digits, underscores and dots only
# (used by check_object_name)
regex_object_name = re.compile(r'^[a-z0-9_.]+$')
# matches a ``name(arguments)`` string; group 1 is the name, group 2 the raw
# argument text
onchange_v7 = re.compile(r"^(\w+)\((.*)\)$")
def check_object_name(name):
    """ Check if the given name is a valid openerp object name.

        The _name attribute in osv and osv_memory object is subject to
        some restrictions. This function returns True or False whether
        the given name is allowed or not.

        TODO: this is an approximation. The goal in this approximation
        is to disallow uppercase characters (in some places, we quote
        table/column names and in other not, which leads to this kind
        of errors:

            psycopg2.ProgrammingError: relation "xxx" does not exist).

        The same restriction should apply to both osv and osv_memory
        objects for consistency.

        :param str name: candidate model name
        :return: True when ``name`` matches ``regex_object_name``, else False
        :rtype: bool
    """
    return regex_object_name.match(name) is not None
def raise_on_invalid_object_name(name):
    """ Raise an error when ``name`` is not an acceptable model name.

        :param str name: candidate value for a model's ``_name``
        :raise except_orm: with title ``'ValueError'`` when
            :func:`check_object_name` rejects ``name``
    """
    if not check_object_name(name):
        msg = "The _name attribute %s is not valid." % name
        raise except_orm('ValueError', msg)
# NOTE(review): these four ``key: value`` pairs are not valid statements on
# their own; they look like the body of a dict literal whose opening
# ``NAME = {`` line and closing ``}`` are not present here. Presumably the
# keys are foreign-key ON DELETE rules and the values their one-letter codes
# -- TODO restore the enclosing assignment from the upstream file.
'RESTRICT': 'r',
'NO ACTION': 'a',
'CASCADE': 'c',
'SET NULL': 'n',
def intersect(la, lb):
    """ Return the items of ``la`` that also occur in ``lb``.

        The order (and duplicates) of ``la`` are preserved; membership is
        tested with ``in`` against ``lb``.

        :param la: iterable whose items are filtered
        :param lb: container against which membership is tested
        :return: the result of :func:`filter` over ``la``
    """
    return filter(lambda x: x in lb, la)
def same_name(f, g):
    """ Test whether functions ``f`` and ``g`` are identical or have the same name """
    # the two getattr defaults differ on purpose: objects without a
    # ``__name__`` never compare as "same name"
    return f == g or getattr(f, '__name__', 0) == getattr(g, '__name__', 1)
def fix_import_export_id_paths(fieldname):
    """
    Fixes the id fields in import and exports, and splits field paths
    on '/'.

    A ``.id`` or ``:id`` suffix glued to the preceding path element is first
    turned into its own element (``a.id`` -> ``a/.id``, ``a:id`` -> ``a/id``).

    :param str fieldname: name of the field to import/export
    :return: split field name
    :rtype: list of str
    """
    fixed_db_id = re.sub(r'([^/])\.id', r'\1/.id', fieldname)
    fixed_external_id = re.sub(r'([^/]):id', r'\1/id', fixed_db_id)
    return fixed_external_id.split('/')
def pg_varchar(size=0):
    """ Returns the VARCHAR declaration for the provided size:

    * If no size (or an empty or negative size is provided) return an
      'infinite' VARCHAR
    * Otherwise return a VARCHAR(n)

    :param int size: varchar size, optional
    :rtype: str
    :raise TypeError: if a truthy ``size`` is not an ``int``
    """
    if size:
        if not isinstance(size, int):
            raise TypeError("VARCHAR parameter should be an int, got %s"
                            % type(size))
        if size > 0:
            return 'VARCHAR(%d)' % size
    # falsy or negative size: unbounded varchar
    return 'VARCHAR'
# Column classes whose PostgreSQL type is fixed: the same string is used both
# as type identifier and as type specification by get_pg_type().
FIELDS_TO_PGTYPES = {
    fields.boolean: 'bool',
    fields.integer: 'int4',
    fields.text: 'text',
    fields.html: 'text',
    fields.date: 'date',
    fields.datetime: 'timestamp',
    fields.binary: 'bytea',
    fields.many2one: 'int4',
    fields.serialized: 'text',
}
def get_pg_type(f, type_override=None):
    """ Return the PostgreSQL type to use for the column ``f``.

    :param fields._column f: field to get a Postgres type for
    :param type type_override: use the provided type for dispatching instead of the field's own type
    :returns: (postgres_identification_type, postgres_type_specification),
              or ``None`` (after logging a warning) for an unsupported type
    :rtype: (str, str)
    """
    field_type = type_override or type(f)

    if field_type in FIELDS_TO_PGTYPES:
        pg_type = (FIELDS_TO_PGTYPES[field_type], FIELDS_TO_PGTYPES[field_type])
    elif issubclass(field_type, fields.float):
        # Explicit support for "falsy" digits (0, False) to indicate a
        # NUMERIC field with no fixed precision. The values will be saved
        # in the database with all significant digits.
        # FLOAT8 type is still the default when there is no precision because
        # it is faster for most operations (sums, etc.)
        if f.digits is not None:
            pg_type = ('numeric', 'NUMERIC')
        else:
            pg_type = ('float8', 'DOUBLE PRECISION')
    elif issubclass(field_type, (fields.char, fields.reference)):
        pg_type = ('varchar', pg_varchar(f.size))
    elif issubclass(field_type, fields.selection):
        # integer keys (or an explicit size of -1) are stored as integers
        if (f.selection and isinstance(f.selection, list) and isinstance(f.selection[0][0], int))\
                or getattr(f, 'size', None) == -1:
            pg_type = ('int4', 'INTEGER')
        else:
            pg_type = ('varchar', pg_varchar(getattr(f, 'size', None)))
    elif issubclass(field_type, fields.function):
        if f._type == 'selection':
            pg_type = ('varchar', pg_varchar())
        else:
            # dispatch again on the column class named by the function's type
            pg_type = get_pg_type(f, getattr(fields, f._type))
    else:
        _logger.warning('%s type not supported!', field_type)
        pg_type = None

    return pg_type
class MetaModel(api.Meta):
    """ Metaclass for the models.

    This class is used as the metaclass for the class :class:`BaseModel` to
    discover the models defined in a module (without instantiating them).
    If the automatic discovery is not needed, it is possible to set the model's
    ``_register`` attribute to False.
    """

    # {module name: [model classes defined by that module]}
    module_to_models = {}

    def __init__(self, name, bases, attrs):
        """ Record the new model class under the module that defines it and
            convert its ``_columns`` into field attributes.
        """
        if not self._register:
            # the class opted out of discovery: re-arm the flag so that its
            # subclasses are discovered, and do not register this one
            self._register = True
            super(MetaModel, self).__init__(name, bases, attrs)
            return

        if not hasattr(self, '_module'):
            # The (OpenERP) module name can be in the ``openerp.addons`` namespace
            # or not. For instance, module ``sale`` can be imported as
            # ``openerp.addons.sale`` (the right way) or ``sale`` (for backward
            # compatibility).
            module_parts = self.__module__.split('.')
            if len(module_parts) > 2 and module_parts[:2] == ['openerp', 'addons']:
                module_name = module_parts[2]
            else:
                module_name = module_parts[0]
            self._module = module_name

        # Remember which models to instantiate for this module.
        if not self._custom:
            self.module_to_models.setdefault(self._module, []).append(self)

        # check for new-api conversion error: leave comma after field definition
        for key, val in attrs.iteritems():
            if type(val) is tuple and len(val) == 1 and isinstance(val[0], Field):
                _logger.error("Trailing comma after field definition: %s.%s", self, key)

        # transform columns into new-style fields (enables field inheritance)
        for name, column in self._columns.iteritems():
            if name in self.__dict__:
                _logger.warning("In class %s, field %r overriding an existing value", self, name)
            setattr(self, name, column.to_field())
class NewId(object):
    """ Pseudo-ids for new records. """
    def __nonzero__(self):
        # a pseudo-id is always falsy
        return False

    # same truth-value hook under its Python 3 name
    __bool__ = __nonzero__
# types accepted as a record id (for use with isinstance)
IdType = (int, long, basestring, NewId)
# maximum number of prefetched records
# NOTE(review): the constant this comment describes is not present here --
# TODO restore its definition from the upstream file.
# special columns automatically created by the ORM
LOG_ACCESS_COLUMNS = ['create_uid', 'create_date', 'write_uid', 'write_date']
class BaseModel(object):
""" Base class for OpenERP models.
OpenERP models are created by inheriting from this class' subclasses:
* :class:`Model` for regular database-persisted models
* :class:`TransientModel` for temporary data, stored in the database but
automatically vacuumed every so often
* :class:`AbstractModel` for abstract super classes meant to be shared by
multiple inheriting model
The system automatically instantiates every model once per database. Those
instances represent the available models on each database, and depend on
which modules are installed on that database. The actual class of each
instance is built from the Python classes that create and inherit from the
corresponding model.
    Every model instance is a "recordset", i.e., an ordered collection of
    records of the model. Recordsets are returned by methods like
    :meth:`~.browse`, :meth:`~.search`, or field accesses. Records have no
    explicit representation: a record is represented as a recordset of one
    record.

    To create a class that should not be instantiated, the _register class
    attribute may be set to False.
    """
# class-level defaults of BaseModel; model classes override them
__metaclass__ = MetaModel
_auto = True # create database backend
_register = False # Set to false if the model shouldn't be automatically discovered.
# model name; the model is registered in the pool under this name
_name = None
# old-style column definitions, converted to fields by the metaclass
_columns = {}
# Python constraints; the first item of each entry is the checking function
_constraints = []
_custom = False
_defaults = {}
_rec_name = None
_parent_name = 'parent_id'
_parent_store = False
_parent_order = False
_date_name = 'date'
_order = 'id'
# defaults to ``_table + '_id_seq'`` when left empty (see __init__)
_sequence = None
# defaults to ``_name`` when left empty (see __init__)
_description = None
_needaction = False
_translate = True # set to False to disable translations export for this model
# dict of {field:method}, with method returning the (name_get of records, {id: fold})
# to include in the _read_group, if grouped on this field
_group_by_full = {}
# Transience
_transient = False # True in a TransientModel
# structure:
# { 'parent_model': 'm2o_field', ... }
_inherits = {}
# Mapping from inherits'd field name to 4-tuple (m, r, f, n) where m is the
# model from which it is inherits'd, r is the (local) field towards m, f
# is the _column object itself, and n is the original (i.e. top-most)
# parent model.
# Example:
# { 'field_name': ('parent_model', 'm2o_field_to_reach_parent',
# field_column_obj, original_parent_model), ... }
_inherit_fields = {}
# defaults to ``_name`` with dots replaced by underscores (see __init__)
_table = None
_log_create = False
# SQL constraints as 3-item entries; the first item is the key and the last
# the error message (see _init_constraints_onchanges)
_sql_constraints = []
# model dependencies, for models backed up by sql views:
# {model_name: field_names, ...}
_depends = {}
def log(self, cr, uid, id, message, secondary=False, context=None):
    """ Deprecated: does nothing but emit a deprecation warning.

        All arguments are ignored.

        :return: the result of the logger call
    """
    return _logger.warning("log() is deprecated. Please use OpenChatter notification system instead of the res.log mechanism.")
def view_init(self, cr, uid, fields_list, context=None):
    """Override this method to do specific things when a view on the object is opened."""
    # hook: the base implementation intentionally does nothing
    pass
# Reflect this model and its fields into the ir_model / ir_model_fields tables
# (and into ir_model_data when ``context`` carries a 'module' key), then
# invalidate the cache.
# NOTE(review): this block has visibly lost lines: the docstring is never
# closed, several ``if`` branches have no visible ``else:``, both ir_model_data
# INSERT calls and the ir_model_fields INSERT/UPDATE calls lack their closing
# parentheses, and the ir_model_fields statements lack their VALUES / WHERE
# keywords. Restore it from the upstream file before relying on it.
def _field_create(self, cr, context=None):
""" Create entries in ir_model_fields for all the model's fields.
If necessary, also create an entry in ir_model, and if called from the
modules loading scheme (by receiving 'module' in the context), also
create entries in ir_model_data (for the model and the fields).
- create an entry in ir_model (if there is not already one),
- create an entry in ir_model_data (if there is not already one, and if
'module' is in the context),
- update ir_model_fields with the fields found in _columns
(TODO there is some redundancy as _columns is updated from
ir_model_fields in __init__).
if context is None:
context = {}
# look up the ir_model row of this model; create it when there is none
cr.execute("SELECT id FROM ir_model WHERE model=%s", (self._name,))
if not cr.rowcount:
cr.execute('SELECT nextval(%s)', ('ir_model_id_seq',))
model_id = cr.fetchone()[0]
cr.execute("INSERT INTO ir_model (id,model, name, info,state) VALUES (%s, %s, %s, %s, %s)", (model_id, self._name, self._description, self.__doc__, 'base'))
# presumably the ``else`` branch (row already exists): read its id -- TODO confirm
model_id = cr.fetchone()[0]
if 'module' in context:
name_id = 'model_'+self._name.replace('.', '_')
cr.execute('select * from ir_model_data where name=%s and module=%s', (name_id, context['module']))
if not cr.rowcount:
cr.execute("INSERT INTO ir_model_data (name,date_init,date_update,module,model,res_id) VALUES (%s, (now() at time zone 'UTC'), (now() at time zone 'UTC'), %s, %s, %s)", \
(name_id, context['module'], 'ir.model', model_id)
# index the existing ir_model_fields rows of this model by field name
cr.execute("SELECT * FROM ir_model_fields WHERE model=%s", (self._name,))
cols = {}
for rec in cr.dictfetchall():
cols[rec['name']] = rec
ir_model_fields_obj = self.pool.get('ir.model.fields')
# sparse field should be created at the end, as it depends on its serialized field already existing
model_fields = sorted(self._fields.items(), key=lambda x: 1 if x[1].type == 'sparse' else 0)
for (k, f) in model_fields:
vals = {
'model_id': model_id,
'model': self._name,
'name': k,
'field_description': f.string,
'ttype': f.type,
'relation': f.comodel_name or '',
'select_level': tools.ustr(int(f.index)),
'readonly': (f.readonly and 1) or 0,
'required': (f.required and 1) or 0,
# NOTE(review): ``and`` binds tighter than ``or``, so a truthy ``f.search``
# is stored as-is here rather than as 1
'selectable': (f.search or f.store and 1) or 0,
# NOTE(review): the conditional expression binds last, so ``and 1`` only
# applies to the ``False`` alternative
'translate': (f.translate if hasattr(f,'translate') else False and 1) or 0,
'relation_field': f.inverse_name if hasattr(f, 'inverse_name') else '',
'serialization_field_id': None,
if getattr(f, 'serialization_field', None):
# resolve link to serialization_field if specified by name
serialization_field_id = ir_model_fields_obj.search(cr, SUPERUSER_ID, [('model','=',vals['model']), ('name', '=', f.serialization_field)])
if not serialization_field_id:
raise except_orm(_('Error'), _("Serialization field `%s` not found for sparse field `%s`!") % (f.serialization_field, k))
vals['serialization_field_id'] = serialization_field_id[0]
# When its a custom field,it does not contain f.select
if context.get('field_state', 'base') == 'manual':
if context.get('field_name', '') == k:
vals['select_level'] = context.get('select', '0')
#setting value to let the problem NOT occur next time
elif k in cols:
vals['select_level'] = cols[k]['select_level']
# field not reflected yet: insert a new ir_model_fields row
if k not in cols:
cr.execute('select nextval(%s)', ('ir_model_fields_id_seq',))
id = cr.fetchone()[0]
vals['id'] = id
cr.execute("""INSERT INTO ir_model_fields (
id, model_id, model, name, field_description, ttype,
relation,state,select_level,relation_field, translate, serialization_field_id
)""", (
id, vals['model_id'], vals['model'], vals['name'], vals['field_description'], vals['ttype'],
vals['relation'], 'base',
vals['select_level'], vals['relation_field'], bool(vals['translate']), vals['serialization_field_id']
if 'module' in context:
name1 = 'field_' + self._table + '_' + k
cr.execute("select name from ir_model_data where name=%s", (name1,))
if cr.fetchone():
name1 = name1 + "_" + str(id)
cr.execute("INSERT INTO ir_model_data (name,date_init,date_update,module,model,res_id) VALUES (%s, (now() at time zone 'UTC'), (now() at time zone 'UTC'), %s, %s, %s)", \
(name1, context['module'], 'ir.model.fields', id)
# presumably the ``else`` branch (field already reflected): rewrite the row
# when any reflected value differs -- TODO confirm against upstream
for key, val in vals.items():
if cols[k][key] != vals[key]:
cr.execute('update ir_model_fields set field_description=%s where model=%s and name=%s', (vals['field_description'], vals['model'], vals['name']))
cr.execute("""UPDATE ir_model_fields SET
model_id=%s, field_description=%s, ttype=%s, relation=%s,
select_level=%s, readonly=%s ,required=%s, selectable=%s, relation_field=%s, translate=%s, serialization_field_id=%s
model=%s AND name=%s""", (
vals['model_id'], vals['field_description'], vals['ttype'],
vals['select_level'], bool(vals['readonly']), bool(vals['required']), bool(vals['selectable']), vals['relation_field'], bool(vals['translate']), vals['serialization_field_id'], vals['model'], vals['name']
self.invalidate_cache(cr, SUPERUSER_ID)
def _add_field(cls, name, field):
    """ Add the given ``field`` under the given ``name`` in the class

        :param str name: attribute name to bind the field to
        :param field: field instance to attach to ``cls``
    """
    # NOTE(review): other code in this file calls ``cls._add_field(name, field)``,
    # so this is expected to be bound as a classmethod; no decorator is
    # visible above this definition -- confirm.
    # add field as an attribute and in cls._fields (for reflection)
    if not isinstance(getattr(cls, name, field), Field):
        # something that is not a field already lives under that name
        _logger.warning("In model %r, field %r overriding existing value", cls._name, name)
    setattr(cls, name, field)
    cls._fields[name] = field

    # basic setup of field
    field.set_class_name(cls, name)

    # cls._columns will be updated once fields are set up
def _pop_field(cls, name):
""" Remove the field with the given ``name`` from the model.
This method should only be used for manual fields.
field = cls._fields.pop(name)
cls._columns.pop(name, None)
if hasattr(cls, name):
delattr(cls, name)
return field
# Add the automatic fields (id, display_name, the log-access fields when
# ``cls._log_access`` is set, and the concurrency-check field) to ``cls``.
# NOTE(review): this block has visibly lost lines: the docstring is never
# closed, the ``fields.Char(...)`` call for 'display_name' is cut off after its
# first line, and ``last_modified_name`` is assigned twice in a row (presumably
# the second assignment sat in an ``else:`` branch of ``if cls._log_access``).
# Restore from the upstream file before relying on it.
def _add_magic_fields(cls):
""" Introduce magic fields on the current class
* id is a "normal" field (with a specific getter)
* create_uid, create_date, write_uid and write_date have become
"normal" fields
* $CONCURRENCY_CHECK_FIELD is a computed field with its computing
method defined dynamically. Uses ``str(datetime.datetime.utcnow())``
to get the same structure as the previous
``(now() at time zone 'UTC')::timestamp``::
# select (now() at time zone 'UTC')::timestamp;
2013-06-18 08:30:37.292809
>>> str(datetime.datetime.utcnow())
'2013-06-18 08:31:32.821177'
def add(name, field):
""" add ``field`` with the given ``name`` if it does not exist yet """
if name not in cls._fields:
cls._add_field(name, field)
# cyclic import
from . import fields
# this field 'id' must override any other column or field
cls._add_field('id', fields.Id(automatic=True))
add('display_name', fields.Char(string='Display Name', automatic=True,
if cls._log_access:
add('create_uid', fields.Many2one('res.users', string='Created by', automatic=True))
add('create_date', fields.Datetime(string='Created on', automatic=True))
add('write_uid', fields.Many2one('res.users', string='Last Updated by', automatic=True))
add('write_date', fields.Datetime(string='Last Updated on', automatic=True))
last_modified_name = 'compute_concurrency_field_with_access'
last_modified_name = 'compute_concurrency_field'
# this field must override any other column or field
cls._add_field(cls.CONCURRENCY_CHECK_FIELD, fields.Datetime(
string='Last Modified on', compute=last_modified_name, automatic=True))
# Compute methods named by ``last_modified_name`` in _add_magic_fields.
# NOTE(review): both bodies are incomplete here -- the first definition has no
# body at all, and the second only shows the middle of an expression that
# falls back from write_date to create_date and ends on a line continuation.
# Restore from the upstream file.
def compute_concurrency_field(self):
@api.depends('create_date', 'write_date')
def compute_concurrency_field_with_access(self):
self.write_date or self.create_date or \
# Goal: try to apply inheritance at the instantiation level and
# put objects in the pool var
# NOTE(review): this block has visibly lost lines: the docstring is never
# closed, ``inherits`` is created but never filled inside the loop over
# ``bases`` although it is passed as '_inherits' to the new class, and the
# ``type(name, tuple(bases), {`` call is missing its closing ``})``.
# Restore from the upstream file before relying on it.
def _build_model(cls, pool, cr):
""" Instantiate a given model.
This class method instantiates the class of some model (i.e. a class
deriving from osv or osv_memory). The class might be the class passed
in argument or, if it inherits from another class, a class constructed
by combining the two classes.
# The model's class inherits from cls and the classes of the inherited
# models. All those classes are combined in a flat hierarchy:
# Model the base class of all models
# / | \
# cls c2 c1 the classes defined in modules
# \ | /
# ModelClass the final class of the model
# / | \
# model recordset ... the class' instances
# The registry contains the instance ``model``. Its class, ``ModelClass``,
# carries inferred metadata that is shared between all the model's
# instances for this registry only. When we '_inherit' from another
# model, we do not inherit its ``ModelClass``, but this class' parents.
# This is a limitation of the inheritance mechanism.
# Keep links to non-inherited constraints in cls; this is useful for
# instance when exporting translations
cls._local_constraints = cls.__dict__.get('_constraints', [])
cls._local_sql_constraints = cls.__dict__.get('_sql_constraints', [])
# determine inherited models
parents = getattr(cls, '_inherit', [])
# normalize ``_inherit`` to a list of model names
parents = [parents] if isinstance(parents, basestring) else (parents or [])
# determine the model's name
name = cls._name or (len(parents) == 1 and parents[0]) or cls.__name__
# determine the module that introduced the model
original_module = pool[name]._original_module if name in parents else cls._module
# determine all the classes the model should inherit from
bases = [cls]
hierarchy = cls
for parent in parents:
if parent not in pool:
raise TypeError('The model "%s" specifies an unexisting parent class "%s"\n'
'You may need to add a dependency on the parent class\' module.' % (name, parent))
parent_class = type(pool[parent])
bases += parent_class.__bases__
# throw-away class, only used below to compute a consistent mro
hierarchy = type(name, (hierarchy, parent_class), {'_register': False})
# order bases following the mro of class hierarchy
bases = [base for base in hierarchy.mro() if base in bases]
# determine the attributes of the model's class
inherits = {}
depends = {}
constraints = {}
sql_constraints = []
# iterate in reversed order so that later entries come from classes that are
# earlier in ``bases``
for base in reversed(bases):
for mname, fnames in base._depends.iteritems():
depends[mname] = depends.get(mname, []) + fnames
for cons in base._constraints:
# cons may override a constraint with the same function name
constraints[getattr(cons[0], '__name__', id(cons[0]))] = cons
sql_constraints += base._sql_constraints
# build the actual class of the model
ModelClass = type(name, tuple(bases), {
'_name': name,
'_register': False,
'_columns': None, # recomputed in _setup_fields()
'_defaults': None, # recomputed in _setup_base()
'_fields': frozendict(), # idem
'_inherits': inherits,
'_depends': depends,
'_constraints': constraints.values(),
'_sql_constraints': sql_constraints,
'_original_module': original_module,
# instantiate the model, and initialize it
# (object.__new__ is used directly: BaseModel.__new__ returns None)
model = object.__new__(ModelClass)
model.__init__(pool, cr)
return model
# Register the old-style function columns of ``cls`` in the registry
# (``pool._pure_function_fields`` and ``pool._store_function``).
# NOTE(review): this block has visibly lost lines: the list comprehension that
# filters ``pool._store_function`` has neither its element expression nor its
# closing bracket, the two ``if not ...:`` guards and the ``if t not in ...:``
# test have no body of their own, and the ``raise`` is presumably the ``else``
# of the ``len(spec)`` chain. Restore from the upstream file before use.
def _init_function_fields(cls, pool, cr):
# initialize the list of non-stored function fields for this model
pool._pure_function_fields[cls._name] = []
# process store of low-level function fields
for fname, column in cls._columns.iteritems():
# filter out existing store about this field
pool._store_function[cls._name] = [
for stored in pool._store_function.get(cls._name, [])
if (stored[0], stored[1]) != (cls._name, fname)
if not isinstance(column, fields.function):
if not column.store:
# register it on the pool for invalidation
# process store parameter
store = column.store
if store is True:
# ``store=True`` is shorthand for a trigger on the model itself
get_ids = lambda self, cr, uid, ids, c={}: ids
store = {cls._name: (get_ids, None, column.priority, None)}
for model, spec in store.iteritems():
# a spec is (fnct, fields, priority) with an optional 4th "time length" item
if len(spec) == 4:
(fnct, fields2, order, length) = spec
elif len(spec) == 3:
(fnct, fields2, order) = spec
length = None
raise except_orm('Error',
('Invalid function definition %s in object %s !\nYou must use the definition: store={object:(fnct, fields, priority, time length)}.' % (fname, cls._name)))
pool._store_function.setdefault(model, [])
t = (cls._name, fname, fnct, tuple(fields2) if fields2 else None, order, length)
if t not in pool._store_function[model]:
# keep the triggers of ``model`` ordered by priority (item 4 of each tuple)
pool._store_function[model].sort(key=lambda x: x[4])
def _init_manual_fields(cls, cr, partial):
    """ Add to ``cls`` the manual (custom) fields known to the registry.

        Fields whose name is already defined on the class are left alone.

        :param cr: database cursor, passed to ``pool.get_manual_fields``
        :param bool partial: when true, relational fields whose comodel (or,
            for one2many, inverse field) is not available yet are skipped
            instead of being added
    """
    manual_fields = cls.pool.get_manual_fields(cr, cls._name)

    for name, field in manual_fields.iteritems():
        if name in cls._fields:
            continue
        attrs = {
            'manual': True,
            'string': field['field_description'],
            'required': bool(field['required']),
            'readonly': bool(field['readonly']),
        }
        # FIXME: ignore field['serialization_field_id']
        if field['ttype'] in ('char', 'text', 'html'):
            attrs['translate'] = bool(field['translate'])
            attrs['size'] = field['size'] or None
        elif field['ttype'] in ('selection', 'reference'):
            attrs['selection'] = eval(field['selection'])
        elif field['ttype'] == 'many2one':
            if partial and field['relation'] not in cls.pool:
                # comodel not loaded yet
                continue
            attrs['comodel_name'] = field['relation']
            attrs['ondelete'] = field['on_delete']
            attrs['domain'] = eval(field['domain']) if field['domain'] else None
        elif field['ttype'] == 'one2many':
            if partial and not (
                field['relation'] in cls.pool and (
                    field['relation_field'] in cls.pool[field['relation']]._fields or
                    field['relation_field'] in cls.pool.get_manual_fields(cr, field['relation'])
            )):
                # comodel or inverse field not available yet
                continue
            attrs['comodel_name'] = field['relation']
            attrs['inverse_name'] = field['relation_field']
            attrs['domain'] = eval(field['domain']) if field['domain'] else None
        elif field['ttype'] == 'many2many':
            if partial and field['relation'] not in cls.pool:
                # comodel not loaded yet
                continue
            attrs['comodel_name'] = field['relation']
            # relation table name derived from both model names and the field name
            _rel1 = field['relation'].replace('.', '_')
            _rel2 = field['model'].replace('.', '_')
            attrs['relation'] = 'x_%s_%s_%s_rel' % (_rel1, _rel2, name)
            attrs['column1'] = 'id1'
            attrs['column2'] = 'id2'
            attrs['domain'] = eval(field['domain']) if field['domain'] else None
        cls._add_field(name, Field.by_type[field['ttype']](**attrs))
def _init_constraints_onchanges(cls):
# store sql constraint error messages
for (key, _, msg) in cls._sql_constraints:
cls.pool._sql_error[cls._table + '_' + key] = msg
def _constraint_methods(self):
""" Return a list of methods implementing Python constraints. """
def is_constraint(func):
return callable(func) and hasattr(func, '_constrains')
cls = type(self)
methods = []
for attr, func in getmembers(cls, is_constraint):
if not all(name in cls._fields for name in func._constrains):
_logger.warning("@constrains%r parameters must be field names", func._constrains)
# optimization: memoize result on cls, it will not be recomputed
cls._constraint_methods = methods
return methods
def _onchange_methods(self):
""" Return a dictionary mapping field names to onchange methods. """
def is_onchange(func):
return callable(func) and hasattr(func, '_onchange')
cls = type(self)
methods = defaultdict(list)
for attr, func in getmembers(cls, is_onchange):
for name in func._onchange:
if name not in cls._fields:
_logger.warning("@onchange%r parameters must be field names", func._onchange)
# optimization: memoize result on cls, it will not be recomputed
cls._onchange_methods = methods
return methods
def __new__(cls):
# In the past, this method was registering the model class in the server.
# This job is now done entirely by the metaclass MetaModel.
# Do not create an instance here. Model instances are created by method
# _build_model().
return None
def __init__(self, pool, cr):
""" Initialize a model and make it part of the given registry.
- copy the stored fields' functions in the registry,
- retrieve custom fields and add them in the model,
- ensure there is a many2one for each _inherits'd parent,
- update the children's _columns,
- give a chance to each field to initialize itself.
cls = type(self)
# link the class to the registry, and update the registry
cls.pool = pool
cls._model = self # backward compatibility
pool.add(cls._name, self)
# determine description, table, sequence and log_access
if not cls._description:
cls._description = cls._name
if not cls._table:
cls._table = cls._name.replace('.', '_')
if not cls._sequence:
cls._sequence = cls._table + '_id_seq'
if not hasattr(cls, '_log_access'):
# If _log_access is not specified, it is the same value as _auto.
cls._log_access = cls._auto
# Transience
if cls.is_transient():
cls._transient_check_count = 0
cls._transient_max_count = config.get('osv_memory_count_limit')
cls._transient_max_hours = config.get('osv_memory_age_limit')
assert cls._log_access, \
"TransientModels must have log_access turned on, " \
"in order to implement their access rights policy"
def _is_an_ordinary_table(self):
    """ Tell whether ``self._table`` is listed in ``pg_class`` with relkind
        ``'r'``.

        :rtype: bool
    """
    # only the existence of a row matters, so the select list is a constant
    self.env.cr.execute("""\
        SELECT 1
          FROM pg_class
         WHERE relname = %s
           AND relkind = %s""", [self._table, 'r'])
    return bool(self.env.cr.fetchone())
def __export_xml_id(self):
    """ Return a valid xml_id for the record ``self``.

        An existing ``ir.model.data`` entry for the record is reused;
        otherwise a new one is created in the ``__export__`` module, with a
        numeric suffix added to the name while it is already taken.

        :raise Exception: if the model's table is not an ordinary table
    """
    if not self._is_an_ordinary_table():
        raise Exception(
            "You can not export the column ID of model %s, because the "
            "table %s is not an ordinary table."
            % (self._name, self._table))
    ir_model_data = self.sudo().env['ir.model.data']
    data = ir_model_data.search([('model', '=', self._name), ('res_id', '=', self.id)])
    if data:
        if data[0].module:
            return '%s.%s' % (data[0].module, data[0].name)
        return data[0].name

    # no external id yet: pick a free name in module '__export__'
    postfix = 0
    name = '%s_%s' % (self._table, self.id)
    while ir_model_data.search([('module', '=', '__export__'), ('name', '=', name)]):
        postfix += 1
        name = '%s_%s_%s' % (self._table, self.id, postfix)
    ir_model_data.create({
        'model': self._name,
        'res_id': self.id,
        'module': '__export__',
        'name': name,
    })
    return '__export__.' + name
# Build the export matrix for the records in ``self``: one main line per
# record, with relational values exported recursively.
# NOTE(review): this block has visibly lost lines: the docstring is never
# closed, nothing visible ever adds ``current`` to ``lines`` or a name to
# ``primary_done``, the ``if not path:`` / ``if name in primary_done:`` guards
# have no body, and several assignments below presumably sat in ``else:``
# branches that are no longer visible. Restore from the upstream file.
def __export_rows(self, fields):
""" Export fields of the records in ``self``.
:param fields: list of lists of fields to traverse
:return: list of lists of corresponding values
lines = []
for record in self:
# main line of record, initially empty
current = [''] * len(fields)
# list of primary fields followed by secondary field(s)
primary_done = []
# process column by column
for i, path in enumerate(fields):
if not path:
name = path[0]
if name in primary_done:
# '.id' exports the database id, 'id' the external (xml) id
if name == '.id':
current[i] = str(record.id)
elif name == 'id':
current[i] = record.__export_xml_id()
field = record._fields[name]
value = record[name]
# this part could be simpler, but it has to be done this way
# in order to reproduce the former behavior
if not isinstance(value, BaseModel):
current[i] = field.convert_to_export(value, self.env)
# This is a special case, its strange behavior is intended!
if field.type == 'many2many' and len(path) > 1 and path[1] == 'id':
xml_ids = [r.__export_xml_id() for r in value]
current[i] = ','.join(xml_ids) or False
# recursively export the fields that follow name
# (paths that do not start with ``name`` become empty)
fields2 = [(p[1:] if p and p[0] == name else []) for p in fields]
lines2 = value.__export_rows(fields2)
if lines2:
# merge first line with record's main line
for j, val in enumerate(lines2[0]):
if val:
current[j] = val
# check value of current field
if not current[i]:
# assign xml_ids, and forget about remaining lines
xml_ids = [item[1] for item in value.name_get()]
current[i] = ','.join(xml_ids)
# append the other lines at the end
lines += lines2[1:]
current[i] = False
return lines
def export_data(self, fields_to_export, raw_data=False):
    """ Export fields for selected objects

        :param fields_to_export: list of fields
        :param raw_data: True to return value in native Python type
        :rtype: dictionary with a *datas* matrix

        This method is used when exporting data via client menu
    """
    # split every field path into its components
    fields_to_export = map(fix_import_export_id_paths, fields_to_export)
    if raw_data:
        # flag the request in the context of the exported recordset
        self = self.with_context(export_raw_data=True)
    return {'datas': self.__export_rows(fields_to_export)}
# Deprecated import entry point: converts the given rows and pushes each
# resulting record through ``ir.model.data``'s ``_update``.
# NOTE(review): this block has visibly lost lines: the docstring has no
# delimiters, the ``except Exception, e:`` clause has no visible ``try:``, and
# both ``if context.get('defer_parent_store_computation'):`` tests have no body.
# Restore from the upstream file before relying on it.
def import_data(self, cr, uid, fields, datas, mode='init', current_module='', noupdate=False, context=None, filename=None):
.. deprecated:: 7.0
Use :meth:`~load` instead
Import given data in given module
This method is used when importing data via client menu.
Example of fields to import for a sale.order::
.id, (=database_id)
partner_id, (=name_search)
order_line/.id, (=database_id)
order_line/product_id/id, (=xml id)
order_line/product_uom/id (=xml_id)
This method returns a 4-tuple with the following structure::
(return_code, errored_resource, error_message, unused)
* The first item is a return code, it is ``-1`` in case of
import error, or the last imported row number in case of success
* The second item contains the record data dict that failed to import
in case of error, otherwise it's 0
* The third item contains an error message string in case of error,
otherwise it's 0
* The last item is currently unused, with no specific semantics
:param fields: list of fields to import
:param datas: data to import
:param mode: 'init' or 'update' for record creation
:param current_module: module name
:param noupdate: flag for record creation
:param filename: optional file to store partial import state for recovery
:returns: 4-tuple in the form (return_code, errored_resource, error_message, unused)
:rtype: (int, dict or 0, str or 0, str or 0)
# work on a copy so that the caller's context is not modified
context = dict(context) if context is not None else {}
context['_import_current_module'] = current_module
fields = map(fix_import_export_id_paths, fields)
ir_model_data_obj = self.pool.get('ir.model.data')
# conversion messages of type 'error' abort the import
def log(m):
if m['type'] == 'error':
raise Exception(m['message'])
# NOTE(review): the partial-import state is unpickled from the file named by
# the 'import_partial' config option; pickle must only be fed trusted files.
if config.get('import_partial') and filename:
with open(config.get('import_partial'), 'rb') as partial_import_file:
data = pickle.load(partial_import_file)
position = data.get(filename, 0)
# NOTE(review): as the code reads here, the position loaded just above is
# overwritten by this reset -- confirm whether that is intended.
position = 0
for res_id, xml_id, res, info in self._convert_records(cr, uid,
self._extract_records(cr, uid, fields, datas,
context=context, log=log),
context=context, log=log):
ir_model_data_obj._update(cr, uid, self._name,
current_module, res, mode=mode, xml_id=xml_id,
noupdate=noupdate, res_id=res_id, context=context)
position = info.get('rows', {}).get('to', 0) + 1
# every 100 rows, save the current position in the partial-import file
if config.get('import_partial') and filename and (not (position%100)):
with open(config.get('import_partial'), 'rb') as partial_import:
data = pickle.load(partial_import)
data[filename] = position
with open(config.get('import_partial'), 'wb') as partial_import:
pickle.dump(data, partial_import)
if context.get('defer_parent_store_computation'):
except Exception, e:
return -1, {}, 'Line %d : %s' % (position + 1, tools.ustr(e)), ''
if context.get('defer_parent_store_computation'):
return position, 0, 0, 0
def load(self, cr, uid, fields, data, context=None):
Attempts to load the data matrix, and returns a list of ids (or
``False`` if there was an error and no id could be generated) and a
list of messages.
The ids are those of the records created and saved (in database), in
the same order they were extracted from the file. They can be passed
directly to :meth:`~read`
:param fields: list of fields to import, at the same index as the corresponding data
:type fields: list(str)
:param data: row-major matrix of data to import
:type data: list(list(str))
:param dict context:
:returns: {ids: list(int)|False, messages: [Message]}
cr.execute('SAVEPOINT model_load')
messages = []
fields = map(fix_import_export_id_paths, fields)
ModelData = self.pool['ir.model.data'].clear_caches()
fg = self.fields_get(cr, uid, context=context)
mode = 'init'
current_module = ''
noupdate = False
ids = []
for id, xid, record, info in self._convert_records(cr, uid,
self._extract_records(cr, uid, fields, data,
context=context, log=messages.append),
context=context, log=messages.append):
cr.execute('SAVEPOINT model_load_save')
except psycopg2.InternalError, e:
# broken transaction, exit and hope the source error was
# already logged
if not any(message['type'] == 'error' for message in messages):
messages.append(dict(info, type='error',message=
u"Unknown database error: '%s'" % e))
ids.append(ModelData._update(cr, uid, self._name,
current_module, record, mode=mode, xml_id=xid,
noupdate=noupdate, res_id=id, context=context))
cr.execute('RELEASE SAVEPOINT model_load_save')
except psycopg2.Warning, e:
messages.append(dict(info, type='warning', message=str(e)))
cr.execute('ROLLBACK TO SAVEPOINT model_load_save')
except psycopg2.Error, e:
info, type='error',
**PGERROR_TO_OE[e.pgcode](self, fg, info, e)))
# Failed to write, log to messages, rollback savepoint (to
# avoid broken transaction) and keep going
cr.execute('ROLLBACK TO SAVEPOINT model_load_save')
except Exception, e:
message = (_('Unknown error during import:') +
' %s: %s' % (type(e), unicode(e)))
moreinfo = _('Resolve other errors first')
messages.append(dict(info, type='error',
# Failed for some reason, perhaps due to invalid data supplied,
# rollback savepoint and keep going
cr.execute('ROLLBACK TO SAVEPOINT model_load_save')
if any(message['type'] == 'error' for message in messages):
cr.execute('ROLLBACK TO SAVEPOINT model_load')
ids = False
return {'ids': ids, 'messages': messages}
def _add_fake_fields(self, cr, uid, fields, context=None):
    """ Add the import/export pseudo-fields to ``fields`` and return it.

    Three entries are set on the given mapping: ``None`` (labelled
    ``rec_name``), ``'id'`` (``External ID``) and ``'.id'``
    (``Database ID``).

    :param dict fields: mapping of field names to field objects, modified
                        in place
    :returns: the same ``fields`` mapping
    """
    from openerp.fields import Char, Integer

    pseudo_fields = (
        (None, Char('rec_name')),
        ('id', Char('External ID')),
        ('.id', Integer('Database ID')),
    )
    for key, pseudo_field in pseudo_fields:
        fields[key] = pseudo_field
    return fields
def _extract_records(self, cr, uid, fields_, data,
                     context=None, log=lambda a: None):
    """ Generates record dicts from the data sequence.

    The result is a generator of ``(record, info)`` pairs: ``record`` is a
    dict mapping field names to raw (unconverted, unvalidated) values and
    ``info`` is ``{'rows': {'from': first, 'to': last}}``, the indexes in
    ``data`` of the rows the record was built from.

    For relational fields, if sub-fields were provided the value will be
    a list of sub-records

    The following sub-fields may be set on the record (by key):
    * None is the name_get for the record (to use with name_create/name_search)
    * "id" is the External ID for the record
    * ".id" is the Database ID for the record

    :param fields_: list of field paths, each path being a list of names
    :param data: row-major matrix of values, aligned with ``fields_``
    """
    fields = dict(self._fields)
    # Fake fields to avoid special cases in extractor
    fields = self._add_fake_fields(cr, uid, fields, context=context)
    # m2o fields can't be on multiple lines so exclude them from the
    # is_relational field rows filter, but special-case it later on to
    # be handled with relational fields (as it can have subfields)
    is_relational = lambda field: fields[field].relational
    get_o2m_values = itemgetter_tuple(
        [index for index, field in enumerate(fields_)
         if fields[field[0]].type == 'one2many'])
    get_nono2m_values = itemgetter_tuple(
        [index for index, field in enumerate(fields_)
         if fields[field[0]].type != 'one2many'])

    # True when the row only carries one2many values, i.e. it continues
    # the record started on a previous row
    def only_o2m_values(row, f=get_nono2m_values, g=get_o2m_values):
        return any(g(row)) and not any(f(row))

    index = 0
    while index < len(data):
        row = data[index]
        # copy non-relational fields to record dict
        record = dict((field[0], value)
                      for field, value in itertools.izip(fields_, row)
                      if not is_relational(field[0]))

        # Get all following rows which have relational values attached to
        # the current record (no non-relational values)
        record_span = itertools.takewhile(
            only_o2m_values, itertools.islice(data, index + 1, None))
        # stitch record row back on for relational fields
        record_span = list(itertools.chain([row], record_span))
        for relfield in set(
                field[0] for field in fields_
                if is_relational(field[0])):
            # FIXME: how to not use _obj without relying on fields_get?
            Model = self.pool[fields[relfield].comodel_name]

            # get only cells for this sub-field, should be strictly
            # non-empty, field path [None] is for name_get field
            indices, subfields = zip(*((position, path[1:] or [None])
                                       for position, path in enumerate(fields_)
                                       if path[0] == relfield))

            # return all rows which have at least one value for the
            # subfields of relfield
            relfield_data = filter(any, map(itemgetter_tuple(indices), record_span))
            record[relfield] = [subrecord
                for subrecord, _subinfo in Model._extract_records(
                    cr, uid, subfields, relfield_data,
                    context=context, log=log)]

        yield record, {'rows': {
            'from': index,
            'to': index + len(record_span) - 1
        }}
        index += len(record_span)
def _convert_records(self, cr, uid, records,
context=None, log=lambda a: None):
# NOTE(review): this copy of the method has lost its indentation and,
# apparently, several lines: the docstring is never closed, the
# ``Translation._get_source(`` call is cut before its last argument, the
# body of ``if len(exception.args) > 1 ...`` is absent, ``except
# ValueError:`` has no ``try:``, and the call that receives
# ``message=_(u"Unknown database identifier ...")`` is missing its
# opening lines. Restore from version control before editing.
# NOTE(review): the docstring announces triplets, but the generator below
# yields 4-tuples ``(dbid, xid, converted, info)``.
""" Converts records from the source iterable (recursive dicts of
strings) into forms which can be written to the database (via
self.create or (ir.model.data)._update)
:returns: a list of triplets of (id, xid, record)
:rtype: list((int|None, str|None, dict))
if context is None: context = {}
Converter = self.pool['ir.fields.converter']
Translation = self.pool['ir.translation']
fields = dict(self._fields)
# human-readable label of each field: its translation when one is found,
# else the field's ``string``
field_names = dict(
(f, (Translation._get_source(cr, uid, self._name + ',' + f, 'field',
or field.string))
for f, field in fields.iteritems())
convert = Converter.for_model(cr, uid, self, context=context)
def _log(base, field, exception):
type = 'warning' if isinstance(exception, Warning) else 'error'
# logs the logical (not human-readable) field name for automated
# processing of response, but injects human readable in message
record = dict(base, type=type, field=field,
message=unicode(exception.args[0]) % base)
if len(exception.args) > 1 and exception.args[1]:
# CountingStream exposes ``stream.index``, used below as the record number
stream = CountingStream(records)
for record, extras in stream:
dbid = False
xid = False
# name_get/name_create
if None in record: pass
# xid
if 'id' in record:
xid = record['id']
# dbid
if '.id' in record:
dbid = int(record['.id'])
except ValueError:
# in case of overridden id column
dbid = record['.id']
# an unknown database id is reported and then reset to False
if not self.search(cr, uid, [('id', '=', dbid)], context=context):
message=_(u"Unknown database identifier '%s'") % dbid))
dbid = False
converted = convert(record, lambda field, err:\
_log(dict(extras, record=stream.index, field=field_names[field]), field, err))
yield dbid, xid, converted, dict(extras, record=stream.index)
def _validate_fields(self, field_names):
# Runs the constraints that involve any of ``field_names``: first the
# old-style ``self._constraints`` triples ``(fun, msg, names)``, then the
# new-style ``self._constraint_methods``. Failures of old-style
# constraints are collected in ``errors`` and raised together as one
# ValidationError.
# NOTE(review): this copy has lost its indentation and some lines: both
# ``except`` chains have no ``try:``, the ``else:`` between the callable
# and the translated message is absent, the ``errors.append(`` that
# should wrap the "Field(s) ... failed" message is missing, and the
# new-style loop never visibly calls ``check``. Restore from version
# control before editing.
field_names = set(field_names)
# old-style constraint methods
trans = self.env['ir.translation']
cr, uid, context = self.env.args
ids = self.ids
errors = []
for fun, msg, names in self._constraints:
# validation must be context-independent; call ``fun`` without context
# a constraint touching none of ``field_names`` is considered valid
valid = not (set(names) & field_names) or fun(self._model, cr, uid, ids)
extra_error = None
except Exception, e:
_logger.debug('Exception while validating constraint', exc_info=True)
valid = False
extra_error = tools.ustr(e)
if not valid:
# ``msg`` is either a callable producing the message (optionally as a
# ``(template, params)`` tuple) or a string looked up in translations
if callable(msg):
res_msg = msg(self._model, cr, uid, ids, context=context)
if isinstance(res_msg, tuple):
template, params = res_msg
res_msg = template % params
res_msg = trans._get_source(self._name, 'constraint', self.env.lang, msg)
if extra_error:
res_msg += "\n\n%s\n%s" % (_('Error details:'), extra_error)
_("Field(s) `%s` failed against a constraint: %s") %
(', '.join(names), res_msg)
if errors:
raise ValidationError('\n'.join(errors))
# new-style constraint methods
for check in self._constraint_methods:
if set(check._constrains) & field_names:
except ValidationError, e:
except Exception, e:
# any other exception is converted into a ValidationError
raise ValidationError("Error while validating constraint\n\n%s" % tools.ustr(e))
def default_get(self, fields_list):
# NOTE(review): this copy has lost its indentation and some lines: the
# docstring is never closed, nothing follows the "trigger view init hook"
# comment, the numbered lookups have no statement separating them (as
# written each later step would overwrite the earlier one), the
# "delegate to parent model" step never fills ``parent_fields``, and the
# final ``for model, names`` loop has no body. Restore from version
# control before editing.
""" default_get(fields) -> default_values
Return default values for the fields in ``fields_list``. Default
values are determined by the context, user defaults, and the model
:param fields_list: a list of field names
:return: a dictionary mapping each field name to its corresponding
default value, if it has one.
# trigger view init hook
defaults = {}
# presumably maps a parent model name to the inherited field names to
# ask it for -- TODO confirm, the filling statement is not visible
parent_fields = defaultdict(list)
for name in fields_list:
# 1. look up context
key = 'default_' + name
if key in self._context:
defaults[name] = self._context[key]
# 2. look up ir_values
# Note: performance is good, because get_defaults_dict is cached!
ir_values_dict = self.env['ir.values'].get_defaults_dict(self._name)
if name in ir_values_dict:
defaults[name] = ir_values_dict[name]
field = self._fields.get(name)
# 3. look up property fields
# TODO: get rid of this one
if field and field.company_dependent:
defaults[name] = self.env['ir.property'].get(name, self._name)
# 4. look up field.default
if field and field.default:
defaults[name] = field.default(self)
# 5. delegate to parent model
if field and field.inherited:
field = field.related_field
# convert default values to the right format
defaults = self._convert_to_cache(defaults, validate=False)
defaults = self._convert_to_write(defaults)
# add default values for inherited fields
for model, names in parent_fields.iteritems():
return defaults
def fields_get_keys(self, cr, user, context=None):
    """ Return the column names of this model followed by those of every
    ``_inherits`` parent (collected recursively).

    :returns: list of field names
    """
    names = self._columns.keys()
    # TODO I believe this loop can be replace by
    # res.extend(self._inherit_fields.key())
    for parent_name in self._inherits:
        parent_model = self.pool[parent_name]
        names.extend(parent_model.fields_get_keys(cr, user, context))
    return names
def _rec_name_fallback(self, cr, uid, context=None):
rec_name = self._rec_name
if rec_name not in self._columns:
rec_name = self._columns.keys()[0] if len(self._columns.keys()) > 0 else "id"
return rec_name
# Overload this method if you need a window title which depends on the context
def view_header_get(self, cr, user, view_id=None, view_type='form', context=None):
    """ Hook for a context-dependent window title.

    The base implementation provides no title.

    :returns: ``False``
    """
    no_title = False
    return no_title
def user_has_groups(self, cr, uid, groups, context=None):
    """Return true if the user is at least member of one of the groups
    in ``groups``. Typically used to resolve ``groups`` attribute
    in view and model definitions.

    :param str groups: comma-separated list of fully-qualified group
                       external IDs, e.g.: ``base.group_user,base.group_system``
    :return: True if the current user is a member of one of the
             given groups
    """
    for group_ext_id in groups.split(','):
        if self.pool['res.users'].has_group(cr, uid, group_ext_id):
            return True
    return False
def _get_default_form_view(self, cr, user, context=None):
    """ Generates a default single-line form view using all fields
    of the current model except the m2m and o2m ones.

    Fields flagged ``automatic`` are skipped as well, and a ``newline``
    element is emitted after each ``text`` field.

    :param cr: database cursor
    :param int user: user id
    :param dict context: connection context
    :returns: a form view as an lxml document
    :rtype: etree._Element
    """
    view = etree.Element('form', string=self._description)
    group = etree.SubElement(view, 'group', col="4")
    for fname, field in self._fields.iteritems():
        if field.automatic or field.type in ('one2many', 'many2many'):
            # excluded from the default form, see docstring
            continue

        etree.SubElement(group, 'field', name=fname)
        if field.type == 'text':
            etree.SubElement(group, 'newline')
    return view
def _get_default_search_view(self, cr, user, context=None):
    """ Generates a single-field search view, based on _rec_name.

    :param cr: database cursor
    :param int user: user id
    :param dict context: connection context
    :returns: a search view as an lxml document
    :rtype: etree._Element
    """
    search_root = etree.Element('search', string=self._description)
    field_name = self._rec_name_fallback(cr, user, context)
    etree.SubElement(search_root, 'field', name=field_name)
    return search_root
def _get_default_tree_view(self, cr, user, context=None):
    """ Generates a single-field tree view, based on _rec_name.

    :param cr: database cursor
    :param int user: user id
    :param dict context: connection context
    :returns: a tree view as an lxml document
    :rtype: etree._Element
    """
    tree_root = etree.Element('tree', string=self._description)
    field_name = self._rec_name_fallback(cr, user, context)
    etree.SubElement(tree_root, 'field', name=field_name)
    return tree_root
def _get_default_calendar_view(self, cr, user, context=None):
    """ Generates a default calendar view by trying to infer
    calendar fields from a number of pre-set attribute names

    When ``_date_name`` is not a column of the model, the first of
    ``date``, ``date_start``, ``x_date``, ``x_date_start`` found in the
    columns is stored in ``self._date_name`` and used instead.

    :param cr: database cursor
    :param int user: user id
    :param dict context: connection context
    :returns: a calendar view
    :rtype: etree._Element
    :raise except_orm: if no start date column, or neither an end date
                       nor a duration column, can be found
    """
    def set_first_of(seq, in_, to):
        """Sets the first value of ``seq`` also found in ``in_`` to
        the ``to`` attribute of the view being closed over.

        Returns whether it's found a suitable value (and set it on
        the attribute) or not
        """
        for item in seq:
            if item in in_:
                view.set(to, item)
                return True
        return False

    view = etree.Element('calendar', string=self._description)
    etree.SubElement(view, 'field', name=self._rec_name_fallback(cr, user, context))

    if self._date_name not in self._columns:
        date_found = False
        for dt in ['date', 'date_start', 'x_date', 'x_date_start']:
            if dt in self._columns:
                self._date_name = dt
                date_found = True
                # stop at the first candidate, like set_first_of() does
                break

        if not date_found:
            raise except_orm(_('Invalid Object Architecture!'), _("Insufficient fields for Calendar View!"))
    view.set('date_start', self._date_name)

    set_first_of(["user_id", "partner_id", "x_user_id", "x_partner_id"],
                 self._columns, 'color')

    if not set_first_of(["date_stop", "date_end", "x_date_stop", "x_date_end"],
                        self._columns, 'date_stop'):
        if not set_first_of(["date_delay", "planned_hours", "x_date_delay", "x_planned_hours"],
                            self._columns, 'date_delay'):
            # interpolate after the translation lookup so that the source
            # term passed to _() is the constant template
            raise except_orm(
                _('Invalid Object Architecture!'),
                _("Insufficient fields to generate a Calendar View for %s, missing a date_stop or a date_delay") % self._name)

    return view
def fields_view_get(self, cr, uid, view_id=None, view_type='form', context=None, toolbar=False, submenu=False):
    """ fields_view_get([view_id | view_type='form'])

    Get the detailed composition of the requested view like fields, model, view architecture

    When no ``view_id`` is given, the view is looked up through the
    ``<view_type>_view_ref`` context key (a ``module.name`` external id),
    then through ``ir.ui.view``'s ``default_view``; if there is still none,
    the model's ``_get_default_<view_type>_view`` generator is used.

    :param view_id: id of the view or None
    :param view_type: type of the view to return if view_id is None ('form', 'tree', ...)
    :param toolbar: true to include contextual actions
    :param submenu: deprecated
    :return: dictionary describing the composition of the requested view (including inherited views and extensions)
    :raise AttributeError:
                        * if the inherited view has unknown position to work with other than 'before', 'after', 'inside', 'replace'
                        * if some tag other than 'position' is found in parent view
    :raise Invalid ArchitectureError: if there is view type other than form, tree, calendar, search etc defined on the structure
    :raise except_orm: if no view exists and the model has no default
                       view generator for ``view_type``
    """
    if context is None:
        context = {}
    View = self.pool['ir.ui.view']

    result = {
        'model': self._name,
        'field_parent': False,
    }

    # try to find a view_id if none provided
    if not view_id:
        # <view_type>_view_ref in context can be used to override the default view
        view_ref_key = view_type + '_view_ref'
        view_ref = context.get(view_ref_key)
        if view_ref:
            if '.' in view_ref:
                module, view_ref = view_ref.split('.', 1)
                cr.execute("SELECT res_id FROM ir_model_data WHERE model='ir.ui.view' AND module=%s AND name=%s", (module, view_ref))
                view_ref_res = cr.fetchone()
                if view_ref_res:
                    view_id = view_ref_res[0]
            else:
                _logger.warning('%r requires a fully-qualified external id (got: %r for model %s). '
                    'Please use the complete `module.view_id` form instead.', view_ref_key, view_ref,
                    self._name)

        if not view_id:
            # otherwise try to find the lowest priority matching ir.ui.view
            view_id = View.default_view(cr, uid, self._name, view_type, context=context)

    # context for post-processing might be overridden
    ctx = context
    if view_id:
        # read the view with inherited views applied
        root_view = View.read_combined(cr, uid, view_id, fields=['id', 'name', 'field_parent', 'type', 'model', 'arch'], context=context)
        result['arch'] = root_view['arch']
        result['name'] = root_view['name']
        result['type'] = root_view['type']
        result['view_id'] = root_view['id']
        result['field_parent'] = root_view['field_parent']
        # override context from postprocessing
        if root_view.get('model') != self._name:
            ctx = dict(context, base_model_name=root_view.get('model'))
    else:
        # fallback on default views methods if no ir.ui.view could be found
        try:
            get_func = getattr(self, '_get_default_%s_view' % view_type)
            arch_etree = get_func(cr, uid, context)
            result['arch'] = etree.tostring(arch_etree, encoding='utf-8')
            result['type'] = view_type
            result['name'] = 'default'
        except AttributeError:
            raise except_orm(_('Invalid Architecture!'), _("No default view of type '%s' could be found !") % view_type)

    # Apply post processing, groups and modifiers etc...
    xarch, xfields = View.postprocess_and_fields(cr, uid, self._name, etree.fromstring(result['arch']), view_id, context=ctx)
    result['arch'] = xarch
    result['fields'] = xfields

    # Add related action information if asked
    if toolbar:
        toclean = ('report_sxw_content', 'report_rml_content', 'report_sxw', 'report_rml', 'report_sxw_content_data', 'report_rml_content_data')

        def clean(x):
            # keep the action values (third item) without the report payloads
            x = x[2]
            for key in toclean:
                x.pop(key, None)
            return x

        ir_values_obj = self.pool.get('ir.values')
        resprint = ir_values_obj.get(cr, uid, 'action', 'client_print_multi', [(self._name, False)], False, context)
        resaction = ir_values_obj.get(cr, uid, 'action', 'client_action_multi', [(self._name, False)], False, context)
        resrelate = ir_values_obj.get(cr, uid, 'action', 'client_action_relate', [(self._name, False)], False, context)
        resaction = [clean(action) for action in resaction if view_type == 'tree' or not action[2].get('multi')]
        resprint = [clean(print_) for print_ in resprint if view_type == 'tree' or not print_[2].get('multi')]
        # When multi="True" set it will display only in More of the list view
        resrelate = [clean(action) for action in resrelate
                     if (action[2].get('multi') and view_type == 'tree') or (not action[2].get('multi') and view_type == 'form')]

        for x in itertools.chain(resprint, resaction, resrelate):
            x['string'] = x['name']

        result['toolbar'] = {
            'print': resprint,
            'action': resaction,
            'relate': resrelate
        }
    return result
def get_formview_id(self, cr, uid, id, context=None):
    """ Return an view id to open the document with. This method is meant to be
    overridden in addons that want to give specific view ids for example.

    The base implementation designates no particular view.

    :param int id: id of the document to open
    :returns: ``False``
    """
    no_specific_view = False
    return no_specific_view
def get_formview_action(self, cr, uid, id, context=None):
    """ Return an action to open the document. This method is meant to be
    overridden in addons that want to give specific view ids for example.

    The form view used is the one returned by :meth:`get_formview_id`.

    :param int id: id of the document to open
    :returns: an ``ir.actions.act_window`` action dictionary
    """
    view_id = self.get_formview_id(cr, uid, id, context=context)
    return {
        'type': 'ir.actions.act_window',
        'res_model': self._name,
        'view_type': 'form',
        'view_mode': 'form',
        'views': [(view_id, 'form')],
        'target': 'current',
        'res_id': id,
        'context': context,
    }
def get_access_action(self, cr, uid, id, context=None):
    """ Return an action to open the document. This method is meant to be
    overridden in addons that want to give specific access to the document.
    By default it opens the formview of the document.

    :param int id: id of the document to open
    :returns: the result of :meth:`get_formview_action`
    """
    formview_action = self.get_formview_action(cr, uid, id, context=context)
    return formview_action
def _view_look_dom_arch(self, cr, uid, node, view_id, context=None):
return self.pool['ir.ui.view'].postprocess_and_fields(
cr, uid, self._name, node, view_id, context=context)
def search_count(self, cr, user, args, context=None):
    """ search_count(args) -> int

    Returns the number of records in the current model matching :ref:`the
    provided domain <reference/orm/domains>`.

    If :meth:`search` answers with a list despite ``count=True``, the
    length of that list is returned.
    """
    found = self.search(cr, user, args, context=context, count=True)
    return len(found) if isinstance(found, list) else found
def search(self, cr, user, args, offset=0, limit=None, order=None, context=None, count=False):
    """ search(args[, offset=0][, limit=None][, order=None][, count=False])

    Searches for records based on the ``args``
    :ref:`search domain <reference/orm/domains>`.

    :param args: :ref:`A search domain <reference/orm/domains>`. Use an empty
                 list to match all records.
    :param int offset: number of results to ignore (default: none)
    :param int limit: maximum number of records to return (default: all)
    :param str order: sort string
    :param bool count: if True, only counts and returns the number of matching records (default: False)
    :returns: at most ``limit`` records matching the search criteria

    :raise AccessError: * if user tries to bypass access rules for read on the requested object.
    """
    search_options = dict(
        offset=offset, limit=limit, order=order, context=context, count=count)
    return self._search(cr, user, args, **search_options)
# display_name, name_get, name_create, name_search
@api.depends(lambda self: (self._rec_name,) if self._rec_name else ())
def _compute_display_name(self):
    """ Set ``display_name`` on each record of ``self`` from the pairs
    returned by :meth:`name_get`; records absent from that result get
    ``False``.
    """
    display_names = dict(self.name_get())
    for rec in self:
        rec.display_name = display_names.get(rec.id, False)
def name_get(self):
    """ name_get() -> [(id, name), ...]

    Returns a textual representation for the records in ``self``.
    By default this is the value of the ``display_name`` field.

    When ``_rec_name`` is a field of the model, the text is that field's
    value passed through its ``convert_to_display_name``; otherwise it is
    ``"<model name>,<id>"``.

    :return: list of pairs ``(id, text_repr)`` for each records
    :rtype: list(tuple)
    """
    result = []
    name = self._rec_name
    if name in self._fields:
        convert = self._fields[name].convert_to_display_name
        for record in self:
            result.append((record.id, convert(record[name], record)))
    else:
        # no usable _rec_name: fall back on a technical representation
        for record in self:
            result.append((record.id, "%s,%s" % (record._name, record.id)))

    return result
def name_create(self, name):
    """ name_create(name) -> record

    Create a new record by calling :meth:`~.create` with only one value
    provided: the display name of the new record.

    The new record will be initialized with any default values
    applicable to this model, or provided through the context. The usual
    behavior of :meth:`~.create` applies.

    When the model defines no ``_rec_name``, a warning is logged and
    ``False`` is returned instead.

    :param name: display name of the record to create
    :rtype: tuple
    :return: the :meth:`~.name_get` pair value of the created record
    """
    if not self._rec_name:
        _logger.warning("Cannot execute name_create, no _rec_name defined on %s", self._name)
        return False
    created = self.create({self._rec_name: name})
    return created.name_get()[0]
def name_search(self, name='', args=None, operator='ilike', limit=100):
    """ name_search(name='', args=None, operator='ilike', limit=100) -> records

    Search for records that have a display name matching the given
    ``name`` pattern when compared with the given ``operator``, while also
    matching the optional search domain (``args``).

    This is used for example to provide suggestions based on a partial
    value for a relational field. Sometimes be seen as the inverse
    function of :meth:`~.name_get`, but it is not guaranteed to be.

    This method is equivalent to calling :meth:`~.search` with a search
    domain based on ``display_name`` and then :meth:`~.name_get` on the
    result of the search.

    :param str name: the name pattern to match
    :param list args: optional search domain (see :meth:`~.search` for
                      syntax), specifying further restrictions
    :param str operator: domain operator for matching ``name``, such as
                         ``'like'`` or ``'='``.
    :param int limit: optional max number of records to return
    :rtype: list
    :return: list of pairs ``(id, text_repr)`` for all matching records.
    """
    matches = self._name_search(name, args, operator, limit=limit)
    return matches
def _name_search(self, cr, user, name='', args=None, operator='ilike', context=None, limit=100, name_get_uid=None):
# private implementation of name_search, allows passing a dedicated user
# for the name_get part to solve some access rights issues
args = list(args or [])
# optimize out the default criterion of ``ilike ''`` that matches everything
if not self._rec_name:
_logger.warning("Cannot execute name_search, no _rec_name defined on %s", self._name)
elif not (name == '' and operator == 'ilike'):
args += [(self._rec_name, operator, name)]
access_rights_uid = name_get_uid or user
ids = self._search(cr, user, args, limit=limit, context=context, access_rights_uid=access_rights_uid)
res = self.name_get(cr, access_rights_uid, ids, context)
return res
def read_string(self, cr, uid, id, langs, fields=None, context=None):
    """ Return the label of the given fields in each of ``langs``.

    For each language the result holds a dict with a ``'code'`` entry
    (the language code) and, for every requested field that is a column
    of this model, its translated label, or the column's ``string`` when
    no translation is found. Labels of the ``_inherits`` parents are
    merged in by calling ``read_string`` on each parent.

    Read access on ``ir.translation`` is checked first.

    :param langs: iterable of language codes
    :param fields: field names; defaults to all columns and inherited fields
    :returns: ``{lang: {'code': lang, field_name: label, ...}}``
    """
    res = {}
    Translation = self.pool.get('ir.translation')
    Translation.check_access_rights(cr, uid, 'read')
    if not fields:
        fields = self._columns.keys() + self._inherit_fields.keys()
    #FIXME: collect all calls to _get_source into one SQL call.
    for lang in langs:
        res[lang] = {'code': lang}
        for f in fields:
            if f in self._columns:
                res_trans = Translation._get_source(cr, uid, self._name+','+f, 'field', lang)
                if res_trans:
                    res[lang][f] = res_trans
                else:
                    # untranslated: fall back on the column label
                    res[lang][f] = self._columns[f].string
    for table in self._inherits:
        cols = intersect(self._inherit_fields.keys(), fields)
        res2 = self.pool[table].read_string(cr, uid, id, langs, cols, context)
        # merge each parent's labels as soon as they are read, so that
        # no parent's result is dropped when there are several parents
        for lang in res2:
            if lang in res:
                res[lang]['code'] = lang
            for f in res2[lang]:
                res[lang][f] = res2[lang][f]
    return res
def write_string(self, cr, uid, id, langs, vals, context=None):
    """ Store ``vals`` as the translated labels of the matching columns
    for each language of ``langs``, then forward the call to the
    ``_inherits`` parents when ``vals`` touches inherited fields.

    Write access on ``ir.translation`` is checked first.

    :param langs: iterable of language codes
    :param dict vals: mapping of field names to their new label
    :returns: ``True``
    """
    self.pool.get('ir.translation').check_access_rights(cr, uid, 'write')
    #FIXME: try to only call the translation in one SQL
    for lang in langs:
        for field_name in vals:
            if field_name not in self._columns:
                continue
            source_label = self._columns[field_name].string
            self.pool.get('ir.translation')._set_ids(
                cr, uid, self._name+','+field_name, 'field', lang, [0],
                vals[field_name], source_label)
    for parent_name in self._inherits:
        inherited = intersect(self._inherit_fields.keys(), vals)
        if inherited:
            self.pool[parent_name].write_string(cr, uid, id, langs, vals, context)
    return True
def _add_missing_default_values(self, cr, uid, values, context=None):
    """ Return ``values`` completed with the defaults of the fields it
    does not provide.

    Inherited fields are left alone when the corresponding parent record
    is given in ``values``, and the ``MAGIC_COLUMNS`` are never defaulted.
    Defaults of x2many fields are turned into commands: a many2many
    default given as a list of ids becomes ``[(6, 0, ids)]`` and a
    one2many default given as a list of dicts becomes a list of
    ``(0, 0, vals)``.

    :param dict values: values provided by the caller (not modified)
    :returns: ``values`` itself when nothing is missing, else a new dict
              of defaults overridden by ``values``
    """
    def has_type(name, ftype):
        # same test as for own columns and inherited fields alike
        return ((name in self._columns and self._columns[name]._type == ftype)
                or (name in self._inherit_fields
                    and self._inherit_fields[name][2]._type == ftype))

    # avoid overriding inherited values when parent is set
    avoid_tables = []
    for tables, parent_field in self._inherits.items():
        if parent_field in values:
            avoid_tables.append(tables)

    # compute missing fields
    missing_defaults = set()
    for field in self._columns.keys():
        if field not in values:
            missing_defaults.add(field)
    for field in self._inherit_fields.keys():
        if (field not in values) and (self._inherit_fields[field][0] not in avoid_tables):
            missing_defaults.add(field)
    # discard magic fields
    missing_defaults -= set(MAGIC_COLUMNS)

    if missing_defaults:
        defaults = self.default_get(cr, uid, list(missing_defaults), context)
        for dv in defaults:
            if has_type(dv, 'many2many') \
                    and defaults[dv] and isinstance(defaults[dv][0], (int, long)):
                defaults[dv] = [(6, 0, defaults[dv])]
            if has_type(dv, 'one2many') \
                    and isinstance(defaults[dv], (list, tuple)) and defaults[dv] and isinstance(defaults[dv][0], dict):
                defaults[dv] = [(0, 0, x) for x in defaults[dv]]
        # override defaults with the provided values, never allow the other way around
        defaults.update(values)
        values = defaults
    return values
def clear_caches(self):
# NOTE(review): this copy has lost its indentation and some lines: the
# docstring is never closed, ``except AttributeError:`` has neither a
# ``try:`` nor a handler body, and the statement that actually clears the
# cache is not visible -- only the registry flag assignment remains.
# Restore from version control before editing.
""" Clear the caches
This clears the caches associated to methods decorated with
``tools.ormcache`` or ``tools.ormcache_multi``.
# flags on the registry (``self.pool``) that a cache has been cleared
self.pool._any_cache_cleared = True
except AttributeError:
def _read_group_fill_results(self, cr, uid, domain, groupby, remaining_groupbys,
aggregated_fields, count_field,
read_group_result, read_group_order=None, context=None):
"""Helper method for filling in empty groups for all possible values of
the field being grouped by"""
# NOTE(review): this copy has lost its indentation and some lines: the
# ``self._group_by_full[groupby](`` call is cut before its last arguments,
# ``append_left``/``append_right`` never add anything to ``result``, the
# ``else:`` of ``append_left`` is absent, and most branches of the merge
# loop have no body (as written nothing is consumed from either list).
# Restore from version control before editing.
# self._group_by_full should map groupable fields to a method that returns
# a list of all aggregated values that we want to display for this field,
# in the form of a m2o-like pair (key,label).
# This is useful to implement kanban views for instance, where all columns
# should be displayed even if they don't contain any record.
# Grab the list of all groups that should be displayed, including all present groups
present_group_ids = [x[groupby][0] for x in read_group_result if x[groupby]]
all_groups,folded = self._group_by_full[groupby](self, cr, uid, present_group_ids, domain,
# template of an empty group: every aggregate False, count 0
result_template = dict.fromkeys(aggregated_fields, False)
result_template[groupby + '_count'] = 0
if remaining_groupbys:
result_template['__context'] = {'group_by': remaining_groupbys}
# Merge the left_side (current results as dicts) with the right_side (all
# possible values as m2o pairs). Both lists are supposed to be using the
# same ordering, and can be merged in one pass.
result = []
# groups already handled, keyed by the id of the grouped value
known_values = {}
def append_left(left_side):
grouped_value = left_side[groupby] and left_side[groupby][0]
if not grouped_value in known_values:
known_values[grouped_value] = left_side
known_values[grouped_value].update({count_field: left_side[count_field]})
def append_right(right_side):
grouped_value = right_side[0]
if not grouped_value in known_values:
line = dict(result_template)
line[groupby] = right_side
line['__domain'] = [(groupby,'=',grouped_value)] + domain
known_values[grouped_value] = line
while read_group_result or all_groups:
left_side = read_group_result[0] if read_group_result else None
right_side = all_groups[0] if all_groups else None
assert left_side is None or left_side[groupby] is False \
or isinstance(left_side[groupby], (tuple,list)), \
'M2O-like pair expected, got %r' % left_side[groupby]
assert right_side is None or isinstance(right_side, (tuple,list)), \
'M2O-like pair expected, got %r' % right_side
if left_side is None:
elif right_side is None:
elif left_side[groupby] == right_side:
all_groups.pop(0) # discard right_side
elif not left_side[groupby] or not left_side[groupby][0]:
# left side == "Undefined" entry, not present on right_side
# ``folded`` maps a group id to its fold state; absent ids are unfolded
if folded:
for r in result:
r['__fold'] = folded.get(r[groupby] and r[groupby][0], False)
return result
def _read_group_prepare(self, orderby, aggregated_fields, annotated_groupbys, query):
# NOTE(review): this copy has lost its indentation and some lines: the
# docstring delimiters are gone, nothing is ever appended to
# ``orderby_terms`` (``order`` is computed and then unused, the
# ``elif order_field in aggregated_fields`` branch has no body), and the
# ``else:`` lines that should separate the many2one case and the
# "ignored" warning are absent. Restore from version control before
# editing.
Prepares the GROUP BY and ORDER BY terms for the read_group method. Adds the missing JOIN clause
to the query if order should be computed against m2o field.
:param orderby: the orderby definition in the form "%(field)s %(order)s"
:param aggregated_fields: list of aggregated fields in the query
:param annotated_groupbys: list of dictionaries returned by _read_group_process_groupby
These dictionaries contains the qualified name of each groupby
(fully qualified SQL name for the corresponding field),
and the (non raw) field name.
:param osv.Query query: the query under construction
:return: (groupby_terms, orderby_terms)
orderby_terms = []
groupby_terms = [gb['qualified_field'] for gb in annotated_groupbys]
groupby_fields = [gb['groupby'] for gb in annotated_groupbys]
# without an explicit order there is nothing to add
if not orderby:
return groupby_terms, orderby_terms
for order_part in orderby.split(','):
# ``order_part`` is "<field> [<direction>]"
order_split = order_part.split()
order_field = order_split[0]
if order_field in groupby_fields:
# strip a possible ":<function>" suffix to find the field itself
if self._fields[order_field.split(':')[0]].type == 'many2one':
order_clause = self._generate_order_by(order_part, query).replace('ORDER BY ', '')
if order_clause:
# the columns ordered on must be part of the GROUP BY as well
groupby_terms += [order_term.split()[0] for order_term in order_clause.split(',')]
order = '"%s" %s' % (order_field, '' if len(order_split) == 1 else order_split[1])
elif order_field in aggregated_fields:
# Cannot order by a field that will not appear in the results (needs to be grouped or aggregated)
_logger.warn('%s: read_group order by `%s` ignored, cannot sort on empty columns (not grouped/aggregated)',
self._name, order_part)
return groupby_terms, orderby_terms
def _read_group_process_groupby(self, gb, query, context):
Helper method to collect important information about groupbys: raw
field name, type, time information, qualified name, ...
split = gb.split(':')
field_type = self._fields[split[0]].type
gb_function = split[1] if len(split) == 2 else None
temporal = field_type in ('date', 'datetime')
tz_convert = field_type == 'datetime' and context.get('tz') in pytz.all_timezones
qualified_field = self._inherits_join_calc(split[0], query)
if temporal:
display_formats = {
# Careful with week/year formats:
# - yyyy (lower) must always be used, *except* for week+year formats
# - YYYY (upper) must always be used for week+year format
# e.g. 2006-01-01 is W52 2005 in some locales (de_DE),
# and W1 2006 for others
# Mixing both formats, e.g. 'MMM YYYY' would yield wrong results,
# such as 2006-01-01 being formatted as "January 2005" in some locales.
# Cfr: http://babel.pocoo.org/docs/dates/#date-fields
'day': 'dd MMM yyyy', # yyyy = normal year
'week': "'W'w YYYY", # w YYYY = ISO week-year
'month': 'MMMM yyyy',
'quarter': 'QQQ yyyy',
'year': 'yyyy',
time_intervals = {
'day': dateutil.relativedelta.relativedelta(days=1),
'week': datetime.timedelta(days=7),
'month': dateutil.relativedelta.relativedelta(months=1),
'quarter': dateutil.relativedelta.relativedelta(months=3),
'year': dateutil.relativedelta.relativedelta(years=1)
if tz_convert:
qualified_field = "timezone('%s', timezone('UTC',%s))" % (context.get('tz', 'UTC'), qualified_field)
qualified_field = "date_trunc('%s', %s)" % (gb_function or 'month', qualified_field)
if field_type == 'boolean':
qualified_field = "coalesce(%s,false)" % qualified_field
return {
'field': split[0],
'groupby': gb,
'type': field_type,
'display_format': display_formats[gb_function or 'month'] if temporal else None,
'interval': time_intervals[gb_function or 'month'] if temporal else None,
'tz_convert': tz_convert,
'qualified_field': qualified_field
def _read_group_prepare_data(self, key, value, groupby_dict, context):
Helper method to sanitize the data received by read_group. The None
values are converted to False, and the date/datetime are formatted,
and corrected according to the timezones.
value = False if value is None else value
gb = groupby_dict.get(key)
if gb and gb['type'] in ('date', 'datetime') and value:
if isinstance(value, basestring):
dt_format = DEFAULT_SERVER_DATETIME_FORMAT if gb['type'] == 'datetime' else DEFAULT_SERVER_DATE_FORMAT
value = datetime.datetime.strptime(value, dt_format)
if gb['tz_convert']:
value = pytz.timezone(context['tz']).localize(value)
return value
def _read_group_get_domain(self, groupby, value):
Helper method to construct the domain corresponding to a groupby and
a given value. This is mostly relevant for date/datetime.
if groupby['type'] in ('date', 'datetime') and value:
dt_format = DEFAULT_SERVER_DATETIME_FORMAT if groupby['type'] == 'datetime' else DEFAULT_SERVER_DATE_FORMAT
domain_dt_begin = value
domain_dt_end = value + groupby['interval']
if groupby['tz_convert']:
domain_dt_begin = domain_dt_begin.astimezone(pytz.utc)
domain_dt_end = domain_dt_end.astimezone(pytz.utc)
return [(groupby['field'], '>=', domain_dt_begin.strftime(dt_format)),
(groupby['field'], '<', domain_dt_end.strftime(dt_format))]
if groupby['type'] == 'many2one' and value:
value = value[0]
return [(groupby['field'], '=', value)]
def _read_group_format_result(self, data, annotated_groupbys, groupby, groupby_dict, domain, context):
Helper method to format the data contained in the dictionary data by
adding the domain corresponding to its values, the groupbys in the
context and by properly formatting the date/datetime values.
domain_group = [dom for gb in annotated_groupbys for dom in self._read_group_get_domain(gb, data[gb['groupby']])]
for k,v in data.iteritems():
gb = groupby_dict.get(k)
if gb and gb['type'] in ('date', 'datetime') and v:
data[k] = babel.dates.format_date(v, format=gb['display_format'], locale=context.get('lang', 'en_US'))
data['__domain'] = domain_group + domain
if len(groupby) - len(annotated_groupbys) >= 1:
data['__context'] = { 'group_by': groupby[len(annotated_groupbys):]}
del data['id']
return data
def read_group(self, cr, uid, domain, fields, groupby, offset=0, limit=None, context=None, orderby=False, lazy=True):
Get the list of records in list view grouped by the given ``groupby`` fields
:param cr: database cursor
:param uid: current user id
:param domain: list specifying search criteria [['field_name', 'operator', 'value'], ...]
:param list fields: list of fields present in the list view specified on the object
:param list groupby: list of groupby descriptions by which the records will be grouped.
A groupby description is either a field (then it will be grouped by that field)
or a string 'field:groupby_function'. Right now, the only functions supported
are 'day', 'week', 'month', 'quarter' or 'year', and they only make sense for
date/datetime fields.
:param int offset: optional number of records to skip
:param int limit: optional max number of records to return
:param dict context: context arguments, like lang, time zone.
:param list orderby: optional ``order by`` specification, for
overriding the natural sort ordering of the
groups, see also :py:meth:`~osv.osv.osv.search`
(supported only for many2one fields currently)
:param bool lazy: if true, the results are only grouped by the first groupby and the
remaining groupbys are put in the __context key. If false, all the groupbys are
done in one call.
:return: list of dictionaries(one dictionary for each record) containing:
* the values of fields grouped by the fields in ``groupby`` argument
* __domain: list of tuples specifying the search criteria
* __context: dictionary with argument like ``groupby``
:rtype: [{'field_name_1': value, ...]
:raise AccessError: * if user has no read rights on the requested object
* if user tries to bypass access rules for read on the requested object
if context is None:
context = {}
self.check_access_rights(cr, uid, 'read')
query = self._where_calc(cr, uid, domain, context=context)
fields = fields or self._columns.keys()
groupby = [groupby] if isinstance(groupby, basestring) else groupby
groupby_list = groupby[:1] if lazy else groupby
annotated_groupbys = [self._read_group_process_groupby(gb, query, context)
for gb in groupby_list]
groupby_fields = [g['field'] for g in annotated_groupbys]
order = orderby or ','.join([g for g in groupby_list])
groupby_dict = {gb['groupby']: gb for gb in annotated_groupbys}
self._apply_ir_rules(cr, uid, query, 'read', context=context)
for gb in groupby_fields:
assert gb in fields, "Fields in 'groupby' must appear in the list of fields to read (perhaps it's missing in the list view?)"
groupby_def = self._columns.get(gb) or (self._inherit_fields.get(gb) and self._inherit_fields.get(gb)[2])
assert groupby_def and groupby_def._classic_write, "Fields in 'groupby' must be regular database-persisted fields (no function or related fields), or function fields with store=True"
if not (gb in self._fields):
# Don't allow arbitrary values, as this would be a SQL injection vector!
raise except_orm(_('Invalid group_by'),
_('Invalid group_by specification: "%s".\nA group_by specification must be a list of valid fields.')%(gb,))
aggregated_fields = [
f for f in fields
if f not in ('id', 'sequence')
if f not in groupby_fields
if f in self._fields
if self._fields[f].type in ('integer', 'float')
if getattr(self._fields[f].base_field.column, '_classic_write', False)
field_formatter = lambda f: (self._fields[f].group_operator or 'sum', self._inherits_join_calc(f, query), f)
select_terms = ["%s(%s) AS %s" % field_formatter(f) for f in aggregated_fields]
for gb in annotated_groupbys:
select_terms.append('%s as "%s" ' % (gb['qualified_field'], gb['groupby']))
groupby_terms, orderby_terms = self._read_group_prepare(order, aggregated_fields, annotated_groupbys, query)
from_clause, where_clause, where_clause_params = query.get_sql()
if lazy and (len(groupby_fields) >= 2 or not context.get('group_by_no_leaf')):
count_field = groupby_fields[0] if len(groupby_fields) >= 1 else '_'
count_field = '_'
count_field += '_count'
prefix_terms = lambda prefix, terms: (prefix + " " + ",".join(terms)) if terms else ''
prefix_term = lambda prefix, term: ('%s %s' % (prefix, term)) if term else ''