mirror of
https://github.com/zhom/donutbrowser.git
synced 2026-05-13 04:54:44 +02:00
feat: shadowsocks
This commit is contained in:
+199
-32
@@ -18,6 +18,13 @@ use std::task::{Context, Poll};
|
||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::net::TcpStream;
|
||||
|
||||
/// Combined read+write trait for tunnel target streams, allowing
|
||||
/// `handle_connect_from_buffer` to handle plain TCP, SOCKS, and
|
||||
/// Shadowsocks through the same bidirectional-copy path.
|
||||
trait AsyncStream: AsyncRead + AsyncWrite + Unpin + Send {}
|
||||
impl<T: AsyncRead + AsyncWrite + Unpin + Send> AsyncStream for T {}
|
||||
type BoxedAsyncStream = Box<dyn AsyncStream>;
|
||||
use url::Url;
|
||||
|
||||
enum CompiledRule {
|
||||
@@ -770,6 +777,127 @@ async fn handle_http_via_socks4(
|
||||
Ok(hyper_response)
|
||||
}
|
||||
|
||||
/// Handle plain HTTP requests through a Shadowsocks upstream.
|
||||
/// reqwest doesn't support SS natively, so we connect through the SS tunnel
|
||||
/// manually and forward the HTTP request/response.
|
||||
async fn handle_http_via_shadowsocks(
|
||||
req: Request<hyper::body::Incoming>,
|
||||
upstream: &Url,
|
||||
) -> Result<Response<Full<Bytes>>, Infallible> {
|
||||
let domain = req
|
||||
.uri()
|
||||
.host()
|
||||
.map(|h| h.to_string())
|
||||
.unwrap_or_else(|| "unknown".to_string());
|
||||
let port = req.uri().port_u16().unwrap_or(80);
|
||||
|
||||
let ss_host = upstream.host_str().unwrap_or("127.0.0.1");
|
||||
let ss_port = upstream.port().unwrap_or(8388);
|
||||
let method_str = urlencoding::decode(upstream.username())
|
||||
.unwrap_or_default()
|
||||
.to_string();
|
||||
let password = urlencoding::decode(upstream.password().unwrap_or(""))
|
||||
.unwrap_or_default()
|
||||
.to_string();
|
||||
|
||||
let cipher = match method_str.parse::<shadowsocks::crypto::CipherKind>() {
|
||||
Ok(c) => c,
|
||||
Err(_) => {
|
||||
let mut resp = Response::new(Full::new(Bytes::from(format!(
|
||||
"Bad SS cipher: {method_str}"
|
||||
))));
|
||||
*resp.status_mut() = StatusCode::BAD_GATEWAY;
|
||||
return Ok(resp);
|
||||
}
|
||||
};
|
||||
|
||||
let context = shadowsocks::context::Context::new_shared(shadowsocks::config::ServerType::Local);
|
||||
let svr_cfg = match shadowsocks::config::ServerConfig::new(
|
||||
shadowsocks::config::ServerAddr::from((ss_host.to_string(), ss_port)),
|
||||
&password,
|
||||
cipher,
|
||||
) {
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
let mut resp = Response::new(Full::new(Bytes::from(format!("SS config error: {e}"))));
|
||||
*resp.status_mut() = StatusCode::BAD_GATEWAY;
|
||||
return Ok(resp);
|
||||
}
|
||||
};
|
||||
|
||||
let target_addr = shadowsocks::relay::Address::DomainNameAddress(domain.clone(), port);
|
||||
|
||||
let mut stream = match shadowsocks::relay::tcprelay::proxy_stream::ProxyClientStream::connect(
|
||||
context,
|
||||
&svr_cfg,
|
||||
target_addr,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
let mut resp = Response::new(Full::new(Bytes::from(format!("SS connect: {e}"))));
|
||||
*resp.status_mut() = StatusCode::BAD_GATEWAY;
|
||||
return Ok(resp);
|
||||
}
|
||||
};
|
||||
|
||||
// Build and send the HTTP request through the SS tunnel
|
||||
let path = req
|
||||
.uri()
|
||||
.path_and_query()
|
||||
.map(|pq| pq.as_str())
|
||||
.unwrap_or("/");
|
||||
let method = req.method().as_str();
|
||||
let mut raw_req = format!("{method} {path} HTTP/1.1\r\nHost: {domain}\r\nConnection: close\r\n");
|
||||
for (name, value) in req.headers() {
|
||||
if name != "host" && name != "connection" {
|
||||
raw_req.push_str(&format!("{}: {}\r\n", name, value.to_str().unwrap_or("")));
|
||||
}
|
||||
}
|
||||
raw_req.push_str("\r\n");
|
||||
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
if let Err(e) = stream.write_all(raw_req.as_bytes()).await {
|
||||
let mut resp = Response::new(Full::new(Bytes::from(format!("SS write: {e}"))));
|
||||
*resp.status_mut() = StatusCode::BAD_GATEWAY;
|
||||
return Ok(resp);
|
||||
}
|
||||
|
||||
let mut response_buf = Vec::new();
|
||||
if let Err(e) = stream.read_to_end(&mut response_buf).await {
|
||||
log::warn!("SS read error (may be partial): {e}");
|
||||
}
|
||||
|
||||
if let Some(tracker) = get_traffic_tracker() {
|
||||
tracker.record_request(&domain, raw_req.len() as u64, response_buf.len() as u64);
|
||||
}
|
||||
|
||||
// Parse the raw HTTP response
|
||||
let response_str = String::from_utf8_lossy(&response_buf);
|
||||
let header_end = response_str.find("\r\n\r\n").unwrap_or(response_str.len());
|
||||
let status_line = response_str
|
||||
.lines()
|
||||
.next()
|
||||
.unwrap_or("HTTP/1.1 502 Bad Gateway");
|
||||
let status_code: u16 = status_line
|
||||
.split_whitespace()
|
||||
.nth(1)
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(502);
|
||||
let body = if header_end + 4 < response_buf.len() {
|
||||
&response_buf[header_end + 4..]
|
||||
} else {
|
||||
b""
|
||||
};
|
||||
|
||||
let mut hyper_response = Response::new(Full::new(Bytes::from(body.to_vec())));
|
||||
*hyper_response.status_mut() =
|
||||
StatusCode::from_u16(status_code).unwrap_or(StatusCode::BAD_GATEWAY);
|
||||
|
||||
Ok(hyper_response)
|
||||
}
|
||||
|
||||
async fn handle_http(
|
||||
req: Request<hyper::body::Incoming>,
|
||||
upstream_url: Option<String>,
|
||||
@@ -800,14 +928,19 @@ async fn handle_http(
|
||||
|
||||
let should_bypass = bypass_matcher.should_bypass(&domain);
|
||||
|
||||
// Check if we need to handle SOCKS4 manually (reqwest doesn't support it)
|
||||
// Handle proxy types that reqwest doesn't support natively
|
||||
if !should_bypass {
|
||||
if let Some(ref upstream) = upstream_url {
|
||||
if upstream != "DIRECT" {
|
||||
if let Ok(url) = Url::parse(upstream) {
|
||||
if url.scheme() == "socks4" {
|
||||
// Handle SOCKS4 manually for HTTP requests
|
||||
return handle_http_via_socks4(req, upstream).await;
|
||||
match url.scheme() {
|
||||
"socks4" => {
|
||||
return handle_http_via_socks4(req, upstream).await;
|
||||
}
|
||||
"ss" | "shadowsocks" => {
|
||||
return handle_http_via_shadowsocks(req, &url).await;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1298,37 +1431,24 @@ async fn handle_connect_from_buffer(
|
||||
tracker.record_request(&domain, 0, 0);
|
||||
}
|
||||
|
||||
// Connect to target (directly or via upstream proxy)
|
||||
// Connect to target (directly or via upstream proxy).
|
||||
// Returns a BoxedAsyncStream so all upstream types (plain TCP, SOCKS,
|
||||
// Shadowsocks) share the same bidirectional-copy tunnel code below.
|
||||
let should_bypass = bypass_matcher.should_bypass(target_host);
|
||||
let target_stream = match upstream_url.as_ref() {
|
||||
None => {
|
||||
// Direct connection
|
||||
TcpStream::connect((target_host, target_port)).await?
|
||||
}
|
||||
Some(url) if url == "DIRECT" => {
|
||||
// Direct connection
|
||||
TcpStream::connect((target_host, target_port)).await?
|
||||
}
|
||||
_ if should_bypass => {
|
||||
// Bypass rule matched - connect directly
|
||||
TcpStream::connect((target_host, target_port)).await?
|
||||
}
|
||||
let target_stream: BoxedAsyncStream = match upstream_url.as_ref() {
|
||||
None => Box::new(TcpStream::connect((target_host, target_port)).await?),
|
||||
Some(url) if url == "DIRECT" => Box::new(TcpStream::connect((target_host, target_port)).await?),
|
||||
_ if should_bypass => Box::new(TcpStream::connect((target_host, target_port)).await?),
|
||||
Some(upstream_url_str) => {
|
||||
// Connect via upstream proxy
|
||||
let upstream = Url::parse(upstream_url_str)?;
|
||||
let scheme = upstream.scheme();
|
||||
|
||||
match scheme {
|
||||
"http" | "https" => {
|
||||
// Connect via HTTP/HTTPS proxy CONNECT
|
||||
// Note: HTTPS proxy URLs still use HTTP CONNECT method (CONNECT is always HTTP-based)
|
||||
// For HTTPS proxies, reqwest handles TLS automatically in handle_http
|
||||
// For manual CONNECT here, we use plain TCP - HTTPS proxy CONNECT typically works over plain TCP
|
||||
let proxy_host = upstream.host_str().unwrap_or("127.0.0.1");
|
||||
let proxy_port = upstream.port().unwrap_or(8080);
|
||||
let mut proxy_stream = TcpStream::connect((proxy_host, proxy_port)).await?;
|
||||
|
||||
// Add authentication if provided
|
||||
let mut connect_req = format!(
|
||||
"CONNECT {}:{} HTTP/1.1\r\nHost: {}:{}\r\n",
|
||||
target_host, target_port, target_host, target_port
|
||||
@@ -1344,10 +1464,8 @@ async fn handle_connect_from_buffer(
|
||||
|
||||
connect_req.push_str("\r\n");
|
||||
|
||||
// Send CONNECT request to upstream proxy
|
||||
proxy_stream.write_all(connect_req.as_bytes()).await?;
|
||||
|
||||
// Read response
|
||||
let mut buffer = [0u8; 4096];
|
||||
let n = proxy_stream.read(&mut buffer).await?;
|
||||
let response = String::from_utf8_lossy(&buffer[..n]);
|
||||
@@ -1356,10 +1474,9 @@ async fn handle_connect_from_buffer(
|
||||
return Err(format!("Upstream proxy CONNECT failed: {}", response).into());
|
||||
}
|
||||
|
||||
proxy_stream
|
||||
Box::new(proxy_stream)
|
||||
}
|
||||
"socks4" | "socks5" => {
|
||||
// Connect via SOCKS proxy
|
||||
let socks_host = upstream.host_str().unwrap_or("127.0.0.1");
|
||||
let socks_port = upstream.port().unwrap_or(1080);
|
||||
let socks_addr = format!("{}:{}", socks_host, socks_port);
|
||||
@@ -1367,7 +1484,7 @@ async fn handle_connect_from_buffer(
|
||||
let username = upstream.username();
|
||||
let password = upstream.password().unwrap_or("");
|
||||
|
||||
connect_via_socks(
|
||||
let stream = connect_via_socks(
|
||||
&socks_addr,
|
||||
target_host,
|
||||
target_port,
|
||||
@@ -1378,7 +1495,56 @@ async fn handle_connect_from_buffer(
|
||||
None
|
||||
},
|
||||
)
|
||||
.await?
|
||||
.await?;
|
||||
Box::new(stream)
|
||||
}
|
||||
"ss" | "shadowsocks" => {
|
||||
// Shadowsocks: URL format is ss://method:password@host:port
|
||||
// where "method" is the cipher (e.g. aes-256-gcm, chacha20-ietf-poly1305)
|
||||
// and "password" is the SS server password.
|
||||
let ss_host = upstream.host_str().unwrap_or("127.0.0.1");
|
||||
let ss_port = upstream.port().unwrap_or(8388);
|
||||
|
||||
// The "username" field carries the cipher method
|
||||
let method_str = urlencoding::decode(upstream.username())
|
||||
.unwrap_or_default()
|
||||
.to_string();
|
||||
let password = urlencoding::decode(upstream.password().unwrap_or(""))
|
||||
.unwrap_or_default()
|
||||
.to_string();
|
||||
|
||||
if method_str.is_empty() || password.is_empty() {
|
||||
return Err(
|
||||
"Shadowsocks requires method and password (URL: ss://method:password@host:port)"
|
||||
.into(),
|
||||
);
|
||||
}
|
||||
|
||||
let cipher = method_str.parse::<shadowsocks::crypto::CipherKind>().map_err(|_| {
|
||||
format!("Unsupported Shadowsocks cipher: {method_str}. Use e.g. aes-256-gcm, chacha20-ietf-poly1305, aes-128-gcm")
|
||||
})?;
|
||||
|
||||
let context =
|
||||
shadowsocks::context::Context::new_shared(shadowsocks::config::ServerType::Local);
|
||||
let svr_cfg = shadowsocks::config::ServerConfig::new(
|
||||
shadowsocks::config::ServerAddr::from((ss_host.to_string(), ss_port)),
|
||||
&password,
|
||||
cipher,
|
||||
)
|
||||
.map_err(|e| format!("Invalid Shadowsocks config: {e}"))?;
|
||||
|
||||
let target_addr =
|
||||
shadowsocks::relay::Address::DomainNameAddress(target_host.to_string(), target_port);
|
||||
|
||||
let stream = shadowsocks::relay::tcprelay::proxy_stream::ProxyClientStream::connect(
|
||||
context,
|
||||
&svr_cfg,
|
||||
target_addr,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| format!("Shadowsocks connection failed: {e}"))?;
|
||||
|
||||
Box::new(stream)
|
||||
}
|
||||
_ => {
|
||||
return Err(format!("Unsupported upstream proxy scheme: {}", scheme).into());
|
||||
@@ -1387,8 +1553,9 @@ async fn handle_connect_from_buffer(
|
||||
}
|
||||
};
|
||||
|
||||
// Enable TCP_NODELAY on target stream for immediate data transfer
|
||||
let _ = target_stream.set_nodelay(true);
|
||||
// TCP_NODELAY is set per-stream where applicable (TcpStream paths).
|
||||
// For encrypted streams (Shadowsocks), the underlying TCP connection
|
||||
// is managed by the library and nodelay is handled internally.
|
||||
|
||||
// Send 200 Connection Established response to client
|
||||
// CRITICAL: Must flush after writing to ensure response is sent before tunneling
|
||||
|
||||
Reference in New Issue
Block a user