Coretex
secret.py
1 # Copyright (C) 2023 Coretex LLC
2 
3 # This file is part of Coretex.ai
4 
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Affero General Public License for more details.
14 
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program. If not, see <https://www.gnu.org/licenses/>.
17 
18 from typing import Any, Dict, Optional, Tuple
19 from typing_extensions import Self
20 from abc import ABC, abstractmethod
21 from base64 import b64decode
22 
23 import copy
24 
25 from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
26 
27 from .type import SecretType
28 from ...networking import NetworkObject, networkManager, NetworkRequestError, RequestType
29 from ...cryptography import rsa
30 
31 
class Secret(NetworkObject, ABC):

    """
    Represents base Secret entity from Coretex.ai

    Subclasses list the fields which hold encrypted values by
    implementing "_encryptedFields". Calling "decrypted" returns
    a copy of the Secret with those fields decrypted.
    """

    def __init__(self, type_: SecretType) -> None:
        """
        Parameters
        ----------
        type_ : SecretType
            type of the Secret, stored on the instance as "type_"
        """

        super().__init__()

        self.type_ = type_

    @abstractmethod
    def _encryptedFields(self) -> Tuple[str, ...]:
        """
        Returns
        -------
        Tuple[str, ...] -> Names of the fields which are to be
        decrypted when "Secret.decrypted" is called
        """

        pass

    def decrypted(self, key: Optional[RSAPrivateKey] = None) -> Self:
        """
        Creates a deep copy of the Secret in which every field
        returned by "_encryptedFields" is base64 decoded, decrypted
        using the RSA private key and decoded as UTF-8 text.
        The Secret on which this method is called is left unchanged.

        Parameters
        ----------
        key : Optional[RSAPrivateKey]
            private key used for decryption, if None the key
            returned by "rsa.getPrivateKey" is used

        Returns
        -------
        Self -> Decrypted Coretex Secret

        Raises
        ------
        AttributeError -> If an encrypted field is not present on the Secret
        TypeError -> If the value of an encrypted field is not a "str"
        """

        if key is None:
            key = rsa.getPrivateKey()

        # All writes below go to the copy, never to "self"
        decrypted = copy.deepcopy(self)

        for field in self._encryptedFields():
            # Only the instance "__dict__" is consulted, class level
            # attributes and properties do not count as fields here
            if field not in decrypted.__dict__:
                raise AttributeError(f"\"{type(decrypted)}\".\"{field}\" not found")

            value = decrypted.__dict__[field]
            if not isinstance(value, str):
                raise TypeError(f"Expected \"str\" received \"{type(value)}\"")

            # Encrypted values are stored as base64 text
            decrypted.__dict__[field] = rsa.decrypt(key, b64decode(value)).decode("utf-8")

        return decrypted

    @classmethod
    def _endpoint(cls) -> str:
        """
        Returns
        -------
        str -> Endpoint used by "fetchByName" for building the request path
        """

        return "secret"

    def refresh(self, jsonObject: Optional[Dict[str, Any]] = None) -> bool:
        """
        Secret does not support this method

        Returns
        -------
        bool -> Declared return type, the value actually
        returned is always the "NotImplemented" constant
        """

        # NOTE(review): "NotImplemented" is the sentinel intended for binary
        # special methods and is not a bool - callers must not use this result
        # in a boolean context. Raising "NotImplementedError" would be clearer
        # but would change behavior for existing callers, so it is kept as is.
        return NotImplemented

    def update(self, **kwargs: Any) -> bool:
        """
        Secret does not support this method

        Returns
        -------
        bool -> Declared return type, the value actually
        returned is always the "NotImplemented" constant
        """

        # NOTE(review): see "refresh" - the returned sentinel is not a bool
        # and must not be used in a boolean context
        return NotImplemented

    @classmethod
    def fetchById(cls, objectId: int, **kwargs: Any) -> Self:
        """
        Secret does not support this method, use "fetchByName" instead

        Returns
        -------
        Self -> Declared return type, the value actually
        returned is always the "NotImplemented" constant
        """

        # NOTE(review): the returned value is not a Secret instance,
        # callers must not treat it as one
        return NotImplemented

    @classmethod
    def fetchByName(cls, name: str) -> Self:
        """
        Fetches a single Secret with the matching name

        Parameters
        ----------
        name : str
            name of the Secret which is fetched

        Returns
        -------
        Self -> fetched Secret

        Raises
        ------
        NetworkRequestError -> If the request for fetching failed
        """

        response = networkManager.get(f"{cls._endpoint()}/data", {
            "name": name
        })

        if response.hasFailed():
            raise NetworkRequestError(response, f"Failed to fetch Secret \"{name}\"")

        return cls.decode(response.getJson(dict))

    @classmethod
    def fetchNodeSecret(cls, name: str, accessToken: str) -> Self:
        """
        Fetches a single Node Secret with the matching name

        Parameters
        ----------
        name : str
            name of the Node Secret which is fetched
        accessToken : str
            Node access token, sent in the "node-access-token" header

        Returns
        -------
        Self -> fetched Node Secret

        Raises
        ------
        NetworkRequestError -> If the request for fetching failed
        """

        # NOTE(review): assumes "_headers()" returns a new dict on every call,
        # if it returns shared state the token set below would also be sent
        # with unrelated requests - confirm against networkManager
        headers = networkManager._headers()
        headers["node-access-token"] = accessToken

        # The path is hardcoded here and does not go through "_endpoint()"
        response = networkManager.request("secret/node", RequestType.get, headers, {
            "name": name
        })

        if response.hasFailed():
            raise NetworkRequestError(response, f"Failed to fetch Node Secret \"{name}\"")

        return cls.decode(response.getJson(dict))
Self fetchById(cls, int objectId, **Any kwargs)
Definition: secret.py:97
Self fetchByName(cls, str name)
Definition: secret.py:105
bool refresh(self, Optional[Dict[str, Any]] jsonObject=None)
Definition: secret.py:82
Self fetchNodeSecret(cls, str name, str accessToken)
Definition: secret.py:133
Self decrypted(self, Optional[RSAPrivateKey] key=None)
Definition: secret.py:54
bool update(self, **Any kwargs)
Definition: secret.py:89