Le générateur automatise la création d'un projet DBT (staging →
intermediate → marts) à partir d’un fichier de spécification et
d’une consigne en langage naturel. Le cœur du pipeline est un
graphe LangGraph dont chaque nœud exécute une action sur un état
partagé (state). L’étape pivot est la génération d’un
plan structuré par un LLM (Groq), ensuite converti en
fichiers SQL et YAML.
Stack technique
Python 3.12+, dbt-core
LangGraph (orchestration)
Groq (LLM) — modèle configurable via .env
Snowflake (ingestion des métadonnées)
Streamlit (interface web)
Fichiers clefs
Le projet est organisé de manière modulaire :
src/core (logique), src/graph (nœuds &
arêtes), formats/prompts (templates jinja2),
inputs/ et outputs/.
Le générateur s’appuie sur une architecture modulaire en couches :
la CLI (main.py), les tasks pour les
actions de haut niveau (install, run, clean...), le
graphe LangGraph pour l’orchestration logique, et
des modules core pour les fonctions métier
(Snowflake, LLM, DBT).
Schéma de l’architecture interne (modules, graphes et dépendances)
Arborescence du projet
Structure du graphe LangGraph
L’un des aspects clés du projet est l’orchestration asynchrone des
étapes via LangGraph. Chaque nœud du graphe exécute
une fonction spécifique et modifie un State partagé
entre tous les nœuds. Cela permet de suivre l’évolution du contexte
global et d’assurer la continuité du pipeline, même entre plusieurs
itérations.
Vue simplifiée du graphe LangGraph
Chaque nœud représente une étape clé du pipeline : ingestion,
planification, génération, sauvegarde, ou fin de processus. Les
arêtes conditionnelles (edges) contrôlent la logique selon
la présence d’un plan ou de fichiers précédemment sauvegardés.
Structure du State
from typing import List, Dict, Any
from typing_extensions import TypedDict
class State(TypedDict, total=False):
"""
Represents the shared state across different steps of the generation process.
"""
plan: dict
generation_status: list = []
generated_files: dict
expected_format: dict
file_specs: dict
spec_file: str
ingested_marts_data: Dict[str, Any] = []
sources: dict
use_previous_plan: bool
path_to_saved_plan: str
use_previous_files: bool
path_to_saved_files: str
save_project: bool
project_name: str
profile_name: str
base_path: str
update_graph_status: str
Cette structure normalisée permet de centraliser toutes les données
de contexte (spécification, plan, fichiers générés, etc.) et de
passer proprement les informations entre nœuds. Cela facilite aussi
la sauvegarde intermédiaire pour relancer le pipeline à mi-parcours
si besoin.
Flux d'exécution (vue globale)
Ingestion des métadonnées — Snowflake
Le pipeline peut connecter Snowflake pour extraire la liste des
tables et des colonnes (nom, type data). Ces métadonnées sont
injectées dans state["expected_format"]["sources"] et
utilisées pour enrichir le prompt du LLM et produire un
schema.yml conforme.
Paramètres Snowflake requis
SNOWFLAKE_ACCOUNT
Identifiant du compte (ex. xy12345.eu-west-1)
SNOWFLAKE_USER
Utilisateur DBT
SNOWFLAKE_PASSWORD
Mot de passe (masqué dans l'UI)
SNOWFLAKE_ROLE
Rôle (ex. SYSADMIN)
SNOWFLAKE_WAREHOUSE
Entrepôt (compute)
SNOWFLAKE_DATABASE / SCHEMA
Base et schéma cibles (ex. ANALYTICS_DB.PUBLIC)
Comportement
La connexion s’effectue via la librairie Snowflake Connector
(config dans config/variables.py).
Le noeud node_ingestion appelle
ingest_metadata_from_snowflake_to_sources_format.
Les métadonnées retournées sont normalisées sous la forme
attendue par DBT (liste de sources → tables → colonnes).
CLI & Tasks — Utilisation sans interface graphique
Le projet propose une interface en ligne de commande (entrée
principale : main.py) qui délègue des actions « tâches
» (install, init, run, clean, backup) implémentées sous
src/core/tasks/. Cela permet d'exécuter l'ensemble du
pipeline dans un environnement headless (serveur CI, VM, etc.).
Commandes principales
python main.py install — crée un environnement
virtuel et installe requirements.txt.
python main.py init — lance la configuration
interactive (.env, profiles.yml) via popups Tkinter.
python main.py run — exécution interactive du
pipeline en console (live logs).
python main.py clean — supprime
__pycache__ et outputs/.
python main.py backup <project_name> — crée un
snapshot horodaté dans backups/.
Ci-dessous la liste des nœuds principaux avec leur rôle, entrées,
traitement et sorties. Ces descriptions correspondent aux fichiers
dans src/graph/nodes/.
node_init_project — Initialisation du projet
Rôle : préparer l’arborescence du projet DBT (dossiers models/,
tests/, marts/, logs/, saved/), injecter les paramètres initiaux
(project_name, base_path, profile_name) dans state et
vérifier les conflits de nom.
node_ingestion — Ingestion des spécifications & métadonnées
Snowflake
Rôle : lire le fichier utilisateur (XLSX/CSV/TSV) pour produire
ingested_marts_data et appeler la fonction Snowflake
pour récupérer la liste des tables/colonnes et alimenter
expected_format["sources"].
Entrées :
state.spec_file (UploadedFile ou file handle),
connexion Snowflake via config.
Traitement : parsing de l’excel →
standardisation (column_name, description, sql) ; appel à
ingest_metadata_from_snowflake_to_sources_format(log).
Exemple d’export de métadonnées Snowflake (tables → colonnes →
types).
node_generate_plan — Génération d’un plan structuré par LLM
Rôle : construire un prompt enrichi (Jinja2 template) contenant
expected_format et ingested_marts_data,
appeler le LLM (via get_llm()) et parser la réponse
JSON en state.plan.
Traitement : construction des blocs
sources et models, heuristiques (tests
not_null si nom de colonne se termine par "id").
Sorties :
state.generated_files["sources"][...] et fichier
écrit sous models/staging/schema.yml.
node_prepare_file_specs — Préparation des spécifications de
fichiers
Rôle : transformer le state.plan en spécifications
complètes pour chaque fichier attendu (models, tests). Chaque spec
contient fields requis (name, description, path, columns,
dependencies, filetype).
node_generate_file — Génération de code via LLM (templates)
Rôle : pour chaque spec, construire un contexte (Jinja2) et
appeler le LLM pour produire le contenu du fichier (SQL pour
models/tests). Les prompts sont stockés sous
formats/prompts/ (e.g.,
prompt_file_model.txt).
Traitement : extraire bloc de code, composer
chemin complet, créer répertoires, écrire via
write_code_to_file().
Sorties : fichiers écrits et logs d’écriture.
node_save_outputs — Persist & save
Rôle : si l’utilisateur a demandé la sauvegarde, écrire
saved/saved_plan.json et
saved/saved_files.json
dans le dossier du projet pour permettre réutilisation / débogage
ultérieur.
node_end_process & node_end_process_bad_plan_generation — Fin
du pipeline
Le nœud final nettoie l’état (serialisation sûre via
safe_serialize), construit un résumé lisible
(state.result_summary) et s’assure que l’état peut
être dumpé (JSON). Un nœud spécifique gère le cas d’un échec
récurrent de génération de plan (bad_plan_generation).
Comportement : compile un résumé, transforme
les objets non-serialisables en chaîne, log final.
Logging & Monitoring
Le projet implémente un logger en mémoire (affiché dans Streamlit)
et des handlers vers des fichiers: debug.log,
warnings.log, errors.log. Le module
InMemoryLogger encapsule ces comportements (formatters,
filters, redirection de stderr).
Comportement clé
Logs affichés en temps réel dans Streamlit via la fonction
display_graph_and_logs_placeholders().
Buffer en mémoire (StringIO) pour téléchargement / affichage
final.
Schéma : flux des logs (Worker thread → InMemoryLogger → Streamlit
UI et fichiers).
Conseil pratique
En environnement de production, envisager la redirection des
fichiers de logs vers un stockage centralisé (ELK, CloudWatch) et
augmenter la rotation des fichiers.
Backups & outputs
Le dossier outputs/ contient les projets générés ; la
commande backup copie un snapshot horodaté dans
backups/. Les sauvegardes incluent les logs et le
dossier saved/ (saved_plan.json, saved_files.json).
Pour restaurer, il suffit de copier le dossier
(backups/my_project_<timestamp>) vers
outputs/ ou d’extraire
saved_plan.json & saved_files.json
pour relancer le pipeline en mode « reuse previous plan/files ».
Sécurité & gestion des identifiants
Les secrets (GROQ_API_KEY, SNOWFLAKE_PASSWORD, etc.) sont stockés
dans .env. L’outil interactif (init)
propose une saisie via fenêtres masquées (Tkinter). Ne commitez
jamais .env dans Git.
Bonnes pratiques
Utiliser des comptes de service dédiés avec des rôles limités
(minimum nécessaire).
Ne donner que les droits nécessaires au profil DBT (lectures
métadonnées / selects si besoin).
Limiter la rotation d’API keys et mettre en place un stockage
secret (Vault, GitHub Secrets) en production.
Détails Snowflake
Les paramètres à renseigner sont : SNOWFLAKE_ACCOUNT,
SNOWFLAKE_USER, SNOWFLAKE_PASSWORD,
SNOWFLAKE_ROLE, SNOWFLAKE_WAREHOUSE,
SNOWFLAKE_DATABASE, SNOWFLAKE_SCHEMA. Le
rôle doit avoir le droit d’exécuter SHOW TABLES et
SHOW COLUMNS dans le schéma ciblé.
Extensibilité & bonnes pratiques
Le code est conçu pour être modulaire. Voici des axes d’amélioration
faciles à implémenter :
Multi-marts : supporter plusieurs marts dans le
même plan (boucler sur plusieurs dossiers / domaines).
Fichiers dépendants : intégrer la génération
d’assets SQL partagés et la résolution automatique des
dépendances.
Cache LLM : stocker les requêtes/réponses LLM
pour éviter des coûts et accélérer les essais.
Tests unitaires : ajouter validations sur
plan (schéma JSON) et fixtures pour les prompts.
Ci-dessous un extrait abrégé du JSON produit par le LLM et stocké
dans state["plan"] :
{
"models": [
{
"name": "stg_expeditions",
"description": "A staging model for the EXPED table",
"model_type": "stage",
"domain": "himalayan_expeditions",
"path": "models/staging/stg_expeditions.sql",
"columns": [
{"name": "expid", "data_type": "TEXT"},
{"name": "year", "data_type": "FIXED"},
...
],
"tests": [
{"name": "not_null_expid", "description": "expid should not be null"}
]
},
{
"name": "int_expeditions_with_peaks",
"description": "Join of expeditions + peaks",
"model_type": "intermediate",
"dependencies": ["stg_expeditions", "stg_peaks"],
...
}
],
"tests": [
{"name": "test_stg_expeditions_not_null", "model": "stg_expeditions", ...}
]
}
[insérer exemple complet : plan produit] — le plan réel sauvegardé
est plus verbeux (sources, tables, colonnes, chemins).
Visuels & documentation
Schéma du graphe LangGraph – orchestration du pipeline IA
Architecture globale : modules, nœuds et connexions du système
Exemple de fichier de spécification Excel utilisé pour
l’ingestion
L’interface graphique permet de lancer la génération DBT sans passer
par la CLI. Elle affiche les logs en temps réel, une animation
d’attente montrant les étapes en cours de la génération du projet.
Une fois généré dans le répertoire outputs/, on peut
télécharger les logs, ou l'état pour du débuggage.
Interface Streamlit — génération et suivi en direct
Compétences & enseignements
Ce projet m’a permis d’approfondir la conception d’architectures de
données intelligentes et la génération de code automatisée. J’ai
exploré en profondeur :
L’orchestration de workflows complexes avec
LangGraph.
L’intégration et la configuration avancée de
Snowflake.
La génération dynamique de fichiers dbt (modèles,
tests, schema.yml).
La communication et le prompt engineering pour un
LLM Groq.
La création d’une interface Streamlit réactive
avec logs en temps réel.
Au-delà de la technique, ce projet démontre comment l’IA peut
accélérer le développement de pipelines de données robustes et
documentés.