Outils pour utilisateurs

Outils du site


thread_tableaublanc

Traitement multi-thread avec communication par tableau blanc

Problématique

Les applications multi-threads sont passionnantes, mais il y a quelques précautions à prendre lorsqu'elles doivent accéder à des ressources communes.

Or, certaines applications nécessitent que plusieurs threads aient besoin de se partager couramment des informations pour assurer le fonctionnement prévu.

On appellera ici “tableau blanc” l'emplacement unique où ces informations partagées sont créées, lues, modifiées et effacées.

Le terme de “tableau blanc” est assez explicite ici:

  • il s'agit bien d'informations partagées entre tous les threads
  • lorsqu'un thread écrit un message, il ne sait pas d'avance quel autre thread va l'exploiter
  • tous les threads ont accès à tous les messages et ont a priori tous les droits dessus (création, lecture, modification, effacement)

Cela n'empêche pas qu'un message puisse être destiné à un thread en particulier, mais il faudra ajouter le code nécessaire pour arriver à ce résultat.

J'ai cherché dans les modules existants de Python un moyen de programmer ce tableau blanc, mais je n'ai pas trouvé. Par exemple, dans le module queue, on ne peut pas lire une information sans la retirer de la pile, ce qui ne convient pas ici.

Exemple de traitement avec tableau blanc

On prendra ici l'exemple d'un pool de calculateurs qui traitent automatiquement des messages contenant des expressions mathématiques à calculer.

Le principe

Le principe est assez simple:

1- Les messages ont une structure donnée par la classe Message. Tous les messages ajoutés sur le tableau blanc seront en fait des instances de cette classe.

2- le tableau blanc est inspiré d'une pile FIFO, c'est à dire d'une file d'attente. Une particularité par rapport à une simple pile FIFO, c'est qu'on peut intervenir à l'intérieur de la pile (pour chercher, lire, modifier et effacer).

3- l'un des threads, appelé ici le “fournisseur” (classe Fournisseur), va générer des expressions mathématiques à calculer et les placer dans le tableau blanc sous forme de messages en attente.

4- l'un quelconque des 10 threads de calcul, appelés ici les “traiteurs” (classe Traiteur), ayant terminé son calcul précédent, va chercher sur le tableau blanc un message en attente, va le déclarer “encours” avec enregistrement de son nom de thread, va en calculer son expression, va ajouter son résultat au message et le déclarer celui-ci “fini”.

5- le programme principal, en plus d'initialiser et de démarrer l'ensemble, va trouver les messages “finis”, les extraire, les effacer du tableau blanc, afficher leurs résultats et afficher aussi des statistiques sur le fonctionnement de la machine.

Et c'est tout!

Essayez de faire fonctionner cela (copier-coller) dans une console ou, mieux, dans un outil de développement comme idle. N'utilisez pas la version pythonw sour windows parce qu'il faut une console pour voir les affichages.

Le code

Voici le code complet. Il est auto-documenté.

#!/usr/bin/python
# -*- coding: utf-8 -*-
 
from __future__ import division
 
import threading
import random
import time
import sys
from math import *
 
###############################################################################
class Message(object):
    """objet message destiné aux echanges entre thread par l'intermediaire du tableau blanc"""
    def __init__(self):
        self.statut = 'A'  # 'A'=en Attente de traitement, 'E'=En-cours de traitement, 'F'=Fini
        self.emetteur = None  # si necessaire: coordonnee de l'emetteur du message
        self.destinataire = None # dans le cas ou un destinataire particulier serait requis
        self.traiteur = None  # nom du thread qui est en train de traiter le message 
        self.requete = None # la requete à traiter
        self.reponse = None # la reponse du traiteur
        self.duree = None # duree du traitement
 
###############################################################################
class TBlanc(object):
    """objet de type file d'attente, base sur une pile fifo: on depile le plus ancien empile"""
 
    def __init__(self,maxpile=0):
        self.pile=[]
        self.maxpile = maxpile
 
    def empile(self,element):
        if (self.maxpile!=0) and (len(self.pile)==self.maxpile):
            raise ValueError ("erreur: tentative d'empiler une pile pleine")
        self.pile.insert(0,element)
 
    def depile(self,idx=-1):
        if len(self.pile)==0:
            raise ValueError ("erreur: tentative de depiler une pile vide")
        if idx<-len(self.pile) or idx>=len(self.pile):
            raise ValueError ("erreur: element de pile n'existe pas")
        return self.pile.pop(idx)
 
    def element(self,idx=-1):
        if idx<-len(self.pile) or idx>=len(self.pile):
            raise ValueError ("erreur: element de pile n'existe pas")
        return self.pile[idx]
 
    def copiepile(self,imin=0,imax=None):
        if imax==None:
            imax=len(self.pile)
        if imin<0 or imax>len(self.pile) or imin>=imax:
            raise ValueError ("erreur: mauvais indice pour extraction par copiepile")
        return list(self.pile[imin:imax])
 
    def cherche(self,stat,nomthread=None):
        for idx in xrange(-1,-len(self.pile)-1,-1):
            if self.pile[idx].statut==stat:
                if nomthread==None or (nomthread!=None and self.pile[idx].traiteur==nomthread):
                    return idx
        return None
 
    def estvide(self):
        return len(self.pile)==0
 
    def estpleine(self):
        return self.maxpile>0 and len(self.pile)==self.maxpile
 
    def taille(self):
        return len(self.pile)
 
    def nbattentes(self):
        k = 0
        for idx in xrange(0,len(self.pile)):
            if self.pile[idx].statut=='A':
                k += 1
        return k
 
    def nbencours(self):
        k = 0
        for idx in xrange(0,len(self.pile)):
            if self.pile[idx].statut=='E':
                k += 1
        return k
 
    def nbfinis(self):
        k = 0
        for idx in xrange(0,len(self.pile)):
            if self.pile[idx].statut=='F':
                k += 1
        return k
 
###############################################################################
class Traiteur(threading.Thread):
 
    def __init__(self):
        threading.Thread.__init__(self)
        self.nbtraitements = 0
 
    def run(self):
        global vtblanc, tblanc
        while True:
            # attente d'une expression à calculer disponible dans le tableau blanc
            while True:
                vtblanc.acquire()
                self.idx = tblanc.cherche('A')
                if self.idx!=None:
                    tblanc.pile[self.idx].statut = 'E'
                    tblanc.pile[self.idx].traiteur = self.getName()
                    self.expr = tblanc.pile[self.idx].requete
                    vtblanc.release()
                    break
                vtblanc.release()
                time.sleep(0.1)
 
            # calcul de l'expression
            self.tps = time.clock()
            try:
                self.result = "%s" % eval(self.expr)
            except:
                # Recup d'un eventuel message d'erreur
                self.result = "%s" % sys.exc_info()[1]
            self.tps = time.clock()-self.tps
            if self.tps < 0.001:
                self.duree = "< 0.001 s"
            else:
                self.duree = "%.3f s" % self.tps
 
            # donner la reponse dans le message traite du tableau blanc
            vtblanc.acquire()
            self.idx = tblanc.cherche('E',self.getName())
            tblanc.pile[self.idx].reponse = self.result
            tblanc.pile[self.idx].duree = self.duree
            tblanc.pile[self.idx].statut='F'
            vtblanc.release()
            self.nbtraitements += 1
 
            # et bouclage pour un nouveau calcul
 
###############################################################################
class Fournisseur(threading.Thread):
 
    def __init__(self):
        threading.Thread.__init__(self)
 
    def run(self):
        global vrequetes,requetes
        while True:
            # creation d'un nouveau message (avec le statut 'A' = en Attente)
            self.msg = Message()
 
            # creation d'une nouvelle expression a calculer
            self.msg.requete = str(random.random()) + "*sqrt(2)+" + str(random.random()) + "*sin(" + str(random.random()) + ")"
 
            # empilage du nouveau message dans le tableau blanc pour traitement
            vtblanc.acquire()
            tblanc.empile(self.msg)
            t = tblanc.taille()
            vtblanc.release()
 
            # attente  jusqu'a t<20 si le tableau blanc est trop plein (t>=100)
            if t>=100:
                while True:
                    vtblanc.acquire()
                    t=tblanc.taille()
                    vtblanc.release()
                    if t<20:
                        break
                    # on se pose la requete avec une tempo pour laisser travailler les traiteurs
                    time.sleep(0.1)
 
###############################################################################
 
# creation du tableau blanc et du verrou qui lui est affecte
tblanc = TBlanc()
vtblanc = threading.Lock()
 
# lancement du fournisseur qui va inscrire des expressions a calculer sur le tableau blanc
fournisseur=Fournisseur()
fournisseur.setDaemon(True)
fournisseur.start()
 
# creation d'une liste de nbtraiteur traiteurs
traiteurs = []
nbtraiteurs = 10
for i in xrange(0,nbtraiteurs):
    tr = Traiteur()
    traiteurs.append(tr)
    traiteurs[-1].setName("C%d" % i)
    traiteurs[-1].setDaemon(True)
    traiteurs[-1].start()
 
# exploitation des resultats
 
nbmax = 20 # on n'affiche que nbmax resultats en meme temps
while True:
    # affichage des resultats quand il y en a
    nb = 0
    while True:
        vtblanc.acquire()
        idx = tblanc.cherche('F')
        if idx==None:
            vtblanc.release()
            break
        else:
            msg = tblanc.depile(idx)
            print msg.traiteur + ": " + msg.reponse + " (" + msg.duree + ")"
            nb+=1
            if nb>nbmax:
                vtblanc.release()
                break
        vtblanc.release()
 
    # Etat du tableau blanc
    vtblanc.acquire()
    a = tblanc.nbattentes()
    e = tblanc.nbencours()
    f = tblanc.nbfinis()
    vtblanc.release()
    print
    print "attentes= ", a, "en-cours= ", e, "finis= ", f
 
    # activite des threads
    nbactifs = 0
    for i in xrange(0,nbtraiteurs):
        if traiteurs[i].isAlive():
            nbactifs += 1
    print "nombre de traiteurs actifs= ", nbactifs
    print "traitements realises:"
    for i in xrange(0,nbtraiteurs):
        print traiteurs[i].getName()+":"+str(traiteurs[i].nbtraitements),
    print
    print

Quelques commentaires sur le code

Pour le fournisseur des messages, il a fallu le “freiner” un peu, parce que dans les 1ères exécutions, je me suis retrouvé en quelques secondes avec 200000 calculs en attente dans le tableau blanc… Donc, quand la pile atteint 100 messages, il attend qu'elle redescende à 20 pour redémarrer.

Dans ce programme, les threads de calcul ne s'arrêtent jamais, ce qui est inhabituel. Quand ils ont terminé un calcul, il cherchent un nouveau message disponible en attente pour le prendre en compte. Dans une première version, j'avais fait en sorte qu'ils se suicident quand l'attente dépassait 5 secondes, mais cela posait le problème d'ajouter de nouveaux threads de calcul en cas de nécessité (taille de la file d'attente). Une autre solution consiste à créer le thread à chaque nouveau calcul, mais il faut alors qu'un thread spécialisé (ou le programme principal) joue le rôle de planificateur (scheduler) pour scruter le tableau blanc et créer le thread. A reprendre ultérieurement.

Une subtilité qu'on ne voit pas tout de suite: quand un thread a pris en compte un message, il le déclare “encours” afin que les autres threads ne s'en occupent pas. Mais il faut aussi que le thread enregistre son nom dans le message. Pourquoi? Pour pouvoir retrouver le message dans la pile! En effet, le verrou est relaché avant le calcul, et quand le calcul est terminé, le message a changé d'index dans la pile à cause des autres threads. Pour la même raison, si le message change d'index, il aurait pu lui être affecté un autre résultat par un autre thread. Par contre, la recherche avec le statut “encours” + le nom du thread permet à chaque thread de retrouver son message à coup sûr.

Vous voyez comment on utilise le verrou (vtblanc) afin d'éviter que 2 threads puissent intervenir en même temps sur le tableau blanc. Un peu comme sur un vrai tableau blanc avec un seul marqueur: seul celui qui l'a en main peut écrire!

Dans le message, je n'ai mis que des chaînes de caractères, mais en fait on peut y acrocher n'importe quoi, y compris des images, de la musique ou de la vidéo. C'est l'application qui dira ce qui est utile d'y mettre. On pourrait aussi avoir par exemple un tableau (une liste de liste) pour représenter un damier d'échec ou une grille de reversi.

Vous voyez que certaines utilisations de fonction du tableau blanc est susceptible de générer des exceptions. Je n'ai pas mis en place le code nécessaire pour cela afin de rester simple, mais pour une application sérieuse, il le faudrait. Je n'ai pas choisi non plus de créer le tableau blanc avec une limite de taille de pile, mais le code est en place dans la classe TBlanc. Par contre, en cas de pile limitée, il faudrait gérer les déclenchements d'exceptions pour tentative de dépassement.

Développements possibles de cette technique

L'exemple pris ci-dessus est un peu basé sur la logique du “producteur-consommateur”, mais le principe peut servir d'autres logiques, comme par exemple un jeu avec plusieurs joueurs, chaque joueur pouvant créer de nouveaux messages.

Rien n'empêche qu'un message nécessite plusieurs phases de traitement par plusieurs threads. Il faudra alors pouvoir repérer dans le statut du message quel est son état d'avancement.

J'ai utilisé plusieurs threads sur le même ordinateur, c'est à dire qu'ils se partagent tous les ressources de la machine. Mais dans un contexte de réseau, rien n'empêche que certains traitements soient confiés à d'autres ordinateurs du réseau, et même qu'on les envoie sur les ordinateurs du réseau les moins chargés parmi ceux qui sont capables de réaliser le traitement. Voilà une bonne façon d'utiliser l'énorme potentiel machine des entreprises: quand on a 1000 PC utilisés chacun en moyenne à 10%, ça fait un très gros ordinateur disponible!

Puisque le tableau blanc est une file d'attente, on peut imaginer de simuler, par exemple, le fonctionnement d'un atelier de fabrication d'outillage, le tableau blanc en guise de file d'attente devant chaque machine, et un tableau blanc supplémentaire pour les messages généraux (alertes, pannes, maladies, accidents, …). Rien n'empêche d'ailleurs de fixer des critères de priorité qui ne sont pas uniquement l'ancienneté du message. On pourrait même générer des pannes aléatoires, conformément à l'historique de pannes de chaque machine, pour voir comment l'ensemble évolue. On peut aussi identifier les goulots, changer les périodes d'utilisation des machines, etc… J'ai fait des choses comme ça il y a 20 ans…

Je ne suis pas spécialement joueur, mais je vois assez bien l'application aux jeux et je suis assez tenté par un jeu (pas trop compliqué) qui permettrait à plusieurs threads de jouer entre eux, le tableau blanc représentant la table de jeu.

Au niveau de la simulation de systèmes vivants, je suis assez fasciné par les expériences concernant des organismes ayant un fonctionnement basique et instinctif comme les fourmis. Je verrai bien chaque fourmi représentée par un thread, et le tableau blanc faisant office en même temps d'environnement et d'échanges de messages: comment trouver les critères individuels de comportement instinctifs qui font que l'ensemble ressemble à une fourmilière?


J'espère que vous vous amuserez autant que moi! :-D

thread_tableaublanc.txt · Dernière modification: 2008/10/27 07:47 par tyrtamos

Outils de la page