From a2887da2dd95a07ef960b38f80528c98cd782481 Mon Sep 17 00:00:00 2001 From: jiegec Date: Tue, 13 Oct 2020 14:27:41 +0800 Subject: [PATCH 01/10] Move bolt db adapter to separate file --- go.mod | 9 ++- go.sum | 52 +++++++----- manager/db.go | 182 ------------------------------------------ manager/db_bolt.go | 192 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 231 insertions(+), 204 deletions(-) create mode 100644 manager/db_bolt.go diff --git a/go.mod b/go.mod index 2d46709..2a5edb2 100644 --- a/go.mod +++ b/go.mod @@ -8,14 +8,17 @@ require ( github.com/boltdb/bolt v1.3.1 github.com/codeskyblue/go-sh v0.0.0-20190412065543-76bd3d59ff27 github.com/dennwc/btrfs v0.0.0-20190517175702-d917b30ff035 + github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568 // indirect github.com/gin-gonic/gin v1.5.0 + github.com/golang/protobuf v1.4.2 // indirect + github.com/google/go-cmp v0.5.2 // indirect github.com/imdario/mergo v0.3.9 - github.com/mattn/goveralls v0.0.5 // indirect github.com/pkg/profile v1.4.0 github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46 github.com/smartystreets/goconvey v1.6.4 + github.com/stretchr/testify v1.6.1 // indirect github.com/urfave/cli v1.22.3 - golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527 - golang.org/x/tools v0.0.0-20200312194400-c312e98713c2 // indirect + golang.org/x/sys v0.0.0-20200519105757-fe76b779f299 gopkg.in/op/go-logging.v1 v1.0.0-20160211212156-b2cb9fa56473 + gopkg.in/yaml.v2 v2.3.0 // indirect ) diff --git a/go.sum b/go.sum index f76dd97..646cc56 100644 --- a/go.sum +++ b/go.sum @@ -17,6 +17,8 @@ github.com/dennwc/btrfs v0.0.0-20190517175702-d917b30ff035 h1:4e+UEZaKPx0ZEiCMPU github.com/dennwc/btrfs v0.0.0-20190517175702-d917b30ff035/go.mod h1:MYsOV9Dgsec3FFSOjywi0QK5r6TeBbdWxdrMGtiYXHA= github.com/dennwc/ioctl v1.0.0 h1:DsWAAjIxRqNcLn9x6mwfuf2pet3iB7aK90K4tF16rLg= github.com/dennwc/ioctl v1.0.0/go.mod h1:ellh2YB5ldny99SBU/VX7Nq0xiZbHphf1DrtHxxjMk0= +github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568 h1:BHsljHzVlRcyQhjrss6TZTdY2VfCqZPbv5k3iBFa2ZQ= +github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= github.com/gin-gonic/gin v1.5.0 h1:fi+bqFAx/oLK54somfCtEZs9HeH1LHVoEPUgARpTqyc= @@ -27,7 +29,20 @@ github.com/go-playground/universal-translator v0.16.0 h1:X++omBR/4cE2MNg91AoC3rm github.com/go-playground/universal-translator v0.16.0/go.mod h1:1AnU7NaIRDWWzGEKwgtJRd2xk99HeFyHw3yid4rvQIY= github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM= +github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/imdario/mergo v0.3.9 h1:UauaLniWCFHWd+Jp9oCEkTBj8VO/9DKg3PV3VCNMDIg= github.com/imdario/mergo v0.3.9/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= @@ -39,8 +54,6 @@ github.com/leodido/go-urn v1.1.0 h1:Sm1gr51B1kKyfD2BlRcLSiEkffoG96g6TPv6eRoEiB8= github.com/leodido/go-urn v1.1.0/go.mod h1:+cyI34gQWZcE1eQU7NVgKkkzdXDQHr1dBMtdAPozLkw= github.com/mattn/go-isatty v0.0.9 h1:d5US/mDsogSGW37IV293h//ZFaeajb69h+EHFsv2xGg= github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ= -github.com/mattn/goveralls v0.0.5 h1:spfq8AyZ0cCk57Za6/juJ5btQxeE1FaEGMdfcI+XO48= -github.com/mattn/goveralls v0.0.5/go.mod h1:Xg2LHi51faXLyKXwsndxiW6uxEEQT9+3sjGzzwU4xy0= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg= @@ -63,6 +76,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= @@ -70,31 +85,26 @@ github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLY github.com/urfave/cli v1.22.3 h1:FpNT6zq26xNpHZy08emi755QwzLPs6Pukqjlc7RfOMU= github.com/urfave/cli v1.22.3/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= -golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527 h1:uYVVQ9WP/Ds2ROhcaGPeIdVq0RIXVLwsHlnvJ+cT1So= -golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200519105757-fe76b779f299 h1:DYfZAGf2WMFjMxbgTjaC+2HC7NkNAQs+6Q8b9WEB/F4= +golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384 h1:TFlARGu6Czu1z7q93HTxcP1P+/ZFC/IKythI5RzrnRg= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= -golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20200113040837-eac381796e91/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= -golang.org/x/tools v0.0.0-20200312194400-c312e98713c2 h1:6TB4+MaZlkcSsJDu+BS5yxSEuZIYhjWz+jhbSLEZylI= -golang.org/x/tools v0.0.0-20200312194400-c312e98713c2/go.mod h1:o4KQGtdN14AW+yjsvvwRTJJuXz8XRtIHtEnmAXLyFUw= -golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/go-playground/assert.v1 v1.2.1 h1:xoYuJVE7KT85PYWrN730RguIQO0ePzVRfFMXadIrXTM= gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE= gopkg.in/go-playground/validator.v9 v9.29.1 h1:SvGtYmN60a5CVKTOzMSyfzWDeZRxRuGvRQyEAKbw1xc= gopkg.in/go-playground/validator.v9 v9.29.1/go.mod h1:+c9/zcJMFNgbLvly1L1V+PpxWdVbfP1avr/N00E2vyQ= @@ -102,3 +112,7 @@ gopkg.in/op/go-logging.v1 v1.0.0-20160211212156-b2cb9fa56473 h1:6D+BvnJ/j6e222UW gopkg.in/op/go-logging.v1 v1.0.0-20160211212156-b2cb9fa56473/go.mod h1:N1eN2tsCx0Ydtgjl4cqmbRCsY4/+z4cYDeqwZTk6zog= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/manager/db.go b/manager/db.go index 7c02180..aa61326 100644 --- a/manager/db.go +++ b/manager/db.go @@ -1,9 +1,7 @@ package manager import ( - "encoding/json" "fmt" - "strings" "time" "github.com/boltdb/bolt" @@ -44,183 +42,3 @@ func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) { // unsupported db-type return nil, fmt.Errorf("unsupported db-type: %s", dbType) } - -const ( - _workerBucketKey = "workers" - _statusBucketKey = "mirror_status" -) - -type boltAdapter struct { - db *bolt.DB - dbFile string -} - -func (b *boltAdapter) Init() (err error) { - return b.db.Update(func(tx *bolt.Tx) error { - _, err = tx.CreateBucketIfNotExists([]byte(_workerBucketKey)) - if err != nil { - return fmt.Errorf("create bucket %s error: %s", _workerBucketKey, err.Error()) - } - _, err = tx.CreateBucketIfNotExists([]byte(_statusBucketKey)) - if err != nil { - return fmt.Errorf("create bucket %s error: %s", _statusBucketKey, err.Error()) - } - return nil - }) -} - -func (b *boltAdapter) ListWorkers() (ws []WorkerStatus, err error) { - err = b.db.View(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_workerBucketKey)) - c := bucket.Cursor() - var w WorkerStatus - for k, v := c.First(); k != nil; k, v = c.Next() { - jsonErr := json.Unmarshal(v, &w) - if jsonErr != nil { - err = fmt.Errorf("%s; %s", err.Error(), jsonErr) - continue - } - ws = append(ws, w) - } - return err - }) - return -} - -func (b *boltAdapter) GetWorker(workerID string) (w WorkerStatus, err error) { - err = b.db.View(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_workerBucketKey)) - v := bucket.Get([]byte(workerID)) - if v == nil { - return fmt.Errorf("invalid workerID %s", workerID) - } - err := json.Unmarshal(v, &w) - return err - }) - return -} - -func (b *boltAdapter) DeleteWorker(workerID string) (err error) { - err = b.db.Update(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_workerBucketKey)) - v := bucket.Get([]byte(workerID)) - if v == nil { - return fmt.Errorf("invalid workerID %s", workerID) - } - err := bucket.Delete([]byte(workerID)) - return err - }) - return -} - -func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) { - err := b.db.Update(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_workerBucketKey)) - v, err := json.Marshal(w) - if err != nil { - return err - } - err = bucket.Put([]byte(w.ID), v) - return err - }) - return w, err -} - -func (b *boltAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) { - w, err = b.GetWorker(workerID) - if err == nil { - w.LastOnline = time.Now() - w, err = b.CreateWorker(w) - } - return w, err -} - -func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) { - id := mirrorID + "/" + workerID - err := b.db.Update(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_statusBucketKey)) - v, err := json.Marshal(status) - err = bucket.Put([]byte(id), v) - return err - }) - return status, err -} - -func (b *boltAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus, err error) { - id := mirrorID + "/" + workerID - err = b.db.Update(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_statusBucketKey)) - v := bucket.Get([]byte(id)) - if v == nil { - return fmt.Errorf("no mirror '%s' exists in worker '%s'", mirrorID, workerID) - } - err := json.Unmarshal(v, &m) - return err - }) - return -} - -func (b *boltAdapter) ListMirrorStatus(workerID string) (ms []MirrorStatus, err error) { - err = b.db.View(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_statusBucketKey)) - c := bucket.Cursor() - var m MirrorStatus - for k, v := c.First(); k != nil; k, v = c.Next() { - if wID := strings.Split(string(k), "/")[1]; wID == workerID { - jsonErr := json.Unmarshal(v, &m) - if jsonErr != nil { - err = fmt.Errorf("%s; %s", err.Error(), jsonErr) - continue - } - ms = append(ms, m) - } - } - return err - }) - return -} - -func (b *boltAdapter) ListAllMirrorStatus() (ms []MirrorStatus, err error) { - err = b.db.View(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_statusBucketKey)) - c := bucket.Cursor() - var m MirrorStatus - for k, v := c.First(); k != nil; k, v = c.Next() { - jsonErr := json.Unmarshal(v, &m) - if jsonErr != nil { - err = fmt.Errorf("%s; %s", err.Error(), jsonErr) - continue - } - ms = append(ms, m) - } - return err - }) - return -} - -func (b *boltAdapter) FlushDisabledJobs() (err error) { - err = b.db.Update(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_statusBucketKey)) - c := bucket.Cursor() - var m MirrorStatus - for k, v := c.First(); k != nil; k, v = c.Next() { - jsonErr := json.Unmarshal(v, &m) - if jsonErr != nil { - err = fmt.Errorf("%s; %s", err.Error(), jsonErr) - continue - } - if m.Status == Disabled || len(m.Name) == 0 { - err = c.Delete() - } - } - return err - }) - return -} - -func (b *boltAdapter) Close() error { - if b.db != nil { - return b.db.Close() - } - return nil -} diff --git a/manager/db_bolt.go b/manager/db_bolt.go new file mode 100644 index 0000000..3563bed --- /dev/null +++ b/manager/db_bolt.go @@ -0,0 +1,192 @@ +package manager + +import ( + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/boltdb/bolt" + + . "github.com/tuna/tunasync/internal" +) + +const ( + _workerBucketKey = "workers" + _statusBucketKey = "mirror_status" +) + +type boltAdapter struct { + db *bolt.DB + dbFile string +} + +func (b *boltAdapter) Init() (err error) { + return b.db.Update(func(tx *bolt.Tx) error { + _, err = tx.CreateBucketIfNotExists([]byte(_workerBucketKey)) + if err != nil { + return fmt.Errorf("create bucket %s error: %s", _workerBucketKey, err.Error()) + } + _, err = tx.CreateBucketIfNotExists([]byte(_statusBucketKey)) + if err != nil { + return fmt.Errorf("create bucket %s error: %s", _statusBucketKey, err.Error()) + } + return nil + }) +} + +func (b *boltAdapter) ListWorkers() (ws []WorkerStatus, err error) { + err = b.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(_workerBucketKey)) + c := bucket.Cursor() + var w WorkerStatus + for k, v := c.First(); k != nil; k, v = c.Next() { + jsonErr := json.Unmarshal(v, &w) + if jsonErr != nil { + err = fmt.Errorf("%s; %s", err.Error(), jsonErr) + continue + } + ws = append(ws, w) + } + return err + }) + return +} + +func (b *boltAdapter) GetWorker(workerID string) (w WorkerStatus, err error) { + err = b.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(_workerBucketKey)) + v := bucket.Get([]byte(workerID)) + if v == nil { + return fmt.Errorf("invalid workerID %s", workerID) + } + err := json.Unmarshal(v, &w) + return err + }) + return +} + +func (b *boltAdapter) DeleteWorker(workerID string) (err error) { + err = b.db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(_workerBucketKey)) + v := bucket.Get([]byte(workerID)) + if v == nil { + return fmt.Errorf("invalid workerID %s", workerID) + } + err := bucket.Delete([]byte(workerID)) + return err + }) + return +} + +func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) { + err := b.db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(_workerBucketKey)) + v, err := json.Marshal(w) + if err != nil { + return err + } + err = bucket.Put([]byte(w.ID), v) + return err + }) + return w, err +} + +func (b *boltAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) { + w, err = b.GetWorker(workerID) + if err == nil { + w.LastOnline = time.Now() + w, err = b.CreateWorker(w) + } + return w, err +} + +func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) { + id := mirrorID + "/" + workerID + err := b.db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(_statusBucketKey)) + v, err := json.Marshal(status) + err = bucket.Put([]byte(id), v) + return err + }) + return status, err +} + +func (b *boltAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus, err error) { + id := mirrorID + "/" + workerID + err = b.db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(_statusBucketKey)) + v := bucket.Get([]byte(id)) + if v == nil { + return fmt.Errorf("no mirror '%s' exists in worker '%s'", mirrorID, workerID) + } + err := json.Unmarshal(v, &m) + return err + }) + return +} + +func (b *boltAdapter) ListMirrorStatus(workerID string) (ms []MirrorStatus, err error) { + err = b.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(_statusBucketKey)) + c := bucket.Cursor() + var m MirrorStatus + for k, v := c.First(); k != nil; k, v = c.Next() { + if wID := strings.Split(string(k), "/")[1]; wID == workerID { + jsonErr := json.Unmarshal(v, &m) + if jsonErr != nil { + err = fmt.Errorf("%s; %s", err.Error(), jsonErr) + continue + } + ms = append(ms, m) + } + } + return err + }) + return +} + +func (b *boltAdapter) ListAllMirrorStatus() (ms []MirrorStatus, err error) { + err = b.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(_statusBucketKey)) + c := bucket.Cursor() + var m MirrorStatus + for k, v := c.First(); k != nil; k, v = c.Next() { + jsonErr := json.Unmarshal(v, &m) + if jsonErr != nil { + err = fmt.Errorf("%s; %s", err.Error(), jsonErr) + continue + } + ms = append(ms, m) + } + return err + }) + return +} + +func (b *boltAdapter) FlushDisabledJobs() (err error) { + err = b.db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(_statusBucketKey)) + c := bucket.Cursor() + var m MirrorStatus + for k, v := c.First(); k != nil; k, v = c.Next() { + jsonErr := json.Unmarshal(v, &m) + if jsonErr != nil { + err = fmt.Errorf("%s; %s", err.Error(), jsonErr) + continue + } + if m.Status == Disabled || len(m.Name) == 0 { + err = c.Delete() + } + } + return err + }) + return +} + +func (b *boltAdapter) Close() error { + if b.db != nil { + return b.db.Close() + } + return nil +} From a137f0676ae39a4be790e4b65fcbb8896349d5e7 Mon Sep 17 00:00:00 2001 From: jiegec Date: Tue, 13 Oct 2020 14:50:19 +0800 Subject: [PATCH 02/10] Add redis backend for db --- go.mod | 2 + go.sum | 34 ++++++++++ manager/db.go | 15 +++++ manager/db_bolt.go | 5 -- manager/db_redis.go | 159 ++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 210 insertions(+), 5 deletions(-) create mode 100644 manager/db_redis.go diff --git a/go.mod b/go.mod index 2a5edb2..2cdd976 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,8 @@ require ( github.com/dennwc/btrfs v0.0.0-20190517175702-d917b30ff035 github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568 // indirect github.com/gin-gonic/gin v1.5.0 + github.com/go-redis/redis v6.15.9+incompatible + github.com/go-redis/redis/v8 v8.3.0 github.com/golang/protobuf v1.4.2 // indirect github.com/google/go-cmp v0.5.2 // indirect github.com/imdario/mergo v0.3.9 diff --git a/go.sum b/go.sum index 646cc56..d6888f4 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239 h1:kFOfPq6dUM1hTo github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4= github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= +github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= +github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0 h1:sDMmm+q/3+BukdIpxwO365v/Rbspp2Nt5XntgQRXq8Q= github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0/go.mod h1:4Zcjuz89kmFXt9morQgcfYZAYZ5n8WHjt81YYWIwtTM= github.com/codeskyblue/go-sh v0.0.0-20190412065543-76bd3d59ff27 h1:HHUr4P/aKh4quafGxDT9LDasjGdlGkzLbfmmrlng3kA= @@ -17,8 +19,12 @@ github.com/dennwc/btrfs v0.0.0-20190517175702-d917b30ff035 h1:4e+UEZaKPx0ZEiCMPU github.com/dennwc/btrfs v0.0.0-20190517175702-d917b30ff035/go.mod h1:MYsOV9Dgsec3FFSOjywi0QK5r6TeBbdWxdrMGtiYXHA= github.com/dennwc/ioctl v1.0.0 h1:DsWAAjIxRqNcLn9x6mwfuf2pet3iB7aK90K4tF16rLg= github.com/dennwc/ioctl v1.0.0/go.mod h1:ellh2YB5ldny99SBU/VX7Nq0xiZbHphf1DrtHxxjMk0= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568 h1:BHsljHzVlRcyQhjrss6TZTdY2VfCqZPbv5k3iBFa2ZQ= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= github.com/gin-gonic/gin v1.5.0 h1:fi+bqFAx/oLK54somfCtEZs9HeH1LHVoEPUgARpTqyc= @@ -27,6 +33,11 @@ github.com/go-playground/locales v0.12.1 h1:2FITxuFt/xuCNP1Acdhv62OzaCiviiE4kotf github.com/go-playground/locales v0.12.1/go.mod h1:IUMDtCfWo/w/mtMfIE/IG2K+Ey3ygWanZIBtBW0W2TM= github.com/go-playground/universal-translator v0.16.0 h1:X++omBR/4cE2MNg91AoC3rmGrCjJ8eAeUP/K/EKx4DM= github.com/go-playground/universal-translator v0.16.0/go.mod h1:1AnU7NaIRDWWzGEKwgtJRd2xk99HeFyHw3yid4rvQIY= +github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg= +github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= +github.com/go-redis/redis/v8 v8.3.0 h1:Xrwvn8+QqUYD1MbQmda3cVR2U9li5XbtRFkKZN5Y0hk= +github.com/go-redis/redis/v8 v8.3.0/go.mod h1:a2xkpBM7NJUN5V5kiF46X5Ltx4WeXJ9757X/ScKUBdE= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= @@ -44,6 +55,7 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/imdario/mergo v0.3.9 h1:UauaLniWCFHWd+Jp9oCEkTBj8VO/9DKg3PV3VCNMDIg= github.com/imdario/mergo v0.3.9/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/json-iterator/go v1.1.7 h1:KfgG9LzI+pYjr4xvmz/5H4FXjokeP+rlHLhv3iH62Fo= @@ -58,6 +70,13 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OH github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= +github.com/onsi/ginkgo v1.14.1/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= +github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= +github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/onsi/gomega v1.10.2/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/pkg/profile v1.4.0 h1:uCmaf4vVbWAOZz36k1hrQD7ijGRzLwaME8Am/7a4jZI= github.com/pkg/profile v1.4.0/go.mod h1:NWz/XGvpEW1FyYQ7fCx4dqYBLlfTcE+A9FLAkNKqjFE= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -84,13 +103,25 @@ github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= github.com/urfave/cli v1.22.3 h1:FpNT6zq26xNpHZy08emi755QwzLPs6Pukqjlc7RfOMU= github.com/urfave/cli v1.22.3/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= +go.opentelemetry.io/otel v0.13.0 h1:2isEnyzjjJZq6r2EKMsFj4TxiQiexsM04AVhwbR/oBA= +go.opentelemetry.io/otel v0.13.0/go.mod h1:dlSNewoRYikTkotEnxdmuBHgzT+k/idJSfDv/FxEnOY= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200519105757-fe76b779f299 h1:DYfZAGf2WMFjMxbgTjaC+2HC7NkNAQs+6Q8b9WEB/F4= golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384 h1:TFlARGu6Czu1z7q93HTxcP1P+/ZFC/IKythI5RzrnRg= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= @@ -104,14 +135,17 @@ google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyz google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/go-playground/assert.v1 v1.2.1 h1:xoYuJVE7KT85PYWrN730RguIQO0ePzVRfFMXadIrXTM= gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE= gopkg.in/go-playground/validator.v9 v9.29.1 h1:SvGtYmN60a5CVKTOzMSyfzWDeZRxRuGvRQyEAKbw1xc= gopkg.in/go-playground/validator.v9 v9.29.1/go.mod h1:+c9/zcJMFNgbLvly1L1V+PpxWdVbfP1avr/N00E2vyQ= gopkg.in/op/go-logging.v1 v1.0.0-20160211212156-b2cb9fa56473 h1:6D+BvnJ/j6e222UW8s2qTSe3wGBtvo0MbVQG/c5k8RE= gopkg.in/op/go-logging.v1 v1.0.0-20160211212156-b2cb9fa56473/go.mod h1:N1eN2tsCx0Ydtgjl4cqmbRCsY4/+z4cYDeqwZTk6zog= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= diff --git a/manager/db.go b/manager/db.go index aa61326..9fc7e1a 100644 --- a/manager/db.go +++ b/manager/db.go @@ -5,6 +5,7 @@ import ( "time" "github.com/boltdb/bolt" + "github.com/go-redis/redis/v8" . "github.com/tuna/tunasync/internal" ) @@ -24,6 +25,11 @@ type dbAdapter interface { Close() error } +const ( + _workerBucketKey = "workers" + _statusBucketKey = "mirror_status" +) + func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) { if dbType == "bolt" { innerDB, err := bolt.Open(dbFile, 0600, &bolt.Options{ @@ -38,6 +44,15 @@ func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) { } err = db.Init() return &db, err + } else if dbType == "redis" { + innerDB := redis.NewClient(&redis.Options{ + Addr: dbFile, + }) + db := redisAdapter{ + db: innerDB, + } + err := db.Init() + return &db, err } // unsupported db-type return nil, fmt.Errorf("unsupported db-type: %s", dbType) diff --git a/manager/db_bolt.go b/manager/db_bolt.go index 3563bed..0473b0f 100644 --- a/manager/db_bolt.go +++ b/manager/db_bolt.go @@ -11,11 +11,6 @@ import ( . "github.com/tuna/tunasync/internal" ) -const ( - _workerBucketKey = "workers" - _statusBucketKey = "mirror_status" -) - type boltAdapter struct { db *bolt.DB dbFile string diff --git a/manager/db_redis.go b/manager/db_redis.go new file mode 100644 index 0000000..324cf15 --- /dev/null +++ b/manager/db_redis.go @@ -0,0 +1,159 @@ +package manager + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/go-redis/redis/v8" + . "github.com/tuna/tunasync/internal" +) + +type redisAdapter struct { + db *redis.Client +} + +var ctx = context.Background() + +func (b *redisAdapter) Init() (err error) { + return nil +} + +func (b *redisAdapter) ListWorkers() (ws []WorkerStatus, err error) { + var val map[string]string + val, err = b.db.HGetAll(ctx, _workerBucketKey).Result() + if err == nil { + var w WorkerStatus + for _, v := range val { + jsonErr := json.Unmarshal([]byte(v), &w) + if jsonErr != nil { + err = fmt.Errorf("%s; %s", err.Error(), jsonErr) + continue + } + ws = append(ws, w) + } + } + return +} + +func (b *redisAdapter) GetWorker(workerID string) (w WorkerStatus, err error) { + var val string + val, err = b.db.HGet(ctx, _workerBucketKey, workerID).Result() + if err == nil { + err = json.Unmarshal([]byte(val), &w) + } else { + err = fmt.Errorf("invalid workerID %s", workerID) + } + return +} + +func (b *redisAdapter) DeleteWorker(workerID string) (err error) { + _, err = b.db.HDel(ctx, _workerBucketKey, workerID).Result() + if err != nil { + err = fmt.Errorf("invalid workerID %s", workerID) + } + return +} + +func (b *redisAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) { + var v []byte + v, err := json.Marshal(w) + if err == nil { + _, err = b.db.HSet(ctx, _workerBucketKey, w.ID, string(v)).Result() + } + return w, err +} + +func (b *redisAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) { + w, err = b.GetWorker(workerID) + if err == nil { + w.LastOnline = time.Now() + w, err = b.CreateWorker(w) + } + return w, err +} + +func (b *redisAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) { + id := mirrorID + "/" + workerID + v, err := json.Marshal(status) + if err == nil { + _, err = b.db.HSet(ctx, _statusBucketKey, id, string(v)).Result() + } + return status, err +} + +func (b *redisAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus, err error) { + id := mirrorID + "/" + workerID + var val string + val, err = b.db.HGet(ctx, _statusBucketKey, id).Result() + if err == nil { + err = json.Unmarshal([]byte(val), &m) + } else { + err = fmt.Errorf("no mirror '%s' exists in worker '%s'", mirrorID, workerID) + } + return +} + +func (b *redisAdapter) ListMirrorStatus(workerID string) (ms []MirrorStatus, err error) { + var val map[string]string + val, err = b.db.HGetAll(ctx, _statusBucketKey).Result() + if err == nil { + var m MirrorStatus + for k, v := range val { + if wID := strings.Split(string(k), "/")[1]; wID == workerID { + jsonErr := json.Unmarshal([]byte(v), &m) + if jsonErr != nil { + err = fmt.Errorf("%s; %s", err.Error(), jsonErr) + continue + } + ms = append(ms, m) + } + } + } + return +} + +func (b *redisAdapter) ListAllMirrorStatus() (ms []MirrorStatus, err error) { + var val map[string]string + val, err = b.db.HGetAll(ctx, _statusBucketKey).Result() + if err == nil { + var m MirrorStatus + for _, v := range val { + jsonErr := json.Unmarshal([]byte(v), &m) + if jsonErr != nil { + err = fmt.Errorf("%s; %s", err.Error(), jsonErr) + continue + } + ms = append(ms, m) + } + } + return +} + +func (b *redisAdapter) FlushDisabledJobs() (err error) { + var val map[string]string + val, err = b.db.HGetAll(ctx, _statusBucketKey).Result() + if err == nil { + var m MirrorStatus + for k, v := range val { + jsonErr := json.Unmarshal([]byte(v), &m) + if jsonErr != nil { + err = fmt.Errorf("%s; %s", err.Error(), jsonErr) + continue + } + if m.Status == Disabled || len(m.Name) == 0 { + _, err = b.db.HDel(ctx, _statusBucketKey, k).Result() + } + } + } + return +} + +func (b *redisAdapter) Close() error { + if b.db != nil { + return b.db.Close() + } + return nil +} From fd4c07fdb5b9c6bd813635f17120485422cd72d8 Mon Sep 17 00:00:00 2001 From: jiegec Date: Tue, 13 Oct 2020 14:54:25 +0800 Subject: [PATCH 03/10] Add redis backend to docs --- docs/zh_CN/get_started.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/zh_CN/get_started.md b/docs/zh_CN/get_started.md index 93cba45..73aa1d7 100644 --- a/docs/zh_CN/get_started.md +++ b/docs/zh_CN/get_started.md @@ -82,6 +82,8 @@ db_file = "/tmp/tunasync/manager.db" ca_cert = "" ``` +如果使用 redis 作为数据库后端,把 db_type 改为 redis,下面的 db_file 设为 redis 服务器的地址。 + ### 运行 ```shell From 5880ed92dcec119719ffd0e93daa561a7111733d Mon Sep 17 00:00:00 2001 From: jiegec Date: Tue, 13 Oct 2020 19:35:32 +0800 Subject: [PATCH 04/10] Use ParseURL from redis library --- docs/zh_CN/get_started.md | 2 +- manager/db.go | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/docs/zh_CN/get_started.md b/docs/zh_CN/get_started.md index 73aa1d7..5ad8fd5 100644 --- a/docs/zh_CN/get_started.md +++ b/docs/zh_CN/get_started.md @@ -82,7 +82,7 @@ db_file = "/tmp/tunasync/manager.db" ca_cert = "" ``` -如果使用 redis 作为数据库后端,把 db_type 改为 redis,下面的 db_file 设为 redis 服务器的地址。 +如果使用 redis 作为数据库后端,把 db_type 改为 redis,下面的 db_file 设为 redis 服务器的地址: `redis://user:password@host:port/db_number`。 ### 运行 diff --git a/manager/db.go b/manager/db.go index 9fc7e1a..c2811be 100644 --- a/manager/db.go +++ b/manager/db.go @@ -45,13 +45,15 @@ func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) { err = db.Init() return &db, err } else if dbType == "redis" { - innerDB := redis.NewClient(&redis.Options{ - Addr: dbFile, - }) + opt, err := redis.ParseURL(dbFile) + if err != nil { + return nil, fmt.Errorf("bad redis url: %s", err) + } + innerDB := redis.NewClient(opt) db := redisAdapter{ db: innerDB, } - err := db.Init() + err = db.Init() return &db, err } // unsupported db-type From 7dd61ae8ca06d3e4b33d82446df65216ee03edd9 Mon Sep 17 00:00:00 2001 From: jiegec Date: Tue, 13 Oct 2020 22:41:58 +0800 Subject: [PATCH 05/10] Add kv abstration layer for bolt and redis --- manager/db.go | 181 +++++++++++++++++++++++++++++++++++++++++++- manager/db_bolt.go | 171 ++++++----------------------------------- manager/db_redis.go | 142 +++++----------------------------- 3 files changed, 219 insertions(+), 275 deletions(-) diff --git a/manager/db.go b/manager/db.go index c2811be..d254513 100644 --- a/manager/db.go +++ b/manager/db.go @@ -1,7 +1,9 @@ package manager import ( + "encoding/json" "fmt" + "strings" "time" "github.com/boltdb/bolt" @@ -25,6 +27,15 @@ type dbAdapter interface { Close() error } +type kvAdapter interface { + InitBucket(bucket string) error + Get(bucket string, key string) ([]byte, error) + GetAll(bucket string) (map[string][]byte, error) + Put(bucket string, key string, value []byte) error + Delete(bucket string, key string) error + Close() error +} + const ( _workerBucketKey = "workers" _statusBucketKey = "mirror_status" @@ -42,8 +53,11 @@ func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) { db: innerDB, dbFile: dbFile, } - err = db.Init() - return &db, err + kv := kvDBAdapter{ + db: &db, + } + err = kv.Init() + return &kv, err } else if dbType == "redis" { opt, err := redis.ParseURL(dbFile) if err != nil { @@ -53,9 +67,168 @@ func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) { db := redisAdapter{ db: innerDB, } - err = db.Init() - return &db, err + kv := kvDBAdapter{ + db: &db, + } + err = kv.Init() + return &kv, err } // unsupported db-type return nil, fmt.Errorf("unsupported db-type: %s", dbType) } + +type kvDBAdapter struct { + db kvAdapter +} + +func (b *kvDBAdapter) Init() error { + err := b.db.InitBucket(_workerBucketKey) + if err != nil { + return fmt.Errorf("create bucket %s error: %s", _workerBucketKey, err.Error()) + } + err = b.db.InitBucket(_statusBucketKey) + if err != nil { + return fmt.Errorf("create bucket %s error: %s", _workerBucketKey, err.Error()) + } + return err +} + +func (b *kvDBAdapter) ListWorkers() (ws []WorkerStatus, err error) { + var workers map[string][]byte + workers, err = b.db.GetAll(_workerBucketKey) + + var w WorkerStatus + for _, v := range workers { + jsonErr := json.Unmarshal(v, &w) + if jsonErr != nil { + err = fmt.Errorf("%s; %s", err.Error(), jsonErr) + continue + } + ws = append(ws, w) + } + return +} + +func (b *kvDBAdapter) GetWorker(workerID string) (w WorkerStatus, err error) { + var v []byte + v, err = b.db.Get(_workerBucketKey, workerID) + if v == nil { + err = fmt.Errorf("invalid workerID %s", workerID) + } else { + err = json.Unmarshal(v, &w) + } + return +} + +func (b *kvDBAdapter) DeleteWorker(workerID string) error { + return b.db.Delete(_workerBucketKey, workerID) +} + +func (b *kvDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) { + v, err := json.Marshal(w) + if err == nil { + err = b.db.Put(_workerBucketKey, w.ID, v) + } + return w, err +} + +func (b *kvDBAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) { + w, err = b.GetWorker(workerID) + if err == nil { + w.LastOnline = time.Now() + w, err = b.CreateWorker(w) + } + return w, err +} + +func (b *kvDBAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) { + id := mirrorID + "/" + workerID + v, err := json.Marshal(status) + if err == nil { + err = b.db.Put(_statusBucketKey, id, v) + } + return status, err +} + +func (b *kvDBAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus, err error) { + id := mirrorID + "/" + workerID + var v []byte + v, err = b.db.Get(_statusBucketKey, id) + if v == nil { + err = fmt.Errorf("no mirror '%s' exists in worker '%s'", mirrorID, workerID) + } else if err == nil { + err = json.Unmarshal(v, &m) + } + return +} + +func (b *kvDBAdapter) ListMirrorStatus(workerID string) (ms []MirrorStatus, err error) { + var vals map[string][]byte + vals, err = b.db.GetAll(_statusBucketKey) + if err != nil { + return + } + + for k, v := range vals { + if wID := strings.Split(k, "/")[1]; wID == workerID { + var m MirrorStatus + jsonErr := json.Unmarshal(v, &m) + if jsonErr != nil { + err = fmt.Errorf("%s; %s", err.Error(), jsonErr) + continue + } + ms = append(ms, m) + } + } + return +} + +func (b *kvDBAdapter) ListAllMirrorStatus() (ms []MirrorStatus, err error) { + var vals map[string][]byte + vals, err = b.db.GetAll(_statusBucketKey) + if err != nil { + return + } + + for _, v := range vals { + var m MirrorStatus + jsonErr := json.Unmarshal(v, &m) + if jsonErr != nil { + err = fmt.Errorf("%s; %s", err.Error(), jsonErr) + continue + } + ms = append(ms, m) + } + return +} + +func (b *kvDBAdapter) FlushDisabledJobs() (err error) { + var vals map[string][]byte + vals, err = b.db.GetAll(_statusBucketKey) + if err != nil { + return + } + + for k, v := range vals { + var m MirrorStatus + jsonErr := json.Unmarshal(v, &m) + if jsonErr != nil { + err = fmt.Errorf("%s; %s", err.Error(), jsonErr) + continue + } + if m.Status == Disabled || len(m.Name) == 0 { + deleteErr := b.db.Delete(_statusBucketKey, k) + if deleteErr != nil { + err = fmt.Errorf("%s; %s", err.Error(), deleteErr) + } + } + } + return +} + +func (b *kvDBAdapter) Close() error { + if b.db != nil { + return b.db.Close() + } + return nil +} diff --git a/manager/db_bolt.go b/manager/db_bolt.go index 0473b0f..b80372e 100644 --- a/manager/db_bolt.go +++ b/manager/db_bolt.go @@ -1,14 +1,9 @@ package manager import ( - "encoding/json" "fmt" - "strings" - "time" "github.com/boltdb/bolt" - - . "github.com/tuna/tunasync/internal" ) type boltAdapter struct { @@ -16,172 +11,56 @@ type boltAdapter struct { dbFile string } -func (b *boltAdapter) Init() (err error) { +func (b *boltAdapter) InitBucket(bucket string) (err error) { return b.db.Update(func(tx *bolt.Tx) error { - _, err = tx.CreateBucketIfNotExists([]byte(_workerBucketKey)) + _, err = tx.CreateBucketIfNotExists([]byte(bucket)) if err != nil { return fmt.Errorf("create bucket %s error: %s", _workerBucketKey, err.Error()) } - _, err = tx.CreateBucketIfNotExists([]byte(_statusBucketKey)) - if err != nil { - return fmt.Errorf("create bucket %s error: %s", _statusBucketKey, err.Error()) - } return nil }) } -func (b *boltAdapter) ListWorkers() (ws []WorkerStatus, err error) { +func (b *boltAdapter) Get(bucket string, key string) (v []byte, err error) { err = b.db.View(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_workerBucketKey)) + bucket := tx.Bucket([]byte(bucket)) + v = bucket.Get([]byte(key)) + return nil + }) + return +} + +func (b *boltAdapter) GetAll(bucket string) (m map[string][]byte, err error) { + err = b.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(bucket)) c := bucket.Cursor() - var w WorkerStatus + m = make(map[string][]byte) for k, v := c.First(); k != nil; k, v = c.Next() { - jsonErr := json.Unmarshal(v, &w) - if jsonErr != nil { - err = fmt.Errorf("%s; %s", err.Error(), jsonErr) - continue - } - ws = append(ws, w) + m[string(k)] = v } - return err + return nil }) return } -func (b *boltAdapter) GetWorker(workerID string) (w WorkerStatus, err error) { - err = b.db.View(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_workerBucketKey)) - v := bucket.Get([]byte(workerID)) - if v == nil { - return fmt.Errorf("invalid workerID %s", workerID) - } - err := json.Unmarshal(v, &w) - return err - }) - return -} - -func (b *boltAdapter) DeleteWorker(workerID string) (err error) { - err = b.db.Update(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_workerBucketKey)) - v := bucket.Get([]byte(workerID)) - if v == nil { - return fmt.Errorf("invalid workerID %s", workerID) - } - err := bucket.Delete([]byte(workerID)) - return err - }) - return -} - -func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) { +func (b *boltAdapter) Put(bucket string, key string, value []byte) error { err := b.db.Update(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_workerBucketKey)) - v, err := json.Marshal(w) - if err != nil { - return err - } - err = bucket.Put([]byte(w.ID), v) + bucket := tx.Bucket([]byte(bucket)) + err := bucket.Put([]byte(key), value) return err }) - return w, err + return err } -func (b *boltAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) { - w, err = b.GetWorker(workerID) - if err == nil { - w.LastOnline = time.Now() - w, err = b.CreateWorker(w) - } - return w, err -} - -func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) { - id := mirrorID + "/" + workerID +func (b *boltAdapter) Delete(bucket string, key string) error { err := b.db.Update(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_statusBucketKey)) - v, err := json.Marshal(status) - err = bucket.Put([]byte(id), v) + bucket := tx.Bucket([]byte(bucket)) + err := bucket.Delete([]byte(key)) return err }) - return status, err -} - -func (b *boltAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus, err error) { - id := mirrorID + "/" + workerID - err = b.db.Update(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_statusBucketKey)) - v := bucket.Get([]byte(id)) - if v == nil { - return fmt.Errorf("no mirror '%s' exists in worker '%s'", mirrorID, workerID) - } - err := json.Unmarshal(v, &m) - return err - }) - return -} - -func (b *boltAdapter) ListMirrorStatus(workerID string) (ms []MirrorStatus, err error) { - err = b.db.View(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_statusBucketKey)) - c := bucket.Cursor() - var m MirrorStatus - for k, v := c.First(); k != nil; k, v = c.Next() { - if wID := strings.Split(string(k), "/")[1]; wID == workerID { - jsonErr := json.Unmarshal(v, &m) - if jsonErr != nil { - err = fmt.Errorf("%s; %s", err.Error(), jsonErr) - continue - } - ms = append(ms, m) - } - } - return err - }) - return -} - -func (b *boltAdapter) ListAllMirrorStatus() (ms []MirrorStatus, err error) { - err = b.db.View(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_statusBucketKey)) - c := bucket.Cursor() - var m MirrorStatus - for k, v := c.First(); k != nil; k, v = c.Next() { - jsonErr := json.Unmarshal(v, &m) - if jsonErr != nil { - err = fmt.Errorf("%s; %s", err.Error(), jsonErr) - continue - } - ms = append(ms, m) - } - return err - }) - return -} - -func (b *boltAdapter) FlushDisabledJobs() (err error) { - err = b.db.Update(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_statusBucketKey)) - c := bucket.Cursor() - var m MirrorStatus - for k, v := c.First(); k != nil; k, v = c.Next() { - jsonErr := json.Unmarshal(v, &m) - if jsonErr != nil { - err = fmt.Errorf("%s; %s", err.Error(), jsonErr) - continue - } - if m.Status == Disabled || len(m.Name) == 0 { - err = c.Delete() - } - } - return err - }) - return + return err } func (b *boltAdapter) Close() error { - if b.db != nil { - return b.db.Close() - } - return nil + return b.db.Close() } diff --git a/manager/db_redis.go b/manager/db_redis.go index 324cf15..17005a0 100644 --- a/manager/db_redis.go +++ b/manager/db_redis.go @@ -2,13 +2,8 @@ package manager import ( "context" - "encoding/json" - "fmt" - "strings" - "time" "github.com/go-redis/redis/v8" - . "github.com/tuna/tunasync/internal" ) type redisAdapter struct { @@ -17,143 +12,40 @@ type redisAdapter struct { var ctx = context.Background() -func (b *redisAdapter) Init() (err error) { - return nil -} - -func (b *redisAdapter) ListWorkers() (ws []WorkerStatus, err error) { - var val map[string]string - val, err = b.db.HGetAll(ctx, _workerBucketKey).Result() - if err == nil { - var w WorkerStatus - for _, v := range val { - jsonErr := json.Unmarshal([]byte(v), &w) - if jsonErr != nil { - err = fmt.Errorf("%s; %s", err.Error(), jsonErr) - continue - } - ws = append(ws, w) - } - } +func (b *redisAdapter) InitBucket(bucket string) (err error) { + // no-op return } -func (b *redisAdapter) GetWorker(workerID string) (w WorkerStatus, err error) { +func (b *redisAdapter) Get(bucket string, key string) (v []byte, err error) { var val string - val, err = b.db.HGet(ctx, _workerBucketKey, workerID).Result() - if err == nil { - err = json.Unmarshal([]byte(val), &w) - } else { - err = fmt.Errorf("invalid workerID %s", workerID) - } + val, err = b.db.HGet(ctx, bucket, key).Result() + v = []byte(val) return } -func (b *redisAdapter) DeleteWorker(workerID string) (err error) { - _, err = b.db.HDel(ctx, _workerBucketKey, workerID).Result() - if err != nil { - err = fmt.Errorf("invalid workerID %s", workerID) - } - return -} - -func (b *redisAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) { - var v []byte - v, err := json.Marshal(w) - if err == nil { - _, err = b.db.HSet(ctx, _workerBucketKey, w.ID, string(v)).Result() - } - return w, err -} - -func (b *redisAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) { - w, err = b.GetWorker(workerID) - if err == nil { - w.LastOnline = time.Now() - w, err = b.CreateWorker(w) - } - return w, err -} - -func (b *redisAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) { - id := mirrorID + "/" + workerID - v, err := json.Marshal(status) - if err == nil { - _, err = b.db.HSet(ctx, _statusBucketKey, id, string(v)).Result() - } - return status, err -} - -func (b *redisAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus, err error) { - id := mirrorID + "/" + workerID - var val string - val, err = b.db.HGet(ctx, _statusBucketKey, id).Result() - if err == nil { - err = json.Unmarshal([]byte(val), &m) - } else { - err = fmt.Errorf("no mirror '%s' exists in worker '%s'", mirrorID, workerID) - } - return -} - -func (b *redisAdapter) ListMirrorStatus(workerID string) (ms []MirrorStatus, err error) { +func (b *redisAdapter) GetAll(bucket string) (m map[string][]byte, err error) { var val map[string]string - val, err = b.db.HGetAll(ctx, _statusBucketKey).Result() - if err == nil { - var m MirrorStatus + val, err = b.db.HGetAll(ctx, bucket).Result() + if err == nil && val != nil { + m = make(map[string][]byte) for k, v := range val { - if wID := strings.Split(string(k), "/")[1]; wID == workerID { - jsonErr := json.Unmarshal([]byte(v), &m) - if jsonErr != nil { - err = fmt.Errorf("%s; %s", err.Error(), jsonErr) - continue - } - ms = append(ms, m) - } + m[k] = []byte(v) } } return } -func (b *redisAdapter) ListAllMirrorStatus() (ms []MirrorStatus, err error) { - var val map[string]string - val, err = b.db.HGetAll(ctx, _statusBucketKey).Result() - if err == nil { - var m MirrorStatus - for _, v := range val { - jsonErr := json.Unmarshal([]byte(v), &m) - if jsonErr != nil { - err = fmt.Errorf("%s; %s", err.Error(), jsonErr) - continue - } - ms = append(ms, m) - } - } - return +func (b *redisAdapter) Put(bucket string, key string, value []byte) error { + _, err := b.db.HSet(ctx, bucket, key, string(value)).Result() + return err } -func (b *redisAdapter) FlushDisabledJobs() (err error) { - var val map[string]string - val, err = b.db.HGetAll(ctx, _statusBucketKey).Result() - if err == nil { - var m MirrorStatus - for k, v := range val { - jsonErr := json.Unmarshal([]byte(v), &m) - if jsonErr != nil { - err = fmt.Errorf("%s; %s", err.Error(), jsonErr) - continue - } - if m.Status == Disabled || len(m.Name) == 0 { - _, err = b.db.HDel(ctx, _statusBucketKey, k).Result() - } - } - } - return +func (b *redisAdapter) Delete(bucket string, key string) error { + _, err := b.db.HDel(ctx, bucket, key).Result() + return err } func (b *redisAdapter) Close() error { - if b.db != nil { - return b.db.Close() - } - return nil + return b.db.Close() } From 90b4e5debb1f59fd23f7307875dd0d25170d95b6 Mon Sep 17 00:00:00 2001 From: jiegec Date: Tue, 13 Oct 2020 22:49:49 +0800 Subject: [PATCH 06/10] Fix DeleteWorker behavior to match tests --- manager/db.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/manager/db.go b/manager/db.go index d254513..1a6472d 100644 --- a/manager/db.go +++ b/manager/db.go @@ -121,6 +121,10 @@ func (b *kvDBAdapter) GetWorker(workerID string) (w WorkerStatus, err error) { } func (b *kvDBAdapter) DeleteWorker(workerID string) error { + v, _ := b.db.Get(_workerBucketKey, workerID) + if v == nil { + return fmt.Errorf("invalid workerID %s", workerID) + } return b.db.Delete(_workerBucketKey, workerID) } From d341c0c99d68324f82deb0504002daea16948a86 Mon Sep 17 00:00:00 2001 From: jiegec Date: Tue, 13 Oct 2020 23:01:46 +0800 Subject: [PATCH 07/10] Rearrange and fix db tests --- manager/db_test.go | 283 ++++++++++++++++++++++++--------------------- 1 file changed, 149 insertions(+), 134 deletions(-) diff --git a/manager/db_test.go b/manager/db_test.go index f65a356..b0d197c 100644 --- a/manager/db_test.go +++ b/manager/db_test.go @@ -5,6 +5,7 @@ import ( "io/ioutil" "os" "path/filepath" + "sort" "testing" "time" @@ -12,6 +13,153 @@ import ( . "github.com/tuna/tunasync/internal" ) +func SortMirrorStatus(status []MirrorStatus) { + sort.Slice(status, func(l, r int) bool { + return status[l].Name < status[r].Name + }) +} + +func DBAdapterTest(db dbAdapter) { + var err error + testWorkerIDs := []string{"test_worker1", "test_worker2"} + Convey("create worker", func() { + for _, id := range testWorkerIDs { + w := WorkerStatus{ + ID: id, + Token: "token_" + id, + LastOnline: time.Now(), + LastRegister: time.Now(), + } + w, err = db.CreateWorker(w) + So(err, ShouldBeNil) + } + + Convey("get existent worker", func() { + _, err := db.GetWorker(testWorkerIDs[0]) + So(err, ShouldBeNil) + }) + + Convey("list existent workers", func() { + ws, err := db.ListWorkers() + So(err, ShouldBeNil) + So(len(ws), ShouldEqual, 2) + }) + + Convey("get non-existent worker", func() { + _, err := db.GetWorker("invalid workerID") + So(err, ShouldNotBeNil) + }) + + Convey("delete existent worker", func() { + err := db.DeleteWorker(testWorkerIDs[0]) + So(err, ShouldBeNil) + _, err = db.GetWorker(testWorkerIDs[0]) + So(err, ShouldNotBeNil) + ws, err := db.ListWorkers() + So(err, ShouldBeNil) + So(len(ws), ShouldEqual, 1) + }) + + Convey("delete non-existent worker", func() { + err := db.DeleteWorker("invalid workerID") + So(err, ShouldNotBeNil) + ws, err := db.ListWorkers() + So(err, ShouldBeNil) + So(len(ws), ShouldEqual, 2) + }) + }) + + Convey("update mirror status", func() { + status := []MirrorStatus{ + MirrorStatus{ + Name: "arch-sync1", + Worker: testWorkerIDs[0], + IsMaster: true, + Status: Success, + LastUpdate: time.Now(), + LastStarted: time.Now().Add(-time.Minute), + LastEnded: time.Now(), + Upstream: "mirrors.tuna.tsinghua.edu.cn", + Size: "3GB", + }, + MirrorStatus{ + Name: "arch-sync2", + Worker: testWorkerIDs[1], + IsMaster: true, + Status: Disabled, + LastUpdate: time.Now().Add(-time.Hour), + LastStarted: time.Now().Add(-time.Minute), + LastEnded: time.Now(), + Upstream: "mirrors.tuna.tsinghua.edu.cn", + Size: "4GB", + }, + MirrorStatus{ + Name: "arch-sync3", + Worker: testWorkerIDs[1], + IsMaster: true, + Status: Success, + LastUpdate: time.Now().Add(-time.Minute), + LastStarted: time.Now().Add(-time.Second), + LastEnded: time.Now(), + Upstream: "mirrors.tuna.tsinghua.edu.cn", + Size: "4GB", + }, + } + SortMirrorStatus(status) + + for _, s := range status { + _, err := db.UpdateMirrorStatus(s.Worker, s.Name, s) + So(err, ShouldBeNil) + + } + + Convey("get mirror status", func() { + m, err := db.GetMirrorStatus(testWorkerIDs[0], status[0].Name) + So(err, ShouldBeNil) + expectedJSON, err := json.Marshal(status[0]) + So(err, ShouldBeNil) + actualJSON, err := json.Marshal(m) + So(err, ShouldBeNil) + So(string(actualJSON), ShouldEqual, string(expectedJSON)) + }) + + Convey("list mirror status", func() { + ms, err := db.ListMirrorStatus(testWorkerIDs[0]) + So(err, ShouldBeNil) + expectedJSON, err := json.Marshal([]MirrorStatus{status[0]}) + So(err, ShouldBeNil) + actualJSON, err := json.Marshal(ms) + So(err, ShouldBeNil) + So(string(actualJSON), ShouldEqual, string(expectedJSON)) + }) + + Convey("list all mirror status", func() { + ms, err := db.ListAllMirrorStatus() + So(err, ShouldBeNil) + SortMirrorStatus(ms) + + expectedJSON, err := json.Marshal(status) + So(err, ShouldBeNil) + actualJSON, err := json.Marshal(ms) + So(err, ShouldBeNil) + So(string(actualJSON), ShouldEqual, string(expectedJSON)) + }) + + Convey("flush disabled jobs", func() { + ms, err := db.ListAllMirrorStatus() + So(err, ShouldBeNil) + So(len(ms), ShouldEqual, 3) + err = db.FlushDisabledJobs() + So(err, ShouldBeNil) + ms, err = db.ListAllMirrorStatus() + So(err, ShouldBeNil) + So(len(ms), ShouldEqual, 2) + }) + + }) + return +} + func TestBoltAdapter(t *testing.T) { Convey("boltAdapter should work", t, func() { tmpDir, err := ioutil.TempDir("", "tunasync") @@ -28,139 +176,6 @@ func TestBoltAdapter(t *testing.T) { So(err, ShouldBeNil) }() - testWorkerIDs := []string{"test_worker1", "test_worker2"} - Convey("create worker", func() { - for _, id := range testWorkerIDs { - w := WorkerStatus{ - ID: id, - Token: "token_" + id, - LastOnline: time.Now(), - LastRegister: time.Now(), - } - w, err = boltDB.CreateWorker(w) - So(err, ShouldBeNil) - } - - Convey("get existent worker", func() { - _, err := boltDB.GetWorker(testWorkerIDs[0]) - So(err, ShouldBeNil) - }) - - Convey("list existent workers", func() { - ws, err := boltDB.ListWorkers() - So(err, ShouldBeNil) - So(len(ws), ShouldEqual, 2) - }) - - Convey("get non-existent worker", func() { - _, err := boltDB.GetWorker("invalid workerID") - So(err, ShouldNotBeNil) - }) - - Convey("delete existent worker", func() { - err := boltDB.DeleteWorker(testWorkerIDs[0]) - So(err, ShouldBeNil) - _, err = boltDB.GetWorker(testWorkerIDs[0]) - So(err, ShouldNotBeNil) - ws, err := boltDB.ListWorkers() - So(err, ShouldBeNil) - So(len(ws), ShouldEqual, 1) - }) - - Convey("delete non-existent worker", func() { - err := boltDB.DeleteWorker("invalid workerID") - So(err, ShouldNotBeNil) - ws, err := boltDB.ListWorkers() - So(err, ShouldBeNil) - So(len(ws), ShouldEqual, 2) - }) - }) - - Convey("update mirror status", func() { - status := []MirrorStatus{ - MirrorStatus{ - Name: "arch-sync1", - Worker: testWorkerIDs[0], - IsMaster: true, - Status: Success, - LastUpdate: time.Now(), - LastStarted: time.Now().Add(-time.Minute), - LastEnded: time.Now(), - Upstream: "mirrors.tuna.tsinghua.edu.cn", - Size: "3GB", - }, - MirrorStatus{ - Name: "arch-sync2", - Worker: testWorkerIDs[1], - IsMaster: true, - Status: Disabled, - LastUpdate: time.Now().Add(-time.Hour), - LastStarted: time.Now().Add(-time.Minute), - LastEnded: time.Now(), - Upstream: "mirrors.tuna.tsinghua.edu.cn", - Size: "4GB", - }, - MirrorStatus{ - Name: "arch-sync3", - Worker: testWorkerIDs[1], - IsMaster: true, - Status: Success, - LastUpdate: time.Now().Add(-time.Minute), - LastStarted: time.Now().Add(-time.Second), - LastEnded: time.Now(), - Upstream: "mirrors.tuna.tsinghua.edu.cn", - Size: "4GB", - }, - } - - for _, s := range status { - _, err := boltDB.UpdateMirrorStatus(s.Worker, s.Name, s) - So(err, ShouldBeNil) - - } - - Convey("get mirror status", func() { - m, err := boltDB.GetMirrorStatus(testWorkerIDs[0], status[0].Name) - So(err, ShouldBeNil) - expectedJSON, err := json.Marshal(status[0]) - So(err, ShouldBeNil) - actualJSON, err := json.Marshal(m) - So(err, ShouldBeNil) - So(string(actualJSON), ShouldEqual, string(expectedJSON)) - }) - - Convey("list mirror status", func() { - ms, err := boltDB.ListMirrorStatus(testWorkerIDs[0]) - So(err, ShouldBeNil) - expectedJSON, err := json.Marshal([]MirrorStatus{status[0]}) - So(err, ShouldBeNil) - actualJSON, err := json.Marshal(ms) - So(err, ShouldBeNil) - So(string(actualJSON), ShouldEqual, string(expectedJSON)) - }) - - Convey("list all mirror status", func() { - ms, err := boltDB.ListAllMirrorStatus() - So(err, ShouldBeNil) - expectedJSON, err := json.Marshal(status) - So(err, ShouldBeNil) - actualJSON, err := json.Marshal(ms) - So(err, ShouldBeNil) - So(string(actualJSON), ShouldEqual, string(expectedJSON)) - }) - - Convey("flush disabled jobs", func() { - ms, err := boltDB.ListAllMirrorStatus() - So(err, ShouldBeNil) - So(len(ms), ShouldEqual, 3) - err = boltDB.FlushDisabledJobs() - So(err, ShouldBeNil) - ms, err = boltDB.ListAllMirrorStatus() - So(err, ShouldBeNil) - So(len(ms), ShouldEqual, 2) - }) - - }) - + DBAdapterTest(boltDB) }) } From 3c7ee8f9fdb56f338172e9f381d40405277d880c Mon Sep 17 00:00:00 2001 From: jiegec Date: Tue, 13 Oct 2020 23:05:39 +0800 Subject: [PATCH 08/10] Add mock test for redis backend --- go.mod | 3 +++ go.sum | 16 ++++++++++++++++ manager/db_redis.go | 4 +++- manager/db_test.go | 20 ++++++++++++++++++++ 4 files changed, 42 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 2cdd976..a2465a3 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,8 @@ go 1.13 require ( github.com/BurntSushi/toml v0.3.1 + github.com/alicebob/miniredis v2.5.0+incompatible + github.com/alicebob/miniredis/v2 v2.13.3 // indirect github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239 github.com/boltdb/bolt v1.3.1 github.com/codeskyblue/go-sh v0.0.0-20190412065543-76bd3d59ff27 @@ -13,6 +15,7 @@ require ( github.com/go-redis/redis v6.15.9+incompatible github.com/go-redis/redis/v8 v8.3.0 github.com/golang/protobuf v1.4.2 // indirect + github.com/gomodule/redigo v1.8.2 // indirect github.com/google/go-cmp v0.5.2 // indirect github.com/imdario/mergo v0.3.9 github.com/pkg/profile v1.4.0 diff --git a/go.sum b/go.sum index d6888f4..cc6a222 100644 --- a/go.sum +++ b/go.sum @@ -1,11 +1,20 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk= +github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= +github.com/alicebob/miniredis v2.5.0+incompatible h1:yBHoLpsyjupjz3NL3MhKMVkR41j82Yjf3KFv7ApYzUI= +github.com/alicebob/miniredis v2.5.0+incompatible/go.mod h1:8HZjEj4yU0dwhYHky+DxYx+6BMjkBbe5ONFIF1MXffk= +github.com/alicebob/miniredis/v2 v2.13.3 h1:kohgdtN58KW/r9ZDVmMJE3MrfbumwsDQStd0LPAGmmw= +github.com/alicebob/miniredis/v2 v2.13.3/go.mod h1:uS970Sw5Gs9/iK3yBg0l9Uj9s25wXxSpQUE9EaJ/Blg= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239 h1:kFOfPq6dUM1hTo4JG6LR5AXSUEsOjtdm0kw0FtQtMJA= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4= github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= +github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= +github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0 h1:sDMmm+q/3+BukdIpxwO365v/Rbspp2Nt5XntgQRXq8Q= github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0/go.mod h1:4Zcjuz89kmFXt9morQgcfYZAYZ5n8WHjt81YYWIwtTM= github.com/codeskyblue/go-sh v0.0.0-20190412065543-76bd3d59ff27 h1:HHUr4P/aKh4quafGxDT9LDasjGdlGkzLbfmmrlng3kA= @@ -47,6 +56,9 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/gomodule/redigo v1.8.2 h1:H5XSIre1MB5NbPYFp+i1NBbb5qN1W8Y8YAQoAYbkm8k= +github.com/gomodule/redigo v1.8.2/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0= +github.com/gomodule/redigo/redis v0.0.0-do-not-use h1:J7XIp6Kau0WoyT4JtXHT3Ei0gA1KkSc6bc87j9v9WIo= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -95,6 +107,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= @@ -103,6 +116,8 @@ github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= github.com/urfave/cli v1.22.3 h1:FpNT6zq26xNpHZy08emi755QwzLPs6Pukqjlc7RfOMU= github.com/urfave/cli v1.22.3/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= +github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb h1:ZkM6LRnq40pR1Ox0hTHlnpkcOTuFIDQpZ1IN8rKKhX0= +github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb/go.mod h1:gqRgreBUhTSL0GeU64rtZ3Uq3wtjOa/TB2YfrtkCbVQ= go.opentelemetry.io/otel v0.13.0 h1:2isEnyzjjJZq6r2EKMsFj4TxiQiexsM04AVhwbR/oBA= go.opentelemetry.io/otel v0.13.0/go.mod h1:dlSNewoRYikTkotEnxdmuBHgzT+k/idJSfDv/FxEnOY= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -111,6 +126,7 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/manager/db_redis.go b/manager/db_redis.go index 17005a0..0625fcf 100644 --- a/manager/db_redis.go +++ b/manager/db_redis.go @@ -20,7 +20,9 @@ func (b *redisAdapter) InitBucket(bucket string) (err error) { func (b *redisAdapter) Get(bucket string, key string) (v []byte, err error) { var val string val, err = b.db.HGet(ctx, bucket, key).Result() - v = []byte(val) + if err == nil { + v = []byte(val) + } return } diff --git a/manager/db_test.go b/manager/db_test.go index b0d197c..2ba2170 100644 --- a/manager/db_test.go +++ b/manager/db_test.go @@ -2,6 +2,7 @@ package manager import ( "encoding/json" + "fmt" "io/ioutil" "os" "path/filepath" @@ -9,6 +10,7 @@ import ( "testing" "time" + "github.com/alicebob/miniredis" . "github.com/smartystreets/goconvey/convey" . "github.com/tuna/tunasync/internal" ) @@ -178,4 +180,22 @@ func TestBoltAdapter(t *testing.T) { DBAdapterTest(boltDB) }) + + Convey("redisAdapter should work", t, func() { + mr, err := miniredis.Run() + So(err, ShouldBeNil) + + addr := fmt.Sprintf("redis://%s", mr.Addr()) + redisDB, err := makeDBAdapter("redis", addr) + So(err, ShouldBeNil) + + defer func() { + // close redisDB + err := redisDB.Close() + So(err, ShouldBeNil) + mr.Close() + }() + + DBAdapterTest(redisDB) + }) } From 992044d402d2c0a922293f501c9b5d047b0ad031 Mon Sep 17 00:00:00 2001 From: jiegec Date: Tue, 13 Oct 2020 23:11:07 +0800 Subject: [PATCH 09/10] Small code cleanup --- go.mod | 8 ++------ go.sum | 12 +++++++----- manager/config.go | 1 + manager/db.go | 5 +++-- manager/db_bolt.go | 4 ++-- manager/db_redis.go | 1 + manager/server.go | 2 +- 7 files changed, 17 insertions(+), 16 deletions(-) diff --git a/go.mod b/go.mod index a2465a3..2a47ca1 100644 --- a/go.mod +++ b/go.mod @@ -4,26 +4,22 @@ go 1.13 require ( github.com/BurntSushi/toml v0.3.1 + github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect github.com/alicebob/miniredis v2.5.0+incompatible - github.com/alicebob/miniredis/v2 v2.13.3 // indirect github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239 github.com/boltdb/bolt v1.3.1 github.com/codeskyblue/go-sh v0.0.0-20190412065543-76bd3d59ff27 github.com/dennwc/btrfs v0.0.0-20190517175702-d917b30ff035 github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568 // indirect github.com/gin-gonic/gin v1.5.0 - github.com/go-redis/redis v6.15.9+incompatible github.com/go-redis/redis/v8 v8.3.0 - github.com/golang/protobuf v1.4.2 // indirect github.com/gomodule/redigo v1.8.2 // indirect - github.com/google/go-cmp v0.5.2 // indirect github.com/imdario/mergo v0.3.9 github.com/pkg/profile v1.4.0 github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46 github.com/smartystreets/goconvey v1.6.4 - github.com/stretchr/testify v1.6.1 // indirect github.com/urfave/cli v1.22.3 + github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb // indirect golang.org/x/sys v0.0.0-20200519105757-fe76b779f299 gopkg.in/op/go-logging.v1 v1.0.0-20160211212156-b2cb9fa56473 - gopkg.in/yaml.v2 v2.3.0 // indirect ) diff --git a/go.sum b/go.sum index cc6a222..b494dd2 100644 --- a/go.sum +++ b/go.sum @@ -4,8 +4,6 @@ github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZp github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= github.com/alicebob/miniredis v2.5.0+incompatible h1:yBHoLpsyjupjz3NL3MhKMVkR41j82Yjf3KFv7ApYzUI= github.com/alicebob/miniredis v2.5.0+incompatible/go.mod h1:8HZjEj4yU0dwhYHky+DxYx+6BMjkBbe5ONFIF1MXffk= -github.com/alicebob/miniredis/v2 v2.13.3 h1:kohgdtN58KW/r9ZDVmMJE3MrfbumwsDQStd0LPAGmmw= -github.com/alicebob/miniredis/v2 v2.13.3/go.mod h1:uS970Sw5Gs9/iK3yBg0l9Uj9s25wXxSpQUE9EaJ/Blg= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239 h1:kFOfPq6dUM1hTo4JG6LR5AXSUEsOjtdm0kw0FtQtMJA= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4= @@ -33,6 +31,7 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cu github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568 h1:BHsljHzVlRcyQhjrss6TZTdY2VfCqZPbv5k3iBFa2ZQ= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= @@ -42,8 +41,6 @@ github.com/go-playground/locales v0.12.1 h1:2FITxuFt/xuCNP1Acdhv62OzaCiviiE4kotf github.com/go-playground/locales v0.12.1/go.mod h1:IUMDtCfWo/w/mtMfIE/IG2K+Ey3ygWanZIBtBW0W2TM= github.com/go-playground/universal-translator v0.16.0 h1:X++omBR/4cE2MNg91AoC3rmGrCjJ8eAeUP/K/EKx4DM= github.com/go-playground/universal-translator v0.16.0/go.mod h1:1AnU7NaIRDWWzGEKwgtJRd2xk99HeFyHw3yid4rvQIY= -github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg= -github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= github.com/go-redis/redis/v8 v8.3.0 h1:Xrwvn8+QqUYD1MbQmda3cVR2U9li5XbtRFkKZN5Y0hk= github.com/go-redis/redis/v8 v8.3.0/go.mod h1:a2xkpBM7NJUN5V5kiF46X5Ltx4WeXJ9757X/ScKUBdE= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -58,7 +55,6 @@ github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0 github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/gomodule/redigo v1.8.2 h1:H5XSIre1MB5NbPYFp+i1NBbb5qN1W8Y8YAQoAYbkm8k= github.com/gomodule/redigo v1.8.2/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0= -github.com/gomodule/redigo/redis v0.0.0-do-not-use h1:J7XIp6Kau0WoyT4JtXHT3Ei0gA1KkSc6bc87j9v9WIo= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -82,12 +78,15 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OH github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= +github.com/onsi/ginkgo v1.14.1 h1:jMU0WaQrP0a/YAEq8eJmJKjBoMs+pClEr1vDMlM/Do4= github.com/onsi/ginkgo v1.14.1/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/onsi/gomega v1.10.2 h1:aY/nuoWlKJud2J6U0E3NWsjlg+0GtwXxgEqthRdzlcs= github.com/onsi/gomega v1.10.2/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/pkg/profile v1.4.0 h1:uCmaf4vVbWAOZz36k1hrQD7ijGRzLwaME8Am/7a4jZI= github.com/pkg/profile v1.4.0/go.mod h1:NWz/XGvpEW1FyYQ7fCx4dqYBLlfTcE+A9FLAkNKqjFE= @@ -123,6 +122,7 @@ go.opentelemetry.io/otel v0.13.0/go.mod h1:dlSNewoRYikTkotEnxdmuBHgzT+k/idJSfDv/ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7 h1:AeiKBIuRw3UomYXSbLy0Mc2dDLfdtbT/IVn4keq83P0= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -136,6 +136,7 @@ golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200519105757-fe76b779f299 h1:DYfZAGf2WMFjMxbgTjaC+2HC7NkNAQs+6Q8b9WEB/F4= golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384 h1:TFlARGu6Czu1z7q93HTxcP1P+/ZFC/IKythI5RzrnRg= @@ -158,6 +159,7 @@ gopkg.in/go-playground/validator.v9 v9.29.1 h1:SvGtYmN60a5CVKTOzMSyfzWDeZRxRuGvR gopkg.in/go-playground/validator.v9 v9.29.1/go.mod h1:+c9/zcJMFNgbLvly1L1V+PpxWdVbfP1avr/N00E2vyQ= gopkg.in/op/go-logging.v1 v1.0.0-20160211212156-b2cb9fa56473 h1:6D+BvnJ/j6e222UW8s2qTSe3wGBtvo0MbVQG/c5k8RE= gopkg.in/op/go-logging.v1 v1.0.0-20160211212156-b2cb9fa56473/go.mod h1:N1eN2tsCx0Ydtgjl4cqmbRCsY4/+z4cYDeqwZTk6zog= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/manager/config.go b/manager/config.go index e363fb2..7699b40 100644 --- a/manager/config.go +++ b/manager/config.go @@ -29,6 +29,7 @@ type FileConfig struct { CACert string `toml:"ca_cert"` } +// LoadConfig loads config from specified file func LoadConfig(cfgFile string, c *cli.Context) (*Config, error) { cfg := new(Config) diff --git a/manager/db.go b/manager/db.go index 1a6472d..3c034bb 100644 --- a/manager/db.go +++ b/manager/db.go @@ -27,6 +27,7 @@ type dbAdapter interface { Close() error } +// interface for a kv database type kvAdapter interface { InitBucket(bucket string) error Get(bucket string, key string) ([]byte, error) @@ -50,8 +51,7 @@ func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) { return nil, err } db := boltAdapter{ - db: innerDB, - dbFile: dbFile, + db: innerDB, } kv := kvDBAdapter{ db: &db, @@ -77,6 +77,7 @@ func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) { return nil, fmt.Errorf("unsupported db-type: %s", dbType) } +// use the underlying kv database to store data type kvDBAdapter struct { db kvAdapter } diff --git a/manager/db_bolt.go b/manager/db_bolt.go index b80372e..80a2e4c 100644 --- a/manager/db_bolt.go +++ b/manager/db_bolt.go @@ -6,9 +6,9 @@ import ( "github.com/boltdb/bolt" ) +// implement kv interface backed by boltdb type boltAdapter struct { - db *bolt.DB - dbFile string + db *bolt.DB } func (b *boltAdapter) InitBucket(bucket string) (err error) { diff --git a/manager/db_redis.go b/manager/db_redis.go index 0625fcf..9dad51a 100644 --- a/manager/db_redis.go +++ b/manager/db_redis.go @@ -6,6 +6,7 @@ import ( "github.com/go-redis/redis/v8" ) +// implement kv interface backed by redis type redisAdapter struct { db *redis.Client } diff --git a/manager/server.go b/manager/server.go index 8d3ba11..fbd6712 100644 --- a/manager/server.go +++ b/manager/server.go @@ -276,7 +276,7 @@ func (s *Manager) updateSchedulesOfWorker(c *gin.Context) { curStatus, err := s.adapter.GetMirrorStatus(workerID, mirrorName) s.rwmu.RUnlock() if err != nil { - fmt.Errorf("failed to get job %s of worker %s: %s", + err = fmt.Errorf("failed to get job %s of worker %s: %s", mirrorName, workerID, err.Error(), ) continue From 4e426c891ecdfafc47e2a3a0760b6846024859ff Mon Sep 17 00:00:00 2001 From: jiegec Date: Thu, 15 Oct 2020 07:33:02 +0800 Subject: [PATCH 10/10] Fix error logging in server.go --- manager/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/manager/server.go b/manager/server.go index fbd6712..5ebc71d 100644 --- a/manager/server.go +++ b/manager/server.go @@ -276,7 +276,7 @@ func (s *Manager) updateSchedulesOfWorker(c *gin.Context) { curStatus, err := s.adapter.GetMirrorStatus(workerID, mirrorName) s.rwmu.RUnlock() if err != nil { - err = fmt.Errorf("failed to get job %s of worker %s: %s", + logger.Errorf("failed to get job %s of worker %s: %s", mirrorName, workerID, err.Error(), ) continue