datashake/datashake.go

229 lines
4.0 KiB
Go
Raw Normal View History

2023-12-03 02:08:29 +00:00
package main
import (
"encoding/json"
"flag"
"fmt"
"io"
"io/fs"
"os"
"os/signal"
"path/filepath"
"sync/atomic"
"syscall"
)
func main() {
flag.Parse()
if err := loadDb(); err != nil {
fmt.Println("error", err)
os.Exit(1)
}
running.Store(true)
go func() {
if err := filepath.WalkDir(*sourceDir, process); err != nil {
errors <- err
}
close(errors)
}()
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
Loop:
for {
select {
case err, ok := <-errors:
if ok {
fmt.Println("error:", err)
} else {
break Loop
}
case <-signals:
running.Store(false)
}
}
if err := saveDb(); err != nil {
fmt.Println("error", err)
os.Exit(1)
}
}
var sourceDir = flag.String("source", "", "source data directory")
var tempDir = flag.String("temp", "", "temporary storage directory")
var dbPath = flag.String("db", "datashake.json", "database file path")
var minimumSize = flag.Int64("min-size", 1024*1024, "minimum size in bytes")
var errors = make(chan error)
var db = DB{
Processed: make(map[string]struct{}),
}
var running atomic.Bool
func loadDb() error {
if *dbPath == "" {
return nil
}
dbFile, err := os.Open(*dbPath)
if err != nil {
return nil
}
defer func() {
_ = dbFile.Close()
}()
d := json.NewDecoder(dbFile)
err = d.Decode(&db)
return err
}
func saveDb() error {
if *dbPath == "" {
return nil
}
dbFile, err := os.Create(*dbPath)
if err != nil {
return err
}
defer func() {
_ = dbFile.Close()
}()
e := json.NewEncoder(dbFile)
err = e.Encode(&db)
return err
}
// process is a visitor for `filepath.WalkDir` that implements the rebalancing
// algorithm.
//
// This function never returns an error, since that would stop the directory
// directory walk. Instead, any errors are sent to the `errors` channel.
func process(path string, d fs.DirEntry, err error) (alwaysNil error) {
if !running.Load() {
return fs.SkipAll
}
defer func() {
if err != nil {
errors <- err
}
}()
if err != nil || d.IsDir() {
return
}
srcFileName := d.Name()
srcFilePath, err := filepath.Abs(path)
if err != nil {
return
}
if db.Contains(srcFilePath) {
return
}
srcStat, err := os.Stat(srcFilePath)
if err != nil {
return
}
if srcStat.Size() < *minimumSize {
return
}
tempDirPath, err := os.MkdirTemp(*tempDir, "*")
if err != nil {
return
}
tempFilePath := filepath.Join(tempDirPath, srcFileName)
safeToRemoveTemp := true
defer func() {
if !safeToRemoveTemp {
err := fmt.Errorf(
"%s may be lost in %s",
srcFilePath, tempDirPath,
)
errors <- err
return
}
if err := os.RemoveAll(tempDirPath); err != nil {
errors <- err
}
}()
err = copy(srcFilePath, tempFilePath)
if err != nil {
return
}
safeToRemoveTemp = false
err = os.Remove(srcFilePath)
if err != nil {
return
}
err = copy(tempFilePath, srcFilePath)
if err != nil {
return
}
safeToRemoveTemp = true
db.Remember(srcFilePath)
return
}
// copy opens the file from the source path, then creates a copy of it at the
// destination path. The mode, uid and gid bits from the source file are
// replicated in the copy.
func copy(srcPath, dstPath string) error {
fmt.Println("copying", srcPath, "to", dstPath)
srcFile, err := os.Open(srcPath)
if err != nil {
return err
}
defer func() {
_ = srcFile.Close()
}()
dstFile, err := os.Create(dstPath)
if err != nil {
return err
}
defer func() {
_ = dstFile.Close()
}()
srcStat, err := os.Stat(srcPath)
if err != nil {
return err
}
err = os.Chmod(dstPath, srcStat.Mode())
if err != nil {
return err
}
if sysStat, ok := srcStat.Sys().(*syscall.Stat_t); ok {
uid := int(sysStat.Uid)
gid := int(sysStat.Gid)
err = os.Chown(dstPath, uid, gid)
if err != nil {
return err
}
}
_, err = io.Copy(dstFile, srcFile)
return err
}
type DB struct {
Processed map[string]struct{}
}
func (db *DB) Contains(path string) bool {
_, ok := db.Processed[path]
return ok
}
func (db *DB) Remember(path string) {
db.Processed[path] = struct{}{}
}