Coretex
initialization.py
1 # Copyright (C) 2023 Coretex LLC
2 
3 # This file is part of Coretex.ai
4 
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Affero General Public License for more details.
14 
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program. If not, see <https://www.gnu.org/licenses/>.
17 
18 from typing import Callable, List
19 
20 import logging
21 import sys
22 
23 from .remote import processRemote
24 from .current_task_run import setCurrentTaskRun
25 from .._folder_manager import folder_manager
26 from ..entities import TaskRun, TaskRunStatus
27 from ..logging import createFormatter, initializeLogger
28 from ..logging.severity import LogSeverity
29 from ..networking import RequestFailedError
30 
31 
32 def _initializeLogger(taskRun: TaskRun) -> None:
33  # enable/disable verbose mode for taskRuns
34  severity = LogSeverity.info
35  verbose = taskRun.parameters.get("verbose", False)
36 
37  if verbose:
38  severity = LogSeverity.debug
39 
40  streamHandler = logging.StreamHandler(sys.stdout)
41  streamHandler.setLevel(severity.getLevel())
42  streamHandler.setFormatter(createFormatter(
43  includeTime = False,
44  includeLevel = False,
45  includeColor = False,
46  jsonOutput = True
47  ))
48 
49  logPath = folder_manager.getRunLogsDir(taskRun.id) / "run.log"
50  initializeLogger(severity, logPath, streamHandler)
51 
52 
53 def _prepareForExecution(taskRun: TaskRun) -> None:
54  _initializeLogger(taskRun)
55 
56  taskRun.updateStatus(
57  status = TaskRunStatus.inProgress,
58  message = "Executing task."
59  )
60 
61 
62 def initializeRTask(mainFunction: Callable[[TaskRun], None], args: List[str]) -> None:
63  """
64  Initializes and starts the R task as Coretex TaskRun
65 
66  Parameters
67  ----------
68  mainFunction : Callable[[ExecutingTaskRun], None]
69  entry point function
70  args : Optional[List[str]]
71  list of command line arguments
72  """
73 
74  taskRun, callback = processRemote(args)
75 
76  try:
77  _prepareForExecution(taskRun)
78  setCurrentTaskRun(taskRun)
79 
80  callback.onStart()
81 
82  logging.getLogger("coretexpylib").info("TaskRun execution started")
83  mainFunction(taskRun)
84 
85  callback.onSuccess()
86  except RequestFailedError:
87  callback.onNetworkConnectionLost()
88  except KeyboardInterrupt:
89  callback.onKeyboardInterrupt()
90  except BaseException as ex:
91  callback.onException(ex)
92 
93  # sys.exit is ok here, finally block is guaranteed to execute
94  # due to how sys.exit is implemented (it internally raises SystemExit exception)
95  sys.exit(1)
96  finally:
97  callback.onCleanUp()