pytest: close file handles after use, and two minor tidy-ups

Also:
- drop two unreachable return statements.
- test_17_ssl_use: avoid implicit string concatenations in lists.

Reported by GitHub CodeQL

Closes #21916
This commit is contained in:
Viktor Szakats 2026-06-08 23:27:32 +02:00
parent 2dfd265d66
commit 8145476d5d
No known key found for this signature in database
15 changed files with 149 additions and 84 deletions

View file

@ -473,8 +473,10 @@ class TestDownload:
dfile = client.download_file(i)
assert os.path.exists(dfile)
if complete and not filecmp.cmp(srcfile, dfile, shallow=False):
diff = "".join(difflib.unified_diff(a=open(srcfile).readlines(),
b=open(dfile).readlines(),
with open(srcfile) as fa, open(dfile) as fb:
a = fa.readlines()
b = fb.readlines()
diff = "".join(difflib.unified_diff(a=a, b=b,
fromfile=srcfile,
tofile=dfile,
n=1))

View file

@ -57,7 +57,8 @@ class TestUpload:
url = f'https://{env.authority_for(env.domain1, proto)}/curltest/echo?id=[0-0]'
r = curl.http_upload(urls=[url], data=data, alpn_proto=proto)
r.check_stats(count=1, http_status=200, exitcode=0)
respdata = open(curl.response_file(0)).readlines()
with open(curl.response_file(0)) as fr:
respdata = fr.readlines()
assert respdata == [data]
# upload large data, check that this is what was echoed
@ -68,8 +69,9 @@ class TestUpload:
url = f'https://{env.authority_for(env.domain1, proto)}/curltest/echo?id=[0-0]'
r = curl.http_upload(urls=[url], data=f'@{fdata}', alpn_proto=proto)
r.check_stats(count=1, http_status=200, exitcode=0)
indata = open(fdata).readlines()
respdata = open(curl.response_file(0)).readlines()
with open(fdata) as fi, open(curl.response_file(0)) as fr:
indata = fi.readlines()
respdata = fr.readlines()
assert respdata == indata
# upload data sequentially, check that they were echoed
@ -82,7 +84,8 @@ class TestUpload:
r = curl.http_upload(urls=[url], data=data, alpn_proto=proto)
r.check_stats(count=count, http_status=200, exitcode=0)
for i in range(count):
respdata = open(curl.response_file(i)).readlines()
with open(curl.response_file(i)) as fr:
respdata = fr.readlines()
assert respdata == [data]
# upload data parallel, check that they were echoed
@ -97,7 +100,8 @@ class TestUpload:
extra_args=['--parallel'])
r.check_stats(count=count, http_status=200, exitcode=0)
for i in range(count):
respdata = open(curl.response_file(i)).readlines()
with open(curl.response_file(i)) as fr:
respdata = fr.readlines()
assert respdata == [data]
# upload large data sequentially, check that this is what was echoed
@ -109,10 +113,12 @@ class TestUpload:
url = f'https://{env.authority_for(env.domain1, proto)}/curltest/echo?id=[0-{count-1}]'
r = curl.http_upload(urls=[url], data=f'@{fdata}', alpn_proto=proto)
r.check_response(count=count, http_status=200)
indata = open(fdata).readlines()
with open(fdata) as fi:
indata = fi.readlines()
r.check_stats(count=count, http_status=200, exitcode=0)
for i in range(count):
respdata = open(curl.response_file(i)).readlines()
with open(curl.response_file(i)) as fr:
respdata = fr.readlines()
assert respdata == indata
# upload very large data sequentially, check that this is what was echoed
@ -124,9 +130,11 @@ class TestUpload:
url = f'https://{env.authority_for(env.domain1, proto)}/curltest/echo?id=[0-{count-1}]'
r = curl.http_upload(urls=[url], data=f'@{fdata}', alpn_proto=proto)
r.check_stats(count=count, http_status=200, exitcode=0)
indata = open(fdata).readlines()
with open(fdata) as fi:
indata = fi.readlines()
for i in range(count):
respdata = open(curl.response_file(i)).readlines()
with open(curl.response_file(i)) as fr:
respdata = fr.readlines()
assert respdata == indata
# upload from stdin, issue #14870
@ -141,7 +149,8 @@ class TestUpload:
r = curl.http_put(urls=[url], data=indata, alpn_proto=proto)
r.check_stats(count=count, http_status=200, exitcode=0)
for i in range(count):
respdata = open(curl.response_file(i)).readlines()
with open(curl.response_file(i)) as fr:
respdata = fr.readlines()
assert respdata == [f'{len(indata)}']
@pytest.mark.parametrize("proto", Env.http_protos())
@ -198,7 +207,8 @@ class TestUpload:
extra_args=['--parallel'])
r.check_stats(count=count, http_status=200, exitcode=0)
for i in range(count):
respdata = open(curl.response_file(i)).readlines()
with open(curl.response_file(i)) as fr:
respdata = fr.readlines()
assert respdata == [data]
# upload large data parallel, check that this is what was echoed
@ -243,7 +253,8 @@ class TestUpload:
exp_data = [f'{os.path.getsize(fdata)}']
r.check_response(count=count, http_status=200)
for i in range(count):
respdata = open(curl.response_file(i)).readlines()
with open(curl.response_file(i)) as fr:
respdata = fr.readlines()
assert respdata == exp_data
# PUT 10m
@ -259,7 +270,8 @@ class TestUpload:
exp_data = [f'{os.path.getsize(fdata)}']
r.check_response(count=count, http_status=200)
for i in range(count):
respdata = open(curl.response_file(i)).readlines()
with open(curl.response_file(i)) as fr:
respdata = fr.readlines()
assert respdata == exp_data
# issue #10591
@ -351,8 +363,9 @@ class TestUpload:
r.check_response(count=1, http_status=200)
# apache does not Upgrade on request with a body
assert r.stats[0]['http_version'] == '1.1', f'{r}'
indata = open(fdata).readlines()
respdata = open(curl.response_file(0)).readlines()
with open(fdata) as fi, open(curl.response_file(0)) as fr:
indata = fi.readlines()
respdata = fr.readlines()
assert respdata == indata
# upload to a 301,302,303 response
@ -368,7 +381,8 @@ class TestUpload:
'-L', '--trace-config', 'http/2,http/3'
])
r.check_response(count=1, http_status=200)
respdata = open(curl.response_file(0)).readlines()
with open(curl.response_file(0)) as fr:
respdata = fr.readlines()
assert respdata == [] # was transformed to a GET
# upload to a 307 response
@ -383,7 +397,8 @@ class TestUpload:
'-L', '--trace-config', 'http/2,http/3'
])
r.check_response(count=1, http_status=200)
respdata = open(curl.response_file(0)).readlines()
with open(curl.response_file(0)) as fr:
respdata = fr.readlines()
assert respdata == [data] # was POST again
# POST form data, yet another code path in transfer
@ -406,8 +421,9 @@ class TestUpload:
'--trace-config', 'http/2,http/3'
])
r.check_stats(count=1, http_status=200, exitcode=0)
indata = open(fdata).readlines()
respdata = open(curl.response_file(0)).readlines()
with open(fdata) as fi, open(curl.response_file(0)) as fr:
indata = fi.readlines()
respdata = fr.readlines()
assert respdata == indata
# POST data urlencoded, large enough to be sent separate from request headers
@ -420,8 +436,9 @@ class TestUpload:
'--trace-config', 'http/2,http/3'
])
r.check_stats(count=1, http_status=200, exitcode=0)
indata = open(fdata).readlines()
respdata = open(curl.response_file(0)).readlines()
with open(fdata) as fi, open(curl.response_file(0)) as fr:
indata = fi.readlines()
respdata = fr.readlines()
assert respdata == indata
# POST data urlencoded, small enough to be sent with request headers
@ -442,8 +459,9 @@ class TestUpload:
url = f'https://{env.authority_for(env.domain1, proto)}/curltest/echo?id=[0-0]'
r = curl.http_upload(urls=[url], data=f'@{fdata}', alpn_proto=proto, extra_args=extra_args)
r.check_stats(count=1, http_status=200, exitcode=0)
indata = open(fdata).readlines()
respdata = open(curl.response_file(0)).readlines()
with open(fdata) as fi, open(curl.response_file(0)) as fr:
indata = fi.readlines()
respdata = fr.readlines()
assert respdata == indata
def check_download(self, r: ExecResult, count: int, srcfile: Union[str, os.PathLike], curl: CurlClient):
@ -451,8 +469,10 @@ class TestUpload:
dfile = curl.download_file(i)
assert os.path.exists(dfile), f'download {dfile} missing\n{r.dump_logs()}'
if not filecmp.cmp(srcfile, dfile, shallow=False):
diff = "".join(difflib.unified_diff(a=open(srcfile).readlines(),
b=open(dfile).readlines(),
with open(srcfile) as fa, open(dfile) as fb:
a = fa.readlines()
b = fb.readlines()
diff = "".join(difflib.unified_diff(a=a, b=b,
fromfile=srcfile,
tofile=dfile,
n=1))
@ -699,8 +719,9 @@ class TestUpload:
dfile = client.download_file(i)
assert os.path.exists(dfile), f'download {dfile} missing\n{r.dump_logs()}'
if complete:
diff = "".join(difflib.unified_diff(a=source,
b=open(dfile).readlines(),
with open(dfile) as fb:
b = fb.readlines()
diff = "".join(difflib.unified_diff(a=source, b=b,
fromfile='-',
tofile=dfile,
n=1))

View file

@ -151,7 +151,8 @@ class TestCaddy:
extra_args=['--parallel'])
r.check_stats(count=count, http_status=200, exitcode=0)
for i in range(count):
respdata = open(curl.response_file(i)).readlines()
with open(curl.response_file(i)) as fr:
respdata = fr.readlines()
assert respdata == [data]
# put large file, check that they length were echoed
@ -166,7 +167,8 @@ class TestCaddy:
exp_data = [f'{os.path.getsize(fdata)}']
r.check_response(count=count, http_status=200)
for i in range(count):
respdata = open(curl.response_file(i)).readlines()
with open(curl.response_file(i)) as fr:
respdata = fr.readlines()
assert respdata == exp_data
@pytest.mark.parametrize("proto", Env.http_protos())
@ -210,8 +212,10 @@ class TestCaddy:
dfile = client.download_file(i)
assert os.path.exists(dfile)
if complete and not filecmp.cmp(srcfile, dfile, shallow=False):
diff = "".join(difflib.unified_diff(a=open(srcfile).readlines(),
b=open(dfile).readlines(),
with open(srcfile) as fa, open(dfile) as fb:
a = fa.readlines()
b = fb.readlines()
diff = "".join(difflib.unified_diff(a=a, b=b,
fromfile=srcfile,
tofile=dfile,
n=1))

View file

@ -57,7 +57,6 @@ class TestProxy:
if m:
return m.group(1)
assert False, f'tunnel protocol not found in:\n{"".join(r.trace_lines)}'
return None
# download via http: proxy (no tunnel)
def test_10_01_proxy_http(self, env: Env, httpd):
@ -105,9 +104,11 @@ class TestProxy:
extra_args=xargs)
r.check_response(count=count, http_status=200,
protocol='HTTP/2' if proto == 'h2' else 'HTTP/1.1')
indata = open(srcfile).readlines()
with open(srcfile) as fi:
indata = fi.readlines()
for i in range(count):
respdata = open(curl.response_file(i)).readlines()
with open(curl.response_file(i)) as fr:
respdata = fr.readlines()
assert respdata == indata
# download http: via http: proxytunnel
@ -229,9 +230,11 @@ class TestProxy:
assert self.get_tunnel_proto_used(r) == tunnel
r.check_response(count=count, http_status=200)
assert r.total_connects == 1, r.dump_logs()
indata = open(srcfile).readlines()
with open(srcfile) as fi:
indata = fi.readlines()
for i in range(count):
respdata = open(curl.response_file(i)).readlines()
with open(curl.response_file(i)) as fr:
respdata = fr.readlines()
assert respdata == indata, f'response {i} differs'
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")

View file

@ -48,7 +48,6 @@ class TestProxyAuth:
if m:
return m.group(1)
assert False, f'tunnel protocol not found in:\n{"".join(r.trace_lines)}'
return None
# download via http: proxy (no tunnel), no auth
def test_13_01_proxy_no_auth(self, env: Env, httpd, configures_httpd):

View file

@ -243,10 +243,10 @@ class TestSSLUse:
succeed13, succeed12):
# to test setting cipher suites, the AES 256 ciphers are disabled in the test server
httpd.set_extra_config('base', [
'SSLCipherSuite SSL'
' ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256'
'SSLCipherSuite SSL' +
' ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256' +
':ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305',
'SSLCipherSuite TLSv1.3'
'SSLCipherSuite TLSv1.3' +
' TLS_AES_128_GCM_SHA256:TLS_CHACHA20_POLY1305_SHA256',
f'SSLProtocol {tls_proto}'
])
@ -525,10 +525,10 @@ class TestSSLUse:
def test_17_18_gnutls_priority(self, env: Env, httpd, configures_httpd, priority, tls_proto, ciphers, success):
# to test setting cipher suites, the AES 256 ciphers are disabled in the test server
httpd.set_extra_config('base', [
'SSLCipherSuite SSL'
' ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256'
'SSLCipherSuite SSL' +
' ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256' +
':ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305',
'SSLCipherSuite TLSv1.3'
'SSLCipherSuite TLSv1.3' +
' TLS_AES_128_GCM_SHA256:TLS_CHACHA20_POLY1305_SHA256',
])
httpd.reload_if_config_changed()

View file

@ -77,7 +77,8 @@ class TestVsFTPD:
url = f'ftp://{env.ftp_domain}:{vsftpd.port}/'
r = curl.ftp_get(urls=[url], with_stats=True)
r.check_stats(count=1, http_status=226)
lines = open(os.path.join(curl.run_dir, 'download_#1.data')).readlines()
with open(os.path.join(curl.run_dir, 'download_#1.data')) as fd:
lines = fd.readlines()
assert len(lines) == 5, f'list: {lines}'
r.check_stats_timelines()
@ -251,8 +252,10 @@ class TestVsFTPD:
dfile = client.download_file(i)
assert os.path.exists(dfile)
if complete and not filecmp.cmp(srcfile, dfile, shallow=False):
diff = "".join(difflib.unified_diff(a=open(srcfile).readlines(),
b=open(dfile).readlines(),
with open(srcfile) as fa, open(dfile) as fb:
a = fa.readlines()
b = fb.readlines()
diff = "".join(difflib.unified_diff(a=a, b=b,
fromfile=srcfile,
tofile=dfile,
n=1))
@ -264,8 +267,10 @@ class TestVsFTPD:
assert os.path.exists(srcfile)
assert os.path.exists(dstfile)
if not filecmp.cmp(srcfile, dstfile, shallow=False):
diff = "".join(difflib.unified_diff(a=open(srcfile).readlines(),
b=open(dstfile).readlines(),
with open(srcfile) as fa, open(dstfile) as fb:
a = fa.readlines()
b = fb.readlines()
diff = "".join(difflib.unified_diff(a=a, b=b,
fromfile=srcfile,
tofile=dstfile,
n=1))

View file

@ -82,7 +82,8 @@ class TestVsFTPD:
url = f'ftp://{env.ftp_domain}:{vsftpds.port}/'
r = curl.ftp_ssl_get(urls=[url], with_stats=True)
r.check_stats(count=1, http_status=226)
lines = open(os.path.join(curl.run_dir, 'download_#1.data')).readlines()
with open(os.path.join(curl.run_dir, 'download_#1.data')) as fd:
lines = fd.readlines()
assert len(lines) == 4, f'list: {lines}'
r.check_stats_timelines()
@ -203,7 +204,8 @@ class TestVsFTPD:
r.check_stats(count=count, http_status=226)
# expect the uploaded file to be number of converted newlines larger
dstsize = os.path.getsize(dstfile)
newlines = len(open(srcfile).readlines())
with open(srcfile) as fd:
newlines = len(fd.readlines())
assert (srcsize + newlines) == dstsize, \
f'expected source with {newlines} lines to be that much larger,'\
f'instead srcsize={srcsize}, upload size={dstsize}, diff={dstsize-srcsize}'
@ -248,7 +250,8 @@ class TestVsFTPD:
r = curl.ftp_ssl_upload(urls=[url], updata=indata, with_stats=True)
r.check_stats(count=count, http_status=226)
assert os.path.exists(dstfile)
destdata = open(dstfile).readlines()
with open(dstfile) as fd:
destdata = fd.readlines()
expdata = [indata] if len(indata) else []
assert expdata == destdata, f'expected: {expdata}, got: {destdata}'
@ -300,8 +303,10 @@ class TestVsFTPD:
dfile = client.download_file(i)
assert os.path.exists(dfile)
if complete and not filecmp.cmp(srcfile, dfile, shallow=False):
diff = "".join(difflib.unified_diff(a=open(srcfile).readlines(),
b=open(dfile).readlines(),
with open(srcfile) as fa, open(dfile) as fb:
a = fa.readlines()
b = fb.readlines()
diff = "".join(difflib.unified_diff(a=a, b=b,
fromfile=srcfile,
tofile=dfile,
n=1))
@ -313,8 +318,10 @@ class TestVsFTPD:
assert os.path.exists(srcfile)
assert os.path.exists(dstfile)
if not filecmp.cmp(srcfile, dstfile, shallow=False):
diff = "".join(difflib.unified_diff(a=open(srcfile).readlines(),
b=open(dstfile).readlines(),
with open(srcfile) as fa, open(dstfile) as fb:
a = fa.readlines()
b = fb.readlines()
diff = "".join(difflib.unified_diff(a=a, b=b,
fromfile=srcfile,
tofile=dstfile,
n=1))

View file

@ -82,7 +82,8 @@ class TestFtpsVsFTPD:
url = f'ftps://{env.ftp_domain}:{vsftpds.port}/'
r = curl.ftp_get(urls=[url], with_stats=True)
r.check_stats(count=1, http_status=226)
lines = open(os.path.join(curl.run_dir, 'download_#1.data')).readlines()
with open(os.path.join(curl.run_dir, 'download_#1.data')) as fd:
lines = fd.readlines()
assert len(lines) == 4, f'list: {lines}'
r.check_stats_timelines()
@ -216,7 +217,8 @@ class TestFtpsVsFTPD:
r.check_stats(count=count, http_status=226)
# expect the uploaded file to be number of converted newlines larger
dstsize = os.path.getsize(dstfile)
newlines = len(open(srcfile).readlines())
with open(srcfile) as fd:
newlines = len(fd.readlines())
assert (srcsize + newlines) == dstsize, \
f'expected source with {newlines} lines to be that much larger,'\
f'instead srcsize={srcsize}, upload size={dstsize}, diff={dstsize-srcsize}'
@ -261,7 +263,8 @@ class TestFtpsVsFTPD:
r = curl.ftp_upload(urls=[url], updata=indata, with_stats=True)
r.check_stats(count=count, http_status=226)
assert os.path.exists(dstfile)
destdata = open(dstfile).readlines()
with open(dstfile) as fd:
destdata = fd.readlines()
expdata = [indata] if len(indata) else []
assert expdata == destdata, f'expected: {expdata}, got: {destdata}'
@ -289,8 +292,10 @@ class TestFtpsVsFTPD:
dfile = client.download_file(i)
assert os.path.exists(dfile)
if complete and not filecmp.cmp(srcfile, dfile, shallow=False):
diff = "".join(difflib.unified_diff(a=open(srcfile).readlines(),
b=open(dfile).readlines(),
with open(srcfile) as fa, open(dfile) as fb:
a = fa.readlines()
b = fb.readlines()
diff = "".join(difflib.unified_diff(a=a, b=b,
fromfile=srcfile,
tofile=dfile,
n=1))
@ -302,8 +307,10 @@ class TestFtpsVsFTPD:
assert os.path.exists(srcfile)
assert os.path.exists(dstfile)
if not filecmp.cmp(srcfile, dstfile, shallow=False):
diff = "".join(difflib.unified_diff(a=open(srcfile).readlines(),
b=open(dstfile).readlines(),
with open(srcfile) as fa, open(dstfile) as fb:
a = fa.readlines()
b = fb.readlines()
diff = "".join(difflib.unified_diff(a=a, b=b,
fromfile=srcfile,
tofile=dstfile,
n=1))

View file

@ -96,7 +96,9 @@ class TestSocks:
url = f'https://{env.authority_for(env.domain1, proto)}/curltest/echo?id=[0-{count-1}]'
r = curl.http_upload(urls=[url], data=f'@{fdata}', alpn_proto=proto)
r.check_stats(count=count, http_status=200, exitcode=0)
indata = open(fdata).readlines()
with open(fdata) as fi:
indata = fi.readlines()
for i in range(count):
respdata = open(curl.response_file(i)).readlines()
with open(curl.response_file(i)) as fr:
respdata = fr.readlines()
assert respdata == indata

View file

@ -191,8 +191,10 @@ class TestScp:
dfile = client.download_file(i)
assert os.path.exists(dfile)
if complete and not filecmp.cmp(srcfile, dfile, shallow=False):
diff = "".join(difflib.unified_diff(a=open(srcfile).readlines(),
b=open(dfile).readlines(),
with open(srcfile) as fa, open(dfile) as fb:
a = fa.readlines()
b = fb.readlines()
diff = "".join(difflib.unified_diff(a=a, b=b,
fromfile=srcfile,
tofile=dfile,
n=1))
@ -202,8 +204,10 @@ class TestScp:
assert os.path.exists(srcfile)
assert os.path.exists(destfile)
if not filecmp.cmp(srcfile, destfile, shallow=False):
diff = "".join(difflib.unified_diff(a=open(srcfile).readlines(),
b=open(destfile).readlines(),
with open(srcfile) as fa, open(destfile) as fb:
a = fa.readlines()
b = fb.readlines()
diff = "".join(difflib.unified_diff(a=a, b=b,
fromfile=srcfile,
tofile=destfile,
n=1))

View file

@ -191,8 +191,10 @@ class TestSftp:
dfile = client.download_file(i)
assert os.path.exists(dfile)
if complete and not filecmp.cmp(srcfile, dfile, shallow=False):
diff = "".join(difflib.unified_diff(a=open(srcfile).readlines(),
b=open(dfile).readlines(),
with open(srcfile) as fa, open(dfile) as fb:
a = fa.readlines()
b = fb.readlines()
diff = "".join(difflib.unified_diff(a=a, b=b,
fromfile=srcfile,
tofile=dfile,
n=1))
@ -202,8 +204,10 @@ class TestSftp:
assert os.path.exists(srcfile)
assert os.path.exists(destfile)
if not filecmp.cmp(srcfile, destfile, shallow=False):
diff = "".join(difflib.unified_diff(a=open(srcfile).readlines(),
b=open(destfile).readlines(),
with open(srcfile) as fa, open(destfile) as fb:
a = fa.readlines()
b = fb.readlines()
diff = "".join(difflib.unified_diff(a=a, b=b,
fromfile=srcfile,
tofile=destfile,
n=1))

View file

@ -104,8 +104,9 @@ class LocalClient:
log.warning(f'Timeout after {self._timeout}s: {args}')
exitcode = -1
exception = 'TimeoutExpired'
coutput = open(self._stdoutfile).readlines()
cerrput = open(self._stderrfile).readlines()
with open(self._stdoutfile) as fout, open(self._stderrfile) as ferr:
coutput = fout.readlines()
cerrput = ferr.readlines()
return ExecResult(args=myargs, exit_code=exitcode, exception=exception,
stdout=coutput, stderr=cerrput,
duration=datetime.now() - start)
@ -113,8 +114,10 @@ class LocalClient:
def dump_logs(self):
lines = []
lines.append('>>--stdout ----------------------------------------------\n')
lines.extend(open(self._stdoutfile).readlines())
with open(self._stdoutfile) as cstdout:
lines.extend(cstdout.readlines())
lines.append('>>--stderr ----------------------------------------------\n')
lines.extend(open(self._stderrfile).readlines())
with open(self._stderrfile) as cstderr:
lines.extend(cstderr.readlines())
lines.append('<<-------------------------------------------------------\n')
return ''.join(lines)

View file

@ -393,7 +393,7 @@ class ExecResult:
f'got {self.exit_code}\n{self.dump_logs()}'
elif code is False:
assert self.exit_code != 0, f'expected exit code {code}, '\
f'got {self.exit_code}\n{self.dump_logs()}'
f'got {self.exit_code}\n{self.dump_logs()}'
else:
assert self.exit_code == code, f'expected exit code {code}, '\
f'got {self.exit_code}\n{self.dump_logs()}'
@ -1072,8 +1072,9 @@ class CurlClient:
dtrace.finish()
if self._with_flame:
self._generate_flame(args, dtrace=dtrace, perf=perf)
coutput = open(self._stdoutfile).readlines()
cerrput = open(self._stderrfile).readlines()
with open(self._stdoutfile) as fout, open(self._stderrfile) as ferr:
coutput = fout.readlines()
cerrput = ferr.readlines()
return ExecResult(args=args, exit_code=exitcode, exception=exception,
stdout=coutput, stderr=cerrput,
duration=ended_at - started_at,
@ -1167,7 +1168,8 @@ class CurlClient:
return args
def _parse_headerfile(self, headerfile: str, r: Optional[ExecResult] = None) -> ExecResult:
lines = open(headerfile).readlines()
with open(headerfile) as fd:
lines = fd.readlines()
if r is None:
r = ExecResult(args=[], exit_code=0, stdout=[], stderr=[])

View file

@ -132,7 +132,8 @@ class Sshd:
self._host_key_files.append(key_file)
pub_file = f'{key_file}.pub'
self._host_pub_files.append(pub_file)
pubkey = open(pub_file).read()
with open(pub_file) as fp:
pubkey = fp.read()
# fd_known.write(f'[127.0.0.1]:{self.port} {pubkey}')
fd_known.write(f'[{self.env.domain1.lower()}]:{self.port} {pubkey}')
fd_unknown.write(f'dummy.invalid {pubkey}')
@ -159,7 +160,8 @@ class Sshd:
self._user_pub_files.append(f'{key_file}.pub')
with open(self._auth_keys, 'w') as fd:
os.chmod(self._auth_keys, stat.S_IRUSR | stat.S_IWUSR)
pubkey = open(self._user_pub_files[0]).read()
with open(self._user_pub_files[0]) as fp:
pubkey = fp.read()
fd.write(pubkey)
def clear_logs(self):