05 Apr 2020, 18:41

Utiliser JSONB sous PostgreSQL avec Sqlalchemy

Je travaille en ce moment sur l’opensource de Saucs (qui va d’ailleurs être renommé, j’en parlerai plus tard) et notamment sur la partie performance des requêtes SQL.

Si vous ne connaissez pas Saucs il s’agit d’un outil permettant de s’abonner à des vendeurs et leurs produits puis d’être notifié en cas de nouvelle CVE (ou de mise à jour d’une ancienne).

Sans JSONB

Voici le schema actuel :

$ \dt
              List of relations
 Schema |      Name       | Type  |  Owner
--------+-----------------+-------+----------
 …
 public | products        | table | ncrocfer
 public | products_cves   | table | ncrocfer
 public | users           | table | ncrocfer
 public | users_products  | table | ncrocfer
 public | users_vendors   | table | ncrocfer
 public | vendors         | table | ncrocfer
 public | vendors_cves    | table | ncrocfer

Une table par vendors et products, puis des tables d’associations avec les CVES. De même nous avons des tables d’association pour faire le lien entre les utilisateurs et leurs abonnements.

Les soucis de performance se faisaient ressentir sur certaines requêtes, notamment sur la page d’accueil de l’utilisateur :

Tout d’abord la requête permettant de retrouver les CVE en fonction des souscriptions de l’utilisateur était assez gourmande :

Cve.query.filter(
    or_(
        Cve.vendors.any(Vendor.users.any(User.id == current_user.id)),
        Cve.products.any(Product.users.any(User.id == current_user.id)),
    )
)

La preuve via l’extension Flask-DebugToolbar :

Autre problème comme vous pouvez le constater : la page affiche un extrait des vendeurs et produits affectés par chaque CVE, ce qui implique beaucoup de petites requêtes. Elles ne sont certes pas très gourmandes, mais mis bout à bout le chargement de la page devenait très long.

Avec JSONB

C’est durant mon refacto que j’ai vraiment découvert l’utilisation de JSONB sous PostgreSQL. L’idée était de ne pas stocker ces informations dans des tables d’association mais directement dans la table CVE.

Plusieurs avantages à cela :

  • la taille de la database est réduite (à titre d’exemple la taille products_cves contenait environ 350k items)
  • la mise à jour des CVE est simplifiée (il fallait jusqu’alors vérifier si l’association existait avant de l’insérer)
  • et les performances sont évidemment améliorées

La déclaration sous SQLAlchemy d’une colonne de type JSONB se fait simplement de la manière suivante :

from sqlalchemy.dialects.postgresql import JSONB

class Cve(BaseModel):
    __tablename__ = "cves"
    ...
    vendors = db.Column(JSONB)

    __table_args__ = (db.Index('cves_vendors_gin_idx', vendors, postgresql_using="gin"),)

L’index de type GIN est très important car il nous fera gagner là aussi un max de perf. Seul inconvénient : l’insertion est apparemment un tout petit peu plus lente. Je dis apparemment car c’est ce que j’ai lu dans la doc mais je ne l’ai pas du tout ressenti durant mes tests.

Pour info mes recherches et mes tests sont partis de ce thread StackOverflow.

Après modification du script d’import les données ressemblent désormais à cela :

$ SELECT cve_id, vendors FROM cves;
...
CVE-2020-3850  | ["apple", "apple-OA-mac_os_x"]
CVE-2020-10865 | ["avast", "avast-OA-antivirus", "microsoft", "microsoft-OA-windows"]
...

La chaîne -OA- est utilisée comme séparateur afin d’inclure les produits dans cette liste (un produit est lié à un vendeur, sans cela des conflits peuvent apparaître car plusieurs vendeurs peuvent avoir les mêmes noms de produits).

La requête permettant d’afficher les CVE propres à l’utilisateur devient alors :

from sqlalchemy.dialects.postgresql import array

Cve.query.filter(Cve.vendors.has_any(array(current_user.vendors)))

Les performances, notamment grâce à l’index, deviennent vraiment apréciables. De plus nous évitons la multitude de requêtes liées aux vendeurs car l’information est déjà portée par la table CVE :

Conclusion

Comme vous pouvez le voir sur les screenshots je suis passé de plusieurs secondes à quelques centaines de millisecondes. Qui plus est la mise à jour des CVE est vraiment simplifiée.

Nous avons utilisé la méthode has_any() qui permet de renvoyer chaque row répondant à au moins 1 critère. Dans le cas où nous n’aurions qu’un seul item à rechercher (“donnes moi toutes les CVE lié au vendeur debian”) nous pouvons utiliser la méthode contains :

Cve.query.filter(Cve.vendors.contains(["debian"]))

Pour finir il est tout de même important de noter que ce refacto amène un changement majeur : Saucs ne fonctionnera plus que sur PostgreSQL. J’ai décidé d’accepter cette contrainte afin d’améliorer les performances, notre prod tournait déjà sur PG et SQLite n’était utilisé que pour les tests. Il faudra désormais une seconde instance pour les lancer, mais ce n’est pas très grave.

Et si demain quelqu’un trouve une meilleure solution je suis clairement preneur, le projet va devenir opensource et il sera donc bientôt possible de contribuer directement au code :)

09 Feb 2020, 13:04

Celery : utiliser le filesystem comme broker et result backend

Lorsqu’on utilise Celery en production il est d’usage d’utiliser un broker robuste comme RabbitMQ. Dans le cas où nos tâches utilisent un canvas tel que chord il faudra également penser à configurer la connection vers un result backend afin de stocker le résultat des tâches. On pourra par exemple utiliser Redis.

Mais avant la mise en production il y a l’étape du développement des tâches. Et même si l’installation d’un Redis ou d’un RabbitMQ est assez simple, j’ai pu remarqué par le passé que ces dépendances fortes pouvaient parfois être un frein à l’adoption de Celery.

Hors il faut savoir que Celery est basé sur la librairie Kombu, utilisée pour transmettre des messages d’un producer à un consumer. Et la bonne nouvelle est que Kombu supporte le filesystem comme mode de transport !

Ce qui signifie qu’en configurant Celery on peut facilement s’abstraire de ces dépendances :

from celery import Celery

app = Celery('tasks')
app.conf.update({
    # Configuration du broker
    "broker_url": "filesystem://",
    "broker_transport_options": {
        "data_folder_in": "/tmp/out",
        "data_folder_out": "/tmp/out",
    },

    # Configuration du result backend
    "result_backend": "file:///tmp/result",
})

Comme vous le voyez c’est très simple :

  • on indique via le broker_url que nous souhaitons utiliser le système de fichiers,
  • puis on transmet à Kombu le dossier dans lequel les messages seront stockés.

Evidemment les clés data_folder_in et data_folder_out doivent être les mêmes, sinon vos producers écriront leurs messages dans un dossier et vos consumers tenteront de les lire ailleurs.

Et comme Celery supporte par défaut le filesystem comme backend, sa configuration peut se faire via la clé result_backend.

Il est important que les dossiers existent avant de lancer Celery, voici donc un petit snippet très simple qui les créera pour vous sous un dossier .celery du répertoire courant :

# tasks.py
import os
from celery import Celery

OUT_DIR = os.path.join(os.getcwd(), ".celery/out")
RESULT_DIR = os.path.join(os.getcwd(), ".celery/result")

# Create folders if they don't exist
for dir in [OUT_DIR, RESULT_DIR]:
    if not os.path.exists(dir):
        os.makedirs(dir)


app = Celery('tasks')
app.conf.update({
    "broker_url": "filesystem://",
    "broker_transport_options": {
        "data_folder_in": OUT_DIR,
        "data_folder_out": OUT_DIR,
    },
    "result_backend": f"file://{RESULT_DIR}",
})

@app.task
def tsum(numbers):
    return sum(numbers)

On peut tester en lançant un simple chord :

In [1]: from tasks import tsum

In [2]: from celery import chord

In [3]: chord(
   ...:     [tsum.si([1, 2]), tsum.si([3, 4])],
   ...:     tsum.s()
   ...: ).delay()
Out[3]: <AsyncResult: 11422611-c389-4020-9358-ca485b14d65d>

Puis en visualisant les résultats du worker :

$ celery@LAPTOP v4.4.0 (cliffs)

Darwin-17.7.0-x86_64-i386-64bit 2020-02-09 14:11:36

[config]
.> app:         tasks:0x10605a8d0
.> transport:   filesystem://localhost//
.> results:     file:///Users/ncrocfer/Dev/.celery/result
.> concurrency: 4 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.tsum

[2020-02-09 14:11:36,645: INFO/MainProcess] Connected to filesystem://localhost//
[2020-02-09 14:11:36,677: INFO/MainProcess] celery@LAPTOP ready.
[2020-02-09 14:11:36,682: INFO/MainProcess] Received task: celery.chord_unlock[71b7898d-b3b4-4689-9f4b-ebbe2e96cb1d]  ETA:[2020-02-09 13:10:35.235858+00:00]
[2020-02-09 14:11:36,687: INFO/MainProcess] Received task: tasks.tsum[39a3728b-ee9a-44f8-b661-97e20e74c72e]
[2020-02-09 14:11:36,694: INFO/MainProcess] Received task: tasks.tsum[e79e3917-1c80-4366-adcc-23b601f38836]
[2020-02-09 14:11:36,700: INFO/ForkPoolWorker-1] Task tasks.tsum[39a3728b-ee9a-44f8-b661-97e20e74c72e] succeeded in 0.005183988017961383s: 3
[2020-02-09 14:11:36,705: INFO/ForkPoolWorker-2] Task tasks.tsum[e79e3917-1c80-4366-adcc-23b601f38836] succeeded in 0.005367834935896099s: 7
[2020-02-09 14:11:37,737: INFO/ForkPoolWorker-3] Task celery.chord_unlock[71b7898d-b3b4-4689-9f4b-ebbe2e96cb1d] succeeded in 0.05950633704196662s: None
[2020-02-09 14:11:38,711: INFO/MainProcess] Received task: tasks.tsum[11422611-c389-4020-9358-ca485b14d65d]
[2020-02-09 14:11:38,719: INFO/ForkPoolWorker-4] Task tasks.tsum[11422611-c389-4020-9358-ca485b14d65d] succeeded in 0.0034185299882665277s: 10

Evidemment cette configuration n’est à utiliser qu’en mode développement, je vous conseille fortement de passer sous un autre broker lorsque vous passerez vos tâches en prod :)

12 Jan 2020, 13:25

Celery : l'importance du pool d'exécution

En tant que dev nous sommes régulièrement amenés à devoir exécuter des tâches en arrière-plan. La plupart du temps il s’agira de répondre à une action de l’utilisateur (call d’API, click sur un bouton…), parfois nous aurons besoin de lancer une tâche de manière périodique.

Dans les 2 cas l’objectif est de ne pas bloquer le processus principal. En effet je vous laisse imaginer les pertes de performance si, au moment d’un POST sur votre API, votre application se chargeait d’exécuter elle-même une tâche lourde et très longue. L’application rendrait la main à l’utilisateur plusieurs secondes, voire plusieurs minutes, après le call.

Pour répondre à ce besoin les dev Python ont pris l’habitude d’utiliser la librairie Celery. Cet article n’est pas une introduction à cet outil, pour cela je vous renvoie vers le quickstart officiel, vous comprendrez rapidement à quoi il sert et surtout comment l’utiliser.

Nous allons parler ici d’une feature qui est parfois délaissée lorsqu’on utilise Celery : les pools d’exécution. Derrière cette notion se cache toute la magie de Celery : comment et par quoi mes tâches vont-elles s’exécuter. Et vous allez voir que configurer correctement cette option peut amener des gains de performances incroyables !

La théorie

Lorsque nous lançons un worker Celery, nous lançons en fait un processus chargé de coordonner le lancement des tâches empilées dans le broker. En réalité leur exécution à proprement parler ne sera pas effectuée par ce processus mais par d’autres processus enfants ou par des threads qu’il aura lui-même spawné.

Et c’est justement la façon dont est configuré le pool d’éxécution qui détermine comment sont lancés ces processus ou threads capables d’exécuter les tâches.

Celery fournit par défaut 4 pools d’exécution : prefork, solo, gevent et eventlet. La configuration s’effectue via l’option -P.

Mode prefork

Par défault le pool sélectionné par Celery est le prefork. Nous pouvons facilement le voir lorsque nous lançons un worker :

$ celery -A tasks worker
celery@LAPTOP v4.4.0 (cliffs)

Darwin-17.7.0-x86_64-i386-64bit 2020-01-26 14:31:06

[config]
.> app:         tasks:0x10566ca20
.> transport:   redis://localhost:6379/0
.> results:     disabled://
.> concurrency: 4 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery

Ici la ligne concurrency: 4 (prefork) nous indique que nous utilisons le pool prefork avec une concurrency de 4. Ce mode d’exécution se base sur le package multiprocessing : Celery va donc spawner autant de processus que le nombre de coeurs de votre machine.

Nous pouvons vérifier que les processus ont bien été spawnés par Celery (le process principal et les 4 forks):

$ ps -A | grep -i celery
91822 ttys000    0:01.59 /Users/ncrocfer/Dev/celery_pool/venv/bin/python3 /Users/ncrocfer/Dev/celery_pool/venv/bin/celery -A tasks worker
91828 ttys000    0:00.02 /Users/ncrocfer/Dev/celery_pool/venv/bin/python3 /Users/ncrocfer/Dev/celery_pool/venv/bin/celery -A tasks worker
91829 ttys000    0:00.02 /Users/ncrocfer/Dev/celery_pool/venv/bin/python3 /Users/ncrocfer/Dev/celery_pool/venv/bin/celery -A tasks worker
91830 ttys000    0:00.02 /Users/ncrocfer/Dev/celery_pool/venv/bin/python3 /Users/ncrocfer/Dev/celery_pool/venv/bin/celery -A tasks worker
91831 ttys000    0:00.02 /Users/ncrocfer/Dev/celery_pool/venv/bin/python3 /Users/ncrocfer/Dev/celery_pool/venv/bin/celery -A tasks worker

Ce mode est particulièrement adapté aux applications nécessitant du calcul par le CPU (ces applications sont dîtes CPU bound). Plus vous aurez de coeurs puissants et plus votre application pourra traiter de tâches en parallèle.

A noter que la concurrency est paramétrable via l’option -c :

$ celery -A tasks worker -c 2
...
.> concurrency: 2 (prefork)
...

Mode solo

Ce mode est un peu particulier car le processus principal se chargera lui-même d’exécuter les tâches qu’il consommera :

$ celery -A tasks worker -P solo
...
.> concurrency: 4 (solo)
...

La concurrency reste à 4 mais ne vous y fiez pas, le mode solo ne s’en encombre pas et n’exécutera qu’une seule tâche à la fois.

Là encore nous pouvons le vérifier via les processus en cours d’exécution sur notre machine :

$ ps -A | grep -i celery
92356 ttys000    0:01.13 /Users/ncrocfer/Dev/celery_pool/venv/bin/python3 /Users/ncrocfer/Dev/celery_pool/venv/bin/celery -A tasks worker -P solo

Hormis pour lancer des tests, ce mode pourra être utile dans le cas d’un usage en environnement containers. En effet les comptes sont simples dans ce cas : N containers égal N tasks en parallèle.

Mode gevent & eventlet

Là on arrive dans la partie intéressante :) A l’inverse du mode prefork utile pour gérer des tâches CPU bound, ces deux modes vont nous permettre de gérer facilement les tâches dîtes IO bound.

De telles tâches n’effectuent pas de calculs lourds, et ne nécessitent donc pas un grand nombre de CPU puissants. En revanche elles peuvent être amenées à effectuer beaucoup de requêtes vers d’autres systèmes : une API externe, une base de données, un site web, etc. Ici notre CPU n’a pas grand chose à faire puisque nos tâches vont principalement attendre le retour de leurs requêtes.

Je vous invite à vous renseigner sur la librairie Asyncio qui expliquera tout ça mieux que moi.

Dans l’exemple ci-dessous notre worker pourra gérer 100 tâches en parallèle via le mode gevent :

$ celery -A tasks worker -P gevent -c 100
...
.> concurrency: 100 (gevent)
...

La pratique

Trève de théorie, illustrons maintenant l’utilisation des pool d’exécution via un exemple.

Nous souhaitons lancer plusieurs tâches en parallèle effectuant une requête vers une API externe. Cette API renvoie un nombre au hasard, et une fonction de callback est appelée afin d’effectuer la somme de ces nombres. Certes cet exemple est basique mais il va nous permettre de mettre en évidence la puissance des pools d’exécution dans Celery.

L’API tourne en locale, j’ai utilisé Sanic avec un timeout de 2 secondes simulant la latence d’une API externe :

$ cat app.py
import random
import asyncio

from sanic import Sanic
from sanic.response import json


app = Sanic()


@app.get("/")
async def get_time(request):
    await asyncio.sleep(2)
    return json({"number": random.randint(1, 10)})


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000)

Nos tâches sont elles-aussi très simples :

$ cat tasks.py
import requests
from celery import Celery, chord


app = Celery('tasks', broker='redis://localhost:6379/0', backend="redis://localhost:6379/1")


@app.task
def get_number():
    resp = requests.get("http://localhost:8000")
    return resp.json()["number"]

@app.task
def sum_numbers(numbers):
    return sum(numbers)


sign = chord(
    [get_number.si() for _ in range(5)],
    sum_numbers.s()
)

J’utilise un chord afin de lancer plusieurs requêtes en parallèle. La fonction sum_numbers est ensuite lancée afin de calculer la somme totale des nombres.

Le lancement peut s’effectuer en une ligne :

$ python -c "from tasks import sign; sign.delay()"

Testons avec le pool d’exécution par défaut (prefork) :

$ celery -A tasks worker --loglevel=INFO
...
[2020-01-26 19:30:48,852: INFO/MainProcess] Received task: tasks.get_number[5f2f8167-c667-4e83-b591-453e57b3a958]
[2020-01-26 19:30:48,855: INFO/MainProcess] Received task: tasks.get_number[a333586b-c396-4b91-917f-3fbdfa6d30e8]
[2020-01-26 19:30:48,860: INFO/MainProcess] Received task: tasks.get_number[88fa7cc0-2622-4053-8824-2f2d47c25a04]
[2020-01-26 19:30:48,876: INFO/MainProcess] Received task: tasks.get_number[fa35004e-51b8-4f48-ad84-e02640ed9621]
[2020-01-26 19:30:48,880: INFO/MainProcess] Received task: tasks.get_number[5ecb73c8-9ba5-4ebb-aaae-78e14908405a]
[2020-01-26 19:30:50,915: INFO/ForkPoolWorker-2] Task tasks.get_number[5f2f8167-c667-4e83-b591-453e57b3a958] succeeded in 2.057117607037071s: 1
[2020-01-26 19:30:50,915: INFO/ForkPoolWorker-3] Task tasks.get_number[a333586b-c396-4b91-917f-3fbdfa6d30e8] succeeded in 2.057576177001465s: 2
[2020-01-26 19:30:50,920: INFO/ForkPoolWorker-1] Task tasks.get_number[88fa7cc0-2622-4053-8824-2f2d47c25a04] succeeded in 2.0558868430089206s: 2
[2020-01-26 19:30:50,933: INFO/ForkPoolWorker-4] Task tasks.get_number[fa35004e-51b8-4f48-ad84-e02640ed9621] succeeded in 2.055466585967224s: 1
[2020-01-26 19:30:52,972: INFO/ForkPoolWorker-2] Task tasks.get_number[5ecb73c8-9ba5-4ebb-aaae-78e14908405a] succeeded in 2.0525691980146803s: 10
[2020-01-26 19:30:52,973: INFO/MainProcess] Received task: tasks.sum_numbers[605e4af4-229c-42a3-9be0-ce5daff79a0f]
[2020-01-26 19:30:52,975: INFO/ForkPoolWorker-2] Task tasks.sum_numbers[605e4af4-229c-42a3-9be0-ce5daff79a0f] succeeded in 0.0006177640170790255s: 16

Les tâches sont toutes prises en compte en une seule fois par notre worker (rien à voir avec la notion de pool d’exécution, il s’agit ici du prefetch, en gros le nombre de tâches qu’un worker peut se réserver).

En revanche seules 4 tâches sont effectuées en même temps, il faudra attendre 2 secondes supplémentaires pour que la 5ème le soit également. Le workflow complet aura donc pris 4 secondes.

Testons maintenant avec le pool d’exécution gevent :

$ celery -A tasks worker --loglevel=INFO -P gevent -c 100
...
[2020-01-26 19:35:12,151: INFO/MainProcess] Received task: tasks.get_number[04192b64-b71e-4e1c-8eb8-f86487c6b013]
[2020-01-26 19:35:12,163: INFO/MainProcess] Received task: tasks.get_number[12d227b5-f0b1-4fd8-8b44-25e8d8fbc028]
[2020-01-26 19:35:12,173: INFO/MainProcess] Received task: tasks.get_number[db7838f5-3543-414c-9816-7df62b13b65d]
[2020-01-26 19:35:12,182: INFO/MainProcess] Received task: tasks.get_number[52f163d4-7d75-493a-a3ac-65122691b078]
[2020-01-26 19:35:12,193: INFO/MainProcess] Received task: tasks.get_number[14537419-8710-4b32-bb13-6c884ff22dca]
[2020-01-26 19:35:14,202: INFO/MainProcess] Task tasks.get_number[04192b64-b71e-4e1c-8eb8-f86487c6b013] succeeded in 2.048632029967848s: 6
[2020-01-26 19:35:14,207: INFO/MainProcess] Task tasks.get_number[12d227b5-f0b1-4fd8-8b44-25e8d8fbc028] succeeded in 2.042411718983203s: 2
[2020-01-26 19:35:14,208: INFO/MainProcess] Task tasks.get_number[14537419-8710-4b32-bb13-6c884ff22dca] succeeded in 2.0140742670046166s: 10
[2020-01-26 19:35:14,209: INFO/MainProcess] Task tasks.get_number[db7838f5-3543-414c-9816-7df62b13b65d] succeeded in 2.03493124502711s: 6
[2020-01-26 19:35:14,225: INFO/MainProcess] Task tasks.get_number[52f163d4-7d75-493a-a3ac-65122691b078] succeeded in 2.0403319080360234s: 9
[2020-01-26 19:35:14,225: INFO/MainProcess] Received task: tasks.sum_numbers[043e87a0-4963-468a-9f17-7ffe93127fcd]
[2020-01-26 19:35:14,227: INFO/MainProcess] Task tasks.sum_numbers[043e87a0-4963-468a-9f17-7ffe93127fcd] succeeded in 0.0006761000258848071s: 33

Ici les 5 tâches sont bien effectuées en même temps, le workflow aura donc mis 2 secondes. Et encore je n’ai lancé que 5 tâches en parallèle, n’hésitez pas à augmenter ce nombre afin de constater par vous-même les résultats. On comprend bien la puissance de Gevent face à des tâches dédiées à de l’IO Bound.

Conclusion

Les pool d’exécution ne sont pas beaucoup mis en avant dans la documentation Celery, hors comme vous le voyez il s’agit d’une configuration très simple à mettre en oeuvre afin de bénéficier de meilleurs performances.

N’hésitez surtout pas à utiliser les queues pour effectuer le routing de vos tâches IO bound et CPU bound. Les tâches gourmandes en CPU pourront être redirigées vers une queue cpu et les tâches gourmandes en IO bound vers une queue io.

Il suffira alors de lancer 2 workers avec les bons arguments :

$ celery -A tasks worker -P gevent -c 100 -Q io
$ celery -A tasks worker -P prefork -c 4 -Q cpu

Vous trouverez plus d’info sur les queues ici.

En espérant que cet article vous ait plus, n’hésitez pas à me contacter sur Twitter si vous avez des questions ou des remarques :)

27 Oct 2019, 23:44

Retour d'experience sur le lancement d'un projet opensource

Ceux qui me suivent sur Twitter savent que j’ai créé il y a quelque temps une plateforme dédiée à l’alerting de vulnérabilités : Saucs.com.

Déjà 3 ans (le premier commit date du 03/11/2016) que le projet vit et que de nombreux utilisateurs s’en servent pour effectuer leur veille au quotidien. Malheureusement la vie personnelle et professionnelle fait qu’il est très difficile de maintenir seul ce genre de projet, c’est pourquoi il a été décidé en 2019 d’opensourcer Saucs.

Cet article explique les difficultés d’un tel projet.

Pourquoi opensourcer Saucs

Pour info la plateforme s’appelait à l’origine SecAdvisory, mais nous avons décidé de la renommer lorsque Laurent a pris part au projet en tant que sysadmin. Il a depuis fait un boulot de fou, et si le site tient si bien la charge c’est vraiment grâce à lui et à l’architecture qu’il a su mettre en place.

A la base Saucs est né d’un besoin personnel : je souhaitais être tenu au courant des vulnérabilités d’un produit mais aucune plateforme ne répondait à mes attentes. L’idée a donc été de parser les CVE mises à disposition par le NVD, d’en extraire les produits associés (grâce aux CPE) puis de proposer un système d’alertes basé dessus.

En soit le principe est extrèmement simple, mais de nombreux utilisateurs ont été séduits par Saucs, peut-être dû au fait que le NVD ne fournit pas cette fonctionnalité sur son propre site. Les chiffres actuels sont plutôt sympas : au moment de l’écriture de cet article nous comptons exactement 3,351 utilisateurs et 1,184,274 rapports envoyés.

Malheureusement nous ne sommes pas assez et nous n’avons pas le temps de tout gérer. Alors que nous avons des tonnes d’idées en tête, leur concrétisation est devenu très compliquée par simple manque de temps. Les fonctionnalités ne sortent plus à un rythme convenable, et c’est pourquoi nous avons donc décidé début 2019 de libérer le code source et de transformer Saucs en solution opensource.

A priori l’idée est excellente : un code maintenu par la communauté, plus de fonctionnalités, d’autres outils basés dessus, la possibilité d’installation on-premise, etc.

Mais c’est bien plus facile à dire qu’à faire, et je vais vous expliquer dans cet article pourquoi. En outre les utilisateurs réclamant à tout bout de champs où j’en suis comprendront peut-être un peu mieux pourquoi cela met autant de temps :)

Les difficultés rencontrées

La principale difficulté que j’ai rencontré a été de me replonger dans mon propre code source ! Cela faisait plusieurs mois que je n’y avais pas touché (voire plus d’un an), et la reprise a été difficile.

Il a ensuite fallu décider ce que je devais garder et ce que je devais modifier. La liste des choses à changer était finalement assez courte : le thème (indispensable car acheté en ligne et sous licence), l’upgrade des dépendances, un peu de refacto par-ci par-là, et c’était à priori tout. Ce qui pouvait me paraître simple m’a finalement pris plusieurs soirées et week-ends sur mon temps libre (pour rappel ce projet n’est en aucun cas lié à mon job).

Ce travail de refacto a malheureusement fait apparaître un nouveau problème : le legacy accumulé en 3 ans. En effet l’écosystème entourant les CVE avait sacrément évolué, et les flux proposés par le NVD aussi. Les fichiers XML sur lesquels Saucs se basait sont désormais dépréciés par un flux JSON. Encore une fois, ce qui peut paraître simple (“il suffit de changer le parser, voyons”) ne l’est pas forcément quand ça n’a pas été prévu (dépendances fortes dans le code).

Anecdote amusante : j’ai entamé la migration du XML vers le JSON il y a quelques mois. A ce moment-là le NVD fournissait la version 1.0 de ce flux, et je me servais d’une clé nommée affects pour parser les vendeurs et leurs produits affectés. Cette clé était très pratique puisqu’elle fournissait directement la liste dans un format simple. Malheureusement j’ai eu la mauvaise surprise en revenant de vacances de constater que mon code levait une exception… la clé avait été supprimée dans la nouvelle version 1.1 de ce flux (changelog):

“In CVE_JSON_4.0_min.schema, the affects element has been removed from the required properties.”

Encore du travail non prévu. Je pourrai également citer l’exemple des CVSS : Saucs était à l’origine basé sur la version 2 alors que la version 3 est désormais proposée. Un changement dans le model Python (et donc dans la database directement) a dû être opéré, impliquant des migrations dans le schema actuel.

Le refacto du code m’a également permis de détecter ce que je pourrais appeler aujourd’hui des erreurs de conception. Erreurs qui me paraissaient normales et non gênantes il y a 3 ans. C’est là qu’on remarque que notre façon de coder évolue dans le temps :)

Pour finir il est également important de noter que le code ne pouvait pas être opensourcer tel quel car l’installation était pour ainsi dire chaotique. En effet l’import initial des CVE, CPE et CWE prenait plusieurs heures, il a donc fallut travailler à l’amélioration de cette partie. Pour info c’est désormais faisable en quelques minutes grâce à la commande saucs import.

Les erreurs à éviter

Je dirai que la principale erreur que j’ai commise a été d’annoncer l’opensource de Saucs avant même d’avoir commencé à travailler dessus.

Je ne savais pas la charge de travail que cela allait représenter, et j’ai créé une attente parmi certains utilisateurs. Il faut savoir que je reçois régulièrement des messages sur Twitter de personnes, et parfois d’entreprises, qui me demandent où j’en suis. La plupart de ces messages sont bienveillants. Les utilisateurs souhaitent juste l’utiliser chez eux en local, ou alors certaines entreprises me contactent pour affiner leur propre roadmap. Néanmoins je reçois parfois des messages pouvant se résumer à : “bah alors t’en es où de l’opensource ?? Ca traine !”. Ceux-là me font sourire, et dans ces cas-là je leur rappelle tout simplement que je ne leur dois rien.

Dans tous les cas je vous conseille de bien à réfléchir à votre roadmap avant d’annoncer quoi que ce soit, l’attente créée peut être énorme et on vous demandera tôt ou tard des comptes.

Ma deuxième erreur a été de vouloir trop en faire. Je me suis lancé dans un refacto complet du code, ce qui a impliqué beaucoup de changements que je n’avais pas prévu. J’ai finalement décidé de stopper ces évolutions avant l’opensource et de les proposer après. Qui sait, peut-être que ce sera la communauté elle-même qui les créera. J’en serai ravi !

And next ?

L’ajout de la version 3 des CVSS a rajouté un bug lors de l’envoie des mails, j’ai découvert ça ce week-end et je dois le fixer. Une fois cela fait nous allons écrire les scripts pour migrer les données de la base actuelle vers le nouveau schema. Seulement à ce moment là, lorsque la nouvelle version sera disponible en ligne sur Saucs.com, le code source pourra être libéré sur Github.

J’ai hâte que ce moment arrive, d’une part car je sais que l’attente a été longue et que certains d’entres vous sont impatients, et d’autre part car des développeurs m’ont déjà proposé leur aide pour de futures fonctionnalités.

Nous avons vraiment envie que ce projet serve au plus grand nombre, et cela ne peut passer que par l’opensource. C’est la dernière ligne droite, ça devient bon !

10 Dec 2018, 10:00

Introduction à la base de données orientée graphe Neo4j

Je travaille depuis un an et demi sur un projet chez OVH qui calcule la qualité de service de notre infrastructure.

Cet outil, nommé DEPC, se compose d’une API développée en Flask ainsi que d’une WebUI. Le fonctionnement interne repose sur 3 composants majeurs :

  • Apache Airflow pour le scheduling,
  • les base de données TimeSeries pour le calcul de la QOS,
  • et enfin la base de données Neo4j pour la gestion des dépendances.

Je ne vais pas présenter DEPC dans cet article, nous allons bientôt l’open sourcer et j’en parlerai à ce moment-là. En revanche laissez-moi vous présenter une base données que j’utilise désormais au quotidien et que j’affectionne tout particulièrement : Neo4j.

Base de données orientée graphe

En tant que développeur nous sommes habitués à travailler avec des bases de données relationnelles, que ce soit PostgreSQL, MySQL ou encore Oracle pour n’en citer que quelques unes (la liste est longue).

Pour rappel l’élément principal d’une base de données relationnelle est la table : les données qui y sont stockées sont organisées sous forme de tableau où chaque ligne correspond à un enregistrement et chaque colonne à une catégorie de même type.

Ce format est parfait pour la plupart de nos usages, notamment grâce à leur propriété ACID. Néanmoins les limites sont vite atteintes dans certains cas, notamment lorsqu’il s’agit de réaliser plusieurs jointures en une seule requête.

Les bases de données orientées graphe ont donc été créées pour pallier entre autres à ce problème. Et on peut dire qu’elles connaissent un succès grandissant depuis quelques années (source):

Et ça tombe bien puisque Neo4j est la base de données orientées graphe la plus répandue !

Intérêts des graphes

Un graphe représente un ensemble de points reliés entre eux par des arcs (source de l’image):

L’intérêt des graphes devient évident lorsque l’on souhaite visualiser les intéractions entre différentes données.

Prenons le cas d’un projet Python nécessitant de nombreuses librairies. Certaines de ces librairies nécessitent elles-même d’autres librairies, et ainsi de suite. Il est très difficile avec pip de retrouver qui dépend de qui.

Le projet Scrapy par exemple requiert une vingtaine de librairies :

$ pip install scrapy
$ pip freeze | wc -l
25

Vous le savez la sortie de pip freeze nous renvoie la liste des modules à plat, sans aucune arborescence. Il est alors très difficile de visualiser quelle librairie dépend d’une autre.

Nous pouvons utiliser l’outil pipdeptree afin de générer un arbre de dépendances de nos modules :

$ pip install pipdeptree graphviz
$ pipdeptree --graph-output png > scrapy.png

Nous obtenons le graphe suivant, bien plus lisible que le résultat de pip :

Comme vous le voyez les graphes permettent très rapidement de comprendre la structure de la base de données et les relations entre les différents noeuds. Le graphe précédent nous permet facilement de distinguer quelles sont les dépendances directes et indirectes de Scrapy.

Ce graphe ne contient que quelques dizaines de noeuds, nous pouvons donc facilement le parcourir de tête. Mais imaginez un graphe contenant plusieurs centaines de milliers de noeuds liés entre eux par des relations.

Et bien Neo4j répond à cette problématique en nous fournissant les outils nécessaires à la construction et au parcours optimisé de nos graphes !

Les concepts

Les données stockées dans Neo4j vont donc être organisées sous la forme d’un graphe, les noeuds étant reliés entre eux par des relations. Des propriétés (clé:valeur) peuvent enrichir les noeuds et les relations afin d’y ajouter du contexte :

Nous pouvons retrouver 2 noeuds dans ce graphe reliés entre eux par une relation de type WORKS AT. Des propriétés ont été ajoutées afin de compléter les informations propres à chaque noeud.

Des noeuds de même type sont regroupés au sein de labels :

Nous avons rajouté un noeud de label Job, lié au noeud Person via une relation de type HAS. Sur ce graphe nous pourrions très facilement répondre à la requête “Donnes moi l’emploi actuel de tous les développeurs Python Français”.

Et c’est tout ! Il existe évidemment d’autres notions internes à Neo4j, telles que les index ou les contraintes, mais ces 4 notions de base (noeuds, relations, propriétés, labels) suffisent à débuter l’utilisation concrète de Neo4j !

Installation

Neo4j s’installe très simplement dès lors que Java 8 est disponible sur votre OS. Je suis pour ma part sur Debian, nous devons donc ajouter le repo officiel à nos sources :

$ wget -O - https://debian.neo4j.org/neotechnology.gpg.key | sudo apt-key add -
$ echo 'deb https://debian.neo4j.org/repo stable/' | sudo tee -a /etc/apt/sources.list.d/neo4j.list
$ sudo apt-get update

Puis nous installons ensuite Neo4j comme tous les autres packages (la dernière version stable à l’écriture de cet article est la 3.5.0) :

$ sudo apt-get install neo4j=1:3.5.0

Utilisation

La première chose à faire est d’ouvrir le Neo4j Browser : il s’agit d’une application très pratique qui va vous permettre de créer vos données et de les visualiser à travers une interface web.

Le browser est accessible en local sur le port 7474 : http://localhost:7474.

Vous devez tout d’abord vous connecter grâce aux identifiants neo4j / neo4j (pas d’inquiétude le browser vous demandera tout de suite après de modifier votre mot de passe).

Voici ce que vous devriez obtenir une fois connecté :

Comme vous le voyez le browser peut se découper en 3 parties :

  1. C’est ici que nous retrouverons des informations sur notre instance Neo4j, comme la version ou la taille, ainsi que la liste des labels, des types de relations et des propriétés stockées dans notre base de données.
  2. Ce formulaire va vous permettre de saisir vos requêtes afin d’interroger Neo4j.
  3. Et c’est dans cette vue que les résultats de vos précédentes requêtes s’afficheront.

Nous allons créer les 4 noeuds que j’ai donné en exemple dans la partie “Les concepts”. Pour cela nous allons utilisons un langage de requête créé pour Neo4j : le Cypher, qui est un peu le SQL des graphes :)

Par exemple l’insertion et la sélection d’une données en SQL se ferait grâce à la requête suivante :

CREATE TABLE person (name varchar, from varchar);
INSERT INTO person VALUES ('Nicolas', 'France');
SELECT * FROM person;

Voici l’équivalent en Cypher :

CREATE(nico:Person{name: "Nicolas", from: "France"}) RETURN nico;

Vous pouvez remarquer plusieurs choses :

  • la création d’un noeud se fait grâce au mot-clé CREATE,
  • le noeud (que nous avons nommé nico) est entouré de parenthèses,
  • les propriétés s’ajoutent entre les accolades,
  • nous retournons le noeud nouvellement créé.

Notez également que nous n’avons pas eu besoin de créer le label Person. En effet Neo4j s’est chargé de le faire lui-même au moment de la création du noeud (pour rappel Neo4j fait partie des bases de données NoSQL).

Nous pouvons d’ailleurs le voir dans le Browser Neo4j sous la partie “Node Labels” :

Nous allons maintenant créer le noeud de type Job que nous lierons avec le noeud Person grâce à la requête suivante :

MATCH(nico:Person{name: "Nicolas", from: "France"})
CREATE(nico)-[:HAS]->(job:Job{profile: "Dev", lang: "Python"})
RETURN nico, job;

La forme change un peu ici :

  1. tout d’abord je sélectionne le noeud nico,
  2. puis, en utilisant une seule requête CREATE, je créé en base le noeud job et j’ajoute la relation HAS entre les deux,
  3. je retourne pour finir mes 2 noeuds.

Les propriétés peuvent être visibles dans le Browser en survolant un noeud :

Pour finir créons en une seule requête les noeuds de type Company :

MATCH(nico:Person{name: "Nicolas", from: "France"})
MERGE(nico)-[:`WORKS AT`{since: 2015}]->(ovh:Company{name: "OVH"})
MERGE(nico)-[:`WORKS AT`{until: 2015, since: 2013}]->(anssi:Company{name: "Anssi"})
RETURN nico, ovh, anssi;

Note : Les plus attentifs auront remarqué le mot-clé MERGE, que nous pourrions renommer en GET or CREATE. Il agit ici comme le CREATE que nous avons vu plus tôt, mais cette commande n’aurait rien ajouté de plus si les noeuds Company et leurs relations existaient déjà. Je vous la montre car elle peut être très pratique lorsque l’on souhaite réaliser des requêtes idempotentes.

Il est désormais temps d’afficher notre graphe, et le browser va nous être très pratique car nous allons le faire sans écrire une seule requête :

  1. Sélectionnez le label Person dans la liste des labels à gauche de l’écran.
  2. Vous devriez voir apparaître le noeud “Nicolas”, cliquez dessus.
  3. Le noeud s’entoure alors de différent bouton, cliquez sur celui du bas afin de déplier les dépendances directes.

Nous avons bien affiché le graphe complet sans taper une ligne de Cypher :

Pour info la requête équivalente aurait été la suivante :

MATCH(n:Person)-[]->(m) RETURN n, m

L’astuce consiste à sélectionner les noeuds de type Person, puis à retourner l’ensemble de ses dépendances sans rien filtrer. Attention néanmoins à cette requête si votre graphe contient trop de données, votre navigateur risque de ne pas apprécier :)

Pour finir cette partie souvenez-vous de la question posée plus haut : “Donnes moi l’emploi actuel de tous les développeurs Python Français”. Et bien la réponse serait apportée par la requête suivante :

MATCH(p:Person{from: 'France'})-[:HAS]->(:Job{profile: 'Dev', lang: 'Python'})
MATCH(p)-[r:`WORKS AT`]->(c:Company)
WHERE NOT EXISTS(r.until)
RETURN p.name, c.name

Conclusion

Nous n’avons fait que survoler Neo4j et le Cypher. Si vous souhaitez en apprendre plus sur ce langage, je vous invite à vous tourner vers la documentation officielle, vous y apprendrez notamment à filtrer vos résultats, à les ordonner ou encore à utiliser des algorithmes optimisés de parcours de graphe.

Comme je le disais en introduction de cet article j’utilise désormais Neo4j au quotidien dans le cadre de DEPC. Nous gérons plusieurs millions de noeuds et autant de relations, et je dois dire que je ne suis pas du tout déçu de ce choix technique.

Evidemment le choix d’une base de données orientée graphe répondra généralement à une problématique précise. Bien souvent une base de données relationnelle répondra à vos besoins, mais si vos données peuvent s’architecturer sous la forme d’un graphe alors n’hésitez pas et sautez le pas, vous ne serez pas déçu.

02 Nov 2018, 09:25

Gestion de Tâches avec Apache Airflow

Apache Airflow est un outil open source d’orchestration de workflows : si vous êtes habitués à gérer des tâches cron au quotidien, alors cet article devrez vous plaire.

Airflow vs. Cron

Pour l’histoire Airflow a été créé en 2014 par Maxime Beauchemin lorsqu’il travaillait chez Airbnb. En 2016 le projet a rejoint le programme d’incubation de la fondation Apache, c’est pourquoi nous parlons désormais d’Apache Airflow.

Alors qu’est-ce qui le différencie des tâches cron ? Après tout nous y sommes habitués, et il faut avouer qu’il est très simple de lancer de manière régulière un script, une commande ou un programme.

Mais il y a fort à parier que votre tâche rencontrera tôt ou tard un problème :

  • un timeout sur une requête SQL,
  • un changement de clé dans le Json d’une API que vous interrogez,
  • un site web injoignable au moment d’un GET,
  • etc.

A la limite si notre script est très simple nous pouvons gérer ces cas dans le code lui-même, il ne faudra en oublier aucun, mais ça reste faisable.

En revanche les choses se compliquent lorsque nous devons gérer plusieurs tâches dépendantes les unes des autres. Imaginez qu’une tâche B ait besoin de données créées par une tâche A, elle doit donc s’exécuter après. En cron nous laisserions un temps raisonnable entre le lancement des 2 tâches pour que celles-ci s’exécutent correctement. Sauf que ce laps de temps est fixé, si la tâche A plante pour une raison ou une autre, la tâche B s’exécutera tout de même avec potentiellement des données corrompues.

C’est là qu’Airflow entre en jeu : cet outil va justement nous permettre de gérer les dépendances entre nos tâches, on parle alors de workflow ou encore de pipeline. Nous déclarons nos tâches à Airflow, et ce dernier s’occupe de les lancer dans le bon ordre, de gérer les exceptions s’il y en a, et de relancer les tâches le cas échéant.

En bonus Airflow est livré avec une Webui extrêmement pratique qui va nous permettre de suivre l’avancement de nos tâches et de les monitorer :

Les Concepts

L’utilisation d’Airflow se fait via du code Python. Mais avant cela voici quelques concepts qu’il vous faudra connaître afin de mettre les mains dans le cambouis.

DAG & Tasks

Airflow est principalement basé sur le concept de DAG, pour Directed Acyclic Graph. Un DAG n’est ni plus ni moins qu’un graphe orienté sans retour possible. Nos tâches s’exécuteront donc dans un ordre précis, en parallèle ou à la suite, et ce sans risque de boucle infinie.

Un DAG est constitué de tasks liées les unes aux autres. Vous avez déjà certainement entendu parlé de l’acronyme ETL pour Extract Transform Load, qui consiste à récupérer des données d’une source, à les transformer dans un nouveau format, puis à les renvoyer dans une seconde source adaptée à ce nouveau format.

Et bien Airflow s’adapte parfaitement bien à ce modèle. Voici un exemple de DAG contenant 3 tâches :

Un DAG est lancé de manière régulière ou via un trigger. Un DAG lancé ou en cours d’exécution est appelé une DagInstance, ses tâches étant donc des TaskInstances.

Operators

Les tâches d’un DAG ne sont pas toutes forcément du même type : nous aurons parfois besoin de lancer un script Bash, d’autres fois notre logique métier sera contenue dans une fonction Python, et parfois nous aurons simplement besoin d’envoyer un mail.

La création d’une tâche passe par des operators. A chaque besoin son operator :

  • BashOperator : exécute une commande bash,
  • PythonOperator : exécute une fonction Python,
  • EmailOperator : envoie un mail,
  • DockerOperator : exécute une commande dans un container Docker,
  • HttpOperator : effectue une requête sur un endpoint HTTP,
  • etc…

La liste des operators intégrés à Airflow est longue, et vous pouvez bien sûr créer les vôtres si besoin.

Pour l’aspect technique, un operator est tout simplement une classe Python héritant de BaseOperator. Lorsque la tâche est appelée, la fonction execute() de l’operator est exécutée. Vous l’aurez compris, un operator instancié devient donc une task.

Executors

Lorsqu’une tâche est appelée, elle va s’exécuter quelque part : ce quelque part est géré par Airflow via les executors.

Par défaut Airflow utilise le SequentialExecutor : les tâches sont lancées les unes après les autres sur le serveur local (là où Airflow lui-même est lancé) sans aucun parallélisme. Cet executor est pratique lorsqu’on développe, mais les limites sont vites atteintes lorsque l’on passe à l’échelle.

On pourra dans ce cas utiliser le LocalExecutor, plus performant que le précédent car basé sur un modèle prefork : plusieurs tâches pourront donc être lancées en parallèle jusqu’à ce que les ressources de la machine locale soient utilisées. Ici le scaling est vertical (plus de ressources signifie plus de tasks lancées en même temps).

Et le meilleur pour la fin : le CeleryExecutor. C’est via cet executor que le scaling des tâches pourra réellement se faire puisqu’Airflow ira déléguer à Celery la distribution des tâches sur un pool de workers. Le scaling sera donc horizontal : plus nous aurons de workers plus nous pourrons lancer de tâches en parallèle. Notons tout de même que Celery nécessite l’utilisation d’un broker pour l’échange des messages, ainsi qu’une synchronisation du code sur l’ensemble des workers (facilement réalisable avec l’utilisation de Redis & Kubernetes par exemple).

Création du DAG

Bon assez parlé, je me doute que vous avez envie de tester Airflow de ce pas. L’installation est commune à tous les packages Python :

$ pip install apache-airflow

Airflow va chercher les Dags dans le répertoire ~/airflow, mais nous pouvons changer ce comportement via la variable d’environnement AIRFLOW_HOME :

$ export AIRFLOW_HOME=~/Dev/python/demo_airflow

Une base de données est nécessaire pour faire fonctionner Airflow, nous la créons avec la CLI fournie lors de l’installation :

$ airflow initdb
$ ls
airflow.cfg   airflow.db    logs          unittests.cfg

Plusieurs fichiers ont été créés, les plus importans étant les deux suivants :

  • airflow.db : il s’agit de la base de données utilisées par Airflow pour gérer nos Dags et leurs tasks. Ce fichier est au format SQLite3, on pourra le garder tout au long de nos développements mais je vous conseille fortement de passer sur un autre type (PostgreSQL ou MySQL par exemple) lors du passage en production.
  • airflow.cfg : ce fichier contient la configuration d’Airflow, c’est ici par exemple que vous configurerez l’accès à la base de données, l’executor à utiliser, les paramètres de parallélisation, et j’en passe.

Créons maintenant notre DAG qui contiendra plus tard nos différentes tasks. La première étape consiste à instancier un object DAG et à lui fournir certains paramètres :

from datetime import datetime, timedelta

from airflow import DAG

default_args = {
    'owner': 'ncrocfer',
    'start_date': datetime(2018, 10, 31),
    'retries': 5,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('demo', default_args=default_args, schedule_interval='0 * * * *')

Les paramètres que nous déclarons dans la variable default_args sont assez parlants :

  • owner : l’utilisateur du DAG.
  • start_date : Airflow lancera le DAG à partir de cette date.
  • retries : le nombre d’essai qui sera effectué si une tâche tombe en erreur. Une fois cette valeur atteinte, le DAG tombera en erreur et sera stoppé.
  • retry_delay : le temps d’attente entre deux essais.

Vous avez dû remarquer un pattern qui vous parle si vous utilisez les tâches cron : 0 * * * *. C’est en effet ici que l’on définit le cycle d’exécution du DAG, dans notre cas notre DAG demo sera exécuté chaque heure. La syntaxe est la même qu’avec les crontab, et certains raccourcis sont disponibles.

Vous devriez déjà voir votre DAG apparaître dans la liste :

$ airflow list_dags
[2018-11-01 16:35:35,220] {__init__.py:51} INFO - Using executor SequentialExecutor
[2018-11-01 16:35:35,564] {models.py:258} INFO - Filling up the DagBag from /Users/ncrocfer/Dev/python/demo_airflow/dags


-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
demo

Note : il se peut que d’autres DAG apparaissent dans votre liste. C’est en fait Airflow qui inclut des exemples de DAG. Vous pouvez les supprimer en passant à False la variable load_examples du fichier airflow.cfg, puis en remettant à zéro la base de données avec la commande airflow resetdb.

Création des Tâches

Nous devons maintenant créer nos tâches. Je vais prendre comme exemple le site Saucs.com, pour lequel je gère la partie développement. Pour rappel ce site permet aux utilisateurs de s’abonner à des produits et de recevoir un rapport par mail dès qu’une vulnérabilité apparaît sur le site du NVD.

L’ordre des tâches est donc le suivant :

  1. CheckUpdates : On vérifie sur le site du NVD si une mise à jour des CVE a eu lieu,
  2. DownloadXml : On télécharge les nouvelles CVE,
  3. ParseXml : On parse les nouvelles CVE et on met à jour notre base de données,
  4. SendMails : On envoie un mail aux utilisateurs abonnés aux produits impactés.

Comme expliqué plus haut, la création de tâches se fait via l’utilisation des operators. Nous allons pour cet exemple utiliser plusieurs PythonOperator auxquels nous passons des fonctions Python :

Voici le code final de notre DAG :

import gzip
import logging
import re
from datetime import datetime, timedelta
from io import BytesIO
from xml.etree import ElementTree as ET

import requests
from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator, PythonOperator


default_args = {
    'owner': 'ncrocfer',
    'start_date': datetime(2018, 10, 31),
    'retries': 5,
    'retry_delay': timedelta(minutes=5)
}


logger = logging.getLogger(__name__)


NVD_MODIFIED_META_URL = 'https://static.nvd.nist.gov/feeds/xml/cve/2.0/nvdcve-2.0-Modified.meta'
NVD_MODIFIED_URL = 'https://static.nvd.nist.gov/feeds/xml/cve/nvdcve-2.0-Modified.xml.gz'
LAST_NVD_HASH = '/tmp/lastnvd'


def check_updates():
    logger.info("Downloading {url}...".format(url=NVD_MODIFIED_META_URL))

    resp = requests.get(NVD_MODIFIED_META_URL)
    buf = BytesIO(resp.content).read().decode('utf-8')

    nvd_sha256 = buf.split('sha256')[1][1:-2]
    logger.info("New NVD hash is : {hash}".format(hash=nvd_sha256))

    try:
        with open(LAST_NVD_HASH) as f:
            last_nvd256 = f.read()
    except FileNotFoundError:
        last_nvd256 = None
    logger.info("Local hash is : {hash}".format(hash=last_nvd256))

    if nvd_sha256 != last_nvd256:
        logger.info("Hashes differ, Saucs database needs to be updated...")
        return {'updated': True, 'hash': nvd_sha256}
    else:
        logger.info("Hashes are the same, nothing to do.")
        return {'updated': False}


def download_xml(ds, **context):
    update = context['task_instance'].xcom_pull(task_ids='CheckUpdates')
    if not update['updated']:
        return

    logger.info("Downloading {url}...".format(url=NVD_MODIFIED_URL))

    resp = requests.get(NVD_MODIFIED_URL)
    buf = BytesIO(resp.content)
    data = gzip.GzipFile(fileobj=buf)

    xml_string = data.read().decode('utf-8')
    xml_string = re.sub(' xmlns="[^"]+"', '', xml_string)

    with open('/tmp/{0}.xml'.format(ds), 'w') as f:
        f.write(xml_string)


def parse_xml(ds, **context):
    update = context['task_instance'].xcom_pull(task_ids='CheckUpdates')
    if not update['updated']:
        return

    parser = ET.XMLParser(encoding="utf-8")
    tree = ET.parse('/tmp/{0}.xml'.format(ds), parser=parser)
    root = tree.getroot()

    for child in root:
        logger.info("Parsing {cve}...".format(cve=child.attrib.get('id')))

        # 1. Process the CVE (new CVE, CPE updated, references updated, CVSS updated...)
        # 2. Create an alert for all impacted users


def send_mails(ds, **context):
    update = context['task_instance'].xcom_pull(task_ids='CheckUpdates')
    if not update['updated']:
        return

    logger.info('Sending mail for users with an alert')
    """users = get_users_with_alerts()

    for user in users:
        send_user_mail()
        user.new_alerts = False"""

    # We're done, we can set the new NVD hash in local
    with open(LAST_NVD_HASH, 'w') as f:
        f.write(update['hash'])


dag = DAG('demo', default_args=default_args, schedule_interval='0 * * * *')

with dag:
    check_updates_op = PythonOperator(
        task_id='CheckUpdates',
        python_callable=check_updates
    )

    download_xml_op = PythonOperator(
        task_id='DownloadXml',
        provide_context=True,
        python_callable=download_xml
    )

    parse_xml_op = PythonOperator(
        task_id='ParseXml',
        provide_context=True,
        python_callable=parse_xml
    )

    send_mails_op = PythonOperator(
        task_id='SendMails',
        provide_context=True,
        python_callable=send_mails
    )


# Order the tasks
check_updates_op >> download_xml_op >> parse_xml_op >> send_mails_op

"""
# Can be written :
check_updates_op.set_downstream(download_xml_op)
download_xml_op.set_downstream(parse_xml_op)
parse_xml_op.set_downstream(send_mails_op)
"""

La logique des 4 fonctions n’est pas compliquée et ne concerne pas directement Airflow, je ne m’étalerai pas dessus. En revanche certaines parties du code sont nouvelles :

  1. On constate l’ouverture d’un context manager (with dag) juste après la déclaration du DAG : cela signifie que tous les opérateurs déclarés à l’intérieur de celui-ci seront liés à notre DAG.
  2. Certains opérateurs possèdent le paramètre provide_context : dans ce cas les fonctions à lancer (renseignées via le paramètre python_callable) recevront les paramètres ds et context. Le premier est un string représentant la date d’éxecution de la tâche, le second est un dictionnaire contenant des informations relatives à l’instance de tâche.
  3. Nous utilisons les xcom : il s’agit d’une fonctionnalité proposée par Airflow afin de communiquer des informations d’une tâche à une autre.
  4. la dernière ligne peut paraître obscure : il s’agit en fait tout simplement de la façon dont nous déclarons l’ordre de lancement de nos tâches. Nous utilisons ici les opérateurs bits à bits (Bitwise operators) car je trouve que la forme est assez parlante, mais Airflow propose également les méthodes set_upstream() et set_downstream().

Nous pouvons tester que tout fonctionne correctement grâce à la commande list_tasks :

$ airflow list_tasks demo --tree
2018-11-01 18:45:03,572] {__init__.py:51} INFO - Using executor SequentialExecutor
[2018-11-01 18:45:03,785] {models.py:258} INFO - Filling up the DagBag from /Users/ncrocfer/Dev/python/demo_airflow/dags
<Task(PythonOperator): SendMails>
    <Task(PythonOperator): ParseXml>
        <Task(PythonOperator): DownloadXml>
            <Task(PythonOperator): CheckUpdates>

Exécution du DAG

Tout semble en ordre, nous pouvons lancer le serveur web :

$ airflow webserver

Par défaut l’interface est disponible en localhost sur le port 8080. Nous pouvons y voir notre DAG :

L’exécution du DAG nécessite deux actions :

  • tout d’abord il faut l’activer (le bouton On/Off),
  • puis vous devez lancer le scheduler d’Airflow.

C’est ce dernier qui va gérer pour nous l’ordonnancement des tâches, leur retry si besoin, et c’est également lui qui vérifiera en permanence si un nouveau DagRun ne doit pas être lancé :

$ airflow scheduler

Cliquez maintenant sur le DAG demo, puis sur l’onglet Graph View. Le DAG a bien été lancé :

Airflow s’occupe désormais de tout et va lancer le DAG chaque heure, et ce depuis le start_date (le scheduler effectue ce qu’on appelle le backfill, ce comportement étant modifiable grâce à la configuration catchup_by_default ou au paramètre catchup du DAG lui-même) :

Pour finir il est important de noter que le monitoring de nos tâches est très simple puisqu’Airflow fournit une interface pour visualiser les logs (et donc potentiellement les exceptions lancées).

Vous pouvez voir les logs d’une tâche en cliquant sur la tâche puis sur le bouton View Log :

Conclusion

Je m’arrête ici pour cette présentation d’Apache Airflow. Il y a beaucoup de chose à dire dessus, nous n’avons fait qu’effleurer le sujet mais j’espère néanmoins vous avoir donner envie de le tester.

L’exemple que j’ai donné était très simple : j’ai volontairement épuré le Dag de Saucs afin de vous montrer un exemple concret d’usage d’Airflow sans polluer le code par de la logique externe au tool. De même vous aurez peut-être remarqué que chaque tâche est exécutée même si aucune mise à jour n’a été effectuée par le NVD. La solution dans ce cas aurait été d’utiliser l’opérateur BranchPythonOperator afin de renvoyer vers la tâche adéquate, mais cela aurait compliqué cet article et ce n’était pas l’objectif souhaité.

Airflow est un outil que j’utilise désormais au quotidien, et je ne peux que vous conseiller de l’essayer vous-aussi. Je tenterai dans les prochains articles de couvrir d’autres fonctionnalités d’Airflow, tels que les BranchOperator, le CeleryExecutor ou encore la création dynamique de Dags.

07 Aug 2017, 01:03

Gérer vos CVE avec Saucs

Je vais vous présenter un projet infosec sur lequel je travaille depuis plusieurs mois : Saucs.com.

Cette plateforme web vous permet de manager vos alertes de sécurité en vous abonnant à différents produits et vendeurs. Dès lors qu’une nouvelle vulnérabilité est détectée, une alerte est créée et un rapport vous est envoyé par mail.

Vous pouvez également rechercher les vulnérabilités par produit ou plus globalement par vendeur :

Saucs se repose sur les bases maintenues par le MITRE et le NVD pour se synchroniser :

  • CVE (Common Vulnerability Enumeration),
  • CPE (Common Platform Enumeration),
  • CWE (Common Weakness Enumeration),
  • CVSS (Common Vulnerability Scoring System).

Ces bases contiennent des données organisées permettant de classifier les vulnérabilités relatives à des produits. Saucs utilise par exemple les CPE afin de fournir une liste de vendeurs et de produits auxquels s’abonner :

Concrètement les données fournies par le NVD sont parsées régulièrement et des tâches Celery analysent les abonnements des utilisateurs pour envoyer les notifications. Il peut s’agir d’une nouvelle CVE, mais également de la modification d’une CVE existante : nouvelle référence, changement du score CVSS, etc.

J’ai débuté ce projet seul : c’est une idée que j’avais en tête depuis pas mal de temps, et en tant que dev Python aucun problème pour créer le site et les robots. Mais ceux qui administrent leur propre site le savent, un site web ce n’est pas que du dev :)

J’ai donc rapidement été rejoint par Laurent pour prendre en charge la partie administration : il gère entre autres les serveurs web, les mails, le load balancing, le scaling automatique, l’architecture master/slave des databases, leur réplication, etc. Il fera d’ailleurs prochainement un article sur le blog de Saucs afin d’expliquer comment bootstraper rapidement une architecture HA et fault tolerant.

Côté dev le site est entièrement codé en Python, grâce au framework Flask. Les tâches asynchrones (comme le parsing des CVE, la création des reports, l’envoie de mail, la mise en cache) sont prises en charge par Celery.

Je ne rentre pas plus en détail pour le moment sur ces aspects techniques puisque des articles dédiés seront mis en ligne sur le blog officiel de Saucs. Nous aimerions décrire les changements qui interviendront sur l’architecture de la plateforme au fur et à mesure que celle-ci grandira.

En parlant de ça, nous avons lancé le site il y a 3 semaines pour tâter le terrain et savoir si ce genre d’outil pouvait être utile à d’autres personne. Une semaine après l’envoi d’un premier Tweet nous étions déjà à 500 utilisateurs! Nous avons reçus beaucoup de feedbacks, notamment des demandes sur l’ajout d’autres sources ainsi qu’un système de filtres d’alertes.

C’est donc ce que nous allons faire dans les semaines à venir, à commencer par le système de Report qui a été lancé ce week-end :

Au niveau des fonctionnalités à venir, nous avons énormément d’idées :

  • filtrer les types de notifications,
  • ajout de nouvelles sources (DSA par exemple),
  • intégration de sources externes pour les notifications (Slack, Mattermost…),
  • discussions au sein de chaque CVE,
  • API / SDK,
  • applications mobiles,
  • etc.

Bref comme vous le voyez nous avons pas mal d’idées, et nous pensons que Saucs peut avoir un réel intérêt pour les particuliers comme pour les entreprises.

N’hésitez pas à nous contacter sur Twitter (Laurent ou moi) , tout feedback est le bienvenu :)

23 Dec 2015, 23:22

Installer un module à la création d'un virtualenv

L’utilisation des environnements virtuels (avec Virtualenv et Virtualenvwrapper) est inévitable si vous développez régulièrement en Python. D’une part leur utilisation permet de laisser votre hôte propre, et d’autre part différentes versions d’un même module peuvent être testées de manière très simple.

En revanche aucun module n’est disponible par défaut dans un nouvel environnement récemment créé.

Pour cela 2 solutions existent : la première est d’importer les modules disponibles dans l’environnement global (votre host). En gros un pip freeze dans votre host et dans votre virtualenv renverra le même résultat :

$ mkvirtualenv foo --system-site-packages

L’idée est d’installer et de mettre à jour sur votre host les modules que vous utilisez le plus souvent, et d’installer dans le virtualenv ceux qui sont plus spécifiques à chaque projet.

La seconde solution, si vous ne souhaitez vraiment rien installer sur le host, et d’installer automatiquement les modules à la création du virtualenv. C’est dans le fichier ~/.virtualenvs/postmkvirtualenv que les commandes se font :

#!/bin/bash
# This hook is sourced after a new virtualenv is activated.

pip install requests
pip install httpie

La création de l’environnement est un peu plus longue mais les modules sont directement disponibles dans leur dernière version :

$ mkvirtualenv foo
New python executable in foo/bin/python
Installing setuptools, pip...done.
Downloading/unpacking requests
  Downloading requests-2.9.1-py2.py3-none-any.whl (501kB): 501kB downloaded
Installing collected packages: requests
Successfully installed requests
Cleaning up...
Downloading/unpacking httpie
  Downloading httpie-0.9.2-py2.py3-none-any.whl (66kB): 66kB downloaded
Requirement already satisfied (use --upgrade to upgrade): requests>=2.3.0 in /home/ncrocfer/.virtualenvs/foo/lib/python2.7/site-packages (from httpie)
Downloading/unpacking Pygments>=1.5 (from httpie)
  Downloading Pygments-2.0.2-py2-none-any.whl (672kB): 672kB downloaded
Installing collected packages: httpie, Pygments
Successfully installed httpie Pygments
Cleaning up...
(foo) $ pip freeze
Pygments==2.0.2
argparse==1.2.1
httpie==0.9.2
requests==2.9.1
wsgiref==0.1.2
(foo) $

08 Dec 2015, 22:58

Twitter calendar

Si vous utilisez de temps à autre Github, vous êtes déjà probablement tombé sur le profil d’un développeur avec le calendrier de ses contributions. Plus le nombre de contributions est élevé, plus la couleur de la case sera foncée.

Je suis tombé sur la librairie cal-heatmap qui permet de faire exactement la même chose avec les données que l’on souhaite. J’ai donc décidé de créer l’équivalent pour Twitter, afin de visualiser facilement le nombre de Tweets envoyés par jour pour un utilisateur donné :

Ce calendrier représente les Tweets envoyés sur mon compte Twitter. Les données datent un peu puisque j’ai poussé le code sur Github il y a déjà quelques mois (et j’ai la flemme de refaire un screenshot ^^).

Les sources sont disponibles sur Github à cette adresse.

Alors ok, ce n’est pas super utile, mais je trouve plutôt sympa d’avoir une représentation visuelle de son activité quotidienne sur Twitter. Et c’est un bon exemple de ce qu’il est possible de faire avec ce module javascript.

30 Nov 2015, 21:01

Autoreload des modules sous iPython

Si vous utilisez iPython comme interpréteur Python (et si ce n’est pas le cas vous loupez quelque chose), il vous est certainement déjà arrivé de bosser dessus dans un terminal, dans un autre de modifier l’un des modules importés, puis de relancer la fonction dans iPython. Sauf que ce dernier n’a pas rechargé le code et que le résultat est le même qu’avant.

Prenez par exemple le code suivant :

def calcul(a, b):
    print(a+b)

Bon ok c’est bidon, mais c’est pour montrer le principe. Dans un autre terminal, démarrez iPython puis testez-le :

In [1]: from calcul import calcul
In [2]: calcul(2, 3)
5

Si vous modifiez la fonction dans le second terminal, par exemple :

def calcul(a, b):
    print(a*b)

La mise à jour ne sera pas répercutée dans iPython, il faudrait le fermer puis le relancer. L’activation de l’autoreload permet d’éviter cela :

In [3]: calcul(2, 3)
5

In [4]: %load_ext autoreload

In [5]: %autoreload 2

# ré-enregistrez le module calcul.py

In [6]: calcul(2, 3)
6

Comme vous le voyez les modifications effectuées après l’activation de l’extension sont automatiquement pris en compte. Si vous souhaitez que l’extension soit activée à chaque session, il faut créer un nouveau profile :

ncrocfer@home:~/ ipython profile create
[ProfileCreate] Generating default config file: '/home/ncrocfer/.ipython/profile_default/ipython_config.py'

Puis ajoutez à la fin les lignes suivantes :

c.InteractiveShellApp.exec_lines = []
c.InteractiveShellApp.exec_lines.append('%load_ext autoreload')
c.InteractiveShellApp.exec_lines.append('%autoreload 2')

L’autoreload des modules importés s’effectuera désormais automatiquement !