-
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathagent.py
More file actions
2933 lines (2653 loc) · 162 KB
/
agent.py
File metadata and controls
2933 lines (2653 loc) · 162 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import os
import sys
import json
import platform
import subprocess
import socket
import uuid
import psutil
import requests
import winreg
from typing import Dict, Any
import logging
import logging.handlers
import argparse
import time
import threading
from pathlib import Path
import concurrent.futures
from functools import lru_cache
import traceback
# Default configuration
# NOTE(security): the "auth" block ships fallback credentials in the source
# tree; override them in config.json for any real deployment.
DEFAULT_CONFIG = {
    "servers": {
        "development": "http://localhost:3001",
        "production": "https://10.1.32.66"
    },
    "default_server": "production",
    "log_level": "INFO",
    "send_interval": 21600,
    "hidden_mode": True,
    "create_service": False,
    "add_to_startup": False,
    "auth": {
        "email": "infraagent@localhost.com",
        "password": "Infraagent@2025"
    }
}

# Load configuration
CONFIG_FILE = 'config.json'


def _merge_with_defaults(loaded, defaults):
    """Return a new dict holding ``loaded`` with gaps filled from ``defaults``.

    Keys missing from ``loaded`` take the default value.  When both sides hold
    a dict for the same key the two are merged recursively.  A user value that
    is present but is not a dict is kept untouched.  Default dicts are always
    copied, so the result never shares nested dicts with ``defaults``.
    """
    merged = dict(loaded)
    for key, default_value in defaults.items():
        if isinstance(default_value, dict):
            current = merged.get(key, {})
            if isinstance(current, dict):
                merged[key] = _merge_with_defaults(current, default_value)
        elif key not in merged:
            merged[key] = default_value
    return merged


CONFIG = {}
try:
    # utf-8-sig reads plain UTF-8 and also tolerates a leading BOM, which
    # some Windows editors write and which json.load would otherwise reject.
    with open(CONFIG_FILE, 'r', encoding='utf-8-sig') as f:
        _loaded_config = json.load(f)
    if not isinstance(_loaded_config, dict):
        raise ValueError("top-level JSON value must be an object")
    # Validate and merge with default config
    CONFIG = _merge_with_defaults(_loaded_config, DEFAULT_CONFIG)
except (OSError, ValueError) as e:
    print(f"Warning: Could not load config file: {e}. Using default configuration.")
    # Copy instead of aliasing so later changes to CONFIG cannot alter
    # DEFAULT_CONFIG.
    CONFIG = _merge_with_defaults({}, DEFAULT_CONFIG)
# Configure logging with file rotation to prevent permission issues
class SafeRotatingFileHandler(logging.handlers.RotatingFileHandler):
    """Rotating file handler that never fails to construct.

    Construction tries, in order:

    1. the requested ``filename`` (creating its parent directory if needed);
    2. a uniquely named ``infraagent_<timestamp>.log`` in the system temp
       directory;
    3. a lazily opened ``os.devnull`` sink, so the handler is still a fully
       initialised ``logging.Handler`` and other handlers keep working.

    Parameters mirror ``logging.handlers.RotatingFileHandler``.
    """

    def __init__(self, filename, mode='a', maxBytes=1024*1024, backupCount=3, encoding=None, delay=False):
        # Ensure the log directory exists
        log_path = Path(filename)
        if not log_path.parent.exists():
            try:
                log_path.parent.mkdir(parents=True, exist_ok=True)
            except OSError as e:
                print(f"Warning: Could not create log directory {log_path.parent}: {e}")

        # Attempt 1: the requested location.
        try:
            super().__init__(filename, mode, maxBytes, backupCount, encoding, delay)
            return
        except OSError:
            # PermissionError, IsADirectoryError, ... are all OSError.
            pass

        # Attempt 2: a unique file in the temp directory.
        try:
            import tempfile
            temp_dir = Path(tempfile.gettempdir())
            unique_path = temp_dir / f"infraagent_{int(time.time())}.log"
            super().__init__(str(unique_path), mode, maxBytes, backupCount, encoding, delay)
            print(f"Warning: Could not write to {filename}, using {unique_path} instead")
            return
        except Exception as e2:
            print(f"Error creating log file in temp directory: {e2}")

        # Attempt 3: console-only logging.  A failed FileHandler.__init__
        # never reaches logging.Handler.__init__, which would leave this
        # object without level/formatter/lock and break every log call.
        # Re-initialise against os.devnull with delay=True (nothing is opened
        # now) and rotation disabled, so records sent here are discarded.
        super().__init__(os.devnull, mode, 0, 0, encoding, True)
# Configure logging with more detailed formatting and rotation
# Resolve the configured level defensively: a lower-case, unknown or
# non-string "log_level" in config.json must not abort start-up.
_log_level_name = str(CONFIG.get('log_level', 'INFO')).upper()
_log_level = getattr(logging, _log_level_name, None)
if not isinstance(_log_level, int):
    print(f"Warning: Unknown log_level {CONFIG.get('log_level')!r}. Using INFO.")
    _log_level = logging.INFO

logging.basicConfig(
    level=_log_level,
    format='%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s() - %(message)s',
    handlers=[
        SafeRotatingFileHandler('agent.log', maxBytes=1024*1024, backupCount=3),  # 1MB files, keep 3 backups
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
class Agent:
def __init__(self, server_url: str, token: str = ""):
    """Set up the agent's server address, auth headers and HTTP session.

    Args:
        server_url: Base URL of the server the agent reports to.
        token: Optional bearer token; when non-empty it is sent as the
            ``Authorization`` header on every request of the session.
    """
    request_headers = {'Content-Type': 'application/json'}
    if token:
        request_headers['Authorization'] = f'Bearer {token}'

    self.server_url = server_url
    self.token = token
    self.headers = request_headers

    # One shared session gives connection pooling across requests.
    http_session = requests.Session()
    http_session.headers.update(request_headers)
    self.session = http_session

    # Requests in this class pass verify=False, so silence the resulting
    # InsecureRequestWarning noise (process-wide setting).
    import urllib3
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def login(self, email: str = "infraagent@localhost.com", password: str = "Infraagent@2025") -> bool:
    """Log in to the server and store the returned bearer token.

    Posts ``email``/``password`` as JSON to ``<server_url>/api/auth/login``.
    On success the token is stored in ``self.token`` and added as an
    ``Authorization`` header to ``self.headers`` and the session.

    Args:
        email: Account e-mail sent to the server.
        password: Account password sent to the server.

    Returns:
        True when the server answered 200/201 with a non-empty ``token``,
        False in every other case.  Errors are logged, never raised.

    NOTE(security): the default credentials are hard-coded and the request
    is sent with ``verify=False`` (TLS certificate checks disabled).
    """
    try:
        login_url = f"{self.server_url}/api/auth/login"
        logger.info(f"Logging in to {login_url}")
        response = self.session.post(
            login_url,
            json={'email': email, 'password': password},
            timeout=(5, 15),  # (connection timeout, read timeout)
            verify=False  # Disable SSL verification for self-signed certificates
        )

        # The body is not guaranteed to be JSON (e.g. an HTML error page from
        # a proxy); decode it separately so the status code is still reported.
        try:
            payload = response.json() if response.content else {}
        except ValueError:
            payload = None

        if response.status_code not in (200, 201):
            error_data = payload if payload is not None else response.text[:200]
            logger.error(f"Login failed. Status code: {response.status_code}. Error: {error_data}")
            return False

        # Only a JSON object can carry the token; anything else counts as
        # "no token received".
        self.token = payload.get('token', '') if isinstance(payload, dict) else ''
        if not self.token:
            logger.error("Login succeeded but no token received")
            return False

        self.headers['Authorization'] = f'Bearer {self.token}'
        self.session.headers.update(self.headers)
        logger.info("Login successful!")
        return True
    except requests.exceptions.ConnectionError as e:
        logger.error(f"Connection error during login (network issue): {e}")
        return False
    except Exception as e:
        logger.error(f"Error during login: {e}")
        logger.debug(f"Full traceback: {traceback.format_exc()}")
        return False
@lru_cache(maxsize=1)
def get_network_info(self) -> Dict[str, str]:
    """Return the host's IP address and MAC address.

    The MAC is looked up with the first method that yields a usable value:

    1. (Windows) PowerShell ``Win32_NetworkAdapter`` query;
    2. (Windows) the ``getmac`` command;
    3. ``psutil.net_if_addrs()``;
    4. ``uuid.getnode()``.

    Within each method an adapter whose name mentions Ethernet/Intel is
    preferred over the first adapter that merely has a valid MAC.

    Returns:
        ``{'ip': <str>, 'mac': <str>}``.  ``mac`` is 12 upper-case hex digits
        without separators; ``ip`` is ``''`` when the host name cannot be
        resolved.  Failures are logged, never raised.  The result is cached
        by ``lru_cache``.
    """
    import csv  # local import: only needed to parse getmac output

    def clean_mac(raw) -> str:
        """Normalise ``raw`` to 12 upper-case hex digits, or '' if unusable."""
        if not raw:
            return ''
        candidate = ''.join(ch for ch in str(raw).upper() if ch not in '-:"\' ')
        is_hex = len(candidate) == 12 and all(ch in '0123456789ABCDEF' for ch in candidate)
        # Rejects placeholders such as "N/A"/"Disabled" and the all-zero
        # address reported for loopback interfaces.
        if not is_hex or candidate == '0' * 12:
            return ''
        return candidate

    def mac_from_node() -> str:
        """Format uuid.getnode() (a 48-bit integer) as 12 hex digits."""
        return f"{uuid.getnode():012X}"

    def mac_from_powershell() -> str:
        """Query enabled adapters through PowerShell/WMI."""
        ps_command = """
        Get-WmiObject -Class Win32_NetworkAdapter -Filter "NetEnabled='true'" |
        Where-Object { $_.MACAddress -ne $null } |
        Select-Object MACAddress, Name, AdapterType |
        ConvertTo-Json
        """
        # Use sysnative for 32-bit processes on 64-bit systems to access 64-bit PowerShell
        powershell_path = "powershell"
        if platform.architecture()[0] == "32bit" and "64" in platform.machine():
            sysnative_path = os.path.join(
                os.environ.get('SystemRoot', 'C:\\Windows'),
                'Sysnative\\WindowsPowerShell\\v1.0\\powershell.exe',
            )
            if os.path.exists(sysnative_path):
                powershell_path = sysnative_path

        result = subprocess.run(
            [powershell_path, "-Command", ps_command],
            capture_output=True, text=True, timeout=10
        )
        if result.returncode != 0 or not result.stdout.strip():
            return ''

        adapters = json.loads(result.stdout.strip())
        # ConvertTo-Json emits a bare object when there is a single adapter.
        if not isinstance(adapters, list):
            adapters = [adapters] if adapters else []

        first_valid = ''
        for adapter in adapters:
            if not isinstance(adapter, dict):
                continue
            candidate = clean_mac(adapter.get('MACAddress'))
            if not candidate:
                continue
            # Fields may be JSON null, hence the "or ''" guards.
            name = str(adapter.get('Name') or '').lower()
            adapter_type = str(adapter.get('AdapterType') or '').lower()
            # Prefer Ethernet adapters (Intel I225 checked by model name).
            if "ethernet" in name or "ethernet" in adapter_type:
                return candidate
            if "intel" in name and "i225" in name:
                return candidate
            first_valid = first_valid or candidate
        return first_valid

    def mac_from_getmac() -> str:
        """Parse the CSV output of ``getmac /fo csv /v``."""
        result = subprocess.run(
            ["getmac", "/fo", "csv", "/v"],
            capture_output=True, text=True, timeout=5
        )
        if result.returncode != 0:
            return ''
        # csv.reader copes with quoted fields that contain commas.
        rows = list(csv.reader(result.stdout.strip().splitlines()))[1:]  # Skip header
        first_valid = ''
        for row in rows:
            if len(row) < 3:
                continue
            candidate = clean_mac(row[2].strip())
            if not candidate:
                continue
            connection_name = row[0].strip()
            if "ethernet" in connection_name.lower() or "Intel" in connection_name:
                return candidate
            first_valid = first_valid or candidate
        return first_valid

    def mac_from_psutil() -> str:
        """Read link-layer addresses from psutil."""
        import psutil
        first_valid = ''
        for interface_name, interface_addresses in psutil.net_if_addrs().items():
            candidate = next(
                (
                    cleaned
                    for cleaned in (
                        clean_mac(address.address)
                        for address in interface_addresses
                        if address.family == psutil.AF_LINK
                    )
                    if cleaned
                ),
                '',
            )
            if not candidate:
                continue
            lowered = interface_name.lower()
            looks_loopback = lowered.startswith('lo') or 'loopback' in lowered
            # Prefer physical ethernet adapters
            if not looks_loopback and ('eth' in lowered or 'intel' in lowered):
                return candidate
            first_valid = first_valid or candidate
        return first_valid

    try:
        mac = ''
        if platform.system() == "Windows":
            # Method 1: PowerShell / WMI
            try:
                mac = mac_from_powershell()
            except (OSError, subprocess.SubprocessError, ValueError) as e:
                logger.warning(f"Error getting MAC via PowerShell WMI: {e}")
            # Method 2: Fallback to getmac command
            if not mac:
                try:
                    mac = mac_from_getmac()
                except (OSError, subprocess.SubprocessError, csv.Error) as e:
                    logger.warning(f"Permission error getting MAC via getmac: {e}")

        # Method 3: psutil network interfaces
        if not mac:
            try:
                mac = mac_from_psutil()
            except Exception as e:
                logger.warning(f"Error getting MAC via psutil: {e}")

        # Method 4: node id from the uuid module
        if not mac:
            mac = mac_from_node()

        # Get IP address with permission handling
        try:
            ip = socket.gethostbyname(socket.gethostname())
        except OSError as e:
            logger.warning(f"Permission error getting IP address: {e}")
            ip = ''

        return {
            'ip': ip,
            'mac': mac
        }
    except Exception as e:
        # Safety net: this method is best-effort and must not raise.
        logger.error(f"Error getting network info: {e}")
        logger.debug(f"Traceback: {traceback.format_exc()}")
        try:
            ip = socket.gethostbyname(socket.gethostname())
        except OSError:
            logger.error("Error in fallback network info")
            logger.debug(f"Traceback: {traceback.format_exc()}")
            ip = ''
        return {'ip': ip, 'mac': mac_from_node()}
@lru_cache(maxsize=1)
def get_os_info(self) -> Dict[str, str]:
    """Return a description of the operating system.

    On Windows the details come from ``wmic os`` and, if that yields no OS
    name (for example because ``wmic`` is not installed), from a PowerShell
    ``Win32_OperatingSystem`` query.  If both fail, and on every other
    platform, the values come from the ``platform`` module.

    Returns:
        A dict with the keys ``platform``, ``release``, ``build``, ``arch``,
        ``edition``, ``installDate`` and ``full``.  ``edition`` and
        ``installDate`` are ``''`` when unknown.  Failures are logged, never
        raised.  The result is cached by ``lru_cache``.
    """
    # OperatingSystemSKU -> edition name, following the PRODUCT_* constants
    # documented for GetProductInfo.  Values missing here are reported as
    # "SKU <n>" rather than guessed.
    sku_editions = {
        '0': 'Undefined',
        '1': 'Ultimate',
        '2': 'Home Basic',
        '3': 'Home Premium',
        '4': 'Enterprise',
        '5': 'Home Basic N',
        '6': 'Business',
        '7': 'Standard Server',
        '8': 'Datacenter Server',
        '9': 'Small Business Server',
        '10': 'Enterprise Server',
        '11': 'Starter',
        '12': 'Datacenter Server Core',
        '13': 'Standard Server Core',
        '14': 'Enterprise Server Core',
        '15': 'Enterprise Server IA64',
        '16': 'Business N',
        '17': 'Web Server',
        '18': 'Cluster Server',
        '19': 'Home Server',
        '20': 'Storage Express Server',
        '21': 'Storage Standard Server',
        '22': 'Storage Workgroup Server',
        '23': 'Storage Enterprise Server',
        '24': 'Server For Small Business',
        '25': 'Small Business Server Premium',
        '26': 'Home Premium N',
        '27': 'Enterprise N',
        '28': 'Ultimate N',
        '29': 'Web Server Core',
        '30': 'Medium Business Server Management',
        '31': 'Medium Business Server Security',
        '32': 'Medium Business Server Messaging',
        '33': 'Server Foundation',
        '34': 'Home Premium Server',
        '35': 'Server For Small Business V',
        '36': 'Standard Server V',
        '37': 'Datacenter Server V',
        '38': 'Enterprise Server V',
        '39': 'Datacenter Server Core V',
        '40': 'Standard Server Core V',
        '41': 'Enterprise Server Core V',
        '42': 'Hypervisor Server',
        '43': 'Storage Express Server Core',
        '44': 'Storage Standard Server Core',
        '45': 'Storage Workgroup Server Core',
        '46': 'Storage Enterprise Server Core',
        '47': 'Starter N',
        '48': 'Professional',
        '49': 'Professional N',
        '50': 'Small Business Server 2011 Essentials',
        '51': 'Server For SB Solutions',
        '52': 'Server Solutions Premium',
        '53': 'Server Solutions Premium Core',
        '54': 'SB Solution Server EM',
        '55': 'Server For SB Solutions EM',
        '56': 'Windows MultiPoint Server',
        '59': 'Windows Essential Server Solution Management',
        '60': 'Windows Essential Server Solution Additional',
        '61': 'Windows Essential Server Solution Management SVC',
        '62': 'Windows Essential Server Solution Additional SVC',
        '63': 'Small Business Server Premium Core',
        '64': 'Cluster Server V',
        '66': 'Starter E',
        '67': 'Home Basic E',
        '68': 'Home Premium E',
        '69': 'Professional E',
        '70': 'Enterprise E',
        '71': 'Ultimate E',
        '72': 'Enterprise Evaluation',
        '76': 'MultiPoint Server Standard',
        '77': 'MultiPoint Server Premium',
        '79': 'Standard Server Evaluation',
        '80': 'Datacenter Server Evaluation',
        '84': 'Enterprise N Evaluation',
        '98': 'Home N',
        '99': 'Home China',
        '100': 'Home Single Language',
        '101': 'Home',
        '103': 'Professional with Media Center',
        '121': 'Education',
        '122': 'Education N',
        '123': 'IoT Core',
        '125': 'Enterprise LTSC',
        '126': 'Enterprise N LTSC',
        '129': 'Enterprise LTSC Evaluation',
        '130': 'Enterprise N LTSC Evaluation',
        '131': 'IoT Core Commercial',
        '161': 'Pro for Workstations',
        '162': 'Pro for Workstations N',
        '164': 'Pro Education',
        '165': 'Pro Education N',
        '175': 'Enterprise multi-session',
    }

    def as_windows_11(name: str, build: str) -> str:
        """Rename "Windows 10" to "Windows 11" for builds 22000 and later."""
        if build.isdigit() and int(build) >= 22000:
            return name.replace("Microsoft Windows 10", "Microsoft Windows 11")
        return name

    def label_arch(machine: str, label_64: str) -> str:
        """Map a platform.machine() string to the reported architecture."""
        # NOTE(review): any machine string containing "64" (ARM64/aarch64
        # included) gets the 64-bit label — confirm this is intended.
        if "64" in machine:
            return label_64
        if "86" in machine or "32" in machine:
            return "x86"
        return machine

    try:
        system = platform.system()
        release = platform.release()
        version = platform.version()
        arch = platform.machine()
        # More user-friendly OS information
        os_name = "Unknown OS"
        os_version = ""
        os_build = ""
        os_edition = ""
        os_install_date = ""

        if system == "Windows":
            # Source 1: WMIC "key=value" output
            try:
                result = subprocess.run(
                    ["wmic", "os", "get", "Caption,Version,BuildNumber,OperatingSystemSKU,InstallDate", "/value"],
                    capture_output=True, text=True, timeout=10
                )
                if result.returncode == 0:
                    for line in result.stdout.split('\n'):
                        key, sep, value = line.partition('=')
                        key = key.strip()
                        value = value.strip()
                        if not sep or not value:
                            # Blank line or empty field: keep the default so
                            # the later fallbacks can still fill it in.
                            continue
                        if key == 'Caption':
                            os_name = value
                        elif key == 'Version':
                            version = value
                        elif key == 'BuildNumber':
                            os_build = value
                        elif key == 'OperatingSystemSKU':
                            os_edition = sku_editions.get(value, f"SKU {value}")
                        elif key == 'InstallDate':
                            # WMIC date format: YYYYMMDDHHMMSS.mmmmmm+UUU
                            if len(value) >= 8 and value[:8].isdigit():
                                os_install_date = f"{value[:4]}-{value[4:6]}-{value[6:8]}"
                            else:
                                os_install_date = value
                    # The caption may still say "Windows 10" on Windows 11.
                    os_name = as_windows_11(os_name, os_build)
            except (OSError, subprocess.SubprocessError) as e:
                logger.warning(f"Permission error getting OS info via WMIC: {e}")

            # Source 2: PowerShell, only if WMIC produced no OS name
            if os_name == "Unknown OS":
                try:
                    ps_command = """
                    $os = Get-CimInstance -ClassName Win32_OperatingSystem | Select-Object Caption,Version,BuildNumber,OperatingSystemSKU,InstallDate
                    $os | ConvertTo-Json
                    """
                    result = subprocess.run(
                        ["powershell", "-Command", ps_command],
                        capture_output=True, text=True, timeout=10
                    )
                    if result.returncode == 0 and result.stdout.strip():
                        os_data = json.loads(result.stdout.strip())
                        if isinstance(os_data, dict):
                            # JSON fields may be null or numeric, so coerce
                            # to str before using string methods.
                            caption = str(os_data.get('Caption') or '').strip()
                            if caption:
                                os_name = caption
                            version = str(os_data.get('Version') or version)
                            os_build = str(os_data.get('BuildNumber') or os_build)
                            sku = os_data.get('OperatingSystemSKU')
                            if sku is not None and not os_edition:
                                os_edition = sku_editions.get(str(sku), f"SKU {sku}")
                            os_name = as_windows_11(os_name, os_build)
                except (OSError, subprocess.SubprocessError, ValueError) as e:
                    logger.warning(f"Permission error getting OS info via PowerShell: {e}")

            # Final fallback to basic platform info
            if os_name == "Unknown OS":
                os_name = f"{system} {release}"
                os_build = version

            # Create the simplified OS version string (user preference)
            architecture = label_arch(arch, "AMD64")
            os_version = f"{os_name} {architecture}"
            if os_build:
                os_version += f" Build {os_build}"
        else:
            # For non-Windows systems
            os_name = f"{system} {release}"
            architecture = label_arch(arch, "x64")
            os_version = f"{os_name} {architecture}"
            os_build = version

        return {
            'platform': system.lower(),
            'release': os_name,
            'build': os_build,
            'arch': architecture,  # Return the corrected architecture
            'edition': os_edition,
            'installDate': os_install_date,
            'full': os_version
        }
    except Exception as e:
        # Safety net: this method is best-effort and must not raise.
        logger.error(f"Error getting OS info: {e}")
        logger.debug(f"Traceback: {traceback.format_exc()}")
        fallback_architecture = label_arch(platform.machine(), "AMD64")
        return {
            'platform': platform.system().lower(),
            'release': platform.release(),
            'build': platform.version(),
            'arch': fallback_architecture,
            'edition': '',
            'installDate': '',
            'full': f"{platform.system()} {platform.release()} {fallback_architecture}"
        }
@lru_cache(maxsize=1)
def get_system_specs(self) -> Dict[str, Any]:
"""Get system specifications with caching for performance and permission handling"""
try:
import platform # Import platform module at the function level to ensure it's available
# Get memory info with permission handling
total_memory_gb = 0
available_memory_gb = 0
memory_speed_mhz = "Unknown"
memory_type = "Unknown"
try:
mem = psutil.virtual_memory()
total_memory_gb = round(mem.total / (1024**3))
available_memory_gb = round(mem.available / (1024**3))
except (PermissionError, psutil.AccessDenied) as e:
logger.warning(f"Permission error getting memory info: {e}")
total_memory_gb = 0
available_memory_gb = 0
# Get detailed memory information on Windows
try:
if platform.system() == "Windows":
# Try to get memory speed and type using wmic
result = subprocess.run(
["wmic", "memorychip", "get", "Speed,MemoryType,FormFactor,Capacity,DeviceLocator,Manufacturer", "/format:csv"],
capture_output=True, text=True, timeout=10
)
if result.returncode == 0 and result.stdout.strip():
lines = result.stdout.strip().split('\n')
if len(lines) > 1:
# Parse memory modules information
speeds = []
types = []
for line in lines[1:]: # Skip header
parts = line.split(',')
if len(parts) >= 7:
try:
speed = parts[-6].strip() # Speed column
mem_type = parts[-5].strip() # MemoryType column
# Only use speed if it's a reasonable value (not memory size)
if speed.isdigit() and speed != "0" and int(speed) < 1000000:
speeds.append(int(speed))
# Memory type mapping (simplified)
type_mapping = {
"20": "DDR",
"21": "DDR2",
"24": "DDR3",
"26": "DDR4",
"34": "DDR5"
}
if mem_type in type_mapping:
types.append(type_mapping[mem_type])
except Exception as e:
logger.debug(f"Error parsing memory module info: {e}")
# Determine common speed and type
if speeds:
memory_speed_mhz = f"{max(speeds)} MHz" # Use maximum speed
if types:
# Use the most common type
from collections import Counter
type_counts = Counter(types)
memory_type = type_counts.most_common(1)[0][0]
except (PermissionError, subprocess.SubprocessError, FileNotFoundError) as e:
logger.warning(f"Permission error getting detailed memory info: {e}")
# Get CPU info with permission handling
cpu_model = "Unknown"
cpu_count = 0
cpu_freq_str = "Unknown"
cpu_threads = 0
cpu_architecture = "Unknown"
try:
cpu_count = psutil.cpu_count(logical=False) # Physical cores
cpu_threads = psutil.cpu_count(logical=True) # Logical cores (threads)
except (PermissionError, psutil.AccessDenied) as e:
logger.warning(f"Permission error getting CPU count: {e}")
cpu_count = 0
cpu_threads = 0
try:
cpu_freq = psutil.cpu_freq()
cpu_freq_str = f"{cpu_freq.current:.2f}MHz" if cpu_freq else "Unknown"
except (PermissionError, psutil.AccessDenied, NotImplementedError) as e:
logger.warning(f"Permission error getting CPU frequency: {e}")
cpu_freq_str = "Unknown"
# Get CPU architecture with better 32-bit/64-bit detection
try:
cpu_architecture = platform.machine()
# Normalize architecture naming for consistency
if "64" in cpu_architecture or "AMD64" in cpu_architecture:
cpu_architecture = "AMD64"
elif "86" in cpu_architecture or "32" in cpu_architecture:
cpu_architecture = "x86"
else:
# Try alternative methods for better architecture detection
if platform.system() == "Windows":
try:
# Use WMIC to get system type for more accurate architecture detection
result = subprocess.run(
["wmic", "computersystem", "get", "SystemType", "/value"],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
for line in result.stdout.split('\n'):
if line.startswith('SystemType='):
system_type = line.split('=', 1)[1].strip()
if "64" in system_type:
cpu_architecture = "AMD64"
elif "86" in system_type or "32" in system_type:
cpu_architecture = "x86"
break
except Exception as e:
logger.warning(f"Error getting system type via WMIC: {e}")
except Exception as e:
logger.warning(f"Error getting CPU architecture: {e}")
cpu_architecture = "Unknown"
# Try to get better CPU name using multiple methods on Windows
try:
if platform.system() == "Windows":
# Method 1: Try to get CPU name using WMIC with more detailed information
result = subprocess.run(
["wmic", "cpu", "get", "Name,NumberOfCores,NumberOfLogicalProcessors,MaxClockSpeed,L2CacheSize,L3CacheSize,Architecture", "/format:csv"],
capture_output=True, text=True, timeout=10
)
if result.returncode == 0 and result.stdout.strip():
# Parse CSV output
lines = result.stdout.strip().split('\n')
if len(lines) > 1:
# Skip header line
data_line = lines[1]
parts = data_line.split(',')
if len(parts) >= 8:
# Extract CPU name (should be the last part)
cpu_model = parts[-7].strip() if parts[-7].strip() else "Unknown"
# We can also extract other details if needed in the future
# Method 2: If WMIC didn't work or didn't provide detailed info, try PowerShell with enhanced approach
if cpu_model == "Unknown" or cpu_model == "":
# Enhanced PowerShell approach with multiple methods
ps_commands = [
# Method 2a: Get CPU information using Get-CimInstance (modern approach)
"""
$cpu = Get-CimInstance -ClassName Win32_Processor | Select-Object Name, NumberOfCores, NumberOfLogicalProcessors, MaxClockSpeed
$cpu | ConvertTo-Json
""",
# Method 2b: Fallback to Get-WmiObject (for older systems)
"""
$cpu = Get-WmiObject -Class Win32_Processor | Select-Object Name, NumberOfCores, NumberOfLogicalProcessors, MaxClockSpeed
$cpu | ConvertTo-Json
""",
# Method 2c: Get detailed processor information from registry via PowerShell
"""
$cpu = Get-ItemProperty -Path "HKLM:\\HARDWARE\\DESCRIPTION\\System\\CentralProcessor\\0" -Name ProcessorNameString -ErrorAction SilentlyContinue
if ($cpu) {
@{Name = $cpu.ProcessorNameString.Trim()}
} else {
$null
}
| ConvertTo-Json
"""
]
# Try each PowerShell command in order
for ps_command in ps_commands:
try:
result = subprocess.run(
["powershell", "-Command", ps_command],
capture_output=True, text=True, timeout=10
)
if result.returncode == 0 and result.stdout.strip():
import json
cpu_data = json.loads(result.stdout.strip())
# Handle both single object and array
if isinstance(cpu_data, list) and len(cpu_data) > 0:
cpu_model = cpu_data[0].get('Name', '').strip() or cpu_data[0].get('ProcessorNameString', '').strip()
elif isinstance(cpu_data, dict):
cpu_model = cpu_data.get('Name', '').strip() or cpu_data.get('ProcessorNameString', '').strip()
if cpu_model:
break # Success, exit the loop
except Exception as e:
logger.debug(f"PowerShell command failed: {e}")
continue
# Method 3: Try registry approach for more exact CPU information with enhanced error handling
if cpu_model == "Unknown" or cpu_model == "":
try:
# Enhanced registry approach with multiple keys
registry_keys = [
r"HARDWARE\DESCRIPTION\System\CentralProcessor\0",
r"HARDWARE\DESCRIPTION\System\CentralProcessor\1" # Sometimes the first core has better info
]
for reg_key in registry_keys:
try:
with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, reg_key) as key:
# Try multiple value names
value_names = ["ProcessorNameString", "Identifier", "VendorIdentifier"]
for value_name in value_names:
try:
cpu_model = winreg.QueryValueEx(key, value_name)[0].strip()
if cpu_model and cpu_model != "Unknown":
break
except FileNotFoundError:
continue
if cpu_model and cpu_model != "Unknown":
break
except Exception as e:
logger.debug(f"Registry key {reg_key} access failed: {e}")
continue
except Exception as e:
logger.warning(f"Error getting CPU from registry: {e}")
# Method 4: Try to get detailed CPU information using CPUID or other methods
if cpu_model == "Unknown" or cpu_model == "":
try:
# Try to get more detailed CPU information using wmic with different approach
result = subprocess.run(
["wmic", "computersystem", "get", "SystemType", "/value"],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
for line in result.stdout.split('\n'):
if line.startswith('SystemType='):
system_type = line.split('=', 1)[1].strip()
if system_type:
cpu_architecture = system_type
except Exception as e:
logger.warning(f"Error getting system type: {e}")
# Method 5: Enhanced platform.processor() with additional checks
if cpu_model == "Unknown" or cpu_model == "":
try:
# Try to get CPU information using different approaches
import platform
temp_cpu_model = platform.processor()
if temp_cpu_model and temp_cpu_model != "Unknown":
cpu_model = temp_cpu_model
except Exception as e:
logger.debug(f"Error getting CPU model from platform.processor(): {e}")
# Additional check using environment variables
if cpu_model == "Unknown" or cpu_model == "":
try:
# Check environment variables that might contain CPU info
processor_identifier = os.environ.get('PROCESSOR_IDENTIFIER', '')
if processor_identifier and processor_identifier != "Unknown":
cpu_model = processor_identifier
except Exception as e:
logger.debug(f"Error getting CPU model from environment: {e}")
else:
# For non-Windows systems with enhanced detection
try:
# Try to get CPU information using /proc/cpuinfo on Linux with enhanced parsing
if os.path.exists('/proc/cpuinfo'):
with open('/proc/cpuinfo', 'r') as f:
for line in f:
if line.startswith('model name'):
temp_cpu_model = line.split(':', 1)[1].strip()
if temp_cpu_model:
cpu_model = temp_cpu_model
break
# Fallback to other identifiers
if not cpu_model or cpu_model == "Unknown":
if line.startswith('cpu'):
temp_cpu_model = line.split(':', 1)[1].strip()
if temp_cpu_model:
cpu_model = temp_cpu_model
else:
# Try alternative methods for Unix-like systems
try:
# Try sysctl on macOS/BSD systems
result = subprocess.run(
["sysctl", "-n", "machdep.cpu.brand_string"],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0 and result.stdout.strip():
temp_cpu_model = result.stdout.strip()
if temp_cpu_model:
cpu_model = temp_cpu_model
except Exception as e:
logger.debug(f"Error getting CPU model from sysctl: {e}")
except Exception as e:
logger.warning(f"Error reading /proc/cpuinfo: {e}")
except (PermissionError, subprocess.SubprocessError, FileNotFoundError) as e:
logger.warning(f"Permission error getting CPU name: {e}")
# Fallback to platform.processor() if other methods didn't work
if cpu_model == "Unknown" or cpu_model == "":
try:
cpu_model = platform.processor() or "Unknown"
except Exception as e:
logger.warning(f"Error getting CPU model from platform: {e}")
cpu_model = "Unknown"
# Enhanced fallback mechanisms for CPU detection failures
if cpu_model == "Unknown" or cpu_model == "":
try:
logger.debug("All primary CPU detection methods failed, trying enhanced fallback methods")
# Fallback Method 1: Try to get CPU information from Windows Management Instrumentation (WMI) with different approach
if platform.system() == "Windows":
try:
# Use PowerShell with Get-WmiObject as an alternative approach
ps_command = """
$ErrorActionPreference = "SilentlyContinue"
$cpu = Get-WmiObject -Class Win32_Processor -Property Name, Caption, Description | Select-Object -First 1 Name, Caption, Description
if ($cpu) {
if ($cpu.Name) { $cpu.Name }
elseif ($cpu.Caption) { $cpu.Caption }
elseif ($cpu.Description) { $cpu.Description }
else { "Unknown" }
} else {
"Unknown"
}
"""
result = subprocess.run(
["powershell", "-Command", ps_command],
capture_output=True, text=True, timeout=15
)
if result.returncode == 0 and result.stdout.strip() and result.stdout.strip() != "Unknown":
cpu_model = result.stdout.strip()
logger.debug(f"CPU model detected via PowerShell WMI fallback: {cpu_model}")
except Exception as e:
logger.debug(f"PowerShell WMI fallback failed: {e}")
# Fallback Method 2: Try to get CPU information from system information command
if cpu_model == "Unknown" or cpu_model == "":
try:
# Use systeminfo command to get system information including CPU
result = subprocess.run(
["systeminfo"],
capture_output=True, text=True, timeout=20
)
if result.returncode == 0 and result.stdout:
# Parse systeminfo output to find CPU information
for line in result.stdout.split('\n'):
if 'Processor(s):' in line:
# Extract CPU information from the line
cpu_info = line.split(':', 1)[1].strip() if ':' in line else line.strip()
if cpu_info and cpu_info != "":
cpu_model = cpu_info
logger.debug(f"CPU model detected via systeminfo: {cpu_model}")
break
except Exception as e:
logger.debug(f"Systeminfo fallback failed: {e}")
# Fallback Method 3: Try to get CPU information from MSInfo32 export (if available)
if cpu_model == "Unknown" or cpu_model == "":
try:
import tempfile
import xml.etree.ElementTree as ET
# Create temporary file for MSInfo32 export
with tempfile.NamedTemporaryFile(suffix='.xml', delete=False) as tmp_file:
tmp_filename = tmp_file.name
try:
# Export system information to XML
result = subprocess.run(
["msinfo32", "/report", tmp_filename, "/category", "components"],
capture_output=True, text=True, timeout=30
)
if result.returncode == 0 and os.path.exists(tmp_filename):
# Parse the XML file to find CPU information
tree = ET.parse(tmp_filename)
root = tree.getroot()
# Look for CPU information in the XML
for elem in root.iter():
if elem.text and ('processor' in elem.text.lower() or 'cpu' in elem.text.lower()):
# Only the matching element's own text is examined here; parent and
# sibling elements are not inspected for the actual CPU name.
if elem.text and len(elem.text) > 10:
if 'intel' in elem.text.lower() or 'amd' in elem.text.lower():
cpu_model = elem.text.strip()
logger.debug(f"CPU model detected via MSInfo32: {cpu_model}")
break
except Exception as e:
logger.debug(f"MSInfo32 fallback failed: {e}")
finally:
# Clean up temporary file