Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 18 additions & 181 deletions amiadapters/storage/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,110 +360,6 @@ def exec_postprocessor(self, run_id: str, min_date: datetime, max_date: datetime

conn = self.sink_config.connection()

ami_meters_score_sql = f"""
create or replace table meters_score_{self.org_id}
as
with cte as
(
select org_id
, device_id
, flowtime::timestamp_ntz as flowtime
, lag(flowtime) over(partition by org_id, device_id order by flowtime)::timestamp_ntz as flowtime0
, datediff(second, flowtime0, flowtime::timestamp_ntz) as flowinterval
, interval_value
, register_value
, coalesce(estimated::boolean, false) as estimated
, case when not coalesce(estimated::boolean, false) and flowinterval = 3600 and interval_value > 0 then interval_value end as interval_value_clean
from ami_connect.readings
where 1=1
and org_id = '{self.org_id}'
and flowtime::timestamp_ntz::date >= '{min_date.isoformat()}'
and flowtime::timestamp_ntz::date <= '{max_date.isoformat()}'
)
, cte2 as
(
select *
, conditional_change_event(coalesce(interval_value, 0) = 0) over(partition by org_id, device_id order by flowtime desc) as grp
from cte
where flowinterval is not null
)
, failing_meters_0 as
(
select r.*
-- , count(interval_value) over(partition by r.org_id, r.device_id, grp) as observations
, sum(flowinterval) over(partition by r.org_id, r.device_id, grp) / 3600 as duration_hours
, c.class_full_name
from cte2 r
join ami_connect.meters m on r.org_id = m.org_id and r.device_id = m.device_id and m.row_active_until is null
join customer_location_data c on m.account_id = c.cust_id_from_utility and m.location_id = c.location_id_from_utility
)
, failing_meters_thresholds as
(
select org_id
, class_full_name
, percentile_cont(0.99999) within group (order by duration_hours) as duration_hours_thr
, duration_hours_thr / 24 as duration_days_thr
from
(
select distinct org_id, class_full_name, duration_hours, device_id, grp
from failing_meters_0
where coalesce(interval_value, 0) = 0
)
group by all
)
, failing_meters_score_0 as
(
select
a.org_id
, a.device_id
, a.class_full_name
, min(a.flowtime) as first_read
, max(a.flowtime) as last_read
, datediff(hour, last_read::date, '{max_date.isoformat()}') as last_read_hours
--
, count(interval_value) as observations
, datediff(hour, '{min_date.isoformat()}', '{max_date.isoformat()}') + 24 as hours_in_period
, observations / hours_in_period as read_freq
, 1 - read_freq as unread_freq
--
, count(distinct case when duration_hours > duration_hours_thr and coalesce(interval_value, 0) = 0 then grp end) stuck_events
, sum(case when duration_hours > duration_hours_thr and coalesce(interval_value, 0) = 0 then flowinterval else 0 end)/3600 stuck_hours
, duration_hours_thr
--
,sum(case when coalesce(interval_value,0) > 0 then interval_value end) as total_usage
,sum(case when coalesce(interval_value,0) > 0 then flowinterval end)/3600 as used_hours
,sum(case when coalesce(interval_value,0) = 0 then flowinterval end)/3600 as idle_hours
,total_usage / used_hours as avg_hourly_usage
,avg_hourly_usage * stuck_hours as unaccounted
from failing_meters_0 a
join failing_meters_thresholds b on a.org_id = b.org_id and a.class_full_name = b.class_full_name
group by all
)
, z_scores as
(
select *
, (last_read_hours - avg(last_read_hours) over(partition by org_id, class_full_name)) / nullif(stddev(last_read_hours) over(partition by org_id, class_full_name), 0) as z_last_read
, (unread_freq - avg(unread_freq ) over(partition by org_id, class_full_name)) / nullif(stddev(unread_freq ) over(partition by org_id, class_full_name), 0) as z_unread_freq
, (stuck_hours - avg(stuck_hours ) over(partition by org_id, class_full_name)) / nullif(stddev(stuck_hours ) over(partition by org_id, class_full_name), 0) as z_stuck_hours
, (stuck_events - avg(stuck_events ) over(partition by org_id, class_full_name)) / nullif(stddev(stuck_events ) over(partition by org_id, class_full_name), 0) as z_stuck_events
, (unaccounted - avg(unaccounted ) over(partition by org_id, class_full_name)) / nullif(stddev(unaccounted ) over(partition by org_id, class_full_name), 0) as z_unaccounted
from failing_meters_score_0
)
select *
,1+4*((z_last_read -min(z_last_read) over(partition by org_id, class_full_name))/nullif((max(z_last_read) over(partition by org_id, class_full_name)-min(z_last_read) over(partition by org_id, class_full_name)),0)) as score_last_read
,1+4*((z_unread_freq -min(z_unread_freq) over(partition by org_id, class_full_name))/nullif((max(z_unread_freq) over(partition by org_id, class_full_name)-min(z_unread_freq) over(partition by org_id, class_full_name)),0)) as score_unread_freq
,1+4*((z_stuck_hours -min(z_stuck_hours) over(partition by org_id, class_full_name))/nullif((max(z_stuck_hours) over(partition by org_id, class_full_name)-min(z_stuck_hours) over(partition by org_id, class_full_name)),0)) as score_stuck_hours
,1+4*((z_stuck_events-min(z_stuck_events) over(partition by org_id, class_full_name))/nullif((max(z_stuck_events) over(partition by org_id, class_full_name)-min(z_stuck_events) over(partition by org_id, class_full_name)),0)) as score_stuck_events
,1+4*((z_unaccounted -min(z_unaccounted) over(partition by org_id, class_full_name))/nullif((max(z_unaccounted) over(partition by org_id, class_full_name)-min(z_unaccounted) over(partition by org_id, class_full_name)),0)) as score_unaccounted
,(1 * coalesce(score_last_read, 0)
+ 1 * coalesce(score_unread_freq, 0)
+ 1 * coalesce(score_stuck_hours, 0)
+ 1 * coalesce(score_stuck_events, 0)
+ 1 * coalesce(score_unaccounted, 0)
) / (1 + 1 + 1 + 1 + 1)::float as score
"""
conn.cursor().execute(ami_meters_score_sql)

ami_leaks_sql = f"""
create or replace table leaks_{self.org_id}
as
Expand Down Expand Up @@ -576,30 +472,7 @@ def exec_postprocessor(self, run_id: str, min_date: datetime, max_date: datetime
,case when estimated then interval_value else 0 end as est_usage
from cte2
)
select
org_id
,device_id
,flowtime as flowtime_ts
,flowinterval as flowinterval_sec
,register_value as raw_register_value_cf
,interval_value as raw_interval_value_cf
,interval_value_clean as clean_interval_value_cf
,estimated as is_estimated
,is_leak as is_leak
,min_leak_bck as minflow_prev24h_cf
,min_leak_fwd as minflow_lead24h_cf
,leak_calc as leak_calculated_cf
,leak_avg as leak_average_cf
,leak_stdev as leak_stdev_cf
,leak_clean as leak_clean_cf
,grp as event_seq
,stime as event_start_ts
,etime as event_end_ts
,duration as event_hrs
,event_id as event_id
,leak as final_leak_cf
,usage as final_usage_cf
,est_usage as final_est_usage_cf
select *
from leaks
"""
conn.cursor().execute(ami_leaks_sql)
Expand All @@ -611,64 +484,28 @@ def exec_postprocessor(self, run_id: str, min_date: datetime, max_date: datetime
org_id
, device_id
, event_id
, event_start_ts
, event_end_ts
, event_hrs
, stime
, etime
, duration
, is_leak
, array_agg(flowtime_ts) as flowtime_ts
, array_agg(flowinterval_sec) as flowinterval_sec
, array_agg(ifnull(is_estimated, 'NaN')) as is_estimated
, array_agg(ifnull(raw_interval_value_cf, 'NaN')) as raw_interval_value_cf
, array_agg(ifnull(clean_interval_value_cf, 'NaN')) as clean_interval_value_cf
, array_agg(ifnull(leak_calculated_cf, 'NaN')) as leak_calculated_cf
, avg(leak_calculated_cf) as leak_average_cf
, stddev(leak_calculated_cf) as leak_stdev_cf
, array_agg(ifnull(leak_clean_cf, 'NaN')) as leak_clean_cf
, avg(final_leak_cf) as final_leak_rate_cfph
, array_agg(ifnull(final_leak_cf, 'NaN')) as final_leak_cf
, array_agg(ifnull(final_usage_cf, 'NaN')) as final_usage_cf
, array_agg(ifnull(final_est_usage_cf, 'NaN')) as final_est_usage_cf
, sum(final_leak_cf) as final_leak_sum_cf
, sum(final_usage_cf) as final_usage_sum_cf
, sum(final_est_usage_cf) as final_est_usage_sum_cf
, sum(raw_interval_value_cf) as raw_interval_value_sum_cf
, array_agg(flowtime) as flowtime
, array_agg(flowinterval) as flowinterval
, array_agg(ifnull(estimated, 'NaN')) as estimated
, array_agg(ifnull(interval_value, 'NaN')) as interval_value
, array_agg(ifnull(interval_value_clean, 'NaN')) as interval_value_clean
, array_agg(ifnull(leak_calc, 'NaN')) as leak_calc
, avg(leak_calc) as leak_avg
, stddev(leak_calc) as leak_stdev
, array_agg(ifnull(leak_clean, 'NaN')) as leak_clean
, avg(leak) as leak_rate
, array_agg(ifnull(leak, 'NaN')) as leak
, array_agg(ifnull(usage, 'NaN')) as usage
, array_agg(ifnull(est_usage, 'NaN')) as est_usage
from leaks_{self.org_id}
group by org_id, device_id, event_id, event_start_ts, event_end_ts, event_hrs, is_leak
group by org_id, device_id, event_id, stime, etime, duration, is_leak
"""
conn.cursor().execute(ami_leaks_agg_sql)

ami_irrigation_detection_agg_sql = f"""
create or replace table irrigation_detection_agg
as
select
meter_id
, array_agg(timestamp) within group (order by timestamp) as timestamp
, array_agg(total_volume) within group (order by timestamp) as total_volume
, array_agg(irrigation_volume) within group (order by timestamp) as irrigation_volume
, array_agg(non_irrigation_volume) within group (order by timestamp) as non_irrigation_volume
--
, array_agg(irrigation_flag) within group (order by timestamp) as irrigation_flag
, array_agg(daily_irrigation_detected) within group (order by timestamp) as daily_irrigation_detected
--
, array_agg(daily_confidence) within group (order by timestamp) as daily_confidence
, array_agg(hourly_confidence) within group (order by timestamp) as hourly_confidence
--
, model_used
, meter_baseline_days
, meter_normal_days
, meter_total_training_days
, meter_needs_finetuning
, meter_has_custom_model
, district_source
, meter_type
from
irrigation_detection
where
1=1
group by all
"""
conn.cursor().execute(ami_irrigation_detection_agg_sql)

def _meter_tuple(self, meter: GeneralMeter, row_active_from: datetime):
result = [
meter.org_id,
Expand Down