From e1c1f9578a16e33d3e4d85ae5f076abfca4fdf31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20H=C3=A4felfinger?= Date: Sun, 3 Mar 2019 22:35:24 +0100 Subject: [PATCH] made local image preparations use channels and go routines to speed it up and use all cpu cores to calculate the md5 sums --- internal/pkg/localFileStructure/imageList.go | 100 ++++++++++++++++--- 1 file changed, 86 insertions(+), 14 deletions(-) diff --git a/internal/pkg/localFileStructure/imageList.go b/internal/pkg/localFileStructure/imageList.go index 72ae3e0..6d3c2a1 100644 --- a/internal/pkg/localFileStructure/imageList.go +++ b/internal/pkg/localFileStructure/imageList.go @@ -3,33 +3,105 @@ package localFileStructure import ( "github.com/sirupsen/logrus" "path/filepath" + "runtime" + "sync" "time" ) func GetImageList(fileSystem map[string]*FilesystemNode) ([]*ImageNode, error) { + logrus.Debugln("Starting GetImageList to prepare local image metadata.") + imageFiles := make([]*ImageNode, 0, len(fileSystem)) - for _, file := range fileSystem { - if file.IsDir { - continue - } + finished := make(chan bool, 1) + errChannel := make(chan error, 1) + queue := make(chan *FilesystemNode, 100) + results := make(chan *ImageNode, 200) + waitGroup := sync.WaitGroup{} - md5sum, err := calculateFileCheckSums(file.Path) + go resultCollector(results, &imageFiles) + + waitGroup.Add(1) + go queueProducer(fileSystem, queue, &waitGroup) + + numberOfCPUs := runtime.NumCPU() + for i := 0; i < numberOfCPUs; i++ { + logrus.Tracef("Starting getImageNodeWorker number %d", i) + waitGroup.Add(1) + go getImageNodeWorker(queue, results, errChannel, &waitGroup) + } + + go func() { + waitGroup.Wait() + + logrus.Debugln("All workers finished processing, closing channels.") + + close(results) + close(finished) + }() + + select { + case <-finished: + case err := <-errChannel: if err != nil { + logrus.Errorf("Error during local image processing: %S", err) return nil, err } - - logrus.Debugf("Local Image %s - %s - %s", md5sum, file.ModTime.Format(time.RFC3339), file.Path) - - imageFiles = append(imageFiles, &ImageNode{ - Path: file.Path, - CategoryName: filepath.Dir(file.Key), - ModTime: file.ModTime, - Md5Sum: md5sum, - }) } logrus.Infof("Found %d local images to process", len(imageFiles)) return imageFiles, nil } + +func queueProducer(fileSystem map[string]*FilesystemNode, queue chan *FilesystemNode, waitGroup *sync.WaitGroup) { + logrus.Debugln("Starting queueProducer to fill the queue of the files to check and calculate the checksum") + for _, file := range fileSystem { + if file.IsDir { + continue + } + queue <- file + } + + // after the last item is in the queue, we close it as there will be no more and we like + // the workers to exit. + close(queue) + + logrus.Debugln("Finished queueProducer") + + waitGroup.Done() +} + +func resultCollector(results chan *ImageNode, imageFiles *[]*ImageNode) { + logrus.Debugln("Starting image node result collector") + for imageNode := range results { + logrus.Debugf("Local Image prepared - %s - %s - %s", imageNode.Md5Sum, imageNode.ModTime.Format(time.RFC3339), imageNode.Path) + *imageFiles = append(*imageFiles, imageNode) + } + logrus.Debugln("Finished resultCollector") + +} + +func getImageNodeWorker(queue chan *FilesystemNode, results chan *ImageNode, errChannel chan error, waitGroup *sync.WaitGroup) { + logrus.Debugln("Starting image file worker to gather local image informations") + for file := range queue { + md5sum, err := calculateFileCheckSums(file.Path) + if err != nil { + errChannel <- err + // we try the next image in the queue, as this might be just one error + continue + } + + imageNode := &ImageNode{ + Path: file.Path, + CategoryName: filepath.Dir(file.Key), + ModTime: file.ModTime, + Md5Sum: md5sum, + } + + results <- imageNode + } + + logrus.Debugln("Finished getImageNodeWorker") + waitGroup.Done() +}