18 from typing 
import Any, Dict, Optional, Tuple
 
   19 from typing_extensions 
import Self
 
   20 from abc 
import ABC, abstractmethod
 
   21 from base64 
import b64decode
 
   25 from cryptography.hazmat.primitives.asymmetric.rsa 
import RSAPrivateKey
 
   27 from .type 
import SecretType
 
   28 from ...networking 
import NetworkObject, networkManager, NetworkRequestError, RequestType
 
   29 from ...cryptography 
import rsa
 
   35         Represents base Secret entity from Coretex.ai 
   38     def __init__(self, type_: SecretType) -> 
None:
 
   41         self.
type_type_ = type_
 
   44     def _encryptedFields(self) -> Tuple[str, ...]:
 
   48             Tuple[str, ...] -> A list of fields which are to be 
   49                 decrypted when "Secret.decrypted" is called 
   54     def decrypted(self, key: Optional[RSAPrivateKey] = 
None) -> Self:
 
   58             Self -> Decrypted Coretex Secret 
   62             key = rsa.getPrivateKey()
 
   64         decrypted = copy.deepcopy(self)
 
   66         for field 
in self._encryptedFields():
 
   67             if not field 
in decrypted.__dict__:
 
   68                 raise AttributeError(f
"\"{type(decrypted)}\".\"{field}\" not found")
 
   70             value = decrypted.__dict__[field]
 
   71             if not isinstance(value, str):
 
   72                 raise TypeError(f
"Expected \"str\" received \"{type(value)}\"")
 
   74             decrypted.__dict__[field] = rsa.decrypt(key, b64decode(value)).decode(
"utf-8")
 
   79     def _endpoint(cls) -> str:
 
   82     def refresh(self, jsonObject: Optional[Dict[str, Any]] = 
None) -> bool:
 
   84             Secret does not support this method 
   89     def update(self, **kwargs: Any) -> bool:
 
   91             Secret does not support this method 
   97     def fetchById(cls, objectId: int, **kwargs: Any) -> Self:
 
   99             Secret does not support this method 
  102         return NotImplemented
 
  107             Fetches a single Secret with the matching name 
  112                 name of the Secret which is fetched 
  116             Self -> fetched Secret 
  120             NetworkRequestError -> If the request for fetching failed 
  123         response = networkManager.get(f
"{cls._endpoint()}/data", {
 
  127         if response.hasFailed():
 
  128             raise NetworkRequestError(response, f
"Failed to fetch Secret \"{name}\"")
 
  130         return cls.decode(response.getJson(dict))
 
  135             Fetches a single Node Secret with the matching name 
  140                 name of the Node Secret which is fetched 
  146             Self -> fetched Node Secret 
  150             NetworkRequestError -> If the request for fetching failed 
  153         headers = networkManager._headers()
 
  154         headers[
"node-access-token"] = accessToken
 
  156         response = networkManager.request(
"secret/node", RequestType.get, headers, {
 
  160         if response.hasFailed():
 
  161             raise NetworkRequestError(response, f
"Failed to fetch Node Secret \"{name}\"")
 
  163         return cls.decode(response.getJson(dict))
 
Self fetchById(cls, int objectId, **Any kwargs)
Self fetchByName(cls, str name)
bool refresh(self, Optional[Dict[str, Any]] jsonObject=None)
Self fetchNodeSecret(cls, str name, str accessToken)
Self decrypted(self, Optional[RSAPrivateKey] key=None)
bool update(self, **Any kwargs)