network_dataset.py
# Copyright (C) 2023 Coretex LLC

# This file is part of Coretex.ai

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from typing import Optional, TypeVar, Generic, List, Dict, Any, Type, Union
from typing_extensions import Self
from datetime import datetime
from pathlib import Path
from abc import ABC, abstractmethod
from contextlib import ExitStack

import hashlib
import base64
import logging

from .dataset import Dataset
from .state import DatasetState
from ..tag import EntityTagType, Taggable
from ..sample import NetworkSample
from ..utils import isEntityNameValid
from ..._folder_manager import folder_manager
from ...codable import KeyDescriptor
from ...networking import NetworkObject, \
    fileChunkUpload, networkManager, NetworkRequestError
from ...threading import MultithreadedDataProcessor
from ...cryptography import aes, getProjectKey
from ...utils.file import isArchive, archive


SampleType = TypeVar("SampleType", bound = "NetworkSample")
NAME_VALIDATION_MESSAGE = ">> [Coretex] Entity name is invalid. Requirements: alphanumeric characters (\"a-z\" and \"0-9\") and dash (\"-\") with length between 3 and 50"
MAX_DATASET_NAME_LENGTH = 50


def _hashDependencies(dependencies: List[str]) -> str:
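    # Dependencies are sorted before hashing so the resulting name does not
    # depend on their order. An MD5 digest is 16 bytes, so its base64 form is
    # always 24 characters; lowercasing it and replacing "+", "/" and "=" with
    # "0" keeps the result usable inside a dataset name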
    hash = hashlib.md5()
    hash.update("".join(sorted(dependencies)).encode())

    hashString = base64.b64encode(hash.digest()).decode("ascii")
    hashString = hashString.lower()
    hashString = hashString.replace("+", "0")
    hashString = hashString.replace("/", "0")
    hashString = hashString.replace("=", "0")

    return hashString


def _chunkSampleImport(sampleType: Type[SampleType], sampleName: str, samplePath: Path, datasetId: int) -> SampleType:
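    # Uploads the file in chunks first, then creates the sample by passing
    # the id of the uploaded file to the import endpoint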
    parameters = {
        "name": sampleName,
        "dataset_id": datasetId,
        "file_id": fileChunkUpload(samplePath)
    }

    response = networkManager.formData("session/import", parameters)
    if response.hasFailed():
        raise NetworkRequestError(response, f"Failed to create sample from \"{samplePath}\"")

    return sampleType.decode(response.getJson(dict))


def _encryptedSampleImport(sampleType: Type[SampleType], sampleName: str, samplePath: Path, datasetId: int, key: bytes) -> SampleType:
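    # The sample is always uploaded as an encrypted archive: if the provided
    # path is not an archive already it gets archived first, the archive is
    # then encrypted with the project key and uploaded in chunks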
    with ExitStack() as stack:
        if isArchive(samplePath):
            archivePath = samplePath
        else:
            archivePath = stack.enter_context(folder_manager.tempFile())
            archive(samplePath, archivePath)

        encryptedPath = stack.enter_context(folder_manager.tempFile())
        aes.encryptFile(key, archivePath, encryptedPath)

        return _chunkSampleImport(sampleType, sampleName, encryptedPath, datasetId)

    # ExitStack is marked as something that can "swallow" exceptions raised
    # in the "with" block. In this case ExitStack does not enter any context
    # which can swallow exceptions, so this line is not reachable, but mypy
    # complains about the missing return, so we silence it by raising here
    raise RuntimeError("Unreachable statement was reached.")


class NetworkDataset(Generic[SampleType], Dataset[SampleType], NetworkObject, Taggable, ABC):

    """
        Represents the base class for all Dataset classes which
        communicate with Coretex.ai

        Properties
        ----------
        createdOn : datetime
            creation date of the dataset
        createdById : str
            id of the user who created the dataset
        isLocked : bool
            availability of the dataset for modifications
    """

    projectId: int
    createdOn: datetime
    createdById: str
    isLocked: bool
    isEncrypted: bool
    meta: Optional[Dict[str, Any]]

    def __init__(self, sampleType: Type[SampleType]) -> None:
        self._sampleType = sampleType

    @property
    def path(self) -> Path:
        """
            Retrieves path of dataset

            Returns
            -------
            Path -> path of dataset
        """

        return folder_manager.datasetsFolder / str(self.id)

    @property
    def entityTagType(self) -> EntityTagType:
        return EntityTagType.dataset

    # Codable overrides

    @classmethod
    def _keyDescriptors(cls) -> Dict[str, KeyDescriptor]:
        descriptors = super()._keyDescriptors()

        descriptors["projectId"] = KeyDescriptor("project_id")
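        # Dataset samples are exposed by the backend under the "sessions" key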
        descriptors["samples"] = KeyDescriptor("sessions", NetworkSample, list)

        return descriptors

    # NetworkObject overrides

    @classmethod
    def _endpoint(cls) -> str:
        return "dataset"

    @classmethod
    def fetchById(cls, objectId: int, **kwargs: Any) -> Self:
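        # Datasets are fetched together with their samples ("sessions")
        # unless the caller explicitly overrides "include_sessions"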
        if "include_sessions" not in kwargs:
            kwargs["include_sessions"] = 1

        return super().fetchById(objectId, **kwargs)

    @classmethod
    def fetchAll(cls, **kwargs: Any) -> List[Self]:
        if "include_sessions" not in kwargs:
            kwargs["include_sessions"] = 1

        return super().fetchAll(**kwargs)

    @classmethod
    def fetchCachedDataset(cls, dependencies: List[str]) -> Self:
        """
            Fetches cached dataset if it exists

            Parameters
            ----------
            dependencies : List[str]
                Parameters on which the cached dataset depends

            Returns
            -------
            Self -> Fetched dataset object

            Raises
            ------
            ValueError -> If dataset doesn't exist
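
            Example
            -------
            >>> from coretex import NetworkDataset
            \b
            >>> # Illustrative values - the same dependencies that were
            >>> # used when the cache dataset was created
            >>> dependencies = ["123", "0", "parameterValue"]
            >>> cachedDataset = NetworkDataset.fetchCachedDataset(dependencies)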
185  """
186 
187  return super().fetchOne(
188  name = _hashDependencies(dependencies),
189  include_sessions = 1
190  )
191 
    # Dataset methods

    @classmethod
    def createDataset(
        cls,
        name: str,
        projectId: int,
        meta: Optional[Dict[str, Any]] = None
    ) -> Self:

        """
            Creates a new dataset with the provided name and type

            Parameters
            ----------
            name : str
                dataset name
            projectId : int
                project for which the dataset will be created
            meta : Optional[Dict[str, Any]]
                optional metadata attached to the dataset

            Returns
            -------
            Self -> The created dataset object

            Raises
            ------
            ValueError -> If name is invalid
            NetworkRequestError -> If dataset creation failed

            Example
            -------
            >>> from coretex import NetworkDataset
            \b
            >>> dummyDataset = NetworkDataset.createDataset("dummyDataset", 123)
        """

        if not isEntityNameValid(name):
            raise ValueError(NAME_VALIDATION_MESSAGE)

        return cls.create(
            name = name,
            project_id = projectId,
            meta = meta
        )

    @classmethod
    def generateCacheName(cls, prefix: str, dependencies: List[str]) -> str:
        """
            Generates a dataset name based on the dependencies

            Parameters
            ----------
            prefix : str
                prefix to which the dependency hash will be appended
            dependencies : List[str]
                parameters which affect the contents of the cache

            Returns
            -------
            str -> prefix with hash generated based on dependencies appended

            Raises
            ------
            ValueError -> If prefix is too long to leave room for the hash
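
            Example
            -------
            >>> from coretex import NetworkDataset
            \b
            >>> dependencies = ["123", "0"]  # illustrative values
            >>> cacheName = NetworkDataset.generateCacheName("dummyDataset", dependencies)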
252  """
253 
254  if MAX_DATASET_NAME_LENGTH - len(prefix) < 8:
255  raise ValueError(f"Dataset prefix \"{prefix}\" is too long. Max allowed size is \"{MAX_DATASET_NAME_LENGTH - 8}\".")
256 
257  suffix = _hashDependencies(dependencies)
258  name = f"{prefix}-{suffix}"
259 
260  if len(name) > MAX_DATASET_NAME_LENGTH:
261  name = name[:MAX_DATASET_NAME_LENGTH]
262 
263  return name
264 
    @classmethod
    def createCacheDataset(cls, prefix: str, dependencies: List[str], projectId: int) -> Self:
        """
            Creates a dataset used for caching the results of tasks,
            to avoid repeating expensive and long calculations

            Parameters
            ----------
            prefix : str
                prefix of the cache dataset
            dependencies : List[str]
                parameters which affect the contents of the cache
            projectId : int
                project for which the dataset will be created

            Returns
            -------
            Self -> The created dataset object

            Raises
            ------
            ValueError -> If the prefix is invalid or the cache dataset could not be created

            Example
            -------
            >>> from coretex import NetworkDataset
            \b
            >>> dependencies = [str(projectId), str(index), str(parameter)]
            >>> dummyDataset = NetworkDataset.createCacheDataset("dummyDataset", dependencies, 123)
        """

        if not isEntityNameValid(prefix):
            raise ValueError(NAME_VALIDATION_MESSAGE)

        dataset = cls.createDataset(cls.generateCacheName(prefix, dependencies), projectId)
        if dataset is None:
            raise ValueError(f"Failed to create cache dataset with prefix \"{prefix}\"")

        return dataset

    def finalize(self) -> bool:
        """
            Finalizes the state of the Coretex dataset

            Returns
            -------
            bool -> True if the dataset was successfully finalized

            Example
            -------
            >>> from coretex import CustomDataset
            \b
            >>> dummyDataset = CustomDataset.createDataset("dummyDataset", 123)
            >>> dummyDataset.finalize()
        """

        return self.update(name = self.name, state = DatasetState.final)

    def _linkSamplePath(self, samplePath: Path) -> None:
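        # Hard-links the downloaded sample file into this dataset's
        # folder, replacing an existing link with the same name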
        linkPath = self.path / samplePath.name
        if linkPath.exists():
            linkPath.unlink()

        samplePath.link_to(linkPath)

    def download(self, decrypt: bool = True, ignoreCache: bool = False) -> None:
        """
            Downloads dataset from Coretex

            Parameters
            ----------
            decrypt : bool
                whether to decrypt downloaded samples (if the dataset is encrypted)
            ignoreCache : bool
                if dataset is already downloaded and ignoreCache
                is True it will be downloaded again (not required)

            Example
            -------
            >>> from coretex import NetworkDataset
            \b
            >>> dummyDataset = NetworkDataset.fetchById(1023)
            >>> dummyDataset.download()
        """

        self.path.mkdir(exist_ok = True)

        def sampleDownloader(sample: SampleType) -> None:
            sample.download(decrypt, ignoreCache)

            if sample.downloadPath.exists():
                self._linkSamplePath(sample.downloadPath)

            if sample.zipPath.exists():
                self._linkSamplePath(sample.zipPath)

            logging.getLogger("coretexpylib").info(f"\tDownloaded \"{sample.name}\"")

        processor = MultithreadedDataProcessor(
            self.samples,
            sampleDownloader,
            message = f"Downloading dataset \"{self.name}\"..."
        )

        processor.process()

    def rename(self, name: str) -> bool:
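        """
            Renames the dataset

            Parameters
            ----------
            name : str
                new dataset name

            Returns
            -------
            bool -> True if the dataset was renamed, False otherwise

            Raises
            ------
            ValueError -> If name is invalid
        """
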
        if not isEntityNameValid(name):
            raise ValueError(NAME_VALIDATION_MESSAGE)

        success = self.update(name = name)

        if success:
            return super().rename(name)

        return success

    @abstractmethod
    def _uploadSample(self, samplePath: Path, sampleName: str, **metadata: Any) -> SampleType:
        # Override in data specific classes (ImageDataset, SequenceDataset, etc...)
        # to implement a specific way of uploading samples
        pass

    def add(self, samplePath: Union[Path, str], sampleName: Optional[str] = None, **metadata: Any) -> SampleType:
        """
            Uploads the provided archive (.zip, .tar.gz) as a Sample to
            Coretex.ai as a part of this Dataset.

            Parameters
            ----------
            samplePath : Union[Path, str]
                path to data which will be uploaded
            sampleName : Optional[str]
                name of the sample, defaults to the file name of samplePath
            metadata : Any
                additional metadata passed to the dataset specific upload

            Returns
            -------
            SampleType -> created Sample
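
            Example
            -------
            >>> from coretex import CustomDataset
            \b
            >>> dummyDataset = CustomDataset.createDataset("dummyDataset", 123)
            >>> dummySample = dummyDataset.add("path/to/data.zip")  # illustrative path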
395  """

        if isinstance(samplePath, str):
            samplePath = Path(samplePath)

        if sampleName is None:
            sampleName = samplePath.stem

        if self.isEncrypted:
            sample = _encryptedSampleImport(self._sampleType, sampleName, samplePath, self.id, getProjectKey(self.projectId))
        else:
            sample = self._uploadSample(samplePath, sampleName, **metadata)

        # Append the newly created sample to the list of samples
        self.samples.append(sample)

        return sample