Mirrored from https://github.com/tuna/tunasync.git, last synced 2025-12-07 15:06:47 +00:00

51 commits

Author SHA1 Message Commit date
z4yx
2afe1f2e06 bump version to 0.6.6 2020-06-17 22:12:11 +08:00
z4yx
1b099520b2 [manager] protect DB with RW lock 2020-06-17 22:10:39 +08:00
z4yx
85b2105a2b [worker] retry registration 2020-06-17 21:34:55 +08:00
zyx
45e5d900fb bump version to 0.6.5 2020-06-08 22:30:28 +08:00
zyx
7b0cd490b7 fix misuse of a variable 2020-06-08 22:23:12 +08:00
zyx
9178966aed bump version to 0.6.4 2020-06-04 09:44:17 +08:00
zyx
b5d2a0ad89 bug fix: jobs not being scheduled after timeout 2020-06-04 09:37:20 +08:00
zyx
d8963c9946 test rsync inside a Docker container 2020-06-03 21:51:04 +08:00
zyx
198afa72cd bug fix: rsync can access the exclude file in Docker (close #59) 2020-06-03 21:50:38 +08:00
zyx
85ce9c1270 wait for docker container removal 2020-06-03 19:47:14 +08:00
zyx
a8a35fc259 Merge branch 'master' of github.com:tuna/tunasync 2020-06-03 13:28:58 +08:00
zyx
c00eb12a75 Two new options for rsync provider
- rsync_no_timeout=true/false # disable --timeout option
- rsync_timeout=n # set --timeout=n
related to issue #121
2020-06-03 13:26:49 +08:00
Yuxiang Zhang
95ae9c16a9 Update workers.conf 2020-06-01 16:59:44 +08:00
zyx
0392ef28c7 bump version to 0.6.3 2020-05-25 19:21:27 +08:00
zyx
b2a22a9bbc update editor config 2020-05-25 19:16:53 +08:00
zyx
31862210ba implement the timeout 2020-05-25 19:15:05 +08:00
zyx
e47ba2097e add a timeout field to providers 2020-05-25 18:24:05 +08:00
zyx
e8c7ff3d7f config items of timeout 2020-05-25 18:08:31 +08:00
Yuxiang Zhang
7e7b469f1e Update workers.conf 2020-05-23 15:28:32 +08:00
Yuxiang Zhang
eac66c7554 add config examples of the worker (#118) 2020-05-23 15:23:15 +08:00
z4yx
38b0156fae [bug fix] provider is not terminated if premature stop command received 2020-05-09 18:42:54 +08:00
z4yx
c8e7d29f34 bump version to 0.6.2 2020-04-08 20:12:41 +08:00
Yuxiang Zhang
d40638d738 Merge pull request #116 from BITNP/laststarted
Add MirrorStatus.LastStarted property
2020-04-06 23:01:58 +08:00
Phy
471d865042 Add LastStarted test case 2020-04-05 01:07:46 -04:00
Phy
c1641b6714 Add MirrorStatus.LastStarted property
- status.Status is in PreSyncing, and
- curStatus.Status is not in PreSyncing
2020-04-05 00:12:10 -04:00
z4yx
b8edc1f714 bump version to 0.6 2020-03-29 12:48:29 +08:00
z4yx
001703a059 CI runs slower, give it more time 2020-03-29 12:01:39 +08:00
z4yx
2bbd4afda8 remove logger.Error() 2020-03-29 11:54:39 +08:00
z4yx
e8e6ab6ed6 Merge branch 'wip-newlog' 2020-03-29 11:47:53 +08:00
Yuxiang Zhang
3fed3f1cf3 Merge pull request #114 from tuna/nest_mirror
Support nested mirror config
2020-03-29 11:44:40 +08:00
z4yx
1491b6c42b format the code 2020-03-29 09:06:19 +08:00
Miao Wang
7a9895350b Support nested mirror config 2020-03-29 00:24:58 +08:00
z4yx
95d6acb026 tunasynctl: print command results with plain text instead of logging messages 2020-03-28 17:07:53 +08:00
z4yx
b132192448 Add a debugging log level to tunasynctl 2020-03-28 16:33:56 +08:00
z4yx
91209cab60 translate rsync exit code to error message (solve #20). May help #109 and #110 2020-03-28 16:26:40 +08:00
z4yx
1fb9f85862 closing log files where they were opened 2020-03-28 16:26:40 +08:00
Yuxiang Zhang
d10387e40b Merge pull request #112 from BITNP/cli-logging
Use proper logging for some debug output
2020-03-23 22:21:08 +08:00
Phy
5c01e3fa22 Use fmt.Println for cli JSON output 2020-03-23 10:19:55 -04:00
Phy
a44891d3e8 Set proper logging level on tunasynctl-cmd 2020-03-23 01:21:16 -04:00
Phy
4d461bd172 Use logger to print some debug messages than fmt.print 2020-03-23 01:20:49 -04:00
zyx
c5ed682a49 Bump version to 0.5.1 2020-03-20 10:39:34 +08:00
zyx
2c33380ce0 fix util_test 2020-03-20 10:35:54 +08:00
zyx
70cb22096f Merge branch 'master' of github.ip4.run:tuna/tunasync 2020-03-20 10:30:53 +08:00
zyx
b1f2679fbf [cmd provider] add support of match size in logs 2020-03-20 10:30:44 +08:00
Yuxiang Zhang
92a255fd3c Update tunasync.yml 2020-03-16 22:43:41 +08:00
zyx
aee1a705b7 remove "--contimeout=120" from default rsync options 2020-03-16 22:23:47 +08:00
zyx
c99916cc2a Bump version to 0.4.3 2020-03-16 22:03:40 +08:00
zyx
9eb72c5db0 fix misuse of variables 2020-03-16 21:59:34 +08:00
z4yx
b490c22984 add test of rsyncEnv 2020-03-16 21:16:23 +08:00
z4yx
ae5ff25d20 in case rsyncEnv is nil 2020-03-16 21:11:15 +08:00
z4yx
365f49e6d3 add support of env config for rsync provider 2020-03-16 20:59:08 +08:00
32 files changed, 1879 insertions, 273 deletions


@@ -28,6 +28,11 @@ jobs:
        make tunasync
        make tunasynctl
+    - name: Keep artifacts
+      uses: actions/upload-artifact@v1
+      with:
+        name: tunasync-bin
+        path: build/
  test:
    name: Test
@@ -65,4 +70,4 @@ jobs:
      uses: coverallsapp/github-action@v1.0.1
      with:
        github-token: ${{ secrets.github_token }}
        path-to-lcov: coverage.lcov

.vscode/settings.json (new file, vendored, 13 lines)

@@ -0,0 +1,13 @@
{
"cSpell.words": [
"Btrfs",
"Debugf",
"Infof",
"Noticef",
"Warningf",
"cgroup",
"mergo",
"tmpl",
"zpool"
]
}


@@ -32,7 +32,7 @@ const (
	userCfgFile = "$HOME/.config/tunasync/ctl.conf" // user-specific conf
)

-var logger = logging.MustGetLogger("tunasynctl-cmd")
+var logger = logging.MustGetLogger("tunasynctl")

var baseURL string
var client *http.Client
@@ -41,7 +41,7 @@ func initializeWrapper(handler cli.ActionFunc) cli.ActionFunc {
	return func(c *cli.Context) error {
		err := initialize(c)
		if err != nil {
-			return cli.NewExitError("", 1)
+			return cli.NewExitError(err.Error(), 1)
		}
		return handler(c)
	}
@@ -55,8 +55,9 @@ type config struct {
func loadConfig(cfgFile string, cfg *config) error {
	if cfgFile != "" {
+		logger.Infof("Loading config: %s", cfgFile)
		if _, err := toml.DecodeFile(cfgFile, cfg); err != nil {
-			logger.Errorf(err.Error())
+			// logger.Errorf(err.Error())
			return err
		}
	}
@@ -66,7 +67,7 @@ func loadConfig(cfgFile string, cfg *config) error {
func initialize(c *cli.Context) error {
	// init logger
-	tunasync.InitLogger(c.Bool("verbose"), c.Bool("verbose"), false)
+	tunasync.InitLogger(c.Bool("verbose"), c.Bool("debug"), false)

	cfg := new(config)
@@ -76,14 +77,23 @@ func initialize(c *cli.Context) error {
	// find config file and load config
	if _, err := os.Stat(systemCfgFile); err == nil {
-		loadConfig(systemCfgFile, cfg)
+		err = loadConfig(systemCfgFile, cfg)
+		if err != nil {
+			return err
+		}
	}
-	fmt.Println(os.ExpandEnv(userCfgFile))
+	logger.Debug("user config file: %s", os.ExpandEnv(userCfgFile))
	if _, err := os.Stat(os.ExpandEnv(userCfgFile)); err == nil {
-		loadConfig(os.ExpandEnv(userCfgFile), cfg)
+		err = loadConfig(os.ExpandEnv(userCfgFile), cfg)
+		if err != nil {
+			return err
+		}
	}
	if c.String("config") != "" {
-		loadConfig(c.String("config"), cfg)
+		err := loadConfig(c.String("config"), cfg)
+		if err != nil {
+			return err
+		}
	}

	// override config using the command-line arguments
@@ -112,7 +122,7 @@ func initialize(c *cli.Context) error {
	client, err = tunasync.CreateHTTPClient(cfg.CACert)
	if err != nil {
		err = fmt.Errorf("Error initializing HTTP client: %s", err.Error())
-		logger.Error(err.Error())
+		// logger.Error(err.Error())
		return err
	}
@@ -135,7 +145,7 @@ func listWorkers(c *cli.Context) error {
				err.Error()),
			1)
	}
-	fmt.Print(string(b))
+	fmt.Println(string(b))
	return nil
}
@@ -167,14 +177,21 @@ func listJobs(c *cli.Context) error {
			_, err := tunasync.GetJSON(fmt.Sprintf("%s/workers/%s/jobs",
				baseURL, workerID), &workerJobs, client)
			if err != nil {
-				logger.Errorf("Filed to correctly get jobs"+
+				logger.Infof("Failed to correctly get jobs"+
					" for worker %s: %s", workerID, err.Error())
			}
			ans <- workerJobs
		}(workerID)
	}
	for range args {
-		jobs = append(jobs, <-ans...)
+		job := <-ans
+		if job == nil {
+			return cli.NewExitError(
+				fmt.Sprintf("Failed to correctly get information "+
+					"of jobs from at least one manager"),
+				1)
+		}
+		jobs = append(jobs, job...)
	}
	genericJobs = jobs
}
@@ -182,10 +199,10 @@ func listJobs(c *cli.Context) error {
	b, err := json.MarshalIndent(genericJobs, "", " ")
	if err != nil {
		return cli.NewExitError(
-			fmt.Sprintf("Error printing out informations: %s", err.Error()),
+			fmt.Sprintf("Error printing out information: %s", err.Error()),
			1)
	}
-	fmt.Printf(string(b))
+	fmt.Println(string(b))
	return nil
}
@@ -236,7 +253,7 @@ func updateMirrorSize(c *cli.Context) error {
		)
	}
-	logger.Infof("Successfully updated mirror size to %s", mirrorSize)
+	fmt.Printf("Successfully updated mirror size to %s\n", mirrorSize)
	return nil
}
@@ -279,9 +296,9 @@ func removeWorker(c *cli.Context) error {
	res := map[string]string{}
	err = json.NewDecoder(resp.Body).Decode(&res)
	if res["message"] == "deleted" {
-		logger.Info("Successfully removed the worker")
+		fmt.Println("Successfully removed the worker")
	} else {
-		logger.Info("Failed to remove the worker")
+		return cli.NewExitError("Failed to remove the worker", 1)
	}
	return nil
}
@@ -314,7 +331,7 @@ func flushDisabledJobs(c *cli.Context) error {
			1)
	}
-	logger.Info("Successfully flushed disabled jobs")
+	fmt.Println("Successfully flushed disabled jobs")
	return nil
}
@@ -367,7 +384,7 @@ func cmdJob(cmd tunasync.CmdVerb) cli.ActionFunc {
				" command: HTTP status code is not 200: %s", body),
				1)
		}
-		logger.Info("Succesfully send command")
+		fmt.Println("Successfully send the command")
		return nil
	}
@@ -405,7 +422,7 @@ func cmdWorker(cmd tunasync.CmdVerb) cli.ActionFunc {
				" command: HTTP status code is not 200: %s", body),
				1)
		}
-		logger.Info("Succesfully send command")
+		fmt.Println("Successfully send the command")
		return nil
	}
@@ -462,6 +479,10 @@ func main() {
			Name:  "verbose, v",
			Usage: "Enable verbosely logging",
		},
+		cli.BoolFlag{
+			Name:  "debug",
+			Usage: "Enable debugging logging",
+		},
	}
	cmdFlags := []cli.Flag{
		cli.StringFlag{


@@ -1,3 +1,4 @@
# /home/scripts in this example points to https://github.com/tuna/tunasync-scripts/
[global]
name = "mirror_worker"
@@ -22,52 +23,637 @@ listen_addr = "127.0.0.1"
listen_port = 6000
ssl_cert = ""
ssl_key = ""
[[mirrors]]
name = "adobe-fonts"
interval = 1440
provider = "command"
upstream = "https://github.com/adobe-fonts"
-#https://github.com/tuna/tunasync-scripts/blob/master/adobe-fonts.sh
command = "/home/scripts/adobe-fonts.sh"
+size_pattern = "Total size is ([0-9\\.]+[KMGTP]?)"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "AdoptOpenJDK"
interval = 5760
provider = "command"
command = "/home/scripts/adoptopenjdk.py"
upstream = "https://adoptopenjdk.jfrog.io/adoptopenjdk"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "alpine"
provider = "rsync"
upstream = "rsync://rsync.alpinelinux.org/alpine/"
memory_limit = "256M"
[[mirrors]]
name = "anaconda"
provider = "command"
upstream = "https://repo.continuum.io/"
-#https://github.com/tuna/tunasync-scripts/blob/master/anaconda.py
-command = "/home/scripts/anaconda.py"
+command = "/home/scripts/anaconda.py --delete"
+size_pattern = "Total size is ([0-9\\.]+[KMGTP]?)"
interval = 720
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "apache"
provider = "rsync"
upstream = "rsync://rsync.apache.org/apache-dist/"
use_ipv4 = true
rsync_options = [ "--delete-excluded" ]
memory_limit = "256M"
[[mirrors]]
name = "armbian"
provider = "two-stage-rsync"
stage1_profile = "debian"
upstream = "rsync://rsync.armbian.com/apt/"
memory_limit = "256M"
[[mirrors]]
name = "armbian-releases"
provider = "rsync"
stage1_profile = "debian"
upstream = "rsync://rsync.armbian.com/dl/"
memory_limit = "256M"
[[mirrors]]
name = "bananian"
provider = "command"
upstream = "https://dl.bananian.org/"
command = "/home/scripts/lftp.sh"
interval = 1440
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
-name = "gnu"
+name = "bioconductor"
provider = "rsync"
-upstream = "rsync://mirrors.ocf.berkeley.edu/gnu/"
+upstream = "master.bioconductor.org:./"
rsync_options = [ "--rsh=ssh -i /root/id_rsa -o PasswordAuthentication=no -l sync" ]
exclude_file = "/etc/excludes/bioconductor.txt"
memory_limit = "256M"
[[mirrors]]
name = "blender"
provider = "rsync"
upstream = "rsync://mirrors.dotsrc.org/blender/"
rsync_options = [ "--delete-excluded" ]
exclude_file = "/etc/excludes/blender.txt"
interval = 1440
memory_limit = "256M"
[[mirrors]]
name = "chakra"
provider = "rsync"
upstream = "rsync://rsync.chakralinux.org/packages/"
memory_limit = "256M"
[[mirrors]]
name = "chakra-releases"
provider = "rsync"
upstream = "rsync://rsync.chakralinux.org/releases/"
memory_limit = "256M"
[[mirrors]]
name = "chef"
interval = 1440
provider = "command"
upstream = "https://packages.chef.io/repos"
command = "/home/scripts/chef.sh"
size_pattern = "size-sum: ([0-9\\.]+[KMGTP])"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "clickhouse"
interval = 2880
provider = "rsync"
upstream = "rsync://repo.yandex.ru/yandexrepo/clickhouse/"
exclude_file = "/etc/excludes/clickhouse.txt"
memory_limit = "256M"
[[mirrors]]
name = "clojars"
provider = "command"
upstream = "s3://clojars-repo-production/"
command = "/home/scripts/s3.sh"
docker_image = "tunathu/ftpsync:latest"
[mirrors.env]
TUNASYNC_S3_ENDPOINT = "https://s3.dualstack.us-east-2.amazonaws.com"
#TUNASYNC_S3_ENDPOINT = "https://s3.us-east-2.amazonaws.com"
TUNASYNC_AWS_OPTIONS = "--delete --exclude index.html"
[[mirrors]]
name = "CPAN"
provider = "rsync"
upstream = "rsync://cpan-rsync.perl.org/CPAN/"
memory_limit = "256M"
[[mirrors]]
name = "CRAN"
provider = "rsync"
upstream = "rsync://cran.r-project.org/CRAN/"
rsync_options = [ "--delete-excluded" ] rsync_options = [ "--delete-excluded" ]
memory_limit = "256M" memory_limit = "256M"
[[mirrors]]
name = "CTAN"
provider = "rsync"
upstream = "rsync://mirrors.rit.edu/CTAN/"
memory_limit = "256M"
[[mirrors]]
name = "dart-pub"
provider = "command"
upstream = "https://pub.dev/api"
command = "/home/scripts/pub.sh"
interval = 30
docker_image = "tunathu/pub-mirror:latest"
[mirrors.env]
MIRROR_BASE_URL = "https://mirrors.tuna.tsinghua.edu.cn/dart-pub"
[[mirrors]]
name = "debian"
provider = "command"
upstream = "rsync://mirrors.tuna.tsinghua.edu.cn/debian/"
command = "/home/scripts/debian.sh sync:archive:debian"
size_pattern = "Total size is ([0-9\\.]+[KMGTP]?)"
docker_image = "tunathu/ftpsync"
docker_volumes = [
"/etc/misc/ftpsync-debian.conf:/ftpsync/etc/ftpsync-debian.conf:ro",
"/log/ftpsync:/home/log/tunasync/ftpsync",
]
[mirrors.env]
FTPSYNC_LOG_DIR = "/home/log/tunasync/ftpsync"
[[mirrors]]
name = "docker-ce"
provider = "command"
upstream = "https://download.docker.com/"
command = "timeout 3h /home/scripts/docker-ce.py --workers 10 --fast-skip"
interval = 1440
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "ELK"
interval = 1440
provider = "command"
upstream = "https://packages.elastic.co"
command = "/home/scripts/ELK.sh"
size_pattern = "size-sum: ([0-9\\.]+[KMGTP])"
docker_image = "tunathu/tunasync-scripts:latest"
# set environment varialbes
[mirrors.env]
WGET_OPTIONS = "-6"
[[mirrors]]
name = "elasticstack"
interval = 1440
provider = "command"
upstream = "https://artifacts.elastic.co/"
command = "/home/scripts/elastic.sh"
size_pattern = "size-sum: ([0-9\\.]+[KMGTP])"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "erlang-solutions"
interval = 1440
provider = "command"
upstream = "https://packages.erlang-solutions.com"
command = "/home/scripts/erlang.sh"
size_pattern = "size-sum: ([0-9\\.]+[KMGTP])"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "flutter"
interval = 1440
provider = "command"
upstream = "https://storage.googleapis.com/flutter_infra/"
command = "/home/scripts/flutter.sh"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "github-release"
provider = "command"
upstream = "https://api.github.com/repos/"
command = "/home/scripts/github-release.py --workers 5"
size_pattern = "Total size is ([0-9\\.]+[KMGTP]?)"
interval = 720
docker_image = "tunathu/tunasync-scripts:latest"
[mirrors.env]
GITHUB_TOKEN = "xxxxx"
[[mirrors]]
name = "gitlab-ce"
interval = 1440
provider = "command"
upstream = "https://packages.gitlab.com/gitlab/gitlab-ce/"
command = "/home/scripts/gitlab-ce.sh"
size_pattern = "size-sum: ([0-9\\.]+[KMGTP])"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "gitlab-ee"
interval = 1440
provider = "command"
upstream = "https://packages.gitlab.com/gitlab/gitlab-ee/"
command = "/home/scripts/gitlab-ce.sh"
size_pattern = "size-sum: ([0-9\\.]+[KMGTP])"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "gitlab-runner"
interval = 1440
provider = "command"
upstream = "https://packages.gitlab.com/runner/gitlab-runner"
command = "/home/scripts/gitlab-runner.sh"
size_pattern = "size-sum: ([0-9\\.]+[KMGTP])"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "grafana"
interval = 1440
provider = "command"
upstream = "https://packages.grafana.com/oss"
command = "/home/scripts/grafana.sh"
size_pattern = "size-sum: ([0-9\\.]+[KMGTP])"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "hackage"
provider = "command"
command = "/home/scripts/hackage.sh"
upstream = "https://hackage.haskell.org/"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "homebrew-bottles"
provider = "command"
upstream = "https://homebrew.bintray.com"
command = "/home/scripts/linuxbrew-bottles.sh"
docker_image = "tunathu/homebrew-mirror"
# set environment varialbes
[mirrors.env]
HOMEBREW_REPO = "https://neomirrors.tuna.tsinghua.edu.cn/git/homebrew"
[[mirrors]]
name = "influxdata"
interval = 1440
provider = "command"
upstream = "https://repos.influxdata.com"
command = "/home/scripts/influxdata.sh"
size_pattern = "size-sum: ([0-9\\.]+[KMGTP])"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "kali"
provider = "two-stage-rsync"
stage1_profile = "debian"
upstream = "rsync://ftp.nluug.nl/kali/"
rsync_options = [ "--delete-excluded" ] # delete .~tmp~ folders
memory_limit = "256M"
[[mirrors]]
name = "kali-images"
provider = "rsync"
upstream = "rsync://ftp.nluug.nl/kali-images/"
rsync_options = [ "--delete-excluded" ] # delete .~tmp~ folders
memory_limit = "256M"
[[mirrors]]
name = "KaOS"
provider = "rsync"
upstream = "rsync://kaosx.tk/kaos/"
rsync_options = [ "--delete-excluded" ]
memory_limit = "256M"
[[mirrors]]
name = "kernel"
provider = "rsync"
upstream = "rsync://rsync.kernel.org/pub/linux/kernel/"
rsync_options = [ "--delete-excluded" ]
memory_limit = "256M"
[[mirrors]]
name = "kicad"
provider = "command"
upstream = "s3://kicad-downloads/"
command = "/home/scripts/s3.sh"
docker_image = "tunathu/ftpsync:latest"
[mirrors.env]
TUNASYNC_S3_ENDPOINT = "https://s3.cern.ch"
TUNASYNC_AWS_OPTIONS = "--delete --exclude index.html"
[[mirrors]]
name = "kodi"
provider = "rsync"
upstream = "rsync://mirror.yandex.ru/mirrors/xbmc/"
rsync_options = [ "--delete-excluded" ]
memory_limit = "256M"
use_ipv6 = true
[[mirrors]]
name = "kubernetes"
interval = 2880
provider = "command"
upstream = "http://packages.cloud.google.com"
command = "/home/scripts/kubernetes.sh"
size_pattern = "size-sum: ([0-9\\.]+[KMGTP])"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "linuxbrew-bottles"
provider = "command"
upstream = "https://linuxbrew.bintray.com"
command = "/home/scripts/linuxbrew-bottles.sh"
docker_image = "tunathu/homebrew-mirror"
# set environment varialbes
[mirrors.env]
RUN_LINUXBREW = "true"
HOMEBREW_REPO = "https://neomirrors.tuna.tsinghua.edu.cn/git/homebrew"
[[mirrors]]
name = "linuxmint"
provider = "two-stage-rsync"
stage1_profile = "debian"
upstream = "rsync://mirrors.kernel.org/linuxmint-packages/"
rsync_options = [ "--delete-excluded" ]
memory_limit = "256M"
[[mirrors]]
name = "lxc-images"
provider = "command"
upstream = "https://us.images.linuxcontainers.org/"
command = "/home/scripts/lxc-images.sh"
docker_image = "tunathu/tunasync-scripts:latest"
interval = 720
[[mirrors]]
name = "lyx"
provider = "command"
upstream = "ftp://ftp.lyx.org/pub/lyx/"
command = "/home/scripts/lftp.sh"
docker_image = "tunathu/tunasync-scripts:latest"
[mirrors.env]
TUNASYNC_LFTP_OPTIONS = "--only-newer"
[[mirrors]]
name = "mongodb"
interval = 1440
provider = "command"
upstream = "https://repo.mongodb.org"
command = "/home/scripts/mongodb.sh"
size_pattern = "size-sum: ([0-9\\.]+[KMGTP])"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "msys2"
provider = "command"
upstream = "http://repo.msys2.org/"
command = "/home/scripts/lftp.sh"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "mysql"
interval = 30
provider = "command"
upstream = "https://repo.mysql.com"
command = "/home/scripts/mysql.sh"
size_pattern = "size-sum: ([0-9\\.]+[KMGTP])"
docker_image = "tunathu/tunasync-scripts:latest"
# set environment varialbes
[mirrors.env]
USE_IPV6 = "1"
[[mirrors]]
name = "nix"
interval = 1440
provider = "command"
upstream = "s3://nix-releases/nix/"
command = "/home/scripts/nix.sh"
docker_image = "tunathu/tunasync-scripts:latest"
[mirrors.env]
MIRROR_BASE_URL = 'https://mirrors.tuna.tsinghua.edu.cn/nix/'
[[mirrors]]
name = "nix-channels"
interval = 300
provider = "command"
upstream = "https://nixos.org/channels"
command = "timeout 20h /home/scripts/nix-channels.py"
docker_image = "tunathu/nix-channels:latest"
docker_options = [
"--cpus", "20",
]
[[mirrors]]
name = "nodesource"
provider = "command"
upstream = "https://deb.nodesource.com/"
command = "/home/scripts/nodesource.sh"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "openresty"
provider = "command"
upstream = "https://openresty.org/package/"
command = "/home/scripts/lftp.sh"
docker_image = "tunathu/tunasync-scripts:latest"
[mirrors.env]
TUNASYNC_LFTP_OPTIONS = "--only-newer"
[[mirrors]]
name = "packagist"
provider = "command"
upstream = "http://packagist.org/"
command = "/home/scripts/packagist.sh"
interval = 1440
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "proxmox"
interval = 1440
provider = "command"
upstream = "http://download.proxmox.com"
command = "/home/scripts/proxmox.sh"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "pypi"
provider = "command"
upstream = "https://pypi.python.org/"
-#https://github.com/tuna/tunasync-scripts/blob/master/pypi.sh
command = "/home/scripts/pypi.sh"
-docker_image = "tunathu/tunasync-scripts:latest"
+docker_image = "tunathu/bandersnatch:latest"
interval = 5
[[mirrors]]
name = "qt"
provider = "rsync"
upstream = "rsync://master.qt-project.org/qt-all/"
exclude_file = "/etc/excludes/qt.txt"
rsync_options = [ "--delete-excluded" ]
memory_limit = "256M"
[[mirrors]]
name = "raspberrypi"
provider = "two-stage-rsync"
stage1_profile = "debian"
upstream = "rsync://apt-repo.raspberrypi.org/archive/debian/"
memory_limit = "256M"
[[mirrors]]
name = "raspbian-images"
interval = 5760
provider = "command"
upstream = "https://downloads.raspberrypi.org/"
command = "/home/scripts/lftp.sh"
docker_image = "tunathu/tunasync-scripts:latest"
[mirrors.env]
TUNASYNC_LFTP_OPTIONS = "-x ^icons/$ -c --only-missing -v --no-perms"
[[mirrors]]
name = "raspbian"
provider = "two-stage-rsync"
stage1_profile = "debian"
upstream = "rsync://archive.raspbian.org/archive/"
rsync_options = [ "--delete-excluded" ] # delete .~tmp~ folders
memory_limit = "256M"
[[mirrors]]
name = "redhat"
provider = "rsync"
upstream = "rsync://ftp.redhat.com/redhat/"
rsync_options = [ "--delete-excluded" ]
memory_limit = "256M"
exclude_file = "/etc/excludes/redhat.txt"
interval = 1440
[mirrors.env]
RSYNC_PROXY="127.0.0.1:8123"
[[mirrors]]
name = "remi"
interval = 1440
provider = "command"
upstream = "rsync://rpms.remirepo.net"
command = "/home/scripts/remi.sh"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "repo-ck"
provider = "command"
upstream = "http://repo-ck.com"
command = "/home/scripts/repo-ck.sh"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "ros"
provider = "rsync"
upstream = "rsync://mirror.umd.edu/packages.ros.org/ros/"
memory_limit = "256M"
[[mirrors]]
name = "ros2"
interval = 1440
provider = "command"
upstream = "http://packages.ros.org/ros2"
command = "/home/scripts/ros2.sh"
size_pattern = "size-sum: ([0-9\\.]+[KMGTP])"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "rubygems"
provider = "command"
upstream = "https://rubygems.org"
command = "/home/scripts/rubygems.sh"
docker_image = "tunathu/rubygems-mirror"
interval = 60
# set environment varialbes
[mirrors.env]
INIT = "0"
[[mirrors]]
name = "rudder"
interval = 2880
provider = "command"
upstream = "https://repository.rudder.io"
command = "/home/scripts/rudder.sh"
size_pattern = "size-sum: ([0-9\\.]+[KMGTP])"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
-name = "debian"
-interval = 720
+name = "rustup"
+provider = "command"
upstream = "https://rustup.rs/"
command = "/home/scripts/rustup.sh"
interval = 1440
docker_image = "tunathu/rustup-mirror:latest"
docker_volumes = [
]
docker_options = [
]
[mirrors.env]
MIRROR_BASE_URL = "https://mirrors.tuna.tsinghua.edu.cn/rustup"
[[mirrors]]
name = "saltstack"
interval = 1440 # required on http://repo.saltstack.com/#mirror
provider = "command"
upstream = "s3://s3/"
command = "/home/scripts/s3.sh"
docker_image = "tunathu/ftpsync:latest"
[mirrors.env]
TUNASYNC_S3_ENDPOINT = "https://s3.repo.saltstack.com"
TUNASYNC_AWS_OPTIONS = "--delete --exact-timestamps"
[[mirrors]]
name = "solus"
provider = "rsync" provider = "rsync"
upstream = "rsync://mirrors.tuna.tsinghua.edu.cn/debian/" upstream = "rsync://mirrors.rit.edu/solus/"
rsync_options = [ "--exclude", "/shannon", "--exclude", "/unstable" ]
memory_limit = "256M" memory_limit = "256M"
[[mirrors]]
name = "stackage"
provider = "command"
command = "/home/scripts/stackage.py"
upstream = "https://www.stackage.org/"
docker_image = "tunathu/tunasync-scripts:latest"
# set environment varialbes
[mirrors.env]
GIT_COMMITTER_NAME = "TUNA mirrors"
GIT_COMMITTER_EMAIL = "mirrors@tuna.tsinghua.edu.cn"
[[mirrors]]
name = "steamos"
interval = 1440
provider = "command"
upstream = "http://repo.steampowered.com"
command = "/home/scripts/lftp.sh"
docker_image = "tunathu/tunasync-scripts:latest"
[mirrors.env]
TUNASYNC_LFTP_OPTIONS = "--only-newer --exclude icons/ "
[[mirrors]]
name = "termux"
interval = 1440
provider = "command"
upstream = "https://dl.bintray.com/termux/termux-packages-24/"
command = "/home/scripts/termux.sh"
size_pattern = "size-sum: ([0-9\\.]+[KMGTP])"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "ubuntu"
provider = "two-stage-rsync"
@@ -76,4 +662,156 @@ upstream = "rsync://archive.ubuntu.com/ubuntu/"
rsync_options = [ "--delete-excluded" ] rsync_options = [ "--delete-excluded" ]
memory_limit = "256M" memory_limit = "256M"
# vim: ft=toml [[mirrors]]
name = "ubuntu-ports"
provider = "two-stage-rsync"
stage1_profile = "debian"
upstream = "rsync://ports.ubuntu.com/ubuntu-ports/"
rsync_options = [ "--delete-excluded" ]
exclude_file = "/etc/excludes/ubuntu-ports-exclude.txt"
memory_limit = "256M"
[[mirrors]]
name = "virtualbox"
interval = 1440
provider = "command"
upstream = "http://download.virtualbox.org/virtualbox"
command = "/home/scripts/virtualbox.sh"
size_pattern = "size-sum: ([0-9\\.]+[KMGTP])"
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "winehq"
provider = "command"
upstream = "ftp://ftp.winehq.org/pub/"
command = "/home/scripts/lftp.sh"
docker_image = "tunathu/tunasync-scripts:latest"
[mirrors.env]
TUNASYNC_LFTP_OPTIONS = "-x wine-builds.old/ -x /\\..+"
[[mirrors]]
name = "zabbix"
provider = "rsync"
upstream = "rsync://repo.zabbix.com/mirror/"
rsync_options = [ "--delete-excluded", "--chmod=o+r,Do+x,Fa-x" ]
memory_limit = "256M"
[[mirrors]]
name = "AOSP"
interval = 720
provider = "command"
command = "/home/tunasync-scripts/aosp.sh"
upstream = "https://android.googlesource.com/mirror/manifest"
size_pattern = "Total size is ([0-9\\.]+[KMGTP]?)"
docker_image = "tunathu/tunasync-scripts:latest"
[mirrors.env]
REPO = "/usr/local/bin/aosp-repo"
REPO_URL = "https://mirrors.tuna.tsinghua.edu.cn/git/git-repo"
USE_BITMAP_INDEX = "1"
[[mirrors]]
name = "lineageOS"
interval = 720
provider = "command"
command = "/home/tunasync-scripts/aosp.sh"
upstream = "https://github.com/LineageOS/mirror"
size_pattern = "Total size is ([0-9\\.]+[KMGTP]?)"
docker_image = "tunathu/tunasync-scripts:latest"
[mirrors.env]
REPO = "/usr/local/bin/aosp-repo"
REPO_URL = "https://mirrors.tuna.tsinghua.edu.cn/git/git-repo"
USE_BITMAP_INDEX = "1"
[[mirrors]]
name = "chromiumos"
interval = 720
provider = "command"
command = "/home/tunasync-scripts/cros.sh"
upstream = "https://chromium.googlesource.com"
size_pattern = "Total size is ([0-9\\.]+[KMGTP]?)"
fail_on_match = "fatal: "
docker_image = "tunathu/tunasync-scripts:latest"
[mirrors.env]
USE_BITMAP_INDEX = "1"
CONCURRENT_JOBS = "20"
[[mirrors]]
name = "crates.io-index.git"
provider = "command"
command = "/home/tunasync-scripts/git.sh"
upstream = "https://github.com/rust-lang/crates.io-index.git"
docker_image = "tunathu/tunasync-scripts:latest"
size_pattern = "size-pack: ([0-9\\.]+[KMGTP])"
[[mirrors]]
name = "flutter-sdk.git"
provider = "command"
command = "/home/tunasync-scripts/git.sh"
upstream = "git://github.com/flutter/flutter.git"
docker_image = "tunathu/tunasync-scripts:latest"
size_pattern = "size-pack: ([0-9\\.]+[KMGTP])"
[[mirrors]]
name = "gcc.git"
provider = "command"
command = "/home/tunasync-scripts/git.sh"
upstream = "git://gcc.gnu.org/git/gcc.git"
docker_image = "tunathu/tunasync-scripts:latest"
size_pattern = "size-pack: ([0-9\\.]+[KMGTP])"
[[mirrors]]
name = "gentoo-portage.git"
provider = "command"
command = "/home/tunasync-scripts/git.sh"
upstream = "git://github.com/gentoo-mirror/gentoo.git"
docker_image = "tunathu/tunasync-scripts:latest"
size_pattern = "size-pack: ([0-9\\.]+[KMGTP])"
[[mirrors]]
name = "git-repo"
provider = "command"
command = "/home/tunasync-scripts/git-repo.sh"
upstream = "https://gerrit.googlesource.com/git-repo"
size_pattern = "size-pack: ([0-9\\.]+[KMGTP])"
fail_on_match = "fatal: "
docker_image = "tunathu/tunasync-scripts:latest"
[[mirrors]]
name = "homebrew"
provider = "command"
command = "/home/tunasync-scripts/homebrew.sh"
upstream = "https://github.com/Homebrew"
docker_image = "tunathu/tunasync-scripts:latest"
size_pattern = "Total size is ([0-9\\.]+[KMGTP]?)"
[[mirrors]]
name = "CocoaPods"
provider = "command"
command = "/home/tunasync-scripts/cocoapods.sh"
upstream = "https://github.com/CocoaPods"
docker_image = "tunathu/tunasync-scripts:latest"
size_pattern = "Total size is ([0-9\\.]+[KMGTP]?)"
[[mirrors]]
name = "pybombs"
interval = 720
provider = "command"
command = "/home/tunasync-scripts/pybombs.sh"
upstream = "https://github.com/scateu/pybombs-mirror/"
docker_image = "tunathu/tunasync-scripts:latest"
docker_volumes = ["/home/pybombs-mirror:/opt/pybombs-mirror"]
size_pattern = "Total size is ([0-9\\.]+[KMGTP]?)"
[mirrors.env]
PYBOMBS_MIRROR_SCRIPT_PATH = "/opt/pybombs-mirror"
MIRROR_BASE_URL = "https://mirrors.tuna.tsinghua.edu.cn/pybombs"
[[mirrors]]
name = "llvm"
provider = "command"
command = "/home/tunasync-scripts/llvm.sh"
upstream = "https://git.llvm.org/git"
docker_image = "tunathu/tunasync-scripts:latest"
size_pattern = "Total size is ([0-9\\.]+[KMGTP]?)"
# vim: ft=toml
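
Commit c00eb12a75 in the list above introduces two rsync provider options, rsync_timeout and rsync_no_timeout, which do not yet appear in this example config. A minimal sketch of how they could be set on a mirror block, assuming only the option names given in that commit message; the mirror name, upstream, and value types are illustrative placeholders, not taken from the repository:

[[mirrors]]
name = "example-rsync-mirror"           # hypothetical entry, for illustration only
provider = "rsync"
upstream = "rsync://example.org/pub/"   # placeholder upstream
rsync_timeout = 3600                    # per the commit message, sets --timeout=3600
# rsync_no_timeout = true               # alternatively, disable the --timeout option
memory_limit = "256M"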

go.mod (1 change)

@@ -9,6 +9,7 @@ require (
	github.com/codeskyblue/go-sh v0.0.0-20190412065543-76bd3d59ff27
	github.com/dennwc/btrfs v0.0.0-20190517175702-d917b30ff035
	github.com/gin-gonic/gin v1.5.0
+	github.com/imdario/mergo v0.3.9
	github.com/mattn/goveralls v0.0.5 // indirect
	github.com/pkg/profile v1.4.0
	github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46

go.sum (5 changes)

@@ -29,6 +29,9 @@ github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
+github.com/imdario/mergo v0.3.9 h1:UauaLniWCFHWd+Jp9oCEkTBj8VO/9DKg3PV3VCNMDIg=
+github.com/imdario/mergo v0.3.9/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
+github.com/json-iterator/go v1.1.7 h1:KfgG9LzI+pYjr4xvmz/5H4FXjokeP+rlHLhv3iH62Fo=
github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
@@ -38,7 +41,9 @@ github.com/mattn/go-isatty v0.0.9 h1:d5US/mDsogSGW37IV293h//ZFaeajb69h+EHFsv2xGg
github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ=
github.com/mattn/goveralls v0.0.5 h1:spfq8AyZ0cCk57Za6/juJ5btQxeE1FaEGMdfcI+XO48=
github.com/mattn/goveralls v0.0.5/go.mod h1:Xg2LHi51faXLyKXwsndxiW6uxEEQT9+3sjGzzwU4xy0=
+github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/pkg/profile v1.4.0 h1:uCmaf4vVbWAOZz36k1hrQD7ijGRzLwaME8Am/7a4jZI=
github.com/pkg/profile v1.4.0/go.mod h1:NWz/XGvpEW1FyYQ7fCx4dqYBLlfTcE+A9FLAkNKqjFE=


@@ -24,9 +24,12 @@ func InitLogger(verbose, debug, withSystemd bool) {
	if debug {
		logging.SetLevel(logging.DEBUG, "tunasync")
+		logging.SetLevel(logging.DEBUG, "tunasynctl")
	} else if verbose {
		logging.SetLevel(logging.INFO, "tunasync")
+		logging.SetLevel(logging.INFO, "tunasynctl")
	} else {
		logging.SetLevel(logging.NOTICE, "tunasync")
+		logging.SetLevel(logging.NOTICE, "tunasynctl")
	}
}


@@ -8,16 +8,17 @@ import (
// A MirrorStatus represents a msg when
// a worker has done syncing
type MirrorStatus struct {
	Name        string     `json:"name"`
	Worker      string     `json:"worker"`
	IsMaster    bool       `json:"is_master"`
	Status      SyncStatus `json:"status"`
	LastUpdate  time.Time  `json:"last_update"`
+	LastStarted time.Time  `json:"last_started"`
	LastEnded   time.Time  `json:"last_ended"`
	Scheduled   time.Time  `json:"next_schedule"`
	Upstream    string     `json:"upstream"`
	Size        string     `json:"size"`
	ErrorMsg    string     `json:"error_msg"`
}

// A WorkerStatus is the information struct that describe


@@ -38,31 +38,35 @@ func (t *stampTime) UnmarshalJSON(b []byte) error {
// WebMirrorStatus is the mirror status to be shown in the web page
type WebMirrorStatus struct {
	Name          string     `json:"name"`
	IsMaster      bool       `json:"is_master"`
	Status        SyncStatus `json:"status"`
	LastUpdate    textTime   `json:"last_update"`
	LastUpdateTs  stampTime  `json:"last_update_ts"`
+	LastStarted   textTime   `json:"last_started"`
+	LastStartedTs stampTime  `json:"last_started_ts"`
	LastEnded     textTime   `json:"last_ended"`
	LastEndedTs   stampTime  `json:"last_ended_ts"`
	Scheduled     textTime   `json:"next_schedule"`
	ScheduledTs   stampTime  `json:"next_schedule_ts"`
	Upstream      string     `json:"upstream"`
	Size          string     `json:"size"` // approximate size
}

func BuildWebMirrorStatus(m MirrorStatus) WebMirrorStatus {
	return WebMirrorStatus{
		Name:          m.Name,
		IsMaster:      m.IsMaster,
		Status:        m.Status,
		LastUpdate:    textTime{m.LastUpdate},
		LastUpdateTs:  stampTime{m.LastUpdate},
+		LastStarted:   textTime{m.LastStarted},
+		LastStartedTs: stampTime{m.LastStarted},
		LastEnded:     textTime{m.LastEnded},
		LastEndedTs:   stampTime{m.LastEnded},
		Scheduled:     textTime{m.Scheduled},
		ScheduledTs:   stampTime{m.Scheduled},
		Upstream:      m.Upstream,
		Size:          m.Size,
	}
}


@@ -15,16 +15,18 @@ func TestStatus(t *testing.T) {
		So(err, ShouldBeNil)
		t := time.Date(2016, time.April, 16, 23, 8, 10, 0, loc)
		m := WebMirrorStatus{
			Name:          "tunalinux",
			Status:        Success,
			LastUpdate:    textTime{t},
			LastUpdateTs:  stampTime{t},
+			LastStarted:   textTime{t},
+			LastStartedTs: stampTime{t},
			LastEnded:     textTime{t},
			LastEndedTs:   stampTime{t},
			Scheduled:     textTime{t},
			ScheduledTs:   stampTime{t},
			Size:          "5GB",
			Upstream:      "rsync://mirrors.tuna.tsinghua.edu.cn/tunalinux/",
		}

		b, err := json.Marshal(m)
@@ -40,6 +42,10 @@ func TestStatus(t *testing.T) {
		So(m2.LastUpdateTs.Unix(), ShouldEqual, m.LastUpdate.Unix())
		So(m2.LastUpdate.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
		So(m2.LastUpdateTs.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
+		So(m2.LastStarted.Unix(), ShouldEqual, m.LastStarted.Unix())
+		So(m2.LastStartedTs.Unix(), ShouldEqual, m.LastStarted.Unix())
+		So(m2.LastStarted.UnixNano(), ShouldEqual, m.LastStarted.UnixNano())
+		So(m2.LastStartedTs.UnixNano(), ShouldEqual, m.LastStarted.UnixNano())
		So(m2.LastEnded.Unix(), ShouldEqual, m.LastEnded.Unix())
		So(m2.LastEndedTs.Unix(), ShouldEqual, m.LastEnded.Unix())
		So(m2.LastEnded.UnixNano(), ShouldEqual, m.LastEnded.UnixNano())
@@ -53,15 +59,16 @@ func TestStatus(t *testing.T) {
	})

	Convey("BuildWebMirrorStatus should work", t, func() {
		m := MirrorStatus{
			Name:        "arch-sync3",
			Worker:      "testWorker",
			IsMaster:    true,
			Status:      Failed,
			LastUpdate:  time.Now().Add(-time.Minute * 30),
+			LastStarted: time.Now().Add(-time.Minute * 1),
			LastEnded:   time.Now(),
			Scheduled:   time.Now().Add(time.Minute * 5),
			Upstream:    "mirrors.tuna.tsinghua.edu.cn",
			Size:        "4GB",
		}

		var m2 WebMirrorStatus
@@ -73,6 +80,10 @@ func TestStatus(t *testing.T) {
		So(m2.LastUpdateTs.Unix(), ShouldEqual, m.LastUpdate.Unix())
		So(m2.LastUpdate.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
		So(m2.LastUpdateTs.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
+		So(m2.LastStarted.Unix(), ShouldEqual, m.LastStarted.Unix())
+		So(m2.LastStartedTs.Unix(), ShouldEqual, m.LastStarted.Unix())
+		So(m2.LastStarted.UnixNano(), ShouldEqual, m.LastStarted.UnixNano())
+		So(m2.LastStartedTs.UnixNano(), ShouldEqual, m.LastStarted.UnixNano())
		So(m2.LastEnded.Unix(), ShouldEqual, m.LastEnded.Unix())
		So(m2.LastEndedTs.Unix(), ShouldEqual, m.LastEnded.Unix())
		So(m2.LastEnded.UnixNano(), ShouldEqual, m.LastEnded.UnixNano())


@@ -6,12 +6,37 @@ import (
"crypto/x509" "crypto/x509"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"os/exec"
"regexp" "regexp"
"time" "time"
) )
var rsyncExitValues = map[int]string{
0: "Success",
1: "Syntax or usage error",
2: "Protocol incompatibility",
3: "Errors selecting input/output files, dirs",
4: "Requested action not supported: an attempt was made to manipulate 64-bit files on a platform that cannot support them; or an option was specified that is supported by the client and not by the server.",
5: "Error starting client-server protocol",
6: "Daemon unable to append to log-file",
10: "Error in socket I/O",
11: "Error in file I/O",
12: "Error in rsync protocol data stream",
13: "Errors with program diagnostics",
14: "Error in IPC code",
20: "Received SIGUSR1 or SIGINT",
21: "Some error returned by waitpid()",
22: "Error allocating core memory buffers",
23: "Partial transfer due to error",
24: "Partial transfer due to vanished source files",
25: "The --max-delete limit stopped deletions",
30: "Timeout in data send/receive",
35: "Timeout waiting for daemon connection",
}
// GetTLSConfig generate tls.Config from CAFile // GetTLSConfig generate tls.Config from CAFile
func GetTLSConfig(CAFile string) (*tls.Config, error) { func GetTLSConfig(CAFile string) (*tls.Config, error) {
caCert, err := ioutil.ReadFile(CAFile) caCert, err := ioutil.ReadFile(CAFile)
@@ -86,13 +111,45 @@ func GetJSON(url string, obj interface{}, client *http.Client) (*http.Response,
return resp, json.Unmarshal(body, obj) return resp, json.Unmarshal(body, obj)
} }
func ExtractSizeFromRsyncLog(content []byte) string { // FindAllSubmatchInFile calls re.FindAllSubmatch to find matches in given file
// (?m) flag enables multi-line mode func FindAllSubmatchInFile(fileName string, re *regexp.Regexp) (matches [][][]byte, err error) {
re := regexp.MustCompile(`(?m)^Total file size: ([0-9\.]+[KMGTP]?) bytes`) if fileName == "/dev/null" {
matches := re.FindAllSubmatch(content, -1) err = errors.New("Invalid log file")
// fmt.Printf("%q\n", matches) return
if len(matches) == 0 { }
if content, err := ioutil.ReadFile(fileName); err == nil {
matches = re.FindAllSubmatch(content, -1)
// fmt.Printf("FindAllSubmatchInFile: %q\n", matches)
}
return
}
// ExtractSizeFromLog uses a regexp to extract the size from log files
func ExtractSizeFromLog(logFile string, re *regexp.Regexp) string {
matches, _ := FindAllSubmatchInFile(logFile, re)
if matches == nil || len(matches) == 0 {
return "" return ""
} }
// return the first capture group of the last occurrence
return string(matches[len(matches)-1][1]) return string(matches[len(matches)-1][1])
} }
// ExtractSizeFromRsyncLog extracts the size from rsync logs
func ExtractSizeFromRsyncLog(logFile string) string {
// (?m) flag enables multi-line mode
re := regexp.MustCompile(`(?m)^Total file size: ([0-9\.]+[KMGTP]?) bytes`)
return ExtractSizeFromLog(logFile, re)
}
// TranslateRsyncErrorCode translates the exit code of rsync to a message
func TranslateRsyncErrorCode(cmdErr error) (exitCode int, msg string) {
if exiterr, ok := cmdErr.(*exec.ExitError); ok {
exitCode = exiterr.ExitCode()
strerr, valid := rsyncExitValues[exitCode]
if valid {
msg = fmt.Sprintf("rsync error: %s", strerr)
}
}
return
}
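
The hunk above adds TranslateRsyncErrorCode. A minimal sketch of how a caller inside the project might use it; only the function's signature comes from the diff, while the package name, import path, rsync arguments, and logging below are illustrative assumptions (and the internal package is only importable from within this repository):

package worker

import (
	"log"
	"os/exec"

	"github.com/tuna/tunasync/internal" // assumed import path of the internal package
)

// runRsync is a hypothetical helper: it runs rsync and, on failure, logs the
// human-readable meaning of the exit code.
func runRsync(args ...string) error {
	cmd := exec.Command("rsync", args...)
	if err := cmd.Run(); err != nil {
		if code, msg := internal.TranslateRsyncErrorCode(err); msg != "" {
			// e.g. exit code 30 maps to "rsync error: Timeout in data send/receive"
			log.Printf("rsync exited with code %d: %s", code, msg)
		}
		return err
	}
	return nil
}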


@@ -1,6 +1,9 @@
package internal

import (
+	"io/ioutil"
+	"os"
+	"path/filepath"
	"testing"

	. "github.com/smartystreets/goconvey/convey"
@@ -26,7 +29,14 @@ sent 7.55M bytes received 823.25M bytes 5.11M bytes/sec
total size is 1.33T speedup is 1,604.11
`
	Convey("Log parser should work", t, func() {
-		res := ExtractSizeFromRsyncLog([]byte(realLogContent))
+		tmpDir, err := ioutil.TempDir("", "tunasync")
+		So(err, ShouldBeNil)
+		defer os.RemoveAll(tmpDir)
+		logFile := filepath.Join(tmpDir, "rs.log")
+		err = ioutil.WriteFile(logFile, []byte(realLogContent), 0755)
+		So(err, ShouldBeNil)
+		res := ExtractSizeFromRsyncLog(logFile)
		So(res, ShouldEqual, "1.33T")
	})
}


@@ -1,3 +1,4 @@
package internal

-const Version string = "0.4.2"
+// Version of the program
+const Version string = "0.6.6"


@@ -78,34 +78,37 @@ func TestBoltAdapter(t *testing.T) {
Convey("update mirror status", func() { Convey("update mirror status", func() {
status := []MirrorStatus{ status := []MirrorStatus{
MirrorStatus{ MirrorStatus{
Name: "arch-sync1", Name: "arch-sync1",
Worker: testWorkerIDs[0], Worker: testWorkerIDs[0],
IsMaster: true, IsMaster: true,
Status: Success, Status: Success,
LastUpdate: time.Now(), LastUpdate: time.Now(),
LastEnded: time.Now(), LastStarted: time.Now().Add(-time.Minute),
Upstream: "mirrors.tuna.tsinghua.edu.cn", LastEnded: time.Now(),
Size: "3GB", Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "3GB",
}, },
MirrorStatus{ MirrorStatus{
Name: "arch-sync2", Name: "arch-sync2",
Worker: testWorkerIDs[1], Worker: testWorkerIDs[1],
IsMaster: true, IsMaster: true,
Status: Disabled, Status: Disabled,
LastUpdate: time.Now().Add(-time.Hour), LastUpdate: time.Now().Add(-time.Hour),
LastEnded: time.Now(), LastStarted: time.Now().Add(-time.Minute),
Upstream: "mirrors.tuna.tsinghua.edu.cn", LastEnded: time.Now(),
Size: "4GB", Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "4GB",
}, },
MirrorStatus{ MirrorStatus{
Name: "arch-sync3", Name: "arch-sync3",
Worker: testWorkerIDs[1], Worker: testWorkerIDs[1],
IsMaster: true, IsMaster: true,
Status: Success, Status: Success,
LastUpdate: time.Now().Add(-time.Second), LastUpdate: time.Now().Add(-time.Minute),
LastEnded: time.Now(), LastStarted: time.Now().Add(-time.Second),
Upstream: "mirrors.tuna.tsinghua.edu.cn", LastEnded: time.Now(),
Size: "4GB", Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "4GB",
}, },
} }


@@ -4,6 +4,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"net/http" "net/http"
"sync"
"time" "time"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
@@ -23,6 +24,7 @@ type Manager struct {
cfg *Config cfg *Config
engine *gin.Engine engine *gin.Engine
adapter dbAdapter adapter dbAdapter
rwmu sync.RWMutex
httpClient *http.Client httpClient *http.Client
} }
@@ -127,9 +129,11 @@ func (s *Manager) Run() {
} }
} }
// listAllJobs repond with all jobs of specified workers // listAllJobs respond with all jobs of specified workers
func (s *Manager) listAllJobs(c *gin.Context) { func (s *Manager) listAllJobs(c *gin.Context) {
s.rwmu.RLock()
mirrorStatusList, err := s.adapter.ListAllMirrorStatus() mirrorStatusList, err := s.adapter.ListAllMirrorStatus()
s.rwmu.RUnlock()
if err != nil { if err != nil {
err := fmt.Errorf("failed to list all mirror status: %s", err := fmt.Errorf("failed to list all mirror status: %s",
err.Error(), err.Error(),
@@ -150,7 +154,9 @@ func (s *Manager) listAllJobs(c *gin.Context) {
// flushDisabledJobs deletes all jobs that marks as deleted // flushDisabledJobs deletes all jobs that marks as deleted
func (s *Manager) flushDisabledJobs(c *gin.Context) { func (s *Manager) flushDisabledJobs(c *gin.Context) {
s.rwmu.Lock()
err := s.adapter.FlushDisabledJobs() err := s.adapter.FlushDisabledJobs()
s.rwmu.Unlock()
if err != nil { if err != nil {
err := fmt.Errorf("failed to flush disabled jobs: %s", err := fmt.Errorf("failed to flush disabled jobs: %s",
err.Error(), err.Error(),
@@ -165,7 +171,9 @@ func (s *Manager) flushDisabledJobs(c *gin.Context) {
// deleteWorker deletes one worker by id // deleteWorker deletes one worker by id
func (s *Manager) deleteWorker(c *gin.Context) { func (s *Manager) deleteWorker(c *gin.Context) {
workerID := c.Param("id") workerID := c.Param("id")
s.rwmu.Lock()
err := s.adapter.DeleteWorker(workerID) err := s.adapter.DeleteWorker(workerID)
s.rwmu.Unlock()
if err != nil { if err != nil {
err := fmt.Errorf("failed to delete worker: %s", err := fmt.Errorf("failed to delete worker: %s",
err.Error(), err.Error(),
@@ -178,10 +186,12 @@ func (s *Manager) deleteWorker(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{_infoKey: "deleted"}) c.JSON(http.StatusOK, gin.H{_infoKey: "deleted"})
} }
// listWrokers respond with informations of all the workers // listWorkers respond with information of all the workers
func (s *Manager) listWorkers(c *gin.Context) { func (s *Manager) listWorkers(c *gin.Context) {
var workerInfos []WorkerStatus var workerInfos []WorkerStatus
s.rwmu.RLock()
workers, err := s.adapter.ListWorkers() workers, err := s.adapter.ListWorkers()
s.rwmu.RUnlock()
if err != nil { if err != nil {
err := fmt.Errorf("failed to list workers: %s", err := fmt.Errorf("failed to list workers: %s",
err.Error(), err.Error(),
@@ -223,7 +233,9 @@ func (s *Manager) registerWorker(c *gin.Context) {
// listJobsOfWorker respond with all the jobs of the specified worker // listJobsOfWorker respond with all the jobs of the specified worker
func (s *Manager) listJobsOfWorker(c *gin.Context) { func (s *Manager) listJobsOfWorker(c *gin.Context) {
workerID := c.Param("id") workerID := c.Param("id")
s.rwmu.RLock()
mirrorStatusList, err := s.adapter.ListMirrorStatus(workerID) mirrorStatusList, err := s.adapter.ListMirrorStatus(workerID)
s.rwmu.RUnlock()
if err != nil { if err != nil {
err := fmt.Errorf("failed to list jobs of worker %s: %s", err := fmt.Errorf("failed to list jobs of worker %s: %s",
workerID, err.Error(), workerID, err.Error(),
@@ -255,7 +267,9 @@ func (s *Manager) updateSchedulesOfWorker(c *gin.Context) {
) )
} }
s.rwmu.RLock()
curStatus, err := s.adapter.GetMirrorStatus(workerID, mirrorName) curStatus, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
s.rwmu.RUnlock()
if err != nil { if err != nil {
fmt.Errorf("failed to get job %s of worker %s: %s", fmt.Errorf("failed to get job %s of worker %s: %s",
mirrorName, workerID, err.Error(), mirrorName, workerID, err.Error(),
@@ -269,7 +283,9 @@ func (s *Manager) updateSchedulesOfWorker(c *gin.Context) {
} }
curStatus.Scheduled = schedule.NextSchedule curStatus.Scheduled = schedule.NextSchedule
s.rwmu.Lock()
_, err = s.adapter.UpdateMirrorStatus(workerID, mirrorName, curStatus) _, err = s.adapter.UpdateMirrorStatus(workerID, mirrorName, curStatus)
s.rwmu.Unlock()
if err != nil { if err != nil {
err := fmt.Errorf("failed to update job %s of worker %s: %s", err := fmt.Errorf("failed to update job %s of worker %s: %s",
mirrorName, workerID, err.Error(), mirrorName, workerID, err.Error(),
@@ -295,16 +311,25 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) {
) )
} }
s.rwmu.RLock()
curStatus, _ := s.adapter.GetMirrorStatus(workerID, mirrorName) curStatus, _ := s.adapter.GetMirrorStatus(workerID, mirrorName)
s.rwmu.RUnlock()
curTime := time.Now()
if status.Status == PreSyncing && curStatus.Status != PreSyncing {
status.LastStarted = curTime
} else {
status.LastStarted = curStatus.LastStarted
}
// Only successful syncing needs last_update // Only successful syncing needs last_update
if status.Status == Success { if status.Status == Success {
status.LastUpdate = time.Now() status.LastUpdate = curTime
} else { } else {
status.LastUpdate = curStatus.LastUpdate status.LastUpdate = curStatus.LastUpdate
} }
if status.Status == Success || status.Status == Failed { if status.Status == Success || status.Status == Failed {
status.LastEnded = time.Now() status.LastEnded = curTime
} else { } else {
status.LastEnded = curStatus.LastEnded status.LastEnded = curStatus.LastEnded
} }
@@ -324,7 +349,9 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) {
logger.Noticef("Job [%s] @<%s> %s", status.Name, status.Worker, status.Status)
}
s.rwmu.Lock()
newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
s.rwmu.Unlock()
if err != nil {
err := fmt.Errorf("failed to update job %s of worker %s: %s",
mirrorName, workerID, err.Error(),
@@ -346,7 +373,9 @@ func (s *Manager) updateMirrorSize(c *gin.Context) {
c.BindJSON(&msg)
mirrorName := msg.Name
s.rwmu.RLock()
status, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
s.rwmu.RUnlock()
if err != nil {
logger.Errorf(
"Failed to get status of mirror %s @<%s>: %s",
@@ -363,7 +392,9 @@ func (s *Manager) updateMirrorSize(c *gin.Context) {
logger.Noticef("Mirror size of [%s] @<%s>: %s", status.Name, status.Worker, status.Size)
s.rwmu.Lock()
newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
s.rwmu.Unlock()
if err != nil {
err := fmt.Errorf("failed to update job %s of worker %s: %s",
mirrorName, workerID, err.Error(),
@@ -386,7 +417,9 @@ func (s *Manager) handleClientCmd(c *gin.Context) {
return
}
s.rwmu.RLock()
w, err := s.adapter.GetWorker(workerID)
s.rwmu.RUnlock()
if err != nil {
err := fmt.Errorf("worker %s is not registered yet", workerID)
s.returnErrJSON(c, http.StatusBadRequest, err)
@@ -403,7 +436,9 @@ func (s *Manager) handleClientCmd(c *gin.Context) {
// update job status, even if the job did not disable successfully,
// this status should be set as disabled
s.rwmu.RLock()
curStat, _ := s.adapter.GetMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID)
s.rwmu.RUnlock()
changed := false
switch clientCmd.Cmd {
case CmdDisable:
@@ -414,7 +449,9 @@ func (s *Manager) handleClientCmd(c *gin.Context) {
changed = true
}
if changed {
s.rwmu.Lock()
s.adapter.UpdateMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID, curStat)
s.rwmu.Unlock()
}
logger.Noticef("Posting command '%s %s' to <%s>", clientCmd.Cmd, clientCmd.MirrorID, clientCmd.WorkerID)
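The "[manager] protect DB with RW lock" change above follows the usual sync.RWMutex discipline: reads take RLock, writes take Lock, and the lock is released immediately after the adapter call. A minimal standalone sketch of that pattern, with a map standing in for the real dbAdapter:

package main

import (
	"fmt"
	"sync"
)

// store stands in for the manager's database adapter; the RWMutex usage
// mirrors the s.rwmu.RLock()/Lock() wrapping shown in the diff.
type store struct {
	rwmu sync.RWMutex
	jobs map[string]string
}

func (s *store) Get(id string) (string, bool) {
	s.rwmu.RLock() // many readers may hold this concurrently
	defer s.rwmu.RUnlock()
	v, ok := s.jobs[id]
	return v, ok
}

func (s *store) Update(id, status string) {
	s.rwmu.Lock() // writers are exclusive
	defer s.rwmu.Unlock()
	s.jobs[id] = status
}

func main() {
	s := &store{jobs: map[string]string{}}
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			s.Update(fmt.Sprintf("job%d", i), "success")
		}(i)
	}
	wg.Wait()
	v, _ := s.Get("job3")
	fmt.Println(v)
}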


@@ -7,6 +7,7 @@ import (
"math/rand" "math/rand"
"net/http" "net/http"
"strings" "strings"
"sync/atomic"
"testing" "testing"
"time" "time"
@@ -64,6 +65,34 @@ func TestHTTPServer(t *testing.T) {
So(msg[_errorKey], ShouldEqual, fmt.Sprintf("failed to list jobs of worker %s: %s", _magicBadWorkerID, "database fail")) So(msg[_errorKey], ShouldEqual, fmt.Sprintf("failed to list jobs of worker %s: %s", _magicBadWorkerID, "database fail"))
}) })
Convey("when register multiple workers", func(ctx C) {
N := 10
var cnt uint32
for i := 0; i < N; i++ {
go func(id int) {
w := WorkerStatus{
ID: fmt.Sprintf("worker%d", id),
}
resp, err := PostJSON(baseURL+"/workers", w, nil)
ctx.So(err, ShouldBeNil)
ctx.So(resp.StatusCode, ShouldEqual, http.StatusOK)
atomic.AddUint32(&cnt, 1)
}(i)
}
time.Sleep(2 * time.Second)
So(cnt, ShouldEqual, N)
Convey("list all workers", func(ctx C) {
resp, err := http.Get(baseURL + "/workers")
So(err, ShouldBeNil)
defer resp.Body.Close()
var actualResponseObj []WorkerStatus
err = json.NewDecoder(resp.Body).Decode(&actualResponseObj)
So(err, ShouldBeNil)
So(len(actualResponseObj), ShouldEqual, N+1)
})
})
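The test above synchronizes on a fixed two-second sleep before checking the counter. A minimal sketch of the same concurrent-registration idea using sync.WaitGroup instead of sleeping; the endpoint and handler here are stand-ins, not the real manager API:

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
	"sync"
	"sync/atomic"
)

func main() {
	var registered uint32
	// A stand-in /workers endpoint that just counts registrations.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		atomic.AddUint32(&registered, 1)
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()

	const N = 10
	var wg sync.WaitGroup
	for i := 0; i < N; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			body := fmt.Sprintf(`{"id":"worker%d"}`, id)
			resp, err := http.Post(srv.URL+"/workers", "application/json", strings.NewReader(body))
			if err == nil {
				resp.Body.Close()
			}
		}(i)
	}
	wg.Wait() // deterministic: no fixed sleep needed
	fmt.Println(atomic.LoadUint32(&registered) == N) // true
}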
Convey("when register a worker", func(ctx C) { Convey("when register a worker", func(ctx C) {
w := WorkerStatus{ w := WorkerStatus{
ID: "test_worker1", ID: "test_worker1",
@@ -151,10 +180,41 @@ func TestHTTPServer(t *testing.T) {
So(m.Size, ShouldEqual, status.Size) So(m.Size, ShouldEqual, status.Size)
So(m.IsMaster, ShouldEqual, status.IsMaster) So(m.IsMaster, ShouldEqual, status.IsMaster)
So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second) So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second)
So(m.LastStarted.IsZero(), ShouldBeTrue) // hasn't been initialized yet
So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second) So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
}) })
// start syncing
status.Status = PreSyncing
time.Sleep(1 * time.Second)
resp, err = PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
So(err, ShouldBeNil)
defer resp.Body.Close()
So(resp.StatusCode, ShouldEqual, http.StatusOK)
Convey("update mirror status to PreSync - starting sync", func(ctx C) {
var ms []MirrorStatus
resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK)
// err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
m := ms[0]
So(m.Name, ShouldEqual, status.Name)
So(m.Worker, ShouldEqual, status.Worker)
So(m.Status, ShouldEqual, status.Status)
So(m.Upstream, ShouldEqual, status.Upstream)
So(m.Size, ShouldEqual, status.Size)
So(m.IsMaster, ShouldEqual, status.IsMaster)
So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 3*time.Second)
So(time.Now().Sub(m.LastUpdate), ShouldBeGreaterThan, 1*time.Second)
So(time.Now().Sub(m.LastStarted), ShouldBeLessThan, 2*time.Second)
So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 3*time.Second)
So(time.Now().Sub(m.LastEnded), ShouldBeGreaterThan, 1*time.Second)
})
Convey("list all job status of all workers", func(ctx C) { Convey("list all job status of all workers", func(ctx C) {
var ms []WebMirrorStatus var ms []WebMirrorStatus
resp, err := GetJSON(baseURL+"/jobs", &ms, nil) resp, err := GetJSON(baseURL+"/jobs", &ms, nil)
@@ -167,8 +227,9 @@ func TestHTTPServer(t *testing.T) {
So(m.Upstream, ShouldEqual, status.Upstream) So(m.Upstream, ShouldEqual, status.Upstream)
So(m.Size, ShouldEqual, status.Size) So(m.Size, ShouldEqual, status.Size)
So(m.IsMaster, ShouldEqual, status.IsMaster) So(m.IsMaster, ShouldEqual, status.IsMaster)
So(time.Now().Sub(m.LastUpdate.Time), ShouldBeLessThan, 1*time.Second) So(time.Now().Sub(m.LastUpdate.Time), ShouldBeLessThan, 3*time.Second)
So(time.Now().Sub(m.LastEnded.Time), ShouldBeLessThan, 1*time.Second) So(time.Now().Sub(m.LastStarted.Time), ShouldBeLessThan, 2*time.Second)
So(time.Now().Sub(m.LastEnded.Time), ShouldBeLessThan, 3*time.Second)
}) })
@@ -197,8 +258,9 @@ func TestHTTPServer(t *testing.T) {
So(m.Upstream, ShouldEqual, status.Upstream)
So(m.Size, ShouldEqual, "5GB")
So(m.IsMaster, ShouldEqual, status.IsMaster)
-So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second)
-So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
+So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 3*time.Second)
+So(time.Now().Sub(m.LastStarted), ShouldBeLessThan, 2*time.Second)
+So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 3*time.Second)
})
})
@@ -251,6 +313,7 @@ func TestHTTPServer(t *testing.T) {
So(m.Size, ShouldEqual, status.Size)
So(m.IsMaster, ShouldEqual, status.IsMaster)
So(time.Now().Sub(m.LastUpdate), ShouldBeGreaterThan, 3*time.Second)
So(time.Now().Sub(m.LastStarted), ShouldBeGreaterThan, 3*time.Second)
So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
})
})
@@ -258,14 +321,15 @@ func TestHTTPServer(t *testing.T) {
Convey("update mirror status of an inexisted worker", func(ctx C) {
invalidWorker := "test_worker2"
status := MirrorStatus{
Name: "arch-sync2",
Worker: invalidWorker,
IsMaster: true,
Status: Success,
LastUpdate: time.Now(),
LastStarted: time.Now(),
LastEnded: time.Now(),
Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "4GB",
}
resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s",
baseURL, status.Worker, status.Name), status, nil)


@@ -16,9 +16,11 @@ type baseProvider struct {
name string
interval time.Duration
retry int
timeout time.Duration
isMaster bool
cmd *cmdJob
logFileFd *os.File
isRunning atomic.Value
cgroup *cgroupHook
@@ -55,6 +57,10 @@ func (p *baseProvider) Retry() int {
return p.retry
}
func (p *baseProvider) Timeout() time.Duration {
return p.timeout
}
func (p *baseProvider) IsMaster() bool {
return p.isMaster
}
@@ -128,11 +134,20 @@ func (p *baseProvider) prepareLogFile(append bool) error {
logger.Errorf("Error opening logfile %s: %s", p.LogFile(), err.Error())
return err
}
p.logFileFd = logFile
p.cmd.SetLogFile(logFile)
return nil
}
-func (p *baseProvider) Run() error {
+func (p *baseProvider) closeLogFile() (err error) {
if p.logFileFd != nil {
err = p.logFileFd.Close()
p.logFileFd = nil
}
return
}
func (p *baseProvider) Run(started chan empty) error {
panic("Not Implemented")
}
@@ -159,6 +174,7 @@ func (p *baseProvider) Terminate() error {
defer p.Unlock()
logger.Debugf("terminating provider: %s", p.Name())
if !p.IsRunning() {
logger.Warningf("Terminate() called while IsRunning is false: %s", p.Name())
return nil
}
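The new Run(started chan empty) signature above defines a small contract: start the job, signal the caller once it is safe to call Terminate(), then wait for completion. A toy sketch of that contract with os/exec, not the real provider types:

package main

import (
	"fmt"
	"os/exec"
)

type empty struct{}

// toyProvider sketches the Run contract from the diff: Start the command,
// signal `started`, then Wait until it finishes.
type toyProvider struct {
	cmd *exec.Cmd
}

func (p *toyProvider) Run(started chan empty) error {
	p.cmd = exec.Command("sh", "-c", "sleep 1; echo done")
	if err := p.cmd.Start(); err != nil {
		return err
	}
	started <- empty{} // consumers select on this before arming timeouts or kills
	return p.cmd.Wait()
}

func main() {
	started := make(chan empty, 1)
	p := &toyProvider{}
	go func() { <-started; fmt.Println("provider started") }()
	fmt.Println("run error:", p.Run(started))
}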


@@ -83,7 +83,7 @@ sleep 30
So(err, ShouldBeNil)
go func() {
-err = provider.Run()
+err := provider.Run(make(chan empty, 1))
ctx.So(err, ShouldNotBeNil)
}()


@@ -3,11 +3,11 @@ package worker
import (
"errors"
"fmt"
"io/ioutil"
"regexp"
"time"
"github.com/anmitsu/go-shlex"
"github.com/tuna/tunasync/internal"
)
type cmdConfig struct {
@@ -16,15 +16,19 @@ type cmdConfig struct {
workingDir, logDir, logFile string
interval time.Duration
retry int
timeout time.Duration
env map[string]string
failOnMatch string
sizePattern string
}
type cmdProvider struct {
baseProvider
cmdConfig
command []string
dataSize string
failOnMatch *regexp.Regexp
sizePattern *regexp.Regexp
}
func newCmdProvider(c cmdConfig) (*cmdProvider, error) {
@@ -38,6 +42,7 @@ func newCmdProvider(c cmdConfig) (*cmdProvider, error) {
ctx: NewContext(),
interval: c.interval,
retry: c.retry,
timeout: c.timeout,
},
cmdConfig: c,
}
@@ -59,6 +64,14 @@ func newCmdProvider(c cmdConfig) (*cmdProvider, error) {
}
provider.failOnMatch = failOnMatch
}
if len(c.sizePattern) > 0 {
var err error
sizePattern, err := regexp.Compile(c.sizePattern)
if err != nil {
return nil, errors.New("size-pattern regexp error: " + err.Error())
}
provider.sizePattern = sizePattern
}
return provider, nil
}
@@ -71,24 +84,33 @@ func (p *cmdProvider) Upstream() string {
return p.upstreamURL
}
-func (p *cmdProvider) Run() error {
+func (p *cmdProvider) DataSize() string {
return p.dataSize
}
func (p *cmdProvider) Run(started chan empty) error {
p.dataSize = ""
defer p.closeLogFile()
if err := p.Start(); err != nil {
return err
}
started <- empty{}
if err := p.Wait(); err != nil {
return err
}
if p.failOnMatch != nil {
-if logContent, err := ioutil.ReadFile(p.LogFile()); err == nil {
-matches := p.failOnMatch.FindAllSubmatch(logContent, -1)
-if len(matches) != 0 {
-logger.Debug("Fail-on-match: %r", matches)
-return errors.New(
-fmt.Sprintf("Fail-on-match regexp found %d matches", len(matches)))
-}
-} else {
-return err
-}
+matches, err := internal.FindAllSubmatchInFile(p.LogFile(), p.failOnMatch)
+logger.Infof("FindAllSubmatchInFile: %q\n", matches)
+if err != nil {
+return err
+}
+if len(matches) != 0 {
+logger.Debug("Fail-on-match: %r", matches)
+return fmt.Errorf("Fail-on-match regexp found %d matches", len(matches))
+}
}
if p.sizePattern != nil {
p.dataSize = internal.ExtractSizeFromLog(p.LogFile(), p.sizePattern)
}
return nil
}
@@ -120,5 +142,6 @@ func (p *cmdProvider) Start() error {
return err
}
p.isRunning.Store(true)
logger.Debugf("set isRunning to true: %s", p.Name())
return nil
}
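internal.ExtractSizeFromLog and internal.FindAllSubmatchInFile are tunasync's own helpers and are not shown in this diff. A standalone sketch of the size-pattern idea they implement, read the log, apply a one-group regexp, and keep the last match:

package main

import (
	"fmt"
	"os"
	"regexp"
)

// extractSize sketches the size-pattern behaviour: return the last capture
// group found in the log, or "" when nothing matches or the file is unreadable
// (mirroring "DataSize stays empty" in the tests).
func extractSize(logFile string, pattern *regexp.Regexp) string {
	content, err := os.ReadFile(logFile)
	if err != nil {
		return ""
	}
	matches := pattern.FindAllSubmatch(content, -1)
	if len(matches) == 0 {
		return ""
	}
	return string(matches[len(matches)-1][1])
}

func main() {
	tmp, _ := os.CreateTemp("", "sync-*.log")
	defer os.Remove(tmp.Name())
	tmp.WriteString("Total file size: 1.23T bytes\n")
	tmp.Close()
	re := regexp.MustCompile(`Total file size: ([0-9\.]+[KMGT]?)`)
	fmt.Println(extractSize(tmp.Name(), re)) // 1.23T
}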


@@ -1,6 +1,6 @@
package worker
-// put global viables and types here
+// put global variables and types here
import (
"gopkg.in/op/go-logging.v1"


@@ -6,6 +6,7 @@ import (
"path/filepath" "path/filepath"
"github.com/BurntSushi/toml" "github.com/BurntSushi/toml"
"github.com/imdario/mergo"
) )
type providerEnum uint8 type providerEnum uint8
@@ -41,7 +42,8 @@ type Config struct {
BtrfsSnapshot btrfsSnapshotConfig `toml:"btrfs_snapshot"` BtrfsSnapshot btrfsSnapshotConfig `toml:"btrfs_snapshot"`
Docker dockerConfig `toml:"docker"` Docker dockerConfig `toml:"docker"`
Include includeConfig `toml:"include"` Include includeConfig `toml:"include"`
Mirrors []mirrorConfig `toml:"mirrors"` MirrorsConf []mirrorConfig `toml:"mirrors"`
Mirrors []mirrorConfig
} }
type globalConfig struct { type globalConfig struct {
@@ -51,6 +53,7 @@ type globalConfig struct {
Concurrent int `toml:"concurrent"` Concurrent int `toml:"concurrent"`
Interval int `toml:"interval"` Interval int `toml:"interval"`
Retry int `toml:"retry"` Retry int `toml:"retry"`
Timeout int `toml:"timeout"`
ExecOnSuccess []string `toml:"exec_on_success"` ExecOnSuccess []string `toml:"exec_on_success"`
ExecOnFailure []string `toml:"exec_on_failure"` ExecOnFailure []string `toml:"exec_on_failure"`
@@ -111,15 +114,17 @@ type includedMirrorConfig struct {
} }
type mirrorConfig struct { type mirrorConfig struct {
Name string `toml:"name"` Name string `toml:"name"`
Provider providerEnum `toml:"provider"` Provider providerEnum `toml:"provider"`
Upstream string `toml:"upstream"` Upstream string `toml:"upstream"`
Interval int `toml:"interval"` Interval int `toml:"interval"`
Retry int `toml:"retry"` Retry int `toml:"retry"`
MirrorDir string `toml:"mirror_dir"` Timeout int `toml:"timeout"`
LogDir string `toml:"log_dir"` MirrorDir string `toml:"mirror_dir"`
Env map[string]string `toml:"env"` MirrorSubDir string `toml:"mirror_subdir"`
Role string `toml:"role"` LogDir string `toml:"log_dir"`
Env map[string]string `toml:"env"`
Role string `toml:"role"`
// These two options over-write the global options // These two options over-write the global options
ExecOnSuccess []string `toml:"exec_on_success"` ExecOnSuccess []string `toml:"exec_on_success"`
@@ -131,11 +136,14 @@ type mirrorConfig struct {
Command string `toml:"command"` Command string `toml:"command"`
FailOnMatch string `toml:"fail_on_match"` FailOnMatch string `toml:"fail_on_match"`
SizePattern string `toml:"size_pattern"`
UseIPv6 bool `toml:"use_ipv6"` UseIPv6 bool `toml:"use_ipv6"`
UseIPv4 bool `toml:"use_ipv4"` UseIPv4 bool `toml:"use_ipv4"`
ExcludeFile string `toml:"exclude_file"` ExcludeFile string `toml:"exclude_file"`
Username string `toml:"username"` Username string `toml:"username"`
Password string `toml:"password"` Password string `toml:"password"`
RsyncNoTimeo bool `toml:"rsync_no_timeout"`
RsyncTimeout int `toml:"rsync_timeout"`
RsyncOptions []string `toml:"rsync_options"` RsyncOptions []string `toml:"rsync_options"`
RsyncOverride []string `toml:"rsync_override"` RsyncOverride []string `toml:"rsync_override"`
Stage1Profile string `toml:"stage1_profile"` Stage1Profile string `toml:"stage1_profile"`
@@ -147,6 +155,8 @@ type mirrorConfig struct {
DockerOptions []string `toml:"docker_options"` DockerOptions []string `toml:"docker_options"`
SnapshotPath string `toml:"snapshot_path"` SnapshotPath string `toml:"snapshot_path"`
ChildMirrors []mirrorConfig `toml:"mirrors"`
} }
// LoadConfig loads configuration // LoadConfig loads configuration
@@ -173,9 +183,36 @@ func LoadConfig(cfgFile string) (*Config, error) {
logger.Errorf(err.Error()) logger.Errorf(err.Error())
return nil, err return nil, err
} }
cfg.Mirrors = append(cfg.Mirrors, incMirCfg.Mirrors...) cfg.MirrorsConf = append(cfg.MirrorsConf, incMirCfg.Mirrors...)
}
}
for _, m := range cfg.MirrorsConf {
if err := recursiveMirrors(cfg, nil, m); err != nil {
return nil, err
} }
} }
return cfg, nil return cfg, nil
} }
func recursiveMirrors(cfg *Config, parent *mirrorConfig, mirror mirrorConfig) error {
var curMir mirrorConfig
if parent != nil {
curMir = *parent
}
curMir.ChildMirrors = nil
if err := mergo.Merge(&curMir, mirror, mergo.WithOverride); err != nil {
return err
}
if mirror.ChildMirrors == nil {
cfg.Mirrors = append(cfg.Mirrors, curMir)
} else {
for _, m := range mirror.ChildMirrors {
if err := recursiveMirrors(cfg, &curMir, m); err != nil {
return err
}
}
}
return nil
}
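recursiveMirrors above relies on mergo.Merge with WithOverride so that a nested mirror entry inherits every field it does not set itself, and only leaf entries end up in the flat mirror list. A trimmed-down sketch with just a few stand-in config fields:

package main

import (
	"fmt"

	"github.com/imdario/mergo"
)

// A reduced mirrorConfig: just enough fields to show the inheritance.
type mirrorConfig struct {
	Name          string
	Provider      string
	Stage1Profile string
	UseIPv6       bool
	Upstream      string
	ChildMirrors  []mirrorConfig
}

// flatten mimics recursiveMirrors: children inherit the non-zero fields of
// their ancestors, and only leaves are appended to the final list.
func flatten(out *[]mirrorConfig, parent *mirrorConfig, m mirrorConfig) error {
	var cur mirrorConfig
	if parent != nil {
		cur = *parent
	}
	cur.ChildMirrors = nil
	if err := mergo.Merge(&cur, m, mergo.WithOverride); err != nil {
		return err
	}
	if m.ChildMirrors == nil {
		*out = append(*out, cur)
		return nil
	}
	for _, c := range m.ChildMirrors {
		if err := flatten(out, &cur, c); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	root := mirrorConfig{
		Name: "ipv6s", UseIPv6: true,
		ChildMirrors: []mirrorConfig{
			{Name: "debian-security", Provider: "two-stage-rsync", Stage1Profile: "debian",
				Upstream: "rsync://test.host/debian-security/"},
			{Name: "debian-cd", Provider: "rsync", Upstream: "rsync://test.host3/debian-cd/"},
		},
	}
	var mirrors []mirrorConfig
	_ = flatten(&mirrors, nil, root)
	for _, m := range mirrors {
		fmt.Println(m.Name, m.Provider, m.UseIPv6) // both leaves keep UseIPv6=true
	}
}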


@@ -6,6 +6,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"testing" "testing"
"time"
. "github.com/smartystreets/goconvey/convey" . "github.com/smartystreets/goconvey/convey"
) )
@@ -19,6 +20,7 @@ mirror_dir = "/data/mirrors"
concurrent = 10 concurrent = 10
interval = 240 interval = 240
retry = 3 retry = 3
timeout = 86400
[manager] [manager]
api_base = "https://127.0.0.1:5000" api_base = "https://127.0.0.1:5000"
@@ -37,6 +39,7 @@ provider = "command"
upstream = "https://aosp.google.com/" upstream = "https://aosp.google.com/"
interval = 720 interval = 720
retry = 2 retry = 2
timeout = 3600
mirror_dir = "/data/git/AOSP" mirror_dir = "/data/git/AOSP"
exec_on_success = [ exec_on_success = [
"bash -c 'echo ${TUNASYNC_JOB_EXIT_STATUS} > ${TUNASYNC_WORKING_DIR}/exit_status'" "bash -c 'echo ${TUNASYNC_JOB_EXIT_STATUS} > ${TUNASYNC_WORKING_DIR}/exit_status'"
@@ -83,9 +86,9 @@ exec_on_failure = [
tmpDir, tmpDir,
) )
cfgBlob = cfgBlob + incSection curCfgBlob := cfgBlob + incSection
err = ioutil.WriteFile(tmpfile.Name(), []byte(cfgBlob), 0644) err = ioutil.WriteFile(tmpfile.Name(), []byte(curCfgBlob), 0644)
So(err, ShouldEqual, nil) So(err, ShouldEqual, nil)
defer tmpfile.Close() defer tmpfile.Close()
@@ -119,6 +122,7 @@ use_ipv6 = true
So(cfg.Global.Name, ShouldEqual, "test_worker") So(cfg.Global.Name, ShouldEqual, "test_worker")
So(cfg.Global.Interval, ShouldEqual, 240) So(cfg.Global.Interval, ShouldEqual, 240)
So(cfg.Global.Retry, ShouldEqual, 3) So(cfg.Global.Retry, ShouldEqual, 3)
So(cfg.Global.Timeout, ShouldEqual, 86400)
So(cfg.Global.MirrorDir, ShouldEqual, "/data/mirrors") So(cfg.Global.MirrorDir, ShouldEqual, "/data/mirrors")
So(cfg.Manager.APIBase, ShouldEqual, "https://127.0.0.1:5000") So(cfg.Manager.APIBase, ShouldEqual, "https://127.0.0.1:5000")
@@ -130,6 +134,7 @@ use_ipv6 = true
So(m.Provider, ShouldEqual, provCommand) So(m.Provider, ShouldEqual, provCommand)
So(m.Interval, ShouldEqual, 720) So(m.Interval, ShouldEqual, 720)
So(m.Retry, ShouldEqual, 2) So(m.Retry, ShouldEqual, 2)
So(m.Timeout, ShouldEqual, 3600)
So(m.Env["REPO"], ShouldEqual, "/usr/local/bin/aosp-repo") So(m.Env["REPO"], ShouldEqual, "/usr/local/bin/aosp-repo")
m = cfg.Mirrors[1] m = cfg.Mirrors[1]
@@ -157,6 +162,102 @@ use_ipv6 = true
So(len(cfg.Mirrors), ShouldEqual, 6) So(len(cfg.Mirrors), ShouldEqual, 6)
}) })
Convey("Everything should work on nested config file", t, func() {
tmpfile, err := ioutil.TempFile("", "tunasync")
So(err, ShouldEqual, nil)
defer os.Remove(tmpfile.Name())
tmpDir, err := ioutil.TempDir("", "tunasync")
So(err, ShouldBeNil)
defer os.RemoveAll(tmpDir)
incSection := fmt.Sprintf(
"\n[include]\n"+
"include_mirrors = \"%s/*.conf\"",
tmpDir,
)
curCfgBlob := cfgBlob + incSection
err = ioutil.WriteFile(tmpfile.Name(), []byte(curCfgBlob), 0644)
So(err, ShouldEqual, nil)
defer tmpfile.Close()
incBlob1 := `
[[mirrors]]
name = "ipv6s"
use_ipv6 = true
[[mirrors.mirrors]]
name = "debians"
mirror_subdir = "debian"
provider = "two-stage-rsync"
stage1_profile = "debian"
[[mirrors.mirrors.mirrors]]
name = "debian-security"
upstream = "rsync://test.host/debian-security/"
[[mirrors.mirrors.mirrors]]
name = "ubuntu"
stage1_profile = "ubuntu"
upstream = "rsync://test.host2/ubuntu/"
[[mirrors.mirrors]]
name = "debian-cd"
provider = "rsync"
upstream = "rsync://test.host3/debian-cd/"
`
err = ioutil.WriteFile(filepath.Join(tmpDir, "nest.conf"), []byte(incBlob1), 0644)
So(err, ShouldEqual, nil)
cfg, err := LoadConfig(tmpfile.Name())
So(err, ShouldBeNil)
So(cfg.Global.Name, ShouldEqual, "test_worker")
So(cfg.Global.Interval, ShouldEqual, 240)
So(cfg.Global.Retry, ShouldEqual, 3)
So(cfg.Global.MirrorDir, ShouldEqual, "/data/mirrors")
So(cfg.Manager.APIBase, ShouldEqual, "https://127.0.0.1:5000")
So(cfg.Server.Hostname, ShouldEqual, "worker1.example.com")
m := cfg.Mirrors[0]
So(m.Name, ShouldEqual, "AOSP")
So(m.MirrorDir, ShouldEqual, "/data/git/AOSP")
So(m.Provider, ShouldEqual, provCommand)
So(m.Interval, ShouldEqual, 720)
So(m.Retry, ShouldEqual, 2)
So(m.Env["REPO"], ShouldEqual, "/usr/local/bin/aosp-repo")
m = cfg.Mirrors[1]
So(m.Name, ShouldEqual, "debian")
So(m.MirrorDir, ShouldEqual, "")
So(m.Provider, ShouldEqual, provTwoStageRsync)
m = cfg.Mirrors[2]
So(m.Name, ShouldEqual, "fedora")
So(m.MirrorDir, ShouldEqual, "")
So(m.Provider, ShouldEqual, provRsync)
So(m.ExcludeFile, ShouldEqual, "/etc/tunasync.d/fedora-exclude.txt")
m = cfg.Mirrors[3]
So(m.Name, ShouldEqual, "debian-security")
So(m.MirrorDir, ShouldEqual, "")
So(m.Provider, ShouldEqual, provTwoStageRsync)
So(m.UseIPv6, ShouldEqual, true)
So(m.Stage1Profile, ShouldEqual, "debian")
m = cfg.Mirrors[4]
So(m.Name, ShouldEqual, "ubuntu")
So(m.MirrorDir, ShouldEqual, "")
So(m.Provider, ShouldEqual, provTwoStageRsync)
So(m.UseIPv6, ShouldEqual, true)
So(m.Stage1Profile, ShouldEqual, "ubuntu")
m = cfg.Mirrors[5]
So(m.Name, ShouldEqual, "debian-cd")
So(m.UseIPv6, ShouldEqual, true)
So(m.Provider, ShouldEqual, provRsync)
So(len(cfg.Mirrors), ShouldEqual, 6)
})
Convey("Providers can be inited from a valid config file", t, func() { Convey("Providers can be inited from a valid config file", t, func() {
tmpfile, err := ioutil.TempFile("", "tunasync") tmpfile, err := ioutil.TempFile("", "tunasync")
So(err, ShouldEqual, nil) So(err, ShouldEqual, nil)
@@ -207,4 +308,92 @@ use_ipv6 = true
So(rp.excludeFile, ShouldEqual, "/etc/tunasync.d/fedora-exclude.txt") So(rp.excludeFile, ShouldEqual, "/etc/tunasync.d/fedora-exclude.txt")
}) })
Convey("MirrorSubdir should work", t, func() {
tmpfile, err := ioutil.TempFile("", "tunasync")
So(err, ShouldEqual, nil)
defer os.Remove(tmpfile.Name())
cfgBlob1 := `
[global]
name = "test_worker"
log_dir = "/var/log/tunasync/{{.Name}}"
mirror_dir = "/data/mirrors"
concurrent = 10
interval = 240
timeout = 86400
retry = 3
[manager]
api_base = "https://127.0.0.1:5000"
token = "some_token"
[server]
hostname = "worker1.example.com"
listen_addr = "127.0.0.1"
listen_port = 6000
ssl_cert = "/etc/tunasync.d/worker1.cert"
ssl_key = "/etc/tunasync.d/worker1.key"
[[mirrors]]
name = "ipv6s"
use_ipv6 = true
[[mirrors.mirrors]]
name = "debians"
mirror_subdir = "debian"
provider = "two-stage-rsync"
stage1_profile = "debian"
[[mirrors.mirrors.mirrors]]
name = "debian-security"
upstream = "rsync://test.host/debian-security/"
[[mirrors.mirrors.mirrors]]
name = "ubuntu"
stage1_profile = "ubuntu"
upstream = "rsync://test.host2/ubuntu/"
[[mirrors.mirrors]]
name = "debian-cd"
provider = "rsync"
upstream = "rsync://test.host3/debian-cd/"
`
err = ioutil.WriteFile(tmpfile.Name(), []byte(cfgBlob1), 0644)
So(err, ShouldEqual, nil)
defer tmpfile.Close()
cfg, err := LoadConfig(tmpfile.Name())
So(err, ShouldBeNil)
providers := map[string]mirrorProvider{}
for _, m := range cfg.Mirrors {
p := newMirrorProvider(m, cfg)
providers[p.Name()] = p
}
p := providers["debian-security"]
So(p.Name(), ShouldEqual, "debian-security")
So(p.LogDir(), ShouldEqual, "/var/log/tunasync/debian-security")
So(p.LogFile(), ShouldEqual, "/var/log/tunasync/debian-security/latest.log")
r2p, ok := p.(*twoStageRsyncProvider)
So(ok, ShouldBeTrue)
So(r2p.stage1Profile, ShouldEqual, "debian")
So(r2p.WorkingDir(), ShouldEqual, "/data/mirrors/debian/debian-security")
p = providers["ubuntu"]
So(p.Name(), ShouldEqual, "ubuntu")
So(p.LogDir(), ShouldEqual, "/var/log/tunasync/ubuntu")
So(p.LogFile(), ShouldEqual, "/var/log/tunasync/ubuntu/latest.log")
r2p, ok = p.(*twoStageRsyncProvider)
So(ok, ShouldBeTrue)
So(r2p.stage1Profile, ShouldEqual, "ubuntu")
So(r2p.WorkingDir(), ShouldEqual, "/data/mirrors/debian/ubuntu")
p = providers["debian-cd"]
So(p.Name(), ShouldEqual, "debian-cd")
So(p.LogDir(), ShouldEqual, "/var/log/tunasync/debian-cd")
So(p.LogFile(), ShouldEqual, "/var/log/tunasync/debian-cd/latest.log")
rp, ok := p.(*rsyncProvider)
So(ok, ShouldBeTrue)
So(rp.WorkingDir(), ShouldEqual, "/data/mirrors/debian-cd")
So(p.Timeout(), ShouldEqual, 86400*time.Second)
})
}
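The MirrorSubdir expectations above boil down to one path rule: when mirror_dir is not set explicitly, the working directory is the global mirror_dir joined with the inherited mirror_subdir and the mirror name. A minimal sketch of that composition, with simplified stand-in parameters rather than the real mirrorConfig:

package main

import (
	"fmt"
	"path/filepath"
)

// workingDir sketches how a provider's directory is derived.
func workingDir(globalMirrorDir, mirrorDir, mirrorSubDir, name string) string {
	if mirrorDir != "" {
		return mirrorDir // explicit mirror_dir always wins
	}
	return filepath.Join(globalMirrorDir, mirrorSubDir, name)
}

func main() {
	fmt.Println(workingDir("/data/mirrors", "", "debian", "debian-security"))
	// /data/mirrors/debian/debian-security
	fmt.Println(workingDir("/data/mirrors", "", "", "debian-cd"))
	// /data/mirrors/debian-cd
}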


@@ -3,6 +3,9 @@ package worker
import (
"fmt"
"os"
"time"
"github.com/codeskyblue/go-sh"
)
type dockerHook struct {
@@ -16,6 +19,10 @@ func newDockerHook(p mirrorProvider, gCfg dockerConfig, mCfg mirrorConfig) *dock
volumes := []string{}
volumes = append(volumes, gCfg.Volumes...)
volumes = append(volumes, mCfg.DockerVolumes...)
if len(mCfg.ExcludeFile) > 0 {
arg := fmt.Sprintf("%s:%s:ro", mCfg.ExcludeFile, mCfg.ExcludeFile)
volumes = append(volumes, arg)
}
options := []string{}
options = append(options, gCfg.Options...)
@@ -60,6 +67,27 @@ func (d *dockerHook) postExec() error {
// sh.Command(
// "docker", "rm", "-f", d.Name(),
// ).Run()
name := d.Name()
retry := 10
for ; retry > 0; retry-- {
out, err := sh.Command(
"docker", "ps", "-a",
"--filter", "name=^"+name+"$",
"--format", "{{.Status}}",
).Output()
if err != nil {
logger.Errorf("docker ps failed: %v", err)
break
}
if len(out) == 0 {
break
}
logger.Debugf("container %s still exists: '%s'", name, string(out))
time.Sleep(1 * time.Second)
}
if retry == 0 {
logger.Warningf("container %s not removed automatically, next sync may fail", name)
}
d.provider.ExitContext()
return nil
}
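The postExec hook above polls docker ps through go-sh until the job's container is gone. An equivalent standalone sketch with os/exec; the container name is only an example:

package main

import (
	"fmt"
	"os/exec"
	"time"
)

// waitContainerGone polls `docker ps -a` for an exact name match until the
// container disappears or the retry budget runs out.
func waitContainerGone(name string, retries int) bool {
	for ; retries > 0; retries-- {
		out, err := exec.Command(
			"docker", "ps", "-a",
			"--filter", "name=^"+name+"$",
			"--format", "{{.Status}}",
		).Output()
		if err != nil {
			return false // docker ps itself failed
		}
		if len(out) == 0 {
			return true // container no longer listed
		}
		time.Sleep(1 * time.Second)
	}
	return false
}

func main() {
	fmt.Println(waitContainerGone("tunasync-job-example", 10))
}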


@@ -87,29 +87,34 @@ sleep 20
cmdRun("docker", []string{"images"}) cmdRun("docker", []string{"images"})
exitedErr := make(chan error, 1) exitedErr := make(chan error, 1)
go func() { go func() {
err = provider.Run() err = provider.Run(make(chan empty, 1))
logger.Debugf("provider.Run() exited") logger.Debugf("provider.Run() exited")
if err != nil { if err != nil {
logger.Errorf("provider.Run() failed: %v", err) logger.Errorf("provider.Run() failed: %v", err)
} }
exitedErr <- err exitedErr <- err
}() }()
cmdRun("ps", []string{"aux"})
// Wait for docker running // Wait for docker running
time.Sleep(8 * time.Second) for wait := 0; wait < 8; wait++ {
names, err := getDockerByName(d.Name())
cmdRun("ps", []string{"aux"}) So(err, ShouldBeNil)
if names != "" {
break
}
time.Sleep(1 * time.Second)
}
// cmdRun("ps", []string{"aux"})
// assert container running // assert container running
names, err := getDockerByName(d.Name()) names, err := getDockerByName(d.Name())
So(err, ShouldBeNil) So(err, ShouldBeNil)
// So(names, ShouldEqual, d.Name()+"\n") So(names, ShouldEqual, d.Name()+"\n")
err = provider.Terminate() err = provider.Terminate()
// So(err, ShouldBeNil) So(err, ShouldBeNil)
cmdRun("ps", []string{"aux"}) // cmdRun("ps", []string{"aux"})
<-exitedErr <-exitedErr
// container should be terminated and removed // container should be terminated and removed


@@ -155,24 +155,43 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
var syncErr error
syncDone := make(chan error, 1)
started := make(chan empty, 10) // we may receive "started" more than one time (e.g. two_stage_rsync)
go func() {
-err := provider.Run()
+err := provider.Run(started)
syncDone <- err
}()
select { // Wait until provider started or error happened
case err := <-syncDone:
logger.Errorf("failed to start provider %s: %s", m.Name(), err.Error())
syncDone <- err // it will be read again later
case <-started:
logger.Debug("provider started")
}
// Now terminating the provider is feasible
var termErr error
timeout := provider.Timeout()
if timeout <= 0 {
timeout = 100000 * time.Hour // never time out
}
select {
case syncErr = <-syncDone:
logger.Debug("syncing done")
case <-time.After(timeout):
logger.Notice("provider timeout")
termErr = provider.Terminate()
syncErr = fmt.Errorf("%s timeout after %v", m.Name(), timeout)
case <-kill:
logger.Debug("received kill")
stopASAP = true
-err := provider.Terminate()
+termErr = provider.Terminate()
if err != nil {
logger.Errorf("failed to terminate provider %s: %s", m.Name(), err.Error())
return err
}
syncErr = errors.New("killed by manager")
}
if termErr != nil {
logger.Errorf("failed to terminate provider %s: %s", m.Name(), termErr.Error())
return termErr
}
// post-exec hooks
herr := runHooks(rHooks, func(h jobHook) error { return h.postExec() }, "post-exec")
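A standalone sketch of the started/timeout/kill control flow above, with simplified names (run stands in for provider.Run, and the error messages are illustrative):

package main

import (
	"errors"
	"fmt"
	"time"
)

type empty struct{}

// runWithTimeout waits for the job to signal "started", then races completion
// against a timeout and an external kill channel.
func runWithTimeout(run func(started chan empty) error, timeout time.Duration, kill <-chan empty) error {
	started := make(chan empty, 10)
	done := make(chan error, 1)
	go func() { done <- run(started) }()

	select {
	case err := <-done: // failed before it even started
		done <- err // re-queue so the next select sees it too
	case <-started:
	}

	if timeout <= 0 {
		timeout = 100000 * time.Hour // effectively: never time out
	}
	select {
	case err := <-done:
		return err
	case <-time.After(timeout):
		return fmt.Errorf("timeout after %v", timeout)
	case <-kill:
		return errors.New("killed by manager")
	}
}

func main() {
	err := runWithTimeout(func(started chan empty) error {
		started <- empty{}
		time.Sleep(2 * time.Second)
		return nil
	}, 500*time.Millisecond, make(chan empty))
	fmt.Println(err) // timeout after 500ms
}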


@@ -31,6 +31,7 @@ func TestMirrorJob(t *testing.T) {
logDir: tmpDir,
logFile: tmpFile,
interval: 1 * time.Second,
timeout: 7 * time.Second,
}
provider, err := newCmdProvider(c)
@@ -41,6 +42,7 @@ func TestMirrorJob(t *testing.T) {
So(provider.LogDir(), ShouldEqual, c.logDir)
So(provider.LogFile(), ShouldEqual, c.logFile)
So(provider.Interval(), ShouldEqual, c.interval)
So(provider.Timeout(), ShouldEqual, c.timeout)
Convey("For a normal mirror job", func(ctx C) {
scriptContent := `#!/bin/bash
@@ -333,6 +335,66 @@ echo $TUNASYNC_WORKING_DIR
})
})
Convey("When a job timed out", func(ctx C) {
scriptContent := `#!/bin/bash
echo $TUNASYNC_WORKING_DIR
sleep 10
echo $TUNASYNC_WORKING_DIR
`
err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
So(err, ShouldBeNil)
managerChan := make(chan jobMessage, 10)
semaphore := make(chan empty, 1)
job := newMirrorJob(provider)
Convey("It should be automatically terminated", func(ctx C) {
go job.Run(managerChan, semaphore)
job.ctrlChan <- jobStart
time.Sleep(1 * time.Second)
msg := <-managerChan
So(msg.status, ShouldEqual, PreSyncing)
msg = <-managerChan
So(msg.status, ShouldEqual, Syncing)
job.ctrlChan <- jobStart // should be ignored
msg = <-managerChan
So(msg.status, ShouldEqual, Failed)
expectedOutput := fmt.Sprintf("%s\n", provider.WorkingDir())
loggedContent, err := ioutil.ReadFile(provider.LogFile())
So(err, ShouldBeNil)
So(string(loggedContent), ShouldEqual, expectedOutput)
job.ctrlChan <- jobDisable
<-job.disabled
})
Convey("It should be retried", func(ctx C) {
go job.Run(managerChan, semaphore)
job.ctrlChan <- jobStart
time.Sleep(1 * time.Second)
msg := <-managerChan
So(msg.status, ShouldEqual, PreSyncing)
for i := 0; i < defaultMaxRetry; i++ {
msg = <-managerChan
So(msg.status, ShouldEqual, Syncing)
job.ctrlChan <- jobStart // should be ignored
msg = <-managerChan
So(msg.status, ShouldEqual, Failed)
So(msg.msg, ShouldContainSubstring, "timeout after")
// re-schedule after last try
So(msg.schedule, ShouldEqual, i == defaultMaxRetry-1)
}
job.ctrlChan <- jobDisable
<-job.disabled
})
})
})
}


@@ -24,9 +24,9 @@ type mirrorProvider interface {
Type() providerEnum
-// run mirror job in background
-Run() error
-// run mirror job in background
+// Start then Wait
+Run(started chan empty) error
+// Start the job
Start() error
// Wait job to finish
Wait() error
@@ -46,6 +46,7 @@ type mirrorProvider interface {
Interval() time.Duration
Retry() int
Timeout() time.Duration
WorkingDir() string
LogDir() string
@@ -82,7 +83,7 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
}
if mirrorDir == "" {
mirrorDir = filepath.Join(
-cfg.Global.MirrorDir, mirror.Name,
+cfg.Global.MirrorDir, mirror.MirrorSubDir, mirror.Name,
)
}
if mirror.Interval == 0 {
@@ -91,6 +92,9 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
if mirror.Retry == 0 {
mirror.Retry = cfg.Global.Retry
}
if mirror.Timeout == 0 {
mirror.Timeout = cfg.Global.Timeout
}
logDir = formatLogDir(logDir, mirror)
// IsMaster
@@ -113,10 +117,12 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
command: mirror.Command,
workingDir: mirrorDir,
failOnMatch: mirror.FailOnMatch,
sizePattern: mirror.SizePattern,
logDir: logDir,
logFile: filepath.Join(logDir, "latest.log"),
interval: time.Duration(mirror.Interval) * time.Minute,
retry: mirror.Retry,
timeout: time.Duration(mirror.Timeout) * time.Second,
env: mirror.Env,
}
p, err := newCmdProvider(pc)
@@ -134,7 +140,10 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
password: mirror.Password,
excludeFile: mirror.ExcludeFile,
extraOptions: mirror.RsyncOptions,
rsyncNeverTimeout: mirror.RsyncNoTimeo,
rsyncTimeoutValue: mirror.RsyncTimeout,
overriddenOptions: mirror.RsyncOverride,
rsyncEnv: mirror.Env,
workingDir: mirrorDir,
logDir: logDir,
logFile: filepath.Join(logDir, "latest.log"),
@@ -142,6 +151,7 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
useIPv4: mirror.UseIPv4,
interval: time.Duration(mirror.Interval) * time.Minute,
retry: mirror.Retry,
timeout: time.Duration(mirror.Timeout) * time.Second,
}
p, err := newRsyncProvider(rc)
if err != nil {
@@ -151,20 +161,24 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
provider = p
case provTwoStageRsync:
rc := twoStageRsyncConfig{
name: mirror.Name,
stage1Profile: mirror.Stage1Profile,
upstreamURL: mirror.Upstream,
rsyncCmd: mirror.Command,
username: mirror.Username,
password: mirror.Password,
excludeFile: mirror.ExcludeFile,
extraOptions: mirror.RsyncOptions,
rsyncNeverTimeout: mirror.RsyncNoTimeo,
rsyncTimeoutValue: mirror.RsyncTimeout,
rsyncEnv: mirror.Env,
workingDir: mirrorDir,
logDir: logDir,
logFile: filepath.Join(logDir, "latest.log"),
useIPv6: mirror.UseIPv6,
interval: time.Duration(mirror.Interval) * time.Minute,
retry: mirror.Retry,
timeout: time.Duration(mirror.Timeout) * time.Second,
}
p, err := newTwoStageRsyncProvider(rc)
if err != nil {


@@ -5,6 +5,7 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"strconv"
"testing" "testing"
"time" "time"
@@ -27,6 +28,7 @@ func TestRsyncProvider(t *testing.T) {
logDir: tmpDir, logDir: tmpDir,
logFile: tmpFile, logFile: tmpFile,
useIPv6: true, useIPv6: true,
timeout: 100 * time.Second,
interval: 600 * time.Second, interval: 600 * time.Second,
} }
@@ -39,6 +41,7 @@ func TestRsyncProvider(t *testing.T) {
So(provider.LogDir(), ShouldEqual, c.logDir) So(provider.LogDir(), ShouldEqual, c.logDir)
So(provider.LogFile(), ShouldEqual, c.logFile) So(provider.LogFile(), ShouldEqual, c.logFile)
So(provider.Interval(), ShouldEqual, c.interval) So(provider.Interval(), ShouldEqual, c.interval)
So(provider.Timeout(), ShouldEqual, c.timeout)
Convey("When entering a context (auto exit)", func() { Convey("When entering a context (auto exit)", func() {
func() { func() {
@@ -90,12 +93,12 @@ exit 0
fmt.Sprintf( fmt.Sprintf(
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+ "-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
"--delete --delete-after --delay-updates --safe-links "+ "--delete --delete-after --delay-updates --safe-links "+
"--timeout=120 --contimeout=120 -6 %s %s", "--timeout=120 -6 %s %s",
provider.upstreamURL, provider.WorkingDir(), provider.upstreamURL, provider.WorkingDir(),
), ),
) )
err = provider.Run() err = provider.Run(make(chan empty, 1))
So(err, ShouldBeNil) So(err, ShouldBeNil)
loggedContent, err := ioutil.ReadFile(provider.LogFile()) loggedContent, err := ioutil.ReadFile(provider.LogFile())
So(err, ShouldBeNil) So(err, ShouldBeNil)
@@ -105,6 +108,34 @@ exit 0
}) })
}) })
Convey("If the rsync program fails", t, func() {
tmpDir, err := ioutil.TempDir("", "tunasync")
defer os.RemoveAll(tmpDir)
So(err, ShouldBeNil)
tmpFile := filepath.Join(tmpDir, "log_file")
Convey("in the rsyncProvider", func() {
c := rsyncConfig{
name: "tuna",
upstreamURL: "rsync://rsync.tuna.moe/tuna/",
workingDir: tmpDir,
logDir: tmpDir,
logFile: tmpFile,
extraOptions: []string{"--somethine-invalid"},
interval: 600 * time.Second,
}
provider, err := newRsyncProvider(c)
So(err, ShouldBeNil)
err = provider.Run(make(chan empty, 1))
So(err, ShouldNotBeNil)
loggedContent, err := ioutil.ReadFile(provider.LogFile())
So(err, ShouldBeNil)
So(string(loggedContent), ShouldContainSubstring, "Syntax or usage error")
})
})
} }
func TestRsyncProviderWithAuthentication(t *testing.T) {
@@ -114,19 +145,22 @@ func TestRsyncProviderWithAuthentication(t *testing.T) {
So(err, ShouldBeNil)
scriptFile := filepath.Join(tmpDir, "myrsync")
tmpFile := filepath.Join(tmpDir, "log_file")
proxyAddr := "127.0.0.1:1233"
c := rsyncConfig{
name: "tuna",
upstreamURL: "rsync://rsync.tuna.moe/tuna/",
rsyncCmd: scriptFile,
username: "tunasync",
password: "tunasyncpassword",
workingDir: tmpDir,
extraOptions: []string{"--delete-excluded"},
rsyncTimeoutValue: 30,
rsyncEnv: map[string]string{"RSYNC_PROXY": proxyAddr},
logDir: tmpDir,
logFile: tmpFile,
useIPv4: true,
interval: 600 * time.Second,
}
provider, err := newRsyncProvider(c)
@@ -141,7 +175,7 @@ func TestRsyncProviderWithAuthentication(t *testing.T) {
Convey("Let's try a run", func() {
scriptContent := `#!/bin/bash
echo "syncing to $(pwd)"
-echo $USER $RSYNC_PASSWORD $@
+echo $USER $RSYNC_PASSWORD $RSYNC_PROXY $@
sleep 1
echo "Done"
exit 0
@@ -156,14 +190,15 @@ exit 0
"Done\n",
targetDir,
fmt.Sprintf(
-"%s %s -aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
+"%s %s %s -aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
"--delete --delete-after --delay-updates --safe-links "+
-"--timeout=120 --contimeout=120 -4 --delete-excluded %s %s",
+"--timeout=30 -4 --delete-excluded %s %s",
-provider.username, provider.password, provider.upstreamURL, provider.WorkingDir(),
+provider.username, provider.password, proxyAddr,
provider.upstreamURL, provider.WorkingDir(),
),
)
-err = provider.Run()
+err = provider.Run(make(chan empty, 1))
So(err, ShouldBeNil)
loggedContent, err := ioutil.ReadFile(provider.LogFile())
So(err, ShouldBeNil)
@@ -187,6 +222,7 @@ func TestRsyncProviderWithOverriddenOptions(t *testing.T) {
upstreamURL: "rsync://rsync.tuna.moe/tuna/",
rsyncCmd: scriptFile,
workingDir: tmpDir,
rsyncNeverTimeout: true,
overriddenOptions: []string{"-aHvh", "--no-o", "--no-g", "--stats"},
extraOptions: []string{"--delete-excluded"},
logDir: tmpDir,
@@ -225,7 +261,7 @@ exit 0
provider.WorkingDir(),
)
-err = provider.Run()
+err = provider.Run(make(chan empty, 1))
So(err, ShouldBeNil)
loggedContent, err := ioutil.ReadFile(provider.LogFile())
So(err, ShouldBeNil)
@@ -236,6 +272,78 @@ exit 0
})
}
func TestRsyncProviderWithDocker(t *testing.T) {
Convey("Rsync in Docker should work", t, func() {
tmpDir, err := ioutil.TempDir("", "tunasync")
defer os.RemoveAll(tmpDir)
So(err, ShouldBeNil)
scriptFile := filepath.Join(tmpDir, "myrsync")
excludeFile := filepath.Join(tmpDir, "exclude.txt")
g := &Config{
Global: globalConfig{
Retry: 2,
},
Docker: dockerConfig{
Enable: true,
Volumes: []string{
scriptFile + ":/bin/myrsync",
"/etc/gai.conf:/etc/gai.conf:ro",
},
},
}
c := mirrorConfig{
Name: "tuna",
Provider: provRsync,
Upstream: "rsync://rsync.tuna.moe/tuna/",
Command: "/bin/myrsync",
ExcludeFile: excludeFile,
DockerImage: "alpine:3.8",
LogDir: tmpDir,
MirrorDir: tmpDir,
UseIPv6: true,
Timeout: 100,
Interval: 600,
}
provider := newMirrorProvider(c, g)
So(provider.Type(), ShouldEqual, provRsync)
So(provider.Name(), ShouldEqual, c.Name)
So(provider.WorkingDir(), ShouldEqual, c.MirrorDir)
So(provider.LogDir(), ShouldEqual, c.LogDir)
cmdScriptContent := `#!/bin/sh
#echo "$@"
while [[ $# -gt 0 ]]; do
if [[ "$1" = "--exclude-from" ]]; then
cat "$2"
shift
fi
shift
done
`
err = ioutil.WriteFile(scriptFile, []byte(cmdScriptContent), 0755)
So(err, ShouldBeNil)
err = ioutil.WriteFile(excludeFile, []byte("__some_pattern"), 0755)
So(err, ShouldBeNil)
for _, hook := range provider.Hooks() {
err = hook.preExec()
So(err, ShouldBeNil)
}
err = provider.Run(make(chan empty, 1))
So(err, ShouldBeNil)
for _, hook := range provider.Hooks() {
err = hook.postExec()
So(err, ShouldBeNil)
}
loggedContent, err := ioutil.ReadFile(provider.LogFile())
So(err, ShouldBeNil)
So(string(loggedContent), ShouldEqual, "__some_pattern")
})
}
func TestCmdProvider(t *testing.T) {
Convey("Command Provider should work", t, func(ctx C) {
tmpDir, err := ioutil.TempDir("", "tunasync")
@@ -289,7 +397,7 @@ echo $AOSP_REPO_BIN
So(err, ShouldBeNil)
So(readedScriptContent, ShouldResemble, []byte(scriptContent))
-err = provider.Run()
+err = provider.Run(make(chan empty, 1))
So(err, ShouldBeNil)
loggedContent, err := ioutil.ReadFile(provider.LogFile())
@@ -305,23 +413,26 @@ echo $AOSP_REPO_BIN
So(err, ShouldBeNil)
So(readedScriptContent, ShouldResemble, []byte(scriptContent))
-err = provider.Run()
+err = provider.Run(make(chan empty, 1))
So(err, ShouldNotBeNil)
})
Convey("If a long job is killed", func(ctx C) {
scriptContent := `#!/bin/bash
-sleep 5
+sleep 10
`
err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
So(err, ShouldBeNil)
started := make(chan empty, 1)
go func() {
-err = provider.Run()
+err := provider.Run(started)
ctx.So(err, ShouldNotBeNil)
}()
<-started
So(provider.IsRunning(), ShouldBeTrue)
time.Sleep(1 * time.Second)
err = provider.Terminate()
So(err, ShouldBeNil)
@@ -357,12 +468,12 @@ sleep 5
Convey("Run the command", func() {
-err = provider.Run()
+err = provider.Run(make(chan empty, 1))
So(err, ShouldBeNil)
})
})
-Convey("Command Provider with fail-on-match regexp should work", t, func(ctx C) {
+Convey("Command Provider with RegExprs should work", t, func(ctx C) {
tmpDir, err := ioutil.TempDir("", "tunasync")
defer os.RemoveAll(tmpDir)
So(err, ShouldBeNil)
@@ -373,29 +484,74 @@ sleep 5
upstreamURL: "http://mirrors.tuna.moe/",
command: "uptime",
failOnMatch: "",
sizePattern: "",
workingDir: tmpDir,
logDir: tmpDir,
logFile: tmpFile,
interval: 600 * time.Second,
}
-Convey("when regexp matches", func() {
+Convey("when fail-on-match regexp matches", func() {
c.failOnMatch = `[a-z]+`
provider, err := newCmdProvider(c)
So(err, ShouldBeNil)
-err = provider.Run()
+err = provider.Run(make(chan empty, 1))
So(err, ShouldNotBeNil)
So(provider.DataSize(), ShouldBeEmpty)
})
-Convey("when regexp does not match", func() {
+Convey("when fail-on-match regexp does not match", func() {
c.failOnMatch = `load average_`
provider, err := newCmdProvider(c)
So(err, ShouldBeNil)
-err = provider.Run()
+err = provider.Run(make(chan empty, 1))
So(err, ShouldBeNil)
})
Convey("when fail-on-match regexp meets /dev/null", func() {
c.failOnMatch = `load average_`
c.logFile = "/dev/null"
provider, err := newCmdProvider(c)
So(err, ShouldBeNil)
err = provider.Run(make(chan empty, 1))
So(err, ShouldNotBeNil)
})
Convey("when size-pattern regexp matches", func() {
c.sizePattern = `load average: ([\d\.]+)`
provider, err := newCmdProvider(c)
So(err, ShouldBeNil)
err = provider.Run(make(chan empty, 1))
So(err, ShouldBeNil)
So(provider.DataSize(), ShouldNotBeEmpty)
_, err = strconv.ParseFloat(provider.DataSize(), 32)
So(err, ShouldBeNil)
})
Convey("when size-pattern regexp does not match", func() {
c.sizePattern = `load ave: ([\d\.]+)`
provider, err := newCmdProvider(c)
So(err, ShouldBeNil)
err = provider.Run(make(chan empty, 1))
So(err, ShouldBeNil)
So(provider.DataSize(), ShouldBeEmpty)
})
Convey("when size-pattern regexp meets /dev/null", func() {
c.sizePattern = `load ave: ([\d\.]+)`
c.logFile = "/dev/null"
provider, err := newCmdProvider(c)
So(err, ShouldBeNil)
err = provider.Run(make(chan empty, 1))
So(err, ShouldNotBeNil)
So(provider.DataSize(), ShouldBeEmpty)
})
})
}
@@ -408,18 +564,19 @@ func TestTwoStageRsyncProvider(t *testing.T) {
tmpFile := filepath.Join(tmpDir, "log_file")
c := twoStageRsyncConfig{
name: "tuna-two-stage-rsync",
upstreamURL: "rsync://mirrors.tuna.moe/",
stage1Profile: "debian",
rsyncCmd: scriptFile,
workingDir: tmpDir,
logDir: tmpDir,
logFile: tmpFile,
useIPv6: true,
excludeFile: tmpFile,
rsyncTimeoutValue: 30,
extraOptions: []string{"--delete-excluded", "--cache"},
username: "hello",
password: "world",
}
provider, err := newTwoStageRsyncProvider(c)
@@ -443,7 +600,7 @@ exit 0
err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
So(err, ShouldBeNil)
-err = provider.Run()
+err = provider.Run(make(chan empty, 2))
So(err, ShouldBeNil)
targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir())
@@ -457,7 +614,7 @@ exit 0
targetDir,
fmt.Sprintf(
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+
-"--timeout=120 --contimeout=120 --exclude dists/ -6 "+
+"--exclude dists/ --timeout=30 -6 "+
"--exclude-from %s %s %s",
provider.excludeFile, provider.upstreamURL, provider.WorkingDir(),
),
@@ -465,7 +622,7 @@ exit 0
fmt.Sprintf(
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
"--delete --delete-after --delay-updates --safe-links "+
-"--timeout=120 --contimeout=120 --delete-excluded --cache -6 --exclude-from %s %s %s",
+"--delete-excluded --cache --timeout=30 -6 --exclude-from %s %s %s",
provider.excludeFile, provider.upstreamURL, provider.WorkingDir(),
),
)
@@ -479,32 +636,65 @@ exit 0
Convey("Try terminating", func(ctx C) {
scriptContent := `#!/bin/bash
echo $@
-sleep 4
+sleep 10
exit 0
`
err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
So(err, ShouldBeNil)
started := make(chan empty, 2)
go func() {
-err = provider.Run()
+err := provider.Run(started)
ctx.So(err, ShouldNotBeNil)
}()
<-started
So(provider.IsRunning(), ShouldBeTrue)
time.Sleep(1 * time.Second)
err = provider.Terminate()
So(err, ShouldBeNil)
expectedOutput := fmt.Sprintf(
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+
-"--timeout=120 --contimeout=120 --exclude dists/ -6 "+
+"--exclude dists/ --timeout=30 -6 "+
"--exclude-from %s %s %s\n",
provider.excludeFile, provider.upstreamURL, provider.WorkingDir(),
)
loggedContent, err := ioutil.ReadFile(provider.LogFile())
So(err, ShouldBeNil)
-So(string(loggedContent), ShouldEqual, expectedOutput)
+So(string(loggedContent), ShouldStartWith, expectedOutput)
// fmt.Println(string(loggedContent))
})
})
Convey("If the rsync program fails", t, func(ctx C) {
tmpDir, err := ioutil.TempDir("", "tunasync")
defer os.RemoveAll(tmpDir)
So(err, ShouldBeNil)
tmpFile := filepath.Join(tmpDir, "log_file")
Convey("in the twoStageRsyncProvider", func() {
c := twoStageRsyncConfig{
name: "tuna-two-stage-rsync",
upstreamURL: "rsync://0.0.0.1/",
stage1Profile: "debian",
workingDir: tmpDir,
logDir: tmpDir,
logFile: tmpFile,
excludeFile: tmpFile,
}
provider, err := newTwoStageRsyncProvider(c)
So(err, ShouldBeNil)
err = provider.Run(make(chan empty, 2))
So(err, ShouldNotBeNil)
loggedContent, err := ioutil.ReadFile(provider.LogFile())
So(err, ShouldBeNil)
So(string(loggedContent), ShouldContainSubstring, "Error in socket I/O")
})
})
}


@@ -2,7 +2,7 @@ package worker
import (
"errors"
-"io/ioutil"
+"fmt"
"strings"
"time"
@@ -15,10 +15,14 @@ type rsyncConfig struct {
upstreamURL, username, password, excludeFile string
extraOptions []string
overriddenOptions []string
rsyncNeverTimeout bool
rsyncTimeoutValue int
rsyncEnv map[string]string
workingDir, logDir, logFile string
useIPv6, useIPv4 bool
interval time.Duration
retry int
timeout time.Duration
}
// An RsyncProvider provides the implementation to rsync-based syncing jobs
@@ -43,6 +47,7 @@ func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
ctx: NewContext(),
interval: c.interval,
retry: c.retry,
timeout: c.timeout,
},
rsyncConfig: c,
}
@@ -50,17 +55,34 @@ func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
if c.rsyncCmd == "" {
provider.rsyncCmd = "rsync"
}
if c.rsyncEnv == nil {
provider.rsyncEnv = map[string]string{}
}
if c.username != "" {
provider.rsyncEnv["USER"] = c.username
}
if c.password != "" {
provider.rsyncEnv["RSYNC_PASSWORD"] = c.password
}
options := []string{
"-aHvh", "--no-o", "--no-g", "--stats",
"--exclude", ".~tmp~/",
"--delete", "--delete-after", "--delay-updates",
-"--safe-links", "--timeout=120", "--contimeout=120",
+"--safe-links",
}
if c.overriddenOptions != nil {
options = c.overriddenOptions
}
if !c.rsyncNeverTimeout {
timeo := 120
if c.rsyncTimeoutValue > 0 {
timeo = c.rsyncTimeoutValue
}
options = append(options, fmt.Sprintf("--timeout=%d", timeo))
}
if c.useIPv6 {
options = append(options, "-6")
} else if c.useIPv4 {
@@ -94,17 +116,24 @@ func (p *rsyncProvider) DataSize() string {
return p.dataSize return p.dataSize
} }
func (p *rsyncProvider) Run() error { func (p *rsyncProvider) Run(started chan empty) error {
p.dataSize = "" p.dataSize = ""
defer p.closeLogFile()
if err := p.Start(); err != nil { if err := p.Start(); err != nil {
return err return err
} }
started <- empty{}
if err := p.Wait(); err != nil { if err := p.Wait(); err != nil {
code, msg := internal.TranslateRsyncErrorCode(err)
if code != 0 {
logger.Debug("Rsync exitcode %d (%s)", code, msg)
if p.logFileFd != nil {
p.logFileFd.WriteString(msg + "\n")
}
}
return err return err
} }
if logContent, err := ioutil.ReadFile(p.LogFile()); err == nil { p.dataSize = internal.ExtractSizeFromRsyncLog(p.LogFile())
p.dataSize = internal.ExtractSizeFromRsyncLog(logContent)
}
return nil return nil
} }
@@ -116,18 +145,11 @@ func (p *rsyncProvider) Start() error {
return errors.New("provider is currently running") return errors.New("provider is currently running")
} }
env := map[string]string{}
if p.username != "" {
env["USER"] = p.username
}
if p.password != "" {
env["RSYNC_PASSWORD"] = p.password
}
command := []string{p.rsyncCmd} command := []string{p.rsyncCmd}
command = append(command, p.options...) command = append(command, p.options...)
command = append(command, p.upstreamURL, p.WorkingDir()) command = append(command, p.upstreamURL, p.WorkingDir())
p.cmd = newCmdJob(p, command, p.WorkingDir(), env) p.cmd = newCmdJob(p, command, p.WorkingDir(), p.rsyncEnv)
if err := p.prepareLogFile(false); err != nil { if err := p.prepareLogFile(false); err != nil {
return err return err
} }
@@ -136,5 +158,6 @@ func (p *rsyncProvider) Start() error {
return err return err
} }
p.isRunning.Store(true) p.isRunning.Store(true)
logger.Debugf("set isRunning to true: %s", p.Name())
return nil return nil
} }
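For reference, the timeout handling added above reduces to: drop `--timeout` only when `rsyncNeverTimeout` is set, otherwise use `rsyncTimeoutValue` if it is positive and fall back to 120 seconds. A small self-contained sketch of that logic follows; the helper name `appendTimeoutOption` is mine and not part of the provider.

package main

import "fmt"

// appendTimeoutOption mirrors the logic added to the rsync providers: skip
// --timeout entirely when neverTimeout is set, otherwise use the configured
// value, defaulting to 120 seconds.
func appendTimeoutOption(options []string, neverTimeout bool, timeoutValue int) []string {
	if neverTimeout {
		return options
	}
	timeo := 120
	if timeoutValue > 0 {
		timeo = timeoutValue
	}
	return append(options, fmt.Sprintf("--timeout=%d", timeo))
}

func main() {
	base := []string{"-aHvh", "--safe-links"}
	fmt.Println(appendTimeoutOption(base, false, 0))   // ... --timeout=120
	fmt.Println(appendTimeoutOption(base, false, 300)) // ... --timeout=300
	fmt.Println(appendTimeoutOption(base, true, 0))    // no --timeout at all
}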


@@ -118,9 +118,6 @@ func (c *cmdJob) Wait() error {
 		return c.retErr
 	default:
 		err := c.cmd.Wait()
-		if c.cmd.Stdout != nil {
-			c.cmd.Stdout.(*os.File).Close()
-		}
 		c.retErr = err
 		close(c.finished)
 		return err
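The removed lines had `Wait()` close the command's stdout, which is really the provider's log file; with the provider changes above, the file is instead closed where it was opened, via `defer p.closeLogFile()` next to `prepareLogFile()`. A rough sketch of that ownership pattern, using a hypothetical `logOwner` type rather than the real provider plumbing:

package main

import (
	"fmt"
	"os"
)

// logOwner sketches "close the log where it was opened": the code that opens
// the file is the code that closes it, so the command runner never touches
// the file descriptor.
type logOwner struct {
	logFilePath string
	logFileFd   *os.File
}

func (p *logOwner) prepareLogFile() error {
	fd, err := os.OpenFile(p.logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
	if err != nil {
		return err
	}
	p.logFileFd = fd
	return nil
}

func (p *logOwner) closeLogFile() {
	if p.logFileFd != nil {
		p.logFileFd.Close()
		p.logFileFd = nil
	}
}

func (p *logOwner) run() error {
	if err := p.prepareLogFile(); err != nil {
		return err
	}
	defer p.closeLogFile() // closed here, not inside the command's Wait()
	_, err := p.logFileFd.WriteString("sync started\n")
	return err
}

func main() {
	p := &logOwner{logFilePath: "/tmp/tunasync-demo.log"}
	fmt.Println("run:", p.run())
}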


@@ -3,7 +3,6 @@ package worker
 import (
 	"errors"
 	"fmt"
-	"io/ioutil"
 	"strings"
 	"time"
@@ -16,10 +15,14 @@ type twoStageRsyncConfig struct {
 	stage1Profile string
 	upstreamURL, username, password, excludeFile string
 	extraOptions []string
+	rsyncNeverTimeout bool
+	rsyncTimeoutValue int
+	rsyncEnv map[string]string
 	workingDir, logDir, logFile string
 	useIPv6 bool
 	interval time.Duration
 	retry int
+	timeout time.Duration
 }
 // An RsyncProvider provides the implementation to rsync-based syncing jobs
@@ -54,21 +57,31 @@ func newTwoStageRsyncProvider(c twoStageRsyncConfig) (*twoStageRsyncProvider, er
 			ctx:      NewContext(),
 			interval: c.interval,
 			retry:    c.retry,
+			timeout:  c.timeout,
 		},
 		twoStageRsyncConfig: c,
 		stage1Options: []string{
 			"-aHvh", "--no-o", "--no-g", "--stats",
 			"--exclude", ".~tmp~/",
-			"--safe-links", "--timeout=120", "--contimeout=120",
+			"--safe-links",
 		},
 		stage2Options: []string{
 			"-aHvh", "--no-o", "--no-g", "--stats",
 			"--exclude", ".~tmp~/",
 			"--delete", "--delete-after", "--delay-updates",
-			"--safe-links", "--timeout=120", "--contimeout=120",
+			"--safe-links",
 		},
 	}
+	if c.rsyncEnv == nil {
+		provider.rsyncEnv = map[string]string{}
+	}
+	if c.username != "" {
+		provider.rsyncEnv["USER"] = c.username
+	}
+	if c.password != "" {
+		provider.rsyncEnv["RSYNC_PASSWORD"] = c.password
+	}
 	if c.rsyncCmd == "" {
 		provider.rsyncCmd = "rsync"
 	}
@@ -113,6 +126,14 @@ func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) {
 		return []string{}, fmt.Errorf("Invalid stage: %d", stage)
 	}
+	if !p.rsyncNeverTimeout {
+		timeo := 120
+		if p.rsyncTimeoutValue > 0 {
+			timeo = p.rsyncTimeoutValue
+		}
+		options = append(options, fmt.Sprintf("--timeout=%d", timeo))
+	}
 	if p.useIPv6 {
 		options = append(options, "-6")
 	}
@@ -124,7 +145,7 @@ func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) {
 	return options, nil
 }
-func (p *twoStageRsyncProvider) Run() error {
+func (p *twoStageRsyncProvider) Run(started chan empty) error {
 	p.Lock()
 	defer p.Unlock()
@@ -132,14 +153,6 @@ func (p *twoStageRsyncProvider) Run() error {
 		return errors.New("provider is currently running")
 	}
-	env := map[string]string{}
-	if p.username != "" {
-		env["USER"] = p.username
-	}
-	if p.password != "" {
-		env["RSYNC_PASSWORD"] = p.password
-	}
 	p.dataSize = ""
 	stages := []int{1, 2}
 	for _, stage := range stages {
@@ -151,26 +164,33 @@ func (p *twoStageRsyncProvider) Run() error {
 		command = append(command, options...)
 		command = append(command, p.upstreamURL, p.WorkingDir())
-		p.cmd = newCmdJob(p, command, p.WorkingDir(), env)
+		p.cmd = newCmdJob(p, command, p.WorkingDir(), p.rsyncEnv)
 		if err := p.prepareLogFile(stage > 1); err != nil {
 			return err
 		}
+		defer p.closeLogFile()
 		if err = p.cmd.Start(); err != nil {
 			return err
 		}
 		p.isRunning.Store(true)
 		logger.Debugf("set isRunning to true: %s", p.Name())
+		started <- empty{}
 		p.Unlock()
 		err = p.Wait()
 		p.Lock()
 		if err != nil {
+			code, msg := internal.TranslateRsyncErrorCode(err)
+			if code != 0 {
+				logger.Debug("Rsync exitcode %d (%s)", code, msg)
+				if p.logFileFd != nil {
+					p.logFileFd.WriteString(msg + "\n")
+				}
+			}
 			return err
 		}
 	}
-	if logContent, err := ioutil.ReadFile(p.LogFile()); err == nil {
-		p.dataSize = internal.ExtractSizeFromRsyncLog(logContent)
-	}
+	p.dataSize = internal.ExtractSizeFromRsyncLog(p.LogFile())
 	return nil
 }
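On failure, both providers now translate the rsync exit status through `internal.TranslateRsyncErrorCode` and append the message to the log file, which is what the new test matches against ("Error in socket I/O"). Below is a simplified stand-in for that idea; the exit-code table is a small subset of rsync's documented codes and the function body is illustrative, not the actual helper.

package main

import (
	"fmt"
	"os/exec"
)

// translateRsyncErrorCode is a cut-down stand-in for the real helper: it maps
// a failed command's exit status to a short description of the rsync error.
func translateRsyncErrorCode(err error) (int, string) {
	exitErr, ok := err.(*exec.ExitError)
	if !ok {
		return 0, ""
	}
	code := exitErr.ExitCode()
	msgs := map[int]string{
		1:  "Syntax or usage error",
		5:  "Error starting client-server protocol",
		10: "Error in socket I/O",
		23: "Partial transfer due to error",
		30: "Timeout in data send/receive",
	}
	msg, ok := msgs[code]
	if !ok {
		msg = "Unknown rsync error"
	}
	return code, msg
}

func main() {
	// Force a failure: rsync against an unreachable host, as in the new test.
	err := exec.Command("rsync", "rsync://0.0.0.1/").Run()
	if code, msg := translateRsyncErrorCode(err); code != 0 {
		fmt.Printf("rsync exited with %d (%s)\n", code, msg)
	}
}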


@@ -402,8 +402,17 @@ func (w *Worker) registorWorker() {
 	for _, root := range w.cfg.Manager.APIBaseList() {
 		url := fmt.Sprintf("%s/workers", root)
 		logger.Debugf("register on manager url: %s", url)
-		if _, err := PostJSON(url, msg, w.httpClient); err != nil {
-			logger.Errorf("Failed to register worker")
+		for retry := 10; retry > 0; {
+			if _, err := PostJSON(url, msg, w.httpClient); err != nil {
+				logger.Errorf("Failed to register worker")
+				retry--
+				if retry > 0 {
+					time.Sleep(1 * time.Second)
+					logger.Noticef("Retrying... (%d)", retry)
+				}
+			} else {
+				break
+			}
 		}
 	}
 }
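The registration change retries the POST up to 10 times, sleeping one second between failed attempts. The same control flow can be expressed as a small generic helper; this is only an illustrative sketch, not code from the repository.

package main

import (
	"errors"
	"fmt"
	"time"
)

// retryWithDelay calls fn up to `attempts` times, sleeping `delay` between
// failures, and returns the last error if every attempt fails. This mirrors
// the loop added to registorWorker.
func retryWithDelay(attempts int, delay time.Duration, fn func() error) error {
	var err error
	for retry := attempts; retry > 0; retry-- {
		if err = fn(); err == nil {
			return nil
		}
		if retry > 1 {
			time.Sleep(delay)
			fmt.Printf("Retrying... (%d)\n", retry-1)
		}
	}
	return err
}

func main() {
	calls := 0
	err := retryWithDelay(3, 10*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errors.New("manager not reachable yet")
		}
		return nil
	})
	fmt.Println("registered after", calls, "attempts, err =", err)
}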