from typing import Any, Callable, Optional, List
from concurrent.futures import ThreadPoolExecutor, Future

import os
import logging

# upper limit on the number of worker threads, matching the "maximum is 8" note below
MAX_WORKER_COUNT = 8


class MultithreadedDataProcessor:

    """
    Used for splitting data processing across multiple threads.
    Only useful for I/O bound operations; do not use it for heavy (CPU bound)
    data processing, since CPython threads share the GIL and such work will
    not run in parallel.

    Parameters
    ----------
    data : List[Any]
        list whose elements will be processed across multiple threads
    singleElementProcessor : Callable[[Any], None]
        function which will be called for a single element from the provided list
    workerCount : Optional[int]
        number of worker threads used for processing (maximum is 8), if None
        it will use min(cpuCores, 8)
    message : Optional[str]
        message which is displayed when the processing starts

    Example
    -------
    >>> import os
    >>> from pathlib import Path
    >>> from coretex import CustomItem
    >>> from coretex.networking import networkManager
    >>> from coretex.threading import MultithreadedDataProcessor

    >>> networkManager.authenticate("dummyUser@coretex.ai", "*******")  # log in to Coretex
    >>> path = ""  # path to the directory which contains the samples to upload to the dataset

    >>> def createItem(path: str) -> CustomItem:
            # 1023 is the id of the target dataset - create the dataset first
            item = CustomItem.createCustomItem(Path(path).stem, 1023, path)
            return item

    >>> listOfFiles = []
    >>> for file in os.listdir(path):
            if file.endswith(".zip"):
                listOfFiles.append(os.path.join(path, file))

    >>> processor = MultithreadedDataProcessor(
            listOfFiles,
            createItem,
            workerCount = 8  # if omitted, defaults to min(cpuCores, 8)
        )
    >>> processor.process()
    """
    def __init__(
        self,
        data: List[Any],
        singleElementProcessor: Callable[[Any], None],
        workerCount: Optional[int] = None,
        message: Optional[str] = None
    ) -> None:

        if workerCount is None:
            # os.cpu_count() may return None, fall back to a single worker in that case
            cpuCount = os.cpu_count() or 1
            workerCount = min(MAX_WORKER_COUNT, cpuCount)

        if workerCount > MAX_WORKER_COUNT:
            logging.warning(f">> [Coretex] \"workerCount\" value: {workerCount} is higher than maximum allowed: {MAX_WORKER_COUNT}. Using {MAX_WORKER_COUNT} workers")
            workerCount = MAX_WORKER_COUNT

        self.__data = data
        self.__singleElementProcessor = singleElementProcessor
        self.__workerCount = workerCount
        self.__message = message

    def process(self) -> None:
        """
        Starts the multithreaded processing of the data

        Raises
        ------
        Exception
            any unhandled exception which happened during the processing
        """

        if self.__message is not None:
            logging.getLogger("coretexpylib").info(f">> [Coretex] {self.__message}")
            logging.getLogger("coretexpylib").info(f"\tUsing {self.__workerCount} workers")
        futures: List[Future] = []

        with ThreadPoolExecutor(max_workers = self.__workerCount) as pool:
            for element in self.__data:
                # schedule the element processor on the pool and keep the future for error checking
                future = pool.submit(self.__singleElementProcessor, element)
                futures.append(future)
        # re-raise the first exception encountered by any of the workers, if there was one
        for future in futures:
            exception = future.exception()
            if exception is not None:
                raise exception
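

# A minimal, illustrative usage sketch (not part of the library API): it runs the
# processor over a small in-memory list with a simulated I/O bound task and shows
# how a caller can handle the exception re-raised by process() when a worker fails.
# The element values and the simulateDownload helper are assumptions made for this demo.
if __name__ == "__main__":
    import time

    logging.basicConfig(level = logging.INFO)

    def simulateDownload(element: Any) -> None:
        # stand-in for real I/O bound work such as an HTTP request or a file upload
        time.sleep(0.1)
        logging.getLogger("coretexpylib").info(f">> [Coretex] Processed element: {element}")

    demoProcessor = MultithreadedDataProcessor(
        list(range(10)),
        simulateDownload,
        workerCount = 4,
        message = "Running demo processing..."
    )

    try:
        demoProcessor.process()
    except Exception as exception:
        logging.getLogger("coretexpylib").error(f">> [Coretex] Processing failed: {exception}")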