1
0
镜像自地址 https://github.com/tuna/tunasync.git 已同步 2025-12-07 15:06:47 +00:00

19 次代码提交

作者 SHA1 备注 提交日期
Miao Wang
a4d94cae07 bump to version 0.6.7 2020-09-11 18:21:15 +08:00
Miao Wang
8ebace4d9a Add support for multiarch builds 2020-09-11 17:59:33 +08:00
Yuxiang Zhang
b578237df8 Merge pull request #134 from tuna/worker-last-online-register
Worker last online and last register
2020-09-10 23:08:57 +08:00
Jiajie Chen
9f7f18c2c4 Fix missing method in mock test 2020-09-10 21:58:31 +08:00
Jiajie Chen
fd274cc976 Refresh worker LastOnline when worker updates 2020-09-10 21:51:33 +08:00
Jiajie Chen
b4b81ef7e9 Fix typo: registor -> register 2020-09-10 21:32:22 +08:00
Jiajie Chen
c8600d094e Add LastRegister to WorkerStatus 2020-09-10 21:31:31 +08:00
z4yx
2ba3a27fa3 ignore the SIGTERM failure 2020-09-06 19:23:26 +08:00
Yuxiang Zhang
b34238c097 Merge pull request #130 from tuna/add-bolt-open-timeout
Add 5 seconds timeout for bolt
2020-08-05 12:41:29 +08:00
Jiajie Chen
16e458f354 Add 5 seconds timeout for bolt 2020-08-03 14:46:45 +08:00
Yuxiang Zhang
16b4df1ec2 Merge pull request #127 from hxsf/patch-1
fix examlpe with docker_image
2020-06-30 09:47:28 +08:00
呼啸随风
e3c8cded6c fix examlpe with docker_image
If `docker.enable` not be `true`, the worker will ignore docker provider's config, and just exec the command.
so we need to doc it.
2020-06-30 09:45:57 +08:00
Yuxiang Zhang
3809df6cfb Merge pull request #126 from lrh3321/master
Add `--format` and `--status` for tunasynctl
2020-06-22 12:58:15 +08:00
zack.liu
600874ae54 Add --format and --status for tunasynctl 2020-06-22 11:25:18 +08:00
z4yx
2afe1f2e06 bump version to 0.6.6 2020-06-17 22:12:11 +08:00
z4yx
1b099520b2 [manager] protect DB with RW lock 2020-06-17 22:10:39 +08:00
z4yx
85b2105a2b [worker] retry registration 2020-06-17 21:34:55 +08:00
zyx
45e5d900fb bump version to 0.6.5 2020-06-08 22:30:28 +08:00
zyx
7b0cd490b7 fix misuse of a variable 2020-06-08 22:23:12 +08:00
共有 16 个文件被更改,包括 218 次插入54 次删除

查看文件

@@ -21,16 +21,12 @@ jobs:
- name: Check out code into the Go module directory - name: Check out code into the Go module directory
uses: actions/checkout@v2 uses: actions/checkout@v2
- name: Get dependencies
run: |
go get -v -t -d ./cmd/tunasync
go get -v -t -d ./cmd/tunasynctl
- name: Build - name: Build
run: | run: |
make tunasync for i in linux-amd64 linux-arm64; do
make tunasynctl make ARCH=$i all
tar -jcf build/tunasync-linux-bin.tar.bz2 -C build tunasync tunasynctl tar -cz --numeric-owner --owner root -f tunasync-$i-bin.tar.gz -C build-$i tunasync tunasynctl
done
- name: Create Release - name: Create Release
id: create_release id: create_release
@@ -42,13 +38,9 @@ jobs:
release_name: Release ${{ github.ref }} release_name: Release ${{ github.ref }}
draft: false draft: false
prerelease: false prerelease: false
- name: Upload Release Asset - name: Upload Release Assets
id: upload-release-asset
uses: actions/upload-release-asset@v1
env: env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with: TAG_NAME: ${{ github.ref }}
upload_url: ${{ steps.create_release.outputs.upload_url }} # This pulls from the CREATE RELEASE step above, referencing it's ID to get its outputs object, which include a `upload_url`. See this blog post for more info: https://jasonet.co/posts/new-features-of-github-actions/#passing-data-to-future-steps run: |
asset_path: ./build/tunasync-linux-bin.tar.bz2 hub release edit $(find . -type f -name "tunasync-*.tar.gz" -printf "-a %p ") -m "" "${TAG_NAME##*/}"
asset_name: tunasync-linux-bin.tar.bz2
asset_content_type: application/x-bzip2

查看文件

@@ -32,7 +32,7 @@ jobs:
uses: actions/upload-artifact@v1 uses: actions/upload-artifact@v1
with: with:
name: tunasync-bin name: tunasync-bin
path: build/ path: build-linux-amd64/
test: test:
name: Test name: Test

1
.gitignore vendored
查看文件

@@ -1 +1,2 @@
/build /build
/build-*

查看文件

@@ -1,19 +1,22 @@
LDFLAGS="-X main.buildstamp=`date -u '+%s'` -X main.githash=`git rev-parse HEAD`" LDFLAGS="-X main.buildstamp=`date -u '+%s'` -X main.githash=`git rev-parse HEAD`"
ARCH ?= linux-amd64
ARCH_LIST = $(subst -, ,$(ARCH))
GOOS = $(word 1, $(ARCH_LIST))
GOARCH = $(word 2, $(ARCH_LIST))
BUILDBIN = tunasync tunasynctl
all: get tunasync tunasynctl all: $(BUILDBIN)
get: build-$(ARCH):
go get ./cmd/tunasync mkdir -p $@
go get ./cmd/tunasynctl
build: $(BUILDBIN): % : build-$(ARCH) build-$(ARCH)/%
mkdir -p build
tunasync: build $(BUILDBIN:%=build-$(ARCH)/%) : build-$(ARCH)/% : cmd/%
go build -o build/tunasync -ldflags ${LDFLAGS} github.com/tuna/tunasync/cmd/tunasync GOOS=$(GOOS) GOARCH=$(GOARCH) go get ./$<
GOOS=$(GOOS) GOARCH=$(GOARCH) go build -o $@ -ldflags ${LDFLAGS} github.com/tuna/tunasync/$<
tunasynctl: build
go build -o build/tunasynctl -ldflags ${LDFLAGS} github.com/tuna/tunasync/cmd/tunasynctl
test: test:
go test -v -covermode=count -coverprofile=profile.cov ./... go test -v -covermode=count -coverprofile=profile.cov ./...
.PHONY: all test $(BUILDBIN)

查看文件

@@ -8,6 +8,7 @@ import (
"os" "os"
"strconv" "strconv"
"strings" "strings"
"text/template"
"time" "time"
"github.com/BurntSushi/toml" "github.com/BurntSushi/toml"
@@ -160,8 +161,31 @@ func listJobs(c *cli.Context) error {
"of all jobs from manager server: %s", err.Error()), "of all jobs from manager server: %s", err.Error()),
1) 1)
} }
genericJobs = jobs if statusStr := c.String("status"); statusStr != "" {
filteredJobs := make([]tunasync.WebMirrorStatus, 0, len(jobs))
var statuses []tunasync.SyncStatus
for _, s := range strings.Split(statusStr, ",") {
var status tunasync.SyncStatus
err = status.UnmarshalJSON([]byte("\"" + strings.TrimSpace(s) + "\""))
if err != nil {
return cli.NewExitError(
fmt.Sprintf("Error parsing status: %s", err.Error()),
1)
}
statuses = append(statuses, status)
}
for _, job := range jobs {
for _, s := range statuses {
if job.Status == s {
filteredJobs = append(filteredJobs, job)
break
}
}
}
genericJobs = filteredJobs
} else {
genericJobs = jobs
}
} else { } else {
var jobs []tunasync.MirrorStatus var jobs []tunasync.MirrorStatus
args := c.Args() args := c.Args()
@@ -196,13 +220,46 @@ func listJobs(c *cli.Context) error {
genericJobs = jobs genericJobs = jobs
} }
b, err := json.MarshalIndent(genericJobs, "", " ") if format := c.String("format"); format != "" {
if err != nil { tpl := template.New("")
return cli.NewExitError( _, err := tpl.Parse(format)
fmt.Sprintf("Error printing out information: %s", err.Error()), if err != nil {
1) return cli.NewExitError(
fmt.Sprintf("Error parsing format template: %s", err.Error()),
1)
}
switch jobs := genericJobs.(type) {
case []tunasync.WebMirrorStatus:
for _, job := range jobs {
err = tpl.Execute(os.Stdout, job)
if err != nil {
return cli.NewExitError(
fmt.Sprintf("Error printing out information: %s", err.Error()),
1)
}
fmt.Println()
}
case []tunasync.MirrorStatus:
for _, job := range jobs {
err = tpl.Execute(os.Stdout, job)
if err != nil {
return cli.NewExitError(
fmt.Sprintf("Error printing out information: %s", err.Error()),
1)
}
fmt.Println()
}
}
} else {
b, err := json.MarshalIndent(genericJobs, "", " ")
if err != nil {
return cli.NewExitError(
fmt.Sprintf("Error printing out information: %s", err.Error()),
1)
}
fmt.Println(string(b))
} }
fmt.Println(string(b))
return nil return nil
} }
@@ -506,6 +563,14 @@ func main() {
Name: "all, a", Name: "all, a",
Usage: "List all jobs of all workers", Usage: "List all jobs of all workers",
}, },
cli.StringFlag{
Name: "status, s",
Usage: "Filter output based on status provided",
},
cli.StringFlag{
Name: "format, f",
Usage: "Pretty-print containers using a Go template",
},
}...), }...),
Action: initializeWrapper(listJobs), Action: initializeWrapper(listJobs),
}, },

查看文件

@@ -7,6 +7,11 @@ mirror_dir = "/srv/tunasync"
concurrent = 10 concurrent = 10
interval = 1 interval = 1
# ensure the exec user be add into `docker` group
[docker]
# in `command provider` can use docker_image and docker_volumes
enable = true
[manager] [manager]
api_base = "http://localhost:12345" api_base = "http://localhost:12345"
token = "some_token" token = "some_token"

查看文件

@@ -24,10 +24,11 @@ type MirrorStatus struct {
// A WorkerStatus is the information struct that describe // A WorkerStatus is the information struct that describe
// a worker, and sent from the manager to clients. // a worker, and sent from the manager to clients.
type WorkerStatus struct { type WorkerStatus struct {
ID string `json:"id"` ID string `json:"id"`
URL string `json:"url"` // worker url URL string `json:"url"` // worker url
Token string `json:"token"` // session token Token string `json:"token"` // session token
LastOnline time.Time `json:"last_online"` // last seen LastOnline time.Time `json:"last_online"` // last seen
LastRegister time.Time `json:"last_register"` // last register time
} }
type MirrorSchedules struct { type MirrorSchedules struct {

查看文件

@@ -1,4 +1,4 @@
package internal package internal
// Version of the program // Version of the program
const Version string = "0.6.4" const Version string = "0.6.7"

查看文件

@@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"strings" "strings"
"time"
"github.com/boltdb/bolt" "github.com/boltdb/bolt"
@@ -16,6 +17,7 @@ type dbAdapter interface {
GetWorker(workerID string) (WorkerStatus, error) GetWorker(workerID string) (WorkerStatus, error)
DeleteWorker(workerID string) error DeleteWorker(workerID string) error
CreateWorker(w WorkerStatus) (WorkerStatus, error) CreateWorker(w WorkerStatus) (WorkerStatus, error)
RefreshWorker(workerID string) (WorkerStatus, error)
UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error)
GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error) GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error)
ListMirrorStatus(workerID string) ([]MirrorStatus, error) ListMirrorStatus(workerID string) ([]MirrorStatus, error)
@@ -26,7 +28,9 @@ type dbAdapter interface {
func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) { func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) {
if dbType == "bolt" { if dbType == "bolt" {
innerDB, err := bolt.Open(dbFile, 0600, nil) innerDB, err := bolt.Open(dbFile, 0600, &bolt.Options{
Timeout: 5 * time.Second,
})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -122,6 +126,15 @@ func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
return w, err return w, err
} }
func (b *boltAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) {
w, err = b.GetWorker(workerID)
if err == nil {
w.LastOnline = time.Now()
w, err = b.CreateWorker(w)
}
return w, err
}
func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) { func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
id := mirrorID + "/" + workerID id := mirrorID + "/" + workerID
err := b.db.Update(func(tx *bolt.Tx) error { err := b.db.Update(func(tx *bolt.Tx) error {

查看文件

@@ -35,6 +35,7 @@ func TestBoltAdapter(t *testing.T) {
ID: id, ID: id,
Token: "token_" + id, Token: "token_" + id,
LastOnline: time.Now(), LastOnline: time.Now(),
LastRegister: time.Now(),
} }
w, err = boltDB.CreateWorker(w) w, err = boltDB.CreateWorker(w)
So(err, ShouldBeNil) So(err, ShouldBeNil)

查看文件

@@ -4,6 +4,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"net/http" "net/http"
"sync"
"time" "time"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
@@ -23,6 +24,7 @@ type Manager struct {
cfg *Config cfg *Config
engine *gin.Engine engine *gin.Engine
adapter dbAdapter adapter dbAdapter
rwmu sync.RWMutex
httpClient *http.Client httpClient *http.Client
} }
@@ -127,9 +129,11 @@ func (s *Manager) Run() {
} }
} }
// listAllJobs repond with all jobs of specified workers // listAllJobs respond with all jobs of specified workers
func (s *Manager) listAllJobs(c *gin.Context) { func (s *Manager) listAllJobs(c *gin.Context) {
s.rwmu.RLock()
mirrorStatusList, err := s.adapter.ListAllMirrorStatus() mirrorStatusList, err := s.adapter.ListAllMirrorStatus()
s.rwmu.RUnlock()
if err != nil { if err != nil {
err := fmt.Errorf("failed to list all mirror status: %s", err := fmt.Errorf("failed to list all mirror status: %s",
err.Error(), err.Error(),
@@ -150,7 +154,9 @@ func (s *Manager) listAllJobs(c *gin.Context) {
// flushDisabledJobs deletes all jobs that marks as deleted // flushDisabledJobs deletes all jobs that marks as deleted
func (s *Manager) flushDisabledJobs(c *gin.Context) { func (s *Manager) flushDisabledJobs(c *gin.Context) {
s.rwmu.Lock()
err := s.adapter.FlushDisabledJobs() err := s.adapter.FlushDisabledJobs()
s.rwmu.Unlock()
if err != nil { if err != nil {
err := fmt.Errorf("failed to flush disabled jobs: %s", err := fmt.Errorf("failed to flush disabled jobs: %s",
err.Error(), err.Error(),
@@ -165,7 +171,9 @@ func (s *Manager) flushDisabledJobs(c *gin.Context) {
// deleteWorker deletes one worker by id // deleteWorker deletes one worker by id
func (s *Manager) deleteWorker(c *gin.Context) { func (s *Manager) deleteWorker(c *gin.Context) {
workerID := c.Param("id") workerID := c.Param("id")
s.rwmu.Lock()
err := s.adapter.DeleteWorker(workerID) err := s.adapter.DeleteWorker(workerID)
s.rwmu.Unlock()
if err != nil { if err != nil {
err := fmt.Errorf("failed to delete worker: %s", err := fmt.Errorf("failed to delete worker: %s",
err.Error(), err.Error(),
@@ -178,10 +186,12 @@ func (s *Manager) deleteWorker(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{_infoKey: "deleted"}) c.JSON(http.StatusOK, gin.H{_infoKey: "deleted"})
} }
// listWrokers respond with informations of all the workers // listWorkers respond with information of all the workers
func (s *Manager) listWorkers(c *gin.Context) { func (s *Manager) listWorkers(c *gin.Context) {
var workerInfos []WorkerStatus var workerInfos []WorkerStatus
s.rwmu.RLock()
workers, err := s.adapter.ListWorkers() workers, err := s.adapter.ListWorkers()
s.rwmu.RUnlock()
if err != nil { if err != nil {
err := fmt.Errorf("failed to list workers: %s", err := fmt.Errorf("failed to list workers: %s",
err.Error(), err.Error(),
@@ -193,8 +203,9 @@ func (s *Manager) listWorkers(c *gin.Context) {
for _, w := range workers { for _, w := range workers {
workerInfos = append(workerInfos, workerInfos = append(workerInfos,
WorkerStatus{ WorkerStatus{
ID: w.ID, ID: w.ID,
LastOnline: w.LastOnline, LastOnline: w.LastOnline,
LastRegister: w.LastRegister,
}) })
} }
c.JSON(http.StatusOK, workerInfos) c.JSON(http.StatusOK, workerInfos)
@@ -205,6 +216,7 @@ func (s *Manager) registerWorker(c *gin.Context) {
var _worker WorkerStatus var _worker WorkerStatus
c.BindJSON(&_worker) c.BindJSON(&_worker)
_worker.LastOnline = time.Now() _worker.LastOnline = time.Now()
_worker.LastRegister = time.Now()
newWorker, err := s.adapter.CreateWorker(_worker) newWorker, err := s.adapter.CreateWorker(_worker)
if err != nil { if err != nil {
err := fmt.Errorf("failed to register worker: %s", err := fmt.Errorf("failed to register worker: %s",
@@ -223,7 +235,9 @@ func (s *Manager) registerWorker(c *gin.Context) {
// listJobsOfWorker respond with all the jobs of the specified worker // listJobsOfWorker respond with all the jobs of the specified worker
func (s *Manager) listJobsOfWorker(c *gin.Context) { func (s *Manager) listJobsOfWorker(c *gin.Context) {
workerID := c.Param("id") workerID := c.Param("id")
s.rwmu.RLock()
mirrorStatusList, err := s.adapter.ListMirrorStatus(workerID) mirrorStatusList, err := s.adapter.ListMirrorStatus(workerID)
s.rwmu.RUnlock()
if err != nil { if err != nil {
err := fmt.Errorf("failed to list jobs of worker %s: %s", err := fmt.Errorf("failed to list jobs of worker %s: %s",
workerID, err.Error(), workerID, err.Error(),
@@ -255,7 +269,10 @@ func (s *Manager) updateSchedulesOfWorker(c *gin.Context) {
) )
} }
s.rwmu.RLock()
s.adapter.RefreshWorker(workerID)
curStatus, err := s.adapter.GetMirrorStatus(workerID, mirrorName) curStatus, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
s.rwmu.RUnlock()
if err != nil { if err != nil {
fmt.Errorf("failed to get job %s of worker %s: %s", fmt.Errorf("failed to get job %s of worker %s: %s",
mirrorName, workerID, err.Error(), mirrorName, workerID, err.Error(),
@@ -269,7 +286,9 @@ func (s *Manager) updateSchedulesOfWorker(c *gin.Context) {
} }
curStatus.Scheduled = schedule.NextSchedule curStatus.Scheduled = schedule.NextSchedule
s.rwmu.Lock()
_, err = s.adapter.UpdateMirrorStatus(workerID, mirrorName, curStatus) _, err = s.adapter.UpdateMirrorStatus(workerID, mirrorName, curStatus)
s.rwmu.Unlock()
if err != nil { if err != nil {
err := fmt.Errorf("failed to update job %s of worker %s: %s", err := fmt.Errorf("failed to update job %s of worker %s: %s",
mirrorName, workerID, err.Error(), mirrorName, workerID, err.Error(),
@@ -295,7 +314,10 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) {
) )
} }
s.rwmu.RLock()
s.adapter.RefreshWorker(workerID)
curStatus, _ := s.adapter.GetMirrorStatus(workerID, mirrorName) curStatus, _ := s.adapter.GetMirrorStatus(workerID, mirrorName)
s.rwmu.RUnlock()
curTime := time.Now() curTime := time.Now()
@@ -331,7 +353,9 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) {
logger.Noticef("Job [%s] @<%s> %s", status.Name, status.Worker, status.Status) logger.Noticef("Job [%s] @<%s> %s", status.Name, status.Worker, status.Status)
} }
s.rwmu.Lock()
newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status) newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
s.rwmu.Unlock()
if err != nil { if err != nil {
err := fmt.Errorf("failed to update job %s of worker %s: %s", err := fmt.Errorf("failed to update job %s of worker %s: %s",
mirrorName, workerID, err.Error(), mirrorName, workerID, err.Error(),
@@ -353,7 +377,10 @@ func (s *Manager) updateMirrorSize(c *gin.Context) {
c.BindJSON(&msg) c.BindJSON(&msg)
mirrorName := msg.Name mirrorName := msg.Name
s.rwmu.RLock()
s.adapter.RefreshWorker(workerID)
status, err := s.adapter.GetMirrorStatus(workerID, mirrorName) status, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
s.rwmu.RUnlock()
if err != nil { if err != nil {
logger.Errorf( logger.Errorf(
"Failed to get status of mirror %s @<%s>: %s", "Failed to get status of mirror %s @<%s>: %s",
@@ -370,7 +397,9 @@ func (s *Manager) updateMirrorSize(c *gin.Context) {
logger.Noticef("Mirror size of [%s] @<%s>: %s", status.Name, status.Worker, status.Size) logger.Noticef("Mirror size of [%s] @<%s>: %s", status.Name, status.Worker, status.Size)
s.rwmu.Lock()
newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status) newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
s.rwmu.Unlock()
if err != nil { if err != nil {
err := fmt.Errorf("failed to update job %s of worker %s: %s", err := fmt.Errorf("failed to update job %s of worker %s: %s",
mirrorName, workerID, err.Error(), mirrorName, workerID, err.Error(),
@@ -393,7 +422,9 @@ func (s *Manager) handleClientCmd(c *gin.Context) {
return return
} }
s.rwmu.RLock()
w, err := s.adapter.GetWorker(workerID) w, err := s.adapter.GetWorker(workerID)
s.rwmu.RUnlock()
if err != nil { if err != nil {
err := fmt.Errorf("worker %s is not registered yet", workerID) err := fmt.Errorf("worker %s is not registered yet", workerID)
s.returnErrJSON(c, http.StatusBadRequest, err) s.returnErrJSON(c, http.StatusBadRequest, err)
@@ -410,7 +441,9 @@ func (s *Manager) handleClientCmd(c *gin.Context) {
// update job status, even if the job did not disable successfully, // update job status, even if the job did not disable successfully,
// this status should be set as disabled // this status should be set as disabled
s.rwmu.RLock()
curStat, _ := s.adapter.GetMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID) curStat, _ := s.adapter.GetMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID)
s.rwmu.RUnlock()
changed := false changed := false
switch clientCmd.Cmd { switch clientCmd.Cmd {
case CmdDisable: case CmdDisable:
@@ -421,7 +454,9 @@ func (s *Manager) handleClientCmd(c *gin.Context) {
changed = true changed = true
} }
if changed { if changed {
s.rwmu.Lock()
s.adapter.UpdateMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID, curStat) s.adapter.UpdateMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID, curStat)
s.rwmu.Unlock()
} }
logger.Noticef("Posting command '%s %s' to <%s>", clientCmd.Cmd, clientCmd.MirrorID, clientCmd.WorkerID) logger.Noticef("Posting command '%s %s' to <%s>", clientCmd.Cmd, clientCmd.MirrorID, clientCmd.WorkerID)

查看文件

@@ -7,6 +7,7 @@ import (
"math/rand" "math/rand"
"net/http" "net/http"
"strings" "strings"
"sync/atomic"
"testing" "testing"
"time" "time"
@@ -64,6 +65,34 @@ func TestHTTPServer(t *testing.T) {
So(msg[_errorKey], ShouldEqual, fmt.Sprintf("failed to list jobs of worker %s: %s", _magicBadWorkerID, "database fail")) So(msg[_errorKey], ShouldEqual, fmt.Sprintf("failed to list jobs of worker %s: %s", _magicBadWorkerID, "database fail"))
}) })
Convey("when register multiple workers", func(ctx C) {
N := 10
var cnt uint32
for i := 0; i < N; i++ {
go func(id int) {
w := WorkerStatus{
ID: fmt.Sprintf("worker%d", id),
}
resp, err := PostJSON(baseURL+"/workers", w, nil)
ctx.So(err, ShouldBeNil)
ctx.So(resp.StatusCode, ShouldEqual, http.StatusOK)
atomic.AddUint32(&cnt, 1)
}(i)
}
time.Sleep(2 * time.Second)
So(cnt, ShouldEqual, N)
Convey("list all workers", func(ctx C) {
resp, err := http.Get(baseURL + "/workers")
So(err, ShouldBeNil)
defer resp.Body.Close()
var actualResponseObj []WorkerStatus
err = json.NewDecoder(resp.Body).Decode(&actualResponseObj)
So(err, ShouldBeNil)
So(len(actualResponseObj), ShouldEqual, N+1)
})
})
Convey("when register a worker", func(ctx C) { Convey("when register a worker", func(ctx C) {
w := WorkerStatus{ w := WorkerStatus{
ID: "test_worker1", ID: "test_worker1",
@@ -433,6 +462,15 @@ func (b *mockDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
return w, nil return w, nil
} }
func (b *mockDBAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) {
w, err = b.GetWorker(workerID)
if err == nil {
w.LastOnline = time.Now()
w, err = b.CreateWorker(w)
}
return w, err
}
func (b *mockDBAdapter) GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error) { func (b *mockDBAdapter) GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error) {
id := mirrorID + "/" + workerID id := mirrorID + "/" + workerID
status, ok := b.statusStore[id] status, ok := b.statusStore[id]

查看文件

@@ -189,7 +189,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
syncErr = errors.New("killed by manager") syncErr = errors.New("killed by manager")
} }
if termErr != nil { if termErr != nil {
logger.Errorf("failed to terminate provider %s: %s", m.Name(), err.Error()) logger.Errorf("failed to terminate provider %s: %s", m.Name(), termErr.Error())
return termErr return termErr
} }

查看文件

@@ -149,10 +149,10 @@ func (c *cmdJob) Terminate() error {
select { select {
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
unix.Kill(c.cmd.Process.Pid, syscall.SIGKILL) unix.Kill(c.cmd.Process.Pid, syscall.SIGKILL)
return errors.New("SIGTERM failed to kill the job") logger.Warningf("SIGTERM failed to kill the job in 2s. SIGKILL sent")
case <-c.finished: case <-c.finished:
return nil
} }
return nil
} }
// Copied from go-sh // Copied from go-sh

查看文件

@@ -61,7 +61,7 @@ func NewTUNASyncWorker(cfg *Config) *Worker {
// Run runs worker forever // Run runs worker forever
func (w *Worker) Run() { func (w *Worker) Run() {
w.registorWorker() w.registerWorker()
go w.runHTTPServer() go w.runHTTPServer()
w.runSchedule() w.runSchedule()
} }
@@ -393,7 +393,7 @@ func (w *Worker) URL() string {
return fmt.Sprintf("%s://%s:%d/", proto, w.cfg.Server.Hostname, w.cfg.Server.Port) return fmt.Sprintf("%s://%s:%d/", proto, w.cfg.Server.Hostname, w.cfg.Server.Port)
} }
func (w *Worker) registorWorker() { func (w *Worker) registerWorker() {
msg := WorkerStatus{ msg := WorkerStatus{
ID: w.Name(), ID: w.Name(),
URL: w.URL(), URL: w.URL(),
@@ -402,8 +402,17 @@ func (w *Worker) registorWorker() {
for _, root := range w.cfg.Manager.APIBaseList() { for _, root := range w.cfg.Manager.APIBaseList() {
url := fmt.Sprintf("%s/workers", root) url := fmt.Sprintf("%s/workers", root)
logger.Debugf("register on manager url: %s", url) logger.Debugf("register on manager url: %s", url)
if _, err := PostJSON(url, msg, w.httpClient); err != nil { for retry := 10; retry > 0; {
logger.Errorf("Failed to register worker") if _, err := PostJSON(url, msg, w.httpClient); err != nil {
logger.Errorf("Failed to register worker")
retry--
if retry > 0 {
time.Sleep(1 * time.Second)
logger.Noticef("Retrying... (%d)", retry)
}
} else {
break
}
} }
} }
} }

查看文件

@@ -25,6 +25,7 @@ func makeMockManagerServer(recvData chan interface{}) *gin.Engine {
var _worker WorkerStatus var _worker WorkerStatus
c.BindJSON(&_worker) c.BindJSON(&_worker)
_worker.LastOnline = time.Now() _worker.LastOnline = time.Now()
_worker.LastRegister = time.Now()
recvData <- _worker recvData <- _worker
c.JSON(http.StatusOK, _worker) c.JSON(http.StatusOK, _worker)
}) })