Source code for looptime._internal.loops

from __future__ import annotations

import asyncio
import contextlib
import selectors
import time
import warnings
import weakref
from typing import TYPE_CHECKING, Any, Callable, Iterator, MutableSet, TypeVar, cast, overload

_T = TypeVar('_T')

if TYPE_CHECKING:
    AnyFuture = asyncio.Future[Any]
    AnyTask = asyncio.Task[Any]
else:
    AnyFuture = asyncio.Future
    AnyTask = asyncio.Task


[docs] class TimeWarning(UserWarning): """Issued when the loop time moves backwards, violating its monotonicity.""" pass
[docs] class LoopTimeoutError(asyncio.TimeoutError): """A special kind of timeout when the loop's time reaches its end.""" pass
[docs] class IdleTimeoutError(asyncio.TimeoutError): """A special kind of timeout when the loop idles with no external I/O.""" pass
[docs] class LoopTimeEventLoop(asyncio.BaseEventLoop): """ An event loop with time compaction. Either a class or a mixin. """ # BaseEventLoop does not have "_selector" declared but uses it in _run_once(). _selector: selectors.BaseSelector def __init__( self, *args: Any, start: float | None | Callable[[], float | None] = None, end: float | None | Callable[[], float | None] = None, resolution: float = 1e-6, # to cut off the floating-point errors idle_step: float | None = None, idle_timeout: float | None = None, noop_cycles: int = 42, **kwargs: Any, ) -> None: super().__init__(*args, **kwargs) self.setup_looptime( start=start, end=end, resolution=resolution, idle_step=idle_step, idle_timeout=idle_timeout, noop_cycles=noop_cycles, )
[docs] def setup_looptime( self, *, start: float | None | Callable[[], float | None] = None, end: float | None | Callable[[], float | None] = None, resolution: float = 1e-6, # to cut off the floating-point errors idle_step: float | None = None, idle_timeout: float | None = None, noop_cycles: int = 42, _enabled: bool | None = None, # None means do nothing ) -> None: """ Set all the fake-time fields and patch the i/o selector. This is the same as ``__init__(...)``, except that it can be used when the mixin/class is injected into the existing event loop object. In that case, the object is already initialised except for these fields. """ new_time: float | None = start() if callable(start) else start end_time: float | None = end() if callable(end) else end old_time: float | None try: # NB: using the existing (old) reciprocal! old_time = self.__int2time(self.__now) except AttributeError: # initial setup: either reciprocals or __now are absent old_time = None new_time = float(new_time) if new_time is not None else None # If it is the 2nd or later setup, double-check on time monotonicity. # In some configurations, this waring might raise an error and fail the test. # In that case, the time must not be changed for the next test. if old_time is not None and new_time is not None and new_time < old_time: warnings.warn( f"The time of the event loop moves backwards from {old_time} to {new_time}," " thus breaking the monotonicity of time. This is dangerous!" " Perhaps, caused by reusing a higher-scope event loop in tests." " Revise the scopes of fixtures & event loops." " Remove the start=… kwarg and rely on arbitrary time values." " Migrate from `loop.time()` to the `looptime` numeric fixture.", TimeWarning, ) self.__resolution_reciprocal: int = round(1/resolution) self.__now: int = self.__time2int(new_time or old_time) or 0 self.__end: int | None = self.__time2int(end_time) self.__idle_timeout: int | None = self.__time2int(idle_timeout) self.__idle_step: int | None = self.__time2int(idle_step) self.__idle_end: int | None = None self.__noop_limit: int | None = None self.__noop_cycle: int | None = None self.__noop_cycles: int = noop_cycles self.__sync_futures: MutableSet[AnyFuture] = weakref.WeakSet() self.__sync_clock: Callable[[], float] = time.perf_counter self.__sync_ts: float | None = None # system/true-time clock timestamp try: self.__enabled # type: ignore except AttributeError: self.__enabled = _enabled if _enabled is not None else True # old behaviour else: self.__enabled = _enabled if _enabled is not None else self.__enabled # TODO: why do we patch the selector as an object while the event loop as a class? # this should be the same patching method for both. try: self.__original_select # type: ignore except AttributeError: self.__original_select = self._selector.select self._selector.select = self.__replaced_select # type: ignore
@property def looptime_on(self) -> bool: """ Whether the time compaction is enabled at the moment. """ return bool(self.__enabled)
[docs] @contextlib.contextmanager def looptime_enabled(self) -> Iterator[None]: """ A context manager to temporarily enable the time compaction. """ if self.__enabled: raise RuntimeError('Looptime mode is already enabled. Entered twice? Avoid this!') old_enabled = self.__enabled self.__enabled = True try: yield finally: self.__enabled = old_enabled
[docs] def time(self) -> float: if self.__enabled: return self.__int2time(self.__now) else: return super().time()
[docs] def run_in_executor(self, executor: Any, func: Any, *args: Any) -> AnyFuture: # type: ignore future = super().run_in_executor(executor, func, *args) if isinstance(future, asyncio.Future): self.__sync_futures.add(future) return future
def __replaced_select(self, timeout: float | None) -> list[tuple[Any, Any]]: overtime = False # First of all, do the i/o polling. Some i/o has happened? Fine! Process it asap! # The time-play starts only when there is nothing to process from the outside (i.e. no i/o). ready: list[tuple[Any, Any]] = self.__original_select(timeout=0) if ready: pass # If nothing to do right now, and the time is not compacted, truly sleep as requested. # Move the fake time by the exact real time spent in this wait (±discrepancies). elif not self.__enabled: t0 = time.monotonic() ready = self.__original_select(timeout=timeout) t1 = time.monotonic() # If timeout=None, it never exists until ready. This timeout check is for typing only. self.__now += self.__time2int(t1 - t0 if ready or timeout is None else timeout) # Regardless of the timeout, if there are executors sync futures, we move the time in steps. # The timeout (if present) can limit the size of the step, but not the logic of stepping. # Generally, external things (threads) take some time (e.g. for thread spawning). # We cannot reliably hook into the abstract executors or their sync futures, # so we have to spend the true-time waiting for them. # We hope that they finish fast enough —in a few steps— and schedule their new handles. elif any(not fut.done() for fut in self.__sync_futures): self.__noop_limit = self.__noop_cycle = None # Unbalanced split: if the events arrive closer to the end, still move the loop time. # But skip the loop time increment if the events arrive early (first 80% of the range). timeout, step, overtime = self.__sync(timeout, self.__idle_step) readyA = self.__original_select(timeout=timeout * 0.8 if timeout is not None else None) readyB = self.__original_select(timeout=timeout * 0.2 if timeout is not None else None) ready = readyA + readyB self.__now += step if not readyA else 0 # There is nothing to run or to wait inside the loop, only the external actors (I/O) # can put life into the loop: either via the regular I/O (e.g. sockets, servers, files), # or via a dummy self-writing socket of the event loop (used by the "thread-safe" calls). # Instead of the eternal sleep, limit it to little true-time steps until the end-of-time. # Set to `None` to actually sleep to infinity. Set to `0` to only poll and fail instantly. elif timeout is None: if self.__idle_end is None and self.__idle_timeout is not None: self.__idle_end = self.__now + self.__idle_timeout # Unbalanced split: if the events arrive closer to the end, still move the loop time. # But skip the loop time increment if the events arrive early (first 80% of the range). timeout, step, overtime = self.__sync(timeout, self.__idle_step, self.__idle_end) readyA = self.__original_select(timeout=timeout * 0.8 if timeout is not None else None) readyB = self.__original_select(timeout=timeout * 0.2 if timeout is not None else None) ready = readyA + readyB self.__now += step if not readyA else 0 # There are handles ready to be executed right now. The time never advances here. # We are explicitly asked to quick-check (poll) the status of I/O sockets just in case. # Note: the countdown should execute for N cycles strictly in the loop's no-op mode, # so any i/o polling resets the cycle counter. elif timeout == 0: if self.__noop_limit is not None: self.__noop_cycle = 0 # Now, we have a request to actually move the time to the next scheduled timer/handle. # Instead, we initiate a no-op cycle to let the coros/tasks/context-managers/generators run. # Without this no-op cycle, consecutive async/await operations sometimes have no chance # to execute because the earlier operations set a timeout or schedule the timer-handles # so that the fake-time moves before it gets to the next operations. For example: # async with async_timeout.timeout(2): # schedules to T0+2s, shifts the time, raises. # await asyncio.sleep(1) # gets no chance to set its handle to T0+1s. elif self.__noop_limit is None or self.__noop_cycle is None: self.__noop_limit = self.__noop_cycles self.__noop_cycle = 0 # Regardless of the requested timeout, let the no-op cycle go without side effects. # We never sleep or advance the loop time while in this cycle-throttling mode. elif self.__noop_cycle < self.__noop_limit: self.__noop_cycle += 1 # Finally, the no-op cycles are over. We move the fake clock to the next scheduled handle! # Since there is nothing that can happen from the outside, move the time all at once. # Moving it in smaller steps can be a waste of compute power (though, not harmful). else: _, step, overtime = self.__sync(timeout) self.__noop_limit = self.__noop_cycle = None self.__now += step # If any i/o has happened, we've got work to do: all idling countdowns should restart anew. if ready: self.__idle_end = None self.__noop_limit = None self.__noop_cycle = None task: AnyTask future: AnyFuture # If the loop has reached its end-of-time, destroy the loop's universe: stop all its tasks. # If possible, inject the descriptive exceptions; if not, just cancel them (on every cycle). if overtime and self.__end is not None and self.__now >= self.__end: for task in asyncio.all_tasks(): future = cast(AnyFuture, getattr(task, '_fut_waiter', None)) if future is not None: future.set_exception(LoopTimeoutError( "The event loop has reached its end-of-time. " "All running tasks are considered timed out.")) else: task.cancel() # If the end-of-time is not reached yet, but the loop is idling for too long? Fail too. if overtime and self.__idle_end is not None and self.__now >= self.__idle_end: for task in asyncio.all_tasks(): future = cast(AnyFuture, getattr(task, '_fut_waiter', None)) if future is not None: future.set_exception(IdleTimeoutError( "The event loop was idle for too long — giving up on waiting for new i/o. " "All running tasks are considered timed out.")) else: task.cancel() return ready def __sync( self, timeout: float | None, step: int | None = None, end: int | None = None, ) -> tuple[float | None, int, bool]: """ Synchronise the loop-time and real-time steps as much as possible. In some cases, the loop time moves in sync with the real-world time: specifically, when there is nothing inside the loop that can fast-forward the loop time and only external occasions can move it. (The "external occasions" are either i/o or synchronous executors.) The loop time moves in sharp steps. However, there is also code overhead that consumes time between the steps, making the loop time misaligned with the real time. To work around that, the we keep track of the timestamps when the last sync happened and adjusts the real-clock timeout and loop-time step. For example, in a sequence of 4x loop-time steps of 0.01 seconds, the code overhead between steps can be 0.0013, 0.0011, 0.0015 seconds; in that case, the sleeps will be 0.01, 0.0087, 0.0089, 0.0085 seconds. This implementation is sufficiently precise but not very precise — it measures the time from one sync to another, but not for the whole sequence of steps. """ overtime = False # Move the loop time precisely to the nearest planned event, not beyond it. real_step: int | None = step if timeout is not None: if real_step is not None: real_step = min(real_step, self.__time2int(timeout)) else: real_step = self.__time2int(timeout) if end is not None: if real_step is not None: overtime = real_step >= end - self.__now real_step = min(real_step, end - self.__now) else: overtime = True real_step = end - self.__now if self.__end is not None: if real_step is not None: overtime = real_step >= self.__end - self.__now real_step = min(real_step, self.__end - self.__now) else: overtime = True real_step = self.__end - self.__now # Normally, the real-clock step is the same as the loop-time step. real_timeout = self.__int2time(real_step) # Optionally, adjust the real-clock sleep by the code overhead since the last time sync. # The code overhead timedelta can be negative if the previous cycle was interrupted by i/o. prev_ts = self.__sync_ts curr_ts = self.__sync_ts = self.__sync_clock() if real_timeout is not None and prev_ts is not None and curr_ts > prev_ts: code_overhead = curr_ts - prev_ts real_timeout = max(0.0, real_timeout - code_overhead) # Pre-allocate the time for the calculated timeout (assuming the timeout is fully used). if real_timeout is not None: self.__sync_ts += real_timeout # Make the steps easier for int math: use 0 instead of None for "no step". return real_timeout, (real_step or 0), overtime @overload def __time2int(self, time: float) -> int: ... @overload def __time2int(self, time: None) -> None: ... def __time2int(self, time: float | None) -> int | None: """ Convert the supposed time in seconds to its INTernal INTeger form. All time math is done in integers to avoid floating point errors (there can also be some performance gain, but this is not the goal). Otherwise, the time assertions fail easily because of this:: 0.2-0.05 == 0.15000000000000002 0.2-0.19 == 0.010000000000000009 0.2+0.21 == 0.41000000000000003 """ return None if time is None else round(time * self.__resolution_reciprocal) @overload def __int2time(self, time: int) -> float: ... @overload def __int2time(self, time: None) -> None: ... def __int2time(self, time: int | None) -> float | None: """ Convert the INTernal INTeger form of time back to the time in seconds. The int/int division is better than the int*float multiplication:: 100_000 * 0.000_001 == 0.09999999999999999 100_000 / round(1/0.000_001) == 0.1 """ return None if time is None else time / self.__resolution_reciprocal