From d062632ec887cb74e86f29ae24e1f15a07ef71ff Mon Sep 17 00:00:00 2001 From: Sam Fredrickson Date: Mon, 4 Dec 2023 19:48:27 -0800 Subject: [PATCH] Store copy actions and errors in DB. --- README.md | 12 +++++----- datashake.go | 62 ++++++++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 59 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 45f589d..2c73b71 100644 --- a/README.md +++ b/README.md @@ -41,13 +41,13 @@ expanded it by adding four more mirrors. ZFS doesn't automatically rebalance existing data, but does skew writes of new data so that more go to the newer mirrors. -To rebalance the data manually, the algorithm is straightforward: +To rebalance the data, the algorithm is straightforward: - for file in dataset, - * copy the file to a temporary directory in another dataset - * delete the original file - * copy from the temporary directory to recreate the original file - * delete the temporary directory +* for file in dataset, + * copy the file to a temporary directory in another dataset + * delete the original file + * copy from the temporary directory to recreate the original file + * delete the temporary directory As the files get rewritten, not only do the newer mirrors get more full, but also the older mirrors free up space. Eventually, the utilization of all mirrors diff --git a/datashake.go b/datashake.go index f6f67f1..e2d9c47 100644 --- a/datashake.go +++ b/datashake.go @@ -49,7 +49,7 @@ func main() { running.Store(true) go func() { if err := filepath.WalkDir(*sourceDir, process); err != nil { - errors <- err + fmt.Println("error:", err) } pending.Wait() close(errors) @@ -60,7 +60,7 @@ Loop: select { case err, ok := <-errors: if ok { - fmt.Println("error:", err) + db.Alert(err) } else { break Loop } @@ -80,7 +80,7 @@ var ( tasks *pool.Pool pending sync.WaitGroup - errors = make(chan error) + errors = make(chan Error) db = DB{ Processed: make(map[string]struct{}), @@ -113,14 +113,26 @@ func process(path string, d fs.DirEntry, err error) (typicallyNil error) { // work rebalances a single file. func work(path string, d fs.DirEntry) { var err error + var srcFilePath = path + var tempFilePath string + + reportErr := func(reported error) { + e := Error{ + Message: reported.Error(), + FilePath: srcFilePath, + TempPath: tempFilePath, + } + errors <- e + } + defer func() { if err != nil { - errors <- err + reportErr(err) } }() srcFileName := d.Name() - srcFilePath, err := filepath.Abs(path) + srcFilePath, err = filepath.Abs(path) if err != nil { return } @@ -139,7 +151,7 @@ func work(path string, d fs.DirEntry) { if err != nil { return } - tempFilePath := filepath.Join(tempDirPath, srcFileName) + tempFilePath = filepath.Join(tempDirPath, srcFileName) safeToRemoveTemp := true defer func() { if !safeToRemoveTemp { @@ -147,11 +159,11 @@ func work(path string, d fs.DirEntry) { "%s may be lost in %s", srcFilePath, tempDirPath, ) - errors <- err + reportErr(err) return } if err := os.RemoveAll(tempDirPath); err != nil { - errors <- err + reportErr(err) } }() @@ -213,7 +225,14 @@ func copy(srcPath, dstPath string) error { } _, err = io.Copy(dstFile, srcFile) - return err + if err != nil { + return err + } + db.Record(Action{ + Source: srcPath, + Destination: dstPath, + }) + return nil } // DB holds a set of files which have been rebalanced. @@ -224,6 +243,19 @@ func copy(srcPath, dstPath string) error { // back to that JSON file as the program finishes. type DB struct { Processed map[string]struct{} + Log []Action + Errors []Error +} + +type Action struct { + Source string + Destination string +} + +type Error struct { + Message string + FilePath string + TempPath string } func (db *DB) Contains(path string) bool { @@ -239,6 +271,18 @@ func (db *DB) Remember(path string) { db.Processed[path] = struct{}{} } +func (db *DB) Record(a Action) { + dbLock.Lock() + defer dbLock.Unlock() + db.Log = append(db.Log, a) +} + +func (db *DB) Alert(e Error) { + dbLock.Lock() + defer dbLock.Unlock() + db.Errors = append(db.Errors, e) +} + func loadDb() error { if *dbPath == "" { return nil