镜像自地址
https://github.com/tuna/tunasync.git
已同步 2025-12-08 07:26:47 +00:00
比较提交
32 次代码提交
| 作者 | SHA1 | 提交日期 | |
|---|---|---|---|
|
|
23bf4890cc | ||
|
|
2f6a61aee5 | ||
|
|
b6043142e1 | ||
|
|
6241576b12 | ||
|
|
ef78563b8c | ||
|
|
ca106f1360 | ||
|
|
628266ac5a | ||
|
|
7e601d9fff | ||
|
|
c750aa1871 | ||
|
|
6cbe91b4f1 | ||
|
|
89a792986d | ||
|
|
0fdb07d061 | ||
|
|
c5bb172f99 | ||
|
|
79e6167028 | ||
|
|
285ffb2f2f | ||
|
|
95bb4bbd5e | ||
|
|
6bca9d2cd5 | ||
|
|
4fe7d03e54 | ||
|
|
1fe9499728 | ||
|
|
a475b044c6 | ||
|
|
a50a360a91 | ||
|
|
d536aca2ac | ||
|
|
28545d61e7 | ||
|
|
a87fb0f8b4 | ||
|
|
095e7c6320 | ||
|
|
7b441312f4 | ||
|
|
93194cde2e | ||
|
|
aa4c31a32b | ||
|
|
4c6a407c17 | ||
|
|
939abaef9b | ||
|
|
d5a438462f | ||
|
|
d4e07a7b29 |
@@ -2,7 +2,7 @@ sudo: required
|
|||||||
|
|
||||||
language: go
|
language: go
|
||||||
go:
|
go:
|
||||||
- 1.6
|
- 1.8
|
||||||
|
|
||||||
before_install:
|
before_install:
|
||||||
- sudo apt-get install cgroup-bin
|
- sudo apt-get install cgroup-bin
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ Pre-built binary for Linux x86_64 is available at [Github releases](https://gith
|
|||||||
```
|
```
|
||||||
# Architecture
|
# Architecture
|
||||||
|
|
||||||
- Manager: Centural instance on status and job management
|
- Manager: Central instance for status and job management
|
||||||
- Worker: Runs mirror jobs
|
- Worker: Runs mirror jobs
|
||||||
|
|
||||||
+------------+ +---+ +---+
|
+------------+ +---+ +---+
|
||||||
@@ -47,12 +47,12 @@ PreSyncing Syncing Success
|
|||||||
| |
|
| |
|
||||||
| +-----------------+ | Failed
|
| +-----------------+ | Failed
|
||||||
+------+ post-fail |<---------+
|
+------+ post-fail |<---------+
|
||||||
+-----------------+
|
+-----------------+
|
||||||
```
|
```
|
||||||
|
|
||||||
## Generate Self-Signed Certificate
|
## Generate Self-Signed Certificate
|
||||||
|
|
||||||
Fisrt, create root CA
|
First, create root CA
|
||||||
|
|
||||||
```
|
```
|
||||||
openssl genrsa -out rootCA.key 2048
|
openssl genrsa -out rootCA.key 2048
|
||||||
|
|||||||
@@ -140,9 +140,9 @@ func listWorkers(c *cli.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func listJobs(c *cli.Context) error {
|
func listJobs(c *cli.Context) error {
|
||||||
// FIXME: there should be an API on manager server side that return MirrorStatus list to tunasynctl
|
var genericJobs interface{}
|
||||||
var jobs []tunasync.MirrorStatus
|
|
||||||
if c.Bool("all") {
|
if c.Bool("all") {
|
||||||
|
var jobs []tunasync.WebMirrorStatus
|
||||||
_, err := tunasync.GetJSON(baseURL+listJobsPath, &jobs, client)
|
_, err := tunasync.GetJSON(baseURL+listJobsPath, &jobs, client)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cli.NewExitError(
|
return cli.NewExitError(
|
||||||
@@ -150,8 +150,10 @@ func listJobs(c *cli.Context) error {
|
|||||||
"of all jobs from manager server: %s", err.Error()),
|
"of all jobs from manager server: %s", err.Error()),
|
||||||
1)
|
1)
|
||||||
}
|
}
|
||||||
|
genericJobs = jobs
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
var jobs []tunasync.MirrorStatus
|
||||||
args := c.Args()
|
args := c.Args()
|
||||||
if len(args) == 0 {
|
if len(args) == 0 {
|
||||||
return cli.NewExitError(
|
return cli.NewExitError(
|
||||||
@@ -174,9 +176,10 @@ func listJobs(c *cli.Context) error {
|
|||||||
for range args {
|
for range args {
|
||||||
jobs = append(jobs, <-ans...)
|
jobs = append(jobs, <-ans...)
|
||||||
}
|
}
|
||||||
|
genericJobs = jobs
|
||||||
}
|
}
|
||||||
|
|
||||||
b, err := json.MarshalIndent(jobs, "", " ")
|
b, err := json.MarshalIndent(genericJobs, "", " ")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cli.NewExitError(
|
return cli.NewExitError(
|
||||||
fmt.Sprintf("Error printing out informations: %s", err.Error()),
|
fmt.Sprintf("Error printing out informations: %s", err.Error()),
|
||||||
@@ -186,6 +189,103 @@ func listJobs(c *cli.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func updateMirrorSize(c *cli.Context) error {
|
||||||
|
args := c.Args()
|
||||||
|
if len(args) != 2 {
|
||||||
|
return cli.NewExitError("Usage: tunasynctl -w <worker-id> <mirror> <size>", 1)
|
||||||
|
}
|
||||||
|
workerID := c.String("worker")
|
||||||
|
mirrorID := args.Get(0)
|
||||||
|
mirrorSize := args.Get(1)
|
||||||
|
|
||||||
|
msg := struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Size string `json:"size"`
|
||||||
|
}{
|
||||||
|
Name: mirrorID,
|
||||||
|
Size: mirrorSize,
|
||||||
|
}
|
||||||
|
|
||||||
|
url := fmt.Sprintf(
|
||||||
|
"%s/workers/%s/jobs/%s/size", baseURL, workerID, mirrorID,
|
||||||
|
)
|
||||||
|
|
||||||
|
resp, err := tunasync.PostJSON(url, msg, client)
|
||||||
|
if err != nil {
|
||||||
|
return cli.NewExitError(
|
||||||
|
fmt.Sprintf("Failed to send request to manager: %s",
|
||||||
|
err.Error()),
|
||||||
|
1)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
body, _ := ioutil.ReadAll(resp.Body)
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
return cli.NewExitError(
|
||||||
|
fmt.Sprintf("Manager failed to update mirror size: %s", body), 1,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
var status tunasync.MirrorStatus
|
||||||
|
json.Unmarshal(body, &status)
|
||||||
|
if status.Size != mirrorSize {
|
||||||
|
return cli.NewExitError(
|
||||||
|
fmt.Sprintf(
|
||||||
|
"Mirror size error, expecting %s, manager returned %s",
|
||||||
|
mirrorSize, status.Size,
|
||||||
|
), 1,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Infof("Successfully updated mirror size to %s", mirrorSize)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func removeWorker(c *cli.Context) error {
|
||||||
|
args := c.Args()
|
||||||
|
if len(args) != 0 {
|
||||||
|
return cli.NewExitError("Usage: tunasynctl -w <worker-id>", 1)
|
||||||
|
}
|
||||||
|
workerID := c.String("worker")
|
||||||
|
if len(workerID) == 0 {
|
||||||
|
return cli.NewExitError("Please specify the <worker-id>", 1)
|
||||||
|
}
|
||||||
|
url := fmt.Sprintf("%s/workers/%s", baseURL, workerID)
|
||||||
|
|
||||||
|
req, err := http.NewRequest("DELETE", url, nil)
|
||||||
|
if err != nil {
|
||||||
|
logger.Panicf("Invalid HTTP Request: %s", err.Error())
|
||||||
|
}
|
||||||
|
resp, err := client.Do(req)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return cli.NewExitError(
|
||||||
|
fmt.Sprintf("Failed to send request to manager: %s", err.Error()), 1)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
body, err := ioutil.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return cli.NewExitError(
|
||||||
|
fmt.Sprintf("Failed to parse response: %s", err.Error()),
|
||||||
|
1)
|
||||||
|
}
|
||||||
|
|
||||||
|
return cli.NewExitError(fmt.Sprintf("Failed to correctly send"+
|
||||||
|
" command: HTTP status code is not 200: %s", body),
|
||||||
|
1)
|
||||||
|
}
|
||||||
|
|
||||||
|
res := map[string]string{}
|
||||||
|
err = json.NewDecoder(resp.Body).Decode(&res)
|
||||||
|
if res["message"] == "deleted" {
|
||||||
|
logger.Info("Successfully removed the worker")
|
||||||
|
} else {
|
||||||
|
logger.Info("Failed to remove the worker")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func flushDisabledJobs(c *cli.Context) error {
|
func flushDisabledJobs(c *cli.Context) error {
|
||||||
req, err := http.NewRequest("DELETE", baseURL+flushDisabledPath, nil)
|
req, err := http.NewRequest("DELETE", baseURL+flushDisabledPath, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -235,11 +335,16 @@ func cmdJob(cmd tunasync.CmdVerb) cli.ActionFunc {
|
|||||||
"argument WORKER", 1)
|
"argument WORKER", 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
options := map[string]bool{}
|
||||||
|
if c.Bool("force") {
|
||||||
|
options["force"] = true
|
||||||
|
}
|
||||||
cmd := tunasync.ClientCmd{
|
cmd := tunasync.ClientCmd{
|
||||||
Cmd: cmd,
|
Cmd: cmd,
|
||||||
MirrorID: mirrorID,
|
MirrorID: mirrorID,
|
||||||
WorkerID: c.String("worker"),
|
WorkerID: c.String("worker"),
|
||||||
Args: argsList,
|
Args: argsList,
|
||||||
|
Options: options,
|
||||||
}
|
}
|
||||||
resp, err := tunasync.PostJSON(baseURL+cmdPath, cmd, client)
|
resp, err := tunasync.PostJSON(baseURL+cmdPath, cmd, client)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -360,6 +465,11 @@ func main() {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
forceStartFlag := cli.BoolFlag{
|
||||||
|
Name: "force, f",
|
||||||
|
Usage: "Override the concurrent limit",
|
||||||
|
}
|
||||||
|
|
||||||
app.Commands = []cli.Command{
|
app.Commands = []cli.Command{
|
||||||
{
|
{
|
||||||
Name: "list",
|
Name: "list",
|
||||||
@@ -385,10 +495,34 @@ func main() {
|
|||||||
Flags: commonFlags,
|
Flags: commonFlags,
|
||||||
Action: initializeWrapper(listWorkers),
|
Action: initializeWrapper(listWorkers),
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
Name: "rm-worker",
|
||||||
|
Usage: "Remove a worker",
|
||||||
|
Flags: append(
|
||||||
|
commonFlags,
|
||||||
|
cli.StringFlag{
|
||||||
|
Name: "worker, w",
|
||||||
|
Usage: "worker-id of the worker to be removed",
|
||||||
|
},
|
||||||
|
),
|
||||||
|
Action: initializeWrapper(removeWorker),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "set-size",
|
||||||
|
Usage: "Set mirror size",
|
||||||
|
Flags: append(
|
||||||
|
commonFlags,
|
||||||
|
cli.StringFlag{
|
||||||
|
Name: "worker, w",
|
||||||
|
Usage: "specify worker-id of the mirror job",
|
||||||
|
},
|
||||||
|
),
|
||||||
|
Action: initializeWrapper(updateMirrorSize),
|
||||||
|
},
|
||||||
{
|
{
|
||||||
Name: "start",
|
Name: "start",
|
||||||
Usage: "Start a job",
|
Usage: "Start a job",
|
||||||
Flags: append(commonFlags, cmdFlags...),
|
Flags: append(append(commonFlags, cmdFlags...), forceStartFlag),
|
||||||
Action: initializeWrapper(cmdJob(tunasync.CmdStart)),
|
Action: initializeWrapper(cmdJob(tunasync.CmdStart)),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// A StatusUpdateMsg represents a msg when
|
// A MirrorStatus represents a msg when
|
||||||
// a worker has done syncing
|
// a worker has done syncing
|
||||||
type MirrorStatus struct {
|
type MirrorStatus struct {
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
@@ -13,6 +13,7 @@ type MirrorStatus struct {
|
|||||||
IsMaster bool `json:"is_master"`
|
IsMaster bool `json:"is_master"`
|
||||||
Status SyncStatus `json:"status"`
|
Status SyncStatus `json:"status"`
|
||||||
LastUpdate time.Time `json:"last_update"`
|
LastUpdate time.Time `json:"last_update"`
|
||||||
|
LastEnded time.Time `json:"last_ended"`
|
||||||
Upstream string `json:"upstream"`
|
Upstream string `json:"upstream"`
|
||||||
Size string `json:"size"`
|
Size string `json:"size"`
|
||||||
ErrorMsg string `json:"error_msg"`
|
ErrorMsg string `json:"error_msg"`
|
||||||
@@ -67,9 +68,10 @@ func (c CmdVerb) String() string {
|
|||||||
// A WorkerCmd is the command message send from the
|
// A WorkerCmd is the command message send from the
|
||||||
// manager to a worker
|
// manager to a worker
|
||||||
type WorkerCmd struct {
|
type WorkerCmd struct {
|
||||||
Cmd CmdVerb `json:"cmd"`
|
Cmd CmdVerb `json:"cmd"`
|
||||||
MirrorID string `json:"mirror_id"`
|
MirrorID string `json:"mirror_id"`
|
||||||
Args []string `json:"args"`
|
Args []string `json:"args"`
|
||||||
|
Options map[string]bool `json:"options"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c WorkerCmd) String() string {
|
func (c WorkerCmd) String() string {
|
||||||
@@ -82,8 +84,9 @@ func (c WorkerCmd) String() string {
|
|||||||
// A ClientCmd is the command message send from client
|
// A ClientCmd is the command message send from client
|
||||||
// to the manager
|
// to the manager
|
||||||
type ClientCmd struct {
|
type ClientCmd struct {
|
||||||
Cmd CmdVerb `json:"cmd"`
|
Cmd CmdVerb `json:"cmd"`
|
||||||
MirrorID string `json:"mirror_id"`
|
MirrorID string `json:"mirror_id"`
|
||||||
WorkerID string `json:"worker_id"`
|
WorkerID string `json:"worker_id"`
|
||||||
Args []string `json:"args"`
|
Args []string `json:"args"`
|
||||||
|
Options map[string]bool `json:"options"`
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,11 +1,9 @@
|
|||||||
package manager
|
package internal
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
. "github.com/tuna/tunasync/internal"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type textTime struct {
|
type textTime struct {
|
||||||
@@ -38,24 +36,28 @@ func (t *stampTime) UnmarshalJSON(b []byte) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// webMirrorStatus is the mirror status to be shown in the web page
|
// WebMirrorStatus is the mirror status to be shown in the web page
|
||||||
type webMirrorStatus struct {
|
type WebMirrorStatus struct {
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
IsMaster bool `json:"is_master"`
|
IsMaster bool `json:"is_master"`
|
||||||
Status SyncStatus `json:"status"`
|
Status SyncStatus `json:"status"`
|
||||||
LastUpdate textTime `json:"last_update"`
|
LastUpdate textTime `json:"last_update"`
|
||||||
LastUpdateTs stampTime `json:"last_update_ts"`
|
LastUpdateTs stampTime `json:"last_update_ts"`
|
||||||
|
LastEnded textTime `json:"last_ended"`
|
||||||
|
LastEndedTs stampTime `json:"last_ended_ts"`
|
||||||
Upstream string `json:"upstream"`
|
Upstream string `json:"upstream"`
|
||||||
Size string `json:"size"` // approximate size
|
Size string `json:"size"` // approximate size
|
||||||
}
|
}
|
||||||
|
|
||||||
func convertMirrorStatus(m MirrorStatus) webMirrorStatus {
|
func BuildWebMirrorStatus(m MirrorStatus) WebMirrorStatus {
|
||||||
return webMirrorStatus{
|
return WebMirrorStatus{
|
||||||
Name: m.Name,
|
Name: m.Name,
|
||||||
IsMaster: m.IsMaster,
|
IsMaster: m.IsMaster,
|
||||||
Status: m.Status,
|
Status: m.Status,
|
||||||
LastUpdate: textTime{m.LastUpdate},
|
LastUpdate: textTime{m.LastUpdate},
|
||||||
LastUpdateTs: stampTime{m.LastUpdate},
|
LastUpdateTs: stampTime{m.LastUpdate},
|
||||||
|
LastEnded: textTime{m.LastEnded},
|
||||||
|
LastEndedTs: stampTime{m.LastEnded},
|
||||||
Upstream: m.Upstream,
|
Upstream: m.Upstream,
|
||||||
Size: m.Size,
|
Size: m.Size,
|
||||||
}
|
}
|
||||||
76
internal/status_web_test.go
普通文件
76
internal/status_web_test.go
普通文件
@@ -0,0 +1,76 @@
|
|||||||
|
package internal
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
. "github.com/smartystreets/goconvey/convey"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestStatus(t *testing.T) {
|
||||||
|
Convey("status json ser-de should work", t, func() {
|
||||||
|
tz := "Asia/Tokyo"
|
||||||
|
loc, err := time.LoadLocation(tz)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
t := time.Date(2016, time.April, 16, 23, 8, 10, 0, loc)
|
||||||
|
m := WebMirrorStatus{
|
||||||
|
Name: "tunalinux",
|
||||||
|
Status: Success,
|
||||||
|
LastUpdate: textTime{t},
|
||||||
|
LastUpdateTs: stampTime{t},
|
||||||
|
LastEnded: textTime{t},
|
||||||
|
LastEndedTs: stampTime{t},
|
||||||
|
Size: "5GB",
|
||||||
|
Upstream: "rsync://mirrors.tuna.tsinghua.edu.cn/tunalinux/",
|
||||||
|
}
|
||||||
|
|
||||||
|
b, err := json.Marshal(m)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
//fmt.Println(string(b))
|
||||||
|
var m2 WebMirrorStatus
|
||||||
|
err = json.Unmarshal(b, &m2)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
// fmt.Printf("%#v", m2)
|
||||||
|
So(m2.Name, ShouldEqual, m.Name)
|
||||||
|
So(m2.Status, ShouldEqual, m.Status)
|
||||||
|
So(m2.LastUpdate.Unix(), ShouldEqual, m.LastUpdate.Unix())
|
||||||
|
So(m2.LastUpdateTs.Unix(), ShouldEqual, m.LastUpdate.Unix())
|
||||||
|
So(m2.LastUpdate.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
|
||||||
|
So(m2.LastUpdateTs.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
|
||||||
|
So(m2.LastEnded.Unix(), ShouldEqual, m.LastEnded.Unix())
|
||||||
|
So(m2.LastEndedTs.Unix(), ShouldEqual, m.LastEnded.Unix())
|
||||||
|
So(m2.LastEnded.UnixNano(), ShouldEqual, m.LastEnded.UnixNano())
|
||||||
|
So(m2.LastEndedTs.UnixNano(), ShouldEqual, m.LastEnded.UnixNano())
|
||||||
|
So(m2.Size, ShouldEqual, m.Size)
|
||||||
|
So(m2.Upstream, ShouldEqual, m.Upstream)
|
||||||
|
})
|
||||||
|
Convey("BuildWebMirrorStatus should work", t, func() {
|
||||||
|
m := MirrorStatus{
|
||||||
|
Name: "arch-sync3",
|
||||||
|
Worker: "testWorker",
|
||||||
|
IsMaster: true,
|
||||||
|
Status: Failed,
|
||||||
|
LastUpdate: time.Now().Add(-time.Minute * 30),
|
||||||
|
LastEnded: time.Now(),
|
||||||
|
Upstream: "mirrors.tuna.tsinghua.edu.cn",
|
||||||
|
Size: "4GB",
|
||||||
|
}
|
||||||
|
|
||||||
|
var m2 WebMirrorStatus
|
||||||
|
m2 = BuildWebMirrorStatus(m)
|
||||||
|
// fmt.Printf("%#v", m2)
|
||||||
|
So(m2.Name, ShouldEqual, m.Name)
|
||||||
|
So(m2.Status, ShouldEqual, m.Status)
|
||||||
|
So(m2.LastUpdate.Unix(), ShouldEqual, m.LastUpdate.Unix())
|
||||||
|
So(m2.LastUpdateTs.Unix(), ShouldEqual, m.LastUpdate.Unix())
|
||||||
|
So(m2.LastUpdate.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
|
||||||
|
So(m2.LastUpdateTs.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
|
||||||
|
So(m2.LastEnded.Unix(), ShouldEqual, m.LastEnded.Unix())
|
||||||
|
So(m2.LastEndedTs.Unix(), ShouldEqual, m.LastEnded.Unix())
|
||||||
|
So(m2.LastEnded.UnixNano(), ShouldEqual, m.LastEnded.UnixNano())
|
||||||
|
So(m2.LastEndedTs.UnixNano(), ShouldEqual, m.LastEnded.UnixNano())
|
||||||
|
So(m2.Size, ShouldEqual, m.Size)
|
||||||
|
So(m2.Upstream, ShouldEqual, m.Upstream)
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -1,3 +1,3 @@
|
|||||||
package internal
|
package internal
|
||||||
|
|
||||||
const Version string = "0.2-dev"
|
const Version string = "0.3.2"
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ type dbAdapter interface {
|
|||||||
Init() error
|
Init() error
|
||||||
ListWorkers() ([]WorkerStatus, error)
|
ListWorkers() ([]WorkerStatus, error)
|
||||||
GetWorker(workerID string) (WorkerStatus, error)
|
GetWorker(workerID string) (WorkerStatus, error)
|
||||||
|
DeleteWorker(workerID string) error
|
||||||
CreateWorker(w WorkerStatus) (WorkerStatus, error)
|
CreateWorker(w WorkerStatus) (WorkerStatus, error)
|
||||||
UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error)
|
UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error)
|
||||||
GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error)
|
GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error)
|
||||||
@@ -95,6 +96,19 @@ func (b *boltAdapter) GetWorker(workerID string) (w WorkerStatus, err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *boltAdapter) DeleteWorker(workerID string) (err error) {
|
||||||
|
err = b.db.Update(func(tx *bolt.Tx) error {
|
||||||
|
bucket := tx.Bucket([]byte(_workerBucketKey))
|
||||||
|
v := bucket.Get([]byte(workerID))
|
||||||
|
if v == nil {
|
||||||
|
return fmt.Errorf("invalid workerID %s", workerID)
|
||||||
|
}
|
||||||
|
err := bucket.Delete([]byte(workerID))
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
|
func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
|
||||||
err := b.db.Update(func(tx *bolt.Tx) error {
|
err := b.db.Update(func(tx *bolt.Tx) error {
|
||||||
bucket := tx.Bucket([]byte(_workerBucketKey))
|
bucket := tx.Bucket([]byte(_workerBucketKey))
|
||||||
@@ -125,7 +139,7 @@ func (b *boltAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus
|
|||||||
bucket := tx.Bucket([]byte(_statusBucketKey))
|
bucket := tx.Bucket([]byte(_statusBucketKey))
|
||||||
v := bucket.Get([]byte(id))
|
v := bucket.Get([]byte(id))
|
||||||
if v == nil {
|
if v == nil {
|
||||||
return fmt.Errorf("no mirror %s exists in worker %s", mirrorID, workerID)
|
return fmt.Errorf("no mirror '%s' exists in worker '%s'", mirrorID, workerID)
|
||||||
}
|
}
|
||||||
err := json.Unmarshal(v, &m)
|
err := json.Unmarshal(v, &m)
|
||||||
return err
|
return err
|
||||||
|
|||||||
@@ -40,21 +40,39 @@ func TestBoltAdapter(t *testing.T) {
|
|||||||
So(err, ShouldBeNil)
|
So(err, ShouldBeNil)
|
||||||
}
|
}
|
||||||
|
|
||||||
Convey("get exists worker", func() {
|
Convey("get existent worker", func() {
|
||||||
_, err := boltDB.GetWorker(testWorkerIDs[0])
|
_, err := boltDB.GetWorker(testWorkerIDs[0])
|
||||||
So(err, ShouldBeNil)
|
So(err, ShouldBeNil)
|
||||||
})
|
})
|
||||||
|
|
||||||
Convey("list exist worker", func() {
|
Convey("list existent workers", func() {
|
||||||
ws, err := boltDB.ListWorkers()
|
ws, err := boltDB.ListWorkers()
|
||||||
So(err, ShouldBeNil)
|
So(err, ShouldBeNil)
|
||||||
So(len(ws), ShouldEqual, 2)
|
So(len(ws), ShouldEqual, 2)
|
||||||
})
|
})
|
||||||
|
|
||||||
Convey("get inexist worker", func() {
|
Convey("get non-existent worker", func() {
|
||||||
_, err := boltDB.GetWorker("invalid workerID")
|
_, err := boltDB.GetWorker("invalid workerID")
|
||||||
So(err, ShouldNotBeNil)
|
So(err, ShouldNotBeNil)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
Convey("delete existent worker", func() {
|
||||||
|
err := boltDB.DeleteWorker(testWorkerIDs[0])
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
_, err = boltDB.GetWorker(testWorkerIDs[0])
|
||||||
|
So(err, ShouldNotBeNil)
|
||||||
|
ws, err := boltDB.ListWorkers()
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(len(ws), ShouldEqual, 1)
|
||||||
|
})
|
||||||
|
|
||||||
|
Convey("delete non-existent worker", func() {
|
||||||
|
err := boltDB.DeleteWorker("invalid workerID")
|
||||||
|
So(err, ShouldNotBeNil)
|
||||||
|
ws, err := boltDB.ListWorkers()
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(len(ws), ShouldEqual, 2)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
Convey("update mirror status", func() {
|
Convey("update mirror status", func() {
|
||||||
@@ -65,6 +83,7 @@ func TestBoltAdapter(t *testing.T) {
|
|||||||
IsMaster: true,
|
IsMaster: true,
|
||||||
Status: Success,
|
Status: Success,
|
||||||
LastUpdate: time.Now(),
|
LastUpdate: time.Now(),
|
||||||
|
LastEnded: time.Now(),
|
||||||
Upstream: "mirrors.tuna.tsinghua.edu.cn",
|
Upstream: "mirrors.tuna.tsinghua.edu.cn",
|
||||||
Size: "3GB",
|
Size: "3GB",
|
||||||
},
|
},
|
||||||
@@ -73,7 +92,8 @@ func TestBoltAdapter(t *testing.T) {
|
|||||||
Worker: testWorkerIDs[1],
|
Worker: testWorkerIDs[1],
|
||||||
IsMaster: true,
|
IsMaster: true,
|
||||||
Status: Disabled,
|
Status: Disabled,
|
||||||
LastUpdate: time.Now(),
|
LastUpdate: time.Now().Add(-time.Hour),
|
||||||
|
LastEnded: time.Now(),
|
||||||
Upstream: "mirrors.tuna.tsinghua.edu.cn",
|
Upstream: "mirrors.tuna.tsinghua.edu.cn",
|
||||||
Size: "4GB",
|
Size: "4GB",
|
||||||
},
|
},
|
||||||
@@ -82,7 +102,8 @@ func TestBoltAdapter(t *testing.T) {
|
|||||||
Worker: testWorkerIDs[1],
|
Worker: testWorkerIDs[1],
|
||||||
IsMaster: true,
|
IsMaster: true,
|
||||||
Status: Success,
|
Status: Success,
|
||||||
LastUpdate: time.Now(),
|
LastUpdate: time.Now().Add(-time.Second),
|
||||||
|
LastEnded: time.Now(),
|
||||||
Upstream: "mirrors.tuna.tsinghua.edu.cn",
|
Upstream: "mirrors.tuna.tsinghua.edu.cn",
|
||||||
Size: "4GB",
|
Size: "4GB",
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package manager
|
package manager
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
@@ -83,10 +84,13 @@ func GetTUNASyncManager(cfg *Config) *Manager {
|
|||||||
// workerID should be valid in this route group
|
// workerID should be valid in this route group
|
||||||
workerValidateGroup := s.engine.Group("/workers", s.workerIDValidator)
|
workerValidateGroup := s.engine.Group("/workers", s.workerIDValidator)
|
||||||
{
|
{
|
||||||
|
// delete specified worker
|
||||||
|
workerValidateGroup.DELETE(":id", s.deleteWorker)
|
||||||
// get job list
|
// get job list
|
||||||
workerValidateGroup.GET(":id/jobs", s.listJobsOfWorker)
|
workerValidateGroup.GET(":id/jobs", s.listJobsOfWorker)
|
||||||
// post job status
|
// post job status
|
||||||
workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker)
|
workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker)
|
||||||
|
workerValidateGroup.POST(":id/jobs/:job/size", s.updateMirrorSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
// for tunasynctl to post commands
|
// for tunasynctl to post commands
|
||||||
@@ -133,11 +137,11 @@ func (s *Manager) listAllJobs(c *gin.Context) {
|
|||||||
s.returnErrJSON(c, http.StatusInternalServerError, err)
|
s.returnErrJSON(c, http.StatusInternalServerError, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
webMirStatusList := []webMirrorStatus{}
|
webMirStatusList := []WebMirrorStatus{}
|
||||||
for _, m := range mirrorStatusList {
|
for _, m := range mirrorStatusList {
|
||||||
webMirStatusList = append(
|
webMirStatusList = append(
|
||||||
webMirStatusList,
|
webMirStatusList,
|
||||||
convertMirrorStatus(m),
|
BuildWebMirrorStatus(m),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
c.JSON(http.StatusOK, webMirStatusList)
|
c.JSON(http.StatusOK, webMirStatusList)
|
||||||
@@ -157,6 +161,22 @@ func (s *Manager) flushDisabledJobs(c *gin.Context) {
|
|||||||
c.JSON(http.StatusOK, gin.H{_infoKey: "flushed"})
|
c.JSON(http.StatusOK, gin.H{_infoKey: "flushed"})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// deleteWorker deletes one worker by id
|
||||||
|
func (s *Manager) deleteWorker(c *gin.Context) {
|
||||||
|
workerID := c.Param("id")
|
||||||
|
err := s.adapter.DeleteWorker(workerID)
|
||||||
|
if err != nil {
|
||||||
|
err := fmt.Errorf("failed to delete worker: %s",
|
||||||
|
err.Error(),
|
||||||
|
)
|
||||||
|
c.Error(err)
|
||||||
|
s.returnErrJSON(c, http.StatusInternalServerError, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
logger.Noticef("Worker <%s> deleted", workerID)
|
||||||
|
c.JSON(http.StatusOK, gin.H{_infoKey: "deleted"})
|
||||||
|
}
|
||||||
|
|
||||||
// listWrokers respond with informations of all the workers
|
// listWrokers respond with informations of all the workers
|
||||||
func (s *Manager) listWorkers(c *gin.Context) {
|
func (s *Manager) listWorkers(c *gin.Context) {
|
||||||
var workerInfos []WorkerStatus
|
var workerInfos []WorkerStatus
|
||||||
@@ -225,6 +245,12 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) {
|
|||||||
var status MirrorStatus
|
var status MirrorStatus
|
||||||
c.BindJSON(&status)
|
c.BindJSON(&status)
|
||||||
mirrorName := status.Name
|
mirrorName := status.Name
|
||||||
|
if len(mirrorName) == 0 {
|
||||||
|
s.returnErrJSON(
|
||||||
|
c, http.StatusBadRequest,
|
||||||
|
errors.New("Mirror Name should not be empty"),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
curStatus, _ := s.adapter.GetMirrorStatus(workerID, mirrorName)
|
curStatus, _ := s.adapter.GetMirrorStatus(workerID, mirrorName)
|
||||||
|
|
||||||
@@ -234,21 +260,25 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) {
|
|||||||
} else {
|
} else {
|
||||||
status.LastUpdate = curStatus.LastUpdate
|
status.LastUpdate = curStatus.LastUpdate
|
||||||
}
|
}
|
||||||
|
if status.Status == Success || status.Status == Failed {
|
||||||
|
status.LastEnded = time.Now()
|
||||||
|
} else {
|
||||||
|
status.LastEnded = curStatus.LastEnded
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only message with meaningful size updates the mirror size
|
||||||
|
if len(curStatus.Size) > 0 && curStatus.Size != "unknown" {
|
||||||
|
if len(status.Size) == 0 || status.Size == "unknown" {
|
||||||
|
status.Size = curStatus.Size
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// for logging
|
// for logging
|
||||||
switch status.Status {
|
switch status.Status {
|
||||||
case Success:
|
|
||||||
logger.Noticef("Job [%s] @<%s> success", status.Name, status.Worker)
|
|
||||||
case Failed:
|
|
||||||
logger.Warningf("Job [%s] @<%s> failed", status.Name, status.Worker)
|
|
||||||
case Syncing:
|
case Syncing:
|
||||||
logger.Noticef("Job [%s] @<%s> starts syncing", status.Name, status.Worker)
|
logger.Noticef("Job [%s] @<%s> starts syncing", status.Name, status.Worker)
|
||||||
case Disabled:
|
|
||||||
logger.Noticef("Job [%s] @<%s> disabled", status.Name, status.Worker)
|
|
||||||
case Paused:
|
|
||||||
logger.Noticef("Job [%s] @<%s> paused", status.Name, status.Worker)
|
|
||||||
default:
|
default:
|
||||||
logger.Infof("Job [%s] @<%s> status: %s", status.Name, status.Worker, status.Status)
|
logger.Noticef("Job [%s] @<%s> %s", status.Name, status.Worker, status.Status)
|
||||||
}
|
}
|
||||||
|
|
||||||
newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
|
newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
|
||||||
@@ -263,6 +293,45 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) {
|
|||||||
c.JSON(http.StatusOK, newStatus)
|
c.JSON(http.StatusOK, newStatus)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Manager) updateMirrorSize(c *gin.Context) {
|
||||||
|
workerID := c.Param("id")
|
||||||
|
type SizeMsg struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Size string `json:"size"`
|
||||||
|
}
|
||||||
|
var msg SizeMsg
|
||||||
|
c.BindJSON(&msg)
|
||||||
|
|
||||||
|
mirrorName := msg.Name
|
||||||
|
status, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
|
||||||
|
if err != nil {
|
||||||
|
logger.Errorf(
|
||||||
|
"Failed to get status of mirror %s @<%s>: %s",
|
||||||
|
mirrorName, workerID, err.Error(),
|
||||||
|
)
|
||||||
|
s.returnErrJSON(c, http.StatusInternalServerError, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only message with meaningful size updates the mirror size
|
||||||
|
if len(msg.Size) > 0 || msg.Size != "unknown" {
|
||||||
|
status.Size = msg.Size
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Noticef("Mirror size of [%s] @<%s>: %s", status.Name, status.Worker, status.Size)
|
||||||
|
|
||||||
|
newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
|
||||||
|
if err != nil {
|
||||||
|
err := fmt.Errorf("failed to update job %s of worker %s: %s",
|
||||||
|
mirrorName, workerID, err.Error(),
|
||||||
|
)
|
||||||
|
c.Error(err)
|
||||||
|
s.returnErrJSON(c, http.StatusInternalServerError, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.JSON(http.StatusOK, newStatus)
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Manager) handleClientCmd(c *gin.Context) {
|
func (s *Manager) handleClientCmd(c *gin.Context) {
|
||||||
var clientCmd ClientCmd
|
var clientCmd ClientCmd
|
||||||
c.BindJSON(&clientCmd)
|
c.BindJSON(&clientCmd)
|
||||||
@@ -286,6 +355,7 @@ func (s *Manager) handleClientCmd(c *gin.Context) {
|
|||||||
Cmd: clientCmd.Cmd,
|
Cmd: clientCmd.Cmd,
|
||||||
MirrorID: clientCmd.MirrorID,
|
MirrorID: clientCmd.MirrorID,
|
||||||
Args: clientCmd.Args,
|
Args: clientCmd.Args,
|
||||||
|
Options: clientCmd.Options,
|
||||||
}
|
}
|
||||||
|
|
||||||
// update job status, even if the job did not disable successfully,
|
// update job status, even if the job did not disable successfully,
|
||||||
|
|||||||
@@ -21,9 +21,16 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestHTTPServer(t *testing.T) {
|
func TestHTTPServer(t *testing.T) {
|
||||||
|
var listenPort = 5000
|
||||||
Convey("HTTP server should work", t, func(ctx C) {
|
Convey("HTTP server should work", t, func(ctx C) {
|
||||||
|
listenPort++
|
||||||
|
port := listenPort
|
||||||
|
addr := "127.0.0.1"
|
||||||
|
baseURL := fmt.Sprintf("http://%s:%d", addr, port)
|
||||||
InitLogger(true, true, false)
|
InitLogger(true, true, false)
|
||||||
s := GetTUNASyncManager(&Config{Debug: false})
|
s := GetTUNASyncManager(&Config{Debug: true})
|
||||||
|
s.cfg.Server.Addr = addr
|
||||||
|
s.cfg.Server.Port = port
|
||||||
So(s, ShouldNotBeNil)
|
So(s, ShouldNotBeNil)
|
||||||
s.setDBAdapter(&mockDBAdapter{
|
s.setDBAdapter(&mockDBAdapter{
|
||||||
workerStore: map[string]WorkerStatus{
|
workerStore: map[string]WorkerStatus{
|
||||||
@@ -32,12 +39,8 @@ func TestHTTPServer(t *testing.T) {
|
|||||||
}},
|
}},
|
||||||
statusStore: make(map[string]MirrorStatus),
|
statusStore: make(map[string]MirrorStatus),
|
||||||
})
|
})
|
||||||
port := rand.Intn(10000) + 20000
|
go s.Run()
|
||||||
baseURL := fmt.Sprintf("http://127.0.0.1:%d", port)
|
time.Sleep(50 * time.Millisecond)
|
||||||
go func() {
|
|
||||||
s.engine.Run(fmt.Sprintf("127.0.0.1:%d", port))
|
|
||||||
}()
|
|
||||||
time.Sleep(50 * time.Microsecond)
|
|
||||||
resp, err := http.Get(baseURL + "/ping")
|
resp, err := http.Get(baseURL + "/ping")
|
||||||
So(err, ShouldBeNil)
|
So(err, ShouldBeNil)
|
||||||
So(resp.StatusCode, ShouldEqual, http.StatusOK)
|
So(resp.StatusCode, ShouldEqual, http.StatusOK)
|
||||||
@@ -79,6 +82,33 @@ func TestHTTPServer(t *testing.T) {
|
|||||||
So(len(actualResponseObj), ShouldEqual, 2)
|
So(len(actualResponseObj), ShouldEqual, 2)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
Convey("delete an existent worker", func(ctx C) {
|
||||||
|
req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/workers/%s", baseURL, w.ID), nil)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
clt := &http.Client{}
|
||||||
|
resp, err := clt.Do(req)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
defer resp.Body.Close()
|
||||||
|
res := map[string]string{}
|
||||||
|
err = json.NewDecoder(resp.Body).Decode(&res)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(res[_infoKey], ShouldEqual, "deleted")
|
||||||
|
})
|
||||||
|
|
||||||
|
Convey("delete non-existent worker", func(ctx C) {
|
||||||
|
invalidWorker := "test_worker233"
|
||||||
|
req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/workers/%s", baseURL, invalidWorker), nil)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
clt := &http.Client{}
|
||||||
|
resp, err := clt.Do(req)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
defer resp.Body.Close()
|
||||||
|
res := map[string]string{}
|
||||||
|
err = json.NewDecoder(resp.Body).Decode(&res)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(res[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
|
||||||
|
})
|
||||||
|
|
||||||
Convey("flush disabled jobs", func(ctx C) {
|
Convey("flush disabled jobs", func(ctx C) {
|
||||||
req, err := http.NewRequest("DELETE", baseURL+"/jobs/disabled", nil)
|
req, err := http.NewRequest("DELETE", baseURL+"/jobs/disabled", nil)
|
||||||
So(err, ShouldBeNil)
|
So(err, ShouldBeNil)
|
||||||
@@ -99,7 +129,7 @@ func TestHTTPServer(t *testing.T) {
|
|||||||
IsMaster: true,
|
IsMaster: true,
|
||||||
Status: Success,
|
Status: Success,
|
||||||
Upstream: "mirrors.tuna.tsinghua.edu.cn",
|
Upstream: "mirrors.tuna.tsinghua.edu.cn",
|
||||||
Size: "3GB",
|
Size: "unknown",
|
||||||
}
|
}
|
||||||
resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
|
resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
@@ -121,11 +151,12 @@ func TestHTTPServer(t *testing.T) {
|
|||||||
So(m.Size, ShouldEqual, status.Size)
|
So(m.Size, ShouldEqual, status.Size)
|
||||||
So(m.IsMaster, ShouldEqual, status.IsMaster)
|
So(m.IsMaster, ShouldEqual, status.IsMaster)
|
||||||
So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second)
|
So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second)
|
||||||
|
So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
|
||||||
|
|
||||||
})
|
})
|
||||||
|
|
||||||
Convey("list all job status of all workers", func(ctx C) {
|
Convey("list all job status of all workers", func(ctx C) {
|
||||||
var ms []webMirrorStatus
|
var ms []WebMirrorStatus
|
||||||
resp, err := GetJSON(baseURL+"/jobs", &ms, nil)
|
resp, err := GetJSON(baseURL+"/jobs", &ms, nil)
|
||||||
So(err, ShouldBeNil)
|
So(err, ShouldBeNil)
|
||||||
So(resp.StatusCode, ShouldEqual, http.StatusOK)
|
So(resp.StatusCode, ShouldEqual, http.StatusOK)
|
||||||
@@ -137,8 +168,77 @@ func TestHTTPServer(t *testing.T) {
|
|||||||
So(m.Size, ShouldEqual, status.Size)
|
So(m.Size, ShouldEqual, status.Size)
|
||||||
So(m.IsMaster, ShouldEqual, status.IsMaster)
|
So(m.IsMaster, ShouldEqual, status.IsMaster)
|
||||||
So(time.Now().Sub(m.LastUpdate.Time), ShouldBeLessThan, 1*time.Second)
|
So(time.Now().Sub(m.LastUpdate.Time), ShouldBeLessThan, 1*time.Second)
|
||||||
|
So(time.Now().Sub(m.LastEnded.Time), ShouldBeLessThan, 1*time.Second)
|
||||||
|
|
||||||
})
|
})
|
||||||
|
|
||||||
|
Convey("Update size of a valid mirror", func(ctx C) {
|
||||||
|
msg := struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Size string `json:"size"`
|
||||||
|
}{status.Name, "5GB"}
|
||||||
|
|
||||||
|
url := fmt.Sprintf("%s/workers/%s/jobs/%s/size", baseURL, status.Worker, status.Name)
|
||||||
|
resp, err := PostJSON(url, msg, nil)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(resp.StatusCode, ShouldEqual, http.StatusOK)
|
||||||
|
|
||||||
|
Convey("Get new size of a mirror", func(ctx C) {
|
||||||
|
var ms []MirrorStatus
|
||||||
|
resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
|
||||||
|
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(resp.StatusCode, ShouldEqual, http.StatusOK)
|
||||||
|
// err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
|
||||||
|
m := ms[0]
|
||||||
|
So(m.Name, ShouldEqual, status.Name)
|
||||||
|
So(m.Worker, ShouldEqual, status.Worker)
|
||||||
|
So(m.Status, ShouldEqual, status.Status)
|
||||||
|
So(m.Upstream, ShouldEqual, status.Upstream)
|
||||||
|
So(m.Size, ShouldEqual, "5GB")
|
||||||
|
So(m.IsMaster, ShouldEqual, status.IsMaster)
|
||||||
|
So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second)
|
||||||
|
So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
Convey("Update size of an invalid mirror", func(ctx C) {
|
||||||
|
msg := struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Size string `json:"size"`
|
||||||
|
}{"Invalid mirror", "5GB"}
|
||||||
|
|
||||||
|
url := fmt.Sprintf("%s/workers/%s/jobs/%s/size", baseURL, status.Worker, status.Name)
|
||||||
|
resp, err := PostJSON(url, msg, nil)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
|
||||||
|
})
|
||||||
|
|
||||||
|
// what if status changed to failed
|
||||||
|
status.Status = Failed
|
||||||
|
time.Sleep(3 * time.Second)
|
||||||
|
resp, err = PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
|
||||||
|
defer resp.Body.Close()
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(resp.StatusCode, ShouldEqual, http.StatusOK)
|
||||||
|
|
||||||
|
Convey("What if syncing job failed", func(ctx C) {
|
||||||
|
var ms []MirrorStatus
|
||||||
|
resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
|
||||||
|
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(resp.StatusCode, ShouldEqual, http.StatusOK)
|
||||||
|
// err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
|
||||||
|
m := ms[0]
|
||||||
|
So(m.Name, ShouldEqual, status.Name)
|
||||||
|
So(m.Worker, ShouldEqual, status.Worker)
|
||||||
|
So(m.Status, ShouldEqual, status.Status)
|
||||||
|
So(m.Upstream, ShouldEqual, status.Upstream)
|
||||||
|
So(m.Size, ShouldEqual, status.Size)
|
||||||
|
So(m.IsMaster, ShouldEqual, status.IsMaster)
|
||||||
|
So(time.Now().Sub(m.LastUpdate), ShouldBeGreaterThan, 3*time.Second)
|
||||||
|
So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
Convey("update mirror status of an inexisted worker", func(ctx C) {
|
Convey("update mirror status of an inexisted worker", func(ctx C) {
|
||||||
@@ -149,6 +249,7 @@ func TestHTTPServer(t *testing.T) {
|
|||||||
IsMaster: true,
|
IsMaster: true,
|
||||||
Status: Success,
|
Status: Success,
|
||||||
LastUpdate: time.Now(),
|
LastUpdate: time.Now(),
|
||||||
|
LastEnded: time.Now(),
|
||||||
Upstream: "mirrors.tuna.tsinghua.edu.cn",
|
Upstream: "mirrors.tuna.tsinghua.edu.cn",
|
||||||
Size: "4GB",
|
Size: "4GB",
|
||||||
}
|
}
|
||||||
@@ -252,6 +353,11 @@ func (b *mockDBAdapter) GetWorker(workerID string) (WorkerStatus, error) {
|
|||||||
return w, nil
|
return w, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *mockDBAdapter) DeleteWorker(workerID string) error {
|
||||||
|
delete(b.workerStore, workerID)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (b *mockDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
|
func (b *mockDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
|
||||||
// _, ok := b.workerStore[w.ID]
|
// _, ok := b.workerStore[w.ID]
|
||||||
// if ok {
|
// if ok {
|
||||||
|
|||||||
@@ -1,44 +0,0 @@
|
|||||||
package manager
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
tunasync "github.com/tuna/tunasync/internal"
|
|
||||||
|
|
||||||
. "github.com/smartystreets/goconvey/convey"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestStatus(t *testing.T) {
|
|
||||||
Convey("status json ser-de should work", t, func() {
|
|
||||||
tz := "Asia/Tokyo"
|
|
||||||
loc, err := time.LoadLocation(tz)
|
|
||||||
So(err, ShouldBeNil)
|
|
||||||
t := time.Date(2016, time.April, 16, 23, 8, 10, 0, loc)
|
|
||||||
m := webMirrorStatus{
|
|
||||||
Name: "tunalinux",
|
|
||||||
Status: tunasync.Success,
|
|
||||||
LastUpdate: textTime{t},
|
|
||||||
LastUpdateTs: stampTime{t},
|
|
||||||
Size: "5GB",
|
|
||||||
Upstream: "rsync://mirrors.tuna.tsinghua.edu.cn/tunalinux/",
|
|
||||||
}
|
|
||||||
|
|
||||||
b, err := json.Marshal(m)
|
|
||||||
So(err, ShouldBeNil)
|
|
||||||
//fmt.Println(string(b))
|
|
||||||
var m2 webMirrorStatus
|
|
||||||
err = json.Unmarshal(b, &m2)
|
|
||||||
So(err, ShouldBeNil)
|
|
||||||
// fmt.Printf("%#v", m2)
|
|
||||||
So(m2.Name, ShouldEqual, m.Name)
|
|
||||||
So(m2.Status, ShouldEqual, m.Status)
|
|
||||||
So(m2.LastUpdate.Unix(), ShouldEqual, m.LastUpdate.Unix())
|
|
||||||
So(m2.LastUpdateTs.Unix(), ShouldEqual, m.LastUpdate.Unix())
|
|
||||||
So(m2.LastUpdate.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
|
|
||||||
So(m2.LastUpdateTs.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
|
|
||||||
So(m2.Size, ShouldEqual, m.Size)
|
|
||||||
So(m2.Upstream, ShouldEqual, m.Upstream)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
@@ -20,8 +20,6 @@ type baseProvider struct {
|
|||||||
cmd *cmdJob
|
cmd *cmdJob
|
||||||
isRunning atomic.Value
|
isRunning atomic.Value
|
||||||
|
|
||||||
logFile *os.File
|
|
||||||
|
|
||||||
cgroup *cgroupHook
|
cgroup *cgroupHook
|
||||||
zfs *zfsHook
|
zfs *zfsHook
|
||||||
docker *dockerHook
|
docker *dockerHook
|
||||||
@@ -111,20 +109,21 @@ func (p *baseProvider) Docker() *dockerHook {
|
|||||||
return p.docker
|
return p.docker
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *baseProvider) prepareLogFile() error {
|
func (p *baseProvider) prepareLogFile(append bool) error {
|
||||||
if p.LogFile() == "/dev/null" {
|
if p.LogFile() == "/dev/null" {
|
||||||
p.cmd.SetLogFile(nil)
|
p.cmd.SetLogFile(nil)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if p.logFile == nil {
|
appendMode := 0
|
||||||
logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE, 0644)
|
if append {
|
||||||
if err != nil {
|
appendMode = os.O_APPEND
|
||||||
logger.Errorf("Error opening logfile %s: %s", p.LogFile(), err.Error())
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
p.logFile = logFile
|
|
||||||
}
|
}
|
||||||
p.cmd.SetLogFile(p.logFile)
|
logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE|appendMode, 0644)
|
||||||
|
if err != nil {
|
||||||
|
logger.Errorf("Error opening logfile %s: %s", p.LogFile(), err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
p.cmd.SetLogFile(logFile)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -143,32 +142,22 @@ func (p *baseProvider) IsRunning() bool {
|
|||||||
|
|
||||||
func (p *baseProvider) Wait() error {
|
func (p *baseProvider) Wait() error {
|
||||||
defer func() {
|
defer func() {
|
||||||
p.Lock()
|
logger.Debugf("set isRunning to false: %s", p.Name())
|
||||||
p.isRunning.Store(false)
|
p.isRunning.Store(false)
|
||||||
if p.logFile != nil {
|
|
||||||
p.logFile.Close()
|
|
||||||
p.logFile = nil
|
|
||||||
}
|
|
||||||
p.Unlock()
|
|
||||||
}()
|
}()
|
||||||
|
logger.Debugf("calling Wait: %s", p.Name())
|
||||||
return p.cmd.Wait()
|
return p.cmd.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *baseProvider) Terminate() error {
|
func (p *baseProvider) Terminate() error {
|
||||||
|
p.Lock()
|
||||||
|
defer p.Unlock()
|
||||||
logger.Debugf("terminating provider: %s", p.Name())
|
logger.Debugf("terminating provider: %s", p.Name())
|
||||||
if !p.IsRunning() {
|
if !p.IsRunning() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
p.Lock()
|
|
||||||
if p.logFile != nil {
|
|
||||||
p.logFile.Close()
|
|
||||||
p.logFile = nil
|
|
||||||
}
|
|
||||||
p.Unlock()
|
|
||||||
|
|
||||||
err := p.cmd.Terminate()
|
err := p.cmd.Terminate()
|
||||||
p.isRunning.Store(false)
|
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package worker
|
package worker
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/anmitsu/go-shlex"
|
"github.com/anmitsu/go-shlex"
|
||||||
@@ -60,17 +61,25 @@ func (p *cmdProvider) Run() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *cmdProvider) Start() error {
|
func (p *cmdProvider) Start() error {
|
||||||
|
p.Lock()
|
||||||
|
defer p.Unlock()
|
||||||
|
|
||||||
|
if p.IsRunning() {
|
||||||
|
return errors.New("provider is currently running")
|
||||||
|
}
|
||||||
|
|
||||||
env := map[string]string{
|
env := map[string]string{
|
||||||
"TUNASYNC_MIRROR_NAME": p.Name(),
|
"TUNASYNC_MIRROR_NAME": p.Name(),
|
||||||
"TUNASYNC_WORKING_DIR": p.WorkingDir(),
|
"TUNASYNC_WORKING_DIR": p.WorkingDir(),
|
||||||
"TUNASYNC_UPSTREAM_URL": p.upstreamURL,
|
"TUNASYNC_UPSTREAM_URL": p.upstreamURL,
|
||||||
|
"TUNASYNC_LOG_DIR": p.LogDir(),
|
||||||
"TUNASYNC_LOG_FILE": p.LogFile(),
|
"TUNASYNC_LOG_FILE": p.LogFile(),
|
||||||
}
|
}
|
||||||
for k, v := range p.env {
|
for k, v := range p.env {
|
||||||
env[k] = v
|
env[k] = v
|
||||||
}
|
}
|
||||||
p.cmd = newCmdJob(p, p.command, p.WorkingDir(), env)
|
p.cmd = newCmdJob(p, p.command, p.WorkingDir(), env)
|
||||||
if err := p.prepareLogFile(); err != nil {
|
if err := p.prepareLogFile(false); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -123,6 +123,7 @@ type mirrorConfig struct {
|
|||||||
|
|
||||||
Command string `toml:"command"`
|
Command string `toml:"command"`
|
||||||
UseIPv6 bool `toml:"use_ipv6"`
|
UseIPv6 bool `toml:"use_ipv6"`
|
||||||
|
UseIPv4 bool `toml:"use_ipv4"`
|
||||||
ExcludeFile string `toml:"exclude_file"`
|
ExcludeFile string `toml:"exclude_file"`
|
||||||
Username string `toml:"username"`
|
Username string `toml:"username"`
|
||||||
Password string `toml:"password"`
|
Password string `toml:"password"`
|
||||||
|
|||||||
@@ -32,6 +32,7 @@ func newDockerHook(p mirrorProvider, gCfg dockerConfig, mCfg mirrorConfig) *dock
|
|||||||
|
|
||||||
func (d *dockerHook) preExec() error {
|
func (d *dockerHook) preExec() error {
|
||||||
p := d.provider
|
p := d.provider
|
||||||
|
logDir := p.LogDir()
|
||||||
logFile := p.LogFile()
|
logFile := p.LogFile()
|
||||||
workingDir := p.WorkingDir()
|
workingDir := p.WorkingDir()
|
||||||
|
|
||||||
@@ -42,17 +43,13 @@ func (d *dockerHook) preExec() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
logFileNew := "/log_latest"
|
|
||||||
workingDirNew := "/data"
|
|
||||||
|
|
||||||
// Override workingDir
|
// Override workingDir
|
||||||
ctx := p.EnterContext()
|
ctx := p.EnterContext()
|
||||||
ctx.Set(_WorkingDirKey, workingDirNew)
|
|
||||||
ctx.Set(_LogFileKey+":docker", logFileNew)
|
|
||||||
ctx.Set(
|
ctx.Set(
|
||||||
"volumes", []string{
|
"volumes", []string{
|
||||||
fmt.Sprintf("%s:%s", logFile, logFileNew),
|
fmt.Sprintf("%s:%s", logDir, logDir),
|
||||||
fmt.Sprintf("%s:%s", workingDir, workingDirNew),
|
fmt.Sprintf("%s:%s", logFile, logFile),
|
||||||
|
fmt.Sprintf("%s:%s", workingDir, workingDir),
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@@ -71,6 +71,7 @@ func (h *execPostHook) Do() error {
|
|||||||
"TUNASYNC_MIRROR_NAME": p.Name(),
|
"TUNASYNC_MIRROR_NAME": p.Name(),
|
||||||
"TUNASYNC_WORKING_DIR": p.WorkingDir(),
|
"TUNASYNC_WORKING_DIR": p.WorkingDir(),
|
||||||
"TUNASYNC_UPSTREAM_URL": p.Upstream(),
|
"TUNASYNC_UPSTREAM_URL": p.Upstream(),
|
||||||
|
"TUNASYNC_LOG_DIR": p.LogDir(),
|
||||||
"TUNASYNC_LOG_FILE": p.LogFile(),
|
"TUNASYNC_LOG_FILE": p.LogFile(),
|
||||||
"TUNASYNC_JOB_EXIT_STATUS": exitStatus,
|
"TUNASYNC_JOB_EXIT_STATUS": exitStatus,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
tunasync "github.com/tuna/tunasync/internal"
|
tunasync "github.com/tuna/tunasync/internal"
|
||||||
)
|
)
|
||||||
@@ -14,12 +15,13 @@ import (
|
|||||||
type ctrlAction uint8
|
type ctrlAction uint8
|
||||||
|
|
||||||
const (
|
const (
|
||||||
jobStart ctrlAction = iota
|
jobStart ctrlAction = iota
|
||||||
jobStop // stop syncing keep the job
|
jobStop // stop syncing keep the job
|
||||||
jobDisable // disable the job (stops goroutine)
|
jobDisable // disable the job (stops goroutine)
|
||||||
jobRestart // restart syncing
|
jobRestart // restart syncing
|
||||||
jobPing // ensure the goroutine is alive
|
jobPing // ensure the goroutine is alive
|
||||||
jobHalt // worker halts
|
jobHalt // worker halts
|
||||||
|
jobForceStart // ignore concurrent limit
|
||||||
)
|
)
|
||||||
|
|
||||||
type jobMessage struct {
|
type jobMessage struct {
|
||||||
@@ -154,9 +156,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
|
|||||||
syncDone := make(chan error, 1)
|
syncDone := make(chan error, 1)
|
||||||
go func() {
|
go func() {
|
||||||
err := provider.Run()
|
err := provider.Run()
|
||||||
if !stopASAP {
|
syncDone <- err
|
||||||
syncDone <- err
|
|
||||||
}
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
@@ -212,22 +212,26 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
runJob := func(kill <-chan empty, jobDone chan<- empty) {
|
runJob := func(kill <-chan empty, jobDone chan<- empty, bypassSemaphore <-chan empty) {
|
||||||
select {
|
select {
|
||||||
case semaphore <- empty{}:
|
case semaphore <- empty{}:
|
||||||
defer func() { <-semaphore }()
|
defer func() { <-semaphore }()
|
||||||
runJobWrapper(kill, jobDone)
|
runJobWrapper(kill, jobDone)
|
||||||
|
case <-bypassSemaphore:
|
||||||
|
logger.Noticef("Concurrent limit ignored by %s", m.Name())
|
||||||
|
runJobWrapper(kill, jobDone)
|
||||||
case <-kill:
|
case <-kill:
|
||||||
jobDone <- empty{}
|
jobDone <- empty{}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bypassSemaphore := make(chan empty, 1)
|
||||||
for {
|
for {
|
||||||
if m.State() == stateReady {
|
if m.State() == stateReady {
|
||||||
kill := make(chan empty)
|
kill := make(chan empty)
|
||||||
jobDone := make(chan empty)
|
jobDone := make(chan empty)
|
||||||
go runJob(kill, jobDone)
|
go runJob(kill, jobDone, bypassSemaphore)
|
||||||
|
|
||||||
_wait_for_job:
|
_wait_for_job:
|
||||||
select {
|
select {
|
||||||
@@ -248,7 +252,14 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
|
|||||||
m.SetState(stateReady)
|
m.SetState(stateReady)
|
||||||
close(kill)
|
close(kill)
|
||||||
<-jobDone
|
<-jobDone
|
||||||
|
time.Sleep(time.Second) // Restart may fail if the process was not exited yet
|
||||||
continue
|
continue
|
||||||
|
case jobForceStart:
|
||||||
|
select { //non-blocking
|
||||||
|
default:
|
||||||
|
case bypassSemaphore <- empty{}:
|
||||||
|
}
|
||||||
|
fallthrough
|
||||||
case jobStart:
|
case jobStart:
|
||||||
m.SetState(stateReady)
|
m.SetState(stateReady)
|
||||||
goto _wait_for_job
|
goto _wait_for_job
|
||||||
@@ -272,8 +283,14 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
|
|||||||
case jobDisable:
|
case jobDisable:
|
||||||
m.SetState(stateDisabled)
|
m.SetState(stateDisabled)
|
||||||
return nil
|
return nil
|
||||||
|
case jobForceStart:
|
||||||
|
select { //non-blocking
|
||||||
|
default:
|
||||||
|
case bypassSemaphore <- empty{}:
|
||||||
|
}
|
||||||
|
fallthrough
|
||||||
case jobRestart:
|
case jobRestart:
|
||||||
m.SetState(stateReady)
|
fallthrough
|
||||||
case jobStart:
|
case jobStart:
|
||||||
m.SetState(stateReady)
|
m.SetState(stateReady)
|
||||||
default:
|
default:
|
||||||
|
|||||||
@@ -135,6 +135,8 @@ echo $TUNASYNC_WORKING_DIR
|
|||||||
msg = <-managerChan
|
msg = <-managerChan
|
||||||
So(msg.status, ShouldEqual, Syncing)
|
So(msg.status, ShouldEqual, Syncing)
|
||||||
|
|
||||||
|
job.ctrlChan <- jobStart // should be ignored
|
||||||
|
|
||||||
job.ctrlChan <- jobStop
|
job.ctrlChan <- jobStop
|
||||||
|
|
||||||
msg = <-managerChan
|
msg = <-managerChan
|
||||||
@@ -170,8 +172,239 @@ echo $TUNASYNC_WORKING_DIR
|
|||||||
job.ctrlChan <- jobDisable
|
job.ctrlChan <- jobDisable
|
||||||
<-job.disabled
|
<-job.disabled
|
||||||
})
|
})
|
||||||
|
|
||||||
|
Convey("If we restart it", func(ctx C) {
|
||||||
|
go job.Run(managerChan, semaphore)
|
||||||
|
job.ctrlChan <- jobStart
|
||||||
|
|
||||||
|
msg := <-managerChan
|
||||||
|
So(msg.status, ShouldEqual, PreSyncing)
|
||||||
|
msg = <-managerChan
|
||||||
|
So(msg.status, ShouldEqual, Syncing)
|
||||||
|
|
||||||
|
job.ctrlChan <- jobRestart
|
||||||
|
|
||||||
|
msg = <-managerChan
|
||||||
|
So(msg.status, ShouldEqual, Failed)
|
||||||
|
So(msg.msg, ShouldEqual, "killed by manager")
|
||||||
|
|
||||||
|
msg = <-managerChan
|
||||||
|
So(msg.status, ShouldEqual, PreSyncing)
|
||||||
|
msg = <-managerChan
|
||||||
|
So(msg.status, ShouldEqual, Syncing)
|
||||||
|
msg = <-managerChan
|
||||||
|
So(msg.status, ShouldEqual, Success)
|
||||||
|
|
||||||
|
expectedOutput := fmt.Sprintf(
|
||||||
|
"%s\n%s\n",
|
||||||
|
provider.WorkingDir(), provider.WorkingDir(),
|
||||||
|
)
|
||||||
|
|
||||||
|
loggedContent, err := ioutil.ReadFile(provider.LogFile())
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(string(loggedContent), ShouldEqual, expectedOutput)
|
||||||
|
job.ctrlChan <- jobDisable
|
||||||
|
<-job.disabled
|
||||||
|
})
|
||||||
|
|
||||||
|
Convey("If we disable it", func(ctx C) {
|
||||||
|
go job.Run(managerChan, semaphore)
|
||||||
|
job.ctrlChan <- jobStart
|
||||||
|
|
||||||
|
msg := <-managerChan
|
||||||
|
So(msg.status, ShouldEqual, PreSyncing)
|
||||||
|
msg = <-managerChan
|
||||||
|
So(msg.status, ShouldEqual, Syncing)
|
||||||
|
|
||||||
|
job.ctrlChan <- jobDisable
|
||||||
|
|
||||||
|
msg = <-managerChan
|
||||||
|
So(msg.status, ShouldEqual, Failed)
|
||||||
|
So(msg.msg, ShouldEqual, "killed by manager")
|
||||||
|
|
||||||
|
<-job.disabled
|
||||||
|
})
|
||||||
|
|
||||||
|
Convey("If we stop it twice, than start it", func(ctx C) {
|
||||||
|
go job.Run(managerChan, semaphore)
|
||||||
|
job.ctrlChan <- jobStart
|
||||||
|
|
||||||
|
msg := <-managerChan
|
||||||
|
So(msg.status, ShouldEqual, PreSyncing)
|
||||||
|
msg = <-managerChan
|
||||||
|
So(msg.status, ShouldEqual, Syncing)
|
||||||
|
|
||||||
|
job.ctrlChan <- jobStop
|
||||||
|
|
||||||
|
msg = <-managerChan
|
||||||
|
So(msg.status, ShouldEqual, Failed)
|
||||||
|
So(msg.msg, ShouldEqual, "killed by manager")
|
||||||
|
|
||||||
|
job.ctrlChan <- jobStop // should be ignored
|
||||||
|
|
||||||
|
job.ctrlChan <- jobStart
|
||||||
|
|
||||||
|
msg = <-managerChan
|
||||||
|
So(msg.status, ShouldEqual, PreSyncing)
|
||||||
|
msg = <-managerChan
|
||||||
|
So(msg.status, ShouldEqual, Syncing)
|
||||||
|
msg = <-managerChan
|
||||||
|
So(msg.status, ShouldEqual, Success)
|
||||||
|
|
||||||
|
expectedOutput := fmt.Sprintf(
|
||||||
|
"%s\n%s\n",
|
||||||
|
provider.WorkingDir(), provider.WorkingDir(),
|
||||||
|
)
|
||||||
|
|
||||||
|
loggedContent, err := ioutil.ReadFile(provider.LogFile())
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(string(loggedContent), ShouldEqual, expectedOutput)
|
||||||
|
|
||||||
|
job.ctrlChan <- jobDisable
|
||||||
|
<-job.disabled
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
})
|
})
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestConcurrentMirrorJobs(t *testing.T) {
|
||||||
|
|
||||||
|
InitLogger(true, true, false)
|
||||||
|
|
||||||
|
Convey("Concurrent MirrorJobs should work", t, func(ctx C) {
|
||||||
|
tmpDir, err := ioutil.TempDir("", "tunasync")
|
||||||
|
defer os.RemoveAll(tmpDir)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
|
||||||
|
const CONCURRENT = 5
|
||||||
|
|
||||||
|
var providers [CONCURRENT]*cmdProvider
|
||||||
|
var jobs [CONCURRENT]*mirrorJob
|
||||||
|
for i := 0; i < CONCURRENT; i++ {
|
||||||
|
c := cmdConfig{
|
||||||
|
name: fmt.Sprintf("job-%d", i),
|
||||||
|
upstreamURL: "http://mirrors.tuna.moe/",
|
||||||
|
command: "sleep 2",
|
||||||
|
workingDir: tmpDir,
|
||||||
|
logDir: tmpDir,
|
||||||
|
logFile: "/dev/null",
|
||||||
|
interval: 10 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
var err error
|
||||||
|
providers[i], err = newCmdProvider(c)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
jobs[i] = newMirrorJob(providers[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
managerChan := make(chan jobMessage, 10)
|
||||||
|
semaphore := make(chan empty, CONCURRENT-2)
|
||||||
|
|
||||||
|
countingJobs := func(managerChan chan jobMessage, totalJobs, concurrentCheck int) (peakConcurrent, counterFailed int) {
|
||||||
|
counterEnded := 0
|
||||||
|
counterRunning := 0
|
||||||
|
peakConcurrent = 0
|
||||||
|
counterFailed = 0
|
||||||
|
for counterEnded < totalJobs {
|
||||||
|
msg := <-managerChan
|
||||||
|
switch msg.status {
|
||||||
|
case PreSyncing:
|
||||||
|
counterRunning++
|
||||||
|
case Syncing:
|
||||||
|
case Failed:
|
||||||
|
counterFailed++
|
||||||
|
fallthrough
|
||||||
|
case Success:
|
||||||
|
counterEnded++
|
||||||
|
counterRunning--
|
||||||
|
default:
|
||||||
|
So(0, ShouldEqual, 1)
|
||||||
|
}
|
||||||
|
// Test if semaphore works
|
||||||
|
So(counterRunning, ShouldBeLessThanOrEqualTo, concurrentCheck)
|
||||||
|
if counterRunning > peakConcurrent {
|
||||||
|
peakConcurrent = counterRunning
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// select {
|
||||||
|
// case msg := <-managerChan:
|
||||||
|
// logger.Errorf("extra message received: %v", msg)
|
||||||
|
// So(0, ShouldEqual, 1)
|
||||||
|
// case <-time.After(2 * time.Second):
|
||||||
|
// }
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
Convey("When we run them all", func(ctx C) {
|
||||||
|
for _, job := range jobs {
|
||||||
|
go job.Run(managerChan, semaphore)
|
||||||
|
job.ctrlChan <- jobStart
|
||||||
|
}
|
||||||
|
|
||||||
|
peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT, CONCURRENT-2)
|
||||||
|
|
||||||
|
So(peakConcurrent, ShouldEqual, CONCURRENT-2)
|
||||||
|
So(counterFailed, ShouldEqual, 0)
|
||||||
|
|
||||||
|
for _, job := range jobs {
|
||||||
|
job.ctrlChan <- jobDisable
|
||||||
|
<-job.disabled
|
||||||
|
}
|
||||||
|
})
|
||||||
|
Convey("If we cancel one job", func(ctx C) {
|
||||||
|
for _, job := range jobs {
|
||||||
|
go job.Run(managerChan, semaphore)
|
||||||
|
job.ctrlChan <- jobRestart
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cancel the one waiting for semaphore
|
||||||
|
jobs[len(jobs)-1].ctrlChan <- jobStop
|
||||||
|
|
||||||
|
peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT-1, CONCURRENT-2)
|
||||||
|
|
||||||
|
So(peakConcurrent, ShouldEqual, CONCURRENT-2)
|
||||||
|
So(counterFailed, ShouldEqual, 0)
|
||||||
|
|
||||||
|
for _, job := range jobs {
|
||||||
|
job.ctrlChan <- jobDisable
|
||||||
|
<-job.disabled
|
||||||
|
}
|
||||||
|
})
|
||||||
|
Convey("If we override the concurrent limit", func(ctx C) {
|
||||||
|
for _, job := range jobs {
|
||||||
|
go job.Run(managerChan, semaphore)
|
||||||
|
job.ctrlChan <- jobStart
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
jobs[len(jobs)-1].ctrlChan <- jobForceStart
|
||||||
|
jobs[len(jobs)-2].ctrlChan <- jobForceStart
|
||||||
|
|
||||||
|
peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT, CONCURRENT)
|
||||||
|
|
||||||
|
So(peakConcurrent, ShouldEqual, CONCURRENT)
|
||||||
|
So(counterFailed, ShouldEqual, 0)
|
||||||
|
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
|
||||||
|
// fmt.Println("Restart them")
|
||||||
|
|
||||||
|
for _, job := range jobs {
|
||||||
|
job.ctrlChan <- jobStart
|
||||||
|
}
|
||||||
|
|
||||||
|
peakConcurrent, counterFailed = countingJobs(managerChan, CONCURRENT, CONCURRENT-2)
|
||||||
|
|
||||||
|
So(peakConcurrent, ShouldEqual, CONCURRENT-2)
|
||||||
|
So(counterFailed, ShouldEqual, 0)
|
||||||
|
|
||||||
|
for _, job := range jobs {
|
||||||
|
job.ctrlChan <- jobDisable
|
||||||
|
<-job.disabled
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|||||||
@@ -130,6 +130,7 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
|
|||||||
logDir: logDir,
|
logDir: logDir,
|
||||||
logFile: filepath.Join(logDir, "latest.log"),
|
logFile: filepath.Join(logDir, "latest.log"),
|
||||||
useIPv6: mirror.UseIPv6,
|
useIPv6: mirror.UseIPv6,
|
||||||
|
useIPv4: mirror.UseIPv4,
|
||||||
interval: time.Duration(mirror.Interval) * time.Minute,
|
interval: time.Duration(mirror.Interval) * time.Minute,
|
||||||
}
|
}
|
||||||
p, err := newRsyncProvider(rc)
|
p, err := newRsyncProvider(rc)
|
||||||
|
|||||||
@@ -79,11 +79,12 @@ exit 0
|
|||||||
err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
|
err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
|
||||||
So(err, ShouldBeNil)
|
So(err, ShouldBeNil)
|
||||||
|
|
||||||
|
targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir())
|
||||||
expectedOutput := fmt.Sprintf(
|
expectedOutput := fmt.Sprintf(
|
||||||
"syncing to %s\n"+
|
"syncing to %s\n"+
|
||||||
"%s\n"+
|
"%s\n"+
|
||||||
"Done\n",
|
"Done\n",
|
||||||
provider.WorkingDir(),
|
targetDir,
|
||||||
fmt.Sprintf(
|
fmt.Sprintf(
|
||||||
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
|
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
|
||||||
"--delete --delete-after --delay-updates --safe-links "+
|
"--delete --delete-after --delay-updates --safe-links "+
|
||||||
@@ -144,11 +145,12 @@ exit 0
|
|||||||
err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
|
err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
|
||||||
So(err, ShouldBeNil)
|
So(err, ShouldBeNil)
|
||||||
|
|
||||||
|
targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir())
|
||||||
expectedOutput := fmt.Sprintf(
|
expectedOutput := fmt.Sprintf(
|
||||||
"syncing to %s\n"+
|
"syncing to %s\n"+
|
||||||
"%s\n"+
|
"%s\n"+
|
||||||
"Done\n",
|
"Done\n",
|
||||||
provider.WorkingDir(),
|
targetDir,
|
||||||
fmt.Sprintf(
|
fmt.Sprintf(
|
||||||
"%s %s -aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
|
"%s %s -aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
|
||||||
"--delete --delete-after --delay-updates --safe-links "+
|
"--delete --delete-after --delay-updates --safe-links "+
|
||||||
@@ -260,6 +262,40 @@ sleep 5
|
|||||||
|
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
Convey("Command Provider without log file should work", t, func(ctx C) {
|
||||||
|
tmpDir, err := ioutil.TempDir("", "tunasync")
|
||||||
|
defer os.RemoveAll(tmpDir)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
|
||||||
|
c := cmdConfig{
|
||||||
|
name: "run-ls",
|
||||||
|
upstreamURL: "http://mirrors.tuna.moe/",
|
||||||
|
command: "ls",
|
||||||
|
workingDir: tmpDir,
|
||||||
|
logDir: tmpDir,
|
||||||
|
logFile: "/dev/null",
|
||||||
|
interval: 600 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
provider, err := newCmdProvider(c)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
|
||||||
|
So(provider.IsMaster(), ShouldEqual, false)
|
||||||
|
So(provider.ZFS(), ShouldBeNil)
|
||||||
|
So(provider.Type(), ShouldEqual, provCommand)
|
||||||
|
So(provider.Name(), ShouldEqual, c.name)
|
||||||
|
So(provider.WorkingDir(), ShouldEqual, c.workingDir)
|
||||||
|
So(provider.LogDir(), ShouldEqual, c.logDir)
|
||||||
|
So(provider.LogFile(), ShouldEqual, c.logFile)
|
||||||
|
So(provider.Interval(), ShouldEqual, c.interval)
|
||||||
|
|
||||||
|
Convey("Run the command", func() {
|
||||||
|
|
||||||
|
err = provider.Run()
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
|
||||||
|
})
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTwoStageRsyncProvider(t *testing.T) {
|
func TestTwoStageRsyncProvider(t *testing.T) {
|
||||||
@@ -280,6 +316,8 @@ func TestTwoStageRsyncProvider(t *testing.T) {
|
|||||||
logFile: tmpFile,
|
logFile: tmpFile,
|
||||||
useIPv6: true,
|
useIPv6: true,
|
||||||
excludeFile: tmpFile,
|
excludeFile: tmpFile,
|
||||||
|
username: "hello",
|
||||||
|
password: "world",
|
||||||
}
|
}
|
||||||
|
|
||||||
provider, err := newTwoStageRsyncProvider(c)
|
provider, err := newTwoStageRsyncProvider(c)
|
||||||
@@ -306,6 +344,7 @@ exit 0
|
|||||||
err = provider.Run()
|
err = provider.Run()
|
||||||
So(err, ShouldBeNil)
|
So(err, ShouldBeNil)
|
||||||
|
|
||||||
|
targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir())
|
||||||
expectedOutput := fmt.Sprintf(
|
expectedOutput := fmt.Sprintf(
|
||||||
"syncing to %s\n"+
|
"syncing to %s\n"+
|
||||||
"%s\n"+
|
"%s\n"+
|
||||||
@@ -313,14 +352,14 @@ exit 0
|
|||||||
"syncing to %s\n"+
|
"syncing to %s\n"+
|
||||||
"%s\n"+
|
"%s\n"+
|
||||||
"Done\n",
|
"Done\n",
|
||||||
provider.WorkingDir(),
|
targetDir,
|
||||||
fmt.Sprintf(
|
fmt.Sprintf(
|
||||||
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+
|
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+
|
||||||
"--timeout=120 --contimeout=120 --exclude dists/ -6 "+
|
"--timeout=120 --contimeout=120 --exclude dists/ -6 "+
|
||||||
"--exclude-from %s %s %s",
|
"--exclude-from %s %s %s",
|
||||||
provider.excludeFile, provider.upstreamURL, provider.WorkingDir(),
|
provider.excludeFile, provider.upstreamURL, provider.WorkingDir(),
|
||||||
),
|
),
|
||||||
provider.WorkingDir(),
|
targetDir,
|
||||||
fmt.Sprintf(
|
fmt.Sprintf(
|
||||||
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
|
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
|
||||||
"--delete --delete-after --delay-updates --safe-links "+
|
"--delete --delete-after --delay-updates --safe-links "+
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ type rsyncConfig struct {
|
|||||||
rsyncCmd string
|
rsyncCmd string
|
||||||
upstreamURL, username, password, excludeFile string
|
upstreamURL, username, password, excludeFile string
|
||||||
workingDir, logDir, logFile string
|
workingDir, logDir, logFile string
|
||||||
useIPv6 bool
|
useIPv6, useIPv4 bool
|
||||||
interval time.Duration
|
interval time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -49,6 +49,8 @@ func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
|
|||||||
|
|
||||||
if c.useIPv6 {
|
if c.useIPv6 {
|
||||||
options = append(options, "-6")
|
options = append(options, "-6")
|
||||||
|
} else if c.useIPv4 {
|
||||||
|
options = append(options, "-4")
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.excludeFile != "" {
|
if c.excludeFile != "" {
|
||||||
@@ -79,6 +81,12 @@ func (p *rsyncProvider) Run() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *rsyncProvider) Start() error {
|
func (p *rsyncProvider) Start() error {
|
||||||
|
p.Lock()
|
||||||
|
defer p.Unlock()
|
||||||
|
|
||||||
|
if p.IsRunning() {
|
||||||
|
return errors.New("provider is currently running")
|
||||||
|
}
|
||||||
|
|
||||||
env := map[string]string{}
|
env := map[string]string{}
|
||||||
if p.username != "" {
|
if p.username != "" {
|
||||||
@@ -92,7 +100,7 @@ func (p *rsyncProvider) Start() error {
|
|||||||
command = append(command, p.upstreamURL, p.WorkingDir())
|
command = append(command, p.upstreamURL, p.WorkingDir())
|
||||||
|
|
||||||
p.cmd = newCmdJob(p, command, p.WorkingDir(), env)
|
p.cmd = newCmdJob(p, command, p.WorkingDir(), env)
|
||||||
if err := p.prepareLogFile(); err != nil {
|
if err := p.prepareLogFile(false); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -41,13 +41,17 @@ func newCmdJob(provider mirrorProvider, cmdAndArgs []string, workingDir string,
|
|||||||
"--name", d.Name(),
|
"--name", d.Name(),
|
||||||
"-w", workingDir,
|
"-w", workingDir,
|
||||||
}
|
}
|
||||||
|
// specify user
|
||||||
|
args = append(
|
||||||
|
args, "-u",
|
||||||
|
fmt.Sprintf("%d:%d", os.Getuid(), os.Getgid()),
|
||||||
|
)
|
||||||
// add volumes
|
// add volumes
|
||||||
for _, vol := range d.Volumes() {
|
for _, vol := range d.Volumes() {
|
||||||
logger.Debugf("volume: %s", vol)
|
logger.Debugf("volume: %s", vol)
|
||||||
args = append(args, "-v", vol)
|
args = append(args, "-v", vol)
|
||||||
}
|
}
|
||||||
// set env
|
// set env
|
||||||
env["TUNASYNC_LOG_FILE"] = d.LogFile()
|
|
||||||
for k, v := range env {
|
for k, v := range env {
|
||||||
kv := fmt.Sprintf("%s=%s", k, v)
|
kv := fmt.Sprintf("%s=%s", k, v)
|
||||||
args = append(args, "-e", kv)
|
args = append(args, "-e", kv)
|
||||||
@@ -114,6 +118,9 @@ func (c *cmdJob) Wait() error {
|
|||||||
return c.retErr
|
return c.retErr
|
||||||
default:
|
default:
|
||||||
err := c.cmd.Wait()
|
err := c.cmd.Wait()
|
||||||
|
if c.cmd.Stdout != nil {
|
||||||
|
c.cmd.Stdout.(*os.File).Close()
|
||||||
|
}
|
||||||
c.retErr = err
|
c.retErr = err
|
||||||
close(c.finished)
|
close(c.finished)
|
||||||
return err
|
return err
|
||||||
|
|||||||
@@ -108,7 +108,12 @@ func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *twoStageRsyncProvider) Run() error {
|
func (p *twoStageRsyncProvider) Run() error {
|
||||||
defer p.Wait()
|
p.Lock()
|
||||||
|
defer p.Unlock()
|
||||||
|
|
||||||
|
if p.IsRunning() {
|
||||||
|
return errors.New("provider is currently running")
|
||||||
|
}
|
||||||
|
|
||||||
env := map[string]string{}
|
env := map[string]string{}
|
||||||
if p.username != "" {
|
if p.username != "" {
|
||||||
@@ -129,7 +134,7 @@ func (p *twoStageRsyncProvider) Run() error {
|
|||||||
command = append(command, p.upstreamURL, p.WorkingDir())
|
command = append(command, p.upstreamURL, p.WorkingDir())
|
||||||
|
|
||||||
p.cmd = newCmdJob(p, command, p.WorkingDir(), env)
|
p.cmd = newCmdJob(p, command, p.WorkingDir(), env)
|
||||||
if err := p.prepareLogFile(); err != nil {
|
if err := p.prepareLogFile(stage > 1); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -137,9 +142,11 @@ func (p *twoStageRsyncProvider) Run() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
p.isRunning.Store(true)
|
p.isRunning.Store(true)
|
||||||
|
logger.Debugf("set isRunning to true: %s", p.Name())
|
||||||
|
|
||||||
err = p.cmd.Wait()
|
p.Unlock()
|
||||||
p.isRunning.Store(false)
|
err = p.Wait()
|
||||||
|
p.Lock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -219,7 +219,11 @@ func (w *Worker) makeHTTPServer() {
|
|||||||
}
|
}
|
||||||
switch cmd.Cmd {
|
switch cmd.Cmd {
|
||||||
case CmdStart:
|
case CmdStart:
|
||||||
job.ctrlChan <- jobStart
|
if cmd.Options["force"] {
|
||||||
|
job.ctrlChan <- jobForceStart
|
||||||
|
} else {
|
||||||
|
job.ctrlChan <- jobStart
|
||||||
|
}
|
||||||
case CmdRestart:
|
case CmdRestart:
|
||||||
job.ctrlChan <- jobRestart
|
job.ctrlChan <- jobRestart
|
||||||
case CmdStop:
|
case CmdStop:
|
||||||
|
|||||||
在新工单中引用
屏蔽一个用户