diff --git a/CHANGELOG.md b/CHANGELOG.md index 8017520..ed1fabf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,141 @@ +# 2023/07/11 +## Version 0.46 +### Enhancements +* Aligning with some python better code practices. + +# 2023/07/09 +## Version 0.45 +### Fixes +* Fixed a minor bug with the certificate chain retrieval. + +# 2023/07/08 +## Version 0.44 +### Fixes +* [traceback untrusted issuer](https://github.com/TheScriptGuy/certificateChecker/issues/19). + +# 2023/06/24 +## Version 0.43 +### Enhancements +* [Socket timeout value](https://github.com/TheScriptGuy/certificateChecker/issues/17). + +# 2023/05/31 +## Version 0.42 +### Fixes +* A left over print statement from the previous build. + +# 2023/05/22 +## Version 0.41 +### Feature Additions/Enhancements +* Add the ability to ignore ssl warnings on a host by host basis. See examples here - [Example of queryFile structure](https://github.com/TheScriptGuy/certificateChecker/blob/main/README-queryFile.md) + +## Version 0.40 +### Feature Additions/Enhancements +* Adding the ability to use unsafe legacy method of connection. See examples here - [Example of queryFile structure](https://github.com/TheScriptGuy/certificateChecker/blob/main/README-queryFile.md) + +## Version 0.39 +### Fixes +* [MongoDB bugfix](https://github.com/TheScriptGuy/certificateChecker/issues/15) + +# 2023/04/15 +## Version 0.38 +### Feature additions +* [Allow environment variables usage](https://github.com/TheScriptGuy/certificateChecker/issues/13) + +## Version 0.37 +### Feature additions +* [Adding time to output when error occurs](https://github.com/TheScriptGuy/certificateChecker/issues/11) + +# 2023/03/25 +## Version 0.36 +### Feature additions +* [MongoDB enhancement](https://github.com/TheScriptGuy/certificateChecker/issues/9) + +# 2023/02/05 +## Version 0.35 +### Feature additions +* Added the ability to load the context flags from contextVariables.json file. + +# 2022/12/04 +## Version 0.34 +### Additions +* Added the ability to retry attempts if a connection fails. + * `--retryAttempts x`, where x is the number of attempts. Defaults to 1 attempt. + * `--timeBetweenRetries y` where y is the number of seconds between attempts. Defaults to 1 second. + +### Fixes +* Fixed an error when calculating statistics for a single host that didn't exist. + +# 2022/10/26 +## Version 0.33 +### Changes +* When using --mongoDB argument, the output will include the timestamp of the result being uploaded. + +## Version 0.32 +### Fixes +* Resolving [issue](https://github.com/TheScriptGuy/certificateChecker/issues/6) + +# 2022/10/23 +## Version 0.31 +### Changes +* updated dataFormatVersion to 20: + * added new fields - `lowestTemplateTime`, `lowestTemplateTimeHumanReadable`, `highestTemplateTime`, `highestTemplateTimeHumanReadable` + * added Certificate Authority Common Name count field - `commonCAIssuersCount` + * added Common Cipher Information Connection Count - `commonCipherInfoCount` +* Example new json structure (to see more, please see [here](https://github.com/TheScriptGuy/certificateChecker/blob/main/README-json.md)): +```json +"queryStatistics": +{ + "scriptStartTime": "2022-10-23T22:50:12.830321", + "scriptEndTime": "2022-10-23T22:50:14.251425", + "scriptExecutionTime": 1421.1, + "averageQueryTime": 282.82, + "averageCertificateUtilization": 38.05, + "averageTemplateTime": 22584959, + "averageTemplateTimeHumanReadable": "8 months, 18 days, 2 hours, 35 minutes, 59 seconds", + "lowestCertificateTemplateTime": 7775999, + "lowestCertificateTemplateTimeHumanReadable": "2 months, 30 days, 15 hours, 59 minutes, 59 seconds", + "highestCertificateTemplateTime": 34127999, + "highestCertificateTemplateTimeHumanReadable": "1 year, 29 days, 15 hours, 59 minutes, 59 seconds", + "commonCAIssuersCount": + { + "Apple Public EV Server ECC CA 1 - G1": 1, + "Cloudflare Inc ECC CA-3": 1, + "COMODO RSA Organization Validation Secure Server CA": 1, + "R3": 2 + }, + "commonCipherInfoCount": + { + "bits": + { + "128": 2, + "256": 3 + }, + "cipher": + { + "TLS_AES_128_GCM_SHA256": 1, + "TLS_AES_256_GCM_SHA384": 3, + "ECDHE-RSA-AES128-GCM-SHA256": 1 + }, + "version": + { + "TLSv13": 4, + "TLSv12": 1 + } + }, + "numberofTests": + { + "success": 5, + "failed": 1 + } +} +``` + +# 2022/07/22 +## Version 0.30 +### Changes +* improved MongoDB connection string handling. +* improved MongoDB error handling. + # 2022/07/17 ## Version 0.30 ### Changes diff --git a/LICENSE.md b/LICENSE.md new file mode 100644 index 0000000..f4b05e9 --- /dev/null +++ b/LICENSE.md @@ -0,0 +1,10 @@ +# LICENSE AGREEMENT + +Free for personal/non-commercial use. + +By downloading this code: +* You are responsible for validating what you run in your environment (whether it be on personal/work/other device, or in a home/corporate/public/other network) +* All liabilities, responsibilities, and actions fall onto whomever/whatever runs this code. +* No warranty provided. + +Custom software license agreements available on request. diff --git a/README-json.md b/README-json.md index fcb3634..5c4ba05 100644 --- a/README-json.md +++ b/README-json.md @@ -56,19 +56,42 @@ The resulting output is: ```json { "tenantId": "", - "deviceId": "2a44a3c9-3051-405f-b7ae-9c070d76c57d", + "deviceId": "2a4123c9-3051-405f-b7ae-9341346c57d", "deviceTag": "", - "clientHostName": "calvin", - "dataFormatVersion": 19, + "clientHostName": "PRODMON01", + "dataFormatVersion": 20, "queryStatistics": { - "scriptStartTime": "2022-07-17T22:12:54.498603", - "scriptEndTime": "2022-07-17T22:12:54.541030", - "scriptExecutionTime": 42.43, - "averageQueryTime": 42.43, - "averageCertificateUtilization": 20.76, + "scriptStartTime": "2022-10-23T22:57:55.637526", + "scriptEndTime": "2022-10-23T22:57:55.684981", + "scriptExecutionTime": 47.45, + "averageQueryTime": 47.45, + "averageCertificateUtilization": 45.58, "averageTemplateTime": 34127999, "averageTemplateTimeHumanReadable": "1 year, 29 days, 15 hours, 59 minutes, 59 seconds", + "lowestCertificateTemplateTime": 34127999, + "lowestCertificateTemplateTimeHumanReadable": "1 year, 29 days, 15 hours, 59 minutes, 59 seconds", + "highestCertificateTemplateTime": 34127999, + "highestCertificateTemplateTimeHumanReadable": "1 year, 29 days, 15 hours, 59 minutes, 59 seconds", + "commonCAIssuersCount": + { + "Apple Public EV Server ECC CA 1 - G1": 1 + }, + "commonCipherInfoCount": + { + "bits": + { + "128": 1 + }, + "cipher": + { + "TLS_AES_128_GCM_SHA256": 1 + }, + "version": + { + "TLSv13": 1 + } + }, "numberofTests": { "success": 1, @@ -80,9 +103,9 @@ The resulting output is: { "hostname": "apple.com", "port": 443, - "startTime": "2022-07-17T22:12:54.498603", - "endTime": "2022-07-17T22:12:54.541030", - "queryTime": 42.43, + "startTime": "2022-10-23T22:57:55.637526", + "endTime": "2022-10-23T22:57:55.684981", + "queryTime": 47.45, "connectionCipher": [ "TLS_AES_128_GCM_SHA256", @@ -110,8 +133,8 @@ The resulting output is: "DNS0": "apple.com" } }, - "timeLeft": "10 months, 8 days, 23 hours, 45 minutes, 42 seconds", - "percentageUtilization": 20.76, + "timeLeft": "7 months, 2 days, 23 hours, 41 seconds", + "percentageUtilization": 45.58, "certificateTemplateTime": 34127999 } ] @@ -129,19 +152,49 @@ The resulting output is (take note of the invalid host data at the end of the ce "deviceId": "2a44a3c9-3051-405f-b7ae-9c070d76c57d", "deviceTag": "", "clientHostName": "PRODMON01", - "dataFormatVersion": 19, + "dataFormatVersion": 20, "queryStatistics": { - "scriptStartTime": "2022-07-17T22:07:46.284438", - "scriptEndTime": "2022-07-17T22:07:46.652491", - "scriptExecutionTime": 368.05, - "averageQueryTime": 121.33, - "averageCertificateUtilization": 29.43, - "averageTemplateTime": 24508799, - "averageTemplateTimeHumanReadable": "9 months, 10 days, 8 hours, 59 minutes, 59 seconds", + "scriptStartTime": "2022-10-23T22:50:12.830321", + "scriptEndTime": "2022-10-23T22:50:14.251425", + "scriptExecutionTime": 1421.1, + "averageQueryTime": 282.82, + "averageCertificateUtilization": 38.05, + "averageTemplateTime": 22584959, + "averageTemplateTimeHumanReadable": "8 months, 18 days, 2 hours, 35 minutes, 59 seconds", + "lowestCertificateTemplateTime": 7775999, + "lowestCertificateTemplateTimeHumanReadable": "2 months, 30 days, 15 hours, 59 minutes, 59 seconds", + "highestCertificateTemplateTime": 34127999, + "highestCertificateTemplateTimeHumanReadable": "1 year, 29 days, 15 hours, 59 minutes, 59 seconds", + "commonCAIssuersCount": + { + "Apple Public EV Server ECC CA 1 - G1": 1, + "Cloudflare Inc ECC CA-3": 1, + "COMODO RSA Organization Validation Secure Server CA": 1, + "R3": 2 + }, + "commonCipherInfoCount": + { + "bits": + { + "128": 2, + "256": 3 + }, + "cipher": + { + "TLS_AES_128_GCM_SHA256": 1, + "TLS_AES_256_GCM_SHA384": 3, + "ECDHE-RSA-AES128-GCM-SHA256": 1 + }, + "version": + { + "TLSv13": 4, + "TLSv12": 1 + } + }, "numberofTests": { - "success": 3, + "success": 5, "failed": 1 } }, @@ -150,9 +203,9 @@ The resulting output is (take note of the invalid host data at the end of the ce { "hostname": "apple.com", "port": 443, - "startTime": "2022-07-17T22:07:46.284476", - "endTime": "2022-07-17T22:07:46.328282", - "queryTime": 43.81, + "startTime": "2022-10-23T22:50:12.830354", + "endTime": "2022-10-23T22:50:12.874737", + "queryTime": 44.38, "connectionCipher": [ "TLS_AES_128_GCM_SHA256", @@ -180,16 +233,16 @@ The resulting output is (take note of the invalid host data at the end of the ce "DNS0": "apple.com" } }, - "timeLeft": "10 months, 8 days, 23 hours, 50 minutes, 50 seconds", - "percentageUtilization": 20.76, + "timeLeft": "7 months, 2 days, 23 hours, 8 minutes, 24 seconds", + "percentageUtilization": 45.58, "certificateTemplateTime": 34127999 }, { "hostname": "news24.com", "port": 443, - "startTime": "2022-07-17T22:07:46.329542", - "endTime": "2022-07-17T22:07:46.373214", - "queryTime": 43.67, + "startTime": "2022-10-23T22:50:12.876082", + "endTime": "2022-10-23T22:50:12.946077", + "queryTime": 70.0, "connectionCipher": [ "TLS_AES_256_GCM_SHA384", @@ -218,20 +271,234 @@ The resulting output is (take note of the invalid host data at the end of the ce "DNS1": "news24.com" } }, - "timeLeft": "9 months, 13 days, 1 hour, 52 minutes, 13 seconds", - "percentageUtilization": 21.56, + "timeLeft": "6 months, 7 days, 1 hour, 9 minutes, 47 seconds", + "percentageUtilization": 48.35, "certificateTemplateTime": 31622399 }, { "hostname": "reuters.com", "port": 443, - "startTime": "2022-07-17T22:07:46.373394", - "endTime": "2022-07-17T22:07:46.649901", - "queryTime": 276.51, + "startTime": "2022-10-23T22:50:12.946226", + "endTime": "2022-10-23T22:50:13.222335", + "queryTime": 276.11, "connectionCipher": [ - "ECDHE-RSA-AES256-GCM-SHA384", + "ECDHE-RSA-AES128-GCM-SHA256", "TLSv1.2", + 128 + ], + "certificateInfo": + { + "certificateIssuer": + { + "countryName": "GB", + "stateOrProvinceName": "Greater Manchester", + "localityName": "Salford", + "organizationName": "COMODO CA Limited", + "commonName": "COMODO RSA Organization Validation Secure Server CA" + }, + "version": 3, + "serialNumber": "E0D89C311AF7835D847BB6EBD4BB4E82", + "notBefore": "Oct 14 00:00:00 2022 GMT", + "notAfter": "Oct 14 23:59:59 2023 GMT", + "caIssuers": + [ + "http://crt.comodoca.com/COMODORSAOrganizationValidationSecureServerCA.crt" + ], + "subjectAltName": + { + "DNS0": "thomsonreuters.com", + "DNS1": "*.casecenter.tr.com", + "DNS2": "*.findlaw.com", + "DNS3": "*.int.thomsonreuters.com", + "DNS4": "*.learnlive.com", + "DNS5": "*.mobile.reuters.com", + "DNS6": "*.reuters.com", + "DNS7": "*.thomson.com", + "DNS8": "*.thomsonreuters.com", + "DNS9": "*.westlaw.com", + "DNS10": "acceluslms.com", + "DNS11": "adviser.accelus.com", + "DNS12": "archbolde-update.co.uk", + "DNS13": "breakingviews.com", + "DNS14": "carswell.com", + "DNS15": "caselines.com", + "DNS16": "clb1.canadalawbook.ca", + "DNS17": "clb7.canadalawbook.ca", + "DNS18": "cpeasy.com", + "DNS19": "cvmailasia.com", + "DNS20": "deskcopy.thomsonreuters.ca", + "DNS21": "editionsyvonblais.com", + "DNS22": "fastsalestax.com", + "DNS23": "find.support.checkpoint.thomsonreuters.com", + "DNS24": "findandprint.com", + "DNS25": "findprint.com", + "DNS26": "funds.in.reuters.com", + "DNS27": "funds.uk.reuters.com", + "DNS28": "funds.us.reuters.com", + "DNS29": "highq.com", + "DNS30": "hk-lawyer.org", + "DNS31": "iblj.com", + "DNS32": "jctadmin.com", + "DNS33": "laley.com.ar", + "DNS34": "lawtel.com", + "DNS35": "legalbusinessonline.com", + "DNS36": "legaledcenter.com", + "DNS37": "livenotecentral.com", + "DNS38": "login.westlawasia.com", + "DNS39": "login.westlawindia.com", + "DNS40": "oconnors.com", + "DNS41": "onesourcelogin.com.au", + "DNS42": "onesourcelogin.eu", + "DNS43": "onesourcetax.com", + "DNS44": "ordermgr.courtexpress.westlaw.com", + "DNS45": "quickview.com", + "DNS46": "reuters.co.uk", + "DNS47": "reuters.com", + "DNS48": "reuters.com.cn", + "DNS49": "reuters.de", + "DNS50": "reuters.es", + "DNS51": "reuters.fr", + "DNS52": "reuters.it", + "DNS53": "reutersagency.com", + "DNS54": "reutersconnect.com", + "DNS55": "roundhall.ie", + "DNS56": "serengetilaw.com", + "DNS57": "services.serengetilaw.com", + "DNS58": "stockscreener.in.reuters.com", + "DNS59": "stockscreener.uk.reuters.com", + "DNS60": "stockscreener.us.reuters.com", + "DNS61": "support.riahome.com", + "DNS62": "support2.riahome.com", + "DNS63": "sweetandmaxwell.co.uk", + "DNS64": "thomson.com", + "DNS65": "thomsonreuters.ca", + "DNS66": "thomsonreuters.cn", + "DNS67": "thomsonreuters.co.jp", + "DNS68": "thomsonreuters.co.kr", + "DNS69": "thomsonreuters.co.nz", + "DNS70": "thomsonreuters.com.au", + "DNS71": "thomsonreuters.com.br", + "DNS72": "thomsonreuters.com.hk", + "DNS73": "thomsonreuters.com.my", + "DNS74": "thomsonreuters.com.pe", + "DNS75": "thomsonreuters.com.sg", + "DNS76": "thomsonreuters.es", + "DNS77": "thomsonreuters.in", + "DNS78": "thomsonreutersmexico.com", + "DNS79": "tr.com", + "DNS80": "tracker.serengetilaw.com", + "DNS81": "training.digita.thomsonreuters.com", + "DNS82": "triform.com", + "DNS83": "westfindandprint.com", + "DNS84": "westfindprint.com", + "DNS85": "westlawasia.com", + "DNS86": "westlawnextcanada.com", + "DNS87": "www.acceluslms.com", + "DNS88": "www.adviser.accelus.com", + "DNS89": "www.adviser.westlaw.com", + "DNS90": "www.analytics.hotprod.westlaw.com", + "DNS91": "www.analytics.qed.westlaw.com", + "DNS92": "www.analytics.westlaw.com", + "DNS93": "www.archbolde-update.co.uk", + "DNS94": "www.ca.practicallaw.thomsonreuters.com", + "DNS95": "www.carswell.com", + "DNS96": "www.cn.reuters.com", + "DNS97": "www.cpeasy.com", + "DNS98": "www.cvmailasia.com", + "DNS99": "www.drafting.westlaw.com", + "DNS100": "www.ediscoverypoint.thomsonreuters.com", + "DNS101": "www.editionsyvonblais.com", + "DNS102": "www.findandprint.com", + "DNS103": "www.findprint.com", + "DNS104": "www.findprint.westlaw.com", + "DNS105": "www.firmcentral.hotprod.westlaw.com", + "DNS106": "www.firmcentral.qed.westlaw.com", + "DNS107": "www.firmcentral.westlaw.com", + "DNS108": "www.forms.hotprod.westlaw.com", + "DNS109": "www.forms.qed.westlaw.com", + "DNS110": "www.forms.westlaw.com", + "DNS111": "www.iblj.com", + "DNS112": "www.laley.com.ar", + "DNS113": "www.lawtel.com", + "DNS114": "www.login.westlawindia.com", + "DNS115": "www.monitorsuite.thomsonreuters.com", + "DNS116": "www.nextcanada.hotprod.westlaw.com", + "DNS117": "www.nextcanada.qed.westlaw.com", + "DNS118": "www.nextcanada.westlaw.com", + "DNS119": "www.oconnors.com", + "DNS120": "www.practicetechnology.thomsonreuters.com", + "DNS121": "www.proview.hotprod.thomsonreuters.com", + "DNS122": "www.proview.thomsonreuters.com", + "DNS123": "www.reuters.co.uk", + "DNS124": "www.reuters.com.cn", + "DNS125": "www.reuters.de", + "DNS126": "www.reuters.es", + "DNS127": "www.reuters.fr", + "DNS128": "www.reuters.it", + "DNS129": "www.roundhall.ie", + "DNS130": "www.serengetilaw.com", + "DNS131": "www.support.riahome.com", + "DNS132": "www.tr.com", + "DNS133": "www.triform.com", + "DNS134": "www.v3.taxnetpro.com", + "DNS135": "www.westfindandprint.com", + "DNS136": "www.westfindprint.com", + "DNS137": "www.westlawnextcanada.com", + "DNS138": "www.wireless.reuters.com" + } + }, + "timeLeft": "11 months, 21 days, 1 hour, 9 minutes, 46 seconds", + "percentageUtilization": 2.72, + "certificateTemplateTime": 31622399 + }, + { + "hostname": "test.remotenode.org", + "port": 443, + "startTime": "2022-10-23T22:50:13.222993", + "endTime": "2022-10-23T22:50:13.608658", + "queryTime": 385.66, + "connectionCipher": + [ + "TLS_AES_256_GCM_SHA384", + "TLSv1.3", + 256 + ], + "certificateInfo": + { + "certificateIssuer": + { + "countryName": "US", + "organizationName": "Let's Encrypt", + "commonName": "R3" + }, + "version": 3, + "serialNumber": "03B092614FDFFC7671BFCC3F1E9B5C014F4A", + "notBefore": "Sep 21 22:12:20 2022 GMT", + "notAfter": "Dec 20 22:12:19 2022 GMT", + "caIssuers": + [ + "http://r3.i.lencr.org/" + ], + "subjectAltName": + { + "DNS0": "test.remotenode.org" + } + }, + "timeLeft": "1 month, 26 days, 23 hours, 22 minutes, 6 seconds", + "percentageUtilization": 35.58, + "certificateTemplateTime": 7775999 + }, + { + "hostname": "mail.remotenode.org", + "port": 465, + "startTime": "2022-10-23T22:50:13.609113", + "endTime": "2022-10-23T22:50:14.247040", + "queryTime": 637.93, + "connectionCipher": + [ + "TLS_AES_256_GCM_SHA384", + "TLSv1.3", 256 ], "certificateInfo": @@ -243,28 +510,28 @@ The resulting output is (take note of the invalid host data at the end of the ce "commonName": "R3" }, "version": 3, - "serialNumber": "04203F2F15F8194772481DABC1061E213EAB", - "notBefore": "Jun 6 12:54:06 2022 GMT", - "notAfter": "Sep 4 12:54:05 2022 GMT", + "serialNumber": "04C41320E31433441C6C91DC18C7F5503D70", + "notBefore": "Sep 1 17:24:09 2022 GMT", + "notAfter": "Nov 30 17:24:08 2022 GMT", "caIssuers": [ "http://r3.i.lencr.org/" ], "subjectAltName": { - "DNS0": "reuters.com" + "DNS0": "mail.remotenode.org" } }, - "timeLeft": "1 month, 17 days, 14 hours, 46 minutes, 19 seconds", - "percentageUtilization": 45.98, + "timeLeft": "1 month, 6 days, 18 hours, 33 minutes, 54 seconds", + "percentageUtilization": 58.03, "certificateTemplateTime": 7775999 }, { "hostname": "thisisaverybadhost.xyz", "port": 443, - "startTime": "2022-07-17T22:07:46.650302", - "endTime": "2022-07-17T22:07:46.652458", - "queryTime": 2.16, + "startTime": "2022-10-23T22:50:14.247608", + "endTime": "2022-10-23T22:50:14.251382", + "queryTime": 3.77, "certificateInfo": { "subject": diff --git a/README-queryFile.md b/README-queryFile.md new file mode 100644 index 0000000..22a3de0 --- /dev/null +++ b/README-queryFile.md @@ -0,0 +1,36 @@ +# queryFile structure +## Background +With "recent" upgrades in openssl to by default not accept legacy renegotiation, some websites don't like this an error out. +The queryFile structure has been updated to allow (on a host-by-host basis) a legacy renegotiation to take place. + +## Acceptable File syntax: +``` +hostname +hostname, +hostname:port +hostname:port, +hostname,[] +hostname:port,[] +``` + +In the lines where no `[` or `]` are seen, then it's treated as `None` options provided. i.e. use defaults. + +Now in the `[` and `]`, the options that are available to today are: +* `unsafe_legacy` - this allows for legacy renegotiation +* `local_untrusted_allow` - this prevents chain validation. Useful for when websites are misconfigured and presenting the full certificate chain. + +To connect to a host with an option configured, these would all be considered valid examples: +``` +apple.com,['unsafe_legacy'] +apple.com:443,['unsafe_legacy'] +``` + +Another example: +``` +apple.com,['local_untrusted_allow'] +apple.com:443,['local_untrusted_allow'] +apple.com,['unsafe_legacy','local_untrusted_allow'] +apple.com:443,['unsafe_legacy','local_untrusted_allow'] +``` + +As you can see, multiple options can be supported on each line. diff --git a/README.md b/README.md index 5b7d56b..7de810f 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Certificate Checker -Version: 0.30 +Version: 0.46 Author: TheScriptGuy @@ -26,10 +26,11 @@ python3 certChecker.py --hostname example.com --displayTimeLeft ## Help output ```bash -usage: certCheck.py [-h] [--hostname HOSTNAME] [--displayCertificate] [--displayCertificateJSON] [--displayScriptDataJSON] [--displayTimeLeft] [--queryFile QUERYFILE] [--uploadJsonData UPLOADJSONDATA] [--mongoDB] [--sendEmail] [--setTag SETTAG] - [--delTag] [--getTag] [--renewDeviceId] [--getDeviceId] [--deleteDeviceId] [--setTenantId SETTENANTID] [--getTenantId] [--delTenantId] [--createBlankConfiguration] +usage: certCheck.py [-h] [--hostname HOSTNAME] [--displayCertificate] [--displayCertificateJSON] [--displayScriptDataJSON] [--displayTimeLeft] [--queryFile QUERYFILE] [--uploadJsonData UPLOADJSONDATA] [--mongoDB] + [--sendEmail] [--retryAmount RETRYAMOUNT] [--timeBetweenRetries TIMEBETWEENRETRIES] [--contextVariables] [--environmentVariables] [--setTag SETTAG] [--delTag] [--getTag] [--renewDeviceId] + [--getDeviceId] [--deleteDeviceId] [--setTenantId SETTENANTID] [--getTenantId] [--delTenantId] [--createBlankConfiguration] -Certificate Checker v0.30 +Certificate Checker v0.46 optional arguments: -h, --help show this help message and exit @@ -46,6 +47,13 @@ optional arguments: Upload JSON data to HTTP URL via HTTP POST method. --mongoDB Upload results to MongoDB. Connection details stored in mongo.cfg --sendEmail Send an email with the results. SMTP connection details stored in mail.cfg + --retryAmount RETRYAMOUNT + Attempt to retry the connection if any error occured. Defaults to 1 attempt. + --timeBetweenRetries TIMEBETWEENRETRIES + The number of seconds between each retry attempt if the connection fails. Defaults to 1 second. + --contextVariables Read the context variables from contextVariables.json + --environmentVariables + Uses the environment values for TENANT_ID and TAG to set the runtime environment. --setTag SETTAG Set the tag for the query results. Use commas to separate multiple tags. --delTag Removes the tags from the configuration file. --getTag Get tags from the configuration file. @@ -60,6 +68,22 @@ optional arguments: Creates a blank configuration file template - myConfig.json. Overwrites any existing configuration ``` +## Environment variables +The script will by default attempt to look at a configuration file for the Tenant ID and Tags. If the environment variables are defined, then it will use that in preference to the configuration file. + +To use, first define the variables: +```bash +$ export TENANT_ID="" +$ export TAGS="" +``` + +Now that environment variables are defined, we can use the `--environmentVariables` argument: +```bash +$ python3 certCheck.py --hostname apple.com --environmentVariables --displayScriptDataJSON +``` + +[Example of queryFile structure](https://github.com/TheScriptGuy/certificateChecker/blob/main/README-queryFile.md) + [Example to send an email](https://github.com/TheScriptGuy/certificateChecker/blob/main/README-email.md) [Example to upload to mongoDB](https://github.com/TheScriptGuy/certificateChecker/blob/main/README-mongoDB.md) diff --git a/certCheck.py b/certCheck.py index 7da85d2..b2879a7 100644 --- a/certCheck.py +++ b/certCheck.py @@ -1,12 +1,14 @@ # Program: Certificate Checker # Author: Nolan Rumble -# Date: 2022/07/17 -# Version: 0.30 +# Date: 2023/07/11 +# Version: 0.46 import argparse import datetime import sys import json +import time +import os from systemInfo import systemInfo from certificate import certificateModule @@ -16,7 +18,7 @@ from data import emailTemplateBuilder from data import sendDataEmail -scriptVersion = "0.30" +scriptVersion = "0.46" # Global Variables args = None @@ -28,7 +30,9 @@ def parseArguments(): """Create argument options and parse through them to determine what to do with script.""" # Instantiate the parser - parser = argparse.ArgumentParser(description='Certificate Checker v' + scriptVersion) + parser = argparse.ArgumentParser( + description=f'Certificate Checker v{scriptVersion}' + ) # Optional arguments parser.add_argument('--hostname', default='', @@ -58,6 +62,18 @@ def parseArguments(): parser.add_argument('--sendEmail', action='store_true', help='Send an email with the results. SMTP connection details stored in mail.cfg') + parser.add_argument('--retryAmount', default=1, + help='Attempt to retry the connection if any error occured. Defaults to 1 attempt.') + + parser.add_argument('--timeBetweenRetries', default=1, + help='The number of seconds between each retry attempt if the connection fails. Defaults to 1 second.') + + parser.add_argument('--contextVariables', action='store_true', + help='Read the context variables from contextVariables.json') + + parser.add_argument('--environmentVariables', action='store_true', + help='Uses the environment values for TENANT_ID and TAG to set the runtime environment.') + parser.add_argument('--setTag', default='', help='Set the tag for the query results. Use commas to separate multiple tags.') @@ -100,9 +116,20 @@ def defineInfoArguments(o_systemInfo): o_systemInfo.createBlankConfigurationFile() sys.exit(0) + # Check to see if the environmentVariables argument is set. + if args.environmentVariables: + tenantId = os.environ.get('TENANT_ID') + tags = os.environ.get('TAGS') + if tenantId and tags: + o_systemInfo.setTag(tags, False) + o_systemInfo.setTenantId(tenantId, False) + else: + print('Environment variables TENANT_ID and TAGS are not both set.') + sys.exit(1) + # If setTag argument is set, create the new Tag. if args.setTag: - o_systemInfo.setTag(args.setTag) + o_systemInfo.setTag(args.setTag, True) print('New tag set.') sys.exit(0) @@ -134,7 +161,7 @@ def defineInfoArguments(o_systemInfo): # If setTenantId is set, add it to the configuration file. if args.setTenantId: - o_systemInfo.setTenantId(args.setTenantId) + o_systemInfo.setTenantId(args.setTenantId, True) sys.exit(0) # If getTenantId is set, retrieve the tenant Id from the configuration file. @@ -148,7 +175,7 @@ def defineInfoArguments(o_systemInfo): sys.exit(0) -def gatherData(__certResults, __scriptStartTime, __scriptEndTime): +def gatherData(__certResults, __mySystemInfo, __scriptStartTime, __scriptEndTime): """ This will collect all the data into a uniform data structure that can help with measuring results across multiple executions. @@ -167,13 +194,11 @@ def gatherData(__certResults, __scriptStartTime, __scriptEndTime): * averageTemplateTime - Average template time being used across all the tests. * queryResults - The results of all queries that were performed against the nameservers. """ - mySystemInfo = systemInfo.systemInfo() myDetails = calculateStats.calculateStats() - # Create the json script structure with all the meta data. - myData = myDetails.combineData(__certResults, mySystemInfo, __scriptStartTime, __scriptEndTime) - - return myData + return myDetails.combineData( + __certResults, __mySystemInfo, __scriptStartTime, __scriptEndTime + ) def checkArguments(__myCertificate, __jsonCertificateInfo): @@ -229,14 +254,27 @@ def processQueryFile(): scriptStartTime = datetime.datetime.utcnow() for myHostname in myCertData.loadQueriesFile(args.queryFile): + # Set contectVariables to zero + contextVariables = 0 + + # Check to see if contextVariables argument was passed. + if args.contextVariables: + contextVariables = 1 + # Define initial certificate object - o_myCertificate = certificateModule.certificateModule() + o_myCertificate = certificateModule.certificateModule(contextVariables) # For SSL performance measurement - START o_startTime = datetime.datetime.utcnow() - # Connect to the hostname from the queryFile argument and get the certificate associated with it. - myCertificate = o_myCertificate.getCertificate(myHostname["hostname"], myHostname["port"]) + # Iterate through number of retryAmount + for _ in range(int(args.retryAmount)): + # Connect to the hostname from the queryFile argument and get the certificate associated with it. + myCertificate = o_myCertificate.getCertificate(myHostname) + + if myCertificate["certificateMetaData"] is None: + # If unable to connect to host for whatever reason, pause for a second then try again. + time.sleep(int(args.timeBetweenRetries)) # For SSL performance measurement - END o_endTime = datetime.datetime.utcnow() @@ -244,13 +282,17 @@ def processQueryFile(): # Convert the certificate object into JSON format. jsonCertificateInfo = o_myCertificate.convertCertificateObject2Json(myHostname["hostname"], myHostname["port"], o_startTime, o_endTime, myCertificate) + # Append jsonCertificateInfo to jsonScriptData jsonScriptData.append(jsonCertificateInfo) + # Check to see if additional arguments were passed checkArguments(myCertificate, jsonCertificateInfo) + # Get the time the script stopped gathering data. scriptEndTime = datetime.datetime.utcnow() - myJsonScriptData = gatherData(jsonScriptData, scriptStartTime, scriptEndTime) + # Combine all the data into a dict. + myJsonScriptData = gatherData(jsonScriptData, o_myInfo, scriptStartTime, scriptEndTime) if args.displayScriptDataJSON: # Display the certificate and system JSON structure @@ -265,7 +307,8 @@ def processQueryFile(): # Define the sendDataMongoDB object sdMDB = sendDataMongoDB.sendDataMongoDB() uploadResult = sdMDB.uploadDataToMongoDB(myJsonScriptData) - print(uploadResult) + uploadTime = str(datetime.datetime.utcnow()) + print(f"{uploadTime} - {str(uploadResult)}") if args.sendEmail: # Send an email with the results. @@ -275,20 +318,24 @@ def processQueryFile(): def processHostname(): """This will attempt to connect to the hostname defined by the --hostname argument.""" # Define initial certificate object - o_myCertificate = certificateModule.certificateModule() + + contextVariables = 1 if args.contextVariables else 0 + o_myCertificate = certificateModule.certificateModule(contextVariables) + o_myCertData = certData.certData() # For SSL performance measurement - START o_startTime = datetime.datetime.utcnow() # Connect to the hostname from the --hostname argument and get the certificate associated with it. - if ":" in args.hostname: - tmpLine = args.hostname.split(':') - hostnameQuery = {"hostname": tmpLine[0], "port": int(tmpLine[1])} + hostnameQuery = o_myCertData.parse_line(args.hostname) - else: - hostnameQuery = {"hostname": args.hostname, "port": 443} - - myCertificate = o_myCertificate.getCertificate(hostnameQuery["hostname"], hostnameQuery["port"]) + # Iterate through number of retryAmount + for _ in range(int(args.retryAmount)): + # Connect to the hostname from the queryFile argument and get the certificate associated with it. + myCertificate = o_myCertificate.getCertificate(hostnameQuery) + if myCertificate["certificateMetaData"] is None: + # If unable to connect to host for whatever reason, pause for a second then try again. + time.sleep(int(args.timeBetweenRetries)) # For SSL performance measurement - END o_endTime = datetime.datetime.utcnow() @@ -297,7 +344,7 @@ def processHostname(): jsonCertificateInfo = o_myCertificate.convertCertificateObject2Json(hostnameQuery["hostname"], hostnameQuery["port"], o_startTime, o_endTime, myCertificate) # Append system data to JSON certificate structure - jsonScriptData = gatherData([jsonCertificateInfo], o_startTime, o_endTime) + jsonScriptData = gatherData([jsonCertificateInfo], o_myInfo, o_startTime, o_endTime) if args.displayCertificateJSON: # Display the certificate JSON structure @@ -325,7 +372,8 @@ def processHostname(): # Define the sendDataMongoDB object sdMDB = sendDataMongoDB.sendDataMongoDB() uploadResult = sdMDB.uploadDataToMongoDB(jsonScriptData) - print(uploadResult) + uploadTime = str(datetime.datetime.utcnow()) + print(f"{uploadTime} - {str(uploadResult)}") if args.sendEmail: # Send an email with the results. @@ -344,7 +392,7 @@ def processHostname(): # initialize jsonCertificateInfo jsonCertificateInfo = {} - jsonScriptData = {} + jsonScriptData = [] if args.queryFile: processQueryFile() diff --git a/certificate/certificateModule.py b/certificate/certificateModule.py index 1595329..001638c 100644 --- a/certificate/certificateModule.py +++ b/certificate/certificateModule.py @@ -1,6 +1,6 @@ -# Certificate Module1 -# Version: 0.11 -# Last updated: 2022-07-16 +# Certificate Module +# Version: 0.18 +# Last updated: 2023-07-08 # Author: TheScriptGuy import ssl @@ -8,6 +8,9 @@ import datetime import json import requests +import hashlib +import os +from . import getCertificateChain from dateutil.relativedelta import relativedelta @@ -15,10 +18,59 @@ class certificateModule: """certificateModule class""" - @staticmethod - def getCertificate(__hostname, __port): + def getContextVariables(self) -> dict: + """Get the variables from the contextVariables.json""" + try: + # Assume contextVariables is empty. + contextVariables = None + + # Attempt to load the contextVariables.json file. + with open('contextVariables.json') as fContextVariables: + contextVariables = json.load(fContextVariables) + + return contextVariables + except FileNotFoundError: + print('I could not find contextVariables.json') + + def getCertificate(self, __hostinfo: dict) -> dict: """Connect to the host and get the certificate.""" - __ctx = ssl.create_default_context() + + # Determine which context to create + if __hostinfo['options'] is not None and \ + "local_untrusted_allow" in __hostinfo['options']: + hostnamePortPair = f'{__hostinfo["hostname"]}:{__hostinfo["port"]}' + certificateHashFilename = hashlib.sha256( + hostnamePortPair.encode() + ).hexdigest() + ".pem" + + if not os.path.exists(certificateHashFilename): + # Try to build the chain + occ = getCertificateChain.getCertificateChain() + occ.getCertificateChain( + __hostinfo['hostname'], + __hostinfo['port'] + ) + + __ctx = ssl.create_default_context(cafile=certificateHashFilename) + else: + # Create the default context. + __ctx = ssl.create_default_context() + + # Check to see if there are any options that need to be + # passed for the connection + if __hostinfo['options'] is not None: + if "unsafe_legacy" in __hostinfo['options']: + __ctx.options |= 0x4 # OP_LEGACY_SERVER_CONNECT + if "local_untrusted_allow" in __hostinfo['options']: + __ctx.check_hostname = False + __ctx.verify_mode = ssl.CERT_OPTIONAL + + # If there are any global options that need to be set. + if self.contextVariables is not None: + # If securityLevel is set + if self.contextVariables["securityLevel"] == 1: + # Lower the default security level + __ctx.set_ciphers('DEFAULT@SECLEVEL=1') # Initialize the __hostnameData object. __hostnameData = { @@ -27,37 +79,59 @@ def getCertificate(__hostname, __port): } try: - with __ctx.wrap_socket(socket.socket(), server_hostname=__hostname) as s: - s.connect((__hostname, __port)) - __certificate = s.getpeercert() - __cipher = s.cipher() - __hostnameData["certificateMetaData"] = __certificate - __hostnameData["connectionCipher"] = __cipher + # Create a new socket. + with socket.socket() as sock: + # Set timeout value for socket to 10 seconds. + sock.settimeout(10.0) + with __ctx.wrap_socket( + sock, + server_hostname=__hostinfo['hostname'] + ) as s: + s.connect((__hostinfo['hostname'], __hostinfo['port'])) + __certificate = s.getpeercert() + __cipher = s.cipher() + __hostnameData["certificateMetaData"] = __certificate + __hostnameData["connectionCipher"] = __cipher except ssl.SSLCertVerificationError as e: - connectHost = __hostname + ":" + str(__port) - print(connectHost + ' - Certificate error - ', e.verify_message) + connectHost = ( + f"{__hostinfo['hostname']}:{__hostinfo['port']}, " + f"options: {__hostinfo['options']}" + ) + print(f'{connectHost} - Certificate error - ', e.verify_message) except socket.gaierror as e: - connectHost = __hostname + ":" + str(__port) - print(connectHost + ' - Socket error - ', e.strerror) + connectHost = ( + f"{__hostinfo['hostname']}:{__hostinfo['port']}, " + f"options: {__hostinfo['options']}" + ) + print(f'{connectHost} - Socket error - ', e.strerror) except FileNotFoundError as e: - connectHost = __hostname + ":" + str(__port) - print(connectHost + ' - File not found - ', e.strerror) + connectHost = ( + f"{__hostinfo['hostname']}:{__hostinfo['port']}, " + f"options: {__hostinfo['options']}" + ) + print(f'{connectHost} - File not found - ', e.strerror) except TimeoutError as e: - connectHost = __hostname + ":" + str(__port) - print(connectHost + ' - Timeout error - ', e.strerror) + connectHost = ( + f"{__hostinfo['hostname']}:{__hostinfo['port']}, " + f"options: {__hostinfo['options']}" + ) + print(f'{connectHost} - Timeout error - ', e.strerror) except OSError as e: - connectHost = __hostname + ":" + str(__port) - print(connectHost + ' - OSError - ', e.strerror) - + connectHost = ( + f"{__hostinfo['hostname']}:{__hostinfo['port']}, " + f"options: {__hostinfo['options']}" + ) + print(f'{connectHost} - OSError - ', e.strerror) + return __hostnameData @staticmethod - def printSubject(__certificateObject): + def printSubject(__certificateObject: dict) -> None: """Print the subject name of the certificate.""" if __certificateObject is not None: subject = dict(x[0] for x in __certificateObject['subject']) @@ -65,17 +139,16 @@ def printSubject(__certificateObject): print("Subject: ", issued_to, end='') @staticmethod - def printSubjectAltName(__certificateObject): + def printSubjectAltName(__certificateObject) -> None: """Print the Subject Alternate Name(s) of the certificate.""" - __subjectAltName = [] - - for field, value in __certificateObject['subjectAltName']: - __subjectAltName.append({field: value}) - + __subjectAltName = [ + {field: value} + for field, value in __certificateObject['subjectAltName'] + ] print("Subject Alt Name: ", __subjectAltName) @staticmethod - def printIssuer(__certificateObject): + def printIssuer(__certificateObject) -> None: """Print the Issuer of the certificate.""" if __certificateObject is not None: issuer = dict(x[0] for x in __certificateObject['issuer']) @@ -83,74 +156,72 @@ def printIssuer(__certificateObject): print("Issued by: ", issued_by) @staticmethod - def printNotBefore(__certificateObject): + def printNotBefore(__certificateObject) -> None: """Print the notBefore field of the certificate.""" if __certificateObject is not None: notBefore = __certificateObject['notBefore'] print("Certificate start date: ", notBefore) @staticmethod - def printNotAfter(__certificateObject): + def printNotAfter(__certificateObject) -> None: """Print the notAfter field of the certificate.""" if __certificateObject is not None: notAfter = __certificateObject['notAfter'] print("Certificate end date: ", notAfter) @staticmethod - def returnNotBefore(__certificateObject): + def returnNotBefore(__certificateObject) -> None: """Return the notBefore field from the certificate.""" - if __certificateObject is not None: - return __certificateObject['notBefore'] - return "" + return "" if __certificateObject is None else __certificateObject['notBefore'] @staticmethod - def returnNotAfter(__certificateObject): + def returnNotAfter(__certificateObject) -> None: """Return the notAfter field from the certificate.""" - if __certificateObject is not None: - return __certificateObject['notAfter'] - return "" + return "" if __certificateObject is None else __certificateObject['notAfter'] - def howMuchTimeLeft(self, __certificateObject): + def howMuchTimeLeft(self, __certificateObject) -> None: """Return the remaining time left on the certificate.""" - if __certificateObject is not None: - timeNow = datetime.datetime.utcnow().replace(microsecond=0) - certNotAfter = datetime.datetime.strptime(self.returnNotAfter(__certificateObject["certificateMetaData"]), self.certTimeFormat) - - __delta = relativedelta(certNotAfter, timeNow) - - myDeltaDate = { - 'years': __delta.years, - 'months': __delta.months, - 'days': __delta.days, - 'hours': __delta.hours, - 'minutes': __delta.minutes, - 'seconds': __delta.seconds, - } - timeLeft = [] + if __certificateObject is None: + return "Invalid certificate" + timeNow = datetime.datetime.utcnow().replace(microsecond=0) + certNotAfter = datetime.datetime.strptime( + self.returnNotAfter( + __certificateObject["certificateMetaData"] + ), + self.certTimeFormat + ) + + __delta = relativedelta(certNotAfter, timeNow) + + myDeltaDate = { + 'years': __delta.years, + 'months': __delta.months, + 'days': __delta.days, + 'hours': __delta.hours, + 'minutes': __delta.minutes, + 'seconds': __delta.seconds, + } + timeLeft = [] - for field in myDeltaDate: - if myDeltaDate[field] > 1: - timeLeft.append("%d %s" % (myDeltaDate[field], field)) - else: - if myDeltaDate[field] == 1: - timeLeft.append("%d %s" % (myDeltaDate[field], field[:-1])) + for field, value in myDeltaDate.items(): + if value > 1: + timeLeft.append(f"{myDeltaDate[field]} {field}") + elif myDeltaDate[field] == 1: + timeLeft.append(f"{myDeltaDate[field]} {field[:-1]}") - certResult = ', '.join(timeLeft) - else: - certResult = "Invalid certificate" - return certResult + return ', '.join(timeLeft) @staticmethod - def checkIssuer(__certificateObject): + def checkIssuer(__certificateObject) -> bool: """Check to see if issuers are trusted.""" return True @staticmethod - def checkRevocation(__certificateObject): + def checkRevocation(__certificateObject) -> bool: """Check to see if certificate hasn't been revoked.""" return True - def checkTimeValidity(self, __certificateObject): + def checkTimeValidity(self, __certificateObject) -> bool: """ Check to see if the certificate is valid: current date is after certificate start date @@ -158,66 +229,68 @@ def checkTimeValidity(self, __certificateObject): """ if __certificateObject is not None: timeNow = datetime.datetime.utcnow().replace(microsecond=0).date() - certNotAfter = datetime.datetime.strptime(self.returnNotAfter(__certificateObject), self.certTimeFormat).date() - certNotBefore = datetime.datetime.strptime(self.returnNotBefore(__certificateObject), self.certTimeFormat).date() + certNotAfter = datetime.datetime.strptime( + self.returnNotAfter(__certificateObject), + self.certTimeFormat + ).date() - # Assume time not valid - isValid = bool((certNotBefore < timeNow) and (certNotAfter > timeNow)) + certNotBefore = datetime.datetime.strptime( + self.returnNotBefore(__certificateObject), + self.certTimeFormat + ).date() - return isValid + return certNotBefore < timeNow < certNotAfter return False @staticmethod - def printOCSP(__certificateObject): + def printOCSP(__certificateObject) -> None: """Print the OCSP field of the certificate.""" if __certificateObject is not None: - __OCSPList = [] - for value in __certificateObject['OCSP']: - __OCSPList.append(value) + __OCSPList = list(__certificateObject['OCSP']) print("OCSP: ", __OCSPList) @staticmethod - def printCRLDistributionPoints(__certificateObject): + def printCRLDistributionPoints(__certificateObject) -> None: """Print the CRL distribution points of the certificate.""" if __certificateObject is not None: - __CRLList = [] if 'crlDistributionPoints' in __certificateObject: - for value in __certificateObject['crlDistributionPoints']: - __CRLList.append(value) + __CRLList = list(__certificateObject['crlDistributionPoints']) print("CRL: ", __CRLList) @staticmethod - def printCertificateSerialNumber(__certificateObject): + def printCertificateSerialNumber(__certificateObject) -> None: """Print the certificate serial number.""" if __certificateObject is not None: certificateSerialNumber = __certificateObject['serialNumber'] print("Serial Number: ", certificateSerialNumber) @staticmethod - def printCaIssuers(__certificateObject): + def printCaIssuers(__certificateObject) -> None: """Print the certificates CA issuers.""" if __certificateObject is not None: certificateCaIssuers = __certificateObject['caIssuers'] print("CA Issuers: ", certificateCaIssuers) - def printHowMuchTimeLeft(self, __certificateObject): + def printHowMuchTimeLeft(self, __certificateObject) -> None: """Print how much time is left on the certificate.""" if __certificateObject is not None: timeLeft = self.howMuchTimeLeft(__certificateObject) print("Time left: ", timeLeft) - def certificateValid(self, __certificateObject): + def certificateValid(self, __certificateObject) -> None: """ Currently not in use. Check to see if the certificate is valid (Time, Recovation, Issuer) """ if __certificateObject is not None: - if self.checkTimeValidity(__certificateObject) and self.checkRevocation(__certificateObject) and self.checkIssuer(__certificateObject): + if self.checkTimeValidity(__certificateObject) and \ + self.checkRevocation(__certificateObject) and \ + self.checkIssuer(__certificateObject): print("Certificate good!") else: print("Certificate invalid!") - def printCertInfo(self, __certificateObject): + def printCertInfo(self, __certificateObject) -> None: """Print out all the certificate properties.""" if __certificateObject is not None: self.printSubject(__certificateObject) @@ -234,90 +307,121 @@ def printCertInfo(self, __certificateObject): else: print("No certificate info to display!") - def printCertInfoJSON(self, __certificateObject): + def printCertInfoJSON(self, __certificateObject) -> None: """Print the certificate information in JSON format.""" if __certificateObject is not None: jsonCertInfoFormat = json.dumps(__certificateObject) - print(jsonCertInfoFormat) else: jsonCertInfoFormat = { "subject": {"None": "None"}, - "certificateIssuer" : {"None": "None"}, - "version" : 0, - "serialNumber" : "0", - "notBefore" : "Jan 1 00:00:00 0000 GMT", - "notAfter" : "Jan 1 00:00:00 0000 GMT", - "timeLeft" : "0 seconds", - "OCSP" : "None", - "crlDistributionPoints" : "None", - "caIssuers" : "None", - "subjectAltName" : {"None": "None"} + "certificateIssuer": {"None": "None"}, + "version": 0, + "serialNumber": "0", + "notBefore": "Jan 1 00:00:00 0000 GMT", + "notAfter": "Jan 1 00:00:00 0000 GMT", + "timeLeft": "0 seconds", + "OCSP": "None", + "crlDistributionPoints": "None", + "caIssuers": "None", + "subjectAltName": {"None": "None"} } - print(jsonCertInfoFormat) - def calculateCertificateUtilization(self, __notBefore, __notAfter): + print(jsonCertInfoFormat) + + def calculateCertificateUtilization(self, __notBefore: datetime, __notAfter: datetime) -> float: """Calculating the percentage utilization of the certificate""" # Convert __notBefore to datetime object - notBeforeTime = datetime.datetime.strptime(__notBefore, self.certTimeFormat) + notBeforeTime = datetime.datetime.strptime( + __notBefore, + self.certTimeFormat + ) + # Convert __notAfter to datetime object - notAfterTime = datetime.datetime.strptime(__notAfter, self.certTimeFormat) + notAfterTime = datetime.datetime.strptime( + __notAfter, + self.certTimeFormat + ) # Get the current time. currentTime = datetime.datetime.utcnow() - # Calculate the differences between currentTime, notAfterTime, and notBeforeTime + # Calculate the differences between + # currentTime, notAfterTime, and notBeforeTime rest = notAfterTime - currentTime total = notAfterTime - notBeforeTime - # Calculate the percentage utilization of the time available before expiry. + # Calculate the percentage utilization of the time + # available before expiry. percentageUtilization = 100 - (rest / total * 100) # Return the percentage utilization as a string formatted to 2 places. return float(f"{percentageUtilization:.2f}") - def calculateCertificateTemplateTime(self, __notBefore, __notAfter): - """Calculate the number of seconds between __notBefore and __notAfter.""" + def calculateCertificateTemplateTime(self, __notBefore: datetime, __notAfter: datetime) -> int: + """ + Calculate the number of seconds between + __notBefore and __notAfter. + """ # Convert __notBefore to datetime object - notBeforeTime = datetime.datetime.strptime(__notBefore, self.certTimeFormat) + notBeforeTime = datetime.datetime.strptime( + __notBefore, + self.certTimeFormat + ) + # Convert __notAfter to datetime object - notAfterTime = datetime.datetime.strptime(__notAfter, self.certTimeFormat) + notAfterTime = datetime.datetime.strptime( + __notAfter, + self.certTimeFormat + ) # Calcualte the difference timeDifference = notAfterTime - notBeforeTime - # Get the number of seconds from the calculation above - timeDifferenceInSeconds = int(timeDifference.total_seconds()) - - return timeDifferenceInSeconds + return int(timeDifference.total_seconds()) - def convertCertificateObject2Json(self, __hostname, __port, __startTime, __endTime, __certificateObject): + def convertCertificateObject2Json( + self, + __hostname: str, + __port: int, + __startTime: datetime, + __endTime: datetime, + __certificateObject: dict + ) -> dict: """Convert the certificate object into JSON format.""" - myJsonCertificateInfo = {} - startTime = __startTime.isoformat() endTime = __endTime.isoformat() # Calculate queryTime between __endTime and __startTime in milliseconds - queryTime = round(float((__endTime - __startTime).total_seconds() * 1000), 2) - - myJsonCertificateInfo["hostname"] = __hostname - myJsonCertificateInfo["port"] = int(__port) - myJsonCertificateInfo["startTime"] = startTime - myJsonCertificateInfo["endTime"] = endTime - myJsonCertificateInfo["queryTime"] = queryTime - + queryTime = round( + float( + ( + __endTime - __startTime + ).total_seconds() * 1000 + ), 2 + ) + + myJsonCertificateInfo = { + "hostname": __hostname, + "port": __port, + "startTime": startTime, + "endTime": endTime, + "queryTime": queryTime, + } if __certificateObject["connectionCipher"] is not None: - myJsonCertificateInfo["connectionCipher"] = __certificateObject["connectionCipher"] + myJsonCertificateInfo["connectionCipher"] = \ + __certificateObject["connectionCipher"] myJsonCertificateInfo["certificateInfo"] = {} - + if __certificateObject["certificateMetaData"] is not None: certKeys = __certificateObject.keys() # Certificate might not have subject defined. if 'subject' in certKeys: - myJsonCertificateInfo["certificateInfo"]["subject"] = dict(x[0] for x in __certificateObject["certificateMetaData"]["subject"]) + myJsonCertificateInfo["certificateInfo"]["subject"] = dict( + x[0] for x in __certificateObject["certificateMetaData"]["subject"] + ) myJsonCertificateInfo["certificateInfo"]["certificateIssuer"] = dict(x[0] for x in __certificateObject["certificateMetaData"]["issuer"]) @@ -338,23 +442,22 @@ def convertCertificateObject2Json(self, __hostname, __port, __startTime, __endTi # Initialize subjectAltName myJsonCertificateInfo["certificateInfo"]["subjectAltName"] = {} - # Keep track of how many entries there are - subjectAltNameCounter = 0 - - for field, value in __certificateObject["certificateMetaData"]["subjectAltName"]: + for subjectAltNameCounter, (field, value) in enumerate(__certificateObject["certificateMetaData"]["subjectAltName"]): myJsonCertificateInfo["certificateInfo"]["subjectAltName"].update({field + str(subjectAltNameCounter): value}) - subjectAltNameCounter += 1 - # Time left on certificate myJsonCertificateInfo["timeLeft"] = self.howMuchTimeLeft(__certificateObject) + # Get the notBefore and notAfter dates. + certBeforeDate = __certificateObject["certificateMetaData"]["notBefore"] + certAfterDate = __certificateObject["certificateMetaData"]["notAfter"] + # Percentage Utilization of certificate - myJsonCertificateInfo["percentageUtilization"] = self.calculateCertificateUtilization(__certificateObject["certificateMetaData"]["notBefore"], __certificateObject["certificateMetaData"]["notAfter"]) + myJsonCertificateInfo["percentageUtilization"] = self.calculateCertificateUtilization(certBeforeDate, certAfterDate) # Certificate template time validity # Work out the time that certificates are issued for - myJsonCertificateInfo["certificateTemplateTime"] = self.calculateCertificateTemplateTime(__certificateObject["certificateMetaData"]["notBefore"], __certificateObject["certificateMetaData"]["notAfter"]) - + myJsonCertificateInfo["certificateTemplateTime"] = self.calculateCertificateTemplateTime(certBeforeDate, certAfterDate) + # Reset number of entries subjectAltNameCounter = 0 @@ -376,7 +479,7 @@ def convertCertificateObject2Json(self, __hostname, __port, __startTime, __endTi return myJsonCertificateInfo - def uploadJsonData(self, __certificateJsonData, __httpUrl): + def uploadJsonData(self, __certificateJsonData: dict, __httpUrl: str) -> str: """ This will upload the json data to a URL via a POST method. If the verbose argument is set, it'll display what URL it's being @@ -387,11 +490,15 @@ def uploadJsonData(self, __certificateJsonData, __httpUrl): x = requests.post(__httpUrl, json=__certificateJsonData) return x.headers - def __init__(self): + def __init__(self, __contextVariables=0): """Initialize the class.""" self.initialized = True - self.moduleVersion = "0.11" + self.moduleVersion = "0.18" self.certificate = {} + if __contextVariables == 1: + self.contextVariables = self.getContextVariables() + else: + self.contextVariables = None # Certificate date/time format that is to be interpreted by datetime module. self.certTimeFormat = "%b %d %H:%M:%S %Y %Z" diff --git a/certificate/getCertificateChain.py b/certificate/getCertificateChain.py new file mode 100644 index 0000000..05382fb --- /dev/null +++ b/certificate/getCertificateChain.py @@ -0,0 +1,307 @@ +# Description: Get the certificate chain from a website. +# Author: TheScriptGuy +# Last modified: 2023-07-29 +# Version: 0.04 + +import ssl +import socket +from cryptography import x509 +from cryptography.x509.oid import ExtensionOID +from cryptography.hazmat.primitives import hashes, serialization + +import requests +import argparse +import sys +import os +import glob +import re +import hashlib + + +class getCertificateChain: + """ + In some rare occassions where a website is poorly configured. + e.g. not presenting the full certificate chain, the python's ssl fails to connect. + This class will attempt to collect the certificate chain and store it to file for + later use. + """ + @staticmethod + def loadRootCACertChain(__filename: str) -> dict: + """ + Load the Root CA Chain in a structured format. + caRootStore = { + "Root CA Name 1": "", + "Root CA Name 2": "", + ... + } + """ + previousLine = "" + currentLine = "" + + caRootStore = {} + try: + with open(__filename, "r") as f_caCert: + while True: + previousLine = currentLine + currentLine = f_caCert.readline() + + if not currentLine: + break + + if re.search("^\={5,}", currentLine): + """ + This is where the Root CA certificate file begins. + Iterate through all the lines between + -----BEGIN CERTIFICATE----- + ... + -----END CERTIFICATE----- + """ + rootCACert = "" + rootCAName = previousLine.strip() + + while True: + caCertLine = f_caCert.readline() + if caCertLine.strip() != "-----END CERTIFICATE-----": + rootCACert += caCertLine + else: + rootCACert += "-----END CERTIFICATE-----\n" + break + + caRootStore[rootCAName] = rootCACert + + print(f"Number of Root CA's loaded: {len(caRootStore)}") + + return caRootStore + + except FileNotFoundError: + print("Could not find cacert.pem file.") + sys.exit(1) + + @staticmethod + def getCertificate(__hostname: str, __port: int) -> x509.Certificate: + """Retrieves the certificate from the website.""" + try: + """ + Create the SSL context. + We will ignore any certificate warnings for this process. + """ + sslContext = ssl._create_unverified_context() + + with socket.create_connection((__hostname, __port)) as sock: + with sslContext.wrap_socket(sock, server_hostname=__hostname) as sslSocket: + # Get the certificate from the connection, convert it to PEM format. + sslCertificate = ssl.DER_cert_to_PEM_cert(sslSocket.getpeercert(True)) + + # Load the PEM formatted file. + sslCertificate = x509.load_pem_x509_certificate(sslCertificate.encode('ascii')) + + except ConnectionRefusedError: + print(f"Connection refused to {__hostname}:{__port}") + sys.exit(1) + + # Return the sslCertificate object. + return sslCertificate + + @staticmethod + def getCertificateFromUri(__uri: str) -> str: + """Gets the certificate from a URI. + By default, we're expecting to find nothing. Therefore certI = None. + If we find something, we'll update certI accordingly. + """ + certI = None + + # Attempt to get the aia from __uri + aiaRequest = requests.get(__uri) + + # If response status code is 200 + if aiaRequest.status_code == 200: + # Get the content and assign to aiaContent + aiaContent = aiaRequest.content + + # Convert the certificate into PEM format. + sslCertificate = ssl.DER_cert_to_PEM_cert(aiaContent) + + # Load the PEM formatted content using x509 module. + certI = x509.load_pem_x509_certificate(sslCertificate.encode('ascii')) + + # Return certI back to the script. + return certI + + @staticmethod + def returnCertAKI(__sslCertificate: x509.Certificate) -> x509.extensions.Extension: + """Returns the AKI of the certificate.""" + try: + certAKI = __sslCertificate.extensions.get_extension_for_oid(ExtensionOID.AUTHORITY_KEY_IDENTIFIER) + except x509.extensions.ExtensionNotFound: + certAKI = None + return certAKI + + @staticmethod + def returnCertSKI(__sslCertificate): + """Returns the SKI of the certificate.""" + return __sslCertificate.extensions.get_extension_for_oid( + ExtensionOID.SUBJECT_KEY_IDENTIFIER + ) + + @staticmethod + def returnCertAIA(__sslCertificate): + """Returns the AIA of the certificate. If not defined, then return None.""" + try: + certAIA = __sslCertificate.extensions.get_extension_for_oid(ExtensionOID.AUTHORITY_INFORMATION_ACCESS) + + except x509.extensions.ExtensionNotFound: + certAIA = None + + return certAIA + + @staticmethod + def returnCertAIAList(__sslCertificate: x509.Certificate) -> list: + """Returns a list of AIA's defined in __sslCertificate.""" + aiaUriList = [] + + # Iterate through all the extensions. + for extension in __sslCertificate.extensions: + certValue = extension.value + + # If the extension is x509.AuthorityInformationAccess) then lets get the caIssuers from the field. + if isinstance(certValue, x509.AuthorityInformationAccess): + dataAIA = list(certValue or []) + aiaUriList.extend( + item.access_location._value + for item in dataAIA + if item.access_method._name == "caIssuers" + ) + # Return the aiaUriList back to the script. + return aiaUriList + + def walkTheChain(self, __sslCertificate: x509.Certificate, __depth: int): + """ + Walk the length of the chain, fetching information from AIA + along the way until AKI == SKI (i.e. we've found the Root CA. + + This is to prevent recursive loops. Usually there are only 4 certificates. + If the maxDepth is too small (why?) adjust it at the beginning of the script. + """ + + if __depth > self.maxDepth: + return + # Retrive the AKI from the certificate. + certAKI = self.returnCertAKI(__sslCertificate) + # Retrieve the SKI from the certificate. + certSKI = self.returnCertSKI(__sslCertificate) + + # Sometimes the AKI can be none. Lets handle this accordingly. + certAKIValue = certAKI._value.key_identifier if certAKI is not None else None + # Get the value of the SKI from certSKI + certSKIValue = certSKI._value.digest + + # Sometimes the AKI can be none. Lets handle this accordingly. + if certAKIValue is not None: + aiaUriList = self.returnCertAIAList(__sslCertificate) + if aiaUriList != []: + # Iterate through the aiaUriList list. + for item in aiaUriList: + # get the certificate for the item element. + nextCert = self.getCertificateFromUri(item) + + # If the certificate is not none (great), append it to the certChain, increase the __depth and run the walkTheChain subroutine again. + if nextCert is not None: + self.certChain.append(nextCert) + __depth += 1 + self.walkTheChain(nextCert, __depth) + else: + print("Could not retrieve certificate.") + sys.exit(1) + else: + """Now we have to go on a hunt to find the root from a standard root store.""" + print("Certificate didn't have AIA...ruh roh.") + + # Load the Root CA Cert Chain. + caRootStore = self.loadRootCACertChain("cacert.pem") + + # Assume we cannot find a Root CA + rootCACN = None + + # Iterate through the caRootStore object. + for rootCA in caRootStore: + try: + rootCACertificatePEM = caRootStore[rootCA] + rootCACertificate = x509.load_pem_x509_certificate(rootCACertificatePEM.encode('ascii')) + rootCASKI = self.returnCertSKI(rootCACertificate) + rootCASKI_Value = rootCASKI._value.digest + if rootCASKI_Value == certAKIValue: + rootCACN = rootCA + print(f"Root CA Found - {rootCACN}") + self.certChain.append(rootCACertificate) + break + except x509.extensions.ExtensionNotFound: + # Apparently some Root CA's don't have a SKI? + pass + + if rootCACN is None: + print("ERROR - Root CA NOT found.") + sys.exit(1) + + @staticmethod + def sendCertificateToFile(__filename: str, __sslCertificate) -> None: + """Write the certificate in PEM format to file.""" + with open(__filename, "ab") as f_clientPublicKey: + f_clientPublicKey.write( + __sslCertificate.public_bytes( + encoding=serialization.Encoding.PEM, + ) + b'\n' + ) + + def writeChainToFile(self, __certificateChain: dict) -> None: + """Write all the elements in the chain to file.""" + myCertChain = __certificateChain + + myCertChain.pop(0) + + # Iterate through all the elements in the chain. + for certificateItem in myCertChain: + # Get the subject from the certificate. + certSubject = certificateItem.subject.rfc4514_string() + + # Generate the certificate file name + sslCertificateFilename = f'{self.certificateHash}.pem' + + # Send the certificate object to the sslCertificateFileName filename + self.sendCertificateToFile(sslCertificateFilename, certificateItem) + + def getCertificateChain(self, __hostname: str, __port: int): + """Get Certificate Chain.""" + + # Create the hash for the __hostname:__port pair + hostnamePort = f"{__hostname}:{__port}" + + self.certificateHash = hashlib.sha256(hostnamePort.encode()).hexdigest() + + # Get the website certificate object from myHostname["hostname"]:myHostname["port"] + __websiteCertificate = self.getCertificate(__hostname, __port) + + if __websiteCertificate is not None: + # Get the AIA from the __websiteCertificate object + aia = self.returnCertAIA(__websiteCertificate) + if aia is not None: + # Extract the AIA URI list from the __websiteCertificate object. + aiaUriList = self.returnCertAIAList(__websiteCertificate) + + # Append the __websiteCertificate object to the certChain list. + self.certChain.append(__websiteCertificate) + + # Now we walk the chain up until we get the Root CA. + self.walkTheChain(__websiteCertificate, 1) + + # Write the certificate chain to individual files. + self.writeChainToFile(self.certChain) + else: + print("ERROR - I could not find AIA. Possible decryption taking place upstream?") + sys.exit(1) + + def __init__(self): + """Init the getCertChain class.""" + self.classVersion = "0.04" + self.maxDepth = 4 + self.certChain = [] + self.certificateHash = "" diff --git a/data/calculateStats.py b/data/calculateStats.py index 58760e8..04a2d98 100644 --- a/data/calculateStats.py +++ b/data/calculateStats.py @@ -1,30 +1,28 @@ # Class: calculateStats # Author: Nolan Rumble -# Date: 2022/07/17 -# Version: 0.01 +# Date: 2023/07/11 +# Version: 0.06 -import argparse import datetime -import sys -import time from dateutil.relativedelta import relativedelta + class calculateStats: """This calculates statistics off the data provided.""" - + @staticmethod - def convertTimeIntoHumanReadable(__seconds): + def convertTimeIntoHumanReadable(__seconds: int) -> str: """Return the remaining time left on the certificate.""" # Get date/time since epoch based off seconds myDateTime = datetime.datetime.fromtimestamp(__seconds) - + # Create epoch time datetime object. beginDate = datetime.date(1970, 1, 1) - + # Calculate the difference between the 2 dates myDateTime and beginDate myDateTimeObject = relativedelta(myDateTime, beginDate) - + myDateTime = { 'years': myDateTimeObject.years, 'months': myDateTimeObject.months, @@ -33,38 +31,102 @@ def convertTimeIntoHumanReadable(__seconds): 'minutes': myDateTimeObject.minutes, 'seconds': myDateTimeObject.seconds, } - + timeYMDHMS = [] # Iterate through the myDateTime dict and formulate a list of the values. # If the delimeter field is 0, don't include it in final result - for field in myDateTime: - if myDateTime[field] > 1: - timeYMDHMS.append("%d %s" % (myDateTime[field], field)) - else: - if myDateTime[field] == 1: - timeYMDHMS.append("%d %s" % (myDateTime[field], field[:-1])) - myDateTimeString = ', '.join(timeYMDHMS) - - # Return the human readable form string. - return myDateTimeString - - def calculateStatistics(self, __certResults): + for field, value in myDateTime.items(): + if value > 1: + humanReadable = f"{myDateTime[field]} {field}" + timeYMDHMS.append(humanReadable) + elif myDateTime[field] == 1: + humanReadable = f"{myDateTime[field]} {field[:-1]}" + timeYMDHMS.append(humanReadable) + return ', '.join(timeYMDHMS) + + def calculateStatistics(self, __certResults: dict) -> dict: """Returns statistics based off certificate information provided.""" # Calculate the average utilization and query time across all tests. avgUtilization = float(0) avgQueryTime = float(0) avgTemplateTimeSeconds = float(0) + + lowestCertificateTemplateTime = 9999999999999 + highestCertificateTemplateTime = 0 + successfulTests = 0 failedTests = 0 + commonCAIssuers = {} + + commonCipherInfoCount = { + "bits": {}, + "cipher": {}, + "version": {} + } + + combinedStatistics = { + "numberOfTests": { + "success": 0, + "failed": 0 + }, + "averageCertificateUtilization": 0.0, + "averageQueryTime": 0.0, + "averageTemplateTimeSeconds": 0, + "averageTemplateTimeHumanReadable": 0, + "lowestCertificateTemplateTime": 0, + "lowestCertificateTemplateTimeHumanReadable": "0 seconds", + "highestCertificateTemplateTime": 0, + "highestCertificateTemplateTimeHumanReadable": "0 seconds", + "commonCAIssuersCount": commonCAIssuers, + "commonCipherInfoCount": commonCipherInfoCount + } for item in __certResults: if item["certificateInfo"]["version"] != 0: avgUtilization += item["percentageUtilization"] avgQueryTime += item["queryTime"] avgTemplateTimeSeconds += item["certificateTemplateTime"] + + # Calculate lowest certificate template time. + lowestCertificateTemplateTime = min( + lowestCertificateTemplateTime, item["certificateTemplateTime"] + ) + # Calculate highest certificate template time. + highestCertificateTemplateTime = max( + highestCertificateTemplateTime, item["certificateTemplateTime"] + ) + caIssuerCommonName = item["certificateInfo"]["certificateIssuer"]["commonName"] + + # Calculate common Certificate Authority Issuers + if caIssuerCommonName in commonCAIssuers: + commonCAIssuers[caIssuerCommonName] += 1 + else: + commonCAIssuers[caIssuerCommonName] = 1 + + # Calculate common cipher connection details + if str(item["connectionCipher"][0]) in commonCipherInfoCount["cipher"]: + commonCipherInfoCount["cipher"][str(item["connectionCipher"][0])] += 1 + else: + commonCipherInfoCount["cipher"][str(item["connectionCipher"][0])] = 1 + + connectionCipherVersion = str(item["connectionCipher"][1]).replace(".", "") + + if connectionCipherVersion in commonCipherInfoCount["version"]: + commonCipherInfoCount["version"][connectionCipherVersion] += 1 + else: + commonCipherInfoCount["version"][connectionCipherVersion] = 1 + + connectionCipherBits = str(item["connectionCipher"][2]) + if connectionCipherBits in commonCipherInfoCount["bits"]: + commonCipherInfoCount["bits"][connectionCipherBits] += 1 + else: + commonCipherInfoCount["bits"][connectionCipherBits] = 1 + + # Increment number of successful tests successfulTests += 1 else: + # Incrememt number of failed tests failedTests += 1 # Round the values to 2 decimal places. @@ -73,22 +135,29 @@ def calculateStatistics(self, __certResults): avgQueryTime = round(avgQueryTime / successfulTests, 2) avgTemplateTimeSeconds = int(round(avgTemplateTimeSeconds / successfulTests, 2)) - combinedStatistics = { - "numberOfTests": { - "success": successfulTests, - "failed": failedTests - }, - "averageCertificateUtilization": avgUtilization, - "averageQueryTime": avgQueryTime, - "averageTemplateTimeSeconds": avgTemplateTimeSeconds, - "averageTemplateTimeHumanReadable": self.convertTimeIntoHumanReadable(avgTemplateTimeSeconds) - } + combinedStatistics = { + "numberOfTests": { + "success": successfulTests, + "failed": failedTests + }, + "averageCertificateUtilization": avgUtilization, + "averageQueryTime": avgQueryTime, + "averageTemplateTimeSeconds": avgTemplateTimeSeconds, + "averageTemplateTimeHumanReadable": self.convertTimeIntoHumanReadable(avgTemplateTimeSeconds), + "lowestCertificateTemplateTime": lowestCertificateTemplateTime, + "lowestCertificateTemplateTimeHumanReadable": self.convertTimeIntoHumanReadable(lowestCertificateTemplateTime), + "highestCertificateTemplateTime": highestCertificateTemplateTime, + "highestCertificateTemplateTimeHumanReadable": self.convertTimeIntoHumanReadable(highestCertificateTemplateTime), + "commonCAIssuersCount": commonCAIssuers, + "commonCipherInfoCount": commonCipherInfoCount + } + else: + combinedStatistics["numberOfTests"]["failed"] = failedTests return combinedStatistics - def combineData(self, __certResults, __mySystemInfo, __scriptStartTime, __scriptEndTime): - """"Combines all the data into structured data.""" - + def combineData(self, __certResults: dict, __mySystemInfo: dict, __scriptStartTime: datetime, __scriptEndTime: datetime) -> dict: + """Combines all the data into structured data.""" # Convert script start/end times into string isoformat scriptStartTime = __scriptStartTime.isoformat() scriptEndTime = __scriptEndTime.isoformat() @@ -97,8 +166,7 @@ def combineData(self, __certResults, __mySystemInfo, __scriptStartTime, __script # Get all the statistics for the measurements performed statistics = self.calculateStatistics(__certResults) - # Create the json script structure with all the meta data. - myData = { + return { "tenantId": __mySystemInfo.myConfigJson["myTenantId"], "deviceId": __mySystemInfo.myConfigJson["myDeviceId"], "deviceTag": __mySystemInfo.myConfigJson["myTags"], @@ -109,19 +177,34 @@ def combineData(self, __certResults, __mySystemInfo, __scriptStartTime, __script "scriptEndTime": scriptEndTime, "scriptExecutionTime": scriptExecutionTime, "averageQueryTime": statistics["averageQueryTime"], - "averageCertificateUtilization": statistics["averageCertificateUtilization"], + "averageCertificateUtilization": statistics[ + "averageCertificateUtilization" + ], "averageTemplateTime": statistics["averageTemplateTimeSeconds"], - "averageTemplateTimeHumanReadable": statistics["averageTemplateTimeHumanReadable"], - "numberofTests": statistics["numberOfTests"] + "averageTemplateTimeHumanReadable": statistics[ + "averageTemplateTimeHumanReadable" + ], + "lowestCertificateTemplateTime": statistics[ + "lowestCertificateTemplateTime" + ], + "lowestCertificateTemplateTimeHumanReadable": statistics[ + "lowestCertificateTemplateTimeHumanReadable" + ], + "highestCertificateTemplateTime": statistics[ + "highestCertificateTemplateTime" + ], + "highestCertificateTemplateTimeHumanReadable": statistics[ + "highestCertificateTemplateTimeHumanReadable" + ], + "commonCAIssuersCount": statistics["commonCAIssuersCount"], + "commonCipherInfoCount": statistics["commonCipherInfoCount"], + "numberofTests": statistics["numberOfTests"], }, - "certResults": __certResults + "certResults": __certResults, } - return myData - def __init__(self): - """Initialize the sendDataMongoDB class.""" + """Initialize the calculateStats class.""" self.initialized = True - self.version = "0.01" - self.dataFormatVersion = 19 - + self.version = "0.05" + self.dataFormatVersion = 20 diff --git a/data/certData.py b/data/certData.py index b262945..edec82f 100644 --- a/data/certData.py +++ b/data/certData.py @@ -1,17 +1,20 @@ -# Certificate Data Handling -# Version: 0.06 +# Description: Certificate Data Handling +# Author: TheScriptGuy +# Version: 0.09 +# Last modified: 2023/07/11 import sys from os import path import requests import socket +import ast class certData: """certData class""" @staticmethod - def getFileFromURL(fileURL): + def getFileFromURL(fileURL: str) -> list: """This function will download the contents of fileURL and return a list with the contents.""" tmpData = [] try: @@ -35,7 +38,7 @@ def getFileFromURL(fileURL): print('Too many redirects while accessing the URL') tmpData = ['URL Redirects too many'] except requests.exceptions.ConnectionError: - print('Could not connect to URL - ' + fileURL + '\n') + print(f'Could not connect to URL - {fileURL}' + '\n') tmpData = ['URL connection error'] return tmpData @@ -53,7 +56,32 @@ def uploadJsonHTTP(url, jsonData): return x.headers @staticmethod - def loadQueriesFile(queriesFile): + def parse_line(line: str) -> dict: + # Default values + hostname = '' + port = 443 + options = None + + # Remove newline character if it exists + line = line.rstrip() + + # Check if options exist + if '[' in line: + line, options = line.split('[', 1) + # Remove the closing bracket and convert to a list + options = ast.literal_eval(f'[{options}') + + # Check if port number exists + if ':' in line: + hostname, port = line.split(':') + port = int(port.rstrip(',')) + else: + hostname = line.rstrip(',') + + return {"hostname": hostname, "port": port, "options": options} + + @staticmethod + def loadQueriesFile(queriesFile: str) -> list: """ This will load the queries that need to be performed against each name server. One hostname entry per line. @@ -64,33 +92,25 @@ def loadQueriesFile(queriesFile): if queriesFile.startswith('http://') or queriesFile.startswith('https://'): myQueries = certData.getFileFromURL(queriesFile) for line in myQueries: - if ":" in line: - tmpLine = line.split(':') - queries.append({"hostname": tmpLine[0], "port": int(tmpLine[1])}) - else: - queries.append({"hostname": line, "port": 443}) + hostEntry = certData.parse_line(line) + queries.append(hostEntry) - return queries - - # Check to see if if the file exists. If not, exit with error code 1. - if not path.exists(queriesFile): - print('I cannot find file ' + queriesFile) + elif ( + path.exists(queriesFile) + and not queriesFile.startswith('http://') + and not queriesFile.startswith('https://') + ): + with open(queriesFile, "r", encoding="utf-8") as f_queryFile: + queryFile = f_queryFile.readlines() + for line in queryFile: + hostEntry = certData.parse_line(line) + queries.append(hostEntry) + else: + print(f'I cannot get file {queriesFile}') sys.exit(1) - - queryFile = open(queriesFile, "r", encoding="utf-8") - - for line in queryFile: - if ":" in line: - tmpLine = line.rstrip('\n').split(':') - queries.append({"hostname": tmpLine[0], "port": int(tmpLine[1])}) - else: - queries.append({"hostname": line.rstrip('\n'), "port": 443}) - - queryFile.close() - return queries def __init__(self): """Initialize the certData class.""" self.initialized = True - self.version = "0.06" + self.version = "0.09" diff --git a/data/emailConfigurationChecker.py b/data/emailConfigurationChecker.py index 0f4fbb2..5c8ce87 100644 --- a/data/emailConfigurationChecker.py +++ b/data/emailConfigurationChecker.py @@ -17,10 +17,9 @@ class emailConfigurationChecker: def checkConfigHostname(self, __mailConfigJson): """Check to see if the hostname field is defined""" # Check to see if hostname is defined. This is a mandatory field. - if "hostname" in __mailConfigJson: - if __mailConfigJson["hostname"] == "": - print(f"hostname field is a mandatory field and must be defined in {self.mailConfigurationFile}.") - sys.exit(1) + if "hostname" in __mailConfigJson and __mailConfigJson["hostname"] == "": + print(f"hostname field is a mandatory field and must be defined in {self.mailConfigurationFile}.") + sys.exit(1) def checkConfigPort(self, __mailConfigJson): """Check what the port is configured as. If not defined, assume port 25.""" @@ -145,8 +144,7 @@ def validateConfiguration(self): """Validate configuration file. If all checks pass successfully, return the json object.""" if self.checkConfigurationValid(self.mailConfig): return self.mailConfig - else: - return None + return None def __init__(self, __mailConfigurationFile="mail.cfg"): """Initialize the email configuration checker class.""" diff --git a/data/emailTemplateBuilder.py b/data/emailTemplateBuilder.py index 225e06a..9f6c2e0 100644 --- a/data/emailTemplateBuilder.py +++ b/data/emailTemplateBuilder.py @@ -55,10 +55,7 @@ def monitoredHostsText(self, __jsonData): # Build the string from all of the components above with the correct formatting. monitoredHostsFormattedText += f"{iHostname:{filler}<{maxHostname}}{iPort:{filler}<6}{iTimeLeft:{filler}<{maxTimeLeft}}{iPercentageUtilization:{filler}<{maxPercentageUtilization}}" + "\n" - # Go through the body text message and replace the MONITOREDHOSTS field with the newly formatted hostnames and ports. - __newBodyText = bodyTextHeaders + monitoredHostsFormattedText - - return __newBodyText + return bodyTextHeaders + monitoredHostsFormattedText def monitoredHostsHtml(self, __jsonData): """Builds out the html template for monitored hosts.""" @@ -75,9 +72,12 @@ def monitoredHostsHtml(self, __jsonData): iPercentageUtilization = entry["certificateInfo"]["percentageUtilization"] monitoredHostsFormattedHtml += f"{iHostname}{iPort}{iTimeLeft}{iPercentageUtilization}\n" - __newBodyHtml = "\n" + monitoredHostsFormattedHtmlHeaders + monitoredHostsFormattedHtml + "
\n" - - return __newBodyHtml + return ( + "\n" + + monitoredHostsFormattedHtmlHeaders + + monitoredHostsFormattedHtml + + "
\n" + ) def buildEmailFromTextTemplate(self, __jsonData): """Modifies a text file template based off the submitted hosts.""" diff --git a/data/sendDataEmail.py b/data/sendDataEmail.py index e1fd2a4..0f2e3b0 100644 --- a/data/sendDataEmail.py +++ b/data/sendDataEmail.py @@ -1,11 +1,9 @@ # Send email class -# Version: 0.03 -# Last modified: 2022-06-16 +# Version: 0.04 +# Last modified: 2022-12-18 import smtplib import ssl -import sys -from os import path from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart import datetime diff --git a/data/sendDataMongoDB.py b/data/sendDataMongoDB.py index 9f00230..a7bb663 100644 --- a/data/sendDataMongoDB.py +++ b/data/sendDataMongoDB.py @@ -1,12 +1,16 @@ -# Send data to destination based on mongo.cfg file. -# Version 0.04 +# Class: sendDataMongoDB +# Last updated: 2023/05/18 +# Author: TheScriptGuy (https://github.com/TheScriptGuy) +# Version: 0.11 +# Description: Send json list to mongoDB based on configuration in mongo.cfg import pymongo from pymongo import MongoClient -import iptools import json import sys +import os from datetime import datetime +from bson.objectid import ObjectId class sendDataMongoDB: @@ -18,26 +22,106 @@ def loadConfigurationFile(__fileName="mongo.cfg"): try: with open(__fileName) as fileNameMongo: __mongoConfig = fileNameMongo.read() - __mongoConfigJson = json.loads(__mongoConfig) - return __mongoConfigJson + return json.loads(__mongoConfig) except FileNotFoundError: print(f"Cannot find file {__fileName}.") sys.exit(1) - + except json.decoder.JSONDecodeError as e: + print(f"Error with mongo.cfg config file - {e}") + sys.exit(1) except Exception as e: print(f"{e} - Error occured.") sys.exit(1) @staticmethod - def sendResults(__results, __destCollection): + def sendJsonScriptDataToFile(__fileName, __jsonScriptData): + """Send script data to __filename.""" + with open(__fileName, "a") as fileJsonScriptData: + while len(__jsonScriptData) > 0: + jsonScriptDataItem = __jsonScriptData.pop(0) + jsonScriptDataItem["queryStatistics"]["scriptStartTime"] = jsonScriptDataItem["queryStatistics"]["scriptStartTime"].strftime('%Y-%m-%dT%H:%M:%S.%f') + jsonScriptDataItem["queryStatistics"]["scriptEndTime"] = jsonScriptDataItem["queryStatistics"]["scriptEndTime"].strftime('%Y-%m-%dT%H:%M:%S.%f') + + for iResult in jsonScriptDataItem["certResults"]: + iResult["startTime"] = iResult["startTime"].strftime('%Y-%m-%dT%H:%M:%S.%f') + iResult["endTime"] = iResult["endTime"].strftime('%Y-%m-%dT%H:%M:%S.%f') + + if "_id" in jsonScriptDataItem: + jsonScriptDataItem["_id"] = str(jsonScriptDataItem["_id"]) + + fileJsonScriptData.write(json.dumps(jsonScriptDataItem) + "\n") + + @staticmethod + def getJsonScriptDataFromFile(__fileName): + """Load the contents of __filename into a list.""" + jsonLinesFile = [] + # Check to see if there's a certificateData.json file. If yes, upload its data first. + try: + with open("certificateData.json", "r") as fileJsonScriptData: + while True: + fileLine = fileJsonScriptData.readline().replace("\n", "") + if not fileLine: + break + + jsonLine = json.loads(fileLine) + jsonLine["queryStatistics"]["scriptStartTime"] = datetime.fromisoformat(jsonLine["queryStatistics"]["scriptStartTime"]) + jsonLine["queryStatistics"]["scriptEndTime"] = datetime.fromisoformat(jsonLine["queryStatistics"]["scriptEndTime"]) + + for iResult in jsonLine["certResults"]: + iResult["startTime"] = datetime.fromisoformat(iResult["startTime"]) + iResult["endTime"] = datetime.fromisoformat(iResult["endTime"]) + + if "_id" in jsonLine: + jsonLine["_id"] = ObjectId(jsonLine["_id"]) + + jsonLinesFile.append(jsonLine) + + except FileNotFoundError: + # File not found error - just ignore. + pass + + return jsonLinesFile + + def sendResults(self, __results, __destCollection): """upload the __results to __destCollection mongodb object.""" try: - __uploadResult = __destCollection.insert_one(__results) + # First check to see if we need to attempt to upload previous data that was not uploaded. + previousJsonScriptData = self.getJsonScriptDataFromFile("certificateData.json") + previousUploadResult = [] + while len(previousJsonScriptData) > 0: + jsonScriptDataItem = previousJsonScriptData.pop(0) + previousUploadResultItem = __destCollection.insert_one(jsonScriptDataItem) + previousUploadResult.append(previousUploadResultItem) + + if os.path.isfile("certificateData.json"): + os.remove("certificateData.json") + + if len(previousJsonScriptData) > 0: + # Didn't finish uploading all the data. Save it to file. + self.sendJsonScriptDataToFile("certificateData.json", previousJsonScriptData) + previousJsonScriptData = [] + + __mongoResult = __destCollection.insert_one(__results) + __uploadResult = [__mongoResult] except pymongo.errors.ServerSelectionTimeoutError: - print("Server connection timeout error when uploading data.") + # Get time of error + errTime = str(datetime.utcnow()) + print(f"{errTime} - Server connection timeout error when uploading data. Saving to certificateData.json") + + # Save test data to file. + self.sendJsonScriptDataToFile("certificateData.json", [__results]) + sys.exit(1) + + except pymongo.errors.OperationFailure as e: + # Get time of error + errTime = str(datetime.utcnow()) + print(f"{errTime} - Mongo operation error - {e}. Saving to certificateData.json") + + # Save test data to file. + self.sendJsonScriptDataToFile("certificateData.json", [__results]) sys.exit(1) - return __uploadResult + return previousUploadResult + __uploadResult @staticmethod def connectionString(__destination): @@ -63,24 +147,34 @@ def connectionString(__destination): # Builds the login credentials to be used. if __mongoUsername == "": __mongoLoginCredentials = "" + elif __mongoPassword == "": + __mongoLoginCredentials = __mongoUsername else: - if __mongoPassword == "": - __mongoLoginCredentials = __mongoUsername - else: - __mongoLoginCredentials = f"{__mongoUsername}:{__mongoPassword}" + __mongoLoginCredentials = f"{__mongoUsername}:{__mongoPassword}" # Check to see if __mongoUri is an IP address or not. - if iptools.ipv4.validate_ip(__mongoUri): - __srv = "" + if "cluster" in __destination and __destination["cluster"] is True: + __srv = "+srv" else: - #__srv = "+srv" # automatically enables TLS __srv = "" - if __mongoLoginCredentials == "": - __mongoConnectionString = f"mongodb{__srv}://{__mongoUri}/" + # Check to see if TLS is defined for secure connection. + if "tls" in __destination and __destination["tls"] is True: + __tls = "&tls=true" + else: + __tls = "" + + # Get the collection name. If it's empty, use the default. + if "collectionName" in __destination and __destination["collectionName"] != "": + __collectionName = __destination["collectionName"] else: - __mongoConnectionString = f"mongodb{__srv}://{__mongoLoginCredentials}@{__mongoUri}/" - return __mongoConnectionString + __collectionName = "certdataGlobal" + + return ( + f"mongodb{__srv}://{__mongoUri}/{__tls}" + if __mongoLoginCredentials == "" + else f"mongodb{__srv}://{__mongoLoginCredentials}@{__mongoUri}/{__collectionName}{__tls}" + ) def createDB(self, __destination): """create a destination database to upload the data to.""" @@ -94,8 +188,8 @@ def createDB(self, __destination): try: __mongoClient = MongoClient(__mongoConnectionString) - except pymongo.errors.ConfigurationError: - print("mongo.cfg configuration error.") + except pymongo.errors.ConfigurationError as e: + print(f"mongo.cfg configuration error. {e}") sys.exit(1) except pymongo.errors.ServerSelectionTimeoutError: print("Server connection timeout error.") @@ -105,7 +199,6 @@ def createDB(self, __destination): def createCollection(self, __mongoConnection, __mongoConfiguration): """create a collection within the DB.""" - # First check to see see if collectionName is defined in mongo.cfg if "collectionName" in __mongoConfiguration: # Retrieve the collectionName value from the dict __mongoConfiguration @@ -129,21 +222,16 @@ def uploadDataToMongoDB(self, __jsonScriptData): # Convert the startTime and endTime fields info ISODate format. jsonScriptData = __jsonScriptData - - jsonScriptData["scriptStartTime"] = datetime.fromisoformat(jsonScriptData["scriptStartTime"]) - jsonScriptData["scriptEndTime"] = datetime.fromisoformat(jsonScriptData["scriptEndTime"]) + jsonScriptData["queryStatistics"]["scriptStartTime"] = datetime.fromisoformat(jsonScriptData["queryStatistics"]["scriptStartTime"]) + jsonScriptData["queryStatistics"]["scriptEndTime"] = datetime.fromisoformat(jsonScriptData["queryStatistics"]["scriptEndTime"]) for iResult in jsonScriptData["certResults"]: iResult["startTime"] = datetime.fromisoformat(iResult["startTime"]) iResult["endTime"] = datetime.fromisoformat(iResult["endTime"]) - # Upload the results to the MongoDB - # It's only at this point that the database/collection gets created - # (if this is the first entry to be uploaded) - uploadResult = self.sendResults(__jsonScriptData, collection) - return uploadResult + return self.sendResults(__jsonScriptData, collection) def __init__(self): """Initialize the sendDataMongoDB class.""" self.initialized = True - self.version = "0.05" + self.version = "0.11" diff --git a/systemInfo/systemInfo.py b/systemInfo/systemInfo.py index b7234b9..30df5c4 100644 --- a/systemInfo/systemInfo.py +++ b/systemInfo/systemInfo.py @@ -1,4 +1,7 @@ +# Version: 0.05 +# Date: 2023/04/15 # systemInfo class + import socket from os import path import json @@ -28,23 +31,17 @@ def updateMyConfig(self): def getDeviceId(self): """Get the uuid.""" - __myDeviceId = "" - if "myDeviceId" in self.myConfigJson: - __myDeviceId = self.myConfigJson["myDeviceId"] - return __myDeviceId + return ( + self.myConfigJson["myDeviceId"] + if "myDeviceId" in self.myConfigJson + else "" + ) def getTag(self): """Get the tag(s).""" - __myTags = [] - - # First check to see if the myTags element is in the myConfigJson variable. - if "myTags" in self.myConfigJson: - __myTags = self.myConfigJson["myTags"] - - # Return the value of __myTags - return __myTags + return self.myConfigJson["myTags"] if "myTags" in self.myConfigJson else [] - def setTag(self, __tagName): + def setTag(self, __tagName, __writeConfig): """Set the tag for data aggregation purposes.""" # Separate out all the tags using commas __newTagName = __tagName.rstrip().split(',') @@ -55,8 +52,9 @@ def setTag(self, __tagName): # Update the class's myConfigJson variable. self.myConfigJson["myTags"] = __newTagName - # Update the configuration file. - self.updateMyConfig() + # If __writeConfig is true, then write the configuration file. + if __writeConfig: + self.updateMyConfig() def deleteTag(self): """Delete the tag.""" @@ -72,14 +70,17 @@ def getTenantId(self): """Returns the tenant Id.""" return self.myConfigJson["myTenantId"] - def setTenantId(self, __myTenantId): + def setTenantId(self, __myTenantId, __writeConfig): """Sets the tenant Id for the script.""" self.myConfigJson["myTenantId"] = __myTenantId - self.updateMyConfig() + + # If __writeConfig is true, then write it to file. + if __writeConfig: + self.updateMyConfig() def deleteTenantId(self): """Deletes the tenant Id and updates configuration file.""" - self.setTenantId("") + self.setTenantId("", True) @staticmethod def checkMyTenantId(__myConfigJson): @@ -88,11 +89,7 @@ def checkMyTenantId(__myConfigJson): Returns True if it is. Returns False if it is not """ - result = False - if "myTenantId" in __myConfigJson: - if __myConfigJson["myTenantId"] != "": - return True - return result + return "myTenantId" in __myConfigJson and __myConfigJson["myTenantId"] != "" @staticmethod def checkMyTags(__myConfigJson): @@ -101,8 +98,7 @@ def checkMyTags(__myConfigJson): Returns True if it is. Returns False if it is not. """ - result = bool("myTags" in __myConfigJson) - return result + return "myTags" in __myConfigJson @staticmethod def checkMyDeviceId(__myConfigJson): @@ -111,11 +107,7 @@ def checkMyDeviceId(__myConfigJson): Returns True if it is. Returns False if is not. """ - result = False - if "myDeviceId" in __myConfigJson: - if __myConfigJson["myDeviceId"] != "": - result = True - return result + return "myDeviceId" in __myConfigJson and __myConfigJson["myDeviceId"] != "" def checkIfTenantIdExists(self, __myConfigJson): """Check to see if the Tenant Id is defined.""" @@ -169,7 +161,7 @@ def refreshConfigFile(self): def __init__(self, __myConfigFile="myConfig.json"): """Initialize the class.""" # Define the class version. - self.classVersion = "0.04" + self.classVersion = "0.05" self.myConfigJson = {}