feat: Support casting transaction processing to the background

This commit is contained in:
nexy7574 2026-02-21 02:56:47 +00:00 committed by timedout
parent 2f9956ddca
commit 0d8cafc329
No known key found for this signature in database
GPG key ID: 0FA334385D0B689F

View file

@ -1,16 +1,19 @@
use std::{collections::BTreeMap, net::IpAddr, time::Instant}; use std::{
collections::BTreeMap,
net::IpAddr,
time::{Duration, Instant},
};
use axum::extract::State; use axum::extract::State;
use axum_client_ip::InsecureClientIp; use axum_client_ip::InsecureClientIp;
use conduwuit::{ use conduwuit::{
Err, Error, Result, debug, debug_warn, err, error, Err, Error, Result, debug, debug_warn, err, error, info,
result::LogErr, result::LogErr,
trace, trace,
utils::{ utils::{
IterStream, ReadyExt, millis_since_unix_epoch, IterStream, ReadyExt, millis_since_unix_epoch,
stream::{BroadbandExt, TryBroadbandExt, automatic_width}, stream::{BroadbandExt, TryBroadbandExt, automatic_width},
}, },
warn,
}; };
use conduwuit_service::{ use conduwuit_service::{
Services, Services,
@ -21,7 +24,7 @@ use itertools::Itertools;
use ruma::{ use ruma::{
CanonicalJsonObject, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, ServerName, UserId, CanonicalJsonObject, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, ServerName, UserId,
api::{ api::{
client::error::ErrorKind, client::error::{ErrorKind, ErrorKind::LimitExceeded},
federation::transactions::{ federation::transactions::{
edu::{ edu::{
DeviceListUpdateContent, DirectDeviceContent, Edu, PresenceContent, DeviceListUpdateContent, DirectDeviceContent, Edu, PresenceContent,
@ -35,6 +38,8 @@ use ruma::{
serde::Raw, serde::Raw,
to_device::DeviceIdOrAllDevices, to_device::DeviceIdOrAllDevices,
}; };
use service::transaction_ids::{TxnKey, WrappedTransactionResponse};
use tokio::sync::watch::{Receiver, Sender};
use crate::Ruma; use crate::Ruma;
@ -44,15 +49,6 @@ type Pdu = (OwnedRoomId, OwnedEventId, CanonicalJsonObject);
/// # `PUT /_matrix/federation/v1/send/{txnId}` /// # `PUT /_matrix/federation/v1/send/{txnId}`
/// ///
/// Push EDUs and PDUs to this server. /// Push EDUs and PDUs to this server.
#[tracing::instrument(
name = "txn",
level = "debug",
skip_all,
fields(
%client,
origin = body.origin().as_str()
),
)]
pub(crate) async fn send_transaction_message_route( pub(crate) async fn send_transaction_message_route(
State(services): State<crate::State>, State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp, InsecureClientIp(client): InsecureClientIp,
@ -64,6 +60,18 @@ pub(crate) async fn send_transaction_message_route(
))); )));
} }
let txn_key = (body.origin().to_owned(), body.transaction_id.clone());
// Did we already process this transaction
if let Some(response) = services.transaction_ids.get_cached_txn(&txn_key) {
return Ok(response);
}
// Or are currently processing it
if let Some(receiver) = services.transaction_ids.get_active_federation_txn(&txn_key) {
// Wait up to 50 seconds for a result
return wait_for_result(receiver).await;
}
if body.pdus.len() > PDU_LIMIT { if body.pdus.len() > PDU_LIMIT {
return Err!(Request(Forbidden( return Err!(Request(Forbidden(
"Not allowed to send more than {PDU_LIMIT} PDUs in one transaction" "Not allowed to send more than {PDU_LIMIT} PDUs in one transaction"
@ -76,16 +84,48 @@ pub(crate) async fn send_transaction_message_route(
))); )));
} }
let txn_start_time = Instant::now(); let sender = services
trace!( .transaction_ids
pdus = body.pdus.len(), .start_federation_txn(txn_key.clone())?;
edus = body.edus.len(), services.server.runtime().spawn(process_inbound_transaction(
elapsed = ?txn_start_time.elapsed(), services,
id = %body.transaction_id, body,
origin = %body.origin(), client,
"Starting txn", txn_key.clone(),
); sender,
));
let receiver = services
.transaction_ids
.get_active_federation_txn(&txn_key)
.expect("just-created transaction was missing");
wait_for_result(receiver).await
}
async fn wait_for_result(
mut recv: Receiver<WrappedTransactionResponse>,
) -> Result<send_transaction_message::v1::Response> {
if tokio::time::timeout(Duration::from_secs(50), recv.changed())
.await
.is_err()
{
// Took too long, return 429 to encourage the sender to try again
return Err(Error::BadRequest(
LimitExceeded { retry_after: None },
"Transaction is being still being processed. Please try again later.",
));
}
let value = recv.borrow_and_update();
Ok(value.clone().expect("channel returned with no value?"))
}
async fn process_inbound_transaction(
services: crate::State,
body: Ruma<send_transaction_message::v1::Request>,
client: IpAddr,
txn_key: TxnKey,
sender: Sender<Option<send_transaction_message::v1::Response>>,
) {
let txn_start_time = Instant::now();
let pdus = body let pdus = body
.pdus .pdus
.iter() .iter()
@ -102,30 +142,53 @@ pub(crate) async fn send_transaction_message_route(
.filter_map(Result::ok) .filter_map(Result::ok)
.stream(); .stream();
let results = handle(&services, &client, body.origin(), txn_start_time, pdus, edus).await?; info!(
id = ?txn_key.1,
debug!( origin = ?txn_key.0,
pdus = body.pdus.len(), pdus = body.pdus.len(),
edus = body.edus.len(), edus = body.edus.len(),
elapsed = ?txn_start_time.elapsed(), "Processing transaction",
id = %body.transaction_id,
origin = %body.origin(),
"Finished txn",
); );
let Ok(results) = handle(&services, &client, body.origin(), txn_start_time, pdus, edus).await
else {
// TODO: handle this properly. The channel doesn't like being closed with no
// value, returning an empty response may lie to the remote and make them
// think we processed it properly (and lose events), but we also can't return
// an actual error.
panic!("failed to handle incoming transaction");
};
for (id, result) in &results { for (id, result) in &results {
if let Err(e) = result { if let Err(e) = result {
if matches!(e, Error::BadRequest(ErrorKind::NotFound, _)) { if matches!(e, Error::BadRequest(ErrorKind::NotFound, _)) {
warn!("Incoming PDU failed {id}: {e:?}"); debug_warn!("Incoming PDU failed {id}: {e:?}");
} }
} }
} }
Ok(send_transaction_message::v1::Response { info!(
id = ?txn_key.1,
origin = ?txn_key.0,
pdus = body.pdus.len(),
edus = body.edus.len(),
elapsed = ?txn_start_time.elapsed(),
"Finished processing transaction"
);
let response = send_transaction_message::v1::Response {
pdus: results pdus: results
.into_iter() .into_iter()
.map(|(e, r)| (e, r.map_err(error::sanitized_message))) .map(|(e, r)| (e, r.map_err(error::sanitized_message)))
.collect(), .collect(),
}) };
services
.transaction_ids
.set_cached_txn(txn_key.clone(), response.clone());
sender
.send(Some(response))
.expect("couldn't send response to channel");
services.transaction_ids.finish_federation_txn(&txn_key);
drop(sender);
} }
async fn handle( async fn handle(