// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use std::{cell::RefCell, cmp::min, collections::VecDeque, fmt::Debug, rc::Rc};
use neqo_common::{qdebug, qinfo, qtrace, Header};
use neqo_qpack::decoder::QPackDecoder;
use neqo_transport::{Connection, StreamId};
use crate::{
frames::{hframe::HFrameType, FrameReader, HFrame, StreamReaderConnectionWrapper},
headers_checks::{headers_valid, is_interim},
priority::PriorityHandler,
push_controller::PushController,
qlog, CloseType, Error, Http3StreamInfo, Http3StreamType, HttpRecvStream, HttpRecvStreamEvents,
MessageType, Priority, ReceiveOutput, RecvStream, Res, Stream,
};
/// Parameters describing the stream a `RecvMessage` is created for.
///
/// `RecvMessage::new` takes this by reference and copies every field out of it.
// NOTE(review): the lint is presumably silenced because the type name repeats the
// name of the enclosing module — confirm against the module layout.
#[allow(clippy::module_name_repetitions)]
pub struct RecvMessageInfo {
/// Whether the message being received is a request or a response; header
/// validation in `RecvMessage` is performed against this type.
pub message_type: MessageType,
/// Initial stream type reported by `Stream::stream_type` for the message.
pub stream_type: Http3StreamType,
/// Transport stream the message is read from.
pub stream_id: StreamId,
/// When `Some`, the frame reader is created with `FrameReader::new_with_type`
/// for this frame type; when `None`, a plain `FrameReader::new` is used.
/// Presumably set when the caller has already consumed the type of the first
/// frame — TODO confirm against callers.
pub first_frame_type: Option<u64>,
}
/*
 * Response stream state:
 *    WaitingForResponseHeaders : we wait for headers. In this state we can
 *                                also get a PUSH_PROMISE frame.
 *    DecodingHeaders : In this step the headers will be decoded. The stream
 *                      may be blocked in this state on encoder instructions.
 *    WaitingForData : we got HEADERS, we are waiting for one or more data
 *                     frames. In this state we can receive one or more
 *                     PUSH_PROMISE frames or a HEADERS frame carrying trailers.
 *    ReadingData : we got a DATA frame, now we are letting the app read the
 *                  payload. From here we will go back to the WaitingForData
 *                  state to wait for more data frames, or to the Closed state.
 *    WaitingForFinAfterTrailers : we got a HEADERS frame carrying trailers
 *                                 (currently ignored). Any further HEADERS or
 *                                 DATA frame is an error; we wait for the
 *                                 stream to end.
 *    ClosePending : the stream has ended; waiting for the app to pick up the
 *                   fin through read_data, after which the state becomes
 *                   Closed.
 *    Closed
 *    ExtendedConnect: this request is for a WebTransport session. In this
 *                     state RecvMessage will not be treated as an HTTP
 *                     stream anymore. It is waiting to be transformed
 *                     into a WebTransport session or to be closed.
 */
#[derive(Debug)]
enum RecvMessageState {
// `frame_reader` parses the next frame from the stream in every state that
// carries one.
WaitingForResponseHeaders { frame_reader: FrameReader },
// `header_block` is the still-encoded block taken from the HEADERS frame;
// `fin` records whether the stream ended together with that frame.
DecodingHeaders { header_block: Vec<u8>, fin: bool },
WaitingForData { frame_reader: FrameReader },
// `remaining_data_len` counts the payload bytes of the current DATA frame
// that the application has not read yet.
ReadingData { remaining_data_len: usize },
WaitingForFinAfterTrailers { frame_reader: FrameReader },
ClosePending, // Close must first be read by application
Closed,
ExtendedConnect,
}
/// A PUSH_PROMISE whose header block has not been decoded yet.
///
/// Entries are queued in `RecvMessage::blocked_push_promise` when the QPACK
/// decoder cannot decode the block yet, or when earlier promises are still
/// queued, and are retried in order from `header_unblocked`.
#[derive(Debug)]
struct PushInfo {
/// Push ID carried by the PUSH_PROMISE frame.
push_id: u64,
/// Encoded header block carried by the PUSH_PROMISE frame.
header_block: Vec<u8>,
}
/// Receiving half of an HTTP/3 message (request or response) on one stream.
///
/// Reads frames from the stream, decodes header blocks with the shared QPACK
/// decoder, and reports headers, readable data and closure through
/// `conn_events`.
#[derive(Debug)]
pub struct RecvMessage {
/// Current position in the receive state machine.
state: RecvMessageState,
/// Request or response; selects the header validation that is applied.
message_type: MessageType,
/// Type returned by `Stream::stream_type`; switched to `ExtendedConnect`
/// when a WebTransport CONNECT request is recognised.
stream_type: Http3StreamType,
/// Shared QPACK decoder used to decode header blocks and to cancel the
/// stream when it is reset or closed with promises still blocked.
qpack_decoder: Rc<RefCell<QPackDecoder>>,
/// Sink for header, data-readable and closed events; replaceable through
/// `set_new_listener`.
conn_events: Box<dyn HttpRecvStreamEvents>,
/// Receiver for decoded push promises. When `None`, a PUSH_PROMISE frame is
/// rejected with `Error::HttpFrameUnexpected`.
push_handler: Option<Rc<RefCell<PushController>>>,
/// Transport stream this message is read from.
stream_id: StreamId,
/// Backs the priority-related `HttpRecvStream` methods.
priority_handler: PriorityHandler,
/// FIFO of push promises waiting for their header blocks to become decodable.
blocked_push_promise: VecDeque<PushInfo>,
}
impl ::std::fmt::Display for RecvMessage {
    /// Writes `RecvMessage stream_id:<id>`, the text produced whenever the
    /// message is formatted with `{}`.
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        f.write_fmt(format_args!("RecvMessage stream_id:{}", self.stream_id))
    }
}
impl RecvMessage {
/// Creates a message that starts out waiting for its header section.
///
/// The initial frame reader is primed with `message_info.first_frame_type`
/// when that is set; otherwise it starts from scratch. No push promises are
/// queued initially.
pub fn new(
    message_info: &RecvMessageInfo,
    qpack_decoder: Rc<RefCell<QPackDecoder>>,
    conn_events: Box<dyn HttpRecvStreamEvents>,
    push_handler: Option<Rc<RefCell<PushController>>>,
    priority_handler: PriorityHandler,
) -> Self {
    let frame_reader = match message_info.first_frame_type {
        Some(frame_type) => FrameReader::new_with_type(HFrameType(frame_type)),
        None => FrameReader::new(),
    };
    let state = RecvMessageState::WaitingForResponseHeaders { frame_reader };

    Self {
        state,
        message_type: message_info.message_type,
        stream_type: message_info.stream_type,
        qpack_decoder,
        conn_events,
        push_handler,
        stream_id: message_info.stream_id,
        priority_handler,
        blocked_push_promise: VecDeque::new(),
    }
}
/// Applies a received HEADERS frame to the state machine.
///
/// * While waiting for the header section, the block is stored for decoding
///   together with `fin`.
/// * While waiting for data, the frame is treated as trailers, which are
///   ignored; the message then only waits for the end of the stream.
///
/// # Errors
/// `Error::HttpGeneralProtocolStream` if the first header block is empty, and
/// `Error::HttpFrameUnexpected` if a HEADERS frame follows the trailers.
///
/// # Panics
/// If called in any state other than the three frame-reading states.
fn handle_headers_frame(&mut self, header_block: Vec<u8>, fin: bool) -> Res<()> {
    let next_state = match self.state {
        RecvMessageState::WaitingForResponseHeaders { .. } => {
            if header_block.is_empty() {
                return Err(Error::HttpGeneralProtocolStream);
            }
            RecvMessageState::DecodingHeaders { header_block, fin }
        }
        // TODO implement trailers, for now just ignore them.
        RecvMessageState::WaitingForData { .. } => {
            RecvMessageState::WaitingForFinAfterTrailers {
                frame_reader: FrameReader::new(),
            }
        }
        RecvMessageState::WaitingForFinAfterTrailers { .. } => {
            return Err(Error::HttpFrameUnexpected);
        }
        _ => unreachable!("This functions is only called in WaitingForResponseHeaders | WaitingForData | WaitingForFinAfterTrailers state."),
    };
    self.state = next_state;
    Ok(())
}
fn handle_data_frame(&mut self, len: u64, fin: bool) -> Res<()> {
match self.state {
RecvMessageState::WaitingForResponseHeaders {..} | RecvMessageState::WaitingForFinAfterTrailers {..} => {
return Err(Error::HttpFrameUnexpected);
}
RecvMessageState::WaitingForData {..} => {
if len > 0 {
if fin {
return Err(Error::HttpFrame);
}
self.state = RecvMessageState::ReadingData {
remaining_data_len: usize::try_from(len).or(Err(Error::HttpFrame))?,
};
}
}
_ => unreachable!("This functions is only called in WaitingForResponseHeaders | WaitingForData | WaitingForFinAfterTrailers state.")
}
Ok(())
}
/// Validates a decoded header section, reports it, and advances the state.
///
/// Responses are checked for being interim and are filtered down to the
/// headers allowed in a response. A request carrying `:method = CONNECT` and
/// `:protocol = webtransport` is reported as a new extended-connect session;
/// everything else is reported through `header_ready`.
///
/// Afterwards the message is closed if `fin` is set; otherwise it waits for a
/// WebTransport hand-over, for further headers (after an interim response),
/// or for data.
///
/// # Errors
/// Propagates errors from `is_interim` and `headers_valid`, and returns
/// `Error::HttpGeneralProtocolStream` when an interim response ends the stream.
fn add_headers(&mut self, mut headers: Vec<Header>, fin: bool) -> Res<()> {
    qtrace!([self], "Add new headers fin={}", fin);

    let is_response = self.message_type == MessageType::Response;
    // Only responses can be interim; requests never are.
    let interim = is_response && is_interim(&headers)?;
    headers_valid(&headers, self.message_type)?;
    if is_response {
        headers.retain(Header::is_allowed_for_response);
    }

    if fin && interim {
        return Err(Error::HttpGeneralProtocolStream);
    }

    let is_web_transport = self.message_type == MessageType::Request
        && headers
            .iter()
            .any(|h| h.name() == ":method" && h.value() == "CONNECT")
        && headers
            .iter()
            .any(|h| h.name() == ":protocol" && h.value() == "webtransport");

    if is_web_transport {
        self.conn_events
            .extended_connect_new_session(self.stream_id, headers);
    } else {
        self.conn_events
            .header_ready(self.get_stream_info(), headers, interim, fin);
    }

    if fin {
        self.set_closed();
        return Ok(());
    }

    if is_web_transport {
        self.stream_type = Http3StreamType::ExtendedConnect;
        self.state = RecvMessageState::ExtendedConnect;
    } else if interim {
        self.state = RecvMessageState::WaitingForResponseHeaders {
            frame_reader: FrameReader::new(),
        };
    } else {
        self.state = RecvMessageState::WaitingForData {
            frame_reader: FrameReader::new(),
        };
    }
    Ok(())
}
/// Records that the stream has ended and moves the message to `ClosePending`.
///
/// When the message was waiting for data or for the fin after trailers, a
/// `data_readable` event is posted (if `post_readable_event` is set) so the
/// application can pick up the fin. No event is posted from `ReadingData`.
///
/// # Errors
/// `Error::HttpGeneralProtocolStream` if the stream ends before any header
/// section was received.
///
/// # Panics
/// If the message is in any other state.
fn set_state_to_close_pending(&mut self, post_readable_event: bool) -> Res<()> {
    qtrace!([self], "set_state_to_close_pending: state={:?}", self.state);

    let notify_app = match self.state {
        RecvMessageState::WaitingForResponseHeaders { .. } => {
            return Err(Error::HttpGeneralProtocolStream);
        }
        RecvMessageState::ReadingData { .. } => false,
        RecvMessageState::WaitingForData { .. }
        | RecvMessageState::WaitingForFinAfterTrailers { .. } => post_readable_event,
        _ => unreachable!("Closing an already closed transaction."),
    };
    if notify_app {
        self.conn_events.data_readable(self.get_stream_info());
    }

    if !matches!(self.state, RecvMessageState::Closed) {
        self.state = RecvMessageState::ClosePending;
    }
    Ok(())
}
fn handle_push_promise(&mut self, push_id: u64, header_block: Vec<u8>) -> Res<()> {
if self.push_handler.is_none() {
return Err(Error::HttpFrameUnexpected);
}
if !self.blocked_push_promise.is_empty() {
self.blocked_push_promise.push_back(PushInfo {
push_id,
header_block,
});
} else if let Some(headers) = self
.qpack_decoder
.borrow_mut()
.decode_header_block(&header_block, self.stream_id)?
{
self.push_handler
.as_ref()
.ok_or(Error::HttpFrameUnexpected)?
.borrow_mut()
.new_push_promise(push_id, self.stream_id, headers)?;
} else {
self.blocked_push_promise.push_back(PushInfo {
push_id,
header_block,
});
}
Ok(())
}
/// Drives the receive state machine until no further progress is possible.
///
/// Each loop iteration handles the current state: frame-reading states parse
/// the next frame and dispatch it, `DecodingHeaders` tries to decode the
/// stored header block, and `ReadingData` hands control to the application.
///
/// `post_readable_event` controls whether `data_readable` events are posted
/// to `conn_events`; `receive` passes `true`, `read_data` passes `false`.
///
/// # Errors
/// Propagates errors from frame parsing, header decoding and the frame
/// handlers, and returns `Error::HttpFrameUnexpected` for any frame other
/// than HEADERS, DATA or PUSH_PROMISE.
///
/// # Panics
/// If called while the message is `ClosePending` or `Closed`.
fn receive_internal(&mut self, conn: &mut Connection, post_readable_event: bool) -> Res<()> {
// Computed once up front; `self.state` is mutably borrowed by the match below.
// NOTE(review): presumably the pre-rendered log prefix for `self` — confirm
// against `neqo_common::log_subject!`.
let label = ::neqo_common::log_subject!(::log::Level::Debug, self);
loop {
qdebug!([label], "state={:?}.", self.state);
match &mut self.state {
// In the following 3 states we need to read frames.
RecvMessageState::WaitingForResponseHeaders { frame_reader }
| RecvMessageState::WaitingForData { frame_reader }
| RecvMessageState::WaitingForFinAfterTrailers { frame_reader } => {
match frame_reader.receive(&mut StreamReaderConnectionWrapper::new(
conn,
self.stream_id,
))? {
// No complete frame and the stream has ended.
(None, true) => {
break self.set_state_to_close_pending(post_readable_event);
}
// No complete frame yet; wait for more stream data.
(None, false) => break Ok(()),
(Some(frame), fin) => {
qdebug!(
[self],
"A new frame has been received: {:?}; state={:?} fin={}",
frame,
self.state,
fin,
);
match frame {
HFrame::Headers { header_block } => {
self.handle_headers_frame(header_block, fin)?;
}
HFrame::Data { len } => self.handle_data_frame(len, fin)?,
HFrame::PushPromise {
push_id,
header_block,
} => self.handle_push_promise(push_id, header_block)?,
_ => break Err(Error::HttpFrameUnexpected),
}
if matches!(self.state, RecvMessageState::Closed) {
break Ok(());
}
// A fin that arrived with a HEADERS frame awaiting decoding is carried in
// the `DecodingHeaders` state and handled once the headers are decoded.
if fin
&& !matches!(self.state, RecvMessageState::DecodingHeaders { .. })
{
break self.set_state_to_close_pending(post_readable_event);
}
}
};
}
RecvMessageState::DecodingHeaders {
ref header_block,
fin,
} => {
// Hold back a header block that refers to the dynamic table while push
// promises are still queued; decoding resumes from `header_unblocked`.
if self
.qpack_decoder
.borrow()
.refers_dynamic_table(header_block)?
&& !self.blocked_push_promise.is_empty()
{
qinfo!(
[self],
"decoding header is blocked waiting for a push_promise header block."
);
break Ok(());
}
// Copy `fin` out before `add_headers` replaces the state it is borrowed from.
let done = *fin;
let d_headers = self
.qpack_decoder
.borrow_mut()
.decode_header_block(header_block, self.stream_id)?;
if let Some(headers) = d_headers {
self.add_headers(headers, done)?;
// Nothing more to read once the message is closed or handed over to
// extended CONNECT handling; otherwise loop to read the next frame.
if matches!(
self.state,
RecvMessageState::Closed | RecvMessageState::ExtendedConnect
) {
break Ok(());
}
} else {
// The decoder returned no headers: the block cannot be decoded yet.
qinfo!([self], "decoding header is blocked.");
break Ok(());
}
}
// Payload is read by the application through `read_data`; only signal it.
RecvMessageState::ReadingData { .. } => {
if post_readable_event {
self.conn_events.data_readable(self.get_stream_info());
}
break Ok(());
}
RecvMessageState::ClosePending | RecvMessageState::Closed => {
panic!("Stream readable after being closed!");
}
RecvMessageState::ExtendedConnect => {
// Ignore read event, this request is waiting to be picked up by a new
// WebTransportSession
break Ok(());
}
};
}
}
/// Marks the message as closed and reports `CloseType::Done` to the listener.
///
/// If push promises are still waiting to be decoded, the stream is first
/// cancelled at the QPACK decoder.
fn set_closed(&mut self) {
    let has_blocked_promises = !self.blocked_push_promise.is_empty();
    if has_blocked_promises {
        self.qpack_decoder
            .borrow_mut()
            .cancel_stream(self.stream_id);
    }

    self.state = RecvMessageState::Closed;

    let info = self.get_stream_info();
    self.conn_events.recv_closed(info, CloseType::Done);
}
/// Returns `true` once the message is in `ClosePending` or `Closed`.
const fn closing(&self) -> bool {
matches!(
self.state,
RecvMessageState::ClosePending | RecvMessageState::Closed
)
}
/// Builds the stream descriptor attached to events sent to `conn_events`.
///
/// The type is always reported as `Http3StreamType::Http`, independent of
/// `self.stream_type`.
// NOTE(review): confirm that reporting `Http` is also intended after the
// stream type has been switched to `ExtendedConnect`.
const fn get_stream_info(&self) -> Http3StreamInfo {
Http3StreamInfo::new(self.stream_id, Http3StreamType::Http)
}
}
impl Stream for RecvMessage {
/// Returns the current stream type: the value supplied at construction, or
/// `ExtendedConnect` once a WebTransport CONNECT request has been recognised.
fn stream_type(&self) -> Http3StreamType {
self.stream_type
}
}
impl RecvStream for RecvMessage {
/// Processes newly available stream data, posting readable events as needed.
///
/// Returns `ReceiveOutput::NoOutput` together with a flag that is `true`
/// when the message has reached the `Closed` state.
///
/// # Errors
/// Propagates any error from `receive_internal`.
fn receive(&mut self, conn: &mut Connection) -> Res<(ReceiveOutput, bool)> {
    self.receive_internal(conn, true)?;
    let is_closed = matches!(self.state, RecvMessageState::Closed);
    Ok((ReceiveOutput::NoOutput, is_closed))
}
/// Aborts reception, reports `close_type` to the listener and closes the message.
///
/// The stream is cancelled at the QPACK decoder unless the message is already
/// closing and has no push promises waiting to be decoded.
fn reset(&mut self, close_type: CloseType) -> Res<()> {
    let nothing_to_cancel = self.closing() && self.blocked_push_promise.is_empty();
    if !nothing_to_cancel {
        self.qpack_decoder
            .borrow_mut()
            .cancel_stream(self.stream_id);
    }

    let info = self.get_stream_info();
    self.conn_events.recv_closed(info, close_type);
    self.state = RecvMessageState::Closed;
    Ok(())
}
/// Copies DATA frame payload into `buf`.
///
/// Returns the number of bytes written and whether the end of the message
/// was reached. Payload from several consecutive DATA frames may be returned
/// by one call: when a frame is exhausted, the next frame header is parsed
/// and reading continues.
///
/// # Errors
/// `Error::HttpFrame` if the stream ends in the middle of a DATA frame;
/// transport read errors are mapped through `Error::map_stream_recv_errors`;
/// errors from parsing the following frame are propagated.
fn read_data(&mut self, conn: &mut Connection, buf: &mut [u8]) -> Res<(usize, bool)> {
let mut written = 0;
loop {
match self.state {
RecvMessageState::ReadingData {
ref mut remaining_data_len,
} => {
// Never read past the current DATA frame or past the space left in `buf`.
let to_read = min(*remaining_data_len, buf.len() - written);
let (amount, fin) = conn
.stream_recv(self.stream_id, &mut buf[written..written + to_read])
.map_err(|e| Error::map_stream_recv_errors(&Error::from(e)))?;
qlog::h3_data_moved_up(conn.qlog_mut(), self.stream_id, amount);
debug_assert!(amount <= to_read);
*remaining_data_len -= amount;
written += amount;
if fin {
// The stream ended before the announced frame length was delivered.
if *remaining_data_len > 0 {
return Err(Error::HttpFrame);
}
self.set_closed();
break Ok((written, fin));
} else if *remaining_data_len == 0 {
// Frame fully consumed: parse the next frame header without posting a
// readable event, then loop to keep filling `buf` if more payload follows.
self.state = RecvMessageState::WaitingForData {
frame_reader: FrameReader::new(),
};
self.receive_internal(conn, false)?;
} else {
// Frame payload remains but nothing more was read on this pass.
break Ok((written, false));
}
}
// The fin was already seen; reading it here completes the close.
RecvMessageState::ClosePending => {
self.set_closed();
break Ok((written, true));
}
// No payload is readable in any other state.
_ => break Ok((written, false)),
}
}
}
/// Exposes this stream through its `HttpRecvStream` interface; always `Some`.
fn http_stream(&mut self) -> Option<&mut dyn HttpRecvStream> {
Some(self)
}
}
impl HttpRecvStream for RecvMessage {
/// Retries header decoding after the QPACK decoder has made progress.
///
/// Queued push promises are decoded in order and handed to the push handler.
/// If the promise at the front still cannot be decoded, the call returns
/// `(ReceiveOutput::NoOutput, false)` without touching the rest. Once the
/// queue is empty, normal reception continues through `receive`.
///
/// # Errors
/// `Error::HttpFrameUnexpected` if a promise is queued but no push handler is
/// configured; decoder, push-handler and `receive` errors are propagated.
fn header_unblocked(&mut self, conn: &mut Connection) -> Res<(ReceiveOutput, bool)> {
    loop {
        let pending = match self.blocked_push_promise.front() {
            Some(info) => info,
            None => break,
        };

        let decoded = self
            .qpack_decoder
            .borrow_mut()
            .decode_header_block(&pending.header_block, self.stream_id)?;
        let headers = match decoded {
            Some(headers) => headers,
            // Still blocked: keep the promise queued and wait for the next call.
            None => return Ok((ReceiveOutput::NoOutput, false)),
        };

        self.push_handler
            .as_ref()
            .ok_or(Error::HttpFrameUnexpected)?
            .borrow_mut()
            .new_push_promise(pending.push_id, self.stream_id, headers)?;
        self.blocked_push_promise.pop_front();
    }
    self.receive(conn)
}
/// Forwards `priority` to the priority handler and returns its result.
// NOTE(review): the flag presumably reports whether the priority changed —
// confirm against `PriorityHandler::maybe_update_priority`.
fn maybe_update_priority(&mut self, priority: Priority) -> bool {
self.priority_handler.maybe_update_priority(priority)
}
/// Asks the priority handler to encode a frame for this stream, if it has one
/// to send; returns whatever the handler produces.
fn priority_update_frame(&mut self) -> Option<HFrame> {
self.priority_handler.maybe_encode_frame(self.stream_id)
}
/// Notifies the priority handler that the priority update has been sent.
fn priority_update_sent(&mut self) {
self.priority_handler.priority_update_sent();
}
/// Replaces the event listener and restarts frame parsing.
///
/// The message is moved to `WaitingForData` with a fresh frame reader, and
/// all subsequent events are delivered to `conn_events`.
fn set_new_listener(&mut self, conn_events: Box<dyn HttpRecvStreamEvents>) {
    let frame_reader = FrameReader::new();
    self.state = RecvMessageState::WaitingForData { frame_reader };
    self.conn_events = conn_events;
}
/// Returns `true` while the message is in the `ExtendedConnect` state, i.e.
/// after a WebTransport CONNECT request has been recognised and before a new
/// listener has been installed or the stream has been closed.
fn extended_connect_wait_for_response(&self) -> bool {
matches!(self.state, RecvMessageState::ExtendedConnect)
}
}