Source code for tool.tb_full_mapping

"""
.. See the NOTICE file distributed with this work for additional information
   regarding copyright ownership.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
"""

from __future__ import print_function
import sys

from os import path
from os import unlink

from past.builtins import basestring  # pylint: disable=redefined-builtin

from basic_modules.tool import Tool

from utils import logger

try:
    if hasattr(sys, '_run_from_cmdl') is True:
        raise ImportError
    from pycompss.api.parameter import FILE_IN, FILE_OUT, IN
    from pycompss.api.task import task
    # from pycompss.api.constraint import constraint
    from pycompss.api.api import compss_wait_on
except ImportError:
    logger.info("[Warning] Cannot import \"pycompss\" API packages.")
    logger.info("          Using mock decorators.")

    from utils.dummy_pycompss import FILE_IN, FILE_OUT, IN  # pylint: disable=ungrouped-imports
    from utils.dummy_pycompss import task  # pylint: disable=ungrouped-imports
    # from utils.dummy_pycompss import constraint # pylint: disable=ungrouped-imports
    from utils.dummy_pycompss import compss_wait_on  # pylint: disable=ungrouped-imports

from pytadbit.mapping.mapper import full_mapping  # pylint: disable=import-error
from pytadbit.utils.fastq_utils import quality_plot  # pylint: disable=import-error


# ------------------------------------------------------------------------------

[docs]class tbFullMappingTool(Tool): # pylint: disable=invalid-name """ Tool for mapping fastq paired end files to the GEM index files """ def __init__(self): """ Init function """ logger.info("TADbit full_mapping") Tool.__init__(self)
[docs] @task( gem_file=FILE_IN, fastq_file=FILE_IN, windows=IN, window1=FILE_OUT, window2=FILE_OUT, window3=FILE_OUT, window4=FILE_OUT) def tb_full_mapping_iter( # pylint: disable=too-many-locals,too-many-statements,unused-argument,no-self-use,too-many-arguments self, gem_file, fastq_file, windows, window1, window2, window3, window4, ncpus=1, workdir='/tmp/'): """ Function to map the FASTQ files to the GEM file over different window sizes ready for alignment Parameters ---------- gem_file : str Location of the genome GEM index file fastq_file_bgd : str Location of the FASTQ file windows : list List of lists with the window sizes to be computed window1 : str Location of the first window index file window2 : str Location of the second window index file window3 : str Location of the third window index file window4 : str Location of the fourth window index file Returns ------- window1 : str Location of the first window index file window2 : str Location of the second window index file window3 : str Location of the third window index file window4 : str Location of the fourth window index file """ logger.info("tb_full_mapping_iter") output_dir = workdir _ = full_mapping( gem_file, fastq_file, output_dir, windows=windows, frag_map=False, nthreads=ncpus, clean=True, temp_dir=workdir ) return True
[docs] @task( gem_file=FILE_IN, fastq_file=FILE_IN, enzyme_name=IN, windows=IN, full_file=FILE_OUT, frag_file=FILE_OUT) def tb_full_mapping_frag( # pylint: disable=too-many-locals,too-many-statements,unused-argument,no-self-use,too-many-arguments self, gem_file, fastq_file, enzyme_name, windows, full_file, frag_file, ncpus=1, workdir='/tmp/'): """ Function to map the FASTQ files to the GEM file based on fragments derived from the restriction enzyme that was used. Parameters ---------- gem_file : str Location of the genome GEM index file fastq_file_bgd : str Location of the FASTQ file enzyme_name : str Restriction enzyme name (MboI) windows : list List of lists with the window sizes to be computed window_file : str Location of the first window index file Returns ------- window_file : str Location of the window index file """ logger.info("tb_full_mapping_frag") # od_loc = fastq_file.split("/") # output_dir = "/".join(od_loc[0:-1]) output_dir = workdir logger.info("TB MAPPING - output_dir:", output_dir) logger.info("TB MAPPING - full_file dir:", full_file) logger.info("TB MAPPING - frag_file dir:", frag_file) gzipped = '' dsrc = '' if fastq_file.endswith('.fastq.gz') or fastq_file.endswith('.fq.gz'): gzipped = '.gz' if fastq_file.endswith('.fastq.dsrc'): dsrc = '.dsrc' file_name = path.basename(fastq_file) file_name = file_name.replace('.fastq'+dsrc+gzipped, '') file_name = file_name.replace('.fq'+dsrc+gzipped, '') fastq_file_tmp = workdir+'/'+file_name _ = full_mapping( gem_file, fastq_file, output_dir, r_enz=enzyme_name, windows=windows, frag_map=True, nthreads=ncpus, clean=True, temp_dir=workdir ) with open(full_file, "wb") as f_out: with open(fastq_file_tmp + dsrc + "_full_1-end.map", "rb") as f_in: f_out.write(f_in.read()) if path.isfile(fastq_file_tmp + dsrc + "_full_1-end.map"): unlink(fastq_file_tmp + dsrc + "_full_1-end.map") with open(frag_file, "wb") as f_out: with open(fastq_file_tmp + dsrc + "_frag_1-end.map", "rb") as f_in: f_out.write(f_in.read()) if path.isfile(fastq_file_tmp + dsrc + "_frag_1-end.map"): unlink(fastq_file_tmp + dsrc + "_frag_1-end.map") return True
[docs] def run(self, input_files, input_metadata, output_files): # pylint: disable=too-many-locals,too-many-branches,too-many-statements,no-self-use,unused-argument """ The main function to map the FASTQ files to the GEM file over different window sizes ready for alignment Parameters ---------- input_files : list gem_file : str Location of the genome GEM index file fastq_file_bgd : str Location of the FASTQ file metadata : dict windows : list List of lists with the window sizes to be computed enzyme_name : str Restriction enzyme used [OPTIONAL] Returns ------- output_files : list List of locations for the output files. output_metadata : list List of matching metadata dict objects """ gem_file = input_files[0] fastq_file = input_files[1] windows = input_metadata['windows'] if not windows: windows = None if 'ncpus' not in input_metadata: input_metadata['ncpus'] = 8 if 'iterative_mapping' in input_metadata: if isinstance(input_metadata['iterative_mapping'], basestring): frag_base = not input_metadata['iterative_mapping'].lower() in ("yes", "true", "t", "1") # pylint: disable=line-too-long else: frag_base = not input_metadata['iterative_mapping'] else: frag_base = (windows is None) if 'workdir' in input_metadata: root_path = input_metadata['workdir'] else: root_path = path.dirname(path.abspath(fastq_file)) gzipped = '' if fastq_file.lower().endswith('.fastq.gz') or fastq_file.lower().endswith('.fq.gz'): gzipped = '.gz' file_name = path.basename(fastq_file) file_name = (file_name.replace('.fastq'+gzipped, '')).replace('.FASTQ'+gzipped, '') file_name = (file_name.replace('.fq'+gzipped, '')).replace('.FQ'+gzipped, '') quality_plot_file = '' log_path = '' # name = root_name[-1] if 'quality_plot' in input_metadata: quality_plot_file = 'QC-plot_'+file_name + '.png' log_path = root_path+'/'+'mapping_log_'+file_name + '.txt' dangling_ends, ligated = quality_plot( fastq_file, r_enz=input_metadata['enzyme_name'], nreads=100000, paired=False, savefig=path.join( root_path, quality_plot_file)) orig_stdout = sys.stdout f_handler = open(log_path, "w") sys.stdout = f_handler logger.info('Hi-C QC plot') for renz in dangling_ends: logger.info(' - Dangling-ends (sensu-stricto): ', dangling_ends[renz]) for renz in ligated: logger.info(' - Ligation sites: ', ligated[renz]) sys.stdout = orig_stdout f_handler.close() file_name = root_path+'/'+file_name output_metadata = {} # input and output share most metadata if frag_base: full_file = file_name + "_full.map" frag_file = file_name + "_frag.map" results = self.tb_full_mapping_frag( gem_file, fastq_file, input_metadata['enzyme_name'], None, full_file, frag_file, ncpus=input_metadata['ncpus'], workdir=root_path ) results = compss_wait_on(results) output_metadata['func'] = 'frag' return_files = [full_file, frag_file] if 'quality_plot' in input_metadata: return_files += [log_path, root_path+'/'+quality_plot_file] return (return_files, output_metadata) window2 = window3 = window4 = None window1 = file_name + "_full_" + str(windows[0][0]) + "-" + str(windows[0][1]) + ".map" if len(windows) > 1: window2 = file_name + "_full_" + str(windows[1][0]) + "-" + str(windows[1][1]) + ".map" if len(windows) > 2: window3 = file_name + "_full_" + str(windows[2][0]) + "-" + str(windows[2][1]) + ".map" if len(windows) > 3: window4 = file_name + "_full_" + str(windows[3][0]) + "-" + str(windows[3][1]) + ".map" results = self.tb_full_mapping_iter( gem_file, fastq_file, windows, window1, window2, window3, window4, ncpus=input_metadata['ncpus'], workdir=root_path ) results = compss_wait_on(results) output_metadata['func'] = 'iter' return_files = [window1, window2, window3, window4] if 'quality_plot' in input_metadata: return_files += [log_path, root_path+'/'+quality_plot_file] return (return_files, output_metadata)
# ------------------------------------------------------------------------------