diff --git a/Cargo.lock b/Cargo.lock index 19c57f3..3631633 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1451,6 +1451,7 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" name = "lib_rusteria" version = "0.1.0" dependencies = [ + "bytes", "log", "octets", "quiche", diff --git a/src/libRusteria/Cargo.toml b/src/libRusteria/Cargo.toml index 11ccd5b..3ac8817 100644 --- a/src/libRusteria/Cargo.toml +++ b/src/libRusteria/Cargo.toml @@ -7,6 +7,7 @@ edition = "2024" quiche = "*" log = "*" octets = "*" +bytes = "*" [lib] path = "src/lib.rs" diff --git a/src/libRusteria/src/proto.rs b/src/libRusteria/src/proto.rs index bcc9c6a..4bf24e6 100644 --- a/src/libRusteria/src/proto.rs +++ b/src/libRusteria/src/proto.rs @@ -56,11 +56,13 @@ pub struct HysteriaTcpRequest { //potentially none resolved url pub url: String, pub padding: String, + //the pos of the last byte in the input slice + pub offset: usize, } //TODO error handling when you have time impl HysteriaTcpRequest { - pub fn from_bytes(bytes: &[u8]) -> Option { + pub fn from_bytes(bytes: &[u8]) -> Option< Self> { let mut octet = Octets::with_slice(bytes); let request_id = octet.get_varint().ok()?; let addr_len = octet.get_varint().ok()?; @@ -77,22 +79,17 @@ impl HysteriaTcpRequest { let padding_len = octet.get_varint().ok()?; let padding_bytes = octet.get_bytes(padding_len as usize).ok()?; let padding = std::str::from_utf8(padding_bytes.buf()).ok()?.to_string(); - let remaining_bytes = bytes.len() - octet.off(); + let offset=octet.off(); info!( "Parsed TCP request: request_id={}, addr_len={}, url={}, padding_len={}, remaining_bytes={}", - request_id, addr_len, url, padding_len, remaining_bytes + request_id, addr_len, url, padding_len, octet.len()-octet.off() ); - if remaining_bytes > 0 { - info!( - "remaining bytes in ascii: {}", - String::from_utf8_lossy(&bytes[octet.off()..]) - ); - } - //print remaining bytes + Some(Self { request_id, url, padding, + offset, }) } } diff --git a/src/rusteriaServer/src/hysteria.rs b/src/rusteriaServer/src/hysteria.rs index b5c1d3a..e846f45 100644 --- a/src/rusteriaServer/src/hysteria.rs +++ b/src/rusteriaServer/src/hysteria.rs @@ -16,6 +16,7 @@ use tokio_quiche::buffer_pool::PooledBuf; use tokio_quiche::http3::driver::OutboundFrame; use tokio_quiche::quic::{HandshakeInfo, QuicheConnection}; use tokio_quiche::{ApplicationOverQuic, QuicResult}; +use tokio_quiche::metrics::Metrics; const MAX_BUF_SIZE: usize = 65535 * 5; //64kb*5 @@ -118,6 +119,14 @@ impl HysDriver { Some(stream) => { // Get the response data before processing if let Some(response) = response { + //if the verification attempt fails, rearm the receiver + // if !response.auth_res{ + // self.waiting_streams + // .push(WaitForStream::H3Stream(WaitForH3Stream { + // stream_id, + // chan: Some(chan), + // })); + // } stream.queued_frames.push(response); } Ok(()) @@ -173,14 +182,15 @@ impl HysDriver { for frame in responses.response.iter() { sending_results.push(match frame { OutboundFrame::Headers(h) => { - debug!("{} send headers", qconn.trace_id()); + if self .h3conn_as_mut() - .send_response(qconn, stream_id, h.as_ref(), false) + .send_response(qconn, stream_id, h.as_ref(),responses.auth_res) .is_err() { Some(OutboundFrame::Headers((*h.clone()).to_owned())) } else { + debug!("{} send headers", qconn.trace_id()); None } } @@ -287,14 +297,16 @@ impl HysDriver { read_fin: false, }, ); + let _ = read_data.split_at(offset); } else { error!( - "client is sending invalid initial proxy request on stream id: {}", + "client is sending invalid initial proxy request on stream {}", stream_id ); } - } else { - //TODO Copying + } + else { + //have to clone let inbound_bytes = Bytes::copy_from_slice(&read_data); let _ = event.insert(HysEvent::QuicEvent( stream_id, @@ -431,9 +443,9 @@ impl ApplicationOverQuic for HysDriver { //TODO: proper logging trace!("new unverified stream id: {}", stream_id); } - } else { + } if let Some(context) = self.quic_context_map.get_mut(&stream_id) { - if !context.queued_bytes.is_empty() { + // if !context.queued_bytes.is_empty() { debug!( "writing len {} bytes quic traffic to the client", context.queued_bytes.len() @@ -456,7 +468,7 @@ impl ApplicationOverQuic for HysDriver { let sent = qconn.stream_send(stream_id, &*context.queued_bytes, false)?; context.queued_bytes = context.queued_bytes.split_off(sent); context.queued_bytes.reserve(65535); - } + // } if context.fin && context.queued_bytes.is_empty() { if let Err(e) = qconn.stream_shutdown(stream_id, Shutdown::Write, 0) { error!( @@ -477,9 +489,20 @@ impl ApplicationOverQuic for HysDriver { self.quic_context_map.remove(&stream_id); info!("{} stream {} fin", qconn.trace_id(), stream_id); } - } + // } } } Ok(()) } + + fn on_conn_close( + &mut self, + qconn: &mut QuicheConnection, + metrics: &M, + connection_result: &QuicResult<()>, + ){ + self.process_writes(qconn).unwrap(); + + warn!("{}: quic connection closed", qconn.trace_id()); + } } diff --git a/src/rusteriaServer/src/server.rs b/src/rusteriaServer/src/server.rs index 98577c6..6551b0b 100644 --- a/src/rusteriaServer/src/server.rs +++ b/src/rusteriaServer/src/server.rs @@ -60,16 +60,17 @@ async fn handle_connection(mut controller: HysController, handle: Handle, auth_t match event { //handle H3 event locally because ideally all auth event should be one shot HysEvent::H3Event(stream_id, h3_event, sender) => { - if !is_verified { + if is_verified { warn!("Verified user is sending h3 request again");} match h3_event { h3::Event::Headers { list, .. } => { let (auth_res, response) = auth::auth(list, &auth_token); is_verified = auth_res; //ignore sending error for now - sender + if let Err(e) = sender .send(H3Response { auth_res, response }) - .await - .unwrap(); + .await { + info!("The quic connection was closed before the h3 response can be sent"); + } } h3::Event::Finished => { //self.h3_outbound_map.remove(&stream_id); @@ -80,11 +81,7 @@ async fn handle_connection(mut controller: HysController, handle: Handle, auth_t info!("other h3 events:{:?}", other); } } - } else { - warn!("Verified user is sending h3 request again"); - } //TODO: maybe use oneshot channel - drop(sender); } HysEvent::QuicEvent(stream_id, proxy_event) => { match proxy_event { @@ -98,7 +95,7 @@ async fn handle_connection(mut controller: HysController, handle: Handle, auth_t let proxy_response = HysteriaTCPResponse::new( HysteriaTCPResponseStatus::Ok, - "Hello World!", + "Hello, world!", "padding", ) .into_bytes();