scuffle_http/backend/h3/
body.rs1use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use bytes::{Buf, Bytes};
7use h3::server::RequestStream;
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq)]
10enum State {
11 Data(Option<u64>),
12 Trailers,
13 Done,
14}
15
16pub struct QuicIncomingBody<S> {
20 stream: RequestStream<S, Bytes>,
21 state: State,
22}
23
24impl<S> QuicIncomingBody<S> {
25 pub fn new(stream: RequestStream<S, Bytes>, size_hint: Option<u64>) -> Self {
27 Self {
28 stream,
29 state: State::Data(size_hint),
30 }
31 }
32}
33
34impl<S: h3::quic::RecvStream> http_body::Body for QuicIncomingBody<S> {
35 type Data = Bytes;
36 type Error = h3::Error;
37
38 fn poll_frame(
39 mut self: Pin<&mut Self>,
40 cx: &mut Context<'_>,
41 ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
42 let QuicIncomingBody { stream, state } = self.as_mut().get_mut();
43
44 if *state == State::Done {
45 return Poll::Ready(None);
46 }
47
48 if let State::Data(remaining) = state {
49 match stream.poll_recv_data(cx) {
50 Poll::Ready(Ok(Some(mut buf))) => {
51 let buf_size = buf.remaining() as u64;
52
53 if let Some(remaining) = remaining {
54 if buf_size > *remaining {
55 *state = State::Done;
56 return Poll::Ready(Some(Err(h3::error::Code::H3_FRAME_UNEXPECTED.into())));
57 }
58
59 *remaining -= buf_size;
60 }
61
62 return Poll::Ready(Some(Ok(http_body::Frame::data(buf.copy_to_bytes(buf_size as usize)))));
63 }
64 Poll::Ready(Ok(None)) => {
65 *state = State::Trailers;
66 }
67 Poll::Ready(Err(err)) => {
68 *state = State::Done;
69 return Poll::Ready(Some(Err(err)));
70 }
71 Poll::Pending => {
72 return Poll::Pending;
73 }
74 }
75 }
76
77 let resp = match stream.poll_recv_data(cx) {
83 Poll::Ready(Ok(None)) => match std::pin::pin!(stream.recv_trailers()).poll(cx) {
84 Poll::Ready(Ok(Some(trailers))) => Poll::Ready(Some(Ok(http_body::Frame::trailers(trailers)))),
85 Poll::Pending => {
87 #[cfg(feature = "tracing")]
88 tracing::warn!("recv_trailers is pending");
89 Poll::Ready(None)
90 }
91 Poll::Ready(Ok(None)) => Poll::Ready(None),
92 Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
93 },
94 Poll::Ready(Ok(Some(_))) => Poll::Ready(Some(Err(h3::error::Code::H3_FRAME_UNEXPECTED.into()))),
96 Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
97 Poll::Pending => return Poll::Pending,
98 };
99
100 *state = State::Done;
101
102 resp
103 }
104
105 fn size_hint(&self) -> http_body::SizeHint {
106 match self.state {
107 State::Data(Some(remaining)) => http_body::SizeHint::with_exact(remaining),
108 State::Data(None) => http_body::SizeHint::default(),
109 State::Trailers | State::Done => http_body::SizeHint::with_exact(0),
110 }
111 }
112
113 fn is_end_stream(&self) -> bool {
114 match self.state {
115 State::Data(Some(0)) | State::Trailers | State::Done => true,
116 State::Data(_) => false,
117 }
118 }
119}