Source code for tool.biobambam_filter

"""
.. See the NOTICE file distributed with this work for additional information
   regarding copyright ownership.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
"""

from __future__ import print_function

import os
import subprocess
import sys

from utils import logger

try:
    if hasattr(sys, '_run_from_cmdl') is True:
        raise ImportError
    from pycompss.api.parameter import FILE_IN, FILE_OUT
    from pycompss.api.task import task
    from pycompss.api.constraint import constraint
    # from pycompss.api.api import compss_wait_on
except ImportError:
    logger.warn("[Warning] Cannot import \"pycompss\" API packages.")
    logger.warn("          Using mock decorators.")

    from utils.dummy_pycompss import FILE_IN, FILE_OUT  # pylint: disable=ungrouped-imports
    from utils.dummy_pycompss import task, constraint  # pylint: disable=ungrouped-imports
    # from utils.dummy_pycompss import compss_wait_on # pylint: disable=ungrouped-imports

from basic_modules.metadata import Metadata
from basic_modules.tool import Tool
from tool.bam_utils import bamUtils
from tool.bam_utils import bamUtilsTask
from tool.common import common

# ------------------------------------------------------------------------------


[docs]class biobambam(Tool): # pylint: disable=invalid-name """ Tool to sort and filter bam files """ def __init__(self, configuration=None): """ Initialise the tool with its configuration. Parameters ---------- configuration : dict a dictionary containing parameters that define how the operation should be carried out, which are specific to each Tool. """ logger.info("BioBamBam2 Filter") Tool.__init__(self) if configuration is None: configuration = {} self.configuration.update(configuration)
[docs] @constraint(ComputingUnits="4") @task(returns=bool, bam_file_in=FILE_IN, bam_file_out=FILE_OUT, isModifier=False) def biobambam_filter_alignments(self, bam_file_in, bam_file_out): # pylint: disable=no-self-use """ Sorts and filters the bam file. It is important that all duplicate alignments have been removed. This can be run as an intermediate step, but should always be run as a check to ensure that the files are sorted and duplicates have been removed. Parameters ---------- bam_file_in : str Location of the input bam file bam_file_out : str Location of the output bam file tmp_dir : str Tmp location for intermediate files during the sorting Returns ------- bam_file_out : str Location of the output bam file """ logger.info("BIOBAMBAM: bam_file_in: " + bam_file_in) logger.info("BIOBAMBAM: bam_file_out: " + bam_file_out) command_line = 'bamsormadup --threads=4 --tmpfile=' + os.path.split(bam_file_in)[0] bam_tmp_marked_out = bam_file_in + '.marked.tmp.bam' bam_tmp_filtered_out = bam_file_in + '.filtered.tmp.bam' logger.info("BIOBAMBAM: command_line: " + command_line) logger.progress("BIOBAMBAM", task_id=0, total=2) try: with open(bam_file_in, "r") as f_in: with open(bam_tmp_marked_out, "w") as f_out: process = subprocess.Popen(command_line, shell=True, stdin=f_in, stdout=f_out) process.wait() except (IOError, OSError) as msg: logger.fatal("I/O error({0}) - bamsormadup: {1}\n{2}".format( msg.errno, msg.strerror, command_line)) return False logger.progress("BIOBAMBAM", task_id=1, total=2) logger.progress("SAMTOOLS REMOVE DUPLICATES", task_id=1, total=2) bam_handle = bamUtils() bam_handle.bam_filter(bam_tmp_marked_out, bam_tmp_filtered_out, "duplicate") logger.progress("SAMTOOLS REMOVE DUPLICATES", task_id=2, total=2) common_handle = common() return_val = common_handle.to_output_file(bam_tmp_filtered_out, bam_file_out, False) if return_val is False: "I/O error: No file - {}".format(bam_tmp_filtered_out) return return_val
[docs] def run(self, input_files, input_metadata, output_files): """ The main function to run BioBAMBAMfilter to remove duplicates and spurious reads from the FASTQ files before analysis. Parameters ---------- input_files : dict List of input bam file locations where 0 is the bam data file metadata : dict Matching meta data for the input files output_files : dict List of output file locations Returns ------- output_files : dict Filtered bam fie. output_metadata : dict List of matching metadata dict objects """ logger.info("BIOBAMBAM FILTER: Ready to run") self.biobambam_filter_alignments(input_files['input'], output_files['output']) bam_handle = bamUtilsTask() bam_handle.bam_index(output_files["output"], output_files["bai"]) logger.info("BIOBAMBAM FILTER: completed") output_metadata = { "bam": Metadata( data_type=input_metadata["input"].data_type, file_type="BAM", file_path=output_files["output"], sources=[input_metadata["input"].file_path], taxon_id=input_metadata["input"].taxon_id, meta_data={ "assembly": input_metadata["input"].meta_data["assembly"], "tool": "biobambam_filter", "associated_files": [output_files["bai"]] } ), "bai": Metadata( data_type=input_metadata['input'].data_type, file_type="BAI", file_path=output_files["bai"], sources=[input_metadata["input"].file_path], taxon_id=input_metadata["input"].taxon_id, meta_data={ "assembly": input_metadata["input"].meta_data["assembly"], "tool": "bs_seeker_aligner", "associated_master": output_files["output"] } ) } return ( {"bam": output_files['output'], "bai": output_files["bai"]}, output_metadata )
# ------------------------------------------------------------------------------