Coretex
artifacts.py
1 # Copyright (C) 2023 Coretex LLC
2 
3 # This file is part of Coretex.ai
4 
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Affero General Public License for more details.
14 
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program. If not, see <https://www.gnu.org/licenses/>.
17 
18 
19 from typing import List, Iterator
20 from pathlib import Path
21 from contextlib import contextmanager
22 
23 import logging
24 
25 from watchdog.events import FileSystemEventHandler, FileSystemEvent
26 from watchdog.observers import Observer
27 
28 from ...entities import TaskRun
29 
30 
31 IGNORED_FILES = ["_coretex.py"]
32 
33 
34 class FileEventHandler(FileSystemEventHandler):
35 
36  def __init__(self) -> None:
37  super().__init__()
38 
39  self.artifactPaths: List[Path] = []
40 
41  def on_created(self, event: FileSystemEvent) -> None:
42  if event.is_directory:
43  return
44 
45  filePath = Path(event.src_path)
46 
47  if filePath.parent.joinpath(".coretexignore").exists():
48  return
49 
50  if filePath.name in IGNORED_FILES:
51  return
52 
53  logging.getLogger("coretex").debug(f">> [Coretex] File created at path \"{filePath}\", adding to artifacts list")
54  self.artifactPaths.append(filePath)
55 
56 
57 @contextmanager
58 def track(taskRun: TaskRun) -> Iterator[FileEventHandler]:
59  # If local use current working dir, else use task path
60  root = Path.cwd() if taskRun.isLocal else taskRun.taskPath
61 
62  try:
63  observer = Observer()
64  observer.setName("ArtifactTracker")
65 
66  logging.getLogger("coretexpylib").debug(f">> [Coretex] Tracking files created inside \"{root}\"")
67 
68  eventHandler = FileEventHandler()
69  observer.schedule(eventHandler, root, recursive = True) # type: ignore[no-untyped-call]
70  try:
71  observer.start() # type: ignore[no-untyped-call]
72  except OSError as e:
73  if "inotify watch limit reached" in str(e):
74  logging.getLogger("coretexpylib").error(
75  f"{e}. Consider increasing the inotify limit by putting \"fs.inotify.max_user_watches=524288\" \
76  (or any other value you prefer) into your sysctl settings"
77  )
78  else:
79  logging.getLogger("coretexpylib").error(f"Failed to start watchdog observer with error: {e}")
80 
81  yield eventHandler
82  finally:
83  observer.stop() # type: ignore[no-untyped-call]
84  observer.join()
85 
86  for index, artifactPath in enumerate(eventHandler.artifactPaths):
87  logging.getLogger("coretexpylib").debug(f">> [Coretex] Uploading {index + 1}/{len(eventHandler.artifactPaths)} - \"{artifactPath}\"")
88 
89  try:
90  artifact = taskRun.createArtifact(artifactPath, str(artifactPath.relative_to(root)))
91  if artifact is not None:
92  logging.getLogger("coretexpylib").debug(f"\tSuccessfully uploaded artifact")
93  else:
94  logging.getLogger("coretexpylib").debug(f"\tFailed to upload artifact")
95  except Exception as e:
96  logging.getLogger("coretexpylib").error(f"\tError while creating artifact: {e}")
97  logging.getLogger("coretexpylib").debug(f"\tError while creating artifact: {e}", exc_info = e)