"""
.. See the NOTICE file distributed with this work for additional information
regarding copyright ownership.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from __future__ import print_function
import os
import sys
import subprocess
import pysam
from utils import logger
try:
if hasattr(sys, '_run_from_cmdl') is True:
raise ImportError
from pycompss.api.parameter import FILE_IN, FILE_OUT, FILE_INOUT, IN
from pycompss.api.task import task
from pycompss.api.api import barrier, compss_delete_file
except ImportError:
logger.warn("[Warning] Cannot import \"pycompss\" API packages.")
logger.warn(" Using mock decorators.")
from utils.dummy_pycompss import FILE_IN, FILE_OUT, FILE_INOUT, IN # pylint: disable=ungrouped-imports
from utils.dummy_pycompss import task # pylint: disable=ungrouped-imports
from utils.dummy_pycompss import barrier, compss_delete_file # pylint: disable=ungrouped-imports
# ------------------------------------------------------------------------------
[docs]class bamUtils(object): # pylint: disable=invalid-name
"""
Tool for handling bam files
"""
def __init__(self):
"""
Initialise the tool with its configuration.
"""
logger.info("BAM Utils")
[docs] @staticmethod
def sam_to_bam(sam_file, bam_file):
"""
Function for converting sam files to bam files
"""
# Convert the new sam file into a sorted bam file
cmd_sort = ' '.join([
'samtools sort',
'-O bam',
'-o', bam_file,
sam_file
])
try:
logger.info("CREATE BAM COMMAND: " + cmd_sort)
process = subprocess.Popen(cmd_sort, shell=True)
process.wait()
except (OSError, IOError) as msg:
logger.info("I/O error({0}): {1}\n{2}".format(
msg.errno, msg.strerror, cmd_sort))
return False
return True
[docs] @staticmethod
def bam_to_bed(bam_file, bed_file):
"""
Function for converting bam files to bed files
"""
# Convert the new bam file into a bed file
cmd_bamtobed = ' '.join([
'bedtools bamtobed',
'-i', bam_file,
'>', bed_file
])
try:
logger.info("CREATE BAMTOBED COMMAND: " + cmd_bamtobed)
process = subprocess.Popen(cmd_bamtobed, shell=True)
process.wait()
except (OSError, IOError) as msg:
logger.info("I/O error({0}): {1}\n{2}".format(
msg.errno, msg.strerror, cmd_bamtobed))
return False
return True
[docs] @staticmethod
def bam_count_reads(bam_file, aligned=False):
"""
Wrapper to count the number of (aligned) reads in a bam file
"""
if aligned:
return pysam.view("-c", "-F", "260", bam_file).strip() # pylint: disable=no-member
return pysam.view("-c", bam_file).strip() # pylint: disable=no-member
[docs] @staticmethod
def bam_paired_reads(bam_file):
"""
Wrapper to test if a bam file contains paired end reads
"""
paired_count = pysam.view("-c", "-f", "1", bam_file).strip() # pylint: disable=no-member
if int(paired_count) > 0:
return True
return False
[docs] @staticmethod
def bam_sort(bam_file):
"""
Wrapper for the pysam SAMtools sort function
Parameters
----------
bam_file : str
Location of the bam file to sort
"""
try:
pysam.sort( # pylint: disable=no-member
"-o", bam_file, "-T", bam_file + "_sort", bam_file)
except (OSError, IOError):
return False
return True
[docs] @staticmethod
def bam_filter(bam_file, bam_file_out, filter_name):
"""
Wrapper for filtering out reads from a bam file
Parameters
----------
bam_file : str
bam_file_out : str
filter : str
One of:
duplicate - Read is PCR or optical duplicate (1024)
supplementary - Reads that are chimeric, fusion or non linearly aligned (2048)
unmapped - Read is unmapped or not the primary alignment (260)
"""
filter_list = {
"duplicate": "1024",
"supplementary": "2048",
"unmapped": "260"
}
# Using samtools directly as pysam.view ignored the '-o' parameter
cmd_view = ' '.join([
"samtools view",
"-b",
"-F", filter_list[filter_name],
"-o", bam_file_out,
bam_file
])
try:
process = subprocess.Popen(cmd_view, shell=True)
process.wait()
except (OSError, IOError) as msg:
logger.info(
"BAM FILTER - I/O error({0}): {1}\n{2}".format(
msg.errno, msg.strerror,
"samtools view -b -F {} -o {} {}".format(
filter_list[filter_name], bam_file_out, bam_file)
)
)
return False
return True
[docs] @staticmethod
def bam_merge(*args):
"""
Wrapper for the pysam SAMtools merge function
Parameters
----------
bam_file_1 : str
Location of the bam file to merge into
bam_file_2 : str
Location of the bam file that is to get merged into bam_file_1
"""
logger.info("Merging:")
if isinstance(args[0], list):
final_bam = args[0][0]
tmp_bam = final_bam + "_merge.bam"
pysam.merge("-f", tmp_bam, *args[0]) # pylint: disable=no-member
else:
final_bam = args[0]
tmp_bam = final_bam + "_merge.bam"
pysam.merge("-f", tmp_bam, *args) # pylint: disable=no-member
try:
with open(tmp_bam, "rb") as f_in:
with open(final_bam, "wb") as f_out:
f_out.write(f_in.read())
except (OSError, IOError):
return False
os.remove(tmp_bam)
return True
[docs] @staticmethod
def bam_copy(bam_in, bam_out):
"""
Wrapper function to copy from one bam file to another
Parameters
----------
bam_in : str
Location of the input bam file
bam_out : str
Location of the output bam file
"""
logger.info("Copying: " + bam_in + " - " + bam_out)
try:
with open(bam_in, "rb") as f_in:
with open(bam_out, "wb") as f_out:
f_out.write(f_in.read())
except (OSError, IOError):
return False
return True
[docs] @staticmethod
def bam_index(bam_file, bam_idx_file):
"""
Wrapper for the pysam SAMtools index function
Parameters
----------
bam_file : str
Location of the bam file that is to be indexed
bam_idx_file : str
Location of the bam index file (.bai)
"""
cmd_view = ' '.join([
'samtools index',
'-b',
bam_file,
bam_file + "_tmp.bai"
])
try:
logger.info("INDEX BAM COMMAND: " + cmd_view)
process = subprocess.Popen(cmd_view, shell=True)
process.wait()
except (OSError, IOError) as msg:
logger.info("I/O error({0}): {1}\n{2}".format(
msg.errno, msg.strerror, cmd_view))
return False
try:
with open(bam_file + "_tmp.bai", "rb") as f_in:
with open(bam_idx_file, "wb") as f_out:
f_out.write(f_in.read())
except (OSError, IOError):
return False
return True
[docs] @staticmethod
def bam_list_chromosomes(bam_file):
"""
Wrapper to list the chromosome names that are present within the bam file
Parameters
----------
bam_file : str
Location of the bam file
Returns
-------
list
List of the names of the chromosomes that are present in the bam file
"""
bam_file_handle = pysam.AlignmentFile(bam_file, "rb") # pylint: disable=no-member
if "SQ" not in bam_file_handle.header:
return []
chromosome_list = [chromosome["SN"] for chromosome in bam_file_handle.header["SQ"]]
bam_file_handle.close()
return chromosome_list
[docs] @staticmethod
def bam_split(bam_file_in, bai_file, chromosome, bam_file_out): # pylint: disable=unused-argument
"""
Wrapper to extract a single chromosomes worth of reading into a new bam
file
Parameters
----------
bam_file_in : str
Location of the input bam file
bai_file : str
Location of the bam index file. This needs to be in the same directory
as the bam_file_in
chromosome : str
Name of the chromosome whose alignments are to be extracted
bam_file_out : str
Location of the output bam file
"""
# Extract the subsection from the bam file using samtools
cmd_view_1 = ' '.join([
'samtools view',
'-h',
'-o', bam_file_in + ".sam",
bam_file_in,
chromosome
])
# Convert the new sam file into a sorted bam file
cmd_view_2 = ' '.join([
'samtools sort',
'-O bam',
'-o', bam_file_out,
bam_file_in + ".sam"
])
try:
logger.info("EXTRACT SAM COMMAND: " + cmd_view_1)
process = subprocess.Popen(cmd_view_1, shell=True)
process.wait()
except (OSError, IOError) as msg:
logger.info("I/O error({0}): {1}\n{2}".format(
msg.errno, msg.strerror, cmd_view_1))
return False
try:
logger.info("CREATE BAM COMMAND: " + cmd_view_2)
process = subprocess.Popen(cmd_view_2, shell=True)
process.wait()
except (OSError, IOError) as msg:
logger.info("I/O error({0}): {1}\n{2}".format(
msg.errno, msg.strerror, cmd_view_2))
return False
os.remove(bam_file_in + ".sam")
return True
[docs] @staticmethod
def bam_stats(bam_file):
"""
Wrapper for the pysam SAMtools flagstat function
Parameters
----------
bam_file : str
Location of the bam file
Returns
-------
list : dict
qc_passed : int
qc_failed : int
description : str
"""
results = pysam.flagstat(bam_file) # pylint: disable=no-member
separate_results = results.strip().split("\n")
return [
{
"qc_passed": int(element[0]),
"qc_failed": int(element[2]),
"description": " ".join(element[3:])
} for element in [row.split(" ") for row in separate_results]
]
[docs]class bamUtilsTask(object): # pylint: disable=invalid-name
"""
Wrappers so that the function above can be used as part of a @task within
COMPSs avoiding the files being copied around the infrastructure too many
times
"""
def __init__(self):
"""
Init function
"""
logger.info("BAM @task Utils")
[docs] @task(returns=list, bam_file=FILE_IN)
def bam_list_chromosomes(self, bam_file): # pylint: disable=no-self-use
"""
Wrapper to get the list of chromosomes in a given bam file
Parameters
----------
bam_file : str
Location of the bam file
Returns
-------
chromosome_list : list
List of the chromosomes in the bam file
"""
bam_handle = bamUtils()
return bam_handle.bam_list_chromosomes(bam_file)
[docs] @task(returns=bool, bam_file=FILE_INOUT)
def bam_sort(self, bam_file): # pylint: disable=no-self-use
"""
Wrapper for the pysam SAMtools sort function
Parameters
----------
bam_file : str
Location of the bam file to sort
"""
bam_handle = bamUtils()
return bam_handle.bam_sort(bam_file)
[docs] @task(returns=bool, bam_file=FILE_IN, bam_file_out=FILE_IN, filter_name=IN)
def bam_filter(self, bam_file, bam_file_out, filter_name): # pylint: disable=no-self-use
"""
Wrapper for filtering out reads from a bam file
Parameters
----------
bam_file : str
bam_file_out : str
filter : str
One of:
duplicate - Read is PCR or optical duplicate (1024)
unmapped - Read is unmapped or not the primary alignment (260)
"""
bam_handle = bamUtils()
return bam_handle.bam_filter(bam_file, bam_file_out, filter_name)
[docs] def bam_merge(self, in_bam_job_files): # pylint: disable=too-many-branches
"""
Wrapper task taking any number of bam files and merging them into a
single bam file.
Parameters
----------
bam_job_files : list
List of the locations of the separate bam files that are to be merged
The first file in the list will be taken as the output file name
"""
merge_round = -1
if len(in_bam_job_files) == 1:
return in_bam_job_files[0]
bam_job_files = [i for i in in_bam_job_files]
cleanup_files = []
# The extra block required is for testing once the merge step for batches
# of 10 has completed, for the generation of the new name and therefore
# submitting the batches that there are actually more jobs to submit.
while True: # pylint: disable=too-many-nested-blocks
merge_round += 1
if len(bam_job_files) > 1:
tmp_alignments = []
if bam_job_files:
while len(bam_job_files) >= 10:
current_list_len = len(bam_job_files)
for i in range(0, current_list_len-9, 10): # pylint: disable=unused-variable
bam_out = bam_job_files[0] + "_merge_" + str(merge_round) + ".bam"
tmp_alignments.append(bam_out)
cleanup_files.append(bam_out)
self.bam_merge_10(
bam_job_files.pop(0), bam_job_files.pop(0), bam_job_files.pop(0),
bam_job_files.pop(0), bam_job_files.pop(0), bam_job_files.pop(0),
bam_job_files.pop(0), bam_job_files.pop(0), bam_job_files.pop(0),
bam_job_files.pop(0), bam_out
)
if bam_job_files:
bam_out = bam_job_files[0] + "_merge_" + str(merge_round) + ".bam"
if len(bam_job_files) >= 5:
tmp_alignments.append(bam_out)
cleanup_files.append(bam_out)
self.bam_merge_5(
bam_job_files.pop(0), bam_job_files.pop(0), bam_job_files.pop(0),
bam_job_files.pop(0), bam_job_files.pop(0), bam_out
)
if bam_job_files:
bam_out = bam_job_files[0] + "_merge_" + str(merge_round) + ".bam"
if len(bam_job_files) == 4:
tmp_alignments.append(bam_out)
cleanup_files.append(bam_out)
self.bam_merge_4(
bam_job_files.pop(0), bam_job_files.pop(0), bam_job_files.pop(0),
bam_job_files.pop(0), bam_out
)
elif len(bam_job_files) == 3:
tmp_alignments.append(bam_out)
cleanup_files.append(bam_out)
self.bam_merge_3(
bam_job_files.pop(0), bam_job_files.pop(0), bam_job_files.pop(0),
bam_out
)
elif len(bam_job_files) == 2:
tmp_alignments.append(bam_out)
cleanup_files.append(bam_out)
self.bam_merge_2(
bam_job_files.pop(0), bam_job_files.pop(0), bam_out
)
elif len(bam_job_files) == 1:
tmp_alignments.append(bam_job_files[0])
barrier()
bam_job_files = []
bam_job_files = [new_bam for new_bam in tmp_alignments]
else:
break
return_value = self.bam_copy(bam_job_files[0], in_bam_job_files[0])
for tmp_bam_file in cleanup_files:
compss_delete_file(tmp_bam_file)
return return_value
[docs] @task(
returns=bool,
bam_file_1=FILE_IN, bam_file_2=FILE_IN, bam_file_out=FILE_OUT)
def bam_merge_2(self, bam_file_1, bam_file_2, bam_file_out): # pylint: disable=no-self-use
"""
Wrapper for the pysam SAMtools merge function
Parameters
----------
bam_file_1 : str
Location of the bam file to merge into
bam_file_2 : str
Location of the bam file that is to get merged into bam_file_1
"""
bam_handle = bamUtils()
bam_handle.bam_merge(bam_file_1, bam_file_2)
return bam_handle.bam_copy(bam_file_1, bam_file_out)
[docs] @task(
returns=bool,
bam_file_1=FILE_IN, bam_file_2=FILE_IN, bam_file_3=FILE_IN,
bam_file_out=FILE_OUT)
def bam_merge_3(self, bam_file_1, bam_file_2, bam_file_3, bam_file_out): # pylint: disable=no-self-use
"""
Wrapper for the pysam SAMtools merge function
Parameters
----------
bam_file_1 : str
Location of the bam file to merge into
bam_file_2 : str
Location of the bam file that is to get merged into bam_file_1
bam_file_3 : str
Location of the bam file that is to get merged into bam_file_1
"""
bam_handle = bamUtils()
bam_handle.bam_merge([
bam_file_1, bam_file_2, bam_file_3
])
return bam_handle.bam_copy(bam_file_1, bam_file_out)
[docs] @task(
returns=bool,
bam_file_1=FILE_IN, bam_file_2=FILE_IN, bam_file_3=FILE_IN,
bam_file_4=FILE_IN, bam_file_out=FILE_OUT) # pylint: disable=no-self-use,too-many-arguments
def bam_merge_4(self, bam_file_1, bam_file_2, bam_file_3, bam_file_4, bam_file_out):
"""
Wrapper for the pysam SAMtools merge function
Parameters
----------
bam_file_1 : str
Location of the bam file to merge into
bam_file_2 : str
Location of the bam file that is to get merged into bam_file_1
bam_file_3 : str
Location of the bam file that is to get merged into bam_file_1
bam_file_4 : str
Location of the bam file that is to get merged into bam_file_1
"""
bam_handle = bamUtils()
bam_handle.bam_merge([
bam_file_1, bam_file_2, bam_file_3, bam_file_4
])
return bam_handle.bam_copy(bam_file_1, bam_file_out)
[docs] @task(
returns=bool,
bam_file_1=FILE_IN, bam_file_2=FILE_IN, bam_file_3=FILE_IN,
bam_file_4=FILE_IN, bam_file_5=FILE_IN, bam_file_out=FILE_OUT
) # pylint: disable=no-self-use,too-many-arguments
def bam_merge_5(self, bam_file_1, bam_file_2, bam_file_3, bam_file_4, bam_file_5, bam_file_out):
"""
Wrapper for the pysam SAMtools merge function
Parameters
----------
bam_file_1 : str
Location of the bam file to merge into
bam_file_2 : str
Location of the bam file that is to get merged into bam_file_1
bam_file_3 : str
Location of the bam file that is to get merged into bam_file_1
bam_file_4 : str
Location of the bam file that is to get merged into bam_file_1
bam_file_5 : str
Location of the bam file that is to get merged into bam_file_1
"""
bam_handle = bamUtils()
bam_handle.bam_merge([
bam_file_1, bam_file_2, bam_file_3, bam_file_4, bam_file_5
])
return bam_handle.bam_copy(bam_file_1, bam_file_out)
[docs] @task(
returns=bool,
bam_file_1=FILE_IN, bam_file_2=FILE_IN, bam_file_3=FILE_IN,
bam_file_4=FILE_IN, bam_file_5=FILE_IN, bam_file_6=FILE_IN,
bam_file_7=FILE_IN, bam_file_8=FILE_IN, bam_file_9=FILE_IN,
bam_file_10=FILE_IN, bam_file_out=FILE_OUT
) # pylint: disable=no-self-use,too-many-arguments
def bam_merge_10(self,
bam_file_1, bam_file_2, bam_file_3, bam_file_4, bam_file_5,
bam_file_6, bam_file_7, bam_file_8, bam_file_9, bam_file_10,
bam_file_out):
"""
Wrapper for the pysam SAMtools merge function
Parameters
----------
bam_file_1 : str
Location of the bam file to merge into
bam_file_2 : str
Location of the bam file that is to get merged into bam_file_1
bam_file_3 : str
Location of the bam file that is to get merged into bam_file_1
bam_file_4 : str
Location of the bam file that is to get merged into bam_file_1
bam_file_5 : str
Location of the bam file that is to get merged into bam_file_1
bam_file_6 : str
Location of the bam file that is to get merged into bam_file_1
bam_file_7 : str
Location of the bam file that is to get merged into bam_file_1
bam_file_8 : str
Location of the bam file that is to get merged into bam_file_1
bam_file_9 : str
Location of the bam file that is to get merged into bam_file_1
bam_file_10 : str
Location of the bam file that is to get merged into bam_file_1
"""
bam_handle = bamUtils()
bam_handle.bam_merge([
bam_file_1, bam_file_2, bam_file_3, bam_file_4, bam_file_5,
bam_file_6, bam_file_7, bam_file_8, bam_file_9, bam_file_10
])
return bam_handle.bam_copy(bam_file_1, bam_file_out)
[docs] @task(returns=bool, bam_in=FILE_IN, bam_out=FILE_OUT)
def bam_copy(self, bam_in, bam_out): # pylint: disable=no-self-use
"""
Wrapper function to copy from one bam file to another
Parameters
----------
bam_in : str
Location of the input bam file
bam_out : str
Location of the output bam file
"""
bam_handle = bamUtils()
return bam_handle.bam_copy(bam_in, bam_out)
[docs] @task(returns=bool, bam_file=FILE_IN, bam_idx_file=FILE_OUT)
def bam_index(self, bam_file, bam_idx_file): # pylint: disable=no-self-use
"""
Wrapper for the pysam SAMtools merge function
Parameters
----------
bam_file : str
Location of the bam file that is to be indexed
bam_idx_file : str
Location of the bam index file (.bai)
"""
bam_handle = bamUtils()
return bam_handle.bam_index(bam_file, bam_idx_file)
[docs] @task(returns=list, bam_file=FILE_IN)
def bam_stats(self, bam_file): # pylint: disable=no-self-use
"""
Wrapper for the pysam SAMtools flagstat function
Parameters
----------
bam_file : str
Location of the bam file that is to be indexed
bam_idx_file : str
Location of the bam index file (.bai)
"""
bam_handle = bamUtils()
return bam_handle.bam_stats(bam_file)
[docs] @task(returns=bool, bam_file=FILE_IN)
def bam_paired_reads(self, bam_file): # pylint: disable=no-self-use
"""
Wrapper for the pysam SAMtools view function to identify if a bam file
contains paired end reads
Parameters
----------
bam_file : str
Location of the bam file that is to be indexed
Returns
-------
bool
True if the bam file contains paired end reads
"""
bam_handle = bamUtils()
return bam_handle.bam_stats(bam_file)