Source code for s4.clarity.steputils.actions
# Copyright 2016 Semaphore Solutions, Inc.
# ---------------------------------------------------------------------------
from __future__ import print_function
import logging
from collections import defaultdict
log = logging.getLogger(__name__)
[docs]def set_next_actions(epp, default_action=None, controls_action=None, failed_qc_action=None, action_func=None):
"""
:type epp: s4.clarity.scripts.StepEPP
:type action_func: (s4.clarity.Artifact) -> str
:param failed_qc_action: applied to any sample or control that has failed qc.
:param controls_action: applied to controls. if this is a qc step, only controls which have passed.
:param action_func: called with each artifact; must return an action (string).
if either failed_qc_action or controls_action are set, and also action_func is set, action_func will only
be called for artifacts which are not caught by those actions.
if action_func is None, or returns None for an artifact, the default is used.
:param default_action: if None, an appropriate action is calculated (e.g. next step, or complete protocol.)
"""
actions_list = epp.step.actions.next_actions
auto_default_action, next_step_uri = _default_action_and_uri_for_step(epp.step)
if default_action is None:
default_action = auto_default_action
log.debug("default action: %s" % default_action)
changed = False
need_full_artifact_fetch = action_func is not None or \
failed_qc_action is not None or \
controls_action is not None
if need_full_artifact_fetch:
artifact_uris = [action["artifact-uri"] for action in actions_list]
epp.step.lims.artifacts.batch_get(artifact_uris)
for action_for_artifact in actions_list:
artifact = epp.step.lims.artifact_from_uri(action_for_artifact["artifact-uri"])
if controls_action is not None and artifact.is_control:
new_action = controls_action
elif failed_qc_action is not None and artifact.qc_failed():
new_action = failed_qc_action
elif action_func is not None:
new_action = action_func(artifact) or default_action
else:
new_action = default_action
if new_action is None:
# only possible if default_action is None
log.info("SKIPPING %s: taking no action (leaving in step)", artifact)
continue
current_action = action_for_artifact.get("action")
if current_action is None or current_action != new_action:
action_for_artifact["action"] = new_action
if new_action == "nextstep":
action_for_artifact["step-uri"] = next_step_uri
elif "step-uri" in action_for_artifact:
del action_for_artifact["step-uri"]
log.info("CHANGED %s: %s" % (artifact, new_action))
changed = True
else:
log.info("UNCHANGED %s: %s" % (artifact, current_action))
if changed:
epp.step.actions.next_actions = actions_list
epp.step.actions.commit()
print("Next Actions Set Successfully")
[docs]def route_to_next_protocol(step, artifacts_to_route):
"""
Queues the given artifacts directly to the first step of the next protocol.
NOTE: Artifacts *must* be in-progress in the current step, or an exception will be thrown.
:type step: step.Step
:type artifacts_to_route: list[s4.clarity.Artifact]
"""
# figure out how many workflow stages need to be skipped
current_protocol_step_count = step.configuration.protocol.number_of_steps
protocol_step_index = step.configuration.protocol_step_index
steps_to_skip = int(current_protocol_step_count - protocol_step_index + 1)
stages_to_artifacts = get_current_workflow_stages(step, artifacts_to_route)
for current_stage, artifact_list in stages_to_artifacts.items():
new_stage_index = int(current_stage.index) + steps_to_skip
if new_stage_index > len(current_stage.workflow.stages):
# This is the last protocol, so don't route the artifact.
log.info("Artifacts with LIMS IDs \"%s\" are in the last protocol. Will not route." % ", ".join(artifact.limsid for artifact in artifact_list))
continue
new_stage_to_route_to = current_stage.workflow.stages[new_stage_index]
new_stage_to_route_to.enqueue(artifact_list)
[docs]def get_current_workflow_stages(step, artifacts):
"""
Given artifacts in a currently running step, finds their current workflow stages.
:returns: a dict mapping workflow stages to lists of artifacts which are currently in them.
:rtype: dict[Stage, list[Artifact]]
"""
iomaps_output_keyed = step.details.iomaps_output_keyed()
stage_to_artifacts = defaultdict(list)
for artifact in artifacts:
# Make sure to get the workflow stages from the input, as it may not be the artifact we're actually routing
inputs = iomaps_output_keyed.get(artifact)
if inputs:
workflow_stages = get_artifact_workflow_stages_for_current_step(step, inputs[0])
else:
workflow_stages = get_artifact_workflow_stages_for_current_step(step, artifact)
for workflow_stage in workflow_stages:
stage_to_artifacts[workflow_stage].append(artifact)
return stage_to_artifacts
[docs]def route_to_stage_by_name(step, artifacts_to_route, target_stage_name,
name_matches_base_name=lambda name, requested: name == requested):
"""
Queues the given artifacts to the first stage in the artifact's workflow with the given name.
NOTE: Artifacts *must* be in-progress in the current step, or an exception will be thrown.
Optionally takes a name comparison function to use. Defaults to exact name matching.
:type step: step.Step
:type artifacts_to_route: list[s4.clarity.Artifact]
:type target_stage_name: str
:type name_matches_base_name: (str, str) -> bool
"""
if len(artifacts_to_route) == 0:
return
stages_to_artifacts = get_current_workflow_stages(step, artifacts_to_route)
for current_stage, artifact_list in stages_to_artifacts.items():
found_stage = False
workflow = current_stage.workflow
for workflow_stage in workflow.stages:
if name_matches_base_name(workflow_stage.name, target_stage_name):
workflow_stage.enqueue(artifact_list)
found_stage = True
break
if not found_stage:
raise Exception("Unable to route artifacts (%s) to stage '%s' -- match not found in workflow." %
(artifact_list, target_stage_name))
[docs]def get_artifact_workflow_stages_for_current_step(step, artifact):
workflow_stages = [stage_history.stage
for stage_history in artifact.workflow_stages
if stage_history.status == "IN_PROGRESS"
and stage_history.stage.step.uri == step.configuration.uri]
if not workflow_stages:
# The artifact is not in progress at the current step, so we can't determine which stage to route to.
raise Exception("Can not retrieve an in-progress workflow stage for artifact '%s' in step '%s'." %
(artifact.name, step.name))
return workflow_stages
def _default_action_and_uri_for_step(step):
stepconf = step.configuration
transitions = stepconf.transitions
if transitions is None or len(transitions) == 0:
if stepconf.protocol_step_index == stepconf.protocol.number_of_steps:
action = "complete"
else:
action = "leave"
next_step_uri = None
else:
action = "nextstep"
next_step_uri = transitions[0]["next-step-uri"]
return action, next_step_uri