scuffle_http/backend/h3/
body.rs

1//! Types for parsing HTTP/3 bodies.
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use bytes::{Buf, Bytes};
7use h3::server::RequestStream;
8
/// Progress of an incoming HTTP/3 body through its receive phases.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum State {
    /// Data frames are still being received.
    ///
    /// When the payload is `Some`, it is the number of body bytes that may
    /// still arrive; `None` means no byte budget is being tracked.
    Data(Option<u64>),
    /// The data phase has ended and the trailers have not been resolved yet.
    Trailers,
    /// Terminal state: nothing more will be produced.
    Done,
}
15
16/// An incoming HTTP/3 body.
17///
18/// Implements [`http_body::Body`].
19pub struct QuicIncomingBody<S> {
20    stream: RequestStream<S, Bytes>,
21    state: State,
22}
23
24impl<S> QuicIncomingBody<S> {
25    /// Create a new incoming HTTP/3 body.
26    pub fn new(stream: RequestStream<S, Bytes>, size_hint: Option<u64>) -> Self {
27        Self {
28            stream,
29            state: State::Data(size_hint),
30        }
31    }
32}
33
34impl<S: h3::quic::RecvStream> http_body::Body for QuicIncomingBody<S> {
35    type Data = Bytes;
36    type Error = h3::Error;
37
38    fn poll_frame(
39        mut self: Pin<&mut Self>,
40        cx: &mut Context<'_>,
41    ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
42        let QuicIncomingBody { stream, state } = self.as_mut().get_mut();
43
44        if *state == State::Done {
45            return Poll::Ready(None);
46        }
47
48        if let State::Data(remaining) = state {
49            match stream.poll_recv_data(cx) {
50                Poll::Ready(Ok(Some(mut buf))) => {
51                    let buf_size = buf.remaining() as u64;
52
53                    if let Some(remaining) = remaining {
54                        if buf_size > *remaining {
55                            *state = State::Done;
56                            return Poll::Ready(Some(Err(h3::error::Code::H3_FRAME_UNEXPECTED.into())));
57                        }
58
59                        *remaining -= buf_size;
60                    }
61
62                    return Poll::Ready(Some(Ok(http_body::Frame::data(buf.copy_to_bytes(buf_size as usize)))));
63                }
64                Poll::Ready(Ok(None)) => {
65                    *state = State::Trailers;
66                }
67                Poll::Ready(Err(err)) => {
68                    *state = State::Done;
69                    return Poll::Ready(Some(Err(err)));
70                }
71                Poll::Pending => {
72                    return Poll::Pending;
73                }
74            }
75        }
76
77        // We poll the recv data again even though we already got the None
78        // because we want to make sure there is not a frame after the trailers
79        // This is a workaround because h3 does not allow us to poll the trailer
80        // directly, so we need to make sure the future recv_trailers is going to be
81        // ready after a single poll We avoid pinning to the heap.
82        let resp = match stream.poll_recv_data(cx) {
83            Poll::Ready(Ok(None)) => match std::pin::pin!(stream.recv_trailers()).poll(cx) {
84                Poll::Ready(Ok(Some(trailers))) => Poll::Ready(Some(Ok(http_body::Frame::trailers(trailers)))),
85                // We will only poll the recv_trailers once so if pending is returned we are done.
86                Poll::Pending => {
87                    #[cfg(feature = "tracing")]
88                    tracing::warn!("recv_trailers is pending");
89                    Poll::Ready(None)
90                }
91                Poll::Ready(Ok(None)) => Poll::Ready(None),
92                Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
93            },
94            // We are not expecting any data after the previous poll returned None
95            Poll::Ready(Ok(Some(_))) => Poll::Ready(Some(Err(h3::error::Code::H3_FRAME_UNEXPECTED.into()))),
96            Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
97            Poll::Pending => return Poll::Pending,
98        };
99
100        *state = State::Done;
101
102        resp
103    }
104
105    fn size_hint(&self) -> http_body::SizeHint {
106        match self.state {
107            State::Data(Some(remaining)) => http_body::SizeHint::with_exact(remaining),
108            State::Data(None) => http_body::SizeHint::default(),
109            State::Trailers | State::Done => http_body::SizeHint::with_exact(0),
110        }
111    }
112
113    fn is_end_stream(&self) -> bool {
114        match self.state {
115            State::Data(Some(0)) | State::Trailers | State::Done => true,
116            State::Data(_) => false,
117        }
118    }
119}