Mirrored from https://github.com/tuna/tunasync.git. Synced 2025-12-06 22:46:47 +00:00

45 commits

Author SHA1 Message Commit Date
z4yx
a065a11b38 change timeout in tests 2019-11-07 12:29:57 +08:00
z4yx
b4fe4db82a Merge remote-tracking branch 'origin/dev' 2019-11-04 23:11:34 +08:00
z4yx
839363aaaa reschedule the job if any hook fails 2019-11-04 22:52:03 +08:00
Yuxiang Zhang
08aee8eb42 Merge pull request #98 from ziqin/feature/btrfs-snapshot
Reimplement Btrfs snapshots hook
2019-08-31 10:57:44 +08:00
Jeeken Wang
501f77ee41 Merge branch 'master' into feature/btrfs-snapshot 2019-08-15 01:26:28 +08:00
z4yx
9e91fd706e Merge branch 'dev' 2019-08-13 23:10:43 +08:00
z4yx
94cf0b4bdb fix possible null dereferencing, reported by #96 2019-08-13 23:07:01 +08:00
WANG Ziqin
8fd2059013 add doc for setup btrfs snapshots 2019-08-02 13:31:33 +08:00
WANG Ziqin
6b56c4254c feat(btrfs_snapshot_hook): reimplemented Btrfs snapshots
TODO: test coverage
2019-08-02 13:31:33 +08:00
Yuxiang Zhang
3872c41607 Merge pull request #97 from ziqin/master
Refine: remove outer `provider`s which shadow the embedded `provider`s provided by `emptyHook`
2019-08-02 09:27:04 +08:00
WANG Ziqin
30259da0f0 fix nil pointer dereference: check err first 2019-08-02 02:15:22 +08:00
WANG Ziqin
4854d9b981 Fix test: initialize dockerHook with embedded provider 2019-07-31 17:29:28 +08:00
WANG Ziqin
06fce98c00 Eliminate duplicate mirrorProvider in Hooks 2019-07-31 16:11:56 +08:00
Jeeken Wang
8408236646 Update "Job Run Process" diagram according to runJobWrapper 2019-07-31 12:26:09 +08:00
z4yx
540eea8aeb set golang version to 1.11 2019-07-05 16:54:29 +08:00
z4yx
a6fc97889d [bug fix] stalled scheduler if post-sync hook runs for a time which is longer than the sync interval 2019-07-05 16:29:00 +08:00
Yuxiang Zhang
5f7d974469 Merge pull request #93 from vgxbj/patch-1
Fix ascii chart for `Job Run Process`
2019-05-30 10:16:22 +08:00
Guō Xīng
3b52f93e7e Fix ascii chart for Job Run Process 2019-05-29 14:32:50 +08:00
zyx
1025189542 fix possible null dereferencing in server_test 2019-04-13 11:13:17 +08:00
zyx
9f91d90fc5 check Retry configuration in providers 2019-04-13 11:01:56 +08:00
zyx
1aa4ae9cc1 Merge remote-tracking branch 'kinosang/master' into wip-test-pr 2019-04-13 02:07:41 +08:00
zyx
d0deeb19a9 extract mirror size from rsync provider automatically 2019-04-13 01:27:35 +08:00
zyx
a283328dc4 increase test converage of worker 2019-04-12 09:43:57 +08:00
zyx
1890bbed3c add tests for last commit 2019-04-11 12:36:43 +08:00
zyx
ddc9efd155 report next scheduled sync time 2019-04-11 12:36:18 +08:00
zyx
7eb119b892 singleton of worker is not used, so remove it 2019-04-11 10:07:42 +08:00
zyx
96f11f57ed throw an error if executing reload command without worker id 2019-04-09 22:30:08 +08:00
Yuxiang Zhang
3e6e6f9b14 Update tips.md 2019-04-07 21:48:57 +08:00
Yuxiang Zhang
b06cadfe06 Update tips.md 2019-04-07 21:48:00 +08:00
Yuxiang Zhang
9c34372ae4 add link to tips.md 2019-04-07 21:35:40 +08:00
Yuxiang Zhang
ebbfff40f6 Merge pull request #91 from SCU-MingYuan/master
Added some control tips
2019-04-07 21:33:33 +08:00
GaryH4
5eeade22fc Update tips.md 2019-04-07 19:55:13 +08:00
GaryH4
4b3741308b Update tips.md 2019-04-06 23:48:33 +08:00
GaryH4
7d495c1956 Update tips.md 2019-04-06 23:40:43 +08:00
GaryH4
0bf8400077 Added some tips 2019-04-06 23:30:04 +08:00
Yuxiang Zhang
c611759394 Update get_started.md 2019-04-06 11:21:22 +08:00
Yuxiang Zhang
279aa32b68 Update get_started.md 2019-04-06 11:09:24 +08:00
Yuxiang Zhang
025544449a remove section of certificate generation 2019-04-06 10:56:38 +08:00
zyx
90d419ca66 add tests for last commit 2019-03-31 12:16:45 +08:00
zyx
96cb975412 Let user create ZFS dataset manually due to security considerations 2019-03-31 12:09:42 +08:00
王邈
ff3e690497 Revert "change owner of folder to current user after creating zfs dataset (close #89)"
This reverts commit a58e6d37ae and
re-opens #89.

Signed-off-by: 王邈 <shankerwangmiao@gmail.com>
2019-03-26 00:30:06 +08:00
zyx
a58e6d37ae change owner of folder to current user after creating zfs dataset (close #89) 2019-03-25 23:40:04 +08:00
zhang
7a4a8ad486 Merge branch 'master' of github.com:tuna/tunasync 2018-10-25 22:52:21 +08:00
zhang
e1c0c25efa add example of worker config 2018-10-25 22:52:02 +08:00
7IN0SAN9
563860d424 fix #63 2017-03-27 13:09:56 +08:00
36 changed files with 974 additions and 148 deletions

View File

@@ -2,7 +2,7 @@ sudo: required
language: go
go:
- 1.8
- 1.11
before_install:
- sudo apt-get install cgroup-bin

View File

@@ -40,73 +40,16 @@ Pre-built binary for Linux x86_64 is available at [Github releases](https://gith
# Job Run Process
PreSyncing        Syncing                            Success
+-----------+     +-----------+    +-------------+     +--------------+
|  pre-job  +--+->|  job run  +--->|  post-exec  +-+-->| post-success |
+-----------+  ^  +-----------+    +-------------+ |   +--------------+
               |                                   |
               |      +-----------------+          | Failed
               +------+    post-fail    |<---------+
                      +-----------------+

PreSyncing        Syncing                                             Success
+-----------+     +----------+    +-----------+    +-------------+     +--------------+
|  pre-job  +--+->| pre-exec +--->|  job run  +--->|  post-exec  +-+-->| post-success |
+-----------+  ^  +----------+    +-----------+    +-------------+ |   +--------------+
               |                                                   |
               |      +-----------------+                          | Failed
               +------+    post-fail    |<-------------------------+
                      +-----------------+
```
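The two charts are the before and after of this change: a distinct pre-exec stage now sits between pre-job and the job run. Below is a rough Go sketch of the hook ordering the new chart describes; it is not the project's actual `runJobWrapper` (which also retries failed syncs and, judging from the `runHooks(rHooks, …)` calls in `worker/job.go` further down, runs the post-phase hooks in reverse order):
```go
package sketch

// jobHook matches the hook methods implemented by the hooks in this diff
// (see btrfs_snapshot_hook.go below).
type jobHook interface {
	preJob() error
	preExec() error
	postExec() error
	postSuccess() error
	postFail() error
}

func runJobSketch(hooks []jobHook, sync func() error) error {
	for _, h := range hooks { // PreSyncing: pre-job hooks
		if err := h.preJob(); err != nil {
			return err // a failing hook aborts (and reschedules) the job
		}
	}
	for _, h := range hooks { // pre-exec hooks, the stage added here
		if err := h.preExec(); err != nil {
			return err
		}
	}
	syncErr := sync() // Syncing: the job run itself
	for _, h := range hooks { // post-exec hooks run on success and failure
		if err := h.postExec(); err != nil {
			return err
		}
	}
	if syncErr == nil { // Success: post-success hooks
		for _, h := range hooks {
			if err := h.postSuccess(); err != nil {
				return err
			}
		}
		return nil
	}
	for _, h := range hooks { // Failed: post-fail hooks
		if err := h.postFail(); err != nil {
			return err
		}
	}
	return syncErr
}
```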
## Generate Self-Signed Certificate
First, create the root CA:
```
openssl genrsa -out rootCA.key 2048
openssl req -x509 -new -nodes -key rootCA.key -days 365 -out rootCA.crt
```
Create the host key:
```
openssl genrsa -out host.key 2048
```
Now create the CSR. Before that, write a `req.cnf`:
```
[req]
distinguished_name = req_distinguished_name
req_extensions = v3_req
[req_distinguished_name]
countryName = Country Name (2 letter code)
countryName_default = CN
stateOrProvinceName = State or Province Name (full name)
stateOrProvinceName_default = BJ
localityName = Locality Name (eg, city)
localityName_default = Beijing
organizationalUnitName = Organizational Unit Name (eg, section)
organizationalUnitName_default = TUNA
commonName = Common Name (server FQDN or domain name)
commonName_default = <server_FQDN>
commonName_max = 64
[v3_req]
# Extensions to add to a certificate request
basicConstraints = CA:FALSE
keyUsage = nonRepudiation, digitalSignature, keyEncipherment
subjectAltName = @alt_names
[alt_names]
DNS.1 = <server_FQDN_1>
DNS.2 = <server_FQDN_2>
```
Substitute `<server_FQDN>` with your server's FQDN, then run
```
openssl req -new -key host.key -out host.csr -config req.cnf
```
Finally, generate and sign the host certificate with the root CA:
```
openssl x509 -req -in host.csr -CA rootCA.crt -CAkey rootCA.key -CAcreateserial -out host.crt -days 365 -extensions v3_req -extfile req.cnf
```
## Building

View File

@@ -60,7 +60,7 @@ func startWorker(c *cli.Context) error {
os.Exit(1)
}
w := worker.GetTUNASyncWorker(cfg)
w := worker.NewTUNASyncWorker(cfg)
if w == nil {
logger.Errorf("Error intializing TUNA sync worker.")
os.Exit(1)

View File

@@ -375,6 +375,11 @@ func cmdJob(cmd tunasync.CmdVerb) cli.ActionFunc {
func cmdWorker(cmd tunasync.CmdVerb) cli.ActionFunc {
return func(c *cli.Context) error {
if c.String("worker") == "" {
return cli.NewExitError("Please specify the worker with -w <worker-id>", 1)
}
cmd := tunasync.ClientCmd{
Cmd: cmd,
WorkerID: c.String("worker"),

View File

@@ -42,7 +42,7 @@ interval = 1
[manager]
api_base = "http://localhost:12345"
token = "some_token"
token = ""
ca_cert = ""
[cgroup]
@@ -98,6 +98,22 @@ $ tunasync worker --config ~/tunasync_demo/worker.conf
$ tunasynctl list -p 12345 --all
```
tunasynctl also supports a configuration file, which can be placed at `/etc/tunasync/ctl.conf` or `~/.config/tunasync/ctl.conf`; values in the latter override those in the former.
The configuration file reads:
```
manager_addr = "127.0.0.1"
manager_port = 12345
ca_cert = ""
```
### Security
The worker and manager talk over HTTP(S). If both run on the same machine, HTTPS is unnecessary: leave the manager's `ssl_key` and `ssl_cert` empty, leave the worker's `ca_cert` empty, and let `api_base` start with `http://`.
If encrypted communication is needed, the manager must set `ssl_key` and `ssl_cert`, the worker must set `ca_cert`, and `api_base` should start with `https://`.
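For illustration, here is a minimal sketch of what `ca_cert` amounts to on the worker side: an HTTP client that trusts the manager's self-signed CA. The helper name and structure are illustrative; the project's own helper is `internal.CreateHTTPClient`, seen in the tests later in this diff.
```go
package sketch

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"io/ioutil"
	"net/http"
)

// newHTTPClient builds a client that trusts the CA bundle at caCertPath.
// With an empty path it keeps the system roots, matching the plain
// http:// setup described above.
func newHTTPClient(caCertPath string) (*http.Client, error) {
	tlsConfig := &tls.Config{}
	if caCertPath != "" {
		pem, err := ioutil.ReadFile(caCertPath)
		if err != nil {
			return nil, err
		}
		pool := x509.NewCertPool()
		if !pool.AppendCertsFromPEM(pem) {
			return nil, fmt.Errorf("no certificate found in %s", caCertPath)
		}
		tlsConfig.RootCAs = pool
	}
	return &http.Client{
		Transport: &http.Transport{TLSClientConfig: tlsConfig},
	}, nil
}
```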
## Going Further
You can consult:
@@ -108,3 +124,7 @@ $ tunasync worker --help
```
You can take a look at the log directory.
Some example worker configuration files: [workers.conf](workers.conf)
Operations you may need: [tips.md](tips.md)

docs/zh_CN/tips.md Normal file (93 lines added)
View File

@@ -0,0 +1,93 @@
## Removing a mirror from a worker
First, make sure the tunasynctl config file `~/.config/tunasync/ctl.conf` is in place:
```toml
manager_addr = "127.0.0.1"
manager_port = 12345
ca_cert = ""
```
Then:
```shell
$ tunasynctl disable -w <worker_id> <mirror_name>
$ tunasynctl flush
```
## Hot-reloading `worker.conf`
```shell
$ tunasynctl reload -w <worker_id>
```
e.g. to remove the `elvish` mirror from `test_worker`:
1. Delete the directory storing the mirror
2. Delete the corresponding `mirror` section in `worker.conf`
3. Then run:
```shell
$ tunasynctl reload -w test_worker
$ tunasynctl disable -w test_worker elvish
$ tunasynctl flush
```
4. (Optional) Finally, delete the logs in the log directory
## Removing a worker
```shell
$ tunasynctl rm-worker -w <worker_id>
```
e.g.
```shell
$ tunasynctl rm-worker -w test_worker
```
## Updating the size of a mirror
```shell
$ tunasynctl set-size -w <worker_id> <mirror_name> <size>
```
The trailing `<size>` argument is set by the operator, or generated by some scheduled script.
Since `du -s` is rather time-consuming, the mirror size can instead be read straight from the rsync log file.
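As a self-contained illustration of that idea (the real helper added by this changeset is `ExtractSizeFromRsyncLog` in `internal/util.go`, shown further down):
```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// The last "Total file size" line of rsync's --stats output carries
	// the total size of the mirror.
	log := []byte("Total file size: 1.33T bytes\n")
	re := regexp.MustCompile(`(?m)^Total file size: ([0-9.]+[KMGTP]?) bytes`)
	if m := re.FindSubmatch(log); m != nil {
		fmt.Println(string(m[1])) // prints: 1.33T
	}
}
```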
## Btrfs filesystem snapshots
If the mirror files are stored on a partition with the Btrfs filesystem, the snapshot feature provided by Btrfs can be enabled. For each mirror, tunasync updates its snapshot after every successful sync.
Adding the following to `worker.conf` enables Btrfs snapshots:
```toml
[btrfs_snapshot]
enable = true
snapshot_path = "/path/to/snapshot/directory"
```
Here `snapshot_path` is the directory holding the snapshots. If the snapshot is what gets published, the sync process is atomic from the mirror users' point of view, so users never receive files that are still in an intermediate (incompletely synced) state.
A snapshot path can also be set for an individual mirror in its `[[mirrors]]` section, e.g.:
```toml
[[mirrors]]
name = "elvish"
provider = "rsync"
upstream = "rsync://rsync.elvish.io/elvish/"
interval = 1440
snapshot_path = "/data/publish/elvish"
```
**Tip:**
If the user running tunasync has no root privileges, make sure that user has write and execute permission on both the mirror directory and the snapshot directory, and mount the corresponding Btrfs partition with the [`user_subvol_rm_allowed` option](https://btrfs.wiki.kernel.org/index.php/Manpage/btrfs(5)#MOUNT_OPTIONS).

docs/zh_CN/workers.conf Normal file (77 lines added)
View File

@@ -0,0 +1,77 @@
[global]
name = "mirror_worker"
log_dir = "/srv/tunasync/log/tunasync/{{.Name}}"
mirror_dir = "/srv/tunasync"
concurrent = 10
interval = 1
[manager]
api_base = "http://localhost:12345"
token = "some_token"
ca_cert = ""
[cgroup]
enable = false
base_path = "/sys/fs/cgroup"
group = "tunasync"
[server]
hostname = "localhost"
listen_addr = "127.0.0.1"
listen_port = 6000
ssl_cert = ""
ssl_key = ""
[[mirrors]]
name = "adobe-fonts"
interval = 1440
provider = "command"
upstream = "https://github.com/adobe-fonts"
#https://github.com/tuna/tunasync-scripts/blob/master/adobe-fonts.sh
command = "/home/scripts/adobe-fonts.sh"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "anaconda"
provider = "command"
upstream = "https://repo.continuum.io/"
#https://github.com/tuna/tunasync-scripts/blob/master/anaconda.py
command = "/home/scripts/anaconda.py"
interval = 1440
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "gnu"
provider = "rsync"
upstream = "rsync://mirrors.ocf.berkeley.edu/gnu/"
memory_limit = "256M"
[[mirrors]]
name = "pypi"
provider = "command"
upstream = "https://pypi.python.org/"
#https://github.com/tuna/tunasync-scripts/blob/master/pypi.sh
command = "/home/scripts/pypi.sh"
docker_image = "tunathu/tunasync-scripts:latest"
interval = 5
# set environment variables
[mirrors.env]
INIT = "0"
[[mirrors]]
name = "debian"
interval = 720
provider = "rsync"
upstream = "rsync://mirrors.tuna.tsinghua.edu.cn/debian/"
memory_limit = "256M"
[[mirrors]]
name = "ubuntu"
provider = "two-stage-rsync"
stage1_profile = "debian"
upstream = "rsync://archive.ubuntu.com/ubuntu/"
memory_limit = "256M"
# vim: ft=toml

View File

@@ -14,6 +14,7 @@ type MirrorStatus struct {
Status SyncStatus `json:"status"`
LastUpdate time.Time `json:"last_update"`
LastEnded time.Time `json:"last_ended"`
Scheduled time.Time `json:"next_schedule"`
Upstream string `json:"upstream"`
Size string `json:"size"`
ErrorMsg string `json:"error_msg"`
@@ -28,6 +29,15 @@ type WorkerStatus struct {
LastOnline time.Time `json:"last_online"` // last seen
}
type MirrorSchedules struct {
Schedules []MirrorSchedule `json:"schedules"`
}
type MirrorSchedule struct {
MirrorName string `json:"name"`
NextSchedule time.Time `json:"next_schedule"`
}
// A CmdVerb is an action to a job or worker
type CmdVerb uint8

View File

@@ -45,6 +45,8 @@ type WebMirrorStatus struct {
LastUpdateTs stampTime `json:"last_update_ts"`
LastEnded textTime `json:"last_ended"`
LastEndedTs stampTime `json:"last_ended_ts"`
Scheduled textTime `json:"next_schedule"`
ScheduledTs stampTime `json:"next_schedule_ts"`
Upstream string `json:"upstream"`
Size string `json:"size"` // approximate size
}
@@ -58,6 +60,8 @@ func BuildWebMirrorStatus(m MirrorStatus) WebMirrorStatus {
LastUpdateTs: stampTime{m.LastUpdate},
LastEnded: textTime{m.LastEnded},
LastEndedTs: stampTime{m.LastEnded},
Scheduled: textTime{m.Scheduled},
ScheduledTs: stampTime{m.Scheduled},
Upstream: m.Upstream,
Size: m.Size,
}

View File

@@ -21,6 +21,8 @@ func TestStatus(t *testing.T) {
LastUpdateTs: stampTime{t},
LastEnded: textTime{t},
LastEndedTs: stampTime{t},
Scheduled: textTime{t},
ScheduledTs: stampTime{t},
Size: "5GB",
Upstream: "rsync://mirrors.tuna.tsinghua.edu.cn/tunalinux/",
}
@@ -42,6 +44,10 @@ func TestStatus(t *testing.T) {
So(m2.LastEndedTs.Unix(), ShouldEqual, m.LastEnded.Unix())
So(m2.LastEnded.UnixNano(), ShouldEqual, m.LastEnded.UnixNano())
So(m2.LastEndedTs.UnixNano(), ShouldEqual, m.LastEnded.UnixNano())
So(m2.Scheduled.Unix(), ShouldEqual, m.Scheduled.Unix())
So(m2.ScheduledTs.Unix(), ShouldEqual, m.Scheduled.Unix())
So(m2.Scheduled.UnixNano(), ShouldEqual, m.Scheduled.UnixNano())
So(m2.ScheduledTs.UnixNano(), ShouldEqual, m.Scheduled.UnixNano())
So(m2.Size, ShouldEqual, m.Size)
So(m2.Upstream, ShouldEqual, m.Upstream)
})
@@ -53,6 +59,7 @@ func TestStatus(t *testing.T) {
Status: Failed,
LastUpdate: time.Now().Add(-time.Minute * 30),
LastEnded: time.Now(),
Scheduled: time.Now().Add(time.Minute * 5),
Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "4GB",
}
@@ -70,6 +77,10 @@ func TestStatus(t *testing.T) {
So(m2.LastEndedTs.Unix(), ShouldEqual, m.LastEnded.Unix())
So(m2.LastEnded.UnixNano(), ShouldEqual, m.LastEnded.UnixNano())
So(m2.LastEndedTs.UnixNano(), ShouldEqual, m.LastEnded.UnixNano())
So(m2.Scheduled.Unix(), ShouldEqual, m.Scheduled.Unix())
So(m2.ScheduledTs.Unix(), ShouldEqual, m.Scheduled.Unix())
So(m2.Scheduled.UnixNano(), ShouldEqual, m.Scheduled.UnixNano())
So(m2.ScheduledTs.UnixNano(), ShouldEqual, m.Scheduled.UnixNano())
So(m2.Size, ShouldEqual, m.Size)
So(m2.Upstream, ShouldEqual, m.Upstream)
})

View File

@@ -8,6 +8,7 @@ import (
"errors"
"io/ioutil"
"net/http"
"regexp"
"time"
)
@@ -84,3 +85,14 @@ func GetJSON(url string, obj interface{}, client *http.Client) (*http.Response,
}
return resp, json.Unmarshal(body, obj)
}
func ExtractSizeFromRsyncLog(content []byte) string {
// (?m) flag enables multi-line mode
re := regexp.MustCompile(`(?m)^Total file size: ([0-9\.]+[KMGTP]?) bytes`)
matches := re.FindAllSubmatch(content, -1)
// fmt.Printf("%q\n", matches)
if len(matches) == 0 {
return ""
}
return string(matches[len(matches)-1][1])
}

internal/util_test.go Normal file (32 lines added)
View File

@@ -0,0 +1,32 @@
package internal
import (
"testing"
. "github.com/smartystreets/goconvey/convey"
)
func TestExtractSizeFromRsyncLog(t *testing.T) {
realLogContent := `
Number of files: 998,470 (reg: 925,484, dir: 58,892, link: 14,094)
Number of created files: 1,049 (reg: 1,049)
Number of deleted files: 1,277 (reg: 1,277)
Number of regular files transferred: 5,694
Total file size: 1.33T bytes
Total transferred file size: 2.86G bytes
Literal data: 780.62M bytes
Matched data: 2.08G bytes
File list size: 37.55M
File list generation time: 7.845 seconds
File list transfer time: 0.000 seconds
Total bytes sent: 7.55M
Total bytes received: 823.25M
sent 7.55M bytes received 823.25M bytes 5.11M bytes/sec
total size is 1.33T speedup is 1,604.11
`
Convey("Log parser should work", t, func() {
res := ExtractSizeFromRsyncLog([]byte(realLogContent))
So(res, ShouldEqual, "1.33T")
})
}

View File

@@ -91,6 +91,7 @@ func GetTUNASyncManager(cfg *Config) *Manager {
// post job status
workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker)
workerValidateGroup.POST(":id/jobs/:job/size", s.updateMirrorSize)
workerValidateGroup.POST(":id/schedules", s.updateSchedulesOfWorker)
}
// for tunasynctl to post commands
@@ -240,6 +241,48 @@ func (s *Manager) returnErrJSON(c *gin.Context, code int, err error) {
})
}
func (s *Manager) updateSchedulesOfWorker(c *gin.Context) {
workerID := c.Param("id")
var schedules MirrorSchedules
c.BindJSON(&schedules)
for _, schedule := range schedules.Schedules {
mirrorName := schedule.MirrorName
if len(mirrorName) == 0 {
s.returnErrJSON(
c, http.StatusBadRequest,
errors.New("Mirror Name should not be empty"),
)
}
curStatus, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
if err != nil {
fmt.Errorf("failed to get job %s of worker %s: %s",
mirrorName, workerID, err.Error(),
)
continue
}
if curStatus.Scheduled == schedule.NextSchedule {
// no changes, skip update
continue
}
curStatus.Scheduled = schedule.NextSchedule
_, err = s.adapter.UpdateMirrorStatus(workerID, mirrorName, curStatus)
if err != nil {
err := fmt.Errorf("failed to update job %s of worker %s: %s",
mirrorName, workerID, err.Error(),
)
c.Error(err)
s.returnErrJSON(c, http.StatusInternalServerError, err)
return
}
}
type empty struct{}
c.JSON(http.StatusOK, empty{})
}
func (s *Manager) updateJobOfWorker(c *gin.Context) {
workerID := c.Param("id")
var status MirrorStatus

View File

@@ -109,7 +109,7 @@ func TestHTTPServer(t *testing.T) {
So(res[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
})
Convey("flush disabled jobs", func(ctx C) {
Convey("flush disabled jobs", func(ctx C) {
req, err := http.NewRequest("DELETE", baseURL+"/jobs/disabled", nil)
So(err, ShouldBeNil)
clt := &http.Client{}
@@ -132,8 +132,8 @@ func TestHTTPServer(t *testing.T) {
Size: "unknown",
}
resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
defer resp.Body.Close()
So(err, ShouldBeNil)
defer resp.Body.Close()
So(resp.StatusCode, ShouldEqual, http.StatusOK)
Convey("list mirror status of an existed worker", func(ctx C) {
@@ -202,6 +202,20 @@ func TestHTTPServer(t *testing.T) {
})
})
Convey("Update schedule of valid mirrors", func(ctx C) {
msg := MirrorSchedules{
[]MirrorSchedule{
MirrorSchedule{"arch-sync1", time.Now().Add(time.Minute * 10)},
MirrorSchedule{"arch-sync2", time.Now().Add(time.Minute * 7)},
},
}
url := fmt.Sprintf("%s/workers/%s/schedules", baseURL, status.Worker)
resp, err := PostJSON(url, msg, nil)
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK)
})
Convey("Update size of an invalid mirror", func(ctx C) {
msg := struct {
Name string `json:"name"`
@@ -218,8 +232,8 @@ func TestHTTPServer(t *testing.T) {
status.Status = Failed
time.Sleep(3 * time.Second)
resp, err = PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
defer resp.Body.Close()
So(err, ShouldBeNil)
defer resp.Body.Close()
So(resp.StatusCode, ShouldEqual, http.StatusOK)
Convey("What if syncing job failed", func(ctx C) {
@@ -263,6 +277,24 @@ func TestHTTPServer(t *testing.T) {
So(err, ShouldBeNil)
So(msg[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
})
Convey("update schedule of an non-existent worker", func(ctx C) {
invalidWorker := "test_worker2"
sch := MirrorSchedules{
[]MirrorSchedule{
MirrorSchedule{"arch-sync1", time.Now().Add(time.Minute * 10)},
MirrorSchedule{"arch-sync2", time.Now().Add(time.Minute * 7)},
},
}
resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/schedules",
baseURL, invalidWorker), sch, nil)
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
defer resp.Body.Close()
var msg map[string]string
err = json.NewDecoder(resp.Body).Decode(&msg)
So(err, ShouldBeNil)
So(msg[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
})
Convey("handle client command", func(ctx C) {
cmdChan := make(chan WorkerCmd, 1)
workerServer := makeMockWorkerServer(cmdChan)
@@ -281,11 +313,11 @@ func TestHTTPServer(t *testing.T) {
// run the mock worker server
workerServer.Run(bindAddress)
}()
time.Sleep(50 * time.Microsecond)
time.Sleep(50 * time.Millisecond)
// verify the worker mock server is running
workerResp, err := http.Get(workerBaseURL + "/ping")
defer workerResp.Body.Close()
So(err, ShouldBeNil)
defer workerResp.Body.Close()
So(workerResp.StatusCode, ShouldEqual, http.StatusOK)
Convey("when client send wrong cmd", func(ctx C) {
@@ -295,8 +327,8 @@ func TestHTTPServer(t *testing.T) {
WorkerID: "not_exist_worker",
}
resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil)
defer resp.Body.Close()
So(err, ShouldBeNil)
defer resp.Body.Close()
So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
})
@@ -308,9 +340,8 @@ func TestHTTPServer(t *testing.T) {
}
resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil)
defer resp.Body.Close()
So(err, ShouldBeNil)
defer resp.Body.Close()
So(resp.StatusCode, ShouldEqual, http.StatusOK)
time.Sleep(50 * time.Microsecond)
select {

View File

@@ -15,6 +15,7 @@ type baseProvider struct {
ctx *Context
name string
interval time.Duration
retry int
isMaster bool
cmd *cmdJob
@@ -50,6 +51,10 @@ func (p *baseProvider) Interval() time.Duration {
return p.interval
}
func (p *baseProvider) Retry() int {
return p.retry
}
func (p *baseProvider) IsMaster() bool {
return p.isMaster
}
@@ -161,3 +166,7 @@ func (p *baseProvider) Terminate() error {
return err
}
func (p *baseProvider) DataSize() string {
return ""
}

View File

@@ -0,0 +1,90 @@
package worker
import (
"fmt"
"os"
"path/filepath"
"github.com/dennwc/btrfs"
)
type btrfsSnapshotHook struct {
provider mirrorProvider
mirrorSnapshotPath string
}
// the user who runs the jobs (typically `tunasync`) should be granted the permission to run btrfs commands
// TODO: check if the filesystem is Btrfs
func newBtrfsSnapshotHook(provider mirrorProvider, snapshotPath string, mirror mirrorConfig) *btrfsSnapshotHook {
mirrorSnapshotPath := mirror.SnapshotPath
if mirrorSnapshotPath == "" {
mirrorSnapshotPath = filepath.Join(snapshotPath, provider.Name())
}
return &btrfsSnapshotHook{
provider: provider,
mirrorSnapshotPath: mirrorSnapshotPath,
}
}
// check if path `snapshotPath/providerName` exists
// Case 1: Not exists => create a new subvolume
// Case 2: Exists as a subvolume => nothing to do
// Case 3: Exists as a directory => error detected
func (h *btrfsSnapshotHook) preJob() error {
path := h.provider.WorkingDir()
if _, err := os.Stat(path); os.IsNotExist(err) {
// create subvolume
err := btrfs.CreateSubVolume(path)
if err != nil {
logger.Errorf("failed to create Btrfs subvolume %s: %s", path, err.Error())
return err
}
logger.Noticef("created new Btrfs subvolume %s", path)
} else {
if is, err := btrfs.IsSubVolume(path); err != nil {
return err
} else if !is {
return fmt.Errorf("path %s exists but isn't a Btrfs subvolume", path)
}
}
return nil
}
func (h *btrfsSnapshotHook) preExec() error {
return nil
}
func (h *btrfsSnapshotHook) postExec() error {
return nil
}
// delete old snapshot if exists, then create a new snapshot
func (h *btrfsSnapshotHook) postSuccess() error {
if _, err := os.Stat(h.mirrorSnapshotPath); !os.IsNotExist(err) {
isSubVol, err := btrfs.IsSubVolume(h.mirrorSnapshotPath)
if err != nil {
return err
} else if !isSubVol {
return fmt.Errorf("path %s exists and isn't a Btrfs snapshot", h.mirrorSnapshotPath)
}
// is old snapshot => delete it
if err := btrfs.DeleteSubVolume(h.mirrorSnapshotPath); err != nil {
logger.Errorf("failed to delete old Btrfs snapshot %s", h.mirrorSnapshotPath)
return err
}
logger.Noticef("deleted old snapshot %s", h.mirrorSnapshotPath)
}
// create a new writable snapshot
// (the snapshot is writable so that it can be deleted easily)
if err := btrfs.SnapshotSubVolume(h.provider.WorkingDir(), h.mirrorSnapshotPath, false); err != nil {
logger.Errorf("failed to create new Btrfs snapshot %s", h.mirrorSnapshotPath)
return err
}
logger.Noticef("created new Btrfs snapshot %s", h.mirrorSnapshotPath)
return nil
}
// keep the old snapshot => nothing to do
func (h *btrfsSnapshotHook) postFail() error {
return nil
}

View File

@@ -17,7 +17,6 @@ import (
type cgroupHook struct {
emptyHook
provider mirrorProvider
basePath string
baseGroup string
created bool
@@ -36,7 +35,9 @@ func newCgroupHook(p mirrorProvider, basePath, baseGroup, subsystem, memLimit st
subsystem = "cpu"
}
return &cgroupHook{
provider: p,
emptyHook: emptyHook{
provider: p,
},
basePath: basePath,
baseGroup: baseGroup,
subsystem: subsystem,

View File

@@ -12,6 +12,7 @@ type cmdConfig struct {
upstreamURL, command string
workingDir, logDir, logFile string
interval time.Duration
retry int
env map[string]string
}
@@ -23,11 +24,15 @@ type cmdProvider struct {
func newCmdProvider(c cmdConfig) (*cmdProvider, error) {
// TODO: check config options
if c.retry == 0 {
c.retry = defaultMaxRetry
}
provider := &cmdProvider{
baseProvider: baseProvider{
name: c.name,
ctx: NewContext(),
interval: c.interval,
retry: c.retry,
},
cmdConfig: c,
}

View File

@@ -8,6 +8,6 @@ import (
type empty struct{}
const maxRetry = 2
const defaultMaxRetry = 2
var logger = logging.MustGetLogger("tunasync")

View File

@@ -33,14 +33,15 @@ func (p *providerEnum) UnmarshalText(text []byte) error {
// Config represents worker config options
type Config struct {
Global globalConfig `toml:"global"`
Manager managerConfig `toml:"manager"`
Server serverConfig `toml:"server"`
Cgroup cgroupConfig `toml:"cgroup"`
ZFS zfsConfig `toml:"zfs"`
Docker dockerConfig `toml:"docker"`
Include includeConfig `toml:"include"`
Mirrors []mirrorConfig `toml:"mirrors"`
Global globalConfig `toml:"global"`
Manager managerConfig `toml:"manager"`
Server serverConfig `toml:"server"`
Cgroup cgroupConfig `toml:"cgroup"`
ZFS zfsConfig `toml:"zfs"`
BtrfsSnapshot btrfsSnapshotConfig `toml:"btrfs_snapshot"`
Docker dockerConfig `toml:"docker"`
Include includeConfig `toml:"include"`
Mirrors []mirrorConfig `toml:"mirrors"`
}
type globalConfig struct {
@@ -49,6 +50,7 @@ type globalConfig struct {
MirrorDir string `toml:"mirror_dir"`
Concurrent int `toml:"concurrent"`
Interval int `toml:"interval"`
Retry int `toml:"retry"`
ExecOnSuccess []string `toml:"exec_on_success"`
ExecOnFailure []string `toml:"exec_on_failure"`
@@ -95,6 +97,11 @@ type zfsConfig struct {
Zpool string `toml:"zpool"`
}
type btrfsSnapshotConfig struct {
Enable bool `toml:"enable"`
SnapshotPath string `toml:"snapshot_path"`
}
type includeConfig struct {
IncludeMirrors string `toml:"include_mirrors"`
}
@@ -108,6 +115,7 @@ type mirrorConfig struct {
Provider providerEnum `toml:"provider"`
Upstream string `toml:"upstream"`
Interval int `toml:"interval"`
Retry int `toml:"retry"`
MirrorDir string `toml:"mirror_dir"`
LogDir string `toml:"log_dir"`
Env map[string]string `toml:"env"`
@@ -134,6 +142,8 @@ type mirrorConfig struct {
DockerImage string `toml:"docker_image"`
DockerVolumes []string `toml:"docker_volumes"`
DockerOptions []string `toml:"docker_options"`
SnapshotPath string `toml:"snapshot_path"`
}
// LoadConfig loads configuration
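For illustration, a minimal sketch of how the new `[btrfs_snapshot]` table would decode into the structs above, assuming the `BurntSushi/toml` decoder that `LoadConfig` uses; the struct here is a trimmed copy for the example:
```go
package main

import (
	"fmt"

	"github.com/BurntSushi/toml"
)

// Trimmed copy of the worker config structs, just enough for the example.
type btrfsSnapshotConfig struct {
	Enable       bool   `toml:"enable"`
	SnapshotPath string `toml:"snapshot_path"`
}

type config struct {
	BtrfsSnapshot btrfsSnapshotConfig `toml:"btrfs_snapshot"`
}

func main() {
	doc := `
[btrfs_snapshot]
enable = true
snapshot_path = "/path/to/snapshot/directory"
`
	var cfg config
	if _, err := toml.Decode(doc, &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.BtrfsSnapshot.Enable, cfg.BtrfsSnapshot.SnapshotPath)
	// Output: true /path/to/snapshot/directory
}
```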

View File

@@ -18,6 +18,7 @@ log_dir = "/var/log/tunasync/{{.Name}}"
mirror_dir = "/data/mirrors"
concurrent = 10
interval = 240
retry = 3
[manager]
api_base = "https://127.0.0.1:5000"
@@ -35,6 +36,7 @@ name = "AOSP"
provider = "command"
upstream = "https://aosp.google.com/"
interval = 720
retry = 2
mirror_dir = "/data/git/AOSP"
exec_on_success = [
"bash -c 'echo ${TUNASYNC_JOB_EXIT_STATUS} > ${TUNASYNC_WORKING_DIR}/exit_status'"
@@ -116,6 +118,7 @@ use_ipv6 = true
So(err, ShouldBeNil)
So(cfg.Global.Name, ShouldEqual, "test_worker")
So(cfg.Global.Interval, ShouldEqual, 240)
So(cfg.Global.Retry, ShouldEqual, 3)
So(cfg.Global.MirrorDir, ShouldEqual, "/data/mirrors")
So(cfg.Manager.APIBase, ShouldEqual, "https://127.0.0.1:5000")
@@ -126,6 +129,7 @@ use_ipv6 = true
So(m.MirrorDir, ShouldEqual, "/data/git/AOSP")
So(m.Provider, ShouldEqual, provCommand)
So(m.Interval, ShouldEqual, 720)
So(m.Retry, ShouldEqual, 2)
So(m.Env["REPO"], ShouldEqual, "/usr/local/bin/aosp-repo")
m = cfg.Mirrors[1]

View File

@@ -7,10 +7,9 @@ import (
type dockerHook struct {
emptyHook
provider mirrorProvider
image string
volumes []string
options []string
image string
volumes []string
options []string
}
func newDockerHook(p mirrorProvider, gCfg dockerConfig, mCfg mirrorConfig) *dockerHook {
@@ -23,10 +22,12 @@ func newDockerHook(p mirrorProvider, gCfg dockerConfig, mCfg mirrorConfig) *dock
options = append(options, mCfg.DockerOptions...)
return &dockerHook{
provider: p,
image: mCfg.DockerImage,
volumes: volumes,
options: options,
emptyHook: emptyHook{
provider: p,
},
image: mCfg.DockerImage,
volumes: volumes,
options: options,
}
}

View File

@@ -55,8 +55,10 @@ sleep 10
So(err, ShouldBeNil)
d := &dockerHook{
provider: provider,
image: "alpine",
emptyHook: emptyHook{
provider: provider,
},
image: "alpine",
volumes: []string{
fmt.Sprintf("%s:%s", cmdScript, "/bin/cmd.sh"),
},

View File

@@ -18,7 +18,6 @@ const (
type execPostHook struct {
emptyHook
provider mirrorProvider
// exec on success or on failure
execOn uint8
@@ -37,9 +36,11 @@ func newExecPostHook(provider mirrorProvider, execOn uint8, command string) (*ex
}
return &execPostHook{
provider: provider,
execOn: execOn,
command: cmd,
emptyHook: emptyHook{
provider: provider,
},
execOn: execOn,
command: cmd,
}, nil
}

View File

@@ -92,7 +92,7 @@ exit 1
job.ctrlChan <- jobStart
msg := <-managerChan
So(msg.status, ShouldEqual, PreSyncing)
for i := 0; i < maxRetry; i++ {
for i := 0; i < defaultMaxRetry; i++ {
msg = <-managerChan
So(msg.status, ShouldEqual, Syncing)
msg = <-managerChan

View File

@@ -53,6 +53,7 @@ type mirrorJob struct {
ctrlChan chan ctrlAction
disabled chan empty
state uint32
size string
}
func newMirrorJob(provider mirrorProvider) *mirrorJob {
@@ -112,7 +113,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
managerChan <- jobMessage{
tunasync.Failed, m.Name(),
fmt.Sprintf("error exec hook %s: %s", hookname, err.Error()),
false,
true,
}
return err
}
@@ -138,7 +139,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
return err
}
for retry := 0; retry < maxRetry; retry++ {
for retry := 0; retry < provider.Retry(); retry++ {
stopASAP := false // stop job as soon as possible
if retry > 0 {
@@ -182,26 +183,33 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
if syncErr == nil {
// syncing success
logger.Noticef("succeeded syncing %s", m.Name())
managerChan <- jobMessage{tunasync.Success, m.Name(), "", (m.State() == stateReady)}
// post-success hooks
logger.Debug("post-success hooks")
err := runHooks(rHooks, func(h jobHook) error { return h.postSuccess() }, "post-success")
if err != nil {
return err
}
return nil
} else {
// syncing failed
logger.Warningf("failed syncing %s: %s", m.Name(), syncErr.Error())
// post-fail hooks
logger.Debug("post-fail hooks")
err := runHooks(rHooks, func(h jobHook) error { return h.postFail() }, "post-fail")
if err != nil {
return err
}
}
if syncErr == nil {
// syncing success
m.size = provider.DataSize()
managerChan <- jobMessage{tunasync.Success, m.Name(), "", (m.State() == stateReady)}
return nil
}
// syncing failed
logger.Warningf("failed syncing %s: %s", m.Name(), syncErr.Error())
managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error(), (retry == maxRetry-1) && (m.State() == stateReady)}
managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error(), (retry == provider.Retry()-1) && (m.State() == stateReady)}
// post-fail hooks
logger.Debug("post-fail hooks")
err = runHooks(rHooks, func(h jobHook) error { return h.postFail() }, "post-fail")
if err != nil {
return err
}
// gracefully exit
if stopASAP {
logger.Debug("No retry, exit directly")

View File

@@ -14,12 +14,13 @@ import (
type logLimiter struct {
emptyHook
provider mirrorProvider
}
func newLogLimiter(provider mirrorProvider) *logLimiter {
return &logLimiter{
provider: provider,
emptyHook: emptyHook{
provider: provider,
},
}
}

View File

@@ -45,11 +45,13 @@ type mirrorProvider interface {
Hooks() []jobHook
Interval() time.Duration
Retry() int
WorkingDir() string
LogDir() string
LogFile() string
IsMaster() bool
DataSize() string
// enter context
EnterContext() *Context
@@ -86,6 +88,9 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
if mirror.Interval == 0 {
mirror.Interval = cfg.Global.Interval
}
if mirror.Retry == 0 {
mirror.Retry = cfg.Global.Retry
}
logDir = formatLogDir(logDir, mirror)
// IsMaster
@@ -110,13 +115,14 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
logDir: logDir,
logFile: filepath.Join(logDir, "latest.log"),
interval: time.Duration(mirror.Interval) * time.Minute,
retry: mirror.Retry,
env: mirror.Env,
}
p, err := newCmdProvider(pc)
p.isMaster = isMaster
if err != nil {
panic(err)
}
p.isMaster = isMaster
provider = p
case provRsync:
rc := rsyncConfig{
@@ -132,12 +138,13 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
useIPv6: mirror.UseIPv6,
useIPv4: mirror.UseIPv4,
interval: time.Duration(mirror.Interval) * time.Minute,
retry: mirror.Retry,
}
p, err := newRsyncProvider(rc)
p.isMaster = isMaster
if err != nil {
panic(err)
}
p.isMaster = isMaster
provider = p
case provTwoStageRsync:
rc := twoStageRsyncConfig{
@@ -153,12 +160,13 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
logFile: filepath.Join(logDir, "latest.log"),
useIPv6: mirror.UseIPv6,
interval: time.Duration(mirror.Interval) * time.Minute,
retry: mirror.Retry,
}
p, err := newTwoStageRsyncProvider(rc)
p.isMaster = isMaster
if err != nil {
panic(err)
}
p.isMaster = isMaster
provider = p
default:
panic(errors.New("Invalid mirror provider"))
@@ -172,6 +180,11 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
provider.AddHook(newZfsHook(provider, cfg.ZFS.Zpool))
}
// Add Btrfs Snapshot Hook
if cfg.BtrfsSnapshot.Enable {
provider.AddHook(newBtrfsSnapshotHook(provider, cfg.BtrfsSnapshot.SnapshotPath, mirror))
}
// Add Docker Hook
if cfg.Docker.Enable && len(mirror.DockerImage) > 0 {
provider.AddHook(newDockerHook(provider, cfg.Docker, mirror))

View File

@@ -73,6 +73,7 @@ func TestRsyncProvider(t *testing.T) {
echo "syncing to $(pwd)"
echo $RSYNC_PASSWORD $@
sleep 1
echo "Total file size: 1.33T bytes"
echo "Done"
exit 0
`
@@ -83,6 +84,7 @@ exit 0
expectedOutput := fmt.Sprintf(
"syncing to %s\n"+
"%s\n"+
"Total file size: 1.33T bytes\n"+
"Done\n",
targetDir,
fmt.Sprintf(
@@ -99,6 +101,7 @@ exit 0
So(err, ShouldBeNil)
So(string(loggedContent), ShouldEqual, expectedOutput)
// fmt.Println(string(loggedContent))
So(provider.DataSize(), ShouldEqual, "1.33T")
})
})

View File

@@ -2,8 +2,11 @@ package worker
import (
"errors"
"io/ioutil"
"strings"
"time"
"github.com/tuna/tunasync/internal"
)
type rsyncConfig struct {
@@ -13,13 +16,15 @@ type rsyncConfig struct {
workingDir, logDir, logFile string
useIPv6, useIPv4 bool
interval time.Duration
retry int
}
// An RsyncProvider provides the implementation to rsync-based syncing jobs
type rsyncProvider struct {
baseProvider
rsyncConfig
options []string
options []string
dataSize string
}
func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
@@ -27,11 +32,15 @@ func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
if !strings.HasSuffix(c.upstreamURL, "/") {
return nil, errors.New("rsync upstream URL should ends with /")
}
if c.retry == 0 {
c.retry = defaultMaxRetry
}
provider := &rsyncProvider{
baseProvider: baseProvider{
name: c.name,
ctx: NewContext(),
interval: c.interval,
retry: c.retry,
},
rsyncConfig: c,
}
@@ -73,11 +82,22 @@ func (p *rsyncProvider) Upstream() string {
return p.upstreamURL
}
func (p *rsyncProvider) DataSize() string {
return p.dataSize
}
func (p *rsyncProvider) Run() error {
p.dataSize = ""
if err := p.Start(); err != nil {
return err
}
return p.Wait()
if err := p.Wait(); err != nil {
return err
}
if logContent, err := ioutil.ReadFile(p.LogFile()); err == nil {
p.dataSize = internal.ExtractSizeFromRsyncLog(logContent)
}
return nil
}
func (p *rsyncProvider) Start() error {

View File

@@ -15,6 +15,11 @@ type scheduleQueue struct {
jobs map[string]bool
}
type jobScheduleInfo struct {
jobName string
nextScheduled time.Time
}
func timeLessThan(l, r interface{}) bool {
tl := l.(time.Time)
tr := r.(time.Time)
@@ -28,6 +33,20 @@ func newScheduleQueue() *scheduleQueue {
return queue
}
func (q *scheduleQueue) GetJobs() (jobs []jobScheduleInfo) {
cur := q.list.Iterator()
defer cur.Close()
for cur.Next() {
cj := cur.Value().(*mirrorJob)
jobs = append(jobs, jobScheduleInfo{
cj.Name(),
cur.Key().(time.Time),
})
}
return
}
func (q *scheduleQueue) AddJob(schedTime time.Time, job *mirrorJob) {
q.Lock()
defer q.Unlock()

View File

@@ -3,8 +3,11 @@ package worker
import (
"errors"
"fmt"
"io/ioutil"
"strings"
"time"
"github.com/tuna/tunasync/internal"
)
type twoStageRsyncConfig struct {
@@ -15,6 +18,7 @@ type twoStageRsyncConfig struct {
workingDir, logDir, logFile string
useIPv6 bool
interval time.Duration
retry int
}
// An RsyncProvider provides the implementation to rsync-based syncing jobs
@@ -23,6 +27,7 @@ type twoStageRsyncProvider struct {
twoStageRsyncConfig
stage1Options []string
stage2Options []string
dataSize string
}
var rsyncStage1Profiles = map[string]([]string){
@@ -38,12 +43,16 @@ func newTwoStageRsyncProvider(c twoStageRsyncConfig) (*twoStageRsyncProvider, er
if !strings.HasSuffix(c.upstreamURL, "/") {
return nil, errors.New("rsync upstream URL should ends with /")
}
if c.retry == 0 {
c.retry = defaultMaxRetry
}
provider := &twoStageRsyncProvider{
baseProvider: baseProvider{
name: c.name,
ctx: NewContext(),
interval: c.interval,
retry: c.retry,
},
twoStageRsyncConfig: c,
stage1Options: []string{
@@ -78,6 +87,10 @@ func (p *twoStageRsyncProvider) Upstream() string {
return p.upstreamURL
}
func (p *twoStageRsyncProvider) DataSize() string {
return p.dataSize
}
func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) {
var options []string
if stage == 1 {
@@ -123,6 +136,7 @@ func (p *twoStageRsyncProvider) Run() error {
env["RSYNC_PASSWORD"] = p.password
}
p.dataSize = ""
stages := []int{1, 2}
for _, stage := range stages {
command := []string{p.rsyncCmd}
@@ -151,5 +165,8 @@ func (p *twoStageRsyncProvider) Run() error {
return err
}
}
if logContent, err := ioutil.ReadFile(p.LogFile()); err == nil {
p.dataSize = internal.ExtractSizeFromRsyncLog(logContent)
}
return nil
}

View File

@@ -12,8 +12,6 @@ import (
. "github.com/tuna/tunasync/internal"
)
var tunasyncWorker *Worker
// A Worker is a instance of tunasync worker
type Worker struct {
L sync.Mutex
@@ -29,10 +27,11 @@ type Worker struct {
httpClient *http.Client
}
// GetTUNASyncWorker returns a singalton worker
func GetTUNASyncWorker(cfg *Config) *Worker {
if tunasyncWorker != nil {
return tunasyncWorker
// NewTUNASyncWorker creates a worker
func NewTUNASyncWorker(cfg *Config) *Worker {
if cfg.Global.Retry == 0 {
cfg.Global.Retry = defaultMaxRetry
}
w := &Worker{
@@ -57,7 +56,6 @@ func GetTUNASyncWorker(cfg *Config) *Worker {
w.initJobs()
w.makeHTTPServer()
tunasyncWorker = w
return w
}
@@ -310,6 +308,9 @@ func (w *Worker) runSchedule() {
w.L.Unlock()
schedInfo := w.schedule.GetJobs()
w.updateSchedInfo(schedInfo)
tick := time.Tick(5 * time.Second)
for {
select {
@@ -346,6 +347,9 @@ func (w *Worker) runSchedule() {
w.schedule.AddJob(schedTime, job)
}
schedInfo = w.schedule.GetJobs()
w.updateSchedInfo(schedInfo)
case <-tick:
// check schedule every 5 seconds
if job := w.schedule.Pop(); job != nil {
@@ -416,6 +420,12 @@ func (w *Worker) updateStatus(job *mirrorJob, jobMsg jobMessage) {
ErrorMsg: jobMsg.msg,
}
// Certain Providers (rsync for example) may know the size of mirror,
// so we report it to Manager here
if len(job.size) != 0 {
smsg.Size = job.size
}
for _, root := range w.cfg.Manager.APIBaseList() {
url := fmt.Sprintf(
"%s/workers/%s/jobs/%s", root, w.Name(), jobMsg.name,
@@ -427,6 +437,27 @@ func (w *Worker) updateStatus(job *mirrorJob, jobMsg jobMessage) {
}
}
func (w *Worker) updateSchedInfo(schedInfo []jobScheduleInfo) {
var s []MirrorSchedule
for _, sched := range schedInfo {
s = append(s, MirrorSchedule{
MirrorName: sched.jobName,
NextSchedule: sched.nextScheduled,
})
}
msg := MirrorSchedules{Schedules: s}
for _, root := range w.cfg.Manager.APIBaseList() {
url := fmt.Sprintf(
"%s/workers/%s/schedules", root, w.Name(),
)
logger.Debugf("reporting on manager url: %s", url)
if _, err := PostJSON(url, msg, w.httpClient); err != nil {
logger.Errorf("Failed to upload schedules: %s", err.Error())
}
}
}
func (w *Worker) fetchJobStatus() []MirrorStatus {
var mirrorList []MirrorStatus
apiBase := w.cfg.Manager.APIBaseList()[0]

worker/worker_test.go Normal file (253 lines added)
View File

@@ -0,0 +1,253 @@
package worker
import (
"net/http"
"strconv"
"testing"
"time"
"github.com/gin-gonic/gin"
. "github.com/smartystreets/goconvey/convey"
. "github.com/tuna/tunasync/internal"
)
type workTestFunc func(*Worker)
var managerPort = 5001
var workerPort = 5002
func makeMockManagerServer(recvData chan interface{}) *gin.Engine {
r := gin.Default()
r.GET("/ping", func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"_infoKey": "pong"})
})
r.POST("/workers", func(c *gin.Context) {
var _worker WorkerStatus
c.BindJSON(&_worker)
_worker.LastOnline = time.Now()
recvData <- _worker
c.JSON(http.StatusOK, _worker)
})
r.POST("/workers/dut/schedules", func(c *gin.Context) {
var _sch MirrorSchedules
c.BindJSON(&_sch)
recvData <- _sch
c.JSON(http.StatusOK, empty{})
})
r.POST("/workers/dut/jobs/:job", func(c *gin.Context) {
var status MirrorStatus
c.BindJSON(&status)
recvData <- status
c.JSON(http.StatusOK, status)
})
r.GET("/workers/dut/jobs", func(c *gin.Context) {
mirrorStatusList := []MirrorStatus{}
c.JSON(http.StatusOK, mirrorStatusList)
})
return r
}
func startWorkerThenStop(cfg *Config, tester workTestFunc) {
exitedChan := make(chan int)
w := NewTUNASyncWorker(cfg)
So(w, ShouldNotBeNil)
go func() {
w.Run()
exitedChan <- 1
}()
tester(w)
w.Halt()
select {
case exited := <-exitedChan:
So(exited, ShouldEqual, 1)
case <-time.After(2 * time.Second):
So(0, ShouldEqual, 1)
}
}
func sendCommandToWorker(workerURL string, httpClient *http.Client, cmd CmdVerb, mirror string) {
workerCmd := WorkerCmd{
Cmd: cmd,
MirrorID: mirror,
}
logger.Debugf("POST to %s with cmd %s", workerURL, cmd)
_, err := PostJSON(workerURL, workerCmd, httpClient)
So(err, ShouldBeNil)
}
func TestWorker(t *testing.T) {
InitLogger(false, true, false)
recvDataChan := make(chan interface{})
_s := makeMockManagerServer(recvDataChan)
httpServer := &http.Server{
Addr: "localhost:" + strconv.Itoa(managerPort),
Handler: _s,
ReadTimeout: 2 * time.Second,
WriteTimeout: 2 * time.Second,
}
go func() {
err := httpServer.ListenAndServe()
So(err, ShouldBeNil)
}()
Convey("Worker should work", t, func(ctx C) {
httpClient, err := CreateHTTPClient("")
So(err, ShouldBeNil)
workerPort++
workerCfg := Config{
Global: globalConfig{
Name: "dut",
LogDir: "/tmp",
MirrorDir: "/tmp",
Concurrent: 2,
Interval: 1,
},
Server: serverConfig{
Hostname: "localhost",
Addr: "127.0.0.1",
Port: workerPort,
},
Manager: managerConfig{
APIBase: "http://localhost:" + strconv.Itoa(managerPort),
},
}
logger.Debugf("worker port %d", workerPort)
Convey("with no job", func(ctx C) {
dummyTester := func(*Worker) {
registered := false
for {
select {
case data := <-recvDataChan:
if reg, ok := data.(WorkerStatus); ok {
So(reg.ID, ShouldEqual, "dut")
registered = true
time.Sleep(500 * time.Millisecond)
sendCommandToWorker(reg.URL, httpClient, CmdStart, "foobar")
} else if sch, ok := data.(MirrorSchedules); ok {
So(len(sch.Schedules), ShouldEqual, 0)
}
case <-time.After(2 * time.Second):
So(registered, ShouldBeTrue)
return
}
}
}
startWorkerThenStop(&workerCfg, dummyTester)
})
Convey("with one job", func(ctx C) {
workerCfg.Mirrors = []mirrorConfig{
mirrorConfig{
Name: "job-ls",
Provider: provCommand,
Command: "ls",
},
}
dummyTester := func(*Worker) {
url := ""
jobRunning := false
lastStatus := SyncStatus(None)
for {
select {
case data := <-recvDataChan:
if reg, ok := data.(WorkerStatus); ok {
So(reg.ID, ShouldEqual, "dut")
url = reg.URL
time.Sleep(500 * time.Millisecond)
sendCommandToWorker(url, httpClient, CmdStart, "job-ls")
} else if sch, ok := data.(MirrorSchedules); ok {
if !jobRunning {
So(len(sch.Schedules), ShouldEqual, 1)
So(sch.Schedules[0].MirrorName, ShouldEqual, "job-ls")
So(sch.Schedules[0].NextSchedule,
ShouldHappenBetween,
time.Now().Add(-2*time.Second),
time.Now().Add(1*time.Minute))
}
} else if status, ok := data.(MirrorStatus); ok {
logger.Noticef("Job %s status %s", status.Name, status.Status.String())
jobRunning = status.Status == PreSyncing || status.Status == Syncing
So(status.Status, ShouldNotEqual, Failed)
lastStatus = status.Status
}
case <-time.After(2 * time.Second):
So(url, ShouldNotEqual, "")
So(jobRunning, ShouldBeFalse)
So(lastStatus, ShouldEqual, Success)
return
}
}
}
startWorkerThenStop(&workerCfg, dummyTester)
})
Convey("with several jobs", func(ctx C) {
workerCfg.Mirrors = []mirrorConfig{
mirrorConfig{
Name: "job-ls-1",
Provider: provCommand,
Command: "ls",
},
mirrorConfig{
Name: "job-fail",
Provider: provCommand,
Command: "non-existent-command-xxxx",
},
mirrorConfig{
Name: "job-ls-2",
Provider: provCommand,
Command: "ls",
},
}
dummyTester := func(*Worker) {
url := ""
lastStatus := make(map[string]SyncStatus)
nextSch := make(map[string]time.Time)
for {
select {
case data := <-recvDataChan:
if reg, ok := data.(WorkerStatus); ok {
So(reg.ID, ShouldEqual, "dut")
url = reg.URL
time.Sleep(500 * time.Millisecond)
sendCommandToWorker(url, httpClient, CmdStart, "job-fail")
sendCommandToWorker(url, httpClient, CmdStart, "job-ls-1")
sendCommandToWorker(url, httpClient, CmdStart, "job-ls-2")
} else if sch, ok := data.(MirrorSchedules); ok {
//So(len(sch.Schedules), ShouldEqual, 3)
for _, item := range sch.Schedules {
nextSch[item.MirrorName] = item.NextSchedule
}
} else if status, ok := data.(MirrorStatus); ok {
logger.Noticef("Job %s status %s", status.Name, status.Status.String())
jobRunning := status.Status == PreSyncing || status.Status == Syncing
if !jobRunning {
if status.Name == "job-fail" {
So(status.Status, ShouldEqual, Failed)
} else {
So(status.Status, ShouldNotEqual, Failed)
}
}
lastStatus[status.Name] = status.Status
}
case <-time.After(2 * time.Second):
So(len(lastStatus), ShouldEqual, 3)
So(len(nextSch), ShouldEqual, 3)
return
}
}
}
startWorkerThenStop(&workerCfg, dummyTester)
})
})
}

View File

@@ -3,6 +3,7 @@ package worker
import (
"fmt"
"os"
"os/user"
"strings"
"github.com/codeskyblue/go-sh"
@@ -10,36 +11,44 @@ import (
type zfsHook struct {
emptyHook
provider mirrorProvider
zpool string
zpool string
}
func newZfsHook(provider mirrorProvider, zpool string) *zfsHook {
return &zfsHook{
provider: provider,
zpool: zpool,
emptyHook: emptyHook{
provider: provider,
},
zpool: zpool,
}
}
// create zfs dataset for a new mirror
func (z *zfsHook) printHelpMessage() {
zfsDataset := fmt.Sprintf("%s/%s", z.zpool, z.provider.Name())
zfsDataset = strings.ToLower(zfsDataset)
workingDir := z.provider.WorkingDir()
logger.Infof("You may create the ZFS dataset with:")
logger.Infof(" zfs create '%s'", zfsDataset)
logger.Infof(" zfs set mountpoint='%s' '%s'", workingDir, zfsDataset)
usr, err := user.Current()
if err != nil || usr.Uid == "0" {
return
}
logger.Infof(" chown %s '%s'", usr.Uid, workingDir)
}
// check if working directory is a zfs dataset
func (z *zfsHook) preJob() error {
workingDir := z.provider.WorkingDir()
if _, err := os.Stat(workingDir); os.IsNotExist(err) {
// sudo zfs create $zfsDataset
// sudo zfs set mountpoint=${absPath} ${zfsDataset}
zfsDataset := fmt.Sprintf("%s/%s", z.zpool, z.provider.Name())
// Unknown issue of ZFS:
// dataset name should not contain upper case letters
zfsDataset = strings.ToLower(zfsDataset)
logger.Infof("Creating ZFS dataset %s", zfsDataset)
if err := sh.Command("sudo", "zfs", "create", zfsDataset).Run(); err != nil {
return err
}
logger.Infof("Mount ZFS dataset %s to %s", zfsDataset, workingDir)
if err := sh.Command("sudo", "zfs", "set", "mountpoint="+workingDir, zfsDataset).Run(); err != nil {
return err
}
logger.Errorf("Directory %s doesn't exist", workingDir)
z.printHelpMessage()
return err
}
if err := sh.Command("mountpoint", "-q", workingDir).Run(); err != nil {
logger.Errorf("%s is not a mount point", workingDir)
z.printHelpMessage()
return err
}
return nil
}

worker/zfs_hook_test.go Normal file (48 lines added)
View File

@@ -0,0 +1,48 @@
package worker
import (
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"
. "github.com/smartystreets/goconvey/convey"
)
func TestZFSHook(t *testing.T) {
Convey("ZFS Hook should work", t, func(ctx C) {
tmpDir, err := ioutil.TempDir("", "tunasync")
tmpFile := filepath.Join(tmpDir, "log_file")
c := cmdConfig{
name: "tuna_zfs_hook_test",
upstreamURL: "http://mirrors.tuna.moe/",
command: "ls",
workingDir: tmpDir,
logDir: tmpDir,
logFile: tmpFile,
interval: 1 * time.Second,
}
provider, err := newCmdProvider(c)
So(err, ShouldBeNil)
Convey("When working directory doesn't exist", func(ctx C) {
errRm := os.RemoveAll(tmpDir)
So(errRm, ShouldBeNil)
hook := newZfsHook(provider, "test_pool")
err := hook.preJob()
So(err, ShouldNotBeNil)
})
Convey("When working directory is not a mount point", func(ctx C) {
defer os.RemoveAll(tmpDir)
hook := newZfsHook(provider, "test_pool")
err := hook.preJob()
So(err, ShouldNotBeNil)
})
})
}