Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 62 additions & 85 deletions synapse/models/inet.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,119 +423,96 @@ def _getPort(self, valu):

return valu[0]

async def _normPort(self, valu):
parts = valu.split(':', 1)
if len(parts) == 2:
valu, port = parts
port = (await self.porttype.norm(port))[0]
return valu, port, f':{port}'

if self.defport:
return valu, self.defport, f':{self.defport}'

return valu, None, ''

async def _normPyStr(self, valu, view=None):
# todo: exceptions in _normPyDict wont have original str context
orig = valu
subs = {}
virts = {}
ctor = {}

# no protos use case sensitivity yet...
valu = valu.lower()

proto = self.defproto
parts = valu.split('://', 1)
if len(parts) == 2:
proto, valu = parts

if proto not in self.protos:
protostr = ','.join(self.protos)
mesg = f'inet:sockaddr protocol must be one of: {protostr}'
raise s_exc.BadTypeValu(mesg=mesg, valu=orig, name=self.name)

subs['proto'] = (self.prototype.typehash, proto, {})
ctor['proto'], valu = parts

valu = valu.strip().strip('/')

# Treat as IPv6 if starts with [ or contains multiple :
if valu.startswith('['):
match = srv6re.match(valu)
if match:
ipv6, port = match.groups()

ipv6, norminfo = await self.iptype.norm(ipv6)
host = self.iptype.repr(ipv6)
subs['ip'] = (self.iptype.typehash, ipv6, norminfo)
virts['ip'] = (ipv6, self.iptype.stortype)
if (match := srv6re.match(valu)) is None:
mesg = f'Invalid IPv6 w/port ({orig})'
raise s_exc.BadTypeValu(valu=orig, name=self.name, mesg=mesg)

portstr = ''
if port is not None:
port, norminfo = await self.porttype.norm(port)
subs['port'] = (self.porttype.typehash, port, norminfo)
virts['port'] = (port, self.porttype.stortype)
portstr = f':{port}'
ipv6, port = match.groups()
ctor['ip'] = ipv6 # todo: provide as tuple to force ipv6?
if port is not None:
ctor['port'] = port
return await self._normPyDict(ctor, view=view)

elif self.defport:
subs['port'] = (self.porttype.typehash, self.defport, {})
virts['port'] = (self.defport, self.porttype.stortype)
portstr = f':{self.defport}'
elif valu.count(':') >= 2:
ctor['ip'] = valu
return await self._normPyDict(ctor, view=view)

if port and proto in self.noports:
mesg = f'Protocol {proto} does not allow specifying ports.'
raise s_exc.BadTypeValu(mesg=mesg, valu=orig)
# Otherwise treat as IPv4
parts = valu.split(':', 1)
if len(parts) == 2:
valu, port = parts
ctor['port'] = port

return f'{proto}://[{host}]{portstr}', {'subs': subs, 'virts': virts}
ctor['ip'] = valu

mesg = f'Invalid IPv6 w/port ({orig})'
raise s_exc.BadTypeValu(valu=orig, name=self.name, mesg=mesg)
return await self._normPyDict(ctor, view=view)

elif valu.count(':') >= 2:
ipv6, norminfo = await self.iptype.norm(valu)
host = self.iptype.repr(ipv6)
subs['ip'] = (self.iptype.typehash, ipv6, norminfo)
virts['ip'] = (ipv6, self.iptype.stortype)
async def _normPyTuple(self, valu, view=None):
return await self._normPyDict({'ip': valu}, view=view)

if self.defport:
subs['port'] = (self.porttype.typehash, self.defport, {})
virts['port'] = (self.defport, self.porttype.stortype)
return f'{proto}://[{host}]:{self.defport}', {'subs': subs, 'virts': virts}
async def _normPyDict(self, valu, view=None):
subs = {}
virts = {}

return f'{proto}://{host}', {'subs': subs, 'virts': virts}
ipaddr, ipnorminfo = await self.iptype.norm(valu.get('ip'))
ip_repr = self.iptype.repr(ipaddr)
subs['ip'] = (self.iptype.typehash, ipaddr, ipnorminfo)
virts['ip'] = (ipaddr, self.iptype.stortype)

if 'proto' in valu:
proto, protonorminfo = await self.prototype.norm(valu['proto'])
proto_repr = self.prototype.repr(proto)
if proto not in self.protos:
protostr = ','.join(self.protos)
mesg = f'inet:sockaddr protocol must be one of: {protostr}'
raise s_exc.BadTypeValu(mesg=mesg, valu=valu['proto'], name=self.name)
else:
proto, protonorminfo = self.defproto, {}
proto_repr = self.defproto

# Otherwise treat as IPv4
valu, port, pstr = await self._normPort(valu)
if port:
subs['port'] = (self.porttype.typehash, port, {})
virts['port'] = (port, self.porttype.stortype)
subs['proto'] = (self.prototype.typehash, proto, {})

if port and proto in self.noports:
mesg = f'Protocol {proto} does not allow specifying ports.'
raise s_exc.BadTypeValu(mesg=mesg, valu=orig)
noport = proto in self.noports
port_repr = None

ipv4, norminfo = await self.iptype.norm(valu)
ipv4_repr = self.iptype.repr(ipv4)
subs['ip'] = (self.iptype.typehash, ipv4, norminfo)
virts['ip'] = (ipv4, self.iptype.stortype)
if 'port' in valu:
if noport:
mesg = f'Protocol {proto} does not allow specifying ports.'
raise s_exc.BadTypeValu(mesg=mesg)
port, portnorminfo = await self.porttype.norm(valu['port'])
port_repr = self.porttype.repr(port)

return f'{proto}://{ipv4_repr}{pstr}', {'subs': subs, 'virts': virts}
elif not noport and self.defport is not None:
port, portnorminfo = self.defport, {}
port_repr = self.defport

async def _normPyTuple(self, valu, view=None):
ipaddr, norminfo = await self.iptype.norm(valu)
if ipaddr[0] == 6 and port_repr is not None:
servstr = f'{proto}://[{ip_repr}]'
else:
servstr = f'{proto}://{ip_repr}'

ip_repr = self.iptype.repr(ipaddr)
subs = {'ip': (self.iptype.typehash, ipaddr, norminfo)}
virts = {'ip': (ipaddr, self.iptype.stortype)}
proto = self.defproto

if self.defport:
subs['port'] = (self.porttype.typehash, self.defport, {})
virts['port'] = (self.defport, self.porttype.stortype)
if ipaddr[0] == 6:
return f'{proto}://[{ip_repr}]:{self.defport}', {'subs': subs, 'virts': virts}
else:
return f'{proto}://{ip_repr}:{self.defport}', {'subs': subs, 'virts': virts}
if port_repr is not None:
subs['port'] = (self.porttype.typehash, port, portnorminfo)
virts['port'] = (port, self.porttype.stortype)
servstr = f'{servstr}:{port_repr}'

return f'{proto}://{ip_repr}', {'subs': subs, 'virts': virts}
return servstr, {'subs': subs, 'virts': virts}

class Email(s_types.Str):

Expand Down
8 changes: 7 additions & 1 deletion synapse/tests/test_model_inet.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,9 @@ async def test_sockaddr(self):

subs = {'ip': ipsub, 'proto': tcpsub}
virts = {'ip': ((6, 1), 26)}
self.eq(await t.norm('tcp://[::1]'), ('tcp://[::1]', {'subs': subs, 'virts': virts}))
# todo: norm one way for ipv6 w/o port?
Copy link
Contributor Author

@mikemoritz mikemoritz Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related: f41de92 (SYN-10137)

# self.eq(await t.norm('tcp://[::1]'), ('tcp://[::1]', {'subs': subs, 'virts': virts}))
self.eq(await t.norm('tcp://[::1]'), ('tcp://::1', {'subs': subs, 'virts': virts}))

ipnorm, ipinfo = await t.iptype.norm('::fFfF:0102:0304')
ipsub = (t.iptype.typehash, (6, 0xffff01020304), ipinfo)
Expand All @@ -201,6 +203,8 @@ async def test_sockaddr(self):
('tcp://[::ffff:1.2.3.4]:2', {'subs': subs, 'virts': virts}))
await self.asyncraises(s_exc.BadTypeValu, t.norm('tcp://[::1')) # bad ipv6 w/ port

# todo: norm dict test

async def test_asn_collection(self):

async with self.getTestCore() as core:
Expand Down Expand Up @@ -1516,6 +1520,8 @@ async def test_server(self):

self.eq(ctx.exception.get('mesg'), 'inet:sockaddr protocol must be one of: tcp,udp,icmp,gre')

# todo: norm dict test

async def test_url(self):
formname = 'inet:url'
async with self.getTestCore() as core:
Expand Down