1
0
镜像自地址 https://github.com/tuna/tunasync.git 已同步 2025-12-08 15:36:47 +00:00

78 次代码提交

作者 SHA1 备注 提交日期
z4yx
b4fe4db82a Merge remote-tracking branch 'origin/dev' 2019-11-04 23:11:34 +08:00
z4yx
839363aaaa reschedule the job if any hook fails 2019-11-04 22:52:03 +08:00
Yuxiang Zhang
08aee8eb42 Merge pull request #98 from ziqin/feature/btrfs-snapshot
Reimplement Btrfs snapshots hook
2019-08-31 10:57:44 +08:00
Jeeken Wang
501f77ee41 Merge branch 'master' into feature/btrfs-snapshot 2019-08-15 01:26:28 +08:00
z4yx
9e91fd706e Merge branch 'dev' 2019-08-13 23:10:43 +08:00
z4yx
94cf0b4bdb fix possible null dereferencing, reported by #96 2019-08-13 23:07:01 +08:00
WANG Ziqin
8fd2059013 add doc for setup btrfs snapshots 2019-08-02 13:31:33 +08:00
WANG Ziqin
6b56c4254c feat(btrfs_snapshot_hook): reimplemented Btrfs snapshots
TODO: test coverage
2019-08-02 13:31:33 +08:00
Yuxiang Zhang
3872c41607 Merge pull request #97 from ziqin/master
Refine: remove outer `provider`s which shadow the embedded `provider`s provided by `emptyHook`
2019-08-02 09:27:04 +08:00
WANG Ziqin
30259da0f0 fix nil pointer dereference: check err first 2019-08-02 02:15:22 +08:00
WANG Ziqin
4854d9b981 Fix test: initialize dockerHook with embedded provider 2019-07-31 17:29:28 +08:00
WANG Ziqin
06fce98c00 Eliminate duplicate mirrorProvider in Hooks 2019-07-31 16:11:56 +08:00
Jeeken Wang
8408236646 Update "Job Run Process" diagram according to runJobWrapper 2019-07-31 12:26:09 +08:00
z4yx
540eea8aeb set golang version to 1.11 2019-07-05 16:54:29 +08:00
z4yx
a6fc97889d [bug fix] stalled scheduler if post-sync hook runs for a time which is longer than the sync interval 2019-07-05 16:29:00 +08:00
Yuxiang Zhang
5f7d974469 Merge pull request #93 from vgxbj/patch-1
Fix ascii chart for `Job Run Process`
2019-05-30 10:16:22 +08:00
Guō Xīng
3b52f93e7e Fix ascii chart for Job Run Process 2019-05-29 14:32:50 +08:00
zyx
1025189542 fix possible null dereferencing in server_test 2019-04-13 11:13:17 +08:00
zyx
9f91d90fc5 check Retry configuration in providers 2019-04-13 11:01:56 +08:00
zyx
1aa4ae9cc1 Merge remote-tracking branch 'kinosang/master' into wip-test-pr 2019-04-13 02:07:41 +08:00
zyx
d0deeb19a9 extract mirror size from rsync provider automatically 2019-04-13 01:27:35 +08:00
zyx
a283328dc4 increase test converage of worker 2019-04-12 09:43:57 +08:00
zyx
1890bbed3c add tests for last commit 2019-04-11 12:36:43 +08:00
zyx
ddc9efd155 report next scheduled sync time 2019-04-11 12:36:18 +08:00
zyx
7eb119b892 singleton of worker is not used, so remove it 2019-04-11 10:07:42 +08:00
zyx
96f11f57ed throw an error if executing reload command without worker id 2019-04-09 22:30:08 +08:00
Yuxiang Zhang
3e6e6f9b14 Update tips.md 2019-04-07 21:48:57 +08:00
Yuxiang Zhang
b06cadfe06 Update tips.md 2019-04-07 21:48:00 +08:00
Yuxiang Zhang
9c34372ae4 add link to tips.md 2019-04-07 21:35:40 +08:00
Yuxiang Zhang
ebbfff40f6 Merge pull request #91 from SCU-MingYuan/master
Added some control tips
2019-04-07 21:33:33 +08:00
GaryH4
5eeade22fc Update tips.md 2019-04-07 19:55:13 +08:00
GaryH4
4b3741308b Update tips.md 2019-04-06 23:48:33 +08:00
GaryH4
7d495c1956 Update tips.md 2019-04-06 23:40:43 +08:00
GaryH4
0bf8400077 Added some tips 2019-04-06 23:30:04 +08:00
Yuxiang Zhang
c611759394 Update get_started.md 2019-04-06 11:21:22 +08:00
Yuxiang Zhang
279aa32b68 Update get_started.md 2019-04-06 11:09:24 +08:00
Yuxiang Zhang
025544449a remove section of certificate generation 2019-04-06 10:56:38 +08:00
zyx
90d419ca66 add tests for last commit 2019-03-31 12:16:45 +08:00
zyx
96cb975412 Let user create ZFS dataset manually due to security considerations 2019-03-31 12:09:42 +08:00
王邈
ff3e690497 Revert "change owner of folder to current user after creating zfs dataset (close #89)"
This reverts commit a58e6d37ae and
re-opens #89.

Signed-off-by: 王邈 <shankerwangmiao@gmail.com>
2019-03-26 00:30:06 +08:00
zyx
a58e6d37ae change owner of folder to current user after creating zfs dataset (close #89) 2019-03-25 23:40:04 +08:00
zhang
7a4a8ad486 Merge branch 'master' of github.com:tuna/tunasync 2018-10-25 22:52:21 +08:00
zhang
e1c0c25efa add example of worker config 2018-10-25 22:52:02 +08:00
z4yx
9ac374527a regenerate travis deploy key 2018-10-25 17:27:32 +08:00
z4yx
f03626d4e1 update Get Started document 2018-10-25 17:23:02 +08:00
z4yx
23bf4890cc bump version to v0.3.2 2018-10-25 17:07:04 +08:00
z4yx
2f6a61aee5 increse test coverage 2018-10-25 17:02:05 +08:00
z4yx
b6043142e1 test if it works with golang 1.8 2018-10-25 16:16:04 +08:00
zhang
6241576b12 bug fix: tunasynctl failed to parse datetime when you list jobs of specific worker 2018-06-13 10:28:48 +08:00
bigeagle
ef78563b8c Merge pull request #74 from houbaron/patch-1
Update README.md
2018-05-31 21:25:55 +08:00
bigeagle
ca106f1360 Merge pull request #82 from tuna/dev
New feature: remove a worker with tunasynctl
2018-05-31 21:22:46 +08:00
Miao Wang
628266ac5a Merge pull request #81 from tuna/wip-override-concurrent-limit
New feature: run "tunasynctl start" with "-f" to override the limit of concurrent jobs
2018-05-31 14:22:03 +08:00
Yuxiang Zhang
7e601d9fff New feature: remove a worker with tunasynctl
Fix #78
2018-05-31 12:32:22 +08:00
z4yx
c750aa1871 new feature: run "tunasynctl start" with "-f" to override concurrent job limit 2018-05-30 18:59:24 +08:00
Yuxiang Zhang
6cbe91b4f1 new command: jobForceStart 2018-05-30 16:07:07 +08:00
Yuxiang Zhang
89a792986d increase test coverage rate of job & provider 2018-05-30 14:00:10 +08:00
Yuxiang Zhang
0fdb07d061 bug fix: log over-written in twoStageRsyncProvider
solve more DATA RACE problem
2018-05-30 12:28:09 +08:00
Yuxiang Zhang
c5bb172f99 increase test coverage rate of job.go 2018-05-30 11:45:05 +08:00
Yuxiang Zhang
79e6167028 fix race condition on logFile of baseProvider 2018-05-30 01:46:16 +08:00
Miao Wang
285ffb2f2f Merge pull request #80 from tuna/dev
Fix the "list" command of tunasynctl
2018-05-29 21:42:57 +08:00
Yuxiang Zhang
95bb4bbd5e report the last ended time (updated whether successful or not) of jobs 2018-05-29 21:21:03 +08:00
Yuxiang Zhang
6bca9d2cd5 fix TestHTTPServer in manager package 2018-05-29 19:07:01 +08:00
Yuxiang Zhang
4fe7d03e54 Move the WebMirrorStatus to internal package. Fix the list command of tunasynctl 2018-05-29 18:48:33 +08:00
Baron Hou
1fe9499728 Update README.md 2017-09-29 18:14:11 +08:00
bigeagle
a475b044c6 feat(worker): add 'use_ipv4' option for rsync provider 2017-09-08 00:15:48 +08:00
bigeagle
a50a360a91 Revert "feat(worker): add '-4' option to rsync when 'use_ipv6' is false"
This reverts commit d536aca2ac.
2017-09-08 00:12:40 +08:00
bigeagle
d536aca2ac feat(worker): add '-4' option to rsync when 'use_ipv6' is false 2017-09-06 23:22:55 +08:00
bigeagle
28545d61e7 Merge pull request #68 from l2dy/master
Update README.md
2017-05-29 11:03:27 -05:00
Zero King
a87fb0f8b4 Update README.md 2017-05-29 15:42:10 +00:00
Jason Lau
095e7c6320 Merge pull request #65 from felixonmars/patch-1
Fix a typo: Fisrt -> First
2017-03-30 15:31:46 +08:00
Felix Yan
7b441312f4 Fix a typo: Fisrt -> First 2017-03-30 13:27:40 +08:00
7IN0SAN9
563860d424 fix #63 2017-03-27 13:09:56 +08:00
bigeagle
93194cde2e Merge pull request #60 from tuna/dev
Dev
2016-12-19 01:10:38 +08:00
bigeagle
aa4c31a32b feat(tunasynctl): implemented 'set-size' command to update a mirror size 2016-12-18 23:30:41 +08:00
bigeagle
4c6a407c17 feat(manager): implemented restful API for updating mirror size 2016-12-18 23:06:08 +08:00
bigeagle
939abaef9b feat(worker): TUNASYNC_LOG_DIR environment variable 2016-12-18 20:41:26 +08:00
bigeagle
d5a438462f feat(worker): map current uid and gid to docker 2016-12-18 14:28:48 +08:00
bigeagle
d4e07a7b29 fix(worker): keep the same working dir inside and outside of docker 2016-12-18 14:28:32 +08:00
共有 42 个文件被更改,包括 1826 次插入296 次删除

查看文件

@@ -2,7 +2,7 @@ sudo: required
language: go language: go
go: go:
- 1.6 - 1.11
before_install: before_install:
- sudo apt-get install cgroup-bin - sudo apt-get install cgroup-bin
@@ -35,9 +35,10 @@ deploy:
file: file:
- "build/tunasync-linux-bin.tar.gz" - "build/tunasync-linux-bin.tar.gz"
api_key: api_key:
secure: "F9kaVaR1mxEh2+EL9Nm8GZmbVY98pXCJA0LGDNrq1C2vU61AUNOeX6yI1mMklHNZPLBqoFDvGN1M5HnJ+xWCFH+KnJgLD2GVIAcAxFNpcNWQe8XKE5heklNsIQNQfuh/rJKM6YzeDB9G5RN4Y76iL4WIAXhNnMm48W6jLnWhf70=" secure: ZOYL/CALrVJsZzbZqUMSI89Gw4zsBJH1StD/2yTyG45GfKgvtK4hG0S5cQM/L0wcikjEkgxSMsmr4ycq+OwbN++gc0umfoAQ/VSjzetiobAlT1E854aRKRjT82WxYdnPW2fsFjuEJTcyZmcbgJGTMi86MDt7w8tEjLomhd1+rUo=
skip_cleanup: true skip_cleanup: true
overwrite: true overwrite: true
on: on:
tags: true tags: true
all_branches: true all_branches: true
repo: tuna/tunasync

查看文件

@@ -19,7 +19,7 @@ Pre-built binary for Linux x86_64 is available at [Github releases](https://gith
``` ```
# Architecture # Architecture
- Manager: Centural instance on status and job management - Manager: Central instance for status and job management
- Worker: Runs mirror jobs - Worker: Runs mirror jobs
+------------+ +---+ +---+ +------------+ +---+ +---+
@@ -40,73 +40,16 @@ Pre-built binary for Linux x86_64 is available at [Github releases](https://gith
# Job Run Process # Job Run Process
PreSyncing Syncing Success PreSyncing Syncing Success
+-----------+ +-----------+ +-------------+ +--------------+ +-----------+ +----------+ +-----------+ +-------------+ +--------------+
| pre-job +--+->| job run +--->| post-exec +-+-->| post-success | | pre-job +--+->| pre-exec +--->| job run +--->| post-exec +-+-->| post-success |
+-----------+ ^ +-----------+ +-------------+ | +--------------+ +-----------+ ^ +----------+ +-----------+ +-------------+ | +--------------+
| | | |
| +-----------------+ | Failed | +-----------------+ | Failed
+------+ post-fail |<---------+ +----------------+ post-fail |<---------------+
+-----------------+ +-----------------+
``` ```
## Generate Self-Signed Certificate
Fisrt, create root CA
```
openssl genrsa -out rootCA.key 2048
openssl req -x509 -new -nodes -key rootCA.key -days 365 -out rootCA.crt
```
Create host key
```
openssl genrsa -out host.key 2048
```
Now create CSR, before that, write a `req.cnf`
```
[req]
distinguished_name = req_distinguished_name
req_extensions = v3_req
[req_distinguished_name]
countryName = Country Name (2 letter code)
countryName_default = CN
stateOrProvinceName = State or Province Name (full name)
stateOrProvinceName_default = BJ
localityName = Locality Name (eg, city)
localityName_default = Beijing
organizationalUnitName = Organizational Unit Name (eg, section)
organizationalUnitName_default = TUNA
commonName = Common Name (server FQDN or domain name)
commonName_default = <server_FQDN>
commonName_max = 64
[v3_req]
# Extensions to add to a certificate request
basicConstraints = CA:FALSE
keyUsage = nonRepudiation, digitalSignature, keyEncipherment
subjectAltName = @alt_names
[alt_names]
DNS.1 = <server_FQDN_1>
DNS.2 = <server_FQDN_2>
```
Substitute `<server_FQDN>` with your server's FQDN, then run
```
openssl req -new -key host.key -out host.csr -config req.cnf
```
Finally generate and sign host cert with root CA
```
openssl x509 -req -in host.csr -CA rootCA.crt -CAkey rootCA.key -CAcreateserial -out host.crt -days 365 -extensions v3_req -extfile req.cnf
```
## Building ## Building

查看文件

@@ -60,7 +60,7 @@ func startWorker(c *cli.Context) error {
os.Exit(1) os.Exit(1)
} }
w := worker.GetTUNASyncWorker(cfg) w := worker.NewTUNASyncWorker(cfg)
if w == nil { if w == nil {
logger.Errorf("Error intializing TUNA sync worker.") logger.Errorf("Error intializing TUNA sync worker.")
os.Exit(1) os.Exit(1)

查看文件

@@ -140,9 +140,9 @@ func listWorkers(c *cli.Context) error {
} }
func listJobs(c *cli.Context) error { func listJobs(c *cli.Context) error {
// FIXME: there should be an API on manager server side that return MirrorStatus list to tunasynctl var genericJobs interface{}
var jobs []tunasync.MirrorStatus
if c.Bool("all") { if c.Bool("all") {
var jobs []tunasync.WebMirrorStatus
_, err := tunasync.GetJSON(baseURL+listJobsPath, &jobs, client) _, err := tunasync.GetJSON(baseURL+listJobsPath, &jobs, client)
if err != nil { if err != nil {
return cli.NewExitError( return cli.NewExitError(
@@ -150,8 +150,10 @@ func listJobs(c *cli.Context) error {
"of all jobs from manager server: %s", err.Error()), "of all jobs from manager server: %s", err.Error()),
1) 1)
} }
genericJobs = jobs
} else { } else {
var jobs []tunasync.MirrorStatus
args := c.Args() args := c.Args()
if len(args) == 0 { if len(args) == 0 {
return cli.NewExitError( return cli.NewExitError(
@@ -174,9 +176,10 @@ func listJobs(c *cli.Context) error {
for range args { for range args {
jobs = append(jobs, <-ans...) jobs = append(jobs, <-ans...)
} }
genericJobs = jobs
} }
b, err := json.MarshalIndent(jobs, "", " ") b, err := json.MarshalIndent(genericJobs, "", " ")
if err != nil { if err != nil {
return cli.NewExitError( return cli.NewExitError(
fmt.Sprintf("Error printing out informations: %s", err.Error()), fmt.Sprintf("Error printing out informations: %s", err.Error()),
@@ -186,6 +189,103 @@ func listJobs(c *cli.Context) error {
return nil return nil
} }
func updateMirrorSize(c *cli.Context) error {
args := c.Args()
if len(args) != 2 {
return cli.NewExitError("Usage: tunasynctl -w <worker-id> <mirror> <size>", 1)
}
workerID := c.String("worker")
mirrorID := args.Get(0)
mirrorSize := args.Get(1)
msg := struct {
Name string `json:"name"`
Size string `json:"size"`
}{
Name: mirrorID,
Size: mirrorSize,
}
url := fmt.Sprintf(
"%s/workers/%s/jobs/%s/size", baseURL, workerID, mirrorID,
)
resp, err := tunasync.PostJSON(url, msg, client)
if err != nil {
return cli.NewExitError(
fmt.Sprintf("Failed to send request to manager: %s",
err.Error()),
1)
}
defer resp.Body.Close()
body, _ := ioutil.ReadAll(resp.Body)
if resp.StatusCode != http.StatusOK {
return cli.NewExitError(
fmt.Sprintf("Manager failed to update mirror size: %s", body), 1,
)
}
var status tunasync.MirrorStatus
json.Unmarshal(body, &status)
if status.Size != mirrorSize {
return cli.NewExitError(
fmt.Sprintf(
"Mirror size error, expecting %s, manager returned %s",
mirrorSize, status.Size,
), 1,
)
}
logger.Infof("Successfully updated mirror size to %s", mirrorSize)
return nil
}
func removeWorker(c *cli.Context) error {
args := c.Args()
if len(args) != 0 {
return cli.NewExitError("Usage: tunasynctl -w <worker-id>", 1)
}
workerID := c.String("worker")
if len(workerID) == 0 {
return cli.NewExitError("Please specify the <worker-id>", 1)
}
url := fmt.Sprintf("%s/workers/%s", baseURL, workerID)
req, err := http.NewRequest("DELETE", url, nil)
if err != nil {
logger.Panicf("Invalid HTTP Request: %s", err.Error())
}
resp, err := client.Do(req)
if err != nil {
return cli.NewExitError(
fmt.Sprintf("Failed to send request to manager: %s", err.Error()), 1)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return cli.NewExitError(
fmt.Sprintf("Failed to parse response: %s", err.Error()),
1)
}
return cli.NewExitError(fmt.Sprintf("Failed to correctly send"+
" command: HTTP status code is not 200: %s", body),
1)
}
res := map[string]string{}
err = json.NewDecoder(resp.Body).Decode(&res)
if res["message"] == "deleted" {
logger.Info("Successfully removed the worker")
} else {
logger.Info("Failed to remove the worker")
}
return nil
}
func flushDisabledJobs(c *cli.Context) error { func flushDisabledJobs(c *cli.Context) error {
req, err := http.NewRequest("DELETE", baseURL+flushDisabledPath, nil) req, err := http.NewRequest("DELETE", baseURL+flushDisabledPath, nil)
if err != nil { if err != nil {
@@ -235,11 +335,16 @@ func cmdJob(cmd tunasync.CmdVerb) cli.ActionFunc {
"argument WORKER", 1) "argument WORKER", 1)
} }
options := map[string]bool{}
if c.Bool("force") {
options["force"] = true
}
cmd := tunasync.ClientCmd{ cmd := tunasync.ClientCmd{
Cmd: cmd, Cmd: cmd,
MirrorID: mirrorID, MirrorID: mirrorID,
WorkerID: c.String("worker"), WorkerID: c.String("worker"),
Args: argsList, Args: argsList,
Options: options,
} }
resp, err := tunasync.PostJSON(baseURL+cmdPath, cmd, client) resp, err := tunasync.PostJSON(baseURL+cmdPath, cmd, client)
if err != nil { if err != nil {
@@ -270,6 +375,11 @@ func cmdJob(cmd tunasync.CmdVerb) cli.ActionFunc {
func cmdWorker(cmd tunasync.CmdVerb) cli.ActionFunc { func cmdWorker(cmd tunasync.CmdVerb) cli.ActionFunc {
return func(c *cli.Context) error { return func(c *cli.Context) error {
if c.String("worker") == "" {
return cli.NewExitError("Please specify the worker with -w <worker-id>", 1)
}
cmd := tunasync.ClientCmd{ cmd := tunasync.ClientCmd{
Cmd: cmd, Cmd: cmd,
WorkerID: c.String("worker"), WorkerID: c.String("worker"),
@@ -360,6 +470,11 @@ func main() {
}, },
} }
forceStartFlag := cli.BoolFlag{
Name: "force, f",
Usage: "Override the concurrent limit",
}
app.Commands = []cli.Command{ app.Commands = []cli.Command{
{ {
Name: "list", Name: "list",
@@ -385,10 +500,34 @@ func main() {
Flags: commonFlags, Flags: commonFlags,
Action: initializeWrapper(listWorkers), Action: initializeWrapper(listWorkers),
}, },
{
Name: "rm-worker",
Usage: "Remove a worker",
Flags: append(
commonFlags,
cli.StringFlag{
Name: "worker, w",
Usage: "worker-id of the worker to be removed",
},
),
Action: initializeWrapper(removeWorker),
},
{
Name: "set-size",
Usage: "Set mirror size",
Flags: append(
commonFlags,
cli.StringFlag{
Name: "worker, w",
Usage: "specify worker-id of the mirror job",
},
),
Action: initializeWrapper(updateMirrorSize),
},
{ {
Name: "start", Name: "start",
Usage: "Start a job", Usage: "Start a job",
Flags: append(commonFlags, cmdFlags...), Flags: append(append(commonFlags, cmdFlags...), forceStartFlag),
Action: initializeWrapper(cmdJob(tunasync.CmdStart)), Action: initializeWrapper(cmdJob(tunasync.CmdStart)),
}, },
{ {

查看文件

@@ -42,7 +42,7 @@ interval = 1
[manager] [manager]
api_base = "http://localhost:12345" api_base = "http://localhost:12345"
token = "some_token" token = ""
ca_cert = "" ca_cert = ""
[cgroup] [cgroup]
@@ -90,6 +90,30 @@ $ tunasync worker --config ~/tunasync_demo/worker.conf
本例中,镜像的数据在`/tmp/tunasync/` 本例中,镜像的数据在`/tmp/tunasync/`
### 控制
查看同步状态
```
$ tunasynctl list -p 12345 --all
```
tunasynctl 也支持配置文件。配置文件可以放在 `/etc/tunasync/ctl.conf` 或者 `~/.config/tunasync/ctl.conf` 两个位置,后者可以覆盖前者的配置值。
配置文件内容为:
```
manager_addr = "127.0.0.1"
manager_port = 12345
ca_cert = ""
```
### 安全
worker 和 manager 之间用 http(s) 通信,如果你 worker 和 manager 都是在本机,那么没必要使用 https。此时 manager 就不指定 `ssl_key``ssl_cert`,留空;worker 的 `ca_cert` 留空,`api_base``http://` 开头。
如果需要加密的通信,manager 需要指定 `ssl_key``ssl_cert`,worker 要指定 `ca_cert`,并且 `api_base` 应该是 `https://` 开头。
## 更进一步 ## 更进一步
可以参看 可以参看
@@ -100,3 +124,7 @@ $ tunasync worker --help
``` ```
可以看一下 log 目录 可以看一下 log 目录
一些 worker 配置文件示例 [workers.conf](workers.conf)
你可能会用到的操作 [tips.md](tips.md)

93
docs/zh_CN/tips.md 普通文件
查看文件

@@ -0,0 +1,93 @@
## 删除某worker的某镜像
先确定已经给tunasynctl写好config文件`~/.config/tunasync/ctl.conf`
```toml
manager_addr = "127.0.0.1"
manager_port = 12345
ca_cert = ""
```
接着
```shell
$ tunasynctl disable -w <worker_id> <mirror_name>
$ tunasynctl flush
```
## 热重载 `worker.conf`
```shell
$ tunasynctl reload -w <worker_id>
```
e.g. 删除 `test_worker``elvish` 镜像:
1. 删除存放镜像的文件夹
2. 删除 `worker.conf` 中对应的 `mirror` 段落
3. 接着操作:
```shell
$ tunasynctl reload -w test_worker
$ tunasynctl disable -w test_worker elvish
$ tunasynctl flush
```
4. (可选)最后删除日志文件夹里的日志
## 删除worker
```shell
$ tunasynctl rm-worker -w <worker_id>
```
e.g.
```shell
$ tunasynctl rm-worker -w test_worker
```
## 更新镜像的大小
```shell
$ tunasynctl set-size -w <worker_id> <mirror_name> <size>
```
其中,末尾的 <size> 参数,由操作者设定,或由某定时脚本生成
由于 `du -s` 比较耗时,故镜像大小可直接由rsync的日志文件读出
## Btrfs 文件系统快照
如果镜像文件存放在以 Btrfs 为文件系统的分区中,可启用由 Btrfs 提供的快照 (Snapshot) 功能。对于每一个镜像,tunasync 在每次成功同步后更新其快照。
`worker.conf` 中添加如下配置,即可启用 Btrfs 快照功能:
```toml
[btrfs_snapshot]
enable = true
snapshot_path = "/path/to/snapshot/directory"
```
其中 `snapshot_path` 为快照所在目录。如将其作为发布版本,则镜像同步过程对于镜像站用户而言具有原子性。如此可避免用户接收到仍处于“中间态”的(未完成同步的)文件。
也可以在 `[[mirrors]]` 中为特定镜像单独指定快照路径,如:
```toml
[[mirrors]]
name = "elvish"
provider = "rsync"
upstream = "rsync://rsync.elvish.io/elvish/"
interval = 1440
snapshot_path = "/data/publish/elvish"
```
**提示:**
若运行 tunasync 的用户无 root 权限,请确保该用户对镜像同步目录和快照目录均具有写和执行权限,并使用 [`user_subvol_rm_allowed` 选项](https://btrfs.wiki.kernel.org/index.php/Manpage/btrfs(5)#MOUNT_OPTIONS)挂载相应的 Btrfs 分区。

77
docs/zh_CN/workers.conf 普通文件
查看文件

@@ -0,0 +1,77 @@
[global]
name = "mirror_worker"
log_dir = "/srv/tunasync/log/tunasync/{{.Name}}"
mirror_dir = "/srv/tunasync"
concurrent = 10
interval = 1
[manager]
api_base = "http://localhost:12345"
token = "some_token"
ca_cert = ""
[cgroup]
enable = false
base_path = "/sys/fs/cgroup"
group = "tunasync"
[server]
hostname = "localhost"
listen_addr = "127.0.0.1"
listen_port = 6000
ssl_cert = ""
ssl_key = ""
[[mirrors]]
name = "adobe-fonts"
interval = 1440
provider = "command"
upstream = "https://github.com/adobe-fonts"
#https://github.com/tuna/tunasync-scripts/blob/master/adobe-fonts.sh
command = "/home/scripts/adobe-fonts.sh"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "anaconda"
provider = "command"
upstream = "https://repo.continuum.io/"
#https://github.com/tuna/tunasync-scripts/blob/master/anaconda.py
command = "/home/scripts/anaconda.py"
interval = 1440
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "gnu"
provider = "rsync"
upstream = "rsync://mirrors.ocf.berkeley.edu/gnu/"
memory_limit = "256M"
[[mirrors]]
name = "pypi"
provider = "command"
upstream = "https://pypi.python.org/"
#https://github.com/tuna/tunasync-scripts/blob/master/pypi.sh
command = "/home/scripts/pypi.sh"
docker_image = "tunathu/tunasync-scripts:latest"
interval = 5
# set environment varialbes
[mirrors.env]
INIT = "0"
[[mirrors]]
name = "debian"
interval = 720
provider = "rsync"
upstream = "rsync://mirrors.tuna.tsinghua.edu.cn/debian/"
memory_limit = "256M"
[[mirrors]]
name = "ubuntu"
provider = "two-stage-rsync"
stage1_profile = "debian"
upstream = "rsync://archive.ubuntu.com/ubuntu/"
memory_limit = "256M"
# vim: ft=toml

查看文件

@@ -5,7 +5,7 @@ import (
"time" "time"
) )
// A StatusUpdateMsg represents a msg when // A MirrorStatus represents a msg when
// a worker has done syncing // a worker has done syncing
type MirrorStatus struct { type MirrorStatus struct {
Name string `json:"name"` Name string `json:"name"`
@@ -13,6 +13,8 @@ type MirrorStatus struct {
IsMaster bool `json:"is_master"` IsMaster bool `json:"is_master"`
Status SyncStatus `json:"status"` Status SyncStatus `json:"status"`
LastUpdate time.Time `json:"last_update"` LastUpdate time.Time `json:"last_update"`
LastEnded time.Time `json:"last_ended"`
Scheduled time.Time `json:"next_schedule"`
Upstream string `json:"upstream"` Upstream string `json:"upstream"`
Size string `json:"size"` Size string `json:"size"`
ErrorMsg string `json:"error_msg"` ErrorMsg string `json:"error_msg"`
@@ -27,6 +29,15 @@ type WorkerStatus struct {
LastOnline time.Time `json:"last_online"` // last seen LastOnline time.Time `json:"last_online"` // last seen
} }
type MirrorSchedules struct {
Schedules []MirrorSchedule `json:"schedules"`
}
type MirrorSchedule struct {
MirrorName string `json:"name"`
NextSchedule time.Time `json:"next_schedule"`
}
// A CmdVerb is an action to a job or worker // A CmdVerb is an action to a job or worker
type CmdVerb uint8 type CmdVerb uint8
@@ -67,9 +78,10 @@ func (c CmdVerb) String() string {
// A WorkerCmd is the command message send from the // A WorkerCmd is the command message send from the
// manager to a worker // manager to a worker
type WorkerCmd struct { type WorkerCmd struct {
Cmd CmdVerb `json:"cmd"` Cmd CmdVerb `json:"cmd"`
MirrorID string `json:"mirror_id"` MirrorID string `json:"mirror_id"`
Args []string `json:"args"` Args []string `json:"args"`
Options map[string]bool `json:"options"`
} }
func (c WorkerCmd) String() string { func (c WorkerCmd) String() string {
@@ -82,8 +94,9 @@ func (c WorkerCmd) String() string {
// A ClientCmd is the command message send from client // A ClientCmd is the command message send from client
// to the manager // to the manager
type ClientCmd struct { type ClientCmd struct {
Cmd CmdVerb `json:"cmd"` Cmd CmdVerb `json:"cmd"`
MirrorID string `json:"mirror_id"` MirrorID string `json:"mirror_id"`
WorkerID string `json:"worker_id"` WorkerID string `json:"worker_id"`
Args []string `json:"args"` Args []string `json:"args"`
Options map[string]bool `json:"options"`
} }

查看文件

@@ -1,11 +1,9 @@
package manager package internal
import ( import (
"encoding/json" "encoding/json"
"strconv" "strconv"
"time" "time"
. "github.com/tuna/tunasync/internal"
) )
type textTime struct { type textTime struct {
@@ -38,24 +36,32 @@ func (t *stampTime) UnmarshalJSON(b []byte) error {
return err return err
} }
// webMirrorStatus is the mirror status to be shown in the web page // WebMirrorStatus is the mirror status to be shown in the web page
type webMirrorStatus struct { type WebMirrorStatus struct {
Name string `json:"name"` Name string `json:"name"`
IsMaster bool `json:"is_master"` IsMaster bool `json:"is_master"`
Status SyncStatus `json:"status"` Status SyncStatus `json:"status"`
LastUpdate textTime `json:"last_update"` LastUpdate textTime `json:"last_update"`
LastUpdateTs stampTime `json:"last_update_ts"` LastUpdateTs stampTime `json:"last_update_ts"`
LastEnded textTime `json:"last_ended"`
LastEndedTs stampTime `json:"last_ended_ts"`
Scheduled textTime `json:"next_schedule"`
ScheduledTs stampTime `json:"next_schedule_ts"`
Upstream string `json:"upstream"` Upstream string `json:"upstream"`
Size string `json:"size"` // approximate size Size string `json:"size"` // approximate size
} }
func convertMirrorStatus(m MirrorStatus) webMirrorStatus { func BuildWebMirrorStatus(m MirrorStatus) WebMirrorStatus {
return webMirrorStatus{ return WebMirrorStatus{
Name: m.Name, Name: m.Name,
IsMaster: m.IsMaster, IsMaster: m.IsMaster,
Status: m.Status, Status: m.Status,
LastUpdate: textTime{m.LastUpdate}, LastUpdate: textTime{m.LastUpdate},
LastUpdateTs: stampTime{m.LastUpdate}, LastUpdateTs: stampTime{m.LastUpdate},
LastEnded: textTime{m.LastEnded},
LastEndedTs: stampTime{m.LastEnded},
Scheduled: textTime{m.Scheduled},
ScheduledTs: stampTime{m.Scheduled},
Upstream: m.Upstream, Upstream: m.Upstream,
Size: m.Size, Size: m.Size,
} }

87
internal/status_web_test.go 普通文件
查看文件

@@ -0,0 +1,87 @@
package internal
import (
"encoding/json"
"testing"
"time"
. "github.com/smartystreets/goconvey/convey"
)
func TestStatus(t *testing.T) {
Convey("status json ser-de should work", t, func() {
tz := "Asia/Tokyo"
loc, err := time.LoadLocation(tz)
So(err, ShouldBeNil)
t := time.Date(2016, time.April, 16, 23, 8, 10, 0, loc)
m := WebMirrorStatus{
Name: "tunalinux",
Status: Success,
LastUpdate: textTime{t},
LastUpdateTs: stampTime{t},
LastEnded: textTime{t},
LastEndedTs: stampTime{t},
Scheduled: textTime{t},
ScheduledTs: stampTime{t},
Size: "5GB",
Upstream: "rsync://mirrors.tuna.tsinghua.edu.cn/tunalinux/",
}
b, err := json.Marshal(m)
So(err, ShouldBeNil)
//fmt.Println(string(b))
var m2 WebMirrorStatus
err = json.Unmarshal(b, &m2)
So(err, ShouldBeNil)
// fmt.Printf("%#v", m2)
So(m2.Name, ShouldEqual, m.Name)
So(m2.Status, ShouldEqual, m.Status)
So(m2.LastUpdate.Unix(), ShouldEqual, m.LastUpdate.Unix())
So(m2.LastUpdateTs.Unix(), ShouldEqual, m.LastUpdate.Unix())
So(m2.LastUpdate.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
So(m2.LastUpdateTs.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
So(m2.LastEnded.Unix(), ShouldEqual, m.LastEnded.Unix())
So(m2.LastEndedTs.Unix(), ShouldEqual, m.LastEnded.Unix())
So(m2.LastEnded.UnixNano(), ShouldEqual, m.LastEnded.UnixNano())
So(m2.LastEndedTs.UnixNano(), ShouldEqual, m.LastEnded.UnixNano())
So(m2.Scheduled.Unix(), ShouldEqual, m.Scheduled.Unix())
So(m2.ScheduledTs.Unix(), ShouldEqual, m.Scheduled.Unix())
So(m2.Scheduled.UnixNano(), ShouldEqual, m.Scheduled.UnixNano())
So(m2.ScheduledTs.UnixNano(), ShouldEqual, m.Scheduled.UnixNano())
So(m2.Size, ShouldEqual, m.Size)
So(m2.Upstream, ShouldEqual, m.Upstream)
})
Convey("BuildWebMirrorStatus should work", t, func() {
m := MirrorStatus{
Name: "arch-sync3",
Worker: "testWorker",
IsMaster: true,
Status: Failed,
LastUpdate: time.Now().Add(-time.Minute * 30),
LastEnded: time.Now(),
Scheduled: time.Now().Add(time.Minute * 5),
Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "4GB",
}
var m2 WebMirrorStatus
m2 = BuildWebMirrorStatus(m)
// fmt.Printf("%#v", m2)
So(m2.Name, ShouldEqual, m.Name)
So(m2.Status, ShouldEqual, m.Status)
So(m2.LastUpdate.Unix(), ShouldEqual, m.LastUpdate.Unix())
So(m2.LastUpdateTs.Unix(), ShouldEqual, m.LastUpdate.Unix())
So(m2.LastUpdate.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
So(m2.LastUpdateTs.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
So(m2.LastEnded.Unix(), ShouldEqual, m.LastEnded.Unix())
So(m2.LastEndedTs.Unix(), ShouldEqual, m.LastEnded.Unix())
So(m2.LastEnded.UnixNano(), ShouldEqual, m.LastEnded.UnixNano())
So(m2.LastEndedTs.UnixNano(), ShouldEqual, m.LastEnded.UnixNano())
So(m2.Scheduled.Unix(), ShouldEqual, m.Scheduled.Unix())
So(m2.ScheduledTs.Unix(), ShouldEqual, m.Scheduled.Unix())
So(m2.Scheduled.UnixNano(), ShouldEqual, m.Scheduled.UnixNano())
So(m2.ScheduledTs.UnixNano(), ShouldEqual, m.Scheduled.UnixNano())
So(m2.Size, ShouldEqual, m.Size)
So(m2.Upstream, ShouldEqual, m.Upstream)
})
}

查看文件

@@ -8,6 +8,7 @@ import (
"errors" "errors"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"regexp"
"time" "time"
) )
@@ -84,3 +85,14 @@ func GetJSON(url string, obj interface{}, client *http.Client) (*http.Response,
} }
return resp, json.Unmarshal(body, obj) return resp, json.Unmarshal(body, obj)
} }
func ExtractSizeFromRsyncLog(content []byte) string {
// (?m) flag enables multi-line mode
re := regexp.MustCompile(`(?m)^Total file size: ([0-9\.]+[KMGTP]?) bytes`)
matches := re.FindAllSubmatch(content, -1)
// fmt.Printf("%q\n", matches)
if len(matches) == 0 {
return ""
}
return string(matches[len(matches)-1][1])
}

32
internal/util_test.go 普通文件
查看文件

@@ -0,0 +1,32 @@
package internal
import (
"testing"
. "github.com/smartystreets/goconvey/convey"
)
func TestExtractSizeFromRsyncLog(t *testing.T) {
realLogContent := `
Number of files: 998,470 (reg: 925,484, dir: 58,892, link: 14,094)
Number of created files: 1,049 (reg: 1,049)
Number of deleted files: 1,277 (reg: 1,277)
Number of regular files transferred: 5,694
Total file size: 1.33T bytes
Total transferred file size: 2.86G bytes
Literal data: 780.62M bytes
Matched data: 2.08G bytes
File list size: 37.55M
File list generation time: 7.845 seconds
File list transfer time: 0.000 seconds
Total bytes sent: 7.55M
Total bytes received: 823.25M
sent 7.55M bytes received 823.25M bytes 5.11M bytes/sec
total size is 1.33T speedup is 1,604.11
`
Convey("Log parser should work", t, func() {
res := ExtractSizeFromRsyncLog([]byte(realLogContent))
So(res, ShouldEqual, "1.33T")
})
}

查看文件

@@ -1,3 +1,3 @@
package internal package internal
const Version string = "0.2-dev" const Version string = "0.3.3"

查看文件

@@ -14,6 +14,7 @@ type dbAdapter interface {
Init() error Init() error
ListWorkers() ([]WorkerStatus, error) ListWorkers() ([]WorkerStatus, error)
GetWorker(workerID string) (WorkerStatus, error) GetWorker(workerID string) (WorkerStatus, error)
DeleteWorker(workerID string) error
CreateWorker(w WorkerStatus) (WorkerStatus, error) CreateWorker(w WorkerStatus) (WorkerStatus, error)
UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error)
GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error) GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error)
@@ -95,6 +96,19 @@ func (b *boltAdapter) GetWorker(workerID string) (w WorkerStatus, err error) {
return return
} }
func (b *boltAdapter) DeleteWorker(workerID string) (err error) {
err = b.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(_workerBucketKey))
v := bucket.Get([]byte(workerID))
if v == nil {
return fmt.Errorf("invalid workerID %s", workerID)
}
err := bucket.Delete([]byte(workerID))
return err
})
return
}
func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) { func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
err := b.db.Update(func(tx *bolt.Tx) error { err := b.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(_workerBucketKey)) bucket := tx.Bucket([]byte(_workerBucketKey))
@@ -125,7 +139,7 @@ func (b *boltAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus
bucket := tx.Bucket([]byte(_statusBucketKey)) bucket := tx.Bucket([]byte(_statusBucketKey))
v := bucket.Get([]byte(id)) v := bucket.Get([]byte(id))
if v == nil { if v == nil {
return fmt.Errorf("no mirror %s exists in worker %s", mirrorID, workerID) return fmt.Errorf("no mirror '%s' exists in worker '%s'", mirrorID, workerID)
} }
err := json.Unmarshal(v, &m) err := json.Unmarshal(v, &m)
return err return err

查看文件

@@ -40,21 +40,39 @@ func TestBoltAdapter(t *testing.T) {
So(err, ShouldBeNil) So(err, ShouldBeNil)
} }
Convey("get exists worker", func() { Convey("get existent worker", func() {
_, err := boltDB.GetWorker(testWorkerIDs[0]) _, err := boltDB.GetWorker(testWorkerIDs[0])
So(err, ShouldBeNil) So(err, ShouldBeNil)
}) })
Convey("list exist worker", func() { Convey("list existent workers", func() {
ws, err := boltDB.ListWorkers() ws, err := boltDB.ListWorkers()
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(len(ws), ShouldEqual, 2) So(len(ws), ShouldEqual, 2)
}) })
Convey("get inexist worker", func() { Convey("get non-existent worker", func() {
_, err := boltDB.GetWorker("invalid workerID") _, err := boltDB.GetWorker("invalid workerID")
So(err, ShouldNotBeNil) So(err, ShouldNotBeNil)
}) })
Convey("delete existent worker", func() {
err := boltDB.DeleteWorker(testWorkerIDs[0])
So(err, ShouldBeNil)
_, err = boltDB.GetWorker(testWorkerIDs[0])
So(err, ShouldNotBeNil)
ws, err := boltDB.ListWorkers()
So(err, ShouldBeNil)
So(len(ws), ShouldEqual, 1)
})
Convey("delete non-existent worker", func() {
err := boltDB.DeleteWorker("invalid workerID")
So(err, ShouldNotBeNil)
ws, err := boltDB.ListWorkers()
So(err, ShouldBeNil)
So(len(ws), ShouldEqual, 2)
})
}) })
Convey("update mirror status", func() { Convey("update mirror status", func() {
@@ -65,6 +83,7 @@ func TestBoltAdapter(t *testing.T) {
IsMaster: true, IsMaster: true,
Status: Success, Status: Success,
LastUpdate: time.Now(), LastUpdate: time.Now(),
LastEnded: time.Now(),
Upstream: "mirrors.tuna.tsinghua.edu.cn", Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "3GB", Size: "3GB",
}, },
@@ -73,7 +92,8 @@ func TestBoltAdapter(t *testing.T) {
Worker: testWorkerIDs[1], Worker: testWorkerIDs[1],
IsMaster: true, IsMaster: true,
Status: Disabled, Status: Disabled,
LastUpdate: time.Now(), LastUpdate: time.Now().Add(-time.Hour),
LastEnded: time.Now(),
Upstream: "mirrors.tuna.tsinghua.edu.cn", Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "4GB", Size: "4GB",
}, },
@@ -82,7 +102,8 @@ func TestBoltAdapter(t *testing.T) {
Worker: testWorkerIDs[1], Worker: testWorkerIDs[1],
IsMaster: true, IsMaster: true,
Status: Success, Status: Success,
LastUpdate: time.Now(), LastUpdate: time.Now().Add(-time.Second),
LastEnded: time.Now(),
Upstream: "mirrors.tuna.tsinghua.edu.cn", Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "4GB", Size: "4GB",
}, },

查看文件

@@ -1,6 +1,7 @@
package manager package manager
import ( import (
"errors"
"fmt" "fmt"
"net/http" "net/http"
"time" "time"
@@ -83,10 +84,14 @@ func GetTUNASyncManager(cfg *Config) *Manager {
// workerID should be valid in this route group // workerID should be valid in this route group
workerValidateGroup := s.engine.Group("/workers", s.workerIDValidator) workerValidateGroup := s.engine.Group("/workers", s.workerIDValidator)
{ {
// delete specified worker
workerValidateGroup.DELETE(":id", s.deleteWorker)
// get job list // get job list
workerValidateGroup.GET(":id/jobs", s.listJobsOfWorker) workerValidateGroup.GET(":id/jobs", s.listJobsOfWorker)
// post job status // post job status
workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker) workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker)
workerValidateGroup.POST(":id/jobs/:job/size", s.updateMirrorSize)
workerValidateGroup.POST(":id/schedules", s.updateSchedulesOfWorker)
} }
// for tunasynctl to post commands // for tunasynctl to post commands
@@ -133,11 +138,11 @@ func (s *Manager) listAllJobs(c *gin.Context) {
s.returnErrJSON(c, http.StatusInternalServerError, err) s.returnErrJSON(c, http.StatusInternalServerError, err)
return return
} }
webMirStatusList := []webMirrorStatus{} webMirStatusList := []WebMirrorStatus{}
for _, m := range mirrorStatusList { for _, m := range mirrorStatusList {
webMirStatusList = append( webMirStatusList = append(
webMirStatusList, webMirStatusList,
convertMirrorStatus(m), BuildWebMirrorStatus(m),
) )
} }
c.JSON(http.StatusOK, webMirStatusList) c.JSON(http.StatusOK, webMirStatusList)
@@ -157,6 +162,22 @@ func (s *Manager) flushDisabledJobs(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{_infoKey: "flushed"}) c.JSON(http.StatusOK, gin.H{_infoKey: "flushed"})
} }
// deleteWorker deletes one worker by id
func (s *Manager) deleteWorker(c *gin.Context) {
workerID := c.Param("id")
err := s.adapter.DeleteWorker(workerID)
if err != nil {
err := fmt.Errorf("failed to delete worker: %s",
err.Error(),
)
c.Error(err)
s.returnErrJSON(c, http.StatusInternalServerError, err)
return
}
logger.Noticef("Worker <%s> deleted", workerID)
c.JSON(http.StatusOK, gin.H{_infoKey: "deleted"})
}
// listWrokers respond with informations of all the workers // listWrokers respond with informations of all the workers
func (s *Manager) listWorkers(c *gin.Context) { func (s *Manager) listWorkers(c *gin.Context) {
var workerInfos []WorkerStatus var workerInfos []WorkerStatus
@@ -220,11 +241,59 @@ func (s *Manager) returnErrJSON(c *gin.Context, code int, err error) {
}) })
} }
func (s *Manager) updateSchedulesOfWorker(c *gin.Context) {
workerID := c.Param("id")
var schedules MirrorSchedules
c.BindJSON(&schedules)
for _, schedule := range schedules.Schedules {
mirrorName := schedule.MirrorName
if len(mirrorName) == 0 {
s.returnErrJSON(
c, http.StatusBadRequest,
errors.New("Mirror Name should not be empty"),
)
}
curStatus, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
if err != nil {
fmt.Errorf("failed to get job %s of worker %s: %s",
mirrorName, workerID, err.Error(),
)
continue
}
if curStatus.Scheduled == schedule.NextSchedule {
// no changes, skip update
continue
}
curStatus.Scheduled = schedule.NextSchedule
_, err = s.adapter.UpdateMirrorStatus(workerID, mirrorName, curStatus)
if err != nil {
err := fmt.Errorf("failed to update job %s of worker %s: %s",
mirrorName, workerID, err.Error(),
)
c.Error(err)
s.returnErrJSON(c, http.StatusInternalServerError, err)
return
}
}
type empty struct{}
c.JSON(http.StatusOK, empty{})
}
func (s *Manager) updateJobOfWorker(c *gin.Context) { func (s *Manager) updateJobOfWorker(c *gin.Context) {
workerID := c.Param("id") workerID := c.Param("id")
var status MirrorStatus var status MirrorStatus
c.BindJSON(&status) c.BindJSON(&status)
mirrorName := status.Name mirrorName := status.Name
if len(mirrorName) == 0 {
s.returnErrJSON(
c, http.StatusBadRequest,
errors.New("Mirror Name should not be empty"),
)
}
curStatus, _ := s.adapter.GetMirrorStatus(workerID, mirrorName) curStatus, _ := s.adapter.GetMirrorStatus(workerID, mirrorName)
@@ -234,21 +303,25 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) {
} else { } else {
status.LastUpdate = curStatus.LastUpdate status.LastUpdate = curStatus.LastUpdate
} }
if status.Status == Success || status.Status == Failed {
status.LastEnded = time.Now()
} else {
status.LastEnded = curStatus.LastEnded
}
// Only message with meaningful size updates the mirror size
if len(curStatus.Size) > 0 && curStatus.Size != "unknown" {
if len(status.Size) == 0 || status.Size == "unknown" {
status.Size = curStatus.Size
}
}
// for logging // for logging
switch status.Status { switch status.Status {
case Success:
logger.Noticef("Job [%s] @<%s> success", status.Name, status.Worker)
case Failed:
logger.Warningf("Job [%s] @<%s> failed", status.Name, status.Worker)
case Syncing: case Syncing:
logger.Noticef("Job [%s] @<%s> starts syncing", status.Name, status.Worker) logger.Noticef("Job [%s] @<%s> starts syncing", status.Name, status.Worker)
case Disabled:
logger.Noticef("Job [%s] @<%s> disabled", status.Name, status.Worker)
case Paused:
logger.Noticef("Job [%s] @<%s> paused", status.Name, status.Worker)
default: default:
logger.Infof("Job [%s] @<%s> status: %s", status.Name, status.Worker, status.Status) logger.Noticef("Job [%s] @<%s> %s", status.Name, status.Worker, status.Status)
} }
newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status) newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
@@ -263,6 +336,45 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) {
c.JSON(http.StatusOK, newStatus) c.JSON(http.StatusOK, newStatus)
} }
func (s *Manager) updateMirrorSize(c *gin.Context) {
workerID := c.Param("id")
type SizeMsg struct {
Name string `json:"name"`
Size string `json:"size"`
}
var msg SizeMsg
c.BindJSON(&msg)
mirrorName := msg.Name
status, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
if err != nil {
logger.Errorf(
"Failed to get status of mirror %s @<%s>: %s",
mirrorName, workerID, err.Error(),
)
s.returnErrJSON(c, http.StatusInternalServerError, err)
return
}
// Only message with meaningful size updates the mirror size
if len(msg.Size) > 0 || msg.Size != "unknown" {
status.Size = msg.Size
}
logger.Noticef("Mirror size of [%s] @<%s>: %s", status.Name, status.Worker, status.Size)
newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
if err != nil {
err := fmt.Errorf("failed to update job %s of worker %s: %s",
mirrorName, workerID, err.Error(),
)
c.Error(err)
s.returnErrJSON(c, http.StatusInternalServerError, err)
return
}
c.JSON(http.StatusOK, newStatus)
}
func (s *Manager) handleClientCmd(c *gin.Context) { func (s *Manager) handleClientCmd(c *gin.Context) {
var clientCmd ClientCmd var clientCmd ClientCmd
c.BindJSON(&clientCmd) c.BindJSON(&clientCmd)
@@ -286,6 +398,7 @@ func (s *Manager) handleClientCmd(c *gin.Context) {
Cmd: clientCmd.Cmd, Cmd: clientCmd.Cmd,
MirrorID: clientCmd.MirrorID, MirrorID: clientCmd.MirrorID,
Args: clientCmd.Args, Args: clientCmd.Args,
Options: clientCmd.Options,
} }
// update job status, even if the job did not disable successfully, // update job status, even if the job did not disable successfully,

查看文件

@@ -21,9 +21,16 @@ const (
) )
func TestHTTPServer(t *testing.T) { func TestHTTPServer(t *testing.T) {
var listenPort = 5000
Convey("HTTP server should work", t, func(ctx C) { Convey("HTTP server should work", t, func(ctx C) {
listenPort++
port := listenPort
addr := "127.0.0.1"
baseURL := fmt.Sprintf("http://%s:%d", addr, port)
InitLogger(true, true, false) InitLogger(true, true, false)
s := GetTUNASyncManager(&Config{Debug: false}) s := GetTUNASyncManager(&Config{Debug: true})
s.cfg.Server.Addr = addr
s.cfg.Server.Port = port
So(s, ShouldNotBeNil) So(s, ShouldNotBeNil)
s.setDBAdapter(&mockDBAdapter{ s.setDBAdapter(&mockDBAdapter{
workerStore: map[string]WorkerStatus{ workerStore: map[string]WorkerStatus{
@@ -32,12 +39,8 @@ func TestHTTPServer(t *testing.T) {
}}, }},
statusStore: make(map[string]MirrorStatus), statusStore: make(map[string]MirrorStatus),
}) })
port := rand.Intn(10000) + 20000 go s.Run()
baseURL := fmt.Sprintf("http://127.0.0.1:%d", port) time.Sleep(50 * time.Millisecond)
go func() {
s.engine.Run(fmt.Sprintf("127.0.0.1:%d", port))
}()
time.Sleep(50 * time.Microsecond)
resp, err := http.Get(baseURL + "/ping") resp, err := http.Get(baseURL + "/ping")
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK) So(resp.StatusCode, ShouldEqual, http.StatusOK)
@@ -79,7 +82,34 @@ func TestHTTPServer(t *testing.T) {
So(len(actualResponseObj), ShouldEqual, 2) So(len(actualResponseObj), ShouldEqual, 2)
}) })
Convey("flush disabled jobs", func(ctx C) { Convey("delete an existent worker", func(ctx C) {
req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/workers/%s", baseURL, w.ID), nil)
So(err, ShouldBeNil)
clt := &http.Client{}
resp, err := clt.Do(req)
So(err, ShouldBeNil)
defer resp.Body.Close()
res := map[string]string{}
err = json.NewDecoder(resp.Body).Decode(&res)
So(err, ShouldBeNil)
So(res[_infoKey], ShouldEqual, "deleted")
})
Convey("delete non-existent worker", func(ctx C) {
invalidWorker := "test_worker233"
req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/workers/%s", baseURL, invalidWorker), nil)
So(err, ShouldBeNil)
clt := &http.Client{}
resp, err := clt.Do(req)
So(err, ShouldBeNil)
defer resp.Body.Close()
res := map[string]string{}
err = json.NewDecoder(resp.Body).Decode(&res)
So(err, ShouldBeNil)
So(res[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
})
Convey("flush disabled jobs", func(ctx C) {
req, err := http.NewRequest("DELETE", baseURL+"/jobs/disabled", nil) req, err := http.NewRequest("DELETE", baseURL+"/jobs/disabled", nil)
So(err, ShouldBeNil) So(err, ShouldBeNil)
clt := &http.Client{} clt := &http.Client{}
@@ -99,11 +129,11 @@ func TestHTTPServer(t *testing.T) {
IsMaster: true, IsMaster: true,
Status: Success, Status: Success,
Upstream: "mirrors.tuna.tsinghua.edu.cn", Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "3GB", Size: "unknown",
} }
resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil) resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
defer resp.Body.Close()
So(err, ShouldBeNil) So(err, ShouldBeNil)
defer resp.Body.Close()
So(resp.StatusCode, ShouldEqual, http.StatusOK) So(resp.StatusCode, ShouldEqual, http.StatusOK)
Convey("list mirror status of an existed worker", func(ctx C) { Convey("list mirror status of an existed worker", func(ctx C) {
@@ -121,11 +151,12 @@ func TestHTTPServer(t *testing.T) {
So(m.Size, ShouldEqual, status.Size) So(m.Size, ShouldEqual, status.Size)
So(m.IsMaster, ShouldEqual, status.IsMaster) So(m.IsMaster, ShouldEqual, status.IsMaster)
So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second) So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second)
So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
}) })
Convey("list all job status of all workers", func(ctx C) { Convey("list all job status of all workers", func(ctx C) {
var ms []webMirrorStatus var ms []WebMirrorStatus
resp, err := GetJSON(baseURL+"/jobs", &ms, nil) resp, err := GetJSON(baseURL+"/jobs", &ms, nil)
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK) So(resp.StatusCode, ShouldEqual, http.StatusOK)
@@ -137,8 +168,91 @@ func TestHTTPServer(t *testing.T) {
So(m.Size, ShouldEqual, status.Size) So(m.Size, ShouldEqual, status.Size)
So(m.IsMaster, ShouldEqual, status.IsMaster) So(m.IsMaster, ShouldEqual, status.IsMaster)
So(time.Now().Sub(m.LastUpdate.Time), ShouldBeLessThan, 1*time.Second) So(time.Now().Sub(m.LastUpdate.Time), ShouldBeLessThan, 1*time.Second)
So(time.Now().Sub(m.LastEnded.Time), ShouldBeLessThan, 1*time.Second)
}) })
Convey("Update size of a valid mirror", func(ctx C) {
msg := struct {
Name string `json:"name"`
Size string `json:"size"`
}{status.Name, "5GB"}
url := fmt.Sprintf("%s/workers/%s/jobs/%s/size", baseURL, status.Worker, status.Name)
resp, err := PostJSON(url, msg, nil)
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK)
Convey("Get new size of a mirror", func(ctx C) {
var ms []MirrorStatus
resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK)
// err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
m := ms[0]
So(m.Name, ShouldEqual, status.Name)
So(m.Worker, ShouldEqual, status.Worker)
So(m.Status, ShouldEqual, status.Status)
So(m.Upstream, ShouldEqual, status.Upstream)
So(m.Size, ShouldEqual, "5GB")
So(m.IsMaster, ShouldEqual, status.IsMaster)
So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second)
So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
})
})
Convey("Update schedule of valid mirrors", func(ctx C) {
msg := MirrorSchedules{
[]MirrorSchedule{
MirrorSchedule{"arch-sync1", time.Now().Add(time.Minute * 10)},
MirrorSchedule{"arch-sync2", time.Now().Add(time.Minute * 7)},
},
}
url := fmt.Sprintf("%s/workers/%s/schedules", baseURL, status.Worker)
resp, err := PostJSON(url, msg, nil)
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK)
})
Convey("Update size of an invalid mirror", func(ctx C) {
msg := struct {
Name string `json:"name"`
Size string `json:"size"`
}{"Invalid mirror", "5GB"}
url := fmt.Sprintf("%s/workers/%s/jobs/%s/size", baseURL, status.Worker, status.Name)
resp, err := PostJSON(url, msg, nil)
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
})
// what if status changed to failed
status.Status = Failed
time.Sleep(3 * time.Second)
resp, err = PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
So(err, ShouldBeNil)
defer resp.Body.Close()
So(resp.StatusCode, ShouldEqual, http.StatusOK)
Convey("What if syncing job failed", func(ctx C) {
var ms []MirrorStatus
resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK)
// err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
m := ms[0]
So(m.Name, ShouldEqual, status.Name)
So(m.Worker, ShouldEqual, status.Worker)
So(m.Status, ShouldEqual, status.Status)
So(m.Upstream, ShouldEqual, status.Upstream)
So(m.Size, ShouldEqual, status.Size)
So(m.IsMaster, ShouldEqual, status.IsMaster)
So(time.Now().Sub(m.LastUpdate), ShouldBeGreaterThan, 3*time.Second)
So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
})
}) })
Convey("update mirror status of an inexisted worker", func(ctx C) { Convey("update mirror status of an inexisted worker", func(ctx C) {
@@ -149,6 +263,7 @@ func TestHTTPServer(t *testing.T) {
IsMaster: true, IsMaster: true,
Status: Success, Status: Success,
LastUpdate: time.Now(), LastUpdate: time.Now(),
LastEnded: time.Now(),
Upstream: "mirrors.tuna.tsinghua.edu.cn", Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "4GB", Size: "4GB",
} }
@@ -162,6 +277,24 @@ func TestHTTPServer(t *testing.T) {
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(msg[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker) So(msg[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
}) })
Convey("update schedule of an non-existent worker", func(ctx C) {
invalidWorker := "test_worker2"
sch := MirrorSchedules{
[]MirrorSchedule{
MirrorSchedule{"arch-sync1", time.Now().Add(time.Minute * 10)},
MirrorSchedule{"arch-sync2", time.Now().Add(time.Minute * 7)},
},
}
resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/schedules",
baseURL, invalidWorker), sch, nil)
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
defer resp.Body.Close()
var msg map[string]string
err = json.NewDecoder(resp.Body).Decode(&msg)
So(err, ShouldBeNil)
So(msg[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
})
Convey("handle client command", func(ctx C) { Convey("handle client command", func(ctx C) {
cmdChan := make(chan WorkerCmd, 1) cmdChan := make(chan WorkerCmd, 1)
workerServer := makeMockWorkerServer(cmdChan) workerServer := makeMockWorkerServer(cmdChan)
@@ -180,11 +313,11 @@ func TestHTTPServer(t *testing.T) {
// run the mock worker server // run the mock worker server
workerServer.Run(bindAddress) workerServer.Run(bindAddress)
}() }()
time.Sleep(50 * time.Microsecond) time.Sleep(50 * time.Millisecond)
// verify the worker mock server is running // verify the worker mock server is running
workerResp, err := http.Get(workerBaseURL + "/ping") workerResp, err := http.Get(workerBaseURL + "/ping")
defer workerResp.Body.Close()
So(err, ShouldBeNil) So(err, ShouldBeNil)
defer workerResp.Body.Close()
So(workerResp.StatusCode, ShouldEqual, http.StatusOK) So(workerResp.StatusCode, ShouldEqual, http.StatusOK)
Convey("when client send wrong cmd", func(ctx C) { Convey("when client send wrong cmd", func(ctx C) {
@@ -194,8 +327,8 @@ func TestHTTPServer(t *testing.T) {
WorkerID: "not_exist_worker", WorkerID: "not_exist_worker",
} }
resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil) resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil)
defer resp.Body.Close()
So(err, ShouldBeNil) So(err, ShouldBeNil)
defer resp.Body.Close()
So(resp.StatusCode, ShouldEqual, http.StatusBadRequest) So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
}) })
@@ -207,9 +340,8 @@ func TestHTTPServer(t *testing.T) {
} }
resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil) resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil)
defer resp.Body.Close()
So(err, ShouldBeNil) So(err, ShouldBeNil)
defer resp.Body.Close()
So(resp.StatusCode, ShouldEqual, http.StatusOK) So(resp.StatusCode, ShouldEqual, http.StatusOK)
time.Sleep(50 * time.Microsecond) time.Sleep(50 * time.Microsecond)
select { select {
@@ -252,6 +384,11 @@ func (b *mockDBAdapter) GetWorker(workerID string) (WorkerStatus, error) {
return w, nil return w, nil
} }
func (b *mockDBAdapter) DeleteWorker(workerID string) error {
delete(b.workerStore, workerID)
return nil
}
func (b *mockDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) { func (b *mockDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
// _, ok := b.workerStore[w.ID] // _, ok := b.workerStore[w.ID]
// if ok { // if ok {

查看文件

@@ -1,44 +0,0 @@
package manager
import (
"encoding/json"
"testing"
"time"
tunasync "github.com/tuna/tunasync/internal"
. "github.com/smartystreets/goconvey/convey"
)
func TestStatus(t *testing.T) {
Convey("status json ser-de should work", t, func() {
tz := "Asia/Tokyo"
loc, err := time.LoadLocation(tz)
So(err, ShouldBeNil)
t := time.Date(2016, time.April, 16, 23, 8, 10, 0, loc)
m := webMirrorStatus{
Name: "tunalinux",
Status: tunasync.Success,
LastUpdate: textTime{t},
LastUpdateTs: stampTime{t},
Size: "5GB",
Upstream: "rsync://mirrors.tuna.tsinghua.edu.cn/tunalinux/",
}
b, err := json.Marshal(m)
So(err, ShouldBeNil)
//fmt.Println(string(b))
var m2 webMirrorStatus
err = json.Unmarshal(b, &m2)
So(err, ShouldBeNil)
// fmt.Printf("%#v", m2)
So(m2.Name, ShouldEqual, m.Name)
So(m2.Status, ShouldEqual, m.Status)
So(m2.LastUpdate.Unix(), ShouldEqual, m.LastUpdate.Unix())
So(m2.LastUpdateTs.Unix(), ShouldEqual, m.LastUpdate.Unix())
So(m2.LastUpdate.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
So(m2.LastUpdateTs.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
So(m2.Size, ShouldEqual, m.Size)
So(m2.Upstream, ShouldEqual, m.Upstream)
})
}

查看文件

@@ -15,13 +15,12 @@ type baseProvider struct {
ctx *Context ctx *Context
name string name string
interval time.Duration interval time.Duration
retry int
isMaster bool isMaster bool
cmd *cmdJob cmd *cmdJob
isRunning atomic.Value isRunning atomic.Value
logFile *os.File
cgroup *cgroupHook cgroup *cgroupHook
zfs *zfsHook zfs *zfsHook
docker *dockerHook docker *dockerHook
@@ -52,6 +51,10 @@ func (p *baseProvider) Interval() time.Duration {
return p.interval return p.interval
} }
func (p *baseProvider) Retry() int {
return p.retry
}
func (p *baseProvider) IsMaster() bool { func (p *baseProvider) IsMaster() bool {
return p.isMaster return p.isMaster
} }
@@ -111,20 +114,21 @@ func (p *baseProvider) Docker() *dockerHook {
return p.docker return p.docker
} }
func (p *baseProvider) prepareLogFile() error { func (p *baseProvider) prepareLogFile(append bool) error {
if p.LogFile() == "/dev/null" { if p.LogFile() == "/dev/null" {
p.cmd.SetLogFile(nil) p.cmd.SetLogFile(nil)
return nil return nil
} }
if p.logFile == nil { appendMode := 0
logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE, 0644) if append {
if err != nil { appendMode = os.O_APPEND
logger.Errorf("Error opening logfile %s: %s", p.LogFile(), err.Error())
return err
}
p.logFile = logFile
} }
p.cmd.SetLogFile(p.logFile) logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE|appendMode, 0644)
if err != nil {
logger.Errorf("Error opening logfile %s: %s", p.LogFile(), err.Error())
return err
}
p.cmd.SetLogFile(logFile)
return nil return nil
} }
@@ -143,32 +147,26 @@ func (p *baseProvider) IsRunning() bool {
func (p *baseProvider) Wait() error { func (p *baseProvider) Wait() error {
defer func() { defer func() {
p.Lock() logger.Debugf("set isRunning to false: %s", p.Name())
p.isRunning.Store(false) p.isRunning.Store(false)
if p.logFile != nil {
p.logFile.Close()
p.logFile = nil
}
p.Unlock()
}() }()
logger.Debugf("calling Wait: %s", p.Name())
return p.cmd.Wait() return p.cmd.Wait()
} }
func (p *baseProvider) Terminate() error { func (p *baseProvider) Terminate() error {
p.Lock()
defer p.Unlock()
logger.Debugf("terminating provider: %s", p.Name()) logger.Debugf("terminating provider: %s", p.Name())
if !p.IsRunning() { if !p.IsRunning() {
return nil return nil
} }
p.Lock()
if p.logFile != nil {
p.logFile.Close()
p.logFile = nil
}
p.Unlock()
err := p.cmd.Terminate() err := p.cmd.Terminate()
p.isRunning.Store(false)
return err return err
} }
func (p *baseProvider) DataSize() string {
return ""
}

查看文件

@@ -0,0 +1,90 @@
package worker
import (
"fmt"
"os"
"path/filepath"
"github.com/dennwc/btrfs"
)
type btrfsSnapshotHook struct {
provider mirrorProvider
mirrorSnapshotPath string
}
// the user who runs the jobs (typically `tunasync`) should be granted the permission to run btrfs commands
// TODO: check if the filesystem is Btrfs
func newBtrfsSnapshotHook(provider mirrorProvider, snapshotPath string, mirror mirrorConfig) *btrfsSnapshotHook {
mirrorSnapshotPath := mirror.SnapshotPath
if mirrorSnapshotPath == "" {
mirrorSnapshotPath = filepath.Join(snapshotPath, provider.Name())
}
return &btrfsSnapshotHook{
provider: provider,
mirrorSnapshotPath: mirrorSnapshotPath,
}
}
// check if path `snapshotPath/providerName` exists
// Case 1: Not exists => create a new subvolume
// Case 2: Exists as a subvolume => nothing to do
// Case 3: Exists as a directory => error detected
func (h *btrfsSnapshotHook) preJob() error {
path := h.provider.WorkingDir()
if _, err := os.Stat(path); os.IsNotExist(err) {
// create subvolume
err := btrfs.CreateSubVolume(path)
if err != nil {
logger.Errorf("failed to create Btrfs subvolume %s: %s", path, err.Error())
return err
}
logger.Noticef("created new Btrfs subvolume %s", path)
} else {
if is, err := btrfs.IsSubVolume(path); err != nil {
return err
} else if !is {
return fmt.Errorf("path %s exists but isn't a Btrfs subvolume", path)
}
}
return nil
}
func (h *btrfsSnapshotHook) preExec() error {
return nil
}
func (h *btrfsSnapshotHook) postExec() error {
return nil
}
// delete old snapshot if exists, then create a new snapshot
func (h *btrfsSnapshotHook) postSuccess() error {
if _, err := os.Stat(h.mirrorSnapshotPath); !os.IsNotExist(err) {
isSubVol, err := btrfs.IsSubVolume(h.mirrorSnapshotPath)
if err != nil {
return err
} else if !isSubVol {
return fmt.Errorf("path %s exists and isn't a Btrfs snapshot", h.mirrorSnapshotPath)
}
// is old snapshot => delete it
if err := btrfs.DeleteSubVolume(h.mirrorSnapshotPath); err != nil {
logger.Errorf("failed to delete old Btrfs snapshot %s", h.mirrorSnapshotPath)
return err
}
logger.Noticef("deleted old snapshot %s", h.mirrorSnapshotPath)
}
// create a new writable snapshot
// (the snapshot is writable so that it can be deleted easily)
if err := btrfs.SnapshotSubVolume(h.provider.WorkingDir(), h.mirrorSnapshotPath, false); err != nil {
logger.Errorf("failed to create new Btrfs snapshot %s", h.mirrorSnapshotPath)
return err
}
logger.Noticef("created new Btrfs snapshot %s", h.mirrorSnapshotPath)
return nil
}
// keep the old snapshot => nothing to do
func (h *btrfsSnapshotHook) postFail() error {
return nil
}

查看文件

@@ -17,7 +17,6 @@ import (
type cgroupHook struct { type cgroupHook struct {
emptyHook emptyHook
provider mirrorProvider
basePath string basePath string
baseGroup string baseGroup string
created bool created bool
@@ -36,7 +35,9 @@ func newCgroupHook(p mirrorProvider, basePath, baseGroup, subsystem, memLimit st
subsystem = "cpu" subsystem = "cpu"
} }
return &cgroupHook{ return &cgroupHook{
provider: p, emptyHook: emptyHook{
provider: p,
},
basePath: basePath, basePath: basePath,
baseGroup: baseGroup, baseGroup: baseGroup,
subsystem: subsystem, subsystem: subsystem,

查看文件

@@ -1,6 +1,7 @@
package worker package worker
import ( import (
"errors"
"time" "time"
"github.com/anmitsu/go-shlex" "github.com/anmitsu/go-shlex"
@@ -11,6 +12,7 @@ type cmdConfig struct {
upstreamURL, command string upstreamURL, command string
workingDir, logDir, logFile string workingDir, logDir, logFile string
interval time.Duration interval time.Duration
retry int
env map[string]string env map[string]string
} }
@@ -22,11 +24,15 @@ type cmdProvider struct {
func newCmdProvider(c cmdConfig) (*cmdProvider, error) { func newCmdProvider(c cmdConfig) (*cmdProvider, error) {
// TODO: check config options // TODO: check config options
if c.retry == 0 {
c.retry = defaultMaxRetry
}
provider := &cmdProvider{ provider := &cmdProvider{
baseProvider: baseProvider{ baseProvider: baseProvider{
name: c.name, name: c.name,
ctx: NewContext(), ctx: NewContext(),
interval: c.interval, interval: c.interval,
retry: c.retry,
}, },
cmdConfig: c, cmdConfig: c,
} }
@@ -60,17 +66,25 @@ func (p *cmdProvider) Run() error {
} }
func (p *cmdProvider) Start() error { func (p *cmdProvider) Start() error {
p.Lock()
defer p.Unlock()
if p.IsRunning() {
return errors.New("provider is currently running")
}
env := map[string]string{ env := map[string]string{
"TUNASYNC_MIRROR_NAME": p.Name(), "TUNASYNC_MIRROR_NAME": p.Name(),
"TUNASYNC_WORKING_DIR": p.WorkingDir(), "TUNASYNC_WORKING_DIR": p.WorkingDir(),
"TUNASYNC_UPSTREAM_URL": p.upstreamURL, "TUNASYNC_UPSTREAM_URL": p.upstreamURL,
"TUNASYNC_LOG_DIR": p.LogDir(),
"TUNASYNC_LOG_FILE": p.LogFile(), "TUNASYNC_LOG_FILE": p.LogFile(),
} }
for k, v := range p.env { for k, v := range p.env {
env[k] = v env[k] = v
} }
p.cmd = newCmdJob(p, p.command, p.WorkingDir(), env) p.cmd = newCmdJob(p, p.command, p.WorkingDir(), env)
if err := p.prepareLogFile(); err != nil { if err := p.prepareLogFile(false); err != nil {
return err return err
} }

查看文件

@@ -8,6 +8,6 @@ import (
type empty struct{} type empty struct{}
const maxRetry = 2 const defaultMaxRetry = 2
var logger = logging.MustGetLogger("tunasync") var logger = logging.MustGetLogger("tunasync")

查看文件

@@ -33,14 +33,15 @@ func (p *providerEnum) UnmarshalText(text []byte) error {
// Config represents worker config options // Config represents worker config options
type Config struct { type Config struct {
Global globalConfig `toml:"global"` Global globalConfig `toml:"global"`
Manager managerConfig `toml:"manager"` Manager managerConfig `toml:"manager"`
Server serverConfig `toml:"server"` Server serverConfig `toml:"server"`
Cgroup cgroupConfig `toml:"cgroup"` Cgroup cgroupConfig `toml:"cgroup"`
ZFS zfsConfig `toml:"zfs"` ZFS zfsConfig `toml:"zfs"`
Docker dockerConfig `toml:"docker"` BtrfsSnapshot btrfsSnapshotConfig `toml:"btrfs_snapshot"`
Include includeConfig `toml:"include"` Docker dockerConfig `toml:"docker"`
Mirrors []mirrorConfig `toml:"mirrors"` Include includeConfig `toml:"include"`
Mirrors []mirrorConfig `toml:"mirrors"`
} }
type globalConfig struct { type globalConfig struct {
@@ -49,6 +50,7 @@ type globalConfig struct {
MirrorDir string `toml:"mirror_dir"` MirrorDir string `toml:"mirror_dir"`
Concurrent int `toml:"concurrent"` Concurrent int `toml:"concurrent"`
Interval int `toml:"interval"` Interval int `toml:"interval"`
Retry int `toml:"retry"`
ExecOnSuccess []string `toml:"exec_on_success"` ExecOnSuccess []string `toml:"exec_on_success"`
ExecOnFailure []string `toml:"exec_on_failure"` ExecOnFailure []string `toml:"exec_on_failure"`
@@ -95,6 +97,11 @@ type zfsConfig struct {
Zpool string `toml:"zpool"` Zpool string `toml:"zpool"`
} }
type btrfsSnapshotConfig struct {
Enable bool `toml:"enable"`
SnapshotPath string `toml:"snapshot_path"`
}
type includeConfig struct { type includeConfig struct {
IncludeMirrors string `toml:"include_mirrors"` IncludeMirrors string `toml:"include_mirrors"`
} }
@@ -108,6 +115,7 @@ type mirrorConfig struct {
Provider providerEnum `toml:"provider"` Provider providerEnum `toml:"provider"`
Upstream string `toml:"upstream"` Upstream string `toml:"upstream"`
Interval int `toml:"interval"` Interval int `toml:"interval"`
Retry int `toml:"retry"`
MirrorDir string `toml:"mirror_dir"` MirrorDir string `toml:"mirror_dir"`
LogDir string `toml:"log_dir"` LogDir string `toml:"log_dir"`
Env map[string]string `toml:"env"` Env map[string]string `toml:"env"`
@@ -123,6 +131,7 @@ type mirrorConfig struct {
Command string `toml:"command"` Command string `toml:"command"`
UseIPv6 bool `toml:"use_ipv6"` UseIPv6 bool `toml:"use_ipv6"`
UseIPv4 bool `toml:"use_ipv4"`
ExcludeFile string `toml:"exclude_file"` ExcludeFile string `toml:"exclude_file"`
Username string `toml:"username"` Username string `toml:"username"`
Password string `toml:"password"` Password string `toml:"password"`
@@ -133,6 +142,8 @@ type mirrorConfig struct {
DockerImage string `toml:"docker_image"` DockerImage string `toml:"docker_image"`
DockerVolumes []string `toml:"docker_volumes"` DockerVolumes []string `toml:"docker_volumes"`
DockerOptions []string `toml:"docker_options"` DockerOptions []string `toml:"docker_options"`
SnapshotPath string `toml:"snapshot_path"`
} }
// LoadConfig loads configuration // LoadConfig loads configuration

查看文件

@@ -18,6 +18,7 @@ log_dir = "/var/log/tunasync/{{.Name}}"
mirror_dir = "/data/mirrors" mirror_dir = "/data/mirrors"
concurrent = 10 concurrent = 10
interval = 240 interval = 240
retry = 3
[manager] [manager]
api_base = "https://127.0.0.1:5000" api_base = "https://127.0.0.1:5000"
@@ -35,6 +36,7 @@ name = "AOSP"
provider = "command" provider = "command"
upstream = "https://aosp.google.com/" upstream = "https://aosp.google.com/"
interval = 720 interval = 720
retry = 2
mirror_dir = "/data/git/AOSP" mirror_dir = "/data/git/AOSP"
exec_on_success = [ exec_on_success = [
"bash -c 'echo ${TUNASYNC_JOB_EXIT_STATUS} > ${TUNASYNC_WORKING_DIR}/exit_status'" "bash -c 'echo ${TUNASYNC_JOB_EXIT_STATUS} > ${TUNASYNC_WORKING_DIR}/exit_status'"
@@ -116,6 +118,7 @@ use_ipv6 = true
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(cfg.Global.Name, ShouldEqual, "test_worker") So(cfg.Global.Name, ShouldEqual, "test_worker")
So(cfg.Global.Interval, ShouldEqual, 240) So(cfg.Global.Interval, ShouldEqual, 240)
So(cfg.Global.Retry, ShouldEqual, 3)
So(cfg.Global.MirrorDir, ShouldEqual, "/data/mirrors") So(cfg.Global.MirrorDir, ShouldEqual, "/data/mirrors")
So(cfg.Manager.APIBase, ShouldEqual, "https://127.0.0.1:5000") So(cfg.Manager.APIBase, ShouldEqual, "https://127.0.0.1:5000")
@@ -126,6 +129,7 @@ use_ipv6 = true
So(m.MirrorDir, ShouldEqual, "/data/git/AOSP") So(m.MirrorDir, ShouldEqual, "/data/git/AOSP")
So(m.Provider, ShouldEqual, provCommand) So(m.Provider, ShouldEqual, provCommand)
So(m.Interval, ShouldEqual, 720) So(m.Interval, ShouldEqual, 720)
So(m.Retry, ShouldEqual, 2)
So(m.Env["REPO"], ShouldEqual, "/usr/local/bin/aosp-repo") So(m.Env["REPO"], ShouldEqual, "/usr/local/bin/aosp-repo")
m = cfg.Mirrors[1] m = cfg.Mirrors[1]

查看文件

@@ -7,10 +7,9 @@ import (
type dockerHook struct { type dockerHook struct {
emptyHook emptyHook
provider mirrorProvider image string
image string volumes []string
volumes []string options []string
options []string
} }
func newDockerHook(p mirrorProvider, gCfg dockerConfig, mCfg mirrorConfig) *dockerHook { func newDockerHook(p mirrorProvider, gCfg dockerConfig, mCfg mirrorConfig) *dockerHook {
@@ -23,15 +22,18 @@ func newDockerHook(p mirrorProvider, gCfg dockerConfig, mCfg mirrorConfig) *dock
options = append(options, mCfg.DockerOptions...) options = append(options, mCfg.DockerOptions...)
return &dockerHook{ return &dockerHook{
provider: p, emptyHook: emptyHook{
image: mCfg.DockerImage, provider: p,
volumes: volumes, },
options: options, image: mCfg.DockerImage,
volumes: volumes,
options: options,
} }
} }
func (d *dockerHook) preExec() error { func (d *dockerHook) preExec() error {
p := d.provider p := d.provider
logDir := p.LogDir()
logFile := p.LogFile() logFile := p.LogFile()
workingDir := p.WorkingDir() workingDir := p.WorkingDir()
@@ -42,17 +44,13 @@ func (d *dockerHook) preExec() error {
} }
} }
logFileNew := "/log_latest"
workingDirNew := "/data"
// Override workingDir // Override workingDir
ctx := p.EnterContext() ctx := p.EnterContext()
ctx.Set(_WorkingDirKey, workingDirNew)
ctx.Set(_LogFileKey+":docker", logFileNew)
ctx.Set( ctx.Set(
"volumes", []string{ "volumes", []string{
fmt.Sprintf("%s:%s", logFile, logFileNew), fmt.Sprintf("%s:%s", logDir, logDir),
fmt.Sprintf("%s:%s", workingDir, workingDirNew), fmt.Sprintf("%s:%s", logFile, logFile),
fmt.Sprintf("%s:%s", workingDir, workingDir),
}, },
) )
return nil return nil

查看文件

@@ -55,8 +55,10 @@ sleep 10
So(err, ShouldBeNil) So(err, ShouldBeNil)
d := &dockerHook{ d := &dockerHook{
provider: provider, emptyHook: emptyHook{
image: "alpine", provider: provider,
},
image: "alpine",
volumes: []string{ volumes: []string{
fmt.Sprintf("%s:%s", cmdScript, "/bin/cmd.sh"), fmt.Sprintf("%s:%s", cmdScript, "/bin/cmd.sh"),
}, },

查看文件

@@ -18,7 +18,6 @@ const (
type execPostHook struct { type execPostHook struct {
emptyHook emptyHook
provider mirrorProvider
// exec on success or on failure // exec on success or on failure
execOn uint8 execOn uint8
@@ -37,9 +36,11 @@ func newExecPostHook(provider mirrorProvider, execOn uint8, command string) (*ex
} }
return &execPostHook{ return &execPostHook{
provider: provider, emptyHook: emptyHook{
execOn: execOn, provider: provider,
command: cmd, },
execOn: execOn,
command: cmd,
}, nil }, nil
} }
@@ -71,6 +72,7 @@ func (h *execPostHook) Do() error {
"TUNASYNC_MIRROR_NAME": p.Name(), "TUNASYNC_MIRROR_NAME": p.Name(),
"TUNASYNC_WORKING_DIR": p.WorkingDir(), "TUNASYNC_WORKING_DIR": p.WorkingDir(),
"TUNASYNC_UPSTREAM_URL": p.Upstream(), "TUNASYNC_UPSTREAM_URL": p.Upstream(),
"TUNASYNC_LOG_DIR": p.LogDir(),
"TUNASYNC_LOG_FILE": p.LogFile(), "TUNASYNC_LOG_FILE": p.LogFile(),
"TUNASYNC_JOB_EXIT_STATUS": exitStatus, "TUNASYNC_JOB_EXIT_STATUS": exitStatus,
} }

查看文件

@@ -92,7 +92,7 @@ exit 1
job.ctrlChan <- jobStart job.ctrlChan <- jobStart
msg := <-managerChan msg := <-managerChan
So(msg.status, ShouldEqual, PreSyncing) So(msg.status, ShouldEqual, PreSyncing)
for i := 0; i < maxRetry; i++ { for i := 0; i < defaultMaxRetry; i++ {
msg = <-managerChan msg = <-managerChan
So(msg.status, ShouldEqual, Syncing) So(msg.status, ShouldEqual, Syncing)
msg = <-managerChan msg = <-managerChan

查看文件

@@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
tunasync "github.com/tuna/tunasync/internal" tunasync "github.com/tuna/tunasync/internal"
) )
@@ -14,12 +15,13 @@ import (
type ctrlAction uint8 type ctrlAction uint8
const ( const (
jobStart ctrlAction = iota jobStart ctrlAction = iota
jobStop // stop syncing keep the job jobStop // stop syncing keep the job
jobDisable // disable the job (stops goroutine) jobDisable // disable the job (stops goroutine)
jobRestart // restart syncing jobRestart // restart syncing
jobPing // ensure the goroutine is alive jobPing // ensure the goroutine is alive
jobHalt // worker halts jobHalt // worker halts
jobForceStart // ignore concurrent limit
) )
type jobMessage struct { type jobMessage struct {
@@ -51,6 +53,7 @@ type mirrorJob struct {
ctrlChan chan ctrlAction ctrlChan chan ctrlAction
disabled chan empty disabled chan empty
state uint32 state uint32
size string
} }
func newMirrorJob(provider mirrorProvider) *mirrorJob { func newMirrorJob(provider mirrorProvider) *mirrorJob {
@@ -110,7 +113,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
managerChan <- jobMessage{ managerChan <- jobMessage{
tunasync.Failed, m.Name(), tunasync.Failed, m.Name(),
fmt.Sprintf("error exec hook %s: %s", hookname, err.Error()), fmt.Sprintf("error exec hook %s: %s", hookname, err.Error()),
false, true,
} }
return err return err
} }
@@ -136,7 +139,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
return err return err
} }
for retry := 0; retry < maxRetry; retry++ { for retry := 0; retry < provider.Retry(); retry++ {
stopASAP := false // stop job as soon as possible stopASAP := false // stop job as soon as possible
if retry > 0 { if retry > 0 {
@@ -154,9 +157,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
syncDone := make(chan error, 1) syncDone := make(chan error, 1)
go func() { go func() {
err := provider.Run() err := provider.Run()
if !stopASAP { syncDone <- err
syncDone <- err
}
}() }()
select { select {
@@ -182,26 +183,33 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
if syncErr == nil { if syncErr == nil {
// syncing success // syncing success
logger.Noticef("succeeded syncing %s", m.Name()) logger.Noticef("succeeded syncing %s", m.Name())
managerChan <- jobMessage{tunasync.Success, m.Name(), "", (m.State() == stateReady)}
// post-success hooks // post-success hooks
logger.Debug("post-success hooks")
err := runHooks(rHooks, func(h jobHook) error { return h.postSuccess() }, "post-success") err := runHooks(rHooks, func(h jobHook) error { return h.postSuccess() }, "post-success")
if err != nil { if err != nil {
return err return err
} }
return nil } else {
// syncing failed
logger.Warningf("failed syncing %s: %s", m.Name(), syncErr.Error())
// post-fail hooks
logger.Debug("post-fail hooks")
err := runHooks(rHooks, func(h jobHook) error { return h.postFail() }, "post-fail")
if err != nil {
return err
}
}
if syncErr == nil {
// syncing success
m.size = provider.DataSize()
managerChan <- jobMessage{tunasync.Success, m.Name(), "", (m.State() == stateReady)}
return nil
} }
// syncing failed // syncing failed
logger.Warningf("failed syncing %s: %s", m.Name(), syncErr.Error()) managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error(), (retry == provider.Retry()-1) && (m.State() == stateReady)}
managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error(), (retry == maxRetry-1) && (m.State() == stateReady)}
// post-fail hooks
logger.Debug("post-fail hooks")
err = runHooks(rHooks, func(h jobHook) error { return h.postFail() }, "post-fail")
if err != nil {
return err
}
// gracefully exit // gracefully exit
if stopASAP { if stopASAP {
logger.Debug("No retry, exit directly") logger.Debug("No retry, exit directly")
@@ -212,22 +220,26 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
return nil return nil
} }
runJob := func(kill <-chan empty, jobDone chan<- empty) { runJob := func(kill <-chan empty, jobDone chan<- empty, bypassSemaphore <-chan empty) {
select { select {
case semaphore <- empty{}: case semaphore <- empty{}:
defer func() { <-semaphore }() defer func() { <-semaphore }()
runJobWrapper(kill, jobDone) runJobWrapper(kill, jobDone)
case <-bypassSemaphore:
logger.Noticef("Concurrent limit ignored by %s", m.Name())
runJobWrapper(kill, jobDone)
case <-kill: case <-kill:
jobDone <- empty{} jobDone <- empty{}
return return
} }
} }
bypassSemaphore := make(chan empty, 1)
for { for {
if m.State() == stateReady { if m.State() == stateReady {
kill := make(chan empty) kill := make(chan empty)
jobDone := make(chan empty) jobDone := make(chan empty)
go runJob(kill, jobDone) go runJob(kill, jobDone, bypassSemaphore)
_wait_for_job: _wait_for_job:
select { select {
@@ -248,7 +260,14 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
m.SetState(stateReady) m.SetState(stateReady)
close(kill) close(kill)
<-jobDone <-jobDone
time.Sleep(time.Second) // Restart may fail if the process was not exited yet
continue continue
case jobForceStart:
select { //non-blocking
default:
case bypassSemaphore <- empty{}:
}
fallthrough
case jobStart: case jobStart:
m.SetState(stateReady) m.SetState(stateReady)
goto _wait_for_job goto _wait_for_job
@@ -272,8 +291,14 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
case jobDisable: case jobDisable:
m.SetState(stateDisabled) m.SetState(stateDisabled)
return nil return nil
case jobForceStart:
select { //non-blocking
default:
case bypassSemaphore <- empty{}:
}
fallthrough
case jobRestart: case jobRestart:
m.SetState(stateReady) fallthrough
case jobStart: case jobStart:
m.SetState(stateReady) m.SetState(stateReady)
default: default:

查看文件

@@ -135,6 +135,8 @@ echo $TUNASYNC_WORKING_DIR
msg = <-managerChan msg = <-managerChan
So(msg.status, ShouldEqual, Syncing) So(msg.status, ShouldEqual, Syncing)
job.ctrlChan <- jobStart // should be ignored
job.ctrlChan <- jobStop job.ctrlChan <- jobStop
msg = <-managerChan msg = <-managerChan
@@ -170,8 +172,239 @@ echo $TUNASYNC_WORKING_DIR
job.ctrlChan <- jobDisable job.ctrlChan <- jobDisable
<-job.disabled <-job.disabled
}) })
Convey("If we restart it", func(ctx C) {
go job.Run(managerChan, semaphore)
job.ctrlChan <- jobStart
msg := <-managerChan
So(msg.status, ShouldEqual, PreSyncing)
msg = <-managerChan
So(msg.status, ShouldEqual, Syncing)
job.ctrlChan <- jobRestart
msg = <-managerChan
So(msg.status, ShouldEqual, Failed)
So(msg.msg, ShouldEqual, "killed by manager")
msg = <-managerChan
So(msg.status, ShouldEqual, PreSyncing)
msg = <-managerChan
So(msg.status, ShouldEqual, Syncing)
msg = <-managerChan
So(msg.status, ShouldEqual, Success)
expectedOutput := fmt.Sprintf(
"%s\n%s\n",
provider.WorkingDir(), provider.WorkingDir(),
)
loggedContent, err := ioutil.ReadFile(provider.LogFile())
So(err, ShouldBeNil)
So(string(loggedContent), ShouldEqual, expectedOutput)
job.ctrlChan <- jobDisable
<-job.disabled
})
Convey("If we disable it", func(ctx C) {
go job.Run(managerChan, semaphore)
job.ctrlChan <- jobStart
msg := <-managerChan
So(msg.status, ShouldEqual, PreSyncing)
msg = <-managerChan
So(msg.status, ShouldEqual, Syncing)
job.ctrlChan <- jobDisable
msg = <-managerChan
So(msg.status, ShouldEqual, Failed)
So(msg.msg, ShouldEqual, "killed by manager")
<-job.disabled
})
Convey("If we stop it twice, than start it", func(ctx C) {
go job.Run(managerChan, semaphore)
job.ctrlChan <- jobStart
msg := <-managerChan
So(msg.status, ShouldEqual, PreSyncing)
msg = <-managerChan
So(msg.status, ShouldEqual, Syncing)
job.ctrlChan <- jobStop
msg = <-managerChan
So(msg.status, ShouldEqual, Failed)
So(msg.msg, ShouldEqual, "killed by manager")
job.ctrlChan <- jobStop // should be ignored
job.ctrlChan <- jobStart
msg = <-managerChan
So(msg.status, ShouldEqual, PreSyncing)
msg = <-managerChan
So(msg.status, ShouldEqual, Syncing)
msg = <-managerChan
So(msg.status, ShouldEqual, Success)
expectedOutput := fmt.Sprintf(
"%s\n%s\n",
provider.WorkingDir(), provider.WorkingDir(),
)
loggedContent, err := ioutil.ReadFile(provider.LogFile())
So(err, ShouldBeNil)
So(string(loggedContent), ShouldEqual, expectedOutput)
job.ctrlChan <- jobDisable
<-job.disabled
})
}) })
}) })
} }
func TestConcurrentMirrorJobs(t *testing.T) {
InitLogger(true, true, false)
Convey("Concurrent MirrorJobs should work", t, func(ctx C) {
tmpDir, err := ioutil.TempDir("", "tunasync")
defer os.RemoveAll(tmpDir)
So(err, ShouldBeNil)
const CONCURRENT = 5
var providers [CONCURRENT]*cmdProvider
var jobs [CONCURRENT]*mirrorJob
for i := 0; i < CONCURRENT; i++ {
c := cmdConfig{
name: fmt.Sprintf("job-%d", i),
upstreamURL: "http://mirrors.tuna.moe/",
command: "sleep 2",
workingDir: tmpDir,
logDir: tmpDir,
logFile: "/dev/null",
interval: 10 * time.Second,
}
var err error
providers[i], err = newCmdProvider(c)
So(err, ShouldBeNil)
jobs[i] = newMirrorJob(providers[i])
}
managerChan := make(chan jobMessage, 10)
semaphore := make(chan empty, CONCURRENT-2)
countingJobs := func(managerChan chan jobMessage, totalJobs, concurrentCheck int) (peakConcurrent, counterFailed int) {
counterEnded := 0
counterRunning := 0
peakConcurrent = 0
counterFailed = 0
for counterEnded < totalJobs {
msg := <-managerChan
switch msg.status {
case PreSyncing:
counterRunning++
case Syncing:
case Failed:
counterFailed++
fallthrough
case Success:
counterEnded++
counterRunning--
default:
So(0, ShouldEqual, 1)
}
// Test if semaphore works
So(counterRunning, ShouldBeLessThanOrEqualTo, concurrentCheck)
if counterRunning > peakConcurrent {
peakConcurrent = counterRunning
}
}
// select {
// case msg := <-managerChan:
// logger.Errorf("extra message received: %v", msg)
// So(0, ShouldEqual, 1)
// case <-time.After(2 * time.Second):
// }
return
}
Convey("When we run them all", func(ctx C) {
for _, job := range jobs {
go job.Run(managerChan, semaphore)
job.ctrlChan <- jobStart
}
peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT, CONCURRENT-2)
So(peakConcurrent, ShouldEqual, CONCURRENT-2)
So(counterFailed, ShouldEqual, 0)
for _, job := range jobs {
job.ctrlChan <- jobDisable
<-job.disabled
}
})
Convey("If we cancel one job", func(ctx C) {
for _, job := range jobs {
go job.Run(managerChan, semaphore)
job.ctrlChan <- jobRestart
time.Sleep(200 * time.Millisecond)
}
// Cancel the one waiting for semaphore
jobs[len(jobs)-1].ctrlChan <- jobStop
peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT-1, CONCURRENT-2)
So(peakConcurrent, ShouldEqual, CONCURRENT-2)
So(counterFailed, ShouldEqual, 0)
for _, job := range jobs {
job.ctrlChan <- jobDisable
<-job.disabled
}
})
Convey("If we override the concurrent limit", func(ctx C) {
for _, job := range jobs {
go job.Run(managerChan, semaphore)
job.ctrlChan <- jobStart
time.Sleep(200 * time.Millisecond)
}
jobs[len(jobs)-1].ctrlChan <- jobForceStart
jobs[len(jobs)-2].ctrlChan <- jobForceStart
peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT, CONCURRENT)
So(peakConcurrent, ShouldEqual, CONCURRENT)
So(counterFailed, ShouldEqual, 0)
time.Sleep(1 * time.Second)
// fmt.Println("Restart them")
for _, job := range jobs {
job.ctrlChan <- jobStart
}
peakConcurrent, counterFailed = countingJobs(managerChan, CONCURRENT, CONCURRENT-2)
So(peakConcurrent, ShouldEqual, CONCURRENT-2)
So(counterFailed, ShouldEqual, 0)
for _, job := range jobs {
job.ctrlChan <- jobDisable
<-job.disabled
}
})
})
}

查看文件

@@ -14,12 +14,13 @@ import (
type logLimiter struct { type logLimiter struct {
emptyHook emptyHook
provider mirrorProvider
} }
func newLogLimiter(provider mirrorProvider) *logLimiter { func newLogLimiter(provider mirrorProvider) *logLimiter {
return &logLimiter{ return &logLimiter{
provider: provider, emptyHook: emptyHook{
provider: provider,
},
} }
} }

查看文件

@@ -45,11 +45,13 @@ type mirrorProvider interface {
Hooks() []jobHook Hooks() []jobHook
Interval() time.Duration Interval() time.Duration
Retry() int
WorkingDir() string WorkingDir() string
LogDir() string LogDir() string
LogFile() string LogFile() string
IsMaster() bool IsMaster() bool
DataSize() string
// enter context // enter context
EnterContext() *Context EnterContext() *Context
@@ -86,6 +88,9 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
if mirror.Interval == 0 { if mirror.Interval == 0 {
mirror.Interval = cfg.Global.Interval mirror.Interval = cfg.Global.Interval
} }
if mirror.Retry == 0 {
mirror.Retry = cfg.Global.Retry
}
logDir = formatLogDir(logDir, mirror) logDir = formatLogDir(logDir, mirror)
// IsMaster // IsMaster
@@ -110,13 +115,14 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
logDir: logDir, logDir: logDir,
logFile: filepath.Join(logDir, "latest.log"), logFile: filepath.Join(logDir, "latest.log"),
interval: time.Duration(mirror.Interval) * time.Minute, interval: time.Duration(mirror.Interval) * time.Minute,
retry: mirror.Retry,
env: mirror.Env, env: mirror.Env,
} }
p, err := newCmdProvider(pc) p, err := newCmdProvider(pc)
p.isMaster = isMaster
if err != nil { if err != nil {
panic(err) panic(err)
} }
p.isMaster = isMaster
provider = p provider = p
case provRsync: case provRsync:
rc := rsyncConfig{ rc := rsyncConfig{
@@ -130,13 +136,15 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
logDir: logDir, logDir: logDir,
logFile: filepath.Join(logDir, "latest.log"), logFile: filepath.Join(logDir, "latest.log"),
useIPv6: mirror.UseIPv6, useIPv6: mirror.UseIPv6,
useIPv4: mirror.UseIPv4,
interval: time.Duration(mirror.Interval) * time.Minute, interval: time.Duration(mirror.Interval) * time.Minute,
retry: mirror.Retry,
} }
p, err := newRsyncProvider(rc) p, err := newRsyncProvider(rc)
p.isMaster = isMaster
if err != nil { if err != nil {
panic(err) panic(err)
} }
p.isMaster = isMaster
provider = p provider = p
case provTwoStageRsync: case provTwoStageRsync:
rc := twoStageRsyncConfig{ rc := twoStageRsyncConfig{
@@ -152,12 +160,13 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
logFile: filepath.Join(logDir, "latest.log"), logFile: filepath.Join(logDir, "latest.log"),
useIPv6: mirror.UseIPv6, useIPv6: mirror.UseIPv6,
interval: time.Duration(mirror.Interval) * time.Minute, interval: time.Duration(mirror.Interval) * time.Minute,
retry: mirror.Retry,
} }
p, err := newTwoStageRsyncProvider(rc) p, err := newTwoStageRsyncProvider(rc)
p.isMaster = isMaster
if err != nil { if err != nil {
panic(err) panic(err)
} }
p.isMaster = isMaster
provider = p provider = p
default: default:
panic(errors.New("Invalid mirror provider")) panic(errors.New("Invalid mirror provider"))
@@ -171,6 +180,11 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
provider.AddHook(newZfsHook(provider, cfg.ZFS.Zpool)) provider.AddHook(newZfsHook(provider, cfg.ZFS.Zpool))
} }
// Add Btrfs Snapshot Hook
if cfg.BtrfsSnapshot.Enable {
provider.AddHook(newBtrfsSnapshotHook(provider, cfg.BtrfsSnapshot.SnapshotPath, mirror))
}
// Add Docker Hook // Add Docker Hook
if cfg.Docker.Enable && len(mirror.DockerImage) > 0 { if cfg.Docker.Enable && len(mirror.DockerImage) > 0 {
provider.AddHook(newDockerHook(provider, cfg.Docker, mirror)) provider.AddHook(newDockerHook(provider, cfg.Docker, mirror))

查看文件

@@ -73,17 +73,20 @@ func TestRsyncProvider(t *testing.T) {
echo "syncing to $(pwd)" echo "syncing to $(pwd)"
echo $RSYNC_PASSWORD $@ echo $RSYNC_PASSWORD $@
sleep 1 sleep 1
echo "Total file size: 1.33T bytes"
echo "Done" echo "Done"
exit 0 exit 0
` `
err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755) err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
So(err, ShouldBeNil) So(err, ShouldBeNil)
targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir())
expectedOutput := fmt.Sprintf( expectedOutput := fmt.Sprintf(
"syncing to %s\n"+ "syncing to %s\n"+
"%s\n"+ "%s\n"+
"Total file size: 1.33T bytes\n"+
"Done\n", "Done\n",
provider.WorkingDir(), targetDir,
fmt.Sprintf( fmt.Sprintf(
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+ "-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
"--delete --delete-after --delay-updates --safe-links "+ "--delete --delete-after --delay-updates --safe-links "+
@@ -98,6 +101,7 @@ exit 0
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(string(loggedContent), ShouldEqual, expectedOutput) So(string(loggedContent), ShouldEqual, expectedOutput)
// fmt.Println(string(loggedContent)) // fmt.Println(string(loggedContent))
So(provider.DataSize(), ShouldEqual, "1.33T")
}) })
}) })
@@ -144,11 +148,12 @@ exit 0
err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755) err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
So(err, ShouldBeNil) So(err, ShouldBeNil)
targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir())
expectedOutput := fmt.Sprintf( expectedOutput := fmt.Sprintf(
"syncing to %s\n"+ "syncing to %s\n"+
"%s\n"+ "%s\n"+
"Done\n", "Done\n",
provider.WorkingDir(), targetDir,
fmt.Sprintf( fmt.Sprintf(
"%s %s -aHvh --no-o --no-g --stats --exclude .~tmp~/ "+ "%s %s -aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
"--delete --delete-after --delay-updates --safe-links "+ "--delete --delete-after --delay-updates --safe-links "+
@@ -260,6 +265,40 @@ sleep 5
}) })
}) })
Convey("Command Provider without log file should work", t, func(ctx C) {
tmpDir, err := ioutil.TempDir("", "tunasync")
defer os.RemoveAll(tmpDir)
So(err, ShouldBeNil)
c := cmdConfig{
name: "run-ls",
upstreamURL: "http://mirrors.tuna.moe/",
command: "ls",
workingDir: tmpDir,
logDir: tmpDir,
logFile: "/dev/null",
interval: 600 * time.Second,
}
provider, err := newCmdProvider(c)
So(err, ShouldBeNil)
So(provider.IsMaster(), ShouldEqual, false)
So(provider.ZFS(), ShouldBeNil)
So(provider.Type(), ShouldEqual, provCommand)
So(provider.Name(), ShouldEqual, c.name)
So(provider.WorkingDir(), ShouldEqual, c.workingDir)
So(provider.LogDir(), ShouldEqual, c.logDir)
So(provider.LogFile(), ShouldEqual, c.logFile)
So(provider.Interval(), ShouldEqual, c.interval)
Convey("Run the command", func() {
err = provider.Run()
So(err, ShouldBeNil)
})
})
} }
func TestTwoStageRsyncProvider(t *testing.T) { func TestTwoStageRsyncProvider(t *testing.T) {
@@ -280,6 +319,8 @@ func TestTwoStageRsyncProvider(t *testing.T) {
logFile: tmpFile, logFile: tmpFile,
useIPv6: true, useIPv6: true,
excludeFile: tmpFile, excludeFile: tmpFile,
username: "hello",
password: "world",
} }
provider, err := newTwoStageRsyncProvider(c) provider, err := newTwoStageRsyncProvider(c)
@@ -306,6 +347,7 @@ exit 0
err = provider.Run() err = provider.Run()
So(err, ShouldBeNil) So(err, ShouldBeNil)
targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir())
expectedOutput := fmt.Sprintf( expectedOutput := fmt.Sprintf(
"syncing to %s\n"+ "syncing to %s\n"+
"%s\n"+ "%s\n"+
@@ -313,14 +355,14 @@ exit 0
"syncing to %s\n"+ "syncing to %s\n"+
"%s\n"+ "%s\n"+
"Done\n", "Done\n",
provider.WorkingDir(), targetDir,
fmt.Sprintf( fmt.Sprintf(
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+ "-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+
"--timeout=120 --contimeout=120 --exclude dists/ -6 "+ "--timeout=120 --contimeout=120 --exclude dists/ -6 "+
"--exclude-from %s %s %s", "--exclude-from %s %s %s",
provider.excludeFile, provider.upstreamURL, provider.WorkingDir(), provider.excludeFile, provider.upstreamURL, provider.WorkingDir(),
), ),
provider.WorkingDir(), targetDir,
fmt.Sprintf( fmt.Sprintf(
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+ "-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
"--delete --delete-after --delay-updates --safe-links "+ "--delete --delete-after --delay-updates --safe-links "+

查看文件

@@ -2,8 +2,11 @@ package worker
import ( import (
"errors" "errors"
"io/ioutil"
"strings" "strings"
"time" "time"
"github.com/tuna/tunasync/internal"
) )
type rsyncConfig struct { type rsyncConfig struct {
@@ -11,15 +14,17 @@ type rsyncConfig struct {
rsyncCmd string rsyncCmd string
upstreamURL, username, password, excludeFile string upstreamURL, username, password, excludeFile string
workingDir, logDir, logFile string workingDir, logDir, logFile string
useIPv6 bool useIPv6, useIPv4 bool
interval time.Duration interval time.Duration
retry int
} }
// An RsyncProvider provides the implementation to rsync-based syncing jobs // An RsyncProvider provides the implementation to rsync-based syncing jobs
type rsyncProvider struct { type rsyncProvider struct {
baseProvider baseProvider
rsyncConfig rsyncConfig
options []string options []string
dataSize string
} }
func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) { func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
@@ -27,11 +32,15 @@ func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
if !strings.HasSuffix(c.upstreamURL, "/") { if !strings.HasSuffix(c.upstreamURL, "/") {
return nil, errors.New("rsync upstream URL should ends with /") return nil, errors.New("rsync upstream URL should ends with /")
} }
if c.retry == 0 {
c.retry = defaultMaxRetry
}
provider := &rsyncProvider{ provider := &rsyncProvider{
baseProvider: baseProvider{ baseProvider: baseProvider{
name: c.name, name: c.name,
ctx: NewContext(), ctx: NewContext(),
interval: c.interval, interval: c.interval,
retry: c.retry,
}, },
rsyncConfig: c, rsyncConfig: c,
} }
@@ -49,6 +58,8 @@ func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
if c.useIPv6 { if c.useIPv6 {
options = append(options, "-6") options = append(options, "-6")
} else if c.useIPv4 {
options = append(options, "-4")
} }
if c.excludeFile != "" { if c.excludeFile != "" {
@@ -71,14 +82,31 @@ func (p *rsyncProvider) Upstream() string {
return p.upstreamURL return p.upstreamURL
} }
func (p *rsyncProvider) DataSize() string {
return p.dataSize
}
func (p *rsyncProvider) Run() error { func (p *rsyncProvider) Run() error {
p.dataSize = ""
if err := p.Start(); err != nil { if err := p.Start(); err != nil {
return err return err
} }
return p.Wait() if err := p.Wait(); err != nil {
return err
}
if logContent, err := ioutil.ReadFile(p.LogFile()); err == nil {
p.dataSize = internal.ExtractSizeFromRsyncLog(logContent)
}
return nil
} }
func (p *rsyncProvider) Start() error { func (p *rsyncProvider) Start() error {
p.Lock()
defer p.Unlock()
if p.IsRunning() {
return errors.New("provider is currently running")
}
env := map[string]string{} env := map[string]string{}
if p.username != "" { if p.username != "" {
@@ -92,7 +120,7 @@ func (p *rsyncProvider) Start() error {
command = append(command, p.upstreamURL, p.WorkingDir()) command = append(command, p.upstreamURL, p.WorkingDir())
p.cmd = newCmdJob(p, command, p.WorkingDir(), env) p.cmd = newCmdJob(p, command, p.WorkingDir(), env)
if err := p.prepareLogFile(); err != nil { if err := p.prepareLogFile(false); err != nil {
return err return err
} }

查看文件

@@ -41,13 +41,17 @@ func newCmdJob(provider mirrorProvider, cmdAndArgs []string, workingDir string,
"--name", d.Name(), "--name", d.Name(),
"-w", workingDir, "-w", workingDir,
} }
// specify user
args = append(
args, "-u",
fmt.Sprintf("%d:%d", os.Getuid(), os.Getgid()),
)
// add volumes // add volumes
for _, vol := range d.Volumes() { for _, vol := range d.Volumes() {
logger.Debugf("volume: %s", vol) logger.Debugf("volume: %s", vol)
args = append(args, "-v", vol) args = append(args, "-v", vol)
} }
// set env // set env
env["TUNASYNC_LOG_FILE"] = d.LogFile()
for k, v := range env { for k, v := range env {
kv := fmt.Sprintf("%s=%s", k, v) kv := fmt.Sprintf("%s=%s", k, v)
args = append(args, "-e", kv) args = append(args, "-e", kv)
@@ -114,6 +118,9 @@ func (c *cmdJob) Wait() error {
return c.retErr return c.retErr
default: default:
err := c.cmd.Wait() err := c.cmd.Wait()
if c.cmd.Stdout != nil {
c.cmd.Stdout.(*os.File).Close()
}
c.retErr = err c.retErr = err
close(c.finished) close(c.finished)
return err return err

查看文件

@@ -15,6 +15,11 @@ type scheduleQueue struct {
jobs map[string]bool jobs map[string]bool
} }
type jobScheduleInfo struct {
jobName string
nextScheduled time.Time
}
func timeLessThan(l, r interface{}) bool { func timeLessThan(l, r interface{}) bool {
tl := l.(time.Time) tl := l.(time.Time)
tr := r.(time.Time) tr := r.(time.Time)
@@ -28,6 +33,20 @@ func newScheduleQueue() *scheduleQueue {
return queue return queue
} }
func (q *scheduleQueue) GetJobs() (jobs []jobScheduleInfo) {
cur := q.list.Iterator()
defer cur.Close()
for cur.Next() {
cj := cur.Value().(*mirrorJob)
jobs = append(jobs, jobScheduleInfo{
cj.Name(),
cur.Key().(time.Time),
})
}
return
}
func (q *scheduleQueue) AddJob(schedTime time.Time, job *mirrorJob) { func (q *scheduleQueue) AddJob(schedTime time.Time, job *mirrorJob) {
q.Lock() q.Lock()
defer q.Unlock() defer q.Unlock()

查看文件

@@ -3,8 +3,11 @@ package worker
import ( import (
"errors" "errors"
"fmt" "fmt"
"io/ioutil"
"strings" "strings"
"time" "time"
"github.com/tuna/tunasync/internal"
) )
type twoStageRsyncConfig struct { type twoStageRsyncConfig struct {
@@ -15,6 +18,7 @@ type twoStageRsyncConfig struct {
workingDir, logDir, logFile string workingDir, logDir, logFile string
useIPv6 bool useIPv6 bool
interval time.Duration interval time.Duration
retry int
} }
// An RsyncProvider provides the implementation to rsync-based syncing jobs // An RsyncProvider provides the implementation to rsync-based syncing jobs
@@ -23,6 +27,7 @@ type twoStageRsyncProvider struct {
twoStageRsyncConfig twoStageRsyncConfig
stage1Options []string stage1Options []string
stage2Options []string stage2Options []string
dataSize string
} }
var rsyncStage1Profiles = map[string]([]string){ var rsyncStage1Profiles = map[string]([]string){
@@ -38,12 +43,16 @@ func newTwoStageRsyncProvider(c twoStageRsyncConfig) (*twoStageRsyncProvider, er
if !strings.HasSuffix(c.upstreamURL, "/") { if !strings.HasSuffix(c.upstreamURL, "/") {
return nil, errors.New("rsync upstream URL should ends with /") return nil, errors.New("rsync upstream URL should ends with /")
} }
if c.retry == 0 {
c.retry = defaultMaxRetry
}
provider := &twoStageRsyncProvider{ provider := &twoStageRsyncProvider{
baseProvider: baseProvider{ baseProvider: baseProvider{
name: c.name, name: c.name,
ctx: NewContext(), ctx: NewContext(),
interval: c.interval, interval: c.interval,
retry: c.retry,
}, },
twoStageRsyncConfig: c, twoStageRsyncConfig: c,
stage1Options: []string{ stage1Options: []string{
@@ -78,6 +87,10 @@ func (p *twoStageRsyncProvider) Upstream() string {
return p.upstreamURL return p.upstreamURL
} }
func (p *twoStageRsyncProvider) DataSize() string {
return p.dataSize
}
func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) { func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) {
var options []string var options []string
if stage == 1 { if stage == 1 {
@@ -108,7 +121,12 @@ func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) {
} }
func (p *twoStageRsyncProvider) Run() error { func (p *twoStageRsyncProvider) Run() error {
defer p.Wait() p.Lock()
defer p.Unlock()
if p.IsRunning() {
return errors.New("provider is currently running")
}
env := map[string]string{} env := map[string]string{}
if p.username != "" { if p.username != "" {
@@ -118,6 +136,7 @@ func (p *twoStageRsyncProvider) Run() error {
env["RSYNC_PASSWORD"] = p.password env["RSYNC_PASSWORD"] = p.password
} }
p.dataSize = ""
stages := []int{1, 2} stages := []int{1, 2}
for _, stage := range stages { for _, stage := range stages {
command := []string{p.rsyncCmd} command := []string{p.rsyncCmd}
@@ -129,7 +148,7 @@ func (p *twoStageRsyncProvider) Run() error {
command = append(command, p.upstreamURL, p.WorkingDir()) command = append(command, p.upstreamURL, p.WorkingDir())
p.cmd = newCmdJob(p, command, p.WorkingDir(), env) p.cmd = newCmdJob(p, command, p.WorkingDir(), env)
if err := p.prepareLogFile(); err != nil { if err := p.prepareLogFile(stage > 1); err != nil {
return err return err
} }
@@ -137,12 +156,17 @@ func (p *twoStageRsyncProvider) Run() error {
return err return err
} }
p.isRunning.Store(true) p.isRunning.Store(true)
logger.Debugf("set isRunning to true: %s", p.Name())
err = p.cmd.Wait() p.Unlock()
p.isRunning.Store(false) err = p.Wait()
p.Lock()
if err != nil { if err != nil {
return err return err
} }
} }
if logContent, err := ioutil.ReadFile(p.LogFile()); err == nil {
p.dataSize = internal.ExtractSizeFromRsyncLog(logContent)
}
return nil return nil
} }

查看文件

@@ -12,8 +12,6 @@ import (
. "github.com/tuna/tunasync/internal" . "github.com/tuna/tunasync/internal"
) )
var tunasyncWorker *Worker
// A Worker is a instance of tunasync worker // A Worker is a instance of tunasync worker
type Worker struct { type Worker struct {
L sync.Mutex L sync.Mutex
@@ -29,10 +27,11 @@ type Worker struct {
httpClient *http.Client httpClient *http.Client
} }
// GetTUNASyncWorker returns a singalton worker // NewTUNASyncWorker creates a worker
func GetTUNASyncWorker(cfg *Config) *Worker { func NewTUNASyncWorker(cfg *Config) *Worker {
if tunasyncWorker != nil {
return tunasyncWorker if cfg.Global.Retry == 0 {
cfg.Global.Retry = defaultMaxRetry
} }
w := &Worker{ w := &Worker{
@@ -57,7 +56,6 @@ func GetTUNASyncWorker(cfg *Config) *Worker {
w.initJobs() w.initJobs()
w.makeHTTPServer() w.makeHTTPServer()
tunasyncWorker = w
return w return w
} }
@@ -219,7 +217,11 @@ func (w *Worker) makeHTTPServer() {
} }
switch cmd.Cmd { switch cmd.Cmd {
case CmdStart: case CmdStart:
job.ctrlChan <- jobStart if cmd.Options["force"] {
job.ctrlChan <- jobForceStart
} else {
job.ctrlChan <- jobStart
}
case CmdRestart: case CmdRestart:
job.ctrlChan <- jobRestart job.ctrlChan <- jobRestart
case CmdStop: case CmdStop:
@@ -306,6 +308,9 @@ func (w *Worker) runSchedule() {
w.L.Unlock() w.L.Unlock()
schedInfo := w.schedule.GetJobs()
w.updateSchedInfo(schedInfo)
tick := time.Tick(5 * time.Second) tick := time.Tick(5 * time.Second)
for { for {
select { select {
@@ -342,6 +347,9 @@ func (w *Worker) runSchedule() {
w.schedule.AddJob(schedTime, job) w.schedule.AddJob(schedTime, job)
} }
schedInfo = w.schedule.GetJobs()
w.updateSchedInfo(schedInfo)
case <-tick: case <-tick:
// check schedule every 5 seconds // check schedule every 5 seconds
if job := w.schedule.Pop(); job != nil { if job := w.schedule.Pop(); job != nil {
@@ -412,6 +420,12 @@ func (w *Worker) updateStatus(job *mirrorJob, jobMsg jobMessage) {
ErrorMsg: jobMsg.msg, ErrorMsg: jobMsg.msg,
} }
// Certain Providers (rsync for example) may know the size of mirror,
// so we report it to Manager here
if len(job.size) != 0 {
smsg.Size = job.size
}
for _, root := range w.cfg.Manager.APIBaseList() { for _, root := range w.cfg.Manager.APIBaseList() {
url := fmt.Sprintf( url := fmt.Sprintf(
"%s/workers/%s/jobs/%s", root, w.Name(), jobMsg.name, "%s/workers/%s/jobs/%s", root, w.Name(), jobMsg.name,
@@ -423,6 +437,27 @@ func (w *Worker) updateStatus(job *mirrorJob, jobMsg jobMessage) {
} }
} }
func (w *Worker) updateSchedInfo(schedInfo []jobScheduleInfo) {
var s []MirrorSchedule
for _, sched := range schedInfo {
s = append(s, MirrorSchedule{
MirrorName: sched.jobName,
NextSchedule: sched.nextScheduled,
})
}
msg := MirrorSchedules{Schedules: s}
for _, root := range w.cfg.Manager.APIBaseList() {
url := fmt.Sprintf(
"%s/workers/%s/schedules", root, w.Name(),
)
logger.Debugf("reporting on manager url: %s", url)
if _, err := PostJSON(url, msg, w.httpClient); err != nil {
logger.Errorf("Failed to upload schedules: %s", err.Error())
}
}
}
func (w *Worker) fetchJobStatus() []MirrorStatus { func (w *Worker) fetchJobStatus() []MirrorStatus {
var mirrorList []MirrorStatus var mirrorList []MirrorStatus
apiBase := w.cfg.Manager.APIBaseList()[0] apiBase := w.cfg.Manager.APIBaseList()[0]

253
worker/worker_test.go 普通文件
查看文件

@@ -0,0 +1,253 @@
package worker
import (
"net/http"
"strconv"
"testing"
"time"
"github.com/gin-gonic/gin"
. "github.com/smartystreets/goconvey/convey"
. "github.com/tuna/tunasync/internal"
)
type workTestFunc func(*Worker)
var managerPort = 5001
var workerPort = 5002
func makeMockManagerServer(recvData chan interface{}) *gin.Engine {
r := gin.Default()
r.GET("/ping", func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"_infoKey": "pong"})
})
r.POST("/workers", func(c *gin.Context) {
var _worker WorkerStatus
c.BindJSON(&_worker)
_worker.LastOnline = time.Now()
recvData <- _worker
c.JSON(http.StatusOK, _worker)
})
r.POST("/workers/dut/schedules", func(c *gin.Context) {
var _sch MirrorSchedules
c.BindJSON(&_sch)
recvData <- _sch
c.JSON(http.StatusOK, empty{})
})
r.POST("/workers/dut/jobs/:job", func(c *gin.Context) {
var status MirrorStatus
c.BindJSON(&status)
recvData <- status
c.JSON(http.StatusOK, status)
})
r.GET("/workers/dut/jobs", func(c *gin.Context) {
mirrorStatusList := []MirrorStatus{}
c.JSON(http.StatusOK, mirrorStatusList)
})
return r
}
func startWorkerThenStop(cfg *Config, tester workTestFunc) {
exitedChan := make(chan int)
w := NewTUNASyncWorker(cfg)
So(w, ShouldNotBeNil)
go func() {
w.Run()
exitedChan <- 1
}()
tester(w)
w.Halt()
select {
case exited := <-exitedChan:
So(exited, ShouldEqual, 1)
case <-time.After(2 * time.Second):
So(0, ShouldEqual, 1)
}
}
func sendCommandToWorker(workerURL string, httpClient *http.Client, cmd CmdVerb, mirror string) {
workerCmd := WorkerCmd{
Cmd: cmd,
MirrorID: mirror,
}
logger.Debugf("POST to %s with cmd %s", workerURL, cmd)
_, err := PostJSON(workerURL, workerCmd, httpClient)
So(err, ShouldBeNil)
}
func TestWorker(t *testing.T) {
InitLogger(false, true, false)
recvDataChan := make(chan interface{})
_s := makeMockManagerServer(recvDataChan)
httpServer := &http.Server{
Addr: "localhost:" + strconv.Itoa(managerPort),
Handler: _s,
ReadTimeout: 2 * time.Second,
WriteTimeout: 2 * time.Second,
}
go func() {
err := httpServer.ListenAndServe()
So(err, ShouldBeNil)
}()
Convey("Worker should work", t, func(ctx C) {
httpClient, err := CreateHTTPClient("")
So(err, ShouldBeNil)
workerPort++
workerCfg := Config{
Global: globalConfig{
Name: "dut",
LogDir: "/tmp",
MirrorDir: "/tmp",
Concurrent: 2,
Interval: 1,
},
Server: serverConfig{
Hostname: "localhost",
Addr: "127.0.0.1",
Port: workerPort,
},
Manager: managerConfig{
APIBase: "http://localhost:" + strconv.Itoa(managerPort),
},
}
logger.Debugf("worker port %d", workerPort)
Convey("with no job", func(ctx C) {
dummyTester := func(*Worker) {
registered := false
for {
select {
case data := <-recvDataChan:
if reg, ok := data.(WorkerStatus); ok {
So(reg.ID, ShouldEqual, "dut")
registered = true
time.Sleep(500 * time.Millisecond)
sendCommandToWorker(reg.URL, httpClient, CmdStart, "foobar")
} else if sch, ok := data.(MirrorSchedules); ok {
So(len(sch.Schedules), ShouldEqual, 0)
}
case <-time.After(1 * time.Second):
So(registered, ShouldBeTrue)
return
}
}
}
startWorkerThenStop(&workerCfg, dummyTester)
})
Convey("with one job", func(ctx C) {
workerCfg.Mirrors = []mirrorConfig{
mirrorConfig{
Name: "job-ls",
Provider: provCommand,
Command: "ls",
},
}
dummyTester := func(*Worker) {
url := ""
jobRunning := false
lastStatus := SyncStatus(None)
for {
select {
case data := <-recvDataChan:
if reg, ok := data.(WorkerStatus); ok {
So(reg.ID, ShouldEqual, "dut")
url = reg.URL
time.Sleep(500 * time.Millisecond)
sendCommandToWorker(url, httpClient, CmdStart, "job-ls")
} else if sch, ok := data.(MirrorSchedules); ok {
if !jobRunning {
So(len(sch.Schedules), ShouldEqual, 1)
So(sch.Schedules[0].MirrorName, ShouldEqual, "job-ls")
So(sch.Schedules[0].NextSchedule,
ShouldHappenBetween,
time.Now().Add(-2*time.Second),
time.Now().Add(1*time.Minute))
}
} else if status, ok := data.(MirrorStatus); ok {
logger.Noticef("Job %s status %s", status.Name, status.Status.String())
jobRunning = status.Status == PreSyncing || status.Status == Syncing
So(status.Status, ShouldNotEqual, Failed)
lastStatus = status.Status
}
case <-time.After(1 * time.Second):
So(url, ShouldNotEqual, "")
So(jobRunning, ShouldBeFalse)
So(lastStatus, ShouldEqual, Success)
return
}
}
}
startWorkerThenStop(&workerCfg, dummyTester)
})
Convey("with several jobs", func(ctx C) {
workerCfg.Mirrors = []mirrorConfig{
mirrorConfig{
Name: "job-ls-1",
Provider: provCommand,
Command: "ls",
},
mirrorConfig{
Name: "job-fail",
Provider: provCommand,
Command: "non-existent-command-xxxx",
},
mirrorConfig{
Name: "job-ls-2",
Provider: provCommand,
Command: "ls",
},
}
dummyTester := func(*Worker) {
url := ""
lastStatus := make(map[string]SyncStatus)
nextSch := make(map[string]time.Time)
for {
select {
case data := <-recvDataChan:
if reg, ok := data.(WorkerStatus); ok {
So(reg.ID, ShouldEqual, "dut")
url = reg.URL
time.Sleep(500 * time.Millisecond)
sendCommandToWorker(url, httpClient, CmdStart, "job-fail")
sendCommandToWorker(url, httpClient, CmdStart, "job-ls-1")
sendCommandToWorker(url, httpClient, CmdStart, "job-ls-2")
} else if sch, ok := data.(MirrorSchedules); ok {
//So(len(sch.Schedules), ShouldEqual, 3)
for _, item := range sch.Schedules {
nextSch[item.MirrorName] = item.NextSchedule
}
} else if status, ok := data.(MirrorStatus); ok {
logger.Noticef("Job %s status %s", status.Name, status.Status.String())
jobRunning := status.Status == PreSyncing || status.Status == Syncing
if !jobRunning {
if status.Name == "job-fail" {
So(status.Status, ShouldEqual, Failed)
} else {
So(status.Status, ShouldNotEqual, Failed)
}
}
lastStatus[status.Name] = status.Status
}
case <-time.After(1 * time.Second):
So(len(lastStatus), ShouldEqual, 3)
So(len(nextSch), ShouldEqual, 3)
return
}
}
}
startWorkerThenStop(&workerCfg, dummyTester)
})
})
}

查看文件

@@ -3,6 +3,7 @@ package worker
import ( import (
"fmt" "fmt"
"os" "os"
"os/user"
"strings" "strings"
"github.com/codeskyblue/go-sh" "github.com/codeskyblue/go-sh"
@@ -10,36 +11,44 @@ import (
type zfsHook struct { type zfsHook struct {
emptyHook emptyHook
provider mirrorProvider zpool string
zpool string
} }
func newZfsHook(provider mirrorProvider, zpool string) *zfsHook { func newZfsHook(provider mirrorProvider, zpool string) *zfsHook {
return &zfsHook{ return &zfsHook{
provider: provider, emptyHook: emptyHook{
zpool: zpool, provider: provider,
},
zpool: zpool,
} }
} }
// create zfs dataset for a new mirror func (z *zfsHook) printHelpMessage() {
zfsDataset := fmt.Sprintf("%s/%s", z.zpool, z.provider.Name())
zfsDataset = strings.ToLower(zfsDataset)
workingDir := z.provider.WorkingDir()
logger.Infof("You may create the ZFS dataset with:")
logger.Infof(" zfs create '%s'", zfsDataset)
logger.Infof(" zfs set mountpoint='%s' '%s'", workingDir, zfsDataset)
usr, err := user.Current()
if err != nil || usr.Uid == "0" {
return
}
logger.Infof(" chown %s '%s'", usr.Uid, workingDir)
}
// check if working directory is a zfs dataset
func (z *zfsHook) preJob() error { func (z *zfsHook) preJob() error {
workingDir := z.provider.WorkingDir() workingDir := z.provider.WorkingDir()
if _, err := os.Stat(workingDir); os.IsNotExist(err) { if _, err := os.Stat(workingDir); os.IsNotExist(err) {
// sudo zfs create $zfsDataset logger.Errorf("Directory %s doesn't exist", workingDir)
// sudo zfs set mountpoint=${absPath} ${zfsDataset} z.printHelpMessage()
return err
zfsDataset := fmt.Sprintf("%s/%s", z.zpool, z.provider.Name()) }
// Unknown issue of ZFS: if err := sh.Command("mountpoint", "-q", workingDir).Run(); err != nil {
// dataset name should not contain upper case letters logger.Errorf("%s is not a mount point", workingDir)
zfsDataset = strings.ToLower(zfsDataset) z.printHelpMessage()
logger.Infof("Creating ZFS dataset %s", zfsDataset) return err
if err := sh.Command("sudo", "zfs", "create", zfsDataset).Run(); err != nil {
return err
}
logger.Infof("Mount ZFS dataset %s to %s", zfsDataset, workingDir)
if err := sh.Command("sudo", "zfs", "set", "mountpoint="+workingDir, zfsDataset).Run(); err != nil {
return err
}
} }
return nil return nil
} }

48
worker/zfs_hook_test.go 普通文件
查看文件

@@ -0,0 +1,48 @@
package worker
import (
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"
. "github.com/smartystreets/goconvey/convey"
)
func TestZFSHook(t *testing.T) {
Convey("ZFS Hook should work", t, func(ctx C) {
tmpDir, err := ioutil.TempDir("", "tunasync")
tmpFile := filepath.Join(tmpDir, "log_file")
c := cmdConfig{
name: "tuna_zfs_hook_test",
upstreamURL: "http://mirrors.tuna.moe/",
command: "ls",
workingDir: tmpDir,
logDir: tmpDir,
logFile: tmpFile,
interval: 1 * time.Second,
}
provider, err := newCmdProvider(c)
So(err, ShouldBeNil)
Convey("When working directory doesn't exist", func(ctx C) {
errRm := os.RemoveAll(tmpDir)
So(errRm, ShouldBeNil)
hook := newZfsHook(provider, "test_pool")
err := hook.preJob()
So(err, ShouldNotBeNil)
})
Convey("When working directory is not a mount point", func(ctx C) {
defer os.RemoveAll(tmpDir)
hook := newZfsHook(provider, "test_pool")
err := hook.preJob()
So(err, ShouldNotBeNil)
})
})
}