Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ hss:
gsup:
bind_ip: "0.0.0.0"
bind_port: 4222
# Simple 2G / 3G USSD support via GSUP.
# Define USSD codes and messages here. The %msisdn% and %imsi% variables can be used in messages.
ussd:
unknown_code_msg: "The USSD code you have entered is not recognized."
codes:
- code: "*#100#"
msg: "Your MSISDN is %msisdn%"
- code: "*#101#"
msg: "Your IMSI is %imsi%"

api:
page_size: 200
Expand Down
9 changes: 9 additions & 0 deletions docker/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ hss:
gsup:
bind_ip: "${HSS_GSUP_BIND_IP:-0.0.0.0}"
bind_port: ${HSS_GSUP_BIND_PORT:-4222}
# Simple 2G / 3G USSD support via GSUP.
# Define USSD codes and messages here. The %msisdn% and %imsi% variables can be used in messages.
ussd:
unknown_code_msg: "The USSD code you have entered is not recognized."
codes:
- code: "*#100#"
msg: "Your MSISDN is: %msisdn%"
- code: "*#101#"
msg: "Your IMSI is: %imsi%"

api:
page_size: ${API_PAGE_SIZE:-200}
Expand Down
187 changes: 187 additions & 0 deletions lib/gsup/controller/ss.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
# PyHSS GSUP SS Controller
# Copyright 2025-2026 Alexander Couzens <lynxis@fe80.eu>
# Copyright 2026 Lennart Rosam <hello@takuto.de>
# SPDX-License-Identifier: AGPL-3.0-or-later

from collections import OrderedDict
from pathlib import Path

import asn1tools
import binascii
from osmocom.gsup.message import MsgType
from smspdudecoder.codecs import GSM

from gsup.controller.abstract_controller import GsupController
from gsup.protocol.gsup_msg import GsupMessageUtil, GsupMessageBuilder
from pyhss_config import config


class UnknownUSSD(RuntimeError):
""" Unknown USSD message """
pass

asn1path = Path(__file__).with_name("ussd.asn1").resolve()
USSD = asn1tools.compile_files([str(asn1path)])

class SSController(GsupController):
def __init__(self, logger, database):
super().__init__(logger, database)

ussd_config = config.get('hss', {}).get('gsup', {}).get('ussd', {})
if not ussd_config or not ussd_config.get('codes', []):
self.targets = {}
self.unknown_ussd_message = "USSD is not supported on this network."
else:
ussd_targets = ussd_config.get('codes', [])
self.targets = {code['code']: code['msg'] for code in ussd_targets}
self.unknown_ussd_message = ussd_config.get('unknown_code_msg', "The USSD code you have entered is not recognized.")


@staticmethod
def error_from_request(message: dict):
""" Generate a SS Error by using the old message """
response = GsupMessageBuilder().with_msg_type(MsgType.PROC_SS_RESULT)

def copy_field(key: str):
field = GsupMessageUtil.get_first_ie_by_name(key, message)
if field:
response.with_ie(key, field)

copy_field('imsi')
copy_field('session_id')

return response.with_ie('session_state', 'end').build()

@staticmethod
def gsup_from_ussd(message: dict, ussd_encoded: bytes):
""" Generate a full GSUP message """
response = GsupMessageBuilder().with_msg_type(MsgType.PROC_SS_RESULT)

def copy_field(key: str):
field = GsupMessageUtil.get_first_ie_by_name(key, message)
if field:
response.with_ie(key, field)
Comment on lines +60 to +63
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about deduplicating copy_field, e.g. by adding message as parameter and moving it to lib/gsup/protocol/gsup_msg.py:GsupMessageUtil()?


copy_field('imsi')
copy_field('session_id')

return response.with_ie('session_state', 'end').with_ie('supplementary_service_info', ussd_encoded).build()

@staticmethod
def encode_ussd_arg(answer: str) -> bytes:
"""
Encode USSD-Arg of MAP into bytes

OrderedDict([('ussd-DataCodingScheme', b'\x0f'),
('ussd-String', b'\xaaQ\x0c\x06\x1b\x01')])
"""
attr = USSD.modules['Foo']['USSD-Arg']
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Foo" sounds like WIP code, how about e.g. "Main"?

data = OrderedDict()
data['ussd-DataCodingScheme'] = b'\x0f'
data['ussd-String'] = binascii.a2b_hex(GSM().encode(answer))

return attr.encode(data)

@staticmethod
def encode_component(invoke_id: int, answer: str):
"""
Generate a full response which only needs to be encoded into GSUP
FIXME: clean this up more
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(This function looks pretty clean to me, so I'm wondering: was this already cleaned up / do we need to keep the FIXME?)


The result should look like this:
('returnResultLast',
OrderedDict(
('invokeID', 1),
('resultretres',
OrderedDict(('opCode', ('localValue', 59)),
('returnparameter',
bytearray(b'0\x1e\x04\x01\x0f\x04\x19\xd9w]\x0eJ'
b'6\xa7IPz\x0e\x92\xd9d4\x99\xed'
b'F\xbb\xe1f0\x99\xad\x06'))))))
"""
comp = USSD.modules['Foo']['Component']
outer = OrderedDict()
outer['invokeID'] = invoke_id

inner = OrderedDict()
inner['opCode'] = ('localValue', 59)
inner['returnparameter'] = SSController.encode_ussd_arg(answer)
outer['resultretres'] = inner

answer = comp.encode(('returnResultLast', outer))
return answer

async def handle_ussd(self, peer, answer, subscriber, ussd_data):
try:
op, data = USSD.decode('Component', ussd_data)
if op == "invoke":
if data['opCode'] != ('localValue', 59):
raise UnknownUSSD(f"Invalid opCode in invoke {data}")

invoke_id = data['invokeID']
ussd = USSD.decode('USSD-Arg', data['invokeparameter'])
target = GSM().decode(str(binascii.b2a_hex(ussd['ussd-String']), 'utf-8'))
await self._logger.logAsync(service='GSUP', level='INFO', message=f"Received USSD request {target}")

answer = self.targets.get(target, self.unknown_ussd_message)
if "%imsi%" in answer:
answer = answer.replace("%imsi%", subscriber['imsi'])
if "%msisdn%" in answer:
answer = answer.replace("%msisdn%", subscriber['msisdn'])

component = self.encode_component(invoke_id, answer)
response = self.gsup_from_ussd(answer, component)
await self._send_gsup_response(peer, response)
return
elif op == "returnResultLast":
pass
else:
raise UnknownUSSD(f"Invalid class or constructed {op} with {data}")

response = self.error_from_request(answer)
await self._send_gsup_response(peer, response)

except Exception as e:
await self._logger.logAsync(service='GSUP', level='ERROR', message=f"Error while handling ussd in handle_ussd: {str(e)}")
raise UnknownUSSD("Invalid class or constructed")
Comment on lines +144 to +146
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code might mask errors and lead to misleading error messages.

For example:

  • If data['opCode'] would return a ValueError("opCode") because the opCode key was not in data because of a bug.
  • Then we get a log message Error while handling ussd in handle_ussd: opCode (without mentioning ValueError).
  • And a new exception UnknownUSSD("Invalid class or constructed") gets raised.
  • Some high level code may at some point print the exception, but now it doesn't point at the original source of the error anymore and doesn't have the same type of exception. So now one needs to look for the related error message, and figure out where it comes from (which might still be obvious here, but I've debugged other errors in PyHSS codebase and it took me a while to realize what was going on due to similar error masking).

So I would recommend removing the whole try ... except code in handle_ussd, so if an exception gets raised, it could be logged by a caller with the full stack trace that points at the exact line of the source of the error, and with the proper type of the exception. Even if we don't log it yet (PyHSS code does this in some code places, but not always), we can add it later and at least don't mask the error here.


async def handle_message(self, peer, message):
message = message.to_dict()
imsi = GsupMessageUtil.get_first_ie_by_name('imsi', message)
if imsi is None:
await self._logger.logAsync(service='GSUP', level='WARN', message=f"IMSI not found in SS message from {peer}")
response = self.error_from_request(message)
await self._send_gsup_response(peer, response)
return

# Currently, we only support non-continuous sessions
session_state = GsupMessageUtil.get_first_ie_by_name('session_state', message)
if session_state is None:
await self._logger.logAsync(service='GSUP', level='WARN', message=f"Session state not found in SS message from {peer}")
response = self.error_from_request(message)
await self._send_gsup_response(peer, response)
return

session_id = GsupMessageUtil.get_first_ie_by_name('session_id', message)
if session_id is None:
await self._logger.logAsync(service='GSUP', level='WARN', message=f"Session id not found in SS message from {peer}")
response = self.error_from_request(message)
await self._send_gsup_response(peer, response)
return

try:
subscriber = self._database.Get_Subscriber(imsi=imsi)
if subscriber is None:
await self._logger.logAsync(service='GSUP', level='WARN', message=f"No subscriber for IMSI found. WTF?! {peer}")
response = self.error_from_request(message)
await self._send_gsup_response(peer, response)
return

ussd_data = GsupMessageUtil.get_first_ie_by_name('supplementary_service_info', message)
await self.handle_ussd(peer, message, subscriber, ussd_data)

except Exception as e:
await self._logger.logAsync(service='GSUP', level='ERROR', message=f"Error while handling ussd: {str(e)}")
response = self.error_from_request(message)
await self._send_gsup_response(peer, response)
return
Comment on lines +148 to +187
Copy link
Copy Markdown
Collaborator

@osmith42 osmith42 Jan 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about shortening this by sending the error only in the except block and printing a traceback on error? (untested)

Suggested change
async def handle_message(self, peer, message):
message = message.to_dict()
imsi = GsupMessageUtil.get_first_ie_by_name('imsi', message)
if imsi is None:
await self._logger.logAsync(service='GSUP', level='WARN', message=f"IMSI not found in SS message from {peer}")
response = self.error_from_request(message)
await self._send_gsup_response(peer, response)
return
# Currently, we only support non-continuous sessions
session_state = GsupMessageUtil.get_first_ie_by_name('session_state', message)
if session_state is None:
await self._logger.logAsync(service='GSUP', level='WARN', message=f"Session state not found in SS message from {peer}")
response = self.error_from_request(message)
await self._send_gsup_response(peer, response)
return
session_id = GsupMessageUtil.get_first_ie_by_name('session_id', message)
if session_id is None:
await self._logger.logAsync(service='GSUP', level='WARN', message=f"Session id not found in SS message from {peer}")
response = self.error_from_request(message)
await self._send_gsup_response(peer, response)
return
try:
subscriber = self._database.Get_Subscriber(imsi=imsi)
if subscriber is None:
await self._logger.logAsync(service='GSUP', level='WARN', message=f"No subscriber for IMSI found. WTF?! {peer}")
response = self.error_from_request(message)
await self._send_gsup_response(peer, response)
return
ussd_data = GsupMessageUtil.get_first_ie_by_name('supplementary_service_info', message)
await self.handle_ussd(peer, message, subscriber, ussd_data)
except Exception as e:
await self._logger.logAsync(service='GSUP', level='ERROR', message=f"Error while handling ussd: {str(e)}")
response = self.error_from_request(message)
await self._send_gsup_response(peer, response)
return
async def handle_message(self, peer, message):
message = message.to_dict()
try:
imsi = GsupMessageUtil.get_first_ie_by_name('imsi', message)
if imsi is None:
raise ValueError("IMSI not found")
# Currently, we only support non-continuous sessions
session_state = GsupMessageUtil.get_first_ie_by_name('session_state', message)
if session_state is None:
raise ValueError("Session state not found")
session_id = GsupMessageUtil.get_first_ie_by_name('session_id', message)
if session_id is None:
raise ValueError("Session id not found")
subscriber = self._database.Get_Subscriber(imsi=imsi)
if subscriber is None:
raise ValueError(f"No subscriber found for IMSI={imsi}")
ussd_data = GsupMessageUtil.get_first_ie_by_name('supplementary_service_info', message)
await self.handle_ussd(peer, message, subscriber, ussd_data)
except Exception as e:
await self._logger.logAsync(service='GSUP', level='ERROR', message=f"Error while handling USSD from {peer}: {traceback.format_exc()}")
response = self.error_from_request(message)
await self._send_gsup_response(peer, response)
return

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lynxis wrote:

When an exception happens, we should also terminate the USSD session with a proper error code.

So if using the approach above, we would need to define a custom exception and pass the error code through it (and also handle the case when there is a different exception without error code). There seem to be some ways to do this, see e.g. https://stackoverflow.com/a/10270732.

Loading