From 4f92b01ce4e43fe61044040fee97190e1e116576 Mon Sep 17 00:00:00 2001 From: Johann POLEWCZYK Date: Mon, 25 May 2026 23:02:46 +0200 Subject: [PATCH] Update check_in_media functions and data_views --- aleapp.py | 45 ++- aleappGUI.py | 12 +- scripts/artifact_report.py | 4 +- scripts/artifacts/DuckDuckGo.py | 6 +- scripts/artifacts/Grok.py | 4 - scripts/artifacts/OrnetBrowser.py | 6 +- scripts/artifacts/SamsungTrash.py | 2 +- scripts/artifacts/TorBrowser.py | 8 +- scripts/artifacts/ZangiChats.py | 2 +- scripts/artifacts/appicons.py | 6 +- scripts/artifacts/googleVoice.py | 8 +- scripts/context.py | 339 ++++++++++++++++++ scripts/ilapfuncs.py | 408 ++++++++++++++------- scripts/lavafuncs.py | 575 ++++++++++++++++++++++++------ scripts/report.py | 12 +- scripts/version_info.py | 3 +- 16 files changed, 1144 insertions(+), 296 deletions(-) create mode 100644 scripts/context.py diff --git a/aleapp.py b/aleapp.py index d3d043cf..607c162b 100755 --- a/aleapp.py +++ b/aleapp.py @@ -11,9 +11,10 @@ from scripts.search_files import * from scripts.ilapfuncs import * -from scripts.version_info import aleapp_version +from scripts.version_info import leapp_version from time import process_time, gmtime, strftime, perf_counter from scripts.lavafuncs import * +from scripts.context import Context def validate_args(args): if args.artifact_paths or args.create_profile_casedata: @@ -287,14 +288,15 @@ def main(): if output_path[1] == ':': output_path = '\\\\?\\' + output_path.replace('/', '\\') out_params = OutputParameters(output_path, custom_output_folder) + Context.set_output_params(out_params) selected_plugins = plugins_parsed_first + selected_plugins - initialize_lava(input_path, out_params.report_folder_base, extracttype) + initialize_lava(input_path, out_params.output_folder_base, extracttype) crunch_artifacts(selected_plugins, extracttype, input_path, out_params, wrap_text, loader, casedata, profile_filename) - lava_finalize_output(out_params.report_folder_base) + lava_finalize_output(out_params.output_folder_base) def crunch_artifacts( plugins: typing.Sequence[plugin_loader.PluginSpec], extracttype, input_path, out_params, wrap_text, @@ -305,7 +307,7 @@ def crunch_artifacts( logfunc('Processing started. Please wait. This may take a few minutes...') logfunc('\n--------------------------------------------------------------------------------------') - logfunc(f'ALEAPP v{aleapp_version}: ALEAPP Logs, Events, and Protobuf Parser') + logfunc(f'ALEAPP v{leapp_version}: ALEAPP Logs, Events, and Protobuf Parser') logfunc('Objective: Triage Android Full System Extractions.') logfunc('By: Alexis Brignoni | @AlexisBrignoni | abrignoni.com') logfunc('By: Yogesh Khatri | @SwiftForensics | swiftforensics.com\n') @@ -341,24 +343,30 @@ def crunch_artifacts( logfunc(f'File/Directory selected: {input_path}') logfunc('\n--------------------------------------------------------------------------------------') - log = open(os.path.join(out_params.report_folder_base, 'Script Logs', 'ProcessedFilesLog.html'), 'w+', encoding='utf8') + log = open(os.path.join(out_params.output_folder_base, '_HTML', '_Script_Logs', 'ProcessedFilesLog.html'), 'w+', encoding='utf8') log.write(f'Extraction/Path selected: {input_path}

') parsed_modules = 0 + artifact_search_pattern_id = 0 + file_path_ids = set() + # Search for the files per the arguments - for plugin in plugins: + for plugin_number, plugin in enumerate(plugins, start=1): logfunc() - logfunc('{} [{}] artifact started'.format(plugin.name, plugin.module_name)) + logfunc('[{}/{}] {} [{}] artifact started'.format(plugin_number, len(plugins), + plugin.name, plugin.module_name)) if isinstance(plugin.search, list) or isinstance(plugin.search, tuple): search_regexes = plugin.search else: search_regexes = [plugin.search] - parsed_modules += 1 - GuiWindow.SetProgressBar(parsed_modules, len(plugins)) files_found = [] log.write(f'For {plugin.name} module') for artifact_search_regex in search_regexes: + artifact_search_pattern_id += 1 + lava_insert_sqlite_artifact_search_pattern( + artifact_search_pattern_id, plugin.module_name, plugin.name, artifact_search_regex) + pattern_already_searched = artifact_search_regex in seeker.searched found = seeker.search(artifact_search_regex) if not found: log.write(f'') @@ -368,10 +376,16 @@ def crunch_artifacts( if pathh.startswith('\\\\?\\'): pathh = pathh[4:] log.write(f'') + if seeker.file_infos.get(pathh): + file_path_id = id(seeker.file_infos.get(pathh)) + if not pattern_already_searched and file_path_id not in file_path_ids: + lava_insert_sqlite_file_path(file_path_id,seeker.file_infos.get(pathh).source_path) + file_path_ids.add(file_path_id) + lava_insert_sqlite_artifact_link_pattern_to_file(artifact_search_pattern_id, file_path_id) log.write(f'') files_found.extend(found) if files_found: - category_folder = os.path.join(out_params.report_folder_base, '_HTML', plugin.category) + category_folder = os.path.join(out_params.output_folder_base, '_HTML', plugin.category) if not os.path.exists(category_folder): try: os.makedirs(category_folder) @@ -389,6 +403,9 @@ def crunch_artifacts( else: logfunc(f"No file found") logfunc('{} [{}] artifact completed'.format(plugin.name, plugin.module_name)) + parsed_modules += 1 + GuiWindow.SetProgressBar(parsed_modules, len(plugins)) + log.flush() log.close() write_device_info() @@ -407,15 +424,15 @@ def crunch_artifacts( logfunc('Report generation started.') # remove the \\?\ prefix we added to input and output paths, so it does not reflect in report if is_platform_windows(): - if out_params.report_folder_base.startswith('\\\\?\\'): - out_params.report_folder_base = out_params.report_folder_base[4:] + if out_params.output_folder_base.startswith('\\\\?\\'): + out_params.output_folder_base = out_params.output_folder_base[4:] if input_path.startswith('\\\\?\\'): input_path = input_path[4:] - report.generate_report(out_params.report_folder_base, run_time_secs, run_time_HMS, extracttype, input_path, casedata, profile_filename, icons) + report.generate_report(out_params.output_folder_base, run_time_secs, run_time_HMS, extracttype, input_path, casedata, profile_filename, icons) logfunc('Report generation Completed.') logfunc('') - logfunc(f'Report location: {out_params.report_folder_base}') + logfunc(f'Report location: {out_params.output_folder_base}') return True diff --git a/aleappGUI.py b/aleappGUI.py index 1017e60b..77f76b5c 100755 --- a/aleappGUI.py +++ b/aleappGUI.py @@ -9,10 +9,11 @@ from PIL import Image, ImageTk from tkinter import ttk, filedialog as tk_filedialog, messagebox as tk_msgbox -from scripts.version_info import aleapp_version +from scripts.version_info import leapp_version from scripts.search_files import * from scripts.modules_to_exclude import modules_to_exclude from scripts.lavafuncs import * +from scripts.context import Context def pickModules(): @@ -206,6 +207,7 @@ def process(casedata): progress_bar.config(maximum=len(selected_modules)) casedata = {key: value.get() for key, value in casedata.items()} out_params = OutputParameters(output_folder) + Context.set_output_params(out_params) wrap_text = True bottom_frame.pack_forget() @@ -215,16 +217,16 @@ def process(casedata): logtext_frame.pack(padx=8, pady=4, expand=True, fill='both') progress_bar_frame.pack(padx=2, pady=2, ipady=2, fill='x') - initialize_lava(input_path, out_params.report_folder_base, extracttype) + initialize_lava(input_path, out_params.output_folder_base, extracttype) crunch_successful = aleapp.crunch_artifacts( selected_modules, extracttype, input_path, out_params, wrap_text, loader, casedata, profile_filename) - lava_finalize_output(out_params.report_folder_base) + lava_finalize_output(out_params.output_folder_base) if crunch_successful: - report_path = os.path.join(out_params.report_folder_base, 'index.html') + report_path = os.path.join(out_params.output_folder_base, 'index.html') if report_path.startswith('\\\\?\\'): # windows report_path = report_path[4:] if report_path.startswith('\\\\'): # UNC path @@ -452,7 +454,7 @@ def add_agency_logo(): ## Main window properties main_window.minsize(890, 690) -main_window.title(f'ALEAPP version {aleapp_version}') +main_window.title(f'ALEAPP version {leapp_version}') main_window.configure(bg=theme_bgcolor) logo_icon = tk.PhotoImage(file=icon) main_window.iconphoto(True, logo_icon) diff --git a/scripts/artifact_report.py b/scripts/artifact_report.py index b1f18d90..027ce89a 100755 --- a/scripts/artifact_report.py +++ b/scripts/artifact_report.py @@ -2,7 +2,7 @@ import os import sys from scripts.html_parts import * -from scripts.version_info import aleapp_version +from scripts.version_info import leapp_version class ArtifactHtmlReport: @@ -21,7 +21,7 @@ def start_artifact_report(self, report_folder, artifact_file_name, artifact_desc '''Creates the report HTML file and writes the artifact name as a heading''' self.report_file = open(os.path.join(report_folder, f'{artifact_file_name}.temphtml'), 'w', encoding='utf8') self.report_file.write(page_header.format(f'ALEAPP - {self.artifact_name} report')) - self.report_file.write(body_start.format(f'ALEAPP {aleapp_version}')) + self.report_file.write(body_start.format(f'ALEAPP {leapp_version}')) self.report_file.write(body_sidebar_setup) self.report_file.write(body_sidebar_dynamic_data_placeholder) # placeholder for sidebar data self.report_file.write(body_sidebar_trailer) diff --git a/scripts/artifacts/DuckDuckGo.py b/scripts/artifacts/DuckDuckGo.py index 6b5f9bb8..27ba0a31 100644 --- a/scripts/artifacts/DuckDuckGo.py +++ b/scripts/artifacts/DuckDuckGo.py @@ -343,10 +343,6 @@ def duckduckgo_opentabs(files_found, report_folder, seeker, wrap_text): thumb_path = thumb_lookup.get(cached_filename) if thumb_path: tab_thumbnail_media = check_in_media( - artifact_info, - report_folder, - seeker, - files_found, thumb_path, cached_filename ) @@ -474,7 +470,7 @@ def duckduckgo_thumbnails(files_found, report_folder, seeker, wrap_text): filepath = str(media_path.parents[1]) timestamp = (datetime.datetime.utcfromtimestamp(utctime/1000).strftime('%Y-%m-%d %H:%M:%S')) - media_item = check_in_media(artifact_info, report_folder, seeker, files_found, file_found, filename) + media_item = check_in_media(file_found, filename) if media_item: tab_status = 'Open' if filename in open_preview_files else 'Closed' diff --git a/scripts/artifacts/Grok.py b/scripts/artifacts/Grok.py index ea680253..070ca526 100644 --- a/scripts/artifacts/Grok.py +++ b/scripts/artifacts/Grok.py @@ -180,10 +180,6 @@ def grok_generatedvideos(files_found, report_folder, seeker, wrap_text): missing_flag = "Present" media_item = check_in_media( - artifact_info, - report_folder, - seeker, - files_found, file_found, filename ) diff --git a/scripts/artifacts/OrnetBrowser.py b/scripts/artifacts/OrnetBrowser.py index 9458bb42..cca385bd 100644 --- a/scripts/artifacts/OrnetBrowser.py +++ b/scripts/artifacts/OrnetBrowser.py @@ -326,10 +326,6 @@ def is_sqlite_db(path): thumb_path = thumb_lookup.get(cached_filename) if thumb_path: tab_thumbnail_media = check_in_media( - artifact_info, - report_folder, - seeker, - files_found, thumb_path, cached_filename ) @@ -451,7 +447,7 @@ def ornetbrowser_thumbnails(files_found, report_folder, seeker, wrap_text): filepath = str(media_path.parents[1]) timestamp = (datetime.datetime.utcfromtimestamp(utctime/1000).strftime('%Y-%m-%d %H:%M:%S')) - media_item = check_in_media(artifact_info, report_folder, seeker, files_found, file_found, filename) + media_item = check_in_media(file_found, filename) if media_item: data_list.append((timestamp, media_item, filename, str(file_found))) diff --git a/scripts/artifacts/SamsungTrash.py b/scripts/artifacts/SamsungTrash.py index da1def56..cca429a9 100644 --- a/scripts/artifacts/SamsungTrash.py +++ b/scripts/artifacts/SamsungTrash.py @@ -71,7 +71,7 @@ def samsungTrash(files_found, report_folder, seeker, _wrap_text): if matched_media_path: media_item = check_in_media( - artifact_info, report_folder, seeker, files_found + [matched_media_path], matched_media_path, Path(matched_media_path).name + matched_media_path, Path(matched_media_path).name ) data_list.append(( diff --git a/scripts/artifacts/TorBrowser.py b/scripts/artifacts/TorBrowser.py index 9cded3ea..02125867 100644 --- a/scripts/artifacts/TorBrowser.py +++ b/scripts/artifacts/TorBrowser.py @@ -67,13 +67,7 @@ def torbrowser_thumbnails(files_found, report_folder, seeker, wrap_text): modified_ts = os.path.getmtime(file_found) modifiedtime = datetime.datetime.utcfromtimestamp(int(modified_ts)).strftime('%Y-%m-%d %H:%M:%S') - media_item = check_in_media( - artifact_info, - report_folder, - seeker, - files_found, - filename - ) + media_item = check_in_media(filename) if media_item: data_list.append((modifiedtime, media_item, filename, location)) diff --git a/scripts/artifacts/ZangiChats.py b/scripts/artifacts/ZangiChats.py index 1b57e2e3..0938b5a6 100644 --- a/scripts/artifacts/ZangiChats.py +++ b/scripts/artifacts/ZangiChats.py @@ -109,7 +109,7 @@ def zangichats(files_found, _report_folder, _seeker, _wrap_text): media_path = f"files/zangi/Zangi Files/{msgId}.*" try: attach_file_name = Path(media_path).name - attach_file = check_in_media(artifact_info, _report_folder, _seeker, files_found, media_path, attach_file_name) + attach_file = check_in_media(media_path, attach_file_name) except TypeError: attach_file = "" else: diff --git a/scripts/artifacts/appicons.py b/scripts/artifacts/appicons.py index 1a492d0f..76d84c49 100755 --- a/scripts/artifacts/appicons.py +++ b/scripts/artifacts/appicons.py @@ -100,12 +100,12 @@ def appIcons(files_found, report_folder, seeker, wrap_text): other_icons = [] if app.icon: # main_icon = check_in_embedded_media(artifact_info, report_folder, seeker, source_path, app.icon[1], app.icon[0], app.icon[2]) - main_icon = check_in_embedded_media(artifact_info, report_folder, seeker, source_path, app.icon[1], app.icon[0]) + main_icon = check_in_embedded_media(source_path, app.icon[1], app.icon[0]) for k, v in app.icons.items(): if v[1]: # sometimes icon is NULL in db # other_icon = check_in_embedded_media(artifact_info, report_folder, seeker, source_path, v[1], v[0], v[2]) - other_icon = check_in_embedded_media(artifact_info, report_folder, seeker, source_path, v[1], v[0]) + other_icon = check_in_embedded_media(source_path, v[1], v[0]) other_icons.append(other_icon) - data_list.append((escape(app.name), escape(app.package), main_icon, other_icons )) + data_list.append((escape(app.name), escape(app.package), main_icon, other_icons)) return data_headers, data_list, source_path diff --git a/scripts/artifacts/googleVoice.py b/scripts/artifacts/googleVoice.py index 82c97c76..bbf35110 100644 --- a/scripts/artifacts/googleVoice.py +++ b/scripts/artifacts/googleVoice.py @@ -251,7 +251,7 @@ def googlevoice_calls(files_found, report_folder, seeker, wrap_text): # get the audio file for audio_file in files_found: if "audio" in audio_file and message_id in audio_file: - recording = check_in_media(artifact_info, report_folder, seeker, files_found, audio_file) + recording = check_in_media(audio_file) break data_list.append((timestamp,account_number,direction,from_num,to_num,call_status,voicemail,duration,recording)) @@ -364,7 +364,7 @@ def googlevoice_voicemails(files_found, report_folder, seeker, wrap_text): # get the voicemail audio file for audio_file in files_found: if "audio" in audio_file and message_id in audio_file: - audio = check_in_media(artifact_info, report_folder, seeker, files_found, audio_file) + audio = check_in_media(audio_file) break data_list.append((timestamp,account_number,from_num,to_num,duration,read_status,transcript,audio)) @@ -469,7 +469,7 @@ def googlevoice_messages(files_found, report_folder, seeker, wrap_text): # image file resides in Photo MMS images folder # filename: message_id + "-14" + extension if "Photo MMS images" in image and message_id in image and "-14" in image: - thumb = check_in_media(artifact_info, report_folder, seeker, files_found, image) + thumb = check_in_media(image) data_list.append((timestamp,account_number,conversation_id,direction,from_num,to_num,read_status,message_content,thumb)) break @@ -543,7 +543,7 @@ def googlevoice_messages(files_found, report_folder, seeker, wrap_text): # image file resides in Photo MMS images folder # filename: message_id + "-14" + extension if "Photo MMS images" in image and message_id in image and "-14" in image: - thumb = check_in_media(artifact_info, report_folder, seeker, files_found, image) + thumb = check_in_media(image) data_list.append((timestamp,account_number,conversation_id,direction,from_num,to_nums,read_status,message_content,thumb)) break diff --git a/scripts/context.py b/scripts/context.py new file mode 100644 index 00000000..a1899ee5 --- /dev/null +++ b/scripts/context.py @@ -0,0 +1,339 @@ +"""Context class""" + +from os.path import basename +from pathlib import Path + + +class Context: + """ + Context class provides a static context for managing and accessing global + state and configuration used during artifact processing in the LEAPPs + framework. It stores information such as report folder, artifact details, + files found, device IDs, and OS build mappings, and provides utility + methods for retrieving and manipulating this data. + """ + + _output_params = None + _report_folder = None + _seeker = None + _artifact_info = None + _module_name = None + _module_file_path = None + _artifact_name = None + _files_found = [] + _filename_lookup_map = {} + + @staticmethod + def set_output_params(output_params): + """ + Sets the OutputParameters instance in the Context. This should only be + called once at the start of a run. + + Args: + output_params: The initialized OutputParameters object. + """ + Context._output_params = output_params + + @staticmethod + def set_report_folder(report_folder): + """ + Sets the report folder path in the Context. + + Args: + report_folder (str): The path to the folder where reports will be + stored. + """ + + Context._report_folder = report_folder + + @staticmethod + def set_seeker(seeker): + """ + Sets the seeker object in the Context class. + + Args: + seeker: The seeker object to be set as the current context seeker. + """ + + Context._seeker = seeker + + @staticmethod + def set_artifact_info(artifact_info): + """ + Sets the artifact information in the Context. + + Args: + artifact_info: The artifact information to be stored. + """ + + Context._artifact_info = artifact_info + + @staticmethod + def set_module_name(module_name): + """ + Sets the module name in the Context class. + + Args: + module_name (str): The name of the module to set. + """ + + Context._module_name = module_name + + @staticmethod + def set_module_file_path(module_file_path): + """ + Sets the file path for the current module in the Context. + + Args: + module_file_path (str): The file path to be set for the module. + """ + + Context._module_file_path = module_file_path + + @staticmethod + def set_artifact_name(artifact_name): + """ + Sets the artifact name in the Context. + + Args: + artifact_name (str): The name of the artifact to set. + """ + + Context._artifact_name = artifact_name + + @staticmethod + def set_files_found(files_found): + """ + Sets the list of files found in the current context. + + Args: + files_found (list): A list of file paths that have been found + using the paths regex of __artifact_v2__ and that are to be stored + in the context. + """ + + Context._files_found = files_found + + @staticmethod + def _build_lookup_map(): + """Builds and returns a dictionary mapping filenames to a list + of full paths.""" + + if not Context._files_found: + raise ValueError( + "Cannot build lookup map: _files_found is not set.") + + filename_lookup = {} + for full_path in Context._files_found: + filename = basename(full_path) + if filename not in filename_lookup: + filename_lookup[filename] = [] + filename_lookup[filename].append(full_path) + return filename_lookup + + @staticmethod + def get_output_params(): + """ + Retrieves the current OutputParameters instance from the Context. + + Raises: + ValueError: If the output parameters are not set. + + Returns: + OutputParameters: The OutputParameters instance. + """ + if not Context._output_params: + raise ValueError("Context not set. OutputParameters not available.") + return Context._output_params + + @staticmethod + def get_report_folder(): + """ + Retrieves the current report folder path from the Context. + + Raises: + ValueError: If the report folder is not set, indicating that the + function is called outside of an artifact context. + + Returns: + str: The path to the report folder. + """ + + if not Context._report_folder: + raise ValueError("Context not set. This function should be" + + " called from within an artifact.") + return Context._report_folder + + @staticmethod + def get_seeker(): + """ + Retrieve the current seeker object from the Context. + + Raises: + ValueError: If the Context has not been set, indicating that this + function should only be called from within an artifact. + + Returns: + The seeker object associated with the current Context. + """ + + if not Context._seeker: + raise ValueError("Context not set. This function should be" + + " called from within an artifact.") + return Context._seeker + + @staticmethod + def get_artifact_info(): + """ + Retrieve the current artifact information (__artifact_v2__) from the + Context. + + Raises: + ValueError: If the Context's artifact information is not set, + indicating that this function was called outside of an artifact + context. + + Returns: + dict: The artifact information stored in the Context. + """ + + if not Context._artifact_info: + raise ValueError("Context not set. This function should be" + + " called from within an artifact.") + return Context._artifact_info + + @staticmethod + def get_module_name(): + """ + Retrieves the current module name from the Context. + + Raises: + ValueError: If the Context has not been set, indicating that this + function should only be called from within an artifact. + + Returns: + str: The name of the current module. + """ + + if not Context._module_name: + raise ValueError("Context not set. This function should be" + + " called from within an artifact.") + return Context._module_name + + @staticmethod + def get_module_file_path(): + """ + Returns the file path of the current module set in the Context. + + Raises: + ValueError: If the module file path is not set in the Context, + indicating that this function was called outside of an artifact + context. + + Returns: + str: The file path of the current module. + """ + + if not Context._module_file_path: + raise ValueError("Context not set. This function should be" + + " called from within an artifact.") + return Context._module_file_path + + @staticmethod + def get_artifact_name(): + """ + Retrieves the current artifact name from the Context. + + Raises: + ValueError: If the artifact name has not been set in the Context, + indicating that this function was called outside of an artifact + context. + + Returns: + str: The name of the current artifact. + """ + + if not Context._artifact_name: + raise ValueError("Context not set. This function should be" + + " called from within an artifact.") + return Context._artifact_name + + @staticmethod + def get_files_found(): + """ + Retrieves the list of files found in the current context. + + Raises: + ValueError: If the context has not been set, indicating that this + function should only be called from within an artifact. + + Returns: + list: The list of files found in the current context. + """ + + if not Context._files_found: + raise ValueError("Context not set. This function should be" + + " called from within an artifact.") + return Context._files_found + + @staticmethod + def get_filename_lookup_map(): + """ + Retrieves the filename lookup map, initializing it if necessary. + + Returns: + dict: A mapping of filenames to their corresponding lookup values. + """ + + if not Context._filename_lookup_map: + Context._filename_lookup_map = Context._build_lookup_map() + return Context._filename_lookup_map + + @staticmethod + def get_source_file_path(partial_path): + """ + Finds the full source path for a given partial or relative path. + This function uses a pre-computed lookup map for high-speed searching. + It first finds candidate paths based on the filename and then verifies + the match using the full partial path provided. + + Args: + partial_path (str): The partial or relative path of the file + to find. + + Returns: + str: The full path of the matching source file, or None + if not found. + """ + lookup_map = Context.get_filename_lookup_map() + + # Defensive check to satisfy the linter. + # This state should not be possible in practice. + if lookup_map is None: + return None + + filename = basename(partial_path) + + if filename in lookup_map: + candidate_paths = lookup_map[filename] + for candidate in candidate_paths: + if Path(candidate).match(partial_path): + return candidate + + return None + + @staticmethod + def clear(): + """ + Resets all context-related class variables to None, effectively + clearing any stored state or references, except for the device IDs, + OS builds, and output parameters which are retained for efficiency. + """ + Context._report_folder = None + Context._seeker = None + Context._artifact_info = None + Context._module_name = None + Context._module_file_path = None + Context._artifact_name = None + Context._files_found = [] + Context._filename_lookup_map = {} diff --git a/scripts/ilapfuncs.py b/scripts/ilapfuncs.py index 827b941d..8b3bddaa 100755 --- a/scripts/ilapfuncs.py +++ b/scripts/ilapfuncs.py @@ -16,6 +16,7 @@ from pathlib import Path from urllib.parse import quote import scripts.artifact_report as artifact_report +from scripts.context import Context # common third party imports import pytz @@ -48,15 +49,19 @@ def __init__(self, output_folder, custom_folder_name=None): folder_name = custom_folder_name else: folder_name = 'ALEAPP_Reports_' + currenttime - self.report_folder_base = os.path.join(output_folder, folder_name) - self.data_folder = os.path.join(self.report_folder_base, 'data') + self.output_folder_base = os.path.join(output_folder, folder_name) + self.data_folder = os.path.join(self.output_folder_base, 'data') + self.media_folder = os.path.join(self.output_folder_base, 'media') + self.html_media_folder = os.path.join(self.output_folder_base, '_HTML', 'media') OutputParameters.screen_output_file_path = os.path.join( - self.report_folder_base, 'Script Logs', 'Screen Output.html') + self.output_folder_base, '_HTML', '_Script_Logs', 'Screen_Output.html') OutputParameters.screen_output_file_path_devinfo = os.path.join( - self.report_folder_base, 'Script Logs', 'DeviceInfo.html') + self.output_folder_base, '_HTML', '_Script_Logs', 'DeviceInfo.html') - os.makedirs(os.path.join(self.report_folder_base, 'Script Logs')) + os.makedirs(os.path.join(self.output_folder_base, '_HTML', '_Script_Logs')) os.makedirs(self.data_folder) + os.makedirs(self.media_folder, exist_ok=True) + os.makedirs(self.html_media_folder, exist_ok=True) class GuiWindow: '''This only exists to hold window handle if script is run from GUI''' @@ -77,7 +82,8 @@ def __init__(self, id): self.metadata = "" self.created_at = 0 self.updated_at = 0 - + self.is_embedded = 0 + def set_values(self, media_info): self.id = media_info[0] self.source_path = media_info[1] @@ -86,6 +92,7 @@ def set_values(self, media_info): self.metadata = media_info[4] self.created_at = media_info[5] self.updated_at = media_info[6] + self.is_embedded = media_info[7] class MediaReferences(): def __init__(self, id): @@ -94,15 +101,13 @@ def __init__(self, id): self.module_name = "" self.artifact_name = "" self.name = "" - self.media_path = "" - + def set_values(self, media_ref_info): self.id = media_ref_info[0] self.media_item_id = media_ref_info[1] self.module_name = media_ref_info[2] self.artifact_name = media_ref_info[3] self.name = media_ref_info[4] - self.media_path = media_ref_info[5] def logfunc(message=""): @@ -115,9 +120,10 @@ def redirect_logs(string): log_text = GuiWindow.window_handle.nametowidget('logs_frame.log_text') sys.stdout.write = redirect_logs - with open(OutputParameters.screen_output_file_path, 'a', encoding='utf8') as a: - print(message) - a.write(message + '
' + OutputParameters.nl) + if OutputParameters.screen_output_file_path: + with open(OutputParameters.screen_output_file_path, 'a', encoding='utf8') as a: + a.write(message + '
' + OutputParameters.nl) + print(message) def strip_tuple_from_headers(data_headers): @@ -136,102 +142,202 @@ def check_output_types(type, output_types): return True elif type != 'kml' and ('standard' in output_types or 'standard' == output_types): return True + elif type == 'lava' and ('lava_only' in output_types or 'lava_only' == output_types): + return True else: return False -def get_media_references_id(media_id, artifact_info, name): - artifact_name = artifact_info.function +def get_media_references_id(media_id, artifact_name, name): + ''' + Get the media references ID. + Args: + media_id: The ID of the media. + artifact_name: The name of the artifact. + name: The name of the media (optional). + Returns: + The media references ID. + ''' return hashlib.sha1(f"{media_id}-{artifact_name}-{name}".encode()).hexdigest() -def set_media_references(media_ref_id, media_id, artifact_info, name, media_path): - module_name = Path(artifact_info.filename).stem - artifact_name = artifact_info.function +def set_media_references(media_ref_id, media_id, module_name, artifact_name, name): + ''' + Set the media references in the LAVA database. + Args: + media_ref_id: The ID of the media references. + media_id: The ID of the media. + module_name: The name of the module. + artifact_name: The name of the artifact. + name: The name of the media (optional). + ''' media_references = MediaReferences(media_ref_id) media_references.set_values(( - media_ref_id, media_id, module_name, artifact_name, name, media_path + media_ref_id, media_id, module_name, artifact_name, name )) lava_insert_sqlite_media_references(media_references) -def check_in_media(artifact_info, report_folder, seeker, files_found, file_path, name="", converted_file_path=False): - extraction_path = next( - (path for path in files_found if Path(path).match(file_path)), None) - file_info = seeker.file_infos.get(extraction_path) - if file_info: - extraction_path = converted_file_path if converted_file_path else Path(extraction_path) - if extraction_path.is_file(): - media_id = hashlib.sha1(f"{file_info.source_path}".encode()).hexdigest() - media_ref_id = get_media_references_id(media_id, artifact_info, name) - lava_media_ref = lava_get_media_references(media_ref_id) - if lava_media_ref: - return media_ref_id - media_path = Path(report_folder).joinpath(media_ref_id).with_suffix(extraction_path.suffix) - try: - media_path.hardlink_to(extraction_path) - except OSError: - shutil.copy2(extraction_path, media_path) - lava_media_item = lava_get_media_item(media_id) - if not lava_media_item: - media_item = MediaItem(media_id) - media_item.source_path = file_info.source_path - media_item.extraction_path = f"./{Path(report_folder).stem}/{media_ref_id}{extraction_path.suffix}" - media_item.mimetype = guess_mime(extraction_path) - media_item.metadata = "not implemented yet" +def _check_in_media(media_id, source_path, is_embedded, name, media_data=None, converted_file_path=None, force_type=None, + force_extension=None, force_creation_date=None, force_modification_date=None): + ''' + Check in media. + Args: + media_id: The ID of the media. + source_path: The source path of the media file. + is_embedded: Whether the media is embedded. + name: The name of the media (optional). + media_data: The media data (optional). + converted_file_path: The converted file path (optional). + force_type: The MIME type of the media (optional). + force_extension: The extension of the media (optional). + force_creation_date: The creation date of the media (optional). + force_modification_date: The modification date of the media (optional). + Returns: + The media reference ID or None. + ''' + output_params = Context.get_output_params() + seeker = Context.get_seeker() + + media_ref_id = get_media_references_id(media_id, Context.get_artifact_name(), name) + if lava_get_media_references(media_ref_id): + return media_ref_id # Reference already exists, we're done. + + # If media item doesn't exist, create it. + if not lava_get_media_item(media_id): + media_item = MediaItem(media_id) + + if force_type: + media_item.mimetype = force_type + else: + media_item.mimetype = guess_mime(media_data) + + if force_extension: + suffix = force_extension + elif name and len(name.split('.')[-1]) < 5: + suffix = name.split('.')[-1] + elif not is_embedded and len(source_path.split('.')[-1]) < 5: + suffix = source_path.split('.')[-1] + else: + suffix = f".{guess_extension(media_data)}" + if suffix and not suffix.startswith('.'): + suffix = f".{suffix}" + + extraction_path = Context.get_source_file_path(source_path) + file_info = seeker.file_infos.get(extraction_path) + if file_info: + media_item.source_path = file_info.source_path + else: + media_item.source_path = source_path + + if is_embedded: + media_item.created_at = force_creation_date if force_creation_date else 0 + media_item.updated_at = force_modification_date if force_modification_date else 0 + else: + if not extraction_path: + return None + + file_to_copy = Path(converted_file_path) if converted_file_path else Path(extraction_path) + if not file_to_copy.is_file(): + return None + + if force_creation_date: + media_item.created_at = force_creation_date + elif file_info: media_item.created_at = file_info.creation_date + else: + media_item.created_at = 0 + + if force_modification_date: + media_item.updated_at = force_modification_date + elif file_info: media_item.updated_at = file_info.modification_date - lava_insert_sqlite_media_item(media_item) - set_media_references(media_ref_id, media_id, artifact_info, name, media_path) - return media_ref_id + else: + media_item.updated_at = 0 + + # 1. Create the canonical media file + canonical_media_path = Path(output_params.media_folder).joinpath(media_id).with_suffix(suffix) + if is_embedded: + canonical_media_path.write_bytes(media_data) else: - logfunc(f"{extraction_path} is not a file") - return None - else: + try: + canonical_media_path.hardlink_to(file_to_copy) + except OSError: + shutil.copy2(file_to_copy, canonical_media_path) + + # 2. Create the HTML media file link/copy + html_media_path = Path(output_params.html_media_folder).joinpath(media_id).with_suffix(suffix) + if not html_media_path.exists(): + try: + html_media_path.hardlink_to(canonical_media_path) + except OSError: + shutil.copy2(canonical_media_path, html_media_path) + + media_item.extraction_path = f"media/{media_id}{suffix}" + media_item.metadata = "not parsed yet" + media_item.is_embedded = 1 if is_embedded else 0 + lava_insert_sqlite_media_item(media_item) + + # Always set the reference + set_media_references(media_ref_id, media_id, Context.get_module_name(), Context.get_artifact_name(), name) + return media_ref_id + +def check_in_media(file_path, name="", converted_file_path=False, force_type=None, force_extension=None, + force_creation_date=None, force_modification_date=None): + ''' + Check in media. + Args: + file_path: The file path of the media file. + name: The name of the media (optional). + converted_file_path: The converted file path (optional). + force_type: The MIME type of the media (optional). + force_extension: The extension of the media (optional). + force_creation_date: The creation date of the media (optional). + force_modification_date: The modification date of the media (optional). + Returns: + The media reference ID or None. + ''' + extraction_path = Context.get_source_file_path(file_path) + if not extraction_path: logfunc(f'No matching file found for "{file_path}"') return None -def check_in_embedded_media(artifact_info, report_folder, seeker, source_file, data, name="", updated_at=0): - file_info = seeker.file_infos.get(source_file) - source_path = file_info.source_path if file_info else source_file - if data: - media_id = hashlib.sha1(data).hexdigest() - media_ref_id = get_media_references_id(media_id, artifact_info, name) - lava_media_ref = lava_get_media_references(media_ref_id) - if lava_media_ref: - return media_ref_id - media_path = Path(report_folder).joinpath(media_ref_id).with_suffix(f".{guess_extension(data)}") - lava_media_item = lava_get_media_item(media_id) - if not lava_media_item: - media_item = MediaItem(media_id) - media_item.source_path = source_path - media_item.extraction_path = media_path - media_item.mimetype = guess_mime(data) - media_item.metadata = "not implemented yet" - media_item.created_at = 0 - media_item.updated_at = updated_at - try: - with open(media_item.extraction_path, "wb") as file: - file.write(data) - except Exception as ex: - logfunc(f'Could not copy embedded media into {media_item.extraction_path} ' + str(ex)) - lava_insert_sqlite_media_item(media_item) - set_media_references(media_ref_id, media_id, artifact_info, name, media_path) - return media_ref_id - else: + file_info = Context.get_seeker().file_infos.get(extraction_path) + if file_info: + media_id = hashlib.sha1(f"{file_info.source_path}".encode()).hexdigest() + with open(extraction_path, "rb") as f: + file_data = f.read() + return _check_in_media(media_id, file_path, False, name, media_data=file_data, converted_file_path=converted_file_path, + force_type=force_type, force_extension=force_extension, + force_creation_date=force_creation_date, force_modification_date=force_modification_date) + return None + +def check_in_embedded_media(source_file, data, name="", force_type=None, force_extension=None, + force_creation_date=None, force_modification_date=None): + ''' + Check in embedded media. + Args: + source_file: The source file path of the embedded media data. + data: The bytes of the embedded media data. + name: The name of the media (optional). + force_type: The MIME type of the media (optional). + force_extension: The extension of the media (optional). + force_creation_date: The creation date of the media (optional). + force_modification_date: The modification date of the media (optional). + Returns: + The media reference ID or None. + ''' + if not data: return None + media_id = hashlib.sha1(data).hexdigest() + return _check_in_media(media_id, source_file, True, name, media_data=data, force_type=force_type, + force_extension=force_extension, force_creation_date=force_creation_date, + force_modification_date=force_modification_date) def html_media_tag(media_path, mimetype, style, title=''): def relative_paths(source): - splitter = '\\' if is_platform_windows() else '/' - first_split = source.split(splitter) - for x in first_split: - if 'data' in x: - index = first_split.index(x) - last_split = source.split(first_split[index - 1]) - return '..' + last_split[1].replace('\\', '/') - elif '_HTML' in x: - index = first_split.index(x) - last_split = source.split(first_split[index]) - return '.' + last_split[1].replace('\\', '/') - return source + # HTML report is in /_HTML/.html + # Media will be linked from /_HTML/media/. + # source path is the canonical path: ./media/. + filename = Path(source).name + return f"media/{filename}" filename = Path(media_path).name media_path = quote(relative_paths(media_path)) @@ -253,36 +359,63 @@ def get_data_list_with_media(media_header_info, data_list): ''' For columns with media item, generate: - A data list with HTML code for HTML output - - A data list with extraaction path of media items for TSV, KML and Timeline exports + - A data list with extraction path of media items for TSV, KML and Timeline exports ''' html_data_list = [] txt_data_list = [] + + # Get the correct output paths from the context + output_params = Context.get_output_params() + for data in data_list: - html_data = list(data) - txt_data = list(data) + html_row = list(data) + txt_row = list(data) + for idx, style in media_header_info.items(): - if html_data[idx]: - media_ref_id = html_data[idx] - if isinstance(media_ref_id, list): - html_code = '' - path_list = [] - for item in media_ref_id: - media_item = lava_get_full_media_info(item) - html_code += html_media_tag( - media_item['media_path'], media_item['type'], style, media_item['name']) - path_list.append(media_item[6]) - txt_code = ' | '.join(path_list) - else: - media_item = lava_get_full_media_info(media_ref_id) - html_code = html_media_tag(media_item['media_path'], media_item['type'], style, media_item['name']) - txt_code = media_item[6] - html_data[idx] = html_code - txt_data[idx] = txt_code + media_ref_id_cell = html_row[idx] + if not media_ref_id_cell: + html_row[idx] = '' + txt_row[idx] = '' + continue + + html_code = '' + path_list = [] + + # Handle both single items and lists of items uniformly + media_ref_ids = media_ref_id_cell if isinstance(media_ref_id_cell, list) else [media_ref_id_cell] + + for ref_id in media_ref_ids: + media_item = lava_get_full_media_info(ref_id) + if not (media_item and media_item['extraction_path']): + continue + + # Construct the full, absolute path to the canonical media file + canonical_path = os.path.join(output_params.output_folder_base, media_item['extraction_path']) + + # Construct the full, absolute path for the HTML link destination + html_path = os.path.join(output_params.html_media_folder, Path(canonical_path).name) + + # Create the link/copy for the HTML report if it doesn't exist + if os.path.exists(canonical_path) and not os.path.exists(html_path): + try: + os.link(canonical_path, html_path) + except OSError: + shutil.copy2(canonical_path, html_path) + + # Generate the HTML tag and add the path for the text report + html_code += html_media_tag(media_item['extraction_path'], media_item['type'], style, media_item['name']) + path_list.append(media_item['extraction_path']) + + # Assign the generated values to the rows + html_row[idx] = html_code + if isinstance(media_ref_id_cell, list): + txt_row[idx] = ' | '.join(path_list) else: - html_data[idx] = '' - txt_data[idx] = '' - html_data_list.append(tuple(html_data)) - txt_data_list.append(tuple(txt_data)) + txt_row[idx] = path_list[0] if path_list else '' + + html_data_list.append(tuple(html_row)) + txt_data_list.append(tuple(txt_row)) + return html_data_list, txt_data_list def artifact_processor(func): @@ -290,30 +423,43 @@ def artifact_processor(func): def wrapper(files_found, report_folder, seeker, wrap_text): module_name = func.__module__.split('.')[-1] func_name = func.__name__ + module_file_path = inspect.getfile(func) - func_object = func.__globals__.get(func_name, {}) - artifact_info = func_object.artifact_info #get('artifact_info', {}) + all_artifacts_info = func.__globals__.get('__artifacts_v2__', {}) + artifact_info = all_artifacts_info.get(func_name, {}) artifact_name = artifact_info.get('name', func_name) category = artifact_info.get('category', '') description = artifact_info.get('description', '') icon = artifact_info.get('artifact_icon', '') html_columns = artifact_info.get('html_columns', []) - path_regex = artifact_info.get('paths', '') output_types = artifact_info.get('output_types', ['html', 'tsv', 'timeline', 'lava', 'kml']) - data_headers, data_list, source_path = func(files_found, report_folder, seeker, wrap_text) - + Context.clear() + Context.set_report_folder(report_folder) + Context.set_seeker(seeker) + Context.set_files_found(files_found) + Context.set_artifact_info(artifact_info) + Context.set_module_name(module_name) + Context.set_module_file_path(module_file_path) + Context.set_artifact_name(artifact_name) + + sig = inspect.signature(func) + if len(sig.parameters) == 1: + data_headers, data_list, source_path = func(Context) + else: + data_headers, data_list, source_path = func(files_found, report_folder, seeker, wrap_text) + if not source_path: - logfunc(f"No file found") + logfunc("No source_path provided") - elif len(data_list): + if len(data_list): if isinstance(data_list, tuple): data_list, html_data_list = data_list else: html_data_list = data_list - logfunc(f"Found {len(data_list)} {'records' if len(data_list)>1 else 'record'} for {artifact_name}") + logfunc(f"Found {len(data_list):,} {'records' if len(data_list)>1 else 'record'} for {artifact_name}") icons.setdefault(category, {artifact_name: icon}).update({artifact_name: icon}) # Strip tuples from headers for HTML, TSV, and timeline @@ -334,12 +480,20 @@ def wrapper(files_found, report_folder, seeker, wrap_text): if check_output_types('tsv', output_types): tsv(report_folder, stripped_headers, txt_data_list if media_header_info else data_list, artifact_name) - + if check_output_types('timeline', output_types): timeline(report_folder, artifact_name, txt_data_list if media_header_info else data_list, stripped_headers) if check_output_types('lava', output_types): - table_name, object_columns, column_map = lava_process_artifact(category, module_name, artifact_name, data_headers, len(data_list), data_views=artifact_info.get("data_views")) + table_name, object_columns, column_map = lava_process_artifact(category, + module_name, + artifact_name, + data_headers, + len(data_list), + func_name=func_name, + data_views=artifact_info.get("data_views"), + artifact_icon=icon, + source_path=source_path) lava_insert_sqlite_data(table_name, data_list, object_columns, data_headers, column_map) if check_output_types('kml', output_types): @@ -402,10 +556,9 @@ def get_file_path(files_found, filename, skip=False): """Returns the path of the searched filename if exists or returns None""" try: for file_found in files_found: - if skip: - if skip in file_found: - continue - if file_found.endswith(filename): + if skip and skip in file_found: + continue + if Path(file_found).match(filename): return file_found except Exception as e: logfunc(f"Error: {str(e)}") @@ -433,7 +586,8 @@ def get_file_path_list_checking_uid(files_found, filename, position , skip=False def get_txt_file_content(file_path): try: with open(file_path, "r", encoding="utf-8") as file: - return file.readlines() + file_content = file.readlines() + return file_content except FileNotFoundError: logfunc(f"Error: File not found at {file_path}") except PermissionError: diff --git a/scripts/lavafuncs.py b/scripts/lavafuncs.py index 931c7710..18336010 100644 --- a/scripts/lavafuncs.py +++ b/scripts/lavafuncs.py @@ -1,16 +1,63 @@ +""" +This module provides functionality for initializing, processing, and finalizing +artifact data from forensic analysis. It manages both a SQLite database for +structured data storage and a JSON file for metadata and configuration. + +Global Variables: + lava_data (dict): Main data structure containing artifacts, modules, and metadata. + lava_db (sqlite3.Connection): SQLite database connection for artifact storage. + lava_db_name (str): Name of the SQLite database file. + lava_json_name (str): Name of the JSON metadata file. + +Functions: + sanitize_sql_name: Sanitizes strings for use as SQL identifiers. + get_sql_type: Maps Python types to SQL types. + initialize_lava: Initializes the LAVA data structure and database. + lava_process_artifact: Processes and stores artifact data. + lava_add_module: Adds module information to the LAVA data. + lava_create_sqlite_table: Creates a SQLite table for artifact data. + lava_insert_sqlite_data: Inserts data rows into a SQLite table. + lava_get_media_item: Retrieves media item information from database. + lava_insert_sqlite_media_item: Inserts media item metadata into database. + lava_get_media_references: Retrieves media reference information. + lava_insert_sqlite_media_references: Inserts media reference into database. + lava_get_full_media_info: Retrieves complete media information with joins. + lava_finalize_output: Finalizes and saves LAVA output files. +""" + import json import sqlite3 +import sys import os +from platform import platform from collections import OrderedDict import re import datetime +from scripts.version_info import leapp_name, leapp_version +from scripts.context import Context + # Global variables lava_data = None lava_db = None +lava_db_name = '_lava_artifacts.db' +lava_json_name = '_lava_data.lava' + def sanitize_sql_name(name): - # Remove non-alphanumeric characters and replace spaces with underscores + """ + Sanitizes a given name by removing invalid characters and formatting it. + This function takes a string `name` and performs the following operations: + 1. Removes any character that is not a word character (alphanumeric or underscore) or whitespace. + 2. Replaces consecutive whitespace characters with a single underscore. + 3. Ensures that the resulting string starts with a letter or an underscore; if not, it prepends an underscore. + 4. Converts the entire string to lowercase. + Args: + name (str): The name to be sanitized. + Returns: + str: The sanitized SQL name. + """ + sanitized = re.sub(r'[^\w\s]', '', name) sanitized = re.sub(r'\s+', '_', sanitized) # Ensure the name starts with a letter or underscore @@ -18,69 +65,166 @@ def sanitize_sql_name(name): sanitized = '_' + sanitized return sanitized.lower() + def get_sql_type(python_type): + """ + Convert Python type names to SQL type names for database schema creation. + Args: + python_type (str): The name of the Python type as a string (e.g., 'datetime', 'date', 'str'). + Returns: + str: The corresponding SQL type name. Returns 'INTEGER' for datetime and date types, + and 'TEXT' as the default for all other types. + """ + type_map = { 'datetime': 'INTEGER', 'date': 'INTEGER', } return type_map.get(python_type, 'TEXT') + def initialize_lava(input_path, output_path, input_type): + ''' + Initialize the LAVA data. + Args: + input_path: The path to the input file. + output_path: The path to the output file. + input_type: The type of input file. + selected_artifacts: List of selected artifacts. + ''' + global lava_data, lava_db - + lava_data = { + "parser_info": { + "leapp_name": leapp_name, + "leapp_version": leapp_version, + "leapp_mode": "GUI" if "leappGUI" in sys.argv[0] else "CLI", + "package": "Source code" if not getattr(sys, 'frozen', False) else "Binary", + "OS": platform(), + "start_timestamp": int(datetime.datetime.now(datetime.timezone.utc).timestamp()) + }, "param_input": input_path, "param_output": output_path, "param_type": input_type, "processing_status": "In Progress", + "lava_db_name": lava_db_name, "modules": [], - "artifacts": OrderedDict() + "artifacts": OrderedDict(), + "meta": { + "modules": [] + } } - - db_path = os.path.join(output_path, '_lava_artifacts.db') + + db_path = os.path.join(output_path, lava_db_name) lava_db = sqlite3.connect(db_path) - + cursor = lava_db.cursor() + cursor.execute('''CREATE TABLE _artifact_search_patterns ( + id INTEGER PRIMARY KEY, + module_name TEXT NOT NULL, + artifact_name TEXT NOT NULL, + regex TEXT NOT NULL)''') + cursor.execute('''CREATE TABLE _file_path_list ( + id INTEGER PRIMARY KEY, + file_path TEXT NOT NULL)''') + cursor.execute('''CREATE TABLE _artifact_pattern_to_file ( + id INTEGER PRIMARY KEY, + artifact_search_pattern_id INTEGER NOT NULL, + file_path_id INTEGER NOT NULL, + FOREIGN KEY (artifact_search_pattern_id) REFERENCES _artifact_search_patterns(id), + FOREIGN KEY (file_path_id) REFERENCES _file_path_list(id))''') cursor.execute('''CREATE TABLE _lava_media_items ( - id TEXT PRIMARY KEY, - source_path TEXT, - extraction_path TEXT, - type TEXT, - metadata TEXT, - created_at INTEGER, - updated_at INTEGER)''') + id TEXT PRIMARY KEY, + source_path TEXT, + extraction_path TEXT, + type TEXT, + metadata TEXT, + created_at INTEGER, + updated_at INTEGER, + is_embedded INTEGER)''') cursor.execute('''CREATE TABLE _lava_media_references ( - id TEXT PRIMARY KEY, - media_item_id TEXT, - module_name TEXT, - artifact_name TEXT, + id TEXT PRIMARY KEY, + media_item_id TEXT, + module_name TEXT, + artifact_name TEXT, name TEXT, - media_path TEXT, FOREIGN KEY (media_item_id) REFERENCES _lava_media_items(id))''') - cursor.execute('''CREATE VIEW _lava_media_info AS - SELECT - lmr.id as 'media_ref_id', - lmr.media_item_id, - lmr.module_name, - lmr.artifact_name, - lmr.name, - lmr.media_path, - lmi.source_path, - lmi.extraction_path, - lmi.type, - lmi.metadata, - lmi.created_at, - lmi.updated_at - FROM _lava_media_references as lmr + cursor.execute('''CREATE VIEW _lava_media_info AS + SELECT + lmr.id as 'media_ref_id', + lmr.media_item_id, + lmr.module_name, + lmr.artifact_name, + lmr.name, + lmi.source_path, + lmi.extraction_path, + lmi.type, + lmi.metadata, + lmi.created_at, + lmi.updated_at, + lmi.is_embedded + FROM _lava_media_references as lmr LEFT JOIN _lava_media_items as lmi ON lmr.media_item_id = lmi.id''') - -def lava_process_artifact(category, module_name, artifact_name, data, record_count=None, data_views=None, create_table=True): - global lava_data - + + +def lava_process_artifact( + category, + module_name, + artifact_name, + data, + record_count=None, + func_name=None, + data_views=None, + artifact_icon=None, + source_path=None): + + ''' + Process an artifact and add it to the LAVA data. + Args: + category: The category of the artifact. + module_name: The name of the module that processed the artifact. + artifact_name: The name of the artifact. + data: The name of the columns. + func_name: The name of the function that processed the artifact. + record_count: The number of records in the artifact. + data_views: The data views of the artifact. + artifact_icon: The icon of the artifact. + source_path: The source path of the artifact. + ''' if category not in lava_data["artifacts"]: lava_data["artifacts"][category] = [] - - sanitized_table_name, column_map, object_columns = lava_create_sqlite_table(artifact_name, data, create_table) + + # To backward compatibility for modules not updated that are not passing func_name + if func_name is None: + func_name = artifact_name + + sanitized_table_name, column_map, object_columns = lava_create_sqlite_table(func_name, data) + + # Add artifact metadata + artifact_info = Context.get_artifact_info() + module_info = next((m for m in lava_data['meta']['modules'] if m['module_name'] == module_name), None) + + if not module_info: + module_info = { + "module_name": module_name, + "module_filename": os.path.basename(Context.get_module_file_path()), + "artifacts": [] + } + lava_data['meta']['modules'].append(module_info) + + artifact_meta = { + "artifact_key": sanitized_table_name, + "tablename": sanitized_table_name, + "name": artifact_name, + "description": artifact_info.get('description', ''), + "author": artifact_info.get('author', ''), + "created_date": artifact_info.get('creation_date', ''), + "last_updated_date": artifact_info.get('last_update_date', ''), + "notes": artifact_info.get('notes', ''), + "category": category + } + module_info['artifacts'].append(artifact_meta) artifact = { "name": artifact_name, @@ -88,47 +232,72 @@ def lava_process_artifact(category, module_name, artifact_name, data, record_cou "module": module_name, "column_map": column_map } + if artifact_icon: + artifact['artifact_icon'] = artifact_icon + if record_count is not None: artifact["record_count"] = record_count + + if source_path: + artifact['source_path'] = source_path + if object_columns: artifact["object_columns"] = [{"name": name, "type": type_} for name, type_ in object_columns.items()] if data_views: - if chat_params := data_views.get("chat"): + view_params = None + + # Backward compatibility for chat view. Remove 'chat' once modules are updated. + if "chat" in data_views: + view_params = data_views.pop("chat") + data_views["conversation"] = view_params # Upgrade to conversation + elif "conversation" in data_views: + view_params = data_views.get("conversation") + + if view_params: sanitized_params = {} - #Boolean value is whether or not to sanitize the column name. Should do this for parameters that map to columns - keys = { - "directionSentValue": False, - "threadDiscriminatorColumn": True, - "threadLabelColumn": True, - "textColumn": True, - "directionColumn": True, - "timeColumn": True, - "senderColumn": True, - "mediaColumn": True, - "sentMessageLabelColumn": True, - "sentMessageStaticLabel": False + # Get original column names for dynamic sanitization check + column_names = [item[0] if isinstance(item, tuple) else item for item in data] + + # Conversion map for backward compatibility. Remove once modules are updated. + convert_map = { + "threadDiscriminatorColumn": "conversationDiscriminatorColumn", + "threadLabelColumn": "conversationLabelColumn" } - for (key, value) in chat_params.items(): - if key in keys: - if keys[key]: - sanitized_params[key] = sanitize_sql_name(value) - else: - sanitized_params[key] = value + for key, value in view_params.items(): + # Remap old keys to new keys + final_key = convert_map.get(key, key) - data_views["chat"] = sanitized_params + # Sanitize value if it's a column name, otherwise pass through + if value in column_names: + sanitized_params[final_key] = sanitize_sql_name(value) + else: + sanitized_params[final_key] = value + + data_views["conversation"] = sanitized_params artifact['data_views'] = data_views - + lava_data["artifacts"][category].append(artifact) - + return sanitized_table_name, object_columns, column_map + def lava_add_module(module_name, module_status, file_count=None): - global lava_data - + """ + Adds a module to the global lava_data structure. + Parameters: + module_name (str): The name of the module to be added. + module_status (str): The status of the module (e.g., 'active', 'inactive'). + file_count (int, optional): The number of files associated with the module. Defaults to None. + Returns: + None + Global Variables: + lava_data (dict): A global dictionary that contains a list of modules under the key 'modules'. + """ + module = { "module_name": module_name, "module_status": module_status @@ -137,10 +306,28 @@ def lava_add_module(module_name, module_status, file_count=None): module["file_count"] = file_count lava_data["modules"].append(module) -def lava_create_sqlite_table(table_name, data, create_table=True): - global lava_db - + +def lava_create_sqlite_table(table_name, data): + """ + Creates a SQLite table with the specified name and columns based on the provided data. + Parameters: + table_name (str): The name of the table to be created in the SQLite database. + data (list): A list of tuples or strings representing the columns of the table. + Each tuple should contain the original column name and its data type. + If a string is provided, it is treated as a column name with a default type of TEXT. + Returns: + tuple: A tuple containing: + - sanitized_table_name (str): The sanitized name of the created table. + - column_map (dict): A mapping of sanitized column names to their original names. + - object_columns (dict): A mapping of sanitized column names to their data types. + Raises: + Exception: If there is an error during the table creation process. + """ + if not data: + return None, None, None + sanitized_table_name = sanitize_sql_name(table_name) + cursor = lava_db.cursor() columns = [] column_map = {} @@ -160,29 +347,43 @@ def lava_create_sqlite_table(table_name, data, create_table=True): column_map[sanitized_name] = original_name - if create_table: - columns_sql = ', '.join(columns) - cursor = lava_db.cursor() - cursor.execute(f"CREATE TABLE IF NOT EXISTS {sanitized_table_name} ({columns_sql})") - lava_db.commit() + columns_sql = ', '.join(columns) + cursor.execute(f"CREATE TABLE IF NOT EXISTS {sanitized_table_name} ({columns_sql})") + lava_db.commit() return sanitized_table_name, column_map, object_columns + def lava_insert_sqlite_data(table_name, data, object_columns, headers, column_map): - global lava_db - + """ + Insert data into a SQLite database table with automatic column sanitization and type conversion. + This function handles the insertion of multiple rows of data into a specified SQLite table, + with special handling for complex data types (dict, list) and datetime conversions. + Args: + table_name (str): The name of the SQLite table to insert data into. + data (list): A list of rows to insert, where each row is a sequence of values + corresponding to the headers. + object_columns (dict): A dictionary mapping column names to their data types. + Supports 'datetime' type for automatic timestamp conversion. + headers (list): A list of column headers. Each header can be a string or a tuple + where the first element is the column name. + column_map (dict): Column mapping configuration (currently unused in the function). + Returns: + None + """ + if not data: return - + cursor = lava_db.cursor() - + # Use the sanitized column names directly sanitized_columns = [sanitize_sql_name(h[0] if isinstance(h, tuple) else h) for h in headers] - + # Prepare the SQL query placeholders = ', '.join(['?' for _ in sanitized_columns]) query = f"INSERT INTO {table_name} ({', '.join(sanitized_columns)}) VALUES ({placeholders})" - + # Prepare the data for insertion rows_to_insert = [] for row in data: @@ -203,59 +404,124 @@ def lava_insert_sqlite_data(table_name, data, object_columns, headers, column_ma value = int(value.timestamp()) processed_row.append(value) rows_to_insert.append(tuple(processed_row)) - + # Execute the insert cursor.executemany(query, rows_to_insert) lava_db.commit() -def lava_create_view(table_name, artifact_query): - global lava_db - cursor = lava_db.cursor() - query = f"""CREATE VIEW IF NOT EXISTS {sanitize_sql_name(table_name)} AS {artifact_query}""" - cursor.execute(query) - lava_db.commit() def lava_get_media_item(media_id): - '''Returns a MediaItem object containing info of the media_id item stored - in the media_items table if exists or return None ''' - global lava_db + """ + Retrieve a media item from the lava database by its ID. + Args: + media_id (str): The unique identifier of the media item to retrieve. + Returns: + sqlite3.Row or None: A row object containing all columns from the _lava_media_items table + """ + cursor = lava_db.cursor() query = f"SELECT * FROM _lava_media_items WHERE id='{media_id}'" return cursor.execute(query).fetchone() # return result.fetchone() + def lava_insert_sqlite_media_item(media_item): - global lava_db - created_at = media_item.created_at if media_item.created_at else 'NULL' - updated_at = media_item.updated_at if media_item.updated_at else 'NULL' + """ + Insert a media item record into the _lava_media_items SQLite table. + Args: + media_item: A media item object containing the following attributes: + - id: Unique identifier for the media item + - source_path: Original path of the media file + - extraction_path: Path where the media was extracted + - mimetype: MIME type of the media file + - metadata: Additional metadata about the media item + - created_at: Timestamp when the item was created (optional) + - updated_at: Timestamp when the item was last updated (optional) + Returns: + None + """ + cursor = lava_db.cursor() + sql = '''INSERT INTO _lava_media_items + ("id", "source_path", "extraction_path", "type", "metadata", "created_at", "updated_at", "is_embedded") + VALUES (?, ?, ?, ?, ?, ?, ?, ?)''' + + params = ( + media_item.id, + str(media_item.source_path), + str(media_item.extraction_path), + media_item.mimetype, + media_item.metadata, + media_item.created_at if media_item.created_at else None, + media_item.updated_at if media_item.updated_at else None, + media_item.is_embedded + ) + try: - cursor.execute(f'''INSERT INTO _lava_media_items - ("id", "source_path", "extraction_path", "type", "metadata", "created_at", "updated_at") - VALUES ("{media_item.id}", "{media_item.source_path}", "{media_item.extraction_path}", - "{media_item.mimetype}", "{media_item.metadata}", {created_at}, {updated_at})''') + cursor.execute(sql, params) lava_db.commit() except sqlite3.IntegrityError as e: print(str(e)) + def lava_get_media_references(media_ref): - global lava_db + """ + Retrieves a single media reference record from the _lava_media_references table. + Args: + media_ref (str): The ID of the media reference to retrieve. + Returns: + tuple or None: A tuple containing the row data if found, None otherwise. + """ + cursor = lava_db.cursor() query = f"SELECT * FROM _lava_media_references WHERE id='{media_ref}'" return cursor.execute(query).fetchone() + def lava_insert_sqlite_media_references(media_references): - global lava_db + """ + Insert a media reference record into the _lava_media_references table. + Args: + media_references: An object containing media reference data with the following attributes: + - id: Unique identifier for the media reference + - media_item_id: ID of the associated media item + - module_name: Name of the module containing the artifact + - artifact_name: Name of the artifact + - name: Name/description of the media reference + - media_path: File path to the media item + Returns: + None + """ + cursor = lava_db.cursor() - cursor.execute(f'''INSERT INTO _lava_media_references - ("id", "media_item_id", "module_name", "artifact_name", "name", "media_path") - VALUES ("{media_references.id}", "{media_references.media_item_id}", - "{media_references.module_name}", "{media_references.artifact_name}", - "{media_references.name}", "{media_references.media_path}")''') + sql = '''INSERT INTO _lava_media_references + ("id", "media_item_id", "module_name", "artifact_name", "name") + VALUES (?, ?, ?, ?, ?)''' + + params = ( + media_references.id, + media_references.media_item_id, + media_references.module_name, + media_references.artifact_name, + media_references.name + ) + cursor.execute(sql, params) lava_db.commit() + def lava_get_full_media_info(media_ref_id): - global lava_db + """ + Retrieves complete media information for a given media reference ID from the LAVA database. + This function queries the _lava_media_info table to fetch all columns for a specific + media item identified by its reference ID. The function uses a global database connection + and sets the row factory to sqlite3.Row for dictionary-like access to results. + Args: + media_ref_id (str): The unique media reference identifier to look up in the database. + Returns: + sqlite3.Row or None: A Row object containing all media information fields if found, + None if no matching media_ref_id exists in the database. + """ + lava_db.row_factory = sqlite3.Row cursor = lava_db.cursor() query = f''' @@ -265,24 +531,111 @@ def lava_get_full_media_info(media_ref_id): ''' return cursor.execute(query).fetchone() + +def lava_insert_sqlite_artifact_search_pattern(artifact_regex_id, module_name, artifact_name, regex): + """ + Inserts artifact search pattern into the _artifact_search_patterns table. + Args: + artifact_regex_id (str): Unique identifier for the artifact search pattern. + module_name (str): Name of the module containing the artifact. + artifact_name (str): Name of the artifact. + regex (str): The regular expression for the artifact search pattern. + """ + + cursor = lava_db.cursor() + sql = '''INSERT INTO _artifact_search_patterns + ("id", "module_name", "artifact_name", "regex") + VALUES (?, ?, ?, ?)''' + + data = (artifact_regex_id, module_name, artifact_name, regex) + + try: + cursor.execute(sql, data) + lava_db.commit() + except sqlite3.IntegrityError as e: + print(str(e)) + + +def lava_insert_sqlite_file_path(file_id, file_path): + """ + Insert a file path record into the _file_path_list table. + Args: + file_id (int): Unique identifier for the file path entry. + file_path (str): Relative file path to store. + """ + + cursor = lava_db.cursor() + sql = '''INSERT INTO _file_path_list + ("id", "file_path") + VALUES (?, ?)''' + + data = (file_id, file_path) + + try: + cursor.execute(sql, data) + lava_db.commit() + except sqlite3.IntegrityError as e: + print(str(e)) + + +def lava_insert_sqlite_artifact_link_pattern_to_file(artifact_regex_id, file_id): + """ + Link an artifact search pattern to a file path entry. + Args: + artifact_regex_id (int): ID of the artifact search pattern. + file_id (int): ID of the related file path entry. + """ + + cursor = lava_db.cursor() + sql = '''INSERT INTO _artifact_pattern_to_file + ("artifact_search_pattern_id", "file_path_id") + VALUES (?, ?)''' + + data = (artifact_regex_id, file_id) + + try: + cursor.execute(sql, data) + lava_db.commit() + except sqlite3.IntegrityError as e: + print(str(e)) + + def lava_finalize_output(output_path): - global lava_data, lava_db - + """ + Finalizes the LAVA output by completing data processing and saving results. + This function performs the following operations: + 1. Sets the processing status to "Complete" + 2. Sorts modules alphabetically by module name + 3. Sorts artifact categories alphabetically + 4. Sorts artifacts within each category alphabetically by name + 5. Saves the LAVA data structure to a JSON file + 6. Closes the SQLite database connection + Args: + output_path (str): The directory path where the LAVA JSON output file will be saved + Global Variables: + lava_data (dict): Global dictionary containing LAVA processing data including modules, + artifacts, and processing status + lava_db: Global SQLite database connection object + lava_json_name (str): The filename for the LAVA JSON output file + """ + lava_data["processing_status"] = "Complete" - + # Sort modules alphabetically lava_data["modules"].sort(key=lambda x: x["module_name"]) - + # Sort artifacts categories alphabetically lava_data["artifacts"] = OrderedDict(sorted(lava_data["artifacts"].items())) - + # Sort artifacts within each category alphabetically for category in lava_data["artifacts"]: lava_data["artifacts"][category].sort(key=lambda x: x["name"]) - + + lava_data["parser_info"]["end_timestamp"] = int(datetime.datetime.now(datetime.timezone.utc).timestamp()) + # Save LAVA JSON output - with open(os.path.join(output_path, '_lava_data.json'), 'w') as f: + with open(os.path.join(output_path, lava_json_name), 'w', encoding='utf-8') as f: json.dump(lava_data, f, indent=4) - + # Close the SQLite database - lava_db.close() \ No newline at end of file + lava_db.close() diff --git a/scripts/report.py b/scripts/report.py index 078313e6..92c7059d 100644 --- a/scripts/report.py +++ b/scripts/report.py @@ -6,7 +6,7 @@ from collections import OrderedDict from scripts.html_parts import * from scripts.ilapfuncs import logfunc -from scripts.version_info import aleapp_version, aleapp_contributors +from scripts.version_info import leapp_version, aleapp_contributors from scripts.report_icons import icon_mappings, feather_icon_names def get_icon_name(category, artifact): @@ -199,15 +199,15 @@ def create_index_html(reportfolderbase, time_in_secs, time_HMS, extraction_type, """ # Get script run log (this will be tab2) - devinfo_files_path = os.path.join(reportfolderbase, 'Script Logs', 'DeviceInfo.html') + devinfo_files_path = os.path.join(reportfolderbase, '_HTML', '_Script_Logs', 'DeviceInfo.html') tab2_content = get_file_content(devinfo_files_path) # Get script run log (this will be tab3) - script_log_path = os.path.join(reportfolderbase, 'Script Logs', 'Screen Output.html') + script_log_path = os.path.join(reportfolderbase, '_HTML', '_Script_Logs', 'Screen_Output.html') tab3_content = get_file_content(script_log_path) - # Get processed files list (this will be tab3) - processed_files_path = os.path.join(reportfolderbase, 'Script Logs', 'ProcessedFilesLog.html') + # Get processed files list (this will be tab4) + processed_files_path = os.path.join(reportfolderbase, '_HTML', '_Script_Logs', 'ProcessedFilesLog.html') tab4_content = get_file_content(processed_files_path) content += tabs_code.format(tab1_content, tab2_content, tab3_content, tab4_content) @@ -228,7 +228,7 @@ def create_index_html(reportfolderbase, time_in_secs, time_HMS, extraction_type, html_reportfolderbase.mkdir(exist_ok=True) with html_reportfolderbase.joinpath(filename).open('w', encoding='utf8') as f: f.write(page_header.format(page_title)) - f.write(body_start.format(f"ALEAPP {aleapp_version}")) + f.write(body_start.format(f"ALEAPP {leapp_version}")) f.write(body_sidebar_setup + active_nav_list_data + body_sidebar_trailer) f.write(body_main_header + body_main_data_title.format(body_heading, body_description)) f.write(content) diff --git a/scripts/version_info.py b/scripts/version_info.py index 2be0127c..cb61c9d6 100755 --- a/scripts/version_info.py +++ b/scripts/version_info.py @@ -4,7 +4,8 @@ Leave blank if not available """ -aleapp_version = '3.6.0-dev.0' +leapp_name = 'ALEAPP' +leapp_version = '2.6.0-dev.1' aleapp_contributors = [ ['Alexis Brignoni', 'https://abrignoni.com', '@AlexisBrignoni', 'https://github.com/abrignoni'],