feat: use vm.lock to ensure process concurrency safety.

This commit is contained in:
robcholz
2026-02-07 12:35:34 -05:00
parent ca33633d28
commit fc7e95fa22
4 changed files with 84 additions and 213 deletions
+76 -3
View File
@@ -24,6 +24,7 @@ use crate::{
};
const VM_MANAGER_SOCKET_NAME: &str = "vm.sock";
const VM_MANAGER_LOCK_NAME: &str = "vm.lock";
pub fn ensure_manager(
raw_args: &[std::ffi::OsString],
@@ -40,8 +41,19 @@ pub fn ensure_manager(
return Ok(stream);
}
tracing::info!(path = %socket_path.display(), "spawning vm manager");
spawn_manager_process(raw_args, auto_shutdown_ms, &instance_dir)?;
let lock_path = instance_dir.join(VM_MANAGER_LOCK_NAME);
let mut lock_file = acquire_spawn_lock(&lock_path)?;
if lock_file.is_some() {
tracing::info!(path = %socket_path.display(), "spawning vm manager");
spawn_manager_process(raw_args, auto_shutdown_ms, &instance_dir)?;
} else {
tracing::info!(
path = %socket_path.display(),
lock = %lock_path.display(),
lock_pid = read_lock_pid(&lock_path).unwrap_or(0),
"waiting for vm manager spawn lock"
);
}
let start = Instant::now();
let timeout = Duration::from_secs(10);
@@ -50,11 +62,19 @@ pub fn ensure_manager(
Ok(stream) => {
send_client_pid(&stream);
tracing::info!(path = %socket_path.display(), "connected to vm manager");
if lock_file.is_some() {
drop(lock_file.take());
let _ = fs::remove_file(&lock_path);
}
return Ok(stream);
}
Err(err) => {
tracing::debug!(error = %err, "waiting for vm manager socket");
if start.elapsed() > timeout {
if lock_file.is_some() {
drop(lock_file.take());
let _ = fs::remove_file(&lock_path);
}
return Err(format!(
"Timed out waiting for vm manager socket: {} ({})",
socket_path.display(),
@@ -167,6 +187,59 @@ fn send_client_pid(stream: &UnixStream) {
}
}
fn acquire_spawn_lock(lock_path: &Path) -> Result<Option<fs::File>, Box<dyn std::error::Error>> {
match fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(lock_path)
{
Ok(mut file) => {
let pid = std::process::id();
let _ = writeln!(file, "pid={pid}");
Ok(Some(file))
}
Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
if is_lock_stale(lock_path) {
tracing::warn!(
lock = %lock_path.display(),
lock_pid = read_lock_pid(lock_path).unwrap_or(0),
"stale vm manager lock removed"
);
let _ = fs::remove_file(lock_path);
return acquire_spawn_lock(lock_path);
}
Ok(None)
}
Err(err) => Err(err.into()),
}
}
fn is_lock_stale(lock_path: &Path) -> bool {
match read_lock_pid(lock_path) {
Some(pid) => !pid_is_alive(pid),
None => true,
}
}
fn pid_is_alive(pid: u32) -> bool {
let pid = pid as libc::pid_t;
let result = unsafe { libc::kill(pid, 0) };
if result == 0 {
return true;
}
match std::io::Error::last_os_error().raw_os_error() {
Some(code) if code == libc::EPERM => true,
Some(code) if code == libc::ESRCH => false,
_ => false,
}
}
fn read_lock_pid(lock_path: &Path) -> Option<u32> {
let content = fs::read_to_string(lock_path).ok()?;
let line = content.lines().next()?;
line.strip_prefix("pid=")?.trim().parse::<u32>().ok()
}
fn read_client_pid(stream: &UnixStream) -> Option<u32> {
let mut stream = stream.try_clone().ok()?;
let _ = stream.set_read_timeout(Some(Duration::from_millis(200)));
@@ -514,7 +587,7 @@ fn manager_event_loop(
}
Ok(ManagerEvent::VmExited(err)) => {
if let Some(err) = err {
tracing::error!(error = %err, "vm exited with error");
tracing::error!(error = %err, "vm exited with an error");
}
break;
}