Mirrored from
https://github.com/tuna/tunasync.git
Synced 2025-12-06 14:36:47 +00:00
Compare commits
72 commits
| SHA1 |
|---|
| 9e91fd706e |
| 94cf0b4bdb |
| 3872c41607 |
| 30259da0f0 |
| 4854d9b981 |
| 06fce98c00 |
| 8408236646 |
| 540eea8aeb |
| a6fc97889d |
| 5f7d974469 |
| 3b52f93e7e |
| 1025189542 |
| 9f91d90fc5 |
| 1aa4ae9cc1 |
| d0deeb19a9 |
| a283328dc4 |
| 1890bbed3c |
| ddc9efd155 |
| 7eb119b892 |
| 96f11f57ed |
| 3e6e6f9b14 |
| b06cadfe06 |
| 9c34372ae4 |
| ebbfff40f6 |
| 5eeade22fc |
| 4b3741308b |
| 7d495c1956 |
| 0bf8400077 |
| c611759394 |
| 279aa32b68 |
| 025544449a |
| 90d419ca66 |
| 96cb975412 |
| ff3e690497 |
| a58e6d37ae |
| 7a4a8ad486 |
| e1c0c25efa |
| 9ac374527a |
| f03626d4e1 |
| 23bf4890cc |
| 2f6a61aee5 |
| b6043142e1 |
| 6241576b12 |
| ef78563b8c |
| ca106f1360 |
| 628266ac5a |
| 7e601d9fff |
| c750aa1871 |
| 6cbe91b4f1 |
| 89a792986d |
| 0fdb07d061 |
| c5bb172f99 |
| 79e6167028 |
| 285ffb2f2f |
| 95bb4bbd5e |
| 6bca9d2cd5 |
| 4fe7d03e54 |
| 1fe9499728 |
| a475b044c6 |
| a50a360a91 |
| d536aca2ac |
| 28545d61e7 |
| a87fb0f8b4 |
| 095e7c6320 |
| 7b441312f4 |
| 563860d424 |
| 93194cde2e |
| aa4c31a32b |
| 4c6a407c17 |
| 939abaef9b |
| d5a438462f |
| d4e07a7b29 |
@@ -2,7 +2,7 @@ sudo: required
 language: go
 go:
-- 1.6
+- 1.11

 before_install:
 - sudo apt-get install cgroup-bin
@@ -35,9 +35,10 @@ deploy:
   file:
   - "build/tunasync-linux-bin.tar.gz"
   api_key:
-    secure: "F9kaVaR1mxEh2+EL9Nm8GZmbVY98pXCJA0LGDNrq1C2vU61AUNOeX6yI1mMklHNZPLBqoFDvGN1M5HnJ+xWCFH+KnJgLD2GVIAcAxFNpcNWQe8XKE5heklNsIQNQfuh/rJKM6YzeDB9G5RN4Y76iL4WIAXhNnMm48W6jLnWhf70="
+    secure: ZOYL/CALrVJsZzbZqUMSI89Gw4zsBJH1StD/2yTyG45GfKgvtK4hG0S5cQM/L0wcikjEkgxSMsmr4ycq+OwbN++gc0umfoAQ/VSjzetiobAlT1E854aRKRjT82WxYdnPW2fsFjuEJTcyZmcbgJGTMi86MDt7w8tEjLomhd1+rUo=
   skip_cleanup: true
+  overwrite: true
   on:
     tags: true
     all_branches: true
     repo: tuna/tunasync
README.md (75 lines changed)
@@ -19,7 +19,7 @@ Pre-built binary for Linux x86_64 is available at [Github releases](https://gith
 ```
 # Architecture

-- Manager: Centural instance on status and job management
+- Manager: Central instance for status and job management
 - Worker: Runs mirror jobs

 +------------+ +---+ +---+
@@ -40,73 +40,16 @@ Pre-built binary for Linux x86_64 is available at [Github releases](https://gith
 # Job Run Process


-    PreSyncing              Syncing                 Success
-    +-----------+     +-----------+     +-------------+     +--------------+
-    |  pre-job  +--+->|  job run  +---->|  post-exec  +-+-->| post-success |
-    +-----------+  ^  +-----------+     +-------------+  |  +--------------+
-                   |                                     |
-                   |       +-----------------+           | Failed
-                   +-------+    post-fail    |<----------+
-                           +-----------------+
+    PreSyncing                   Syncing                               Success
+    +-----------+     +----------+     +-----------+     +-------------+     +--------------+
+    |  pre-job  +--+->| pre-exec +---->|  job run  +---->|  post-exec  +-+-->| post-success |
+    +-----------+  ^  +----------+     +-----------+     +-------------+  |  +--------------+
+                   |                                                      |
+                   |                +-----------------+                   | Failed
+                   +----------------+    post-fail    |<------------------+
+                                    +-----------------+
 ```
-
-## Generate Self-Signed Certificate
-
-Fisrt, create root CA
-
-```
-openssl genrsa -out rootCA.key 2048
-openssl req -x509 -new -nodes -key rootCA.key -days 365 -out rootCA.crt
-```
-
-Create host key
-
-```
-openssl genrsa -out host.key 2048
-```
-
-Now create CSR, before that, write a `req.cnf`
-
-```
-[req]
-distinguished_name = req_distinguished_name
-req_extensions = v3_req
-
-[req_distinguished_name]
-countryName = Country Name (2 letter code)
-countryName_default = CN
-stateOrProvinceName = State or Province Name (full name)
-stateOrProvinceName_default = BJ
-localityName = Locality Name (eg, city)
-localityName_default = Beijing
-organizationalUnitName = Organizational Unit Name (eg, section)
-organizationalUnitName_default = TUNA
-commonName = Common Name (server FQDN or domain name)
-commonName_default = <server_FQDN>
-commonName_max = 64
-
-[v3_req]
-# Extensions to add to a certificate request
-basicConstraints = CA:FALSE
-keyUsage = nonRepudiation, digitalSignature, keyEncipherment
-subjectAltName = @alt_names
-
-[alt_names]
-DNS.1 = <server_FQDN_1>
-DNS.2 = <server_FQDN_2>
-```
-
-Substitute `<server_FQDN>` with your server's FQDN, then run
-
-```
-openssl req -new -key host.key -out host.csr -config req.cnf
-```
-
-Finally generate and sign host cert with root CA
-
-```
-openssl x509 -req -in host.csr -CA rootCA.crt -CAkey rootCA.key -CAcreateserial -out host.crt -days 365 -extensions v3_req -extfile req.cnf
-```
-
 ## Building
@@ -60,7 +60,7 @@ func startWorker(c *cli.Context) error {
 		os.Exit(1)
 	}

-	w := worker.GetTUNASyncWorker(cfg)
+	w := worker.NewTUNASyncWorker(cfg)
 	if w == nil {
 		logger.Errorf("Error intializing TUNA sync worker.")
 		os.Exit(1)
@@ -140,9 +140,9 @@ func listWorkers(c *cli.Context) error {
 }

 func listJobs(c *cli.Context) error {
-	// FIXME: there should be an API on manager server side that return MirrorStatus list to tunasynctl
-	var jobs []tunasync.MirrorStatus
+	var genericJobs interface{}
 	if c.Bool("all") {
+		var jobs []tunasync.WebMirrorStatus
 		_, err := tunasync.GetJSON(baseURL+listJobsPath, &jobs, client)
 		if err != nil {
 			return cli.NewExitError(
@@ -150,8 +150,10 @@ func listJobs(c *cli.Context) error {
 				"of all jobs from manager server: %s", err.Error()),
 				1)
 		}
+		genericJobs = jobs

 	} else {
+		var jobs []tunasync.MirrorStatus
 		args := c.Args()
 		if len(args) == 0 {
 			return cli.NewExitError(
@@ -174,9 +176,10 @@ func listJobs(c *cli.Context) error {
 		for range args {
 			jobs = append(jobs, <-ans...)
 		}
+		genericJobs = jobs
 	}

-	b, err := json.MarshalIndent(jobs, "", " ")
+	b, err := json.MarshalIndent(genericJobs, "", " ")
 	if err != nil {
 		return cli.NewExitError(
 			fmt.Sprintf("Error printing out informations: %s", err.Error()),
@@ -186,6 +189,103 @@ func listJobs(c *cli.Context) error {
 	return nil
 }

+func updateMirrorSize(c *cli.Context) error {
+	args := c.Args()
+	if len(args) != 2 {
+		return cli.NewExitError("Usage: tunasynctl -w <worker-id> <mirror> <size>", 1)
+	}
+	workerID := c.String("worker")
+	mirrorID := args.Get(0)
+	mirrorSize := args.Get(1)
+
+	msg := struct {
+		Name string `json:"name"`
+		Size string `json:"size"`
+	}{
+		Name: mirrorID,
+		Size: mirrorSize,
+	}
+
+	url := fmt.Sprintf(
+		"%s/workers/%s/jobs/%s/size", baseURL, workerID, mirrorID,
+	)
+
+	resp, err := tunasync.PostJSON(url, msg, client)
+	if err != nil {
+		return cli.NewExitError(
+			fmt.Sprintf("Failed to send request to manager: %s",
+				err.Error()),
+			1)
+	}
+	defer resp.Body.Close()
+	body, _ := ioutil.ReadAll(resp.Body)
+	if resp.StatusCode != http.StatusOK {
+		return cli.NewExitError(
+			fmt.Sprintf("Manager failed to update mirror size: %s", body), 1,
+		)
+	}
+
+	var status tunasync.MirrorStatus
+	json.Unmarshal(body, &status)
+	if status.Size != mirrorSize {
+		return cli.NewExitError(
+			fmt.Sprintf(
+				"Mirror size error, expecting %s, manager returned %s",
+				mirrorSize, status.Size,
+			), 1,
+		)
+	}
+
+	logger.Infof("Successfully updated mirror size to %s", mirrorSize)
+	return nil
+}
+
+func removeWorker(c *cli.Context) error {
+	args := c.Args()
+	if len(args) != 0 {
+		return cli.NewExitError("Usage: tunasynctl -w <worker-id>", 1)
+	}
+	workerID := c.String("worker")
+	if len(workerID) == 0 {
+		return cli.NewExitError("Please specify the <worker-id>", 1)
+	}
+	url := fmt.Sprintf("%s/workers/%s", baseURL, workerID)
+
+	req, err := http.NewRequest("DELETE", url, nil)
+	if err != nil {
+		logger.Panicf("Invalid HTTP Request: %s", err.Error())
+	}
+	resp, err := client.Do(req)
+
+	if err != nil {
+		return cli.NewExitError(
+			fmt.Sprintf("Failed to send request to manager: %s", err.Error()), 1)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		body, err := ioutil.ReadAll(resp.Body)
+		if err != nil {
+			return cli.NewExitError(
+				fmt.Sprintf("Failed to parse response: %s", err.Error()),
+				1)
+		}
+
+		return cli.NewExitError(fmt.Sprintf("Failed to correctly send"+
+			" command: HTTP status code is not 200: %s", body),
+			1)
+	}
+
+	res := map[string]string{}
+	err = json.NewDecoder(resp.Body).Decode(&res)
+	if res["message"] == "deleted" {
+		logger.Info("Successfully removed the worker")
+	} else {
+		logger.Info("Failed to remove the worker")
+	}
+	return nil
+}
+
 func flushDisabledJobs(c *cli.Context) error {
 	req, err := http.NewRequest("DELETE", baseURL+flushDisabledPath, nil)
 	if err != nil {
@@ -235,11 +335,16 @@ func cmdJob(cmd tunasync.CmdVerb) cli.ActionFunc {
 				"argument WORKER", 1)
 		}

+		options := map[string]bool{}
+		if c.Bool("force") {
+			options["force"] = true
+		}
 		cmd := tunasync.ClientCmd{
 			Cmd:      cmd,
 			MirrorID: mirrorID,
 			WorkerID: c.String("worker"),
 			Args:     argsList,
+			Options:  options,
 		}
 		resp, err := tunasync.PostJSON(baseURL+cmdPath, cmd, client)
 		if err != nil {
@@ -270,6 +375,11 @@ func cmdJob(cmd tunasync.CmdVerb) cli.ActionFunc {

 func cmdWorker(cmd tunasync.CmdVerb) cli.ActionFunc {
 	return func(c *cli.Context) error {
+
+		if c.String("worker") == "" {
+			return cli.NewExitError("Please specify the worker with -w <worker-id>", 1)
+		}
+
 		cmd := tunasync.ClientCmd{
 			Cmd:      cmd,
 			WorkerID: c.String("worker"),
@@ -360,6 +470,11 @@ func main() {
 		},
 	}

+	forceStartFlag := cli.BoolFlag{
+		Name:  "force, f",
+		Usage: "Override the concurrent limit",
+	}
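The `forceStartFlag` defined above is attached to the `start` command below; when set, the CLI puts `"force": true` into `ClientCmd.Options`, which the manager forwards to the worker. A minimal invocation sketch (the worker and mirror names here are hypothetical):

```
# bypass the worker's concurrent-job limit for this one run
$ tunasynctl start elvish -w test_worker --force
```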
 	app.Commands = []cli.Command{
 		{
 			Name:  "list",
@@ -385,10 +500,34 @@ func main() {
 			Flags:  commonFlags,
 			Action: initializeWrapper(listWorkers),
 		},
+		{
+			Name:  "rm-worker",
+			Usage: "Remove a worker",
+			Flags: append(
+				commonFlags,
+				cli.StringFlag{
+					Name:  "worker, w",
+					Usage: "worker-id of the worker to be removed",
+				},
+			),
+			Action: initializeWrapper(removeWorker),
+		},
+		{
+			Name:  "set-size",
+			Usage: "Set mirror size",
+			Flags: append(
+				commonFlags,
+				cli.StringFlag{
+					Name:  "worker, w",
+					Usage: "specify worker-id of the mirror job",
+				},
+			),
+			Action: initializeWrapper(updateMirrorSize),
+		},
 		{
 			Name:   "start",
 			Usage:  "Start a job",
-			Flags:  append(commonFlags, cmdFlags...),
+			Flags:  append(append(commonFlags, cmdFlags...), forceStartFlag),
 			Action: initializeWrapper(cmdJob(tunasync.CmdStart)),
 		},
 		{
@@ -42,7 +42,7 @@ interval = 1

 [manager]
 api_base = "http://localhost:12345"
-token = "some_token"
+token = ""
 ca_cert = ""

 [cgroup]
@@ -90,6 +90,30 @@ $ tunasync worker --config ~/tunasync_demo/worker.conf

 In this example, the mirror data lives under `/tmp/tunasync/`.

+### Control
+
+Check the sync status:
+
+```
+$ tunasynctl list -p 12345 --all
+```
+
+tunasynctl also supports config files. A config file may live at either `/etc/tunasync/ctl.conf` or `~/.config/tunasync/ctl.conf`; values in the latter override those in the former.
+
+The config file looks like this:
+
+```
+manager_addr = "127.0.0.1"
+manager_port = 12345
+ca_cert = ""
+```
+
+### Security
+
+The worker and the manager talk over http(s). If both run on the same machine, there is no need for https: leave the manager's `ssl_key` and `ssl_cert` unset, leave the worker's `ca_cert` empty, and let `api_base` start with `http://`.
+
+For encrypted communication, the manager must set `ssl_key` and `ssl_cert`, the worker must set `ca_cert`, and `api_base` should start with `https://`.
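As a sketch of the encrypted setup on the worker side, using the `[manager]` fields shown in the config samples elsewhere in this changeset (the host name and CA path are hypothetical, and the manager must be started with a matching `ssl_cert`/`ssl_key` pair):

```
[manager]
api_base = "https://manager.example.com:12345"
ca_cert = "/etc/tunasync/rootCA.crt"
```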
 ## Going further

 See also:
@@ -100,3 +124,7 @@ $ tunasync worker --help
 ```

 You can also look through the log directory.
+
+Some example worker configs: [workers.conf](workers.conf)
+
+Operations you may find handy: [tips.md](tips.md)
docs/zh_CN/tips.md (new file, 54 lines)
@@ -0,0 +1,54 @@
## Remove one mirror of one worker

First make sure tunasynctl has its config file in place at `~/.config/tunasync/ctl.conf`:

```
manager_addr = "127.0.0.1"
manager_port = 12345
ca_cert = ""
```

Then:

```
$ tunasynctl disable -w <worker_id> <mirror_name>
$ tunasynctl flush
```


## Hot-reload `worker.conf`

`$ tunasynctl reload -w <worker_id>`


e.g. to remove the `elvish` mirror of `test_worker`:

1. Delete the directory that stores the mirror data

2. Delete the corresponding `mirror` section from `worker.conf`

3. Then run:

```
$ tunasynctl reload -w test_worker
$ tunasynctl disable -w test_worker elvish
$ tunasynctl flush
```

4. (Optional) Finally, delete the logs from the log directory


## Remove a worker

`$ tunasynctl rm-worker -w <worker_id>`

e.g. `$ tunasynctl rm-worker -w test_worker`


## Update a mirror's size

`$ tunasynctl set-size -w <worker_id> <mirror_name> <size>`

The trailing <size> argument is set by the operator, or produced by some scheduled script.

Since `du -s` is rather slow, the mirror size can instead be read straight from the rsync log file.
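A minimal sketch of such a scheduled script, assuming the rsync log ends with the `Total file size: ... bytes` summary that `ExtractSizeFromRsyncLog` (added to the `internal` package in this changeset) also parses; the log path, worker, and mirror names are hypothetical:

```
#!/bin/sh
# pull the last reported total size out of the rsync log and push it to the manager
size=$(grep -oE '^Total file size: [0-9.]+[KMGTP]?' /srv/tunasync/log/tunasync/ubuntu/latest \
    | tail -n 1 | awk '{print $4}')
[ -n "$size" ] && tunasynctl set-size -w test_worker ubuntu "$size"
```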
docs/zh_CN/workers.conf (new file, 77 lines)
@@ -0,0 +1,77 @@

[global]
name = "mirror_worker"
log_dir = "/srv/tunasync/log/tunasync/{{.Name}}"
mirror_dir = "/srv/tunasync"
concurrent = 10
interval = 1

[manager]
api_base = "http://localhost:12345"
token = "some_token"
ca_cert = ""

[cgroup]
enable = false
base_path = "/sys/fs/cgroup"
group = "tunasync"

[server]
hostname = "localhost"
listen_addr = "127.0.0.1"
listen_port = 6000
ssl_cert = ""
ssl_key = ""

[[mirrors]]
name = "adobe-fonts"
interval = 1440
provider = "command"
upstream = "https://github.com/adobe-fonts"
#https://github.com/tuna/tunasync-scripts/blob/master/adobe-fonts.sh
command = "/home/scripts/adobe-fonts.sh"
docker_image = "tunathu/tunasync-scripts:latest"

[[mirrors]]
name = "anaconda"
provider = "command"
upstream = "https://repo.continuum.io/"
#https://github.com/tuna/tunasync-scripts/blob/master/anaconda.py
command = "/home/scripts/anaconda.py"
interval = 1440
docker_image = "tunathu/tunasync-scripts:latest"

[[mirrors]]
name = "gnu"
provider = "rsync"
upstream = "rsync://mirrors.ocf.berkeley.edu/gnu/"
memory_limit = "256M"

[[mirrors]]
name = "pypi"
provider = "command"
upstream = "https://pypi.python.org/"
#https://github.com/tuna/tunasync-scripts/blob/master/pypi.sh
command = "/home/scripts/pypi.sh"
docker_image = "tunathu/tunasync-scripts:latest"
interval = 5
# set environment variables
[mirrors.env]
INIT = "0"


[[mirrors]]
name = "debian"
interval = 720
provider = "rsync"
upstream = "rsync://mirrors.tuna.tsinghua.edu.cn/debian/"
memory_limit = "256M"

[[mirrors]]
name = "ubuntu"
provider = "two-stage-rsync"
stage1_profile = "debian"
upstream = "rsync://archive.ubuntu.com/ubuntu/"
memory_limit = "256M"

# vim: ft=toml
@@ -5,7 +5,7 @@ import (
 	"time"
 )

-// A StatusUpdateMsg represents a msg when
+// A MirrorStatus represents a msg when
 // a worker has done syncing
 type MirrorStatus struct {
 	Name       string     `json:"name"`
@@ -13,6 +13,8 @@ type MirrorStatus struct {
 	IsMaster   bool       `json:"is_master"`
 	Status     SyncStatus `json:"status"`
 	LastUpdate time.Time  `json:"last_update"`
+	LastEnded  time.Time  `json:"last_ended"`
+	Scheduled  time.Time  `json:"next_schedule"`
 	Upstream   string     `json:"upstream"`
 	Size       string     `json:"size"`
 	ErrorMsg   string     `json:"error_msg"`
@@ -27,6 +29,15 @@ type WorkerStatus struct {
 	LastOnline time.Time `json:"last_online"` // last seen
 }

+type MirrorSchedules struct {
+	Schedules []MirrorSchedule `json:"schedules"`
+}
+
+type MirrorSchedule struct {
+	MirrorName   string    `json:"name"`
+	NextSchedule time.Time `json:"next_schedule"`
+}
+
 // A CmdVerb is an action to a job or worker
 type CmdVerb uint8

@@ -67,9 +78,10 @@ func (c CmdVerb) String() string {
 // A WorkerCmd is the command message send from the
 // manager to a worker
 type WorkerCmd struct {
-	Cmd      CmdVerb  `json:"cmd"`
-	MirrorID string   `json:"mirror_id"`
-	Args     []string `json:"args"`
+	Cmd      CmdVerb         `json:"cmd"`
+	MirrorID string          `json:"mirror_id"`
+	Args     []string        `json:"args"`
+	Options  map[string]bool `json:"options"`
 }

 func (c WorkerCmd) String() string {
@@ -82,8 +94,9 @@ func (c WorkerCmd) String() string {
 // A ClientCmd is the command message send from client
 // to the manager
 type ClientCmd struct {
-	Cmd      CmdVerb  `json:"cmd"`
-	MirrorID string   `json:"mirror_id"`
-	WorkerID string   `json:"worker_id"`
-	Args     []string `json:"args"`
+	Cmd      CmdVerb         `json:"cmd"`
+	MirrorID string          `json:"mirror_id"`
+	WorkerID string          `json:"worker_id"`
+	Args     []string        `json:"args"`
+	Options  map[string]bool `json:"options"`
 }
@@ -1,11 +1,9 @@
-package manager
+package internal

 import (
 	"encoding/json"
 	"strconv"
 	"time"
-
-	. "github.com/tuna/tunasync/internal"
 )

 type textTime struct {
@@ -38,24 +36,32 @@ func (t *stampTime) UnmarshalJSON(b []byte) error {
 	return err
 }

-// webMirrorStatus is the mirror status to be shown in the web page
-type webMirrorStatus struct {
+// WebMirrorStatus is the mirror status to be shown in the web page
+type WebMirrorStatus struct {
 	Name         string     `json:"name"`
 	IsMaster     bool       `json:"is_master"`
 	Status       SyncStatus `json:"status"`
 	LastUpdate   textTime   `json:"last_update"`
 	LastUpdateTs stampTime  `json:"last_update_ts"`
+	LastEnded    textTime   `json:"last_ended"`
+	LastEndedTs  stampTime  `json:"last_ended_ts"`
+	Scheduled    textTime   `json:"next_schedule"`
+	ScheduledTs  stampTime  `json:"next_schedule_ts"`
 	Upstream     string     `json:"upstream"`
 	Size         string     `json:"size"` // approximate size
 }

-func convertMirrorStatus(m MirrorStatus) webMirrorStatus {
-	return webMirrorStatus{
+func BuildWebMirrorStatus(m MirrorStatus) WebMirrorStatus {
+	return WebMirrorStatus{
 		Name:         m.Name,
 		IsMaster:     m.IsMaster,
 		Status:       m.Status,
 		LastUpdate:   textTime{m.LastUpdate},
 		LastUpdateTs: stampTime{m.LastUpdate},
+		LastEnded:    textTime{m.LastEnded},
+		LastEndedTs:  stampTime{m.LastEnded},
+		Scheduled:    textTime{m.Scheduled},
+		ScheduledTs:  stampTime{m.Scheduled},
 		Upstream:     m.Upstream,
 		Size:         m.Size,
 	}
internal/status_web_test.go (new file, 87 lines)
@@ -0,0 +1,87 @@
package internal

import (
	"encoding/json"
	"testing"
	"time"

	. "github.com/smartystreets/goconvey/convey"
)

func TestStatus(t *testing.T) {
	Convey("status json ser-de should work", t, func() {
		tz := "Asia/Tokyo"
		loc, err := time.LoadLocation(tz)
		So(err, ShouldBeNil)
		t := time.Date(2016, time.April, 16, 23, 8, 10, 0, loc)
		m := WebMirrorStatus{
			Name:         "tunalinux",
			Status:       Success,
			LastUpdate:   textTime{t},
			LastUpdateTs: stampTime{t},
			LastEnded:    textTime{t},
			LastEndedTs:  stampTime{t},
			Scheduled:    textTime{t},
			ScheduledTs:  stampTime{t},
			Size:         "5GB",
			Upstream:     "rsync://mirrors.tuna.tsinghua.edu.cn/tunalinux/",
		}

		b, err := json.Marshal(m)
		So(err, ShouldBeNil)
		//fmt.Println(string(b))
		var m2 WebMirrorStatus
		err = json.Unmarshal(b, &m2)
		So(err, ShouldBeNil)
		// fmt.Printf("%#v", m2)
		So(m2.Name, ShouldEqual, m.Name)
		So(m2.Status, ShouldEqual, m.Status)
		So(m2.LastUpdate.Unix(), ShouldEqual, m.LastUpdate.Unix())
		So(m2.LastUpdateTs.Unix(), ShouldEqual, m.LastUpdate.Unix())
		So(m2.LastUpdate.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
		So(m2.LastUpdateTs.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
		So(m2.LastEnded.Unix(), ShouldEqual, m.LastEnded.Unix())
		So(m2.LastEndedTs.Unix(), ShouldEqual, m.LastEnded.Unix())
		So(m2.LastEnded.UnixNano(), ShouldEqual, m.LastEnded.UnixNano())
		So(m2.LastEndedTs.UnixNano(), ShouldEqual, m.LastEnded.UnixNano())
		So(m2.Scheduled.Unix(), ShouldEqual, m.Scheduled.Unix())
		So(m2.ScheduledTs.Unix(), ShouldEqual, m.Scheduled.Unix())
		So(m2.Scheduled.UnixNano(), ShouldEqual, m.Scheduled.UnixNano())
		So(m2.ScheduledTs.UnixNano(), ShouldEqual, m.Scheduled.UnixNano())
		So(m2.Size, ShouldEqual, m.Size)
		So(m2.Upstream, ShouldEqual, m.Upstream)
	})
	Convey("BuildWebMirrorStatus should work", t, func() {
		m := MirrorStatus{
			Name:       "arch-sync3",
			Worker:     "testWorker",
			IsMaster:   true,
			Status:     Failed,
			LastUpdate: time.Now().Add(-time.Minute * 30),
			LastEnded:  time.Now(),
			Scheduled:  time.Now().Add(time.Minute * 5),
			Upstream:   "mirrors.tuna.tsinghua.edu.cn",
			Size:       "4GB",
		}

		var m2 WebMirrorStatus
		m2 = BuildWebMirrorStatus(m)
		// fmt.Printf("%#v", m2)
		So(m2.Name, ShouldEqual, m.Name)
		So(m2.Status, ShouldEqual, m.Status)
		So(m2.LastUpdate.Unix(), ShouldEqual, m.LastUpdate.Unix())
		So(m2.LastUpdateTs.Unix(), ShouldEqual, m.LastUpdate.Unix())
		So(m2.LastUpdate.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
		So(m2.LastUpdateTs.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
		So(m2.LastEnded.Unix(), ShouldEqual, m.LastEnded.Unix())
		So(m2.LastEndedTs.Unix(), ShouldEqual, m.LastEnded.Unix())
		So(m2.LastEnded.UnixNano(), ShouldEqual, m.LastEnded.UnixNano())
		So(m2.LastEndedTs.UnixNano(), ShouldEqual, m.LastEnded.UnixNano())
		So(m2.Scheduled.Unix(), ShouldEqual, m.Scheduled.Unix())
		So(m2.ScheduledTs.Unix(), ShouldEqual, m.Scheduled.Unix())
		So(m2.Scheduled.UnixNano(), ShouldEqual, m.Scheduled.UnixNano())
		So(m2.ScheduledTs.UnixNano(), ShouldEqual, m.Scheduled.UnixNano())
		So(m2.Size, ShouldEqual, m.Size)
		So(m2.Upstream, ShouldEqual, m.Upstream)
	})
}
@@ -8,6 +8,7 @@ import (
 	"errors"
 	"io/ioutil"
 	"net/http"
+	"regexp"
 	"time"
 )

@@ -84,3 +85,14 @@ func GetJSON(url string, obj interface{}, client *http.Client) (*http.Response,
 	}
 	return resp, json.Unmarshal(body, obj)
 }
+
+func ExtractSizeFromRsyncLog(content []byte) string {
+	// (?m) flag enables multi-line mode
+	re := regexp.MustCompile(`(?m)^Total file size: ([0-9\.]+[KMGTP]?) bytes`)
+	matches := re.FindAllSubmatch(content, -1)
+	// fmt.Printf("%q\n", matches)
+	if len(matches) == 0 {
+		return ""
+	}
+	return string(matches[len(matches)-1][1])
+}
internal/util_test.go (new file, 32 lines)
@@ -0,0 +1,32 @@
package internal

import (
	"testing"

	. "github.com/smartystreets/goconvey/convey"
)

func TestExtractSizeFromRsyncLog(t *testing.T) {
	realLogContent := `
Number of files: 998,470 (reg: 925,484, dir: 58,892, link: 14,094)
Number of created files: 1,049 (reg: 1,049)
Number of deleted files: 1,277 (reg: 1,277)
Number of regular files transferred: 5,694
Total file size: 1.33T bytes
Total transferred file size: 2.86G bytes
Literal data: 780.62M bytes
Matched data: 2.08G bytes
File list size: 37.55M
File list generation time: 7.845 seconds
File list transfer time: 0.000 seconds
Total bytes sent: 7.55M
Total bytes received: 823.25M

sent 7.55M bytes  received 823.25M bytes  5.11M bytes/sec
total size is 1.33T  speedup is 1,604.11
`
	Convey("Log parser should work", t, func() {
		res := ExtractSizeFromRsyncLog([]byte(realLogContent))
		So(res, ShouldEqual, "1.33T")
	})
}
@@ -1,3 +1,3 @@
 package internal

-const Version string = "0.2-dev"
+const Version string = "0.3.3"
@@ -14,6 +14,7 @@ type dbAdapter interface {
 	Init() error
 	ListWorkers() ([]WorkerStatus, error)
 	GetWorker(workerID string) (WorkerStatus, error)
+	DeleteWorker(workerID string) error
 	CreateWorker(w WorkerStatus) (WorkerStatus, error)
 	UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error)
 	GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error)
@@ -95,6 +96,19 @@ func (b *boltAdapter) GetWorker(workerID string) (w WorkerStatus, err error) {
 	return
 }

+func (b *boltAdapter) DeleteWorker(workerID string) (err error) {
+	err = b.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(_workerBucketKey))
+		v := bucket.Get([]byte(workerID))
+		if v == nil {
+			return fmt.Errorf("invalid workerID %s", workerID)
+		}
+		err := bucket.Delete([]byte(workerID))
+		return err
+	})
+	return
+}
+
 func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
 	err := b.db.Update(func(tx *bolt.Tx) error {
 		bucket := tx.Bucket([]byte(_workerBucketKey))
@@ -125,7 +139,7 @@ func (b *boltAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus
 		bucket := tx.Bucket([]byte(_statusBucketKey))
 		v := bucket.Get([]byte(id))
 		if v == nil {
-			return fmt.Errorf("no mirror %s exists in worker %s", mirrorID, workerID)
+			return fmt.Errorf("no mirror '%s' exists in worker '%s'", mirrorID, workerID)
 		}
 		err := json.Unmarshal(v, &m)
 		return err
@@ -40,21 +40,39 @@ func TestBoltAdapter(t *testing.T) {
 			So(err, ShouldBeNil)
 		}

-		Convey("get exists worker", func() {
+		Convey("get existent worker", func() {
 			_, err := boltDB.GetWorker(testWorkerIDs[0])
 			So(err, ShouldBeNil)
 		})

-		Convey("list exist worker", func() {
+		Convey("list existent workers", func() {
 			ws, err := boltDB.ListWorkers()
 			So(err, ShouldBeNil)
 			So(len(ws), ShouldEqual, 2)
 		})

-		Convey("get inexist worker", func() {
+		Convey("get non-existent worker", func() {
 			_, err := boltDB.GetWorker("invalid workerID")
 			So(err, ShouldNotBeNil)
 		})

+		Convey("delete existent worker", func() {
+			err := boltDB.DeleteWorker(testWorkerIDs[0])
+			So(err, ShouldBeNil)
+			_, err = boltDB.GetWorker(testWorkerIDs[0])
+			So(err, ShouldNotBeNil)
+			ws, err := boltDB.ListWorkers()
+			So(err, ShouldBeNil)
+			So(len(ws), ShouldEqual, 1)
+		})
+
+		Convey("delete non-existent worker", func() {
+			err := boltDB.DeleteWorker("invalid workerID")
+			So(err, ShouldNotBeNil)
+			ws, err := boltDB.ListWorkers()
+			So(err, ShouldBeNil)
+			So(len(ws), ShouldEqual, 2)
+		})
 	})

 	Convey("update mirror status", func() {
@@ -65,6 +83,7 @@ func TestBoltAdapter(t *testing.T) {
 				IsMaster:   true,
 				Status:     Success,
 				LastUpdate: time.Now(),
+				LastEnded:  time.Now(),
 				Upstream:   "mirrors.tuna.tsinghua.edu.cn",
 				Size:       "3GB",
 			},
@@ -73,7 +92,8 @@ func TestBoltAdapter(t *testing.T) {
 				Worker:     testWorkerIDs[1],
 				IsMaster:   true,
 				Status:     Disabled,
-				LastUpdate: time.Now(),
+				LastUpdate: time.Now().Add(-time.Hour),
+				LastEnded:  time.Now(),
 				Upstream:   "mirrors.tuna.tsinghua.edu.cn",
 				Size:       "4GB",
 			},
@@ -82,7 +102,8 @@ func TestBoltAdapter(t *testing.T) {
 				Worker:     testWorkerIDs[1],
 				IsMaster:   true,
 				Status:     Success,
-				LastUpdate: time.Now(),
+				LastUpdate: time.Now().Add(-time.Second),
+				LastEnded:  time.Now(),
 				Upstream:   "mirrors.tuna.tsinghua.edu.cn",
 				Size:       "4GB",
 			},
@@ -1,6 +1,7 @@
 package manager

 import (
+	"errors"
 	"fmt"
 	"net/http"
 	"time"
@@ -83,10 +84,14 @@ func GetTUNASyncManager(cfg *Config) *Manager {
 	// workerID should be valid in this route group
 	workerValidateGroup := s.engine.Group("/workers", s.workerIDValidator)
 	{
+		// delete specified worker
+		workerValidateGroup.DELETE(":id", s.deleteWorker)
 		// get job list
 		workerValidateGroup.GET(":id/jobs", s.listJobsOfWorker)
 		// post job status
 		workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker)
+		workerValidateGroup.POST(":id/jobs/:job/size", s.updateMirrorSize)
+		workerValidateGroup.POST(":id/schedules", s.updateSchedulesOfWorker)
 	}

 	// for tunasynctl to post commands
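To illustrate the routes registered above, here are hypothetical requests against a local manager (the JSON field names come from the size message and `MirrorSchedule` definitions in this changeset; the worker and mirror names are made up):

```
# remove a worker
curl -X DELETE http://localhost:12345/workers/test_worker

# report a mirror's size
curl -X POST http://localhost:12345/workers/test_worker/jobs/ubuntu/size \
    -H 'Content-Type: application/json' -d '{"name": "ubuntu", "size": "1.33T"}'

# push next-schedule times for some mirrors
curl -X POST http://localhost:12345/workers/test_worker/schedules \
    -H 'Content-Type: application/json' \
    -d '{"schedules": [{"name": "ubuntu", "next_schedule": "2020-01-01T00:00:00Z"}]}'
```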
@@ -133,11 +138,11 @@ func (s *Manager) listAllJobs(c *gin.Context) {
 		s.returnErrJSON(c, http.StatusInternalServerError, err)
 		return
 	}
-	webMirStatusList := []webMirrorStatus{}
+	webMirStatusList := []WebMirrorStatus{}
 	for _, m := range mirrorStatusList {
 		webMirStatusList = append(
 			webMirStatusList,
-			convertMirrorStatus(m),
+			BuildWebMirrorStatus(m),
 		)
 	}
 	c.JSON(http.StatusOK, webMirStatusList)
@@ -157,6 +162,22 @@ func (s *Manager) flushDisabledJobs(c *gin.Context) {
 	c.JSON(http.StatusOK, gin.H{_infoKey: "flushed"})
 }

+// deleteWorker deletes one worker by id
+func (s *Manager) deleteWorker(c *gin.Context) {
+	workerID := c.Param("id")
+	err := s.adapter.DeleteWorker(workerID)
+	if err != nil {
+		err := fmt.Errorf("failed to delete worker: %s",
+			err.Error(),
+		)
+		c.Error(err)
+		s.returnErrJSON(c, http.StatusInternalServerError, err)
+		return
+	}
+	logger.Noticef("Worker <%s> deleted", workerID)
+	c.JSON(http.StatusOK, gin.H{_infoKey: "deleted"})
+}
+
 // listWrokers respond with informations of all the workers
 func (s *Manager) listWorkers(c *gin.Context) {
 	var workerInfos []WorkerStatus
@@ -220,11 +241,59 @@ func (s *Manager) returnErrJSON(c *gin.Context, code int, err error) {
 	})
 }

+func (s *Manager) updateSchedulesOfWorker(c *gin.Context) {
+	workerID := c.Param("id")
+	var schedules MirrorSchedules
+	c.BindJSON(&schedules)
+
+	for _, schedule := range schedules.Schedules {
+		mirrorName := schedule.MirrorName
+		if len(mirrorName) == 0 {
+			s.returnErrJSON(
+				c, http.StatusBadRequest,
+				errors.New("Mirror Name should not be empty"),
+			)
+		}
+
+		curStatus, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
+		if err != nil {
+			fmt.Errorf("failed to get job %s of worker %s: %s",
+				mirrorName, workerID, err.Error(),
+			)
+			continue
+		}
+
+		if curStatus.Scheduled == schedule.NextSchedule {
+			// no changes, skip update
+			continue
+		}
+
+		curStatus.Scheduled = schedule.NextSchedule
+		_, err = s.adapter.UpdateMirrorStatus(workerID, mirrorName, curStatus)
+		if err != nil {
+			err := fmt.Errorf("failed to update job %s of worker %s: %s",
+				mirrorName, workerID, err.Error(),
+			)
+			c.Error(err)
+			s.returnErrJSON(c, http.StatusInternalServerError, err)
+			return
+		}
+	}
+	type empty struct{}
+	c.JSON(http.StatusOK, empty{})
+}
+
 func (s *Manager) updateJobOfWorker(c *gin.Context) {
 	workerID := c.Param("id")
 	var status MirrorStatus
 	c.BindJSON(&status)
 	mirrorName := status.Name
 	if len(mirrorName) == 0 {
 		s.returnErrJSON(
 			c, http.StatusBadRequest,
 			errors.New("Mirror Name should not be empty"),
 		)
 	}

 	curStatus, _ := s.adapter.GetMirrorStatus(workerID, mirrorName)

@@ -234,21 +303,25 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) {
 	} else {
 		status.LastUpdate = curStatus.LastUpdate
 	}
+	if status.Status == Success || status.Status == Failed {
+		status.LastEnded = time.Now()
+	} else {
+		status.LastEnded = curStatus.LastEnded
+	}

 	// Only message with meaningful size updates the mirror size
 	if len(curStatus.Size) > 0 && curStatus.Size != "unknown" {
 		if len(status.Size) == 0 || status.Size == "unknown" {
 			status.Size = curStatus.Size
 		}
 	}

 	// for logging
 	switch status.Status {
 	case Success:
 		logger.Noticef("Job [%s] @<%s> success", status.Name, status.Worker)
 	case Failed:
 		logger.Warningf("Job [%s] @<%s> failed", status.Name, status.Worker)
 	case Syncing:
 		logger.Noticef("Job [%s] @<%s> starts syncing", status.Name, status.Worker)
 	case Disabled:
 		logger.Noticef("Job [%s] @<%s> disabled", status.Name, status.Worker)
 	case Paused:
 		logger.Noticef("Job [%s] @<%s> paused", status.Name, status.Worker)
 	default:
-		logger.Infof("Job [%s] @<%s> status: %s", status.Name, status.Worker, status.Status)
+		logger.Noticef("Job [%s] @<%s> %s", status.Name, status.Worker, status.Status)
 	}

 	newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
@@ -263,6 +336,45 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) {
 	c.JSON(http.StatusOK, newStatus)
 }

+func (s *Manager) updateMirrorSize(c *gin.Context) {
+	workerID := c.Param("id")
+	type SizeMsg struct {
+		Name string `json:"name"`
+		Size string `json:"size"`
+	}
+	var msg SizeMsg
+	c.BindJSON(&msg)
+
+	mirrorName := msg.Name
+	status, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
+	if err != nil {
+		logger.Errorf(
+			"Failed to get status of mirror %s @<%s>: %s",
+			mirrorName, workerID, err.Error(),
+		)
+		s.returnErrJSON(c, http.StatusInternalServerError, err)
+		return
+	}
+
+	// Only message with meaningful size updates the mirror size
+	if len(msg.Size) > 0 || msg.Size != "unknown" {
+		status.Size = msg.Size
+	}
+
+	logger.Noticef("Mirror size of [%s] @<%s>: %s", status.Name, status.Worker, status.Size)
+
+	newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
+	if err != nil {
+		err := fmt.Errorf("failed to update job %s of worker %s: %s",
+			mirrorName, workerID, err.Error(),
+		)
+		c.Error(err)
+		s.returnErrJSON(c, http.StatusInternalServerError, err)
+		return
+	}
+	c.JSON(http.StatusOK, newStatus)
+}
+
 func (s *Manager) handleClientCmd(c *gin.Context) {
 	var clientCmd ClientCmd
 	c.BindJSON(&clientCmd)
@@ -286,6 +398,7 @@ func (s *Manager) handleClientCmd(c *gin.Context) {
 		Cmd:      clientCmd.Cmd,
 		MirrorID: clientCmd.MirrorID,
 		Args:     clientCmd.Args,
+		Options:  clientCmd.Options,
 	}

 	// update job status, even if the job did not disable successfully,
@@ -21,9 +21,16 @@ const (
 )

 func TestHTTPServer(t *testing.T) {
+	var listenPort = 5000
 	Convey("HTTP server should work", t, func(ctx C) {
+		listenPort++
+		port := listenPort
+		addr := "127.0.0.1"
+		baseURL := fmt.Sprintf("http://%s:%d", addr, port)
 		InitLogger(true, true, false)
-		s := GetTUNASyncManager(&Config{Debug: false})
+		s := GetTUNASyncManager(&Config{Debug: true})
+		s.cfg.Server.Addr = addr
+		s.cfg.Server.Port = port
 		So(s, ShouldNotBeNil)
 		s.setDBAdapter(&mockDBAdapter{
 			workerStore: map[string]WorkerStatus{
@@ -32,12 +39,8 @@ func TestHTTPServer(t *testing.T) {
 			}},
 			statusStore: make(map[string]MirrorStatus),
 		})
-		port := rand.Intn(10000) + 20000
-		baseURL := fmt.Sprintf("http://127.0.0.1:%d", port)
-		go func() {
-			s.engine.Run(fmt.Sprintf("127.0.0.1:%d", port))
-		}()
-		time.Sleep(50 * time.Microsecond)
+		go s.Run()
+		time.Sleep(50 * time.Millisecond)
 		resp, err := http.Get(baseURL + "/ping")
 		So(err, ShouldBeNil)
 		So(resp.StatusCode, ShouldEqual, http.StatusOK)
@@ -79,7 +82,34 @@ func TestHTTPServer(t *testing.T) {
 			So(len(actualResponseObj), ShouldEqual, 2)
 		})

-		Convey("flush disabled jobs", func(ctx C) {
+		Convey("delete an existent worker", func(ctx C) {
+			req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/workers/%s", baseURL, w.ID), nil)
+			So(err, ShouldBeNil)
+			clt := &http.Client{}
+			resp, err := clt.Do(req)
+			So(err, ShouldBeNil)
+			defer resp.Body.Close()
+			res := map[string]string{}
+			err = json.NewDecoder(resp.Body).Decode(&res)
+			So(err, ShouldBeNil)
+			So(res[_infoKey], ShouldEqual, "deleted")
+		})
+
+		Convey("delete non-existent worker", func(ctx C) {
+			invalidWorker := "test_worker233"
+			req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/workers/%s", baseURL, invalidWorker), nil)
+			So(err, ShouldBeNil)
+			clt := &http.Client{}
+			resp, err := clt.Do(req)
+			So(err, ShouldBeNil)
+			defer resp.Body.Close()
+			res := map[string]string{}
+			err = json.NewDecoder(resp.Body).Decode(&res)
+			So(err, ShouldBeNil)
+			So(res[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
+		})
+
+		Convey("flush disabled jobs", func(ctx C) {
 			req, err := http.NewRequest("DELETE", baseURL+"/jobs/disabled", nil)
 			So(err, ShouldBeNil)
 			clt := &http.Client{}
@@ -99,11 +129,11 @@ func TestHTTPServer(t *testing.T) {
 				IsMaster: true,
 				Status:   Success,
 				Upstream: "mirrors.tuna.tsinghua.edu.cn",
-				Size:     "3GB",
+				Size:     "unknown",
 			}
 			resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
-			defer resp.Body.Close()
 			So(err, ShouldBeNil)
+			defer resp.Body.Close()
 			So(resp.StatusCode, ShouldEqual, http.StatusOK)

 			Convey("list mirror status of an existed worker", func(ctx C) {
@@ -121,11 +151,12 @@ func TestHTTPServer(t *testing.T) {
 				So(m.Size, ShouldEqual, status.Size)
 				So(m.IsMaster, ShouldEqual, status.IsMaster)
 				So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second)
+				So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
 			})

 			Convey("list all job status of all workers", func(ctx C) {
-				var ms []webMirrorStatus
+				var ms []WebMirrorStatus
 				resp, err := GetJSON(baseURL+"/jobs", &ms, nil)
 				So(err, ShouldBeNil)
 				So(resp.StatusCode, ShouldEqual, http.StatusOK)
@@ -137,8 +168,91 @@ func TestHTTPServer(t *testing.T) {
 				So(m.Size, ShouldEqual, status.Size)
 				So(m.IsMaster, ShouldEqual, status.IsMaster)
 				So(time.Now().Sub(m.LastUpdate.Time), ShouldBeLessThan, 1*time.Second)
+				So(time.Now().Sub(m.LastEnded.Time), ShouldBeLessThan, 1*time.Second)
 			})

+			Convey("Update size of a valid mirror", func(ctx C) {
+				msg := struct {
+					Name string `json:"name"`
+					Size string `json:"size"`
+				}{status.Name, "5GB"}
+
+				url := fmt.Sprintf("%s/workers/%s/jobs/%s/size", baseURL, status.Worker, status.Name)
+				resp, err := PostJSON(url, msg, nil)
+				So(err, ShouldBeNil)
+				So(resp.StatusCode, ShouldEqual, http.StatusOK)
+
+				Convey("Get new size of a mirror", func(ctx C) {
+					var ms []MirrorStatus
+					resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
+
+					So(err, ShouldBeNil)
+					So(resp.StatusCode, ShouldEqual, http.StatusOK)
+					// err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
+					m := ms[0]
+					So(m.Name, ShouldEqual, status.Name)
+					So(m.Worker, ShouldEqual, status.Worker)
+					So(m.Status, ShouldEqual, status.Status)
+					So(m.Upstream, ShouldEqual, status.Upstream)
+					So(m.Size, ShouldEqual, "5GB")
+					So(m.IsMaster, ShouldEqual, status.IsMaster)
+					So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second)
+					So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
+				})
+			})
+
+			Convey("Update schedule of valid mirrors", func(ctx C) {
+				msg := MirrorSchedules{
+					[]MirrorSchedule{
+						MirrorSchedule{"arch-sync1", time.Now().Add(time.Minute * 10)},
+						MirrorSchedule{"arch-sync2", time.Now().Add(time.Minute * 7)},
+					},
+				}
+
+				url := fmt.Sprintf("%s/workers/%s/schedules", baseURL, status.Worker)
+				resp, err := PostJSON(url, msg, nil)
+				So(err, ShouldBeNil)
+				So(resp.StatusCode, ShouldEqual, http.StatusOK)
+			})
+
+			Convey("Update size of an invalid mirror", func(ctx C) {
+				msg := struct {
+					Name string `json:"name"`
+					Size string `json:"size"`
+				}{"Invalid mirror", "5GB"}
+
+				url := fmt.Sprintf("%s/workers/%s/jobs/%s/size", baseURL, status.Worker, status.Name)
+				resp, err := PostJSON(url, msg, nil)
+				So(err, ShouldBeNil)
+				So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
+			})
+
+			// what if status changed to failed
+			status.Status = Failed
+			time.Sleep(3 * time.Second)
+			resp, err = PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
+			So(err, ShouldBeNil)
+			defer resp.Body.Close()
+			So(resp.StatusCode, ShouldEqual, http.StatusOK)
+
+			Convey("What if syncing job failed", func(ctx C) {
+				var ms []MirrorStatus
+				resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
+
+				So(err, ShouldBeNil)
+				So(resp.StatusCode, ShouldEqual, http.StatusOK)
+				// err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
+				m := ms[0]
+				So(m.Name, ShouldEqual, status.Name)
+				So(m.Worker, ShouldEqual, status.Worker)
+				So(m.Status, ShouldEqual, status.Status)
+				So(m.Upstream, ShouldEqual, status.Upstream)
+				So(m.Size, ShouldEqual, status.Size)
+				So(m.IsMaster, ShouldEqual, status.IsMaster)
+				So(time.Now().Sub(m.LastUpdate), ShouldBeGreaterThan, 3*time.Second)
+				So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
+			})
 		})

 		Convey("update mirror status of an inexisted worker", func(ctx C) {
@@ -149,6 +263,7 @@ func TestHTTPServer(t *testing.T) {
 				IsMaster:   true,
 				Status:     Success,
 				LastUpdate: time.Now(),
+				LastEnded:  time.Now(),
 				Upstream:   "mirrors.tuna.tsinghua.edu.cn",
 				Size:       "4GB",
 			}
@@ -162,6 +277,24 @@ func TestHTTPServer(t *testing.T) {
 			So(err, ShouldBeNil)
 			So(msg[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
 		})
+		Convey("update schedule of an non-existent worker", func(ctx C) {
+			invalidWorker := "test_worker2"
+			sch := MirrorSchedules{
+				[]MirrorSchedule{
+					MirrorSchedule{"arch-sync1", time.Now().Add(time.Minute * 10)},
+					MirrorSchedule{"arch-sync2", time.Now().Add(time.Minute * 7)},
+				},
+			}
+			resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/schedules",
+				baseURL, invalidWorker), sch, nil)
+			So(err, ShouldBeNil)
+			So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
+			defer resp.Body.Close()
+			var msg map[string]string
+			err = json.NewDecoder(resp.Body).Decode(&msg)
+			So(err, ShouldBeNil)
+			So(msg[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
+		})
 		Convey("handle client command", func(ctx C) {
 			cmdChan := make(chan WorkerCmd, 1)
 			workerServer := makeMockWorkerServer(cmdChan)
@@ -180,11 +313,11 @@ func TestHTTPServer(t *testing.T) {
 				// run the mock worker server
 				workerServer.Run(bindAddress)
 			}()
-			time.Sleep(50 * time.Microsecond)
+			time.Sleep(50 * time.Millisecond)
 			// verify the worker mock server is running
 			workerResp, err := http.Get(workerBaseURL + "/ping")
-			defer workerResp.Body.Close()
 			So(err, ShouldBeNil)
+			defer workerResp.Body.Close()
 			So(workerResp.StatusCode, ShouldEqual, http.StatusOK)

 			Convey("when client send wrong cmd", func(ctx C) {
@@ -194,8 +327,8 @@ func TestHTTPServer(t *testing.T) {
 					WorkerID: "not_exist_worker",
 				}
 				resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil)
-				defer resp.Body.Close()
 				So(err, ShouldBeNil)
+				defer resp.Body.Close()
 				So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
 			})

@@ -207,9 +340,8 @@ func TestHTTPServer(t *testing.T) {
 				}

 				resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil)
-				defer resp.Body.Close()
-
 				So(err, ShouldBeNil)
+				defer resp.Body.Close()
 				So(resp.StatusCode, ShouldEqual, http.StatusOK)
 				time.Sleep(50 * time.Microsecond)
 				select {
@@ -252,6 +384,11 @@ func (b *mockDBAdapter) GetWorker(workerID string) (WorkerStatus, error) {
 	return w, nil
 }

+func (b *mockDBAdapter) DeleteWorker(workerID string) error {
+	delete(b.workerStore, workerID)
+	return nil
+}
+
 func (b *mockDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
 	// _, ok := b.workerStore[w.ID]
 	// if ok {
@@ -1,44 +0,0 @@
-package manager
-
-import (
-	"encoding/json"
-	"testing"
-	"time"
-
-	tunasync "github.com/tuna/tunasync/internal"
-
-	. "github.com/smartystreets/goconvey/convey"
-)
-
-func TestStatus(t *testing.T) {
-	Convey("status json ser-de should work", t, func() {
-		tz := "Asia/Tokyo"
-		loc, err := time.LoadLocation(tz)
-		So(err, ShouldBeNil)
-		t := time.Date(2016, time.April, 16, 23, 8, 10, 0, loc)
-		m := webMirrorStatus{
-			Name:         "tunalinux",
-			Status:       tunasync.Success,
-			LastUpdate:   textTime{t},
-			LastUpdateTs: stampTime{t},
-			Size:         "5GB",
-			Upstream:     "rsync://mirrors.tuna.tsinghua.edu.cn/tunalinux/",
-		}
-
-		b, err := json.Marshal(m)
-		So(err, ShouldBeNil)
-		//fmt.Println(string(b))
-		var m2 webMirrorStatus
-		err = json.Unmarshal(b, &m2)
-		So(err, ShouldBeNil)
-		// fmt.Printf("%#v", m2)
-		So(m2.Name, ShouldEqual, m.Name)
-		So(m2.Status, ShouldEqual, m.Status)
-		So(m2.LastUpdate.Unix(), ShouldEqual, m.LastUpdate.Unix())
-		So(m2.LastUpdateTs.Unix(), ShouldEqual, m.LastUpdate.Unix())
-		So(m2.LastUpdate.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
-		So(m2.LastUpdateTs.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
-		So(m2.Size, ShouldEqual, m.Size)
-		So(m2.Upstream, ShouldEqual, m.Upstream)
-	})
-}
@@ -15,13 +15,12 @@ type baseProvider struct {
 	ctx      *Context
 	name     string
 	interval time.Duration
+	retry    int
 	isMaster bool

 	cmd       *cmdJob
 	isRunning atomic.Value

-	logFile *os.File
-
 	cgroup *cgroupHook
 	zfs    *zfsHook
 	docker *dockerHook
@@ -52,6 +51,10 @@ func (p *baseProvider) Interval() time.Duration {
 	return p.interval
 }

+func (p *baseProvider) Retry() int {
+	return p.retry
+}
+
 func (p *baseProvider) IsMaster() bool {
 	return p.isMaster
 }
@@ -111,20 +114,21 @@ func (p *baseProvider) Docker() *dockerHook {
 	return p.docker
 }

-func (p *baseProvider) prepareLogFile() error {
+func (p *baseProvider) prepareLogFile(append bool) error {
 	if p.LogFile() == "/dev/null" {
 		p.cmd.SetLogFile(nil)
 		return nil
 	}
-	if p.logFile == nil {
-		logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE, 0644)
-		if err != nil {
-			logger.Errorf("Error opening logfile %s: %s", p.LogFile(), err.Error())
-			return err
-		}
-		p.logFile = logFile
+	appendMode := 0
+	if append {
+		appendMode = os.O_APPEND
 	}
-	p.cmd.SetLogFile(p.logFile)
+	logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE|appendMode, 0644)
+	if err != nil {
+		logger.Errorf("Error opening logfile %s: %s", p.LogFile(), err.Error())
+		return err
+	}
+	p.cmd.SetLogFile(logFile)
 	return nil
 }

@@ -143,32 +147,26 @@ func (p *baseProvider) IsRunning() bool {

 func (p *baseProvider) Wait() error {
 	defer func() {
+		p.Lock()
 		logger.Debugf("set isRunning to false: %s", p.Name())
 		p.isRunning.Store(false)
-		if p.logFile != nil {
-			p.logFile.Close()
-			p.logFile = nil
-		}
+		p.Unlock()
 	}()
 	logger.Debugf("calling Wait: %s", p.Name())
 	return p.cmd.Wait()
 }

 func (p *baseProvider) Terminate() error {
+	p.Lock()
+	defer p.Unlock()
 	logger.Debugf("terminating provider: %s", p.Name())
 	if !p.IsRunning() {
 		return nil
 	}

-	p.Lock()
-	if p.logFile != nil {
-		p.logFile.Close()
-		p.logFile = nil
-	}
-	p.Unlock()
-
 	err := p.cmd.Terminate()
 	p.isRunning.Store(false)

 	return err
 }

 func (p *baseProvider) DataSize() string {
 	return ""
 }
@@ -17,7 +17,6 @@ import (

 type cgroupHook struct {
 	emptyHook
-	provider  mirrorProvider
 	basePath  string
 	baseGroup string
 	created   bool
@@ -36,7 +35,9 @@ func newCgroupHook(p mirrorProvider, basePath, baseGroup, subsystem, memLimit st
 		subsystem = "cpu"
 	}
 	return &cgroupHook{
-		provider: p,
+		emptyHook: emptyHook{
+			provider: p,
+		},
 		basePath:  basePath,
 		baseGroup: baseGroup,
 		subsystem: subsystem,
@@ -1,6 +1,7 @@
 package worker

 import (
+	"errors"
 	"time"

 	"github.com/anmitsu/go-shlex"
@@ -11,6 +12,7 @@ type cmdConfig struct {
 	upstreamURL, command        string
 	workingDir, logDir, logFile string
 	interval                    time.Duration
+	retry                       int
 	env                         map[string]string
 }

@@ -22,11 +24,15 @@ type cmdProvider struct {

 func newCmdProvider(c cmdConfig) (*cmdProvider, error) {
 	// TODO: check config options
+	if c.retry == 0 {
+		c.retry = defaultMaxRetry
+	}
 	provider := &cmdProvider{
 		baseProvider: baseProvider{
 			name:     c.name,
 			ctx:      NewContext(),
 			interval: c.interval,
+			retry:    c.retry,
 		},
 		cmdConfig: c,
 	}
@@ -60,17 +66,25 @@ func (p *cmdProvider) Run() error {
 }

 func (p *cmdProvider) Start() error {
+	p.Lock()
+	defer p.Unlock()
+
+	if p.IsRunning() {
+		return errors.New("provider is currently running")
+	}
+
 	env := map[string]string{
 		"TUNASYNC_MIRROR_NAME":  p.Name(),
 		"TUNASYNC_WORKING_DIR":  p.WorkingDir(),
 		"TUNASYNC_UPSTREAM_URL": p.upstreamURL,
+		"TUNASYNC_LOG_DIR":      p.LogDir(),
+		"TUNASYNC_LOG_FILE":     p.LogFile(),
 	}
 	for k, v := range p.env {
 		env[k] = v
 	}
 	p.cmd = newCmdJob(p, p.command, p.WorkingDir(), env)
-	if err := p.prepareLogFile(); err != nil {
+	if err := p.prepareLogFile(false); err != nil {
 		return err
 	}
@@ -8,6 +8,6 @@ import (

 type empty struct{}

-const maxRetry = 2
+const defaultMaxRetry = 2

 var logger = logging.MustGetLogger("tunasync")
@@ -49,6 +49,7 @@ type globalConfig struct {
 	MirrorDir  string `toml:"mirror_dir"`
 	Concurrent int    `toml:"concurrent"`
 	Interval   int    `toml:"interval"`
+	Retry      int    `toml:"retry"`

 	ExecOnSuccess []string `toml:"exec_on_success"`
 	ExecOnFailure []string `toml:"exec_on_failure"`
@@ -108,6 +109,7 @@ type mirrorConfig struct {
 	Provider  providerEnum      `toml:"provider"`
 	Upstream  string            `toml:"upstream"`
 	Interval  int               `toml:"interval"`
+	Retry     int               `toml:"retry"`
 	MirrorDir string            `toml:"mirror_dir"`
 	LogDir    string            `toml:"log_dir"`
 	Env       map[string]string `toml:"env"`
@@ -123,6 +125,7 @@ type mirrorConfig struct {

 	Command     string `toml:"command"`
 	UseIPv6     bool   `toml:"use_ipv6"`
+	UseIPv4     bool   `toml:"use_ipv4"`
 	ExcludeFile string `toml:"exclude_file"`
 	Username    string `toml:"username"`
 	Password    string `toml:"password"`
@@ -18,6 +18,7 @@ log_dir = "/var/log/tunasync/{{.Name}}"
 mirror_dir = "/data/mirrors"
 concurrent = 10
 interval = 240
+retry = 3

 [manager]
 api_base = "https://127.0.0.1:5000"
@@ -35,6 +36,7 @@ name = "AOSP"
 provider = "command"
 upstream = "https://aosp.google.com/"
 interval = 720
+retry = 2
 mirror_dir = "/data/git/AOSP"
 exec_on_success = [
 	"bash -c 'echo ${TUNASYNC_JOB_EXIT_STATUS} > ${TUNASYNC_WORKING_DIR}/exit_status'"
@@ -116,6 +118,7 @@ use_ipv6 = true
 	So(err, ShouldBeNil)
 	So(cfg.Global.Name, ShouldEqual, "test_worker")
 	So(cfg.Global.Interval, ShouldEqual, 240)
+	So(cfg.Global.Retry, ShouldEqual, 3)
 	So(cfg.Global.MirrorDir, ShouldEqual, "/data/mirrors")

 	So(cfg.Manager.APIBase, ShouldEqual, "https://127.0.0.1:5000")
@@ -126,6 +129,7 @@ use_ipv6 = true
 	So(m.MirrorDir, ShouldEqual, "/data/git/AOSP")
 	So(m.Provider, ShouldEqual, provCommand)
 	So(m.Interval, ShouldEqual, 720)
+	So(m.Retry, ShouldEqual, 2)
 	So(m.Env["REPO"], ShouldEqual, "/usr/local/bin/aosp-repo")

 	m = cfg.Mirrors[1]
@@ -7,10 +7,9 @@ import (

 type dockerHook struct {
 	emptyHook
-	provider mirrorProvider
-	image    string
-	volumes  []string
-	options  []string
+	image   string
+	volumes []string
+	options []string
 }

 func newDockerHook(p mirrorProvider, gCfg dockerConfig, mCfg mirrorConfig) *dockerHook {
@@ -23,15 +22,18 @@ func newDockerHook(p mirrorProvider, gCfg dockerConfig, mCfg mirrorConfig) *dock
 	options = append(options, mCfg.DockerOptions...)

 	return &dockerHook{
-		provider: p,
-		image:    mCfg.DockerImage,
-		volumes:  volumes,
-		options:  options,
+		emptyHook: emptyHook{
+			provider: p,
+		},
+		image:   mCfg.DockerImage,
+		volumes: volumes,
+		options: options,
 	}
 }

 func (d *dockerHook) preExec() error {
 	p := d.provider
 	logDir := p.LogDir()
 	logFile := p.LogFile()
 	workingDir := p.WorkingDir()

@@ -42,17 +44,13 @@ func (d *dockerHook) preExec() error {
 		}
 	}

-	logFileNew := "/log_latest"
-	workingDirNew := "/data"
-
 	// Override workingDir
 	ctx := p.EnterContext()
-	ctx.Set(_WorkingDirKey, workingDirNew)
-	ctx.Set(_LogFileKey+":docker", logFileNew)
 	ctx.Set(
 		"volumes", []string{
-			fmt.Sprintf("%s:%s", logFile, logFileNew),
-			fmt.Sprintf("%s:%s", workingDir, workingDirNew),
+			fmt.Sprintf("%s:%s", logDir, logDir),
+			fmt.Sprintf("%s:%s", logFile, logFile),
+			fmt.Sprintf("%s:%s", workingDir, workingDir),
 		},
 	)
 	return nil
@@ -55,8 +55,10 @@ sleep 10
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
d := &dockerHook{
|
||||
provider: provider,
|
||||
image: "alpine",
|
||||
emptyHook: emptyHook{
|
||||
provider: provider,
|
||||
},
|
||||
image: "alpine",
|
||||
volumes: []string{
|
||||
fmt.Sprintf("%s:%s", cmdScript, "/bin/cmd.sh"),
|
||||
},
|
||||
|
||||
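The hook structs in this change set stop carrying their own `provider` field and instead embed `emptyHook` — the same refactor lands in dockerHook, execPostHook, logLimiter, and zfsHook below. The pattern is the usual Go trick of embedding a base struct so each concrete hook inherits no-op defaults and only overrides the phases it needs. Since emptyHook's definition is not part of this diff, its field and method set below is an assumption inferred from how it is used here:

```
package worker

// Sketch of the embedded-base pattern these hunks converge on; names
// are assumptions, the real emptyHook lives outside this diff.
type mirrorProvider interface {
	Name() string
}

type emptyHook struct {
	provider mirrorProvider
}

// No-op defaults for each hook phase; concrete hooks override only
// the phases they care about.
func (h *emptyHook) preJob() error   { return nil }
func (h *emptyHook) preExec() error  { return nil }
func (h *emptyHook) postExec() error { return nil }

// A concrete hook now embeds emptyHook instead of duplicating the
// provider field; d.provider keeps working via field promotion.
type dockerHook struct {
	emptyHook
	image string
}
```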
@@ -18,7 +18,6 @@ const (

type execPostHook struct {
    emptyHook
    provider mirrorProvider

    // exec on success or on failure
    execOn uint8
@@ -37,9 +36,11 @@ func newExecPostHook(provider mirrorProvider, execOn uint8, command string) (*ex
    }

    return &execPostHook{
        provider: provider,
        execOn:   execOn,
        command:  cmd,
        emptyHook: emptyHook{
            provider: provider,
        },
        execOn:  execOn,
        command: cmd,
    }, nil
}

@@ -71,6 +72,7 @@ func (h *execPostHook) Do() error {
        "TUNASYNC_MIRROR_NAME":     p.Name(),
        "TUNASYNC_WORKING_DIR":     p.WorkingDir(),
        "TUNASYNC_UPSTREAM_URL":    p.Upstream(),
        "TUNASYNC_LOG_DIR":         p.LogDir(),
        "TUNASYNC_LOG_FILE":        p.LogFile(),
        "TUNASYNC_JOB_EXIT_STATUS": exitStatus,
    }

@@ -92,7 +92,7 @@ exit 1
    job.ctrlChan <- jobStart
    msg := <-managerChan
    So(msg.status, ShouldEqual, PreSyncing)
    for i := 0; i < maxRetry; i++ {
    for i := 0; i < defaultMaxRetry; i++ {
        msg = <-managerChan
        So(msg.status, ShouldEqual, Syncing)
        msg = <-managerChan

@@ -5,6 +5,7 @@ import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"

    tunasync "github.com/tuna/tunasync/internal"
)
@@ -14,12 +15,13 @@ import (
type ctrlAction uint8

const (
    jobStart   ctrlAction = iota
    jobStop    // stop syncing keep the job
    jobDisable // disable the job (stops goroutine)
    jobRestart // restart syncing
    jobPing    // ensure the goroutine is alive
    jobHalt    // worker halts
    jobStart      ctrlAction = iota
    jobStop       // stop syncing keep the job
    jobDisable    // disable the job (stops goroutine)
    jobRestart    // restart syncing
    jobPing       // ensure the goroutine is alive
    jobHalt       // worker halts
    jobForceStart // ignore concurrent limit
)

type jobMessage struct {
@@ -51,6 +53,7 @@ type mirrorJob struct {
    ctrlChan chan ctrlAction
    disabled chan empty
    state    uint32
    size     string
}

func newMirrorJob(provider mirrorProvider) *mirrorJob {
@@ -136,7 +139,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
        return err
    }

    for retry := 0; retry < maxRetry; retry++ {
    for retry := 0; retry < provider.Retry(); retry++ {
        stopASAP := false // stop job as soon as possible

        if retry > 0 {
@@ -154,9 +157,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
        syncDone := make(chan error, 1)
        go func() {
            err := provider.Run()
            if !stopASAP {
                syncDone <- err
            }
            syncDone <- err
        }()

        select {
@@ -182,26 +183,33 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
        if syncErr == nil {
            // syncing success
            logger.Noticef("succeeded syncing %s", m.Name())
            managerChan <- jobMessage{tunasync.Success, m.Name(), "", (m.State() == stateReady)}
            // post-success hooks
            logger.Debug("post-success hooks")
            err := runHooks(rHooks, func(h jobHook) error { return h.postSuccess() }, "post-success")
            if err != nil {
                return err
            }
            return nil
        } else {
            // syncing failed
            logger.Warningf("failed syncing %s: %s", m.Name(), syncErr.Error())
            // post-fail hooks
            logger.Debug("post-fail hooks")
            err := runHooks(rHooks, func(h jobHook) error { return h.postFail() }, "post-fail")
            if err != nil {
                return err
            }
        }

        if syncErr == nil {
            // syncing success
            m.size = provider.DataSize()
            managerChan <- jobMessage{tunasync.Success, m.Name(), "", (m.State() == stateReady)}
            return nil
        }

        // syncing failed
        logger.Warningf("failed syncing %s: %s", m.Name(), syncErr.Error())
        managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error(), (retry == maxRetry-1) && (m.State() == stateReady)}
        managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error(), (retry == provider.Retry()-1) && (m.State() == stateReady)}

        // post-fail hooks
        logger.Debug("post-fail hooks")
        err = runHooks(rHooks, func(h jobHook) error { return h.postFail() }, "post-fail")
        if err != nil {
            return err
        }
        // gracefully exit
        if stopASAP {
            logger.Debug("No retry, exit directly")
@@ -212,22 +220,26 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
        return nil
    }

    runJob := func(kill <-chan empty, jobDone chan<- empty) {
    runJob := func(kill <-chan empty, jobDone chan<- empty, bypassSemaphore <-chan empty) {
        select {
        case semaphore <- empty{}:
            defer func() { <-semaphore }()
            runJobWrapper(kill, jobDone)
        case <-bypassSemaphore:
            logger.Noticef("Concurrent limit ignored by %s", m.Name())
            runJobWrapper(kill, jobDone)
        case <-kill:
            jobDone <- empty{}
            return
        }
    }

    bypassSemaphore := make(chan empty, 1)
    for {
        if m.State() == stateReady {
            kill := make(chan empty)
            jobDone := make(chan empty)
            go runJob(kill, jobDone)
            go runJob(kill, jobDone, bypassSemaphore)

        _wait_for_job:
            select {
@@ -248,7 +260,14 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
                m.SetState(stateReady)
                close(kill)
                <-jobDone
                time.Sleep(time.Second) // Restart may fail if the process was not exited yet
                continue
            case jobForceStart:
                select { //non-blocking
                default:
                case bypassSemaphore <- empty{}:
                }
                fallthrough
            case jobStart:
                m.SetState(stateReady)
                goto _wait_for_job
@@ -272,8 +291,14 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
        case jobDisable:
            m.SetState(stateDisabled)
            return nil
        case jobForceStart:
            select { //non-blocking
            default:
            case bypassSemaphore <- empty{}:
            }
            fallthrough
        case jobRestart:
            m.SetState(stateReady)
            fallthrough
        case jobStart:
            m.SetState(stateReady)
        default:

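The jobForceStart path above deserves a note. A buffered channel of size one acts as a bypass token next to the worker-wide semaphore: the control loop arms it with a non-blocking send (so a second force while a token is already pending is simply dropped), and the runner either takes a regular semaphore slot or consumes the token. A condensed, runnable sketch of the two pieces:

```
package main

import "fmt"

type empty struct{}

func main() {
	// Unbuffered here to model a fully-occupied concurrency limit.
	semaphore := make(chan empty)
	// One-slot bypass channel, armed by jobForceStart.
	bypassSemaphore := make(chan empty, 1)

	// Arming is non-blocking: a duplicate force is dropped rather
	// than stalling the control loop.
	select {
	case bypassSemaphore <- empty{}:
	default:
	}

	// The runner either takes a semaphore slot or consumes the token.
	select {
	case semaphore <- empty{}:
		fmt.Println("run within the concurrent limit")
	case <-bypassSemaphore:
		fmt.Println("concurrent limit ignored")
	}
}
```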
@@ -135,6 +135,8 @@ echo $TUNASYNC_WORKING_DIR
            msg = <-managerChan
            So(msg.status, ShouldEqual, Syncing)

            job.ctrlChan <- jobStart // should be ignored

            job.ctrlChan <- jobStop

            msg = <-managerChan
@@ -170,8 +172,239 @@ echo $TUNASYNC_WORKING_DIR
            job.ctrlChan <- jobDisable
            <-job.disabled
        })

        Convey("If we restart it", func(ctx C) {
            go job.Run(managerChan, semaphore)
            job.ctrlChan <- jobStart

            msg := <-managerChan
            So(msg.status, ShouldEqual, PreSyncing)
            msg = <-managerChan
            So(msg.status, ShouldEqual, Syncing)

            job.ctrlChan <- jobRestart

            msg = <-managerChan
            So(msg.status, ShouldEqual, Failed)
            So(msg.msg, ShouldEqual, "killed by manager")

            msg = <-managerChan
            So(msg.status, ShouldEqual, PreSyncing)
            msg = <-managerChan
            So(msg.status, ShouldEqual, Syncing)
            msg = <-managerChan
            So(msg.status, ShouldEqual, Success)

            expectedOutput := fmt.Sprintf(
                "%s\n%s\n",
                provider.WorkingDir(), provider.WorkingDir(),
            )

            loggedContent, err := ioutil.ReadFile(provider.LogFile())
            So(err, ShouldBeNil)
            So(string(loggedContent), ShouldEqual, expectedOutput)
            job.ctrlChan <- jobDisable
            <-job.disabled
        })

        Convey("If we disable it", func(ctx C) {
            go job.Run(managerChan, semaphore)
            job.ctrlChan <- jobStart

            msg := <-managerChan
            So(msg.status, ShouldEqual, PreSyncing)
            msg = <-managerChan
            So(msg.status, ShouldEqual, Syncing)

            job.ctrlChan <- jobDisable

            msg = <-managerChan
            So(msg.status, ShouldEqual, Failed)
            So(msg.msg, ShouldEqual, "killed by manager")

            <-job.disabled
        })

        Convey("If we stop it twice, then start it", func(ctx C) {
            go job.Run(managerChan, semaphore)
            job.ctrlChan <- jobStart

            msg := <-managerChan
            So(msg.status, ShouldEqual, PreSyncing)
            msg = <-managerChan
            So(msg.status, ShouldEqual, Syncing)

            job.ctrlChan <- jobStop

            msg = <-managerChan
            So(msg.status, ShouldEqual, Failed)
            So(msg.msg, ShouldEqual, "killed by manager")

            job.ctrlChan <- jobStop // should be ignored

            job.ctrlChan <- jobStart

            msg = <-managerChan
            So(msg.status, ShouldEqual, PreSyncing)
            msg = <-managerChan
            So(msg.status, ShouldEqual, Syncing)
            msg = <-managerChan
            So(msg.status, ShouldEqual, Success)

            expectedOutput := fmt.Sprintf(
                "%s\n%s\n",
                provider.WorkingDir(), provider.WorkingDir(),
            )

            loggedContent, err := ioutil.ReadFile(provider.LogFile())
            So(err, ShouldBeNil)
            So(string(loggedContent), ShouldEqual, expectedOutput)

            job.ctrlChan <- jobDisable
            <-job.disabled
        })
    })

    })

}

func TestConcurrentMirrorJobs(t *testing.T) {

    InitLogger(true, true, false)

    Convey("Concurrent MirrorJobs should work", t, func(ctx C) {
        tmpDir, err := ioutil.TempDir("", "tunasync")
        defer os.RemoveAll(tmpDir)
        So(err, ShouldBeNil)

        const CONCURRENT = 5

        var providers [CONCURRENT]*cmdProvider
        var jobs [CONCURRENT]*mirrorJob
        for i := 0; i < CONCURRENT; i++ {
            c := cmdConfig{
                name:        fmt.Sprintf("job-%d", i),
                upstreamURL: "http://mirrors.tuna.moe/",
                command:     "sleep 2",
                workingDir:  tmpDir,
                logDir:      tmpDir,
                logFile:     "/dev/null",
                interval:    10 * time.Second,
            }

            var err error
            providers[i], err = newCmdProvider(c)
            So(err, ShouldBeNil)
            jobs[i] = newMirrorJob(providers[i])
        }

        managerChan := make(chan jobMessage, 10)
        semaphore := make(chan empty, CONCURRENT-2)

        countingJobs := func(managerChan chan jobMessage, totalJobs, concurrentCheck int) (peakConcurrent, counterFailed int) {
            counterEnded := 0
            counterRunning := 0
            peakConcurrent = 0
            counterFailed = 0
            for counterEnded < totalJobs {
                msg := <-managerChan
                switch msg.status {
                case PreSyncing:
                    counterRunning++
                case Syncing:
                case Failed:
                    counterFailed++
                    fallthrough
                case Success:
                    counterEnded++
                    counterRunning--
                default:
                    So(0, ShouldEqual, 1)
                }
                // Test if semaphore works
                So(counterRunning, ShouldBeLessThanOrEqualTo, concurrentCheck)
                if counterRunning > peakConcurrent {
                    peakConcurrent = counterRunning
                }
            }
            // select {
            // case msg := <-managerChan:
            //     logger.Errorf("extra message received: %v", msg)
            //     So(0, ShouldEqual, 1)
            // case <-time.After(2 * time.Second):
            // }
            return
        }

        Convey("When we run them all", func(ctx C) {
            for _, job := range jobs {
                go job.Run(managerChan, semaphore)
                job.ctrlChan <- jobStart
            }

            peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT, CONCURRENT-2)

            So(peakConcurrent, ShouldEqual, CONCURRENT-2)
            So(counterFailed, ShouldEqual, 0)

            for _, job := range jobs {
                job.ctrlChan <- jobDisable
                <-job.disabled
            }
        })
        Convey("If we cancel one job", func(ctx C) {
            for _, job := range jobs {
                go job.Run(managerChan, semaphore)
                job.ctrlChan <- jobRestart
                time.Sleep(200 * time.Millisecond)
            }

            // Cancel the one waiting for semaphore
            jobs[len(jobs)-1].ctrlChan <- jobStop

            peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT-1, CONCURRENT-2)

            So(peakConcurrent, ShouldEqual, CONCURRENT-2)
            So(counterFailed, ShouldEqual, 0)

            for _, job := range jobs {
                job.ctrlChan <- jobDisable
                <-job.disabled
            }
        })
        Convey("If we override the concurrent limit", func(ctx C) {
            for _, job := range jobs {
                go job.Run(managerChan, semaphore)
                job.ctrlChan <- jobStart
                time.Sleep(200 * time.Millisecond)
            }

            jobs[len(jobs)-1].ctrlChan <- jobForceStart
            jobs[len(jobs)-2].ctrlChan <- jobForceStart

            peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT, CONCURRENT)

            So(peakConcurrent, ShouldEqual, CONCURRENT)
            So(counterFailed, ShouldEqual, 0)

            time.Sleep(1 * time.Second)

            // fmt.Println("Restart them")

            for _, job := range jobs {
                job.ctrlChan <- jobStart
            }

            peakConcurrent, counterFailed = countingJobs(managerChan, CONCURRENT, CONCURRENT-2)

            So(peakConcurrent, ShouldEqual, CONCURRENT-2)
            So(counterFailed, ShouldEqual, 0)

            for _, job := range jobs {
                job.ctrlChan <- jobDisable
                <-job.disabled
            }
        })
    })
}

@@ -14,12 +14,13 @@ import (

type logLimiter struct {
    emptyHook
    provider mirrorProvider
}

func newLogLimiter(provider mirrorProvider) *logLimiter {
    return &logLimiter{
        provider: provider,
        emptyHook: emptyHook{
            provider: provider,
        },
    }
}

@@ -45,11 +45,13 @@ type mirrorProvider interface {
    Hooks() []jobHook

    Interval() time.Duration
    Retry() int

    WorkingDir() string
    LogDir() string
    LogFile() string
    IsMaster() bool
    DataSize() string

    // enter context
    EnterContext() *Context
@@ -86,6 +88,9 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
    if mirror.Interval == 0 {
        mirror.Interval = cfg.Global.Interval
    }
    if mirror.Retry == 0 {
        mirror.Retry = cfg.Global.Retry
    }
    logDir = formatLogDir(logDir, mirror)

    // IsMaster
@@ -110,13 +115,14 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
            logDir:   logDir,
            logFile:  filepath.Join(logDir, "latest.log"),
            interval: time.Duration(mirror.Interval) * time.Minute,
            retry:    mirror.Retry,
            env:      mirror.Env,
        }
        p, err := newCmdProvider(pc)
        p.isMaster = isMaster
        if err != nil {
            panic(err)
        }
        p.isMaster = isMaster
        provider = p
    case provRsync:
        rc := rsyncConfig{
@@ -130,13 +136,15 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
            logDir:   logDir,
            logFile:  filepath.Join(logDir, "latest.log"),
            useIPv6:  mirror.UseIPv6,
            useIPv4:  mirror.UseIPv4,
            interval: time.Duration(mirror.Interval) * time.Minute,
            retry:    mirror.Retry,
        }
        p, err := newRsyncProvider(rc)
        p.isMaster = isMaster
        if err != nil {
            panic(err)
        }
        p.isMaster = isMaster
        provider = p
    case provTwoStageRsync:
        rc := twoStageRsyncConfig{
@@ -152,12 +160,13 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
            logFile:  filepath.Join(logDir, "latest.log"),
            useIPv6:  mirror.UseIPv6,
            interval: time.Duration(mirror.Interval) * time.Minute,
            retry:    mirror.Retry,
        }
        p, err := newTwoStageRsyncProvider(rc)
        p.isMaster = isMaster
        if err != nil {
            panic(err)
        }
        p.isMaster = isMaster
        provider = p
    default:
        panic(errors.New("Invalid mirror provider"))

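One detail worth spelling out in the three provider cases above: `p.isMaster = isMaster` moves below the error check. The constructors return a nil pointer together with a non-nil error (the rsync constructor later in this diff does exactly `return nil, errors.New(...)`), so dereferencing `p` before checking `err` would panic with a nil pointer instead of surfacing the real configuration error. A minimal sketch of the corrected ordering, with `newProvider` standing in for newCmdProvider and friends:

```
package main

import (
	"errors"
	"fmt"
)

type provider struct{ isMaster bool }

// On failure the constructor returns a nil pointer, which is exactly
// why the field assignment has to wait until after the error check.
func newProvider(ok bool) (*provider, error) {
	if !ok {
		return nil, errors.New("bad config")
	}
	return &provider{}, nil
}

func main() {
	p, err := newProvider(false)
	if err != nil {
		fmt.Println("constructor failed:", err) // no nil dereference
		return
	}
	p.isMaster = true // safe: p is non-nil here
}
```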
@@ -73,17 +73,20 @@ func TestRsyncProvider(t *testing.T) {
echo "syncing to $(pwd)"
echo $RSYNC_PASSWORD $@
sleep 1
echo "Total file size: 1.33T bytes"
echo "Done"
exit 0
`
            err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
            So(err, ShouldBeNil)

            targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir())
            expectedOutput := fmt.Sprintf(
                "syncing to %s\n"+
                    "%s\n"+
                    "Total file size: 1.33T bytes\n"+
                    "Done\n",
                provider.WorkingDir(),
                targetDir,
                fmt.Sprintf(
                    "-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
                        "--delete --delete-after --delay-updates --safe-links "+
@@ -98,6 +101,7 @@ exit 0
            So(err, ShouldBeNil)
            So(string(loggedContent), ShouldEqual, expectedOutput)
            // fmt.Println(string(loggedContent))
            So(provider.DataSize(), ShouldEqual, "1.33T")
        })

    })
@@ -144,11 +148,12 @@ exit 0
            err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
            So(err, ShouldBeNil)

            targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir())
            expectedOutput := fmt.Sprintf(
                "syncing to %s\n"+
                    "%s\n"+
                    "Done\n",
                provider.WorkingDir(),
                targetDir,
                fmt.Sprintf(
                    "%s %s -aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
                        "--delete --delete-after --delay-updates --safe-links "+
@@ -260,6 +265,40 @@ sleep 5

        })
    })
    Convey("Command Provider without log file should work", t, func(ctx C) {
        tmpDir, err := ioutil.TempDir("", "tunasync")
        defer os.RemoveAll(tmpDir)
        So(err, ShouldBeNil)

        c := cmdConfig{
            name:        "run-ls",
            upstreamURL: "http://mirrors.tuna.moe/",
            command:     "ls",
            workingDir:  tmpDir,
            logDir:      tmpDir,
            logFile:     "/dev/null",
            interval:    600 * time.Second,
        }

        provider, err := newCmdProvider(c)
        So(err, ShouldBeNil)

        So(provider.IsMaster(), ShouldEqual, false)
        So(provider.ZFS(), ShouldBeNil)
        So(provider.Type(), ShouldEqual, provCommand)
        So(provider.Name(), ShouldEqual, c.name)
        So(provider.WorkingDir(), ShouldEqual, c.workingDir)
        So(provider.LogDir(), ShouldEqual, c.logDir)
        So(provider.LogFile(), ShouldEqual, c.logFile)
        So(provider.Interval(), ShouldEqual, c.interval)

        Convey("Run the command", func() {

            err = provider.Run()
            So(err, ShouldBeNil)

        })
    })
}

func TestTwoStageRsyncProvider(t *testing.T) {
@@ -280,6 +319,8 @@ func TestTwoStageRsyncProvider(t *testing.T) {
        logFile:     tmpFile,
        useIPv6:     true,
        excludeFile: tmpFile,
        username:    "hello",
        password:    "world",
    }

    provider, err := newTwoStageRsyncProvider(c)
@@ -306,6 +347,7 @@ exit 0
        err = provider.Run()
        So(err, ShouldBeNil)

        targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir())
        expectedOutput := fmt.Sprintf(
            "syncing to %s\n"+
                "%s\n"+
@@ -313,14 +355,14 @@ exit 0
            "syncing to %s\n"+
                "%s\n"+
                "Done\n",
            provider.WorkingDir(),
            targetDir,
            fmt.Sprintf(
                "-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+
                    "--timeout=120 --contimeout=120 --exclude dists/ -6 "+
                    "--exclude-from %s %s %s",
                provider.excludeFile, provider.upstreamURL, provider.WorkingDir(),
            ),
            provider.WorkingDir(),
            targetDir,
            fmt.Sprintf(
                "-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
                    "--delete --delete-after --delay-updates --safe-links "+

@@ -2,8 +2,11 @@ package worker

import (
    "errors"
    "io/ioutil"
    "strings"
    "time"

    "github.com/tuna/tunasync/internal"
)

type rsyncConfig struct {
@@ -11,15 +14,17 @@ type rsyncConfig struct {
    rsyncCmd                                     string
    upstreamURL, username, password, excludeFile string
    workingDir, logDir, logFile                  string
    useIPv6                                      bool
    useIPv6, useIPv4                             bool
    interval                                     time.Duration
    retry                                        int
}

// An RsyncProvider provides the implementation to rsync-based syncing jobs
type rsyncProvider struct {
    baseProvider
    rsyncConfig
    options []string
    options  []string
    dataSize string
}

func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
@@ -27,11 +32,15 @@ func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
    if !strings.HasSuffix(c.upstreamURL, "/") {
        return nil, errors.New("rsync upstream URL should end with /")
    }
    if c.retry == 0 {
        c.retry = defaultMaxRetry
    }
    provider := &rsyncProvider{
        baseProvider: baseProvider{
            name:     c.name,
            ctx:      NewContext(),
            interval: c.interval,
            retry:    c.retry,
        },
        rsyncConfig: c,
    }
@@ -49,6 +58,8 @@ func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {

    if c.useIPv6 {
        options = append(options, "-6")
    } else if c.useIPv4 {
        options = append(options, "-4")
    }

    if c.excludeFile != "" {
@@ -71,14 +82,31 @@ func (p *rsyncProvider) Upstream() string {
    return p.upstreamURL
}

func (p *rsyncProvider) DataSize() string {
    return p.dataSize
}

func (p *rsyncProvider) Run() error {
    p.dataSize = ""
    if err := p.Start(); err != nil {
        return err
    }
    return p.Wait()
    if err := p.Wait(); err != nil {
        return err
    }
    if logContent, err := ioutil.ReadFile(p.LogFile()); err == nil {
        p.dataSize = internal.ExtractSizeFromRsyncLog(logContent)
    }
    return nil
}

func (p *rsyncProvider) Start() error {
    p.Lock()
    defer p.Unlock()

    if p.IsRunning() {
        return errors.New("provider is currently running")
    }

    env := map[string]string{}
    if p.username != "" {
@@ -92,7 +120,7 @@ func (p *rsyncProvider) Start() error {
    command = append(command, p.upstreamURL, p.WorkingDir())

    p.cmd = newCmdJob(p, command, p.WorkingDir(), env)
    if err := p.prepareLogFile(); err != nil {
    if err := p.prepareLogFile(false); err != nil {
        return err
    }

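The new `DataSize` path reads the finished rsync log and pulls out the stats line that the test script above fakes ("Total file size: 1.33T bytes"). `ExtractSizeFromRsyncLog` lives in the internal package and its body is not part of this diff, so the regexp below is an assumed sketch of what it plausibly does:

```
package main

import (
	"fmt"
	"regexp"
)

// Assumed implementation: the real internal.ExtractSizeFromRsyncLog is
// not shown here, but it must extract the size from rsync's --stats
// output, which contains a line like "Total file size: 1.33T bytes".
var rsyncSizePattern = regexp.MustCompile(`Total file size: ([0-9.]+[KMGTP]?) bytes`)

func extractSizeFromRsyncLog(logContent []byte) string {
	if m := rsyncSizePattern.FindSubmatch(logContent); m != nil {
		return string(m[1]) // e.g. "1.33T"
	}
	return "" // job.size stays empty and is not reported
}

func main() {
	log := []byte("...\nTotal file size: 1.33T bytes\nDone\n")
	fmt.Println(extractSizeFromRsyncLog(log)) // prints: 1.33T
}
```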
@@ -41,13 +41,17 @@ func newCmdJob(provider mirrorProvider, cmdAndArgs []string, workingDir string,
            "--name", d.Name(),
            "-w", workingDir,
        }
        // specify user
        args = append(
            args, "-u",
            fmt.Sprintf("%d:%d", os.Getuid(), os.Getgid()),
        )
        // add volumes
        for _, vol := range d.Volumes() {
            logger.Debugf("volume: %s", vol)
            args = append(args, "-v", vol)
        }
        // set env
        env["TUNASYNC_LOG_FILE"] = d.LogFile()
        for k, v := range env {
            kv := fmt.Sprintf("%s=%s", k, v)
            args = append(args, "-e", kv)
@@ -114,6 +118,9 @@ func (c *cmdJob) Wait() error {
        return c.retErr
    default:
        err := c.cmd.Wait()
        if c.cmd.Stdout != nil {
            c.cmd.Stdout.(*os.File).Close()
        }
        c.retErr = err
        close(c.finished)
        return err

@@ -15,6 +15,11 @@ type scheduleQueue struct {
    jobs map[string]bool
}

type jobScheduleInfo struct {
    jobName       string
    nextScheduled time.Time
}

func timeLessThan(l, r interface{}) bool {
    tl := l.(time.Time)
    tr := r.(time.Time)
@@ -28,6 +33,20 @@ func newScheduleQueue() *scheduleQueue {
    return queue
}

func (q *scheduleQueue) GetJobs() (jobs []jobScheduleInfo) {
    cur := q.list.Iterator()
    defer cur.Close()

    for cur.Next() {
        cj := cur.Value().(*mirrorJob)
        jobs = append(jobs, jobScheduleInfo{
            cj.Name(),
            cur.Key().(time.Time),
        })
    }
    return
}

func (q *scheduleQueue) AddJob(schedTime time.Time, job *mirrorJob) {
    q.Lock()
    defer q.Unlock()

@@ -3,8 +3,11 @@ package worker
import (
    "errors"
    "fmt"
    "io/ioutil"
    "strings"
    "time"

    "github.com/tuna/tunasync/internal"
)

type twoStageRsyncConfig struct {
@@ -15,6 +18,7 @@ type twoStageRsyncConfig struct {
    workingDir, logDir, logFile string
    useIPv6                     bool
    interval                    time.Duration
    retry                       int
}

// An RsyncProvider provides the implementation to rsync-based syncing jobs
@@ -23,6 +27,7 @@ type twoStageRsyncProvider struct {
    twoStageRsyncConfig
    stage1Options []string
    stage2Options []string
    dataSize      string
}

var rsyncStage1Profiles = map[string]([]string){
@@ -38,12 +43,16 @@ func newTwoStageRsyncProvider(c twoStageRsyncConfig) (*twoStageRsyncProvider, er
    if !strings.HasSuffix(c.upstreamURL, "/") {
        return nil, errors.New("rsync upstream URL should end with /")
    }
    if c.retry == 0 {
        c.retry = defaultMaxRetry
    }

    provider := &twoStageRsyncProvider{
        baseProvider: baseProvider{
            name:     c.name,
            ctx:      NewContext(),
            interval: c.interval,
            retry:    c.retry,
        },
        twoStageRsyncConfig: c,
        stage1Options: []string{
@@ -78,6 +87,10 @@ func (p *twoStageRsyncProvider) Upstream() string {
    return p.upstreamURL
}

func (p *twoStageRsyncProvider) DataSize() string {
    return p.dataSize
}

func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) {
    var options []string
    if stage == 1 {
@@ -108,7 +121,12 @@ func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) {
}

func (p *twoStageRsyncProvider) Run() error {
    defer p.Wait()
    p.Lock()
    defer p.Unlock()

    if p.IsRunning() {
        return errors.New("provider is currently running")
    }

    env := map[string]string{}
    if p.username != "" {
@@ -118,6 +136,7 @@ func (p *twoStageRsyncProvider) Run() error {
        env["RSYNC_PASSWORD"] = p.password
    }

    p.dataSize = ""
    stages := []int{1, 2}
    for _, stage := range stages {
        command := []string{p.rsyncCmd}
@@ -129,7 +148,7 @@ func (p *twoStageRsyncProvider) Run() error {
        command = append(command, p.upstreamURL, p.WorkingDir())

        p.cmd = newCmdJob(p, command, p.WorkingDir(), env)
        if err := p.prepareLogFile(); err != nil {
        if err := p.prepareLogFile(stage > 1); err != nil {
            return err
        }

@@ -137,12 +156,17 @@
            return err
        }
        p.isRunning.Store(true)
        logger.Debugf("set isRunning to true: %s", p.Name())

        err = p.cmd.Wait()
        p.isRunning.Store(false)
        p.Unlock()
        err = p.Wait()
        p.Lock()
        if err != nil {
            return err
        }
    }
    if logContent, err := ioutil.ReadFile(p.LogFile()); err == nil {
        p.dataSize = internal.ExtractSizeFromRsyncLog(logContent)
    }
    return nil
}

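The Unlock/Wait/Lock dance in the loop above is easy to misread: the two-stage Run now holds the provider mutex for its whole body (`p.Lock()` plus `defer p.Unlock()`), but `Wait()` appears to take the same mutex internally, so the lock is released around that single call and re-acquired afterwards to avoid a self-deadlock. A stripped-down sketch of the shape, under the assumption that `wait` locks the same mutex:

```
package main

import (
	"fmt"
	"sync"
)

type job struct{ mu sync.Mutex }

// wait stands in for the provider's Wait, assumed to acquire the
// same mutex that run already holds.
func (j *job) wait() {
	j.mu.Lock()
	defer j.mu.Unlock()
	fmt.Println("waiting for the child process")
}

func (j *job) run() {
	j.mu.Lock()
	defer j.mu.Unlock()

	for stage := 1; stage <= 2; stage++ {
		// ... start the stage ...
		j.mu.Unlock() // release so wait() can take the lock
		j.wait()
		j.mu.Lock() // re-acquire before touching shared state again
	}
}

func main() { new(job).run() }
```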
@@ -12,8 +12,6 @@ import (
    . "github.com/tuna/tunasync/internal"
)

var tunasyncWorker *Worker

// A Worker is an instance of tunasync worker
type Worker struct {
    L sync.Mutex
@@ -29,10 +27,11 @@ type Worker struct {
    httpClient *http.Client
}

// GetTUNASyncWorker returns a singleton worker
func GetTUNASyncWorker(cfg *Config) *Worker {
    if tunasyncWorker != nil {
        return tunasyncWorker
// NewTUNASyncWorker creates a worker
func NewTUNASyncWorker(cfg *Config) *Worker {

    if cfg.Global.Retry == 0 {
        cfg.Global.Retry = defaultMaxRetry
    }

    w := &Worker{
@@ -57,7 +56,6 @@ func GetTUNASyncWorker(cfg *Config) *Worker {

    w.initJobs()
    w.makeHTTPServer()
    tunasyncWorker = w
    return w
}

@@ -219,7 +217,11 @@ func (w *Worker) makeHTTPServer() {
        }
        switch cmd.Cmd {
        case CmdStart:
            job.ctrlChan <- jobStart
            if cmd.Options["force"] {
                job.ctrlChan <- jobForceStart
            } else {
                job.ctrlChan <- jobStart
            }
        case CmdRestart:
            job.ctrlChan <- jobRestart
        case CmdStop:
@@ -306,6 +308,9 @@ func (w *Worker) runSchedule() {

    w.L.Unlock()

    schedInfo := w.schedule.GetJobs()
    w.updateSchedInfo(schedInfo)

    tick := time.Tick(5 * time.Second)
    for {
        select {
@@ -342,6 +347,9 @@ func (w *Worker) runSchedule() {
                w.schedule.AddJob(schedTime, job)
            }

            schedInfo = w.schedule.GetJobs()
            w.updateSchedInfo(schedInfo)

        case <-tick:
            // check schedule every 5 seconds
            if job := w.schedule.Pop(); job != nil {
@@ -412,6 +420,12 @@ func (w *Worker) updateStatus(job *mirrorJob, jobMsg jobMessage) {
        ErrorMsg: jobMsg.msg,
    }

    // Certain providers (rsync, for example) may know the size of the
    // mirror, so we report it to the manager here
    if len(job.size) != 0 {
        smsg.Size = job.size
    }

    for _, root := range w.cfg.Manager.APIBaseList() {
        url := fmt.Sprintf(
            "%s/workers/%s/jobs/%s", root, w.Name(), jobMsg.name,
@@ -423,6 +437,27 @@ func (w *Worker) updateStatus(job *mirrorJob, jobMsg jobMessage) {
        }
    }

func (w *Worker) updateSchedInfo(schedInfo []jobScheduleInfo) {
    var s []MirrorSchedule
    for _, sched := range schedInfo {
        s = append(s, MirrorSchedule{
            MirrorName:   sched.jobName,
            NextSchedule: sched.nextScheduled,
        })
    }
    msg := MirrorSchedules{Schedules: s}

    for _, root := range w.cfg.Manager.APIBaseList() {
        url := fmt.Sprintf(
            "%s/workers/%s/schedules", root, w.Name(),
        )
        logger.Debugf("reporting on manager url: %s", url)
        if _, err := PostJSON(url, msg, w.httpClient); err != nil {
            logger.Errorf("Failed to upload schedules: %s", err.Error())
        }
    }
}

func (w *Worker) fetchJobStatus() []MirrorStatus {
    var mirrorList []MirrorStatus
    apiBase := w.cfg.Manager.APIBaseList()[0]

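updateSchedInfo turns the schedule-queue snapshot into a `MirrorSchedules` message and POSTs it to every configured manager at `/workers/<name>/schedules`. The wire format is defined by the internal package, outside this diff, so the struct shapes and JSON tags below are guesses for illustration only:

```
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Assumed shapes of internal.MirrorSchedule(s); the JSON tags are
// hypothetical and only meant to show the kind of payload posted.
type MirrorSchedule struct {
	MirrorName   string    `json:"name"`
	NextSchedule time.Time `json:"next_schedule"`
}

type MirrorSchedules struct {
	Schedules []MirrorSchedule `json:"schedules"`
}

func main() {
	msg := MirrorSchedules{Schedules: []MirrorSchedule{
		{MirrorName: "AOSP", NextSchedule: time.Now().Add(12 * time.Hour)},
	}}
	b, _ := json.MarshalIndent(msg, "", "  ")
	fmt.Println(string(b)) // body of POST /workers/<name>/schedules
}
```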
worker/worker_test.go (new file, 253 lines)
@@ -0,0 +1,253 @@
package worker

import (
    "net/http"
    "strconv"
    "testing"
    "time"

    "github.com/gin-gonic/gin"
    . "github.com/smartystreets/goconvey/convey"
    . "github.com/tuna/tunasync/internal"
)

type workTestFunc func(*Worker)

var managerPort = 5001
var workerPort = 5002

func makeMockManagerServer(recvData chan interface{}) *gin.Engine {
    r := gin.Default()
    r.GET("/ping", func(c *gin.Context) {
        c.JSON(http.StatusOK, gin.H{"_infoKey": "pong"})
    })
    r.POST("/workers", func(c *gin.Context) {
        var _worker WorkerStatus
        c.BindJSON(&_worker)
        _worker.LastOnline = time.Now()
        recvData <- _worker
        c.JSON(http.StatusOK, _worker)
    })
    r.POST("/workers/dut/schedules", func(c *gin.Context) {
        var _sch MirrorSchedules
        c.BindJSON(&_sch)
        recvData <- _sch
        c.JSON(http.StatusOK, empty{})
    })
    r.POST("/workers/dut/jobs/:job", func(c *gin.Context) {
        var status MirrorStatus
        c.BindJSON(&status)
        recvData <- status
        c.JSON(http.StatusOK, status)
    })
    r.GET("/workers/dut/jobs", func(c *gin.Context) {
        mirrorStatusList := []MirrorStatus{}
        c.JSON(http.StatusOK, mirrorStatusList)
    })

    return r
}

func startWorkerThenStop(cfg *Config, tester workTestFunc) {
    exitedChan := make(chan int)
    w := NewTUNASyncWorker(cfg)
    So(w, ShouldNotBeNil)
    go func() {
        w.Run()
        exitedChan <- 1
    }()

    tester(w)

    w.Halt()
    select {
    case exited := <-exitedChan:
        So(exited, ShouldEqual, 1)
    case <-time.After(2 * time.Second):
        So(0, ShouldEqual, 1)
    }

}
func sendCommandToWorker(workerURL string, httpClient *http.Client, cmd CmdVerb, mirror string) {
    workerCmd := WorkerCmd{
        Cmd:      cmd,
        MirrorID: mirror,
    }
    logger.Debugf("POST to %s with cmd %s", workerURL, cmd)
    _, err := PostJSON(workerURL, workerCmd, httpClient)
    So(err, ShouldBeNil)
}

func TestWorker(t *testing.T) {
    InitLogger(false, true, false)

    recvDataChan := make(chan interface{})
    _s := makeMockManagerServer(recvDataChan)
    httpServer := &http.Server{
        Addr:         "localhost:" + strconv.Itoa(managerPort),
        Handler:      _s,
        ReadTimeout:  2 * time.Second,
        WriteTimeout: 2 * time.Second,
    }
    go func() {
        err := httpServer.ListenAndServe()
        So(err, ShouldBeNil)
    }()

    Convey("Worker should work", t, func(ctx C) {

        httpClient, err := CreateHTTPClient("")
        So(err, ShouldBeNil)

        workerPort++

        workerCfg := Config{
            Global: globalConfig{
                Name:       "dut",
                LogDir:     "/tmp",
                MirrorDir:  "/tmp",
                Concurrent: 2,
                Interval:   1,
            },
            Server: serverConfig{
                Hostname: "localhost",
                Addr:     "127.0.0.1",
                Port:     workerPort,
            },
            Manager: managerConfig{
                APIBase: "http://localhost:" + strconv.Itoa(managerPort),
            },
        }
        logger.Debugf("worker port %d", workerPort)
        Convey("with no job", func(ctx C) {
            dummyTester := func(*Worker) {
                registered := false
                for {
                    select {
                    case data := <-recvDataChan:
                        if reg, ok := data.(WorkerStatus); ok {
                            So(reg.ID, ShouldEqual, "dut")
                            registered = true
                            time.Sleep(500 * time.Millisecond)
                            sendCommandToWorker(reg.URL, httpClient, CmdStart, "foobar")
                        } else if sch, ok := data.(MirrorSchedules); ok {
                            So(len(sch.Schedules), ShouldEqual, 0)
                        }
                    case <-time.After(1 * time.Second):
                        So(registered, ShouldBeTrue)
                        return
                    }
                }
            }

            startWorkerThenStop(&workerCfg, dummyTester)
        })
        Convey("with one job", func(ctx C) {
            workerCfg.Mirrors = []mirrorConfig{
                mirrorConfig{
                    Name:     "job-ls",
                    Provider: provCommand,
                    Command:  "ls",
                },
            }

            dummyTester := func(*Worker) {
                url := ""
                jobRunning := false
                lastStatus := SyncStatus(None)
                for {
                    select {
                    case data := <-recvDataChan:
                        if reg, ok := data.(WorkerStatus); ok {
                            So(reg.ID, ShouldEqual, "dut")
                            url = reg.URL
                            time.Sleep(500 * time.Millisecond)
                            sendCommandToWorker(url, httpClient, CmdStart, "job-ls")
                        } else if sch, ok := data.(MirrorSchedules); ok {
                            if !jobRunning {
                                So(len(sch.Schedules), ShouldEqual, 1)
                                So(sch.Schedules[0].MirrorName, ShouldEqual, "job-ls")
                                So(sch.Schedules[0].NextSchedule,
                                    ShouldHappenBetween,
                                    time.Now().Add(-2*time.Second),
                                    time.Now().Add(1*time.Minute))
                            }
                        } else if status, ok := data.(MirrorStatus); ok {
                            logger.Noticef("Job %s status %s", status.Name, status.Status.String())
                            jobRunning = status.Status == PreSyncing || status.Status == Syncing
                            So(status.Status, ShouldNotEqual, Failed)
                            lastStatus = status.Status
                        }
                    case <-time.After(1 * time.Second):
                        So(url, ShouldNotEqual, "")
                        So(jobRunning, ShouldBeFalse)
                        So(lastStatus, ShouldEqual, Success)
                        return
                    }
                }
            }

            startWorkerThenStop(&workerCfg, dummyTester)
        })
        Convey("with several jobs", func(ctx C) {
            workerCfg.Mirrors = []mirrorConfig{
                mirrorConfig{
                    Name:     "job-ls-1",
                    Provider: provCommand,
                    Command:  "ls",
                },
                mirrorConfig{
                    Name:     "job-fail",
                    Provider: provCommand,
                    Command:  "non-existent-command-xxxx",
                },
                mirrorConfig{
                    Name:     "job-ls-2",
                    Provider: provCommand,
                    Command:  "ls",
                },
            }

            dummyTester := func(*Worker) {
                url := ""
                lastStatus := make(map[string]SyncStatus)
                nextSch := make(map[string]time.Time)
                for {
                    select {
                    case data := <-recvDataChan:
                        if reg, ok := data.(WorkerStatus); ok {
                            So(reg.ID, ShouldEqual, "dut")
                            url = reg.URL
                            time.Sleep(500 * time.Millisecond)
                            sendCommandToWorker(url, httpClient, CmdStart, "job-fail")
                            sendCommandToWorker(url, httpClient, CmdStart, "job-ls-1")
                            sendCommandToWorker(url, httpClient, CmdStart, "job-ls-2")
                        } else if sch, ok := data.(MirrorSchedules); ok {
                            //So(len(sch.Schedules), ShouldEqual, 3)
                            for _, item := range sch.Schedules {
                                nextSch[item.MirrorName] = item.NextSchedule
                            }
                        } else if status, ok := data.(MirrorStatus); ok {
                            logger.Noticef("Job %s status %s", status.Name, status.Status.String())
                            jobRunning := status.Status == PreSyncing || status.Status == Syncing
                            if !jobRunning {
                                if status.Name == "job-fail" {
                                    So(status.Status, ShouldEqual, Failed)
                                } else {
                                    So(status.Status, ShouldNotEqual, Failed)
                                }
                            }
                            lastStatus[status.Name] = status.Status
                        }
                    case <-time.After(1 * time.Second):
                        So(len(lastStatus), ShouldEqual, 3)
                        So(len(nextSch), ShouldEqual, 3)
                        return
                    }
                }
            }

            startWorkerThenStop(&workerCfg, dummyTester)
        })
    })
}

@@ -3,6 +3,7 @@ package worker
import (
    "fmt"
    "os"
    "os/user"
    "strings"

    "github.com/codeskyblue/go-sh"
@@ -10,36 +11,44 @@ import (

type zfsHook struct {
    emptyHook
    provider mirrorProvider
    zpool    string
    zpool string
}

func newZfsHook(provider mirrorProvider, zpool string) *zfsHook {
    return &zfsHook{
        provider: provider,
        zpool:    zpool,
        emptyHook: emptyHook{
            provider: provider,
        },
        zpool: zpool,
    }
}

// create zfs dataset for a new mirror
func (z *zfsHook) printHelpMessage() {
    zfsDataset := fmt.Sprintf("%s/%s", z.zpool, z.provider.Name())
    zfsDataset = strings.ToLower(zfsDataset)
    workingDir := z.provider.WorkingDir()
    logger.Infof("You may create the ZFS dataset with:")
    logger.Infof("    zfs create '%s'", zfsDataset)
    logger.Infof("    zfs set mountpoint='%s' '%s'", workingDir, zfsDataset)
    usr, err := user.Current()
    if err != nil || usr.Uid == "0" {
        return
    }
    logger.Infof("    chown %s '%s'", usr.Uid, workingDir)
}

// check if working directory is a zfs dataset
func (z *zfsHook) preJob() error {
    workingDir := z.provider.WorkingDir()
    if _, err := os.Stat(workingDir); os.IsNotExist(err) {
        // sudo zfs create $zfsDataset
        // sudo zfs set mountpoint=${absPath} ${zfsDataset}

        zfsDataset := fmt.Sprintf("%s/%s", z.zpool, z.provider.Name())
        // Unknown issue of ZFS:
        // dataset name should not contain upper case letters
        zfsDataset = strings.ToLower(zfsDataset)
        logger.Infof("Creating ZFS dataset %s", zfsDataset)
        if err := sh.Command("sudo", "zfs", "create", zfsDataset).Run(); err != nil {
            return err
        }
        logger.Infof("Mount ZFS dataset %s to %s", zfsDataset, workingDir)
        if err := sh.Command("sudo", "zfs", "set", "mountpoint="+workingDir, zfsDataset).Run(); err != nil {
            return err
        }
        logger.Errorf("Directory %s doesn't exist", workingDir)
        z.printHelpMessage()
        return err
    }
    if err := sh.Command("mountpoint", "-q", workingDir).Run(); err != nil {
        logger.Errorf("%s is not a mount point", workingDir)
        z.printHelpMessage()
        return err
    }
    return nil
}

worker/zfs_hook_test.go (new file, 48 lines)
@@ -0,0 +1,48 @@
package worker

import (
    "io/ioutil"
    "os"
    "path/filepath"
    "testing"
    "time"

    . "github.com/smartystreets/goconvey/convey"
)

func TestZFSHook(t *testing.T) {

    Convey("ZFS Hook should work", t, func(ctx C) {
        tmpDir, err := ioutil.TempDir("", "tunasync")
        tmpFile := filepath.Join(tmpDir, "log_file")

        c := cmdConfig{
            name:        "tuna_zfs_hook_test",
            upstreamURL: "http://mirrors.tuna.moe/",
            command:     "ls",
            workingDir:  tmpDir,
            logDir:      tmpDir,
            logFile:     tmpFile,
            interval:    1 * time.Second,
        }

        provider, err := newCmdProvider(c)
        So(err, ShouldBeNil)
        Convey("When working directory doesn't exist", func(ctx C) {

            errRm := os.RemoveAll(tmpDir)
            So(errRm, ShouldBeNil)

            hook := newZfsHook(provider, "test_pool")
            err := hook.preJob()
            So(err, ShouldNotBeNil)
        })
        Convey("When working directory is not a mount point", func(ctx C) {
            defer os.RemoveAll(tmpDir)

            hook := newZfsHook(provider, "test_pool")
            err := hook.preJob()
            So(err, ShouldNotBeNil)
        })
    })
}