cf-socket: make socket data_pending a nop

Eliminating the socket readability check in the socket connection
filters for the 'data_pending' callback. Improves performance of
handling of transfers, up to ~30%, depending on parallelism and response
size.

Whatever `data_pending()` once was, its semantics are now:
"Is there anything buffered in the connection filters that needs
 receive?"
Any checks of the socket's readability are done via `multi_wait()`
and friends.

Fix the one place in HTTP/1 proxy code that checked `data_pending()` and
did an early return if false. Remove that check and actually try to
receive data every time.

Closes #17785
This commit is contained in:
Stefan Eissing 2025-06-30 08:53:31 +02:00 committed by Daniel Stenberg
parent a487a4e4bd
commit 21ecc7e376
No known key found for this signature in database
GPG key ID: 5CC908FDB71E12C2
5 changed files with 116 additions and 85 deletions

View file

@ -374,9 +374,6 @@ static CURLcode recv_CONNECT_resp(struct Curl_cfilter *cf,
error = SELECT_OK;
*done = FALSE;
if(!Curl_conn_data_pending(data, cf->sockindex))
return CURLE_OK;
while(ts->keepon) {
size_t nread;
char byte;

View file

@ -1404,17 +1404,6 @@ static void cf_socket_adjust_pollset(struct Curl_cfilter *cf,
}
}
static bool cf_socket_data_pending(struct Curl_cfilter *cf,
const struct Curl_easy *data)
{
struct cf_socket_ctx *ctx = cf->ctx;
int readable;
(void)data;
readable = SOCKET_READABLE(ctx->sock, 0);
return readable > 0 && (readable & CURL_CSELECT_IN);
}
#ifdef USE_WINSOCK
#ifndef SIO_IDEAL_SEND_BACKLOG_QUERY
@ -1750,7 +1739,7 @@ struct Curl_cftype Curl_cft_tcp = {
cf_socket_close,
cf_socket_shutdown,
cf_socket_adjust_pollset,
cf_socket_data_pending,
Curl_cf_def_data_pending,
cf_socket_send,
cf_socket_recv,
cf_socket_cntrl,
@ -1904,7 +1893,7 @@ struct Curl_cftype Curl_cft_udp = {
cf_socket_close,
cf_socket_shutdown,
cf_socket_adjust_pollset,
cf_socket_data_pending,
Curl_cf_def_data_pending,
cf_socket_send,
cf_socket_recv,
cf_socket_cntrl,
@ -1958,7 +1947,7 @@ struct Curl_cftype Curl_cft_unix = {
cf_socket_close,
cf_socket_shutdown,
cf_socket_adjust_pollset,
cf_socket_data_pending,
Curl_cf_def_data_pending,
cf_socket_send,
cf_socket_recv,
cf_socket_cntrl,
@ -2178,7 +2167,7 @@ struct Curl_cftype Curl_cft_tcp_accept = {
cf_socket_close,
cf_socket_shutdown,
cf_socket_adjust_pollset,
cf_socket_data_pending,
Curl_cf_def_data_pending,
cf_socket_send,
cf_socket_recv,
cf_socket_cntrl,

View file

@ -1941,18 +1941,6 @@ out:
return CURLE_OK;
}
/*
* Called from transfer.c:data_pending to know if we should keep looping
* to receive more data from the connection.
*/
static bool cf_ngtcp2_data_pending(struct Curl_cfilter *cf,
const struct Curl_easy *data)
{
(void)cf;
(void)data;
return FALSE;
}
static CURLcode h3_data_pause(struct Curl_cfilter *cf,
struct Curl_easy *data,
bool pause)
@ -2728,7 +2716,7 @@ struct Curl_cftype Curl_cft_http3 = {
cf_ngtcp2_close,
cf_ngtcp2_shutdown,
cf_ngtcp2_adjust_pollset,
cf_ngtcp2_data_pending,
Curl_cf_def_data_pending,
cf_ngtcp2_send,
cf_ngtcp2_recv,
cf_ngtcp2_data_event,

View file

@ -185,7 +185,8 @@ class ScoreRunner:
verbose: int,
curl_verbose: int,
download_parallel: int = 0,
server_addr: Optional[str] = None):
server_addr: Optional[str] = None,
with_dtrace: bool = False):
self.verbose = verbose
self.env = env
self.protocol = protocol
@ -194,12 +195,18 @@ class ScoreRunner:
self.server_port = server_port
self._silent_curl = not curl_verbose
self._download_parallel = download_parallel
self._with_dtrace = with_dtrace
def info(self, msg):
if self.verbose > 0:
sys.stderr.write(msg)
sys.stderr.flush()
def mk_curl_client(self):
return CurlClient(env=self.env, silent=self._silent_curl,
server_addr=self.server_addr,
with_dtrace=self._with_dtrace)
def handshakes(self) -> Dict[str, Any]:
props = {}
sample_size = 5
@ -215,8 +222,7 @@ class ScoreRunner:
hs_samples = []
errors = []
for _ in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl,
server_addr=self.server_addr)
curl = self.mk_curl_client()
args = [
'--http3-only' if self.protocol == 'h3' else '--http2',
f'--{ipv}', f'https://{authority}/'
@ -274,8 +280,7 @@ class ScoreRunner:
profiles = []
self.info('single...')
for _ in range(nsamples):
curl = CurlClient(env=self.env, silent=self._silent_curl,
server_addr=self.server_addr)
curl = self.mk_curl_client()
r = curl.http_download(urls=[url], alpn_proto=self.protocol,
no_save=True, with_headers=False,
with_profile=True)
@ -295,8 +300,7 @@ class ScoreRunner:
url = f'{url}?[0-{count - 1}]'
self.info('serial...')
for _ in range(nsamples):
curl = CurlClient(env=self.env, silent=self._silent_curl,
server_addr=self.server_addr)
curl = self.mk_curl_client()
r = curl.http_download(urls=[url], alpn_proto=self.protocol,
no_save=True,
with_headers=False, with_profile=True)
@ -317,8 +321,7 @@ class ScoreRunner:
url = f'{url}?[0-{count - 1}]'
self.info('parallel...')
for _ in range(nsamples):
curl = CurlClient(env=self.env, silent=self._silent_curl,
server_addr=self.server_addr)
curl = self.mk_curl_client()
r = curl.http_download(urls=[url], alpn_proto=self.protocol,
no_save=True,
with_headers=False,
@ -339,9 +342,12 @@ class ScoreRunner:
def downloads(self, count: int, fsizes: List[int], meta: Dict[str, Any]) -> Dict[str, Any]:
nsamples = meta['samples']
max_parallel = self._download_parallel if self._download_parallel > 0 else count
cols = ['size', 'single']
cols = ['size']
if not self._download_parallel:
cols.append('single')
if count > 1:
cols.append(f'serial({count})')
if count > 1:
cols.append(f'serial({count})')
cols.append(f'parallel({count}x{max_parallel})')
rows = []
for fsize in fsizes:
@ -351,10 +357,11 @@ class ScoreRunner:
}]
self.info(f'{row[0]["sval"]} downloads...')
url = f'https://{self.env.domain1}:{self.server_port}/score{row[0]["sval"]}.data'
row.append(self.dl_single(url=url, nsamples=nsamples))
if 'single' in cols:
row.append(self.dl_single(url=url, nsamples=nsamples))
if count > 1:
row.append(self.dl_serial(url=url, count=count, nsamples=nsamples))
if 'single' in cols:
row.append(self.dl_serial(url=url, count=count, nsamples=nsamples))
row.append(self.dl_parallel(url=url, count=count, nsamples=nsamples))
rows.append(row)
self.info('done.\n')
@ -387,8 +394,7 @@ class ScoreRunner:
profiles = []
self.info('single...')
for _ in range(nsamples):
curl = CurlClient(env=self.env, silent=self._silent_curl,
server_addr=self.server_addr)
curl = self.mk_curl_client()
r = curl.http_put(urls=[url], fdata=fpath, alpn_proto=self.protocol,
with_headers=False, with_profile=True)
err = self._check_uploads(r, 1)
@ -407,8 +413,7 @@ class ScoreRunner:
url = f'{url}?id=[0-{count - 1}]'
self.info('serial...')
for _ in range(nsamples):
curl = CurlClient(env=self.env, silent=self._silent_curl,
server_addr=self.server_addr)
curl = self.mk_curl_client()
r = curl.http_put(urls=[url], fdata=fpath, alpn_proto=self.protocol,
with_headers=False, with_profile=True)
err = self._check_uploads(r, count)
@ -428,8 +433,7 @@ class ScoreRunner:
url = f'{url}?id=[0-{count - 1}]'
self.info('parallel...')
for _ in range(nsamples):
curl = CurlClient(env=self.env, silent=self._silent_curl,
server_addr=self.server_addr)
curl = self.mk_curl_client()
r = curl.http_put(urls=[url], fdata=fpath, alpn_proto=self.protocol,
with_headers=False, with_profile=True,
extra_args=[
@ -494,8 +498,7 @@ class ScoreRunner:
])
self.info(f'{max_parallel}...')
for _ in range(nsamples):
curl = CurlClient(env=self.env, silent=self._silent_curl,
server_addr=self.server_addr)
curl = self.mk_curl_client()
r = curl.http_download(urls=[url], alpn_proto=self.protocol, no_save=True,
with_headers=False, with_profile=True,
with_stats=False, extra_args=extra_args)
@ -517,7 +520,8 @@ class ScoreRunner:
fsize = 10*1024
cols = ['size', 'total']
rows = []
cols.extend([f'{mp} max' for mp in [1, 6, 25, 50, 100, 300]])
mparallel = meta['request_parallels']
cols.extend([f'{mp} max' for mp in mparallel])
row = [{
'val': fsize,
'sval': Card.fmt_size(fsize)
@ -528,7 +532,7 @@ class ScoreRunner:
self.info('requests, max parallel...')
row.extend([self.do_requests(url=url, count=count,
max_parallel=mp, nsamples=meta["samples"])
for mp in [1, 6, 25, 50, 100, 300]])
for mp in mparallel])
rows.append(row)
self.info('done.\n')
return {
@ -547,6 +551,7 @@ class ScoreRunner:
uploads: Optional[List[int]] = None,
upload_count: int = 50,
req_count=5000,
request_parallels=None,
nsamples: int = 1,
requests: bool = True):
self.info(f"scoring {self.protocol} against {self.server_descr}\n")
@ -599,6 +604,9 @@ class ScoreRunner:
fsizes=uploads,
meta=score['meta'])
if requests:
if request_parallels is None:
request_parallels = [1, 6, 25, 50, 100, 300]
score['meta']['request_parallels'] = request_parallels
score['requests'] = self.requests(count=req_count, meta=score['meta'])
return score
@ -624,6 +632,13 @@ def run_score(args, protocol):
uploads.extend([Card.parse_size(s) for s in x.split(',')])
requests = True
request_parallels = None
if args.request_parallels:
request_parallels = []
for x in args.request_parallels:
request_parallels.extend([int(s) for s in x.split(',')])
if args.downloads or args.uploads or args.requests or args.handshakes:
handshakes = args.handshakes
if not args.downloads:
@ -663,7 +678,8 @@ def run_score(args, protocol):
server_port=remote_port,
verbose=args.verbose,
curl_verbose=args.curl_verbose,
download_parallel=args.download_parallel)
download_parallel=args.download_parallel,
with_dtrace=args.dtrace)
cards.append(card)
if test_httpd:
@ -687,7 +703,8 @@ def run_score(args, protocol):
server_descr=server_descr,
server_port=server_port,
verbose=args.verbose, curl_verbose=args.curl_verbose,
download_parallel=args.download_parallel)
download_parallel=args.download_parallel,
with_dtrace=args.dtrace)
card.setup_resources(server_docs, downloads)
cards.append(card)
@ -711,7 +728,8 @@ def run_score(args, protocol):
server_descr=server_descr,
server_port=server_port,
verbose=args.verbose, curl_verbose=args.curl_verbose,
download_parallel=args.download_parallel)
download_parallel=args.download_parallel,
with_dtrace=args.dtrace)
card.setup_resources(server_docs, downloads)
cards.append(card)
@ -731,6 +749,7 @@ def run_score(args, protocol):
upload_count=args.upload_count,
req_count=args.request_count,
requests=requests,
request_parallels=request_parallels,
nsamples=args.samples)
if args.json:
print(json.JSONEncoder(indent=2).encode(score))
@ -772,34 +791,8 @@ def main():
help="log more output on stderr")
parser.add_argument("-j", "--json", action='store_true',
default=False, help="print json instead of text")
parser.add_argument("-H", "--handshakes", action='store_true',
default=False, help="evaluate handshakes only")
parser.add_argument("-d", "--downloads", action='store_true',
default=False, help="evaluate downloads")
parser.add_argument("--download-sizes", action='append', type=str,
metavar='numberlist',
default=None, help="evaluate download size")
parser.add_argument("--download-count", action='store', type=int,
metavar='number',
default=50, help="perform that many downloads")
parser.add_argument("--samples", action='store', type=int, metavar='number',
default=1, help="how many sample runs to make")
parser.add_argument("--download-parallel", action='store', type=int,
metavar='number', default=0,
help="perform that many downloads in parallel (default all)")
parser.add_argument("-u", "--uploads", action='store_true',
default=False, help="evaluate uploads")
parser.add_argument("--upload-sizes", action='append', type=str,
metavar='numberlist',
default=None, help="evaluate upload size")
parser.add_argument("--upload-count", action='store', type=int,
metavar='number', default=50,
help="perform that many uploads")
parser.add_argument("-r", "--requests", action='store_true',
default=False, help="evaluate requests")
parser.add_argument("--request-count", action='store', type=int,
metavar='number',
default=5000, help="perform that many requests")
parser.add_argument("--httpd", action='store_true', default=False,
help="evaluate httpd server only")
parser.add_argument("--caddy", action='store_true', default=False,
@ -814,6 +807,41 @@ def main():
help="only start the servers")
parser.add_argument("--remote", action='store', type=str,
default=None, help="score against the remote server at <ip>:<port>")
parser.add_argument("--dtrace", action='store_true',
default = False, help = "produce dtrace of curl")
parser.add_argument("-H", "--handshakes", action='store_true',
default=False, help="evaluate handshakes only")
parser.add_argument("-d", "--downloads", action='store_true',
default=False, help="evaluate downloads")
parser.add_argument("--download-sizes", action='append', type=str,
metavar='numberlist',
default=None, help="evaluate download size")
parser.add_argument("--download-count", action='store', type=int,
metavar='number',
default=50, help="perform that many downloads")
parser.add_argument("--download-parallel", action='store', type=int,
metavar='number', default=0,
help="perform that many downloads in parallel (default all)")
parser.add_argument("-u", "--uploads", action='store_true',
default=False, help="evaluate uploads")
parser.add_argument("--upload-sizes", action='append', type=str,
metavar='numberlist',
default=None, help="evaluate upload size")
parser.add_argument("--upload-count", action='store', type=int,
metavar='number', default=50,
help="perform that many uploads")
parser.add_argument("-r", "--requests", action='store_true',
default=False, help="evaluate requests")
parser.add_argument("--request-count", action='store', type=int,
metavar='number',
default=5000, help="perform that many requests")
parser.add_argument("--request-parallels", action='append', type=str,
metavar='numberlist',
default=None, help="evaluate request with these max-parallel numbers")
args = parser.parse_args()
if args.verbose > 0:

View file

@ -108,6 +108,28 @@ class RunProfile:
f'stats={self.stats}]'
class DTraceProfile:
def __init__(self, pid: int, run_dir):
self._pid = pid
self._run_dir = run_dir
self._proc = None
def start(self):
args = [
'sudo', 'dtrace',
'-x', 'ustackframes=100',
'-n', f'profile-97 /pid == {self._pid}/ {{ @[ustack()] = count(); }} tick-60s {{ exit(0); }}',
'-o', f'{self._run_dir}/curl.user_stacks'
]
self._proc = subprocess.Popen(args, text=True, cwd=self._run_dir, shell=False)
assert self._proc
def finish(self):
if self._proc:
self._proc.terminate()
class RunTcpDump:
def __init__(self, env, run_dir):
@ -467,7 +489,8 @@ class CurlClient:
timeout: Optional[float] = None,
silent: bool = False,
run_env: Optional[Dict[str, str]] = None,
server_addr: Optional[str] = None):
server_addr: Optional[str] = None,
with_dtrace: bool = False):
self.env = env
self._timeout = timeout if timeout else env.test_timeout
self._curl = os.environ['CURL'] if 'CURL' in os.environ else env.curl
@ -476,6 +499,7 @@ class CurlClient:
self._stderrfile = f'{self._run_dir}/curl.stderr'
self._headerfile = f'{self._run_dir}/curl.headers'
self._log_path = f'{self._run_dir}/curl.log'
self._with_dtrace = with_dtrace
self._silent = silent
self._run_env = run_env
self._server_addr = server_addr if server_addr else '127.0.0.1'
@ -784,6 +808,9 @@ class CurlClient:
profile = RunProfile(p.pid, started_at, self._run_dir)
if intext is not None and False:
p.communicate(input=intext.encode(), timeout=1)
if self._with_dtrace:
dtrace = DTraceProfile(p.pid, self._run_dir)
dtrace.start()
ptimeout = 0.0
while True:
try:
@ -797,6 +824,8 @@ class CurlClient:
ptimeout = 0.01
exitcode = p.returncode
profile.finish()
if self._with_dtrace:
dtrace.finish()
log.info(f'done: exit={exitcode}, profile={profile}')
else:
p = subprocess.run(args, stderr=cerr, stdout=cout,