From 62dcffc2c9bf1d96e36ce44c2a8a19bd57ff614d Mon Sep 17 00:00:00 2001 From: Sam Fredrickson Date: Wed, 6 Dec 2023 18:26:41 -0800 Subject: [PATCH] Various improvements. --- datashake.go | 95 +++++++++++++++++++++++++++++++--------------------- 1 file changed, 57 insertions(+), 38 deletions(-) diff --git a/datashake.go b/datashake.go index e2d9c47..3282a8b 100644 --- a/datashake.go +++ b/datashake.go @@ -27,14 +27,14 @@ var ( ) func main() { + flag.Parse() + ctx, stop := signal.NotifyContext( context.Background(), syscall.SIGINT, syscall.SIGTERM, ) defer stop() - flag.Parse() - concurrency := *concurrency if concurrency < 1 { concurrency = runtime.GOMAXPROCS(0) @@ -46,27 +46,30 @@ func main() { os.Exit(1) } - running.Store(true) - go func() { - if err := filepath.WalkDir(*sourceDir, process); err != nil { - fmt.Println("error:", err) - } - pending.Wait() - close(errors) - }() + go run() -Loop: + errors := errors + actions := actions for { select { case err, ok := <-errors: - if ok { - db.Alert(err) - } else { - break Loop + if !ok { + errors = nil + break } + db.Alert(err) + case action, ok := <-actions: + if !ok { + actions = nil + break + } + db.Record(action) case <-ctx.Done(): running.Store(false) } + if errors == nil && actions == nil { + break + } } if err := saveDb(); err != nil { @@ -77,17 +80,29 @@ Loop: var ( running atomic.Bool - tasks *pool.Pool - pending sync.WaitGroup errors = make(chan Error) + actions = make(chan Action) db = DB{ - Processed: make(map[string]struct{}), + Seen: make(map[string]struct{}), } dbLock sync.Mutex ) +// run drives the directory traversal. +func run() { + defer func() { + close(errors) + close(actions) + }() + running.Store(true) + if err := filepath.WalkDir(*sourceDir, process); err != nil { + fmt.Println("error:", err) + } + tasks.Wait() +} + // process is a visitor for `filepath.WalkDir` that performs the rebalancing // algorithm against regular files. // @@ -100,9 +115,7 @@ func process(path string, d fs.DirEntry, err error) (typicallyNil error) { if err != nil || d.IsDir() || !d.Type().IsRegular() { return } - pending.Add(1) tasks.Go(func() { - defer pending.Done() if running.Load() { work(path, d) } @@ -136,7 +149,7 @@ func work(path string, d fs.DirEntry) { if err != nil { return } - if db.Contains(srcFilePath) { + if db.Knows(srcFilePath) { return } srcStat, err := os.Stat(srcFilePath) @@ -155,11 +168,7 @@ func work(path string, d fs.DirEntry) { safeToRemoveTemp := true defer func() { if !safeToRemoveTemp { - err := fmt.Errorf( - "%s may be lost in %s", - srcFilePath, tempDirPath, - ) - reportErr(err) + reportErr(missingFile) return } if err := os.RemoveAll(tempDirPath); err != nil { @@ -186,6 +195,8 @@ func work(path string, d fs.DirEntry) { db.Remember(srcFilePath) } +var missingFile = fmt.Errorf("file may be missing in temp directory") + // copy opens the file from the source path, then creates a copy of it at the // destination path. The mode, uid and gid bits from the source file are // replicated in the copy. @@ -228,47 +239,55 @@ func copy(srcPath, dstPath string) error { if err != nil { return err } - db.Record(Action{ + actions <- Action{ Source: srcPath, Destination: dstPath, - }) + } return nil } -// DB holds a set of files which have been rebalanced. -// -// These files are skipped on future runs of the program. +// DB stores information collected by the program. // // The database is loaded from a JSON file when the program starts and saved // back to that JSON file as the program finishes. type DB struct { - Processed map[string]struct{} - Log []Action - Errors []Error + // Seen is a set of all files which have been successfully re-balanced. + Seen map[string]struct{} + // Log stores every successful copy operation. + Log []Action + // Errors stores details on any error that occurs. + Errors []Error } +// Action details a file copy operation. type Action struct { Source string Destination string } +// Error describes a problem from re-balancing a file. type Error struct { - Message string + // Message contains a string for the underlying error's message. + Message string + // FilePath is the path of the file. FilePath string + // TempPath is the temporary directory. + // + // This may be blank, depending on when the error occurred. TempPath string } -func (db *DB) Contains(path string) bool { +func (db *DB) Knows(path string) bool { dbLock.Lock() defer dbLock.Unlock() - _, ok := db.Processed[path] + _, ok := db.Seen[path] return ok } func (db *DB) Remember(path string) { dbLock.Lock() defer dbLock.Unlock() - db.Processed[path] = struct{}{} + db.Seen[path] = struct{}{} } func (db *DB) Record(a Action) {