makes file indexing run in parallel
All checks were successful
PiwigoDirectorySync/pipeline/head This commit looks good

This commit is contained in:
Philipp Häfelfinger 2023-09-11 16:31:23 +02:00
parent 6f0e5e06a9
commit dc117d703b
3 changed files with 80 additions and 63 deletions

View File

@ -5,6 +5,7 @@ using System.Threading.Channels;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using PiwigoDirectorySync.Infrastructure; using PiwigoDirectorySync.Infrastructure;
using PiwigoDirectorySync.Services; using PiwigoDirectorySync.Services;
using Spectre.Console;
using Spectre.Console.Cli; using Spectre.Console.Cli;
namespace PiwigoDirectorySync.Commands; namespace PiwigoDirectorySync.Commands;
@ -31,25 +32,23 @@ internal class ScanCommand : CancellableAsyncCommand<ScanCommand.ScanSettings>
internal static async Task ScanDirectory(ILogger logger, IFileIndexer fileIndexer, IFileSystemScanner fileSystemScanner, int piwigoServerId, CancellationToken ct) internal static async Task ScanDirectory(ILogger logger, IFileIndexer fileIndexer, IFileSystemScanner fileSystemScanner, int piwigoServerId, CancellationToken ct)
{ {
//TODO: check files for deletion -> files in db but no longer exist //TODO: check files for deletion -> files in db but no longer exist
logger.LogInformation("Starting scanner and remover"); logger.LogInformation("Starting scanner and remover");
var stopWatch = Stopwatch.StartNew(); var stopWatch = Stopwatch.StartNew();
var fileQueue = Channel.CreateUnbounded<string>(); var fileQueue = Channel.CreateUnbounded<string>();
var indexerTask = fileIndexer.StartProcessingAsync(fileQueue, piwigoServerId, ct); var indexerTask = fileIndexer.StartProcessingAsync(fileQueue, piwigoServerId, ct);
await fileSystemScanner.ScanAsync(fileQueue, piwigoServerId, ct); var scannerTask = fileSystemScanner.ScanAsync(fileQueue, piwigoServerId, ct);
await Task.WhenAll(scannerTask, indexerTask);
fileQueue.Writer.Complete();
await Task.WhenAll(fileQueue.Reader.Completion, indexerTask);
stopWatch.Stop(); stopWatch.Stop();
logger.LogInformation("Processed {IndexerTotalFilesScanned} image files in {ElapsedTotalSeconds} seconds", fileIndexer.TotalFilesScanned, stopWatch.Elapsed.TotalSeconds); logger.LogInformation("Processed {IndexerTotalFilesScanned} image files in {ElapsedTotalSeconds} seconds", fileIndexer.TotalFilesScanned, stopWatch.Elapsed.TotalSeconds);
//TODO: write failed files to log foreach (var failedFilePath in fileIndexer.FailedFiles)
{
AnsiConsole.MarkupLine($"[red]Failed to index file {failedFilePath}[/]");
logger.LogError("Failed to index file {FailedFilePath}", failedFilePath);
}
} }
[SuppressMessage("ReSharper", "UnusedAutoPropertyAccessor.Global")] [SuppressMessage("ReSharper", "UnusedAutoPropertyAccessor.Global")]

View File

@ -1,5 +1,6 @@
using System.Threading.Channels; using System.Threading.Channels;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using PiwigoDirectorySync.Infrastructure; using PiwigoDirectorySync.Infrastructure;
using PiwigoDirectorySync.Persistence; using PiwigoDirectorySync.Persistence;
@ -10,12 +11,12 @@ internal class FileIndexer : IFileIndexer
{ {
private readonly IList<string> _failedFiles = new List<string>(); private readonly IList<string> _failedFiles = new List<string>();
private readonly ILogger<FileIndexer> _logger; private readonly ILogger<FileIndexer> _logger;
private readonly PersistenceContext _persistenceContext; private readonly IServiceProvider _serviceProvider;
public FileIndexer(ILogger<FileIndexer> logger, PersistenceContext persistenceContext) public FileIndexer(ILogger<FileIndexer> logger, IServiceProvider serviceProvider)
{ {
_logger = logger; _logger = logger;
_persistenceContext = persistenceContext; _serviceProvider = serviceProvider;
} }
public int TotalFilesScanned { get; private set; } public int TotalFilesScanned { get; private set; }
@ -23,16 +24,23 @@ internal class FileIndexer : IFileIndexer
public async Task StartProcessingAsync(Channel<string> fileQueue, int piwigoServerId, CancellationToken ct) public async Task StartProcessingAsync(Channel<string> fileQueue, int piwigoServerId, CancellationToken ct)
{ {
var piwigoServer = await _persistenceContext.PiwigoServers.GetByIdAsync(piwigoServerId, ct); var parallelOptions = new ParallelOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount,
CancellationToken = ct
};
await Parallel.ForEachAsync(fileQueue.Reader.ReadAllAsync(ct), parallelOptions,
async (fullFilePath, token) => { await ProcessFileAsync(fullFilePath, piwigoServerId, token); });
}
await foreach (var fullFilePath in fileQueue.Reader.ReadAllAsync(ct)) private async Task ProcessFileAsync(string fullFilePath, int piwigoServerId, CancellationToken ct)
{ {
try try
{ {
if (ct.IsCancellationRequested) if (ct.IsCancellationRequested)
{ {
_logger.LogWarning("Indexing cancelled"); _logger.LogWarning("Indexing cancelled");
break; return;
} }
_logger.LogInformation("Indexing file {FullFilePath}", fullFilePath); _logger.LogInformation("Indexing file {FullFilePath}", fullFilePath);
@ -42,14 +50,18 @@ internal class FileIndexer : IFileIndexer
{ {
_logger.LogWarning("File {FullFilePath} not found", fullFilePath); _logger.LogWarning("File {FullFilePath} not found", fullFilePath);
_failedFiles.Add(fullFilePath); _failedFiles.Add(fullFilePath);
continue; return;
} }
await using var scope = _serviceProvider.CreateAsyncScope();
await using var db = scope.ServiceProvider.GetRequiredService<PersistenceContext>();
var piwigoServer = await db.PiwigoServers.GetByIdAsync(piwigoServerId, ct);
var relativePath = Path.GetRelativePath(piwigoServer.RootDirectory, fullFilePath); var relativePath = Path.GetRelativePath(piwigoServer.RootDirectory, fullFilePath);
var album = await GetOrAddAlbumAsync(piwigoServer, fileInfo.Directory!, ct); var album = await GetOrAddAlbumAsync(db, piwigoServer, fileInfo.Directory!, ct);
var image = await GetOrAddImageAsync(album, relativePath, ct); var image = await GetOrAddImageAsync(db, album, relativePath, ct);
if (image.LastChange != fileInfo.LastWriteTimeUtc) if (image.LastChange != fileInfo.LastWriteTimeUtc)
{ {
@ -60,21 +72,20 @@ internal class FileIndexer : IFileIndexer
image.DeleteRequired = false; image.DeleteRequired = false;
image.LastChange = fileInfo.LastWriteTimeUtc; image.LastChange = fileInfo.LastWriteTimeUtc;
await _persistenceContext.SaveChangesAsync(ct); await db.SaveChangesAsync(ct);
TotalFilesScanned++; TotalFilesScanned++;
} }
catch (Exception ex) catch (Exception ex)
{ {
_failedFiles.Add(fullFilePath); _failedFiles.Add(fullFilePath);
_logger.LogError(ex, "could not delete file {FullFilePath}", fullFilePath); _logger.LogError(ex, "could not index file {FullFilePath}", fullFilePath);
}
} }
} }
private async Task<ImageEntity> GetOrAddImageAsync(AlbumEntity album, string relativePath, CancellationToken ct) private static async Task<ImageEntity> GetOrAddImageAsync(PersistenceContext db, AlbumEntity album, string relativePath, CancellationToken ct)
{ {
var imageEntity = await _persistenceContext.PiwigoImages.Where(i => i.AlbumId == album.Id && i.FilePath == relativePath).FirstOrDefaultAsync(ct); var imageEntity = await db.PiwigoImages.Where(i => i.AlbumId == album.Id && i.FilePath == relativePath).FirstOrDefaultAsync(ct);
if (imageEntity is null) if (imageEntity is null)
{ {
imageEntity = new ImageEntity imageEntity = new ImageEntity
@ -85,16 +96,16 @@ internal class FileIndexer : IFileIndexer
UploadRequired = true, UploadRequired = true,
DeleteRequired = false DeleteRequired = false
}; };
_persistenceContext.PiwigoImages.Add(imageEntity); db.PiwigoImages.Add(imageEntity);
} }
return imageEntity; return imageEntity;
} }
private async Task<AlbumEntity> GetOrAddAlbumAsync(ServerEntity server, DirectoryInfo directory, CancellationToken ct) private static async Task<AlbumEntity> GetOrAddAlbumAsync(PersistenceContext db, ServerEntity server, DirectoryInfo directory, CancellationToken ct)
{ {
var albumPath = Path.GetRelativePath(server.RootDirectory, directory.FullName); var albumPath = Path.GetRelativePath(server.RootDirectory, directory.FullName);
var album = await _persistenceContext.PiwigoAlbums.FindByServerAndPathAsync(server.Id, albumPath, ct); var album = await db.PiwigoAlbums.FindByServerAndPathAsync(server.Id, albumPath, ct);
if (album != null) if (album != null)
{ {
return album; return album;
@ -107,7 +118,7 @@ internal class FileIndexer : IFileIndexer
} }
else else
{ {
parentAlbum = await GetOrAddAlbumAsync(server, directory.Parent!, ct); parentAlbum = await GetOrAddAlbumAsync(db, server, directory.Parent!, ct);
} }
album = new AlbumEntity album = new AlbumEntity
@ -119,9 +130,9 @@ internal class FileIndexer : IFileIndexer
ParentId = parentAlbum?.Id, ParentId = parentAlbum?.Id,
Parent = parentAlbum Parent = parentAlbum
}; };
_persistenceContext.PiwigoAlbums.Add(album); db.PiwigoAlbums.Add(album);
await _persistenceContext.SaveChangesAsync(ct); await db.SaveChangesAsync(ct);
return album; return album;
} }

View File

@ -17,12 +17,19 @@ internal class FileSystemScanner : IFileSystemScanner
} }
public async Task ScanAsync(Channel<string> fileQueue, int piwigoServerId, CancellationToken ct) public async Task ScanAsync(Channel<string> fileQueue, int piwigoServerId, CancellationToken ct)
{
try
{ {
var piwigoServer = await _persistenceContext.PiwigoServers.GetByIdAsync(piwigoServerId, ct); var piwigoServer = await _persistenceContext.PiwigoServers.GetByIdAsync(piwigoServerId, ct);
_logger.LogInformation("Scanning files for piwigo server {PiwigoServerName} in directory {PiwigoServerRootDirectory}", piwigoServer.Name, piwigoServer.RootDirectory); _logger.LogInformation("Scanning files for piwigo server {PiwigoServerName} in directory {PiwigoServerRootDirectory}", piwigoServer.Name, piwigoServer.RootDirectory);
await ScanRootDirectory(fileQueue, new DirectoryInfo(piwigoServer.RootDirectory), ct); await ScanRootDirectory(fileQueue, new DirectoryInfo(piwigoServer.RootDirectory), ct);
} }
finally
{
fileQueue.Writer.Complete();
}
}
private async ValueTask ScanRootDirectory(Channel<string> fileQueue, DirectoryInfo directory, CancellationToken ct) private async ValueTask ScanRootDirectory(Channel<string> fileQueue, DirectoryInfo directory, CancellationToken ct)
{ {