refactor: networking

This commit is contained in:
zhom
2026-03-20 02:43:01 +04:00
parent 8936816613
commit 116a54942d
5 changed files with 193 additions and 179 deletions
+83 -136
View File
@@ -883,6 +883,87 @@ fn build_reqwest_client_with_proxy(
Ok(client_builder.proxy(proxy).build()?)
}
/// Handle a single proxy connection (used by both the proxy worker and in-process proxy checks).
pub async fn handle_proxy_connection(
mut stream: tokio::net::TcpStream,
upstream_url: Option<String>,
bypass_matcher: BypassMatcher,
) {
let _ = stream.set_nodelay(true);
if stream.readable().await.is_err() {
return;
}
let mut peek_buffer = [0u8; 16];
match stream.read(&mut peek_buffer).await {
Ok(0) => {}
Ok(n) => {
let request_start_upper = String::from_utf8_lossy(&peek_buffer[..n.min(7)]).to_uppercase();
let is_connect = request_start_upper.starts_with("CONNECT");
if is_connect {
let mut full_request = Vec::with_capacity(4096);
full_request.extend_from_slice(&peek_buffer[..n]);
let mut remaining = [0u8; 4096];
let mut total_read = n;
let max_reads = 100;
let mut reads = 0;
loop {
if reads >= max_reads {
break;
}
match stream.read(&mut remaining).await {
Ok(0) => {
if full_request.ends_with(b"\r\n\r\n")
|| full_request.ends_with(b"\n\n")
|| total_read > 0
{
break;
}
return;
}
Ok(m) => {
reads += 1;
total_read += m;
full_request.extend_from_slice(&remaining[..m]);
if full_request.ends_with(b"\r\n\r\n") || full_request.ends_with(b"\n\n") {
break;
}
}
Err(_) => {
if total_read > 0 {
break;
}
return;
}
}
}
let _ =
handle_connect_from_buffer(stream, full_request, upstream_url, bypass_matcher).await;
return;
}
// Non-CONNECT: prepend consumed bytes and pass to hyper
let prepended_bytes = peek_buffer[..n].to_vec();
let prepended_reader = PrependReader {
prepended: prepended_bytes,
prepended_pos: 0,
inner: stream,
};
let io = TokioIo::new(prepended_reader);
let service =
service_fn(move |req| handle_request(req, upstream_url.clone(), bypass_matcher.clone()));
let _ = http1::Builder::new().serve_connection(io, service).await;
}
Err(_) => {}
}
}
pub async fn run_proxy_server(config: ProxyConfig) -> Result<(), Box<dyn std::error::Error>> {
log::error!(
"Proxy worker starting, looking for config id: {}",
@@ -1052,145 +1133,11 @@ pub async fn run_proxy_server(config: ProxyConfig) -> Result<(), Box<dyn std::er
// This ensures the process doesn't exit even if there are no active connections
loop {
match listener.accept().await {
Ok((mut stream, peer_addr)) => {
// Enable TCP_NODELAY to ensure small packets are sent immediately
// This is critical for CONNECT responses to be sent before tunneling begins
let _ = stream.set_nodelay(true);
log::error!("DEBUG: Accepted connection from {:?}", peer_addr);
Ok((stream, _peer_addr)) => {
let upstream = upstream_url.clone();
let matcher = bypass_matcher.clone();
tokio::task::spawn(async move {
// Wait for the stream to have readable data before attempting to read.
// This prevents read() from returning 0 on a fresh connection before
// the client's data arrives.
if stream.readable().await.is_err() {
return;
}
let mut peek_buffer = [0u8; 16];
match stream.read(&mut peek_buffer).await {
Ok(0) => {}
Ok(n) => {
// Check if this looks like a CONNECT request
// Be more lenient - check if the first bytes match "CONNECT" (case-insensitive)
let request_start_upper =
String::from_utf8_lossy(&peek_buffer[..n.min(7)]).to_uppercase();
let is_connect = request_start_upper.starts_with("CONNECT");
log::error!(
"DEBUG: Read {} bytes, starts with: {:?}, is_connect: {}",
n,
String::from_utf8_lossy(&peek_buffer[..n.min(20)]),
is_connect
);
if is_connect {
// Handle CONNECT request manually for tunneling
let mut full_request = Vec::with_capacity(4096);
full_request.extend_from_slice(&peek_buffer[..n]);
// Read the rest of the CONNECT request until we have the full headers
// CONNECT requests end with \r\n\r\n (or \n\n)
let mut remaining = [0u8; 4096];
let mut total_read = n;
let max_reads = 100; // Prevent infinite loop
let mut reads = 0;
loop {
if reads >= max_reads {
log::error!("DEBUG: Max reads reached, breaking");
break;
}
match stream.read(&mut remaining).await {
Ok(0) => {
// Connection closed, but we might have a complete request
if full_request.ends_with(b"\r\n\r\n") || full_request.ends_with(b"\n\n") {
break;
}
// If we have some data, try to process it anyway
if total_read > 0 {
break;
}
return; // No data at all
}
Ok(m) => {
reads += 1;
total_read += m;
full_request.extend_from_slice(&remaining[..m]);
// Check if we have complete headers
if full_request.ends_with(b"\r\n\r\n") || full_request.ends_with(b"\n\n") {
break;
}
// Also check if we have enough to parse (at least "CONNECT host:port HTTP/1.x")
if total_read >= 20 {
// Check if we have a newline that might indicate end of request line
if let Some(pos) = full_request.iter().position(|&b| b == b'\n') {
if pos < full_request.len() - 1 {
// We have at least the request line, check if we have headers
let request_str = String::from_utf8_lossy(&full_request);
if request_str.contains("\r\n\r\n") || request_str.contains("\n\n") {
break;
}
}
}
}
}
Err(e) => {
log::error!("DEBUG: Error reading CONNECT request: {:?}", e);
// If we have some data, try to process it
if total_read > 0 {
break;
}
return;
}
}
}
// Handle CONNECT manually
log::error!(
"DEBUG: Handling CONNECT manually for: {}",
String::from_utf8_lossy(&full_request[..full_request.len().min(200)])
);
if let Err(e) =
handle_connect_from_buffer(stream, full_request, upstream, matcher).await
{
log::error!("Error handling CONNECT request: {:?}", e);
} else {
log::error!("DEBUG: CONNECT handled successfully");
}
return;
}
// Not CONNECT (or partial read) - reconstruct stream with consumed bytes prepended
// This is critical: we MUST prepend any bytes we consumed, even if < 7 bytes
log::error!(
"DEBUG: Non-CONNECT request, first {} bytes: {:?}",
n,
String::from_utf8_lossy(&peek_buffer[..n.min(50)])
);
let prepended_bytes = peek_buffer[..n].to_vec();
let prepended_reader = PrependReader {
prepended: prepended_bytes,
prepended_pos: 0,
inner: stream,
};
let io = TokioIo::new(prepended_reader);
let service =
service_fn(move |req| handle_request(req, upstream.clone(), matcher.clone()));
if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
log::error!("Error serving connection: {:?}", err);
}
}
Err(e) => {
log::error!("Error reading from connection: {:?}", e);
}
}
handle_proxy_connection(stream, upstream, matcher).await;
});
}
Err(e) => {