1
0
镜像自地址 https://github.com/tuna/tunasync.git 已同步 2025-12-07 15:06:47 +00:00

10 次代码提交

作者 SHA1 备注 提交日期
zyx
45e5d900fb bump version to 0.6.5 2020-06-08 22:30:28 +08:00
zyx
7b0cd490b7 fix misuse of a variable 2020-06-08 22:23:12 +08:00
zyx
9178966aed bump version to 0.6.4 2020-06-04 09:44:17 +08:00
zyx
b5d2a0ad89 bug fix: jobs not being scheduled after timeout 2020-06-04 09:37:20 +08:00
zyx
d8963c9946 test rsync inside a Docker container 2020-06-03 21:51:04 +08:00
zyx
198afa72cd bug fix: rsync can access the exclude file in Docker (close #59) 2020-06-03 21:50:38 +08:00
zyx
85ce9c1270 wait for docker container removal 2020-06-03 19:47:14 +08:00
zyx
a8a35fc259 Merge branch 'master' of github.com:tuna/tunasync 2020-06-03 13:28:58 +08:00
zyx
c00eb12a75 Two new options for rsync provider
- rsync_no_timeout=true/false # disable --timeout option
- rsync_timeout=n # set --timeout=n
related to issue #121
2020-06-03 13:26:49 +08:00
Yuxiang Zhang
95ae9c16a9 Update workers.conf 2020-06-01 16:59:44 +08:00
共有 12 个文件被更改,包括 217 次插入60 次删除

查看文件

@@ -486,7 +486,7 @@ name = "pypi"
provider = "command" provider = "command"
upstream = "https://pypi.python.org/" upstream = "https://pypi.python.org/"
command = "/home/scripts/pypi.sh" command = "/home/scripts/pypi.sh"
docker_image = "tunathu/tunasync-scripts:latest" docker_image = "tunathu/bandersnatch:latest"
interval = 5 interval = 5
[[mirrors]] [[mirrors]]

查看文件

@@ -1,4 +1,4 @@
package internal package internal
// Version of the program // Version of the program
const Version string = "0.6.3" const Version string = "0.6.5"

查看文件

@@ -1,6 +1,6 @@
package worker package worker
// put global viables and types here // put global variables and types here
import ( import (
"gopkg.in/op/go-logging.v1" "gopkg.in/op/go-logging.v1"

查看文件

@@ -142,6 +142,8 @@ type mirrorConfig struct {
ExcludeFile string `toml:"exclude_file"` ExcludeFile string `toml:"exclude_file"`
Username string `toml:"username"` Username string `toml:"username"`
Password string `toml:"password"` Password string `toml:"password"`
RsyncNoTimeo bool `toml:"rsync_no_timeout"`
RsyncTimeout int `toml:"rsync_timeout"`
RsyncOptions []string `toml:"rsync_options"` RsyncOptions []string `toml:"rsync_options"`
RsyncOverride []string `toml:"rsync_override"` RsyncOverride []string `toml:"rsync_override"`
Stage1Profile string `toml:"stage1_profile"` Stage1Profile string `toml:"stage1_profile"`

查看文件

@@ -3,6 +3,9 @@ package worker
import ( import (
"fmt" "fmt"
"os" "os"
"time"
"github.com/codeskyblue/go-sh"
) )
type dockerHook struct { type dockerHook struct {
@@ -16,6 +19,10 @@ func newDockerHook(p mirrorProvider, gCfg dockerConfig, mCfg mirrorConfig) *dock
volumes := []string{} volumes := []string{}
volumes = append(volumes, gCfg.Volumes...) volumes = append(volumes, gCfg.Volumes...)
volumes = append(volumes, mCfg.DockerVolumes...) volumes = append(volumes, mCfg.DockerVolumes...)
if len(mCfg.ExcludeFile) > 0 {
arg := fmt.Sprintf("%s:%s:ro", mCfg.ExcludeFile, mCfg.ExcludeFile)
volumes = append(volumes, arg)
}
options := []string{} options := []string{}
options = append(options, gCfg.Options...) options = append(options, gCfg.Options...)
@@ -60,6 +67,27 @@ func (d *dockerHook) postExec() error {
// sh.Command( // sh.Command(
// "docker", "rm", "-f", d.Name(), // "docker", "rm", "-f", d.Name(),
// ).Run() // ).Run()
name := d.Name()
retry := 10
for ; retry > 0; retry-- {
out, err := sh.Command(
"docker", "ps", "-a",
"--filter", "name=^"+name+"$",
"--format", "{{.Status}}",
).Output()
if err != nil {
logger.Errorf("docker ps failed: %v", err)
break
}
if len(out) == 0 {
break
}
logger.Debugf("container %s still exists: '%s'", name, string(out))
time.Sleep(1 * time.Second)
}
if retry == 0 {
logger.Warningf("container %s not removed automatically, next sync may fail", name)
}
d.provider.ExitContext() d.provider.ExitContext()
return nil return nil
} }

查看文件

@@ -94,22 +94,27 @@ sleep 20
} }
exitedErr <- err exitedErr <- err
}() }()
cmdRun("ps", []string{"aux"})
// Wait for docker running // Wait for docker running
time.Sleep(8 * time.Second) for wait := 0; wait < 8; wait++ {
names, err := getDockerByName(d.Name())
cmdRun("ps", []string{"aux"}) So(err, ShouldBeNil)
if names != "" {
break
}
time.Sleep(1 * time.Second)
}
// cmdRun("ps", []string{"aux"})
// assert container running // assert container running
names, err := getDockerByName(d.Name()) names, err := getDockerByName(d.Name())
So(err, ShouldBeNil) So(err, ShouldBeNil)
// So(names, ShouldEqual, d.Name()+"\n") So(names, ShouldEqual, d.Name()+"\n")
err = provider.Terminate() err = provider.Terminate()
// So(err, ShouldBeNil) So(err, ShouldBeNil)
cmdRun("ps", []string{"aux"}) // cmdRun("ps", []string{"aux"})
<-exitedErr <-exitedErr
// container should be terminated and removed // container should be terminated and removed

查看文件

@@ -180,7 +180,6 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
logger.Debug("syncing done") logger.Debug("syncing done")
case <-time.After(timeout): case <-time.After(timeout):
logger.Notice("provider timeout") logger.Notice("provider timeout")
stopASAP = true
termErr = provider.Terminate() termErr = provider.Terminate()
syncErr = fmt.Errorf("%s timeout after %v", m.Name(), timeout) syncErr = fmt.Errorf("%s timeout after %v", m.Name(), timeout)
case <-kill: case <-kill:
@@ -190,7 +189,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
syncErr = errors.New("killed by manager") syncErr = errors.New("killed by manager")
} }
if termErr != nil { if termErr != nil {
logger.Errorf("failed to terminate provider %s: %s", m.Name(), err.Error()) logger.Errorf("failed to terminate provider %s: %s", m.Name(), termErr.Error())
return termErr return termErr
} }

查看文件

@@ -335,7 +335,6 @@ echo $TUNASYNC_WORKING_DIR
}) })
}) })
Convey("When a job timed out", func(ctx C) { Convey("When a job timed out", func(ctx C) {
scriptContent := `#!/bin/bash scriptContent := `#!/bin/bash
echo $TUNASYNC_WORKING_DIR echo $TUNASYNC_WORKING_DIR
@@ -371,6 +370,30 @@ echo $TUNASYNC_WORKING_DIR
job.ctrlChan <- jobDisable job.ctrlChan <- jobDisable
<-job.disabled <-job.disabled
}) })
Convey("It should be retried", func(ctx C) {
go job.Run(managerChan, semaphore)
job.ctrlChan <- jobStart
time.Sleep(1 * time.Second)
msg := <-managerChan
So(msg.status, ShouldEqual, PreSyncing)
for i := 0; i < defaultMaxRetry; i++ {
msg = <-managerChan
So(msg.status, ShouldEqual, Syncing)
job.ctrlChan <- jobStart // should be ignored
msg = <-managerChan
So(msg.status, ShouldEqual, Failed)
So(msg.msg, ShouldContainSubstring, "timeout after")
// re-schedule after last try
So(msg.schedule, ShouldEqual, i == defaultMaxRetry-1)
}
job.ctrlChan <- jobDisable
<-job.disabled
})
}) })
}) })

查看文件

@@ -140,6 +140,8 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
password: mirror.Password, password: mirror.Password,
excludeFile: mirror.ExcludeFile, excludeFile: mirror.ExcludeFile,
extraOptions: mirror.RsyncOptions, extraOptions: mirror.RsyncOptions,
rsyncNeverTimeout: mirror.RsyncNoTimeo,
rsyncTimeoutValue: mirror.RsyncTimeout,
overriddenOptions: mirror.RsyncOverride, overriddenOptions: mirror.RsyncOverride,
rsyncEnv: mirror.Env, rsyncEnv: mirror.Env,
workingDir: mirrorDir, workingDir: mirrorDir,
@@ -159,22 +161,24 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
provider = p provider = p
case provTwoStageRsync: case provTwoStageRsync:
rc := twoStageRsyncConfig{ rc := twoStageRsyncConfig{
name: mirror.Name, name: mirror.Name,
stage1Profile: mirror.Stage1Profile, stage1Profile: mirror.Stage1Profile,
upstreamURL: mirror.Upstream, upstreamURL: mirror.Upstream,
rsyncCmd: mirror.Command, rsyncCmd: mirror.Command,
username: mirror.Username, username: mirror.Username,
password: mirror.Password, password: mirror.Password,
excludeFile: mirror.ExcludeFile, excludeFile: mirror.ExcludeFile,
extraOptions: mirror.RsyncOptions, extraOptions: mirror.RsyncOptions,
rsyncEnv: mirror.Env, rsyncNeverTimeout: mirror.RsyncNoTimeo,
workingDir: mirrorDir, rsyncTimeoutValue: mirror.RsyncTimeout,
logDir: logDir, rsyncEnv: mirror.Env,
logFile: filepath.Join(logDir, "latest.log"), workingDir: mirrorDir,
useIPv6: mirror.UseIPv6, logDir: logDir,
interval: time.Duration(mirror.Interval) * time.Minute, logFile: filepath.Join(logDir, "latest.log"),
retry: mirror.Retry, useIPv6: mirror.UseIPv6,
timeout: time.Duration(mirror.Timeout) * time.Second, interval: time.Duration(mirror.Interval) * time.Minute,
retry: mirror.Retry,
timeout: time.Duration(mirror.Timeout) * time.Second,
} }
p, err := newTwoStageRsyncProvider(rc) p, err := newTwoStageRsyncProvider(rc)
if err != nil { if err != nil {

查看文件

@@ -148,18 +148,19 @@ func TestRsyncProviderWithAuthentication(t *testing.T) {
proxyAddr := "127.0.0.1:1233" proxyAddr := "127.0.0.1:1233"
c := rsyncConfig{ c := rsyncConfig{
name: "tuna", name: "tuna",
upstreamURL: "rsync://rsync.tuna.moe/tuna/", upstreamURL: "rsync://rsync.tuna.moe/tuna/",
rsyncCmd: scriptFile, rsyncCmd: scriptFile,
username: "tunasync", username: "tunasync",
password: "tunasyncpassword", password: "tunasyncpassword",
workingDir: tmpDir, workingDir: tmpDir,
extraOptions: []string{"--delete-excluded"}, extraOptions: []string{"--delete-excluded"},
rsyncEnv: map[string]string{"RSYNC_PROXY": proxyAddr}, rsyncTimeoutValue: 30,
logDir: tmpDir, rsyncEnv: map[string]string{"RSYNC_PROXY": proxyAddr},
logFile: tmpFile, logDir: tmpDir,
useIPv4: true, logFile: tmpFile,
interval: 600 * time.Second, useIPv4: true,
interval: 600 * time.Second,
} }
provider, err := newRsyncProvider(c) provider, err := newRsyncProvider(c)
@@ -191,7 +192,7 @@ exit 0
fmt.Sprintf( fmt.Sprintf(
"%s %s %s -aHvh --no-o --no-g --stats --exclude .~tmp~/ "+ "%s %s %s -aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
"--delete --delete-after --delay-updates --safe-links "+ "--delete --delete-after --delay-updates --safe-links "+
"--timeout=120 -4 --delete-excluded %s %s", "--timeout=30 -4 --delete-excluded %s %s",
provider.username, provider.password, proxyAddr, provider.username, provider.password, proxyAddr,
provider.upstreamURL, provider.WorkingDir(), provider.upstreamURL, provider.WorkingDir(),
), ),
@@ -221,6 +222,7 @@ func TestRsyncProviderWithOverriddenOptions(t *testing.T) {
upstreamURL: "rsync://rsync.tuna.moe/tuna/", upstreamURL: "rsync://rsync.tuna.moe/tuna/",
rsyncCmd: scriptFile, rsyncCmd: scriptFile,
workingDir: tmpDir, workingDir: tmpDir,
rsyncNeverTimeout: true,
overriddenOptions: []string{"-aHvh", "--no-o", "--no-g", "--stats"}, overriddenOptions: []string{"-aHvh", "--no-o", "--no-g", "--stats"},
extraOptions: []string{"--delete-excluded"}, extraOptions: []string{"--delete-excluded"},
logDir: tmpDir, logDir: tmpDir,
@@ -270,6 +272,78 @@ exit 0
}) })
} }
func TestRsyncProviderWithDocker(t *testing.T) {
Convey("Rsync in Docker should work", t, func() {
tmpDir, err := ioutil.TempDir("", "tunasync")
defer os.RemoveAll(tmpDir)
So(err, ShouldBeNil)
scriptFile := filepath.Join(tmpDir, "myrsync")
excludeFile := filepath.Join(tmpDir, "exclude.txt")
g := &Config{
Global: globalConfig{
Retry: 2,
},
Docker: dockerConfig{
Enable: true,
Volumes: []string{
scriptFile + ":/bin/myrsync",
"/etc/gai.conf:/etc/gai.conf:ro",
},
},
}
c := mirrorConfig{
Name: "tuna",
Provider: provRsync,
Upstream: "rsync://rsync.tuna.moe/tuna/",
Command: "/bin/myrsync",
ExcludeFile: excludeFile,
DockerImage: "alpine:3.8",
LogDir: tmpDir,
MirrorDir: tmpDir,
UseIPv6: true,
Timeout: 100,
Interval: 600,
}
provider := newMirrorProvider(c, g)
So(provider.Type(), ShouldEqual, provRsync)
So(provider.Name(), ShouldEqual, c.Name)
So(provider.WorkingDir(), ShouldEqual, c.MirrorDir)
So(provider.LogDir(), ShouldEqual, c.LogDir)
cmdScriptContent := `#!/bin/sh
#echo "$@"
while [[ $# -gt 0 ]]; do
if [[ "$1" = "--exclude-from" ]]; then
cat "$2"
shift
fi
shift
done
`
err = ioutil.WriteFile(scriptFile, []byte(cmdScriptContent), 0755)
So(err, ShouldBeNil)
err = ioutil.WriteFile(excludeFile, []byte("__some_pattern"), 0755)
So(err, ShouldBeNil)
for _, hook := range provider.Hooks() {
err = hook.preExec()
So(err, ShouldBeNil)
}
err = provider.Run(make(chan empty, 1))
So(err, ShouldBeNil)
for _, hook := range provider.Hooks() {
err = hook.postExec()
So(err, ShouldBeNil)
}
loggedContent, err := ioutil.ReadFile(provider.LogFile())
So(err, ShouldBeNil)
So(string(loggedContent), ShouldEqual, "__some_pattern")
})
}
func TestCmdProvider(t *testing.T) { func TestCmdProvider(t *testing.T) {
Convey("Command Provider should work", t, func(ctx C) { Convey("Command Provider should work", t, func(ctx C) {
tmpDir, err := ioutil.TempDir("", "tunasync") tmpDir, err := ioutil.TempDir("", "tunasync")
@@ -490,18 +564,19 @@ func TestTwoStageRsyncProvider(t *testing.T) {
tmpFile := filepath.Join(tmpDir, "log_file") tmpFile := filepath.Join(tmpDir, "log_file")
c := twoStageRsyncConfig{ c := twoStageRsyncConfig{
name: "tuna-two-stage-rsync", name: "tuna-two-stage-rsync",
upstreamURL: "rsync://mirrors.tuna.moe/", upstreamURL: "rsync://mirrors.tuna.moe/",
stage1Profile: "debian", stage1Profile: "debian",
rsyncCmd: scriptFile, rsyncCmd: scriptFile,
workingDir: tmpDir, workingDir: tmpDir,
logDir: tmpDir, logDir: tmpDir,
logFile: tmpFile, logFile: tmpFile,
useIPv6: true, useIPv6: true,
excludeFile: tmpFile, excludeFile: tmpFile,
extraOptions: []string{"--delete-excluded", "--cache"}, rsyncTimeoutValue: 30,
username: "hello", extraOptions: []string{"--delete-excluded", "--cache"},
password: "world", username: "hello",
password: "world",
} }
provider, err := newTwoStageRsyncProvider(c) provider, err := newTwoStageRsyncProvider(c)
@@ -539,7 +614,7 @@ exit 0
targetDir, targetDir,
fmt.Sprintf( fmt.Sprintf(
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+ "-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+
"--timeout=120 --exclude dists/ -6 "+ "--exclude dists/ --timeout=30 -6 "+
"--exclude-from %s %s %s", "--exclude-from %s %s %s",
provider.excludeFile, provider.upstreamURL, provider.WorkingDir(), provider.excludeFile, provider.upstreamURL, provider.WorkingDir(),
), ),
@@ -547,7 +622,7 @@ exit 0
fmt.Sprintf( fmt.Sprintf(
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+ "-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
"--delete --delete-after --delay-updates --safe-links "+ "--delete --delete-after --delay-updates --safe-links "+
"--timeout=120 --delete-excluded --cache -6 --exclude-from %s %s %s", "--delete-excluded --cache --timeout=30 -6 --exclude-from %s %s %s",
provider.excludeFile, provider.upstreamURL, provider.WorkingDir(), provider.excludeFile, provider.upstreamURL, provider.WorkingDir(),
), ),
) )
@@ -581,7 +656,7 @@ exit 0
expectedOutput := fmt.Sprintf( expectedOutput := fmt.Sprintf(
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+ "-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+
"--timeout=120 --exclude dists/ -6 "+ "--exclude dists/ --timeout=30 -6 "+
"--exclude-from %s %s %s\n", "--exclude-from %s %s %s\n",
provider.excludeFile, provider.upstreamURL, provider.WorkingDir(), provider.excludeFile, provider.upstreamURL, provider.WorkingDir(),
) )

查看文件

@@ -2,6 +2,7 @@ package worker
import ( import (
"errors" "errors"
"fmt"
"strings" "strings"
"time" "time"
@@ -14,6 +15,8 @@ type rsyncConfig struct {
upstreamURL, username, password, excludeFile string upstreamURL, username, password, excludeFile string
extraOptions []string extraOptions []string
overriddenOptions []string overriddenOptions []string
rsyncNeverTimeout bool
rsyncTimeoutValue int
rsyncEnv map[string]string rsyncEnv map[string]string
workingDir, logDir, logFile string workingDir, logDir, logFile string
useIPv6, useIPv4 bool useIPv6, useIPv4 bool
@@ -66,12 +69,20 @@ func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
"-aHvh", "--no-o", "--no-g", "--stats", "-aHvh", "--no-o", "--no-g", "--stats",
"--exclude", ".~tmp~/", "--exclude", ".~tmp~/",
"--delete", "--delete-after", "--delay-updates", "--delete", "--delete-after", "--delay-updates",
"--safe-links", "--timeout=120", "--safe-links",
} }
if c.overriddenOptions != nil { if c.overriddenOptions != nil {
options = c.overriddenOptions options = c.overriddenOptions
} }
if !c.rsyncNeverTimeout {
timeo := 120
if c.rsyncTimeoutValue > 0 {
timeo = c.rsyncTimeoutValue
}
options = append(options, fmt.Sprintf("--timeout=%d", timeo))
}
if c.useIPv6 { if c.useIPv6 {
options = append(options, "-6") options = append(options, "-6")
} else if c.useIPv4 { } else if c.useIPv4 {

查看文件

@@ -15,6 +15,8 @@ type twoStageRsyncConfig struct {
stage1Profile string stage1Profile string
upstreamURL, username, password, excludeFile string upstreamURL, username, password, excludeFile string
extraOptions []string extraOptions []string
rsyncNeverTimeout bool
rsyncTimeoutValue int
rsyncEnv map[string]string rsyncEnv map[string]string
workingDir, logDir, logFile string workingDir, logDir, logFile string
useIPv6 bool useIPv6 bool
@@ -61,13 +63,13 @@ func newTwoStageRsyncProvider(c twoStageRsyncConfig) (*twoStageRsyncProvider, er
stage1Options: []string{ stage1Options: []string{
"-aHvh", "--no-o", "--no-g", "--stats", "-aHvh", "--no-o", "--no-g", "--stats",
"--exclude", ".~tmp~/", "--exclude", ".~tmp~/",
"--safe-links", "--timeout=120", "--safe-links",
}, },
stage2Options: []string{ stage2Options: []string{
"-aHvh", "--no-o", "--no-g", "--stats", "-aHvh", "--no-o", "--no-g", "--stats",
"--exclude", ".~tmp~/", "--exclude", ".~tmp~/",
"--delete", "--delete-after", "--delay-updates", "--delete", "--delete-after", "--delay-updates",
"--safe-links", "--timeout=120", "--safe-links",
}, },
} }
@@ -124,6 +126,14 @@ func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) {
return []string{}, fmt.Errorf("Invalid stage: %d", stage) return []string{}, fmt.Errorf("Invalid stage: %d", stage)
} }
if !p.rsyncNeverTimeout {
timeo := 120
if p.rsyncTimeoutValue > 0 {
timeo = p.rsyncTimeoutValue
}
options = append(options, fmt.Sprintf("--timeout=%d", timeo))
}
if p.useIPv6 { if p.useIPv6 {
options = append(options, "-6") options = append(options, "-6")
} }