datashake/datashake.go

336 lines
6.2 KiB
Go
Raw Normal View History

2023-12-03 02:08:29 +00:00
package main
import (
2023-12-04 21:28:04 +00:00
"context"
2023-12-03 02:08:29 +00:00
"encoding/json"
"flag"
"fmt"
"io"
"io/fs"
"os"
"os/signal"
"path/filepath"
2023-12-04 21:28:04 +00:00
"runtime"
"sync"
2023-12-03 02:08:29 +00:00
"sync/atomic"
"syscall"
2023-12-04 21:28:04 +00:00
"github.com/sourcegraph/conc/pool"
)
// Command-line flags.
var (
	// sourceDir is the root of the directory tree that is walked.
	sourceDir = flag.String("source", ".", "source data directory")
	// tempDir is the parent directory in which a per-file temporary
	// directory is created while a file is being rebalanced.
	tempDir = flag.String("temp", "/tmp", "temporary storage directory")
	// dbPath is the JSON database file; an empty value disables loading
	// and saving.
	dbPath = flag.String("db", "datashake.json", "database file path")
	// minimumSize is the smallest file size, in bytes, that is rebalanced.
	minimumSize = flag.Int64("min-size", 1024*1024, "minimum size in bytes")
	// concurrency bounds the number of files processed at once. Values
	// below 1 are replaced by runtime.GOMAXPROCS(0) in main.
	concurrency = flag.Int("concurrency", 1, "concurrent processing limit")
)
func main() {
2023-12-07 02:26:41 +00:00
flag.Parse()
2023-12-04 21:28:04 +00:00
ctx, stop := signal.NotifyContext(
context.Background(),
syscall.SIGINT, syscall.SIGTERM,
)
defer stop()
concurrency := *concurrency
if concurrency < 1 {
concurrency = runtime.GOMAXPROCS(0)
}
tasks = pool.New().WithMaxGoroutines(concurrency)
2023-12-03 02:08:29 +00:00
if err := loadDb(); err != nil {
fmt.Println("error", err)
os.Exit(1)
}
2023-12-07 02:26:41 +00:00
go run()
2023-12-03 02:08:29 +00:00
2023-12-07 02:26:41 +00:00
errors := errors
actions := actions
2023-12-03 02:08:29 +00:00
for {
select {
case err, ok := <-errors:
2023-12-07 02:26:41 +00:00
if !ok {
errors = nil
break
}
db.Alert(err)
case action, ok := <-actions:
if !ok {
actions = nil
break
2023-12-03 02:08:29 +00:00
}
2023-12-07 02:26:41 +00:00
db.Record(action)
2023-12-04 21:28:04 +00:00
case <-ctx.Done():
2023-12-03 02:08:29 +00:00
running.Store(false)
}
2023-12-07 02:26:41 +00:00
if errors == nil && actions == nil {
break
}
2023-12-03 02:08:29 +00:00
}
if err := saveDb(); err != nil {
fmt.Println("error", err)
os.Exit(1)
}
}
2023-12-04 21:28:04 +00:00
var (
running atomic.Bool
tasks *pool.Pool
2023-12-05 03:48:27 +00:00
errors = make(chan Error)
2023-12-07 02:26:41 +00:00
actions = make(chan Action)
2023-12-03 02:08:29 +00:00
2023-12-04 21:28:04 +00:00
db = DB{
2023-12-07 02:26:41 +00:00
Seen: make(map[string]struct{}),
2023-12-03 02:08:29 +00:00
}
2023-12-04 21:28:04 +00:00
dbLock sync.Mutex
)
2023-12-03 02:08:29 +00:00
2023-12-07 02:26:41 +00:00
// run drives the directory traversal.
func run() {
defer func() {
close(errors)
close(actions)
}()
running.Store(true)
if err := filepath.WalkDir(*sourceDir, process); err != nil {
fmt.Println("error:", err)
}
tasks.Wait()
}
2023-12-04 21:28:04 +00:00
// process is a visitor for `filepath.WalkDir` that performs the rebalancing
// algorithm against regular files.
2023-12-03 02:08:29 +00:00
//
2023-12-04 21:28:04 +00:00
// This function normally never returns an error, since that would stop the
2023-12-03 02:08:29 +00:00
// directory walk. Instead, any errors are sent to the `errors` channel.
2023-12-04 21:28:04 +00:00
func process(path string, d fs.DirEntry, err error) (typicallyNil error) {
2023-12-03 02:08:29 +00:00
if !running.Load() {
return fs.SkipAll
}
2023-12-04 21:28:04 +00:00
if err != nil || d.IsDir() || !d.Type().IsRegular() {
return
}
tasks.Go(func() {
if running.Load() {
work(path, d)
}
})
return
}
2023-12-03 02:08:29 +00:00
2023-12-04 21:28:04 +00:00
// work rebalances a single file.
func work(path string, d fs.DirEntry) {
var err error
2023-12-05 03:48:27 +00:00
var srcFilePath = path
var tempFilePath string
reportErr := func(reported error) {
e := Error{
Message: reported.Error(),
FilePath: srcFilePath,
TempPath: tempFilePath,
}
errors <- e
}
2023-12-03 02:08:29 +00:00
defer func() {
if err != nil {
2023-12-05 03:48:27 +00:00
reportErr(err)
2023-12-03 02:08:29 +00:00
}
}()
srcFileName := d.Name()
2023-12-05 03:48:27 +00:00
srcFilePath, err = filepath.Abs(path)
2023-12-03 02:08:29 +00:00
if err != nil {
return
}
2023-12-07 02:26:41 +00:00
if db.Knows(srcFilePath) {
2023-12-03 02:08:29 +00:00
return
}
srcStat, err := os.Stat(srcFilePath)
if err != nil {
return
}
if srcStat.Size() < *minimumSize {
return
}
tempDirPath, err := os.MkdirTemp(*tempDir, "*")
if err != nil {
return
}
2023-12-05 03:48:27 +00:00
tempFilePath = filepath.Join(tempDirPath, srcFileName)
2023-12-03 02:08:29 +00:00
safeToRemoveTemp := true
defer func() {
if !safeToRemoveTemp {
2023-12-07 02:26:41 +00:00
reportErr(missingFile)
2023-12-03 02:08:29 +00:00
return
}
if err := os.RemoveAll(tempDirPath); err != nil {
2023-12-05 03:48:27 +00:00
reportErr(err)
2023-12-03 02:08:29 +00:00
}
}()
err = copy(srcFilePath, tempFilePath)
if err != nil {
return
}
safeToRemoveTemp = false
err = os.Remove(srcFilePath)
if err != nil {
return
}
err = copy(tempFilePath, srcFilePath)
if err != nil {
return
}
safeToRemoveTemp = true
db.Remember(srcFilePath)
}
2023-12-07 02:26:41 +00:00
// missingFile is reported by work when it returns while the temporary copy
// may be the only remaining copy of a file; the temporary directory is left
// in place in that case.
var missingFile = fmt.Errorf("file may be missing in temp directory")
2023-12-03 02:08:29 +00:00
// copy opens the file from the source path, then creates a copy of it at the
// destination path. The mode, uid and gid bits from the source file are
// replicated in the copy.
func copy(srcPath, dstPath string) error {
fmt.Println("copying", srcPath, "to", dstPath)
srcFile, err := os.Open(srcPath)
if err != nil {
return err
}
defer func() {
_ = srcFile.Close()
}()
dstFile, err := os.Create(dstPath)
if err != nil {
return err
}
defer func() {
_ = dstFile.Close()
}()
srcStat, err := os.Stat(srcPath)
if err != nil {
return err
}
err = os.Chmod(dstPath, srcStat.Mode())
if err != nil {
return err
}
if sysStat, ok := srcStat.Sys().(*syscall.Stat_t); ok {
uid := int(sysStat.Uid)
gid := int(sysStat.Gid)
err = os.Chown(dstPath, uid, gid)
if err != nil {
return err
}
}
_, err = io.Copy(dstFile, srcFile)
2023-12-05 03:48:27 +00:00
if err != nil {
return err
}
2023-12-07 02:26:41 +00:00
actions <- Action{
2023-12-05 03:48:27 +00:00
Source: srcPath,
Destination: dstPath,
2023-12-07 02:26:41 +00:00
}
2023-12-05 03:48:27 +00:00
return nil
2023-12-03 02:08:29 +00:00
}
2023-12-07 02:26:41 +00:00
// DB stores information collected by the program.
2023-12-04 21:28:04 +00:00
//
// The database is loaded from a JSON file when the program starts and saved
// back to that JSON file as the program finishes.
2023-12-03 02:08:29 +00:00
type DB struct {
2023-12-07 02:26:41 +00:00
// Seen is a set of all files which have been successfully re-balanced.
Seen map[string]struct{}
// Log stores every successful copy operation.
Log []Action
// Errors stores details on any error that occurs.
Errors []Error
2023-12-05 03:48:27 +00:00
}
2023-12-07 02:26:41 +00:00
// Action details a file copy operation.
type Action struct {
	// Source is the path the data was read from.
	Source string
	// Destination is the path the data was written to.
	Destination string
}
2023-12-07 02:26:41 +00:00
// Error describes a problem from re-balancing a file.
type Error struct {
	// Message contains a string for the underlying error's message.
	Message string
	// FilePath is the path of the file.
	FilePath string
	// TempPath is the path of the file's temporary copy.
	//
	// This may be blank, depending on when the error occurred.
	TempPath string
}
2023-12-07 02:26:41 +00:00
func (db *DB) Knows(path string) bool {
2023-12-04 21:28:04 +00:00
dbLock.Lock()
defer dbLock.Unlock()
2023-12-07 02:26:41 +00:00
_, ok := db.Seen[path]
2023-12-03 02:08:29 +00:00
return ok
}
func (db *DB) Remember(path string) {
2023-12-04 21:28:04 +00:00
dbLock.Lock()
defer dbLock.Unlock()
2023-12-07 02:26:41 +00:00
db.Seen[path] = struct{}{}
2023-12-03 02:08:29 +00:00
}
2023-12-04 21:28:04 +00:00
2023-12-05 03:48:27 +00:00
// Record appends a to the log of copy operations while holding dbLock.
func (db *DB) Record(a Action) {
	dbLock.Lock()
	db.Log = append(db.Log, a)
	dbLock.Unlock()
}
// Alert appends e to the list of recorded errors while holding dbLock.
func (db *DB) Alert(e Error) {
	dbLock.Lock()
	db.Errors = append(db.Errors, e)
	dbLock.Unlock()
}
2023-12-04 21:28:04 +00:00
// loadDb fills db from the JSON file named by the `-db` flag.
//
// An empty path or a file that does not exist yet is not an error; the
// program then starts with the initial, empty database. Any other failure
// to open or decode the file is returned, so that an existing database that
// cannot be read is not later overwritten by saveDb.
func loadDb() error {
	if *dbPath == "" {
		return nil
	}

	dbFile, err := os.Open(*dbPath)
	if err != nil {
		if os.IsNotExist(err) {
			return nil
		}
		return err
	}
	defer func() {
		// Read-only handle: a failed Close cannot lose data.
		_ = dbFile.Close()
	}()

	if err := json.NewDecoder(dbFile).Decode(&db); err != nil {
		return err
	}
	// A JSON null for "Seen" decodes to a nil map, which would make later
	// writes to the set panic.
	if db.Seen == nil {
		db.Seen = make(map[string]struct{})
	}
	return nil
}
// saveDb writes db as JSON to the file named by the `-db` flag, creating
// the file or truncating an existing one. An empty path disables saving.
//
// Failures to create, encode or close the file are returned.
func saveDb() error {
	if *dbPath == "" {
		return nil
	}

	dbFile, err := os.Create(*dbPath)
	if err != nil {
		return err
	}

	if err := json.NewEncoder(dbFile).Encode(&db); err != nil {
		// The encode error is the one worth reporting.
		_ = dbFile.Close()
		return err
	}
	// A write error may only surface on Close; ignoring it could leave a
	// truncated database behind without any report.
	return dbFile.Close()
}