From 1d0c4acc6c2ead538b8c7d1b0770d37b3d5fed06 Mon Sep 17 00:00:00 2001
From: Sam Fredrickson
Date: Sat, 2 Dec 2023 18:08:29 -0800
Subject: [PATCH] Initial commit.

---
 .gitignore            |   3 +
 .vscode/settings.json |   5 +
 Makefile              |  13 +++
 README.md             |  66 ++++++++++++
 datashake.go          | 228 ++++++++++++++++++++++++++++++++++++++++++
 go.mod                |   3 +
 6 files changed, 318 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 .vscode/settings.json
 create mode 100644 Makefile
 create mode 100644 README.md
 create mode 100644 datashake.go
 create mode 100644 go.mod

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..aaa34ed
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+/datashake
+/release
+datashake.json
diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 0000000..907e121
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1,5 @@
+{
+    "cSpell.words": [
+        "datashake"
+    ]
+}
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..0a627c9
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,13 @@
+define build
+	GOOS=$(1) GOARCH=$(2) \
+	go build \
+	-o release/$(1)-$(2)/datashake
+endef
+
+.PHONY: all
+all:
+	$(call build,linux,amd64)
+
+.PHONY: clean
+clean:
+	$(RM) -r release
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..45f589d
--- /dev/null
+++ b/README.md
@@ -0,0 +1,66 @@
+# datashake - level out zpools by rewriting files
+
+## Background
+
+I have a niche problem: my storage server's ZFS pool is lumpy!
+ +``` +NAME SIZE ALLOC FREE FRAG CAP HEALTH +zones 32.6T 12.2T 20.4T 3% 37% ONLINE + mirror 3.62T 2.21T 1.41T 5% 61.1% ONLINE + c0t5000CCA25DE8EBF4d0 - - - - - ONLINE + c0t5000CCA25DEEC08Ad0 - - - - - ONLINE + mirror 3.62T 2.22T 1.40T 6% 61.3% ONLINE + c0t5000CCA25DE6FD92d0 - - - - - ONLINE + c0t5000CCA25DEEC738d0 - - - - - ONLINE + mirror 3.62T 2.28T 1.34T 6% 63.0% ONLINE + c0t5000CCA25DEAA3EEd0 - - - - - ONLINE + c0t5000CCA25DE6F42Ed0 - - - - - ONLINE + mirror 3.62T 2.29T 1.33T 5% 63.2% ONLINE + c0t5000CCA25DE9DB9Dd0 - - - - - ONLINE + c0t5000CCA25DEED5B7d0 - - - - - ONLINE + mirror 3.62T 2.29T 1.34T 5% 63.1% ONLINE + c0t5000CCA25DEB0F42d0 - - - - - ONLINE + c0t5000CCA25DEECB9Dd0 - - - - - ONLINE + mirror 3.62T 237G 3.39T 1% 6.38% ONLINE + c0t5000CCA24CF36876d0 - - - - - ONLINE + c0t5000CCA249D4AA59d0 - - - - - ONLINE + mirror 3.62T 236G 3.39T 0% 6.36% ONLINE + c0t5000CCA24CE9D1CAd0 - - - - - ONLINE + c0t5000CCA24CE954D2d0 - - - - - ONLINE + mirror 3.62T 228G 3.40T 0% 6.13% ONLINE + c0t5000CCA24CE8C60Ed0 - - - - - ONLINE + c0t5000CCA24CE9D249d0 - - - - - ONLINE + mirror 3.62T 220G 3.41T 0% 5.93% ONLINE + c0t5000CCA24CF80849d0 - - - - - ONLINE + c0t5000CCA24CF80838d0 - - - - - ONLINE +``` + +You can probably guess what happened: I had a zpool with five mirrors, and then +expanded it by adding four more mirrors. ZFS doesn't automatically rebalance +existing data, but does skew writes of new data so that more go to the newer +mirrors. + +To rebalance the data manually, the algorithm is straightforward: + + for file in dataset, + * copy the file to a temporary directory in another dataset + * delete the original file + * copy from the temporary directory to recreate the original file + * delete the temporary directory + +As the files get rewritten, not only do the newer mirrors get more full, but +also the older mirrors free up space. Eventually, the utilization of all mirrors +should converge. 
+
+## Solution
+
+The `datashake` program aims to automate the rebalancing process, while also
+adding some robustness and heuristics.
+
+* Gracefully handle shutdowns (e.g. Ctrl-c) to prevent files from getting lost.
+* Keep track of processed files, so that if the program stops and resumes, it
+  can skip those files.
+* Write a journal of operations so that, if shut down ungracefully, files in
+  the temporary directory can be identified and recovered.
+* Don't bother processing really small files.
diff --git a/datashake.go b/datashake.go
new file mode 100644
index 0000000..95bc91c
--- /dev/null
+++ b/datashake.go
@@ -0,0 +1,233 @@
+package main
+
+import (
+	"encoding/json"
+	"flag"
+	"fmt"
+	"io"
+	"io/fs"
+	"os"
+	"os/signal"
+	"path/filepath"
+	"sync/atomic"
+	"syscall"
+)
+
+func main() {
+	flag.Parse()
+
+	if err := loadDb(); err != nil {
+		fmt.Println("error", err)
+		os.Exit(1)
+	}
+
+	running.Store(true)
+	go func() {
+		if err := filepath.WalkDir(*sourceDir, process); err != nil {
+			errors <- err
+		}
+		close(errors)
+	}()
+
+	signals := make(chan os.Signal, 1)
+	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
+
+Loop:
+	for {
+		select {
+		case err, ok := <-errors:
+			if ok {
+				fmt.Println("error:", err)
+			} else {
+				break Loop
+			}
+		case <-signals:
+			running.Store(false)
+		}
+	}
+
+	if err := saveDb(); err != nil {
+		fmt.Println("error", err)
+		os.Exit(1)
+	}
+}
+
+var sourceDir = flag.String("source", "", "source data directory")
+var tempDir = flag.String("temp", "", "temporary storage directory")
+var dbPath = flag.String("db", "datashake.json", "database file path")
+var minimumSize = flag.Int64("min-size", 1024*1024, "minimum size in bytes")
+
+var errors = make(chan error)
+var db = DB{
+	Processed: make(map[string]struct{}),
+}
+var running atomic.Bool
+
+func loadDb() error {
+	if *dbPath == "" {
+		return nil
+	}
+	dbFile, err := os.Open(*dbPath)
+	if err != nil {
+		return nil
+	}
+	defer func() {
+		_ = dbFile.Close()
+	}()
+	d := json.NewDecoder(dbFile)
+	err = d.Decode(&db)
+	return err
+}
+
+func saveDb() error {
+	if *dbPath == "" {
+		return nil
+	}
+	dbFile, err := os.Create(*dbPath)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		_ = dbFile.Close()
+	}()
+	e := json.NewEncoder(dbFile)
+	err = e.Encode(&db)
+	return err
+}
+
+// process is a visitor for `filepath.WalkDir` that implements the rebalancing
+// algorithm.
+//
+// This function never returns an error, since that would stop the directory
+// walk. Instead, any errors are sent to the `errors` channel.
+func process(path string, d fs.DirEntry, err error) (alwaysNil error) {
+	if !running.Load() {
+		return fs.SkipAll
+	}
+
+	defer func() {
+		if err != nil {
+			errors <- err
+		}
+	}()
+
+	if err != nil || d.IsDir() {
+		return
+	}
+
+	srcFileName := d.Name()
+	srcFilePath, err := filepath.Abs(path)
+	if err != nil {
+		return
+	}
+	if db.Contains(srcFilePath) {
+		return
+	}
+	srcStat, err := os.Stat(srcFilePath)
+	if err != nil {
+		return
+	}
+	if srcStat.Size() < *minimumSize {
+		return
+	}
+
+	tempDirPath, err := os.MkdirTemp(*tempDir, "*")
+	if err != nil {
+		return
+	}
+	tempFilePath := filepath.Join(tempDirPath, srcFileName)
+	safeToRemoveTemp := true
+	defer func() {
+		if !safeToRemoveTemp {
+			err := fmt.Errorf(
+				"%s may be lost in %s",
+				srcFilePath, tempDirPath,
+			)
+			errors <- err
+			return
+		}
+		if err := os.RemoveAll(tempDirPath); err != nil {
+			errors <- err
+		}
+	}()
+
+	err = copy(srcFilePath, tempFilePath)
+	if err != nil {
+		return
+	}
+
+	safeToRemoveTemp = false
+	err = os.Remove(srcFilePath)
+	if err != nil {
+		return
+	}
+
+	err = copy(tempFilePath, srcFilePath)
+	if err != nil {
+		return
+	}
+	safeToRemoveTemp = true
+	db.Remember(srcFilePath)
+
+	return
+}
+
+// copy opens the file from the source path, then creates a copy of it at the
+// destination path. The mode, uid and gid bits from the source file are
+// replicated in the copy.
+func copy(srcPath, dstPath string) error {
+	fmt.Println("copying", srcPath, "to", dstPath)
+	srcFile, err := os.Open(srcPath)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		_ = srcFile.Close()
+	}()
+
+	dstFile, err := os.Create(dstPath)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		_ = dstFile.Close()
+	}()
+
+	srcStat, err := os.Stat(srcPath)
+	if err != nil {
+		return err
+	}
+	err = os.Chmod(dstPath, srcStat.Mode())
+	if err != nil {
+		return err
+	}
+	if sysStat, ok := srcStat.Sys().(*syscall.Stat_t); ok {
+		uid := int(sysStat.Uid)
+		gid := int(sysStat.Gid)
+		err = os.Chown(dstPath, uid, gid)
+		if err != nil {
+			return err
+		}
+	}
+
+	_, err = io.Copy(dstFile, srcFile)
+	if err != nil {
+		return err
+	}
+	// Check the error from Close: it reports delayed write failures, and
+	// the caller deletes the source file based on this function's result,
+	// so a silently failed flush could lose data.
+	return dstFile.Close()
+}
+
+type DB struct {
+	Processed map[string]struct{}
+}
+
+func (db *DB) Contains(path string) bool {
+	_, ok := db.Processed[path]
+	return ok
+}
+
+func (db *DB) Remember(path string) {
+	db.Processed[path] = struct{}{}
+}
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..333f7ed
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,3 @@
+module gogs.humancabbage.net/datashake
+
+go 1.21