Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Lib/test/_test_eintr.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,8 @@ def os_open(self, path):

@unittest.skipIf(sys.platform == "darwin",
"hangs under macOS; see bpo-25234, bpo-35363")
@unittest.skipIf(sys.platform.startswith('netbsd'),
"hangs on NetBSD; see gh-137397")
def test_os_open(self):
self._test_open("fd = os.open(path, os.O_RDONLY)\nos.close(fd)",
self.os_open)
Expand Down
34 changes: 34 additions & 0 deletions Lib/test/test_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -1538,6 +1538,40 @@ def testSetSockOpt(self):
reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
self.assertFalse(reuse == 0, "failed to set reuse mode")

def test_setsockopt_errors(self):
# See issue #107546.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.addCleanup(sock.close)

sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # No error expected.

with self.assertRaises(OverflowError):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 2 ** 100)

with self.assertRaises(OverflowError):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, - 2 ** 100)

with self.assertRaises(OverflowError):
sock.setsockopt(socket.SOL_SOCKET, 2 ** 100, 1)

with self.assertRaises(OverflowError):
sock.setsockopt(2 ** 100, socket.SO_REUSEADDR, 1)

with self.assertRaisesRegex(TypeError, "socket option should be int, bytes-like object or None"):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, dict())

with self.assertRaisesRegex(TypeError, "requires 4 arguments when the third argument is None"):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, None)

with self.assertRaisesRegex(TypeError, "only takes 4 arguments when the third argument is None"):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1, 2)

with self.assertRaisesRegex(TypeError, "takes at least 3 arguments"):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)

with self.assertRaisesRegex(TypeError, "takes at most 4 arguments"):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1, 2, 3)

def testSendAfterClose(self):
# testing send() after close() with timeout
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
Expand Down
11 changes: 9 additions & 2 deletions Lib/test/test_ssl.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
HOST = socket_helper.HOST
IS_OPENSSL_3_0_0 = ssl.OPENSSL_VERSION_INFO >= (3, 0, 0)
CAN_GET_SELECTED_OPENSSL_GROUP = ssl.OPENSSL_VERSION_INFO >= (3, 2)
CAN_IGNORE_UNKNOWN_OPENSSL_GROUPS = ssl.OPENSSL_VERSION_INFO >= (3, 3)
CAN_GET_AVAILABLE_OPENSSL_GROUPS = ssl.OPENSSL_VERSION_INFO >= (3, 5)
PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')

Expand Down Expand Up @@ -964,8 +965,14 @@ def test_get_ciphers(self):

def test_set_groups(self):
ctx = ssl.create_default_context()
self.assertIsNone(ctx.set_groups('P-256:X25519'))
self.assertRaises(ssl.SSLError, ctx.set_groups, 'P-256:xxx')
# We use P-256 and P-384 (FIPS 186-4) that are alloed by OpenSSL
# even if FIPS module is enabled. Ignoring unknown groups is only
# supported since OpenSSL 3.3.
self.assertIsNone(ctx.set_groups('P-256:P-384'))

self.assertRaises(ssl.SSLError, ctx.set_groups, 'P-256:foo')
if CAN_IGNORE_UNKNOWN_OPENSSL_GROUPS:
self.assertIsNone(ctx.set_groups('P-256:?foo'))

@unittest.skipUnless(CAN_GET_AVAILABLE_OPENSSL_GROUPS,
"OpenSSL version doesn't support getting groups")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Improve the error messages that may be raised by
:meth:`~socket.socket.setsockopt`.
93 changes: 62 additions & 31 deletions Modules/socketmodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -3332,32 +3332,59 @@ sock_setsockopt(PyObject *self, PyObject *args)
{
PySocketSockObject *s = _PySocketSockObject_CAST(self);

Py_ssize_t arglen;
int level;
int optname;
int res;
Py_buffer optval;
Py_buffer buffer;
int flag;
unsigned int optlen;
PyObject *none;
PyObject *optval;

if (!PyArg_ParseTuple(args, "iiO|I:setsockopt",
&level, &optname, &optval, &optlen))
{
return NULL;
}

arglen = PyTuple_Size(args);
if (arglen == 3 && optval == Py_None) {
PyErr_Format(PyExc_TypeError,
"setsockopt() requires 4 arguments when the third argument is None",
arglen);
return NULL;
}
if (arglen == 4 && optval != Py_None) {
PyErr_Format(PyExc_TypeError,
"setsockopt() only takes 4 arguments when the third argument is None (got %T)",
optval);
return NULL;
}

#ifdef AF_VSOCK
if (s->sock_family == AF_VSOCK) {
if (!PyIndex_Check(optval)) {
PyErr_Format(PyExc_TypeError,
"setsockopt() argument 3 for AF_VSOCK must be an int (got %T)",
optval);
}
uint64_t vflag; // Must be set width of 64 bits
/* setsockopt(level, opt, flag) */
if (PyArg_ParseTuple(args, "iiK:setsockopt",
&level, &optname, &vflag)) {
// level should always be set to AF_VSOCK
res = setsockopt(get_sock_fd(s), level, optname,
(void*)&vflag, sizeof vflag);
goto done;
if (!PyArg_Parse(optval, "K", &vflag)) {
return NULL;
}
return NULL;
// level should always be set to AF_VSOCK
res = setsockopt(get_sock_fd(s), level, optname,
(void*)&vflag, sizeof vflag);
goto done;
}
#endif

/* setsockopt(level, opt, flag) */
if (PyArg_ParseTuple(args, "iii:setsockopt",
&level, &optname, &flag)) {
if (PyIndex_Check(optval)) {
if (!PyArg_Parse(optval, "i", &flag)) {
return NULL;
}
#ifdef MS_WINDOWS
if (optname == SIO_TCP_SET_ACK_FREQUENCY) {
DWORD dummy;
Expand All @@ -3374,36 +3401,40 @@ sock_setsockopt(PyObject *self, PyObject *args)
goto done;
}

PyErr_Clear();
/* setsockopt(level, opt, None, flag) */
if (PyArg_ParseTuple(args, "iiO!I:setsockopt",
&level, &optname, Py_TYPE(Py_None), &none, &optlen)) {
/* setsockopt(level, opt, None, optlen) */
if (optval == Py_None) {
assert(sizeof(socklen_t) >= sizeof(unsigned int));
res = setsockopt(get_sock_fd(s), level, optname,
NULL, (socklen_t)optlen);
goto done;
}

PyErr_Clear();
/* setsockopt(level, opt, buffer) */
if (!PyArg_ParseTuple(args, "iiy*:setsockopt",
&level, &optname, &optval))
return NULL;

if (PyObject_CheckBuffer(optval)) {
if (!PyArg_Parse(optval, "y*", &buffer)) {
return NULL;
}
#ifdef MS_WINDOWS
if (optval.len > INT_MAX) {
PyBuffer_Release(&optval);
PyErr_Format(PyExc_OverflowError,
"socket option is larger than %i bytes",
INT_MAX);
return NULL;
}
res = setsockopt(get_sock_fd(s), level, optname,
optval.buf, (int)optval.len);
if (buffer.len > INT_MAX) {
PyBuffer_Release(&buffer);
PyErr_Format(PyExc_OverflowError,
"socket option is larger than %i bytes",
INT_MAX);
return NULL;
}
res = setsockopt(get_sock_fd(s), level, optname,
buffer.buf, (int)buffer.len);
#else
res = setsockopt(get_sock_fd(s), level, optname, optval.buf, optval.len);
res = setsockopt(get_sock_fd(s), level, optname, buffer.buf, buffer.len);
#endif
PyBuffer_Release(&optval);
PyBuffer_Release(&buffer);
goto done;
}

PyErr_Format(PyExc_TypeError,
"socket option should be int, bytes-like object or None (got %T)",
optval);
return NULL;

done:
if (res < 0) {
Expand Down
Loading