From 31e422cfcff317fa952eab1b8ff372737befa815 Mon Sep 17 00:00:00 2001 From: Chris Date: Wed, 1 Apr 2026 14:36:54 +0200 Subject: [PATCH] Refactored tests + added missing tests --- src/imio/esign/events.py | 3 +- src/imio/esign/testing.py | 87 +- src/imio/esign/tests/base.py | 26 + "src/imio/esign/tests/ic\303\264ne1.png" | Bin 0 -> 3742 bytes src/imio/esign/tests/test_actions.py | 263 ++---- src/imio/esign/tests/test_browser_settings.py | 36 + src/imio/esign/tests/test_browser_views.py | 812 +++++++---------- src/imio/esign/tests/test_services.py | 170 ++++ src/imio/esign/tests/test_setup.py | 16 +- src/imio/esign/tests/test_utils.py | 837 ++++++------------ src/imio/esign/utils.py | 8 +- 11 files changed, 1048 insertions(+), 1210 deletions(-) create mode 100644 src/imio/esign/tests/base.py create mode 100644 "src/imio/esign/tests/ic\303\264ne1.png" create mode 100644 src/imio/esign/tests/test_browser_settings.py create mode 100644 src/imio/esign/tests/test_services.py diff --git a/src/imio/esign/events.py b/src/imio/esign/events.py index 1627868..e7f7eed 100644 --- a/src/imio/esign/events.py +++ b/src/imio/esign/events.py @@ -39,8 +39,7 @@ def on_categorized_annex_updated(annex, event): # check scan_id and filename if update is False: for file_info in file_infos: - if file_info and (annex.scan_id != file_info['scan_id'] or \ - annex.file.filename != file_info['filename']): + if file_info and (annex.scan_id != file_info['scan_id'] or annex.file.filename != file_info['filename']): update = True break diff --git a/src/imio/esign/testing.py b/src/imio/esign/testing.py index 1764616..344131b 100644 --- a/src/imio/esign/testing.py +++ b/src/imio/esign/testing.py @@ -1,15 +1,24 @@ # -*- coding: utf-8 -*- +from collective.iconifiedcategory.utils import calculate_category_id from imio.fpaudit import utils as _fpaudit_utils +from plone import api from plone.app.robotframework.testing import REMOTE_LIBRARY_BUNDLE_FIXTURE from plone.app.testing import applyProfile from plone.app.testing import FunctionalTesting from plone.app.testing import IntegrationTesting +from plone.app.testing import login from plone.app.testing import PLONE_FIXTURE from plone.app.testing import PloneSandboxLayer +from plone.app.testing import setRoles +from plone.app.testing import TEST_USER_ID +from plone.app.testing import TEST_USER_NAME +from plone.namedfile.file import NamedBlobFile +from plone.namedfile.file import NamedBlobImage from plone.testing import z2 from zope.globalrequest import setLocal import imio.esign # noqa: F401 +import os logged_actions = [] @@ -27,9 +36,6 @@ class ImioEsignLayer(PloneSandboxLayer): defaultBases = (PLONE_FIXTURE,) def setUpZope(self, app, configurationContext): - # Load any other ZCML that is required for your tests. - # The z3c.autoinclude feature is disabled in the Plone fixture base - # layer. import plone.app.dexterity self.loadZCML(package=plone.app.dexterity) @@ -45,23 +51,92 @@ def setUpPloneSite(self, portal): setLocal("request", portal.REQUEST) applyProfile(portal, "imio.annex:default") applyProfile(portal, "imio.esign:default") + setRoles(portal, TEST_USER_ID, ["Manager"]) + login(portal, TEST_USER_NAME) + self._setup_content_categories(portal) + self._setup_shared_content(portal) + + def _setup_content_categories(self, portal): + """ + Creates: portal/annexes_types/annexes/to_sign (ContentCategory, to_sign=True) + """ + at_folder = api.content.create( + container=portal, + id="annexes_types", + title="Annexes Types", + type="ContentCategoryConfiguration", + exclude_from_nav=True, + ) + category_group = api.content.create( + type="ContentCategoryGroup", + title="Annexes", + container=at_folder, + id="annexes", + ) + icon_path = os.path.join(os.path.dirname(__file__), "tests", u"icône1.png") + with open(icon_path, "rb") as fl: + api.content.create( + type="ContentCategory", + title="To sign", + container=category_group, + icon=NamedBlobImage(fl.read(), filename=u"icône1.png"), + id="to_sign", + predefined_title="To be signed", + to_sign=True, + show_preview=False, + ) + + def _setup_shared_content(self, portal): + """Pre-create folders and annexes shared across multiple test classes. + + Creates: + portal/folder0/annex0,2,4,6,8,10 (annex1.pdf) + portal/folder1/annex1,3,5,7,9,11 (annex2.pdf) + """ + pdf_files = ["annex1.pdf", "annex2.pdf"] + for f in range(2): + api.content.create(container=portal, type="Folder", id="folder{}".format(f), title="Folder {}".format(f)) + for i in range(12): + self._create_annex( + portal, + portal["folder{}".format(i % 2)], + "annex{}".format(i), + "Annex {}".format(i), + "0123456000000{:02d}".format(i), + pdf_filename=pdf_files[i % 2], + ) + + def _create_annex(self, portal, container, annex_id, title, scan_id, pdf_filename="annex1.pdf"): + tests_dir = os.path.join(os.path.dirname(__file__), "tests") + category_id = calculate_category_id(portal["annexes_types"]["annexes"]["to_sign"]) + with open(os.path.join(tests_dir, pdf_filename), "rb") as f: + return api.content.create( + container=container, + type="annex", + id=annex_id, + title=title, + content_category=category_id, + scan_id=scan_id, + file=NamedBlobFile( + data=f.read(), + filename=u"{}.pdf".format(annex_id), + contentType="application/pdf", + ), + ) IMIO_ESIGN_FIXTURE = ImioEsignLayer() - IMIO_ESIGN_INTEGRATION_TESTING = IntegrationTesting( bases=(IMIO_ESIGN_FIXTURE,), name="ImioEsignLayer:IntegrationTesting", ) - IMIO_ESIGN_FUNCTIONAL_TESTING = FunctionalTesting( bases=(IMIO_ESIGN_FIXTURE,), name="ImioEsignLayer:FunctionalTesting", ) - IMIO_ESIGN_ACCEPTANCE_TESTING = FunctionalTesting( bases=( IMIO_ESIGN_FIXTURE, diff --git a/src/imio/esign/tests/base.py b/src/imio/esign/tests/base.py new file mode 100644 index 0000000..f634313 --- /dev/null +++ b/src/imio/esign/tests/base.py @@ -0,0 +1,26 @@ +# -*- coding: utf-8 -*- +"""Shared base test class for imio.esign tests.""" +from imio.esign.testing import IMIO_ESIGN_INTEGRATION_TESTING +from plone.app.testing import login +from plone.app.testing import setRoles +from plone.app.testing import TEST_USER_ID +from plone.app.testing import TEST_USER_NAME + +import os +import unittest + + +TESTS_DIR = os.path.dirname(__file__) + + +class BaseEsignTest(unittest.TestCase): + """Base class: shared layer, minimal setUp, and optionnal helpers.""" + + layer = IMIO_ESIGN_INTEGRATION_TESTING + + def setUp(self): + self.portal = self.layer["portal"] + self.request = self.layer["request"] + self.request.form.clear() + setRoles(self.portal, TEST_USER_ID, ["Manager"]) + login(self.portal, TEST_USER_NAME) diff --git "a/src/imio/esign/tests/ic\303\264ne1.png" "b/src/imio/esign/tests/ic\303\264ne1.png" new file mode 100644 index 0000000000000000000000000000000000000000..a8047778c79b6e610111e0174c7fe6ea95ee7cb1 GIT binary patch literal 3742 zcmZuz2{=@5`yNc$rLw1t$&zBmE;3^`M21i^_8A6a51EOqS+ZphQ^=AvOHoAjJu=Cj z&}2*Yl6~^=tMB`M|KI;R*Y%$Bp8LJ;=Xvhueb05BNJ9ezD-$0R003as)zQ3u_+&pC z495@e0zVEg005I3UPHrBS3?72=;>~ccfkPwI;lP!S)^W7-@i#ctS{G|?a;z_wZ(%=bdDBaGkK4Lh2RG___YVw@RQ zOXYq{lBk{Nsj>O$kuir=|B+gN;eH6P#T!KvB%6)mSXpegICZ+GZHtzNorVoX(?qei z2O_pLX$SY~Vd?3GIvtgpE07?&x^eMFLlLjxgTMr^f`-s>0F%~`ymEjD_bhF)ab-KN z^a1egMv9npxRf+kHe=2+@N`?b%VxpcrMm94RFRo%K6D@N8`;d5I>SklFeNtzm~D|J z{MjM*EDvcAjy$#-8(Om?Gh=i#lgE0`JEQPkld#D#*;02N6|48 zK+JcTi2_FIV)IXqq~T}3cG>T)R~qkNr7k76UZdP6>XSFBSRP1n2bH@&)S?kayQVuP zESK^xFQbR&9CsSWrp^TzQ(;}cd#39mOXN(om`wDP%hEGwJr`h8$n4SC%7zqEK$Yup z)mh3jvv$vOpEtPCX-C{qOZc@a*>uwc)oYlm=9`@DL=xZ`#XryCZ zm~t>G9%9E78aPA=@i&&dNY80`BVXb>XraohrUvx(J% znI#CJ$`szLCrL9IjAsa0W9Q6KD~{*NJCUwF%znm<;f{uy;bjiGc8$n*_C(d4_acQn z7w;8kYZ$^d&U^FKGFaZ*eM0{pBzv4n;~#o5jdlRTItq9dG*)qip8?U*^I4O~`Iw%& zX`FJkh6~PC-NH>dvjz|kvu;UZQ0-!%>(@pxoLSdYk4Fj7oD_6U0O!#bX_4aD^3J@D z)k(NDLw7tP8R-)~OO`MH+Me1|9QOwd6ezwjg-Y@(f0W~}p(r$8P!0HntF?B(Ut*jK0b>f?g zlN_rdCz`$1uy+L;m;(6&&rQV%bTo>_GnxUNLrw@j1PQqanXvOQ%rj6!PqxS!1s5W{ z!Qt$8+AOeQ&mWAcz0u_tF-YRj4Hx3RE+m#-6>OsG1hy1`iPQ@wbt~B+JbBQnR>(4; zERePEi9{8{SG3qS6{2%QN5$_^g3&y+aRhqtv169S+D7T)gOvTl1crM zI^A8GRFdGFvY6^8j7%O$C8SzIvP7)~1F!zd->;!HYF2GF4$iH(k*!UsYJr96YLRkM zu=lYfY$W#H7rAyL;WvrFiPfi&MXC!GAB3L@@~ICzz4M`_>ROf3N8Crrvj~N}y@8~K z_6>or^&5;hVVwEwg<1YtzS+vR*N?YI+HA{Q=Y1NJ6jK)y_d|B>HF=smE?+ASNyG``H0HE71SyFn7mx{=Ml;Vs>fSa)CrYJa3`|R%%;ViE_v_sO&p!nFujgP^-46HYTVe^`I!Ib-B@M zHryzyG%K$W(P$p{w&^|SqfNBhqvpcwsO&~eCMK6+d%ifKr2k^BS?-mN%eJp>MXkf9 z%|$9AGjtPmW4o^zj}A_n*wm7%$Slrtjv}%pa&?_K6`#=PXWGM06kCuj z_=G7l%CT5-?t1Y+#ATm}EZwOwB?b@)Knbjtrbb(xPF&ZAE<|)JmmU{EyBpE2HH5gP z<)@XZrI(5lM_0~h$C$-{@?l&8XE^<5vjTZ(4&%QmcE6J~)UNp%r>iSr$o=400_w{iOkz|_zeq5Ybv zZI+R?x@EeYy3@2swU?i7zp(xuJX4e$pPVZuQ`BGd4j)>zT{WOCjESnaWGMp^t%tVC z=1OYRzQqr%n4Zp2?N89Z;DxrYOMCE3|EErj&?Dh{Xy!_%7n2SMT-IBo-jj11Go%@} zp0nLQy7vO)?Bxiz9U6Roh3_}ZtvIulh7FfljK4M@ST@&Eel{$|xq@7E#@2eYdtz~C zrre??2aTQ>RW#T*R<6!qSQf4cc}kMP-tf$(W!q4ozV9gDUadtdh1vUIlhd(QXjx|>T4H!$Y6 zY);_g?Bc4V3t!YJTWzC5pmm_;vd($}84-qf+Sw9!d+5sGh(e)4vV2QyV#o98 zTb=II&p$Q%PM0hg%H+P@23{6aT>sL!EwGx&qC9pd%D;F)l$y2q>b+T%S@!U!;mB8c z@_0Xezo~_wfsC_#XC+h~9o*fB*-R%@zID(eC-iYXex#U12>7nv5w;#3$sM6|(;Hq} zHS-~(tfFkPCpfj-YW=lm(RlsdzG-TSLz$xG75@zX`sHwH?C`m|%!k%X0r-WNtFHA6 z9+lLG)1?m@j``2;TJAw7H*5W89~@+yp^HAgy8>zOJFxl$TkxNx_R|-guu|#uy-ana z+AdA!h%zBBW)`cIQirxj?2GC*mY$5*kSp&y#D&*rKo3%O%8{+d$VKE!+Ev=$5Tyum z%udXN@AHlJwVs&Nf$6YDv0tyYQu<4>0tkEc4%qItHRt}tbd{@8|ff~d)o29?cVYWAM(MrXpp5^@`vk`=2$xXZZzohagJKR2}%`eRR82_10wPs!+^ zgZ2Udcx{dbjs1j79RNTJ#hakKQP=d9Y~5WU7&~_?4nlPGIAj9=%0#8ZsVmMK10uS* zxOpiNVc@?QN{90!7zzgcMe%lqfl=2CK^pF!IFKAf79t6TGl4)LWluYMrR$p7f8>X6 zFtDSyw}%oGN+1v*1R03Crvp?qfmw(we)W-{=I*Hp@$m? zXF7EA@52CRx<|KM4gk<}>1wK(Tt10Qg6W+#Vh<;0PF%Pz6c!Q^_OKcFAzud@#o)um z?M^{fR9D=;76tr{{E)}Yn8y*rP8-4)l93{CpS=zI$xngowYdZy3Et|vBD2~%CWFwZ zaX+mnck|(_f|_!wmuu-_)LCXV$gln?Hu%VGs%6(ZbPuVl9F(2TBYbMQGv~s&ftmsz=g+lZWW9&&XpG<`JO_5DyT?gb{k*Fyg0LPY_L#x z7^_rIq1=_^d2A1F8nH&D8h#X%TZ{muC`-_E}u^;US1@9^* z2#3yitnIkl2(Fs&3l=NAGF^+|Emdky86+1PWRDvuW;hA cmu6G})!Z__=dHwhM<2bemVsuOx=qNx0DLUVrvLx| literal 0 HcmV?d00001 diff --git a/src/imio/esign/tests/test_actions.py b/src/imio/esign/tests/test_actions.py index 65b097e..71fb416 100644 --- a/src/imio/esign/tests/test_actions.py +++ b/src/imio/esign/tests/test_actions.py @@ -1,29 +1,23 @@ # -*- coding: utf-8 -*- """actions tests for this package.""" from AccessControl import Unauthorized -from collective.iconifiedcategory.utils import calculate_category_id +from imio.esign.browser.actions import RemoveFromSessionView +from imio.esign.browser.actions import RemoveItemFromSessionView from imio.esign.browser.actions import SessionAnnotationInfoView -from imio.esign.testing import IMIO_ESIGN_INTEGRATION_TESTING +from imio.esign.tests.base import BaseEsignTest from imio.esign.utils import add_files_to_session from imio.esign.utils import get_session_annotation from plone import api from plone.app.testing import login from plone.app.testing import setRoles from plone.app.testing import TEST_USER_ID -from plone.namedfile.file import NamedBlobFile -from plone.namedfile.file import NamedBlobImage -from Products.statusmessages.interfaces import IStatusMessage -from zope.component import getMultiAdapter - -import collective.iconifiedcategory -import os -import unittest try: from html import unescape except ImportError: # Python 2 from HTMLParser import HTMLParser + unescape = HTMLParser().unescape try: @@ -32,91 +26,35 @@ string_types = (str,) -class BaseRemoveFromSession(unittest.TestCase): - """Base class to centralize setUp.""" - - layer = IMIO_ESIGN_INTEGRATION_TESTING - - def _setup_categories(self): - at_folder = api.content.create( - container=self.portal, - id="annexes_types", - title="Annexes Types", - type="ContentCategoryConfiguration", - exclude_from_nav=True, - ) - category_group = api.content.create( - type="ContentCategoryGroup", - title="Annexes", - container=at_folder, - id="annexes", - ) - icon_path = os.path.join(os.path.dirname(collective.iconifiedcategory.__file__), "tests", "icône1.png") - with open(icon_path, "rb") as fl: - api.content.create( - type="ContentCategory", - title="To sign", - container=category_group, - icon=NamedBlobImage(fl.read(), filename=u"icône1.png"), - id="to_sign", - predefined_title="To be signed", - to_sign=True, - show_preview=False, - ) +class TestRemoveItemFromSessionView(BaseEsignTest): + """Tests for RemoveItemFromSessionView browser view.""" def setUp(self): - self.portal = self.layer["portal"] - self.request = self.portal.REQUEST - setRoles(self.portal, TEST_USER_ID, ["Manager"]) - self._setup_categories() - # add users and annexes - api.user.create(email="user1@sign.com", username="user1", password="password1") - self.folder = api.content.create( - container=self.portal, type="Folder", id="test_folder", title="Test Folder" - ) - tests_dir = os.path.dirname(__file__) - self.annexes = [] - for i in range(3): - with open(os.path.join(tests_dir, "annex1.pdf"), "rb") as f: - annex = api.content.create( - container=self.folder, - type="annex", - id="annex{}".format(i), - title="Annex {}".format(i), - content_category=calculate_category_id(self.portal["annexes_types"]["annexes"]["to_sign"]), - scan_id="0123456000000{:02d}".format(i), - file=NamedBlobFile(data=f.read(), filename=u"annex{}.pdf".format(i), contentType="application/pdf"), - ) - self.annexes.append(annex) + super(TestRemoveItemFromSessionView, self).setUp() + api.user.create(email="user1@sign.com", username="user1", password="password1") # noqa: S106 + self.folder = self.portal["folder0"] + self.annexes = [self.portal["folder0"]["annex{}".format(i)] for i in (0, 2, 4)] self.signers = [("user1", "user1@sign.com", "User 1", "Position 1")] - - -class TestRemoveItemFromSessionView(BaseRemoveFromSession): - """Test RemoveItemFromSessionView browser view.""" + self.view = RemoveItemFromSessionView(self.annexes[0], self.request) def test_available(self): - """Test available method returns True.""" - view = getMultiAdapter((self.annexes[0], self.request), name="remove-item-from-esign-session") - self.assertFalse(view.available()) - uids = [a.UID() for a in self.annexes] - add_files_to_session(self.signers, uids) - self.assertTrue(view.available()) + """Returns False when annex not in session; True once added.""" + self.assertFalse(self.view.available()) + add_files_to_session(self.signers, [a.UID() for a in self.annexes]) + self.assertTrue(self.view.available()) - def test_index_removes_file_from_session(self): - """Test index() removes the file from the esign session.""" + def test_index(self): + """Removes file from session; removes entire session when last file is removed.""" annot = get_session_annotation() - # Add files to a session uids = [a.UID() for a in self.annexes] + + # --- remove one file from a multi-file session --- add_files_to_session(self.signers, uids) self.assertEqual(len(annot["sessions"]), 1) self.assertEqual(len(annot["uids"]), 3) session = annot["sessions"][0] self.assertEqual(len(session["files"]), 3) - - # Remove one file via the view - view = getMultiAdapter((self.annexes[0], self.request), name="remove-item-from-esign-session") - view.index() - # The file should be removed from the session + self.view.index() self.assertNotIn(uids[0], annot["uids"]) self.assertEqual(len(annot["uids"]), 2) self.assertEqual(len(session["files"]), 2) @@ -125,85 +63,67 @@ def test_index_removes_file_from_session(self): self.assertIn(uids[1], remaining_uids) self.assertIn(uids[2], remaining_uids) - def test_index_removes_last_file_removes_session(self): - """Test removing the last file from a session also removes the session.""" - annot = get_session_annotation() - uids = [self.annexes[0].UID()] - add_files_to_session(self.signers, uids) - self.assertEqual(len(annot["sessions"]), 1) - - view = getMultiAdapter((self.annexes[0], self.request), name="remove-item-from-esign-session") - view.index() - self.assertEqual(len(annot["sessions"]), 0) - self.assertEqual(len(annot["uids"]), 0) + # --- removing the last file removes the session entirely --- + add_files_to_session(self.signers, [uids[0]], discriminators=("single",)) + session_count = len(annot["sessions"]) + self.view.index() + self.assertEqual(len(annot["sessions"]), session_count - 1) + self.assertNotIn(uids[0], annot["uids"]) - def test_finished_shows_message_and_redirects(self): - """Test _finished sets a status message and redirects.""" - self.request.environ['HTTP_REFERER'] = self.annexes[0].absolute_url() - view = getMultiAdapter((self.annexes[0], self.request), name="remove-item-from-esign-session") - view._finished() - messages = IStatusMessage(self.request).show() - self.assertEqual(len(messages), 1) - self.assertIn("removed from session", messages[0].message) + def test_finished(self): + """Redirects to referrer.""" + self.request.environ["HTTP_REFERER"] = self.annexes[0].absolute_url() + self.view._finished() self.assertEqual(self.request.RESPONSE.getHeader("location"), self.annexes[0].absolute_url()) -class TestRemoveFromSessionView(BaseRemoveFromSession): - """Test RemoveFromSessionView browser view.""" +class TestRemoveFromSessionView(BaseEsignTest): + """Tests for RemoveFromSessionView browser view.""" + + def setUp(self): + super(TestRemoveFromSessionView, self).setUp() + api.user.create(email="user1@sign.com", username="user1", password="password1") # noqa: S106 + self.folder = self.portal["folder0"] + self.annexes = [self.portal["folder0"]["annex{}".format(i)] for i in (0, 2, 4)] + self.signers = [("user1", "user1@sign.com", "User 1", "Position 1")] def test_available(self): - """Test available method returns True.""" + """False on the annex itself; True on parent folder once session exists; False again after removal.""" annot = get_session_annotation() self.assertFalse(annot["sessions"]) - # only available on "context_uid" annex = self.annexes[0] - view = getMultiAdapter((annex, self.request), name="remove-from-esign-session") + # View on the annex itself is never available (must be on parent/context) + view = RemoveFromSessionView(self.annexes[0], self.request) self.assertFalse(view.available()) add_files_to_session(self.signers, [annex.UID()]) self.assertTrue(annot["sessions"]) self.assertFalse(view.available()) - # available on parent - folder = annex.aq_parent - view = getMultiAdapter((folder, self.request), name="remove-from-esign-session") - self.assertTrue(view.available()) - # call the view so context is removed so no more session - view() + # Available on parent container once a session exists + folder_view = RemoveFromSessionView(annex.aq_parent, self.request) + self.assertTrue(folder_view.available()) + # Calling the view clears the session + folder_view.index() self.assertFalse(annot["sessions"]) - self.assertFalse(view.available()) + self.assertFalse(folder_view.available()) -class TestSessionAnnotationInfoView(BaseRemoveFromSession): - """Test SessionAnnotationInfoView""" +class TestSessionAnnotationInfoView(BaseEsignTest): + """Tests for SessionAnnotationInfoView.""" def setUp(self): - self.portal = self.layer["portal"] - self.request = self.portal.REQUEST - setRoles(self.portal, TEST_USER_ID, ["Manager"]) - self._setup_categories() - self.folder = api.content.create( - container=self.portal, type="Folder", id="test_session_folder", title="Test Session Folder" - ) - tests_dir = os.path.dirname(__file__) - self.annexes = [] - for i in range(2): - with open(os.path.join(tests_dir, "annex1.pdf"), "rb") as f: - annex = api.content.create( - container=self.folder, - type="annex", - id="annex{}".format(i), - title=u"Annex {}".format(i), - content_category=calculate_category_id(self.portal["annexes_types"]["annexes"]["to_sign"]), - scan_id="012345600000{:02d}".format(i), - file=NamedBlobFile(data=f.read(), filename=u"annex{}.pdf".format(i), contentType="application/pdf"), - ) - self.annexes.append(annex) + super(TestSessionAnnotationInfoView, self).setUp() + api.user.create(email="user1@sign.com", username="user1", password="password1") # noqa: S106 + api.user.create(email="user2@sign.com", username="user2", password="password2") # noqa: S106 + self.folder = self.portal["folder0"] + self.annexes = [self.portal["folder0"]["annex{}".format(i)] for i in (0, 2)] self.signers = [ ("user1", "user1@sign.com", u"User 1", u"Position 1"), ("user2", "user2@sign.com", u"User 2", u"Position 2"), ] - self.view = SessionAnnotationInfoView(self.folder, self.portal.REQUEST) + self.view = SessionAnnotationInfoView(self.folder, self.request) def test_call(self): + """Raises Unauthorized for non-Manager; returns HTML string for admin.""" setRoles(self.portal, TEST_USER_ID, ["Member"]) with self.assertRaises(Unauthorized): self.view() @@ -211,71 +131,56 @@ def test_call(self): self.assertIsInstance(self.view(), string_types) def test_render_value(self): - # Dict + """Renders dicts, lists, tuples, and strings to escaped HTML.""" + # Dict: empty and with content self.assertEqual(self.view._render_value({}), u"{}") self.assertEqual( self.view._render_value({"key": "val"}), u"{\n 'key': 'val',\n}", ) - - # Indentation: nested value increases indent level + # Nested value increases indent level self.assertEqual( self.view._render_value({"key": ["a"]}), u"{\n 'key': [\n 'a',\n ],\n}", ) - - # List + # List: empty and with items self.assertEqual(self.view._render_value([]), u"[]") self.assertEqual( self.view._render_value(["a", "b"]), u"[\n 'a',\n 'b',\n]", ) - - # Tuple + # Tuple treated like list self.assertEqual(self.view._render_value(()), u"[]") - - # String + # String rendered with u'' prefix self.assertEqual(self.view._render_value(u"hello"), u"u'hello'") def test_uid_to_link(self): + """Returns clickable link for known UID; span with 'not found' title for unknown.""" uid = self.folder.UID() - result = self.view._uid_to_link(uid) self.assertEqual( - result, - u"Test Session Folder", + self.view._uid_to_link(uid), + u"Folder 0", ) - - result = self.view._uid_to_link(u"a" * 32) self.assertEqual( - result, + self.view._uid_to_link(u"a" * 32), u"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", ) def test_esign_sessions(self): + """Returns all sessions; supports session_id and context_uid filters; renders HTML correctly.""" uids = [a.UID() for a in self.annexes] add_files_to_session(self.signers, uids, title=u"[ia.parapheo] Session {sign_id}") - # Add a second session with a separate folder/annex - folder2 = api.content.create( - container=self.portal, type="Folder", id="test_session_folder2", title="Test Session Folder 2" + # Add a second session with a different folder and annex + folder2 = self.portal["folder1"] + annex2 = folder2["annex1"] + add_files_to_session( + self.signers, [annex2.UID()], title=u"[ia.parapheo] Session {sign_id}", discriminators=(u"second",) ) - tests_dir = os.path.dirname(__file__) - with open(os.path.join(tests_dir, "annex1.pdf"), "rb") as f: - annex2 = api.content.create( - container=folder2, - type="annex", - id="annex2", - title=u"Annex 2", - content_category=calculate_category_id(self.portal["annexes_types"]["annexes"]["to_sign"]), - scan_id="01234560000002", - file=NamedBlobFile(data=f.read(), filename=u"annex2.pdf", contentType="application/pdf"), - ) - add_files_to_session(self.signers, [annex2.UID()], title=u"[ia.parapheo] Session {sign_id}", - discriminators=(u"second",)) view = SessionAnnotationInfoView(self.folder, self.portal.REQUEST) - # No filter params — returns all sessions + # No filter — returns all sessions esign_sessions = view.esign_sessions self.assertEqual(len(esign_sessions), 2) esign_session = esign_sessions[0] @@ -304,7 +209,7 @@ def test_esign_sessions(self): self.assertEqual(len(view.esign_sessions), 0) del self.portal.REQUEST.form["context_uid"] - # Test rendered HTML + # Rendered HTML for first session self.assertEqual( unescape(view.esign_session_html(esign_session[1])), u"""{{ @@ -313,20 +218,20 @@ def test_esign_sessions(self): 'discriminators': [], 'files': [ {{ - 'context_uid': Test Session Folder, + 'context_uid': Folder 0, 'filename': u'annex0.pdf', - 'scan_id': '01234560000000', + 'scan_id': '012345600000000', 'status': '', 'title': u'Annex 0', - 'uid': Annex 0, + 'uid': Annex 0, }}, {{ - 'context_uid': Test Session Folder, - 'filename': u'annex1.pdf', - 'scan_id': '01234560000001', + 'context_uid': Folder 0, + 'filename': u'annex2.pdf', + 'scan_id': '012345600000002', 'status': '', - 'title': u'Annex 1', - 'uid': Annex 1, + 'title': u'Annex 2', + 'uid': Annex 2, }}, ], 'last_update': {}, @@ -355,6 +260,6 @@ def test_esign_sessions(self): 'title': u'[ia.parapheo] Session 012345600000', 'watchers': [], }}""".format( - repr(esign_session[1]['last_update']), + repr(esign_session[1]["last_update"]), ), ) diff --git a/src/imio/esign/tests/test_browser_settings.py b/src/imio/esign/tests/test_browser_settings.py new file mode 100644 index 0000000..bd721c6 --- /dev/null +++ b/src/imio/esign/tests/test_browser_settings.py @@ -0,0 +1,36 @@ +# -*- coding: utf-8 -*- +"""browser/settings tests for this package.""" +from imio.esign.browser.settings import validate_vat_number +from zope.interface import Invalid + +import unittest + + +class TestValidateVatNumber(unittest.TestCase): + + def test_validate_vat_number(self): + """validate_vat_number: returns True for valid/empty, raises Invalid for bad format or checksum.""" + # empty values are accepted (required constraint is enforced by the field, not the validator) + self.assertTrue(validate_vat_number(u"")) + self.assertTrue(validate_vat_number(None)) + + # must start with BE + with self.assertRaises(Invalid): + validate_vat_number(u"NL0202239951") + + # must be exactly 12 characters + with self.assertRaises(Invalid): + validate_vat_number(u"BE020223995") # 11 chars + with self.assertRaises(Invalid): + validate_vat_number(u"BE02022399510") # 13 chars + + # only digits allowed after BE + with self.assertRaises(Invalid): + validate_vat_number(u"BE020223995X") + + # bad checksum: 97 - (2022399 % 97) = 51, not 99 + with self.assertRaises(Invalid): + validate_vat_number(u"BE0202239999") + + # valid: 97 - (2022399 % 97) = 97 - 46 = 51 + self.assertTrue(validate_vat_number(u"BE0202239951")) diff --git a/src/imio/esign/tests/test_browser_views.py b/src/imio/esign/tests/test_browser_views.py index bfd38f0..d04e870 100644 --- a/src/imio/esign/tests/test_browser_views.py +++ b/src/imio/esign/tests/test_browser_views.py @@ -2,17 +2,16 @@ """Browser views tests for this package.""" from AccessControl import Unauthorized from collections import OrderedDict -from collective.iconifiedcategory.utils import calculate_category_id from datetime import datetime from datetime import timedelta from imio.esign.browser.views import DownloadFileView from imio.esign.browser.views import ExternalSessionCreateView +from imio.esign.browser.views import FacetedSessionInfoViewlet from imio.esign.browser.views import ItemSessionInfoViewlet from imio.esign.browser.views import SessionDeleteView from imio.esign.browser.views import SigningUsersCsv from imio.esign.config import set_esign_registry_signing_users_email_content -from imio.esign.testing import IMIO_ESIGN_FUNCTIONAL_TESTING -from imio.esign.testing import IMIO_ESIGN_INTEGRATION_TESTING +from imio.esign.tests.base import BaseEsignTest from imio.esign.utils import add_files_to_session from imio.esign.utils import get_session_annotation from imio.pyutils.utils import shortuid_encode_id @@ -22,374 +21,214 @@ from plone.app.testing import logout from plone.app.testing import setRoles from plone.app.testing import TEST_USER_ID -from plone.namedfile.file import NamedBlobFile -from plone.namedfile.file import NamedBlobImage from plone.testing import z2 +from Products.statusmessages import STATUSMESSAGEKEY from Products.statusmessages.interfaces import IStatusMessage +from zope.annotation.interfaces import IAnnotations -import collective.iconifiedcategory import json -import os -import transaction import unittest -class _BaseSessionViewTest(unittest.TestCase): - """Base test class with shared setUp for session view tests.""" +def _clear_status_messages(request): + """Clear status messages from request annotations (needed after redirects since show() skips clearing on 3xx).""" + annotations = IAnnotations(request) + annotations[STATUSMESSAGEKEY] = None + request.response.expireCookie(STATUSMESSAGEKEY, path="/") - layer = IMIO_ESIGN_INTEGRATION_TESTING - def setUp(self): - self.portal = self.layer["portal"] - self.request = self.layer["request"] - self.request.form.clear() - setRoles(self.portal, TEST_USER_ID, ["Manager"]) +class TestSessionDeleteView(BaseEsignTest): + """Tests for SessionDeleteView.""" - # Create user for signing + def setUp(self): + super(TestSessionDeleteView, self).setUp() api.user.create(email="user1@sign.com", username="user1", password="password1") # noqa: S106 - - # Create content category configuration - at_folder = api.content.create( - container=self.portal, - id="annexes_types", - title="Annexes Types", - type="ContentCategoryConfiguration", - exclude_from_nav=True, - ) - category_group = api.content.create( - type="ContentCategoryGroup", - title="Annexes", - container=at_folder, - id="annexes", - ) - icon_path = os.path.join( - os.path.dirname(collective.iconifiedcategory.__file__), "tests", "icône1.png" - ) - with open(icon_path, "rb") as fl: - api.content.create( - type="ContentCategory", - title="To sign", - container=category_group, - icon=NamedBlobImage(fl.read(), filename=u"icône1.png"), - id="to_sign", - predefined_title="To be signed", - to_sign=True, - show_preview=False, - ) - - # Create folder and annex - self.folder = api.content.create( - container=self.portal, - type="Folder", - id="test_folder", - title="Test Folder", - ) - tests_dir = os.path.dirname(__file__) - with open(os.path.join(tests_dir, "annex1.pdf"), "rb") as f: - annex = api.content.create( - container=self.folder, - type="annex", - id="test_annex", - title="Test Annex", - content_category=calculate_category_id( - self.portal["annexes_types"]["annexes"]["to_sign"] - ), - scan_id="012345600000001", - file=NamedBlobFile( - data=f.read(), - filename=u"annex1.pdf", - contentType="application/pdf", - ), - ) - self.annex_uid = annex.UID() - - # Seed a session in the annotation + self.folder = self.portal["folder0"] + annex = self.portal["folder0"]["annex0"] signers = [("user1", "user1@sign.com", "User 1", "Position 1")] - self.session_id, _ = add_files_to_session(signers, (self.annex_uid,)) - - -class TestSessionDeleteView(_BaseSessionViewTest): - """Test SessionDeleteView browser view.""" + self.session_id, _session = add_files_to_session(signers, (annex.UID(),)) + self.view = SessionDeleteView(self.folder, self.request) def test_may_delete_session(self): - """Manager role grants may_delete_session.""" - view = SessionDeleteView(self.folder, self.request) - self.assertTrue(view.may_delete_session()) + """Manager: True. Member-only: False and raises Unauthorized on call.""" + # --- Manager: allowed --- + self.assertTrue(self.view.may_delete_session()) - def test_may_delete_session_no_permission(self): - """Member-only role denies may_delete_session.""" + # --- Member-only: denied --- setRoles(self.portal, TEST_USER_ID, ["Member"]) - view = SessionDeleteView(self.folder, self.request) - self.assertFalse(view.may_delete_session()) + self.assertFalse(self.view.may_delete_session()) with self.assertRaises(Unauthorized): - view() + self.view() - def test_call_no_session_id(self): - """Missing esign_session_id produces an error message and redirects to context URL.""" - view = SessionDeleteView(self.folder, self.request) - view() - messages = IStatusMessage(self.request).show() - self.assertEqual(len(messages), 1) - self.assertIn("No session ID provided!", messages[0].message) - self.assertEqual(messages[0].type, "error") - location = self.request.RESPONSE.getHeader("location") - self.assertEqual(location, self.folder.absolute_url()) + def test_call(self): + """Missing id → error + context URL; valid id → success + session removed; unknown id → error.""" + annot = get_session_annotation() - def test_call_session_exists(self): - """Valid esign_session_id removes the session and shows a success message.""" - self.request.form["esign_session_id"] = str(self.session_id) - view = SessionDeleteView(self.folder, self.request) - view() - messages = IStatusMessage(self.request).show() - self.assertEqual(len(messages), 1) - self.assertIn("Session deleted successfully!", messages[0].message) - self.assertEqual(messages[0].type, "info") - location = self.request.RESPONSE.getHeader("location") - self.assertIn("@@parapheo", location) - annotation = get_session_annotation() - self.assertNotIn(self.session_id, annotation["sessions"]) + # --- no session id --- + self.view() + self.assertEqual(len(annot["sessions"]), 1) + self.assertIn(self.session_id, annot["sessions"]) - def test_call_session_not_found(self): - """Non-existent esign_session_id shows an error and redirects to @@parapheo.""" + # --- unknown session id --- self.request.form["esign_session_id"] = "9999" - view = SessionDeleteView(self.folder, self.request) - view() - messages = IStatusMessage(self.request).show() - self.assertEqual(len(messages), 1) - self.assertIn("Session not found!", messages[0].message) - self.assertEqual(messages[0].type, "error") - location = self.request.RESPONSE.getHeader("location") - self.assertIn("@@parapheo", location) + self.view() + self.assertEqual(len(annot["sessions"]), 1) + self.assertIn(self.session_id, annot["sessions"]) + # --- valid session id --- + self.request.other.pop("esign_session_id", None) + self.request.form["esign_session_id"] = str(self.session_id) + self.view() + self.assertNotIn(self.session_id, annot["sessions"]) -class TestExternalSessionCreateView(_BaseSessionViewTest): - """Test ExternalSessionCreateView browser view.""" + +class TestExternalSessionCreateView(BaseEsignTest): + """Tests for ExternalSessionCreateView.""" + + def setUp(self): + super(TestExternalSessionCreateView, self).setUp() + api.user.create(email="user1@sign.com", username="user1", password="password1") # noqa: S106 + self.folder = self.portal["folder0"] + annex = self.portal["folder0"]["annex0"] + signers = [("user1", "user1@sign.com", "User 1", "Position 1")] + self.session_id, _session = add_files_to_session(signers, (annex.UID(),)) + self.view = ExternalSessionCreateView(self.folder, self.request) def test_may_create_external_sessions(self): - """Manager role grants may_create_external_sessions.""" - view = ExternalSessionCreateView(self.folder, self.request) - self.assertTrue(view.may_create_external_sessions()) + """Manager: True. Member-only: False and raises Unauthorized on call.""" + # --- Manager --- + self.assertTrue(self.view.may_create_external_sessions()) - def test_call_unauthorized(self): - """Calling view without permission raises Unauthorized.""" + # --- Member-only --- setRoles(self.portal, TEST_USER_ID, ["Member"]) - view = ExternalSessionCreateView(self.folder, self.request) - self.assertFalse(view.may_create_external_sessions()) + self.assertFalse(self.view.may_create_external_sessions()) with self.assertRaises(Unauthorized): - view() + self.view() + + def test_call(self): + """Missing id → error; sentinel strings → specific errors; 200 → success; non-200 → error. - def test_call_no_session_id(self): - """Missing session_id produces an error message and returns URL with @@parapheo.""" - view = ExternalSessionCreateView(self.folder, self.request) - result = view() + create_external_session mocked: HTTP tested in test_utils.py. + """ + # --- no session id --- + result = self.view() messages = IStatusMessage(self.request).show() - self.assertEqual(len(messages), 1) self.assertIn("No session ID provided!", messages[0].message) self.assertEqual(messages[0].type, "error") - self.assertIn("@@parapheo", result) + self.assertEqual("http://nohost/plone/folder0/@@parapheo", result) + _clear_status_messages(self.request) - def test_call_session_not_found(self): - """create_external_session returning _session_not_found_ shows an error.""" self.request.form["session_id"] = str(self.session_id) - view = ExternalSessionCreateView(self.folder, self.request) - with patch("imio.esign.browser.views.create_external_session") as mock_create: - mock_create.return_value = "_session_not_found_" - result = view() + + def _run(return_value): + _clear_status_messages(self.request) + with patch("imio.esign.browser.views.create_external_session", return_value=return_value): + return ExternalSessionCreateView(self.folder, self.request)() + + # --- session not found sentinel --- + _run("_session_not_found_") messages = IStatusMessage(self.request).show() - self.assertEqual(len(messages), 1) self.assertIn("doesn't exist anymore", messages[0].message) self.assertEqual(messages[0].type, "error") - self.assertIn("@@parapheo", result) - def test_call_no_seal_code(self): - """create_external_session returning _no_seal_code_ shows an error.""" - self.request.form["session_id"] = str(self.session_id) - view = ExternalSessionCreateView(self.folder, self.request) - with patch("imio.esign.browser.views.create_external_session") as mock_create: - mock_create.return_value = "_no_seal_code_" - result = view() + # --- no seal code sentinel --- + _run("_no_seal_code_") messages = IStatusMessage(self.request).show() - self.assertEqual(len(messages), 1) self.assertIn("No seal code", messages[0].message) - self.assertEqual(messages[0].type, "error") - self.assertIn("@@parapheo", result) - def test_call_no_seal_email(self): - """create_external_session returning _no_seal_email_ shows an error.""" - self.request.form["session_id"] = str(self.session_id) - view = ExternalSessionCreateView(self.folder, self.request) - with patch("imio.esign.browser.views.create_external_session") as mock_create: - mock_create.return_value = "_no_seal_email_" - result = view() + # --- no seal email sentinel --- + _run("_no_seal_email_") messages = IStatusMessage(self.request).show() - self.assertEqual(len(messages), 1) self.assertIn("No seal email", messages[0].message) - self.assertEqual(messages[0].type, "error") - self.assertIn("@@parapheo", result) - def test_call_success(self): - """create_external_session returning a 200 response shows a success message.""" - self.request.form["session_id"] = str(self.session_id) - view = ExternalSessionCreateView(self.folder, self.request) - mock_response = Mock() - mock_response.status_code = 200 - with patch("imio.esign.browser.views.create_external_session") as mock_create: - mock_create.return_value = mock_response - result = view() + # --- no files sentinel --- + _run("_no_files_") + messages = IStatusMessage(self.request).show() + self.assertIn("No files", messages[0].message) + + # --- 200 success --- + mock_ok = Mock() + mock_ok.status_code = 200 + result = _run(mock_ok) messages = IStatusMessage(self.request).show() - self.assertEqual(len(messages), 1) self.assertIn("External session sent successfully!", messages[0].message) self.assertEqual(messages[0].type, "info") - self.assertIn("@@parapheo", result) - - def test_call_error_response(self): - """create_external_session returning a non-200 response shows the status details.""" - self.request.form["session_id"] = str(self.session_id) - view = ExternalSessionCreateView(self.folder, self.request) - mock_response = Mock() - mock_response.status_code = 500 - mock_response.reason = "Server Error" - mock_response.text = "oops" - with patch("imio.esign.browser.views.create_external_session") as mock_create: - mock_create.return_value = mock_response - result = view() + self.assertEqual("http://nohost/plone/folder0/@@parapheo", result) + + # --- non-200 error --- + mock_err = Mock() + mock_err.status_code = 500 + mock_err.reason = "Server Error" + mock_err.text = "oops" + result = _run(mock_err) messages = IStatusMessage(self.request).show() - self.assertEqual(len(messages), 1) self.assertIn("500", messages[0].message) self.assertEqual(messages[0].type, "error") - self.assertIn("@@parapheo", result) + self.assertEqual("http://nohost/plone/folder0/@@parapheo", result) @unittest.skip("Test skipped") -class TestDownloadFileView(unittest.TestCase): - """Test DownloadFileView browser view.""" - - layer = IMIO_ESIGN_FUNCTIONAL_TESTING +class TestDownloadFileView(BaseEsignTest): + """Tests for DownloadFileView.""" def setUp(self): - """Set up test fixtures.""" + super(TestDownloadFileView, self).setUp() self.app = self.layer["app"] - self.portal = self.layer["portal"] - self.request = self.layer["request"] - setRoles(self.portal, TEST_USER_ID, ["Manager"]) - - # Setup content category configuration (like in test_utils.py) - at_folder = api.content.create( - container=self.portal, - id="annexes_types", - title="Annexes Types", - type="ContentCategoryConfiguration", - exclude_from_nav=True, - ) - category_group = api.content.create( - type="ContentCategoryGroup", - title="Annexes", - container=at_folder, - id="annexes", - ) - icon_path = os.path.join(os.path.dirname(collective.iconifiedcategory.__file__), "tests", "icône1.png") - with open(icon_path, "rb") as fl: - api.content.create( - type="ContentCategory", - title="To sign", - container=category_group, - icon=NamedBlobImage(fl.read(), filename=u"icône1.png"), - id="to_sign", - predefined_title="To be signed", - to_sign=True, - show_preview=False, - ) - - # Create a folder to hold test annexes - self.folder = api.content.create( - container=self.portal, - type="Folder", - id="test_folder", - title="Test Folder", - ) - - # Create test annexes with NamedBlobFile (Plone 4.3 Archetypes) - tests_dir = os.path.dirname(__file__) - pdf_file = "annex1.pdf" - with open(os.path.join(tests_dir, pdf_file), "rb") as f: - file_data = f.read() - self.test_annex = api.content.create( - container=self.folder, - type="annex", - id="test_annex", - title="Test Annex", - content_category="to_sign", - file=NamedBlobFile( - data=file_data, - filename=u"test_document__uid.pdf", - contentType="application/pdf" - ), - ) - + self.folder = self.portal["folder0"] + self.test_annex = self.folder["annex0"] self.file_uid = self.test_annex.UID() self.encoded_uid = shortuid_encode_id(self.file_uid, separator="-", block_size=5) + self.view = DownloadFileView(self.portal, self.request) + + def test_download_file(self): + """Missing id, invalid id, unknown id, no file attr, expired download, valid download, traversal.""" + logout() + self.assertIsNone(self.view.file_id) + self.assertEqual(self.view.shortuid_separator, "-") + self.assertEqual(self.view.named_blob_file_attribute, "file") + + # --- no file id --- + self.assertIn("A file identifier must be passed in the url", self.view()) + + # --- invalid format --- + self.view.file_id = "$$$" + self.assertIn("This file identifier is not correct", self.view()) + + # --- valid format but non-existent UID --- + self.view.file_id = "aabbccddee" + self.assertIn("The corresponding file identifier cannot be retrieved", self.view()) - # Commit transaction for functional testing - transaction.commit() - - def test_download_file_view(self): - """Test DownloadFileView with various scenarios.""" - logout() # anonymous usage - - # View exists and can be instantiated - view = api.content.get_view("download-file", self.portal, self.request) - self.assertIsInstance(view, DownloadFileView) - view = DownloadFileView(self.portal, self.request) - self.assertEqual(view.file_id, None) - self.assertEqual(view.shortuid_separator, "-") - self.assertEqual(view.named_blob_file_attribute, "file") - - # Download file without UID - view = DownloadFileView(self.portal, self.request) - result = view() - self.assertIn("A file identifier must be passed in the url", result) - # invalid UID format - view.file_id = "$$$" - result = view() - self.assertIn("This file identifier is not correct", result) - # valid format but non-existent UID - view.file_id = "aabbccddee" - result = view() - self.assertIn("The corresponding file identifier cannot be retrieved", result) - # download from object without file attribute + # --- object has no file attribute --- folder_uid = self.folder.UID() encoded_folder_uid = shortuid_encode_id(folder_uid, separator="-", block_size=5) - view.file_id = encoded_folder_uid - result = view() - self.assertIn("The corresponding file content cannot be retrieved", result) + self.view.file_id = encoded_folder_uid + self.assertIn("The corresponding file content cannot be retrieved", self.view()) - # valid id but file too old - view.file_id = self.encoded_uid - view.download_time_delta = timedelta(days=1) + # --- expired download --- + self.view.file_id = self.encoded_uid + self.view.download_time_delta = timedelta(days=1) self.test_annex.setModificationDate(datetime.now() - timedelta(days=3)) - result = view() - self.assertIn("The download period for this file has expired", result) - view.download_time_delta = None # Disable date verification - result = view() + self.assertIn("The download period for this file has expired", self.view()) + + # --- date check disabled --- + self.view.download_time_delta = None + result = self.view() self.assertNotIn("The download period for this file has expired", result) self.assertIsInstance(result, str) - # Download file with valid UID - view.file_id = self.encoded_uid - view.download_time_delta = timedelta(days=7) - result = view() - # Check that we got binary data (the file content) - self.assertIsInstance(result, str) # In Python 2, binary data is str + # --- valid download --- + self.view.download_time_delta = timedelta(days=7) + result = self.view() + self.assertIsInstance(result, str) self.assertTrue(len(result) > 0) self.assertTrue(result.startswith(b"%PDF") or result.startswith("%PDF")) - # Check response headers response = self.request.RESPONSE self.assertIn("application/pdf", response.getHeader("Content-Type")) self.assertIn("inline", response.getHeader("Content-Disposition")) - self.assertIn("test_document.pdf", response.getHeader("Content-Disposition")) + self.assertIn("annex0.pdf", response.getHeader("Content-Disposition")) self.assertTrue(int(response.getHeader("Content-Length")) > 0) - # Test URL traversal mechanism + # --- URL traversal --- browser = z2.Browser(self.app) portal_url = self.portal.absolute_url() browser.open("{}/download-file/{}".format(portal_url, "aabbccddee")) @@ -400,19 +239,15 @@ def test_download_file_view(self): self.assertIn("The corresponding file identifier cannot be retrieved (aabbccddee)", browser.contents) -class TestSigningUsersCsv(unittest.TestCase): - """Tests for the SigningUsersCsv view.""" - - layer = IMIO_ESIGN_INTEGRATION_TESTING +class TestSigningUsersCsv(BaseEsignTest): + """Tests for SigningUsersCsv.""" def setUp(self): - self.portal = self.layer["portal"] - self.request = self.layer["request"] - self.request.form.clear() - setRoles(self.portal, TEST_USER_ID, ["Manager"]) + super(TestSigningUsersCsv, self).setUp() self.user = api.user.create(email="signer@test.com", username="signer_user", password="password1") # noqa: S106 + self.view = SigningUsersCsv(self.portal, self.request) - def _make_user_data(self, user): + def _user_data(self, user): return { "userid": user.getId(), "email": user.getProperty("email", ""), @@ -421,44 +256,41 @@ def _make_user_data(self, user): "fullname": user.getProperty("fullname", ""), } - # --- filter_user --- + def test_filter_user(self): + """No group → False; non-watchers group → False; watchers group → True; + held_position with 'signer' usage → True. - def test_filter_user_in_watchers_group_returns_true(self): - """User in a group ending with 'watchers' is included by default.""" - api.group.create(groupname="myservice_watchers") - api.group.add_user(groupname="myservice_watchers", user=self.user) - view = SigningUsersCsv(self.portal, self.request) - self.assertTrue(view.filter_user(self._make_user_data(self.user))) + api.content.find mocked for held_position: held_position type not in this layer. + """ + # --- not in any group --- + self.assertFalse(self.view.filter_user(self._user_data(self.user))) - def test_filter_user_not_in_any_group_returns_false(self): - """User with no held_position and no watchers group is excluded.""" - view = SigningUsersCsv(self.portal, self.request) - self.assertFalse(view.filter_user(self._make_user_data(self.user))) - - def test_filter_user_in_non_watchers_group_returns_false(self): - """User in a group not ending with 'watchers' is excluded.""" + # --- non-watchers group --- api.group.create(groupname="myservice_editors") api.group.add_user(groupname="myservice_editors", user=self.user) - view = SigningUsersCsv(self.portal, self.request) - self.assertFalse(view.filter_user(self._make_user_data(self.user))) + self.assertFalse(self.view.filter_user(self._user_data(self.user))) - def test_filter_user_with_signer_held_position_returns_true(self): - """User with a held_position having 'signer' in usages is included by default.""" - mock_hp_obj = Mock() - mock_hp_obj.usages = ["signer"] - mock_brain = Mock() - mock_brain.getObject.return_value = mock_hp_obj - view = SigningUsersCsv(self.portal, self.request) - with patch("imio.esign.browser.views.api.content.find", return_value=[mock_brain]): - result = view.filter_user(self._make_user_data(self.user)) - self.assertTrue(result) + # --- watchers group --- + api.group.create(groupname="myservice_watchers") + api.group.add_user(groupname="myservice_watchers", user=self.user) + self.assertTrue(self.view.filter_user(self._user_data(self.user))) - # --- get_users_data --- + # --- held_position with signer usage --- + hp_user = api.user.create(email="hp@test.com", username="hp_user", password="password1") # noqa: S106 + mock_hp = Mock() + mock_hp.usages = ["signer"] + mock_brain = Mock() + mock_brain.getObject.return_value = mock_hp + with patch( + "imio.esign.browser.views.api.content.find", # held_position type not in this layer + return_value=[mock_brain], + ): + self.assertTrue(self.view.filter_user(self._user_data(hp_user))) def test_get_users_data(self): - """Returns data for all users with duplicates.""" - view = SigningUsersCsv(self.portal, self.request) - users_data, _duplicates = view.get_users_data() + """Returns all users with duplicate-email flags; watchers/signers marked checked.""" + users_data, _duplicates = self.view.get_users_data() + # Users sorted alphabetically by userid: signer_user < test_user_1_ self.assertEqual( users_data, [ @@ -483,10 +315,9 @@ def test_get_users_data(self): ], ) - # Test duplicates + # --- duplicate emails --- api.user.create(email="signer@test.com", username="signer_user2", password="password1") # noqa: S106 - view = SigningUsersCsv(self.portal, self.request) - users_data, duplicates = view.get_users_data() + users_data, duplicates = self.view.get_users_data() self.assertEqual( users_data, [ @@ -521,11 +352,10 @@ def test_get_users_data(self): ) self.assertEqual(duplicates, {"signer@test.com": ["signer_user", "signer_user2"]}) - # Signers and watchers users are sorted and checked + # --- watchers group member marked checked --- api.group.create(groupname="myservice_watchers") api.group.add_user(groupname="myservice_watchers", user=self.user) - view = SigningUsersCsv(self.portal, self.request) - users_data, _duplicates = view.get_users_data() + users_data, _duplicates = self.view.get_users_data() self.assertEqual( users_data, [ @@ -559,111 +389,92 @@ def test_get_users_data(self): ], ) - # --- _get_selected_userids --- + def test_get_selected_userids(self): + """Valid JSON → list; absent → []; bad JSON → [].""" + self.assertEqual(self.view._get_selected_userids(), []) - def test_get_selected_userids_with_valid_json(self): - """Parses a JSON array of user IDs from the request form.""" self.request.form["selected_users"] = json.dumps(["user1", "user2"]) - view = SigningUsersCsv(self.portal, self.request) - self.assertEqual(view._get_selected_userids(), ["user1", "user2"]) + self.assertEqual(self.view._get_selected_userids(), ["user1", "user2"]) - def test_get_selected_userids_with_empty_param(self): - """Returns empty list when selected_users is absent.""" - view = SigningUsersCsv(self.portal, self.request) - self.assertEqual(view._get_selected_userids(), []) - - def test_get_selected_userids_with_invalid_json(self): - """Returns empty list for malformed JSON.""" + # request.get() promotes form values to request.other; clear both to reset + self.request.other.pop("selected_users", None) self.request.form["selected_users"] = "not-valid-json" - view = SigningUsersCsv(self.portal, self.request) - self.assertEqual(view._get_selected_userids(), []) - - # --- _download_csv --- + self.assertEqual(self.view._get_selected_userids(), []) - def test_download_csv_no_selected_users(self): - """Shows warning and redirects when no users are selected.""" - view = SigningUsersCsv(self.portal, self.request) - view._download_csv() + def test_download_csv(self): + """No selection → warning; valid selection → CSV with headers.""" + # --- no users --- + self.view._download_csv() messages = IStatusMessage(self.request).show() - self.assertEqual(len(messages), 1) self.assertIn("No users selected", messages[0].message) self.assertEqual(messages[0].type, "warning") + _clear_status_messages(self.request) - def test_download_csv_generates_csv(self): - """Returns CSV content with headers and a row for each selected user.""" + # --- valid selection --- + # request.get() promotes form values to request.other; clear to prevent stale cache + self.request.other.pop("selected_users", None) self.request.form["selected_users"] = json.dumps([self.user.getId()]) - view = SigningUsersCsv(self.portal, self.request) - result = view._download_csv() + result = self.view._download_csv() self.assertEqual( - result, "userid,email,lastname,firstname,fullname\r\nsigner_user,signer@test.com,signer_user,,\r\n" + result, + "userid,email,lastname,firstname,fullname\r\nsigner_user,signer@test.com,signer_user,,\r\n", ) self.assertIn("text/csv", self.request.RESPONSE.getHeader("Content-Type")) - # --- _send_emails --- - - def test_send_emails_no_selected_users(self): - """Shows warning and redirects when no users are selected.""" - view = SigningUsersCsv(self.portal, self.request) - view._send_emails() + def test_send_emails(self): + """No selection → warning; no content → error; no from address → error; success.""" + # --- no users --- + self.view._send_emails() messages = IStatusMessage(self.request).show() - self.assertEqual(len(messages), 1) self.assertIn("No users selected", messages[0].message) self.assertEqual(messages[0].type, "warning") + _clear_status_messages(self.request) - def test_send_emails_no_email_content(self): - """Shows error and redirects when signing users email content is not configured.""" - set_esign_registry_signing_users_email_content(u"") + # request.get() promotes form values to request.other; clear to prevent stale cache + self.request.other.pop("selected_users", None) self.request.form["selected_users"] = json.dumps([self.user.getId()]) - view = SigningUsersCsv(self.portal, self.request) - view._send_emails() + + # --- email content not configured --- + set_esign_registry_signing_users_email_content(u"") + self.view._send_emails() messages = IStatusMessage(self.request).show() - self.assertEqual(len(messages), 1) self.assertEqual(messages[0].message, u"Email content is not configured in the settings.") self.assertEqual(messages[0].type, "error") + _clear_status_messages(self.request) - def test_send_emails_no_portal_from_email(self): - """Shows error and redirects to mail-controlpanel when portal from email is not set.""" - self.request.form["selected_users"] = json.dumps([self.user.getId()]) - view = SigningUsersCsv(self.portal, self.request) - view._send_emails() + # --- portal from email not configured --- + set_esign_registry_signing_users_email_content(u"

Hello

") + self.view._send_emails() messages = IStatusMessage(self.request).show() - self.assertEqual(len(messages), 1) self.assertEqual(messages[0].message, u"Portal from email is not configured.") self.assertEqual(messages[0].type, "error") + _clear_status_messages(self.request) - def test_send_emails_success(self): - """Sends emails to all selected users and shows a success message.""" + # --- success --- self.portal.manage_changeProperties({"email_from_address": "from@test.com"}) - set_esign_registry_signing_users_email_content(u"

Hello

") - self.request.form["selected_users"] = json.dumps([self.user.getId()]) - view = SigningUsersCsv(self.portal, self.request) - with patch("imio.esign.browser.views.send_email", return_value=(True, None)): - view._send_emails() + with patch("imio.esign.browser.views.send_email", return_value=(True, None)): # real SMTP call + self.view._send_emails() messages = IStatusMessage(self.request).show() success_msgs = [m for m in messages if m.type == "info"] self.assertEqual(len(success_msgs), 1) self.assertIn("Emails sent successfully", success_msgs[0].message) + _clear_status_messages(self.request) - def test_send_emails_user_with_no_email(self): - """Shows per-user warning when a selected user has no email address.""" - self.portal.manage_changeProperties({"email_from_address": "from@test.com"}) - set_esign_registry_signing_users_email_content(u"

Hello

") + # --- user with no email address --- no_email_user = api.user.create( email="placeholder@test.com", username="no_email_user", password="password1" # noqa: S106 ) no_email_user.setMemberProperties({"email": ""}) + self.request.other.pop("selected_users", None) self.request.form["selected_users"] = json.dumps([no_email_user.getId()]) - view = SigningUsersCsv(self.portal, self.request) - view._send_emails() + self.view._send_emails() messages = IStatusMessage(self.request).show() no_email_msgs = [m for m in messages if "no email address" in m.message] self.assertEqual(len(no_email_msgs), 1) self.assertEqual(no_email_msgs[0].type, "warning") - # --- _render_email_content --- - def test_render_email_content(self): - """Renders a TAL template substituting values from user_data.""" + """TAL template is evaluated with user_data substitutions.""" template = u"

NAME

" user_data = { "userid": "testuser", @@ -672,95 +483,158 @@ def test_render_email_content(self): "firstname": "John", "fullname": "John Smith", } - view = SigningUsersCsv(self.portal, self.request) - result = view._render_email_content(template, user_data) + result = self.view._render_email_content(template, user_data) self.assertEqual(result, u"

John Smith

") -class TestItemSessionInfoViewlet(unittest.TestCase): - """Test ItemSessionInfoViewlet multi-session support.""" +class TestFacetedSessionInfoViewlet(BaseEsignTest): + """Tests for FacetedSessionInfoViewlet.""" - layer = IMIO_ESIGN_INTEGRATION_TESTING + def setUp(self): + super(TestFacetedSessionInfoViewlet, self).setUp() + self.folder = self.portal["folder0"] + self.annex = self.portal["folder0"]["annex0"] + self.signers = [("user1", "user1@sign.com", "User 1", "Position 1")] + self.collection_uid = "test-collection-uid" + + def _make_viewlet(self, uid="test-collection-uid"): + class _ConcreteFacetedSessionInfoViewlet(FacetedSessionInfoViewlet): + """Minimal concrete subclass for testing FacetedSessionInfoViewlet.""" + + _sessions_collection_uid = "test-collection-uid" + + @property + def sessions_collection_uid(self): + return self._sessions_collection_uid + + def index(self): + return "" + + v = _ConcreteFacetedSessionInfoViewlet(self.folder, self.request, None, None) + v._sessions_collection_uid = uid + return v + + def test_available(self): + """False when sessions_collection_uid is None; True when set.""" + self.assertFalse(self._make_viewlet(uid=None).available()) + self.assertTrue(self._make_viewlet().available()) + + def test_sessions(self): + """sessions CachedProperty: {} for missing/invalid/unknown id; {id: info} for known id.""" + # --- no esign_session_id[] param --- + self.assertEqual(self._make_viewlet().sessions, {}) + + # --- non-integer value --- + self.request.form["esign_session_id[]"] = "not-an-int" + self.assertEqual(self._make_viewlet().sessions, {}) + + # --- valid integer but no matching session --- + self.request.form["esign_session_id[]"] = "999" + self.assertEqual(self._make_viewlet().sessions, {}) + + # --- valid session id → {session_id: session_info} --- + sid, session = add_files_to_session(self.signers, [self.annex.UID()]) + self.request.form["esign_session_id[]"] = str(sid) + sessions = self._make_viewlet().sessions + self.assertEqual(list(sessions.keys()), [sid]) + self.assertEqual(sessions[sid]["sign_id"], session["sign_id"]) + + def test_render(self): + """'': c1[] absent or non-matching; real render_table() when no session; + index() when session selected. + + Sessions annotation is empty when the "no session selected" branch runs, so + ActionsColumn.renderCell / get_dashboard_link are never reached — no mock needed. + """ + # --- c1[] absent → '' --- + self.assertEqual(self._make_viewlet().render(), "") + + # --- c1[] non-matching → '' --- + self.request.form["c1[]"] = "other-uid" + self.assertEqual(self._make_viewlet().render(), "") + + # --- c1[] matches (right collection), no session selected → sessions_listing_view.render_table() --- + self.request.form["c1[]"] = self.collection_uid + result = self._make_viewlet().render() + self.assertIn("") + + def test_get_table_rows(self): + """Column 1 → session fields; column 2 → signer/link fields; unknown → [].""" + v = self._make_viewlet() + self.assertEqual(v.get_table_rows(1), ["session_id", "state", "update_date", "sealed"]) + self.assertEqual(v.get_table_rows(2), ["external_link", "signers"]) + self.assertEqual(v.get_table_rows(99), []) + + def test_ext_session_link(self): + """No sign_url → with title; sign_url present → .""" + v = self._make_viewlet() + + # --- no sign_url: span --- + session = {"sign_id": "012345600000", "sign_url": None, "title": u"My Session"} + result = v.ext_session_link(session) + self.assertEqual(result, u"My Session") + + # --- sign_url present: anchor --- + session["sign_url"] = "https://sign.example.com/s/1" + result = v.ext_session_link(session) + self.assertEqual(result, u'My Session') + + def test_get_state_description(self): + """Known state → non-empty translated string; unknown state → ''.""" + v = self._make_viewlet() + self.assertTrue(len(v.get_state_description("draft")) > 0) + self.assertEqual(v.get_state_description("unknown_state"), "") + + +class TestItemSessionInfoViewlet(BaseEsignTest): + """Tests for ItemSessionInfoViewlet.""" def setUp(self): - self.portal = self.layer["portal"] - self.request = self.portal.REQUEST - setRoles(self.portal, TEST_USER_ID, ["Manager"]) - at_folder = api.content.create( - container=self.portal, id="annexes_types", title="Annexes Types", - type="ContentCategoryConfiguration", exclude_from_nav=True, - ) - category_group = api.content.create( - type="ContentCategoryGroup", title="Annexes", - container=at_folder, id="annexes", - ) - icon_path = os.path.join( - os.path.dirname(collective.iconifiedcategory.__file__), "tests", u"ic\xf4ne1.png" - ) - with open(icon_path, "rb") as fl: - api.content.create( - type="ContentCategory", title="To sign", - container=category_group, - icon=NamedBlobImage(fl.read(), filename=u"ic\xf4ne1.png"), - id="to_sign", predefined_title="To be signed", - to_sign=True, show_preview=False, - ) - api.user.create(email="user1@sign.com", username="user1", password="password1") - self.folder = api.content.create( - container=self.portal, type="Folder", - id="test_folder", title="Test Folder", - ) - tests_dir = os.path.dirname(__file__) - self.annexes = [] - for i in range(2): - with open(os.path.join(tests_dir, "annex1.pdf"), "rb") as f: - annex = api.content.create( - container=self.folder, type="annex", - id="annex{}".format(i), title="Annex {}".format(i), - content_category=calculate_category_id( - self.portal["annexes_types"]["annexes"]["to_sign"] - ), - scan_id="0123456000000{:02d}".format(i), - file=NamedBlobFile( - data=f.read(), filename=u"annex{}.pdf".format(i), - contentType="application/pdf", - ), - ) - self.annexes.append(annex) + super(TestItemSessionInfoViewlet, self).setUp() + api.user.create(email="user1@sign.com", username="user1", password="password1") # noqa: S106 + self.folder = self.portal["folder0"] + self.annexes = [self.portal["folder0"]["annex0"], self.portal["folder0"]["annex2"]] self.signers = [("user1", "user1@sign.com", "User 1", "Position 1")] - for key in list(self.request.form.keys()): - del self.request.form[key] - def test_sessions_empty(self): - """No files in esign annotation → sessions returns empty list.""" + def test_sessions(self): + """No files → empty OrderedDict + render ''; one session → one entry; two sessions → two entries.""" + # --- no files in annotation --- viewlet = ItemSessionInfoViewlet(self.folder, self.request, None, None) self.assertEqual(viewlet.sessions, OrderedDict()) self.assertEqual(viewlet.render(), "") - def test_sessions_single_session(self): - """All context files in one session → sessions returns one dict.""" - uids = [a.UID() for a in self.annexes] - add_files_to_session(self.signers, uids) + # --- all context files in one session --- + add_files_to_session(self.signers, [self.annexes[0].UID()], discriminators=("a",)) viewlet = ItemSessionInfoViewlet(self.folder, self.request, None, None) sessions = viewlet.sessions self.assertEqual(len(sessions), 1) - self.assertEqual(sessions.keys(), [0]) - self.assertEqual(len(sessions[0]["files"]), len(uids)) + self.assertEqual(list(sessions.keys()), [0]) + self.assertEqual(len(sessions[0]["files"]), 1) + self.assertEqual(sessions[0]["files"][0]["uid"], self.annexes[0].UID()) - def test_sessions_multiple_sessions(self): - """Files in two sessions (different discriminators) → sessions returns two dicts.""" - add_files_to_session(self.signers, [self.annexes[0].UID()], discriminators=("a",)) + # --- files split across two sessions (different discriminators) --- add_files_to_session(self.signers, [self.annexes[1].UID()], discriminators=("b",)) viewlet = ItemSessionInfoViewlet(self.folder, self.request, None, None) sessions = viewlet.sessions self.assertEqual(len(sessions), 2) - session_ids = sessions.keys() - self.assertEqual(session_ids, [0, 1]) + self.assertEqual(list(sessions.keys()), [0, 1]) -class TestSessionsListingView(_BaseSessionViewTest): +class TestSessionsListingView(BaseEsignTest): """Test SessionsListingView browser view.""" + def setUp(self): + super(TestSessionsListingView, self).setUp() + self.folder = self.portal["folder0"] + annex = self.portal["folder0"]["annex0"] + signers = [("user1", "user1@sign.com", "User 1", "Position 1")] + add_files_to_session(signers, (annex.UID(),)) + def test_get_sessions(self): """Test obtain sessions and stored annotation not modified.""" self.assertFalse("id" in get_session_annotation()['sessions'][0]) diff --git a/src/imio/esign/tests/test_services.py b/src/imio/esign/tests/test_services.py new file mode 100644 index 0000000..1630e03 --- /dev/null +++ b/src/imio/esign/tests/test_services.py @@ -0,0 +1,170 @@ +# -*- coding: utf-8 -*- +"""Services tests for this package.""" +from imio.esign.services.external_session_feedback import ExternalSessionFeedbackPost +from imio.esign.tests.base import BaseEsignTest +from imio.esign.utils import add_files_to_session +from mock import patch + +import json + + +class TestExternalSessionFeedbackPost(BaseEsignTest): + """Tests for ExternalSessionFeedbackPost.""" + + def setUp(self): + super(TestExternalSessionFeedbackPost, self).setUp() + self.request.other.pop("BODY", None) + self.signers = [("user1", "user1@sign.com", "User 1", "Position 1")] + self.annex = self.portal["folder0"]["annex0"] + self.session_id, self.session = add_files_to_session(self.signers, [self.annex.UID()]) + self.sign_id = self.session["sign_id"] + self.request._auth = "Bearer test-token" + + def _make_service(self): + """Instantiate ExternalSessionFeedbackPost without the adapter mechanism.""" + service = ExternalSessionFeedbackPost.__new__(ExternalSessionFeedbackPost) + service.context = self.portal + service.request = self.request + return service + + def _reply(self, data): + """Call reply() with mocked verify_auth_token; inject body via request.other.""" + self.request.set("BODY", json.dumps(data)) + with patch( + "imio.esign.services.external_session_feedback.verify_auth_token", return_value=True + ): # real Keycloak OAuth endpoint — no local server + return self._make_service().reply() + + def test_authorized(self): + """_authorized() returns False without valid Bearer token; True with verified token.""" + service = self._make_service() + + # no _auth attribute + del self.request._auth + self.assertFalse(service._authorized()) + + # non-Bearer prefix + self.request._auth = "Basic dXNlcjpwYXNz" + self.assertFalse(service._authorized()) + + # Bearer with empty token + self.request._auth = "Bearer " + self.assertFalse(service._authorized()) + + # valid format, verify_auth_token returns False + self.request._auth = "Bearer test-token" + with patch("imio.esign.services.external_session_feedback.verify_auth_token", return_value=False): + self.assertFalse(service._authorized()) + + # valid format, verify_auth_token returns True + with patch("imio.esign.services.external_session_feedback.verify_auth_token", return_value=True): + self.assertTrue(service._authorized()) + + def test_reply(self): + """reply() validates auth/input, processes all feedback codes, updates session state.""" + annex1 = self.portal["folder0"]["annex2"] + + # missing app_session_id + result = self._reply({"code": 21}) + self.assertEqual(self.request.response.getStatus(), 400) + self.assertIn("app_session_id", result["message"]) + self.request.response.setStatus(200) + + # session not found + result = self._reply({"app_session_id": "012345699999", "code": 21}) + self.assertEqual(self.request.response.getStatus(), 400) + self.assertIn("not found", result["message"]) + self.request.response.setStatus(200) + + # code 21: state updated, sign_url set, returns appended + result = self._reply( + { + "app_session_id": self.sign_id, + "code": 21, + "session_state": "to_sign", + "value": {"sign_session_url": "https://sign.example.com/session/1"}, + "message": "Session confirmed", + } + ) + self.assertEqual(result, {"message": "Information correctly handled"}) + self.assertEqual(self.session["state"], "to_sign") + self.assertEqual(self.session["sign_url"], "https://sign.example.com/session/1") + self.assertEqual(self.session["returns"][0][0], 21) + + # code 21: existing sign_url not overwritten + self._reply( + { + "app_session_id": self.sign_id, + "code": 21, + "value": {"sign_session_url": "https://new.example.com/"}, + } + ) + self.assertEqual(self.session["sign_url"], "https://sign.example.com/session/1") + + # code 22: matching signer email → status 'signed' + _, s22 = add_files_to_session(self.signers, [annex1.UID()], discriminators=("c22",)) + self._reply( + { + "app_session_id": s22["sign_id"], + "code": 22, + "value": {"signed_users": ["user1@sign.com"]}, + } + ) + self.assertEqual(s22["signers"][0]["status"], "signed") + + # code 22: signer already 'refused' is not updated + _, s22b = add_files_to_session(self.signers, [annex1.UID()], discriminators=("c22b",)) + s22b["signers"][0]["status"] = "refused" + self._reply( + { + "app_session_id": s22b["sign_id"], + "code": 22, + "value": {"signed_users": ["user1@sign.com"]}, + } + ) + self.assertEqual(s22b["signers"][0]["status"], "refused") + + # code 23: state → 'returned' + _, s23 = add_files_to_session(self.signers, [annex1.UID()], discriminators=("c23",)) + self._reply({"app_session_id": s23["sign_id"], "code": 23}) + self.assertEqual(s23["state"], "returned") + + # code 52: state → 'refused'; matching signer → 'refused' + _, s52 = add_files_to_session(self.signers, [annex1.UID()], discriminators=("c52",)) + self._reply( + { + "app_session_id": s52["sign_id"], + "code": 52, + "value": {"user": "user1@sign.com"}, + } + ) + self.assertEqual(s52["state"], "refused") + self.assertEqual(s52["signers"][0]["status"], "refused") + + # code 53: state → 'signed' (documents signed but not returned) + _, s53 = add_files_to_session(self.signers, [annex1.UID()], discriminators=("c53",)) + self._reply({"app_session_id": s53["sign_id"], "code": 53}) + self.assertEqual(s53["state"], "signed") + + # error codes: state → 'errored' + for code in (50, 51, 54, 55, 56, 57, 58, 59): + _, serr = add_files_to_session(self.signers, [annex1.UID()], discriminators=(str(code),)) + self._reply({"app_session_id": serr["sign_id"], "code": code}) + self.assertEqual(serr["state"], "errored") + + # returns: entry structure (code, db_state, value, message, datetime) + _, srtn = add_files_to_session(self.signers, [annex1.UID()], discriminators=("rtn",)) + self._reply( + { + "app_session_id": srtn["sign_id"], + "code": 23, + "session_state": "completed", + "value": {"info": "ok"}, + "message": "All done", + } + ) + entry = srtn["returns"][0] + self.assertEqual(entry[0], 23) + self.assertEqual(entry[1], "completed") + self.assertEqual(entry[2], {"info": "ok"}) + self.assertEqual(entry[3], "All done") diff --git a/src/imio/esign/tests/test_setup.py b/src/imio/esign/tests/test_setup.py index e80c7ff..5840235 100644 --- a/src/imio/esign/tests/test_setup.py +++ b/src/imio/esign/tests/test_setup.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- """Setup tests for this package.""" from imio.esign import PLONE_VERSION -from imio.esign.testing import IMIO_ESIGN_INTEGRATION_TESTING # noqa: E501 +from imio.esign.testing import IMIO_ESIGN_INTEGRATION_TESTING from plone import api from plone.app.testing import setRoles from plone.app.testing import TEST_USER_ID @@ -19,7 +19,6 @@ class TestSetup(unittest.TestCase): layer = IMIO_ESIGN_INTEGRATION_TESTING def setUp(self): - """Custom shared utility setup for tests.""" self.portal = self.layer["portal"] if PLONE_VERSION < 5: self.installer = api.portal.get_tool("portal_quickinstaller") @@ -27,14 +26,14 @@ def setUp(self): self.installer = get_installer(self.portal, self.layer["request"]) def test_product_installed(self): - """Test if imio.esign is installed.""" + """Product is installed and IImioEsignLayer browser layer is registered.""" + # --- install state --- if PLONE_VERSION < 5: self.assertTrue(self.installer.isProductInstalled("imio.esign")) else: self.assertTrue(self.installer.is_product_installed("imio.esign")) - def test_browserlayer(self): - """Test that IImioEsignLayer is registered.""" + # --- browser layer --- from imio.esign.interfaces import IImioEsignLayer from plone.browserlayer import utils @@ -42,6 +41,7 @@ def test_browserlayer(self): class TestUninstall(unittest.TestCase): + """Test that imio.esign uninstalls cleanly.""" layer = IMIO_ESIGN_INTEGRATION_TESTING @@ -58,14 +58,14 @@ def setUp(self): setRoles(self.portal, TEST_USER_ID, roles_before) def test_product_uninstalled(self): - """Test if imio.esign is cleanly uninstalled.""" + """Product is uninstalled and IImioEsignLayer browser layer is removed.""" + # --- uninstall state --- if PLONE_VERSION < 5: self.assertFalse(self.installer.isProductInstalled("imio.esign")) else: self.assertFalse(self.installer.is_product_installed("imio.esign")) - def test_browserlayer_removed(self): - """Test that IImioEsignLayer is removed.""" + # --- browser layer removed --- from imio.esign.interfaces import IImioEsignLayer from plone.browserlayer import utils diff --git a/src/imio/esign/tests/test_utils.py b/src/imio/esign/tests/test_utils.py index d16232f..17df69d 100644 --- a/src/imio/esign/tests/test_utils.py +++ b/src/imio/esign/tests/test_utils.py @@ -1,13 +1,12 @@ # -*- coding: utf-8 -*- """utils tests for this package.""" from collections import OrderedDict -from collective.iconifiedcategory.utils import calculate_category_id from datetime import date from datetime import timedelta from imio.esign.config import get_esign_registry_max_session_size from imio.esign.config import set_esign_registry_external_watchers from imio.esign.config import set_esign_registry_max_session_size -from imio.esign.testing import IMIO_ESIGN_INTEGRATION_TESTING +from imio.esign.tests.base import BaseEsignTest from imio.esign.utils import add_files_to_session from imio.esign.utils import create_external_session from imio.esign.utils import get_file_download_url @@ -26,90 +25,26 @@ from mock import Mock from mock import patch from plone import api -from plone.app.testing import setRoles -from plone.app.testing import TEST_USER_ID from plone.namedfile.file import NamedBlobFile -from plone.namedfile.file import NamedBlobImage from zope.annotation import IAnnotations from zope.event import notify from zope.lifecycleevent import ObjectModifiedEvent -import collective.iconifiedcategory import json import os -import unittest -class TestUtils(unittest.TestCase): - - layer = IMIO_ESIGN_INTEGRATION_TESTING - +class TestUtils(BaseEsignTest): def setUp(self): - self.portal = self.layer["portal"] - setRoles(self.portal, TEST_USER_ID, ["Manager"]) - # add some users - api.user.create(email="user1@sign.com", username="user1", password="password1") - api.user.create(email="user2@sign.com", username="user2", password="password2") - # add content category configuration - at_folder = api.content.create( - container=self.portal, - id="annexes_types", - title="Annexes Types", - type="ContentCategoryConfiguration", - exclude_from_nav=True, - ) - category_group = api.content.create( - type="ContentCategoryGroup", - title="Annexes", - container=at_folder, - id="annexes", - ) - icon_path = os.path.join(os.path.dirname(collective.iconifiedcategory.__file__), "tests", "icône1.png") - with open(icon_path, "rb") as fl: - api.content.create( - type="ContentCategory", - title="To sign", - container=category_group, - icon=NamedBlobImage(fl.read(), filename=u"icône1.png"), - id="to_sign", - predefined_title="To be signed", - # confidential=True, - # to_print=True, - to_sign=True, - # signed=True, - # publishable=True, - # only_pdf=True, - show_preview=False, - ) - # add annexes - self.folders = [] - for f in range(2): - folder = api.content.create( - container=self.portal, - id="folder{}".format(f), - title="Folder {}".format(f), - type="Folder", - ) - self.folders.append(folder) - tests_dir = os.path.dirname(__file__) - pdf_files = ["annex1.pdf", "annex2.pdf"] - self.uids = [] - for i in range(12): - pdf_file = pdf_files[i % len(pdf_files)] - container = self.folders[i % len(self.folders)] - with open(os.path.join(tests_dir, pdf_file), "rb") as f: - annex = api.content.create( - container=container, - type="annex", - id="annex{}".format(i), - title="Annex {}".format(i), - content_category=calculate_category_id(self.portal["annexes_types"]["annexes"]["to_sign"]), - scan_id="0123456000000{:02d}".format(i), - file=NamedBlobFile(data=f.read(), filename=u"annex{}.pdf".format(i), contentType="application/pdf"), - ) - self.uids.append(annex.UID()) - - def test_add_remove_files_to_session(self): + super(TestUtils, self).setUp() + api.user.create(email="user1@sign.com", username="user1", password="password1") # noqa: S106 + api.user.create(email="user2@sign.com", username="user2", password="password2") # noqa: S106 + self.folders = [self.portal["folder0"], self.portal["folder1"]] + self.uids = [self.portal["folder{}".format(i % 2)]["annex{}".format(i)].UID() for i in range(12)] + + def test_add_files_to_session(self): + """add_files_to_session: session creation, discrimination, reuse, size splitting, + duplicate filenames, and idempotent metadata update.""" root_annot = IAnnotations(self.portal) self.assertNotIn("imio.esign", root_annot) signers = [ @@ -117,7 +52,7 @@ def test_add_remove_files_to_session(self): ("user2", "user2@sign.com", "User 2", "Position 2"), ] - # add files, no session_id, no discriminator + # --- create first session --- sid, session = add_files_to_session(signers, (self.uids[0],), title="my title", watchers=("stalker@sign.com",)) self.assertEqual(sid, 0) annot = root_annot["imio.esign"] @@ -134,7 +69,6 @@ def test_add_remove_files_to_session(self): self.assertIsNone(session["sign_url"]) self.assertEqual(session["client_id"], "0123456") self.assertEqual(len(session["watchers"]), 1) - self.assertEqual(len(session["files"]), 1) self.assertListEqual( list(session["files"]), [ @@ -149,11 +83,10 @@ def test_add_remove_files_to_session(self): ], ) self.assertEqual(len(session["signers"]), 2) - self.assertEqual(session["size"], 6968) # annex1.pdf + self.assertEqual(session["size"], 6968) - # add files, no session_id => same session reused - signers[1] = ("user2", "user2@sign.com", "User 2", "Position 2b") # changed position => not discriminant - # rename uids[1] to same name as uids[0] to test duplicate filename handling + # --- same signers → session reused; duplicate filename renamed --- + signers[1] = ("user2", "user2@sign.com", "User 2", "Position 2b") # position change is non-discriminant annex1_obj = uuidToObject(uuid=self.uids[1], unrestricted=True) annex1_obj.file.filename = u"annex0.pdf" sid, session = add_files_to_session(signers, (self.uids[1],)) @@ -165,12 +98,11 @@ def test_add_remove_files_to_session(self): self.assertEqual(len(annot["c_uids"]), 2) self.assertIn(self.folders[1].UID(), annot["c_uids"]) self.assertEqual(len(session["files"]), 2) - # verify duplicate filename was renamed self.assertEqual(session["files"][1]["filename"], u"annex0-1.pdf") self.assertEqual(len(annot["c_uids"][self.folders[1].UID()]), 1) self.assertEqual(session["size"], 6968 + 7014) # annex1 + annex2 - # add files, no session_id, new discriminations => new session + # --- new discriminator → new session --- sid, session = add_files_to_session(signers, (self.uids[2],), discriminators=("council1",)) self.assertEqual(sid, 1) self.assertEqual(annot["numbering"], 2) @@ -180,7 +112,7 @@ def test_add_remove_files_to_session(self): self.assertEqual(len(session["files"]), 1) self.assertEqual(len(session["watchers"]), 0) - # add files, no session_id, same discriminations => same session + # --- same discriminator → reuse session --- sid, session = add_files_to_session(signers, (self.uids[3],), discriminators=("council1",)) self.assertEqual(sid, 1) self.assertEqual(annot["numbering"], 2) @@ -189,28 +121,28 @@ def test_add_remove_files_to_session(self): self.assertIn(self.uids[3], annot["uids"]) self.assertEqual(len(session["files"]), 2) - # add files, no session_id, other discriminations => other session - sid, session = add_files_to_session(signers, (self.uids[4],), discriminators=("council2",)) + # --- different discriminator → new session --- + sid, _session = add_files_to_session(signers, (self.uids[4],), discriminators=("council2",)) self.assertEqual(sid, 2) - # add files, session_id, other discriminations => reused session - sid, session = add_files_to_session(signers, (self.uids[5],), session_id=0, discriminators=("council3",)) + # --- explicit session_id overrides discrimination --- + sid, _session = add_files_to_session(signers, (self.uids[5],), session_id=0, discriminators=("council3",)) self.assertEqual(sid, 0) - # add files, session_id unfound, other discriminations => new session - sid, session = add_files_to_session(signers, (self.uids[6],), session_id=999, discriminators=("council3",)) + # --- unfound session_id → new session --- + sid, _session = add_files_to_session(signers, (self.uids[6],), session_id=999, discriminators=("council3",)) self.assertEqual(sid, 3) - # add files, no session_id, different signers => new session - sid, session = add_files_to_session([signers[0]], (self.uids[7],)) + # --- different signers → new session --- + sid, _session = add_files_to_session([signers[0]], (self.uids[7],)) self.assertEqual(sid, 4) - # add files, no session_id, different seal => new session - sid, session = add_files_to_session(signers, (self.uids[8],), seal="seal1") + # --- different seal → new session --- + sid, _session = add_files_to_session(signers, (self.uids[8],), seal="seal1") self.assertEqual(sid, 5) - # add files, no session_id, same seal, different acroform => new session - sid, session = add_files_to_session(signers, (self.uids[9],), acroform=False) + # --- different acroform → new session --- + sid, _session = add_files_to_session(signers, (self.uids[9],), acroform=False) self.assertEqual(sid, 6) self.assertEqual(len(annot["uids"]), 10) self.assertEqual(len(annot["c_uids"]), 2) @@ -218,217 +150,114 @@ def test_add_remove_files_to_session(self): self.assertEqual(len(annot["c_uids"][self.folders[1].UID()]), 5) self.assertEqual(len(annot["sessions"]), 7) - # add files, no session_id, no signers => new session + # --- no signers → new session --- sid, session = add_files_to_session([], (self.uids[10],), seal="seal2") self.assertEqual(sid, 7) self.assertEqual(len(annot["sessions"]), 8) self.assertEqual(session["signers"], []) self.assertEqual(session["seal"], "seal2") - # add files, no session_id, same seal, different states => new session + # --- same seal but session already sent → new session --- session["state"] = "sent" - sid, session = add_files_to_session([], (self.uids[11],), seal="seal2") + sid, _session = add_files_to_session([], (self.uids[11],), seal="seal2") self.assertEqual(sid, 8) self.assertEqual(len(annot["sessions"]), 9) - # now we can start to remove - remove_files_from_session((self.uids[0], self.uids[1])) # 2 of 3 session files - self.assertEqual(len(annot["uids"]), 10) - self.assertEqual(len(annot["sessions"][0]["files"]), 1) - self.assertEqual(annot["sessions"][0]["size"], 7014) # only uid[5] (annex2) remains - self.assertEqual(len(annot["c_uids"][self.folders[0].UID()]), 5) - self.assertEqual(len(annot["c_uids"][self.folders[1].UID()]), 5) - remove_files_from_session((self.uids[5],)) # no more session files, session removed - self.assertEqual(len(annot["uids"]), 9) - self.assertEqual(len(annot["sessions"]), 8) - self.assertNotIn(0, annot["sessions"]) - remove_files_from_session((self.uids[2], self.uids[3])) # all session files, session removed - self.assertEqual(len(annot["uids"]), 7) - self.assertEqual(len(annot["sessions"]), 7) - self.assertNotIn(1, annot["sessions"]) - remove_files_from_session((self.uids[4],)) - remove_files_from_session((self.uids[6],)) - remove_files_from_session((self.uids[7],)) - remove_files_from_session((self.uids[8],)) - remove_files_from_session((self.uids[9],)) - remove_files_from_session((self.uids[10],)) - remove_files_from_session((self.uids[11],)) - self.assertEqual(len(annot["uids"]), 0) - self.assertEqual(len(annot["c_uids"]), 0) - self.assertEqual(len(annot["sessions"]), 0) - - def test_create_external_session(self): - """Test create_external_session with mocked requests.post and get_auth_token.""" - from mock import MagicMock - from mock import patch - - # Case 1: session not found => returns sentinel string, no HTTP call made - with patch('imio.esign.utils.get_auth_token', return_value='test_token'), \ - patch('imio.esign.utils.requests') as mock_requests: - result = create_external_session(999) - self.assertEqual(result, "_session_not_found_") - mock_requests.post.assert_not_called() + # --- duplicate filenames: each gets a numbered suffix --- + del root_annot["imio.esign"] + for i in range(3): + api.content.get(UID=self.uids[i]).file.filename = u"same_filename.pdf" + s, ses = add_files_to_session(signers, (self.uids[0],)) + self.assertEqual(len(ses["files"]), 1) + self.assertEqual(ses["files"][0]["filename"], "same_filename.pdf") + s, ses = add_files_to_session(signers, (self.uids[1],), session_id=s) + self.assertEqual(len(ses["files"]), 2) + self.assertIn("same_filename-1.pdf", [f["filename"] for f in ses["files"]]) + s, ses = add_files_to_session(signers, (self.uids[2],), session_id=s) + self.assertEqual(len(ses["files"]), 3) + filenames = [f["filename"] for f in ses["files"]] + self.assertIn("same_filename.pdf", filenames) + self.assertIn("same_filename-1.pdf", filenames) + self.assertIn("same_filename-2.pdf", filenames) - # Setup: create a session with two signers and a watcher - signers = [ - ("user1", "user1@sign.com", "User 1", "Position 1"), - ("user2", "user2@sign.com", "User 2", "Position 2"), - ] - sid, session = add_files_to_session( - signers, (self.uids[0], self.uids[2],), title=u"Test session", watchers=("watcher@sign.com",) - ) + # --- re-adding same file updates metadata, does not duplicate --- + del root_annot["imio.esign"] + annex0_uid = self.uids[0] + annex0 = api.content.get(UID=annex0_uid) + annex0.file.filename = u"annex0.pdf" + # reset annex1 filename which was changed in the duplicate-filename block above + annex1_reset = api.content.get(UID=self.uids[1]) + annex1_reset.file.filename = u"annex1.pdf" + sid, ses = add_files_to_session(signers, (annex0_uid,)) self.assertEqual(sid, 0) - self.assertEqual(session["state"], "draft") - - # Case 2: HTTP 200 => state becomes "sent", request payload is verified - mock_response = MagicMock() - mock_response.status_code = 200 - mock_response.text = '{"message": "Request received"}' - with patch('imio.esign.utils.get_auth_token', return_value='test_token'), \ - patch('imio.esign.utils.requests') as mock_requests: - mock_requests.post.return_value = mock_response - result = create_external_session(sid) - self.assertIs(result, mock_response) - self.assertEqual(session["state"], "sent") - mock_requests.post.assert_called_once() - call_args = mock_requests.post.call_args - posted_url = call_args[0][0] - self.assertIn("imio/esign/v1/luxtrust/sessions", posted_url) - self.assertEqual(call_args[1]["headers"]["Authorization"], "Bearer test_token") - data = json.loads(call_args[1]["data"]["data"]) - self.assertIn("commonData", data) - self.assertIn("signData", data) - self.assertNotIn("sealData", data) - self.assertEqual(data["commonData"]["sessionName"], u"Test session") - self.assertEqual(data["commonData"]["imioAppSessionId"], session["sign_id"]) - self.assertEqual(data["signData"]["users"], ["user1@sign.com", "user2@sign.com"]) - self.assertEqual(data["signData"]["watchers"], ["watcher@sign.com"]) - files_param = call_args[1]["files"] - self.assertEqual(len(files_param), 2) - self.assertEqual(files_param[0][0], "files") - self.assertEqual(files_param[0][1][0], u"annex0.pdf") - self.assertEqual(files_param[1][0], "files") - self.assertEqual(files_param[1][1][0], u"annex2.pdf") - - # Case 3: HTTP non-200 => state unchanged (stays "draft") - sid2, session2 = add_files_to_session(signers, (self.uids[2],), title=u"Test session 2") - mock_response_fail = MagicMock() - mock_response_fail.status_code = 400 - mock_response_fail.text = '{"error": "Bad request"}' - with patch('imio.esign.utils.get_auth_token', return_value='test_token'), \ - patch('imio.esign.utils.requests') as mock_requests: - mock_requests.post.return_value = mock_response_fail - result = create_external_session(sid2) - self.assertIs(result, mock_response_fail) - self.assertEqual(session2["state"], "draft") - - # Case 4: seal session without seal_code configured => returns "_no_seal_code_", no HTTP call - api.portal.set_registry_record("imio.esign.seal_email", u"seal@example.com") - annex0_obj = uuidToObject(uuid=self.uids[0], unrestricted=True) - annex0_obj.file.filename = u"annex1.pdf" - sid3, session3 = add_files_to_session(signers, (self.uids[4], self.uids[1], self.uids[0]), seal="PADES_SEAL") - with patch('imio.esign.utils.get_auth_token', return_value='test_token'), \ - patch('imio.esign.utils.requests') as mock_requests: - result = create_external_session(sid3) - self.assertEqual(result, "_no_seal_code_") - mock_requests.post.assert_not_called() - - # Case 5: seal session with seal_code, custom URL => sealData in payload, correct URL used - api.portal.set_registry_record("imio.esign.seal_code", u"PADES_SEAL") - api.portal.set_registry_record("imio.esign.seal_email", u"seal@example.com") - mock_response_seal = MagicMock() - mock_response_seal.status_code = 200 - mock_response_seal.text = '{"message": "OK"}' - with patch('imio.esign.utils.get_auth_token', return_value='test_token'), \ - patch('imio.esign.utils.requests') as mock_requests: - mock_requests.post.return_value = mock_response_seal - result = create_external_session(sid3, esign_root_url="https://custom.example.com") - self.assertIs(result, mock_response_seal) - self.assertEqual(session3["state"], "sent") - call_args = mock_requests.post.call_args - self.assertEqual(call_args[0][0], "https://custom.example.com/imio/esign/v1/luxtrust/sessions") - data = json.loads(call_args[1]["data"]["data"]) - self.assertIn("sealData", data) - self.assertEqual(data["sealData"]["sealCode"], "PADES_SEAL") - self.assertEqual(data["sealData"]["users"], ["seal@example.com"]) - self.assertTrue(data["sealData"]["acroform"]) - files_param = call_args[1]["files"] - self.assertEqual(len(files_param), 3) - self.assertEqual(files_param[0][0], "files") - self.assertEqual(files_param[0][1][0], u"annex1-1.pdf") - self.assertEqual(files_param[1][0], "files") - self.assertEqual(files_param[1][1][0], u"annex4.pdf") - self.assertEqual(files_param[2][1][0], u"annex1.pdf") - - # Case 6: session without files to send => returns "_no_files_", no HTTP call - for i in range(len(session3["files"])): - session3["files"][i]["uid"] = "nonexistent_uid_{}".format(i) - with patch('imio.esign.utils.get_auth_token', return_value='test_token'), \ - patch('imio.esign.utils.requests') as mock_requests: - result = create_external_session(sid3) - self.assertEqual(result, "_no_files_") - mock_requests.post.assert_not_called() - - # case 7: bad session number to send => returns "_session_not_found_", no HTTP call - with patch('imio.esign.utils.get_auth_token', return_value='test_token'), \ - patch('imio.esign.utils.requests') as mock_requests: - result = create_external_session(99) - self.assertEqual(result, "_session_not_found_") - mock_requests.post.assert_not_called() - - def test_get_filesize(self): - """Test get_filesize returns the correct file size.""" - # even index => annex1.pdf (6968 bytes) - self.assertEqual(get_filesize(self.uids[0]), 6968) - # odd index => annex2.pdf (7014 bytes) - self.assertEqual(get_filesize(self.uids[1]), 7014) - # invalid UID returns 0 - self.assertEqual(get_filesize("nonexistent_uid"), 0) - # check size get robustness - annex = uuidToObject(uuid=self.uids[0], unrestricted=True) - self.assertEqual(annex.content_category, "plone-annexes_types_-_annexes_-_to_sign") - folder = annex.__parent__ - self.assertEqual(folder.categorized_elements[self.uids[0]]["filesize"], 6968) - folder.categorized_elements[self.uids[0]]["filesize"] = 1111 - self.assertEqual(get_filesize(self.uids[0]), 1111) - del folder.categorized_elements[self.uids[0]]["filesize"] - self.assertEqual(get_filesize(self.uids[0]), 6968) - del folder.categorized_elements[self.uids[0]] - self.assertEqual(get_filesize(self.uids[0]), 6968) - delattr(folder, "categorized_elements") - self.assertEqual(get_filesize(self.uids[0]), 6968) - - def test_session_size_discrimination(self): - """Test that sessions are split when they would exceed max_session_size.""" - signers = [ - ("user1", "user1@sign.com", "User 1", "Position 1"), - ] - - # add one file to create a session (annex1.pdf = 6968 bytes) - sid, session = add_files_to_session(signers, (self.uids[0],)) + self.assertEqual(len(ses["files"]), 1) + self.assertEqual(ses["files"][0]["filename"], "annex0.pdf") + self.assertEqual(ses["files"][0]["title"], "Annex 0") + self.assertEqual(ses["size"], 6968) + annex0.file.filename = u"new_annex0.pdf" + annex0.setTitle("New Annex 0") + sid, ses = add_files_to_session(signers, (annex0_uid,)) self.assertEqual(sid, 0) - self.assertEqual(session["size"], 6968) - - # set session size just under the 1 MB limit so adding another file exceeds it - session["size"] = 1 * 1024**2 - 1 # 1 MB - 1 byte - previous_max = get_esign_registry_max_session_size() - self.addCleanup(set_esign_registry_max_session_size, previous_max) + self.assertEqual(len(ses["files"]), 1) + self.assertEqual(ses["files"][0]["filename"], "new_annex0.pdf") + self.assertEqual(ses["files"][0]["title"], "New Annex 0") + self.assertEqual(ses["size"], 6968) + sid, ses = add_files_to_session(signers, (annex0_uid,)) + self.assertEqual(sid, 0) + self.assertEqual(len(ses["files"]), 1) + self.assertEqual(ses["files"][0]["filename"], "new_annex0.pdf") + self.assertEqual(ses["files"][0]["title"], "New Annex 0") + self.assertEqual(ses["size"], 6968) + annex1_uid = self.uids[1] + annex1 = api.content.get(UID=annex1_uid) + sid, ses = add_files_to_session(signers, (annex1_uid,)) + self.assertEqual(sid, 0) + self.assertEqual(len(ses["files"]), 2) + self.assertEqual(ses["files"][1]["filename"], "annex1.pdf") + self.assertEqual(ses["files"][1]["title"], "Annex 1") + self.assertEqual(ses["size"], 13982) + annex1.setTitle("New Annex 1") + with open(os.path.join(os.path.dirname(__file__), "annex1.pdf"), "rb") as f: + annex1.file = NamedBlobFile(data=f.read(), filename=u"new_annex1.pdf", contentType="application/pdf") + notify(ObjectModifiedEvent(annex1)) + self.assertEqual(len(ses["files"]), 2) + self.assertEqual(ses["files"][1]["filename"], "new_annex1.pdf") + self.assertEqual(ses["files"][1]["title"], "New Annex 1") + self.assertEqual(ses["files"][1]["scan_id"], "012345600000001") + self.assertEqual(ses["size"], 13936) + with open(os.path.join(os.path.dirname(__file__), "annex1.pdf"), "rb") as f: + annex1.file = NamedBlobFile(data=f.read(), filename=u"new_annex0.pdf", contentType="application/pdf") + annex1.scan_id = "012345600000002" + notify(ObjectModifiedEvent(annex1)) + self.assertEqual(ses["files"][1]["filename"], "new_annex0-1.pdf") + self.assertEqual(ses["files"][1]["scan_id"], "012345600000002") + annex2_uid = self.uids[2] + annex2 = api.content.get(UID=annex2_uid) + notify(ObjectModifiedEvent(annex2)) + remove_files_from_session((annex0_uid,)) + self.assertEqual(ses["size"], 6968) + + # --- size-based session splitting --- + del root_annot["imio.esign"] + annex0.file.filename = u"annex0.pdf" + sid0, ses0 = add_files_to_session(signers, (annex0_uid,)) + self.assertEqual(sid0, 0) + self.assertEqual(ses0["size"], 6968) + ses0["size"] = 1 * 1024 ** 2 - 1 + prev_max = get_esign_registry_max_session_size() + self.addCleanup(set_esign_registry_max_session_size, prev_max) set_esign_registry_max_session_size(1) - - # adding a file (~7KB) would exceed 1 MB => new session created - sid2, session2 = add_files_to_session(signers, (self.uids[1],)) + sid1, ses1 = add_files_to_session(signers, (self.uids[1],)) + self.assertEqual(sid1, 1) + self.assertIsNot(ses1, ses0) + self.assertEqual(ses1["size"], 6968) + sid2, ses2 = add_files_to_session(signers, (self.uids[2],)) self.assertEqual(sid2, 1) - self.assertIsNot(session2, session) - self.assertEqual(session2["size"], 7014) - - # with enough room, files go to the same session - sid3, session3 = add_files_to_session(signers, (self.uids[2],)) - self.assertEqual(sid3, 1) - self.assertIs(session3, session2) - self.assertEqual(session3["size"], 7014 + 6968) + self.assertIs(ses2, ses1) + self.assertEqual(ses2["size"], 6968 + 6968) def test_add_files_ordering_by_context(self): - """Files added to a session are ordered by their position within their context.""" + """add_files_to_session: files are ordered by their sibling position within their context.""" def reset_annotation(): annot = get_session_annotation() annot["sessions"].clear() @@ -460,147 +289,103 @@ def reset_annotation(): sid, session = add_files_to_session(signers, (self.uids[4],), session_id=sid) sid, session = add_files_to_session(signers, (self.uids[1],), session_id=sid) sid, session = add_files_to_session(signers, (self.uids[2],), session_id=sid) - self.assertEqual([f["uid"] for f in session["files"]], [self.uids[0], self.uids[2], self.uids[4], self.uids[1]]) - - def test_add_files_with_duplicate_filenames(self): - """Test that files with duplicate filenames are renamed with suffix.""" - annot = get_session_annotation() - self.assertEqual(len(annot["sessions"]), 0) + self.assertEqual( + [f["uid"] for f in session["files"]], + [self.uids[0], self.uids[2], self.uids[4], self.uids[1]], + ) + def test_remove_files_from_session(self): + """remove_files_from_session: partial removal keeps session; removing last file deletes it.""" signers = [ ("user1", "user1@sign.com", "User 1", "Position 1"), + ("user2", "user2@sign.com", "User 2", "Position 2"), ] + annot = get_session_annotation() + add_files_to_session(signers, (self.uids[0], self.uids[1], self.uids[2])) - for i in range(3): - annex = api.content.get(UID=self.uids[i]) - annex.file.filename = u"same_filename.pdf" - # annex.reindexObject() + # --- partial removal: 2 of 3 files --- + remove_files_from_session((self.uids[0], self.uids[1])) + self.assertEqual(len(annot["uids"]), 1) + self.assertEqual(len(annot["sessions"][0]["files"]), 1) + self.assertEqual(annot["sessions"][0]["size"], 6968) # only uids[2] (annex1.pdf) remains - sid, session = add_files_to_session(signers, (self.uids[0],)) - self.assertEqual(len(session["files"]), 1) - self.assertEqual(session["files"][0]["filename"], "same_filename.pdf") - sid, session = add_files_to_session(signers, (self.uids[1],), session_id=sid) - self.assertEqual(len(session["files"]), 2) - self.assertIn("same_filename-1.pdf", [f["filename"] for f in session["files"]]) + # --- remove last file: session deleted --- + remove_files_from_session((self.uids[2],)) + self.assertEqual(len(annot["uids"]), 0) + self.assertEqual(len(annot["sessions"]), 0) - sid, session = add_files_to_session(signers, (self.uids[2],), session_id=sid) - self.assertEqual(len(session["files"]), 3) - filenames = [f["filename"] for f in session["files"]] - self.assertIn("same_filename.pdf", filenames) - self.assertIn("same_filename-1.pdf", filenames) - self.assertIn("same_filename-2.pdf", filenames) + # --- remove_empty_session=False: session kept with empty files list --- + add_files_to_session(signers, (self.uids[0],)) + session_id = annot["uids"][self.uids[0]] + remove_files_from_session((self.uids[0],), remove_empty_session=False) + self.assertEqual(len(annot["uids"]), 0) + self.assertEqual(len(annot["sessions"]), 1) + self.assertEqual(annot["sessions"][session_id]["files"], []) - def test_add_files_already_exist_is_updated(self): - """When adding a file to the same session, it is updated.""" - annot = get_session_annotation() - self.assertEqual(len(annot["sessions"]), 0) + def test_get_filesize(self): + """get_filesize: reads cached value from categorized_elements, falls back to blob.""" + self.assertEqual(get_filesize(self.uids[0]), 6968) # annex0 uses annex1.pdf (6968 bytes) + self.assertEqual(get_filesize(self.uids[1]), 7014) # annex1 uses annex2.pdf (7014 bytes) + self.assertEqual(get_filesize("nonexistent_uid"), 0) - signers = [ - ("user1", "user1@sign.com", "User 1", "Position 1"), - ] + annex = uuidToObject(uuid=self.uids[0], unrestricted=True) + self.assertEqual(annex.content_category, "plone-annexes_types_-_annexes_-_to_sign") + folder = annex.__parent__ + self.assertEqual(folder.categorized_elements[self.uids[0]]["filesize"], 6968) - annex0_uid = self.uids[0] - annex0 = api.content.get(UID=annex0_uid) + folder.categorized_elements[self.uids[0]]["filesize"] = 1111 + self.assertEqual(get_filesize(self.uids[0]), 1111) - sid, session = add_files_to_session(signers, (annex0_uid,)) - self.assertEqual(sid, 0) - self.assertEqual(len(session["files"]), 1) - self.assertEqual(session["files"][0]["filename"], "annex0.pdf") - self.assertEqual(session["files"][0]["title"], "Annex 0") - self.assertEqual(session["size"], 6968) - # edit annex and add again, still one annex in session and data are updated - annex0.file.filename = u"new_annex0.pdf" - annex0.setTitle('New Annex 0') - sid, session = add_files_to_session(signers, (annex0_uid,)) - # same session_id - self.assertEqual(sid, 0) - self.assertEqual(len(session["files"]), 1) - self.assertEqual(session["files"][0]["filename"], "new_annex0.pdf") - self.assertEqual(session["files"][0]["title"], "New Annex 0") - self.assertEqual(session["size"], 6968) - # add again exact same file - sid, session = add_files_to_session(signers, (annex0_uid,)) - self.assertEqual(sid, 0) - self.assertEqual(len(session["files"]), 1) - self.assertEqual(session["files"][0]["filename"], "new_annex0.pdf") - self.assertEqual(session["files"][0]["title"], "New Annex 0") - self.assertEqual(session["size"], 6968) - # add second file 2 times - annex1_uid = self.uids[1] - annex1 = api.content.get(UID=annex1_uid) - sid, session = add_files_to_session(signers, (annex1_uid,)) - self.assertEqual(sid, 0) - self.assertEqual(len(session["files"]), 2) - self.assertEqual(session["files"][1]["filename"], "annex1.pdf") - self.assertEqual(session["files"][1]["title"], "Annex 1") - self.assertEqual(session["size"], 13982) - # edit and add again - annex1.setTitle('New Annex 1') - # edit file, filename and content so size changed - with open(os.path.join(os.path.dirname(__file__), "annex1.pdf"), "rb") as f: - annex1.file = NamedBlobFile(data=f.read(), filename=u"new_annex1.pdf", contentType="application/pdf") - notify(ObjectModifiedEvent(annex1)) - # data were already updated - self.assertEqual(len(session["files"]), 2) - self.assertEqual(session["files"][1]["filename"], "new_annex1.pdf") - self.assertEqual(session["files"][1]["title"], "New Annex 1") - self.assertEqual(session["files"][1]["scan_id"], "012345600000001") - self.assertEqual(session["size"], 13936) - # change file but add a filename already used, change scan_id as well - with open(os.path.join(os.path.dirname(__file__), "annex1.pdf"), "rb") as f: - annex1.file = NamedBlobFile(data=f.read(), filename=u"new_annex0.pdf", contentType="application/pdf") - annex1.scan_id = "012345600000002" - notify(ObjectModifiedEvent(annex1)) - self.assertEqual(session["files"][1]["filename"], "new_annex0-1.pdf") - self.assertEqual(session["files"][1]["scan_id"], "012345600000002") - # edit annex out of any session - annex2_uid = self.uids[2] - annex2 = api.content.get(UID=annex2_uid) - notify(ObjectModifiedEvent(annex2)) - # just to check, remove annex0 - remove_files_from_session((annex0_uid,)) - self.assertEqual(session["size"], 6968) + del folder.categorized_elements[self.uids[0]]["filesize"] + self.assertEqual(get_filesize(self.uids[0]), 6968) + + del folder.categorized_elements[self.uids[0]] + self.assertEqual(get_filesize(self.uids[0]), 6968) + + delattr(folder, "categorized_elements") + self.assertEqual(get_filesize(self.uids[0]), 6968) def test_remove_context_from_session(self): - """Test removing a context from a session.""" - annot = get_session_annotation() - self.assertEqual(len(annot["sessions"]), 0) + """remove_context_from_session: removes all files for a context UID; deletes session when empty.""" signers = [ ("user1", "user1@sign.com", "User 1", "Position 1"), ("user2", "user2@sign.com", "User 2", "Position 2"), ] - sid, session = add_files_to_session(signers, (self.uids[0], self.uids[1], self.uids[2], self.uids[3])) + annot = get_session_annotation() + self.assertEqual(len(annot["sessions"]), 0) + add_files_to_session(signers, (self.uids[0], self.uids[1], self.uids[2], self.uids[3])) self.assertEqual(len(annot["uids"]), 4) self.assertEqual(len(annot["c_uids"]), 2) self.assertEqual(len(annot["sessions"]), 1) + remove_context_from_session((self.folders[0].UID(),)) self.assertEqual(len(annot["uids"]), 2) self.assertEqual(len(annot["c_uids"]), 1) self.assertEqual(len(annot["sessions"]), 1) + remove_context_from_session((self.folders[1].UID(),)) self.assertEqual(len(annot["uids"]), 0) self.assertEqual(len(annot["c_uids"]), 0) self.assertEqual(len(annot["sessions"]), 0) def test_remove_session(self): - """Test removing a session.""" - annot = get_session_annotation() - self.assertEqual(len(annot["sessions"]), 0) + """remove_session: deletes the session and clears its file UID references.""" signers = [ ("user1", "user1@sign.com", "User 1", "Position 1"), ("user2", "user2@sign.com", "User 2", "Position 2"), ] - sid, session = add_files_to_session(signers, (self.uids[0], self.uids[1])) - self.assertEqual(sid, 0) - sid, session = add_files_to_session(signers, (self.uids[2], self.uids[3]), seal="seal1") - self.assertEqual(sid, 1) - remove_session(0) # remove first session + annot = get_session_annotation() + self.assertEqual(len(annot["sessions"]), 0) + add_files_to_session(signers, (self.uids[0], self.uids[1])) + sid2, _session = add_files_to_session(signers, (self.uids[2], self.uids[3]), seal="seal1") + self.assertEqual(sid2, 1) + remove_session(0) self.assertEqual(len(annot["uids"]), 2) self.assertEqual(len(annot["c_uids"]), 2) self.assertEqual(len(annot["sessions"]), 1) def test_get_session_info(self): - """Test getting info about a given session id.""" + """get_session_info: returns {} for unknown id; returns session dict for known id.""" annot = get_session_annotation() self.assertEqual(len(annot["sessions"]), 0) self.assertEqual(get_session_info(0), {}) @@ -614,74 +399,70 @@ def test_get_session_info(self): self.assertEqual(get_session_info(sid), session) def test_get_sessions_for(self): - """Test getting sessions for a given context_uid.""" - # no session + """get_sessions_for: returns OrderedDict keyed by session id; honours readonly flag.""" context_uid = self.folders[0].UID() self.assertEqual(get_sessions_for(context_uid), OrderedDict()) - # one session + signers = [("user1", "user1@sign.com", "User 1", "Position 1")] - sid, session = add_files_to_session(signers, (self.uids[0],)) - self.assertEqual(get_sessions_for(context_uid).keys(), [0]) - # two sessions - signers = [("user2", "user2@sign.com", "User 2", "Position 2")] - sid, session = add_files_to_session(signers, (self.uids[2],)) - self.assertEqual(get_sessions_for(context_uid).keys(), [0, 1]) - # readonly=True + add_files_to_session(signers, (self.uids[0],)) + self.assertEqual(list(get_sessions_for(context_uid).keys()), [0]) + + signers2 = [("user2", "user2@sign.com", "User 2", "Position 2")] + add_files_to_session(signers2, (self.uids[2],)) + self.assertEqual(list(get_sessions_for(context_uid).keys()), [0, 1]) + + # readonly=True (default): mutations do not persist sessions = get_sessions_for(context_uid) - self.assertEqual(get_session_info(0)['watchers'], []) - sessions[0]['watchers'] = ["watcher@sign.com"] - self.assertEqual(get_session_info(0)['watchers'], []) - # readonly=False + self.assertEqual(get_session_info(0)["watchers"], []) + sessions[0]["watchers"] = ["watcher@sign.com"] + self.assertEqual(get_session_info(0)["watchers"], []) + + # readonly=False: mutations persist sessions = get_sessions_for(context_uid, readonly=False) - self.assertEqual(get_session_info(0)['watchers'], []) - sessions[0]['watchers'] = ["watcher@sign.com"] - self.assertEqual(get_session_info(0)['watchers'], ["watcher@sign.com"]) + self.assertEqual(get_session_info(0)["watchers"], []) + sessions[0]["watchers"] = ["watcher@sign.com"] + self.assertEqual(get_session_info(0)["watchers"], ["watcher@sign.com"]) def test_get_file_info(self): - """Test getting infos for a given file.""" + """get_file_info: returns None for unknown session/file; honours readonly flag.""" annex0_uid = self.uids[0] annex1_uid = self.uids[1] - # no session self.assertIsNone(get_file_info(0, annex0_uid)) - # create session + signers = [("user1", "user1@sign.com", "User 1", "Position 1")] - sid, session = add_files_to_session(signers, (annex0_uid,)) - self.assertEqual(get_file_info(0, annex0_uid)['uid'], annex0_uid) + add_files_to_session(signers, (annex0_uid,)) + self.assertEqual(get_file_info(0, annex0_uid)["uid"], annex0_uid) self.assertIsNone(get_file_info(0, annex1_uid)) - # readonly=True by default + + # readonly=True: mutations do not persist file_info = get_file_info(0, annex0_uid) - file_info['title'] = u'New title annex 0' - # not changed in the annotation - self.assertEqual( - get_session_annotation()['sessions'][0]['files'][0]['title'], u'Annex 0') - # readonly=False + file_info["title"] = u"New title annex 0" + self.assertEqual(get_session_annotation()["sessions"][0]["files"][0]["title"], u"Annex 0") + + # readonly=False: mutations persist file_info = get_file_info(0, annex0_uid, readonly=False) - file_info['title'] = u'New title annex 0' - # changed in the annotation - self.assertEqual( - get_session_annotation()['sessions'][0]['files'][0]['title'], u'New title annex 0') + file_info["title"] = u"New title annex 0" + self.assertEqual(get_session_annotation()["sessions"][0]["files"][0]["title"], u"New title annex 0") def test_get_file_download_url(self): - """Test generating file download URL from UID.""" + """get_file_download_url: encodes UID; respects custom root_url; accepts pre-computed short_uid.""" uid = "f40682caafc045b4b81973bd82ea9ab6" - api.portal.set_registry_record("imio.esign.file_url", "https://downloads.files.com") result = get_file_download_url(uid) - self.assertEqual(result, ("https://downloads.files.com/Rzgwy-9BVG9-viEts-5GBkn-Rm", - "Rzgwy-9BVG9-viEts-5GBkn-Rm")) + self.assertEqual( + result, ("https://downloads.files.com/Rzgwy-9BVG9-viEts-5GBkn-Rm", "Rzgwy-9BVG9-viEts-5GBkn-Rm") + ) custom_url = "https://custom.domain.org/" result = get_file_download_url(uid, root_url=custom_url) self.assertEqual(result[0], "https://custom.domain.org/Rzgwy-9BVG9-viEts-5GBkn-Rm") - # Test with another UID to verify encoding works uid2 = self.uids[0] result2 = get_file_download_url(uid2) self.assertTrue(result2[0].startswith("https://downloads.files.com/")) - self.assertEqual(shortuid_decode_id(result2[1], separator="-"), uid2) # correctly decoded + self.assertEqual(shortuid_decode_id(result2[1], separator="-"), uid2) - # Test with pre-computed short_uid parameter result3 = get_file_download_url(None, short_uid="MyCustom-Short-UID") self.assertEqual(result3, ("https://downloads.files.com/MyCustom-Short-UID", "MyCustom-Short-UID")) @@ -693,86 +474,87 @@ def test_get_max_download_date(self): today = date.today() self.assertEqual(get_max_download_date(None, timedelta(days=0), today), today) - def test_create_external_session_not_found(self): - """Returns _session_not_found_ for a non-existent session id.""" + def test_create_external_session(self): + """create_external_session: validates state, seal config, file resolution, and builds payloads.""" + signers = [("user1", "user1@sign.com", "User 1", "Position 1")] + + # --- session not found --- result = create_external_session(9999) self.assertEqual(result, "_session_not_found_") - def test_create_external_session_no_seal_email(self): - """Returns _no_seal_email_ when session requires a seal but no email configured.""" + # --- seal session: email missing --- sid, _session = add_files_to_session([], (self.uids[0],), seal="SEAL") - # seal_email defaults to "" (falsy) — no registry setup needed result = create_external_session(sid, esign_root_url="http://test.example.com") self.assertEqual(result, "_no_seal_email_") - def test_create_external_session_no_seal_code(self): - """Returns _no_seal_code_ when seal email is set but seal code is missing.""" - sid, _session = add_files_to_session([], (self.uids[0],), seal="SEAL") + # --- seal session: code missing --- api.portal.set_registry_record("imio.esign.seal_email", u"seal@example.com") self.addCleanup(api.portal.set_registry_record, "imio.esign.seal_email", u"") - # seal_code defaults to "" (falsy) result = create_external_session(sid, esign_root_url="http://test.example.com") self.assertEqual(result, "_no_seal_code_") - def test_create_external_session_error_response(self): - """Returns response object and leaves session state unchanged on non-200.""" - signers = [("user1", "user1@sign.com", "User 1", "Position 1")] - sid, session = add_files_to_session(signers, (self.uids[0],)) + # --- error response: state unchanged --- + sid2, session2 = add_files_to_session(signers, (self.uids[1],)) mock_response = Mock() mock_response.status_code = 500 mock_response.text = "Internal Server Error" - with patch("imio.esign.utils.requests.post", return_value=mock_response): - with patch("imio.esign.utils.get_auth_token", return_value="test-token"): - result = create_external_session(sid, esign_root_url="http://test.example.com") + with patch("imio.esign.utils.requests.post", return_value=mock_response): # real HTTP call + with patch("imio.esign.utils.get_auth_token", return_value="test-token"): # remote OAuth + result = create_external_session(sid2, esign_root_url="http://test.example.com") self.assertIs(result, mock_response) - self.assertEqual(session["state"], "draft") + self.assertEqual(session2["state"], "draft") - def test_create_external_session_sign_payload(self): - """Returns response object, sets session state to 'sent', and signData contains signer email.""" - signers = [("user1", "user1@sign.com", "User 1", "Position 1")] - sid, session = add_files_to_session(signers, (self.uids[0],)) + # --- sign payload: correct structure, state set to 'sent' --- + sid3, session3 = add_files_to_session(signers, (self.uids[2],)) mock_response = Mock() mock_response.status_code = 200 mock_response.text = '{"message": "OK"}' - with patch("imio.esign.utils.requests.post", return_value=mock_response) as mock_post: - with patch("imio.esign.utils.get_auth_token", return_value="test-token"): - result = create_external_session(sid, esign_root_url="http://test.example.com") + with patch("imio.esign.utils.requests.post", return_value=mock_response) as mock_post: # real HTTP call + with patch("imio.esign.utils.get_auth_token", return_value="test-token"): # remote OAuth + result = create_external_session(sid3, esign_root_url="http://test.example.com") self.assertIs(result, mock_response) - self.assertEqual(session["state"], "sent") + self.assertEqual(session3["state"], "sent") payload = json.loads(mock_post.call_args[1]["data"]["data"]) + posted_files = mock_post.call_args[1]["files"] + self.assertIsInstance(posted_files, list) + self.assertEqual([f[0] for f in posted_files], ["files", "files"]) + self.assertEqual([f[1][0] for f in posted_files], [u"annex1.pdf", u"annex2.pdf"]) self.assertEqual( payload, { u"commonData": { - u"imioAppSessionId": u"012345600000", + u"imioAppSessionId": u"012345600001", u"vatNumber": None, u"endpointUrl": u"http://nohost/plone/@external_session_feedback", - u"sessionName": u"Session 0", + u"sessionName": u"Session {}".format(sid3), u"documentData": [ { - u"uniqueCode": u"012345600000000__{}".format(self.uids[0]), - u"docUuid": get_suid_from_uuid(self.uids[0]), - u"filename": u"annex0.pdf", - } + u"uniqueCode": u"012345600000001__{}".format(self.uids[1]), + u"docUuid": get_suid_from_uuid(self.uids[1]), + u"filename": u"annex1.pdf", + }, + { + u"uniqueCode": u"012345600000002__{}".format(self.uids[2]), + u"docUuid": get_suid_from_uuid(self.uids[2]), + u"filename": u"annex2.pdf", + }, ], }, u"signData": {u"acroform": True, u"users": [u"user1@sign.com"]}, }, ) - def test_create_external_session_seal_payload(self): - """Seal-only session: sealData contains seal email and code; no signData.""" - sid, _session = add_files_to_session([], (self.uids[0],), seal="SEAL") - api.portal.set_registry_record("imio.esign.seal_email", u"seal@example.com") + # --- seal-only payload --- api.portal.set_registry_record("imio.esign.seal_code", u"PADES_SEAL") - self.addCleanup(api.portal.set_registry_record, "imio.esign.seal_email", u"") self.addCleanup(api.portal.set_registry_record, "imio.esign.seal_code", u"") + sid4, _session = add_files_to_session([], (self.uids[3],), seal="SEAL") mock_response = Mock() mock_response.status_code = 200 mock_response.text = '{"message": "OK"}' - with patch("imio.esign.utils.requests.post", return_value=mock_response) as mock_post: - with patch("imio.esign.utils.get_auth_token", return_value="test-token"): - create_external_session(sid, esign_root_url="http://test.example.com") + with patch("imio.esign.utils.requests.post", return_value=mock_response) as mock_post: # real HTTP call + with patch("imio.esign.utils.get_auth_token", return_value="test-token"): # remote OAuth + create_external_session(sid4, esign_root_url="http://test.example.com") + self.assertEqual(mock_post.call_args[0][0], "http://test.example.com/imio/esign/v1/luxtrust/sessions") payload = json.loads(mock_post.call_args[1]["data"]["data"]) self.assertEqual( payload, @@ -781,13 +563,18 @@ def test_create_external_session_seal_payload(self): u"imioAppSessionId": u"012345600000", u"vatNumber": None, u"endpointUrl": u"http://nohost/plone/@external_session_feedback", - u"sessionName": u"Session 0", + u"sessionName": u"Session {}".format(sid4), u"documentData": [ { u"uniqueCode": u"012345600000000__{}".format(self.uids[0]), u"docUuid": get_suid_from_uuid(self.uids[0]), u"filename": u"annex0.pdf", - } + }, + { + u"uniqueCode": u"012345600000003__{}".format(self.uids[3]), + u"docUuid": get_suid_from_uuid(self.uids[3]), + u"filename": u"annex3.pdf", + }, ], }, u"sealData": { @@ -798,22 +585,21 @@ def test_create_external_session_seal_payload(self): }, }, ) - - def test_create_external_session_both_payload(self): - """Session with signers and seal: both signData and sealData are present.""" - signers = [("user1", "user1@sign.com", "User 1", "Position 1")] - sid, _session = add_files_to_session(signers, (self.uids[0],), seal="SEAL") - api.portal.set_registry_record("imio.esign.seal_email", u"seal@example.com") - api.portal.set_registry_record("imio.esign.seal_code", u"PADES_SEAL") - set_esign_registry_external_watchers(u"example@imlo.be") # Also test with one external watcher - self.addCleanup(api.portal.set_registry_record, "imio.esign.seal_email", u"") - self.addCleanup(api.portal.set_registry_record, "imio.esign.seal_code", u"") + posted_files = mock_post.call_args[1]["files"] + self.assertIsInstance(posted_files, list) + self.assertEqual([f[0] for f in posted_files], ["files", "files"]) + self.assertEqual([f[1][0] for f in posted_files], [u"annex0.pdf", u"annex3.pdf"]) + + # --- combined sign + seal payload --- + sid5, _session = add_files_to_session(signers, (self.uids[4],), seal="SEAL") + set_esign_registry_external_watchers(u"example@imlo.be") + self.addCleanup(set_esign_registry_external_watchers, u"") mock_response = Mock() mock_response.status_code = 200 mock_response.text = '{"message": "OK"}' - with patch("imio.esign.utils.requests.post", return_value=mock_response) as mock_post: - with patch("imio.esign.utils.get_auth_token", return_value="test-token"): - create_external_session(sid, esign_root_url="http://test.example.com") + with patch("imio.esign.utils.requests.post", return_value=mock_response) as mock_post: # real HTTP call + with patch("imio.esign.utils.get_auth_token", return_value="test-token"): # remote OAuth + create_external_session(sid5, esign_root_url="http://test.example.com") payload = json.loads(mock_post.call_args[1]["data"]["data"]) self.assertEqual( payload, @@ -821,18 +607,18 @@ def test_create_external_session_both_payload(self): u"signData": { u"acroform": True, u"users": [u"user1@sign.com"], - u"watchers": [u'example@imlo.be'], + u"watchers": [u"example@imlo.be"], }, u"commonData": { - u"imioAppSessionId": u"012345600000", + u"imioAppSessionId": u"012345600002", u"vatNumber": None, u"endpointUrl": u"http://nohost/plone/@external_session_feedback", - u"sessionName": u"Session 0", + u"sessionName": u"Session {}".format(sid5), u"documentData": [ { - u"uniqueCode": u"012345600000000__{}".format(self.uids[0]), - u"docUuid": get_suid_from_uuid(self.uids[0]), - u"filename": u"annex0.pdf", + u"uniqueCode": u"012345600000004__{}".format(self.uids[4]), + u"docUuid": get_suid_from_uuid(self.uids[4]), + u"filename": u"annex4.pdf", } ], }, @@ -840,56 +626,23 @@ def test_create_external_session_both_payload(self): u"sealCode": u"PADES_SEAL", u"acroform": True, u"users": [u"seal@example.com"], - u"watchers": [u'example@imlo.be'], + u"watchers": [u"example@imlo.be"], }, }, ) - - -# example of annotation content -""" -{ - "numbering": 1, - "uids": {"3c0528c0ad364641be8b9cbaedbf6620": 0}, - "c_uids": {"f66b3da2d2e947fd81ab65e3e36c039d": ["3c0528c0ad364641be8b9cbaedbf6620"]}, - "sessions": { - 0: { - "acroform": True, - "cliend_id": "0123456", - "discriminators": (), - "files": [ - { - "context_uid": "f66b3da2d2e947fd81ab65e3e36c039d", - "scan_id": "012345600000000", - "title": u"Annex 0", - "uid": "3c0528c0ad364641be8b9cbaedbf6620", - "filename": u"annex0.pdf", - } - ], - "last_update": datetime.datetime(2025, 8, 13, 13, 22, 41, 107895), - "returns": [] - "seal": None, - "sign_id": None, - "sign_url": None, - "signers": [ - { - "status": "", - "userid": "user1", - "email": "user1@sign.com", - "fullname": "User 1", - "position": "Position 1", - }, - { - "status": "", - "userid": "user2", - "email": "user2@sign.com", - "fullname": "User 2", - "position": "Position 2", - }, - ], - "state": "draft", - "title": "my title", - } - }, -} -""" + posted_files = mock_post.call_args[1]["files"] + self.assertIsInstance(posted_files, list) + self.assertEqual([f[0] for f in posted_files], ["files"]) + self.assertEqual([f[1][0] for f in posted_files], [u"annex4.pdf"]) + + # --- no resolvable files: returns _no_files_, no HTTP call made --- + sid6, session6 = add_files_to_session(signers, (self.uids[5],)) + for i in range(len(session6["files"])): + session6["files"][i]["uid"] = "nonexistent_uid_{}".format(i) + with patch("imio.esign.utils.get_auth_token", return_value="test-token"): # remote OAuth + with patch( + "imio.esign.utils.requests" + ) as mock_requests: # real HTTP call; whole module patched to assert .post not called + result = create_external_session(sid6, esign_root_url="http://test.example.com") + self.assertEqual(result, "_no_files_") + mock_requests.post.assert_not_called() diff --git a/src/imio/esign/utils.py b/src/imio/esign/utils.py index 659556e..e74a18b 100644 --- a/src/imio/esign/utils.py +++ b/src/imio/esign/utils.py @@ -183,9 +183,9 @@ def create_external_session(session_id, esign_root_url=None): data_payload = { "commonData": { "endpointUrl": portal.absolute_url() + "/@external_session_feedback", - "documentData": [{"filename": filename, "uniqueCode": "{}__{}".format(unique_code, uid), - "docUuid": get_suid_from_uuid(uid)} - for unique_code, filename, z, uid in files], + "documentData": [{"filename": filename, "uniqueCode": "{}__{}".format(unique_code, fuid), + "docUuid": get_suid_from_uuid(fuid)} + for unique_code, filename, z, fuid in files], "imioAppSessionId": session["sign_id"], "sessionName": session["title"], } @@ -227,7 +227,7 @@ def create_external_session(session_id, esign_root_url=None): } # files_payload = {filename: file_content for z, filename, file_content, uid in files} - files_payload = [("files", (filename, file_content)) for z, filename, file_content, uid in files] + files_payload = [("files", (filename, file_content)) for z, filename, file_content, _uid in files] # Headers avec autorisation headers = {