Асинхронная итерация#

При работе с асинхронным кодом, могут использоваться как обычные итераторы и итерируемые объекты, так и асинхронные. Асинхронные итераторы и итерируемые объекты в целом работают так же, как и синхронные. Для асинхронных итераторов и итерируемых объектов созданы отдельные методы __aiter__ и __anext__, плюс для перебора асинхронных итерируемых объектов есть цикл async for.

Примечание

async for можно использовать только внутри сопрограммы.

Асинхронный итерируемый объект (asynchronous iterable) - это объект, который можно использовать в async for. В асинхронном итерируемом объекте должен быть метод __aiter__, который возвращает асинхронный итератор.

Асинхронный итератор (asynchronous iterator) - это объект, у которого есть методы __anext__ и __aiter__. Метод __anext__ должен возвращать awaitable объект. При завершении итерации генерируется исключение StopAsyncIteration.

Асинхронные итераторы можно создавать с помощью классов или асинхронных генераторов. Тут рассматривается вариант через классы, а в следующем разделе рассматриваются асинхронные генераторы.

Пример асинхронного итератора, который на каждой итерации пытается подключиться к одному устройству с помощью scrapli:

import asyncio
from datetime import datetime
from scrapli import AsyncScrapli
from scrapli.exceptions import ScrapliException
from async_timeout import timeout
import yaml


class CheckConnection:
    def __init__(self, device_list):
        self.device_list = device_list
        self._current_device = 0

    async def _scan_device(self, device):
        ip = device["host"]
        try:
            async with timeout(5): # для asynctelnet
                async with AsyncScrapli(**device) as conn:
                    prompt = await conn.get_prompt()
                return True, prompt
        except (ScrapliException, asyncio.exceptions.TimeoutError) as error:
            return False, f"{error} {ip}"

    async def __anext__(self):
        if self._current_device >= len(self.device_list):
            raise StopAsyncIteration
        device_params = self.device_list[self._current_device]
        scan_results = await self._scan_device(device_params)
        self._current_device += 1
        return scan_results

    def __aiter__(self):
        return self

Смысл этого итератора в том чтобы проверить получится ли подключиться к оборудованию по SSH с помощью scrapli. Если подключиться получилось, __anext__ возвращает True и приглашение устройства, если нет - False и исключение.

Примечание

Обратите внимание на то, что метод __anext__ сопрограмма, а __aiter__ нет.

Для перебора асинхронного итератора надо использовать async for и соответственно перебор надо делать в сопрограмме:

async def ssh_scan(devices):
    check = CheckConnection(devices)
    async for status, msg in check:
        if status:
            print(f"{datetime.now()} SSH. Подключение успешно: {msg}")
        else:
            print(f"{datetime.now()} SSH. Не удалось подключиться: {msg}")


if __name__ == "__main__":
    with open("devices_asyncssh.yaml") as f:
        devices = yaml.safe_load(f)
    asyncio.run(ssh_scan(devices))

Содержимое файла devices_asyncssh.yaml (первые два устройства доступны и с правильными параметрами, третье доступно, но указан неправильный пароль, четвертое недоступно):

- host: 192.168.100.1
  auth_username: cisco
  auth_password: cisco
  auth_secondary: cisco
  auth_strict_key: false
  timeout_socket: 5
  timeout_transport: 10
  platform: cisco_iosxe
  transport: asyncssh
- host: 192.168.100.2
  auth_username: cisco
  auth_password: cisco
  auth_secondary: cisco
  auth_strict_key: false
  timeout_socket: 5
  timeout_transport: 10
  platform: cisco_iosxe
  transport: asyncssh
- host: 192.168.100.3
  auth_username: cisco
  auth_password: ciscoe
  auth_secondary: cisco
  auth_strict_key: false
  timeout_socket: 5
  timeout_transport: 10
  platform: cisco_iosxe
  transport: asyncssh
- host: 192.168.100.11
  auth_username: cisco
  auth_password: cisco
  auth_secondary: cisco
  auth_strict_key: false
  timeout_socket: 5
  timeout_transport: 10
  platform: cisco_iosxe
  transport: asyncssh

Вызов скрипта:

$ python ex05_async_iterator_ssh.py
2021-04-19 11:25:40.775305 SSH. Подключение успешно: R1#
2021-04-19 11:25:41.334456 SSH. Подключение успешно: R2#
2021-04-19 11:25:43.638459 SSH. Не удалось подключиться: all authentication methods failed 192.168.100.3
2021-04-19 11:25:48.647160 SSH. Не удалось подключиться: timed out opening connection to device 192.168.100.11

Итератор CheckConnection проверяет устройства последовательно, но вместе с этим итератором, параллельно можно запускать что-то еще, например, паралелльно проверять подключение telnet (файл ex06_async_iterator_telnet_ssh.py):

async def scan(devices, protocol):
    check = CheckConnection(devices)
    async for status, msg in check:
        if status:
            print(f"{datetime.now()} {protocol}. Подключение успешно: {msg}")
        else:
            print(f"{datetime.now()} {protocol}. Не удалось подключиться: {msg}")


async def main():
    with open("devices_asyncssh.yaml") as f:
        devices_ssh = yaml.safe_load(f)
    with open("devices_asynctelnet.yaml") as f:
        devices_telnet = yaml.safe_load(f)
    await asyncio.gather(scan(devices_ssh, "SSH"), scan(devices_telnet, "Telnet"))


if __name__ == "__main__":
    asyncio.run(main())

Вызов скрипта:

$ python ex06_async_iterator_telnet_ssh.py
2021-04-19 11:30:14.820195 SSH. Подключение успешно: R1#
2021-04-19 11:30:14.983307 Telnet. Подключение успешно: R1#
2021-04-19 11:30:15.296011 SSH. Подключение успешно: R2#
2021-04-19 11:30:15.449338 Telnet. Подключение успешно: R2#
2021-04-19 11:30:17.599259 SSH. Не удалось подключиться: all authentication methods failed 192.168.100.3
2021-04-19 11:30:19.775107 Telnet. Не удалось подключиться: username/login prompt seen more than once, assuming auth failed 192.168.100.3
2021-04-19 11:30:22.603411 SSH. Не удалось подключиться:  192.168.100.11
2021-04-19 11:30:24.777987 Telnet. Не удалось подключиться:  192.168.100.11