From a2887da2dd95a07ef960b38f80528c98cd782481 Mon Sep 17 00:00:00 2001 From: jiegec Date: Tue, 13 Oct 2020 14:27:41 +0800 Subject: [PATCH] Move bolt db adapter to separate file --- go.mod | 9 ++- go.sum | 52 +++++++----- manager/db.go | 182 ------------------------------------------ manager/db_bolt.go | 192 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 231 insertions(+), 204 deletions(-) create mode 100644 manager/db_bolt.go diff --git a/go.mod b/go.mod index 2d46709..2a5edb2 100644 --- a/go.mod +++ b/go.mod @@ -8,14 +8,17 @@ require ( github.com/boltdb/bolt v1.3.1 github.com/codeskyblue/go-sh v0.0.0-20190412065543-76bd3d59ff27 github.com/dennwc/btrfs v0.0.0-20190517175702-d917b30ff035 + github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568 // indirect github.com/gin-gonic/gin v1.5.0 + github.com/golang/protobuf v1.4.2 // indirect + github.com/google/go-cmp v0.5.2 // indirect github.com/imdario/mergo v0.3.9 - github.com/mattn/goveralls v0.0.5 // indirect github.com/pkg/profile v1.4.0 github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46 github.com/smartystreets/goconvey v1.6.4 + github.com/stretchr/testify v1.6.1 // indirect github.com/urfave/cli v1.22.3 - golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527 - golang.org/x/tools v0.0.0-20200312194400-c312e98713c2 // indirect + golang.org/x/sys v0.0.0-20200519105757-fe76b779f299 gopkg.in/op/go-logging.v1 v1.0.0-20160211212156-b2cb9fa56473 + gopkg.in/yaml.v2 v2.3.0 // indirect ) diff --git a/go.sum b/go.sum index f76dd97..646cc56 100644 --- a/go.sum +++ b/go.sum @@ -17,6 +17,8 @@ github.com/dennwc/btrfs v0.0.0-20190517175702-d917b30ff035 h1:4e+UEZaKPx0ZEiCMPU github.com/dennwc/btrfs v0.0.0-20190517175702-d917b30ff035/go.mod h1:MYsOV9Dgsec3FFSOjywi0QK5r6TeBbdWxdrMGtiYXHA= github.com/dennwc/ioctl v1.0.0 h1:DsWAAjIxRqNcLn9x6mwfuf2pet3iB7aK90K4tF16rLg= github.com/dennwc/ioctl v1.0.0/go.mod h1:ellh2YB5ldny99SBU/VX7Nq0xiZbHphf1DrtHxxjMk0= +github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568 h1:BHsljHzVlRcyQhjrss6TZTdY2VfCqZPbv5k3iBFa2ZQ= +github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= github.com/gin-gonic/gin v1.5.0 h1:fi+bqFAx/oLK54somfCtEZs9HeH1LHVoEPUgARpTqyc= @@ -27,7 +29,20 @@ github.com/go-playground/universal-translator v0.16.0 h1:X++omBR/4cE2MNg91AoC3rm github.com/go-playground/universal-translator v0.16.0/go.mod h1:1AnU7NaIRDWWzGEKwgtJRd2xk99HeFyHw3yid4rvQIY= github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM= +github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/imdario/mergo v0.3.9 h1:UauaLniWCFHWd+Jp9oCEkTBj8VO/9DKg3PV3VCNMDIg= github.com/imdario/mergo v0.3.9/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= @@ -39,8 +54,6 @@ github.com/leodido/go-urn v1.1.0 h1:Sm1gr51B1kKyfD2BlRcLSiEkffoG96g6TPv6eRoEiB8= github.com/leodido/go-urn v1.1.0/go.mod h1:+cyI34gQWZcE1eQU7NVgKkkzdXDQHr1dBMtdAPozLkw= github.com/mattn/go-isatty v0.0.9 h1:d5US/mDsogSGW37IV293h//ZFaeajb69h+EHFsv2xGg= github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ= -github.com/mattn/goveralls v0.0.5 h1:spfq8AyZ0cCk57Za6/juJ5btQxeE1FaEGMdfcI+XO48= -github.com/mattn/goveralls v0.0.5/go.mod h1:Xg2LHi51faXLyKXwsndxiW6uxEEQT9+3sjGzzwU4xy0= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg= @@ -63,6 +76,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= @@ -70,31 +85,26 @@ github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLY github.com/urfave/cli v1.22.3 h1:FpNT6zq26xNpHZy08emi755QwzLPs6Pukqjlc7RfOMU= github.com/urfave/cli v1.22.3/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= -golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527 h1:uYVVQ9WP/Ds2ROhcaGPeIdVq0RIXVLwsHlnvJ+cT1So= -golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200519105757-fe76b779f299 h1:DYfZAGf2WMFjMxbgTjaC+2HC7NkNAQs+6Q8b9WEB/F4= +golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384 h1:TFlARGu6Czu1z7q93HTxcP1P+/ZFC/IKythI5RzrnRg= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= -golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20200113040837-eac381796e91/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= -golang.org/x/tools v0.0.0-20200312194400-c312e98713c2 h1:6TB4+MaZlkcSsJDu+BS5yxSEuZIYhjWz+jhbSLEZylI= -golang.org/x/tools v0.0.0-20200312194400-c312e98713c2/go.mod h1:o4KQGtdN14AW+yjsvvwRTJJuXz8XRtIHtEnmAXLyFUw= -golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/go-playground/assert.v1 v1.2.1 h1:xoYuJVE7KT85PYWrN730RguIQO0ePzVRfFMXadIrXTM= gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE= gopkg.in/go-playground/validator.v9 v9.29.1 h1:SvGtYmN60a5CVKTOzMSyfzWDeZRxRuGvRQyEAKbw1xc= gopkg.in/go-playground/validator.v9 v9.29.1/go.mod h1:+c9/zcJMFNgbLvly1L1V+PpxWdVbfP1avr/N00E2vyQ= @@ -102,3 +112,7 @@ gopkg.in/op/go-logging.v1 v1.0.0-20160211212156-b2cb9fa56473 h1:6D+BvnJ/j6e222UW gopkg.in/op/go-logging.v1 v1.0.0-20160211212156-b2cb9fa56473/go.mod h1:N1eN2tsCx0Ydtgjl4cqmbRCsY4/+z4cYDeqwZTk6zog= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/manager/db.go b/manager/db.go index 7c02180..aa61326 100644 --- a/manager/db.go +++ b/manager/db.go @@ -1,9 +1,7 @@ package manager import ( - "encoding/json" "fmt" - "strings" "time" "github.com/boltdb/bolt" @@ -44,183 +42,3 @@ func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) { // unsupported db-type return nil, fmt.Errorf("unsupported db-type: %s", dbType) } - -const ( - _workerBucketKey = "workers" - _statusBucketKey = "mirror_status" -) - -type boltAdapter struct { - db *bolt.DB - dbFile string -} - -func (b *boltAdapter) Init() (err error) { - return b.db.Update(func(tx *bolt.Tx) error { - _, err = tx.CreateBucketIfNotExists([]byte(_workerBucketKey)) - if err != nil { - return fmt.Errorf("create bucket %s error: %s", _workerBucketKey, err.Error()) - } - _, err = tx.CreateBucketIfNotExists([]byte(_statusBucketKey)) - if err != nil { - return fmt.Errorf("create bucket %s error: %s", _statusBucketKey, err.Error()) - } - return nil - }) -} - -func (b *boltAdapter) ListWorkers() (ws []WorkerStatus, err error) { - err = b.db.View(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_workerBucketKey)) - c := bucket.Cursor() - var w WorkerStatus - for k, v := c.First(); k != nil; k, v = c.Next() { - jsonErr := json.Unmarshal(v, &w) - if jsonErr != nil { - err = fmt.Errorf("%s; %s", err.Error(), jsonErr) - continue - } - ws = append(ws, w) - } - return err - }) - return -} - -func (b *boltAdapter) GetWorker(workerID string) (w WorkerStatus, err error) { - err = b.db.View(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_workerBucketKey)) - v := bucket.Get([]byte(workerID)) - if v == nil { - return fmt.Errorf("invalid workerID %s", workerID) - } - err := json.Unmarshal(v, &w) - return err - }) - return -} - -func (b *boltAdapter) DeleteWorker(workerID string) (err error) { - err = b.db.Update(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_workerBucketKey)) - v := bucket.Get([]byte(workerID)) - if v == nil { - return fmt.Errorf("invalid workerID %s", workerID) - } - err := bucket.Delete([]byte(workerID)) - return err - }) - return -} - -func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) { - err := b.db.Update(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_workerBucketKey)) - v, err := json.Marshal(w) - if err != nil { - return err - } - err = bucket.Put([]byte(w.ID), v) - return err - }) - return w, err -} - -func (b *boltAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) { - w, err = b.GetWorker(workerID) - if err == nil { - w.LastOnline = time.Now() - w, err = b.CreateWorker(w) - } - return w, err -} - -func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) { - id := mirrorID + "/" + workerID - err := b.db.Update(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_statusBucketKey)) - v, err := json.Marshal(status) - err = bucket.Put([]byte(id), v) - return err - }) - return status, err -} - -func (b *boltAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus, err error) { - id := mirrorID + "/" + workerID - err = b.db.Update(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_statusBucketKey)) - v := bucket.Get([]byte(id)) - if v == nil { - return fmt.Errorf("no mirror '%s' exists in worker '%s'", mirrorID, workerID) - } - err := json.Unmarshal(v, &m) - return err - }) - return -} - -func (b *boltAdapter) ListMirrorStatus(workerID string) (ms []MirrorStatus, err error) { - err = b.db.View(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_statusBucketKey)) - c := bucket.Cursor() - var m MirrorStatus - for k, v := c.First(); k != nil; k, v = c.Next() { - if wID := strings.Split(string(k), "/")[1]; wID == workerID { - jsonErr := json.Unmarshal(v, &m) - if jsonErr != nil { - err = fmt.Errorf("%s; %s", err.Error(), jsonErr) - continue - } - ms = append(ms, m) - } - } - return err - }) - return -} - -func (b *boltAdapter) ListAllMirrorStatus() (ms []MirrorStatus, err error) { - err = b.db.View(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_statusBucketKey)) - c := bucket.Cursor() - var m MirrorStatus - for k, v := c.First(); k != nil; k, v = c.Next() { - jsonErr := json.Unmarshal(v, &m) - if jsonErr != nil { - err = fmt.Errorf("%s; %s", err.Error(), jsonErr) - continue - } - ms = append(ms, m) - } - return err - }) - return -} - -func (b *boltAdapter) FlushDisabledJobs() (err error) { - err = b.db.Update(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(_statusBucketKey)) - c := bucket.Cursor() - var m MirrorStatus - for k, v := c.First(); k != nil; k, v = c.Next() { - jsonErr := json.Unmarshal(v, &m) - if jsonErr != nil { - err = fmt.Errorf("%s; %s", err.Error(), jsonErr) - continue - } - if m.Status == Disabled || len(m.Name) == 0 { - err = c.Delete() - } - } - return err - }) - return -} - -func (b *boltAdapter) Close() error { - if b.db != nil { - return b.db.Close() - } - return nil -} diff --git a/manager/db_bolt.go b/manager/db_bolt.go new file mode 100644 index 0000000..3563bed --- /dev/null +++ b/manager/db_bolt.go @@ -0,0 +1,192 @@ +package manager + +import ( + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/boltdb/bolt" + + . "github.com/tuna/tunasync/internal" +) + +const ( + _workerBucketKey = "workers" + _statusBucketKey = "mirror_status" +) + +type boltAdapter struct { + db *bolt.DB + dbFile string +} + +func (b *boltAdapter) Init() (err error) { + return b.db.Update(func(tx *bolt.Tx) error { + _, err = tx.CreateBucketIfNotExists([]byte(_workerBucketKey)) + if err != nil { + return fmt.Errorf("create bucket %s error: %s", _workerBucketKey, err.Error()) + } + _, err = tx.CreateBucketIfNotExists([]byte(_statusBucketKey)) + if err != nil { + return fmt.Errorf("create bucket %s error: %s", _statusBucketKey, err.Error()) + } + return nil + }) +} + +func (b *boltAdapter) ListWorkers() (ws []WorkerStatus, err error) { + err = b.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(_workerBucketKey)) + c := bucket.Cursor() + var w WorkerStatus + for k, v := c.First(); k != nil; k, v = c.Next() { + jsonErr := json.Unmarshal(v, &w) + if jsonErr != nil { + err = fmt.Errorf("%s; %s", err.Error(), jsonErr) + continue + } + ws = append(ws, w) + } + return err + }) + return +} + +func (b *boltAdapter) GetWorker(workerID string) (w WorkerStatus, err error) { + err = b.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(_workerBucketKey)) + v := bucket.Get([]byte(workerID)) + if v == nil { + return fmt.Errorf("invalid workerID %s", workerID) + } + err := json.Unmarshal(v, &w) + return err + }) + return +} + +func (b *boltAdapter) DeleteWorker(workerID string) (err error) { + err = b.db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(_workerBucketKey)) + v := bucket.Get([]byte(workerID)) + if v == nil { + return fmt.Errorf("invalid workerID %s", workerID) + } + err := bucket.Delete([]byte(workerID)) + return err + }) + return +} + +func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) { + err := b.db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(_workerBucketKey)) + v, err := json.Marshal(w) + if err != nil { + return err + } + err = bucket.Put([]byte(w.ID), v) + return err + }) + return w, err +} + +func (b *boltAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) { + w, err = b.GetWorker(workerID) + if err == nil { + w.LastOnline = time.Now() + w, err = b.CreateWorker(w) + } + return w, err +} + +func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) { + id := mirrorID + "/" + workerID + err := b.db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(_statusBucketKey)) + v, err := json.Marshal(status) + err = bucket.Put([]byte(id), v) + return err + }) + return status, err +} + +func (b *boltAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus, err error) { + id := mirrorID + "/" + workerID + err = b.db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(_statusBucketKey)) + v := bucket.Get([]byte(id)) + if v == nil { + return fmt.Errorf("no mirror '%s' exists in worker '%s'", mirrorID, workerID) + } + err := json.Unmarshal(v, &m) + return err + }) + return +} + +func (b *boltAdapter) ListMirrorStatus(workerID string) (ms []MirrorStatus, err error) { + err = b.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(_statusBucketKey)) + c := bucket.Cursor() + var m MirrorStatus + for k, v := c.First(); k != nil; k, v = c.Next() { + if wID := strings.Split(string(k), "/")[1]; wID == workerID { + jsonErr := json.Unmarshal(v, &m) + if jsonErr != nil { + err = fmt.Errorf("%s; %s", err.Error(), jsonErr) + continue + } + ms = append(ms, m) + } + } + return err + }) + return +} + +func (b *boltAdapter) ListAllMirrorStatus() (ms []MirrorStatus, err error) { + err = b.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(_statusBucketKey)) + c := bucket.Cursor() + var m MirrorStatus + for k, v := c.First(); k != nil; k, v = c.Next() { + jsonErr := json.Unmarshal(v, &m) + if jsonErr != nil { + err = fmt.Errorf("%s; %s", err.Error(), jsonErr) + continue + } + ms = append(ms, m) + } + return err + }) + return +} + +func (b *boltAdapter) FlushDisabledJobs() (err error) { + err = b.db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(_statusBucketKey)) + c := bucket.Cursor() + var m MirrorStatus + for k, v := c.First(); k != nil; k, v = c.Next() { + jsonErr := json.Unmarshal(v, &m) + if jsonErr != nil { + err = fmt.Errorf("%s; %s", err.Error(), jsonErr) + continue + } + if m.Status == Disabled || len(m.Name) == 0 { + err = c.Delete() + } + } + return err + }) + return +} + +func (b *boltAdapter) Close() error { + if b.db != nil { + return b.db.Close() + } + return nil +}