diff --git a/docs/impulse/docs/config/configuration.md b/docs/impulse/docs/config/configuration.md
index 2fdca38..c7c53e5 100644
--- a/docs/impulse/docs/config/configuration.md
+++ b/docs/impulse/docs/config/configuration.md
@@ -62,6 +62,7 @@ Maps the silver-layer input tables.
| `container_tags_table` | `str` | No | Full Unity Catalog path. Container EAV tags. |
| `channel_tags_table` | `str` | No | Full Unity Catalog path. Channel EAV tags. |
| `channel_mapping_table` | `str` | No | Full Unity Catalog path. Logical-to-physical channel alias table. Required when using `QueryBuilder.channel_with_alias()` (currently supported by `KeyValueStoreSolver`). |
+| `unit_conversion_table` | `str` | No | Full Unity Catalog path. Per-unit-family conversion factors. When configured together with a `channel_mapping_table` whose rows carry `source_unit` / `target_unit` columns, aliased selectors auto-convert values from source to target unit during `solve()` (currently supported by `KeyValueStoreSolver`). |
Tag tables are required for solvers that consume tag-based filters
(`DeltaSolver` with tag filters, `KeyValueStoreSolver`).
@@ -172,8 +173,9 @@ Per-table sections (each a `TableConfig`):
| `container_metrics`| All solvers | Custom container_id column, custom timestamp columns |
| `channel_tags` | DeltaSolver | Tag key/value column renames |
| `channel_metrics` | All solvers | Custom channel_id column, custom value/timestamp columns |
-| `channel_mapping` | KeyValueStoreSolver | Alias-table column renames; `priority` column |
+| `channel_mapping` | KeyValueStoreSolver | Alias-table column renames; `priority` column; optional `join_keys` for non-default alias-resolution composite keys |
| `channels` | All solvers | RLE column renames (`tstart`/`tend`/`value`) |
+| `unit_conversion` | KeyValueStoreSolver | Unit-conversion table column renames (`unit`, `group_id`, `conversion_factor`) |
Internal column names that mappings can target:
@@ -187,6 +189,14 @@ Internal column names that mappings can target:
| `priority` | Tie-breaker column on the `channel_mapping` table |
| `project_id` | Project scoping column |
| `parent_id` | Parent/scope identifier |
+| `source_channel`| Source-channel identifier on the `channel_mapping` table |
+| `data_key` | Data-key identifier (default present on both `channel_mapping` and `channel_metrics`) |
+| `channel_alias` | Alias identifier on the `channel_mapping` table |
+| `channel_name` | Channel-name identifier on the `channel_metrics` table |
+| `source_unit`, `target_unit` | Source/target unit columns on the `channel_mapping` table |
+| `unit` | Unit name column on the `unit_conversion` table |
+| `group_id` | Unit-family identifier on the `unit_conversion` table |
+| `conversion_factor` | Per-unit factor on `unit_conversion`; also the per-channel factor name carried into the solve UDF |
:::note Per-solver feature support
@@ -235,6 +245,92 @@ However, only the parts each solver supports are actually consumed:
Sections you don't customize can be omitted; defaults are an empty mapping and no filters.
+### Unit conversion (optional)
+
+Set `source.unit_conversion_table` and extend `channel_mapping` with `source_unit` / `target_unit` columns
+to have aliased selectors auto-convert values from source to target unit during `solve()`. Direct selectors
+via `query.channel(...)` always return raw values, even on a channel that an aliased sibling converts —
+conversion is a property of the alias, not of the channel. See
+[`unit_conversion`](../data_model/silver_layer_schema.md#unit_conversion-optional) for the table schema.
+
+```python
+"source": {
+ "container_metrics_table": "my_catalog.silver.container_metrics",
+ "channel_metrics_table": "my_catalog.silver.channel_metrics",
+ "channels_uri": "my_catalog.silver.channels",
+ "channel_mapping_table": "my_catalog.silver.channel_mapping",
+ "unit_conversion_table": "my_catalog.silver.unit_conversion"
+},
+"query_engine": {
+ "solver": "KeyValueStoreSolver",
+ "solver_config": {
+ "unit_conversion": {
+ "column_name_mapping": {}
+ }
+ }
+}
+```
+
+### Alias-resolution join keys (optional)
+
+`KeyValueStoreSolver.filter_aliased_channel_metrics` joins `channel_mapping`
+to `channel_metrics` to resolve aliased selectors. The default composite key
+is `(source_channel, channel_name) + (data_key, data_key)`. Override
+`channel_mapping.join_keys` to change the arity or column choice — for
+example, a single-column join when `data_key` is not part of the channel
+identity in your silver layout:
+
+```python
+"solver_config": {
+ "channel_mapping": {
+ "join_keys": [
+ {"mapping_col": "source_channel", "metrics_col": "channel_name"}
+ ]
+ }
+}
+```
+
+Each `mapping_col` / `metrics_col` is an **internal** name (the name as the
+solver sees the column **after** `column_name_mapping` has been applied on
+the respective table). The two sides of a pair are independent, so the same
+column can carry different names on the two tables. For instance, a layout
+where the data-key column has different physical names on the two tables
+has two equivalent paths:
+
+```python
+# Path 1 — rename both physical columns to the same internal name; the
+# default join_keys then works unchanged.
+"solver_config": {
+ "channel_mapping": {
+ "column_name_mapping": {"mapping_data_key": "data_key"}
+ },
+ "channel_metrics": {
+ "column_name_mapping": {"metrics_data_key": "data_key"}
+ }
+}
+
+# Path 2 — leave the physical names as-is and reference them directly.
+"solver_config": {
+ "channel_mapping": {
+ "join_keys": [
+ {"mapping_col": "source_channel", "metrics_col": "channel_name"},
+ {"mapping_col": "mapping_data_key", "metrics_col": "metrics_data_key"}
+ ]
+ }
+}
+```
+
+`query.channel(...)` and `query.channel_with_alias(...)` kwargs are column
+references against the **post-`column_name_mapping`** schema. If you
+override `join_keys` (or skip renames) so that the solver sees a column
+under a non-default name, the same name must be used as the kwarg. Example:
+if `join_keys` references `metrics_col: "my_chan_name"` and the column is
+not renamed via `column_name_mapping`, call
+`query.channel(my_chan_name=...)`. The internal-name properties on
+`SolverConfig` exist primarily to remove magic strings from the solver
+code; the user-facing contract is "kwarg name == column name as the solver
+sees it".
+
### When to use what
- **`solver_config.
.column_name_mapping`** — your silver-layer column is named differently from
diff --git a/docs/impulse/docs/data_model/silver_layer_schema.md b/docs/impulse/docs/data_model/silver_layer_schema.md
index 32d859d..22fe298 100644
--- a/docs/impulse/docs/data_model/silver_layer_schema.md
+++ b/docs/impulse/docs/data_model/silver_layer_schema.md
@@ -260,7 +260,32 @@ channel name to one or more physical channels keyed by `project_id` /
| `channel_name` | `string` | No | Logical channel name to match against `channel_with_alias` selectors. |
| `data_key` | `string` | No | Physical lookup key joined to `channel_metrics`. |
| `priority` | `int` | Yes | Tie-breaker when multiple physical channels match a logical name. |
+| `source_unit` | `string` | Yes | Unit of the raw channel data. When non-null together with `target_unit` and a configured `unit_conversion_table`, the solver converts values from source to target unit on aliased reads. |
+| `target_unit` | `string` | Yes | Target unit for aliased reads of this mapping. |
Configured via `source.channel_mapping_table` (see
[Configuration](../config/configuration.md)). Joins to `channel_metrics`
on `(project_id, data_key, channel_name)`.
+
+---
+
+## unit_conversion (optional)
+
+Per-unit-family conversion factors. Read by `KeyValueStoreSolver` at
+solve time when `source.unit_conversion_table` is configured and the
+`channel_mapping` table carries `source_unit` / `target_unit` columns.
+
+| Column | Type | Nullable | Description |
+|---------------------|----------|----------|------------------------------------------------------------------------------------------------------------|
+| `group_id` | `string` | No | Unit family identifier (e.g. `speed`, `rotation`). Only units within the same family can convert into each other. |
+| `unit` | `string` | No | Unit name. Matches the `source_unit` / `target_unit` values on `channel_mapping`. |
+| `conversion_factor` | `double` | No | Multiplier that converts a value in this unit to the family's base unit. The base unit has factor `1.0`. |
+
+For each aliased channel the solver looks up `source_factor` (the row
+whose `unit` matches `source_unit`) and `target_factor` (the row whose
+`unit` matches `target_unit`, constrained to the same `group_id`) and
+multiplies values by `source_factor / target_factor`. Missing rows or a
+`group_id` mismatch yield a null factor and no conversion.
+
+Configured via `source.unit_conversion_table` (see
+[Configuration](../config/configuration.md)).
diff --git a/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/key_value_store_solver.md b/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/key_value_store_solver.md
index be9e711..34afe34 100644
--- a/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/key_value_store_solver.md
+++ b/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/key_value_store_solver.md
@@ -171,6 +171,13 @@ def resolve_channel_selections(spark, channel_metrics_df,
Union direct and aliased channel metrics, combining selector_ids.
+When the aliased side carries ``source_unit`` / ``target_unit``
+columns (added by :meth:`filter_aliased_channel_metrics` when a
+unit conversion table is configured), those columns are preserved
+through the union and aggregation. Direct selectors produce null
+unit columns, which causes the downstream conversion-factor join
+in :meth:`solve` to leave their values unchanged.
+
**Arguments**:
- `spark` (`SparkSession`): Spark session used for query execution.
@@ -179,7 +186,9 @@ Union direct and aliased channel metrics, combining selector_ids.
**Returns**:
-`pyspark.sql.DataFrame`: Merged DataFrame with ``(container_id, channel_id, selector_ids)``.
+`pyspark.sql.DataFrame`: Merged DataFrame with ``(container_id, channel_id, selector_ids)``
+(plus ``source_unit`` / ``target_unit`` when present on the
+aliased side).
#### solve
@@ -189,6 +198,13 @@ def solve(query, channels_df, selections, dtypes) -> DataFrame
Solve the query by grouping channels and applying selections.
+When a ``unit_conversion_table`` is configured on the database and
+*channels_df* carries ``source_unit`` / ``target_unit`` columns
+(added upstream by :meth:`filter_aliased_channel_metrics`),
+per-channel conversion factors are computed and propagated into
+the grouped-map UDF so that time-series values are converted from
+the source to the target unit on the fly.
+
**Arguments**:
- `query` (`QueryBuilder`): Query object containing database and filter information.
diff --git a/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/solver_config.md b/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/solver_config.md
index 7b0a8fa..e8369c0 100644
--- a/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/solver_config.md
+++ b/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/solver_config.md
@@ -36,6 +36,47 @@ names used by the solver. An empty dict means no renaming
Keys are internal column names; values are the literal values
to match.
+## JoinKey
+
+```python
+class JoinKey(BaseModel)
+```
+
+A single column pair in the ``channel_mapping`` → ``channel_metrics`` join.
+
+Used by :class:`ChannelMappingConfig.join_keys` to override the default
+alias-resolution composite key.
+
+Both fields reference column names **after** ``column_name_mapping`` has
+been applied on the respective table; the two sides are independent, so
+a column may appear under different names on the two tables.
+
+**Arguments**:
+
+- `mapping_col` (`str`): Column name on ``channel_mapping`` after its ``column_name_mapping``
+has been applied.
+- `metrics_col` (`str`): Column name on ``channel_metrics`` after its ``column_name_mapping``
+has been applied.
+
+## ChannelMappingConfig
+
+```python
+class ChannelMappingConfig(TableConfig)
+```
+
+``TableConfig`` plus an optional alias-resolution join-key spec.
+
+**Arguments**:
+
+- `join_keys` (`list[JoinKey] or None`): Custom composite key for the ``channel_mapping`` → ``channel_metrics``
+join performed by ``KeyValueStoreSolver.filter_aliased_channel_metrics``.
+When ``None`` (the default), the solver uses the backward-compatible
+pair ``[(source_channel, channel_name), (data_key, data_key)]``
+sourced from :class:`SolverConfig` internal-name properties.
+Provide a custom list to change the join arity or column choice
+(e.g. a single-column join when ``data_key`` is not part of the
+channel identity in your silver layout).
+
## SolverConfig
```python
@@ -58,8 +99,10 @@ so that solver code can always reference the same constants.
- `container_metrics` (`TableConfig`): Column mappings and filters for the container metrics table.
- `channel_tags` (`TableConfig`): Column mappings and filters for the channel tags table.
- `channel_metrics` (`TableConfig`): Column mappings and filters for the channel metrics table.
-- `channel_mapping` (`TableConfig`): Column mappings and filters for the channel mapping (alias) table.
+- `channel_mapping` (`ChannelMappingConfig`): Column mappings, filters, and the alias-resolution ``join_keys``
+override for the channel mapping (alias) table.
- `channels` (`TableConfig`): Column mappings and filters for the channel data table.
+- `unit_conversion` (`TableConfig`): Column mappings and filters for the unit conversion table.
#### from\_json
@@ -176,6 +219,50 @@ def alias_priority_col() -> str
Internal column name for the alias priority on the channel_mapping table.
+#### source\_channel\_col
+
+```python
+def source_channel_col() -> str
+```
+
+Internal column name for the source-channel identifier on the channel_mapping table.
+
+
+#### data\_key\_col
+
+```python
+def data_key_col() -> str
+```
+
+Internal column name for the data-key identifier.
+
+Default present on both ``channel_mapping`` and ``channel_metrics``;
+used by the default :meth:`effective_alias_join_keys` for both sides.
+Layouts where the two tables carry the data-key column under different
+physical names can either rename both to ``"data_key"`` via per-table
+``column_name_mapping`` or override
+
+
+#### channel\_alias\_col
+
+```python
+def channel_alias_col() -> str
+```
+
+Internal column name for the alias identifier on the channel_mapping table.
+
+Referenced by the dedup window in
+
+
+#### channel\_name\_col
+
+```python
+def channel_name_col() -> str
+```
+
+Internal column name for the channel-name identifier on the channel_metrics table.
+
+
#### project\_id\_col
```python
@@ -194,6 +281,72 @@ def parent_id_col() -> str
Internal column name for the parent/scope identifier.
+#### conversion\_factor\_col
+
+```python
+def conversion_factor_col() -> str
+```
+
+Internal column name for the conversion factor on the unit_conversion table.
+
+Also used as the column that carries the per-channel combined factor
+downstream from :meth:`KeyValueStoreSolver._compute_conversion_factors`
+into the grouped-map UDF.
+
+
+#### source\_unit\_col
+
+```python
+def source_unit_col() -> str
+```
+
+Internal column name for the source unit on the channel_mapping table.
+
+
+#### target\_unit\_col
+
+```python
+def target_unit_col() -> str
+```
+
+Internal column name for the target unit on the channel_mapping table.
+
+
+#### unit\_col
+
+```python
+def unit_col() -> str
+```
+
+Internal column name for the unit name on the unit_conversion table.
+
+
+#### group\_id\_col
+
+```python
+def group_id_col() -> str
+```
+
+Internal column name for the unit group id on the unit_conversion table.
+
+
+#### effective\_alias\_join\_keys
+
+```python
+def effective_alias_join_keys() -> list[tuple[str, str]]
+```
+
+Return the resolved alias-resolution join keys as ``(mapping_col, metrics_col)`` tuples.
+
+Falls back to the default composite key
+``[(source_channel_col, channel_name_col), (data_key_col, data_key_col)]``
+when :attr:`ChannelMappingConfig.join_keys` is ``None``. Otherwise
+returns the configured list.
+
+Both members of each tuple are column names **after**
+``column_name_mapping`` has been applied on the respective table.
+
+
#### col\_map
```python
diff --git a/docs/impulse/docs/references/api/impulse_reporting/config/config_parser.md b/docs/impulse/docs/references/api/impulse_reporting/config/config_parser.md
index 2eccf27..c12be71 100644
--- a/docs/impulse/docs/references/api/impulse_reporting/config/config_parser.md
+++ b/docs/impulse/docs/references/api/impulse_reporting/config/config_parser.md
@@ -146,6 +146,10 @@ configured) regardless of whether ``container_tags_table`` is set.
- `channels_uri` (`str`): Full Unity Catalog path to the channels data table.
- `channel_mapping_table` (`str`): Full Unity Catalog path to the channel mapping table. Required when using
``channel_with_alias()`` for logical alias resolution.
+- `unit_conversion_table` (`str`): Full Unity Catalog path to the unit conversion table. When set together
+with a ``channel_mapping_table`` whose rows carry ``source_unit`` and
+``target_unit`` columns, the query engine converts time-series values
+from the source to the target unit during ``solve()``.
## UnitySink
diff --git a/docs/impulse/docs/references/query_engine.md b/docs/impulse/docs/references/query_engine.md
index 150e928..fa07dfb 100644
--- a/docs/impulse/docs/references/query_engine.md
+++ b/docs/impulse/docs/references/query_engine.md
@@ -47,6 +47,7 @@ attributes are already wide on `container_metrics` itself.
| `container_tags` | required (narrow EAV) | optional (narrow EAV) |
| `channel_tags` | required (narrow EAV) | not used |
| `channel_mapping` | not used | optional (channel aliases) |
+| `unit_conversion` | not used | optional (per-alias unit conversion) |
See the [Silver Layer Schema](../data_model/silver_layer_schema.md) for
the columns each table is expected to carry.
diff --git a/docs/impulse/docs/references/tsal.md b/docs/impulse/docs/references/tsal.md
index d94d654..9542df7 100644
--- a/docs/impulse/docs/references/tsal.md
+++ b/docs/impulse/docs/references/tsal.md
@@ -51,6 +51,12 @@ Each keyword argument becomes a tag filter on the `channel_mapping` table; the s
to the physical channels at read time. Use this when the consuming code should not need to know which physical signal
backs a given logical name.
+When the `channel_mapping` table carries `source_unit` and `target_unit` columns and the report config sets
+`source.unit_conversion_table`, values returned from `channel_with_alias()` are automatically converted from source
+to target unit before any expression is evaluated. Constants and parameters in expressions over an aliased selector
+must therefore be expressed in the target unit. Direct selectors via `channel(...)` on the same physical channel are
+unaffected — conversion is a property of the alias, not of the channel.
+
---
## Operators
diff --git a/src/impulse_query_engine/analyze/metadata/time_series_expression.py b/src/impulse_query_engine/analyze/metadata/time_series_expression.py
index fbdacd2..7df22c1 100644
--- a/src/impulse_query_engine/analyze/metadata/time_series_expression.py
+++ b/src/impulse_query_engine/analyze/metadata/time_series_expression.py
@@ -598,7 +598,7 @@ def build(self, cache: SeriesCache) -> SampleSeries:
# TODO: select candidate
mid = candidates.container_id.iloc[0]
cid = candidates.channel_id.iloc[0]
- return cache.load_blob(mid, cid)
+ return cache.load_blob(mid, cid, uses_alias=self.uses_alias)
def get_required_tag_exprs(self) -> set[TagExpression]:
"""
diff --git a/src/impulse_query_engine/analyze/query/solvers/blob_solver.py b/src/impulse_query_engine/analyze/query/solvers/blob_solver.py
index 089190d..6e5bced 100644
--- a/src/impulse_query_engine/analyze/query/solvers/blob_solver.py
+++ b/src/impulse_query_engine/analyze/query/solvers/blob_solver.py
@@ -49,7 +49,7 @@ def resolve(self, selection):
idx = selection._expr.build_pandas(self.df)
return self.df[idx]
- def load_blob(self, container_id, channel_id):
+ def load_blob(self, container_id, channel_id, uses_alias: bool = False):
"""
Load a time series blob from disk.
@@ -59,6 +59,9 @@ def load_blob(self, container_id, channel_id):
Container ID.
channel_id : Any
Channel ID.
+ uses_alias : bool, optional
+ Unused by this cache (no unit conversion); accepted for
+ interface compatibility with :class:`SeriesCache`.
Returns
-------
diff --git a/src/impulse_query_engine/analyze/query/solvers/delta_solver.py b/src/impulse_query_engine/analyze/query/solvers/delta_solver.py
index cddf9df..30b545a 100644
--- a/src/impulse_query_engine/analyze/query/solvers/delta_solver.py
+++ b/src/impulse_query_engine/analyze/query/solvers/delta_solver.py
@@ -61,7 +61,7 @@ def resolve(self, selection):
idx = selection._expr.build_pandas(self.mdf)
return self.mdf[idx]
- def load_blob(self, mid, cid):
+ def load_blob(self, mid, cid, uses_alias: bool = False):
"""
Load a time series blob from the DataFrame.
@@ -71,6 +71,9 @@ def load_blob(self, mid, cid):
Container or measurement ID.
cid : Any
Channel ID.
+ uses_alias : bool, optional
+ Unused by this cache (no unit conversion); accepted for
+ interface compatibility with :class:`SeriesCache`.
Returns
-------
diff --git a/src/impulse_query_engine/analyze/query/solvers/empty_cache.py b/src/impulse_query_engine/analyze/query/solvers/empty_cache.py
index 84254bf..32ca0d1 100644
--- a/src/impulse_query_engine/analyze/query/solvers/empty_cache.py
+++ b/src/impulse_query_engine/analyze/query/solvers/empty_cache.py
@@ -25,7 +25,7 @@ def resolve(self, selection):
"""
return []
- def load_blob(self, mid, cid):
+ def load_blob(self, mid, cid, uses_alias: bool = False):
"""
Return an empty SampleSeries for any container and channel ID.
@@ -35,6 +35,9 @@ def load_blob(self, mid, cid):
Container or measurement ID.
cid : Any
Channel ID.
+ uses_alias : bool, optional
+ Unused by this cache; accepted for interface compatibility
+ with :class:`SeriesCache`.
Returns
-------
diff --git a/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py b/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py
index 2a1e518..b8d9898 100644
--- a/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py
+++ b/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py
@@ -30,16 +30,22 @@ def __init__(self, pdf, col_map: dict[str, str]):
Parameters
----------
pdf : pd.DataFrame
- DataFrame containing time series data.
+ DataFrame containing time series data. When the column named by
+ ``col_map["conv"]`` is present, :meth:`load_blob` multiplies the
+ loaded values by that per-channel factor. All rows of a given
+ ``(cid, ch)`` slice are expected to share the same factor.
col_map : dict[str, str]
Mapping with keys ``"cid"``, ``"ch"``, ``"ts"``, ``"te"``,
- ``"val"`` to the actual column names in *pdf*.
+ ``"val"``, ``"conv"`` to the actual column names in *pdf*. The
+ ``"conv"`` column is optional in *pdf*.
"""
self._cid_col = col_map["cid"]
self._ch_col = col_map["ch"]
self._ts_col = col_map["ts"]
self._te_col = col_map["te"]
self._val_col = col_map["val"]
+ self._conv_col = col_map.get("conv")
+ self._has_conversion = self._conv_col is not None and self._conv_col in pdf.columns
meta = pdf.drop(columns=[self._ts_col, self._te_col, self._val_col])
self.mdf = meta.drop_duplicates(subset=[self._cid_col, self._ch_col]).reset_index()
@@ -67,16 +73,26 @@ def resolve(self, selection):
idx = selection._expr.build_pandas(self.mdf)
return self.mdf[idx]
- def load_blob(self, mid, cid):
+ def load_blob(self, mid, cid, uses_alias: bool = False):
"""
Load a time series blob from the DataFrame.
+ When the underlying *pdf* carries a conversion-factor column (the
+ column named by ``col_map["conv"]``) **and** the caller is an
+ aliased selector (``uses_alias=True``), the returned values are
+ multiplied by that factor. Direct selectors on the same physical
+ channel always receive raw values — unit conversion is a property
+ of the alias, not of the channel.
+
Parameters
----------
mid : Any
Container or measurement ID.
cid : Any
Channel ID.
+ uses_alias : bool, optional
+ ``True`` when the calling selector resolved via channel_mapping.
+ Gates the per-channel conversion factor; defaults to ``False``.
Returns
-------
@@ -84,7 +100,12 @@ def load_blob(self, mid, cid):
The loaded sample series object.
"""
s = self.pdf[(self.pdf[self._cid_col] == mid) & (self.pdf[self._ch_col] == cid)]
- return SampleSeries(s[self._ts_col], s[self._te_col], s[self._val_col])
+ values = s[self._val_col]
+ if self._has_conversion and len(s) > 0 and uses_alias:
+ factor = s[self._conv_col].iloc[0]
+ if pd.notna(factor):
+ values = values * factor
+ return SampleSeries(s[self._ts_col], s[self._te_col], values)
class KeyValueStoreSolver(QuerySolver):
@@ -382,28 +403,46 @@ def filter_aliased_channel_metrics(
resolved_mapping = channel_mapping.where(self._build_expr(selectors))
- channel_metrics = db.channel_metrics(spark).join(
+ channel_metrics = db.channel_metrics(spark)
+ channel_metrics = self._apply_column_mapping(
+ channel_metrics, self.config.channel_metrics.column_name_mapping
+ )
+ channel_metrics = channel_metrics.join(
F.broadcast(container_df.select(container_id_col)),
on=[container_id_col],
how="inner",
)
alias_priority_col = self.config.alias_priority_col
+ channel_alias_col = self.config.channel_alias_col
+ join_keys = self.config.effective_alias_join_keys
+
+ source_unit_col = self.config.source_unit_col
+ target_unit_col = self.config.target_unit_col
+ has_unit_cols = (
+ db.config.unit_conversion_table is not None
+ and source_unit_col in resolved_mapping.columns
+ and target_unit_col in resolved_mapping.columns
+ )
+
+ # Mapping-side projection: one aliased copy per mapping_col plus the
+ # alias / priority columns (and the optional unit columns).
+ mapping_select_cols = [
+ F.col(mapping_col).alias(f"_map_{mapping_col}") for mapping_col, _ in join_keys
+ ]
+ mapping_select_cols.extend([F.col(channel_alias_col), F.col(alias_priority_col)])
+ if has_unit_cols:
+ mapping_select_cols.extend([F.col(source_unit_col), F.col(target_unit_col)])
resolved = channel_metrics.join(
- resolved_mapping.select(
- F.col("source_channel").alias("_map_source_channel"),
- F.col("data_key").alias("_map_data_key"),
- F.col("channel_alias"),
- F.col(alias_priority_col),
- ),
+ resolved_mapping.select(*mapping_select_cols),
on=[
- channel_metrics["channel_name"] == F.col("_map_source_channel"),
- channel_metrics["data_key"] == F.col("_map_data_key"),
+ channel_metrics[metrics_col] == F.col(f"_map_{mapping_col}")
+ for mapping_col, metrics_col in join_keys
],
how="inner",
)
- dedup_window = Window.partitionBy(container_id_col, "channel_alias").orderBy(
+ dedup_window = Window.partitionBy(container_id_col, channel_alias_col).orderBy(
F.col(alias_priority_col).asc_nulls_last()
)
resolved = resolved.withColumn("_rank", F.row_number().over(dedup_window))
@@ -412,7 +451,10 @@ def filter_aliased_channel_metrics(
resolved = resolved.withColumn(
"selector_ids", F.array(self._build_selector_id_expr(selectors))
)
- return resolved.select(container_id_col, channel_id_col, "selector_ids")
+ out_cols = [container_id_col, channel_id_col, "selector_ids"]
+ if has_unit_cols:
+ out_cols.extend([source_unit_col, target_unit_col])
+ return resolved.select(*out_cols)
def resolve_channel_selections(
self, spark, channel_metrics_df, aliased_channel_metrics_df
@@ -420,6 +462,13 @@ def resolve_channel_selections(
"""
Union direct and aliased channel metrics, combining selector_ids.
+ When the aliased side carries ``source_unit`` / ``target_unit``
+ columns (added by :meth:`filter_aliased_channel_metrics` when a
+ unit conversion table is configured), those columns are preserved
+ through the union and aggregation. Direct selectors produce null
+ unit columns, which causes the downstream conversion-factor join
+ in :meth:`solve` to leave their values unchanged.
+
Parameters
----------
spark : SparkSession
@@ -432,13 +481,114 @@ def resolve_channel_selections(
Returns
-------
pyspark.sql.DataFrame
- Merged DataFrame with ``(container_id, channel_id, selector_ids)``.
+ Merged DataFrame with ``(container_id, channel_id, selector_ids)``
+ (plus ``source_unit`` / ``target_unit`` when present on the
+ aliased side).
"""
- merged = channel_metrics_df.unionByName(aliased_channel_metrics_df)
+ source_unit_col = self.config.source_unit_col
+ target_unit_col = self.config.target_unit_col
+ has_unit_cols = (
+ source_unit_col in aliased_channel_metrics_df.columns
+ and target_unit_col in aliased_channel_metrics_df.columns
+ )
+
+ merged = channel_metrics_df.unionByName(
+ aliased_channel_metrics_df, allowMissingColumns=has_unit_cols
+ )
+
+ agg_exprs = [F.flatten(F.collect_list("selector_ids")).alias("selector_ids")]
+ if has_unit_cols:
+ agg_exprs.append(F.first(source_unit_col, ignorenulls=True).alias(source_unit_col))
+ agg_exprs.append(F.first(target_unit_col, ignorenulls=True).alias(target_unit_col))
+
return merged.groupBy(
self.config.container_id_col,
self.config.channel_id_col,
- ).agg(F.flatten(F.collect_list("selector_ids")).alias("selector_ids"))
+ ).agg(*agg_exprs)
+
+ # ------------------------------------------------------------------
+ # Unit conversion
+ # ------------------------------------------------------------------
+
+ def _compute_conversion_factors(self, spark, query, channels_df: DataFrame) -> DataFrame:
+ """
+ Join *channels_df* with the unit conversion table to compute a
+ per-channel combined conversion factor.
+
+ The unit conversion table associates each unit with a base-unit
+ scaling factor inside a unit family (``group_id``). For a row with
+ ``source_unit = S``, ``target_unit = T`` belonging to family ``G``:
+
+ - ``_src_factor`` converts a value in ``S`` to the base unit of ``G``.
+ - ``_tgt_factor`` converts a value in ``T`` to the base unit of ``G``.
+ - The combined factor that converts ``S`` to ``T`` is
+ ``_src_factor / _tgt_factor``.
+
+ Rows whose source or target unit is missing on the table — or whose
+ source/target units belong to different families — receive a null
+ factor. Null factors are treated as "no conversion" by the cache.
+
+ Parameters
+ ----------
+ spark : SparkSession
+ Active Spark session.
+ query : QueryBuilder
+ Query object carrying the configured ``db``.
+ channels_df : pyspark.sql.DataFrame
+ DataFrame that already carries ``source_unit`` / ``target_unit``
+ columns (added by :meth:`filter_aliased_channel_metrics`).
+
+ Returns
+ -------
+ pyspark.sql.DataFrame
+ *channels_df* augmented with a ``conversion_factor`` column.
+ """
+ uc_table = query.db.unit_conversion(spark)
+ uc_table = self._apply_column_mapping(
+ uc_table, self.config.unit_conversion.column_name_mapping
+ )
+
+ unit_col = self.config.unit_col
+ group_id_col = self.config.group_id_col
+ factor_col = self.config.conversion_factor_col
+ source_unit_col = self.config.source_unit_col
+ target_unit_col = self.config.target_unit_col
+
+ # Source-side join: fetch _src_factor and _src_group_id.
+ channels_df = channels_df.join(
+ F.broadcast(
+ uc_table.select(
+ F.col(unit_col).alias("_src_unit"),
+ F.col(factor_col).alias("_src_factor"),
+ F.col(group_id_col).alias("_src_group_id"),
+ )
+ ),
+ on=[channels_df[source_unit_col] == F.col("_src_unit")],
+ how="left",
+ ).drop("_src_unit")
+
+ # Target-side join: must belong to the same unit family.
+ channels_df = channels_df.join(
+ F.broadcast(
+ uc_table.select(
+ F.col(unit_col).alias("_tgt_unit"),
+ F.col(factor_col).alias("_tgt_factor"),
+ F.col(group_id_col).alias("_tgt_group_id"),
+ )
+ ),
+ on=[
+ channels_df[target_unit_col] == F.col("_tgt_unit"),
+ F.col("_src_group_id") == F.col("_tgt_group_id"),
+ ],
+ how="left",
+ ).drop("_tgt_unit", "_tgt_group_id")
+
+ channels_df = channels_df.withColumn(
+ factor_col,
+ F.col("_src_factor") / F.col("_tgt_factor"),
+ ).drop("_src_factor", "_src_group_id", "_tgt_factor")
+
+ return channels_df
# ------------------------------------------------------------------
# Solve
@@ -478,6 +628,13 @@ def solve(self, query, channels_df, selections, dtypes) -> DataFrame:
"""
Solve the query by grouping channels and applying selections.
+ When a ``unit_conversion_table`` is configured on the database and
+ *channels_df* carries ``source_unit`` / ``target_unit`` columns
+ (added upstream by :meth:`filter_aliased_channel_metrics`),
+ per-channel conversion factors are computed and propagated into
+ the grouped-map UDF so that time-series values are converted from
+ the source to the target unit on the fly.
+
Parameters
----------
query : QueryBuilder
@@ -495,6 +652,20 @@ def solve(self, query, channels_df, selections, dtypes) -> DataFrame:
DataFrame containing results for each container.
"""
col_map = self.config.col_map
+ source_unit_col = self.config.source_unit_col
+ target_unit_col = self.config.target_unit_col
+
+ has_conversion_table = getattr(query.db.config, "unit_conversion_table", None) is not None
+ has_unit_cols = (
+ source_unit_col in channels_df.columns and target_unit_col in channels_df.columns
+ )
+
+ if has_conversion_table and has_unit_cols:
+ channels_df = self._compute_conversion_factors(self.spark, query, channels_df)
+
+ for col_name in (source_unit_col, target_unit_col):
+ if col_name in channels_df.columns:
+ channels_df = channels_df.drop(col_name)
q = query.db.channels(self.spark)
q = self._apply_column_mapping(q, self.config.channels.column_name_mapping)
diff --git a/src/impulse_query_engine/analyze/query/solvers/series_cache.py b/src/impulse_query_engine/analyze/query/solvers/series_cache.py
index b2f7e2f..7ad955f 100644
--- a/src/impulse_query_engine/analyze/query/solvers/series_cache.py
+++ b/src/impulse_query_engine/analyze/query/solvers/series_cache.py
@@ -24,7 +24,7 @@ def resolve(self, selection) -> pd.DataFrame:
pass
@abstractmethod
- def load_blob(self, mid, cid) -> SampleSeries:
+ def load_blob(self, mid, cid, uses_alias: bool = False) -> SampleSeries:
"""
Resolve given mid and cid to a series.
@@ -34,6 +34,13 @@ def load_blob(self, mid, cid) -> SampleSeries:
Container or measurement ID.
cid : Any
Channel ID.
+ uses_alias : bool, optional
+ ``True`` when the calling selector resolves the channel via a
+ ``channel_mapping`` alias. Caches that perform unit conversion
+ (e.g. :class:`KVSTimeSeriesCache`) only apply the per-channel
+ conversion factor when this is ``True``, so a direct selector
+ on the same physical channel always returns raw values.
+ Defaults to ``False`` (direct / no-conversion semantics).
Returns
-------
diff --git a/src/impulse_query_engine/analyze/query/solvers/solver_config.py b/src/impulse_query_engine/analyze/query/solvers/solver_config.py
index 5536b3f..a743e4c 100644
--- a/src/impulse_query_engine/analyze/query/solvers/solver_config.py
+++ b/src/impulse_query_engine/analyze/query/solvers/solver_config.py
@@ -38,6 +38,49 @@ class TableConfig(BaseModel):
filters: dict[str, str] = {}
+class JoinKey(BaseModel):
+ """A single column pair in the ``channel_mapping`` → ``channel_metrics`` join.
+
+ Used by :class:`ChannelMappingConfig.join_keys` to override the default
+ alias-resolution composite key.
+
+ Both fields reference column names **after** ``column_name_mapping`` has
+ been applied on the respective table; the two sides are independent, so
+ a column may appear under different names on the two tables.
+
+ Attributes
+ ----------
+ mapping_col : str
+ Column name on ``channel_mapping`` after its ``column_name_mapping``
+ has been applied.
+ metrics_col : str
+ Column name on ``channel_metrics`` after its ``column_name_mapping``
+ has been applied.
+ """
+
+ mapping_col: str
+ metrics_col: str
+
+
+class ChannelMappingConfig(TableConfig):
+ """``TableConfig`` plus an optional alias-resolution join-key spec.
+
+ Attributes
+ ----------
+ join_keys : list[JoinKey] or None
+ Custom composite key for the ``channel_mapping`` → ``channel_metrics``
+ join performed by ``KeyValueStoreSolver.filter_aliased_channel_metrics``.
+ When ``None`` (the default), the solver uses the backward-compatible
+ pair ``[(source_channel, channel_name), (data_key, data_key)]``
+ sourced from :class:`SolverConfig` internal-name properties.
+ Provide a custom list to change the join arity or column choice
+ (e.g. a single-column join when ``data_key`` is not part of the
+ channel identity in your silver layout).
+ """
+
+ join_keys: list[JoinKey] | None = None
+
+
class SolverConfig(BaseModel):
"""Per-table configuration for solver column name mappings and filters.
@@ -60,10 +103,13 @@ class SolverConfig(BaseModel):
Column mappings and filters for the channel tags table.
channel_metrics : TableConfig
Column mappings and filters for the channel metrics table.
- channel_mapping : TableConfig
- Column mappings and filters for the channel mapping (alias) table.
+ channel_mapping : ChannelMappingConfig
+ Column mappings, filters, and the alias-resolution ``join_keys``
+ override for the channel mapping (alias) table.
channels : TableConfig
Column mappings and filters for the channel data table.
+ unit_conversion : TableConfig
+ Column mappings and filters for the unit conversion table.
"""
project_id: str | None = None
@@ -72,8 +118,9 @@ class SolverConfig(BaseModel):
container_metrics: TableConfig = TableConfig()
channel_tags: TableConfig = TableConfig()
channel_metrics: TableConfig = TableConfig()
- channel_mapping: TableConfig = TableConfig()
+ channel_mapping: ChannelMappingConfig = ChannelMappingConfig()
channels: TableConfig = TableConfig()
+ unit_conversion: TableConfig = TableConfig()
# ------------------------------------------------------------------
# Class methods
@@ -166,6 +213,44 @@ def alias_priority_col(self) -> str:
"""Internal column name for the alias priority on the channel_mapping table."""
return "priority"
+ @property
+ def source_channel_col(self) -> str:
+ """Internal column name for the source-channel identifier on the channel_mapping table."""
+ return "source_channel"
+
+ @property
+ def data_key_col(self) -> str:
+ """Internal column name for the data-key identifier.
+
+ Default present on both ``channel_mapping`` and ``channel_metrics``;
+ used by the default :meth:`effective_alias_join_keys` for both sides.
+ Layouts where the two tables carry the data-key column under different
+ physical names can either rename both to ``"data_key"`` via per-table
+ ``column_name_mapping`` or override
+ :attr:`ChannelMappingConfig.join_keys` with explicit
+ ``mapping_col`` / ``metrics_col`` values.
+ """
+ return "data_key"
+
+ @property
+ def channel_alias_col(self) -> str:
+ """Internal column name for the alias identifier on the channel_mapping table.
+
+ Referenced by the dedup window in
+ :meth:`KeyValueStoreSolver.filter_aliased_channel_metrics` and is the
+ conventional kwarg name passed to
+ :meth:`QueryBuilder.channel_with_alias` (e.g.
+ ``channel_with_alias(channel_alias="vehicle_speed")``). The kwarg name
+ must match the column name as seen by the solver after
+ ``column_name_mapping`` is applied.
+ """
+ return "channel_alias"
+
+ @property
+ def channel_name_col(self) -> str:
+ """Internal column name for the channel-name identifier on the channel_metrics table."""
+ return "channel_name"
+
@property
def project_id_col(self) -> str:
"""Internal column name for the project identifier."""
@@ -176,6 +261,55 @@ def parent_id_col(self) -> str:
"""Internal column name for the parent/scope identifier."""
return "parent_id"
+ @property
+ def conversion_factor_col(self) -> str:
+ """Internal column name for the conversion factor on the unit_conversion table.
+
+ Also used as the column that carries the per-channel combined factor
+ downstream from :meth:`KeyValueStoreSolver._compute_conversion_factors`
+ into the grouped-map UDF.
+ """
+ return "conversion_factor"
+
+ @property
+ def source_unit_col(self) -> str:
+ """Internal column name for the source unit on the channel_mapping table."""
+ return "source_unit"
+
+ @property
+ def target_unit_col(self) -> str:
+ """Internal column name for the target unit on the channel_mapping table."""
+ return "target_unit"
+
+ @property
+ def unit_col(self) -> str:
+ """Internal column name for the unit name on the unit_conversion table."""
+ return "unit"
+
+ @property
+ def group_id_col(self) -> str:
+ """Internal column name for the unit group id on the unit_conversion table."""
+ return "group_id"
+
+ @property
+ def effective_alias_join_keys(self) -> list[tuple[str, str]]:
+ """Return the resolved alias-resolution join keys as ``(mapping_col, metrics_col)`` tuples.
+
+ Falls back to the default composite key
+ ``[(source_channel_col, channel_name_col), (data_key_col, data_key_col)]``
+ when :attr:`ChannelMappingConfig.join_keys` is ``None``. Otherwise
+ returns the configured list.
+
+ Both members of each tuple are column names **after**
+ ``column_name_mapping`` has been applied on the respective table.
+ """
+ if self.channel_mapping.join_keys is None:
+ return [
+ (self.source_channel_col, self.channel_name_col),
+ (self.data_key_col, self.data_key_col),
+ ]
+ return [(jk.mapping_col, jk.metrics_col) for jk in self.channel_mapping.join_keys]
+
@property
def col_map(self) -> dict[str, str]:
"""Short-key → internal-column-name mapping for UDFs and caches."""
@@ -185,4 +319,5 @@ def col_map(self) -> dict[str, str]:
"ts": self.tstart_col,
"te": self.tend_col,
"val": self.value_col,
+ "conv": self.conversion_factor_col,
}
diff --git a/src/impulse_query_engine/measurement_db.py b/src/impulse_query_engine/measurement_db.py
index 36a1c26..c0ba27c 100644
--- a/src/impulse_query_engine/measurement_db.py
+++ b/src/impulse_query_engine/measurement_db.py
@@ -15,6 +15,7 @@ def __init__(
channel_metrics_table=None,
channels_uri=None,
channel_mapping_table=None,
+ unit_conversion_table=None,
table_locations: str = "external_locations",
):
self.container_tags_table = container_tags_table
@@ -23,6 +24,7 @@ def __init__(
self.channel_metrics_table = channel_metrics_table
self.channels_uri = channels_uri
self.channel_mapping_table = channel_mapping_table
+ self.unit_conversion_table = unit_conversion_table
self.table_locations = table_locations
self.debug_tables = None
@@ -31,6 +33,7 @@ def for_unity_catalog(
catalog_name: str,
core_schema_name: str = "core",
channel_mapping_table: str | None = None,
+ unit_conversion_table: str | None = None,
):
return MeasurementDBConfig(
container_tags_table=f"{catalog_name}.{core_schema_name}.container_tags",
@@ -39,6 +42,7 @@ def for_unity_catalog(
channel_metrics_table=f"{catalog_name}.{core_schema_name}.channel_metrics",
channels_uri=f"{catalog_name}.{core_schema_name}.channels",
channel_mapping_table=channel_mapping_table,
+ unit_conversion_table=unit_conversion_table,
table_locations="unity_catalog",
)
@@ -57,6 +61,9 @@ def for_debug(debug_tables):
channel_mapping_table=(
"channel_mapping" if "channel_mapping" in debug_tables else None
),
+ unit_conversion_table=(
+ "unit_conversion" if "unit_conversion" in debug_tables else None
+ ),
table_locations="debug",
)
cfg.debug_tables = debug_tables
@@ -101,6 +108,11 @@ def channel_mapping(self, spark) -> DataFrame:
raise ValueError("channel_mapping_table is not configured")
return self._read_table(spark, self.config.channel_mapping_table)
+ def unit_conversion(self, spark) -> DataFrame:
+ if self.config.unit_conversion_table is None:
+ raise ValueError("unit_conversion_table is not configured")
+ return self._read_table(spark, self.config.unit_conversion_table)
+
def channel_uri(self):
return self.config.channels_uri
diff --git a/src/impulse_reporting/config/config_parser.py b/src/impulse_reporting/config/config_parser.py
index 198b445..4953448 100644
--- a/src/impulse_reporting/config/config_parser.py
+++ b/src/impulse_reporting/config/config_parser.py
@@ -204,6 +204,11 @@ class Source(BaseModel):
channel_mapping_table : str, optional
Full Unity Catalog path to the channel mapping table. Required when using
``channel_with_alias()`` for logical alias resolution.
+ unit_conversion_table : str, optional
+ Full Unity Catalog path to the unit conversion table. When set together
+ with a ``channel_mapping_table`` whose rows carry ``source_unit`` and
+ ``target_unit`` columns, the query engine converts time-series values
+ from the source to the target unit during ``solve()``.
Notes
-----
@@ -217,6 +222,7 @@ class Source(BaseModel):
channel_metrics_table: Annotated[str, AfterValidator(is_valid_table_name)]
channels_uri: Annotated[str, AfterValidator(is_valid_table_name)]
channel_mapping_table: Annotated[str, AfterValidator(is_valid_table_name)] | None = None
+ unit_conversion_table: Annotated[str, AfterValidator(is_valid_table_name)] | None = None
class UnitySink(BaseModel):
diff --git a/src/impulse_reporting/core/report.py b/src/impulse_reporting/core/report.py
index ab09ee4..7b73278 100644
--- a/src/impulse_reporting/core/report.py
+++ b/src/impulse_reporting/core/report.py
@@ -859,6 +859,13 @@ def determine_report(self, is_incremental: bool = None):
# Validate that every aggregation references a registered event
self._validate_aggregation_events()
+ # TODO: port unit-consistency sanity check from MDA Framework
+ # (`mda_reporting/util/unit_sanity_check.py`). When a
+ # `unit_conversion_table` is configured, walk all aggregation /
+ # event expressions and emit a UserWarning for each aliased
+ # selector whose source_unit differs from target_unit so the
+ # caller knows to express formula constants in target units.
+
# Clean up temp tables from previous runs
self._cleanup_temp_tables()
diff --git a/tests/conftest.py b/tests/conftest.py
index d29fa74..e91a383 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -334,3 +334,69 @@ def key_value_store_alias_db(
cfg = MeasurementDBConfig.for_debug(tables)
cfg.channel_mapping_table = "channel_mapping"
return MeasurementDB(cfg, ws=mock_workspace_client)
+
+
+@pytest.fixture(scope="session")
+def unit_conversion_dataframes(spark):
+ """Load unit-conversion test CSVs into cached in-memory DataFrames.
+
+ Hands DataFrames directly to MeasurementDB (via ``for_debug``) instead of
+ persisting them through Delta — the alias-style write-then-read fixture
+ occasionally hit Delta ``ProtocolChangedException`` during macOS test
+ runs. Caching the DataFrames once per session keeps the data stable.
+ """
+ base_path = os.path.dirname(os.path.abspath(__file__))
+ base_path = base_path[: base_path.find("tests")]
+
+ container_tags_path = f"{base_path}/tests/unit/data/key_value_store_csv/container_metrics.csv"
+ container_metric_path = f"{base_path}/tests/unit/data/basic_narrow_csv/container_metrics.csv"
+ channel_metric_path = (
+ f"{base_path}/tests/unit/data/key_value_store_alias_csv/channel_metrics.csv"
+ )
+ channels_path = f"{base_path}/tests/unit/data/basic_narrow_csv/channel_data.csv"
+ channel_mapping_path = (
+ f"{base_path}/tests/unit/data/key_value_store_unit_conversion_csv/channel_mapping.csv"
+ )
+ unit_conversion_path = (
+ f"{base_path}/tests/unit/data/key_value_store_unit_conversion_csv/unit_conversion.csv"
+ )
+
+ options = {"header": "True", "delimiter": ",", "inferSchema": "True"}
+
+ def _load(path):
+ df = spark.read.options(**options).csv(path).cache()
+ df.count()
+ return df
+
+ return {
+ "container_tags": _load(container_tags_path),
+ "container_metrics": _load(container_metric_path),
+ "channel_metrics": _load(channel_metric_path),
+ "channels": _load(channels_path),
+ "channel_mapping": _load(channel_mapping_path),
+ "unit_conversion": _load(unit_conversion_path),
+ }
+
+
+@pytest.fixture
+def key_value_store_unit_conversion_db(
+ unit_conversion_dataframes, mock_workspace_client
+) -> MeasurementDB:
+ """Return a key-value-store MeasurementDB with unit conversion configured."""
+ cfg = MeasurementDBConfig.for_debug(unit_conversion_dataframes)
+ cfg.channel_mapping_table = "channel_mapping"
+ cfg.unit_conversion_table = "unit_conversion"
+ return MeasurementDB(cfg, ws=mock_workspace_client)
+
+
+@pytest.fixture
+def key_value_store_unit_conversion_db_no_table(
+ unit_conversion_dataframes, mock_workspace_client
+) -> MeasurementDB:
+ """Same data as ``key_value_store_unit_conversion_db`` but with
+ ``unit_conversion_table=None`` to test the opt-out path."""
+ tables = {k: v for k, v in unit_conversion_dataframes.items() if k != "unit_conversion"}
+ cfg = MeasurementDBConfig.for_debug(tables)
+ cfg.channel_mapping_table = "channel_mapping"
+ # Explicitly leave unit_conversion_table = None
+ return MeasurementDB(cfg, ws=mock_workspace_client)
diff --git a/tests/impulse_query_engine/integration/kvs_solver_test.py b/tests/impulse_query_engine/integration/kvs_solver_test.py
index 6fca0d2..c79825e 100644
--- a/tests/impulse_query_engine/integration/kvs_solver_test.py
+++ b/tests/impulse_query_engine/integration/kvs_solver_test.py
@@ -19,6 +19,7 @@
KeyValueStoreSolver,
)
from impulse_query_engine.analyze.query.solvers.solver_config import (
+ ChannelMappingConfig,
SolverConfig,
TableConfig,
)
@@ -29,7 +30,7 @@ def _kvs_cfg(
project_id: str = "SAMPLE_PROJECT",
container_tags: TableConfig | None = None,
container_metrics: TableConfig | None = None,
- channel_mapping: TableConfig | None = None,
+ channel_mapping: ChannelMappingConfig | None = None,
) -> SolverConfig:
"""Build a SolverConfig wired up for the KVS test data.
@@ -42,7 +43,7 @@ def _kvs_cfg(
container_tags=container_tags or TableConfig(column_name_mapping={"element_id": "key"}),
container_metrics=container_metrics
or TableConfig(column_name_mapping={"project": "project_id"}),
- channel_mapping=channel_mapping or TableConfig(),
+ channel_mapping=channel_mapping or ChannelMappingConfig(),
)
@@ -222,7 +223,7 @@ def test_solve_with_aliased_channel(
solver = KeyValueStoreSolver(
spark,
config=_kvs_cfg(
- channel_mapping=TableConfig(filters={"toolbox_id": "container_concept"}),
+ channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}),
),
)
query = key_value_store_alias_db.query
diff --git a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_alias_test.py b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_alias_test.py
index 02c3798..e229b4f 100644
--- a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_alias_test.py
+++ b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_alias_test.py
@@ -11,6 +11,8 @@
KeyValueStoreSolver,
)
from impulse_query_engine.analyze.query.solvers.solver_config import (
+ ChannelMappingConfig,
+ JoinKey,
SolverConfig,
TableConfig,
)
@@ -31,7 +33,7 @@ def test_no_aliased_selections_returns_empty(
config=SolverConfig(
project_id="SAMPLE_PROJECT",
container_metrics=TableConfig(column_name_mapping={"project": "project_id"}),
- channel_mapping=TableConfig(filters={"toolbox_id": "container_concept"}),
+ channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}),
),
)
query = key_value_store_alias_db.query
@@ -52,7 +54,7 @@ def test_alias_resolves_to_correct_channels(
config=SolverConfig(
project_id="SAMPLE_PROJECT",
container_metrics=TableConfig(column_name_mapping={"project": "project_id"}),
- channel_mapping=TableConfig(filters={"toolbox_id": "container_concept"}),
+ channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}),
),
)
query = key_value_store_alias_db.query
@@ -80,7 +82,7 @@ def test_alias_scoped_by_project_id(
config=SolverConfig(
project_id="NON_EXISTENT_PROJECT",
container_metrics=TableConfig(column_name_mapping={"project": "project_id"}),
- channel_mapping=TableConfig(filters={"toolbox_id": "container_concept"}),
+ channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}),
),
)
query = key_value_store_alias_db.query
@@ -100,7 +102,9 @@ def test_alias_scoped_by_toolbox_id(
config=SolverConfig(
project_id="SAMPLE_PROJECT",
container_metrics=TableConfig(column_name_mapping={"project": "project_id"}),
- channel_mapping=TableConfig(filters={"toolbox_id": "non_existent_toolbox"}),
+ channel_mapping=ChannelMappingConfig(
+ filters={"toolbox_id": "non_existent_toolbox"}
+ ),
),
)
query = key_value_store_alias_db.query
@@ -120,7 +124,7 @@ def test_selector_id_consistent_for_same_expression(
config=SolverConfig(
project_id="SAMPLE_PROJECT",
container_metrics=TableConfig(column_name_mapping={"project": "project_id"}),
- channel_mapping=TableConfig(filters={"toolbox_id": "container_concept"}),
+ channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}),
),
)
query = key_value_store_alias_db.query
@@ -140,7 +144,7 @@ def test_multiple_aliases(self, spark: SparkSession, key_value_store_alias_db: M
config=SolverConfig(
project_id="SAMPLE_PROJECT",
container_metrics=TableConfig(column_name_mapping={"project": "project_id"}),
- channel_mapping=TableConfig(filters={"toolbox_id": "container_concept"}),
+ channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}),
),
)
query = key_value_store_alias_db.query
@@ -165,7 +169,7 @@ def test_solve_with_alias_only(
config=SolverConfig(
project_id="SAMPLE_PROJECT",
container_metrics=TableConfig(column_name_mapping={"project": "project_id"}),
- channel_mapping=TableConfig(filters={"toolbox_id": "container_concept"}),
+ channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}),
),
)
query = key_value_store_alias_db.query
@@ -185,7 +189,7 @@ def test_solve_with_mixed_direct_and_alias(
config=SolverConfig(
project_id="SAMPLE_PROJECT",
container_metrics=TableConfig(column_name_mapping={"project": "project_id"}),
- channel_mapping=TableConfig(filters={"toolbox_id": "container_concept"}),
+ channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}),
),
)
query = key_value_store_alias_db.query
@@ -209,7 +213,7 @@ def test_solve_deduplication(
config=SolverConfig(
project_id="SAMPLE_PROJECT",
container_metrics=TableConfig(column_name_mapping={"project": "project_id"}),
- channel_mapping=TableConfig(filters={"toolbox_id": "container_concept"}),
+ channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}),
),
)
query = key_value_store_alias_db.query
@@ -241,7 +245,7 @@ def test_alias_returns_same_channel_data_as_direct_engine_rpm(
config=SolverConfig(
project_id="SAMPLE_PROJECT",
container_metrics=TableConfig(column_name_mapping={"project": "project_id"}),
- channel_mapping=TableConfig(filters={"toolbox_id": "container_concept"}),
+ channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}),
),
)
query = key_value_store_alias_db.query
@@ -304,3 +308,130 @@ def test_alias_returns_same_channel_data_as_direct_engine_rpm(
def test_channel_with_alias_without_mapping_raises(self, key_value_store_db: MeasurementDB):
with pytest.raises(ValueError, match="channel_mapping_table is not configured"):
key_value_store_db.query.channel_with_alias(channel_alias="engine_speed")
+
+
+class TestConfigurableJoinKeys:
+ """Behavior of the configurable ``channel_mapping.join_keys`` override."""
+
+ def test_single_column_join_key(
+ self, spark: SparkSession, key_value_store_alias_db: MeasurementDB
+ ):
+ # Single-column join on source_channel == channel_name only; data_key
+ # is intentionally dropped from the join. The alias resolution still
+ # works and the (container_id, channel_alias) dedup keeps results
+ # unique.
+ solver = KeyValueStoreSolver(
+ spark,
+ config=SolverConfig(
+ project_id="SAMPLE_PROJECT",
+ container_metrics=TableConfig(column_name_mapping={"project": "project_id"}),
+ channel_mapping=ChannelMappingConfig(
+ filters={"toolbox_id": "container_concept"},
+ join_keys=[
+ JoinKey(mapping_col="source_channel", metrics_col="channel_name"),
+ ],
+ ),
+ ),
+ )
+ query = key_value_store_alias_db.query
+ engine_speed = query.channel_with_alias(channel_alias="engine_speed").alias("engine_speed")
+
+ pdf = query.select(engine_speed).toPandas(spark, solver=solver)
+ pdf = pdf.sort_values("container_id").reset_index(drop=True)
+
+ assert pdf["container_id"].tolist() == [1, 2, 3]
+ assert all(length > 0 for length in pdf["engine_speed"].map(len))
+
+ def test_different_data_key_names_per_side_via_rename(
+ self, spark: SparkSession, key_value_store_alias_db: MeasurementDB
+ ):
+ # Path 1: rename both physical `data_key` columns to a common
+ # internal name (here we use a non-default name `dk`). Two
+ # JoinKey entries cover the composite key.
+ solver = KeyValueStoreSolver(
+ spark,
+ config=SolverConfig(
+ project_id="SAMPLE_PROJECT",
+ container_metrics=TableConfig(column_name_mapping={"project": "project_id"}),
+ channel_metrics=TableConfig(column_name_mapping={"data_key": "dk"}),
+ channel_mapping=ChannelMappingConfig(
+ column_name_mapping={"data_key": "dk"},
+ filters={"toolbox_id": "container_concept"},
+ join_keys=[
+ JoinKey(mapping_col="source_channel", metrics_col="channel_name"),
+ JoinKey(mapping_col="dk", metrics_col="dk"),
+ ],
+ ),
+ ),
+ )
+ query = key_value_store_alias_db.query
+ engine_speed = query.channel_with_alias(channel_alias="engine_speed").alias("engine_speed")
+
+ pdf = query.select(engine_speed).toPandas(spark, solver=solver)
+ pdf = pdf.sort_values("container_id").reset_index(drop=True)
+ assert pdf["container_id"].tolist() == [1, 2, 3]
+ assert all(length > 0 for length in pdf["engine_speed"].map(len))
+
+ def test_different_data_key_names_per_side_via_join_keys(
+ self, spark: SparkSession, key_value_store_alias_db: MeasurementDB
+ ):
+ # Path 2: rename the two physical `data_key` columns to *different*
+ # internal names per table and reference them directly in
+ # join_keys. No common-name rename — the JoinKey's two sides are
+ # independent.
+ solver = KeyValueStoreSolver(
+ spark,
+ config=SolverConfig(
+ project_id="SAMPLE_PROJECT",
+ container_metrics=TableConfig(column_name_mapping={"project": "project_id"}),
+ channel_metrics=TableConfig(column_name_mapping={"data_key": "metrics_dk"}),
+ channel_mapping=ChannelMappingConfig(
+ column_name_mapping={"data_key": "map_dk"},
+ filters={"toolbox_id": "container_concept"},
+ join_keys=[
+ JoinKey(mapping_col="source_channel", metrics_col="channel_name"),
+ JoinKey(mapping_col="map_dk", metrics_col="metrics_dk"),
+ ],
+ ),
+ ),
+ )
+ query = key_value_store_alias_db.query
+ engine_speed = query.channel_with_alias(channel_alias="engine_speed").alias("engine_speed")
+
+ pdf = query.select(engine_speed).toPandas(spark, solver=solver)
+ pdf = pdf.sort_values("container_id").reset_index(drop=True)
+ assert pdf["container_id"].tolist() == [1, 2, 3]
+ assert all(length > 0 for length in pdf["engine_speed"].map(len))
+
+ def test_tag_kwarg_must_match_post_rename_name(
+ self, spark: SparkSession, key_value_store_alias_db: MeasurementDB
+ ):
+ # When channel_metrics.channel_name is renamed via column_name_mapping
+ # to a non-default internal name, the direct selector's kwarg must use
+ # the renamed name AND the override `join_keys` must reference it.
+ solver = KeyValueStoreSolver(
+ spark,
+ config=SolverConfig(
+ project_id="SAMPLE_PROJECT",
+ container_metrics=TableConfig(column_name_mapping={"project": "project_id"}),
+ channel_metrics=TableConfig(column_name_mapping={"channel_name": "chan"}),
+ channel_mapping=ChannelMappingConfig(
+ filters={"toolbox_id": "container_concept"},
+ join_keys=[
+ JoinKey(mapping_col="source_channel", metrics_col="chan"),
+ JoinKey(mapping_col="data_key", metrics_col="data_key"),
+ ],
+ ),
+ ),
+ )
+ query = key_value_store_alias_db.query
+ # Direct selector — kwarg `chan` must match the renamed column name.
+ engine_rpm = query.channel(chan="Engine RPM", data_key="TM").alias("engine_rpm")
+
+ pdf = query.select(engine_rpm).toPandas(spark, solver=solver)
+ pdf = pdf.sort_values("container_id").reset_index(drop=True)
+ # Direct selector — containers with no matching channel drop out.
+ # Containers 1 and 2 have "Engine RPM"/data_key="TM"; container 3
+ # only carries it under "EngSpd"/"ProjSpecREC_10Hz" (no match).
+ assert sorted(pdf["container_id"].tolist()) == [1, 2]
+ assert all(length > 0 for length in pdf["engine_rpm"].map(len))
diff --git a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_solver_wide_only_test.py b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_solver_wide_only_test.py
index ad0590a..f4d98b4 100644
--- a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_solver_wide_only_test.py
+++ b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_solver_wide_only_test.py
@@ -270,6 +270,7 @@ def test_col_map_always_returns_internal_names(self, spark: SparkSession):
"ts": "tstart",
"te": "tend",
"val": "value",
+ "conv": "conversion_factor",
}
def test_config_properties_return_internal_names(self, spark: SparkSession):
diff --git a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py
new file mode 100644
index 0000000..993c3df
--- /dev/null
+++ b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py
@@ -0,0 +1,281 @@
+# pylint: disable=missing-function-docstring
+
+import os
+
+import numpy as np
+import pandas as pd
+import pytest
+from pyspark.sql import SparkSession
+
+from impulse_query_engine.analyze.query.solvers.key_value_store_solver import (
+ KeyValueStoreSolver,
+)
+from impulse_query_engine.analyze.query.solvers.solver_config import (
+ ChannelMappingConfig,
+ SolverConfig,
+ TableConfig,
+)
+from impulse_query_engine.measurement_db import MeasurementDB
+
+
+def _solver(spark: SparkSession) -> KeyValueStoreSolver:
+ return KeyValueStoreSolver(
+ spark,
+ config=SolverConfig(
+ project_id="SAMPLE_PROJECT",
+ container_metrics=TableConfig(column_name_mapping={"project": "project_id"}),
+ channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}),
+ ),
+ )
+
+
+def _expected_raw_values(channels_csv_path: str, container_id: int, channel_id: int) -> np.ndarray:
+ raw = pd.read_csv(channels_csv_path)
+ rows = raw[(raw["container_id"] == container_id) & (raw["channel_id"] == channel_id)]
+ return rows.sort_values("tstart")["value"].values.astype(np.float64)
+
+
+@pytest.fixture
+def channels_csv_path() -> str:
+ base_path = os.path.dirname(os.path.abspath(__file__))
+ base_path = base_path[: base_path.find("tests")]
+ return f"{base_path}/tests/unit/data/basic_narrow_csv/channel_data.csv"
+
+
+class TestUnitConversionSolve:
+ def test_solve_with_unit_conversion(
+ self,
+ spark: SparkSession,
+ key_value_store_unit_conversion_db: MeasurementDB,
+ channels_csv_path: str,
+ ):
+ solver = _solver(spark)
+ query = key_value_store_unit_conversion_db.query
+ vehicle_speed = query.channel_with_alias(channel_alias="vehicle_speed").alias(
+ "vehicle_speed"
+ )
+
+ pdf = query.select(vehicle_speed).toPandas(spark, solver=solver)
+ pdf = pdf.sort_values("container_id").reset_index(drop=True)
+
+ assert pdf["container_id"].tolist() == [1, 2, 3]
+
+ factor = 0.277778
+ # Containers 1 and 2 resolve vehicle_speed -> "Vehicle Speed Sensor" (channel 7).
+ for cid in (1, 2):
+ expected = _expected_raw_values(channels_csv_path, cid, 7) * factor
+ row = pdf.loc[pdf["container_id"] == cid].iloc[0]
+ np.testing.assert_allclose(row.vehicle_speed.values, expected, rtol=1e-6)
+
+ # Container 3 resolves to channel 7 via Spd_Vhcl / ProjSpecREC_10Hz.
+ expected3 = _expected_raw_values(channels_csv_path, 3, 7) * factor
+ row3 = pdf.loc[pdf["container_id"] == 3].iloc[0]
+ np.testing.assert_allclose(row3.vehicle_speed.values, expected3, rtol=1e-6)
+
+ def test_solve_no_conversion_when_same_unit(
+ self,
+ spark: SparkSession,
+ key_value_store_unit_conversion_db: MeasurementDB,
+ channels_csv_path: str,
+ ):
+ solver = _solver(spark)
+ query = key_value_store_unit_conversion_db.query
+ engine_speed = query.channel_with_alias(channel_alias="engine_speed").alias("engine_speed")
+
+ pdf = query.select(engine_speed).toPandas(spark, solver=solver)
+ pdf = pdf.sort_values("container_id").reset_index(drop=True)
+
+ for cid in (1, 2, 3):
+ expected = _expected_raw_values(channels_csv_path, cid, 5)
+ row = pdf.loc[pdf["container_id"] == cid].iloc[0]
+ np.testing.assert_allclose(row.engine_speed.values, expected, rtol=1e-12)
+
+ def test_solve_no_conversion_when_table_not_configured(
+ self,
+ spark: SparkSession,
+ key_value_store_unit_conversion_db_no_table: MeasurementDB,
+ channels_csv_path: str,
+ ):
+ solver = _solver(spark)
+ query = key_value_store_unit_conversion_db_no_table.query
+ vehicle_speed = query.channel_with_alias(channel_alias="vehicle_speed").alias(
+ "vehicle_speed"
+ )
+
+ pdf = query.select(vehicle_speed).toPandas(spark, solver=solver)
+ pdf = pdf.sort_values("container_id").reset_index(drop=True)
+
+ # No conversion: values are returned exactly as-is from the raw channel data.
+ for cid in (1, 2, 3):
+ expected = _expected_raw_values(channels_csv_path, cid, 7)
+ row = pdf.loc[pdf["container_id"] == cid].iloc[0]
+ np.testing.assert_allclose(row.vehicle_speed.values, expected, rtol=1e-12)
+
+ def test_solve_no_conversion_for_direct_selectors(
+ self,
+ spark: SparkSession,
+ key_value_store_unit_conversion_db: MeasurementDB,
+ channels_csv_path: str,
+ ):
+ solver = _solver(spark)
+ query = key_value_store_unit_conversion_db.query
+ # Direct selector — no alias, so no unit metadata, so no conversion.
+ vehicle_speed_direct = query.channel(
+ channel_name="Vehicle Speed Sensor", data_key="TM"
+ ).alias("vehicle_speed_direct")
+
+ pdf = query.select(vehicle_speed_direct).toPandas(spark, solver=solver)
+ pdf = pdf.sort_values("container_id").reset_index(drop=True)
+
+ for cid in (1, 2):
+ expected = _expected_raw_values(channels_csv_path, cid, 7)
+ row = pdf.loc[pdf["container_id"] == cid].iloc[0]
+ np.testing.assert_allclose(row.vehicle_speed_direct.values, expected, rtol=1e-12)
+
+ def test_solve_same_channel_direct_stays_raw_aliased_converts(
+ self,
+ spark: SparkSession,
+ key_value_store_unit_conversion_db: MeasurementDB,
+ channels_csv_path: str,
+ ):
+ # When a direct selector and an aliased selector resolve to the same
+ # (container_id, channel_id) (both land on channel 7), conversion is a
+ # property of the alias — the direct selector returns raw values,
+ # the aliased selector returns raw * factor (km/h -> m/s).
+ solver = _solver(spark)
+ query = key_value_store_unit_conversion_db.query
+ direct = query.channel(channel_name="Vehicle Speed Sensor", data_key="TM").alias(
+ "vehicle_speed_raw"
+ )
+ aliased = query.channel_with_alias(channel_alias="vehicle_speed").alias(
+ "vehicle_speed_converted"
+ )
+
+ pdf = query.select(direct, aliased).toPandas(spark, solver=solver)
+ pdf = pdf.sort_values("container_id").reset_index(drop=True)
+
+ factor = 0.277778
+ for cid in (1, 2):
+ raw = _expected_raw_values(channels_csv_path, cid, 7)
+ row = pdf.loc[pdf["container_id"] == cid].iloc[0]
+ np.testing.assert_allclose(row.vehicle_speed_raw.values, raw, rtol=1e-12)
+ np.testing.assert_allclose(row.vehicle_speed_converted.values, raw * factor, rtol=1e-6)
+
+ def test_solve_mixed_direct_and_aliased_disjoint_channels(
+ self,
+ spark: SparkSession,
+ key_value_store_unit_conversion_db: MeasurementDB,
+ channels_csv_path: str,
+ ):
+ # Direct selector targets a *different* channel than the aliased one.
+ # Direct: Ambient Air Temperature (channel 6, no conversion).
+ # Aliased: vehicle_speed (channel 7, km/h -> m/s).
+ #
+ # Note: when a direct selector and an aliased selector resolve to the
+ # same (container_id, channel_id), the conversion factor stored on the
+ # channel row applies to both — the per-channel factor model in
+ # KVSTimeSeriesCache cannot distinguish callers. We therefore only
+ # cover the disjoint case here.
+ solver = _solver(spark)
+ query = key_value_store_unit_conversion_db.query
+ direct = query.channel(channel_name="Ambient Air Temperature", data_key="TM").alias(
+ "ambient_temp"
+ )
+ aliased = query.channel_with_alias(channel_alias="vehicle_speed").alias(
+ "vehicle_speed_converted"
+ )
+
+ pdf = query.select(direct, aliased).toPandas(spark, solver=solver)
+ pdf = pdf.sort_values("container_id").reset_index(drop=True)
+
+ factor = 0.277778
+ for cid in (1, 2):
+ ambient_raw = _expected_raw_values(channels_csv_path, cid, 6)
+ speed_raw = _expected_raw_values(channels_csv_path, cid, 7)
+ row = pdf.loc[pdf["container_id"] == cid].iloc[0]
+ np.testing.assert_allclose(row.ambient_temp.values, ambient_raw, rtol=1e-12)
+ np.testing.assert_allclose(
+ row.vehicle_speed_converted.values, speed_raw * factor, rtol=1e-6
+ )
+
+ def test_solve_cross_family_units_leave_values_unchanged(
+ self,
+ spark: SparkSession,
+ key_value_store_unit_conversion_db: MeasurementDB,
+ channels_csv_path: str,
+ ):
+ # cross_family_alias maps Engine RPM (rotation family) -> m/s
+ # (speed family). The group_id mismatch makes the target-side join
+ # miss, leaving conversion_factor null and values unchanged.
+ solver = _solver(spark)
+ query = key_value_store_unit_conversion_db.query
+ cross = query.channel_with_alias(channel_alias="cross_family_alias").alias("cross")
+
+ pdf = query.select(cross).toPandas(spark, solver=solver)
+ pdf = pdf.sort_values("container_id").reset_index(drop=True)
+
+ # The mapping only references Engine RPM/TM, which exists for containers 1 and 2.
+ for cid in (1, 2):
+ expected = _expected_raw_values(channels_csv_path, cid, 5)
+ row = pdf.loc[pdf["container_id"] == cid].iloc[0]
+ np.testing.assert_allclose(row.cross.values, expected, rtol=1e-12)
+
+
+class TestComputeConversionFactors:
+ def test_factor_one_for_identical_units(
+ self, spark: SparkSession, key_value_store_unit_conversion_db: MeasurementDB
+ ):
+ solver = _solver(spark)
+ query = key_value_store_unit_conversion_db.query
+
+ channels_df = spark.createDataFrame(
+ [(1, 5, "RPM", "RPM"), (2, 5, "RPM", "RPM")],
+ schema=["container_id", "channel_id", "source_unit", "target_unit"],
+ )
+
+ result = solver._compute_conversion_factors(spark, query, channels_df).collect()
+ factors = {row.container_id: row.conversion_factor for row in result}
+ assert pytest.approx(factors[1], rel=1e-12) == 1.0
+ assert pytest.approx(factors[2], rel=1e-12) == 1.0
+
+ def test_factor_for_known_speed_conversion(
+ self, spark: SparkSession, key_value_store_unit_conversion_db: MeasurementDB
+ ):
+ solver = _solver(spark)
+ query = key_value_store_unit_conversion_db.query
+
+ channels_df = spark.createDataFrame(
+ [(1, 7, "km/h", "m/s")],
+ schema=["container_id", "channel_id", "source_unit", "target_unit"],
+ )
+
+ row = solver._compute_conversion_factors(spark, query, channels_df).collect()[0]
+ assert row.conversion_factor == pytest.approx(0.277778, rel=1e-6)
+
+ def test_null_factor_for_cross_family(
+ self, spark: SparkSession, key_value_store_unit_conversion_db: MeasurementDB
+ ):
+ solver = _solver(spark)
+ query = key_value_store_unit_conversion_db.query
+
+ channels_df = spark.createDataFrame(
+ [(1, 5, "RPM", "m/s")],
+ schema=["container_id", "channel_id", "source_unit", "target_unit"],
+ )
+
+ row = solver._compute_conversion_factors(spark, query, channels_df).collect()[0]
+ assert row.conversion_factor is None
+
+ def test_null_factor_for_unknown_unit(
+ self, spark: SparkSession, key_value_store_unit_conversion_db: MeasurementDB
+ ):
+ solver = _solver(spark)
+ query = key_value_store_unit_conversion_db.query
+
+ channels_df = spark.createDataFrame(
+ [(1, 5, "furlongs/fortnight", "m/s")],
+ schema=["container_id", "channel_id", "source_unit", "target_unit"],
+ )
+
+ row = solver._compute_conversion_factors(spark, query, channels_df).collect()[0]
+ assert row.conversion_factor is None
diff --git a/tests/impulse_query_engine/unit/analyze/query/solvers/kvs_solver_column_mapping_test.py b/tests/impulse_query_engine/unit/analyze/query/solvers/kvs_solver_column_mapping_test.py
index 8606f7b..a08c635 100644
--- a/tests/impulse_query_engine/unit/analyze/query/solvers/kvs_solver_column_mapping_test.py
+++ b/tests/impulse_query_engine/unit/analyze/query/solvers/kvs_solver_column_mapping_test.py
@@ -610,6 +610,7 @@ def test_col_map_always_returns_internal_names(self, spark):
"ts": "tstart",
"te": "tend",
"val": "value",
+ "conv": "conversion_factor",
}
def test_mapping_entries_stored_correctly(self, spark):
diff --git a/tests/impulse_query_engine/unit/analyze/query/solvers/solver_config_test.py b/tests/impulse_query_engine/unit/analyze/query/solvers/solver_config_test.py
index 509f06a..6bc29ba 100644
--- a/tests/impulse_query_engine/unit/analyze/query/solvers/solver_config_test.py
+++ b/tests/impulse_query_engine/unit/analyze/query/solvers/solver_config_test.py
@@ -15,6 +15,8 @@
import pytest
from impulse_query_engine.analyze.query.solvers.solver_config import (
+ ChannelMappingConfig,
+ JoinKey,
SolverConfig,
TableConfig,
)
@@ -35,6 +37,7 @@
"ts": "tstart",
"te": "tend",
"val": "value",
+ "conv": "conversion_factor",
}
@@ -137,7 +140,7 @@ class TestColMap:
def test_col_map_keys(self, cfg: SolverConfig):
"""col_map should contain exactly the expected short keys."""
- assert set(cfg.col_map.keys()) == {"cid", "ch", "ts", "te", "val"}
+ assert set(cfg.col_map.keys()) == {"cid", "ch", "ts", "te", "val", "conv"}
def test_col_map_default_config(self):
"""Default SolverConfig col_map should match hardcoded defaults."""
@@ -148,6 +151,7 @@ def test_col_map_default_config(self):
"ts": "tstart",
"te": "tend",
"val": "value",
+ "conv": "conversion_factor",
}
def test_col_map_consistent_with_properties(self, cfg: SolverConfig):
@@ -157,6 +161,96 @@ def test_col_map_consistent_with_properties(self, cfg: SolverConfig):
assert cfg.col_map["ts"] == cfg.tstart_col
assert cfg.col_map["te"] == cfg.tend_col
assert cfg.col_map["val"] == cfg.value_col
+ assert cfg.col_map["conv"] == cfg.conversion_factor_col
def test_col_map_values(self, cfg: SolverConfig):
assert cfg.col_map == _EXPECTED_COL_MAP
+
+
+# ---------------------------------------------------------------------------
+# TestAliasInternalNameProperties – channel mapping / metrics internal names
+# ---------------------------------------------------------------------------
+
+
+class TestAliasInternalNameProperties:
+ """Internal-name properties for the alias-resolution columns."""
+
+ def test_source_channel_col(self):
+ assert SolverConfig().source_channel_col == "source_channel"
+
+ def test_data_key_col(self):
+ assert SolverConfig().data_key_col == "data_key"
+
+ def test_channel_alias_col(self):
+ assert SolverConfig().channel_alias_col == "channel_alias"
+
+ def test_channel_name_col(self):
+ assert SolverConfig().channel_name_col == "channel_name"
+
+
+# ---------------------------------------------------------------------------
+# TestEffectiveAliasJoinKeys – default + override behavior
+# ---------------------------------------------------------------------------
+
+
+class TestEffectiveAliasJoinKeys:
+ def test_default_when_join_keys_none(self):
+ cfg = SolverConfig()
+ assert cfg.channel_mapping.join_keys is None
+ assert cfg.effective_alias_join_keys == [
+ ("source_channel", "channel_name"),
+ ("data_key", "data_key"),
+ ]
+
+ def test_single_column_override(self):
+ cfg = SolverConfig(
+ channel_mapping=ChannelMappingConfig(
+ join_keys=[JoinKey(mapping_col="source_channel", metrics_col="channel_name")]
+ )
+ )
+ assert cfg.effective_alias_join_keys == [("source_channel", "channel_name")]
+
+ def test_different_names_per_side(self):
+ cfg = SolverConfig(
+ channel_mapping=ChannelMappingConfig(
+ join_keys=[
+ JoinKey(mapping_col="source_channel", metrics_col="channel_name"),
+ JoinKey(mapping_col="map_dk", metrics_col="metrics_dk"),
+ ]
+ )
+ )
+ assert cfg.effective_alias_join_keys == [
+ ("source_channel", "channel_name"),
+ ("map_dk", "metrics_dk"),
+ ]
+
+
+# ---------------------------------------------------------------------------
+# TestChannelMappingConfig – type acceptance + JSON round-trip
+# ---------------------------------------------------------------------------
+
+
+class TestChannelMappingConfig:
+ def test_accepts_channel_mapping_config_instance(self):
+ cm = ChannelMappingConfig(
+ filters={"toolbox_id": "tb"},
+ join_keys=[JoinKey(mapping_col="source_channel", metrics_col="channel_name")],
+ )
+ cfg = SolverConfig(channel_mapping=cm)
+ assert cfg.channel_mapping is cm
+
+ def test_json_round_trip_with_join_keys(self):
+ raw = {
+ "channel_mapping": {
+ "column_name_mapping": {"alias": "channel_alias"},
+ "filters": {"toolbox_id": "tb"},
+ "join_keys": [{"mapping_col": "source_channel", "metrics_col": "channel_name"}],
+ }
+ }
+ cfg = SolverConfig.from_dict(raw)
+ assert isinstance(cfg.channel_mapping, ChannelMappingConfig)
+ assert cfg.channel_mapping.column_name_mapping == {"alias": "channel_alias"}
+ assert cfg.channel_mapping.filters == {"toolbox_id": "tb"}
+ assert cfg.channel_mapping.join_keys == [
+ JoinKey(mapping_col="source_channel", metrics_col="channel_name")
+ ]
diff --git a/tests/impulse_reporting/unit/meta/container_dimensions_test.py b/tests/impulse_reporting/unit/meta/container_dimensions_test.py
index e40bf0b..acad8f8 100644
--- a/tests/impulse_reporting/unit/meta/container_dimensions_test.py
+++ b/tests/impulse_reporting/unit/meta/container_dimensions_test.py
@@ -68,7 +68,7 @@ def test_config_hashing(spark):
schema = T.StructType([T.StructField(col, T.StringType(), True) for col in silver_columns])
df = spark.createDataFrame([("test_vehicle",)], schema)
result = df.transform(ContainerDimension._add_config_hash(impulse_config))
- expected_result = [Row(uut_id="test_vehicle", config_hash=1983688711)]
+ expected_result = [Row(uut_id="test_vehicle", config_hash=1267386821)]
assert expected_result == result.collect()
diff --git a/tests/unit/data/key_value_store_unit_conversion_csv/channel_mapping.csv b/tests/unit/data/key_value_store_unit_conversion_csv/channel_mapping.csv
new file mode 100644
index 0000000..eab3979
--- /dev/null
+++ b/tests/unit/data/key_value_store_unit_conversion_csv/channel_mapping.csv
@@ -0,0 +1,6 @@
+project_id,toolbox_id,channel_alias,source_channel,data_key,priority,source_unit,target_unit
+SAMPLE_PROJECT,container_concept,engine_speed,Engine RPM,TM,,RPM,RPM
+SAMPLE_PROJECT,container_concept,engine_speed,EngSpd,ProjSpecREC_10Hz,,RPM,RPM
+SAMPLE_PROJECT,container_concept,vehicle_speed,Vehicle Speed Sensor,TM,,km/h,m/s
+SAMPLE_PROJECT,container_concept,vehicle_speed,Spd_Vhcl,ProjSpecREC_10Hz,,km/h,m/s
+SAMPLE_PROJECT,container_concept,cross_family_alias,Engine RPM,TM,,RPM,m/s
diff --git a/tests/unit/data/key_value_store_unit_conversion_csv/unit_conversion.csv b/tests/unit/data/key_value_store_unit_conversion_csv/unit_conversion.csv
new file mode 100644
index 0000000..2aa4d36
--- /dev/null
+++ b/tests/unit/data/key_value_store_unit_conversion_csv/unit_conversion.csv
@@ -0,0 +1,6 @@
+group_id,unit,conversion_factor,is_base
+speed,m/s,1.0,true
+speed,km/h,0.277778,false
+speed,mph,0.44704,false
+rotation,RPM,1.0,true
+rotation,rad/s,0.10472,false