Coretex
coretex.entities.task_run.task_run.TaskRun Class Reference

Inherits NetworkObject, Generic, and DatasetType.

Public Member Functions

Dict[str, Any] parameters (self)
 
Path taskPath (self)
 
DatasetType dataset (self)
 
bool updateStatus (self, Optional[TaskRunStatus] status=None, Optional[str] message=None, bool notifyServer=True)
 
None createMetrics (self, List[Metric] metrics)
 
bool submitMetrics (self, Dict[str, Tuple[float, float]] metricValues)
 
None submitOutput (self, str parameterName, Any value)
 
None submitOutputs (self, Dict[str, Any] outputs)
 
bool downloadTask (self)
 
Optional[ArtifactcreateArtifact (self, Union[Path, str] localFilePath, str remoteFilePath, Optional[str] mimeType=None)
 
Self run (cls, int taskId, Union[int, str] nodeId, Optional[str] name, Optional[str] description=None, Optional[List[Dict[str, Any]]] parameters=None)
 
Self runLocal (cls, int projectId, bool saveSnapshot, Optional[str] name, Optional[str] description=None, Optional[List[Dict[str, Any]]] parameters=None, Optional[str] entryPoint=None)
 
str generateEntityName (self)
 

Detailed Description

    Represents TaskRun entity from Coretex.ai

    Properties
    ----------
    datasetId : int
        id of dataset
    name : str
        name of TaskRun
    description : str
        description of TaskRun
    meta : Dict[str, Any]
        meta data of TaskRun
    status : TaskRunStatus
        status of TaskRun
    projectId : int
        id of Coretex Project
    projectName : str
        name of Coretex Project
    projectType : ProjectType
        appropriate project type
    taskId : int
        id of task
    taskName : str
        name of task
    createdById : str
        id of created TaskRun
    useCachedEnv : bool
        if True chached env will be used, otherwise new environment will be created

Definition at line 45 of file task_run.py.

Member Function Documentation

◆ createArtifact()

Optional[Artifact] coretex.entities.task_run.task_run.TaskRun.createArtifact (   self,
Union[Path, str]  localFilePath,
str  remoteFilePath,
Optional[str]   mimeType = None 
)
    Creates Artifact for the current TaskRun on Coretex.ai

    Parameters
    ----------
    localFilePath : Union[Path, str]
        local path of Artifact file
    remoteFilePath : str
        path of Artifact file on Coretex
    mimeType : Optional[str]
        mimeType (not required) if not passed guesMimeType() function is used

    Returns
    -------
    Optional[Artifact] -> if response is True returns Artifact object, None otherwise

Definition at line 447 of file task_run.py.

452  ) -> Optional[Artifact]:
453 
454  """
455  Creates Artifact for the current TaskRun on Coretex.ai
456 
457  Parameters
458  ----------
459  localFilePath : Union[Path, str]
460  local path of Artifact file
461  remoteFilePath : str
462  path of Artifact file on Coretex
463  mimeType : Optional[str]
464  mimeType (not required) if not passed guesMimeType() function is used
465 
466  Returns
467  -------
468  Optional[Artifact] -> if response is True returns Artifact object, None otherwise
469  """
470 
471  return Artifact.create(self.id, localFilePath, remoteFilePath, mimeType)
472 

◆ createMetrics()

None coretex.entities.task_run.task_run.TaskRun.createMetrics (   self,
List[Metric]  metrics 
)
    Creates specified metrics for the TaskRun

    Parameters
    ----------
    values : List[Metric]]
        List of Metric meta objects in this format
        Metric("name", "x_label", "x_type", "y_label", "y_type", "x_range", "y_range")

    Returns
    -------
    List[Metric] -> List of Metric objects

    Raises
    ------
    NetworkRequestError -> if the request failed

    Example
    -------
    >>> from coretex import ExecutingTaskRun, MetricType
    \b
    >>> metrics = ExecutingTaskRun.current().createMetrics([
            Metric.create("loss", "epoch", MetricType.int, "value", MetricType.float, None, [0, 100]),
            Metric.create("accuracy", "epoch", MetricType.int, "value", MetricType.float, None, [0, 100])
        ])
    >>> if len(metrics) == 0:
            print("Failed to create metrics")

Definition at line 270 of file task_run.py.

270  def createMetrics(self, metrics: List[Metric]) -> None:
271  """
272  Creates specified metrics for the TaskRun
273 
274  Parameters
275  ----------
276  values : List[Metric]]
277  List of Metric meta objects in this format
278  Metric("name", "x_label", "x_type", "y_label", "y_type", "x_range", "y_range")
279 
280  Returns
281  -------
282  List[Metric] -> List of Metric objects
283 
284  Raises
285  ------
286  NetworkRequestError -> if the request failed
287 
288  Example
289  -------
290  >>> from coretex import ExecutingTaskRun, MetricType
291  \b
292  >>> metrics = ExecutingTaskRun.current().createMetrics([
293  Metric.create("loss", "epoch", MetricType.int, "value", MetricType.float, None, [0, 100]),
294  Metric.create("accuracy", "epoch", MetricType.int, "value", MetricType.float, None, [0, 100])
295  ])
296  >>> if len(metrics) == 0:
297  print("Failed to create metrics")
298  """
299 
300  parameters: Dict[str, Any] = {
301  "experiment_id": self.id,
302  "metrics": [metric.encode() for metric in metrics]
303  }
304 
305  response = networkManager.post(f"{self._endpoint()}/metrics-meta", parameters)
306  if response.hasFailed():
307  raise NetworkRequestError(response, "Failed to create metrics")
308 
309  self.metrics.extend(metrics)
310 

◆ dataset()

DatasetType coretex.entities.task_run.task_run.TaskRun.dataset (   self)
    Value of the parameter with name "dataset" assigned to this TaskRun

    Returns
    -------
    Dataset object if there was a parameter with name "dataset" entered when the TaskRun was started

    Raises
    ------
    ValueError -> if there is not parameter with name "dataset"

Definition at line 121 of file task_run.py.

121  def dataset(self) -> DatasetType:
122  """
123  Value of the parameter with name "dataset" assigned to this TaskRun
124 
125  Returns
126  -------
127  Dataset object if there was a parameter with name "dataset" entered when the TaskRun was started
128 
129  Raises
130  ------
131  ValueError -> if there is not parameter with name "dataset"
132  """
133 
134  dataset = self.parameters.get("dataset")
135  if dataset is None:
136  raise ValueError(f">> [Coretex] TaskRun \"{self.id}\" does not have a parameter named \"dataset\"")
137 
138  return dataset # type: ignore
139 

◆ downloadTask()

bool coretex.entities.task_run.task_run.TaskRun.downloadTask (   self)
    Downloads task snapshot linked to the TaskRun

    Returns
    -------
    bool -> True if task downloaded successfully, False if task download has failed

Definition at line 419 of file task_run.py.

419  def downloadTask(self) -> bool:
420  """
421  Downloads task snapshot linked to the TaskRun
422 
423  Returns
424  -------
425  bool -> True if task downloaded successfully, False if task download has failed
426  """
427 
428  params = {
429  "model_queue_id": self.id
430  }
431 
432  zipFilePath = f"{self.taskPath}.zip"
433  response = networkManager.download(f"workspace/download", zipFilePath, params)
434 
435  if response.hasFailed():
436  logging.getLogger("coretexpylib").info(">> [Coretex] Task download has failed")
437  return False
438 
439  with ZipFile(zipFilePath) as zipFile:
440  zipFile.extractall(self.taskPath)
441 
442  # remove zip file after extract
443  os.unlink(zipFilePath)
444 
445  return not response.hasFailed()
446 

◆ generateEntityName()

str coretex.entities.task_run.task_run.TaskRun.generateEntityName (   self)
    Combines the id and the name of the task run into a name for enitites like
    datasets or models.

    Returns
    -------
    str -> the generated name

Definition at line 657 of file task_run.py.

657  def generateEntityName(self) -> str:
658  """
659  Combines the id and the name of the task run into a name for enitites like
660  datasets or models.
661 
662  Returns
663  -------
664  str -> the generated name
665  """
666 
667  name = f"{self.id}-{self.name}"
668  return name[:50]

◆ parameters()

Dict[str, Any] coretex.entities.task_run.task_run.TaskRun.parameters (   self)
    Returns
    -------
    Dict[str, Any] -> Parameters for TaskRun

Definition at line 101 of file task_run.py.

101  def parameters(self) -> Dict[str, Any]:
102  """
103  Returns
104  -------
105  Dict[str, Any] -> Parameters for TaskRun
106  """
107 
108  return self.__parameters
109 

◆ run()

Self coretex.entities.task_run.task_run.TaskRun.run (   cls,
int  taskId,
Union[int, str]  nodeId,
Optional[str]  name,
Optional[str]   description = None,
Optional[List[Dict[str, Any]]]   parameters = None 
)
    Schedules an TaskRun for execution on the specified
    Node on Coretex.ai

    Parameters
    ----------
    taskId : int
        id of task that is being used for starting TaskRun
    nodeId : Union[int, str]
        id of node that is being used for starting TaskRun
    name : Optional[str]
        name of TaskRun (not required)
    description : Optional[str]
        TaskRun description (not required)
    parameters : Optional[List[Dict[str, Any]]]
        list of parameters (not required)

    Returns
    -------
    Self -> TaskRun object

    Raises
    ------
    NetworkRequestError -> if the request failed

    Example
    -------
    >>> from coretex import TaskRun
    >>> from coretex.networking import NetworkRequestError
    \b
    >>> parameters = [
            {
                "name": "dataset",
                "description": "Dataset id that is used for fetching dataset from coretex.",
                "value": null,
                "data_type": "dataset",
                "required": true
            }
        ]
    \b
    >>> try:
            taskRun = TaskRun.run(
                taskId = 1023,
                nodeId = 23,
                name = "Dummy Custom TaskRun
                description = "Dummy description",
                parameters = parameters
            )

            print(f"Created TaskRun with name: {taskRun.name}")
    >>> except NetworkRequestError:
            print("Failed to create TaskRun")

Definition at line 505 of file task_run.py.

512  ) -> Self:
513 
514  """
515  Schedules an TaskRun for execution on the specified
516  Node on Coretex.ai
517 
518  Parameters
519  ----------
520  taskId : int
521  id of task that is being used for starting TaskRun
522  nodeId : Union[int, str]
523  id of node that is being used for starting TaskRun
524  name : Optional[str]
525  name of TaskRun (not required)
526  description : Optional[str]
527  TaskRun description (not required)
528  parameters : Optional[List[Dict[str, Any]]]
529  list of parameters (not required)
530 
531  Returns
532  -------
533  Self -> TaskRun object
534 
535  Raises
536  ------
537  NetworkRequestError -> if the request failed
538 
539  Example
540  -------
541  >>> from coretex import TaskRun
542  >>> from coretex.networking import NetworkRequestError
543  \b
544  >>> parameters = [
545  {
546  "name": "dataset",
547  "description": "Dataset id that is used for fetching dataset from coretex.",
548  "value": null,
549  "data_type": "dataset",
550  "required": true
551  }
552  ]
553  \b
554  >>> try:
555  taskRun = TaskRun.run(
556  taskId = 1023,
557  nodeId = 23,
558  name = "Dummy Custom TaskRun
559  description = "Dummy description",
560  parameters = parameters
561  )
562 
563  print(f"Created TaskRun with name: {taskRun.name}")
564  >>> except NetworkRequestError:
565  print("Failed to create TaskRun")
566  """
567 
568  if isinstance(nodeId, int):
569  nodeId = str(nodeId)
570 
571  if parameters is None:
572  parameters = []
573 
574  response = networkManager.post("run", {
575  "sub_project_id": taskId,
576  "service_id": nodeId,
577  "name": name,
578  "description": description,
579  "execution_type": ExecutionType.remote.value,
580  "parameters": parameters
581  })
582 
583  if response.hasFailed():
584  raise NetworkRequestError(response, "Failed to create TaskRun")
585 
586  responseJson = response.getJson(dict)
587  return cls.fetchById(responseJson["experiment_ids"][0])
588 

◆ runLocal()

Self coretex.entities.task_run.task_run.TaskRun.runLocal (   cls,
int  projectId,
bool  saveSnapshot,
Optional[str]  name,
Optional[str]   description = None,
Optional[List[Dict[str, Any]]]   parameters = None,
Optional[str]   entryPoint = None 
)
    Creates TaskRun on Coretex.ai with the provided parameters,
    which will be run on the same machine which created it immidiately
    after running the entry point file of the task

    Parameters
    ----------
    projectId : int
        id of project that is being used for starting TaskRun
    saveSnapshot : bool
        true if snapshot of local files will be saved to Coretex
    name : Optional[str]
        name of TaskRun (not required)
    description : Optional[str]
        TaskRun description (not required)
    parameters : Optional[List[Dict[str, Any]]]
        list of parameters (not required)
    entryPoint : Optional[str]
        relative path to the script inside of the project

    Returns
    -------
    Self -> TaskRun object

    Raises
    ------
    NetworkRequestError -> if the request failed

Definition at line 590 of file task_run.py.

598  ) -> Self:
599 
600  """
601  Creates TaskRun on Coretex.ai with the provided parameters,
602  which will be run on the same machine which created it immidiately
603  after running the entry point file of the task
604 
605  Parameters
606  ----------
607  projectId : int
608  id of project that is being used for starting TaskRun
609  saveSnapshot : bool
610  true if snapshot of local files will be saved to Coretex
611  name : Optional[str]
612  name of TaskRun (not required)
613  description : Optional[str]
614  TaskRun description (not required)
615  parameters : Optional[List[Dict[str, Any]]]
616  list of parameters (not required)
617  entryPoint : Optional[str]
618  relative path to the script inside of the project
619 
620  Returns
621  -------
622  Self -> TaskRun object
623 
624  Raises
625  ------
626  NetworkRequestError -> if the request failed
627  """
628 
629  if parameters is None:
630  parameters = []
631 
632  params = {
633  "project_id": projectId,
634  "name": name,
635  "description": description,
636  "execution_type": ExecutionType.local.value,
637  "parameters": json.dumps(parameters)
638  }
639 
640  if entryPoint is not None:
641  params["entry_point"] = entryPoint
642 
643  # Create snapshot
644  if saveSnapshot:
645  files = [FileData.createFromPath("file", createSnapshot())]
646  else:
647  files = None
648 
649  response = networkManager.formData("run", params, files)
650  if response.hasFailed():
651  raise NetworkRequestError(response, "Failed to create TaskRun")
652 
653  responseJson = response.getJson(dict)
654  return cls.fetchById(responseJson["experiment_ids"][0])
655 
656 

◆ submitMetrics()

bool coretex.entities.task_run.task_run.TaskRun.submitMetrics (   self,
Dict[str, Tuple[float, float]]  metricValues 
)
    Appends metric values for the provided metrics

    Parameters
    ----------
    metricValues : Dict[str, Tuple[float, float]]
        Values of metrics in this format {"name": x, y}

    Example
    -------
    >>> from coretex import ExecutingTaskRun
    \b
    >>> result = ExecutingTaskRun.current().submitMetrics({
            "loss": (epoch, logs["loss"]),
            "accuracy": (epoch, logs["accuracy"]),
        })
    >>> print(result)
    True

Definition at line 311 of file task_run.py.

311  def submitMetrics(self, metricValues: Dict[str, Tuple[float, float]]) -> bool:
312  """
313  Appends metric values for the provided metrics
314 
315  Parameters
316  ----------
317  metricValues : Dict[str, Tuple[float, float]]
318  Values of metrics in this format {"name": x, y}
319 
320  Example
321  -------
322  >>> from coretex import ExecutingTaskRun
323  \b
324  >>> result = ExecutingTaskRun.current().submitMetrics({
325  "loss": (epoch, logs["loss"]),
326  "accuracy": (epoch, logs["accuracy"]),
327  })
328  >>> print(result)
329  True
330  """
331 
332  metrics = [{
333  "timestamp": value[0] if self._isInterval(key) else time.time(),
334  "metric": key,
335  "x": value[0],
336  "y": value[1]
337  } for key, value in metricValues.items()]
338 
339  parameters: Dict[str, Any] = {
340  "experiment_id": self.id,
341  "metrics": metrics
342  }
343 
344  response = networkManager.post(f"{self._endpoint()}/metrics", parameters)
345  return not response.hasFailed()
346 

◆ submitOutput()

None coretex.entities.task_run.task_run.TaskRun.submitOutput (   self,
str  parameterName,
Any  value 
)
    Submit an output of this task to act as a parameter in tasks
    downstream in the Workflow

    Parameters
    ----------
    parameterName : str
        name of the parameter
    value : Any
        value that will be sent (id of the object will be sent
        in case Coretex objects like CustomDataset, Model etc. is submited)

    Raises
    ------
    NetworkRequestError -> if the request failed

    Example
    -------
    >>> from coretex import TaskRun
    \b
    >>> ExecutingTaskRun.current().submitOutputParameter(
            parameterName = "outputDataset"
            value = outputDataset
        )

Definition at line 347 of file task_run.py.

347  def submitOutput(self, parameterName: str, value: Any) -> None:
348  """
349  Submit an output of this task to act as a parameter in tasks
350  downstream in the Workflow
351 
352  Parameters
353  ----------
354  parameterName : str
355  name of the parameter
356  value : Any
357  value that will be sent (id of the object will be sent
358  in case Coretex objects like CustomDataset, Model etc. is submited)
359 
360  Raises
361  ------
362  NetworkRequestError -> if the request failed
363 
364  Example
365  -------
366  >>> from coretex import TaskRun
367  \b
368  >>> ExecutingTaskRun.current().submitOutputParameter(
369  parameterName = "outputDataset"
370  value = outputDataset
371  )
372  """
373 
374  self.submitOutputs({parameterName: value})
375 

◆ submitOutputs()

None coretex.entities.task_run.task_run.TaskRun.submitOutputs (   self,
Dict[str, Any]  outputs 
)
    Submit multiple outputs of this task to act as parameters in tasks
    downstream in the Workflow

    Parameters
    ----------
    outputs : Dict[str, Any]
        dictionary with outputs, with key being the name of the parameter
        (id of the object will be sent in case Coretex objects like
        CustomDataset, Model etc. is submited)

    Raises
    ------
    NetworkRequestError -> if the request failed

    Example
    -------
    >>> from coretex import TaskRun
    \b
    >>> result = ExecutingTaskRun.current().submitOutputParameters({
            "outputDataset": outputDataset,
            "numbers": 123
        })

Definition at line 376 of file task_run.py.

376  def submitOutputs(self, outputs: Dict[str, Any]) -> None:
377  """
378  Submit multiple outputs of this task to act as parameters in tasks
379  downstream in the Workflow
380 
381  Parameters
382  ----------
383  outputs : Dict[str, Any]
384  dictionary with outputs, with key being the name of the parameter
385  (id of the object will be sent in case Coretex objects like
386  CustomDataset, Model etc. is submited)
387 
388  Raises
389  ------
390  NetworkRequestError -> if the request failed
391 
392  Example
393  -------
394  >>> from coretex import TaskRun
395  \b
396  >>> result = ExecutingTaskRun.current().submitOutputParameters({
397  "outputDataset": outputDataset,
398  "numbers": 123
399  })
400  """
401 
402  outputParameters: List[Dict[str, Any]] = []
403 
404  for key, value in outputs.items():
405  if isinstance(value, NetworkObject):
406  value = value.id
407 
408  outputParameters.append({key: value})
409 
410  parameters: Dict[str, Any] = {
411  "id": self.id,
412  "parameters": outputParameters
413  }
414 
415  response = networkManager.post(f"{self._endpoint()}/output-parameter", parameters)
416  if response.hasFailed():
417  raise NetworkRequestError(response, ">> [Coretex] Failed to submit outputs")
418 

◆ taskPath()

Path coretex.entities.task_run.task_run.TaskRun.taskPath (   self)
    Returns
    -------
    Path -> Path for TaskRun

Definition at line 111 of file task_run.py.

111  def taskPath(self) -> Path:
112  """
113  Returns
114  -------
115  Path -> Path for TaskRun
116  """
117 
118  return folder_manager.temp / str(self.id)
119 

◆ updateStatus()

bool coretex.entities.task_run.task_run.TaskRun.updateStatus (   self,
Optional[TaskRunStatus]   status = None,
Optional[str]   message = None,
bool   notifyServer = True 
)
    Updates TaskRun status, if message parameter is None
    default message value will be used\n
    Some TaskRun statuses do not have default message

    Parameters
    ----------
    status : Optional[TaskRunStatus]
        Status to which the TaskRun will be updated to
    message : Optional[str]
        Descriptive message for TaskRun status, it is diplayed
        when the status is hovered on the Coretex Web App
    notifyServer : bool
        if True update request will be sent to Coretex.ai

    Example
    -------
    >>> from coretex import ExecutingTaskRun, TaskRunStatus
    \b
    >>> ExecutingTaskRun.current().updateStatus(
            TaskRunStatus.completedWithSuccess
        )
    True

Definition at line 208 of file task_run.py.

213  ) -> bool:
214 
215  """
216  Updates TaskRun status, if message parameter is None
217  default message value will be used\n
218  Some TaskRun statuses do not have default message
219 
220  Parameters
221  ----------
222  status : Optional[TaskRunStatus]
223  Status to which the TaskRun will be updated to
224  message : Optional[str]
225  Descriptive message for TaskRun status, it is diplayed
226  when the status is hovered on the Coretex Web App
227  notifyServer : bool
228  if True update request will be sent to Coretex.ai
229 
230  Example
231  -------
232  >>> from coretex import ExecutingTaskRun, TaskRunStatus
233  \b
234  >>> ExecutingTaskRun.current().updateStatus(
235  TaskRunStatus.completedWithSuccess
236  )
237  True
238  """
239 
240  if status is not None:
241  self.status = status
242 
243  if notifyServer:
244  if status is not None and message is None:
245  message = status.defaultMessage
246 
247  parameters: Dict[str, Any] = {
248  "id": self.id
249  }
250 
251  if status is not None:
252  parameters["status"] = status
253 
254  if message is not None:
255  parameters["message"] = message
256 
257  # TODO: Should API rename this too?
258  endpoint = f"{self._endpoint()}/job-status-update"
259  response = networkManager.post(endpoint, parameters)
260 
261  if response.hasFailed():
262  logging.getLogger("coretexpylib").error(">> [Coretex] Error while updating TaskRun status")
263  elif status is not None:
264  logging.getLogger("coretexpylib").info(f">> [Coretex] Updated Task Run status to \"{status.name}\"")
265 
266  return not response.hasFailed()
267 
268  return True
269 

The documentation for this class was generated from the following file: