perf: improve download queue resilience

This commit is contained in:
zarzet
2026-05-01 23:51:24 +07:00
parent 3f56b88fa5
commit 01c7c9cc3a
3 changed files with 251 additions and 58 deletions
@@ -299,7 +299,7 @@ class MainActivity: FlutterFragmentActivity() {
private fun mimeTypeForExt(ext: String?): String {
return when (normalizeExt(ext)) {
".m4a" -> "audio/mp4"
".m4a", ".mp4" -> "audio/mp4"
".mp3" -> "audio/mpeg"
".opus" -> "audio/ogg"
".flac" -> "audio/flac"
@@ -314,7 +314,7 @@ class MainActivity: FlutterFragmentActivity() {
val safeName = sanitizeFilename(name)
val lower = safeName.lowercase(Locale.ROOT)
val knownExts = listOf(".flac", ".m4a", ".mp3", ".opus", ".lrc")
val knownExts = listOf(".flac", ".m4a", ".mp4", ".mp3", ".opus", ".lrc")
for (knownExt in knownExts) {
if (lower.endsWith(knownExt)) {
return safeName.dropLast(knownExt.length) + normalizedExt
@@ -724,6 +724,7 @@ class MainActivity: FlutterFragmentActivity() {
private fun extFromFileName(name: String): String {
return when {
name.endsWith(".m4a") -> ".m4a"
name.endsWith(".mp4") -> ".mp4"
name.endsWith(".mp3") -> ".mp3"
name.endsWith(".opus") -> ".opus"
name.endsWith(".flac") -> ".flac"
+125 -25
View File
@@ -1314,6 +1314,7 @@ class DownloadQueueNotifier extends Notifier<DownloadQueueState> {
Timer? _progressStreamBootstrapTimer;
Timer? _queuePersistDebounce;
StreamSubscription<Map<String, dynamic>>? _progressStreamSub;
StreamSubscription<List<ConnectivityResult>>? _connectivitySub;
int _downloadCount = 0;
static const _cleanupInterval = 50;
static const _progressPollingInterval = Duration(milliseconds: 1200);
@@ -1335,6 +1336,7 @@ class DownloadQueueNotifier extends Notifier<DownloadQueueState> {
int _idleProgressPollTick = 0;
bool _hasReceivedProgressStreamEvent = false;
bool _usingProgressStream = false;
bool _networkPausedByWifiOnly = false;
String? _lastServiceTrackName;
String? _lastServiceArtistName;
int _lastServicePercent = -1;
@@ -1408,15 +1410,20 @@ class DownloadQueueNotifier extends Notifier<DownloadQueueState> {
'Concurrent downloads updated: $previousConcurrent -> ${next.concurrentDownloads}',
);
}
if (previous?.downloadNetworkMode != next.downloadNetworkMode) {
_handleDownloadNetworkModeChanged(next.downloadNetworkMode);
}
});
ref.onDispose(() {
_progressTimer?.cancel();
_progressStreamBootstrapTimer?.cancel();
_progressStreamSub?.cancel();
_connectivitySub?.cancel();
_progressTimer = null;
_progressStreamBootstrapTimer = null;
_progressStreamSub = null;
_connectivitySub = null;
if (_queuePersistDebounce?.isActive == true) {
_queuePersistDebounce?.cancel();
unawaited(_flushQueueToStorage());
@@ -1646,15 +1653,12 @@ class DownloadQueueNotifier extends Notifier<DownloadQueueState> {
? rawItems.map((key, value) => MapEntry(key.toString(), value))
: const <String, dynamic>{};
final currentItems = state.items;
final itemsById = <String, DownloadItem>{};
final itemIndexById = <String, int>{};
final lookup = state.lookup;
int queuedCount = 0;
int downloadingCount = 0;
DownloadItem? firstDownloading;
for (int i = 0; i < currentItems.length; i++) {
final item = currentItems[i];
itemsById[item.id] = item;
itemIndexById[item.id] = i;
if (item.status == DownloadStatus.downloading) {
downloadingCount++;
firstDownloading ??= item;
@@ -1672,7 +1676,7 @@ class DownloadQueueNotifier extends Notifier<DownloadQueueState> {
for (final entry in items.entries) {
final itemId = entry.key;
final localItem = itemsById[itemId];
final localItem = lookup.byItemId[itemId];
if (localItem == null) {
continue;
}
@@ -1770,7 +1774,7 @@ class DownloadQueueNotifier extends Notifier<DownloadQueueState> {
final changedIndices = <int>[];
for (final entry in progressUpdates.entries) {
final index = itemIndexById[entry.key];
final index = lookup.indexByItemId[entry.key];
if (index == null) continue;
final current = updatedItems[index];
if (current.status == DownloadStatus.skipped ||
@@ -2970,7 +2974,7 @@ class DownloadQueueNotifier extends Notifier<DownloadQueueState> {
DownloadErrorType? errorType,
}) {
final items = state.items;
final index = items.indexWhere((item) => item.id == id);
final index = state.lookup.indexByItemId[id] ?? -1;
if (index == -1) return;
final current = items[index];
@@ -3004,10 +3008,8 @@ class DownloadQueueNotifier extends Notifier<DownloadQueueState> {
}
void updateProgress(String id, double progress, {double? speedMBps}) {
final items = state.items;
final index = items.indexWhere((i) => i.id == id);
if (index == -1) return;
final item = items[index];
final item = state.lookup.byItemId[id];
if (item == null) return;
if (item.status == DownloadStatus.skipped ||
item.status == DownloadStatus.completed ||
item.status == DownloadStatus.failed) {
@@ -3022,10 +3024,7 @@ class DownloadQueueNotifier extends Notifier<DownloadQueueState> {
}
DownloadItem? _findItemById(String id) {
for (final item in state.items) {
if (item.id == id) return item;
}
return null;
return state.lookup.byItemId[id];
}
bool _isLocallyCancelled(String id, {DownloadItem? item}) {
@@ -4214,6 +4213,67 @@ class DownloadQueueNotifier extends Notifier<DownloadQueueState> {
}
}
bool _hasWifiConnection(List<ConnectivityResult> results) {
return results.contains(ConnectivityResult.wifi);
}
void _startConnectivityMonitoring() {
_connectivitySub?.cancel();
_connectivitySub = Connectivity().onConnectivityChanged.listen(
_handleConnectivityResults,
onError: (Object error, StackTrace stackTrace) {
_log.w('Connectivity monitoring failed: $error');
},
cancelOnError: false,
);
}
void _stopConnectivityMonitoring({bool clearNetworkPause = true}) {
_connectivitySub?.cancel();
_connectivitySub = null;
if (clearNetworkPause) {
_networkPausedByWifiOnly = false;
}
}
void _handleDownloadNetworkModeChanged(String mode) {
if (mode == 'wifi_only') {
if (state.isProcessing || _networkPausedByWifiOnly) {
_startConnectivityMonitoring();
}
return;
}
final shouldResume = _networkPausedByWifiOnly && state.isPaused;
_stopConnectivityMonitoring();
if (shouldResume) {
resumeQueue();
}
}
void _handleConnectivityResults(List<ConnectivityResult> results) {
final settings = ref.read(settingsProvider);
if (settings.downloadNetworkMode != 'wifi_only') {
_handleDownloadNetworkModeChanged(settings.downloadNetworkMode);
return;
}
if (_hasWifiConnection(results)) {
if (_networkPausedByWifiOnly && state.isPaused) {
_networkPausedByWifiOnly = false;
_log.i('WiFi restored, resuming network-paused queue');
resumeQueue();
}
return;
}
if (state.isProcessing && !state.isPaused) {
_networkPausedByWifiOnly = true;
_log.w('WiFi connection lost, pausing active queue');
pauseQueue();
}
}
Future<void> _processQueue() async {
if (state.isProcessing) return;
@@ -4225,9 +4285,15 @@ class DownloadQueueNotifier extends Notifier<DownloadQueueState> {
final hasWifi = connectivityResult.contains(ConnectivityResult.wifi);
if (!hasWifi) {
_log.w('WiFi-only mode enabled but no WiFi connection. Queue paused.');
_networkPausedByWifiOnly = true;
_startConnectivityMonitoring();
state = state.copyWith(isProcessing: false, isPaused: true);
return;
}
_networkPausedByWifiOnly = false;
_startConnectivityMonitoring();
} else {
_stopConnectivityMonitoring();
}
state = state.copyWith(isProcessing: true);
@@ -4328,8 +4394,13 @@ class DownloadQueueNotifier extends Notifier<DownloadQueueState> {
_log.d('Concurrent downloads: ${state.concurrentDownloads}');
await _processQueueParallel();
final stoppedWhilePaused = state.isPaused;
final keepConnectivityMonitoring =
stoppedWhilePaused && _networkPausedByWifiOnly;
_stopProgressPolling();
if (!keepConnectivityMonitoring) {
_stopConnectivityMonitoring();
}
if (Platform.isAndroid) {
try {
@@ -5016,7 +5087,9 @@ class DownloadQueueNotifier extends Notifier<DownloadQueueState> {
final preferredOutputExt = _extensionPreferredOutputExt(actualService);
final shouldPreserveNativeM4a =
preferredOutputExt == '.m4a' ||
_extensionPreservesNativeOutputExt(actualService, '.m4a');
preferredOutputExt == '.mp4' ||
_extensionPreservesNativeOutputExt(actualService, '.m4a') ||
_extensionPreservesNativeOutputExt(actualService, '.mp4');
final decryptionDescriptor =
DownloadDecryptionDescriptor.fromDownloadResult(result);
trackToDownload = _buildTrackForMetadataEmbedding(
@@ -5070,7 +5143,13 @@ class DownloadQueueNotifier extends Notifier<DownloadQueueState> {
final decryptedExt = dotIndex >= 0
? decryptedTempPath.substring(dotIndex).toLowerCase()
: '.flac';
final allowedExt = <String>{'.flac', '.m4a', '.mp3', '.opus'};
final allowedExt = <String>{
'.flac',
'.m4a',
'.mp4',
'.mp3',
'.opus',
};
final finalExt = allowedExt.contains(decryptedExt)
? decryptedExt
: '.flac';
@@ -5275,8 +5354,14 @@ class DownloadQueueNotifier extends Notifier<DownloadQueueState> {
}
}
}
} else if (shouldPreserveNativeM4a) {
_log.d('M4A file detected (SAF), preserving native container...');
} else if (shouldPreserveNativeM4a ||
currentFilePath.toLowerCase().endsWith('.mp4') ||
decryptionDescriptor != null) {
// Decrypted streams are already in their final format.
// Converting e.g. eac3 M4A to FLAC would produce fake upscaled output.
_log.d(
'M4A/MP4 file detected (SAF), preserving native container...',
);
final tempPath = await _copySafToTemp(currentFilePath);
if (tempPath != null) {
try {
@@ -5307,12 +5392,16 @@ class DownloadQueueNotifier extends Notifier<DownloadQueueState> {
);
}
final newFileName = '${safBaseName ?? 'track'}.m4a';
final preserveExt =
currentFilePath.toLowerCase().endsWith('.mp4')
? '.mp4'
: '.m4a';
final newFileName = '${safBaseName ?? 'track'}$preserveExt';
final newUri = await _writeTempToSaf(
treeUri: settings.downloadTreeUri,
relativeDir: effectiveOutputDir,
fileName: newFileName,
mimeType: _mimeTypeForExt('.m4a'),
mimeType: _mimeTypeForExt(preserveExt),
srcPath: tempPath,
);
@@ -5488,8 +5577,10 @@ class DownloadQueueNotifier extends Notifier<DownloadQueueState> {
_log.w('M4A conversion process failed: $e, keeping M4A file');
actualQuality = 'AAC 320kbps';
}
} else if (shouldPreserveNativeM4a) {
_log.d('M4A file detected, preserving native container...');
} else if (shouldPreserveNativeM4a ||
currentFilePath.toLowerCase().endsWith('.mp4') ||
decryptionDescriptor != null) {
_log.d('M4A/MP4 file detected, preserving native container...');
try {
var targetPath = currentFilePath;
@@ -6212,31 +6303,38 @@ final downloadQueueProvider =
class DownloadQueueLookup {
final Map<String, DownloadItem> byTrackId;
final Map<String, DownloadItem> byItemId;
final Map<String, int> indexByItemId;
final List<String> itemIds;
const DownloadQueueLookup.empty()
: byTrackId = const {},
byItemId = const {},
indexByItemId = const {},
itemIds = const [];
DownloadQueueLookup._({
required this.byTrackId,
required this.byItemId,
required this.indexByItemId,
required this.itemIds,
});
factory DownloadQueueLookup.fromItems(List<DownloadItem> items) {
final byTrackId = <String, DownloadItem>{};
final byItemId = <String, DownloadItem>{};
final indexByItemId = <String, int>{};
final itemIds = <String>[];
for (final item in items) {
for (var index = 0; index < items.length; index++) {
final item = items[index];
byTrackId.putIfAbsent(item.track.id, () => item);
byItemId[item.id] = item;
indexByItemId[item.id] = index;
itemIds.add(item.id);
}
return DownloadQueueLookup._(
byTrackId: byTrackId,
byItemId: byItemId,
indexByItemId: indexByItemId,
itemIds: itemIds,
);
}
@@ -6247,7 +6345,8 @@ class DownloadQueueLookup {
required Iterable<int> changedIndices,
}) {
if (previousItems.length != nextItems.length ||
itemIds.length != nextItems.length) {
itemIds.length != nextItems.length ||
indexByItemId.length != nextItems.length) {
return DownloadQueueLookup.fromItems(nextItems);
}
@@ -6280,6 +6379,7 @@ class DownloadQueueLookup {
return DownloadQueueLookup._(
byTrackId: nextByTrackId ?? byTrackId,
byItemId: nextByItemId,
indexByItemId: indexByItemId,
itemIds: itemIds,
);
}
+123 -31
View File
@@ -363,6 +363,8 @@ class FFmpegService {
? '.mp3'
: inputPath.toLowerCase().endsWith('.opus')
? '.opus'
: inputPath.toLowerCase().endsWith('.mp4')
? '.mp4'
: '.flac';
}
@@ -431,6 +433,24 @@ class FFmpegService {
}
}
// Second fallback: use .mp4 (mp4 muxer) for codecs not supported by
// the ipod muxer (e.g. eac3/Dolby Digital Plus, mha1/Dolby Atmos).
if (!result.success &&
(preferredExt == '.flac' || preferredExt == '.m4a')) {
final mp4FallbackOutput = _buildOutputPath(inputPath, '.mp4');
final mp4FallbackResult = await _execute(
buildDecryptCommand(
mp4FallbackOutput,
mapAudioOnly: false,
key: keyCandidate,
),
);
if (mp4FallbackResult.success) {
tempOutput = mp4FallbackOutput;
result = mp4FallbackResult;
}
}
if (result.success) {
decryptSucceeded = true;
lastResult = result;
@@ -1291,6 +1311,80 @@ class FFmpegService {
}) async {
final tempDir = await getTemporaryDirectory();
final tempOutput = _nextTempEmbedPath(tempDir.path, '.mp3');
// Try with -c:a copy first (fastest, preserves original codec)
var result = await _runMp3Embed(
mp3Path: mp3Path,
tempOutput: tempOutput,
coverPath: coverPath,
metadata: metadata,
preserveMetadata: preserveMetadata,
audioCodec: 'copy',
);
if (result.success) {
return await _finalizeMp3Embed(mp3Path, tempOutput);
}
// If copy failed (e.g. AAC/Opus in .mp3 container), re-encode to real MP3
final output = result.output;
if (output.contains('Invalid audio stream') ||
output.contains('incorrect codec parameters')) {
_log.w('MP3 copy failed (codec mismatch), re-encoding with libmp3lame');
// Clean up failed temp file
try {
final tempFile = File(tempOutput);
if (await tempFile.exists()) await tempFile.delete();
} catch (_) {}
final reencodeOutput = _nextTempEmbedPath(tempDir.path, '.mp3');
result = await _runMp3Embed(
mp3Path: mp3Path,
tempOutput: reencodeOutput,
coverPath: coverPath,
metadata: metadata,
preserveMetadata: preserveMetadata,
audioCodec: 'libmp3lame',
audioBitrate: '192k', // AAC 128kbps ≈ MP3 192kbps equivalent
);
if (result.success) {
return await _finalizeMp3Embed(mp3Path, reencodeOutput);
}
// Clean up re-encode temp file
try {
final tempFile = File(reencodeOutput);
if (await tempFile.exists()) await tempFile.delete();
} catch (_) {}
_log.e('MP3 re-encode also failed: ${result.output}');
return null;
}
// Clean up temp file for other failures
try {
final tempFile = File(tempOutput);
if (await tempFile.exists()) await tempFile.delete();
} catch (e) {
_log.w('Failed to cleanup temp MP3 file: $e');
}
_log.e('MP3 Metadata/Cover embed failed: ${result.output}');
return null;
}
/// Build and execute FFmpeg arguments for MP3 metadata embedding.
static Future<FFmpegResult> _runMp3Embed({
required String mp3Path,
required String tempOutput,
String? coverPath,
Map<String, String>? metadata,
bool preserveMetadata = false,
required String audioCodec,
String? audioBitrate,
}) async {
final arguments = <String>['-v', 'error', '-hide_banner', '-i', mp3Path];
if (coverPath != null) {
@@ -1321,7 +1415,13 @@ class FFmpegService {
arguments
..add('-c:a')
..add('copy');
..add(audioCodec);
if (audioBitrate != null) {
arguments
..add('-b:a')
..add(audioBitrate);
}
if (metadata != null) {
_appendMappedMetadataToArguments(arguments, _convertToId3Tags(metadata));
@@ -1333,44 +1433,36 @@ class FFmpegService {
..add(tempOutput)
..add('-y');
_log.d('Executing FFmpeg MP3 embed command');
final result = await _executeWithArguments(arguments);
if (result.success) {
try {
final tempFile = File(tempOutput);
final originalFile = File(mp3Path);
if (await tempFile.exists()) {
if (await originalFile.exists()) {
await originalFile.delete();
}
await tempFile.copy(mp3Path);
await tempFile.delete();
_log.d('MP3 metadata embedded successfully');
return mp3Path;
} else {
_log.e('Temp MP3 output file not found: $tempOutput');
return null;
}
} catch (e) {
_log.e('Failed to replace MP3 file after metadata embed: $e');
return null;
}
}
_log.d('Executing FFmpeg MP3 embed command (codec: $audioCodec)');
return await _executeWithArguments(arguments);
}
/// Finalize MP3 embed by replacing the original file with the temp output.
static Future<String?> _finalizeMp3Embed(
String mp3Path,
String tempOutput,
) async {
try {
final tempFile = File(tempOutput);
final originalFile = File(mp3Path);
if (await tempFile.exists()) {
if (await originalFile.exists()) {
await originalFile.delete();
}
await tempFile.copy(mp3Path);
await tempFile.delete();
_log.d('MP3 metadata embedded successfully');
return mp3Path;
} else {
_log.e('Temp MP3 output file not found: $tempOutput');
return null;
}
} catch (e) {
_log.w('Failed to cleanup temp MP3 file: $e');
_log.e('Failed to replace MP3 file after metadata embed: $e');
return null;
}
_log.e('MP3 Metadata/Cover embed failed: ${result.output}');
return null;
}
static Future<String?> embedMetadataToOpus({