Coretex
base_callback.py
1 # Copyright (C) 2023 Coretex LLC
2 
3 # This file is part of Coretex.ai
4 
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Affero General Public License for more details.
14 
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program. If not, see <https://www.gnu.org/licenses/>.
17 
from typing import Optional, TextIO

import sys
import logging
import faulthandler
import signal

from .current_task_run import setCurrentTaskRun
from .._folder_manager import folder_manager
from ..entities import TaskRun
28 
29 
class TaskCallback:

    """
        Base set of hooks that react to the lifecycle events of a TaskRun
        (start, success, exception, keyboard interrupt, lost network
        connection and clean up).

        Creating an instance redirects sys.stdout and sys.stderr to the
        provided stream; the original streams are restored by onSuccess
        and onException.
    """

    def __init__(self, taskRun: TaskRun, outputStream: TextIO = sys.stdout) -> None:
        """
            Parameters
            ----------
            taskRun : TaskRun
                TaskRun whose events are handled by this callback
            outputStream : TextIO
                stream which replaces both sys.stdout and sys.stderr\n
                NOTE: the default value is evaluated once, when the class
                is defined, so it is whatever sys.stdout was at import time
        """

        self._taskRun = taskRun

        # File which receives the faulthandler stack dumps. It is opened in onStart
        # and has to stay open for as long as the signal handler is registered.
        self.__faultHandlerFile: Optional[TextIO] = None

        # Store sys streams so they can be restored later
        self.__stdoutBackup = sys.stdout
        self.__stderrBackup = sys.stderr

        # Set sys output to new stream
        sys.stdout = outputStream
        sys.stderr = outputStream

    def _restoreStreams(self) -> None:
        """
            Puts back the sys.stdout and sys.stderr objects
            which were active when this callback was created
        """

        sys.stdout = self.__stdoutBackup
        sys.stderr = self.__stderrBackup

    def onStart(self) -> None:
        """
            Registers a SIGUSR1 handler which dumps the stack trace of
            all threads into "stacktrace.log" inside of the TaskRun logs directory.
            Does nothing on platforms without SIGUSR1 or faulthandler.register.
        """

        # Call "kill -30 task_run_process_id" to dump current stack trace of the TaskRun into the file
        # 30 == signal.SIGUSR1
        # Only works on *nix systems

        if not hasattr(signal, "SIGUSR1") or not hasattr(faulthandler, "register"):
            # Neither of these exists on Windows - skip instead of raising AttributeError
            return

        faultHandlerPath = folder_manager.getRunLogsDir(self._taskRun.id) / "stacktrace.log"
        stackTraceFile = faultHandlerPath.open("w")

        try:
            faulthandler.register(
                signal.SIGUSR1,
                file = stackTraceFile,
                all_threads = True
            )
        except BaseException:
            # Registration failed, nothing will ever write to the file
            stackTraceFile.close()
            raise

        # Registering again replaces the previous handler,
        # so a file kept from an earlier call is no longer needed
        previousFile = self.__faultHandlerFile
        self.__faultHandlerFile = stackTraceFile

        if previousFile is not None:
            previousFile.close()

    def onSuccess(self) -> None:
        """
            Logs that the TaskRun finished successfully
            and restores the original sys streams
        """

        logging.getLogger("coretexpylib").info(">> [Coretex] TaskRun finished successfully")

        self._restoreStreams()

    def onException(self, exception: BaseException) -> None:
        """
            Logs the failure of the TaskRun (traceback is logged on
            debug level) and restores the original sys streams

            Parameters
            ----------
            exception : BaseException
                exception which caused the TaskRun to fail
        """

        logger = logging.getLogger("coretexpylib")

        logger.critical(">> [Coretex] TaskRun failed to finish due to an error")
        logger.debug(exception, exc_info = True)
        logger.critical(str(exception))

        self._restoreStreams()

    def onKeyboardInterrupt(self) -> None:
        """
            Hook for keyboard interrupts, does nothing by default
        """

        pass

    def onNetworkConnectionLost(self) -> None:
        """
            Terminates the process with exit code 1

            Raises
            ------
            SystemExit -> always
        """

        sys.exit(1)

    def onCleanUp(self) -> None:
        """
            Releases resources held by the callback: the stack trace
            signal handler and its file, logging handlers, NVML
            (if py3nvml is available) and the current TaskRun reference
        """

        # The file must stay open until the signal is unregistered,
        # so unregister first and only then close it
        if self.__faultHandlerFile is not None:
            faulthandler.unregister(signal.SIGUSR1)

            self.__faultHandlerFile.close()
            self.__faultHandlerFile = None

        # Flushes the internal buffers of logging module handlers
        # and other logging cleanup
        # IMPORTANT: do not use logging after calling this
        logging.shutdown()

        try:
            from py3nvml import py3nvml
            py3nvml.nvmlShutdown()
        except Exception:
            # Best effort - py3nvml may be missing or NVML may not be initialized.
            # KeyboardInterrupt and SystemExit are intentionally not swallowed.
            pass

        setCurrentTaskRun(None)