refactor: simplify logging statements and fix deadlock in batch error handling

- Consolidate multi-line logging statements into single lines for consistency
- Add critical note about lock acquisition in _save_batch docstring
- Fix potential deadlock: stop re-acquiring self.lock in the _save_batch error handler, because the caller already holds it
- Remove trailing whitespace in process_with_strategy
This commit is contained in:
Liao, Jie
2026-02-27 21:29:26 +08:00
parent 04a1cbe8d1
commit c17ed45e34
+13 -19
View File
@@ -234,13 +234,9 @@ class BasePipeline(ABC):
if file_path.exists():
data = self._load_single_data_file(file_path, data_parser)
all_data.extend(data)
self.logger.info(
f"Loaded {len(data)} {data_type} from {file_path}"
)
self.logger.info(f"Loaded {len(data)} {data_type} from {file_path}")
else:
self.logger.warning(
f"{data_type} file does not exist: {file_path}"
)
self.logger.warning(f"{data_type} file does not exist: {file_path}")
except Exception as e:
self.logger.error(
f"Failed to load {data_type} file {file_path}: {e}",
@@ -297,13 +293,9 @@ class BasePipeline(ABC):
if file_path.exists():
data = self._load_single_data_file(file_path, data_parser)
all_data.extend(data)
self.logger.info(
f"Loaded {len(data)} {data_type} from {file_path}"
)
self.logger.info(f"Loaded {len(data)} {data_type} from {file_path}")
else:
self.logger.warning(
f"{data_type} file does not exist: {file_path}"
)
self.logger.warning(f"{data_type} file does not exist: {file_path}")
except Exception as e:
self.logger.error(
f"Failed to load {data_type} file {file_path}: {e}",
@@ -581,9 +573,7 @@ class BatchSaveManager:
"""
self.pipeline = pipeline
self.output_filename = output_filename
self.batch_size = (
batch_size if batch_size is not None else pipeline.config.batch_size
)
self.batch_size = batch_size if batch_size is not None else pipeline.config.batch_size
self.flush_on_exit = flush_on_exit
# Result buffer
@@ -632,7 +622,11 @@ class BatchSaveManager:
self._save_batch(batch)
def _save_batch(self, batch: List[Dict]) -> None:
"""Save a batch of results"""
"""Save a batch of results
Note: This method may be called while holding self.lock.
Any exception handling must NOT attempt to acquire the lock again.
"""
try:
# Use incremental save method
self.pipeline.save_results_incrementally(batch, self.output_filename)
@@ -650,8 +644,9 @@ class BatchSaveManager:
except Exception as e:
self.pipeline.logger.error(f"Batch save failed: {e}")
# Put failed results back into buffer
with self.lock:
self.buffer.extend(batch)
# IMPORTANT: Do NOT acquire lock here - caller already holds it
# (see _flush_unlocked and add_results methods)
self.buffer.extend(batch)
raise
def flush(self) -> None:
@@ -780,7 +775,6 @@ def process_with_strategy(
total_items=len(items),
desc=desc,
) as save_manager:
# Define wrapper function to add results to save manager
def wrapped_process_func(item):
try: