Coretex
__init__.py
1 # Copyright (C) 2023 Coretex LLC
2 
3 # This file is part of Coretex.ai
4 
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Affero General Public License for more details.
14 
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program. If not, see <https://www.gnu.org/licenses/>.
17 
18 from typing import List, Tuple
19 from pathlib import Path
20 
21 import subprocess
22 import os
23 
24 from ...utils import command, logProcessOutput, CommandException
25 from ...entities import CustomDataset
26 from ...logging import LogSeverity
27 
28 
29 def indexCommand(bwaPath: Path, sequencePath: Path, prefix: Path) -> None:
30  """
31  This function acts as a wrapper for the index command of BWA
32  (Burrows-Wheeler Aligner). It is necessary to perform indexing
33  on the reference sequence before alignment.
34 
35  Parameters
36  ----------
37  bwaPath : Path
38  Path pointing to the bwa binary
39  sequencePath : Path
40  Path pointing to the reference sequence
41  prefix : Path
42  Path serving as the prefix for indexing. This command will
43  generate 5 new files that are needed for alignment. These files will
44  have the path "{prefix}.{suffix}", each with a different suffix / extension
45 
46  Example
47  -------
48  >>> from pathlib import Path
49  >>> bwaPath = Path("tools/bwa")
50  >>> sequencePath = Path("sequences/hg18.fa")
51  >>> prefix = Path("output/hg18")
52  >>> indexCommand(bwaPath, sequencePath, prefix)
53 
54  Link to BWA: https://bio-bwa.sourceforge.net
55  """
56 
57  command([
58  str(bwaPath.absolute()), "index",
59  "-p", str(prefix.absolute()),
60  str(sequencePath.absolute())
61  ])
62 
63 
64 def alignCommand(
65  bwaPath: Path,
66  prefix: Path,
67  sequencePath: Path,
68  outputPath: Path,
69  multithreading: bool = True
70 ) -> None:
71 
72  """
73  This function acts as a wrapper for the mem command of BWA
74  (Burrows-Wheeler Aligner). It perfoms alignment of a given sequence read
75  to the reference sequence and outputs a SAM file.
76 
77  Parameters
78  ----------
79  bwaPath : Path
80  Path pointing to the bwa binary
81  prefix : Path
82  Path that serves as the prefix for the 5 files generated during indexing
83  sequencePath : Path
84  Path pointing to the sequence read file on which alignment should be performed
85  outputPath : Path
86  Path of the output file
87 
88  Example
89  -------
90  >>> from pathlib import Path
91  >>> bwaPath = Path("tools/bwa")
92  >>> prefix = Path("reference/hg18")
93  >>> sequencePath = Path("input/R34D.fastq")
94  >>> outputPath = Path("output/R34D.sam")
95  >>> alignCommand(bwaPath, prefix, sequencePath, outputPath)
96 
97  Link to BWA: https://bio-bwa.sourceforge.net
98  """
99 
100  args = [
101  str(bwaPath.absolute()), "mem",
102  "-o", str(outputPath.absolute())
103  ]
104 
105  if multithreading:
106  threads = os.cpu_count()
107  if threads is not None:
108  args.extend(["-t", str(threads)])
109 
110  args.extend([
111  str(prefix.absolute()),
112  str(sequencePath.absolute())
113  ])
114 
115  command(args, True)
116 
117 
118 def sam2bamCommand(
119  samtoolsPath: Path,
120  samPath: Path,
121  outputPath: Path,
122  multithreading: bool = True
123 ) -> None:
124 
125  """
126  This function uses the CLI tool "samtools" to convert SAM files into their binary
127  version, BAM.
128 
129  Parameters
130  ----------
131  samtoolsPath : Path
132  Path pointing to the samtools binary
133  samPath : Path
134  Path pointing to the input .sam file
135  outputPath : Path
136  Path pointing to the output, .bam, file
137 
138  Example
139  -------
140  >>> from pathlib import Path
141  >>> samtoolsPath = Path("tools/samtools")
142  >>> samPath = Path("input/R34D.sam")
143  >>> outputPath = Path("output/R34D.bam")
144  >>> sam2bamCommand(samtoolsPath, samPath, outputPath)
145 
146  Link to samtools: http://htslib.org/
147  """
148 
149  args = [
150  str(samtoolsPath.absolute()), "view",
151  "-b", "-S"
152  ]
153 
154  if multithreading:
155  threads = os.cpu_count()
156  if threads is not None:
157  args.extend(["--threads", str(threads - 1)])
158 
159  args.extend([
160  "-o", str(outputPath.absolute()),
161  str(samPath.absolute())
162  ])
163 
164  command(args)
165 
166 
167 def extractData(samtoolsPath: Path, file: Path) -> Tuple[List[int], List[int], List[int]]:
168  """
169  Takes an aligned sequence file (SAM/BAM) and returns three lists holding
170  MAPQ scores, leftmost position and sequence length for each read in the
171  file. This is done using the samtools view command.
172 
173  Parameters
174  ----------
175  samtoolsPath : Path
176  Path pointing to the samtools binary
177  samPath : Path
178  Path pointing to the input .sam or .bam file
179 
180  Example
181  -------
182  >>> from pathlib import Path
183  >>> samtoolsPath = Path("tools/samtools")
184  >>> file = Path("R34D.bam")
185  >>> scores, positions, lengths = extractData(samtoolsPath, file)
186 
187  Link to samtools: http://htslib.org/
188  """
189 
190  scores: List[int] = []
191  positions: List[int] = []
192  sequenceLengths: List[int] = []
193 
194  args = [
195  str(samtoolsPath.absolute()),
196  "view",
197  "-F", "256", "-F", "2048", "-F", "512", # Will ignore all duplicate reads
198  str(file.absolute())
199  ]
200 
201  process = subprocess.Popen(
202  args,
203  shell = False,
204  cwd = Path(__file__).parent,
205  stdout = subprocess.PIPE,
206  stderr = subprocess.PIPE
207  )
208 
209  while process.poll() is None:
210  if process.stdout is None:
211  continue
212 
213  stdout = process.stdout.readlines()
214 
215  for line in stdout:
216  if len(line) > 0:
217  fields = line.decode("UTF-8").strip().split("\t")
218  scores.append(int(fields[4]))
219  positions.append(int(fields[3]))
220  sequenceLengths.append(len(fields[9]))
221 
222  if process.stderr is not None:
223  for stderr in process.stderr:
224  if len(stderr) > 0:
225  if process.returncode == 0:
226  logProcessOutput(stderr, LogSeverity.warning)
227  else:
228  logProcessOutput(stderr, LogSeverity.fatal)
229 
230  if process.returncode != 0:
231  raise CommandException(f">> [Coretex] Falied to execute command. Returncode: {process.returncode}")
232 
233  return scores, positions, sequenceLengths
234 
235 
236 def chmodX(file: Path) -> None:
237  command(["chmod", "+x", str(file.absolute())])
238 
239 
240 def loadFa(dataset: CustomDataset) -> List[Path]:
241  inputFiles: List[Path] = []
242 
243  for sample in dataset.samples:
244  sample.unzip()
245 
246  for file in Path(sample.path).iterdir():
247  if file.suffix.startswith(".fa"):
248  inputFiles.append(file)
249 
250  if len(inputFiles) == 0:
251  raise ValueError(">> [Coretex] No sequence reads found")
252 
253  return inputFiles