# Copyright 2016 Semaphore Solutions, Inc.
# ---------------------------------------------------------------------------
import logging
import time
import re
from s4.clarity.artifact import Artifact
from s4.clarity.researcher import Researcher
from s4.clarity import lazy_property
from . import ETree, ClarityException
from . import types
from .instrument import Instrument
from .reagent_lot import ReagentLot
from .container import Container
from .iomaps import IOMapsMixin
from ._internal import ClarityElement, WrappedXml, FieldsMixin
from ._internal.props import subnode_property_list_of_dicts, subnode_property, attribute_property, subnode_link, subnode_links, subnode_element_list
log = logging.getLogger(__name__)
# Step actions that can be assigned in the Next Step drop down
COMPLETE_ACTION = "complete"
REMOVE_FROM_WORKFLOW_ACTION = "remove"
COMPLETE_AND_REPEAT_ACTION = "completerepeat"
REPEAT_STEP_ACTION = "repeat"
REWORK_ACTION = "rework"
REVIEW_ACTION = "review"
STORE_ACTION = "store"
NEXT_STEP_ACTION = "nextstep"
LEAVE_IN_QC_PROTOCOL_ACTION = "leave"
PROGRAM_STATUS_ERROR = "ERROR"
PROGRAM_STATUS_RUNNING = "RUNNING"
PROGRAM_STATUS_QUEUED = "QUEUED"
[docs]class Step(ClarityElement):
UNIVERSAL_TAG = "{http://genologics.com/ri/step}step"
date_started = subnode_property("date-started", types.DATETIME)
date_completed = subnode_property("date-completed", types.DATETIME)
name = subnode_property("configuration", readonly=True)
[docs] def open_resultfile(self, name, mode, only_write_locally=False, limsid=None):
"""
:type name: str
:type mode: str
:type only_write_locally: bool
:type limsid: str
:rtype: s4.clarity.File
"""
if limsid is not None:
def filterfunc(o):
return o.limsid == limsid
filterdesc = "limsid = %s" % limsid
else:
def filterfunc(o):
return o.name == name
filterdesc = "name = %s" % name
matches = list(filter(filterfunc, self.details.shared_outputs))
if not matches:
raise ValueError("No output file matching filter: " + filterdesc)
if len(matches) > 1:
raise ValueError("Multiple output artifacts matching filter: " + filterdesc)
return matches[0].open_file(mode, only_write_locally, name)
[docs] @lazy_property
def details(self):
"""
:type: StepDetails
"""
return StepDetails(self, self.lims, self.uri + "/details")
[docs] @lazy_property
def actions(self):
"""
:type: StepActions
"""
return StepActions(self.lims, self.uri + "/actions")
[docs] @lazy_property
def configuration(self):
"""
:type: StepConfiguration
"""
node = self.xml_find("./configuration")
return self.lims.stepconfiguration_from_uri(node.get("uri"))
@property
def automatic_next_step(self):
"""
:type: Step
"""
return self.lims.steps.from_link_node(self.xml_find("automatic-next-step"))
[docs] @lazy_property
def pooling(self):
"""
:type: StepPools
"""
pool_link_node = self.xml_find("./pools")
if pool_link_node is None:
return None
return StepPools(self, self.lims, pool_link_node.get("uri"))
[docs] @lazy_property
def placements(self):
"""
:type: StepPlacements
"""
placements_node = self.xml_find("./placements")
if placements_node is None:
return None
return StepPlacements(self.lims, placements_node.get("uri"))
[docs] @lazy_property
def reagent_lots(self):
"""
:type: StepReagentLots
"""
return StepReagentLots(self, self.lims, self.uri + "/reagentlots")
[docs] @lazy_property
def reagents(self):
"""
:type: StepReagents
"""
return StepReagents(self, self.lims, self.uri + "/reagents")
[docs] @lazy_property
def program_status(self):
"""
:type: StepProgramStatus
"""
return StepProgramStatus(self.lims, self.uri + "/programstatus")
[docs] @lazy_property
def available_programs(self):
"""
:type: StepTrigger
"""
nodes = self.xml_findall("./available-programs/available-program")
if nodes is None:
return None
return [StepTrigger(self, node) for node in nodes]
@property
def process(self):
"""
:type: Process
"""
return self.lims.processes.from_limsid(self.limsid)
@property
def fields(self):
"""
:raises NotImplementedError: Steps don't have fields. Use step.details.
"""
raise NotImplementedError("Steps don't have fields. Use step.details.")
@property
def current_state(self):
"""
:type: str
"""
return self.xml_root.get("current-state")
[docs] def wait_for_epp(self):
# type: () -> int
"""
Polls Clarity, blocking until the currently running EPP is done.
:raises EppException: When EPP execution fails.
:return: Zero
:rtype: int
"""
try:
count = 0
while True:
self.program_status.refresh()
if count == 1:
log.info("Waiting for EPP.")
if self.program_status.status not in [PROGRAM_STATUS_RUNNING, PROGRAM_STATUS_QUEUED]:
log.info("EPP finished with status %s.", self.program_status.status)
if self.program_status.status == PROGRAM_STATUS_ERROR:
log.error(self.program_status.message)
raise EppFailureException(self.program_status.message)
self.refresh()
return 0
time.sleep(1)
count += 1
except ClarityException:
log.info("No EPP found.")
return 0
[docs] def advance(self):
# type: () -> None
"""
Advances the current step to the next screen.
"""
log.info("Advancing Step.")
self.xml_root = self.lims.request("post", self.uri + "/advance", self.xml_root)
self.wait_for_epp()
class EppException(Exception):
"""
Base class for all exceptions resulting from errors while running an EPP.
"""
pass
class EppFailureException(EppException):
"""
Raised when an EPP fails to execute correctly.
"""
pass
class EPPTimeoutException(EppException):
"""
Raised when the StepRunner times out waiting on an EPP to complete.
"""
def __init__(self):
super(EPPTimeoutException, self).__init__("Step Runner EPP Timeout - Step took too long to get to next state.")
[docs]class ArtifactAction(WrappedXml):
UNIVERSAL_TAG = "{http://genologics.com/ri/step}actions"
artifact_uri = attribute_property("artifact-uri")
action = attribute_property("action")
step_uri = attribute_property("step-uri")
rework_step_uri = attribute_property("rework-step-uri")
def __init__(self, lims, step, xml_root):
super(ArtifactAction, self).__init__(lims, xml_root)
self.step = step
[docs] def leave_in_qc_protocol(self):
"""
Sets the Next Step property for the artifact specified by artifact_uri to 'Leave in QC Protocol'
"""
self._set_artifact_next_action(LEAVE_IN_QC_PROTOCOL_ACTION)
[docs] def remove_from_workflow(self):
"""
Sets the Next Step property for the artifact specified by artifact_uri to 'Remove From Workflow'
"""
self._set_artifact_next_action(REMOVE_FROM_WORKFLOW_ACTION)
[docs] def review(self):
"""
Sets the Next Step property for the artifact specified by artifact_uri to 'Request Manager Review'
"""
self._set_artifact_next_action(REVIEW_ACTION)
[docs] def repeat(self):
"""
Sets the Next Step property for the artifact specified by artifact_uri to 'Repeat Step'
"""
self._set_artifact_next_action(REPEAT_STEP_ACTION)
[docs] def mark_protocol_complete(self):
self._set_artifact_next_action(COMPLETE_ACTION)
[docs] def next_step(self, step_uri=None):
"""
Set the next step to continue to, or mark the protocol as complete if there is no next step.
If step_uri is not provided, artifact will:
- Continue to the first transition defined for this step, if any transitions are defined.
- Mark the protocol as complete, if there are no transitions (the protocol is done).
"""
transitions = self.step.configuration.transitions
if len(transitions) == 0:
self._set_artifact_next_action(COMPLETE_ACTION)
else:
step_uri = step_uri or transitions[0]["next-step-uri"]
self._set_step_uri_action(NEXT_STEP_ACTION, step_uri)
[docs] def complete_and_repeat(self, step_uri):
"""
Sets the Next Step property for the artifact specified by artifact_uri to 'Complete and Repeat' with the specified next step uri.
"""
self._set_step_uri_action(COMPLETE_AND_REPEAT_ACTION, step_uri)
[docs] def rework(self, step_uri):
"""
Sets the Next Step property for the artifact specified by artifact_uri to 'Rework from an earlier step' with the specified next step uri.
"""
self._set_artifact_next_action(REWORK_ACTION)
self.rework_step_uri = step_uri
def _set_artifact_next_action(self, action):
self.action = action
# Make sure there are no old routing uris in the element
self.step_uri = None
self.rework_step_uri = None
def _set_step_uri_action(self, action, step_uri):
self._set_artifact_next_action(action)
self.step_uri = step_uri
[docs]class StepActions(ClarityElement):
UNIVERSAL_TAG = "{http://genologics.com/ri/step}actions"
next_actions = subnode_property_list_of_dicts('next-actions/next-action', as_attributes=[
'artifact-uri', 'action', 'step-uri', 'rework-step-uri'
])
step = subnode_link(Step, "step")
# The Researcher that was running the step and requested a manager review
escalation_author = subnode_link(Researcher, "./escalation/request/author", attributes=('uri',), readonly=True)
# The Researcher that has been flagged as the reviewer for this step
escalation_reviewer = subnode_link(Researcher, "./escalation/request/reviewer", attributes=('uri',), readonly=True)
# The date that the step was moved into the 'Under Review' state
escalation_date = subnode_property("./escalation/request/date", typename=types.DATETIME, readonly=True)
# All artifacts that are under manager review
escalated_artifacts = subnode_links(Artifact, "./escalation/escalated-artifacts/escalated-artifact")
[docs] @lazy_property
def artifact_actions(self):
"""
A dictionary of ArtifactActions for this step, keyed by artifact.
:rtype: dict[Artifact, ArtifactAction]
"""
actions = {}
for xml_entry in self.xml_findall("./next-actions/next-action"):
artifact = self.lims.artifacts.get(xml_entry.get("artifact-uri"))
actions[artifact] = ArtifactAction(self.lims, self.step, xml_entry)
return actions
def __str__(self):
return "<Actions for Step %s>" % self.step.limsid
[docs] def all_next_step(self, step_uri=None):
"""
Set all artifacts actions to either next step, or mark protocol as complete.
If step_uri is not provided, artifacts will:
- Continue to the first transition defined for this step, if any transitions are defined.
- Mark the protocol as complete, if there are no transitions.
"""
for action in self.artifact_actions.values():
action.next_step(step_uri)
[docs]class StepDetails(IOMapsMixin, FieldsMixin, ClarityElement):
"""
:ivar Step step:
:ivar list[IOMap] iomaps:
:ivar list[Artifact] inputs:
:ivar list[Artifact] outputs:
:ivar list[Artifact] shared_outputs:
:ivar str|None uri:
:ivar LIMS lims:
"""
UNIVERSAL_TAG = "{http://genologics.com/ri/step}details"
FIELDS_XPATH = "./fields"
ATTACH_TO_CATEGORY = "ProcessType"
IOMAPS_XPATH = "input-output-maps/input-output-map"
IOMAPS_OUTPUT_TYPE_ATTRIBUTE = "type"
name = subnode_property("configuration", readonly=True)
instrument_used = subnode_link(Instrument, "instrument", attributes=('uri',))
def __init__(self, step, *args, **kwargs):
self.step = step
super(StepDetails, self).__init__(*args, **kwargs)
def __str__(self):
if self.step:
return "<Details for Step %s>" % self.step.limsid
else:
return "<Unattached StepDetails>"
def _get_attach_to_key(self):
return self.name, self.ATTACH_TO_CATEGORY
def _get_iomaps_shared_result_file_type(self):
return "ResultFile"
[docs]class StepPools(ClarityElement):
UNIVERSAL_TAG = "{http://genologics.com/ri/step}pools"
def __init__(self, step, lims, uri):
self.step = step
super(StepPools, self).__init__(lims, uri)
def __str__(self):
return "<Pools for Step %s>" % self.step.limsid
@property
def available_inputs(self):
"""
:type: list[AvailableInput]
"""
elements = self.xml_findall("./available-inputs/input")
return [AvailableInput(self.lims, p) for p in elements]
@property
def pools(self):
"""
:type: list[Pool]
"""
poolelements = self.xml_findall("./pooled-inputs/pool")
return [Pool(self.lims, p) for p in poolelements]
[docs] def create_pool(self, name, samples):
pool_root = self.xml_root.find("./pooled-inputs")
pool_node = ETree.SubElement(pool_root, "pool", {"name": name})
for sample in samples:
ETree.SubElement(pool_node, "input", {"uri": sample.uri})
[docs] def clear_pools(self):
# type: () -> None
"""
Removes all existing pools that were created on this step.
"""
self.xml_root.remove(self.xml_root.find("./pooled-inputs"))
ETree.SubElement(self.xml_root, "pooled-inputs")
[docs]class Placement(WrappedXml):
container = subnode_link(Container, "location/container") # type: Container
location_value = subnode_property("location/value") # type: str
artifact = subnode_link(Artifact, ".", attributes=('uri',)) # type: Artifact
[docs]class StepPlacements(ClarityElement):
UNIVERSAL_TAG = "{http://genologics.com/ri/step}placements"
step = subnode_link(Step, "step") # type: Step
placements = subnode_element_list(Placement, "output-placements", "output-placement") # type: List[Placement]
def __str__(self):
# type: () -> string
return "<Placements for Step %s>" % self.step.limsid
@property
def selected_containers(self):
# type: () -> List[Container]
"""
:type: list[Container]
"""
selected_containers = self.xml_findall("./selected-containers/container")
return self.lims.containers.from_link_nodes(selected_containers)
[docs] def clear_selected_containers(self):
# type: () -> None
"""
Clears the list of selected containers associated with the step.
This can be used to remove containers that are automatically
created for the step when they are not used.
"""
self.xml_root.remove(self.xml_root.find("./selected-containers"))
ETree.SubElement(self.xml_root, "selected-containers")
[docs] def add_selected_container(self, new_container):
# type: (Container) -> None
"""
Adds a new container to the step placement's list of selected containers.
"""
selected_containers = self.xml_find("./selected-containers")
ETree.SubElement(selected_containers, "container", {"uri": new_container.uri})
[docs] def clear_placements(self):
# type: () -> None
"""
Clears all previous artifact placements recorded to this step.
This is often called before starting automated placement to ensure
that artifacts are not placed twice.
"""
self.xml_root.remove(self.xml_root.find("./output-placements"))
ETree.SubElement(self.xml_root, "output-placements")
[docs] def create_placement(self, artifact, container, well_string):
# type: (Artifact, Container, string) -> None
"""
Place the provided artifact, in the provided container at the location
described by the well_string.
:param artifact: The artifact to place.
:param container: The container that will hold the artifact.
:param well_string: The location on the plate to place the artifact
"""
placement_root = self.xml_root.find("./output-placements")
placement_node = self.xml_root.find("./output-placements/output-placement[@uri='" + artifact.uri + "']")
if not placement_node:
placement_node = ETree.SubElement(placement_root, "output-placement", {"uri": artifact.uri})
location_subnode = ETree.SubElement(placement_node, "location")
ETree.SubElement(location_subnode, "container", {"uri": container.uri})
ETree.SubElement(location_subnode, "value").text = well_string
[docs] def create_placement_with_no_location(self, artifact):
# type: (Artifact) -> None
"""
Samples that are part of a process, but have been removed
need to be included with out a location in some cases because
Clarity will hold the old spot, which may now be used by another sample.
"""
placement_root = self.xml_root.find("./output-placements")
ETree.SubElement(placement_root, "output-placement", {"uri": artifact.uri})
[docs] def commit(self):
# type: () -> None
"""
Push placement changes back to Clarity.
"""
self.post_and_parse()
[docs]class Pool(WrappedXml):
@property
def name(self):
# type: () -> str
"""
:type: str
"""
return self.xml_root.get("name")
[docs] @lazy_property
def output(self):
# type: () -> Artifact
"""
:type: Artifact
"""
node = self.xml_root
return self.lims.artifacts.get(node.get("output-uri"), name=self.name)
[docs]class StepReagentLots(ClarityElement):
UNIVERSAL_TAG = "{http://genologics.com/ri/step}lots"
def __init__(self, step, lims, uri):
self.step = step
super(StepReagentLots, self).__init__(lims, uri)
def __str__(self):
# type: () -> string
return "<Reagent lots for Step %s>" % self.step.limsid
[docs] def add_reagent_lots(self, elements):
# type: (List[ReagentLot]) -> None
"""
:param elements: Add reagent lots to the step
:type elements: list[ReagentLot]
"""
reagent_lots = self.xml_find("./reagent-lots")
current_lots = [lot.uri for lot in self.reagent_lots]
for element in elements:
if element.uri not in current_lots:
ETree.SubElement(reagent_lots, "reagent-lot", {"uri": element.uri})
[docs] def remove_reagent_lots(self, elements):
# type: (List[ReagentLot]) -> None
"""
:param elements: Remove reagent lots from the step
:type elements: list[ReagentLot]
"""
reagent_lots = self.xml_find("./reagent-lots")
for element in elements:
for lot_node in reagent_lots.findall("reagent-lot"):
if lot_node.get("uri") == element.uri:
reagent_lots.remove(lot_node)
[docs] def clear_reagent_lots(self):
# type: () -> None
"""
Clear all reagent lots from the step
"""
self.xml_root.remove(self.xml_root.find("./reagent-lots"))
ETree.SubElement(self.xml_root, "reagent-lots")
@property
def reagent_lots(self):
# type: () -> List[ReagentLot]
"""
:type: list(ReagentLot)
"""
reagent_elements = self.xml_findall("./reagent-lots/reagent-lot")
return [ReagentLot(self.lims, p.get("uri")) for p in reagent_elements]
[docs]class StepProgramStatus(ClarityElement):
"""
Manage the status of the currently executing step. By setting a
message to the step status, a message box will be displayed to
the user.
The AI node will set the status to RUNNING, but does not
allow the API to set this value.
NOTE: A user has to action Step transition, upon message box display.
i.e. There is no API request to get past the message box.
In practise, using the 'program-status' endpoint conflicts with using StepRunner to develop automated workflow tests
"""
UNIVERSAL_TAG = "{http://genologics.com/ri/step}program-status"
step = subnode_link(Step, "step") # type: Step
message = subnode_property("message") # type: string
status = subnode_property("status") # type: string
[docs] def report_warning(self, message):
# type: (string) -> None
self.message = message
self.status = "WARNING"
self.commit()
[docs] def report_ok(self, message):
# type: (string) -> None
self.message = message
self.status = "OK"
self.commit()
[docs] def report_error(self, message):
# type: (string) -> None
self.message = message
self.status = "ERROR"
self.commit()
def __str__(self):
# type: () -> string
return "<Program Status for Step %s>" % self.step.limsid
[docs]class StepTrigger(WrappedXml):
def __init__(self, step, uri):
self.step = step
super(StepTrigger, self).__init__(step.lims, uri)
@property
def name(self):
# type: () -> str
"""
:type: str
"""
return self.xml_root.get("name")
[docs] def fire(self):
# type: () -> None
log.info("Firing script %s", self.name)
self.lims.request("post", self.xml_root.get("uri"))
self.step.wait_for_epp()
[docs]class StepReagents(ClarityElement):
UNIVERSAL_TAG = "{http://genologics.com/ri/step}reagents"
def __init__(self, step, lims, uri):
self.step = step
super(StepReagents, self).__init__(lims, uri)
def __str__(self):
# type: () -> string
return "<Reagent Placements for Step %s>" % self.step.limsid
reagent_category = subnode_property("reagent-category") # type: string
@property
def output_reagents(self):
# type: () -> List[OutputReagent]
"""
:type: list(OutputReagent)
"""
nodes = self.xml_findall("./output-reagents/output")
if nodes is None:
return None
return [OutputReagent(self, node) for node in nodes]
[docs]class OutputReagent(WrappedXml):
def __init__(self, step, node):
self.step = step
super(OutputReagent, self).__init__(step.lims, node)
@property
def reagent_label(self):
# type: () -> string
"""
:type: str
"""
node = self.xml_find('./' + "reagent-label")
if node is None:
return None
return node.get("name")
@reagent_label.setter
def reagent_label(self, value):
# type: (string) -> None
node = self.xml_find('./' + "reagent-label")
if node is None:
node = ETree.SubElement(self.xml_root, "reagent-label")
node.set("name", value)