diff --git a/clearml/utilities/pigar/extractor/__init__.py b/clearml/utilities/pigar/extractor/__init__.py deleted file mode 100644 index ff65f902..00000000 --- a/clearml/utilities/pigar/extractor/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -# -*- coding: utf-8 -*- - -from __future__ import print_function, division, absolute_import - - -from ..utils import PY32 - - -if PY32: - from .thread_extractor import ThreadExtractor as Extractor -else: - from .gevent_extractor import GeventExtractor as Extractor - -__all__ = ["Extractor"] diff --git a/clearml/utilities/pigar/extractor/extractor.py b/clearml/utilities/pigar/extractor/extractor.py deleted file mode 100644 index 508220dd..00000000 --- a/clearml/utilities/pigar/extractor/extractor.py +++ /dev/null @@ -1,36 +0,0 @@ -# -*- coding: utf-8 -*- - -from __future__ import print_function, division, absolute_import - -import multiprocessing - - -class BaseExtractor(object): - - def __init__(self, names, max_workers=None): - self._names = names - self._max_workers = max_workers or (multiprocessing.cpu_count() * 4) - - def run(self, job): - try: - self.extract(job) - self.wait_complete() - except KeyboardInterrupt: - print('** Shutting down ...') - self.shutdown() - else: - print('^.^ Extracting all packages done!') - finally: - self.final() - - def extract(self, job): - raise NotImplementedError - - def wait_complete(self): - raise NotImplementedError - - def shutdown(self): - raise NotImplementedError - - def final(self): - pass diff --git a/clearml/utilities/pigar/extractor/gevent_extractor.py b/clearml/utilities/pigar/extractor/gevent_extractor.py deleted file mode 100644 index ba7363c5..00000000 --- a/clearml/utilities/pigar/extractor/gevent_extractor.py +++ /dev/null @@ -1,50 +0,0 @@ -# -*- coding: utf-8 -*- - -from __future__ import print_function, division, absolute_import - -import sys - -import greenlet -from gevent.pool import Pool - -from .extractor import BaseExtractor -from ..log import logger - - -class GeventExtractor(BaseExtractor): - - def __init__(self, names, max_workers=222): - super(self.__class__, self).__init__(names, max_workers) - self._pool = Pool(self._max_workers) - self._exited_greenlets = 0 - - def extract(self, job): - job = self._job_wrapper(job) - for name in self._names: - if self._pool.full(): - self._pool.wait_available() - self._pool.spawn(job, name) - - def _job_wrapper(self, job): - def _job(name): - result = None - try: - result = job(name) - except greenlet.GreenletExit: - self._exited_greenlets += 1 - except Exception: - e = sys.exc_info()[1] - logger.error('Extracting "{0}", got: {1}'.format(name, e)) - return result - return _job - - def wait_complete(self): - self._pool.join() - - def shutdown(self): - self._pool.kill(block=True) - - def final(self): - count = self._exited_greenlets - if count != 0: - print('** {0} running job exited.'.format(count)) diff --git a/clearml/utilities/pigar/extractor/thread_extractor.py b/clearml/utilities/pigar/extractor/thread_extractor.py deleted file mode 100644 index e0a4ce73..00000000 --- a/clearml/utilities/pigar/extractor/thread_extractor.py +++ /dev/null @@ -1,45 +0,0 @@ -# -*- coding: utf-8 -*- - -from __future__ import print_function, division, absolute_import - -import concurrent.futures - -from .extractor import BaseExtractor - -from ..log import logger - - -class ThreadExtractor(BaseExtractor): - """Extractor use thread pool execute tasks. - - Can be used to extract /simple/ or /pypi//json. - - FIXME: can not deliver SIG_INT to threads in Python 2. - """ - - def __init__(self, names, max_workers=None): - super(self.__class__, self).__init__(names, max_workers) - self._futures = dict() - - def extract(self, job): - """Extract url by package name.""" - with concurrent.futures.ThreadPoolExecutor( - max_workers=self._max_workers) as executor: - for name in self._names: - self._futures[executor.submit(job, name)] = name - - def wait_complete(self): - """Wait for futures complete done.""" - for future in concurrent.futures.as_completed(self._futures.keys()): - try: - error = future.exception() - except concurrent.futures.CancelledError: - break - name = self._futures[future] - if error is not None: - err_msg = 'Extracting "{0}", got: {1}'.format(name, error) - logger.error(err_msg) - - def shutdown(self): - for future in self._futures: - future.cancel()