Various improvements.

This commit is contained in:
Sam Fredrickson 2023-12-06 18:26:41 -08:00
parent d062632ec8
commit 62dcffc2c9

View File

@ -27,14 +27,14 @@ var (
) )
func main() { func main() {
flag.Parse()
ctx, stop := signal.NotifyContext( ctx, stop := signal.NotifyContext(
context.Background(), context.Background(),
syscall.SIGINT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGTERM,
) )
defer stop() defer stop()
flag.Parse()
concurrency := *concurrency concurrency := *concurrency
if concurrency < 1 { if concurrency < 1 {
concurrency = runtime.GOMAXPROCS(0) concurrency = runtime.GOMAXPROCS(0)
@ -46,27 +46,30 @@ func main() {
os.Exit(1) os.Exit(1)
} }
running.Store(true) go run()
go func() {
if err := filepath.WalkDir(*sourceDir, process); err != nil {
fmt.Println("error:", err)
}
pending.Wait()
close(errors)
}()
Loop: errors := errors
actions := actions
for { for {
select { select {
case err, ok := <-errors: case err, ok := <-errors:
if ok { if !ok {
db.Alert(err) errors = nil
} else { break
break Loop
} }
db.Alert(err)
case action, ok := <-actions:
if !ok {
actions = nil
break
}
db.Record(action)
case <-ctx.Done(): case <-ctx.Done():
running.Store(false) running.Store(false)
} }
if errors == nil && actions == nil {
break
}
} }
if err := saveDb(); err != nil { if err := saveDb(); err != nil {
@ -77,17 +80,29 @@ Loop:
var ( var (
running atomic.Bool running atomic.Bool
tasks *pool.Pool tasks *pool.Pool
pending sync.WaitGroup
errors = make(chan Error) errors = make(chan Error)
actions = make(chan Action)
db = DB{ db = DB{
Processed: make(map[string]struct{}), Seen: make(map[string]struct{}),
} }
dbLock sync.Mutex dbLock sync.Mutex
) )
// run drives the directory traversal.
func run() {
defer func() {
close(errors)
close(actions)
}()
running.Store(true)
if err := filepath.WalkDir(*sourceDir, process); err != nil {
fmt.Println("error:", err)
}
tasks.Wait()
}
// process is a visitor for `filepath.WalkDir` that performs the rebalancing // process is a visitor for `filepath.WalkDir` that performs the rebalancing
// algorithm against regular files. // algorithm against regular files.
// //
@ -100,9 +115,7 @@ func process(path string, d fs.DirEntry, err error) (typicallyNil error) {
if err != nil || d.IsDir() || !d.Type().IsRegular() { if err != nil || d.IsDir() || !d.Type().IsRegular() {
return return
} }
pending.Add(1)
tasks.Go(func() { tasks.Go(func() {
defer pending.Done()
if running.Load() { if running.Load() {
work(path, d) work(path, d)
} }
@ -136,7 +149,7 @@ func work(path string, d fs.DirEntry) {
if err != nil { if err != nil {
return return
} }
if db.Contains(srcFilePath) { if db.Knows(srcFilePath) {
return return
} }
srcStat, err := os.Stat(srcFilePath) srcStat, err := os.Stat(srcFilePath)
@ -155,11 +168,7 @@ func work(path string, d fs.DirEntry) {
safeToRemoveTemp := true safeToRemoveTemp := true
defer func() { defer func() {
if !safeToRemoveTemp { if !safeToRemoveTemp {
err := fmt.Errorf( reportErr(missingFile)
"%s may be lost in %s",
srcFilePath, tempDirPath,
)
reportErr(err)
return return
} }
if err := os.RemoveAll(tempDirPath); err != nil { if err := os.RemoveAll(tempDirPath); err != nil {
@ -186,6 +195,8 @@ func work(path string, d fs.DirEntry) {
db.Remember(srcFilePath) db.Remember(srcFilePath)
} }
var missingFile = fmt.Errorf("file may be missing in temp directory")
// copy opens the file from the source path, then creates a copy of it at the // copy opens the file from the source path, then creates a copy of it at the
// destination path. The mode, uid and gid bits from the source file are // destination path. The mode, uid and gid bits from the source file are
// replicated in the copy. // replicated in the copy.
@ -228,47 +239,55 @@ func copy(srcPath, dstPath string) error {
if err != nil { if err != nil {
return err return err
} }
db.Record(Action{ actions <- Action{
Source: srcPath, Source: srcPath,
Destination: dstPath, Destination: dstPath,
}) }
return nil return nil
} }
// DB holds a set of files which have been rebalanced. // DB stores information collected by the program.
//
// These files are skipped on future runs of the program.
// //
// The database is loaded from a JSON file when the program starts and saved // The database is loaded from a JSON file when the program starts and saved
// back to that JSON file as the program finishes. // back to that JSON file as the program finishes.
type DB struct { type DB struct {
Processed map[string]struct{} // Seen is a set of all files which have been successfully re-balanced.
Log []Action Seen map[string]struct{}
Errors []Error // Log stores every successful copy operation.
Log []Action
// Errors stores details on any error that occurs.
Errors []Error
} }
// Action details a file copy operation.
type Action struct { type Action struct {
Source string Source string
Destination string Destination string
} }
// Error describes a problem from re-balancing a file.
type Error struct { type Error struct {
Message string // Message contains a string for the underlying error's message.
Message string
// FilePath is the path of the file.
FilePath string FilePath string
// TempPath is the temporary directory.
//
// This may be blank, depending on when the error occurred.
TempPath string TempPath string
} }
func (db *DB) Contains(path string) bool { func (db *DB) Knows(path string) bool {
dbLock.Lock() dbLock.Lock()
defer dbLock.Unlock() defer dbLock.Unlock()
_, ok := db.Processed[path] _, ok := db.Seen[path]
return ok return ok
} }
func (db *DB) Remember(path string) { func (db *DB) Remember(path string) {
dbLock.Lock() dbLock.Lock()
defer dbLock.Unlock() defer dbLock.Unlock()
db.Processed[path] = struct{}{} db.Seen[path] = struct{}{}
} }
func (db *DB) Record(a Action) { func (db *DB) Record(a Action) {