Mirrored from https://github.com/tuna/tunasync.git — last synced 2025-12-08 15:36:47 +00:00

44 commits

Author SHA1 Message Commit date
z4yx
9ac374527a regenerate travis deploy key 2018-10-25 17:27:32 +08:00
z4yx
f03626d4e1 update Get Started document 2018-10-25 17:23:02 +08:00
z4yx
23bf4890cc bump version to v0.3.2 2018-10-25 17:07:04 +08:00
z4yx
2f6a61aee5 increse test coverage 2018-10-25 17:02:05 +08:00
z4yx
b6043142e1 test if it works with golang 1.8 2018-10-25 16:16:04 +08:00
zhang
6241576b12 bug fix: tunasynctl failed to parse datetime when you list jobs of specific worker 2018-06-13 10:28:48 +08:00
bigeagle
ef78563b8c Merge pull request #74 from houbaron/patch-1
Update README.md
2018-05-31 21:25:55 +08:00
bigeagle
ca106f1360 Merge pull request #82 from tuna/dev
New feature: remove a worker with tunasynctl
2018-05-31 21:22:46 +08:00
Miao Wang
628266ac5a Merge pull request #81 from tuna/wip-override-concurrent-limit
New feature: run "tunasynctl start" with "-f" to override the limit of concurrent jobs
2018-05-31 14:22:03 +08:00
Yuxiang Zhang
7e601d9fff New feature: remove a worker with tunasynctl
Fix #78
2018-05-31 12:32:22 +08:00
z4yx
c750aa1871 new feature: run "tunasynctl start" with "-f" to override concurrent job limit 2018-05-30 18:59:24 +08:00
Yuxiang Zhang
6cbe91b4f1 new command: jobForceStart 2018-05-30 16:07:07 +08:00
Yuxiang Zhang
89a792986d increase test coverage rate of job & provider 2018-05-30 14:00:10 +08:00
Yuxiang Zhang
0fdb07d061 bug fix: log over-written in twoStageRsyncProvider
solve more DATA RACE problem
2018-05-30 12:28:09 +08:00
Yuxiang Zhang
c5bb172f99 increase test coverage rate of job.go 2018-05-30 11:45:05 +08:00
Yuxiang Zhang
79e6167028 fix race condition on logFile of baseProvider 2018-05-30 01:46:16 +08:00
Miao Wang
285ffb2f2f Merge pull request #80 from tuna/dev
Fix the "list" command of tunasynctl
2018-05-29 21:42:57 +08:00
Yuxiang Zhang
95bb4bbd5e report the last ended time (updated whether successful or not) of jobs 2018-05-29 21:21:03 +08:00
Yuxiang Zhang
6bca9d2cd5 fix TestHTTPServer in manager package 2018-05-29 19:07:01 +08:00
Yuxiang Zhang
4fe7d03e54 Move the WebMirrorStatus to internal package. Fix the list command of tunasynctl 2018-05-29 18:48:33 +08:00
Baron Hou
1fe9499728 Update README.md 2017-09-29 18:14:11 +08:00
bigeagle
a475b044c6 feat(worker): add 'use_ipv4' option for rsync provider 2017-09-08 00:15:48 +08:00
bigeagle
a50a360a91 Revert "feat(worker): add '-4' option to rsync when 'use_ipv6' is false"
This reverts commit d536aca2ac.
2017-09-08 00:12:40 +08:00
bigeagle
d536aca2ac feat(worker): add '-4' option to rsync when 'use_ipv6' is false 2017-09-06 23:22:55 +08:00
bigeagle
28545d61e7 Merge pull request #68 from l2dy/master
Update README.md
2017-05-29 11:03:27 -05:00
Zero King
a87fb0f8b4 Update README.md 2017-05-29 15:42:10 +00:00
Jason Lau
095e7c6320 Merge pull request #65 from felixonmars/patch-1
Fix a typo: Fisrt -> First
2017-03-30 15:31:46 +08:00
Felix Yan
7b441312f4 Fix a typo: Fisrt -> First 2017-03-30 13:27:40 +08:00
bigeagle
93194cde2e Merge pull request #60 from tuna/dev
Dev
2016-12-19 01:10:38 +08:00
bigeagle
aa4c31a32b feat(tunasynctl): implemented 'set-size' command to update a mirror size 2016-12-18 23:30:41 +08:00
bigeagle
4c6a407c17 feat(manager): implemented restful API for updating mirror size 2016-12-18 23:06:08 +08:00
bigeagle
939abaef9b feat(worker): TUNASYNC_LOG_DIR environment variable 2016-12-18 20:41:26 +08:00
bigeagle
d5a438462f feat(worker): map current uid and gid to docker 2016-12-18 14:28:48 +08:00
bigeagle
d4e07a7b29 fix(worker): keep the same working dir inside and outside of docker 2016-12-18 14:28:32 +08:00
bigeagle
9ac3193d50 Merge pull request #58 from tuna/dev
Dev
2016-12-12 23:46:22 +08:00
bigeagle
9ffb101cc7 chore(tunasync): bump version to 0.2-dev 2016-12-12 23:23:06 +08:00
bigeagle
fd277388d5 fix(worker): fixed multi-manager configuration
the worker must be registerred on the manager

`extra_status_manager` option is replaced by `api_base_list`, which overrides the `api_base` option
2016-12-12 23:17:50 +08:00
bigeagle
c5cba66786 Merge pull request #57 from tuna/dev
fix(cmd): make tunasynctl work with both HTTP and HTTPS
2016-12-11 02:45:19 +08:00
bigeagle
97e9725774 fix(cmd): make tunasynctl work with both HTTP and HTTPS 2016-12-11 02:13:19 +08:00
bigeagle
54740388b3 Merge pull request #56 from tuna/dev
Dev
2016-12-10 04:18:46 +08:00
bigeagle
7601e5793f fix(worker): improved cgroup creation 2016-12-10 04:14:39 +08:00
bigeagle
9645fd44ec ci(travis): Enabled docker on travis 2016-12-10 03:48:03 +08:00
bigeagle
ebd462be36 feat(worker): Implemented docker executor, close #55
if docker is enabled in configure file and `docker_image` is set on mirror config, the command would

be executed via `docker run ...`
2016-12-10 02:44:45 +08:00
bigeagle
21c832c8fb fix(worker): disabled memory limit
rsync memory is nolonger limited
2016-12-09 23:07:05 +08:00
30 files changed, with 1197 insertions and 207 deletions
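
The commits above rework how job status is reported (the new `last_ended` fields) and fix tunasynctl's `list` command. As a rough sketch of how a client can read the same data the fixed command uses, the snippet below fetches the manager's `/jobs` endpoint; the endpoint and field names come from the diffs further down, the manager address reuses the port from the documentation example, and the timestamp encodings are treated loosely as plain strings and numbers:

```
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// webMirrorStatus copies (part of) the field set the manager's /jobs
// endpoint returns after these changes; the exact encodings of the
// timestamp fields are assumptions here.
type webMirrorStatus struct {
	Name         string `json:"name"`
	IsMaster     bool   `json:"is_master"`
	Status       string `json:"status"`
	LastUpdate   string `json:"last_update"`
	LastUpdateTs int64  `json:"last_update_ts"`
	LastEnded    string `json:"last_ended"`
	LastEndedTs  int64  `json:"last_ended_ts"`
	Upstream     string `json:"upstream"`
	Size         string `json:"size"`
}

func main() {
	base := "http://127.0.0.1:12345" // manager address from the docs example below
	resp, err := http.Get(base + "/jobs")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var jobs []webMirrorStatus
	if err := json.NewDecoder(resp.Body).Decode(&jobs); err != nil {
		panic(err)
	}
	for _, j := range jobs {
		fmt.Printf("%-20s %-10s last ended: %s\n", j.Name, j.Status, j.LastEnded)
	}
}
```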


@@ -1,6 +1,8 @@
+sudo: required
language: go
go:
-- 1.6
+- 1.8
before_install:
- sudo apt-get install cgroup-bin
@@ -11,8 +13,14 @@ before_install:
os:
- linux
+services:
+- docker
before_script:
-- sudo cgcreate -t travis -a travis -g memory:tunasync
+- lssubsys -am
+- sudo cgcreate -a $USER -t $USER -g cpu:tunasync
+- sudo cgcreate -a $USER -t $USER -g memory:tunasync
+- docker pull alpine
script:
- ./.testandcover.bash
@@ -27,9 +35,10 @@
file:
- "build/tunasync-linux-bin.tar.gz"
api_key:
-secure: "F9kaVaR1mxEh2+EL9Nm8GZmbVY98pXCJA0LGDNrq1C2vU61AUNOeX6yI1mMklHNZPLBqoFDvGN1M5HnJ+xWCFH+KnJgLD2GVIAcAxFNpcNWQe8XKE5heklNsIQNQfuh/rJKM6YzeDB9G5RN4Y76iL4WIAXhNnMm48W6jLnWhf70="
+secure: ZOYL/CALrVJsZzbZqUMSI89Gw4zsBJH1StD/2yTyG45GfKgvtK4hG0S5cQM/L0wcikjEkgxSMsmr4ycq+OwbN++gc0umfoAQ/VSjzetiobAlT1E854aRKRjT82WxYdnPW2fsFjuEJTcyZmcbgJGTMi86MDt7w8tEjLomhd1+rUo=
skip_cleanup: true
overwrite: true
on:
tags: true
all_branches: true
+repo: tuna/tunasync


@@ -19,7 +19,7 @@ Pre-built binary for Linux x86_64 is available at [Github releases](https://gith
```
# Architecture
-- Manager: Centural instance on status and job management
+- Manager: Central instance for status and job management
- Worker: Runs mirror jobs
+------------+ +---+ +---+
@@ -47,12 +47,12 @@ PreSyncing Syncing Success
| |
| +-----------------+ | Failed
+------+ post-fail |<---------+
+-----------------+
```
## Generate Self-Signed Certificate
-Fisrt, create root CA
+First, create root CA
```
openssl genrsa -out rootCA.key 2048


@@ -134,7 +134,7 @@ func main() {
app.Name = "tunasync"
app.Usage = "tunasync mirror job management tool"
app.EnableBashCompletion = true
-app.Version = "0.1"
+app.Version = tunasync.Version
app.Commands = []cli.Command{
{
Name: "manager",


@@ -99,8 +99,11 @@ func initialize(c *cli.Context) error {
}
// parse base url of the manager server
-baseURL = fmt.Sprintf("https://%s:%d",
-cfg.ManagerAddr, cfg.ManagerPort)
+if cfg.CACert != "" {
+baseURL = fmt.Sprintf("https://%s:%d", cfg.ManagerAddr, cfg.ManagerPort)
+} else {
+baseURL = fmt.Sprintf("http://%s:%d", cfg.ManagerAddr, cfg.ManagerPort)
+}
logger.Infof("Use manager address: %s", baseURL)
@@ -137,9 +140,9 @@ func listWorkers(c *cli.Context) error {
}
func listJobs(c *cli.Context) error {
-// FIXME: there should be an API on manager server side that return MirrorStatus list to tunasynctl
-var jobs []tunasync.MirrorStatus
+var genericJobs interface{}
if c.Bool("all") {
+var jobs []tunasync.WebMirrorStatus
_, err := tunasync.GetJSON(baseURL+listJobsPath, &jobs, client)
if err != nil {
return cli.NewExitError(
@@ -147,8 +150,10 @@ func listJobs(c *cli.Context) error {
"of all jobs from manager server: %s", err.Error()),
1)
}
+genericJobs = jobs
} else {
+var jobs []tunasync.MirrorStatus
args := c.Args()
if len(args) == 0 {
return cli.NewExitError(
@@ -171,9 +176,10 @@ func listJobs(c *cli.Context) error {
for range args {
jobs = append(jobs, <-ans...)
}
+genericJobs = jobs
}
-b, err := json.MarshalIndent(jobs, "", " ")
+b, err := json.MarshalIndent(genericJobs, "", " ")
if err != nil {
return cli.NewExitError(
fmt.Sprintf("Error printing out informations: %s", err.Error()),
@@ -183,6 +189,103 @@ func listJobs(c *cli.Context) error {
return nil
}
func updateMirrorSize(c *cli.Context) error {
args := c.Args()
if len(args) != 2 {
return cli.NewExitError("Usage: tunasynctl -w <worker-id> <mirror> <size>", 1)
}
workerID := c.String("worker")
mirrorID := args.Get(0)
mirrorSize := args.Get(1)
msg := struct {
Name string `json:"name"`
Size string `json:"size"`
}{
Name: mirrorID,
Size: mirrorSize,
}
url := fmt.Sprintf(
"%s/workers/%s/jobs/%s/size", baseURL, workerID, mirrorID,
)
resp, err := tunasync.PostJSON(url, msg, client)
if err != nil {
return cli.NewExitError(
fmt.Sprintf("Failed to send request to manager: %s",
err.Error()),
1)
}
defer resp.Body.Close()
body, _ := ioutil.ReadAll(resp.Body)
if resp.StatusCode != http.StatusOK {
return cli.NewExitError(
fmt.Sprintf("Manager failed to update mirror size: %s", body), 1,
)
}
var status tunasync.MirrorStatus
json.Unmarshal(body, &status)
if status.Size != mirrorSize {
return cli.NewExitError(
fmt.Sprintf(
"Mirror size error, expecting %s, manager returned %s",
mirrorSize, status.Size,
), 1,
)
}
logger.Infof("Successfully updated mirror size to %s", mirrorSize)
return nil
}
func removeWorker(c *cli.Context) error {
args := c.Args()
if len(args) != 0 {
return cli.NewExitError("Usage: tunasynctl -w <worker-id>", 1)
}
workerID := c.String("worker")
if len(workerID) == 0 {
return cli.NewExitError("Please specify the <worker-id>", 1)
}
url := fmt.Sprintf("%s/workers/%s", baseURL, workerID)
req, err := http.NewRequest("DELETE", url, nil)
if err != nil {
logger.Panicf("Invalid HTTP Request: %s", err.Error())
}
resp, err := client.Do(req)
if err != nil {
return cli.NewExitError(
fmt.Sprintf("Failed to send request to manager: %s", err.Error()), 1)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return cli.NewExitError(
fmt.Sprintf("Failed to parse response: %s", err.Error()),
1)
}
return cli.NewExitError(fmt.Sprintf("Failed to correctly send"+
" command: HTTP status code is not 200: %s", body),
1)
}
res := map[string]string{}
err = json.NewDecoder(resp.Body).Decode(&res)
if res["message"] == "deleted" {
logger.Info("Successfully removed the worker")
} else {
logger.Info("Failed to remove the worker")
}
return nil
}
func flushDisabledJobs(c *cli.Context) error {
req, err := http.NewRequest("DELETE", baseURL+flushDisabledPath, nil)
if err != nil {
@@ -232,11 +335,16 @@ func cmdJob(cmd tunasync.CmdVerb) cli.ActionFunc {
"argument WORKER", 1)
}
+options := map[string]bool{}
+if c.Bool("force") {
+options["force"] = true
+}
cmd := tunasync.ClientCmd{
Cmd: cmd,
MirrorID: mirrorID,
WorkerID: c.String("worker"),
Args: argsList,
+Options: options,
}
resp, err := tunasync.PostJSON(baseURL+cmdPath, cmd, client)
if err != nil {
@@ -322,7 +430,7 @@ func main() {
app := cli.NewApp()
app.EnableBashCompletion = true
-app.Version = "0.1"
+app.Version = tunasync.Version
app.Name = "tunasynctl"
app.Usage = "control client for tunasync manager"
@@ -357,6 +465,11 @@ func main() {
},
}
+forceStartFlag := cli.BoolFlag{
+Name: "force, f",
+Usage: "Override the concurrent limit",
+}
app.Commands = []cli.Command{
{
Name: "list",
@@ -382,10 +495,34 @@ func main() {
Flags: commonFlags,
Action: initializeWrapper(listWorkers),
},
{
Name: "rm-worker",
Usage: "Remove a worker",
Flags: append(
commonFlags,
cli.StringFlag{
Name: "worker, w",
Usage: "worker-id of the worker to be removed",
},
),
Action: initializeWrapper(removeWorker),
},
{
Name: "set-size",
Usage: "Set mirror size",
Flags: append(
commonFlags,
cli.StringFlag{
Name: "worker, w",
Usage: "specify worker-id of the mirror job",
},
),
Action: initializeWrapper(updateMirrorSize),
},
{
Name: "start",
Usage: "Start a job",
-Flags: append(commonFlags, cmdFlags...),
+Flags: append(append(commonFlags, cmdFlags...), forceStartFlag),
Action: initializeWrapper(cmdJob(tunasync.CmdStart)),
},
{


@@ -90,6 +90,14 @@ $ tunasync worker --config ~/tunasync_demo/worker.conf
In this example, the mirror data is stored under `/tmp/tunasync/`
+### Control
+Check the sync status:
+```
+$ tunasynctl list -p 12345 --all
+```
## Going further
See also:


@@ -5,7 +5,7 @@ import (
"time"
)
-// A StatusUpdateMsg represents a msg when
+// A MirrorStatus represents a msg when
// a worker has done syncing
type MirrorStatus struct {
Name string `json:"name"`
@@ -13,6 +13,7 @@ type MirrorStatus struct {
IsMaster bool `json:"is_master"`
Status SyncStatus `json:"status"`
LastUpdate time.Time `json:"last_update"`
+LastEnded time.Time `json:"last_ended"`
Upstream string `json:"upstream"`
Size string `json:"size"`
ErrorMsg string `json:"error_msg"`
@@ -67,9 +68,10 @@ func (c CmdVerb) String() string {
// A WorkerCmd is the command message send from the
// manager to a worker
type WorkerCmd struct {
Cmd CmdVerb `json:"cmd"`
MirrorID string `json:"mirror_id"`
Args []string `json:"args"`
+Options map[string]bool `json:"options"`
}
func (c WorkerCmd) String() string {
@@ -82,8 +84,9 @@ func (c WorkerCmd) String() string {
// A ClientCmd is the command message send from client
// to the manager
type ClientCmd struct {
Cmd CmdVerb `json:"cmd"`
MirrorID string `json:"mirror_id"`
WorkerID string `json:"worker_id"`
Args []string `json:"args"`
+Options map[string]bool `json:"options"`
}
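
The new `Options` map is how tunasynctl's `--force` flag travels to the manager and on to the worker. Below is a self-contained sketch of the resulting wire format; the struct layout copies the json tags above, while the numeric stand-in for `CmdVerb` and the field values are illustrative only:

```
package main

import (
	"encoding/json"
	"fmt"
)

// Local copy of the ClientCmd layout above; CmdVerb is numeric in the real
// package, so a plain int stands in for it here.
type clientCmd struct {
	Cmd      int             `json:"cmd"`
	MirrorID string          `json:"mirror_id"`
	WorkerID string          `json:"worker_id"`
	Args     []string        `json:"args"`
	Options  map[string]bool `json:"options"`
}

func main() {
	// Roughly what `tunasynctl start -f -w some-worker some-mirror` would post:
	cmd := clientCmd{
		Cmd:      0, // stands in for CmdStart
		MirrorID: "some-mirror",
		WorkerID: "some-worker",
		Options:  map[string]bool{"force": true},
	}
	b, _ := json.MarshalIndent(cmd, "", "  ")
	fmt.Println(string(b))
}
```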


@@ -1,11 +1,9 @@
-package manager
+package internal
import (
"encoding/json"
"strconv"
"time"
-. "github.com/tuna/tunasync/internal"
)
type textTime struct {
@@ -38,24 +36,28 @@ func (t *stampTime) UnmarshalJSON(b []byte) error {
return err
}
-// webMirrorStatus is the mirror status to be shown in the web page
-type webMirrorStatus struct {
+// WebMirrorStatus is the mirror status to be shown in the web page
+type WebMirrorStatus struct {
Name string `json:"name"`
IsMaster bool `json:"is_master"`
Status SyncStatus `json:"status"`
LastUpdate textTime `json:"last_update"`
LastUpdateTs stampTime `json:"last_update_ts"`
+LastEnded textTime `json:"last_ended"`
+LastEndedTs stampTime `json:"last_ended_ts"`
Upstream string `json:"upstream"`
Size string `json:"size"` // approximate size
}
-func convertMirrorStatus(m MirrorStatus) webMirrorStatus {
-return webMirrorStatus{
+func BuildWebMirrorStatus(m MirrorStatus) WebMirrorStatus {
+return WebMirrorStatus{
Name: m.Name,
IsMaster: m.IsMaster,
Status: m.Status,
LastUpdate: textTime{m.LastUpdate},
LastUpdateTs: stampTime{m.LastUpdate},
+LastEnded: textTime{m.LastEnded},
+LastEndedTs: stampTime{m.LastEnded},
Upstream: m.Upstream,
Size: m.Size,
}

internal/status_web_test.go (new file, 76 lines)

@@ -0,0 +1,76 @@
package internal
import (
"encoding/json"
"testing"
"time"
. "github.com/smartystreets/goconvey/convey"
)
func TestStatus(t *testing.T) {
Convey("status json ser-de should work", t, func() {
tz := "Asia/Tokyo"
loc, err := time.LoadLocation(tz)
So(err, ShouldBeNil)
t := time.Date(2016, time.April, 16, 23, 8, 10, 0, loc)
m := WebMirrorStatus{
Name: "tunalinux",
Status: Success,
LastUpdate: textTime{t},
LastUpdateTs: stampTime{t},
LastEnded: textTime{t},
LastEndedTs: stampTime{t},
Size: "5GB",
Upstream: "rsync://mirrors.tuna.tsinghua.edu.cn/tunalinux/",
}
b, err := json.Marshal(m)
So(err, ShouldBeNil)
//fmt.Println(string(b))
var m2 WebMirrorStatus
err = json.Unmarshal(b, &m2)
So(err, ShouldBeNil)
// fmt.Printf("%#v", m2)
So(m2.Name, ShouldEqual, m.Name)
So(m2.Status, ShouldEqual, m.Status)
So(m2.LastUpdate.Unix(), ShouldEqual, m.LastUpdate.Unix())
So(m2.LastUpdateTs.Unix(), ShouldEqual, m.LastUpdate.Unix())
So(m2.LastUpdate.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
So(m2.LastUpdateTs.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
So(m2.LastEnded.Unix(), ShouldEqual, m.LastEnded.Unix())
So(m2.LastEndedTs.Unix(), ShouldEqual, m.LastEnded.Unix())
So(m2.LastEnded.UnixNano(), ShouldEqual, m.LastEnded.UnixNano())
So(m2.LastEndedTs.UnixNano(), ShouldEqual, m.LastEnded.UnixNano())
So(m2.Size, ShouldEqual, m.Size)
So(m2.Upstream, ShouldEqual, m.Upstream)
})
Convey("BuildWebMirrorStatus should work", t, func() {
m := MirrorStatus{
Name: "arch-sync3",
Worker: "testWorker",
IsMaster: true,
Status: Failed,
LastUpdate: time.Now().Add(-time.Minute * 30),
LastEnded: time.Now(),
Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "4GB",
}
var m2 WebMirrorStatus
m2 = BuildWebMirrorStatus(m)
// fmt.Printf("%#v", m2)
So(m2.Name, ShouldEqual, m.Name)
So(m2.Status, ShouldEqual, m.Status)
So(m2.LastUpdate.Unix(), ShouldEqual, m.LastUpdate.Unix())
So(m2.LastUpdateTs.Unix(), ShouldEqual, m.LastUpdate.Unix())
So(m2.LastUpdate.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
So(m2.LastUpdateTs.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
So(m2.LastEnded.Unix(), ShouldEqual, m.LastEnded.Unix())
So(m2.LastEndedTs.Unix(), ShouldEqual, m.LastEnded.Unix())
So(m2.LastEnded.UnixNano(), ShouldEqual, m.LastEnded.UnixNano())
So(m2.LastEndedTs.UnixNano(), ShouldEqual, m.LastEnded.UnixNano())
So(m2.Size, ShouldEqual, m.Size)
So(m2.Upstream, ShouldEqual, m.Upstream)
})
}

internal/version.go (new file, 3 lines)

@@ -0,0 +1,3 @@
package internal
const Version string = "0.3.3"


@@ -14,6 +14,7 @@ type dbAdapter interface {
Init() error
ListWorkers() ([]WorkerStatus, error)
GetWorker(workerID string) (WorkerStatus, error)
+DeleteWorker(workerID string) error
CreateWorker(w WorkerStatus) (WorkerStatus, error)
UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error)
GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error)
@@ -95,6 +96,19 @@ func (b *boltAdapter) GetWorker(workerID string) (w WorkerStatus, err error) {
return
}
func (b *boltAdapter) DeleteWorker(workerID string) (err error) {
err = b.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(_workerBucketKey))
v := bucket.Get([]byte(workerID))
if v == nil {
return fmt.Errorf("invalid workerID %s", workerID)
}
err := bucket.Delete([]byte(workerID))
return err
})
return
}
func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
err := b.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(_workerBucketKey))
@@ -125,7 +139,7 @@ func (b *boltAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus
bucket := tx.Bucket([]byte(_statusBucketKey))
v := bucket.Get([]byte(id))
if v == nil {
-return fmt.Errorf("no mirror %s exists in worker %s", mirrorID, workerID)
+return fmt.Errorf("no mirror '%s' exists in worker '%s'", mirrorID, workerID)
}
err := json.Unmarshal(v, &m)
return err


@@ -40,21 +40,39 @@ func TestBoltAdapter(t *testing.T) {
So(err, ShouldBeNil)
}
-Convey("get exists worker", func() {
+Convey("get existent worker", func() {
_, err := boltDB.GetWorker(testWorkerIDs[0])
So(err, ShouldBeNil)
})
-Convey("list exist worker", func() {
+Convey("list existent workers", func() {
ws, err := boltDB.ListWorkers()
So(err, ShouldBeNil)
So(len(ws), ShouldEqual, 2)
})
-Convey("get inexist worker", func() {
+Convey("get non-existent worker", func() {
_, err := boltDB.GetWorker("invalid workerID")
So(err, ShouldNotBeNil)
})
Convey("delete existent worker", func() {
err := boltDB.DeleteWorker(testWorkerIDs[0])
So(err, ShouldBeNil)
_, err = boltDB.GetWorker(testWorkerIDs[0])
So(err, ShouldNotBeNil)
ws, err := boltDB.ListWorkers()
So(err, ShouldBeNil)
So(len(ws), ShouldEqual, 1)
})
Convey("delete non-existent worker", func() {
err := boltDB.DeleteWorker("invalid workerID")
So(err, ShouldNotBeNil)
ws, err := boltDB.ListWorkers()
So(err, ShouldBeNil)
So(len(ws), ShouldEqual, 2)
})
})
Convey("update mirror status", func() {
@@ -65,6 +83,7 @@ func TestBoltAdapter(t *testing.T) {
IsMaster: true,
Status: Success,
LastUpdate: time.Now(),
+LastEnded: time.Now(),
Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "3GB",
},
@@ -73,7 +92,8 @@ func TestBoltAdapter(t *testing.T) {
Worker: testWorkerIDs[1],
IsMaster: true,
Status: Disabled,
-LastUpdate: time.Now(),
+LastUpdate: time.Now().Add(-time.Hour),
+LastEnded: time.Now(),
Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "4GB",
},
@@ -82,7 +102,8 @@ func TestBoltAdapter(t *testing.T) {
Worker: testWorkerIDs[1],
IsMaster: true,
Status: Success,
-LastUpdate: time.Now(),
+LastUpdate: time.Now().Add(-time.Second),
+LastEnded: time.Now(),
Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "4GB",
},


@@ -1,6 +1,7 @@
package manager
import (
+"errors"
"fmt"
"net/http"
"time"
@@ -83,10 +84,13 @@ func GetTUNASyncManager(cfg *Config) *Manager {
// workerID should be valid in this route group
workerValidateGroup := s.engine.Group("/workers", s.workerIDValidator)
{
+// delete specified worker
+workerValidateGroup.DELETE(":id", s.deleteWorker)
// get job list
workerValidateGroup.GET(":id/jobs", s.listJobsOfWorker)
// post job status
workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker)
+workerValidateGroup.POST(":id/jobs/:job/size", s.updateMirrorSize)
}
// for tunasynctl to post commands
@@ -133,11 +137,11 @@ func (s *Manager) listAllJobs(c *gin.Context) {
s.returnErrJSON(c, http.StatusInternalServerError, err)
return
}
-webMirStatusList := []webMirrorStatus{}
+webMirStatusList := []WebMirrorStatus{}
for _, m := range mirrorStatusList {
webMirStatusList = append(
webMirStatusList,
-convertMirrorStatus(m),
+BuildWebMirrorStatus(m),
)
}
c.JSON(http.StatusOK, webMirStatusList)
@@ -157,6 +161,22 @@ func (s *Manager) flushDisabledJobs(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{_infoKey: "flushed"})
}
// deleteWorker deletes one worker by id
func (s *Manager) deleteWorker(c *gin.Context) {
workerID := c.Param("id")
err := s.adapter.DeleteWorker(workerID)
if err != nil {
err := fmt.Errorf("failed to delete worker: %s",
err.Error(),
)
c.Error(err)
s.returnErrJSON(c, http.StatusInternalServerError, err)
return
}
logger.Noticef("Worker <%s> deleted", workerID)
c.JSON(http.StatusOK, gin.H{_infoKey: "deleted"})
}
// listWrokers respond with informations of all the workers
func (s *Manager) listWorkers(c *gin.Context) {
var workerInfos []WorkerStatus
@@ -225,6 +245,12 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) {
var status MirrorStatus
c.BindJSON(&status)
mirrorName := status.Name
+if len(mirrorName) == 0 {
+s.returnErrJSON(
+c, http.StatusBadRequest,
+errors.New("Mirror Name should not be empty"),
+)
+}
curStatus, _ := s.adapter.GetMirrorStatus(workerID, mirrorName)
@@ -234,21 +260,25 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) {
} else {
status.LastUpdate = curStatus.LastUpdate
}
+if status.Status == Success || status.Status == Failed {
+status.LastEnded = time.Now()
+} else {
+status.LastEnded = curStatus.LastEnded
+}
+// Only message with meaningful size updates the mirror size
+if len(curStatus.Size) > 0 && curStatus.Size != "unknown" {
+if len(status.Size) == 0 || status.Size == "unknown" {
+status.Size = curStatus.Size
+}
+}
// for logging
switch status.Status {
-case Success:
-logger.Noticef("Job [%s] @<%s> success", status.Name, status.Worker)
-case Failed:
-logger.Warningf("Job [%s] @<%s> failed", status.Name, status.Worker)
case Syncing:
logger.Noticef("Job [%s] @<%s> starts syncing", status.Name, status.Worker)
-case Disabled:
-logger.Noticef("Job [%s] @<%s> disabled", status.Name, status.Worker)
-case Paused:
-logger.Noticef("Job [%s] @<%s> paused", status.Name, status.Worker)
default:
-logger.Infof("Job [%s] @<%s> status: %s", status.Name, status.Worker, status.Status)
+logger.Noticef("Job [%s] @<%s> %s", status.Name, status.Worker, status.Status)
}
newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
@@ -263,6 +293,45 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) {
c.JSON(http.StatusOK, newStatus)
}
func (s *Manager) updateMirrorSize(c *gin.Context) {
workerID := c.Param("id")
type SizeMsg struct {
Name string `json:"name"`
Size string `json:"size"`
}
var msg SizeMsg
c.BindJSON(&msg)
mirrorName := msg.Name
status, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
if err != nil {
logger.Errorf(
"Failed to get status of mirror %s @<%s>: %s",
mirrorName, workerID, err.Error(),
)
s.returnErrJSON(c, http.StatusInternalServerError, err)
return
}
// Only message with meaningful size updates the mirror size
if len(msg.Size) > 0 || msg.Size != "unknown" {
status.Size = msg.Size
}
logger.Noticef("Mirror size of [%s] @<%s>: %s", status.Name, status.Worker, status.Size)
newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
if err != nil {
err := fmt.Errorf("failed to update job %s of worker %s: %s",
mirrorName, workerID, err.Error(),
)
c.Error(err)
s.returnErrJSON(c, http.StatusInternalServerError, err)
return
}
c.JSON(http.StatusOK, newStatus)
}
func (s *Manager) handleClientCmd(c *gin.Context) {
var clientCmd ClientCmd
c.BindJSON(&clientCmd)
@@ -286,6 +355,7 @@ func (s *Manager) handleClientCmd(c *gin.Context) {
Cmd: clientCmd.Cmd,
MirrorID: clientCmd.MirrorID,
Args: clientCmd.Args,
+Options: clientCmd.Options,
}
// update job status, even if the job did not disable successfully,

@@ -21,9 +21,16 @@
)
func TestHTTPServer(t *testing.T) {
+var listenPort = 5000
Convey("HTTP server should work", t, func(ctx C) {
+listenPort++
+port := listenPort
+addr := "127.0.0.1"
+baseURL := fmt.Sprintf("http://%s:%d", addr, port)
InitLogger(true, true, false)
-s := GetTUNASyncManager(&Config{Debug: false})
+s := GetTUNASyncManager(&Config{Debug: true})
+s.cfg.Server.Addr = addr
+s.cfg.Server.Port = port
So(s, ShouldNotBeNil)
s.setDBAdapter(&mockDBAdapter{
workerStore: map[string]WorkerStatus{
@@ -32,12 +39,8 @@
}},
statusStore: make(map[string]MirrorStatus),
})
-port := rand.Intn(10000) + 20000
-baseURL := fmt.Sprintf("http://127.0.0.1:%d", port)
-go func() {
-s.engine.Run(fmt.Sprintf("127.0.0.1:%d", port))
-}()
-time.Sleep(50 * time.Microsecond)
+go s.Run()
+time.Sleep(50 * time.Millisecond)
resp, err := http.Get(baseURL + "/ping")
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK)
@@ -79,6 +82,33 @@
So(len(actualResponseObj), ShouldEqual, 2)
})
Convey("delete an existent worker", func(ctx C) {
req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/workers/%s", baseURL, w.ID), nil)
So(err, ShouldBeNil)
clt := &http.Client{}
resp, err := clt.Do(req)
So(err, ShouldBeNil)
defer resp.Body.Close()
res := map[string]string{}
err = json.NewDecoder(resp.Body).Decode(&res)
So(err, ShouldBeNil)
So(res[_infoKey], ShouldEqual, "deleted")
})
Convey("delete non-existent worker", func(ctx C) {
invalidWorker := "test_worker233"
req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/workers/%s", baseURL, invalidWorker), nil)
So(err, ShouldBeNil)
clt := &http.Client{}
resp, err := clt.Do(req)
So(err, ShouldBeNil)
defer resp.Body.Close()
res := map[string]string{}
err = json.NewDecoder(resp.Body).Decode(&res)
So(err, ShouldBeNil)
So(res[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
})
Convey("flush disabled jobs", func(ctx C) { Convey("flush disabled jobs", func(ctx C) {
req, err := http.NewRequest("DELETE", baseURL+"/jobs/disabled", nil) req, err := http.NewRequest("DELETE", baseURL+"/jobs/disabled", nil)
So(err, ShouldBeNil) So(err, ShouldBeNil)
@@ -99,7 +129,7 @@ func TestHTTPServer(t *testing.T) {
IsMaster: true, IsMaster: true,
Status: Success, Status: Success,
Upstream: "mirrors.tuna.tsinghua.edu.cn", Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "3GB", Size: "unknown",
} }
resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil) resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
defer resp.Body.Close() defer resp.Body.Close()
@@ -121,11 +151,12 @@ func TestHTTPServer(t *testing.T) {
So(m.Size, ShouldEqual, status.Size) So(m.Size, ShouldEqual, status.Size)
So(m.IsMaster, ShouldEqual, status.IsMaster) So(m.IsMaster, ShouldEqual, status.IsMaster)
So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second) So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second)
So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
}) })
Convey("list all job status of all workers", func(ctx C) { Convey("list all job status of all workers", func(ctx C) {
var ms []webMirrorStatus var ms []WebMirrorStatus
resp, err := GetJSON(baseURL+"/jobs", &ms, nil) resp, err := GetJSON(baseURL+"/jobs", &ms, nil)
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK) So(resp.StatusCode, ShouldEqual, http.StatusOK)
@@ -137,8 +168,77 @@ func TestHTTPServer(t *testing.T) {
So(m.Size, ShouldEqual, status.Size) So(m.Size, ShouldEqual, status.Size)
So(m.IsMaster, ShouldEqual, status.IsMaster) So(m.IsMaster, ShouldEqual, status.IsMaster)
So(time.Now().Sub(m.LastUpdate.Time), ShouldBeLessThan, 1*time.Second) So(time.Now().Sub(m.LastUpdate.Time), ShouldBeLessThan, 1*time.Second)
So(time.Now().Sub(m.LastEnded.Time), ShouldBeLessThan, 1*time.Second)
}) })
Convey("Update size of a valid mirror", func(ctx C) {
msg := struct {
Name string `json:"name"`
Size string `json:"size"`
}{status.Name, "5GB"}
url := fmt.Sprintf("%s/workers/%s/jobs/%s/size", baseURL, status.Worker, status.Name)
resp, err := PostJSON(url, msg, nil)
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK)
Convey("Get new size of a mirror", func(ctx C) {
var ms []MirrorStatus
resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK)
// err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
m := ms[0]
So(m.Name, ShouldEqual, status.Name)
So(m.Worker, ShouldEqual, status.Worker)
So(m.Status, ShouldEqual, status.Status)
So(m.Upstream, ShouldEqual, status.Upstream)
So(m.Size, ShouldEqual, "5GB")
So(m.IsMaster, ShouldEqual, status.IsMaster)
So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second)
So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
})
})
Convey("Update size of an invalid mirror", func(ctx C) {
msg := struct {
Name string `json:"name"`
Size string `json:"size"`
}{"Invalid mirror", "5GB"}
url := fmt.Sprintf("%s/workers/%s/jobs/%s/size", baseURL, status.Worker, status.Name)
resp, err := PostJSON(url, msg, nil)
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
})
// what if status changed to failed
status.Status = Failed
time.Sleep(3 * time.Second)
resp, err = PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
defer resp.Body.Close()
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK)
Convey("What if syncing job failed", func(ctx C) {
var ms []MirrorStatus
resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK)
// err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
m := ms[0]
So(m.Name, ShouldEqual, status.Name)
So(m.Worker, ShouldEqual, status.Worker)
So(m.Status, ShouldEqual, status.Status)
So(m.Upstream, ShouldEqual, status.Upstream)
So(m.Size, ShouldEqual, status.Size)
So(m.IsMaster, ShouldEqual, status.IsMaster)
So(time.Now().Sub(m.LastUpdate), ShouldBeGreaterThan, 3*time.Second)
So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
})
})
Convey("update mirror status of an inexisted worker", func(ctx C) {
@@ -149,6 +249,7 @@
IsMaster: true,
Status: Success,
LastUpdate: time.Now(),
+LastEnded: time.Now(),
Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "4GB",
}
@@ -252,6 +353,11 @@ func (b *mockDBAdapter) GetWorker(workerID string) (WorkerStatus, error) {
return w, nil
}
+func (b *mockDBAdapter) DeleteWorker(workerID string) error {
+delete(b.workerStore, workerID)
+return nil
+}
func (b *mockDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
// _, ok := b.workerStore[w.ID]
// if ok {


@@ -1,44 +0,0 @@
package manager
import (
"encoding/json"
"testing"
"time"
tunasync "github.com/tuna/tunasync/internal"
. "github.com/smartystreets/goconvey/convey"
)
func TestStatus(t *testing.T) {
Convey("status json ser-de should work", t, func() {
tz := "Asia/Tokyo"
loc, err := time.LoadLocation(tz)
So(err, ShouldBeNil)
t := time.Date(2016, time.April, 16, 23, 8, 10, 0, loc)
m := webMirrorStatus{
Name: "tunalinux",
Status: tunasync.Success,
LastUpdate: textTime{t},
LastUpdateTs: stampTime{t},
Size: "5GB",
Upstream: "rsync://mirrors.tuna.tsinghua.edu.cn/tunalinux/",
}
b, err := json.Marshal(m)
So(err, ShouldBeNil)
//fmt.Println(string(b))
var m2 webMirrorStatus
err = json.Unmarshal(b, &m2)
So(err, ShouldBeNil)
// fmt.Printf("%#v", m2)
So(m2.Name, ShouldEqual, m.Name)
So(m2.Status, ShouldEqual, m.Status)
So(m2.LastUpdate.Unix(), ShouldEqual, m.LastUpdate.Unix())
So(m2.LastUpdateTs.Unix(), ShouldEqual, m.LastUpdate.Unix())
So(m2.LastUpdate.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
So(m2.LastUpdateTs.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
So(m2.Size, ShouldEqual, m.Size)
So(m2.Upstream, ShouldEqual, m.Upstream)
})
}


@@ -20,11 +20,11 @@ type baseProvider struct {
cmd *cmdJob
isRunning atomic.Value
-logFile *os.File
cgroup *cgroupHook
zfs *zfsHook
+docker *dockerHook
hooks []jobHook
}
func (p *baseProvider) Name() string {
@@ -87,6 +87,8 @@ func (p *baseProvider) AddHook(hook jobHook) {
p.cgroup = v
case *zfsHook:
p.zfs = v
+case *dockerHook:
+p.docker = v
}
p.hooks = append(p.hooks, hook)
}
@@ -103,20 +105,25 @@ func (p *baseProvider) ZFS() *zfsHook {
return p.zfs
}
-func (p *baseProvider) prepareLogFile() error {
+func (p *baseProvider) Docker() *dockerHook {
+return p.docker
+}
+func (p *baseProvider) prepareLogFile(append bool) error {
if p.LogFile() == "/dev/null" {
p.cmd.SetLogFile(nil)
return nil
}
-if p.logFile == nil {
-logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE, 0644)
-if err != nil {
-logger.Errorf("Error opening logfile %s: %s", p.LogFile(), err.Error())
-return err
-}
-p.logFile = logFile
-}
-p.cmd.SetLogFile(p.logFile)
+appendMode := 0
+if append {
+appendMode = os.O_APPEND
+}
+logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE|appendMode, 0644)
+if err != nil {
+logger.Errorf("Error opening logfile %s: %s", p.LogFile(), err.Error())
+return err
+}
+p.cmd.SetLogFile(logFile)
return nil
}
@@ -135,32 +142,22 @@ func (p *baseProvider) IsRunning() bool {
func (p *baseProvider) Wait() error {
defer func() {
-p.Lock()
+logger.Debugf("set isRunning to false: %s", p.Name())
p.isRunning.Store(false)
-if p.logFile != nil {
-p.logFile.Close()
-p.logFile = nil
-}
-p.Unlock()
}()
+logger.Debugf("calling Wait: %s", p.Name())
return p.cmd.Wait()
}
func (p *baseProvider) Terminate() error {
+p.Lock()
+defer p.Unlock()
logger.Debugf("terminating provider: %s", p.Name())
if !p.IsRunning() {
return nil
}
-p.Lock()
-if p.logFile != nil {
-p.logFile.Close()
-p.logFile = nil
-}
-p.Unlock()
err := p.cmd.Terminate()
-p.isRunning.Store(false)
return err
}
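
The `append` parameter matters when a provider runs more than one stage against the same log file (the two-stage rsync case named in the commit message above): without `O_APPEND` the second open starts writing at offset zero and clobbers the first stage's output. A tiny stand-alone illustration of the flag handling, with an arbitrary file name:

```
package main

import (
	"fmt"
	"os"
)

// openLog mimics prepareLogFile's flag handling: write-only, create if
// missing, and append instead of overwriting when append is true.
func openLog(path string, append bool) (*os.File, error) {
	mode := 0
	if append {
		mode = os.O_APPEND
	}
	return os.OpenFile(path, os.O_WRONLY|os.O_CREATE|mode, 0644)
}

func main() {
	path := "demo.log"
	defer os.Remove(path)

	f, _ := openLog(path, false)
	f.WriteString("stage 1 output\n")
	f.Close()

	// Second stage: with append=true the first stage's output survives;
	// with append=false it would be overwritten from offset zero.
	f, _ = openLog(path, true)
	f.WriteString("stage 2 output\n")
	f.Close()

	b, _ := os.ReadFile(path)
	fmt.Print(string(b))
}
```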


@@ -15,35 +15,31 @@ import (
"github.com/codeskyblue/go-sh"
)
-var cgSubsystem = "cpu"
type cgroupHook struct {
emptyHook
provider mirrorProvider
basePath string
baseGroup string
created bool
+subsystem string
+memLimit string
}
-func initCgroup(basePath string) {
-if _, err := os.Stat(filepath.Join(basePath, "memory")); err == nil {
-cgSubsystem = "memory"
-return
-}
-logger.Warning("Memory subsystem of cgroup not enabled, fallback to cpu")
-}
-func newCgroupHook(p mirrorProvider, basePath, baseGroup string) *cgroupHook {
+func newCgroupHook(p mirrorProvider, basePath, baseGroup, subsystem, memLimit string) *cgroupHook {
if basePath == "" {
basePath = "/sys/fs/cgroup"
}
if baseGroup == "" {
baseGroup = "tunasync"
}
+if subsystem == "" {
+subsystem = "cpu"
+}
return &cgroupHook{
provider: p,
basePath: basePath,
baseGroup: baseGroup,
+subsystem: subsystem,
}
}
@@ -52,13 +48,15 @@ func (c *cgroupHook) preExec() error {
if err := sh.Command("cgcreate", "-g", c.Cgroup()).Run(); err != nil {
return err
}
-if cgSubsystem != "memory" {
+if c.subsystem != "memory" {
return nil
}
-if c.provider.Type() == provRsync || c.provider.Type() == provTwoStageRsync {
+if c.memLimit != "" {
gname := fmt.Sprintf("%s/%s", c.baseGroup, c.provider.Name())
return sh.Command(
-"cgset", "-r", "memory.limit_in_bytes=512M", gname,
+"cgset", "-r",
+fmt.Sprintf("memory.limit_in_bytes=%s", c.memLimit),
+gname,
).Run()
}
return nil
@@ -76,7 +74,7 @@ func (c *cgroupHook) postExec() error {
func (c *cgroupHook) Cgroup() string {
name := c.provider.Name()
-return fmt.Sprintf("%s:%s/%s", cgSubsystem, c.baseGroup, name)
+return fmt.Sprintf("%s:%s/%s", c.subsystem, c.baseGroup, name)
}
func (c *cgroupHook) killAll() error {
@@ -87,7 +85,7 @@ func (c *cgroupHook) killAll() error {
readTaskList := func() ([]int, error) {
taskList := []int{}
-taskFile, err := os.Open(filepath.Join(c.basePath, cgSubsystem, c.baseGroup, name, "tasks"))
+taskFile, err := os.Open(filepath.Join(c.basePath, c.subsystem, c.baseGroup, name, "tasks"))
if err != nil {
return taskList, err
}


@@ -72,11 +72,14 @@ sleep 30
provider, err := newCmdProvider(c)
So(err, ShouldBeNil)
-initCgroup("/sys/fs/cgroup")
-cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync")
+cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync", "cpu", "")
provider.AddHook(cg)
err = cg.preExec()
+if err != nil {
+logger.Errorf("Failed to create cgroup")
+return
+}
So(err, ShouldBeNil)
go func() {
@@ -129,12 +132,15 @@ sleep 30
provider, err := newRsyncProvider(c)
So(err, ShouldBeNil)
-initCgroup("/sys/fs/cgroup")
-cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync")
+cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync", "cpu", "512M")
provider.AddHook(cg)
-cg.preExec()
-if cgSubsystem == "memory" {
+err = cg.preExec()
+if err != nil {
+logger.Errorf("Failed to create cgroup")
+return
+}
+if cg.subsystem == "memory" {
memoLimit, err := ioutil.ReadFile(filepath.Join(cg.basePath, "memory", cg.baseGroup, provider.Name(), "memory.limit_in_bytes"))
So(err, ShouldBeNil)
So(strings.Trim(string(memoLimit), "\n"), ShouldEqual, strconv.Itoa(512*1024*1024))


@@ -1,6 +1,7 @@
package worker
import (
+"errors"
"time"
"github.com/anmitsu/go-shlex"
@@ -60,17 +61,25 @@ func (p *cmdProvider) Run() error {
}
func (p *cmdProvider) Start() error {
+p.Lock()
+defer p.Unlock()
+if p.IsRunning() {
+return errors.New("provider is currently running")
+}
env := map[string]string{
"TUNASYNC_MIRROR_NAME": p.Name(),
"TUNASYNC_WORKING_DIR": p.WorkingDir(),
"TUNASYNC_UPSTREAM_URL": p.upstreamURL,
+"TUNASYNC_LOG_DIR": p.LogDir(),
"TUNASYNC_LOG_FILE": p.LogFile(),
}
for k, v := range p.env {
env[k] = v
}
p.cmd = newCmdJob(p, p.command, p.WorkingDir(), env)
-if err := p.prepareLogFile(); err != nil {
+if err := p.prepareLogFile(false); err != nil {
return err
}
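
`command`-type providers get their context purely from environment variables, and this change adds `TUNASYNC_LOG_DIR` to that set. Below is a minimal sync script written in Go that simply echoes the variables the worker exports; a real script would sync the upstream URL into the working directory and exit non-zero on failure:

```
package main

import (
	"fmt"
	"os"
)

func main() {
	// Variables exported by the worker to `command`-type providers
	// (see the env map in cmdProvider.Start above).
	for _, key := range []string{
		"TUNASYNC_MIRROR_NAME",
		"TUNASYNC_WORKING_DIR",
		"TUNASYNC_UPSTREAM_URL",
		"TUNASYNC_LOG_DIR", // newly added
		"TUNASYNC_LOG_FILE",
	} {
		fmt.Printf("%s=%s\n", key, os.Getenv(key))
	}
}
```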


@@ -38,6 +38,7 @@ type Config struct {
Server serverConfig `toml:"server"`
Cgroup cgroupConfig `toml:"cgroup"`
ZFS zfsConfig `toml:"zfs"`
+Docker dockerConfig `toml:"docker"`
Include includeConfig `toml:"include"`
Mirrors []mirrorConfig `toml:"mirrors"`
}
@@ -54,12 +55,20 @@ type globalConfig struct {
}
type managerConfig struct {
APIBase string `toml:"api_base"`
-CACert string `toml:"ca_cert"`
-ExtraStatusAPIs []string `toml:"extra_status_managers"`
+// this option overrides the APIBase
+APIList []string `toml:"api_base_list"`
+CACert string `toml:"ca_cert"`
// Token string `toml:"token"`
}
+func (mc managerConfig) APIBaseList() []string {
+if len(mc.APIList) > 0 {
+return mc.APIList
+}
+return []string{mc.APIBase}
+}
type serverConfig struct {
Hostname string `toml:"hostname"`
Addr string `toml:"listen_addr"`
@@ -69,9 +78,16 @@ type serverConfig struct {
}
type cgroupConfig struct {
Enable bool `toml:"enable"`
BasePath string `toml:"base_path"`
Group string `toml:"group"`
+Subsystem string `toml:"subsystem"`
+}
+type dockerConfig struct {
+Enable bool `toml:"enable"`
+Volumes []string `toml:"volumes"`
+Options []string `toml:"options"`
}
type zfsConfig struct {
@@ -107,10 +123,17 @@ type mirrorConfig struct {
Command string `toml:"command"`
UseIPv6 bool `toml:"use_ipv6"`
+UseIPv4 bool `toml:"use_ipv4"`
ExcludeFile string `toml:"exclude_file"`
Username string `toml:"username"`
Password string `toml:"password"`
Stage1Profile string `toml:"stage1_profile"`
+MemoryLimit string `toml:"memory_limit"`
+DockerImage string `toml:"docker_image"`
+DockerVolumes []string `toml:"docker_volumes"`
+DockerOptions []string `toml:"docker_options"`
}
// LoadConfig loads configuration
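
Taken together, a worker configuration can now carry a `[docker]` section, a cgroup `subsystem`, and per-mirror `memory_limit` / `docker_image` settings. The sketch below decodes such a snippet into local stand-in structs that copy the toml tags above; the `name` key for a mirror and the use of github.com/BurntSushi/toml are assumptions, not something this diff shows:

```
package main

import (
	"fmt"

	"github.com/BurntSushi/toml"
)

// Local stand-ins for the worker's config structs, limited to the keys
// introduced or extended by this change set.
type cfg struct {
	Cgroup  cgroupCfg   `toml:"cgroup"`
	Docker  dockerCfg   `toml:"docker"`
	Mirrors []mirrorCfg `toml:"mirrors"`
}

type cgroupCfg struct {
	Enable    bool   `toml:"enable"`
	Group     string `toml:"group"`
	Subsystem string `toml:"subsystem"`
}

type dockerCfg struct {
	Enable  bool     `toml:"enable"`
	Volumes []string `toml:"volumes"`
}

type mirrorCfg struct {
	Name          string   `toml:"name"` // assumed key, not shown in the diff
	Command       string   `toml:"command"`
	UseIPv4       bool     `toml:"use_ipv4"`
	MemoryLimit   string   `toml:"memory_limit"`
	DockerImage   string   `toml:"docker_image"`
	DockerVolumes []string `toml:"docker_volumes"`
}

const sample = `
[cgroup]
enable = true
group = "tunasync"
subsystem = "memory"

[docker]
enable = true
volumes = ["/etc/passwd:/etc/passwd:ro"]

[[mirrors]]
name = "ubuntu"
command = "/home/tunasync/sync-ubuntu.sh"
use_ipv4 = true
memory_limit = "512M"
docker_image = "alpine"
docker_volumes = ["/data/ubuntu:/data/ubuntu"]
`

func main() {
	var c cfg
	if _, err := toml.Decode(sample, &c); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", c)
}
```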

worker/docker.go (new file, 95 lines)

@@ -0,0 +1,95 @@
package worker
import (
"fmt"
"os"
)
type dockerHook struct {
emptyHook
provider mirrorProvider
image string
volumes []string
options []string
}
func newDockerHook(p mirrorProvider, gCfg dockerConfig, mCfg mirrorConfig) *dockerHook {
volumes := []string{}
volumes = append(volumes, gCfg.Volumes...)
volumes = append(volumes, mCfg.DockerVolumes...)
options := []string{}
options = append(options, gCfg.Options...)
options = append(options, mCfg.DockerOptions...)
return &dockerHook{
provider: p,
image: mCfg.DockerImage,
volumes: volumes,
options: options,
}
}
func (d *dockerHook) preExec() error {
p := d.provider
logDir := p.LogDir()
logFile := p.LogFile()
workingDir := p.WorkingDir()
if _, err := os.Stat(workingDir); os.IsNotExist(err) {
logger.Debugf("Making dir %s", workingDir)
if err = os.MkdirAll(workingDir, 0755); err != nil {
return fmt.Errorf("Error making dir %s: %s", workingDir, err.Error())
}
}
// Override workingDir
ctx := p.EnterContext()
ctx.Set(
"volumes", []string{
fmt.Sprintf("%s:%s", logDir, logDir),
fmt.Sprintf("%s:%s", logFile, logFile),
fmt.Sprintf("%s:%s", workingDir, workingDir),
},
)
return nil
}
func (d *dockerHook) postExec() error {
// sh.Command(
// "docker", "rm", "-f", d.Name(),
// ).Run()
d.provider.ExitContext()
return nil
}
// Volumes returns the configured volumes and
// runtime-needed volumes, including mirror dirs
// and log files
func (d *dockerHook) Volumes() []string {
vols := make([]string, len(d.volumes))
copy(vols, d.volumes)
p := d.provider
ctx := p.Context()
if ivs, ok := ctx.Get("volumes"); ok {
vs := ivs.([]string)
vols = append(vols, vs...)
}
return vols
}
func (d *dockerHook) LogFile() string {
p := d.provider
ctx := p.Context()
if iv, ok := ctx.Get(_LogFileKey + ":docker"); ok {
v := iv.(string)
return v
}
return p.LogFile()
}
func (d *dockerHook) Name() string {
p := d.provider
return "tunasync-job-" + p.Name()
}

worker/docker_test.go (new file, 97 lines)

@@ -0,0 +1,97 @@
package worker
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"
"github.com/codeskyblue/go-sh"
. "github.com/smartystreets/goconvey/convey"
)
func getDockerByName(name string) (string, error) {
// docker ps -f 'name=$name' --format '{{.Names}}'
out, err := sh.Command(
"docker", "ps",
"--filter", "name="+name,
"--format", "{{.Names}}",
).Output()
return string(out), err
}
func TestDocker(t *testing.T) {
Convey("Docker Should Work", t, func(ctx C) {
tmpDir, err := ioutil.TempDir("", "tunasync")
defer os.RemoveAll(tmpDir)
So(err, ShouldBeNil)
cmdScript := filepath.Join(tmpDir, "cmd.sh")
tmpFile := filepath.Join(tmpDir, "log_file")
expectedOutput := "HELLO_WORLD"
c := cmdConfig{
name: "tuna-docker",
upstreamURL: "http://mirrors.tuna.moe/",
command: "/bin/cmd.sh",
workingDir: tmpDir,
logDir: tmpDir,
logFile: tmpFile,
interval: 600 * time.Second,
env: map[string]string{
"TEST_CONTENT": expectedOutput,
},
}
cmdScriptContent := `#!/bin/sh
echo ${TEST_CONTENT}
sleep 10
`
err = ioutil.WriteFile(cmdScript, []byte(cmdScriptContent), 0755)
So(err, ShouldBeNil)
provider, err := newCmdProvider(c)
So(err, ShouldBeNil)
d := &dockerHook{
provider: provider,
image: "alpine",
volumes: []string{
fmt.Sprintf("%s:%s", cmdScript, "/bin/cmd.sh"),
},
}
provider.AddHook(d)
So(provider.Docker(), ShouldNotBeNil)
err = d.preExec()
So(err, ShouldBeNil)
go func() {
err = provider.Run()
ctx.So(err, ShouldNotBeNil)
}()
time.Sleep(1 * time.Second)
// assert container running
names, err := getDockerByName(d.Name())
So(err, ShouldBeNil)
So(names, ShouldEqual, d.Name()+"\n")
err = provider.Terminate()
So(err, ShouldBeNil)
// container should be terminated and removed
names, err = getDockerByName(d.Name())
So(err, ShouldBeNil)
So(names, ShouldEqual, "")
// check log content
loggedContent, err := ioutil.ReadFile(provider.LogFile())
So(err, ShouldBeNil)
So(string(loggedContent), ShouldEqual, expectedOutput+"\n")
d.postExec()
})
}


@@ -71,6 +71,7 @@ func (h *execPostHook) Do() error {
"TUNASYNC_MIRROR_NAME": p.Name(),
"TUNASYNC_WORKING_DIR": p.WorkingDir(),
"TUNASYNC_UPSTREAM_URL": p.Upstream(),
+"TUNASYNC_LOG_DIR": p.LogDir(),
"TUNASYNC_LOG_FILE": p.LogFile(),
"TUNASYNC_JOB_EXIT_STATUS": exitStatus,
}


@@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
tunasync "github.com/tuna/tunasync/internal" tunasync "github.com/tuna/tunasync/internal"
) )
@@ -14,12 +15,13 @@ import (
type ctrlAction uint8 type ctrlAction uint8
const ( const (
jobStart ctrlAction = iota jobStart ctrlAction = iota
jobStop // stop syncing keep the job jobStop // stop syncing keep the job
jobDisable // disable the job (stops goroutine) jobDisable // disable the job (stops goroutine)
jobRestart // restart syncing jobRestart // restart syncing
jobPing // ensure the goroutine is alive jobPing // ensure the goroutine is alive
jobHalt // worker halts jobHalt // worker halts
jobForceStart // ignore concurrent limit
) )
type jobMessage struct { type jobMessage struct {
@@ -154,9 +156,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
syncDone := make(chan error, 1) syncDone := make(chan error, 1)
go func() { go func() {
err := provider.Run() err := provider.Run()
if !stopASAP { syncDone <- err
syncDone <- err
}
}() }()
select { select {
@@ -212,22 +212,26 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
return nil return nil
} }
runJob := func(kill <-chan empty, jobDone chan<- empty) { runJob := func(kill <-chan empty, jobDone chan<- empty, bypassSemaphore <-chan empty) {
select { select {
case semaphore <- empty{}: case semaphore <- empty{}:
defer func() { <-semaphore }() defer func() { <-semaphore }()
runJobWrapper(kill, jobDone) runJobWrapper(kill, jobDone)
case <-bypassSemaphore:
logger.Noticef("Concurrent limit ignored by %s", m.Name())
runJobWrapper(kill, jobDone)
case <-kill: case <-kill:
jobDone <- empty{} jobDone <- empty{}
return return
} }
} }
bypassSemaphore := make(chan empty, 1)
for { for {
if m.State() == stateReady { if m.State() == stateReady {
kill := make(chan empty) kill := make(chan empty)
jobDone := make(chan empty) jobDone := make(chan empty)
go runJob(kill, jobDone) go runJob(kill, jobDone, bypassSemaphore)
_wait_for_job: _wait_for_job:
select { select {
@@ -248,7 +252,14 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
m.SetState(stateReady) m.SetState(stateReady)
close(kill) close(kill)
<-jobDone <-jobDone
time.Sleep(time.Second) // Restart may fail if the process was not exited yet
continue continue
case jobForceStart:
select { //non-blocking
default:
case bypassSemaphore <- empty{}:
}
fallthrough
case jobStart: case jobStart:
m.SetState(stateReady) m.SetState(stateReady)
goto _wait_for_job goto _wait_for_job
@@ -272,8 +283,14 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
 				case jobDisable:
 					m.SetState(stateDisabled)
 					return nil
+				case jobForceStart:
+					select { //non-blocking
+					default:
+					case bypassSemaphore <- empty{}:
+					}
+					fallthrough
 				case jobRestart:
-					m.SetState(stateReady)
+					fallthrough
 				case jobStart:
 					m.SetState(stateReady)
 				default:
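
The semaphore passed into Run is the usual Go idiom of a buffered channel used as a counting semaphore; jobForceStart sidesteps it through a one-slot bypass channel filled with a non-blocking send, so repeated force requests collapse into a single pending one. Below is a minimal, self-contained sketch of that pattern only; the names are illustrative and not the tunasync API.

package main

import "fmt"

type empty struct{}

func main() {
	semaphore := make(chan empty, 2)       // at most 2 concurrent jobs
	bypassSemaphore := make(chan empty, 1) // at most one pending "force" request

	// non-blocking send: if a force request is already queued, do nothing
	select {
	case bypassSemaphore <- empty{}:
	default:
	}

	// a runner either acquires a slot or honours the pending bypass request
	select {
	case semaphore <- empty{}:
		defer func() { <-semaphore }()
		fmt.Println("acquired a slot")
	case <-bypassSemaphore:
		fmt.Println("concurrent limit ignored")
	}
}

Because the bypass channel has capacity 1, a force request issued while no job is waiting is simply remembered until the next runJob call picks it up.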

View file

@@ -135,6 +135,8 @@ echo $TUNASYNC_WORKING_DIR
 				msg = <-managerChan
 				So(msg.status, ShouldEqual, Syncing)

+				job.ctrlChan <- jobStart // should be ignored
+
 				job.ctrlChan <- jobStop

 				msg = <-managerChan
@@ -170,8 +172,239 @@ echo $TUNASYNC_WORKING_DIR
 				job.ctrlChan <- jobDisable
 				<-job.disabled
 			})

+			Convey("If we restart it", func(ctx C) {
+				go job.Run(managerChan, semaphore)
+				job.ctrlChan <- jobStart
+
+				msg := <-managerChan
+				So(msg.status, ShouldEqual, PreSyncing)
+				msg = <-managerChan
+				So(msg.status, ShouldEqual, Syncing)
+
+				job.ctrlChan <- jobRestart
+
+				msg = <-managerChan
+				So(msg.status, ShouldEqual, Failed)
+				So(msg.msg, ShouldEqual, "killed by manager")
+
+				msg = <-managerChan
+				So(msg.status, ShouldEqual, PreSyncing)
+				msg = <-managerChan
+				So(msg.status, ShouldEqual, Syncing)
+				msg = <-managerChan
+				So(msg.status, ShouldEqual, Success)
+
+				expectedOutput := fmt.Sprintf(
+					"%s\n%s\n",
+					provider.WorkingDir(), provider.WorkingDir(),
+				)
+
+				loggedContent, err := ioutil.ReadFile(provider.LogFile())
+				So(err, ShouldBeNil)
+				So(string(loggedContent), ShouldEqual, expectedOutput)
+				job.ctrlChan <- jobDisable
+				<-job.disabled
+			})

+			Convey("If we disable it", func(ctx C) {
+				go job.Run(managerChan, semaphore)
+				job.ctrlChan <- jobStart
+
+				msg := <-managerChan
+				So(msg.status, ShouldEqual, PreSyncing)
+				msg = <-managerChan
+				So(msg.status, ShouldEqual, Syncing)
+
+				job.ctrlChan <- jobDisable
+
+				msg = <-managerChan
+				So(msg.status, ShouldEqual, Failed)
+				So(msg.msg, ShouldEqual, "killed by manager")
+
+				<-job.disabled
+			})

+			Convey("If we stop it twice, than start it", func(ctx C) {
+				go job.Run(managerChan, semaphore)
+				job.ctrlChan <- jobStart
+
+				msg := <-managerChan
+				So(msg.status, ShouldEqual, PreSyncing)
+				msg = <-managerChan
+				So(msg.status, ShouldEqual, Syncing)
+
+				job.ctrlChan <- jobStop
+
+				msg = <-managerChan
+				So(msg.status, ShouldEqual, Failed)
+				So(msg.msg, ShouldEqual, "killed by manager")
+
+				job.ctrlChan <- jobStop // should be ignored
+
+				job.ctrlChan <- jobStart
+
+				msg = <-managerChan
+				So(msg.status, ShouldEqual, PreSyncing)
+				msg = <-managerChan
+				So(msg.status, ShouldEqual, Syncing)
+				msg = <-managerChan
+				So(msg.status, ShouldEqual, Success)
+
+				expectedOutput := fmt.Sprintf(
+					"%s\n%s\n",
+					provider.WorkingDir(), provider.WorkingDir(),
+				)
+
+				loggedContent, err := ioutil.ReadFile(provider.LogFile())
+				So(err, ShouldBeNil)
+				So(string(loggedContent), ShouldEqual, expectedOutput)
+				job.ctrlChan <- jobDisable
+				<-job.disabled
+			})
 		})
 	})
 }

+func TestConcurrentMirrorJobs(t *testing.T) {
+	InitLogger(true, true, false)
+
+	Convey("Concurrent MirrorJobs should work", t, func(ctx C) {
+		tmpDir, err := ioutil.TempDir("", "tunasync")
+		defer os.RemoveAll(tmpDir)
+		So(err, ShouldBeNil)
+
+		const CONCURRENT = 5
+
+		var providers [CONCURRENT]*cmdProvider
+		var jobs [CONCURRENT]*mirrorJob
+		for i := 0; i < CONCURRENT; i++ {
+			c := cmdConfig{
+				name: fmt.Sprintf("job-%d", i),
+				upstreamURL: "http://mirrors.tuna.moe/",
+				command: "sleep 2",
+				workingDir: tmpDir,
+				logDir: tmpDir,
+				logFile: "/dev/null",
+				interval: 10 * time.Second,
+			}
+			var err error
+			providers[i], err = newCmdProvider(c)
+			So(err, ShouldBeNil)
+			jobs[i] = newMirrorJob(providers[i])
+		}
+
+		managerChan := make(chan jobMessage, 10)
+		semaphore := make(chan empty, CONCURRENT-2)
+
+		countingJobs := func(managerChan chan jobMessage, totalJobs, concurrentCheck int) (peakConcurrent, counterFailed int) {
+			counterEnded := 0
+			counterRunning := 0
+			peakConcurrent = 0
+			counterFailed = 0
+			for counterEnded < totalJobs {
+				msg := <-managerChan
+				switch msg.status {
+				case PreSyncing:
+					counterRunning++
+				case Syncing:
+				case Failed:
+					counterFailed++
+					fallthrough
+				case Success:
+					counterEnded++
+					counterRunning--
+				default:
+					So(0, ShouldEqual, 1)
+				}
+				// Test if semaphore works
+				So(counterRunning, ShouldBeLessThanOrEqualTo, concurrentCheck)
+				if counterRunning > peakConcurrent {
+					peakConcurrent = counterRunning
+				}
+			}
+			// select {
+			// case msg := <-managerChan:
+			// 	logger.Errorf("extra message received: %v", msg)
+			// 	So(0, ShouldEqual, 1)
+			// case <-time.After(2 * time.Second):
+			// }
+			return
+		}
+
+		Convey("When we run them all", func(ctx C) {
+			for _, job := range jobs {
+				go job.Run(managerChan, semaphore)
+				job.ctrlChan <- jobStart
+			}
+
+			peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT, CONCURRENT-2)
+
+			So(peakConcurrent, ShouldEqual, CONCURRENT-2)
+			So(counterFailed, ShouldEqual, 0)
+
+			for _, job := range jobs {
+				job.ctrlChan <- jobDisable
+				<-job.disabled
+			}
+		})

+		Convey("If we cancel one job", func(ctx C) {
+			for _, job := range jobs {
+				go job.Run(managerChan, semaphore)
+				job.ctrlChan <- jobRestart
+				time.Sleep(200 * time.Millisecond)
+			}
+
+			// Cancel the one waiting for semaphore
+			jobs[len(jobs)-1].ctrlChan <- jobStop
+
+			peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT-1, CONCURRENT-2)
+
+			So(peakConcurrent, ShouldEqual, CONCURRENT-2)
+			So(counterFailed, ShouldEqual, 0)
+
+			for _, job := range jobs {
+				job.ctrlChan <- jobDisable
+				<-job.disabled
+			}
+		})

+		Convey("If we override the concurrent limit", func(ctx C) {
+			for _, job := range jobs {
+				go job.Run(managerChan, semaphore)
+				job.ctrlChan <- jobStart
+				time.Sleep(200 * time.Millisecond)
+			}
+
+			jobs[len(jobs)-1].ctrlChan <- jobForceStart
+			jobs[len(jobs)-2].ctrlChan <- jobForceStart
+
+			peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT, CONCURRENT)
+
+			So(peakConcurrent, ShouldEqual, CONCURRENT)
+			So(counterFailed, ShouldEqual, 0)
+
+			time.Sleep(1 * time.Second)
+
+			// fmt.Println("Restart them")
+			for _, job := range jobs {
+				job.ctrlChan <- jobStart
+			}
+
+			peakConcurrent, counterFailed = countingJobs(managerChan, CONCURRENT, CONCURRENT-2)
+
+			So(peakConcurrent, ShouldEqual, CONCURRENT-2)
+			So(counterFailed, ShouldEqual, 0)
+
+			for _, job := range jobs {
+				job.ctrlChan <- jobDisable
+				<-job.disabled
+			}
+		})
+	})
+}

View file

@@ -38,6 +38,8 @@ type mirrorProvider interface {
 	// Cgroup
 	Cgroup() *cgroupHook
 	// ZFS
 	ZFS() *zfsHook
+	// Docker
+	Docker() *dockerHook

 	AddHook(hook jobHook)
 	Hooks() []jobHook
@@ -128,6 +130,7 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
 			logDir: logDir,
 			logFile: filepath.Join(logDir, "latest.log"),
 			useIPv6: mirror.UseIPv6,
+			useIPv4: mirror.UseIPv4,
 			interval: time.Duration(mirror.Interval) * time.Minute,
 		}
 		p, err := newRsyncProvider(rc)
@@ -169,10 +172,17 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
 		provider.AddHook(newZfsHook(provider, cfg.ZFS.Zpool))
 	}

-	// Add Cgroup Hook
-	if cfg.Cgroup.Enable {
+	// Add Docker Hook
+	if cfg.Docker.Enable && len(mirror.DockerImage) > 0 {
+		provider.AddHook(newDockerHook(provider, cfg.Docker, mirror))
+	} else if cfg.Cgroup.Enable {
+		// Add Cgroup Hook
 		provider.AddHook(
-			newCgroupHook(provider, cfg.Cgroup.BasePath, cfg.Cgroup.Group),
+			newCgroupHook(
+				provider, cfg.Cgroup.BasePath, cfg.Cgroup.Group,
+				cfg.Cgroup.Subsystem, mirror.MemoryLimit,
+			),
 		)
 	}
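
The hook wiring above gives Docker precedence over cgroups: the Docker hook is attached only when the worker enables Docker and the mirror names an image, and the cgroup hook is the fallback isolation mechanism. A minimal sketch of that precedence with plain values in place of the real config structs (pickIsolation is illustrative only, not tunasync code):

package main

import "fmt"

// pickIsolation mirrors the if/else-if chain in newMirrorProvider above.
func pickIsolation(dockerEnabled bool, dockerImage string, cgroupEnabled bool) string {
	if dockerEnabled && len(dockerImage) > 0 {
		return "docker"
	} else if cgroupEnabled {
		return "cgroup"
	}
	return "none"
}

func main() {
	fmt.Println(pickIsolation(true, "debian:stretch", true)) // docker wins over cgroup
	fmt.Println(pickIsolation(true, "", true))               // no image for this mirror: cgroup
}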

View file

@@ -79,11 +79,12 @@ exit 0
 			err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
 			So(err, ShouldBeNil)

+			targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir())
 			expectedOutput := fmt.Sprintf(
 				"syncing to %s\n"+
 					"%s\n"+
 					"Done\n",
-				provider.WorkingDir(),
+				targetDir,
 				fmt.Sprintf(
 					"-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
 						"--delete --delete-after --delay-updates --safe-links "+
@@ -144,11 +145,12 @@ exit 0
 			err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
 			So(err, ShouldBeNil)

+			targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir())
 			expectedOutput := fmt.Sprintf(
 				"syncing to %s\n"+
 					"%s\n"+
 					"Done\n",
-				provider.WorkingDir(),
+				targetDir,
 				fmt.Sprintf(
 					"%s %s -aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
 						"--delete --delete-after --delay-updates --safe-links "+
@@ -260,6 +262,40 @@ sleep 5
 		})
 	})

+	Convey("Command Provider without log file should work", t, func(ctx C) {
+		tmpDir, err := ioutil.TempDir("", "tunasync")
+		defer os.RemoveAll(tmpDir)
+		So(err, ShouldBeNil)
+
+		c := cmdConfig{
+			name: "run-ls",
+			upstreamURL: "http://mirrors.tuna.moe/",
+			command: "ls",
+			workingDir: tmpDir,
+			logDir: tmpDir,
+			logFile: "/dev/null",
+			interval: 600 * time.Second,
+		}
+
+		provider, err := newCmdProvider(c)
+		So(err, ShouldBeNil)
+
+		So(provider.IsMaster(), ShouldEqual, false)
+		So(provider.ZFS(), ShouldBeNil)
+		So(provider.Type(), ShouldEqual, provCommand)
+		So(provider.Name(), ShouldEqual, c.name)
+		So(provider.WorkingDir(), ShouldEqual, c.workingDir)
+		So(provider.LogDir(), ShouldEqual, c.logDir)
+		So(provider.LogFile(), ShouldEqual, c.logFile)
+		So(provider.Interval(), ShouldEqual, c.interval)
+
+		Convey("Run the command", func() {
+			err = provider.Run()
+			So(err, ShouldBeNil)
+		})
+	})
 }

 func TestTwoStageRsyncProvider(t *testing.T) {
@@ -280,6 +316,8 @@ func TestTwoStageRsyncProvider(t *testing.T) {
 			logFile: tmpFile,
 			useIPv6: true,
 			excludeFile: tmpFile,
+			username: "hello",
+			password: "world",
 		}

 		provider, err := newTwoStageRsyncProvider(c)
@@ -306,6 +344,7 @@ exit 0
 			err = provider.Run()
 			So(err, ShouldBeNil)

+			targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir())
 			expectedOutput := fmt.Sprintf(
 				"syncing to %s\n"+
 					"%s\n"+
@@ -313,14 +352,14 @@ exit 0
 					"syncing to %s\n"+
 					"%s\n"+
 					"Done\n",
-				provider.WorkingDir(),
+				targetDir,
 				fmt.Sprintf(
 					"-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+
 						"--timeout=120 --contimeout=120 --exclude dists/ -6 "+
 						"--exclude-from %s %s %s",
 					provider.excludeFile, provider.upstreamURL, provider.WorkingDir(),
 				),
-				provider.WorkingDir(),
+				targetDir,
 				fmt.Sprintf(
 					"-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
 						"--delete --delete-after --delay-updates --safe-links "+

View file

@@ -11,7 +11,7 @@ type rsyncConfig struct {
 	rsyncCmd string
 	upstreamURL, username, password, excludeFile string
 	workingDir, logDir, logFile string
-	useIPv6 bool
+	useIPv6, useIPv4 bool
 	interval time.Duration
 }
@@ -49,6 +49,8 @@ func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
 	if c.useIPv6 {
 		options = append(options, "-6")
+	} else if c.useIPv4 {
+		options = append(options, "-4")
 	}

 	if c.excludeFile != "" {
@@ -79,6 +81,12 @@ func (p *rsyncProvider) Run() error {
 }

 func (p *rsyncProvider) Start() error {
+	p.Lock()
+	defer p.Unlock()
+
+	if p.IsRunning() {
+		return errors.New("provider is currently running")
+	}

 	env := map[string]string{}
 	if p.username != "" {
@@ -92,7 +100,7 @@ func (p *rsyncProvider) Start() error {
 	command = append(command, p.upstreamURL, p.WorkingDir())

 	p.cmd = newCmdJob(p, command, p.WorkingDir(), env)
-	if err := p.prepareLogFile(); err != nil {
+	if err := p.prepareLogFile(false); err != nil {
 		return err
 	}

View file

@@ -2,6 +2,7 @@ package worker

 import (
 	"errors"
+	"fmt"
 	"os"
 	"os/exec"
 	"strings"
@@ -9,6 +10,7 @@ import (
 	"syscall"
 	"time"

+	"github.com/codeskyblue/go-sh"
 	"golang.org/x/sys/unix"
 )
@@ -31,11 +33,44 @@ type cmdJob struct {
 func newCmdJob(provider mirrorProvider, cmdAndArgs []string, workingDir string, env map[string]string) *cmdJob {
 	var cmd *exec.Cmd

-	if provider.Cgroup() != nil {
+	if d := provider.Docker(); d != nil {
+		c := "docker"
+		args := []string{
+			"run", "--rm",
+			"-a", "STDOUT", "-a", "STDERR",
+			"--name", d.Name(),
+			"-w", workingDir,
+		}
+		// specify user
+		args = append(
+			args, "-u",
+			fmt.Sprintf("%d:%d", os.Getuid(), os.Getgid()),
+		)
+		// add volumes
+		for _, vol := range d.Volumes() {
+			logger.Debugf("volume: %s", vol)
+			args = append(args, "-v", vol)
+		}
+		// set env
+		for k, v := range env {
+			kv := fmt.Sprintf("%s=%s", k, v)
+			args = append(args, "-e", kv)
+		}
+		// apply options
+		args = append(args, d.options...)
+		// apply image and command
+		args = append(args, d.image)
+		// apply command
+		args = append(args, cmdAndArgs...)
+		cmd = exec.Command(c, args...)
+	} else if provider.Cgroup() != nil {
 		c := "cgexec"
 		args := []string{"-g", provider.Cgroup().Cgroup()}
 		args = append(args, cmdAndArgs...)
 		cmd = exec.Command(c, args...)
 	} else {
 		if len(cmdAndArgs) == 1 {
 			cmd = exec.Command(cmdAndArgs[0])
@@ -48,25 +83,28 @@ func newCmdJob(provider mirrorProvider, cmdAndArgs []string, workingDir string,
 		}
 	}

-	logger.Debugf("Executing command %s at %s", cmdAndArgs[0], workingDir)
-	if _, err := os.Stat(workingDir); os.IsNotExist(err) {
-		logger.Debugf("Making dir %s", workingDir)
-		if err = os.MkdirAll(workingDir, 0755); err != nil {
-			logger.Errorf("Error making dir %s", workingDir)
+	if provider.Docker() == nil {
+		logger.Debugf("Executing command %s at %s", cmdAndArgs[0], workingDir)
+		if _, err := os.Stat(workingDir); os.IsNotExist(err) {
+			logger.Debugf("Making dir %s", workingDir)
+			if err = os.MkdirAll(workingDir, 0755); err != nil {
+				logger.Errorf("Error making dir %s: %s", workingDir, err.Error())
+			}
 		}
+		cmd.Dir = workingDir
+		cmd.Env = newEnviron(env, true)
 	}

-	cmd.Dir = workingDir
-	cmd.Env = newEnviron(env, true)
-
 	return &cmdJob{
 		cmd: cmd,
 		workingDir: workingDir,
 		env: env,
+		provider: provider,
 	}
 }

 func (c *cmdJob) Start() error {
+	// logger.Debugf("Command start: %v", c.cmd.Args)
 	c.finished = make(chan empty, 1)
 	return c.cmd.Start()
 }
@@ -80,6 +118,9 @@ func (c *cmdJob) Wait() error {
 		return c.retErr
 	default:
 		err := c.cmd.Wait()
+		if c.cmd.Stdout != nil {
+			c.cmd.Stdout.(*os.File).Close()
+		}
 		c.retErr = err
 		close(c.finished)
 		return err
@@ -95,6 +136,14 @@ func (c *cmdJob) Terminate() error {
 	if c.cmd == nil || c.cmd.Process == nil {
 		return errProcessNotStarted
 	}

+	if d := c.provider.Docker(); d != nil {
+		sh.Command(
+			"docker", "stop", "-t", "2", d.Name(),
+		).Run()
+		return nil
+	}
+
 	err := unix.Kill(c.cmd.Process.Pid, syscall.SIGTERM)
 	if err != nil {
 		return err
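
For a mirror with a Docker hook, newCmdJob no longer runs the sync command directly: it wraps it in a docker run invocation that attaches stdout/stderr, maps the caller's uid:gid, and passes the working directory, volumes, environment, and extra options through before the image and command. The standalone sketch below assembles the same argument list for made-up sample values; the image, volume, options, and command are hypothetical placeholders, not values from the diff.

package main

import (
	"fmt"
	"os"
	"strings"
)

func main() {
	// sample values standing in for the dockerHook fields used above (hypothetical)
	name, image := "tunasync-job-debian", "tunasync/worker:latest"
	workingDir := "/data/mirrors/debian"
	volumes := []string{"/data/mirrors/debian:/data/mirrors/debian"}
	extraOptions := []string{"--memory", "512m"}
	env := map[string]string{"TUNASYNC_WORKING_DIR": workingDir}
	cmdAndArgs := []string{"rsync", "-aHvh", "rsync://example.org/debian/", workingDir}

	// same assembly order as the docker branch in newCmdJob
	args := []string{"run", "--rm", "-a", "STDOUT", "-a", "STDERR", "--name", name, "-w", workingDir}
	args = append(args, "-u", fmt.Sprintf("%d:%d", os.Getuid(), os.Getgid()))
	for _, vol := range volumes {
		args = append(args, "-v", vol)
	}
	for k, v := range env {
		args = append(args, "-e", fmt.Sprintf("%s=%s", k, v))
	}
	args = append(args, extraOptions...)
	args = append(args, image)
	args = append(args, cmdAndArgs...)

	fmt.Println("docker", strings.Join(args, " "))
}

Since the sync process runs inside a container rather than as a direct child, Terminate correspondingly switches from SIGTERM to a docker stop with a short grace period.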

View file

@@ -108,7 +108,12 @@ func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) {
 }

 func (p *twoStageRsyncProvider) Run() error {
-	defer p.Wait()
+	p.Lock()
+	defer p.Unlock()
+
+	if p.IsRunning() {
+		return errors.New("provider is currently running")
+	}

 	env := map[string]string{}
 	if p.username != "" {
@@ -129,7 +134,7 @@ func (p *twoStageRsyncProvider) Run() error {
 		command = append(command, p.upstreamURL, p.WorkingDir())

 		p.cmd = newCmdJob(p, command, p.WorkingDir(), env)
-		if err := p.prepareLogFile(); err != nil {
+		if err := p.prepareLogFile(stage > 1); err != nil {
 			return err
 		}
@@ -137,9 +142,11 @@ func (p *twoStageRsyncProvider) Run() error {
 			return err
 		}
 		p.isRunning.Store(true)
+		logger.Debugf("set isRunning to true: %s", p.Name())

-		err = p.cmd.Wait()
-		p.isRunning.Store(false)
+		p.Unlock()
+		err = p.Wait()
+		p.Lock()

 		if err != nil {
 			return err
 		}
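
prepareLogFile now takes a boolean, and the two-stage provider passes stage > 1 so the second rsync pass appends to the log instead of truncating what the first pass wrote (the log-overwrite bug). The body of prepareLogFile is not part of this diff; the following is only a guess at the append-versus-truncate behaviour it has to implement, under assumed names:

package main

import (
	"fmt"
	"os"
)

// openLog is an assumed stand-in for what prepareLogFile(append bool) must do
// internally: truncate on the first stage, append on later stages. The real
// function may do more (e.g. special-case log destinations).
func openLog(path string, appendLog bool) (*os.File, error) {
	flags := os.O_WRONLY | os.O_CREATE
	if appendLog {
		flags |= os.O_APPEND
	} else {
		flags |= os.O_TRUNC
	}
	return os.OpenFile(path, flags, 0644)
}

func main() {
	for stage := 1; stage <= 2; stage++ {
		f, err := openLog("/tmp/latest.log", stage > 1)
		if err != nil {
			panic(err)
		}
		fmt.Fprintf(f, "stage %d output\n", stage)
		f.Close()
	}
}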

View file

@@ -55,9 +55,6 @@ func GetTUNASyncWorker(cfg *Config) *Worker {
 		w.httpClient = httpClient
 	}

-	if cfg.Cgroup.Enable {
-		initCgroup(cfg.Cgroup.BasePath)
-	}
 	w.initJobs()
 	w.makeHTTPServer()
 	tunasyncWorker = w
@@ -222,7 +219,11 @@ func (w *Worker) makeHTTPServer() {
 		}
 		switch cmd.Cmd {
 		case CmdStart:
-			job.ctrlChan <- jobStart
+			if cmd.Options["force"] {
+				job.ctrlChan <- jobForceStart
+			} else {
+				job.ctrlChan <- jobStart
+			}
 		case CmdRestart:
 			job.ctrlChan <- jobRestart
 		case CmdStop:
@@ -389,18 +390,17 @@ func (w *Worker) URL() string {
 }

 func (w *Worker) registorWorker() {
-	url := fmt.Sprintf(
-		"%s/workers",
-		w.cfg.Manager.APIBase,
-	)
-
 	msg := WorkerStatus{
 		ID: w.Name(),
 		URL: w.URL(),
 	}

-	if _, err := PostJSON(url, msg, w.httpClient); err != nil {
-		logger.Errorf("Failed to register worker")
+	for _, root := range w.cfg.Manager.APIBaseList() {
+		url := fmt.Sprintf("%s/workers", root)
+		logger.Debugf("register on manager url: %s", url)
+		if _, err := PostJSON(url, msg, w.httpClient); err != nil {
+			logger.Errorf("Failed to register worker")
+		}
 	}
 }
@@ -416,12 +416,11 @@ func (w *Worker) updateStatus(job *mirrorJob, jobMsg jobMessage) {
 		ErrorMsg: jobMsg.msg,
 	}

-	apiBases := []string{w.cfg.Manager.APIBase}
-	apiBases = append(apiBases, w.cfg.Manager.ExtraStatusAPIs...)
-	for _, root := range apiBases {
+	for _, root := range w.cfg.Manager.APIBaseList() {
 		url := fmt.Sprintf(
 			"%s/workers/%s/jobs/%s", root, w.Name(), jobMsg.name,
 		)
+		logger.Debugf("reporting on manager url: %s", url)
 		if _, err := PostJSON(url, smsg, w.httpClient); err != nil {
 			logger.Errorf("Failed to update mirror(%s) status: %s", jobMsg.name, err.Error())
 		}
@@ -430,12 +429,9 @@ func (w *Worker) updateStatus(job *mirrorJob, jobMsg jobMessage) {
 func (w *Worker) fetchJobStatus() []MirrorStatus {
 	var mirrorList []MirrorStatus

-	url := fmt.Sprintf(
-		"%s/workers/%s/jobs",
-		w.cfg.Manager.APIBase,
-		w.Name(),
-	)
+	apiBase := w.cfg.Manager.APIBaseList()[0]
+	url := fmt.Sprintf("%s/workers/%s/jobs", apiBase, w.Name())

 	if _, err := GetJSON(url, &mirrorList, w.httpClient); err != nil {
 		logger.Errorf("Failed to fetch job status: %s", err.Error())
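
Both registorWorker and updateStatus now loop over w.cfg.Manager.APIBaseList(), and fetchJobStatus reads from its first entry. Judging from the removed code in updateStatus, that helper presumably prepends Manager.APIBase to Manager.ExtraStatusAPIs. A sketch of that assumption follows; managerConfig and the URLs below are placeholders, not the real config type or endpoints.

package main

import "fmt"

type managerConfig struct {
	APIBase         string
	ExtraStatusAPIs []string
}

// APIBaseList here assumes "primary manager first, extra status endpoints after",
// which matches the behaviour of the code this change replaces.
func (m managerConfig) APIBaseList() []string {
	return append([]string{m.APIBase}, m.ExtraStatusAPIs...)
}

func main() {
	m := managerConfig{
		APIBase:         "http://localhost:14242",                    // hypothetical
		ExtraStatusAPIs: []string{"http://status.mirror.example.com"}, // hypothetical
	}
	for _, root := range m.APIBaseList() {
		fmt.Println(root + "/workers")
	}
}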