D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
alt
/
python38
/
lib
/
python3.8
/
site-packages
/
async_timeout
/
Filename :
__init__.py
back
Copy
import asyncio __version__ = '1.3.0' class timeout: """timeout context manager. Useful in cases when you want to apply timeout logic around block of code or in cases when asyncio.wait_for is not suitable. For example: >>> with timeout(0.001): ... async with aiohttp.get('https://github.com') as r: ... await r.text() timeout - value in seconds or None to disable timeout logic loop - asyncio compatible event loop """ def __init__(self, timeout, *, loop=None): if timeout is not None and timeout == 0: timeout = None self._timeout = timeout if loop is None: loop = asyncio.get_event_loop() self._loop = loop self._task = None self._cancelled = False self._cancel_handler = None def __enter__(self): return self._do_enter() def __exit__(self, exc_type, exc_val, exc_tb): self._do_exit(exc_type) @asyncio.coroutine def __aenter__(self): return self._do_enter() @asyncio.coroutine def __aexit__(self, exc_type, exc_val, exc_tb): self._do_exit(exc_type) @property def expired(self): return self._cancelled def _do_enter(self): if self._timeout is not None: self._task = current_task(self._loop) if self._task is None: raise RuntimeError('Timeout context manager should be used ' 'inside a task') self._cancel_handler = self._loop.call_later( self._timeout, self._cancel_task) return self def _do_exit(self, exc_type): if exc_type is asyncio.CancelledError and self._cancelled: self._cancel_handler = None self._task = None raise asyncio.TimeoutError if self._timeout is not None and self._cancel_handler is not None: self._cancel_handler.cancel() self._cancel_handler = None self._task = None def _cancel_task(self): self._task.cancel() self._cancelled = True def current_task(loop): task = asyncio.Task.current_task(loop=loop) if task is None: if hasattr(loop, 'current_task'): task = loop.current_task() return task