mirror of
https://github.com/clearml/clearml
synced 2025-04-24 16:26:35 +00:00
Remove redundant code
This commit is contained in:
parent
0013c5851e
commit
139c4ffe86
@ -1,14 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import print_function, division, absolute_import
|
||||
|
||||
|
||||
from ..utils import PY32
|
||||
|
||||
|
||||
if PY32:
|
||||
from .thread_extractor import ThreadExtractor as Extractor
|
||||
else:
|
||||
from .gevent_extractor import GeventExtractor as Extractor
|
||||
|
||||
__all__ = ["Extractor"]
|
@ -1,36 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import print_function, division, absolute_import
|
||||
|
||||
import multiprocessing
|
||||
|
||||
|
||||
class BaseExtractor(object):
|
||||
|
||||
def __init__(self, names, max_workers=None):
|
||||
self._names = names
|
||||
self._max_workers = max_workers or (multiprocessing.cpu_count() * 4)
|
||||
|
||||
def run(self, job):
|
||||
try:
|
||||
self.extract(job)
|
||||
self.wait_complete()
|
||||
except KeyboardInterrupt:
|
||||
print('** Shutting down ...')
|
||||
self.shutdown()
|
||||
else:
|
||||
print('^.^ Extracting all packages done!')
|
||||
finally:
|
||||
self.final()
|
||||
|
||||
def extract(self, job):
|
||||
raise NotImplementedError
|
||||
|
||||
def wait_complete(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def shutdown(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def final(self):
|
||||
pass
|
@ -1,50 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import print_function, division, absolute_import
|
||||
|
||||
import sys
|
||||
|
||||
import greenlet
|
||||
from gevent.pool import Pool
|
||||
|
||||
from .extractor import BaseExtractor
|
||||
from ..log import logger
|
||||
|
||||
|
||||
class GeventExtractor(BaseExtractor):
|
||||
|
||||
def __init__(self, names, max_workers=222):
|
||||
super(self.__class__, self).__init__(names, max_workers)
|
||||
self._pool = Pool(self._max_workers)
|
||||
self._exited_greenlets = 0
|
||||
|
||||
def extract(self, job):
|
||||
job = self._job_wrapper(job)
|
||||
for name in self._names:
|
||||
if self._pool.full():
|
||||
self._pool.wait_available()
|
||||
self._pool.spawn(job, name)
|
||||
|
||||
def _job_wrapper(self, job):
|
||||
def _job(name):
|
||||
result = None
|
||||
try:
|
||||
result = job(name)
|
||||
except greenlet.GreenletExit:
|
||||
self._exited_greenlets += 1
|
||||
except Exception:
|
||||
e = sys.exc_info()[1]
|
||||
logger.error('Extracting "{0}", got: {1}'.format(name, e))
|
||||
return result
|
||||
return _job
|
||||
|
||||
def wait_complete(self):
|
||||
self._pool.join()
|
||||
|
||||
def shutdown(self):
|
||||
self._pool.kill(block=True)
|
||||
|
||||
def final(self):
|
||||
count = self._exited_greenlets
|
||||
if count != 0:
|
||||
print('** {0} running job exited.'.format(count))
|
@ -1,45 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import print_function, division, absolute_import
|
||||
|
||||
import concurrent.futures
|
||||
|
||||
from .extractor import BaseExtractor
|
||||
|
||||
from ..log import logger
|
||||
|
||||
|
||||
class ThreadExtractor(BaseExtractor):
|
||||
"""Extractor use thread pool execute tasks.
|
||||
|
||||
Can be used to extract /simple/<pkg_name> or /pypi/<pkg_name>/json.
|
||||
|
||||
FIXME: can not deliver SIG_INT to threads in Python 2.
|
||||
"""
|
||||
|
||||
def __init__(self, names, max_workers=None):
|
||||
super(self.__class__, self).__init__(names, max_workers)
|
||||
self._futures = dict()
|
||||
|
||||
def extract(self, job):
|
||||
"""Extract url by package name."""
|
||||
with concurrent.futures.ThreadPoolExecutor(
|
||||
max_workers=self._max_workers) as executor:
|
||||
for name in self._names:
|
||||
self._futures[executor.submit(job, name)] = name
|
||||
|
||||
def wait_complete(self):
|
||||
"""Wait for futures complete done."""
|
||||
for future in concurrent.futures.as_completed(self._futures.keys()):
|
||||
try:
|
||||
error = future.exception()
|
||||
except concurrent.futures.CancelledError:
|
||||
break
|
||||
name = self._futures[future]
|
||||
if error is not None:
|
||||
err_msg = 'Extracting "{0}", got: {1}'.format(name, error)
|
||||
logger.error(err_msg)
|
||||
|
||||
def shutdown(self):
|
||||
for future in self._futures:
|
||||
future.cancel()
|
Loading…
Reference in New Issue
Block a user