Source code for tool.bs_seeker_aligner

"""
.. See the NOTICE file distributed with this work for additional information
   regarding copyright ownership.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
"""

from __future__ import print_function

import os
import shlex
import subprocess
import sys
import tarfile
import shutil

from utils import logger

try:
    if hasattr(sys, '_run_from_cmdl') is True:
        raise ImportError
    from pycompss.api.parameter import FILE_IN, FILE_OUT, IN
    from pycompss.api.task import task
    from pycompss.api.api import barrier, compss_wait_on, compss_open, compss_delete_file
except ImportError:
    logger.warn("[Warning] Cannot import \"pycompss\" API packages.")
    logger.warn("          Using mock decorators.")

    from utils.dummy_pycompss import FILE_IN, FILE_OUT, IN  # pylint: disable=ungrouped-imports
    from utils.dummy_pycompss import task  # pylint: disable=ungrouped-imports
    from utils.dummy_pycompss import barrier, compss_wait_on, compss_open, compss_delete_file  # pylint: disable=ungrouped-imports

from basic_modules.tool import Tool
from basic_modules.metadata import Metadata

from tool.fastq_splitter import fastq_splitter
from tool.bam_utils import bamUtils
from tool.bam_utils import bamUtilsTask


# ------------------------------------------------------------------------------

[docs]class bssAlignerTool(Tool): # pylint: disable=invalid-name """ Script from BS-Seeker2 for building the index for alignment. In this case it uses Bowtie2. """ def __init__(self, configuration=None): """ Initialise the tool with its configuration. Parameters ---------- configuration : dict a dictionary containing parameters that define how the operation should be carried out, which are specific to each Tool. """ logger.info("BS-Seeker Aligner") Tool.__init__(self) if configuration is None: configuration = {} self.configuration.update(configuration)
[docs] @task( returns=bool, isModifier=False, input_fastq_1=FILE_IN, input_fastq_2=FILE_IN, aligner=IN, aligner_path=IN, bss_path=IN, aln_params=IN, genome_fasta=FILE_IN, genome_idx=FILE_IN, bam_out=FILE_OUT ) def bs_seeker_aligner( self, input_fastq_1, input_fastq_2, aligner, aligner_path, bss_path, aln_params, genome_fasta, genome_idx, bam_out): """ Alignment of the paired ends to the reference genome Generates bam files for the alignments This is performed by running the external program rather than reimplementing the code from the main function to make it easier when it comes to updating the changes in BS-Seeker2 Parameters ---------- input_fastq1 : str Location of paired end FASTQ file 1 input_fastq2 : str Location of paired end FASTQ file 2 aligner : str Aligner to use aligner_path : str Location of the aligner genome_fasta : str Location of the genome FASTA file genome_idx : str Location of the tar.gz genome index file bam_out : str Location of the aligned bam file Returns ------- bam_out : file Location of the BAM file generated during the alignment. """ index_dir = genome_idx.split("/") script = bss_path + "/bs_seeker2-align.py" params = [ "--input_1", input_fastq_1, "--input_2", input_fastq_2, "--aligner", aligner, "--path", aligner_path, "--genome", genome_fasta, "-d", "/".join(index_dir[:-1]), "--bt2-p", "4", "-o", bam_out + "_tmp.bam", "-f", "bam" ] + aln_params results = self.run_aligner(genome_idx, bam_out, script, params) return results
[docs] @task( returns=bool, isModifier=False, input_fastq=FILE_IN, aligner=IN, aligner_path=IN, bss_path=IN, aln_params=IN, genome_fasta=FILE_IN, genome_idx=FILE_IN, bam_out=FILE_OUT) def bs_seeker_aligner_single( self, input_fastq, aligner, aligner_path, bss_path, aln_params, genome_fasta, genome_idx, bam_out): """ Alignment of the paired ends to the reference genome Generates bam files for the alignments This is performed by running the external program rather than reimplementing the code from the main function to make it easier when it comes to updating the changes in BS-Seeker2 Parameters ---------- input_fastq1 : str Location of paired end FASTQ file 1 input_fastq2 : str Location of paired end FASTQ file 2 aligner : str Aligner to use aligner_path : str Location of the aligner genome_fasta : str Location of the genome FASTA file genome_idx : str Location of the tar.gz genome index file bam_out : str Location of the aligned bam file Returns ------- bam_out : file Location of the BAM file generated during the alignment. """ index_dir = genome_idx.split("/") script = bss_path + "/bs_seeker2-align.py" params = [ "-i", input_fastq, "--aligner", aligner, "--path", aligner_path, "--genome", genome_fasta, "-d", "/".join(index_dir[:-1]), "--bt2-p", "4", "-o", bam_out + "_tmp.bam", "-f", "bam" ] + aln_params results = self.run_aligner(genome_idx, bam_out, script, params) return results
[docs] @staticmethod def get_aln_params(params, paired=False): # pylint: disable=too-many-branches """ Function to handle to extraction of commandline parameters and formatting them for use in the aligner for Bowtie2 Parameters ---------- params : dict paired : bool Indicate if the parameters are paired-end specific. [DEFAULT=False] Returns ------- list """ command_params = [] bss_aligner_command_parameters = { # Reduced Representation Bisufite Sequencing "bss_aligner_rrbs_param": ["--rrbs", False], "bss_aligner_rrbs_cutoff_site_param": ["-c", True], "bss_aligner_rrbs_lower_param": ["-L", True], "bss_aligner_rrbs_upper_param": ["-U", True], # General Options "bss_aligner_tag_param": ["-t", True], "bss_aligner_start_base_param": ["-s", True], "bss_aligner_end_base_param": ["-e", True], # "bss_aligner_adapter_param": ["-a", True], # "bss_aligner_adapter_mismatch_param": ["--am", True], "bss_aligner_no_mismatches_param": ["-m", True], "bss_aligner_split_line_param": ["-l", True], } for param in params: if param in bss_aligner_command_parameters: if bss_aligner_command_parameters[param][1]: command_params = command_params + [ bss_aligner_command_parameters[param][0], params[param]] else: if bss_aligner_command_parameters[param][0]: command_params.append(bss_aligner_command_parameters[param][0]) bowtie2_command_parameters = { # Input Options - 11 "bowtie2_interleaved_param": ["--bt2--interleaved", False], "bowtie2_tab5_param": ["--bt2--tab5", False], "bowtie2_tab6_param": ["--bt2--tab6", False], # "bowtie2_qseq_param": ["--bt2--qseq", False], # "bowtie2_read_only_param": ["--bt2-r", True], "bowtie2_skip_1st_n_reads_param": ["--bt2-s", True], "bowtie2_aln_1st_n_reads_param": ["--bt2-u", True], "bowtie2_trim5_param": ["--bt2-5", True], "bowtie2_trim3_param": ["--bt2-3", True], "bowtie2_phred33_param": ["--bt2--phred33", False], "bowtie2_phre64_param": ["--bt2--phred64", False], # Alignment Options - 12 "bowtie2_num_mismatch_param": ["--bt2-N", True], "bowtie2_seed_len_param": ["--bt2-L", True], # "bowtie2_seed_func_param": ["--bt2-i", True], # "bowtie2_ambg_char_func_param": ["--bt2--n-cell", True], "bowtie2_dpads_param": ["--bt2--dpad", True], "bowtie2_gbar_param": ["--bt2--gbar", True], "bowtie2_ignore_quals_param": ["--bt2--ignore-quals", False], "bowtie2_nofw_param": ["--bt2--nofw", False], "bowtie2_norc_param": ["--bt2--norc", False], "bowtie2_no_1mm_upfront_param": ["--bt2--no-1mm-upfront", False], "bowtie2_end_to_end_param": ["--bt2--end-to-end", False], "bowtie2_local_param": ["--bt2--local", False], # Effort Options - 2 "bowtie2_seed_extension_attempts_param": ["--bt2-D", True], "bowtie2_reseed_param": ["--bt2-R", True], # SAM Options - 9 "bowtie2_no_unal_param": ["--bt2--no-unal", False], "bowtie2_no_hd_param": ["--bt2--no-hd", False], "bowtie2_no_sq_param": ["--bt2--no-dq", False], "bowtie2_rg_id_param": ["--bt2--rg-id", True], "bowtie2_rg_param": ["--bt2--rg", True], "bowtie2_omit_sec_seq_param": ["--bt2--omit-sec-seq", False], "bowtie2_soft_clipped_unmapped_tlen_param": [ "--bt2--soft-clipped-unmapped-tlen", False], "bowtie2_sam_no_qname_trunc_param": ["--bt2--sam-no-qname-trunc", False], "bowtie2_xeq_param": ["--bt2--xeq", False], } if paired: # Paired-end Options - 10 bowtie2_command_parameters["bowtie2_min_frag_len_param"] = ["--bt2-I", True] bowtie2_command_parameters["bowtie2_max_frag_len_param"] = ["--bt2-X", True] bowtie2_command_parameters["bowtie2_frrfff_param"] = ["", True] bowtie2_command_parameters["bowtie2_no_mixed_param"] = ["--bt2--no-mixed", False] bowtie2_command_parameters["bowtie2_no_discordant_param"] = [ "--bt2--no-discordant", False] bowtie2_command_parameters["bowtie2_dovetail_param"] = ["--bt2--dovetail", False] bowtie2_command_parameters["bowtie2_no_contain_param"] = ["--bt2--no-contain", False] bowtie2_command_parameters["bowtie2_no_overlap_param"] = ["--bt2--no-overlap", False] for param in params: if param in bowtie2_command_parameters: if bowtie2_command_parameters[param][1] and params[param] != "": command_params = command_params + [ bowtie2_command_parameters[param][0], params[param]] else: if bowtie2_command_parameters[param][0] and params[param] is not False: command_params.append(bowtie2_command_parameters[param][0]) # Scoring Options - 8 if "bowtie2_ma_param" in params and params["bowtie2_ma_param"] != "": command_params = command_params + [ "--ma", str(params["bowtie2_ma_param"])] if ("bowtie2_mp_mx_param" in params and "bowtie2_mp_mn_param" in params and params["bowtie2_mp_mx_param"] != "" and params["bowtie2_mp_mn_param"] != ""): command_params = command_params + [ "--mp", str(params["bowtie2_mp_mx_param"]) + "," + str(params["bowtie2_mp_mn_param"])] if "bowtie2_np_param" in params and params["bowtie2_np_param"] != "": command_params = command_params + [ "--np", str(params["bowtie2_np_param"])] if ("bowtie2_rdg_o_param" in params and "bowtie2_rdg_e_param" in params and params["bowtie2_rdg_o_param"] != "" and params["bowtie2_rdg_e_param"] != ""): command_params = command_params + [ "--rdg", str(params["bowtie2_rdg_o_param"]) + "," + str(params["bowtie2_rdg_e_param"])] if ("bowtie2_rfg_o_param" in params and "bowtie2_rfg_e_param" in params and params["bowtie2_rfg_o_param"] != "" and params["bowtie2_rfg_e_param"] != ""): command_params = command_params + [ "--rfg", str(params["bowtie2_rfg_o_param"]) + "," + str(params["bowtie2_rfg_e_param"])] return command_params
[docs] def run_aligner(self, genome_idx, bam_out, script, params): # pylint: disable=no-self-use """ Run the aligner Parameters ---------- genome_idx : str Location of the genome index archive bam_out : str Location of the output bam file script : str Location of the BS Seeker2 aligner script params : list Parameter list for the aligner Returns ------- bool True if the function completed successfully """ g_dir = genome_idx.split("/") g_dir = "/".join(g_dir[:-1]) params += ["-d", g_dir] untar_idx = True if "no-untar" in self.configuration and self.configuration["no-untar"] is True: untar_idx = False if untar_idx is True: try: tar = tarfile.open(genome_idx) tar.extractall(path=g_dir) tar.close() except (OSError, IOError): logger.fatal("WGBS - BS SEEKER2: Missing index archive") return False command_line = ( "python " + script + " " + " ".join(params) ).format() logger.info("command for aligner : ", command_line) args = shlex.split(command_line) process = subprocess.Popen(args) process.wait() # Using the bamUtils directly as already insite of a @task bam_handler = bamUtils() bam_handler.bam_sort(bam_out + "_tmp.bam") try: with open(bam_out + "_tmp.bam", "rb") as f_in: with open(bam_out, "wb") as f_out: f_out.write(f_in.read()) os.remove(bam_out + "_tmp.bam") except (OSError, IOError): logger.fatal("WGBS - BS SEEKER2: Failed sorting") return False return True
[docs] def run(self, input_files, input_metadata, output_files): # pylint: disable=too-many-locals,too-many-branches,too-many-statements """ Tool for indexing the genome assembly using BS-Seeker2. In this case it is using Bowtie2 Parameters ---------- input_files : list FASTQ file output_files : list Results files. metadata : list Returns ------- array : list Location of the filtered FASTQ file """ tasks_done = 0 task_count = 6 try: if "bss_path" in self.configuration: bss_path = self.configuration["bss_path"] else: raise KeyError if "aligner_path" in self.configuration: aligner_path = self.configuration["aligner_path"] else: raise KeyError if "aligner" in self.configuration: aligner = self.configuration["aligner"] else: raise KeyError except KeyError: logger.fatal("WGBS - BS SEEKER2: Unassigned configuration variables") genome_fasta = input_files["genome"] genome_idx = input_files["index"] sources = [input_files["genome"]] fqs = fastq_splitter() fastq1 = input_files["fastq1"] sources.append(input_files["fastq1"]) logger.progress("FASTQ Splitter", task_id=tasks_done, total=task_count) fastq_file_gz = os.path.join( self.configuration["execution"], os.path.split(fastq1)[1] + ".tar.gz") if "fastq2" in input_files: fastq2 = input_files["fastq2"] sources.append(input_files["fastq2"]) fastq_file_list = fqs.paired_splitter( fastq1, fastq2, fastq_file_gz ) aln_params = self.get_aln_params(self.configuration, True) else: fastq_file_list = fqs.single_splitter( fastq1, fastq_file_gz ) aln_params = self.get_aln_params(self.configuration) # Required to prevent iterating over the future objects fastq_file_list = compss_wait_on(fastq_file_list) if not fastq_file_list: logger.fatal("FASTQ SPLITTER: run failed") return {}, {} if hasattr(sys, '_run_from_cmdl') is True: pass else: with compss_open(fastq_file_gz, "rb") as f_in: with open(fastq_file_gz, "wb") as f_out: f_out.write(f_in.read()) gz_data_path = fastq_file_gz.split("/") gz_data_path = "/".join(gz_data_path[:-1]) try: tar = tarfile.open(fastq_file_gz) tar.extractall(path=gz_data_path) tar.close() os.remove(fastq_file_gz) compss_delete_file(fastq_file_gz) except tarfile.TarError: logger.fatal("Split FASTQ files: Malformed tar file") return {}, {} tasks_done += 1 logger.progress("FASTQ Splitter", task_id=tasks_done, total=task_count) # input and output share most metadata output_metadata = {} output_bam_file = output_files["bam"] output_bai_file = output_files["bai"] logger.progress("ALIGNER - jobs = " + str(len(fastq_file_list)), task_id=tasks_done, total=task_count) output_bam_list = [] for fastq_file_pair in fastq_file_list: logger.info("TMP DIR: " + gz_data_path + "/tmp/") if "fastq2" in input_files: tmp_fq1 = gz_data_path + "/tmp/" + fastq_file_pair[0] tmp_fq2 = gz_data_path + "/tmp/" + fastq_file_pair[1] logger.info("TMP_FQ1: " + fastq_file_pair[0]) logger.info("TMP_FQ2: " + fastq_file_pair[1]) output_bam_file_tmp = tmp_fq1 + ".bam" output_bam_list.append(output_bam_file_tmp) self.bs_seeker_aligner( tmp_fq1, tmp_fq2, aligner, aligner_path, bss_path, aln_params, genome_fasta, genome_idx, output_bam_file_tmp ) else: tmp_fq = gz_data_path + "/tmp/" + fastq_file_pair[0] logger.info("TMP_FQ: " + fastq_file_pair[0]) output_bam_file_tmp = tmp_fq + ".bam" output_bam_list.append(output_bam_file_tmp) self.bs_seeker_aligner_single( tmp_fq, aligner, aligner_path, bss_path, aln_params, genome_fasta, genome_idx, output_bam_file_tmp ) barrier() # Remove all tmp fastq files now that the reads have been aligned # if untar_idx: # for idx_file in index_files: # compss_delete_file(index_files[idx_file]) for fastq_file_pair in fastq_file_list: os.remove(gz_data_path + "/tmp/" + fastq_file_pair[0]) compss_delete_file(gz_data_path + "/tmp/" + fastq_file_pair[0]) if "fastq2" in input_files: os.remove(gz_data_path + "/tmp/" + fastq_file_pair[1]) compss_delete_file(gz_data_path + "/tmp/" + fastq_file_pair[1]) tasks_done += 1 logger.progress("ALIGNER", task_id=tasks_done, total=task_count) bam_handle = bamUtilsTask() logger.progress("Merging bam files", task_id=tasks_done, total=task_count) bam_handle.bam_merge(output_bam_list) tasks_done += 1 logger.progress("Merging bam files", task_id=tasks_done, total=task_count) # Remove all bam files that are not the final file for i in output_bam_list[1:len(output_bam_list)]: try: compss_delete_file(i) os.remove(i) except (OSError, IOError) as msg: logger.warn( "Unable to remove file I/O error({0}): {1}".format( msg.errno, msg.strerror ) ) logger.progress("Sorting merged bam file", task_id=tasks_done, total=task_count) bam_handle.bam_sort(output_bam_list[0]) tasks_done += 1 logger.progress("Sorting merged bam file", task_id=tasks_done, total=task_count) logger.progress("Copying bam file into the output file", task_id=tasks_done, total=task_count) bam_handle.bam_copy(output_bam_list[0], output_bam_file) tasks_done += 1 logger.progress("Copying bam file into the output file", task_id=tasks_done, total=task_count) compss_delete_file(output_bam_list[0]) logger.progress("Creating output bam index file", task_id=tasks_done, total=task_count) bam_handle.bam_index(output_bam_file, output_bai_file) tasks_done += 1 logger.progress("Creating output bam index file", task_id=tasks_done, total=task_count) barrier() shutil.rmtree(gz_data_path + "/tmp") output_metadata = { "bam": Metadata( data_type="data_wgbs", file_type="BAM", file_path=output_bam_file, sources=sources, taxon_id=input_metadata["genome"].taxon_id, meta_data={ "assembly": input_metadata["genome"].meta_data["assembly"], "tool": "bs_seeker_aligner", "parameters": aln_params, "associated_files": [output_bai_file] } ), "bai": Metadata( data_type="data_wgbs", file_type="BAI", file_path=output_bai_file, sources=[input_metadata["genome"].file_path], taxon_id=input_metadata["genome"].taxon_id, meta_data={ "assembly": input_metadata["genome"].meta_data["assembly"], "tool": "bs_seeker_aligner", "associated_master": output_bam_file } ) } return (output_files, output_metadata)
# ------------------------------------------------------------------------------