import json
import logging
import os
import time
import zipfile

from typing import Optional, Any, List, Dict, Union, Tuple, TypeVar, Generic, Type
from typing_extensions import Self, override
from zipfile import ZipFile, ZIP_DEFLATED
from pathlib import Path

from .utils import createSnapshot
from .artifact import Artifact
from .status import TaskRunStatus
from .metrics import Metric, MetricType
from .parameter import validateParameters, parameter_factory
from .execution_type import ExecutionType
from ..dataset import Dataset, LocalDataset, NetworkDataset
from ..project import ProjectType
from ..model import Model
from ..._folder_manager import folder_manager
from ...codable import KeyDescriptor
from ...networking import networkManager, NetworkObject, NetworkRequestError, FileData
DatasetType = TypeVar("DatasetType", bound = Dataset)
class TaskRun(NetworkObject, Generic[DatasetType]):

    """
        Represents TaskRun entity from Coretex.ai

        Properties
        ----------
        description : str
            description of TaskRun
        status : TaskRunStatus
            status of TaskRun
        projectName : str
            name of Coretex Project
        projectType : ProjectType
            appropriate project type
        useCachedEnv : bool
            if True cached env will be used, otherwise new environment will be created
    """

    projectType: ProjectType
    executionType: ExecutionType
    def __init__(self) -> None:
        super(TaskRun, self).__init__()

        self.__parameters: Dict[str, Any] = {}

    @property
    def parameters(self) -> Dict[str, Any]:
        """
            Dict[str, Any] -> Parameters for TaskRun
        """

        return self.__parameters

    @property
    def taskPath(self) -> Path:
        """
            Path -> Path for TaskRun
        """

        return folder_manager.temp / str(self.id)
    @property
    def dataset(self) -> DatasetType:
        """
            Value of the parameter with name "dataset" assigned to this TaskRun

            Returns
            -------
            Dataset object if there was a parameter with name "dataset" entered when the TaskRun was started

            Raises
            ------
            ValueError -> if there is no parameter with name "dataset"
        """

        dataset = self.parameters.get("dataset")
        if dataset is None:
            raise ValueError(f">> [Coretex] TaskRun \"{self.id}\" does not have a parameter named \"dataset\"")

        return dataset  # type: ignore
    @property
    def isLocal(self) -> bool:
        return self.executionType == ExecutionType.local
    def setDatasetType(self, datasetType: Type[DatasetType]) -> None:
        for key, value in self.__parameters.items():
            if isinstance(value, LocalDataset) and issubclass(datasetType, LocalDataset):
                self.__parameters[key] = datasetType(value.path)

            if isinstance(value, NetworkDataset) and issubclass(datasetType, NetworkDataset):
                self.__parameters[key] = datasetType.fetchById(value.id)
    def setModelType(self, modelType: Type[Model]) -> None:
        for key, value in self.__parameters.items():
            if isinstance(value, Model):
                self.__parameters[key] = modelType.fetchById(value.id)
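    # A minimal sketch of narrowing parameter types before use, assuming CustomDataset
    # is a concrete NetworkDataset subclass (CustomDataset and Model are the examples
    # already mentioned in the docstrings of this class):
    #
    # >>> taskRun.setDatasetType(CustomDataset)  # dataset parameters become CustomDataset instances
    # >>> taskRun.setModelType(Model)            # model parameters are re-fetched as Model objects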
    @classmethod
    def _keyDescriptors(cls) -> Dict[str, KeyDescriptor]:
        descriptors = super()._keyDescriptors()

        descriptors["status"] = KeyDescriptor("status", TaskRunStatus)
        descriptors["projectType"] = KeyDescriptor("project_task", ProjectType)
        descriptors["executionType"] = KeyDescriptor("execution_type", ExecutionType)
        descriptors["workflowRunId"] = KeyDescriptor("pipeline_run_id")
        descriptors["__parameters"] = KeyDescriptor(isEncodable = False)

        return descriptors
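    # A sketch of what the descriptors above mean for decoding, assuming the usual
    # Codable/NetworkObject behaviour of mapping JSON keys through KeyDescriptor
    # (field values below are purely illustrative):
    #
    #     {"status": ..., "project_task": ..., "execution_type": ..., "pipeline_run_id": ...}
    #
    # decodes into self.status, self.projectType, self.executionType and
    # self.workflowRunId, while "__parameters" is never sent back to the server
    # because of isEncodable = False.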
    @classmethod
    def _endpoint(cls) -> str:
        ...

    def entityUrl(self) -> str:
        return f"workflow-run?id={self.workflowRunId}&taskRunId={self.id}"
    def onDecode(self) -> None:
        if self.meta["parameters"] is None:
            self.meta["parameters"] = []

        if not isinstance(self.meta["parameters"], list):
            raise ValueError(">> [Coretex] Invalid parameters")

        parameters = [parameter_factory.create(value) for value in self.meta["parameters"]]

        parameterValidationResults = validateParameters(parameters, verbose = True)
        if not all(parameterValidationResults.values()):
            raise ValueError("Invalid parameters found")

        self.__parameters = {parameter.name: parameter.parseValue(self.projectType) for parameter in parameters}
    def _isInterval(self, metricName: str) -> bool:
        for metric in self.metrics:
            if metric.name == metricName and metric.xType == MetricType.interval.name:
                return True

        return False
    def updateStatus(
        self,
        status: Optional[TaskRunStatus] = None,
        message: Optional[str] = None,
        notifyServer: bool = True
    ) -> bool:

        """
            Updates TaskRun status, if message parameter is None
            default message value will be used\n
            Some TaskRun statuses do not have a default message

            Parameters
            ----------
            status : Optional[TaskRunStatus]
                Status to which the TaskRun will be updated
            message : Optional[str]
                Descriptive message for TaskRun status, it is displayed
                when the status is hovered on the Coretex Web App
            notifyServer : bool
                if True update request will be sent to Coretex.ai

            Example
            -------
            >>> from coretex import ExecutingTaskRun, TaskRunStatus
            >>> ExecutingTaskRun.current().updateStatus(
                    TaskRunStatus.completedWithSuccess
                )
        """

        if status is not None:
            self.status = status

        if notifyServer:
            if status is not None and message is None:
                message = status.defaultMessage

            parameters: Dict[str, Any] = {
                "id": self.id
            }

            if status is not None:
                parameters["status"] = status

            if message is not None:
                parameters["message"] = message

            endpoint = f"{self._endpoint()}/job-status-update"
            response = networkManager.post(endpoint, parameters)

            if response.hasFailed():
                logging.getLogger("coretexpylib").error(">> [Coretex] Error while updating TaskRun status")
            elif status is not None:
                logging.getLogger("coretexpylib").info(f">> [Coretex] Updated Task Run status to \"{status.name}\"")

            return not response.hasFailed()

        return True
    def createMetrics(self, metrics: List[Metric]) -> None:
        """
            Creates specified metrics for the TaskRun

            Parameters
            ----------
            metrics : List[Metric]
                List of Metric meta objects in this format
                Metric("name", "x_label", "x_type", "y_label", "y_type", "x_range", "y_range")

            Raises
            ------
            NetworkRequestError -> if the request failed

            Example
            -------
            >>> from coretex import ExecutingTaskRun, MetricType
            >>> ExecutingTaskRun.current().createMetrics([
                    Metric.create("loss", "epoch", MetricType.int, "value", MetricType.float, None, [0, 100]),
                    Metric.create("accuracy", "epoch", MetricType.int, "value", MetricType.float, None, [0, 100])
                ])
        """

        parameters: Dict[str, Any] = {
            "experiment_id": self.id,
            "metrics": [metric.encode() for metric in metrics]
        }

        response = networkManager.post(f"{self._endpoint()}/metrics-meta", parameters)
        if response.hasFailed():
            raise NetworkRequestError(response, "Failed to create metrics")

        self.metrics.extend(metrics)
    def submitMetrics(self, metricValues: Dict[str, Tuple[float, float]]) -> bool:
        """
            Appends metric values for the provided metrics

            Parameters
            ----------
            metricValues : Dict[str, Tuple[float, float]]
                Values of metrics in this format {"name": (x, y)}

            Example
            -------
            >>> from coretex import ExecutingTaskRun
            >>> result = ExecutingTaskRun.current().submitMetrics({
                    "loss": (epoch, logs["loss"]),
                    "accuracy": (epoch, logs["accuracy"])
                })
        """

        metrics = [{
            "timestamp": value[0] if self._isInterval(key) else time.time(),
            # other per-metric payload fields (metric name, x and y values) omitted here
        } for key, value in metricValues.items()]

        parameters: Dict[str, Any] = {
            "experiment_id": self.id,
            "metrics": metrics
        }

        response = networkManager.post(f"{self._endpoint()}/metrics", parameters)
        return not response.hasFailed()
    def submitOutput(self, parameterName: str, value: Any) -> None:
        """
            Submit an output of this task to act as a parameter in tasks
            downstream in the Workflow

            Parameters
            ----------
            parameterName : str
                name of the parameter
            value : Any
                value that will be sent (id of the object will be sent
                in case Coretex objects like CustomDataset, Model etc. are submitted)

            Raises
            ------
            NetworkRequestError -> if the request failed

            Example
            -------
            >>> from coretex import TaskRun
            >>> ExecutingTaskRun.current().submitOutput(
                    parameterName = "outputDataset",
                    value = outputDataset
                )
        """

        self.submitOutputs({parameterName: value})

    def submitOutputs(self, outputs: Dict[str, Any]) -> None:
        """
            Submit multiple outputs of this task to act as parameters in tasks
            downstream in the Workflow

            Parameters
            ----------
            outputs : Dict[str, Any]
                dictionary with outputs, with key being the name of the parameter
                (id of the object will be sent in case Coretex objects like
                CustomDataset, Model etc. are submitted)

            Raises
            ------
            NetworkRequestError -> if the request failed

            Example
            -------
            >>> from coretex import TaskRun
            >>> ExecutingTaskRun.current().submitOutputs({
                    "outputDataset": outputDataset
                })
        """

        outputParameters: List[Dict[str, Any]] = []

        for key, value in outputs.items():
            if isinstance(value, NetworkObject):
                value = value.id

            outputParameters.append({key: value})

        parameters: Dict[str, Any] = {
            "id": self.id,
            "parameters": outputParameters
        }

        response = networkManager.post(f"{self._endpoint()}/output-parameter", parameters)
        if response.hasFailed():
            raise NetworkRequestError(response, ">> [Coretex] Failed to submit outputs")
    def downloadTask(self) -> bool:
        """
            Downloads task snapshot linked to the TaskRun

            Returns
            -------
            bool -> True if task downloaded successfully, False if task download has failed
        """

        params = {
            "model_queue_id": self.id
        }

        zipFilePath = f"{self.taskPath}.zip"
        response = networkManager.download("workspace/download", zipFilePath, params)

        if response.hasFailed():
            logging.getLogger("coretexpylib").info(">> [Coretex] Task download has failed")
            return False

        with ZipFile(zipFilePath) as zipFile:
            zipFile.extractall(self.taskPath)

        # remove the downloaded zip file once extracted
        os.unlink(zipFilePath)

        return not response.hasFailed()
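    # A minimal usage sketch, assuming the task snapshot should be unpacked into
    # taskPath before the entry point is executed:
    #
    # >>> if not taskRun.downloadTask():
    # ...     print("Failed to download task snapshot")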
    def createArtifact(
        self,
        localFilePath: Union[Path, str],
        remoteFilePath: str,
        mimeType: Optional[str] = None
    ) -> Optional[Artifact]:

        """
            Creates Artifact for the current TaskRun on Coretex.ai

            Parameters
            ----------
            localFilePath : Union[Path, str]
                local path of Artifact file
            remoteFilePath : str
                path of Artifact file on Coretex
            mimeType : Optional[str]
                mimeType (not required), if not passed the guessMimeType() function is used

            Returns
            -------
            Optional[Artifact] -> if response is True returns Artifact object, None otherwise
        """

        return Artifact.create(self.id, localFilePath, remoteFilePath, mimeType)
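    # A minimal usage sketch ("results/metrics.csv" is a hypothetical local file):
    #
    # >>> artifact = taskRun.createArtifact("results/metrics.csv", "metrics.csv")
    # >>> if artifact is None:
    # ...     print("Failed to create artifact")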
    def createQiimeArtifact(self, rootArtifactFolderName: str, qiimeArtifactPath: Path) -> None:
        if not zipfile.is_zipfile(qiimeArtifactPath):
            raise ValueError(">> [Coretex] Not an archive")

        localFilePath = str(qiimeArtifactPath)
        remoteFilePath = f"{rootArtifactFolderName}/{qiimeArtifactPath.name}"

        mimeType: Optional[str] = None
        if qiimeArtifactPath.suffix in [".qza", ".qzv"]:
            mimeType = "application/zip"

        artifact = self.createArtifact(localFilePath, remoteFilePath, mimeType)
        if artifact is None:
            logging.getLogger("coretexpylib").warning(f">> [Coretex] Failed to upload {localFilePath} to {remoteFilePath}")
    @classmethod
    def run(
        cls,
        taskId: int,
        nodeId: Union[int, str],
        name: Optional[str],
        description: Optional[str] = None,
        parameters: Optional[List[Dict[str, Any]]] = None
    ) -> Self:

        """
            Schedules a TaskRun for execution on the specified node

            Parameters
            ----------
            taskId : int
                id of task that is being used for starting TaskRun
            nodeId : Union[int, str]
                id of node that is being used for starting TaskRun
            name : Optional[str]
                name of TaskRun (not required)
            description : Optional[str]
                TaskRun description (not required)
            parameters : Optional[List[Dict[str, Any]]]
                list of parameters (not required)

            Returns
            -------
            Self -> TaskRun object

            Raises
            ------
            NetworkRequestError -> if the request failed

            Example
            -------
            >>> from coretex import TaskRun
            >>> from coretex.networking import NetworkRequestError
            >>> parameters = [
                    {
                        ...
                        "description": "Dataset id that is used for fetching dataset from coretex.",
                        "data_type": "dataset",
                        ...
                    }
                ]
            >>> try:
                    taskRun = TaskRun.run(
                        ...
                        name = "Dummy Custom TaskRun",
                        description = "Dummy description",
                        parameters = parameters
                    )
                    print(f"Created TaskRun with name: {taskRun.name}")
            >>> except NetworkRequestError:
                    print("Failed to create TaskRun")
        """

        if isinstance(nodeId, int):
            nodeId = str(nodeId)

        if parameters is None:
            parameters = []

        response = networkManager.post("run", {
            "sub_project_id": taskId,
            "service_id": nodeId,
            "name": name,
            "description": description,
            "execution_type": ExecutionType.remote.value,
            "parameters": parameters
        })

        if response.hasFailed():
            raise NetworkRequestError(response, "Failed to create TaskRun")

        responseJson = response.getJson(dict)
        return cls.fetchById(responseJson["experiment_ids"][0])
    @classmethod
    def runLocal(
        cls,
        projectId: int,
        saveSnapshot: bool,
        name: Optional[str],
        description: Optional[str] = None,
        parameters: Optional[List[Dict[str, Any]]] = None,
        entryPoint: Optional[str] = None
    ) -> Self:

        """
            Creates TaskRun on Coretex.ai with the provided parameters,
            which will be run on the same machine which created it immediately
            after running the entry point file of the task

            Parameters
            ----------
            projectId : int
                id of project that is being used for starting TaskRun
            saveSnapshot : bool
                true if snapshot of local files will be saved to Coretex
            name : Optional[str]
                name of TaskRun (not required)
            description : Optional[str]
                TaskRun description (not required)
            parameters : Optional[List[Dict[str, Any]]]
                list of parameters (not required)
            entryPoint : Optional[str]
                relative path to the script inside of the project

            Returns
            -------
            Self -> TaskRun object

            Raises
            ------
            NetworkRequestError -> if the request failed
        """

        if parameters is None:
            parameters = []

        params: Dict[str, Any] = {
            "project_id": projectId,
            "name": name,
            "description": description,
            "execution_type": ExecutionType.local.value,
            "parameters": json.dumps(parameters)
        }

        if entryPoint is not None:
            params["entry_point"] = entryPoint

        files = []
        if saveSnapshot:
            files = [FileData.createFromPath("file", createSnapshot())]

        response = networkManager.formData("run", params, files)
        if response.hasFailed():
            raise NetworkRequestError(response, "Failed to create TaskRun")

        responseJson = response.getJson(dict)
        return cls.fetchById(responseJson["experiment_ids"][0])
    def generateEntityName(self) -> str:
        """
            Combines the id and the name of the task run into a name for entities like
            models and datasets created by this run

            Returns
            -------
            str -> the generated name
        """

        name = f"{self.id}-{self.name}"
        return name
bool submitMetrics(self, Dict[str, Tuple[float, float]] metricValues)
None submitOutputs(self, Dict[str, Any] outputs)
None submitOutput(self, str parameterName, Any value)
str generateEntityName(self)
Self run(cls, int taskId, Union[int, str] nodeId, Optional[str] name, Optional[str] description=None, Optional[List[Dict[str, Any]]] parameters=None)
bool _isInterval(self, str metricName)
bool updateStatus(self, Optional[TaskRunStatus] status=None, Optional[str] message=None, bool notifyServer=True)
DatasetType dataset(self)
Self runLocal(cls, int projectId, bool saveSnapshot, Optional[str] name, Optional[str] description=None, Optional[List[Dict[str, Any]]] parameters=None, Optional[str] entryPoint=None)
None createMetrics(self, List[Metric] metrics)
Optional[Artifact] createArtifact(self, Union[Path, str] localFilePath, str remoteFilePath, Optional[str] mimeType=None)
Dict[str, Any] parameters(self)