Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions source/app/business/iocs.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
from app.iris_engine.utils.tracker import track_activity
from app.models.errors import BusinessProcessingError
from app.models.errors import ObjectNotFoundError
from app.models.authorization import User
from app.models.comments import Comments, IocComments
from app.datamgmt.case.case_iocs_db import get_ioc
from app.util import add_obj_history_entry
from app.datamgmt.case.case_iocs_db import get_filtered_iocs
Expand Down Expand Up @@ -106,8 +108,33 @@ def iocs_delete(ioc: Ioc):

def iocs_exports_to_json(case_id):
iocs = get_iocs(case_id)

return IocSchema().dump(iocs, many=True)
serialized_iocs = IocSchema().dump(iocs, many=True)

for ioc in serialized_iocs:
ioc['comments'] = _ioc_comments_export_to_json(ioc['ioc_id'])

return serialized_iocs


def _ioc_comments_export_to_json(ioc_id):
comments = Comments.query.with_entities(
Comments.comment_id,
Comments.comment_uuid,
Comments.comment_text,
User.name.label('comment_by'),
Comments.comment_date
).filter(
IocComments.comment_ioc_id == ioc_id
).join(
IocComments,
Comments.comment_id == IocComments.comment_id
).join(
Comments.user
).order_by(
Comments.comment_date.asc()
).all()

return [row._asdict() for row in comments]


def iocs_build_filter_query(ioc_id: int = None,
Expand Down
126 changes: 118 additions & 8 deletions source/app/datamgmt/reporter/report_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from sqlalchemy import desc

from app.datamgmt.case.case_notes_db import get_notes_from_group
from app.datamgmt.case.case_notes_db import get_case_note_comments
from app.models.assets import CompromiseStatus, AssetsType, CaseAssets, AnalysisStatus
from app.models.models import TaskAssignee
from app.models.models import CaseEventsAssets
Expand All @@ -30,7 +29,7 @@
from app.models.models import CaseTasks
from app.models.cases import Cases
from app.models.cases import CasesEvent
from app.models.comments import Comments
from app.models.comments import Comments, TaskComments, AssetComments, EventComments, NotesComments, EvidencesComments
from app.models.models import EventCategory
from app.models.iocs import Ioc
from app.models.models import IocAssetLink
Expand All @@ -41,7 +40,6 @@
from app.models.iocs import Tlp
from app.models.authorization import User
from app.schema.marshables import CaseDetailsSchema
from app.schema.marshables import CommentSchema
from app.schema.marshables import CaseNoteSchema


Expand Down Expand Up @@ -173,35 +171,80 @@ def export_case_evidences_json(case_id):
).all()

if evidences:
serialized_evidences = []
for row in evidences:
serialized_evidence = row._asdict()
serialized_evidence['comments'] = export_case_evidence_comments_json(serialized_evidence['id'])
serialized_evidences.append(serialized_evidence)

return [row._asdict() for row in evidences]
return serialized_evidences

return []


def export_case_evidence_comments_json(evidence_id):
comments = Comments.query.with_entities(
Comments.comment_id,
Comments.comment_uuid,
Comments.comment_text,
User.name.label('comment_by'),
Comments.comment_date
).filter(
EvidencesComments.comment_evidence_id == evidence_id
).join(
EvidencesComments,
Comments.comment_id == EvidencesComments.comment_id
).join(
Comments.user
).order_by(
Comments.comment_date.asc()
).all()

return [row._asdict() for row in comments]


def export_case_notes_json(case_id):
# Fetch all notes associated with the case
notes = Notes.query.filter(
Notes.note_case_id == case_id
).all()

# Initialize the schemas
# Initialize the schema
note_schema = CaseNoteSchema()
comments_schema = CommentSchema(many=True)

# Serialize the notes and their comments
serialized_notes = []
for note in notes:
note_comments = get_case_note_comments(note.note_id)
serialized_note = note_schema.dump(note)
serialized_note['comments'] = comments_schema.dump(note_comments)
serialized_note['comments'] = export_case_note_comments_json(note.note_id)
serialized_note['note_content'] = process_md_images_links_for_report(serialized_note['note_content'])

serialized_notes.append(serialized_note)

return serialized_notes


def export_case_note_comments_json(note_id):
comments = Comments.query.with_entities(
Comments.comment_id,
Comments.comment_uuid,
Comments.comment_text,
User.name.label('comment_by'),
Comments.comment_date
).filter(
NotesComments.comment_note_id == note_id
).join(
NotesComments,
Comments.comment_id == NotesComments.comment_id
).join(
Comments.user
).order_by(
Comments.comment_date.asc()
).all()

return [row._asdict() for row in comments]


def export_case_tm_json(case_id):
timeline = CasesEvent.query.with_entities(
CasesEvent.event_id,
Expand Down Expand Up @@ -272,6 +315,7 @@ def export_case_tm_json(case_id):
).all()

ras['iocs'] = [ioc._asdict() for ioc in iocs_list]
ras['comments'] = export_case_event_comments_json(row.event_id)

tim.append(ras)

Expand Down Expand Up @@ -328,11 +372,54 @@ def export_case_tasks_json(case_id):
'id': member.id
})
task['task_assignees'] = assignee_list.get(task['id'], [])
task['comments'] = export_case_task_comments_json(task_id)
task_with_assignees.append(task)

return task_with_assignees


def export_case_task_comments_json(task_id):
comments = Comments.query.with_entities(
Comments.comment_id,
Comments.comment_uuid,
Comments.comment_text,
User.name.label('comment_by'),
Comments.comment_date
).filter(
TaskComments.comment_task_id == task_id
).join(
TaskComments,
Comments.comment_id == TaskComments.comment_id
).join(
Comments.user
).order_by(
Comments.comment_date.asc()
).all()

return [row._asdict() for row in comments]


def export_case_event_comments_json(event_id):
comments = Comments.query.with_entities(
Comments.comment_id,
Comments.comment_uuid,
Comments.comment_text,
User.name.label('comment_by'),
Comments.comment_date
).filter(
EventComments.comment_event_id == event_id
).join(
EventComments,
Comments.comment_id == EventComments.comment_id
).join(
Comments.user
).order_by(
Comments.comment_date.asc()
).all()

return [row._asdict() for row in comments]


def export_case_assets_json(case_id):
ret = []

Expand Down Expand Up @@ -379,6 +466,8 @@ def export_case_assets_json(case_id):
else:
row['asset_ioc'] = []

row['comments'] = export_case_asset_comments_json(row['asset_id'])

if row['asset_compromise_status_id'] is None:
row['asset_compromise_status_id'] = CompromiseStatus.unknown.value
status_text = CompromiseStatus.unknown.name.replace('_', ' ').title()
Expand All @@ -392,6 +481,27 @@ def export_case_assets_json(case_id):
return ret


def export_case_asset_comments_json(asset_id):
comments = Comments.query.with_entities(
Comments.comment_id,
Comments.comment_uuid,
Comments.comment_text,
User.name.label('comment_by'),
Comments.comment_date
).filter(
AssetComments.comment_asset_id == asset_id
).join(
AssetComments,
Comments.comment_id == AssetComments.comment_id
).join(
Comments.user
).order_by(
Comments.comment_date.asc()
).all()

return [row._asdict() for row in comments]


def export_case_comments_json(case_id):
comments = Comments.query.with_entities(
Comments.comment_id,
Expand Down
5 changes: 5 additions & 0 deletions tests/data/report_templates/variable_asset_comments.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{% for asset in assets %}
{% for comment in asset.comments %}
{{ comment.comment_text }}
{% endfor %}
{% endfor %}
5 changes: 5 additions & 0 deletions tests/data/report_templates/variable_event_comments.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{% for event in timeline %}
{% for comment in event.comments %}
{{ comment.comment_text }}
{% endfor %}
{% endfor %}
5 changes: 5 additions & 0 deletions tests/data/report_templates/variable_evidence_comments.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{% for evidence in evidences %}
{% for comment in evidence.comments %}
{{ comment.comment_text }}
{% endfor %}
{% endfor %}
5 changes: 5 additions & 0 deletions tests/data/report_templates/variable_ioc_comments.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{% for ioc in iocs %}
{% for comment in ioc.comments %}
{{ comment.comment_text }}
{% endfor %}
{% endfor %}
5 changes: 5 additions & 0 deletions tests/data/report_templates/variable_note_comments.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{% for note in notes %}
{% for comment in note.comments %}
{{ comment.comment_text }}
{% endfor %}
{% endfor %}
5 changes: 5 additions & 0 deletions tests/data/report_templates/variable_task_comments.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{% for task in tasks %}
{% for comment in task.comments %}
{{ comment.comment_text }}
{% endfor %}
{% endfor %}
102 changes: 102 additions & 0 deletions tests/tests_rest_reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,105 @@ def test_generate_md_activities_report_should_render_variable_case_for_customer_
response = self._subject.get(f'/case/report/generate-activities/{report_identifier}',
{'cid': case_identifier, 'safe': True})
self.assertEqual('IrisInitialClient (legacy::use client.customer_name)', response.text)

def test_generate_md_report_should_render_task_comments(self):
data = {'report_name': 'name', 'report_type': 1, 'report_language': 1, 'report_description': 'description',
'report_name_format': 'report_name_format'}
report_identifier = self._subject.create_report(data, 'variable_task_comments.md')
case_identifier = self._subject.create_dummy_case()

task_data = {'task_assignees_id': [], 'task_description': '', 'task_status_id': 1, 'task_tags': '',
'task_title': 'dummy title', 'custom_attributes': {}}
task_response = self._subject.create(f'/api/v2/cases/{case_identifier}/tasks', task_data).json()
task_identifier = task_response['id']
comment_text = 'task comment for report export'
self._subject.create(f'/api/v2/tasks/{task_identifier}/comments', {'comment_text': comment_text})

response = self._subject.get(f'/case/report/generate-investigation/{report_identifier}',
{'cid': case_identifier, 'safe': True})
self.assertIn(comment_text, response.text)

def test_generate_md_report_should_render_asset_comments(self):
data = {'report_name': 'name', 'report_type': 1, 'report_language': 1, 'report_description': 'description',
'report_name_format': 'report_name_format'}
report_identifier = self._subject.create_report(data, 'variable_asset_comments.md')
case_identifier = self._subject.create_dummy_case()

asset_data = {'asset_type_id': 1, 'asset_name': 'asset with comments'}
asset_response = self._subject.create(f'/api/v2/cases/{case_identifier}/assets', asset_data).json()
asset_identifier = asset_response['asset_id']
comment_text = 'asset comment for report export'
self._subject.create(f'/api/v2/assets/{asset_identifier}/comments', {'comment_text': comment_text})

response = self._subject.get(f'/case/report/generate-investigation/{report_identifier}',
{'cid': case_identifier, 'safe': True})
self.assertIn(comment_text, response.text)

def test_generate_md_report_should_render_ioc_comments(self):
data = {'report_name': 'name', 'report_type': 1, 'report_language': 1, 'report_description': 'description',
'report_name_format': 'report_name_format'}
report_identifier = self._subject.create_report(data, 'variable_ioc_comments.md')
case_identifier = self._subject.create_dummy_case()

ioc_data = {'ioc_type_id': 1, 'ioc_tlp_id': 2, 'ioc_value': '8.8.8.8', 'ioc_description': '', 'ioc_tags': ''}
ioc_response = self._subject.create(f'/api/v2/cases/{case_identifier}/iocs', ioc_data).json()
ioc_identifier = ioc_response['ioc_id']
comment_text = 'ioc comment for report export'
self._subject.create(f'/api/v2/iocs/{ioc_identifier}/comments', {'comment_text': comment_text})

response = self._subject.get(f'/case/report/generate-investigation/{report_identifier}',
{'cid': case_identifier, 'safe': True})
self.assertIn(comment_text, response.text)

def test_generate_md_report_should_render_event_comments(self):
data = {'report_name': 'name', 'report_type': 1, 'report_language': 1, 'report_description': 'description',
'report_name_format': 'report_name_format'}
report_identifier = self._subject.create_report(data, 'variable_event_comments.md')
case_identifier = self._subject.create_dummy_case()

event_data = {'event_title': 'title', 'event_category_id': 1,
'event_date': '2025-03-26T00:00:00.000', 'event_tz': '+00:00',
'event_assets': [], 'event_iocs': []}
event_response = self._subject.create(f'/api/v2/cases/{case_identifier}/events', event_data).json()
event_identifier = event_response['event_id']
comment_text = 'event comment for report export'
self._subject.create(f'/api/v2/events/{event_identifier}/comments', {'comment_text': comment_text})

response = self._subject.get(f'/case/report/generate-investigation/{report_identifier}',
{'cid': case_identifier, 'safe': True})
self.assertIn(comment_text, response.text)

def test_generate_md_report_should_render_note_comments(self):
data = {'report_name': 'name', 'report_type': 1, 'report_language': 1, 'report_description': 'description',
'report_name_format': 'report_name_format'}
report_identifier = self._subject.create_report(data, 'variable_note_comments.md')
case_identifier = self._subject.create_dummy_case()

directory_response = self._subject.create(f'/api/v2/cases/{case_identifier}/notes-directories',
{'name': 'directory_name'}).json()
directory_identifier = directory_response['id']
note_response = self._subject.create(f'/api/v2/cases/{case_identifier}/notes',
{'directory_id': directory_identifier}).json()
note_identifier = note_response['note_id']
comment_text = 'note comment for report export'
self._subject.create(f'/api/v2/notes/{note_identifier}/comments', {'comment_text': comment_text})

response = self._subject.get(f'/case/report/generate-investigation/{report_identifier}',
{'cid': case_identifier, 'safe': True})
self.assertIn(comment_text, response.text)

def test_generate_md_report_should_render_evidence_comments(self):
data = {'report_name': 'name', 'report_type': 1, 'report_language': 1, 'report_description': 'description',
'report_name_format': 'report_name_format'}
report_identifier = self._subject.create_report(data, 'variable_evidence_comments.md')
case_identifier = self._subject.create_dummy_case()

evidence_response = self._subject.create(f'/api/v2/cases/{case_identifier}/evidences',
{'filename': 'filename'}).json()
evidence_identifier = evidence_response['id']
comment_text = 'evidence comment for report export'
self._subject.create(f'/api/v2/evidences/{evidence_identifier}/comments', {'comment_text': comment_text})

response = self._subject.get(f'/case/report/generate-investigation/{report_identifier}',
{'cid': case_identifier, 'safe': True})
self.assertIn(comment_text, response.text)