[FIX] Stop evaluating non-literal contexts coming from the client browser
The browser can not be trusted, it is not a decent person, as opposed to chickens who're decent people bzr revid: xmo@openerp.com-20110328152657-oc48whrcyp5wxh6f
This commit is contained in:
commit
c098950009
|
@ -6,7 +6,12 @@ from cStringIO import StringIO
|
|||
import simplejson
|
||||
|
||||
import openerpweb
|
||||
import openerpweb.ast
|
||||
import openerpweb.nonliterals
|
||||
|
||||
__all__ = ['Session', 'Menu', 'DataSet', 'DataRecord',
|
||||
'View', 'FormView', 'ListView', 'SearchView',
|
||||
'Action']
|
||||
|
||||
class Xml2Json:
|
||||
# xml2json-direct
|
||||
|
@ -16,7 +21,8 @@ class Xml2Json:
|
|||
# URL: http://code.google.com/p/xml2json-direct/
|
||||
@staticmethod
|
||||
def convert_to_json(s):
|
||||
return simplejson.dumps(Xml2Json.convert_to_structure(s), sort_keys=True, indent=4)
|
||||
return simplejson.dumps(
|
||||
Xml2Json.convert_to_structure(s), sort_keys=True, indent=4)
|
||||
|
||||
@staticmethod
|
||||
def convert_to_structure(s):
|
||||
|
@ -169,10 +175,17 @@ class Menu(openerpweb.Controller):
|
|||
actions = Values.get('action', 'tree_but_open', [('ir.ui.menu', menu_id)], False, {})
|
||||
|
||||
for _, _, action in actions:
|
||||
action['context'] = req.session.eval_context(
|
||||
action['context']) or {}
|
||||
action['domain'] = req.session.eval_domain(
|
||||
action['domain'], action['context']) or []
|
||||
# values come from the server, we can just eval them
|
||||
if isinstance(action['context'], basestring):
|
||||
action['context'] = eval(
|
||||
action['context'],
|
||||
req.session.evaluation_context()) or {}
|
||||
|
||||
if isinstance(action['domain'], basestring):
|
||||
action['domain'] = eval(
|
||||
action['domain'],
|
||||
req.session.evaluation_context(
|
||||
action['context'])) or []
|
||||
|
||||
return {"action": actions}
|
||||
|
||||
|
@ -196,20 +209,14 @@ class DataSet(openerpweb.Controller):
|
|||
|
||||
:param request: a JSON-RPC request object
|
||||
:type request: openerpweb.JsonRequest
|
||||
:param model: the name of the model to search on
|
||||
:type model: str
|
||||
:param str model: the name of the model to search on
|
||||
:param fields: a list of the fields to return in the result records
|
||||
:type fields: [str]
|
||||
:param offset: from which index should the results start being returned
|
||||
:type offset: int
|
||||
:param limit: the maximum number of records to return
|
||||
:type limit: int
|
||||
:param domain: the search domain for the query
|
||||
:type domain: list
|
||||
:param context: the context in which the search should be executed
|
||||
:type context: dict
|
||||
:param sort: sorting directives
|
||||
:type sort: list
|
||||
:param int offset: from which index should the results start being returned
|
||||
:param int limit: the maximum number of records to return
|
||||
:param list domain: the search domain for the query
|
||||
:param dict context: the context in which the search should be executed
|
||||
:param list sort: sorting directives
|
||||
:returns: a list of result records
|
||||
:rtype: list
|
||||
"""
|
||||
|
@ -260,14 +267,13 @@ class DataRecord(openerpweb.Controller):
|
|||
value = r[0]
|
||||
return {'value': value}
|
||||
|
||||
|
||||
class View(openerpweb.Controller):
|
||||
def fields_view_get(self, session, model, view_id, view_type, transform=True):
|
||||
Model = session.model(model)
|
||||
r = Model.fields_view_get(view_id, view_type)
|
||||
if transform:
|
||||
context = {} # TODO: dict(ctx_sesssion, **ctx_action)
|
||||
xml = self.transform_view(r['arch'], context)
|
||||
xml = self.transform_view(r['arch'], session, context)
|
||||
else:
|
||||
xml = ElementTree.fromstring(r['arch'])
|
||||
r['arch'] = Xml2Json.convert_element(xml)
|
||||
|
@ -306,7 +312,7 @@ class View(openerpweb.Controller):
|
|||
if a == 'invisible' and 'attrs' in elem.attrib:
|
||||
del elem.attrib['attrs']
|
||||
|
||||
def transform_view(self, view_string, context=None):
|
||||
def transform_view(self, view_string, session, context=None):
|
||||
# transform nodes on the fly via iterparse, instead of
|
||||
# doing it statically on the parsing result
|
||||
parser = ElementTree.iterparse(StringIO(view_string), events=("start",))
|
||||
|
@ -316,8 +322,40 @@ class View(openerpweb.Controller):
|
|||
if root is None:
|
||||
root = elem
|
||||
self.normalize_attrs(elem, context)
|
||||
self.parse_domains_and_contexts(elem, session)
|
||||
return root
|
||||
|
||||
def parse_domains_and_contexts(self, elem, session):
|
||||
""" Converts domains and contexts from the view into Python objects,
|
||||
either literals if they can be parsed by literal_eval or a special
|
||||
placeholder object if the domain or context refers to free variables.
|
||||
|
||||
:param elem: the current node being parsed
|
||||
:type param: xml.etree.ElementTree.Element
|
||||
:param session: OpenERP session object, used to store and retrieve
|
||||
non-literal objects
|
||||
:type session: openerpweb.openerpweb.OpenERPSession
|
||||
"""
|
||||
domain = elem.get('domain')
|
||||
if domain:
|
||||
try:
|
||||
elem.set(
|
||||
'domain',
|
||||
openerpweb.ast.literal_eval(
|
||||
domain))
|
||||
except ValueError:
|
||||
# not a literal
|
||||
elem.set('domain',
|
||||
openerpweb.nonliterals.Domain(session, domain))
|
||||
context_string = elem.get('context')
|
||||
if context_string:
|
||||
try:
|
||||
elem.set('context',
|
||||
openerpweb.ast.literal_eval(context_string))
|
||||
except ValueError:
|
||||
elem.set('context',
|
||||
openerpweb.nonliterals.Context(
|
||||
session, context_string))
|
||||
|
||||
class FormView(View):
|
||||
_cp_path = "/base/formview"
|
||||
|
|
|
@ -1,12 +1,17 @@
|
|||
import xml.etree.ElementTree
|
||||
import mock
|
||||
|
||||
import unittest2
|
||||
|
||||
import base.controllers.main
|
||||
import openerpweb.nonliterals
|
||||
import openerpweb.openerpweb
|
||||
|
||||
#noinspection PyCompatibility
|
||||
class ViewTest(unittest2.TestCase):
|
||||
def setUp(self):
|
||||
self.view = base.controllers.main.View()
|
||||
def test_identity(self):
|
||||
view = base.controllers.main.View()
|
||||
base_view = """
|
||||
<form string="Title">
|
||||
<group>
|
||||
|
@ -18,10 +23,95 @@ class ViewTest(unittest2.TestCase):
|
|||
"""
|
||||
|
||||
pristine = xml.etree.ElementTree.fromstring(base_view)
|
||||
transformed = view.transform_view(base_view)
|
||||
transformed = self.view.transform_view(base_view, None)
|
||||
|
||||
self.assertEqual(
|
||||
xml.etree.ElementTree.tostring(transformed),
|
||||
xml.etree.ElementTree.tostring(pristine)
|
||||
)
|
||||
|
||||
def test_convert_literal_domain(self):
|
||||
e = xml.etree.ElementTree.Element(
|
||||
'field', domain="[('somefield', '=', 3)]")
|
||||
self.view.parse_domains_and_contexts(e, None)
|
||||
|
||||
self.assertEqual(
|
||||
e.get('domain'),
|
||||
[('somefield', '=', 3)])
|
||||
|
||||
def test_convert_complex_domain(self):
|
||||
e = xml.etree.ElementTree.Element(
|
||||
'field',
|
||||
domain="[('account_id.type','in',['receivable','payable']),"
|
||||
"('reconcile_id','=',False),"
|
||||
"('reconcile_partial_id','=',False),"
|
||||
"('state', '=', 'valid')]"
|
||||
)
|
||||
self.view.parse_domains_and_contexts(e, None)
|
||||
|
||||
self.assertEqual(
|
||||
e.get('domain'),
|
||||
[('account_id.type', 'in', ['receivable', 'payable']),
|
||||
('reconcile_id', '=', False),
|
||||
('reconcile_partial_id', '=', False),
|
||||
('state', '=', 'valid')]
|
||||
)
|
||||
|
||||
def test_retrieve_nonliteral_domain(self):
|
||||
session = mock.Mock(spec=openerpweb.openerpweb.OpenERPSession)
|
||||
session.domains_store = {}
|
||||
domain_string = ("[('month','=',(datetime.date.today() - "
|
||||
"datetime.timedelta(365/12)).strftime('%%m'))]")
|
||||
e = xml.etree.ElementTree.Element(
|
||||
'field', domain=domain_string)
|
||||
|
||||
self.view.parse_domains_and_contexts(e, session)
|
||||
|
||||
self.assertIsInstance(e.get('domain'), openerpweb.nonliterals.Domain)
|
||||
self.assertEqual(
|
||||
openerpweb.nonliterals.Domain(
|
||||
session, key=e.get('domain').key).get_domain_string(),
|
||||
domain_string)
|
||||
|
||||
def test_convert_literal_context(self):
|
||||
e = xml.etree.ElementTree.Element(
|
||||
'field', context="{'some_prop': 3}")
|
||||
self.view.parse_domains_and_contexts(e, None)
|
||||
|
||||
self.assertEqual(
|
||||
e.get('context'),
|
||||
{'some_prop': 3})
|
||||
|
||||
def test_convert_complex_domain(self):
|
||||
e = xml.etree.ElementTree.Element(
|
||||
'field',
|
||||
context="{'account_id.type': ['receivable','payable'],"
|
||||
"'reconcile_id': False,"
|
||||
"'reconcile_partial_id': False,"
|
||||
"'state': 'valid'}"
|
||||
)
|
||||
self.view.parse_domains_and_contexts(e, None)
|
||||
|
||||
self.assertEqual(
|
||||
e.get('context'),
|
||||
{'account_id.type': ['receivable', 'payable'],
|
||||
'reconcile_id': False,
|
||||
'reconcile_partial_id': False,
|
||||
'state': 'valid'}
|
||||
)
|
||||
|
||||
def test_retrieve_nonliteral_domain(self):
|
||||
session = mock.Mock(spec=openerpweb.openerpweb.OpenERPSession)
|
||||
session.domains_store = {}
|
||||
context_string = ("{'month': (datetime.date.today() - "
|
||||
"datetime.timedelta(365/12)).strftime('%%m')}")
|
||||
e = xml.etree.ElementTree.Element(
|
||||
'field', context=context_string)
|
||||
|
||||
self.view.parse_domains_and_contexts(e, session)
|
||||
|
||||
self.assertIsInstance(e.get('context'), openerpweb.nonliterals.Context)
|
||||
self.assertEqual(
|
||||
openerpweb.nonliterals.Context(
|
||||
session, key=e.get('context').key).get_context_string(),
|
||||
context_string)
|
||||
|
|
|
@ -14,8 +14,18 @@ Internal API Doc
|
|||
Python
|
||||
++++++
|
||||
|
||||
.. autoclass:: base.controllers.main.DataSet
|
||||
These classes should be moved to other sections of the doc as needed,
|
||||
probably.
|
||||
|
||||
.. automodule:: openerpweb.openerpweb
|
||||
:members: JsonRequest
|
||||
|
||||
See also: :class:`~openerpweb.openerpweb.OpenERPSession`,
|
||||
:class:`~openerpweb.openerpweb.OpenERPModel`
|
||||
|
||||
.. automodule:: base.controllers.main
|
||||
:members:
|
||||
:undoc-members:
|
||||
|
||||
Testing
|
||||
-------
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
""" Backport of Python 2.6's ast.py for Python 2.5
|
||||
"""
|
||||
__all__ = ['literal_eval']
|
||||
try:
|
||||
from ast import literal_eval
|
||||
except ImportError:
|
||||
from _ast import *
|
||||
from _ast import __version__
|
||||
|
||||
|
||||
def parse(expr, filename='<unknown>', mode='exec'):
|
||||
"""
|
||||
Parse an expression into an AST node.
|
||||
Equivalent to compile(expr, filename, mode, PyCF_ONLY_AST).
|
||||
"""
|
||||
return compile(expr, filename, mode, PyCF_ONLY_AST)
|
||||
|
||||
|
||||
def literal_eval(node_or_string):
|
||||
"""
|
||||
Safely evaluate an expression node or a string containing a Python
|
||||
expression. The string or node provided may only consist of the
|
||||
following Python literal structures: strings, numbers, tuples, lists,
|
||||
dicts, booleans, and None.
|
||||
"""
|
||||
_safe_names = {'None': None, 'True': True, 'False': False}
|
||||
if isinstance(node_or_string, basestring):
|
||||
node_or_string = parse(node_or_string, mode='eval')
|
||||
if isinstance(node_or_string, Expression):
|
||||
node_or_string = node_or_string.body
|
||||
def _convert(node):
|
||||
if isinstance(node, Str):
|
||||
return node.s
|
||||
elif isinstance(node, Num):
|
||||
return node.n
|
||||
elif isinstance(node, Tuple):
|
||||
return tuple(map(_convert, node.elts))
|
||||
elif isinstance(node, List):
|
||||
return list(map(_convert, node.elts))
|
||||
elif isinstance(node, Dict):
|
||||
return dict((_convert(k), _convert(v)) for k, v
|
||||
in zip(node.keys, node.values))
|
||||
elif isinstance(node, Name):
|
||||
if node.id in _safe_names:
|
||||
return _safe_names[node.id]
|
||||
raise ValueError('malformed string')
|
||||
return _convert(node_or_string)
|
|
@ -0,0 +1,129 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
""" Manages the storage and lifecycle of non-literal domains and contexts
|
||||
(and potentially other structures) which have to be evaluated with client data,
|
||||
but still need to be safely round-tripped to and from the browser (and thus
|
||||
can't be sent there themselves).
|
||||
"""
|
||||
import binascii
|
||||
import hashlib
|
||||
import simplejson.decoder
|
||||
import simplejson.encoder
|
||||
|
||||
__all__ = ['Domain', 'Context', 'NonLiteralEncoder, non_literal_decoder']
|
||||
|
||||
#: 48 bits should be sufficient to have almost no chance of collision
|
||||
#: with a million hashes, according to hg@67081329d49a
|
||||
SHORT_HASH_BYTES_SIZE = 6
|
||||
|
||||
class NonLiteralEncoder(simplejson.encoder.JSONEncoder):
|
||||
def default(self, object):
|
||||
if isinstance(object, Domain):
|
||||
return {
|
||||
'__ref': 'domain',
|
||||
'__id': object.key
|
||||
}
|
||||
elif isinstance(object, Context):
|
||||
return {
|
||||
'__ref': 'context',
|
||||
'__id': object.key
|
||||
}
|
||||
return super(NonLiteralEncoder, self).default(object)
|
||||
|
||||
def non_literal_decoder(dct):
|
||||
if '__ref' in dct:
|
||||
if dct['__ref'] == 'domain':
|
||||
return Domain(None, key=dct['__id'])
|
||||
elif dct['__ref'] == 'context':
|
||||
return Context(None, key=dct['__id'])
|
||||
return dct
|
||||
|
||||
class Domain(object):
|
||||
def __init__(self, session, domain_string=None, key=None):
|
||||
""" Uses session information to store the domain string and map it to a
|
||||
domain key, which can be safely round-tripped to the client.
|
||||
|
||||
If initialized with a domain string, will generate a key for that
|
||||
string and store the domain string out of the way. When initialized
|
||||
with a key, considers this key is a reference to an existing domain
|
||||
string.
|
||||
|
||||
:param session: the OpenERP Session to use when evaluating the domain
|
||||
:type session: openerpweb.openerpweb.OpenERPSession
|
||||
:param str domain_string: a non-literal domain in string form
|
||||
:param str key: key used to retrieve the domain string
|
||||
"""
|
||||
if domain_string and key:
|
||||
raise ValueError("A nonliteral domain can not take both a key "
|
||||
"and a domain string")
|
||||
|
||||
self.session = session
|
||||
if domain_string:
|
||||
self.key = binascii.hexlify(
|
||||
hashlib.sha256(domain_string).digest()[:SHORT_HASH_BYTES_SIZE])
|
||||
self.session.domains_store[self.key] = domain_string
|
||||
elif key:
|
||||
self.key = key
|
||||
|
||||
def get_domain_string(self):
|
||||
""" Retrieves the domain string linked to this non-literal domain in
|
||||
the provided session.
|
||||
|
||||
:param session: the OpenERP Session used to store the domain string in
|
||||
the first place.
|
||||
:type session: openerpweb.openerpweb.OpenERPSession
|
||||
"""
|
||||
return self.session.domains_store[self.key]
|
||||
|
||||
def evaluate(self, context=None):
|
||||
""" Forces the evaluation of the linked domain, using the provided
|
||||
context (as well as the session's base context), and returns the
|
||||
evaluated result.
|
||||
"""
|
||||
return eval(self.get_domain_string(),
|
||||
self.session.evaluation_context(context))
|
||||
|
||||
class Context(object):
|
||||
def __init__(self, session, context_string=None, key=None):
|
||||
""" Uses session information to store the context string and map it to
|
||||
a key (stored in a secret location under a secret mountain), which can
|
||||
be safely round-tripped to the client.
|
||||
|
||||
If initialized with a context string, will generate a key for that
|
||||
string and store the context string out of the way. When initialized
|
||||
with a key, considers this key is a reference to an existing context
|
||||
string.
|
||||
|
||||
:param session: the OpenERP Session to use when evaluating the context
|
||||
:type session: openerpweb.openerpweb.OpenERPSession
|
||||
:param str context_string: a non-literal context in string form
|
||||
:param str key: key used to retrieve the context string
|
||||
"""
|
||||
if context_string and key:
|
||||
raise ValueError("A nonliteral domain can not take both a key "
|
||||
"and a domain string")
|
||||
|
||||
self.session = session
|
||||
if context_string:
|
||||
self.key = binascii.hexlify(
|
||||
hashlib.sha256(context_string).digest()[:SHORT_HASH_BYTES_SIZE])
|
||||
self.session.domains_store[self.key] = context_string
|
||||
elif key:
|
||||
self.key = key
|
||||
|
||||
def get_context_string(self):
|
||||
""" Retrieves the context string linked to this non-literal context in
|
||||
the provided session.
|
||||
|
||||
:param session: the OpenERP Session used to store the context string in
|
||||
the first place.
|
||||
:type session: openerpweb.openerpweb.OpenERPSession
|
||||
"""
|
||||
return self.session.domains_store[self.key]
|
||||
|
||||
def evaluate(self, context=None):
|
||||
""" Forces the evaluation of the linked context, using the provided
|
||||
context (as well as the session's base context), and returns the
|
||||
evaluated result.
|
||||
"""
|
||||
return eval(self.get_context_string(),
|
||||
self.session.evaluation_context(context))
|
|
@ -15,6 +15,7 @@ import cherrypy
|
|||
import cherrypy.lib.static
|
||||
import simplejson
|
||||
|
||||
import nonliterals
|
||||
import xmlrpctimeout
|
||||
|
||||
#----------------------------------------------------------
|
||||
|
@ -50,6 +51,13 @@ class OpenERPSession(object):
|
|||
|
||||
The session context, a ``dict``. Can be reloaded by calling
|
||||
:meth:`openerpweb.openerpweb.OpenERPSession.get_context`
|
||||
|
||||
.. attribute:: domains_store
|
||||
|
||||
A ``dict`` matching domain keys to evaluable (but non-literal) domains.
|
||||
|
||||
Used to store references to non-literal domains which need to be
|
||||
round-tripped to the client browser.
|
||||
"""
|
||||
def __init__(self, server='127.0.0.1', port=8069,
|
||||
model_factory=OpenERPModel):
|
||||
|
@ -62,6 +70,7 @@ class OpenERPSession(object):
|
|||
self.model_factory = model_factory
|
||||
|
||||
self.context = {}
|
||||
self.domains_store = {}
|
||||
|
||||
def proxy(self, service):
|
||||
s = xmlrpctimeout.TimeoutServerProxy('http://%s:%s/xmlrpc/%s' % (self._server, self._port, service), timeout=5)
|
||||
|
@ -113,45 +122,65 @@ class OpenERPSession(object):
|
|||
|
||||
Used to evaluate contexts and domains.
|
||||
"""
|
||||
return dict(
|
||||
base = dict(
|
||||
uid=self._uid,
|
||||
current_date=datetime.date.today().strftime('%Y-%m-%d'),
|
||||
time=time,
|
||||
datetime=datetime,
|
||||
relativedelta=dateutil.relativedelta.relativedelta,
|
||||
**self.context
|
||||
relativedelta=dateutil.relativedelta.relativedelta
|
||||
)
|
||||
base.update(self.context)
|
||||
return base
|
||||
|
||||
def eval_context(self, context_string, context=None, use_base=True):
|
||||
""" Evaluates the provided context_string in the context (haha) of
|
||||
the context.
|
||||
def evaluation_context(self, context=None):
|
||||
""" Returns the session's evaluation context, augmented with the
|
||||
provided context if any.
|
||||
|
||||
:param str context_string: a context to evaluate, if not a string,
|
||||
will be returned as-is
|
||||
:param dict context: the context to use in the evaluation, if any.
|
||||
:param bool use_base: whether the base eval context (combination
|
||||
of the default context and the session
|
||||
context) should be merged to the provided
|
||||
context (or used alone)
|
||||
:returns: the evaluated context
|
||||
:param dict context: to add merge in the session's base eval context
|
||||
:returns: the augmented context
|
||||
:rtype: dict
|
||||
"""
|
||||
if not isinstance(context_string, basestring):
|
||||
return context_string
|
||||
d = {}
|
||||
d.update(self.base_eval_context)
|
||||
if context:
|
||||
d.update(context)
|
||||
return d
|
||||
|
||||
def eval_context(self, context_to_eval, context=None):
|
||||
""" Evaluates the provided context_to_eval in the context (haha) of
|
||||
the context.
|
||||
|
||||
:param context_to_eval: a context to evaluate. Must be a dict or a
|
||||
non-literal context. If it's a dict, will be
|
||||
returned as-is
|
||||
:type context_to_eval: openerpweb.nonliterals.Context
|
||||
:returns: the evaluated context
|
||||
:rtype: dict
|
||||
|
||||
:raises: ``TypeError`` if ``context_to_eval`` is neither a dict nor
|
||||
a Context
|
||||
"""
|
||||
if not isinstance(context_to_eval, (dict, nonliterals.Domain)):
|
||||
raise TypeError("Context %r is not a dict or a nonliteral Context",
|
||||
context_to_eval)
|
||||
|
||||
if isinstance(context_to_eval, dict):
|
||||
return context_to_eval
|
||||
|
||||
ctx = {}
|
||||
if use_base:
|
||||
ctx.update(self.base_eval_context)
|
||||
if context:
|
||||
ctx.update(context)
|
||||
ctx['context'] = ctx
|
||||
|
||||
return eval(context_string, ctx)
|
||||
# if the domain was unpacked from JSON, it needs the current
|
||||
# OpenERPSession for its data retrieval
|
||||
context_to_eval.session = self
|
||||
return context_to_eval.evaluate(ctx)
|
||||
|
||||
def eval_contexts(self, contexts, context=None):
|
||||
""" Evaluates a sequence of contexts to build a single final result
|
||||
|
||||
:param list contexts: a list of string or dict contexts
|
||||
:param list contexts: a list of Context or dict contexts
|
||||
:param dict context: a base context, if needed
|
||||
:returns: the final combination of all provided contexts
|
||||
:rtype: dict
|
||||
|
@ -168,37 +197,43 @@ class OpenERPSession(object):
|
|||
# the result
|
||||
final_context.update(
|
||||
self.eval_context(
|
||||
ctx, current_context, use_base=False))
|
||||
ctx, current_context))
|
||||
# update the current evaluation context so that future
|
||||
# evaluations can use the results we just gathered
|
||||
current_context.update(final_context)
|
||||
return final_context
|
||||
|
||||
def eval_domain(self, domain_string, context=None, use_base=True):
|
||||
""" Evaluates the provided domain_string using the provided context
|
||||
def eval_domain(self, domain, context=None):
|
||||
""" Evaluates the provided domain using the provided context
|
||||
(merged with the session's evaluation context)
|
||||
|
||||
:param str domain_string: an OpenERP domain as a string, to evaluate.
|
||||
:param domain: an OpenERP domain as a list or as a
|
||||
:class:`openerpweb.nonliterals.Domain` instance
|
||||
|
||||
If not a string, is returned as-is
|
||||
In the second case, it will be evaluated and returned.
|
||||
:type domain: openerpweb.nonliterals.Domain
|
||||
:param dict context: the context to use in the evaluation, if any.
|
||||
:param bool use_base: whether the base eval context (combination
|
||||
of the default context and the session
|
||||
context) should be used
|
||||
:returns: the evaluated domain
|
||||
:rtype: list
|
||||
|
||||
:raises: ``TypeError`` if ``domain`` is neither a list nor a Domain
|
||||
"""
|
||||
if not isinstance(domain_string, basestring):
|
||||
return domain_string
|
||||
if not isinstance(domain, (list, nonliterals.Domain)):
|
||||
raise TypeError("Domain %r is not a list or a nonliteral Domain",
|
||||
domain)
|
||||
|
||||
if isinstance(domain, list):
|
||||
return domain
|
||||
|
||||
ctx = {}
|
||||
if use_base:
|
||||
ctx.update(self.base_eval_context)
|
||||
if context:
|
||||
ctx.update(context)
|
||||
ctx['context'] = ctx
|
||||
|
||||
return eval(domain_string, ctx)
|
||||
# if the domain was unpacked from JSON, it needs the current
|
||||
# OpenERPSession for its data retrieval
|
||||
domain.session = self
|
||||
return domain.evaluate(ctx)
|
||||
|
||||
def eval_domains(self, domains, context=None):
|
||||
""" Evaluates and concatenates the provided domains using the
|
||||
|
@ -206,21 +241,17 @@ class OpenERPSession(object):
|
|||
|
||||
Returns the final, concatenated result.
|
||||
|
||||
:param list domains: a list of string or list domains
|
||||
:param list domains: a list of Domain or list domains
|
||||
:param dict context: the context in which the domains
|
||||
should be evaluated (if evaluations need
|
||||
to happen)
|
||||
:returns: the final combination of all domains in the sequence
|
||||
:rtype: list
|
||||
"""
|
||||
ctx = dict(
|
||||
self.base_eval_context,
|
||||
**(context or {}))
|
||||
|
||||
final_domain = []
|
||||
for domain in domains:
|
||||
final_domain.extend(
|
||||
self.eval_domain(domain, ctx))
|
||||
self.eval_domain(domain, context))
|
||||
return final_domain
|
||||
|
||||
#----------------------------------------------------------
|
||||
|
@ -230,8 +261,9 @@ class OpenERPSession(object):
|
|||
class JsonRequest(object):
|
||||
""" JSON-RPC2 over HTTP POST using non standard POST encoding.
|
||||
Difference with the standard:
|
||||
- the json string is passed as a form parameter named "request"
|
||||
- method is currently ignored
|
||||
|
||||
* the json string is passed as a form parameter named "request"
|
||||
* method is currently ignored
|
||||
|
||||
Sucessful request:
|
||||
--> {"jsonrpc": "2.0", "method": "call", "params": {"session_id": "SID", "context": {}, "arg1": "val1" }, "id": null}
|
||||
|
@ -266,9 +298,11 @@ class JsonRequest(object):
|
|||
:rtype: bytes
|
||||
'''
|
||||
if requestf:
|
||||
request = simplejson.load(requestf)
|
||||
request = simplejson.load(
|
||||
requestf, object_hook=nonliterals.non_literal_decoder)
|
||||
else:
|
||||
request = simplejson.loads(request)
|
||||
request = simplejson.loads(
|
||||
request, object_hook=nonliterals.non_literal_decoder)
|
||||
try:
|
||||
print "--> %s.%s %s" % (controller.__class__.__name__, method.__name__, request)
|
||||
error = None
|
||||
|
@ -311,7 +345,8 @@ class JsonRequest(object):
|
|||
print "<--", response
|
||||
print
|
||||
|
||||
content = simplejson.dumps(response)
|
||||
content = simplejson.dumps(
|
||||
response, cls=nonliterals.NonLiteralEncoder)
|
||||
cherrypy.response.headers['Content-Type'] = 'application/json'
|
||||
cherrypy.response.headers['Content-Length'] = len(content)
|
||||
return content
|
||||
|
|
|
@ -0,0 +1,128 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
import mock
|
||||
import simplejson
|
||||
import unittest2
|
||||
|
||||
from openerpweb.nonliterals import Domain, Context
|
||||
import openerpweb.nonliterals
|
||||
import openerpweb.openerpweb
|
||||
|
||||
class NonLiteralDomainTest(unittest2.TestCase):
|
||||
def setUp(self):
|
||||
self.session = mock.Mock(spec=openerpweb.openerpweb.OpenERPSession)
|
||||
self.session.domains_store = {}
|
||||
def test_store_domain(self):
|
||||
d = Domain(self.session, "some arbitrary string")
|
||||
|
||||
self.assertEqual(
|
||||
self.session.domains_store[d.key],
|
||||
"some arbitrary string")
|
||||
|
||||
def test_get_domain_back(self):
|
||||
d = Domain(self.session, "some arbitrary string")
|
||||
|
||||
self.assertEqual(
|
||||
d.get_domain_string(),
|
||||
"some arbitrary string")
|
||||
def test_retrieve_second_domain(self):
|
||||
""" A different domain should be able to retrieve the nonliteral set
|
||||
previously
|
||||
"""
|
||||
key = Domain(self.session, "some arbitrary string").key
|
||||
|
||||
self.assertEqual(
|
||||
Domain(self.session, key=key).get_domain_string(),
|
||||
"some arbitrary string")
|
||||
|
||||
def test_key_and_string(self):
|
||||
self.assertRaises(
|
||||
ValueError, Domain, None, domain_string="a", key="b")
|
||||
|
||||
def test_eval(self):
|
||||
self.session.evaluation_context.return_value = {'foo': 3}
|
||||
result = Domain(self.session, "[('a', '=', foo)]").evaluate({'foo': 3})
|
||||
self.assertEqual(
|
||||
result, [('a', '=', 3)])
|
||||
|
||||
class NonLiteralContextTest(unittest2.TestCase):
|
||||
def setUp(self):
|
||||
self.session = mock.Mock(spec=openerpweb.openerpweb.OpenERPSession)
|
||||
self.session.domains_store = {}
|
||||
def test_store_domain(self):
|
||||
c = Context(self.session, "some arbitrary string")
|
||||
|
||||
self.assertEqual(
|
||||
self.session.domains_store[c.key],
|
||||
"some arbitrary string")
|
||||
|
||||
def test_get_domain_back(self):
|
||||
c = Context(self.session, "some arbitrary string")
|
||||
|
||||
self.assertEqual(
|
||||
c.get_context_string(),
|
||||
"some arbitrary string")
|
||||
def test_retrieve_second_domain(self):
|
||||
""" A different domain should be able to retrieve the nonliteral set
|
||||
previously
|
||||
"""
|
||||
key = Context(self.session, "some arbitrary string").key
|
||||
|
||||
self.assertEqual(
|
||||
Domain(self.session, key=key).get_domain_string(),
|
||||
"some arbitrary string")
|
||||
|
||||
def test_key_and_string(self):
|
||||
self.assertRaises(
|
||||
ValueError, Context, None, context_string="a", key="b")
|
||||
|
||||
def test_eval(self):
|
||||
self.session.evaluation_context.return_value = {'foo': 3}
|
||||
result = Context(self.session, "[('a', '=', foo)]")\
|
||||
.evaluate({'foo': 3})
|
||||
self.assertEqual(
|
||||
result, [('a', '=', 3)])
|
||||
|
||||
class NonLiteralJSON(unittest2.TestCase):
|
||||
def setUp(self):
|
||||
self.session = mock.Mock(spec=openerpweb.openerpweb.OpenERPSession)
|
||||
self.session.domains_store = {}
|
||||
|
||||
def test_encode_domain(self):
|
||||
d = Domain(self.session, "some arbitrary string")
|
||||
self.assertEqual(
|
||||
simplejson.dumps(d, cls=openerpweb.nonliterals.NonLiteralEncoder),
|
||||
simplejson.dumps({'__ref': 'domain', '__id': d.key}))
|
||||
|
||||
def test_decode_domain(self):
|
||||
encoded = simplejson.dumps(
|
||||
Domain(self.session, "some arbitrary string"),
|
||||
cls=openerpweb.nonliterals.NonLiteralEncoder)
|
||||
|
||||
domain = simplejson.loads(
|
||||
encoded, object_hook=openerpweb.nonliterals.non_literal_decoder)
|
||||
domain.session = self.session
|
||||
|
||||
self.assertEqual(
|
||||
domain.get_domain_string(),
|
||||
"some arbitrary string"
|
||||
)
|
||||
|
||||
def test_encode_context(self):
|
||||
c = Context(self.session, "some arbitrary string")
|
||||
self.assertEqual(
|
||||
simplejson.dumps(c, cls=openerpweb.nonliterals.NonLiteralEncoder),
|
||||
simplejson.dumps({'__ref': 'context', '__id': c.key}))
|
||||
|
||||
def test_decode_context(self):
|
||||
encoded = simplejson.dumps(
|
||||
Context(self.session, "some arbitrary string"),
|
||||
cls=openerpweb.nonliterals.NonLiteralEncoder)
|
||||
|
||||
context = simplejson.loads(
|
||||
encoded, object_hook=openerpweb.nonliterals.non_literal_decoder)
|
||||
context.session = self.session
|
||||
|
||||
self.assertEqual(
|
||||
context.get_context_string(),
|
||||
"some arbitrary string"
|
||||
)
|
|
@ -0,0 +1,110 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
import unittest2
|
||||
from openerpweb.nonliterals import Domain
|
||||
import openerpweb.openerpweb
|
||||
|
||||
class TestOpenERPSession(unittest2.TestCase):
|
||||
def setUp(self):
|
||||
self.module = object()
|
||||
self.session = openerpweb.openerpweb.OpenERPSession()
|
||||
self.session._uid = -1
|
||||
self.session.context = {
|
||||
'current_date': '1945-08-05',
|
||||
'date': self.module,
|
||||
'time': self.module,
|
||||
'datetime': self.module,
|
||||
'relativedelta': self.module
|
||||
}
|
||||
def test_base_eval_context(self):
|
||||
self.assertEqual(type(self.session.base_eval_context), dict)
|
||||
self.assertEqual(
|
||||
self.session.base_eval_context,
|
||||
{'uid': -1, 'current_date': '1945-08-05',
|
||||
'date': self.module, 'datetime': self.module, 'time': self.module,
|
||||
'relativedelta': self.module}
|
||||
)
|
||||
|
||||
def test_evaluation_context_nocontext(self):
|
||||
self.assertEqual(
|
||||
type(self.session.evaluation_context()),
|
||||
dict
|
||||
)
|
||||
self.assertEqual(
|
||||
self.session.evaluation_context(),
|
||||
self.session.base_eval_context
|
||||
)
|
||||
|
||||
def test_evaluation_context(self):
|
||||
ctx = self.session.evaluation_context({'foo': 3})
|
||||
self.assertEqual(
|
||||
type(ctx),
|
||||
dict
|
||||
)
|
||||
self.assertIn('foo', ctx)
|
||||
|
||||
def test_eval_with_context(self):
|
||||
self.assertEqual(
|
||||
eval('current_date', self.session.evaluation_context()),
|
||||
'1945-08-05')
|
||||
|
||||
self.assertEqual(
|
||||
eval('foo + 3', self.session.evaluation_context({'foo': 4})),
|
||||
7)
|
||||
|
||||
def test_eval_domain_typeerror(self):
|
||||
self.assertRaises(
|
||||
TypeError, self.session.eval_domain, "foo")
|
||||
|
||||
def test_eval_domain_list(self):
|
||||
self.assertEqual(
|
||||
self.session.eval_domain([]),
|
||||
[])
|
||||
|
||||
def test_eval_nonliteral_domain(self):
|
||||
d = Domain(self.session, "[('foo', 'is', 3)]")
|
||||
self.assertEqual(
|
||||
self.session.eval_domain(d),
|
||||
[('foo', 'is', 3)])
|
||||
|
||||
def test_eval_nonliteral_domain_bykey(self):
|
||||
key = Domain(
|
||||
self.session, "[('foo', 'is', 3)]").key
|
||||
|
||||
d = Domain(None, key=key)
|
||||
self.assertEqual(
|
||||
self.session.eval_domain(d),
|
||||
[('foo', 'is', 3)])
|
||||
|
||||
def test_eval_empty_domains(self):
|
||||
self.assertEqual(
|
||||
self.session.eval_domains([]),
|
||||
[])
|
||||
|
||||
def test_eval_literal_domains(self):
|
||||
domains = [
|
||||
[('a', 'is', 3)],
|
||||
[('b', 'ilike', 'foo')],
|
||||
['|',
|
||||
('c', '=', False),
|
||||
('c', 'in', ['a', 'b', 'c'])]
|
||||
]
|
||||
self.assertEqual(
|
||||
self.session.eval_domains(domains),
|
||||
[
|
||||
('a', 'is', 3),
|
||||
('b', 'ilike', 'foo'),
|
||||
'|',
|
||||
('c', '=', False),
|
||||
('c', 'in', ['a', 'b', 'c'])
|
||||
])
|
||||
def test_eval_nonliteral_domains(self):
|
||||
domains = [
|
||||
Domain(self.session, "[('uid', '=', uid)]"),
|
||||
Domain(self.session,
|
||||
"['|', ('date', '<', current_date),"
|
||||
" ('date', '>', current_date)]")]
|
||||
self.assertEqual(
|
||||
self.session.eval_domains(domains),
|
||||
[('uid', '=', -1),
|
||||
'|', ('date', '<', '1945-08-05'), ('date', '>', '1945-08-05')]
|
||||
)
|
Loading…
Reference in New Issue