1
0
镜像自地址 https://github.com/tuna/tunasync.git 已同步 2025-12-06 14:36:47 +00:00

39 次代码提交

作者 SHA1 备注 提交日期
Miao Wang
a4d94cae07 bump to version 0.6.7 2020-09-11 18:21:15 +08:00
Miao Wang
8ebace4d9a Add support for multiarch builds 2020-09-11 17:59:33 +08:00
Yuxiang Zhang
b578237df8 Merge pull request #134 from tuna/worker-last-online-register
Worker last online and last register
2020-09-10 23:08:57 +08:00
Jiajie Chen
9f7f18c2c4 Fix missing method in mock test 2020-09-10 21:58:31 +08:00
Jiajie Chen
fd274cc976 Refresh worker LastOnline when worker updates 2020-09-10 21:51:33 +08:00
Jiajie Chen
b4b81ef7e9 Fix typo: registor -> register 2020-09-10 21:32:22 +08:00
Jiajie Chen
c8600d094e Add LastRegister to WorkerStatus 2020-09-10 21:31:31 +08:00
z4yx
2ba3a27fa3 ignore the SIGTERM failure 2020-09-06 19:23:26 +08:00
Yuxiang Zhang
b34238c097 Merge pull request #130 from tuna/add-bolt-open-timeout
Add 5 seconds timeout for bolt
2020-08-05 12:41:29 +08:00
Jiajie Chen
16e458f354 Add 5 seconds timeout for bolt 2020-08-03 14:46:45 +08:00
Yuxiang Zhang
16b4df1ec2 Merge pull request #127 from hxsf/patch-1
fix examlpe with docker_image
2020-06-30 09:47:28 +08:00
呼啸随风
e3c8cded6c fix examlpe with docker_image
If `docker.enable` not be `true`, the worker will ignore docker provider's config, and just exec the command.
so we need to doc it.
2020-06-30 09:45:57 +08:00
Yuxiang Zhang
3809df6cfb Merge pull request #126 from lrh3321/master
Add `--format` and `--status` for tunasynctl
2020-06-22 12:58:15 +08:00
zack.liu
600874ae54 Add --format and --status for tunasynctl 2020-06-22 11:25:18 +08:00
z4yx
2afe1f2e06 bump version to 0.6.6 2020-06-17 22:12:11 +08:00
z4yx
1b099520b2 [manager] protect DB with RW lock 2020-06-17 22:10:39 +08:00
z4yx
85b2105a2b [worker] retry registration 2020-06-17 21:34:55 +08:00
zyx
45e5d900fb bump version to 0.6.5 2020-06-08 22:30:28 +08:00
zyx
7b0cd490b7 fix misuse of a variable 2020-06-08 22:23:12 +08:00
zyx
9178966aed bump version to 0.6.4 2020-06-04 09:44:17 +08:00
zyx
b5d2a0ad89 bug fix: jobs not being scheduled after timeout 2020-06-04 09:37:20 +08:00
zyx
d8963c9946 test rsync inside a Docker container 2020-06-03 21:51:04 +08:00
zyx
198afa72cd bug fix: rsync can access the exclude file in Docker (close #59) 2020-06-03 21:50:38 +08:00
zyx
85ce9c1270 wait for docker container removal 2020-06-03 19:47:14 +08:00
zyx
a8a35fc259 Merge branch 'master' of github.com:tuna/tunasync 2020-06-03 13:28:58 +08:00
zyx
c00eb12a75 Two new options for rsync provider
- rsync_no_timeout=true/false # disable --timeout option
- rsync_timeout=n # set --timeout=n
related to issue #121
2020-06-03 13:26:49 +08:00
Yuxiang Zhang
95ae9c16a9 Update workers.conf 2020-06-01 16:59:44 +08:00
zyx
0392ef28c7 bump version to 0.6.3 2020-05-25 19:21:27 +08:00
zyx
b2a22a9bbc update editor config 2020-05-25 19:16:53 +08:00
zyx
31862210ba implement the timeout 2020-05-25 19:15:05 +08:00
zyx
e47ba2097e add a timeout field to providers 2020-05-25 18:24:05 +08:00
zyx
e8c7ff3d7f config items of timeout 2020-05-25 18:08:31 +08:00
Yuxiang Zhang
7e7b469f1e Update workers.conf 2020-05-23 15:28:32 +08:00
Yuxiang Zhang
eac66c7554 add config examples of the worker (#118) 2020-05-23 15:23:15 +08:00
z4yx
38b0156fae [bug fix] provider is not terminated if premature stop command received 2020-05-09 18:42:54 +08:00
z4yx
c8e7d29f34 bump version to 0.6.2 2020-04-08 20:12:41 +08:00
Yuxiang Zhang
d40638d738 Merge pull request #116 from BITNP/laststarted
Add MirrorStatus.LastStarted property
2020-04-06 23:01:58 +08:00
Phy
471d865042 Add LastStarted test case 2020-04-05 01:07:46 -04:00
Phy
c1641b6714 Add MirrorStatus.LastStarted property
- status.Status is in PreSyncing, and
- curStatus.Status is not in PreSyncing
2020-04-05 00:12:10 -04:00
共有 32 个文件被更改,包括 1473 次插入240 次删除

查看文件

@@ -21,16 +21,12 @@ jobs:
- name: Check out code into the Go module directory - name: Check out code into the Go module directory
uses: actions/checkout@v2 uses: actions/checkout@v2
- name: Get dependencies
run: |
go get -v -t -d ./cmd/tunasync
go get -v -t -d ./cmd/tunasynctl
- name: Build - name: Build
run: | run: |
make tunasync for i in linux-amd64 linux-arm64; do
make tunasynctl make ARCH=$i all
tar -jcf build/tunasync-linux-bin.tar.bz2 -C build tunasync tunasynctl tar -cz --numeric-owner --owner root -f tunasync-$i-bin.tar.gz -C build-$i tunasync tunasynctl
done
- name: Create Release - name: Create Release
id: create_release id: create_release
@@ -42,13 +38,9 @@ jobs:
release_name: Release ${{ github.ref }} release_name: Release ${{ github.ref }}
draft: false draft: false
prerelease: false prerelease: false
- name: Upload Release Asset - name: Upload Release Assets
id: upload-release-asset
uses: actions/upload-release-asset@v1
env: env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with: TAG_NAME: ${{ github.ref }}
upload_url: ${{ steps.create_release.outputs.upload_url }} # This pulls from the CREATE RELEASE step above, referencing it's ID to get its outputs object, which include a `upload_url`. See this blog post for more info: https://jasonet.co/posts/new-features-of-github-actions/#passing-data-to-future-steps run: |
asset_path: ./build/tunasync-linux-bin.tar.bz2 hub release edit $(find . -type f -name "tunasync-*.tar.gz" -printf "-a %p ") -m "" "${TAG_NAME##*/}"
asset_name: tunasync-linux-bin.tar.bz2
asset_content_type: application/x-bzip2

查看文件

@@ -32,7 +32,7 @@ jobs:
uses: actions/upload-artifact@v1 uses: actions/upload-artifact@v1
with: with:
name: tunasync-bin name: tunasync-bin
path: build/ path: build-linux-amd64/
test: test:
name: Test name: Test

1
.gitignore vendored
查看文件

@@ -1 +1,2 @@
/build /build
/build-*

13
.vscode/settings.json vendored 普通文件
查看文件

@@ -0,0 +1,13 @@
{
"cSpell.words": [
"Btrfs",
"Debugf",
"Infof",
"Noticef",
"Warningf",
"cgroup",
"mergo",
"tmpl",
"zpool"
]
}

查看文件

@@ -1,19 +1,22 @@
LDFLAGS="-X main.buildstamp=`date -u '+%s'` -X main.githash=`git rev-parse HEAD`" LDFLAGS="-X main.buildstamp=`date -u '+%s'` -X main.githash=`git rev-parse HEAD`"
ARCH ?= linux-amd64
ARCH_LIST = $(subst -, ,$(ARCH))
GOOS = $(word 1, $(ARCH_LIST))
GOARCH = $(word 2, $(ARCH_LIST))
BUILDBIN = tunasync tunasynctl
all: get tunasync tunasynctl all: $(BUILDBIN)
get: build-$(ARCH):
go get ./cmd/tunasync mkdir -p $@
go get ./cmd/tunasynctl
build: $(BUILDBIN): % : build-$(ARCH) build-$(ARCH)/%
mkdir -p build
tunasync: build $(BUILDBIN:%=build-$(ARCH)/%) : build-$(ARCH)/% : cmd/%
go build -o build/tunasync -ldflags ${LDFLAGS} github.com/tuna/tunasync/cmd/tunasync GOOS=$(GOOS) GOARCH=$(GOARCH) go get ./$<
GOOS=$(GOOS) GOARCH=$(GOARCH) go build -o $@ -ldflags ${LDFLAGS} github.com/tuna/tunasync/$<
tunasynctl: build
go build -o build/tunasynctl -ldflags ${LDFLAGS} github.com/tuna/tunasync/cmd/tunasynctl
test: test:
go test -v -covermode=count -coverprofile=profile.cov ./... go test -v -covermode=count -coverprofile=profile.cov ./...
.PHONY: all test $(BUILDBIN)

查看文件

@@ -8,6 +8,7 @@ import (
"os" "os"
"strconv" "strconv"
"strings" "strings"
"text/template"
"time" "time"
"github.com/BurntSushi/toml" "github.com/BurntSushi/toml"
@@ -160,8 +161,31 @@ func listJobs(c *cli.Context) error {
"of all jobs from manager server: %s", err.Error()), "of all jobs from manager server: %s", err.Error()),
1) 1)
} }
genericJobs = jobs if statusStr := c.String("status"); statusStr != "" {
filteredJobs := make([]tunasync.WebMirrorStatus, 0, len(jobs))
var statuses []tunasync.SyncStatus
for _, s := range strings.Split(statusStr, ",") {
var status tunasync.SyncStatus
err = status.UnmarshalJSON([]byte("\"" + strings.TrimSpace(s) + "\""))
if err != nil {
return cli.NewExitError(
fmt.Sprintf("Error parsing status: %s", err.Error()),
1)
}
statuses = append(statuses, status)
}
for _, job := range jobs {
for _, s := range statuses {
if job.Status == s {
filteredJobs = append(filteredJobs, job)
break
}
}
}
genericJobs = filteredJobs
} else {
genericJobs = jobs
}
} else { } else {
var jobs []tunasync.MirrorStatus var jobs []tunasync.MirrorStatus
args := c.Args() args := c.Args()
@@ -196,13 +220,46 @@ func listJobs(c *cli.Context) error {
genericJobs = jobs genericJobs = jobs
} }
b, err := json.MarshalIndent(genericJobs, "", " ") if format := c.String("format"); format != "" {
if err != nil { tpl := template.New("")
return cli.NewExitError( _, err := tpl.Parse(format)
fmt.Sprintf("Error printing out information: %s", err.Error()), if err != nil {
1) return cli.NewExitError(
fmt.Sprintf("Error parsing format template: %s", err.Error()),
1)
}
switch jobs := genericJobs.(type) {
case []tunasync.WebMirrorStatus:
for _, job := range jobs {
err = tpl.Execute(os.Stdout, job)
if err != nil {
return cli.NewExitError(
fmt.Sprintf("Error printing out information: %s", err.Error()),
1)
}
fmt.Println()
}
case []tunasync.MirrorStatus:
for _, job := range jobs {
err = tpl.Execute(os.Stdout, job)
if err != nil {
return cli.NewExitError(
fmt.Sprintf("Error printing out information: %s", err.Error()),
1)
}
fmt.Println()
}
}
} else {
b, err := json.MarshalIndent(genericJobs, "", " ")
if err != nil {
return cli.NewExitError(
fmt.Sprintf("Error printing out information: %s", err.Error()),
1)
}
fmt.Println(string(b))
} }
fmt.Println(string(b))
return nil return nil
} }
@@ -506,6 +563,14 @@ func main() {
Name: "all, a", Name: "all, a",
Usage: "List all jobs of all workers", Usage: "List all jobs of all workers",
}, },
cli.StringFlag{
Name: "status, s",
Usage: "Filter output based on status provided",
},
cli.StringFlag{
Name: "format, f",
Usage: "Pretty-print containers using a Go template",
},
}...), }...),
Action: initializeWrapper(listJobs), Action: initializeWrapper(listJobs),
}, },

查看文件

@@ -1,3 +1,4 @@
# /home/scripts in this example points to https://github.com/tuna/tunasync-scripts/
[global] [global]
name = "mirror_worker" name = "mirror_worker"
@@ -6,6 +7,11 @@ mirror_dir = "/srv/tunasync"
concurrent = 10 concurrent = 10
interval = 1 interval = 1
# ensure the exec user be add into `docker` group
[docker]
# in `command provider` can use docker_image and docker_volumes
enable = true
[manager] [manager]
api_base = "http://localhost:12345" api_base = "http://localhost:12345"
token = "some_token" token = "some_token"
@@ -22,52 +28,637 @@ listen_addr = "127.0.0.1"
listen_port = 6000 listen_port = 6000
ssl_cert = "" ssl_cert = ""
ssl_key = "" ssl_key = ""
[[mirrors]] [[mirrors]]
name = "adobe-fonts" name = "adobe-fonts"
interval = 1440 interval = 1440
provider = "command" provider = "command"
upstream = "https://github.com/adobe-fonts" upstream = "https://github.com/adobe-fonts"
#https://github.com/tuna/tunasync-scripts/blob/master/adobe-fonts.sh
command = "/home/scripts/adobe-fonts.sh" command = "/home/scripts/adobe-fonts.sh"
size_pattern = "Total size is ([0-9\\.]+[KMGTP]?)"
docker_image = "tunathu/tunasync-scripts:latest" docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "AdoptOpenJDK"
interval = 5760
provider = "command"
command = "/home/scripts/adoptopenjdk.py"
upstream = "https://adoptopenjdk.jfrog.io/adoptopenjdk"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "alpine"
provider = "rsync"
upstream = "rsync://rsync.alpinelinux.org/alpine/"
memory_limit = "256M"
[[mirrors]] [[mirrors]]
name = "anaconda" name = "anaconda"
provider = "command" provider = "command"
upstream = "https://repo.continuum.io/" upstream = "https://repo.continuum.io/"
#https://github.com/tuna/tunasync-scripts/blob/master/anaconda.py command = "/home/scripts/anaconda.py --delete"
command = "/home/scripts/anaconda.py" size_pattern = "Total size is ([0-9\\.]+[KMGTP]?)"
interval = 720
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "apache"
provider = "rsync"
upstream = "rsync://rsync.apache.org/apache-dist/"
use_ipv4 = true
rsync_options = [ "--delete-excluded" ]
memory_limit = "256M"
[[mirrors]]
name = "armbian"
provider = "two-stage-rsync"
stage1_profile = "debian"
upstream = "rsync://rsync.armbian.com/apt/"
memory_limit = "256M"
[[mirrors]]
name = "armbian-releases"
provider = "rsync"
stage1_profile = "debian"
upstream = "rsync://rsync.armbian.com/dl/"
memory_limit = "256M"
[[mirrors]]
name = "bananian"
provider = "command"
upstream = "https://dl.bananian.org/"
command = "/home/scripts/lftp.sh"
interval = 1440 interval = 1440
docker_image = "tunathu/tunasync-scripts:latest" docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]] [[mirrors]]
name = "gnu" name = "bioconductor"
provider = "rsync" provider = "rsync"
upstream = "rsync://mirrors.ocf.berkeley.edu/gnu/" upstream = "master.bioconductor.org:./"
rsync_options = [ "--rsh=ssh -i /root/id_rsa -o PasswordAuthentication=no -l sync" ]
exclude_file = "/etc/excludes/bioconductor.txt"
memory_limit = "256M"
[[mirrors]]
name = "blender"
provider = "rsync"
upstream = "rsync://mirrors.dotsrc.org/blender/"
rsync_options = [ "--delete-excluded" ]
exclude_file = "/etc/excludes/blender.txt"
interval = 1440
memory_limit = "256M"
[[mirrors]]
name = "chakra"
provider = "rsync"
upstream = "rsync://rsync.chakralinux.org/packages/"
memory_limit = "256M"
[[mirrors]]
name = "chakra-releases"
provider = "rsync"
upstream = "rsync://rsync.chakralinux.org/releases/"
memory_limit = "256M"
[[mirrors]]
name = "chef"
interval = 1440
provider = "command"
upstream = "https://packages.chef.io/repos"
command = "/home/scripts/chef.sh"
size_pattern = "size-sum: ([0-9\\.]+[KMGTP])"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "clickhouse"
interval = 2880
provider = "rsync"
upstream = "rsync://repo.yandex.ru/yandexrepo/clickhouse/"
exclude_file = "/etc/excludes/clickhouse.txt"
memory_limit = "256M"
[[mirrors]]
name = "clojars"
provider = "command"
upstream = "s3://clojars-repo-production/"
command = "/home/scripts/s3.sh"
docker_image = "tunathu/ftpsync:latest"
[mirrors.env]
TUNASYNC_S3_ENDPOINT = "https://s3.dualstack.us-east-2.amazonaws.com"
#TUNASYNC_S3_ENDPOINT = "https://s3.us-east-2.amazonaws.com"
TUNASYNC_AWS_OPTIONS = "--delete --exclude index.html"
[[mirrors]]
name = "CPAN"
provider = "rsync"
upstream = "rsync://cpan-rsync.perl.org/CPAN/"
memory_limit = "256M"
[[mirrors]]
name = "CRAN"
provider = "rsync"
upstream = "rsync://cran.r-project.org/CRAN/"
rsync_options = [ "--delete-excluded" ] rsync_options = [ "--delete-excluded" ]
memory_limit = "256M" memory_limit = "256M"
[[mirrors]]
name = "CTAN"
provider = "rsync"
upstream = "rsync://mirrors.rit.edu/CTAN/"
memory_limit = "256M"
[[mirrors]]
name = "dart-pub"
provider = "command"
upstream = "https://pub.dev/api"
command = "/home/scripts/pub.sh"
interval = 30
docker_image = "tunathu/pub-mirror:latest"
[mirrors.env]
MIRROR_BASE_URL = "https://mirrors.tuna.tsinghua.edu.cn/dart-pub"
[[mirrors]]
name = "debian"
provider = "command"
upstream = "rsync://mirrors.tuna.tsinghua.edu.cn/debian/"
command = "/home/scripts/debian.sh sync:archive:debian"
size_pattern = "Total size is ([0-9\\.]+[KMGTP]?)"
docker_image = "tunathu/ftpsync"
docker_volumes = [
"/etc/misc/ftpsync-debian.conf:/ftpsync/etc/ftpsync-debian.conf:ro",
"/log/ftpsync:/home/log/tunasync/ftpsync",
]
[mirrors.env]
FTPSYNC_LOG_DIR = "/home/log/tunasync/ftpsync"
[[mirrors]]
name = "docker-ce"
provider = "command"
upstream = "https://download.docker.com/"
command = "timeout 3h /home/scripts/docker-ce.py --workers 10 --fast-skip"
interval = 1440
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "ELK"
interval = 1440
provider = "command"
upstream = "https://packages.elastic.co"
command = "/home/scripts/ELK.sh"
size_pattern = "size-sum: ([0-9\\.]+[KMGTP])"
docker_image = "tunathu/tunasync-scripts:latest"
# set environment variables
[mirrors.env]
WGET_OPTIONS = "-6"
[[mirrors]]
name = "elasticstack"
interval = 1440
provider = "command"
upstream = "https://artifacts.elastic.co/"
command = "/home/scripts/elastic.sh"
size_pattern = "size-sum: ([0-9\\.]+[KMGTP])"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "erlang-solutions"
interval = 1440
provider = "command"
upstream = "https://packages.erlang-solutions.com"
command = "/home/scripts/erlang.sh"
size_pattern = "size-sum: ([0-9\\.]+[KMGTP])"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "flutter"
interval = 1440
provider = "command"
upstream = "https://storage.googleapis.com/flutter_infra/"
command = "/home/scripts/flutter.sh"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "github-release"
provider = "command"
upstream = "https://api.github.com/repos/"
command = "/home/scripts/github-release.py --workers 5"
size_pattern = "Total size is ([0-9\\.]+[KMGTP]?)"
interval = 720
docker_image = "tunathu/tunasync-scripts:latest"
[mirrors.env]
GITHUB_TOKEN = "xxxxx"
[[mirrors]]
name = "gitlab-ce"
interval = 1440
provider = "command"
upstream = "https://packages.gitlab.com/gitlab/gitlab-ce/"
command = "/home/scripts/gitlab-ce.sh"
size_pattern = "size-sum: ([0-9\\.]+[KMGTP])"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "gitlab-ee"
interval = 1440
provider = "command"
upstream = "https://packages.gitlab.com/gitlab/gitlab-ee/"
command = "/home/scripts/gitlab-ce.sh"
size_pattern = "size-sum: ([0-9\\.]+[KMGTP])"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "gitlab-runner"
interval = 1440
provider = "command"
upstream = "https://packages.gitlab.com/runner/gitlab-runner"
command = "/home/scripts/gitlab-runner.sh"
size_pattern = "size-sum: ([0-9\\.]+[KMGTP])"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "grafana"
interval = 1440
provider = "command"
upstream = "https://packages.grafana.com/oss"
command = "/home/scripts/grafana.sh"
size_pattern = "size-sum: ([0-9\\.]+[KMGTP])"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "hackage"
provider = "command"
command = "/home/scripts/hackage.sh"
upstream = "https://hackage.haskell.org/"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "homebrew-bottles"
provider = "command"
upstream = "https://homebrew.bintray.com"
command = "/home/scripts/linuxbrew-bottles.sh"
docker_image = "tunathu/homebrew-mirror"
# set environment variables
[mirrors.env]
HOMEBREW_REPO = "https://neomirrors.tuna.tsinghua.edu.cn/git/homebrew"
[[mirrors]]
name = "influxdata"
interval = 1440
provider = "command"
upstream = "https://repos.influxdata.com"
command = "/home/scripts/influxdata.sh"
size_pattern = "size-sum: ([0-9\\.]+[KMGTP])"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "kali"
provider = "two-stage-rsync"
stage1_profile = "debian"
upstream = "rsync://ftp.nluug.nl/kali/"
rsync_options = [ "--delete-excluded" ] # delete .~tmp~ folders
memory_limit = "256M"
[[mirrors]]
name = "kali-images"
provider = "rsync"
upstream = "rsync://ftp.nluug.nl/kali-images/"
rsync_options = [ "--delete-excluded" ] # delete .~tmp~ folders
memory_limit = "256M"
[[mirrors]]
name = "KaOS"
provider = "rsync"
upstream = "rsync://kaosx.tk/kaos/"
rsync_options = [ "--delete-excluded" ]
memory_limit = "256M"
[[mirrors]]
name = "kernel"
provider = "rsync"
upstream = "rsync://rsync.kernel.org/pub/linux/kernel/"
rsync_options = [ "--delete-excluded" ]
memory_limit = "256M"
[[mirrors]]
name = "kicad"
provider = "command"
upstream = "s3://kicad-downloads/"
command = "/home/scripts/s3.sh"
docker_image = "tunathu/ftpsync:latest"
[mirrors.env]
TUNASYNC_S3_ENDPOINT = "https://s3.cern.ch"
TUNASYNC_AWS_OPTIONS = "--delete --exclude index.html"
[[mirrors]]
name = "kodi"
provider = "rsync"
upstream = "rsync://mirror.yandex.ru/mirrors/xbmc/"
rsync_options = [ "--delete-excluded" ]
memory_limit = "256M"
use_ipv6 = true
[[mirrors]]
name = "kubernetes"
interval = 2880
provider = "command"
upstream = "http://packages.cloud.google.com"
command = "/home/scripts/kubernetes.sh"
size_pattern = "size-sum: ([0-9\\.]+[KMGTP])"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "linuxbrew-bottles"
provider = "command"
upstream = "https://linuxbrew.bintray.com"
command = "/home/scripts/linuxbrew-bottles.sh"
docker_image = "tunathu/homebrew-mirror"
# set environment variables
[mirrors.env]
RUN_LINUXBREW = "true"
HOMEBREW_REPO = "https://neomirrors.tuna.tsinghua.edu.cn/git/homebrew"
[[mirrors]]
name = "linuxmint"
provider = "two-stage-rsync"
stage1_profile = "debian"
upstream = "rsync://mirrors.kernel.org/linuxmint-packages/"
rsync_options = [ "--delete-excluded" ]
memory_limit = "256M"
[[mirrors]]
name = "lxc-images"
provider = "command"
upstream = "https://us.images.linuxcontainers.org/"
command = "/home/scripts/lxc-images.sh"
docker_image = "tunathu/tunasync-scripts:latest"
interval = 720
[[mirrors]]
name = "lyx"
provider = "command"
upstream = "ftp://ftp.lyx.org/pub/lyx/"
command = "/home/scripts/lftp.sh"
docker_image = "tunathu/tunasync-scripts:latest"
[mirrors.env]
TUNASYNC_LFTP_OPTIONS = "--only-newer"
[[mirrors]]
name = "mongodb"
interval = 1440
provider = "command"
upstream = "https://repo.mongodb.org"
command = "/home/scripts/mongodb.sh"
size_pattern = "size-sum: ([0-9\\.]+[KMGTP])"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "msys2"
provider = "command"
upstream = "http://repo.msys2.org/"
command = "/home/scripts/lftp.sh"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "mysql"
interval = 30
provider = "command"
upstream = "https://repo.mysql.com"
command = "/home/scripts/mysql.sh"
size_pattern = "size-sum: ([0-9\\.]+[KMGTP])"
docker_image = "tunathu/tunasync-scripts:latest"
# set environment variables
[mirrors.env]
USE_IPV6 = "1"
[[mirrors]]
name = "nix"
interval = 1440
provider = "command"
upstream = "s3://nix-releases/nix/"
command = "/home/scripts/nix.sh"
docker_image = "tunathu/tunasync-scripts:latest"
[mirrors.env]
MIRROR_BASE_URL = 'https://mirrors.tuna.tsinghua.edu.cn/nix/'
[[mirrors]]
name = "nix-channels"
interval = 300
provider = "command"
upstream = "https://nixos.org/channels"
command = "timeout 20h /home/scripts/nix-channels.py"
docker_image = "tunathu/nix-channels:latest"
docker_options = [
"--cpus", "20",
]
[[mirrors]]
name = "nodesource"
provider = "command"
upstream = "https://deb.nodesource.com/"
command = "/home/scripts/nodesource.sh"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "openresty"
provider = "command"
upstream = "https://openresty.org/package/"
command = "/home/scripts/lftp.sh"
docker_image = "tunathu/tunasync-scripts:latest"
[mirrors.env]
TUNASYNC_LFTP_OPTIONS = "--only-newer"
[[mirrors]]
name = "packagist"
provider = "command"
upstream = "http://packagist.org/"
command = "/home/scripts/packagist.sh"
interval = 1440
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "proxmox"
interval = 1440
provider = "command"
upstream = "http://download.proxmox.com"
command = "/home/scripts/proxmox.sh"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]] [[mirrors]]
name = "pypi" name = "pypi"
provider = "command" provider = "command"
upstream = "https://pypi.python.org/" upstream = "https://pypi.python.org/"
#https://github.com/tuna/tunasync-scripts/blob/master/pypi.sh
command = "/home/scripts/pypi.sh" command = "/home/scripts/pypi.sh"
docker_image = "tunathu/tunasync-scripts:latest" docker_image = "tunathu/bandersnatch:latest"
interval = 5 interval = 5
[[mirrors]]
name = "qt"
provider = "rsync"
upstream = "rsync://master.qt-project.org/qt-all/"
exclude_file = "/etc/excludes/qt.txt"
rsync_options = [ "--delete-excluded" ]
memory_limit = "256M"
[[mirrors]]
name = "raspberrypi"
provider = "two-stage-rsync"
stage1_profile = "debian"
upstream = "rsync://apt-repo.raspberrypi.org/archive/debian/"
memory_limit = "256M"
[[mirrors]]
name = "raspbian-images"
interval = 5760
provider = "command"
upstream = "https://downloads.raspberrypi.org/"
command = "/home/scripts/lftp.sh"
docker_image = "tunathu/tunasync-scripts:latest"
[mirrors.env]
TUNASYNC_LFTP_OPTIONS = "-x ^icons/$ -c --only-missing -v --no-perms"
[[mirrors]]
name = "raspbian"
provider = "two-stage-rsync"
stage1_profile = "debian"
upstream = "rsync://archive.raspbian.org/archive/"
rsync_options = [ "--delete-excluded" ] # delete .~tmp~ folders
memory_limit = "256M"
[[mirrors]]
name = "redhat"
provider = "rsync"
upstream = "rsync://ftp.redhat.com/redhat/"
rsync_options = [ "--delete-excluded" ]
memory_limit = "256M"
exclude_file = "/etc/excludes/redhat.txt"
interval = 1440
[mirrors.env]
RSYNC_PROXY="127.0.0.1:8123"
[[mirrors]]
name = "remi"
interval = 1440
provider = "command"
upstream = "rsync://rpms.remirepo.net"
command = "/home/scripts/remi.sh"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "repo-ck"
provider = "command"
upstream = "http://repo-ck.com"
command = "/home/scripts/repo-ck.sh"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "ros"
provider = "rsync"
upstream = "rsync://mirror.umd.edu/packages.ros.org/ros/"
memory_limit = "256M"
[[mirrors]]
name = "ros2"
interval = 1440
provider = "command"
upstream = "http://packages.ros.org/ros2"
command = "/home/scripts/ros2.sh"
size_pattern = "size-sum: ([0-9\\.]+[KMGTP])"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "rubygems"
provider = "command"
upstream = "https://rubygems.org"
command = "/home/scripts/rubygems.sh"
docker_image = "tunathu/rubygems-mirror"
interval = 60
# set environment variables # set environment variables
[mirrors.env] [mirrors.env]
INIT = "0" INIT = "0"
[[mirrors]]
name = "rudder"
interval = 2880
provider = "command"
upstream = "https://repository.rudder.io"
command = "/home/scripts/rudder.sh"
size_pattern = "size-sum: ([0-9\\.]+[KMGTP])"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]] [[mirrors]]
name = "debian" name = "rustup"
interval = 720 provider = "command"
upstream = "https://rustup.rs/"
command = "/home/scripts/rustup.sh"
interval = 1440
docker_image = "tunathu/rustup-mirror:latest"
docker_volumes = [
]
docker_options = [
]
[mirrors.env]
MIRROR_BASE_URL = "https://mirrors.tuna.tsinghua.edu.cn/rustup"
[[mirrors]]
name = "saltstack"
interval = 1440 # required on http://repo.saltstack.com/#mirror
provider = "command"
upstream = "s3://s3/"
command = "/home/scripts/s3.sh"
docker_image = "tunathu/ftpsync:latest"
[mirrors.env]
TUNASYNC_S3_ENDPOINT = "https://s3.repo.saltstack.com"
TUNASYNC_AWS_OPTIONS = "--delete --exact-timestamps"
[[mirrors]]
name = "solus"
provider = "rsync" provider = "rsync"
upstream = "rsync://mirrors.tuna.tsinghua.edu.cn/debian/" upstream = "rsync://mirrors.rit.edu/solus/"
rsync_options = [ "--exclude", "/shannon", "--exclude", "/unstable" ]
memory_limit = "256M" memory_limit = "256M"
[[mirrors]]
name = "stackage"
provider = "command"
command = "/home/scripts/stackage.py"
upstream = "https://www.stackage.org/"
docker_image = "tunathu/tunasync-scripts:latest"
# set environment variables
[mirrors.env]
GIT_COMMITTER_NAME = "TUNA mirrors"
GIT_COMMITTER_EMAIL = "mirrors@tuna.tsinghua.edu.cn"
[[mirrors]]
name = "steamos"
interval = 1440
provider = "command"
upstream = "http://repo.steampowered.com"
command = "/home/scripts/lftp.sh"
docker_image = "tunathu/tunasync-scripts:latest"
[mirrors.env]
TUNASYNC_LFTP_OPTIONS = "--only-newer --exclude icons/ "
[[mirrors]]
name = "termux"
interval = 1440
provider = "command"
upstream = "https://dl.bintray.com/termux/termux-packages-24/"
command = "/home/scripts/termux.sh"
size_pattern = "size-sum: ([0-9\\.]+[KMGTP])"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]] [[mirrors]]
name = "ubuntu" name = "ubuntu"
provider = "two-stage-rsync" provider = "two-stage-rsync"
@@ -76,4 +667,156 @@ upstream = "rsync://archive.ubuntu.com/ubuntu/"
rsync_options = [ "--delete-excluded" ] rsync_options = [ "--delete-excluded" ]
memory_limit = "256M" memory_limit = "256M"
# vim: ft=toml [[mirrors]]
name = "ubuntu-ports"
provider = "two-stage-rsync"
stage1_profile = "debian"
upstream = "rsync://ports.ubuntu.com/ubuntu-ports/"
rsync_options = [ "--delete-excluded" ]
exclude_file = "/etc/excludes/ubuntu-ports-exclude.txt"
memory_limit = "256M"
[[mirrors]]
name = "virtualbox"
interval = 1440
provider = "command"
upstream = "http://download.virtualbox.org/virtualbox"
command = "/home/scripts/virtualbox.sh"
size_pattern = "size-sum: ([0-9\\.]+[KMGTP])"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "winehq"
provider = "command"
upstream = "ftp://ftp.winehq.org/pub/"
command = "/home/scripts/lftp.sh"
docker_image = "tunathu/tunasync-scripts:latest"
[mirrors.env]
TUNASYNC_LFTP_OPTIONS = "-x wine-builds.old/ -x /\\..+"
[[mirrors]]
name = "zabbix"
provider = "rsync"
upstream = "rsync://repo.zabbix.com/mirror/"
rsync_options = [ "--delete-excluded", "--chmod=o+r,Do+x,Fa-x" ]
memory_limit = "256M"
[[mirrors]]
name = "AOSP"
interval = 720
provider = "command"
command = "/home/tunasync-scripts/aosp.sh"
upstream = "https://android.googlesource.com/mirror/manifest"
size_pattern = "Total size is ([0-9\\.]+[KMGTP]?)"
docker_image = "tunathu/tunasync-scripts:latest"
[mirrors.env]
REPO = "/usr/local/bin/aosp-repo"
REPO_URL = "https://mirrors.tuna.tsinghua.edu.cn/git/git-repo"
USE_BITMAP_INDEX = "1"
[[mirrors]]
name = "lineageOS"
interval = 720
provider = "command"
command = "/home/tunasync-scripts/aosp.sh"
upstream = "https://github.com/LineageOS/mirror"
size_pattern = "Total size is ([0-9\\.]+[KMGTP]?)"
docker_image = "tunathu/tunasync-scripts:latest"
[mirrors.env]
REPO = "/usr/local/bin/aosp-repo"
REPO_URL = "https://mirrors.tuna.tsinghua.edu.cn/git/git-repo"
USE_BITMAP_INDEX = "1"
[[mirrors]]
name = "chromiumos"
interval = 720
provider = "command"
command = "/home/tunasync-scripts/cros.sh"
upstream = "https://chromium.googlesource.com"
size_pattern = "Total size is ([0-9\\.]+[KMGTP]?)"
fail_on_match = "fatal: "
docker_image = "tunathu/tunasync-scripts:latest"
[mirrors.env]
USE_BITMAP_INDEX = "1"
CONCURRENT_JOBS = "20"
[[mirrors]]
name = "crates.io-index.git"
provider = "command"
command = "/home/tunasync-scripts/git.sh"
upstream = "https://github.com/rust-lang/crates.io-index.git"
docker_image = "tunathu/tunasync-scripts:latest"
size_pattern = "size-pack: ([0-9\\.]+[KMGTP])"
[[mirrors]]
name = "flutter-sdk.git"
provider = "command"
command = "/home/tunasync-scripts/git.sh"
upstream = "git://github.com/flutter/flutter.git"
docker_image = "tunathu/tunasync-scripts:latest"
size_pattern = "size-pack: ([0-9\\.]+[KMGTP])"
[[mirrors]]
name = "gcc.git"
provider = "command"
command = "/home/tunasync-scripts/git.sh"
upstream = "git://gcc.gnu.org/git/gcc.git"
docker_image = "tunathu/tunasync-scripts:latest"
size_pattern = "size-pack: ([0-9\\.]+[KMGTP])"
[[mirrors]]
name = "gentoo-portage.git"
provider = "command"
command = "/home/tunasync-scripts/git.sh"
upstream = "git://github.com/gentoo-mirror/gentoo.git"
docker_image = "tunathu/tunasync-scripts:latest"
size_pattern = "size-pack: ([0-9\\.]+[KMGTP])"
[[mirrors]]
name = "git-repo"
provider = "command"
command = "/home/tunasync-scripts/git-repo.sh"
upstream = "https://gerrit.googlesource.com/git-repo"
size_pattern = "size-pack: ([0-9\\.]+[KMGTP])"
fail_on_match = "fatal: "
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "homebrew"
provider = "command"
command = "/home/tunasync-scripts/homebrew.sh"
upstream = "https://github.com/Homebrew"
docker_image = "tunathu/tunasync-scripts:latest"
size_pattern = "Total size is ([0-9\\.]+[KMGTP]?)"
[[mirrors]]
name = "CocoaPods"
provider = "command"
command = "/home/tunasync-scripts/cocoapods.sh"
upstream = "https://github.com/CocoaPods"
docker_image = "tunathu/tunasync-scripts:latest"
size_pattern = "Total size is ([0-9\\.]+[KMGTP]?)"
[[mirrors]]
name = "pybombs"
interval = 720
provider = "command"
command = "/home/tunasync-scripts/pybombs.sh"
upstream = "https://github.com/scateu/pybombs-mirror/"
docker_image = "tunathu/tunasync-scripts:latest"
docker_volumes = ["/home/pybombs-mirror:/opt/pybombs-mirror"]
size_pattern = "Total size is ([0-9\\.]+[KMGTP]?)"
[mirrors.env]
PYBOMBS_MIRROR_SCRIPT_PATH = "/opt/pybombs-mirror"
MIRROR_BASE_URL = "https://mirrors.tuna.tsinghua.edu.cn/pybombs"
[[mirrors]]
name = "llvm"
provider = "command"
command = "/home/tunasync-scripts/llvm.sh"
upstream = "https://git.llvm.org/git"
docker_image = "tunathu/tunasync-scripts:latest"
size_pattern = "Total size is ([0-9\\.]+[KMGTP]?)"
# vim: ft=toml

查看文件

@@ -8,25 +8,27 @@ import (
// A MirrorStatus represents a msg when // A MirrorStatus represents a msg when
// a worker has done syncing // a worker has done syncing
type MirrorStatus struct { type MirrorStatus struct {
Name string `json:"name"` Name string `json:"name"`
Worker string `json:"worker"` Worker string `json:"worker"`
IsMaster bool `json:"is_master"` IsMaster bool `json:"is_master"`
Status SyncStatus `json:"status"` Status SyncStatus `json:"status"`
LastUpdate time.Time `json:"last_update"` LastUpdate time.Time `json:"last_update"`
LastEnded time.Time `json:"last_ended"` LastStarted time.Time `json:"last_started"`
Scheduled time.Time `json:"next_schedule"` LastEnded time.Time `json:"last_ended"`
Upstream string `json:"upstream"` Scheduled time.Time `json:"next_schedule"`
Size string `json:"size"` Upstream string `json:"upstream"`
ErrorMsg string `json:"error_msg"` Size string `json:"size"`
ErrorMsg string `json:"error_msg"`
} }
// A WorkerStatus is the information struct that describe // A WorkerStatus is the information struct that describe
// a worker, and sent from the manager to clients. // a worker, and sent from the manager to clients.
type WorkerStatus struct { type WorkerStatus struct {
ID string `json:"id"` ID string `json:"id"`
URL string `json:"url"` // worker url URL string `json:"url"` // worker url
Token string `json:"token"` // session token Token string `json:"token"` // session token
LastOnline time.Time `json:"last_online"` // last seen LastOnline time.Time `json:"last_online"` // last seen
LastRegister time.Time `json:"last_register"` // last register time
} }
type MirrorSchedules struct { type MirrorSchedules struct {

查看文件

@@ -38,31 +38,35 @@ func (t *stampTime) UnmarshalJSON(b []byte) error {
// WebMirrorStatus is the mirror status to be shown in the web page // WebMirrorStatus is the mirror status to be shown in the web page
type WebMirrorStatus struct { type WebMirrorStatus struct {
Name string `json:"name"` Name string `json:"name"`
IsMaster bool `json:"is_master"` IsMaster bool `json:"is_master"`
Status SyncStatus `json:"status"` Status SyncStatus `json:"status"`
LastUpdate textTime `json:"last_update"` LastUpdate textTime `json:"last_update"`
LastUpdateTs stampTime `json:"last_update_ts"` LastUpdateTs stampTime `json:"last_update_ts"`
LastEnded textTime `json:"last_ended"` LastStarted textTime `json:"last_started"`
LastEndedTs stampTime `json:"last_ended_ts"` LastStartedTs stampTime `json:"last_started_ts"`
Scheduled textTime `json:"next_schedule"` LastEnded textTime `json:"last_ended"`
ScheduledTs stampTime `json:"next_schedule_ts"` LastEndedTs stampTime `json:"last_ended_ts"`
Upstream string `json:"upstream"` Scheduled textTime `json:"next_schedule"`
Size string `json:"size"` // approximate size ScheduledTs stampTime `json:"next_schedule_ts"`
Upstream string `json:"upstream"`
Size string `json:"size"` // approximate size
} }
func BuildWebMirrorStatus(m MirrorStatus) WebMirrorStatus { func BuildWebMirrorStatus(m MirrorStatus) WebMirrorStatus {
return WebMirrorStatus{ return WebMirrorStatus{
Name: m.Name, Name: m.Name,
IsMaster: m.IsMaster, IsMaster: m.IsMaster,
Status: m.Status, Status: m.Status,
LastUpdate: textTime{m.LastUpdate}, LastUpdate: textTime{m.LastUpdate},
LastUpdateTs: stampTime{m.LastUpdate}, LastUpdateTs: stampTime{m.LastUpdate},
LastEnded: textTime{m.LastEnded}, LastStarted: textTime{m.LastStarted},
LastEndedTs: stampTime{m.LastEnded}, LastStartedTs: stampTime{m.LastStarted},
Scheduled: textTime{m.Scheduled}, LastEnded: textTime{m.LastEnded},
ScheduledTs: stampTime{m.Scheduled}, LastEndedTs: stampTime{m.LastEnded},
Upstream: m.Upstream, Scheduled: textTime{m.Scheduled},
Size: m.Size, ScheduledTs: stampTime{m.Scheduled},
Upstream: m.Upstream,
Size: m.Size,
} }
} }

查看文件

@@ -15,16 +15,18 @@ func TestStatus(t *testing.T) {
So(err, ShouldBeNil) So(err, ShouldBeNil)
t := time.Date(2016, time.April, 16, 23, 8, 10, 0, loc) t := time.Date(2016, time.April, 16, 23, 8, 10, 0, loc)
m := WebMirrorStatus{ m := WebMirrorStatus{
Name: "tunalinux", Name: "tunalinux",
Status: Success, Status: Success,
LastUpdate: textTime{t}, LastUpdate: textTime{t},
LastUpdateTs: stampTime{t}, LastUpdateTs: stampTime{t},
LastEnded: textTime{t}, LastStarted: textTime{t},
LastEndedTs: stampTime{t}, LastStartedTs: stampTime{t},
Scheduled: textTime{t}, LastEnded: textTime{t},
ScheduledTs: stampTime{t}, LastEndedTs: stampTime{t},
Size: "5GB", Scheduled: textTime{t},
Upstream: "rsync://mirrors.tuna.tsinghua.edu.cn/tunalinux/", ScheduledTs: stampTime{t},
Size: "5GB",
Upstream: "rsync://mirrors.tuna.tsinghua.edu.cn/tunalinux/",
} }
b, err := json.Marshal(m) b, err := json.Marshal(m)
@@ -40,6 +42,10 @@ func TestStatus(t *testing.T) {
So(m2.LastUpdateTs.Unix(), ShouldEqual, m.LastUpdate.Unix()) So(m2.LastUpdateTs.Unix(), ShouldEqual, m.LastUpdate.Unix())
So(m2.LastUpdate.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano()) So(m2.LastUpdate.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
So(m2.LastUpdateTs.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano()) So(m2.LastUpdateTs.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
So(m2.LastStarted.Unix(), ShouldEqual, m.LastStarted.Unix())
So(m2.LastStartedTs.Unix(), ShouldEqual, m.LastStarted.Unix())
So(m2.LastStarted.UnixNano(), ShouldEqual, m.LastStarted.UnixNano())
So(m2.LastStartedTs.UnixNano(), ShouldEqual, m.LastStarted.UnixNano())
So(m2.LastEnded.Unix(), ShouldEqual, m.LastEnded.Unix()) So(m2.LastEnded.Unix(), ShouldEqual, m.LastEnded.Unix())
So(m2.LastEndedTs.Unix(), ShouldEqual, m.LastEnded.Unix()) So(m2.LastEndedTs.Unix(), ShouldEqual, m.LastEnded.Unix())
So(m2.LastEnded.UnixNano(), ShouldEqual, m.LastEnded.UnixNano()) So(m2.LastEnded.UnixNano(), ShouldEqual, m.LastEnded.UnixNano())
@@ -53,15 +59,16 @@ func TestStatus(t *testing.T) {
}) })
Convey("BuildWebMirrorStatus should work", t, func() { Convey("BuildWebMirrorStatus should work", t, func() {
m := MirrorStatus{ m := MirrorStatus{
Name: "arch-sync3", Name: "arch-sync3",
Worker: "testWorker", Worker: "testWorker",
IsMaster: true, IsMaster: true,
Status: Failed, Status: Failed,
LastUpdate: time.Now().Add(-time.Minute * 30), LastUpdate: time.Now().Add(-time.Minute * 30),
LastEnded: time.Now(), LastStarted: time.Now().Add(-time.Minute * 1),
Scheduled: time.Now().Add(time.Minute * 5), LastEnded: time.Now(),
Upstream: "mirrors.tuna.tsinghua.edu.cn", Scheduled: time.Now().Add(time.Minute * 5),
Size: "4GB", Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "4GB",
} }
var m2 WebMirrorStatus var m2 WebMirrorStatus
@@ -73,6 +80,10 @@ func TestStatus(t *testing.T) {
So(m2.LastUpdateTs.Unix(), ShouldEqual, m.LastUpdate.Unix()) So(m2.LastUpdateTs.Unix(), ShouldEqual, m.LastUpdate.Unix())
So(m2.LastUpdate.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano()) So(m2.LastUpdate.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
So(m2.LastUpdateTs.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano()) So(m2.LastUpdateTs.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
So(m2.LastStarted.Unix(), ShouldEqual, m.LastStarted.Unix())
So(m2.LastStartedTs.Unix(), ShouldEqual, m.LastStarted.Unix())
So(m2.LastStarted.UnixNano(), ShouldEqual, m.LastStarted.UnixNano())
So(m2.LastStartedTs.UnixNano(), ShouldEqual, m.LastStarted.UnixNano())
So(m2.LastEnded.Unix(), ShouldEqual, m.LastEnded.Unix()) So(m2.LastEnded.Unix(), ShouldEqual, m.LastEnded.Unix())
So(m2.LastEndedTs.Unix(), ShouldEqual, m.LastEnded.Unix()) So(m2.LastEndedTs.Unix(), ShouldEqual, m.LastEnded.Unix())
So(m2.LastEnded.UnixNano(), ShouldEqual, m.LastEnded.UnixNano()) So(m2.LastEnded.UnixNano(), ShouldEqual, m.LastEnded.UnixNano())

查看文件

@@ -1,4 +1,4 @@
package internal package internal
// Version of the program // Version of the program
const Version string = "0.6.0" const Version string = "0.6.7"

查看文件

@@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"strings" "strings"
"time"
"github.com/boltdb/bolt" "github.com/boltdb/bolt"
@@ -16,6 +17,7 @@ type dbAdapter interface {
GetWorker(workerID string) (WorkerStatus, error) GetWorker(workerID string) (WorkerStatus, error)
DeleteWorker(workerID string) error DeleteWorker(workerID string) error
CreateWorker(w WorkerStatus) (WorkerStatus, error) CreateWorker(w WorkerStatus) (WorkerStatus, error)
RefreshWorker(workerID string) (WorkerStatus, error)
UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error)
GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error) GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error)
ListMirrorStatus(workerID string) ([]MirrorStatus, error) ListMirrorStatus(workerID string) ([]MirrorStatus, error)
@@ -26,7 +28,9 @@ type dbAdapter interface {
func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) { func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) {
if dbType == "bolt" { if dbType == "bolt" {
innerDB, err := bolt.Open(dbFile, 0600, nil) innerDB, err := bolt.Open(dbFile, 0600, &bolt.Options{
Timeout: 5 * time.Second,
})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -122,6 +126,15 @@ func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
return w, err return w, err
} }
func (b *boltAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) {
w, err = b.GetWorker(workerID)
if err == nil {
w.LastOnline = time.Now()
w, err = b.CreateWorker(w)
}
return w, err
}
func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) { func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
id := mirrorID + "/" + workerID id := mirrorID + "/" + workerID
err := b.db.Update(func(tx *bolt.Tx) error { err := b.db.Update(func(tx *bolt.Tx) error {

查看文件

@@ -35,6 +35,7 @@ func TestBoltAdapter(t *testing.T) {
ID: id, ID: id,
Token: "token_" + id, Token: "token_" + id,
LastOnline: time.Now(), LastOnline: time.Now(),
LastRegister: time.Now(),
} }
w, err = boltDB.CreateWorker(w) w, err = boltDB.CreateWorker(w)
So(err, ShouldBeNil) So(err, ShouldBeNil)
@@ -78,34 +79,37 @@ func TestBoltAdapter(t *testing.T) {
Convey("update mirror status", func() { Convey("update mirror status", func() {
status := []MirrorStatus{ status := []MirrorStatus{
MirrorStatus{ MirrorStatus{
Name: "arch-sync1", Name: "arch-sync1",
Worker: testWorkerIDs[0], Worker: testWorkerIDs[0],
IsMaster: true, IsMaster: true,
Status: Success, Status: Success,
LastUpdate: time.Now(), LastUpdate: time.Now(),
LastEnded: time.Now(), LastStarted: time.Now().Add(-time.Minute),
Upstream: "mirrors.tuna.tsinghua.edu.cn", LastEnded: time.Now(),
Size: "3GB", Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "3GB",
}, },
MirrorStatus{ MirrorStatus{
Name: "arch-sync2", Name: "arch-sync2",
Worker: testWorkerIDs[1], Worker: testWorkerIDs[1],
IsMaster: true, IsMaster: true,
Status: Disabled, Status: Disabled,
LastUpdate: time.Now().Add(-time.Hour), LastUpdate: time.Now().Add(-time.Hour),
LastEnded: time.Now(), LastStarted: time.Now().Add(-time.Minute),
Upstream: "mirrors.tuna.tsinghua.edu.cn", LastEnded: time.Now(),
Size: "4GB", Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "4GB",
}, },
MirrorStatus{ MirrorStatus{
Name: "arch-sync3", Name: "arch-sync3",
Worker: testWorkerIDs[1], Worker: testWorkerIDs[1],
IsMaster: true, IsMaster: true,
Status: Success, Status: Success,
LastUpdate: time.Now().Add(-time.Second), LastUpdate: time.Now().Add(-time.Minute),
LastEnded: time.Now(), LastStarted: time.Now().Add(-time.Second),
Upstream: "mirrors.tuna.tsinghua.edu.cn", LastEnded: time.Now(),
Size: "4GB", Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "4GB",
}, },
} }

查看文件

@@ -4,6 +4,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"net/http" "net/http"
"sync"
"time" "time"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
@@ -23,6 +24,7 @@ type Manager struct {
cfg *Config cfg *Config
engine *gin.Engine engine *gin.Engine
adapter dbAdapter adapter dbAdapter
rwmu sync.RWMutex
httpClient *http.Client httpClient *http.Client
} }
@@ -127,9 +129,11 @@ func (s *Manager) Run() {
} }
} }
// listAllJobs repond with all jobs of specified workers // listAllJobs respond with all jobs of specified workers
func (s *Manager) listAllJobs(c *gin.Context) { func (s *Manager) listAllJobs(c *gin.Context) {
s.rwmu.RLock()
mirrorStatusList, err := s.adapter.ListAllMirrorStatus() mirrorStatusList, err := s.adapter.ListAllMirrorStatus()
s.rwmu.RUnlock()
if err != nil { if err != nil {
err := fmt.Errorf("failed to list all mirror status: %s", err := fmt.Errorf("failed to list all mirror status: %s",
err.Error(), err.Error(),
@@ -150,7 +154,9 @@ func (s *Manager) listAllJobs(c *gin.Context) {
// flushDisabledJobs deletes all jobs that marks as deleted // flushDisabledJobs deletes all jobs that marks as deleted
func (s *Manager) flushDisabledJobs(c *gin.Context) { func (s *Manager) flushDisabledJobs(c *gin.Context) {
s.rwmu.Lock()
err := s.adapter.FlushDisabledJobs() err := s.adapter.FlushDisabledJobs()
s.rwmu.Unlock()
if err != nil { if err != nil {
err := fmt.Errorf("failed to flush disabled jobs: %s", err := fmt.Errorf("failed to flush disabled jobs: %s",
err.Error(), err.Error(),
@@ -165,7 +171,9 @@ func (s *Manager) flushDisabledJobs(c *gin.Context) {
// deleteWorker deletes one worker by id // deleteWorker deletes one worker by id
func (s *Manager) deleteWorker(c *gin.Context) { func (s *Manager) deleteWorker(c *gin.Context) {
workerID := c.Param("id") workerID := c.Param("id")
s.rwmu.Lock()
err := s.adapter.DeleteWorker(workerID) err := s.adapter.DeleteWorker(workerID)
s.rwmu.Unlock()
if err != nil { if err != nil {
err := fmt.Errorf("failed to delete worker: %s", err := fmt.Errorf("failed to delete worker: %s",
err.Error(), err.Error(),
@@ -178,10 +186,12 @@ func (s *Manager) deleteWorker(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{_infoKey: "deleted"}) c.JSON(http.StatusOK, gin.H{_infoKey: "deleted"})
} }
// listWrokers respond with informations of all the workers // listWorkers respond with information of all the workers
func (s *Manager) listWorkers(c *gin.Context) { func (s *Manager) listWorkers(c *gin.Context) {
var workerInfos []WorkerStatus var workerInfos []WorkerStatus
s.rwmu.RLock()
workers, err := s.adapter.ListWorkers() workers, err := s.adapter.ListWorkers()
s.rwmu.RUnlock()
if err != nil { if err != nil {
err := fmt.Errorf("failed to list workers: %s", err := fmt.Errorf("failed to list workers: %s",
err.Error(), err.Error(),
@@ -193,8 +203,9 @@ func (s *Manager) listWorkers(c *gin.Context) {
for _, w := range workers { for _, w := range workers {
workerInfos = append(workerInfos, workerInfos = append(workerInfos,
WorkerStatus{ WorkerStatus{
ID: w.ID, ID: w.ID,
LastOnline: w.LastOnline, LastOnline: w.LastOnline,
LastRegister: w.LastRegister,
}) })
} }
c.JSON(http.StatusOK, workerInfos) c.JSON(http.StatusOK, workerInfos)
@@ -205,6 +216,7 @@ func (s *Manager) registerWorker(c *gin.Context) {
var _worker WorkerStatus var _worker WorkerStatus
c.BindJSON(&_worker) c.BindJSON(&_worker)
_worker.LastOnline = time.Now() _worker.LastOnline = time.Now()
_worker.LastRegister = time.Now()
newWorker, err := s.adapter.CreateWorker(_worker) newWorker, err := s.adapter.CreateWorker(_worker)
if err != nil { if err != nil {
err := fmt.Errorf("failed to register worker: %s", err := fmt.Errorf("failed to register worker: %s",
@@ -223,7 +235,9 @@ func (s *Manager) registerWorker(c *gin.Context) {
// listJobsOfWorker respond with all the jobs of the specified worker // listJobsOfWorker respond with all the jobs of the specified worker
func (s *Manager) listJobsOfWorker(c *gin.Context) { func (s *Manager) listJobsOfWorker(c *gin.Context) {
workerID := c.Param("id") workerID := c.Param("id")
s.rwmu.RLock()
mirrorStatusList, err := s.adapter.ListMirrorStatus(workerID) mirrorStatusList, err := s.adapter.ListMirrorStatus(workerID)
s.rwmu.RUnlock()
if err != nil { if err != nil {
err := fmt.Errorf("failed to list jobs of worker %s: %s", err := fmt.Errorf("failed to list jobs of worker %s: %s",
workerID, err.Error(), workerID, err.Error(),
@@ -255,7 +269,10 @@ func (s *Manager) updateSchedulesOfWorker(c *gin.Context) {
) )
} }
s.rwmu.RLock()
s.adapter.RefreshWorker(workerID)
curStatus, err := s.adapter.GetMirrorStatus(workerID, mirrorName) curStatus, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
s.rwmu.RUnlock()
if err != nil { if err != nil {
fmt.Errorf("failed to get job %s of worker %s: %s", fmt.Errorf("failed to get job %s of worker %s: %s",
mirrorName, workerID, err.Error(), mirrorName, workerID, err.Error(),
@@ -269,7 +286,9 @@ func (s *Manager) updateSchedulesOfWorker(c *gin.Context) {
} }
curStatus.Scheduled = schedule.NextSchedule curStatus.Scheduled = schedule.NextSchedule
s.rwmu.Lock()
_, err = s.adapter.UpdateMirrorStatus(workerID, mirrorName, curStatus) _, err = s.adapter.UpdateMirrorStatus(workerID, mirrorName, curStatus)
s.rwmu.Unlock()
if err != nil { if err != nil {
err := fmt.Errorf("failed to update job %s of worker %s: %s", err := fmt.Errorf("failed to update job %s of worker %s: %s",
mirrorName, workerID, err.Error(), mirrorName, workerID, err.Error(),
@@ -295,16 +314,26 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) {
) )
} }
s.rwmu.RLock()
s.adapter.RefreshWorker(workerID)
curStatus, _ := s.adapter.GetMirrorStatus(workerID, mirrorName) curStatus, _ := s.adapter.GetMirrorStatus(workerID, mirrorName)
s.rwmu.RUnlock()
curTime := time.Now()
if status.Status == PreSyncing && curStatus.Status != PreSyncing {
status.LastStarted = curTime
} else {
status.LastStarted = curStatus.LastStarted
}
// Only successful syncing needs last_update // Only successful syncing needs last_update
if status.Status == Success { if status.Status == Success {
status.LastUpdate = time.Now() status.LastUpdate = curTime
} else { } else {
status.LastUpdate = curStatus.LastUpdate status.LastUpdate = curStatus.LastUpdate
} }
if status.Status == Success || status.Status == Failed { if status.Status == Success || status.Status == Failed {
status.LastEnded = time.Now() status.LastEnded = curTime
} else { } else {
status.LastEnded = curStatus.LastEnded status.LastEnded = curStatus.LastEnded
} }
@@ -324,7 +353,9 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) {
logger.Noticef("Job [%s] @<%s> %s", status.Name, status.Worker, status.Status) logger.Noticef("Job [%s] @<%s> %s", status.Name, status.Worker, status.Status)
} }
s.rwmu.Lock()
newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status) newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
s.rwmu.Unlock()
if err != nil { if err != nil {
err := fmt.Errorf("failed to update job %s of worker %s: %s", err := fmt.Errorf("failed to update job %s of worker %s: %s",
mirrorName, workerID, err.Error(), mirrorName, workerID, err.Error(),
@@ -346,7 +377,10 @@ func (s *Manager) updateMirrorSize(c *gin.Context) {
c.BindJSON(&msg) c.BindJSON(&msg)
mirrorName := msg.Name mirrorName := msg.Name
s.rwmu.RLock()
s.adapter.RefreshWorker(workerID)
status, err := s.adapter.GetMirrorStatus(workerID, mirrorName) status, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
s.rwmu.RUnlock()
if err != nil { if err != nil {
logger.Errorf( logger.Errorf(
"Failed to get status of mirror %s @<%s>: %s", "Failed to get status of mirror %s @<%s>: %s",
@@ -363,7 +397,9 @@ func (s *Manager) updateMirrorSize(c *gin.Context) {
logger.Noticef("Mirror size of [%s] @<%s>: %s", status.Name, status.Worker, status.Size) logger.Noticef("Mirror size of [%s] @<%s>: %s", status.Name, status.Worker, status.Size)
s.rwmu.Lock()
newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status) newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
s.rwmu.Unlock()
if err != nil { if err != nil {
err := fmt.Errorf("failed to update job %s of worker %s: %s", err := fmt.Errorf("failed to update job %s of worker %s: %s",
mirrorName, workerID, err.Error(), mirrorName, workerID, err.Error(),
@@ -386,7 +422,9 @@ func (s *Manager) handleClientCmd(c *gin.Context) {
return return
} }
s.rwmu.RLock()
w, err := s.adapter.GetWorker(workerID) w, err := s.adapter.GetWorker(workerID)
s.rwmu.RUnlock()
if err != nil { if err != nil {
err := fmt.Errorf("worker %s is not registered yet", workerID) err := fmt.Errorf("worker %s is not registered yet", workerID)
s.returnErrJSON(c, http.StatusBadRequest, err) s.returnErrJSON(c, http.StatusBadRequest, err)
@@ -403,7 +441,9 @@ func (s *Manager) handleClientCmd(c *gin.Context) {
// update job status, even if the job did not disable successfully, // update job status, even if the job did not disable successfully,
// this status should be set as disabled // this status should be set as disabled
s.rwmu.RLock()
curStat, _ := s.adapter.GetMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID) curStat, _ := s.adapter.GetMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID)
s.rwmu.RUnlock()
changed := false changed := false
switch clientCmd.Cmd { switch clientCmd.Cmd {
case CmdDisable: case CmdDisable:
@@ -414,7 +454,9 @@ func (s *Manager) handleClientCmd(c *gin.Context) {
changed = true changed = true
} }
if changed { if changed {
s.rwmu.Lock()
s.adapter.UpdateMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID, curStat) s.adapter.UpdateMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID, curStat)
s.rwmu.Unlock()
} }
logger.Noticef("Posting command '%s %s' to <%s>", clientCmd.Cmd, clientCmd.MirrorID, clientCmd.WorkerID) logger.Noticef("Posting command '%s %s' to <%s>", clientCmd.Cmd, clientCmd.MirrorID, clientCmd.WorkerID)

查看文件

@@ -7,6 +7,7 @@ import (
"math/rand" "math/rand"
"net/http" "net/http"
"strings" "strings"
"sync/atomic"
"testing" "testing"
"time" "time"
@@ -64,6 +65,34 @@ func TestHTTPServer(t *testing.T) {
So(msg[_errorKey], ShouldEqual, fmt.Sprintf("failed to list jobs of worker %s: %s", _magicBadWorkerID, "database fail")) So(msg[_errorKey], ShouldEqual, fmt.Sprintf("failed to list jobs of worker %s: %s", _magicBadWorkerID, "database fail"))
}) })
Convey("when register multiple workers", func(ctx C) {
N := 10
var cnt uint32
for i := 0; i < N; i++ {
go func(id int) {
w := WorkerStatus{
ID: fmt.Sprintf("worker%d", id),
}
resp, err := PostJSON(baseURL+"/workers", w, nil)
ctx.So(err, ShouldBeNil)
ctx.So(resp.StatusCode, ShouldEqual, http.StatusOK)
atomic.AddUint32(&cnt, 1)
}(i)
}
time.Sleep(2 * time.Second)
So(cnt, ShouldEqual, N)
Convey("list all workers", func(ctx C) {
resp, err := http.Get(baseURL + "/workers")
So(err, ShouldBeNil)
defer resp.Body.Close()
var actualResponseObj []WorkerStatus
err = json.NewDecoder(resp.Body).Decode(&actualResponseObj)
So(err, ShouldBeNil)
So(len(actualResponseObj), ShouldEqual, N+1)
})
})
Convey("when register a worker", func(ctx C) { Convey("when register a worker", func(ctx C) {
w := WorkerStatus{ w := WorkerStatus{
ID: "test_worker1", ID: "test_worker1",
@@ -151,10 +180,41 @@ func TestHTTPServer(t *testing.T) {
So(m.Size, ShouldEqual, status.Size) So(m.Size, ShouldEqual, status.Size)
So(m.IsMaster, ShouldEqual, status.IsMaster) So(m.IsMaster, ShouldEqual, status.IsMaster)
So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second) So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second)
So(m.LastStarted.IsZero(), ShouldBeTrue) // hasn't been initialized yet
So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second) So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
}) })
// start syncing
status.Status = PreSyncing
time.Sleep(1 * time.Second)
resp, err = PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
So(err, ShouldBeNil)
defer resp.Body.Close()
So(resp.StatusCode, ShouldEqual, http.StatusOK)
Convey("update mirror status to PreSync - starting sync", func(ctx C) {
var ms []MirrorStatus
resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK)
// err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
m := ms[0]
So(m.Name, ShouldEqual, status.Name)
So(m.Worker, ShouldEqual, status.Worker)
So(m.Status, ShouldEqual, status.Status)
So(m.Upstream, ShouldEqual, status.Upstream)
So(m.Size, ShouldEqual, status.Size)
So(m.IsMaster, ShouldEqual, status.IsMaster)
So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 3*time.Second)
So(time.Now().Sub(m.LastUpdate), ShouldBeGreaterThan, 1*time.Second)
So(time.Now().Sub(m.LastStarted), ShouldBeLessThan, 2*time.Second)
So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 3*time.Second)
So(time.Now().Sub(m.LastEnded), ShouldBeGreaterThan, 1*time.Second)
})
Convey("list all job status of all workers", func(ctx C) { Convey("list all job status of all workers", func(ctx C) {
var ms []WebMirrorStatus var ms []WebMirrorStatus
resp, err := GetJSON(baseURL+"/jobs", &ms, nil) resp, err := GetJSON(baseURL+"/jobs", &ms, nil)
@@ -167,8 +227,9 @@ func TestHTTPServer(t *testing.T) {
So(m.Upstream, ShouldEqual, status.Upstream) So(m.Upstream, ShouldEqual, status.Upstream)
So(m.Size, ShouldEqual, status.Size) So(m.Size, ShouldEqual, status.Size)
So(m.IsMaster, ShouldEqual, status.IsMaster) So(m.IsMaster, ShouldEqual, status.IsMaster)
So(time.Now().Sub(m.LastUpdate.Time), ShouldBeLessThan, 1*time.Second) So(time.Now().Sub(m.LastUpdate.Time), ShouldBeLessThan, 3*time.Second)
So(time.Now().Sub(m.LastEnded.Time), ShouldBeLessThan, 1*time.Second) So(time.Now().Sub(m.LastStarted.Time), ShouldBeLessThan, 2*time.Second)
So(time.Now().Sub(m.LastEnded.Time), ShouldBeLessThan, 3*time.Second)
}) })
@@ -197,8 +258,9 @@ func TestHTTPServer(t *testing.T) {
So(m.Upstream, ShouldEqual, status.Upstream) So(m.Upstream, ShouldEqual, status.Upstream)
So(m.Size, ShouldEqual, "5GB") So(m.Size, ShouldEqual, "5GB")
So(m.IsMaster, ShouldEqual, status.IsMaster) So(m.IsMaster, ShouldEqual, status.IsMaster)
So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second) So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 3*time.Second)
So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second) So(time.Now().Sub(m.LastStarted), ShouldBeLessThan, 2*time.Second)
So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 3*time.Second)
}) })
}) })
@@ -251,6 +313,7 @@ func TestHTTPServer(t *testing.T) {
So(m.Size, ShouldEqual, status.Size) So(m.Size, ShouldEqual, status.Size)
So(m.IsMaster, ShouldEqual, status.IsMaster) So(m.IsMaster, ShouldEqual, status.IsMaster)
So(time.Now().Sub(m.LastUpdate), ShouldBeGreaterThan, 3*time.Second) So(time.Now().Sub(m.LastUpdate), ShouldBeGreaterThan, 3*time.Second)
So(time.Now().Sub(m.LastStarted), ShouldBeGreaterThan, 3*time.Second)
So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second) So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
}) })
}) })
@@ -258,14 +321,15 @@ func TestHTTPServer(t *testing.T) {
Convey("update mirror status of an inexisted worker", func(ctx C) { Convey("update mirror status of an inexisted worker", func(ctx C) {
invalidWorker := "test_worker2" invalidWorker := "test_worker2"
status := MirrorStatus{ status := MirrorStatus{
Name: "arch-sync2", Name: "arch-sync2",
Worker: invalidWorker, Worker: invalidWorker,
IsMaster: true, IsMaster: true,
Status: Success, Status: Success,
LastUpdate: time.Now(), LastUpdate: time.Now(),
LastEnded: time.Now(), LastStarted: time.Now(),
Upstream: "mirrors.tuna.tsinghua.edu.cn", LastEnded: time.Now(),
Size: "4GB", Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "4GB",
} }
resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s",
baseURL, status.Worker, status.Name), status, nil) baseURL, status.Worker, status.Name), status, nil)
@@ -398,6 +462,15 @@ func (b *mockDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
return w, nil return w, nil
} }
func (b *mockDBAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) {
w, err = b.GetWorker(workerID)
if err == nil {
w.LastOnline = time.Now()
w, err = b.CreateWorker(w)
}
return w, err
}
func (b *mockDBAdapter) GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error) { func (b *mockDBAdapter) GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error) {
id := mirrorID + "/" + workerID id := mirrorID + "/" + workerID
status, ok := b.statusStore[id] status, ok := b.statusStore[id]

查看文件

@@ -16,6 +16,7 @@ type baseProvider struct {
name string name string
interval time.Duration interval time.Duration
retry int retry int
timeout time.Duration
isMaster bool isMaster bool
cmd *cmdJob cmd *cmdJob
@@ -56,6 +57,10 @@ func (p *baseProvider) Retry() int {
return p.retry return p.retry
} }
func (p *baseProvider) Timeout() time.Duration {
return p.timeout
}
func (p *baseProvider) IsMaster() bool { func (p *baseProvider) IsMaster() bool {
return p.isMaster return p.isMaster
} }
@@ -142,7 +147,7 @@ func (p *baseProvider) closeLogFile() (err error) {
return return
} }
func (p *baseProvider) Run() error { func (p *baseProvider) Run(started chan empty) error {
panic("Not Implemented") panic("Not Implemented")
} }
@@ -169,6 +174,7 @@ func (p *baseProvider) Terminate() error {
defer p.Unlock() defer p.Unlock()
logger.Debugf("terminating provider: %s", p.Name()) logger.Debugf("terminating provider: %s", p.Name())
if !p.IsRunning() { if !p.IsRunning() {
logger.Warningf("Terminate() called while IsRunning is false: %s", p.Name())
return nil return nil
} }

查看文件

@@ -83,7 +83,7 @@ sleep 30
So(err, ShouldBeNil) So(err, ShouldBeNil)
go func() { go func() {
err = provider.Run() err := provider.Run(make(chan empty, 1))
ctx.So(err, ShouldNotBeNil) ctx.So(err, ShouldNotBeNil)
}() }()

查看文件

@@ -16,6 +16,7 @@ type cmdConfig struct {
workingDir, logDir, logFile string workingDir, logDir, logFile string
interval time.Duration interval time.Duration
retry int retry int
timeout time.Duration
env map[string]string env map[string]string
failOnMatch string failOnMatch string
sizePattern string sizePattern string
@@ -41,6 +42,7 @@ func newCmdProvider(c cmdConfig) (*cmdProvider, error) {
ctx: NewContext(), ctx: NewContext(),
interval: c.interval, interval: c.interval,
retry: c.retry, retry: c.retry,
timeout: c.timeout,
}, },
cmdConfig: c, cmdConfig: c,
} }
@@ -86,12 +88,13 @@ func (p *cmdProvider) DataSize() string {
return p.dataSize return p.dataSize
} }
func (p *cmdProvider) Run() error { func (p *cmdProvider) Run(started chan empty) error {
p.dataSize = "" p.dataSize = ""
defer p.closeLogFile() defer p.closeLogFile()
if err := p.Start(); err != nil { if err := p.Start(); err != nil {
return err return err
} }
started <- empty{}
if err := p.Wait(); err != nil { if err := p.Wait(); err != nil {
return err return err
} }
@@ -139,5 +142,6 @@ func (p *cmdProvider) Start() error {
return err return err
} }
p.isRunning.Store(true) p.isRunning.Store(true)
logger.Debugf("set isRunning to true: %s", p.Name())
return nil return nil
} }

查看文件

@@ -1,6 +1,6 @@
package worker package worker
// put global viables and types here // put global variables and types here
import ( import (
"gopkg.in/op/go-logging.v1" "gopkg.in/op/go-logging.v1"

查看文件

@@ -53,6 +53,7 @@ type globalConfig struct {
Concurrent int `toml:"concurrent"` Concurrent int `toml:"concurrent"`
Interval int `toml:"interval"` Interval int `toml:"interval"`
Retry int `toml:"retry"` Retry int `toml:"retry"`
Timeout int `toml:"timeout"`
ExecOnSuccess []string `toml:"exec_on_success"` ExecOnSuccess []string `toml:"exec_on_success"`
ExecOnFailure []string `toml:"exec_on_failure"` ExecOnFailure []string `toml:"exec_on_failure"`
@@ -118,6 +119,7 @@ type mirrorConfig struct {
Upstream string `toml:"upstream"` Upstream string `toml:"upstream"`
Interval int `toml:"interval"` Interval int `toml:"interval"`
Retry int `toml:"retry"` Retry int `toml:"retry"`
Timeout int `toml:"timeout"`
MirrorDir string `toml:"mirror_dir"` MirrorDir string `toml:"mirror_dir"`
MirrorSubDir string `toml:"mirror_subdir"` MirrorSubDir string `toml:"mirror_subdir"`
LogDir string `toml:"log_dir"` LogDir string `toml:"log_dir"`
@@ -140,6 +142,8 @@ type mirrorConfig struct {
ExcludeFile string `toml:"exclude_file"` ExcludeFile string `toml:"exclude_file"`
Username string `toml:"username"` Username string `toml:"username"`
Password string `toml:"password"` Password string `toml:"password"`
RsyncNoTimeo bool `toml:"rsync_no_timeout"`
RsyncTimeout int `toml:"rsync_timeout"`
RsyncOptions []string `toml:"rsync_options"` RsyncOptions []string `toml:"rsync_options"`
RsyncOverride []string `toml:"rsync_override"` RsyncOverride []string `toml:"rsync_override"`
Stage1Profile string `toml:"stage1_profile"` Stage1Profile string `toml:"stage1_profile"`

查看文件

@@ -6,6 +6,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"testing" "testing"
"time"
. "github.com/smartystreets/goconvey/convey" . "github.com/smartystreets/goconvey/convey"
) )
@@ -19,6 +20,7 @@ mirror_dir = "/data/mirrors"
concurrent = 10 concurrent = 10
interval = 240 interval = 240
retry = 3 retry = 3
timeout = 86400
[manager] [manager]
api_base = "https://127.0.0.1:5000" api_base = "https://127.0.0.1:5000"
@@ -37,6 +39,7 @@ provider = "command"
upstream = "https://aosp.google.com/" upstream = "https://aosp.google.com/"
interval = 720 interval = 720
retry = 2 retry = 2
timeout = 3600
mirror_dir = "/data/git/AOSP" mirror_dir = "/data/git/AOSP"
exec_on_success = [ exec_on_success = [
"bash -c 'echo ${TUNASYNC_JOB_EXIT_STATUS} > ${TUNASYNC_WORKING_DIR}/exit_status'" "bash -c 'echo ${TUNASYNC_JOB_EXIT_STATUS} > ${TUNASYNC_WORKING_DIR}/exit_status'"
@@ -119,6 +122,7 @@ use_ipv6 = true
So(cfg.Global.Name, ShouldEqual, "test_worker") So(cfg.Global.Name, ShouldEqual, "test_worker")
So(cfg.Global.Interval, ShouldEqual, 240) So(cfg.Global.Interval, ShouldEqual, 240)
So(cfg.Global.Retry, ShouldEqual, 3) So(cfg.Global.Retry, ShouldEqual, 3)
So(cfg.Global.Timeout, ShouldEqual, 86400)
So(cfg.Global.MirrorDir, ShouldEqual, "/data/mirrors") So(cfg.Global.MirrorDir, ShouldEqual, "/data/mirrors")
So(cfg.Manager.APIBase, ShouldEqual, "https://127.0.0.1:5000") So(cfg.Manager.APIBase, ShouldEqual, "https://127.0.0.1:5000")
@@ -130,6 +134,7 @@ use_ipv6 = true
So(m.Provider, ShouldEqual, provCommand) So(m.Provider, ShouldEqual, provCommand)
So(m.Interval, ShouldEqual, 720) So(m.Interval, ShouldEqual, 720)
So(m.Retry, ShouldEqual, 2) So(m.Retry, ShouldEqual, 2)
So(m.Timeout, ShouldEqual, 3600)
So(m.Env["REPO"], ShouldEqual, "/usr/local/bin/aosp-repo") So(m.Env["REPO"], ShouldEqual, "/usr/local/bin/aosp-repo")
m = cfg.Mirrors[1] m = cfg.Mirrors[1]
@@ -316,6 +321,7 @@ log_dir = "/var/log/tunasync/{{.Name}}"
mirror_dir = "/data/mirrors" mirror_dir = "/data/mirrors"
concurrent = 10 concurrent = 10
interval = 240 interval = 240
timeout = 86400
retry = 3 retry = 3
[manager] [manager]
@@ -388,5 +394,6 @@ use_ipv6 = true
rp, ok := p.(*rsyncProvider) rp, ok := p.(*rsyncProvider)
So(ok, ShouldBeTrue) So(ok, ShouldBeTrue)
So(rp.WorkingDir(), ShouldEqual, "/data/mirrors/debian-cd") So(rp.WorkingDir(), ShouldEqual, "/data/mirrors/debian-cd")
So(p.Timeout(), ShouldEqual, 86400*time.Second)
}) })
} }

查看文件

@@ -3,6 +3,9 @@ package worker
import ( import (
"fmt" "fmt"
"os" "os"
"time"
"github.com/codeskyblue/go-sh"
) )
type dockerHook struct { type dockerHook struct {
@@ -16,6 +19,10 @@ func newDockerHook(p mirrorProvider, gCfg dockerConfig, mCfg mirrorConfig) *dock
volumes := []string{} volumes := []string{}
volumes = append(volumes, gCfg.Volumes...) volumes = append(volumes, gCfg.Volumes...)
volumes = append(volumes, mCfg.DockerVolumes...) volumes = append(volumes, mCfg.DockerVolumes...)
if len(mCfg.ExcludeFile) > 0 {
arg := fmt.Sprintf("%s:%s:ro", mCfg.ExcludeFile, mCfg.ExcludeFile)
volumes = append(volumes, arg)
}
options := []string{} options := []string{}
options = append(options, gCfg.Options...) options = append(options, gCfg.Options...)
@@ -60,6 +67,27 @@ func (d *dockerHook) postExec() error {
// sh.Command( // sh.Command(
// "docker", "rm", "-f", d.Name(), // "docker", "rm", "-f", d.Name(),
// ).Run() // ).Run()
name := d.Name()
retry := 10
for ; retry > 0; retry-- {
out, err := sh.Command(
"docker", "ps", "-a",
"--filter", "name=^"+name+"$",
"--format", "{{.Status}}",
).Output()
if err != nil {
logger.Errorf("docker ps failed: %v", err)
break
}
if len(out) == 0 {
break
}
logger.Debugf("container %s still exists: '%s'", name, string(out))
time.Sleep(1 * time.Second)
}
if retry == 0 {
logger.Warningf("container %s not removed automatically, next sync may fail", name)
}
d.provider.ExitContext() d.provider.ExitContext()
return nil return nil
} }

查看文件

@@ -87,29 +87,34 @@ sleep 20
cmdRun("docker", []string{"images"}) cmdRun("docker", []string{"images"})
exitedErr := make(chan error, 1) exitedErr := make(chan error, 1)
go func() { go func() {
err = provider.Run() err = provider.Run(make(chan empty, 1))
logger.Debugf("provider.Run() exited") logger.Debugf("provider.Run() exited")
if err != nil { if err != nil {
logger.Errorf("provider.Run() failed: %v", err) logger.Errorf("provider.Run() failed: %v", err)
} }
exitedErr <- err exitedErr <- err
}() }()
cmdRun("ps", []string{"aux"})
// Wait for docker running // Wait for docker running
time.Sleep(8 * time.Second) for wait := 0; wait < 8; wait++ {
names, err := getDockerByName(d.Name())
cmdRun("ps", []string{"aux"}) So(err, ShouldBeNil)
if names != "" {
break
}
time.Sleep(1 * time.Second)
}
// cmdRun("ps", []string{"aux"})
// assert container running // assert container running
names, err := getDockerByName(d.Name()) names, err := getDockerByName(d.Name())
So(err, ShouldBeNil) So(err, ShouldBeNil)
// So(names, ShouldEqual, d.Name()+"\n") So(names, ShouldEqual, d.Name()+"\n")
err = provider.Terminate() err = provider.Terminate()
// So(err, ShouldBeNil) So(err, ShouldBeNil)
cmdRun("ps", []string{"aux"}) // cmdRun("ps", []string{"aux"})
<-exitedErr <-exitedErr
// container should be terminated and removed // container should be terminated and removed

查看文件

@@ -155,24 +155,43 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
var syncErr error var syncErr error
syncDone := make(chan error, 1) syncDone := make(chan error, 1)
started := make(chan empty, 10) // we may receive "started" more than one time (e.g. two_stage_rsync)
go func() { go func() {
err := provider.Run() err := provider.Run(started)
syncDone <- err syncDone <- err
}() }()
select { // Wait until provider started or error happened
case err := <-syncDone:
logger.Errorf("failed to start provider %s: %s", m.Name(), err.Error())
syncDone <- err // it will be read again later
case <-started:
logger.Debug("provider started")
}
// Now terminating the provider is feasible
var termErr error
timeout := provider.Timeout()
if timeout <= 0 {
timeout = 100000 * time.Hour // never time out
}
select { select {
case syncErr = <-syncDone: case syncErr = <-syncDone:
logger.Debug("syncing done") logger.Debug("syncing done")
case <-time.After(timeout):
logger.Notice("provider timeout")
termErr = provider.Terminate()
syncErr = fmt.Errorf("%s timeout after %v", m.Name(), timeout)
case <-kill: case <-kill:
logger.Debug("received kill") logger.Debug("received kill")
stopASAP = true stopASAP = true
err := provider.Terminate() termErr = provider.Terminate()
if err != nil {
logger.Errorf("failed to terminate provider %s: %s", m.Name(), err.Error())
return err
}
syncErr = errors.New("killed by manager") syncErr = errors.New("killed by manager")
} }
if termErr != nil {
logger.Errorf("failed to terminate provider %s: %s", m.Name(), termErr.Error())
return termErr
}
// post-exec hooks // post-exec hooks
herr := runHooks(rHooks, func(h jobHook) error { return h.postExec() }, "post-exec") herr := runHooks(rHooks, func(h jobHook) error { return h.postExec() }, "post-exec")

查看文件

@@ -31,6 +31,7 @@ func TestMirrorJob(t *testing.T) {
logDir: tmpDir, logDir: tmpDir,
logFile: tmpFile, logFile: tmpFile,
interval: 1 * time.Second, interval: 1 * time.Second,
timeout: 7 * time.Second,
} }
provider, err := newCmdProvider(c) provider, err := newCmdProvider(c)
@@ -41,6 +42,7 @@ func TestMirrorJob(t *testing.T) {
So(provider.LogDir(), ShouldEqual, c.logDir) So(provider.LogDir(), ShouldEqual, c.logDir)
So(provider.LogFile(), ShouldEqual, c.logFile) So(provider.LogFile(), ShouldEqual, c.logFile)
So(provider.Interval(), ShouldEqual, c.interval) So(provider.Interval(), ShouldEqual, c.interval)
So(provider.Timeout(), ShouldEqual, c.timeout)
Convey("For a normal mirror job", func(ctx C) { Convey("For a normal mirror job", func(ctx C) {
scriptContent := `#!/bin/bash scriptContent := `#!/bin/bash
@@ -333,6 +335,66 @@ echo $TUNASYNC_WORKING_DIR
}) })
}) })
Convey("When a job timed out", func(ctx C) {
scriptContent := `#!/bin/bash
echo $TUNASYNC_WORKING_DIR
sleep 10
echo $TUNASYNC_WORKING_DIR
`
err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
So(err, ShouldBeNil)
managerChan := make(chan jobMessage, 10)
semaphore := make(chan empty, 1)
job := newMirrorJob(provider)
Convey("It should be automatically terminated", func(ctx C) {
go job.Run(managerChan, semaphore)
job.ctrlChan <- jobStart
time.Sleep(1 * time.Second)
msg := <-managerChan
So(msg.status, ShouldEqual, PreSyncing)
msg = <-managerChan
So(msg.status, ShouldEqual, Syncing)
job.ctrlChan <- jobStart // should be ignored
msg = <-managerChan
So(msg.status, ShouldEqual, Failed)
expectedOutput := fmt.Sprintf("%s\n", provider.WorkingDir())
loggedContent, err := ioutil.ReadFile(provider.LogFile())
So(err, ShouldBeNil)
So(string(loggedContent), ShouldEqual, expectedOutput)
job.ctrlChan <- jobDisable
<-job.disabled
})
Convey("It should be retried", func(ctx C) {
go job.Run(managerChan, semaphore)
job.ctrlChan <- jobStart
time.Sleep(1 * time.Second)
msg := <-managerChan
So(msg.status, ShouldEqual, PreSyncing)
for i := 0; i < defaultMaxRetry; i++ {
msg = <-managerChan
So(msg.status, ShouldEqual, Syncing)
job.ctrlChan <- jobStart // should be ignored
msg = <-managerChan
So(msg.status, ShouldEqual, Failed)
So(msg.msg, ShouldContainSubstring, "timeout after")
// re-schedule after last try
So(msg.schedule, ShouldEqual, i == defaultMaxRetry-1)
}
job.ctrlChan <- jobDisable
<-job.disabled
})
})
}) })
} }

查看文件

@@ -24,9 +24,9 @@ type mirrorProvider interface {
Type() providerEnum Type() providerEnum
// run mirror job in background // Start then Wait
Run() error Run(started chan empty) error
// run mirror job in background // Start the job
Start() error Start() error
// Wait job to finish // Wait job to finish
Wait() error Wait() error
@@ -46,6 +46,7 @@ type mirrorProvider interface {
Interval() time.Duration Interval() time.Duration
Retry() int Retry() int
Timeout() time.Duration
WorkingDir() string WorkingDir() string
LogDir() string LogDir() string
@@ -91,6 +92,9 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
if mirror.Retry == 0 { if mirror.Retry == 0 {
mirror.Retry = cfg.Global.Retry mirror.Retry = cfg.Global.Retry
} }
if mirror.Timeout == 0 {
mirror.Timeout = cfg.Global.Timeout
}
logDir = formatLogDir(logDir, mirror) logDir = formatLogDir(logDir, mirror)
// IsMaster // IsMaster
@@ -118,6 +122,7 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
logFile: filepath.Join(logDir, "latest.log"), logFile: filepath.Join(logDir, "latest.log"),
interval: time.Duration(mirror.Interval) * time.Minute, interval: time.Duration(mirror.Interval) * time.Minute,
retry: mirror.Retry, retry: mirror.Retry,
timeout: time.Duration(mirror.Timeout) * time.Second,
env: mirror.Env, env: mirror.Env,
} }
p, err := newCmdProvider(pc) p, err := newCmdProvider(pc)
@@ -135,6 +140,8 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
password: mirror.Password, password: mirror.Password,
excludeFile: mirror.ExcludeFile, excludeFile: mirror.ExcludeFile,
extraOptions: mirror.RsyncOptions, extraOptions: mirror.RsyncOptions,
rsyncNeverTimeout: mirror.RsyncNoTimeo,
rsyncTimeoutValue: mirror.RsyncTimeout,
overriddenOptions: mirror.RsyncOverride, overriddenOptions: mirror.RsyncOverride,
rsyncEnv: mirror.Env, rsyncEnv: mirror.Env,
workingDir: mirrorDir, workingDir: mirrorDir,
@@ -144,6 +151,7 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
useIPv4: mirror.UseIPv4, useIPv4: mirror.UseIPv4,
interval: time.Duration(mirror.Interval) * time.Minute, interval: time.Duration(mirror.Interval) * time.Minute,
retry: mirror.Retry, retry: mirror.Retry,
timeout: time.Duration(mirror.Timeout) * time.Second,
} }
p, err := newRsyncProvider(rc) p, err := newRsyncProvider(rc)
if err != nil { if err != nil {
@@ -153,21 +161,24 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
provider = p provider = p
case provTwoStageRsync: case provTwoStageRsync:
rc := twoStageRsyncConfig{ rc := twoStageRsyncConfig{
name: mirror.Name, name: mirror.Name,
stage1Profile: mirror.Stage1Profile, stage1Profile: mirror.Stage1Profile,
upstreamURL: mirror.Upstream, upstreamURL: mirror.Upstream,
rsyncCmd: mirror.Command, rsyncCmd: mirror.Command,
username: mirror.Username, username: mirror.Username,
password: mirror.Password, password: mirror.Password,
excludeFile: mirror.ExcludeFile, excludeFile: mirror.ExcludeFile,
extraOptions: mirror.RsyncOptions, extraOptions: mirror.RsyncOptions,
rsyncEnv: mirror.Env, rsyncNeverTimeout: mirror.RsyncNoTimeo,
workingDir: mirrorDir, rsyncTimeoutValue: mirror.RsyncTimeout,
logDir: logDir, rsyncEnv: mirror.Env,
logFile: filepath.Join(logDir, "latest.log"), workingDir: mirrorDir,
useIPv6: mirror.UseIPv6, logDir: logDir,
interval: time.Duration(mirror.Interval) * time.Minute, logFile: filepath.Join(logDir, "latest.log"),
retry: mirror.Retry, useIPv6: mirror.UseIPv6,
interval: time.Duration(mirror.Interval) * time.Minute,
retry: mirror.Retry,
timeout: time.Duration(mirror.Timeout) * time.Second,
} }
p, err := newTwoStageRsyncProvider(rc) p, err := newTwoStageRsyncProvider(rc)
if err != nil { if err != nil {

查看文件

@@ -28,6 +28,7 @@ func TestRsyncProvider(t *testing.T) {
logDir: tmpDir, logDir: tmpDir,
logFile: tmpFile, logFile: tmpFile,
useIPv6: true, useIPv6: true,
timeout: 100 * time.Second,
interval: 600 * time.Second, interval: 600 * time.Second,
} }
@@ -40,6 +41,7 @@ func TestRsyncProvider(t *testing.T) {
So(provider.LogDir(), ShouldEqual, c.logDir) So(provider.LogDir(), ShouldEqual, c.logDir)
So(provider.LogFile(), ShouldEqual, c.logFile) So(provider.LogFile(), ShouldEqual, c.logFile)
So(provider.Interval(), ShouldEqual, c.interval) So(provider.Interval(), ShouldEqual, c.interval)
So(provider.Timeout(), ShouldEqual, c.timeout)
Convey("When entering a context (auto exit)", func() { Convey("When entering a context (auto exit)", func() {
func() { func() {
@@ -96,7 +98,7 @@ exit 0
), ),
) )
err = provider.Run() err = provider.Run(make(chan empty, 1))
So(err, ShouldBeNil) So(err, ShouldBeNil)
loggedContent, err := ioutil.ReadFile(provider.LogFile()) loggedContent, err := ioutil.ReadFile(provider.LogFile())
So(err, ShouldBeNil) So(err, ShouldBeNil)
@@ -127,7 +129,7 @@ exit 0
provider, err := newRsyncProvider(c) provider, err := newRsyncProvider(c)
So(err, ShouldBeNil) So(err, ShouldBeNil)
err = provider.Run() err = provider.Run(make(chan empty, 1))
So(err, ShouldNotBeNil) So(err, ShouldNotBeNil)
loggedContent, err := ioutil.ReadFile(provider.LogFile()) loggedContent, err := ioutil.ReadFile(provider.LogFile())
So(err, ShouldBeNil) So(err, ShouldBeNil)
@@ -146,18 +148,19 @@ func TestRsyncProviderWithAuthentication(t *testing.T) {
proxyAddr := "127.0.0.1:1233" proxyAddr := "127.0.0.1:1233"
c := rsyncConfig{ c := rsyncConfig{
name: "tuna", name: "tuna",
upstreamURL: "rsync://rsync.tuna.moe/tuna/", upstreamURL: "rsync://rsync.tuna.moe/tuna/",
rsyncCmd: scriptFile, rsyncCmd: scriptFile,
username: "tunasync", username: "tunasync",
password: "tunasyncpassword", password: "tunasyncpassword",
workingDir: tmpDir, workingDir: tmpDir,
extraOptions: []string{"--delete-excluded"}, extraOptions: []string{"--delete-excluded"},
rsyncEnv: map[string]string{"RSYNC_PROXY": proxyAddr}, rsyncTimeoutValue: 30,
logDir: tmpDir, rsyncEnv: map[string]string{"RSYNC_PROXY": proxyAddr},
logFile: tmpFile, logDir: tmpDir,
useIPv4: true, logFile: tmpFile,
interval: 600 * time.Second, useIPv4: true,
interval: 600 * time.Second,
} }
provider, err := newRsyncProvider(c) provider, err := newRsyncProvider(c)
@@ -189,13 +192,13 @@ exit 0
fmt.Sprintf( fmt.Sprintf(
"%s %s %s -aHvh --no-o --no-g --stats --exclude .~tmp~/ "+ "%s %s %s -aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
"--delete --delete-after --delay-updates --safe-links "+ "--delete --delete-after --delay-updates --safe-links "+
"--timeout=120 -4 --delete-excluded %s %s", "--timeout=30 -4 --delete-excluded %s %s",
provider.username, provider.password, proxyAddr, provider.username, provider.password, proxyAddr,
provider.upstreamURL, provider.WorkingDir(), provider.upstreamURL, provider.WorkingDir(),
), ),
) )
err = provider.Run() err = provider.Run(make(chan empty, 1))
So(err, ShouldBeNil) So(err, ShouldBeNil)
loggedContent, err := ioutil.ReadFile(provider.LogFile()) loggedContent, err := ioutil.ReadFile(provider.LogFile())
So(err, ShouldBeNil) So(err, ShouldBeNil)
@@ -219,6 +222,7 @@ func TestRsyncProviderWithOverriddenOptions(t *testing.T) {
upstreamURL: "rsync://rsync.tuna.moe/tuna/", upstreamURL: "rsync://rsync.tuna.moe/tuna/",
rsyncCmd: scriptFile, rsyncCmd: scriptFile,
workingDir: tmpDir, workingDir: tmpDir,
rsyncNeverTimeout: true,
overriddenOptions: []string{"-aHvh", "--no-o", "--no-g", "--stats"}, overriddenOptions: []string{"-aHvh", "--no-o", "--no-g", "--stats"},
extraOptions: []string{"--delete-excluded"}, extraOptions: []string{"--delete-excluded"},
logDir: tmpDir, logDir: tmpDir,
@@ -257,7 +261,7 @@ exit 0
provider.WorkingDir(), provider.WorkingDir(),
) )
err = provider.Run() err = provider.Run(make(chan empty, 1))
So(err, ShouldBeNil) So(err, ShouldBeNil)
loggedContent, err := ioutil.ReadFile(provider.LogFile()) loggedContent, err := ioutil.ReadFile(provider.LogFile())
So(err, ShouldBeNil) So(err, ShouldBeNil)
@@ -268,6 +272,78 @@ exit 0
}) })
} }
func TestRsyncProviderWithDocker(t *testing.T) {
Convey("Rsync in Docker should work", t, func() {
tmpDir, err := ioutil.TempDir("", "tunasync")
defer os.RemoveAll(tmpDir)
So(err, ShouldBeNil)
scriptFile := filepath.Join(tmpDir, "myrsync")
excludeFile := filepath.Join(tmpDir, "exclude.txt")
g := &Config{
Global: globalConfig{
Retry: 2,
},
Docker: dockerConfig{
Enable: true,
Volumes: []string{
scriptFile + ":/bin/myrsync",
"/etc/gai.conf:/etc/gai.conf:ro",
},
},
}
c := mirrorConfig{
Name: "tuna",
Provider: provRsync,
Upstream: "rsync://rsync.tuna.moe/tuna/",
Command: "/bin/myrsync",
ExcludeFile: excludeFile,
DockerImage: "alpine:3.8",
LogDir: tmpDir,
MirrorDir: tmpDir,
UseIPv6: true,
Timeout: 100,
Interval: 600,
}
provider := newMirrorProvider(c, g)
So(provider.Type(), ShouldEqual, provRsync)
So(provider.Name(), ShouldEqual, c.Name)
So(provider.WorkingDir(), ShouldEqual, c.MirrorDir)
So(provider.LogDir(), ShouldEqual, c.LogDir)
cmdScriptContent := `#!/bin/sh
#echo "$@"
while [[ $# -gt 0 ]]; do
if [[ "$1" = "--exclude-from" ]]; then
cat "$2"
shift
fi
shift
done
`
err = ioutil.WriteFile(scriptFile, []byte(cmdScriptContent), 0755)
So(err, ShouldBeNil)
err = ioutil.WriteFile(excludeFile, []byte("__some_pattern"), 0755)
So(err, ShouldBeNil)
for _, hook := range provider.Hooks() {
err = hook.preExec()
So(err, ShouldBeNil)
}
err = provider.Run(make(chan empty, 1))
So(err, ShouldBeNil)
for _, hook := range provider.Hooks() {
err = hook.postExec()
So(err, ShouldBeNil)
}
loggedContent, err := ioutil.ReadFile(provider.LogFile())
So(err, ShouldBeNil)
So(string(loggedContent), ShouldEqual, "__some_pattern")
})
}
func TestCmdProvider(t *testing.T) { func TestCmdProvider(t *testing.T) {
Convey("Command Provider should work", t, func(ctx C) { Convey("Command Provider should work", t, func(ctx C) {
tmpDir, err := ioutil.TempDir("", "tunasync") tmpDir, err := ioutil.TempDir("", "tunasync")
@@ -321,7 +397,7 @@ echo $AOSP_REPO_BIN
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(readedScriptContent, ShouldResemble, []byte(scriptContent)) So(readedScriptContent, ShouldResemble, []byte(scriptContent))
err = provider.Run() err = provider.Run(make(chan empty, 1))
So(err, ShouldBeNil) So(err, ShouldBeNil)
loggedContent, err := ioutil.ReadFile(provider.LogFile()) loggedContent, err := ioutil.ReadFile(provider.LogFile())
@@ -337,7 +413,7 @@ echo $AOSP_REPO_BIN
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(readedScriptContent, ShouldResemble, []byte(scriptContent)) So(readedScriptContent, ShouldResemble, []byte(scriptContent))
err = provider.Run() err = provider.Run(make(chan empty, 1))
So(err, ShouldNotBeNil) So(err, ShouldNotBeNil)
}) })
@@ -349,11 +425,14 @@ sleep 10
err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755) err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
So(err, ShouldBeNil) So(err, ShouldBeNil)
started := make(chan empty, 1)
go func() { go func() {
err = provider.Run() err := provider.Run(started)
ctx.So(err, ShouldNotBeNil) ctx.So(err, ShouldNotBeNil)
}() }()
<-started
So(provider.IsRunning(), ShouldBeTrue)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
err = provider.Terminate() err = provider.Terminate()
So(err, ShouldBeNil) So(err, ShouldBeNil)
@@ -389,7 +468,7 @@ sleep 10
Convey("Run the command", func() { Convey("Run the command", func() {
err = provider.Run() err = provider.Run(make(chan empty, 1))
So(err, ShouldBeNil) So(err, ShouldBeNil)
}) })
@@ -417,7 +496,7 @@ sleep 10
provider, err := newCmdProvider(c) provider, err := newCmdProvider(c)
So(err, ShouldBeNil) So(err, ShouldBeNil)
err = provider.Run() err = provider.Run(make(chan empty, 1))
So(err, ShouldNotBeNil) So(err, ShouldNotBeNil)
So(provider.DataSize(), ShouldBeEmpty) So(provider.DataSize(), ShouldBeEmpty)
}) })
@@ -427,7 +506,7 @@ sleep 10
provider, err := newCmdProvider(c) provider, err := newCmdProvider(c)
So(err, ShouldBeNil) So(err, ShouldBeNil)
err = provider.Run() err = provider.Run(make(chan empty, 1))
So(err, ShouldBeNil) So(err, ShouldBeNil)
}) })
@@ -437,7 +516,7 @@ sleep 10
provider, err := newCmdProvider(c) provider, err := newCmdProvider(c)
So(err, ShouldBeNil) So(err, ShouldBeNil)
err = provider.Run() err = provider.Run(make(chan empty, 1))
So(err, ShouldNotBeNil) So(err, ShouldNotBeNil)
}) })
@@ -446,7 +525,7 @@ sleep 10
provider, err := newCmdProvider(c) provider, err := newCmdProvider(c)
So(err, ShouldBeNil) So(err, ShouldBeNil)
err = provider.Run() err = provider.Run(make(chan empty, 1))
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(provider.DataSize(), ShouldNotBeEmpty) So(provider.DataSize(), ShouldNotBeEmpty)
_, err = strconv.ParseFloat(provider.DataSize(), 32) _, err = strconv.ParseFloat(provider.DataSize(), 32)
@@ -458,7 +537,7 @@ sleep 10
provider, err := newCmdProvider(c) provider, err := newCmdProvider(c)
So(err, ShouldBeNil) So(err, ShouldBeNil)
err = provider.Run() err = provider.Run(make(chan empty, 1))
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(provider.DataSize(), ShouldBeEmpty) So(provider.DataSize(), ShouldBeEmpty)
}) })
@@ -469,7 +548,7 @@ sleep 10
provider, err := newCmdProvider(c) provider, err := newCmdProvider(c)
So(err, ShouldBeNil) So(err, ShouldBeNil)
err = provider.Run() err = provider.Run(make(chan empty, 1))
So(err, ShouldNotBeNil) So(err, ShouldNotBeNil)
So(provider.DataSize(), ShouldBeEmpty) So(provider.DataSize(), ShouldBeEmpty)
}) })
@@ -485,18 +564,19 @@ func TestTwoStageRsyncProvider(t *testing.T) {
tmpFile := filepath.Join(tmpDir, "log_file") tmpFile := filepath.Join(tmpDir, "log_file")
c := twoStageRsyncConfig{ c := twoStageRsyncConfig{
name: "tuna-two-stage-rsync", name: "tuna-two-stage-rsync",
upstreamURL: "rsync://mirrors.tuna.moe/", upstreamURL: "rsync://mirrors.tuna.moe/",
stage1Profile: "debian", stage1Profile: "debian",
rsyncCmd: scriptFile, rsyncCmd: scriptFile,
workingDir: tmpDir, workingDir: tmpDir,
logDir: tmpDir, logDir: tmpDir,
logFile: tmpFile, logFile: tmpFile,
useIPv6: true, useIPv6: true,
excludeFile: tmpFile, excludeFile: tmpFile,
extraOptions: []string{"--delete-excluded", "--cache"}, rsyncTimeoutValue: 30,
username: "hello", extraOptions: []string{"--delete-excluded", "--cache"},
password: "world", username: "hello",
password: "world",
} }
provider, err := newTwoStageRsyncProvider(c) provider, err := newTwoStageRsyncProvider(c)
@@ -520,7 +600,7 @@ exit 0
err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755) err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
So(err, ShouldBeNil) So(err, ShouldBeNil)
err = provider.Run() err = provider.Run(make(chan empty, 2))
So(err, ShouldBeNil) So(err, ShouldBeNil)
targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir()) targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir())
@@ -534,7 +614,7 @@ exit 0
targetDir, targetDir,
fmt.Sprintf( fmt.Sprintf(
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+ "-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+
"--timeout=120 --exclude dists/ -6 "+ "--exclude dists/ --timeout=30 -6 "+
"--exclude-from %s %s %s", "--exclude-from %s %s %s",
provider.excludeFile, provider.upstreamURL, provider.WorkingDir(), provider.excludeFile, provider.upstreamURL, provider.WorkingDir(),
), ),
@@ -542,7 +622,7 @@ exit 0
fmt.Sprintf( fmt.Sprintf(
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+ "-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
"--delete --delete-after --delay-updates --safe-links "+ "--delete --delete-after --delay-updates --safe-links "+
"--timeout=120 --delete-excluded --cache -6 --exclude-from %s %s %s", "--delete-excluded --cache --timeout=30 -6 --exclude-from %s %s %s",
provider.excludeFile, provider.upstreamURL, provider.WorkingDir(), provider.excludeFile, provider.upstreamURL, provider.WorkingDir(),
), ),
) )
@@ -562,18 +642,21 @@ exit 0
err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755) err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
So(err, ShouldBeNil) So(err, ShouldBeNil)
started := make(chan empty, 2)
go func() { go func() {
err = provider.Run() err := provider.Run(started)
ctx.So(err, ShouldNotBeNil) ctx.So(err, ShouldNotBeNil)
}() }()
<-started
So(provider.IsRunning(), ShouldBeTrue)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
err = provider.Terminate() err = provider.Terminate()
So(err, ShouldBeNil) So(err, ShouldBeNil)
expectedOutput := fmt.Sprintf( expectedOutput := fmt.Sprintf(
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+ "-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+
"--timeout=120 --exclude dists/ -6 "+ "--exclude dists/ --timeout=30 -6 "+
"--exclude-from %s %s %s\n", "--exclude-from %s %s %s\n",
provider.excludeFile, provider.upstreamURL, provider.WorkingDir(), provider.excludeFile, provider.upstreamURL, provider.WorkingDir(),
) )
@@ -606,7 +689,7 @@ exit 0
provider, err := newTwoStageRsyncProvider(c) provider, err := newTwoStageRsyncProvider(c)
So(err, ShouldBeNil) So(err, ShouldBeNil)
err = provider.Run() err = provider.Run(make(chan empty, 2))
So(err, ShouldNotBeNil) So(err, ShouldNotBeNil)
loggedContent, err := ioutil.ReadFile(provider.LogFile()) loggedContent, err := ioutil.ReadFile(provider.LogFile())
So(err, ShouldBeNil) So(err, ShouldBeNil)

查看文件

@@ -2,6 +2,7 @@ package worker
import ( import (
"errors" "errors"
"fmt"
"strings" "strings"
"time" "time"
@@ -14,11 +15,14 @@ type rsyncConfig struct {
upstreamURL, username, password, excludeFile string upstreamURL, username, password, excludeFile string
extraOptions []string extraOptions []string
overriddenOptions []string overriddenOptions []string
rsyncNeverTimeout bool
rsyncTimeoutValue int
rsyncEnv map[string]string rsyncEnv map[string]string
workingDir, logDir, logFile string workingDir, logDir, logFile string
useIPv6, useIPv4 bool useIPv6, useIPv4 bool
interval time.Duration interval time.Duration
retry int retry int
timeout time.Duration
} }
// An RsyncProvider provides the implementation to rsync-based syncing jobs // An RsyncProvider provides the implementation to rsync-based syncing jobs
@@ -43,6 +47,7 @@ func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
ctx: NewContext(), ctx: NewContext(),
interval: c.interval, interval: c.interval,
retry: c.retry, retry: c.retry,
timeout: c.timeout,
}, },
rsyncConfig: c, rsyncConfig: c,
} }
@@ -64,12 +69,20 @@ func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
"-aHvh", "--no-o", "--no-g", "--stats", "-aHvh", "--no-o", "--no-g", "--stats",
"--exclude", ".~tmp~/", "--exclude", ".~tmp~/",
"--delete", "--delete-after", "--delay-updates", "--delete", "--delete-after", "--delay-updates",
"--safe-links", "--timeout=120", "--safe-links",
} }
if c.overriddenOptions != nil { if c.overriddenOptions != nil {
options = c.overriddenOptions options = c.overriddenOptions
} }
if !c.rsyncNeverTimeout {
timeo := 120
if c.rsyncTimeoutValue > 0 {
timeo = c.rsyncTimeoutValue
}
options = append(options, fmt.Sprintf("--timeout=%d", timeo))
}
if c.useIPv6 { if c.useIPv6 {
options = append(options, "-6") options = append(options, "-6")
} else if c.useIPv4 { } else if c.useIPv4 {
@@ -103,12 +116,13 @@ func (p *rsyncProvider) DataSize() string {
return p.dataSize return p.dataSize
} }
func (p *rsyncProvider) Run() error { func (p *rsyncProvider) Run(started chan empty) error {
p.dataSize = "" p.dataSize = ""
defer p.closeLogFile() defer p.closeLogFile()
if err := p.Start(); err != nil { if err := p.Start(); err != nil {
return err return err
} }
started <- empty{}
if err := p.Wait(); err != nil { if err := p.Wait(); err != nil {
code, msg := internal.TranslateRsyncErrorCode(err) code, msg := internal.TranslateRsyncErrorCode(err)
if code != 0 { if code != 0 {
@@ -144,5 +158,6 @@ func (p *rsyncProvider) Start() error {
return err return err
} }
p.isRunning.Store(true) p.isRunning.Store(true)
logger.Debugf("set isRunning to true: %s", p.Name())
return nil return nil
} }

查看文件

@@ -149,10 +149,10 @@ func (c *cmdJob) Terminate() error {
select { select {
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
unix.Kill(c.cmd.Process.Pid, syscall.SIGKILL) unix.Kill(c.cmd.Process.Pid, syscall.SIGKILL)
return errors.New("SIGTERM failed to kill the job") logger.Warningf("SIGTERM failed to kill the job in 2s. SIGKILL sent")
case <-c.finished: case <-c.finished:
return nil
} }
return nil
} }
// Copied from go-sh // Copied from go-sh

查看文件

@@ -15,11 +15,14 @@ type twoStageRsyncConfig struct {
stage1Profile string stage1Profile string
upstreamURL, username, password, excludeFile string upstreamURL, username, password, excludeFile string
extraOptions []string extraOptions []string
rsyncNeverTimeout bool
rsyncTimeoutValue int
rsyncEnv map[string]string rsyncEnv map[string]string
workingDir, logDir, logFile string workingDir, logDir, logFile string
useIPv6 bool useIPv6 bool
interval time.Duration interval time.Duration
retry int retry int
timeout time.Duration
} }
// An RsyncProvider provides the implementation to rsync-based syncing jobs // An RsyncProvider provides the implementation to rsync-based syncing jobs
@@ -54,18 +57,19 @@ func newTwoStageRsyncProvider(c twoStageRsyncConfig) (*twoStageRsyncProvider, er
ctx: NewContext(), ctx: NewContext(),
interval: c.interval, interval: c.interval,
retry: c.retry, retry: c.retry,
timeout: c.timeout,
}, },
twoStageRsyncConfig: c, twoStageRsyncConfig: c,
stage1Options: []string{ stage1Options: []string{
"-aHvh", "--no-o", "--no-g", "--stats", "-aHvh", "--no-o", "--no-g", "--stats",
"--exclude", ".~tmp~/", "--exclude", ".~tmp~/",
"--safe-links", "--timeout=120", "--safe-links",
}, },
stage2Options: []string{ stage2Options: []string{
"-aHvh", "--no-o", "--no-g", "--stats", "-aHvh", "--no-o", "--no-g", "--stats",
"--exclude", ".~tmp~/", "--exclude", ".~tmp~/",
"--delete", "--delete-after", "--delay-updates", "--delete", "--delete-after", "--delay-updates",
"--safe-links", "--timeout=120", "--safe-links",
}, },
} }
@@ -122,6 +126,14 @@ func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) {
return []string{}, fmt.Errorf("Invalid stage: %d", stage) return []string{}, fmt.Errorf("Invalid stage: %d", stage)
} }
if !p.rsyncNeverTimeout {
timeo := 120
if p.rsyncTimeoutValue > 0 {
timeo = p.rsyncTimeoutValue
}
options = append(options, fmt.Sprintf("--timeout=%d", timeo))
}
if p.useIPv6 { if p.useIPv6 {
options = append(options, "-6") options = append(options, "-6")
} }
@@ -133,7 +145,7 @@ func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) {
return options, nil return options, nil
} }
func (p *twoStageRsyncProvider) Run() error { func (p *twoStageRsyncProvider) Run(started chan empty) error {
p.Lock() p.Lock()
defer p.Unlock() defer p.Unlock()
@@ -163,6 +175,7 @@ func (p *twoStageRsyncProvider) Run() error {
} }
p.isRunning.Store(true) p.isRunning.Store(true)
logger.Debugf("set isRunning to true: %s", p.Name()) logger.Debugf("set isRunning to true: %s", p.Name())
started <- empty{}
p.Unlock() p.Unlock()
err = p.Wait() err = p.Wait()

查看文件

@@ -61,7 +61,7 @@ func NewTUNASyncWorker(cfg *Config) *Worker {
// Run runs worker forever // Run runs worker forever
func (w *Worker) Run() { func (w *Worker) Run() {
w.registorWorker() w.registerWorker()
go w.runHTTPServer() go w.runHTTPServer()
w.runSchedule() w.runSchedule()
} }
@@ -393,7 +393,7 @@ func (w *Worker) URL() string {
return fmt.Sprintf("%s://%s:%d/", proto, w.cfg.Server.Hostname, w.cfg.Server.Port) return fmt.Sprintf("%s://%s:%d/", proto, w.cfg.Server.Hostname, w.cfg.Server.Port)
} }
func (w *Worker) registorWorker() { func (w *Worker) registerWorker() {
msg := WorkerStatus{ msg := WorkerStatus{
ID: w.Name(), ID: w.Name(),
URL: w.URL(), URL: w.URL(),
@@ -402,8 +402,17 @@ func (w *Worker) registorWorker() {
for _, root := range w.cfg.Manager.APIBaseList() { for _, root := range w.cfg.Manager.APIBaseList() {
url := fmt.Sprintf("%s/workers", root) url := fmt.Sprintf("%s/workers", root)
logger.Debugf("register on manager url: %s", url) logger.Debugf("register on manager url: %s", url)
if _, err := PostJSON(url, msg, w.httpClient); err != nil { for retry := 10; retry > 0; {
logger.Errorf("Failed to register worker") if _, err := PostJSON(url, msg, w.httpClient); err != nil {
logger.Errorf("Failed to register worker")
retry--
if retry > 0 {
time.Sleep(1 * time.Second)
logger.Noticef("Retrying... (%d)", retry)
}
} else {
break
}
} }
} }
} }

查看文件

@@ -25,6 +25,7 @@ func makeMockManagerServer(recvData chan interface{}) *gin.Engine {
var _worker WorkerStatus var _worker WorkerStatus
c.BindJSON(&_worker) c.BindJSON(&_worker)
_worker.LastOnline = time.Now() _worker.LastOnline = time.Now()
_worker.LastRegister = time.Now()
recvData <- _worker recvData <- _worker
c.JSON(http.StatusOK, _worker) c.JSON(http.StatusOK, _worker)
}) })