Coretex
utils.py
1 # Copyright (C) 2023 Coretex LLC
2 
3 # This file is part of Coretex.ai
4 
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Affero General Public License for more details.
14 
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program. If not, see <https://www.gnu.org/licenses/>.
17 
18 from typing import List
19 from pathlib import Path
20 from zipfile import ZipFile
21 
22 import logging
23 import shutil
24 import gzip
25 
26 from ..._folder_manager import folder_manager
27 from ...entities import TaskRun, CustomSample, CustomDataset
28 from ...networking import NetworkRequestError
29 
30 
def createSample(name: str, dataset: CustomDataset, path: Path, taskRun: TaskRun, stepName: str, retryCount: int = 0) -> CustomSample:
    """
    Adds the file at "path" to "dataset" under "name" and then registers
    the same file on "taskRun" as a Qiime artifact named "<stepName>/<name>".

    If "dataset.add" raises NetworkRequestError the function calls itself
    again with "retryCount + 1"; once "retryCount" has reached 3 the error
    is re-raised instead.

    Parameters
    ----------
    name : str
        name passed to "dataset.add", also the last part of the artifact path
    dataset : CustomDataset
        dataset to which the sample is added
    path : Path
        file passed to both "dataset.add" and "taskRun.createQiimeArtifact"
    taskRun : TaskRun
        task run on which the artifact is created
    stepName : str
        prefix of the artifact path
    retryCount : int
        number of attempts already made, callers normally leave the default

    Returns
    -------
    CustomSample -> the value returned by "dataset.add"

    Raises
    ------
    NetworkRequestError -> if "dataset.add" still fails when "retryCount" is 3 or more
    """

    try:
        createdSample = dataset.add(path, name)
    except NetworkRequestError:
        # Out of retries - propagate the error currently being handled
        if retryCount >= 3:
            raise

        logging.info(f">> [Coretex] Retry count: {retryCount}")
        return createSample(name, dataset, path, taskRun, stepName, retryCount + 1)

    # Only reached when the upload above succeeded in this call
    artifactPath = f"{stepName}/{name}"
    taskRun.createQiimeArtifact(artifactPath, path)

    return createdSample
44 
45 
def compressGzip(source: Path, destination: Path, deleteSource: bool = False) -> None:
    """
    Writes a gzip compressed copy of the file "source" to "destination".

    Parameters
    ----------
    source : Path
        file whose bytes are compressed
    destination : Path
        path of the gzip file which is created (or overwritten)
    deleteSource : bool
        if True "source" is removed once the compressed copy has been written

    Raises
    ------
    FileNotFoundError -> if "source" does not exist, in which case
    "destination" is not created
    """

    logging.info(f"{source} -> {destination}")

    # The source is opened first so that a missing source fails before an
    # empty destination file gets created, and the content is streamed in
    # chunks so the whole file never has to be held in memory at once
    with source.open("rb") as sourceFile, gzip.open(destination, "wb") as destinationFile:
        shutil.copyfileobj(sourceFile, destinationFile)

    if deleteSource:
        source.unlink()
54 
55 
def sampleNumber(sample: CustomSample) -> int:
    """
    Returns the part of "sample.name" that precedes the first "-"
    (the whole name if it contains no "-") converted to an integer.

    Raises
    ------
    ValueError -> if that part of the name is not a valid integer
    """

    numberPrefix, _, _ = sample.name.partition("-")
    return int(numberPrefix)
58 
59 
def isFastqMPSample(sample: CustomSample) -> bool:
    """
    Calls "sample.unzip()" and then checks the file names listed in
    "sample.load().folderContent".

    Returns
    -------
    bool -> True if the content has at least one sequence file
    (forward.fastq / sequences.fastq, optionally with ".gz") and at least
    one barcodes file (barcodes.fastq, optionally with ".gz"), False otherwise
    """

    sample.unzip()

    sequenceFileNames = {"forward.fastq", "forward.fastq.gz", "sequences.fastq", "sequences.fastq.gz"}
    barcodesFileNames = {"barcodes.fastq", "barcodes.fastq.gz"}

    presentNames = {path.name for path in sample.load().folderContent}

    hasSequences = not presentNames.isdisjoint(sequenceFileNames)
    hasBarcodes = not presentNames.isdisjoint(barcodesFileNames)

    return hasSequences and hasBarcodes
72 
73 
def isFastqDPSample(sample: CustomSample) -> bool:
    """
    Calls "sample.unzip()" and then checks the paths listed in
    "sample.load().folderContent".

    Returns
    -------
    bool -> True if at least one path has the ".fastq" suffix, False otherwise
    (a path ending in ".fastq.gz" has the suffix ".gz" and does not count)
    """

    sample.unzip()

    presentSuffixes = [path.suffix for path in sample.load().folderContent]
    return ".fastq" in presentSuffixes
78 
79 
def isImportedSample(sample: CustomSample) -> bool:
    """
    Calls "sample.unzip()" and then checks the file names listed in
    "sample.load().folderContent".

    Returns
    -------
    bool -> True if a file named "multiplexed-sequences.qza" is present, False otherwise
    """

    sample.unzip()

    presentNames = {path.name for path in sample.load().folderContent}
    return "multiplexed-sequences.qza" in presentNames
85 
86 
def isDemultiplexedSample(sample: CustomSample) -> bool:
    """
    Calls "sample.unzip()" and then checks the file names listed in
    "sample.load().folderContent".

    Returns
    -------
    bool -> True if a file named "demux.qza" is present, False otherwise
    """

    sample.unzip()

    presentNames = {path.name for path in sample.load().folderContent}
    return "demux.qza" in presentNames
92 
93 
def isDenoisedSample(sample: CustomSample) -> bool:
    """
    Calls "sample.unzip()" and then checks the file names listed in
    "sample.load().folderContent".

    Returns
    -------
    bool -> True only if "table.qza", "rep-seqs.qza" and "stats.qza"
    are all present, False otherwise
    """

    sample.unzip()

    requiredNames = {"table.qza", "rep-seqs.qza", "stats.qza"}
    presentNames = {path.name for path in sample.load().folderContent}

    return requiredNames.issubset(presentNames)
103 
104 
def isPhylogeneticTreeSample(sample: CustomSample) -> bool:
    """
    Calls "sample.unzip()" and then checks the file names listed in
    "sample.load().folderContent".

    Returns
    -------
    bool -> True only if "rooted-tree.qza", "unrooted-tree.qza",
    "aligned-rep-seqs.qza" and "masked-aligned-rep-seqs.qza" are all
    present, False otherwise
    """

    sample.unzip()

    requiredNames = (
        "rooted-tree.qza",
        "unrooted-tree.qza",
        "aligned-rep-seqs.qza",
        "masked-aligned-rep-seqs.qza"
    )

    presentNames = {path.name for path in sample.load().folderContent}

    return all(name in presentNames for name in requiredNames)
115 
116 
def getFastqMPSamples(dataset: CustomDataset) -> List[CustomSample]:
    """
    Returns the result of "dataset.getSamples" called with the
    "isFastqMPSample" check as its only argument.

    NOTE(review): presumably "getSamples" keeps the samples for which the
    check returns True - confirm against CustomDataset
    """

    # Multiplexed fastq data
    multiplexedSamples = dataset.getSamples(isFastqMPSample)
    return multiplexedSamples
120 
121 
def getFastqDPSamples(dataset: CustomDataset) -> List[CustomSample]:
    """
    Returns the result of "dataset.getSamples" called with the
    "isFastqDPSample" check as its only argument.

    NOTE(review): presumably "getSamples" keeps the samples for which the
    check returns True - confirm against CustomDataset
    """

    # Demultiplexed fastq data
    demultiplexedSamples = dataset.getSamples(isFastqDPSample)
    return demultiplexedSamples
125 
126 
def getImportedSamples(dataset: CustomDataset) -> List[CustomSample]:
    """
    Returns the result of "dataset.getSamples" called with the
    "isImportedSample" check as its only argument.

    NOTE(review): presumably "getSamples" keeps the samples for which the
    check returns True - confirm against CustomDataset
    """

    importedSamples = dataset.getSamples(isImportedSample)
    return importedSamples
129 
130 
def getDemuxSamples(dataset: CustomDataset) -> List[CustomSample]:
    """
    Returns the result of "dataset.getSamples" called with the
    "isDemultiplexedSample" check as its only argument.

    NOTE(review): presumably "getSamples" keeps the samples for which the
    check returns True - confirm against CustomDataset
    """

    demuxSamples = dataset.getSamples(isDemultiplexedSample)
    return demuxSamples
133 
134 
def getDenoisedSamples(dataset: CustomDataset) -> List[CustomSample]:
    """
    Returns the result of "dataset.getSamples" called with the
    "isDenoisedSample" check as its only argument.

    NOTE(review): presumably "getSamples" keeps the samples for which the
    check returns True - confirm against CustomDataset
    """

    denoisedSamples = dataset.getSamples(isDenoisedSample)
    return denoisedSamples
137 
138 
def getPhylogeneticTreeSamples(dataset: CustomDataset) -> List[CustomSample]:
    """
    Returns the result of "dataset.getSamples" called with the
    "isPhylogeneticTreeSample" check as its only argument.

    NOTE(review): presumably "getSamples" keeps the samples for which the
    check returns True - confirm against CustomDataset
    """

    phylogeneticTreeSamples = dataset.getSamples(isPhylogeneticTreeSample)
    return phylogeneticTreeSamples
141 
142 
def getMetadata(sample: CustomSample) -> Path:
    """
    Looks up the ".tsv" file located directly inside "sample.path".

    Returns
    -------
    Path -> path to the only ".tsv" file in the sample folder

    Raises
    ------
    RuntimeError -> if the folder holds zero or more than one ".tsv" file
    """

    tsvPaths = [tsvPath for tsvPath in sample.path.glob("*.tsv")]
    tsvCount = len(tsvPaths)

    if tsvCount == 1:
        return tsvPaths[0]

    raise RuntimeError(f">> [Coretex] Metadata sample must contain one .tsv file. Found {tsvCount}")
149 
150 
def isPairedEnd(sample: CustomSample) -> bool:
    """
    Determines whether the qiime artifact stored in the sample folder
    holds paired-end sequences.

    The artifact is a zip archive. Its "metadata.yaml" is read straight
    from the archive (nothing is extracted to disk) and the second (type)
    row is inspected, which will have "PairedEnd" somewhere if the
    sequences are paired-end.

    Returns
    -------
    bool -> True if the second row of the metadata contains "PairedEnd", False otherwise

    Raises
    ------
    IndexError -> if the sample folder is empty, the archive has no
    "metadata.yaml" entry, or the metadata has fewer than two rows
    """

    # As before, the first entry of the sample folder is taken to be the artifact
    qzaPath = list(sample.path.iterdir())[0]

    with ZipFile(qzaPath, "r") as qzaFile:
        metadataNames = [
            entryName for entryName in qzaFile.namelist()
            if entryName.endswith("metadata.yaml")
        ]

        # An archive may hold more than one metadata.yaml (e.g. nested provenance
        # copies), so the least nested one is used - this assumes it is the one
        # that describes the artifact itself. sorted is stable, so entries at the
        # same depth keep their archive order
        metadataName = sorted(metadataNames, key=lambda entryName: entryName.count("/"))[0]

        # Entries are opened in binary mode, hence the bytes comparison below
        with qzaFile.open(metadataName) as metadata:
            typeRow = metadata.readlines()[1]

    return b"PairedEnd" in typeRow