D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
proc
/
self
/
root
/
opt
/
saltstack
/
salt
/
lib
/
python3.10
/
site-packages
/
zmq
/
auth
/
Filename :
asyncio.py
back
Copy
"""ZAP Authenticator integrated with the asyncio IO loop. .. versionadded:: 15.2 """ # Copyright (C) PyZMQ Developers # Distributed under the terms of the Modified BSD License. import asyncio import warnings from typing import Any, Optional import zmq from zmq.asyncio import Poller from .base import Authenticator class AsyncioAuthenticator(Authenticator): """ZAP authentication for use in the asyncio IO loop""" __poller: Optional[Poller] __task: Any zap_socket: "zmq.asyncio.Socket" def __init__(self, context: Optional["zmq.Context"] = None, loop: Any = None): super().__init__(context) if loop is not None: warnings.warn(f"{self.__class__.__name__}(loop) is deprecated and ignored") self.__poller = None self.__task = None async def __handle_zap(self) -> None: while True: if self.__poller is None: break events = await self.__poller.poll() if self.zap_socket in dict(events): msg = await self.zap_socket.recv_multipart() self.handle_zap_message(msg) def start(self) -> None: """Start ZAP authentication""" super().start() self.__poller = Poller() self.__poller.register(self.zap_socket, zmq.POLLIN) self.__task = asyncio.ensure_future(self.__handle_zap()) def stop(self) -> None: """Stop ZAP authentication""" if self.__task: self.__task.cancel() if self.__poller: self.__poller.unregister(self.zap_socket) self.__poller = None super().stop() __all__ = ["AsyncioAuthenticator"]