Mirrored from https://github.com/tuna/tunasync.git, synced 2025-12-06 22:46:47 +00:00

52 commits

Author  SHA1  Message  Commit date
z4yx
9ac374527a regenerate travis deploy key 2018-10-25 17:27:32 +08:00
z4yx
f03626d4e1 update Get Started document 2018-10-25 17:23:02 +08:00
z4yx
23bf4890cc bump version to v0.3.2 2018-10-25 17:07:04 +08:00
z4yx
2f6a61aee5 increse test coverage 2018-10-25 17:02:05 +08:00
z4yx
b6043142e1 test if it works with golang 1.8 2018-10-25 16:16:04 +08:00
zhang
6241576b12 bug fix: tunasynctl failed to parse datetime when you list jobs of specific worker 2018-06-13 10:28:48 +08:00
bigeagle
ef78563b8c Merge pull request #74 from houbaron/patch-1
Update README.md
2018-05-31 21:25:55 +08:00
bigeagle
ca106f1360 Merge pull request #82 from tuna/dev
New feature: remove a worker with tunasynctl
2018-05-31 21:22:46 +08:00
Miao Wang
628266ac5a Merge pull request #81 from tuna/wip-override-concurrent-limit
New feature: run "tunasynctl start" with "-f" to override the limit of concurrent jobs
2018-05-31 14:22:03 +08:00
Yuxiang Zhang
7e601d9fff New feature: remove a worker with tunasynctl
Fix #78
2018-05-31 12:32:22 +08:00
z4yx
c750aa1871 new feature: run "tunasynctl start" with "-f" to override concurrent job limit 2018-05-30 18:59:24 +08:00
Yuxiang Zhang
6cbe91b4f1 new command: jobForceStart 2018-05-30 16:07:07 +08:00
Yuxiang Zhang
89a792986d increase test coverage rate of job & provider 2018-05-30 14:00:10 +08:00
Yuxiang Zhang
0fdb07d061 bug fix: log over-written in twoStageRsyncProvider
solve more DATA RACE problem
2018-05-30 12:28:09 +08:00
Yuxiang Zhang
c5bb172f99 increase test coverage rate of job.go 2018-05-30 11:45:05 +08:00
Yuxiang Zhang
79e6167028 fix race condition on logFile of baseProvider 2018-05-30 01:46:16 +08:00
Miao Wang
285ffb2f2f Merge pull request #80 from tuna/dev
Fix the "list" command of tunasynctl
2018-05-29 21:42:57 +08:00
Yuxiang Zhang
95bb4bbd5e report the last ended time (updated whether successful or not) of jobs 2018-05-29 21:21:03 +08:00
Yuxiang Zhang
6bca9d2cd5 fix TestHTTPServer in manager package 2018-05-29 19:07:01 +08:00
Yuxiang Zhang
4fe7d03e54 Move the WebMirrorStatus to internal package. Fix the list command of tunasynctl 2018-05-29 18:48:33 +08:00
Baron Hou
1fe9499728 Update README.md 2017-09-29 18:14:11 +08:00
bigeagle
a475b044c6 feat(worker): add 'use_ipv4' option for rsync provider 2017-09-08 00:15:48 +08:00
bigeagle
a50a360a91 Revert "feat(worker): add '-4' option to rsync when 'use_ipv6' is false"
This reverts commit d536aca2ac.
2017-09-08 00:12:40 +08:00
bigeagle
d536aca2ac feat(worker): add '-4' option to rsync when 'use_ipv6' is false 2017-09-06 23:22:55 +08:00
bigeagle
28545d61e7 Merge pull request #68 from l2dy/master
Update README.md
2017-05-29 11:03:27 -05:00
Zero King
a87fb0f8b4 Update README.md 2017-05-29 15:42:10 +00:00
Jason Lau
095e7c6320 Merge pull request #65 from felixonmars/patch-1
Fix a typo: Fisrt -> First
2017-03-30 15:31:46 +08:00
Felix Yan
7b441312f4 Fix a typo: Fisrt -> First 2017-03-30 13:27:40 +08:00
bigeagle
93194cde2e Merge pull request #60 from tuna/dev
Dev
2016-12-19 01:10:38 +08:00
bigeagle
aa4c31a32b feat(tunasynctl): implemented 'set-size' command to update a mirror size 2016-12-18 23:30:41 +08:00
bigeagle
4c6a407c17 feat(manager): implemented restful API for updating mirror size 2016-12-18 23:06:08 +08:00
bigeagle
939abaef9b feat(worker): TUNASYNC_LOG_DIR environment variable 2016-12-18 20:41:26 +08:00
bigeagle
d5a438462f feat(worker): map current uid and gid to docker 2016-12-18 14:28:48 +08:00
bigeagle
d4e07a7b29 fix(worker): keep the same working dir inside and outside of docker 2016-12-18 14:28:32 +08:00
bigeagle
9ac3193d50 Merge pull request #58 from tuna/dev
Dev
2016-12-12 23:46:22 +08:00
bigeagle
9ffb101cc7 chore(tunasync): bump version to 0.2-dev 2016-12-12 23:23:06 +08:00
bigeagle
fd277388d5 fix(worker): fixed multi-manager configuration
the worker must be registerred on the manager

`extra_status_manager` option is replaced by `api_base_list`, which overrides the `api_base` option
2016-12-12 23:17:50 +08:00
bigeagle
c5cba66786 Merge pull request #57 from tuna/dev
fix(cmd): make tunasynctl work with both HTTP and HTTPS
2016-12-11 02:45:19 +08:00
bigeagle
97e9725774 fix(cmd): make tunasynctl work with both HTTP and HTTPS 2016-12-11 02:13:19 +08:00
bigeagle
54740388b3 Merge pull request #56 from tuna/dev
Dev
2016-12-10 04:18:46 +08:00
bigeagle
7601e5793f fix(worker): improved cgroup creation 2016-12-10 04:14:39 +08:00
bigeagle
9645fd44ec ci(travis): Enabled docker on travis 2016-12-10 03:48:03 +08:00
bigeagle
ebd462be36 feat(worker): Implemented docker executor, close #55
if docker is enabled in configure file and `docker_image` is set on mirror config, the command would

be executed via `docker run ...`
2016-12-10 02:44:45 +08:00
bigeagle
21c832c8fb fix(worker): disabled memory limit
rsync memory is nolonger limited
2016-12-09 23:07:05 +08:00
bigeagle
81a15e7dd1 Merge pull request #54 from tuna/dev
Dev
2016-12-07 00:11:34 +08:00
bigeagle
3f31e83c14 feat(manager): let illegal status records be flushed with disabled jobs 2016-12-07 00:08:16 +08:00
bigeagle
a0b8ef08ab feat(worker): implemented extra_status_manager option to enable a worker reporting status to multi 2016-12-06 23:59:15 +08:00
bigeagle
86153c59e3 feat(worker): ZFS support: isolate mirrors in zfs datasets 2016-12-05 00:44:55 +08:00
bigeagle
96f9db8bb8 fix(worker): extended rsync memory limit to 512MB 2016-12-04 22:56:48 +08:00
bigeagle
6dd06c954c Merge pull request #51 from tuna/dev
Dev
2016-11-20 01:11:22 +08:00
bigeagle
03d22b7683 refactor(removing unneeded script files): 2016-11-20 01:10:24 +08:00
bigeagle
e9a7fc2de2 docs(tunasync): prebuilt binaries on Github releases, close #41 2016-11-20 01:09:00 +08:00
51 files changed, with 1278 insertions and 1023 deletions


@@ -1,6 +1,8 @@
sudo: required
language: go
go:
- 1.6
- 1.8
before_install:
- sudo apt-get install cgroup-bin
@@ -11,8 +13,14 @@ before_install:
os:
- linux
services:
- docker
before_script:
- sudo cgcreate -t travis -a travis -g memory:tunasync
- lssubsys -am
- sudo cgcreate -a $USER -t $USER -g cpu:tunasync
- sudo cgcreate -a $USER -t $USER -g memory:tunasync
- docker pull alpine
script:
- ./.testandcover.bash
@@ -27,9 +35,10 @@ deploy:
file:
- "build/tunasync-linux-bin.tar.gz"
api_key:
secure: "F9kaVaR1mxEh2+EL9Nm8GZmbVY98pXCJA0LGDNrq1C2vU61AUNOeX6yI1mMklHNZPLBqoFDvGN1M5HnJ+xWCFH+KnJgLD2GVIAcAxFNpcNWQe8XKE5heklNsIQNQfuh/rJKM6YzeDB9G5RN4Y76iL4WIAXhNnMm48W6jLnWhf70="
secure: ZOYL/CALrVJsZzbZqUMSI89Gw4zsBJH1StD/2yTyG45GfKgvtK4hG0S5cQM/L0wcikjEkgxSMsmr4ycq+OwbN++gc0umfoAQ/VSjzetiobAlT1E854aRKRjT82WxYdnPW2fsFjuEJTcyZmcbgJGTMi86MDt7w8tEjLomhd1+rUo=
skip_cleanup: true
overwrite: true
on:
tags: true
all_branches: true
repo: tuna/tunasync


@@ -10,13 +10,16 @@ tunasync
- [中文文档](https://github.com/tuna/tunasync/blob/master/docs/zh_CN/get_started.md)
## Download
Pre-built binary for Linux x86_64 is available at [Github releases](https://github.com/tuna/tunasync/releases/latest).
## Design
```
# Architecture
- Manager: Centural instance on status and job management
- Manager: Central instance for status and job management
- Worker: Runs mirror jobs
+------------+ +---+ +---+
@@ -44,12 +47,12 @@ PreSyncing Syncing Success
| |
| +-----------------+ | Failed
+------+ post-fail |<---------+
+-----------------+
+-----------------+
```
## Generate Self-Signed Certificate
Fisrt, create root CA
First, create root CA
```
openssl genrsa -out rootCA.key 2048
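The README excerpt stops after generating the root key. For completeness, a typical next step with standard openssl usage (not part of this diff; a hedged sketch only) self-signs the root certificate:

```
# Self-sign the root CA certificate using the key generated above
openssl req -x509 -new -nodes -key rootCA.key -sha256 -days 1024 -out rootCA.crt
```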


@@ -134,7 +134,7 @@ func main() {
app.Name = "tunasync"
app.Usage = "tunasync mirror job management tool"
app.EnableBashCompletion = true
app.Version = "0.1"
app.Version = tunasync.Version
app.Commands = []cli.Command{
{
Name: "manager",


@@ -99,8 +99,11 @@ func initialize(c *cli.Context) error {
}
// parse base url of the manager server
baseURL = fmt.Sprintf("https://%s:%d",
cfg.ManagerAddr, cfg.ManagerPort)
if cfg.CACert != "" {
baseURL = fmt.Sprintf("https://%s:%d", cfg.ManagerAddr, cfg.ManagerPort)
} else {
baseURL = fmt.Sprintf("http://%s:%d", cfg.ManagerAddr, cfg.ManagerPort)
}
logger.Infof("Use manager address: %s", baseURL)
@@ -137,9 +140,9 @@ func listWorkers(c *cli.Context) error {
}
func listJobs(c *cli.Context) error {
// FIXME: there should be an API on manager server side that return MirrorStatus list to tunasynctl
var jobs []tunasync.MirrorStatus
var genericJobs interface{}
if c.Bool("all") {
var jobs []tunasync.WebMirrorStatus
_, err := tunasync.GetJSON(baseURL+listJobsPath, &jobs, client)
if err != nil {
return cli.NewExitError(
@@ -147,8 +150,10 @@ func listJobs(c *cli.Context) error {
"of all jobs from manager server: %s", err.Error()),
1)
}
genericJobs = jobs
} else {
var jobs []tunasync.MirrorStatus
args := c.Args()
if len(args) == 0 {
return cli.NewExitError(
@@ -171,9 +176,10 @@ func listJobs(c *cli.Context) error {
for range args {
jobs = append(jobs, <-ans...)
}
genericJobs = jobs
}
b, err := json.MarshalIndent(jobs, "", " ")
b, err := json.MarshalIndent(genericJobs, "", " ")
if err != nil {
return cli.NewExitError(
fmt.Sprintf("Error printing out informations: %s", err.Error()),
@@ -183,6 +189,103 @@ func listJobs(c *cli.Context) error {
return nil
}
func updateMirrorSize(c *cli.Context) error {
args := c.Args()
if len(args) != 2 {
return cli.NewExitError("Usage: tunasynctl -w <worker-id> <mirror> <size>", 1)
}
workerID := c.String("worker")
mirrorID := args.Get(0)
mirrorSize := args.Get(1)
msg := struct {
Name string `json:"name"`
Size string `json:"size"`
}{
Name: mirrorID,
Size: mirrorSize,
}
url := fmt.Sprintf(
"%s/workers/%s/jobs/%s/size", baseURL, workerID, mirrorID,
)
resp, err := tunasync.PostJSON(url, msg, client)
if err != nil {
return cli.NewExitError(
fmt.Sprintf("Failed to send request to manager: %s",
err.Error()),
1)
}
defer resp.Body.Close()
body, _ := ioutil.ReadAll(resp.Body)
if resp.StatusCode != http.StatusOK {
return cli.NewExitError(
fmt.Sprintf("Manager failed to update mirror size: %s", body), 1,
)
}
var status tunasync.MirrorStatus
json.Unmarshal(body, &status)
if status.Size != mirrorSize {
return cli.NewExitError(
fmt.Sprintf(
"Mirror size error, expecting %s, manager returned %s",
mirrorSize, status.Size,
), 1,
)
}
logger.Infof("Successfully updated mirror size to %s", mirrorSize)
return nil
}
func removeWorker(c *cli.Context) error {
args := c.Args()
if len(args) != 0 {
return cli.NewExitError("Usage: tunasynctl -w <worker-id>", 1)
}
workerID := c.String("worker")
if len(workerID) == 0 {
return cli.NewExitError("Please specify the <worker-id>", 1)
}
url := fmt.Sprintf("%s/workers/%s", baseURL, workerID)
req, err := http.NewRequest("DELETE", url, nil)
if err != nil {
logger.Panicf("Invalid HTTP Request: %s", err.Error())
}
resp, err := client.Do(req)
if err != nil {
return cli.NewExitError(
fmt.Sprintf("Failed to send request to manager: %s", err.Error()), 1)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return cli.NewExitError(
fmt.Sprintf("Failed to parse response: %s", err.Error()),
1)
}
return cli.NewExitError(fmt.Sprintf("Failed to correctly send"+
" command: HTTP status code is not 200: %s", body),
1)
}
res := map[string]string{}
err = json.NewDecoder(resp.Body).Decode(&res)
if res["message"] == "deleted" {
logger.Info("Successfully removed the worker")
} else {
logger.Info("Failed to remove the worker")
}
return nil
}
func flushDisabledJobs(c *cli.Context) error {
req, err := http.NewRequest("DELETE", baseURL+flushDisabledPath, nil)
if err != nil {
@@ -232,11 +335,16 @@ func cmdJob(cmd tunasync.CmdVerb) cli.ActionFunc {
"argument WORKER", 1)
}
options := map[string]bool{}
if c.Bool("force") {
options["force"] = true
}
cmd := tunasync.ClientCmd{
Cmd: cmd,
MirrorID: mirrorID,
WorkerID: c.String("worker"),
Args: argsList,
Options: options,
}
resp, err := tunasync.PostJSON(baseURL+cmdPath, cmd, client)
if err != nil {
@@ -322,7 +430,7 @@ func main() {
app := cli.NewApp()
app.EnableBashCompletion = true
app.Version = "0.1"
app.Version = tunasync.Version
app.Name = "tunasynctl"
app.Usage = "control client for tunasync manager"
@@ -357,6 +465,11 @@ func main() {
},
}
forceStartFlag := cli.BoolFlag{
Name: "force, f",
Usage: "Override the concurrent limit",
}
app.Commands = []cli.Command{
{
Name: "list",
@@ -382,10 +495,34 @@ func main() {
Flags: commonFlags,
Action: initializeWrapper(listWorkers),
},
{
Name: "rm-worker",
Usage: "Remove a worker",
Flags: append(
commonFlags,
cli.StringFlag{
Name: "worker, w",
Usage: "worker-id of the worker to be removed",
},
),
Action: initializeWrapper(removeWorker),
},
{
Name: "set-size",
Usage: "Set mirror size",
Flags: append(
commonFlags,
cli.StringFlag{
Name: "worker, w",
Usage: "specify worker-id of the mirror job",
},
),
Action: initializeWrapper(updateMirrorSize),
},
{
Name: "start",
Usage: "Start a job",
Flags: append(commonFlags, cmdFlags...),
Flags: append(append(commonFlags, cmdFlags...), forceStartFlag),
Action: initializeWrapper(cmdJob(tunasync.CmdStart)),
},
{
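Taken together, the tunasynctl changes in this file add three user-facing operations. Based on the Usage strings in the diff, typical invocations would look like the following sketch (worker and mirror names are placeholders):

```
# Remove a worker from the manager
tunasynctl rm-worker -w <worker-id>

# Update the recorded size of a mirror job
tunasynctl set-size -w <worker-id> <mirror> <size>

# Start a job, overriding the concurrent-job limit (new force flag)
tunasynctl start <mirror> -w <worker-id> -f
```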


@@ -15,7 +15,7 @@ date: 2016-10-31 00:50:00
### Binary package
TODO
Download `tunasync-linux-bin.tar.gz` from [Github Releases](https://github.com/tuna/tunasync/releases/latest).
### Build from source
@@ -90,6 +90,14 @@ $ tunasync worker --config ~/tunasync_demo/worker.conf
In this example, the mirror data is stored under `/tmp/tunasync/`
### Control
Check sync status:
```
$ tunasynctl list -p 12345 --all
```
## Going further
See also
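The `--all` flag corresponds to the new all-workers branch of `listJobs` shown above. To query a single worker instead (the case fixed by commit 6241576b12), worker IDs are passed as positional arguments; a hedged sketch:

```
# List jobs of one specific worker (worker ID is positional)
tunasynctl list <worker-id> -p 12345
```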


@@ -5,7 +5,7 @@ import (
"time"
)
// A StatusUpdateMsg represents a msg when
// A MirrorStatus represents a msg when
// a worker has done syncing
type MirrorStatus struct {
Name string `json:"name"`
@@ -13,6 +13,7 @@ type MirrorStatus struct {
IsMaster bool `json:"is_master"`
Status SyncStatus `json:"status"`
LastUpdate time.Time `json:"last_update"`
LastEnded time.Time `json:"last_ended"`
Upstream string `json:"upstream"`
Size string `json:"size"`
ErrorMsg string `json:"error_msg"`
@@ -67,9 +68,10 @@ func (c CmdVerb) String() string {
// A WorkerCmd is the command message send from the
// manager to a worker
type WorkerCmd struct {
Cmd CmdVerb `json:"cmd"`
MirrorID string `json:"mirror_id"`
Args []string `json:"args"`
Cmd CmdVerb `json:"cmd"`
MirrorID string `json:"mirror_id"`
Args []string `json:"args"`
Options map[string]bool `json:"options"`
}
func (c WorkerCmd) String() string {
@@ -82,8 +84,9 @@ func (c WorkerCmd) String() string {
// A ClientCmd is the command message send from client
// to the manager
type ClientCmd struct {
Cmd CmdVerb `json:"cmd"`
MirrorID string `json:"mirror_id"`
WorkerID string `json:"worker_id"`
Args []string `json:"args"`
Cmd CmdVerb `json:"cmd"`
MirrorID string `json:"mirror_id"`
WorkerID string `json:"worker_id"`
Args []string `json:"args"`
Options map[string]bool `json:"options"`
}


@@ -1,11 +1,9 @@
package manager
package internal
import (
"encoding/json"
"strconv"
"time"
. "github.com/tuna/tunasync/internal"
)
type textTime struct {
@@ -38,24 +36,28 @@ func (t *stampTime) UnmarshalJSON(b []byte) error {
return err
}
// webMirrorStatus is the mirror status to be shown in the web page
type webMirrorStatus struct {
// WebMirrorStatus is the mirror status to be shown in the web page
type WebMirrorStatus struct {
Name string `json:"name"`
IsMaster bool `json:"is_master"`
Status SyncStatus `json:"status"`
LastUpdate textTime `json:"last_update"`
LastUpdateTs stampTime `json:"last_update_ts"`
LastEnded textTime `json:"last_ended"`
LastEndedTs stampTime `json:"last_ended_ts"`
Upstream string `json:"upstream"`
Size string `json:"size"` // approximate size
}
func convertMirrorStatus(m MirrorStatus) webMirrorStatus {
return webMirrorStatus{
func BuildWebMirrorStatus(m MirrorStatus) WebMirrorStatus {
return WebMirrorStatus{
Name: m.Name,
IsMaster: m.IsMaster,
Status: m.Status,
LastUpdate: textTime{m.LastUpdate},
LastUpdateTs: stampTime{m.LastUpdate},
LastEnded: textTime{m.LastEnded},
LastEndedTs: stampTime{m.LastEnded},
Upstream: m.Upstream,
Size: m.Size,
}
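Given the json tags on WebMirrorStatus and the `/jobs` endpoint exercised in the manager tests further down, the public status feed carries each time field twice, as text and as a Unix timestamp. A hedged sketch of a query (field values are illustrative, not real output):

```
curl http://<manager-addr>:<port>/jobs
# [{"name":"tunalinux","is_master":true,"status":"success",
#   "last_update":"2016-04-16 23:08:10 +0900","last_update_ts":1460815690,
#   "last_ended":"2016-04-16 23:08:10 +0900","last_ended_ts":1460815690,
#   "upstream":"rsync://mirrors.tuna.tsinghua.edu.cn/tunalinux/","size":"5GB"}]
```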

internal/status_web_test.go (new file, 76 lines)

@@ -0,0 +1,76 @@
package internal
import (
"encoding/json"
"testing"
"time"
. "github.com/smartystreets/goconvey/convey"
)
func TestStatus(t *testing.T) {
Convey("status json ser-de should work", t, func() {
tz := "Asia/Tokyo"
loc, err := time.LoadLocation(tz)
So(err, ShouldBeNil)
t := time.Date(2016, time.April, 16, 23, 8, 10, 0, loc)
m := WebMirrorStatus{
Name: "tunalinux",
Status: Success,
LastUpdate: textTime{t},
LastUpdateTs: stampTime{t},
LastEnded: textTime{t},
LastEndedTs: stampTime{t},
Size: "5GB",
Upstream: "rsync://mirrors.tuna.tsinghua.edu.cn/tunalinux/",
}
b, err := json.Marshal(m)
So(err, ShouldBeNil)
//fmt.Println(string(b))
var m2 WebMirrorStatus
err = json.Unmarshal(b, &m2)
So(err, ShouldBeNil)
// fmt.Printf("%#v", m2)
So(m2.Name, ShouldEqual, m.Name)
So(m2.Status, ShouldEqual, m.Status)
So(m2.LastUpdate.Unix(), ShouldEqual, m.LastUpdate.Unix())
So(m2.LastUpdateTs.Unix(), ShouldEqual, m.LastUpdate.Unix())
So(m2.LastUpdate.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
So(m2.LastUpdateTs.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
So(m2.LastEnded.Unix(), ShouldEqual, m.LastEnded.Unix())
So(m2.LastEndedTs.Unix(), ShouldEqual, m.LastEnded.Unix())
So(m2.LastEnded.UnixNano(), ShouldEqual, m.LastEnded.UnixNano())
So(m2.LastEndedTs.UnixNano(), ShouldEqual, m.LastEnded.UnixNano())
So(m2.Size, ShouldEqual, m.Size)
So(m2.Upstream, ShouldEqual, m.Upstream)
})
Convey("BuildWebMirrorStatus should work", t, func() {
m := MirrorStatus{
Name: "arch-sync3",
Worker: "testWorker",
IsMaster: true,
Status: Failed,
LastUpdate: time.Now().Add(-time.Minute * 30),
LastEnded: time.Now(),
Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "4GB",
}
var m2 WebMirrorStatus
m2 = BuildWebMirrorStatus(m)
// fmt.Printf("%#v", m2)
So(m2.Name, ShouldEqual, m.Name)
So(m2.Status, ShouldEqual, m.Status)
So(m2.LastUpdate.Unix(), ShouldEqual, m.LastUpdate.Unix())
So(m2.LastUpdateTs.Unix(), ShouldEqual, m.LastUpdate.Unix())
So(m2.LastUpdate.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
So(m2.LastUpdateTs.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
So(m2.LastEnded.Unix(), ShouldEqual, m.LastEnded.Unix())
So(m2.LastEndedTs.Unix(), ShouldEqual, m.LastEnded.Unix())
So(m2.LastEnded.UnixNano(), ShouldEqual, m.LastEnded.UnixNano())
So(m2.LastEndedTs.UnixNano(), ShouldEqual, m.LastEnded.UnixNano())
So(m2.Size, ShouldEqual, m.Size)
So(m2.Upstream, ShouldEqual, m.Upstream)
})
}

internal/version.go (new file, 3 lines)

@@ -0,0 +1,3 @@
package internal
const Version string = "0.3.3"


@@ -14,6 +14,7 @@ type dbAdapter interface {
Init() error
ListWorkers() ([]WorkerStatus, error)
GetWorker(workerID string) (WorkerStatus, error)
DeleteWorker(workerID string) error
CreateWorker(w WorkerStatus) (WorkerStatus, error)
UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error)
GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error)
@@ -95,6 +96,19 @@ func (b *boltAdapter) GetWorker(workerID string) (w WorkerStatus, err error) {
return
}
func (b *boltAdapter) DeleteWorker(workerID string) (err error) {
err = b.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(_workerBucketKey))
v := bucket.Get([]byte(workerID))
if v == nil {
return fmt.Errorf("invalid workerID %s", workerID)
}
err := bucket.Delete([]byte(workerID))
return err
})
return
}
func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
err := b.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(_workerBucketKey))
@@ -125,7 +139,7 @@ func (b *boltAdapter) GetMirrorStatus(workerID, mirrorID string) (m MirrorStatus
bucket := tx.Bucket([]byte(_statusBucketKey))
v := bucket.Get([]byte(id))
if v == nil {
return fmt.Errorf("no mirror %s exists in worker %s", mirrorID, workerID)
return fmt.Errorf("no mirror '%s' exists in worker '%s'", mirrorID, workerID)
}
err := json.Unmarshal(v, &m)
return err
@@ -182,7 +196,7 @@ func (b *boltAdapter) FlushDisabledJobs() (err error) {
err = fmt.Errorf("%s; %s", err.Error(), jsonErr)
continue
}
if m.Status == Disabled {
if m.Status == Disabled || len(m.Name) == 0 {
err = c.Delete()
}
}


@@ -40,21 +40,39 @@ func TestBoltAdapter(t *testing.T) {
So(err, ShouldBeNil)
}
Convey("get exists worker", func() {
Convey("get existent worker", func() {
_, err := boltDB.GetWorker(testWorkerIDs[0])
So(err, ShouldBeNil)
})
Convey("list exist worker", func() {
Convey("list existent workers", func() {
ws, err := boltDB.ListWorkers()
So(err, ShouldBeNil)
So(len(ws), ShouldEqual, 2)
})
Convey("get inexist worker", func() {
Convey("get non-existent worker", func() {
_, err := boltDB.GetWorker("invalid workerID")
So(err, ShouldNotBeNil)
})
Convey("delete existent worker", func() {
err := boltDB.DeleteWorker(testWorkerIDs[0])
So(err, ShouldBeNil)
_, err = boltDB.GetWorker(testWorkerIDs[0])
So(err, ShouldNotBeNil)
ws, err := boltDB.ListWorkers()
So(err, ShouldBeNil)
So(len(ws), ShouldEqual, 1)
})
Convey("delete non-existent worker", func() {
err := boltDB.DeleteWorker("invalid workerID")
So(err, ShouldNotBeNil)
ws, err := boltDB.ListWorkers()
So(err, ShouldBeNil)
So(len(ws), ShouldEqual, 2)
})
})
Convey("update mirror status", func() {
@@ -65,6 +83,7 @@ func TestBoltAdapter(t *testing.T) {
IsMaster: true,
Status: Success,
LastUpdate: time.Now(),
LastEnded: time.Now(),
Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "3GB",
},
@@ -73,7 +92,8 @@ func TestBoltAdapter(t *testing.T) {
Worker: testWorkerIDs[1],
IsMaster: true,
Status: Disabled,
LastUpdate: time.Now(),
LastUpdate: time.Now().Add(-time.Hour),
LastEnded: time.Now(),
Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "4GB",
},
@@ -82,7 +102,8 @@ func TestBoltAdapter(t *testing.T) {
Worker: testWorkerIDs[1],
IsMaster: true,
Status: Success,
LastUpdate: time.Now(),
LastUpdate: time.Now().Add(-time.Second),
LastEnded: time.Now(),
Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "4GB",
},


@@ -1,6 +1,7 @@
package manager
import (
"errors"
"fmt"
"net/http"
"time"
@@ -83,10 +84,13 @@ func GetTUNASyncManager(cfg *Config) *Manager {
// workerID should be valid in this route group
workerValidateGroup := s.engine.Group("/workers", s.workerIDValidator)
{
// delete specified worker
workerValidateGroup.DELETE(":id", s.deleteWorker)
// get job list
workerValidateGroup.GET(":id/jobs", s.listJobsOfWorker)
// post job status
workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker)
workerValidateGroup.POST(":id/jobs/:job/size", s.updateMirrorSize)
}
// for tunasynctl to post commands
@@ -133,11 +137,11 @@ func (s *Manager) listAllJobs(c *gin.Context) {
s.returnErrJSON(c, http.StatusInternalServerError, err)
return
}
webMirStatusList := []webMirrorStatus{}
webMirStatusList := []WebMirrorStatus{}
for _, m := range mirrorStatusList {
webMirStatusList = append(
webMirStatusList,
convertMirrorStatus(m),
BuildWebMirrorStatus(m),
)
}
c.JSON(http.StatusOK, webMirStatusList)
@@ -157,6 +161,22 @@ func (s *Manager) flushDisabledJobs(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{_infoKey: "flushed"})
}
// deleteWorker deletes one worker by id
func (s *Manager) deleteWorker(c *gin.Context) {
workerID := c.Param("id")
err := s.adapter.DeleteWorker(workerID)
if err != nil {
err := fmt.Errorf("failed to delete worker: %s",
err.Error(),
)
c.Error(err)
s.returnErrJSON(c, http.StatusInternalServerError, err)
return
}
logger.Noticef("Worker <%s> deleted", workerID)
c.JSON(http.StatusOK, gin.H{_infoKey: "deleted"})
}
// listWrokers respond with informations of all the workers
func (s *Manager) listWorkers(c *gin.Context) {
var workerInfos []WorkerStatus
@@ -225,6 +245,12 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) {
var status MirrorStatus
c.BindJSON(&status)
mirrorName := status.Name
if len(mirrorName) == 0 {
s.returnErrJSON(
c, http.StatusBadRequest,
errors.New("Mirror Name should not be empty"),
)
}
curStatus, _ := s.adapter.GetMirrorStatus(workerID, mirrorName)
@@ -234,21 +260,25 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) {
} else {
status.LastUpdate = curStatus.LastUpdate
}
if status.Status == Success || status.Status == Failed {
status.LastEnded = time.Now()
} else {
status.LastEnded = curStatus.LastEnded
}
// Only message with meaningful size updates the mirror size
if len(curStatus.Size) > 0 && curStatus.Size != "unknown" {
if len(status.Size) == 0 || status.Size == "unknown" {
status.Size = curStatus.Size
}
}
// for logging
switch status.Status {
case Success:
logger.Noticef("Job [%s] @<%s> success", status.Name, status.Worker)
case Failed:
logger.Warningf("Job [%s] @<%s> failed", status.Name, status.Worker)
case Syncing:
logger.Noticef("Job [%s] @<%s> starts syncing", status.Name, status.Worker)
case Disabled:
logger.Noticef("Job [%s] @<%s> disabled", status.Name, status.Worker)
case Paused:
logger.Noticef("Job [%s] @<%s> paused", status.Name, status.Worker)
default:
logger.Infof("Job [%s] @<%s> status: %s", status.Name, status.Worker, status.Status)
logger.Noticef("Job [%s] @<%s> %s", status.Name, status.Worker, status.Status)
}
newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
@@ -263,6 +293,45 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) {
c.JSON(http.StatusOK, newStatus)
}
func (s *Manager) updateMirrorSize(c *gin.Context) {
workerID := c.Param("id")
type SizeMsg struct {
Name string `json:"name"`
Size string `json:"size"`
}
var msg SizeMsg
c.BindJSON(&msg)
mirrorName := msg.Name
status, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
if err != nil {
logger.Errorf(
"Failed to get status of mirror %s @<%s>: %s",
mirrorName, workerID, err.Error(),
)
s.returnErrJSON(c, http.StatusInternalServerError, err)
return
}
// Only message with meaningful size updates the mirror size
if len(msg.Size) > 0 || msg.Size != "unknown" {
status.Size = msg.Size
}
logger.Noticef("Mirror size of [%s] @<%s>: %s", status.Name, status.Worker, status.Size)
newStatus, err := s.adapter.UpdateMirrorStatus(workerID, mirrorName, status)
if err != nil {
err := fmt.Errorf("failed to update job %s of worker %s: %s",
mirrorName, workerID, err.Error(),
)
c.Error(err)
s.returnErrJSON(c, http.StatusInternalServerError, err)
return
}
c.JSON(http.StatusOK, newStatus)
}
func (s *Manager) handleClientCmd(c *gin.Context) {
var clientCmd ClientCmd
c.BindJSON(&clientCmd)
@@ -286,6 +355,7 @@ func (s *Manager) handleClientCmd(c *gin.Context) {
Cmd: clientCmd.Cmd,
MirrorID: clientCmd.MirrorID,
Args: clientCmd.Args,
Options: clientCmd.Options,
}
// update job status, even if the job did not disable successfully,
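The two routes added above are plain REST endpoints, so they can be driven directly over HTTP as well as through tunasynctl. A hedged sketch with curl (paths come from the route group and the tests below; the manager address is a placeholder):

```
# Delete a worker (what tunasynctl rm-worker does under the hood)
curl -X DELETE http://<manager-addr>:<port>/workers/<worker-id>
# expected response: {"message": "deleted"}

# Update the size of one mirror job on a worker
curl -X POST http://<manager-addr>:<port>/workers/<worker-id>/jobs/<mirror>/size \
     -H 'Content-Type: application/json' \
     -d '{"name": "<mirror>", "size": "5GB"}'
```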


@@ -21,9 +21,16 @@ const (
)
func TestHTTPServer(t *testing.T) {
var listenPort = 5000
Convey("HTTP server should work", t, func(ctx C) {
listenPort++
port := listenPort
addr := "127.0.0.1"
baseURL := fmt.Sprintf("http://%s:%d", addr, port)
InitLogger(true, true, false)
s := GetTUNASyncManager(&Config{Debug: false})
s := GetTUNASyncManager(&Config{Debug: true})
s.cfg.Server.Addr = addr
s.cfg.Server.Port = port
So(s, ShouldNotBeNil)
s.setDBAdapter(&mockDBAdapter{
workerStore: map[string]WorkerStatus{
@@ -32,12 +39,8 @@ func TestHTTPServer(t *testing.T) {
}},
statusStore: make(map[string]MirrorStatus),
})
port := rand.Intn(10000) + 20000
baseURL := fmt.Sprintf("http://127.0.0.1:%d", port)
go func() {
s.engine.Run(fmt.Sprintf("127.0.0.1:%d", port))
}()
time.Sleep(50 * time.Microsecond)
go s.Run()
time.Sleep(50 * time.Millisecond)
resp, err := http.Get(baseURL + "/ping")
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK)
@@ -79,6 +82,33 @@ func TestHTTPServer(t *testing.T) {
So(len(actualResponseObj), ShouldEqual, 2)
})
Convey("delete an existent worker", func(ctx C) {
req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/workers/%s", baseURL, w.ID), nil)
So(err, ShouldBeNil)
clt := &http.Client{}
resp, err := clt.Do(req)
So(err, ShouldBeNil)
defer resp.Body.Close()
res := map[string]string{}
err = json.NewDecoder(resp.Body).Decode(&res)
So(err, ShouldBeNil)
So(res[_infoKey], ShouldEqual, "deleted")
})
Convey("delete non-existent worker", func(ctx C) {
invalidWorker := "test_worker233"
req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/workers/%s", baseURL, invalidWorker), nil)
So(err, ShouldBeNil)
clt := &http.Client{}
resp, err := clt.Do(req)
So(err, ShouldBeNil)
defer resp.Body.Close()
res := map[string]string{}
err = json.NewDecoder(resp.Body).Decode(&res)
So(err, ShouldBeNil)
So(res[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
})
Convey("flush disabled jobs", func(ctx C) {
req, err := http.NewRequest("DELETE", baseURL+"/jobs/disabled", nil)
So(err, ShouldBeNil)
@@ -99,7 +129,7 @@ func TestHTTPServer(t *testing.T) {
IsMaster: true,
Status: Success,
Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "3GB",
Size: "unknown",
}
resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
defer resp.Body.Close()
@@ -121,11 +151,12 @@ func TestHTTPServer(t *testing.T) {
So(m.Size, ShouldEqual, status.Size)
So(m.IsMaster, ShouldEqual, status.IsMaster)
So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second)
So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
})
Convey("list all job status of all workers", func(ctx C) {
var ms []webMirrorStatus
var ms []WebMirrorStatus
resp, err := GetJSON(baseURL+"/jobs", &ms, nil)
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK)
@@ -137,8 +168,77 @@ func TestHTTPServer(t *testing.T) {
So(m.Size, ShouldEqual, status.Size)
So(m.IsMaster, ShouldEqual, status.IsMaster)
So(time.Now().Sub(m.LastUpdate.Time), ShouldBeLessThan, 1*time.Second)
So(time.Now().Sub(m.LastEnded.Time), ShouldBeLessThan, 1*time.Second)
})
Convey("Update size of a valid mirror", func(ctx C) {
msg := struct {
Name string `json:"name"`
Size string `json:"size"`
}{status.Name, "5GB"}
url := fmt.Sprintf("%s/workers/%s/jobs/%s/size", baseURL, status.Worker, status.Name)
resp, err := PostJSON(url, msg, nil)
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK)
Convey("Get new size of a mirror", func(ctx C) {
var ms []MirrorStatus
resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK)
// err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
m := ms[0]
So(m.Name, ShouldEqual, status.Name)
So(m.Worker, ShouldEqual, status.Worker)
So(m.Status, ShouldEqual, status.Status)
So(m.Upstream, ShouldEqual, status.Upstream)
So(m.Size, ShouldEqual, "5GB")
So(m.IsMaster, ShouldEqual, status.IsMaster)
So(time.Now().Sub(m.LastUpdate), ShouldBeLessThan, 1*time.Second)
So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
})
})
Convey("Update size of an invalid mirror", func(ctx C) {
msg := struct {
Name string `json:"name"`
Size string `json:"size"`
}{"Invalid mirror", "5GB"}
url := fmt.Sprintf("%s/workers/%s/jobs/%s/size", baseURL, status.Worker, status.Name)
resp, err := PostJSON(url, msg, nil)
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
})
// what if status changed to failed
status.Status = Failed
time.Sleep(3 * time.Second)
resp, err = PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
defer resp.Body.Close()
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK)
Convey("What if syncing job failed", func(ctx C) {
var ms []MirrorStatus
resp, err := GetJSON(baseURL+"/workers/test_worker1/jobs", &ms, nil)
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK)
// err = json.NewDecoder(resp.Body).Decode(&mirrorStatusList)
m := ms[0]
So(m.Name, ShouldEqual, status.Name)
So(m.Worker, ShouldEqual, status.Worker)
So(m.Status, ShouldEqual, status.Status)
So(m.Upstream, ShouldEqual, status.Upstream)
So(m.Size, ShouldEqual, status.Size)
So(m.IsMaster, ShouldEqual, status.IsMaster)
So(time.Now().Sub(m.LastUpdate), ShouldBeGreaterThan, 3*time.Second)
So(time.Now().Sub(m.LastEnded), ShouldBeLessThan, 1*time.Second)
})
})
Convey("update mirror status of an inexisted worker", func(ctx C) {
@@ -149,6 +249,7 @@ func TestHTTPServer(t *testing.T) {
IsMaster: true,
Status: Success,
LastUpdate: time.Now(),
LastEnded: time.Now(),
Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "4GB",
}
@@ -252,6 +353,11 @@ func (b *mockDBAdapter) GetWorker(workerID string) (WorkerStatus, error) {
return w, nil
}
func (b *mockDBAdapter) DeleteWorker(workerID string) error {
delete(b.workerStore, workerID)
return nil
}
func (b *mockDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
// _, ok := b.workerStore[w.ID]
// if ok {


@@ -1,44 +0,0 @@
package manager
import (
"encoding/json"
"testing"
"time"
tunasync "github.com/tuna/tunasync/internal"
. "github.com/smartystreets/goconvey/convey"
)
func TestStatus(t *testing.T) {
Convey("status json ser-de should work", t, func() {
tz := "Asia/Tokyo"
loc, err := time.LoadLocation(tz)
So(err, ShouldBeNil)
t := time.Date(2016, time.April, 16, 23, 8, 10, 0, loc)
m := webMirrorStatus{
Name: "tunalinux",
Status: tunasync.Success,
LastUpdate: textTime{t},
LastUpdateTs: stampTime{t},
Size: "5GB",
Upstream: "rsync://mirrors.tuna.tsinghua.edu.cn/tunalinux/",
}
b, err := json.Marshal(m)
So(err, ShouldBeNil)
//fmt.Println(string(b))
var m2 webMirrorStatus
err = json.Unmarshal(b, &m2)
So(err, ShouldBeNil)
// fmt.Printf("%#v", m2)
So(m2.Name, ShouldEqual, m.Name)
So(m2.Status, ShouldEqual, m.Status)
So(m2.LastUpdate.Unix(), ShouldEqual, m.LastUpdate.Unix())
So(m2.LastUpdateTs.Unix(), ShouldEqual, m.LastUpdate.Unix())
So(m2.LastUpdate.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
So(m2.LastUpdateTs.UnixNano(), ShouldEqual, m.LastUpdate.UnixNano())
So(m2.Size, ShouldEqual, m.Size)
So(m2.Upstream, ShouldEqual, m.Upstream)
})
}


@@ -1,96 +0,0 @@
#!/bin/bash
# requires: wget, lftp, jq
#
set -e
set -o pipefail
CONDA_REPO_BASE=${CONDA_REPO_BASE:-"http://repo.continuum.io"}
LOCAL_DIR_BASE="${TUNASYNC_WORKING_DIR}/pkgs"
TMP_DIR=$(mktemp -d)
CONDA_REPOS=("free" "r" "mro" "pro")
CONDA_ARCHES=("linux-64" "linux-32" "linux-armv6l" "linux-armv7l" "linux-ppc64le" "osx-64" "osx-32" "win-64" "win-32")
function check-and-download() {
remote_file=$1
local_file=$2
wget -q --spider ${remote_file}
if [ $? -eq 0 ]; then
echo "downloading ${remote_file}"
wget -q -N -O ${local_file} ${remote_file}
return
fi
return 1
}
function cleanup () {
echo "cleaning up"
[ -d ${TMP_DIR} ] && {
[ -f ${TMP_DIR}/repodata.json ] && rm ${TMP_DIR}/repodata.json
[ -f ${TMP_DIR}/repodata.json.bz2 ] && rm ${TMP_DIR}/repodata.json.bz2
rmdir ${TMP_DIR}
}
}
trap cleanup EXIT
echo ${TMP_DIR}
for repo in ${CONDA_REPOS[@]}; do
for arch in ${CONDA_ARCHES[@]}; do
PKG_REPO_BASE="${CONDA_REPO_BASE}/pkgs/$repo/$arch"
repodata_url="${PKG_REPO_BASE}/repodata.json"
bz2_repodata_url="${PKG_REPO_BASE}/repodata.json.bz2"
LOCAL_DIR="${LOCAL_DIR_BASE}/$repo/$arch"
[ ! -d ${LOCAL_DIR} ] && mkdir -p ${LOCAL_DIR}
tmp_repodata="${TMP_DIR}/repodata.json"
tmp_bz2_repodata="${TMP_DIR}/repodata.json.bz2"
check-and-download ${repodata_url} ${tmp_repodata}
check-and-download ${bz2_repodata_url} ${tmp_bz2_repodata}
jq_cmd='.packages | to_entries[] | [.key, .value.size, .value.md5] | map(tostring) | join(" ")'
bzip2 -c -d ${tmp_bz2_repodata} | jq -r "${jq_cmd}" | while read line;
do
read -a tokens <<< $line
pkgfile=${tokens[0]}
pkgsize=${tokens[1]}
pkgmd5=${tokens[2]}
pkg_url="${PKG_REPO_BASE}/${pkgfile}"
dest_file="${LOCAL_DIR}/${pkgfile}"
declare downloaded=false
if [ -f ${dest_file} ]; then
rsize=`stat -c "%s" ${dest_file}`
if [ ${rsize} -eq ${pkgsize} ]; then
downloaded=true
echo "Skipping ${pkgfile}, size ${pkgsize}"
fi
fi
while [ $downloaded != true ]; do
echo "downloading ${pkg_url}"
wget -q -O ${dest_file} ${pkg_url} && {
# two space for md5sum check format
echo "${pkgmd5} ${dest_file}" | md5sum -c - && downloaded=true
}
done
done
mv -f "${TMP_DIR}/repodata.json" "${LOCAL_DIR}/repodata.json"
mv -f "${TMP_DIR}/repodata.json.bz2" "${LOCAL_DIR}/repodata.json.bz2"
done
done
function sync_installer() {
repo_url="$1"
repo_dir="$2"
[ ! -d "$repo_dir" ] && mkdir -p "$repo_dir"
cd $repo_dir
lftp "${repo_url}/" -e "mirror --verbose -P 5; bye"
}
sync_installer "${CONDA_REPO_BASE}/archive/" "${TUNASYNC_WORKING_DIR}/archive/"
sync_installer "${CONDA_REPO_BASE}/miniconda/" "${TUNASYNC_WORKING_DIR}/miniconda/"


@@ -1,21 +0,0 @@
#!/bin/bash
REPO=${REPO:-"/usr/local/bin/repo"}
function repo_init() {
mkdir -p $TUNASYNC_WORKING_DIR
cd $TUNASYNC_WORKING_DIR
$REPO init -u https://android.googlesource.com/mirror/manifest --mirror
}
function repo_sync() {
cd $TUNASYNC_WORKING_DIR
$REPO sync -f
}
if [ ! -d "$TUNASYNC_WORKING_DIR/git-repo.git" ]; then
echo "Initializing AOSP mirror"
repo_init
fi
repo_sync


@@ -1,59 +0,0 @@
#!/bin/bash
# reqires: wget, yum-utils
set -e
set -o pipefail
_here=`dirname $(realpath $0)`
. ${_here}/helpers/apt-download
APT_VERSIONS=("debian-wheezy" "debian-jessie" "ubuntu-precise" "ubuntu-trusty" "ubuntu-xenial")
BASE_PATH="${TUNASYNC_WORKING_DIR}"
APT_PATH="${BASE_PATH}/apt/repo"
YUM_PATH="${BASE_PATH}/yum/repo"
mkdir -p ${APT_PATH} ${YUM_PATH}
wget -q -N -O ${BASE_PATH}/yum/gpg https://yum.dockerproject.org/gpg
wget -q -N -O ${BASE_PATH}/apt/gpg https://apt.dockerproject.org/gpg
# YUM mirror
cache_dir="/tmp/yum-docker-cache/"
cfg="/tmp/docker-yum.conf"
cat <<EOF > ${cfg}
[main]
keepcache=0
[centos6]
name=Docker Repository
baseurl=https://yum.dockerproject.org/repo/main/centos/6
enabled=1
gpgcheck=0
gpgkey=https://yum.dockerproject.org/gpg
sslverify=0
[centos7]
name=Docker Repository
baseurl=https://yum.dockerproject.org/repo/main/centos/7
enabled=1
gpgcheck=0
gpgkey=https://yum.dockerproject.org/gpg
sslverify=0
EOF
[ ! -d ${YUM_PATH}/centos6 ] && mkdir -p ${YUM_PATH}/centos6
[ ! -d ${YUM_PATH}/centos7 ] && mkdir -p ${YUM_PATH}/centos7
reposync -c $cfg -d -p ${YUM_PATH} -e $cache_dir
createrepo --update -v -c $cache_dir -o ${YUM_PATH}/centos6 ${YUM_PATH}/centos7
createrepo --update -v -c $cache_dir -o ${YUM_PATH}/centos6 ${YUM_PATH}/centos7
rm $cfg
# APT mirror
base_url="http://apt.dockerproject.org/repo"
for version in ${APT_VERSIONS[@]}; do
apt-download-binary ${base_url} "$version" "main" "amd64" "${APT_PATH}" || true
apt-download-binary ${base_url} "$version" "main" "i386" "${APT_PATH}" || true
done
# sync_docker "http://apt.dockerproject.org/" "${TUNASYNC_WORKING_DIR}/apt"
# sync_docker "http://yum.dockerproject.org/" "${TUNASYNC_WORKING_DIR}/yum"


@@ -1,13 +0,0 @@
#!/usr/bin/env python
ARCH_EXCLUDE = ['armel', 'alpha', 'hurd-i386', 'ia64', 'kfreebsd-amd64', 'kfreebsd-i386', 'mips', 'powerpc', 'ppc64el', 's390', 's390x', 'sparc']
CONTENT_EXCLUDE = ['binary-{arch}', 'installer-{arch}', 'Contents-{arch}.gz', 'Contents-udeb-{arch}.gz', 'Contents-{arch}.diff', 'arch-{arch}.files', 'arch-{arch}.list.gz', '*_{arch}.deb', '*_{arch}.udeb', '*_{arch}.changes']
with open("debian-exclude.txt", 'wb') as f:
f.write(".~tmp~/\n")
f.write(".*\n")
for arch in ARCH_EXCLUDE:
for content in CONTENT_EXCLUDE:
f.write(content.format(arch=arch))
f.write('\n')


@@ -1,13 +0,0 @@
#!/usr/bin/env python
ARCH_EXCLUDE = ['armel', 'armhf']
CONTENT_EXCLUDE = ['binary-{arch}', 'installer-{arch}', 'Contents-{arch}.gz', 'Contents-udeb-{arch}.gz', 'Contents-{arch}.diff', 'arch-{arch}.files', 'arch-{arch}.list.gz', '*_{arch}.deb', '*_{arch}.udeb', '*_{arch}.changes']
with open("kali-exclude.txt", 'wb') as f:
f.write(".~tmp~/\n")
f.write(".*\n")
for arch in ARCH_EXCLUDE:
for content in CONTENT_EXCLUDE:
f.write(content.format(arch=arch))
f.write('\n')


@@ -1,13 +0,0 @@
#!/usr/bin/env python
ARCH_EXCLUDE = ['powerpc', 'ppc64el', 'ia64', 'sparc', 'armel']
CONTENT_EXCLUDE = ['binary-{arch}', 'installer-{arch}', 'Contents-{arch}.gz', 'Contents-udeb-{arch}.gz', 'Contents-{arch}.diff', 'arch-{arch}.files', 'arch-{arch}.list.gz', '*_{arch}.deb', '*_{arch}.udeb', '*_{arch}.changes']
with open("ubuntu-ports-exclude.txt", 'wb') as f:
f.write(".~tmp~/\n")
f.write(".*\n")
for arch in ARCH_EXCLUDE:
for content in CONTENT_EXCLUDE:
f.write(content.format(arch=arch))
f.write('\n')


@@ -1,65 +0,0 @@
#!/bin/bash
set -e
_here=`dirname $(realpath $0)`
. ${_here}/helpers/apt-download
[ -z "${LOADED_APT_DOWNLOAD}" ] && (echo "failed to load apt-download"; exit 1)
BASE_PATH="${TUNASYNC_WORKING_DIR}"
YUM_PATH="${BASE_PATH}/yum"
UBUNTU_VERSIONS=("trusty" "wily")
DEBIAN_VERSIONS=("wheezy" "jessie" "stretch")
UBUNTU_PATH="${BASE_PATH}/ubuntu/"
DEBIAN_PATH="${BASE_PATH}/debian/"
mkdir -p $UBUNTU_PATH $DEBIAN_PATH $YUM_PATH
cache_dir="/tmp/yum-gitlab-ce-cache/"
cfg="/tmp/gitlab-ce-yum.conf"
cat <<EOF > ${cfg}
[main]
keepcache=0
[el6]
name=el6
baseurl=https://packages.gitlab.com/gitlab/gitlab-ce/el/6/x86_64
repo_gpgcheck=0
gpgcheck=0
enabled=1
gpgkey=https://packages.gitlab.com/gpg.key
sslverify=0
[el7]
name=el7
baseurl=https://packages.gitlab.com/gitlab/gitlab-ce/el/7/x86_64
repo_gpgcheck=0
gpgcheck=0
enabled=1
gpgkey=https://packages.gitlab.com/gpg.key
sslverify=0
EOF
reposync -c $cfg -d -p ${YUM_PATH} -e $cache_dir
createrepo --update -v -c $cache_dir -o ${YUM_PATH}/el6 ${YUM_PATH}/el6
createrepo --update -v -c $cache_dir -o ${YUM_PATH}/el7 ${YUM_PATH}/el7
rm $cfg
base_url="https://packages.gitlab.com/gitlab/gitlab-ce/ubuntu"
for version in ${UBUNTU_VERSIONS[@]}; do
apt-download-binary ${base_url} "$version" "main" "amd64" "${UBUNTU_PATH}" || true
apt-download-binary ${base_url} "$version" "main" "i386" "${UBUNTU_PATH}" || true
done
echo "Ubuntu finished"
base_url="https://packages.gitlab.com/gitlab/gitlab-ce/debian"
for version in ${DEBIAN_VERSIONS[@]}; do
apt-download-binary ${base_url} "$version" "main" "amd64" "${DEBIAN_PATH}" || true
apt-download-binary ${base_url} "$version" "main" "i386" "${DEBIAN_PATH}" || true
done
echo "Debian finished"
# vim: ts=4 sts=4 sw=4


@@ -1,69 +0,0 @@
#!/bin/bash
# reqires: wget, yum-utils
set -e
set -o pipefail
_here=`dirname $(realpath $0)`
. ${_here}/helpers/apt-download
[ -z "${LOADED_APT_DOWNLOAD}" ] && (echo "failed to load apt-download"; exit 1)
BASE_PATH="${TUNASYNC_WORKING_DIR}"
YUM_PATH="${BASE_PATH}/yum"
UBUNTU_VERSIONS=("trusty" "xenial")
DEBIAN_VERSIONS=("wheezy" "jessie" "stretch")
UBUNTU_PATH="${BASE_PATH}/ubuntu/"
DEBIAN_PATH="${BASE_PATH}/debian/"
mkdir -p $UBUNTU_PATH $DEBIAN_PATH $YUM_PATH
cache_dir="/tmp/yum-gitlab-runner-cache/"
cfg="/tmp/gitlab-runner-yum.conf"
cat <<EOF > ${cfg}
[main]
keepcache=0
[el6]
name=gitlab-ci-multi-runner-el6
baseurl=https://packages.gitlab.com/runner/gitlab-ci-multi-runner/el/6/x86_64
repo_gpgcheck=0
gpgcheck=0
enabled=1
gpgkey=https://packages.gitlab.com/gpg.key
sslverify=0
[el7]
name=gitlab-ci-multi-runner-el7
baseurl=https://packages.gitlab.com/runner/gitlab-ci-multi-runner/el/7/x86_64
repo_gpgcheck=0
gpgcheck=0
enabled=1
gpgkey=https://packages.gitlab.com/gpg.key
sslverify=0
EOF
reposync -c $cfg -d -p ${YUM_PATH} -e $cache_dir
[ ! -d ${YUM_PATH}/el6 ] && mkdir -p ${YUM_PATH}/el6
[ ! -d ${YUM_PATH}/el7 ] && mkdir -p ${YUM_PATH}/el7
createrepo --update -v -c $cache_dir -o ${YUM_PATH}/el6 ${YUM_PATH}/el6
createrepo --update -v -c $cache_dir -o ${YUM_PATH}/el7 ${YUM_PATH}/el7
rm $cfg
base_url="https://packages.gitlab.com/runner/gitlab-ci-multi-runner/ubuntu"
for version in ${UBUNTU_VERSIONS[@]}; do
apt-download-binary ${base_url} "$version" "main" "amd64" "${UBUNTU_PATH}" || true
apt-download-binary ${base_url} "$version" "main" "i386" "${UBUNTU_PATH}" || true
done
echo "Ubuntu finished"
base_url="https://packages.gitlab.com/runner/gitlab-ci-multi-runner/debian"
for version in ${DEBIAN_VERSIONS[@]}; do
apt-download-binary ${base_url} "$version" "main" "amd64" "${DEBIAN_PATH}" || true
apt-download-binary ${base_url} "$version" "main" "i386" "${DEBIAN_PATH}" || true
done
echo "Debian finished"
# vim: ts=4 sts=4 sw=4


@@ -1,92 +0,0 @@
#!/bin/bash
set -e
function remove_broken() {
interval=$1
interval_file="/tmp/hackage_lastcheck"
now=`date +%s`
if [[ -f ${interval_file} ]]; then
lastcheck=`cat ${interval_file}`
between=$(echo "${now}-${lastcheck}" | bc)
[[ $between -lt $interval ]] && echo "skip checking"; return 0
fi
echo "start checking"
mkdir -p "${TUNASYNC_WORKING_DIR}/package"
cd "${TUNASYNC_WORKING_DIR}/package"
ls | while read line; do
echo -n "$line\t\t"
tar -tzf $line >/dev/null || (echo "FAIL"; rm $line) && echo "OK"
done
echo `date +%s` > $interval_file
}
function must_download() {
src=$1
dst=$2
while true; do
echo "downloading: $name"
wget "$src" -O "$dst" &>/dev/null || true
tar -tzf package/$name >/dev/null || rm package/$name && break
done
}
function hackage_mirror() {
local_pklist="/tmp/hackage_local_pklist_$$.list"
remote_pklist="/tmp/hackage_remote_pklist_$$.list"
cd ${TUNASYNC_WORKING_DIR}
mkdir -p package
echo "Downloading index..."
rm index.tar.gz || true
axel http://hdiff.luite.com/packages/archive/index.tar.gz -o index.tar.gz > /dev/null
echo "building local package list"
ls package | sed "s/\.tar\.gz$//" > $local_pklist
echo "preferred-versions" >> $local_pklist # ignore preferred-versions
echo "building remote package list"
tar ztf index.tar.gz | (cut -d/ -f 1,2 2>/dev/null) | sed 's|/|-|' > $remote_pklist
echo "building download list"
# substract local list from remote list
comm <(sort $remote_pklist) <(sort $local_pklist) -23 | while read pk; do
# limit concurrent level
bgcount=`jobs | wc -l`
while [[ $bgcount -ge 5 ]]; do
sleep 0.5
bgcount=`jobs | wc -l`
done
name="$pk.tar.gz"
if [ ! -a package/$name ]; then
must_download "http://hackage.haskell.org/package/$pk/$name" "package/$name" &
else
echo "skip existed: $name"
fi
done
# delete redundanty files
comm <(sort $remote_pklist) <(sort $local_pklist) -13 | while read pk; do
name="$pk.tar.gz"
echo "deleting ${name}"
rm "package/$name"
done
cp index.tar.gz 00-index.tar.gz
}
function cleanup () {
echo "cleaning up"
[[ ! -z $local_pklist ]] && (rm $local_pklist $remote_pklist ; true)
}
trap cleanup EXIT
remove_broken 86400
hackage_mirror
# vim: ts=4 sts=4 sw=4


@@ -1,132 +0,0 @@
#!/bin/bash
set -e
LOADED_APT_DOWNLOAD="yes"
function check-and-download() {
remote_file=$1
local_file=$2
wget -q --spider ${remote_file}
if [ $? -eq 0 ]; then
echo "downloading ${remote_file}"
wget -q -N -O ${local_file} ${remote_file}
return
fi
return 0
}
function apt-download-binary() {
base_url=$1
dist=$2
repo=$3
arch=$4
dest_base_dir=$5
if [ -z $dest_base_dir ]; then
echo "Destination directory is empty, cannot continue"
return 1
fi
dest_dir="${dest_base_dir}/dists/${dist}"
[ ! -d "$dest_dir" ] && mkdir -p "$dest_dir"
check-and-download "${base_url}/dists/${dist}/Contents-${arch}.gz" "${dest_dir}/Contents-${arch}.gz" || true
check-and-download "${base_url}/dists/${dist}/InRelease" "${dest_dir}/InRelease" || true
check-and-download "${base_url}/dists/${dist}/Release" "${dest_dir}/Release"
check-and-download "${base_url}/dists/${dist}/Release.gpg" "${dest_dir}/Release.gpg" || true
# Load Package Index URLs from Release file
release_file="${dest_dir}/Release"
dest_dir="${dest_base_dir}/dists/${dist}/${repo}/binary-${arch}"
[ ! -d "$dest_dir" ] && mkdir -p "$dest_dir"
declare pkgidx_content=""
declare cnt_start=false
declare -i checksum_len
if (grep -e '^SHA256:$' ${release_file} &>/dev/null); then
checksum_cmd="sha256sum"; checksum_regex="^SHA256:$"; checksum_len=64
elif (grep -e '^SHA1:$' ${release_file} &>/dev/null); then
checksum_cmd="sha1sum"; checksum_regex="^SHA1:$"; checksum_len=40
elif (grep -e '^MD5Sum:$' ${release_file} &>/dev/null); then
checksum_cmd="md5sum"; checksum_regex="^MD5sum:$"; checksum_len=32
fi
while read line; do
if [[ ${cnt_start} = true ]]; then
read -a tokens <<< $line
checksum=${tokens[0]}
if [[ ${#checksum} != ${checksum_len} ]]; then
break
fi
filesize=${tokens[1]}
filename=${tokens[2]}
if [[ "$filename" =~ ${repo}/binary-${arch} ]]; then
# Load package list from Packages file
pkgidx_file="${dest_base_dir}/dists/${dist}/${filename}"
dest_dir=`dirname ${pkgidx_file}`
[ ! -d "$dest_dir" ] && mkdir -p "$dest_dir"
pkglist_url="${base_url}/dists/${dist}/${filename}"
check-and-download "${pkglist_url}" ${pkgidx_file} || true
echo "${checksum} ${pkgidx_file}" | ${checksum_cmd} -c -
if [ -z "${pkgidx_content}" -a -f ${pkgidx_file} ]; then
echo "getting packages index content"
case $filename in
"*.bz2")
pkgidx_content=`bunzip2 -c ${pkgidx_file}`
;;
"*.gz")
pkgidx_content=`gunzip -c ${pkgidx_file}`
;;
*)
pkgidx_content=`cat ${pkgidx_file}`
;;
esac
fi
fi
else
if [[ "$line" =~ ${checksum_regex} ]]; then
cnt_start=true
fi
fi
done < ${release_file}
if [ -z "${pkgidx_content}" ]; then
echo "index is empty, failed"
return 1
fi
# Set checksum method
if (echo -e "${pkgidx_content}" | grep -e '^SHA256' &>/dev/null); then
checksum_cmd="sha256sum"; checksum_regex="^SHA256"
elif (echo -e "${pkgidx_content}" | grep -e '^SHA1' &>/dev/null); then
checksum_cmd="sha1sum"; checksum_regex="^SHA1"
elif (echo -e "${pkgidx_content}" | grep -e '^MD5sum' &>/dev/null); then
checksum_cmd="md5sum"; checksum_regex="^MD5sum"
fi
# Download packages
(echo -e "${pkgidx_content}" | grep -e '^Filename' -e '^Size' -e ${checksum_regex} | cut -d' ' -f 2) | \
while read pkg_filename; read pkg_size; read pkg_checksum; do
dest_filename="${dest_base_dir}/${pkg_filename}"
dest_dir=`dirname ${dest_filename}`
[ ! -d "$dest_dir" ] && mkdir -p "$dest_dir"
pkg_url="${base_url}/${pkg_filename}"
declare downloaded=false
if [ -f ${dest_filename} ]; then
rsize=`stat -c "%s" ${dest_filename}`
if [ ${rsize} -eq ${pkg_size} ]; then
downloaded=true
echo "Skipping ${pkg_filename}, size ${pkg_size}"
fi
fi
while [ $downloaded != true ]; do
echo "downloading ${pkg_url}"
wget -q -O ${dest_filename} ${pkg_url} && {
echo "${pkg_checksum} ${dest_filename}" | ${checksum_cmd} -c - && downloaded=true # two space for md5sum/sha1sum/sha256sum check format
}
done
done
echo "Mirroring ${base_url} ${dist}, ${repo}, ${arch} done!"
}
# vim: ts=4 sts=4 sw=4


@@ -1,17 +0,0 @@
#!/bin/bash
if [ ! -d "$TUNASYNC_WORKING_DIR" ]; then
echo "Directory not exists, fail"
exit 1
fi
function update_homebrew_git() {
repo_dir="$1"
cd $repo_dir
echo "==== SYNC $repo_dir START ===="
/usr/bin/timeout -s INT 3600 git remote -v update
echo "==== SYNC $repo_dir DONE ===="
}
update_homebrew_git "$TUNASYNC_WORKING_DIR/homebrew.git"
update_homebrew_git "$TUNASYNC_WORKING_DIR/homebrew-python.git"
update_homebrew_git "$TUNASYNC_WORKING_DIR/homebrew-science.git"


@@ -1,12 +0,0 @@
#!/bin/bash
if [ ! -d "$TUNASYNC_WORKING_DIR" ]; then
echo "Directory not exists, fail"
exit 1
fi
function update_linux_git() {
cd $TUNASYNC_WORKING_DIR
/usr/bin/timeout -s INT 3600 git remote -v update
}
update_linux_git


@@ -1,16 +0,0 @@
#!/bin/bash
function sync_lxc_images() {
repo_url="$1"
repo_dir="$2"
[ ! -d "$repo_dir" ] && mkdir -p "$repo_dir"
cd $repo_dir
# lftp "${repo_url}/" -e "mirror --verbose --log=${TUNASYNC_LOG_FILE} --exclude-glob='*/SRPMS/*' -P 5 --delete --only-newer; bye"
lftp "${repo_url}/" -e "mirror --verbose -P 5 --delete --only-newer; bye"
}
sync_lxc_images "http://images.linuxcontainers.org/images" "${TUNASYNC_WORKING_DIR}/images"
sync_lxc_images "http://images.linuxcontainers.org/meta" "${TUNASYNC_WORKING_DIR}/meta"


@@ -1,88 +0,0 @@
#!/bin/bash
set -e
_here=`dirname $(realpath $0)`
. ${_here}/helpers/apt-download
[ -z "${LOADED_APT_DOWNLOAD}" ] && (echo "failed to load apt-download"; exit 1)
BASE_PATH="${TUNASYNC_WORKING_DIR}"
YUM_PATH="${BASE_PATH}/yum"
APT_PATH="${BASE_PATH}/apt"
UBUNTU_VERSIONS=("trusty" "precise")
DEBIAN_VERSIONS=("wheezy")
MONGO_VERSIONS=("3.2" "3.0")
STABLE_VERSION="3.2"
UBUNTU_PATH="${APT_PATH}/ubuntu"
DEBIAN_PATH="${APT_PATH}/debian"
mkdir -p $UBUNTU_PATH $DEBIAN_PATH $YUM_PATH
cache_dir="/tmp/yum-mongodb-cache/"
cfg="/tmp/mongodb-yum.conf"
cat <<EOF > ${cfg}
[main]
keepcache=0
EOF
for mgver in ${MONGO_VERSIONS[@]}; do
cat <<EOF >> ${cfg}
[el6-${mgver}]
name=el6-${mgver}
baseurl=https://repo.mongodb.org/yum/redhat/6/mongodb-org/${mgver}/x86_64/
repo_gpgcheck=0
gpgcheck=0
enabled=1
sslverify=0
[el7-${mgver}]
name=el7-${mgver}
baseurl=https://repo.mongodb.org/yum/redhat/7/mongodb-org/${mgver}/x86_64/
repo_gpgcheck=0
gpgcheck=0
enabled=1
sslverify=0
EOF
done
reposync -c $cfg -d -p ${YUM_PATH} -e $cache_dir
for mgver in ${MONGO_VERSIONS[@]}; do
createrepo --update -v -c $cache_dir -o ${YUM_PATH}/el6-$mgver/ ${YUM_PATH}/el6-$mgver/
createrepo --update -v -c $cache_dir -o ${YUM_PATH}/el7-$mgver/ ${YUM_PATH}/el7-$mgver/
done
[ -e ${YUM_PATH}/el6 ] || (cd ${YUM_PATH}; ln -s el6-${STABLE_VERSION} el6)
[ -e ${YUM_PATH}/el7 ] || (cd ${YUM_PATH}; ln -s el7-${STABLE_VERSION} el7)
rm $cfg
base_url="http://repo.mongodb.org/apt/ubuntu"
for ubver in ${UBUNTU_VERSIONS[@]}; do
for mgver in ${MONGO_VERSIONS[@]}; do
version="$ubver/mongodb-org/$mgver"
apt-download-binary ${base_url} "$version" "multiverse" "amd64" "${UBUNTU_PATH}" || true
apt-download-binary ${base_url} "$version" "multiverse" "i386" "${UBUNTU_PATH}" || true
done
mg_basepath="${UBUNTU_PATH}/dists/$ubver/mongodb-org"
[ -e ${mg_basepath}/stable ] || (cd ${mg_basepath}; ln -s ${STABLE_VERSION} stable)
done
echo "Ubuntu finished"
base_url="http://repo.mongodb.org/apt/debian"
for dbver in ${DEBIAN_VERSIONS[@]}; do
for mgver in ${MONGO_VERSIONS[@]}; do
version="$dbver/mongodb-org/$mgver"
apt-download-binary ${base_url} "$version" "main" "amd64" "${DEBIAN_PATH}" || true
apt-download-binary ${base_url} "$version" "main" "i386" "${DEBIAN_PATH}" || true
done
mg_basepath="${DEBIAN_PATH}/dists/$dbver/mongodb-org"
[ -e ${mg_basepath}/stable ] || (cd ${mg_basepath}; ln -s ${STABLE_VERSION} stable)
done
echo "Debian finished"
# vim: ts=4 sts=4 sw=4


@@ -1,18 +0,0 @@
#!/bin/bash
function sync_nodesource() {
repo_url="$1"
repo_dir="$2"
[ ! -d "$repo_dir" ] && mkdir -p "$repo_dir"
cd $repo_dir
# lftp "${repo_url}/" -e "mirror --verbose --exclude-glob='*/SRPMS/*' -P 5 --delete --only-newer; bye"
lftp "${repo_url}/" -e "mirror --verbose -P 5 --delete --only-newer; bye"
}
sync_nodesource "https://deb.nodesource.com/node" "${TUNASYNC_WORKING_DIR}/deb"
sync_nodesource "https://deb.nodesource.com/node_0.12" "${TUNASYNC_WORKING_DIR}/deb_0.12"
sync_nodesource "https://deb.nodesource.com/node_4.x" "${TUNASYNC_WORKING_DIR}/deb_4.x"
sync_nodesource "https://rpm.nodesource.com/pub" "${TUNASYNC_WORKING_DIR}/rpm"
sync_nodesource "https://rpm.nodesource.com/pub_0.12" "${TUNASYNC_WORKING_DIR}/rpm_0.12"
sync_nodesource "https://rpm.nodesource.com/pub_4.x" "${TUNASYNC_WORKING_DIR}/rpm_4.x"


@@ -1,13 +0,0 @@
#!/bin/bash
function sync_openwrt() {
repo_url="$1"
repo_dir="$2"
[ ! -d "$repo_dir" ] && mkdir -p "$repo_dir"
cd $repo_dir
lftp "${repo_url}/" -e "mirror --verbose -P 5 --delete --only-newer; bye"
}
sync_openwrt "http://downloads.openwrt.org/chaos_calmer/15.05/" "${TUNASYNC_WORKING_DIR}/chaos_calmer/15.05"
sync_openwrt "http://downloads.openwrt.org/snapshots/trunk/" "${TUNASYNC_WORKING_DIR}/snapshots/trunk"


@@ -1,9 +0,0 @@
#!/bin/bash
if [ ! -d "$TUNASYNC_WORKING_DIR" ]; then
echo "Directory not exists, fail"
exit 1
fi
echo "Syncing to $TUNASYNC_WORKING_DIR"
/usr/bin/timeout -s INT 3600 /home/tuna/.virtualenvs/bandersnatch/bin/bandersnatch -c /etc/bandersnatch.conf mirror || exit 1


@@ -1,17 +0,0 @@
#!/bin/bash
function sync_repo_ck() {
repo_url="$1"
repo_dir="$2"
[ ! -d "$repo_dir" ] && mkdir -p "$repo_dir"
cd $repo_dir
lftp "${repo_url}/" -e 'mirror -v -P 5 --delete --only-missing --only-newer --no-recursion; bye'
wget "${repo_url}/repo-ck.db" -O "repo-ck.db"
wget "${repo_url}/repo-ck.files" -O "repo-ck.files"
}
UPSTREAM="http://repo-ck.com"
sync_repo_ck "${UPSTREAM}/x86_64" "${TUNASYNC_WORKING_DIR}/x86_64"
sync_repo_ck "${UPSTREAM}/i686" "${TUNASYNC_WORKING_DIR}/i686"


@@ -1,16 +0,0 @@
#!/bin/bash
set -e
_here=`dirname $(realpath $0)`
. ${_here}/helpers/apt-download
[ -z "${LOADED_APT_DOWNLOAD}" ] && (echo "failed to load apt-download"; exit 1)
BASE_PATH="${TUNASYNC_WORKING_DIR}"
base_url="http://apt.termux.com"
ARCHES=("aarch64" "all" "arm" "i686")
for arch in ${ARCHES[@]}; do
echo "start syncing: ${arch}"
apt-download-binary "${base_url}" "stable" "main" "${arch}" "${BASE_PATH}" || true
done
echo "finished"


@@ -1,27 +0,0 @@
#!/bin/bash
SYNC_FILES="$TUNASYNC_WORKING_DIR"
# SYNC_FILES="/srv/mirror_disk/ubuntu/_working/"
#LOG_FILE="$TUNASYNC_LOG_FILE"
# [ -f $SYNC_LOCK ] && exit 1
# touch $SYNC_LOCK
echo ">> Starting sync on $(date --rfc-3339=seconds)"
arch="i386,amd64"
sections="main,main/debian-installer,multiverse,multiverse/debian-installer,restricted,restricted/debian-installer,universe,universe/debian-installer"
dists="precise,precise-backports,precise-proposed,precise-updates,precise-security,trusty,trusty-backports,trusty-proposed,trusty-updates,trusty-security"
server="$1"
inPath="/ubuntu"
proto="rsync"
outpath="$SYNC_FILES"
rsyncOpt='-6 -aIL --partial'
debmirror -h $server --no-check-gpg -a $arch -s $sections -d $dists -r $inPath -e $proto --rsync-options "$rsyncOpt" --verbose $outpath
date --rfc-3339=seconds > "$SYNC_FILES/lastsync"
echo ">> Finished sync on $(date --rfc-3339=seconds)"
# rm -f "$SYNC_LOCK"
exit 0


@@ -20,10 +20,11 @@ type baseProvider struct {
cmd *cmdJob
isRunning atomic.Value
logFile *os.File
cgroup *cgroupHook
hooks []jobHook
zfs *zfsHook
docker *dockerHook
hooks []jobHook
}
func (p *baseProvider) Name() string {
@@ -77,12 +78,17 @@ func (p *baseProvider) LogFile() string {
return s
}
}
panic("log dir is impossible to be unavailable")
panic("log file is impossible to be unavailable")
}
func (p *baseProvider) AddHook(hook jobHook) {
if cg, ok := hook.(*cgroupHook); ok {
p.cgroup = cg
switch v := hook.(type) {
case *cgroupHook:
p.cgroup = v
case *zfsHook:
p.zfs = v
case *dockerHook:
p.docker = v
}
p.hooks = append(p.hooks, hook)
}
@@ -95,20 +101,29 @@ func (p *baseProvider) Cgroup() *cgroupHook {
return p.cgroup
}
func (p *baseProvider) prepareLogFile() error {
func (p *baseProvider) ZFS() *zfsHook {
return p.zfs
}
func (p *baseProvider) Docker() *dockerHook {
return p.docker
}
func (p *baseProvider) prepareLogFile(append bool) error {
if p.LogFile() == "/dev/null" {
p.cmd.SetLogFile(nil)
return nil
}
if p.logFile == nil {
logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
logger.Errorf("Error opening logfile %s: %s", p.LogFile(), err.Error())
return err
}
p.logFile = logFile
appendMode := 0
if append {
appendMode = os.O_APPEND
}
p.cmd.SetLogFile(p.logFile)
logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE|appendMode, 0644)
if err != nil {
logger.Errorf("Error opening logfile %s: %s", p.LogFile(), err.Error())
return err
}
p.cmd.SetLogFile(logFile)
return nil
}
@@ -127,32 +142,22 @@ func (p *baseProvider) IsRunning() bool {
func (p *baseProvider) Wait() error {
defer func() {
p.Lock()
logger.Debugf("set isRunning to false: %s", p.Name())
p.isRunning.Store(false)
if p.logFile != nil {
p.logFile.Close()
p.logFile = nil
}
p.Unlock()
}()
logger.Debugf("calling Wait: %s", p.Name())
return p.cmd.Wait()
}
func (p *baseProvider) Terminate() error {
p.Lock()
defer p.Unlock()
logger.Debugf("terminating provider: %s", p.Name())
if !p.IsRunning() {
return nil
}
p.Lock()
if p.logFile != nil {
p.logFile.Close()
p.logFile = nil
}
p.Unlock()
err := p.cmd.Terminate()
p.isRunning.Store(false)
return err
}
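A note on this hunk: prepareLogFile gained an append flag and no longer stores the opened file on the provider; the handle is closed by the command job itself when it exits (see the runner change further down), which removes the race on baseProvider.logFile. A minimal sketch of the intended call pattern, matching the two-stage rsync provider later in this patch:

// Sketch only: stage 1 truncates latest.log, stage 2 appends to it,
// so the output of both stages lands in a single log file.
for stage := 1; stage <= 2; stage++ {
	if err := p.prepareLogFile(stage > 1); err != nil {
		return err
	}
	// ... build the stage's command, Start() it, then Wait() ...
}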

View File

@@ -15,35 +15,31 @@ import (
"github.com/codeskyblue/go-sh"
)
var cgSubsystem = "cpu"
type cgroupHook struct {
emptyHook
provider mirrorProvider
basePath string
baseGroup string
created bool
subsystem string
memLimit string
}
func initCgroup(basePath string) {
if _, err := os.Stat(filepath.Join(basePath, "memory")); err == nil {
cgSubsystem = "memory"
return
}
logger.Warning("Memory subsystem of cgroup not enabled, fallback to cpu")
}
func newCgroupHook(p mirrorProvider, basePath, baseGroup string) *cgroupHook {
func newCgroupHook(p mirrorProvider, basePath, baseGroup, subsystem, memLimit string) *cgroupHook {
if basePath == "" {
basePath = "/sys/fs/cgroup"
}
if baseGroup == "" {
baseGroup = "tunasync"
}
if subsystem == "" {
subsystem = "cpu"
}
return &cgroupHook{
provider: p,
basePath: basePath,
baseGroup: baseGroup,
subsystem: subsystem,
}
}
@@ -52,13 +48,15 @@ func (c *cgroupHook) preExec() error {
if err := sh.Command("cgcreate", "-g", c.Cgroup()).Run(); err != nil {
return err
}
if cgSubsystem != "memory" {
if c.subsystem != "memory" {
return nil
}
if c.provider.Type() == provRsync || c.provider.Type() == provTwoStageRsync {
if c.memLimit != "" {
gname := fmt.Sprintf("%s/%s", c.baseGroup, c.provider.Name())
return sh.Command(
"cgset", "-r", "memory.limit_in_bytes=128M", gname,
"cgset", "-r",
fmt.Sprintf("memory.limit_in_bytes=%s", c.memLimit),
gname,
).Run()
}
return nil
@@ -76,7 +74,7 @@ func (c *cgroupHook) postExec() error {
func (c *cgroupHook) Cgroup() string {
name := c.provider.Name()
return fmt.Sprintf("%s:%s/%s", cgSubsystem, c.baseGroup, name)
return fmt.Sprintf("%s:%s/%s", c.subsystem, c.baseGroup, name)
}
func (c *cgroupHook) killAll() error {
@@ -87,7 +85,7 @@ func (c *cgroupHook) killAll() error {
readTaskList := func() ([]int, error) {
taskList := []int{}
taskFile, err := os.Open(filepath.Join(c.basePath, cgSubsystem, c.baseGroup, name, "tasks"))
taskFile, err := os.Open(filepath.Join(c.basePath, c.subsystem, c.baseGroup, name, "tasks"))
if err != nil {
return taskList, err
}
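With the subsystem and the per-mirror memory limit now carried on the hook itself, a mirror named "ubuntu" in the default "tunasync" group with memory_limit = "512M" would be set up at preExec with roughly "cgcreate -g memory:tunasync/ubuntu" followed by "cgset -r memory.limit_in_bytes=512M tunasync/ubuntu" (names illustrative). The limit is applied only when the configured subsystem is "memory" and a memory_limit is present, replacing the old hard-coded 128M that was applied to rsync providers.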

View File

@@ -72,11 +72,14 @@ sleep 30
provider, err := newCmdProvider(c)
So(err, ShouldBeNil)
initCgroup("/sys/fs/cgroup")
cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync")
cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync", "cpu", "")
provider.AddHook(cg)
err = cg.preExec()
if err != nil {
logger.Errorf("Failed to create cgroup")
return
}
So(err, ShouldBeNil)
go func() {
@@ -129,15 +132,18 @@ sleep 30
provider, err := newRsyncProvider(c)
So(err, ShouldBeNil)
initCgroup("/sys/fs/cgroup")
cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync")
cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync", "cpu", "512M")
provider.AddHook(cg)
cg.preExec()
if cgSubsystem == "memory" {
err = cg.preExec()
if err != nil {
logger.Errorf("Failed to create cgroup")
return
}
if cg.subsystem == "memory" {
memoLimit, err := ioutil.ReadFile(filepath.Join(cg.basePath, "memory", cg.baseGroup, provider.Name(), "memory.limit_in_bytes"))
So(err, ShouldBeNil)
So(strings.Trim(string(memoLimit), "\n"), ShouldEqual, strconv.Itoa(128*1024*1024))
So(strings.Trim(string(memoLimit), "\n"), ShouldEqual, strconv.Itoa(512*1024*1024))
}
cg.postExec()
})

View File

@@ -1,6 +1,7 @@
package worker
import (
"errors"
"time"
"github.com/anmitsu/go-shlex"
@@ -60,17 +61,25 @@ func (p *cmdProvider) Run() error {
}
func (p *cmdProvider) Start() error {
p.Lock()
defer p.Unlock()
if p.IsRunning() {
return errors.New("provider is currently running")
}
env := map[string]string{
"TUNASYNC_MIRROR_NAME": p.Name(),
"TUNASYNC_WORKING_DIR": p.WorkingDir(),
"TUNASYNC_UPSTREAM_URL": p.upstreamURL,
"TUNASYNC_LOG_DIR": p.LogDir(),
"TUNASYNC_LOG_FILE": p.LogFile(),
}
for k, v := range p.env {
env[k] = v
}
p.cmd = newCmdJob(p, p.command, p.WorkingDir(), env)
if err := p.prepareLogFile(); err != nil {
if err := p.prepareLogFile(false); err != nil {
return err
}
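These TUNASYNC_* variables are exactly what the command-provider scripts earlier in this listing consume; a minimal script using them might look like this (illustrative only):

#!/bin/bash
# Hypothetical sync script; the actual transfer command depends on the mirror.
echo "syncing ${TUNASYNC_MIRROR_NAME} from ${TUNASYNC_UPSTREAM_URL}"
cd "${TUNASYNC_WORKING_DIR}" || exit 1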

View File

@@ -37,6 +37,8 @@ type Config struct {
Manager managerConfig `toml:"manager"`
Server serverConfig `toml:"server"`
Cgroup cgroupConfig `toml:"cgroup"`
ZFS zfsConfig `toml:"zfs"`
Docker dockerConfig `toml:"docker"`
Include includeConfig `toml:"include"`
Mirrors []mirrorConfig `toml:"mirrors"`
}
@@ -54,8 +56,17 @@ type globalConfig struct {
type managerConfig struct {
APIBase string `toml:"api_base"`
CACert string `toml:"ca_cert"`
Token string `toml:"token"`
// this option overrides the APIBase
APIList []string `toml:"api_base_list"`
CACert string `toml:"ca_cert"`
// Token string `toml:"token"`
}
func (mc managerConfig) APIBaseList() []string {
if len(mc.APIList) > 0 {
return mc.APIList
}
return []string{mc.APIBase}
}
type serverConfig struct {
@@ -67,9 +78,21 @@ type serverConfig struct {
}
type cgroupConfig struct {
Enable bool `toml:"enable"`
BasePath string `toml:"base_path"`
Group string `toml:"group"`
Enable bool `toml:"enable"`
BasePath string `toml:"base_path"`
Group string `toml:"group"`
Subsystem string `toml:"subsystem"`
}
type dockerConfig struct {
Enable bool `toml:"enable"`
Volumes []string `toml:"volumes"`
Options []string `toml:"options"`
}
type zfsConfig struct {
Enable bool `toml:"enable"`
Zpool string `toml:"zpool"`
}
type includeConfig struct {
@@ -100,10 +123,17 @@ type mirrorConfig struct {
Command string `toml:"command"`
UseIPv6 bool `toml:"use_ipv6"`
UseIPv4 bool `toml:"use_ipv4"`
ExcludeFile string `toml:"exclude_file"`
Username string `toml:"username"`
Password string `toml:"password"`
Stage1Profile string `toml:"stage1_profile"`
MemoryLimit string `toml:"memory_limit"`
DockerImage string `toml:"docker_image"`
DockerVolumes []string `toml:"docker_volumes"`
DockerOptions []string `toml:"docker_options"`
}
// LoadConfig loads configuration
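The APIBaseList helper gives workers a uniform way to report to several managers: every entry of api_base_list if it is set, otherwise just the single api_base. A minimal sketch (URLs hypothetical):

mc := managerConfig{APIBase: "https://manager0.example.com:14242"}
_ = mc.APIBaseList() // []string{"https://manager0.example.com:14242"}

mc.APIList = []string{"https://m1.example.com", "https://m2.example.com"}
_ = mc.APIBaseList() // both entries; api_base is ignored once api_base_list is set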

95
worker/docker.go Normal file
View File

@@ -0,0 +1,95 @@
package worker
import (
"fmt"
"os"
)
type dockerHook struct {
emptyHook
provider mirrorProvider
image string
volumes []string
options []string
}
func newDockerHook(p mirrorProvider, gCfg dockerConfig, mCfg mirrorConfig) *dockerHook {
volumes := []string{}
volumes = append(volumes, gCfg.Volumes...)
volumes = append(volumes, mCfg.DockerVolumes...)
options := []string{}
options = append(options, gCfg.Options...)
options = append(options, mCfg.DockerOptions...)
return &dockerHook{
provider: p,
image: mCfg.DockerImage,
volumes: volumes,
options: options,
}
}
func (d *dockerHook) preExec() error {
p := d.provider
logDir := p.LogDir()
logFile := p.LogFile()
workingDir := p.WorkingDir()
if _, err := os.Stat(workingDir); os.IsNotExist(err) {
logger.Debugf("Making dir %s", workingDir)
if err = os.MkdirAll(workingDir, 0755); err != nil {
return fmt.Errorf("Error making dir %s: %s", workingDir, err.Error())
}
}
// Override workingDir
ctx := p.EnterContext()
ctx.Set(
"volumes", []string{
fmt.Sprintf("%s:%s", logDir, logDir),
fmt.Sprintf("%s:%s", logFile, logFile),
fmt.Sprintf("%s:%s", workingDir, workingDir),
},
)
return nil
}
func (d *dockerHook) postExec() error {
// sh.Command(
// "docker", "rm", "-f", d.Name(),
// ).Run()
d.provider.ExitContext()
return nil
}
// Volumes returns the configured volumes and
// runtime-needed volumes, including mirror dirs
// and log files
func (d *dockerHook) Volumes() []string {
vols := make([]string, len(d.volumes))
copy(vols, d.volumes)
p := d.provider
ctx := p.Context()
if ivs, ok := ctx.Get("volumes"); ok {
vs := ivs.([]string)
vols = append(vols, vs...)
}
return vols
}
func (d *dockerHook) LogFile() string {
p := d.provider
ctx := p.Context()
if iv, ok := ctx.Get(_LogFileKey + ":docker"); ok {
v := iv.(string)
return v
}
return p.LogFile()
}
func (d *dockerHook) Name() string {
p := d.provider
return "tunasync-job-" + p.Name()
}
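To make the volume plumbing concrete, under the config types added above: global [docker] volumes come first, then the mirror's docker_volumes, and preExec appends runtime bind-mounts for the log dir, log file, and working dir. A sketch with hypothetical values:

g := dockerConfig{Enable: true, Volumes: []string{"/etc/localtime:/etc/localtime:ro"}}
m := mirrorConfig{DockerImage: "alpine:3.6", DockerVolumes: []string{"/srv/scripts:/scripts:ro"}}
d := newDockerHook(provider, g, m)
// After d.preExec(), d.Volumes() additionally yields the
// logDir:logDir, logFile:logFile and workingDir:workingDir mounts.

Note that in the provider wiring further down, an enabled Docker hook takes the place of the cgroup hook for that mirror rather than stacking with it.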

97
worker/docker_test.go Normal file
View File

@@ -0,0 +1,97 @@
package worker
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"
"github.com/codeskyblue/go-sh"
. "github.com/smartystreets/goconvey/convey"
)
func getDockerByName(name string) (string, error) {
// docker ps -f 'name=$name' --format '{{.Names}}'
out, err := sh.Command(
"docker", "ps",
"--filter", "name="+name,
"--format", "{{.Names}}",
).Output()
return string(out), err
}
func TestDocker(t *testing.T) {
Convey("Docker Should Work", t, func(ctx C) {
tmpDir, err := ioutil.TempDir("", "tunasync")
defer os.RemoveAll(tmpDir)
So(err, ShouldBeNil)
cmdScript := filepath.Join(tmpDir, "cmd.sh")
tmpFile := filepath.Join(tmpDir, "log_file")
expectedOutput := "HELLO_WORLD"
c := cmdConfig{
name: "tuna-docker",
upstreamURL: "http://mirrors.tuna.moe/",
command: "/bin/cmd.sh",
workingDir: tmpDir,
logDir: tmpDir,
logFile: tmpFile,
interval: 600 * time.Second,
env: map[string]string{
"TEST_CONTENT": expectedOutput,
},
}
cmdScriptContent := `#!/bin/sh
echo ${TEST_CONTENT}
sleep 10
`
err = ioutil.WriteFile(cmdScript, []byte(cmdScriptContent), 0755)
So(err, ShouldBeNil)
provider, err := newCmdProvider(c)
So(err, ShouldBeNil)
d := &dockerHook{
provider: provider,
image: "alpine",
volumes: []string{
fmt.Sprintf("%s:%s", cmdScript, "/bin/cmd.sh"),
},
}
provider.AddHook(d)
So(provider.Docker(), ShouldNotBeNil)
err = d.preExec()
So(err, ShouldBeNil)
go func() {
err = provider.Run()
ctx.So(err, ShouldNotBeNil)
}()
time.Sleep(1 * time.Second)
// assert container running
names, err := getDockerByName(d.Name())
So(err, ShouldBeNil)
So(names, ShouldEqual, d.Name()+"\n")
err = provider.Terminate()
So(err, ShouldBeNil)
// container should be terminated and removed
names, err = getDockerByName(d.Name())
So(err, ShouldBeNil)
So(names, ShouldEqual, "")
// check log content
loggedContent, err := ioutil.ReadFile(provider.LogFile())
So(err, ShouldBeNil)
So(string(loggedContent), ShouldEqual, expectedOutput+"\n")
d.postExec()
})
}

View File

@@ -71,6 +71,7 @@ func (h *execPostHook) Do() error {
"TUNASYNC_MIRROR_NAME": p.Name(),
"TUNASYNC_WORKING_DIR": p.WorkingDir(),
"TUNASYNC_UPSTREAM_URL": p.Upstream(),
"TUNASYNC_LOG_DIR": p.LogDir(),
"TUNASYNC_LOG_FILE": p.LogFile(),
"TUNASYNC_JOB_EXIT_STATUS": exitStatus,
}

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"sync"
"sync/atomic"
"time"
tunasync "github.com/tuna/tunasync/internal"
)
@@ -14,12 +15,13 @@ import (
type ctrlAction uint8
const (
jobStart ctrlAction = iota
jobStop // stop syncing keep the job
jobDisable // disable the job (stops goroutine)
jobRestart // restart syncing
jobPing // ensure the goroutine is alive
jobHalt // worker halts
jobStart ctrlAction = iota
jobStop // stop syncing keep the job
jobDisable // disable the job (stops goroutine)
jobRestart // restart syncing
jobPing // ensure the goroutine is alive
jobHalt // worker halts
jobForceStart // ignore concurrent limit
)
type jobMessage struct {
@@ -154,9 +156,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
syncDone := make(chan error, 1)
go func() {
err := provider.Run()
if !stopASAP {
syncDone <- err
}
syncDone <- err
}()
select {
@@ -212,22 +212,26 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
return nil
}
runJob := func(kill <-chan empty, jobDone chan<- empty) {
runJob := func(kill <-chan empty, jobDone chan<- empty, bypassSemaphore <-chan empty) {
select {
case semaphore <- empty{}:
defer func() { <-semaphore }()
runJobWrapper(kill, jobDone)
case <-bypassSemaphore:
logger.Noticef("Concurrent limit ignored by %s", m.Name())
runJobWrapper(kill, jobDone)
case <-kill:
jobDone <- empty{}
return
}
}
bypassSemaphore := make(chan empty, 1)
for {
if m.State() == stateReady {
kill := make(chan empty)
jobDone := make(chan empty)
go runJob(kill, jobDone)
go runJob(kill, jobDone, bypassSemaphore)
_wait_for_job:
select {
@@ -248,7 +252,14 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
m.SetState(stateReady)
close(kill)
<-jobDone
time.Sleep(time.Second) // Restart may fail if the process was not exited yet
continue
case jobForceStart:
select { //non-blocking
default:
case bypassSemaphore <- empty{}:
}
fallthrough
case jobStart:
m.SetState(stateReady)
goto _wait_for_job
@@ -272,8 +283,14 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
case jobDisable:
m.SetState(stateDisabled)
return nil
case jobForceStart:
select { //non-blocking
default:
case bypassSemaphore <- empty{}:
}
fallthrough
case jobRestart:
m.SetState(stateReady)
fallthrough
case jobStart:
m.SetState(stateReady)
default:
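Both jobForceStart branches rely on the same non-blocking send: at most one bypass token can be queued, and a second force-start arriving while one is still pending is silently dropped rather than blocking the control loop. The idiom in isolation:

select {
case bypassSemaphore <- empty{}: // queue a single bypass token
default: // one is already pending; drop this request
}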

View File

@@ -135,6 +135,8 @@ echo $TUNASYNC_WORKING_DIR
msg = <-managerChan
So(msg.status, ShouldEqual, Syncing)
job.ctrlChan <- jobStart // should be ignored
job.ctrlChan <- jobStop
msg = <-managerChan
@@ -170,8 +172,239 @@ echo $TUNASYNC_WORKING_DIR
job.ctrlChan <- jobDisable
<-job.disabled
})
Convey("If we restart it", func(ctx C) {
go job.Run(managerChan, semaphore)
job.ctrlChan <- jobStart
msg := <-managerChan
So(msg.status, ShouldEqual, PreSyncing)
msg = <-managerChan
So(msg.status, ShouldEqual, Syncing)
job.ctrlChan <- jobRestart
msg = <-managerChan
So(msg.status, ShouldEqual, Failed)
So(msg.msg, ShouldEqual, "killed by manager")
msg = <-managerChan
So(msg.status, ShouldEqual, PreSyncing)
msg = <-managerChan
So(msg.status, ShouldEqual, Syncing)
msg = <-managerChan
So(msg.status, ShouldEqual, Success)
expectedOutput := fmt.Sprintf(
"%s\n%s\n",
provider.WorkingDir(), provider.WorkingDir(),
)
loggedContent, err := ioutil.ReadFile(provider.LogFile())
So(err, ShouldBeNil)
So(string(loggedContent), ShouldEqual, expectedOutput)
job.ctrlChan <- jobDisable
<-job.disabled
})
Convey("If we disable it", func(ctx C) {
go job.Run(managerChan, semaphore)
job.ctrlChan <- jobStart
msg := <-managerChan
So(msg.status, ShouldEqual, PreSyncing)
msg = <-managerChan
So(msg.status, ShouldEqual, Syncing)
job.ctrlChan <- jobDisable
msg = <-managerChan
So(msg.status, ShouldEqual, Failed)
So(msg.msg, ShouldEqual, "killed by manager")
<-job.disabled
})
Convey("If we stop it twice, than start it", func(ctx C) {
go job.Run(managerChan, semaphore)
job.ctrlChan <- jobStart
msg := <-managerChan
So(msg.status, ShouldEqual, PreSyncing)
msg = <-managerChan
So(msg.status, ShouldEqual, Syncing)
job.ctrlChan <- jobStop
msg = <-managerChan
So(msg.status, ShouldEqual, Failed)
So(msg.msg, ShouldEqual, "killed by manager")
job.ctrlChan <- jobStop // should be ignored
job.ctrlChan <- jobStart
msg = <-managerChan
So(msg.status, ShouldEqual, PreSyncing)
msg = <-managerChan
So(msg.status, ShouldEqual, Syncing)
msg = <-managerChan
So(msg.status, ShouldEqual, Success)
expectedOutput := fmt.Sprintf(
"%s\n%s\n",
provider.WorkingDir(), provider.WorkingDir(),
)
loggedContent, err := ioutil.ReadFile(provider.LogFile())
So(err, ShouldBeNil)
So(string(loggedContent), ShouldEqual, expectedOutput)
job.ctrlChan <- jobDisable
<-job.disabled
})
})
})
}
func TestConcurrentMirrorJobs(t *testing.T) {
InitLogger(true, true, false)
Convey("Concurrent MirrorJobs should work", t, func(ctx C) {
tmpDir, err := ioutil.TempDir("", "tunasync")
defer os.RemoveAll(tmpDir)
So(err, ShouldBeNil)
const CONCURRENT = 5
var providers [CONCURRENT]*cmdProvider
var jobs [CONCURRENT]*mirrorJob
for i := 0; i < CONCURRENT; i++ {
c := cmdConfig{
name: fmt.Sprintf("job-%d", i),
upstreamURL: "http://mirrors.tuna.moe/",
command: "sleep 2",
workingDir: tmpDir,
logDir: tmpDir,
logFile: "/dev/null",
interval: 10 * time.Second,
}
var err error
providers[i], err = newCmdProvider(c)
So(err, ShouldBeNil)
jobs[i] = newMirrorJob(providers[i])
}
managerChan := make(chan jobMessage, 10)
semaphore := make(chan empty, CONCURRENT-2)
countingJobs := func(managerChan chan jobMessage, totalJobs, concurrentCheck int) (peakConcurrent, counterFailed int) {
counterEnded := 0
counterRunning := 0
peakConcurrent = 0
counterFailed = 0
for counterEnded < totalJobs {
msg := <-managerChan
switch msg.status {
case PreSyncing:
counterRunning++
case Syncing:
case Failed:
counterFailed++
fallthrough
case Success:
counterEnded++
counterRunning--
default:
So(0, ShouldEqual, 1)
}
// Test if semaphore works
So(counterRunning, ShouldBeLessThanOrEqualTo, concurrentCheck)
if counterRunning > peakConcurrent {
peakConcurrent = counterRunning
}
}
// select {
// case msg := <-managerChan:
// logger.Errorf("extra message received: %v", msg)
// So(0, ShouldEqual, 1)
// case <-time.After(2 * time.Second):
// }
return
}
Convey("When we run them all", func(ctx C) {
for _, job := range jobs {
go job.Run(managerChan, semaphore)
job.ctrlChan <- jobStart
}
peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT, CONCURRENT-2)
So(peakConcurrent, ShouldEqual, CONCURRENT-2)
So(counterFailed, ShouldEqual, 0)
for _, job := range jobs {
job.ctrlChan <- jobDisable
<-job.disabled
}
})
Convey("If we cancel one job", func(ctx C) {
for _, job := range jobs {
go job.Run(managerChan, semaphore)
job.ctrlChan <- jobRestart
time.Sleep(200 * time.Millisecond)
}
// Cancel the one waiting for semaphore
jobs[len(jobs)-1].ctrlChan <- jobStop
peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT-1, CONCURRENT-2)
So(peakConcurrent, ShouldEqual, CONCURRENT-2)
So(counterFailed, ShouldEqual, 0)
for _, job := range jobs {
job.ctrlChan <- jobDisable
<-job.disabled
}
})
Convey("If we override the concurrent limit", func(ctx C) {
for _, job := range jobs {
go job.Run(managerChan, semaphore)
job.ctrlChan <- jobStart
time.Sleep(200 * time.Millisecond)
}
jobs[len(jobs)-1].ctrlChan <- jobForceStart
jobs[len(jobs)-2].ctrlChan <- jobForceStart
peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT, CONCURRENT)
So(peakConcurrent, ShouldEqual, CONCURRENT)
So(counterFailed, ShouldEqual, 0)
time.Sleep(1 * time.Second)
// fmt.Println("Restart them")
for _, job := range jobs {
job.ctrlChan <- jobStart
}
peakConcurrent, counterFailed = countingJobs(managerChan, CONCURRENT, CONCURRENT-2)
So(peakConcurrent, ShouldEqual, CONCURRENT-2)
So(counterFailed, ShouldEqual, 0)
for _, job := range jobs {
job.ctrlChan <- jobDisable
<-job.disabled
}
})
})
}

View File

@@ -36,6 +36,10 @@ type mirrorProvider interface {
IsRunning() bool
// Cgroup
Cgroup() *cgroupHook
// ZFS
ZFS() *zfsHook
// Docker
Docker() *dockerHook
AddHook(hook jobHook)
Hooks() []jobHook
@@ -126,6 +130,7 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
logDir: logDir,
logFile: filepath.Join(logDir, "latest.log"),
useIPv6: mirror.UseIPv6,
useIPv4: mirror.UseIPv4,
interval: time.Duration(mirror.Interval) * time.Minute,
}
p, err := newRsyncProvider(rc)
@@ -162,10 +167,22 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
// Add Logging Hook
provider.AddHook(newLogLimiter(provider))
// Add Cgroup Hook
if cfg.Cgroup.Enable {
// Add ZFS Hook
if cfg.ZFS.Enable {
provider.AddHook(newZfsHook(provider, cfg.ZFS.Zpool))
}
// Add Docker Hook
if cfg.Docker.Enable && len(mirror.DockerImage) > 0 {
provider.AddHook(newDockerHook(provider, cfg.Docker, mirror))
} else if cfg.Cgroup.Enable {
// Add Cgroup Hook
provider.AddHook(
newCgroupHook(provider, cfg.Cgroup.BasePath, cfg.Cgroup.Group),
newCgroupHook(
provider, cfg.Cgroup.BasePath, cfg.Cgroup.Group,
cfg.Cgroup.Subsystem, mirror.MemoryLimit,
),
)
}

View File

@@ -79,11 +79,12 @@ exit 0
err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
So(err, ShouldBeNil)
targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir())
expectedOutput := fmt.Sprintf(
"syncing to %s\n"+
"%s\n"+
"Done\n",
provider.WorkingDir(),
targetDir,
fmt.Sprintf(
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
"--delete --delete-after --delay-updates --safe-links "+
@@ -144,11 +145,12 @@ exit 0
err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
So(err, ShouldBeNil)
targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir())
expectedOutput := fmt.Sprintf(
"syncing to %s\n"+
"%s\n"+
"Done\n",
provider.WorkingDir(),
targetDir,
fmt.Sprintf(
"%s %s -aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
"--delete --delete-after --delay-updates --safe-links "+
@@ -260,6 +262,40 @@ sleep 5
})
})
Convey("Command Provider without log file should work", t, func(ctx C) {
tmpDir, err := ioutil.TempDir("", "tunasync")
defer os.RemoveAll(tmpDir)
So(err, ShouldBeNil)
c := cmdConfig{
name: "run-ls",
upstreamURL: "http://mirrors.tuna.moe/",
command: "ls",
workingDir: tmpDir,
logDir: tmpDir,
logFile: "/dev/null",
interval: 600 * time.Second,
}
provider, err := newCmdProvider(c)
So(err, ShouldBeNil)
So(provider.IsMaster(), ShouldEqual, false)
So(provider.ZFS(), ShouldBeNil)
So(provider.Type(), ShouldEqual, provCommand)
So(provider.Name(), ShouldEqual, c.name)
So(provider.WorkingDir(), ShouldEqual, c.workingDir)
So(provider.LogDir(), ShouldEqual, c.logDir)
So(provider.LogFile(), ShouldEqual, c.logFile)
So(provider.Interval(), ShouldEqual, c.interval)
Convey("Run the command", func() {
err = provider.Run()
So(err, ShouldBeNil)
})
})
}
func TestTwoStageRsyncProvider(t *testing.T) {
@@ -280,6 +316,8 @@ func TestTwoStageRsyncProvider(t *testing.T) {
logFile: tmpFile,
useIPv6: true,
excludeFile: tmpFile,
username: "hello",
password: "world",
}
provider, err := newTwoStageRsyncProvider(c)
@@ -306,6 +344,7 @@ exit 0
err = provider.Run()
So(err, ShouldBeNil)
targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir())
expectedOutput := fmt.Sprintf(
"syncing to %s\n"+
"%s\n"+
@@ -313,14 +352,14 @@ exit 0
"syncing to %s\n"+
"%s\n"+
"Done\n",
provider.WorkingDir(),
targetDir,
fmt.Sprintf(
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+
"--timeout=120 --contimeout=120 --exclude dists/ -6 "+
"--exclude-from %s %s %s",
provider.excludeFile, provider.upstreamURL, provider.WorkingDir(),
),
provider.WorkingDir(),
targetDir,
fmt.Sprintf(
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
"--delete --delete-after --delay-updates --safe-links "+

View File

@@ -11,7 +11,7 @@ type rsyncConfig struct {
rsyncCmd string
upstreamURL, username, password, excludeFile string
workingDir, logDir, logFile string
useIPv6 bool
useIPv6, useIPv4 bool
interval time.Duration
}
@@ -49,6 +49,8 @@ func newRsyncProvider(c rsyncConfig) (*rsyncProvider, error) {
if c.useIPv6 {
options = append(options, "-6")
} else if c.useIPv4 {
options = append(options, "-4")
}
if c.excludeFile != "" {
@@ -79,6 +81,12 @@ func (p *rsyncProvider) Run() error {
}
func (p *rsyncProvider) Start() error {
p.Lock()
defer p.Unlock()
if p.IsRunning() {
return errors.New("provider is currently running")
}
env := map[string]string{}
if p.username != "" {
@@ -92,7 +100,7 @@ func (p *rsyncProvider) Start() error {
command = append(command, p.upstreamURL, p.WorkingDir())
p.cmd = newCmdJob(p, command, p.WorkingDir(), env)
if err := p.prepareLogFile(); err != nil {
if err := p.prepareLogFile(false); err != nil {
return err
}

View File

@@ -2,6 +2,7 @@ package worker
import (
"errors"
"fmt"
"os"
"os/exec"
"strings"
@@ -9,6 +10,7 @@ import (
"syscall"
"time"
"github.com/codeskyblue/go-sh"
"golang.org/x/sys/unix"
)
@@ -31,11 +33,44 @@ type cmdJob struct {
func newCmdJob(provider mirrorProvider, cmdAndArgs []string, workingDir string, env map[string]string) *cmdJob {
var cmd *exec.Cmd
if provider.Cgroup() != nil {
if d := provider.Docker(); d != nil {
c := "docker"
args := []string{
"run", "--rm",
"-a", "STDOUT", "-a", "STDERR",
"--name", d.Name(),
"-w", workingDir,
}
// specify user
args = append(
args, "-u",
fmt.Sprintf("%d:%d", os.Getuid(), os.Getgid()),
)
// add volumes
for _, vol := range d.Volumes() {
logger.Debugf("volume: %s", vol)
args = append(args, "-v", vol)
}
// set env
for k, v := range env {
kv := fmt.Sprintf("%s=%s", k, v)
args = append(args, "-e", kv)
}
// apply options
args = append(args, d.options...)
// apply image and command
args = append(args, d.image)
// apply command
args = append(args, cmdAndArgs...)
cmd = exec.Command(c, args...)
} else if provider.Cgroup() != nil {
c := "cgexec"
args := []string{"-g", provider.Cgroup().Cgroup()}
args = append(args, cmdAndArgs...)
cmd = exec.Command(c, args...)
} else {
if len(cmdAndArgs) == 1 {
cmd = exec.Command(cmdAndArgs[0])
@@ -48,25 +83,28 @@ func newCmdJob(provider mirrorProvider, cmdAndArgs []string, workingDir string,
}
}
logger.Debugf("Executing command %s at %s", cmdAndArgs[0], workingDir)
if _, err := os.Stat(workingDir); os.IsNotExist(err) {
logger.Debugf("Making dir %s", workingDir)
if err = os.MkdirAll(workingDir, 0755); err != nil {
logger.Errorf("Error making dir %s", workingDir)
if provider.Docker() == nil {
logger.Debugf("Executing command %s at %s", cmdAndArgs[0], workingDir)
if _, err := os.Stat(workingDir); os.IsNotExist(err) {
logger.Debugf("Making dir %s", workingDir)
if err = os.MkdirAll(workingDir, 0755); err != nil {
logger.Errorf("Error making dir %s: %s", workingDir, err.Error())
}
}
cmd.Dir = workingDir
cmd.Env = newEnviron(env, true)
}
cmd.Dir = workingDir
cmd.Env = newEnviron(env, true)
return &cmdJob{
cmd: cmd,
workingDir: workingDir,
env: env,
provider: provider,
}
}
func (c *cmdJob) Start() error {
// logger.Debugf("Command start: %v", c.cmd.Args)
c.finished = make(chan empty, 1)
return c.cmd.Start()
}
@@ -80,6 +118,9 @@ func (c *cmdJob) Wait() error {
return c.retErr
default:
err := c.cmd.Wait()
if c.cmd.Stdout != nil {
c.cmd.Stdout.(*os.File).Close()
}
c.retErr = err
close(c.finished)
return err
@@ -95,6 +136,14 @@ func (c *cmdJob) Terminate() error {
if c.cmd == nil || c.cmd.Process == nil {
return errProcessNotStarted
}
if d := c.provider.Docker(); d != nil {
sh.Command(
"docker", "stop", "-t", "2", d.Name(),
).Run()
return nil
}
err := unix.Kill(c.cmd.Process.Pid, syscall.SIGTERM)
if err != nil {
return err
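For a job named "ubuntu", the Docker branch above assembles a command of roughly this shape (paths, IDs, and image are illustrative):

docker run --rm -a STDOUT -a STDERR --name tunasync-job-ubuntu \
    -w /srv/ubuntu -u 1000:1000 \
    -v /srv/ubuntu:/srv/ubuntu -e TUNASYNC_WORKING_DIR=/srv/ubuntu \
    alpine:3.6 /sync.sh

Terminate, correspondingly, goes through "docker stop -t 2 tunasync-job-ubuntu" instead of signalling a local process.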

View File

@@ -108,7 +108,12 @@ func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) {
}
func (p *twoStageRsyncProvider) Run() error {
defer p.Wait()
p.Lock()
defer p.Unlock()
if p.IsRunning() {
return errors.New("provider is currently running")
}
env := map[string]string{}
if p.username != "" {
@@ -129,7 +134,7 @@ func (p *twoStageRsyncProvider) Run() error {
command = append(command, p.upstreamURL, p.WorkingDir())
p.cmd = newCmdJob(p, command, p.WorkingDir(), env)
if err := p.prepareLogFile(); err != nil {
if err := p.prepareLogFile(stage > 1); err != nil {
return err
}
@@ -137,9 +142,11 @@ func (p *twoStageRsyncProvider) Run() error {
return err
}
p.isRunning.Store(true)
logger.Debugf("set isRunning to true: %s", p.Name())
err = p.cmd.Wait()
p.isRunning.Store(false)
p.Unlock()
err = p.Wait()
p.Lock()
if err != nil {
return err
}
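The Unlock/Wait/Lock sequence here is deliberate: Wait's deferred cleanup acquires the provider lock itself, so holding the lock across the call would self-deadlock. Annotated as a reading aid:

p.Unlock()     // release: Wait() re-takes the lock in its deferred cleanup
err = p.Wait() // blocks until the current stage's rsync exits
p.Lock()       // re-acquire before moving on to the next stage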

View File

@@ -55,9 +55,6 @@ func GetTUNASyncWorker(cfg *Config) *Worker {
w.httpClient = httpClient
}
if cfg.Cgroup.Enable {
initCgroup(cfg.Cgroup.BasePath)
}
w.initJobs()
w.makeHTTPServer()
tunasyncWorker = w
@@ -222,7 +219,11 @@ func (w *Worker) makeHTTPServer() {
}
switch cmd.Cmd {
case CmdStart:
job.ctrlChan <- jobStart
if cmd.Options["force"] {
job.ctrlChan <- jobForceStart
} else {
job.ctrlChan <- jobStart
}
case CmdRestart:
job.ctrlChan <- jobRestart
case CmdStop:
@@ -389,28 +390,21 @@ func (w *Worker) URL() string {
}
func (w *Worker) registorWorker() {
url := fmt.Sprintf(
"%s/workers",
w.cfg.Manager.APIBase,
)
msg := WorkerStatus{
ID: w.Name(),
URL: w.URL(),
}
if _, err := PostJSON(url, msg, w.httpClient); err != nil {
logger.Errorf("Failed to register worker")
for _, root := range w.cfg.Manager.APIBaseList() {
url := fmt.Sprintf("%s/workers", root)
logger.Debugf("register on manager url: %s", url)
if _, err := PostJSON(url, msg, w.httpClient); err != nil {
logger.Errorf("Failed to register worker")
}
}
}
func (w *Worker) updateStatus(job *mirrorJob, jobMsg jobMessage) {
url := fmt.Sprintf(
"%s/workers/%s/jobs/%s",
w.cfg.Manager.APIBase,
w.Name(),
jobMsg.name,
)
p := job.provider
smsg := MirrorStatus{
Name: jobMsg.name,
@@ -422,19 +416,22 @@ func (w *Worker) updateStatus(job *mirrorJob, jobMsg jobMessage) {
ErrorMsg: jobMsg.msg,
}
if _, err := PostJSON(url, smsg, w.httpClient); err != nil {
logger.Errorf("Failed to update mirror(%s) status: %s", jobMsg.name, err.Error())
for _, root := range w.cfg.Manager.APIBaseList() {
url := fmt.Sprintf(
"%s/workers/%s/jobs/%s", root, w.Name(), jobMsg.name,
)
logger.Debugf("reporting on manager url: %s", url)
if _, err := PostJSON(url, smsg, w.httpClient); err != nil {
logger.Errorf("Failed to update mirror(%s) status: %s", jobMsg.name, err.Error())
}
}
}
func (w *Worker) fetchJobStatus() []MirrorStatus {
var mirrorList []MirrorStatus
apiBase := w.cfg.Manager.APIBaseList()[0]
url := fmt.Sprintf(
"%s/workers/%s/jobs",
w.cfg.Manager.APIBase,
w.Name(),
)
url := fmt.Sprintf("%s/workers/%s/jobs", apiBase, w.Name())
if _, err := GetJSON(url, &mirrorList, w.httpClient); err != nil {
logger.Errorf("Failed to fetch job status: %s", err.Error())

45
worker/zfs_hook.go Normal file
View File

@@ -0,0 +1,45 @@
package worker
import (
"fmt"
"os"
"strings"
"github.com/codeskyblue/go-sh"
)
type zfsHook struct {
emptyHook
provider mirrorProvider
zpool string
}
func newZfsHook(provider mirrorProvider, zpool string) *zfsHook {
return &zfsHook{
provider: provider,
zpool: zpool,
}
}
// create zfs dataset for a new mirror
func (z *zfsHook) preJob() error {
workingDir := z.provider.WorkingDir()
if _, err := os.Stat(workingDir); os.IsNotExist(err) {
// sudo zfs create $zfsDataset
// sudo zfs set mountpoint=${absPath} ${zfsDataset}
zfsDataset := fmt.Sprintf("%s/%s", z.zpool, z.provider.Name())
// Unknown issue of ZFS:
// dataset name should not contain upper case letters
zfsDataset = strings.ToLower(zfsDataset)
logger.Infof("Creating ZFS dataset %s", zfsDataset)
if err := sh.Command("sudo", "zfs", "create", zfsDataset).Run(); err != nil {
return err
}
logger.Infof("Mount ZFS dataset %s to %s", zfsDataset, workingDir)
if err := sh.Command("sudo", "zfs", "set", "mountpoint="+workingDir, zfsDataset).Run(); err != nil {
return err
}
}
return nil
}
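Spelling out the commented commands for zpool = "tank" and a mirror named "Ubuntu" (note the forced lower-casing of the dataset name), the hook would run approximately:

sudo zfs create tank/ubuntu
sudo zfs set mountpoint=/srv/ubuntu tank/ubuntu

where /srv/ubuntu stands for the mirror's working directory.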