refactor: cleanup and decouple

This commit is contained in:
zhom
2026-02-20 04:44:35 +04:00
parent 0f579cb97d
commit e5663515a7
47 changed files with 4501 additions and 770 deletions
+1
View File
@@ -601,6 +601,7 @@ async fn create_profile(
&request.version,
request.release_type.as_deref().unwrap_or("stable"),
request.proxy_id.clone(),
None, // vpn_id
camoufox_config,
wayfern_config,
request.group_id.clone(),
+1
View File
@@ -511,6 +511,7 @@ mod tests {
version: version.to_string(),
process_id: None,
proxy_id: None,
vpn_id: None,
last_launch: None,
release_type: "stable".to_string(),
camoufox_config: None,
+2 -5
View File
@@ -243,7 +243,8 @@ fn run_daemon() {
// Use swap to only run cleanup once
if SHOULD_QUIT.swap(false, Ordering::SeqCst) {
// Cleanup
tray::quit_gui();
let mut state = read_state();
state.daemon_pid = None;
let _ = write_state(&state);
@@ -357,10 +358,6 @@ fn main() {
match args[1].as_str() {
"start" => {
// "start" is now an alias for "run"
// On macOS, the daemon should be started via launchctl (see daemon_spawn.rs)
// This command is kept for backward compatibility
eprintln!("Starting daemon...");
run_daemon();
}
"stop" => {
+119
View File
@@ -172,6 +172,24 @@ async fn main() {
)
.arg(Arg::new("action").required(true).help("Action (start)")),
)
.subcommand(
Command::new("vpn-worker")
.about("Run a VPN worker process (internal use)")
.arg(
Arg::new("id")
.long("id")
.required(true)
.help("VPN worker configuration ID"),
)
.arg(
Arg::new("port")
.long("port")
.value_parser(clap::value_parser!(u16))
.required(true)
.help("Local SOCKS5 port"),
)
.arg(Arg::new("action").required(true).help("Action (start)")),
)
.get_matches();
if let Some(proxy_matches) = matches.subcommand_matches("proxy") {
@@ -333,6 +351,107 @@ async fn main() {
log::error!("Invalid action for proxy-worker. Use 'start'");
process::exit(1);
}
} else if let Some(vpn_matches) = matches.subcommand_matches("vpn-worker") {
let id = vpn_matches.get_one::<String>("id").expect("id is required");
let action = vpn_matches
.get_one::<String>("action")
.expect("action is required");
let port = *vpn_matches
.get_one::<u16>("port")
.expect("port is required");
if action == "start" {
set_high_priority();
log::info!("VPN worker starting, config id: {}", id);
log::info!("Process PID: {}", std::process::id());
// Retry config loading to handle file system race condition
let config = {
let mut attempts = 0;
loop {
if let Some(config) = donutbrowser_lib::vpn_worker_storage::get_vpn_worker_config(id) {
log::info!(
"Found VPN worker config: id={}, vpn_type={}, vpn_id={}",
config.id,
config.vpn_type,
config.vpn_id
);
break config;
}
attempts += 1;
if attempts >= 10 {
log::error!(
"VPN worker configuration {} not found after {} attempts",
id,
attempts
);
process::exit(1);
}
log::info!(
"VPN worker config {} not found yet, retrying ({}/10)...",
id,
attempts
);
std::thread::sleep(std::time::Duration::from_millis(50));
}
};
// Read the decrypted VPN config from the temp file
let vpn_config_data = match std::fs::read_to_string(&config.config_file_path) {
Ok(data) => data,
Err(e) => {
log::error!(
"Failed to read VPN config file {}: {}",
config.config_file_path,
e
);
process::exit(1);
}
};
match config.vpn_type.as_str() {
"wireguard" => {
let wg_config = match donutbrowser_lib::vpn::parse_wireguard_config(&vpn_config_data) {
Ok(c) => c,
Err(e) => {
log::error!("Failed to parse WireGuard config: {}", e);
process::exit(1);
}
};
let server =
donutbrowser_lib::vpn::socks5_server::WireGuardSocks5Server::new(wg_config, port);
if let Err(e) = server.run(id.clone()).await {
log::error!("VPN worker failed: {}", e);
process::exit(1);
}
}
"openvpn" => {
let ovpn_config = match donutbrowser_lib::vpn::parse_openvpn_config(&vpn_config_data) {
Ok(c) => c,
Err(e) => {
log::error!("Failed to parse OpenVPN config: {}", e);
process::exit(1);
}
};
let server =
donutbrowser_lib::vpn::openvpn_socks5::OpenVpnSocks5Server::new(ovpn_config, port);
if let Err(e) = server.run(id.clone()).await {
log::error!("VPN worker failed: {}", e);
process::exit(1);
}
}
other => {
log::error!("Unknown VPN type: {}", other);
process::exit(1);
}
}
} else {
log::error!("Invalid action for vpn-worker. Use 'start'");
process::exit(1);
}
} else {
log::error!("No command specified");
process::exit(1);
+72 -3
View File
@@ -113,11 +113,34 @@ impl BrowserRunner {
});
// Always start a local proxy for Camoufox (for traffic monitoring and geoip support)
let upstream_proxy = profile
let mut upstream_proxy = profile
.proxy_id
.as_ref()
.and_then(|id| PROXY_MANAGER.get_proxy_settings_by_id(id));
// If profile has a VPN instead of proxy, start VPN worker and use it as upstream
if upstream_proxy.is_none() {
if let Some(ref vpn_id) = profile.vpn_id {
match crate::vpn_worker_runner::start_vpn_worker(vpn_id).await {
Ok(vpn_worker) => {
if let Some(port) = vpn_worker.local_port {
upstream_proxy = Some(ProxySettings {
proxy_type: "socks5".to_string(),
host: "127.0.0.1".to_string(),
port,
username: None,
password: None,
});
log::info!("VPN worker started for Camoufox profile on port {}", port);
}
}
Err(e) => {
return Err(format!("Failed to start VPN worker: {e}").into());
}
}
}
}
log::info!(
"Starting local proxy for Camoufox profile: {} (upstream: {})",
profile.name,
@@ -312,11 +335,34 @@ impl BrowserRunner {
});
// Always start a local proxy for Wayfern (for traffic monitoring and geoip support)
let upstream_proxy = profile
let mut upstream_proxy = profile
.proxy_id
.as_ref()
.and_then(|id| PROXY_MANAGER.get_proxy_settings_by_id(id));
// If profile has a VPN instead of proxy, start VPN worker and use it as upstream
if upstream_proxy.is_none() {
if let Some(ref vpn_id) = profile.vpn_id {
match crate::vpn_worker_runner::start_vpn_worker(vpn_id).await {
Ok(vpn_worker) => {
if let Some(port) = vpn_worker.local_port {
upstream_proxy = Some(ProxySettings {
proxy_type: "socks5".to_string(),
host: "127.0.0.1".to_string(),
port,
username: None,
password: None,
});
log::info!("VPN worker started for Wayfern profile on port {}", port);
}
}
Err(e) => {
return Err(format!("Failed to start VPN worker: {e}").into());
}
}
}
}
log::info!(
"Starting local proxy for Wayfern profile: {} (upstream: {})",
profile.name,
@@ -2413,11 +2459,34 @@ pub async fn launch_browser_profile(
// This ensures all traffic goes through the local proxy for monitoring and future features
if profile.browser != "camoufox" && profile.browser != "wayfern" {
// Determine upstream proxy if configured; otherwise use DIRECT (no upstream)
let upstream_proxy = profile_for_launch
let mut upstream_proxy = profile_for_launch
.proxy_id
.as_ref()
.and_then(|id| PROXY_MANAGER.get_proxy_settings_by_id(id));
// If profile has a VPN instead of proxy, start VPN worker and use it as upstream
if upstream_proxy.is_none() {
if let Some(ref vpn_id) = profile_for_launch.vpn_id {
match crate::vpn_worker_runner::start_vpn_worker(vpn_id).await {
Ok(vpn_worker) => {
if let Some(port) = vpn_worker.local_port {
upstream_proxy = Some(ProxySettings {
proxy_type: "socks5".to_string(),
host: "127.0.0.1".to_string(),
port,
username: None,
password: None,
});
log::info!("VPN worker started for profile on port {}", port);
}
}
Err(e) => {
return Err(format!("Failed to start VPN worker: {e}"));
}
}
}
}
// Use a temporary PID (1) to start the proxy, we'll update it after browser launch
let temp_pid = 1u32;
let profile_id_str = profile.id.to_string();
+22 -7
View File
@@ -103,11 +103,6 @@ pub fn enable_autostart() -> io::Result<()> {
<true/>
<key>LimitLoadToSessionType</key>
<string>Aqua</string>
<key>KeepAlive</key>
<dict>
<key>SuccessfulExit</key>
<false/>
</dict>
<key>ProcessType</key>
<string>Interactive</string>
<key>StandardOutPath</key>
@@ -188,6 +183,26 @@ pub fn load_launch_agent() -> io::Result<()> {
Ok(())
}
#[cfg(target_os = "macos")]
pub fn start_launch_agent() -> io::Result<()> {
use std::process::Command;
let output = Command::new("launchctl")
.args(["start", "com.donutbrowser.daemon"])
.output()?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(io::Error::other(format!(
"launchctl start failed: {}",
stderr
)));
}
log::info!("Started launch agent via launchctl");
Ok(())
}
#[cfg(target_os = "macos")]
pub fn unload_launch_agent() -> io::Result<()> {
use std::process::Command;
@@ -233,7 +248,7 @@ pub fn enable_autostart() -> io::Result<()> {
r#"[Desktop Entry]
Type=Application
Name=Donut Browser Daemon
Exec={} start
Exec={} run
Hidden=false
NoDisplay=true
X-GNOME-Autostart-enabled=true
@@ -281,7 +296,7 @@ pub fn enable_autostart() -> io::Result<()> {
key.set_value(
"DonutBrowserDaemon",
&format!("\"{}\" start", daemon_path.display()),
&format!("\"{}\" run", daemon_path.display()),
)?;
log::info!("Added registry autostart entry");
+26
View File
@@ -127,6 +127,32 @@ pub fn activate_gui() {
}
}
pub fn quit_gui() {
log::info!("[daemon] Quitting GUI...");
#[cfg(target_os = "macos")]
{
let _ = Command::new("osascript")
.args(["-e", "tell application \"Donut Browser\" to quit"])
.output();
}
#[cfg(target_os = "windows")]
{
let _ = Command::new("taskkill")
.args(["/IM", "Donut.exe", "/F"])
.output();
let _ = Command::new("taskkill")
.args(["/IM", "donutbrowser.exe", "/F"])
.output();
}
#[cfg(target_os = "linux")]
{
let _ = Command::new("pkill").args(["-x", "donutbrowser"]).output();
}
}
pub fn set_gui_running(running: bool) {
GUI_RUNNING.store(running, Ordering::SeqCst);
}
+6 -1
View File
@@ -32,7 +32,7 @@ fn read_state() -> DaemonState {
DaemonState::default()
}
fn is_daemon_running() -> bool {
pub fn is_daemon_running() -> bool {
let state = read_state();
if let Some(pid) = state.daemon_pid {
@@ -243,6 +243,11 @@ fn spawn_daemon_macos() -> Result<(), String> {
autostart::load_launch_agent().map_err(|e| format!("Failed to load LaunchAgent: {}", e))?;
log::info!("launchctl load completed");
// Also explicitly start the agent in case it was already loaded but stopped
if let Err(e) = autostart::start_launch_agent() {
log::debug!("launchctl start note (non-fatal): {}", e);
}
Ok(())
}
+189 -70
View File
@@ -49,6 +49,8 @@ mod mcp_server;
mod tag_manager;
mod version_updater;
pub mod vpn;
pub mod vpn_worker_runner;
pub mod vpn_worker_storage;
use browser_runner::{
check_browser_exists, kill_browser_profile, launch_browser_profile, open_url_with_profile,
@@ -57,7 +59,7 @@ use browser_runner::{
use profile::manager::{
check_browser_status, clone_profile, create_browser_profile_new, delete_profile,
list_browser_profiles, rename_profile, update_camoufox_config, update_profile_note,
update_profile_proxy, update_profile_tags, update_wayfern_config,
update_profile_proxy, update_profile_tags, update_profile_vpn, update_wayfern_config,
};
use browser_version_manager::{
@@ -80,8 +82,9 @@ use settings_manager::{
};
use sync::{
is_group_in_use_by_synced_profile, is_proxy_in_use_by_synced_profile, request_profile_sync,
set_group_sync_enabled, set_profile_sync_enabled, set_proxy_sync_enabled,
is_group_in_use_by_synced_profile, is_proxy_in_use_by_synced_profile,
is_vpn_in_use_by_synced_profile, request_profile_sync, set_group_sync_enabled,
set_profile_sync_enabled, set_proxy_sync_enabled, set_vpn_sync_enabled,
};
use tag_manager::get_all_tags;
@@ -469,69 +472,142 @@ async fn get_vpn_config(vpn_id: String) -> Result<vpn::VpnConfig, String> {
}
#[tauri::command]
async fn delete_vpn_config(vpn_id: String) -> Result<(), String> {
// First disconnect if connected
async fn delete_vpn_config(app_handle: tauri::AppHandle, vpn_id: String) -> Result<(), String> {
// First disconnect if connected (stop VPN worker)
let _ = vpn_worker_runner::stop_vpn_worker_by_vpn_id(&vpn_id).await;
// Check if sync was enabled before deleting
let was_sync_enabled = {
let storage = vpn::VPN_STORAGE
.lock()
.map_err(|e| format!("Failed to lock VPN storage: {e}"))?;
storage
.load_config(&vpn_id)
.map(|c| c.sync_enabled)
.unwrap_or(false)
};
// Delete from storage
{
let mut manager = vpn::TUNNEL_MANAGER.lock().await;
if manager.is_tunnel_active(&vpn_id) {
if let Some(tunnel) = manager.get_tunnel_mut(&vpn_id) {
let _ = tunnel.disconnect().await;
}
manager.remove_tunnel(&vpn_id);
}
}
// Then delete from storage
let storage = vpn::VPN_STORAGE
.lock()
.map_err(|e| format!("Failed to lock VPN storage: {e}"))?;
storage
.delete_config(&vpn_id)
.map_err(|e| format!("Failed to delete VPN config: {e}"))
}
#[tauri::command]
async fn connect_vpn(vpn_id: String) -> Result<(), String> {
// Load config from storage
let config = {
let storage = vpn::VPN_STORAGE
.lock()
.map_err(|e| format!("Failed to lock VPN storage: {e}"))?;
storage
.load_config(&vpn_id)
.map_err(|e| format!("Failed to load VPN config: {e}"))?
};
// Create and connect the appropriate tunnel
let mut manager = vpn::TUNNEL_MANAGER.lock().await;
// Check if already connected
if manager.is_tunnel_active(&vpn_id) {
return Ok(());
.delete_config(&vpn_id)
.map_err(|e| format!("Failed to delete VPN config: {e}"))?;
}
let mut tunnel: Box<dyn vpn::VpnTunnel> = match config.vpn_type {
vpn::VpnType::WireGuard => {
let wg_config = vpn::parse_wireguard_config(&config.config_data)
.map_err(|e| format!("Invalid WireGuard config: {e}"))?;
Box::new(vpn::WireGuardTunnel::new(vpn_id.clone(), wg_config))
// If sync was enabled, also delete from remote
if was_sync_enabled {
let vpn_id_clone = vpn_id.clone();
let app_handle_clone = app_handle.clone();
tauri::async_runtime::spawn(async move {
match sync::SyncEngine::create_from_settings(&app_handle_clone).await {
Ok(engine) => {
if let Err(e) = engine.delete_vpn(&vpn_id_clone).await {
log::warn!("Failed to delete VPN {} from sync: {}", vpn_id_clone, e);
} else {
log::info!("VPN {} deleted from sync storage", vpn_id_clone);
}
}
Err(e) => {
log::debug!("Sync not configured, skipping remote VPN deletion: {}", e);
}
}
});
}
let _ = events::emit("vpn-configs-changed", ());
Ok(())
}
#[tauri::command]
async fn create_vpn_config_manual(
name: String,
vpn_type: vpn::VpnType,
config_data: String,
) -> Result<vpn::VpnConfig, String> {
let storage = vpn::VPN_STORAGE
.lock()
.map_err(|e| format!("Failed to lock VPN storage: {e}"))?;
storage
.create_config_manual(&name, vpn_type, &config_data)
.map_err(|e| format!("Failed to create VPN config: {e}"))
}
#[tauri::command]
async fn update_vpn_config(vpn_id: String, name: String) -> Result<vpn::VpnConfig, String> {
let storage = vpn::VPN_STORAGE
.lock()
.map_err(|e| format!("Failed to lock VPN storage: {e}"))?;
storage
.update_config_name(&vpn_id, &name)
.map_err(|e| format!("Failed to update VPN config: {e}"))
}
#[tauri::command]
async fn check_vpn_validity(
vpn_id: String,
) -> Result<crate::proxy_manager::ProxyCheckResult, String> {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
// Start a temporary VPN worker to send real traffic
let vpn_worker = vpn_worker_runner::start_vpn_worker(&vpn_id)
.await
.map_err(|e| format!("Failed to start VPN worker: {e}"))?;
let socks_url = format!("socks5://127.0.0.1:{}", vpn_worker.local_port.unwrap_or(0));
// Fetch public IP through the VPN SOCKS5 proxy
let result = match ip_utils::fetch_public_ip(Some(&socks_url)).await {
Ok(ip) => {
let (city, country, country_code) =
crate::proxy_manager::ProxyManager::get_ip_geolocation(&ip)
.await
.unwrap_or_default();
crate::proxy_manager::ProxyCheckResult {
ip,
city,
country,
country_code,
timestamp: now,
is_valid: true,
}
}
vpn::VpnType::OpenVPN => {
let ovpn_config = vpn::parse_openvpn_config(&config.config_data)
.map_err(|e| format!("Invalid OpenVPN config: {e}"))?;
Box::new(vpn::OpenVpnTunnel::new(vpn_id.clone(), ovpn_config))
Err(e) => {
log::warn!("VPN check failed to fetch public IP: {e}");
crate::proxy_manager::ProxyCheckResult {
ip: String::new(),
city: None,
country: None,
country_code: None,
timestamp: now,
is_valid: false,
}
}
};
tunnel
.connect()
// Stop the temporary VPN worker
let _ = vpn_worker_runner::stop_vpn_worker(&vpn_worker.id).await;
Ok(result)
}
#[tauri::command]
async fn connect_vpn(vpn_id: String) -> Result<(), String> {
// Start VPN worker process (detached, survives GUI shutdown)
vpn_worker_runner::start_vpn_worker(&vpn_id)
.await
.map_err(|e| format!("Failed to connect VPN: {e}"))?;
manager.register_tunnel(vpn_id.clone(), tunnel);
// Update last_used timestamp
{
let storage = vpn::VPN_STORAGE
@@ -545,27 +621,27 @@ async fn connect_vpn(vpn_id: String) -> Result<(), String> {
#[tauri::command]
async fn disconnect_vpn(vpn_id: String) -> Result<(), String> {
let mut manager = vpn::TUNNEL_MANAGER.lock().await;
if let Some(tunnel) = manager.get_tunnel_mut(&vpn_id) {
tunnel
.disconnect()
.await
.map_err(|e| format!("Failed to disconnect VPN: {e}"))?;
}
manager.remove_tunnel(&vpn_id);
vpn_worker_runner::stop_vpn_worker_by_vpn_id(&vpn_id)
.await
.map_err(|e| format!("Failed to disconnect VPN: {e}"))?;
Ok(())
}
#[tauri::command]
async fn get_vpn_status(vpn_id: String) -> Result<vpn::VpnStatus, String> {
let manager = vpn::TUNNEL_MANAGER.lock().await;
use crate::proxy_storage::is_process_running;
if let Some(tunnel) = manager.get_tunnel(&vpn_id) {
Ok(tunnel.get_status())
if let Some(worker) = vpn_worker_storage::find_vpn_worker_by_vpn_id(&vpn_id) {
let connected = worker.pid.map(is_process_running).unwrap_or(false);
Ok(vpn::VpnStatus {
connected,
vpn_id,
connected_at: None,
bytes_sent: None,
bytes_received: None,
last_handshake: None,
})
} else {
// Not connected
Ok(vpn::VpnStatus {
connected: false,
vpn_id,
@@ -579,8 +655,23 @@ async fn get_vpn_status(vpn_id: String) -> Result<vpn::VpnStatus, String> {
#[tauri::command]
async fn list_active_vpn_connections() -> Result<Vec<vpn::VpnStatus>, String> {
let manager = vpn::TUNNEL_MANAGER.lock().await;
Ok(manager.get_all_statuses())
use crate::proxy_storage::is_process_running;
let workers = vpn_worker_storage::list_vpn_worker_configs();
Ok(
workers
.into_iter()
.filter(|w| w.pid.map(is_process_running).unwrap_or(false))
.map(|w| vpn::VpnStatus {
connected: true,
vpn_id: w.vpn_id,
connected_at: None,
bytes_sent: None,
bytes_received: None,
last_handshake: None,
})
.collect(),
)
}
#[cfg_attr(mobile, tauri::mobile_entry_point)]
@@ -645,6 +736,30 @@ pub fn run() {
log::warn!("Failed to start daemon: {e}");
}
// Monitor daemon health - quit GUI if daemon dies
let app_handle_daemon = app.handle().clone();
tauri::async_runtime::spawn(async move {
// Give the daemon time to fully start
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(5));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
interval.tick().await;
let is_running = tokio::task::spawn_blocking(daemon_spawn::is_daemon_running)
.await
.unwrap_or(false);
if !is_running {
log::warn!("Daemon is no longer running, quitting GUI");
app_handle_daemon.exit(0);
break;
}
}
});
// Create the main window programmatically
#[allow(unused_variables)]
let win_builder = WebviewWindowBuilder::new(app, "main", WebviewUrl::default())
@@ -1124,6 +1239,7 @@ pub fn run() {
get_all_tags,
get_browser_release_types,
update_profile_proxy,
update_profile_vpn,
update_profile_tags,
update_profile_note,
check_browser_status,
@@ -1191,6 +1307,8 @@ pub fn run() {
set_group_sync_enabled,
is_proxy_in_use_by_synced_profile,
is_group_in_use_by_synced_profile,
set_vpn_sync_enabled,
is_vpn_in_use_by_synced_profile,
read_profile_cookies,
copy_profile_cookies,
check_wayfern_terms_accepted,
@@ -1208,6 +1326,9 @@ pub fn run() {
list_vpn_configs,
get_vpn_config,
delete_vpn_config,
create_vpn_config_manual,
update_vpn_config,
check_vpn_validity,
connect_vpn,
disconnect_vpn,
get_vpn_status,
@@ -1247,12 +1368,10 @@ mod tests {
// Commands that are intentionally not used in the frontend
// but are used via MCP server or other programmatic APIs
let mcp_only_commands = [
"list_vpn_configs",
"get_vpn_config",
"delete_vpn_config",
"connect_vpn",
"disconnect_vpn",
"get_vpn_status",
"get_vpn_config",
"list_active_vpn_connections",
];
+27 -84
View File
@@ -1702,16 +1702,8 @@ impl McpServer {
message: "Missing vpn_id".to_string(),
})?;
// First disconnect if connected
{
let mut manager = crate::vpn::TUNNEL_MANAGER.lock().await;
if manager.is_tunnel_active(vpn_id) {
if let Some(tunnel) = manager.get_tunnel_mut(vpn_id) {
let _ = tunnel.disconnect().await;
}
manager.remove_tunnel(vpn_id);
}
}
// First disconnect if connected (stop VPN worker)
let _ = crate::vpn_worker_runner::stop_vpn_worker_by_vpn_id(vpn_id).await;
let storage = crate::vpn::VPN_STORAGE.lock().map_err(|e| McpError {
code: -32000,
@@ -1743,63 +1735,14 @@ impl McpServer {
message: "Missing vpn_id".to_string(),
})?;
// Load config from storage
let config = {
let storage = crate::vpn::VPN_STORAGE.lock().map_err(|e| McpError {
// Start VPN worker process
crate::vpn_worker_runner::start_vpn_worker(vpn_id)
.await
.map_err(|e| McpError {
code: -32000,
message: format!("Failed to lock VPN storage: {e}"),
message: format!("Failed to connect VPN: {e}"),
})?;
storage.load_config(vpn_id).map_err(|e| McpError {
code: -32000,
message: format!("Failed to load VPN config: {e}"),
})?
};
let mut manager = crate::vpn::TUNNEL_MANAGER.lock().await;
// Check if already connected
if manager.is_tunnel_active(vpn_id) {
return Ok(serde_json::json!({
"content": [{
"type": "text",
"text": format!("VPN '{}' is already connected", config.name)
}]
}));
}
let mut tunnel: Box<dyn crate::vpn::VpnTunnel> = match config.vpn_type {
crate::vpn::VpnType::WireGuard => {
let wg_config =
crate::vpn::parse_wireguard_config(&config.config_data).map_err(|e| McpError {
code: -32000,
message: format!("Invalid WireGuard config: {e}"),
})?;
Box::new(crate::vpn::WireGuardTunnel::new(
vpn_id.to_string(),
wg_config,
))
}
crate::vpn::VpnType::OpenVPN => {
let ovpn_config =
crate::vpn::parse_openvpn_config(&config.config_data).map_err(|e| McpError {
code: -32000,
message: format!("Invalid OpenVPN config: {e}"),
})?;
Box::new(crate::vpn::OpenVpnTunnel::new(
vpn_id.to_string(),
ovpn_config,
))
}
};
tunnel.connect().await.map_err(|e| McpError {
code: -32000,
message: format!("Failed to connect VPN: {e}"),
})?;
manager.register_tunnel(vpn_id.to_string(), tunnel);
// Update last_used timestamp
{
let storage = crate::vpn::VPN_STORAGE.lock().map_err(|e| McpError {
@@ -1812,7 +1755,7 @@ impl McpServer {
Ok(serde_json::json!({
"content": [{
"type": "text",
"text": format!("VPN '{}' connected successfully", config.name)
"text": format!("VPN '{}' connected successfully", vpn_id)
}]
}))
}
@@ -1829,16 +1772,12 @@ impl McpServer {
message: "Missing vpn_id".to_string(),
})?;
let mut manager = crate::vpn::TUNNEL_MANAGER.lock().await;
if let Some(tunnel) = manager.get_tunnel_mut(vpn_id) {
tunnel.disconnect().await.map_err(|e| McpError {
crate::vpn_worker_runner::stop_vpn_worker_by_vpn_id(vpn_id)
.await
.map_err(|e| McpError {
code: -32000,
message: format!("Failed to disconnect VPN: {e}"),
})?;
}
manager.remove_tunnel(vpn_id);
Ok(serde_json::json!({
"content": [{
@@ -1860,19 +1799,23 @@ impl McpServer {
message: "Missing vpn_id".to_string(),
})?;
let manager = crate::vpn::TUNNEL_MANAGER.lock().await;
let connected =
if let Some(worker) = crate::vpn_worker_storage::find_vpn_worker_by_vpn_id(vpn_id) {
worker
.pid
.map(crate::proxy_storage::is_process_running)
.unwrap_or(false)
} else {
false
};
let status = if let Some(tunnel) = manager.get_tunnel(vpn_id) {
tunnel.get_status()
} else {
crate::vpn::VpnStatus {
connected: false,
vpn_id: vpn_id.to_string(),
connected_at: None,
bytes_sent: None,
bytes_received: None,
last_handshake: None,
}
let status = crate::vpn::VpnStatus {
connected,
vpn_id: vpn_id.to_string(),
connected_at: None,
bytes_sent: None,
bytes_received: None,
last_handshake: None,
};
Ok(serde_json::json!({
+73 -1
View File
@@ -61,10 +61,14 @@ impl ProfileManager {
version: &str,
release_type: &str,
proxy_id: Option<String>,
vpn_id: Option<String>,
camoufox_config: Option<CamoufoxConfig>,
wayfern_config: Option<WayfernConfig>,
group_id: Option<String>,
) -> Result<BrowserProfile, Box<dyn std::error::Error>> {
if proxy_id.is_some() && vpn_id.is_some() {
return Err("Cannot set both proxy_id and vpn_id".into());
}
log::info!("Attempting to create profile: {name}");
// Check if a profile with this name already exists (case insensitive)
@@ -163,6 +167,7 @@ impl ProfileManager {
browser: browser.to_string(),
version: version.to_string(),
proxy_id: proxy_id.clone(),
vpn_id: None,
process_id: None,
last_launch: None,
release_type: release_type.to_string(),
@@ -277,6 +282,7 @@ impl ProfileManager {
browser: browser.to_string(),
version: version.to_string(),
proxy_id: proxy_id.clone(),
vpn_id: None,
process_id: None,
last_launch: None,
release_type: release_type.to_string(),
@@ -323,6 +329,7 @@ impl ProfileManager {
browser: browser.to_string(),
version: version.to_string(),
proxy_id: proxy_id.clone(),
vpn_id: vpn_id.clone(),
process_id: None,
last_launch: None,
release_type: release_type.to_string(),
@@ -840,6 +847,7 @@ impl ProfileManager {
browser: source.browser,
version: source.version,
proxy_id: source.proxy_id,
vpn_id: source.vpn_id,
process_id: None,
last_launch: None,
release_type: source.release_type,
@@ -1011,8 +1019,9 @@ impl ProfileManager {
// Remember old proxy_id for cleanup (not used yet, but may be needed for cleanup)
let _old_proxy_id = profile.proxy_id.clone();
// Update proxy settings
// Update proxy settings and clear VPN (mutual exclusion)
profile.proxy_id = proxy_id.clone();
profile.vpn_id = None;
// Save the updated profile
self
@@ -1075,6 +1084,52 @@ impl ProfileManager {
Ok(profile)
}
pub async fn update_profile_vpn(
&self,
_app_handle: tauri::AppHandle,
profile_id: &str,
vpn_id: Option<String>,
) -> Result<BrowserProfile, Box<dyn std::error::Error + Send + Sync>> {
let profile_uuid = uuid::Uuid::parse_str(profile_id).map_err(
|_| -> Box<dyn std::error::Error + Send + Sync> {
format!("Invalid profile ID: {profile_id}").into()
},
)?;
let profiles =
self
.list_profiles()
.map_err(|e| -> Box<dyn std::error::Error + Send + Sync> {
format!("Failed to list profiles: {e}").into()
})?;
let mut profile = profiles
.into_iter()
.find(|p| p.id == profile_uuid)
.ok_or_else(|| -> Box<dyn std::error::Error + Send + Sync> {
format!("Profile with ID '{profile_id}' not found").into()
})?;
// Update VPN and clear proxy (mutual exclusion)
profile.vpn_id = vpn_id;
profile.proxy_id = None;
self
.save_profile(&profile)
.map_err(|e| -> Box<dyn std::error::Error + Send + Sync> {
format!("Failed to save profile: {e}").into()
})?;
if let Err(e) = events::emit("profile-updated", &profile) {
log::warn!("Warning: Failed to emit profile update event: {e}");
}
if let Err(e) = events::emit_empty("profiles-changed") {
log::warn!("Warning: Failed to emit profiles-changed event: {e}");
}
Ok(profile)
}
pub async fn check_browser_status(
&self,
app_handle: tauri::AppHandle,
@@ -1799,6 +1854,7 @@ pub async fn create_browser_profile_with_group(
version: String,
release_type: String,
proxy_id: Option<String>,
vpn_id: Option<String>,
camoufox_config: Option<CamoufoxConfig>,
wayfern_config: Option<WayfernConfig>,
group_id: Option<String>,
@@ -1812,6 +1868,7 @@ pub async fn create_browser_profile_with_group(
&version,
&release_type,
proxy_id,
vpn_id,
camoufox_config,
wayfern_config,
group_id,
@@ -1841,6 +1898,19 @@ pub async fn update_profile_proxy(
.map_err(|e| format!("Failed to update profile: {e}"))
}
#[tauri::command]
pub async fn update_profile_vpn(
app_handle: tauri::AppHandle,
profile_id: String,
vpn_id: Option<String>,
) -> Result<BrowserProfile, String> {
let profile_manager = ProfileManager::instance();
profile_manager
.update_profile_vpn(app_handle, &profile_id, vpn_id)
.await
.map_err(|e| format!("Failed to update profile VPN: {e}"))
}
#[tauri::command]
pub fn update_profile_tags(
app_handle: tauri::AppHandle,
@@ -1898,6 +1968,7 @@ pub async fn create_browser_profile_new(
version: String,
release_type: String,
proxy_id: Option<String>,
vpn_id: Option<String>,
camoufox_config: Option<CamoufoxConfig>,
wayfern_config: Option<WayfernConfig>,
group_id: Option<String>,
@@ -1923,6 +1994,7 @@ pub async fn create_browser_profile_new(
version,
release_type,
proxy_id,
vpn_id,
camoufox_config,
wayfern_config,
group_id,
+2
View File
@@ -22,6 +22,8 @@ pub struct BrowserProfile {
#[serde(default)]
pub proxy_id: Option<String>, // Reference to stored proxy
#[serde(default)]
pub vpn_id: Option<String>, // Reference to stored VPN config
#[serde(default)]
pub process_id: Option<u32>,
#[serde(default)]
pub last_launch: Option<u64>,
+1
View File
@@ -545,6 +545,7 @@ impl ProfileImporter {
browser: browser_type.to_string(),
version: available_versions,
proxy_id: None,
vpn_id: None,
process_id: None,
last_launch: None,
release_type: "stable".to_string(),
+22 -2
View File
@@ -243,8 +243,7 @@ impl ProxyManager {
.as_secs()
}
// Get geolocation for an IP address
async fn get_ip_geolocation(
pub async fn get_ip_geolocation(
ip: &str,
) -> Result<(Option<String>, Option<String>, Option<String>), String> {
// Use ip-api.com (free, no API key required)
@@ -1626,6 +1625,27 @@ impl ProxyManager {
delete_proxy_config(&config.id);
}
// Clean up orphaned VPN worker configs where the worker process is dead
{
use crate::proxy_storage::is_process_running;
use crate::vpn_worker_storage::{delete_vpn_worker_config, list_vpn_worker_configs};
let vpn_workers = list_vpn_worker_configs();
for worker in vpn_workers {
if let Some(pid) = worker.pid {
if !is_process_running(pid) {
log::info!(
"Cleaning up orphaned VPN worker config: {} (process PID {} is dead)",
worker.id,
pid
);
let _ = std::fs::remove_file(&worker.config_file_path);
delete_vpn_worker_config(&worker.id);
}
}
}
}
// Emit event for reactive UI updates
if let Err(e) = events::emit_empty("proxies-changed") {
log::error!("Failed to emit proxies-changed event: {e}");
+302 -5
View File
@@ -102,6 +102,24 @@ impl SyncEngine {
// Generate local manifest
let local_manifest = generate_manifest(&profile_id, &profile_dir, &mut hash_cache)?;
let total_size: u64 = local_manifest.files.iter().map(|f| f.size).sum();
let has_cookies = local_manifest
.files
.iter()
.any(|f| f.path.contains("Cookies") || f.path.contains("cookies"));
let has_local_state = local_manifest
.files
.iter()
.any(|f| f.path.contains("Local State"));
log::info!(
"Profile {} manifest: {} files, {} bytes total, cookies={}, local_state={}",
profile_id,
local_manifest.files.len(),
total_size,
has_cookies,
has_local_state
);
// Save the hash cache for future runs
hash_cache.save(&cache_path)?;
@@ -174,13 +192,16 @@ impl SyncEngine {
// Upload manifest.json last for atomicity
self.upload_manifest(&profile_id, &local_manifest).await?;
// Sync associated proxy and group
// Sync associated proxy, group, and VPN
if let Some(proxy_id) = &profile.proxy_id {
let _ = self.sync_proxy(proxy_id, Some(app_handle)).await;
}
if let Some(group_id) = &profile.group_id {
let _ = self.sync_group(group_id, Some(app_handle)).await;
}
if let Some(vpn_id) = &profile.vpn_id {
let _ = self.sync_vpn(vpn_id, Some(app_handle)).await;
}
// Update profile last_sync
let mut updated_profile = profile.clone();
@@ -785,6 +806,145 @@ impl SyncEngine {
Ok(())
}
async fn sync_vpn(&self, vpn_id: &str, app_handle: Option<&tauri::AppHandle>) -> SyncResult<()> {
let local_vpn = {
let storage = crate::vpn::VPN_STORAGE.lock().unwrap();
storage.load_config(vpn_id).ok()
};
let remote_key = format!("vpns/{}.json", vpn_id);
let stat = self.client.stat(&remote_key).await?;
match (local_vpn, stat.exists) {
(Some(vpn), true) => {
let local_updated = vpn.last_sync.unwrap_or(0);
let remote_updated: DateTime<Utc> = stat
.last_modified
.as_ref()
.and_then(|s| DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.with_timezone(&Utc))
.unwrap_or_else(Utc::now);
let remote_ts = remote_updated.timestamp() as u64;
if remote_ts > local_updated {
self.download_vpn(vpn_id, app_handle).await?;
} else if local_updated > remote_ts {
self.upload_vpn(&vpn).await?;
}
}
(Some(vpn), false) => {
self.upload_vpn(&vpn).await?;
}
(None, true) => {
self.download_vpn(vpn_id, app_handle).await?;
}
(None, false) => {
log::debug!("VPN {} not found locally or remotely", vpn_id);
}
}
Ok(())
}
async fn upload_vpn(&self, vpn: &crate::vpn::VpnConfig) -> SyncResult<()> {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let mut updated_vpn = vpn.clone();
updated_vpn.last_sync = Some(now);
let json = serde_json::to_string_pretty(&updated_vpn)
.map_err(|e| SyncError::SerializationError(format!("Failed to serialize VPN: {e}")))?;
let remote_key = format!("vpns/{}.json", vpn.id);
let presign = self
.client
.presign_upload(&remote_key, Some("application/json"))
.await?;
self
.client
.upload_bytes(&presign.url, json.as_bytes(), Some("application/json"))
.await?;
// Update local VPN with new last_sync
{
let storage = crate::vpn::VPN_STORAGE.lock().unwrap();
if let Err(e) = storage.update_sync_fields(&vpn.id, vpn.sync_enabled, Some(now)) {
log::warn!("Failed to update VPN last_sync: {}", e);
}
}
log::info!("VPN {} uploaded", vpn.id);
Ok(())
}
async fn download_vpn(
&self,
vpn_id: &str,
app_handle: Option<&tauri::AppHandle>,
) -> SyncResult<()> {
let remote_key = format!("vpns/{}.json", vpn_id);
let presign = self.client.presign_download(&remote_key).await?;
let data = self.client.download_bytes(&presign.url).await?;
let mut vpn: crate::vpn::VpnConfig = serde_json::from_slice(&data)
.map_err(|e| SyncError::SerializationError(format!("Failed to parse VPN JSON: {e}")))?;
vpn.last_sync = Some(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
);
vpn.sync_enabled = true;
// Save via VPN storage (handles encryption)
{
let storage = crate::vpn::VPN_STORAGE.lock().unwrap();
if let Err(e) = storage.save_config(&vpn) {
log::warn!("Failed to save downloaded VPN: {}", e);
}
}
// Emit event for UI update
if let Some(_handle) = app_handle {
let _ = events::emit("vpn-configs-changed", ());
let _ = events::emit(
"vpn-sync-status",
serde_json::json!({
"id": vpn_id,
"status": "synced"
}),
);
}
log::info!("VPN {} downloaded", vpn_id);
Ok(())
}
pub async fn sync_vpn_by_id_with_handle(
&self,
vpn_id: &str,
app_handle: &tauri::AppHandle,
) -> SyncResult<()> {
self.sync_vpn(vpn_id, Some(app_handle)).await
}
pub async fn delete_vpn(&self, vpn_id: &str) -> SyncResult<()> {
let remote_key = format!("vpns/{}.json", vpn_id);
let tombstone_key = format!("tombstones/vpns/{}.json", vpn_id);
self
.client
.delete(&remote_key, Some(&tombstone_key))
.await?;
log::info!("VPN {} deleted from sync", vpn_id);
Ok(())
}
/// Download a profile from S3 if it exists remotely but not locally
pub async fn download_profile_if_missing(
&self,
@@ -901,6 +1061,21 @@ impl SyncEngine {
})?;
// Download all files from manifest
let total_size: u64 = manifest.files.iter().map(|f| f.size).sum();
log::info!(
"Profile {} recovery: downloading {} files ({} bytes total)",
profile_id,
manifest.files.len(),
total_size
);
for file in &manifest.files {
log::info!(
" -> {} ({} bytes, hash: {})",
file.path,
file.size,
file.hash
);
}
if !manifest.files.is_empty() {
self
.download_profile_files(app_handle, profile_id, &profile_dir, &manifest.files)
@@ -1100,6 +1275,43 @@ pub async fn enable_proxy_sync_if_needed(
Ok(())
}
/// Check if VPN is used by any synced profile
pub fn is_vpn_used_by_synced_profile(vpn_id: &str) -> bool {
let profile_manager = ProfileManager::instance();
if let Ok(profiles) = profile_manager.list_profiles() {
profiles
.iter()
.any(|p| p.sync_enabled && p.vpn_id.as_deref() == Some(vpn_id))
} else {
false
}
}
/// Enable sync for VPN if not already enabled
pub async fn enable_vpn_sync_if_needed(
vpn_id: &str,
_app_handle: &tauri::AppHandle,
) -> Result<(), String> {
let vpn = {
let storage = crate::vpn::VPN_STORAGE.lock().unwrap();
storage
.load_config(vpn_id)
.map_err(|e| format!("VPN with ID '{vpn_id}' not found: {e}"))?
};
if !vpn.sync_enabled {
let storage = crate::vpn::VPN_STORAGE.lock().unwrap();
storage
.update_sync_fields(vpn_id, true, None)
.map_err(|e| format!("Failed to enable VPN sync: {e}"))?;
let _ = events::emit("vpn-configs-changed", ());
log::info!("Auto-enabled sync for VPN {}", vpn_id);
}
Ok(())
}
/// Enable sync for group if not already enabled
pub async fn enable_group_sync_if_needed(
group_id: &str,
@@ -1197,10 +1409,6 @@ pub async fn set_profile_sync_enabled(
profile.sync_enabled = enabled;
if !enabled {
profile.last_sync = None;
}
profile_manager
.save_profile(&profile)
.map_err(|e| format!("Failed to save profile: {e}"))?;
@@ -1240,6 +1448,13 @@ pub async fn set_profile_sync_enabled(
scheduler.queue_group_sync(group_id.clone()).await;
}
}
if let Some(ref vpn_id) = profile.vpn_id {
if let Err(e) = enable_vpn_sync_if_needed(vpn_id, &app_handle).await {
log::warn!("Failed to enable sync for VPN {}: {}", vpn_id, e);
} else {
scheduler.queue_vpn_sync(vpn_id.clone()).await;
}
}
} else {
log::warn!("Scheduler not initialized, sync will not start");
}
@@ -1526,3 +1741,85 @@ pub fn is_proxy_in_use_by_synced_profile(proxy_id: String) -> bool {
pub fn is_group_in_use_by_synced_profile(group_id: String) -> bool {
is_group_used_by_synced_profile(&group_id)
}
#[tauri::command]
pub async fn set_vpn_sync_enabled(
app_handle: tauri::AppHandle,
vpn_id: String,
enabled: bool,
) -> Result<(), String> {
let vpn = {
let storage = crate::vpn::VPN_STORAGE.lock().unwrap();
storage
.load_config(&vpn_id)
.map_err(|e| format!("VPN with ID '{vpn_id}' not found: {e}"))?
};
// If disabling, check if VPN is used by any synced profile
if !enabled && is_vpn_used_by_synced_profile(&vpn_id) {
return Err("Sync cannot be disabled while this VPN is used by synced profiles".to_string());
}
// If enabling, check that sync settings are configured
if enabled {
let cloud_logged_in = crate::cloud_auth::CLOUD_AUTH.is_logged_in().await;
if !cloud_logged_in {
let manager = SettingsManager::instance();
let settings = manager
.load_settings()
.map_err(|e| format!("Failed to load settings: {e}"))?;
if settings.sync_server_url.is_none() {
return Err(
"Sync server not configured. Please configure sync settings first.".to_string(),
);
}
let token = manager.get_sync_token(&app_handle).await.ok().flatten();
if token.is_none() {
return Err("Sync token not configured. Please configure sync settings first.".to_string());
}
}
}
let last_sync = if enabled { vpn.last_sync } else { None };
{
let storage = crate::vpn::VPN_STORAGE.lock().unwrap();
storage
.update_sync_fields(&vpn_id, enabled, last_sync)
.map_err(|e| format!("Failed to update VPN sync: {e}"))?;
}
let _ = events::emit("vpn-configs-changed", ());
if enabled {
let _ = events::emit(
"vpn-sync-status",
serde_json::json!({
"id": vpn_id,
"status": "syncing"
}),
);
if let Some(scheduler) = super::get_global_scheduler() {
scheduler.queue_vpn_sync(vpn_id).await;
}
} else {
let _ = events::emit(
"vpn-sync-status",
serde_json::json!({
"id": vpn_id,
"status": "disabled"
}),
);
}
Ok(())
}
#[tauri::command]
pub fn is_vpn_in_use_by_synced_profile(vpn_id: String) -> bool {
is_vpn_used_by_synced_profile(&vpn_id)
}
+6 -5
View File
@@ -7,11 +7,12 @@ pub mod types;
pub use client::SyncClient;
pub use engine::{
enable_group_sync_if_needed, enable_proxy_sync_if_needed, is_group_in_use_by_synced_profile,
is_group_used_by_synced_profile, is_proxy_in_use_by_synced_profile,
is_proxy_used_by_synced_profile, request_profile_sync, set_group_sync_enabled,
set_profile_sync_enabled, set_proxy_sync_enabled, sync_profile, trigger_sync_for_profile,
SyncEngine,
enable_group_sync_if_needed, enable_proxy_sync_if_needed, enable_vpn_sync_if_needed,
is_group_in_use_by_synced_profile, is_group_used_by_synced_profile,
is_proxy_in_use_by_synced_profile, is_proxy_used_by_synced_profile,
is_vpn_in_use_by_synced_profile, is_vpn_used_by_synced_profile, request_profile_sync,
set_group_sync_enabled, set_profile_sync_enabled, set_proxy_sync_enabled, set_vpn_sync_enabled,
sync_profile, trigger_sync_for_profile, SyncEngine,
};
pub use manifest::{compute_diff, generate_manifest, HashCache, ManifestDiff, SyncManifest};
pub use scheduler::{get_global_scheduler, set_global_scheduler, SyncScheduler};
+84
View File
@@ -34,6 +34,7 @@ pub struct SyncScheduler {
pending_profiles: Arc<Mutex<HashMap<String, ProfileStopTime>>>,
pending_proxies: Arc<Mutex<HashSet<String>>>,
pending_groups: Arc<Mutex<HashSet<String>>>,
pending_vpns: Arc<Mutex<HashSet<String>>>,
pending_tombstones: Arc<Mutex<Vec<(String, String)>>>,
running_profiles: Arc<Mutex<HashSet<String>>>,
in_flight_profiles: Arc<Mutex<HashSet<String>>>,
@@ -52,6 +53,7 @@ impl SyncScheduler {
pending_profiles: Arc::new(Mutex::new(HashMap::new())),
pending_proxies: Arc::new(Mutex::new(HashSet::new())),
pending_groups: Arc::new(Mutex::new(HashSet::new())),
pending_vpns: Arc::new(Mutex::new(HashSet::new())),
pending_tombstones: Arc::new(Mutex::new(Vec::new())),
running_profiles: Arc::new(Mutex::new(HashSet::new())),
in_flight_profiles: Arc::new(Mutex::new(HashSet::new())),
@@ -92,6 +94,12 @@ impl SyncScheduler {
}
drop(pending_groups);
let pending_vpns = self.pending_vpns.lock().await;
if !pending_vpns.is_empty() {
return true;
}
drop(pending_vpns);
let pending_tombstones = self.pending_tombstones.lock().await;
if !pending_tombstones.is_empty() {
return true;
@@ -190,6 +198,11 @@ impl SyncScheduler {
pending.insert(proxy_id);
}
pub async fn queue_vpn_sync(&self, vpn_id: String) {
let mut pending = self.pending_vpns.lock().await;
pending.insert(vpn_id);
}
pub async fn queue_group_sync(&self, group_id: String) {
let mut pending = self.pending_groups.lock().await;
pending.insert(group_id);
@@ -269,6 +282,7 @@ impl SyncScheduler {
SyncWorkItem::Profile(id) => scheduler.queue_profile_sync(id).await,
SyncWorkItem::Proxy(id) => scheduler.queue_proxy_sync(id).await,
SyncWorkItem::Group(id) => scheduler.queue_group_sync(id).await,
SyncWorkItem::Vpn(id) => scheduler.queue_vpn_sync(id).await,
SyncWorkItem::Tombstone(entity_type, entity_id) => {
scheduler.queue_tombstone(entity_type, entity_id).await
}
@@ -288,6 +302,7 @@ impl SyncScheduler {
self.process_pending_profiles(app_handle).await;
self.process_pending_proxies(app_handle).await;
self.process_pending_groups(app_handle).await;
self.process_pending_vpns(app_handle).await;
self.process_pending_tombstones(app_handle).await;
}
@@ -366,6 +381,7 @@ impl SyncScheduler {
&& self.pending_profiles.lock().await.is_empty()
&& self.pending_proxies.lock().await.is_empty()
&& self.pending_groups.lock().await.is_empty()
&& self.pending_vpns.lock().await.is_empty()
};
match result {
@@ -537,6 +553,68 @@ impl SyncScheduler {
}
}
async fn process_pending_vpns(&self, app_handle: &tauri::AppHandle) {
let vpns_to_sync: Vec<String> = {
let mut pending = self.pending_vpns.lock().await;
let list: Vec<String> = pending.drain().collect();
list
};
if vpns_to_sync.is_empty() {
return;
}
match SyncEngine::create_from_settings(app_handle).await {
Ok(engine) => {
for vpn_id in vpns_to_sync {
log::info!("Syncing VPN {}", vpn_id);
let _ = events::emit(
"vpn-sync-status",
serde_json::json!({
"id": vpn_id,
"status": "syncing"
}),
);
match engine.sync_vpn_by_id_with_handle(&vpn_id, app_handle).await {
Ok(()) => {
let _ = events::emit(
"vpn-sync-status",
serde_json::json!({
"id": vpn_id,
"status": "synced"
}),
);
}
Err(e) => {
log::error!("Failed to sync VPN {}: {}", vpn_id, e);
let _ = events::emit(
"vpn-sync-status",
serde_json::json!({
"id": vpn_id,
"status": "error"
}),
);
}
}
}
if !self.is_sync_in_progress().await {
log::debug!("All syncs completed after VPN sync, triggering cleanup");
let registry =
crate::downloaded_browsers_registry::DownloadedBrowsersRegistry::instance();
if let Err(e) = registry.cleanup_unused_binaries() {
log::warn!("Cleanup after sync failed: {e}");
} else {
log::debug!("Cleanup after sync completed successfully");
}
}
}
Err(e) => {
log::error!("Failed to create sync engine: {}", e);
}
}
}
async fn process_pending_tombstones(&self, app_handle: &tauri::AppHandle) {
let tombstones: Vec<(String, String)> = {
let mut pending = self.pending_tombstones.lock().await;
@@ -607,6 +685,12 @@ impl SyncScheduler {
entity_id
);
}
"vpn" => {
log::debug!(
"VPN tombstone for {} - local deletion not implemented",
entity_id
);
}
_ => {}
}
}
+11
View File
@@ -23,6 +23,7 @@ pub enum SyncWorkItem {
Profile(String),
Proxy(String),
Group(String),
Vpn(String),
Tombstone(String, String),
}
@@ -229,6 +230,11 @@ impl SyncSubscription {
.strip_prefix("groups/")
.and_then(|s| s.strip_suffix(".json"))
.map(|s| SyncWorkItem::Group(s.to_string()))
} else if key.starts_with("vpns/") {
key
.strip_prefix("vpns/")
.and_then(|s| s.strip_suffix(".json"))
.map(|s| SyncWorkItem::Vpn(s.to_string()))
} else if key.starts_with("tombstones/") {
key.strip_prefix("tombstones/").and_then(|rest| {
if rest.starts_with("profiles/") {
@@ -246,6 +252,11 @@ impl SyncSubscription {
.strip_prefix("groups/")
.and_then(|s| s.strip_suffix(".json"))
.map(|id| SyncWorkItem::Tombstone("group".to_string(), id.to_string()))
} else if rest.starts_with("vpns/") {
rest
.strip_prefix("vpns/")
.and_then(|s| s.strip_suffix(".json"))
.map(|id| SyncWorkItem::Tombstone("vpn".to_string(), id.to_string()))
} else {
None
}
+4
View File
@@ -52,6 +52,10 @@ pub struct VpnConfig {
pub config_data: String, // Raw config content (encrypted at rest)
pub created_at: i64,
pub last_used: Option<i64>,
#[serde(default)]
pub sync_enabled: bool,
#[serde(default)]
pub last_sync: Option<u64>,
}
/// Parsed WireGuard configuration
+2 -4
View File
@@ -7,6 +7,8 @@
mod config;
mod openvpn;
pub mod openvpn_socks5;
pub mod socks5_server;
mod storage;
mod tunnel;
mod wireguard;
@@ -25,7 +27,3 @@ use std::sync::Mutex;
/// Global VPN storage instance
pub static VPN_STORAGE: Lazy<Mutex<VpnStorage>> = Lazy::new(|| Mutex::new(VpnStorage::new()));
/// Global tunnel manager instance
pub static TUNNEL_MANAGER: Lazy<tokio::sync::Mutex<TunnelManager>> =
Lazy::new(|| tokio::sync::Mutex::new(TunnelManager::new()));
+233
View File
@@ -0,0 +1,233 @@
use super::config::{OpenVpnConfig, VpnError};
use std::path::PathBuf;
use std::process::{Command, Stdio};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
pub struct OpenVpnSocks5Server {
config: OpenVpnConfig,
port: u16,
}
impl OpenVpnSocks5Server {
pub fn new(config: OpenVpnConfig, port: u16) -> Self {
Self { config, port }
}
fn find_openvpn_binary() -> Result<PathBuf, VpnError> {
let locations = [
"/usr/sbin/openvpn",
"/usr/local/sbin/openvpn",
"/opt/homebrew/bin/openvpn",
"/usr/bin/openvpn",
"C:\\Program Files\\OpenVPN\\bin\\openvpn.exe",
"C:\\Program Files (x86)\\OpenVPN\\bin\\openvpn.exe",
];
for loc in &locations {
let path = PathBuf::from(loc);
if path.exists() {
return Ok(path);
}
}
#[cfg(unix)]
{
if let Ok(output) = Command::new("which").arg("openvpn").output() {
if output.status.success() {
let path = String::from_utf8_lossy(&output.stdout).trim().to_string();
if !path.is_empty() {
return Ok(PathBuf::from(path));
}
}
}
}
#[cfg(windows)]
{
if let Ok(output) = Command::new("where").arg("openvpn").output() {
if output.status.success() {
let path = String::from_utf8_lossy(&output.stdout)
.lines()
.next()
.unwrap_or("")
.trim()
.to_string();
if !path.is_empty() {
return Ok(PathBuf::from(path));
}
}
}
}
Err(VpnError::Connection(
"OpenVPN binary not found. Please install OpenVPN.".to_string(),
))
}
pub async fn run(self, config_id: String) -> Result<(), VpnError> {
let openvpn_bin = Self::find_openvpn_binary()?;
// Write config to temp file
let config_path = std::env::temp_dir().join(format!("openvpn_{}.ovpn", config_id));
std::fs::write(&config_path, &self.config.raw_config).map_err(VpnError::Io)?;
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let _ = std::fs::set_permissions(&config_path, std::fs::Permissions::from_mode(0o600));
}
// Find a management port
let mgmt_listener = std::net::TcpListener::bind("127.0.0.1:0")
.map_err(|e| VpnError::Connection(format!("Failed to bind management port: {e}")))?;
let mgmt_port = mgmt_listener
.local_addr()
.map_err(|e| VpnError::Connection(format!("Failed to get management port: {e}")))?
.port();
drop(mgmt_listener);
// Start OpenVPN with SOCKS proxy mode
let mut cmd = Command::new(&openvpn_bin);
cmd
.arg("--config")
.arg(&config_path)
.arg("--management")
.arg("127.0.0.1")
.arg(mgmt_port.to_string())
.arg("--socks-proxy")
.arg("127.0.0.1")
.arg(self.port.to_string())
.arg("--verb")
.arg("3")
.stdout(Stdio::piped())
.stderr(Stdio::piped());
let mut child = cmd
.spawn()
.map_err(|e| VpnError::Connection(format!("Failed to start OpenVPN: {e}")))?;
// Wait for OpenVPN to start
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
match child.try_wait() {
Ok(Some(status)) => {
let _ = std::fs::remove_file(&config_path);
return Err(VpnError::Connection(format!(
"OpenVPN exited early with status: {status}. OpenVPN requires elevated privileges (sudo/admin)."
)));
}
Ok(None) => {}
Err(e) => {
let _ = std::fs::remove_file(&config_path);
return Err(VpnError::Connection(format!(
"Failed to check OpenVPN status: {e}"
)));
}
}
// Start a basic SOCKS5 proxy that tunnels through the OpenVPN TUN interface
let listener = TcpListener::bind(format!("127.0.0.1:{}", self.port))
.await
.map_err(|e| VpnError::Connection(format!("Failed to bind SOCKS5: {e}")))?;
let actual_port = listener
.local_addr()
.map_err(|e| VpnError::Connection(format!("Failed to get local addr: {e}")))?
.port();
if let Some(mut wc) = crate::vpn_worker_storage::get_vpn_worker_config(&config_id) {
wc.local_port = Some(actual_port);
wc.local_url = Some(format!("socks5://127.0.0.1:{}", actual_port));
let _ = crate::vpn_worker_storage::save_vpn_worker_config(&wc);
}
log::info!(
"[vpn-worker] OpenVPN SOCKS5 server listening on 127.0.0.1:{}",
actual_port
);
loop {
match listener.accept().await {
Ok((client, _)) => {
tokio::spawn(Self::handle_socks5_client(client));
}
Err(e) => {
log::warn!("[vpn-worker] Accept error: {e}");
}
}
}
}
async fn handle_socks5_client(
mut client: TcpStream,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// SOCKS5 greeting
let mut buf = [0u8; 256];
let n = client.read(&mut buf).await?;
if n < 3 || buf[0] != 0x05 {
return Ok(());
}
client.write_all(&[0x05, 0x00]).await?;
// SOCKS5 connect request
let n = client.read(&mut buf).await?;
if n < 10 || buf[0] != 0x05 || buf[1] != 0x01 {
return Ok(());
}
let dest_addr = match buf[3] {
0x01 => {
let ip = std::net::Ipv4Addr::new(buf[4], buf[5], buf[6], buf[7]);
let port = u16::from_be_bytes([buf[8], buf[9]]);
format!("{}:{}", ip, port)
}
0x03 => {
let domain_len = buf[4] as usize;
let domain = String::from_utf8_lossy(&buf[5..5 + domain_len]).to_string();
let port_start = 5 + domain_len;
let port = u16::from_be_bytes([buf[port_start], buf[port_start + 1]]);
format!("{}:{}", domain, port)
}
_ => return Ok(()),
};
// Connect to destination through OpenVPN tunnel (OS routing handles it)
match TcpStream::connect(&dest_addr).await {
Ok(upstream) => {
client
.write_all(&[0x05, 0x00, 0x00, 0x01, 127, 0, 0, 1, 0, 0])
.await?;
let (mut cr, mut cw) = client.into_split();
let (mut ur, mut uw) = upstream.into_split();
let c2u = tokio::io::copy(&mut cr, &mut uw);
let u2c = tokio::io::copy(&mut ur, &mut cw);
let _ = tokio::try_join!(c2u, u2c);
}
Err(_) => {
client
.write_all(&[0x05, 0x05, 0x00, 0x01, 0, 0, 0, 0, 0, 0])
.await?;
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_find_openvpn_binary_format() {
let result = OpenVpnSocks5Server::find_openvpn_binary();
match result {
Ok(path) => assert!(!path.as_os_str().is_empty()),
Err(e) => assert!(e.to_string().contains("not found")),
}
}
}
+656
View File
@@ -0,0 +1,656 @@
use super::config::{VpnError, WireGuardConfig};
use boringtun::noise::{Tunn, TunnResult};
use boringtun::x25519::{PublicKey, StaticSecret};
use smoltcp::iface::{Config as IfaceConfig, Interface, SocketHandle, SocketSet};
use smoltcp::phy::{Device, DeviceCapabilities, Medium, RxToken, TxToken};
use smoltcp::socket::tcp::{Socket as TcpSocket, SocketBuffer};
use smoltcp::time::Instant as SmolInstant;
use smoltcp::wire::{HardwareAddress, IpAddress, IpCidr, Ipv4Address};
use std::collections::VecDeque;
use std::net::{SocketAddr, ToSocketAddrs, UdpSocket};
use std::sync::{Arc, Mutex};
use tokio::net::{TcpListener, TcpStream};
const SMOLTCP_TCP_RX_BUF: usize = 65536;
const SMOLTCP_TCP_TX_BUF: usize = 65536;
struct WgDevice {
tunn: Arc<Mutex<Box<Tunn>>>,
udp_socket: Arc<UdpSocket>,
peer_addr: SocketAddr,
rx_queue: VecDeque<Vec<u8>>,
tx_queue: VecDeque<Vec<u8>>,
}
impl WgDevice {
fn pump_wg_to_rx(&mut self) {
let mut recv_buf = vec![0u8; 2048];
loop {
match self.udp_socket.recv_from(&mut recv_buf) {
Ok((len, _)) => {
let mut dst = vec![0u8; 2048];
let mut tunn = self.tunn.lock().unwrap();
let result = tunn.decapsulate(None, &recv_buf[..len], &mut dst);
match result {
TunnResult::WriteToTunnelV4(data, _) | TunnResult::WriteToTunnelV6(data, _) => {
self.rx_queue.push_back(data.to_vec());
}
TunnResult::WriteToNetwork(response) => {
let _ = self.udp_socket.send_to(response, self.peer_addr);
}
_ => {}
}
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => break,
Err(_) => break,
}
}
}
fn flush_tx_queue(&mut self) {
while let Some(ip_packet) = self.tx_queue.pop_front() {
let mut dst = vec![0u8; ip_packet.len() + 256];
let mut tunn = self.tunn.lock().unwrap();
let result = tunn.encapsulate(&ip_packet, &mut dst);
if let TunnResult::WriteToNetwork(packet) = result {
let _ = self.udp_socket.send_to(packet, self.peer_addr);
}
}
}
fn tick_timers(&mut self) {
let mut dst = vec![0u8; 2048];
let mut tunn = self.tunn.lock().unwrap();
let result = tunn.update_timers(&mut dst);
if let TunnResult::WriteToNetwork(packet) = result {
let _ = self.udp_socket.send_to(packet, self.peer_addr);
}
}
}
struct WgRxToken {
data: Vec<u8>,
}
impl RxToken for WgRxToken {
fn consume<R, F>(mut self, f: F) -> R
where
F: FnOnce(&mut [u8]) -> R,
{
f(&mut self.data)
}
}
struct WgTxToken<'a> {
tx_queue: &'a mut VecDeque<Vec<u8>>,
}
impl<'a> TxToken for WgTxToken<'a> {
fn consume<R, F>(self, len: usize, f: F) -> R
where
F: FnOnce(&mut [u8]) -> R,
{
let mut buf = vec![0u8; len];
let result = f(&mut buf);
self.tx_queue.push_back(buf);
result
}
}
impl Device for WgDevice {
type RxToken<'a> = WgRxToken;
type TxToken<'a> = WgTxToken<'a>;
fn receive(&mut self, _timestamp: SmolInstant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> {
if let Some(data) = self.rx_queue.pop_front() {
Some((
WgRxToken { data },
WgTxToken {
tx_queue: &mut self.tx_queue,
},
))
} else {
None
}
}
fn transmit(&mut self, _timestamp: SmolInstant) -> Option<Self::TxToken<'_>> {
Some(WgTxToken {
tx_queue: &mut self.tx_queue,
})
}
fn capabilities(&self) -> DeviceCapabilities {
let mut caps = DeviceCapabilities::default();
caps.medium = Medium::Ip;
caps.max_transmission_unit = 1420;
caps
}
}
fn parse_key(key: &str) -> Result<[u8; 32], VpnError> {
let decoded = base64::Engine::decode(&base64::engine::general_purpose::STANDARD, key)
.map_err(|e| VpnError::InvalidWireGuard(format!("Invalid key encoding: {e}")))?;
if decoded.len() != 32 {
return Err(VpnError::InvalidWireGuard(format!(
"Invalid key length: {} (expected 32)",
decoded.len()
)));
}
let mut key_bytes = [0u8; 32];
key_bytes.copy_from_slice(&decoded);
Ok(key_bytes)
}
fn parse_cidr_address(addr: &str) -> Result<(IpCidr, IpAddress), VpnError> {
let first_addr = addr.split(',').next().unwrap_or(addr).trim();
let parts: Vec<&str> = first_addr.split('/').collect();
let ip_str = parts[0];
let prefix = if parts.len() > 1 {
parts[1]
.parse::<u8>()
.map_err(|_| VpnError::InvalidWireGuard(format!("Invalid prefix length: {}", parts[1])))?
} else {
32
};
let ip: std::net::IpAddr = ip_str
.parse()
.map_err(|_| VpnError::InvalidWireGuard(format!("Invalid IP address: {ip_str}")))?;
match ip {
std::net::IpAddr::V4(v4) => {
let smol_ip = Ipv4Address::new(
v4.octets()[0],
v4.octets()[1],
v4.octets()[2],
v4.octets()[3],
);
Ok((
IpCidr::new(IpAddress::Ipv4(smol_ip), prefix),
IpAddress::Ipv4(smol_ip),
))
}
std::net::IpAddr::V6(v6) => {
let smol_ip = smoltcp::wire::Ipv6Address::from_bytes(&v6.octets());
Ok((
IpCidr::new(IpAddress::Ipv6(smol_ip), prefix),
IpAddress::Ipv6(smol_ip),
))
}
}
}
pub struct WireGuardSocks5Server {
config: WireGuardConfig,
port: u16,
}
impl WireGuardSocks5Server {
pub fn new(config: WireGuardConfig, port: u16) -> Self {
Self { config, port }
}
fn create_tunnel(&self) -> Result<Box<Tunn>, VpnError> {
let private_key_bytes = parse_key(&self.config.private_key)?;
let static_private = StaticSecret::from(private_key_bytes);
let peer_public_bytes = parse_key(&self.config.peer_public_key)?;
let peer_public = PublicKey::from(peer_public_bytes);
let preshared_key = if let Some(ref psk) = self.config.preshared_key {
Some(parse_key(psk)?)
} else {
None
};
Ok(Box::new(Tunn::new(
static_private,
peer_public,
preshared_key,
self.config.persistent_keepalive,
0,
None,
)))
}
fn resolve_endpoint(&self) -> Result<SocketAddr, VpnError> {
self
.config
.peer_endpoint
.to_socket_addrs()
.map_err(|e| {
VpnError::Connection(format!(
"Failed to resolve endpoint '{}': {e}",
self.config.peer_endpoint
))
})?
.next()
.ok_or_else(|| {
VpnError::Connection(format!(
"No addresses found for endpoint: {}",
self.config.peer_endpoint
))
})
}
fn do_handshake(
tunn: &mut Tunn,
socket: &UdpSocket,
peer_addr: SocketAddr,
) -> Result<(), VpnError> {
let mut dst = vec![0u8; 2048];
let result = tunn.format_handshake_initiation(&mut dst, false);
match result {
TunnResult::WriteToNetwork(packet) => {
socket
.send_to(packet, peer_addr)
.map_err(|e| VpnError::Connection(format!("Failed to send handshake: {e}")))?;
}
TunnResult::Err(e) => {
return Err(VpnError::Tunnel(format!(
"Handshake initiation failed: {e:?}"
)));
}
_ => {}
}
socket
.set_read_timeout(Some(std::time::Duration::from_secs(10)))
.map_err(|e| VpnError::Connection(format!("Failed to set timeout: {e}")))?;
let mut recv_buf = vec![0u8; 2048];
match socket.recv_from(&mut recv_buf) {
Ok((len, _)) => {
let result = tunn.decapsulate(None, &recv_buf[..len], &mut dst);
match result {
TunnResult::WriteToNetwork(response) => {
socket
.send_to(response, peer_addr)
.map_err(|e| VpnError::Connection(format!("Failed to send response: {e}")))?;
}
TunnResult::Done => {}
TunnResult::Err(e) => {
return Err(VpnError::Tunnel(format!(
"Handshake response failed: {e:?}"
)));
}
_ => {}
}
}
Err(e) => {
return Err(VpnError::Connection(format!(
"Handshake timeout or error: {e}"
)));
}
}
socket
.set_read_timeout(None)
.map_err(|e| VpnError::Connection(format!("Failed to clear timeout: {e}")))?;
Ok(())
}
pub async fn run(self, config_id: String) -> Result<(), VpnError> {
let peer_addr = self.resolve_endpoint()?;
let mut tunn = self.create_tunnel()?;
let udp_socket = UdpSocket::bind("0.0.0.0:0")
.map_err(|e| VpnError::Connection(format!("Failed to create UDP socket: {e}")))?;
Self::do_handshake(&mut tunn, &udp_socket, peer_addr)?;
udp_socket
.set_nonblocking(true)
.map_err(|e| VpnError::Connection(format!("Failed to set non-blocking: {e}")))?;
log::info!("[vpn-worker] WireGuard handshake completed");
let (cidr, local_ip) = parse_cidr_address(&self.config.address)?;
let tunn_arc = Arc::new(Mutex::new(tunn));
let udp_arc = Arc::new(udp_socket);
let mut device = WgDevice {
tunn: tunn_arc.clone(),
udp_socket: udp_arc.clone(),
peer_addr,
rx_queue: VecDeque::new(),
tx_queue: VecDeque::new(),
};
let iface_config = IfaceConfig::new(HardwareAddress::Ip);
let mut iface = Interface::new(iface_config, &mut device, SmolInstant::now());
iface.update_ip_addrs(|addrs| {
let _ = addrs.push(cidr);
});
// Set default gateway
match local_ip {
IpAddress::Ipv4(v4) => {
let octets = v4.as_bytes();
let gw = Ipv4Address::new(octets[0], octets[1], octets[2], 1);
iface
.routes_mut()
.add_default_ipv4_route(gw)
.map_err(|e| VpnError::Tunnel(format!("Failed to add default route: {e}")))?;
}
IpAddress::Ipv6(_) => {
// IPv6 routing not yet implemented
}
}
let listener = TcpListener::bind(format!("127.0.0.1:{}", self.port))
.await
.map_err(|e| VpnError::Connection(format!("Failed to bind SOCKS5 listener: {e}")))?;
let actual_port = listener
.local_addr()
.map_err(|e| VpnError::Connection(format!("Failed to get local addr: {e}")))?
.port();
// Update config with actual port and local_url
if let Some(mut wc) = crate::vpn_worker_storage::get_vpn_worker_config(&config_id) {
wc.local_port = Some(actual_port);
wc.local_url = Some(format!("socks5://127.0.0.1:{}", actual_port));
let _ = crate::vpn_worker_storage::save_vpn_worker_config(&wc);
}
log::info!(
"[vpn-worker] SOCKS5 server listening on 127.0.0.1:{}",
actual_port
);
let mut sockets = SocketSet::new(vec![]);
struct Connection {
smol_handle: SocketHandle,
tcp_stream: TcpStream,
socks_done: bool,
read_buf: Vec<u8>,
dest_addr: Option<SocketAddr>,
}
let mut connections: Vec<Connection> = Vec::new();
let mut timer_counter: u64 = 0;
loop {
// Accept new SOCKS5 connections (non-blocking via short timeout)
if let Ok(Ok((stream, _addr))) =
tokio::time::timeout(tokio::time::Duration::from_millis(1), listener.accept()).await
{
let tcp_rx = SocketBuffer::new(vec![0u8; SMOLTCP_TCP_RX_BUF]);
let tcp_tx = SocketBuffer::new(vec![0u8; SMOLTCP_TCP_TX_BUF]);
let tcp_socket = TcpSocket::new(tcp_rx, tcp_tx);
let handle = sockets.add(tcp_socket);
connections.push(Connection {
smol_handle: handle,
tcp_stream: stream,
socks_done: false,
read_buf: Vec::new(),
dest_addr: None,
});
}
// Pump WireGuard packets into smoltcp rx queue
device.pump_wg_to_rx();
// Poll the smoltcp interface
let timestamp = SmolInstant::now();
let _changed = iface.poll(timestamp, &mut device, &mut sockets);
// Flush encrypted packets out through WireGuard
device.flush_tx_queue();
// Process each connection
let mut completed = Vec::new();
for (idx, conn) in connections.iter_mut().enumerate() {
if !conn.socks_done {
// Handle SOCKS5 handshake
let mut buf = [0u8; 512];
match conn.tcp_stream.try_read(&mut buf) {
Ok(0) => {
completed.push(idx);
continue;
}
Ok(n) => {
conn.read_buf.extend_from_slice(&buf[..n]);
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {}
Err(_) => {
completed.push(idx);
continue;
}
}
if conn.dest_addr.is_none() && conn.read_buf.len() >= 3 {
// SOCKS5 greeting: version, nmethods, methods
if conn.read_buf[0] != 0x05 {
completed.push(idx);
continue;
}
// Reply: no auth required
let _ = conn.tcp_stream.try_write(&[0x05, 0x00]);
let nmethods = conn.read_buf[1] as usize;
conn.read_buf.drain(..2 + nmethods);
}
if conn.dest_addr.is_none() && conn.read_buf.len() >= 10 {
// SOCKS5 connect request
if conn.read_buf[0] != 0x05 || conn.read_buf[1] != 0x01 {
completed.push(idx);
continue;
}
let (addr, addr_len) = match conn.read_buf[3] {
0x01 => {
// IPv4
if conn.read_buf.len() < 10 {
continue;
}
let ip = std::net::Ipv4Addr::new(
conn.read_buf[4],
conn.read_buf[5],
conn.read_buf[6],
conn.read_buf[7],
);
let port = u16::from_be_bytes([conn.read_buf[8], conn.read_buf[9]]);
(SocketAddr::new(std::net::IpAddr::V4(ip), port), 10)
}
0x03 => {
// Domain name
let domain_len = conn.read_buf[4] as usize;
let needed = 4 + 1 + domain_len + 2;
if conn.read_buf.len() < needed {
continue;
}
let domain = String::from_utf8_lossy(&conn.read_buf[5..5 + domain_len]).to_string();
let port_start = 5 + domain_len;
let port =
u16::from_be_bytes([conn.read_buf[port_start], conn.read_buf[port_start + 1]]);
// Resolve domain
match format!("{}:{}", domain, port).to_socket_addrs() {
Ok(mut addrs) => {
if let Some(addr) = addrs.next() {
(addr, needed)
} else {
// Send SOCKS5 error: host unreachable
let _ = conn
.tcp_stream
.try_write(&[0x05, 0x04, 0x00, 0x01, 0, 0, 0, 0, 0, 0]);
completed.push(idx);
continue;
}
}
Err(_) => {
let _ = conn
.tcp_stream
.try_write(&[0x05, 0x04, 0x00, 0x01, 0, 0, 0, 0, 0, 0]);
completed.push(idx);
continue;
}
}
}
0x04 => {
// IPv6
if conn.read_buf.len() < 22 {
continue;
}
let mut octets = [0u8; 16];
octets.copy_from_slice(&conn.read_buf[4..20]);
let ip = std::net::Ipv6Addr::from(octets);
let port = u16::from_be_bytes([conn.read_buf[20], conn.read_buf[21]]);
(SocketAddr::new(std::net::IpAddr::V6(ip), port), 22)
}
_ => {
completed.push(idx);
continue;
}
};
conn.read_buf.drain(..addr_len);
conn.dest_addr = Some(addr);
// Open smoltcp TCP socket to the destination
let socket = sockets.get_mut::<TcpSocket>(conn.smol_handle);
let smol_addr = match addr.ip() {
std::net::IpAddr::V4(v4) => {
let o = v4.octets();
IpAddress::Ipv4(Ipv4Address::new(o[0], o[1], o[2], o[3]))
}
std::net::IpAddr::V6(v6) => {
IpAddress::Ipv6(smoltcp::wire::Ipv6Address::from_bytes(&v6.octets()))
}
};
let local_port = 10000 + (rand::random::<u16>() % 50000);
if socket
.connect(iface.context(), (smol_addr, addr.port()), local_port)
.is_err()
{
let _ = conn
.tcp_stream
.try_write(&[0x05, 0x05, 0x00, 0x01, 0, 0, 0, 0, 0, 0]);
completed.push(idx);
continue;
}
// Send SOCKS5 success reply
let _ = conn.tcp_stream.try_write(&[
0x05,
0x00,
0x00,
0x01,
127,
0,
0,
1,
(actual_port >> 8) as u8,
(actual_port & 0xff) as u8,
]);
conn.socks_done = true;
}
} else {
// Data relay between SOCKS5 client and smoltcp socket
let socket = sockets.get_mut::<TcpSocket>(conn.smol_handle);
// Client → smoltcp
let mut buf = [0u8; 4096];
match conn.tcp_stream.try_read(&mut buf) {
Ok(0) => {
socket.close();
completed.push(idx);
continue;
}
Ok(n) => {
if socket.can_send() {
let _ = socket.send_slice(&buf[..n]);
}
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {}
Err(_) => {
socket.close();
completed.push(idx);
continue;
}
}
// smoltcp → Client
if socket.can_recv() {
match socket.recv(|data| (data.len(), data.to_vec())) {
Ok(data) if !data.is_empty() => {
if conn.tcp_stream.try_write(&data).is_err() {
socket.close();
completed.push(idx);
continue;
}
}
_ => {}
}
}
// Check if smoltcp socket closed
if !socket.is_open() && !socket.is_active() {
completed.push(idx);
}
}
}
// Remove completed connections (in reverse order)
completed.sort_unstable();
completed.dedup();
for idx in completed.into_iter().rev() {
let conn = connections.remove(idx);
sockets.remove(conn.smol_handle);
}
// Timer ticks for WireGuard keepalives
timer_counter += 1;
if timer_counter.is_multiple_of(500) {
device.tick_timers();
}
// Small sleep to avoid busy-spinning
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_cidr_ipv4() {
let (cidr, ip) = parse_cidr_address("10.0.0.2/24").unwrap();
assert_eq!(cidr.prefix_len(), 24);
assert_eq!(ip, IpAddress::Ipv4(Ipv4Address::new(10, 0, 0, 2)));
}
#[test]
fn test_parse_cidr_no_prefix() {
let (cidr, _) = parse_cidr_address("10.0.0.2").unwrap();
assert_eq!(cidr.prefix_len(), 32);
}
#[test]
fn test_parse_cidr_multi_address() {
let (_, ip) = parse_cidr_address("10.0.0.2/24, fd00::2/128").unwrap();
assert_eq!(ip, IpAddress::Ipv4(Ipv4Address::new(10, 0, 0, 2)));
}
#[test]
fn test_parse_key_valid() {
let key = "YEocP0e2o1WT5GlvBvQzVF7EeR6z9aCk+ZdZ5NKEuXA=";
assert!(parse_key(key).is_ok());
}
#[test]
fn test_parse_key_invalid() {
assert!(parse_key("not-valid").is_err());
}
}
+81
View File
@@ -32,6 +32,10 @@ struct StoredVpnConfig {
nonce: String, // Base64 encoded nonce
created_at: i64,
last_used: Option<i64>,
#[serde(default)]
sync_enabled: bool,
#[serde(default)]
last_sync: Option<u64>,
}
/// VPN storage manager with encryption
@@ -220,6 +224,8 @@ impl VpnStorage {
nonce,
created_at: config.created_at,
last_used: config.last_used,
sync_enabled: config.sync_enabled,
last_sync: config.last_sync,
};
// Update existing or add new
@@ -251,6 +257,8 @@ impl VpnStorage {
config_data,
created_at: stored.created_at,
last_used: stored.last_used,
sync_enabled: stored.sync_enabled,
last_sync: stored.last_sync,
})
}
@@ -269,6 +277,8 @@ impl VpnStorage {
config_data: String::new(), // Don't include config data in list
created_at: stored.created_at,
last_used: stored.last_used,
sync_enabled: stored.sync_enabled,
last_sync: stored.last_sync,
})
.collect(),
)
@@ -300,6 +310,67 @@ impl VpnStorage {
}
}
/// Create a VPN config manually from validated data
pub fn create_config_manual(
&self,
name: &str,
vpn_type: VpnType,
config_data: &str,
) -> Result<VpnConfig, VpnError> {
// Validate the config by parsing it
match vpn_type {
VpnType::WireGuard => {
super::parse_wireguard_config(config_data)?;
}
VpnType::OpenVPN => {
super::parse_openvpn_config(config_data)?;
}
}
let id = Uuid::new_v4().to_string();
let config = VpnConfig {
id,
name: name.to_string(),
vpn_type,
config_data: config_data.to_string(),
created_at: Utc::now().timestamp(),
last_used: None,
sync_enabled: false,
last_sync: None,
};
self.save_config(&config)?;
Ok(config)
}
/// Update the name of an existing VPN config
pub fn update_config_name(&self, id: &str, new_name: &str) -> Result<VpnConfig, VpnError> {
let mut config = self.load_config(id)?;
config.name = new_name.to_string();
self.save_config(&config)?;
Ok(config)
}
/// Update sync fields on a VPN config
pub fn update_sync_fields(
&self,
id: &str,
sync_enabled: bool,
last_sync: Option<u64>,
) -> Result<(), VpnError> {
let mut storage = self.load_storage()?;
if let Some(config) = storage.configs.iter_mut().find(|c| c.id == id) {
config.sync_enabled = sync_enabled;
config.last_sync = last_sync;
self.save_storage(&storage)
} else {
Err(VpnError::NotFound(id.to_string()))
}
}
/// Import a VPN config from raw content
pub fn import_config(
&self,
@@ -333,6 +404,8 @@ impl VpnStorage {
config_data: content.to_string(),
created_at: Utc::now().timestamp(),
last_used: None,
sync_enabled: false,
last_sync: None,
};
self.save_config(&config)?;
@@ -375,6 +448,8 @@ mod tests {
config_data: "[Interface]\nPrivateKey = test\n[Peer]\nPublicKey = peer".to_string(),
created_at: 1234567890,
last_used: None,
sync_enabled: false,
last_sync: None,
};
storage.save_config(&config).unwrap();
@@ -397,6 +472,8 @@ mod tests {
config_data: "secret1".to_string(),
created_at: 1000,
last_used: None,
sync_enabled: false,
last_sync: None,
};
let config2 = VpnConfig {
@@ -406,6 +483,8 @@ mod tests {
config_data: "secret2".to_string(),
created_at: 2000,
last_used: Some(3000),
sync_enabled: false,
last_sync: None,
};
storage.save_config(&config1).unwrap();
@@ -430,6 +509,8 @@ mod tests {
config_data: "data".to_string(),
created_at: 1000,
last_used: None,
sync_enabled: false,
last_sync: None,
};
storage.save_config(&config).unwrap();
+245
View File
@@ -0,0 +1,245 @@
use crate::proxy_storage::is_process_running;
use crate::vpn_worker_storage::{
delete_vpn_worker_config, find_vpn_worker_by_vpn_id, generate_vpn_worker_id,
get_vpn_worker_config, list_vpn_worker_configs, save_vpn_worker_config, VpnWorkerConfig,
};
use std::process::Stdio;
pub async fn start_vpn_worker(vpn_id: &str) -> Result<VpnWorkerConfig, Box<dyn std::error::Error>> {
// Check if a VPN worker for this vpn_id already exists and is running
if let Some(existing) = find_vpn_worker_by_vpn_id(vpn_id) {
if let Some(pid) = existing.pid {
if is_process_running(pid) {
return Ok(existing);
}
}
// Worker config exists but process is dead, clean up
delete_vpn_worker_config(&existing.id);
}
// Load VPN config from storage to determine type
let vpn_config = {
let storage = crate::vpn::VPN_STORAGE
.lock()
.map_err(|e| format!("Failed to lock VPN storage: {e}"))?;
storage
.load_config(vpn_id)
.map_err(|e| format!("Failed to load VPN config: {e}"))?
};
let vpn_type_str = match vpn_config.vpn_type {
crate::vpn::VpnType::WireGuard => "wireguard",
crate::vpn::VpnType::OpenVPN => "openvpn",
};
// Write decrypted config to a temp file
let config_file_path = std::env::temp_dir()
.join(format!("donut_vpn_{}.conf", vpn_id))
.to_string_lossy()
.to_string();
std::fs::write(&config_file_path, &vpn_config.config_data)?;
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let _ = std::fs::set_permissions(&config_file_path, std::fs::Permissions::from_mode(0o600));
}
let id = generate_vpn_worker_id();
// Find an available port
let local_port = {
let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
listener.local_addr()?.port()
};
let config = VpnWorkerConfig::new(
id.clone(),
vpn_id.to_string(),
vpn_type_str.to_string(),
config_file_path,
);
save_vpn_worker_config(&config)?;
// Spawn detached VPN worker process
let exe = std::env::current_exe()?;
#[cfg(unix)]
{
use std::os::unix::process::CommandExt;
use std::process::Command as StdCommand;
let mut cmd = StdCommand::new(&exe);
cmd.arg("vpn-worker");
cmd.arg("start");
cmd.arg("--id");
cmd.arg(&id);
cmd.arg("--port");
cmd.arg(local_port.to_string());
cmd.stdin(Stdio::null());
cmd.stdout(Stdio::null());
let log_path = std::env::temp_dir().join(format!("donut-vpn-{}.log", id));
if let Ok(file) = std::fs::File::create(&log_path) {
log::info!("VPN worker stderr will be logged to: {:?}", log_path);
cmd.stderr(Stdio::from(file));
} else {
cmd.stderr(Stdio::null());
}
unsafe {
cmd.pre_exec(|| {
libc::setsid();
if libc::setpriority(libc::PRIO_PROCESS, 0, -10) != 0 {
let _ = libc::setpriority(libc::PRIO_PROCESS, 0, -5);
}
Ok(())
});
}
let child = cmd.spawn()?;
let pid = child.id();
let mut config_with_pid = config.clone();
config_with_pid.pid = Some(pid);
config_with_pid.local_port = Some(local_port);
save_vpn_worker_config(&config_with_pid)?;
drop(child);
}
#[cfg(windows)]
{
use std::os::windows::process::CommandExt;
use std::process::Command as StdCommand;
let mut cmd = StdCommand::new(&exe);
cmd.arg("vpn-worker");
cmd.arg("start");
cmd.arg("--id");
cmd.arg(&id);
cmd.arg("--port");
cmd.arg(local_port.to_string());
cmd.stdin(Stdio::null());
cmd.stdout(Stdio::null());
let log_path = std::env::temp_dir().join(format!("donut-vpn-{}.log", id));
if let Ok(file) = std::fs::File::create(&log_path) {
log::info!("VPN worker stderr will be logged to: {:?}", log_path);
cmd.stderr(Stdio::from(file));
} else {
cmd.stderr(Stdio::null());
}
const DETACHED_PROCESS: u32 = 0x00000008;
const CREATE_NEW_PROCESS_GROUP: u32 = 0x00000200;
cmd.creation_flags(DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP);
let child = cmd.spawn()?;
let pid = child.id();
let mut config_with_pid = config.clone();
config_with_pid.pid = Some(pid);
config_with_pid.local_port = Some(local_port);
save_vpn_worker_config(&config_with_pid)?;
drop(child);
}
// Wait for the worker to update config with local_url
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let mut attempts = 0;
let max_attempts = 100; // 10 seconds max
loop {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
if let Some(updated_config) = get_vpn_worker_config(&id) {
if let Some(ref local_url) = updated_config.local_url {
if !local_url.is_empty() {
if let Some(port) = updated_config.local_port {
if let Ok(Ok(_)) = tokio::time::timeout(
tokio::time::Duration::from_millis(100),
tokio::net::TcpStream::connect(("127.0.0.1", port)),
)
.await
{
return Ok(updated_config);
}
}
}
}
}
attempts += 1;
if attempts >= max_attempts {
if let Some(config) = get_vpn_worker_config(&id) {
let process_running = config.pid.map(is_process_running).unwrap_or(false);
// Clean up on failure
delete_vpn_worker_config(&id);
return Err(
format!(
"VPN worker failed to start in time. pid={:?}, process_running={}, local_url={:?}",
config.pid, process_running, config.local_url
)
.into(),
);
}
delete_vpn_worker_config(&id);
return Err("VPN worker config not found after spawn".into());
}
}
}
pub async fn stop_vpn_worker(id: &str) -> Result<bool, Box<dyn std::error::Error>> {
let config = get_vpn_worker_config(id);
if let Some(config) = config {
if let Some(pid) = config.pid {
#[cfg(unix)]
{
use std::process::Command;
let _ = Command::new("kill")
.arg("-TERM")
.arg(pid.to_string())
.output();
}
#[cfg(windows)]
{
use std::process::Command;
let _ = Command::new("taskkill")
.args(["/F", "/PID", &pid.to_string()])
.output();
}
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
}
// Clean up temp config file
let _ = std::fs::remove_file(&config.config_file_path);
delete_vpn_worker_config(id);
return Ok(true);
}
Ok(false)
}
pub async fn stop_vpn_worker_by_vpn_id(vpn_id: &str) -> Result<bool, Box<dyn std::error::Error>> {
if let Some(config) = find_vpn_worker_by_vpn_id(vpn_id) {
return stop_vpn_worker(&config.id).await;
}
Ok(false)
}
pub async fn stop_all_vpn_workers() -> Result<(), Box<dyn std::error::Error>> {
let configs = list_vpn_worker_configs();
for config in configs {
let _ = stop_vpn_worker(&config.id).await;
}
Ok(())
}
+107
View File
@@ -0,0 +1,107 @@
use crate::proxy_storage::get_storage_dir;
use serde::{Deserialize, Serialize};
use std::fs;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VpnWorkerConfig {
pub id: String,
pub vpn_id: String,
pub vpn_type: String,
pub config_file_path: String,
pub local_port: Option<u16>,
pub local_url: Option<String>,
pub pid: Option<u32>,
}
impl VpnWorkerConfig {
pub fn new(id: String, vpn_id: String, vpn_type: String, config_file_path: String) -> Self {
Self {
id,
vpn_id,
vpn_type,
config_file_path,
local_port: None,
local_url: None,
pid: None,
}
}
}
pub fn save_vpn_worker_config(config: &VpnWorkerConfig) -> Result<(), Box<dyn std::error::Error>> {
let storage_dir = get_storage_dir();
fs::create_dir_all(&storage_dir)?;
let file_path = storage_dir.join(format!("vpn_worker_{}.json", config.id));
let content = serde_json::to_string_pretty(config)?;
fs::write(&file_path, content)?;
Ok(())
}
pub fn get_vpn_worker_config(id: &str) -> Option<VpnWorkerConfig> {
let storage_dir = get_storage_dir();
let file_path = storage_dir.join(format!("vpn_worker_{}.json", id));
if !file_path.exists() {
return None;
}
match fs::read_to_string(&file_path) {
Ok(content) => serde_json::from_str(&content).ok(),
Err(_) => None,
}
}
pub fn delete_vpn_worker_config(id: &str) -> bool {
let storage_dir = get_storage_dir();
let file_path = storage_dir.join(format!("vpn_worker_{}.json", id));
if !file_path.exists() {
return false;
}
fs::remove_file(&file_path).is_ok()
}
pub fn list_vpn_worker_configs() -> Vec<VpnWorkerConfig> {
let storage_dir = get_storage_dir();
if !storage_dir.exists() {
return Vec::new();
}
let mut configs = Vec::new();
if let Ok(entries) = fs::read_dir(&storage_dir) {
for entry in entries.flatten() {
let path = entry.path();
if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
if name.starts_with("vpn_worker_") && name.ends_with(".json") {
if let Ok(content) = fs::read_to_string(&path) {
if let Ok(config) = serde_json::from_str::<VpnWorkerConfig>(&content) {
configs.push(config);
}
}
}
}
}
}
configs
}
pub fn find_vpn_worker_by_vpn_id(vpn_id: &str) -> Option<VpnWorkerConfig> {
list_vpn_worker_configs()
.into_iter()
.find(|c| c.vpn_id == vpn_id)
}
pub fn generate_vpn_worker_id() -> String {
format!(
"vpnw_{}_{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
rand::random::<u32>()
)
}