Mirrored from https://github.com/tuna/tunasync.git
Last synced: 2025-12-08 15:36:47 +00:00
Compare commits — 41 commits

| SHA1 |
|---|
| 88b7827e66 |
| 49b74ae552 |
| 37255cc827 |
| 136e01f1cd |
| cd73602988 |
| 2a8fa5636e |
| 94b9b20626 |
| 5a9c6b9020 |
| 75ee481cfa |
| 2f9e96a75a |
| aa36b96828 |
| e9ce7fc87a |
| 3fd71d777b |
| 984f8a1eb5 |
| a4d94cae07 |
| 8ebace4d9a |
| b578237df8 |
| 9f7f18c2c4 |
| fd274cc976 |
| b4b81ef7e9 |
| c8600d094e |
| 2ba3a27fa3 |
| b34238c097 |
| 16e458f354 |
| 16b4df1ec2 |
| e3c8cded6c |
| 3809df6cfb |
| 600874ae54 |
| 2afe1f2e06 |
| 1b099520b2 |
| 85b2105a2b |
| 45e5d900fb |
| 7b0cd490b7 |
| 9178966aed |
| b5d2a0ad89 |
| d8963c9946 |
| 198afa72cd |
| 85ce9c1270 |
| a8a35fc259 |
| c00eb12a75 |
| 95ae9c16a9 |
**.github/workflows/release.yml** (26 lines changed)

````diff
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -21,16 +21,12 @@ jobs:
     - name: Check out code into the Go module directory
       uses: actions/checkout@v2
 
-    - name: Get dependencies
-      run: |
-        go get -v -t -d ./cmd/tunasync
-        go get -v -t -d ./cmd/tunasynctl
-
     - name: Build
       run: |
-        make tunasync
-        make tunasynctl
-        tar -jcf build/tunasync-linux-bin.tar.bz2 -C build tunasync tunasynctl
+        for i in linux-amd64 linux-arm64; do
+          make ARCH=$i all
+          tar -cz --numeric-owner --owner root --group root -f tunasync-$i-bin.tar.gz -C build-$i tunasync tunasynctl
+        done
 
     - name: Create Release
       id: create_release
@@ -42,13 +38,9 @@ jobs:
         release_name: Release ${{ github.ref }}
         draft: false
         prerelease: false
-    - name: Upload Release Asset
-      id: upload-release-asset
-      uses: actions/upload-release-asset@v1
+    - name: Upload Release Assets
       env:
         GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-      with:
-        upload_url: ${{ steps.create_release.outputs.upload_url }} # This pulls from the CREATE RELEASE step above, referencing it's ID to get its outputs object, which include a `upload_url`. See this blog post for more info: https://jasonet.co/posts/new-features-of-github-actions/#passing-data-to-future-steps
-        asset_path: ./build/tunasync-linux-bin.tar.bz2
-        asset_name: tunasync-linux-bin.tar.bz2
-        asset_content_type: application/x-bzip2
+        TAG_NAME: ${{ github.ref }}
+      run: |
+        hub release edit $(find . -type f -name "tunasync-*.tar.gz" -printf "-a %p ") -m "" "${TAG_NAME##*/}"
````
**.github/workflows/tunasync.yml** (2 lines changed)

````diff
--- a/.github/workflows/tunasync.yml
+++ b/.github/workflows/tunasync.yml
@@ -32,7 +32,7 @@ jobs:
       uses: actions/upload-artifact@v1
       with:
         name: tunasync-bin
-        path: build/
+        path: build-linux-amd64/
 
   test:
     name: Test
````
**.gitignore** (1 line changed)

````diff
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,2 @@
 /build
+/build-*
````
**Makefile** (25 lines changed)

````diff
--- a/Makefile
+++ b/Makefile
@@ -1,19 +1,22 @@
 LDFLAGS="-X main.buildstamp=`date -u '+%s'` -X main.githash=`git rev-parse HEAD`"
+ARCH ?= linux-amd64
+ARCH_LIST = $(subst -, ,$(ARCH))
+GOOS = $(word 1, $(ARCH_LIST))
+GOARCH = $(word 2, $(ARCH_LIST))
+BUILDBIN = tunasync tunasynctl
 
-all: get tunasync tunasynctl
+all: $(BUILDBIN)
 
-get:
-	go get ./cmd/tunasync
-	go get ./cmd/tunasynctl
+build-$(ARCH):
+	mkdir -p $@
 
-build:
-	mkdir -p build
+$(BUILDBIN): % : build-$(ARCH) build-$(ARCH)/%
 
-tunasync: build
-	go build -o build/tunasync -ldflags ${LDFLAGS} github.com/tuna/tunasync/cmd/tunasync
-
-tunasynctl: build
-	go build -o build/tunasynctl -ldflags ${LDFLAGS} github.com/tuna/tunasync/cmd/tunasynctl
+$(BUILDBIN:%=build-$(ARCH)/%) : build-$(ARCH)/% : cmd/%
+	GOOS=$(GOOS) GOARCH=$(GOARCH) go get ./$<
+	GOOS=$(GOOS) GOARCH=$(GOARCH) go build -o $@ -ldflags ${LDFLAGS} github.com/tuna/tunasync/$<
 
 test:
 	go test -v -covermode=count -coverprofile=profile.cov ./...
+
+.PHONY: all test $(BUILDBIN)
````
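The Makefile keeps injecting build metadata through `LDFLAGS` (`-X main.buildstamp=…`, `-X main.githash=…`). As a rough illustration of the consuming side — a sketch, not code from this repository — a main package picks those values up like this, provided the package-level variable names match the `-X` targets exactly:

```go
// Minimal sketch: how a main package consumes the -X values injected by
// the Makefile's LDFLAGS. Variable names must match the -X targets
// (main.buildstamp, main.githash) exactly, or the defaults remain.
package main

import "fmt"

var (
	buildstamp = "unknown" // overwritten by -X main.buildstamp=<unix time>
	githash    = "unknown" // overwritten by -X main.githash=<commit hash>
)

func main() {
	fmt.Printf("built at %s, commit %s\n", buildstamp, githash)
}
```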
**README.md** (14 lines changed)

````diff
--- a/README.md
+++ b/README.md
@@ -1,5 +1,4 @@
-tunasync
-========
+# tunasync
 
 ![Build Status]
 [![Coverage Status](https://coveralls.io/repos/github/tuna/tunasync/badge.svg?branch=master)](https://coveralls.io/github/tuna/tunasync?branch=master)
@@ -12,11 +11,11 @@ tunasync
 
 ## Download
 
-Pre-built binary for Linux x86_64 is available at [Github releases](https://github.com/tuna/tunasync/releases/latest).
+Pre-built binary for Linux x86_64 and ARM64 is available at [Github releases](https://github.com/tuna/tunasync/releases/latest).
 
 ## Design
 
-```
+```text
 # Architecture
 
 - Manager: Central instance for status and job management
@@ -50,13 +49,12 @@ PreSyncing Syncing Succe
 +-----------------+
 ```
 
-
 ## Building
 
 Go version: 1.13
 
-```
-make all
+```shell
+> make all
 ```
 
-Binaries in the `build/`.
+Binaries in the `build-linux-amd64/`.
````
**cmd/tunasynctl/tunasynctl.go**

````diff
--- a/cmd/tunasynctl/tunasynctl.go
+++ b/cmd/tunasynctl/tunasynctl.go
@@ -8,6 +8,7 @@ import (
 	"os"
 	"strconv"
 	"strings"
+	"text/template"
 	"time"
 
 	"github.com/BurntSushi/toml"
@@ -160,8 +161,31 @@ func listJobs(c *cli.Context) error {
 				"of all jobs from manager server: %s", err.Error()),
 				1)
 		}
-		genericJobs = jobs
+		if statusStr := c.String("status"); statusStr != "" {
+			filteredJobs := make([]tunasync.WebMirrorStatus, 0, len(jobs))
+			var statuses []tunasync.SyncStatus
+			for _, s := range strings.Split(statusStr, ",") {
+				var status tunasync.SyncStatus
+				err = status.UnmarshalJSON([]byte("\"" + strings.TrimSpace(s) + "\""))
+				if err != nil {
+					return cli.NewExitError(
+						fmt.Sprintf("Error parsing status: %s", err.Error()),
+						1)
+				}
+				statuses = append(statuses, status)
+			}
+			for _, job := range jobs {
+				for _, s := range statuses {
+					if job.Status == s {
+						filteredJobs = append(filteredJobs, job)
+						break
+					}
+				}
+			}
+			genericJobs = filteredJobs
+		} else {
+			genericJobs = jobs
+		}
 	} else {
 		var jobs []tunasync.MirrorStatus
 		args := c.Args()
@@ -196,13 +220,46 @@ func listJobs(c *cli.Context) error {
 		genericJobs = jobs
 	}
 
-	b, err := json.MarshalIndent(genericJobs, "", "  ")
-	if err != nil {
-		return cli.NewExitError(
-			fmt.Sprintf("Error printing out information: %s", err.Error()),
-			1)
+	if format := c.String("format"); format != "" {
+		tpl := template.New("")
+		_, err := tpl.Parse(format)
+		if err != nil {
+			return cli.NewExitError(
+				fmt.Sprintf("Error parsing format template: %s", err.Error()),
+				1)
+		}
+		switch jobs := genericJobs.(type) {
+		case []tunasync.WebMirrorStatus:
+			for _, job := range jobs {
+				err = tpl.Execute(os.Stdout, job)
+				if err != nil {
+					return cli.NewExitError(
+						fmt.Sprintf("Error printing out information: %s", err.Error()),
+						1)
+				}
+				fmt.Println()
+			}
+		case []tunasync.MirrorStatus:
+			for _, job := range jobs {
+				err = tpl.Execute(os.Stdout, job)
+				if err != nil {
+					return cli.NewExitError(
+						fmt.Sprintf("Error printing out information: %s", err.Error()),
+						1)
+				}
+				fmt.Println()
+			}
+		}
+	} else {
+		b, err := json.MarshalIndent(genericJobs, "", "  ")
+		if err != nil {
+			return cli.NewExitError(
+				fmt.Sprintf("Error printing out information: %s", err.Error()),
+				1)
+		}
+		fmt.Println(string(b))
 	}
-	fmt.Println(string(b))
+
 	return nil
 }
@@ -506,6 +563,14 @@ func main() {
 					Name:  "all, a",
 					Usage: "List all jobs of all workers",
 				},
+				cli.StringFlag{
+					Name:  "status, s",
+					Usage: "Filter output based on status provided",
+				},
+				cli.StringFlag{
+					Name:  "format, f",
+					Usage: "Pretty-print containers using a Go template",
+				},
 			}...),
 			Action: initializeWrapper(listJobs),
 		},
````
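The new `--format` flag hands each job to a Go `text/template`. A minimal standalone sketch of that mechanism, using a hypothetical `Job` struct rather than tunasync's real status types:

```go
// Standalone sketch of the text/template mechanism behind the new
// --format flag; the Job struct here is illustrative, not tunasync's.
package main

import (
	"os"
	"text/template"
)

type Job struct {
	Name   string
	Status string
}

func main() {
	// Roughly what `tunasynctl list -f "{{.Name}}: {{.Status}}"` does:
	// parse the template once, then execute it per job.
	tpl := template.Must(template.New("").Parse("{{.Name}}: {{.Status}}\n"))
	jobs := []Job{{"debian", "success"}, {"pypi", "syncing"}}
	for _, job := range jobs {
		_ = tpl.Execute(os.Stdout, job) // error handling elided for brevity
	}
}
```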
**docs/zh_CN/get_started.md**

````diff
--- a/docs/zh_CN/get_started.md
+++ b/docs/zh_CN/get_started.md
@@ -1,4 +1,5 @@
 # tunasync 上手指南
+
 date: 2016-10-31 00:50:00
 
 [tunasync](https://github.com/tuna/tunasync) 是[清华大学 TUNA 镜像源](https://mirrors.tuna.tsinghua.edu.cn)目前使用的镜像方案。
@@ -7,32 +8,32 @@ date: 2016-10-31 00:50:00
 
 本例中:
 
 - 只镜像[elvish](https://elvish.io)项目
 - 禁用了https
 - 禁用了cgroup支持
 
 ## 获得tunasync
 
 ### 二进制包
 
-到 [Github Releases](https://github.com/tuna/tunasync/releases/latest) 下载 `tunasync-linux-bin.tar.gz` 即可。
+到 [Github Releases](https://github.com/tuna/tunasync/releases/latest) 下载 `tunasync-linux-amd64-bin.tar.gz` 即可。
 
 ### 自行编译
 
-```
-$ make
+```shell
+> make
 ```
 
 ## 配置
 
-```
-$ mkdir ~/tunasync_demo
-$ mkdir /tmp/tunasync
+```shell
+> mkdir ~/tunasync_demo
+> mkdir /tmp/tunasync
 ```
 
-`~/tunasync_demo/worker.conf`:
+编辑 `~/tunasync_demo/worker.conf`:
 
-```
+```conf
 [global]
 name = "test_worker"
 log_dir = "/tmp/tunasync/log/tunasync/{{.Name}}"
@@ -64,9 +65,9 @@ upstream = "rsync://rsync.elvish.io/elvish/"
 use_ipv6 = false
 ```
 
-`~/tunasync_demo/manager.conf`:
+编辑 `~/tunasync_demo/manager.conf`:
 
-```
+```conf
 debug = false
 
 [server]
@@ -83,26 +84,26 @@ ca_cert = ""
 
 ### 运行
 
-```
-$ tunasync manager --config ~/tunasync_demo/manager.conf
-$ tunasync worker --config ~/tunasync_demo/worker.conf
+```shell
+> tunasync manager --config ~/tunasync_demo/manager.conf
+> tunasync worker --config ~/tunasync_demo/worker.conf
 ```
 
-本例中,镜像的数据在`/tmp/tunasync/`
+本例中,镜像的数据在 `/tmp/tunasync/`。
 
 ### 控制
 
 查看同步状态
 
-```
-$ tunasynctl list -p 12345 --all
+```shell
+> tunasynctl list -p 12345 --all
 ```
 
 tunasynctl 也支持配置文件。配置文件可以放在 `/etc/tunasync/ctl.conf` 或者 `~/.config/tunasync/ctl.conf` 两个位置,后者可以覆盖前者的配置值。
 
 配置文件内容为:
 
-```
+```conf
 manager_addr = "127.0.0.1"
 manager_port = 12345
 ca_cert = ""
@@ -118,13 +119,13 @@ worker 和 manager 之间用 http(s) 通信,如果你 worker 和 manager 都
 
 可以参看
 
-```
-$ tunasync manager --help
-$ tunasync worker --help
+```shell
+> tunasync manager --help
+> tunasync worker --help
 ```
 
 可以看一下 log 目录
 
-一些 worker 配置文件示例 [workers.conf](workers.conf)
+一些 worker 配置文件示例 [workers.conf](workers.conf)。
 
-你可能会用到的操作 [tips.md](tips.md)
+你可能会用到的操作 [tips.md](tips.md)。
````
**docs/zh_CN/workers.conf**

````diff
--- a/docs/zh_CN/workers.conf
+++ b/docs/zh_CN/workers.conf
@@ -7,6 +7,11 @@ mirror_dir = "/srv/tunasync"
 concurrent = 10
 interval = 1
 
+# ensure the exec user be add into `docker` group
+[docker]
+# in `command provider` can use docker_image and docker_volumes
+enable = true
+
 [manager]
 api_base = "http://localhost:12345"
 token = "some_token"
@@ -486,7 +491,7 @@ name = "pypi"
 provider = "command"
 upstream = "https://pypi.python.org/"
 command = "/home/scripts/pypi.sh"
-docker_image = "tunathu/tunasync-scripts:latest"
+docker_image = "tunathu/bandersnatch:latest"
 interval = 5
 
 [[mirrors]]
````
**internal/msg.go**

````diff
--- a/internal/msg.go
+++ b/internal/msg.go
@@ -1,6 +1,8 @@
 package internal
 
 import (
+	"bytes"
+	"encoding/json"
 	"fmt"
 	"time"
 )
@@ -24,10 +26,11 @@ type MirrorStatus struct {
 // A WorkerStatus is the information struct that describe
 // a worker, and sent from the manager to clients.
 type WorkerStatus struct {
 	ID         string    `json:"id"`
 	URL        string    `json:"url"`         // worker url
 	Token      string    `json:"token"`       // session token
 	LastOnline time.Time `json:"last_online"` // last seen
+	LastRegister time.Time `json:"last_register"` // last register time
 }
 
 type MirrorSchedules struct {
@@ -59,21 +62,45 @@ const (
 )
 
 func (c CmdVerb) String() string {
-	switch c {
-	case CmdStart:
-		return "start"
-	case CmdStop:
-		return "stop"
-	case CmdDisable:
-		return "disable"
-	case CmdRestart:
-		return "restart"
-	case CmdPing:
-		return "ping"
-	case CmdReload:
-		return "reload"
+	mapping := map[CmdVerb]string{
+		CmdStart:   "start",
+		CmdStop:    "stop",
+		CmdDisable: "disable",
+		CmdRestart: "restart",
+		CmdPing:    "ping",
+		CmdReload:  "reload",
 	}
-	return "unknown"
+	return mapping[c]
+}
+
+func NewCmdVerbFromString(s string) CmdVerb {
+	mapping := map[string]CmdVerb{
+		"start":   CmdStart,
+		"stop":    CmdStop,
+		"disable": CmdDisable,
+		"restart": CmdRestart,
+		"ping":    CmdPing,
+		"reload":  CmdReload,
+	}
+	return mapping[s]
+}
+
+// Marshal and Unmarshal for CmdVerb
+func (s CmdVerb) MarshalJSON() ([]byte, error) {
+	buffer := bytes.NewBufferString(`"`)
+	buffer.WriteString(s.String())
+	buffer.WriteString(`"`)
+	return buffer.Bytes(), nil
+}
+
+func (s *CmdVerb) UnmarshalJSON(b []byte) error {
+	var j string
+	err := json.Unmarshal(b, &j)
+	if err != nil {
+		return err
+	}
+	*s = NewCmdVerbFromString(j)
+	return nil
 }
 
 // A WorkerCmd is the command message send from the
````
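The added `MarshalJSON`/`UnmarshalJSON` pair serializes a `CmdVerb` as its string form instead of a bare integer. A reduced sketch of the same map-backed round trip, with only two verbs:

```go
// Reduced sketch of the string-keyed enum (de)serialization added for
// CmdVerb; only two verbs are reproduced here.
package main

import (
	"encoding/json"
	"fmt"
)

type CmdVerb uint8

const (
	CmdStart CmdVerb = iota
	CmdStop
)

func (c CmdVerb) String() string {
	return map[CmdVerb]string{CmdStart: "start", CmdStop: "stop"}[c]
}

// MarshalJSON emits the verb as a quoted string, e.g. "stop".
func (c CmdVerb) MarshalJSON() ([]byte, error) {
	return json.Marshal(c.String())
}

// UnmarshalJSON maps the quoted string back to the numeric verb.
func (c *CmdVerb) UnmarshalJSON(b []byte) error {
	var s string
	if err := json.Unmarshal(b, &s); err != nil {
		return err
	}
	*c = map[string]CmdVerb{"start": CmdStart, "stop": CmdStop}[s]
	return nil
}

func main() {
	b, _ := json.Marshal(CmdStop)
	var v CmdVerb
	_ = json.Unmarshal(b, &v)
	fmt.Println(string(b), v == CmdStop) // "stop" true
}
```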
**internal/version.go**

````diff
--- a/internal/version.go
+++ b/internal/version.go
@@ -1,4 +1,4 @@
 package internal
 
 // Version of the program
-const Version string = "0.6.3"
+const Version string = "0.6.9"
````
**manager/db.go**

````diff
--- a/manager/db.go
+++ b/manager/db.go
@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"strings"
+	"time"
 
 	"github.com/boltdb/bolt"
@@ -16,6 +17,7 @@ type dbAdapter interface {
 	GetWorker(workerID string) (WorkerStatus, error)
 	DeleteWorker(workerID string) error
 	CreateWorker(w WorkerStatus) (WorkerStatus, error)
+	RefreshWorker(workerID string) (WorkerStatus, error)
 	UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error)
 	GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error)
 	ListMirrorStatus(workerID string) ([]MirrorStatus, error)
@@ -26,7 +28,9 @@ type dbAdapter interface {
 func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) {
 	if dbType == "bolt" {
-		innerDB, err := bolt.Open(dbFile, 0600, nil)
+		innerDB, err := bolt.Open(dbFile, 0600, &bolt.Options{
+			Timeout: 5 * time.Second,
+		})
 		if err != nil {
 			return nil, err
 		}
@@ -122,6 +126,15 @@ func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
 	return w, err
 }
 
+func (b *boltAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) {
+	w, err = b.GetWorker(workerID)
+	if err == nil {
+		w.LastOnline = time.Now()
+		w, err = b.CreateWorker(w)
+	}
+	return w, err
+}
+
 func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
 	id := mirrorID + "/" + workerID
 	err := b.db.Update(func(tx *bolt.Tx) error {
````
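Passing `bolt.Options` with a `Timeout` makes `bolt.Open` fail fast instead of blocking forever on the file lock when another process already holds the database. A minimal sketch of the same call:

```go
// Sketch: why the diff passes bolt.Options with a Timeout. Without it,
// bolt.Open blocks indefinitely while another process holds the flock;
// with it, Open returns an error (bolt.ErrTimeout) after the deadline.
package main

import (
	"log"
	"time"

	"github.com/boltdb/bolt"
)

func main() {
	db, err := bolt.Open("manager.db", 0600, &bolt.Options{
		Timeout: 5 * time.Second, // give up instead of hanging on the lock
	})
	if err != nil {
		log.Fatalf("open bolt db: %v", err)
	}
	defer db.Close()
}
```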
**manager/db_test.go**

````diff
--- a/manager/db_test.go
+++ b/manager/db_test.go
@@ -35,6 +35,7 @@ func TestBoltAdapter(t *testing.T) {
 			ID:         id,
 			Token:      "token_" + id,
 			LastOnline: time.Now(),
+			LastRegister: time.Now(),
 		}
 		w, err = boltDB.CreateWorker(w)
 		So(err, ShouldBeNil)
````
**manager/server.go**

````diff
--- a/manager/server.go
+++ b/manager/server.go
@@ -4,6 +4,7 @@ import (
 	"errors"
 	"fmt"
 	"net/http"
+	"sync"
 	"time"
 
 	"github.com/gin-gonic/gin"
@@ -23,6 +24,7 @@ type Manager struct {
 	cfg        *Config
 	engine     *gin.Engine
 	adapter    dbAdapter
+	rwmu       sync.RWMutex
 	httpClient *http.Client
 }
@@ -127,9 +129,11 @@ func (s *Manager) Run() {
 		}
 	}
 
-// listAllJobs repond with all jobs of specified workers
+// listAllJobs respond with all jobs of specified workers
 func (s *Manager) listAllJobs(c *gin.Context) {
+	s.rwmu.RLock()
 	mirrorStatusList, err := s.adapter.ListAllMirrorStatus()
+	s.rwmu.RUnlock()
 	if err != nil {
 		err := fmt.Errorf("failed to list all mirror status: %s",
 			err.Error(),
@@ -150,7 +154,9 @@ func (s *Manager) listAllJobs(c *gin.Context) {
 
 // flushDisabledJobs deletes all jobs that marks as deleted
 func (s *Manager) flushDisabledJobs(c *gin.Context) {
+	s.rwmu.Lock()
 	err := s.adapter.FlushDisabledJobs()
+	s.rwmu.Unlock()
 	if err != nil {
 		err := fmt.Errorf("failed to flush disabled jobs: %s",
 			err.Error(),
@@ -165,7 +171,9 @@ func (s *Manager) flushDisabledJobs(c *gin.Context) {
 // deleteWorker deletes one worker by id
 func (s *Manager) deleteWorker(c *gin.Context) {
 	workerID := c.Param("id")
+	s.rwmu.Lock()
 	err := s.adapter.DeleteWorker(workerID)
+	s.rwmu.Unlock()
 	if err != nil {
 		err := fmt.Errorf("failed to delete worker: %s",
 			err.Error(),
@@ -178,10 +186,12 @@ func (s *Manager) deleteWorker(c *gin.Context) {
 	c.JSON(http.StatusOK, gin.H{_infoKey: "deleted"})
 }
 
-// listWrokers respond with informations of all the workers
+// listWorkers respond with information of all the workers
 func (s *Manager) listWorkers(c *gin.Context) {
 	var workerInfos []WorkerStatus
+	s.rwmu.RLock()
 	workers, err := s.adapter.ListWorkers()
+	s.rwmu.RUnlock()
 	if err != nil {
 		err := fmt.Errorf("failed to list workers: %s",
 			err.Error(),
@@ -193,8 +203,11 @@ func (s *Manager) listWorkers(c *gin.Context) {
 	for _, w := range workers {
 		workerInfos = append(workerInfos,
 			WorkerStatus{
 				ID:           w.ID,
-				LastOnline:   w.LastOnline,
+				URL:          w.URL,
+				Token:        "REDACTED",
+				LastOnline:   w.LastOnline,
+				LastRegister: w.LastRegister,
 			})
 	}
 	c.JSON(http.StatusOK, workerInfos)
@@ -205,6 +218,7 @@ func (s *Manager) registerWorker(c *gin.Context) {
 	var _worker WorkerStatus
 	c.BindJSON(&_worker)
 	_worker.LastOnline = time.Now()
+	_worker.LastRegister = time.Now()
 	newWorker, err := s.adapter.CreateWorker(_worker)
 	if err != nil {
 		err := fmt.Errorf("failed to register worker: %s",
@@ -223,7 +237,9 @@ func (s *Manager) registerWorker(c *gin.Context) {
 // listJobsOfWorker respond with all the jobs of the specified worker
 func (s *Manager) listJobsOfWorker(c *gin.Context) {
 	workerID := c.Param("id")
+	s.rwmu.RLock()
 	mirrorStatusList, err := s.adapter.ListMirrorStatus(workerID)
+	s.rwmu.RUnlock()
 	if err != nil {
 		err := fmt.Errorf("failed to list jobs of worker %s: %s",
 			workerID, err.Error(),
@@ -255,7 +271,10 @@ func (s *Manager) updateSchedulesOfWorker(c *gin.Context) {
 		)
 	}
 
+	s.rwmu.RLock()
+	s.adapter.RefreshWorker(workerID)
 	curStatus, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
+	s.rwmu.RUnlock()
 	if err != nil {
 		fmt.Errorf("failed to get job %s of worker %s: %s",
 			mirrorName, workerID, err.Error(),
@@ -269,7 +288,9 @@ func (s *Manager) updateSchedulesOfWorker(c *gin.Context) {
 	}
 
 	curStatus.Scheduled = schedule.NextSchedule
+	s.rwmu.Lock()
 	_, err = s.adapter.UpdateMirrorStatus(workerID, mirrorName, curStatus)
+	s.rwmu.Unlock()
 	if err != nil {
 		err := fmt.Errorf("failed to update job %s of worker %s: %s",
 			mirrorName, workerID, err.Error(),
@@ -295,7 +316,10 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) {
 		)
 	}
 
+	s.rwmu.RLock()
+	s.adapter.RefreshWorker(workerID)
 	curStatus, _ := s.adapter.GetMirrorStatus(workerID, mirrorName)
+	s.rwmu.RUnlock()
 
 	curTime := time.Now()
 
@@ -331,7 +355,9 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) {
 		logger.Noticef("Job [%s] @<%s> %s", status.Name, status.Worker, status.Status)
 	}
 
+	s.rwmu.Lock()
 	newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
+	s.rwmu.Unlock()
 	if err != nil {
 		err := fmt.Errorf("failed to update job %s of worker %s: %s",
 			mirrorName, workerID, err.Error(),
@@ -353,7 +379,10 @@ func (s *Manager) updateMirrorSize(c *gin.Context) {
 	c.BindJSON(&msg)
 
 	mirrorName := msg.Name
+	s.rwmu.RLock()
+	s.adapter.RefreshWorker(workerID)
 	status, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
+	s.rwmu.RUnlock()
 	if err != nil {
 		logger.Errorf(
 			"Failed to get status of mirror %s @<%s>: %s",
@@ -370,7 +399,9 @@ func (s *Manager) updateMirrorSize(c *gin.Context) {
 
 	logger.Noticef("Mirror size of [%s] @<%s>: %s", status.Name, status.Worker, status.Size)
 
+	s.rwmu.Lock()
 	newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
+	s.rwmu.Unlock()
 	if err != nil {
 		err := fmt.Errorf("failed to update job %s of worker %s: %s",
 			mirrorName, workerID, err.Error(),
@@ -393,7 +424,9 @@ func (s *Manager) handleClientCmd(c *gin.Context) {
 		return
 	}
 
+	s.rwmu.RLock()
 	w, err := s.adapter.GetWorker(workerID)
+	s.rwmu.RUnlock()
 	if err != nil {
 		err := fmt.Errorf("worker %s is not registered yet", workerID)
 		s.returnErrJSON(c, http.StatusBadRequest, err)
@@ -410,7 +443,9 @@ func (s *Manager) handleClientCmd(c *gin.Context) {
 
 	// update job status, even if the job did not disable successfully,
 	// this status should be set as disabled
+	s.rwmu.RLock()
 	curStat, _ := s.adapter.GetMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID)
+	s.rwmu.RUnlock()
 	changed := false
 	switch clientCmd.Cmd {
 	case CmdDisable:
@@ -421,7 +456,9 @@ func (s *Manager) handleClientCmd(c *gin.Context) {
 		changed = true
 	}
 	if changed {
+		s.rwmu.Lock()
 		s.adapter.UpdateMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID, curStat)
+		s.rwmu.Unlock()
 	}
 
 	logger.Noticef("Posting command '%s %s' to <%s>", clientCmd.Cmd, clientCmd.MirrorID, clientCmd.WorkerID)
````
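Every adapter call in server.go is now bracketed by a `sync.RWMutex`: reads take the shared `RLock`, writes the exclusive `Lock`. The same discipline on a stand-in store type (a sketch, not the real `dbAdapter`):

```go
// Minimal sketch of the locking discipline the diff adds around the db
// adapter: concurrent readers share RLock, writers take the exclusive Lock.
package main

import (
	"fmt"
	"sync"
)

type store struct {
	rwmu sync.RWMutex
	data map[string]string
}

// get runs under the shared read lock; many gets may proceed in parallel.
func (s *store) get(k string) string {
	s.rwmu.RLock()
	defer s.rwmu.RUnlock()
	return s.data[k]
}

// set runs under the exclusive write lock, blocking all readers briefly.
func (s *store) set(k, v string) {
	s.rwmu.Lock()
	defer s.rwmu.Unlock()
	s.data[k] = v
}

func main() {
	s := &store{data: map[string]string{}}
	s.set("worker1", "online")
	fmt.Println(s.get("worker1"))
}
```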
**manager/server_test.go**

````diff
--- a/manager/server_test.go
+++ b/manager/server_test.go
@@ -7,6 +7,7 @@ import (
 	"math/rand"
 	"net/http"
 	"strings"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -64,6 +65,34 @@ func TestHTTPServer(t *testing.T) {
 			So(msg[_errorKey], ShouldEqual, fmt.Sprintf("failed to list jobs of worker %s: %s", _magicBadWorkerID, "database fail"))
 		})
 
+		Convey("when register multiple workers", func(ctx C) {
+			N := 10
+			var cnt uint32
+			for i := 0; i < N; i++ {
+				go func(id int) {
+					w := WorkerStatus{
+						ID: fmt.Sprintf("worker%d", id),
+					}
+					resp, err := PostJSON(baseURL+"/workers", w, nil)
+					ctx.So(err, ShouldBeNil)
+					ctx.So(resp.StatusCode, ShouldEqual, http.StatusOK)
+					atomic.AddUint32(&cnt, 1)
+				}(i)
+			}
+			time.Sleep(2 * time.Second)
+			So(cnt, ShouldEqual, N)
+
+			Convey("list all workers", func(ctx C) {
+				resp, err := http.Get(baseURL + "/workers")
+				So(err, ShouldBeNil)
+				defer resp.Body.Close()
+				var actualResponseObj []WorkerStatus
+				err = json.NewDecoder(resp.Body).Decode(&actualResponseObj)
+				So(err, ShouldBeNil)
+				So(len(actualResponseObj), ShouldEqual, N+1)
+			})
+		})
+
 		Convey("when register a worker", func(ctx C) {
 			w := WorkerStatus{
 				ID: "test_worker1",
@@ -433,6 +462,15 @@ func (b *mockDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
 	return w, nil
 }
 
+func (b *mockDBAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) {
+	w, err = b.GetWorker(workerID)
+	if err == nil {
+		w.LastOnline = time.Now()
+		w, err = b.CreateWorker(w)
+	}
+	return w, err
+}
+
 func (b *mockDBAdapter) GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error) {
 	id := mirrorID + "/" + workerID
 	status, ok := b.statusStore[id]
````
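The new multi-worker test counts successful registrations with `sync/atomic` from concurrent goroutines. A self-contained sketch of that pattern (using a `WaitGroup` where the test above simply sleeps):

```go
// Sketch of the sync/atomic counting pattern used by the new test:
// goroutines bump a shared counter without a mutex.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var cnt uint32
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddUint32(&cnt, 1) // safe concurrent increment
		}()
	}
	wg.Wait()
	fmt.Println(atomic.LoadUint32(&cnt)) // 10
}
```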
**worker/btrfs_snapshot_hook.go**

````diff
--- a/worker/btrfs_snapshot_hook.go
+++ b/worker/btrfs_snapshot_hook.go
@@ -1,3 +1,5 @@
+// +build linux
+
 package worker
 
 import (
````
**worker/btrfs_snapshot_hook_nolinux.go** (new file)

````diff
--- /dev/null
+++ b/worker/btrfs_snapshot_hook_nolinux.go
@@ -0,0 +1,30 @@
+// +build !linux
+
+package worker
+
+type btrfsSnapshotHook struct {
+}
+
+func newBtrfsSnapshotHook(provider mirrorProvider, snapshotPath string, mirror mirrorConfig) *btrfsSnapshotHook {
+	return &btrfsSnapshotHook{}
+}
+
+func (h *btrfsSnapshotHook) postExec() error {
+	return nil
+}
+
+func (h *btrfsSnapshotHook) postFail() error {
+	return nil
+}
+
+func (h *btrfsSnapshotHook) postSuccess() error {
+	return nil
+}
+
+func (h *btrfsSnapshotHook) preExec() error {
+	return nil
+}
+
+func (h *btrfsSnapshotHook) preJob() error {
+	return nil
+}
````
**worker/common.go**

````diff
--- a/worker/common.go
+++ b/worker/common.go
@@ -1,6 +1,6 @@
 package worker
 
-// put global viables and types here
+// put global variables and types here
 
 import (
 	"gopkg.in/op/go-logging.v1"
````
**worker/config.go**

````diff
--- a/worker/config.go
+++ b/worker/config.go
@@ -142,6 +142,8 @@ type mirrorConfig struct {
 	ExcludeFile   string   `toml:"exclude_file"`
 	Username      string   `toml:"username"`
 	Password      string   `toml:"password"`
+	RsyncNoTimeo  bool     `toml:"rsync_no_timeout"`
+	RsyncTimeout  int      `toml:"rsync_timeout"`
 	RsyncOptions  []string `toml:"rsync_options"`
 	RsyncOverride []string `toml:"rsync_override"`
 	Stage1Profile string   `toml:"stage1_profile"`
````
**worker/docker.go**

````diff
--- a/worker/docker.go
+++ b/worker/docker.go
@@ -3,6 +3,9 @@ package worker
 import (
 	"fmt"
 	"os"
+	"time"
+
+	"github.com/codeskyblue/go-sh"
 )
 
 type dockerHook struct {
@@ -16,6 +19,10 @@ func newDockerHook(p mirrorProvider, gCfg dockerConfig, mCfg mirrorConfig) *dock
 	volumes := []string{}
 	volumes = append(volumes, gCfg.Volumes...)
 	volumes = append(volumes, mCfg.DockerVolumes...)
+	if len(mCfg.ExcludeFile) > 0 {
+		arg := fmt.Sprintf("%s:%s:ro", mCfg.ExcludeFile, mCfg.ExcludeFile)
+		volumes = append(volumes, arg)
+	}
 
 	options := []string{}
 	options = append(options, gCfg.Options...)
@@ -60,6 +67,27 @@ func (d *dockerHook) postExec() error {
 	// sh.Command(
 	// 	"docker", "rm", "-f", d.Name(),
 	// ).Run()
+	name := d.Name()
+	retry := 10
+	for ; retry > 0; retry-- {
+		out, err := sh.Command(
+			"docker", "ps", "-a",
+			"--filter", "name=^"+name+"$",
+			"--format", "{{.Status}}",
+		).Output()
+		if err != nil {
+			logger.Errorf("docker ps failed: %v", err)
+			break
+		}
+		if len(out) == 0 {
+			break
+		}
+		logger.Debugf("container %s still exists: '%s'", name, string(out))
+		time.Sleep(1 * time.Second)
+	}
+	if retry == 0 {
+		logger.Warningf("container %s not removed automatically, next sync may fail", name)
+	}
 	d.provider.ExitContext()
 	return nil
 }
````
**worker/docker_test.go**

````diff
--- a/worker/docker_test.go
+++ b/worker/docker_test.go
@@ -94,22 +94,27 @@ sleep 20
 			}
 			exitedErr <- err
 		}()
-		cmdRun("ps", []string{"aux"})
 
 		// Wait for docker running
-		time.Sleep(8 * time.Second)
-		cmdRun("ps", []string{"aux"})
+		for wait := 0; wait < 8; wait++ {
+			names, err := getDockerByName(d.Name())
+			So(err, ShouldBeNil)
+			if names != "" {
+				break
+			}
+			time.Sleep(1 * time.Second)
+		}
+		// cmdRun("ps", []string{"aux"})
 
 		// assert container running
 		names, err := getDockerByName(d.Name())
 		So(err, ShouldBeNil)
-		// So(names, ShouldEqual, d.Name()+"\n")
+		So(names, ShouldEqual, d.Name()+"\n")
 
 		err = provider.Terminate()
-		// So(err, ShouldBeNil)
+		So(err, ShouldBeNil)
 
-		cmdRun("ps", []string{"aux"})
+		// cmdRun("ps", []string{"aux"})
 		<-exitedErr
 
 		// container should be terminated and removed
````
**worker/job.go**

````diff
--- a/worker/job.go
+++ b/worker/job.go
@@ -180,7 +180,6 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
 			logger.Debug("syncing done")
 		case <-time.After(timeout):
 			logger.Notice("provider timeout")
-			stopASAP = true
 			termErr = provider.Terminate()
 			syncErr = fmt.Errorf("%s timeout after %v", m.Name(), timeout)
 		case <-kill:
@@ -190,7 +189,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
 			syncErr = errors.New("killed by manager")
 		}
 		if termErr != nil {
-			logger.Errorf("failed to terminate provider %s: %s", m.Name(), err.Error())
+			logger.Errorf("failed to terminate provider %s: %s", m.Name(), termErr.Error())
 			return termErr
 		}
````
**worker/job_test.go**

````diff
--- a/worker/job_test.go
+++ b/worker/job_test.go
@@ -335,7 +335,6 @@ echo $TUNASYNC_WORKING_DIR
 			})
 		})
 
-
 		Convey("When a job timed out", func(ctx C) {
 			scriptContent := `#!/bin/bash
 echo $TUNASYNC_WORKING_DIR
@@ -371,6 +370,30 @@ echo $TUNASYNC_WORKING_DIR
 				job.ctrlChan <- jobDisable
 				<-job.disabled
 			})
+
+			Convey("It should be retried", func(ctx C) {
+				go job.Run(managerChan, semaphore)
+				job.ctrlChan <- jobStart
+				time.Sleep(1 * time.Second)
+				msg := <-managerChan
+				So(msg.status, ShouldEqual, PreSyncing)
+
+				for i := 0; i < defaultMaxRetry; i++ {
+					msg = <-managerChan
+					So(msg.status, ShouldEqual, Syncing)
+
+					job.ctrlChan <- jobStart // should be ignored
+
+					msg = <-managerChan
+					So(msg.status, ShouldEqual, Failed)
+					So(msg.msg, ShouldContainSubstring, "timeout after")
+					// re-schedule after last try
+					So(msg.schedule, ShouldEqual, i == defaultMaxRetry-1)
+				}
+
+				job.ctrlChan <- jobDisable
+				<-job.disabled
+			})
 		})
 	})
````
**worker/provider.go**

````diff
--- a/worker/provider.go
+++ b/worker/provider.go
@@ -140,6 +140,8 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
 			password:          mirror.Password,
 			excludeFile:       mirror.ExcludeFile,
 			extraOptions:      mirror.RsyncOptions,
+			rsyncNeverTimeout: mirror.RsyncNoTimeo,
+			rsyncTimeoutValue: mirror.RsyncTimeout,
 			overriddenOptions: mirror.RsyncOverride,
 			rsyncEnv:          mirror.Env,
 			workingDir:        mirrorDir,
@@ -159,22 +161,24 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
 		provider = p
 	case provTwoStageRsync:
 		rc := twoStageRsyncConfig{
 			name:              mirror.Name,
 			stage1Profile:     mirror.Stage1Profile,
 			upstreamURL:       mirror.Upstream,
 			rsyncCmd:          mirror.Command,
 			username:          mirror.Username,
 			password:          mirror.Password,
 			excludeFile:       mirror.ExcludeFile,
 			extraOptions:      mirror.RsyncOptions,
+			rsyncNeverTimeout: mirror.RsyncNoTimeo,
+			rsyncTimeoutValue: mirror.RsyncTimeout,
 			rsyncEnv:          mirror.Env,
 			workingDir:        mirrorDir,
 			logDir:            logDir,
 			logFile:           filepath.Join(logDir, "latest.log"),
 			useIPv6:           mirror.UseIPv6,
 			interval:          time.Duration(mirror.Interval) * time.Minute,
 			retry:             mirror.Retry,
 			timeout:           time.Duration(mirror.Timeout) * time.Second,
 		}
 		p, err := newTwoStageRsyncProvider(rc)
 		if err != nil {
````
**worker/provider_test.go**

````diff
--- a/worker/provider_test.go
+++ b/worker/provider_test.go
@@ -91,7 +91,7 @@ exit 0
 			"Done\n",
 			targetDir,
 			fmt.Sprintf(
-				"-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
+				"-aHvh --no-o --no-g --stats --filter risk .~tmp~/ --exclude .~tmp~/ "+
 					"--delete --delete-after --delay-updates --safe-links "+
 					"--timeout=120 -6 %s %s",
 				provider.upstreamURL, provider.WorkingDir(),
@@ -148,18 +148,19 @@ func TestRsyncProviderWithAuthentication(t *testing.T) {
 		proxyAddr := "127.0.0.1:1233"
 
 		c := rsyncConfig{
 			name:              "tuna",
 			upstreamURL:       "rsync://rsync.tuna.moe/tuna/",
 			rsyncCmd:          scriptFile,
 			username:          "tunasync",
 			password:          "tunasyncpassword",
 			workingDir:        tmpDir,
 			extraOptions:      []string{"--delete-excluded"},
+			rsyncTimeoutValue: 30,
 			rsyncEnv:          map[string]string{"RSYNC_PROXY": proxyAddr},
 			logDir:            tmpDir,
 			logFile:           tmpFile,
 			useIPv4:           true,
 			interval:          600 * time.Second,
 		}
 
 		provider, err := newRsyncProvider(c)
@@ -189,9 +190,9 @@ exit 0
 			"Done\n",
 			targetDir,
 			fmt.Sprintf(
-				"%s %s %s -aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
+				"%s %s %s -aHvh --no-o --no-g --stats --filter risk .~tmp~/ --exclude .~tmp~/ "+
 					"--delete --delete-after --delay-updates --safe-links "+
-					"--timeout=120 -4 --delete-excluded %s %s",
+					"--timeout=30 -4 --delete-excluded %s %s",
 				provider.username, provider.password, proxyAddr,
 				provider.upstreamURL, provider.WorkingDir(),
 			),
@@ -221,6 +222,7 @@ func TestRsyncProviderWithOverriddenOptions(t *testing.T) {
 			upstreamURL:       "rsync://rsync.tuna.moe/tuna/",
 			rsyncCmd:          scriptFile,
 			workingDir:        tmpDir,
+			rsyncNeverTimeout: true,
 			overriddenOptions: []string{"-aHvh", "--no-o", "--no-g", "--stats"},
 			extraOptions:      []string{"--delete-excluded"},
 			logDir:            tmpDir,
@@ -270,6 +272,78 @@ exit 0
 	})
 }
 
+func TestRsyncProviderWithDocker(t *testing.T) {
+	Convey("Rsync in Docker should work", t, func() {
+		tmpDir, err := ioutil.TempDir("", "tunasync")
+		defer os.RemoveAll(tmpDir)
+		So(err, ShouldBeNil)
+		scriptFile := filepath.Join(tmpDir, "myrsync")
+		excludeFile := filepath.Join(tmpDir, "exclude.txt")
+
+		g := &Config{
+			Global: globalConfig{
+				Retry: 2,
+			},
+			Docker: dockerConfig{
+				Enable: true,
+				Volumes: []string{
+					scriptFile + ":/bin/myrsync",
+					"/etc/gai.conf:/etc/gai.conf:ro",
+				},
+			},
+		}
+		c := mirrorConfig{
+			Name:        "tuna",
+			Provider:    provRsync,
+			Upstream:    "rsync://rsync.tuna.moe/tuna/",
+			Command:     "/bin/myrsync",
+			ExcludeFile: excludeFile,
+			DockerImage: "alpine:3.8",
+			LogDir:      tmpDir,
+			MirrorDir:   tmpDir,
+			UseIPv6:     true,
+			Timeout:     100,
+			Interval:    600,
+		}
+
+		provider := newMirrorProvider(c, g)
+
+		So(provider.Type(), ShouldEqual, provRsync)
+		So(provider.Name(), ShouldEqual, c.Name)
+		So(provider.WorkingDir(), ShouldEqual, c.MirrorDir)
+		So(provider.LogDir(), ShouldEqual, c.LogDir)
+
+		cmdScriptContent := `#!/bin/sh
+#echo "$@"
+while [[ $# -gt 0 ]]; do
+if [[ "$1" = "--exclude-from" ]]; then
+cat "$2"
+shift
+fi
+shift
+done
+`
+		err = ioutil.WriteFile(scriptFile, []byte(cmdScriptContent), 0755)
+		So(err, ShouldBeNil)
+		err = ioutil.WriteFile(excludeFile, []byte("__some_pattern"), 0755)
+		So(err, ShouldBeNil)
+
+		for _, hook := range provider.Hooks() {
+			err = hook.preExec()
+			So(err, ShouldBeNil)
+		}
+		err = provider.Run(make(chan empty, 1))
+		So(err, ShouldBeNil)
+		for _, hook := range provider.Hooks() {
+			err = hook.postExec()
+			So(err, ShouldBeNil)
+		}
+		loggedContent, err := ioutil.ReadFile(provider.LogFile())
+		So(err, ShouldBeNil)
+		So(string(loggedContent), ShouldEqual, "__some_pattern")
+	})
+}
+
 func TestCmdProvider(t *testing.T) {
 	Convey("Command Provider should work", t, func(ctx C) {
 		tmpDir, err := ioutil.TempDir("", "tunasync")
@@ -490,18 +564,19 @@ func TestTwoStageRsyncProvider(t *testing.T) {
 		tmpFile := filepath.Join(tmpDir, "log_file")
 
 		c := twoStageRsyncConfig{
 			name:              "tuna-two-stage-rsync",
 			upstreamURL:       "rsync://mirrors.tuna.moe/",
 			stage1Profile:     "debian",
 			rsyncCmd:          scriptFile,
 			workingDir:        tmpDir,
 			logDir:            tmpDir,
 			logFile:           tmpFile,
 			useIPv6:           true,
 			excludeFile:       tmpFile,
+			rsyncTimeoutValue: 30,
 			extraOptions:      []string{"--delete-excluded", "--cache"},
 			username:          "hello",
 			password:          "world",
 		}
 
 		provider, err := newTwoStageRsyncProvider(c)
@@ -538,16 +613,16 @@ exit 0
 			"Done\n",
 			targetDir,
 			fmt.Sprintf(
-				"-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+
-					"--timeout=120 --exclude dists/ -6 "+
+				"-aHvh --no-o --no-g --stats --filter risk .~tmp~/ --exclude .~tmp~/ --safe-links "+
+					"--include=*.diff/ --exclude=*.diff/Index --exclude=Packages* --exclude=Sources* --exclude=Release* --exclude=InRelease --include=i18n/by-hash --exclude=i18n/* --exclude=ls-lR* --timeout=30 -6 "+
 					"--exclude-from %s %s %s",
 				provider.excludeFile, provider.upstreamURL, provider.WorkingDir(),
 			),
 			targetDir,
 			fmt.Sprintf(
-				"-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
+				"-aHvh --no-o --no-g --stats --filter risk .~tmp~/ --exclude .~tmp~/ "+
 					"--delete --delete-after --delay-updates --safe-links "+
-					"--timeout=120 --delete-excluded --cache -6 --exclude-from %s %s %s",
+					"--delete-excluded --cache --timeout=30 -6 --exclude-from %s %s %s",
 				provider.excludeFile, provider.upstreamURL, provider.WorkingDir(),
 			),
 		)
@@ -580,8 +655,8 @@ exit 0
 		So(err, ShouldBeNil)
 
 		expectedOutput := fmt.Sprintf(
-			"-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+
-				"--timeout=120 --exclude dists/ -6 "+
+			"-aHvh --no-o --no-g --stats --filter risk .~tmp~/ --exclude .~tmp~/ --safe-links "+
+				"--include=*.diff/ --exclude=*.diff/Index --exclude=Packages* --exclude=Sources* --exclude=Release* --exclude=InRelease --include=i18n/by-hash --exclude=i18n/* --exclude=ls-lR* --timeout=30 -6 "+
 				"--exclude-from %s %s %s\n",
 			provider.excludeFile, provider.upstreamURL, provider.WorkingDir(),
 		)
````
**worker/rsync_provider.go**

````diff
--- a/worker/rsync_provider.go
+++ b/worker/rsync_provider.go
@@ -2,6 +2,7 @@ package worker
 
 import (
 	"errors"
+	"fmt"
 	"strings"
 	"time"
@@ -14,6 +15,8 @@ type rsyncConfig struct {
 	upstreamURL, username, password, excludeFile string
 	extraOptions      []string
 	overriddenOptions []string
+	rsyncNeverTimeout bool
+	rsyncTimeoutValue int
 	rsyncEnv          map[string]string
 	workingDir, logDir, logFile string
 	useIPv6, useIPv4  bool
@@ -64,14 +67,22 @@ func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
 
 	options := []string{
 		"-aHvh", "--no-o", "--no-g", "--stats",
-		"--exclude", ".~tmp~/",
+		"--filter", "risk .~tmp~/", "--exclude", ".~tmp~/",
 		"--delete", "--delete-after", "--delay-updates",
-		"--safe-links", "--timeout=120",
+		"--safe-links",
 	}
 	if c.overriddenOptions != nil {
 		options = c.overriddenOptions
 	}
 
+	if !c.rsyncNeverTimeout {
+		timeo := 120
+		if c.rsyncTimeoutValue > 0 {
+			timeo = c.rsyncTimeoutValue
+		}
+		options = append(options, fmt.Sprintf("--timeout=%d", timeo))
+	}
+
 	if c.useIPv6 {
 		options = append(options, "-6")
 	} else if c.useIPv4 {
````
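Both rsync providers now assemble `--timeout` the same way: default 120 seconds, overridden by `rsync_timeout`, and omitted entirely under `rsync_no_timeout`. The rule, extracted into a standalone sketch:

```go
// Sketch of the option-assembly rule the diff introduces in both rsync
// providers: --timeout defaults to 120, can be overridden by a positive
// config value, or dropped entirely when the never-timeout flag is set.
package main

import "fmt"

func rsyncTimeoutOptions(neverTimeout bool, timeoutValue int) []string {
	options := []string{}
	if !neverTimeout {
		timeo := 120 // tunasync's historical default
		if timeoutValue > 0 {
			timeo = timeoutValue
		}
		options = append(options, fmt.Sprintf("--timeout=%d", timeo))
	}
	return options
}

func main() {
	fmt.Println(rsyncTimeoutOptions(false, 0))  // [--timeout=120]
	fmt.Println(rsyncTimeoutOptions(false, 30)) // [--timeout=30]
	fmt.Println(rsyncTimeoutOptions(true, 30))  // []
}
```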
**worker/runner.go**

````diff
--- a/worker/runner.go
+++ b/worker/runner.go
@@ -149,10 +149,10 @@ func (c *cmdJob) Terminate() error {
 	select {
 	case <-time.After(2 * time.Second):
 		unix.Kill(c.cmd.Process.Pid, syscall.SIGKILL)
-		return errors.New("SIGTERM failed to kill the job")
+		logger.Warningf("SIGTERM failed to kill the job in 2s. SIGKILL sent")
 	case <-c.finished:
-		return nil
 	}
+	return nil
 }
 
 // Copied from go-sh
````
**worker/two_stage_rsync_provider.go**

````diff
--- a/worker/two_stage_rsync_provider.go
+++ b/worker/two_stage_rsync_provider.go
@@ -15,6 +15,8 @@ type twoStageRsyncConfig struct {
 	stage1Profile string
 	upstreamURL, username, password, excludeFile string
 	extraOptions      []string
+	rsyncNeverTimeout bool
+	rsyncTimeoutValue int
 	rsyncEnv          map[string]string
 	workingDir, logDir, logFile string
 	useIPv6           bool
@@ -32,11 +34,12 @@ type twoStageRsyncProvider struct {
 	dataSize string
 }
 
+// ref: https://salsa.debian.org/mirror-team/archvsync/-/blob/master/bin/ftpsync#L431
 var rsyncStage1Profiles = map[string]([]string){
-	"debian": []string{"dists/"},
+	"debian": []string{"--include=*.diff/", "--exclude=*.diff/Index", "--exclude=Packages*", "--exclude=Sources*", "--exclude=Release*", "--exclude=InRelease", "--include=i18n/by-hash", "--exclude=i18n/*", "--exclude=ls-lR*"},
 	"debian-oldstyle": []string{
-		"Packages*", "Sources*", "Release*",
-		"InRelease", "i18n/*", "ls-lR*", "dep11/*",
+		"--exclude=Packages*", "--exclude=Sources*", "--exclude=Release*",
+		"--exclude=InRelease", "--exclude=i18n/*", "--exclude=ls-lR*", "--exclude=dep11/*",
 	},
 }
@@ -60,14 +63,14 @@ func newTwoStageRsyncProvider(c twoStageRsyncConfig) (*twoStageRsyncProvider, er
 		twoStageRsyncConfig: c,
 		stage1Options: []string{
 			"-aHvh", "--no-o", "--no-g", "--stats",
-			"--exclude", ".~tmp~/",
-			"--safe-links", "--timeout=120",
+			"--filter", "risk .~tmp~/", "--exclude", ".~tmp~/",
+			"--safe-links",
 		},
 		stage2Options: []string{
 			"-aHvh", "--no-o", "--no-g", "--stats",
-			"--exclude", ".~tmp~/",
+			"--filter", "risk .~tmp~/", "--exclude", ".~tmp~/",
 			"--delete", "--delete-after", "--delay-updates",
-			"--safe-links", "--timeout=120",
+			"--safe-links",
 		},
 	}
@@ -107,12 +110,12 @@ func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) {
 	var options []string
 	if stage == 1 {
 		options = append(options, p.stage1Options...)
-		stage1Excludes, ok := rsyncStage1Profiles[p.stage1Profile]
+		stage1Profile, ok := rsyncStage1Profiles[p.stage1Profile]
 		if !ok {
 			return nil, errors.New("Invalid Stage 1 Profile")
 		}
-		for _, exc := range stage1Excludes {
-			options = append(options, "--exclude", exc)
+		for _, exc := range stage1Profile {
+			options = append(options, exc)
 		}
 
 	} else if stage == 2 {
@@ -124,6 +127,14 @@ func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) {
 		return []string{}, fmt.Errorf("Invalid stage: %d", stage)
 	}
 
+	if !p.rsyncNeverTimeout {
+		timeo := 120
+		if p.rsyncTimeoutValue > 0 {
+			timeo = p.rsyncTimeoutValue
+		}
+		options = append(options, fmt.Sprintf("--timeout=%d", timeo))
+	}
+
 	if p.useIPv6 {
 		options = append(options, "-6")
 	}
````
**worker/worker.go**

````diff
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -61,7 +61,7 @@ func NewTUNASyncWorker(cfg *Config) *Worker {
 
 // Run runs worker forever
 func (w *Worker) Run() {
-	w.registorWorker()
+	w.registerWorker()
 	go w.runHTTPServer()
 	w.runSchedule()
 }
@@ -393,7 +393,7 @@ func (w *Worker) URL() string {
 	return fmt.Sprintf("%s://%s:%d/", proto, w.cfg.Server.Hostname, w.cfg.Server.Port)
 }
 
-func (w *Worker) registorWorker() {
+func (w *Worker) registerWorker() {
 	msg := WorkerStatus{
 		ID:  w.Name(),
 		URL: w.URL(),
@@ -402,8 +402,17 @@ func (w *Worker) registerWorker() {
 	for _, root := range w.cfg.Manager.APIBaseList() {
 		url := fmt.Sprintf("%s/workers", root)
 		logger.Debugf("register on manager url: %s", url)
-		if _, err := PostJSON(url, msg, w.httpClient); err != nil {
-			logger.Errorf("Failed to register worker")
+		for retry := 10; retry > 0; {
+			if _, err := PostJSON(url, msg, w.httpClient); err != nil {
+				logger.Errorf("Failed to register worker")
+				retry--
+				if retry > 0 {
+					time.Sleep(1 * time.Second)
+					logger.Noticef("Retrying... (%d)", retry)
+				}
+			} else {
+				break
+			}
 		}
 	}
 }
````
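`registerWorker` now retries up to 10 times, sleeping a second between failed attempts, so a worker started before its manager eventually registers. A sketch of the same bounded-retry loop around a hypothetical `postJSON` callback (not tunasync's real HTTP helper):

```go
// Sketch of the bounded-retry pattern the diff adds to worker
// registration: up to 10 attempts, sleeping 1s between failures.
// postJSON is a hypothetical stand-in for the real HTTP call.
package main

import (
	"errors"
	"log"
	"time"
)

func registerWithRetry(postJSON func() error) {
	for retry := 10; retry > 0; {
		if err := postJSON(); err != nil {
			log.Printf("Failed to register worker: %v", err)
			retry--
			if retry > 0 {
				time.Sleep(1 * time.Second)
				log.Printf("Retrying... (%d)", retry)
			}
		} else {
			break // registered successfully
		}
	}
}

func main() {
	attempts := 0
	registerWithRetry(func() error {
		attempts++
		if attempts < 3 {
			return errors.New("manager not up yet")
		}
		return nil
	})
}
```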
**worker/worker_test.go**

````diff
--- a/worker/worker_test.go
+++ b/worker/worker_test.go
@@ -25,6 +25,7 @@ func makeMockManagerServer(recvData chan interface{}) *gin.Engine {
 		var _worker WorkerStatus
 		c.BindJSON(&_worker)
 		_worker.LastOnline = time.Now()
+		_worker.LastRegister = time.Now()
 		recvData <- _worker
 		c.JSON(http.StatusOK, _worker)
 	})
````