# Source code for s4.clarity.configuration.stage
# Copyright 2016 Semaphore Solutions, Inc.
# ---------------------------------------------------------------------------
from s4.clarity._internal.props import attribute_property
from s4.clarity import ClarityElement, lazy_property
from s4.clarity.routing import Router
class Stage(ClarityElement):
    """
    A workflow stage element, identified by the XML tag in ``UNIVERSAL_TAG``.

    Exposes the stage's linked workflow, protocol and (optional) step
    configuration, and lets artifacts be routed into or out of the stage
    through a :class:`s4.clarity.routing.Router`.
    """

    UNIVERSAL_TAG = "{http://genologics.com/ri/workflow}stage"

    # Declared via attribute_property("index"); presumably backed by the
    # "index" attribute of the stage's XML node -- confirm in
    # s4.clarity._internal.props.
    index = attribute_property("index")

    # NOTE(review): lazy_property presumably evaluates each accessor below
    # once and caches the result -- confirm against its implementation.

    @lazy_property
    def workflow(self):
        """
        The workflow resolved from this stage's ``./workflow`` link node.

        :type: Workflow
        """
        return self.lims.workflows.from_link_node(self.xml_find("./workflow"))

    @lazy_property
    def protocol(self):
        """
        The protocol resolved from this stage's ``./protocol`` link node.

        :type: Protocol
        """
        return self.lims.protocols.from_link_node(self.xml_find("./protocol"))

    @lazy_property
    def step(self):
        """
        The step configuration referenced by this stage's ``step`` node, or
        ``None`` when the stage has no such node.

        :type: StepConfiguration
        """
        step_node = self.xml_find("step")
        if step_node is None:
            # No step node on this stage, so there is nothing to resolve.
            return None
        return self.lims.stepconfiguration_from_uri(step_node.get("uri"))

    def enqueue(self, artifact_or_artifacts):
        """
        Add one or more artifacts to the stage's queue

        The assignment is registered on a new Router for this stage's URI and
        committed immediately.

        :param artifact_or_artifacts: The artifact(s) to enqueue
        :type artifact_or_artifacts: s4.clarity.Artifact | list[s4.clarity.Artifact]
        """
        r = Router(self.lims)
        r.assign(self.uri, artifact_or_artifacts)
        r.commit()

    def remove(self, artifact_or_artifacts):
        """
        Remove one or more sample artifacts from the stage

        The unassignment is registered on a new Router for this stage's URI
        and committed immediately.

        :param artifact_or_artifacts: The artifact(s) to remove
        :type artifact_or_artifacts: s4.clarity.Artifact | list[s4.clarity.Artifact]
        """
        r = Router(self.lims)
        r.unassign(self.uri, artifact_or_artifacts)
        r.commit()