Licence CC BY-NC-NDThierry Parmentelat_images/inria-25-alpha.png

les librairies disponibles

import asyncio

à ce stade vous avez les bases pour pouvoir utiliser les parties vraiment utiles de la librairie asyncio, ainsi d’ailleurs que tout l’écosystème qui s’est construit autour

asyncio

Le contenu de la asyncio est assez hétérogène en fait, car on y trouve

  • la boucle d’événements dont a parlé dans la séquence précédente

  • un objet du type ‘Queue’ pour gérer les accès concurrents

  • mais aussi - heureusement - des outils qui adressent spécifiquement des interactions avec le système d’exploitation, notamment en ce qui concerne

    • la gestion des sous-processus

    • le réseau, notamment seulement les couches basses (TCP/IP)

autres

Pour tous usages de plus haut niveau - par exemple si vous voulez faire du HTTP, ou du SSH, ou tout autre - il vous faudra installer des librairies supplémentaires comme par exemple

  • oifiles pour accéder aux fichiers de l’ordinateur d’une façon compatible avec le paradigme qu’on étudie

  • aiohttp qu’on a utilisée pour nos premiers exemples

  • asyncssh pour contrôler plein de machines en même temps avec une seule connexion ssh

  • asyncpg pour dialoguer de façon asynchrone avec une base de données postgresql,

Queue

Pour montrer un petit exemple d’utilisation de la classe Queue, on va implémenter un mécanisme de ‘throttle’ qui permet de limiter le nombre de trucs qui tournent en même temps

Comme toujours je prends un exemple bidon; chaque tâche appelle notre utilitaire sequence

from asynchelpers import start_timer, show_timer, sequence
async def job(name):
    await sequence(name, delay=2)

Imaginons maintenant que j’ai plein de jobs de ce genre

# lancer n jobs indentiques en parallèle
async def hurd(nbjobs):
    await asyncio.gather(*(job(f"job #{i+1:03d}") for i in range(nbjobs)))

Qaudn je les lance tous ensemble, ça donne ceci

start_timer()
await hurd(8)
---------- zero
0s + 000ms >>> job #001
0s + 000ms >>> job #002
0s + 001ms >>> job #003
0s + 001ms >>> job #004
0s + 002ms >>> job #005
0s + 002ms >>> job #006
0s + 002ms >>> job #007
0s + 003ms >>> job #008
2s + 001ms <<< job #001
2s + 001ms <<< job #002
2s + 002ms <<< job #003
2s + 003ms <<< job #004
2s + 003ms <<< job #005
2s + 003ms <<< job #006
2s + 004ms <<< job #007
2s + 004ms <<< job #008

OK; maintenant disons que je veux limiter le nombre de jobs actifs à un instant t

Pour ne pas devoir faire une arithmétique compliquée, je vais juste utiliser une queue

# l'objet queue s'ssure qu'il n'y a pas plus de n jetons pris à un instant t

async def job2(name, queue):
    # j'occupe une place dans la queue
    await queue.put(1)
    await sequence(name, delay=2)
    # je la libère
    await queue.get()    
# maintenant il me suffit de créer la queue avec la taille qui va bien

async def hurd2(n, throttle):
    queue = asyncio.Queue(throttle)
    await asyncio.gather(*(job2(f"job #{i+1:03d}", queue) for i in range(n)))
# et maintenant je n'ai que 'throttle' jobs qui tournent en même temps
start_timer()
await hurd2(12, 8)
---------- zero
0s + 000ms >>> job #001
0s + 000ms >>> job #002
0s + 000ms >>> job #003
0s + 000ms >>> job #004
0s + 000ms >>> job #005
0s + 000ms >>> job #006
0s + 000ms >>> job #007
0s + 000ms >>> job #008
2s + 003ms <<< job #001
2s + 003ms <<< job #002
2s + 004ms <<< job #003
2s + 004ms <<< job #004
2s + 004ms <<< job #005
2s + 004ms <<< job #006
2s + 004ms <<< job #007
2s + 004ms <<< job #008
2s + 004ms >>> job #009
2s + 004ms >>> job #010
2s + 004ms >>> job #011
2s + 005ms >>> job #012
4s + 007ms <<< job #009
4s + 007ms <<< job #010
4s + 007ms <<< job #011
4s + 009ms <<< job #012

réseau

je tire cet exemple de la doc Python ici https://docs.python.org/3/library/asyncio-stream.html#tcp-echo-client-using-streams

ça devrait résonner par rapport au dernier cours de Basile Marchand…

serveur TCP

import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')

    print(f"server Received {message!r} from {addr!r}")
    
    # simulate a small delay
    await asyncio.sleep(1)

    print(f"server Send: {message!r}")
    writer.write(data)
    await writer.drain()

    print("server Close the connection")
    writer.close()
async def server_mainloop(port):
    server = await asyncio.start_server(
        handle_echo, '127.0.0.1', port)

    addr = server.sockets[0].getsockname()
    print(f'Serving on {addr}')

    async with server:
        await server.serve_forever()
server_task = asyncio.ensure_future(server_mainloop(8080))
# comme on l'a vu, si ça se passe mal on n'a pas de retour
if server_task.done():
    print(server_task.exception())
Serving on ('127.0.0.1', 8080)
# pour arrêter le serveur
# server_task.cancel()

client

maintenant que le serveur tourne je peux lancer des clients

import asyncio

async def tcp_echo_client(port, message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', port)

    print(f'client Send: {message!r}')
    writer.write(message.encode())

    data = await reader.read(100)
    print(f'client Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
# un seul client 
asyncio.ensure_future(tcp_echo_client(8080, "Hey"))
<Task pending name='Task-27' coro=<tcp_echo_client() running at /tmp/ipykernel_2301/2125229441.py:3>>
async def hurd(nb_clients):
    await asyncio.gather(*(tcp_echo_client(8080, f"client#{i:03d}") for i in range(nb_clients)))
client Send: 'Hey'
server Received 'Hey' from ('127.0.0.1', 58268)
client_task = asyncio.ensure_future(hurd(30))

et plus…

Pour ceux qui voudraient en savoir plus, je vous invite à consulter la semaine 8 du MOOC Python sur fun-mooc.fr, et notamment

  • la séquence 8 où on montre un exemple de gestion de sous-processus

  • pour les geeks la séquence 5 où j’explique la mécanique interne de la boucle d’événements