From 962660d674e29b6e16661ca5a05c9878b038f8f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20H=C3=A4felfinger?= Date: Sun, 7 Apr 2019 23:31:54 +0200 Subject: [PATCH] made file change detection run as go routines and parallel --- internal/pkg/images/synchronizeLocalFiles.go | 45 ++++++++++++++++++-- 1 file changed, 41 insertions(+), 4 deletions(-) diff --git a/internal/pkg/images/synchronizeLocalFiles.go b/internal/pkg/images/synchronizeLocalFiles.go index 89d1794..7e3b873 100644 --- a/internal/pkg/images/synchronizeLocalFiles.go +++ b/internal/pkg/images/synchronizeLocalFiles.go @@ -11,6 +11,9 @@ import ( "github.com/sirupsen/logrus" "os" "path/filepath" + "runtime" + "sync" + "time" ) type fileChecksumCalculator func(filePath string) (string, error) @@ -39,12 +42,42 @@ func synchronizeLocalImageMetadataScanNewFiles(fileSystemNodes map[string]*local logrus.Debug("Entering synchronizeLocalImageMetadataScanNewFiles") defer logrus.Debug("Leaving synchronizeLocalImageMetadataScanNewFiles") + workQueue := make(chan localFileStructure.FilesystemNode, 128) + + wg := sync.WaitGroup{} + + wg.Add(1) + logrus.Debug("Starting change detection producer") + go checkFileForChangesProducer(fileSystemNodes, workQueue, &wg) + + for i := 0; i < runtime.NumCPU()-1; i++ { + logrus.Debugf("Starting image change detection worker %d", i) + wg.Add(1) + go checkFileForChangesWorker(workQueue, &wg, imageDb, categoryDb, checksumCalculator) + } + + wg.Wait() + return nil +} + +func checkFileForChangesProducer(fileSystemNodes map[string]*localFileStructure.FilesystemNode, workQueue chan<- localFileStructure.FilesystemNode, waitGroup *sync.WaitGroup) { for _, file := range fileSystemNodes { + workQueue <- *file + } + waitGroup.Done() + close(workQueue) +} + +func checkFileForChangesWorker(workQueue <-chan localFileStructure.FilesystemNode, waitGroup *sync.WaitGroup, imageDb datastore.ImageMetadataProvider, categoryDb datastore.CategoryProvider, checksumCalculator fileChecksumCalculator) { + for file := range workQueue { if file.IsDir { // we are only interested in files not directories + logrus.Tracef("Skipping file check as %s is a directory", file.Path) continue } + startTime := time.Now() + metadata, err := imageDb.ImageMetadata(file.Path) if err == datastore.ErrorRecordNotFound { logrus.Debugf("Creating new metadata entry for %s.", file.Path) @@ -62,10 +95,10 @@ func synchronizeLocalImageMetadataScanNewFiles(fileSystemNodes map[string]*local } else if err != nil { logrus.Errorf("Could not get metadata due to trouble. Cancelling - %s", err) - return err + continue } - if fileDidNotChange(&metadata, file) { + if fileDidNotChange(&metadata, &file) { logrus.Debugf("No changes found for file %s", file.Path) continue } @@ -81,10 +114,14 @@ func synchronizeLocalImageMetadataScanNewFiles(fileSystemNodes map[string]*local err = imageDb.SaveImageMetadata(metadata) if err != nil { - return err + logrus.Errorf("Error during save of metadata of %s - %s", file.Path, err) } + + stopTime := time.Now() + usedTime := stopTime.Sub(startTime) + logrus.Debugf("Processing file %s took %f s", file.Path, usedTime.Seconds()) } - return nil + waitGroup.Done() } func synchronizeLocalImageMetadataFindFilesToDelete(imageDb datastore.ImageMetadataProvider) error {