Skip to content

Conversation

@Takuto88
Copy link
Collaborator

  • Introduced USSD configuration handling in config.yaml.
  • Added support for GSUP USSD operation in request_dispatcher.py.
  • Implemented SSController class to process USSD requests.
  • Integrated ASN.1 USSD definitions for encoding/decoding.
  • Added new dependencies: asn1tools and smspdudecoder.

The primary author of this is Alexander Couzens. I (Lennart) only contributed the dynamic configuration of the USSD message, which was hard-coded in the original implementation.

@Takuto88 Takuto88 requested a review from osmith42 January 21, 2026 16:31
@Takuto88
Copy link
Collaborator Author

CC @lynxis FYI

- Introduced USSD configuration handling in `config.yaml`.
- Added support for GSUP USSD operation in `request_dispatcher.py`.
- Implemented `SSController` class to process USSD requests.
- Integrated ASN.1 USSD definitions for encoding/decoding.
- Added new dependencies: `asn1tools` and `smspdudecoder`.

The primary author of this is Alexander Couzens. I (Lennart) only
contributed the dynamic configuration of the USSD message, which was
hard-coded in the original implementation.

Co-authored-by: Lennart Rosam <hello@takuto.de>
Copy link
Collaborator

@osmith42 osmith42 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks a lot for implementing this! Would it be feasible to add a small unit test for this?

Comment on lines +60 to +63
def copy_field(key: str):
field = GsupMessageUtil.get_first_ie_by_name(key, message)
if field:
response.with_ie(key, field)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about deduplicating copy_field, e.g. by adding message as parameter and moving it to lib/gsup/protocol/gsup_msg.py:GsupMessageUtil()?

OrderedDict([('ussd-DataCodingScheme', b'\x0f'),
('ussd-String', b'\xaaQ\x0c\x06\x1b\x01')])
"""
attr = USSD.modules['Foo']['USSD-Arg']
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Foo" sounds like WIP code, how about e.g. "Main"?

def encode_component(invoke_id: int, answer: str):
"""
Generate a full response which only needs to be encoded into GSUP
FIXME: clean this up more
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(This function looks pretty clean to me, so I'm wondering: was this already cleaned up / do we need to keep the FIXME?)




USSD-Arg ::= SEQUENCE {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indentation until the end of file is different from the rest, remove the leading 4 spaces?

Comment on lines +39 to +40
# Simple 2G / 3G USSD support via GSUP.
# Define USSD codes and messages here. The %msisdn% and %imsi% variables can be used in messages.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# Simple 2G / 3G USSD support via GSUP.
# Define USSD codes and messages here. The %msisdn% and %imsi% variables can be used in messages.

Other config options in this file don't have comments either, the top comment says:

See ../config.yaml for reference.

pyhss = ""

[tool.setuptools.package-data]
pyhss = ["*.asn1"]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've tried this out and found that the line must be the following, or else the asn1 file does not get included. This is because of the non-standard directory structure we have in pyhss, and related tool.setuptools and tool.setuptools.package-dir options to make up for it. (Following the usual python project structures, we would have paths like src/pyhss/lib/gsup/controller).

Suggested change
pyhss = ["*.asn1"]
"*" = ["*.asn1"]

See also: https://setuptools.pypa.io/en/latest/userguide/datafiles.html

Comment on lines +144 to +146
except Exception as e:
await self._logger.logAsync(service='GSUP', level='ERROR', message=f"Error while handling ussd in handle_ussd: {str(e)}")
raise UnknownUSSD("Invalid class or constructed")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code might mask errors and lead to misleading error messages.

For example:

  • If data['opCode'] would return a ValueError("opCode") because the opCode key was not in data because of a bug.
  • Then we get a log message Error while handling ussd in handle_ussd: opCode (without mentioning ValueError).
  • And a new exception UnknownUSSD("Invalid class or constructed") gets raised.
  • Some high level code may at some point print the exception, but now it doesn't point at the original source of the error anymore and doesn't have the same type of exception. So now one needs to look for the related error message, and figure out where it comes from (which might still be obvious here, but I've debugged other errors in PyHSS codebase and it took me a while to realize what was going on due to similar error masking).

So I would recommend removing the whole try ... except code in handle_ussd, so if an exception gets raised, it could be logged by a caller with the full stack trace that points at the exact line of the source of the error, and with the proper type of the exception. Even if we don't log it yet (PyHSS code does this in some code places, but not always), we can add it later and at least don't mask the error here.

Comment on lines +148 to +187
async def handle_message(self, peer, message):
message = message.to_dict()
imsi = GsupMessageUtil.get_first_ie_by_name('imsi', message)
if imsi is None:
await self._logger.logAsync(service='GSUP', level='WARN', message=f"IMSI not found in SS message from {peer}")
response = self.error_from_request(message)
await self._send_gsup_response(peer, response)
return

# Currently, we only support non-continuous sessions
session_state = GsupMessageUtil.get_first_ie_by_name('session_state', message)
if session_state is None:
await self._logger.logAsync(service='GSUP', level='WARN', message=f"Session state not found in SS message from {peer}")
response = self.error_from_request(message)
await self._send_gsup_response(peer, response)
return

session_id = GsupMessageUtil.get_first_ie_by_name('session_id', message)
if session_id is None:
await self._logger.logAsync(service='GSUP', level='WARN', message=f"Session id not found in SS message from {peer}")
response = self.error_from_request(message)
await self._send_gsup_response(peer, response)
return

try:
subscriber = self._database.Get_Subscriber(imsi=imsi)
if subscriber is None:
await self._logger.logAsync(service='GSUP', level='WARN', message=f"No subscriber for IMSI found. WTF?! {peer}")
response = self.error_from_request(message)
await self._send_gsup_response(peer, response)
return

ussd_data = GsupMessageUtil.get_first_ie_by_name('supplementary_service_info', message)
await self.handle_ussd(peer, message, subscriber, ussd_data)

except Exception as e:
await self._logger.logAsync(service='GSUP', level='ERROR', message=f"Error while handling ussd: {str(e)}")
response = self.error_from_request(message)
await self._send_gsup_response(peer, response)
return
Copy link
Collaborator

@osmith42 osmith42 Jan 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about shortening this by sending the error only in the except block and printing a traceback on error? (untested)

Suggested change
async def handle_message(self, peer, message):
message = message.to_dict()
imsi = GsupMessageUtil.get_first_ie_by_name('imsi', message)
if imsi is None:
await self._logger.logAsync(service='GSUP', level='WARN', message=f"IMSI not found in SS message from {peer}")
response = self.error_from_request(message)
await self._send_gsup_response(peer, response)
return
# Currently, we only support non-continuous sessions
session_state = GsupMessageUtil.get_first_ie_by_name('session_state', message)
if session_state is None:
await self._logger.logAsync(service='GSUP', level='WARN', message=f"Session state not found in SS message from {peer}")
response = self.error_from_request(message)
await self._send_gsup_response(peer, response)
return
session_id = GsupMessageUtil.get_first_ie_by_name('session_id', message)
if session_id is None:
await self._logger.logAsync(service='GSUP', level='WARN', message=f"Session id not found in SS message from {peer}")
response = self.error_from_request(message)
await self._send_gsup_response(peer, response)
return
try:
subscriber = self._database.Get_Subscriber(imsi=imsi)
if subscriber is None:
await self._logger.logAsync(service='GSUP', level='WARN', message=f"No subscriber for IMSI found. WTF?! {peer}")
response = self.error_from_request(message)
await self._send_gsup_response(peer, response)
return
ussd_data = GsupMessageUtil.get_first_ie_by_name('supplementary_service_info', message)
await self.handle_ussd(peer, message, subscriber, ussd_data)
except Exception as e:
await self._logger.logAsync(service='GSUP', level='ERROR', message=f"Error while handling ussd: {str(e)}")
response = self.error_from_request(message)
await self._send_gsup_response(peer, response)
return
async def handle_message(self, peer, message):
message = message.to_dict()
try:
imsi = GsupMessageUtil.get_first_ie_by_name('imsi', message)
if imsi is None:
raise ValueError("IMSI not found")
# Currently, we only support non-continuous sessions
session_state = GsupMessageUtil.get_first_ie_by_name('session_state', message)
if session_state is None:
raise ValueError("Session state not found")
session_id = GsupMessageUtil.get_first_ie_by_name('session_id', message)
if session_id is None:
raise ValueError("Session id not found")
subscriber = self._database.Get_Subscriber(imsi=imsi)
if subscriber is None:
raise ValueError(f"No subscriber found for IMSI={imsi}")
ussd_data = GsupMessageUtil.get_first_ie_by_name('supplementary_service_info', message)
await self.handle_ussd(peer, message, subscriber, ussd_data)
except Exception as e:
await self._logger.logAsync(service='GSUP', level='ERROR', message=f"Error while handling USSD from {peer}: {traceback.format_exc()}")
response = self.error_from_request(message)
await self._send_gsup_response(peer, response)
return

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lynxis wrote:

When an exception happens, we should also terminate the USSD session with a proper error code.

So if using the approach above, we would need to define a custom exception and pass the error code through it (and also handle the case when there is a different exception without error code). There seem to be some ways to do this, see e.g. https://stackoverflow.com/a/10270732.

@lynxis
Copy link
Contributor

lynxis commented Jan 22, 2026

Hey @Takuto88,
Hey @osmith42

I did this feature as part of the 39c3 preparation, so it's in a working-good-enough-for-39c3 state, which was done in a very short amount of time.

It would be great if you two could take over this early state of the feature,
otherwise it take me a while until I've time to clean it up to be merged upstream.

I would also recommend adding unit tests with different message lengths, because we had an '@' at the end of the message when we returned a 5 digit number to the user instead of 4 digit (with the additional text of 'Your ex...'.

When an exception happens, we should also terminate the USSD session with a proper error code.

@Takuto88
Copy link
Collaborator Author

I've taken a quick look at all the comments and I don't think I have anything to object here. I will clean this up some more and let you know when it is ready for another look.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants