-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcsv_helper.py
More file actions
203 lines (161 loc) · 7.22 KB
/
csv_helper.py
File metadata and controls
203 lines (161 loc) · 7.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
"""
CSV Helper Module for ESDAR-Checker
Handles CSV file operations for reading and writing DNS check results
@author Merlin von der Weide
@date 2025
@version 2.0.0
"""
import csv
import os
from typing import List, Dict, Any
import config
from terminal_message_handler import print_error, print_warning
OUTPUT_FILENAME = "esdar-check_result.csv"
def get_output_filepath() -> str:
"""Get the full path to the output CSV file."""
return os.path.join(config.RELATIVE_FILE_PATH, OUTPUT_FILENAME).replace("\\", "/")
def create_csv_file_with_header(filepath: str) -> bool:
"""
Create a new CSV file with headers.
Args:
filepath: Full path to the CSV file
Returns:
True if successful, False otherwise
"""
headers = ["Domain", "MX Record", "SPF Record", "DKIM Record", "DMARC Record"]
try:
# Create directory if it doesn't exist
os.makedirs(os.path.dirname(filepath), exist_ok=True)
with open(filepath, mode='w', newline='', encoding='utf-8') as file:
writer = csv.writer(file, delimiter=',', quoting=csv.QUOTE_ALL)
writer.writerow(headers)
return True
except PermissionError:
print_error(f"Cannot write to {filepath} - file is open in another program. Please close it and try again.")
return False
except Exception as e:
print_error(f"Error creating CSV file: {str(e)}")
return False
def write_results_to_csv(results: List[Dict[str, Any]], append: bool = False) -> bool:
"""
Write DNS check results to CSV file.
Args:
results: List of dictionaries containing DNS check results
append: If True, append to existing file; if False, overwrite
Returns:
True if successful, False otherwise
"""
if not results:
print_warning("No results to write to CSV")
return False
filepath = get_output_filepath()
# Check if file exists
file_exists = os.path.isfile(filepath)
# Define headers
headers = ["Domain", "MX Record", "SPF Record", "DKIM Record", "DMARC Record"]
try:
# Create directory if it doesn't exist
os.makedirs(os.path.dirname(filepath), exist_ok=True)
# Determine mode and whether to write header
if append:
# Append mode: only write header if file doesn't exist
mode = 'a'
write_header = not file_exists
else:
# Overwrite mode: always write header
mode = 'w'
write_header = True
with open(filepath, mode=mode, newline='', encoding='utf-8') as file:
# Use QUOTE_ALL to ensure all fields are properly quoted
# This prevents issues with commas, semicolons, and other special characters
writer = csv.writer(file, delimiter=',', quoting=csv.QUOTE_ALL)
# Write header if needed
if write_header:
writer.writerow(headers)
# Write results
for result in results:
# Extract values from result dictionary and ensure they are strings
domain = str(result.get('domain', '')).strip()
mx = str(result.get('mx', '')).strip()
spf = str(result.get('spf', '')).strip()
dkim = str(result.get('dkim', '')).strip()
dmarc = str(result.get('dmarc', '')).strip()
# Replace semicolons with pipe character in values to prevent CSV parsing issues
# This ensures Excel and other CSV readers don't misinterpret semicolons as delimiters
# Pipe character (|) is less likely to cause issues than commas or semicolons
domain = domain.replace(';', ' | ')
mx = mx.replace(';', ' | ')
spf = spf.replace(';', ' | ')
dkim = dkim.replace(';', ' | ')
dmarc = dmarc.replace(';', ' | ')
# Write row - each value in its own column, properly escaped
writer.writerow([domain, mx, spf, dkim, dmarc])
print(f"Results written to {filepath}")
return True
except PermissionError:
print_error(f"Cannot write to {filepath} - file is open in another program. Please close it and try again.")
return False
except Exception as e:
print_error(f"Error writing to CSV file: {str(e)}")
return False
def read_domains_from_file(filepath: str) -> List[str]:
"""
Read domains from a text file (one domain per line) or CSV file.
Tries multiple encodings to handle different file formats.
Args:
filepath: Path to the input file
Returns:
List of domain names
"""
domains = []
if not os.path.isfile(filepath):
print_error(f"File not found: {filepath}")
return domains
# List of encodings to try (in order of preference)
encodings_to_try = ['utf-8', 'utf-8-sig', 'latin-1', 'iso-8859-1', 'windows-1252', 'cp1252']
for encoding in encodings_to_try:
try:
with open(filepath, 'r', encoding=encoding) as file:
# Try to detect if it's a CSV file
first_line = file.readline().strip()
file.seek(0) # Reset to beginning
# If first line looks like CSV header, skip it
if ',' in first_line and ('domain' in first_line.lower() or 'url' in first_line.lower()):
# It's a CSV file
reader = csv.reader(file)
next(reader, None) # Skip header
for row in reader:
if row and row[0].strip():
domains.append(row[0].strip())
else:
# It's a plain text file (one domain per line)
for line in file:
domain = line.strip()
# Skip empty lines and comments
if domain and not domain.startswith('#'):
domains.append(domain)
# If we got here, the file was read successfully
return domains
except UnicodeDecodeError:
# Try next encoding
continue
except Exception as e:
# For other errors, try next encoding but log a warning
if encoding == encodings_to_try[-1]:
# Last encoding failed, show error
print_error(f"Error reading domains from file: {str(e)}")
return domains
continue
# If all encodings failed
print_error(f"Could not read file {filepath} with any supported encoding")
return domains
def write_single_result_to_csv(result: Dict[str, Any], append: bool = True) -> bool:
"""
Write a single DNS check result to CSV file (useful for processing domains one by one).
Args:
result: Dictionary containing DNS check result for one domain
append: If True, append to existing file; if False, overwrite
Returns:
True if successful, False otherwise
"""
return write_results_to_csv([result], append=append)