Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/libRusteria/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ edition = "2024"
quiche = "*"
log = "*"
octets = "*"
bytes = "*"

[lib]
path = "src/lib.rs"
17 changes: 7 additions & 10 deletions src/libRusteria/src/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,13 @@ pub struct HysteriaTcpRequest {
//potentially none resolved url
pub url: String,
pub padding: String,
//the pos of the last byte in the input slice
pub offset: usize,
}

//TODO error handling when you have time
impl HysteriaTcpRequest {
pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
pub fn from_bytes(bytes: &[u8]) -> Option< Self> {
let mut octet = Octets::with_slice(bytes);
let request_id = octet.get_varint().ok()?;
let addr_len = octet.get_varint().ok()?;
Expand All @@ -77,22 +79,17 @@ impl HysteriaTcpRequest {
let padding_len = octet.get_varint().ok()?;
let padding_bytes = octet.get_bytes(padding_len as usize).ok()?;
let padding = std::str::from_utf8(padding_bytes.buf()).ok()?.to_string();
let remaining_bytes = bytes.len() - octet.off();
let offset=octet.off();
info!(
"Parsed TCP request: request_id={}, addr_len={}, url={}, padding_len={}, remaining_bytes={}",
request_id, addr_len, url, padding_len, remaining_bytes
request_id, addr_len, url, padding_len, octet.len()-octet.off()
);
if remaining_bytes > 0 {
info!(
"remaining bytes in ascii: {}",
String::from_utf8_lossy(&bytes[octet.off()..])
);
}
//print remaining bytes

Some(Self {
request_id,
url,
padding,
offset,
})
}
}
Expand Down
41 changes: 32 additions & 9 deletions src/rusteriaServer/src/hysteria.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use tokio_quiche::buffer_pool::PooledBuf;
use tokio_quiche::http3::driver::OutboundFrame;
use tokio_quiche::quic::{HandshakeInfo, QuicheConnection};
use tokio_quiche::{ApplicationOverQuic, QuicResult};
use tokio_quiche::metrics::Metrics;

const MAX_BUF_SIZE: usize = 65535 * 5; //64kb*5

Expand Down Expand Up @@ -118,6 +119,14 @@ impl HysDriver {
Some(stream) => {
// Get the response data before processing
if let Some(response) = response {
//if the verification attempt fails, rearm the receiver
// if !response.auth_res{
// self.waiting_streams
// .push(WaitForStream::H3Stream(WaitForH3Stream {
// stream_id,
// chan: Some(chan),
// }));
// }
stream.queued_frames.push(response);
}
Ok(())
Expand Down Expand Up @@ -173,14 +182,15 @@ impl HysDriver {
for frame in responses.response.iter() {
sending_results.push(match frame {
OutboundFrame::Headers(h) => {
debug!("{} send headers", qconn.trace_id());

if self
.h3conn_as_mut()
.send_response(qconn, stream_id, h.as_ref(), false)
.send_response(qconn, stream_id, h.as_ref(),responses.auth_res)
.is_err()
{
Some(OutboundFrame::Headers((*h.clone()).to_owned()))
} else {
debug!("{} send headers", qconn.trace_id());
None
}
}
Expand Down Expand Up @@ -287,14 +297,16 @@ impl HysDriver {
read_fin: false,
},
);
let _ = read_data.split_at(offset);
} else {
error!(
"client is sending invalid initial proxy request on stream id: {}",
"client is sending invalid initial proxy request on stream {}",
stream_id
);
}
} else {
//TODO Copying
}
else {
//have to clone
let inbound_bytes = Bytes::copy_from_slice(&read_data);
let _ = event.insert(HysEvent::QuicEvent(
stream_id,
Expand Down Expand Up @@ -431,9 +443,9 @@ impl ApplicationOverQuic for HysDriver {
//TODO: proper logging
trace!("new unverified stream id: {}", stream_id);
}
} else {
}
if let Some(context) = self.quic_context_map.get_mut(&stream_id) {
if !context.queued_bytes.is_empty() {
// if !context.queued_bytes.is_empty() {
debug!(
"writing len {} bytes quic traffic to the client",
context.queued_bytes.len()
Expand All @@ -456,7 +468,7 @@ impl ApplicationOverQuic for HysDriver {
let sent = qconn.stream_send(stream_id, &*context.queued_bytes, false)?;
context.queued_bytes = context.queued_bytes.split_off(sent);
context.queued_bytes.reserve(65535);
}
// }
if context.fin && context.queued_bytes.is_empty() {
if let Err(e) = qconn.stream_shutdown(stream_id, Shutdown::Write, 0) {
error!(
Expand All @@ -477,9 +489,20 @@ impl ApplicationOverQuic for HysDriver {
self.quic_context_map.remove(&stream_id);
info!("{} stream {} fin", qconn.trace_id(), stream_id);
}
}
// }
}
}
Ok(())
}

fn on_conn_close<M: Metrics>(
&mut self,
qconn: &mut QuicheConnection,
metrics: &M,
connection_result: &QuicResult<()>,
){
self.process_writes(qconn).unwrap();

warn!("{}: quic connection closed", qconn.trace_id());
}
}
15 changes: 6 additions & 9 deletions src/rusteriaServer/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,17 @@ async fn handle_connection(mut controller: HysController, handle: Handle, auth_t
match event {
//handle H3 event locally because ideally all auth event should be one shot
HysEvent::H3Event(stream_id, h3_event, sender) => {
if !is_verified {
if is_verified { warn!("Verified user is sending h3 request again");}
match h3_event {
h3::Event::Headers { list, .. } => {
let (auth_res, response) = auth::auth(list, &auth_token);
is_verified = auth_res;
//ignore sending error for now
sender
if let Err(e) = sender
.send(H3Response { auth_res, response })
.await
.unwrap();
.await {
info!("The quic connection was closed before the h3 response can be sent");
}
}
h3::Event::Finished => {
//self.h3_outbound_map.remove(&stream_id);
Expand All @@ -80,11 +81,7 @@ async fn handle_connection(mut controller: HysController, handle: Handle, auth_t
info!("other h3 events:{:?}", other);
}
}
} else {
warn!("Verified user is sending h3 request again");
}
//TODO: maybe use oneshot channel
drop(sender);
}
HysEvent::QuicEvent(stream_id, proxy_event) => {
match proxy_event {
Expand All @@ -98,7 +95,7 @@ async fn handle_connection(mut controller: HysController, handle: Handle, auth_t

let proxy_response = HysteriaTCPResponse::new(
HysteriaTCPResponseStatus::Ok,
"Hello World!",
"Hello, world!",
"padding",
)
.into_bytes();
Expand Down
Loading