18 from typing 
import Optional, TypeVar, Generic, List, Dict, Any, Type, Union
 
   19 from typing_extensions 
import Self
 
   20 from datetime 
import datetime
 
   21 from pathlib 
import Path
 
   22 from abc 
import ABC, abstractmethod
 
   23 from contextlib 
import ExitStack
 
   29 from .dataset 
import Dataset
 
   30 from .state 
import DatasetState
 
   31 from ..tag 
import EntityTagType, Taggable
 
   32 from ..sample 
import NetworkSample
 
   33 from ..utils 
import isEntityNameValid
 
   34 from ..._folder_manager 
import folder_manager
 
   35 from ...codable 
import KeyDescriptor
 
   36 from ...networking 
import NetworkObject, \
 
   37     fileChunkUpload, networkManager, NetworkRequestError
 
   38 from ...threading 
import MultithreadedDataProcessor
 
   39 from ...cryptography 
import aes, getProjectKey
 
   40 from ...utils.file 
import isArchive, archive
 
   43 SampleType = TypeVar(
"SampleType", bound = 
"NetworkSample")
 
   44 NAME_VALIDATION_MESSAGE = 
">> [Coretex] Entity name is invalid. Requirements: alphanumeric characters (\"a-z\", and \"0-9\") and dash (\"-\") with length between 3 to 50" 
   45 MAX_DATASET_NAME_LENGTH = 50
 
   48 def _hashDependencies(dependencies: List[str]) -> str:
 
   50     hash.update(
"".join(sorted(dependencies)).encode())
 
   52     hashString = base64.b64encode(hash.digest()).decode(
"ascii")
 
   53     hashString = hashString.lower()
 
   54     hashString = hashString.replace(
"+", 
"0")
 
   55     hashString = hashString.replace(
"/", 
"0")
 
   56     hashString = hashString.replace(
"=", 
"0")
 
   61 def _chunkSampleImport(sampleType: Type[SampleType], sampleName: str, samplePath: Path, datasetId: int) -> SampleType:
 
   64         "dataset_id": datasetId,
 
   65         "file_id": fileChunkUpload(samplePath)
 
   68     response = networkManager.formData(
"session/import", parameters)
 
   69     if response.hasFailed():
 
   70         raise NetworkRequestError(response, f
"Failed to create sample from \"{samplePath}\"")
 
   72     return sampleType.decode(response.getJson(dict))
 
   75 def _encryptedSampleImport(sampleType: Type[SampleType], sampleName: str, samplePath: Path, datasetId: int, key: bytes) -> SampleType:
 
   76     with ExitStack() 
as stack:
 
   77         if isArchive(samplePath):
 
   78             archivePath = samplePath
 
   80             archivePath = stack.enter_context(folder_manager.tempFile())
 
   81             archive(samplePath, archivePath)
 
   83         encryptedPath = stack.enter_context(folder_manager.tempFile())
 
   84         aes.encryptFile(key, archivePath, encryptedPath)
 
   86         return _chunkSampleImport(sampleType, sampleName, encryptedPath, datasetId)
 
   92     raise RuntimeError(
"Unreachable statement was reached.")
 
   95 class NetworkDataset(Generic[SampleType], Dataset[SampleType], NetworkObject, Taggable, ABC):
 
   98         Represents the base class for all Dataset classes which are 
   99         communicating with Coretex.ai 
  104             creation date of dataset 
  106             id of created dataset id 
  108             availability of dataset for modifications 
  116     meta: Optional[Dict[str, Any]]
 
  118     def __init__(self, sampleType: Type[SampleType]) -> 
None:
 
  124             Retrieves path of dataset 
  128             Path -> path of dataset 
  131         return folder_manager.datasetsFolder / str(self.id)
 
  134     def entityTagType(self) -> EntityTagType:
 
  135         return EntityTagType.dataset
 
  140     def _keyDescriptors(cls) -> Dict[str, KeyDescriptor]:
 
  141         descriptors = super()._keyDescriptors()
 
  144         descriptors[
"samples"] = 
KeyDescriptor(
"sessions", NetworkSample, list)
 
  151     def _endpoint(cls) -> str:
 
  155     def fetchById(cls, objectId: int, **kwargs: Any) -> Self:
 
  156         if "include_sessions" not in kwargs:
 
  157             kwargs[
"include_sessions"] = 1
 
  159         return super().fetchById(objectId, **kwargs)
 
  162     def fetchAll(cls, **kwargs: Any) -> List[Self]:
 
  163         if "include_sessions" not in kwargs:
 
  164             kwargs[
"include_sessions"] = 1
 
  166         return super().fetchAll(**kwargs)
 
  171             Fetches cached dataset if it exists 
  175             dependencies : List[str] 
  176                 Parameters on which the cached dataset depends 
  180             Self -> Fetched dataset object 
  184             ValueError -> If dataset doesn't exist 
  187         return super().fetchOne(
 
  188             name = _hashDependencies(dependencies),
 
  199         meta: Optional[Dict[str, Any]] = 
None 
  203             Creates a new dataset with the provided name and type 
  210                 project for which the dataset will be created 
  214             The created dataset object or None if creation failed 
  218             ValueError -> If name is invalid 
  219             NetworkRequestError -> If dataset creation failed 
  223             >>> from coretex import NetworkDataset 
  225             >>> dummyDataset = NetworkDataset.createDataset("dummyDataset", 123) 
  228         if not isEntityNameValid(name):
 
  229             raise ValueError(NAME_VALIDATION_MESSAGE)
 
  233             project_id = projectId,
 
  240             Generated dataset name based on the dependencies 
  245                 prefix to which the dependency hash will be appended 
  246             dependencies : List[str] 
  247                 parameters which affect the contents of the cache 
  251             str -> prefix with hash generated based on dependencies appended 
  254         if MAX_DATASET_NAME_LENGTH - len(prefix) < 8:
 
  255             raise ValueError(f
"Dataset prefix \"{prefix}\" is too long. Max allowed size is \"{MAX_DATASET_NAME_LENGTH - 8}\".")
 
  257         suffix = _hashDependencies(dependencies)
 
  258         name = f
"{prefix}-{suffix}" 
  260         if len(name) > MAX_DATASET_NAME_LENGTH:
 
  261             name = name[:MAX_DATASET_NAME_LENGTH]
 
  268             Creates a dataset used for caching results of tasks 
  269             Used to avoid repeating expensive and long calculations 
  274                 prefix of the cache dataset 
  275             dependencies : List[str] 
  276                 parameters which affect the contents of the cache 
  278                 project for which the dataset will be created 
  282             The created dataset object 
  286             ValueError -> If prefix of the name is invalid or if failed to create cache dataset 
  290             >>> from coretex import NetworkDataset 
  292             >>> dependencies = [str(projectId), str(index), str(parameter)] 
  293             >>> dummyDataset = NetworkDataset.createCacheDataset("dummyDataset", dependencies, 123) 
  296         if not isEntityNameValid(prefix):
 
  297             raise ValueError(NAME_VALIDATION_MESSAGE)
 
  301             raise ValueError(f
"Failed to create cache dataset with prefix \"{prefix}\"")
 
  307             Finalizes state of Coretex dataset 
  311             >>> from coretex import CustomDataset 
  313             >>> dummyDataset = CustomDataset.createDataset("dummyDataset", 123) 
  314             >>> dummyDataset.finalize() 
  317         return self.update(name = self.
namename, state = DatasetState.final)
 
  319     def _linkSamplePath(self, samplePath: Path) -> 
None:
 
  320         linkPath = self.
pathpathpath / samplePath.name
 
  321         if linkPath.exists():
 
  324         samplePath.link_to(linkPath)
 
  326     def download(self, decrypt: bool = 
True, ignoreCache: bool = 
False) -> 
None:
 
  328             Downloads dataset from Coretex 
  333                 if dataset is already downloaded and ignoreCache 
  334                 is True it will be downloaded again (not required) 
  338             >>> from coretex import NetworkDataset 
  340             >>> dummyDataset = NetworkDataset.fetchById(1023) 
  341             >>> dummyDataset.download() 
  344         self.
pathpathpath.mkdir(exist_ok = 
True)
 
  346         def sampleDownloader(sample: SampleType) -> 
None:
 
  347             sample.download(decrypt, ignoreCache)
 
  349             if sample.downloadPath.exists():
 
  352             if sample.zipPath.exists():
 
  355             logging.getLogger(
"coretexpylib").info(f
"\tDownloaded \"{sample.name}\"")
 
  357         processor = MultithreadedDataProcessor(
 
  360             message = f
"Downloading dataset \"{self.name}\"..." 
  366         if not isEntityNameValid(name):
 
  367             raise ValueError(NAME_VALIDATION_MESSAGE)
 
  369         success = self.update(name = name)
 
  372             return super().
rename(name)
 
  377     def _uploadSample(self, samplePath: Path, sampleName: str, **metadata: Any) -> SampleType:
 
  382     def add(self, samplePath: Union[Path, str], sampleName: Optional[str] = 
None, **metadata: Any) -> SampleType:
 
  384             Uploads the provided archive (.zip, .tar.gz) as Sample to 
  385             Coretex.ai as a part of this Dataset. 
  389             path : Union[Path, str] 
  390                 path to data which will be uploaded 
  394             SampleType -> created Sample 
  397         if isinstance(samplePath, str):
 
  398             samplePath = Path(samplePath)
 
  400         if sampleName 
is None:
 
  401             sampleName = samplePath.stem
 
  404             sample = _encryptedSampleImport(self.
_sampleType_sampleType, sampleName, samplePath, self.id, getProjectKey(self.projectId))
 
  406             sample = self.
_uploadSample_uploadSample(samplePath, sampleName, **metadata)
 
  409         self.samples.append(sample)
 
bool rename(self, str name)
Self fetchCachedDataset(cls, List[str] dependencies)
None download(self, bool decrypt=True, bool ignoreCache=False)
str generateCacheName(cls, str prefix, List[str] dependencies)
SampleType add(self, Union[Path, str] samplePath, Optional[str] sampleName=None, **Any metadata)
Self createCacheDataset(cls, str prefix, List[str] dependencies, int projectId)
Self createDataset(cls, str name, int projectId, Optional[Dict[str, Any]] meta=None)
SampleType _uploadSample(self, Path samplePath, str sampleName, **Any metadata)
None _linkSamplePath(self, Path samplePath)