Coretex
task_run.py
1 # Copyright (C) 2023 Coretex LLC
2 
3 # This file is part of Coretex.ai
4 
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Affero General Public License for more details.
14 
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program. If not, see <https://www.gnu.org/licenses/>.
17 
18 from typing import Optional, Any, List, Dict, Union, Tuple, TypeVar, Generic, Type
19 from typing_extensions import Self, override
20 from zipfile import ZipFile, ZIP_DEFLATED
21 from pathlib import Path
22 
23 import os
24 import time
25 import logging
26 import zipfile
27 import json
28 
29 from .utils import createSnapshot
30 from .artifact import Artifact
31 from .status import TaskRunStatus
32 from .metrics import Metric, MetricType
33 from .parameter import validateParameters, parameter_factory
34 from .execution_type import ExecutionType
35 from ..dataset import Dataset, LocalDataset, NetworkDataset
36 from ..project import ProjectType
37 from ..model import Model
38 from ..._folder_manager import folder_manager
39 from ...codable import KeyDescriptor
40 from ...networking import networkManager, NetworkObject, NetworkRequestError, FileData
41 
42 
# Type variable used to parametrize TaskRun (see Generic[DatasetType] below);
# bound to Dataset, so only Dataset subclasses are accepted by type checkers.
DatasetType = TypeVar("DatasetType", bound = Dataset)
44 
class TaskRun(NetworkObject, Generic[DatasetType]):

    """
        Represents TaskRun entity from Coretex.ai

        Properties
        ----------
        datasetId : int
            id of dataset
        name : str
            name of TaskRun
        description : str
            description of TaskRun
        meta : Dict[str, Any]
            meta data of TaskRun, its "parameters" entry holds
            the raw parameter list which is parsed in onDecode
        status : TaskRunStatus
            status of TaskRun
        projectId : int
            id of Coretex Project
        projectName : str
            name of Coretex Project
        projectType : ProjectType
            appropriate project type
        taskId : int
            id of task
        taskName : str
            name of task
        entryPoint : str
            entry point of the task
            (presumably relative path to the script - TODO confirm)
        createdById : str
            id of the creator of the TaskRun
            (presumably the user id - TODO confirm)
        useCachedEnv : bool
            if True cached env will be used, otherwise new environment will be created
        executionType : ExecutionType
            execution type of the TaskRun (see isLocal)
        metrics : List[Metric]
            metrics created for this TaskRun through createMetrics
        workflowRunId : int
            id of the workflow run, used for building the entity url
    """

    name: str
    description: str
    meta: Dict[str, Any]
    status: TaskRunStatus
    projectId: int
    projectName: str
    projectType: ProjectType
    taskId: int
    taskName: str
    entryPoint: str
    createdById: str
    useCachedEnv: bool
    executionType: ExecutionType
    metrics: List[Metric]
    workflowRunId: int

    def __init__(self) -> None:
        super().__init__()

        self.metrics = []

        # Filled in by onDecode with the parsed parameter values,
        # marked as non-encodable in _keyDescriptors
        self.__parameters: Dict[str, Any] = {}

    @property
    def parameters(self) -> Dict[str, Any]:
        """
            Returns
            -------
            Dict[str, Any] -> Parameters for TaskRun
        """

        return self.__parameters

    @property
    def taskPath(self) -> Path:
        """
            Returns
            -------
            Path -> Path for TaskRun
        """

        return folder_manager.temp / str(self.id)

    @property
    def dataset(self) -> DatasetType:
        """
            Value of the parameter with name "dataset" assigned to this TaskRun

            Returns
            -------
            Dataset object if there was a parameter with name "dataset" entered when the TaskRun was started

            Raises
            ------
            ValueError -> if there is no parameter with name "dataset"
        """

        dataset = self.parameters.get("dataset")
        if dataset is None:
            raise ValueError(f">> [Coretex] TaskRun \"{self.id}\" does not have a parameter named \"dataset\"")

        return dataset  # type: ignore

    @property
    def isLocal(self) -> bool:
        """
            Returns
            -------
            bool -> True if execution type of the TaskRun is ExecutionType.local
        """

        return self.executionType == ExecutionType.local

    def setDatasetType(self, datasetType: Type[DatasetType]) -> None:
        """
            Replaces every dataset parameter value with an instance
            of the provided dataset type

            Parameters
            ----------
            datasetType : Type[DatasetType]
                LocalDataset values are recreated from their path if datasetType
                is a LocalDataset subclass, NetworkDataset values are fetched again
                by id if datasetType is a NetworkDataset subclass
        """

        # Only values of existing keys are reassigned, so mutating
        # the dictionary during iteration is safe here
        for key, value in self.__parameters.items():
            if isinstance(value, LocalDataset) and issubclass(datasetType, LocalDataset):
                self.__parameters[key] = datasetType(value.path)  # type: ignore

            if isinstance(value, NetworkDataset) and issubclass(datasetType, NetworkDataset):
                self.__parameters[key] = datasetType.fetchById(value.id)

    def setModelType(self, modelType: Type[Model]) -> None:
        """
            Replaces every Model parameter value with an object
            of the provided model type fetched by the id of the current value

            Parameters
            ----------
            modelType : Type[Model]
                type whose fetchById is used for recreating the values
        """

        for key, value in self.__parameters.items():
            if isinstance(value, Model):
                self.__parameters[key] = modelType.fetchById(value.id)

    @classmethod
    def _keyDescriptors(cls) -> Dict[str, KeyDescriptor]:
        descriptors = super()._keyDescriptors()

        descriptors["status"] = KeyDescriptor("status", TaskRunStatus)
        descriptors["projectId"] = KeyDescriptor("project_id")
        descriptors["projectName"] = KeyDescriptor("project_name")
        descriptors["projectType"] = KeyDescriptor("project_task", ProjectType)
        descriptors["taskId"] = KeyDescriptor("sub_project_id")
        descriptors["taskName"] = KeyDescriptor("sub_project_name")
        descriptors["executionType"] = KeyDescriptor("execution_type", ExecutionType)
        descriptors["workflowRunId"] = KeyDescriptor("pipeline_run_id")

        # private properties of the object should not be encoded
        descriptors["__parameters"] = KeyDescriptor(isEncodable = False)

        return descriptors

    @classmethod
    @override
    def _endpoint(cls) -> str:
        return "model-queue"

    @override
    def entityUrl(self) -> str:
        return f"workflow-run?id={self.workflowRunId}&taskRunId={self.id}"

    def onDecode(self) -> None:
        """
            Parses and validates the raw parameters stored under
            meta["parameters"] and stores their values by parameter name

            Raises
            ------
            ValueError -> if the raw parameters are not a list,
            or if any of the parameters failed the validation
        """

        super().onDecode()

        # A missing "parameters" entry is handled the same way as an explicit None
        if self.meta.get("parameters") is None:
            self.meta["parameters"] = []

        rawParameters = self.meta["parameters"]
        if not isinstance(rawParameters, list):
            raise ValueError(">> [Coretex] Invalid parameters")

        parameters = [parameter_factory.create(value) for value in rawParameters]

        parameterValidationResults = validateParameters(parameters, verbose = True)
        if not all(parameterValidationResults.values()):
            raise ValueError("Invalid parameters found")

        self.__parameters = {
            parameter.name: parameter.parseValue(self.projectType)
            for parameter in parameters
        }

    def _isInterval(self, metricName: str) -> bool:
        # True if a metric with the given name was created with an interval x axis type
        return any(
            metric.name == metricName and metric.xType == MetricType.interval.name
            for metric in self.metrics
        )

    def updateStatus(
        self,
        status: Optional[TaskRunStatus] = None,
        message: Optional[str] = None,
        notifyServer: bool = True
    ) -> bool:

        """
            Updates TaskRun status, if message parameter is None
            default message value will be used\n
            Some TaskRun statuses do not have default message

            Parameters
            ----------
            status : Optional[TaskRunStatus]
                Status to which the TaskRun will be updated to
            message : Optional[str]
                Descriptive message for TaskRun status, it is displayed
                when the status is hovered on the Coretex Web App
            notifyServer : bool
                if True update request will be sent to Coretex.ai

            Returns
            -------
            bool -> False if the update request was sent and has failed, True otherwise

            Example
            -------
            >>> from coretex import ExecutingTaskRun, TaskRunStatus
            \b
            >>> ExecutingTaskRun.current().updateStatus(
                    TaskRunStatus.completedWithSuccess
                )
            True
        """

        if status is not None:
            self.status = status

        # Local only update, nothing else to do
        if not notifyServer:
            return True

        if status is not None and message is None:
            message = status.defaultMessage

        parameters: Dict[str, Any] = {
            "id": self.id
        }

        if status is not None:
            parameters["status"] = status

        if message is not None:
            parameters["message"] = message

        # TODO: Should API rename this too?
        endpoint = f"{self._endpoint()}/job-status-update"
        response = networkManager.post(endpoint, parameters)

        if response.hasFailed():
            logging.getLogger("coretexpylib").error(">> [Coretex] Error while updating TaskRun status")
            return False

        if status is not None:
            logging.getLogger("coretexpylib").info(f">> [Coretex] Updated Task Run status to \"{status.name}\"")

        return True

    def createMetrics(self, metrics: List[Metric]) -> None:
        """
            Creates specified metrics for the TaskRun, successfully
            created metrics are appended to the metrics property

            Parameters
            ----------
            metrics : List[Metric]
                List of Metric meta objects in this format
                Metric("name", "x_label", "x_type", "y_label", "y_type", "x_range", "y_range")

            Raises
            ------
            NetworkRequestError -> if the request failed

            Example
            -------
            >>> from coretex import ExecutingTaskRun, Metric, MetricType
            \b
            >>> ExecutingTaskRun.current().createMetrics([
                    Metric.create("loss", "epoch", MetricType.int, "value", MetricType.float, None, [0, 100]),
                    Metric.create("accuracy", "epoch", MetricType.int, "value", MetricType.float, None, [0, 100])
                ])
        """

        parameters: Dict[str, Any] = {
            "experiment_id": self.id,
            "metrics": [metric.encode() for metric in metrics]
        }

        response = networkManager.post(f"{self._endpoint()}/metrics-meta", parameters)
        if response.hasFailed():
            raise NetworkRequestError(response, "Failed to create metrics")

        self.metrics.extend(metrics)

    def submitMetrics(self, metricValues: Dict[str, Tuple[float, float]]) -> bool:
        """
            Appends metric values for the provided metrics

            Parameters
            ----------
            metricValues : Dict[str, Tuple[float, float]]
                Values of metrics in this format {"name": (x, y)}

            Returns
            -------
            bool -> True if the request was successful, False otherwise

            Example
            -------
            >>> from coretex import ExecutingTaskRun
            \b
            >>> result = ExecutingTaskRun.current().submitMetrics({
                    "loss": (epoch, logs["loss"]),
                    "accuracy": (epoch, logs["accuracy"]),
                })
            >>> print(result)
            True
        """

        # Metrics with interval x axis use the x value as timestamp,
        # every other metric is stamped with the current time
        metrics = [{
            "timestamp": value[0] if self._isInterval(key) else time.time(),
            "metric": key,
            "x": value[0],
            "y": value[1]
        } for key, value in metricValues.items()]

        parameters: Dict[str, Any] = {
            "experiment_id": self.id,
            "metrics": metrics
        }

        response = networkManager.post(f"{self._endpoint()}/metrics", parameters)
        return not response.hasFailed()

    def submitOutput(self, parameterName: str, value: Any) -> None:
        """
            Submit an output of this task to act as a parameter in tasks
            downstream in the Workflow

            Parameters
            ----------
            parameterName : str
                name of the parameter
            value : Any
                value that will be sent (id of the object will be sent
                in case Coretex objects like CustomDataset, Model etc. is submitted)

            Raises
            ------
            NetworkRequestError -> if the request failed

            Example
            -------
            >>> from coretex import ExecutingTaskRun
            \b
            >>> ExecutingTaskRun.current().submitOutput(
                    parameterName = "outputDataset",
                    value = outputDataset
                )
        """

        self.submitOutputs({parameterName: value})

    def submitOutputs(self, outputs: Dict[str, Any]) -> None:
        """
            Submit multiple outputs of this task to act as parameters in tasks
            downstream in the Workflow

            Parameters
            ----------
            outputs : Dict[str, Any]
                dictionary with outputs, with key being the name of the parameter
                (id of the object will be sent in case Coretex objects like
                CustomDataset, Model etc. is submitted)

            Raises
            ------
            NetworkRequestError -> if the request failed

            Example
            -------
            >>> from coretex import ExecutingTaskRun
            \b
            >>> ExecutingTaskRun.current().submitOutputs({
                    "outputDataset": outputDataset,
                    "numbers": 123
                })
        """

        # Each output is sent as a separate single entry dictionary,
        # NetworkObject values are replaced by their id
        outputParameters: List[Dict[str, Any]] = [
            {key: value.id if isinstance(value, NetworkObject) else value}
            for key, value in outputs.items()
        ]

        parameters: Dict[str, Any] = {
            "id": self.id,
            "parameters": outputParameters
        }

        response = networkManager.post(f"{self._endpoint()}/output-parameter", parameters)
        if response.hasFailed():
            raise NetworkRequestError(response, ">> [Coretex] Failed to submit outputs")

    def downloadTask(self) -> bool:
        """
            Downloads task snapshot linked to the TaskRun
            and extracts it to taskPath

            Returns
            -------
            bool -> True if task downloaded successfully, False if task download has failed
        """

        params = {
            "model_queue_id": self.id
        }

        zipFilePath = f"{self.taskPath}.zip"
        response = networkManager.download("workspace/download", zipFilePath, params)

        if response.hasFailed():
            logging.getLogger("coretexpylib").info(">> [Coretex] Task download has failed")
            return False

        with ZipFile(zipFilePath) as zipFile:
            zipFile.extractall(self.taskPath)

        # remove zip file after extract
        os.unlink(zipFilePath)

        return True

    def createArtifact(
        self,
        localFilePath: Union[Path, str],
        remoteFilePath: str,
        mimeType: Optional[str] = None
    ) -> Optional[Artifact]:

        """
            Creates Artifact for the current TaskRun on Coretex.ai

            Parameters
            ----------
            localFilePath : Union[Path, str]
                local path of Artifact file
            remoteFilePath : str
                path of Artifact file on Coretex
            mimeType : Optional[str]
                mimeType (not required) if not passed guesMimeType() function is used

            Returns
            -------
            Optional[Artifact] -> if response is True returns Artifact object, None otherwise
        """

        return Artifact.create(self.id, localFilePath, remoteFilePath, mimeType)

    def createQiimeArtifact(self, rootArtifactFolderName: str, qiimeArtifactPath: Path) -> None:
        """
            Uploads a Qiime archive as a single Artifact of the TaskRun,
            a warning is logged if the upload has failed

            Parameters
            ----------
            rootArtifactFolderName : str
                remote folder under which the Artifact will be stored
            qiimeArtifactPath : Path
                local path to the archive, ".qza" and ".qzv" files
                are uploaded with "application/zip" mime type

            Raises
            ------
            ValueError -> if the file at qiimeArtifactPath is not a zip archive
        """

        if not zipfile.is_zipfile(qiimeArtifactPath):
            raise ValueError(">> [Coretex] Not an archive")

        localFilePath = str(qiimeArtifactPath)
        remoteFilePath = f"{rootArtifactFolderName}/{qiimeArtifactPath.name}"

        mimeType: Optional[str] = None
        if qiimeArtifactPath.suffix in [".qza", ".qzv"]:
            mimeType = "application/zip"

        artifact = self.createArtifact(localFilePath, remoteFilePath, mimeType)
        if artifact is None:
            logging.getLogger("coretexpylib").warning(f">> [Coretex] Failed to upload {localFilePath} to {remoteFilePath}")

        # TODO: Enable when uploading file by file is not slow anymore
        # tempDir = Path(FolderManager.instance().createTempFolder(rootArtifactFolderName))
        # fileUtils.recursiveUnzip(qiimeArtifactPath, tempDir, remove = False)

        # for path in fileUtils.walk(tempDir):
        #     relative = path.relative_to(tempDir)

        #     localFilePath = str(path)
        #     remoteFilePath = f"{rootArtifactFolderName}/{str(relative)}"

        #     logging.getLogger("coretexpylib").debug(f">> [Coretex] Uploading {localFilePath} to {remoteFilePath}")

        #     artifact = self.createArtifact(localFilePath, remoteFilePath)
        #     if artifact is None:
        #         logging.getLogger("coretexpylib").warning(f">> [Coretex] Failed to upload {localFilePath} to {remoteFilePath}")

    @classmethod
    def run(
        cls,
        taskId: int,
        nodeId: Union[int, str],
        name: Optional[str],
        description: Optional[str] = None,
        parameters: Optional[List[Dict[str, Any]]] = None
    ) -> Self:

        """
            Schedules a TaskRun for execution on the specified
            Node on Coretex.ai

            Parameters
            ----------
            taskId : int
                id of task that is being used for starting TaskRun
            nodeId : Union[int, str]
                id of node that is being used for starting TaskRun
            name : Optional[str]
                name of TaskRun (not required)
            description : Optional[str]
                TaskRun description (not required)
            parameters : Optional[List[Dict[str, Any]]]
                list of parameters (not required)

            Returns
            -------
            Self -> TaskRun object

            Raises
            ------
            NetworkRequestError -> if the request failed

            Example
            -------
            >>> from coretex import TaskRun
            >>> from coretex.networking import NetworkRequestError
            \b
            >>> parameters = [
                    {
                        "name": "dataset",
                        "description": "Dataset id that is used for fetching dataset from coretex.",
                        "value": None,
                        "data_type": "dataset",
                        "required": True
                    }
                ]
            \b
            >>> try:
                    taskRun = TaskRun.run(
                        taskId = 1023,
                        nodeId = 23,
                        name = "Dummy Custom TaskRun",
                        description = "Dummy description",
                        parameters = parameters
                    )

                    print(f"Created TaskRun with name: {taskRun.name}")
            >>> except NetworkRequestError:
                    print("Failed to create TaskRun")
        """

        # Node id is always sent as a string
        if isinstance(nodeId, int):
            nodeId = str(nodeId)

        if parameters is None:
            parameters = []

        response = networkManager.post("run", {
            "sub_project_id": taskId,
            "service_id": nodeId,
            "name": name,
            "description": description,
            "execution_type": ExecutionType.remote.value,
            "parameters": parameters
        })

        if response.hasFailed():
            raise NetworkRequestError(response, "Failed to create TaskRun")

        # Response holds the ids of created runs, the first one is fetched
        responseJson = response.getJson(dict)
        return cls.fetchById(responseJson["experiment_ids"][0])

    @classmethod
    def runLocal(
        cls,
        projectId: int,
        saveSnapshot: bool,
        name: Optional[str],
        description: Optional[str] = None,
        parameters: Optional[List[Dict[str, Any]]] = None,
        entryPoint: Optional[str] = None
    ) -> Self:

        """
            Creates TaskRun on Coretex.ai with the provided parameters,
            which will be run on the same machine which created it immediately
            after running the entry point file of the task

            Parameters
            ----------
            projectId : int
                id of project that is being used for starting TaskRun
            saveSnapshot : bool
                true if snapshot of local files will be saved to Coretex
            name : Optional[str]
                name of TaskRun (not required)
            description : Optional[str]
                TaskRun description (not required)
            parameters : Optional[List[Dict[str, Any]]]
                list of parameters (not required)
            entryPoint : Optional[str]
                relative path to the script inside of the project

            Returns
            -------
            Self -> TaskRun object

            Raises
            ------
            NetworkRequestError -> if the request failed
        """

        if parameters is None:
            parameters = []

        # Request is sent as form data, so the parameters are serialized to json
        params = {
            "project_id": projectId,
            "name": name,
            "description": description,
            "execution_type": ExecutionType.local.value,
            "parameters": json.dumps(parameters)
        }

        if entryPoint is not None:
            params["entry_point"] = entryPoint

        # Create snapshot
        if saveSnapshot:
            files = [FileData.createFromPath("file", createSnapshot())]
        else:
            files = None

        response = networkManager.formData("run", params, files)
        if response.hasFailed():
            raise NetworkRequestError(response, "Failed to create TaskRun")

        responseJson = response.getJson(dict)
        return cls.fetchById(responseJson["experiment_ids"][0])

    def generateEntityName(self) -> str:
        """
            Combines the id and the name of the task run into a name for entities like
            datasets or models, the name is truncated to 50 characters

            Returns
            -------
            str -> the generated name
        """

        name = f"{self.id}-{self.name}"
        return name[:50]
bool submitMetrics(self, Dict[str, Tuple[float, float]] metricValues)
Definition: task_run.py:311
None submitOutputs(self, Dict[str, Any] outputs)
Definition: task_run.py:376
None submitOutput(self, str parameterName, Any value)
Definition: task_run.py:347
Self run(cls, int taskId, Union[int, str] nodeId, Optional[str] name, Optional[str] description=None, Optional[List[Dict[str, Any]]] parameters=None)
Definition: task_run.py:512
bool _isInterval(self, str metricName)
Definition: task_run.py:201
bool updateStatus(self, Optional[TaskRunStatus] status=None, Optional[str] message=None, bool notifyServer=True)
Definition: task_run.py:213
Self runLocal(cls, int projectId, bool saveSnapshot, Optional[str] name, Optional[str] description=None, Optional[List[Dict[str, Any]]] parameters=None, Optional[str] entryPoint=None)
Definition: task_run.py:598
None createMetrics(self, List[Metric] metrics)
Definition: task_run.py:270
Optional[Artifact] createArtifact(self, Union[Path, str] localFilePath, str remoteFilePath, Optional[str] mimeType=None)
Definition: task_run.py:452