From 8d3c6f1966744e869d3713618e5d6b5271a150ff Mon Sep 17 00:00:00 2001 From: Sukochev Date: Thu, 23 Apr 2026 16:22:39 +0930 Subject: [PATCH 1/3] instagram-threads-update --- scripts/artifacts/instagramThreads.py | 291 +++++++++++++++----------- 1 file changed, 169 insertions(+), 122 deletions(-) diff --git a/scripts/artifacts/instagramThreads.py b/scripts/artifacts/instagramThreads.py index 01b647357..c53e58912 100644 --- a/scripts/artifacts/instagramThreads.py +++ b/scripts/artifacts/instagramThreads.py @@ -1,62 +1,94 @@ -import sqlite3 +__artifacts_v2__ = { + "get_instagramThreads": { + "name": "Instagram Threads", + "description": "Existing messages sent and received in the Instagram App.", + "author": "Alexis Brignoni", + "version": "0.7", + "creation_date": "2021-03-09", + "last_update_date": "2026-04-23", + "requirements": "", + "category": "Instagram", + "notes": "", + "paths": ( + "*/mobile/Containers/Data/Application/*/Library/Application Support/DirectSQLiteDatabase/*.db*" + ), + "output_types": ["html", "tsv", "timeline"], + "artifact_icon": "instagram", + } +} + +import biplist import io -import json +import plistlib +import sys + import nska_deserialize as nd +from scripts.ilapfuncs import artifact_processor, logfunc, open_sqlite_db_readonly -from packaging import version -from scripts.artifact_report import ArtifactHtmlReport -from scripts.ilapfuncs import logfunc, logdevinfo, timeline, kmlgen, tsv, is_platform_windows, open_sqlite_db_readonly +@artifact_processor +def get_instagramThreads( + files_found, report_folder, seeker, wrap_text, timezone_offset +): -def get_instagramThreads(files_found, report_folder, seeker, wrap_text, timezone_offset): for file_found in files_found: file_found = str(file_found) - - if file_found.endswith('.db'): + + if file_found.endswith(".db"): break - + db = open_sqlite_db_readonly(file_found) cursor = db.cursor() - cursor.execute(''' + cursor.execute(""" select metadata from threads - ''') - + """) + all_rows = cursor.fetchall() usageentries = len(all_rows) fila = 0 userdict = {} data_list = [] video_calls = [] - + if usageentries > 0: for row in all_rows: - plist = '' + plist = "" plist_file_object = io.BytesIO(row[0]) - if row[0].find(b'NSKeyedArchiver') == -1: + if row[0].find(b"NSKeyedArchiver") == -1: if sys.version_info >= (3, 9): plist = plistlib.load(plist_file_object) else: plist = biplist.readPlist(plist_file_object) else: try: - plist = nd.deserialize_plist(plist_file_object) - except (nd.DeserializeError, nd.biplist.NotBinaryPlistException, nd.biplist.InvalidPlistException, - nd.plistlib.InvalidFileException, nd.ccl_bplist.BplistError, ValueError, TypeError, OSError, OverflowError) as ex: - logfunc(f'Failed to read plist for {row[0]}, error was:' + str(ex)) - - for i in plist['NSArray*users']: - for x, y in enumerate(plist['NSArray*users']): - userPk = plist['NSArray*users'][x]['pk'] - userFull = (plist['NSArray*users'][x]['fullName']) + plist = nd.deserialize_plist(plist_file_object) + except ( + nd.DeserializeError, + nd.biplist.NotBinaryPlistException, + nd.biplist.InvalidPlistException, + nd.plistlib.InvalidFileException, + nd.ccl_bplist.BplistError, + ValueError, + TypeError, + OSError, + OverflowError, + ) as ex: + logfunc(f"Failed to read plist for {row[0]}, error was:" + str(ex)) + + for i in plist["NSArray*users"]: + for x, y in enumerate(plist["NSArray*users"]): + userPk = plist["NSArray*users"][x]["NSDictionary*userDict"]["pk"] + userFull = plist["NSArray*users"][x]["NSDictionary*userDict"]["full_name"] userdict[userPk] = userFull - - inviterPk = plist['IGUser*inviter']['pk'] - inviterFull = plist['IGUser*inviter']['fullName'] + + # DEPRECATED? + inviterPk = plist["IGUser*inviter_DEPRECATED"]["NSDictionary*userDict"]["pk"] + inviterFull = plist["IGUser*inviter_DEPRECATED"]["NSDictionary*userDict"]["full_name"] userdict[inviterPk] = inviterFull - - cursor.execute(''' + + cursor.execute(""" select messages.message_id, messages.thread_id, @@ -66,120 +98,135 @@ def get_instagramThreads(files_found, report_folder, seeker, wrap_text, timezone threads.visual_message_info from messages, threads where messages.thread_id = threads.thread_id - ''') + """) all_rows = cursor.fetchall() usageentries = len(all_rows) - + if usageentries > 0: for row in all_rows: - plist = '' - senderpk ='' - serverTimestamp = '' - message = '' - videoChatTitle = '' - videoChatCallID = '' - dmreaction = '' - reactionServerTimestamp = '' - reactionUserID = '' - sharedMediaID = '' - sharedMediaURL = '' - + plist = "" + senderpk = "" + serverTimestamp = "" + message = "" + videoChatTitle = "" + videoChatCallID = "" + dmreaction = "" + reactionServerTimestamp = "" + reactionUserID = "" + sharedMediaID = "" + sharedMediaURL = "" + shared_media_url_expiration_date = "" + plist_file_object = io.BytesIO(row[2]) - if row[2].find(b'NSKeyedArchiver') == -1: + if row[2].find(b"NSKeyedArchiver") == -1: if sys.version_info >= (3, 9): plist = plistlib.load(plist_file_object) else: plist = biplist.readPlist(plist_file_object) else: try: - plist = nd.deserialize_plist(plist_file_object) - except (nd.DeserializeError, nd.biplist.NotBinaryPlistException, nd.biplist.InvalidPlistException, - nd.plistlib.InvalidFileException, nd.ccl_bplist.BplistError, ValueError, TypeError, OSError, OverflowError) as ex: - logfunc(f'Failed to read plist for {row[2]}, error was:' + str(ex)) - - #Messages - senderpk = plist['IGDirectPublishedMessageMetadata*metadata']['NSString*senderPk'] - serverTimestamp = plist['IGDirectPublishedMessageMetadata*metadata']['NSDate*serverTimestamp'] - message = plist['IGDirectPublishedMessageContent*content'].get('NSString*string') - - #VOIP calls - if plist['IGDirectPublishedMessageContent*content'].get('IGDirectThreadActivityAnnouncement*threadActivity') is not None: - videoChatTitle = plist['IGDirectPublishedMessageContent*content']['IGDirectThreadActivityAnnouncement*threadActivity'].get('NSString*voipTitle') - videoChatCallID = plist['IGDirectPublishedMessageContent*content']['IGDirectThreadActivityAnnouncement*threadActivity'].get('NSString*videoCallId') - - - #Reactions - reactions = (plist['NSArray*reactions']) + plist = nd.deserialize_plist(plist_file_object) + except ( + nd.DeserializeError, + nd.biplist.NotBinaryPlistException, + nd.biplist.InvalidPlistException, + nd.plistlib.InvalidFileException, + nd.ccl_bplist.BplistError, + ValueError, + TypeError, + OSError, + OverflowError, + ) as ex: + logfunc(f"Failed to read plist for {row[2]}, error was:" + str(ex)) + + # Messages + senderpk = plist["IGDirectPublishedMessageMetadata*metadata"]["NSString*senderPk"] + serverTimestamp = plist["IGDirectPublishedMessageMetadata*metadata"]["NSDate*serverTimestamp"] + message = plist["IGDirectPublishedMessageContent*content"].get("NSString*string") + + # VOIP calls + if (plist["IGDirectPublishedMessageContent*content"].get("IGDirectThreadActivityAnnouncement*threadActivity") is not None): + videoChatTitle = plist["IGDirectPublishedMessageContent*content"]["IGDirectThreadActivityAnnouncement*threadActivity"].get("NSString*voipTitle") + videoChatCallID = plist["IGDirectPublishedMessageContent*content"]["IGDirectThreadActivityAnnouncement*threadActivity"].get("NSString*videoCallId") + + # Reactions + reactions = plist["NSArray*reactions"] if reactions: - dmreaction = reactions[0].get('emojiUnicode') - reactionServerTimestamp = reactions[0].get('serverTimestamp') - reactionUserID = reactions[0].get('userId') - - #Shared media - if (plist['IGDirectPublishedMessageContent*content'].get('IGDirectPublishedMessageMedia*media')): + dmreaction = reactions[0].get("emojiUnicode") + reactionServerTimestamp = reactions[0].get("serverTimestamp") + reactionUserID = reactions[0].get("userId") + + # Shared media + if plist["IGDirectPublishedMessageContent*content"].get("IGDirectPublishedMessageMedia*media"): try: - sharedMediaID = plist['IGDirectPublishedMessageContent*content']['IGDirectPublishedMessageMedia*media']['IGDirectPublishedMessagePermanentMedia*permanentMedia']['IGPhoto*photo']['kIGPhotoMediaID'] + sharedMediaID = plist["IGDirectPublishedMessageContent*content"]["IGDirectPublishedMessageMedia*media"]["IGDirectPublishedMessagePermanentMedia*permanentMedia"]["IGPhoto*photo"]["kIGPhotoMediaID"] except (KeyError, ValueError, TypeError, OSError, OverflowError) as ex: - print('Had exception: ' + str(ex)) + print("Had exception: " + str(ex)) sharedMediaID = None - + + try: + sharedMediaURL = plist["IGDirectPublishedMessageContent*content"]["IGDirectPublishedMessageMedia*media"]["IGDirectPublishedMessagePermanentMedia*permanentMedia"]["IGPhoto*photo"]["imageVersions"][0]["url"]["NS.relative"] + except (KeyError, ValueError, TypeError, OSError, OverflowError) as ex: + print("Had exception: " + str(ex)) + sharedMediaURL = None + try: - sharedMediaURL = plist['IGDirectPublishedMessageContent*content']['IGDirectPublishedMessageMedia*media']['IGDirectPublishedMessagePermanentMedia*permanentMedia']['IGPhoto*photo']['imageVersions'][0]['url']['NS.relative'] + shared_media_url_expiration_date = plist["IGDirectPublishedMessageContent*content"]["IGDirectPublishedMessageMedia*media"]["IGDirectPublishedMessagePermanentMedia*permanentMedia"]["IGPhoto*photo"]["imageVersions"][0]["expiration_date"] except (KeyError, ValueError, TypeError, OSError, OverflowError) as ex: - print('Had exception: ' + str(ex)) - sharedMediaURL= None - + print("Had exception: " + str(ex)) + shared_media_url_expiration_date = None + if senderpk in userdict: user = userdict[senderpk] else: - user = '' - - data_list.append((serverTimestamp, senderpk, user, message, videoChatTitle, videoChatCallID, dmreaction, reactionServerTimestamp, reactionUserID, sharedMediaID, sharedMediaURL)) + user = "" + + data_list.append( + ( + serverTimestamp, + senderpk, + user, + message, + videoChatTitle, + videoChatCallID, + dmreaction, + reactionServerTimestamp, + reactionUserID, + sharedMediaID, + sharedMediaURL, + shared_media_url_expiration_date, + ) + ) if videoChatTitle: - video_calls.append((serverTimestamp, senderpk, user, videoChatTitle, videoChatCallID)) - - description = 'Instagram Threads' - report = ArtifactHtmlReport('Instagram Threads') - report.start_artifact_report(report_folder, 'Instagram Threads', description) - report.add_script() - data_headers = ('Timestamp', 'Sender ID', 'Username', 'Message', 'Video Chat Title', 'Video Chat ID', 'DM Reaction', 'DM Reaction Server Timestamp', 'Reaction User ID', 'Shared Media ID', 'Shared Media URL') - report.write_artifact_data_table(data_headers, data_list, file_found) - report.end_artifact_report() - - tsvname = 'Instagram Threads' - tsv(report_folder, data_headers, data_list, tsvname) - - tlactivity = 'Instagram Threads' - timeline(report_folder, tlactivity, data_list, data_headers) - - else: - logfunc('No Instagram Threads data available') - + video_calls.append( + (serverTimestamp, senderpk, user, videoChatTitle, videoChatCallID) + ) + + data_headers = ( + "Timestamp (UTC)", + "Sender ID", + "Username", + "Message", + "Video Chat Title", + "Video Chat ID", + "DM Reaction", + "DM Reaction Server Timestamp", + "Reaction User ID", + "Shared Media ID", + "Shared Media URL", + "Shared Media URL Expiration Date" + ) + return data_headers, data_list, file_found + if len(video_calls) > 0: - description = 'Instagram Threads Calls' - report = ArtifactHtmlReport('Instagram Threads Calls') - report.start_artifact_report(report_folder, 'Instagram Threads Calls', description) - report.add_script() - data_headersv = ('Timestamp', 'Sender ID', 'Username', 'Video Chat Title', 'Video Chat ID') - report.write_artifact_data_table(data_headersv, video_calls, file_found) - report.end_artifact_report() - - tsvname = 'Instagram Threads Calls' - tsv(report_folder, data_headersv, video_calls, tsvname) - - tlactivity = 'Instagram Threads Calls' - timeline(report_folder, tlactivity, video_calls, data_headersv) - - else: - logfunc('No Instagram Threads Video Calls data available') - + data_headersv = ( + "Timestamp", + "Sender ID", + "Username", + "Video Chat Title", + "Video Chat ID", + ) + return data_headersv, video_calls, file_found + db.close() - - -__artifacts__ = { - "instagramThreads": ( - "Instagram", - ('*/mobile/Containers/Data/Application/*/Library/Application Support/DirectSQLiteDatabase/*.db*'), - get_instagramThreads) -} \ No newline at end of file From ae8d60e8bda1c31ce214e657ad3051d409425ec0 Mon Sep 17 00:00:00 2001 From: Sukochev Date: Fri, 24 Apr 2026 16:18:25 +0930 Subject: [PATCH 2/3] instagram-threads-update --- scripts/artifacts/instagramThreads.py | 384 +++++++++++++------------- 1 file changed, 191 insertions(+), 193 deletions(-) diff --git a/scripts/artifacts/instagramThreads.py b/scripts/artifacts/instagramThreads.py index c53e58912..dfd173587 100644 --- a/scripts/artifacts/instagramThreads.py +++ b/scripts/artifacts/instagramThreads.py @@ -28,205 +28,203 @@ @artifact_processor def get_instagramThreads( - files_found, report_folder, seeker, wrap_text, timezone_offset + files_found, _report_folder, _seeker, _wrap_text, _timezone_offset ): for file_found in files_found: file_found = str(file_found) if file_found.endswith(".db"): - break - - db = open_sqlite_db_readonly(file_found) - cursor = db.cursor() - cursor.execute(""" - select - metadata - from threads - """) - - all_rows = cursor.fetchall() - usageentries = len(all_rows) - fila = 0 - userdict = {} - data_list = [] - video_calls = [] - - if usageentries > 0: - for row in all_rows: - plist = "" - plist_file_object = io.BytesIO(row[0]) - if row[0].find(b"NSKeyedArchiver") == -1: - if sys.version_info >= (3, 9): - plist = plistlib.load(plist_file_object) - else: - plist = biplist.readPlist(plist_file_object) - else: - try: - plist = nd.deserialize_plist(plist_file_object) - except ( - nd.DeserializeError, - nd.biplist.NotBinaryPlistException, - nd.biplist.InvalidPlistException, - nd.plistlib.InvalidFileException, - nd.ccl_bplist.BplistError, - ValueError, - TypeError, - OSError, - OverflowError, - ) as ex: - logfunc(f"Failed to read plist for {row[0]}, error was:" + str(ex)) - - for i in plist["NSArray*users"]: - for x, y in enumerate(plist["NSArray*users"]): - userPk = plist["NSArray*users"][x]["NSDictionary*userDict"]["pk"] - userFull = plist["NSArray*users"][x]["NSDictionary*userDict"]["full_name"] - userdict[userPk] = userFull - - # DEPRECATED? - inviterPk = plist["IGUser*inviter_DEPRECATED"]["NSDictionary*userDict"]["pk"] - inviterFull = plist["IGUser*inviter_DEPRECATED"]["NSDictionary*userDict"]["full_name"] - userdict[inviterPk] = inviterFull - - cursor.execute(""" - select - messages.message_id, - messages.thread_id, - messages.archive, - threads.metadata, - threads.thread_messages_range, - threads.visual_message_info - from messages, threads - where messages.thread_id = threads.thread_id - """) - - all_rows = cursor.fetchall() - usageentries = len(all_rows) - - if usageentries > 0: - for row in all_rows: - plist = "" - senderpk = "" - serverTimestamp = "" - message = "" - videoChatTitle = "" - videoChatCallID = "" - dmreaction = "" - reactionServerTimestamp = "" - reactionUserID = "" - sharedMediaID = "" - sharedMediaURL = "" - shared_media_url_expiration_date = "" - - plist_file_object = io.BytesIO(row[2]) - if row[2].find(b"NSKeyedArchiver") == -1: - if sys.version_info >= (3, 9): - plist = plistlib.load(plist_file_object) - else: - plist = biplist.readPlist(plist_file_object) - else: - try: - plist = nd.deserialize_plist(plist_file_object) - except ( - nd.DeserializeError, - nd.biplist.NotBinaryPlistException, - nd.biplist.InvalidPlistException, - nd.plistlib.InvalidFileException, - nd.ccl_bplist.BplistError, - ValueError, - TypeError, - OSError, - OverflowError, - ) as ex: - logfunc(f"Failed to read plist for {row[2]}, error was:" + str(ex)) - - # Messages - senderpk = plist["IGDirectPublishedMessageMetadata*metadata"]["NSString*senderPk"] - serverTimestamp = plist["IGDirectPublishedMessageMetadata*metadata"]["NSDate*serverTimestamp"] - message = plist["IGDirectPublishedMessageContent*content"].get("NSString*string") - - # VOIP calls - if (plist["IGDirectPublishedMessageContent*content"].get("IGDirectThreadActivityAnnouncement*threadActivity") is not None): - videoChatTitle = plist["IGDirectPublishedMessageContent*content"]["IGDirectThreadActivityAnnouncement*threadActivity"].get("NSString*voipTitle") - videoChatCallID = plist["IGDirectPublishedMessageContent*content"]["IGDirectThreadActivityAnnouncement*threadActivity"].get("NSString*videoCallId") - - # Reactions - reactions = plist["NSArray*reactions"] - if reactions: - dmreaction = reactions[0].get("emojiUnicode") - reactionServerTimestamp = reactions[0].get("serverTimestamp") - reactionUserID = reactions[0].get("userId") - - # Shared media - if plist["IGDirectPublishedMessageContent*content"].get("IGDirectPublishedMessageMedia*media"): - try: - sharedMediaID = plist["IGDirectPublishedMessageContent*content"]["IGDirectPublishedMessageMedia*media"]["IGDirectPublishedMessagePermanentMedia*permanentMedia"]["IGPhoto*photo"]["kIGPhotoMediaID"] - except (KeyError, ValueError, TypeError, OSError, OverflowError) as ex: - print("Had exception: " + str(ex)) - sharedMediaID = None - - try: - sharedMediaURL = plist["IGDirectPublishedMessageContent*content"]["IGDirectPublishedMessageMedia*media"]["IGDirectPublishedMessagePermanentMedia*permanentMedia"]["IGPhoto*photo"]["imageVersions"][0]["url"]["NS.relative"] - except (KeyError, ValueError, TypeError, OSError, OverflowError) as ex: - print("Had exception: " + str(ex)) - sharedMediaURL = None - - try: - shared_media_url_expiration_date = plist["IGDirectPublishedMessageContent*content"]["IGDirectPublishedMessageMedia*media"]["IGDirectPublishedMessagePermanentMedia*permanentMedia"]["IGPhoto*photo"]["imageVersions"][0]["expiration_date"] - except (KeyError, ValueError, TypeError, OSError, OverflowError) as ex: - print("Had exception: " + str(ex)) - shared_media_url_expiration_date = None - - if senderpk in userdict: - user = userdict[senderpk] - else: - user = "" - - data_list.append( - ( - serverTimestamp, - senderpk, - user, - message, - videoChatTitle, - videoChatCallID, - dmreaction, - reactionServerTimestamp, - reactionUserID, - sharedMediaID, - sharedMediaURL, - shared_media_url_expiration_date, + + db = open_sqlite_db_readonly(file_found) + cursor = db.cursor() + cursor.execute(""" + select + metadata + from threads + """) + + all_rows = cursor.fetchall() + usageentries = len(all_rows) + userdict = {} + data_list = [] + video_calls = [] + + if usageentries > 0: + for row in all_rows: + plist = "" + plist_file_object = io.BytesIO(row[0]) + if row[0].find(b"NSKeyedArchiver") == -1: + if sys.version_info >= (3, 9): + plist = plistlib.load(plist_file_object) + else: + plist = biplist.readPlist(plist_file_object) + else: + try: + plist = nd.deserialize_plist(plist_file_object) + except ( + nd.DeserializeError, + nd.biplist.NotBinaryPlistException, + nd.biplist.InvalidPlistException, + nd.plistlib.InvalidFileException, + nd.ccl_bplist.BplistError, + ValueError, + TypeError, + OSError, + OverflowError, + ) as ex: + logfunc(f"Failed to read plist for {row[0]}, error was:" + str(ex)) + + for _i in plist["NSArray*users"]: + for x, _y in enumerate(plist["NSArray*users"]): + userPk = plist["NSArray*users"][x]["NSDictionary*userDict"]["pk"] + userFull = plist["NSArray*users"][x]["NSDictionary*userDict"]["full_name"] + userdict[userPk] = userFull + + # DEPRECATED? + inviterPk = plist["IGUser*inviter_DEPRECATED"]["NSDictionary*userDict"]["pk"] + inviterFull = plist["IGUser*inviter_DEPRECATED"]["NSDictionary*userDict"]["full_name"] + userdict[inviterPk] = inviterFull + + cursor.execute(""" + select + messages.message_id, + messages.thread_id, + messages.archive, + threads.metadata, + threads.thread_messages_range, + threads.visual_message_info + from messages, threads + where messages.thread_id = threads.thread_id + """) + + all_rows = cursor.fetchall() + usageentries = len(all_rows) + + if usageentries > 0: + for row in all_rows: + plist = "" + senderpk = "" + serverTimestamp = "" + message = "" + videoChatTitle = "" + videoChatCallID = "" + dmreaction = "" + reactionServerTimestamp = "" + reactionUserID = "" + sharedMediaID = "" + sharedMediaURL = "" + shared_media_url_expiration_date = "" + + plist_file_object = io.BytesIO(row[2]) + if row[2].find(b"NSKeyedArchiver") == -1: + if sys.version_info >= (3, 9): + plist = plistlib.load(plist_file_object) + else: + plist = biplist.readPlist(plist_file_object) + else: + try: + plist = nd.deserialize_plist(plist_file_object) + except ( + nd.DeserializeError, + nd.biplist.NotBinaryPlistException, + nd.biplist.InvalidPlistException, + nd.plistlib.InvalidFileException, + nd.ccl_bplist.BplistError, + ValueError, + TypeError, + OSError, + OverflowError, + ) as ex: + logfunc(f"Failed to read plist for {row[2]}, error was:" + str(ex)) + + # Messages + senderpk = plist["IGDirectPublishedMessageMetadata*metadata"]["NSString*senderPk"] + serverTimestamp = plist["IGDirectPublishedMessageMetadata*metadata"]["NSDate*serverTimestamp"] + message = plist["IGDirectPublishedMessageContent*content"].get("NSString*string") + + # VOIP calls + if (plist["IGDirectPublishedMessageContent*content"].get("IGDirectThreadActivityAnnouncement*threadActivity") is not None): + videoChatTitle = plist["IGDirectPublishedMessageContent*content"]["IGDirectThreadActivityAnnouncement*threadActivity"].get("NSString*voipTitle") + videoChatCallID = plist["IGDirectPublishedMessageContent*content"]["IGDirectThreadActivityAnnouncement*threadActivity"].get("NSString*videoCallId") + + # Reactions + reactions = plist["NSArray*reactions"] + if reactions: + dmreaction = reactions[0].get("emojiUnicode") + reactionServerTimestamp = reactions[0].get("serverTimestamp") + reactionUserID = reactions[0].get("userId") + + # Shared media + if plist["IGDirectPublishedMessageContent*content"].get("IGDirectPublishedMessageMedia*media"): + try: + sharedMediaID = plist["IGDirectPublishedMessageContent*content"]["IGDirectPublishedMessageMedia*media"]["IGDirectPublishedMessagePermanentMedia*permanentMedia"]["IGPhoto*photo"]["kIGPhotoMediaID"] + except (KeyError, ValueError, TypeError, OSError, OverflowError) as ex: + print("Had exception: " + str(ex)) + sharedMediaID = None + + try: + sharedMediaURL = plist["IGDirectPublishedMessageContent*content"]["IGDirectPublishedMessageMedia*media"]["IGDirectPublishedMessagePermanentMedia*permanentMedia"]["IGPhoto*photo"]["imageVersions"][0]["url"]["NS.relative"] + except (KeyError, ValueError, TypeError, OSError, OverflowError) as ex: + print("Had exception: " + str(ex)) + sharedMediaURL = None + + try: + shared_media_url_expiration_date = plist["IGDirectPublishedMessageContent*content"]["IGDirectPublishedMessageMedia*media"]["IGDirectPublishedMessagePermanentMedia*permanentMedia"]["IGPhoto*photo"]["imageVersions"][0]["expiration_date"] + except (KeyError, ValueError, TypeError, OSError, OverflowError) as ex: + print("Had exception: " + str(ex)) + shared_media_url_expiration_date = None + + if senderpk in userdict: + user = userdict[senderpk] + else: + user = "" + + data_list.append( + ( + serverTimestamp, + senderpk, + user, + message, + videoChatTitle, + videoChatCallID, + dmreaction, + reactionServerTimestamp, + reactionUserID, + sharedMediaID, + sharedMediaURL, + shared_media_url_expiration_date, + ) + ) + if videoChatTitle: + video_calls.append( + (serverTimestamp, senderpk, user, videoChatTitle, videoChatCallID) + ) + + data_headers = ( + "Timestamp (UTC)", + "Sender ID", + "Username", + "Message", + "Video Chat Title", + "Video Chat ID", + "DM Reaction", + "DM Reaction Server Timestamp", + "Reaction User ID", + "Shared Media ID", + "Shared Media URL", + "Shared Media URL Expiration Date" ) - ) - if videoChatTitle: - video_calls.append( - (serverTimestamp, senderpk, user, videoChatTitle, videoChatCallID) + return data_headers, data_list, file_found + + if len(video_calls) > 0: + data_headersv = ( + "Timestamp", + "Sender ID", + "Username", + "Video Chat Title", + "Video Chat ID", ) + return data_headersv, video_calls, file_found - data_headers = ( - "Timestamp (UTC)", - "Sender ID", - "Username", - "Message", - "Video Chat Title", - "Video Chat ID", - "DM Reaction", - "DM Reaction Server Timestamp", - "Reaction User ID", - "Shared Media ID", - "Shared Media URL", - "Shared Media URL Expiration Date" - ) - return data_headers, data_list, file_found - - if len(video_calls) > 0: - data_headersv = ( - "Timestamp", - "Sender ID", - "Username", - "Video Chat Title", - "Video Chat ID", - ) - return data_headersv, video_calls, file_found - - db.close() + db.close() From f373a4baa4e7fd74933251a1d8515ebc473444a2 Mon Sep 17 00:00:00 2001 From: Sukochev Date: Wed, 29 Apr 2026 10:25:52 +0930 Subject: [PATCH 3/3] instagram-threads-update --- scripts/artifacts/instagramThreads.py | 204 ++++++++++++++++++++++++-- 1 file changed, 192 insertions(+), 12 deletions(-) diff --git a/scripts/artifacts/instagramThreads.py b/scripts/artifacts/instagramThreads.py index dfd173587..9f2757a54 100644 --- a/scripts/artifacts/instagramThreads.py +++ b/scripts/artifacts/instagramThreads.py @@ -14,6 +14,22 @@ ), "output_types": ["html", "tsv", "timeline"], "artifact_icon": "instagram", + }, + "get_instagram_calls": { + "name": "Instagram Threads Calls", + "description": "Existing calls sent and received in the Instagram App.", + "author": "Alexis Brignoni", + "version": "0.7", + "creation_date": "2021-03-09", + "last_update_date": "2026-04-23", + "requirements": "", + "category": "Instagram", + "notes": "", + "paths": ( + "*/mobile/Containers/Data/Application/*/Library/Application Support/DirectSQLiteDatabase/*.db*" + ), + "output_types": ["html", "tsv", "timeline"], + "artifact_icon": "phone", } } @@ -48,7 +64,6 @@ def get_instagramThreads( usageentries = len(all_rows) userdict = {} data_list = [] - video_calls = [] if usageentries > 0: for row in all_rows: @@ -74,16 +89,39 @@ def get_instagramThreads( OverflowError, ) as ex: logfunc(f"Failed to read plist for {row[0]}, error was:" + str(ex)) - + for _i in plist["NSArray*users"]: for x, _y in enumerate(plist["NSArray*users"]): - userPk = plist["NSArray*users"][x]["NSDictionary*userDict"]["pk"] - userFull = plist["NSArray*users"][x]["NSDictionary*userDict"]["full_name"] + if plist["NSArray*users"][x].get("NSDictionary*userDict", {}).get("pk"): + userPk = plist["NSArray*users"][x]["NSDictionary*userDict"]["pk"] + elif plist["NSArray*users"][x].get("pk"): + userPk = plist["NSArray*users"][x]["pk"] + else: + userPk = None + + if plist["NSArray*users"][x].get("NSDictionary*userDict", {}).get("full_name"): + userFull = plist["NSArray*users"][x]["NSDictionary*userDict"]["full_name"] + elif plist["NSArray*users"][x].get("fullName"): + userFull = (plist["NSArray*users"][x]["fullName"]) + elif plist["NSArray*users"][x].get("userName"): + userFull = (plist["NSArray*users"][x]["userName"]) + else: + userFull = None userdict[userPk] = userFull + + if plist.get("IGUser*inviter_DEPRECATED", {}).get("NSDictionary*userDict", {}).get("pk"): + inviterPk = plist["IGUser*inviter_DEPRECATED"]["NSDictionary*userDict"]["pk"] + elif plist["IGUser*inviter"].get("pk"): + inviterPk = plist["IGUser*inviter"]["pk"] + else: + inviterPk = None - # DEPRECATED? - inviterPk = plist["IGUser*inviter_DEPRECATED"]["NSDictionary*userDict"]["pk"] - inviterFull = plist["IGUser*inviter_DEPRECATED"]["NSDictionary*userDict"]["full_name"] + if plist.get("IGUser*inviter_DEPRECATED", {}).get("NSDictionary*userDict", {}).get("full_name"): + inviterFull = plist["IGUser*inviter_DEPRECATED"]["NSDictionary*userDict"]["full_name"] + elif plist["IGUser*inviter"].get("fullName"): + inviterFull = plist["IGUser*inviter"]["fullName"] + else: + inviterFull = None userdict[inviterPk] = inviterFull cursor.execute(""" @@ -196,10 +234,6 @@ def get_instagramThreads( shared_media_url_expiration_date, ) ) - if videoChatTitle: - video_calls.append( - (serverTimestamp, senderpk, user, videoChatTitle, videoChatCallID) - ) data_headers = ( "Timestamp (UTC)", @@ -217,7 +251,153 @@ def get_instagramThreads( ) return data_headers, data_list, file_found - if len(video_calls) > 0: + db.close() + + +@artifact_processor +def get_instagram_calls( + files_found, _report_folder, _seeker, _wrap_text, _timezone_offset +): + + for file_found in files_found: + file_found = str(file_found) + + if file_found.endswith(".db"): + + db = open_sqlite_db_readonly(file_found) + cursor = db.cursor() + cursor.execute(""" + select + metadata + from threads + """) + + all_rows = cursor.fetchall() + usageentries = len(all_rows) + userdict = {} + video_calls = [] + + if usageentries > 0: + for row in all_rows: + plist = "" + plist_file_object = io.BytesIO(row[0]) + if row[0].find(b"NSKeyedArchiver") == -1: + if sys.version_info >= (3, 9): + plist = plistlib.load(plist_file_object) + else: + plist = biplist.readPlist(plist_file_object) + else: + try: + plist = nd.deserialize_plist(plist_file_object) + except ( + nd.DeserializeError, + nd.biplist.NotBinaryPlistException, + nd.biplist.InvalidPlistException, + nd.plistlib.InvalidFileException, + nd.ccl_bplist.BplistError, + ValueError, + TypeError, + OSError, + OverflowError, + ) as ex: + logfunc(f"Failed to read plist for {row[0]}, error was:" + str(ex)) + + for _i in plist["NSArray*users"]: + for x, _y in enumerate(plist["NSArray*users"]): + if plist["NSArray*users"][x].get("NSDictionary*userDict", {}).get("pk"): + userPk = plist["NSArray*users"][x]["NSDictionary*userDict"]["pk"] + elif plist["NSArray*users"][x].get("pk"): + userPk = plist["NSArray*users"][x]["pk"] + else: + userPk = None + + if plist["NSArray*users"][x].get("NSDictionary*userDict", {}).get("full_name"): + userFull = plist["NSArray*users"][x]["NSDictionary*userDict"]["full_name"] + elif plist["NSArray*users"][x].get("fullName"): + userFull = (plist["NSArray*users"][x]["fullName"]) + elif plist["NSArray*users"][x].get("userName"): + userFull = (plist["NSArray*users"][x]["userName"]) + else: + userFull = None + userdict[userPk] = userFull + + if plist.get("IGUser*inviter_DEPRECATED", {}).get("NSDictionary*userDict", {}).get("pk"): + inviterPk = plist["IGUser*inviter_DEPRECATED"]["NSDictionary*userDict"]["pk"] + elif plist["IGUser*inviter"].get("pk"): + inviterPk = plist["IGUser*inviter"]["pk"] + else: + inviterPk = None + + if plist.get("IGUser*inviter_DEPRECATED", {}).get("NSDictionary*userDict", {}).get("full_name"): + inviterFull = plist["IGUser*inviter_DEPRECATED"]["NSDictionary*userDict"]["full_name"] + elif plist["IGUser*inviter"].get("fullName"): + inviterFull = plist["IGUser*inviter"]["fullName"] + else: + inviterFull = None + userdict[inviterPk] = inviterFull + + cursor.execute(""" + select + messages.message_id, + messages.thread_id, + messages.archive, + threads.metadata, + threads.thread_messages_range, + threads.visual_message_info + from messages, threads + where messages.thread_id = threads.thread_id + """) + + all_rows = cursor.fetchall() + usageentries = len(all_rows) + + if usageentries > 0: + for row in all_rows: + plist = "" + senderpk = "" + serverTimestamp = "" + videoChatTitle = "" + videoChatCallID = "" + + plist_file_object = io.BytesIO(row[2]) + if row[2].find(b"NSKeyedArchiver") == -1: + if sys.version_info >= (3, 9): + plist = plistlib.load(plist_file_object) + else: + plist = biplist.readPlist(plist_file_object) + else: + try: + plist = nd.deserialize_plist(plist_file_object) + except ( + nd.DeserializeError, + nd.biplist.NotBinaryPlistException, + nd.biplist.InvalidPlistException, + nd.plistlib.InvalidFileException, + nd.ccl_bplist.BplistError, + ValueError, + TypeError, + OSError, + OverflowError, + ) as ex: + logfunc(f"Failed to read plist for {row[2]}, error was:" + str(ex)) + + # Messages + senderpk = plist["IGDirectPublishedMessageMetadata*metadata"]["NSString*senderPk"] + serverTimestamp = plist["IGDirectPublishedMessageMetadata*metadata"]["NSDate*serverTimestamp"] + + # VOIP calls + if (plist["IGDirectPublishedMessageContent*content"].get("IGDirectThreadActivityAnnouncement*threadActivity") is not None): + videoChatTitle = plist["IGDirectPublishedMessageContent*content"]["IGDirectThreadActivityAnnouncement*threadActivity"].get("NSString*voipTitle") + videoChatCallID = plist["IGDirectPublishedMessageContent*content"]["IGDirectThreadActivityAnnouncement*threadActivity"].get("NSString*videoCallId") + + if senderpk in userdict: + user = userdict[senderpk] + else: + user = "" + + if videoChatTitle: + video_calls.append((serverTimestamp, senderpk, user, videoChatTitle, videoChatCallID)) + data_headersv = ( "Timestamp", "Sender ID",