From c17ed45e346e60c7dce6630d71450a5e3389676b Mon Sep 17 00:00:00 2001 From: "Liao, Jie" <28374150+leo0481@users.noreply.github.com> Date: Fri, 27 Feb 2026 21:29:26 +0800 Subject: [PATCH] refactor: simplify logging statements and fix deadlock in batch error handling - Consolidate multi-line logging statements into single lines for consistency - Add critical note about lock acquisition in _save_batch docstring - Fix potential deadlock: remove lock acquisition in error handler as caller already holds lock - Remove trailing whitespace in process_with_strategy --- pipeline/base_pipeline.py | 32 +++++++++++++------------------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/pipeline/base_pipeline.py b/pipeline/base_pipeline.py index 9633ae2..d3f167e 100644 --- a/pipeline/base_pipeline.py +++ b/pipeline/base_pipeline.py @@ -234,13 +234,9 @@ class BasePipeline(ABC): if file_path.exists(): data = self._load_single_data_file(file_path, data_parser) all_data.extend(data) - self.logger.info( - f"Loaded {len(data)} {data_type} from {file_path}" - ) + self.logger.info(f"Loaded {len(data)} {data_type} from {file_path}") else: - self.logger.warning( - f"{data_type} file does not exist: {file_path}" - ) + self.logger.warning(f"{data_type} file does not exist: {file_path}") except Exception as e: self.logger.error( f"Failed to load {data_type} file {file_path}: {e}", @@ -297,13 +293,9 @@ class BasePipeline(ABC): if file_path.exists(): data = self._load_single_data_file(file_path, data_parser) all_data.extend(data) - self.logger.info( - f"Loaded {len(data)} {data_type} from {file_path}" - ) + self.logger.info(f"Loaded {len(data)} {data_type} from {file_path}") else: - self.logger.warning( - f"{data_type} file does not exist: {file_path}" - ) + self.logger.warning(f"{data_type} file does not exist: {file_path}") except Exception as e: self.logger.error( f"Failed to load {data_type} file {file_path}: {e}", @@ -581,9 +573,7 @@ class BatchSaveManager: """ self.pipeline = pipeline self.output_filename = output_filename - self.batch_size = ( - batch_size if batch_size is not None else pipeline.config.batch_size - ) + self.batch_size = batch_size if batch_size is not None else pipeline.config.batch_size self.flush_on_exit = flush_on_exit # Result buffer @@ -632,7 +622,11 @@ class BatchSaveManager: self._save_batch(batch) def _save_batch(self, batch: List[Dict]) -> None: - """Save a batch of results""" + """Save a batch of results + + Note: This method may be called while holding self.lock. + Any exception handling must NOT attempt to acquire the lock again. + """ try: # Use incremental save method self.pipeline.save_results_incrementally(batch, self.output_filename) @@ -650,8 +644,9 @@ class BatchSaveManager: except Exception as e: self.pipeline.logger.error(f"Batch save failed: {e}") # Put failed results back into buffer - with self.lock: - self.buffer.extend(batch) + # IMPORTANT: Do NOT acquire lock here - caller already holds it + # (see _flush_unlocked and add_results methods) + self.buffer.extend(batch) raise def flush(self) -> None: @@ -780,7 +775,6 @@ def process_with_strategy( total_items=len(items), desc=desc, ) as save_manager: - # Define wrapper function to add results to save manager def wrapped_process_func(item): try: