-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathemail_client.py
More file actions
227 lines (196 loc) · 9.42 KB
/
email_client.py
File metadata and controls
227 lines (196 loc) · 9.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
"""
IMAP Email Client to fetch and parse emails.
"""
import imaplib
import email
from email.header import decode_header
from datetime import datetime, timedelta
import logging
from config_manager import config_manager
# Configure root logging with INFO as the initial level. EmailClient re-applies
# the level from the LOG_LEVEL config value in _update_logging_level().
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class EmailClient:
    """IMAP client that fetches, parses, flags and moves emails.

    All settings (IMAP_SERVER, IMAP_PORT, EMAIL_ADDRESS, EMAIL_PASSWORD,
    IMAP_MAILBOX, FETCH_CRITERIA, FETCH_DAYS, FETCH_LIMIT, LOG_LEVEL) are read
    from ``config_manager`` at call time, so configuration changes take effect
    on the next call without recreating the client.

    Attributes:
        mail: The active ``imaplib.IMAP4_SSL`` connection, or ``None`` when
            not connected.
    """

    # IMAP date strings need English month abbreviations; strftime('%b')
    # follows the process locale, so the names are spelled out here.
    _IMAP_MONTHS = (
        'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
        'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec',
    )

    def __init__(self):
        self.mail = None
        self._update_logging_level()

    def _update_logging_level(self):
        """Update the root logger level from the current LOG_LEVEL config.

        An unrecognised level name falls back to INFO instead of raising.
        """
        log_level = config_manager.get('LOG_LEVEL', 'INFO')
        level = getattr(logging, str(log_level).upper(), None)
        if not isinstance(level, int):
            logging.warning(f"Unknown LOG_LEVEL '{log_level}', falling back to INFO.")
            level = logging.INFO
        logging.getLogger().setLevel(level)

    def connect(self):
        """Connect to the IMAP server over SSL and log in.

        Returns:
            True on success. On any failure the error is logged, the
            half-open connection (if any) is shut down, ``self.mail`` is set
            to ``None`` and False is returned.
        """
        self._update_logging_level()
        connection = None
        try:
            imap_server = config_manager.get('IMAP_SERVER')
            imap_port = config_manager.get('IMAP_PORT', 993)
            email_address = config_manager.get('EMAIL_ADDRESS')
            email_password = config_manager.get('EMAIL_PASSWORD')
            connection = imaplib.IMAP4_SSL(imap_server, imap_port)
            connection.login(email_address, email_password)
        except imaplib.IMAP4.error as e:
            logging.error(f"Could not connect to email server: {e}")
        except Exception as e:
            logging.error(f"An unexpected error occurred during connection: {e}")
        else:
            self.mail = connection
            logging.info("Successfully connected to the email server.")
            return True

        # Do not keep an unauthenticated connection around: the other methods
        # treat a non-None self.mail as "connected".
        if connection is not None:
            try:
                connection.shutdown()
            except Exception as e:
                logging.debug(f"Error while discarding failed connection: {e}")
        self.mail = None
        return False

    def _build_search_criteria(self):
        """Build the IMAP search criteria from FETCH_CRITERIA and FETCH_DAYS.

        Returns:
            A list of UTF-8 encoded criteria. A ``SINCE "<dd-Mon-yyyy>"``
            criterion is appended when FETCH_DAYS is greater than zero.
        """
        fetch_criteria = config_manager.get('FETCH_CRITERIA', 'UNSEEN')
        fetch_days = config_manager.get('FETCH_DAYS', 0)
        criteria = [fetch_criteria]
        if fetch_days > 0:
            since = datetime.now() - timedelta(days=fetch_days)
            month = self._IMAP_MONTHS[since.month - 1]
            date_n_days_ago = f"{since.day:02d}-{month}-{since.year:04d}"
            criteria.append(f'SINCE "{date_n_days_ago}"')
        return [c.encode('utf-8') for c in criteria]

    def fetch_emails(self):
        """Fetch emails from the configured mailbox, newest first.

        At most FETCH_LIMIT messages are fetched (all of them when the limit
        is zero or negative). A message that cannot be parsed is logged and
        skipped.

        Returns:
            A list of dictionaries as produced by ``_parse_email``; an empty
            list when not connected, when nothing matches, or on error.
        """
        if not self.mail:
            logging.error("Not connected to the email server.")
            return []
        try:
            imap_mailbox = config_manager.get('IMAP_MAILBOX', 'INBOX')
            status, _ = self.mail.select(imap_mailbox)
            if status != 'OK':
                logging.error(f"Failed to select mailbox '{imap_mailbox}': {status}")
                return []

            search_criteria = self._build_search_criteria()
            logging.info(f"Searching for emails with criteria: {search_criteria}")
            status, messages = self.mail.search(None, *search_criteria)
            if status != 'OK':
                logging.error(f"Failed to search for emails: {status}")
                return []

            email_ids = messages[0].split()
            if not email_ids:
                logging.info("No emails found matching criteria.")
                return []

            # The search result is ascending, so the newest IDs are at the end.
            fetch_limit = config_manager.get('FETCH_LIMIT', 10)
            email_ids_to_fetch = email_ids[-fetch_limit:] if fetch_limit > 0 else email_ids
            logging.info(f"Found {len(email_ids)} emails, fetching {len(email_ids_to_fetch)}.")

            fetched_emails = []
            for email_id in reversed(email_ids_to_fetch):
                status, msg_data = self.mail.fetch(email_id, '(RFC822)')
                if status != 'OK':
                    logging.warning(f"Failed to fetch email ID {email_id.decode()}: {status}")
                    continue
                for response_part in msg_data:
                    # Only tuple parts carry the raw message bytes.
                    if not isinstance(response_part, tuple):
                        continue
                    try:
                        msg = email.message_from_bytes(response_part[1])
                        parsed_email = self._parse_email(msg, email_id.decode())
                    except Exception as e:
                        # One malformed message must not discard the rest.
                        logging.warning(f"Failed to parse email ID {email_id.decode()}: {e}")
                        continue
                    fetched_emails.append(parsed_email)
            return fetched_emails
        except imaplib.IMAP4.error as e:
            logging.error(f"IMAP error during email fetch: {e}")
            return []
        except Exception as e:
            logging.error(f"An unexpected error occurred during email fetch: {e}")
            return []

    @staticmethod
    def _decode_header_value(raw_value):
        """Decode a possibly RFC 2047 encoded header into one string.

        Every chunk returned by ``decode_header`` is decoded and joined.
        A missing header (``None``) yields an empty string.
        """
        if raw_value is None:
            return ""
        decoded_chunks = []
        for chunk, charset in decode_header(raw_value):
            if isinstance(chunk, bytes):
                try:
                    chunk = chunk.decode(charset if charset else 'utf-8', errors='ignore')
                except LookupError:
                    # Unknown charset name declared by the sender.
                    chunk = chunk.decode('latin-1', errors='ignore')
            decoded_chunks.append(chunk)
        return "".join(decoded_chunks)

    @staticmethod
    def _decode_payload(payload, charset):
        """Decode payload bytes with *charset*, falling back to latin-1."""
        if not payload:
            return ""
        try:
            return payload.decode(charset if charset else 'utf-8', errors='ignore')
        except (LookupError, UnicodeDecodeError):
            return payload.decode('latin-1', errors='ignore')  # Fallback encoding

    def _parse_email(self, msg, email_id):
        """Parse an email message into a dictionary.

        Args:
            msg: An ``email.message.Message`` instance.
            email_id: Identifier stored under the ``'id'`` key.

        Returns:
            A dict with keys ``id``, ``from``, ``to``, ``cc``, ``date``,
            ``reply_to``, ``subject`` and ``body``. The body is the first
            text/html part when present, otherwise the first text/plain
            part; attachments are skipped. A missing Subject yields ``""``.
        """
        subject = self._decode_header_value(msg['Subject'])
        from_header = msg.get('From')
        to_header = msg.get('To')
        cc_header = msg.get('Cc')
        date_header = msg.get('Date')
        reply_to_header = msg.get('Reply-To')

        body_plain = ""
        body_html = ""
        if msg.is_multipart():
            for part in msg.walk():
                content_type = part.get_content_type()
                content_disposition = str(part.get('Content-Disposition'))
                if 'attachment' in content_disposition:
                    continue
                payload = part.get_payload(decode=True)
                if not payload:
                    # Container parts and empty parts carry no text.
                    continue
                decoded_payload = self._decode_payload(payload, part.get_content_charset())
                if content_type == 'text/plain' and not body_plain:
                    body_plain = decoded_payload
                elif content_type == 'text/html' and not body_html:
                    body_html = decoded_payload
        else:
            # Not a multipart message, just get the payload
            body_plain = self._decode_payload(
                msg.get_payload(decode=True), msg.get_content_charset()
            )

        # Prioritize HTML, but fall back to plain text for a GUI client
        body = body_html.strip() if body_html else body_plain.strip()
        return {
            'id': email_id,
            'from': from_header,
            'to': to_header,
            'cc': cc_header,
            'date': date_header,
            'reply_to': reply_to_header,
            'subject': subject,
            'body': body
        }

    def mark_email_as_read(self, email_id):
        """Mark an email as read by adding the ``\\Seen`` flag."""
        if not self.mail:
            logging.error("Not connected to the email server.")
            return
        try:
            status, _ = self.mail.store(email_id, '+FLAGS', r'\Seen')
            if status == 'OK':
                logging.info(f"Email ID {email_id} marked as read.")
            else:
                logging.warning(f"Failed to mark email ID {email_id} as read: {status}")
        except imaplib.IMAP4.error as e:
            logging.error(f"IMAP error marking email {email_id} as read: {e}")

    @staticmethod
    def _folder_in_listing(folder_name, listing):
        """Return True when *folder_name* occurs in a LIST response.

        imaplib reports "no match" as ``[None]`` and may return tuples for
        literal responses, so non-bytes entries are handled explicitly.
        """
        needle = folder_name.encode()
        for entry in listing or []:
            if isinstance(entry, tuple):
                entry = b"".join(p for p in entry if isinstance(p, bytes))
            if isinstance(entry, bytes) and needle in entry:
                return True
        return False

    def move_email_to_folder(self, email_id, folder_name):
        """Move an email to *folder_name*, creating the folder if needed.

        The message is copied to the target folder, then the original is
        flagged ``\\Deleted`` and the mailbox is expunged.

        NOTE(review): ``expunge()`` removes every message flagged
        ``\\Deleted`` in the selected mailbox, and IDs obtained from
        ``fetch_emails`` appear to be sequence numbers, which shift after an
        expunge. Confirm callers do not reuse stale IDs.
        """
        if not self.mail:
            logging.error("Not connected to the email server.")
            return
        try:
            # Check if folder exists, create if not
            status, listing = self.mail.list('', folder_name)
            if not self._folder_in_listing(folder_name, listing):
                logging.info(f"Folder '{folder_name}' does not exist. Creating...")
                status, _ = self.mail.create(folder_name)
                if status != 'OK':
                    logging.error(f"Failed to create folder '{folder_name}': {status}")
                    return
                logging.info(f"Folder '{folder_name}' created successfully.")

            # Copy email to new folder
            status, _ = self.mail.copy(email_id, folder_name)
            if status != 'OK':
                logging.warning(f"Failed to copy email ID {email_id} to '{folder_name}': {status}")
                return
            logging.info(f"Email ID {email_id} copied to '{folder_name}'.")

            # Mark original for deletion and expunge
            status, _ = self.mail.store(email_id, '+FLAGS', r'\Deleted')
            if status == 'OK':
                self.mail.expunge()
                logging.info(f"Original email ID {email_id} deleted.")
            else:
                logging.warning(f"Failed to mark original email ID {email_id} for deletion: {status}")
        except imaplib.IMAP4.error as e:
            logging.error(f"IMAP error moving email {email_id} to {folder_name}: {e}")
        except Exception as e:
            logging.error(f"An unexpected error occurred moving email {email_id}: {e}")

    def close(self):
        """Close the selected mailbox (if any) and log out.

        ``logout()`` is always attempted, even if closing the mailbox fails,
        and ``self.mail`` is reset to ``None`` afterwards.
        """
        if not self.mail:
            return
        connection = self.mail
        try:
            try:
                # CLOSE is only valid while a mailbox is selected; calling it
                # in any other state raises and would skip the logout.
                if getattr(connection, 'state', 'SELECTED') == 'SELECTED':
                    connection.close()
            finally:
                connection.logout()
            logging.info("Disconnected from the email server.")
        except imaplib.IMAP4.error as e:
            logging.error(f"Error during IMAP close/logout: {e}")
        except Exception as e:
            logging.error(f"An unexpected error occurred during close: {e}")
        finally:
            self.mail = None