from typing import Optional, TypeVar, Generic, List, Dict, Any, Type, Union
from typing_extensions import Self
from datetime import datetime
from pathlib import Path
from abc import ABC, abstractmethod
from contextlib import ExitStack

import hashlib
import base64
import logging

from .dataset import Dataset
from .state import DatasetState
from ..tag import EntityTagType, Taggable
from ..sample import NetworkSample
from ..utils import isEntityNameValid
from ..._folder_manager import folder_manager
from ...codable import KeyDescriptor
from ...networking import NetworkObject, \
    fileChunkUpload, networkManager, NetworkRequestError
from ...threading import MultithreadedDataProcessor
from ...cryptography import aes, getProjectKey
from ...utils.file import isArchive, archive
SampleType = TypeVar("SampleType", bound = "NetworkSample")

NAME_VALIDATION_MESSAGE = ">> [Coretex] Entity name is invalid. Requirements: alphanumeric characters (\"a-z\", and \"0-9\") and dash (\"-\") with length between 3 and 50"
MAX_DATASET_NAME_LENGTH = 50
def _hashDependencies(dependencies: List[str]) -> str:
    # A stable, non-cryptographic digest of the dependency list
    hash = hashlib.md5()
    hash.update("".join(sorted(dependencies)).encode())

    hashString = base64.b64encode(hash.digest()).decode("ascii")
    hashString = hashString.lower()

    # Base64 output can contain "+", "/" and "=" which are not valid
    # characters for an entity name, so replace them with "0"
    hashString = hashString.replace("+", "0")
    hashString = hashString.replace("/", "0")
    hashString = hashString.replace("=", "0")

    return hashString
def _chunkSampleImport(sampleType: Type[SampleType], sampleName: str, samplePath: Path, datasetId: int) -> SampleType:
    parameters = {
        "name": sampleName,
        "dataset_id": datasetId,
        "file_id": fileChunkUpload(samplePath)
    }

    response = networkManager.formData("session/import", parameters)
    if response.hasFailed():
        raise NetworkRequestError(response, f"Failed to create sample from \"{samplePath}\"")

    return sampleType.decode(response.getJson(dict))
def _encryptedSampleImport(sampleType: Type[SampleType], sampleName: str, samplePath: Path, datasetId: int, key: bytes) -> SampleType:
    with ExitStack() as stack:
        if isArchive(samplePath):
            archivePath = samplePath
        else:
            # Samples are uploaded as archives, so archive the path first
            archivePath = stack.enter_context(folder_manager.tempFile())
            archive(samplePath, archivePath)

        encryptedPath = stack.enter_context(folder_manager.tempFile())
        aes.encryptFile(key, archivePath, encryptedPath)

        return _chunkSampleImport(sampleType, sampleName, encryptedPath, datasetId)

    raise RuntimeError("Unreachable statement was reached.")
class NetworkDataset(Generic[SampleType], Dataset[SampleType], NetworkObject, Taggable, ABC):

    """
        Represents the base class for all Dataset classes which are
        communicating with Coretex.ai

        Properties
        ----------
        createdOn : datetime
            creation date of dataset
        createdById : str
            id of the user who created the dataset
        isLocked : bool
            availability of dataset for modifications
    """

    createdOn: datetime
    createdById: str
    isLocked: bool
    meta: Optional[Dict[str, Any]]
    def __init__(self, sampleType: Type[SampleType]) -> None:
        super().__init__()

        self._sampleType = sampleType

    @property
    def path(self) -> Path:
        """
            Retrieves path of dataset

            Returns
            -------
            Path -> path of dataset
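
            Example
            -------
            >>> from coretex import NetworkDataset
            >>> dummyDataset = NetworkDataset.fetchById(1023)
            >>> print(dummyDataset.path)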
        """

        return folder_manager.datasetsFolder / str(self.id)
    @property
    def entityTagType(self) -> EntityTagType:
        return EntityTagType.dataset
    @classmethod
    def _keyDescriptors(cls) -> Dict[str, KeyDescriptor]:
        descriptors = super()._keyDescriptors()

        # Samples are named "sessions" in backend responses
        descriptors["samples"] = KeyDescriptor("sessions", NetworkSample, list)

        return descriptors
    @classmethod
    def _endpoint(cls) -> str:
        return "dataset"
    @classmethod
    def fetchById(cls, objectId: int, **kwargs: Any) -> Self:
        # Ask the backend to include samples ("sessions") in the response
        if "include_sessions" not in kwargs:
            kwargs["include_sessions"] = 1

        return super().fetchById(objectId, **kwargs)
    @classmethod
    def fetchAll(cls, **kwargs: Any) -> List[Self]:
        if "include_sessions" not in kwargs:
            kwargs["include_sessions"] = 1

        return super().fetchAll(**kwargs)
    @classmethod
    def fetchCachedDataset(cls, dependencies: List[str]) -> Self:
        """
            Fetches cached dataset if it exists

            Parameters
            ----------
            dependencies : List[str]
                Parameters on which the cached dataset depends

            Returns
            -------
            Self -> Fetched dataset object

            Raises
            ------
            ValueError -> If dataset doesn't exist
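
            Example
            -------
            >>> from coretex import NetworkDataset
            >>> dependencies = [str(projectId), str(index), str(parameter)]
            >>> cachedDataset = NetworkDataset.fetchCachedDataset(dependencies)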
        """

        return super().fetchOne(
            name = _hashDependencies(dependencies),
            include_sessions = 1
        )
    @classmethod
    def createDataset(
        cls,
        name: str,
        projectId: int,
        meta: Optional[Dict[str, Any]] = None
    ) -> Self:

        """
            Creates a new dataset with the provided name and type

            Parameters
            ----------
            name : str
                name of the dataset
            projectId : int
                project for which the dataset will be created
            meta : Optional[Dict[str, Any]]
                dataset metadata

            Returns
            -------
            Self -> The created dataset object

            Raises
            ------
            ValueError -> If name is invalid
            NetworkRequestError -> If dataset creation failed

            Example
            -------
            >>> from coretex import NetworkDataset
            >>> dummyDataset = NetworkDataset.createDataset("dummyDataset", 123)
        """

        if not isEntityNameValid(name):
            raise ValueError(NAME_VALIDATION_MESSAGE)

        return cls.create(
            name = name,
            project_id = projectId,
            meta = meta
        )
    @classmethod
    def generateCacheName(cls, prefix: str, dependencies: List[str]) -> str:
        """
            Generates dataset name based on the dependencies

            Parameters
            ----------
            prefix : str
                prefix to which the dependency hash will be appended
            dependencies : List[str]
                parameters which affect the contents of the cache

            Returns
            -------
            str -> prefix with hash generated based on dependencies appended
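
            Example
            -------
            >>> from coretex import NetworkDataset
            >>> dependencies = [str(projectId), str(index), str(parameter)]
            >>> name = NetworkDataset.generateCacheName("dummyDataset", dependencies)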
        """

        if MAX_DATASET_NAME_LENGTH - len(prefix) < 8:
            raise ValueError(f"Dataset prefix \"{prefix}\" is too long. Max allowed size is \"{MAX_DATASET_NAME_LENGTH - 8}\".")

        suffix = _hashDependencies(dependencies)
        name = f"{prefix}-{suffix}"

        if len(name) > MAX_DATASET_NAME_LENGTH:
            name = name[:MAX_DATASET_NAME_LENGTH]

        return name
    @classmethod
    def createCacheDataset(cls, prefix: str, dependencies: List[str], projectId: int) -> Self:
        """
            Creates a dataset used for caching results of tasks
            Used to avoid repeating expensive and long calculations

            Parameters
            ----------
            prefix : str
                prefix of the cache dataset
            dependencies : List[str]
                parameters which affect the contents of the cache
            projectId : int
                project for which the dataset will be created

            Returns
            -------
            Self -> The created dataset object

            Raises
            ------
            ValueError -> If prefix of the name is invalid or if failed to create cache dataset

            Example
            -------
            >>> from coretex import NetworkDataset
            >>> dependencies = [str(projectId), str(index), str(parameter)]
            >>> dummyDataset = NetworkDataset.createCacheDataset("dummyDataset", dependencies, 123)
        """

        if not isEntityNameValid(prefix):
            raise ValueError(NAME_VALIDATION_MESSAGE)

        try:
            return cls.createDataset(cls.generateCacheName(prefix, dependencies), projectId)
        except NetworkRequestError:
            raise ValueError(f"Failed to create cache dataset with prefix \"{prefix}\"")
    def finalize(self) -> bool:
        """
            Finalizes state of Coretex dataset

            Example
            -------
            >>> from coretex import CustomDataset
            >>> dummyDataset = CustomDataset.createDataset("dummyDataset", 123)
            >>> dummyDataset.finalize()
        """

        return self.update(name = self.name, state = DatasetState.final)
    def _linkSamplePath(self, samplePath: Path) -> None:
        linkPath = self.path / samplePath.name
        if linkPath.exists():
            linkPath.unlink()

        samplePath.link_to(linkPath)
    def download(self, decrypt: bool = True, ignoreCache: bool = False) -> None:
        """
            Downloads dataset from Coretex

            Parameters
            ----------
            decrypt : bool
                if True the samples of an encrypted dataset
                will be decrypted after download (not required)
            ignoreCache : bool
                if dataset is already downloaded and ignoreCache
                is True it will be downloaded again (not required)

            Example
            -------
            >>> from coretex import NetworkDataset
            >>> dummyDataset = NetworkDataset.fetchById(1023)
            >>> dummyDataset.download()
        """

        self.path.mkdir(exist_ok = True)

        def sampleDownloader(sample: SampleType) -> None:
            sample.download(decrypt, ignoreCache)

            if sample.downloadPath.exists():
                self._linkSamplePath(sample.downloadPath)

            if sample.zipPath.exists():
                self._linkSamplePath(sample.zipPath)

            logging.getLogger("coretexpylib").info(f"\tDownloaded \"{sample.name}\"")

        processor = MultithreadedDataProcessor(
            self.samples,
            sampleDownloader,
            message = f"Downloading dataset \"{self.name}\"..."
        )
        processor.process()
    def rename(self, name: str) -> bool:
        """
            Renames the dataset

            Parameters
            ----------
            name : str
                new dataset name

            Returns
            -------
            bool -> True if dataset was renamed, False if dataset was not renamed

            Raises
            ------
            ValueError -> If name is invalid
        """

        if not isEntityNameValid(name):
            raise ValueError(NAME_VALIDATION_MESSAGE)

        success = self.update(name = name)

        if success:
            return super().rename(name)

        return success
    @abstractmethod
    def _uploadSample(self, samplePath: Path, sampleName: str, **metadata: Any) -> SampleType:
        # Implemented by concrete dataset types to upload a single sample
        pass
    def add(self, samplePath: Union[Path, str], sampleName: Optional[str] = None, **metadata: Any) -> SampleType:
        """
            Uploads the provided archive (.zip, .tar.gz) as Sample to
            Coretex.ai as a part of this Dataset.

            Parameters
            ----------
            samplePath : Union[Path, str]
                path to data which will be uploaded
            sampleName : Optional[str]
                name of the sample, if not provided the
                file name will be used

            Returns
            -------
            SampleType -> created Sample
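
            Example
            -------
            >>> from coretex import NetworkDataset
            >>> dummyDataset = NetworkDataset.fetchById(1023)
            >>> dummySample = dummyDataset.add("path/to/data.zip")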
        """

        if isinstance(samplePath, str):
            samplePath = Path(samplePath)

        if sampleName is None:
            sampleName = samplePath.stem

        if self.isEncrypted:
            sample = _encryptedSampleImport(self._sampleType, sampleName, samplePath, self.id, getProjectKey(self.projectId))
        else:
            sample = self._uploadSample(samplePath, sampleName, **metadata)

        self.samples.append(sample)

        return sample
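

# A minimal end-to-end sketch of the API above (illustrative only - the
# names, path and project id are placeholders taken from the docstrings):
#
#     dataset = CustomDataset.createDataset("dummyDataset", 123)
#     dataset.add("path/to/data.zip")
#     dataset.finalize()
#     dataset.download()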