From 7601e5793fec37a1a9479cde735ac1ce11b837a4 Mon Sep 17 00:00:00 2001 From: bigeagle Date: Sat, 10 Dec 2016 04:02:44 +0800 Subject: [PATCH] fix(worker): improved cgroup creation --- .travis.yml | 4 ++-- worker/cgroup.go | 42 ++++++++++++++++++++---------------------- worker/cgroup_test.go | 27 +++++++++++++++++---------- worker/config.go | 9 ++++++--- worker/provider.go | 5 ++++- worker/worker.go | 3 --- 6 files changed, 49 insertions(+), 41 deletions(-) diff --git a/.travis.yml b/.travis.yml index 4c93f44..487276d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -17,8 +17,8 @@ services: - docker before_script: - - sudo mount -t memory -o memory memory /sys/fs/cgroup/memory - - mount + - lssubsys -am + - sudo cgcreate -a $USER -t $USER -g cpu:tunasync - sudo cgcreate -a $USER -t $USER -g memory:tunasync - docker pull alpine diff --git a/worker/cgroup.go b/worker/cgroup.go index c0a1478..f25cdd3 100644 --- a/worker/cgroup.go +++ b/worker/cgroup.go @@ -15,35 +15,31 @@ import ( "github.com/codeskyblue/go-sh" ) -var cgSubsystem = "cpuset" - type cgroupHook struct { emptyHook provider mirrorProvider basePath string baseGroup string created bool + subsystem string + memLimit string } -func initCgroup(basePath string) { - if _, err := os.Stat(filepath.Join(basePath, "memory")); err == nil { - cgSubsystem = "memory" - return - } - logger.Warning("Memory subsystem of cgroup not enabled, fallback to cpu") -} - -func newCgroupHook(p mirrorProvider, basePath, baseGroup string) *cgroupHook { +func newCgroupHook(p mirrorProvider, basePath, baseGroup, subsystem, memLimit string) *cgroupHook { if basePath == "" { basePath = "/sys/fs/cgroup" } if baseGroup == "" { baseGroup = "tunasync" } + if subsystem == "" { + subsystem = "cpu" + } return &cgroupHook{ provider: p, basePath: basePath, baseGroup: baseGroup, + subsystem: subsystem, } } @@ -52,15 +48,17 @@ func (c *cgroupHook) preExec() error { if err := sh.Command("cgcreate", "-g", c.Cgroup()).Run(); err != nil { return err } - // if cgSubsystem != "memory" { - // return nil - // } - // if c.provider.Type() == provRsync || c.provider.Type() == provTwoStageRsync { - // gname := fmt.Sprintf("%s/%s", c.baseGroup, c.provider.Name()) - // return sh.Command( - // "cgset", "-r", "memory.limit_in_bytes=512M", gname, - // ).Run() - // } + if c.subsystem != "memory" { + return nil + } + if c.memLimit != "" { + gname := fmt.Sprintf("%s/%s", c.baseGroup, c.provider.Name()) + return sh.Command( + "cgset", "-r", + fmt.Sprintf("memory.limit_in_bytes=%s", c.memLimit), + gname, + ).Run() + } return nil } @@ -76,7 +74,7 @@ func (c *cgroupHook) postExec() error { func (c *cgroupHook) Cgroup() string { name := c.provider.Name() - return fmt.Sprintf("%s:%s/%s", cgSubsystem, c.baseGroup, name) + return fmt.Sprintf("%s:%s/%s", c.subsystem, c.baseGroup, name) } func (c *cgroupHook) killAll() error { @@ -87,7 +85,7 @@ func (c *cgroupHook) killAll() error { readTaskList := func() ([]int, error) { taskList := []int{} - taskFile, err := os.Open(filepath.Join(c.basePath, cgSubsystem, c.baseGroup, name, "tasks")) + taskFile, err := os.Open(filepath.Join(c.basePath, c.subsystem, c.baseGroup, name, "tasks")) if err != nil { return taskList, err } diff --git a/worker/cgroup_test.go b/worker/cgroup_test.go index 6a53441..4b0366a 100644 --- a/worker/cgroup_test.go +++ b/worker/cgroup_test.go @@ -4,6 +4,7 @@ import ( "io/ioutil" "os" "path/filepath" + "strconv" "strings" "testing" "time" @@ -71,11 +72,14 @@ sleep 30 provider, err := newCmdProvider(c) So(err, ShouldBeNil) - initCgroup("/sys/fs/cgroup") - cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync") + cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync", "cpu", "") provider.AddHook(cg) err = cg.preExec() + if err != nil { + logger.Errorf("Failed to create cgroup") + return + } So(err, ShouldBeNil) go func() { @@ -128,16 +132,19 @@ sleep 30 provider, err := newRsyncProvider(c) So(err, ShouldBeNil) - initCgroup("/sys/fs/cgroup") - cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync") + cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync", "cpu", "512M") provider.AddHook(cg) - cg.preExec() - //if cgSubsystem == "memory" { - // memoLimit, err := ioutil.ReadFile(filepath.Join(cg.basePath, "memory", cg.baseGroup, provider.Name(), "memory.limit_in_bytes")) - // So(err, ShouldBeNil) - // So(strings.Trim(string(memoLimit), "\n"), ShouldEqual, strconv.Itoa(512*1024*1024)) - //} + err = cg.preExec() + if err != nil { + logger.Errorf("Failed to create cgroup") + return + } + if cg.subsystem == "memory" { + memoLimit, err := ioutil.ReadFile(filepath.Join(cg.basePath, "memory", cg.baseGroup, provider.Name(), "memory.limit_in_bytes")) + So(err, ShouldBeNil) + So(strings.Trim(string(memoLimit), "\n"), ShouldEqual, strconv.Itoa(512*1024*1024)) + } cg.postExec() }) } diff --git a/worker/config.go b/worker/config.go index 93ee27c..138c9b0 100644 --- a/worker/config.go +++ b/worker/config.go @@ -70,9 +70,10 @@ type serverConfig struct { } type cgroupConfig struct { - Enable bool `toml:"enable"` - BasePath string `toml:"base_path"` - Group string `toml:"group"` + Enable bool `toml:"enable"` + BasePath string `toml:"base_path"` + Group string `toml:"group"` + Subsystem string `toml:"subsystem"` } type dockerConfig struct { @@ -119,6 +120,8 @@ type mirrorConfig struct { Password string `toml:"password"` Stage1Profile string `toml:"stage1_profile"` + MemoryLimit string `toml:"memory_limit"` + DockerImage string `toml:"docker_image"` DockerVolumes []string `toml:"docker_volumes"` DockerOptions []string `toml:"docker_options"` diff --git a/worker/provider.go b/worker/provider.go index 94d4e20..9662dcd 100644 --- a/worker/provider.go +++ b/worker/provider.go @@ -178,7 +178,10 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider { } else if cfg.Cgroup.Enable { // Add Cgroup Hook provider.AddHook( - newCgroupHook(provider, cfg.Cgroup.BasePath, cfg.Cgroup.Group), + newCgroupHook( + provider, cfg.Cgroup.BasePath, cfg.Cgroup.Group, + cfg.Cgroup.Subsystem, mirror.MemoryLimit, + ), ) } diff --git a/worker/worker.go b/worker/worker.go index 7c3db9f..5bfb6f0 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -55,9 +55,6 @@ func GetTUNASyncWorker(cfg *Config) *Worker { w.httpClient = httpClient } - if cfg.Cgroup.Enable { - initCgroup(cfg.Cgroup.BasePath) - } w.initJobs() w.makeHTTPServer() tunasyncWorker = w