[REF] Merge a refactoring of the workflow (lp:~openerp-dev/openobject-server/trunk-refactor-workflow-stw)

bzr revid: stw@openerp.com-20140109095017-xakyapswp8b5eed5
This commit is contained in:
Stephane Wirtel 2014-01-09 10:50:17 +01:00
commit 5aef04550f
6 changed files with 550 additions and 368 deletions

View File

@ -2,7 +2,7 @@
############################################################################## ##############################################################################
# #
# OpenERP, Open Source Management Solution # OpenERP, Open Source Management Solution
# Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>). # Copyright (C) 2004-2014 Tiny SPRL (<http://tiny.be>).
# #
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as # it under the terms of the GNU Affero General Public License as
@ -19,12 +19,13 @@
# #
############################################################################## ##############################################################################
import instance from openerp.workflow.service import WorkflowService
wkf_on_create_cache = {} # The new API is in openerp.workflow.workflow_service
# OLD API of the Workflow
def clear_cache(cr, uid): def clear_cache(cr, uid):
wkf_on_create_cache[cr.dbname]={} WorkflowService.clear_cache(cr.dbname)
def trg_write(uid, res_type, res_id, cr): def trg_write(uid, res_type, res_id, cr):
""" """
@ -36,10 +37,7 @@ def trg_write(uid, res_type, res_id, cr):
:param res_id: the model instance id the workflow belongs to :param res_id: the model instance id the workflow belongs to
:param cr: a database cursor :param cr: a database cursor
""" """
ident = (uid,res_type,res_id) return WorkflowService.new(cr, uid, res_type, res_id).write()
cr.execute('select id from wkf_instance where res_id=%s and res_type=%s and state=%s', (res_id or None,res_type or None, 'active'))
for (id,) in cr.fetchall():
instance.update(cr, id, ident)
def trg_trigger(uid, res_type, res_id, cr): def trg_trigger(uid, res_type, res_id, cr):
""" """
@ -52,12 +50,7 @@ def trg_trigger(uid, res_type, res_id, cr):
:param res_id: the model instance id the workflow belongs to :param res_id: the model instance id the workflow belongs to
:param cr: a database cursor :param cr: a database cursor
""" """
cr.execute('select instance_id from wkf_triggers where res_id=%s and model=%s', (res_id,res_type)) return WorkflowService.new(cr, uid, res_type, res_id).trigger()
res = cr.fetchall()
for (instance_id,) in res:
cr.execute('select %s,res_type,res_id from wkf_instance where id=%s', (uid, instance_id,))
ident = cr.fetchone()
instance.update(cr, instance_id, ident)
def trg_delete(uid, res_type, res_id, cr): def trg_delete(uid, res_type, res_id, cr):
""" """
@ -67,8 +60,7 @@ def trg_delete(uid, res_type, res_id, cr):
:param res_id: the model instance id the workflow belongs to :param res_id: the model instance id the workflow belongs to
:param cr: a database cursor :param cr: a database cursor
""" """
ident = (uid,res_type,res_id) return WorkflowService.new(cr, uid, res_type, res_id).delete()
instance.delete(cr, ident)
def trg_create(uid, res_type, res_id, cr): def trg_create(uid, res_type, res_id, cr):
""" """
@ -78,16 +70,7 @@ def trg_create(uid, res_type, res_id, cr):
:param res_id: the model instance id to own the created worfklow instance :param res_id: the model instance id to own the created worfklow instance
:param cr: a database cursor :param cr: a database cursor
""" """
ident = (uid,res_type,res_id) return WorkflowService.new(cr, uid, res_type, res_id).create()
wkf_on_create_cache.setdefault(cr.dbname, {})
if res_type in wkf_on_create_cache[cr.dbname]:
wkf_ids = wkf_on_create_cache[cr.dbname][res_type]
else:
cr.execute('select id from wkf where osv=%s and on_create=True', (res_type,))
wkf_ids = cr.fetchall()
wkf_on_create_cache[cr.dbname][res_type] = wkf_ids
for (wkf_id,) in wkf_ids:
instance.create(cr, ident, wkf_id)
def trg_validate(uid, res_type, res_id, signal, cr): def trg_validate(uid, res_type, res_id, signal, cr):
""" """
@ -98,14 +81,8 @@ def trg_validate(uid, res_type, res_id, signal, cr):
:signal: the signal name to be fired :signal: the signal name to be fired
:param cr: a database cursor :param cr: a database cursor
""" """
result = False assert isinstance(signal, basestring)
ident = (uid,res_type,res_id) return WorkflowService.new(cr, uid, res_type, res_id).validate(signal)
# ids of all active workflow instances for a corresponding resource (id, model_nam)
cr.execute('select id from wkf_instance where res_id=%s and res_type=%s and state=%s', (res_id, res_type, 'active'))
for (id,) in cr.fetchall():
res2 = instance.validate(cr, id, ident, signal)
result = result or res2
return result
def trg_redirect(uid, res_type, res_id, new_rid, cr): def trg_redirect(uid, res_type, res_id, new_rid, cr):
""" """
@ -120,22 +97,7 @@ def trg_redirect(uid, res_type, res_id, new_rid, cr):
:param new_rid: the model instance id to own the worfklow instance :param new_rid: the model instance id to own the worfklow instance
:param cr: a database cursor :param cr: a database cursor
""" """
# get ids of wkf instances for the old resource (res_id) assert isinstance(new_rid, (long, int))
#CHECKME: shouldn't we get only active instances? return WorkflowService.new(cr, uid, res_type, res_id).redirect(new_rid)
cr.execute('select id, wkf_id from wkf_instance where res_id=%s and res_type=%s', (res_id, res_type))
for old_inst_id, wkf_id in cr.fetchall():
# first active instance for new resource (new_rid), using same wkf
cr.execute(
'SELECT id '\
'FROM wkf_instance '\
'WHERE res_id=%s AND res_type=%s AND wkf_id=%s AND state=%s',
(new_rid, res_type, wkf_id, 'active'))
new_id = cr.fetchone()
if new_id:
# select all workitems which "wait" for the old instance
cr.execute('select id from wkf_workitem where subflow_id=%s', (old_inst_id,))
for (item_id,) in cr.fetchall():
# redirect all those workitems to the wkf instance of the new resource
cr.execute('update wkf_workitem set subflow_id=%s where id=%s', (new_id[0], item_id))
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -0,0 +1,22 @@
import openerp.sql_db
class Session(object):
def __init__(self, cr, uid):
assert isinstance(cr, openerp.sql_db.Cursor)
assert isinstance(uid, (int, long))
self.cr = cr
self.uid = uid
class Record(object):
def __init__(self, model, record_id):
assert isinstance(model, basestring)
assert isinstance(record_id, (int, long))
self.model = model
self.id = record_id
class WorkflowActivity(object):
KIND_FUNCTION = 'function'
KIND_DUMMY = 'dummy'
KIND_STOPALL = 'stopall'
KIND_SUBFLOW = 'subflow'

View File

@ -2,7 +2,7 @@
############################################################################## ##############################################################################
# #
# OpenERP, Open Source Management Solution # OpenERP, Open Source Management Solution
# Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>). # Copyright (C) 2004-2014 Tiny SPRL (<http://tiny.be>).
# #
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as # it under the terms of the GNU Affero General Public License as
@ -19,59 +19,116 @@
# #
############################################################################## ##############################################################################
import workitem import workitem
from openerp.workflow.helpers import Session
from openerp.workflow.helpers import Record
from openerp.workflow.workitem import WorkflowItem
def create(cr, ident, wkf_id): class WorkflowInstance(object):
(uid,res_type,res_id) = ident def __init__(self, session, record, values):
cr.execute('insert into wkf_instance (res_type,res_id,uid,wkf_id) values (%s,%s,%s,%s) RETURNING id', (res_type,res_id,uid,wkf_id)) assert isinstance(session, Session)
id_new = cr.fetchone()[0] assert isinstance(record, Record)
cr.execute('select * from wkf_activity where flow_start=True and wkf_id=%s', (wkf_id,)) self.session = session
res = cr.dictfetchall() self.record = record
stack = []
workitem.create(cr, res, id_new, ident, stack=stack)
update(cr, id_new, ident)
return id_new
def delete(cr, ident): if not values:
(uid,res_type,res_id) = ident values = {}
cr.execute('delete from wkf_instance where res_id=%s and res_type=%s', (res_id,res_type))
def validate(cr, inst_id, ident, signal, force_running=False): assert isinstance(values, dict)
cr.execute("select * from wkf_workitem where inst_id=%s", (inst_id,)) self.instance = values
@classmethod
def create(cls, session, record, workflow_id):
assert isinstance(session, Session)
assert isinstance(record, Record)
assert isinstance(workflow_id, (int, long))
cr = session.cr
cr.execute('insert into wkf_instance (res_type,res_id,uid,wkf_id) values (%s,%s,%s,%s) RETURNING id', (record.model, record.id, session.uid, workflow_id))
instance_id = cr.fetchone()[0]
cr.execute('select * from wkf_activity where flow_start=True and wkf_id=%s', (workflow_id,))
stack = [] stack = []
for witem in cr.dictfetchall():
activities = cr.dictfetchall()
for activity in activities:
WorkflowItem.create(session, record, activity, instance_id, stack)
cr.execute('SELECT * FROM wkf_instance WHERE id = %s', (instance_id,))
values = cr.dictfetchone()
wi = WorkflowInstance(session, record, values)
wi.update()
return wi
def delete(self):
self.session.cr.execute('delete from wkf_instance where res_id=%s and res_type=%s', (self.record.id, self.record.model))
def validate(self, signal, force_running=False):
assert isinstance(signal, basestring)
assert isinstance(force_running, bool)
cr = self.session.cr
cr.execute("select * from wkf_workitem where inst_id=%s", (self.instance['id'],))
stack = [] stack = []
workitem.process(cr, witem, ident, signal, force_running, stack=stack) for work_item_values in cr.dictfetchall():
wi = WorkflowItem(self.session, self.record, work_item_values)
wi.process(signal=signal, force_running=force_running, stack=stack)
# An action is returned # An action is returned
_update_end(cr, inst_id, ident) self._update_end()
return stack and stack[0] or False return stack and stack[0] or False
def update(cr, inst_id, ident): def update(self):
cr.execute("select * from wkf_workitem where inst_id=%s", (inst_id,)) cr = self.session.cr
for witem in cr.dictfetchall():
stack = []
workitem.process(cr, witem, ident, stack=stack)
return _update_end(cr, inst_id, ident)
def _update_end(cr, inst_id, ident): cr.execute("select * from wkf_workitem where inst_id=%s", (self.instance['id'],))
cr.execute('select wkf_id from wkf_instance where id=%s', (inst_id,)) for work_item_values in cr.dictfetchall():
stack = []
WorkflowItem(self.session, self.record, work_item_values).process(stack=stack)
return self._update_end()
def _update_end(self):
cr = self.session.cr
instance_id = self.instance['id']
cr.execute('select wkf_id from wkf_instance where id=%s', (instance_id,))
wkf_id = cr.fetchone()[0] wkf_id = cr.fetchone()[0]
cr.execute('select state,flow_stop from wkf_workitem w left join wkf_activity a on (a.id=w.act_id) where w.inst_id=%s', (inst_id,)) cr.execute('select state,flow_stop from wkf_workitem w left join wkf_activity a on (a.id=w.act_id) where w.inst_id=%s', (instance_id,))
ok=True ok=True
for r in cr.fetchall(): for r in cr.fetchall():
if (r[0]<>'complete') or not r[1]: if (r[0]<>'complete') or not r[1]:
ok=False ok=False
break break
if ok: if ok:
cr.execute('select distinct a.name from wkf_activity a left join wkf_workitem w on (a.id=w.act_id) where w.inst_id=%s', (inst_id,)) cr.execute('select distinct a.name from wkf_activity a left join wkf_workitem w on (a.id=w.act_id) where w.inst_id=%s', (instance_id,))
act_names = cr.fetchall() act_names = cr.fetchall()
cr.execute("update wkf_instance set state='complete' where id=%s", (inst_id,)) cr.execute("update wkf_instance set state='complete' where id=%s", (instance_id,))
cr.execute("update wkf_workitem set state='complete' where subflow_id=%s", (inst_id,)) cr.execute("update wkf_workitem set state='complete' where subflow_id=%s", (instance_id,))
cr.execute("select i.id,w.osv,i.res_id from wkf_instance i left join wkf w on (i.wkf_id=w.id) where i.id IN (select inst_id from wkf_workitem where subflow_id=%s)", (inst_id,)) cr.execute("select i.id,w.osv,i.res_id from wkf_instance i left join wkf w on (i.wkf_id=w.id) where i.id IN (select inst_id from wkf_workitem where subflow_id=%s)", (instance_id,))
for i in cr.fetchall(): for cur_instance_id, cur_model_name, cur_record_id in cr.fetchall():
cur_record = Record(cur_model_name, cur_record_id)
for act_name in act_names: for act_name in act_names:
validate(cr, i[0], (ident[0],i[1],i[2]), 'subflow.'+act_name[0]) WorkflowInstance(self.session, cur_record, {'id':cur_instance_id}).validate('subflow.{}'.format(act_name[0]))
return ok return ok
def create(session, record, workflow_id):
return WorkflowInstance(session, record).create(workflow_id)
def delete(session, record):
return WorkflowInstance(session, record).delete()
def validate(session, record, instance_id, signal, force_running=False):
return WorkflowInstance(session, record).validate(instance_id, signal, force_running)
def update(session, record, instance_id):
return WorkflowInstance(session, record).update(instance_id)
def _update_end(session, record, instance_id):
return WorkflowInstance(session, record)._update_end(instance_id)
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

115
openerp/workflow/service.py Normal file
View File

@ -0,0 +1,115 @@
# -*- coding: utf-8 -*-
##############################################################################
#
# OpenERP, Open Source Management Solution
# Copyright (C) 2004-TODAY OpenERP S.A. (<http://openerp.com>).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
from helpers import Session
from helpers import Record
from openerp.workflow.instance import WorkflowInstance
# import instance
class WorkflowService(object):
CACHE = {}
@classmethod
def clear_cache(cls, dbname):
cls.CACHE[dbname] = {}
@classmethod
def new(cls, cr, uid, model_name, record_id):
return cls(Session(cr, uid), Record(model_name, record_id))
def __init__(self, session, record):
assert isinstance(session, Session)
assert isinstance(record, Record)
self.session = session
self.record = record
self.cr = self.session.cr
def write(self):
self.cr.execute('select id from wkf_instance where res_id=%s and res_type=%s and state=%s',
(self.record.id or None, self.record.model or None, 'active')
)
for (instance_id,) in self.cr.fetchall():
WorkflowInstance(self.session, self.record, {'id': instance_id}).update()
def trigger(self):
self.cr.execute('select instance_id from wkf_triggers where res_id=%s and model=%s', (self.record.id, self.record.model))
res = self.cr.fetchall()
for (instance_id,) in res:
self.cr.execute('select %s,res_type,res_id from wkf_instance where id=%s', (self.session.uid, instance_id,))
current_uid, current_model_name, current_record_id = self.cr.fetchone()
current_session = Session(self.session.cr, current_uid)
current_record = Record(current_model_name, current_record_id)
WorkflowInstance(current_session, current_record, {'id': instance_id}).update()
def delete(self):
WorkflowInstance(self.session, self.record, {}).delete()
def create(self):
WorkflowService.CACHE.setdefault(self.cr.dbname, {})
wkf_ids = WorkflowService.CACHE[self.cr.dbname].get(self.record.model, None)
if not wkf_ids:
self.cr.execute('select id from wkf where osv=%s and on_create=True', (self.record.model,))
wkf_ids = self.cr.fetchall()
WorkflowService.CACHE[self.cr.dbname][self.record.model] = wkf_ids
for (wkf_id, ) in wkf_ids:
WorkflowInstance.create(self.session, self.record, wkf_id)
def validate(self, signal):
result = False
# ids of all active workflow instances for a corresponding resource (id, model_nam)
self.cr.execute('select id from wkf_instance where res_id=%s and res_type=%s and state=%s', (self.record.id, self.record.model, 'active'))
# TODO: Refactor the workflow instance object
for (instance_id,) in self.cr.fetchall():
wi = WorkflowInstance(self.session, self.record, {'id': instance_id})
res2 = wi.validate(signal)
result = result or res2
return result
def redirect(self, new_rid):
# get ids of wkf instances for the old resource (res_id)
# CHECKME: shouldn't we get only active instances?
self.cr.execute('select id, wkf_id from wkf_instance where res_id=%s and res_type=%s', (self.record.id, self.record.model))
for old_inst_id, workflow_id in self.cr.fetchall():
# first active instance for new resource (new_rid), using same wkf
self.cr.execute(
'SELECT id '\
'FROM wkf_instance '\
'WHERE res_id=%s AND res_type=%s AND wkf_id=%s AND state=%s',
(new_rid, self.record.model, workflow_id, 'active'))
new_id = self.cr.fetchone()
if new_id:
# select all workitems which "wait" for the old instance
self.cr.execute('select id from wkf_workitem where subflow_id=%s', (old_inst_id,))
for (item_id,) in self.cr.fetchall():
# redirect all those workitems to the wkf instance of the new resource
self.cr.execute('update wkf_workitem set subflow_id=%s where id=%s', (new_id[0], item_id))

View File

@ -1,114 +0,0 @@
# -*- coding: utf-8 -*-
##############################################################################
#
# OpenERP, Open Source Management Solution
# Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
"""
Evaluate workflow code found in activity actions and transition conditions.
"""
import openerp
from openerp.tools.safe_eval import safe_eval as eval
class Env(dict):
"""
Dictionary class used as an environment to evaluate workflow code (such as
the condition on transitions).
This environment provides sybmols for cr, uid, id, model name, model
instance, column names, and all the record (the one obtained by browsing
the provided ID) attributes.
"""
def __init__(self, cr, uid, model, id):
self.cr = cr
self.uid = uid
self.model = model
self.id = id
self.ids = [id]
self.obj = openerp.registry(cr.dbname)[model]
self.columns = self.obj._columns.keys() + self.obj._inherit_fields.keys()
def __getitem__(self, key):
if (key in self.columns) or (key in dir(self.obj)):
res = self.obj.browse(self.cr, self.uid, self.id)
return res[key]
else:
return super(Env, self).__getitem__(key)
def _eval_expr(cr, ident, workitem, lines):
"""
Evaluate each line of ``lines`` with the ``Env`` environment, returning
the value of the last line.
"""
assert lines, 'You used a NULL action in a workflow, use dummy node instead.'
uid, model, id = ident
result = False
for line in lines.split('\n'):
line = line.strip()
if not line:
continue
if line == 'True':
result = True
elif line == 'False':
result = False
else:
env = Env(cr, uid, model, id)
result = eval(line, env, nocopy=True)
return result
def execute_action(cr, ident, workitem, activity):
"""
Evaluate the ir.actions.server action specified in the activity.
"""
uid, model, id = ident
ir_actions_server = openerp.registry(cr.dbname)['ir.actions.server']
context = { 'active_model': model, 'active_id': id, 'active_ids': [id] }
result = ir_actions_server.run(cr, uid, [activity['action_id']], context)
return result
def execute(cr, ident, workitem, activity):
"""
Evaluate the action specified in the activity.
"""
return _eval_expr(cr, ident, workitem, activity['action'])
def check(cr, workitem, ident, transition, signal):
"""
Test if a transition can be taken. The transition can be taken if:
- the signal name matches,
- the uid is SUPERUSER_ID or the user groups contains the transition's
group,
- the condition evaluates to a truish value.
"""
if transition['signal'] and signal != transition['signal']:
return False
uid = ident[0]
if uid != openerp.SUPERUSER_ID and transition['group_id']:
registry = openerp.registry(cr.dbname)
user_groups = registry['res.users'].read(cr, uid, [uid], ['groups_id'])[0]['groups_id']
if transition['group_id'] not in user_groups:
return False
return _eval_expr(cr, ident, workitem, transition['condition'])
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -1,8 +1,8 @@
# -*- coding: utf-8 -*-
############################################################################## ##############################################################################
# #
# OpenERP, Open Source Management Solution # OpenERP, Open Source Management Solution
# Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>). # Copyright (C) 2004-2014 OpenERP S.A. (<http://openerp.com).
# #
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as # it under the terms of the GNU Affero General Public License as
@ -24,159 +24,238 @@
# cr.execute('delete from wkf_triggers where model=%s and res_id=%s', (res_type,res_id)) # cr.execute('delete from wkf_triggers where model=%s and res_id=%s', (res_type,res_id))
# #
import logging import logging
import instance import instance
import wkf_expr from openerp.workflow.helpers import Session
from openerp.workflow.helpers import Record
from openerp.workflow.helpers import WorkflowActivity
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def create(cr, act_datas, inst_id, ident, stack): import openerp
for act in act_datas: from openerp.tools.safe_eval import safe_eval as eval
class Environment(dict):
"""
Dictionary class used as an environment to evaluate workflow code (such as
the condition on transitions).
This environment provides sybmols for cr, uid, id, model name, model
instance, column names, and all the record (the one obtained by browsing
the provided ID) attributes.
"""
def __init__(self, session, record):
self.cr = session.cr
self.uid = session.uid
self.model = record.model
self.id = record.id
self.ids = [record.id]
self.obj = openerp.registry(self.cr.dbname)[self.model]
self.columns = self.obj._columns.keys() + self.obj._inherit_fields.keys()
def __getitem__(self, key):
if (key in self.columns) or (key in dir(self.obj)):
res = self.obj.browse(self.cr, self.uid, self.id)
return res[key]
else:
return super(Environment, self).__getitem__(key)
class WorkflowItem(object):
def __init__(self, session, record, work_item_values):
assert isinstance(session, Session)
assert isinstance(record, Record)
self.session = session
self.record = record
if not work_item_values:
work_item_values = {}
assert isinstance(work_item_values, dict)
self.workitem = work_item_values
@classmethod
def create(cls, session, record, activity, instance_id, stack):
assert isinstance(session, Session)
assert isinstance(record, Record)
assert isinstance(activity, dict)
assert isinstance(instance_id, (long, int))
assert isinstance(stack, list)
cr = session.cr
cr.execute("select nextval('wkf_workitem_id_seq')") cr.execute("select nextval('wkf_workitem_id_seq')")
id_new = cr.fetchone()[0] id_new = cr.fetchone()[0]
cr.execute("insert into wkf_workitem (id,act_id,inst_id,state) values (%s,%s,%s,'active')", (id_new, act['id'], inst_id)) cr.execute("insert into wkf_workitem (id,act_id,inst_id,state) values (%s,%s,%s,'active')", (id_new, activity['id'], instance_id))
cr.execute('select * from wkf_workitem where id=%s',(id_new,)) cr.execute('select * from wkf_workitem where id=%s',(id_new,))
res = cr.dictfetchone() work_item_values = cr.dictfetchone()
logger.info('Created workflow item in activity %s', logger.info('Created workflow item in activity %s',
act['id'], extra={'ident': ident}) activity['id'],
process(cr, res, ident, stack=stack) extra={'ident': (session.uid, record.model, record.id)})
def process(cr, workitem, ident, signal=None, force_running=False, stack=None): workflow_item = WorkflowItem(session, record, work_item_values)
workflow_item.process(stack=stack)
@classmethod
def create_all(cls, session, record, activities, instance_id, stack):
assert isinstance(activities, list)
for activity in activities:
cls.create(session, record, activity, instance_id, stack)
def process(self, signal=None, force_running=False, stack=None):
assert isinstance(force_running, bool)
assert stack is not None assert stack is not None
cr.execute('select * from wkf_activity where id=%s', (workitem['act_id'],)) cr = self.session.cr
cr.execute('select * from wkf_activity where id=%s', (self.workitem['act_id'],))
activity = cr.dictfetchone() activity = cr.dictfetchone()
triggers = False triggers = False
if workitem['state'] == 'active': if self.workitem['state'] == 'active':
triggers = True triggers = True
if not _execute(cr, workitem, activity, ident, stack): if not self._execute(activity, stack):
return False return False
if force_running or workitem['state'] == 'complete': if force_running or self.workitem['state'] == 'complete':
ok = _split_test(cr, workitem, activity['split_mode'], ident, signal, stack) ok = self._split_test(activity['split_mode'], signal, stack)
triggers = triggers and not ok triggers = triggers and not ok
if triggers: if triggers:
cr.execute('select * from wkf_transition where act_from=%s', (workitem['act_id'],)) cr.execute('select * from wkf_transition where act_from=%s', (self.workitem['act_id'],))
for trans in cr.dictfetchall(): for trans in cr.dictfetchall():
if trans['trigger_model']: if trans['trigger_model']:
ids = wkf_expr._eval_expr(cr,ident,workitem,trans['trigger_expr_id']) ids = self.wkf_expr_eval_expr(trans['trigger_expr_id'])
for res_id in ids: for res_id in ids:
cr.execute('select nextval(\'wkf_triggers_id_seq\')') cr.execute('select nextval(\'wkf_triggers_id_seq\')')
id =cr.fetchone()[0] id =cr.fetchone()[0]
cr.execute('insert into wkf_triggers (model,res_id,instance_id,workitem_id,id) values (%s,%s,%s,%s,%s)', (trans['trigger_model'],res_id,workitem['inst_id'], workitem['id'], id)) cr.execute('insert into wkf_triggers (model,res_id,instance_id,workitem_id,id) values (%s,%s,%s,%s,%s)', (trans['trigger_model'],res_id, self.workitem['inst_id'], self.workitem['id'], id))
return True return True
def _execute(self, activity, stack):
# ---------------------- PRIVATE FUNCS -------------------------------- """Send a signal to parenrt workflow (signal: subflow.signal_name)"""
def _state_set(cr, workitem, activity, state, ident):
cr.execute('update wkf_workitem set state=%s where id=%s', (state,workitem['id']))
workitem['state'] = state
logger.info("Changed state of work item %s to \"%s\" in activity %s",
workitem['id'], state, activity['id'], extra={'ident': ident})
def _execute(cr, workitem, activity, ident, stack):
result = True result = True
# cr = self.session.cr
# send a signal to parent workflow (signal: subflow.signal_name)
#
signal_todo = [] signal_todo = []
if (workitem['state']=='active') and activity['signal_send']:
cr.execute("select i.id,w.osv,i.res_id from wkf_instance i left join wkf w on (i.wkf_id=w.id) where i.id IN (select inst_id from wkf_workitem where subflow_id=%s)", (workitem['inst_id'],))
for i in cr.fetchall():
signal_todo.append((i[0], (ident[0],i[1],i[2]), activity['signal_send']))
if activity['kind']=='dummy': if (self.workitem['state']=='active') and activity['signal_send']:
if workitem['state']=='active': # signal_send']:
_state_set(cr, workitem, activity, 'complete', ident) cr.execute("select i.id,w.osv,i.res_id from wkf_instance i left join wkf w on (i.wkf_id=w.id) where i.id IN (select inst_id from wkf_workitem where subflow_id=%s)", (self.workitem['inst_id'],))
for instance_id, model_name, record_id in cr.fetchall():
record = Record(model_name, record_id)
signal_todo.append((instance_id, record, activity['signal_send']))
if activity['kind'] == WorkflowActivity.KIND_DUMMY:
if self.workitem['state']=='active':
self._state_set(activity, 'complete')
if activity['action_id']: if activity['action_id']:
res2 = wkf_expr.execute_action(cr, ident, workitem, activity) res2 = self.wkf_expr_execute_action(activity)
if res2: if res2:
stack.append(res2) stack.append(res2)
result=res2 result=res2
elif activity['kind']=='function':
if workitem['state']=='active': elif activity['kind'] == WorkflowActivity.KIND_FUNCTION:
_state_set(cr, workitem, activity, 'running', ident)
returned_action = wkf_expr.execute(cr, ident, workitem, activity) if self.workitem['state']=='active':
self._state_set(activity, 'running')
returned_action = self.wkf_expr_execute(activity)
if type(returned_action) in (dict,): if type(returned_action) in (dict,):
stack.append(returned_action) stack.append(returned_action)
if activity['action_id']: if activity['action_id']:
res2 = wkf_expr.execute_action(cr, ident, workitem, activity) res2 = self.wkf_expr_execute_action(activity)
# A client action has been returned # A client action has been returned
if res2: if res2:
stack.append(res2) stack.append(res2)
result=res2 result=res2
_state_set(cr, workitem, activity, 'complete', ident) self._state_set(activity, 'complete')
elif activity['kind']=='stopall':
if workitem['state']=='active': elif activity['kind'] == WorkflowActivity.KIND_STOPALL:
_state_set(cr, workitem, activity, 'running', ident) if self.workitem['state']=='active':
cr.execute('delete from wkf_workitem where inst_id=%s and id<>%s', (workitem['inst_id'], workitem['id'])) self._state_set(activity, 'running')
cr.execute('delete from wkf_workitem where inst_id=%s and id<>%s', (self.workitem['inst_id'], self.workitem['id']))
if activity['action']: if activity['action']:
wkf_expr.execute(cr, ident, workitem, activity) self.wkf_expr_execute(activity)
_state_set(cr, workitem, activity, 'complete', ident) self._state_set(activity, 'complete')
elif activity['kind']=='subflow':
if workitem['state']=='active': elif activity['kind'] == WorkflowActivity.KIND_SUBFLOW:
_state_set(cr, workitem, activity, 'running', ident)
if self.workitem['state']=='active':
self._state_set(activity, 'running')
if activity.get('action', False): if activity.get('action', False):
id_new = wkf_expr.execute(cr, ident, workitem, activity) id_new = self.wkf_expr_execute(activity)
if not id_new: if not id_new:
cr.execute('delete from wkf_workitem where id=%s', (workitem['id'],)) cr.execute('delete from wkf_workitem where id=%s', (self.workitem['id'],))
return False return False
assert type(id_new)==type(1) or type(id_new)==type(1L), 'Wrong return value: '+str(id_new)+' '+str(type(id_new)) assert type(id_new)==type(1) or type(id_new)==type(1L), 'Wrong return value: '+str(id_new)+' '+str(type(id_new))
cr.execute('select id from wkf_instance where res_id=%s and wkf_id=%s', (id_new, activity['subflow_id'])) cr.execute('select id from wkf_instance where res_id=%s and wkf_id=%s', (id_new, activity['subflow_id']))
id_new = cr.fetchone()[0] id_new = cr.fetchone()[0]
else: else:
id_new = instance.create(cr, ident, activity['subflow_id']) inst = instance.WorkflowInstance(self.session, self.record)
cr.execute('update wkf_workitem set subflow_id=%s where id=%s', (id_new, workitem['id'])) id_new = inst.create(activity['subflow_id'])
workitem['subflow_id'] = id_new
if workitem['state']=='running': cr.execute('update wkf_workitem set subflow_id=%s where id=%s', (id_new, self.workitem['id']))
cr.execute("select state from wkf_instance where id=%s", (workitem['subflow_id'],)) self.workitem['subflow_id'] = id_new
if self.workitem['state']=='running':
cr.execute("select state from wkf_instance where id=%s", (self.workitem['subflow_id'],))
state = cr.fetchone()[0] state = cr.fetchone()[0]
if state=='complete': if state=='complete':
_state_set(cr, workitem, activity, 'complete', ident) self._state_set(activity, 'complete')
for t in signal_todo:
instance.validate(cr, t[0], t[1], t[2], force_running=True) for instance_id, record, signal_send in signal_todo:
wi = instance.WorkflowInstance(self.session, record, {'id': instance_id})
wi.validate(signal_send, force_running=True)
return result return result
def _split_test(cr, workitem, split_mode, ident, signal=None, stack=None): def _state_set(self, activity, state):
cr.execute('select * from wkf_transition where act_from=%s', (workitem['act_id'],)) self.session.cr.execute('update wkf_workitem set state=%s where id=%s', (state, self.workitem['id']))
self.workitem['state'] = state
logger.info('Changed state of work item %s to "%s" in activity %s',
self.workitem['id'], state, activity['id'],
extra={'ident': (self.session.uid, self.record.model, self.record.id)})
def _split_test(self, split_mode, signal, stack):
cr = self.session.cr
cr.execute('select * from wkf_transition where act_from=%s', (self.workitem['act_id'],))
test = False test = False
transitions = [] transitions = []
alltrans = cr.dictfetchall() alltrans = cr.dictfetchall()
if split_mode=='XOR' or split_mode=='OR':
if split_mode in ('XOR', 'OR'):
for transition in alltrans: for transition in alltrans:
if wkf_expr.check(cr, workitem, ident, transition,signal): if self.wkf_expr_check(transition,signal):
test = True test = True
transitions.append((transition['id'], workitem['inst_id'])) transitions.append((transition['id'], self.workitem['inst_id']))
if split_mode=='XOR': if split_mode=='XOR':
break break
else: else:
test = True test = True
for transition in alltrans: for transition in alltrans:
if not wkf_expr.check(cr, workitem, ident, transition,signal): if not self.wkf_expr_check(transition, signal):
test = False test = False
break break
cr.execute('select count(*) from wkf_witm_trans where trans_id=%s and inst_id=%s', (transition['id'], workitem['inst_id'])) cr.execute('select count(*) from wkf_witm_trans where trans_id=%s and inst_id=%s', (transition['id'], self.workitem['inst_id']))
if not cr.fetchone()[0]: if not cr.fetchone()[0]:
transitions.append((transition['id'], workitem['inst_id'])) transitions.append((transition['id'], self.workitem['inst_id']))
if test and len(transitions):
if test and transitions:
cr.executemany('insert into wkf_witm_trans (trans_id,inst_id) values (%s,%s)', transitions) cr.executemany('insert into wkf_witm_trans (trans_id,inst_id) values (%s,%s)', transitions)
cr.execute('delete from wkf_workitem where id=%s', (workitem['id'],)) cr.execute('delete from wkf_workitem where id=%s', (self.workitem['id'],))
for t in transitions: for t in transitions:
_join_test(cr, t[0], t[1], ident, stack) self._join_test(t[0], t[1], stack)
return True return True
return False return False
def _join_test(cr, trans_id, inst_id, ident, stack): def _join_test(self, trans_id, inst_id, stack):
cr = self.session.cr
cr.execute('select * from wkf_activity where id=(select act_to from wkf_transition where id=%s)', (trans_id,)) cr.execute('select * from wkf_activity where id=(select act_to from wkf_transition where id=%s)', (trans_id,))
activity = cr.dictfetchone() activity = cr.dictfetchone()
if activity['join_mode']=='XOR': if activity['join_mode']=='XOR':
create(cr,[activity], inst_id, ident, stack) WorkflowItem.create(self.session, self.record, activity, inst_id, stack=stack)
cr.execute('delete from wkf_witm_trans where inst_id=%s and trans_id=%s', (inst_id,trans_id)) cr.execute('delete from wkf_witm_trans where inst_id=%s and trans_id=%s', (inst_id,trans_id))
else: else:
cr.execute('select id from wkf_transition where act_to=%s', (activity['id'],)) cr.execute('select id from wkf_transition where act_to=%s', (activity['id'],))
@ -191,7 +270,68 @@ def _join_test(cr, trans_id, inst_id, ident, stack):
if ok: if ok:
for (id,) in trans_ids: for (id,) in trans_ids:
cr.execute('delete from wkf_witm_trans where trans_id=%s and inst_id=%s', (id,inst_id)) cr.execute('delete from wkf_witm_trans where trans_id=%s and inst_id=%s', (id,inst_id))
create(cr, [activity], inst_id, ident, stack) WorkflowItem.create(self.session, self.record, activity, inst_id, stack=stack)
def wkf_expr_eval_expr(self, lines):
"""
Evaluate each line of ``lines`` with the ``Environment`` environment, returning
the value of the last line.
"""
assert lines, 'You used a NULL action in a workflow, use dummy node instead.'
result = False
for line in lines.split('\n'):
line = line.strip()
if not line:
continue
if line == 'True':
result = True
elif line == 'False':
result = False
else:
env = Environment(self.session, self.record)
result = eval(line, env, nocopy=True)
return result
def wkf_expr_execute_action(self, activity):
"""
Evaluate the ir.actions.server action specified in the activity.
"""
context = {
'active_model': self.record.model,
'active_id': self.record.id,
'active_ids': [self.record.id]
}
ir_actions_server = openerp.registry(self.session.cr.dbname)['ir.actions.server']
result = ir_actions_server.run(self.session.cr, self.session.uid, [activity['action_id']], context)
return result
def wkf_expr_execute(self, activity):
"""
Evaluate the action specified in the activity.
"""
return self.wkf_expr_eval_expr(activity['action'])
def wkf_expr_check(self, transition, signal):
"""
Test if a transition can be taken. The transition can be taken if:
- the signal name matches,
- the uid is SUPERUSER_ID or the user groups contains the transition's
group,
- the condition evaluates to a truish value.
"""
if transition['signal'] and signal != transition['signal']:
return False
if self.session.uid != openerp.SUPERUSER_ID and transition['group_id']:
registry = openerp.registry(self.session.cr.dbname)
user_groups = registry['res.users'].read(self.session.cr, self.session.uid, [self.session.uid], ['groups_id'])[0]['groups_id']
if transition['group_id'] not in user_groups:
return False
return self.wkf_expr_eval_expr(transition['condition'])
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: