Mirrored from https://github.com/tuna/tunasync.git
Synced 2025-12-08 15:36:47 +00:00

Compare commits

13 commits
| Author | SHA1 | Commit date |
|---|---|---|
| | 2afe1f2e06 | |
| | 1b099520b2 | |
| | 85b2105a2b | |
| | 45e5d900fb | |
| | 7b0cd490b7 | |
| | 9178966aed | |
| | b5d2a0ad89 | |
| | d8963c9946 | |
| | 198afa72cd | |
| | 85ce9c1270 | |
| | a8a35fc259 | |
| | c00eb12a75 | |
| | 95ae9c16a9 | |

The combined diff follows, grouped by file; file paths were not captured in this snapshot, so each block is labeled from its hunk context.
Mirror config example (`name = "pypi"` section): switch the docker image to `tunathu/bandersnatch:latest`.

```diff
@@ -486,7 +486,7 @@ name = "pypi"
 provider = "command"
 upstream = "https://pypi.python.org/"
 command = "/home/scripts/pypi.sh"
-docker_image = "tunathu/tunasync-scripts:latest"
+docker_image = "tunathu/bandersnatch:latest"
 interval = 5
 
 [[mirrors]]
```
Version bump in `package internal`:

```diff
@@ -1,4 +1,4 @@
 package internal
 
 // Version of the program
-const Version string = "0.6.3"
+const Version string = "0.6.6"
```
Manager HTTP server: a new `sync.RWMutex` serializes all access to the database adapter, and two comment typos are fixed.

```diff
@@ -4,6 +4,7 @@ import (
 	"errors"
 	"fmt"
 	"net/http"
+	"sync"
 	"time"
 
 	"github.com/gin-gonic/gin"
@@ -23,6 +24,7 @@ type Manager struct {
 	cfg        *Config
 	engine     *gin.Engine
 	adapter    dbAdapter
+	rwmu       sync.RWMutex
 	httpClient *http.Client
 }
 
@@ -127,9 +129,11 @@ func (s *Manager) Run() {
 	}
 }
 
-// listAllJobs repond with all jobs of specified workers
+// listAllJobs respond with all jobs of specified workers
 func (s *Manager) listAllJobs(c *gin.Context) {
+	s.rwmu.RLock()
 	mirrorStatusList, err := s.adapter.ListAllMirrorStatus()
+	s.rwmu.RUnlock()
 	if err != nil {
 		err := fmt.Errorf("failed to list all mirror status: %s",
 			err.Error(),
@@ -150,7 +154,9 @@ func (s *Manager) listAllJobs(c *gin.Context) {
 
 // flushDisabledJobs deletes all jobs that marks as deleted
 func (s *Manager) flushDisabledJobs(c *gin.Context) {
+	s.rwmu.Lock()
 	err := s.adapter.FlushDisabledJobs()
+	s.rwmu.Unlock()
 	if err != nil {
 		err := fmt.Errorf("failed to flush disabled jobs: %s",
 			err.Error(),
@@ -165,7 +171,9 @@ func (s *Manager) flushDisabledJobs(c *gin.Context) {
 // deleteWorker deletes one worker by id
 func (s *Manager) deleteWorker(c *gin.Context) {
 	workerID := c.Param("id")
+	s.rwmu.Lock()
 	err := s.adapter.DeleteWorker(workerID)
+	s.rwmu.Unlock()
 	if err != nil {
 		err := fmt.Errorf("failed to delete worker: %s",
 			err.Error(),
@@ -178,10 +186,12 @@ func (s *Manager) deleteWorker(c *gin.Context) {
 	c.JSON(http.StatusOK, gin.H{_infoKey: "deleted"})
 }
 
-// listWrokers respond with informations of all the workers
+// listWorkers respond with information of all the workers
 func (s *Manager) listWorkers(c *gin.Context) {
 	var workerInfos []WorkerStatus
+	s.rwmu.RLock()
 	workers, err := s.adapter.ListWorkers()
+	s.rwmu.RUnlock()
 	if err != nil {
 		err := fmt.Errorf("failed to list workers: %s",
 			err.Error(),
@@ -223,7 +233,9 @@ func (s *Manager) registerWorker(c *gin.Context) {
 // listJobsOfWorker respond with all the jobs of the specified worker
 func (s *Manager) listJobsOfWorker(c *gin.Context) {
 	workerID := c.Param("id")
+	s.rwmu.RLock()
 	mirrorStatusList, err := s.adapter.ListMirrorStatus(workerID)
+	s.rwmu.RUnlock()
 	if err != nil {
 		err := fmt.Errorf("failed to list jobs of worker %s: %s",
 			workerID, err.Error(),
@@ -255,7 +267,9 @@ func (s *Manager) updateSchedulesOfWorker(c *gin.Context) {
 			)
 		}
 
+		s.rwmu.RLock()
 		curStatus, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
+		s.rwmu.RUnlock()
 		if err != nil {
 			fmt.Errorf("failed to get job %s of worker %s: %s",
 				mirrorName, workerID, err.Error(),
@@ -269,7 +283,9 @@ func (s *Manager) updateSchedulesOfWorker(c *gin.Context) {
 		}
 
 		curStatus.Scheduled = schedule.NextSchedule
+		s.rwmu.Lock()
 		_, err = s.adapter.UpdateMirrorStatus(workerID, mirrorName, curStatus)
+		s.rwmu.Unlock()
 		if err != nil {
 			err := fmt.Errorf("failed to update job %s of worker %s: %s",
 				mirrorName, workerID, err.Error(),
@@ -295,7 +311,9 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) {
 		)
 	}
 
+	s.rwmu.RLock()
 	curStatus, _ := s.adapter.GetMirrorStatus(workerID, mirrorName)
+	s.rwmu.RUnlock()
 
 	curTime := time.Now()
 
@@ -331,7 +349,9 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) {
 		logger.Noticef("Job [%s] @<%s> %s", status.Name, status.Worker, status.Status)
 	}
 
+	s.rwmu.Lock()
 	newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
+	s.rwmu.Unlock()
 	if err != nil {
 		err := fmt.Errorf("failed to update job %s of worker %s: %s",
 			mirrorName, workerID, err.Error(),
@@ -353,7 +373,9 @@ func (s *Manager) updateMirrorSize(c *gin.Context) {
 	c.BindJSON(&msg)
 
 	mirrorName := msg.Name
+	s.rwmu.RLock()
 	status, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
+	s.rwmu.RUnlock()
 	if err != nil {
 		logger.Errorf(
 			"Failed to get status of mirror %s @<%s>: %s",
@@ -370,7 +392,9 @@ func (s *Manager) updateMirrorSize(c *gin.Context) {
 
 	logger.Noticef("Mirror size of [%s] @<%s>: %s", status.Name, status.Worker, status.Size)
 
+	s.rwmu.Lock()
 	newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
+	s.rwmu.Unlock()
 	if err != nil {
 		err := fmt.Errorf("failed to update job %s of worker %s: %s",
 			mirrorName, workerID, err.Error(),
@@ -393,7 +417,9 @@ func (s *Manager) handleClientCmd(c *gin.Context) {
 		return
 	}
 
+	s.rwmu.RLock()
 	w, err := s.adapter.GetWorker(workerID)
+	s.rwmu.RUnlock()
 	if err != nil {
 		err := fmt.Errorf("worker %s is not registered yet", workerID)
 		s.returnErrJSON(c, http.StatusBadRequest, err)
@@ -410,7 +436,9 @@ func (s *Manager) handleClientCmd(c *gin.Context) {
 
 	// update job status, even if the job did not disable successfully,
 	// this status should be set as disabled
+	s.rwmu.RLock()
 	curStat, _ := s.adapter.GetMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID)
+	s.rwmu.RUnlock()
 	changed := false
 	switch clientCmd.Cmd {
 	case CmdDisable:
@@ -421,7 +449,9 @@ func (s *Manager) handleClientCmd(c *gin.Context) {
 		changed = true
 	}
 	if changed {
+		s.rwmu.Lock()
 		s.adapter.UpdateMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID, curStat)
+		s.rwmu.Unlock()
 	}
 
 	logger.Noticef("Posting command '%s %s' to <%s>", clientCmd.Cmd, clientCmd.MirrorID, clientCmd.WorkerID)
```
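The change above applies one pattern throughout: every read of the shared `dbAdapter` is bracketed by `RLock`/`RUnlock` and every write by `Lock`/`Unlock` on a single `sync.RWMutex`. Here is a minimal, self-contained sketch of that pattern; the `store` type and its methods are illustrative stand-ins, not tunasync code:

```go
package main

import (
	"fmt"
	"sync"
)

// store stands in for the manager's dbAdapter: a map that is not safe
// for concurrent use and therefore needs external locking.
type store struct {
	rwmu sync.RWMutex // guards data, like Manager.rwmu in the diff above
	data map[string]string
}

// Get takes the read lock only around the shared access, the way the
// handlers above wrap adapter reads in RLock()/RUnlock().
func (s *store) Get(k string) (string, bool) {
	s.rwmu.RLock()
	v, ok := s.data[k]
	s.rwmu.RUnlock()
	return v, ok
}

// Set takes the write lock, matching the Lock()/Unlock() pairs wrapped
// around UpdateMirrorStatus and friends.
func (s *store) Set(k, v string) {
	s.rwmu.Lock()
	s.data[k] = v
	s.rwmu.Unlock()
}

func main() {
	s := &store{data: map[string]string{}}
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) { // concurrent writers, like parallel HTTP handlers
			defer wg.Done()
			s.Set(fmt.Sprintf("worker%d", i), "ok")
		}(i)
	}
	wg.Wait()
	fmt.Println(len(s.data), "workers registered") // 10 workers registered
}
```

An `RWMutex` (rather than a plain `Mutex`) lets concurrent handlers read status in parallel while still making updates exclusive.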
Manager HTTP server tests (`TestHTTPServer`): a concurrent worker-registration scenario is added.

```diff
@@ -7,6 +7,7 @@ import (
 	"math/rand"
 	"net/http"
 	"strings"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -64,6 +65,34 @@ func TestHTTPServer(t *testing.T) {
 			So(msg[_errorKey], ShouldEqual, fmt.Sprintf("failed to list jobs of worker %s: %s", _magicBadWorkerID, "database fail"))
 		})
 
+		Convey("when register multiple workers", func(ctx C) {
+			N := 10
+			var cnt uint32
+			for i := 0; i < N; i++ {
+				go func(id int) {
+					w := WorkerStatus{
+						ID: fmt.Sprintf("worker%d", id),
+					}
+					resp, err := PostJSON(baseURL+"/workers", w, nil)
+					ctx.So(err, ShouldBeNil)
+					ctx.So(resp.StatusCode, ShouldEqual, http.StatusOK)
+					atomic.AddUint32(&cnt, 1)
+				}(i)
+			}
+			time.Sleep(2 * time.Second)
+			So(cnt, ShouldEqual, N)
+
+			Convey("list all workers", func(ctx C) {
+				resp, err := http.Get(baseURL + "/workers")
+				So(err, ShouldBeNil)
+				defer resp.Body.Close()
+				var actualResponseObj []WorkerStatus
+				err = json.NewDecoder(resp.Body).Decode(&actualResponseObj)
+				So(err, ShouldBeNil)
+				So(len(actualResponseObj), ShouldEqual, N+1)
+			})
+		})
+
 		Convey("when register a worker", func(ctx C) {
 			w := WorkerStatus{
 				ID: "test_worker1",
```
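The new test drives N concurrent registrations and tallies successes with `sync/atomic`. A minimal standalone version of that counting pattern, using a `WaitGroup` instead of the test's fixed `time.Sleep` and a stub in place of the HTTP call, looks like this:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	const N = 10
	var cnt uint32
	var wg sync.WaitGroup
	for i := 0; i < N; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// stand-in for PostJSON(baseURL+"/workers", w, nil)
			register := func(worker string) error { return nil }
			if err := register(fmt.Sprintf("worker%d", id)); err == nil {
				atomic.AddUint32(&cnt, 1) // safe concurrent increment
			}
		}(i)
	}
	wg.Wait() // deterministic, unlike sleeping for a fixed 2 seconds
	fmt.Println(atomic.LoadUint32(&cnt), "of", N, "registered")
}
```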
`package worker` globals file (comment typo fix):

```diff
@@ -1,6 +1,6 @@
 package worker
 
-// put global viables and types here
+// put global variables and types here
 
 import (
 	"gopkg.in/op/go-logging.v1"
```
`mirrorConfig` in the worker: two new per-mirror rsync timeout knobs.

```diff
@@ -142,6 +142,8 @@ type mirrorConfig struct {
 	ExcludeFile   string   `toml:"exclude_file"`
 	Username      string   `toml:"username"`
 	Password      string   `toml:"password"`
+	RsyncNoTimeo  bool     `toml:"rsync_no_timeout"`
+	RsyncTimeout  int      `toml:"rsync_timeout"`
 	RsyncOptions  []string `toml:"rsync_options"`
 	RsyncOverride []string `toml:"rsync_override"`
 	Stage1Profile string   `toml:"stage1_profile"`
```
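The two new keys map straight onto TOML fields via struct tags. A minimal sketch of how such a section decodes, assuming the `github.com/BurntSushi/toml` package (treat the exact decoder choice as an assumption of this sketch):

```go
package main

import (
	"fmt"

	"github.com/BurntSushi/toml"
)

// A trimmed-down mirrorConfig carrying only the two new keys.
type mirrorConfig struct {
	Name         string `toml:"name"`
	RsyncNoTimeo bool   `toml:"rsync_no_timeout"`
	RsyncTimeout int    `toml:"rsync_timeout"`
}

func main() {
	doc := `
name = "debian"
rsync_no_timeout = false
rsync_timeout = 30
`
	var m mirrorConfig
	if _, err := toml.Decode(doc, &m); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", m) // {Name:debian RsyncNoTimeo:false RsyncTimeout:30}
}
```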
Docker hook (`dockerHook`): mount the exclude file read-only into the container, and wait for the old container to disappear before finishing.

```diff
@@ -3,6 +3,9 @@ package worker
 import (
 	"fmt"
 	"os"
+	"time"
+
+	"github.com/codeskyblue/go-sh"
 )
 
 type dockerHook struct {
@@ -16,6 +19,10 @@ func newDockerHook(p mirrorProvider, gCfg dockerConfig, mCfg mirrorConfig) *dock
 	volumes := []string{}
 	volumes = append(volumes, gCfg.Volumes...)
 	volumes = append(volumes, mCfg.DockerVolumes...)
+	if len(mCfg.ExcludeFile) > 0 {
+		arg := fmt.Sprintf("%s:%s:ro", mCfg.ExcludeFile, mCfg.ExcludeFile)
+		volumes = append(volumes, arg)
+	}
 
 	options := []string{}
 	options = append(options, gCfg.Options...)
@@ -60,6 +67,27 @@ func (d *dockerHook) postExec() error {
 	// sh.Command(
 	// 	"docker", "rm", "-f", d.Name(),
 	// ).Run()
+	name := d.Name()
+	retry := 10
+	for ; retry > 0; retry-- {
+		out, err := sh.Command(
+			"docker", "ps", "-a",
+			"--filter", "name=^"+name+"$",
+			"--format", "{{.Status}}",
+		).Output()
+		if err != nil {
+			logger.Errorf("docker ps failed: %v", err)
+			break
+		}
+		if len(out) == 0 {
+			break
+		}
+		logger.Debugf("container %s still exists: '%s'", name, string(out))
+		time.Sleep(1 * time.Second)
+	}
+	if retry == 0 {
+		logger.Warningf("container %s not removed automatically, next sync may fail", name)
+	}
 	d.provider.ExitContext()
 	return nil
 }
```
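The new `postExec` logic polls `docker ps` until the container has actually vanished, since starting the next sync while a container with the same name lingers would fail on a name clash. A standalone sketch of the same poll loop, using `os/exec` instead of go-sh; the helper name and the simplified error handling are mine:

```go
package main

import (
	"fmt"
	"os/exec"
	"time"
)

// waitContainerGone polls `docker ps` until a container named `name`
// disappears or the retry budget runs out, mirroring the postExec
// loop in the diff above.
func waitContainerGone(name string, retries int) bool {
	for ; retries > 0; retries-- {
		out, err := exec.Command(
			"docker", "ps", "-a",
			"--filter", "name=^"+name+"$",
			"--format", "{{.Status}}",
		).Output()
		if err != nil || len(out) == 0 {
			return true // gone (or docker ps itself failed; stop waiting)
		}
		time.Sleep(1 * time.Second)
	}
	return false // still present; the next sync may fail to create it
}

func main() {
	fmt.Println(waitContainerGone("tunasync-job-example", 10))
}
```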
Docker hook test: replace the fixed 8-second sleep with a poll loop, and re-enable the previously commented-out assertions.

```diff
@@ -94,22 +94,27 @@ sleep 20
 		}
 		exitedErr <- err
 	}()
-	cmdRun("ps", []string{"aux"})
 
 	// Wait for docker running
-	time.Sleep(8 * time.Second)
-
-	cmdRun("ps", []string{"aux"})
+	for wait := 0; wait < 8; wait++ {
+		names, err := getDockerByName(d.Name())
+		So(err, ShouldBeNil)
+		if names != "" {
+			break
+		}
+		time.Sleep(1 * time.Second)
+	}
+	// cmdRun("ps", []string{"aux"})
 
 	// assert container running
 	names, err := getDockerByName(d.Name())
 	So(err, ShouldBeNil)
-	// So(names, ShouldEqual, d.Name()+"\n")
+	So(names, ShouldEqual, d.Name()+"\n")
 
 	err = provider.Terminate()
-	// So(err, ShouldBeNil)
+	So(err, ShouldBeNil)
 
-	cmdRun("ps", []string{"aux"})
+	// cmdRun("ps", []string{"aux"})
 	<-exitedErr
 
 	// container should be terminated and removed
```
`mirrorJob.Run`: drop a stray `stopASAP` assignment in the timeout branch, and log the right error variable when termination fails.

```diff
@@ -180,7 +180,6 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
 			logger.Debug("syncing done")
 		case <-time.After(timeout):
 			logger.Notice("provider timeout")
-			stopASAP = true
 			termErr = provider.Terminate()
 			syncErr = fmt.Errorf("%s timeout after %v", m.Name(), timeout)
 		case <-kill:
@@ -190,7 +189,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
 			syncErr = errors.New("killed by manager")
 		}
 		if termErr != nil {
-			logger.Errorf("failed to terminate provider %s: %s", m.Name(), err.Error())
+			logger.Errorf("failed to terminate provider %s: %s", m.Name(), termErr.Error())
 			return termErr
 		}
 
```
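The second hunk is a one-word fix with real consequences: the log line referenced `err` from an enclosing scope rather than `termErr`, so the message could report an unrelated error (or, depending on the surrounding code, panic on `err.Error()` if that `err` were nil). A contrived illustration of the pitfall:

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	err := errors.New("sync failed earlier")       // outer error from a previous step
	termErr := errors.New("terminate: no process") // the error actually being handled

	if termErr != nil {
		// Bug pattern fixed above: logging the wrong variable.
		fmt.Printf("wrong:   failed to terminate: %s\n", err.Error())
		// Corrected form, matching the diff:
		fmt.Printf("correct: failed to terminate: %s\n", termErr.Error())
	}
}
```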
Mirror job tests: a retry scenario is added for timed-out jobs.

```diff
@@ -335,7 +335,6 @@ echo $TUNASYNC_WORKING_DIR
 		})
 	})
 
-
 	Convey("When a job timed out", func(ctx C) {
 		scriptContent := `#!/bin/bash
 echo $TUNASYNC_WORKING_DIR
@@ -371,6 +370,30 @@ echo $TUNASYNC_WORKING_DIR
 			job.ctrlChan <- jobDisable
 			<-job.disabled
 		})
+
+		Convey("It should be retried", func(ctx C) {
+			go job.Run(managerChan, semaphore)
+			job.ctrlChan <- jobStart
+			time.Sleep(1 * time.Second)
+			msg := <-managerChan
+			So(msg.status, ShouldEqual, PreSyncing)
+
+			for i := 0; i < defaultMaxRetry; i++ {
+				msg = <-managerChan
+				So(msg.status, ShouldEqual, Syncing)
+
+				job.ctrlChan <- jobStart // should be ignored
+
+				msg = <-managerChan
+				So(msg.status, ShouldEqual, Failed)
+				So(msg.msg, ShouldContainSubstring, "timeout after")
+				// re-schedule after last try
+				So(msg.schedule, ShouldEqual, i == defaultMaxRetry-1)
+			}
+
+			job.ctrlChan <- jobDisable
+			<-job.disabled
+		})
 	})
 })
 
```
`newMirrorProvider`: pass the new timeout settings through to both rsync providers.

```diff
@@ -140,6 +140,8 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
 			password:          mirror.Password,
 			excludeFile:       mirror.ExcludeFile,
 			extraOptions:      mirror.RsyncOptions,
+			rsyncNeverTimeout: mirror.RsyncNoTimeo,
+			rsyncTimeoutValue: mirror.RsyncTimeout,
 			overriddenOptions: mirror.RsyncOverride,
 			rsyncEnv:          mirror.Env,
 			workingDir:        mirrorDir,
@@ -167,6 +169,8 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
 			password:          mirror.Password,
 			excludeFile:       mirror.ExcludeFile,
 			extraOptions:      mirror.RsyncOptions,
+			rsyncNeverTimeout: mirror.RsyncNoTimeo,
+			rsyncTimeoutValue: mirror.RsyncTimeout,
 			rsyncEnv:          mirror.Env,
 			workingDir:        mirrorDir,
 			logDir:            logDir,
```
Provider tests: cover the configurable rsync timeout and add a full rsync-in-Docker test.

```diff
@@ -155,6 +155,7 @@ func TestRsyncProviderWithAuthentication(t *testing.T) {
 		password:          "tunasyncpassword",
 		workingDir:        tmpDir,
 		extraOptions:      []string{"--delete-excluded"},
+		rsyncTimeoutValue: 30,
 		rsyncEnv:          map[string]string{"RSYNC_PROXY": proxyAddr},
 		logDir:            tmpDir,
 		logFile:           tmpFile,
@@ -191,7 +192,7 @@ exit 0
 			fmt.Sprintf(
 				"%s %s %s -aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
 					"--delete --delete-after --delay-updates --safe-links "+
-					"--timeout=120 -4 --delete-excluded %s %s",
+					"--timeout=30 -4 --delete-excluded %s %s",
 				provider.username, provider.password, proxyAddr,
 				provider.upstreamURL, provider.WorkingDir(),
 			),
@@ -221,6 +222,7 @@ func TestRsyncProviderWithOverriddenOptions(t *testing.T) {
 		upstreamURL:       "rsync://rsync.tuna.moe/tuna/",
 		rsyncCmd:          scriptFile,
 		workingDir:        tmpDir,
+		rsyncNeverTimeout: true,
 		overriddenOptions: []string{"-aHvh", "--no-o", "--no-g", "--stats"},
 		extraOptions:      []string{"--delete-excluded"},
 		logDir:            tmpDir,
@@ -270,6 +272,78 @@ exit 0
 	})
 }
 
+func TestRsyncProviderWithDocker(t *testing.T) {
+	Convey("Rsync in Docker should work", t, func() {
+		tmpDir, err := ioutil.TempDir("", "tunasync")
+		defer os.RemoveAll(tmpDir)
+		So(err, ShouldBeNil)
+		scriptFile := filepath.Join(tmpDir, "myrsync")
+		excludeFile := filepath.Join(tmpDir, "exclude.txt")
+
+		g := &Config{
+			Global: globalConfig{
+				Retry: 2,
+			},
+			Docker: dockerConfig{
+				Enable: true,
+				Volumes: []string{
+					scriptFile + ":/bin/myrsync",
+					"/etc/gai.conf:/etc/gai.conf:ro",
+				},
+			},
+		}
+		c := mirrorConfig{
+			Name:        "tuna",
+			Provider:    provRsync,
+			Upstream:    "rsync://rsync.tuna.moe/tuna/",
+			Command:     "/bin/myrsync",
+			ExcludeFile: excludeFile,
+			DockerImage: "alpine:3.8",
+			LogDir:      tmpDir,
+			MirrorDir:   tmpDir,
+			UseIPv6:     true,
+			Timeout:     100,
+			Interval:    600,
+		}
+
+		provider := newMirrorProvider(c, g)
+
+		So(provider.Type(), ShouldEqual, provRsync)
+		So(provider.Name(), ShouldEqual, c.Name)
+		So(provider.WorkingDir(), ShouldEqual, c.MirrorDir)
+		So(provider.LogDir(), ShouldEqual, c.LogDir)
+
+		cmdScriptContent := `#!/bin/sh
+#echo "$@"
+while [[ $# -gt 0 ]]; do
+if [[ "$1" = "--exclude-from" ]]; then
+cat "$2"
+shift
+fi
+shift
+done
+`
+		err = ioutil.WriteFile(scriptFile, []byte(cmdScriptContent), 0755)
+		So(err, ShouldBeNil)
+		err = ioutil.WriteFile(excludeFile, []byte("__some_pattern"), 0755)
+		So(err, ShouldBeNil)
+
+		for _, hook := range provider.Hooks() {
+			err = hook.preExec()
+			So(err, ShouldBeNil)
+		}
+		err = provider.Run(make(chan empty, 1))
+		So(err, ShouldBeNil)
+		for _, hook := range provider.Hooks() {
+			err = hook.postExec()
+			So(err, ShouldBeNil)
+		}
+		loggedContent, err := ioutil.ReadFile(provider.LogFile())
+		So(err, ShouldBeNil)
+		So(string(loggedContent), ShouldEqual, "__some_pattern")
+	})
+}
+
 func TestCmdProvider(t *testing.T) {
 	Convey("Command Provider should work", t, func(ctx C) {
 		tmpDir, err := ioutil.TempDir("", "tunasync")
@@ -499,6 +573,7 @@ func TestTwoStageRsyncProvider(t *testing.T) {
 		logFile:           tmpFile,
 		useIPv6:           true,
 		excludeFile:       tmpFile,
+		rsyncTimeoutValue: 30,
 		extraOptions:      []string{"--delete-excluded", "--cache"},
 		username:          "hello",
 		password:          "world",
@@ -539,7 +614,7 @@ exit 0
 			targetDir,
 			fmt.Sprintf(
 				"-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+
-					"--timeout=120 --exclude dists/ -6 "+
+					"--exclude dists/ --timeout=30 -6 "+
 					"--exclude-from %s %s %s",
 				provider.excludeFile, provider.upstreamURL, provider.WorkingDir(),
 			),
@@ -547,7 +622,7 @@ exit 0
 			fmt.Sprintf(
 				"-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
 					"--delete --delete-after --delay-updates --safe-links "+
-					"--timeout=120 --delete-excluded --cache -6 --exclude-from %s %s %s",
+					"--delete-excluded --cache --timeout=30 -6 --exclude-from %s %s %s",
 				provider.excludeFile, provider.upstreamURL, provider.WorkingDir(),
 			),
 		)
@@ -581,7 +656,7 @@ exit 0
 
 		expectedOutput := fmt.Sprintf(
 			"-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+
-				"--timeout=120 --exclude dists/ -6 "+
+				"--exclude dists/ --timeout=30 -6 "+
 				"--exclude-from %s %s %s\n",
 			provider.excludeFile, provider.upstreamURL, provider.WorkingDir(),
 		)
```
`rsyncConfig` / `newRsyncProvider`: make the `--timeout` option configurable instead of hard-coded.

```diff
@@ -2,6 +2,7 @@ package worker
 
 import (
 	"errors"
+	"fmt"
 	"strings"
 	"time"
 
@@ -14,6 +15,8 @@ type rsyncConfig struct {
 	upstreamURL, username, password, excludeFile string
 	extraOptions                                 []string
 	overriddenOptions                            []string
+	rsyncNeverTimeout                            bool
+	rsyncTimeoutValue                            int
 	rsyncEnv                                     map[string]string
 	workingDir, logDir, logFile                  string
 	useIPv6, useIPv4                             bool
@@ -66,12 +69,20 @@ func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
 		"-aHvh", "--no-o", "--no-g", "--stats",
 		"--exclude", ".~tmp~/",
 		"--delete", "--delete-after", "--delay-updates",
-		"--safe-links", "--timeout=120",
+		"--safe-links",
 	}
 	if c.overriddenOptions != nil {
 		options = c.overriddenOptions
 	}
 
+	if !c.rsyncNeverTimeout {
+		timeo := 120
+		if c.rsyncTimeoutValue > 0 {
+			timeo = c.rsyncTimeoutValue
+		}
+		options = append(options, fmt.Sprintf("--timeout=%d", timeo))
+	}
+
 	if c.useIPv6 {
 		options = append(options, "-6")
 	} else if c.useIPv4 {
```
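Both providers now share the same timeout rule: no flag at all when `rsync_no_timeout` is set, otherwise `--timeout=<rsync_timeout>` with a default of 120 seconds. A tiny extraction of that rule (the function name is mine, for illustration):

```go
package main

import "fmt"

// buildTimeoutOption reproduces the logic added above: no flag when
// timeouts are disabled, a default of 120s otherwise, overridable by
// a positive per-mirror value.
func buildTimeoutOption(neverTimeout bool, timeoutValue int) []string {
	if neverTimeout {
		return nil
	}
	timeo := 120
	if timeoutValue > 0 {
		timeo = timeoutValue
	}
	return []string{fmt.Sprintf("--timeout=%d", timeo)}
}

func main() {
	fmt.Println(buildTimeoutOption(true, 0))   // []
	fmt.Println(buildTimeoutOption(false, 0))  // [--timeout=120]
	fmt.Println(buildTimeoutOption(false, 30)) // [--timeout=30]
}
```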
`twoStageRsyncConfig` / `newTwoStageRsyncProvider` / `Options`: the same configurable timeout, appended in `Options()` for both stages.

```diff
@@ -15,6 +15,8 @@ type twoStageRsyncConfig struct {
 	stage1Profile                                string
 	upstreamURL, username, password, excludeFile string
 	extraOptions                                 []string
+	rsyncNeverTimeout                            bool
+	rsyncTimeoutValue                            int
 	rsyncEnv                                     map[string]string
 	workingDir, logDir, logFile                  string
 	useIPv6                                      bool
@@ -61,13 +63,13 @@ func newTwoStageRsyncProvider(c twoStageRsyncConfig) (*twoStageRsyncProvider, er
 		stage1Options: []string{
 			"-aHvh", "--no-o", "--no-g", "--stats",
 			"--exclude", ".~tmp~/",
-			"--safe-links", "--timeout=120",
+			"--safe-links",
 		},
 		stage2Options: []string{
 			"-aHvh", "--no-o", "--no-g", "--stats",
 			"--exclude", ".~tmp~/",
 			"--delete", "--delete-after", "--delay-updates",
-			"--safe-links", "--timeout=120",
+			"--safe-links",
 		},
 	}
 
@@ -124,6 +126,14 @@ func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) {
 		return []string{}, fmt.Errorf("Invalid stage: %d", stage)
 	}
 
+	if !p.rsyncNeverTimeout {
+		timeo := 120
+		if p.rsyncTimeoutValue > 0 {
+			timeo = p.rsyncTimeoutValue
+		}
+		options = append(options, fmt.Sprintf("--timeout=%d", timeo))
+	}
+
 	if p.useIPv6 {
 		options = append(options, "-6")
 	}
```
`Worker.registorWorker`: retry registration up to 10 times before giving up on a manager endpoint.

```diff
@@ -402,8 +402,17 @@ func (w *Worker) registorWorker() {
 	for _, root := range w.cfg.Manager.APIBaseList() {
 		url := fmt.Sprintf("%s/workers", root)
 		logger.Debugf("register on manager url: %s", url)
+		for retry := 10; retry > 0; {
 			if _, err := PostJSON(url, msg, w.httpClient); err != nil {
 				logger.Errorf("Failed to register worker")
+				retry--
+				if retry > 0 {
+					time.Sleep(1 * time.Second)
+					logger.Noticef("Retrying... (%d)", retry)
+				}
+			} else {
+				break
+			}
 		}
 	}
 }
```
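The registration loop now retries each manager endpoint up to 10 times, sleeping a second between failed attempts. The same shape in isolation; `registerWithRetry` and `post` are illustrative stand-ins for the worker's `PostJSON` call:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// registerWithRetry mirrors the loop added above: up to 10 attempts,
// sleeping one second between failures, stopping early on success.
func registerWithRetry(post func() error) error {
	var err error
	for retry := 10; retry > 0; retry-- {
		if err = post(); err == nil {
			return nil
		}
		if retry > 1 {
			time.Sleep(1 * time.Second)
			fmt.Printf("Retrying... (%d)\n", retry-1)
		}
	}
	return err
}

func main() {
	attempts := 0
	err := registerWithRetry(func() error {
		attempts++
		if attempts < 3 {
			return errors.New("manager not up yet")
		}
		return nil
	})
	fmt.Println("attempts:", attempts, "err:", err) // attempts: 3 err: <nil>
}
```

Retrying here matters because the worker and manager may start in either order; a single failed POST would otherwise leave the worker unregistered until restart.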