Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#### 2.6.4 2025-12-19

- Tri et numérotation des modifications après la concaténation plutôt que par ressource, pour réduire le nombre de doublons ([#156](https://github.com/ColinMaudry/decp-processing/issues/156))
- Utilisation du logger de prefect plûtot que `log_prints=True`

#### 2.6.3 2025-12-16

Expand Down
7 changes: 7 additions & 0 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,13 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path:
SOLO_DATASET = os.getenv("SOLO_DATASET", "")
ALL_CONFIG["SOLO_DATASET"] = SOLO_DATASET

# Acheteurs absents de la base SIRENE (pour raisons de sécurité ou autre)
# Format: SIRET -> {"nom": "...", ...}
# Ces données sont utilisées en fallback si l'acheteur n'est pas trouvé dans SIRENE
ACHETEURS_NON_SIRENE = {
"13001536500013": {"nom": "Ministère des Armées"},
}

with open(
make_path_from_env(
"DATASETS_REFERENCE_FILEPATH", REFERENCE_DIR / "source_datasets.json"
Expand Down
3 changes: 3 additions & 0 deletions src/tasks/clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ def clean_decp(lf: pl.LazyFrame, decp_format: DecpFormat) -> pl.LazyFrame:
.name.keep()
)

# Nettoyage des espaces dans titulaire_id (ex: " 33487372600239")
lf = lf.with_columns(pl.col("titulaire_id").str.strip_chars())

# Type identifiant = SIRET si vide (marches-securises.fr)
lf = lf.with_columns(
pl.when(
Expand Down
31 changes: 30 additions & 1 deletion src/tasks/enrich.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import polars as pl
import polars.selectors as cs

from src.config import LOG_LEVEL, SIRENE_DATA_DIR
from src.config import LOG_LEVEL, SIRENE_DATA_DIR, ACHETEURS_NON_SIRENE
from src.tasks.transform import (
extract_unique_acheteurs_siret,
extract_unique_titulaires_siret,
Expand Down Expand Up @@ -139,6 +139,9 @@ def enrich_from_sirene(lf: pl.LazyFrame):

lf = lf.join(lf_sirets_acheteurs, how="left", on="acheteur_id")

# Fallback pour les acheteurs absents de SIRENE (ex: Ministère des Armées)
lf = apply_acheteurs_non_sirene_fallback(lf)

# En joignant en utilisant à la fois le SIRET et le typeIdentifiant, on s'assure qu'on ne joint pas sur
# des id de titulaires non-SIRET
lf = lf.join(
Expand All @@ -158,6 +161,32 @@ def enrich_from_sirene(lf: pl.LazyFrame):
return lf


def apply_acheteurs_non_sirene_fallback(lf: pl.LazyFrame) -> pl.LazyFrame:
"""Applique les données d'acheteurs non présents dans SIRENE en fallback.

Les acheteurs absents de SIRENE ET du fallback conservent acheteur_nom = NULL.
"""
if not ACHETEURS_NON_SIRENE:
return lf

# Créer un DataFrame de fallback à partir du dictionnaire
fallback_data = [
{"acheteur_id": siret, "acheteur_nom_fallback": data["nom"]}
for siret, data in ACHETEURS_NON_SIRENE.items()
]
lf_fallback = pl.LazyFrame(fallback_data)

# Joindre avec les données de fallback
lf = lf.join(lf_fallback, on="acheteur_id", how="left")

# Remplacer acheteur_nom NULL par la valeur de fallback (si disponible)
lf = lf.with_columns(
pl.coalesce("acheteur_nom", "acheteur_nom_fallback").alias("acheteur_nom")
).drop("acheteur_nom_fallback")

return lf


def calculate_distance(lf: pl.LazyFrame) -> pl.LazyFrame:
# Utilisation de polars_ds.haversine
# https://polars-ds-extension.readthedocs.io/en/latest/num.html#polars_ds.exprs.num.haversine
Expand Down