mod common; use anyhow::Result; use common::{BundlerTestHelper, TestCacheManager, TestProject, TestProjectManager}; use serial_test::serial; use std::process::Command; use std::sync::{Arc, Barrier}; use std::thread; use std::time::{Duration, Instant}; /// Test concurrent execution during first launch #[tokio::test(flavor = "multi_thread")] #[serial] async fn test_concurrent_first_launch() -> Result<()> { println!("Testing concurrent execution during first launch..."); // Create a simple test project let project = TestProject::new("concurrent-test-app").with_dependency("uuid", "^9.0.1"); let manager = TestProjectManager::create(project)?; manager.install_dependencies()?; // Bundle the project let executable_path = BundlerTestHelper::bundle_project_with_compression( manager.project_path(), manager.temp_dir(), Some("concurrent-test"), false, // No compression for faster testing )?; // Debug executable creation eprintln!( "Debug: Executable path created: {}", executable_path.display() ); eprintln!("Debug: Executable exists: {}", executable_path.exists()); if executable_path.exists() { if let Ok(metadata) = std::fs::metadata(&executable_path) { eprintln!("Debug: Executable size: {} bytes", metadata.len()); eprintln!("Debug: Executable is file: {}", metadata.is_file()); } else { eprintln!("Debug: Cannot read executable metadata"); } } else { eprintln!("Debug: Executable does not exist!"); if let Some(parent) = executable_path.parent() { eprintln!("Debug: Parent directory: {}", parent.display()); eprintln!("Debug: Parent exists: {}", parent.exists()); if let Ok(entries) = std::fs::read_dir(parent) { eprintln!("Debug: Parent directory contents:"); for entry in entries.flatten() { eprintln!(" - {}", entry.file_name().to_string_lossy()); } } else { eprintln!("Debug: Cannot read parent directory"); } } } // Give the filesystem a moment to settle on Windows if cfg!(windows) { std::thread::sleep(std::time::Duration::from_millis(250)); } // Clear any existing cache to ensure we test first launch TestCacheManager::clear_application_cache()?; // Number of concurrent executions to test const NUM_CONCURRENT: usize = 5; // Use a barrier to synchronize the start of all threads let barrier = Arc::new(Barrier::new(NUM_CONCURRENT)); let executable_path = Arc::new(executable_path); let mut handles = Vec::new(); let start_time = Instant::now(); // Spawn multiple threads that will execute the binary concurrently for i in 0..NUM_CONCURRENT { let barrier = Arc::clone(&barrier); let executable_path = Arc::clone(&executable_path); let handle = thread::spawn(move || -> Result<(usize, Duration, String)> { // Wait for all threads to be ready barrier.wait(); // Add a small staggered delay to reduce race conditions on Windows std::thread::sleep(std::time::Duration::from_millis(50 + i as u64 * 20)); let thread_start = Instant::now(); // Execute the binary using the test helper with retry logic for Windows let mut last_error = None; let mut output = None; for attempt in 1..=3 { match BundlerTestHelper::run_executable( executable_path.as_ref(), &[&format!("--thread-id={i}")], &[("TEST_VAR", &format!("thread_{i}"))], ) { Ok(result) => { output = Some(result); break; } Err(e) => { eprintln!("Thread {i}: Attempt {attempt} failed: {e}"); last_error = Some(e); if attempt < 3 { std::thread::sleep(std::time::Duration::from_millis( 100 * attempt as u64, )); } } } } let output = output.ok_or_else(|| { let e = last_error.unwrap(); eprintln!( "Thread {}: Failed to execute binary at {} after 3 attempts", i, executable_path.as_ref().display() ); eprintln!("Thread {i}: Final error details: {e:?}"); eprintln!("Thread {i}: Error chain:"); let mut source = e.source(); let mut level = 0; while let Some(err) = source { eprintln!("Thread {i}: Level {level}: {err}"); source = err.source(); level += 1; } anyhow::anyhow!("Failed to execute binary after retries: {e}") })?; let duration = thread_start.elapsed(); let stdout = String::from_utf8_lossy(&output.stdout).to_string(); if !output.status.success() { return Err(anyhow::anyhow!( "Thread {} failed with exit code {:?}. Stderr: {}", i, output.status.code(), String::from_utf8_lossy(&output.stderr) )); } Ok((i, duration, stdout)) }); handles.push(handle); } // Wait for all threads to complete and collect results let mut results = Vec::new(); for handle in handles { let result = handle .join() .map_err(|e| anyhow::anyhow!("Thread panicked: {:?}", e))??; results.push(result); } let total_time = start_time.elapsed(); println!("Total concurrent execution time: {total_time:?}"); // Verify all executions succeeded assert_eq!( results.len(), NUM_CONCURRENT, "Not all threads completed successfully" ); // Verify each execution produced expected output for (thread_id, duration, stdout) in &results { println!("Thread {thread_id} completed in {duration:?}"); // Check for expected output assert!( stdout.contains("Hello from test project!"), "Thread {thread_id} missing expected greeting in output: {stdout}" ); assert!( stdout.contains(&format!("thread_{thread_id}")), "Thread {thread_id} missing environment variable in output: {stdout}" ); assert!( stdout.contains(&format!("--thread-id={thread_id}")), "Thread {thread_id} missing argument in output: {stdout}" ); } // Verify that the execution was properly queued (no thread should have taken too long) let max_duration = results .iter() .map(|(_, duration, _)| *duration) .max() .unwrap(); let min_duration = results .iter() .map(|(_, duration, _)| *duration) .min() .unwrap(); println!("Duration range: {min_duration:?} - {max_duration:?}"); // The difference shouldn't be too extreme if queueing is working properly // Allow up to 30 seconds difference for extraction + queue processing assert!( max_duration - min_duration < Duration::from_secs(30), "Duration difference too large: {:?}, suggesting queue is not working properly", max_duration - min_duration ); println!("✅ Concurrent first launch test passed!"); Ok(()) } /// Test that subsequent executions after cache is populated are fast #[tokio::test(flavor = "multi_thread")] #[serial] async fn test_cached_concurrent_execution() -> Result<()> { println!("Testing concurrent execution with populated cache..."); // Create a simple test project let project = TestProject::new("cached-concurrent-app").with_dependency("lodash", "^4.17.21"); let manager = TestProjectManager::create(project)?; manager.install_dependencies()?; // Bundle the project let executable_path = BundlerTestHelper::bundle_project_with_compression( manager.project_path(), manager.temp_dir(), Some("cached-concurrent-test"), false, )?; // Debug executable creation eprintln!( "Debug (cached): Executable path created: {}", executable_path.display() ); eprintln!( "Debug (cached): Executable exists: {}", executable_path.exists() ); if executable_path.exists() { if let Ok(metadata) = std::fs::metadata(&executable_path) { eprintln!("Debug (cached): Executable size: {} bytes", metadata.len()); eprintln!("Debug (cached): Executable is file: {}", metadata.is_file()); } else { eprintln!("Debug (cached): Cannot read executable metadata"); } } else { eprintln!("Debug (cached): Executable does not exist!"); if let Some(parent) = executable_path.parent() { eprintln!("Debug (cached): Parent directory: {}", parent.display()); eprintln!("Debug (cached): Parent exists: {}", parent.exists()); if let Ok(entries) = std::fs::read_dir(parent) { eprintln!("Debug (cached): Parent directory contents:"); for entry in entries.flatten() { eprintln!(" - {}", entry.file_name().to_string_lossy()); } } else { eprintln!("Debug (cached): Cannot read parent directory"); } } } // Give the filesystem a moment to settle on Windows if cfg!(windows) { std::thread::sleep(std::time::Duration::from_millis(100)); } // Clear cache and run once to populate it TestCacheManager::clear_application_cache()?; println!("Populating cache with initial run..."); let initial_output = BundlerTestHelper::run_executable(&executable_path, &[], &[("TEST_VAR", "initial")])?; assert!( initial_output.status.success(), "Initial run failed: {}", String::from_utf8_lossy(&initial_output.stderr) ); // Now test concurrent execution with populated cache const NUM_CONCURRENT: usize = 8; let barrier = Arc::new(Barrier::new(NUM_CONCURRENT)); let executable_path = Arc::new(executable_path); let mut handles = Vec::new(); let start_time = Instant::now(); for i in 0..NUM_CONCURRENT { let barrier = Arc::clone(&barrier); let executable_path = Arc::clone(&executable_path); let handle = thread::spawn(move || -> Result<(usize, Duration)> { barrier.wait(); // Add a small staggered delay to reduce race conditions on Windows std::thread::sleep(std::time::Duration::from_millis(50 + i as u64 * 20)); let thread_start = Instant::now(); // Execute the binary using the test helper with retry logic for Windows let mut last_error = None; let mut output = None; for attempt in 1..=3 { match BundlerTestHelper::run_executable( executable_path.as_ref(), &[], &[("TEST_VAR", &format!("cached_{i}"))], ) { Ok(result) => { output = Some(result); break; } Err(e) => { eprintln!("Cached thread {i}: Attempt {attempt} failed: {e}"); last_error = Some(e); if attempt < 3 { std::thread::sleep(std::time::Duration::from_millis( 100 * attempt as u64, )); } } } } let output = output.ok_or_else(|| { let e = last_error.unwrap(); eprintln!( "Cached thread {}: Failed to execute binary at {} after 3 attempts", i, executable_path.as_ref().display() ); eprintln!("Cached thread {i}: Final error details: {e:?}"); eprintln!("Cached thread {i}: Error chain:"); let mut source = e.source(); let mut level = 0; while let Some(err) = source { eprintln!("Cached thread {i}: Level {level}: {err}"); source = err.source(); level += 1; } anyhow::anyhow!("Failed to execute binary after retries: {}", e) })?; let duration = thread_start.elapsed(); if !output.status.success() { return Err(anyhow::anyhow!( "Cached thread {} failed: {}", i, String::from_utf8_lossy(&output.stderr) )); } Ok((i, duration)) }); handles.push(handle); } let mut results = Vec::new(); for handle in handles { let result = handle .join() .map_err(|e| anyhow::anyhow!("Thread panicked: {e:?}"))??; results.push(result); } let total_time = start_time.elapsed(); println!("Total cached concurrent execution time: {total_time:?}"); // Verify all executions succeeded assert_eq!(results.len(), NUM_CONCURRENT); // With cache populated, all executions should be relatively fast for (thread_id, duration) in &results { println!("Cached thread {thread_id} completed in {duration:?}"); // Each execution should be fast since cache is populated assert!( *duration < Duration::from_secs(10), "Cached execution took too long: {duration:?} for thread {thread_id}" ); } println!("✅ Cached concurrent execution test passed!"); Ok(()) } /// Test queue ordering - verify that processes are executed in the order they were queued #[tokio::test(flavor = "multi_thread")] #[serial] async fn test_queue_ordering() -> Result<()> { println!("Testing queue ordering..."); // Create a test project that takes a bit of time to execute let project = TestProject::new("queue-order-app"); let manager = TestProjectManager::create(project)?; // Create a custom index.js that logs timing information let index_js = r#" const fs = require('fs'); const path = require('path'); // Get thread ID from arguments const threadId = process.argv.find(arg => arg.startsWith('--thread-id='))?.split('=')[1] || 'unknown'; const startTime = Date.now(); console.log(`Thread ${threadId} started at ${startTime}`); console.log("Hello from test project!"); console.log("Node version:", process.version); // Simulate some work const start = Date.now(); while (Date.now() - start < 100) { // Busy wait for 100ms to simulate work } console.log(`Thread ${threadId} completed at ${Date.now()}`); console.log("All tests completed!"); process.exit(0); "#; std::fs::write(manager.project_path().join("index.js"), index_js)?; // Bundle the project let executable_path = BundlerTestHelper::bundle_project_with_compression( manager.project_path(), manager.temp_dir(), Some("queue-order-test"), false, )?; // Clear cache to ensure we test first launch queueing TestCacheManager::clear_application_cache()?; const NUM_THREADS: usize = 4; let barrier = Arc::new(Barrier::new(NUM_THREADS)); let executable_path = Arc::new(executable_path); let mut handles = Vec::new(); for i in 0..NUM_THREADS { let barrier = Arc::clone(&barrier); let executable_path = Arc::clone(&executable_path); let handle = thread::spawn(move || -> Result<(usize, String)> { barrier.wait(); // Add a small delay to ensure threads start in order thread::sleep(Duration::from_millis(i as u64 * 10)); let output = Command::new(executable_path.as_ref()) .args(&[format!("--thread-id={i}")]) .output() .map_err(|e| anyhow::anyhow!("Failed to execute binary: {}", e))?; if !output.status.success() { return Err(anyhow::anyhow!( "Queue order thread {} failed: {}", i, String::from_utf8_lossy(&output.stderr) )); } let stdout = String::from_utf8_lossy(&output.stdout).to_string(); Ok((i, stdout)) }); handles.push(handle); } let mut results = Vec::new(); for handle in handles { let result = handle .join() .map_err(|e| anyhow::anyhow!("Thread panicked: {:?}", e))??; results.push(result); } // Verify all executions succeeded assert_eq!(results.len(), NUM_THREADS); for (thread_id, stdout) in &results { println!( "Queue order thread {} output: {}", thread_id, stdout.lines().next().unwrap_or("") ); assert!( stdout.contains(&format!("Thread {thread_id} started")), "Thread {thread_id} missing start message" ); assert!( stdout.contains(&format!("Thread {thread_id} completed")), "Thread {thread_id} missing completion message" ); } println!("✅ Queue ordering test passed!"); Ok(()) } /// Test recovery from failed extraction #[tokio::test(flavor = "multi_thread")] #[serial] async fn test_extraction_failure_recovery() -> Result<()> { println!("Testing recovery from extraction failure..."); // Create a simple test project let project = TestProject::new("recovery-test-app"); let manager = TestProjectManager::create(project)?; // Bundle the project let executable_path = BundlerTestHelper::bundle_project_with_compression( manager.project_path(), manager.temp_dir(), Some("recovery-test"), false, )?; // Clear cache TestCacheManager::clear_application_cache()?; // Test that after clearing cache, the binary still works let output = Command::new(&executable_path) .env("TEST_VAR", "recovery_test") .output()?; assert!( output.status.success(), "Recovery test failed: {}", String::from_utf8_lossy(&output.stderr) ); let stdout = String::from_utf8_lossy(&output.stdout); assert!( stdout.contains("Hello from test project!"), "Recovery test missing expected output: {stdout}" ); println!("✅ Extraction failure recovery test passed!"); Ok(()) } /// Cleanup function to be called after all tests #[tokio::test(flavor = "multi_thread")] #[serial] async fn test_zzz_cleanup_cache() -> Result<()> { println!("Cleaning up application cache after all tests..."); // This test runs last due to the "zzz" prefix, ensuring cleanup happens after other tests TestCacheManager::clear_application_cache()?; println!("✅ Cache cleanup completed!"); Ok(()) }