镜像自地址
https://github.com/tuna/tunasync.git
已同步 2025-12-06 22:46:47 +00:00
比较提交
50 次代码提交
travis-upl
...
v0.3.2
| 作者 | SHA1 | 提交日期 | |
|---|---|---|---|
|
|
23bf4890cc | ||
|
|
2f6a61aee5 | ||
|
|
b6043142e1 | ||
|
|
6241576b12 | ||
|
|
ef78563b8c | ||
|
|
ca106f1360 | ||
|
|
628266ac5a | ||
|
|
7e601d9fff | ||
|
|
c750aa1871 | ||
|
|
6cbe91b4f1 | ||
|
|
89a792986d | ||
|
|
0fdb07d061 | ||
|
|
c5bb172f99 | ||
|
|
79e6167028 | ||
|
|
285ffb2f2f | ||
|
|
95bb4bbd5e | ||
|
|
6bca9d2cd5 | ||
|
|
4fe7d03e54 | ||
|
|
1fe9499728 | ||
|
|
a475b044c6 | ||
|
|
a50a360a91 | ||
|
|
d536aca2ac | ||
|
|
28545d61e7 | ||
|
|
a87fb0f8b4 | ||
|
|
095e7c6320 | ||
|
|
7b441312f4 | ||
|
|
93194cde2e | ||
|
|
aa4c31a32b | ||
|
|
4c6a407c17 | ||
|
|
939abaef9b | ||
|
|
d5a438462f | ||
|
|
d4e07a7b29 | ||
|
|
9ac3193d50 | ||
|
|
9ffb101cc7 | ||
|
|
fd277388d5 | ||
|
|
c5cba66786 | ||
|
|
97e9725774 | ||
|
|
54740388b3 | ||
|
|
7601e5793f | ||
|
|
9645fd44ec | ||
|
|
ebd462be36 | ||
|
|
21c832c8fb | ||
|
|
81a15e7dd1 | ||
|
|
3f31e83c14 | ||
|
|
a0b8ef08ab | ||
|
|
86153c59e3 | ||
|
|
96f9db8bb8 | ||
|
|
6dd06c954c | ||
|
|
03d22b7683 | ||
|
|
e9a7fc2de2 |
12
.travis.yml
12
.travis.yml
@@ -1,6 +1,8 @@
|
||||
sudo: required
|
||||
|
||||
language: go
|
||||
go:
|
||||
- 1.6
|
||||
- 1.8
|
||||
|
||||
before_install:
|
||||
- sudo apt-get install cgroup-bin
|
||||
@@ -11,8 +13,14 @@ before_install:
|
||||
os:
|
||||
- linux
|
||||
|
||||
services:
|
||||
- docker
|
||||
|
||||
before_script:
|
||||
- sudo cgcreate -t travis -a travis -g memory:tunasync
|
||||
- lssubsys -am
|
||||
- sudo cgcreate -a $USER -t $USER -g cpu:tunasync
|
||||
- sudo cgcreate -a $USER -t $USER -g memory:tunasync
|
||||
- docker pull alpine
|
||||
|
||||
script:
|
||||
- ./.testandcover.bash
|
||||
|
||||
@@ -10,13 +10,16 @@ tunasync
|
||||
|
||||
- [中文文档](https://github.com/tuna/tunasync/blob/master/docs/zh_CN/get_started.md)
|
||||
|
||||
## Download
|
||||
|
||||
Pre-built binary for Linux x86_64 is available at [Github releases](https://github.com/tuna/tunasync/releases/latest).
|
||||
|
||||
## Design
|
||||
|
||||
```
|
||||
# Architecture
|
||||
|
||||
- Manager: Centural instance on status and job management
|
||||
- Manager: Central instance for status and job management
|
||||
- Worker: Runs mirror jobs
|
||||
|
||||
+------------+ +---+ +---+
|
||||
@@ -44,12 +47,12 @@ PreSyncing Syncing Success
|
||||
| |
|
||||
| +-----------------+ | Failed
|
||||
+------+ post-fail |<---------+
|
||||
+-----------------+
|
||||
+-----------------+
|
||||
```
|
||||
|
||||
## Generate Self-Signed Certificate
|
||||
|
||||
Fisrt, create root CA
|
||||
First, create root CA
|
||||
|
||||
```
|
||||
openssl genrsa -out rootCA.key 2048
|
||||
|
||||
@@ -134,7 +134,7 @@ func main() {
|
||||
app.Name = "tunasync"
|
||||
app.Usage = "tunasync mirror job management tool"
|
||||
app.EnableBashCompletion = true
|
||||
app.Version = "0.1"
|
||||
app.Version = tunasync.Version
|
||||
app.Commands = []cli.Command{
|
||||
{
|
||||
Name: "manager",
|
||||
|
||||
@@ -99,8 +99,11 @@ func initialize(c *cli.Context) error {
|
||||
}
|
||||
|
||||
// parse base url of the manager server
|
||||
baseURL = fmt.Sprintf("https://%s:%d",
|
||||
cfg.ManagerAddr, cfg.ManagerPort)
|
||||
if cfg.CACert != "" {
|
||||
baseURL = fmt.Sprintf("https://%s:%d", cfg.ManagerAddr, cfg.ManagerPort)
|
||||
} else {
|
||||
baseURL = fmt.Sprintf("http://%s:%d", cfg.ManagerAddr, cfg.ManagerPort)
|
||||
}
|
||||
|
||||
logger.Infof("Use manager address: %s", baseURL)
|
||||
|
||||
@@ -137,9 +140,9 @@ func listWorkers(c *cli.Context) error {
|
||||
}
|
||||
|
||||
func listJobs(c *cli.Context) error {
|
||||
// FIXME: there should be an API on manager server side that return MirrorStatus list to tunasynctl
|
||||
var jobs []tunasync.MirrorStatus
|
||||
var genericJobs interface{}
|
||||
if c.Bool("all") {
|
||||
var jobs []tunasync.WebMirrorStatus
|
||||
_, err := tunasync.GetJSON(baseURL+listJobsPath, &jobs, client)
|
||||
if err != nil {
|
||||
return cli.NewExitError(
|
||||
@@ -147,8 +150,10 @@ func listJobs(c *cli.Context) error {
|
||||
"of all jobs from manager server: %s", err.Error()),
|
||||
1)
|
||||
}
|
||||
genericJobs = jobs
|
||||
|
||||
} else {
|
||||
var jobs []tunasync.MirrorStatus
|
||||
args := c.Args()
|
||||
if len(args) == 0 {
|
||||
return cli.NewExitError(
|
||||
@@ -171,9 +176,10 @@ func listJobs(c *cli.Context) error {
|
||||
for range args {
|
||||
jobs = append(jobs, <-ans...)
|
||||
}
|
||||
genericJobs = jobs
|
||||
}
|
||||
|
||||
b, err := json.MarshalIndent(jobs, "", " ")
|
||||
b, err := json.MarshalIndent(genericJobs, "", " ")
|
||||
if err != nil {
|
||||
return cli.NewExitError(
|
||||
fmt.Sprintf("Error printing out informations: %s", err.Error()),
|
||||
@@ -183,6 +189,103 @@ func listJobs(c *cli.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func updateMirrorSize(c *cli.Context) error {
|
||||
args := c.Args()
|
||||
if len(args) != 2 {
|
||||
return cli.NewExitError("Usage: tunasynctl -w <worker-id> <mirror> <size>", 1)
|
||||
}
|
||||
workerID := c.String("worker")
|
||||
mirrorID := args.Get(0)
|
||||
mirrorSize := args.Get(1)
|
||||
|
||||
msg := struct {
|
||||
Name string `json:"name"`
|
||||
Size string `json:"size"`
|
||||
}{
|
||||
Name: mirrorID,
|
||||
Size: mirrorSize,
|
||||
}
|
||||
|
||||
url := fmt.Sprintf(
|
||||
"%s/workers/%s/jobs/%s/size", baseURL, workerID, mirrorID,
|
||||
)
|
||||
|
||||
resp, err := tunasync.PostJSON(url, msg, client)
|
||||
if err != nil {
|
||||
return cli.NewExitError(
|
||||
fmt.Sprintf("Failed to send request to manager: %s",
|
||||
err.Error()),
|
||||
1)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
body, _ := ioutil.ReadAll(resp.Body)
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return cli.NewExitError(
|
||||
fmt.Sprintf("Manager failed to update mirror size: %s", body), 1,
|
||||
)
|
||||
}
|
||||
|
||||
var status tunasync.MirrorStatus
|
||||
json.Unmarshal(body, &status)
|
||||
if status.Size != mirrorSize {
|
||||
return cli.NewExitError(
|
||||
fmt.Sprintf(
|
||||
"Mirror size error, expecting %s, manager returned %s",
|
||||
mirrorSize, status.Size,
|
||||
), 1,
|
||||
)
|
||||
}
|
||||
|
||||
logger.Infof("Successfully updated mirror size to %s", mirrorSize)
|
||||
return nil
|
||||
}
|
||||
|
||||
func removeWorker(c *cli.Context) error {
|
||||
args := c.Args()
|
||||
if len(args) != 0 {
|
||||
return cli.NewExitError("Usage: tunasynctl -w <worker-id>", 1)
|
||||
}
|
||||
workerID := c.String("worker")
|
||||
if len(workerID) == 0 {
|
||||
return cli.NewExitError("Please specify the <worker-id>", 1)
|
||||
}
|
||||
url := fmt.Sprintf("%s/workers/%s", baseURL, workerID)
|
||||
|
||||
req, err := http.NewRequest("DELETE", url, nil)
|
||||
if err != nil {
|
||||
logger.Panicf("Invalid HTTP Request: %s", err.Error())
|
||||
}
|
||||
resp, err := client.Do(req)
|
||||
|
||||
if err != nil {
|
||||
return cli.NewExitError(
|
||||
fmt.Sprintf("Failed to send request to manager: %s", err.Error()), 1)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return cli.NewExitError(
|
||||
fmt.Sprintf("Failed to parse response: %s", err.Error()),
|
||||
1)
|
||||
}
|
||||
|
||||
return cli.NewExitError(fmt.Sprintf("Failed to correctly send"+
|
||||
" command: HTTP status code is not 200: %s", body),
|
||||
1)
|
||||
}
|
||||
|
||||
res := map[string]string{}
|
||||
err = json.NewDecoder(resp.Body).Decode(&res)
|
||||
if res["message"] == "deleted" {
|
||||
logger.Info("Successfully removed the worker")
|
||||
} else {
|
||||
logger.Info("Failed to remove the worker")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func flushDisabledJobs(c *cli.Context) error {
|
||||
req, err := http.NewRequest("DELETE", baseURL+flushDisabledPath, nil)
|
||||
if err != nil {
|
||||
@@ -232,11 +335,16 @@ func cmdJob(cmd tunasync.CmdVerb) cli.ActionFunc {
|
||||
"argument WORKER", 1)
|
||||
}
|
||||
|
||||
options := map[string]bool{}
|
||||
if c.Bool("force") {
|
||||
options["force"] = true
|
||||
}
|
||||
cmd := tunasync.ClientCmd{
|
||||
Cmd: cmd,
|
||||
MirrorID: mirrorID,
|
||||
WorkerID: c.String("worker"),
|
||||
Args: argsList,
|
||||
Options: options,
|
||||
}
|
||||
resp, err := tunasync.PostJSON(baseURL+cmdPath, cmd, client)
|
||||
if err != nil {
|
||||
@@ -322,7 +430,7 @@ func main() {
|
||||
|
||||
app := cli.NewApp()
|
||||
app.EnableBashCompletion = true
|
||||
app.Version = "0.1"
|
||||
app.Version = tunasync.Version
|
||||
app.Name = "tunasynctl"
|
||||
app.Usage = "control client for tunasync manager"
|
||||
|
||||
@@ -357,6 +465,11 @@ func main() {
|
||||
},
|
||||
}
|
||||
|
||||
forceStartFlag := cli.BoolFlag{
|
||||
Name: "force, f",
|
||||
Usage: "Override the concurrent limit",
|
||||
}
|
||||
|
||||
app.Commands = []cli.Command{
|
||||
{
|
||||
Name: "list",
|
||||
@@ -382,10 +495,34 @@ func main() {
|
||||
Flags: commonFlags,
|
||||
Action: initializeWrapper(listWorkers),
|
||||
},
|
||||
{
|
||||
Name: "rm-worker",
|
||||
Usage: "Remove a worker",
|
||||
Flags: append(
|
||||
commonFlags,
|
||||
cli.StringFlag{
|
||||
Name: "worker, w",
|
||||
Usage: "worker-id of the worker to be removed",
|
||||
},
|
||||
),
|
||||
Action: initializeWrapper(removeWorker),
|
||||
},
|
||||
{
|
||||
Name: "set-size",
|
||||
Usage: "Set mirror size",
|
||||
Flags: append(
|
||||
commonFlags,
|
||||
cli.StringFlag{
|
||||
Name: "worker, w",
|
||||
Usage: "specify worker-id of the mirror job",
|
||||
},
|
||||
),
|
||||
Action: initializeWrapper(updateMirrorSize),
|
||||
},
|
||||
{
|
||||
Name: "start",
|
||||
Usage: "Start a job",
|
||||
Flags: append(commonFlags, cmdFlags...),
|
||||
Flags: append(append(commonFlags, cmdFlags...), forceStartFlag),
|
||||
Action: initializeWrapper(cmdJob(tunasync.CmdStart)),
|
||||
},
|
||||
{
|
||||
|
||||
@@ -15,7 +15,7 @@ date: 2016-10-31 00:50:00
|
||||
|
||||
### 二进制包
|
||||
|
||||
TODO
|
||||
到 [Github Releases](https://github.com/tuna/tunasync/releases/latest) 下载 `tunasync-linux-bin.tar.gz` 即可。
|
||||
|
||||
### 自行编译
|
||||
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// A StatusUpdateMsg represents a msg when
|
||||
// A MirrorStatus represents a msg when
|
||||
// a worker has done syncing
|
||||
type MirrorStatus struct {
|
||||
Name string `json:"name"`
|
||||
@@ -13,6 +13,7 @@ type MirrorStatus struct {
|
||||
IsMaster bool `json:"is_master"`
|
||||
Status SyncStatus `json:"status"`
|
||||
LastUpdate time.Time `json:"last_update"`
|
||||
LastEnded time.Time `json:"last_ended"`
|
||||
Upstream string `json:"upstream"`
|
||||
Size string `json:"size"`
|
||||
ErrorMsg string `json:"error_msg"`
|
||||
@@ -67,9 +68,10 @@ func (c CmdVerb) String() string {
|
||||
// A WorkerCmd is the command message send from the
|
||||
// manager to a worker
|
||||
type WorkerCmd struct {
|
||||
Cmd CmdVerb `json:"cmd"`
|
||||
MirrorID string `json:"mirror_id"`
|
||||
Args []string `json:"args"`
|
||||
Cmd CmdVerb `json:"cmd"`
|
||||
MirrorID string `json:"mirror_id"`
|
||||
Args []string `json:"args"`
|
||||
Options map[string]bool `json:"options"`
|
||||
}
|
||||
|
||||
func (c WorkerCmd) String() string {
|
||||
@@ -82,8 +84,9 @@ func (c WorkerCmd) String() string {
|
||||
// A ClientCmd is the command message send from client
|
||||
// to the manager
|
||||
type ClientCmd struct {
|
||||
Cmd CmdVerb `json:"cmd"`
|
||||
MirrorID string `json:"mirror_id"`
|
||||
WorkerID string `json:"worker_id"`
|
||||
Args []string `json:"args"`
|
||||
Cmd CmdVerb `json:"cmd"`
|
||||
MirrorID string `json:"mirror_id"`
|
||||
WorkerID string `json:"worker_id"`
|
||||
Args []string `json:"args"`
|
||||
Options map[string]bool `json:"options"`
|
||||
}
|
||||
|
||||
@@ -1,11 +1,9 @@
|
||||
package manager
|
||||
package internal
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
. "github.com/tuna/tunasync/internal"
|
||||
)
|
||||
|
||||
type textTime struct {
|
||||
@@ -38,24 +36,28 @@ func (t *stampTime) UnmarshalJSON(b []byte) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// webMirrorStatus is the mirror status to be shown in the web page
|
||||
type webMirrorStatus struct {
|
||||
// WebMirrorStatus is the mirror status to be shown in the web page
|
||||
type WebMirrorStatus struct {
|
||||
Name string `json:"name"`
|
||||
IsMaster bool `json:"is_master"`
|
||||
Status SyncStatus `json:"status"`
|
||||
LastUpdate textTime `json:"last_update"`
|
||||
LastUpdateTs stampTime `json:"last_update_ts"`
|
||||
LastEnded textTime `json:"last_ended"`
|
||||
LastEndedTs stampTime `json:"last_ended_ts"`
|
||||
Upstream string `json:"upstream"`
|
||||
Size string `json:"size"` // approximate size
|
||||
}
|
||||
|
||||
func convertMirrorStatus(m MirrorStatus) webMirrorStatus {
|
||||
return webMirrorStatus{
|
||||
func BuildWebMirrorStatus(m MirrorStatus) WebMirrorStatus {
|
||||
return WebMirrorStatus{
|
||||
Name: m.Name,
|
||||
IsMaster: m.IsMaster,
|
||||
Status: m.Status,
|
||||
LastUpdate: textTime{m.LastUpdate},
|
||||
LastUpdateTs: stampTime{m.LastUpdate},
|
||||
LastEnded: textTime{m.LastEnded},
|
||||
LastEndedTs: stampTime{m.LastEnded},
|
||||
Upstream: m.Upstream,
|
||||
Size: m.Size,
|
||||
}
|
||||
76
internal/status_web_test.go
普通文件
76
internal/status_web_test.go
普通文件
@@ -0,0 +1,76 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
)
|
||||
|
||||
func TestStatus(t *testing.T) {
|
||||
Convey("status json ser-de should work", t, func() {
|
||||
tz := "Asia/Tokyo"
|
||||
loc, err := time.LoadLocation(tz)
|
||||
So(err, ShouldBeNil)
|
||||
t := time.Date(2016, time.April, 16, 23, 8, 10, 0, loc)
|
||||
m := WebMirrorStatus{
|
||||
Name: "tunalinux",
|
||||
Status: Success,
|
||||
LastUpdate: textTime{t},
|
||||
LastUpdateTs: stampTime{t},
|
||||
LastEnded: textTime{t},
|
||||
LastEndedTs: stampTime{t},
|
||||
Size: "5GB",
|
||||
Upstream: "rsync://mirrors.tuna.tsinghua.edu.cn/tunalinux/",
|
||||
}
|
||||
|
||||
b, err := json.Marshal(m)
|
||||
So(err, ShouldBeNil)
|
||||
//fmt.Println(string(b))
|
||||
var m2 WebMirrorStatus
|
||||
err = json.Unmarshal(b, &m2)
|
||||
So(err, ShouldBeNil)
|
||||
// fmt.Printf("%#v", m2)
|
||||
So(m2.Name, ShouldEqual, m.Name)
|
||||
So(m2.Status, ShouldEqual, m.Status)
|
||||
So(m2.LastUpdate.Unix(), ShouldEqual, m.LastUpdate.Unix())
|
||||
So(m2.LastUpdateTs.Unix(), ShouldEqual, m.LastUpdate.Unix())
|
||||
So(m2.LastUpdate.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
|
||||
So(m2.LastUpdateTs.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
|
||||
So(m2.LastEnded.Unix(), ShouldEqual, m.LastEnded.Unix())
|
||||
So(m2.LastEndedTs.Unix(), ShouldEqual, m.LastEnded.Unix())
|
||||
So(m2.LastEnded.UnixNano(), ShouldEqual, m.LastEnded.UnixNano())
|
||||
So(m2.LastEndedTs.UnixNano(), ShouldEqual, m.LastEnded.UnixNano())
|
||||
So(m2.Size, ShouldEqual, m.Size)
|
||||
So(m2.Upstream, ShouldEqual, m.Upstream)
|
||||
})
|
||||
Convey("BuildWebMirrorStatus should work", t, func() {
|
||||
m := MirrorStatus{
|
||||
Name: "arch-sync3",
|
||||
Worker: "testWorker",
|
||||
IsMaster: true,
|
||||
Status: Failed,
|
||||
LastUpdate: time.Now().Add(-time.Minute * 30),
|
||||
LastEnded: time.Now(),
|
||||
Upstream: "mirrors.tuna.tsinghua.edu.cn",
|
||||
Size: "4GB",
|
||||
}
|
||||
|
||||
var m2 WebMirrorStatus
|
||||
m2 = BuildWebMirrorStatus(m)
|
||||
// fmt.Printf("%#v", m2)
|
||||
So(m2.Name, ShouldEqual, m.Name)
|
||||
So(m2.Status, ShouldEqual, m.Status)
|
||||
So(m2.LastUpdate.Unix(), ShouldEqual, m.LastUpdate.Unix())
|
||||
So(m2.LastUpdateTs.Unix(), ShouldEqual, m.LastUpdate.Unix())
|
||||
So(m2.LastUpdate.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
|
||||
So(m2.LastUpdateTs.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
|
||||
So(m2.LastEnded.Unix(), ShouldEqual, m.LastEnded.Unix())
|
||||
So(m2.LastEndedTs.Unix(), ShouldEqual, m.LastEnded.Unix())
|
||||
So(m2.LastEnded.UnixNano(), ShouldEqual, m.LastEnded.UnixNano())
|
||||
So(m2.LastEndedTs.UnixNano(), ShouldEqual, m.LastEnded.UnixNano())
|
||||
So(m2.Size, ShouldEqual, m.Size)
|
||||
So(m2.Upstream, ShouldEqual, m.Upstream)
|
||||
})
|
||||
}
|
||||
3
internal/version.go
普通文件
3
internal/version.go
普通文件
@@ -0,0 +1,3 @@
|
||||
package internal
|
||||
|
||||
const Version string = "0.3.2"
|
||||
@@ -14,6 +14,7 @@ type dbAdapter interface {
|
||||
Init() error
|
||||
ListWorkers() ([]WorkerStatus, error)
|
||||
GetWorker(workerID string) (WorkerStatus, error)
|
||||
DeleteWorker(workerID string) error
|
||||
CreateWorker(w WorkerStatus) (WorkerStatus, error)
|
||||
UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error)
|
||||
GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error)
|
||||
@@ -95,6 +96,19 @@ func (b *boltAdapter) GetWorker(workerID string) (w WorkerStatus, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (b *boltAdapter) DeleteWorker(workerID string) (err error) {
|
||||
err = b.db.Update(func(tx *bolt.Tx) error {
|
||||
bucket := tx.Bucket([]byte(_workerBucketKey))
|
||||
v := bucket.Get([]byte(workerID))
|
||||
if v == nil {
|
||||
return fmt.Errorf("invalid workerID %s", workerID)
|
||||
}
|
||||
err := bucket.Delete([]byte(workerID))
|
||||
return err
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
|
||||
err := b.db.Update(func(tx *bolt.Tx) error {
|
||||
bucket := tx.Bucket([]byte(_workerBucketKey))
|
||||
@@ -125,7 +139,7 @@ func (b *boltAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus
|
||||
bucket := tx.Bucket([]byte(_statusBucketKey))
|
||||
v := bucket.Get([]byte(id))
|
||||
if v == nil {
|
||||
return fmt.Errorf("no mirror %s exists in worker %s", mirrorID, workerID)
|
||||
return fmt.Errorf("no mirror '%s' exists in worker '%s'", mirrorID, workerID)
|
||||
}
|
||||
err := json.Unmarshal(v, &m)
|
||||
return err
|
||||
@@ -182,7 +196,7 @@ func (b *boltAdapter) FlushDisabledJobs() (err error) {
|
||||
err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
|
||||
continue
|
||||
}
|
||||
if m.Status == Disabled {
|
||||
if m.Status == Disabled || len(m.Name) == 0 {
|
||||
err = c.Delete()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,21 +40,39 @@ func TestBoltAdapter(t *testing.T) {
|
||||
So(err, ShouldBeNil)
|
||||
}
|
||||
|
||||
Convey("get exists worker", func() {
|
||||
Convey("get existent worker", func() {
|
||||
_, err := boltDB.GetWorker(testWorkerIDs[0])
|
||||
So(err, ShouldBeNil)
|
||||
})
|
||||
|
||||
Convey("list exist worker", func() {
|
||||
Convey("list existent workers", func() {
|
||||
ws, err := boltDB.ListWorkers()
|
||||
So(err, ShouldBeNil)
|
||||
So(len(ws), ShouldEqual, 2)
|
||||
})
|
||||
|
||||
Convey("get inexist worker", func() {
|
||||
Convey("get non-existent worker", func() {
|
||||
_, err := boltDB.GetWorker("invalid workerID")
|
||||
So(err, ShouldNotBeNil)
|
||||
})
|
||||
|
||||
Convey("delete existent worker", func() {
|
||||
err := boltDB.DeleteWorker(testWorkerIDs[0])
|
||||
So(err, ShouldBeNil)
|
||||
_, err = boltDB.GetWorker(testWorkerIDs[0])
|
||||
So(err, ShouldNotBeNil)
|
||||
ws, err := boltDB.ListWorkers()
|
||||
So(err, ShouldBeNil)
|
||||
So(len(ws), ShouldEqual, 1)
|
||||
})
|
||||
|
||||
Convey("delete non-existent worker", func() {
|
||||
err := boltDB.DeleteWorker("invalid workerID")
|
||||
So(err, ShouldNotBeNil)
|
||||
ws, err := boltDB.ListWorkers()
|
||||
So(err, ShouldBeNil)
|
||||
So(len(ws), ShouldEqual, 2)
|
||||
})
|
||||
})
|
||||
|
||||
Convey("update mirror status", func() {
|
||||
@@ -65,6 +83,7 @@ func TestBoltAdapter(t *testing.T) {
|
||||
IsMaster: true,
|
||||
Status: Success,
|
||||
LastUpdate: time.Now(),
|
||||
LastEnded: time.Now(),
|
||||
Upstream: "mirrors.tuna.tsinghua.edu.cn",
|
||||
Size: "3GB",
|
||||
},
|
||||
@@ -73,7 +92,8 @@ func TestBoltAdapter(t *testing.T) {
|
||||
Worker: testWorkerIDs[1],
|
||||
IsMaster: true,
|
||||
Status: Disabled,
|
||||
LastUpdate: time.Now(),
|
||||
LastUpdate: time.Now().Add(-time.Hour),
|
||||
LastEnded: time.Now(),
|
||||
Upstream: "mirrors.tuna.tsinghua.edu.cn",
|
||||
Size: "4GB",
|
||||
},
|
||||
@@ -82,7 +102,8 @@ func TestBoltAdapter(t *testing.T) {
|
||||
Worker: testWorkerIDs[1],
|
||||
IsMaster: true,
|
||||
Status: Success,
|
||||
LastUpdate: time.Now(),
|
||||
LastUpdate: time.Now().Add(-time.Second),
|
||||
LastEnded: time.Now(),
|
||||
Upstream: "mirrors.tuna.tsinghua.edu.cn",
|
||||
Size: "4GB",
|
||||
},
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package manager
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
@@ -83,10 +84,13 @@ func GetTUNASyncManager(cfg *Config) *Manager {
|
||||
// workerID should be valid in this route group
|
||||
workerValidateGroup := s.engine.Group("/workers", s.workerIDValidator)
|
||||
{
|
||||
// delete specified worker
|
||||
workerValidateGroup.DELETE(":id", s.deleteWorker)
|
||||
// get job list
|
||||
workerValidateGroup.GET(":id/jobs", s.listJobsOfWorker)
|
||||
// post job status
|
||||
workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker)
|
||||
workerValidateGroup.POST(":id/jobs/:job/size", s.updateMirrorSize)
|
||||
}
|
||||
|
||||
// for tunasynctl to post commands
|
||||
@@ -133,11 +137,11 @@ func (s *Manager) listAllJobs(c *gin.Context) {
|
||||
s.returnErrJSON(c, http.StatusInternalServerError, err)
|
||||
return
|
||||
}
|
||||
webMirStatusList := []webMirrorStatus{}
|
||||
webMirStatusList := []WebMirrorStatus{}
|
||||
for _, m := range mirrorStatusList {
|
||||
webMirStatusList = append(
|
||||
webMirStatusList,
|
||||
convertMirrorStatus(m),
|
||||
BuildWebMirrorStatus(m),
|
||||
)
|
||||
}
|
||||
c.JSON(http.StatusOK, webMirStatusList)
|
||||
@@ -157,6 +161,22 @@ func (s *Manager) flushDisabledJobs(c *gin.Context) {
|
||||
c.JSON(http.StatusOK, gin.H{_infoKey: "flushed"})
|
||||
}
|
||||
|
||||
// deleteWorker deletes one worker by id
|
||||
func (s *Manager) deleteWorker(c *gin.Context) {
|
||||
workerID := c.Param("id")
|
||||
err := s.adapter.DeleteWorker(workerID)
|
||||
if err != nil {
|
||||
err := fmt.Errorf("failed to delete worker: %s",
|
||||
err.Error(),
|
||||
)
|
||||
c.Error(err)
|
||||
s.returnErrJSON(c, http.StatusInternalServerError, err)
|
||||
return
|
||||
}
|
||||
logger.Noticef("Worker <%s> deleted", workerID)
|
||||
c.JSON(http.StatusOK, gin.H{_infoKey: "deleted"})
|
||||
}
|
||||
|
||||
// listWrokers respond with informations of all the workers
|
||||
func (s *Manager) listWorkers(c *gin.Context) {
|
||||
var workerInfos []WorkerStatus
|
||||
@@ -225,6 +245,12 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) {
|
||||
var status MirrorStatus
|
||||
c.BindJSON(&status)
|
||||
mirrorName := status.Name
|
||||
if len(mirrorName) == 0 {
|
||||
s.returnErrJSON(
|
||||
c, http.StatusBadRequest,
|
||||
errors.New("Mirror Name should not be empty"),
|
||||
)
|
||||
}
|
||||
|
||||
curStatus, _ := s.adapter.GetMirrorStatus(workerID, mirrorName)
|
||||
|
||||
@@ -234,21 +260,25 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) {
|
||||
} else {
|
||||
status.LastUpdate = curStatus.LastUpdate
|
||||
}
|
||||
if status.Status == Success || status.Status == Failed {
|
||||
status.LastEnded = time.Now()
|
||||
} else {
|
||||
status.LastEnded = curStatus.LastEnded
|
||||
}
|
||||
|
||||
// Only message with meaningful size updates the mirror size
|
||||
if len(curStatus.Size) > 0 && curStatus.Size != "unknown" {
|
||||
if len(status.Size) == 0 || status.Size == "unknown" {
|
||||
status.Size = curStatus.Size
|
||||
}
|
||||
}
|
||||
|
||||
// for logging
|
||||
switch status.Status {
|
||||
case Success:
|
||||
logger.Noticef("Job [%s] @<%s> success", status.Name, status.Worker)
|
||||
case Failed:
|
||||
logger.Warningf("Job [%s] @<%s> failed", status.Name, status.Worker)
|
||||
case Syncing:
|
||||
logger.Noticef("Job [%s] @<%s> starts syncing", status.Name, status.Worker)
|
||||
case Disabled:
|
||||
logger.Noticef("Job [%s] @<%s> disabled", status.Name, status.Worker)
|
||||
case Paused:
|
||||
logger.Noticef("Job [%s] @<%s> paused", status.Name, status.Worker)
|
||||
default:
|
||||
logger.Infof("Job [%s] @<%s> status: %s", status.Name, status.Worker, status.Status)
|
||||
logger.Noticef("Job [%s] @<%s> %s", status.Name, status.Worker, status.Status)
|
||||
}
|
||||
|
||||
newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
|
||||
@@ -263,6 +293,45 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) {
|
||||
c.JSON(http.StatusOK, newStatus)
|
||||
}
|
||||
|
||||
func (s *Manager) updateMirrorSize(c *gin.Context) {
|
||||
workerID := c.Param("id")
|
||||
type SizeMsg struct {
|
||||
Name string `json:"name"`
|
||||
Size string `json:"size"`
|
||||
}
|
||||
var msg SizeMsg
|
||||
c.BindJSON(&msg)
|
||||
|
||||
mirrorName := msg.Name
|
||||
status, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
|
||||
if err != nil {
|
||||
logger.Errorf(
|
||||
"Failed to get status of mirror %s @<%s>: %s",
|
||||
mirrorName, workerID, err.Error(),
|
||||
)
|
||||
s.returnErrJSON(c, http.StatusInternalServerError, err)
|
||||
return
|
||||
}
|
||||
|
||||
// Only message with meaningful size updates the mirror size
|
||||
if len(msg.Size) > 0 || msg.Size != "unknown" {
|
||||
status.Size = msg.Size
|
||||
}
|
||||
|
||||
logger.Noticef("Mirror size of [%s] @<%s>: %s", status.Name, status.Worker, status.Size)
|
||||
|
||||
newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
|
||||
if err != nil {
|
||||
err := fmt.Errorf("failed to update job %s of worker %s: %s",
|
||||
mirrorName, workerID, err.Error(),
|
||||
)
|
||||
c.Error(err)
|
||||
s.returnErrJSON(c, http.StatusInternalServerError, err)
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, newStatus)
|
||||
}
|
||||
|
||||
func (s *Manager) handleClientCmd(c *gin.Context) {
|
||||
var clientCmd ClientCmd
|
||||
c.BindJSON(&clientCmd)
|
||||
@@ -286,6 +355,7 @@ func (s *Manager) handleClientCmd(c *gin.Context) {
|
||||
Cmd: clientCmd.Cmd,
|
||||
MirrorID: clientCmd.MirrorID,
|
||||
Args: clientCmd.Args,
|
||||
Options: clientCmd.Options,
|
||||
}
|
||||
|
||||
// update job status, even if the job did not disable successfully,
|
||||
|
||||
@@ -21,9 +21,16 @@ const (
|
||||
)
|
||||
|
||||
func TestHTTPServer(t *testing.T) {
|
||||
var listenPort = 5000
|
||||
Convey("HTTP server should work", t, func(ctx C) {
|
||||
listenPort++
|
||||
port := listenPort
|
||||
addr := "127.0.0.1"
|
||||
baseURL := fmt.Sprintf("http://%s:%d", addr, port)
|
||||
InitLogger(true, true, false)
|
||||
s := GetTUNASyncManager(&Config{Debug: false})
|
||||
s := GetTUNASyncManager(&Config{Debug: true})
|
||||
s.cfg.Server.Addr = addr
|
||||
s.cfg.Server.Port = port
|
||||
So(s, ShouldNotBeNil)
|
||||
s.setDBAdapter(&mockDBAdapter{
|
||||
workerStore: map[string]WorkerStatus{
|
||||
@@ -32,12 +39,8 @@ func TestHTTPServer(t *testing.T) {
|
||||
}},
|
||||
statusStore: make(map[string]MirrorStatus),
|
||||
})
|
||||
port := rand.Intn(10000) + 20000
|
||||
baseURL := fmt.Sprintf("http://127.0.0.1:%d", port)
|
||||
go func() {
|
||||
s.engine.Run(fmt.Sprintf("127.0.0.1:%d", port))
|
||||
}()
|
||||
time.Sleep(50 * time.Microsecond)
|
||||
go s.Run()
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
resp, err := http.Get(baseURL + "/ping")
|
||||
So(err, ShouldBeNil)
|
||||
So(resp.StatusCode, ShouldEqual, http.StatusOK)
|
||||
@@ -79,6 +82,33 @@ func TestHTTPServer(t *testing.T) {
|
||||
So(len(actualResponseObj), ShouldEqual, 2)
|
||||
})
|
||||
|
||||
Convey("delete an existent worker", func(ctx C) {
|
||||
req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/workers/%s", baseURL, w.ID), nil)
|
||||
So(err, ShouldBeNil)
|
||||
clt := &http.Client{}
|
||||
resp, err := clt.Do(req)
|
||||
So(err, ShouldBeNil)
|
||||
defer resp.Body.Close()
|
||||
res := map[string]string{}
|
||||
err = json.NewDecoder(resp.Body).Decode(&res)
|
||||
So(err, ShouldBeNil)
|
||||
So(res[_infoKey], ShouldEqual, "deleted")
|
||||
})
|
||||
|
||||
Convey("delete non-existent worker", func(ctx C) {
|
||||
invalidWorker := "test_worker233"
|
||||
req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/workers/%s", baseURL, invalidWorker), nil)
|
||||
So(err, ShouldBeNil)
|
||||
clt := &http.Client{}
|
||||
resp, err := clt.Do(req)
|
||||
So(err, ShouldBeNil)
|
||||
defer resp.Body.Close()
|
||||
res := map[string]string{}
|
||||
err = json.NewDecoder(resp.Body).Decode(&res)
|
||||
So(err, ShouldBeNil)
|
||||
So(res[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
|
||||
})
|
||||
|
||||
Convey("flush disabled jobs", func(ctx C) {
|
||||
req, err := http.NewRequest("DELETE", baseURL+"/jobs/disabled", nil)
|
||||
So(err, ShouldBeNil)
|
||||
@@ -99,7 +129,7 @@ func TestHTTPServer(t *testing.T) {
|
||||
IsMaster: true,
|
||||
Status: Success,
|
||||
Upstream: "mirrors.tuna.tsinghua.edu.cn",
|
||||
Size: "3GB",
|
||||
Size: "unknown",
|
||||
}
|
||||
resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
|
||||
defer resp.Body.Close()
|
||||
@@ -121,11 +151,12 @@ func TestHTTPServer(t *testing.T) {
|
||||
So(m.Size, ShouldEqual, status.Size)
|
||||
So(m.IsMaster, ShouldEqual, status.IsMaster)
|
||||
So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second)
|
||||
So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
|
||||
|
||||
})
|
||||
|
||||
Convey("list all job status of all workers", func(ctx C) {
|
||||
var ms []webMirrorStatus
|
||||
var ms []WebMirrorStatus
|
||||
resp, err := GetJSON(baseURL+"/jobs", &ms, nil)
|
||||
So(err, ShouldBeNil)
|
||||
So(resp.StatusCode, ShouldEqual, http.StatusOK)
|
||||
@@ -137,8 +168,77 @@ func TestHTTPServer(t *testing.T) {
|
||||
So(m.Size, ShouldEqual, status.Size)
|
||||
So(m.IsMaster, ShouldEqual, status.IsMaster)
|
||||
So(time.Now().Sub(m.LastUpdate.Time), ShouldBeLessThan, 1*time.Second)
|
||||
So(time.Now().Sub(m.LastEnded.Time), ShouldBeLessThan, 1*time.Second)
|
||||
|
||||
})
|
||||
|
||||
Convey("Update size of a valid mirror", func(ctx C) {
|
||||
msg := struct {
|
||||
Name string `json:"name"`
|
||||
Size string `json:"size"`
|
||||
}{status.Name, "5GB"}
|
||||
|
||||
url := fmt.Sprintf("%s/workers/%s/jobs/%s/size", baseURL, status.Worker, status.Name)
|
||||
resp, err := PostJSON(url, msg, nil)
|
||||
So(err, ShouldBeNil)
|
||||
So(resp.StatusCode, ShouldEqual, http.StatusOK)
|
||||
|
||||
Convey("Get new size of a mirror", func(ctx C) {
|
||||
var ms []MirrorStatus
|
||||
resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
|
||||
|
||||
So(err, ShouldBeNil)
|
||||
So(resp.StatusCode, ShouldEqual, http.StatusOK)
|
||||
// err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
|
||||
m := ms[0]
|
||||
So(m.Name, ShouldEqual, status.Name)
|
||||
So(m.Worker, ShouldEqual, status.Worker)
|
||||
So(m.Status, ShouldEqual, status.Status)
|
||||
So(m.Upstream, ShouldEqual, status.Upstream)
|
||||
So(m.Size, ShouldEqual, "5GB")
|
||||
So(m.IsMaster, ShouldEqual, status.IsMaster)
|
||||
So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second)
|
||||
So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
|
||||
})
|
||||
})
|
||||
|
||||
Convey("Update size of an invalid mirror", func(ctx C) {
|
||||
msg := struct {
|
||||
Name string `json:"name"`
|
||||
Size string `json:"size"`
|
||||
}{"Invalid mirror", "5GB"}
|
||||
|
||||
url := fmt.Sprintf("%s/workers/%s/jobs/%s/size", baseURL, status.Worker, status.Name)
|
||||
resp, err := PostJSON(url, msg, nil)
|
||||
So(err, ShouldBeNil)
|
||||
So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
|
||||
})
|
||||
|
||||
// what if status changed to failed
|
||||
status.Status = Failed
|
||||
time.Sleep(3 * time.Second)
|
||||
resp, err = PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
|
||||
defer resp.Body.Close()
|
||||
So(err, ShouldBeNil)
|
||||
So(resp.StatusCode, ShouldEqual, http.StatusOK)
|
||||
|
||||
Convey("What if syncing job failed", func(ctx C) {
|
||||
var ms []MirrorStatus
|
||||
resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
|
||||
|
||||
So(err, ShouldBeNil)
|
||||
So(resp.StatusCode, ShouldEqual, http.StatusOK)
|
||||
// err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
|
||||
m := ms[0]
|
||||
So(m.Name, ShouldEqual, status.Name)
|
||||
So(m.Worker, ShouldEqual, status.Worker)
|
||||
So(m.Status, ShouldEqual, status.Status)
|
||||
So(m.Upstream, ShouldEqual, status.Upstream)
|
||||
So(m.Size, ShouldEqual, status.Size)
|
||||
So(m.IsMaster, ShouldEqual, status.IsMaster)
|
||||
So(time.Now().Sub(m.LastUpdate), ShouldBeGreaterThan, 3*time.Second)
|
||||
So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
|
||||
})
|
||||
})
|
||||
|
||||
Convey("update mirror status of an inexisted worker", func(ctx C) {
|
||||
@@ -149,6 +249,7 @@ func TestHTTPServer(t *testing.T) {
|
||||
IsMaster: true,
|
||||
Status: Success,
|
||||
LastUpdate: time.Now(),
|
||||
LastEnded: time.Now(),
|
||||
Upstream: "mirrors.tuna.tsinghua.edu.cn",
|
||||
Size: "4GB",
|
||||
}
|
||||
@@ -252,6 +353,11 @@ func (b *mockDBAdapter) GetWorker(workerID string) (WorkerStatus, error) {
|
||||
return w, nil
|
||||
}
|
||||
|
||||
func (b *mockDBAdapter) DeleteWorker(workerID string) error {
|
||||
delete(b.workerStore, workerID)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *mockDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
|
||||
// _, ok := b.workerStore[w.ID]
|
||||
// if ok {
|
||||
|
||||
@@ -1,44 +0,0 @@
|
||||
package manager
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
tunasync "github.com/tuna/tunasync/internal"
|
||||
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
)
|
||||
|
||||
func TestStatus(t *testing.T) {
|
||||
Convey("status json ser-de should work", t, func() {
|
||||
tz := "Asia/Tokyo"
|
||||
loc, err := time.LoadLocation(tz)
|
||||
So(err, ShouldBeNil)
|
||||
t := time.Date(2016, time.April, 16, 23, 8, 10, 0, loc)
|
||||
m := webMirrorStatus{
|
||||
Name: "tunalinux",
|
||||
Status: tunasync.Success,
|
||||
LastUpdate: textTime{t},
|
||||
LastUpdateTs: stampTime{t},
|
||||
Size: "5GB",
|
||||
Upstream: "rsync://mirrors.tuna.tsinghua.edu.cn/tunalinux/",
|
||||
}
|
||||
|
||||
b, err := json.Marshal(m)
|
||||
So(err, ShouldBeNil)
|
||||
//fmt.Println(string(b))
|
||||
var m2 webMirrorStatus
|
||||
err = json.Unmarshal(b, &m2)
|
||||
So(err, ShouldBeNil)
|
||||
// fmt.Printf("%#v", m2)
|
||||
So(m2.Name, ShouldEqual, m.Name)
|
||||
So(m2.Status, ShouldEqual, m.Status)
|
||||
So(m2.LastUpdate.Unix(), ShouldEqual, m.LastUpdate.Unix())
|
||||
So(m2.LastUpdateTs.Unix(), ShouldEqual, m.LastUpdate.Unix())
|
||||
So(m2.LastUpdate.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
|
||||
So(m2.LastUpdateTs.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
|
||||
So(m2.Size, ShouldEqual, m.Size)
|
||||
So(m2.Upstream, ShouldEqual, m.Upstream)
|
||||
})
|
||||
}
|
||||
@@ -1,96 +0,0 @@
|
||||
#!/bin/bash
|
||||
# requires: wget, lftp, jq
|
||||
#
|
||||
|
||||
set -e
|
||||
set -o pipefail
|
||||
|
||||
CONDA_REPO_BASE=${CONDA_REPO_BASE:-"http://repo.continuum.io"}
|
||||
LOCAL_DIR_BASE="${TUNASYNC_WORKING_DIR}/pkgs"
|
||||
TMP_DIR=$(mktemp -d)
|
||||
|
||||
CONDA_REPOS=("free" "r" "mro" "pro")
|
||||
CONDA_ARCHES=("linux-64" "linux-32" "linux-armv6l" "linux-armv7l" "linux-ppc64le" "osx-64" "osx-32" "win-64" "win-32")
|
||||
|
||||
function check-and-download() {
|
||||
remote_file=$1
|
||||
local_file=$2
|
||||
wget -q --spider ${remote_file}
|
||||
if [ $? -eq 0 ]; then
|
||||
echo "downloading ${remote_file}"
|
||||
wget -q -N -O ${local_file} ${remote_file}
|
||||
return
|
||||
fi
|
||||
return 1
|
||||
}
|
||||
|
||||
function cleanup () {
|
||||
echo "cleaning up"
|
||||
[ -d ${TMP_DIR} ] && {
|
||||
[ -f ${TMP_DIR}/repodata.json ] && rm ${TMP_DIR}/repodata.json
|
||||
[ -f ${TMP_DIR}/repodata.json.bz2 ] && rm ${TMP_DIR}/repodata.json.bz2
|
||||
rmdir ${TMP_DIR}
|
||||
}
|
||||
}
|
||||
|
||||
trap cleanup EXIT
|
||||
|
||||
echo ${TMP_DIR}
|
||||
|
||||
for repo in ${CONDA_REPOS[@]}; do
|
||||
for arch in ${CONDA_ARCHES[@]}; do
|
||||
PKG_REPO_BASE="${CONDA_REPO_BASE}/pkgs/$repo/$arch"
|
||||
repodata_url="${PKG_REPO_BASE}/repodata.json"
|
||||
bz2_repodata_url="${PKG_REPO_BASE}/repodata.json.bz2"
|
||||
LOCAL_DIR="${LOCAL_DIR_BASE}/$repo/$arch"
|
||||
[ ! -d ${LOCAL_DIR} ] && mkdir -p ${LOCAL_DIR}
|
||||
tmp_repodata="${TMP_DIR}/repodata.json"
|
||||
tmp_bz2_repodata="${TMP_DIR}/repodata.json.bz2"
|
||||
|
||||
check-and-download ${repodata_url} ${tmp_repodata}
|
||||
check-and-download ${bz2_repodata_url} ${tmp_bz2_repodata}
|
||||
|
||||
jq_cmd='.packages | to_entries[] | [.key, .value.size, .value.md5] | map(tostring) | join(" ")'
|
||||
bzip2 -c -d ${tmp_bz2_repodata} | jq -r "${jq_cmd}" | while read line;
|
||||
do
|
||||
read -a tokens <<< $line
|
||||
pkgfile=${tokens[0]}
|
||||
pkgsize=${tokens[1]}
|
||||
pkgmd5=${tokens[2]}
|
||||
|
||||
pkg_url="${PKG_REPO_BASE}/${pkgfile}"
|
||||
dest_file="${LOCAL_DIR}/${pkgfile}"
|
||||
|
||||
declare downloaded=false
|
||||
if [ -f ${dest_file} ]; then
|
||||
rsize=`stat -c "%s" ${dest_file}`
|
||||
if [ ${rsize} -eq ${pkgsize} ]; then
|
||||
downloaded=true
|
||||
echo "Skipping ${pkgfile}, size ${pkgsize}"
|
||||
fi
|
||||
fi
|
||||
while [ $downloaded != true ]; do
|
||||
echo "downloading ${pkg_url}"
|
||||
wget -q -O ${dest_file} ${pkg_url} && {
|
||||
# two space for md5sum check format
|
||||
echo "${pkgmd5} ${dest_file}" | md5sum -c - && downloaded=true
|
||||
}
|
||||
done
|
||||
done
|
||||
|
||||
mv -f "${TMP_DIR}/repodata.json" "${LOCAL_DIR}/repodata.json"
|
||||
mv -f "${TMP_DIR}/repodata.json.bz2" "${LOCAL_DIR}/repodata.json.bz2"
|
||||
done
|
||||
done
|
||||
|
||||
function sync_installer() {
|
||||
repo_url="$1"
|
||||
repo_dir="$2"
|
||||
|
||||
[ ! -d "$repo_dir" ] && mkdir -p "$repo_dir"
|
||||
cd $repo_dir
|
||||
lftp "${repo_url}/" -e "mirror --verbose -P 5; bye"
|
||||
}
|
||||
|
||||
sync_installer "${CONDA_REPO_BASE}/archive/" "${TUNASYNC_WORKING_DIR}/archive/"
|
||||
sync_installer "${CONDA_REPO_BASE}/miniconda/" "${TUNASYNC_WORKING_DIR}/miniconda/"
|
||||
@@ -1,21 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
REPO=${REPO:-"/usr/local/bin/repo"}
|
||||
|
||||
function repo_init() {
|
||||
mkdir -p $TUNASYNC_WORKING_DIR
|
||||
cd $TUNASYNC_WORKING_DIR
|
||||
$REPO init -u https://android.googlesource.com/mirror/manifest --mirror
|
||||
}
|
||||
|
||||
function repo_sync() {
|
||||
cd $TUNASYNC_WORKING_DIR
|
||||
$REPO sync -f
|
||||
}
|
||||
|
||||
if [ ! -d "$TUNASYNC_WORKING_DIR/git-repo.git" ]; then
|
||||
echo "Initializing AOSP mirror"
|
||||
repo_init
|
||||
fi
|
||||
|
||||
repo_sync
|
||||
@@ -1,59 +0,0 @@
|
||||
#!/bin/bash
|
||||
# reqires: wget, yum-utils
|
||||
|
||||
set -e
|
||||
set -o pipefail
|
||||
|
||||
_here=`dirname $(realpath $0)`
|
||||
. ${_here}/helpers/apt-download
|
||||
APT_VERSIONS=("debian-wheezy" "debian-jessie" "ubuntu-precise" "ubuntu-trusty" "ubuntu-xenial")
|
||||
|
||||
BASE_PATH="${TUNASYNC_WORKING_DIR}"
|
||||
APT_PATH="${BASE_PATH}/apt/repo"
|
||||
YUM_PATH="${BASE_PATH}/yum/repo"
|
||||
|
||||
mkdir -p ${APT_PATH} ${YUM_PATH}
|
||||
|
||||
wget -q -N -O ${BASE_PATH}/yum/gpg https://yum.dockerproject.org/gpg
|
||||
wget -q -N -O ${BASE_PATH}/apt/gpg https://apt.dockerproject.org/gpg
|
||||
|
||||
# YUM mirror
|
||||
cache_dir="/tmp/yum-docker-cache/"
|
||||
cfg="/tmp/docker-yum.conf"
|
||||
cat <<EOF > ${cfg}
|
||||
[main]
|
||||
keepcache=0
|
||||
|
||||
[centos6]
|
||||
name=Docker Repository
|
||||
baseurl=https://yum.dockerproject.org/repo/main/centos/6
|
||||
enabled=1
|
||||
gpgcheck=0
|
||||
gpgkey=https://yum.dockerproject.org/gpg
|
||||
sslverify=0
|
||||
|
||||
[centos7]
|
||||
name=Docker Repository
|
||||
baseurl=https://yum.dockerproject.org/repo/main/centos/7
|
||||
enabled=1
|
||||
gpgcheck=0
|
||||
gpgkey=https://yum.dockerproject.org/gpg
|
||||
sslverify=0
|
||||
EOF
|
||||
|
||||
[ ! -d ${YUM_PATH}/centos6 ] && mkdir -p ${YUM_PATH}/centos6
|
||||
[ ! -d ${YUM_PATH}/centos7 ] && mkdir -p ${YUM_PATH}/centos7
|
||||
reposync -c $cfg -d -p ${YUM_PATH} -e $cache_dir
|
||||
createrepo --update -v -c $cache_dir -o ${YUM_PATH}/centos6 ${YUM_PATH}/centos7
|
||||
createrepo --update -v -c $cache_dir -o ${YUM_PATH}/centos6 ${YUM_PATH}/centos7
|
||||
rm $cfg
|
||||
|
||||
# APT mirror
|
||||
base_url="http://apt.dockerproject.org/repo"
|
||||
for version in ${APT_VERSIONS[@]}; do
|
||||
apt-download-binary ${base_url} "$version" "main" "amd64" "${APT_PATH}" || true
|
||||
apt-download-binary ${base_url} "$version" "main" "i386" "${APT_PATH}" || true
|
||||
done
|
||||
|
||||
# sync_docker "http://apt.dockerproject.org/" "${TUNASYNC_WORKING_DIR}/apt"
|
||||
# sync_docker "http://yum.dockerproject.org/" "${TUNASYNC_WORKING_DIR}/yum"
|
||||
@@ -1,13 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
ARCH_EXCLUDE = ['armel', 'alpha', 'hurd-i386', 'ia64', 'kfreebsd-amd64', 'kfreebsd-i386', 'mips', 'powerpc', 'ppc64el', 's390', 's390x', 'sparc']
|
||||
|
||||
CONTENT_EXCLUDE = ['binary-{arch}', 'installer-{arch}', 'Contents-{arch}.gz', 'Contents-udeb-{arch}.gz', 'Contents-{arch}.diff', 'arch-{arch}.files', 'arch-{arch}.list.gz', '*_{arch}.deb', '*_{arch}.udeb', '*_{arch}.changes']
|
||||
|
||||
with open("debian-exclude.txt", 'wb') as f:
|
||||
f.write(".~tmp~/\n")
|
||||
f.write(".*\n")
|
||||
for arch in ARCH_EXCLUDE:
|
||||
for content in CONTENT_EXCLUDE:
|
||||
f.write(content.format(arch=arch))
|
||||
f.write('\n')
|
||||
@@ -1,13 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
ARCH_EXCLUDE = ['armel', 'armhf']
|
||||
|
||||
CONTENT_EXCLUDE = ['binary-{arch}', 'installer-{arch}', 'Contents-{arch}.gz', 'Contents-udeb-{arch}.gz', 'Contents-{arch}.diff', 'arch-{arch}.files', 'arch-{arch}.list.gz', '*_{arch}.deb', '*_{arch}.udeb', '*_{arch}.changes']
|
||||
|
||||
with open("kali-exclude.txt", 'wb') as f:
|
||||
f.write(".~tmp~/\n")
|
||||
f.write(".*\n")
|
||||
for arch in ARCH_EXCLUDE:
|
||||
for content in CONTENT_EXCLUDE:
|
||||
f.write(content.format(arch=arch))
|
||||
f.write('\n')
|
||||
@@ -1,13 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
ARCH_EXCLUDE = ['powerpc', 'ppc64el', 'ia64', 'sparc', 'armel']
|
||||
|
||||
CONTENT_EXCLUDE = ['binary-{arch}', 'installer-{arch}', 'Contents-{arch}.gz', 'Contents-udeb-{arch}.gz', 'Contents-{arch}.diff', 'arch-{arch}.files', 'arch-{arch}.list.gz', '*_{arch}.deb', '*_{arch}.udeb', '*_{arch}.changes']
|
||||
|
||||
with open("ubuntu-ports-exclude.txt", 'wb') as f:
|
||||
f.write(".~tmp~/\n")
|
||||
f.write(".*\n")
|
||||
for arch in ARCH_EXCLUDE:
|
||||
for content in CONTENT_EXCLUDE:
|
||||
f.write(content.format(arch=arch))
|
||||
f.write('\n')
|
||||
@@ -1,65 +0,0 @@
|
||||
#!/bin/bash
|
||||
set -e
|
||||
|
||||
_here=`dirname $(realpath $0)`
|
||||
. ${_here}/helpers/apt-download
|
||||
|
||||
[ -z "${LOADED_APT_DOWNLOAD}" ] && (echo "failed to load apt-download"; exit 1)
|
||||
|
||||
BASE_PATH="${TUNASYNC_WORKING_DIR}"
|
||||
|
||||
YUM_PATH="${BASE_PATH}/yum"
|
||||
|
||||
UBUNTU_VERSIONS=("trusty" "wily")
|
||||
DEBIAN_VERSIONS=("wheezy" "jessie" "stretch")
|
||||
UBUNTU_PATH="${BASE_PATH}/ubuntu/"
|
||||
DEBIAN_PATH="${BASE_PATH}/debian/"
|
||||
|
||||
mkdir -p $UBUNTU_PATH $DEBIAN_PATH $YUM_PATH
|
||||
|
||||
cache_dir="/tmp/yum-gitlab-ce-cache/"
|
||||
cfg="/tmp/gitlab-ce-yum.conf"
|
||||
cat <<EOF > ${cfg}
|
||||
[main]
|
||||
keepcache=0
|
||||
|
||||
[el6]
|
||||
name=el6
|
||||
baseurl=https://packages.gitlab.com/gitlab/gitlab-ce/el/6/x86_64
|
||||
repo_gpgcheck=0
|
||||
gpgcheck=0
|
||||
enabled=1
|
||||
gpgkey=https://packages.gitlab.com/gpg.key
|
||||
sslverify=0
|
||||
|
||||
[el7]
|
||||
name=el7
|
||||
baseurl=https://packages.gitlab.com/gitlab/gitlab-ce/el/7/x86_64
|
||||
repo_gpgcheck=0
|
||||
gpgcheck=0
|
||||
enabled=1
|
||||
gpgkey=https://packages.gitlab.com/gpg.key
|
||||
sslverify=0
|
||||
EOF
|
||||
|
||||
reposync -c $cfg -d -p ${YUM_PATH} -e $cache_dir
|
||||
createrepo --update -v -c $cache_dir -o ${YUM_PATH}/el6 ${YUM_PATH}/el6
|
||||
createrepo --update -v -c $cache_dir -o ${YUM_PATH}/el7 ${YUM_PATH}/el7
|
||||
rm $cfg
|
||||
|
||||
base_url="https://packages.gitlab.com/gitlab/gitlab-ce/ubuntu"
|
||||
for version in ${UBUNTU_VERSIONS[@]}; do
|
||||
apt-download-binary ${base_url} "$version" "main" "amd64" "${UBUNTU_PATH}" || true
|
||||
apt-download-binary ${base_url} "$version" "main" "i386" "${UBUNTU_PATH}" || true
|
||||
done
|
||||
echo "Ubuntu finished"
|
||||
|
||||
base_url="https://packages.gitlab.com/gitlab/gitlab-ce/debian"
|
||||
for version in ${DEBIAN_VERSIONS[@]}; do
|
||||
apt-download-binary ${base_url} "$version" "main" "amd64" "${DEBIAN_PATH}" || true
|
||||
apt-download-binary ${base_url} "$version" "main" "i386" "${DEBIAN_PATH}" || true
|
||||
done
|
||||
echo "Debian finished"
|
||||
|
||||
|
||||
# vim: ts=4 sts=4 sw=4
|
||||
@@ -1,69 +0,0 @@
|
||||
#!/bin/bash
|
||||
# reqires: wget, yum-utils
|
||||
set -e
|
||||
set -o pipefail
|
||||
|
||||
_here=`dirname $(realpath $0)`
|
||||
. ${_here}/helpers/apt-download
|
||||
|
||||
[ -z "${LOADED_APT_DOWNLOAD}" ] && (echo "failed to load apt-download"; exit 1)
|
||||
|
||||
BASE_PATH="${TUNASYNC_WORKING_DIR}"
|
||||
|
||||
YUM_PATH="${BASE_PATH}/yum"
|
||||
|
||||
UBUNTU_VERSIONS=("trusty" "xenial")
|
||||
DEBIAN_VERSIONS=("wheezy" "jessie" "stretch")
|
||||
UBUNTU_PATH="${BASE_PATH}/ubuntu/"
|
||||
DEBIAN_PATH="${BASE_PATH}/debian/"
|
||||
|
||||
mkdir -p $UBUNTU_PATH $DEBIAN_PATH $YUM_PATH
|
||||
|
||||
cache_dir="/tmp/yum-gitlab-runner-cache/"
|
||||
cfg="/tmp/gitlab-runner-yum.conf"
|
||||
cat <<EOF > ${cfg}
|
||||
[main]
|
||||
keepcache=0
|
||||
|
||||
[el6]
|
||||
name=gitlab-ci-multi-runner-el6
|
||||
baseurl=https://packages.gitlab.com/runner/gitlab-ci-multi-runner/el/6/x86_64
|
||||
repo_gpgcheck=0
|
||||
gpgcheck=0
|
||||
enabled=1
|
||||
gpgkey=https://packages.gitlab.com/gpg.key
|
||||
sslverify=0
|
||||
|
||||
[el7]
|
||||
name=gitlab-ci-multi-runner-el7
|
||||
baseurl=https://packages.gitlab.com/runner/gitlab-ci-multi-runner/el/7/x86_64
|
||||
repo_gpgcheck=0
|
||||
gpgcheck=0
|
||||
enabled=1
|
||||
gpgkey=https://packages.gitlab.com/gpg.key
|
||||
sslverify=0
|
||||
EOF
|
||||
|
||||
reposync -c $cfg -d -p ${YUM_PATH} -e $cache_dir
|
||||
[ ! -d ${YUM_PATH}/el6 ] && mkdir -p ${YUM_PATH}/el6
|
||||
[ ! -d ${YUM_PATH}/el7 ] && mkdir -p ${YUM_PATH}/el7
|
||||
createrepo --update -v -c $cache_dir -o ${YUM_PATH}/el6 ${YUM_PATH}/el6
|
||||
createrepo --update -v -c $cache_dir -o ${YUM_PATH}/el7 ${YUM_PATH}/el7
|
||||
rm $cfg
|
||||
|
||||
base_url="https://packages.gitlab.com/runner/gitlab-ci-multi-runner/ubuntu"
|
||||
for version in ${UBUNTU_VERSIONS[@]}; do
|
||||
apt-download-binary ${base_url} "$version" "main" "amd64" "${UBUNTU_PATH}" || true
|
||||
apt-download-binary ${base_url} "$version" "main" "i386" "${UBUNTU_PATH}" || true
|
||||
done
|
||||
echo "Ubuntu finished"
|
||||
|
||||
base_url="https://packages.gitlab.com/runner/gitlab-ci-multi-runner/debian"
|
||||
for version in ${DEBIAN_VERSIONS[@]}; do
|
||||
apt-download-binary ${base_url} "$version" "main" "amd64" "${DEBIAN_PATH}" || true
|
||||
apt-download-binary ${base_url} "$version" "main" "i386" "${DEBIAN_PATH}" || true
|
||||
done
|
||||
echo "Debian finished"
|
||||
|
||||
|
||||
# vim: ts=4 sts=4 sw=4
|
||||
@@ -1,92 +0,0 @@
|
||||
#!/bin/bash
|
||||
set -e
|
||||
|
||||
function remove_broken() {
|
||||
interval=$1
|
||||
interval_file="/tmp/hackage_lastcheck"
|
||||
now=`date +%s`
|
||||
|
||||
if [[ -f ${interval_file} ]]; then
|
||||
lastcheck=`cat ${interval_file}`
|
||||
between=$(echo "${now}-${lastcheck}" | bc)
|
||||
[[ $between -lt $interval ]] && echo "skip checking"; return 0
|
||||
fi
|
||||
echo "start checking"
|
||||
|
||||
mkdir -p "${TUNASYNC_WORKING_DIR}/package"
|
||||
cd "${TUNASYNC_WORKING_DIR}/package"
|
||||
|
||||
ls | while read line; do
|
||||
echo -n "$line\t\t"
|
||||
tar -tzf $line >/dev/null || (echo "FAIL"; rm $line) && echo "OK"
|
||||
done
|
||||
|
||||
echo `date +%s` > $interval_file
|
||||
}
|
||||
|
||||
function must_download() {
|
||||
src=$1
|
||||
dst=$2
|
||||
while true; do
|
||||
echo "downloading: $name"
|
||||
wget "$src" -O "$dst" &>/dev/null || true
|
||||
tar -tzf package/$name >/dev/null || rm package/$name && break
|
||||
done
|
||||
}
|
||||
|
||||
function hackage_mirror() {
|
||||
local_pklist="/tmp/hackage_local_pklist_$$.list"
|
||||
remote_pklist="/tmp/hackage_remote_pklist_$$.list"
|
||||
|
||||
cd ${TUNASYNC_WORKING_DIR}
|
||||
mkdir -p package
|
||||
|
||||
echo "Downloading index..."
|
||||
rm index.tar.gz || true
|
||||
axel http://hdiff.luite.com/packages/archive/index.tar.gz -o index.tar.gz > /dev/null
|
||||
|
||||
echo "building local package list"
|
||||
ls package | sed "s/\.tar\.gz$//" > $local_pklist
|
||||
echo "preferred-versions" >> $local_pklist # ignore preferred-versions
|
||||
|
||||
echo "building remote package list"
|
||||
tar ztf index.tar.gz | (cut -d/ -f 1,2 2>/dev/null) | sed 's|/|-|' > $remote_pklist
|
||||
|
||||
echo "building download list"
|
||||
# substract local list from remote list
|
||||
comm <(sort $remote_pklist) <(sort $local_pklist) -23 | while read pk; do
|
||||
# limit concurrent level
|
||||
bgcount=`jobs | wc -l`
|
||||
while [[ $bgcount -ge 5 ]]; do
|
||||
sleep 0.5
|
||||
bgcount=`jobs | wc -l`
|
||||
done
|
||||
|
||||
name="$pk.tar.gz"
|
||||
if [ ! -a package/$name ]; then
|
||||
must_download "http://hackage.haskell.org/package/$pk/$name" "package/$name" &
|
||||
else
|
||||
echo "skip existed: $name"
|
||||
fi
|
||||
done
|
||||
|
||||
# delete redundanty files
|
||||
comm <(sort $remote_pklist) <(sort $local_pklist) -13 | while read pk; do
|
||||
name="$pk.tar.gz"
|
||||
echo "deleting ${name}"
|
||||
rm "package/$name"
|
||||
done
|
||||
|
||||
cp index.tar.gz 00-index.tar.gz
|
||||
}
|
||||
|
||||
function cleanup () {
|
||||
echo "cleaning up"
|
||||
[[ ! -z $local_pklist ]] && (rm $local_pklist $remote_pklist ; true)
|
||||
}
|
||||
|
||||
trap cleanup EXIT
|
||||
remove_broken 86400
|
||||
hackage_mirror
|
||||
|
||||
# vim: ts=4 sts=4 sw=4
|
||||
@@ -1,132 +0,0 @@
|
||||
#!/bin/bash
|
||||
set -e
|
||||
LOADED_APT_DOWNLOAD="yes"
|
||||
|
||||
function check-and-download() {
|
||||
remote_file=$1
|
||||
local_file=$2
|
||||
wget -q --spider ${remote_file}
|
||||
if [ $? -eq 0 ]; then
|
||||
echo "downloading ${remote_file}"
|
||||
wget -q -N -O ${local_file} ${remote_file}
|
||||
return
|
||||
fi
|
||||
return 0
|
||||
}
|
||||
|
||||
|
||||
function apt-download-binary() {
|
||||
base_url=$1
|
||||
dist=$2
|
||||
repo=$3
|
||||
arch=$4
|
||||
dest_base_dir=$5
|
||||
if [ -z $dest_base_dir ]; then
|
||||
echo "Destination directory is empty, cannot continue"
|
||||
return 1
|
||||
fi
|
||||
|
||||
dest_dir="${dest_base_dir}/dists/${dist}"
|
||||
[ ! -d "$dest_dir" ] && mkdir -p "$dest_dir"
|
||||
check-and-download "${base_url}/dists/${dist}/Contents-${arch}.gz" "${dest_dir}/Contents-${arch}.gz" || true
|
||||
check-and-download "${base_url}/dists/${dist}/InRelease" "${dest_dir}/InRelease" || true
|
||||
check-and-download "${base_url}/dists/${dist}/Release" "${dest_dir}/Release"
|
||||
check-and-download "${base_url}/dists/${dist}/Release.gpg" "${dest_dir}/Release.gpg" || true
|
||||
|
||||
# Load Package Index URLs from Release file
|
||||
release_file="${dest_dir}/Release"
|
||||
dest_dir="${dest_base_dir}/dists/${dist}/${repo}/binary-${arch}"
|
||||
[ ! -d "$dest_dir" ] && mkdir -p "$dest_dir"
|
||||
|
||||
declare pkgidx_content=""
|
||||
declare cnt_start=false
|
||||
declare -i checksum_len
|
||||
if (grep -e '^SHA256:$' ${release_file} &>/dev/null); then
|
||||
checksum_cmd="sha256sum"; checksum_regex="^SHA256:$"; checksum_len=64
|
||||
elif (grep -e '^SHA1:$' ${release_file} &>/dev/null); then
|
||||
checksum_cmd="sha1sum"; checksum_regex="^SHA1:$"; checksum_len=40
|
||||
elif (grep -e '^MD5Sum:$' ${release_file} &>/dev/null); then
|
||||
checksum_cmd="md5sum"; checksum_regex="^MD5sum:$"; checksum_len=32
|
||||
fi
|
||||
|
||||
while read line; do
|
||||
if [[ ${cnt_start} = true ]]; then
|
||||
read -a tokens <<< $line
|
||||
checksum=${tokens[0]}
|
||||
if [[ ${#checksum} != ${checksum_len} ]]; then
|
||||
break
|
||||
fi
|
||||
filesize=${tokens[1]}
|
||||
filename=${tokens[2]}
|
||||
if [[ "$filename" =~ ${repo}/binary-${arch} ]]; then
|
||||
# Load package list from Packages file
|
||||
pkgidx_file="${dest_base_dir}/dists/${dist}/${filename}"
|
||||
dest_dir=`dirname ${pkgidx_file}`
|
||||
[ ! -d "$dest_dir" ] && mkdir -p "$dest_dir"
|
||||
pkglist_url="${base_url}/dists/${dist}/${filename}"
|
||||
check-and-download "${pkglist_url}" ${pkgidx_file} || true
|
||||
echo "${checksum} ${pkgidx_file}" | ${checksum_cmd} -c -
|
||||
if [ -z "${pkgidx_content}" -a -f ${pkgidx_file} ]; then
|
||||
echo "getting packages index content"
|
||||
case $filename in
|
||||
"*.bz2")
|
||||
pkgidx_content=`bunzip2 -c ${pkgidx_file}`
|
||||
;;
|
||||
"*.gz")
|
||||
pkgidx_content=`gunzip -c ${pkgidx_file}`
|
||||
;;
|
||||
*)
|
||||
pkgidx_content=`cat ${pkgidx_file}`
|
||||
;;
|
||||
esac
|
||||
fi
|
||||
fi
|
||||
else
|
||||
if [[ "$line" =~ ${checksum_regex} ]]; then
|
||||
cnt_start=true
|
||||
fi
|
||||
fi
|
||||
done < ${release_file}
|
||||
|
||||
if [ -z "${pkgidx_content}" ]; then
|
||||
echo "index is empty, failed"
|
||||
return 1
|
||||
fi
|
||||
|
||||
# Set checksum method
|
||||
if (echo -e "${pkgidx_content}" | grep -e '^SHA256' &>/dev/null); then
|
||||
checksum_cmd="sha256sum"; checksum_regex="^SHA256"
|
||||
elif (echo -e "${pkgidx_content}" | grep -e '^SHA1' &>/dev/null); then
|
||||
checksum_cmd="sha1sum"; checksum_regex="^SHA1"
|
||||
elif (echo -e "${pkgidx_content}" | grep -e '^MD5sum' &>/dev/null); then
|
||||
checksum_cmd="md5sum"; checksum_regex="^MD5sum"
|
||||
fi
|
||||
|
||||
# Download packages
|
||||
(echo -e "${pkgidx_content}" | grep -e '^Filename' -e '^Size' -e ${checksum_regex} | cut -d' ' -f 2) | \
|
||||
while read pkg_filename; read pkg_size; read pkg_checksum; do
|
||||
dest_filename="${dest_base_dir}/${pkg_filename}"
|
||||
dest_dir=`dirname ${dest_filename}`
|
||||
[ ! -d "$dest_dir" ] && mkdir -p "$dest_dir"
|
||||
pkg_url="${base_url}/${pkg_filename}"
|
||||
declare downloaded=false
|
||||
if [ -f ${dest_filename} ]; then
|
||||
rsize=`stat -c "%s" ${dest_filename}`
|
||||
if [ ${rsize} -eq ${pkg_size} ]; then
|
||||
downloaded=true
|
||||
echo "Skipping ${pkg_filename}, size ${pkg_size}"
|
||||
fi
|
||||
fi
|
||||
while [ $downloaded != true ]; do
|
||||
echo "downloading ${pkg_url}"
|
||||
wget -q -O ${dest_filename} ${pkg_url} && {
|
||||
echo "${pkg_checksum} ${dest_filename}" | ${checksum_cmd} -c - && downloaded=true # two space for md5sum/sha1sum/sha256sum check format
|
||||
}
|
||||
done
|
||||
done
|
||||
|
||||
echo "Mirroring ${base_url} ${dist}, ${repo}, ${arch} done!"
|
||||
|
||||
}
|
||||
|
||||
# vim: ts=4 sts=4 sw=4
|
||||
@@ -1,17 +0,0 @@
|
||||
#!/bin/bash
|
||||
if [ ! -d "$TUNASYNC_WORKING_DIR" ]; then
|
||||
echo "Directory not exists, fail"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
function update_homebrew_git() {
|
||||
repo_dir="$1"
|
||||
cd $repo_dir
|
||||
echo "==== SYNC $repo_dir START ===="
|
||||
/usr/bin/timeout -s INT 3600 git remote -v update
|
||||
echo "==== SYNC $repo_dir DONE ===="
|
||||
}
|
||||
|
||||
update_homebrew_git "$TUNASYNC_WORKING_DIR/homebrew.git"
|
||||
update_homebrew_git "$TUNASYNC_WORKING_DIR/homebrew-python.git"
|
||||
update_homebrew_git "$TUNASYNC_WORKING_DIR/homebrew-science.git"
|
||||
@@ -1,12 +0,0 @@
|
||||
#!/bin/bash
|
||||
if [ ! -d "$TUNASYNC_WORKING_DIR" ]; then
|
||||
echo "Directory not exists, fail"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
function update_linux_git() {
|
||||
cd $TUNASYNC_WORKING_DIR
|
||||
/usr/bin/timeout -s INT 3600 git remote -v update
|
||||
}
|
||||
|
||||
update_linux_git
|
||||
@@ -1,16 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
function sync_lxc_images() {
|
||||
repo_url="$1"
|
||||
repo_dir="$2"
|
||||
|
||||
[ ! -d "$repo_dir" ] && mkdir -p "$repo_dir"
|
||||
cd $repo_dir
|
||||
|
||||
# lftp "${repo_url}/" -e "mirror --verbose --log=${TUNASYNC_LOG_FILE} --exclude-glob='*/SRPMS/*' -P 5 --delete --only-newer; bye"
|
||||
lftp "${repo_url}/" -e "mirror --verbose -P 5 --delete --only-newer; bye"
|
||||
}
|
||||
|
||||
|
||||
sync_lxc_images "http://images.linuxcontainers.org/images" "${TUNASYNC_WORKING_DIR}/images"
|
||||
sync_lxc_images "http://images.linuxcontainers.org/meta" "${TUNASYNC_WORKING_DIR}/meta"
|
||||
@@ -1,88 +0,0 @@
|
||||
#!/bin/bash
|
||||
set -e
|
||||
|
||||
_here=`dirname $(realpath $0)`
|
||||
. ${_here}/helpers/apt-download
|
||||
|
||||
[ -z "${LOADED_APT_DOWNLOAD}" ] && (echo "failed to load apt-download"; exit 1)
|
||||
|
||||
BASE_PATH="${TUNASYNC_WORKING_DIR}"
|
||||
|
||||
YUM_PATH="${BASE_PATH}/yum"
|
||||
APT_PATH="${BASE_PATH}/apt"
|
||||
|
||||
UBUNTU_VERSIONS=("trusty" "precise")
|
||||
DEBIAN_VERSIONS=("wheezy")
|
||||
MONGO_VERSIONS=("3.2" "3.0")
|
||||
STABLE_VERSION="3.2"
|
||||
|
||||
UBUNTU_PATH="${APT_PATH}/ubuntu"
|
||||
DEBIAN_PATH="${APT_PATH}/debian"
|
||||
|
||||
mkdir -p $UBUNTU_PATH $DEBIAN_PATH $YUM_PATH
|
||||
|
||||
cache_dir="/tmp/yum-mongodb-cache/"
|
||||
cfg="/tmp/mongodb-yum.conf"
|
||||
cat <<EOF > ${cfg}
|
||||
[main]
|
||||
keepcache=0
|
||||
|
||||
EOF
|
||||
|
||||
for mgver in ${MONGO_VERSIONS[@]}; do
|
||||
cat <<EOF >> ${cfg}
|
||||
[el6-${mgver}]
|
||||
name=el6-${mgver}
|
||||
baseurl=https://repo.mongodb.org/yum/redhat/6/mongodb-org/${mgver}/x86_64/
|
||||
repo_gpgcheck=0
|
||||
gpgcheck=0
|
||||
enabled=1
|
||||
sslverify=0
|
||||
|
||||
[el7-${mgver}]
|
||||
name=el7-${mgver}
|
||||
baseurl=https://repo.mongodb.org/yum/redhat/7/mongodb-org/${mgver}/x86_64/
|
||||
repo_gpgcheck=0
|
||||
gpgcheck=0
|
||||
enabled=1
|
||||
sslverify=0
|
||||
EOF
|
||||
done
|
||||
|
||||
reposync -c $cfg -d -p ${YUM_PATH} -e $cache_dir
|
||||
for mgver in ${MONGO_VERSIONS[@]}; do
|
||||
createrepo --update -v -c $cache_dir -o ${YUM_PATH}/el6-$mgver/ ${YUM_PATH}/el6-$mgver/
|
||||
createrepo --update -v -c $cache_dir -o ${YUM_PATH}/el7-$mgver/ ${YUM_PATH}/el7-$mgver/
|
||||
done
|
||||
|
||||
[ -e ${YUM_PATH}/el6 ] || (cd ${YUM_PATH}; ln -s el6-${STABLE_VERSION} el6)
|
||||
[ -e ${YUM_PATH}/el7 ] || (cd ${YUM_PATH}; ln -s el7-${STABLE_VERSION} el7)
|
||||
|
||||
rm $cfg
|
||||
|
||||
base_url="http://repo.mongodb.org/apt/ubuntu"
|
||||
for ubver in ${UBUNTU_VERSIONS[@]}; do
|
||||
for mgver in ${MONGO_VERSIONS[@]}; do
|
||||
version="$ubver/mongodb-org/$mgver"
|
||||
apt-download-binary ${base_url} "$version" "multiverse" "amd64" "${UBUNTU_PATH}" || true
|
||||
apt-download-binary ${base_url} "$version" "multiverse" "i386" "${UBUNTU_PATH}" || true
|
||||
done
|
||||
mg_basepath="${UBUNTU_PATH}/dists/$ubver/mongodb-org"
|
||||
[ -e ${mg_basepath}/stable ] || (cd ${mg_basepath}; ln -s ${STABLE_VERSION} stable)
|
||||
done
|
||||
echo "Ubuntu finished"
|
||||
|
||||
base_url="http://repo.mongodb.org/apt/debian"
|
||||
for dbver in ${DEBIAN_VERSIONS[@]}; do
|
||||
for mgver in ${MONGO_VERSIONS[@]}; do
|
||||
version="$dbver/mongodb-org/$mgver"
|
||||
apt-download-binary ${base_url} "$version" "main" "amd64" "${DEBIAN_PATH}" || true
|
||||
apt-download-binary ${base_url} "$version" "main" "i386" "${DEBIAN_PATH}" || true
|
||||
done
|
||||
mg_basepath="${DEBIAN_PATH}/dists/$dbver/mongodb-org"
|
||||
[ -e ${mg_basepath}/stable ] || (cd ${mg_basepath}; ln -s ${STABLE_VERSION} stable)
|
||||
done
|
||||
echo "Debian finished"
|
||||
|
||||
|
||||
# vim: ts=4 sts=4 sw=4
|
||||
@@ -1,18 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
function sync_nodesource() {
|
||||
repo_url="$1"
|
||||
repo_dir="$2"
|
||||
|
||||
[ ! -d "$repo_dir" ] && mkdir -p "$repo_dir"
|
||||
cd $repo_dir
|
||||
# lftp "${repo_url}/" -e "mirror --verbose --exclude-glob='*/SRPMS/*' -P 5 --delete --only-newer; bye"
|
||||
lftp "${repo_url}/" -e "mirror --verbose -P 5 --delete --only-newer; bye"
|
||||
}
|
||||
|
||||
sync_nodesource "https://deb.nodesource.com/node" "${TUNASYNC_WORKING_DIR}/deb"
|
||||
sync_nodesource "https://deb.nodesource.com/node_0.12" "${TUNASYNC_WORKING_DIR}/deb_0.12"
|
||||
sync_nodesource "https://deb.nodesource.com/node_4.x" "${TUNASYNC_WORKING_DIR}/deb_4.x"
|
||||
sync_nodesource "https://rpm.nodesource.com/pub" "${TUNASYNC_WORKING_DIR}/rpm"
|
||||
sync_nodesource "https://rpm.nodesource.com/pub_0.12" "${TUNASYNC_WORKING_DIR}/rpm_0.12"
|
||||
sync_nodesource "https://rpm.nodesource.com/pub_4.x" "${TUNASYNC_WORKING_DIR}/rpm_4.x"
|
||||
@@ -1,13 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
function sync_openwrt() {
|
||||
repo_url="$1"
|
||||
repo_dir="$2"
|
||||
|
||||
[ ! -d "$repo_dir" ] && mkdir -p "$repo_dir"
|
||||
cd $repo_dir
|
||||
lftp "${repo_url}/" -e "mirror --verbose -P 5 --delete --only-newer; bye"
|
||||
}
|
||||
|
||||
sync_openwrt "http://downloads.openwrt.org/chaos_calmer/15.05/" "${TUNASYNC_WORKING_DIR}/chaos_calmer/15.05"
|
||||
sync_openwrt "http://downloads.openwrt.org/snapshots/trunk/" "${TUNASYNC_WORKING_DIR}/snapshots/trunk"
|
||||
@@ -1,9 +0,0 @@
|
||||
#!/bin/bash
|
||||
if [ ! -d "$TUNASYNC_WORKING_DIR" ]; then
|
||||
echo "Directory not exists, fail"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo "Syncing to $TUNASYNC_WORKING_DIR"
|
||||
|
||||
/usr/bin/timeout -s INT 3600 /home/tuna/.virtualenvs/bandersnatch/bin/bandersnatch -c /etc/bandersnatch.conf mirror || exit 1
|
||||
@@ -1,17 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
function sync_repo_ck() {
|
||||
repo_url="$1"
|
||||
repo_dir="$2"
|
||||
|
||||
[ ! -d "$repo_dir" ] && mkdir -p "$repo_dir"
|
||||
cd $repo_dir
|
||||
lftp "${repo_url}/" -e 'mirror -v -P 5 --delete --only-missing --only-newer --no-recursion; bye'
|
||||
wget "${repo_url}/repo-ck.db" -O "repo-ck.db"
|
||||
wget "${repo_url}/repo-ck.files" -O "repo-ck.files"
|
||||
}
|
||||
|
||||
UPSTREAM="http://repo-ck.com"
|
||||
|
||||
sync_repo_ck "${UPSTREAM}/x86_64" "${TUNASYNC_WORKING_DIR}/x86_64"
|
||||
sync_repo_ck "${UPSTREAM}/i686" "${TUNASYNC_WORKING_DIR}/i686"
|
||||
@@ -1,16 +0,0 @@
|
||||
#!/bin/bash
|
||||
set -e
|
||||
|
||||
_here=`dirname $(realpath $0)`
|
||||
. ${_here}/helpers/apt-download
|
||||
[ -z "${LOADED_APT_DOWNLOAD}" ] && (echo "failed to load apt-download"; exit 1)
|
||||
|
||||
BASE_PATH="${TUNASYNC_WORKING_DIR}"
|
||||
|
||||
base_url="http://apt.termux.com"
|
||||
ARCHES=("aarch64" "all" "arm" "i686")
|
||||
for arch in ${ARCHES[@]}; do
|
||||
echo "start syncing: ${arch}"
|
||||
apt-download-binary "${base_url}" "stable" "main" "${arch}" "${BASE_PATH}" || true
|
||||
done
|
||||
echo "finished"
|
||||
@@ -1,27 +0,0 @@
|
||||
#!/bin/bash
|
||||
SYNC_FILES="$TUNASYNC_WORKING_DIR"
|
||||
# SYNC_FILES="/srv/mirror_disk/ubuntu/_working/"
|
||||
#LOG_FILE="$TUNASYNC_LOG_FILE"
|
||||
|
||||
# [ -f $SYNC_LOCK ] && exit 1
|
||||
# touch $SYNC_LOCK
|
||||
|
||||
|
||||
echo ">> Starting sync on $(date --rfc-3339=seconds)"
|
||||
|
||||
arch="i386,amd64"
|
||||
sections="main,main/debian-installer,multiverse,multiverse/debian-installer,restricted,restricted/debian-installer,universe,universe/debian-installer"
|
||||
dists="precise,precise-backports,precise-proposed,precise-updates,precise-security,trusty,trusty-backports,trusty-proposed,trusty-updates,trusty-security"
|
||||
server="$1"
|
||||
inPath="/ubuntu"
|
||||
proto="rsync"
|
||||
outpath="$SYNC_FILES"
|
||||
rsyncOpt='-6 -aIL --partial'
|
||||
|
||||
debmirror -h $server --no-check-gpg -a $arch -s $sections -d $dists -r $inPath -e $proto --rsync-options "$rsyncOpt" --verbose $outpath
|
||||
|
||||
date --rfc-3339=seconds > "$SYNC_FILES/lastsync"
|
||||
echo ">> Finished sync on $(date --rfc-3339=seconds)"
|
||||
|
||||
# rm -f "$SYNC_LOCK"
|
||||
exit 0
|
||||
@@ -20,10 +20,11 @@ type baseProvider struct {
|
||||
cmd *cmdJob
|
||||
isRunning atomic.Value
|
||||
|
||||
logFile *os.File
|
||||
|
||||
cgroup *cgroupHook
|
||||
hooks []jobHook
|
||||
zfs *zfsHook
|
||||
docker *dockerHook
|
||||
|
||||
hooks []jobHook
|
||||
}
|
||||
|
||||
func (p *baseProvider) Name() string {
|
||||
@@ -77,12 +78,17 @@ func (p *baseProvider) LogFile() string {
|
||||
return s
|
||||
}
|
||||
}
|
||||
panic("log dir is impossible to be unavailable")
|
||||
panic("log file is impossible to be unavailable")
|
||||
}
|
||||
|
||||
func (p *baseProvider) AddHook(hook jobHook) {
|
||||
if cg, ok := hook.(*cgroupHook); ok {
|
||||
p.cgroup = cg
|
||||
switch v := hook.(type) {
|
||||
case *cgroupHook:
|
||||
p.cgroup = v
|
||||
case *zfsHook:
|
||||
p.zfs = v
|
||||
case *dockerHook:
|
||||
p.docker = v
|
||||
}
|
||||
p.hooks = append(p.hooks, hook)
|
||||
}
|
||||
@@ -95,20 +101,29 @@ func (p *baseProvider) Cgroup() *cgroupHook {
|
||||
return p.cgroup
|
||||
}
|
||||
|
||||
func (p *baseProvider) prepareLogFile() error {
|
||||
func (p *baseProvider) ZFS() *zfsHook {
|
||||
return p.zfs
|
||||
}
|
||||
|
||||
func (p *baseProvider) Docker() *dockerHook {
|
||||
return p.docker
|
||||
}
|
||||
|
||||
func (p *baseProvider) prepareLogFile(append bool) error {
|
||||
if p.LogFile() == "/dev/null" {
|
||||
p.cmd.SetLogFile(nil)
|
||||
return nil
|
||||
}
|
||||
if p.logFile == nil {
|
||||
logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE, 0644)
|
||||
if err != nil {
|
||||
logger.Errorf("Error opening logfile %s: %s", p.LogFile(), err.Error())
|
||||
return err
|
||||
}
|
||||
p.logFile = logFile
|
||||
appendMode := 0
|
||||
if append {
|
||||
appendMode = os.O_APPEND
|
||||
}
|
||||
p.cmd.SetLogFile(p.logFile)
|
||||
logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE|appendMode, 0644)
|
||||
if err != nil {
|
||||
logger.Errorf("Error opening logfile %s: %s", p.LogFile(), err.Error())
|
||||
return err
|
||||
}
|
||||
p.cmd.SetLogFile(logFile)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -127,32 +142,22 @@ func (p *baseProvider) IsRunning() bool {
|
||||
|
||||
func (p *baseProvider) Wait() error {
|
||||
defer func() {
|
||||
p.Lock()
|
||||
logger.Debugf("set isRunning to false: %s", p.Name())
|
||||
p.isRunning.Store(false)
|
||||
if p.logFile != nil {
|
||||
p.logFile.Close()
|
||||
p.logFile = nil
|
||||
}
|
||||
p.Unlock()
|
||||
}()
|
||||
logger.Debugf("calling Wait: %s", p.Name())
|
||||
return p.cmd.Wait()
|
||||
}
|
||||
|
||||
func (p *baseProvider) Terminate() error {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
logger.Debugf("terminating provider: %s", p.Name())
|
||||
if !p.IsRunning() {
|
||||
return nil
|
||||
}
|
||||
|
||||
p.Lock()
|
||||
if p.logFile != nil {
|
||||
p.logFile.Close()
|
||||
p.logFile = nil
|
||||
}
|
||||
p.Unlock()
|
||||
|
||||
err := p.cmd.Terminate()
|
||||
p.isRunning.Store(false)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -15,35 +15,31 @@ import (
|
||||
"github.com/codeskyblue/go-sh"
|
||||
)
|
||||
|
||||
var cgSubsystem = "cpu"
|
||||
|
||||
type cgroupHook struct {
|
||||
emptyHook
|
||||
provider mirrorProvider
|
||||
basePath string
|
||||
baseGroup string
|
||||
created bool
|
||||
subsystem string
|
||||
memLimit string
|
||||
}
|
||||
|
||||
func initCgroup(basePath string) {
|
||||
if _, err := os.Stat(filepath.Join(basePath, "memory")); err == nil {
|
||||
cgSubsystem = "memory"
|
||||
return
|
||||
}
|
||||
logger.Warning("Memory subsystem of cgroup not enabled, fallback to cpu")
|
||||
}
|
||||
|
||||
func newCgroupHook(p mirrorProvider, basePath, baseGroup string) *cgroupHook {
|
||||
func newCgroupHook(p mirrorProvider, basePath, baseGroup, subsystem, memLimit string) *cgroupHook {
|
||||
if basePath == "" {
|
||||
basePath = "/sys/fs/cgroup"
|
||||
}
|
||||
if baseGroup == "" {
|
||||
baseGroup = "tunasync"
|
||||
}
|
||||
if subsystem == "" {
|
||||
subsystem = "cpu"
|
||||
}
|
||||
return &cgroupHook{
|
||||
provider: p,
|
||||
basePath: basePath,
|
||||
baseGroup: baseGroup,
|
||||
subsystem: subsystem,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -52,13 +48,15 @@ func (c *cgroupHook) preExec() error {
|
||||
if err := sh.Command("cgcreate", "-g", c.Cgroup()).Run(); err != nil {
|
||||
return err
|
||||
}
|
||||
if cgSubsystem != "memory" {
|
||||
if c.subsystem != "memory" {
|
||||
return nil
|
||||
}
|
||||
if c.provider.Type() == provRsync || c.provider.Type() == provTwoStageRsync {
|
||||
if c.memLimit != "" {
|
||||
gname := fmt.Sprintf("%s/%s", c.baseGroup, c.provider.Name())
|
||||
return sh.Command(
|
||||
"cgset", "-r", "memory.limit_in_bytes=128M", gname,
|
||||
"cgset", "-r",
|
||||
fmt.Sprintf("memory.limit_in_bytes=%s", c.memLimit),
|
||||
gname,
|
||||
).Run()
|
||||
}
|
||||
return nil
|
||||
@@ -76,7 +74,7 @@ func (c *cgroupHook) postExec() error {
|
||||
|
||||
func (c *cgroupHook) Cgroup() string {
|
||||
name := c.provider.Name()
|
||||
return fmt.Sprintf("%s:%s/%s", cgSubsystem, c.baseGroup, name)
|
||||
return fmt.Sprintf("%s:%s/%s", c.subsystem, c.baseGroup, name)
|
||||
}
|
||||
|
||||
func (c *cgroupHook) killAll() error {
|
||||
@@ -87,7 +85,7 @@ func (c *cgroupHook) killAll() error {
|
||||
|
||||
readTaskList := func() ([]int, error) {
|
||||
taskList := []int{}
|
||||
taskFile, err := os.Open(filepath.Join(c.basePath, cgSubsystem, c.baseGroup, name, "tasks"))
|
||||
taskFile, err := os.Open(filepath.Join(c.basePath, c.subsystem, c.baseGroup, name, "tasks"))
|
||||
if err != nil {
|
||||
return taskList, err
|
||||
}
|
||||
|
||||
@@ -72,11 +72,14 @@ sleep 30
|
||||
provider, err := newCmdProvider(c)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
initCgroup("/sys/fs/cgroup")
|
||||
cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync")
|
||||
cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync", "cpu", "")
|
||||
provider.AddHook(cg)
|
||||
|
||||
err = cg.preExec()
|
||||
if err != nil {
|
||||
logger.Errorf("Failed to create cgroup")
|
||||
return
|
||||
}
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
go func() {
|
||||
@@ -129,15 +132,18 @@ sleep 30
|
||||
provider, err := newRsyncProvider(c)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
initCgroup("/sys/fs/cgroup")
|
||||
cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync")
|
||||
cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync", "cpu", "512M")
|
||||
provider.AddHook(cg)
|
||||
|
||||
cg.preExec()
|
||||
if cgSubsystem == "memory" {
|
||||
err = cg.preExec()
|
||||
if err != nil {
|
||||
logger.Errorf("Failed to create cgroup")
|
||||
return
|
||||
}
|
||||
if cg.subsystem == "memory" {
|
||||
memoLimit, err := ioutil.ReadFile(filepath.Join(cg.basePath, "memory", cg.baseGroup, provider.Name(), "memory.limit_in_bytes"))
|
||||
So(err, ShouldBeNil)
|
||||
So(strings.Trim(string(memoLimit), "\n"), ShouldEqual, strconv.Itoa(128*1024*1024))
|
||||
So(strings.Trim(string(memoLimit), "\n"), ShouldEqual, strconv.Itoa(512*1024*1024))
|
||||
}
|
||||
cg.postExec()
|
||||
})
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/anmitsu/go-shlex"
|
||||
@@ -60,17 +61,25 @@ func (p *cmdProvider) Run() error {
|
||||
}
|
||||
|
||||
func (p *cmdProvider) Start() error {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
|
||||
if p.IsRunning() {
|
||||
return errors.New("provider is currently running")
|
||||
}
|
||||
|
||||
env := map[string]string{
|
||||
"TUNASYNC_MIRROR_NAME": p.Name(),
|
||||
"TUNASYNC_WORKING_DIR": p.WorkingDir(),
|
||||
"TUNASYNC_UPSTREAM_URL": p.upstreamURL,
|
||||
"TUNASYNC_LOG_DIR": p.LogDir(),
|
||||
"TUNASYNC_LOG_FILE": p.LogFile(),
|
||||
}
|
||||
for k, v := range p.env {
|
||||
env[k] = v
|
||||
}
|
||||
p.cmd = newCmdJob(p, p.command, p.WorkingDir(), env)
|
||||
if err := p.prepareLogFile(); err != nil {
|
||||
if err := p.prepareLogFile(false); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -37,6 +37,8 @@ type Config struct {
|
||||
Manager managerConfig `toml:"manager"`
|
||||
Server serverConfig `toml:"server"`
|
||||
Cgroup cgroupConfig `toml:"cgroup"`
|
||||
ZFS zfsConfig `toml:"zfs"`
|
||||
Docker dockerConfig `toml:"docker"`
|
||||
Include includeConfig `toml:"include"`
|
||||
Mirrors []mirrorConfig `toml:"mirrors"`
|
||||
}
|
||||
@@ -54,8 +56,17 @@ type globalConfig struct {
|
||||
|
||||
type managerConfig struct {
|
||||
APIBase string `toml:"api_base"`
|
||||
CACert string `toml:"ca_cert"`
|
||||
Token string `toml:"token"`
|
||||
// this option overrides the APIBase
|
||||
APIList []string `toml:"api_base_list"`
|
||||
CACert string `toml:"ca_cert"`
|
||||
// Token string `toml:"token"`
|
||||
}
|
||||
|
||||
func (mc managerConfig) APIBaseList() []string {
|
||||
if len(mc.APIList) > 0 {
|
||||
return mc.APIList
|
||||
}
|
||||
return []string{mc.APIBase}
|
||||
}
|
||||
|
||||
type serverConfig struct {
|
||||
@@ -67,9 +78,21 @@ type serverConfig struct {
|
||||
}
|
||||
|
||||
type cgroupConfig struct {
|
||||
Enable bool `toml:"enable"`
|
||||
BasePath string `toml:"base_path"`
|
||||
Group string `toml:"group"`
|
||||
Enable bool `toml:"enable"`
|
||||
BasePath string `toml:"base_path"`
|
||||
Group string `toml:"group"`
|
||||
Subsystem string `toml:"subsystem"`
|
||||
}
|
||||
|
||||
type dockerConfig struct {
|
||||
Enable bool `toml:"enable"`
|
||||
Volumes []string `toml:"volumes"`
|
||||
Options []string `toml:"options"`
|
||||
}
|
||||
|
||||
type zfsConfig struct {
|
||||
Enable bool `toml:"enable"`
|
||||
Zpool string `toml:"zpool"`
|
||||
}
|
||||
|
||||
type includeConfig struct {
|
||||
@@ -100,10 +123,17 @@ type mirrorConfig struct {
|
||||
|
||||
Command string `toml:"command"`
|
||||
UseIPv6 bool `toml:"use_ipv6"`
|
||||
UseIPv4 bool `toml:"use_ipv4"`
|
||||
ExcludeFile string `toml:"exclude_file"`
|
||||
Username string `toml:"username"`
|
||||
Password string `toml:"password"`
|
||||
Stage1Profile string `toml:"stage1_profile"`
|
||||
|
||||
MemoryLimit string `toml:"memory_limit"`
|
||||
|
||||
DockerImage string `toml:"docker_image"`
|
||||
DockerVolumes []string `toml:"docker_volumes"`
|
||||
DockerOptions []string `toml:"docker_options"`
|
||||
}
|
||||
|
||||
// LoadConfig loads configuration
|
||||
|
||||
95
worker/docker.go
普通文件
95
worker/docker.go
普通文件
@@ -0,0 +1,95 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
)
|
||||
|
||||
type dockerHook struct {
|
||||
emptyHook
|
||||
provider mirrorProvider
|
||||
image string
|
||||
volumes []string
|
||||
options []string
|
||||
}
|
||||
|
||||
func newDockerHook(p mirrorProvider, gCfg dockerConfig, mCfg mirrorConfig) *dockerHook {
|
||||
volumes := []string{}
|
||||
volumes = append(volumes, gCfg.Volumes...)
|
||||
volumes = append(volumes, mCfg.DockerVolumes...)
|
||||
|
||||
options := []string{}
|
||||
options = append(options, gCfg.Options...)
|
||||
options = append(options, mCfg.DockerOptions...)
|
||||
|
||||
return &dockerHook{
|
||||
provider: p,
|
||||
image: mCfg.DockerImage,
|
||||
volumes: volumes,
|
||||
options: options,
|
||||
}
|
||||
}
|
||||
|
||||
func (d *dockerHook) preExec() error {
|
||||
p := d.provider
|
||||
logDir := p.LogDir()
|
||||
logFile := p.LogFile()
|
||||
workingDir := p.WorkingDir()
|
||||
|
||||
if _, err := os.Stat(workingDir); os.IsNotExist(err) {
|
||||
logger.Debugf("Making dir %s", workingDir)
|
||||
if err = os.MkdirAll(workingDir, 0755); err != nil {
|
||||
return fmt.Errorf("Error making dir %s: %s", workingDir, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// Override workingDir
|
||||
ctx := p.EnterContext()
|
||||
ctx.Set(
|
||||
"volumes", []string{
|
||||
fmt.Sprintf("%s:%s", logDir, logDir),
|
||||
fmt.Sprintf("%s:%s", logFile, logFile),
|
||||
fmt.Sprintf("%s:%s", workingDir, workingDir),
|
||||
},
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *dockerHook) postExec() error {
|
||||
// sh.Command(
|
||||
// "docker", "rm", "-f", d.Name(),
|
||||
// ).Run()
|
||||
d.provider.ExitContext()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Volumes returns the configured volumes and
|
||||
// runtime-needed volumes, including mirror dirs
|
||||
// and log files
|
||||
func (d *dockerHook) Volumes() []string {
|
||||
vols := make([]string, len(d.volumes))
|
||||
copy(vols, d.volumes)
|
||||
|
||||
p := d.provider
|
||||
ctx := p.Context()
|
||||
if ivs, ok := ctx.Get("volumes"); ok {
|
||||
vs := ivs.([]string)
|
||||
vols = append(vols, vs...)
|
||||
}
|
||||
return vols
|
||||
}
|
||||
|
||||
func (d *dockerHook) LogFile() string {
|
||||
p := d.provider
|
||||
ctx := p.Context()
|
||||
if iv, ok := ctx.Get(_LogFileKey + ":docker"); ok {
|
||||
v := iv.(string)
|
||||
return v
|
||||
}
|
||||
return p.LogFile()
|
||||
}
|
||||
|
||||
func (d *dockerHook) Name() string {
|
||||
p := d.provider
|
||||
return "tunasync-job-" + p.Name()
|
||||
}
|
||||
97
worker/docker_test.go
普通文件
97
worker/docker_test.go
普通文件
@@ -0,0 +1,97 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/codeskyblue/go-sh"
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
)
|
||||
|
||||
func getDockerByName(name string) (string, error) {
|
||||
// docker ps -f 'name=$name' --format '{{.Names}}'
|
||||
out, err := sh.Command(
|
||||
"docker", "ps",
|
||||
"--filter", "name="+name,
|
||||
"--format", "{{.Names}}",
|
||||
).Output()
|
||||
return string(out), err
|
||||
}
|
||||
|
||||
func TestDocker(t *testing.T) {
|
||||
Convey("Docker Should Work", t, func(ctx C) {
|
||||
tmpDir, err := ioutil.TempDir("", "tunasync")
|
||||
defer os.RemoveAll(tmpDir)
|
||||
So(err, ShouldBeNil)
|
||||
cmdScript := filepath.Join(tmpDir, "cmd.sh")
|
||||
tmpFile := filepath.Join(tmpDir, "log_file")
|
||||
expectedOutput := "HELLO_WORLD"
|
||||
|
||||
c := cmdConfig{
|
||||
name: "tuna-docker",
|
||||
upstreamURL: "http://mirrors.tuna.moe/",
|
||||
command: "/bin/cmd.sh",
|
||||
workingDir: tmpDir,
|
||||
logDir: tmpDir,
|
||||
logFile: tmpFile,
|
||||
interval: 600 * time.Second,
|
||||
env: map[string]string{
|
||||
"TEST_CONTENT": expectedOutput,
|
||||
},
|
||||
}
|
||||
|
||||
cmdScriptContent := `#!/bin/sh
|
||||
echo ${TEST_CONTENT}
|
||||
sleep 10
|
||||
`
|
||||
err = ioutil.WriteFile(cmdScript, []byte(cmdScriptContent), 0755)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
provider, err := newCmdProvider(c)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
d := &dockerHook{
|
||||
provider: provider,
|
||||
image: "alpine",
|
||||
volumes: []string{
|
||||
fmt.Sprintf("%s:%s", cmdScript, "/bin/cmd.sh"),
|
||||
},
|
||||
}
|
||||
provider.AddHook(d)
|
||||
So(provider.Docker(), ShouldNotBeNil)
|
||||
|
||||
err = d.preExec()
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
go func() {
|
||||
err = provider.Run()
|
||||
ctx.So(err, ShouldNotBeNil)
|
||||
}()
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
// assert container running
|
||||
names, err := getDockerByName(d.Name())
|
||||
So(err, ShouldBeNil)
|
||||
So(names, ShouldEqual, d.Name()+"\n")
|
||||
|
||||
err = provider.Terminate()
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
// container should be terminated and removed
|
||||
names, err = getDockerByName(d.Name())
|
||||
So(err, ShouldBeNil)
|
||||
So(names, ShouldEqual, "")
|
||||
|
||||
// check log content
|
||||
loggedContent, err := ioutil.ReadFile(provider.LogFile())
|
||||
So(err, ShouldBeNil)
|
||||
So(string(loggedContent), ShouldEqual, expectedOutput+"\n")
|
||||
|
||||
d.postExec()
|
||||
})
|
||||
}
|
||||
@@ -71,6 +71,7 @@ func (h *execPostHook) Do() error {
|
||||
"TUNASYNC_MIRROR_NAME": p.Name(),
|
||||
"TUNASYNC_WORKING_DIR": p.WorkingDir(),
|
||||
"TUNASYNC_UPSTREAM_URL": p.Upstream(),
|
||||
"TUNASYNC_LOG_DIR": p.LogDir(),
|
||||
"TUNASYNC_LOG_FILE": p.LogFile(),
|
||||
"TUNASYNC_JOB_EXIT_STATUS": exitStatus,
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
tunasync "github.com/tuna/tunasync/internal"
|
||||
)
|
||||
@@ -14,12 +15,13 @@ import (
|
||||
type ctrlAction uint8
|
||||
|
||||
const (
|
||||
jobStart ctrlAction = iota
|
||||
jobStop // stop syncing keep the job
|
||||
jobDisable // disable the job (stops goroutine)
|
||||
jobRestart // restart syncing
|
||||
jobPing // ensure the goroutine is alive
|
||||
jobHalt // worker halts
|
||||
jobStart ctrlAction = iota
|
||||
jobStop // stop syncing keep the job
|
||||
jobDisable // disable the job (stops goroutine)
|
||||
jobRestart // restart syncing
|
||||
jobPing // ensure the goroutine is alive
|
||||
jobHalt // worker halts
|
||||
jobForceStart // ignore concurrent limit
|
||||
)
|
||||
|
||||
type jobMessage struct {
|
||||
@@ -154,9 +156,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
|
||||
syncDone := make(chan error, 1)
|
||||
go func() {
|
||||
err := provider.Run()
|
||||
if !stopASAP {
|
||||
syncDone <- err
|
||||
}
|
||||
syncDone <- err
|
||||
}()
|
||||
|
||||
select {
|
||||
@@ -212,22 +212,26 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
|
||||
return nil
|
||||
}
|
||||
|
||||
runJob := func(kill <-chan empty, jobDone chan<- empty) {
|
||||
runJob := func(kill <-chan empty, jobDone chan<- empty, bypassSemaphore <-chan empty) {
|
||||
select {
|
||||
case semaphore <- empty{}:
|
||||
defer func() { <-semaphore }()
|
||||
runJobWrapper(kill, jobDone)
|
||||
case <-bypassSemaphore:
|
||||
logger.Noticef("Concurrent limit ignored by %s", m.Name())
|
||||
runJobWrapper(kill, jobDone)
|
||||
case <-kill:
|
||||
jobDone <- empty{}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
bypassSemaphore := make(chan empty, 1)
|
||||
for {
|
||||
if m.State() == stateReady {
|
||||
kill := make(chan empty)
|
||||
jobDone := make(chan empty)
|
||||
go runJob(kill, jobDone)
|
||||
go runJob(kill, jobDone, bypassSemaphore)
|
||||
|
||||
_wait_for_job:
|
||||
select {
|
||||
@@ -248,7 +252,14 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
|
||||
m.SetState(stateReady)
|
||||
close(kill)
|
||||
<-jobDone
|
||||
time.Sleep(time.Second) // Restart may fail if the process was not exited yet
|
||||
continue
|
||||
case jobForceStart:
|
||||
select { //non-blocking
|
||||
default:
|
||||
case bypassSemaphore <- empty{}:
|
||||
}
|
||||
fallthrough
|
||||
case jobStart:
|
||||
m.SetState(stateReady)
|
||||
goto _wait_for_job
|
||||
@@ -272,8 +283,14 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
|
||||
case jobDisable:
|
||||
m.SetState(stateDisabled)
|
||||
return nil
|
||||
case jobForceStart:
|
||||
select { //non-blocking
|
||||
default:
|
||||
case bypassSemaphore <- empty{}:
|
||||
}
|
||||
fallthrough
|
||||
case jobRestart:
|
||||
m.SetState(stateReady)
|
||||
fallthrough
|
||||
case jobStart:
|
||||
m.SetState(stateReady)
|
||||
default:
|
||||
|
||||
@@ -135,6 +135,8 @@ echo $TUNASYNC_WORKING_DIR
|
||||
msg = <-managerChan
|
||||
So(msg.status, ShouldEqual, Syncing)
|
||||
|
||||
job.ctrlChan <- jobStart // should be ignored
|
||||
|
||||
job.ctrlChan <- jobStop
|
||||
|
||||
msg = <-managerChan
|
||||
@@ -170,8 +172,239 @@ echo $TUNASYNC_WORKING_DIR
|
||||
job.ctrlChan <- jobDisable
|
||||
<-job.disabled
|
||||
})
|
||||
|
||||
Convey("If we restart it", func(ctx C) {
|
||||
go job.Run(managerChan, semaphore)
|
||||
job.ctrlChan <- jobStart
|
||||
|
||||
msg := <-managerChan
|
||||
So(msg.status, ShouldEqual, PreSyncing)
|
||||
msg = <-managerChan
|
||||
So(msg.status, ShouldEqual, Syncing)
|
||||
|
||||
job.ctrlChan <- jobRestart
|
||||
|
||||
msg = <-managerChan
|
||||
So(msg.status, ShouldEqual, Failed)
|
||||
So(msg.msg, ShouldEqual, "killed by manager")
|
||||
|
||||
msg = <-managerChan
|
||||
So(msg.status, ShouldEqual, PreSyncing)
|
||||
msg = <-managerChan
|
||||
So(msg.status, ShouldEqual, Syncing)
|
||||
msg = <-managerChan
|
||||
So(msg.status, ShouldEqual, Success)
|
||||
|
||||
expectedOutput := fmt.Sprintf(
|
||||
"%s\n%s\n",
|
||||
provider.WorkingDir(), provider.WorkingDir(),
|
||||
)
|
||||
|
||||
loggedContent, err := ioutil.ReadFile(provider.LogFile())
|
||||
So(err, ShouldBeNil)
|
||||
So(string(loggedContent), ShouldEqual, expectedOutput)
|
||||
job.ctrlChan <- jobDisable
|
||||
<-job.disabled
|
||||
})
|
||||
|
||||
Convey("If we disable it", func(ctx C) {
|
||||
go job.Run(managerChan, semaphore)
|
||||
job.ctrlChan <- jobStart
|
||||
|
||||
msg := <-managerChan
|
||||
So(msg.status, ShouldEqual, PreSyncing)
|
||||
msg = <-managerChan
|
||||
So(msg.status, ShouldEqual, Syncing)
|
||||
|
||||
job.ctrlChan <- jobDisable
|
||||
|
||||
msg = <-managerChan
|
||||
So(msg.status, ShouldEqual, Failed)
|
||||
So(msg.msg, ShouldEqual, "killed by manager")
|
||||
|
||||
<-job.disabled
|
||||
})
|
||||
|
||||
Convey("If we stop it twice, than start it", func(ctx C) {
|
||||
go job.Run(managerChan, semaphore)
|
||||
job.ctrlChan <- jobStart
|
||||
|
||||
msg := <-managerChan
|
||||
So(msg.status, ShouldEqual, PreSyncing)
|
||||
msg = <-managerChan
|
||||
So(msg.status, ShouldEqual, Syncing)
|
||||
|
||||
job.ctrlChan <- jobStop
|
||||
|
||||
msg = <-managerChan
|
||||
So(msg.status, ShouldEqual, Failed)
|
||||
So(msg.msg, ShouldEqual, "killed by manager")
|
||||
|
||||
job.ctrlChan <- jobStop // should be ignored
|
||||
|
||||
job.ctrlChan <- jobStart
|
||||
|
||||
msg = <-managerChan
|
||||
So(msg.status, ShouldEqual, PreSyncing)
|
||||
msg = <-managerChan
|
||||
So(msg.status, ShouldEqual, Syncing)
|
||||
msg = <-managerChan
|
||||
So(msg.status, ShouldEqual, Success)
|
||||
|
||||
expectedOutput := fmt.Sprintf(
|
||||
"%s\n%s\n",
|
||||
provider.WorkingDir(), provider.WorkingDir(),
|
||||
)
|
||||
|
||||
loggedContent, err := ioutil.ReadFile(provider.LogFile())
|
||||
So(err, ShouldBeNil)
|
||||
So(string(loggedContent), ShouldEqual, expectedOutput)
|
||||
|
||||
job.ctrlChan <- jobDisable
|
||||
<-job.disabled
|
||||
})
|
||||
})
|
||||
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func TestConcurrentMirrorJobs(t *testing.T) {
|
||||
|
||||
InitLogger(true, true, false)
|
||||
|
||||
Convey("Concurrent MirrorJobs should work", t, func(ctx C) {
|
||||
tmpDir, err := ioutil.TempDir("", "tunasync")
|
||||
defer os.RemoveAll(tmpDir)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
const CONCURRENT = 5
|
||||
|
||||
var providers [CONCURRENT]*cmdProvider
|
||||
var jobs [CONCURRENT]*mirrorJob
|
||||
for i := 0; i < CONCURRENT; i++ {
|
||||
c := cmdConfig{
|
||||
name: fmt.Sprintf("job-%d", i),
|
||||
upstreamURL: "http://mirrors.tuna.moe/",
|
||||
command: "sleep 2",
|
||||
workingDir: tmpDir,
|
||||
logDir: tmpDir,
|
||||
logFile: "/dev/null",
|
||||
interval: 10 * time.Second,
|
||||
}
|
||||
|
||||
var err error
|
||||
providers[i], err = newCmdProvider(c)
|
||||
So(err, ShouldBeNil)
|
||||
jobs[i] = newMirrorJob(providers[i])
|
||||
}
|
||||
|
||||
managerChan := make(chan jobMessage, 10)
|
||||
semaphore := make(chan empty, CONCURRENT-2)
|
||||
|
||||
countingJobs := func(managerChan chan jobMessage, totalJobs, concurrentCheck int) (peakConcurrent, counterFailed int) {
|
||||
counterEnded := 0
|
||||
counterRunning := 0
|
||||
peakConcurrent = 0
|
||||
counterFailed = 0
|
||||
for counterEnded < totalJobs {
|
||||
msg := <-managerChan
|
||||
switch msg.status {
|
||||
case PreSyncing:
|
||||
counterRunning++
|
||||
case Syncing:
|
||||
case Failed:
|
||||
counterFailed++
|
||||
fallthrough
|
||||
case Success:
|
||||
counterEnded++
|
||||
counterRunning--
|
||||
default:
|
||||
So(0, ShouldEqual, 1)
|
||||
}
|
||||
// Test if semaphore works
|
||||
So(counterRunning, ShouldBeLessThanOrEqualTo, concurrentCheck)
|
||||
if counterRunning > peakConcurrent {
|
||||
peakConcurrent = counterRunning
|
||||
}
|
||||
}
|
||||
// select {
|
||||
// case msg := <-managerChan:
|
||||
// logger.Errorf("extra message received: %v", msg)
|
||||
// So(0, ShouldEqual, 1)
|
||||
// case <-time.After(2 * time.Second):
|
||||
// }
|
||||
return
|
||||
}
|
||||
|
||||
Convey("When we run them all", func(ctx C) {
|
||||
for _, job := range jobs {
|
||||
go job.Run(managerChan, semaphore)
|
||||
job.ctrlChan <- jobStart
|
||||
}
|
||||
|
||||
peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT, CONCURRENT-2)
|
||||
|
||||
So(peakConcurrent, ShouldEqual, CONCURRENT-2)
|
||||
So(counterFailed, ShouldEqual, 0)
|
||||
|
||||
for _, job := range jobs {
|
||||
job.ctrlChan <- jobDisable
|
||||
<-job.disabled
|
||||
}
|
||||
})
|
||||
Convey("If we cancel one job", func(ctx C) {
|
||||
for _, job := range jobs {
|
||||
go job.Run(managerChan, semaphore)
|
||||
job.ctrlChan <- jobRestart
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
}
|
||||
|
||||
// Cancel the one waiting for semaphore
|
||||
jobs[len(jobs)-1].ctrlChan <- jobStop
|
||||
|
||||
peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT-1, CONCURRENT-2)
|
||||
|
||||
So(peakConcurrent, ShouldEqual, CONCURRENT-2)
|
||||
So(counterFailed, ShouldEqual, 0)
|
||||
|
||||
for _, job := range jobs {
|
||||
job.ctrlChan <- jobDisable
|
||||
<-job.disabled
|
||||
}
|
||||
})
|
||||
Convey("If we override the concurrent limit", func(ctx C) {
|
||||
for _, job := range jobs {
|
||||
go job.Run(managerChan, semaphore)
|
||||
job.ctrlChan <- jobStart
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
}
|
||||
|
||||
jobs[len(jobs)-1].ctrlChan <- jobForceStart
|
||||
jobs[len(jobs)-2].ctrlChan <- jobForceStart
|
||||
|
||||
peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT, CONCURRENT)
|
||||
|
||||
So(peakConcurrent, ShouldEqual, CONCURRENT)
|
||||
So(counterFailed, ShouldEqual, 0)
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
// fmt.Println("Restart them")
|
||||
|
||||
for _, job := range jobs {
|
||||
job.ctrlChan <- jobStart
|
||||
}
|
||||
|
||||
peakConcurrent, counterFailed = countingJobs(managerChan, CONCURRENT, CONCURRENT-2)
|
||||
|
||||
So(peakConcurrent, ShouldEqual, CONCURRENT-2)
|
||||
So(counterFailed, ShouldEqual, 0)
|
||||
|
||||
for _, job := range jobs {
|
||||
job.ctrlChan <- jobDisable
|
||||
<-job.disabled
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
@@ -36,6 +36,10 @@ type mirrorProvider interface {
|
||||
IsRunning() bool
|
||||
// Cgroup
|
||||
Cgroup() *cgroupHook
|
||||
// ZFS
|
||||
ZFS() *zfsHook
|
||||
// Docker
|
||||
Docker() *dockerHook
|
||||
|
||||
AddHook(hook jobHook)
|
||||
Hooks() []jobHook
|
||||
@@ -126,6 +130,7 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
|
||||
logDir: logDir,
|
||||
logFile: filepath.Join(logDir, "latest.log"),
|
||||
useIPv6: mirror.UseIPv6,
|
||||
useIPv4: mirror.UseIPv4,
|
||||
interval: time.Duration(mirror.Interval) * time.Minute,
|
||||
}
|
||||
p, err := newRsyncProvider(rc)
|
||||
@@ -162,10 +167,22 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
|
||||
// Add Logging Hook
|
||||
provider.AddHook(newLogLimiter(provider))
|
||||
|
||||
// Add Cgroup Hook
|
||||
if cfg.Cgroup.Enable {
|
||||
// Add ZFS Hook
|
||||
if cfg.ZFS.Enable {
|
||||
provider.AddHook(newZfsHook(provider, cfg.ZFS.Zpool))
|
||||
}
|
||||
|
||||
// Add Docker Hook
|
||||
if cfg.Docker.Enable && len(mirror.DockerImage) > 0 {
|
||||
provider.AddHook(newDockerHook(provider, cfg.Docker, mirror))
|
||||
|
||||
} else if cfg.Cgroup.Enable {
|
||||
// Add Cgroup Hook
|
||||
provider.AddHook(
|
||||
newCgroupHook(provider, cfg.Cgroup.BasePath, cfg.Cgroup.Group),
|
||||
newCgroupHook(
|
||||
provider, cfg.Cgroup.BasePath, cfg.Cgroup.Group,
|
||||
cfg.Cgroup.Subsystem, mirror.MemoryLimit,
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -79,11 +79,12 @@ exit 0
|
||||
err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir())
|
||||
expectedOutput := fmt.Sprintf(
|
||||
"syncing to %s\n"+
|
||||
"%s\n"+
|
||||
"Done\n",
|
||||
provider.WorkingDir(),
|
||||
targetDir,
|
||||
fmt.Sprintf(
|
||||
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
|
||||
"--delete --delete-after --delay-updates --safe-links "+
|
||||
@@ -144,11 +145,12 @@ exit 0
|
||||
err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir())
|
||||
expectedOutput := fmt.Sprintf(
|
||||
"syncing to %s\n"+
|
||||
"%s\n"+
|
||||
"Done\n",
|
||||
provider.WorkingDir(),
|
||||
targetDir,
|
||||
fmt.Sprintf(
|
||||
"%s %s -aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
|
||||
"--delete --delete-after --delay-updates --safe-links "+
|
||||
@@ -260,6 +262,40 @@ sleep 5
|
||||
|
||||
})
|
||||
})
|
||||
Convey("Command Provider without log file should work", t, func(ctx C) {
|
||||
tmpDir, err := ioutil.TempDir("", "tunasync")
|
||||
defer os.RemoveAll(tmpDir)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
c := cmdConfig{
|
||||
name: "run-ls",
|
||||
upstreamURL: "http://mirrors.tuna.moe/",
|
||||
command: "ls",
|
||||
workingDir: tmpDir,
|
||||
logDir: tmpDir,
|
||||
logFile: "/dev/null",
|
||||
interval: 600 * time.Second,
|
||||
}
|
||||
|
||||
provider, err := newCmdProvider(c)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
So(provider.IsMaster(), ShouldEqual, false)
|
||||
So(provider.ZFS(), ShouldBeNil)
|
||||
So(provider.Type(), ShouldEqual, provCommand)
|
||||
So(provider.Name(), ShouldEqual, c.name)
|
||||
So(provider.WorkingDir(), ShouldEqual, c.workingDir)
|
||||
So(provider.LogDir(), ShouldEqual, c.logDir)
|
||||
So(provider.LogFile(), ShouldEqual, c.logFile)
|
||||
So(provider.Interval(), ShouldEqual, c.interval)
|
||||
|
||||
Convey("Run the command", func() {
|
||||
|
||||
err = provider.Run()
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestTwoStageRsyncProvider(t *testing.T) {
|
||||
@@ -280,6 +316,8 @@ func TestTwoStageRsyncProvider(t *testing.T) {
|
||||
logFile: tmpFile,
|
||||
useIPv6: true,
|
||||
excludeFile: tmpFile,
|
||||
username: "hello",
|
||||
password: "world",
|
||||
}
|
||||
|
||||
provider, err := newTwoStageRsyncProvider(c)
|
||||
@@ -306,6 +344,7 @@ exit 0
|
||||
err = provider.Run()
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir())
|
||||
expectedOutput := fmt.Sprintf(
|
||||
"syncing to %s\n"+
|
||||
"%s\n"+
|
||||
@@ -313,14 +352,14 @@ exit 0
|
||||
"syncing to %s\n"+
|
||||
"%s\n"+
|
||||
"Done\n",
|
||||
provider.WorkingDir(),
|
||||
targetDir,
|
||||
fmt.Sprintf(
|
||||
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+
|
||||
"--timeout=120 --contimeout=120 --exclude dists/ -6 "+
|
||||
"--exclude-from %s %s %s",
|
||||
provider.excludeFile, provider.upstreamURL, provider.WorkingDir(),
|
||||
),
|
||||
provider.WorkingDir(),
|
||||
targetDir,
|
||||
fmt.Sprintf(
|
||||
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
|
||||
"--delete --delete-after --delay-updates --safe-links "+
|
||||
|
||||
@@ -11,7 +11,7 @@ type rsyncConfig struct {
|
||||
rsyncCmd string
|
||||
upstreamURL, username, password, excludeFile string
|
||||
workingDir, logDir, logFile string
|
||||
useIPv6 bool
|
||||
useIPv6, useIPv4 bool
|
||||
interval time.Duration
|
||||
}
|
||||
|
||||
@@ -49,6 +49,8 @@ func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
|
||||
|
||||
if c.useIPv6 {
|
||||
options = append(options, "-6")
|
||||
} else if c.useIPv4 {
|
||||
options = append(options, "-4")
|
||||
}
|
||||
|
||||
if c.excludeFile != "" {
|
||||
@@ -79,6 +81,12 @@ func (p *rsyncProvider) Run() error {
|
||||
}
|
||||
|
||||
func (p *rsyncProvider) Start() error {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
|
||||
if p.IsRunning() {
|
||||
return errors.New("provider is currently running")
|
||||
}
|
||||
|
||||
env := map[string]string{}
|
||||
if p.username != "" {
|
||||
@@ -92,7 +100,7 @@ func (p *rsyncProvider) Start() error {
|
||||
command = append(command, p.upstreamURL, p.WorkingDir())
|
||||
|
||||
p.cmd = newCmdJob(p, command, p.WorkingDir(), env)
|
||||
if err := p.prepareLogFile(); err != nil {
|
||||
if err := p.prepareLogFile(false); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ package worker
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"strings"
|
||||
@@ -9,6 +10,7 @@ import (
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/codeskyblue/go-sh"
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
@@ -31,11 +33,44 @@ type cmdJob struct {
|
||||
func newCmdJob(provider mirrorProvider, cmdAndArgs []string, workingDir string, env map[string]string) *cmdJob {
|
||||
var cmd *exec.Cmd
|
||||
|
||||
if provider.Cgroup() != nil {
|
||||
if d := provider.Docker(); d != nil {
|
||||
c := "docker"
|
||||
args := []string{
|
||||
"run", "--rm",
|
||||
"-a", "STDOUT", "-a", "STDERR",
|
||||
"--name", d.Name(),
|
||||
"-w", workingDir,
|
||||
}
|
||||
// specify user
|
||||
args = append(
|
||||
args, "-u",
|
||||
fmt.Sprintf("%d:%d", os.Getuid(), os.Getgid()),
|
||||
)
|
||||
// add volumes
|
||||
for _, vol := range d.Volumes() {
|
||||
logger.Debugf("volume: %s", vol)
|
||||
args = append(args, "-v", vol)
|
||||
}
|
||||
// set env
|
||||
for k, v := range env {
|
||||
kv := fmt.Sprintf("%s=%s", k, v)
|
||||
args = append(args, "-e", kv)
|
||||
}
|
||||
// apply options
|
||||
args = append(args, d.options...)
|
||||
// apply image and command
|
||||
args = append(args, d.image)
|
||||
// apply command
|
||||
args = append(args, cmdAndArgs...)
|
||||
|
||||
cmd = exec.Command(c, args...)
|
||||
|
||||
} else if provider.Cgroup() != nil {
|
||||
c := "cgexec"
|
||||
args := []string{"-g", provider.Cgroup().Cgroup()}
|
||||
args = append(args, cmdAndArgs...)
|
||||
cmd = exec.Command(c, args...)
|
||||
|
||||
} else {
|
||||
if len(cmdAndArgs) == 1 {
|
||||
cmd = exec.Command(cmdAndArgs[0])
|
||||
@@ -48,25 +83,28 @@ func newCmdJob(provider mirrorProvider, cmdAndArgs []string, workingDir string,
|
||||
}
|
||||
}
|
||||
|
||||
logger.Debugf("Executing command %s at %s", cmdAndArgs[0], workingDir)
|
||||
if _, err := os.Stat(workingDir); os.IsNotExist(err) {
|
||||
logger.Debugf("Making dir %s", workingDir)
|
||||
if err = os.MkdirAll(workingDir, 0755); err != nil {
|
||||
logger.Errorf("Error making dir %s", workingDir)
|
||||
if provider.Docker() == nil {
|
||||
logger.Debugf("Executing command %s at %s", cmdAndArgs[0], workingDir)
|
||||
if _, err := os.Stat(workingDir); os.IsNotExist(err) {
|
||||
logger.Debugf("Making dir %s", workingDir)
|
||||
if err = os.MkdirAll(workingDir, 0755); err != nil {
|
||||
logger.Errorf("Error making dir %s: %s", workingDir, err.Error())
|
||||
}
|
||||
}
|
||||
cmd.Dir = workingDir
|
||||
cmd.Env = newEnviron(env, true)
|
||||
}
|
||||
|
||||
cmd.Dir = workingDir
|
||||
cmd.Env = newEnviron(env, true)
|
||||
|
||||
return &cmdJob{
|
||||
cmd: cmd,
|
||||
workingDir: workingDir,
|
||||
env: env,
|
||||
provider: provider,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cmdJob) Start() error {
|
||||
// logger.Debugf("Command start: %v", c.cmd.Args)
|
||||
c.finished = make(chan empty, 1)
|
||||
return c.cmd.Start()
|
||||
}
|
||||
@@ -80,6 +118,9 @@ func (c *cmdJob) Wait() error {
|
||||
return c.retErr
|
||||
default:
|
||||
err := c.cmd.Wait()
|
||||
if c.cmd.Stdout != nil {
|
||||
c.cmd.Stdout.(*os.File).Close()
|
||||
}
|
||||
c.retErr = err
|
||||
close(c.finished)
|
||||
return err
|
||||
@@ -95,6 +136,14 @@ func (c *cmdJob) Terminate() error {
|
||||
if c.cmd == nil || c.cmd.Process == nil {
|
||||
return errProcessNotStarted
|
||||
}
|
||||
|
||||
if d := c.provider.Docker(); d != nil {
|
||||
sh.Command(
|
||||
"docker", "stop", "-t", "2", d.Name(),
|
||||
).Run()
|
||||
return nil
|
||||
}
|
||||
|
||||
err := unix.Kill(c.cmd.Process.Pid, syscall.SIGTERM)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -108,7 +108,12 @@ func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) {
|
||||
}
|
||||
|
||||
func (p *twoStageRsyncProvider) Run() error {
|
||||
defer p.Wait()
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
|
||||
if p.IsRunning() {
|
||||
return errors.New("provider is currently running")
|
||||
}
|
||||
|
||||
env := map[string]string{}
|
||||
if p.username != "" {
|
||||
@@ -129,7 +134,7 @@ func (p *twoStageRsyncProvider) Run() error {
|
||||
command = append(command, p.upstreamURL, p.WorkingDir())
|
||||
|
||||
p.cmd = newCmdJob(p, command, p.WorkingDir(), env)
|
||||
if err := p.prepareLogFile(); err != nil {
|
||||
if err := p.prepareLogFile(stage > 1); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -137,9 +142,11 @@ func (p *twoStageRsyncProvider) Run() error {
|
||||
return err
|
||||
}
|
||||
p.isRunning.Store(true)
|
||||
logger.Debugf("set isRunning to true: %s", p.Name())
|
||||
|
||||
err = p.cmd.Wait()
|
||||
p.isRunning.Store(false)
|
||||
p.Unlock()
|
||||
err = p.Wait()
|
||||
p.Lock()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -55,9 +55,6 @@ func GetTUNASyncWorker(cfg *Config) *Worker {
|
||||
w.httpClient = httpClient
|
||||
}
|
||||
|
||||
if cfg.Cgroup.Enable {
|
||||
initCgroup(cfg.Cgroup.BasePath)
|
||||
}
|
||||
w.initJobs()
|
||||
w.makeHTTPServer()
|
||||
tunasyncWorker = w
|
||||
@@ -222,7 +219,11 @@ func (w *Worker) makeHTTPServer() {
|
||||
}
|
||||
switch cmd.Cmd {
|
||||
case CmdStart:
|
||||
job.ctrlChan <- jobStart
|
||||
if cmd.Options["force"] {
|
||||
job.ctrlChan <- jobForceStart
|
||||
} else {
|
||||
job.ctrlChan <- jobStart
|
||||
}
|
||||
case CmdRestart:
|
||||
job.ctrlChan <- jobRestart
|
||||
case CmdStop:
|
||||
@@ -389,28 +390,21 @@ func (w *Worker) URL() string {
|
||||
}
|
||||
|
||||
func (w *Worker) registorWorker() {
|
||||
url := fmt.Sprintf(
|
||||
"%s/workers",
|
||||
w.cfg.Manager.APIBase,
|
||||
)
|
||||
|
||||
msg := WorkerStatus{
|
||||
ID: w.Name(),
|
||||
URL: w.URL(),
|
||||
}
|
||||
|
||||
if _, err := PostJSON(url, msg, w.httpClient); err != nil {
|
||||
logger.Errorf("Failed to register worker")
|
||||
for _, root := range w.cfg.Manager.APIBaseList() {
|
||||
url := fmt.Sprintf("%s/workers", root)
|
||||
logger.Debugf("register on manager url: %s", url)
|
||||
if _, err := PostJSON(url, msg, w.httpClient); err != nil {
|
||||
logger.Errorf("Failed to register worker")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Worker) updateStatus(job *mirrorJob, jobMsg jobMessage) {
|
||||
url := fmt.Sprintf(
|
||||
"%s/workers/%s/jobs/%s",
|
||||
w.cfg.Manager.APIBase,
|
||||
w.Name(),
|
||||
jobMsg.name,
|
||||
)
|
||||
p := job.provider
|
||||
smsg := MirrorStatus{
|
||||
Name: jobMsg.name,
|
||||
@@ -422,19 +416,22 @@ func (w *Worker) updateStatus(job *mirrorJob, jobMsg jobMessage) {
|
||||
ErrorMsg: jobMsg.msg,
|
||||
}
|
||||
|
||||
if _, err := PostJSON(url, smsg, w.httpClient); err != nil {
|
||||
logger.Errorf("Failed to update mirror(%s) status: %s", jobMsg.name, err.Error())
|
||||
for _, root := range w.cfg.Manager.APIBaseList() {
|
||||
url := fmt.Sprintf(
|
||||
"%s/workers/%s/jobs/%s", root, w.Name(), jobMsg.name,
|
||||
)
|
||||
logger.Debugf("reporting on manager url: %s", url)
|
||||
if _, err := PostJSON(url, smsg, w.httpClient); err != nil {
|
||||
logger.Errorf("Failed to update mirror(%s) status: %s", jobMsg.name, err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Worker) fetchJobStatus() []MirrorStatus {
|
||||
var mirrorList []MirrorStatus
|
||||
apiBase := w.cfg.Manager.APIBaseList()[0]
|
||||
|
||||
url := fmt.Sprintf(
|
||||
"%s/workers/%s/jobs",
|
||||
w.cfg.Manager.APIBase,
|
||||
w.Name(),
|
||||
)
|
||||
url := fmt.Sprintf("%s/workers/%s/jobs", apiBase, w.Name())
|
||||
|
||||
if _, err := GetJSON(url, &mirrorList, w.httpClient); err != nil {
|
||||
logger.Errorf("Failed to fetch job status: %s", err.Error())
|
||||
|
||||
45
worker/zfs_hook.go
普通文件
45
worker/zfs_hook.go
普通文件
@@ -0,0 +1,45 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/codeskyblue/go-sh"
|
||||
)
|
||||
|
||||
type zfsHook struct {
|
||||
emptyHook
|
||||
provider mirrorProvider
|
||||
zpool string
|
||||
}
|
||||
|
||||
func newZfsHook(provider mirrorProvider, zpool string) *zfsHook {
|
||||
return &zfsHook{
|
||||
provider: provider,
|
||||
zpool: zpool,
|
||||
}
|
||||
}
|
||||
|
||||
// create zfs dataset for a new mirror
|
||||
func (z *zfsHook) preJob() error {
|
||||
workingDir := z.provider.WorkingDir()
|
||||
if _, err := os.Stat(workingDir); os.IsNotExist(err) {
|
||||
// sudo zfs create $zfsDataset
|
||||
// sudo zfs set mountpoint=${absPath} ${zfsDataset}
|
||||
|
||||
zfsDataset := fmt.Sprintf("%s/%s", z.zpool, z.provider.Name())
|
||||
// Unknown issue of ZFS:
|
||||
// dataset name should not contain upper case letters
|
||||
zfsDataset = strings.ToLower(zfsDataset)
|
||||
logger.Infof("Creating ZFS dataset %s", zfsDataset)
|
||||
if err := sh.Command("sudo", "zfs", "create", zfsDataset).Run(); err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Infof("Mount ZFS dataset %s to %s", zfsDataset, workingDir)
|
||||
if err := sh.Command("sudo", "zfs", "set", "mountpoint="+workingDir, zfsDataset).Run(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
在新工单中引用
屏蔽一个用户