Source code for s4.clarity.routing

# Copyright 2017 Semaphore Solutions, Inc.
# ---------------------------------------------------------------------------
import logging
from collections import defaultdict
from s4.clarity import ETree

log = logging.getLogger(__name__)

ACTION_ASSIGN = "assign"
ACTION_UNASSIGN = "unassign"


[docs]class Router(object): """ Class allowing routing of multiple artifacts to a given workflow or stage """ def __init__(self, lims): self.lims = lims self.routing_dict = defaultdict(lambda:defaultdict(set))
[docs] def clear(self): """ Clears the routing node and the routing dict. """ self.routing_dict = defaultdict(lambda:defaultdict(set))
[docs] def remove(self, artifact_or_artifacts): """ Remove given artifact or artifacts from the routing dict. No error is raised if the artifact is not found. """ for artifact in self._normalize_as_list(artifact_or_artifacts): for routes in self.routing_dict.values(): for artifact_set in routes.values(): artifact_set.discard(artifact)
[docs] def assign(self, workflow_or_stage_uri, artifact_or_artifacts): """ Stages an artifact or multiple artifacts to be assigned to a workflow_or_stage_uri. :type workflow_or_stage_uri: str :param workflow_or_stage_uri: The uri of either a workflow or workflow-stage. If a workflow uri is provided, the artifacts will be queued to the first stage. Otherwise, they will be queued to the specific workflow-stage. :type artifact_or_artifacts: s4.clarity.Artifact | list[s4.clarity.Artifact] """ self._update_routing_dict(ACTION_ASSIGN, workflow_or_stage_uri, artifact_or_artifacts)
[docs] def unassign(self, workflow_or_stage_uri, artifact_or_artifacts): """ Stages an artifact or multiple artifacts to be unassigned from a workflow_or_stage_uri. :type workflow_or_stage_uri: str :param workflow_or_stage_uri: The uri of either a workflow or workflow-stage. If a workflow uri is provided, the artifacts will be removed from any stages of that workflow. Otherwise, they will be removed from the specified workflow stage. :type artifact_or_artifacts: s4.clarity.Artifact | list[s4.clarity.Artifact] """ self._update_routing_dict(ACTION_UNASSIGN, workflow_or_stage_uri, artifact_or_artifacts)
def _update_routing_dict(self, action, uri, artifact_or_artifacts): """ Stages an artifact or multiple artifacts to be assigned to /unassigned from a workflow_or_stage_uri. :type action: str :param action: either "assign" or "unassign" :type uri: str :param uri: The uri of either a workflow or workflow-stage. """ self.routing_dict[action][uri].update( self._normalize_as_list(artifact_or_artifacts) )
[docs] def commit(self): """ Generates the routing XML for workflow/stage assignment/unassignment and posts it. """ routing_node = self._create_routing_node() self.lims.request("post", self.lims.root_uri + "/route/artifacts", routing_node)
def _create_routing_node(self): """ Generates the XML for workflow/stage assignment/unassignment """ routing_node = ETree.Element("{http://genologics.com/ri/routing}routing") for action, routes in self.routing_dict.items(): for workflow_or_stage_uri, artifact_set in routes.items(): if artifact_set: assign_node = self._add_action_subnode(routing_node, action, workflow_or_stage_uri) # create an artifact assign node for each samples for artifact in artifact_set: ETree.SubElement(assign_node, "artifact", {"uri": artifact.uri}) return routing_node def _add_action_subnode(self, routing_node, action, workflow_or_stage_uri): """ Generates a ElementTree.SubElement according to the action (assign / unassign) and the workflow/stage uri """ if '/stages/' in workflow_or_stage_uri: assign_node = ETree.SubElement(routing_node, action, {'stage-uri': workflow_or_stage_uri}) else: assign_node = ETree.SubElement(routing_node, action, {'workflow-uri': workflow_or_stage_uri}) return assign_node def _normalize_as_list(self, artifact_or_artifacts): """ Returns a list of artifacts from one or many artifacts """ if not isinstance(artifact_or_artifacts, (list, set)): artifacts = [artifact_or_artifacts] else: artifacts = artifact_or_artifacts return artifacts