diff --git a/src/imio/esign/events.py b/src/imio/esign/events.py
index 1627868..e7f7eed 100644
--- a/src/imio/esign/events.py
+++ b/src/imio/esign/events.py
@@ -39,8 +39,7 @@ def on_categorized_annex_updated(annex, event):
# check scan_id and filename
if update is False:
for file_info in file_infos:
- if file_info and (annex.scan_id != file_info['scan_id'] or \
- annex.file.filename != file_info['filename']):
+ if file_info and (annex.scan_id != file_info['scan_id'] or annex.file.filename != file_info['filename']):
update = True
break
diff --git a/src/imio/esign/testing.py b/src/imio/esign/testing.py
index 1764616..344131b 100644
--- a/src/imio/esign/testing.py
+++ b/src/imio/esign/testing.py
@@ -1,15 +1,24 @@
# -*- coding: utf-8 -*-
+from collective.iconifiedcategory.utils import calculate_category_id
from imio.fpaudit import utils as _fpaudit_utils
+from plone import api
from plone.app.robotframework.testing import REMOTE_LIBRARY_BUNDLE_FIXTURE
from plone.app.testing import applyProfile
from plone.app.testing import FunctionalTesting
from plone.app.testing import IntegrationTesting
+from plone.app.testing import login
from plone.app.testing import PLONE_FIXTURE
from plone.app.testing import PloneSandboxLayer
+from plone.app.testing import setRoles
+from plone.app.testing import TEST_USER_ID
+from plone.app.testing import TEST_USER_NAME
+from plone.namedfile.file import NamedBlobFile
+from plone.namedfile.file import NamedBlobImage
from plone.testing import z2
from zope.globalrequest import setLocal
import imio.esign # noqa: F401
+import os
logged_actions = []
@@ -27,9 +36,6 @@ class ImioEsignLayer(PloneSandboxLayer):
defaultBases = (PLONE_FIXTURE,)
def setUpZope(self, app, configurationContext):
- # Load any other ZCML that is required for your tests.
- # The z3c.autoinclude feature is disabled in the Plone fixture base
- # layer.
import plone.app.dexterity
self.loadZCML(package=plone.app.dexterity)
@@ -45,23 +51,92 @@ def setUpPloneSite(self, portal):
setLocal("request", portal.REQUEST)
applyProfile(portal, "imio.annex:default")
applyProfile(portal, "imio.esign:default")
+ setRoles(portal, TEST_USER_ID, ["Manager"])
+ login(portal, TEST_USER_NAME)
+ self._setup_content_categories(portal)
+ self._setup_shared_content(portal)
+
+ def _setup_content_categories(self, portal):
+ """
+ Creates: portal/annexes_types/annexes/to_sign (ContentCategory, to_sign=True)
+ """
+ at_folder = api.content.create(
+ container=portal,
+ id="annexes_types",
+ title="Annexes Types",
+ type="ContentCategoryConfiguration",
+ exclude_from_nav=True,
+ )
+ category_group = api.content.create(
+ type="ContentCategoryGroup",
+ title="Annexes",
+ container=at_folder,
+ id="annexes",
+ )
+ icon_path = os.path.join(os.path.dirname(__file__), "tests", u"icône1.png")
+ with open(icon_path, "rb") as fl:
+ api.content.create(
+ type="ContentCategory",
+ title="To sign",
+ container=category_group,
+ icon=NamedBlobImage(fl.read(), filename=u"icône1.png"),
+ id="to_sign",
+ predefined_title="To be signed",
+ to_sign=True,
+ show_preview=False,
+ )
+
+ def _setup_shared_content(self, portal):
+ """Pre-create folders and annexes shared across multiple test classes.
+
+ Creates:
+ portal/folder0/annex0,2,4,6,8,10 (annex1.pdf)
+ portal/folder1/annex1,3,5,7,9,11 (annex2.pdf)
+ """
+ pdf_files = ["annex1.pdf", "annex2.pdf"]
+ for f in range(2):
+ api.content.create(container=portal, type="Folder", id="folder{}".format(f), title="Folder {}".format(f))
+ for i in range(12):
+ self._create_annex(
+ portal,
+ portal["folder{}".format(i % 2)],
+ "annex{}".format(i),
+ "Annex {}".format(i),
+ "0123456000000{:02d}".format(i),
+ pdf_filename=pdf_files[i % 2],
+ )
+
+ def _create_annex(self, portal, container, annex_id, title, scan_id, pdf_filename="annex1.pdf"):
+ tests_dir = os.path.join(os.path.dirname(__file__), "tests")
+ category_id = calculate_category_id(portal["annexes_types"]["annexes"]["to_sign"])
+ with open(os.path.join(tests_dir, pdf_filename), "rb") as f:
+ return api.content.create(
+ container=container,
+ type="annex",
+ id=annex_id,
+ title=title,
+ content_category=category_id,
+ scan_id=scan_id,
+ file=NamedBlobFile(
+ data=f.read(),
+ filename=u"{}.pdf".format(annex_id),
+ contentType="application/pdf",
+ ),
+ )
IMIO_ESIGN_FIXTURE = ImioEsignLayer()
-
IMIO_ESIGN_INTEGRATION_TESTING = IntegrationTesting(
bases=(IMIO_ESIGN_FIXTURE,),
name="ImioEsignLayer:IntegrationTesting",
)
-
IMIO_ESIGN_FUNCTIONAL_TESTING = FunctionalTesting(
bases=(IMIO_ESIGN_FIXTURE,),
name="ImioEsignLayer:FunctionalTesting",
)
-
IMIO_ESIGN_ACCEPTANCE_TESTING = FunctionalTesting(
bases=(
IMIO_ESIGN_FIXTURE,
diff --git a/src/imio/esign/tests/base.py b/src/imio/esign/tests/base.py
new file mode 100644
index 0000000..f634313
--- /dev/null
+++ b/src/imio/esign/tests/base.py
@@ -0,0 +1,26 @@
+# -*- coding: utf-8 -*-
+"""Shared base test class for imio.esign tests."""
+from imio.esign.testing import IMIO_ESIGN_INTEGRATION_TESTING
+from plone.app.testing import login
+from plone.app.testing import setRoles
+from plone.app.testing import TEST_USER_ID
+from plone.app.testing import TEST_USER_NAME
+
+import os
+import unittest
+
+
+TESTS_DIR = os.path.dirname(__file__)
+
+
+class BaseEsignTest(unittest.TestCase):
+ """Base class: shared layer, minimal setUp, and optionnal helpers."""
+
+ layer = IMIO_ESIGN_INTEGRATION_TESTING
+
+ def setUp(self):
+ self.portal = self.layer["portal"]
+ self.request = self.layer["request"]
+ self.request.form.clear()
+ setRoles(self.portal, TEST_USER_ID, ["Manager"])
+ login(self.portal, TEST_USER_NAME)
diff --git "a/src/imio/esign/tests/ic\303\264ne1.png" "b/src/imio/esign/tests/ic\303\264ne1.png"
new file mode 100644
index 0000000..a804777
Binary files /dev/null and "b/src/imio/esign/tests/ic\303\264ne1.png" differ
diff --git a/src/imio/esign/tests/test_actions.py b/src/imio/esign/tests/test_actions.py
index 65b097e..71fb416 100644
--- a/src/imio/esign/tests/test_actions.py
+++ b/src/imio/esign/tests/test_actions.py
@@ -1,29 +1,23 @@
# -*- coding: utf-8 -*-
"""actions tests for this package."""
from AccessControl import Unauthorized
-from collective.iconifiedcategory.utils import calculate_category_id
+from imio.esign.browser.actions import RemoveFromSessionView
+from imio.esign.browser.actions import RemoveItemFromSessionView
from imio.esign.browser.actions import SessionAnnotationInfoView
-from imio.esign.testing import IMIO_ESIGN_INTEGRATION_TESTING
+from imio.esign.tests.base import BaseEsignTest
from imio.esign.utils import add_files_to_session
from imio.esign.utils import get_session_annotation
from plone import api
from plone.app.testing import login
from plone.app.testing import setRoles
from plone.app.testing import TEST_USER_ID
-from plone.namedfile.file import NamedBlobFile
-from plone.namedfile.file import NamedBlobImage
-from Products.statusmessages.interfaces import IStatusMessage
-from zope.component import getMultiAdapter
-
-import collective.iconifiedcategory
-import os
-import unittest
try:
from html import unescape
except ImportError: # Python 2
from HTMLParser import HTMLParser
+
unescape = HTMLParser().unescape
try:
@@ -32,91 +26,35 @@
string_types = (str,)
-class BaseRemoveFromSession(unittest.TestCase):
- """Base class to centralize setUp."""
-
- layer = IMIO_ESIGN_INTEGRATION_TESTING
-
- def _setup_categories(self):
- at_folder = api.content.create(
- container=self.portal,
- id="annexes_types",
- title="Annexes Types",
- type="ContentCategoryConfiguration",
- exclude_from_nav=True,
- )
- category_group = api.content.create(
- type="ContentCategoryGroup",
- title="Annexes",
- container=at_folder,
- id="annexes",
- )
- icon_path = os.path.join(os.path.dirname(collective.iconifiedcategory.__file__), "tests", "icône1.png")
- with open(icon_path, "rb") as fl:
- api.content.create(
- type="ContentCategory",
- title="To sign",
- container=category_group,
- icon=NamedBlobImage(fl.read(), filename=u"icône1.png"),
- id="to_sign",
- predefined_title="To be signed",
- to_sign=True,
- show_preview=False,
- )
+class TestRemoveItemFromSessionView(BaseEsignTest):
+ """Tests for RemoveItemFromSessionView browser view."""
def setUp(self):
- self.portal = self.layer["portal"]
- self.request = self.portal.REQUEST
- setRoles(self.portal, TEST_USER_ID, ["Manager"])
- self._setup_categories()
- # add users and annexes
- api.user.create(email="user1@sign.com", username="user1", password="password1")
- self.folder = api.content.create(
- container=self.portal, type="Folder", id="test_folder", title="Test Folder"
- )
- tests_dir = os.path.dirname(__file__)
- self.annexes = []
- for i in range(3):
- with open(os.path.join(tests_dir, "annex1.pdf"), "rb") as f:
- annex = api.content.create(
- container=self.folder,
- type="annex",
- id="annex{}".format(i),
- title="Annex {}".format(i),
- content_category=calculate_category_id(self.portal["annexes_types"]["annexes"]["to_sign"]),
- scan_id="0123456000000{:02d}".format(i),
- file=NamedBlobFile(data=f.read(), filename=u"annex{}.pdf".format(i), contentType="application/pdf"),
- )
- self.annexes.append(annex)
+ super(TestRemoveItemFromSessionView, self).setUp()
+ api.user.create(email="user1@sign.com", username="user1", password="password1") # noqa: S106
+ self.folder = self.portal["folder0"]
+ self.annexes = [self.portal["folder0"]["annex{}".format(i)] for i in (0, 2, 4)]
self.signers = [("user1", "user1@sign.com", "User 1", "Position 1")]
-
-
-class TestRemoveItemFromSessionView(BaseRemoveFromSession):
- """Test RemoveItemFromSessionView browser view."""
+ self.view = RemoveItemFromSessionView(self.annexes[0], self.request)
def test_available(self):
- """Test available method returns True."""
- view = getMultiAdapter((self.annexes[0], self.request), name="remove-item-from-esign-session")
- self.assertFalse(view.available())
- uids = [a.UID() for a in self.annexes]
- add_files_to_session(self.signers, uids)
- self.assertTrue(view.available())
+ """Returns False when annex not in session; True once added."""
+ self.assertFalse(self.view.available())
+ add_files_to_session(self.signers, [a.UID() for a in self.annexes])
+ self.assertTrue(self.view.available())
- def test_index_removes_file_from_session(self):
- """Test index() removes the file from the esign session."""
+ def test_index(self):
+ """Removes file from session; removes entire session when last file is removed."""
annot = get_session_annotation()
- # Add files to a session
uids = [a.UID() for a in self.annexes]
+
+ # --- remove one file from a multi-file session ---
add_files_to_session(self.signers, uids)
self.assertEqual(len(annot["sessions"]), 1)
self.assertEqual(len(annot["uids"]), 3)
session = annot["sessions"][0]
self.assertEqual(len(session["files"]), 3)
-
- # Remove one file via the view
- view = getMultiAdapter((self.annexes[0], self.request), name="remove-item-from-esign-session")
- view.index()
- # The file should be removed from the session
+ self.view.index()
self.assertNotIn(uids[0], annot["uids"])
self.assertEqual(len(annot["uids"]), 2)
self.assertEqual(len(session["files"]), 2)
@@ -125,85 +63,67 @@ def test_index_removes_file_from_session(self):
self.assertIn(uids[1], remaining_uids)
self.assertIn(uids[2], remaining_uids)
- def test_index_removes_last_file_removes_session(self):
- """Test removing the last file from a session also removes the session."""
- annot = get_session_annotation()
- uids = [self.annexes[0].UID()]
- add_files_to_session(self.signers, uids)
- self.assertEqual(len(annot["sessions"]), 1)
-
- view = getMultiAdapter((self.annexes[0], self.request), name="remove-item-from-esign-session")
- view.index()
- self.assertEqual(len(annot["sessions"]), 0)
- self.assertEqual(len(annot["uids"]), 0)
+ # --- removing the last file removes the session entirely ---
+ add_files_to_session(self.signers, [uids[0]], discriminators=("single",))
+ session_count = len(annot["sessions"])
+ self.view.index()
+ self.assertEqual(len(annot["sessions"]), session_count - 1)
+ self.assertNotIn(uids[0], annot["uids"])
- def test_finished_shows_message_and_redirects(self):
- """Test _finished sets a status message and redirects."""
- self.request.environ['HTTP_REFERER'] = self.annexes[0].absolute_url()
- view = getMultiAdapter((self.annexes[0], self.request), name="remove-item-from-esign-session")
- view._finished()
- messages = IStatusMessage(self.request).show()
- self.assertEqual(len(messages), 1)
- self.assertIn("removed from session", messages[0].message)
+ def test_finished(self):
+ """Redirects to referrer."""
+ self.request.environ["HTTP_REFERER"] = self.annexes[0].absolute_url()
+ self.view._finished()
self.assertEqual(self.request.RESPONSE.getHeader("location"), self.annexes[0].absolute_url())
-class TestRemoveFromSessionView(BaseRemoveFromSession):
- """Test RemoveFromSessionView browser view."""
+class TestRemoveFromSessionView(BaseEsignTest):
+ """Tests for RemoveFromSessionView browser view."""
+
+ def setUp(self):
+ super(TestRemoveFromSessionView, self).setUp()
+ api.user.create(email="user1@sign.com", username="user1", password="password1") # noqa: S106
+ self.folder = self.portal["folder0"]
+ self.annexes = [self.portal["folder0"]["annex{}".format(i)] for i in (0, 2, 4)]
+ self.signers = [("user1", "user1@sign.com", "User 1", "Position 1")]
def test_available(self):
- """Test available method returns True."""
+ """False on the annex itself; True on parent folder once session exists; False again after removal."""
annot = get_session_annotation()
self.assertFalse(annot["sessions"])
- # only available on "context_uid"
annex = self.annexes[0]
- view = getMultiAdapter((annex, self.request), name="remove-from-esign-session")
+ # View on the annex itself is never available (must be on parent/context)
+ view = RemoveFromSessionView(self.annexes[0], self.request)
self.assertFalse(view.available())
add_files_to_session(self.signers, [annex.UID()])
self.assertTrue(annot["sessions"])
self.assertFalse(view.available())
- # available on parent
- folder = annex.aq_parent
- view = getMultiAdapter((folder, self.request), name="remove-from-esign-session")
- self.assertTrue(view.available())
- # call the view so context is removed so no more session
- view()
+ # Available on parent container once a session exists
+ folder_view = RemoveFromSessionView(annex.aq_parent, self.request)
+ self.assertTrue(folder_view.available())
+ # Calling the view clears the session
+ folder_view.index()
self.assertFalse(annot["sessions"])
- self.assertFalse(view.available())
+ self.assertFalse(folder_view.available())
-class TestSessionAnnotationInfoView(BaseRemoveFromSession):
- """Test SessionAnnotationInfoView"""
+class TestSessionAnnotationInfoView(BaseEsignTest):
+ """Tests for SessionAnnotationInfoView."""
def setUp(self):
- self.portal = self.layer["portal"]
- self.request = self.portal.REQUEST
- setRoles(self.portal, TEST_USER_ID, ["Manager"])
- self._setup_categories()
- self.folder = api.content.create(
- container=self.portal, type="Folder", id="test_session_folder", title="Test Session Folder"
- )
- tests_dir = os.path.dirname(__file__)
- self.annexes = []
- for i in range(2):
- with open(os.path.join(tests_dir, "annex1.pdf"), "rb") as f:
- annex = api.content.create(
- container=self.folder,
- type="annex",
- id="annex{}".format(i),
- title=u"Annex {}".format(i),
- content_category=calculate_category_id(self.portal["annexes_types"]["annexes"]["to_sign"]),
- scan_id="012345600000{:02d}".format(i),
- file=NamedBlobFile(data=f.read(), filename=u"annex{}.pdf".format(i), contentType="application/pdf"),
- )
- self.annexes.append(annex)
+ super(TestSessionAnnotationInfoView, self).setUp()
+ api.user.create(email="user1@sign.com", username="user1", password="password1") # noqa: S106
+ api.user.create(email="user2@sign.com", username="user2", password="password2") # noqa: S106
+ self.folder = self.portal["folder0"]
+ self.annexes = [self.portal["folder0"]["annex{}".format(i)] for i in (0, 2)]
self.signers = [
("user1", "user1@sign.com", u"User 1", u"Position 1"),
("user2", "user2@sign.com", u"User 2", u"Position 2"),
]
- self.view = SessionAnnotationInfoView(self.folder, self.portal.REQUEST)
+ self.view = SessionAnnotationInfoView(self.folder, self.request)
def test_call(self):
+ """Raises Unauthorized for non-Manager; returns HTML string for admin."""
setRoles(self.portal, TEST_USER_ID, ["Member"])
with self.assertRaises(Unauthorized):
self.view()
@@ -211,71 +131,56 @@ def test_call(self):
self.assertIsInstance(self.view(), string_types)
def test_render_value(self):
- # Dict
+ """Renders dicts, lists, tuples, and strings to escaped HTML."""
+ # Dict: empty and with content
self.assertEqual(self.view._render_value({}), u"{}")
self.assertEqual(
self.view._render_value({"key": "val"}),
u"{\n 'key': 'val',\n}",
)
-
- # Indentation: nested value increases indent level
+ # Nested value increases indent level
self.assertEqual(
self.view._render_value({"key": ["a"]}),
u"{\n 'key': [\n 'a',\n ],\n}",
)
-
- # List
+ # List: empty and with items
self.assertEqual(self.view._render_value([]), u"[]")
self.assertEqual(
self.view._render_value(["a", "b"]),
u"[\n 'a',\n 'b',\n]",
)
-
- # Tuple
+ # Tuple treated like list
self.assertEqual(self.view._render_value(()), u"[]")
-
- # String
+ # String rendered with u'' prefix
self.assertEqual(self.view._render_value(u"hello"), u"u'hello'")
def test_uid_to_link(self):
+ """Returns clickable link for known UID; span with 'not found' title for unknown."""
uid = self.folder.UID()
- result = self.view._uid_to_link(uid)
self.assertEqual(
- result,
- u"Test Session Folder",
+ self.view._uid_to_link(uid),
+ u"Folder 0",
)
-
- result = self.view._uid_to_link(u"a" * 32)
self.assertEqual(
- result,
+ self.view._uid_to_link(u"a" * 32),
u"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
)
def test_esign_sessions(self):
+ """Returns all sessions; supports session_id and context_uid filters; renders HTML correctly."""
uids = [a.UID() for a in self.annexes]
add_files_to_session(self.signers, uids, title=u"[ia.parapheo] Session {sign_id}")
- # Add a second session with a separate folder/annex
- folder2 = api.content.create(
- container=self.portal, type="Folder", id="test_session_folder2", title="Test Session Folder 2"
+ # Add a second session with a different folder and annex
+ folder2 = self.portal["folder1"]
+ annex2 = folder2["annex1"]
+ add_files_to_session(
+ self.signers, [annex2.UID()], title=u"[ia.parapheo] Session {sign_id}", discriminators=(u"second",)
)
- tests_dir = os.path.dirname(__file__)
- with open(os.path.join(tests_dir, "annex1.pdf"), "rb") as f:
- annex2 = api.content.create(
- container=folder2,
- type="annex",
- id="annex2",
- title=u"Annex 2",
- content_category=calculate_category_id(self.portal["annexes_types"]["annexes"]["to_sign"]),
- scan_id="01234560000002",
- file=NamedBlobFile(data=f.read(), filename=u"annex2.pdf", contentType="application/pdf"),
- )
- add_files_to_session(self.signers, [annex2.UID()], title=u"[ia.parapheo] Session {sign_id}",
- discriminators=(u"second",))
view = SessionAnnotationInfoView(self.folder, self.portal.REQUEST)
- # No filter params — returns all sessions
+ # No filter — returns all sessions
esign_sessions = view.esign_sessions
self.assertEqual(len(esign_sessions), 2)
esign_session = esign_sessions[0]
@@ -304,7 +209,7 @@ def test_esign_sessions(self):
self.assertEqual(len(view.esign_sessions), 0)
del self.portal.REQUEST.form["context_uid"]
- # Test rendered HTML
+ # Rendered HTML for first session
self.assertEqual(
unescape(view.esign_session_html(esign_session[1])),
u"""{{
@@ -313,20 +218,20 @@ def test_esign_sessions(self):
'discriminators': [],
'files': [
{{
- 'context_uid': Test Session Folder,
+ 'context_uid': Folder 0,
'filename': u'annex0.pdf',
- 'scan_id': '01234560000000',
+ 'scan_id': '012345600000000',
'status': '',
'title': u'Annex 0',
- 'uid': Annex 0,
+ 'uid': Annex 0,
}},
{{
- 'context_uid': Test Session Folder,
- 'filename': u'annex1.pdf',
- 'scan_id': '01234560000001',
+ 'context_uid': Folder 0,
+ 'filename': u'annex2.pdf',
+ 'scan_id': '012345600000002',
'status': '',
- 'title': u'Annex 1',
- 'uid': Annex 1,
+ 'title': u'Annex 2',
+ 'uid': Annex 2,
}},
],
'last_update': {},
@@ -355,6 +260,6 @@ def test_esign_sessions(self):
'title': u'[ia.parapheo] Session 012345600000',
'watchers': [],
}}""".format(
- repr(esign_session[1]['last_update']),
+ repr(esign_session[1]["last_update"]),
),
)
diff --git a/src/imio/esign/tests/test_browser_settings.py b/src/imio/esign/tests/test_browser_settings.py
new file mode 100644
index 0000000..bd721c6
--- /dev/null
+++ b/src/imio/esign/tests/test_browser_settings.py
@@ -0,0 +1,36 @@
+# -*- coding: utf-8 -*-
+"""browser/settings tests for this package."""
+from imio.esign.browser.settings import validate_vat_number
+from zope.interface import Invalid
+
+import unittest
+
+
+class TestValidateVatNumber(unittest.TestCase):
+
+ def test_validate_vat_number(self):
+ """validate_vat_number: returns True for valid/empty, raises Invalid for bad format or checksum."""
+ # empty values are accepted (required constraint is enforced by the field, not the validator)
+ self.assertTrue(validate_vat_number(u""))
+ self.assertTrue(validate_vat_number(None))
+
+ # must start with BE
+ with self.assertRaises(Invalid):
+ validate_vat_number(u"NL0202239951")
+
+ # must be exactly 12 characters
+ with self.assertRaises(Invalid):
+ validate_vat_number(u"BE020223995") # 11 chars
+ with self.assertRaises(Invalid):
+ validate_vat_number(u"BE02022399510") # 13 chars
+
+ # only digits allowed after BE
+ with self.assertRaises(Invalid):
+ validate_vat_number(u"BE020223995X")
+
+ # bad checksum: 97 - (2022399 % 97) = 51, not 99
+ with self.assertRaises(Invalid):
+ validate_vat_number(u"BE0202239999")
+
+ # valid: 97 - (2022399 % 97) = 97 - 46 = 51
+ self.assertTrue(validate_vat_number(u"BE0202239951"))
diff --git a/src/imio/esign/tests/test_browser_views.py b/src/imio/esign/tests/test_browser_views.py
index bfd38f0..d04e870 100644
--- a/src/imio/esign/tests/test_browser_views.py
+++ b/src/imio/esign/tests/test_browser_views.py
@@ -2,17 +2,16 @@
"""Browser views tests for this package."""
from AccessControl import Unauthorized
from collections import OrderedDict
-from collective.iconifiedcategory.utils import calculate_category_id
from datetime import datetime
from datetime import timedelta
from imio.esign.browser.views import DownloadFileView
from imio.esign.browser.views import ExternalSessionCreateView
+from imio.esign.browser.views import FacetedSessionInfoViewlet
from imio.esign.browser.views import ItemSessionInfoViewlet
from imio.esign.browser.views import SessionDeleteView
from imio.esign.browser.views import SigningUsersCsv
from imio.esign.config import set_esign_registry_signing_users_email_content
-from imio.esign.testing import IMIO_ESIGN_FUNCTIONAL_TESTING
-from imio.esign.testing import IMIO_ESIGN_INTEGRATION_TESTING
+from imio.esign.tests.base import BaseEsignTest
from imio.esign.utils import add_files_to_session
from imio.esign.utils import get_session_annotation
from imio.pyutils.utils import shortuid_encode_id
@@ -22,374 +21,214 @@
from plone.app.testing import logout
from plone.app.testing import setRoles
from plone.app.testing import TEST_USER_ID
-from plone.namedfile.file import NamedBlobFile
-from plone.namedfile.file import NamedBlobImage
from plone.testing import z2
+from Products.statusmessages import STATUSMESSAGEKEY
from Products.statusmessages.interfaces import IStatusMessage
+from zope.annotation.interfaces import IAnnotations
-import collective.iconifiedcategory
import json
-import os
-import transaction
import unittest
-class _BaseSessionViewTest(unittest.TestCase):
- """Base test class with shared setUp for session view tests."""
+def _clear_status_messages(request):
+ """Clear status messages from request annotations (needed after redirects since show() skips clearing on 3xx)."""
+ annotations = IAnnotations(request)
+ annotations[STATUSMESSAGEKEY] = None
+ request.response.expireCookie(STATUSMESSAGEKEY, path="/")
- layer = IMIO_ESIGN_INTEGRATION_TESTING
- def setUp(self):
- self.portal = self.layer["portal"]
- self.request = self.layer["request"]
- self.request.form.clear()
- setRoles(self.portal, TEST_USER_ID, ["Manager"])
+class TestSessionDeleteView(BaseEsignTest):
+ """Tests for SessionDeleteView."""
- # Create user for signing
+ def setUp(self):
+ super(TestSessionDeleteView, self).setUp()
api.user.create(email="user1@sign.com", username="user1", password="password1") # noqa: S106
-
- # Create content category configuration
- at_folder = api.content.create(
- container=self.portal,
- id="annexes_types",
- title="Annexes Types",
- type="ContentCategoryConfiguration",
- exclude_from_nav=True,
- )
- category_group = api.content.create(
- type="ContentCategoryGroup",
- title="Annexes",
- container=at_folder,
- id="annexes",
- )
- icon_path = os.path.join(
- os.path.dirname(collective.iconifiedcategory.__file__), "tests", "icône1.png"
- )
- with open(icon_path, "rb") as fl:
- api.content.create(
- type="ContentCategory",
- title="To sign",
- container=category_group,
- icon=NamedBlobImage(fl.read(), filename=u"icône1.png"),
- id="to_sign",
- predefined_title="To be signed",
- to_sign=True,
- show_preview=False,
- )
-
- # Create folder and annex
- self.folder = api.content.create(
- container=self.portal,
- type="Folder",
- id="test_folder",
- title="Test Folder",
- )
- tests_dir = os.path.dirname(__file__)
- with open(os.path.join(tests_dir, "annex1.pdf"), "rb") as f:
- annex = api.content.create(
- container=self.folder,
- type="annex",
- id="test_annex",
- title="Test Annex",
- content_category=calculate_category_id(
- self.portal["annexes_types"]["annexes"]["to_sign"]
- ),
- scan_id="012345600000001",
- file=NamedBlobFile(
- data=f.read(),
- filename=u"annex1.pdf",
- contentType="application/pdf",
- ),
- )
- self.annex_uid = annex.UID()
-
- # Seed a session in the annotation
+ self.folder = self.portal["folder0"]
+ annex = self.portal["folder0"]["annex0"]
signers = [("user1", "user1@sign.com", "User 1", "Position 1")]
- self.session_id, _ = add_files_to_session(signers, (self.annex_uid,))
-
-
-class TestSessionDeleteView(_BaseSessionViewTest):
- """Test SessionDeleteView browser view."""
+ self.session_id, _session = add_files_to_session(signers, (annex.UID(),))
+ self.view = SessionDeleteView(self.folder, self.request)
def test_may_delete_session(self):
- """Manager role grants may_delete_session."""
- view = SessionDeleteView(self.folder, self.request)
- self.assertTrue(view.may_delete_session())
+ """Manager: True. Member-only: False and raises Unauthorized on call."""
+ # --- Manager: allowed ---
+ self.assertTrue(self.view.may_delete_session())
- def test_may_delete_session_no_permission(self):
- """Member-only role denies may_delete_session."""
+ # --- Member-only: denied ---
setRoles(self.portal, TEST_USER_ID, ["Member"])
- view = SessionDeleteView(self.folder, self.request)
- self.assertFalse(view.may_delete_session())
+ self.assertFalse(self.view.may_delete_session())
with self.assertRaises(Unauthorized):
- view()
+ self.view()
- def test_call_no_session_id(self):
- """Missing esign_session_id produces an error message and redirects to context URL."""
- view = SessionDeleteView(self.folder, self.request)
- view()
- messages = IStatusMessage(self.request).show()
- self.assertEqual(len(messages), 1)
- self.assertIn("No session ID provided!", messages[0].message)
- self.assertEqual(messages[0].type, "error")
- location = self.request.RESPONSE.getHeader("location")
- self.assertEqual(location, self.folder.absolute_url())
+ def test_call(self):
+ """Missing id → error + context URL; valid id → success + session removed; unknown id → error."""
+ annot = get_session_annotation()
- def test_call_session_exists(self):
- """Valid esign_session_id removes the session and shows a success message."""
- self.request.form["esign_session_id"] = str(self.session_id)
- view = SessionDeleteView(self.folder, self.request)
- view()
- messages = IStatusMessage(self.request).show()
- self.assertEqual(len(messages), 1)
- self.assertIn("Session deleted successfully!", messages[0].message)
- self.assertEqual(messages[0].type, "info")
- location = self.request.RESPONSE.getHeader("location")
- self.assertIn("@@parapheo", location)
- annotation = get_session_annotation()
- self.assertNotIn(self.session_id, annotation["sessions"])
+ # --- no session id ---
+ self.view()
+ self.assertEqual(len(annot["sessions"]), 1)
+ self.assertIn(self.session_id, annot["sessions"])
- def test_call_session_not_found(self):
- """Non-existent esign_session_id shows an error and redirects to @@parapheo."""
+ # --- unknown session id ---
self.request.form["esign_session_id"] = "9999"
- view = SessionDeleteView(self.folder, self.request)
- view()
- messages = IStatusMessage(self.request).show()
- self.assertEqual(len(messages), 1)
- self.assertIn("Session not found!", messages[0].message)
- self.assertEqual(messages[0].type, "error")
- location = self.request.RESPONSE.getHeader("location")
- self.assertIn("@@parapheo", location)
+ self.view()
+ self.assertEqual(len(annot["sessions"]), 1)
+ self.assertIn(self.session_id, annot["sessions"])
+ # --- valid session id ---
+ self.request.other.pop("esign_session_id", None)
+ self.request.form["esign_session_id"] = str(self.session_id)
+ self.view()
+ self.assertNotIn(self.session_id, annot["sessions"])
-class TestExternalSessionCreateView(_BaseSessionViewTest):
- """Test ExternalSessionCreateView browser view."""
+
+class TestExternalSessionCreateView(BaseEsignTest):
+ """Tests for ExternalSessionCreateView."""
+
+ def setUp(self):
+ super(TestExternalSessionCreateView, self).setUp()
+ api.user.create(email="user1@sign.com", username="user1", password="password1") # noqa: S106
+ self.folder = self.portal["folder0"]
+ annex = self.portal["folder0"]["annex0"]
+ signers = [("user1", "user1@sign.com", "User 1", "Position 1")]
+ self.session_id, _session = add_files_to_session(signers, (annex.UID(),))
+ self.view = ExternalSessionCreateView(self.folder, self.request)
def test_may_create_external_sessions(self):
- """Manager role grants may_create_external_sessions."""
- view = ExternalSessionCreateView(self.folder, self.request)
- self.assertTrue(view.may_create_external_sessions())
+ """Manager: True. Member-only: False and raises Unauthorized on call."""
+ # --- Manager ---
+ self.assertTrue(self.view.may_create_external_sessions())
- def test_call_unauthorized(self):
- """Calling view without permission raises Unauthorized."""
+ # --- Member-only ---
setRoles(self.portal, TEST_USER_ID, ["Member"])
- view = ExternalSessionCreateView(self.folder, self.request)
- self.assertFalse(view.may_create_external_sessions())
+ self.assertFalse(self.view.may_create_external_sessions())
with self.assertRaises(Unauthorized):
- view()
+ self.view()
+
+ def test_call(self):
+ """Missing id → error; sentinel strings → specific errors; 200 → success; non-200 → error.
- def test_call_no_session_id(self):
- """Missing session_id produces an error message and returns URL with @@parapheo."""
- view = ExternalSessionCreateView(self.folder, self.request)
- result = view()
+ create_external_session mocked: HTTP tested in test_utils.py.
+ """
+ # --- no session id ---
+ result = self.view()
messages = IStatusMessage(self.request).show()
- self.assertEqual(len(messages), 1)
self.assertIn("No session ID provided!", messages[0].message)
self.assertEqual(messages[0].type, "error")
- self.assertIn("@@parapheo", result)
+ self.assertEqual("http://nohost/plone/folder0/@@parapheo", result)
+ _clear_status_messages(self.request)
- def test_call_session_not_found(self):
- """create_external_session returning _session_not_found_ shows an error."""
self.request.form["session_id"] = str(self.session_id)
- view = ExternalSessionCreateView(self.folder, self.request)
- with patch("imio.esign.browser.views.create_external_session") as mock_create:
- mock_create.return_value = "_session_not_found_"
- result = view()
+
+ def _run(return_value):
+ _clear_status_messages(self.request)
+ with patch("imio.esign.browser.views.create_external_session", return_value=return_value):
+ return ExternalSessionCreateView(self.folder, self.request)()
+
+ # --- session not found sentinel ---
+ _run("_session_not_found_")
messages = IStatusMessage(self.request).show()
- self.assertEqual(len(messages), 1)
self.assertIn("doesn't exist anymore", messages[0].message)
self.assertEqual(messages[0].type, "error")
- self.assertIn("@@parapheo", result)
- def test_call_no_seal_code(self):
- """create_external_session returning _no_seal_code_ shows an error."""
- self.request.form["session_id"] = str(self.session_id)
- view = ExternalSessionCreateView(self.folder, self.request)
- with patch("imio.esign.browser.views.create_external_session") as mock_create:
- mock_create.return_value = "_no_seal_code_"
- result = view()
+ # --- no seal code sentinel ---
+ _run("_no_seal_code_")
messages = IStatusMessage(self.request).show()
- self.assertEqual(len(messages), 1)
self.assertIn("No seal code", messages[0].message)
- self.assertEqual(messages[0].type, "error")
- self.assertIn("@@parapheo", result)
- def test_call_no_seal_email(self):
- """create_external_session returning _no_seal_email_ shows an error."""
- self.request.form["session_id"] = str(self.session_id)
- view = ExternalSessionCreateView(self.folder, self.request)
- with patch("imio.esign.browser.views.create_external_session") as mock_create:
- mock_create.return_value = "_no_seal_email_"
- result = view()
+ # --- no seal email sentinel ---
+ _run("_no_seal_email_")
messages = IStatusMessage(self.request).show()
- self.assertEqual(len(messages), 1)
self.assertIn("No seal email", messages[0].message)
- self.assertEqual(messages[0].type, "error")
- self.assertIn("@@parapheo", result)
- def test_call_success(self):
- """create_external_session returning a 200 response shows a success message."""
- self.request.form["session_id"] = str(self.session_id)
- view = ExternalSessionCreateView(self.folder, self.request)
- mock_response = Mock()
- mock_response.status_code = 200
- with patch("imio.esign.browser.views.create_external_session") as mock_create:
- mock_create.return_value = mock_response
- result = view()
+ # --- no files sentinel ---
+ _run("_no_files_")
+ messages = IStatusMessage(self.request).show()
+ self.assertIn("No files", messages[0].message)
+
+ # --- 200 success ---
+ mock_ok = Mock()
+ mock_ok.status_code = 200
+ result = _run(mock_ok)
messages = IStatusMessage(self.request).show()
- self.assertEqual(len(messages), 1)
self.assertIn("External session sent successfully!", messages[0].message)
self.assertEqual(messages[0].type, "info")
- self.assertIn("@@parapheo", result)
-
- def test_call_error_response(self):
- """create_external_session returning a non-200 response shows the status details."""
- self.request.form["session_id"] = str(self.session_id)
- view = ExternalSessionCreateView(self.folder, self.request)
- mock_response = Mock()
- mock_response.status_code = 500
- mock_response.reason = "Server Error"
- mock_response.text = "oops"
- with patch("imio.esign.browser.views.create_external_session") as mock_create:
- mock_create.return_value = mock_response
- result = view()
+ self.assertEqual("http://nohost/plone/folder0/@@parapheo", result)
+
+ # --- non-200 error ---
+ mock_err = Mock()
+ mock_err.status_code = 500
+ mock_err.reason = "Server Error"
+ mock_err.text = "oops"
+ result = _run(mock_err)
messages = IStatusMessage(self.request).show()
- self.assertEqual(len(messages), 1)
self.assertIn("500", messages[0].message)
self.assertEqual(messages[0].type, "error")
- self.assertIn("@@parapheo", result)
+ self.assertEqual("http://nohost/plone/folder0/@@parapheo", result)
@unittest.skip("Test skipped")
-class TestDownloadFileView(unittest.TestCase):
- """Test DownloadFileView browser view."""
-
- layer = IMIO_ESIGN_FUNCTIONAL_TESTING
+class TestDownloadFileView(BaseEsignTest):
+ """Tests for DownloadFileView."""
def setUp(self):
- """Set up test fixtures."""
+ super(TestDownloadFileView, self).setUp()
self.app = self.layer["app"]
- self.portal = self.layer["portal"]
- self.request = self.layer["request"]
- setRoles(self.portal, TEST_USER_ID, ["Manager"])
-
- # Setup content category configuration (like in test_utils.py)
- at_folder = api.content.create(
- container=self.portal,
- id="annexes_types",
- title="Annexes Types",
- type="ContentCategoryConfiguration",
- exclude_from_nav=True,
- )
- category_group = api.content.create(
- type="ContentCategoryGroup",
- title="Annexes",
- container=at_folder,
- id="annexes",
- )
- icon_path = os.path.join(os.path.dirname(collective.iconifiedcategory.__file__), "tests", "icône1.png")
- with open(icon_path, "rb") as fl:
- api.content.create(
- type="ContentCategory",
- title="To sign",
- container=category_group,
- icon=NamedBlobImage(fl.read(), filename=u"icône1.png"),
- id="to_sign",
- predefined_title="To be signed",
- to_sign=True,
- show_preview=False,
- )
-
- # Create a folder to hold test annexes
- self.folder = api.content.create(
- container=self.portal,
- type="Folder",
- id="test_folder",
- title="Test Folder",
- )
-
- # Create test annexes with NamedBlobFile (Plone 4.3 Archetypes)
- tests_dir = os.path.dirname(__file__)
- pdf_file = "annex1.pdf"
- with open(os.path.join(tests_dir, pdf_file), "rb") as f:
- file_data = f.read()
- self.test_annex = api.content.create(
- container=self.folder,
- type="annex",
- id="test_annex",
- title="Test Annex",
- content_category="to_sign",
- file=NamedBlobFile(
- data=file_data,
- filename=u"test_document__uid.pdf",
- contentType="application/pdf"
- ),
- )
-
+ self.folder = self.portal["folder0"]
+ self.test_annex = self.folder["annex0"]
self.file_uid = self.test_annex.UID()
self.encoded_uid = shortuid_encode_id(self.file_uid, separator="-", block_size=5)
+ self.view = DownloadFileView(self.portal, self.request)
+
+ def test_download_file(self):
+ """Missing id, invalid id, unknown id, no file attr, expired download, valid download, traversal."""
+ logout()
+ self.assertIsNone(self.view.file_id)
+ self.assertEqual(self.view.shortuid_separator, "-")
+ self.assertEqual(self.view.named_blob_file_attribute, "file")
+
+ # --- no file id ---
+ self.assertIn("A file identifier must be passed in the url", self.view())
+
+ # --- invalid format ---
+ self.view.file_id = "$$$"
+ self.assertIn("This file identifier is not correct", self.view())
+
+ # --- valid format but non-existent UID ---
+ self.view.file_id = "aabbccddee"
+ self.assertIn("The corresponding file identifier cannot be retrieved", self.view())
- # Commit transaction for functional testing
- transaction.commit()
-
- def test_download_file_view(self):
- """Test DownloadFileView with various scenarios."""
- logout() # anonymous usage
-
- # View exists and can be instantiated
- view = api.content.get_view("download-file", self.portal, self.request)
- self.assertIsInstance(view, DownloadFileView)
- view = DownloadFileView(self.portal, self.request)
- self.assertEqual(view.file_id, None)
- self.assertEqual(view.shortuid_separator, "-")
- self.assertEqual(view.named_blob_file_attribute, "file")
-
- # Download file without UID
- view = DownloadFileView(self.portal, self.request)
- result = view()
- self.assertIn("A file identifier must be passed in the url", result)
- # invalid UID format
- view.file_id = "$$$"
- result = view()
- self.assertIn("This file identifier is not correct", result)
- # valid format but non-existent UID
- view.file_id = "aabbccddee"
- result = view()
- self.assertIn("The corresponding file identifier cannot be retrieved", result)
- # download from object without file attribute
+ # --- object has no file attribute ---
folder_uid = self.folder.UID()
encoded_folder_uid = shortuid_encode_id(folder_uid, separator="-", block_size=5)
- view.file_id = encoded_folder_uid
- result = view()
- self.assertIn("The corresponding file content cannot be retrieved", result)
+ self.view.file_id = encoded_folder_uid
+ self.assertIn("The corresponding file content cannot be retrieved", self.view())
- # valid id but file too old
- view.file_id = self.encoded_uid
- view.download_time_delta = timedelta(days=1)
+ # --- expired download ---
+ self.view.file_id = self.encoded_uid
+ self.view.download_time_delta = timedelta(days=1)
self.test_annex.setModificationDate(datetime.now() - timedelta(days=3))
- result = view()
- self.assertIn("The download period for this file has expired", result)
- view.download_time_delta = None # Disable date verification
- result = view()
+ self.assertIn("The download period for this file has expired", self.view())
+
+ # --- date check disabled ---
+ self.view.download_time_delta = None
+ result = self.view()
self.assertNotIn("The download period for this file has expired", result)
self.assertIsInstance(result, str)
- # Download file with valid UID
- view.file_id = self.encoded_uid
- view.download_time_delta = timedelta(days=7)
- result = view()
- # Check that we got binary data (the file content)
- self.assertIsInstance(result, str) # In Python 2, binary data is str
+ # --- valid download ---
+ self.view.download_time_delta = timedelta(days=7)
+ result = self.view()
+ self.assertIsInstance(result, str)
self.assertTrue(len(result) > 0)
self.assertTrue(result.startswith(b"%PDF") or result.startswith("%PDF"))
- # Check response headers
response = self.request.RESPONSE
self.assertIn("application/pdf", response.getHeader("Content-Type"))
self.assertIn("inline", response.getHeader("Content-Disposition"))
- self.assertIn("test_document.pdf", response.getHeader("Content-Disposition"))
+ self.assertIn("annex0.pdf", response.getHeader("Content-Disposition"))
self.assertTrue(int(response.getHeader("Content-Length")) > 0)
- # Test URL traversal mechanism
+ # --- URL traversal ---
browser = z2.Browser(self.app)
portal_url = self.portal.absolute_url()
browser.open("{}/download-file/{}".format(portal_url, "aabbccddee"))
@@ -400,19 +239,15 @@ def test_download_file_view(self):
self.assertIn("The corresponding file identifier cannot be retrieved (aabbccddee)", browser.contents)
-class TestSigningUsersCsv(unittest.TestCase):
- """Tests for the SigningUsersCsv view."""
-
- layer = IMIO_ESIGN_INTEGRATION_TESTING
+class TestSigningUsersCsv(BaseEsignTest):
+ """Tests for SigningUsersCsv."""
def setUp(self):
- self.portal = self.layer["portal"]
- self.request = self.layer["request"]
- self.request.form.clear()
- setRoles(self.portal, TEST_USER_ID, ["Manager"])
+ super(TestSigningUsersCsv, self).setUp()
self.user = api.user.create(email="signer@test.com", username="signer_user", password="password1") # noqa: S106
+ self.view = SigningUsersCsv(self.portal, self.request)
- def _make_user_data(self, user):
+ def _user_data(self, user):
return {
"userid": user.getId(),
"email": user.getProperty("email", ""),
@@ -421,44 +256,41 @@ def _make_user_data(self, user):
"fullname": user.getProperty("fullname", ""),
}
- # --- filter_user ---
+ def test_filter_user(self):
+ """No group → False; non-watchers group → False; watchers group → True;
+ held_position with 'signer' usage → True.
- def test_filter_user_in_watchers_group_returns_true(self):
- """User in a group ending with 'watchers' is included by default."""
- api.group.create(groupname="myservice_watchers")
- api.group.add_user(groupname="myservice_watchers", user=self.user)
- view = SigningUsersCsv(self.portal, self.request)
- self.assertTrue(view.filter_user(self._make_user_data(self.user)))
+ api.content.find mocked for held_position: held_position type not in this layer.
+ """
+ # --- not in any group ---
+ self.assertFalse(self.view.filter_user(self._user_data(self.user)))
- def test_filter_user_not_in_any_group_returns_false(self):
- """User with no held_position and no watchers group is excluded."""
- view = SigningUsersCsv(self.portal, self.request)
- self.assertFalse(view.filter_user(self._make_user_data(self.user)))
-
- def test_filter_user_in_non_watchers_group_returns_false(self):
- """User in a group not ending with 'watchers' is excluded."""
+ # --- non-watchers group ---
api.group.create(groupname="myservice_editors")
api.group.add_user(groupname="myservice_editors", user=self.user)
- view = SigningUsersCsv(self.portal, self.request)
- self.assertFalse(view.filter_user(self._make_user_data(self.user)))
+ self.assertFalse(self.view.filter_user(self._user_data(self.user)))
- def test_filter_user_with_signer_held_position_returns_true(self):
- """User with a held_position having 'signer' in usages is included by default."""
- mock_hp_obj = Mock()
- mock_hp_obj.usages = ["signer"]
- mock_brain = Mock()
- mock_brain.getObject.return_value = mock_hp_obj
- view = SigningUsersCsv(self.portal, self.request)
- with patch("imio.esign.browser.views.api.content.find", return_value=[mock_brain]):
- result = view.filter_user(self._make_user_data(self.user))
- self.assertTrue(result)
+ # --- watchers group ---
+ api.group.create(groupname="myservice_watchers")
+ api.group.add_user(groupname="myservice_watchers", user=self.user)
+ self.assertTrue(self.view.filter_user(self._user_data(self.user)))
- # --- get_users_data ---
+ # --- held_position with signer usage ---
+ hp_user = api.user.create(email="hp@test.com", username="hp_user", password="password1") # noqa: S106
+ mock_hp = Mock()
+ mock_hp.usages = ["signer"]
+ mock_brain = Mock()
+ mock_brain.getObject.return_value = mock_hp
+ with patch(
+ "imio.esign.browser.views.api.content.find", # held_position type not in this layer
+ return_value=[mock_brain],
+ ):
+ self.assertTrue(self.view.filter_user(self._user_data(hp_user)))
def test_get_users_data(self):
- """Returns data for all users with duplicates."""
- view = SigningUsersCsv(self.portal, self.request)
- users_data, _duplicates = view.get_users_data()
+ """Returns all users with duplicate-email flags; watchers/signers marked checked."""
+ users_data, _duplicates = self.view.get_users_data()
+ # Users sorted alphabetically by userid: signer_user < test_user_1_
self.assertEqual(
users_data,
[
@@ -483,10 +315,9 @@ def test_get_users_data(self):
],
)
- # Test duplicates
+ # --- duplicate emails ---
api.user.create(email="signer@test.com", username="signer_user2", password="password1") # noqa: S106
- view = SigningUsersCsv(self.portal, self.request)
- users_data, duplicates = view.get_users_data()
+ users_data, duplicates = self.view.get_users_data()
self.assertEqual(
users_data,
[
@@ -521,11 +352,10 @@ def test_get_users_data(self):
)
self.assertEqual(duplicates, {"signer@test.com": ["signer_user", "signer_user2"]})
- # Signers and watchers users are sorted and checked
+ # --- watchers group member marked checked ---
api.group.create(groupname="myservice_watchers")
api.group.add_user(groupname="myservice_watchers", user=self.user)
- view = SigningUsersCsv(self.portal, self.request)
- users_data, _duplicates = view.get_users_data()
+ users_data, _duplicates = self.view.get_users_data()
self.assertEqual(
users_data,
[
@@ -559,111 +389,92 @@ def test_get_users_data(self):
],
)
- # --- _get_selected_userids ---
+ def test_get_selected_userids(self):
+ """Valid JSON → list; absent → []; bad JSON → []."""
+ self.assertEqual(self.view._get_selected_userids(), [])
- def test_get_selected_userids_with_valid_json(self):
- """Parses a JSON array of user IDs from the request form."""
self.request.form["selected_users"] = json.dumps(["user1", "user2"])
- view = SigningUsersCsv(self.portal, self.request)
- self.assertEqual(view._get_selected_userids(), ["user1", "user2"])
+ self.assertEqual(self.view._get_selected_userids(), ["user1", "user2"])
- def test_get_selected_userids_with_empty_param(self):
- """Returns empty list when selected_users is absent."""
- view = SigningUsersCsv(self.portal, self.request)
- self.assertEqual(view._get_selected_userids(), [])
-
- def test_get_selected_userids_with_invalid_json(self):
- """Returns empty list for malformed JSON."""
+ # request.get() promotes form values to request.other; clear both to reset
+ self.request.other.pop("selected_users", None)
self.request.form["selected_users"] = "not-valid-json"
- view = SigningUsersCsv(self.portal, self.request)
- self.assertEqual(view._get_selected_userids(), [])
-
- # --- _download_csv ---
+ self.assertEqual(self.view._get_selected_userids(), [])
- def test_download_csv_no_selected_users(self):
- """Shows warning and redirects when no users are selected."""
- view = SigningUsersCsv(self.portal, self.request)
- view._download_csv()
+ def test_download_csv(self):
+ """No selection → warning; valid selection → CSV with headers."""
+ # --- no users ---
+ self.view._download_csv()
messages = IStatusMessage(self.request).show()
- self.assertEqual(len(messages), 1)
self.assertIn("No users selected", messages[0].message)
self.assertEqual(messages[0].type, "warning")
+ _clear_status_messages(self.request)
- def test_download_csv_generates_csv(self):
- """Returns CSV content with headers and a row for each selected user."""
+ # --- valid selection ---
+ # request.get() promotes form values to request.other; clear to prevent stale cache
+ self.request.other.pop("selected_users", None)
self.request.form["selected_users"] = json.dumps([self.user.getId()])
- view = SigningUsersCsv(self.portal, self.request)
- result = view._download_csv()
+ result = self.view._download_csv()
self.assertEqual(
- result, "userid,email,lastname,firstname,fullname\r\nsigner_user,signer@test.com,signer_user,,\r\n"
+ result,
+ "userid,email,lastname,firstname,fullname\r\nsigner_user,signer@test.com,signer_user,,\r\n",
)
self.assertIn("text/csv", self.request.RESPONSE.getHeader("Content-Type"))
- # --- _send_emails ---
-
- def test_send_emails_no_selected_users(self):
- """Shows warning and redirects when no users are selected."""
- view = SigningUsersCsv(self.portal, self.request)
- view._send_emails()
+ def test_send_emails(self):
+ """No selection → warning; no content → error; no from address → error; success."""
+ # --- no users ---
+ self.view._send_emails()
messages = IStatusMessage(self.request).show()
- self.assertEqual(len(messages), 1)
self.assertIn("No users selected", messages[0].message)
self.assertEqual(messages[0].type, "warning")
+ _clear_status_messages(self.request)
- def test_send_emails_no_email_content(self):
- """Shows error and redirects when signing users email content is not configured."""
- set_esign_registry_signing_users_email_content(u"")
+ # request.get() promotes form values to request.other; clear to prevent stale cache
+ self.request.other.pop("selected_users", None)
self.request.form["selected_users"] = json.dumps([self.user.getId()])
- view = SigningUsersCsv(self.portal, self.request)
- view._send_emails()
+
+ # --- email content not configured ---
+ set_esign_registry_signing_users_email_content(u"")
+ self.view._send_emails()
messages = IStatusMessage(self.request).show()
- self.assertEqual(len(messages), 1)
self.assertEqual(messages[0].message, u"Email content is not configured in the settings.")
self.assertEqual(messages[0].type, "error")
+ _clear_status_messages(self.request)
- def test_send_emails_no_portal_from_email(self):
- """Shows error and redirects to mail-controlpanel when portal from email is not set."""
- self.request.form["selected_users"] = json.dumps([self.user.getId()])
- view = SigningUsersCsv(self.portal, self.request)
- view._send_emails()
+ # --- portal from email not configured ---
+ set_esign_registry_signing_users_email_content(u"
Hello
")
+ self.view._send_emails()
messages = IStatusMessage(self.request).show()
- self.assertEqual(len(messages), 1)
self.assertEqual(messages[0].message, u"Portal from email is not configured.")
self.assertEqual(messages[0].type, "error")
+ _clear_status_messages(self.request)
- def test_send_emails_success(self):
- """Sends emails to all selected users and shows a success message."""
+ # --- success ---
self.portal.manage_changeProperties({"email_from_address": "from@test.com"})
- set_esign_registry_signing_users_email_content(u"Hello
")
- self.request.form["selected_users"] = json.dumps([self.user.getId()])
- view = SigningUsersCsv(self.portal, self.request)
- with patch("imio.esign.browser.views.send_email", return_value=(True, None)):
- view._send_emails()
+ with patch("imio.esign.browser.views.send_email", return_value=(True, None)): # real SMTP call
+ self.view._send_emails()
messages = IStatusMessage(self.request).show()
success_msgs = [m for m in messages if m.type == "info"]
self.assertEqual(len(success_msgs), 1)
self.assertIn("Emails sent successfully", success_msgs[0].message)
+ _clear_status_messages(self.request)
- def test_send_emails_user_with_no_email(self):
- """Shows per-user warning when a selected user has no email address."""
- self.portal.manage_changeProperties({"email_from_address": "from@test.com"})
- set_esign_registry_signing_users_email_content(u"Hello
")
+ # --- user with no email address ---
no_email_user = api.user.create(
email="placeholder@test.com", username="no_email_user", password="password1" # noqa: S106
)
no_email_user.setMemberProperties({"email": ""})
+ self.request.other.pop("selected_users", None)
self.request.form["selected_users"] = json.dumps([no_email_user.getId()])
- view = SigningUsersCsv(self.portal, self.request)
- view._send_emails()
+ self.view._send_emails()
messages = IStatusMessage(self.request).show()
no_email_msgs = [m for m in messages if "no email address" in m.message]
self.assertEqual(len(no_email_msgs), 1)
self.assertEqual(no_email_msgs[0].type, "warning")
- # --- _render_email_content ---
-
def test_render_email_content(self):
- """Renders a TAL template substituting values from user_data."""
+ """TAL template is evaluated with user_data substitutions."""
template = u"NAME
"
user_data = {
"userid": "testuser",
@@ -672,95 +483,158 @@ def test_render_email_content(self):
"firstname": "John",
"fullname": "John Smith",
}
- view = SigningUsersCsv(self.portal, self.request)
- result = view._render_email_content(template, user_data)
+ result = self.view._render_email_content(template, user_data)
self.assertEqual(result, u"John Smith
")
-class TestItemSessionInfoViewlet(unittest.TestCase):
- """Test ItemSessionInfoViewlet multi-session support."""
+class TestFacetedSessionInfoViewlet(BaseEsignTest):
+ """Tests for FacetedSessionInfoViewlet."""
- layer = IMIO_ESIGN_INTEGRATION_TESTING
+ def setUp(self):
+ super(TestFacetedSessionInfoViewlet, self).setUp()
+ self.folder = self.portal["folder0"]
+ self.annex = self.portal["folder0"]["annex0"]
+ self.signers = [("user1", "user1@sign.com", "User 1", "Position 1")]
+ self.collection_uid = "test-collection-uid"
+
+ def _make_viewlet(self, uid="test-collection-uid"):
+ class _ConcreteFacetedSessionInfoViewlet(FacetedSessionInfoViewlet):
+ """Minimal concrete subclass for testing FacetedSessionInfoViewlet."""
+
+ _sessions_collection_uid = "test-collection-uid"
+
+ @property
+ def sessions_collection_uid(self):
+ return self._sessions_collection_uid
+
+ def index(self):
+ return ""
+
+ v = _ConcreteFacetedSessionInfoViewlet(self.folder, self.request, None, None)
+ v._sessions_collection_uid = uid
+ return v
+
+ def test_available(self):
+ """False when sessions_collection_uid is None; True when set."""
+ self.assertFalse(self._make_viewlet(uid=None).available())
+ self.assertTrue(self._make_viewlet().available())
+
+ def test_sessions(self):
+ """sessions CachedProperty: {} for missing/invalid/unknown id; {id: info} for known id."""
+ # --- no esign_session_id[] param ---
+ self.assertEqual(self._make_viewlet().sessions, {})
+
+ # --- non-integer value ---
+ self.request.form["esign_session_id[]"] = "not-an-int"
+ self.assertEqual(self._make_viewlet().sessions, {})
+
+ # --- valid integer but no matching session ---
+ self.request.form["esign_session_id[]"] = "999"
+ self.assertEqual(self._make_viewlet().sessions, {})
+
+ # --- valid session id → {session_id: session_info} ---
+ sid, session = add_files_to_session(self.signers, [self.annex.UID()])
+ self.request.form["esign_session_id[]"] = str(sid)
+ sessions = self._make_viewlet().sessions
+ self.assertEqual(list(sessions.keys()), [sid])
+ self.assertEqual(sessions[sid]["sign_id"], session["sign_id"])
+
+ def test_render(self):
+ """'': c1[] absent or non-matching; real render_table() when no session;
+ index() when session selected.
+
+ Sessions annotation is empty when the "no session selected" branch runs, so
+ ActionsColumn.renderCell / get_dashboard_link are never reached — no mock needed.
+ """
+ # --- c1[] absent → '' ---
+ self.assertEqual(self._make_viewlet().render(), "")
+
+ # --- c1[] non-matching → '' ---
+ self.request.form["c1[]"] = "other-uid"
+ self.assertEqual(self._make_viewlet().render(), "")
+
+ # --- c1[] matches (right collection), no session selected → sessions_listing_view.render_table() ---
+ self.request.form["c1[]"] = self.collection_uid
+ result = self._make_viewlet().render()
+ self.assertIn("")
+
+ def test_get_table_rows(self):
+ """Column 1 → session fields; column 2 → signer/link fields; unknown → []."""
+ v = self._make_viewlet()
+ self.assertEqual(v.get_table_rows(1), ["session_id", "state", "update_date", "sealed"])
+ self.assertEqual(v.get_table_rows(2), ["external_link", "signers"])
+ self.assertEqual(v.get_table_rows(99), [])
+
+ def test_ext_session_link(self):
+ """No sign_url → with title; sign_url present → ."""
+ v = self._make_viewlet()
+
+ # --- no sign_url: span ---
+ session = {"sign_id": "012345600000", "sign_url": None, "title": u"My Session"}
+ result = v.ext_session_link(session)
+ self.assertEqual(result, u"My Session")
+
+ # --- sign_url present: anchor ---
+ session["sign_url"] = "https://sign.example.com/s/1"
+ result = v.ext_session_link(session)
+ self.assertEqual(result, u'My Session')
+
+ def test_get_state_description(self):
+ """Known state → non-empty translated string; unknown state → ''."""
+ v = self._make_viewlet()
+ self.assertTrue(len(v.get_state_description("draft")) > 0)
+ self.assertEqual(v.get_state_description("unknown_state"), "")
+
+
+class TestItemSessionInfoViewlet(BaseEsignTest):
+ """Tests for ItemSessionInfoViewlet."""
def setUp(self):
- self.portal = self.layer["portal"]
- self.request = self.portal.REQUEST
- setRoles(self.portal, TEST_USER_ID, ["Manager"])
- at_folder = api.content.create(
- container=self.portal, id="annexes_types", title="Annexes Types",
- type="ContentCategoryConfiguration", exclude_from_nav=True,
- )
- category_group = api.content.create(
- type="ContentCategoryGroup", title="Annexes",
- container=at_folder, id="annexes",
- )
- icon_path = os.path.join(
- os.path.dirname(collective.iconifiedcategory.__file__), "tests", u"ic\xf4ne1.png"
- )
- with open(icon_path, "rb") as fl:
- api.content.create(
- type="ContentCategory", title="To sign",
- container=category_group,
- icon=NamedBlobImage(fl.read(), filename=u"ic\xf4ne1.png"),
- id="to_sign", predefined_title="To be signed",
- to_sign=True, show_preview=False,
- )
- api.user.create(email="user1@sign.com", username="user1", password="password1")
- self.folder = api.content.create(
- container=self.portal, type="Folder",
- id="test_folder", title="Test Folder",
- )
- tests_dir = os.path.dirname(__file__)
- self.annexes = []
- for i in range(2):
- with open(os.path.join(tests_dir, "annex1.pdf"), "rb") as f:
- annex = api.content.create(
- container=self.folder, type="annex",
- id="annex{}".format(i), title="Annex {}".format(i),
- content_category=calculate_category_id(
- self.portal["annexes_types"]["annexes"]["to_sign"]
- ),
- scan_id="0123456000000{:02d}".format(i),
- file=NamedBlobFile(
- data=f.read(), filename=u"annex{}.pdf".format(i),
- contentType="application/pdf",
- ),
- )
- self.annexes.append(annex)
+ super(TestItemSessionInfoViewlet, self).setUp()
+ api.user.create(email="user1@sign.com", username="user1", password="password1") # noqa: S106
+ self.folder = self.portal["folder0"]
+ self.annexes = [self.portal["folder0"]["annex0"], self.portal["folder0"]["annex2"]]
self.signers = [("user1", "user1@sign.com", "User 1", "Position 1")]
- for key in list(self.request.form.keys()):
- del self.request.form[key]
- def test_sessions_empty(self):
- """No files in esign annotation → sessions returns empty list."""
+ def test_sessions(self):
+ """No files → empty OrderedDict + render ''; one session → one entry; two sessions → two entries."""
+ # --- no files in annotation ---
viewlet = ItemSessionInfoViewlet(self.folder, self.request, None, None)
self.assertEqual(viewlet.sessions, OrderedDict())
self.assertEqual(viewlet.render(), "")
- def test_sessions_single_session(self):
- """All context files in one session → sessions returns one dict."""
- uids = [a.UID() for a in self.annexes]
- add_files_to_session(self.signers, uids)
+ # --- all context files in one session ---
+ add_files_to_session(self.signers, [self.annexes[0].UID()], discriminators=("a",))
viewlet = ItemSessionInfoViewlet(self.folder, self.request, None, None)
sessions = viewlet.sessions
self.assertEqual(len(sessions), 1)
- self.assertEqual(sessions.keys(), [0])
- self.assertEqual(len(sessions[0]["files"]), len(uids))
+ self.assertEqual(list(sessions.keys()), [0])
+ self.assertEqual(len(sessions[0]["files"]), 1)
+ self.assertEqual(sessions[0]["files"][0]["uid"], self.annexes[0].UID())
- def test_sessions_multiple_sessions(self):
- """Files in two sessions (different discriminators) → sessions returns two dicts."""
- add_files_to_session(self.signers, [self.annexes[0].UID()], discriminators=("a",))
+ # --- files split across two sessions (different discriminators) ---
add_files_to_session(self.signers, [self.annexes[1].UID()], discriminators=("b",))
viewlet = ItemSessionInfoViewlet(self.folder, self.request, None, None)
sessions = viewlet.sessions
self.assertEqual(len(sessions), 2)
- session_ids = sessions.keys()
- self.assertEqual(session_ids, [0, 1])
+ self.assertEqual(list(sessions.keys()), [0, 1])
-class TestSessionsListingView(_BaseSessionViewTest):
+class TestSessionsListingView(BaseEsignTest):
"""Test SessionsListingView browser view."""
+ def setUp(self):
+ super(TestSessionsListingView, self).setUp()
+ self.folder = self.portal["folder0"]
+ annex = self.portal["folder0"]["annex0"]
+ signers = [("user1", "user1@sign.com", "User 1", "Position 1")]
+ add_files_to_session(signers, (annex.UID(),))
+
def test_get_sessions(self):
"""Test obtain sessions and stored annotation not modified."""
self.assertFalse("id" in get_session_annotation()['sessions'][0])
diff --git a/src/imio/esign/tests/test_services.py b/src/imio/esign/tests/test_services.py
new file mode 100644
index 0000000..1630e03
--- /dev/null
+++ b/src/imio/esign/tests/test_services.py
@@ -0,0 +1,170 @@
+# -*- coding: utf-8 -*-
+"""Services tests for this package."""
+from imio.esign.services.external_session_feedback import ExternalSessionFeedbackPost
+from imio.esign.tests.base import BaseEsignTest
+from imio.esign.utils import add_files_to_session
+from mock import patch
+
+import json
+
+
+class TestExternalSessionFeedbackPost(BaseEsignTest):
+ """Tests for ExternalSessionFeedbackPost."""
+
+ def setUp(self):
+ super(TestExternalSessionFeedbackPost, self).setUp()
+ self.request.other.pop("BODY", None)
+ self.signers = [("user1", "user1@sign.com", "User 1", "Position 1")]
+ self.annex = self.portal["folder0"]["annex0"]
+ self.session_id, self.session = add_files_to_session(self.signers, [self.annex.UID()])
+ self.sign_id = self.session["sign_id"]
+ self.request._auth = "Bearer test-token"
+
+ def _make_service(self):
+ """Instantiate ExternalSessionFeedbackPost without the adapter mechanism."""
+ service = ExternalSessionFeedbackPost.__new__(ExternalSessionFeedbackPost)
+ service.context = self.portal
+ service.request = self.request
+ return service
+
+ def _reply(self, data):
+ """Call reply() with mocked verify_auth_token; inject body via request.other."""
+ self.request.set("BODY", json.dumps(data))
+ with patch(
+ "imio.esign.services.external_session_feedback.verify_auth_token", return_value=True
+ ): # real Keycloak OAuth endpoint — no local server
+ return self._make_service().reply()
+
+ def test_authorized(self):
+ """_authorized() returns False without valid Bearer token; True with verified token."""
+ service = self._make_service()
+
+ # no _auth attribute
+ del self.request._auth
+ self.assertFalse(service._authorized())
+
+ # non-Bearer prefix
+ self.request._auth = "Basic dXNlcjpwYXNz"
+ self.assertFalse(service._authorized())
+
+ # Bearer with empty token
+ self.request._auth = "Bearer "
+ self.assertFalse(service._authorized())
+
+ # valid format, verify_auth_token returns False
+ self.request._auth = "Bearer test-token"
+ with patch("imio.esign.services.external_session_feedback.verify_auth_token", return_value=False):
+ self.assertFalse(service._authorized())
+
+ # valid format, verify_auth_token returns True
+ with patch("imio.esign.services.external_session_feedback.verify_auth_token", return_value=True):
+ self.assertTrue(service._authorized())
+
+ def test_reply(self):
+ """reply() validates auth/input, processes all feedback codes, updates session state."""
+ annex1 = self.portal["folder0"]["annex2"]
+
+ # missing app_session_id
+ result = self._reply({"code": 21})
+ self.assertEqual(self.request.response.getStatus(), 400)
+ self.assertIn("app_session_id", result["message"])
+ self.request.response.setStatus(200)
+
+ # session not found
+ result = self._reply({"app_session_id": "012345699999", "code": 21})
+ self.assertEqual(self.request.response.getStatus(), 400)
+ self.assertIn("not found", result["message"])
+ self.request.response.setStatus(200)
+
+ # code 21: state updated, sign_url set, returns appended
+ result = self._reply(
+ {
+ "app_session_id": self.sign_id,
+ "code": 21,
+ "session_state": "to_sign",
+ "value": {"sign_session_url": "https://sign.example.com/session/1"},
+ "message": "Session confirmed",
+ }
+ )
+ self.assertEqual(result, {"message": "Information correctly handled"})
+ self.assertEqual(self.session["state"], "to_sign")
+ self.assertEqual(self.session["sign_url"], "https://sign.example.com/session/1")
+ self.assertEqual(self.session["returns"][0][0], 21)
+
+ # code 21: existing sign_url not overwritten
+ self._reply(
+ {
+ "app_session_id": self.sign_id,
+ "code": 21,
+ "value": {"sign_session_url": "https://new.example.com/"},
+ }
+ )
+ self.assertEqual(self.session["sign_url"], "https://sign.example.com/session/1")
+
+ # code 22: matching signer email → status 'signed'
+ _, s22 = add_files_to_session(self.signers, [annex1.UID()], discriminators=("c22",))
+ self._reply(
+ {
+ "app_session_id": s22["sign_id"],
+ "code": 22,
+ "value": {"signed_users": ["user1@sign.com"]},
+ }
+ )
+ self.assertEqual(s22["signers"][0]["status"], "signed")
+
+ # code 22: signer already 'refused' is not updated
+ _, s22b = add_files_to_session(self.signers, [annex1.UID()], discriminators=("c22b",))
+ s22b["signers"][0]["status"] = "refused"
+ self._reply(
+ {
+ "app_session_id": s22b["sign_id"],
+ "code": 22,
+ "value": {"signed_users": ["user1@sign.com"]},
+ }
+ )
+ self.assertEqual(s22b["signers"][0]["status"], "refused")
+
+ # code 23: state → 'returned'
+ _, s23 = add_files_to_session(self.signers, [annex1.UID()], discriminators=("c23",))
+ self._reply({"app_session_id": s23["sign_id"], "code": 23})
+ self.assertEqual(s23["state"], "returned")
+
+ # code 52: state → 'refused'; matching signer → 'refused'
+ _, s52 = add_files_to_session(self.signers, [annex1.UID()], discriminators=("c52",))
+ self._reply(
+ {
+ "app_session_id": s52["sign_id"],
+ "code": 52,
+ "value": {"user": "user1@sign.com"},
+ }
+ )
+ self.assertEqual(s52["state"], "refused")
+ self.assertEqual(s52["signers"][0]["status"], "refused")
+
+ # code 53: state → 'signed' (documents signed but not returned)
+ _, s53 = add_files_to_session(self.signers, [annex1.UID()], discriminators=("c53",))
+ self._reply({"app_session_id": s53["sign_id"], "code": 53})
+ self.assertEqual(s53["state"], "signed")
+
+ # error codes: state → 'errored'
+ for code in (50, 51, 54, 55, 56, 57, 58, 59):
+ _, serr = add_files_to_session(self.signers, [annex1.UID()], discriminators=(str(code),))
+ self._reply({"app_session_id": serr["sign_id"], "code": code})
+ self.assertEqual(serr["state"], "errored")
+
+ # returns: entry structure (code, db_state, value, message, datetime)
+ _, srtn = add_files_to_session(self.signers, [annex1.UID()], discriminators=("rtn",))
+ self._reply(
+ {
+ "app_session_id": srtn["sign_id"],
+ "code": 23,
+ "session_state": "completed",
+ "value": {"info": "ok"},
+ "message": "All done",
+ }
+ )
+ entry = srtn["returns"][0]
+ self.assertEqual(entry[0], 23)
+ self.assertEqual(entry[1], "completed")
+ self.assertEqual(entry[2], {"info": "ok"})
+ self.assertEqual(entry[3], "All done")
diff --git a/src/imio/esign/tests/test_setup.py b/src/imio/esign/tests/test_setup.py
index e80c7ff..5840235 100644
--- a/src/imio/esign/tests/test_setup.py
+++ b/src/imio/esign/tests/test_setup.py
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
"""Setup tests for this package."""
from imio.esign import PLONE_VERSION
-from imio.esign.testing import IMIO_ESIGN_INTEGRATION_TESTING # noqa: E501
+from imio.esign.testing import IMIO_ESIGN_INTEGRATION_TESTING
from plone import api
from plone.app.testing import setRoles
from plone.app.testing import TEST_USER_ID
@@ -19,7 +19,6 @@ class TestSetup(unittest.TestCase):
layer = IMIO_ESIGN_INTEGRATION_TESTING
def setUp(self):
- """Custom shared utility setup for tests."""
self.portal = self.layer["portal"]
if PLONE_VERSION < 5:
self.installer = api.portal.get_tool("portal_quickinstaller")
@@ -27,14 +26,14 @@ def setUp(self):
self.installer = get_installer(self.portal, self.layer["request"])
def test_product_installed(self):
- """Test if imio.esign is installed."""
+ """Product is installed and IImioEsignLayer browser layer is registered."""
+ # --- install state ---
if PLONE_VERSION < 5:
self.assertTrue(self.installer.isProductInstalled("imio.esign"))
else:
self.assertTrue(self.installer.is_product_installed("imio.esign"))
- def test_browserlayer(self):
- """Test that IImioEsignLayer is registered."""
+ # --- browser layer ---
from imio.esign.interfaces import IImioEsignLayer
from plone.browserlayer import utils
@@ -42,6 +41,7 @@ def test_browserlayer(self):
class TestUninstall(unittest.TestCase):
+ """Test that imio.esign uninstalls cleanly."""
layer = IMIO_ESIGN_INTEGRATION_TESTING
@@ -58,14 +58,14 @@ def setUp(self):
setRoles(self.portal, TEST_USER_ID, roles_before)
def test_product_uninstalled(self):
- """Test if imio.esign is cleanly uninstalled."""
+ """Product is uninstalled and IImioEsignLayer browser layer is removed."""
+ # --- uninstall state ---
if PLONE_VERSION < 5:
self.assertFalse(self.installer.isProductInstalled("imio.esign"))
else:
self.assertFalse(self.installer.is_product_installed("imio.esign"))
- def test_browserlayer_removed(self):
- """Test that IImioEsignLayer is removed."""
+ # --- browser layer removed ---
from imio.esign.interfaces import IImioEsignLayer
from plone.browserlayer import utils
diff --git a/src/imio/esign/tests/test_utils.py b/src/imio/esign/tests/test_utils.py
index d16232f..17df69d 100644
--- a/src/imio/esign/tests/test_utils.py
+++ b/src/imio/esign/tests/test_utils.py
@@ -1,13 +1,12 @@
# -*- coding: utf-8 -*-
"""utils tests for this package."""
from collections import OrderedDict
-from collective.iconifiedcategory.utils import calculate_category_id
from datetime import date
from datetime import timedelta
from imio.esign.config import get_esign_registry_max_session_size
from imio.esign.config import set_esign_registry_external_watchers
from imio.esign.config import set_esign_registry_max_session_size
-from imio.esign.testing import IMIO_ESIGN_INTEGRATION_TESTING
+from imio.esign.tests.base import BaseEsignTest
from imio.esign.utils import add_files_to_session
from imio.esign.utils import create_external_session
from imio.esign.utils import get_file_download_url
@@ -26,90 +25,26 @@
from mock import Mock
from mock import patch
from plone import api
-from plone.app.testing import setRoles
-from plone.app.testing import TEST_USER_ID
from plone.namedfile.file import NamedBlobFile
-from plone.namedfile.file import NamedBlobImage
from zope.annotation import IAnnotations
from zope.event import notify
from zope.lifecycleevent import ObjectModifiedEvent
-import collective.iconifiedcategory
import json
import os
-import unittest
-class TestUtils(unittest.TestCase):
-
- layer = IMIO_ESIGN_INTEGRATION_TESTING
-
+class TestUtils(BaseEsignTest):
def setUp(self):
- self.portal = self.layer["portal"]
- setRoles(self.portal, TEST_USER_ID, ["Manager"])
- # add some users
- api.user.create(email="user1@sign.com", username="user1", password="password1")
- api.user.create(email="user2@sign.com", username="user2", password="password2")
- # add content category configuration
- at_folder = api.content.create(
- container=self.portal,
- id="annexes_types",
- title="Annexes Types",
- type="ContentCategoryConfiguration",
- exclude_from_nav=True,
- )
- category_group = api.content.create(
- type="ContentCategoryGroup",
- title="Annexes",
- container=at_folder,
- id="annexes",
- )
- icon_path = os.path.join(os.path.dirname(collective.iconifiedcategory.__file__), "tests", "icône1.png")
- with open(icon_path, "rb") as fl:
- api.content.create(
- type="ContentCategory",
- title="To sign",
- container=category_group,
- icon=NamedBlobImage(fl.read(), filename=u"icône1.png"),
- id="to_sign",
- predefined_title="To be signed",
- # confidential=True,
- # to_print=True,
- to_sign=True,
- # signed=True,
- # publishable=True,
- # only_pdf=True,
- show_preview=False,
- )
- # add annexes
- self.folders = []
- for f in range(2):
- folder = api.content.create(
- container=self.portal,
- id="folder{}".format(f),
- title="Folder {}".format(f),
- type="Folder",
- )
- self.folders.append(folder)
- tests_dir = os.path.dirname(__file__)
- pdf_files = ["annex1.pdf", "annex2.pdf"]
- self.uids = []
- for i in range(12):
- pdf_file = pdf_files[i % len(pdf_files)]
- container = self.folders[i % len(self.folders)]
- with open(os.path.join(tests_dir, pdf_file), "rb") as f:
- annex = api.content.create(
- container=container,
- type="annex",
- id="annex{}".format(i),
- title="Annex {}".format(i),
- content_category=calculate_category_id(self.portal["annexes_types"]["annexes"]["to_sign"]),
- scan_id="0123456000000{:02d}".format(i),
- file=NamedBlobFile(data=f.read(), filename=u"annex{}.pdf".format(i), contentType="application/pdf"),
- )
- self.uids.append(annex.UID())
-
- def test_add_remove_files_to_session(self):
+ super(TestUtils, self).setUp()
+ api.user.create(email="user1@sign.com", username="user1", password="password1") # noqa: S106
+ api.user.create(email="user2@sign.com", username="user2", password="password2") # noqa: S106
+ self.folders = [self.portal["folder0"], self.portal["folder1"]]
+ self.uids = [self.portal["folder{}".format(i % 2)]["annex{}".format(i)].UID() for i in range(12)]
+
+ def test_add_files_to_session(self):
+ """add_files_to_session: session creation, discrimination, reuse, size splitting,
+ duplicate filenames, and idempotent metadata update."""
root_annot = IAnnotations(self.portal)
self.assertNotIn("imio.esign", root_annot)
signers = [
@@ -117,7 +52,7 @@ def test_add_remove_files_to_session(self):
("user2", "user2@sign.com", "User 2", "Position 2"),
]
- # add files, no session_id, no discriminator
+ # --- create first session ---
sid, session = add_files_to_session(signers, (self.uids[0],), title="my title", watchers=("stalker@sign.com",))
self.assertEqual(sid, 0)
annot = root_annot["imio.esign"]
@@ -134,7 +69,6 @@ def test_add_remove_files_to_session(self):
self.assertIsNone(session["sign_url"])
self.assertEqual(session["client_id"], "0123456")
self.assertEqual(len(session["watchers"]), 1)
- self.assertEqual(len(session["files"]), 1)
self.assertListEqual(
list(session["files"]),
[
@@ -149,11 +83,10 @@ def test_add_remove_files_to_session(self):
],
)
self.assertEqual(len(session["signers"]), 2)
- self.assertEqual(session["size"], 6968) # annex1.pdf
+ self.assertEqual(session["size"], 6968)
- # add files, no session_id => same session reused
- signers[1] = ("user2", "user2@sign.com", "User 2", "Position 2b") # changed position => not discriminant
- # rename uids[1] to same name as uids[0] to test duplicate filename handling
+ # --- same signers → session reused; duplicate filename renamed ---
+ signers[1] = ("user2", "user2@sign.com", "User 2", "Position 2b") # position change is non-discriminant
annex1_obj = uuidToObject(uuid=self.uids[1], unrestricted=True)
annex1_obj.file.filename = u"annex0.pdf"
sid, session = add_files_to_session(signers, (self.uids[1],))
@@ -165,12 +98,11 @@ def test_add_remove_files_to_session(self):
self.assertEqual(len(annot["c_uids"]), 2)
self.assertIn(self.folders[1].UID(), annot["c_uids"])
self.assertEqual(len(session["files"]), 2)
- # verify duplicate filename was renamed
self.assertEqual(session["files"][1]["filename"], u"annex0-1.pdf")
self.assertEqual(len(annot["c_uids"][self.folders[1].UID()]), 1)
self.assertEqual(session["size"], 6968 + 7014) # annex1 + annex2
- # add files, no session_id, new discriminations => new session
+ # --- new discriminator → new session ---
sid, session = add_files_to_session(signers, (self.uids[2],), discriminators=("council1",))
self.assertEqual(sid, 1)
self.assertEqual(annot["numbering"], 2)
@@ -180,7 +112,7 @@ def test_add_remove_files_to_session(self):
self.assertEqual(len(session["files"]), 1)
self.assertEqual(len(session["watchers"]), 0)
- # add files, no session_id, same discriminations => same session
+ # --- same discriminator → reuse session ---
sid, session = add_files_to_session(signers, (self.uids[3],), discriminators=("council1",))
self.assertEqual(sid, 1)
self.assertEqual(annot["numbering"], 2)
@@ -189,28 +121,28 @@ def test_add_remove_files_to_session(self):
self.assertIn(self.uids[3], annot["uids"])
self.assertEqual(len(session["files"]), 2)
- # add files, no session_id, other discriminations => other session
- sid, session = add_files_to_session(signers, (self.uids[4],), discriminators=("council2",))
+ # --- different discriminator → new session ---
+ sid, _session = add_files_to_session(signers, (self.uids[4],), discriminators=("council2",))
self.assertEqual(sid, 2)
- # add files, session_id, other discriminations => reused session
- sid, session = add_files_to_session(signers, (self.uids[5],), session_id=0, discriminators=("council3",))
+ # --- explicit session_id overrides discrimination ---
+ sid, _session = add_files_to_session(signers, (self.uids[5],), session_id=0, discriminators=("council3",))
self.assertEqual(sid, 0)
- # add files, session_id unfound, other discriminations => new session
- sid, session = add_files_to_session(signers, (self.uids[6],), session_id=999, discriminators=("council3",))
+ # --- unfound session_id → new session ---
+ sid, _session = add_files_to_session(signers, (self.uids[6],), session_id=999, discriminators=("council3",))
self.assertEqual(sid, 3)
- # add files, no session_id, different signers => new session
- sid, session = add_files_to_session([signers[0]], (self.uids[7],))
+ # --- different signers → new session ---
+ sid, _session = add_files_to_session([signers[0]], (self.uids[7],))
self.assertEqual(sid, 4)
- # add files, no session_id, different seal => new session
- sid, session = add_files_to_session(signers, (self.uids[8],), seal="seal1")
+ # --- different seal → new session ---
+ sid, _session = add_files_to_session(signers, (self.uids[8],), seal="seal1")
self.assertEqual(sid, 5)
- # add files, no session_id, same seal, different acroform => new session
- sid, session = add_files_to_session(signers, (self.uids[9],), acroform=False)
+ # --- different acroform → new session ---
+ sid, _session = add_files_to_session(signers, (self.uids[9],), acroform=False)
self.assertEqual(sid, 6)
self.assertEqual(len(annot["uids"]), 10)
self.assertEqual(len(annot["c_uids"]), 2)
@@ -218,217 +150,114 @@ def test_add_remove_files_to_session(self):
self.assertEqual(len(annot["c_uids"][self.folders[1].UID()]), 5)
self.assertEqual(len(annot["sessions"]), 7)
- # add files, no session_id, no signers => new session
+ # --- no signers → new session ---
sid, session = add_files_to_session([], (self.uids[10],), seal="seal2")
self.assertEqual(sid, 7)
self.assertEqual(len(annot["sessions"]), 8)
self.assertEqual(session["signers"], [])
self.assertEqual(session["seal"], "seal2")
- # add files, no session_id, same seal, different states => new session
+ # --- same seal but session already sent → new session ---
session["state"] = "sent"
- sid, session = add_files_to_session([], (self.uids[11],), seal="seal2")
+ sid, _session = add_files_to_session([], (self.uids[11],), seal="seal2")
self.assertEqual(sid, 8)
self.assertEqual(len(annot["sessions"]), 9)
- # now we can start to remove
- remove_files_from_session((self.uids[0], self.uids[1])) # 2 of 3 session files
- self.assertEqual(len(annot["uids"]), 10)
- self.assertEqual(len(annot["sessions"][0]["files"]), 1)
- self.assertEqual(annot["sessions"][0]["size"], 7014) # only uid[5] (annex2) remains
- self.assertEqual(len(annot["c_uids"][self.folders[0].UID()]), 5)
- self.assertEqual(len(annot["c_uids"][self.folders[1].UID()]), 5)
- remove_files_from_session((self.uids[5],)) # no more session files, session removed
- self.assertEqual(len(annot["uids"]), 9)
- self.assertEqual(len(annot["sessions"]), 8)
- self.assertNotIn(0, annot["sessions"])
- remove_files_from_session((self.uids[2], self.uids[3])) # all session files, session removed
- self.assertEqual(len(annot["uids"]), 7)
- self.assertEqual(len(annot["sessions"]), 7)
- self.assertNotIn(1, annot["sessions"])
- remove_files_from_session((self.uids[4],))
- remove_files_from_session((self.uids[6],))
- remove_files_from_session((self.uids[7],))
- remove_files_from_session((self.uids[8],))
- remove_files_from_session((self.uids[9],))
- remove_files_from_session((self.uids[10],))
- remove_files_from_session((self.uids[11],))
- self.assertEqual(len(annot["uids"]), 0)
- self.assertEqual(len(annot["c_uids"]), 0)
- self.assertEqual(len(annot["sessions"]), 0)
-
- def test_create_external_session(self):
- """Test create_external_session with mocked requests.post and get_auth_token."""
- from mock import MagicMock
- from mock import patch
-
- # Case 1: session not found => returns sentinel string, no HTTP call made
- with patch('imio.esign.utils.get_auth_token', return_value='test_token'), \
- patch('imio.esign.utils.requests') as mock_requests:
- result = create_external_session(999)
- self.assertEqual(result, "_session_not_found_")
- mock_requests.post.assert_not_called()
+ # --- duplicate filenames: each gets a numbered suffix ---
+ del root_annot["imio.esign"]
+ for i in range(3):
+ api.content.get(UID=self.uids[i]).file.filename = u"same_filename.pdf"
+ s, ses = add_files_to_session(signers, (self.uids[0],))
+ self.assertEqual(len(ses["files"]), 1)
+ self.assertEqual(ses["files"][0]["filename"], "same_filename.pdf")
+ s, ses = add_files_to_session(signers, (self.uids[1],), session_id=s)
+ self.assertEqual(len(ses["files"]), 2)
+ self.assertIn("same_filename-1.pdf", [f["filename"] for f in ses["files"]])
+ s, ses = add_files_to_session(signers, (self.uids[2],), session_id=s)
+ self.assertEqual(len(ses["files"]), 3)
+ filenames = [f["filename"] for f in ses["files"]]
+ self.assertIn("same_filename.pdf", filenames)
+ self.assertIn("same_filename-1.pdf", filenames)
+ self.assertIn("same_filename-2.pdf", filenames)
- # Setup: create a session with two signers and a watcher
- signers = [
- ("user1", "user1@sign.com", "User 1", "Position 1"),
- ("user2", "user2@sign.com", "User 2", "Position 2"),
- ]
- sid, session = add_files_to_session(
- signers, (self.uids[0], self.uids[2],), title=u"Test session", watchers=("watcher@sign.com",)
- )
+ # --- re-adding same file updates metadata, does not duplicate ---
+ del root_annot["imio.esign"]
+ annex0_uid = self.uids[0]
+ annex0 = api.content.get(UID=annex0_uid)
+ annex0.file.filename = u"annex0.pdf"
+ # reset annex1 filename which was changed in the duplicate-filename block above
+ annex1_reset = api.content.get(UID=self.uids[1])
+ annex1_reset.file.filename = u"annex1.pdf"
+ sid, ses = add_files_to_session(signers, (annex0_uid,))
self.assertEqual(sid, 0)
- self.assertEqual(session["state"], "draft")
-
- # Case 2: HTTP 200 => state becomes "sent", request payload is verified
- mock_response = MagicMock()
- mock_response.status_code = 200
- mock_response.text = '{"message": "Request received"}'
- with patch('imio.esign.utils.get_auth_token', return_value='test_token'), \
- patch('imio.esign.utils.requests') as mock_requests:
- mock_requests.post.return_value = mock_response
- result = create_external_session(sid)
- self.assertIs(result, mock_response)
- self.assertEqual(session["state"], "sent")
- mock_requests.post.assert_called_once()
- call_args = mock_requests.post.call_args
- posted_url = call_args[0][0]
- self.assertIn("imio/esign/v1/luxtrust/sessions", posted_url)
- self.assertEqual(call_args[1]["headers"]["Authorization"], "Bearer test_token")
- data = json.loads(call_args[1]["data"]["data"])
- self.assertIn("commonData", data)
- self.assertIn("signData", data)
- self.assertNotIn("sealData", data)
- self.assertEqual(data["commonData"]["sessionName"], u"Test session")
- self.assertEqual(data["commonData"]["imioAppSessionId"], session["sign_id"])
- self.assertEqual(data["signData"]["users"], ["user1@sign.com", "user2@sign.com"])
- self.assertEqual(data["signData"]["watchers"], ["watcher@sign.com"])
- files_param = call_args[1]["files"]
- self.assertEqual(len(files_param), 2)
- self.assertEqual(files_param[0][0], "files")
- self.assertEqual(files_param[0][1][0], u"annex0.pdf")
- self.assertEqual(files_param[1][0], "files")
- self.assertEqual(files_param[1][1][0], u"annex2.pdf")
-
- # Case 3: HTTP non-200 => state unchanged (stays "draft")
- sid2, session2 = add_files_to_session(signers, (self.uids[2],), title=u"Test session 2")
- mock_response_fail = MagicMock()
- mock_response_fail.status_code = 400
- mock_response_fail.text = '{"error": "Bad request"}'
- with patch('imio.esign.utils.get_auth_token', return_value='test_token'), \
- patch('imio.esign.utils.requests') as mock_requests:
- mock_requests.post.return_value = mock_response_fail
- result = create_external_session(sid2)
- self.assertIs(result, mock_response_fail)
- self.assertEqual(session2["state"], "draft")
-
- # Case 4: seal session without seal_code configured => returns "_no_seal_code_", no HTTP call
- api.portal.set_registry_record("imio.esign.seal_email", u"seal@example.com")
- annex0_obj = uuidToObject(uuid=self.uids[0], unrestricted=True)
- annex0_obj.file.filename = u"annex1.pdf"
- sid3, session3 = add_files_to_session(signers, (self.uids[4], self.uids[1], self.uids[0]), seal="PADES_SEAL")
- with patch('imio.esign.utils.get_auth_token', return_value='test_token'), \
- patch('imio.esign.utils.requests') as mock_requests:
- result = create_external_session(sid3)
- self.assertEqual(result, "_no_seal_code_")
- mock_requests.post.assert_not_called()
-
- # Case 5: seal session with seal_code, custom URL => sealData in payload, correct URL used
- api.portal.set_registry_record("imio.esign.seal_code", u"PADES_SEAL")
- api.portal.set_registry_record("imio.esign.seal_email", u"seal@example.com")
- mock_response_seal = MagicMock()
- mock_response_seal.status_code = 200
- mock_response_seal.text = '{"message": "OK"}'
- with patch('imio.esign.utils.get_auth_token', return_value='test_token'), \
- patch('imio.esign.utils.requests') as mock_requests:
- mock_requests.post.return_value = mock_response_seal
- result = create_external_session(sid3, esign_root_url="https://custom.example.com")
- self.assertIs(result, mock_response_seal)
- self.assertEqual(session3["state"], "sent")
- call_args = mock_requests.post.call_args
- self.assertEqual(call_args[0][0], "https://custom.example.com/imio/esign/v1/luxtrust/sessions")
- data = json.loads(call_args[1]["data"]["data"])
- self.assertIn("sealData", data)
- self.assertEqual(data["sealData"]["sealCode"], "PADES_SEAL")
- self.assertEqual(data["sealData"]["users"], ["seal@example.com"])
- self.assertTrue(data["sealData"]["acroform"])
- files_param = call_args[1]["files"]
- self.assertEqual(len(files_param), 3)
- self.assertEqual(files_param[0][0], "files")
- self.assertEqual(files_param[0][1][0], u"annex1-1.pdf")
- self.assertEqual(files_param[1][0], "files")
- self.assertEqual(files_param[1][1][0], u"annex4.pdf")
- self.assertEqual(files_param[2][1][0], u"annex1.pdf")
-
- # Case 6: session without files to send => returns "_no_files_", no HTTP call
- for i in range(len(session3["files"])):
- session3["files"][i]["uid"] = "nonexistent_uid_{}".format(i)
- with patch('imio.esign.utils.get_auth_token', return_value='test_token'), \
- patch('imio.esign.utils.requests') as mock_requests:
- result = create_external_session(sid3)
- self.assertEqual(result, "_no_files_")
- mock_requests.post.assert_not_called()
-
- # case 7: bad session number to send => returns "_session_not_found_", no HTTP call
- with patch('imio.esign.utils.get_auth_token', return_value='test_token'), \
- patch('imio.esign.utils.requests') as mock_requests:
- result = create_external_session(99)
- self.assertEqual(result, "_session_not_found_")
- mock_requests.post.assert_not_called()
-
- def test_get_filesize(self):
- """Test get_filesize returns the correct file size."""
- # even index => annex1.pdf (6968 bytes)
- self.assertEqual(get_filesize(self.uids[0]), 6968)
- # odd index => annex2.pdf (7014 bytes)
- self.assertEqual(get_filesize(self.uids[1]), 7014)
- # invalid UID returns 0
- self.assertEqual(get_filesize("nonexistent_uid"), 0)
- # check size get robustness
- annex = uuidToObject(uuid=self.uids[0], unrestricted=True)
- self.assertEqual(annex.content_category, "plone-annexes_types_-_annexes_-_to_sign")
- folder = annex.__parent__
- self.assertEqual(folder.categorized_elements[self.uids[0]]["filesize"], 6968)
- folder.categorized_elements[self.uids[0]]["filesize"] = 1111
- self.assertEqual(get_filesize(self.uids[0]), 1111)
- del folder.categorized_elements[self.uids[0]]["filesize"]
- self.assertEqual(get_filesize(self.uids[0]), 6968)
- del folder.categorized_elements[self.uids[0]]
- self.assertEqual(get_filesize(self.uids[0]), 6968)
- delattr(folder, "categorized_elements")
- self.assertEqual(get_filesize(self.uids[0]), 6968)
-
- def test_session_size_discrimination(self):
- """Test that sessions are split when they would exceed max_session_size."""
- signers = [
- ("user1", "user1@sign.com", "User 1", "Position 1"),
- ]
-
- # add one file to create a session (annex1.pdf = 6968 bytes)
- sid, session = add_files_to_session(signers, (self.uids[0],))
+ self.assertEqual(len(ses["files"]), 1)
+ self.assertEqual(ses["files"][0]["filename"], "annex0.pdf")
+ self.assertEqual(ses["files"][0]["title"], "Annex 0")
+ self.assertEqual(ses["size"], 6968)
+ annex0.file.filename = u"new_annex0.pdf"
+ annex0.setTitle("New Annex 0")
+ sid, ses = add_files_to_session(signers, (annex0_uid,))
self.assertEqual(sid, 0)
- self.assertEqual(session["size"], 6968)
-
- # set session size just under the 1 MB limit so adding another file exceeds it
- session["size"] = 1 * 1024**2 - 1 # 1 MB - 1 byte
- previous_max = get_esign_registry_max_session_size()
- self.addCleanup(set_esign_registry_max_session_size, previous_max)
+ self.assertEqual(len(ses["files"]), 1)
+ self.assertEqual(ses["files"][0]["filename"], "new_annex0.pdf")
+ self.assertEqual(ses["files"][0]["title"], "New Annex 0")
+ self.assertEqual(ses["size"], 6968)
+ sid, ses = add_files_to_session(signers, (annex0_uid,))
+ self.assertEqual(sid, 0)
+ self.assertEqual(len(ses["files"]), 1)
+ self.assertEqual(ses["files"][0]["filename"], "new_annex0.pdf")
+ self.assertEqual(ses["files"][0]["title"], "New Annex 0")
+ self.assertEqual(ses["size"], 6968)
+ annex1_uid = self.uids[1]
+ annex1 = api.content.get(UID=annex1_uid)
+ sid, ses = add_files_to_session(signers, (annex1_uid,))
+ self.assertEqual(sid, 0)
+ self.assertEqual(len(ses["files"]), 2)
+ self.assertEqual(ses["files"][1]["filename"], "annex1.pdf")
+ self.assertEqual(ses["files"][1]["title"], "Annex 1")
+ self.assertEqual(ses["size"], 13982)
+ annex1.setTitle("New Annex 1")
+ with open(os.path.join(os.path.dirname(__file__), "annex1.pdf"), "rb") as f:
+ annex1.file = NamedBlobFile(data=f.read(), filename=u"new_annex1.pdf", contentType="application/pdf")
+ notify(ObjectModifiedEvent(annex1))
+ self.assertEqual(len(ses["files"]), 2)
+ self.assertEqual(ses["files"][1]["filename"], "new_annex1.pdf")
+ self.assertEqual(ses["files"][1]["title"], "New Annex 1")
+ self.assertEqual(ses["files"][1]["scan_id"], "012345600000001")
+ self.assertEqual(ses["size"], 13936)
+ with open(os.path.join(os.path.dirname(__file__), "annex1.pdf"), "rb") as f:
+ annex1.file = NamedBlobFile(data=f.read(), filename=u"new_annex0.pdf", contentType="application/pdf")
+ annex1.scan_id = "012345600000002"
+ notify(ObjectModifiedEvent(annex1))
+ self.assertEqual(ses["files"][1]["filename"], "new_annex0-1.pdf")
+ self.assertEqual(ses["files"][1]["scan_id"], "012345600000002")
+ annex2_uid = self.uids[2]
+ annex2 = api.content.get(UID=annex2_uid)
+ notify(ObjectModifiedEvent(annex2))
+ remove_files_from_session((annex0_uid,))
+ self.assertEqual(ses["size"], 6968)
+
+ # --- size-based session splitting ---
+ del root_annot["imio.esign"]
+ annex0.file.filename = u"annex0.pdf"
+ sid0, ses0 = add_files_to_session(signers, (annex0_uid,))
+ self.assertEqual(sid0, 0)
+ self.assertEqual(ses0["size"], 6968)
+ ses0["size"] = 1 * 1024 ** 2 - 1
+ prev_max = get_esign_registry_max_session_size()
+ self.addCleanup(set_esign_registry_max_session_size, prev_max)
set_esign_registry_max_session_size(1)
-
- # adding a file (~7KB) would exceed 1 MB => new session created
- sid2, session2 = add_files_to_session(signers, (self.uids[1],))
+ sid1, ses1 = add_files_to_session(signers, (self.uids[1],))
+ self.assertEqual(sid1, 1)
+ self.assertIsNot(ses1, ses0)
+ self.assertEqual(ses1["size"], 6968)
+ sid2, ses2 = add_files_to_session(signers, (self.uids[2],))
self.assertEqual(sid2, 1)
- self.assertIsNot(session2, session)
- self.assertEqual(session2["size"], 7014)
-
- # with enough room, files go to the same session
- sid3, session3 = add_files_to_session(signers, (self.uids[2],))
- self.assertEqual(sid3, 1)
- self.assertIs(session3, session2)
- self.assertEqual(session3["size"], 7014 + 6968)
+ self.assertIs(ses2, ses1)
+ self.assertEqual(ses2["size"], 6968 + 6968)
def test_add_files_ordering_by_context(self):
- """Files added to a session are ordered by their position within their context."""
+ """add_files_to_session: files are ordered by their sibling position within their context."""
def reset_annotation():
annot = get_session_annotation()
annot["sessions"].clear()
@@ -460,147 +289,103 @@ def reset_annotation():
sid, session = add_files_to_session(signers, (self.uids[4],), session_id=sid)
sid, session = add_files_to_session(signers, (self.uids[1],), session_id=sid)
sid, session = add_files_to_session(signers, (self.uids[2],), session_id=sid)
- self.assertEqual([f["uid"] for f in session["files"]], [self.uids[0], self.uids[2], self.uids[4], self.uids[1]])
-
- def test_add_files_with_duplicate_filenames(self):
- """Test that files with duplicate filenames are renamed with suffix."""
- annot = get_session_annotation()
- self.assertEqual(len(annot["sessions"]), 0)
+ self.assertEqual(
+ [f["uid"] for f in session["files"]],
+ [self.uids[0], self.uids[2], self.uids[4], self.uids[1]],
+ )
+ def test_remove_files_from_session(self):
+ """remove_files_from_session: partial removal keeps session; removing last file deletes it."""
signers = [
("user1", "user1@sign.com", "User 1", "Position 1"),
+ ("user2", "user2@sign.com", "User 2", "Position 2"),
]
+ annot = get_session_annotation()
+ add_files_to_session(signers, (self.uids[0], self.uids[1], self.uids[2]))
- for i in range(3):
- annex = api.content.get(UID=self.uids[i])
- annex.file.filename = u"same_filename.pdf"
- # annex.reindexObject()
+ # --- partial removal: 2 of 3 files ---
+ remove_files_from_session((self.uids[0], self.uids[1]))
+ self.assertEqual(len(annot["uids"]), 1)
+ self.assertEqual(len(annot["sessions"][0]["files"]), 1)
+ self.assertEqual(annot["sessions"][0]["size"], 6968) # only uids[2] (annex1.pdf) remains
- sid, session = add_files_to_session(signers, (self.uids[0],))
- self.assertEqual(len(session["files"]), 1)
- self.assertEqual(session["files"][0]["filename"], "same_filename.pdf")
- sid, session = add_files_to_session(signers, (self.uids[1],), session_id=sid)
- self.assertEqual(len(session["files"]), 2)
- self.assertIn("same_filename-1.pdf", [f["filename"] for f in session["files"]])
+ # --- remove last file: session deleted ---
+ remove_files_from_session((self.uids[2],))
+ self.assertEqual(len(annot["uids"]), 0)
+ self.assertEqual(len(annot["sessions"]), 0)
- sid, session = add_files_to_session(signers, (self.uids[2],), session_id=sid)
- self.assertEqual(len(session["files"]), 3)
- filenames = [f["filename"] for f in session["files"]]
- self.assertIn("same_filename.pdf", filenames)
- self.assertIn("same_filename-1.pdf", filenames)
- self.assertIn("same_filename-2.pdf", filenames)
+ # --- remove_empty_session=False: session kept with empty files list ---
+ add_files_to_session(signers, (self.uids[0],))
+ session_id = annot["uids"][self.uids[0]]
+ remove_files_from_session((self.uids[0],), remove_empty_session=False)
+ self.assertEqual(len(annot["uids"]), 0)
+ self.assertEqual(len(annot["sessions"]), 1)
+ self.assertEqual(annot["sessions"][session_id]["files"], [])
- def test_add_files_already_exist_is_updated(self):
- """When adding a file to the same session, it is updated."""
- annot = get_session_annotation()
- self.assertEqual(len(annot["sessions"]), 0)
+ def test_get_filesize(self):
+ """get_filesize: reads cached value from categorized_elements, falls back to blob."""
+ self.assertEqual(get_filesize(self.uids[0]), 6968) # annex0 uses annex1.pdf (6968 bytes)
+ self.assertEqual(get_filesize(self.uids[1]), 7014) # annex1 uses annex2.pdf (7014 bytes)
+ self.assertEqual(get_filesize("nonexistent_uid"), 0)
- signers = [
- ("user1", "user1@sign.com", "User 1", "Position 1"),
- ]
+ annex = uuidToObject(uuid=self.uids[0], unrestricted=True)
+ self.assertEqual(annex.content_category, "plone-annexes_types_-_annexes_-_to_sign")
+ folder = annex.__parent__
+ self.assertEqual(folder.categorized_elements[self.uids[0]]["filesize"], 6968)
- annex0_uid = self.uids[0]
- annex0 = api.content.get(UID=annex0_uid)
+ folder.categorized_elements[self.uids[0]]["filesize"] = 1111
+ self.assertEqual(get_filesize(self.uids[0]), 1111)
- sid, session = add_files_to_session(signers, (annex0_uid,))
- self.assertEqual(sid, 0)
- self.assertEqual(len(session["files"]), 1)
- self.assertEqual(session["files"][0]["filename"], "annex0.pdf")
- self.assertEqual(session["files"][0]["title"], "Annex 0")
- self.assertEqual(session["size"], 6968)
- # edit annex and add again, still one annex in session and data are updated
- annex0.file.filename = u"new_annex0.pdf"
- annex0.setTitle('New Annex 0')
- sid, session = add_files_to_session(signers, (annex0_uid,))
- # same session_id
- self.assertEqual(sid, 0)
- self.assertEqual(len(session["files"]), 1)
- self.assertEqual(session["files"][0]["filename"], "new_annex0.pdf")
- self.assertEqual(session["files"][0]["title"], "New Annex 0")
- self.assertEqual(session["size"], 6968)
- # add again exact same file
- sid, session = add_files_to_session(signers, (annex0_uid,))
- self.assertEqual(sid, 0)
- self.assertEqual(len(session["files"]), 1)
- self.assertEqual(session["files"][0]["filename"], "new_annex0.pdf")
- self.assertEqual(session["files"][0]["title"], "New Annex 0")
- self.assertEqual(session["size"], 6968)
- # add second file 2 times
- annex1_uid = self.uids[1]
- annex1 = api.content.get(UID=annex1_uid)
- sid, session = add_files_to_session(signers, (annex1_uid,))
- self.assertEqual(sid, 0)
- self.assertEqual(len(session["files"]), 2)
- self.assertEqual(session["files"][1]["filename"], "annex1.pdf")
- self.assertEqual(session["files"][1]["title"], "Annex 1")
- self.assertEqual(session["size"], 13982)
- # edit and add again
- annex1.setTitle('New Annex 1')
- # edit file, filename and content so size changed
- with open(os.path.join(os.path.dirname(__file__), "annex1.pdf"), "rb") as f:
- annex1.file = NamedBlobFile(data=f.read(), filename=u"new_annex1.pdf", contentType="application/pdf")
- notify(ObjectModifiedEvent(annex1))
- # data were already updated
- self.assertEqual(len(session["files"]), 2)
- self.assertEqual(session["files"][1]["filename"], "new_annex1.pdf")
- self.assertEqual(session["files"][1]["title"], "New Annex 1")
- self.assertEqual(session["files"][1]["scan_id"], "012345600000001")
- self.assertEqual(session["size"], 13936)
- # change file but add a filename already used, change scan_id as well
- with open(os.path.join(os.path.dirname(__file__), "annex1.pdf"), "rb") as f:
- annex1.file = NamedBlobFile(data=f.read(), filename=u"new_annex0.pdf", contentType="application/pdf")
- annex1.scan_id = "012345600000002"
- notify(ObjectModifiedEvent(annex1))
- self.assertEqual(session["files"][1]["filename"], "new_annex0-1.pdf")
- self.assertEqual(session["files"][1]["scan_id"], "012345600000002")
- # edit annex out of any session
- annex2_uid = self.uids[2]
- annex2 = api.content.get(UID=annex2_uid)
- notify(ObjectModifiedEvent(annex2))
- # just to check, remove annex0
- remove_files_from_session((annex0_uid,))
- self.assertEqual(session["size"], 6968)
+ del folder.categorized_elements[self.uids[0]]["filesize"]
+ self.assertEqual(get_filesize(self.uids[0]), 6968)
+
+ del folder.categorized_elements[self.uids[0]]
+ self.assertEqual(get_filesize(self.uids[0]), 6968)
+
+ delattr(folder, "categorized_elements")
+ self.assertEqual(get_filesize(self.uids[0]), 6968)
def test_remove_context_from_session(self):
- """Test removing a context from a session."""
- annot = get_session_annotation()
- self.assertEqual(len(annot["sessions"]), 0)
+ """remove_context_from_session: removes all files for a context UID; deletes session when empty."""
signers = [
("user1", "user1@sign.com", "User 1", "Position 1"),
("user2", "user2@sign.com", "User 2", "Position 2"),
]
- sid, session = add_files_to_session(signers, (self.uids[0], self.uids[1], self.uids[2], self.uids[3]))
+ annot = get_session_annotation()
+ self.assertEqual(len(annot["sessions"]), 0)
+ add_files_to_session(signers, (self.uids[0], self.uids[1], self.uids[2], self.uids[3]))
self.assertEqual(len(annot["uids"]), 4)
self.assertEqual(len(annot["c_uids"]), 2)
self.assertEqual(len(annot["sessions"]), 1)
+
remove_context_from_session((self.folders[0].UID(),))
self.assertEqual(len(annot["uids"]), 2)
self.assertEqual(len(annot["c_uids"]), 1)
self.assertEqual(len(annot["sessions"]), 1)
+
remove_context_from_session((self.folders[1].UID(),))
self.assertEqual(len(annot["uids"]), 0)
self.assertEqual(len(annot["c_uids"]), 0)
self.assertEqual(len(annot["sessions"]), 0)
def test_remove_session(self):
- """Test removing a session."""
- annot = get_session_annotation()
- self.assertEqual(len(annot["sessions"]), 0)
+ """remove_session: deletes the session and clears its file UID references."""
signers = [
("user1", "user1@sign.com", "User 1", "Position 1"),
("user2", "user2@sign.com", "User 2", "Position 2"),
]
- sid, session = add_files_to_session(signers, (self.uids[0], self.uids[1]))
- self.assertEqual(sid, 0)
- sid, session = add_files_to_session(signers, (self.uids[2], self.uids[3]), seal="seal1")
- self.assertEqual(sid, 1)
- remove_session(0) # remove first session
+ annot = get_session_annotation()
+ self.assertEqual(len(annot["sessions"]), 0)
+ add_files_to_session(signers, (self.uids[0], self.uids[1]))
+ sid2, _session = add_files_to_session(signers, (self.uids[2], self.uids[3]), seal="seal1")
+ self.assertEqual(sid2, 1)
+ remove_session(0)
self.assertEqual(len(annot["uids"]), 2)
self.assertEqual(len(annot["c_uids"]), 2)
self.assertEqual(len(annot["sessions"]), 1)
def test_get_session_info(self):
- """Test getting info about a given session id."""
+ """get_session_info: returns {} for unknown id; returns session dict for known id."""
annot = get_session_annotation()
self.assertEqual(len(annot["sessions"]), 0)
self.assertEqual(get_session_info(0), {})
@@ -614,74 +399,70 @@ def test_get_session_info(self):
self.assertEqual(get_session_info(sid), session)
def test_get_sessions_for(self):
- """Test getting sessions for a given context_uid."""
- # no session
+ """get_sessions_for: returns OrderedDict keyed by session id; honours readonly flag."""
context_uid = self.folders[0].UID()
self.assertEqual(get_sessions_for(context_uid), OrderedDict())
- # one session
+
signers = [("user1", "user1@sign.com", "User 1", "Position 1")]
- sid, session = add_files_to_session(signers, (self.uids[0],))
- self.assertEqual(get_sessions_for(context_uid).keys(), [0])
- # two sessions
- signers = [("user2", "user2@sign.com", "User 2", "Position 2")]
- sid, session = add_files_to_session(signers, (self.uids[2],))
- self.assertEqual(get_sessions_for(context_uid).keys(), [0, 1])
- # readonly=True
+ add_files_to_session(signers, (self.uids[0],))
+ self.assertEqual(list(get_sessions_for(context_uid).keys()), [0])
+
+ signers2 = [("user2", "user2@sign.com", "User 2", "Position 2")]
+ add_files_to_session(signers2, (self.uids[2],))
+ self.assertEqual(list(get_sessions_for(context_uid).keys()), [0, 1])
+
+ # readonly=True (default): mutations do not persist
sessions = get_sessions_for(context_uid)
- self.assertEqual(get_session_info(0)['watchers'], [])
- sessions[0]['watchers'] = ["watcher@sign.com"]
- self.assertEqual(get_session_info(0)['watchers'], [])
- # readonly=False
+ self.assertEqual(get_session_info(0)["watchers"], [])
+ sessions[0]["watchers"] = ["watcher@sign.com"]
+ self.assertEqual(get_session_info(0)["watchers"], [])
+
+ # readonly=False: mutations persist
sessions = get_sessions_for(context_uid, readonly=False)
- self.assertEqual(get_session_info(0)['watchers'], [])
- sessions[0]['watchers'] = ["watcher@sign.com"]
- self.assertEqual(get_session_info(0)['watchers'], ["watcher@sign.com"])
+ self.assertEqual(get_session_info(0)["watchers"], [])
+ sessions[0]["watchers"] = ["watcher@sign.com"]
+ self.assertEqual(get_session_info(0)["watchers"], ["watcher@sign.com"])
def test_get_file_info(self):
- """Test getting infos for a given file."""
+ """get_file_info: returns None for unknown session/file; honours readonly flag."""
annex0_uid = self.uids[0]
annex1_uid = self.uids[1]
- # no session
self.assertIsNone(get_file_info(0, annex0_uid))
- # create session
+
signers = [("user1", "user1@sign.com", "User 1", "Position 1")]
- sid, session = add_files_to_session(signers, (annex0_uid,))
- self.assertEqual(get_file_info(0, annex0_uid)['uid'], annex0_uid)
+ add_files_to_session(signers, (annex0_uid,))
+ self.assertEqual(get_file_info(0, annex0_uid)["uid"], annex0_uid)
self.assertIsNone(get_file_info(0, annex1_uid))
- # readonly=True by default
+
+ # readonly=True: mutations do not persist
file_info = get_file_info(0, annex0_uid)
- file_info['title'] = u'New title annex 0'
- # not changed in the annotation
- self.assertEqual(
- get_session_annotation()['sessions'][0]['files'][0]['title'], u'Annex 0')
- # readonly=False
+ file_info["title"] = u"New title annex 0"
+ self.assertEqual(get_session_annotation()["sessions"][0]["files"][0]["title"], u"Annex 0")
+
+ # readonly=False: mutations persist
file_info = get_file_info(0, annex0_uid, readonly=False)
- file_info['title'] = u'New title annex 0'
- # changed in the annotation
- self.assertEqual(
- get_session_annotation()['sessions'][0]['files'][0]['title'], u'New title annex 0')
+ file_info["title"] = u"New title annex 0"
+ self.assertEqual(get_session_annotation()["sessions"][0]["files"][0]["title"], u"New title annex 0")
def test_get_file_download_url(self):
- """Test generating file download URL from UID."""
+ """get_file_download_url: encodes UID; respects custom root_url; accepts pre-computed short_uid."""
uid = "f40682caafc045b4b81973bd82ea9ab6"
-
api.portal.set_registry_record("imio.esign.file_url", "https://downloads.files.com")
result = get_file_download_url(uid)
- self.assertEqual(result, ("https://downloads.files.com/Rzgwy-9BVG9-viEts-5GBkn-Rm",
- "Rzgwy-9BVG9-viEts-5GBkn-Rm"))
+ self.assertEqual(
+ result, ("https://downloads.files.com/Rzgwy-9BVG9-viEts-5GBkn-Rm", "Rzgwy-9BVG9-viEts-5GBkn-Rm")
+ )
custom_url = "https://custom.domain.org/"
result = get_file_download_url(uid, root_url=custom_url)
self.assertEqual(result[0], "https://custom.domain.org/Rzgwy-9BVG9-viEts-5GBkn-Rm")
- # Test with another UID to verify encoding works
uid2 = self.uids[0]
result2 = get_file_download_url(uid2)
self.assertTrue(result2[0].startswith("https://downloads.files.com/"))
- self.assertEqual(shortuid_decode_id(result2[1], separator="-"), uid2) # correctly decoded
+ self.assertEqual(shortuid_decode_id(result2[1], separator="-"), uid2)
- # Test with pre-computed short_uid parameter
result3 = get_file_download_url(None, short_uid="MyCustom-Short-UID")
self.assertEqual(result3, ("https://downloads.files.com/MyCustom-Short-UID", "MyCustom-Short-UID"))
@@ -693,86 +474,87 @@ def test_get_max_download_date(self):
today = date.today()
self.assertEqual(get_max_download_date(None, timedelta(days=0), today), today)
- def test_create_external_session_not_found(self):
- """Returns _session_not_found_ for a non-existent session id."""
+ def test_create_external_session(self):
+ """create_external_session: validates state, seal config, file resolution, and builds payloads."""
+ signers = [("user1", "user1@sign.com", "User 1", "Position 1")]
+
+ # --- session not found ---
result = create_external_session(9999)
self.assertEqual(result, "_session_not_found_")
- def test_create_external_session_no_seal_email(self):
- """Returns _no_seal_email_ when session requires a seal but no email configured."""
+ # --- seal session: email missing ---
sid, _session = add_files_to_session([], (self.uids[0],), seal="SEAL")
- # seal_email defaults to "" (falsy) — no registry setup needed
result = create_external_session(sid, esign_root_url="http://test.example.com")
self.assertEqual(result, "_no_seal_email_")
- def test_create_external_session_no_seal_code(self):
- """Returns _no_seal_code_ when seal email is set but seal code is missing."""
- sid, _session = add_files_to_session([], (self.uids[0],), seal="SEAL")
+ # --- seal session: code missing ---
api.portal.set_registry_record("imio.esign.seal_email", u"seal@example.com")
self.addCleanup(api.portal.set_registry_record, "imio.esign.seal_email", u"")
- # seal_code defaults to "" (falsy)
result = create_external_session(sid, esign_root_url="http://test.example.com")
self.assertEqual(result, "_no_seal_code_")
- def test_create_external_session_error_response(self):
- """Returns response object and leaves session state unchanged on non-200."""
- signers = [("user1", "user1@sign.com", "User 1", "Position 1")]
- sid, session = add_files_to_session(signers, (self.uids[0],))
+ # --- error response: state unchanged ---
+ sid2, session2 = add_files_to_session(signers, (self.uids[1],))
mock_response = Mock()
mock_response.status_code = 500
mock_response.text = "Internal Server Error"
- with patch("imio.esign.utils.requests.post", return_value=mock_response):
- with patch("imio.esign.utils.get_auth_token", return_value="test-token"):
- result = create_external_session(sid, esign_root_url="http://test.example.com")
+ with patch("imio.esign.utils.requests.post", return_value=mock_response): # real HTTP call
+ with patch("imio.esign.utils.get_auth_token", return_value="test-token"): # remote OAuth
+ result = create_external_session(sid2, esign_root_url="http://test.example.com")
self.assertIs(result, mock_response)
- self.assertEqual(session["state"], "draft")
+ self.assertEqual(session2["state"], "draft")
- def test_create_external_session_sign_payload(self):
- """Returns response object, sets session state to 'sent', and signData contains signer email."""
- signers = [("user1", "user1@sign.com", "User 1", "Position 1")]
- sid, session = add_files_to_session(signers, (self.uids[0],))
+ # --- sign payload: correct structure, state set to 'sent' ---
+ sid3, session3 = add_files_to_session(signers, (self.uids[2],))
mock_response = Mock()
mock_response.status_code = 200
mock_response.text = '{"message": "OK"}'
- with patch("imio.esign.utils.requests.post", return_value=mock_response) as mock_post:
- with patch("imio.esign.utils.get_auth_token", return_value="test-token"):
- result = create_external_session(sid, esign_root_url="http://test.example.com")
+ with patch("imio.esign.utils.requests.post", return_value=mock_response) as mock_post: # real HTTP call
+ with patch("imio.esign.utils.get_auth_token", return_value="test-token"): # remote OAuth
+ result = create_external_session(sid3, esign_root_url="http://test.example.com")
self.assertIs(result, mock_response)
- self.assertEqual(session["state"], "sent")
+ self.assertEqual(session3["state"], "sent")
payload = json.loads(mock_post.call_args[1]["data"]["data"])
+ posted_files = mock_post.call_args[1]["files"]
+ self.assertIsInstance(posted_files, list)
+ self.assertEqual([f[0] for f in posted_files], ["files", "files"])
+ self.assertEqual([f[1][0] for f in posted_files], [u"annex1.pdf", u"annex2.pdf"])
self.assertEqual(
payload,
{
u"commonData": {
- u"imioAppSessionId": u"012345600000",
+ u"imioAppSessionId": u"012345600001",
u"vatNumber": None,
u"endpointUrl": u"http://nohost/plone/@external_session_feedback",
- u"sessionName": u"Session 0",
+ u"sessionName": u"Session {}".format(sid3),
u"documentData": [
{
- u"uniqueCode": u"012345600000000__{}".format(self.uids[0]),
- u"docUuid": get_suid_from_uuid(self.uids[0]),
- u"filename": u"annex0.pdf",
- }
+ u"uniqueCode": u"012345600000001__{}".format(self.uids[1]),
+ u"docUuid": get_suid_from_uuid(self.uids[1]),
+ u"filename": u"annex1.pdf",
+ },
+ {
+ u"uniqueCode": u"012345600000002__{}".format(self.uids[2]),
+ u"docUuid": get_suid_from_uuid(self.uids[2]),
+ u"filename": u"annex2.pdf",
+ },
],
},
u"signData": {u"acroform": True, u"users": [u"user1@sign.com"]},
},
)
- def test_create_external_session_seal_payload(self):
- """Seal-only session: sealData contains seal email and code; no signData."""
- sid, _session = add_files_to_session([], (self.uids[0],), seal="SEAL")
- api.portal.set_registry_record("imio.esign.seal_email", u"seal@example.com")
+ # --- seal-only payload ---
api.portal.set_registry_record("imio.esign.seal_code", u"PADES_SEAL")
- self.addCleanup(api.portal.set_registry_record, "imio.esign.seal_email", u"")
self.addCleanup(api.portal.set_registry_record, "imio.esign.seal_code", u"")
+ sid4, _session = add_files_to_session([], (self.uids[3],), seal="SEAL")
mock_response = Mock()
mock_response.status_code = 200
mock_response.text = '{"message": "OK"}'
- with patch("imio.esign.utils.requests.post", return_value=mock_response) as mock_post:
- with patch("imio.esign.utils.get_auth_token", return_value="test-token"):
- create_external_session(sid, esign_root_url="http://test.example.com")
+ with patch("imio.esign.utils.requests.post", return_value=mock_response) as mock_post: # real HTTP call
+ with patch("imio.esign.utils.get_auth_token", return_value="test-token"): # remote OAuth
+ create_external_session(sid4, esign_root_url="http://test.example.com")
+ self.assertEqual(mock_post.call_args[0][0], "http://test.example.com/imio/esign/v1/luxtrust/sessions")
payload = json.loads(mock_post.call_args[1]["data"]["data"])
self.assertEqual(
payload,
@@ -781,13 +563,18 @@ def test_create_external_session_seal_payload(self):
u"imioAppSessionId": u"012345600000",
u"vatNumber": None,
u"endpointUrl": u"http://nohost/plone/@external_session_feedback",
- u"sessionName": u"Session 0",
+ u"sessionName": u"Session {}".format(sid4),
u"documentData": [
{
u"uniqueCode": u"012345600000000__{}".format(self.uids[0]),
u"docUuid": get_suid_from_uuid(self.uids[0]),
u"filename": u"annex0.pdf",
- }
+ },
+ {
+ u"uniqueCode": u"012345600000003__{}".format(self.uids[3]),
+ u"docUuid": get_suid_from_uuid(self.uids[3]),
+ u"filename": u"annex3.pdf",
+ },
],
},
u"sealData": {
@@ -798,22 +585,21 @@ def test_create_external_session_seal_payload(self):
},
},
)
-
- def test_create_external_session_both_payload(self):
- """Session with signers and seal: both signData and sealData are present."""
- signers = [("user1", "user1@sign.com", "User 1", "Position 1")]
- sid, _session = add_files_to_session(signers, (self.uids[0],), seal="SEAL")
- api.portal.set_registry_record("imio.esign.seal_email", u"seal@example.com")
- api.portal.set_registry_record("imio.esign.seal_code", u"PADES_SEAL")
- set_esign_registry_external_watchers(u"example@imlo.be") # Also test with one external watcher
- self.addCleanup(api.portal.set_registry_record, "imio.esign.seal_email", u"")
- self.addCleanup(api.portal.set_registry_record, "imio.esign.seal_code", u"")
+ posted_files = mock_post.call_args[1]["files"]
+ self.assertIsInstance(posted_files, list)
+ self.assertEqual([f[0] for f in posted_files], ["files", "files"])
+ self.assertEqual([f[1][0] for f in posted_files], [u"annex0.pdf", u"annex3.pdf"])
+
+ # --- combined sign + seal payload ---
+ sid5, _session = add_files_to_session(signers, (self.uids[4],), seal="SEAL")
+ set_esign_registry_external_watchers(u"example@imlo.be")
+ self.addCleanup(set_esign_registry_external_watchers, u"")
mock_response = Mock()
mock_response.status_code = 200
mock_response.text = '{"message": "OK"}'
- with patch("imio.esign.utils.requests.post", return_value=mock_response) as mock_post:
- with patch("imio.esign.utils.get_auth_token", return_value="test-token"):
- create_external_session(sid, esign_root_url="http://test.example.com")
+ with patch("imio.esign.utils.requests.post", return_value=mock_response) as mock_post: # real HTTP call
+ with patch("imio.esign.utils.get_auth_token", return_value="test-token"): # remote OAuth
+ create_external_session(sid5, esign_root_url="http://test.example.com")
payload = json.loads(mock_post.call_args[1]["data"]["data"])
self.assertEqual(
payload,
@@ -821,18 +607,18 @@ def test_create_external_session_both_payload(self):
u"signData": {
u"acroform": True,
u"users": [u"user1@sign.com"],
- u"watchers": [u'example@imlo.be'],
+ u"watchers": [u"example@imlo.be"],
},
u"commonData": {
- u"imioAppSessionId": u"012345600000",
+ u"imioAppSessionId": u"012345600002",
u"vatNumber": None,
u"endpointUrl": u"http://nohost/plone/@external_session_feedback",
- u"sessionName": u"Session 0",
+ u"sessionName": u"Session {}".format(sid5),
u"documentData": [
{
- u"uniqueCode": u"012345600000000__{}".format(self.uids[0]),
- u"docUuid": get_suid_from_uuid(self.uids[0]),
- u"filename": u"annex0.pdf",
+ u"uniqueCode": u"012345600000004__{}".format(self.uids[4]),
+ u"docUuid": get_suid_from_uuid(self.uids[4]),
+ u"filename": u"annex4.pdf",
}
],
},
@@ -840,56 +626,23 @@ def test_create_external_session_both_payload(self):
u"sealCode": u"PADES_SEAL",
u"acroform": True,
u"users": [u"seal@example.com"],
- u"watchers": [u'example@imlo.be'],
+ u"watchers": [u"example@imlo.be"],
},
},
)
-
-
-# example of annotation content
-"""
-{
- "numbering": 1,
- "uids": {"3c0528c0ad364641be8b9cbaedbf6620": 0},
- "c_uids": {"f66b3da2d2e947fd81ab65e3e36c039d": ["3c0528c0ad364641be8b9cbaedbf6620"]},
- "sessions": {
- 0: {
- "acroform": True,
- "cliend_id": "0123456",
- "discriminators": (),
- "files": [
- {
- "context_uid": "f66b3da2d2e947fd81ab65e3e36c039d",
- "scan_id": "012345600000000",
- "title": u"Annex 0",
- "uid": "3c0528c0ad364641be8b9cbaedbf6620",
- "filename": u"annex0.pdf",
- }
- ],
- "last_update": datetime.datetime(2025, 8, 13, 13, 22, 41, 107895),
- "returns": []
- "seal": None,
- "sign_id": None,
- "sign_url": None,
- "signers": [
- {
- "status": "",
- "userid": "user1",
- "email": "user1@sign.com",
- "fullname": "User 1",
- "position": "Position 1",
- },
- {
- "status": "",
- "userid": "user2",
- "email": "user2@sign.com",
- "fullname": "User 2",
- "position": "Position 2",
- },
- ],
- "state": "draft",
- "title": "my title",
- }
- },
-}
-"""
+ posted_files = mock_post.call_args[1]["files"]
+ self.assertIsInstance(posted_files, list)
+ self.assertEqual([f[0] for f in posted_files], ["files"])
+ self.assertEqual([f[1][0] for f in posted_files], [u"annex4.pdf"])
+
+ # --- no resolvable files: returns _no_files_, no HTTP call made ---
+ sid6, session6 = add_files_to_session(signers, (self.uids[5],))
+ for i in range(len(session6["files"])):
+ session6["files"][i]["uid"] = "nonexistent_uid_{}".format(i)
+ with patch("imio.esign.utils.get_auth_token", return_value="test-token"): # remote OAuth
+ with patch(
+ "imio.esign.utils.requests"
+ ) as mock_requests: # real HTTP call; whole module patched to assert .post not called
+ result = create_external_session(sid6, esign_root_url="http://test.example.com")
+ self.assertEqual(result, "_no_files_")
+ mock_requests.post.assert_not_called()
diff --git a/src/imio/esign/utils.py b/src/imio/esign/utils.py
index 659556e..e74a18b 100644
--- a/src/imio/esign/utils.py
+++ b/src/imio/esign/utils.py
@@ -183,9 +183,9 @@ def create_external_session(session_id, esign_root_url=None):
data_payload = {
"commonData": {
"endpointUrl": portal.absolute_url() + "/@external_session_feedback",
- "documentData": [{"filename": filename, "uniqueCode": "{}__{}".format(unique_code, uid),
- "docUuid": get_suid_from_uuid(uid)}
- for unique_code, filename, z, uid in files],
+ "documentData": [{"filename": filename, "uniqueCode": "{}__{}".format(unique_code, fuid),
+ "docUuid": get_suid_from_uuid(fuid)}
+ for unique_code, filename, z, fuid in files],
"imioAppSessionId": session["sign_id"],
"sessionName": session["title"],
}
@@ -227,7 +227,7 @@ def create_external_session(session_id, esign_root_url=None):
}
# files_payload = {filename: file_content for z, filename, file_content, uid in files}
- files_payload = [("files", (filename, file_content)) for z, filename, file_content, uid in files]
+ files_payload = [("files", (filename, file_content)) for z, filename, file_content, _uid in files]
# Headers avec autorisation
headers = {