# Source code for s4.clarity.utils.artifact_ancestry
# Copyright 2017 Semaphore Solutions, Inc.
# ---------------------------------------------------------------------------
from collections import defaultdict
def get_parent_artifacts(lims, artifacts):
    """
    Helper method to get the parent artifacts keyed to the supplied artifacts.

    For an artifact that has a ``parent_step``, its parents are the inputs of every iomap of that step whose outputs
    contain the artifact (outputs are matched on ``limsid``). An artifact without a ``parent_step`` is mapped to an
    empty list. All parents found are then passed to ``lims.artifacts.batch_fetch`` in a single call; that call is
    skipped entirely when no parents were found.

    :param LIMS lims: used only for ``lims.artifacts.batch_fetch``
    :param list[Artifact] artifacts: The artifacts to get parent artifacts for
    :rtype: dict[Artifact, list[Artifact]]
    :return: a ``defaultdict(list)`` holding one entry per supplied artifact
    """
    artifact_to_parent_artifacts = defaultdict(list)
    artifacts_to_batch_fetch = set()

    for artifact in artifacts:
        parent_step = artifact.parent_step
        if not parent_step:
            # Without a parent_step, we've reached the end of the artifact history
            artifact_to_parent_artifacts[artifact] = []
            continue

        # Collect the inputs of every iomap that produced this artifact; per the original author this covers
        # pooled inputs and replicates. The output match is evaluated once per iomap rather than once per input.
        parent_artifacts = [
            input_artifact
            for iomap in parent_step.details.iomaps
            if any(output.limsid == artifact.limsid for output in iomap.outputs)
            for input_artifact in iomap.inputs
        ]
        artifact_to_parent_artifacts[artifact] = parent_artifacts
        artifacts_to_batch_fetch.update(parent_artifacts)

    # Guard on the parents actually collected: the result dict is non-empty as soon as any artifact was supplied,
    # even when there is nothing to fetch.
    if artifacts_to_batch_fetch:
        lims.artifacts.batch_fetch(artifacts_to_batch_fetch)

    return artifact_to_parent_artifacts
def get_udfs_from_artifacts_or_ancestors(lims, artifacts_to_get_udf_from, required_udfs=None, optional_udfs=None):
    """
    Collects, for every supplied artifact, a value for each requested UDF name, taken from the artifact itself or,
    when the artifact has none, from the nearest ancestor that does.

    NOTE: The search up the genealogy stops upon reaching any pooling step.

    :param LIMS lims:
    :param list[Artifact] artifacts_to_get_udf_from: the artifacts whose UDF values are wanted; their ancestors are
        inspected for any value the artifact itself does not provide.
    :param list[str] required_udfs: The list of UDFs that *must* be found. Exception will be raised otherwise.
    :param list[str] optional_udfs: The list of UDFs that *can* be found, but do not need to be.
    :rtype: dict[s4.clarity.Artifact, dict[str, str]]
    :raises Exception: if neither required_udfs nor optional_udfs is supplied, or if values can not be retrieved for
        all required_udfs for all of the provided artifacts
    """
    if not (required_udfs or optional_udfs):
        raise Exception("The get_udfs_from_artifacts_or_ancestors method must be called with at least one "
                        "of the required_udfs or optional_udfs parameters.")

    required_udfs = required_udfs if required_udfs else []
    optional_udfs = optional_udfs if optional_udfs else []

    # Seed the two lookups used by the ancestor search: every artifact starts out as its own search position, and
    # its UDF values start out as whatever the artifact itself holds (None when it has no value).
    search_position_to_originals = {}
    udf_values_by_artifact = {}
    for artifact in artifacts_to_get_udf_from:
        search_position_to_originals[artifact] = [artifact]
        udf_values_by_artifact[artifact] = {
            udf_name: artifact.get(udf_name, None) for udf_name in (required_udfs + optional_udfs)
        }

    artifacts_to_udfs = _get_udfs_from_ancestors_internal(
        lims, search_position_to_originals, udf_values_by_artifact)

    # Only the required names are validated; optional ones may legitimately remain unset
    if required_udfs:
        _validate_required_ancestor_udfs(artifacts_to_udfs, required_udfs)

    return artifacts_to_udfs
def _validate_required_ancestor_udfs(artifacts_to_udfs, required_udfs):
    """
    Validates that all items in the artifacts_to_udfs dict have values for the required_udfs.

    A required UDF counts as missing when its entry is absent, ``None`` or the empty string. Every offending
    artifact name and UDF name is gathered first, so that a single exception reports all of them.

    :type artifacts_to_udfs: dict[s4.clarity.Artifact, dict[str, str]]
    :type required_udfs: list[str]
    :raises Exception: if any artifact is missing any of the required_udfs
    """
    artifacts_missing_udfs = set()
    missing_udfs = set()

    for artifact, udf_name_to_value in artifacts_to_udfs.items():
        absent = [name for name in required_udfs if udf_name_to_value.get(name) in ("", None)]
        if absent:
            artifacts_missing_udfs.add(artifact.name)
            missing_udfs.update(absent)

    if artifacts_missing_udfs:
        # Sort before joining: set iteration order is not stable between runs, which would otherwise make the
        # message differ from one run to the next for the same input.
        raise Exception("Could not get required values for udf(s) '%s' from ancestors of artifact(s) '%s'." %
                        ("', '".join(sorted(missing_udfs)), "', '".join(sorted(artifacts_missing_udfs))))
def _get_udfs_from_ancestors_internal(lims, current_artifacts_to_original_artifacts, original_artifacts_to_udfs):
    """
    Climbs the genealogy one generation at a time, filling in any UDF value that is still ``None`` for an original
    artifact with the value held by the parent currently being inspected.

    An original artifact drops out of the search when its current ancestor has no parents, when that ancestor's
    parent step has pooling set, or when none of the UDF values checked at this level is still missing.

    :type lims: s4.clarity.LIMS
    :type current_artifacts_to_original_artifacts: dict[s4.clarity.Artifact: list[s4.clarity.Artifact]]
    :param current_artifacts_to_original_artifacts: dict of the currently inspected artifact to the original
        artifacts whose search has reached it.
    :type original_artifacts_to_udfs: dict[s4.clarity.Artifact, dict[str, str]]
    :param original_artifacts_to_udfs: dict of the original artifacts to their UDF values; updated in place as
        values are found on ancestors.
    :rtype: dict[s4.clarity.Artifact, dict[str, Any]]
    :return: the same original_artifacts_to_udfs dict that was passed in
    """
    frontier = current_artifacts_to_original_artifacts

    while True:
        parents_by_artifact = get_parent_artifacts(lims, list(frontier))

        # Ancestors to inspect on the next pass, keyed to the originals still missing values
        next_frontier = defaultdict(list)

        for inspected, originals in frontier.items():
            parents = parents_by_artifact[inspected]

            # No parents: the end of the genealogy has been reached for this artifact.
            # Pooling set: stop, as ancestor artifacts would likely contain multiple values for the UDFs in question.
            if not parents or inspected.parent_step.pooling is not None:
                continue

            # Only the first parent is consulted (the pooled case was skipped above)
            parent = parents[0]

            for original in originals:
                udf_values = original_artifacts_to_udfs[original]
                keep_climbing = False

                for udf_name, known_value in udf_values.items():
                    # Don't overwrite values that have already been found
                    if known_value is not None:
                        continue

                    inherited = parent.get(udf_name, None)
                    if inherited is None:
                        keep_climbing = True
                    else:
                        udf_values[udf_name] = inherited

                if keep_climbing:
                    next_frontier[parent].append(original)

        if not next_frontier:
            return original_artifacts_to_udfs

        frontier = next_frontier