From dc117d703b1c59db2732b40fe9b624a967792ad2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20H=C3=A4felfinger?= Date: Mon, 11 Sep 2023 16:31:23 +0200 Subject: [PATCH] makes file indexing run in parallel --- PiwigoDirectorySync/Commands/ScanCommand.cs | 17 ++- PiwigoDirectorySync/Services/FileIndexer.cs | 113 ++++++++++-------- .../Services/FileSystemScanner.cs | 13 +- 3 files changed, 80 insertions(+), 63 deletions(-) diff --git a/PiwigoDirectorySync/Commands/ScanCommand.cs b/PiwigoDirectorySync/Commands/ScanCommand.cs index 73aed77..208596a 100644 --- a/PiwigoDirectorySync/Commands/ScanCommand.cs +++ b/PiwigoDirectorySync/Commands/ScanCommand.cs @@ -5,6 +5,7 @@ using System.Threading.Channels; using Microsoft.Extensions.Logging; using PiwigoDirectorySync.Infrastructure; using PiwigoDirectorySync.Services; +using Spectre.Console; using Spectre.Console.Cli; namespace PiwigoDirectorySync.Commands; @@ -31,25 +32,23 @@ internal class ScanCommand : CancellableAsyncCommand internal static async Task ScanDirectory(ILogger logger, IFileIndexer fileIndexer, IFileSystemScanner fileSystemScanner, int piwigoServerId, CancellationToken ct) { //TODO: check files for deletion -> files in db but no longer exist - logger.LogInformation("Starting scanner and remover"); var stopWatch = Stopwatch.StartNew(); - - var fileQueue = Channel.CreateUnbounded(); var indexerTask = fileIndexer.StartProcessingAsync(fileQueue, piwigoServerId, ct); - await fileSystemScanner.ScanAsync(fileQueue, piwigoServerId, ct); - - fileQueue.Writer.Complete(); - - await Task.WhenAll(fileQueue.Reader.Completion, indexerTask); + var scannerTask = fileSystemScanner.ScanAsync(fileQueue, piwigoServerId, ct); + await Task.WhenAll(scannerTask, indexerTask); stopWatch.Stop(); logger.LogInformation("Processed {IndexerTotalFilesScanned} image files in {ElapsedTotalSeconds} seconds", fileIndexer.TotalFilesScanned, stopWatch.Elapsed.TotalSeconds); - //TODO: write failed files to log + foreach (var failedFilePath in fileIndexer.FailedFiles) + { + AnsiConsole.MarkupLine($"[red]Failed to index file {failedFilePath}[/]"); + logger.LogError("Failed to index file {FailedFilePath}", failedFilePath); + } } [SuppressMessage("ReSharper", "UnusedAutoPropertyAccessor.Global")] diff --git a/PiwigoDirectorySync/Services/FileIndexer.cs b/PiwigoDirectorySync/Services/FileIndexer.cs index 26a71c0..7814920 100644 --- a/PiwigoDirectorySync/Services/FileIndexer.cs +++ b/PiwigoDirectorySync/Services/FileIndexer.cs @@ -1,5 +1,6 @@ using System.Threading.Channels; using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using PiwigoDirectorySync.Infrastructure; using PiwigoDirectorySync.Persistence; @@ -10,12 +11,12 @@ internal class FileIndexer : IFileIndexer { private readonly IList _failedFiles = new List(); private readonly ILogger _logger; - private readonly PersistenceContext _persistenceContext; + private readonly IServiceProvider _serviceProvider; - public FileIndexer(ILogger logger, PersistenceContext persistenceContext) + public FileIndexer(ILogger logger, IServiceProvider serviceProvider) { _logger = logger; - _persistenceContext = persistenceContext; + _serviceProvider = serviceProvider; } public int TotalFilesScanned { get; private set; } @@ -23,58 +24,68 @@ internal class FileIndexer : IFileIndexer public async Task StartProcessingAsync(Channel fileQueue, int piwigoServerId, CancellationToken ct) { - var piwigoServer = await _persistenceContext.PiwigoServers.GetByIdAsync(piwigoServerId, ct); - - await foreach (var fullFilePath in fileQueue.Reader.ReadAllAsync(ct)) + var parallelOptions = new ParallelOptions { - try + MaxDegreeOfParallelism = Environment.ProcessorCount, + CancellationToken = ct + }; + await Parallel.ForEachAsync(fileQueue.Reader.ReadAllAsync(ct), parallelOptions, + async (fullFilePath, token) => { await ProcessFileAsync(fullFilePath, piwigoServerId, token); }); + } + + private async Task ProcessFileAsync(string fullFilePath, int piwigoServerId, CancellationToken ct) + { + try + { + if (ct.IsCancellationRequested) { - if (ct.IsCancellationRequested) - { - _logger.LogWarning("Indexing cancelled"); - break; - } - - _logger.LogInformation("Indexing file {FullFilePath}", fullFilePath); - var fileInfo = new FileInfo(fullFilePath); - - if (!fileInfo.Exists) - { - _logger.LogWarning("File {FullFilePath} not found", fullFilePath); - _failedFiles.Add(fullFilePath); - continue; - } - - var relativePath = Path.GetRelativePath(piwigoServer.RootDirectory, fullFilePath); - - var album = await GetOrAddAlbumAsync(piwigoServer, fileInfo.Directory!, ct); - - var image = await GetOrAddImageAsync(album, relativePath, ct); - - if (image.LastChange != fileInfo.LastWriteTimeUtc) - { - image.UploadRequired = true; - image.Md5Sum = await FilesystemHelpers.CalculateMd5SumAsync(fullFilePath, ct); - } - - image.DeleteRequired = false; - image.LastChange = fileInfo.LastWriteTimeUtc; - - await _persistenceContext.SaveChangesAsync(ct); - - TotalFilesScanned++; + _logger.LogWarning("Indexing cancelled"); + return; } - catch (Exception ex) + + _logger.LogInformation("Indexing file {FullFilePath}", fullFilePath); + var fileInfo = new FileInfo(fullFilePath); + + if (!fileInfo.Exists) { + _logger.LogWarning("File {FullFilePath} not found", fullFilePath); _failedFiles.Add(fullFilePath); - _logger.LogError(ex, "could not delete file {FullFilePath}", fullFilePath); + return; } + + await using var scope = _serviceProvider.CreateAsyncScope(); + await using var db = scope.ServiceProvider.GetRequiredService(); + var piwigoServer = await db.PiwigoServers.GetByIdAsync(piwigoServerId, ct); + + var relativePath = Path.GetRelativePath(piwigoServer.RootDirectory, fullFilePath); + + var album = await GetOrAddAlbumAsync(db, piwigoServer, fileInfo.Directory!, ct); + + var image = await GetOrAddImageAsync(db, album, relativePath, ct); + + if (image.LastChange != fileInfo.LastWriteTimeUtc) + { + image.UploadRequired = true; + image.Md5Sum = await FilesystemHelpers.CalculateMd5SumAsync(fullFilePath, ct); + } + + image.DeleteRequired = false; + image.LastChange = fileInfo.LastWriteTimeUtc; + + await db.SaveChangesAsync(ct); + + TotalFilesScanned++; + } + catch (Exception ex) + { + _failedFiles.Add(fullFilePath); + _logger.LogError(ex, "could not index file {FullFilePath}", fullFilePath); } } - private async Task GetOrAddImageAsync(AlbumEntity album, string relativePath, CancellationToken ct) + private static async Task GetOrAddImageAsync(PersistenceContext db, AlbumEntity album, string relativePath, CancellationToken ct) { - var imageEntity = await _persistenceContext.PiwigoImages.Where(i => i.AlbumId == album.Id && i.FilePath == relativePath).FirstOrDefaultAsync(ct); + var imageEntity = await db.PiwigoImages.Where(i => i.AlbumId == album.Id && i.FilePath == relativePath).FirstOrDefaultAsync(ct); if (imageEntity is null) { imageEntity = new ImageEntity @@ -85,16 +96,16 @@ internal class FileIndexer : IFileIndexer UploadRequired = true, DeleteRequired = false }; - _persistenceContext.PiwigoImages.Add(imageEntity); + db.PiwigoImages.Add(imageEntity); } return imageEntity; } - private async Task GetOrAddAlbumAsync(ServerEntity server, DirectoryInfo directory, CancellationToken ct) + private static async Task GetOrAddAlbumAsync(PersistenceContext db, ServerEntity server, DirectoryInfo directory, CancellationToken ct) { var albumPath = Path.GetRelativePath(server.RootDirectory, directory.FullName); - var album = await _persistenceContext.PiwigoAlbums.FindByServerAndPathAsync(server.Id, albumPath, ct); + var album = await db.PiwigoAlbums.FindByServerAndPathAsync(server.Id, albumPath, ct); if (album != null) { return album; @@ -107,7 +118,7 @@ internal class FileIndexer : IFileIndexer } else { - parentAlbum = await GetOrAddAlbumAsync(server, directory.Parent!, ct); + parentAlbum = await GetOrAddAlbumAsync(db, server, directory.Parent!, ct); } album = new AlbumEntity @@ -119,9 +130,9 @@ internal class FileIndexer : IFileIndexer ParentId = parentAlbum?.Id, Parent = parentAlbum }; - _persistenceContext.PiwigoAlbums.Add(album); + db.PiwigoAlbums.Add(album); - await _persistenceContext.SaveChangesAsync(ct); + await db.SaveChangesAsync(ct); return album; } diff --git a/PiwigoDirectorySync/Services/FileSystemScanner.cs b/PiwigoDirectorySync/Services/FileSystemScanner.cs index a7e235b..ba44118 100644 --- a/PiwigoDirectorySync/Services/FileSystemScanner.cs +++ b/PiwigoDirectorySync/Services/FileSystemScanner.cs @@ -18,10 +18,17 @@ internal class FileSystemScanner : IFileSystemScanner public async Task ScanAsync(Channel fileQueue, int piwigoServerId, CancellationToken ct) { - var piwigoServer = await _persistenceContext.PiwigoServers.GetByIdAsync(piwigoServerId, ct); - _logger.LogInformation("Scanning files for piwigo server {PiwigoServerName} in directory {PiwigoServerRootDirectory}", piwigoServer.Name, piwigoServer.RootDirectory); + try + { + var piwigoServer = await _persistenceContext.PiwigoServers.GetByIdAsync(piwigoServerId, ct); + _logger.LogInformation("Scanning files for piwigo server {PiwigoServerName} in directory {PiwigoServerRootDirectory}", piwigoServer.Name, piwigoServer.RootDirectory); - await ScanRootDirectory(fileQueue, new DirectoryInfo(piwigoServer.RootDirectory), ct); + await ScanRootDirectory(fileQueue, new DirectoryInfo(piwigoServer.RootDirectory), ct); + } + finally + { + fileQueue.Writer.Complete(); + } } private async ValueTask ScanRootDirectory(Channel fileQueue, DirectoryInfo directory, CancellationToken ct)