threaded_data_processor.py
# Copyright (C) 2023 Coretex LLC

# This file is part of Coretex.ai

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from typing import Any, Callable, Optional, List
from concurrent.futures import ThreadPoolExecutor, Future

import logging
import os


MAX_WORKER_COUNT = 8


class MultithreadedDataProcessor:

    """
        Used for splitting data processing on multiple threads.
        Only useful for I/O bound operations, do not use for
        heavy data processing operations

        data: List[Any]
            list which will be split-processed on multiple threads
        singleElementProcessor: Callable[[Any], None]
            function which will be called for a single element from the provided list
        workerCount: Optional[int]
            number of workers which will be used for processing (maximum is 8), if None
            it will use min(cpuCores, 8)
        message: Optional[str]
            message which is displayed when the processing starts

        Example
        -------
        >>> import os
        >>> from pathlib import Path
        >>> from coretex import CustomItem
        >>> from coretex.networking import networkManager
        >>> from coretex.threading import MultithreadedDataProcessor
        \b
        >>> networkManager.authenticate("dummyUser@coretex.ai", *******) # login on coretex
        >>> path = "" # path to dir which contains samples for upload to dataset
        >>> listOfFiles = []
        \b
        >>> def createItem(path: str) -> CustomItem:
                item = CustomItem.createCustomItem(Path(path).stem, 1023, path) # first make new dataset
                if item is None:
                    pass
        \b
        >>> for file in os.listdir(path):
                if file.endswith(".zip"):
                    file = os.path.join(path, file)
                    listOfFiles.append(file)
        \b
        >>> processor = MultithreadedDataProcessor(
                listOfFiles,
                createItem,
                workerCount = 8, # set workerCount to the desired value, default is min(cpuCores, 8)
                message = "createItem"
            )
        >>> processor.process()
    """

    def __init__(
        self,
        data: List[Any],
        singleElementProcessor: Callable[[Any], None],
        workerCount: Optional[int] = None,
        message: Optional[str] = None
    ) -> None:

        if workerCount is None:
            cpuCount = os.cpu_count()

            if cpuCount is None:
                cpuCount = 1

            workerCount = min(MAX_WORKER_COUNT, cpuCount)
        else:
            if workerCount > MAX_WORKER_COUNT:
                logging.warning(f">> [Coretex] \"workerCount\" value: {workerCount} is higher than maximum allowed: {MAX_WORKER_COUNT}. Using {MAX_WORKER_COUNT} workers")

            workerCount = min(MAX_WORKER_COUNT, workerCount)

        self.__data = data
        self.__singleElementProcessor = singleElementProcessor
        self.__workerCount = workerCount
        self.__message = message

    def process(self) -> None:
        """
            Starts the multithreaded processing of the data

            Raises
            ------
            Any unhandled exception which happened during the processing
        """

        if self.__message is not None:
            logging.getLogger("coretexpylib").info(f">> [Coretex] {self.__message}")
            logging.getLogger("coretexpylib").info(f"\tUsing {self.__workerCount} workers")

        futures: List[Future] = []

        with ThreadPoolExecutor(max_workers = self.__workerCount) as pool:
            for element in self.__data:
                future = pool.submit(self.__singleElementProcessor, element)
                futures.append(future)

        # Re-raise the first exception encountered by any of the workers
        for future in futures:
            exception = future.exception()
            if exception is not None:
                raise exception
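

# --- Usage sketch (illustrative only) ----------------------------------------
# A minimal, self-contained demo of MultithreadedDataProcessor with an
# I/O bound task: each worker blocks on a network read rather than the CPU.
# The URLs and the fetchUrl helper below are hypothetical examples; any
# exception raised inside fetchUrl is re-raised by process() once all
# submitted futures have completed.

if __name__ == "__main__":
    import urllib.request

    def fetchUrl(url: str) -> None:
        # I/O bound work: the thread spends its time waiting on the network
        with urllib.request.urlopen(url, timeout = 10) as response:
            print(f"Fetched {url} ({len(response.read())} bytes)")

    urls = [
        "https://example.com",
        "https://example.org"
    ]

    processor = MultithreadedDataProcessor(
        urls,
        fetchUrl,
        workerCount = 4,
        message = "Downloading pages"
    )
    processor.process()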