Coretex
transcriber.py
1 # Copyright (C) 2023 Coretex LLC
2 
3 # This file is part of Coretex.ai
4 
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Affero General Public License for more details.
14 
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program. If not, see <https://www.gnu.org/licenses/>.
17 
18 # from typing import Dict, Any, List, Tuple
19 # from pathlib import Path
20 
21 # import wave
22 # import logging
23 
24 # from deepspeech import Model
25 
26 # import numpy as np
27 
28 # from .token import Token
29 # from .utils import isTxtSample, getAudioFilePath
30 # from .transcription import Transcription
31 # from .text import loadTxtSample
32 # from ..coretex import CustomSample, CustomDataset
33 # from ..folder_management import FolderManager
34 
35 
36 # TranscriptionList = List[Tuple[CustomSample, Transcription]]
37 
38 
39 # def _instantiateModel(modelPath: Path, modelScorerPath: Path) -> Model:
40 # model = Model(str(modelPath))
41 # model.enableExternalScorer(str(modelScorerPath))
42 
43 # model.setScorerAlphaBeta(0.93, 1.18) # 0.93 represents lm_alpha, and 1.18 represents lm_beta
44 # model.setBeamWidth(100) # 100 represents beam_width
45 
46 # return model
47 
48 
49 # def _readAudioFile(path: Path) -> bytes:
50 # with wave.open(str(path), 'rb') as audioFile:
51 # frames = audioFile.getnframes()
52 # buffer = audioFile.readframes(frames)
53 
54 # return buffer
55 
56 
57 # def _hasCachedTranscription(dataset: CustomDataset, meta: Dict[str, Any]) -> bool:
58 # fetchedDatasets = CustomDataset.fetchAll([
59 # f"name={dataset.name}_cached",
60 # "include_sessions=1"
61 # ])
62 
63 # if fetchedDatasets != 1:
64 # print("1", len(fetchedDatasets))
65 # return False
66 
67 # cachedDataset = fetchedDatasets[0]
68 # if cachedDataset.meta is None:
69 # print("2", cachedDataset.meta)
70 # return False
71 
72 # for key, value in meta.items():
73 # if cachedDataset.meta.get(key) != value:
74 # print("3", cachedDataset.meta.get(key), "!=", value)
75 # return False
76 
77 # return True
78 
79 
80 # def _fetchCachedTranscription(dataset: CustomDataset, meta: Dict[str, Any]) -> TranscriptionList:
81 # fetchedDatasets = CustomDataset.fetchAll([f"name={dataset.name}_cached"])
82 # if fetchedDatasets != 1:
83 # raise ValueError(">> [Coretex] Fetched more than one cached dataset")
84 
85 # cachedDataset = fetchedDatasets[0]
86 # if cachedDataset.meta is None:
87 # raise ValueError(">> [Coretex] Dataset.meta field is None")
88 
89 # for key, value in meta.items():
90 # if cachedDataset.meta.get(key) != value:
91 # raise ValueError(">> [Coretex] Dataset.meta field does not match parameters of TaskRun")
92 
93 # cachedDataset.download()
94 
95 # result: TranscriptionList = []
96 
97 # for sample in dataset.samples:
98 # cachedSample = cachedDataset.getSample(f"{sample.name}_cached")
99 # if cachedSample is None:
100 # raise ValueError(f">> [Coretex] {sample.name} has invalid cache")
101 
102 # cachedSample.unzip()
103 # folderContent = list(cachedSample.load().folderContent)
104 
105 # if len(folderContent) != 1:
106 # raise ValueError(f">> [Coretex] {sample.name} has invalid cache")
107 
108 # result.append((sample, Transcription.load(folderContent[0])))
109 
110 # return result
111 
112 
113 # def _cacheTranscription(
114 # dataset: CustomDataset,
115 # transcriptions: TranscriptionList,
116 # meta: Dict[str, Any]
117 # ) -> bool:
118 
119 # logging.info(">> [Coretex] Caching dataset transcription")
120 
121 # cachedDataset = CustomDataset.createDataset(f"{dataset.name}_cached", dataset.projectId, meta = meta)
122 # if cachedDataset is None:
123 # return False
124 
125 # try:
126 # cachedSamplesPath = Path(FolderManager.instance().createTempFolder("cached_samples"))
127 # for sample, transcription in transcriptions:
128 # logging.info(f">> [Coretex] Caching sample: {sample.name}")
129 
130 # cachedSamplePath = transcription.save(cachedSamplesPath / f"{sample.name}.json")
131 
132 # cachedSample = CustomSample.createCustomSample(f"{sample.name}_cached", cachedDataset.id, str(cachedSamplePath))
133 # if cachedSample is None:
134 # raise ValueError
135 # except ValueError:
136 # cachedDataset.delete()
137 # return False
138 
139 # return True
140 
141 
142 # class AudioTranscriber:
143 
144 # """
145 # Transcribes audio dataset into text, and tokens
146 
147 # Properties:
148 # model: Model -> deepspeech model used for transcription
149 # parameters: Dict[str, Any] -> parameters which affect model output,
150 # these parameters are stored as metadata about cached dataset after transcription
151 # has been performed
152 # """
153 
154 # def __init__(self, modelPath: Path, modelScorerPath: Path, parameters: Dict[str, Any]):
155 # self.model = _instantiateModel(modelPath, modelScorerPath)
156 # self.parameters = parameters
157 
158 # def __transcribeSingle(self, sample: CustomSample, batchSize: int) -> Transcription:
159 # logging.info(f">> [Coretex] Transcribing: {sample.name}")
160 
161 # sample.unzip()
162 
163 # if isTxtSample(sample):
164 # return loadTxtSample(sample)
165 
166 # audioFilePath = getAudioFilePath(sample)
167 # if audioFilePath is None:
168 # raise ValueError(f">> [Coretex] {sample.name} does not contain a valid audio file")
169 
170 # stream = self.model.createStream()
171 
172 # buffer = _readAudioFile(audioFilePath)
173 # offset = 0
174 
175 # while offset < len(buffer):
176 # endOffset = offset + batchSize
177 # chunk = buffer[offset:endOffset]
178 
179 # data16 = np.frombuffer(chunk, dtype = np.int16)
180 # stream.feedAudioContent(data16)
181 
182 # text = stream.intermediateDecodeWithMetadata()
183 # normalText = stream.intermediateDecode()
184 
185 # offset = endOffset
186 
187 # return Transcription.create(normalText, Token.fromTokenMetadata(text.transcripts[0].tokens))
188 
189 # def transcribe(
190 # self,
191 # dataset: CustomDataset,
192 # batchSize: int,
193 # ignoreCache: bool = False
194 # ) -> TranscriptionList:
195 
196 # """
197 # Transcribes audio dataset into text and separates
198 # text into tokens.
199 # If text sample is contained inside the dataset
200 # it is also processed and tokenized
201 
202 # Parameters
203 # ----------
204 # dataset : CustomDataset
205 # dataset used for transcription
206 # batchSize : int
207 # size of the chunk extracted for transcription
208 # ignoreCache : bool
209 # if True cached dataset is ignored
210 
211 # Returns
212 # -------
213 # TranscriptionList -> list of tuples, each tuple contains sample and its transcription
214 # """
215 
216 # # TODO: Enable once backend is working
217 # # if not ignoreCache and _hasCachedTranscription(dataset, self.parameters):
218 # # logging.info(f">> [Coretex] Using cached transcription for: {dataset.name}")
219 # # return _fetchCachedTranscription(dataset, self.parameters)
220 
221 # result: TranscriptionList = []
222 
223 # for sample in dataset.samples:
224 # result.append((sample, self.__transcribeSingle(sample, batchSize)))
225 
226 # # TODO: Enable once backend is working
227 # # if _cacheTranscription(dataset, result, self.parameters):
228 # # logging.info(f">> [Coretex] Cached transcription for: {dataset.name}")
229 # # else:
230 # # logging.info(f">> [Coretex] Failed to cache transcription for: {dataset.name}")
231 
232 # return result