Skip to content

Commit b68d682

Browse files
authored
Ami queries 2 (#200)
1 parent f293874 commit b68d682

1 file changed

Lines changed: 181 additions & 18 deletions

File tree

amiadapters/storage/snowflake.py

Lines changed: 181 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,110 @@ def exec_postprocessor(self, run_id: str, min_date: datetime, max_date: datetime
360360

361361
conn = self.sink_config.connection()
362362

363+
ami_meters_score_sql = f"""
364+
create or replace table meters_score_{self.org_id}
365+
as
366+
with cte as
367+
(
368+
select org_id
369+
, device_id
370+
, flowtime::timestamp_ntz as flowtime
371+
, lag(flowtime) over(partition by org_id, device_id order by flowtime)::timestamp_ntz as flowtime0
372+
, datediff(second, flowtime0, flowtime::timestamp_ntz) as flowinterval
373+
, interval_value
374+
, register_value
375+
, coalesce(estimated::boolean, false) as estimated
376+
, case when not coalesce(estimated::boolean, false) and flowinterval = 3600 and interval_value > 0 then interval_value end as interval_value_clean
377+
from ami_connect.readings
378+
where 1=1
379+
and org_id = '{self.org_id}'
380+
and flowtime::timestamp_ntz::date >= '{min_date.isoformat()}'
381+
and flowtime::timestamp_ntz::date <= '{max_date.isoformat()}'
382+
)
383+
, cte2 as
384+
(
385+
select *
386+
, conditional_change_event(coalesce(interval_value, 0) = 0) over(partition by org_id, device_id order by flowtime desc) as grp
387+
from cte
388+
where flowinterval is not null
389+
)
390+
, failing_meters_0 as
391+
(
392+
select r.*
393+
-- , count(interval_value) over(partition by r.org_id, r.device_id, grp) as observations
394+
, sum(flowinterval) over(partition by r.org_id, r.device_id, grp) / 3600 as duration_hours
395+
, c.class_full_name
396+
from cte2 r
397+
join ami_connect.meters m on r.org_id = m.org_id and r.device_id = m.device_id and m.row_active_until is null
398+
join wavelet.customer_location_data c on m.account_id = c.cust_id_from_utility and m.location_id = c.location_id_from_utility
399+
)
400+
, failing_meters_thresholds as
401+
(
402+
select org_id
403+
, class_full_name
404+
, percentile_cont(0.99999) within group (order by duration_hours) as duration_hours_thr
405+
, duration_hours_thr / 24 as duration_days_thr
406+
from
407+
(
408+
select distinct org_id, class_full_name, duration_hours, device_id, grp
409+
from failing_meters_0
410+
where coalesce(interval_value, 0) = 0
411+
)
412+
group by all
413+
)
414+
, failing_meters_score_0 as
415+
(
416+
select
417+
a.org_id
418+
, a.device_id
419+
, a.class_full_name
420+
, min(a.flowtime) as first_read
421+
, max(a.flowtime) as last_read
422+
, datediff(hour, last_read::date, '{max_date.isoformat()}') as last_read_hours
423+
--
424+
, count(interval_value) as observations
425+
, datediff(hour, '{min_date.isoformat()}', '{max_date.isoformat()}') + 24 as hours_in_period
426+
, observations / hours_in_period as read_freq
427+
, 1 - read_freq as unread_freq
428+
--
429+
, count(distinct case when duration_hours > duration_hours_thr and coalesce(interval_value, 0) = 0 then grp end) stuck_events
430+
, sum(case when duration_hours > duration_hours_thr and coalesce(interval_value, 0) = 0 then flowinterval else 0 end)/3600 stuck_hours
431+
, duration_hours_thr
432+
--
433+
,sum(case when coalesce(interval_value,0) > 0 then interval_value end) as total_usage
434+
,sum(case when coalesce(interval_value,0) > 0 then flowinterval end)/3600 as used_hours
435+
,sum(case when coalesce(interval_value,0) = 0 then flowinterval end)/3600 as idle_hours
436+
,total_usage / used_hours as avg_hourly_usage
437+
,avg_hourly_usage * stuck_hours as unaccounted
438+
from failing_meters_0 a
439+
join failing_meters_thresholds b on a.org_id = b.org_id and a.class_full_name = b.class_full_name
440+
group by all
441+
)
442+
, z_scores as
443+
(
444+
select *
445+
, (last_read_hours - avg(last_read_hours) over(partition by org_id, class_full_name)) / nullif(stddev(last_read_hours) over(partition by org_id, class_full_name), 0) as z_last_read
446+
, (unread_freq - avg(unread_freq ) over(partition by org_id, class_full_name)) / nullif(stddev(unread_freq ) over(partition by org_id, class_full_name), 0) as z_unread_freq
447+
, (stuck_hours - avg(stuck_hours ) over(partition by org_id, class_full_name)) / nullif(stddev(stuck_hours ) over(partition by org_id, class_full_name), 0) as z_stuck_hours
448+
, (stuck_events - avg(stuck_events ) over(partition by org_id, class_full_name)) / nullif(stddev(stuck_events ) over(partition by org_id, class_full_name), 0) as z_stuck_events
449+
, (unaccounted - avg(unaccounted ) over(partition by org_id, class_full_name)) / nullif(stddev(unaccounted ) over(partition by org_id, class_full_name), 0) as z_unaccounted
450+
from failing_meters_score_0
451+
)
452+
select *
453+
,1+4*((z_last_read -min(z_last_read) over(partition by org_id, class_full_name))/nullif((max(z_last_read) over(partition by org_id, class_full_name)-min(z_last_read) over(partition by org_id, class_full_name)),0)) as score_last_read
454+
,1+4*((z_unread_freq -min(z_unread_freq) over(partition by org_id, class_full_name))/nullif((max(z_unread_freq) over(partition by org_id, class_full_name)-min(z_unread_freq) over(partition by org_id, class_full_name)),0)) as score_unread_freq
455+
,1+4*((z_stuck_hours -min(z_stuck_hours) over(partition by org_id, class_full_name))/nullif((max(z_stuck_hours) over(partition by org_id, class_full_name)-min(z_stuck_hours) over(partition by org_id, class_full_name)),0)) as score_stuck_hours
456+
,1+4*((z_stuck_events-min(z_stuck_events) over(partition by org_id, class_full_name))/nullif((max(z_stuck_events) over(partition by org_id, class_full_name)-min(z_stuck_events) over(partition by org_id, class_full_name)),0)) as score_stuck_events
457+
,1+4*((z_unaccounted -min(z_unaccounted) over(partition by org_id, class_full_name))/nullif((max(z_unaccounted) over(partition by org_id, class_full_name)-min(z_unaccounted) over(partition by org_id, class_full_name)),0)) as score_unaccounted
458+
,(1 * coalesce(score_last_read, 0)
459+
+ 1 * coalesce(score_unread_freq, 0)
460+
+ 1 * coalesce(score_stuck_hours, 0)
461+
+ 1 * coalesce(score_stuck_events, 0)
462+
+ 1 * coalesce(score_unaccounted, 0)
463+
) / (1 + 1 + 1 + 1 + 1)::float as score
464+
"""
465+
conn.cursor().execute(ami_meters_score_sql)
466+
363467
ami_leaks_sql = f"""
364468
create or replace table leaks_{self.org_id}
365469
as
@@ -472,7 +576,30 @@ def exec_postprocessor(self, run_id: str, min_date: datetime, max_date: datetime
472576
,case when estimated then interval_value else 0 end as est_usage
473577
from cte2
474578
)
475-
select *
579+
select
580+
org_id
581+
,device_id
582+
,flowtime as flowtime_ts
583+
,flowinterval as flowinterval_sec
584+
,register_value as raw_register_value_cf
585+
,interval_value as raw_interval_value_cf
586+
,interval_value_clean as clean_interval_value_cf
587+
,estimated as is_estimated
588+
,is_leak as is_leak
589+
,min_leak_bck as minflow_prev24h_cf
590+
,min_leak_fwd as minflow_lead24h_cf
591+
,leak_calc as leak_calculated_cf
592+
,leak_avg as leak_average_cf
593+
,leak_stdev as leak_stdev_cf
594+
,leak_clean as leak_clean_cf
595+
,grp as event_seq
596+
,stime as event_start_ts
597+
,etime as event_end_ts
598+
,duration as event_hrs
599+
,event_id as event_id
600+
,leak as final_leak_cf
601+
,usage as final_usage_cf
602+
,est_usage as final_est_usage_cf
476603
from leaks
477604
"""
478605
conn.cursor().execute(ami_leaks_sql)
@@ -484,28 +611,64 @@ def exec_postprocessor(self, run_id: str, min_date: datetime, max_date: datetime
484611
org_id
485612
, device_id
486613
, event_id
487-
, stime
488-
, etime
489-
, duration
614+
, event_start_ts
615+
, event_end_ts
616+
, event_hrs
490617
, is_leak
491-
, array_agg(flowtime) as flowtime
492-
, array_agg(flowinterval) as flowinterval
493-
, array_agg(ifnull(estimated, 'NaN')) as estimated
494-
, array_agg(ifnull(interval_value, 'NaN')) as interval_value
495-
, array_agg(ifnull(interval_value_clean, 'NaN')) as interval_value_clean
496-
, array_agg(ifnull(leak_calc, 'NaN')) as leak_calc
497-
, avg(leak_calc) as leak_avg
498-
, stddev(leak_calc) as leak_stdev
499-
, array_agg(ifnull(leak_clean, 'NaN')) as leak_clean
500-
, avg(leak) as leak_rate
501-
, array_agg(ifnull(leak, 'NaN')) as leak
502-
, array_agg(ifnull(usage, 'NaN')) as usage
503-
, array_agg(ifnull(est_usage, 'NaN')) as est_usage
618+
, array_agg(flowtime_ts) as flowtime_ts
619+
, array_agg(flowinterval_sec) as flowinterval_sec
620+
, array_agg(ifnull(is_estimated, 'NaN')) as is_estimated
621+
, array_agg(ifnull(raw_interval_value_cf, 'NaN')) as raw_interval_value_cf
622+
, array_agg(ifnull(clean_interval_value_cf, 'NaN')) as clean_interval_value_cf
623+
, array_agg(ifnull(leak_calculated_cf, 'NaN')) as leak_calculated_cf
624+
, avg(leak_calculated_cf) as leak_average_cf
625+
, stddev(leak_calculated_cf) as leak_stdev_cf
626+
, array_agg(ifnull(leak_clean_cf, 'NaN')) as leak_clean_cf
627+
, avg(final_leak_cf) as final_leak_rate_cfph
628+
, array_agg(ifnull(final_leak_cf, 'NaN')) as final_leak_cf
629+
, array_agg(ifnull(final_usage_cf, 'NaN')) as final_usage_cf
630+
, array_agg(ifnull(final_est_usage_cf, 'NaN')) as final_est_usage_cf
631+
, sum(final_leak_cf) as final_leak_sum_cf
632+
, sum(final_usage_cf) as final_usage_sum_cf
633+
, sum(final_est_usage_cf) as final_est_usage_sum_cf
634+
, sum(raw_interval_value_cf) as raw_interval_value_sum_cf
504635
from leaks_{self.org_id}
505-
group by org_id, device_id, event_id, stime, etime, duration, is_leak
636+
group by org_id, device_id, event_id, event_start_ts, event_end_ts, event_hrs, is_leak
506637
"""
507638
conn.cursor().execute(ami_leaks_agg_sql)
508639

640+
ami_irrigation_detection_agg_sql = f"""
641+
create or replace table irrigation_detection_agg
642+
as
643+
select
644+
meter_id
645+
, array_agg(timestamp) within group (order by timestamp) as timestamp
646+
, array_agg(total_volume) within group (order by timestamp) as total_volume
647+
, array_agg(irrigation_volume) within group (order by timestamp) as irrigation_volume
648+
, array_agg(non_irrigation_volume) within group (order by timestamp) as non_irrigation_volume
649+
--
650+
, array_agg(irrigation_flag) within group (order by timestamp) as irrigation_flag
651+
, array_agg(daily_irrigation_detected) within group (order by timestamp) as daily_irrigation_detected
652+
--
653+
, array_agg(daily_confidence) within group (order by timestamp) as daily_confidence
654+
, array_agg(hourly_confidence) within group (order by timestamp) as hourly_confidence
655+
--
656+
, model_used
657+
, meter_baseline_days
658+
, meter_normal_days
659+
, meter_total_training_days
660+
, meter_needs_finetuning
661+
, meter_has_custom_model
662+
, district_source
663+
, meter_type
664+
from
665+
irrigation_detection
666+
where
667+
1=1
668+
group by all
669+
"""
670+
conn.cursor().execute(ami_irrigation_detection_agg_sql)
671+
509672
def _meter_tuple(self, meter: GeneralMeter, row_active_from: datetime):
510673
result = [
511674
meter.org_id,

0 commit comments

Comments
 (0)