pytest tls: extend coverage

Add possibility to reload QUIC test server with another certificate. Add
tests for more coverage of handshakes.

Closes #17382
This commit is contained in:
Stefan Eissing 2025-05-17 12:19:01 +02:00 committed by Daniel Stenberg
parent dd22442e3b
commit a85f1df480
No known key found for this signature in database
GPG key ID: 5CC908FDB71E12C2
7 changed files with 87 additions and 37 deletions

View file

@ -179,15 +179,13 @@ CURLcode Curl_vquic_tls_verify_peer(struct curl_tls_ctx *ctx,
#elif defined(USE_WOLFSSL)
(void)data;
if(conn_config->verifyhost) {
if(peer->sni) {
WOLFSSL_X509* cert = wolfSSL_get_peer_certificate(ctx->wssl.ssl);
if(wolfSSL_X509_check_host(cert, peer->sni, strlen(peer->sni), 0, NULL)
== WOLFSSL_FAILURE) {
result = CURLE_PEER_FAILED_VERIFICATION;
}
wolfSSL_X509_free(cert);
char *snihost = peer->sni ? peer->sni : peer->hostname;
WOLFSSL_X509* cert = wolfSSL_get_peer_certificate(ctx->wssl.ssl);
if(wolfSSL_X509_check_host(cert, snihost, strlen(snihost), 0, NULL)
== WOLFSSL_FAILURE) {
result = CURLE_PEER_FAILED_VERIFICATION;
}
wolfSSL_X509_free(cert);
}
#endif
/* on error, remove any session we might have in the pool */

View file

@ -135,11 +135,19 @@ def configures_httpd(env, httpd) -> Generator[bool, None, None]:
# include this fixture as test parameter if the test configures httpd itself
yield True
@pytest.fixture(scope='session')
def configures_nghttpx(env, httpd) -> Generator[bool, None, None]:
# include this fixture as test parameter if the test configures nghttpx itself
yield True
@pytest.fixture(autouse=True, scope='function')
def server_reset(request, env, httpd):
def server_reset(request, env, httpd, nghttpx):
# make sure httpd is in default configuration when a test starts
if 'configures_httpd' not in request.node._fixtureinfo.argnames:
httpd.clear_extra_configs()
httpd.set_proxy_auth(False)
httpd.reset_config()
httpd.reload_if_config_changed()
if env.have_h3() and \
'nghttpx' in request.node._fixtureinfo.argnames and \
'configures_nghttpx' not in request.node._fixtureinfo.argnames:
nghttpx.reset_config()
nghttpx.reload_if_config_changed()

View file

@ -41,7 +41,7 @@ class TestReuse:
# check if HTTP/1.1 handles 'Connection: close' correctly
@pytest.mark.parametrize("proto", ['http/1.1'])
def test_12_01_h1_conn_close(self, env: Env, httpd, configures_httpd, nghttpx, proto):
httpd.clear_extra_configs()
httpd.reset_config()
httpd.set_extra_config('base', [
'MaxKeepAliveRequests 1',
])
@ -60,7 +60,7 @@ class TestReuse:
reason="httpd 2.5+ handles KeepAlives different")
@pytest.mark.parametrize("proto", ['http/1.1'])
def test_12_02_h1_conn_timeout(self, env: Env, httpd, configures_httpd, nghttpx, proto):
httpd.clear_extra_configs()
httpd.reset_config()
httpd.set_extra_config('base', [
'KeepAliveTimeout 1',
])

View file

@ -131,7 +131,7 @@ class TestSSLUse:
# use ip address for connect
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
def test_17_05_ip_addr(self, env: Env, proto, httpd, nghttpx):
def test_17_05_good_ip_addr(self, env: Env, proto, httpd, nghttpx):
if env.curl_uses_lib('bearssl'):
pytest.skip("BearSSL does not support cert verification with IP addresses")
if env.curl_uses_lib('mbedtls'):
@ -148,6 +148,23 @@ class TestSSLUse:
# the SNI should not have been used
assert 'SSL_TLS_SNI' not in r.json, f'{r.json}'
# use IP address that is not in cert
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
def test_17_05_bad_ip_addr(self, env: Env, proto,
httpd, configures_httpd,
nghttpx, configures_nghttpx):
if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported")
httpd.set_domain1_cred_name('domain1-no-ip')
httpd.reload_if_config_changed()
if proto == 'h3':
nghttpx.set_cred_name('domain1-no-ip')
nghttpx.reload_if_config_changed()
curl = CurlClient(env=env)
url = f'https://127.0.0.1:{env.port_for(proto)}/curltest/sslinfo'
r = curl.http_get(url=url, alpn_proto=proto)
assert r.exit_code == 60, f'{r}'
# use localhost for connect
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
def test_17_06_localhost(self, env: Env, proto, httpd, nghttpx):

View file

@ -144,6 +144,7 @@ class EnvConfig:
self.expired_domain = f"expired.{self.tld}"
self.cert_specs = [
CertificateSpec(domains=[self.domain1, self.domain1brotli, 'localhost', '127.0.0.1'], key_type='rsa2048'),
CertificateSpec(name='domain1-no-ip', domains=[self.domain1, self.domain1brotli], key_type='rsa2048'),
CertificateSpec(domains=[self.domain2], key_type='rsa2048'),
CertificateSpec(domains=[self.ftp_domain], key_type='rsa2048'),
CertificateSpec(domains=[self.proxy_domain, '127.0.0.1'], key_type='rsa2048'),

View file

@ -71,7 +71,7 @@ class Httpd:
'proxys': socket.SOCK_STREAM,
}
def __init__(self, env: Env, proxy_auth: bool = False):
def __init__(self, env: Env):
self.env = env
self._apache_dir = os.path.join(env.gen_dir, 'apache')
self._run_dir = os.path.join(self._apache_dir, 'run')
@ -86,10 +86,13 @@ class Httpd:
self._digest_passwords = os.path.join(self._conf_dir, 'digest.passwords')
self._mods_dir = None
self._auth_digest = True
self._proxy_auth_basic = proxy_auth
self._proxy_auth_basic = False
# name used to lookup credentials for env.domain1
self._domain1_cred_name = env.domain1
self._extra_configs = {}
self._loaded_extra_configs = None
self._loaded_proxy_auth = None
self._loaded_domain1_cred_name = None
assert env.apxs
p = subprocess.run(args=[env.apxs, '-q', 'libexecdir'],
capture_output=True, text=True)
@ -121,12 +124,17 @@ class Httpd:
else:
self._extra_configs[domain] = lines
def clear_extra_configs(self):
def reset_config(self):
self._extra_configs = {}
self.set_proxy_auth(False)
self._domain1_cred_name = self.env.domain1
def set_proxy_auth(self, active: bool):
self._proxy_auth_basic = active
def set_domain1_cred_name(self, name):
self._domain1_cred_name = name
def _run(self, args, intext=''):
env = os.environ.copy()
env['APACHE_RUN_DIR'] = self._run_dir
@ -210,7 +218,8 @@ class Httpd:
def reload_if_config_changed(self):
if self._maybe_running and \
self._loaded_extra_configs == self._extra_configs and \
self._loaded_proxy_auth == self._proxy_auth_basic:
self._loaded_proxy_auth == self._proxy_auth_basic and \
self._loaded_domain1_cred_name == self._domain1_cred_name:
return True
return self.reload()
@ -250,8 +259,9 @@ class Httpd:
def _write_config(self):
domain1 = self.env.domain1
domain1brotli = self.env.domain1brotli
creds1 = self.env.get_credentials(domain1)
creds1 = self.env.get_credentials(self._domain1_cred_name)
assert creds1 # convince pytype this isn't None
self._loaded_domain1_cred_name = self._domain1_cred_name
domain2 = self.env.domain2
creds2 = self.env.get_credentials(domain2)
assert creds2 # convince pytype this isn't None

View file

@ -42,9 +42,10 @@ log = logging.getLogger(__name__)
class Nghttpx:
def __init__(self, env: Env, name: str):
def __init__(self, env: Env, name: str, domain: str, cred_name: str):
self.env = env
self._name = name
self._domain = domain
self._port = 0
self._https_port = 0
self._cmd = env.nghttpx
@ -55,11 +56,25 @@ class Nghttpx:
self._stderr = os.path.join(self._run_dir, 'nghttpx.stderr')
self._tmp_dir = os.path.join(self._run_dir, 'tmp')
self._process: Optional[subprocess.Popen] = None
self._cred_name = self._def_cred_name = cred_name
self._loaded_cred_name = ''
self._rmf(self._pid_file)
self._rmf(self._error_log)
self._mkpath(self._run_dir)
self._write_config()
def set_cred_name(self, name: str):
self._cred_name = name
def reset_config(self):
self._cred_name = self._def_cred_name
def reload_if_config_changed(self):
if self._process and self._port > 0 and \
self._loaded_cred_name == self._cred_name:
return True
return self.reload()
@property
def https_port(self):
return self._https_port
@ -101,7 +116,7 @@ class Nghttpx:
self.stop()
return self.start()
def reload(self, timeout: timedelta):
def reload(self, timeout: timedelta = timedelta(seconds=Env.SERVER_TIMEOUT)):
if self._process:
running = self._process
self._process = None
@ -132,13 +147,13 @@ class Nghttpx:
try_until = datetime.now() + timeout
while datetime.now() < try_until:
if self._https_port > 0:
check_url = f'https://{self.env.domain1}:{self._https_port}/'
check_url = f'https://{self._domain}:{self._port}/'
r = curl.http_get(url=check_url, extra_args=[
'--trace', 'curl.trace', '--trace-time',
'--connect-timeout', '1'
])
else:
check_url = f'https://{self.env.domain1}:{self._port}/'
check_url = f'https://{self._domain}:{self._port}/'
r = curl.http_get(url=check_url, extra_args=[
'--trace', 'curl.trace', '--trace-time',
'--http3-only', '--connect-timeout', '1'
@ -155,13 +170,13 @@ class Nghttpx:
try_until = datetime.now() + timeout
while datetime.now() < try_until:
if self._https_port > 0:
check_url = f'https://{self.env.domain1}:{self._https_port}/'
check_url = f'https://{self._domain}:{self._port}/'
r = curl.http_get(url=check_url, extra_args=[
'--trace', 'curl.trace', '--trace-time',
'--connect-timeout', '1'
])
else:
check_url = f'https://{self.env.domain1}:{self._port}/'
check_url = f'https://{self._domain}:{self._port}/'
r = curl.http_get(url=check_url, extra_args=[
'--http3-only', '--trace', 'curl.trace', '--trace-time',
'--connect-timeout', '1'
@ -195,7 +210,8 @@ class NghttpxQuic(Nghttpx):
}
def __init__(self, env: Env):
super().__init__(env=env, name='nghttpx-quic')
super().__init__(env=env, name='nghttpx-quic',
domain=env.domain1, cred_name=env.domain1)
self._https_port = env.https_port
def initial_start(self):
@ -216,14 +232,15 @@ class NghttpxQuic(Nghttpx):
self._mkpath(self._tmp_dir)
if self._process:
self.stop()
creds = self.env.get_credentials(self.env.domain1)
creds = self.env.get_credentials(self._cred_name)
assert creds # convince pytype this isn't None
self._loaded_cred_name = self._cred_name
args = [
self._cmd,
f'--frontend=*,{self._port};tls',
f'--frontend=*,{self.env.h3_port};quic',
'--frontend-quic-early-data',
f'--frontend=*,{self._port};tls',
f'--backend=127.0.0.1,{self.env.https_port};{self.env.domain1};sni={self.env.domain1};proto=h2;tls',
f'--backend=127.0.0.1,{self.env.https_port};{self._domain};sni={self._domain};proto=h2;tls',
f'--backend=127.0.0.1,{self.env.http_port}',
'--log-level=INFO',
f'--pid-file={self._pid_file}',
@ -247,12 +264,10 @@ class NghttpxQuic(Nghttpx):
class NghttpxFwd(Nghttpx):
PORT_SPECS = {
'h2proxys': socket.SOCK_STREAM,
}
def __init__(self, env: Env):
super().__init__(env=env, name='nghttpx-fwd')
super().__init__(env=env, name='nghttpx-fwd',
domain=env.proxy_domain,
cred_name=env.proxy_domain)
def initial_start(self):
@ -265,16 +280,17 @@ class NghttpxFwd(Nghttpx):
self._port = 0
return False
return alloc_ports_and_do(NghttpxFwd.PORT_SPECS, startup,
self.env.gen_root, max_tries=3)
return alloc_ports_and_do({'h2proxys': socket.SOCK_STREAM},
startup, self.env.gen_root, max_tries=3)
def start(self, wait_live=True):
assert self._port > 0
self._mkpath(self._tmp_dir)
if self._process:
self.stop()
creds = self.env.get_credentials(self.env.proxy_domain)
creds = self.env.get_credentials(self._cred_name)
assert creds # convince pytype this isn't None
self._loaded_cred_name = self._cred_name
args = [
self._cmd,
'--http2-proxy',