[IMP] ir.rule,expression: merged improved rule computation mechanism, suggested by RCO

bzr revid: odo@openerp.com-20110531144318-vut0opufvqd5mu07
This commit is contained in:
Olivier Dony 2011-05-31 16:43:18 +02:00
parent 581d06ab65
commit e10e8073db
2 changed files with 85 additions and 76 deletions

View File

@ -27,6 +27,8 @@ import tools
from tools.safe_eval import safe_eval as eval
from tools.misc import unquote as unquote
SUPERUSER_UID = 1
class ir_rule(osv.osv):
_name = 'ir.rule'
_order = 'name'
@ -46,31 +48,12 @@ class ir_rule(osv.osv):
return {'user': self.pool.get('res.users').browse(cr, 1, uid),
'time':time}
def domain_binary_operation(self, cr, uid, str_domain_1, str_domain_2, operator):
"""Returns the string representation of the binary operation designated by
``operator`` (should be '|' or '&') for the two string domains ``str_domain_1``
and ``str_domain_2`` """
eval_context = self._eval_context_for_combinations()
canonical_domain_1 = expression.expression.normalize_domain(eval(str_domain_1 or '[]', eval_context))
canonical_domain_2 = expression.expression.normalize_domain(eval(str_domain_2 or '[]', eval_context))
return str([operator] + canonical_domain_1 + canonical_domain_1)
def domain_conjunction(self, cr, uid, str_domain_1, str_domain_2):
"""Returns the string representation of the conjunction (AND) of the
two string domains ``str_domain_1`` and ``str_domain_2``"""
return self.domain_binary_operation(cr, uid, str_domain_1, str_domain_2, expression.AND_OPERATOR)
def domain_disjunction(self, cr, uid, str_domain_1, str_domain_2):
"""Returns the string representation of the disjunction (OR) of the
two string domains ``str_domain_1`` and ``str_domain_2``"""
return self.domain_binary_operation(cr, uid, str_domain_1, str_domain_2, expression.OR_OPERATOR)
def _domain_force_get(self, cr, uid, ids, field_name, arg, context=None):
res = {}
eval_context = self._eval_context(cr, uid)
for rule in self.browse(cr, uid, ids, context):
if rule.domain_force:
res[rule.id] = expression.expression.normalize_domain(eval(rule.domain_force, eval_context))
res[rule.id] = expression.normalize(eval(rule.domain_force, eval_context))
else:
res[rule.id] = []
return res
@ -116,25 +99,12 @@ class ir_rule(osv.osv):
(_check_model_obj, 'Rules are not supported for osv_memory objects !', ['model_id'])
]
def domain_create(self, cr, uid, rule_ids):
count = 0
dom = []
for rule in self.browse(cr, uid, rule_ids):
if rule.domain:
dom += rule.domain
count += 1
if count:
return [expression.AND_OPERATOR] * (count-1) + dom
return []
@tools.cache()
def _compute_domain(self, cr, uid, model_name, mode="read"):
if mode not in self._MODES:
raise ValueError('Invalid mode: %r' % (mode,))
group_rule = {}
global_rules = []
if uid == 1:
if uid == SUPERUSER_UID:
return None
cr.execute("""SELECT r.id
FROM ir_rule r
@ -144,26 +114,26 @@ class ir_rule(osv.osv):
AND (r.id IN (SELECT rule_group_id FROM rule_group_rel g_rel
JOIN res_groups_users_rel u_rel ON (g_rel.group_id = u_rel.gid)
WHERE u_rel.uid = %s) OR r.global)""", (model_name, uid))
ids = map(lambda x: x[0], cr.fetchall())
if ids:
for rule in self.browse(cr, uid, ids):
rule_ids = [x[0] for x in cr.fetchall()]
if rule_ids:
# browse user as super-admin root to avoid access errors!
user = self.pool.get('res.users').browse(cr, SUPERUSER_UID, uid)
global_domains = [] # list of domains
group_domains = {} # map: group -> list of domains
for rule in self.browse(cr, SUPERUSER_UID, rule_ids):
dom = expression.normalize(rule.domain)
for group in rule.groups:
group_rule.setdefault(group.id, []).append(rule.id)
if group in user.groups_id:
group_domains.setdefault(group, []).append(dom)
if not rule.groups:
global_rules.append(rule.id)
global_domain = self.domain_create(cr, uid, global_rules)
count = 0
group_domains = []
for value in group_rule.values():
group_domain = self.domain_create(cr, uid, value)
if group_domain:
group_domains += group_domain
count += 1
if count and global_domain:
return [expression.AND_OPERATOR] + global_domain + [expression.OR_OPERATOR] * (count-1) + group_domains
if count:
return [expression.OR_OPERATOR] * (count-1) + group_domains
return global_domain
global_domains.append(dom)
# combine global domains and group domains
if group_domains:
group_domain = expression.OR(map(expression.AND, group_domains.values()))
else:
group_domain = []
domain = expression.AND(global_domains + [group_domain])
return domain
return []
def clear_cache(self, cr, uid):

View File

@ -27,6 +27,69 @@ NOT_OPERATOR = '!'
OR_OPERATOR = '|'
AND_OPERATOR = '&'
TRUE_DOMAIN = [(1,'=',1)]
FALSE_DOMAIN = [(0,'=',1)]
def normalize(domain):
"""Returns a normalized version of ``domain_expr``, where all implicit '&' operators
have been made explicit. One property of normalized domain expressions is that they
can be easily combined together as if they were single domain components.
"""
assert isinstance(domain, (list, tuple)), "Domains to normalize must have a 'domain' form: a list or tuple of domain components"
if not domain:
return TRUE_DOMAIN
result = []
expected = 1 # expected number of expressions
op_arity = {NOT_OPERATOR: 1, AND_OPERATOR: 2, OR_OPERATOR: 2}
for token in domain:
if expected == 0: # more than expected, like in [A, B]
result[0:0] = ['&'] # put an extra '&' in front
expected = 1
result.append(token)
if isinstance(token, (list,tuple)): # domain term
expected -= 1
else:
expected += op_arity.get(token, 0) - 1
assert expected == 0
return result
def combine(operator, unit, zero, domains):
"""Returns a new domain expression where all domain components from ``domains``
have been added together using the binary operator ``operator``.
:param unit: the identity element of the domains "set" with regard to the operation
performed by ``operator``, i.e the domain component ``i`` which, when
combined with any domain ``x`` via ``operator``, yields ``x``.
E.g. [(1,'=',1)] is the typical unit for AND_OPERATOR: adding it
to any domain component gives the same domain.
:param zero: the absorbing element of the domains "set" with regard to the operation
performed by ``operator``, i.e the domain component ``z`` which, when
combined with any domain ``x`` via ``operator``, yields ``z``.
E.g. [(1,'=',1)] is the typical zero for OR_OPERATOR: as soon as
you see it in a domain component the resulting domain is the zero.
"""
result = []
count = 0
for domain in domains:
if domain == unit:
continue
if domain == zero:
return zero
if domain:
result += domain
count += 1
result = [operator] * (count - 1) + result
return result
def AND(domains):
""" AND([D1,D2,...]) returns a domain representing D1 and D2 and ... """
return combine(AND_OPERATOR, TRUE_DOMAIN, FALSE_DOMAIN, domains)
def OR(domains):
""" OR([D1,D2,...]) returns a domain representing D1 or D2 or ... """
return combine(OR_OPERATOR, FALSE_DOMAIN, TRUE_DOMAIN, domains)
class expression(object):
"""
parse a domain expression
@ -48,30 +111,6 @@ class expression(object):
and (((not internal) and element[1] in OPS) \
or (internal and element[1] in INTERNAL_OPS))
@classmethod
def normalize_domain(cls, domain_expr):
"""Returns a normalized version of ``domain_expr``, where all implicit '&' operators
have been made explicit. One property of normalized domain expressions is that they
can be easily combined together as if they were single domain components.
"""
assert isinstance(domain_expr, (list, tuple)), "Domain to normalize must have a 'domain' form: a list or tuple of domain components"
missing_operators = -1
for item in domain_expr:
if cls._is_operator(item):
if item != NOT_OPERATOR:
missing_operators -= 1
else:
missing_operators += 1
return [AND_OPERATOR] * missing_operators + domain_expr
def normalize(self):
"""Make this expression normalized, i.e. change it so that all implicit '&'
operator become explicit. If the expression had already been parsed,
there is no need to do it again.
"""
self.__exp = expression.normalize_domain(self.__exp)
return self
def __execute_recursive_in(self, cr, s, f, w, ids, op, type):
# todo: merge into parent query as sub-query
res = []