curl/tests/http/testenv/h2o.py
Stefan Eissing e4139a73c8
h3-proxy: fixes around H3 proxy
code:
- less exception handling in existing code
- true ip happy eyeballing
- enable certificate verification
- cf-h2-proxy: abort connection when server closed connection

tests:
- remove all --insecure and --proxy-insecure args
- make session reuse test_60_12 a working one
- resolve port conflicts between h2o and nghttpx
- use proxy args better
- make test_60_06 run shorter
- kill h2o at the end of tests, normal stop takes too long

Ref: 59213f8248 #21789
Follow-up to e78b1b3ecc #21153

Closes #21798
2026-05-28 14:41:27 +02:00

438 lines
13 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ***************************************************************************
# _ _ ____ _
# Project ___| | | | _ \| |
# / __| | | | |_) | |
# | (__| |_| | _ <| |___
# \___|\___/|_| \_\_____|
#
# Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at https://curl.se/docs/copyright.html.
#
# You may opt to use, copy, modify, merge, publish, distribute and/or sell
# copies of the Software, and permit persons to whom the Software is
# furnished to do so, under the terms of the COPYING file.
#
# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
# KIND, either express or implied.
#
# SPDX-License-Identifier: curl
#
###########################################################################
#
import logging
import os
import signal
import socket
import subprocess
import time
from datetime import datetime, timedelta
from typing import Dict, Optional
from .curl import CurlClient
from .env import Env
from .ports import alloc_ports_and_do
log = logging.getLogger(__name__)
class H2o:
def __init__(self, env: Env, name: str, domain: str, cred_name: str):
self.env = env
self._name = name
self._domain = domain
self._port = 0 # defaults to h3_port
self._cred_name = cred_name
self._loaded_cred_name = None
self._process = None
self._tmp_dir = os.path.join(self.env.gen_dir, self._name)
self._run_dir = os.path.join(self._tmp_dir, "run")
self._conf_file = os.path.join(self._run_dir, "h2o.conf")
self._error_log = os.path.join(self._run_dir, "h2o.log")
self._pid_file = os.path.join(self._run_dir, "h2o.pid")
self._stderr = os.path.join(self._run_dir, "h2o.stderr")
self._cmd = env.CONFIG.h2o
# For proxy subclasses
self._h1_port = None
self._h2_port = None
@property
def port(self) -> int:
return self._port
@property
def h1_port(self) -> Optional[int]:
return getattr(self, "_h1_port", None)
@property
def h2_port(self) -> Optional[int]:
return getattr(self, "_h2_port", None)
def clear_logs(self):
self._rmf(self._error_log)
self._rmf(self._stderr)
def dump_logs(self):
lines = []
lines.append(f"stderr of {self._name}")
lines.append("-------------------------------------------")
self._dump_file(self._stderr, lines)
lines.append("")
lines.append(f"errorlog of {self._name}")
lines.append("-------------------------------------------")
self._dump_file(self._error_log, lines)
lines.append("")
return lines
def _rmf(self, path):
if os.path.isfile(path):
os.remove(path)
return
def _dump_file(self, path, lines):
if os.path.isfile(path):
with open(path) as fd:
for line in fd:
lines.append(line.rstrip())
def _mkpath(self, path):
if not os.path.exists(path):
os.makedirs(path)
return
def _log(self, level, msg):
getattr(log, level)(f"[{self._name}] {msg}")
def is_running(self):
if self._process:
self._process.poll()
return self._process.returncode is None
return False
def initial_start(self):
self._rmf(self._pid_file)
self._rmf(self._error_log)
self._mkpath(self._run_dir)
self.write_config()
def start(self, wait_live=True):
self._mkpath(self._tmp_dir)
self._mkpath(self._run_dir)
if self._process:
self.stop()
self._loaded_cred_name = self._cred_name
self.write_config()
args = [self._cmd, "-c", self._conf_file]
ngerr = open(self._stderr, "a")
self._process = subprocess.Popen(args=args, stderr=ngerr)
if self._process.returncode is not None:
return False
if wait_live:
time.sleep(1)
# fail fast if h2o rejected the config and already exited
self._process.poll()
if self._process.returncode is not None:
self._log("error",
f"h2o exited early (rc={self._process.returncode})"
f" - check {self._stderr} for details")
self._process = None
return False
return not wait_live or self.wait_for_state(
live=True, timeout=timedelta(seconds=Env.SERVER_TIMEOUT)
)
def stop(self, wait_dead=True):
self._mkpath(self._tmp_dir)
if self._process:
self._process.terminate()
try:
self._process.wait(timeout=5)
except subprocess.TimeoutExpired:
self._process.kill()
self._process.wait(timeout=2)
self._process = None
return not wait_dead or self.wait_for_state(
live=False, timeout=timedelta(seconds=5)
)
return True
def kill(self, wait_dead=True):
if self._process:
self._process.kill()
return True
return False
def restart(self):
self.stop()
return self.start()
def reload(self, timeout: timedelta = timedelta(seconds=Env.SERVER_TIMEOUT)):
if self._process:
running = self._process
self._process = None
os.kill(running.pid, signal.SIGQUIT)
end_wait = datetime.now() + timedelta(seconds=5)
exited = False
if not self.start(wait_live=False):
self._process = running
return False
while datetime.now() < end_wait:
try:
self._log("debug", f"waiting for h2o({running.pid}) to exit.")
running.wait(1)
self._log(
"debug",
f"h2o({running.pid}) terminated -> {running.returncode}",
)
exited = True
break
except subprocess.TimeoutExpired:
self._log("warning", f"h2o({running.pid}), not shut down yet.")
os.kill(running.pid, signal.SIGQUIT)
if not exited and datetime.now() >= end_wait:
self._log("error", f"h2o({running.pid}), terminate forcefully.")
os.kill(running.pid, signal.SIGKILL)
running.terminate()
running.wait(1)
return self.wait_for_state(live=True, timeout=timeout)
return False
def wait_for_state(
self,
live: bool,
timeout: timedelta,
url: Optional[str] = None,
log_prefix: str = "h2o",
):
curl = CurlClient(env=self.env, run_dir=self._tmp_dir)
try_until = datetime.now() + timeout
if url is None:
url = f"https://{self._domain}:{self._port}/"
while datetime.now() < try_until:
if live:
r = curl.http_get(
url=url, extra_args=["--trace", "curl.trace", "--trace-time"]
)
if r.exit_code == 0:
return True
else:
r = curl.http_get(url=url)
if r.exit_code != 0:
return True
time.sleep(0.1)
if live:
self._log("error", f"Server still not responding after {timeout}")
else:
self._log("debug", f"Server still responding after {timeout}")
return False
def write_config(self):
# To be overridden by subclasses
with open(self._conf_file, "w") as fd:
fd.write("# h2o test config\n")
class H2oServer(H2o):
"""h2o HTTP/3 server for testing."""
PORT_SPECS = {
"h2o_https": socket.SOCK_STREAM,
}
def __init__(self, env: Env):
super().__init__(
env=env, name="h2o-server", domain=env.domain1, cred_name=env.domain1
)
self._docs_dir = os.path.join(self.env.gen_dir, "docs")
@property
def docs_dir(self):
return self._docs_dir
def initial_start(self):
super().initial_start()
def startup(ports: Dict[str, int]) -> bool:
self._port = ports["h2o_https"]
if self.start():
self.env.update_ports(ports)
return True
self.stop()
self._port = 0
return False
return alloc_ports_and_do(
H2oServer.PORT_SPECS, startup, self.env.gen_root, max_tries=3
)
def write_config(self):
creds = self.env.get_credentials(self._cred_name)
assert creds # convince pytype this is not None
self._mkpath(self._docs_dir)
self._mkpath(self._run_dir)
# Create a simple test file
with open(os.path.join(self._docs_dir, "data.json"), "w") as f:
f.write('{"message": "Hello from h2o HTTP/3 server"}\n')
with open(self._conf_file, "w") as fd:
fd.write(f"""# h2o HTTP/3 server configuration
server-name: "h2o-test-server"
num-threads: 1
listen: &ssl_listen
port: {self._port}
ssl:
certificate-file: {creds.cert_file}
key-file: {creds.pkey_file}
neverbleed: OFF
minimum-version: TLSv1.2
ocsp-update-interval: 0
listen:
<<: *ssl_listen
type: quic
hosts:
"{self._domain}":
paths:
"/":
file.dir: {self._docs_dir}
http2-reprioritize-blocking-assets: ON
access-log: {self._run_dir}/access.log
error-log: {self._error_log}
""")
class H2oProxy(H2o):
"""h2o MASQUE proxy for testing."""
def __init__(self, env: Env):
super().__init__(
env=env,
name="h2o-proxy",
domain=env.proxy_domain,
cred_name=env.proxy_domain,
)
def initial_start(self):
super().initial_start()
def startup(ports: Dict[str, int]) -> bool:
self._port = ports["h2o_h3proxys"]
self._h2_port = ports["h2o_h2proxys"]
self._h1_port = ports["h2o_proxys"]
if self.start():
self.env.update_ports(ports)
return True
self.stop()
self._port = 0
self._h2_port = 0
self._h1_port = 0
return False
return alloc_ports_and_do(
{
"h2o_h3proxys": socket.SOCK_DGRAM,
"h2o_h2proxys": socket.SOCK_STREAM,
"h2o_proxys": socket.SOCK_STREAM,
},
startup,
self.env.gen_root,
max_tries=3,
)
def write_config(self):
creds = self.env.get_credentials(self._cred_name)
assert creds # convince pytype this is not None
self._mkpath(self._run_dir)
with open(self._conf_file, "w") as fd:
fd.write(f"""# h2o MASQUE proxy configuration
server-name: "h2o-test-proxy"
num-threads: 1
proxy.tunnel: ON
# HTTP/1.1 proxy listener
listen: &h1_listen
port: {getattr(self, "_h1_port", self._port)}
ssl:
certificate-file: {creds.cert_file}
key-file: {creds.pkey_file}
neverbleed: OFF
minimum-version: TLSv1.2
ocsp-update-interval: 0
# HTTP/2 proxy listener
listen: &h2_listen
port: {getattr(self, "_h2_port", self._port)}
ssl:
certificate-file: {creds.cert_file}
key-file: {creds.pkey_file}
neverbleed: OFF
minimum-version: TLSv1.2
ocsp-update-interval: 0
# HTTP/3 proxy listener (main port)
listen: &h3_listen
port: {self._port}
ssl:
certificate-file: {creds.cert_file}
key-file: {creds.pkey_file}
neverbleed: OFF
minimum-version: TLSv1.2
ocsp-update-interval: 0
# QUIC listener for HTTP/3
listen:
<<: *h3_listen
type: quic
hosts:
"{self._domain}":
paths:
"/":
proxy.connect: [+*]
proxy.ssl.verify-peer: OFF
"/.well-known/masque/udp":
proxy.connect-udp: [+*]
proxy.ssl.verify-peer: OFF
http2-reprioritize-blocking-assets: ON
access-log: {self._run_dir}/access.log
error-log: {self._error_log}
""")
def wait_for_state(
self,
live: bool,
timeout: timedelta,
url: Optional[str] = None,
log_prefix: str = "h2o",
):
curl = CurlClient(env=self.env, run_dir=self._tmp_dir)
try_until = datetime.now() + timeout
if url is None:
url = f"https://{self.env.proxy_domain}:{self._port}/"
while datetime.now() < try_until:
if live:
r = curl.http_get(
url=url, extra_args=["--trace", "curl.trace", "--trace-time"]
)
if r.exit_code == 0:
return True
else:
r = curl.http_get(url=url)
if r.exit_code != 0:
return True
time.sleep(0.1)
if live:
self._log("error", f"Proxy still not responding after {timeout}")
else:
self._log("debug", f"Proxy still responding after {timeout}")
return False