Source code for tool.fastqreader

#!/usr/bin/env python

"""
.. See the NOTICE file distributed with this work for additional information
   regarding copyright ownership.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
"""

import os
import errno
import re
import gzip

from utils import logger


class fastqreader(object):  # pylint: disable=too-many-instance-attributes,invalid-name
    """
    Module for reading single end and paired end FASTQ files

    The reader keeps one input handle per side (1 and, for paired end data,
    2), hands out one four-line FASTQ record at a time via ``next`` and can
    write records to numbered output files placed in a ``tmp`` directory
    next to each input file.
    """

    def __init__(self):
        """
        Initialise the module
        """
        # Input locations, handles and per-side state
        self.fastq1 = ''
        self.fastq2 = ''

        self.f1_file = None
        self.f2_file = None

        self.f1_gz = False
        self.f2_gz = False

        self.f1_eof = False
        self.f2_eof = False

        # Output locations and handles (populated by createOutputFiles)
        self.f1_output_file_loc = None
        self.f2_output_file_loc = None

        self.f1_output_file = None
        self.f2_output_file = None

        self.output_tag = ''
        self.output_file_count = 0

        self.paired = False

    @staticmethod
    def _is_gzipped(path):
        """
        Report whether the last dot-separated part of the file name is "gz"
        """
        return os.path.split(path)[1].split(".")[-1] == "gz"

    @staticmethod
    def _open_input(path, gzipped):
        """
        Open a FASTQ file for reading, through gzip when it is compressed
        """
        if gzipped:
            # Binary mode keeps tell() as a plain offset in the uncompressed
            # stream; the lines are converted to text when they are read.
            return gzip.open(path, "rb")
        return open(path, "r")

    @staticmethod
    def _to_text(line):
        """
        Convert a line read from a binary handle into a native string
        """
        # On Python 2 ``bytes`` is ``str`` so the line is returned untouched
        if isinstance(line, bytes) and not isinstance(line, str):
            return line.decode("utf-8")
        return line

    @staticmethod
    def _ensure_directory(path):
        """
        Create the directory if it is missing, tolerating a concurrent create
        """
        if not os.path.isdir(path):
            try:
                os.mkdir(path)
            except (OSError, IOError) as oserror:
                if oserror.errno != errno.EEXIST:
                    # Bare raise keeps the original errno and message
                    raise

    def _read_record(self, handle):
        """
        Read the next four lines from the handle as a single FASTQ record

        Returns
        -------
        dict or None
            The record, or None when the ID line is empty (end of file)
        """
        start_posn = handle.tell()
        read_id = self._to_text(handle.readline())
        read_seq = self._to_text(handle.readline())
        read_addition = self._to_text(handle.readline())
        read_score = self._to_text(handle.readline())
        end_posn = handle.tell()

        if not read_id:
            return None

        return {
            'id': read_id.rstrip(),
            'seq': read_seq,
            'add': read_addition,
            'score': read_score,
            'start_posn': start_posn,
            'end_posn': end_posn
        }

    def _output_location(self, fastq_path, gzipped):
        """
        Build the output path ``<dir>/tmp/<stem>.<tag>_<count><ext>``

        A trailing ".gz" is dropped first because the output files are
        written uncompressed.
        """
        directory, name = os.path.split(fastq_path)
        if gzipped and name.endswith(".gz"):
            name = name[:-len(".gz")]
        stem, extension = os.path.splitext(name)
        output_name = "{}.{}_{}{}".format(
            stem, self.output_tag, self.output_file_count, extension)
        return os.path.join(directory, "tmp", output_name)

    def openFastQ(self, file1, file2=None):  # pylint: disable=invalid-name
        """
        Create file handles for reading the FastQ files

        The gzip and paired flags are recomputed on every call so that a
        reader which is reused for another data set does not keep stale
        state.

        Parameters
        ----------
        file1 : str
            Location of the first FASTQ file
        file2 : str
            Location of a paired end FASTQ file.
        """
        self.fastq1 = file1
        self.f1_gz = self._is_gzipped(file1)
        self.f1_file = self._open_input(file1, self.f1_gz)
        self.f1_eof = False

        self.paired = file2 is not None
        if self.paired:
            self.fastq2 = file2
            self.f2_gz = self._is_gzipped(file2)
            self.f2_file = self._open_input(file2, self.f2_gz)
            self.f2_eof = False

    def closeFastQ(self):  # pylint: disable=invalid-name
        """
        Close file handles for the FastQ files.

        Handles that were never opened are skipped.
        """
        if self.f1_file is not None:
            self.f1_file.close()

        if self.paired is True and self.f2_file is not None:
            self.f2_file.close()

    def eof(self, side=1):
        """
        Indicate if the end of the file has been reached

        Parameters
        ----------
        side : int
            1 or 2

        Returns
        -------
        bool or str
            The end-of-file flag for the requested side, or the string
            "ERROR" (after logging) when side is neither 1 nor 2.
        """
        if side == 1:
            return self.f1_eof

        if side == 2:
            return self.f2_eof

        logger.error("side has value {}. Permitted values are 1 or 2".format(side))
        return "ERROR"

    def next(self, side=1):
        """
        Get the next read element for the specific FastQ file pair

        Parameters
        ----------
        side : int
            1 or 2 to get the element from the relevant end (DEFAULT: 1)

        Returns
        -------
        dict
            id : str
                Sequence ID
            seq : str
                Called sequence
            add : str
                Plus sign
            score : str
                Base call score
            start_posn
                Value of tell() on the input before the record was read
            end_posn
                Value of tell() on the input after the record was read

        False is returned, and the end-of-file flag for the side is set,
        once the file is exhausted. The string 'ERROR' is returned when side
        is neither 1 nor 2.
        """
        if side == 1:
            record = self._read_record(self.f1_file)
            if record is None:
                self.f1_eof = True
                return False
        elif side == 2:
            record = self._read_record(self.f2_file)
            if record is None:
                self.f2_eof = True
                return False
        else:
            return 'ERROR'

        return record

    def createOutputFiles(self, tag=''):  # pylint: disable=invalid-name
        """
        Create and open the file handles for the output files

        Each output file is placed in a "tmp" directory next to its input
        file and named ``<stem>.<tag>_<count><extension>``.

        Parameters
        ----------
        tag : str
            Tag to identify the output files (DEFAULT: '')

        Returns
        -------
        str or tuple
            Location of the output file, or a tuple with the locations for
            both sides when reading paired end files.
        """
        if tag not in ('', self.output_tag):
            self.output_tag = tag

        fq1_location = self._output_location(self.fastq1, self.f1_gz)
        self._ensure_directory(os.path.dirname(fq1_location))
        self.f1_output_file = open(fq1_location, "w")
        self.f1_output_file_loc = fq1_location

        if self.paired is True:
            # Side 2 is named from its own gzip flag and gets its own tmp
            # directory, as it may live somewhere other than side 1.
            fq2_location = self._output_location(self.fastq2, self.f2_gz)
            self._ensure_directory(os.path.dirname(fq2_location))
            self.f2_output_file = open(fq2_location, "w")
            self.f2_output_file_loc = fq2_location
            return (self.f1_output_file_loc, self.f2_output_file_loc)

        return self.f1_output_file_loc

    def writeOutput(self, read, side=1):  # pylint: disable=invalid-name
        """
        Writer to print the extracted lines

        Parameters
        ----------
        read : dict
            Read is the dictionary object returned from self.next()
        side : int
            The side that the read has come from (DEFAULT: 1)

        Returns
        -------
        bool
            False if a value other than 1 or 2 is entered for the side.
        """
        # The ID had its line ending stripped by next(); the other three
        # fields still carry theirs.
        line = read["id"] + "\n" + read["seq"] + read["add"] + read["score"]

        if side == 1:
            self.f1_output_file.write(line)
        elif side == 2:
            self.f2_output_file.write(line)
        else:
            return False

        return True

    def closeOutputFiles(self):  # pylint: disable=invalid-name
        """
        Close the output file handles

        Handles that were never created are skipped.
        """
        if self.f1_output_file is not None:
            self.f1_output_file.close()

        if self.paired is True and self.f2_output_file is not None:
            self.f2_output_file.close()

    def incrementOutputFiles(self):  # pylint: disable=invalid-name
        """
        Increment the counter and create new files for splitting the
        original FastQ paired end files.

        Returns
        -------
        str or tuple
            Location of the new output file, or a tuple with the locations
            for both sides when reading paired end files.
        """
        self.closeOutputFiles()

        self.output_file_count += 1

        return self.createOutputFiles(self.output_tag)