User Guide

Here find some examples of how you might put the S4-Clarity library to use.

EPP Scripts

Example StepEPPs

Generate CSV

Generate a simple CSV file and attach it to a step.

This EPP extends s4.clarity.scripts.StepEPP and is meant to be initiated from a Record Details button.

Record Details Button

bash -c "/opt/gls/clarity/customextensions/env/bin/python
/opt/gls/clarity/customextensions/examples/generate_csv.py -u {username} -p {password}
-l {compoundOutputFileLuid0} --step-uri {stepURI} --fileId {compoundOutputFileLuid1}
--fileName 'example.csv' --artifactUDFName Concentration"
# Copyright 2019 Semaphore Solutions, Inc.
# ---------------------------------------------------------------------------

import logging
import argparse
from csv import DictWriter

from s4.clarity.scripts import StepEPP

HEADER_INPUT_NAME = "Input Name"
HEADER_INPUT_ID = "Input Id"
HEADER_OUTPUT_NAME = "Output Name"
HEADER_OUTPUT_ID = "Output Id"

log = logging.getLogger(__name__)


class GenerateCSV(StepEPP):

    @classmethod
    def add_arguments(cls, argparser):
        # type: (argparse) -> None
        super(GenerateCSV, cls).add_arguments(argparser)
        argparser.add_argument("--fileId",
                               help="{compoundOutputFileLuids} token",
                               required=True)
        argparser.add_argument("--fileName",
                               type=str,
                               help="File name with extension",
                               default="epp_generated.csv")
        argparser.add_argument("--artifactUDFName",
                               type=str,
                               help="Name of the UDF to include in file",
                               required=True)

    def run(self):
        csv_file = self.step.open_resultfile(self.options.fileName, 'w', limsid=self.options.fileId)

        csv_writer = DictWriter(csv_file, fieldnames=[
            HEADER_INPUT_NAME,
            HEADER_INPUT_ID,
            HEADER_OUTPUT_NAME,
            HEADER_OUTPUT_ID,
            self.options.artifactUDFName
        ])

        csv_writer.writeheader()

        for input_analyte, output_analytes in self.step.details.input_keyed_lookup.items():
            for output_analyte in output_analytes:
                udf_value = output_analyte.get(self.options.artifactUDFName, "Empty")

                row = {
                    HEADER_INPUT_NAME: input_analyte.name,
                    HEADER_INPUT_ID: input_analyte.limsid,
                    HEADER_OUTPUT_NAME: output_analyte.name,
                    HEADER_OUTPUT_ID: output_analyte.limsid,
                    self.options.artifactUDFName: udf_value
                }

                log.info("Writing the following line: %s" % row)

                csv_writer.writerow(row)

        csv_file.commit()


if __name__ == "__main__":
    GenerateCSV.main()

Example TriggeredStepEPPs

QC Set Next Step

Sets the next step action for each output analyte.

This EPP extends s4.clarity.scripts.TriggeredStepEPP and is meant to be triggered on the transition into the Next Steps screen, and again on the End of Step transition.

Record Details Exit transition:

bash -c "/opt/gls/clarity/customextensions/env/bin/python
/opt/gls/clarity/customextensions/examples/qc_set_next_step.py -u {username} -p {password}
-l {compoundOutputFileLuid0} --step-uri {stepURI} --action RecordDetailsExit"

End of Step transition:

bash -c "/opt/gls/clarity/customextensions/env/bin/python
/opt/gls/clarity/customextensions/examples/qc_set_next_step.py -u {username} -p {password}
-l {compoundOutputFileLuid0} --step-uri {stepURI} --action EndOfStep"
# Copyright 2019 Semaphore Solutions, Inc.
# ---------------------------------------------------------------------------

import logging

from s4.clarity.artifact import Artifact
from s4.clarity.scripts import TriggeredStepEPP

log = logging.getLogger(__name__)


class QCSetNextStep(TriggeredStepEPP):

    def should_repeat_step(self, input_analyte):
        # type: (Artifact) -> bool

        # QC flag is set on output result file artifacts
        output_measurements = self.step.details.input_keyed_lookup[input_analyte]

        # If QC failed for any replicate of the input it should repeat
        return any(output.qc_failed() for output in output_measurements)

    def on_record_details_exit(self):
        """
        Set the next step actions for the user to inspect.
        """
        for analyte, action in self.step.actions.artifact_actions.items():

            if self.should_repeat_step(analyte):
                log.info("Setting Analyte '%s' (%s) to repeat step." % (analyte.name, analyte.limsid))
                action.repeat()
            else:
                action.next_step()

        self.step.actions.commit()

    def on_end_of_step(self):
        """
         Ensure analytes repeat the step but do not overwrite other user selections.
        """

        # As this is a QC step it is the inputs that are moving to the next step.
        for input_analyte, action in self.step.actions.artifact_actions.items():

            if self.should_repeat_step(input_analyte):
                log.info("Setting Analyte '%s' (%s) to repeat step." % (input_analyte.name, input_analyte.limsid))
                action.repeat()

        self.step.actions.commit()


if __name__ == "__main__":
    QCSetNextStep.main()

Create Pools of Two

Groups the step’s input analytes into pools of two.

This EPP extends s4.clarity.scripts.TriggeredStepEPP and is meant to be triggered on the transition into Pooling.

Pooling Enter transition:

bash -c "/opt/gls/clarity/customextensions/env/bin/python
/opt/gls/clarity/customextensions/examples/create_pools_of_two.py -u {username} -p {password}
-l {compoundOutputFileLuid0} --step-uri {stepURI} --action PoolingEnter"
# Copyright 2019 Semaphore Solutions, Inc.
# ---------------------------------------------------------------------------

from s4.clarity.scripts import TriggeredStepEPP
from s4.clarity.scripts import UserMessageException


class CreatePoolsOfTwo(TriggeredStepEPP):

    def validate_even_number_of_inputs(self):
        num_input_analytes = len(self.step.pooling.available_inputs)

        if num_input_analytes % 2:
            raise UserMessageException("Step must be started with an even number of analytes.")

    def validate_single_input_replicate_created(self):
        for available_input in self.step.pooling.available_inputs:
            if available_input.replicates > 1:
                raise UserMessageException("No more than one replicate per analyte allowed in this step")

    def on_pooling_enter(self):
        self.validate_single_input_replicate_created()
        self.validate_even_number_of_inputs()

        inputs = [a.input for a in self.step.pooling.available_inputs]

        for i in range(0, len(inputs) - 1, 2):
            pool_inputs = [
                inputs[i],
                inputs[i + 1]
            ]

            pool_name = "%s_%s" % (pool_inputs[0].limsid, pool_inputs[1].limsid)

            self.step.pooling.create_pool(pool_name, pool_inputs)

        self.step.pooling.commit()


if __name__ == "__main__":
    CreatePoolsOfTwo.main()

Example DerivedSampleAutomations

Set UDF Value

Assigns a user provided value to the analyte UDF specified for all selected analytes.

This EPP extends s4.clarity.scripts.DerivedSampleAutomation and is meant to be triggered from the projects dashboard.

Automation Configuration

bash -c "/opt/gls/clarity/customextensions/env/bin/python
/opt/gls/clarity/customextensions/examples/set_udf_value.py -u {username} -p {password}
-a '{baseURI}v2' -d {derivedSampleLuids}  --udfName '{userinput:UDF_Name}' --udfValue '{userinput:UDF_Value}'"
# Copyright 2019 Semaphore Solutions, Inc.
# ---------------------------------------------------------------------------

import argparse

from s4.clarity.scripts import DerivedSampleAutomation


class SetUDFValue(DerivedSampleAutomation):

    @classmethod
    def add_arguments(cls, argparser):
        # type: (argparse) -> None
        super(SetUDFValue, cls).add_arguments(argparser)
        argparser.add_argument("--udfName",
                               type=str,
                               help="The name of the analyte UDF",
                               required=True)
        argparser.add_argument("--udfValue",
                               type=str,
                               help="The new value for the UDF",
                               required=True)

    def process_derived_samples(self):
        for artifact in self.artifacts:
            artifact[self.options.udfName] = self.options.udfValue

        self.lims.artifacts.batch_update(self.artifacts)

        return "Successfully set UDF '%s' to '%s' for %s derived samples" % \
               (self.options.udfName, self.options.udfValue, len(self.artifacts))


if __name__ == "__main__":
    SetUDFValue.main()

Shell Scripts

Accession Clarity Sample

Accessions a new sample into Clarity using the provided container name and project.

This script extends s4.clarity.scripts.ShellScript and is meant to be executed from the command line.

Example Invocation

python ./accession_clarity_sample.py -u <user name> -p <password> -r https://<clarity_server>/api/v2
--sampleName <Sample Name> --projectName <Project Name> --containerName <Container Name>
# Copyright 2019 Semaphore Solutions, Inc.
# ---------------------------------------------------------------------------

import logging
import argparse

from s4.clarity.scripts import ShellScript

log = logging.getLogger(__name__)


class AccessionClaritySample(ShellScript):

    @classmethod
    def add_arguments(cls, argparser):
        # type: (argparse) -> None
        super(AccessionClaritySample, cls).add_arguments(argparser)
        argparser.add_argument("--sampleName",
                               type=str,
                               help="The name of the sample to create",
                               required=True)
        argparser.add_argument("--projectName",
                               type=str,
                               help="The name of an existing Clarity project",
                               required=True)
        argparser.add_argument("--containerName",
                               type=str,
                               help="The name of the sample container",
                               required=True)

    def run(self, *args):
        projects = self.lims.projects.query(name=self.options.projectName)
        if not projects:
            raise Exception("Project '%s' does not exist in Clarity" % self.options.projectName)

        project = projects[0]

        tube_type = self.lims.container_types.get_by_name("Tube")
        container = self.lims.containers.new(container_type=tube_type, name=self.options.containerName)
        container = self.lims.containers.add(container)

        sample = self.lims.samples.new(name=self.options.sampleName, project=project)
        sample.set_location_coords(container, 1, 1)
        self.lims.samples.add(sample)

        log.info("Sample '%s' successfully accessioned in Clarity" % self.options.sampleName)


if __name__ == "__main__":
    AccessionClaritySample.main()

Workflow Testing

Workflow Run

Runs two samples through a three step protocol.

Example Invocation

python ./workflow_run.py -u <user name> -p <password> -r https://<clarity_server>/api/v2
# Copyright 2019 Semaphore Solutions, Inc.
#
# Assumes a single protocol workflow that consists of three steps:
# - QC
# - Pooling
# - Standard
# ---------------------------------------------------------------------------

import logging

from s4.clarity.configuration import Workflow
from s4.clarity.container import Container
from s4.clarity.project import Project
from s4.clarity.sample import Sample
from s4.clarity.scripts import WorkflowTest
from s4.clarity.steputils.placement_utils import auto_place_artifacts
from s4.clarity.steputils.step_runner import StepRunner

log = logging.getLogger(__name__)

NAME_PROTOCOL = "Testing Protocol"


class QCStepRunner(StepRunner):
    def __init__(self, lims):
        super(QCStepRunner, self).__init__(lims, NAME_PROTOCOL, "QC Step")

    def record_details(self):
        # Set the QC flag on all output measurements
        for output in self.step.details.outputs:
            output.qc = True

        self.lims.artifacts.batch_update(self.step.details.outputs)

    def next_steps(self):
        self.step.actions.all_next_step()
        self.step.actions.commit()


class PoolingStepRunner(StepRunner):
    def __init__(self, lims):
        super(PoolingStepRunner, self).__init__(lims, NAME_PROTOCOL, "Pooling Step")

    def pooling(self):
        # Pool all inputs together
        self.step.pooling.create_pool("The Pool", self.step.details.inputs)
        self.step.pooling.commit()

    def record_details(self):
        # Set a step UDF named "Status"
        self.step.details['Status'] = "Pooling Complete"
        self.step.details.commit()

    def next_steps(self):
        self.step.actions.all_next_step()
        self.step.actions.commit()


class StandardStepRunner(StepRunner):
    def __init__(self, lims):
        super(StandardStepRunner, self).__init__(lims, NAME_PROTOCOL, "Standard Step")

    def placement(self):
        auto_place_artifacts(self.step, self.step.details.outputs)

    def record_details(self):
        # Add all required reagents
        self.add_default_reagents()

        # Set the value of an analyte UDF on all of the outputs.
        for output in self.step.details.outputs:
            output["Inspected By"] = self.step.process.technician.last_name

        self.lims.artifacts.batch_update(self.step.details.outputs)

    def next_steps(self):
        self.step.actions.all_next_step()
        self.step.actions.commit()


class ExampleWorkflowTest(WorkflowTest):
    PROJECT_OWNER_USER_NAME = "admin"
    WORKFLOW_NAME = "Testing Workflow"

    def get_or_create_project(self, project_name):
        # type: (str) -> Project
        projects = self.lims.projects.query(name=project_name)
        if len(projects) > 0:
            log.info("Using existing project %s" % project_name)
            return projects[0]

        project = self.lims.projects.new(name=project_name)
        project.researcher = self.lims.researchers.query(username=self.PROJECT_OWNER_USER_NAME)[0]

        # Submit the project back to Clarity
        self.lims.projects.add(project)
        log.info("Created project %s" % project_name)
        return project

    def get_workflow(self):
        # type: () -> Workflow
        workflow = self.lims.workflows.get_by_name(self.WORKFLOW_NAME)
        if workflow is None:
            raise Exception("Workflow '%s' does not exist in Clarity.")

        return workflow

    def create_tube(self):
        # type: () -> Container
        tube_type = self.lims.container_types.query(name="Tube")[0]
        container = self.lims.containers.new(container_type=tube_type)

        return self.lims.containers.add(container)

    def create_sample(self, name):
        # type: (str) -> Sample
        project = self.get_or_create_project("Today's Project")
        sample = self.lims.samples.new(name=name, project=project)

        # Set the sample container
        container = self.create_tube()
        sample.set_location_coords(container, 1, 1)

        return self.lims.samples.add(sample)

    def run(self, *args):
        log.info("Accessioning samples")
        samples = [
            self.create_sample('Jane'),
            self.create_sample('Bob')
        ]

        log.info("Routing sample to beginning of workflow '%s'", self.WORKFLOW_NAME)
        workflow = self.get_workflow()
        workflow.enqueue([s.artifact for s in samples])

        log.info("Running sample through workflow '%s'", self.WORKFLOW_NAME)
        input_uris = [s.artifact.uri for s in samples]

        qc_step_runner = QCStepRunner(self.lims)
        qc_step_runner.run(inputuris=input_uris)

        pooling_step_runner = PoolingStepRunner(self.lims)
        pooling_step_runner.run(previousstep=qc_step_runner.step)

        StandardStepRunner(self.lims).run(previousstep=pooling_step_runner.step)

        log.info("Sample successfully pushed through workflow '%s'", self.WORKFLOW_NAME)


if __name__ == "__main__":
    ExampleWorkflowTest.main()