18 from typing
import Tuple, Optional, List
26 from ..base_callback
import TaskCallback
27 from ...networking
import networkManager
28 from ...entities
import TaskRun, secret_factory
class RemoteArgumentParser(Tap):

    """
        Command line arguments read by "processRemote".

        All three options are registered explicitly in "configure";
        the class-level annotations declare the attribute types so
        that reads such as "parser.refreshToken" are type-checked.
    """

    # Token passed to networkManager.authenticateWithRefreshToken
    refreshToken: str
    # ID passed to TaskRun.fetchById
    taskRunId: int
    # JSON encoded string with secret values, None when not provided
    decryptedSecrets: Optional[str]

    def configure(self) -> None:
        """
            Registers the "--refreshToken", "--taskRunId" and
            "--decryptedSecrets" options on the parser.
        """

        self.add_argument("--refreshToken", type = str)
        self.add_argument("--taskRunId", type = int)

        # nargs = "?" allows the flag to be passed without a value,
        # in which case (same as when it is omitted) the value is None
        self.add_argument("--decryptedSecrets", nargs = "?", type = str, default = None)
def processRemote(args: Optional[List[str]] = None) -> Tuple[TaskRun, TaskCallback]:
    """
        Authenticates using the provided refresh token, fetches the
        TaskRun with the provided ID and, if decrypted secrets were
        passed, replaces the matching TaskRun parameters with objects
        created by "secret_factory".

        Parameters
        ----------
        args : Optional[List[str]]
            argument list forwarded to "RemoteArgumentParser.parse_known_args",
            arguments which the parser does not recognize are ignored

        Returns
        -------
        Tuple[TaskRun, TaskCallback] -> the fetched TaskRun and a
        TaskCallback constructed from it

        Raises
        ------
        RuntimeError -> if authentication with the refresh token failed
    """

    # Unrecognized arguments are intentionally discarded
    parsedArgs, _ = RemoteArgumentParser().parse_known_args(args)

    response = networkManager.authenticateWithRefreshToken(parsedArgs.refreshToken)
    if response.hasFailed():
        raise RuntimeError(">> [Coretex] Failed to authenticate")

    taskRun: TaskRun = TaskRun.fetchById(parsedArgs.taskRunId)

    if parsedArgs.decryptedSecrets is not None:
        # Loading secrets is best-effort: a failure is logged at debug level
        # and the TaskRun is returned with whatever parameters were set so far
        try:
            decryptedSecrets = json.loads(parsedArgs.decryptedSecrets)
            if not isinstance(decryptedSecrets, dict):
                raise ValueError(f"Expected decrypted secrets to be a JSON object, got \"{type(decryptedSecrets).__name__}\"")

            for key, value in decryptedSecrets.items():
                if isinstance(value, dict):
                    # Single secret
                    taskRun.parameters[key] = secret_factory.create(value)
                elif isinstance(value, list):
                    # List of secrets
                    taskRun.parameters[key] = [secret_factory.create(element) for element in value]
        except Exception as ex:
            # "Exception" instead of "BaseException" so that KeyboardInterrupt
            # and SystemExit are not swallowed by the best-effort handling
            logging.getLogger("coretexpylib").debug(f">> [Coretex] Failed to load decrypted secrets - \"{ex}\"", exc_info = ex)

    return taskRun, TaskCallback(taskRun)