Запуск нескольких awaitables#

Тут рассматриваются функции, которые позволяют запускать несколько сопрограмм или задач:

  • asyncio.gather

  • asyncio.as_completed

Примечание

Кроме функций gather и as_completed, несколько awaitables может запускать и функция wait, но она рассматривается позже, в 18 разделе.

asyncio.gather#

Функция gather запускает на выполнение awaitable объекты, которые перечислены в последовательности aws:

asyncio.gather(*aws, return_exceptions=False)

Если какие-то из объектов являются сопрограммами, они автоматически оборачиваются в задачи и планируются на выполнение уже как объекты Task.

В данном примере функция connect_ssh якобы делает подключение к устройству по SSH и отправляет команду. Все реальные действия пока заменены на asyncio.sleep. В зависимости от числа, которое передается как аргумент, выполнение сопрограмм, которые возвращает функция connect_ssh, занимает разное время. Функция send_command_to_devices создает сопрограммы с помощью map и запускает их на выполнение с помощью asyncio.gather:

async def connect_ssh(ip, command):
    print(f'Подключаюсь к {ip}')
    await asyncio.sleep(ip)
    print(f'Отправляю команду {command} на устройство {ip}')
    await asyncio.sleep(1)
    return f"{command} {ip}"


async def send_command_to_devices(ip_list, command):
    coroutines = map(connect_ssh, ip_list, repeat(command))
    result = await asyncio.gather(*coroutines)
    return result

Если все объекты отработали корректно, asyncio.gather вернет список со значениями, которые вернули объекты. Порядок значений в списке соответствует порядку объектов:

In [2]: ip_list = [5, 2, 3, 7]

In [3]: result = asyncio.run(send_command_to_devices(ip_list, 'test'))
Подключаюсь к 5
Подключаюсь к 2
Подключаюсь к 3
Подключаюсь к 7
Отправляю команду test на устройство 2
Отправляю команду test на устройство 3
Отправляю команду test на устройство 5
Отправляю команду test на устройство 7

In [4]: result
Out[4]: ['test 5', 'test 2', 'test 3', 'test 7']

Если return_exceptions равно False (по умолчанию), при возникновении исключения, оно появляется в том месте, где ожидается (await) результат asyncio.gather:

async def connect_ssh(ip, command):
    print(f'Подключаюсь к {ip}')
    await asyncio.sleep(ip)
    if ip == 3:
        raise OSError(f'Не могу подключиться к {ip}')
    print(f'Отправляю команду {command} на устройство {ip}')
    await asyncio.sleep(1)
    return f"{command} {ip}"


In [11]: result = asyncio.run(send_command_to_devices(ip_list, 'test'))
Подключаюсь к 5
Подключаюсь к 2
Подключаюсь к 3
Подключаюсь к 7
Отправляю команду test на устройство 2
---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
<ipython-input-11-4c2a35eaf7cd> in <module>
----> 1 result = asyncio.run(send_command_to_devices(ip_list, 'test'))
...

<ipython-input-1-7f470cb98776> in send_command_to_devices(ip_list, command)
     13 async def send_command_to_devices(ip_list, command):
     14     coroutines = map(connect_ssh, ip_list, repeat(command))
---> 15     result = await asyncio.gather(*coroutines)
     16     return result

<ipython-input-10-5e26dce87ca7> in connect_ssh(ip, command)
      3     await asyncio.sleep(ip)
      4     if ip == 3:
----> 5         raise OSError(f'Не могу подключиться к {ip}')
      6     print(f'Отправляю команду {command} на устройство {ip}')
      7     await asyncio.sleep(1)

OSError: Не могу подключиться к 3

Если return_exceptions равно True, исключение попадает в список как результат:

async def connect_ssh(ip, command):
    print(f'Подключаюсь к {ip}')
    await asyncio.sleep(ip)
    if ip == 3:
        raise OSError(f'Не могу подключиться к {ip}')
    print(f'Отправляю команду {command} на устройство {ip}')
    await asyncio.sleep(1)
    return f"{command} {ip}"


async def send_command_to_devices(ip_list, command):
    coroutines = map(connect_ssh, ip_list, repeat(command))
    result = await asyncio.gather(*coroutines, return_exceptions=True)
    return result


In [14]: result = asyncio.run(send_command_to_devices(ip_list, 'test'))
Подключаюсь к 5
Подключаюсь к 2
Подключаюсь к 3
Подключаюсь к 7
Отправляю команду test на устройство 2
Отправляю команду test на устройство 5
Отправляю команду test на устройство 7

In [15]: result
Out[15]: ['test 5', 'test 2', OSError('Не могу подключиться к 3'), 'test 7']

In [16]: result[2]
Out[16]: OSError('Не могу подключиться к 3')

In [17]: isinstance(result[2], Exception)
Out[17]: True

asyncio.as_completed#

Функция as_completed запускает на выполнение awaitable объекты, которые перечислены в последовательности aws:

asyncio.as_completed(aws, *, timeout=None)

Возвращает итератор с сопрограмами, в порядке получения результата от сопрограмм. Функция генерирует исключение asyncio.TimeoutError, если за timeout отработали не все сопрограмы.

Пример использования as_completed:

async def delay_print(task_name):
    delay = round(random.random() * 10, 2)
    print(f'>>> start {task_name} sleep {delay}')
    await asyncio.sleep(delay)
    print(f'<<< end   {task_name}')
    return task_name


async def main():
    coroutines = [delay_print(f"task {i}") for i in range(1, 6)]
    for cor in asyncio.as_completed(coroutines):
        cor_result = await cor
        print(f"DONE {cor_result}")

Результаты возвращаются в порядке отрабатывания сопрограм, а не в порядке их запуска:

In [27]: asyncio.run(main())
>>> start task 2 sleep 8.93
>>> start task 1 sleep 0.03
>>> start task 4 sleep 8.33
>>> start task 5 sleep 3.43
>>> start task 3 sleep 5.09
<<< end   task 1
DONE task 1
<<< end   task 5
DONE task 5
<<< end   task 3
DONE task 3
<<< end   task 4
DONE task 4
<<< end   task 2
DONE task 2

Это может быть полезно когда сразу после получения результата, надо запускать следующую операцию. Например, в примере ниже сразу после получения результата сопрограмы, идет запись результата в файл:

import asyncio
import time
from datetime import datetime
import random


async def connect_ssh(ip, command):
    print(f"Подключаюсь к {ip}")
    await asyncio.sleep(1)
    print(f"Отправляю команду {command}")
    await asyncio.sleep(random.random() * 10)
    print(f"Получен результат от {ip}")
    return ip, command


async def write_to_file(filename, data):
    print(f">>> Записываю результат в файл {filename}")
    await asyncio.sleep(1)
    print(f"<<< Результат записан в файл {filename}")


async def main():
    ip_list = ["10.1.1.1", "10.1.1.2", "10.1.1.3", "10.1.1.4"]
    coroutines = [connect_ssh(ip, "sh clock") for ip in ip_list]
    tasks = []
    for coro in asyncio.as_completed(coroutines):
        result = await coro
        tasks.append(asyncio.create_task(write_to_file(f"{result[0]}.txt", result)))
    await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())