镜像自地址
https://github.com/tuna/tunasync.git
已同步 2025-12-07 15:06:47 +00:00
比较提交
19 次代码提交
| 作者 | SHA1 | 提交日期 | |
|---|---|---|---|
|
|
a4d94cae07 | ||
|
|
8ebace4d9a | ||
|
|
b578237df8 | ||
|
|
9f7f18c2c4 | ||
|
|
fd274cc976 | ||
|
|
b4b81ef7e9 | ||
|
|
c8600d094e | ||
|
|
2ba3a27fa3 | ||
|
|
b34238c097 | ||
|
|
16e458f354 | ||
|
|
16b4df1ec2 | ||
|
|
e3c8cded6c | ||
|
|
3809df6cfb | ||
|
|
600874ae54 | ||
|
|
2afe1f2e06 | ||
|
|
1b099520b2 | ||
|
|
85b2105a2b | ||
|
|
45e5d900fb | ||
|
|
7b0cd490b7 |
26
.github/workflows/release.yml
vendored
26
.github/workflows/release.yml
vendored
@@ -21,16 +21,12 @@ jobs:
|
|||||||
- name: Check out code into the Go module directory
|
- name: Check out code into the Go module directory
|
||||||
uses: actions/checkout@v2
|
uses: actions/checkout@v2
|
||||||
|
|
||||||
- name: Get dependencies
|
|
||||||
run: |
|
|
||||||
go get -v -t -d ./cmd/tunasync
|
|
||||||
go get -v -t -d ./cmd/tunasynctl
|
|
||||||
|
|
||||||
- name: Build
|
- name: Build
|
||||||
run: |
|
run: |
|
||||||
make tunasync
|
for i in linux-amd64 linux-arm64; do
|
||||||
make tunasynctl
|
make ARCH=$i all
|
||||||
tar -jcf build/tunasync-linux-bin.tar.bz2 -C build tunasync tunasynctl
|
tar -cz --numeric-owner --owner root -f tunasync-$i-bin.tar.gz -C build-$i tunasync tunasynctl
|
||||||
|
done
|
||||||
|
|
||||||
- name: Create Release
|
- name: Create Release
|
||||||
id: create_release
|
id: create_release
|
||||||
@@ -42,13 +38,9 @@ jobs:
|
|||||||
release_name: Release ${{ github.ref }}
|
release_name: Release ${{ github.ref }}
|
||||||
draft: false
|
draft: false
|
||||||
prerelease: false
|
prerelease: false
|
||||||
- name: Upload Release Asset
|
- name: Upload Release Assets
|
||||||
id: upload-release-asset
|
|
||||||
uses: actions/upload-release-asset@v1
|
|
||||||
env:
|
env:
|
||||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||||
with:
|
TAG_NAME: ${{ github.ref }}
|
||||||
upload_url: ${{ steps.create_release.outputs.upload_url }} # This pulls from the CREATE RELEASE step above, referencing it's ID to get its outputs object, which include a `upload_url`. See this blog post for more info: https://jasonet.co/posts/new-features-of-github-actions/#passing-data-to-future-steps
|
run: |
|
||||||
asset_path: ./build/tunasync-linux-bin.tar.bz2
|
hub release edit $(find . -type f -name "tunasync-*.tar.gz" -printf "-a %p ") -m "" "${TAG_NAME##*/}"
|
||||||
asset_name: tunasync-linux-bin.tar.bz2
|
|
||||||
asset_content_type: application/x-bzip2
|
|
||||||
|
|||||||
2
.github/workflows/tunasync.yml
vendored
2
.github/workflows/tunasync.yml
vendored
@@ -32,7 +32,7 @@ jobs:
|
|||||||
uses: actions/upload-artifact@v1
|
uses: actions/upload-artifact@v1
|
||||||
with:
|
with:
|
||||||
name: tunasync-bin
|
name: tunasync-bin
|
||||||
path: build/
|
path: build-linux-amd64/
|
||||||
|
|
||||||
test:
|
test:
|
||||||
name: Test
|
name: Test
|
||||||
|
|||||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1 +1,2 @@
|
|||||||
/build
|
/build
|
||||||
|
/build-*
|
||||||
|
|||||||
25
Makefile
25
Makefile
@@ -1,19 +1,22 @@
|
|||||||
LDFLAGS="-X main.buildstamp=`date -u '+%s'` -X main.githash=`git rev-parse HEAD`"
|
LDFLAGS="-X main.buildstamp=`date -u '+%s'` -X main.githash=`git rev-parse HEAD`"
|
||||||
|
ARCH ?= linux-amd64
|
||||||
|
ARCH_LIST = $(subst -, ,$(ARCH))
|
||||||
|
GOOS = $(word 1, $(ARCH_LIST))
|
||||||
|
GOARCH = $(word 2, $(ARCH_LIST))
|
||||||
|
BUILDBIN = tunasync tunasynctl
|
||||||
|
|
||||||
all: get tunasync tunasynctl
|
all: $(BUILDBIN)
|
||||||
|
|
||||||
get:
|
build-$(ARCH):
|
||||||
go get ./cmd/tunasync
|
mkdir -p $@
|
||||||
go get ./cmd/tunasynctl
|
|
||||||
|
|
||||||
build:
|
$(BUILDBIN): % : build-$(ARCH) build-$(ARCH)/%
|
||||||
mkdir -p build
|
|
||||||
|
|
||||||
tunasync: build
|
$(BUILDBIN:%=build-$(ARCH)/%) : build-$(ARCH)/% : cmd/%
|
||||||
go build -o build/tunasync -ldflags ${LDFLAGS} github.com/tuna/tunasync/cmd/tunasync
|
GOOS=$(GOOS) GOARCH=$(GOARCH) go get ./$<
|
||||||
|
GOOS=$(GOOS) GOARCH=$(GOARCH) go build -o $@ -ldflags ${LDFLAGS} github.com/tuna/tunasync/$<
|
||||||
tunasynctl: build
|
|
||||||
go build -o build/tunasynctl -ldflags ${LDFLAGS} github.com/tuna/tunasync/cmd/tunasynctl
|
|
||||||
|
|
||||||
test:
|
test:
|
||||||
go test -v -covermode=count -coverprofile=profile.cov ./...
|
go test -v -covermode=count -coverprofile=profile.cov ./...
|
||||||
|
|
||||||
|
.PHONY: all test $(BUILDBIN)
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"text/template"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/BurntSushi/toml"
|
"github.com/BurntSushi/toml"
|
||||||
@@ -160,8 +161,31 @@ func listJobs(c *cli.Context) error {
|
|||||||
"of all jobs from manager server: %s", err.Error()),
|
"of all jobs from manager server: %s", err.Error()),
|
||||||
1)
|
1)
|
||||||
}
|
}
|
||||||
genericJobs = jobs
|
if statusStr := c.String("status"); statusStr != "" {
|
||||||
|
filteredJobs := make([]tunasync.WebMirrorStatus, 0, len(jobs))
|
||||||
|
var statuses []tunasync.SyncStatus
|
||||||
|
for _, s := range strings.Split(statusStr, ",") {
|
||||||
|
var status tunasync.SyncStatus
|
||||||
|
err = status.UnmarshalJSON([]byte("\"" + strings.TrimSpace(s) + "\""))
|
||||||
|
if err != nil {
|
||||||
|
return cli.NewExitError(
|
||||||
|
fmt.Sprintf("Error parsing status: %s", err.Error()),
|
||||||
|
1)
|
||||||
|
}
|
||||||
|
statuses = append(statuses, status)
|
||||||
|
}
|
||||||
|
for _, job := range jobs {
|
||||||
|
for _, s := range statuses {
|
||||||
|
if job.Status == s {
|
||||||
|
filteredJobs = append(filteredJobs, job)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
genericJobs = filteredJobs
|
||||||
|
} else {
|
||||||
|
genericJobs = jobs
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
var jobs []tunasync.MirrorStatus
|
var jobs []tunasync.MirrorStatus
|
||||||
args := c.Args()
|
args := c.Args()
|
||||||
@@ -196,13 +220,46 @@ func listJobs(c *cli.Context) error {
|
|||||||
genericJobs = jobs
|
genericJobs = jobs
|
||||||
}
|
}
|
||||||
|
|
||||||
b, err := json.MarshalIndent(genericJobs, "", " ")
|
if format := c.String("format"); format != "" {
|
||||||
if err != nil {
|
tpl := template.New("")
|
||||||
return cli.NewExitError(
|
_, err := tpl.Parse(format)
|
||||||
fmt.Sprintf("Error printing out information: %s", err.Error()),
|
if err != nil {
|
||||||
1)
|
return cli.NewExitError(
|
||||||
|
fmt.Sprintf("Error parsing format template: %s", err.Error()),
|
||||||
|
1)
|
||||||
|
}
|
||||||
|
switch jobs := genericJobs.(type) {
|
||||||
|
case []tunasync.WebMirrorStatus:
|
||||||
|
for _, job := range jobs {
|
||||||
|
err = tpl.Execute(os.Stdout, job)
|
||||||
|
if err != nil {
|
||||||
|
return cli.NewExitError(
|
||||||
|
fmt.Sprintf("Error printing out information: %s", err.Error()),
|
||||||
|
1)
|
||||||
|
}
|
||||||
|
fmt.Println()
|
||||||
|
}
|
||||||
|
case []tunasync.MirrorStatus:
|
||||||
|
for _, job := range jobs {
|
||||||
|
err = tpl.Execute(os.Stdout, job)
|
||||||
|
if err != nil {
|
||||||
|
return cli.NewExitError(
|
||||||
|
fmt.Sprintf("Error printing out information: %s", err.Error()),
|
||||||
|
1)
|
||||||
|
}
|
||||||
|
fmt.Println()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
b, err := json.MarshalIndent(genericJobs, "", " ")
|
||||||
|
if err != nil {
|
||||||
|
return cli.NewExitError(
|
||||||
|
fmt.Sprintf("Error printing out information: %s", err.Error()),
|
||||||
|
1)
|
||||||
|
}
|
||||||
|
fmt.Println(string(b))
|
||||||
}
|
}
|
||||||
fmt.Println(string(b))
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -506,6 +563,14 @@ func main() {
|
|||||||
Name: "all, a",
|
Name: "all, a",
|
||||||
Usage: "List all jobs of all workers",
|
Usage: "List all jobs of all workers",
|
||||||
},
|
},
|
||||||
|
cli.StringFlag{
|
||||||
|
Name: "status, s",
|
||||||
|
Usage: "Filter output based on status provided",
|
||||||
|
},
|
||||||
|
cli.StringFlag{
|
||||||
|
Name: "format, f",
|
||||||
|
Usage: "Pretty-print containers using a Go template",
|
||||||
|
},
|
||||||
}...),
|
}...),
|
||||||
Action: initializeWrapper(listJobs),
|
Action: initializeWrapper(listJobs),
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -7,6 +7,11 @@ mirror_dir = "/srv/tunasync"
|
|||||||
concurrent = 10
|
concurrent = 10
|
||||||
interval = 1
|
interval = 1
|
||||||
|
|
||||||
|
# ensure the exec user be add into `docker` group
|
||||||
|
[docker]
|
||||||
|
# in `command provider` can use docker_image and docker_volumes
|
||||||
|
enable = true
|
||||||
|
|
||||||
[manager]
|
[manager]
|
||||||
api_base = "http://localhost:12345"
|
api_base = "http://localhost:12345"
|
||||||
token = "some_token"
|
token = "some_token"
|
||||||
|
|||||||
@@ -24,10 +24,11 @@ type MirrorStatus struct {
|
|||||||
// A WorkerStatus is the information struct that describe
|
// A WorkerStatus is the information struct that describe
|
||||||
// a worker, and sent from the manager to clients.
|
// a worker, and sent from the manager to clients.
|
||||||
type WorkerStatus struct {
|
type WorkerStatus struct {
|
||||||
ID string `json:"id"`
|
ID string `json:"id"`
|
||||||
URL string `json:"url"` // worker url
|
URL string `json:"url"` // worker url
|
||||||
Token string `json:"token"` // session token
|
Token string `json:"token"` // session token
|
||||||
LastOnline time.Time `json:"last_online"` // last seen
|
LastOnline time.Time `json:"last_online"` // last seen
|
||||||
|
LastRegister time.Time `json:"last_register"` // last register time
|
||||||
}
|
}
|
||||||
|
|
||||||
type MirrorSchedules struct {
|
type MirrorSchedules struct {
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
package internal
|
package internal
|
||||||
|
|
||||||
// Version of the program
|
// Version of the program
|
||||||
const Version string = "0.6.4"
|
const Version string = "0.6.7"
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/boltdb/bolt"
|
"github.com/boltdb/bolt"
|
||||||
|
|
||||||
@@ -16,6 +17,7 @@ type dbAdapter interface {
|
|||||||
GetWorker(workerID string) (WorkerStatus, error)
|
GetWorker(workerID string) (WorkerStatus, error)
|
||||||
DeleteWorker(workerID string) error
|
DeleteWorker(workerID string) error
|
||||||
CreateWorker(w WorkerStatus) (WorkerStatus, error)
|
CreateWorker(w WorkerStatus) (WorkerStatus, error)
|
||||||
|
RefreshWorker(workerID string) (WorkerStatus, error)
|
||||||
UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error)
|
UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error)
|
||||||
GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error)
|
GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error)
|
||||||
ListMirrorStatus(workerID string) ([]MirrorStatus, error)
|
ListMirrorStatus(workerID string) ([]MirrorStatus, error)
|
||||||
@@ -26,7 +28,9 @@ type dbAdapter interface {
|
|||||||
|
|
||||||
func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) {
|
func makeDBAdapter(dbType string, dbFile string) (dbAdapter, error) {
|
||||||
if dbType == "bolt" {
|
if dbType == "bolt" {
|
||||||
innerDB, err := bolt.Open(dbFile, 0600, nil)
|
innerDB, err := bolt.Open(dbFile, 0600, &bolt.Options{
|
||||||
|
Timeout: 5 * time.Second,
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -122,6 +126,15 @@ func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
|
|||||||
return w, err
|
return w, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *boltAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) {
|
||||||
|
w, err = b.GetWorker(workerID)
|
||||||
|
if err == nil {
|
||||||
|
w.LastOnline = time.Now()
|
||||||
|
w, err = b.CreateWorker(w)
|
||||||
|
}
|
||||||
|
return w, err
|
||||||
|
}
|
||||||
|
|
||||||
func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
|
func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
|
||||||
id := mirrorID + "/" + workerID
|
id := mirrorID + "/" + workerID
|
||||||
err := b.db.Update(func(tx *bolt.Tx) error {
|
err := b.db.Update(func(tx *bolt.Tx) error {
|
||||||
|
|||||||
@@ -35,6 +35,7 @@ func TestBoltAdapter(t *testing.T) {
|
|||||||
ID: id,
|
ID: id,
|
||||||
Token: "token_" + id,
|
Token: "token_" + id,
|
||||||
LastOnline: time.Now(),
|
LastOnline: time.Now(),
|
||||||
|
LastRegister: time.Now(),
|
||||||
}
|
}
|
||||||
w, err = boltDB.CreateWorker(w)
|
w, err = boltDB.CreateWorker(w)
|
||||||
So(err, ShouldBeNil)
|
So(err, ShouldBeNil)
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
@@ -23,6 +24,7 @@ type Manager struct {
|
|||||||
cfg *Config
|
cfg *Config
|
||||||
engine *gin.Engine
|
engine *gin.Engine
|
||||||
adapter dbAdapter
|
adapter dbAdapter
|
||||||
|
rwmu sync.RWMutex
|
||||||
httpClient *http.Client
|
httpClient *http.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -127,9 +129,11 @@ func (s *Manager) Run() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// listAllJobs repond with all jobs of specified workers
|
// listAllJobs respond with all jobs of specified workers
|
||||||
func (s *Manager) listAllJobs(c *gin.Context) {
|
func (s *Manager) listAllJobs(c *gin.Context) {
|
||||||
|
s.rwmu.RLock()
|
||||||
mirrorStatusList, err := s.adapter.ListAllMirrorStatus()
|
mirrorStatusList, err := s.adapter.ListAllMirrorStatus()
|
||||||
|
s.rwmu.RUnlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err := fmt.Errorf("failed to list all mirror status: %s",
|
err := fmt.Errorf("failed to list all mirror status: %s",
|
||||||
err.Error(),
|
err.Error(),
|
||||||
@@ -150,7 +154,9 @@ func (s *Manager) listAllJobs(c *gin.Context) {
|
|||||||
|
|
||||||
// flushDisabledJobs deletes all jobs that marks as deleted
|
// flushDisabledJobs deletes all jobs that marks as deleted
|
||||||
func (s *Manager) flushDisabledJobs(c *gin.Context) {
|
func (s *Manager) flushDisabledJobs(c *gin.Context) {
|
||||||
|
s.rwmu.Lock()
|
||||||
err := s.adapter.FlushDisabledJobs()
|
err := s.adapter.FlushDisabledJobs()
|
||||||
|
s.rwmu.Unlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err := fmt.Errorf("failed to flush disabled jobs: %s",
|
err := fmt.Errorf("failed to flush disabled jobs: %s",
|
||||||
err.Error(),
|
err.Error(),
|
||||||
@@ -165,7 +171,9 @@ func (s *Manager) flushDisabledJobs(c *gin.Context) {
|
|||||||
// deleteWorker deletes one worker by id
|
// deleteWorker deletes one worker by id
|
||||||
func (s *Manager) deleteWorker(c *gin.Context) {
|
func (s *Manager) deleteWorker(c *gin.Context) {
|
||||||
workerID := c.Param("id")
|
workerID := c.Param("id")
|
||||||
|
s.rwmu.Lock()
|
||||||
err := s.adapter.DeleteWorker(workerID)
|
err := s.adapter.DeleteWorker(workerID)
|
||||||
|
s.rwmu.Unlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err := fmt.Errorf("failed to delete worker: %s",
|
err := fmt.Errorf("failed to delete worker: %s",
|
||||||
err.Error(),
|
err.Error(),
|
||||||
@@ -178,10 +186,12 @@ func (s *Manager) deleteWorker(c *gin.Context) {
|
|||||||
c.JSON(http.StatusOK, gin.H{_infoKey: "deleted"})
|
c.JSON(http.StatusOK, gin.H{_infoKey: "deleted"})
|
||||||
}
|
}
|
||||||
|
|
||||||
// listWrokers respond with informations of all the workers
|
// listWorkers respond with information of all the workers
|
||||||
func (s *Manager) listWorkers(c *gin.Context) {
|
func (s *Manager) listWorkers(c *gin.Context) {
|
||||||
var workerInfos []WorkerStatus
|
var workerInfos []WorkerStatus
|
||||||
|
s.rwmu.RLock()
|
||||||
workers, err := s.adapter.ListWorkers()
|
workers, err := s.adapter.ListWorkers()
|
||||||
|
s.rwmu.RUnlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err := fmt.Errorf("failed to list workers: %s",
|
err := fmt.Errorf("failed to list workers: %s",
|
||||||
err.Error(),
|
err.Error(),
|
||||||
@@ -193,8 +203,9 @@ func (s *Manager) listWorkers(c *gin.Context) {
|
|||||||
for _, w := range workers {
|
for _, w := range workers {
|
||||||
workerInfos = append(workerInfos,
|
workerInfos = append(workerInfos,
|
||||||
WorkerStatus{
|
WorkerStatus{
|
||||||
ID: w.ID,
|
ID: w.ID,
|
||||||
LastOnline: w.LastOnline,
|
LastOnline: w.LastOnline,
|
||||||
|
LastRegister: w.LastRegister,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
c.JSON(http.StatusOK, workerInfos)
|
c.JSON(http.StatusOK, workerInfos)
|
||||||
@@ -205,6 +216,7 @@ func (s *Manager) registerWorker(c *gin.Context) {
|
|||||||
var _worker WorkerStatus
|
var _worker WorkerStatus
|
||||||
c.BindJSON(&_worker)
|
c.BindJSON(&_worker)
|
||||||
_worker.LastOnline = time.Now()
|
_worker.LastOnline = time.Now()
|
||||||
|
_worker.LastRegister = time.Now()
|
||||||
newWorker, err := s.adapter.CreateWorker(_worker)
|
newWorker, err := s.adapter.CreateWorker(_worker)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err := fmt.Errorf("failed to register worker: %s",
|
err := fmt.Errorf("failed to register worker: %s",
|
||||||
@@ -223,7 +235,9 @@ func (s *Manager) registerWorker(c *gin.Context) {
|
|||||||
// listJobsOfWorker respond with all the jobs of the specified worker
|
// listJobsOfWorker respond with all the jobs of the specified worker
|
||||||
func (s *Manager) listJobsOfWorker(c *gin.Context) {
|
func (s *Manager) listJobsOfWorker(c *gin.Context) {
|
||||||
workerID := c.Param("id")
|
workerID := c.Param("id")
|
||||||
|
s.rwmu.RLock()
|
||||||
mirrorStatusList, err := s.adapter.ListMirrorStatus(workerID)
|
mirrorStatusList, err := s.adapter.ListMirrorStatus(workerID)
|
||||||
|
s.rwmu.RUnlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err := fmt.Errorf("failed to list jobs of worker %s: %s",
|
err := fmt.Errorf("failed to list jobs of worker %s: %s",
|
||||||
workerID, err.Error(),
|
workerID, err.Error(),
|
||||||
@@ -255,7 +269,10 @@ func (s *Manager) updateSchedulesOfWorker(c *gin.Context) {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.rwmu.RLock()
|
||||||
|
s.adapter.RefreshWorker(workerID)
|
||||||
curStatus, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
|
curStatus, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
|
||||||
|
s.rwmu.RUnlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Errorf("failed to get job %s of worker %s: %s",
|
fmt.Errorf("failed to get job %s of worker %s: %s",
|
||||||
mirrorName, workerID, err.Error(),
|
mirrorName, workerID, err.Error(),
|
||||||
@@ -269,7 +286,9 @@ func (s *Manager) updateSchedulesOfWorker(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
curStatus.Scheduled = schedule.NextSchedule
|
curStatus.Scheduled = schedule.NextSchedule
|
||||||
|
s.rwmu.Lock()
|
||||||
_, err = s.adapter.UpdateMirrorStatus(workerID, mirrorName, curStatus)
|
_, err = s.adapter.UpdateMirrorStatus(workerID, mirrorName, curStatus)
|
||||||
|
s.rwmu.Unlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err := fmt.Errorf("failed to update job %s of worker %s: %s",
|
err := fmt.Errorf("failed to update job %s of worker %s: %s",
|
||||||
mirrorName, workerID, err.Error(),
|
mirrorName, workerID, err.Error(),
|
||||||
@@ -295,7 +314,10 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.rwmu.RLock()
|
||||||
|
s.adapter.RefreshWorker(workerID)
|
||||||
curStatus, _ := s.adapter.GetMirrorStatus(workerID, mirrorName)
|
curStatus, _ := s.adapter.GetMirrorStatus(workerID, mirrorName)
|
||||||
|
s.rwmu.RUnlock()
|
||||||
|
|
||||||
curTime := time.Now()
|
curTime := time.Now()
|
||||||
|
|
||||||
@@ -331,7 +353,9 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) {
|
|||||||
logger.Noticef("Job [%s] @<%s> %s", status.Name, status.Worker, status.Status)
|
logger.Noticef("Job [%s] @<%s> %s", status.Name, status.Worker, status.Status)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.rwmu.Lock()
|
||||||
newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
|
newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
|
||||||
|
s.rwmu.Unlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err := fmt.Errorf("failed to update job %s of worker %s: %s",
|
err := fmt.Errorf("failed to update job %s of worker %s: %s",
|
||||||
mirrorName, workerID, err.Error(),
|
mirrorName, workerID, err.Error(),
|
||||||
@@ -353,7 +377,10 @@ func (s *Manager) updateMirrorSize(c *gin.Context) {
|
|||||||
c.BindJSON(&msg)
|
c.BindJSON(&msg)
|
||||||
|
|
||||||
mirrorName := msg.Name
|
mirrorName := msg.Name
|
||||||
|
s.rwmu.RLock()
|
||||||
|
s.adapter.RefreshWorker(workerID)
|
||||||
status, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
|
status, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
|
||||||
|
s.rwmu.RUnlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Errorf(
|
logger.Errorf(
|
||||||
"Failed to get status of mirror %s @<%s>: %s",
|
"Failed to get status of mirror %s @<%s>: %s",
|
||||||
@@ -370,7 +397,9 @@ func (s *Manager) updateMirrorSize(c *gin.Context) {
|
|||||||
|
|
||||||
logger.Noticef("Mirror size of [%s] @<%s>: %s", status.Name, status.Worker, status.Size)
|
logger.Noticef("Mirror size of [%s] @<%s>: %s", status.Name, status.Worker, status.Size)
|
||||||
|
|
||||||
|
s.rwmu.Lock()
|
||||||
newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
|
newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
|
||||||
|
s.rwmu.Unlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err := fmt.Errorf("failed to update job %s of worker %s: %s",
|
err := fmt.Errorf("failed to update job %s of worker %s: %s",
|
||||||
mirrorName, workerID, err.Error(),
|
mirrorName, workerID, err.Error(),
|
||||||
@@ -393,7 +422,9 @@ func (s *Manager) handleClientCmd(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.rwmu.RLock()
|
||||||
w, err := s.adapter.GetWorker(workerID)
|
w, err := s.adapter.GetWorker(workerID)
|
||||||
|
s.rwmu.RUnlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err := fmt.Errorf("worker %s is not registered yet", workerID)
|
err := fmt.Errorf("worker %s is not registered yet", workerID)
|
||||||
s.returnErrJSON(c, http.StatusBadRequest, err)
|
s.returnErrJSON(c, http.StatusBadRequest, err)
|
||||||
@@ -410,7 +441,9 @@ func (s *Manager) handleClientCmd(c *gin.Context) {
|
|||||||
|
|
||||||
// update job status, even if the job did not disable successfully,
|
// update job status, even if the job did not disable successfully,
|
||||||
// this status should be set as disabled
|
// this status should be set as disabled
|
||||||
|
s.rwmu.RLock()
|
||||||
curStat, _ := s.adapter.GetMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID)
|
curStat, _ := s.adapter.GetMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID)
|
||||||
|
s.rwmu.RUnlock()
|
||||||
changed := false
|
changed := false
|
||||||
switch clientCmd.Cmd {
|
switch clientCmd.Cmd {
|
||||||
case CmdDisable:
|
case CmdDisable:
|
||||||
@@ -421,7 +454,9 @@ func (s *Manager) handleClientCmd(c *gin.Context) {
|
|||||||
changed = true
|
changed = true
|
||||||
}
|
}
|
||||||
if changed {
|
if changed {
|
||||||
|
s.rwmu.Lock()
|
||||||
s.adapter.UpdateMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID, curStat)
|
s.adapter.UpdateMirrorStatus(clientCmd.WorkerID, clientCmd.MirrorID, curStat)
|
||||||
|
s.rwmu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Noticef("Posting command '%s %s' to <%s>", clientCmd.Cmd, clientCmd.MirrorID, clientCmd.WorkerID)
|
logger.Noticef("Posting command '%s %s' to <%s>", clientCmd.Cmd, clientCmd.MirrorID, clientCmd.WorkerID)
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
"math/rand"
|
"math/rand"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -64,6 +65,34 @@ func TestHTTPServer(t *testing.T) {
|
|||||||
So(msg[_errorKey], ShouldEqual, fmt.Sprintf("failed to list jobs of worker %s: %s", _magicBadWorkerID, "database fail"))
|
So(msg[_errorKey], ShouldEqual, fmt.Sprintf("failed to list jobs of worker %s: %s", _magicBadWorkerID, "database fail"))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
Convey("when register multiple workers", func(ctx C) {
|
||||||
|
N := 10
|
||||||
|
var cnt uint32
|
||||||
|
for i := 0; i < N; i++ {
|
||||||
|
go func(id int) {
|
||||||
|
w := WorkerStatus{
|
||||||
|
ID: fmt.Sprintf("worker%d", id),
|
||||||
|
}
|
||||||
|
resp, err := PostJSON(baseURL+"/workers", w, nil)
|
||||||
|
ctx.So(err, ShouldBeNil)
|
||||||
|
ctx.So(resp.StatusCode, ShouldEqual, http.StatusOK)
|
||||||
|
atomic.AddUint32(&cnt, 1)
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
So(cnt, ShouldEqual, N)
|
||||||
|
|
||||||
|
Convey("list all workers", func(ctx C) {
|
||||||
|
resp, err := http.Get(baseURL + "/workers")
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
defer resp.Body.Close()
|
||||||
|
var actualResponseObj []WorkerStatus
|
||||||
|
err = json.NewDecoder(resp.Body).Decode(&actualResponseObj)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(len(actualResponseObj), ShouldEqual, N+1)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
Convey("when register a worker", func(ctx C) {
|
Convey("when register a worker", func(ctx C) {
|
||||||
w := WorkerStatus{
|
w := WorkerStatus{
|
||||||
ID: "test_worker1",
|
ID: "test_worker1",
|
||||||
@@ -433,6 +462,15 @@ func (b *mockDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
|
|||||||
return w, nil
|
return w, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *mockDBAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) {
|
||||||
|
w, err = b.GetWorker(workerID)
|
||||||
|
if err == nil {
|
||||||
|
w.LastOnline = time.Now()
|
||||||
|
w, err = b.CreateWorker(w)
|
||||||
|
}
|
||||||
|
return w, err
|
||||||
|
}
|
||||||
|
|
||||||
func (b *mockDBAdapter) GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error) {
|
func (b *mockDBAdapter) GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error) {
|
||||||
id := mirrorID + "/" + workerID
|
id := mirrorID + "/" + workerID
|
||||||
status, ok := b.statusStore[id]
|
status, ok := b.statusStore[id]
|
||||||
|
|||||||
@@ -189,7 +189,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
|
|||||||
syncErr = errors.New("killed by manager")
|
syncErr = errors.New("killed by manager")
|
||||||
}
|
}
|
||||||
if termErr != nil {
|
if termErr != nil {
|
||||||
logger.Errorf("failed to terminate provider %s: %s", m.Name(), err.Error())
|
logger.Errorf("failed to terminate provider %s: %s", m.Name(), termErr.Error())
|
||||||
return termErr
|
return termErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -149,10 +149,10 @@ func (c *cmdJob) Terminate() error {
|
|||||||
select {
|
select {
|
||||||
case <-time.After(2 * time.Second):
|
case <-time.After(2 * time.Second):
|
||||||
unix.Kill(c.cmd.Process.Pid, syscall.SIGKILL)
|
unix.Kill(c.cmd.Process.Pid, syscall.SIGKILL)
|
||||||
return errors.New("SIGTERM failed to kill the job")
|
logger.Warningf("SIGTERM failed to kill the job in 2s. SIGKILL sent")
|
||||||
case <-c.finished:
|
case <-c.finished:
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Copied from go-sh
|
// Copied from go-sh
|
||||||
|
|||||||
@@ -61,7 +61,7 @@ func NewTUNASyncWorker(cfg *Config) *Worker {
|
|||||||
|
|
||||||
// Run runs worker forever
|
// Run runs worker forever
|
||||||
func (w *Worker) Run() {
|
func (w *Worker) Run() {
|
||||||
w.registorWorker()
|
w.registerWorker()
|
||||||
go w.runHTTPServer()
|
go w.runHTTPServer()
|
||||||
w.runSchedule()
|
w.runSchedule()
|
||||||
}
|
}
|
||||||
@@ -393,7 +393,7 @@ func (w *Worker) URL() string {
|
|||||||
return fmt.Sprintf("%s://%s:%d/", proto, w.cfg.Server.Hostname, w.cfg.Server.Port)
|
return fmt.Sprintf("%s://%s:%d/", proto, w.cfg.Server.Hostname, w.cfg.Server.Port)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *Worker) registorWorker() {
|
func (w *Worker) registerWorker() {
|
||||||
msg := WorkerStatus{
|
msg := WorkerStatus{
|
||||||
ID: w.Name(),
|
ID: w.Name(),
|
||||||
URL: w.URL(),
|
URL: w.URL(),
|
||||||
@@ -402,8 +402,17 @@ func (w *Worker) registorWorker() {
|
|||||||
for _, root := range w.cfg.Manager.APIBaseList() {
|
for _, root := range w.cfg.Manager.APIBaseList() {
|
||||||
url := fmt.Sprintf("%s/workers", root)
|
url := fmt.Sprintf("%s/workers", root)
|
||||||
logger.Debugf("register on manager url: %s", url)
|
logger.Debugf("register on manager url: %s", url)
|
||||||
if _, err := PostJSON(url, msg, w.httpClient); err != nil {
|
for retry := 10; retry > 0; {
|
||||||
logger.Errorf("Failed to register worker")
|
if _, err := PostJSON(url, msg, w.httpClient); err != nil {
|
||||||
|
logger.Errorf("Failed to register worker")
|
||||||
|
retry--
|
||||||
|
if retry > 0 {
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
logger.Noticef("Retrying... (%d)", retry)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ func makeMockManagerServer(recvData chan interface{}) *gin.Engine {
|
|||||||
var _worker WorkerStatus
|
var _worker WorkerStatus
|
||||||
c.BindJSON(&_worker)
|
c.BindJSON(&_worker)
|
||||||
_worker.LastOnline = time.Now()
|
_worker.LastOnline = time.Now()
|
||||||
|
_worker.LastRegister = time.Now()
|
||||||
recvData <- _worker
|
recvData <- _worker
|
||||||
c.JSON(http.StatusOK, _worker)
|
c.JSON(http.StatusOK, _worker)
|
||||||
})
|
})
|
||||||
|
|||||||
在新工单中引用
屏蔽一个用户