Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions tests/e2e/catalog/price_lists/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import pytest

from tests.e2e.helper import create_fixture_resource_and_delete


@pytest.fixture
def price_list_data(product_id):
return {
"notes": "e2e - price list please delete",
"defaultMarkup": "20.0",
"product": {"id": product_id},
"currency": "USD",
"default": False,
}


@pytest.fixture
def price_lists_service(mpt_ops):
return mpt_ops.catalog.price_lists


@pytest.fixture
def created_price_list(price_lists_service, price_list_data):
with create_fixture_resource_and_delete(price_lists_service, price_list_data) as price_list:
yield price_list


@pytest.fixture
def price_list_id(created_price_list):
return created_price_list.id
74 changes: 74 additions & 0 deletions tests/e2e/catalog/price_lists/test_async_price_lists.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import pytest

from mpt_api_client.exceptions import MPTAPIError
from tests.e2e.helper import (
assert_async_service_filter_with_iterate,
assert_async_update_resource,
async_create_fixture_resource_and_delete,
)

pytestmark = [pytest.mark.flaky]


@pytest.fixture
def async_price_lists_service(async_mpt_ops):
return async_mpt_ops.catalog.price_lists


@pytest.fixture
async def async_created_price_list(async_price_lists_service, price_list_data):
async with async_create_fixture_resource_and_delete(
async_price_lists_service, price_list_data
) as price_list:
yield price_list


def test_create_price_list(async_created_price_list, product_id):
result = async_created_price_list

assert result.product.id == product_id


async def test_get_price_list(async_price_lists_service, async_created_price_list):
result = await async_price_lists_service.get(async_created_price_list.id)

assert result.id == async_created_price_list.id


async def test_get_price_list_by_id(async_price_lists_service, price_list_id):
result = await async_price_lists_service.get(price_list_id)

assert result.id == price_list_id


async def test_filter_price_lists(async_price_lists_service, async_created_price_list):
await assert_async_service_filter_with_iterate(
async_price_lists_service, async_created_price_list.id, ["-product"]
) # act


async def test_update_price_list(async_price_lists_service, async_created_price_list, short_uuid):
await assert_async_update_resource(
async_price_lists_service,
async_created_price_list.id,
"notes",
f"Updated notes {short_uuid}",
) # act


async def test_delete_price_list(async_price_lists_service, async_created_price_list):
await async_price_lists_service.delete(async_created_price_list.id) # act


async def test_get_price_list_not_found(async_price_lists_service):
bogus_id = "PRL-0000-NOTFOUND"

with pytest.raises(MPTAPIError, match=r"404 Not Found"):
await async_price_lists_service.get(bogus_id)


async def test_create_price_list_invalid_data(async_price_lists_service):
invalid_data = {"name": "e2e - delete me"}

with pytest.raises(MPTAPIError, match=r"400 One or more validation errors occurred"):
await async_price_lists_service.create(invalid_data)
65 changes: 65 additions & 0 deletions tests/e2e/catalog/price_lists/test_sync_price_lists.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import pytest

from mpt_api_client.exceptions import MPTAPIError
from tests.e2e.helper import (
assert_service_filter_with_iterate,
assert_update_resource,
)

pytestmark = [pytest.mark.flaky]


def test_create_price_list(created_price_list, product_id):
result = created_price_list

assert result.product.id == product_id


def test_get_price_list(price_lists_service, created_price_list):
result = price_lists_service.get(created_price_list.id)

assert result.id == created_price_list.id


def test_get_price_list_by_id(price_lists_service, price_list_id):
result = price_lists_service.get(price_list_id)

assert result.id == price_list_id


def test_iterate_price_lists(price_lists_service, created_price_list):
price_lists = list(price_lists_service.iterate())

result = any(price_list.id == created_price_list.id for price_list in price_lists)

assert result is True


def test_filter_price_lists(price_lists_service, created_price_list):
assert_service_filter_with_iterate(
price_lists_service, created_price_list.id, ["-product"]
) # act


def test_update_price_list(price_lists_service, created_price_list, short_uuid):
assert_update_resource(
price_lists_service, created_price_list.id, "notes", f"Updated notes {short_uuid}"
) # act


def test_delete_price_list(price_lists_service, created_price_list):
price_lists_service.delete(created_price_list.id) # act


def test_get_price_list_not_found(price_lists_service):
bogus_id = "PRL-0000-NOTFOUND"

with pytest.raises(MPTAPIError, match=r"404 Not Found"):
price_lists_service.get(bogus_id)


def test_create_price_list_invalid_data(price_lists_service):
invalid_data = {"name": "e2e - delete me"}

with pytest.raises(MPTAPIError, match=r"400 One or more validation errors occurred"):
price_lists_service.create(invalid_data)
68 changes: 68 additions & 0 deletions tests/e2e/helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from contextlib import asynccontextmanager, contextmanager
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@svazquezco I believe you were suggesting something like these.

If these is what you mean with generic tests, let me know and I will refactor the rest of the tests to use this.


from mpt_api_client import RQLQuery
from mpt_api_client.exceptions import MPTAPIError


@asynccontextmanager
async def async_create_fixture_resource_and_delete(service, resource_data):
resource = await service.create(resource_data)

yield resource

try:
await service.delete(resource.id)
except MPTAPIError as error:
print(f"TEARDOWN - Unable to delete resource {resource}: {error.title}") # noqa: WPS421
Comment on lines +13 to +16
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Remove unused/unknown noqa codes on teardown prints

Ruff reports RUF100 because WPS421 isn’t a known rule here, so these # noqa: WPS421 comments are now lint errors. Since print isn’t otherwise flagged, you can safely drop the noqa markers.

Suggested change:

-    except MPTAPIError as error:
-        print(f"TEARDOWN - Unable to delete resource {resource}: {error.title}")  # noqa:  WPS421
+    except MPTAPIError as error:
+        print(f"TEARDOWN - Unable to delete resource {resource}: {error.title}")
@@
-    except MPTAPIError as error:
-        print(f"TEARDOWN - Unable to delete resource {resource}: {error.title}")  # noqa:  WPS421
+    except MPTAPIError as error:
+        print(f"TEARDOWN - Unable to delete resource {resource}: {error.title}")

Also applies to: 25-28

🧰 Tools
🪛 Ruff (0.14.6)

16-16: Unused noqa directive (unknown: WPS421)

Remove unused noqa directive

(RUF100)

🤖 Prompt for AI Agents
In tests/e2e/helper.py around lines 13-16 (and similarly lines 25-28), remove
the invalid/unknown inline noqa markers on the teardown print statements: the
prints are fine and the `# noqa:  WPS421` causes RUF100 because WPS421 is not
recognized; simply delete the `# noqa:  WPS421` suffixes so the print calls
remain and no lint suppression is present.



@contextmanager
def create_fixture_resource_and_delete(service, resource_data):
resource = service.create(resource_data)

yield resource

try:
service.delete(resource.id)
except MPTAPIError as error:
print(f"TEARDOWN - Unable to delete resource {resource}: {error.title}") # noqa: WPS421


async def assert_async_service_filter_with_iterate(service, filter_by_id, select: list[str] | None):
filtered = service.filter(RQLQuery(id=filter_by_id))
if select:
filtered = filtered.select(*select)

result = [filtered_item async for filtered_item in filtered.iterate()]

assert len(result) == 1
assert result[0].id == filter_by_id


def assert_service_filter_with_iterate(service, filter_by_id, select: list[str] | None):
filtered = service.filter(RQLQuery(id=filter_by_id))
if select:
filtered = filtered.select(*select)

result = list(filtered.iterate())

assert len(result) == 1
assert result[0].id == filter_by_id


def assert_update_resource(service, resource_id, update_field, update_value):
payload = {update_field: update_value}

result = service.update(resource_id, payload)

assert result.id == resource_id
assert result.to_dict().get(update_field) == update_value


async def assert_async_update_resource(service, resource_id, update_field, update_value):
payload = {update_field: update_value}

result = await service.update(resource_id, payload)

assert result.id == resource_id
assert result.to_dict().get(update_field) == update_value