Skip to content

API Reference

Auto-generated from the bmw_sales package docstrings.

Data

bmw_sales.data.loader

Dataset loading with schema enforcement.

The loader is the single supported entrypoint for reading the raw BMW dataset. It validates structure on the way in so that downstream code can assume a clean, well-typed frame and fail fast (with a clear message) otherwise.

SchemaValidationError

Bases: ValueError

Raised when the loaded dataset does not match the expected schema.

Source code in src/bmw_sales/data/loader.py
18
19
class SchemaValidationError(ValueError):
    """Raised when the loaded dataset does not match the expected schema.

    Subclasses :class:`ValueError`, so callers that already catch
    ``ValueError`` also catch schema failures.
    """

load_raw(path=None, *, validate=True, apply_dtypes=True)

Load the raw BMW sales dataset.

Parameters:

Name Type Description Default
path Optional[Path]

Override the default raw dataset location (useful for tests/fixtures).

None
validate bool

If True (default), enforce the expected column schema.

True
apply_dtypes bool

If True (default), coerce columns to canonical, memory-efficient dtypes.

True

Returns:

Type Description
DataFrame

The validated dataset.

Raises:

Type Description
FileNotFoundError

If the dataset file does not exist.

SchemaValidationError

If validation is enabled and the schema does not match.

Source code in src/bmw_sales/data/loader.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def load_raw(
    path: Optional[Path] = None, *, validate: bool = True, apply_dtypes: bool = True
) -> pd.DataFrame:
    """Read the raw BMW sales CSV into a DataFrame.

    Parameters
    ----------
    path:
        Location of the CSV file. When omitted, ``RAW_DATASET_PATH`` is used.
    validate:
        When true (default), the frame is passed through ``_validate_columns``.
    apply_dtypes:
        When true (default), the frame is replaced by the result of
        ``_apply_dtypes``.

    Returns
    -------
    pandas.DataFrame
        The loaded frame, validated and/or retyped according to the flags.

    Raises
    ------
    FileNotFoundError
        If nothing exists at the resolved path.
    SchemaValidationError
        If validation is enabled and the schema does not match.
    """
    source = RAW_DATASET_PATH if path is None else Path(path)
    if not source.exists():
        message = (
            f"Raw dataset not found at '{source}'. "
            "Place 'BMW_sales_data_(2010-2024).csv' under data/raw/."
        )
        raise FileNotFoundError(message)

    frame = pd.read_csv(source)
    if validate:
        _validate_columns(frame)
    return _apply_dtypes(frame) if apply_dtypes else frame

bmw_sales.data.validation

Data-integrity checks and the markdown report.

Looks at three things: structural integrity (shape, nulls, duplicates), whether the features carry any signal about the targets (Pearson correlation, one-way ANOVA, mutual information), and whether Sales_Classification is just a threshold on Sales_Volume (target leakage).

Run with python -m bmw_sales.data.validation to regenerate the report.

IntegrityFinding dataclass

A single, human-readable finding with its supporting statistic.

Source code in src/bmw_sales/data/validation.py
25
26
27
28
29
30
31
@dataclass
class IntegrityFinding:
    """A single, human-readable finding with its supporting statistic.

    One instance corresponds to one row of the report's findings table
    (check / verdict / evidence).
    """

    # Short name of the check, e.g. "Structural integrity".
    title: str
    # Outcome label, e.g. "PASS", "NO SIGNAL", "SIGNAL PRESENT", "LEAKAGE".
    verdict: str
    # Sentence carrying the statistic(s) that back the verdict.
    detail: str

DataIntegrityReport dataclass

Structured result of the data-integrity analysis.

Source code in src/bmw_sales/data/validation.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@dataclass
class DataIntegrityReport:
    """Structured result of the data-integrity analysis.

    Holds the raw numbers produced by the analysis together with the
    plain-language findings derived from them.
    """

    # Number of rows in the analysed frame.
    n_rows: int
    # Number of columns in the analysed frame.
    n_cols: int
    # Total count of null cells across the whole frame.
    n_nulls: int
    # Number of rows flagged as duplicates.
    n_duplicates: int
    # Correlation matrix of the numeric columns (rendered as Pearson in the report).
    numeric_corr: pd.DataFrame
    # Column name -> ANOVA p-value.
    anova_pvalues: dict[str, float]
    # Feature name -> mutual-information score.
    mutual_information: dict[str, float]
    # True when the classification label is a deterministic threshold on volume.
    leakage_detected: bool
    # Threshold reported by the leakage check; optional, so it may be None.
    leakage_threshold: Optional[int]
    # Plain-language findings; defaults to a fresh empty list per instance.
    findings: list[IntegrityFinding] = field(default_factory=list)

analyse(df=None)

Run the full integrity analysis and return a structured report.

Source code in src/bmw_sales/data/validation.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def analyse(df: Optional[pd.DataFrame] = None) -> DataIntegrityReport:
    """Run the full integrity analysis and return a structured report.

    Parameters
    ----------
    df:
        Frame to analyse. When ``None`` the dataset is obtained from
        ``load_raw()``.

    Returns
    -------
    DataIntegrityReport
        Structural counts, the computed statistics and one
        :class:`IntegrityFinding` per check. Both the verdict and the evidence
        sentence of every finding are derived from the measured values, so the
        text can never contradict the verdict.
    """
    if df is None:
        df = load_raw()

    corr = _numeric_correlations(df)
    anova = _anova_against_volume(df)
    mi = _mutual_information(df)
    leakage, threshold = _detect_leakage(df)

    # Structural counts are computed once and shared by the finding and the report.
    n_rows, n_cols = len(df), df.shape[1]
    n_nulls = int(df.isna().sum().sum())
    n_duplicates = int(df.duplicated().sum())

    # --- Derive plain-language verdicts -----------------------------------
    findings: list[IntegrityFinding] = []

    findings.append(
        IntegrityFinding(
            title="Structural integrity",
            # The verdict follows the counts instead of being hard-coded.
            verdict="PASS" if n_nulls == 0 and n_duplicates == 0 else "FAIL",
            detail=(
                f"{n_rows:,} rows × {n_cols} columns, "
                f"{n_nulls} nulls, "
                f"{n_duplicates} duplicate rows."
            ),
        )
    )

    # Mask the diagonal, then ignore NaNs explicitly: with fewer than two usable
    # numeric columns there is no off-diagonal value, which is treated as 0.0
    # rather than letting NaN fall through to a "SIGNAL PRESENT" verdict.
    off_diag = corr.where(~np.eye(len(corr), dtype=bool)).to_numpy(dtype=float)
    abs_corr = np.abs(off_diag[~np.isnan(off_diag)])
    max_abs_corr = float(abs_corr.max()) if abs_corr.size else 0.0
    corr_quiet = max_abs_corr < 0.05
    findings.append(
        IntegrityFinding(
            title="Numeric signal (correlation)",
            verdict="NO SIGNAL" if corr_quiet else "SIGNAL PRESENT",
            detail=(
                f"Largest absolute off-diagonal Pearson correlation = "
                f"{max_abs_corr:.4f}. "
                + (
                    "Features are effectively mutually independent."
                    if corr_quiet
                    else "At least one pair of numeric columns exceeds the 0.05 threshold."
                )
            ),
        )
    )

    # NaN p-values compare False and are therefore counted as not significant.
    significant = [c for c, p in anova.items() if p < 0.05]
    pvalue_list = ", ".join(f"{c} p={anova[c]:.2f}" for c in anova)
    findings.append(
        IntegrityFinding(
            title="Categorical signal (ANOVA on Sales_Volume)",
            verdict="NO SIGNAL" if not significant else "SIGNAL PRESENT",
            detail=(
                "No categorical shows a significant effect on sales volume "
                f"(all p>0.05): {pvalue_list}."
                if not significant
                else "Significant effect on sales volume (p<0.05) for "
                f"{', '.join(significant)}. All p-values: {pvalue_list}."
            ),
        )
    )

    max_mi = max(mi.values()) if mi else 0.0
    mi_quiet = max_mi < 0.01
    findings.append(
        IntegrityFinding(
            title="Non-linear signal (mutual information)",
            verdict="NO SIGNAL" if mi_quiet else "SIGNAL PRESENT",
            detail=(
                f"Maximum MI across all features = {max_mi:.4f} nats "
                + (
                    "(≈0 ⇒ no exploitable non-linear dependence either)."
                    if mi_quiet
                    else "(at or above the 0.01 threshold ⇒ some dependence is present)."
                )
            ),
        )
    )

    findings.append(
        IntegrityFinding(
            title="Target leakage",
            verdict="LEAKAGE" if leakage else "OK",
            detail=(
                f"Sales_Classification == 'High' ⟺ Sales_Volume ≥ {threshold}. "
                "The classes do not overlap → the label is a deterministic "
                "threshold and MUST be excluded as a feature."
                if leakage
                else "No deterministic threshold relationship detected."
            ),
        )
    )

    return DataIntegrityReport(
        n_rows=n_rows,
        n_cols=n_cols,
        n_nulls=n_nulls,
        n_duplicates=n_duplicates,
        numeric_corr=corr,
        anova_pvalues=anova,
        mutual_information=mi,
        leakage_detected=leakage,
        leakage_threshold=threshold,
        findings=findings,
    )

to_markdown(report)

Render a DataIntegrityReport as a portfolio-ready Markdown document.

Source code in src/bmw_sales/data/validation.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
def to_markdown(report: DataIntegrityReport) -> str:
    """Render a :class:`DataIntegrityReport` as a portfolio-ready markdown doc.

    The document consists of a header, an executive summary, the findings
    table, the correlation matrix, the mutual-information table (highest score
    first) and a closing interpretation section.
    """
    correlation_table = report.numeric_corr.round(4).to_markdown()
    finding_rows = "\n".join(
        [f"| {item.title} | **{item.verdict}** | {item.detail} |" for item in report.findings]
    )
    ranked_mi = sorted(report.mutual_information.items(), key=lambda pair: -pair[1])
    mi_rows = "\n".join([f"| {feature} | {score:.4f} |" for feature, score in ranked_mi])

    # Every section is written flush-left: the interpolated tables span several
    # lines, so indentation-stripping helpers cannot be applied afterwards.
    header = (
        "# Data Integrity Report - BMW Sales (2010–2024)\n\n"
        f"*Generated: {date.today().isoformat()} · Author: Maxime GOURGUECHON*\n\n"
        "> Auto-generated by `bmw_sales.data.validation`. Reproduce with `make eda`.\n"
        "> Source: [BMW Sales Dataset on Kaggle]"
        "(https://www.kaggle.com/datasets/eshummalik/bmw-sales-dataset) by eshummalik.\n\n"
    )
    summary = (
        "## Executive summary\n\n"
        f"The dataset is **structurally pristine** ({report.n_rows:,} rows, zero nulls, "
        "zero duplicates) but **contains no exploitable predictive signal**: every "
        "feature is statistically independent of the sales targets. In addition, "
        "`Sales_Classification` is a **leaked** deterministic threshold on "
        "`Sales_Volume`. These findings are reported transparently and drive the "
        "project's modelling strategy (honest baselines + a labelled Scenario "
        "Simulator). See **ADR-0002** for the resulting decision.\n\n"
    )
    findings_section = (
        "## Findings\n\n"
        f"| Check | Verdict | Evidence |\n|---|---|---|\n{finding_rows}\n\n"
    )
    correlation_section = (
        f"## Numeric correlation matrix (Pearson)\n\n{correlation_table}\n\n"
    )
    mi_section = (
        "## Mutual information vs `Sales_Volume` (nats)\n\n"
        f"| Feature | MI |\n|---|---|\n{mi_rows}\n\n"
    )
    consequences = (
        "## Interpretation & consequences\n\n"
        "- A supervised model predicting `Sales_Volume` from these features is "
        "expected to achieve **R² ≈ 0** on held-out data. This is a property of "
        "the data-generating process, not a modelling failure.\n"
        "- Classification using `Sales_Volume` as an input is **trivially perfect "
        "via leakage**; excluding it yields **ROC-AUC ≈ 0.5**.\n"
        "- We therefore (a) report these honest baselines, (b) prove the leakage, "
        "and (c) deliver business value through the econometrics-grounded "
        "**Scenario Simulator** rather than a spurious predictive model.\n"
    )
    return "".join(
        (header, summary, findings_section, correlation_section, mi_section, consequences)
    )

main()

CLI entrypoint: run analysis and persist the markdown report.

Source code in src/bmw_sales/data/validation.py
252
253
254
255
256
257
258
259
260
def main() -> None:
    """CLI entrypoint: analyse the dataset, write the report, echo the verdicts.

    The markdown file is written to ``REPORTS_DIR / "data_integrity_report.md"``
    (the directory is created if needed); one summary line per finding is then
    printed to stdout.
    """
    analysis = analyse()

    REPORTS_DIR.mkdir(parents=True, exist_ok=True)
    destination = REPORTS_DIR / "data_integrity_report.md"
    destination.write_text(to_markdown(analysis), encoding="utf-8")

    print(f"Data Integrity Report written to {destination}")
    for finding in analysis.findings:
        # Verdict is right-aligned in a 14-character column.
        print(f"  [{finding.verdict:>14}] {finding.title}")

Signal audit

bmw_sales.audit.signal_tests

Statistical tests for whether a dataset has any learnable signal.

Three checks, usable on any frame:

  • permutation (label-shuffle) test: compare the real held-out score to the distribution of scores under shuffled labels; a high p-value means no signal;
  • Kolmogorov-Smirnov test of each numeric feature against a uniform fit;
  • chi-squared test of independence between categoricals.

PermutationResult dataclass

Outcome of a label-permutation signal test.

Source code in src/bmw_sales/audit/signal_tests.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@dataclass
class PermutationResult:
    """Summary of one label-permutation signal test.

    Stores the score obtained with the true labels, summary statistics of the
    scores obtained with shuffled labels, and the resulting p-value.
    """

    task: str
    metric: str
    observed: float
    null_mean: float
    null_std: float
    p_value: float
    n_permutations: int
    null_scores: list[float] = field(default_factory=list)

    @property
    def has_signal(self) -> bool:
        """Whether the p-value is strictly below the 5% significance level."""
        significance_level = 0.05
        return self.p_value < significance_level

has_signal property

Signal is present only if the real score beats the null at 5%.

UniformityResult dataclass

KS test of one numeric feature against a fitted Uniform distribution.

Source code in src/bmw_sales/audit/signal_tests.py
112
113
114
115
116
117
118
119
120
121
122
123
@dataclass
class UniformityResult:
    """Result of a KS test of one numeric feature against a Uniform fit."""

    feature: str
    ks_statistic: float
    p_value: float

    @property
    def looks_uniform(self) -> bool:
        """Whether Uniform cannot be rejected, i.e. the p-value is above 5%."""
        significance_level = 0.05
        return self.p_value > significance_level

looks_uniform property

Cannot reject Uniform at 5% ⇒ consistent with synthetic uniform data.

permutation_test(df, task, *, n_permutations=30, sample=6000)

Run a label-permutation test for exploitable signal on task.

Parameters:

Name Type Description Default
df DataFrame

Source dataset.

required
task Task

"regression" (R²) or "classification" (ROC-AUC).

required
n_permutations int

Size of the null distribution (each is a full model fit).

30
sample int

Sub-sample size for tractable runtime.

6000
Source code in src/bmw_sales/audit/signal_tests.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def permutation_test(
    df: pd.DataFrame, task: Task, *, n_permutations: int = 30, sample: int = 6000
) -> PermutationResult:
    """Label-permutation test for exploitable signal on ``task``.

    The score obtained with the true training labels is compared with the
    scores obtained after shuffling those labels ``n_permutations`` times.

    Parameters
    ----------
    df:
        Source dataset.
    task:
        ``"regression"`` (reported as R²); any other value is reported as
        ROC-AUC.
    n_permutations:
        Number of shuffled-label fits that make up the null distribution.
    sample:
        Upper bound on the number of rows drawn from ``df`` before fitting.

    Returns
    -------
    PermutationResult
        Observed score, null mean / standard deviation, one-sided p-value and
        the individual null scores.
    """
    seed = get_settings().random_seed
    shuffler = np.random.default_rng(seed)
    subset = df.sample(min(sample, len(df)), random_state=seed)
    dataset = make_dataset(subset, task)

    metric = "ROC-AUC" if task != "regression" else "R²"
    observed = _fit_score(dataset, dataset.y_train.to_numpy())

    train_labels = dataset.y_train.to_numpy()
    null_scores: list[float] = [
        _fit_score(dataset, shuffler.permutation(train_labels))
        for _ in range(n_permutations)
    ]
    null = np.asarray(null_scores)

    # One-sided p-value P(null >= observed); the +1 terms keep it strictly positive.
    at_least_as_good = np.sum(null >= observed)
    p_value = float((at_least_as_good + 1) / (n_permutations + 1))

    return PermutationResult(
        task=task,
        metric=metric,
        observed=observed,
        null_mean=float(null.mean()),
        null_std=float(null.std()),
        p_value=p_value,
        n_permutations=n_permutations,
        null_scores=null_scores,
    )

uniformity_tests(df)

KS-test every numeric feature against Uniform(min, max).

Source code in src/bmw_sales/audit/signal_tests.py
126
127
128
129
130
131
132
133
134
135
136
def uniformity_tests(df: pd.DataFrame) -> list[UniformityResult]:
    """KS-test every numeric feature against Uniform(min, max).

    For each column in ``SCHEMA.NUMERIC`` the finite values are compared with a
    uniform distribution spanning their observed minimum and maximum.

    Columns are skipped (no result emitted) when they contain no finite value
    or when all finite values are identical, because no uniform range can be
    fitted in either case.

    Parameters
    ----------
    df:
        Frame containing every column listed in ``SCHEMA.NUMERIC``.

    Returns
    -------
    list[UniformityResult]
        One entry per testable column, in ``SCHEMA.NUMERIC`` order.

    Notes
    -----
    The bounds are estimated from the same sample that is tested, so the
    p-values should be read as approximate.
    """
    results: list[UniformityResult] = []
    for col in SCHEMA.NUMERIC:
        x = df[col].astype(float).to_numpy()
        # NaN/inf would propagate into the bounds and the test statistic.
        x = x[np.isfinite(x)]
        if x.size == 0:
            # min()/max() raise on an empty array; nothing to test here.
            continue
        lo, hi = float(x.min()), float(x.max())
        if hi <= lo:
            continue
        # scipy's "uniform" is parameterised as (loc, scale) = (lo, hi - lo).
        stat, p = stats.kstest(x, "uniform", args=(lo, hi - lo))
        results.append(UniformityResult(col, float(stat), float(p)))
    return results

chi2_independence(df, col_a, col_b)

Return the chi-squared independence p-value between two categoricals.

Source code in src/bmw_sales/audit/signal_tests.py
139
140
141
142
143
def chi2_independence(df: pd.DataFrame, col_a: str, col_b: str) -> float:
    """Chi-squared independence p-value for two categorical columns.

    A contingency table of ``col_a`` against ``col_b`` is built and tested;
    only the p-value of the test is returned.
    """
    contingency = pd.crosstab(df[col_a], df[col_b])
    outcome = stats.chi2_contingency(contingency)
    # The result unpacks as (statistic, p-value, dof, expected); keep the p-value.
    return float(outcome[1])

bmw_sales.audit.control

Positive control: run the same pipeline on a synthetic, signal-bearing target.

A null R² on the real data could mean either that the data carries no signal or that the pipeline is broken. Building a target that is a known function of the features, and checking that the pipeline recovers it (high R²), rules out the second case.

ControlResult dataclass

Held-out R² of the same pipeline on the real vs synthetic target.

Source code in src/bmw_sales/audit/control.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
@dataclass
class ControlResult:
    """Held-out R² of one pipeline on the real and on the synthetic target."""

    r2_real: float
    r2_synthetic: float
    model: str

    @property
    def pipeline_validated(self) -> bool:
        """Whether the synthetic-target R² is strictly above 0.5."""
        recovery_floor = 0.5
        return self.r2_synthetic > recovery_floor

    @property
    def verdict(self) -> str:
        """One-sentence conclusion: validated (with both scores) or inconclusive."""
        if not self.pipeline_validated:
            return "Inconclusive: synthetic signal not recovered."
        return (
            f"Pipeline VALIDATED: it recovers a known signal (R²={self.r2_synthetic:.3f}) "
            f"but scores ~0 on the real target (R²={self.r2_real:.3f}) - the null "
            f"result is a property of the data, not a modelling failure."
        )

pipeline_validated property

The pipeline is sound if it clearly learns the known synthetic signal.

make_signal_bearing_target(df, *, noise_sd=400.0)

Construct a synthetic demand target that genuinely depends on the features.

synthetic_demand = f(region, premium tier, engine, year, price, electrified) + Gaussian noise. This is not a claim about the real world - it exists only to verify the pipeline can learn a relationship that is known to exist.

Source code in src/bmw_sales/audit/control.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def make_signal_bearing_target(df: pd.DataFrame, *, noise_sd: float = 400.0) -> pd.Series:
    """Build a synthetic demand series that is a known function of the features.

    The series is a base level plus additive region, premium-tier, engine-size,
    year, price and electrification terms, plus Gaussian noise with standard
    deviation ``noise_sd``, floored at 100. It makes **no** claim about the real
    world; it only gives the pipeline a relationship that is known to exist.

    Returns
    -------
    pandas.Series
        Named ``"synthetic_demand"`` and indexed like ``df``.
    """
    rng = np.random.default_rng(get_settings().random_seed)
    features = add_engineered_features(df)

    # Regions missing from the effect table contribute nothing.
    region_term = (
        features[SCHEMA.REGION].astype(str).map(_REGION_EFFECT).fillna(0.0).to_numpy()
    )
    premium_term = 600.0 * features["is_premium_tier"].to_numpy()
    engine_term = 350.0 * features[SCHEMA.ENGINE_SIZE_L].to_numpy()
    year_term = 40.0 * (features[SCHEMA.YEAR].astype(int).to_numpy() - 2010)
    price_term = 0.02 * features[SCHEMA.PRICE_USD].astype(float).to_numpy()
    electrified_term = 500.0 * features["is_electrified"].to_numpy()
    noise = rng.normal(0.0, noise_sd, size=len(features))

    # Terms are summed strictly left to right so results stay bit-identical.
    demand = (
        5000.0
        + region_term
        + premium_term
        + engine_term
        + year_term
        - price_term
        + electrified_term
        + noise
    )
    floored = np.clip(demand, 100, None)
    return pd.Series(floored, index=df.index, name="synthetic_demand")

run_control(df, *, model_name='LightGBM', sample=12000)

Train the same model on the real and a synthetic signal-bearing target.

Source code in src/bmw_sales/audit/control.py
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def run_control(
    df: pd.DataFrame, *, model_name: str = "LightGBM", sample: int = 12000
) -> ControlResult:
    """Fit one model twice: on the real target and on a synthetic one.

    A seeded sub-sample of at most ``sample`` rows is drawn. The model is first
    trained (without tuning) on the real regression dataset, then on a copy
    whose sales-volume column is replaced by the synthetic signal-bearing
    target. Both R² scores are returned.
    """
    seed = get_settings().random_seed
    n_rows = min(sample, len(df))
    work = df.sample(n_rows, random_state=seed).copy()

    def _r2_for(frame: pd.DataFrame) -> float:
        # Same dataset construction and model settings for both targets.
        dataset = make_dataset(frame, "regression")
        return train_one(model_name, dataset, tune=False).metrics["r2"]

    real_score = _r2_for(work)

    synthetic_frame = work.copy()
    synthetic_frame[SCHEMA.SALES_VOLUME] = make_signal_bearing_target(work).to_numpy()
    synthetic_score = _r2_for(synthetic_frame)

    return ControlResult(
        r2_real=float(real_score), r2_synthetic=float(synthetic_score), model=model_name
    )

External-data APIs

bmw_sales.apis.base

Base class for the external API clients.

Each client either hits a real endpoint or returns a deterministic mock, so the project runs offline with no keys. Live calls retry with backoff (tenacity), and after a failure a per-client circuit breaker falls back to the mock. Successful responses are cached to parquet, and every result carries its provenance (live / cache / mock). Subclasses implement _fetch_live and _mock.

DataSource

Bases: str, Enum

Provenance of a returned dataset.

Source code in src/bmw_sales/apis/base.py
32
33
34
35
36
37
class DataSource(str, Enum):
    """Provenance of a returned dataset.

    Members are also ``str`` instances, so each compares equal to its plain
    string value.
    """

    # Freshly fetched from the upstream API.
    LIVE = "live"
    # Read back from the on-disk cache.
    CACHE = "cache"
    # Produced by the client's deterministic mock.
    MOCK = "mock"

APIResult dataclass

A dataset plus its provenance metadata.

Source code in src/bmw_sales/apis/base.py
40
41
42
43
44
45
46
47
48
49
50
@dataclass
class APIResult:
    """A dataset plus its provenance metadata."""

    # The returned frame.
    data: pd.DataFrame
    # Where the frame came from (live / cache / mock).
    source: DataSource
    # Name of the client that produced the result.
    client: str

    @property
    def is_live(self) -> bool:
        """Whether ``source`` is exactly ``DataSource.LIVE`` (cache hits are not live)."""
        return self.source is DataSource.LIVE

BaseAPIClient

Bases: ABC

Abstract hybrid client with caching, retries and a circuit breaker.

Source code in src/bmw_sales/apis/base.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
class BaseAPIClient(ABC):
    """Abstract hybrid client with caching, retries and a circuit breaker.

    Subclasses provide ``_fetch_live`` (real endpoint) and ``_mock``
    (deterministic stand-in). :meth:`fetch` combines them with an on-disk
    parquet cache and a per-instance circuit breaker.
    """

    #: Short, unique client name (used for cache keys and logging).
    name: str = "base"

    def __init__(self, settings: Optional[Settings] = None) -> None:
        """Store the settings (defaulting to ``get_settings()``) and derive the cache dir."""
        self.settings = settings or get_settings()
        self._cache_dir = Path(self.settings.cache_dir) / self.name
        self._circuit_open = False  # once True, skip live calls for this instance

    # Public API
    def fetch(self, **params: Any) -> APIResult:
        """Return data for ``params``, preferring cache → live → mock.

        The method never raises on network failure: it degrades gracefully to a
        deterministic mock so downstream code always receives a valid frame.
        Only a failure of the live call itself opens the circuit breaker; a
        failure to *cache* a successful live response does not discard it.
        """
        cache_path = self._cache_path(params)

        cached = self._read_cache(cache_path)
        if cached is not None:
            return APIResult(cached, DataSource.CACHE, self.name)

        if self.settings.offline_mode or self._circuit_open:
            return APIResult(self._mock(**params), DataSource.MOCK, self.name)

        try:
            data = self._fetch_live(**params)
        except Exception:  # noqa: BLE001 - any failure must fall back, not crash
            self._circuit_open = True
            return APIResult(self._mock(**params), DataSource.MOCK, self.name)

        # Kept outside the live-call guard on purpose: caching is best-effort
        # and must neither trip the breaker nor replace real data with a mock.
        try:
            self._write_cache(cache_path, data)
        except Exception:  # noqa: BLE001 - an overriding _write_cache may still raise
            pass
        return APIResult(data, DataSource.LIVE, self.name)

    # To implement in subclasses
    @abstractmethod
    def _fetch_live(self, **params: Any) -> pd.DataFrame:
        """Fetch real data from the upstream API. May raise on failure."""

    @abstractmethod
    def _mock(self, **params: Any) -> pd.DataFrame:
        """Return a deterministic, plausible mock for the same parameters."""

    # Shared HTTP helper (retry + timeout)
    def _http_get_json(self, url: str, params: Optional[dict[str, Any]] = None) -> Any:
        """GET ``url`` and return parsed JSON, with retry/backoff and timeout.

        ``requests.RequestException`` is retried up to
        ``settings.http_max_retries`` attempts with exponential backoff; the
        last exception is re-raised once the attempts are exhausted.
        """

        @retry(
            reraise=True,
            stop=stop_after_attempt(self.settings.http_max_retries),
            wait=wait_exponential(multiplier=0.5, max=4),
            retry=retry_if_exception_type(requests.RequestException),
        )
        def _do_request() -> Any:
            resp = requests.get(url, params=params, timeout=self.settings.http_timeout)
            resp.raise_for_status()
            return resp.json()

        return _do_request()

    def _fetch_wb_indicator(
        self, area_code: str, indicator: str, start: int, end: int
    ) -> dict[int, float]:
        """Fetch one World Bank indicator for a country/aggregate as {year: value}.

        Shared by clients backed by the (keyless) World Bank Indicators API.
        Raises ``ValueError`` if the payload shape is unexpected; returns ``{}``
        if the series exists but has no observations in range. Observations
        whose value is ``None`` are dropped.
        """
        url = f"{self.settings.worldbank_base_url}/country/{area_code}/indicator/{indicator}"
        payload = self._http_get_json(
            url, params={"date": f"{start}:{end}", "format": "json", "per_page": "500"}
        )
        # Expected shape: a list whose second element holds the observations.
        if not isinstance(payload, list) or len(payload) < 2 or payload[1] is None:
            raise ValueError(f"Unexpected World Bank payload for {indicator}")
        return {
            int(obs["date"]): float(obs["value"])
            for obs in payload[1]
            if obs.get("value") is not None
        }

    # Cache plumbing
    def _cache_key(self, params: dict[str, Any]) -> str:
        """Return ``"<name>_<16 hex chars>"``, independent of parameter order."""
        payload = json.dumps(params, sort_keys=True, default=str)
        digest = hashlib.sha1(payload.encode("utf-8")).hexdigest()[:16]
        return f"{self.name}_{digest}"

    def _cache_path(self, params: dict[str, Any]) -> Path:
        """Return the parquet file path used to cache ``params``."""
        return self._cache_dir / f"{self._cache_key(params)}.parquet"

    def _read_cache(self, path: Path) -> Optional[pd.DataFrame]:
        """Return the cached frame at ``path``, or ``None`` if absent or unreadable."""
        if not path.exists():
            return None
        try:
            return pd.read_parquet(path)
        except Exception:  # noqa: BLE001 - corrupt cache should not be fatal
            return None

    def _write_cache(self, path: Path, data: pd.DataFrame) -> None:
        """Best-effort write of ``data`` to ``path``; never raises.

        Directory creation is inside the guard as well, so an unwritable or
        blocked cache location is treated like any other caching failure.
        """
        try:
            path.parent.mkdir(parents=True, exist_ok=True)
            data.to_parquet(path, index=False)
        except Exception:  # noqa: BLE001 - caching is best-effort
            pass

    @staticmethod
    def _seed_from(*parts: Any) -> int:
        """Stable integer seed derived from arbitrary params (for mock determinism)."""
        payload = "|".join(str(p) for p in parts)
        return int(hashlib.sha1(payload.encode("utf-8")).hexdigest()[:8], 16)

fetch(**params)

Return data for params, preferring cache → live → mock.

The method never raises on network failure: it degrades gracefully to a deterministic mock so downstream code always receives a valid frame.

Source code in src/bmw_sales/apis/base.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def fetch(self, **params: Any) -> APIResult:
    """Return data for ``params``, preferring cache → live → mock.

    The method never raises on network failure: it degrades gracefully to a
    deterministic mock so downstream code always receives a valid frame.
    Only a failure of the live call itself opens the circuit breaker; a
    failure to *cache* a successful live response does not discard it.
    """
    cache_path = self._cache_path(params)

    cached = self._read_cache(cache_path)
    if cached is not None:
        return APIResult(cached, DataSource.CACHE, self.name)

    if self.settings.offline_mode or self._circuit_open:
        return APIResult(self._mock(**params), DataSource.MOCK, self.name)

    try:
        data = self._fetch_live(**params)
    except Exception:  # noqa: BLE001 - any failure must fall back, not crash
        self._circuit_open = True
        return APIResult(self._mock(**params), DataSource.MOCK, self.name)

    # Kept outside the live-call guard on purpose: caching is best-effort and
    # must neither trip the breaker nor replace real data with a mock.
    try:
        self._write_cache(cache_path, data)
    except Exception:  # noqa: BLE001 - e.g. the cache directory cannot be created
        pass
    return APIResult(data, DataSource.LIVE, self.name)

bmw_sales.apis.enrichment

Augmentation layer: join external API data onto the BMW sales dataset.

Builds a region×year (×fuel) external panel from the four hybrid clients and left-joins it onto the transactional sales data. All joins are left joins so the sales data is never dropped, and provenance per source is reported so the UI can show whether each block came from a live API or a mock fallback.

EnrichmentResult dataclass

Augmented dataset plus per-source provenance.

Source code in src/bmw_sales/apis/enrichment.py
24
25
26
27
28
29
@dataclass
class EnrichmentResult:
    """Augmented dataset plus per-source provenance."""

    # Sales frame with the external columns joined on.
    data: pd.DataFrame
    # External source name -> provenance label for that source.
    provenance: dict[str, str]

build_external_panel(start_year=2010, end_year=2024)

Assemble the region×year(×fuel) external panel from all four clients.

Source code in src/bmw_sales/apis/enrichment.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def build_external_panel(
    start_year: int = 2010, end_year: int = 2024
) -> tuple[pd.DataFrame, pd.DataFrame, dict[str, str]]:
    """Collect all four external sources and assemble the join panels.

    Returns
    -------
    tuple
        ``(region_year_panel, fuel_panel, provenance)``. The first frame is the
        outer merge of the macro, CO2 and FX blocks on ``region``/``year``. The
        fuel block is returned separately because it stays at
        region×year×fuel_type granularity. ``provenance`` maps each source name
        to the provenance reported for it.
    """
    client_types = (
        ("worldbank", WorldBankClient),
        ("fuel_prices", FuelPriceClient),
        ("co2_regulations", CO2RegulationClient),
        ("fx_rates", FXRateClient),
    )

    frames: dict[str, pd.DataFrame] = {}
    provenance: dict[str, str] = {}
    for source_name, client_type in client_types:
        # Each client is instantiated right before its collection, in order.
        frames[source_name], provenance[source_name] = _collect(
            client_type(), start_year, end_year
        )

    join_keys = ["region", "year"]
    panel = frames["worldbank"].merge(frames["co2_regulations"], on=join_keys, how="outer")
    panel = panel.merge(frames["fx_rates"], on=join_keys, how="outer")

    return panel, frames["fuel_prices"], provenance

enrich_dataset(df, *, start_year=2010, end_year=2024)

Left-join the external panel onto the sales dataset.

Parameters:

Name Type Description Default
df DataFrame

The raw/clean sales dataset (must contain Region, Year, Fuel_Type).

required

Returns:

Type Description
EnrichmentResult

The augmented frame and per-source provenance (live / mock).

Source code in src/bmw_sales/apis/enrichment.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def enrich_dataset(
    df: pd.DataFrame, *, start_year: int = 2010, end_year: int = 2024
) -> EnrichmentResult:
    """Attach the external panels to the sales dataset with left joins.

    Parameters
    ----------
    df:
        The raw/clean sales dataset (must contain Region, Year, Fuel_Type).
    start_year, end_year:
        Year range forwarded to ``build_external_panel``.

    Returns
    -------
    EnrichmentResult
        The augmented frame and per-source provenance (``live`` / ``mock``).
    """
    region_panel, fuel_panel, provenance = build_external_panel(start_year, end_year)

    region_renames = {"region": "_region", "year": "_year"}
    fuel_renames = {**region_renames, "fuel_type": "_fuel"}
    region_keys = ["_region", "_year"]
    fuel_keys = [*region_keys, "_fuel"]

    # Temporary, plainly-typed join keys: the sales columns may be categorical.
    keyed = df.assign(
        _region=df[SCHEMA.REGION].astype(str),
        _year=df[SCHEMA.YEAR].astype(int),
        _fuel=df[SCHEMA.FUEL_TYPE].astype(str),
    )

    enriched = (
        keyed.merge(region_panel.rename(columns=region_renames), on=region_keys, how="left")
        .merge(fuel_panel.rename(columns=fuel_renames), on=fuel_keys, how="left")
        .drop(columns=fuel_keys)
    )
    return EnrichmentResult(data=enriched, provenance=provenance)

summarise_provenance(provenance)

One-line human summary of where the external data came from.

Source code in src/bmw_sales/apis/enrichment.py
110
111
112
113
114
115
116
117
118
119
120
121
122
def summarise_provenance(provenance: dict[str, str]) -> str:
    """Summarise on one line which source kind served each external feed.

    Feed names are grouped by provenance value and listed in the fixed order
    live, cache, mock; empty groups are left out and the remaining groups are
    joined with ``" | "``. Returns ``"no sources"`` when nothing matches.
    """
    ordered_kinds = (
        ("live", DataSource.LIVE),
        ("cache", DataSource.CACHE),
        ("mock", DataSource.MOCK),
    )
    parts = []
    for label, kind in ordered_kinds:
        names = [name for name, origin in provenance.items() if origin == kind.value]
        if names:
            parts.append(f"{label}: {', '.join(names)}")
    if not parts:
        return "no sources"
    return " | ".join(parts)

Features & models

bmw_sales.features.engineering

Feature engineering shared by the econometric and ML pipelines.

Adds vehicle age, usage intensity, electrification and premium-tier flags, and a couple of log transforms. The same frame feeds statsmodels and the boosters.

add_engineered_features(df, *, reference_year=REFERENCE_YEAR)

Return a copy of df with domain-informed engineered features.

Parameters:

Name Type Description Default
df DataFrame

A frame containing the canonical raw columns.

required
reference_year int

Year used as "today" when computing vehicle age.

REFERENCE_YEAR

Returns:

Type Description
DataFrame

df plus the ENGINEERED_NUMERIC and ENGINEERED_CATEGORICAL columns.

Source code in src/bmw_sales/features/engineering.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def add_engineered_features(
    df: pd.DataFrame, *, reference_year: int = REFERENCE_YEAR
) -> pd.DataFrame:
    """Derive the engineered modelling features on a copy of ``df``.

    Parameters
    ----------
    df:
        A frame containing the canonical raw columns. It is not modified.
    reference_year:
        Year used as "today" when computing vehicle age.

    Returns
    -------
    pandas.DataFrame
        ``df`` plus ``vehicle_age``, ``mileage_per_year``,
        ``price_per_litre_engine``, ``log_price``, ``log_mileage``,
        ``is_electrified`` and ``is_premium_tier``.
    """
    # Read each source column once, already cast to the type the maths needs.
    model_year = df[SCHEMA.YEAR].astype(int)
    mileage_km = df[SCHEMA.MILEAGE_KM].astype(float)
    price_usd = df[SCHEMA.PRICE_USD].astype(float)
    engine_litres = df[SCHEMA.ENGINE_SIZE_L].astype(float)
    fuel_type = df[SCHEMA.FUEL_TYPE].astype(str)
    model_name = df[SCHEMA.MODEL].astype(str)

    # Model years after the reference year are clipped to age 0.
    vehicle_age = (reference_year - model_year).clip(lower=0)

    engineered = df.copy()
    engineered["vehicle_age"] = vehicle_age
    # Age 0 is treated as 1 year so the ratio never divides by zero.
    engineered["mileage_per_year"] = mileage_km / vehicle_age.replace(0, 1)
    engineered["price_per_litre_engine"] = price_usd / engine_litres
    # Log transforms (stabilise scale; enable elasticity interpretation).
    engineered["log_price"] = np.log(price_usd)
    engineered["log_mileage"] = np.log1p(mileage_km)

    # 0/1 membership flags.
    engineered["is_electrified"] = fuel_type.isin(_ELECTRIFIED_FUELS).astype(int)
    engineered["is_premium_tier"] = model_name.isin(_PREMIUM_MODELS).astype(int)
    return engineered

feature_columns(*, include_leakage=False)

Return the modelling feature sets (categorical vs numeric).

Parameters:

Name Type Description Default
include_leakage bool

If True, includes Sales_Volume in the numeric set - used only to demonstrate target leakage for the classification task (see ADR-0002).

False
Source code in src/bmw_sales/features/engineering.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def feature_columns(*, include_leakage: bool = False) -> dict[str, list[str]]:
    """Return the modelling feature names, split by kind.

    Parameters
    ----------
    include_leakage:
        If ``True``, ``Sales_Volume`` is appended to the numeric set - used
        *only* to demonstrate target leakage for the classification task
        (see ADR-0002).

    Returns
    -------
    dict
        ``{"categorical": [...], "numeric": [...]}``; raw schema columns come
        first, followed by the engineered ones.
    """
    leaked = [SCHEMA.SALES_VOLUME] if include_leakage else []
    return {
        "categorical": [*SCHEMA.CATEGORICAL, *ENGINEERED_CATEGORICAL],
        "numeric": [
            SCHEMA.YEAR,
            SCHEMA.ENGINE_SIZE_L,
            SCHEMA.MILEAGE_KM,
            SCHEMA.PRICE_USD,
            *ENGINEERED_NUMERIC,
            *leaked,
        ],
    }

bmw_sales.models.preprocessing

Shared preprocessing for the supervised ML/DL pipelines.

Provides a single, leakage-aware way to turn the (optionally enriched) dataset into model-ready X/y plus a fitted-on-train ColumnTransformer. Using the same preprocessing for every model keeps the benchmark fair.

Dataset dataclass

A train/validation/test split plus column metadata.

Source code in src/bmw_sales/models/preprocessing.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@dataclass
class Dataset:
    """One train/validation/test partition together with its feature lists."""

    # Feature frames, one per split.
    X_train: pd.DataFrame
    X_val: pd.DataFrame
    X_test: pd.DataFrame
    # Target series, one per split.
    y_train: pd.Series
    y_val: pd.Series
    y_test: pd.Series
    # Names of the numeric and categorical feature columns.
    numeric: list[str]
    categorical: list[str]
    # The modelling task this split was built for.
    task: Task

    @property
    def n_features(self) -> int:
        """Count of named feature columns (numeric plus categorical)."""
        return sum(len(group) for group in (self.numeric, self.categorical))

select_features(df, *, include_leakage=False)

Return (numeric, categorical) feature names present in df.

Adds API-enriched numeric columns when available. Never includes a target.

Source code in src/bmw_sales/models/preprocessing.py
54
55
56
57
58
59
60
61
62
63
64
65
def select_features(
    df: pd.DataFrame, *, include_leakage: bool = False
) -> tuple[list[str], list[str]]:
    """Pick the feature names from ``feature_columns`` that exist in ``df``.

    Returns ``(numeric, categorical)``. Columns from ``ENRICHED_NUMERIC`` are
    appended to the numeric list when they are present and hold at least one
    non-null value.
    """
    candidates = feature_columns(include_leakage=include_leakage)
    available = df.columns

    def present(names):
        # Keep the candidate order; drop anything the frame does not carry.
        return [name for name in names if name in available]

    numeric = present(candidates["numeric"])
    # An enriched column that is entirely null carries no signal, so skip it.
    numeric.extend(
        name for name in ENRICHED_NUMERIC if name in available and df[name].notna().any()
    )
    return numeric, present(candidates["categorical"])

build_preprocessor(numeric, categorical)

Standardise numerics and one-hot encode categoricals (dense, unknown-safe).

Source code in src/bmw_sales/models/preprocessing.py
68
69
70
71
72
73
74
75
76
77
78
79
80
def build_preprocessor(numeric: list[str], categorical: list[str]) -> ColumnTransformer:
    """Assemble the unfitted column transformer.

    Numeric columns are standardised; categorical columns are one-hot encoded
    into a dense array with unknown categories ignored. Columns that are not
    listed in either argument are dropped.
    """
    scaler = StandardScaler()
    encoder = OneHotEncoder(handle_unknown="ignore", sparse_output=False)
    steps = [
        ("num", scaler, numeric),
        ("cat", encoder, categorical),
    ]
    return ColumnTransformer(transformers=steps, remainder="drop")

make_dataset(df, task, *, include_leakage=False, test_size=0.15, val_size=0.15)

Build a leakage-aware train/val/test split for the requested task.

Parameters:

Name Type Description Default
df DataFrame

Raw or enriched dataset.

required
task Task

"regression" (target Sales_Volume) or "classification" (target Sales_Classification).

required
include_leakage bool

Classification only - include Sales_Volume to demonstrate leakage.

False
Source code in src/bmw_sales/models/preprocessing.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def make_dataset(
    df: pd.DataFrame,
    task: Task,
    *,
    include_leakage: bool = False,
    test_size: float = 0.15,
    val_size: float = 0.15,
) -> Dataset:
    """Build a leakage-aware train/val/test split for the requested task.

    Parameters
    ----------
    df:
        Raw or enriched dataset.
    task:
        ``"regression"`` (target ``Sales_Volume``) or ``"classification"``
        (target ``Sales_Classification``).
    include_leakage:
        Classification only - include ``Sales_Volume`` to demonstrate leakage.
        Ignored for regression, where ``Sales_Volume`` is the target itself and
        must never appear among the features.
    test_size, val_size:
        Fractions of the *whole* dataset reserved for the test and validation
        splits. Each must lie strictly between 0 and 1 and their sum must be
        below 1 so that a non-empty training split remains.

    Returns
    -------
    Dataset
        The three splits plus the numeric/categorical feature names.

    Raises
    ------
    ValueError
        If the split fractions are out of range.
    """
    if not (0.0 < test_size < 1.0 and 0.0 < val_size < 1.0 and test_size + val_size < 1.0):
        raise ValueError(
            "test_size and val_size must each be in (0, 1) and sum to less than 1; "
            f"got test_size={test_size}, val_size={val_size}."
        )

    seed = get_settings().random_seed
    data = add_engineered_features(df)

    if task == "regression":
        target = SCHEMA.SALES_VOLUME
        # Drop the leaked label so it cannot inform the regression target.
        data = data.drop(columns=[SCHEMA.SALES_CLASSIFICATION], errors="ignore")
        y = data[target].astype(float)
        stratify = None
        # The leakage demo would put the regression target into the features.
        include_leakage = False
    else:
        target = SCHEMA.SALES_CLASSIFICATION
        y = (data[target].astype(str) == "High").astype(int)
        stratify = y

    numeric, categorical = select_features(data, include_leakage=include_leakage)
    x = data[numeric + categorical].copy()

    # First carve out the test set, then split the remainder into train/val.
    x_tr, x_test, y_tr, y_test = train_test_split(
        x, y, test_size=test_size, random_state=seed, stratify=stratify
    )
    # val_size is a share of the whole dataset; rescale it to the remainder.
    rel_val = val_size / (1.0 - test_size)
    # Stratify the second split exactly when the first one was stratified.
    strat2 = y_tr if stratify is not None else None
    x_train, x_val, y_train, y_val = train_test_split(
        x_tr, y_tr, test_size=rel_val, random_state=seed, stratify=strat2
    )

    return Dataset(
        X_train=x_train,
        X_val=x_val,
        X_test=x_test,
        y_train=y_train,
        y_val=y_val,
        y_test=y_test,
        numeric=numeric,
        categorical=categorical,
        task=task,
    )

Simulation

bmw_sales.simulation.scenario

What-if demand simulator (not a fit to the historical data).

Projects demand under a constant-elasticity model:

Q' = Q0 * (1+dp)^ep * (1+dy)^ey * (1+df)^ef * R(ds) * (1+dfx)^ep

for changes in list price (dp), income (dy), fuel price (df), CO2-regulation stringency (ds) and FX (dfx). Elasticity priors are segment-specific: the premium tier is less price-elastic (own-price ~-0.3, with Veblen effects) and more income-elastic (~2.2) than the standard tier (~-0.7 / ~1.3). Baselines come from the macro APIs; all priors are adjustable in the UI.

ElasticityAssumptions dataclass

Segment-specific elasticity priors (all user-overridable in the UI).

Defaults are the standard segment; use for_segment() for the luxury/premium tier (less price-elastic, more income-elastic).

Source code in src/bmw_sales/simulation/scenario.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@dataclass(frozen=True)
class ElasticityAssumptions:
    """Immutable set of elasticity priors for one market segment.

    The field defaults describe the **standard** segment; call
    :meth:`for_segment` with ``premium=True`` for the luxury/premium tier
    (less price-elastic, more income-elastic).
    """

    own_price: float = -0.7
    income: float = 1.3
    fuel_price_combustion: float = -0.15
    fuel_price_electrified: float = 0.10
    #: Demand share shifted per +10 stringency points (toward electrified).
    regulation_per_10pts: float = 0.08

    @classmethod
    def for_segment(cls, premium: bool) -> "ElasticityAssumptions":
        """Build the priors for the requested tier.

        Premium: own-price ≈ -0.3 (Veblen-leaning, price-inelastic), income ≈ 2.2
        (positional good), weaker fuel sensitivity. Any falsy ``premium``
        yields the standard-tier defaults.
        """
        if not premium:
            return cls()
        premium_priors = {
            "own_price": -0.3,
            "income": 2.2,
            "fuel_price_combustion": -0.08,
            "fuel_price_electrified": 0.06,
            "regulation_per_10pts": 0.08,
        }
        return cls(**premium_priors)

for_segment(premium) classmethod

Return priors for the premium/luxury tier or the standard tier.

Premium: own-price ≈ -0.3 (Veblen-leaning, price-inelastic), income ≈ 2.2 (positional good), weaker fuel sensitivity. See the module docstring.

Source code in src/bmw_sales/simulation/scenario.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@classmethod
def for_segment(cls, premium: bool) -> "ElasticityAssumptions":
    """Build the priors for the requested tier.

    Premium: own-price ≈ -0.3 (Veblen-leaning, price-inelastic), income ≈ 2.2
    (positional good), weaker fuel sensitivity. Any falsy ``premium`` yields
    the standard-tier defaults.
    """
    if not premium:
        return cls()
    premium_priors = {
        "own_price": -0.3,
        "income": 2.2,
        "fuel_price_combustion": -0.08,
        "fuel_price_electrified": 0.06,
        "regulation_per_10pts": 0.08,
    }
    return cls(**premium_priors)

ScenarioInput dataclass

A single what-if scenario.

Source code in src/bmw_sales/simulation/scenario.py
58
59
60
61
62
63
64
65
66
67
68
69
@dataclass
class ScenarioInput:
    """Inputs describing one what-if scenario.

    The three leading fields are required; every change field defaults to
    ``0.0`` (no change).
    """

    region: str
    fuel_type: str
    base_volume: float
    # Δ list price (%)
    price_change_pct: float = 0.0
    # Δ income / GDP per capita (%)
    gdp_growth_pct: float = 0.0
    # Δ pump price (%)
    fuel_price_change_pct: float = 0.0
    # Δ stringency index (absolute pts)
    regulation_change_pts: float = 0.0
    # local-currency depreciation vs USD (%)
    fx_depreciation_pct: float = 0.0

FactorContribution dataclass

Multiplicative contribution of one driver to projected demand.

Source code in src/bmw_sales/simulation/scenario.py
72
73
74
75
76
77
78
79
80
81
@dataclass
class FactorContribution:
    """How one driver scales projected demand, as a multiplier."""

    driver: str
    multiplier: float

    @property
    def pct_effect(self) -> float:
        """The multiplier expressed as a percentage change (1.0 -> 0%)."""
        excess = self.multiplier - 1.0
        return excess * 100.0

ScenarioResult dataclass

Output of a scenario projection.

Source code in src/bmw_sales/simulation/scenario.py
84
85
86
87
88
89
90
91
92
93
94
95
96
@dataclass
class ScenarioResult:
    """Result of projecting one scenario: baseline, projection and drivers."""

    base_volume: float
    projected_volume: float
    contributions: list[FactorContribution] = field(default_factory=list)

    @property
    def total_change_pct(self) -> float:
        """Projected volume relative to the baseline, in percent.

        A zero baseline has no meaningful ratio, so ``0.0`` is reported.
        """
        return (
            0.0
            if self.base_volume == 0
            else (self.projected_volume / self.base_volume - 1.0) * 100.0
        )

simulate(scenario, assumptions=None)

Project demand for a scenario using the constant-elasticity model.

All effects are independent and multiplicative; each is reported separately so the user can see why demand moves, not just by how much.

Source code in src/bmw_sales/simulation/scenario.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
def _elasticity_multiplier(change: float, elasticity: float, label: str) -> float:
    """Return ``(1 + change) ** elasticity`` for a fractional ``change``.

    A change below -100% gives a negative base, and Python evaluates a negative
    float raised to a fractional power as a complex number; that is rejected
    here instead of silently flowing into the projection.
    """
    base = 1.0 + change
    if base < 0.0:
        raise ValueError(
            f"{label} change of {change * 100.0:g}% is below -100%; "
            "the constant-elasticity model is undefined there."
        )
    return base**elasticity


def simulate(
    scenario: ScenarioInput, assumptions: ElasticityAssumptions | None = None
) -> ScenarioResult:
    """Project demand for a scenario using the constant-elasticity model.

    All effects are independent and multiplicative; each is reported separately
    so the user can see *why* demand moves, not just by how much.

    Parameters
    ----------
    scenario:
        The what-if inputs (percentage changes and the baseline volume).
    assumptions:
        Elasticities to apply; the ``ElasticityAssumptions`` defaults are used
        when omitted.

    Returns
    -------
    ScenarioResult
        Baseline volume, projected volume and one multiplier per driver.

    Raises
    ------
    ValueError
        If the price, income, fuel-price or FX change is below -100%.
    """
    a = assumptions if assumptions is not None else ElasticityAssumptions()
    electrified = _is_electrified(scenario.fuel_type)

    # Convert percentage inputs to fractional changes.
    dp = scenario.price_change_pct / 100.0
    dy = scenario.gdp_growth_pct / 100.0
    df = scenario.fuel_price_change_pct / 100.0
    dfx = scenario.fx_depreciation_pct / 100.0

    price_mult = _elasticity_multiplier(dp, a.own_price, "List price")
    income_mult = _elasticity_multiplier(dy, a.income, "Income")
    fuel_elasticity = a.fuel_price_electrified if electrified else a.fuel_price_combustion
    fuel_mult = _elasticity_multiplier(df, fuel_elasticity, "Fuel price")
    # FX depreciation raises the effective local price -> apply own-price elasticity.
    fx_mult = _elasticity_multiplier(dfx, a.own_price, "FX")
    # Regulation: tighter rules help electrified, hurt combustion.
    reg_direction = 1.0 if electrified else -1.0
    reg_mult = 1.0 + reg_direction * a.regulation_per_10pts * (
        scenario.regulation_change_pts / 10.0
    )
    # The linear regulation effect is floored at zero so demand never goes negative.
    reg_mult = max(reg_mult, 0.0)

    contributions = [
        FactorContribution("List price", price_mult),
        FactorContribution("Income (GDP/cap)", income_mult),
        FactorContribution("Fuel price", fuel_mult),
        FactorContribution("CO₂ regulation", reg_mult),
        FactorContribution("FX (currency)", fx_mult),
    ]

    projected = scenario.base_volume
    for c in contributions:
        projected *= c.multiplier

    return ScenarioResult(
        base_volume=scenario.base_volume,
        projected_volume=projected,
        contributions=contributions,
    )

macro_defaults(region, *, year=2024)

Suggest scenario defaults from the (real/mock) external APIs for a region.

Returns plausible starting values: recent inflation, a GDP-growth proxy, and the latest regulation-stringency level - so the UI opens on realistic numbers.

Source code in src/bmw_sales/simulation/scenario.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def _non_null(frame, column: str):
    """Return the non-null values of ``column`` as a Series, or ``None``.

    ``None`` is returned when the column is absent from ``frame`` or holds no
    non-null value, so callers can fall back to a default with one check.
    """
    if column not in frame.columns:
        return None
    values = frame[column].dropna()
    return None if values.empty else values


def macro_defaults(region: str, *, year: int = 2024) -> dict[str, float]:
    """Suggest scenario defaults from the (real/mock) external APIs for a region.

    Returns plausible starting values: recent inflation, a GDP-growth proxy, and
    the latest regulation-stringency level - so the UI opens on realistic numbers.

    Every indicator falls back to a fixed default when its column is missing
    or holds no usable value: inflation 3.0, GDP growth 2.0, stringency 50.0,
    fuel price 1.0.

    Parameters
    ----------
    region:
        Region forwarded to each API client.
    year:
        Reference year; World Bank data is requested for ``year - 2`` to
        ``year``, the other sources for ``year`` only.

    Returns
    -------
    dict
        ``inflation_pct``, ``gdp_growth_pct``, ``regulation_stringency`` and
        ``fuel_price_usd_per_litre``, each rounded for display.
    """
    wb = WorldBankClient().fetch(region=region, start_year=year - 2, end_year=year).data
    co2 = CO2RegulationClient().fetch(region=region, start_year=year, end_year=year).data
    fuel = FuelPriceClient().fetch(region=region, start_year=year, end_year=year).data

    # Last non-null reading; rows are presumably in ascending year order.
    inflation_values = _non_null(wb, "inflation_pct")
    inflation = float(inflation_values.iloc[-1]) if inflation_values is not None else 3.0

    # Compound growth per step between the first and last non-null values.
    gdp_growth = 2.0
    gdp = _non_null(wb, "gdp_per_capita_usd")
    if gdp is not None and len(gdp) >= 2:
        first, last = float(gdp.iloc[0]), float(gdp.iloc[-1])
        # Non-positive levels would divide by zero or give a non-real root.
        if first > 0.0 and last > 0.0:
            gdp_growth = ((last / first) ** (1.0 / (len(gdp) - 1)) - 1.0) * 100.0

    stringency_values = _non_null(co2, "regulation_stringency_index")
    stringency = float(stringency_values.iloc[0]) if stringency_values is not None else 50.0

    fuel_values = _non_null(fuel, "price_usd_per_litre")
    fuel_price = float(fuel_values.mean()) if fuel_values is not None else 1.0

    return {
        "inflation_pct": round(inflation, 2),
        "gdp_growth_pct": round(gdp_growth, 2),
        "regulation_stringency": round(stringency, 1),
        "fuel_price_usd_per_litre": round(fuel_price, 3),
    }

bmw_sales.simulation.uncertainty

Monte-Carlo uncertainty for the scenario simulator.

Puts Gaussian priors on the elasticities and samples them through the constant-elasticity model, so each scenario returns a distribution of projected demand with credible intervals instead of a single point. Sampling is seeded.

ElasticityPriors dataclass

Gaussian priors (mean ± sd) on each elasticity - segment-specific.

Means match the deterministic model's standard-segment priors; the standard deviations encode honest parameter uncertainty. Use for_segment() for the luxury/premium tier.

Source code in src/bmw_sales/simulation/uncertainty.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@dataclass(frozen=True)
class ElasticityPriors:
    """Immutable Gaussian priors (mean and sd) for each elasticity.

    The default means equal the deterministic model's **standard**-segment
    priors; the standard deviations encode honest parameter uncertainty.
    Call :meth:`for_segment` with ``premium=True`` for the luxury/premium tier.
    """

    own_price_mean: float = -0.7
    own_price_sd: float = 0.2
    income_mean: float = 1.3
    income_sd: float = 0.4
    fuel_combustion_mean: float = -0.15
    fuel_electrified_mean: float = 0.10
    fuel_sd: float = 0.08
    regulation_mean: float = 0.08
    regulation_sd: float = 0.03

    @classmethod
    def for_segment(cls, premium: bool) -> "ElasticityPriors":
        """Priors for the premium/luxury tier (Veblen-leaning) or the standard tier.

        The premium tier overrides the price, income and fuel means and the
        price/income spreads; the remaining fields keep their defaults.
        """
        if not premium:
            return cls()
        premium_overrides = {
            "own_price_mean": -0.3,
            "own_price_sd": 0.15,
            "income_mean": 2.2,
            "income_sd": 0.5,
            "fuel_combustion_mean": -0.08,
            "fuel_electrified_mean": 0.06,
        }
        return cls(**premium_overrides)

for_segment(premium) classmethod

Priors for the premium/luxury tier (Veblen-leaning) or the standard tier.

Source code in src/bmw_sales/simulation/uncertainty.py
37
38
39
40
41
42
43
44
45
46
47
48
49
@classmethod
def for_segment(cls, premium: bool) -> "ElasticityPriors":
    """Priors for the premium/luxury tier (Veblen-leaning) or the standard tier.

    The premium tier overrides the price, income and fuel means and the
    price/income spreads; the remaining fields keep their defaults.
    """
    if not premium:
        return cls()
    premium_overrides = {
        "own_price_mean": -0.3,
        "own_price_sd": 0.15,
        "income_mean": 2.2,
        "income_sd": 0.5,
        "fuel_combustion_mean": -0.08,
        "fuel_electrified_mean": 0.06,
    }
    return cls(**premium_overrides)

ScenarioDistribution dataclass

Monte-Carlo distribution of projected demand for a scenario.

Source code in src/bmw_sales/simulation/uncertainty.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
@dataclass
class ScenarioDistribution:
    """Sampled distribution of projected demand for one scenario."""

    base_volume: float
    samples: np.ndarray  # projected volumes, one per draw

    def percentile(self, q: float) -> float:
        """Return the ``q``-th percentile of the samples as a plain float."""
        value = np.percentile(self.samples, q)
        return float(value)

    @property
    def median(self) -> float:
        """The 50th percentile of the samples."""
        return self.percentile(50)

    @property
    def ci80(self) -> tuple[float, float]:
        """Central 80% interval: the 10th and 90th percentiles."""
        lower = self.percentile(10)
        upper = self.percentile(90)
        return lower, upper

    @property
    def ci95(self) -> tuple[float, float]:
        """Central 95% interval: the 2.5th and 97.5th percentiles."""
        lower = self.percentile(2.5)
        upper = self.percentile(97.5)
        return lower, upper

    def pct_change_ci(self) -> tuple[float, float]:
        """80% credible interval expressed as % change vs baseline.

        A zero baseline has no meaningful ratio, so ``(0.0, 0.0)`` is returned.
        """
        lower, upper = self.ci80
        baseline = self.base_volume
        if baseline == 0:
            return 0.0, 0.0
        lower_pct = (lower / baseline - 1) * 100
        upper_pct = (upper / baseline - 1) * 100
        return lower_pct, upper_pct

pct_change_ci()

80% credible interval expressed as % change vs baseline.

Source code in src/bmw_sales/simulation/uncertainty.py
74
75
76
77
78
79
def pct_change_ci(self) -> tuple[float, float]:
    """80% credible interval expressed as % change vs baseline.

    A zero baseline has no meaningful ratio, so ``(0.0, 0.0)`` is returned.
    """
    lower, upper = self.ci80
    baseline = self.base_volume
    if baseline == 0:
        return 0.0, 0.0
    lower_pct = (lower / baseline - 1) * 100
    upper_pct = (upper / baseline - 1) * 100
    return lower_pct, upper_pct

simulate_mc(scenario, priors=None, *, n_draws=5000)

Propagate elasticity uncertainty through the demand model via Monte Carlo.

Each draw samples the elasticities from their priors and recomputes projected demand; the collection of draws forms the predictive distribution.

Source code in src/bmw_sales/simulation/uncertainty.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def simulate_mc(
    scenario: ScenarioInput,
    priors: ElasticityPriors | None = None,
    *,
    n_draws: int = 5000,
) -> ScenarioDistribution:
    """Propagate elasticity uncertainty through the demand model via Monte Carlo.

    Each draw samples the elasticities from their priors and recomputes projected
    demand; the collection of draws forms the predictive distribution.

    Parameters
    ----------
    scenario:
        The what-if inputs (percentage changes and the baseline volume).
    priors:
        Gaussian priors on the elasticities; the ``ElasticityPriors`` defaults
        are used when omitted.
    n_draws:
        Number of Monte-Carlo samples; must be at least 1.

    Returns
    -------
    ScenarioDistribution
        The baseline volume and one projected volume per draw.

    Raises
    ------
    ValueError
        If ``n_draws`` is below 1, or if the price, income, fuel-price or FX
        change is below -100%.
    """
    if n_draws < 1:
        raise ValueError(f"n_draws must be at least 1, got {n_draws}.")

    pr = priors if priors is not None else ElasticityPriors()
    rng = np.random.default_rng(get_settings().random_seed)
    electrified = scenario.fuel_type in ELECTRIFIED_FUELS

    dp = scenario.price_change_pct / 100.0
    dy = scenario.gdp_growth_pct / 100.0
    df = scenario.fuel_price_change_pct / 100.0
    dfx = scenario.fx_depreciation_pct / 100.0

    # A change below -100% makes the power base negative; np.power would then
    # return NaN for every draw and silently poison all percentiles.
    for label, change in (
        ("price_change_pct", dp),
        ("gdp_growth_pct", dy),
        ("fuel_price_change_pct", df),
        ("fx_depreciation_pct", dfx),
    ):
        if 1.0 + change < 0.0:
            raise ValueError(
                f"{label}={change * 100.0:g} is below -100%; "
                "the constant-elasticity model is undefined there."
            )

    # Draw order is fixed so that a given seed always yields the same samples.
    own_price = rng.normal(pr.own_price_mean, pr.own_price_sd, n_draws)
    income = rng.normal(pr.income_mean, pr.income_sd, n_draws)
    fuel_mean = pr.fuel_electrified_mean if electrified else pr.fuel_combustion_mean
    fuel_e = rng.normal(fuel_mean, pr.fuel_sd, n_draws)
    reg_sens = rng.normal(pr.regulation_mean, pr.regulation_sd, n_draws)

    price_mult = np.power(1.0 + dp, own_price)
    income_mult = np.power(1.0 + dy, income)
    fuel_mult = np.power(1.0 + df, fuel_e)
    # FX uses the same own-price draw as the list-price effect.
    fx_mult = np.power(1.0 + dfx, own_price)
    # Regulation: sign depends on fuel type; the multiplier is floored at zero.
    reg_dir = 1.0 if electrified else -1.0
    reg_mult = np.clip(
        1.0 + reg_dir * reg_sens * (scenario.regulation_change_pts / 10.0), 0.0, None
    )

    projected = scenario.base_volume * price_mult * income_mult * fuel_mult * fx_mult * reg_mult
    return ScenarioDistribution(base_volume=scenario.base_volume, samples=projected)

SQL analytics

bmw_sales.sql.analytics

Run the .sql files in sql/queries/ against the CSV with DuckDB.

No database server or ETL; the queries are plain SQL and this module just runs them.

list_queries()

Return the available query names (.sql file stems), sorted.

Source code in src/bmw_sales/sql/analytics.py
35
36
37
def list_queries() -> list[str]:
    """List the stems of the ``.sql`` files in ``QUERIES_DIR``, sorted."""
    names = [sql_file.stem for sql_file in QUERIES_DIR.glob("*.sql")]
    names.sort()
    return names

run_query(name, *, dataset_path=None)

Execute the named query and return the result as a DataFrame.

Parameters:

Name Type Description Default
name str

A query name from list_queries() (the .sql file stem).

required
dataset_path Path | None

Optional dataset override (for tests/fixtures).

None
Source code in src/bmw_sales/sql/analytics.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def run_query(name: str, *, dataset_path: Path | None = None) -> pd.DataFrame:
    """Execute the named query and return the result as a DataFrame.

    Parameters
    ----------
    name:
        A query name from :func:`list_queries` (the ``.sql`` file stem). It
        must be a bare stem: names that point outside the queries directory
        (path separators, ``..``) are treated as unknown.
    dataset_path:
        Optional dataset override (for tests/fixtures).

    Returns
    -------
    pandas.DataFrame
        The query result.

    Raises
    ------
    FileNotFoundError
        If ``name`` does not identify a ``.sql`` file directly inside the
        queries directory.
    """
    sql_path = QUERIES_DIR / f"{name}.sql"
    # Only plain files sitting directly in QUERIES_DIR count as queries; this
    # keeps a crafted name from reaching .sql files elsewhere on disk.
    if sql_path.parent != QUERIES_DIR or not sql_path.is_file():
        raise FileNotFoundError(f"Unknown query '{name}'. Available: {list_queries()}")
    sql = sql_path.read_text(encoding="utf-8")
    con = _connect(dataset_path)
    try:
        return con.execute(sql).df()
    finally:
        # Always release the connection, even if the query fails.
        con.close()

run_all(*, dataset_path=None)

Execute every query and return {name: DataFrame}.

Source code in src/bmw_sales/sql/analytics.py
61
62
63
def run_all(*, dataset_path: Path | None = None) -> dict[str, pd.DataFrame]:
    """Run each available query in turn and map its name to its result frame."""
    results: dict[str, pd.DataFrame] = {}
    for query_name in list_queries():
        results[query_name] = run_query(query_name, dataset_path=dataset_path)
    return results