HOWTO - Tools

This document provides a tutorial for creating a tool that can be used within a pipeline in the MuG VRE. All functions should be wrapped up as tools; this allows them to be easily reused by other pipelines and deployed onto the compute cluster.

The Tool is the core element when it comes to running a program or function within the COMPSs environment. It defines the procedures that need to happen to prepare the data, along with the function that is parallelised to run over the chunks of data provided. A function can be either a piece of code written in Python or an external package that is run with given chunks of data or defined parameters. The results are then returned to the calling function for merging.

All Tools contain at least a run() method, which is called by the pipeline. The run function takes the input files (dict), the matching metadata (dict) and the defined output files (dict). It returns a tuple containing a dict of the output files as the first element and a dict of matching Metadata objects as the second.
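As a rough, hedged sketch of this contract (the class name SkeletonTool and the empty body are placeholders, not part of the framework), a minimal Tool has the following shape; complete working examples are given in the sections below.

from basic_modules.tool import Tool

class SkeletonTool(Tool):  # hypothetical name used for illustration only

    def run(self, input_files, input_metadata, output_files):
        # Prepare the data and hand it to the parallelised @task functions,
        # then wait on the results and build the matching Metadata objects
        output_metadata = {}
        return (output_files, output_metadata)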

Repository Structure

All tools should be placed within the tools directory within the package.

Basic Tool

This is a test tool that takes an input file, counts the number of characters in that file and prints the result to a second file. The matching code can be found in the GitHub repository mg-process-test, in the file testTool.py.

"""
.. License and copyright agreement statement
"""
from __future__ import print_function

from utils import logger

try:
    from pycompss.api.parameter import FILE_IN, FILE_OUT
    from pycompss.api.task import task
    from pycompss.api.api import compss_wait_on
except ImportError:
    logger.warn("[Warning] Cannot import \"pycompss\" API packages.")
    logger.warn("          Using mock decorators.")

    from utils.dummy_pycompss import FILE_IN, FILE_OUT  # pylint: disable=ungrouped-imports
    from utils.dummy_pycompss import task  # pylint: disable=ungrouped-imports
    from utils.dummy_pycompss import compss_wait_on  # pylint: disable=ungrouped-imports

from basic_modules.tool import Tool
from basic_modules.metadata import Metadata

# ------------------------------------------------------------------------------

class testTool(Tool):
    """
    Tool for counting the characters in a file and writing the result to a second file
    """

    def __init__(self, configuration=None):
        """
        Init function
        """
        print("Test writer")
        Tool.__init__(self)

        if configuration is None:
            configuration = {}

        self.configuration.update(configuration)

    @task(returns=bool, file_in_loc=FILE_IN, file_out_loc=FILE_OUT, isModifier=False)
    def test_writer(self, file_in_loc, file_out_loc):
        """
        Count the number of characters in a file and return a file with the count

        Parameters
        ----------
        file_in_loc : str
            Location of the input file
        file_out_loc : str
            Location of an output file

        Returns
        -------
        bool
            Writes to the file, which is returned by pyCOMPSs to the defined location
        """
        try:
            with open(file_in_loc, "r") as file_in:
                char_count = len(file_in.read())
            with open(file_out_loc, "w") as file_out:
                file_out.write(str(char_count))
        except IOError as error:
            logger.fatal("I/O error({0}): {1}".format(error.errno, error.strerror))
            return False

        return True

    def run(self, input_files, input_metadata, output_files):
        """
        The main function to run the test_writer tool

        Parameters
        ----------
        input_files : dict
            Locations of the input files - in this case the file whose characters are to be counted
        input_metadata: dict
            Matching metadata for each of the files, plus any additional data
        output_files : dict
            List of the output files that are to be generated

        Returns
        -------
        output_files : dict
            List of files with a single entry.
        output_metadata : dict
            List of matching metadata for the returned files
        """

        results = self.test_writer(
            input_files["input_file_location"],
            output_files["output_file_location"]
        )

        results = compss_wait_on(results)

        if results is False:
            logger.fatal("Test Writer: run failed")
            return {}, {}

        output_metadata = {
            "test": Metadata(
                data_type="<data_type>",
                file_type="txt",
                file_path=output_files["test"],
                sources=[input_metadata["input_file_location"].file_path],
                taxon_id=input_metadata["input_file_location"].taxon_id,
                meta_data={
                    "tool": "testTool"
                }
            )
        }

        return (output_files, output_metadata)

This is the simplest case of a Tool that will run a function within the COMPSs environment. The run function takes the input files, the defined output files (which can be used as the output locations) and any relevant metadata. The locations of the output files can also be defined within the run function itself, as some functions generate a large number of files that are not easy to define up front when the Tool is run as part of the VRE or as part of a larger pipeline.

The run function then calls the test_writer function. This uses the Python decorator syntax to mark it as a function that can be run in parallel by the pyCOMPSs library. The task decorator is used to define the files and parameters that need to be passed to the function, as well as the files that are to be returned. The most common parameter types are FILE_IN, FILE_OUT and FILE_INOUT.
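As a hedged illustration of these decorator parameters (the method name append_summary and its arguments are hypothetical and not part of the Tool above; FILE_INOUT would need to be imported from pycompss.api.parameter, or from the dummy fallback, in the same way as FILE_IN and FILE_OUT), a task that reads one file and updates another in place could be declared like this:

    @task(returns=bool, file_in_loc=FILE_IN, file_inout_loc=FILE_INOUT, isModifier=False)
    def append_summary(self, file_in_loc, file_inout_loc):
        """
        Hypothetical task - append the character count of one file to another
        """
        with open(file_in_loc, "r") as file_in:
            char_count = len(file_in.read())
        with open(file_inout_loc, "a") as file_inout:
            file_inout.write("characters: {0}\n".format(char_count))
        return True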

The __init__ function is important as it loads the configuration parameters into the class from the VRE. In this case no parameters are used, but these can be any parameters required by the tool that the code wraps.
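For example, a configuration value supplied by the VRE can be read back out of self.configuration within run; the parameter name word_limit below is purely hypothetical:

        # Hypothetical parameter - falls back to a default value if the VRE
        # configuration did not supply one
        word_limit = self.configuration.get("word_limit", 100)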

Decorators can also be used to define the resources required by a function. They can define the set of machines the task should run on, the CPU capacity required or the amount of RAM needed by the task. Defining these parameters helps the COMPSs infrastructure allocate jobs correctly, so that they run as soon as the resources allow and are not sent to a machine that lacks the required resources.
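For instance, a hedged sketch using the pyCOMPSs constraint decorator is shown below; the values are placeholders to be replaced with the wrapped program's real requirements, and in practice the import would sit alongside the other pycompss imports behind the same try/except fallback.

    # Requires: from pycompss.api.constraint import constraint
    @constraint(computing_units="4", memory_size="8")  # 4 cores and 8 GB of RAM (placeholder values)
    @task(returns=bool, file_in_loc=FILE_IN, file_out_loc=FILE_OUT, isModifier=False)
    def test_writer(self, file_in_loc, file_out_loc):
        # function body as before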

Further details about COMPSs and pyCOMPSs can be found on the BSC website, along with specific tutorials about how to write functions that can utilise the full power of COMPSs.

pyCOMPSs within the Tool

When importing the pyCOMPSs modules it is important to provide access to the dummy_pycompss decorators as well. This will allow scripts to be run on computers where COMPSs has not been installed.

Practical Example

Now that the basics are in place, they can be applied to writing a tool that performs a real operation within the cluster.

Here is a tool that uses BWA to index a genome sequence file that has been saved in FASTA format.

The run function takes the input FASTA file and from this generates a list of the locations of the output files. The input file and output files are passed to the bwa_indexer function. The index files do not need to be listed in the return statement, so returning True is sufficient; COMPSs handles passing the files back to the run function. The run function then returns the output files to the pipeline or the VRE.

from __future__ import print_function

import os
import shlex
import shutil
import subprocess
import sys
import tarfile

from utils import logger

try:
    if hasattr(sys, '_run_from_cmdl') is True:
        raise ImportError
    from pycompss.api.parameter import FILE_IN, FILE_OUT
    from pycompss.api.task import task
    from pycompss.api.api import compss_wait_on
except ImportError:
    logger.warn("[Warning] Cannot import \"pycompss\" API packages.")
    logger.warn("          Using mock decorators.")

    from utils.dummy_pycompss import FILE_IN, FILE_OUT  # pylint: disable=ungrouped-imports
    from utils.dummy_pycompss import task  # pylint: disable=ungrouped-imports
    from utils.dummy_pycompss import compss_wait_on  # pylint: disable=ungrouped-imports

from basic_modules.tool import Tool
from basic_modules.metadata import Metadata

# ------------------------------------------------------------------------------

class bwaIndexerTool(Tool):
    """
    Tool for running indexers over a genome FASTA file
    """

    def __init__(self, configuration=None):
        """
        Init function
        """
        print("BWA Indexer")
        Tool.__init__(self)

        if configuration is None:
            configuration = {}

        self.configuration.update(configuration)

    def bwa_index_genome(self, genome_file):
        """
        Create an index of the genome FASTA file with BWA. These are saved
        alongside the assembly file. If the index has already been generated
        then the locations of the files are returned

        Parameters
        ----------
        genome_file : str
            Location of the assembly file in the file system

        Returns
        -------
        amb_file : str
            Location of the amb file
        ann_file : str
            Location of the ann file
        bwt_file : str
            Location of the bwt file
        pac_file : str
            Location of the pac file
        sa_file : str
            Location of the sa file

        """
        command_line = 'bwa index ' + genome_file

        amb_name = genome_file + '.amb'
        ann_name = genome_file + '.ann'
        bwt_name = genome_file + '.bwt'
        pac_name = genome_file + '.pac'
        sa_name = genome_file + '.sa'

        if os.path.isfile(bwt_name) is False:
            args = shlex.split(command_line)
            process = subprocess.Popen(args)
            process.wait()

        return (amb_name, ann_name, bwt_name, pac_name, sa_name)

    @task(file_loc=FILE_IN, idx_out=FILE_OUT)
    def bwa_indexer(self, file_loc, idx_out): # pylint: disable=unused-argument
        """
        BWA Indexer

        Parameters
        ----------
        file_loc : str
            Location of the genome assembly FASTA file
        idx_out : str
            Location of the output index file

        Returns
        -------
        bool
        """
        amb_loc, ann_loc, bwt_loc, pac_loc, sa_loc = self.bwa_index_genome(file_loc)

        # tar.gz the index
        print("BS - idx_out", idx_out, idx_out.replace('.tar.gz', ''))
        idx_out_pregz = idx_out.replace('.tar.gz', '.tar')

        index_dir = idx_out.replace('.tar.gz', '')
        os.mkdir(index_dir)

        idx_split = index_dir.split("/")

        shutil.move(amb_loc, index_dir)
        shutil.move(ann_loc, index_dir)
        shutil.move(bwt_loc, index_dir)
        shutil.move(pac_loc, index_dir)
        shutil.move(sa_loc, index_dir)

        index_folder = idx_split[-1]

        tar = tarfile.open(idx_out_pregz, "w")
        tar.add(index_dir, arcname=index_folder)
        tar.close()

        command_line = 'pigz ' + idx_out_pregz
        args = shlex.split(command_line)
        process = subprocess.Popen(args)
        process.wait()

        return True

    def run(self, input_files, metadata, output_files):
        """
        Function to run the BWA over a genome assembly FASTA file to generate
        the matching index for use with the aligner

        Parameters
        ----------
        input_files : dict
            List containing the location of the genome assembly FASTA file
        metadata : dict
            Matching metadata for the input FASTA file
        output_files : dict
            List of output files generated

        Returns
        -------
        output_files : dict
            index : str
                Location of the index file defined in the input parameters
        output_metadata : dict
            index : Metadata
                Metadata relating to the index file
        """
        results = self.bwa_indexer(
            input_files["genome"],
            output_files["index"]
        )
        results = compss_wait_on(results)

        if results is False:
            logger.fatal("BWA Indexer: run failed")
            return {}, {}

        output_metadata = {
            "index": Metadata(
                data_type="sequence_mapping_index_bwa",
                file_type="TAR",
                file_path=output_files["index"],
                sources=[metadata["genome"].file_path],
                taxon_id=metadata["genome"].taxon_id,
                meta_data={
                    "assembly": metadata["genome"].meta_data["assembly"],
                    "tool": "bwa_indexer"
                }
            )
        }

        return (output_files, output_metadata)

# ------------------------------------------------------------------------------

Troubleshooting Common Issues

Program is installed but fails to run

There are several points that need to be checked in this instance:

  1. Is the program available on your $PATH? - If not, either add its directory to $PATH or place a symlink to the program in a directory that is already on the path.

  2. Check that the command you are running matches the command run by subprocess - Use logger.info() to print the command and check that it works when run by hand (see the sketch at the end of this section).

  3. Subprocess runs commands in a sandbox - The normal way to call subprocess is to use subprocess.Popen(args) and pass it a list of arguments representing the command to be run (as shown in the practical example above). Sometimes this fails because the program requires extra environment settings; in that case it is possible to run the whole command as a single string and tell subprocess to use a shell:

    command_line = "python --version"
    process = subprocess.Popen(command_line, shell=True)
    process.wait()
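For points 1 and 2, a hedged sketch of the checks that can be added before the subprocess call is shown below; the program name bwa and the file path are only illustrative, and find_executable comes from the Python standard library so it works whether or not COMPSs is installed.

from distutils.spawn import find_executable

from utils import logger

genome_file = "genome.fasta"  # placeholder path used for illustration
command_line = "bwa index " + genome_file

# Point 1 - confirm that the executable can be found on $PATH
if find_executable("bwa") is None:
    logger.fatal("bwa is not available on $PATH")

# Point 2 - log the exact command so it can be checked and re-run by hand
logger.info("RUNNING: " + command_line)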