Description synthétique

Le générateur automatise la création d'un projet DBT (staging → intermediate → marts) à partir d’un fichier de spécification et d’une consigne en langage naturel. Le cœur du pipeline est un graphe LangGraph dont chaque nœud exécute une action sur un état partagé (state). L’étape pivot est la génération d’un plan structuré par un LLM (Groq), ensuite converti en fichiers SQL et YAML.

Stack technique

  • Python 3.12+, dbt-core
  • LangGraph (orchestration)
  • Groq (LLM) — modèle configurable via .env
  • Snowflake (ingestion des métadonnées)
  • Streamlit (interface web)

Fichiers clefs

Le projet est organisé de manière modulaire : src/core (logique), src/graph (nœuds & arêtes), formats/prompts (templates jinja2), inputs/ et outputs/.

Voir README complet

Architecture interne du projet

Le générateur s’appuie sur une architecture modulaire en couches : la CLI (main.py), les tasks pour les actions de haut niveau (install, run, clean...), le graphe LangGraph pour l’orchestration logique, et des modules core pour les fonctions métier (Snowflake, LLM, DBT).

Architecture interne du projet
Schéma de l’architecture interne (modules, graphes et dépendances)

Arborescence du projet

Structure du graphe LangGraph

L’un des aspects clés du projet est l’orchestration asynchrone des étapes via LangGraph. Chaque nœud du graphe exécute une fonction spécifique et modifie un State partagé entre tous les nœuds. Cela permet de suivre l’évolution du contexte global et d’assurer la continuité du pipeline, même entre plusieurs itérations.

Structure du graphe LangGraph
Vue simplifiée du graphe LangGraph

Chaque nœud représente une étape clé du pipeline : ingestion, planification, génération, sauvegarde, ou fin de processus. Les arêtes conditionnelles (edges) contrôlent la logique selon la présence d’un plan ou de fichiers précédemment sauvegardés.

Structure du State

                from typing import List, Dict, Any
                from typing_extensions import TypedDict


                class State(TypedDict, total=False):
                    """
                    Represents the shared state across different steps of the generation process.
                    """

                    plan: dict
                    generation_status: list = []
                    generated_files: dict
                    expected_format: dict
                    file_specs: dict
                    spec_file: str
                    ingested_marts_data: Dict[str, Any] = []
                    sources: dict
                    use_previous_plan: bool
                    path_to_saved_plan: str
                    use_previous_files: bool
                    path_to_saved_files: str
                    save_project: bool
                    project_name: str
                    profile_name: str
                    base_path: str
                    update_graph_status: str
          

Cette structure normalisée permet de centraliser toutes les données de contexte (spécification, plan, fichiers générés, etc.) et de passer proprement les informations entre nœuds. Cela facilite aussi la sauvegarde intermédiaire pour relancer le pipeline à mi-parcours si besoin.

Flux d'exécution (vue globale)

Ingestion des métadonnées — Snowflake

Le pipeline peut connecter Snowflake pour extraire la liste des tables et des colonnes (nom, type data). Ces métadonnées sont injectées dans state["expected_format"]["sources"] et utilisées pour enrichir le prompt du LLM et produire un schema.yml conforme.

Paramètres Snowflake requis

SNOWFLAKE_ACCOUNT Identifiant du compte (ex. xy12345.eu-west-1)
SNOWFLAKE_USER Utilisateur DBT
SNOWFLAKE_PASSWORD Mot de passe (masqué dans l'UI)
SNOWFLAKE_ROLE Rôle (ex. SYSADMIN)
SNOWFLAKE_WAREHOUSE Entrepôt (compute)
SNOWFLAKE_DATABASE / SCHEMA Base et schéma cibles (ex. ANALYTICS_DB.PUBLIC)

Comportement

  • La connexion s’effectue via la librairie Snowflake Connector (config dans config/variables.py).
  • Le noeud node_ingestion appelle ingest_metadata_from_snowflake_to_sources_format.
  • Les métadonnées retournées sont normalisées sous la forme attendue par DBT (liste de sources → tables → colonnes).

CLI & Tasks — Utilisation sans interface graphique

Le projet propose une interface en ligne de commande (entrée principale : main.py) qui délègue des actions « tâches » (install, init, run, clean, backup) implémentées sous src/core/tasks/. Cela permet d'exécuter l'ensemble du pipeline dans un environnement headless (serveur CI, VM, etc.).

Commandes principales

  • python main.py install — crée un environnement virtuel et installe requirements.txt.
  • python main.py init — lance la configuration interactive (.env, profiles.yml) via popups Tkinter.
  • python main.py run — exécution interactive du pipeline en console (live logs).
  • python main.py clean — supprime __pycache__ et outputs/.
  • python main.py backup <project_name> — crée un snapshot horodaté dans backups/.

Extrait : squelette d’un task (ex: task_run)

      # src/core/tasks/task_run.py (schéma)
      def run_console():
          base_path = input("Base path: ")
          project_name = input("Project name: ")
          spec_file = open(input("Path to specs: "), "rb")
          # configure logger (InMemoryLogger)
          reset_graph_status()
          result_state = generate_dbt_project(..., spec_file, ...)
          # stream logs while thread alive
          # print summary
        
Séquence CLI → Tasks → pipeline
Flux d’appel : CLI → tâche → orchestration (generate_dbt_project).

Descriptions détaillées des nœuds du graphe

Ci-dessous la liste des nœuds principaux avec leur rôle, entrées, traitement et sorties. Ces descriptions correspondent aux fichiers dans src/graph/nodes/.

node_init_project — Initialisation du projet

Rôle : préparer l’arborescence du projet DBT (dossiers models/, tests/, marts/, logs/, saved/), injecter les paramètres initiaux (project_name, base_path, profile_name) dans state et vérifier les conflits de nom.

  • Entrées : project_name, base_path, profile_name, spec_file
  • Traitement : vérifie existence du dossier, incrémente suffixe si besoin, crée répertoires.
  • Sorties : state.base_path, state.project_name, state.path_to_logs.

node_ingestion — Ingestion des spécifications & métadonnées Snowflake

Rôle : lire le fichier utilisateur (XLSX/CSV/TSV) pour produire ingested_marts_data et appeler la fonction Snowflake pour récupérer la liste des tables/colonnes et alimenter expected_format["sources"].

  • Entrées : state.spec_file (UploadedFile ou file handle), connexion Snowflake via config.
  • Traitement : parsing de l’excel → standardisation (column_name, description, sql) ; appel à ingest_metadata_from_snowflake_to_sources_format(log).
  • Sorties : state.ingested_marts_data, state.expected_format["sources"].
Flow ingestion Snowflake
Exemple d’export de métadonnées Snowflake (tables → colonnes → types).

node_generate_plan — Génération d’un plan structuré par LLM

Rôle : construire un prompt enrichi (Jinja2 template) contenant expected_format et ingested_marts_data, appeler le LLM (via get_llm()) et parser la réponse JSON en state.plan.

  • Entrées : state.expected_format, state.ingested_marts_data
  • Traitement : rendu du prompt (PLAN_PROMPT_PATH) → appel LLM → extract_plan() → validation → stockage.
  • Sorties : state.plan (structures : models[], tests[], dependencies[]).

Note : le module gère les erreurs LLM (timeout, rate-limit, valeur non-json) et incrémente un compteur de retry.

[insérer exemple : plan produit complet — voir outputs/<project>/saved/saved_plan.json]

node_generate_source_yaml — Génération de schema.yml pour les sources

Rôle : à partir de expected_format["sources"] et state.plan["models"], produire un YAML compatible DBT (version 2) décrivant sources + models.

  • Entrées : state.expected_format["sources"], state.plan
  • Traitement : construction des blocs sources et models, heuristiques (tests not_null si nom de colonne se termine par "id").
  • Sorties : state.generated_files["sources"][...] et fichier écrit sous models/staging/schema.yml.

node_prepare_file_specs — Préparation des spécifications de fichiers

Rôle : transformer le state.plan en spécifications complètes pour chaque fichier attendu (models, tests). Chaque spec contient fields requis (name, description, path, columns, dependencies, filetype).

# Extrait d'un spec (simplifié)
"models": {
  "models/staging/stg_expeditions": {
    "name": "stg_expeditions",
    "path": "models/staging/",
    "columns": [...],
    "dependencies": "himalayan_expeditions_seeds.EXPED",
    "filetype": "models"
  }, ...
}
    

node_generate_file — Génération de code via LLM (templates)

Rôle : pour chaque spec, construire un contexte (Jinja2) et appeler le LLM pour produire le contenu du fichier (SQL pour models/tests). Les prompts sont stockés sous formats/prompts/ (e.g., prompt_file_model.txt).

  • Entrées : state.file_specs[filetype]
  • Traitement : rendu du prompt → LLM.invoke(prompt) → extraire response.content (cleanup code fences).
  • Sorties : state.generated_files[filetype][path] = content.

node_create_file — Écriture des fichiers sur disque

Rôle : écrire physiquement les fichiers DBT à partir de generated_files, en nettoyant le contenu et en respectant l’architecture DBT.

  • Entrées : state.generated_files, state.file_specs, state.base_path
  • Traitement : extraire bloc de code, composer chemin complet, créer répertoires, écrire via write_code_to_file().
  • Sorties : fichiers écrits et logs d’écriture.

node_save_outputs — Persist & save

Rôle : si l’utilisateur a demandé la sauvegarde, écrire saved/saved_plan.json et saved/saved_files.json dans le dossier du projet pour permettre réutilisation / débogage ultérieur.

  • Entrées : state.plan, state.generated_files, state.base_path
  • Sortie : fichiers JSON sous <base_path>/saved/.

node_end_process & node_end_process_bad_plan_generation — Fin du pipeline

Le nœud final nettoie l’état (serialisation sûre via safe_serialize), construit un résumé lisible (state.result_summary) et s’assure que l’état peut être dumpé (JSON). Un nœud spécifique gère le cas d’un échec récurrent de génération de plan (bad_plan_generation).

  • Comportement : compile un résumé, transforme les objets non-serialisables en chaîne, log final.

Logging & Monitoring

Le projet implémente un logger en mémoire (affiché dans Streamlit) et des handlers vers des fichiers: debug.log, warnings.log, errors.log. Le module InMemoryLogger encapsule ces comportements (formatters, filters, redirection de stderr).

Comportement clé

  • Logs affichés en temps réel dans Streamlit via la fonction display_graph_and_logs_placeholders().
  • Buffer en mémoire (StringIO) pour téléchargement / affichage final.
  • Handlers : INFO→info.log, DEBUG→debug.log, WARNING→warnings.log (exact), ERROR→errors.log (exact).
Architecture logging
Schéma : flux des logs (Worker thread → InMemoryLogger → Streamlit UI et fichiers).

Conseil pratique

En environnement de production, envisager la redirection des fichiers de logs vers un stockage centralisé (ELK, CloudWatch) et augmenter la rotation des fichiers.

Backups & outputs

Le dossier outputs/ contient les projets générés ; la commande backup copie un snapshot horodaté dans backups/. Les sauvegardes incluent les logs et le dossier saved/ (saved_plan.json, saved_files.json).

Exemple d’arborescence sauvegardée

outputs/
└─ my_project/
   ├─ models/
   ├─ tests/
   ├─ logs/
   │   ├─ debug.log
   │   └─ generation.log
   └─ saved/
       ├─ saved_plan.json
       └─ saved_files.json
  

Pour restaurer, il suffit de copier le dossier (backups/my_project_<timestamp>) vers outputs/ ou d’extraire saved_plan.json & saved_files.json pour relancer le pipeline en mode « reuse previous plan/files ».

Sécurité & gestion des identifiants

Les secrets (GROQ_API_KEY, SNOWFLAKE_PASSWORD, etc.) sont stockés dans .env. L’outil interactif (init) propose une saisie via fenêtres masquées (Tkinter). Ne commitez jamais .env dans Git.

Bonnes pratiques

  • Utiliser des comptes de service dédiés avec des rôles limités (minimum nécessaire).
  • Ne donner que les droits nécessaires au profil DBT (lectures métadonnées / selects si besoin).
  • Limiter la rotation d’API keys et mettre en place un stockage secret (Vault, GitHub Secrets) en production.

Détails Snowflake

Les paramètres à renseigner sont : SNOWFLAKE_ACCOUNT, SNOWFLAKE_USER, SNOWFLAKE_PASSWORD, SNOWFLAKE_ROLE, SNOWFLAKE_WAREHOUSE, SNOWFLAKE_DATABASE, SNOWFLAKE_SCHEMA. Le rôle doit avoir le droit d’exécuter SHOW TABLES et SHOW COLUMNS dans le schéma ciblé.

Extensibilité & bonnes pratiques

Le code est conçu pour être modulaire. Voici des axes d’amélioration faciles à implémenter :

  • Multi-marts : supporter plusieurs marts dans le même plan (boucler sur plusieurs dossiers / domaines).
  • Fichiers dépendants : intégrer la génération d’assets SQL partagés et la résolution automatique des dépendances.
  • Cache LLM : stocker les requêtes/réponses LLM pour éviter des coûts et accélérer les essais.
  • Tests unitaires : ajouter validations sur plan (schéma JSON) et fixtures pour les prompts.

[insérer suggestions graphiques / diagrammes d’extension]

Exemple (extrait) de plan produit

Ci-dessous un extrait abrégé du JSON produit par le LLM et stocké dans state["plan"] :

{
  "models": [
    {
      "name": "stg_expeditions",
      "description": "A staging model for the EXPED table",
      "model_type": "stage",
      "domain": "himalayan_expeditions",
      "path": "models/staging/stg_expeditions.sql",
      "columns": [
        {"name": "expid", "data_type": "TEXT"},
        {"name": "year", "data_type": "FIXED"},
        ...
      ],
      "tests": [
        {"name": "not_null_expid", "description": "expid should not be null"}
      ]
    },
    {
      "name": "int_expeditions_with_peaks",
      "description": "Join of expeditions + peaks",
      "model_type": "intermediate",
      "dependencies": ["stg_expeditions", "stg_peaks"],
      ...
    }
  ],
  "tests": [
    {"name": "test_stg_expeditions_not_null", "model": "stg_expeditions", ...}
  ]
}
        

[insérer exemple complet : plan produit] — le plan réel sauvegardé est plus verbeux (sources, tables, colonnes, chemins).

Interface Streamlit

L’interface graphique permet de lancer la génération DBT sans passer par la CLI. Elle affiche les logs en temps réel, une animation d’attente montrant les étapes en cours de la génération du projet. Une fois généré dans le répertoire outputs/, on peut télécharger les logs, ou l'état pour du débuggage.

Interface Streamlit du projet
Interface Streamlit — génération et suivi en direct

Compétences & enseignements

Ce projet m’a permis d’approfondir la conception d’architectures de données intelligentes et la génération de code automatisée. J’ai exploré en profondeur :

  • L’orchestration de workflows complexes avec LangGraph.
  • L’intégration et la configuration avancée de Snowflake.
  • La génération dynamique de fichiers dbt (modèles, tests, schema.yml).
  • La communication et le prompt engineering pour un LLM Groq.
  • La création d’une interface Streamlit réactive avec logs en temps réel.

Au-delà de la technique, ce projet démontre comment l’IA peut accélérer le développement de pipelines de données robustes et documentés.