From d0deeb19a984fce2c56c8cb5f9ae546c628c2477 Mon Sep 17 00:00:00 2001 From: zyx Date: Sat, 13 Apr 2019 01:27:35 +0800 Subject: [PATCH] extract mirror size from rsync provider automatically --- internal/util.go | 12 +++++++++++ internal/util_test.go | 32 ++++++++++++++++++++++++++++++ worker/base_provider.go | 4 ++++ worker/job.go | 2 ++ worker/provider.go | 1 + worker/provider_test.go | 3 +++ worker/rsync_provider.go | 19 ++++++++++++++++-- worker/two_stage_rsync_provider.go | 12 +++++++++++ worker/worker.go | 6 ++++++ 9 files changed, 89 insertions(+), 2 deletions(-) create mode 100644 internal/util_test.go diff --git a/internal/util.go b/internal/util.go index 80b21c8..fe08d7a 100644 --- a/internal/util.go +++ b/internal/util.go @@ -8,6 +8,7 @@ import ( "errors" "io/ioutil" "net/http" + "regexp" "time" ) @@ -84,3 +85,14 @@ func GetJSON(url string, obj interface{}, client *http.Client) (*http.Response, } return resp, json.Unmarshal(body, obj) } + +func ExtractSizeFromRsyncLog(content []byte) string { + // (?m) flag enables multi-line mode + re := regexp.MustCompile(`(?m)^Total file size: ([0-9\.]+[KMGTP]?) bytes`) + matches := re.FindAllSubmatch(content, -1) + // fmt.Printf("%q\n", matches) + if len(matches) == 0 { + return "" + } + return string(matches[len(matches)-1][1]) +} diff --git a/internal/util_test.go b/internal/util_test.go new file mode 100644 index 0000000..27b6641 --- /dev/null +++ b/internal/util_test.go @@ -0,0 +1,32 @@ +package internal + +import ( + "testing" + + . "github.com/smartystreets/goconvey/convey" +) + +func TestExtractSizeFromRsyncLog(t *testing.T) { + realLogContent := ` +Number of files: 998,470 (reg: 925,484, dir: 58,892, link: 14,094) +Number of created files: 1,049 (reg: 1,049) +Number of deleted files: 1,277 (reg: 1,277) +Number of regular files transferred: 5,694 +Total file size: 1.33T bytes +Total transferred file size: 2.86G bytes +Literal data: 780.62M bytes +Matched data: 2.08G bytes +File list size: 37.55M +File list generation time: 7.845 seconds +File list transfer time: 0.000 seconds +Total bytes sent: 7.55M +Total bytes received: 823.25M + +sent 7.55M bytes received 823.25M bytes 5.11M bytes/sec +total size is 1.33T speedup is 1,604.11 +` + Convey("Log parser should work", t, func() { + res := ExtractSizeFromRsyncLog([]byte(realLogContent)) + So(res, ShouldEqual, "1.33T") + }) +} diff --git a/worker/base_provider.go b/worker/base_provider.go index 0f8ec80..4e4c099 100644 --- a/worker/base_provider.go +++ b/worker/base_provider.go @@ -161,3 +161,7 @@ func (p *baseProvider) Terminate() error { return err } + +func (p *baseProvider) DataSize() string { + return "" +} diff --git a/worker/job.go b/worker/job.go index 7ba0df4..84cf111 100644 --- a/worker/job.go +++ b/worker/job.go @@ -53,6 +53,7 @@ type mirrorJob struct { ctrlChan chan ctrlAction disabled chan empty state uint32 + size string } func newMirrorJob(provider mirrorProvider) *mirrorJob { @@ -182,6 +183,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err if syncErr == nil { // syncing success logger.Noticef("succeeded syncing %s", m.Name()) + m.size = provider.DataSize() managerChan <- jobMessage{tunasync.Success, m.Name(), "", (m.State() == stateReady)} // post-success hooks err := runHooks(rHooks, func(h jobHook) error { return h.postSuccess() }, "post-success") diff --git a/worker/provider.go b/worker/provider.go index fe787b4..2c44820 100644 --- a/worker/provider.go +++ b/worker/provider.go @@ -50,6 +50,7 @@ type mirrorProvider interface { LogDir() string LogFile() string IsMaster() bool + DataSize() string // enter context EnterContext() *Context diff --git a/worker/provider_test.go b/worker/provider_test.go index 56ddc8d..c704a0c 100644 --- a/worker/provider_test.go +++ b/worker/provider_test.go @@ -73,6 +73,7 @@ func TestRsyncProvider(t *testing.T) { echo "syncing to $(pwd)" echo $RSYNC_PASSWORD $@ sleep 1 +echo "Total file size: 1.33T bytes" echo "Done" exit 0 ` @@ -83,6 +84,7 @@ exit 0 expectedOutput := fmt.Sprintf( "syncing to %s\n"+ "%s\n"+ + "Total file size: 1.33T bytes\n"+ "Done\n", targetDir, fmt.Sprintf( @@ -99,6 +101,7 @@ exit 0 So(err, ShouldBeNil) So(string(loggedContent), ShouldEqual, expectedOutput) // fmt.Println(string(loggedContent)) + So(provider.DataSize(), ShouldEqual, "1.33T") }) }) diff --git a/worker/rsync_provider.go b/worker/rsync_provider.go index b9d8fd5..c54219e 100644 --- a/worker/rsync_provider.go +++ b/worker/rsync_provider.go @@ -2,8 +2,11 @@ package worker import ( "errors" + "io/ioutil" "strings" "time" + + "github.com/tuna/tunasync/internal" ) type rsyncConfig struct { @@ -19,7 +22,8 @@ type rsyncConfig struct { type rsyncProvider struct { baseProvider rsyncConfig - options []string + options []string + dataSize string } func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) { @@ -73,11 +77,22 @@ func (p *rsyncProvider) Upstream() string { return p.upstreamURL } +func (p *rsyncProvider) DataSize() string { + return p.dataSize +} + func (p *rsyncProvider) Run() error { + p.dataSize = "" if err := p.Start(); err != nil { return err } - return p.Wait() + if err := p.Wait(); err != nil { + return err + } + if logContent, err := ioutil.ReadFile(p.LogFile()); err == nil { + p.dataSize = internal.ExtractSizeFromRsyncLog(logContent) + } + return nil } func (p *rsyncProvider) Start() error { diff --git a/worker/two_stage_rsync_provider.go b/worker/two_stage_rsync_provider.go index 25bb17f..5b091fd 100644 --- a/worker/two_stage_rsync_provider.go +++ b/worker/two_stage_rsync_provider.go @@ -3,8 +3,11 @@ package worker import ( "errors" "fmt" + "io/ioutil" "strings" "time" + + "github.com/tuna/tunasync/internal" ) type twoStageRsyncConfig struct { @@ -23,6 +26,7 @@ type twoStageRsyncProvider struct { twoStageRsyncConfig stage1Options []string stage2Options []string + dataSize string } var rsyncStage1Profiles = map[string]([]string){ @@ -78,6 +82,10 @@ func (p *twoStageRsyncProvider) Upstream() string { return p.upstreamURL } +func (p *twoStageRsyncProvider) DataSize() string { + return p.dataSize +} + func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) { var options []string if stage == 1 { @@ -123,6 +131,7 @@ func (p *twoStageRsyncProvider) Run() error { env["RSYNC_PASSWORD"] = p.password } + p.dataSize = "" stages := []int{1, 2} for _, stage := range stages { command := []string{p.rsyncCmd} @@ -151,5 +160,8 @@ func (p *twoStageRsyncProvider) Run() error { return err } } + if logContent, err := ioutil.ReadFile(p.LogFile()); err == nil { + p.dataSize = internal.ExtractSizeFromRsyncLog(logContent) + } return nil } diff --git a/worker/worker.go b/worker/worker.go index 0209962..2d330a0 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -416,6 +416,12 @@ func (w *Worker) updateStatus(job *mirrorJob, jobMsg jobMessage) { ErrorMsg: jobMsg.msg, } + // Certain Providers (rsync for example) may know the size of mirror, + // so we report it to Manager here + if len(job.size) != 0 { + smsg.Size = job.size + } + for _, root := range w.cfg.Manager.APIBaseList() { url := fmt.Sprintf( "%s/workers/%s/jobs/%s", root, w.Name(), jobMsg.name,