User Guide¶
Here you will find some examples of how you might put the S4-Clarity library to use.
EPP Scripts¶
Example StepEPPs¶
Generate CSV¶
Generate a simple CSV file and attach it to a step.
This EPP extends s4.clarity.scripts.StepEPP
and is meant to be initiated from a Record Details button.
Record Details Button
bash -c "/opt/gls/clarity/customextensions/env/bin/python
/opt/gls/clarity/customextensions/examples/generate_csv.py -u {username} -p {password}
-l {compoundOutputFileLuid0} --step-uri {stepURI} --fileId {compoundOutputFileLuid1}
--fileName 'example.csv' --artifactUDFName Concentration"
# Copyright 2019 Semaphore Solutions, Inc.
# ---------------------------------------------------------------------------
import logging
import argparse
from csv import DictWriter
from s4.clarity.scripts import StepEPP
# Column headers for the generated CSV file.
HEADER_INPUT_NAME = "Input Name"
HEADER_INPUT_ID = "Input Id"
HEADER_OUTPUT_NAME = "Output Name"
HEADER_OUTPUT_ID = "Output Id"
# Module-level logger for this EPP script.
log = logging.getLogger(__name__)
class GenerateCSV(StepEPP):
    """
    Generate a simple CSV file summarizing the step's input/output mapping,
    including one configurable artifact UDF column, and attach it to the step.
    """

    @classmethod
    def add_arguments(cls, argparser):
        # type: (argparse.ArgumentParser) -> None
        """Register the command line arguments this EPP requires."""
        super(GenerateCSV, cls).add_arguments(argparser)
        argparser.add_argument("--fileId",
                               help="{compoundOutputFileLuids} token",
                               required=True)
        argparser.add_argument("--fileName",
                               type=str,
                               help="File name with extension",
                               default="epp_generated.csv")
        argparser.add_argument("--artifactUDFName",
                               type=str,
                               help="Name of the UDF to include in file",
                               required=True)

    def run(self):
        """
        Write one CSV row per (input, output) artifact pair on the step, then
        commit the file so it is attached to the step in Clarity.
        """
        csv_file = self.step.open_resultfile(self.options.fileName, 'w', limsid=self.options.fileId)

        csv_writer = DictWriter(csv_file, fieldnames=[
            HEADER_INPUT_NAME,
            HEADER_INPUT_ID,
            HEADER_OUTPUT_NAME,
            HEADER_OUTPUT_ID,
            self.options.artifactUDFName
        ])
        csv_writer.writeheader()

        for input_analyte, output_analytes in self.step.details.input_keyed_lookup.items():
            for output_analyte in output_analytes:
                # Fall back to "Empty" when the UDF has not been set on the output.
                udf_value = output_analyte.get(self.options.artifactUDFName, "Empty")
                row = {
                    HEADER_INPUT_NAME: input_analyte.name,
                    HEADER_INPUT_ID: input_analyte.limsid,
                    HEADER_OUTPUT_NAME: output_analyte.name,
                    HEADER_OUTPUT_ID: output_analyte.limsid,
                    self.options.artifactUDFName: udf_value
                }
                # Lazy %-args: the row is only formatted when INFO is enabled.
                log.info("Writing the following line: %s", row)
                csv_writer.writerow(row)

        # Push the file content back to Clarity.
        csv_file.commit()
# Script entry point: parse command line arguments and run the EPP.
if __name__ == "__main__":
    GenerateCSV.main()
Example TriggeredStepEPPs¶
QC Set Next Step¶
Sets the next step action for each output analyte.
This EPP extends s4.clarity.scripts.TriggeredStepEPP
and is meant to be triggered on the transition into
the Next Steps screen, and again on the End of Step transition.
Record Details Exit transition:
bash -c "/opt/gls/clarity/customextensions/env/bin/python
/opt/gls/clarity/customextensions/examples/qc_set_next_step.py -u {username} -p {password}
-l {compoundOutputFileLuid0} --step-uri {stepURI} --action RecordDetailsExit"
End of Step transition:
bash -c "/opt/gls/clarity/customextensions/env/bin/python
/opt/gls/clarity/customextensions/examples/qc_set_next_step.py -u {username} -p {password}
-l {compoundOutputFileLuid0} --step-uri {stepURI} --action EndOfStep"
# Copyright 2019 Semaphore Solutions, Inc.
# ---------------------------------------------------------------------------
import logging
from s4.clarity.artifact import Artifact
from s4.clarity.scripts import TriggeredStepEPP
log = logging.getLogger(__name__)
class QCSetNextStep(TriggeredStepEPP):
    """
    Choose the next step action for each analyte based on its QC results:
    analytes with any failed QC measurement are flagged to repeat the step.
    """

    def should_repeat_step(self, input_analyte):
        # type: (Artifact) -> bool
        """Return True if any output measurement of this input failed QC."""
        # QC flag is set on output result file artifacts
        output_measurements = self.step.details.input_keyed_lookup[input_analyte]

        # If QC failed for any replicate of the input it should repeat
        return any(output.qc_failed() for output in output_measurements)

    def on_record_details_exit(self):
        """
        Set the next step actions for the user to inspect.
        """
        for analyte, action in self.step.actions.artifact_actions.items():
            if self.should_repeat_step(analyte):
                # Lazy %-args: only formatted when INFO logging is enabled.
                log.info("Setting Analyte '%s' (%s) to repeat step.", analyte.name, analyte.limsid)
                action.repeat()
            else:
                action.next_step()
        self.step.actions.commit()

    def on_end_of_step(self):
        """
        Ensure analytes repeat the step but do not overwrite other user selections.
        """
        # As this is a QC step it is the inputs that are moving to the next step.
        for input_analyte, action in self.step.actions.artifact_actions.items():
            if self.should_repeat_step(input_analyte):
                log.info("Setting Analyte '%s' (%s) to repeat step.", input_analyte.name, input_analyte.limsid)
                action.repeat()
        self.step.actions.commit()
# Script entry point: parse command line arguments and run the EPP.
if __name__ == "__main__":
    QCSetNextStep.main()
Create Pools of Two¶
Groups the step’s input analytes into pools of two.
This EPP extends s4.clarity.scripts.TriggeredStepEPP
and is meant to be triggered on the transition into Pooling.
Pooling Enter transition:
bash -c "/opt/gls/clarity/customextensions/env/bin/python
/opt/gls/clarity/customextensions/examples/create_pools_of_two.py -u {username} -p {password}
-l {compoundOutputFileLuid0} --step-uri {stepURI} --action PoolingEnter"
# Copyright 2019 Semaphore Solutions, Inc.
# ---------------------------------------------------------------------------
from s4.clarity.scripts import TriggeredStepEPP
from s4.clarity.scripts import UserMessageException
class CreatePoolsOfTwo(TriggeredStepEPP):
    """
    Group the step's input analytes into pools of two when the Pooling
    screen is entered.
    """

    def validate_even_number_of_inputs(self):
        """Raise a user-visible error unless the input count is even."""
        if len(self.step.pooling.available_inputs) % 2 != 0:
            raise UserMessageException("Step must be started with an even number of analytes.")

    def validate_single_input_replicate_created(self):
        """Raise a user-visible error if any input has more than one replicate."""
        if any(entry.replicates > 1 for entry in self.step.pooling.available_inputs):
            raise UserMessageException("No more than one replicate per analyte allowed in this step")

    def on_pooling_enter(self):
        """Validate the inputs, then create one pool per consecutive pair."""
        self.validate_single_input_replicate_created()
        self.validate_even_number_of_inputs()

        analytes = [entry.input for entry in self.step.pooling.available_inputs]

        # Pair consecutive analytes: (0, 1), (2, 3), ...
        for first, second in zip(analytes[::2], analytes[1::2]):
            pool_name = "%s_%s" % (first.limsid, second.limsid)
            self.step.pooling.create_pool(pool_name, [first, second])

        self.step.pooling.commit()
# Script entry point: parse command line arguments and run the EPP.
if __name__ == "__main__":
    CreatePoolsOfTwo.main()
Example DerivedSampleAutomations¶
Set UDF Value¶
Assigns a user provided value to the analyte UDF specified for all selected analytes.
This EPP extends s4.clarity.scripts.DerivedSampleAutomation
and is meant to be triggered from the projects dashboard.
Automation Configuration
bash -c "/opt/gls/clarity/customextensions/env/bin/python
/opt/gls/clarity/customextensions/examples/set_udf_value.py -u {username} -p {password}
-a '{baseURI}v2' -d {derivedSampleLuids} --udfName '{userinput:UDF_Name}' --udfValue '{userinput:UDF_Value}'"
# Copyright 2019 Semaphore Solutions, Inc.
# ---------------------------------------------------------------------------
import argparse
from s4.clarity.scripts import DerivedSampleAutomation
class SetUDFValue(DerivedSampleAutomation):
    """
    Assign a user-provided value to the named analyte UDF on all of the
    selected derived samples.
    """

    @classmethod
    def add_arguments(cls, argparser):
        # type: (argparse.ArgumentParser) -> None
        """Register the command line arguments this automation requires."""
        super(SetUDFValue, cls).add_arguments(argparser)
        argparser.add_argument("--udfName",
                               type=str,
                               help="The name of the analyte UDF",
                               required=True)
        argparser.add_argument("--udfValue",
                               type=str,
                               help="The new value for the UDF",
                               required=True)

    def process_derived_samples(self):
        """
        Set the UDF on every selected artifact and save them in one batch.

        :return: Success message shown to the user in Clarity.
        """
        for artifact in self.artifacts:
            artifact[self.options.udfName] = self.options.udfValue

        # One batch update round-trip instead of one request per artifact.
        self.lims.artifacts.batch_update(self.artifacts)

        return "Successfully set UDF '%s' to '%s' for %s derived samples" % \
               (self.options.udfName, self.options.udfValue, len(self.artifacts))
# Script entry point: parse command line arguments and run the automation.
if __name__ == "__main__":
    SetUDFValue.main()
Shell Scripts¶
Accession Clarity Sample¶
Accessions a new sample into Clarity using the provided container name and project.
This script extends s4.clarity.scripts.ShellScript
and is meant to be executed from the command line.
Example Invocation
python ./accession_clarity_sample.py -u <user name> -p <password> -r https://<clarity_server>/api/v2
--sampleName <Sample Name> --projectName <Project Name> --containerName <Container Name>
# Copyright 2019 Semaphore Solutions, Inc.
# ---------------------------------------------------------------------------
import logging
import argparse
from s4.clarity.scripts import ShellScript
log = logging.getLogger(__name__)
class AccessionClaritySample(ShellScript):
    """
    Accession a new sample into Clarity, placing it in a new Tube container
    within an existing project.
    """

    @classmethod
    def add_arguments(cls, argparser):
        # type: (argparse.ArgumentParser) -> None
        """Register the command line arguments this script requires."""
        super(AccessionClaritySample, cls).add_arguments(argparser)
        argparser.add_argument("--sampleName",
                               type=str,
                               help="The name of the sample to create",
                               required=True)
        argparser.add_argument("--projectName",
                               type=str,
                               help="The name of an existing Clarity project",
                               required=True)
        argparser.add_argument("--containerName",
                               type=str,
                               help="The name of the sample container",
                               required=True)

    def run(self, *args):
        """
        Create the container and sample records in Clarity.

        :raises Exception: If the named project does not exist in Clarity.
        """
        projects = self.lims.projects.query(name=self.options.projectName)
        if not projects:
            raise Exception("Project '%s' does not exist in Clarity" % self.options.projectName)
        project = projects[0]

        tube_type = self.lims.container_types.get_by_name("Tube")
        container = self.lims.containers.new(container_type=tube_type, name=self.options.containerName)
        container = self.lims.containers.add(container)

        sample = self.lims.samples.new(name=self.options.sampleName, project=project)
        # Place the sample at container coordinates (1, 1).
        sample.set_location_coords(container, 1, 1)
        self.lims.samples.add(sample)

        # Lazy %-args: only formatted when INFO logging is enabled.
        log.info("Sample '%s' successfully accessioned in Clarity", self.options.sampleName)
# Script entry point: parse command line arguments and run the script.
if __name__ == "__main__":
    AccessionClaritySample.main()
Workflow Testing¶
Workflow Run¶
Runs two samples through a three-step protocol.
Example Invocation
python ./workflow_run.py -u <user name> -p <password> -r https://<clarity_server>/api/v2
# Copyright 2019 Semaphore Solutions, Inc.
#
# Assumes a single protocol workflow that consists of three steps:
# - QC
# - Pooling
# - Standard
# ---------------------------------------------------------------------------
import logging
from s4.clarity.configuration import Workflow
from s4.clarity.container import Container
from s4.clarity.project import Project
from s4.clarity.sample import Sample
from s4.clarity.scripts import WorkflowTest
from s4.clarity.steputils.placement_utils import auto_place_artifacts
from s4.clarity.steputils.step_runner import StepRunner
# Module-level logger for this workflow test script.
log = logging.getLogger(__name__)
# Name of the protocol whose steps the runners below execute.
NAME_PROTOCOL = "Testing Protocol"
class QCStepRunner(StepRunner):
    """Drive the "QC Step" of the testing protocol, passing QC on all outputs."""

    def __init__(self, lims):
        super(QCStepRunner, self).__init__(lims, NAME_PROTOCOL, "QC Step")

    def record_details(self):
        """Mark every output measurement as having passed QC, then save."""
        for measurement in self.step.details.outputs:
            measurement.qc = True
        self.lims.artifacts.batch_update(self.step.details.outputs)

    def next_steps(self):
        """Advance all artifacts to the next step and save the actions."""
        actions = self.step.actions
        actions.all_next_step()
        actions.commit()
class PoolingStepRunner(StepRunner):
    """Drive the "Pooling Step" of the testing protocol."""

    def __init__(self, lims):
        super(PoolingStepRunner, self).__init__(lims, NAME_PROTOCOL, "Pooling Step")

    def pooling(self):
        """Combine every input into a single pool named "The Pool"."""
        pooling_screen = self.step.pooling
        pooling_screen.create_pool("The Pool", self.step.details.inputs)
        pooling_screen.commit()

    def record_details(self):
        """Record completion status in the "Status" step UDF."""
        details = self.step.details
        details['Status'] = "Pooling Complete"
        details.commit()

    def next_steps(self):
        """Advance all artifacts to the next step and save the actions."""
        actions = self.step.actions
        actions.all_next_step()
        actions.commit()
class StandardStepRunner(StepRunner):
    """Drive the "Standard Step" of the testing protocol."""

    def __init__(self, lims):
        super(StandardStepRunner, self).__init__(lims, NAME_PROTOCOL, "Standard Step")

    def placement(self):
        """Automatically place all output artifacts."""
        auto_place_artifacts(self.step, self.step.details.outputs)

    def record_details(self):
        """Add required reagents and stamp each output with the technician's name."""
        # Add all required reagents
        self.add_default_reagents()

        # Set the value of an analyte UDF on all of the outputs.
        inspector = self.step.process.technician.last_name
        for artifact in self.step.details.outputs:
            artifact["Inspected By"] = inspector
        self.lims.artifacts.batch_update(self.step.details.outputs)

    def next_steps(self):
        """Advance all artifacts to the next step and save the actions."""
        actions = self.step.actions
        actions.all_next_step()
        actions.commit()
class ExampleWorkflowTest(WorkflowTest):
    """
    Run two samples through the testing workflow: accession the samples,
    enqueue them at the head of the workflow, then execute each of the
    three steps in order (QC, Pooling, Standard).
    """

    PROJECT_OWNER_USER_NAME = "admin"
    WORKFLOW_NAME = "Testing Workflow"

    def get_or_create_project(self, project_name):
        # type: (str) -> Project
        """Return the named project, creating it in Clarity if necessary."""
        projects = self.lims.projects.query(name=project_name)
        if projects:
            log.info("Using existing project %s", project_name)
            return projects[0]

        project = self.lims.projects.new(name=project_name)
        project.researcher = self.lims.researchers.query(username=self.PROJECT_OWNER_USER_NAME)[0]

        # Submit the project back to Clarity
        self.lims.projects.add(project)

        log.info("Created project %s", project_name)
        return project

    def get_workflow(self):
        # type: () -> Workflow
        """
        Return the configured workflow.

        :raises Exception: If the workflow does not exist in Clarity.
        """
        workflow = self.lims.workflows.get_by_name(self.WORKFLOW_NAME)
        if workflow is None:
            # Supply the format argument (originally missing, leaving a raw '%s').
            raise Exception("Workflow '%s' does not exist in Clarity." % self.WORKFLOW_NAME)
        return workflow

    def create_tube(self):
        # type: () -> Container
        """Create and persist a new Tube container."""
        tube_type = self.lims.container_types.query(name="Tube")[0]
        container = self.lims.containers.new(container_type=tube_type)
        return self.lims.containers.add(container)

    def create_sample(self, name):
        # type: (str) -> Sample
        """Accession a new sample in a fresh tube under "Today's Project"."""
        project = self.get_or_create_project("Today's Project")
        sample = self.lims.samples.new(name=name, project=project)

        # Set the sample container
        container = self.create_tube()
        sample.set_location_coords(container, 1, 1)

        return self.lims.samples.add(sample)

    def run(self, *args):
        """Accession the samples, enqueue them, and run all three steps."""
        log.info("Accessioning samples")
        samples = [
            self.create_sample('Jane'),
            self.create_sample('Bob')
        ]

        log.info("Routing sample to beginning of workflow '%s'", self.WORKFLOW_NAME)
        workflow = self.get_workflow()
        workflow.enqueue([s.artifact for s in samples])

        log.info("Running sample through workflow '%s'", self.WORKFLOW_NAME)
        input_uris = [s.artifact.uri for s in samples]

        qc_step_runner = QCStepRunner(self.lims)
        qc_step_runner.run(inputuris=input_uris)

        pooling_step_runner = PoolingStepRunner(self.lims)
        pooling_step_runner.run(previousstep=qc_step_runner.step)

        StandardStepRunner(self.lims).run(previousstep=pooling_step_runner.step)

        log.info("Sample successfully pushed through workflow '%s'", self.WORKFLOW_NAME)
# Script entry point: parse command line arguments and run the workflow test.
if __name__ == "__main__":
    ExampleWorkflowTest.main()