Coretex
coretex.threading.threaded_data_processor.MultithreadedDataProcessor Class Reference

Public Member Functions

None process (self)
 

Detailed Description

    Splits data processing across multiple threads.
    Only useful for I/O-bound operations; do not use it for
    CPU-heavy data processing.

    data: List[Any] -> list whose elements will be processed across multiple threads
    singleElementProcessor: Callable[[Any], None] -> function which will
    be called once for each element of the provided list
    threadCount: int -> number of threads used for processing
    title: Optional[str] -> title displayed with the progress bar
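
    The restriction to I/O-bound work can be illustrated with a small timing sketch. It does not use Coretex at all: it uses the standard-library ThreadPoolExecutor that process() is built on, and time.sleep as a stand-in for a network or disk call. The names fakeDownload, runSequential and runThreaded are hypothetical and exist only for this illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List


def fakeDownload(element: int) -> None:
    # Hypothetical stand-in for an I/O-bound call (network request, disk read).
    # While a thread sleeps, other threads are free to run.
    time.sleep(0.05)


def runSequential(data: List[int]) -> float:
    # Processes the elements one after another and returns the elapsed time.
    start = time.perf_counter()
    for element in data:
        fakeDownload(element)
    return time.perf_counter() - start


def runThreaded(data: List[int], threadCount: int) -> float:
    # Processes the elements on a thread pool and returns the elapsed time.
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers = threadCount) as pool:
        # list() drains the iterator, so every task has finished before timing stops
        list(pool.map(fakeDownload, data))
    return time.perf_counter() - start


data = list(range(8))
sequentialTime = runSequential(data)
threadedTime = runThreaded(data, threadCount = 8)
print(f"sequential: {sequentialTime:.2f}s, threaded: {threadedTime:.2f}s")
```

    The waits overlap, so the threaded run finishes in roughly the time of a single call. For CPU-bound work in CPython the global interpreter lock allows only one thread to execute Python bytecode at a time, so threads give little or no speedup there.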

    Example
    -------
    >>> import os
    >>> from pathlib import Path
    >>> from coretex import CustomItem
    >>> from coretex.networking import networkManager
    >>> from coretex.threading import MultithreadedDataProcessor

    >>> networkManager.authenticate("dummyUser@coretex.ai", *******)  # log in to Coretex
    >>> path = ""  # path to the directory containing the samples to upload to the dataset
    >>> listOfFiles = []

    >>> def createItem(path: str) -> None:
    ...     item = CustomItem.createCustomItem(Path(path).stem, 1023, path)  # the dataset (1023 here) must be created first
    ...     if item is None:
    ...         pass  # item creation failed; handle the error here

    >>> for file in os.listdir(path):
    ...     if file.endswith(".zip"):
    ...         file = os.path.join(path, file)
    ...         listOfFiles.append(file)

    >>> processor = MultithreadedDataProcessor(
    ...     listOfFiles,
    ...     createItem,
    ...     threadCount = 8,  # set threadCount to the desired value; the default is 8
    ...     title = "createItem"
    ... )
    >>> processor.process()
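
    Since singleElementProcessor is typed as Callable[[Any], None] and process() returns None, any per-element result has to be stored as a side effect. The following is a minimal sketch of one way to do that, assuming a lock-protected list is acceptable. recordStem, results and resultsLock are hypothetical names, and a plain ThreadPoolExecutor stands in for MultithreadedDataProcessor so the sketch runs without Coretex installed.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import List

results: List[str] = []
resultsLock = threading.Lock()


def recordStem(path: str) -> None:
    # Hypothetical per-element processor: it matches Callable[[Any], None],
    # so the result is stored in a shared list instead of being returned.
    stem = Path(path).stem
    with resultsLock:
        results.append(stem)


listOfFiles = ["a.zip", "b.zip", "c.zip"]

# Stand-in for:
# MultithreadedDataProcessor(listOfFiles, recordStem, threadCount = 8, title = "recordStem").process()
with ThreadPoolExecutor(max_workers = 8) as pool:
    for file in listOfFiles:
        pool.submit(recordStem, file)

# Completion order is not guaranteed, so sort before inspecting the results
print(sorted(results))
```

    The order in which elements finish depends on thread scheduling, so code that needs results in input order should store them keyed by element rather than rely on append order.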

Definition at line 24 of file threaded_data_processor.py.

Member Function Documentation

◆ process()

None coretex.threading.threaded_data_processor.MultithreadedDataProcessor.process (self)
    Starts the multithreaded processing of the data and returns once
    every element has been processed

    Raises
    ------
    The first unhandled exception (in submission order) raised during processing

Definition at line 74 of file threaded_data_processor.py.

    def process(self) -> None:
        """
            Starts the multithreaded processing of the data

            Raises
            ------
            Any unhandled exception which happened during the processing
        """

        futures: List[Future] = []

        with ThreadPoolExecutor(max_workers = self.__threadCount) as pool:
            for element in self.__data:
                future = pool.submit(self.__singleElementProcessor, element)
                future.add_done_callback(lambda _: self.__progressBar.update())
                futures.append(future)

        self.__progressBar.finish()

        for future in futures:
            exception = future.exception()
            if exception is not None:
                raise exception
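
The error behaviour of the listing above can be reproduced with a simplified stand-in. This is a sketch, not the library code: processAll mirrors the submit / wait / re-raise flow of process() with the progress bar left out, and failOnOdd is a hypothetical processor that fails for odd elements.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, List

processed: List[int] = []


def processAll(data: List[Any], singleElementProcessor: Callable[[Any], None], threadCount: int) -> None:
    # Simplified stand-in for process(): same flow, progress bar omitted.
    futures: List[Future] = []

    with ThreadPoolExecutor(max_workers = threadCount) as pool:
        for element in data:
            futures.append(pool.submit(singleElementProcessor, element))

    # Leaving the "with" block waits for every task, so all futures are done here.
    for future in futures:
        exception = future.exception()
        if exception is not None:
            raise exception


def failOnOdd(element: int) -> None:
    # Hypothetical processor: odd elements fail, even ones are recorded.
    if element % 2 == 1:
        raise ValueError(f"element {element} failed")
    processed.append(element)


caught = ""
try:
    processAll([0, 1, 2, 3], failOnOdd, threadCount = 2)
except ValueError as error:
    caught = str(error)

print(caught)             # element 1 failed
print(sorted(processed))  # [0, 2]
```

A failing element does not stop the remaining ones: every element is still processed, and only the first exception in submission order is re-raised afterwards. The failure of element 3 is never reported, so a processor that must surface every error should catch and record exceptions itself.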

The documentation for this class was generated from the following file: