1
0
镜像自地址 https://github.com/tuna/tunasync.git 已同步 2025-12-08 07:26:47 +00:00

15 次代码提交

作者 SHA1 备注 提交日期
zyx
c5ed682a49 Bump version to 0.5.1 2020-03-20 10:39:34 +08:00
zyx
2c33380ce0 fix util_test 2020-03-20 10:35:54 +08:00
zyx
70cb22096f Merge branch 'master' of github.ip4.run:tuna/tunasync 2020-03-20 10:30:53 +08:00
zyx
b1f2679fbf [cmd provider] add support of match size in logs 2020-03-20 10:30:44 +08:00
Yuxiang Zhang
92a255fd3c Update tunasync.yml 2020-03-16 22:43:41 +08:00
zyx
aee1a705b7 remove "--contimeout=120" from default rsync options 2020-03-16 22:23:47 +08:00
zyx
c99916cc2a Bump version to 0.4.3 2020-03-16 22:03:40 +08:00
zyx
9eb72c5db0 fix misuse of variables 2020-03-16 21:59:34 +08:00
z4yx
b490c22984 add test of rsyncEnv 2020-03-16 21:16:23 +08:00
z4yx
ae5ff25d20 in case rsyncEnv is nil 2020-03-16 21:11:15 +08:00
z4yx
365f49e6d3 add support of env config for rsync provider 2020-03-16 20:59:08 +08:00
z4yx
fddb793ca1 v0.4.2 2020-03-14 11:30:44 +08:00
z4yx
c41d7a4038 Bring docker test back
commit 4540ba24c72cb2d24e2e04870025dfa233dedf30
Author: z4yx <z4yx@users.noreply.github.com>
Date:   Sat Mar 14 11:16:13 2020 +0800

    wait longer

commit c8f07b81a7fe5fdef9224e8bc187500c4d67f049
Author: z4yx <z4yx@users.noreply.github.com>
Date:   Sat Mar 14 10:55:49 2020 +0800

    try to Terminate

commit 10d2d4b9d0756cf8f60fe27e1e41ae29b5ea6cbe
Author: z4yx <z4yx@users.noreply.github.com>
Date:   Sat Mar 14 10:50:26 2020 +0800

    forward the error

commit 38c96ee44d31088b9e6de67ebb745358fac8d49a
Author: z4yx <z4yx@users.noreply.github.com>
Date:   Sat Mar 14 10:31:39 2020 +0800

    now enable the assertion

commit 3b3c46a065a035d906d4cc5022d42e30b1f52a08
Author: z4yx <z4yx@users.noreply.github.com>
Date:   Sat Mar 14 10:26:40 2020 +0800

    rm un-related info

commit dd7ef7e3d0a0765c1fc48296d70966b3b4d581dd
Author: z4yx <z4yx@users.noreply.github.com>
Date:   Sat Mar 14 10:12:01 2020 +0800

    print err of provider.Run

commit 49a7b57dbf52d410c0dfe796be9c2f6213884931
Author: z4yx <z4yx@users.noreply.github.com>
Date:   Sat Mar 14 09:55:48 2020 +0800

    wait until it exits

commit a3e8f699072e3252b3300c667f1425a966aedb39
Author: z4yx <z4yx@users.noreply.github.com>
Date:   Sat Mar 14 09:54:19 2020 +0800

    targeting alpine:3.8

commit f30b8565049bb373a1a91a34ad07c8c3df8e1036
Author: z4yx <z4yx@users.noreply.github.com>
Date:   Sat Mar 14 09:47:27 2020 +0800

    see what happens

commit 8c21229a8be8e2ac0737bbc4bb88ba54e9fb7a20
Author: z4yx <z4yx@users.noreply.github.com>
Date:   Sat Mar 14 09:30:50 2020 +0800

    remove one assertion

commit 123368e6ef07aa63c489bb49bdf370d3abdd17bb
Author: z4yx <z4yx@users.noreply.github.com>
Date:   Fri Mar 13 23:32:45 2020 +0800

    docker test somehow works now

commit 94fa294a9bbedb569e6dd9cc7e4f27e73ed97443
Author: z4yx <z4yx@users.noreply.github.com>
Date:   Fri Mar 13 23:27:12 2020 +0800

    should use -d

commit b35bae2a9cb5e006c513da95377ab9487fc4341a
Author: z4yx <z4yx@users.noreply.github.com>
Date:   Fri Mar 13 23:22:25 2020 +0800

    docker run not working??

commit 9aea0036f434d333087f0cff3ce5165a53554e5f
Author: z4yx <z4yx@users.noreply.github.com>
Date:   Fri Mar 13 23:12:39 2020 +0800

    test if docker works

commit f92578b159587a8bbda296bbf9261fb4c5e2f186
Author: z4yx <z4yx@users.noreply.github.com>
Date:   Fri Mar 13 17:42:00 2020 +0800

    debugging docker_test

commit b649e32f76549711af597ce3a642309a41a08bf9
Author: z4yx <z4yx@users.noreply.github.com>
Date:   Fri Mar 13 17:27:55 2020 +0800

    Revert "remove docker_test.go"

    This reverts commit a517a4bb64.
2020-03-14 11:23:19 +08:00
z4yx
8b0ef2bb53 fix the test 2020-03-14 11:11:10 +08:00
z4yx
b25be80670 extra options should only be applied to the second stage 2020-03-14 11:01:34 +08:00
共有 12 个文件被更改,包括 299 次插入68 次删除

查看文件

@@ -28,6 +28,11 @@ jobs:
make tunasync make tunasync
make tunasynctl make tunasynctl
- name: Keep artifacts
uses: actions/upload-artifact@v1
with:
name: tunasync-bin
path: build/
test: test:
name: Test name: Test
@@ -36,10 +41,9 @@ jobs:
- name: Setup test dependencies - name: Setup test dependencies
run: | run: |
sudo apt update sudo apt-get update
sudo apt install -y cgroup-bin sudo apt-get install -y cgroup-bin
/usr/bin/docker pull alpine docker pull alpine:3.8
/usr/bin/docker images
lssubsys -am lssubsys -am
sudo cgcreate -a $USER -t $USER -g cpu:tunasync sudo cgcreate -a $USER -t $USER -g cpu:tunasync
sudo cgcreate -a $USER -t $USER -g memory:tunasync sudo cgcreate -a $USER -t $USER -g memory:tunasync

查看文件

@@ -86,13 +86,32 @@ func GetJSON(url string, obj interface{}, client *http.Client) (*http.Response,
return resp, json.Unmarshal(body, obj) return resp, json.Unmarshal(body, obj)
} }
func ExtractSizeFromRsyncLog(content []byte) string { // FindAllSubmatchInFile calls re.FindAllSubmatch to find matches in given file
// (?m) flag enables multi-line mode func FindAllSubmatchInFile(fileName string, re *regexp.Regexp) (matches [][][]byte, err error) {
re := regexp.MustCompile(`(?m)^Total file size: ([0-9\.]+[KMGTP]?) bytes`) if fileName == "/dev/null" {
matches := re.FindAllSubmatch(content, -1) err = errors.New("Invalid log file")
// fmt.Printf("%q\n", matches) return
if len(matches) == 0 { }
if content, err := ioutil.ReadFile(fileName); err == nil {
matches = re.FindAllSubmatch(content, -1)
// fmt.Printf("FindAllSubmatchInFile: %q\n", matches)
}
return
}
// ExtractSizeFromLog uses a regexp to extract the size from log files
func ExtractSizeFromLog(logFile string, re *regexp.Regexp) string {
matches, _ := FindAllSubmatchInFile(logFile, re)
if matches == nil || len(matches) == 0 {
return "" return ""
} }
// return the first capture group of the last occurrence
return string(matches[len(matches)-1][1]) return string(matches[len(matches)-1][1])
} }
// ExtractSizeFromRsyncLog extracts the size from rsync logs
func ExtractSizeFromRsyncLog(logFile string) string {
// (?m) flag enables multi-line mode
re := regexp.MustCompile(`(?m)^Total file size: ([0-9\.]+[KMGTP]?) bytes`)
return ExtractSizeFromLog(logFile, re)
}

查看文件

@@ -1,6 +1,9 @@
package internal package internal
import ( import (
"io/ioutil"
"os"
"path/filepath"
"testing" "testing"
. "github.com/smartystreets/goconvey/convey" . "github.com/smartystreets/goconvey/convey"
@@ -26,7 +29,14 @@ sent 7.55M bytes received 823.25M bytes 5.11M bytes/sec
total size is 1.33T speedup is 1,604.11 total size is 1.33T speedup is 1,604.11
` `
Convey("Log parser should work", t, func() { Convey("Log parser should work", t, func() {
res := ExtractSizeFromRsyncLog([]byte(realLogContent)) tmpDir, err := ioutil.TempDir("", "tunasync")
So(err, ShouldBeNil)
defer os.RemoveAll(tmpDir)
logFile := filepath.Join(tmpDir, "rs.log")
err = ioutil.WriteFile(logFile, []byte(realLogContent), 0755)
So(err, ShouldBeNil)
res := ExtractSizeFromRsyncLog(logFile)
So(res, ShouldEqual, "1.33T") So(res, ShouldEqual, "1.33T")
}) })
} }

查看文件

@@ -1,3 +1,4 @@
package internal package internal
const Version string = "0.4.1" // Version of the program
const Version string = "0.5.1"

查看文件

@@ -3,11 +3,11 @@ package worker
import ( import (
"errors" "errors"
"fmt" "fmt"
"io/ioutil"
"regexp" "regexp"
"time" "time"
"github.com/anmitsu/go-shlex" "github.com/anmitsu/go-shlex"
"github.com/tuna/tunasync/internal"
) )
type cmdConfig struct { type cmdConfig struct {
@@ -18,13 +18,16 @@ type cmdConfig struct {
retry int retry int
env map[string]string env map[string]string
failOnMatch string failOnMatch string
sizePattern string
} }
type cmdProvider struct { type cmdProvider struct {
baseProvider baseProvider
cmdConfig cmdConfig
command []string command []string
dataSize string
failOnMatch *regexp.Regexp failOnMatch *regexp.Regexp
sizePattern *regexp.Regexp
} }
func newCmdProvider(c cmdConfig) (*cmdProvider, error) { func newCmdProvider(c cmdConfig) (*cmdProvider, error) {
@@ -59,6 +62,14 @@ func newCmdProvider(c cmdConfig) (*cmdProvider, error) {
} }
provider.failOnMatch = failOnMatch provider.failOnMatch = failOnMatch
} }
if len(c.sizePattern) > 0 {
var err error
sizePattern, err := regexp.Compile(c.sizePattern)
if err != nil {
return nil, errors.New("size-pattern regexp error: " + err.Error())
}
provider.sizePattern = sizePattern
}
return provider, nil return provider, nil
} }
@@ -71,7 +82,12 @@ func (p *cmdProvider) Upstream() string {
return p.upstreamURL return p.upstreamURL
} }
func (p *cmdProvider) DataSize() string {
return p.dataSize
}
func (p *cmdProvider) Run() error { func (p *cmdProvider) Run() error {
p.dataSize = ""
if err := p.Start(); err != nil { if err := p.Start(); err != nil {
return err return err
} }
@@ -79,16 +95,18 @@ func (p *cmdProvider) Run() error {
return err return err
} }
if p.failOnMatch != nil { if p.failOnMatch != nil {
if logContent, err := ioutil.ReadFile(p.LogFile()); err == nil { matches, err := internal.FindAllSubmatchInFile(p.LogFile(), p.failOnMatch)
matches := p.failOnMatch.FindAllSubmatch(logContent, -1) fmt.Printf("FindAllSubmatchInFile: %q\n", matches)
if len(matches) != 0 { if err != nil {
logger.Debug("Fail-on-match: %r", matches)
return errors.New(
fmt.Sprintf("Fail-on-match regexp found %d matches", len(matches)))
}
} else {
return err return err
} }
if len(matches) != 0 {
logger.Debug("Fail-on-match: %r", matches)
return fmt.Errorf("Fail-on-match regexp found %d matches", len(matches))
}
}
if p.sizePattern != nil {
p.dataSize = internal.ExtractSizeFromLog(p.LogFile(), p.sizePattern)
} }
return nil return nil
} }

查看文件

@@ -131,6 +131,7 @@ type mirrorConfig struct {
Command string `toml:"command"` Command string `toml:"command"`
FailOnMatch string `toml:"fail_on_match"` FailOnMatch string `toml:"fail_on_match"`
SizePattern string `toml:"size_pattern"`
UseIPv6 bool `toml:"use_ipv6"` UseIPv6 bool `toml:"use_ipv6"`
UseIPv4 bool `toml:"use_ipv4"` UseIPv4 bool `toml:"use_ipv4"`
ExcludeFile string `toml:"exclude_file"` ExcludeFile string `toml:"exclude_file"`

127
worker/docker_test.go 普通文件
查看文件

@@ -0,0 +1,127 @@
package worker
import (
"fmt"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"testing"
"time"
"github.com/codeskyblue/go-sh"
. "github.com/smartystreets/goconvey/convey"
)
func cmdRun(p string, args []string) {
cmd := exec.Command(p, args...)
out, err := cmd.CombinedOutput()
if err != nil {
logger.Debugf("cmdRun failed %s", err)
return
}
logger.Debugf("cmdRun: ", string(out))
}
func getDockerByName(name string) (string, error) {
// docker ps -f 'name=$name' --format '{{.Names}}'
out, err := sh.Command(
"docker", "ps", "-a",
"--filter", "name="+name,
"--format", "{{.Names}}",
).Output()
if err == nil {
logger.Debugf("docker ps: '%s'", string(out))
}
return string(out), err
}
func TestDocker(t *testing.T) {
Convey("Docker Should Work", t, func(ctx C) {
tmpDir, err := ioutil.TempDir("", "tunasync")
defer os.RemoveAll(tmpDir)
So(err, ShouldBeNil)
cmdScript := filepath.Join(tmpDir, "cmd.sh")
tmpFile := filepath.Join(tmpDir, "log_file")
expectedOutput := "HELLO_WORLD"
c := cmdConfig{
name: "tuna-docker",
upstreamURL: "http://mirrors.tuna.moe/",
command: "/bin/cmd.sh",
workingDir: tmpDir,
logDir: tmpDir,
logFile: tmpFile,
interval: 600 * time.Second,
env: map[string]string{
"TEST_CONTENT": expectedOutput,
},
}
cmdScriptContent := `#!/bin/sh
echo ${TEST_CONTENT}
sleep 20
`
err = ioutil.WriteFile(cmdScript, []byte(cmdScriptContent), 0755)
So(err, ShouldBeNil)
provider, err := newCmdProvider(c)
So(err, ShouldBeNil)
d := &dockerHook{
emptyHook: emptyHook{
provider: provider,
},
image: "alpine:3.8",
volumes: []string{
fmt.Sprintf("%s:%s", cmdScript, "/bin/cmd.sh"),
},
}
provider.AddHook(d)
So(provider.Docker(), ShouldNotBeNil)
err = d.preExec()
So(err, ShouldBeNil)
cmdRun("docker", []string{"images"})
exitedErr := make(chan error, 1)
go func() {
err = provider.Run()
logger.Debugf("provider.Run() exited")
if err != nil {
logger.Errorf("provider.Run() failed: %v", err)
}
exitedErr <- err
}()
cmdRun("ps", []string{"aux"})
// Wait for docker running
time.Sleep(8 * time.Second)
cmdRun("ps", []string{"aux"})
// assert container running
names, err := getDockerByName(d.Name())
So(err, ShouldBeNil)
// So(names, ShouldEqual, d.Name()+"\n")
err = provider.Terminate()
// So(err, ShouldBeNil)
cmdRun("ps", []string{"aux"})
<-exitedErr
// container should be terminated and removed
names, err = getDockerByName(d.Name())
So(err, ShouldBeNil)
So(names, ShouldEqual, "")
// check log content
loggedContent, err := ioutil.ReadFile(provider.LogFile())
So(err, ShouldBeNil)
So(string(loggedContent), ShouldEqual, expectedOutput+"\n")
d.postExec()
})
}

查看文件

@@ -113,6 +113,7 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
command: mirror.Command, command: mirror.Command,
workingDir: mirrorDir, workingDir: mirrorDir,
failOnMatch: mirror.FailOnMatch, failOnMatch: mirror.FailOnMatch,
sizePattern: mirror.SizePattern,
logDir: logDir, logDir: logDir,
logFile: filepath.Join(logDir, "latest.log"), logFile: filepath.Join(logDir, "latest.log"),
interval: time.Duration(mirror.Interval) * time.Minute, interval: time.Duration(mirror.Interval) * time.Minute,
@@ -135,6 +136,7 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
excludeFile: mirror.ExcludeFile, excludeFile: mirror.ExcludeFile,
extraOptions: mirror.RsyncOptions, extraOptions: mirror.RsyncOptions,
overriddenOptions: mirror.RsyncOverride, overriddenOptions: mirror.RsyncOverride,
rsyncEnv: mirror.Env,
workingDir: mirrorDir, workingDir: mirrorDir,
logDir: logDir, logDir: logDir,
logFile: filepath.Join(logDir, "latest.log"), logFile: filepath.Join(logDir, "latest.log"),
@@ -159,6 +161,7 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
password: mirror.Password, password: mirror.Password,
excludeFile: mirror.ExcludeFile, excludeFile: mirror.ExcludeFile,
extraOptions: mirror.RsyncOptions, extraOptions: mirror.RsyncOptions,
rsyncEnv: mirror.Env,
workingDir: mirrorDir, workingDir: mirrorDir,
logDir: logDir, logDir: logDir,
logFile: filepath.Join(logDir, "latest.log"), logFile: filepath.Join(logDir, "latest.log"),

查看文件

@@ -5,6 +5,7 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"strconv"
"testing" "testing"
"time" "time"
@@ -90,7 +91,7 @@ exit 0
fmt.Sprintf( fmt.Sprintf(
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+ "-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
"--delete --delete-after --delay-updates --safe-links "+ "--delete --delete-after --delay-updates --safe-links "+
"--timeout=120 --contimeout=120 -6 %s %s", "--timeout=120 -6 %s %s",
provider.upstreamURL, provider.WorkingDir(), provider.upstreamURL, provider.WorkingDir(),
), ),
) )
@@ -114,6 +115,7 @@ func TestRsyncProviderWithAuthentication(t *testing.T) {
So(err, ShouldBeNil) So(err, ShouldBeNil)
scriptFile := filepath.Join(tmpDir, "myrsync") scriptFile := filepath.Join(tmpDir, "myrsync")
tmpFile := filepath.Join(tmpDir, "log_file") tmpFile := filepath.Join(tmpDir, "log_file")
proxyAddr := "127.0.0.1:1233"
c := rsyncConfig{ c := rsyncConfig{
name: "tuna", name: "tuna",
@@ -123,6 +125,7 @@ func TestRsyncProviderWithAuthentication(t *testing.T) {
password: "tunasyncpassword", password: "tunasyncpassword",
workingDir: tmpDir, workingDir: tmpDir,
extraOptions: []string{"--delete-excluded"}, extraOptions: []string{"--delete-excluded"},
rsyncEnv: map[string]string{"RSYNC_PROXY": proxyAddr},
logDir: tmpDir, logDir: tmpDir,
logFile: tmpFile, logFile: tmpFile,
useIPv4: true, useIPv4: true,
@@ -141,7 +144,7 @@ func TestRsyncProviderWithAuthentication(t *testing.T) {
Convey("Let's try a run", func() { Convey("Let's try a run", func() {
scriptContent := `#!/bin/bash scriptContent := `#!/bin/bash
echo "syncing to $(pwd)" echo "syncing to $(pwd)"
echo $USER $RSYNC_PASSWORD $@ echo $USER $RSYNC_PASSWORD $RSYNC_PROXY $@
sleep 1 sleep 1
echo "Done" echo "Done"
exit 0 exit 0
@@ -156,10 +159,11 @@ exit 0
"Done\n", "Done\n",
targetDir, targetDir,
fmt.Sprintf( fmt.Sprintf(
"%s %s -aHvh --no-o --no-g --stats --exclude .~tmp~/ "+ "%s %s %s -aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
"--delete --delete-after --delay-updates --safe-links "+ "--delete --delete-after --delay-updates --safe-links "+
"--timeout=120 --contimeout=120 -4 --delete-excluded %s %s", "--timeout=120 -4 --delete-excluded %s %s",
provider.username, provider.password, provider.upstreamURL, provider.WorkingDir(), provider.username, provider.password, proxyAddr,
provider.upstreamURL, provider.WorkingDir(),
), ),
) )
@@ -362,7 +366,7 @@ sleep 5
}) })
}) })
Convey("Command Provider with fail-on-match regexp should work", t, func(ctx C) { Convey("Command Provider with RegExprs should work", t, func(ctx C) {
tmpDir, err := ioutil.TempDir("", "tunasync") tmpDir, err := ioutil.TempDir("", "tunasync")
defer os.RemoveAll(tmpDir) defer os.RemoveAll(tmpDir)
So(err, ShouldBeNil) So(err, ShouldBeNil)
@@ -373,28 +377,73 @@ sleep 5
upstreamURL: "http://mirrors.tuna.moe/", upstreamURL: "http://mirrors.tuna.moe/",
command: "uptime", command: "uptime",
failOnMatch: "", failOnMatch: "",
sizePattern: "",
workingDir: tmpDir, workingDir: tmpDir,
logDir: tmpDir, logDir: tmpDir,
logFile: tmpFile, logFile: tmpFile,
interval: 600 * time.Second, interval: 600 * time.Second,
} }
Convey("when regexp matches", func() { Convey("when fail-on-match regexp matches", func() {
c.failOnMatch = `[a-z]+` c.failOnMatch = `[a-z]+`
provider, err := newCmdProvider(c) provider, err := newCmdProvider(c)
So(err, ShouldBeNil) So(err, ShouldBeNil)
err = provider.Run()
So(err, ShouldNotBeNil)
So(provider.DataSize(), ShouldBeEmpty)
})
Convey("when fail-on-match regexp does not match", func() {
c.failOnMatch = `load average_`
provider, err := newCmdProvider(c)
So(err, ShouldBeNil)
err = provider.Run()
So(err, ShouldBeNil)
})
Convey("when fail-on-match regexp meets /dev/null", func() {
c.failOnMatch = `load average_`
c.logFile = "/dev/null"
provider, err := newCmdProvider(c)
So(err, ShouldBeNil)
err = provider.Run() err = provider.Run()
So(err, ShouldNotBeNil) So(err, ShouldNotBeNil)
}) })
Convey("when regexp does not match", func() { Convey("when size-pattern regexp matches", func() {
c.failOnMatch = `load average_` c.sizePattern = `load average: ([\d\.]+)`
provider, err := newCmdProvider(c) provider, err := newCmdProvider(c)
So(err, ShouldBeNil) So(err, ShouldBeNil)
err = provider.Run() err = provider.Run()
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(provider.DataSize(), ShouldNotBeEmpty)
_, err = strconv.ParseFloat(provider.DataSize(), 32)
So(err, ShouldBeNil)
})
Convey("when size-pattern regexp does not match", func() {
c.sizePattern = `load ave: ([\d\.]+)`
provider, err := newCmdProvider(c)
So(err, ShouldBeNil)
err = provider.Run()
So(err, ShouldBeNil)
So(provider.DataSize(), ShouldBeEmpty)
})
Convey("when size-pattern regexp meets /dev/null", func() {
c.sizePattern = `load ave: ([\d\.]+)`
c.logFile = "/dev/null"
provider, err := newCmdProvider(c)
So(err, ShouldBeNil)
err = provider.Run()
So(err, ShouldNotBeNil)
So(provider.DataSize(), ShouldBeEmpty)
}) })
}) })
} }
@@ -457,15 +506,15 @@ exit 0
targetDir, targetDir,
fmt.Sprintf( fmt.Sprintf(
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+ "-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+
"--timeout=120 --contimeout=120 --exclude dists/ -6 "+ "--timeout=120 --exclude dists/ -6 "+
"--exclude-from %s --delete-excluded --cache %s %s", "--exclude-from %s %s %s",
provider.excludeFile, provider.upstreamURL, provider.WorkingDir(), provider.excludeFile, provider.upstreamURL, provider.WorkingDir(),
), ),
targetDir, targetDir,
fmt.Sprintf( fmt.Sprintf(
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+ "-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
"--delete --delete-after --delay-updates --safe-links "+ "--delete --delete-after --delay-updates --safe-links "+
"--timeout=120 --contimeout=120 -6 --exclude-from %s --delete-excluded --cache %s %s", "--timeout=120 --delete-excluded --cache -6 --exclude-from %s %s %s",
provider.excludeFile, provider.upstreamURL, provider.WorkingDir(), provider.excludeFile, provider.upstreamURL, provider.WorkingDir(),
), ),
) )
@@ -496,8 +545,8 @@ exit 0
expectedOutput := fmt.Sprintf( expectedOutput := fmt.Sprintf(
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+ "-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+
"--timeout=120 --contimeout=120 --exclude dists/ -6 "+ "--timeout=120 --exclude dists/ -6 "+
"--exclude-from %s --delete-excluded --cache %s %s\n", "--exclude-from %s %s %s\n",
provider.excludeFile, provider.upstreamURL, provider.WorkingDir(), provider.excludeFile, provider.upstreamURL, provider.WorkingDir(),
) )

查看文件

@@ -2,7 +2,6 @@ package worker
import ( import (
"errors" "errors"
"io/ioutil"
"strings" "strings"
"time" "time"
@@ -15,6 +14,7 @@ type rsyncConfig struct {
upstreamURL, username, password, excludeFile string upstreamURL, username, password, excludeFile string
extraOptions []string extraOptions []string
overriddenOptions []string overriddenOptions []string
rsyncEnv map[string]string
workingDir, logDir, logFile string workingDir, logDir, logFile string
useIPv6, useIPv4 bool useIPv6, useIPv4 bool
interval time.Duration interval time.Duration
@@ -50,12 +50,21 @@ func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
if c.rsyncCmd == "" { if c.rsyncCmd == "" {
provider.rsyncCmd = "rsync" provider.rsyncCmd = "rsync"
} }
if c.rsyncEnv == nil {
provider.rsyncEnv = map[string]string{}
}
if c.username != "" {
provider.rsyncEnv["USER"] = c.username
}
if c.password != "" {
provider.rsyncEnv["RSYNC_PASSWORD"] = c.password
}
options := []string{ options := []string{
"-aHvh", "--no-o", "--no-g", "--stats", "-aHvh", "--no-o", "--no-g", "--stats",
"--exclude", ".~tmp~/", "--exclude", ".~tmp~/",
"--delete", "--delete-after", "--delay-updates", "--delete", "--delete-after", "--delay-updates",
"--safe-links", "--timeout=120", "--contimeout=120", "--safe-links", "--timeout=120",
} }
if c.overriddenOptions != nil { if c.overriddenOptions != nil {
options = c.overriddenOptions options = c.overriddenOptions
@@ -102,9 +111,7 @@ func (p *rsyncProvider) Run() error {
if err := p.Wait(); err != nil { if err := p.Wait(); err != nil {
return err return err
} }
if logContent, err := ioutil.ReadFile(p.LogFile()); err == nil { p.dataSize = internal.ExtractSizeFromRsyncLog(p.LogFile())
p.dataSize = internal.ExtractSizeFromRsyncLog(logContent)
}
return nil return nil
} }
@@ -116,18 +123,11 @@ func (p *rsyncProvider) Start() error {
return errors.New("provider is currently running") return errors.New("provider is currently running")
} }
env := map[string]string{}
if p.username != "" {
env["USER"] = p.username
}
if p.password != "" {
env["RSYNC_PASSWORD"] = p.password
}
command := []string{p.rsyncCmd} command := []string{p.rsyncCmd}
command = append(command, p.options...) command = append(command, p.options...)
command = append(command, p.upstreamURL, p.WorkingDir()) command = append(command, p.upstreamURL, p.WorkingDir())
p.cmd = newCmdJob(p, command, p.WorkingDir(), env) p.cmd = newCmdJob(p, command, p.WorkingDir(), p.rsyncEnv)
if err := p.prepareLogFile(false); err != nil { if err := p.prepareLogFile(false); err != nil {
return err return err
} }

查看文件

@@ -104,7 +104,7 @@ func newCmdJob(provider mirrorProvider, cmdAndArgs []string, workingDir string,
} }
func (c *cmdJob) Start() error { func (c *cmdJob) Start() error {
// logger.Debugf("Command start: %v", c.cmd.Args) logger.Debugf("Command start: %v", c.cmd.Args)
c.finished = make(chan empty, 1) c.finished = make(chan empty, 1)
return c.cmd.Start() return c.cmd.Start()
} }

查看文件

@@ -3,7 +3,6 @@ package worker
import ( import (
"errors" "errors"
"fmt" "fmt"
"io/ioutil"
"strings" "strings"
"time" "time"
@@ -16,6 +15,7 @@ type twoStageRsyncConfig struct {
stage1Profile string stage1Profile string
upstreamURL, username, password, excludeFile string upstreamURL, username, password, excludeFile string
extraOptions []string extraOptions []string
rsyncEnv map[string]string
workingDir, logDir, logFile string workingDir, logDir, logFile string
useIPv6 bool useIPv6 bool
interval time.Duration interval time.Duration
@@ -59,16 +59,25 @@ func newTwoStageRsyncProvider(c twoStageRsyncConfig) (*twoStageRsyncProvider, er
stage1Options: []string{ stage1Options: []string{
"-aHvh", "--no-o", "--no-g", "--stats", "-aHvh", "--no-o", "--no-g", "--stats",
"--exclude", ".~tmp~/", "--exclude", ".~tmp~/",
"--safe-links", "--timeout=120", "--contimeout=120", "--safe-links", "--timeout=120",
}, },
stage2Options: []string{ stage2Options: []string{
"-aHvh", "--no-o", "--no-g", "--stats", "-aHvh", "--no-o", "--no-g", "--stats",
"--exclude", ".~tmp~/", "--exclude", ".~tmp~/",
"--delete", "--delete-after", "--delay-updates", "--delete", "--delete-after", "--delay-updates",
"--safe-links", "--timeout=120", "--contimeout=120", "--safe-links", "--timeout=120",
}, },
} }
if c.rsyncEnv == nil {
provider.rsyncEnv = map[string]string{}
}
if c.username != "" {
provider.rsyncEnv["USER"] = c.username
}
if c.password != "" {
provider.rsyncEnv["RSYNC_PASSWORD"] = c.password
}
if c.rsyncCmd == "" { if c.rsyncCmd == "" {
provider.rsyncCmd = "rsync" provider.rsyncCmd = "rsync"
} }
@@ -106,6 +115,9 @@ func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) {
} else if stage == 2 { } else if stage == 2 {
options = append(options, p.stage2Options...) options = append(options, p.stage2Options...)
if p.extraOptions != nil {
options = append(options, p.extraOptions...)
}
} else { } else {
return []string{}, fmt.Errorf("Invalid stage: %d", stage) return []string{}, fmt.Errorf("Invalid stage: %d", stage)
} }
@@ -117,9 +129,6 @@ func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) {
if p.excludeFile != "" { if p.excludeFile != "" {
options = append(options, "--exclude-from", p.excludeFile) options = append(options, "--exclude-from", p.excludeFile)
} }
if p.extraOptions != nil {
options = append(options, p.extraOptions...)
}
return options, nil return options, nil
} }
@@ -132,14 +141,6 @@ func (p *twoStageRsyncProvider) Run() error {
return errors.New("provider is currently running") return errors.New("provider is currently running")
} }
env := map[string]string{}
if p.username != "" {
env["USER"] = p.username
}
if p.password != "" {
env["RSYNC_PASSWORD"] = p.password
}
p.dataSize = "" p.dataSize = ""
stages := []int{1, 2} stages := []int{1, 2}
for _, stage := range stages { for _, stage := range stages {
@@ -151,7 +152,7 @@ func (p *twoStageRsyncProvider) Run() error {
command = append(command, options...) command = append(command, options...)
command = append(command, p.upstreamURL, p.WorkingDir()) command = append(command, p.upstreamURL, p.WorkingDir())
p.cmd = newCmdJob(p, command, p.WorkingDir(), env) p.cmd = newCmdJob(p, command, p.WorkingDir(), p.rsyncEnv)
if err := p.prepareLogFile(stage > 1); err != nil { if err := p.prepareLogFile(stage > 1); err != nil {
return err return err
} }
@@ -169,8 +170,6 @@ func (p *twoStageRsyncProvider) Run() error {
return err return err
} }
} }
if logContent, err := ioutil.ReadFile(p.LogFile()); err == nil { p.dataSize = internal.ExtractSizeFromRsyncLog(p.LogFile())
p.dataSize = internal.ExtractSizeFromRsyncLog(logContent)
}
return nil return nil
} }