Source code for s4.clarity.iomaps
# Copyright 2016 Semaphore Solutions, Inc.
# ---------------------------------------------------------------------------
from collections import defaultdict
from ._internal import ClarityElement
class IOMapsMixin(ClarityElement):
"""
Parse the StepDetails or Process object,
https://www.genologics.com/files/permanent/API/latest/rest.version.steps.limsid.details.html#GET
https://www.genologics.com/files/permanent/API/latest/rest.version.processes.html#GET
to prepare a list of inputs and outputs for each step/process.
:ivar list[IOMap] iomaps:
:ivar list[Artifact] inputs:
:ivar list[Artifact] outputs:
:ivar list[Artifact] shared_outputs:
:ivar dict[Artifact, list[Artifact]] input_keyed_lookup:
:ivar dict[Artifact, list[Artifact]] output_keyed_lookup:
"""
IOMAPS_XPATH = None
IOMAPS_OUTPUT_TYPE_ATTRIBUTE = None
def __init__(self, *args, **kwargs):
super(IOMapsMixin, self).__init__(*args, **kwargs)
self.xml_root # force population of xml_root, which will initialize lists
def _init_lists(self):
self.input_keyed_lookup = {}
self.output_keyed_lookup = defaultdict(list)
self.iomaps = []
shared_output_set = set()
io_map_nodes = self.xml_findall(self.IOMAPS_XPATH)
shared_result_file_type = self._get_iomaps_shared_result_file_type()
for io_map_node in io_map_nodes:
input_artifact, output_artifact, artifact_type, generation_type = self._get_node_artifacts(io_map_node)
# If we have not seen this input yet, store it to the input lookup dict.
# This step builds up our input artifact list and, if there are no per-artifact outputs
# this is the only place that inputs are recorded.
if input_artifact not in self.input_keyed_lookup:
self.input_keyed_lookup[input_artifact] = []
if output_artifact is not None:
# Remove all shared result files
if generation_type == "PerAllInputs" and artifact_type == shared_result_file_type:
shared_output_set.add(output_artifact)
else:
# Save the output to its input lookup
self.input_keyed_lookup[input_artifact].append(output_artifact)
# Save the input to the output's lookup
self.output_keyed_lookup[output_artifact].append(input_artifact)
# If any of the input lists have more than one item we are in a pooling step.
# There are no steps that will have multiple inputs AND multiple outputs.
is_pooling = any(len(inputs) > 1 for inputs in self.output_keyed_lookup.values())
if is_pooling:
# We are pooling so map multiple inputs to a single output
self.iomaps = [IOMap(input_artifacts, [output_artifact]) for output_artifact, input_artifacts in
self.output_keyed_lookup.items()]
else:
# Regular mapping, allow for one to one or replicate generation
self.iomaps = [IOMap([input_artifact], output_artifacts) for input_artifact, output_artifacts in
self.input_keyed_lookup.items()]
# Prepare our artifact lists
self.inputs = list(self.input_keyed_lookup)
self.outputs = list(self.output_keyed_lookup)
self.shared_outputs = list(shared_output_set)
def _get_iomaps_shared_result_file_type(self):
"""
Get the name of the shared result file type that is used in iomap output link nodes
:rtype: str
:return: the name
"""
raise Exception("Classes using the IOMapsMixin must override the _get_iomaps_shared_result_file_type method.")
def _get_node_artifacts(self, io_map_node):
"""
Returns the input and output artifacts for a io map node as well as the artifact type and generation_type
of the output
:return :type (Artifact, Artifact, String, String)
"""
# There will always be an input artifact
input_node = io_map_node.find('input')
input_artifact = self.lims.artifacts.from_link_node(input_node)
output_node = io_map_node.find('output')
if output_node is None:
return input_artifact, None, None, None
output_artifact = self.lims.artifacts.from_link_node(output_node)
artifact_type = output_node.get(self.IOMAPS_OUTPUT_TYPE_ATTRIBUTE)
generation_type = output_node.get("output-generation-type")
return input_artifact, output_artifact, artifact_type, generation_type
def iomaps_input_keyed(self):
"""
:return: a mapping of input -> outputs.
:rtype: dict[Artifact, list[Artifact]]
"""
return self.input_keyed_lookup
def iomaps_output_keyed(self):
"""
:return: a mapping of output -> inputs.
:rtype: dict[Artifact, list[Artifact]]
"""
return self.output_keyed_lookup
@property
def xml_root(self):
return super(IOMapsMixin, self).xml_root
@xml_root.setter
def xml_root(self, root_node):
super(IOMapsMixin,type(self)).xml_root.__set__(self, root_node)
if root_node is not None:
# initialize lists from new xml
self._init_lists()
[docs]class IOMap(object):
"""
:ivar inputs: list[Artifact]
:ivar outputs: list[Artifact]
"""
def __init__(self, inputs, outputs):
self.inputs = inputs
self.outputs = outputs
@property
def output(self):
"""
:type: Artifact
:raise Exception: If there are multiple output artifacts
"""
if len(self.outputs) > 1:
raise Exception("Too many outputs (%d) to get single output" % len(self.outputs))
return self.outputs[0]
@property
def input(self):
"""
:type: Artifact
:raise Exception: If there are multiple input artifacts
"""
if len(self.inputs) > 1:
raise Exception("Too many inputs (%d) to get single input" % len(self.inputs))
return self.inputs[0]