diff --git a/internal/msg.go b/internal/msg.go index a4e2838..a433949 100644 --- a/internal/msg.go +++ b/internal/msg.go @@ -15,9 +15,9 @@ type MirrorStatus struct { ErrorMsg string `json:"error_msg"` } -// A WorkerInfoMsg is the information struct that describe +// A WorkerStatus is the information struct that describe // a worker, and sent from the manager to clients. -type WorkerInfoMsg struct { +type WorkerStatus struct { ID string `json:"id"` URL string `json:"url"` // worker url Token string `json:"token"` // session token diff --git a/manager/db.go b/manager/db.go index 2403277..42623a0 100644 --- a/manager/db.go +++ b/manager/db.go @@ -6,17 +6,19 @@ import ( "strings" "github.com/boltdb/bolt" + + . "github.com/tuna/tunasync/internal" ) type dbAdapter interface { Init() error - ListWorkers() ([]workerStatus, error) - GetWorker(workerID string) (workerStatus, error) - CreateWorker(w workerStatus) (workerStatus, error) - UpdateMirrorStatus(workerID, mirrorID string, status mirrorStatus) (mirrorStatus, error) - GetMirrorStatus(workerID, mirrorID string) (mirrorStatus, error) - ListMirrorStatus(workerID string) ([]mirrorStatus, error) - ListAllMirrorStatus() ([]mirrorStatus, error) + ListWorkers() ([]WorkerStatus, error) + GetWorker(workerID string) (WorkerStatus, error) + CreateWorker(w WorkerStatus) (WorkerStatus, error) + UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) + GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error) + ListMirrorStatus(workerID string) ([]MirrorStatus, error) + ListAllMirrorStatus() ([]MirrorStatus, error) Close() error } @@ -61,11 +63,11 @@ func (b *boltAdapter) Init() (err error) { }) } -func (b *boltAdapter) ListWorkers() (ws []workerStatus, err error) { +func (b *boltAdapter) ListWorkers() (ws []WorkerStatus, err error) { err = b.db.View(func(tx *bolt.Tx) error { bucket := tx.Bucket([]byte(_workerBucketKey)) c := bucket.Cursor() - var w workerStatus + var w WorkerStatus for k, v := c.First(); k != nil; k, v = c.Next() { jsonErr := json.Unmarshal(v, &w) if jsonErr != nil { @@ -79,7 +81,7 @@ func (b *boltAdapter) ListWorkers() (ws []workerStatus, err error) { return } -func (b *boltAdapter) GetWorker(workerID string) (w workerStatus, err error) { +func (b *boltAdapter) GetWorker(workerID string) (w WorkerStatus, err error) { err = b.db.View(func(tx *bolt.Tx) error { bucket := tx.Bucket([]byte(_workerBucketKey)) v := bucket.Get([]byte(workerID)) @@ -92,7 +94,7 @@ func (b *boltAdapter) GetWorker(workerID string) (w workerStatus, err error) { return } -func (b *boltAdapter) CreateWorker(w workerStatus) (workerStatus, error) { +func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) { err := b.db.Update(func(tx *bolt.Tx) error { bucket := tx.Bucket([]byte(_workerBucketKey)) v, err := json.Marshal(w) @@ -105,7 +107,7 @@ func (b *boltAdapter) CreateWorker(w workerStatus) (workerStatus, error) { return w, err } -func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status mirrorStatus) (mirrorStatus, error) { +func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) { id := mirrorID + "/" + workerID err := b.db.Update(func(tx *bolt.Tx) error { bucket := tx.Bucket([]byte(_statusBucketKey)) @@ -116,7 +118,7 @@ func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status mirro return status, err } -func (b *boltAdapter) GetMirrorStatus(workerID, mirrorID string) (m mirrorStatus, err error) { +func (b *boltAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus, err error) { id := mirrorID + "/" + workerID err = b.db.Update(func(tx *bolt.Tx) error { bucket := tx.Bucket([]byte(_statusBucketKey)) @@ -130,11 +132,11 @@ func (b *boltAdapter) GetMirrorStatus(workerID, mirrorID string) (m mirrorStatus return } -func (b *boltAdapter) ListMirrorStatus(workerID string) (ms []mirrorStatus, err error) { +func (b *boltAdapter) ListMirrorStatus(workerID string) (ms []MirrorStatus, err error) { err = b.db.View(func(tx *bolt.Tx) error { bucket := tx.Bucket([]byte(_statusBucketKey)) c := bucket.Cursor() - var m mirrorStatus + var m MirrorStatus for k, v := c.First(); k != nil; k, v = c.Next() { if wID := strings.Split(string(k), "/")[1]; wID == workerID { jsonErr := json.Unmarshal(v, &m) @@ -150,11 +152,11 @@ func (b *boltAdapter) ListMirrorStatus(workerID string) (ms []mirrorStatus, err return } -func (b *boltAdapter) ListAllMirrorStatus() (ms []mirrorStatus, err error) { +func (b *boltAdapter) ListAllMirrorStatus() (ms []MirrorStatus, err error) { err = b.db.View(func(tx *bolt.Tx) error { bucket := tx.Bucket([]byte(_statusBucketKey)) c := bucket.Cursor() - var m mirrorStatus + var m MirrorStatus for k, v := c.First(); k != nil; k, v = c.Next() { jsonErr := json.Unmarshal(v, &m) if jsonErr != nil { diff --git a/manager/db_test.go b/manager/db_test.go index 4c01d3e..5cd7456 100644 --- a/manager/db_test.go +++ b/manager/db_test.go @@ -31,7 +31,7 @@ func TestBoltAdapter(t *testing.T) { testWorkerIDs := []string{"test_worker1", "test_worker2"} Convey("create worker", func() { for _, id := range testWorkerIDs { - w := workerStatus{ + w := WorkerStatus{ ID: id, Token: "token_" + id, LastOnline: time.Now(), @@ -58,7 +58,7 @@ func TestBoltAdapter(t *testing.T) { }) Convey("update mirror status", func() { - status1 := mirrorStatus{ + status1 := MirrorStatus{ Name: "arch-sync1", Worker: testWorkerIDs[0], IsMaster: true, @@ -67,7 +67,7 @@ func TestBoltAdapter(t *testing.T) { Upstream: "mirrors.tuna.tsinghua.edu.cn", Size: "3GB", } - status2 := mirrorStatus{ + status2 := MirrorStatus{ Name: "arch-sync2", Worker: testWorkerIDs[1], IsMaster: true, @@ -94,7 +94,7 @@ func TestBoltAdapter(t *testing.T) { Convey("list mirror status", func() { ms, err := boltDB.ListMirrorStatus(testWorkerIDs[0]) So(err, ShouldBeNil) - expectedJSON, err := json.Marshal([]mirrorStatus{status1}) + expectedJSON, err := json.Marshal([]MirrorStatus{status1}) So(err, ShouldBeNil) actualJSON, err := json.Marshal(ms) So(err, ShouldBeNil) @@ -104,7 +104,7 @@ func TestBoltAdapter(t *testing.T) { Convey("list all mirror status", func() { ms, err := boltDB.ListAllMirrorStatus() So(err, ShouldBeNil) - expectedJSON, err := json.Marshal([]mirrorStatus{status1, status2}) + expectedJSON, err := json.Marshal([]MirrorStatus{status1, status2}) So(err, ShouldBeNil) actualJSON, err := json.Marshal(ms) So(err, ShouldBeNil) diff --git a/manager/server.go b/manager/server.go index 5102aae..b2ab87b 100644 --- a/manager/server.go +++ b/manager/server.go @@ -30,12 +30,19 @@ func (s *managerServer) listAllJobs(c *gin.Context) { s.returnErrJSON(c, http.StatusInternalServerError, err) return } - c.JSON(http.StatusOK, mirrorStatusList) + webMirStatusList := []webMirrorStatus{} + for _, m := range mirrorStatusList { + webMirStatusList = append( + webMirStatusList, + convertMirrorStatus(m), + ) + } + c.JSON(http.StatusOK, webMirStatusList) } // listWrokers respond with informations of all the workers func (s *managerServer) listWorkers(c *gin.Context) { - var workerInfos []WorkerInfoMsg + var workerInfos []WorkerStatus workers, err := s.adapter.ListWorkers() if err != nil { err := fmt.Errorf("failed to list workers: %s", @@ -47,7 +54,7 @@ func (s *managerServer) listWorkers(c *gin.Context) { } for _, w := range workers { workerInfos = append(workerInfos, - WorkerInfoMsg{ + WorkerStatus{ ID: w.ID, LastOnline: w.LastOnline, }) @@ -57,7 +64,7 @@ func (s *managerServer) listWorkers(c *gin.Context) { // registerWorker register an newly-online worker func (s *managerServer) registerWorker(c *gin.Context) { - var _worker workerStatus + var _worker WorkerStatus c.BindJSON(&_worker) newWorker, err := s.adapter.CreateWorker(_worker) if err != nil { @@ -95,7 +102,7 @@ func (s *managerServer) returnErrJSON(c *gin.Context, code int, err error) { func (s *managerServer) updateJobOfWorker(c *gin.Context) { workerID := c.Param("id") - var status mirrorStatus + var status MirrorStatus c.BindJSON(&status) mirrorName := status.Name newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status) diff --git a/manager/server_test.go b/manager/server_test.go index 6a14d9b..beeb80b 100644 --- a/manager/server_test.go +++ b/manager/server_test.go @@ -26,11 +26,11 @@ func TestHTTPServer(t *testing.T) { s := makeHTTPServer(false) So(s, ShouldNotBeNil) s.setDBAdapter(&mockDBAdapter{ - workerStore: map[string]workerStatus{ - _magicBadWorkerID: workerStatus{ + workerStore: map[string]WorkerStatus{ + _magicBadWorkerID: WorkerStatus{ ID: _magicBadWorkerID, }}, - statusStore: make(map[string]mirrorStatus), + statusStore: make(map[string]MirrorStatus), }) port := rand.Intn(10000) + 20000 baseURL := fmt.Sprintf("http://127.0.0.1:%d", port) @@ -62,7 +62,7 @@ func TestHTTPServer(t *testing.T) { }) Convey("when register a worker", func(ctx C) { - w := workerStatus{ + w := WorkerStatus{ ID: "test_worker1", } resp, err := postJSON(baseURL+"/workers", w) @@ -74,14 +74,14 @@ func TestHTTPServer(t *testing.T) { resp, err := http.Get(baseURL + "/workers") So(err, ShouldBeNil) defer resp.Body.Close() - var actualResponseObj []WorkerInfoMsg + var actualResponseObj []WorkerStatus err = json.NewDecoder(resp.Body).Decode(&actualResponseObj) So(err, ShouldBeNil) So(len(actualResponseObj), ShouldEqual, 2) }) Convey("update mirror status of a existed worker", func(ctx C) { - status := mirrorStatus{ + status := MirrorStatus{ Name: "arch-sync1", Worker: "test_worker1", IsMaster: true, @@ -97,7 +97,7 @@ func TestHTTPServer(t *testing.T) { Convey("list mirror status of an existed worker", func(ctx C) { - expectedResponse, err := json.Marshal([]mirrorStatus{status}) + expectedResponse, err := json.Marshal([]MirrorStatus{status}) So(err, ShouldBeNil) resp, err := http.Get(baseURL + "/workers/test_worker1/jobs") So(err, ShouldBeNil) @@ -110,7 +110,9 @@ func TestHTTPServer(t *testing.T) { }) Convey("list all job status of all workers", func(ctx C) { - expectedResponse, err := json.Marshal([]mirrorStatus{status}) + expectedResponse, err := json.Marshal( + []webMirrorStatus{convertMirrorStatus(status)}, + ) So(err, ShouldBeNil) resp, err := http.Get(baseURL + "/jobs") So(err, ShouldBeNil) @@ -125,7 +127,7 @@ func TestHTTPServer(t *testing.T) { Convey("update mirror status of an inexisted worker", func(ctx C) { invalidWorker := "test_worker2" - status := mirrorStatus{ + status := MirrorStatus{ Name: "arch-sync2", Worker: invalidWorker, IsMaster: true, @@ -150,7 +152,7 @@ func TestHTTPServer(t *testing.T) { workerPort := rand.Intn(10000) + 30000 bindAddress := fmt.Sprintf("127.0.0.1:%d", workerPort) workerBaseURL := fmt.Sprintf("http://%s", bindAddress) - w := workerStatus{ + w := WorkerStatus{ ID: "test_worker_cmd", URL: workerBaseURL + "/cmd", } @@ -208,16 +210,16 @@ func TestHTTPServer(t *testing.T) { } type mockDBAdapter struct { - workerStore map[string]workerStatus - statusStore map[string]mirrorStatus + workerStore map[string]WorkerStatus + statusStore map[string]MirrorStatus } func (b *mockDBAdapter) Init() error { return nil } -func (b *mockDBAdapter) ListWorkers() ([]workerStatus, error) { - workers := make([]workerStatus, len(b.workerStore)) +func (b *mockDBAdapter) ListWorkers() ([]WorkerStatus, error) { + workers := make([]WorkerStatus, len(b.workerStore)) idx := 0 for _, w := range b.workerStore { workers[idx] = w @@ -226,15 +228,15 @@ func (b *mockDBAdapter) ListWorkers() ([]workerStatus, error) { return workers, nil } -func (b *mockDBAdapter) GetWorker(workerID string) (workerStatus, error) { +func (b *mockDBAdapter) GetWorker(workerID string) (WorkerStatus, error) { w, ok := b.workerStore[workerID] if !ok { - return workerStatus{}, fmt.Errorf("invalid workerId") + return WorkerStatus{}, fmt.Errorf("invalid workerId") } return w, nil } -func (b *mockDBAdapter) CreateWorker(w workerStatus) (workerStatus, error) { +func (b *mockDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) { // _, ok := b.workerStore[w.ID] // if ok { // return workerStatus{}, fmt.Errorf("duplicate worker name") @@ -243,19 +245,19 @@ func (b *mockDBAdapter) CreateWorker(w workerStatus) (workerStatus, error) { return w, nil } -func (b *mockDBAdapter) GetMirrorStatus(workerID, mirrorID string) (mirrorStatus, error) { +func (b *mockDBAdapter) GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error) { id := mirrorID + "/" + workerID status, ok := b.statusStore[id] if !ok { - return mirrorStatus{}, fmt.Errorf("no mirror %s exists in worker %s", mirrorID, workerID) + return MirrorStatus{}, fmt.Errorf("no mirror %s exists in worker %s", mirrorID, workerID) } return status, nil } -func (b *mockDBAdapter) UpdateMirrorStatus(workerID, mirrorID string, status mirrorStatus) (mirrorStatus, error) { +func (b *mockDBAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) { // if _, ok := b.workerStore[workerID]; !ok { // // unregistered worker - // return mirrorStatus{}, fmt.Errorf("invalid workerID %s", workerID) + // return MirrorStatus{}, fmt.Errorf("invalid workerID %s", workerID) // } id := mirrorID + "/" + workerID @@ -263,11 +265,11 @@ func (b *mockDBAdapter) UpdateMirrorStatus(workerID, mirrorID string, status mir return status, nil } -func (b *mockDBAdapter) ListMirrorStatus(workerID string) ([]mirrorStatus, error) { - var mirrorStatusList []mirrorStatus +func (b *mockDBAdapter) ListMirrorStatus(workerID string) ([]MirrorStatus, error) { + var mirrorStatusList []MirrorStatus // simulating a database fail if workerID == _magicBadWorkerID { - return []mirrorStatus{}, fmt.Errorf("database fail") + return []MirrorStatus{}, fmt.Errorf("database fail") } for k, v := range b.statusStore { if wID := strings.Split(k, "/")[1]; wID == workerID { @@ -277,8 +279,8 @@ func (b *mockDBAdapter) ListMirrorStatus(workerID string) ([]mirrorStatus, error return mirrorStatusList, nil } -func (b *mockDBAdapter) ListAllMirrorStatus() ([]mirrorStatus, error) { - var mirrorStatusList []mirrorStatus +func (b *mockDBAdapter) ListAllMirrorStatus() ([]MirrorStatus, error) { + var mirrorStatusList []MirrorStatus for _, v := range b.statusStore { mirrorStatusList = append(mirrorStatusList, v) } diff --git a/manager/status.go b/manager/status.go index 8c6150c..8df2b3c 100644 --- a/manager/status.go +++ b/manager/status.go @@ -2,114 +2,61 @@ package manager import ( "encoding/json" - "errors" - "fmt" "strconv" "time" . "github.com/tuna/tunasync/internal" ) -type mirrorStatus struct { - Name string - Worker string - IsMaster bool - Status SyncStatus - LastUpdate time.Time - Upstream string - Size string // approximate size +type textTime struct { + time.Time } -func (s mirrorStatus) MarshalJSON() ([]byte, error) { - m := map[string]interface{}{ - "name": s.Name, - "worker": s.Worker, - "is_master": s.IsMaster, - "status": s.Status, - "last_update": s.LastUpdate.Format("2006-01-02 15:04:05"), - "last_update_ts": fmt.Sprintf("%d", s.LastUpdate.Unix()), - "size": s.Size, - "upstream": s.Upstream, - } - return json.Marshal(m) +func (t textTime) MarshalJSON() ([]byte, error) { + return json.Marshal(t.Format("2006-01-02 15:04:05")) +} +func (t *textTime) UnmarshalJSON(b []byte) error { + s := string(b) + t2, err := time.ParseInLocation(`"2006-01-02 15:04:05"`, s, time.Local) + *t = textTime{t2} + return err } -func (s *mirrorStatus) UnmarshalJSON(v []byte) error { - var m map[string]interface{} +type stampTime struct { + time.Time +} - err := json.Unmarshal(v, &m) +func (t stampTime) MarshalJSON() ([]byte, error) { + return json.Marshal(t.Unix()) +} +func (t *stampTime) UnmarshalJSON(b []byte) error { + ts, err := strconv.Atoi(string(b)) if err != nil { return err } - - if name, ok := m["name"]; ok { - if s.Name, ok = name.(string); !ok { - return errors.New("name should be a string") - } - } else { - return errors.New("key `name` does not exist in the json") - } - if isMaster, ok := m["is_master"]; ok { - if s.IsMaster, ok = isMaster.(bool); !ok { - return errors.New("is_master should be a string") - } - } else { - return errors.New("key `is_master` does not exist in the json") - } - if _worker, ok := m["worker"]; ok { - if s.Worker, ok = _worker.(string); !ok { - return errors.New("worker should be a string") - } - } else { - return errors.New("key `worker` does not exist in the json") - } - if upstream, ok := m["upstream"]; ok { - if s.Upstream, ok = upstream.(string); !ok { - return errors.New("upstream should be a string") - } - } else { - return errors.New("key `upstream` does not exist in the json") - } - if size, ok := m["size"]; ok { - if s.Size, ok = size.(string); !ok { - return errors.New("size should be a string") - } - } else { - return errors.New("key `size` does not exist in the json") - } - // tricky: status - if status, ok := m["status"]; ok { - if ss, ok := status.(string); ok { - err := json.Unmarshal([]byte(`"`+ss+`"`), &(s.Status)) - if err != nil { - return err - } - } else { - return errors.New("status should be a string") - } - } else { - return errors.New("key `status` does not exist in the json") - } - // tricky: last update - if lastUpdate, ok := m["last_update_ts"]; ok { - if sts, ok := lastUpdate.(string); ok { - ts, err := strconv.Atoi(sts) - if err != nil { - return fmt.Errorf("last_update_ts should be a interger, got: %s", sts) - } - s.LastUpdate = time.Unix(int64(ts), 0) - } else { - return fmt.Errorf("last_update_ts should be a string of integer, got: %s", lastUpdate) - } - } else { - return errors.New("key `last_update_ts` does not exist in the json") - } - return nil + *t = stampTime{time.Unix(int64(ts), 0)} + return err } -type workerStatus struct { - ID string `json:"id"` // worker name - Token string `json:"token"` // session token - URL string `json:"url"` // worker url - LastOnline time.Time `json:"last_online"` // last seen +// webMirrorStatus is the mirror status to be shown in the web page +type webMirrorStatus struct { + Name string `json:"name"` + IsMaster bool `json:"is_master"` + Status SyncStatus `json:"status"` + LastUpdate textTime `json:"last_update"` + LastUpdateTs stampTime `json:"last_update_ts"` + Upstream string `json:"upstream"` + Size string `json:"size"` // approximate size +} + +func convertMirrorStatus(m MirrorStatus) webMirrorStatus { + return webMirrorStatus{ + Name: m.Name, + IsMaster: m.IsMaster, + Status: m.Status, + LastUpdate: textTime{m.LastUpdate}, + LastUpdateTs: stampTime{m.LastUpdate}, + Upstream: m.Upstream, + Size: m.Size, + } } diff --git a/manager/status_test.go b/manager/status_test.go index 06260d9..2ca0f1e 100644 --- a/manager/status_test.go +++ b/manager/status_test.go @@ -15,26 +15,29 @@ func TestStatus(t *testing.T) { tz := "Asia/Shanghai" loc, err := time.LoadLocation(tz) So(err, ShouldBeNil) - - m := mirrorStatus{ - Name: "tunalinux", - Status: tunasync.Success, - LastUpdate: time.Date(2016, time.April, 16, 23, 8, 10, 0, loc), - Size: "5GB", - Upstream: "rsync://mirrors.tuna.tsinghua.edu.cn/tunalinux/", + t := time.Date(2016, time.April, 16, 23, 8, 10, 0, loc) + m := webMirrorStatus{ + Name: "tunalinux", + Status: tunasync.Success, + LastUpdate: textTime{t}, + LastUpdateTs: stampTime{t}, + Size: "5GB", + Upstream: "rsync://mirrors.tuna.tsinghua.edu.cn/tunalinux/", } b, err := json.Marshal(m) So(err, ShouldBeNil) // fmt.Println(string(b)) - var m2 mirrorStatus + var m2 webMirrorStatus err = json.Unmarshal(b, &m2) So(err, ShouldBeNil) // fmt.Printf("%#v", m2) So(m2.Name, ShouldEqual, m.Name) So(m2.Status, ShouldEqual, m.Status) So(m2.LastUpdate.Unix(), ShouldEqual, m.LastUpdate.Unix()) + So(m2.LastUpdateTs.Unix(), ShouldEqual, m.LastUpdate.Unix()) So(m2.LastUpdate.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano()) + So(m2.LastUpdateTs.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano()) So(m2.Size, ShouldEqual, m.Size) So(m2.Upstream, ShouldEqual, m.Upstream) }) diff --git a/worker/worker.go b/worker/worker.go index 063f932..02ed1f2 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -291,7 +291,7 @@ func (w *Worker) registorWorker() { w.cfg.Manager.APIBase, ) - msg := WorkerInfoMsg{ + msg := WorkerStatus{ ID: w.Name(), URL: w.URL(), }