pytest: add pinnedpubkey test cases

Add positive/negative test cases in pytest for pinned public keys.

Closes #17412
This commit is contained in:
Stefan Eissing 2025-05-21 17:40:11 +02:00 committed by Daniel Stenberg
parent 2dfe421a64
commit e1f65937a9
No known key found for this signature in database
GPG key ID: 5CC908FDB71E12C2
5 changed files with 50 additions and 11 deletions

View file

@ -187,6 +187,8 @@ CURLcode Curl_vquic_tls_verify_peer(struct curl_tls_ctx *ctx,
}
wolfSSL_X509_free(cert);
}
if(!result)
result = Curl_wssl_verify_pinned(cf, data, &ctx->wssl);
#endif
/* on error, remove any session we might have in the pool */
if(result)

View file

@ -1526,10 +1526,10 @@ static char *wssl_strerror(unsigned long error, char *buf,
return buf;
}
static CURLcode wssl_verify_pinned(struct Curl_cfilter *cf,
struct Curl_easy *data)
CURLcode Curl_wssl_verify_pinned(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct wssl_ctx *wssl)
{
struct ssl_connect_data *connssl = cf->ctx;
#ifndef CURL_DISABLE_PROXY
const char * const pinnedpubkey = Curl_ssl_cf_is_proxy(cf) ?
data->set.str[STRING_SSL_PINNEDPUBLICKEY_PROXY] :
@ -1540,7 +1540,6 @@ static CURLcode wssl_verify_pinned(struct Curl_cfilter *cf,
if(pinnedpubkey) {
#ifdef KEEP_PEER_CERT
struct wssl_ctx *wssl = (struct wssl_ctx *)connssl->backend;
WOLFSSL_X509 *x509;
const char *x509_der;
int x509_der_len;
@ -2138,7 +2137,7 @@ static CURLcode wssl_connect(struct Curl_cfilter *cf,
result = wssl->hs_result;
goto out;
}
result = wssl_verify_pinned(cf, data);
result = Curl_wssl_verify_pinned(cf, data, wssl);
if(result) {
wssl->hs_result = result;
goto out;

View file

@ -75,11 +75,6 @@ CURLcode Curl_wssl_setup_x509_store(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct wssl_ctx *wssl);
CURLcode Curl_wssl_setup_session(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct wssl_ctx *wss,
const char *ssl_peer_key);
CURLcode Curl_wssl_cache_session(struct Curl_cfilter *cf,
struct Curl_easy *data,
const char *ssl_peer_key,
@ -89,6 +84,10 @@ CURLcode Curl_wssl_cache_session(struct Curl_cfilter *cf,
unsigned char *quic_tp,
size_t quic_tp_len);
CURLcode Curl_wssl_verify_pinned(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct wssl_ctx *wssl);
#endif /* USE_WOLFSSL */
#endif /* HEADER_CURL_WOLFSSL_H */

View file

@ -513,3 +513,31 @@ class TestSSLUse:
assert r.json['SSL_CIPHER'] in ciphers, r.dump_logs()
else:
assert r.exit_code != 0, r.dump_logs()
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
def test_17_19_wrong_pin(self, env: Env, proto, httpd):
if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported")
if env.curl_uses_any_libs(['bearssl', 'rustls-ffi']):
pytest.skip('TLS backend ignores --pinnedpubkey')
curl = CurlClient(env=env)
url = f'https://{env.authority_for(env.domain1, proto)}/curltest/sslinfo'
r = curl.http_get(url=url, alpn_proto=proto, extra_args=[
'--pinnedpubkey', 'sha256//ffff'
])
# expect NOT_IMPLEMENTED or CURLE_SSL_PINNEDPUBKEYNOTMATCH
assert r.exit_code in [2, 90], f'{r.dump_logs()}'
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
def test_17_20_correct_pin(self, env: Env, proto, httpd):
if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported")
curl = CurlClient(env=env)
creds = env.get_credentials(env.domain1)
assert creds
url = f'https://{env.authority_for(env.domain1, proto)}/curltest/sslinfo'
r = curl.http_get(url=url, alpn_proto=proto, extra_args=[
'--pinnedpubkey', f'sha256//{creds.pub_sha256_b64()}'
])
# expect NOT_IMPLEMENTED or OK
assert r.exit_code in [0, 2], f'{r.dump_logs()}'

View file

@ -24,6 +24,7 @@
#
###########################################################################
#
import base64
import ipaddress
import os
import re
@ -33,6 +34,7 @@ from typing import List, Any, Optional
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives._serialization import PublicFormat
from cryptography.hazmat.primitives.asymmetric import ec, rsa
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
@ -149,6 +151,15 @@ class Credentials:
def private_key(self) -> Any:
return self._pkey
def pub_sha256_b64(self) -> Any:
pubkey = self._pkey.public_key()
sha256 = hashes.Hash(algorithm=hashes.SHA256())
sha256.update(pubkey.public_bytes(
encoding=Encoding.DER,
format=PublicFormat.SubjectPublicKeyInfo
))
return base64.b64encode(sha256.finalize()).decode('utf8')
@property
def certificate(self) -> Any:
return self._cert
@ -393,7 +404,7 @@ class TestCA:
issuer_subject: Optional[Credentials],
valid_from_delta: Optional[timedelta] = None,
valid_until_delta: Optional[timedelta] = None
):
) -> x509.CertificateBuilder:
pubkey = pkey.public_key()
issuer_subject = issuer_subject if issuer_subject is not None else subject