From c7afc04be334d583510cfa66592397b8400a9190 Mon Sep 17 00:00:00 2001 From: Stephane Wirtel Date: Wed, 23 Oct 2013 14:53:43 +0200 Subject: [PATCH 1/5] [REF] First step, use the same vocabulary for the cr, uid, res_type and res_id => Session, Record. * a Session is a container of one cursor and one user * a Record is a container for the res_type and res_id, in this object, the right names are model and id. Examples: session = Session(cr, uid) record = Record(res_type, res_id) record.mode, record.id A new class named 'WorkflowService' is a wrapper for the instance.* functions. bzr revid: stw@openerp.com-20131023125343-uo0bq52n86ae0fb0 --- openerp/workflow/__init__.py | 66 +++++---------------- openerp/workflow/helpers.py | 15 +++++ openerp/workflow/instance.py | 87 +++++++++++++++++---------- openerp/workflow/service.py | 110 +++++++++++++++++++++++++++++++++++ openerp/workflow/wkf_expr.py | 35 +++++------ openerp/workflow/workitem.py | 108 +++++++++++++++++++++------------- 6 files changed, 280 insertions(+), 141 deletions(-) create mode 100644 openerp/workflow/helpers.py create mode 100644 openerp/workflow/service.py diff --git a/openerp/workflow/__init__.py b/openerp/workflow/__init__.py index f823a63fd42..5506e66bf18 100644 --- a/openerp/workflow/__init__.py +++ b/openerp/workflow/__init__.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- ############################################################################## -# +# # OpenERP, Open Source Management Solution # Copyright (C) 2004-2009 Tiny SPRL (). # @@ -15,16 +15,17 @@ # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . +# along with this program. If not, see . # ############################################################################## -import instance +from openerp.workflow.service import WorkflowService -wkf_on_create_cache = {} +# The new API is in openerp.workflow.workflow_service +# OLD API of the Workflow def clear_cache(cr, uid): - wkf_on_create_cache[cr.dbname]={} + WorkflowService.clear_cache(cr.dbname) def trg_write(uid, res_type, res_id, cr): """ @@ -36,10 +37,7 @@ def trg_write(uid, res_type, res_id, cr): :param res_id: the model instance id the workflow belongs to :param cr: a database cursor """ - ident = (uid,res_type,res_id) - cr.execute('select id from wkf_instance where res_id=%s and res_type=%s and state=%s', (res_id or None,res_type or None, 'active')) - for (id,) in cr.fetchall(): - instance.update(cr, id, ident) + return WorkflowService.new(cr, uid, res_type, res_id).write() def trg_trigger(uid, res_type, res_id, cr): """ @@ -52,12 +50,7 @@ def trg_trigger(uid, res_type, res_id, cr): :param res_id: the model instance id the workflow belongs to :param cr: a database cursor """ - cr.execute('select instance_id from wkf_triggers where res_id=%s and model=%s', (res_id,res_type)) - res = cr.fetchall() - for (instance_id,) in res: - cr.execute('select %s,res_type,res_id from wkf_instance where id=%s', (uid, instance_id,)) - ident = cr.fetchone() - instance.update(cr, instance_id, ident) + return WorkflowService.new(cr, uid, res_type, res_id).trigger() def trg_delete(uid, res_type, res_id, cr): """ @@ -67,8 +60,7 @@ def trg_delete(uid, res_type, res_id, cr): :param res_id: the model instance id the workflow belongs to :param cr: a database cursor """ - ident = (uid,res_type,res_id) - instance.delete(cr, ident) + return WorkflowService.new(cr, uid, res_type, res_id).delete() def trg_create(uid, res_type, res_id, cr): """ @@ -78,16 +70,7 @@ def trg_create(uid, res_type, res_id, cr): :param res_id: the model instance id to own the created worfklow instance :param cr: a database cursor """ - ident = (uid,res_type,res_id) - wkf_on_create_cache.setdefault(cr.dbname, {}) - if res_type in wkf_on_create_cache[cr.dbname]: - wkf_ids = wkf_on_create_cache[cr.dbname][res_type] - else: - cr.execute('select id from wkf where osv=%s and on_create=True', (res_type,)) - wkf_ids = cr.fetchall() - wkf_on_create_cache[cr.dbname][res_type] = wkf_ids - for (wkf_id,) in wkf_ids: - instance.create(cr, ident, wkf_id) + return WorkflowService.new(cr, uid, res_type, res_id).create() def trg_validate(uid, res_type, res_id, signal, cr): """ @@ -98,14 +81,8 @@ def trg_validate(uid, res_type, res_id, signal, cr): :signal: the signal name to be fired :param cr: a database cursor """ - result = False - ident = (uid,res_type,res_id) - # ids of all active workflow instances for a corresponding resource (id, model_nam) - cr.execute('select id from wkf_instance where res_id=%s and res_type=%s and state=%s', (res_id, res_type, 'active')) - for (id,) in cr.fetchall(): - res2 = instance.validate(cr, id, ident, signal) - result = result or res2 - return result + assert isinstance(signal, basestring) + return WorkflowService.new(cr, uid, res_type, res_id).validate(signal) def trg_redirect(uid, res_type, res_id, new_rid, cr): """ @@ -120,22 +97,7 @@ def trg_redirect(uid, res_type, res_id, new_rid, cr): :param new_rid: the model instance id to own the worfklow instance :param cr: a database cursor """ - # get ids of wkf instances for the old resource (res_id) -#CHECKME: shouldn't we get only active instances? - cr.execute('select id, wkf_id from wkf_instance where res_id=%s and res_type=%s', (res_id, res_type)) - for old_inst_id, wkf_id in cr.fetchall(): - # first active instance for new resource (new_rid), using same wkf - cr.execute( - 'SELECT id '\ - 'FROM wkf_instance '\ - 'WHERE res_id=%s AND res_type=%s AND wkf_id=%s AND state=%s', - (new_rid, res_type, wkf_id, 'active')) - new_id = cr.fetchone() - if new_id: - # select all workitems which "wait" for the old instance - cr.execute('select id from wkf_workitem where subflow_id=%s', (old_inst_id,)) - for (item_id,) in cr.fetchall(): - # redirect all those workitems to the wkf instance of the new resource - cr.execute('update wkf_workitem set subflow_id=%s where id=%s', (new_id[0], item_id)) + assert isinstance(new_rid, (long, int)) + return WorkflowService.new(cr, uid, res_type, res_id).redirect(new_rid) # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: diff --git a/openerp/workflow/helpers.py b/openerp/workflow/helpers.py new file mode 100644 index 00000000000..980eb855461 --- /dev/null +++ b/openerp/workflow/helpers.py @@ -0,0 +1,15 @@ +import openerp.sql_db + +class Session(object): + def __init__(self, cr, uid): + assert isinstance(cr, openerp.sql_db.Cursor) + assert isinstance(uid, (int, long)) + self.cr = cr + self.uid = uid + +class Record(object): + def __init__(self, model, record_id): + assert isinstance(model, basestring) + assert isinstance(record_id, (int, long)) + self.model = model + self.id = record_id \ No newline at end of file diff --git a/openerp/workflow/instance.py b/openerp/workflow/instance.py index eed3e984f42..4db807daaac 100644 --- a/openerp/workflow/instance.py +++ b/openerp/workflow/instance.py @@ -19,57 +19,84 @@ # ############################################################################## import workitem +from helpers import Session +from helpers import Record -def create(cr, ident, wkf_id): - (uid,res_type,res_id) = ident - cr.execute('insert into wkf_instance (res_type,res_id,uid,wkf_id) values (%s,%s,%s,%s) RETURNING id', (res_type,res_id,uid,wkf_id)) - id_new = cr.fetchone()[0] - cr.execute('select * from wkf_activity where flow_start=True and wkf_id=%s', (wkf_id,)) - res = cr.dictfetchall() +def create(session, record, workflow_id): + assert isinstance(session, Session) + assert isinstance(record, Record) + assert isinstance(workflow_id, (int, long)) + + cr = session.cr + cr.execute('insert into wkf_instance (res_type,res_id,uid,wkf_id) values (%s,%s,%s,%s) RETURNING id', (record.model, record.id, session.uid, workflow_id)) + instance_id = cr.fetchone()[0] + cr.execute('select * from wkf_activity where flow_start=True and wkf_id=%s', (workflow_id,)) + activities = cr.dictfetchall() stack = [] - workitem.create(cr, res, id_new, ident, stack=stack) - update(cr, id_new, ident) - return id_new + workitem.create(session, record, activities, instance_id, stack=stack) + update(session, record, instance_id) + return instance_id -def delete(cr, ident): - (uid,res_type,res_id) = ident - cr.execute('delete from wkf_instance where res_id=%s and res_type=%s', (res_id,res_type)) +def delete(session, record): + assert isinstance(session, Session) + assert isinstance(record, Record) -def validate(cr, inst_id, ident, signal, force_running=False): - cr.execute("select * from wkf_workitem where inst_id=%s", (inst_id,)) + session.cr.execute('delete from wkf_instance where res_id=%s and res_type=%s', (record.id, record.model)) + +def validate(session, record, instance_id, signal, force_running=False): + assert isinstance(session, Session) + assert isinstance(record, Record) + assert isinstance(instance_id, (long, int)) + assert isinstance(signal, basestring) + assert isinstance(force_running, bool) + + cr = session.cr + cr.execute("select * from wkf_workitem where inst_id=%s", (instance_id,)) stack = [] - for witem in cr.dictfetchall(): + for work_item in cr.dictfetchall(): stack = [] - workitem.process(cr, witem, ident, signal, force_running, stack=stack) + workitem.process(session, record, work_item, signal, force_running, stack=stack) # An action is returned - _update_end(cr, inst_id, ident) + _update_end(session, record, instance_id) return stack and stack[0] or False -def update(cr, inst_id, ident): - cr.execute("select * from wkf_workitem where inst_id=%s", (inst_id,)) +def update(session, record, instance_id): + assert isinstance(session, Session) + assert isinstance(record, Record) + assert isinstance(instance_id, (long, int)) + + cr = session.cr + cr.execute("select * from wkf_workitem where inst_id=%s", (instance_id,)) for witem in cr.dictfetchall(): stack = [] - workitem.process(cr, witem, ident, stack=stack) - return _update_end(cr, inst_id, ident) + workitem.process(session, record, witem, stack=stack) + return _update_end(session, record, instance_id) -def _update_end(cr, inst_id, ident): - cr.execute('select wkf_id from wkf_instance where id=%s', (inst_id,)) +def _update_end(session, record, instance_id): + assert isinstance(session, Session) + assert isinstance(record, Record) + assert isinstance(instance_id, (long, int)) + + cr = session.cr + cr.execute('select wkf_id from wkf_instance where id=%s', (instance_id,)) wkf_id = cr.fetchone()[0] - cr.execute('select state,flow_stop from wkf_workitem w left join wkf_activity a on (a.id=w.act_id) where w.inst_id=%s', (inst_id,)) + cr.execute('select state,flow_stop from wkf_workitem w left join wkf_activity a on (a.id=w.act_id) where w.inst_id=%s', (instance_id,)) ok=True for r in cr.fetchall(): if (r[0]<>'complete') or not r[1]: ok=False break if ok: - cr.execute('select distinct a.name from wkf_activity a left join wkf_workitem w on (a.id=w.act_id) where w.inst_id=%s', (inst_id,)) + cr.execute('select distinct a.name from wkf_activity a left join wkf_workitem w on (a.id=w.act_id) where w.inst_id=%s', (instance_id,)) act_names = cr.fetchall() - cr.execute("update wkf_instance set state='complete' where id=%s", (inst_id,)) - cr.execute("update wkf_workitem set state='complete' where subflow_id=%s", (inst_id,)) - cr.execute("select i.id,w.osv,i.res_id from wkf_instance i left join wkf w on (i.wkf_id=w.id) where i.id IN (select inst_id from wkf_workitem where subflow_id=%s)", (inst_id,)) - for i in cr.fetchall(): + cr.execute("update wkf_instance set state='complete' where id=%s", (instance_id,)) + cr.execute("update wkf_workitem set state='complete' where subflow_id=%s", (instance_id,)) + cr.execute("select i.id,w.osv,i.res_id from wkf_instance i left join wkf w on (i.wkf_id=w.id) where i.id IN (select inst_id from wkf_workitem where subflow_id=%s)", (instance_id,)) + for cur_instance_id, cur_model_name, cur_record_id in cr.fetchall(): + cur_record = Record(cur_model_name, cur_record_id) for act_name in act_names: - validate(cr, i[0], (ident[0],i[1],i[2]), 'subflow.'+act_name[0]) + validate(session, cur_record, cur_instance_id, 'subflow.%s' % act_name[0]) + return ok diff --git a/openerp/workflow/service.py b/openerp/workflow/service.py new file mode 100644 index 00000000000..08c0f61a7a2 --- /dev/null +++ b/openerp/workflow/service.py @@ -0,0 +1,110 @@ +# -*- coding: utf-8 -*- +############################################################################## +# +# OpenERP, Open Source Management Solution +# Copyright (C) 2004-TODAY OpenERP S.A. (). +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +############################################################################## +from helpers import Session +from helpers import Record + +import instance + + +class WorkflowService(object): + CACHE = {} + + @classmethod + def clear_cache(cls, dbname): + cls.CACHE[dbname] = {} + + @classmethod + def new(cls, cr, uid, model_name, record_id): + return cls(Session(cr, uid), Record(model_name, record_id)) + + def __init__(self, session, record): + assert isinstance(session, Session) + assert isinstance(record, Record) + + self.session = session + self.record = record + + self.cr = self.session.cr + + def write(self): + self.cr.execute('select id from wkf_instance where res_id=%s and res_type=%s and state=%s', + (self.record.id or None, self.record.model or None, 'active') + ) + for (instance_id,) in self.cr.fetchall(): + instance.update(self.session, self.record, instance_id) + + def trigger(self): + self.cr.execute('select instance_id from wkf_triggers where res_id=%s and model=%s', (self.record.id, self.record.model)) + res = self.cr.fetchall() + for (instance_id,) in res: + self.cr.execute('select %s,res_type,res_id from wkf_instance where id=%s', (self.session.uid, instance_id,)) + current_uid, current_model_name, current_record_id = self.cr.fetchone() + + current_session = Session(self.session.cr, current_uid) + current_record = Record(current_model_name, current_record_id) + + instance.update(current_session, current_record, instance_id) + + def delete(self): + instance.delete(self.session, self.record) + + def create(self): + WorkflowService.CACHE.setdefault(self.cr.dbname, {}) + + wkf_ids = WorkflowService.CACHE[self.cr.dbname].get(self.record.model, None) + + if not wkf_ids: + self.cr.execute('select id from wkf where osv=%s and on_create=True', (self.record.model,)) + wkf_ids = self.cr.fetchall() + WorkflowService.CACHE[self.cr.dbname][self.record.model] = wkf_ids + + for (wkf_id, ) in wkf_ids: + instance.create(self.session, self.record, wkf_id) + + def validate(self, signal): + result = False + # ids of all active workflow instances for a corresponding resource (id, model_nam) + self.cr.execute('select id from wkf_instance where res_id=%s and res_type=%s and state=%s', (self.record.id, self.record.model, 'active')) + for (instance_id,) in self.cr.fetchall(): + res2 = instance.validate(self.session, self.record, instance_id, signal) + result = result or res2 + return result + + def redirect(self, new_rid): + # get ids of wkf instances for the old resource (res_id) + # CHECKME: shouldn't we get only active instances? + self.cr.execute('select id, wkf_id from wkf_instance where res_id=%s and res_type=%s', (self.record.id, self.record.model)) + + for old_inst_id, workflow_id in self.cr.fetchall(): + # first active instance for new resource (new_rid), using same wkf + self.cr.execute( + 'SELECT id '\ + 'FROM wkf_instance '\ + 'WHERE res_id=%s AND res_type=%s AND wkf_id=%s AND state=%s', + (new_rid, self.record.model, workflow_id, 'active')) + new_id = self.cr.fetchone() + if new_id: + # select all workitems which "wait" for the old instance + self.cr.execute('select id from wkf_workitem where subflow_id=%s', (old_inst_id,)) + for (item_id,) in self.cr.fetchall(): + # redirect all those workitems to the wkf instance of the new resource + self.cr.execute('update wkf_workitem set subflow_id=%s where id=%s', (new_id[0], item_id)) + diff --git a/openerp/workflow/wkf_expr.py b/openerp/workflow/wkf_expr.py index 6f3be25802d..2c5afb0db48 100644 --- a/openerp/workflow/wkf_expr.py +++ b/openerp/workflow/wkf_expr.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- ############################################################################## -# +# # OpenERP, Open Source Management Solution # Copyright (C) 2004-2009 Tiny SPRL (). # @@ -15,7 +15,7 @@ # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . +# along with this program. If not, see . # ############################################################################## @@ -51,13 +51,12 @@ class Env(dict): else: return super(Env, self).__getitem__(key) -def _eval_expr(cr, ident, workitem, lines): +def _eval_expr(session, record, workitem, lines): """ Evaluate each line of ``lines`` with the ``Env`` environment, returning the value of the last line. """ assert lines, 'You used a NULL action in a workflow, use dummy node instead.' - uid, model, id = ident result = False for line in lines.split('\n'): line = line.strip() @@ -68,30 +67,29 @@ def _eval_expr(cr, ident, workitem, lines): elif line == 'False': result = False else: - env = Env(cr, uid, model, id) + env = Env(session.cr, session.uid, record.model, record.id) result = eval(line, env, nocopy=True) return result -def execute_action(cr, ident, workitem, activity): +def execute_action(session, record, workitem, activity): """ Evaluate the ir.actions.server action specified in the activity. """ - uid, model, id = ident - ir_actions_server = openerp.registry(cr.dbname)['ir.actions.server'] - context = { 'active_model': model, 'active_id': id, 'active_ids': [id] } - result = ir_actions_server.run(cr, uid, [activity['action_id']], context) + ir_actions_server = openerp.registry(session.cr.dbname)['ir.actions.server'] + context = { 'active_model': record.model, 'active_id': record.id, 'active_ids': [record.id] } + result = ir_actions_server.run(session.cr, session.uid, [activity['action_id']], context) return result -def execute(cr, ident, workitem, activity): +def execute(session, record, workitem, activity): """ Evaluate the action specified in the activity. """ - return _eval_expr(cr, ident, workitem, activity['action']) + return _eval_expr(session, record, workitem, activity['action']) -def check(cr, workitem, ident, transition, signal): +def check(session, record, workitem, transition, signal): """ Test if a transition can be taken. The transition can be taken if: - + - the signal name matches, - the uid is SUPERUSER_ID or the user groups contains the transition's group, @@ -100,14 +98,13 @@ def check(cr, workitem, ident, transition, signal): if transition['signal'] and signal != transition['signal']: return False - uid = ident[0] - if uid != openerp.SUPERUSER_ID and transition['group_id']: - registry = openerp.registry(cr.dbname) - user_groups = registry['res.users'].read(cr, uid, [uid], ['groups_id'])[0]['groups_id'] + if session.uid != openerp.SUPERUSER_ID and transition['group_id']: + registry = openerp.registry(session.cr.dbname) + user_groups = registry['res.users'].read(session.cr, session.uid, [session.uid], ['groups_id'])[0]['groups_id'] if transition['group_id'] not in user_groups: return False - return _eval_expr(cr, ident, workitem, transition['condition']) + return _eval_expr(session, record, workitem, transition['condition']) # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: diff --git a/openerp/workflow/workitem.py b/openerp/workflow/workitem.py index a5639fa0c7c..f6dd5128d96 100644 --- a/openerp/workflow/workitem.py +++ b/openerp/workflow/workitem.py @@ -1,4 +1,4 @@ -# -*- coding: utf-8 -*- + ############################################################################## # # OpenERP, Open Source Management Solution @@ -28,41 +28,59 @@ import logging import instance import wkf_expr +from helpers import Session +from helpers import Record logger = logging.getLogger(__name__) -def create(cr, act_datas, inst_id, ident, stack): - for act in act_datas: +def create(session, record, activities, instance_id, stack): + assert isinstance(session, Session) + assert isinstance(record, Record) + assert isinstance(activities, list) + assert isinstance(instance_id, (long, int)) + assert isinstance(stack, list) + cr = session.cr + + ident = session.uid, record.model, record.id + for activity in activities: cr.execute("select nextval('wkf_workitem_id_seq')") id_new = cr.fetchone()[0] - cr.execute("insert into wkf_workitem (id,act_id,inst_id,state) values (%s,%s,%s,'active')", (id_new, act['id'], inst_id)) + cr.execute("insert into wkf_workitem (id,act_id,inst_id,state) values (%s,%s,%s,'active')", (id_new, activity['id'], instance_id)) cr.execute('select * from wkf_workitem where id=%s',(id_new,)) - res = cr.dictfetchone() + work_item = cr.dictfetchone() logger.info('Created workflow item in activity %s', - act['id'], extra={'ident': ident}) - process(cr, res, ident, stack=stack) + activity['id'], + extra={'ident': (session.uid, record.model, record.id)}) + + process(session, record, work_item, stack=stack) + +def process(session, record, workitem, signal=None, force_running=False, stack=None): + assert isinstance(session, Session) + assert isinstance(record, Record) + assert isinstance(force_running, bool) -def process(cr, workitem, ident, signal=None, force_running=False, stack=None): assert stack is not None + cr = session.cr + cr.execute('select * from wkf_activity where id=%s', (workitem['act_id'],)) activity = cr.dictfetchone() triggers = False if workitem['state'] == 'active': triggers = True - if not _execute(cr, workitem, activity, ident, stack): + if not _execute(session, record, workitem, activity, stack): return False if force_running or workitem['state'] == 'complete': - ok = _split_test(cr, workitem, activity['split_mode'], ident, signal, stack) + ok = _split_test(session, record, workitem, activity['split_mode'], signal, stack) triggers = triggers and not ok if triggers: cr.execute('select * from wkf_transition where act_from=%s', (workitem['act_id'],)) for trans in cr.dictfetchall(): if trans['trigger_model']: - ids = wkf_expr._eval_expr(cr,ident,workitem,trans['trigger_expr_id']) + ids = wkf_expr._eval_expr(session, record, workitem, trans['trigger_expr_id']) for res_id in ids: cr.execute('select nextval(\'wkf_triggers_id_seq\')') id =cr.fetchone()[0] @@ -73,26 +91,32 @@ def process(cr, workitem, ident, signal=None, force_running=False, stack=None): # ---------------------- PRIVATE FUNCS -------------------------------- -def _state_set(cr, workitem, activity, state, ident): - cr.execute('update wkf_workitem set state=%s where id=%s', (state,workitem['id'])) - workitem['state'] = state - logger.info("Changed state of work item %s to \"%s\" in activity %s", - workitem['id'], state, activity['id'], extra={'ident': ident}) +# def new_state_set(session, record, workitem, activity, state): -def _execute(cr, workitem, activity, ident, stack): +def _state_set(session, record, workitem, activity, state): + session.cr.execute('update wkf_workitem set state=%s where id=%s', (state, workitem['id'])) + workitem['state'] = state + logger.info('Changed state of work item %s to "%s" in activity %s', + workitem['id'], state, activity['id'], + extra={'ident': (session.uid, record.model, record.id)}) + +def _execute(session, record, workitem, activity, stack): result = True # # send a signal to parent workflow (signal: subflow.signal_name) # + cr = session.cr + ident = (session.uid, record.model, record.id) signal_todo = [] if (workitem['state']=='active') and activity['signal_send']: cr.execute("select i.id,w.osv,i.res_id from wkf_instance i left join wkf w on (i.wkf_id=w.id) where i.id IN (select inst_id from wkf_workitem where subflow_id=%s)", (workitem['inst_id'],)) - for i in cr.fetchall(): - signal_todo.append((i[0], (ident[0],i[1],i[2]), activity['signal_send'])) + for instance_id, model_name, record_id in cr.fetchall(): + record = Record(model_name, record_id) + signal_todo.append((instance_id, record, activity['signal_send'])) if activity['kind']=='dummy': if workitem['state']=='active': - _state_set(cr, workitem, activity, 'complete', ident) + _state_set(session, record, workitem, activity, 'complete') if activity['action_id']: res2 = wkf_expr.execute_action(cr, ident, workitem, activity) if res2: @@ -100,29 +124,29 @@ def _execute(cr, workitem, activity, ident, stack): result=res2 elif activity['kind']=='function': if workitem['state']=='active': - _state_set(cr, workitem, activity, 'running', ident) - returned_action = wkf_expr.execute(cr, ident, workitem, activity) + _state_set(session, record, workitem, activity, 'running') + returned_action = wkf_expr.execute(session, record, workitem, activity) if type(returned_action) in (dict,): stack.append(returned_action) if activity['action_id']: - res2 = wkf_expr.execute_action(cr, ident, workitem, activity) + res2 = wkf_expr.execute_action(session, record, workitem, activity) # A client action has been returned if res2: stack.append(res2) result=res2 - _state_set(cr, workitem, activity, 'complete', ident) + _state_set(session, record, workitem, activity, 'complete') elif activity['kind']=='stopall': if workitem['state']=='active': - _state_set(cr, workitem, activity, 'running', ident) + _state_set(session, record, workitem, activity, 'running') cr.execute('delete from wkf_workitem where inst_id=%s and id<>%s', (workitem['inst_id'], workitem['id'])) if activity['action']: - wkf_expr.execute(cr, ident, workitem, activity) - _state_set(cr, workitem, activity, 'complete', ident) + wkf_expr.execute(session, record, workitem, activity) + _state_set(session, record, workitem, activity, 'complete') elif activity['kind']=='subflow': if workitem['state']=='active': - _state_set(cr, workitem, activity, 'running', ident) + _state_set(session, record, workitem, activity, 'running') if activity.get('action', False): - id_new = wkf_expr.execute(cr, ident, workitem, activity) + id_new = wkf_expr.execute(session, record, workitem, activity) if not id_new: cr.execute('delete from wkf_workitem where id=%s', (workitem['id'],)) return False @@ -130,27 +154,29 @@ def _execute(cr, workitem, activity, ident, stack): cr.execute('select id from wkf_instance where res_id=%s and wkf_id=%s', (id_new,activity['subflow_id'])) id_new = cr.fetchone()[0] else: - id_new = instance.create(cr, ident, activity['subflow_id']) + id_new = instance.create(session, record, activity['subflow_id']) cr.execute('update wkf_workitem set subflow_id=%s where id=%s', (id_new, workitem['id'])) workitem['subflow_id'] = id_new if workitem['state']=='running': cr.execute("select state from wkf_instance where id=%s", (workitem['subflow_id'],)) state= cr.fetchone()[0] if state=='complete': - _state_set(cr, workitem, activity, 'complete', ident) - for t in signal_todo: - instance.validate(cr, t[0], t[1], t[2], force_running=True) + _state_set(session, record, workitem, activity, 'complete') + + for instance_id, record, signal_send in signal_todo: + instance.validate(session, record, signal_send, force_running=True) return result -def _split_test(cr, workitem, split_mode, ident, signal=None, stack=None): +def _split_test(session, record, workitem, split_mode, signal, stack): + cr = session.cr cr.execute('select * from wkf_transition where act_from=%s', (workitem['act_id'],)) test = False transitions = [] alltrans = cr.dictfetchall() if split_mode=='XOR' or split_mode=='OR': for transition in alltrans: - if wkf_expr.check(cr, workitem, ident, transition,signal): + if wkf_expr.check(session, record, workitem, transition,signal): test = True transitions.append((transition['id'], workitem['inst_id'])) if split_mode=='XOR': @@ -158,7 +184,7 @@ def _split_test(cr, workitem, split_mode, ident, signal=None, stack=None): else: test = True for transition in alltrans: - if not wkf_expr.check(cr, workitem, ident, transition,signal): + if not wkf_expr.check(session, record, workitem, transition,signal): test = False break cr.execute('select count(*) from wkf_witm_trans where trans_id=%s and inst_id=%s', (transition['id'], workitem['inst_id'])) @@ -168,15 +194,17 @@ def _split_test(cr, workitem, split_mode, ident, signal=None, stack=None): cr.executemany('insert into wkf_witm_trans (trans_id,inst_id) values (%s,%s)', transitions) cr.execute('delete from wkf_workitem where id=%s', (workitem['id'],)) for t in transitions: - _join_test(cr, t[0], t[1], ident, stack) + _join_test(session, record, t[0], t[1], stack) return True return False -def _join_test(cr, trans_id, inst_id, ident, stack): +def _join_test(session, record, trans_id, inst_id, stack): + # cr, trans_id, inst_id, ident, stack): + cr = session.cr cr.execute('select * from wkf_activity where id=(select act_to from wkf_transition where id=%s)', (trans_id,)) activity = cr.dictfetchone() if activity['join_mode']=='XOR': - create(cr,[activity], inst_id, ident, stack) + create(session, record, [activity], inst_id, stack) cr.execute('delete from wkf_witm_trans where inst_id=%s and trans_id=%s', (inst_id,trans_id)) else: cr.execute('select id from wkf_transition where act_to=%s', (activity['id'],)) @@ -191,7 +219,7 @@ def _join_test(cr, trans_id, inst_id, ident, stack): if ok: for (id,) in trans_ids: cr.execute('delete from wkf_witm_trans where trans_id=%s and inst_id=%s', (id,inst_id)) - create(cr, [activity], inst_id, ident, stack) + create(session, record, [activity], inst_id, stack) # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: From fad743a9788275b5b0bdfb72875005765bd97345 Mon Sep 17 00:00:00 2001 From: Stephane Wirtel Date: Wed, 23 Oct 2013 14:58:18 +0200 Subject: [PATCH 2/5] [REF] Remove old code from the workflow engine bzr revid: stw@openerp.com-20131023125818-3g3fe49lfrube9kq --- openerp/workflow/workitem.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/openerp/workflow/workitem.py b/openerp/workflow/workitem.py index f6dd5128d96..6ee99fd787d 100644 --- a/openerp/workflow/workitem.py +++ b/openerp/workflow/workitem.py @@ -41,7 +41,6 @@ def create(session, record, activities, instance_id, stack): assert isinstance(stack, list) cr = session.cr - ident = session.uid, record.model, record.id for activity in activities: cr.execute("select nextval('wkf_workitem_id_seq')") id_new = cr.fetchone()[0] @@ -91,8 +90,6 @@ def process(session, record, workitem, signal=None, force_running=False, stack=N # ---------------------- PRIVATE FUNCS -------------------------------- -# def new_state_set(session, record, workitem, activity, state): - def _state_set(session, record, workitem, activity, state): session.cr.execute('update wkf_workitem set state=%s where id=%s', (state, workitem['id'])) workitem['state'] = state @@ -106,7 +103,6 @@ def _execute(session, record, workitem, activity, stack): # send a signal to parent workflow (signal: subflow.signal_name) # cr = session.cr - ident = (session.uid, record.model, record.id) signal_todo = [] if (workitem['state']=='active') and activity['signal_send']: cr.execute("select i.id,w.osv,i.res_id from wkf_instance i left join wkf w on (i.wkf_id=w.id) where i.id IN (select inst_id from wkf_workitem where subflow_id=%s)", (workitem['inst_id'],)) @@ -118,7 +114,7 @@ def _execute(session, record, workitem, activity, stack): if workitem['state']=='active': _state_set(session, record, workitem, activity, 'complete') if activity['action_id']: - res2 = wkf_expr.execute_action(cr, ident, workitem, activity) + res2 = wkf_expr.execute_action(session, record, workitem, activity) if res2: stack.append(res2) result=res2 @@ -199,7 +195,6 @@ def _split_test(session, record, workitem, split_mode, signal, stack): return False def _join_test(session, record, trans_id, inst_id, stack): - # cr, trans_id, inst_id, ident, stack): cr = session.cr cr.execute('select * from wkf_activity where id=(select act_to from wkf_transition where id=%s)', (trans_id,)) activity = cr.dictfetchone() From 6a3a2709efe78b2440480acb33d02e5e9f025b55 Mon Sep 17 00:00:00 2001 From: Stephane Wirtel Date: Wed, 23 Oct 2013 15:07:05 +0200 Subject: [PATCH 3/5] [REF] Remove the useless wkf_expr.py file and copy it into workitem.py bzr revid: stw@openerp.com-20131023130705-k4u7ssba820b7hwi --- openerp/workflow/instance.py | 4 +- openerp/workflow/wkf_expr.py | 111 -------------------------------- openerp/workflow/workitem.py | 119 ++++++++++++++++++++++++++++++----- 3 files changed, 105 insertions(+), 129 deletions(-) delete mode 100644 openerp/workflow/wkf_expr.py diff --git a/openerp/workflow/instance.py b/openerp/workflow/instance.py index 4db807daaac..3d7b8cbb04e 100644 --- a/openerp/workflow/instance.py +++ b/openerp/workflow/instance.py @@ -19,8 +19,8 @@ # ############################################################################## import workitem -from helpers import Session -from helpers import Record +from openerp.workflow.helpers import Session +from openerp.workflow.helpers import Record def create(session, record, workflow_id): assert isinstance(session, Session) diff --git a/openerp/workflow/wkf_expr.py b/openerp/workflow/wkf_expr.py deleted file mode 100644 index 2c5afb0db48..00000000000 --- a/openerp/workflow/wkf_expr.py +++ /dev/null @@ -1,111 +0,0 @@ -# -*- coding: utf-8 -*- -############################################################################## -# -# OpenERP, Open Source Management Solution -# Copyright (C) 2004-2009 Tiny SPRL (). -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . -# -############################################################################## - -""" -Evaluate workflow code found in activity actions and transition conditions. -""" - -import openerp -from openerp.tools.safe_eval import safe_eval as eval - -class Env(dict): - """ - Dictionary class used as an environment to evaluate workflow code (such as - the condition on transitions). - - This environment provides sybmols for cr, uid, id, model name, model - instance, column names, and all the record (the one obtained by browsing - the provided ID) attributes. - """ - def __init__(self, cr, uid, model, id): - self.cr = cr - self.uid = uid - self.model = model - self.id = id - self.ids = [id] - self.obj = openerp.registry(cr.dbname)[model] - self.columns = self.obj._columns.keys() + self.obj._inherit_fields.keys() - - def __getitem__(self, key): - if (key in self.columns) or (key in dir(self.obj)): - res = self.obj.browse(self.cr, self.uid, self.id) - return res[key] - else: - return super(Env, self).__getitem__(key) - -def _eval_expr(session, record, workitem, lines): - """ - Evaluate each line of ``lines`` with the ``Env`` environment, returning - the value of the last line. - """ - assert lines, 'You used a NULL action in a workflow, use dummy node instead.' - result = False - for line in lines.split('\n'): - line = line.strip() - if not line: - continue - if line == 'True': - result = True - elif line == 'False': - result = False - else: - env = Env(session.cr, session.uid, record.model, record.id) - result = eval(line, env, nocopy=True) - return result - -def execute_action(session, record, workitem, activity): - """ - Evaluate the ir.actions.server action specified in the activity. - """ - ir_actions_server = openerp.registry(session.cr.dbname)['ir.actions.server'] - context = { 'active_model': record.model, 'active_id': record.id, 'active_ids': [record.id] } - result = ir_actions_server.run(session.cr, session.uid, [activity['action_id']], context) - return result - -def execute(session, record, workitem, activity): - """ - Evaluate the action specified in the activity. - """ - return _eval_expr(session, record, workitem, activity['action']) - -def check(session, record, workitem, transition, signal): - """ - Test if a transition can be taken. The transition can be taken if: - - - the signal name matches, - - the uid is SUPERUSER_ID or the user groups contains the transition's - group, - - the condition evaluates to a truish value. - """ - if transition['signal'] and signal != transition['signal']: - return False - - if session.uid != openerp.SUPERUSER_ID and transition['group_id']: - registry = openerp.registry(session.cr.dbname) - user_groups = registry['res.users'].read(session.cr, session.uid, [session.uid], ['groups_id'])[0]['groups_id'] - if transition['group_id'] not in user_groups: - return False - - return _eval_expr(session, record, workitem, transition['condition']) - - -# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: - diff --git a/openerp/workflow/workitem.py b/openerp/workflow/workitem.py index 6ee99fd787d..5c2fc580cd0 100644 --- a/openerp/workflow/workitem.py +++ b/openerp/workflow/workitem.py @@ -2,7 +2,7 @@ ############################################################################## # # OpenERP, Open Source Management Solution -# Copyright (C) 2004-2009 Tiny SPRL (). +# Copyright (C) 2004-2009 OpenERP S.A. (%s', (workitem['inst_id'], workitem['id'])) if activity['action']: - wkf_expr.execute(session, record, workitem, activity) + wkf_expr_execute(session, record, workitem, activity) _state_set(session, record, workitem, activity, 'complete') + elif activity['kind']=='subflow': if workitem['state']=='active': _state_set(session, record, workitem, activity, 'running') if activity.get('action', False): - id_new = wkf_expr.execute(session, record, workitem, activity) + id_new = wkf_expr_execute(session, record, workitem, activity) if not id_new: cr.execute('delete from wkf_workitem where id=%s', (workitem['id'],)) return False @@ -172,7 +259,7 @@ def _split_test(session, record, workitem, split_mode, signal, stack): alltrans = cr.dictfetchall() if split_mode=='XOR' or split_mode=='OR': for transition in alltrans: - if wkf_expr.check(session, record, workitem, transition,signal): + if wkf_expr_check(session, record, workitem, transition,signal): test = True transitions.append((transition['id'], workitem['inst_id'])) if split_mode=='XOR': @@ -180,7 +267,7 @@ def _split_test(session, record, workitem, split_mode, signal, stack): else: test = True for transition in alltrans: - if not wkf_expr.check(session, record, workitem, transition,signal): + if not wkf_expr_check(session, record, workitem, transition,signal): test = False break cr.execute('select count(*) from wkf_witm_trans where trans_id=%s and inst_id=%s', (transition['id'], workitem['inst_id'])) From f6c1b6bb8638f9ea4d2c68cc73443a02a6298d7c Mon Sep 17 00:00:00 2001 From: Stephane Wirtel Date: Wed, 23 Oct 2013 15:58:38 +0200 Subject: [PATCH 4/5] [REF] Create a WorkflowItem object bzr revid: stw@openerp.com-20131023135838-ix7x6mzui3i3wfbi --- openerp/workflow/helpers.py | 9 +- openerp/workflow/instance.py | 14 +- openerp/workflow/workitem.py | 379 ++++++++++++++++++----------------- 3 files changed, 213 insertions(+), 189 deletions(-) diff --git a/openerp/workflow/helpers.py b/openerp/workflow/helpers.py index 980eb855461..e41d178be90 100644 --- a/openerp/workflow/helpers.py +++ b/openerp/workflow/helpers.py @@ -12,4 +12,11 @@ class Record(object): assert isinstance(model, basestring) assert isinstance(record_id, (int, long)) self.model = model - self.id = record_id \ No newline at end of file + self.id = record_id + +class WorkflowActivity(object): + KIND_FUNCTION = 'function' + KIND_DUMMY = 'dummy' + KIND_STOPALL = 'stopall' + KIND_SUBFLOW = 'subflow' + diff --git a/openerp/workflow/instance.py b/openerp/workflow/instance.py index 3d7b8cbb04e..befbd5970f7 100644 --- a/openerp/workflow/instance.py +++ b/openerp/workflow/instance.py @@ -21,6 +21,7 @@ import workitem from openerp.workflow.helpers import Session from openerp.workflow.helpers import Record +from openerp.workflow.workitem import WorkflowItem def create(session, record, workflow_id): assert isinstance(session, Session) @@ -33,7 +34,7 @@ def create(session, record, workflow_id): cr.execute('select * from wkf_activity where flow_start=True and wkf_id=%s', (workflow_id,)) activities = cr.dictfetchall() stack = [] - workitem.create(session, record, activities, instance_id, stack=stack) + WorkflowItem(session, record).create(activities, instance_id, stack) update(session, record, instance_id) return instance_id @@ -53,9 +54,10 @@ def validate(session, record, instance_id, signal, force_running=False): cr = session.cr cr.execute("select * from wkf_workitem where inst_id=%s", (instance_id,)) stack = [] + wi = WorkflowItem(session, record) for work_item in cr.dictfetchall(): - stack = [] - workitem.process(session, record, work_item, signal, force_running, stack=stack) + # stack = [] + wi.process(work_item, signal, force_running, stack=stack) # An action is returned _update_end(session, record, instance_id) return stack and stack[0] or False @@ -67,9 +69,11 @@ def update(session, record, instance_id): cr = session.cr cr.execute("select * from wkf_workitem where inst_id=%s", (instance_id,)) - for witem in cr.dictfetchall(): + wi = WorkflowItem(session, record) + + for work_item in cr.dictfetchall(): stack = [] - workitem.process(session, record, witem, stack=stack) + wi.process(work_item, stack=stack) return _update_end(session, record, instance_id) def _update_end(session, record, instance_id): diff --git a/openerp/workflow/workitem.py b/openerp/workflow/workitem.py index 5c2fc580cd0..7d0afcb519f 100644 --- a/openerp/workflow/workitem.py +++ b/openerp/workflow/workitem.py @@ -28,13 +28,14 @@ import instance from openerp.workflow.helpers import Session from openerp.workflow.helpers import Record +from openerp.workflow.helpers import WorkflowActivity logger = logging.getLogger(__name__) import openerp from openerp.tools.safe_eval import safe_eval as eval -class Env(dict): +class Environment(dict): """ Dictionary class used as an environment to evaluate workflow code (such as the condition on transitions). @@ -43,13 +44,13 @@ class Env(dict): instance, column names, and all the record (the one obtained by browsing the provided ID) attributes. """ - def __init__(self, cr, uid, model, id): - self.cr = cr - self.uid = uid - self.model = model - self.id = id - self.ids = [id] - self.obj = openerp.registry(cr.dbname)[model] + def __init__(self, session, record): + self.cr = session.cr + self.uid = session.uid + self.model = record.model + self.id = record.id + self.ids = [record.id] + self.obj = openerp.registry(self.cr.dbname)[self.model] self.columns = self.obj._columns.keys() + self.obj._inherit_fields.keys() def __getitem__(self, key): @@ -57,11 +58,11 @@ class Env(dict): res = self.obj.browse(self.cr, self.uid, self.id) return res[key] else: - return super(Env, self).__getitem__(key) + return super(Environment, self).__getitem__(key) def wkf_expr_eval_expr(session, record, workitem, lines): """ - Evaluate each line of ``lines`` with the ``Env`` environment, returning + Evaluate each line of ``lines`` with the ``Environment`` environment, returning the value of the last line. """ assert lines, 'You used a NULL action in a workflow, use dummy node instead.' @@ -75,7 +76,7 @@ def wkf_expr_eval_expr(session, record, workitem, lines): elif line == 'False': result = False else: - env = Env(session.cr, session.uid, record.model, record.id) + env = Environment(session, record) result = eval(line, env, nocopy=True) return result @@ -115,193 +116,205 @@ def wkf_expr_check(session, record, workitem, transition, signal): return wkf_expr_eval_expr(session, record, workitem, transition['condition']) -# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: +class WorkflowItem(object): + def __init__(self, session, record): + assert isinstance(session, Session) + assert isinstance(record, Record) + self.session = session + self.record = record + def create(self, activities, instance_id, stack): + assert isinstance(activities, list) + assert isinstance(instance_id, (long, int)) + assert isinstance(stack, list) + cr = self.session.cr + for activity in activities: + cr.execute("select nextval('wkf_workitem_id_seq')") + id_new = cr.fetchone()[0] + cr.execute("insert into wkf_workitem (id,act_id,inst_id,state) values (%s,%s,%s,'active')", (id_new, activity['id'], instance_id)) + cr.execute('select * from wkf_workitem where id=%s',(id_new,)) + work_item = cr.dictfetchone() + logger.info('Created workflow item in activity %s', + activity['id'], + extra={'ident': (self.session.uid, self.record.model, self.record.id)}) -def create(session, record, activities, instance_id, stack): - assert isinstance(session, Session) - assert isinstance(record, Record) - assert isinstance(activities, list) - assert isinstance(instance_id, (long, int)) - assert isinstance(stack, list) - cr = session.cr + self.process(work_item, stack=stack) - for activity in activities: - cr.execute("select nextval('wkf_workitem_id_seq')") - id_new = cr.fetchone()[0] - cr.execute("insert into wkf_workitem (id,act_id,inst_id,state) values (%s,%s,%s,'active')", (id_new, activity['id'], instance_id)) - cr.execute('select * from wkf_workitem where id=%s',(id_new,)) - work_item = cr.dictfetchone() - logger.info('Created workflow item in activity %s', - activity['id'], - extra={'ident': (session.uid, record.model, record.id)}) + def process(self, workitem, signal=None, force_running=False, stack=None): + assert isinstance(workitem, dict) + assert isinstance(force_running, bool) - process(session, record, work_item, stack=stack) + # return _process(self.session, self.record, work_item, signal=signal, force_running=force_running, stack=stack) + # assert isinstance(force_running, bool) -def process(session, record, workitem, signal=None, force_running=False, stack=None): - assert isinstance(session, Session) - assert isinstance(record, Record) - assert isinstance(force_running, bool) + assert stack is not None - assert stack is not None + cr = self.session.cr - cr = session.cr + cr.execute('select * from wkf_activity where id=%s', (workitem['act_id'],)) + activity = cr.dictfetchone() - cr.execute('select * from wkf_activity where id=%s', (workitem['act_id'],)) - activity = cr.dictfetchone() + triggers = False + if workitem['state'] == 'active': + triggers = True + if not self._execute(workitem, activity, stack): + return False - triggers = False - if workitem['state'] == 'active': - triggers = True - if not _execute(session, record, workitem, activity, stack): - return False + if force_running or workitem['state'] == 'complete': + ok = self._split_test(workitem, activity['split_mode'], signal, stack) + triggers = triggers and not ok - if force_running or workitem['state'] == 'complete': - ok = _split_test(session, record, workitem, activity['split_mode'], signal, stack) - triggers = triggers and not ok + if triggers: + cr.execute('select * from wkf_transition where act_from=%s', (workitem['act_id'],)) + for trans in cr.dictfetchall(): + if trans['trigger_model']: + ids = wkf_expr_eval_expr(self.session, self.record, workitem, trans['trigger_expr_id']) + for res_id in ids: + cr.execute('select nextval(\'wkf_triggers_id_seq\')') + id =cr.fetchone()[0] + cr.execute('insert into wkf_triggers (model,res_id,instance_id,workitem_id,id) values (%s,%s,%s,%s,%s)', (trans['trigger_model'],res_id,workitem['inst_id'], workitem['id'], id)) - if triggers: - cr.execute('select * from wkf_transition where act_from=%s', (workitem['act_id'],)) - for trans in cr.dictfetchall(): - if trans['trigger_model']: - ids = wkf_expr_eval_expr(session, record, workitem, trans['trigger_expr_id']) - for res_id in ids: - cr.execute('select nextval(\'wkf_triggers_id_seq\')') - id =cr.fetchone()[0] - cr.execute('insert into wkf_triggers (model,res_id,instance_id,workitem_id,id) values (%s,%s,%s,%s,%s)', (trans['trigger_model'],res_id,workitem['inst_id'], workitem['id'], id)) - - return True - - -# ---------------------- PRIVATE FUNCS -------------------------------- - -def _state_set(session, record, workitem, activity, state): - session.cr.execute('update wkf_workitem set state=%s where id=%s', (state, workitem['id'])) - workitem['state'] = state - logger.info('Changed state of work item %s to "%s" in activity %s', - workitem['id'], state, activity['id'], - extra={'ident': (session.uid, record.model, record.id)}) - -def _execute(session, record, workitem, activity, stack): - """Send a signal to parenrt workflow (signal: subflow.signal_name)""" - result = True - cr = session.cr - signal_todo = [] - if (workitem['state']=='active') and activity['signal_send']: - cr.execute("select i.id,w.osv,i.res_id from wkf_instance i left join wkf w on (i.wkf_id=w.id) where i.id IN (select inst_id from wkf_workitem where subflow_id=%s)", (workitem['inst_id'],)) - for instance_id, model_name, record_id in cr.fetchall(): - record = Record(model_name, record_id) - signal_todo.append((instance_id, record, activity['signal_send'])) - - if activity['kind']=='dummy': - if workitem['state']=='active': - _state_set(session, record, workitem, activity, 'complete') - if activity['action_id']: - res2 = wkf_expr_execute_action(session, record, workitem, activity) - if res2: - stack.append(res2) - result=res2 - - elif activity['kind']=='function': - if workitem['state']=='active': - _state_set(session, record, workitem, activity, 'running') - returned_action = wkf_expr_execute(session, record, workitem, activity) - if type(returned_action) in (dict,): - stack.append(returned_action) - if activity['action_id']: - res2 = wkf_expr_execute_action(session, record, workitem, activity) - # A client action has been returned - if res2: - stack.append(res2) - result=res2 - _state_set(session, record, workitem, activity, 'complete') - - elif activity['kind']=='stopall': - if workitem['state']=='active': - _state_set(session, record, workitem, activity, 'running') - cr.execute('delete from wkf_workitem where inst_id=%s and id<>%s', (workitem['inst_id'], workitem['id'])) - if activity['action']: - wkf_expr_execute(session, record, workitem, activity) - _state_set(session, record, workitem, activity, 'complete') - - elif activity['kind']=='subflow': - if workitem['state']=='active': - _state_set(session, record, workitem, activity, 'running') - if activity.get('action', False): - id_new = wkf_expr_execute(session, record, workitem, activity) - if not id_new: - cr.execute('delete from wkf_workitem where id=%s', (workitem['id'],)) - return False - assert type(id_new)==type(1) or type(id_new)==type(1L), 'Wrong return value: '+str(id_new)+' '+str(type(id_new)) - cr.execute('select id from wkf_instance where res_id=%s and wkf_id=%s', (id_new,activity['subflow_id'])) - id_new = cr.fetchone()[0] - else: - id_new = instance.create(session, record, activity['subflow_id']) - cr.execute('update wkf_workitem set subflow_id=%s where id=%s', (id_new, workitem['id'])) - workitem['subflow_id'] = id_new - if workitem['state']=='running': - cr.execute("select state from wkf_instance where id=%s", (workitem['subflow_id'],)) - state= cr.fetchone()[0] - if state=='complete': - _state_set(session, record, workitem, activity, 'complete') - - for instance_id, record, signal_send in signal_todo: - instance.validate(session, record, signal_send, force_running=True) - - return result - -def _split_test(session, record, workitem, split_mode, signal, stack): - cr = session.cr - cr.execute('select * from wkf_transition where act_from=%s', (workitem['act_id'],)) - test = False - transitions = [] - alltrans = cr.dictfetchall() - if split_mode=='XOR' or split_mode=='OR': - for transition in alltrans: - if wkf_expr_check(session, record, workitem, transition,signal): - test = True - transitions.append((transition['id'], workitem['inst_id'])) - if split_mode=='XOR': - break - else: - test = True - for transition in alltrans: - if not wkf_expr_check(session, record, workitem, transition,signal): - test = False - break - cr.execute('select count(*) from wkf_witm_trans where trans_id=%s and inst_id=%s', (transition['id'], workitem['inst_id'])) - if not cr.fetchone()[0]: - transitions.append((transition['id'], workitem['inst_id'])) - if test and len(transitions): - cr.executemany('insert into wkf_witm_trans (trans_id,inst_id) values (%s,%s)', transitions) - cr.execute('delete from wkf_workitem where id=%s', (workitem['id'],)) - for t in transitions: - _join_test(session, record, t[0], t[1], stack) return True - return False -def _join_test(session, record, trans_id, inst_id, stack): - cr = session.cr - cr.execute('select * from wkf_activity where id=(select act_to from wkf_transition where id=%s)', (trans_id,)) - activity = cr.dictfetchone() - if activity['join_mode']=='XOR': - create(session, record, [activity], inst_id, stack) - cr.execute('delete from wkf_witm_trans where inst_id=%s and trans_id=%s', (inst_id,trans_id)) - else: - cr.execute('select id from wkf_transition where act_to=%s', (activity['id'],)) - trans_ids = cr.fetchall() - ok = True - for (id,) in trans_ids: - cr.execute('select count(*) from wkf_witm_trans where trans_id=%s and inst_id=%s', (id,inst_id)) - res = cr.fetchone()[0] - if not res: - ok = False - break - if ok: + def _execute(self, workitem, activity, stack): + """Send a signal to parenrt workflow (signal: subflow.signal_name)""" + result = True + cr = self.session.cr + signal_todo = [] + + if (workitem['state']=='active') and activity['signal_send']: + # signal_send']: + cr.execute("select i.id,w.osv,i.res_id from wkf_instance i left join wkf w on (i.wkf_id=w.id) where i.id IN (select inst_id from wkf_workitem where subflow_id=%s)", (workitem['inst_id'],)) + for instance_id, model_name, record_id in cr.fetchall(): + record = Record(model_name, record_id) + signal_todo.append((instance_id, record, activity['signal_send'])) + + + if activity['kind'] == WorkflowActivity.KIND_DUMMY: + if workitem['state']=='active': + self._state_set(workitem, activity, 'complete') + if activity['action_id']: + res2 = wkf_expr_execute_action(self.session, self.record, workitem, activity) + if res2: + stack.append(res2) + result=res2 + + elif activity['kind'] == WorkflowActivity.KIND_FUNCTION: + + if workitem['state']=='active': + self._state_set(workitem, activity, 'running') + returned_action = wkf_expr_execute(self.session, self.record, workitem, activity) + if type(returned_action) in (dict,): + stack.append(returned_action) + if activity['action_id']: + res2 = wkf_expr_execute_action(self.session, self.record, workitem, activity) + # A client action has been returned + if res2: + stack.append(res2) + result=res2 + self._state_set(workitem, activity, 'complete') + + elif activity['kind'] == WorkflowActivity.KIND_STOPALL: + if workitem['state']=='active': + self._state_set(workitem, activity, 'running') + cr.execute('delete from wkf_workitem where inst_id=%s and id<>%s', (workitem['inst_id'], workitem['id'])) + if activity['action']: + wkf_expr_execute(self.session, self.record, workitem, activity) + self._state_set(workitem, activity, 'complete') + + elif activity['kind'] == WorkflowActivity.KIND_SUBFLOW: + + if workitem['state']=='active': + + self._state_set(workitem, activity, 'running') + if activity.get('action', False): + id_new = wkf_expr_execute(self.session, self.record, workitem, activity) + if not id_new: + cr.execute('delete from wkf_workitem where id=%s', (workitem['id'],)) + return False + assert type(id_new)==type(1) or type(id_new)==type(1L), 'Wrong return value: '+str(id_new)+' '+str(type(id_new)) + cr.execute('select id from wkf_instance where res_id=%s and wkf_id=%s', (id_new,activity['subflow_id'])) + id_new = cr.fetchone()[0] + else: + id_new = instance.create(self.session, self.record, activity['subflow_id']) + cr.execute('update wkf_workitem set subflow_id=%s where id=%s', (id_new, workitem['id'])) + workitem['subflow_id'] = id_new + + if workitem['state']=='running': + cr.execute("select state from wkf_instance where id=%s", (workitem['subflow_id'],)) + state= cr.fetchone()[0] + if state=='complete': + self._state_set(workitem, activity, 'complete') + + for instance_id, record, signal_send in signal_todo: + instance.validate(self.session, self.record, signal_send, force_running=True) + + return result + + def _state_set(self, workitem, activity, state): + self.session.cr.execute('update wkf_workitem set state=%s where id=%s', (state, workitem['id'])) + workitem['state'] = state + logger.info('Changed state of work item %s to "%s" in activity %s', + workitem['id'], state, activity['id'], + extra={'ident': (self.session.uid, self.record.model, self.record.id)}) + + + + + def _split_test(self, workitem, split_mode, signal, stack): + cr = self.session.cr + cr.execute('select * from wkf_transition where act_from=%s', (workitem['act_id'],)) + test = False + transitions = [] + alltrans = cr.dictfetchall() + + if split_mode in ('XOR', 'OR'): + for transition in alltrans: + if wkf_expr_check(self.session, self.record, workitem, transition,signal): + test = True + transitions.append((transition['id'], workitem['inst_id'])) + if split_mode=='XOR': + break + else: + test = True + for transition in alltrans: + if not wkf_expr_check(self.session, self.record, workitem, transition,signal): + test = False + break + cr.execute('select count(*) from wkf_witm_trans where trans_id=%s and inst_id=%s', (transition['id'], workitem['inst_id'])) + if not cr.fetchone()[0]: + transitions.append((transition['id'], workitem['inst_id'])) + + if test and transitions: + cr.executemany('insert into wkf_witm_trans (trans_id,inst_id) values (%s,%s)', transitions) + cr.execute('delete from wkf_workitem where id=%s', (workitem['id'],)) + for t in transitions: + self._join_test(t[0], t[1], stack) + return True + return False + + def _join_test(self, trans_id, inst_id, stack): + cr = self.session.cr + cr.execute('select * from wkf_activity where id=(select act_to from wkf_transition where id=%s)', (trans_id,)) + activity = cr.dictfetchone() + if activity['join_mode']=='XOR': + self.create([activity], inst_id, stack) + cr.execute('delete from wkf_witm_trans where inst_id=%s and trans_id=%s', (inst_id,trans_id)) + else: + cr.execute('select id from wkf_transition where act_to=%s', (activity['id'],)) + trans_ids = cr.fetchall() + ok = True for (id,) in trans_ids: - cr.execute('delete from wkf_witm_trans where trans_id=%s and inst_id=%s', (id,inst_id)) - create(session, record, [activity], inst_id, stack) + cr.execute('select count(*) from wkf_witm_trans where trans_id=%s and inst_id=%s', (id,inst_id)) + res = cr.fetchone()[0] + if not res: + ok = False + break + if ok: + for (id,) in trans_ids: + cr.execute('delete from wkf_witm_trans where trans_id=%s and inst_id=%s', (id,inst_id)) + self.create([activity], inst_id, stack) # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: From 1c398cd6f89a4979eade863f8aa89ccaabf52132 Mon Sep 17 00:00:00 2001 From: Stephane Wirtel Date: Thu, 24 Oct 2013 09:57:07 +0200 Subject: [PATCH 5/5] [REF] Start to abstract the Workflow engine bzr revid: stw@openerp.com-20131024075707-k1092bspi6ebixdw --- openerp/workflow/instance.py | 166 ++++++++++++--------- openerp/workflow/service.py | 17 ++- openerp/workflow/workitem.py | 275 +++++++++++++++++++---------------- 3 files changed, 253 insertions(+), 205 deletions(-) diff --git a/openerp/workflow/instance.py b/openerp/workflow/instance.py index befbd5970f7..223e90cca31 100644 --- a/openerp/workflow/instance.py +++ b/openerp/workflow/instance.py @@ -23,85 +23,111 @@ from openerp.workflow.helpers import Session from openerp.workflow.helpers import Record from openerp.workflow.workitem import WorkflowItem -def create(session, record, workflow_id): - assert isinstance(session, Session) - assert isinstance(record, Record) - assert isinstance(workflow_id, (int, long)) +class WorkflowInstance(object): + def __init__(self, session, record, values): + assert isinstance(session, Session) + assert isinstance(record, Record) + self.session = session + self.record = record - cr = session.cr - cr.execute('insert into wkf_instance (res_type,res_id,uid,wkf_id) values (%s,%s,%s,%s) RETURNING id', (record.model, record.id, session.uid, workflow_id)) - instance_id = cr.fetchone()[0] - cr.execute('select * from wkf_activity where flow_start=True and wkf_id=%s', (workflow_id,)) - activities = cr.dictfetchall() - stack = [] - WorkflowItem(session, record).create(activities, instance_id, stack) - update(session, record, instance_id) - return instance_id + if not values: + values = {} + + assert isinstance(values, dict) + self.instance = values + + @classmethod + def create(cls, session, record, workflow_id): + assert isinstance(session, Session) + assert isinstance(record, Record) + assert isinstance(workflow_id, (int, long)) + + cr = session.cr + cr.execute('insert into wkf_instance (res_type,res_id,uid,wkf_id) values (%s,%s,%s,%s) RETURNING id', (record.model, record.id, session.uid, workflow_id)) + instance_id = cr.fetchone()[0] + + cr.execute('select * from wkf_activity where flow_start=True and wkf_id=%s', (workflow_id,)) + stack = [] + + activities = cr.dictfetchall() + for activity in activities: + WorkflowItem.create(session, record, activity, instance_id, stack) + + cr.execute('SELECT * FROM wkf_instance WHERE id = %s', (instance_id,)) + values = cr.dictfetchone() + wi = WorkflowInstance(session, record, values) + wi.update() + + return wi + + def delete(self): + self.session.cr.execute('delete from wkf_instance where res_id=%s and res_type=%s', (self.record.id, self.record.model)) + + def validate(self, signal, force_running=False): + assert isinstance(signal, basestring) + assert isinstance(force_running, bool) + + cr = self.session.cr + cr.execute("select * from wkf_workitem where inst_id=%s", (self.instance['id'],)) + stack = [] + for work_item_values in cr.dictfetchall(): + wi = WorkflowItem(self.session, self.record, work_item_values) + wi.process(signal=signal, force_running=force_running, stack=stack) + # An action is returned + self._update_end() + return stack and stack[0] or False + + def update(self): + cr = self.session.cr + + cr.execute("select * from wkf_workitem where inst_id=%s", (self.instance['id'],)) + for work_item_values in cr.dictfetchall(): + stack = [] + WorkflowItem(self.session, self.record, work_item_values).process(stack=stack) + return self._update_end() + + def _update_end(self): + cr = self.session.cr + instance_id = self.instance['id'] + cr.execute('select wkf_id from wkf_instance where id=%s', (instance_id,)) + wkf_id = cr.fetchone()[0] + cr.execute('select state,flow_stop from wkf_workitem w left join wkf_activity a on (a.id=w.act_id) where w.inst_id=%s', (instance_id,)) + ok=True + for r in cr.fetchall(): + if (r[0]<>'complete') or not r[1]: + ok=False + break + if ok: + cr.execute('select distinct a.name from wkf_activity a left join wkf_workitem w on (a.id=w.act_id) where w.inst_id=%s', (instance_id,)) + act_names = cr.fetchall() + cr.execute("update wkf_instance set state='complete' where id=%s", (instance_id,)) + cr.execute("update wkf_workitem set state='complete' where subflow_id=%s", (instance_id,)) + cr.execute("select i.id,w.osv,i.res_id from wkf_instance i left join wkf w on (i.wkf_id=w.id) where i.id IN (select inst_id from wkf_workitem where subflow_id=%s)", (instance_id,)) + for cur_instance_id, cur_model_name, cur_record_id in cr.fetchall(): + cur_record = Record(cur_model_name, cur_record_id) + for act_name in act_names: + WorkflowInstance(self.session, cur_record, {'id':cur_instance_id}).validate('subflow.{}'.format(act_name[0])) + + return ok + + + + + +def create(session, record, workflow_id): + return WorkflowInstance(session, record).create(workflow_id) def delete(session, record): - assert isinstance(session, Session) - assert isinstance(record, Record) - - session.cr.execute('delete from wkf_instance where res_id=%s and res_type=%s', (record.id, record.model)) + return WorkflowInstance(session, record).delete() def validate(session, record, instance_id, signal, force_running=False): - assert isinstance(session, Session) - assert isinstance(record, Record) - assert isinstance(instance_id, (long, int)) - assert isinstance(signal, basestring) - assert isinstance(force_running, bool) - - cr = session.cr - cr.execute("select * from wkf_workitem where inst_id=%s", (instance_id,)) - stack = [] - wi = WorkflowItem(session, record) - for work_item in cr.dictfetchall(): - # stack = [] - wi.process(work_item, signal, force_running, stack=stack) - # An action is returned - _update_end(session, record, instance_id) - return stack and stack[0] or False + return WorkflowInstance(session, record).validate(instance_id, signal, force_running) def update(session, record, instance_id): - assert isinstance(session, Session) - assert isinstance(record, Record) - assert isinstance(instance_id, (long, int)) - - cr = session.cr - cr.execute("select * from wkf_workitem where inst_id=%s", (instance_id,)) - wi = WorkflowItem(session, record) - - for work_item in cr.dictfetchall(): - stack = [] - wi.process(work_item, stack=stack) - return _update_end(session, record, instance_id) + return WorkflowInstance(session, record).update(instance_id) def _update_end(session, record, instance_id): - assert isinstance(session, Session) - assert isinstance(record, Record) - assert isinstance(instance_id, (long, int)) - - cr = session.cr - cr.execute('select wkf_id from wkf_instance where id=%s', (instance_id,)) - wkf_id = cr.fetchone()[0] - cr.execute('select state,flow_stop from wkf_workitem w left join wkf_activity a on (a.id=w.act_id) where w.inst_id=%s', (instance_id,)) - ok=True - for r in cr.fetchall(): - if (r[0]<>'complete') or not r[1]: - ok=False - break - if ok: - cr.execute('select distinct a.name from wkf_activity a left join wkf_workitem w on (a.id=w.act_id) where w.inst_id=%s', (instance_id,)) - act_names = cr.fetchall() - cr.execute("update wkf_instance set state='complete' where id=%s", (instance_id,)) - cr.execute("update wkf_workitem set state='complete' where subflow_id=%s", (instance_id,)) - cr.execute("select i.id,w.osv,i.res_id from wkf_instance i left join wkf w on (i.wkf_id=w.id) where i.id IN (select inst_id from wkf_workitem where subflow_id=%s)", (instance_id,)) - for cur_instance_id, cur_model_name, cur_record_id in cr.fetchall(): - cur_record = Record(cur_model_name, cur_record_id) - for act_name in act_names: - validate(session, cur_record, cur_instance_id, 'subflow.%s' % act_name[0]) - - return ok + return WorkflowInstance(session, record)._update_end(instance_id) # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: diff --git a/openerp/workflow/service.py b/openerp/workflow/service.py index 08c0f61a7a2..cf8a5449b07 100644 --- a/openerp/workflow/service.py +++ b/openerp/workflow/service.py @@ -21,7 +21,8 @@ from helpers import Session from helpers import Record -import instance +from openerp.workflow.instance import WorkflowInstance +# import instance class WorkflowService(object): @@ -49,7 +50,7 @@ class WorkflowService(object): (self.record.id or None, self.record.model or None, 'active') ) for (instance_id,) in self.cr.fetchall(): - instance.update(self.session, self.record, instance_id) + WorkflowInstance(self.session, self.record, {'id': instance_id}).update() def trigger(self): self.cr.execute('select instance_id from wkf_triggers where res_id=%s and model=%s', (self.record.id, self.record.model)) @@ -61,10 +62,10 @@ class WorkflowService(object): current_session = Session(self.session.cr, current_uid) current_record = Record(current_model_name, current_record_id) - instance.update(current_session, current_record, instance_id) + WorkflowInstance(current_session, current_record, {'id': instance_id}).update() def delete(self): - instance.delete(self.session, self.record) + WorkflowInstance(self.session, self.record, {}).delete() def create(self): WorkflowService.CACHE.setdefault(self.cr.dbname, {}) @@ -77,14 +78,18 @@ class WorkflowService(object): WorkflowService.CACHE[self.cr.dbname][self.record.model] = wkf_ids for (wkf_id, ) in wkf_ids: - instance.create(self.session, self.record, wkf_id) + WorkflowInstance.create(self.session, self.record, wkf_id) def validate(self, signal): result = False # ids of all active workflow instances for a corresponding resource (id, model_nam) self.cr.execute('select id from wkf_instance where res_id=%s and res_type=%s and state=%s', (self.record.id, self.record.model, 'active')) + # TODO: Refactor the workflow instance object for (instance_id,) in self.cr.fetchall(): - res2 = instance.validate(self.session, self.record, instance_id, signal) + wi = WorkflowInstance(self.session, self.record, {'id': instance_id}) + + res2 = wi.validate(signal) + result = result or res2 return result diff --git a/openerp/workflow/workitem.py b/openerp/workflow/workitem.py index 7d0afcb519f..2c71aaaf7d8 100644 --- a/openerp/workflow/workitem.py +++ b/openerp/workflow/workitem.py @@ -60,235 +60,191 @@ class Environment(dict): else: return super(Environment, self).__getitem__(key) -def wkf_expr_eval_expr(session, record, workitem, lines): - """ - Evaluate each line of ``lines`` with the ``Environment`` environment, returning - the value of the last line. - """ - assert lines, 'You used a NULL action in a workflow, use dummy node instead.' - result = False - for line in lines.split('\n'): - line = line.strip() - if not line: - continue - if line == 'True': - result = True - elif line == 'False': - result = False - else: - env = Environment(session, record) - result = eval(line, env, nocopy=True) - return result - -def wkf_expr_execute_action(session, record, workitem, activity): - """ - Evaluate the ir.actions.server action specified in the activity. - """ - ir_actions_server = openerp.registry(session.cr.dbname)['ir.actions.server'] - context = { 'active_model': record.model, 'active_id': record.id, 'active_ids': [record.id] } - result = ir_actions_server.run(session.cr, session.uid, [activity['action_id']], context) - return result - -def wkf_expr_execute(session, record, workitem, activity): - """ - Evaluate the action specified in the activity. - """ - return wkf_expr_eval_expr(session, record, workitem, activity['action']) - -def wkf_expr_check(session, record, workitem, transition, signal): - """ - Test if a transition can be taken. The transition can be taken if: - - - the signal name matches, - - the uid is SUPERUSER_ID or the user groups contains the transition's - group, - - the condition evaluates to a truish value. - """ - if transition['signal'] and signal != transition['signal']: - return False - - if session.uid != openerp.SUPERUSER_ID and transition['group_id']: - registry = openerp.registry(session.cr.dbname) - user_groups = registry['res.users'].read(session.cr, session.uid, [session.uid], ['groups_id'])[0]['groups_id'] - if transition['group_id'] not in user_groups: - return False - - return wkf_expr_eval_expr(session, record, workitem, transition['condition']) - class WorkflowItem(object): - def __init__(self, session, record): + def __init__(self, session, record, work_item_values): assert isinstance(session, Session) assert isinstance(record, Record) self.session = session self.record = record - def create(self, activities, instance_id, stack): - assert isinstance(activities, list) + if not work_item_values: + work_item_values = {} + + assert isinstance(work_item_values, dict) + self.workitem = work_item_values + + @classmethod + def create(cls, session, record, activity, instance_id, stack): + assert isinstance(session, Session) + assert isinstance(record, Record) + assert isinstance(activity, dict) assert isinstance(instance_id, (long, int)) assert isinstance(stack, list) - cr = self.session.cr + cr = session.cr + cr.execute("select nextval('wkf_workitem_id_seq')") + id_new = cr.fetchone()[0] + cr.execute("insert into wkf_workitem (id,act_id,inst_id,state) values (%s,%s,%s,'active')", (id_new, activity['id'], instance_id)) + cr.execute('select * from wkf_workitem where id=%s',(id_new,)) + work_item_values = cr.dictfetchone() + logger.info('Created workflow item in activity %s', + activity['id'], + extra={'ident': (session.uid, record.model, record.id)}) + + workflow_item = WorkflowItem(session, record, work_item_values) + workflow_item.process(stack=stack) + + @classmethod + def create_all(cls, session, record, activities, instance_id, stack): + assert isinstance(activities, list) + for activity in activities: - cr.execute("select nextval('wkf_workitem_id_seq')") - id_new = cr.fetchone()[0] - cr.execute("insert into wkf_workitem (id,act_id,inst_id,state) values (%s,%s,%s,'active')", (id_new, activity['id'], instance_id)) - cr.execute('select * from wkf_workitem where id=%s',(id_new,)) - work_item = cr.dictfetchone() - logger.info('Created workflow item in activity %s', - activity['id'], - extra={'ident': (self.session.uid, self.record.model, self.record.id)}) + cls.create(session, record, activity, instance_id, stack) - self.process(work_item, stack=stack) - - def process(self, workitem, signal=None, force_running=False, stack=None): - assert isinstance(workitem, dict) + def process(self, signal=None, force_running=False, stack=None): assert isinstance(force_running, bool) - - # return _process(self.session, self.record, work_item, signal=signal, force_running=force_running, stack=stack) - # assert isinstance(force_running, bool) - assert stack is not None cr = self.session.cr - cr.execute('select * from wkf_activity where id=%s', (workitem['act_id'],)) + cr.execute('select * from wkf_activity where id=%s', (self.workitem['act_id'],)) activity = cr.dictfetchone() triggers = False - if workitem['state'] == 'active': + if self.workitem['state'] == 'active': triggers = True - if not self._execute(workitem, activity, stack): + if not self._execute(activity, stack): return False - if force_running or workitem['state'] == 'complete': - ok = self._split_test(workitem, activity['split_mode'], signal, stack) + if force_running or self.workitem['state'] == 'complete': + ok = self._split_test(activity['split_mode'], signal, stack) triggers = triggers and not ok if triggers: - cr.execute('select * from wkf_transition where act_from=%s', (workitem['act_id'],)) + cr.execute('select * from wkf_transition where act_from=%s', (self.workitem['act_id'],)) for trans in cr.dictfetchall(): if trans['trigger_model']: - ids = wkf_expr_eval_expr(self.session, self.record, workitem, trans['trigger_expr_id']) + ids = self.wkf_expr_eval_expr(trans['trigger_expr_id']) for res_id in ids: cr.execute('select nextval(\'wkf_triggers_id_seq\')') id =cr.fetchone()[0] - cr.execute('insert into wkf_triggers (model,res_id,instance_id,workitem_id,id) values (%s,%s,%s,%s,%s)', (trans['trigger_model'],res_id,workitem['inst_id'], workitem['id'], id)) + cr.execute('insert into wkf_triggers (model,res_id,instance_id,workitem_id,id) values (%s,%s,%s,%s,%s)', (trans['trigger_model'],res_id, self.workitem['inst_id'], self.workitem['id'], id)) return True - def _execute(self, workitem, activity, stack): + def _execute(self, activity, stack): """Send a signal to parenrt workflow (signal: subflow.signal_name)""" result = True cr = self.session.cr signal_todo = [] - if (workitem['state']=='active') and activity['signal_send']: + if (self.workitem['state']=='active') and activity['signal_send']: # signal_send']: - cr.execute("select i.id,w.osv,i.res_id from wkf_instance i left join wkf w on (i.wkf_id=w.id) where i.id IN (select inst_id from wkf_workitem where subflow_id=%s)", (workitem['inst_id'],)) + cr.execute("select i.id,w.osv,i.res_id from wkf_instance i left join wkf w on (i.wkf_id=w.id) where i.id IN (select inst_id from wkf_workitem where subflow_id=%s)", (self.workitem['inst_id'],)) for instance_id, model_name, record_id in cr.fetchall(): record = Record(model_name, record_id) signal_todo.append((instance_id, record, activity['signal_send'])) if activity['kind'] == WorkflowActivity.KIND_DUMMY: - if workitem['state']=='active': - self._state_set(workitem, activity, 'complete') + if self.workitem['state']=='active': + self._state_set(activity, 'complete') if activity['action_id']: - res2 = wkf_expr_execute_action(self.session, self.record, workitem, activity) + res2 = self.wkf_expr_execute_action(activity) if res2: stack.append(res2) result=res2 elif activity['kind'] == WorkflowActivity.KIND_FUNCTION: - if workitem['state']=='active': - self._state_set(workitem, activity, 'running') - returned_action = wkf_expr_execute(self.session, self.record, workitem, activity) + if self.workitem['state']=='active': + self._state_set(activity, 'running') + returned_action = self.wkf_expr_execute(activity) if type(returned_action) in (dict,): stack.append(returned_action) if activity['action_id']: - res2 = wkf_expr_execute_action(self.session, self.record, workitem, activity) + res2 = self.wkf_expr_execute_action(activity) # A client action has been returned if res2: stack.append(res2) result=res2 - self._state_set(workitem, activity, 'complete') + self._state_set(activity, 'complete') elif activity['kind'] == WorkflowActivity.KIND_STOPALL: - if workitem['state']=='active': - self._state_set(workitem, activity, 'running') - cr.execute('delete from wkf_workitem where inst_id=%s and id<>%s', (workitem['inst_id'], workitem['id'])) + if self.workitem['state']=='active': + self._state_set(activity, 'running') + cr.execute('delete from wkf_workitem where inst_id=%s and id<>%s', (self.workitem['inst_id'], self.workitem['id'])) if activity['action']: - wkf_expr_execute(self.session, self.record, workitem, activity) - self._state_set(workitem, activity, 'complete') + self.wkf_expr_execute(activity) + self._state_set(activity, 'complete') elif activity['kind'] == WorkflowActivity.KIND_SUBFLOW: - if workitem['state']=='active': + if self.workitem['state']=='active': - self._state_set(workitem, activity, 'running') + self._state_set(activity, 'running') if activity.get('action', False): - id_new = wkf_expr_execute(self.session, self.record, workitem, activity) + id_new = self.wkf_expr_execute(activity) if not id_new: - cr.execute('delete from wkf_workitem where id=%s', (workitem['id'],)) + cr.execute('delete from wkf_workitem where id=%s', (self.workitem['id'],)) return False assert type(id_new)==type(1) or type(id_new)==type(1L), 'Wrong return value: '+str(id_new)+' '+str(type(id_new)) - cr.execute('select id from wkf_instance where res_id=%s and wkf_id=%s', (id_new,activity['subflow_id'])) + cr.execute('select id from wkf_instance where res_id=%s and wkf_id=%s', (id_new, activity['subflow_id'])) id_new = cr.fetchone()[0] else: - id_new = instance.create(self.session, self.record, activity['subflow_id']) - cr.execute('update wkf_workitem set subflow_id=%s where id=%s', (id_new, workitem['id'])) - workitem['subflow_id'] = id_new + inst = instance.WorkflowInstance(self.session, self.record) + id_new = inst.create(activity['subflow_id']) - if workitem['state']=='running': - cr.execute("select state from wkf_instance where id=%s", (workitem['subflow_id'],)) - state= cr.fetchone()[0] + cr.execute('update wkf_workitem set subflow_id=%s where id=%s', (id_new, self.workitem['id'])) + self.workitem['subflow_id'] = id_new + + if self.workitem['state']=='running': + cr.execute("select state from wkf_instance where id=%s", (self.workitem['subflow_id'],)) + state = cr.fetchone()[0] if state=='complete': - self._state_set(workitem, activity, 'complete') + self._state_set(activity, 'complete') for instance_id, record, signal_send in signal_todo: - instance.validate(self.session, self.record, signal_send, force_running=True) + wi = instance.WorkflowInstance(self.session, record, {'id': instance_id}) + wi.validate(signal_send, force_running=True) return result - def _state_set(self, workitem, activity, state): - self.session.cr.execute('update wkf_workitem set state=%s where id=%s', (state, workitem['id'])) - workitem['state'] = state + def _state_set(self, activity, state): + self.session.cr.execute('update wkf_workitem set state=%s where id=%s', (state, self.workitem['id'])) + self.workitem['state'] = state logger.info('Changed state of work item %s to "%s" in activity %s', - workitem['id'], state, activity['id'], + self.workitem['id'], state, activity['id'], extra={'ident': (self.session.uid, self.record.model, self.record.id)}) - - - - def _split_test(self, workitem, split_mode, signal, stack): + def _split_test(self, split_mode, signal, stack): cr = self.session.cr - cr.execute('select * from wkf_transition where act_from=%s', (workitem['act_id'],)) + cr.execute('select * from wkf_transition where act_from=%s', (self.workitem['act_id'],)) test = False transitions = [] alltrans = cr.dictfetchall() if split_mode in ('XOR', 'OR'): for transition in alltrans: - if wkf_expr_check(self.session, self.record, workitem, transition,signal): + if self.wkf_expr_check(transition,signal): test = True - transitions.append((transition['id'], workitem['inst_id'])) + transitions.append((transition['id'], self.workitem['inst_id'])) if split_mode=='XOR': break else: test = True for transition in alltrans: - if not wkf_expr_check(self.session, self.record, workitem, transition,signal): + if not self.wkf_expr_check(transition, signal): test = False break - cr.execute('select count(*) from wkf_witm_trans where trans_id=%s and inst_id=%s', (transition['id'], workitem['inst_id'])) + cr.execute('select count(*) from wkf_witm_trans where trans_id=%s and inst_id=%s', (transition['id'], self.workitem['inst_id'])) if not cr.fetchone()[0]: - transitions.append((transition['id'], workitem['inst_id'])) + transitions.append((transition['id'], self.workitem['inst_id'])) if test and transitions: cr.executemany('insert into wkf_witm_trans (trans_id,inst_id) values (%s,%s)', transitions) - cr.execute('delete from wkf_workitem where id=%s', (workitem['id'],)) + cr.execute('delete from wkf_workitem where id=%s', (self.workitem['id'],)) for t in transitions: self._join_test(t[0], t[1], stack) return True @@ -299,7 +255,7 @@ class WorkflowItem(object): cr.execute('select * from wkf_activity where id=(select act_to from wkf_transition where id=%s)', (trans_id,)) activity = cr.dictfetchone() if activity['join_mode']=='XOR': - self.create([activity], inst_id, stack) + WorkflowItem.create(self.session, self.record, activity, inst_id, stack=stack) cr.execute('delete from wkf_witm_trans where inst_id=%s and trans_id=%s', (inst_id,trans_id)) else: cr.execute('select id from wkf_transition where act_to=%s', (activity['id'],)) @@ -314,7 +270,68 @@ class WorkflowItem(object): if ok: for (id,) in trans_ids: cr.execute('delete from wkf_witm_trans where trans_id=%s and inst_id=%s', (id,inst_id)) - self.create([activity], inst_id, stack) + WorkflowItem.create(self.session, self.record, activity, inst_id, stack=stack) + + def wkf_expr_eval_expr(self, lines): + """ + Evaluate each line of ``lines`` with the ``Environment`` environment, returning + the value of the last line. + """ + assert lines, 'You used a NULL action in a workflow, use dummy node instead.' + result = False + for line in lines.split('\n'): + line = line.strip() + if not line: + continue + if line == 'True': + result = True + elif line == 'False': + result = False + else: + env = Environment(self.session, self.record) + result = eval(line, env, nocopy=True) + return result + + def wkf_expr_execute_action(self, activity): + """ + Evaluate the ir.actions.server action specified in the activity. + """ + context = { + 'active_model': self.record.model, + 'active_id': self.record.id, + 'active_ids': [self.record.id] + } + + ir_actions_server = openerp.registry(self.session.cr.dbname)['ir.actions.server'] + result = ir_actions_server.run(self.session.cr, self.session.uid, [activity['action_id']], context) + + return result + + def wkf_expr_execute(self, activity): + """ + Evaluate the action specified in the activity. + """ + return self.wkf_expr_eval_expr(activity['action']) + + def wkf_expr_check(self, transition, signal): + """ + Test if a transition can be taken. The transition can be taken if: + + - the signal name matches, + - the uid is SUPERUSER_ID or the user groups contains the transition's + group, + - the condition evaluates to a truish value. + """ + if transition['signal'] and signal != transition['signal']: + return False + + if self.session.uid != openerp.SUPERUSER_ID and transition['group_id']: + registry = openerp.registry(self.session.cr.dbname) + user_groups = registry['res.users'].read(self.session.cr, self.session.uid, [self.session.uid], ['groups_id'])[0]['groups_id'] + if transition['group_id'] not in user_groups: + return False + + return self.wkf_expr_eval_expr(transition['condition']) # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: