Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ jobs:
cp sinch-sdk-mockserver/features/sms/delivery-reports_servicePlanId.feature ./tests/e2e/sms/features/
cp sinch-sdk-mockserver/features/sms/batches.feature ./tests/e2e/sms/features/
cp sinch-sdk-mockserver/features/sms/batches_servicePlanId.feature ./tests/e2e/sms/features/
cp sinch-sdk-mockserver/features/sms/webhooks.feature ./tests/e2e/sms/features/
cp sinch-sdk-mockserver/features/number-lookup/lookups.feature ./tests/e2e/number-lookup/features/

- name: Wait for mock server
run: .github/scripts/wait-for-mockserver.sh
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import hashlib
import hmac
import base64
import json
from typing import Dict, Union, Optional, List


Expand Down Expand Up @@ -61,3 +63,73 @@ def get_header(header_value: Optional[Union[str, List[str]]]) -> Optional[str]:
if isinstance(header_value, list):
return header_value[0] if header_value else None
return header_value


def validate_webhook_signature_with_nonce(
callback_secret: str,
headers: Dict[str, str],
body: str
) -> bool:
"""
Validate signature headers for webhook callbacks that use nonce and timestamp.

:param callback_secret: Secret associated with the webhook.
:type callback_secret: str
:param headers: Incoming request's headers.
:type headers: Dict[str, str]
:param body: Incoming request's body.
:type body: str
:returns: True if the X-Sinch-Webhook-Signature header is valid.
:rtype: bool
"""
if callback_secret is None:
return False

normalized_headers = normalize_headers(headers)
signature = get_header(normalized_headers.get('x-sinch-webhook-signature'))
if signature is None:
return False

nonce = get_header(normalized_headers.get('x-sinch-webhook-signature-nonce'))
timestamp = get_header(normalized_headers.get('x-sinch-webhook-signature-timestamp'))

if nonce is None or timestamp is None:
return False

body_as_string = body
if isinstance(body, dict):
body_as_string = json.dumps(body)

signed_data = compute_signed_data(body_as_string, nonce, timestamp)

expected_signature = calculate_webhook_signature(signed_data, callback_secret)
return hmac.compare_digest(signature, expected_signature)


def compute_signed_data(body: str, nonce: str, timestamp: str) -> str:
"""
Compute signed data for webhook signature validation.

Format: body.nonce.timestamp (with dots as separators)
"""
return f'{body}.{nonce}.{timestamp}'


def calculate_webhook_signature(signed_data: str, secret: str) -> str:
"""
Calculate webhook signature using HMAC-SHA256 with Base64 encoding.

:param signed_data: The data to sign (body.nonce.timestamp)
:type signed_data: str
:param secret: The secret key for HMAC
:type secret: str
:returns: Base64-encoded HMAC-SHA256 signature
:rtype: str
"""
return base64.b64encode(
hmac.new(
key=secret.encode('utf-8'),
msg=signed_data.encode('utf-8'),
digestmod=hashlib.sha256
).digest()
).decode('utf-8')
50 changes: 50 additions & 0 deletions sinch/domains/authentication/webhooks/v1/webhook_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import json
import re
from datetime import datetime
from typing import Any, Dict


def parse_json(payload: str) -> Dict[str, Any]:
"""
Parse JSON string into a dictionary.

:param payload: JSON string to parse.
:type payload: str
:returns: Parsed dictionary.
:rtype: Dict[str, Any]
:raises ValueError: If JSON parsing fails.
"""
try:
return json.loads(payload)
except json.JSONDecodeError as e:
raise ValueError(f"Failed to decode JSON: {e}")


def normalize_iso_timestamp(timestamp: str) -> datetime:
"""
Normalize a timestamp string to ensure compatibility with Python's `datetime.fromisoformat()`.

- Ensures that the timestamp includes a UTC offset (e.g., "+00:00") if missing.
- Replaces trailing "Z" with "+00:00" to indicate UTC.
- Trims microseconds to 6 digits.

:param timestamp: Timestamp string to normalize.
:type timestamp: str
:returns: Timezone-aware datetime object.
:rtype: datetime
:raises ValueError: If timestamp format is invalid.
"""
if timestamp.endswith("Z"):
timestamp = timestamp.replace("Z", "+00:00")
elif not re.search(r"(Z|[+-]\d{2}:?\d{2})$", timestamp):
timestamp += "+00:00"
match_ms = re.search(r"\.(\d{7,})(?=[+-])", timestamp)
if match_ms:
micro_trimmed = match_ms.group(1)[:6]
timestamp = re.sub(
r"\.\d{7,}(?=[+-])", f".{micro_trimmed}", timestamp
)
try:
return datetime.fromisoformat(timestamp)
except ValueError as e:
raise ValueError(f"Invalid timestamp format: {e}")
55 changes: 12 additions & 43 deletions sinch/domains/numbers/webhooks/v1/numbers_webhooks.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
import json
from typing import Any, Dict, Union
from datetime import datetime
import re
from pydantic import StrictBool, StrictStr
from sinch.domains.authentication.webhooks.v1.authentication_validation import (
validate_signature_header,
)
from sinch.domains.authentication.webhooks.v1.webhook_utils import (
parse_json,
normalize_iso_timestamp,
)
from sinch.domains.numbers.webhooks.v1.events import NumbersWebhooksEvent


class NumbersWebhooks:
def __init__(self, callback_secret: StrictStr):
def __init__(self, callback_secret: str):
self.callback_secret = callback_secret

def validate_authentication_header(
self, headers: Dict[StrictStr, StrictStr], json_payload: StrictStr
) -> StrictBool:
self, headers: Dict[str, str], json_payload: str
) -> bool:
"""
Validate the authorization header for a callback request

:param headers: Incoming request's headers
:type headers: Dict[str, str]
:param json_payload: Incoming request's raw body
:type json_payload: StrictStr
:type json_payload: str
:returns: True if the X-Sinch-Signature header is valid
:rtype: bool
"""
Expand All @@ -31,7 +31,7 @@ def validate_authentication_header(
)

def parse_event(
self, event_body: Union[StrictStr, Dict[StrictStr, Any]]
self, event_body: Union[str, Dict[str, Any]]
) -> NumbersWebhooksEvent:
"""
Parses the event payload into a NumbersWebhooksEvent object.
Expand All @@ -41,47 +41,16 @@ def parse_event(
UTC and returns a timezone-aware ``datetime`` object.

:param event_body: The event payload.
:type event_body: Union[StrictStr, Dict[StrictStr, Any]]
:type event_body: Union[str, Dict[str, Any]]
:returns: A parsed Pydantic object with a timezone-aware ``timestamp``.
:rtype: NumbersWebhooksEvent
"""
if isinstance(event_body, str):
event_body = self._parse_json(event_body)
event_body = parse_json(event_body)
timestamp = event_body.get("timestamp")
if timestamp:
event_body["timestamp"] = self._normalize_iso_timestamp(timestamp)
event_body["timestamp"] = normalize_iso_timestamp(timestamp)
try:
return NumbersWebhooksEvent(**event_body)
except Exception as e:
raise ValueError(f"Failed to parse event body: {e}")

def _parse_json(self, payload: StrictStr) -> Dict[StrictStr, Any]:
"""
Parse JSON string into a dictionary.
"""
try:
return json.loads(payload)
except json.JSONDecodeError as e:
raise ValueError(f"Failed to decode JSON: {e}")

def _normalize_iso_timestamp(self, timestamp: StrictStr) -> datetime:
"""
Normalize a timestamp string to ensure compatibility with Python's `datetime.fromisoformat()`
- Ensures that the timestamp includes a UTC offset (e.g., "+00:00") if missing.
- Replaces trailing "Z" with "+00:00" to indicate UTC.
- Trims microseconds to 6 digits.
"""
if timestamp.endswith("Z"):
timestamp = timestamp.replace("Z", "+00:00")
elif not re.search(r"(Z|[+-]\d{2}:?\d{2})$", timestamp):
timestamp += "+00:00"
match_ms = re.search(r"\.(\d{7,})(?=[+-])", timestamp)
if match_ms:
micro_trimmed = match_ms.group(1)[:6]
timestamp = re.sub(
r"\.\d{7,}(?=[+-])", f".{micro_trimmed}", timestamp
)
try:
return datetime.fromisoformat(timestamp)
except ValueError as e:
raise ValueError(f"Invalid timestamp format: {e}")
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Optional
from typing import Optional, Union
from datetime import datetime
from pydantic import Field, StrictInt, StrictStr
from pydantic import Field, StrictInt, StrictStr, field_validator
from sinch.domains.sms.models.v1.types.delivery_receipt_status_code_type import (
DeliveryReceiptStatusCodeType,
)
Expand All @@ -16,6 +16,9 @@
from sinch.domains.sms.models.v1.internal.base import (
BaseModelConfigurationResponse,
)
from sinch.domains.authentication.webhooks.v1.webhook_utils import (
normalize_iso_timestamp,
)


class RecipientDeliveryReport(BaseModelConfigurationResponse):
Expand Down Expand Up @@ -64,3 +67,12 @@ class RecipientDeliveryReport(BaseModelConfigurationResponse):
type: RecipientDeliveryReportType = Field(
default=..., description="The recipient delivery report type."
)

@field_validator("at", "operator_status_at", mode="before")
@classmethod
def normalize_timestamp(
cls, value: Optional[Union[str, datetime]]
) -> Optional[Union[str, datetime]]:
if isinstance(value, str):
return normalize_iso_timestamp(value)
return value
17 changes: 17 additions & 0 deletions sinch/domains/sms/webhooks/v1/events/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from sinch.domains.sms.webhooks.v1.events.sms_webhooks_event import (
IncomingSMSWebhookEvent,
MOTextWebhookEvent,
MOBinaryWebhookEvent,
MOMediaWebhookEvent,
MediaBody,
MediaItem,
)

__all__ = [
"IncomingSMSWebhookEvent",
"MOTextWebhookEvent",
"MOBinaryWebhookEvent",
"MOMediaWebhookEvent",
"MediaBody",
"MediaItem",
]
97 changes: 97 additions & 0 deletions sinch/domains/sms/webhooks/v1/events/sms_webhooks_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
from datetime import datetime
from typing import Optional, Union, Literal, Annotated
from pydantic import Field, StrictStr, StrictInt, conlist
from sinch.domains.sms.webhooks.v1.internal import WebhookEvent


class MediaItem(WebhookEvent):
url: StrictStr = Field(..., description="URL to the media file")
content_type: StrictStr = Field(
..., description="Content type of the media file"
)
status: Union[Literal["Uploaded", "Failed"], StrictStr] = Field(
..., description="Status of the media upload"
)
code: StrictInt = Field(..., description="Status code")


class MediaBody(WebhookEvent):
subject: Optional[StrictStr] = Field(
default=None, description="The subject text"
)
message: Optional[StrictStr] = Field(
default=None, description="The message text"
)
media: conlist(MediaItem) = Field(..., description="Array of media items")


class BaseIncomingSMSWebhookEvent(WebhookEvent):
from_: StrictStr = Field(
...,
alias="from",
description="The phone number that sent the message.",
)
id: StrictStr = Field(..., description="The ID of this inbound message.")
received_at: datetime = Field(
...,
description="When the system received the message. Formatted as ISO-8601: YYYY-MM-DDThh:mm:ss.SSSZ.",
)
to: StrictStr = Field(
...,
description="The Sinch phone number or short code to which the message was sent.",
)
client_reference: Optional[StrictStr] = Field(
default=None,
description="If this inbound message is in response to a previously sent message that contained a client reference, then this field contains that client reference. Utilizing this feature requires additional setup on your account.",
)
operator_id: Optional[StrictStr] = Field(
default=None,
description="The MCC/MNC of the sender's operator if known.",
)
sent_at: Optional[datetime] = Field(
default=None,
description="When the message left the originating device. Only available if provided by operator. Formatted as ISO-8601: YYYY-MM-DDThh:mm:ss.SSSZ.",
)


class MOTextWebhookEvent(BaseIncomingSMSWebhookEvent):
body: StrictStr = Field(
...,
description="The incoming message body. Maximum 2000 characters.",
)
type: Literal["mo_text"] = Field(
..., description="The type of incoming message. Regular SMS."
)


class MOBinaryWebhookEvent(BaseIncomingSMSWebhookEvent):
body: StrictStr = Field(
..., description="The incoming message body (Base64 encoded)."
)
type: Literal["mo_binary"] = Field(
..., description="The type of incoming message. Binary SMS."
)
udh: StrictStr = Field(
..., description="The UDH header of a binary message HEX encoded."
)


class MOMediaWebhookEvent(BaseIncomingSMSWebhookEvent):
body: MediaBody = Field(
...,
description="The media message body containing subject, message, and media items.",
)
type: Literal["mo_media"] = Field(
..., description="The type of incoming message. MMS."
)


# Union type for isinstance checks
_IncomingSMSWebhookEventUnion = Union[
MOTextWebhookEvent, MOBinaryWebhookEvent, MOMediaWebhookEvent
]

# Discriminated union for validation
IncomingSMSWebhookEvent = Annotated[
_IncomingSMSWebhookEventUnion, Field(discriminator="type")
]
5 changes: 5 additions & 0 deletions sinch/domains/sms/webhooks/v1/internal/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from sinch.domains.sms.webhooks.v1.internal.webhook_event import (
WebhookEvent,
)

__all__ = ["WebhookEvent"]
Loading