Coretex
run_logger.py
1 # Copyright (C) 2023 Coretex LLC
2 
3 # This file is part of Coretex.ai
4 
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Affero General Public License for more details.
14 
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program. If not, see <https://www.gnu.org/licenses/>.
17 
18 from typing import Optional, List
19 
20 import logging
21 import traceback
22 import functools
23 
24 from .upload_worker import LoggerUploadWorker
25 from ...logging import Log, LogSeverity
26 from ...networking import networkManager, RequestFailedError
27 
28 
def exceptionToString(exception: BaseException) -> str:
    """
    Render an exception as text.

    The result is the formatted traceback frames of the exception
    followed by the standard "ExceptionType: message" summary.

    Parameters
    ----------
    exception : BaseException
        exception to render; if its __traceback__ is None only the
        summary is returned

    Returns
    -------
    str -> traceback frames plus summary, without a trailing newline
    """

    lines = traceback.format_tb(exception.__traceback__)

    # str(exception) alone loses the exception type and is an empty string
    # for exceptions raised without arguments, which makes the logged text
    # ambiguous. format_exception_only yields the usual "Type: message" form.
    lines.extend(traceback.format_exception_only(type(exception), exception))

    # The summary lines end with a newline; strip it so the text does not
    # introduce a blank trailing line when logged.
    return "".join(lines).rstrip("\n")
33 
34 
def uploadTaskRunLogs(taskRunId: int, logs: List[Log]) -> bool:
    """
    Post a batch of console logs to the "model-queue/add-console-log"
    endpoint.

    Parameters
    ----------
    taskRunId : int
        sent to the endpoint as "model_queue_id"
    logs : List[Log]
        logs to send; each one is sent in its encode() form

    Returns
    -------
    bool -> True if the response reports no failure, False if the
    response failed or a RequestFailedError was raised (the error is
    logged, not re-raised)
    """

    try:
        encodedLogs = [entry.encode() for entry in logs]
        response = networkManager.post("model-queue/add-console-log", {
            "model_queue_id": taskRunId,
            "logs": encodedLogs
        })

        succeeded = not response.hasFailed()
    except RequestFailedError as ex:
        sdkLogger = logging.getLogger("coretexpylib")
        reason = f">> Failed to upload console logs to Coretex. Reason: {ex}"

        # Short message at error level, full traceback only at debug level
        sdkLogger.error(reason)
        sdkLogger.debug(reason, exc_info = ex)

        succeeded = False

    return succeeded
49 
50 
class RunLogger:
    """
    Logger which forwards every message to the standard "coretex-run"
    logger and, while a TaskRun is attached, also queues it on a
    LoggerUploadWorker.
    """

    NAME = "coretex-run"

    def __init__(self) -> None:
        self._logger = logging.getLogger(RunLogger.NAME)

        # None means that no TaskRun is attached
        self._uploadWorker: Optional[LoggerUploadWorker] = None

    def attach(self, taskRunId: int) -> None:
        """
        Create and start an upload worker bound to the given TaskRun.

        Parameters
        ----------
        taskRunId : int
            id passed to uploadTaskRunLogs on every upload

        Raises
        ------
        ValueError -> if a worker is already attached
        """

        if self._uploadWorker is not None:
            raise ValueError("TaskRun is already attached to logger")

        uploadFunction = functools.partial(uploadTaskRunLogs, taskRunId)

        self._uploadWorker = LoggerUploadWorker(uploadFunction)
        self._uploadWorker.start()

    def reset(self) -> None:
        """
        Stop and join the upload worker (if any) and detach it.
        """

        worker = self._uploadWorker
        if worker is not None:
            worker.stop()
            worker.join()

        self._uploadWorker = None

    def flushLogs(self) -> bool:
        """
        Ask the attached upload worker to upload its logs.

        Returns
        -------
        bool -> result of the worker's uploadLogs(), or False when no
        worker is attached
        """

        worker = self._uploadWorker
        return False if worker is None else worker.uploadLogs()

    def logProcessOutput(self, line: str, severity: Optional[LogSeverity] = None) -> None:
        """
        Log a single line of process output.

        Lines which contain only whitespace are ignored. The line is
        parsed with Log.parse.

        Parameters
        ----------
        line : str
            raw output line
        severity : Optional[LogSeverity]
            if provided, replaces the severity of the parsed log
        """

        if not line.strip():
            return

        entry, _ = Log.parse(line)
        if severity is not None:
            entry.severity = severity

        self._logger.log(entry.severity.getLevel(), entry.message)

        worker = self._uploadWorker
        if worker is not None:
            worker.add(entry)

    def log(self, severity: LogSeverity, message: str, exception: Optional[BaseException] = None) -> None:
        """
        Log a message with the given severity.

        Parameters
        ----------
        severity : LogSeverity
            severity of the log
        message : str
            text of the log
        exception : Optional[BaseException]
            if provided, its exceptionToString() text is logged as a
            second, separate log with the same severity
        """

        entry = Log(severity, message)
        self._logger.log(severity.getLevel(), message)

        worker = self._uploadWorker
        if worker is not None:
            worker.add(entry)

        if exception is None:
            return

        # The exception text goes through the same path as a regular message
        self.log(severity, exceptionToString(exception))

    def debug(self, message: str, exception: Optional[BaseException] = None) -> None:
        """Log with LogSeverity.debug, see log()."""

        self.log(LogSeverity.debug, message, exception)

    def info(self, message: str, exception: Optional[BaseException] = None) -> None:
        """Log with LogSeverity.info, see log()."""

        self.log(LogSeverity.info, message, exception)

    def warning(self, message: str, exception: Optional[BaseException] = None) -> None:
        """Log with LogSeverity.warning, see log()."""

        self.log(LogSeverity.warning, message, exception)

    def error(self, message: str, exception: Optional[BaseException] = None) -> None:
        """Log with LogSeverity.error, see log()."""

        self.log(LogSeverity.error, message, exception)

    def critical(self, message: str, exception: Optional[BaseException] = None) -> None:
        """Log with LogSeverity.fatal, see log()."""

        self.log(LogSeverity.fatal, message, exception)

    # Aliases for the two method names above
    warn = warning
    fatal = critical
119 
120 
# Module-level RunLogger instance, created when this module is imported
runLogger = RunLogger()