[REF] Start to abstract the Workflow engine
bzr revid: stw@openerp.com-20131024075707-k1092bspi6ebixdw
This commit is contained in:
parent
f6c1b6bb86
commit
1c398cd6f8
|
@ -23,7 +23,21 @@ from openerp.workflow.helpers import Session
|
||||||
from openerp.workflow.helpers import Record
|
from openerp.workflow.helpers import Record
|
||||||
from openerp.workflow.workitem import WorkflowItem
|
from openerp.workflow.workitem import WorkflowItem
|
||||||
|
|
||||||
def create(session, record, workflow_id):
|
class WorkflowInstance(object):
|
||||||
|
def __init__(self, session, record, values):
|
||||||
|
assert isinstance(session, Session)
|
||||||
|
assert isinstance(record, Record)
|
||||||
|
self.session = session
|
||||||
|
self.record = record
|
||||||
|
|
||||||
|
if not values:
|
||||||
|
values = {}
|
||||||
|
|
||||||
|
assert isinstance(values, dict)
|
||||||
|
self.instance = values
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def create(cls, session, record, workflow_id):
|
||||||
assert isinstance(session, Session)
|
assert isinstance(session, Session)
|
||||||
assert isinstance(record, Record)
|
assert isinstance(record, Record)
|
||||||
assert isinstance(workflow_id, (int, long))
|
assert isinstance(workflow_id, (int, long))
|
||||||
|
@ -31,57 +45,50 @@ def create(session, record, workflow_id):
|
||||||
cr = session.cr
|
cr = session.cr
|
||||||
cr.execute('insert into wkf_instance (res_type,res_id,uid,wkf_id) values (%s,%s,%s,%s) RETURNING id', (record.model, record.id, session.uid, workflow_id))
|
cr.execute('insert into wkf_instance (res_type,res_id,uid,wkf_id) values (%s,%s,%s,%s) RETURNING id', (record.model, record.id, session.uid, workflow_id))
|
||||||
instance_id = cr.fetchone()[0]
|
instance_id = cr.fetchone()[0]
|
||||||
|
|
||||||
cr.execute('select * from wkf_activity where flow_start=True and wkf_id=%s', (workflow_id,))
|
cr.execute('select * from wkf_activity where flow_start=True and wkf_id=%s', (workflow_id,))
|
||||||
activities = cr.dictfetchall()
|
|
||||||
stack = []
|
stack = []
|
||||||
WorkflowItem(session, record).create(activities, instance_id, stack)
|
|
||||||
update(session, record, instance_id)
|
|
||||||
return instance_id
|
|
||||||
|
|
||||||
def delete(session, record):
|
activities = cr.dictfetchall()
|
||||||
assert isinstance(session, Session)
|
for activity in activities:
|
||||||
assert isinstance(record, Record)
|
WorkflowItem.create(session, record, activity, instance_id, stack)
|
||||||
|
|
||||||
session.cr.execute('delete from wkf_instance where res_id=%s and res_type=%s', (record.id, record.model))
|
cr.execute('SELECT * FROM wkf_instance WHERE id = %s', (instance_id,))
|
||||||
|
values = cr.dictfetchone()
|
||||||
|
wi = WorkflowInstance(session, record, values)
|
||||||
|
wi.update()
|
||||||
|
|
||||||
def validate(session, record, instance_id, signal, force_running=False):
|
return wi
|
||||||
assert isinstance(session, Session)
|
|
||||||
assert isinstance(record, Record)
|
def delete(self):
|
||||||
assert isinstance(instance_id, (long, int))
|
self.session.cr.execute('delete from wkf_instance where res_id=%s and res_type=%s', (self.record.id, self.record.model))
|
||||||
|
|
||||||
|
def validate(self, signal, force_running=False):
|
||||||
assert isinstance(signal, basestring)
|
assert isinstance(signal, basestring)
|
||||||
assert isinstance(force_running, bool)
|
assert isinstance(force_running, bool)
|
||||||
|
|
||||||
cr = session.cr
|
cr = self.session.cr
|
||||||
cr.execute("select * from wkf_workitem where inst_id=%s", (instance_id,))
|
cr.execute("select * from wkf_workitem where inst_id=%s", (self.instance['id'],))
|
||||||
stack = []
|
stack = []
|
||||||
wi = WorkflowItem(session, record)
|
for work_item_values in cr.dictfetchall():
|
||||||
for work_item in cr.dictfetchall():
|
wi = WorkflowItem(self.session, self.record, work_item_values)
|
||||||
# stack = []
|
wi.process(signal=signal, force_running=force_running, stack=stack)
|
||||||
wi.process(work_item, signal, force_running, stack=stack)
|
|
||||||
# An action is returned
|
# An action is returned
|
||||||
_update_end(session, record, instance_id)
|
self._update_end()
|
||||||
return stack and stack[0] or False
|
return stack and stack[0] or False
|
||||||
|
|
||||||
def update(session, record, instance_id):
|
def update(self):
|
||||||
assert isinstance(session, Session)
|
cr = self.session.cr
|
||||||
assert isinstance(record, Record)
|
|
||||||
assert isinstance(instance_id, (long, int))
|
|
||||||
|
|
||||||
cr = session.cr
|
cr.execute("select * from wkf_workitem where inst_id=%s", (self.instance['id'],))
|
||||||
cr.execute("select * from wkf_workitem where inst_id=%s", (instance_id,))
|
for work_item_values in cr.dictfetchall():
|
||||||
wi = WorkflowItem(session, record)
|
|
||||||
|
|
||||||
for work_item in cr.dictfetchall():
|
|
||||||
stack = []
|
stack = []
|
||||||
wi.process(work_item, stack=stack)
|
WorkflowItem(self.session, self.record, work_item_values).process(stack=stack)
|
||||||
return _update_end(session, record, instance_id)
|
return self._update_end()
|
||||||
|
|
||||||
def _update_end(session, record, instance_id):
|
def _update_end(self):
|
||||||
assert isinstance(session, Session)
|
cr = self.session.cr
|
||||||
assert isinstance(record, Record)
|
instance_id = self.instance['id']
|
||||||
assert isinstance(instance_id, (long, int))
|
|
||||||
|
|
||||||
cr = session.cr
|
|
||||||
cr.execute('select wkf_id from wkf_instance where id=%s', (instance_id,))
|
cr.execute('select wkf_id from wkf_instance where id=%s', (instance_id,))
|
||||||
wkf_id = cr.fetchone()[0]
|
wkf_id = cr.fetchone()[0]
|
||||||
cr.execute('select state,flow_stop from wkf_workitem w left join wkf_activity a on (a.id=w.act_id) where w.inst_id=%s', (instance_id,))
|
cr.execute('select state,flow_stop from wkf_workitem w left join wkf_activity a on (a.id=w.act_id) where w.inst_id=%s', (instance_id,))
|
||||||
|
@ -99,10 +106,29 @@ def _update_end(session, record, instance_id):
|
||||||
for cur_instance_id, cur_model_name, cur_record_id in cr.fetchall():
|
for cur_instance_id, cur_model_name, cur_record_id in cr.fetchall():
|
||||||
cur_record = Record(cur_model_name, cur_record_id)
|
cur_record = Record(cur_model_name, cur_record_id)
|
||||||
for act_name in act_names:
|
for act_name in act_names:
|
||||||
validate(session, cur_record, cur_instance_id, 'subflow.%s' % act_name[0])
|
WorkflowInstance(self.session, cur_record, {'id':cur_instance_id}).validate('subflow.{}'.format(act_name[0]))
|
||||||
|
|
||||||
return ok
|
return ok
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def create(session, record, workflow_id):
|
||||||
|
return WorkflowInstance(session, record).create(workflow_id)
|
||||||
|
|
||||||
|
def delete(session, record):
|
||||||
|
return WorkflowInstance(session, record).delete()
|
||||||
|
|
||||||
|
def validate(session, record, instance_id, signal, force_running=False):
|
||||||
|
return WorkflowInstance(session, record).validate(instance_id, signal, force_running)
|
||||||
|
|
||||||
|
def update(session, record, instance_id):
|
||||||
|
return WorkflowInstance(session, record).update(instance_id)
|
||||||
|
|
||||||
|
def _update_end(session, record, instance_id):
|
||||||
|
return WorkflowInstance(session, record)._update_end(instance_id)
|
||||||
|
|
||||||
|
|
||||||
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
|
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
|
||||||
|
|
||||||
|
|
|
@ -21,7 +21,8 @@
|
||||||
from helpers import Session
|
from helpers import Session
|
||||||
from helpers import Record
|
from helpers import Record
|
||||||
|
|
||||||
import instance
|
from openerp.workflow.instance import WorkflowInstance
|
||||||
|
# import instance
|
||||||
|
|
||||||
|
|
||||||
class WorkflowService(object):
|
class WorkflowService(object):
|
||||||
|
@ -49,7 +50,7 @@ class WorkflowService(object):
|
||||||
(self.record.id or None, self.record.model or None, 'active')
|
(self.record.id or None, self.record.model or None, 'active')
|
||||||
)
|
)
|
||||||
for (instance_id,) in self.cr.fetchall():
|
for (instance_id,) in self.cr.fetchall():
|
||||||
instance.update(self.session, self.record, instance_id)
|
WorkflowInstance(self.session, self.record, {'id': instance_id}).update()
|
||||||
|
|
||||||
def trigger(self):
|
def trigger(self):
|
||||||
self.cr.execute('select instance_id from wkf_triggers where res_id=%s and model=%s', (self.record.id, self.record.model))
|
self.cr.execute('select instance_id from wkf_triggers where res_id=%s and model=%s', (self.record.id, self.record.model))
|
||||||
|
@ -61,10 +62,10 @@ class WorkflowService(object):
|
||||||
current_session = Session(self.session.cr, current_uid)
|
current_session = Session(self.session.cr, current_uid)
|
||||||
current_record = Record(current_model_name, current_record_id)
|
current_record = Record(current_model_name, current_record_id)
|
||||||
|
|
||||||
instance.update(current_session, current_record, instance_id)
|
WorkflowInstance(current_session, current_record, {'id': instance_id}).update()
|
||||||
|
|
||||||
def delete(self):
|
def delete(self):
|
||||||
instance.delete(self.session, self.record)
|
WorkflowInstance(self.session, self.record, {}).delete()
|
||||||
|
|
||||||
def create(self):
|
def create(self):
|
||||||
WorkflowService.CACHE.setdefault(self.cr.dbname, {})
|
WorkflowService.CACHE.setdefault(self.cr.dbname, {})
|
||||||
|
@ -77,14 +78,18 @@ class WorkflowService(object):
|
||||||
WorkflowService.CACHE[self.cr.dbname][self.record.model] = wkf_ids
|
WorkflowService.CACHE[self.cr.dbname][self.record.model] = wkf_ids
|
||||||
|
|
||||||
for (wkf_id, ) in wkf_ids:
|
for (wkf_id, ) in wkf_ids:
|
||||||
instance.create(self.session, self.record, wkf_id)
|
WorkflowInstance.create(self.session, self.record, wkf_id)
|
||||||
|
|
||||||
def validate(self, signal):
|
def validate(self, signal):
|
||||||
result = False
|
result = False
|
||||||
# ids of all active workflow instances for a corresponding resource (id, model_nam)
|
# ids of all active workflow instances for a corresponding resource (id, model_nam)
|
||||||
self.cr.execute('select id from wkf_instance where res_id=%s and res_type=%s and state=%s', (self.record.id, self.record.model, 'active'))
|
self.cr.execute('select id from wkf_instance where res_id=%s and res_type=%s and state=%s', (self.record.id, self.record.model, 'active'))
|
||||||
|
# TODO: Refactor the workflow instance object
|
||||||
for (instance_id,) in self.cr.fetchall():
|
for (instance_id,) in self.cr.fetchall():
|
||||||
res2 = instance.validate(self.session, self.record, instance_id, signal)
|
wi = WorkflowInstance(self.session, self.record, {'id': instance_id})
|
||||||
|
|
||||||
|
res2 = wi.validate(signal)
|
||||||
|
|
||||||
result = result or res2
|
result = result or res2
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
|
@ -60,7 +60,219 @@ class Environment(dict):
|
||||||
else:
|
else:
|
||||||
return super(Environment, self).__getitem__(key)
|
return super(Environment, self).__getitem__(key)
|
||||||
|
|
||||||
def wkf_expr_eval_expr(session, record, workitem, lines):
|
|
||||||
|
class WorkflowItem(object):
|
||||||
|
def __init__(self, session, record, work_item_values):
|
||||||
|
assert isinstance(session, Session)
|
||||||
|
assert isinstance(record, Record)
|
||||||
|
self.session = session
|
||||||
|
self.record = record
|
||||||
|
|
||||||
|
if not work_item_values:
|
||||||
|
work_item_values = {}
|
||||||
|
|
||||||
|
assert isinstance(work_item_values, dict)
|
||||||
|
self.workitem = work_item_values
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def create(cls, session, record, activity, instance_id, stack):
|
||||||
|
assert isinstance(session, Session)
|
||||||
|
assert isinstance(record, Record)
|
||||||
|
assert isinstance(activity, dict)
|
||||||
|
assert isinstance(instance_id, (long, int))
|
||||||
|
assert isinstance(stack, list)
|
||||||
|
|
||||||
|
cr = session.cr
|
||||||
|
cr.execute("select nextval('wkf_workitem_id_seq')")
|
||||||
|
id_new = cr.fetchone()[0]
|
||||||
|
cr.execute("insert into wkf_workitem (id,act_id,inst_id,state) values (%s,%s,%s,'active')", (id_new, activity['id'], instance_id))
|
||||||
|
cr.execute('select * from wkf_workitem where id=%s',(id_new,))
|
||||||
|
work_item_values = cr.dictfetchone()
|
||||||
|
logger.info('Created workflow item in activity %s',
|
||||||
|
activity['id'],
|
||||||
|
extra={'ident': (session.uid, record.model, record.id)})
|
||||||
|
|
||||||
|
workflow_item = WorkflowItem(session, record, work_item_values)
|
||||||
|
workflow_item.process(stack=stack)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def create_all(cls, session, record, activities, instance_id, stack):
|
||||||
|
assert isinstance(activities, list)
|
||||||
|
|
||||||
|
for activity in activities:
|
||||||
|
cls.create(session, record, activity, instance_id, stack)
|
||||||
|
|
||||||
|
def process(self, signal=None, force_running=False, stack=None):
|
||||||
|
assert isinstance(force_running, bool)
|
||||||
|
assert stack is not None
|
||||||
|
|
||||||
|
cr = self.session.cr
|
||||||
|
|
||||||
|
cr.execute('select * from wkf_activity where id=%s', (self.workitem['act_id'],))
|
||||||
|
activity = cr.dictfetchone()
|
||||||
|
|
||||||
|
triggers = False
|
||||||
|
if self.workitem['state'] == 'active':
|
||||||
|
triggers = True
|
||||||
|
if not self._execute(activity, stack):
|
||||||
|
return False
|
||||||
|
|
||||||
|
if force_running or self.workitem['state'] == 'complete':
|
||||||
|
ok = self._split_test(activity['split_mode'], signal, stack)
|
||||||
|
triggers = triggers and not ok
|
||||||
|
|
||||||
|
if triggers:
|
||||||
|
cr.execute('select * from wkf_transition where act_from=%s', (self.workitem['act_id'],))
|
||||||
|
for trans in cr.dictfetchall():
|
||||||
|
if trans['trigger_model']:
|
||||||
|
ids = self.wkf_expr_eval_expr(trans['trigger_expr_id'])
|
||||||
|
for res_id in ids:
|
||||||
|
cr.execute('select nextval(\'wkf_triggers_id_seq\')')
|
||||||
|
id =cr.fetchone()[0]
|
||||||
|
cr.execute('insert into wkf_triggers (model,res_id,instance_id,workitem_id,id) values (%s,%s,%s,%s,%s)', (trans['trigger_model'],res_id, self.workitem['inst_id'], self.workitem['id'], id))
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
def _execute(self, activity, stack):
|
||||||
|
"""Send a signal to parenrt workflow (signal: subflow.signal_name)"""
|
||||||
|
result = True
|
||||||
|
cr = self.session.cr
|
||||||
|
signal_todo = []
|
||||||
|
|
||||||
|
if (self.workitem['state']=='active') and activity['signal_send']:
|
||||||
|
# signal_send']:
|
||||||
|
cr.execute("select i.id,w.osv,i.res_id from wkf_instance i left join wkf w on (i.wkf_id=w.id) where i.id IN (select inst_id from wkf_workitem where subflow_id=%s)", (self.workitem['inst_id'],))
|
||||||
|
for instance_id, model_name, record_id in cr.fetchall():
|
||||||
|
record = Record(model_name, record_id)
|
||||||
|
signal_todo.append((instance_id, record, activity['signal_send']))
|
||||||
|
|
||||||
|
|
||||||
|
if activity['kind'] == WorkflowActivity.KIND_DUMMY:
|
||||||
|
if self.workitem['state']=='active':
|
||||||
|
self._state_set(activity, 'complete')
|
||||||
|
if activity['action_id']:
|
||||||
|
res2 = self.wkf_expr_execute_action(activity)
|
||||||
|
if res2:
|
||||||
|
stack.append(res2)
|
||||||
|
result=res2
|
||||||
|
|
||||||
|
elif activity['kind'] == WorkflowActivity.KIND_FUNCTION:
|
||||||
|
|
||||||
|
if self.workitem['state']=='active':
|
||||||
|
self._state_set(activity, 'running')
|
||||||
|
returned_action = self.wkf_expr_execute(activity)
|
||||||
|
if type(returned_action) in (dict,):
|
||||||
|
stack.append(returned_action)
|
||||||
|
if activity['action_id']:
|
||||||
|
res2 = self.wkf_expr_execute_action(activity)
|
||||||
|
# A client action has been returned
|
||||||
|
if res2:
|
||||||
|
stack.append(res2)
|
||||||
|
result=res2
|
||||||
|
self._state_set(activity, 'complete')
|
||||||
|
|
||||||
|
elif activity['kind'] == WorkflowActivity.KIND_STOPALL:
|
||||||
|
if self.workitem['state']=='active':
|
||||||
|
self._state_set(activity, 'running')
|
||||||
|
cr.execute('delete from wkf_workitem where inst_id=%s and id<>%s', (self.workitem['inst_id'], self.workitem['id']))
|
||||||
|
if activity['action']:
|
||||||
|
self.wkf_expr_execute(activity)
|
||||||
|
self._state_set(activity, 'complete')
|
||||||
|
|
||||||
|
elif activity['kind'] == WorkflowActivity.KIND_SUBFLOW:
|
||||||
|
|
||||||
|
if self.workitem['state']=='active':
|
||||||
|
|
||||||
|
self._state_set(activity, 'running')
|
||||||
|
if activity.get('action', False):
|
||||||
|
id_new = self.wkf_expr_execute(activity)
|
||||||
|
if not id_new:
|
||||||
|
cr.execute('delete from wkf_workitem where id=%s', (self.workitem['id'],))
|
||||||
|
return False
|
||||||
|
assert type(id_new)==type(1) or type(id_new)==type(1L), 'Wrong return value: '+str(id_new)+' '+str(type(id_new))
|
||||||
|
cr.execute('select id from wkf_instance where res_id=%s and wkf_id=%s', (id_new, activity['subflow_id']))
|
||||||
|
id_new = cr.fetchone()[0]
|
||||||
|
else:
|
||||||
|
inst = instance.WorkflowInstance(self.session, self.record)
|
||||||
|
id_new = inst.create(activity['subflow_id'])
|
||||||
|
|
||||||
|
cr.execute('update wkf_workitem set subflow_id=%s where id=%s', (id_new, self.workitem['id']))
|
||||||
|
self.workitem['subflow_id'] = id_new
|
||||||
|
|
||||||
|
if self.workitem['state']=='running':
|
||||||
|
cr.execute("select state from wkf_instance where id=%s", (self.workitem['subflow_id'],))
|
||||||
|
state = cr.fetchone()[0]
|
||||||
|
if state=='complete':
|
||||||
|
self._state_set(activity, 'complete')
|
||||||
|
|
||||||
|
for instance_id, record, signal_send in signal_todo:
|
||||||
|
wi = instance.WorkflowInstance(self.session, record, {'id': instance_id})
|
||||||
|
wi.validate(signal_send, force_running=True)
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def _state_set(self, activity, state):
|
||||||
|
self.session.cr.execute('update wkf_workitem set state=%s where id=%s', (state, self.workitem['id']))
|
||||||
|
self.workitem['state'] = state
|
||||||
|
logger.info('Changed state of work item %s to "%s" in activity %s',
|
||||||
|
self.workitem['id'], state, activity['id'],
|
||||||
|
extra={'ident': (self.session.uid, self.record.model, self.record.id)})
|
||||||
|
|
||||||
|
def _split_test(self, split_mode, signal, stack):
|
||||||
|
cr = self.session.cr
|
||||||
|
cr.execute('select * from wkf_transition where act_from=%s', (self.workitem['act_id'],))
|
||||||
|
test = False
|
||||||
|
transitions = []
|
||||||
|
alltrans = cr.dictfetchall()
|
||||||
|
|
||||||
|
if split_mode in ('XOR', 'OR'):
|
||||||
|
for transition in alltrans:
|
||||||
|
if self.wkf_expr_check(transition,signal):
|
||||||
|
test = True
|
||||||
|
transitions.append((transition['id'], self.workitem['inst_id']))
|
||||||
|
if split_mode=='XOR':
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
test = True
|
||||||
|
for transition in alltrans:
|
||||||
|
if not self.wkf_expr_check(transition, signal):
|
||||||
|
test = False
|
||||||
|
break
|
||||||
|
cr.execute('select count(*) from wkf_witm_trans where trans_id=%s and inst_id=%s', (transition['id'], self.workitem['inst_id']))
|
||||||
|
if not cr.fetchone()[0]:
|
||||||
|
transitions.append((transition['id'], self.workitem['inst_id']))
|
||||||
|
|
||||||
|
if test and transitions:
|
||||||
|
cr.executemany('insert into wkf_witm_trans (trans_id,inst_id) values (%s,%s)', transitions)
|
||||||
|
cr.execute('delete from wkf_workitem where id=%s', (self.workitem['id'],))
|
||||||
|
for t in transitions:
|
||||||
|
self._join_test(t[0], t[1], stack)
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _join_test(self, trans_id, inst_id, stack):
|
||||||
|
cr = self.session.cr
|
||||||
|
cr.execute('select * from wkf_activity where id=(select act_to from wkf_transition where id=%s)', (trans_id,))
|
||||||
|
activity = cr.dictfetchone()
|
||||||
|
if activity['join_mode']=='XOR':
|
||||||
|
WorkflowItem.create(self.session, self.record, activity, inst_id, stack=stack)
|
||||||
|
cr.execute('delete from wkf_witm_trans where inst_id=%s and trans_id=%s', (inst_id,trans_id))
|
||||||
|
else:
|
||||||
|
cr.execute('select id from wkf_transition where act_to=%s', (activity['id'],))
|
||||||
|
trans_ids = cr.fetchall()
|
||||||
|
ok = True
|
||||||
|
for (id,) in trans_ids:
|
||||||
|
cr.execute('select count(*) from wkf_witm_trans where trans_id=%s and inst_id=%s', (id,inst_id))
|
||||||
|
res = cr.fetchone()[0]
|
||||||
|
if not res:
|
||||||
|
ok = False
|
||||||
|
break
|
||||||
|
if ok:
|
||||||
|
for (id,) in trans_ids:
|
||||||
|
cr.execute('delete from wkf_witm_trans where trans_id=%s and inst_id=%s', (id,inst_id))
|
||||||
|
WorkflowItem.create(self.session, self.record, activity, inst_id, stack=stack)
|
||||||
|
|
||||||
|
def wkf_expr_eval_expr(self, lines):
|
||||||
"""
|
"""
|
||||||
Evaluate each line of ``lines`` with the ``Environment`` environment, returning
|
Evaluate each line of ``lines`` with the ``Environment`` environment, returning
|
||||||
the value of the last line.
|
the value of the last line.
|
||||||
|
@ -76,26 +288,32 @@ def wkf_expr_eval_expr(session, record, workitem, lines):
|
||||||
elif line == 'False':
|
elif line == 'False':
|
||||||
result = False
|
result = False
|
||||||
else:
|
else:
|
||||||
env = Environment(session, record)
|
env = Environment(self.session, self.record)
|
||||||
result = eval(line, env, nocopy=True)
|
result = eval(line, env, nocopy=True)
|
||||||
return result
|
return result
|
||||||
|
|
||||||
def wkf_expr_execute_action(session, record, workitem, activity):
|
def wkf_expr_execute_action(self, activity):
|
||||||
"""
|
"""
|
||||||
Evaluate the ir.actions.server action specified in the activity.
|
Evaluate the ir.actions.server action specified in the activity.
|
||||||
"""
|
"""
|
||||||
ir_actions_server = openerp.registry(session.cr.dbname)['ir.actions.server']
|
context = {
|
||||||
context = { 'active_model': record.model, 'active_id': record.id, 'active_ids': [record.id] }
|
'active_model': self.record.model,
|
||||||
result = ir_actions_server.run(session.cr, session.uid, [activity['action_id']], context)
|
'active_id': self.record.id,
|
||||||
|
'active_ids': [self.record.id]
|
||||||
|
}
|
||||||
|
|
||||||
|
ir_actions_server = openerp.registry(self.session.cr.dbname)['ir.actions.server']
|
||||||
|
result = ir_actions_server.run(self.session.cr, self.session.uid, [activity['action_id']], context)
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
def wkf_expr_execute(session, record, workitem, activity):
|
def wkf_expr_execute(self, activity):
|
||||||
"""
|
"""
|
||||||
Evaluate the action specified in the activity.
|
Evaluate the action specified in the activity.
|
||||||
"""
|
"""
|
||||||
return wkf_expr_eval_expr(session, record, workitem, activity['action'])
|
return self.wkf_expr_eval_expr(activity['action'])
|
||||||
|
|
||||||
def wkf_expr_check(session, record, workitem, transition, signal):
|
def wkf_expr_check(self, transition, signal):
|
||||||
"""
|
"""
|
||||||
Test if a transition can be taken. The transition can be taken if:
|
Test if a transition can be taken. The transition can be taken if:
|
||||||
|
|
||||||
|
@ -107,214 +325,13 @@ def wkf_expr_check(session, record, workitem, transition, signal):
|
||||||
if transition['signal'] and signal != transition['signal']:
|
if transition['signal'] and signal != transition['signal']:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
if session.uid != openerp.SUPERUSER_ID and transition['group_id']:
|
if self.session.uid != openerp.SUPERUSER_ID and transition['group_id']:
|
||||||
registry = openerp.registry(session.cr.dbname)
|
registry = openerp.registry(self.session.cr.dbname)
|
||||||
user_groups = registry['res.users'].read(session.cr, session.uid, [session.uid], ['groups_id'])[0]['groups_id']
|
user_groups = registry['res.users'].read(self.session.cr, self.session.uid, [self.session.uid], ['groups_id'])[0]['groups_id']
|
||||||
if transition['group_id'] not in user_groups:
|
if transition['group_id'] not in user_groups:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
return wkf_expr_eval_expr(session, record, workitem, transition['condition'])
|
return self.wkf_expr_eval_expr(transition['condition'])
|
||||||
|
|
||||||
|
|
||||||
class WorkflowItem(object):
|
|
||||||
def __init__(self, session, record):
|
|
||||||
assert isinstance(session, Session)
|
|
||||||
assert isinstance(record, Record)
|
|
||||||
self.session = session
|
|
||||||
self.record = record
|
|
||||||
|
|
||||||
def create(self, activities, instance_id, stack):
|
|
||||||
assert isinstance(activities, list)
|
|
||||||
assert isinstance(instance_id, (long, int))
|
|
||||||
assert isinstance(stack, list)
|
|
||||||
|
|
||||||
cr = self.session.cr
|
|
||||||
for activity in activities:
|
|
||||||
cr.execute("select nextval('wkf_workitem_id_seq')")
|
|
||||||
id_new = cr.fetchone()[0]
|
|
||||||
cr.execute("insert into wkf_workitem (id,act_id,inst_id,state) values (%s,%s,%s,'active')", (id_new, activity['id'], instance_id))
|
|
||||||
cr.execute('select * from wkf_workitem where id=%s',(id_new,))
|
|
||||||
work_item = cr.dictfetchone()
|
|
||||||
logger.info('Created workflow item in activity %s',
|
|
||||||
activity['id'],
|
|
||||||
extra={'ident': (self.session.uid, self.record.model, self.record.id)})
|
|
||||||
|
|
||||||
self.process(work_item, stack=stack)
|
|
||||||
|
|
||||||
def process(self, workitem, signal=None, force_running=False, stack=None):
|
|
||||||
assert isinstance(workitem, dict)
|
|
||||||
assert isinstance(force_running, bool)
|
|
||||||
|
|
||||||
# return _process(self.session, self.record, work_item, signal=signal, force_running=force_running, stack=stack)
|
|
||||||
# assert isinstance(force_running, bool)
|
|
||||||
|
|
||||||
assert stack is not None
|
|
||||||
|
|
||||||
cr = self.session.cr
|
|
||||||
|
|
||||||
cr.execute('select * from wkf_activity where id=%s', (workitem['act_id'],))
|
|
||||||
activity = cr.dictfetchone()
|
|
||||||
|
|
||||||
triggers = False
|
|
||||||
if workitem['state'] == 'active':
|
|
||||||
triggers = True
|
|
||||||
if not self._execute(workitem, activity, stack):
|
|
||||||
return False
|
|
||||||
|
|
||||||
if force_running or workitem['state'] == 'complete':
|
|
||||||
ok = self._split_test(workitem, activity['split_mode'], signal, stack)
|
|
||||||
triggers = triggers and not ok
|
|
||||||
|
|
||||||
if triggers:
|
|
||||||
cr.execute('select * from wkf_transition where act_from=%s', (workitem['act_id'],))
|
|
||||||
for trans in cr.dictfetchall():
|
|
||||||
if trans['trigger_model']:
|
|
||||||
ids = wkf_expr_eval_expr(self.session, self.record, workitem, trans['trigger_expr_id'])
|
|
||||||
for res_id in ids:
|
|
||||||
cr.execute('select nextval(\'wkf_triggers_id_seq\')')
|
|
||||||
id =cr.fetchone()[0]
|
|
||||||
cr.execute('insert into wkf_triggers (model,res_id,instance_id,workitem_id,id) values (%s,%s,%s,%s,%s)', (trans['trigger_model'],res_id,workitem['inst_id'], workitem['id'], id))
|
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
||||||
def _execute(self, workitem, activity, stack):
|
|
||||||
"""Send a signal to parenrt workflow (signal: subflow.signal_name)"""
|
|
||||||
result = True
|
|
||||||
cr = self.session.cr
|
|
||||||
signal_todo = []
|
|
||||||
|
|
||||||
if (workitem['state']=='active') and activity['signal_send']:
|
|
||||||
# signal_send']:
|
|
||||||
cr.execute("select i.id,w.osv,i.res_id from wkf_instance i left join wkf w on (i.wkf_id=w.id) where i.id IN (select inst_id from wkf_workitem where subflow_id=%s)", (workitem['inst_id'],))
|
|
||||||
for instance_id, model_name, record_id in cr.fetchall():
|
|
||||||
record = Record(model_name, record_id)
|
|
||||||
signal_todo.append((instance_id, record, activity['signal_send']))
|
|
||||||
|
|
||||||
|
|
||||||
if activity['kind'] == WorkflowActivity.KIND_DUMMY:
|
|
||||||
if workitem['state']=='active':
|
|
||||||
self._state_set(workitem, activity, 'complete')
|
|
||||||
if activity['action_id']:
|
|
||||||
res2 = wkf_expr_execute_action(self.session, self.record, workitem, activity)
|
|
||||||
if res2:
|
|
||||||
stack.append(res2)
|
|
||||||
result=res2
|
|
||||||
|
|
||||||
elif activity['kind'] == WorkflowActivity.KIND_FUNCTION:
|
|
||||||
|
|
||||||
if workitem['state']=='active':
|
|
||||||
self._state_set(workitem, activity, 'running')
|
|
||||||
returned_action = wkf_expr_execute(self.session, self.record, workitem, activity)
|
|
||||||
if type(returned_action) in (dict,):
|
|
||||||
stack.append(returned_action)
|
|
||||||
if activity['action_id']:
|
|
||||||
res2 = wkf_expr_execute_action(self.session, self.record, workitem, activity)
|
|
||||||
# A client action has been returned
|
|
||||||
if res2:
|
|
||||||
stack.append(res2)
|
|
||||||
result=res2
|
|
||||||
self._state_set(workitem, activity, 'complete')
|
|
||||||
|
|
||||||
elif activity['kind'] == WorkflowActivity.KIND_STOPALL:
|
|
||||||
if workitem['state']=='active':
|
|
||||||
self._state_set(workitem, activity, 'running')
|
|
||||||
cr.execute('delete from wkf_workitem where inst_id=%s and id<>%s', (workitem['inst_id'], workitem['id']))
|
|
||||||
if activity['action']:
|
|
||||||
wkf_expr_execute(self.session, self.record, workitem, activity)
|
|
||||||
self._state_set(workitem, activity, 'complete')
|
|
||||||
|
|
||||||
elif activity['kind'] == WorkflowActivity.KIND_SUBFLOW:
|
|
||||||
|
|
||||||
if workitem['state']=='active':
|
|
||||||
|
|
||||||
self._state_set(workitem, activity, 'running')
|
|
||||||
if activity.get('action', False):
|
|
||||||
id_new = wkf_expr_execute(self.session, self.record, workitem, activity)
|
|
||||||
if not id_new:
|
|
||||||
cr.execute('delete from wkf_workitem where id=%s', (workitem['id'],))
|
|
||||||
return False
|
|
||||||
assert type(id_new)==type(1) or type(id_new)==type(1L), 'Wrong return value: '+str(id_new)+' '+str(type(id_new))
|
|
||||||
cr.execute('select id from wkf_instance where res_id=%s and wkf_id=%s', (id_new,activity['subflow_id']))
|
|
||||||
id_new = cr.fetchone()[0]
|
|
||||||
else:
|
|
||||||
id_new = instance.create(self.session, self.record, activity['subflow_id'])
|
|
||||||
cr.execute('update wkf_workitem set subflow_id=%s where id=%s', (id_new, workitem['id']))
|
|
||||||
workitem['subflow_id'] = id_new
|
|
||||||
|
|
||||||
if workitem['state']=='running':
|
|
||||||
cr.execute("select state from wkf_instance where id=%s", (workitem['subflow_id'],))
|
|
||||||
state= cr.fetchone()[0]
|
|
||||||
if state=='complete':
|
|
||||||
self._state_set(workitem, activity, 'complete')
|
|
||||||
|
|
||||||
for instance_id, record, signal_send in signal_todo:
|
|
||||||
instance.validate(self.session, self.record, signal_send, force_running=True)
|
|
||||||
|
|
||||||
return result
|
|
||||||
|
|
||||||
def _state_set(self, workitem, activity, state):
|
|
||||||
self.session.cr.execute('update wkf_workitem set state=%s where id=%s', (state, workitem['id']))
|
|
||||||
workitem['state'] = state
|
|
||||||
logger.info('Changed state of work item %s to "%s" in activity %s',
|
|
||||||
workitem['id'], state, activity['id'],
|
|
||||||
extra={'ident': (self.session.uid, self.record.model, self.record.id)})
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def _split_test(self, workitem, split_mode, signal, stack):
|
|
||||||
cr = self.session.cr
|
|
||||||
cr.execute('select * from wkf_transition where act_from=%s', (workitem['act_id'],))
|
|
||||||
test = False
|
|
||||||
transitions = []
|
|
||||||
alltrans = cr.dictfetchall()
|
|
||||||
|
|
||||||
if split_mode in ('XOR', 'OR'):
|
|
||||||
for transition in alltrans:
|
|
||||||
if wkf_expr_check(self.session, self.record, workitem, transition,signal):
|
|
||||||
test = True
|
|
||||||
transitions.append((transition['id'], workitem['inst_id']))
|
|
||||||
if split_mode=='XOR':
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
test = True
|
|
||||||
for transition in alltrans:
|
|
||||||
if not wkf_expr_check(self.session, self.record, workitem, transition,signal):
|
|
||||||
test = False
|
|
||||||
break
|
|
||||||
cr.execute('select count(*) from wkf_witm_trans where trans_id=%s and inst_id=%s', (transition['id'], workitem['inst_id']))
|
|
||||||
if not cr.fetchone()[0]:
|
|
||||||
transitions.append((transition['id'], workitem['inst_id']))
|
|
||||||
|
|
||||||
if test and transitions:
|
|
||||||
cr.executemany('insert into wkf_witm_trans (trans_id,inst_id) values (%s,%s)', transitions)
|
|
||||||
cr.execute('delete from wkf_workitem where id=%s', (workitem['id'],))
|
|
||||||
for t in transitions:
|
|
||||||
self._join_test(t[0], t[1], stack)
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
def _join_test(self, trans_id, inst_id, stack):
|
|
||||||
cr = self.session.cr
|
|
||||||
cr.execute('select * from wkf_activity where id=(select act_to from wkf_transition where id=%s)', (trans_id,))
|
|
||||||
activity = cr.dictfetchone()
|
|
||||||
if activity['join_mode']=='XOR':
|
|
||||||
self.create([activity], inst_id, stack)
|
|
||||||
cr.execute('delete from wkf_witm_trans where inst_id=%s and trans_id=%s', (inst_id,trans_id))
|
|
||||||
else:
|
|
||||||
cr.execute('select id from wkf_transition where act_to=%s', (activity['id'],))
|
|
||||||
trans_ids = cr.fetchall()
|
|
||||||
ok = True
|
|
||||||
for (id,) in trans_ids:
|
|
||||||
cr.execute('select count(*) from wkf_witm_trans where trans_id=%s and inst_id=%s', (id,inst_id))
|
|
||||||
res = cr.fetchone()[0]
|
|
||||||
if not res:
|
|
||||||
ok = False
|
|
||||||
break
|
|
||||||
if ok:
|
|
||||||
for (id,) in trans_ids:
|
|
||||||
cr.execute('delete from wkf_witm_trans where trans_id=%s and inst_id=%s', (id,inst_id))
|
|
||||||
self.create([activity], inst_id, stack)
|
|
||||||
|
|
||||||
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
|
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue