made file change detection run as go routines and parallel

This commit is contained in:
Philipp Häfelfinger 2019-04-07 23:31:54 +02:00
parent 56e21b2d90
commit 962660d674
1 changed files with 41 additions and 4 deletions

View File

@ -11,6 +11,9 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"os" "os"
"path/filepath" "path/filepath"
"runtime"
"sync"
"time"
) )
type fileChecksumCalculator func(filePath string) (string, error) type fileChecksumCalculator func(filePath string) (string, error)
@ -39,12 +42,42 @@ func synchronizeLocalImageMetadataScanNewFiles(fileSystemNodes map[string]*local
logrus.Debug("Entering synchronizeLocalImageMetadataScanNewFiles") logrus.Debug("Entering synchronizeLocalImageMetadataScanNewFiles")
defer logrus.Debug("Leaving synchronizeLocalImageMetadataScanNewFiles") defer logrus.Debug("Leaving synchronizeLocalImageMetadataScanNewFiles")
workQueue := make(chan localFileStructure.FilesystemNode, 128)
wg := sync.WaitGroup{}
wg.Add(1)
logrus.Debug("Starting change detection producer")
go checkFileForChangesProducer(fileSystemNodes, workQueue, &wg)
for i := 0; i < runtime.NumCPU()-1; i++ {
logrus.Debugf("Starting image change detection worker %d", i)
wg.Add(1)
go checkFileForChangesWorker(workQueue, &wg, imageDb, categoryDb, checksumCalculator)
}
wg.Wait()
return nil
}
func checkFileForChangesProducer(fileSystemNodes map[string]*localFileStructure.FilesystemNode, workQueue chan<- localFileStructure.FilesystemNode, waitGroup *sync.WaitGroup) {
for _, file := range fileSystemNodes { for _, file := range fileSystemNodes {
workQueue <- *file
}
waitGroup.Done()
close(workQueue)
}
func checkFileForChangesWorker(workQueue <-chan localFileStructure.FilesystemNode, waitGroup *sync.WaitGroup, imageDb datastore.ImageMetadataProvider, categoryDb datastore.CategoryProvider, checksumCalculator fileChecksumCalculator) {
for file := range workQueue {
if file.IsDir { if file.IsDir {
// we are only interested in files not directories // we are only interested in files not directories
logrus.Tracef("Skipping file check as %s is a directory", file.Path)
continue continue
} }
startTime := time.Now()
metadata, err := imageDb.ImageMetadata(file.Path) metadata, err := imageDb.ImageMetadata(file.Path)
if err == datastore.ErrorRecordNotFound { if err == datastore.ErrorRecordNotFound {
logrus.Debugf("Creating new metadata entry for %s.", file.Path) logrus.Debugf("Creating new metadata entry for %s.", file.Path)
@ -62,10 +95,10 @@ func synchronizeLocalImageMetadataScanNewFiles(fileSystemNodes map[string]*local
} else if err != nil { } else if err != nil {
logrus.Errorf("Could not get metadata due to trouble. Cancelling - %s", err) logrus.Errorf("Could not get metadata due to trouble. Cancelling - %s", err)
return err continue
} }
if fileDidNotChange(&metadata, file) { if fileDidNotChange(&metadata, &file) {
logrus.Debugf("No changes found for file %s", file.Path) logrus.Debugf("No changes found for file %s", file.Path)
continue continue
} }
@ -81,10 +114,14 @@ func synchronizeLocalImageMetadataScanNewFiles(fileSystemNodes map[string]*local
err = imageDb.SaveImageMetadata(metadata) err = imageDb.SaveImageMetadata(metadata)
if err != nil { if err != nil {
return err logrus.Errorf("Error during save of metadata of %s - %s", file.Path, err)
} }
stopTime := time.Now()
usedTime := stopTime.Sub(startTime)
logrus.Debugf("Processing file %s took %f s", file.Path, usedTime.Seconds())
} }
return nil waitGroup.Done()
} }
func synchronizeLocalImageMetadataFindFilesToDelete(imageDb datastore.ImageMetadataProvider) error { func synchronizeLocalImageMetadataFindFilesToDelete(imageDb datastore.ImageMetadataProvider) error {