curl/tests/http/testenv/h2o.py
Aritra Basu e78b1b3ecc
HTTP/3: add proxy CONNECT and MASQUE CONNECT-UDP support (ngtcp2 QUIC)
This patch adds two major proxy capabilities to curl (ngtcp2 QUIC):
- HTTP/3 Proxy CONNECT: Tunnel HTTP/1.1 or HTTP/2 traffic through an
  HTTPS proxy that speaks HTTP/3 (QUIC) using the standard CONNECT
  method over an HTTP/3 connection.
- MASQUE CONNECT-UDP: Tunnel HTTP/3 (QUIC) traffic through an HTTP
  proxy (speaking HTTP/1.1, HTTP/2, or HTTP/3) using the extended
  CONNECT method with the CONNECT-UDP protocol (RFC9297 & RFC9298).

Public API additions:
- `CURLPROXY_HTTPS3`: new proxy type constant for HTTP/3 proxy
- `--proxy-http3`: new CLI flag to negotiate HTTP/3 with HTTPS proxy

The implementation adds two new filters:
- `H3-PROXY` - enables negotiating HTTP/3 (QUIC) to the proxy and
  running CONNECT/CONNECT-UDP through that proxy transport.
- `CAPSULE` - dedicated filter inserted between QUIC transport and
  HTTP-PROXY to handle datagram capsule encapsulation/decapsulation.

Here is how the curl filter chaining looks in different scenarios:
- HTTP/3 Proxy CONNECT (tunneling TCP protocols over QUIC proxy):
  conn -> HTTP/1.1 or HTTP/2  -> SSL -> HTTP-PROXY ->
                                 H3-PROXY -> HAPPY-EYEBALLS -> UDP
- MASQUE CONNECT-UDP (tunneling QUIC over any proxy):
  conn -> HTTP/3 -> CAPSULE -> HTTP-PROXY -> H3-PROXY ->
                               HAPPY-EYEBALLS -> UDP
  conn -> HTTP/3 -> CAPSULE -> HTTP-PROXY -> H1-PROXY or H2-PROXY ->
                               SSL -> HAPPY-EYEBALLS -> TCP

- Both features currently require the ngtcp2 QUIC backend.
- Both features are experimental (disabled by default). Enable with
  `--enable-proxy-http3`(autotools) or `-DUSE_PROXY_HTTP3=ON`(CMake).

Tests:
- tests/unit/unit3400.c: Unit tests for capsule protocol encode/decode
- tests/http/test_60_h3_proxy.py: Comprehensive pytest integration suite
- tests/http/testenv/h2o.py: Managing h2o instances with HTTP/1.1, HTTP/2,
  and HTTP/3 (QUIC) listeners, proxy.connect and proxy.connect-udp enabled.

References:
  RFC 9297 - HTTP Datagrams and the Capsule Protocol
  RFC 9298 - Proxying UDP in HTTP
  RFC 9000 §16 — Variable-Length Integer Encoding

Signed-off-by: Aritra Basu <aritrbas+gh@cisco.com>

Closes #21153
2026-05-27 08:49:53 +02:00

428 lines
13 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ***************************************************************************
# _ _ ____ _
# Project ___| | | | _ \| |
# / __| | | | |_) | |
# | (__| |_| | _ <| |___
# \___|\___/|_| \_\_____|
#
# Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at https://curl.se/docs/copyright.html.
#
# You may opt to use, copy, modify, merge, publish, distribute and/or sell
# copies of the Software, and permit persons to whom the Software is
# furnished to do so, under the terms of the COPYING file.
#
# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
# KIND, either express or implied.
#
# SPDX-License-Identifier: curl
#
###########################################################################
#
import logging
import os
import signal
import socket
import subprocess
import time
from datetime import datetime, timedelta
from typing import Dict, Optional
from .curl import CurlClient
from .env import Env
from .ports import alloc_ports_and_do
log = logging.getLogger(__name__)
class H2o:
def __init__(self, env: Env, name: str, domain: str, cred_name: str):
self.env = env
self._name = name
self._domain = domain
self._port = 0 # defaults to h3_port
self._cred_name = cred_name
self._loaded_cred_name = None
self._process = None
self._tmp_dir = os.path.join(self.env.gen_dir, self._name)
self._run_dir = os.path.join(self._tmp_dir, "run")
self._conf_file = os.path.join(self._run_dir, "h2o.conf")
self._error_log = os.path.join(self._run_dir, "h2o.log")
self._pid_file = os.path.join(self._run_dir, "h2o.pid")
self._stderr = os.path.join(self._run_dir, "h2o.stderr")
self._cmd = env.CONFIG.h2o
# For proxy subclasses
self._h1_port = None
self._h2_port = None
@property
def port(self) -> int:
return self._port
@property
def h1_port(self) -> Optional[int]:
return getattr(self, "_h1_port", None)
@property
def h2_port(self) -> Optional[int]:
return getattr(self, "_h2_port", None)
def clear_logs(self):
self._rmf(self._error_log)
self._rmf(self._stderr)
def dump_logs(self):
lines = []
lines.append(f"stderr of {self._name}")
lines.append("-------------------------------------------")
self._dump_file(self._stderr, lines)
lines.append("")
lines.append(f"errorlog of {self._name}")
lines.append("-------------------------------------------")
self._dump_file(self._error_log, lines)
lines.append("")
return lines
def _rmf(self, path):
if os.path.isfile(path):
os.remove(path)
return
def _dump_file(self, path, lines):
if os.path.isfile(path):
with open(path) as fd:
for line in fd:
lines.append(line.rstrip())
def _mkpath(self, path):
if not os.path.exists(path):
os.makedirs(path)
return
def _log(self, level, msg):
getattr(log, level)(f"[{self._name}] {msg}")
def is_running(self):
if self._process:
self._process.poll()
return self._process.returncode is None
return False
def initial_start(self):
self._rmf(self._pid_file)
self._rmf(self._error_log)
self._mkpath(self._run_dir)
self.write_config()
def start(self, wait_live=True):
self._mkpath(self._tmp_dir)
self._mkpath(self._run_dir)
if self._process:
self.stop()
self._loaded_cred_name = self._cred_name
self.write_config()
args = [self._cmd, "-c", self._conf_file]
ngerr = open(self._stderr, "a")
self._process = subprocess.Popen(args=args, stderr=ngerr)
if self._process.returncode is not None:
return False
if wait_live:
time.sleep(1)
# fail fast if h2o rejected the config and already exited
self._process.poll()
if self._process.returncode is not None:
self._log("error",
f"h2o exited early (rc={self._process.returncode})"
f" - check {self._stderr} for details")
self._process = None
return False
return not wait_live or self.wait_for_state(
live=True, timeout=timedelta(seconds=Env.SERVER_TIMEOUT)
)
def stop(self, wait_dead=True):
self._mkpath(self._tmp_dir)
if self._process:
self._process.terminate()
try:
self._process.wait(timeout=5)
except subprocess.TimeoutExpired:
self._process.kill()
self._process.wait(timeout=2)
self._process = None
return not wait_dead or self.wait_for_state(
live=False, timeout=timedelta(seconds=5)
)
return True
def restart(self):
self.stop()
return self.start()
def reload(self, timeout: timedelta = timedelta(seconds=Env.SERVER_TIMEOUT)):
if self._process:
running = self._process
self._process = None
os.kill(running.pid, signal.SIGQUIT)
end_wait = datetime.now() + timedelta(seconds=5)
exited = False
if not self.start(wait_live=False):
self._process = running
return False
while datetime.now() < end_wait:
try:
self._log("debug", f"waiting for h2o({running.pid}) to exit.")
running.wait(1)
self._log(
"debug",
f"h2o({running.pid}) terminated -> {running.returncode}",
)
exited = True
break
except subprocess.TimeoutExpired:
self._log("warning", f"h2o({running.pid}), not shut down yet.")
os.kill(running.pid, signal.SIGQUIT)
if not exited and datetime.now() >= end_wait:
self._log("error", f"h2o({running.pid}), terminate forcefully.")
os.kill(running.pid, signal.SIGKILL)
running.terminate()
running.wait(1)
return self.wait_for_state(live=True, timeout=timeout)
return False
def wait_for_state(
self,
live: bool,
timeout: timedelta,
url: Optional[str] = None,
log_prefix: str = "h2o",
):
curl = CurlClient(env=self.env, run_dir=self._tmp_dir)
try_until = datetime.now() + timeout
if url is None:
url = f"https://{self._domain}:{self._port}/"
while datetime.now() < try_until:
if live:
r = curl.http_get(
url=url, extra_args=["--trace", "curl.trace", "--trace-time"]
)
if r.exit_code == 0:
return True
else:
r = curl.http_get(url=url)
if r.exit_code != 0:
return True
time.sleep(0.1)
if live:
self._log("error", f"Server still not responding after {timeout}")
else:
self._log("debug", f"Server still responding after {timeout}")
return False
def write_config(self):
# To be overridden by subclasses
with open(self._conf_file, "w") as fd:
fd.write("# h2o test config\n")
class H2oServer(H2o):
"""h2o HTTP/3 server for testing."""
PORT_SPECS = {
"h2o_https": socket.SOCK_STREAM,
}
def __init__(self, env: Env):
super().__init__(
env=env, name="h2o-server", domain=env.domain1, cred_name=env.domain1
)
def initial_start(self):
super().initial_start()
def startup(ports: Dict[str, int]) -> bool:
self._port = ports["h2o_https"]
if self.start():
self.env.update_ports(ports)
return True
self.stop()
self._port = 0
return False
return alloc_ports_and_do(
H2oServer.PORT_SPECS, startup, self.env.gen_root, max_tries=3
)
def write_config(self):
creds = self.env.get_credentials(self._cred_name)
assert creds # convince pytype this is not None
doc_root = os.path.join(self.env.gen_dir, "docs")
self._mkpath(doc_root)
self._mkpath(self._run_dir)
# Create a simple test file
with open(os.path.join(doc_root, "data.json"), "w") as f:
f.write('{"message": "Hello from h2o HTTP/3 server"}\n')
with open(self._conf_file, "w") as fd:
fd.write(f"""# h2o HTTP/3 server configuration
server-name: "h2o-test-server"
num-threads: 1
listen: &ssl_listen
port: {self._port}
ssl:
certificate-file: {creds.cert_file}
key-file: {creds.pkey_file}
neverbleed: OFF
minimum-version: TLSv1.2
ocsp-update-interval: 0
listen:
<<: *ssl_listen
type: quic
hosts:
"{self._domain}":
paths:
"/":
file.dir: {doc_root}
http2-reprioritize-blocking-assets: ON
access-log: {self._run_dir}/access.log
error-log: {self._error_log}
""")
class H2oProxy(H2o):
"""h2o MASQUE proxy for testing."""
def __init__(self, env: Env):
super().__init__(
env=env,
name="h2o-proxy",
domain=env.proxy_domain,
cred_name=env.proxy_domain,
)
def initial_start(self):
super().initial_start()
def startup(ports: Dict[str, int]) -> bool:
self._port = ports["h3proxys"]
self._h2_port = ports["h2proxys"]
self._h1_port = ports["proxys"]
if self.start():
self.env.update_ports(ports)
return True
self.stop()
self._port = 0
self._h2_port = 0
self._h1_port = 0
return False
return alloc_ports_and_do(
{
"h3proxys": socket.SOCK_DGRAM,
"h2proxys": socket.SOCK_STREAM,
"proxys": socket.SOCK_STREAM,
},
startup,
self.env.gen_root,
max_tries=3,
)
def write_config(self):
creds = self.env.get_credentials(self._cred_name)
assert creds # convince pytype this is not None
self._mkpath(self._run_dir)
with open(self._conf_file, "w") as fd:
fd.write(f"""# h2o MASQUE proxy configuration
server-name: "h2o-test-proxy"
num-threads: 1
proxy.tunnel: ON
# HTTP/1.1 proxy listener
listen: &h1_listen
port: {getattr(self, "_h1_port", self._port)}
ssl:
certificate-file: {creds.cert_file}
key-file: {creds.pkey_file}
neverbleed: OFF
minimum-version: TLSv1.2
ocsp-update-interval: 0
# HTTP/2 proxy listener
listen: &h2_listen
port: {getattr(self, "_h2_port", self._port)}
ssl:
certificate-file: {creds.cert_file}
key-file: {creds.pkey_file}
neverbleed: OFF
minimum-version: TLSv1.2
ocsp-update-interval: 0
# HTTP/3 proxy listener (main port)
listen: &h3_listen
port: {self._port}
ssl:
certificate-file: {creds.cert_file}
key-file: {creds.pkey_file}
neverbleed: OFF
minimum-version: TLSv1.2
ocsp-update-interval: 0
# QUIC listener for HTTP/3
listen:
<<: *h3_listen
type: quic
hosts:
"{self._domain}":
paths:
"/":
proxy.connect: [+*]
proxy.ssl.verify-peer: OFF
"/.well-known/masque/udp":
proxy.connect-udp: [+*]
proxy.ssl.verify-peer: OFF
http2-reprioritize-blocking-assets: ON
access-log: {self._run_dir}/access.log
error-log: {self._error_log}
""")
def wait_for_state(
self,
live: bool,
timeout: timedelta,
url: Optional[str] = None,
log_prefix: str = "h2o",
):
curl = CurlClient(env=self.env, run_dir=self._tmp_dir)
try_until = datetime.now() + timeout
if url is None:
url = f"https://{self.env.proxy_domain}:{self._port}/"
while datetime.now() < try_until:
if live:
r = curl.http_get(
url=url, extra_args=["--trace", "curl.trace", "--trace-time"]
)
if r.exit_code == 0:
return True
else:
r = curl.http_get(url=url)
if r.exit_code != 0:
return True
time.sleep(0.1)
if live:
self._log("error", f"Proxy still not responding after {timeout}")
else:
self._log("debug", f"Proxy still responding after {timeout}")
return False