Source code for s4.clarity.queue

# Copyright 2016 Semaphore Solutions, Inc.
# ---------------------------------------------------------------------------

from six.moves.urllib.parse import urlencode

from ._internal import ClarityElement, WrappedXml
from ._internal.props import subnode_property
from s4.clarity import types


[docs]class Queue(ClarityElement): UNIVERSAL_TAG = "{http://genologics.com/ri/queue}queue" @property def queued_artifacts(self): return self.query()
[docs] def query(self, prefetch=True, **params): queued_elements = [] for k in params: if "_" in k: new_k = k.replace("_", "-") params[new_k] = params[k] del params[k] query_uri = self.uri + "?" + urlencode(params, doseq=True) while query_uri is not None: next_page_node = self.lims.request("get", query_uri) queued_elements += next_page_node.findall("./artifacts/artifact") next_page_link = next_page_node.find("./next-page") if next_page_link is not None: query_uri = next_page_link.get("uri") else: query_uri = None queued_artifacts = [QueueArtifact(self.lims, p) for p in queued_elements] if prefetch: self.lims.artifacts.batch_fetch([queued_artifact.artifact for queued_artifact in queued_artifacts]) return queued_artifacts
class QueueArtifact(WrappedXml): @property def limsid(self): return self.xml_root.get("limsid") @property def uri(self): return self.xml_root.get("uri") @property def artifact(self): return self.lims.artifacts.get(uri=self.uri, limsid=self.limsid) queue_time = subnode_property("queue-time", types.DATETIME)