Comment créer des pipelines de traitement de données en Python

Vous voulez traiter des données de manière itérative dans le style d’un pipeline de traitement de données (similaire aux tubes Unix). Par exemple, vous avez une énorme quantité de données qui doit être traitée, mais elle ne peut pas être stockée entièrement en mémoire.

Les fonctions Générateur sont un bon moyen d’implémenter des pipelines de traitement de données. Pour illustrer ce concept, supposons que vous ayez un énorme répertoire de fichiers journaux que vous voulez traiter:

foto/
       web-log007.gz
       web-log008.gz
       web-log009.gz
       ...
       web-log852.gz
    Var/
       web-log2236558.bz2
       ...
       web-log362445898

Supposons que chaque fichier contienne des lignes de données comme celle-ci:

194.115.6.19   - - [10/Jul/2019:00:18:50 -0500] "GET /client.txt   ..." 6548 71
210.219.209.67 - - [10/Jul/2019:00:18:51 -0500] "GET /numpy/       ..." 6548 11875
210.219.209.67 - - [10/Jul/2019:00:18:51 -0500] "GET /config.json  ..." 404  10369
61.135.216.105 - - [10/Jul/2019:00:20:04 -0500] "GET /providers.db ..." 304  -
...

Pour traiter ces fichiers, vous pouvez définir un ensemble de petites fonctions Générateur qui exécutent des tâches autonomes spécifiques. Par exemple:

import os
import fnmatch
import gzip
import bz2
import re

def genTrouver(filepat, top):
    '''
    Trouver tous les noms de fichiers d'une arborescence de répertoires qui correspondent à un motif de caractères génériques d'interpréteur de commandes
    '''
    for chemin, repliste, fichierliste in os.walk(top):
        for nom in fnmatch.filter(fichierliste, fichierchemin):
            yield os.path.join(chemin,nom)

def genOuvrir(filenames):
    '''
    Ouvrir une séquence de noms de fichiers un à la fois en produisant un objet fichier.
    Le fichier est fermé dès que l'on passe à l'itération suivante.
    '''
    for fichier in fichiers:
        if fichier.endswith('.gz'):
            f = gzip.open(fichier, 'rt')
        elif fichier.endswith('.bz2'):
            f = bz2.open(fichier, 'rt')
        else:
            f = open(fichier, 'rt')
        yield f
        f.close()

def genConcatener(iterateurs):
    '''
    Enchaîner une séquence d'itérateurs en une seule séquence.
    '''
    for it in iterateurs:
        yield from it

def genRecherche(motif, lignes):
    '''
    Rechercher un motif d'expression régulière dans une séquence de lignes
    '''
    moti = re.compile(motif)
    for ligne in lignes:
        if moti.search(ligne):
            yield ligne

Vous pouvez maintenant facilement empiler ces fonctions ensemble pour créer un pipeline de traitement. Par exemple, pour trouver toutes les lignes de logs qui contiennent le mot python, il suffit de faire ceci:

lognoms = genTrouver('web-log*', 'www')
fichiers = gen_opener(lognaoms)
lignes = genConcatener(fichiers)
pylignes = genRecherche('(?i)python', lignes)
for ligne in pylines:
    print(ligne)

Si vous voulez étendre le pipeline, vous pouvez même alimenter les données en expressions de générateur. Par exemple, cette version trouve le nombre d’octets transférés et additionne le total:

lognoms = gen_find('web-log*', 'www')
fichiers = gen_opener(lognoms)
lignes = genConcatener(fichiers)
pylignes = genRecherche('(?i)python', lignes)
bytecolonne = (ligne.rsplit(None,1)[1] for ligne in pylignes)
bytes = (int(x) for x in bytecolonne if x != '-')
print('Total', sum(bytes))

Le traitement des données en pipeline fonctionne bien pour une grande variété d’autres problèmes, y compris l’analyse, la lecture à partir de sources de données en temps réel, les sondages périodiques, et ainsi de suite.

Pour comprendre le code, il est important de comprendre que l’instruction yield agit comme une sorte de producteur de données, tandis que la boucle for agit comme un lecteur de données.

Lorsque les générateurs sont empilés ensemble, chaque yield fournit un seul élément de données à l’étape suivante du pipeline qui le consomme par itération.

Dans le dernier exemple, la fonction sum() est en train de piloter l’ensemble du programme, tirant un élément à la fois hors du pipeline des générateurs.

Une caractéristique intéressante de cette approche est que chaque fonction générateur tend à être petite et autonome. En tant que telles, elles sont faciles à écrire et à maintenir.

Dans de nombreux cas, elles sont tellement générales qu’elles peuvent être réutilisées dans d’autres contextes. Le code qui en résulte et qui colle les composants entre eux a également tendance à se lire facilement.

L’efficacité mémoire de cette approche ne peut pas non plus être surestimée. Le code affiché fonctionnerait toujours même s’il était utilisé sur un répertoire massif de fichiers. En fait, en raison de la nature itérative du traitement, très peu de mémoire serait utilisée.

Il y a un peu de subtilité extrême dans la fonction genConcatener(). Le but de cette fonction est de concaténer les séquences d’entrée en une longue séquence de lignes.

La fonction itertools.chain() exécute une fonction similaire, mais exige que tous les iterables chaînés soient spécifiés comme arguments.

Dans le cas de ce code particulièr, cela impliquerait une instruction telle que lignes = itertools.chain(*fichiers), ce qui entraînerait la consommation totale du générateur gebOuvrir().

Puisque ce générateur produit une séquence de fichiers ouverts qui sont immédiatement fermés à l’étape d’itération suivante, chain() ne peut pas être utilisé. La solution présentée évite ce problème.

La fonction genConcatener() permet également d’utiliser yield from pour déléguer à un sous-générateur. L’instruction rendement à partir de celui-ci fait simplement genConcatener() émettre toutes les valeurs produites par le générateur qu’il.

Ceci est décrit plus en détail dans l’article “Comment aplatir une séquence imbriquée“.

Enfin, et ce n’est pas le moins important, il convient de noter qu’une approche en pipeline ne fonctionne pas toujours pour tous les problèmes de traitement des données. Parfois, vous avez juste besoin de travailler avec toutes les données en même temps.

Cependant, même dans ce cas, l’utilisation de pipelines de générateur peut être un moyen de décomposer logiquement un problème en une sorte de flux de travail.

LAISSER UN COMMENTAIRE

Please enter your comment!
Please enter your name here