Licence CC BY-NC-NDThierry Parmentelat
les librairies disponibles¶
import asyncio
à ce stade vous avez les bases pour pouvoir utiliser les parties vraiment utiles de la librairie asyncio
, ainsi d’ailleurs que tout l’écosystème qui s’est construit autour
asyncio
¶
Le contenu de la asyncio
est assez hétérogène en fait, car on y trouve
la boucle d’événements dont a parlé dans la séquence précédente
un objet du type ‘Queue’ pour gérer les accès concurrents
mais aussi - heureusement - des outils qui adressent spécifiquement des interactions avec le système d’exploitation, notamment en ce qui concerne
la gestion des sous-processus
le réseau, notamment seulement les couches basses (TCP/IP)
autres¶
Pour tous usages de plus haut niveau - par exemple si vous voulez faire du HTTP, ou du SSH, ou tout autre - il vous faudra installer des librairies supplémentaires comme par exemple
oifiles
pour accéder aux fichiers de l’ordinateur d’une façon compatible avec le paradigme qu’on étudieaiohttp
qu’on a utilisée pour nos premiers exemplesasyncssh
pour contrôler plein de machines en même temps avec une seule connexion sshasyncpg
pour dialoguer de façon asynchrone avec une base de données postgresql,…
Queue
¶
Pour montrer un petit exemple d’utilisation de la classe Queue
, on va implémenter un mécanisme de ‘throttle’ qui permet de limiter le nombre de trucs qui tournent en même temps
Comme toujours je prends un exemple bidon; chaque tâche appelle notre utilitaire sequence
from asynchelpers import start_timer, show_timer, sequence
async def job(name):
await sequence(name, delay=2)
Imaginons maintenant que j’ai plein de jobs de ce genre
# lancer n jobs indentiques en parallèle
async def hurd(nbjobs):
await asyncio.gather(*(job(f"job #{i+1:03d}") for i in range(nbjobs)))
Qaudn je les lance tous ensemble, ça donne ceci
start_timer()
await hurd(8)
---------- zero
0s + 000ms >>> job #001
0s + 000ms >>> job #002
0s + 001ms >>> job #003
0s + 001ms >>> job #004
0s + 002ms >>> job #005
0s + 002ms >>> job #006
0s + 002ms >>> job #007
0s + 003ms >>> job #008
2s + 001ms <<< job #001
2s + 001ms <<< job #002
2s + 002ms <<< job #003
2s + 003ms <<< job #004
2s + 003ms <<< job #005
2s + 003ms <<< job #006
2s + 004ms <<< job #007
2s + 004ms <<< job #008
OK; maintenant disons que je veux limiter le nombre de jobs actifs à un instant t
Pour ne pas devoir faire une arithmétique compliquée, je vais juste utiliser une queue
# l'objet queue s'ssure qu'il n'y a pas plus de n jetons pris à un instant t
async def job2(name, queue):
# j'occupe une place dans la queue
await queue.put(1)
await sequence(name, delay=2)
# je la libère
await queue.get()
# maintenant il me suffit de créer la queue avec la taille qui va bien
async def hurd2(n, throttle):
queue = asyncio.Queue(throttle)
await asyncio.gather(*(job2(f"job #{i+1:03d}", queue) for i in range(n)))
# et maintenant je n'ai que 'throttle' jobs qui tournent en même temps
start_timer()
await hurd2(12, 8)
---------- zero
0s + 000ms >>> job #001
0s + 000ms >>> job #002
0s + 000ms >>> job #003
0s + 000ms >>> job #004
0s + 000ms >>> job #005
0s + 000ms >>> job #006
0s + 000ms >>> job #007
0s + 000ms >>> job #008
2s + 003ms <<< job #001
2s + 003ms <<< job #002
2s + 004ms <<< job #003
2s + 004ms <<< job #004
2s + 004ms <<< job #005
2s + 004ms <<< job #006
2s + 004ms <<< job #007
2s + 004ms <<< job #008
2s + 004ms >>> job #009
2s + 004ms >>> job #010
2s + 004ms >>> job #011
2s + 005ms >>> job #012
4s + 007ms <<< job #009
4s + 007ms <<< job #010
4s + 007ms <<< job #011
4s + 009ms <<< job #012
réseau¶
je tire cet exemple de la doc Python ici https://docs.python.org/3/library/asyncio-stream.html#tcp-echo-client-using-streams
ça devrait résonner par rapport au dernier cours de Basile Marchand…
serveur TCP¶
import asyncio
async def handle_echo(reader, writer):
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print(f"server Received {message!r} from {addr!r}")
# simulate a small delay
await asyncio.sleep(1)
print(f"server Send: {message!r}")
writer.write(data)
await writer.drain()
print("server Close the connection")
writer.close()
async def server_mainloop(port):
server = await asyncio.start_server(
handle_echo, '127.0.0.1', port)
addr = server.sockets[0].getsockname()
print(f'Serving on {addr}')
async with server:
await server.serve_forever()
server_task = asyncio.ensure_future(server_mainloop(8080))
# comme on l'a vu, si ça se passe mal on n'a pas de retour
if server_task.done():
print(server_task.exception())
Serving on ('127.0.0.1', 8080)
# pour arrêter le serveur
# server_task.cancel()
client¶
maintenant que le serveur tourne je peux lancer des clients
import asyncio
async def tcp_echo_client(port, message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', port)
print(f'client Send: {message!r}')
writer.write(message.encode())
data = await reader.read(100)
print(f'client Received: {data.decode()!r}')
print('Close the connection')
writer.close()
# un seul client
asyncio.ensure_future(tcp_echo_client(8080, "Hey"))
<Task pending name='Task-27' coro=<tcp_echo_client() running at /tmp/ipykernel_2301/2125229441.py:3>>
async def hurd(nb_clients):
await asyncio.gather(*(tcp_echo_client(8080, f"client#{i:03d}") for i in range(nb_clients)))
client Send: 'Hey'
server Received 'Hey' from ('127.0.0.1', 58268)
client_task = asyncio.ensure_future(hurd(30))
et plus…¶
Pour ceux qui voudraient en savoir plus, je vous invite à consulter la semaine 8 du MOOC Python sur fun-mooc.fr, et notamment
la séquence 8 où on montre un exemple de gestion de sous-processus
pour les geeks la séquence 5 où j’explique la mécanique interne de la boucle d’événements