Source code for tool.kallisto_quant

"""
.. See the NOTICE file distributed with this work for additional information
   regarding copyright ownership.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
"""

from __future__ import print_function

import shlex
import subprocess
import itertools
import sys
import os

from utils import logger

try:
    if hasattr(sys, '_run_from_cmdl') is True:
        raise ImportError
    from pycompss.api.parameter import IN, FILE_IN, FILE_OUT
    from pycompss.api.task import task
except ImportError:
    logger.warn("[Warning] Cannot import \"pycompss\" API packages.")
    logger.warn("          Using mock decorators.")

    from utils.dummy_pycompss import IN, FILE_IN, FILE_OUT  # pylint: disable=ungrouped-imports
    from utils.dummy_pycompss import task  # pylint: disable=ungrouped-imports

from basic_modules.tool import Tool
from basic_modules.metadata import Metadata

# ------------------------------------------------------------------------------


[docs]class kallistoQuantificationTool(Tool): # pylint: disable=invalid-name """ Tool for quantifying RNA-seq alignments to calculate expression levels of genes within a genome. """ def __init__(self, configuration=None): """ Initialise the tool with its configuration. Parameters ---------- configuration : dict a dictionary containing parameters that define how the operation should be carried out, which are specific to each Tool. """ logger.info("Kallisto Quantification") Tool.__init__(self) if configuration is None: configuration = {} self.configuration.update(configuration)
[docs] @task( cdna_idx_file=FILE_IN, fastq_file_loc=FILE_IN, abundance_h5_file=FILE_OUT, abundance_tsv_file=FILE_OUT, run_info_file=FILE_OUT) def kallisto_quant_single( # pylint: disable=too-many-arguments, too-many-locals self, cdna_idx_file, fastq_file_loc, abundance_h5_file, abundance_tsv_file, run_info_file): """ Kallisto quantifier for single end RNA-seq data Parameters ---------- idx_loc : str Location of the output index file fastq_file_loc : str Location of the FASTQ sequence file Returns ------- wig_file_loc : loc Location of the wig file containing the levels of expression """ fq_stats = self.seq_read_stats(fastq_file_loc) output_dir = os.path.split(fastq_file_loc) std = fq_stats["std"] if std == 0.0: std = 1/fq_stats['mean'] command_line = "kallisto quant -i " + cdna_idx_file + " " command_line += " -o " + output_dir[0] + "/" command_line += " --single -l " + str(fq_stats['mean']) + " " command_line += "-s " + str(std) + " " + fastq_file_loc logger.info("KALLISTO_QUANT COMMAND", command_line) args = shlex.split(command_line) process = subprocess.Popen(args) process.wait() output_files = [ { "in": os.path.join(output_dir[0], "abundance.h5"), "out": abundance_h5_file }, { "in": os.path.join(output_dir[0], "abundance.tsv"), "out": abundance_tsv_file }, { "in": os.path.join(output_dir[0], "run_info.json"), "out": run_info_file } ] for i in output_files: if i["in"] != i["out"]: if os.path.isfile(i["in"]) is True and os.path.getsize(i["in"]) > 0: with open(i["out"], "wb") as f_out: with open(i["in"], "rb") as f_in: f_out.write(f_in.read()) else: with open(i["out"], "w") as f_out: f_out.write("") return True
[docs] @task( fastq_file_loc_01=FILE_IN, fastq_file_loc_02=FILE_IN, cdna_idx_file=FILE_IN, abundance_h5_file=FILE_OUT, abundance_tsv_file=FILE_OUT, run_info_file=FILE_OUT) def kallisto_quant_paired( # pylint: disable=no-self-use,too-many-arguments self, cdna_idx_file, fastq_file_loc_01, fastq_file_loc_02, abundance_h5_file, abundance_tsv_file, run_info_file): """ Kallisto quantifier for paired end RNA-seq data Parameters ---------- idx_loc : str Location of the output index file fastq_file_loc_01 : str Location of the FASTQ sequence file fastq_file_loc_02 : str Location of the paired FASTQ sequence file Returns ------- wig_file_loc : loc Location of the wig file containing the levels of expression """ output_dir = os.path.split(fastq_file_loc_01) command_line = 'kallisto quant -i ' + cdna_idx_file + ' ' command_line += '-o ' + output_dir[0] + "/ " command_line += fastq_file_loc_01 + ' ' + fastq_file_loc_02 args = shlex.split(command_line) process = subprocess.Popen(args) process.wait() output_files = [ { "in": os.path.join(output_dir[0], "abundance.h5"), "out": abundance_h5_file }, { "in": os.path.join(output_dir[0], "abundance.tsv"), "out": abundance_tsv_file }, { "in": os.path.join(output_dir[0], "run_info.json"), "out": run_info_file } ] for i in output_files: print(i) if os.path.isfile(i["in"]) is True and os.path.getsize(i["in"]) > 0: with open(i["out"], "wb") as f_out: with open(i["in"], "rb") as f_in: f_out.write(f_in.read()) else: with open(i["out"], "w") as f_out: f_out.write("") return True
[docs] @staticmethod def load_gff_ensembl(gff_file): """ Function to extract all of the genes and their locations from a GFF file generated by ensembl """ gff_data = {} with open(gff_file, "r") as gff_handle: for gff_line in gff_handle: if gff_line[0] == "#": continue gff_entry = gff_line.split("\t") gff_meta = [i.split("=") for i in gff_entry[8].split(";")] gff_meta_dict = {i[0]: i[1] for i in gff_meta} if "Parent" in gff_meta_dict and "gene" in gff_meta_dict["Parent"]: gene_id = gff_meta_dict["Parent"].split(":")[-1] gff_data[gff_meta_dict["transcript_id"]] = { "chromosome": gff_entry[0], "start": gff_entry[3], "end": gff_entry[4], "strand": gff_entry[6], "gene_id": gene_id } return gff_data
[docs] @staticmethod def load_gff_ucsc(gff_file): """ Function to extract all of the genes and their locations from a GFF file generated by ensembl """ gff_data = {} with open(gff_file, "r") as gff_handle: for gff_line in gff_handle: if gff_line[0] == "#": continue gff_entry = gff_line.strip().split("\t") gff_meta = [i.split("=") for i in gff_entry[8].split(";")] gff_meta_dict = {i[0]: i[1] for i in gff_meta} if "name" in gff_meta_dict: ids = gff_meta_dict["name"].split(" (") gene_id = ids[-1][0:-1].split(" ")[0] gff_data[ids[0]] = { "chromosome": gff_entry[0], "start": gff_entry[3], "end": gff_entry[4], "strand": gff_entry[6], "gene_id": gene_id } return gff_data
[docs] @task( returns=bool, gff_data=IN, abundance_tsv_file=FILE_IN, abundance_bed_file=FILE_OUT) def kallisto_tsv2bed( # pylint: disable=no-self-use self, gff_data, abundance_tsv_file, abundance_bed_file): """ So that the TSV file can be viewed within the genome browser it is handy to convert the file to a BigBed file """ with open(abundance_tsv_file, "r") as tsv_handle: with open(abundance_bed_file, "w") as bed_handle: bed_handle.write("track name=kallisto_quant\n") for tsv_line in tsv_handle: tsv_entry = tsv_line.strip().split("\t") if tsv_entry[0] == "target_id": continue if tsv_entry[0] in gff_data: gene_entry = gff_data[tsv_entry[0]] bed_handle.write( "{}\t{}\t{}\t{}\t{}\t{}\n".format( gene_entry["chromosome"], gene_entry["start"]-1, gene_entry["end"]-1, tsv_entry[0], tsv_entry[4], gene_entry["strand"], ) ) else: logger.warn("{} is not a valid transcript ID".format(tsv_entry[0])) return True
[docs] @task( returns=bool, gff_data=IN, abundance_tsv_file=FILE_IN, abundance_gff_file=FILE_OUT) def kallisto_tsv2gff( # pylint: disable=no-self-use self, gff_data, abundance_tsv_file, abundance_gff_file): """ So that the TSV file can be viewed within the genome browser it is handy to convert the file to a BigBed file """ with open(abundance_tsv_file, "r") as tsv_handle: with open(abundance_gff_file, "w") as gff_handle: gff_handle.write("##gff-version 3\n") for tsv_line in tsv_handle: tsv_entry = tsv_line.strip().split("\t") if tsv_entry[0] == "target_id": continue if tsv_entry[0] in gff_data: gene_entry = gff_data[tsv_entry[0]] gff_handle.write( "{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\tID={}\n".format( gene_entry["chromosome"], "Kallisto", "expression", gene_entry["start"], gene_entry["end"], tsv_entry[4], gene_entry["strand"], ".", tsv_entry[0] ) ) else: logger.warn("{} is not a valid transcript ID".format(tsv_entry[0])) return True
[docs] @staticmethod def seq_read_stats(file_in): """ Calculate the mean and standard deviation of the reads in a fastq file Parameters ---------- file_in : str Location of a FASTQ file Returns ------- dict mean : Mean length of sequenced strands std : Standard deviation of lengths of sequenced strands """ from numpy import std from numpy import mean total_len = [] with open(file_in, 'r') as file_handle: forthlines = itertools.islice(file_handle, 1, None, 4) for line in forthlines: line = line.rstrip() total_len.append(len(line)) length_sd = std(total_len) length_mean = mean(total_len) return {'mean': length_mean, 'std': length_sd}
[docs] def run(self, input_files, input_metadata, output_files): """ Tool for calculating the level of expression Parameters ---------- input_files : list Kallisto index file for the FASTQ file for the experiemtnal alignments input_metadata : list Returns ------- array : list First element is a list of the index files. Second element is a list of the matching metadata """ # input and output share most metadata output_metadata = {} if "ensembl" in input_metadata["gff"].meta_data: gff_data = self.load_gff_ensembl(input_files["gff"]) else: gff_data = self.load_gff_ucsc(input_files["gff"]) if "fastq2" not in input_files: self.kallisto_quant_single( input_files["index"], input_files["fastq1"], output_files["abundance_h5_file"], output_files["abundance_tsv_file"], output_files["run_info_file"] ) # results = compss_wait_on(results) elif "fastq2" in input_files: # handle error self.kallisto_quant_paired( input_files["index"], input_files["fastq1"], input_files["fastq2"], output_files["abundance_h5_file"], output_files["abundance_tsv_file"], output_files["run_info_file"] ) # results = compss_wait_on(results) else: return ({}, {}) # self.kallisto_tsv2bed( # gff_data, # output_files["abundance_tsv_file"], # output_files["abundance_bed_file"] # ) self.kallisto_tsv2gff( gff_data, output_files["abundance_tsv_file"], output_files["abundance_gff_file"] ) generic_meta = input_metadata["cdna"].meta_data generic_meta["tool"] = "kallisto_quant" sources = [input_metadata["cdna"].file_path, input_metadata["fastq1"].file_path] if "fastq2" in input_files: sources.append(input_metadata["fastq1"].file_path) output_metadata = { "abundance_h5_file": Metadata( data_type="data_rna_seq", file_type="HDF5", file_path=output_files["abundance_h5_file"], sources=sources, taxon_id=input_metadata["cdna"].taxon_id, meta_data=generic_meta ), "abundance_tsv_file": Metadata( data_type="data_rna_seq", file_type="TSV", file_path=output_files["abundance_tsv_file"], sources=sources, taxon_id=input_metadata["cdna"].taxon_id, meta_data=generic_meta ), # "abundance_bed_file": Metadata( # data_type="data_rna_seq", # file_type="BED", # file_path=output_files["abundance_bed_file"], # sources=sources, # taxon_id=input_metadata["cdna"].taxon_id, # meta_data=generic_meta # ), "abundance_gff_file": Metadata( data_type="data_rna_seq", file_type="GFF3", file_path=output_files["abundance_gff_file"], sources=sources, taxon_id=input_metadata["cdna"].taxon_id, meta_data=generic_meta ), "run_info_file": Metadata( data_type="data_rna_seq", file_type="JSON", file_path=output_files["run_info_file"], sources=sources, taxon_id=input_metadata["cdna"].taxon_id, meta_data=generic_meta ) } return (output_files, output_metadata)
# ------------------------------------------------------------------------------