-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsession_parser.py
More file actions
171 lines (134 loc) · 5.56 KB
/
session_parser.py
File metadata and controls
171 lines (134 loc) · 5.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
"""Parser for Octopus Energy session strings."""
import re
import logging
from datetime import datetime, timedelta
from typing import Optional
from dataclasses import dataclass
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
@dataclass
class Session:
    """Represents a free electricity session.

    Attributes:
        session_str: The session text this object was built from
            (e.g., '12-2pm, Saturday 4th October').
        start_time: When the session starts.
        end_time: When the session ends.
        code: Optional unique identifier from Octopus API.
    """

    session_str: str
    start_time: datetime
    end_time: datetime
    code: Optional[str] = None  # Optional unique identifier from Octopus API

    @property
    def duration(self) -> timedelta:
        """Get session duration (``end_time - start_time``)."""
        return self.end_time - self.start_time
class SessionParser:
    """Parser for session strings like '12-2pm, Saturday 4th October'.

    A session string is ``'<start>-<end>, <weekday> <day> <month>'``. Times may
    carry an ``am``/``pm`` marker and optional minutes (``12:30pm``). The year
    is not part of the string and is chosen relative to a reference time.

    All datetimes produced are naive; ``now`` arguments must be naive too.
    """

    def __init__(self, timezone: str = 'GMT'):
        """
        Initialize parser.

        Args:
            timezone: Timezone label for sessions. It is only stored on the
                instance; parsing does not apply it.
        """
        self.timezone = timezone

    def parse(self, session_str: str, now: Optional[datetime] = None) -> Optional["Session"]:
        """
        Parse a session string to extract start and end times.

        Args:
            session_str: Session string (e.g., '12-2pm, Saturday 4th October')
            now: Reference time used to choose the year; defaults to
                ``datetime.now()``.

        Returns:
            Session object or None if parsing fails
        """
        parts = session_str.split(',')
        if len(parts) != 2:
            logger.warning("Invalid session format: %s", session_str)
            return None

        time_part = parts[0].strip()  # e.g., '12-2pm'
        date_part = parts[1].strip()  # e.g., 'Saturday 4th October'

        bounds = self._parse_bounds(time_part, date_part, now)
        if bounds is None:
            logger.warning("Failed to parse times for session: %s", session_str)
            return None

        start_time, end_time = bounds
        return Session(
            session_str=session_str,
            start_time=start_time,
            end_time=end_time,
        )

    def _parse_bounds(self, time_part: str, date_part: str,
                      now: Optional[datetime] = None) -> Optional[tuple]:
        """
        Parse both ends of a time range against one reference time.

        Args:
            time_part: Time range (e.g., '12-2pm')
            date_part: Date string (e.g., 'Saturday 4th October')
            now: Reference time; defaults to ``datetime.now()``.

        Returns:
            ``(start, end)`` with ``start < end``, or None if either end
            cannot be parsed.
        """
        if now is None:
            now = datetime.now()

        # Both ends are parsed without the per-value year rollover so the
        # session can be moved as a whole further down.
        start_time = self._parse_start_time(time_part, date_part, now, roll_forward=False)
        end_time = self._parse_end_time(time_part, date_part, now, roll_forward=False)
        if start_time is None or end_time is None:
            return None

        if end_time <= start_time:
            pieces = time_part.lower().split('-')
            half_day = timedelta(hours=12)
            # An end with no marker of its own only had a guessed AM/PM, so
            # try the other half of the day first ('11pm-1', '11-1').
            if not re.search(r'am|pm', pieces[1]):
                end_time += half_day if end_time.hour < 12 else -half_day
            # Otherwise the start's AM/PM was copied from the end
            # ('11-1pm' is 11:00-13:00, not 23:00-13:00).
            elif not re.search(r'am|pm', pieces[0]):
                start_time += half_day if start_time.hour < 12 else -half_day
            # Still not ordered: the range crosses midnight ('11pm-1am').
            if end_time <= start_time:
                end_time += timedelta(days=1)

        # Only a session that has fully ended is taken to mean next year; a
        # session in progress keeps its start in the current year.
        if end_time < now:
            try:
                rolled_start = start_time.replace(year=start_time.year + 1)
            except ValueError:
                # 29 February has no counterpart next year; keep the past date.
                pass
            else:
                end_time = rolled_start + (end_time - start_time)
                start_time = rolled_start

        return start_time, end_time

    def _parse_start_time(self, time_part: str, date_part: str,
                          now: Optional[datetime] = None, *,
                          roll_forward: bool = True) -> Optional[datetime]:
        """Parse start time (the text before the first dash) from a time range."""
        start_time_str = time_part.split('-')[0].strip()
        return self._parse_datetime(
            start_time_str, date_part, time_part, now, roll_forward=roll_forward
        )

    def _parse_end_time(self, time_part: str, date_part: str,
                        now: Optional[datetime] = None, *,
                        roll_forward: bool = True) -> Optional[datetime]:
        """Parse end time (the text after the first dash) from a time range.

        Returns None when the range contains no dash.
        """
        pieces = time_part.split('-')
        if len(pieces) < 2:
            return None
        end_time_str = pieces[1].strip()
        return self._parse_datetime(
            end_time_str, date_part, time_part, now, roll_forward=roll_forward
        )

    def _parse_datetime(self, time_str: str, date_part: str,
                        full_time_range: Optional[str] = None,
                        now: Optional[datetime] = None, *,
                        roll_forward: bool = True) -> Optional[datetime]:
        """
        Parse a datetime from time and date strings.

        Args:
            time_str: Time string (e.g., '12pm', '2pm' or '12:30pm')
            date_part: Date string (e.g., 'Saturday 4th October')
            full_time_range: Full time range string (e.g., '9-10pm') to infer AM/PM
            now: Reference time used to choose the year; defaults to
                ``datetime.now()``.
            roll_forward: When true, a result earlier than ``now`` is moved to
                the following year.

        Returns:
            datetime object or None if parsing fails
        """
        # Parse time: hour, optional ':mm' / '.mm', optional am/pm marker.
        match = re.match(r'(\d+)(?:[:.](\d{2}))?\s*(am|pm)?', time_str.lower())
        if not match:
            return None

        hour = int(match.group(1))
        minute = int(match.group(2) or 0)
        ampm = match.group(3)

        # If no AM/PM marker on this time, try to infer from the full time range
        if not ampm and full_time_range:
            range_match = re.search(r'(am|pm)', full_time_range.lower())
            if range_match:
                ampm = range_match.group(1)

        # Default to AM if still no marker (except for 12 which defaults to PM)
        if not ampm:
            ampm = 'pm' if hour == 12 else 'am'

        # Convert to 24-hour format
        if ampm == 'pm' and hour != 12:
            hour += 12
        elif ampm == 'am' and hour == 12:
            hour = 0

        # Values such as '13pm' or ':75' are not clock times; report failure
        # instead of letting datetime.replace() raise.
        if hour > 23 or minute > 59:
            return None

        if now is None:
            now = datetime.now()

        # Remove ordinal suffix from date ('4th' -> '4')
        date_part_clean = re.sub(r'(\d+)(st|nd|rd|th)', r'\1', date_part, flags=re.IGNORECASE)

        # Try the reference year first; fall back to the next one when the
        # date does not exist in it (e.g. 29 February).
        date_obj = None
        for year in (now.year, now.year + 1):
            try:
                date_obj = datetime.strptime(f"{date_part_clean} {year}", '%A %d %B %Y')
            except ValueError:
                continue
            break

        if date_obj is None:
            logger.error("Failed to parse date: %s", date_part)
            return None

        # Combine into datetime
        result = date_obj.replace(hour=hour, minute=minute)

        # If the datetime is in the past, try next year
        if roll_forward and result < now:
            try:
                result = result.replace(year=now.year + 1)
            except ValueError:
                # The date does not exist next year; keep the past value.
                pass

        return result

    def get_upcoming_notification_time(
        self, session: "Session", hours_before: int, now: Optional[datetime] = None
    ) -> Optional[datetime]:
        """
        Get the time for upcoming notification.

        Args:
            session: Session object
            hours_before: Hours before session to notify
            now: Reference time; defaults to ``datetime.now()``.

        Returns:
            datetime for notification or None if already passed
        """
        if now is None:
            now = datetime.now()

        notification_time = session.start_time - timedelta(hours=hours_before)
        if notification_time <= now:
            return None
        return notification_time