1
0
镜像自地址 https://github.com/tuna/tunasync.git 已同步 2025-12-06 06:26:46 +00:00

Merge branch 'cgroupv2', and bump version to v0.8.0

Signed-off-by: Miao Wang <shankerwangmiao@gmail.com>
这个提交包含在:
Miao Wang
2021-09-02 22:21:52 +08:00
当前提交 c07aaffe65
共有 18 个文件被更改,包括 1033 次插入104 次删除

查看文件

@@ -1,64 +1,296 @@
package worker
import (
"bufio"
"errors"
"fmt"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strconv"
"syscall"
"time"
"golang.org/x/sys/unix"
"github.com/codeskyblue/go-sh"
"github.com/moby/moby/pkg/reexec"
cgv1 "github.com/containerd/cgroups"
cgv2 "github.com/containerd/cgroups/v2"
contspecs "github.com/opencontainers/runtime-spec/specs-go"
)
type cgroupHook struct {
emptyHook
basePath string
baseGroup string
created bool
subsystem string
memLimit string
cgCfg cgroupConfig
memLimit MemBytes
cgMgrV1 cgv1.Cgroup
cgMgrV2 *cgv2.Manager
}
func newCgroupHook(p mirrorProvider, basePath, baseGroup, subsystem, memLimit string) *cgroupHook {
if basePath == "" {
basePath = "/sys/fs/cgroup"
type execCmd string
const (
cmdCont execCmd = "cont"
cmdAbrt execCmd = "abrt"
)
func init () {
reexec.Register("tunasync-exec", waitExec)
}
func waitExec () {
binary, err := exec.LookPath(os.Args[1])
if err != nil {
panic(err)
}
if baseGroup == "" {
baseGroup = "tunasync"
pipe := os.NewFile(3, "pipe")
if pipe != nil {
if _, err := pipe.Stat(); err == nil {
cmdBytes, err := ioutil.ReadAll(pipe)
if err != nil {
panic(err)
}
if err := pipe.Close(); err != nil {
}
cmd := execCmd(string(cmdBytes))
switch cmd {
case cmdAbrt:
fallthrough
default:
panic("Exited on request")
case cmdCont:
}
}
}
if subsystem == "" {
subsystem = "cpu"
args := os.Args[1:]
env := os.Environ()
if err := syscall.Exec(binary, args, env); err != nil {
panic(err)
}
panic("Exec failed.")
}
func initCgroup(cfg *cgroupConfig) (error) {
logger.Debugf("Initializing cgroup")
baseGroup := cfg.Group
//subsystem := cfg.Subsystem
// If baseGroup is empty, it implies using the cgroup of the current process
// otherwise, it refers to a absolute group path
if baseGroup != "" {
baseGroup = filepath.Join("/", baseGroup)
}
cfg.isUnified = cgv1.Mode() == cgv1.Unified
if cfg.isUnified {
logger.Debugf("Cgroup V2 detected")
g := baseGroup
if g == "" {
logger.Debugf("Detecting my cgroup path")
var err error
if g, err = cgv2.NestedGroupPath(""); err != nil {
return err
}
}
logger.Infof("Using cgroup path: %s", g)
var err error
if cfg.cgMgrV2, err = cgv2.LoadManager("/sys/fs/cgroup", g); err != nil {
return err
}
if baseGroup == "" {
logger.Debugf("Creating a sub group and move all processes into it")
wkrMgr, err := cfg.cgMgrV2.NewChild("__worker", nil);
if err != nil {
return err
}
for {
logger.Debugf("Reading pids")
procs, err := cfg.cgMgrV2.Procs(false)
if err != nil {
logger.Errorf("Cannot read pids in that group")
return err
}
if len(procs) == 0 {
break
}
for _, p := range(procs) {
if err := wkrMgr.AddProc(p); err != nil{
if errors.Is(err, syscall.ESRCH) {
logger.Debugf("Write pid %d to sub group failed: process vanished, ignoring")
} else {
return err
}
}
}
}
} else {
logger.Debugf("Trying to create a sub group in that group")
testMgr, err := cfg.cgMgrV2.NewChild("__test", nil);
if err != nil {
logger.Errorf("Cannot create a sub group in the cgroup")
return err
}
if err := testMgr.Delete(); err != nil {
return err
}
procs, err := cfg.cgMgrV2.Procs(false)
if err != nil {
logger.Errorf("Cannot read pids in that group")
return err
}
if len(procs) != 0 {
return fmt.Errorf("There are remaining processes in cgroup %s", baseGroup)
}
}
} else {
logger.Debugf("Cgroup V1 detected")
var pather cgv1.Path
if baseGroup != "" {
pather = cgv1.StaticPath(baseGroup)
} else {
pather = (func(p cgv1.Path) (cgv1.Path){
return func(subsys cgv1.Name) (string, error){
path, err := p(subsys);
if err != nil {
return "", err
}
if path == "/" {
return "", cgv1.ErrControllerNotActive
}
return path, err
}
})(cgv1.NestedPath(""))
}
logger.Infof("Loading cgroup")
var err error
if cfg.cgMgrV1, err = cgv1.Load(cgv1.V1, pather, func(cfg *cgv1.InitConfig) error{
cfg.InitCheck = cgv1.AllowAny
return nil
}); err != nil {
return err
}
logger.Debugf("Available subsystems:")
for _, subsys := range(cfg.cgMgrV1.Subsystems()) {
p, err := pather(subsys.Name())
if err != nil {
return err
}
logger.Debugf("%s: %s", subsys.Name(), p)
}
if baseGroup == "" {
logger.Debugf("Creating a sub group and move all processes into it")
wkrMgr, err := cfg.cgMgrV1.New("__worker", &contspecs.LinuxResources{});
if err != nil {
return err
}
for _, subsys := range(cfg.cgMgrV1.Subsystems()) {
logger.Debugf("Reading pids for subsystem %s", subsys.Name())
for {
procs, err := cfg.cgMgrV1.Processes(subsys.Name(), false)
if err != nil {
p, err := pather(subsys.Name())
if err != nil {
return err
}
logger.Errorf("Cannot read pids in group %s of subsystem %s", p, subsys.Name())
return err
}
if len(procs) == 0 {
break
}
for _, proc := range(procs) {
if err := wkrMgr.Add(proc); err != nil {
if errors.Is(err, syscall.ESRCH) {
logger.Debugf("Write pid %d to sub group failed: process vanished, ignoring")
} else {
return err
}
}
}
}
}
} else {
logger.Debugf("Trying to create a sub group in that group")
testMgr, err := cfg.cgMgrV1.New("__test", &contspecs.LinuxResources{});
if err != nil {
logger.Errorf("Cannot create a sub group in the cgroup")
return err
}
if err := testMgr.Delete(); err != nil {
return err
}
for _, subsys := range(cfg.cgMgrV1.Subsystems()) {
logger.Debugf("Reading pids for subsystem %s", subsys.Name())
procs, err := cfg.cgMgrV1.Processes(subsys.Name(), false)
if err != nil {
p, err := pather(subsys.Name())
if err != nil {
return err
}
logger.Errorf("Cannot read pids in group %s of subsystem %s", p, subsys.Name())
return err
}
if len(procs) != 0 {
p, err := pather(subsys.Name())
if err != nil {
return err
}
return fmt.Errorf("There are remaining processes in cgroup %s of subsystem %s", p, subsys.Name())
}
}
}
}
return nil
}
func newCgroupHook(p mirrorProvider, cfg cgroupConfig, memLimit MemBytes) *cgroupHook {
return &cgroupHook{
emptyHook: emptyHook{
provider: p,
},
basePath: basePath,
baseGroup: baseGroup,
subsystem: subsystem,
cgCfg: cfg,
memLimit: memLimit,
}
}
func (c *cgroupHook) preExec() error {
c.created = true
if err := sh.Command("cgcreate", "-g", c.Cgroup()).Run(); err != nil {
return err
}
if c.subsystem != "memory" {
return nil
}
if c.memLimit != "" {
gname := fmt.Sprintf("%s/%s", c.baseGroup, c.provider.Name())
return sh.Command(
"cgset", "-r",
fmt.Sprintf("memory.limit_in_bytes=%s", c.memLimit),
gname,
).Run()
if c.cgCfg.isUnified {
logger.Debugf("Creating v2 cgroup for task %s", c.provider.Name())
var resSet *cgv2.Resources
if c.memLimit != 0 {
resSet = &cgv2.Resources {
Memory: &cgv2.Memory{
Max: func(i int64) *int64 { return &i }(c.memLimit.Value()),
},
}
}
subMgr, err := c.cgCfg.cgMgrV2.NewChild(c.provider.Name(), resSet)
if err != nil {
logger.Errorf("Failed to create cgroup for task %s: %s", c.provider.Name(), err.Error())
return err
}
c.cgMgrV2 = subMgr
} else {
logger.Debugf("Creating v1 cgroup for task %s", c.provider.Name())
var resSet contspecs.LinuxResources
if c.memLimit != 0 {
resSet = contspecs.LinuxResources {
Memory: &contspecs.LinuxMemory{
Limit: func(i int64) *int64 { return &i }(c.memLimit.Value()),
},
}
}
subMgr, err := c.cgCfg.cgMgrV1.New(c.provider.Name(), &resSet)
if err != nil {
logger.Errorf("Failed to create cgroup for task %s: %s", c.provider.Name(), err.Error())
return err
}
c.cgMgrV1 = subMgr
}
return nil
}
@@ -69,36 +301,59 @@ func (c *cgroupHook) postExec() error {
logger.Errorf("Error killing tasks: %s", err.Error())
}
c.created = false
return sh.Command("cgdelete", c.Cgroup()).Run()
}
func (c *cgroupHook) Cgroup() string {
name := c.provider.Name()
return fmt.Sprintf("%s:%s/%s", c.subsystem, c.baseGroup, name)
if c.cgCfg.isUnified {
logger.Debugf("Deleting v2 cgroup for task %s", c.provider.Name())
if err := c.cgMgrV2.Delete(); err != nil {
logger.Errorf("Failed to delete cgroup for task %s: %s", c.provider.Name(), err.Error())
return err
}
c.cgMgrV2 = nil
} else {
logger.Debugf("Deleting v1 cgroup for task %s", c.provider.Name())
if err := c.cgMgrV1.Delete(); err != nil {
logger.Errorf("Failed to delete cgroup for task %s: %s", c.provider.Name(), err.Error())
return err
}
c.cgMgrV1 = nil
}
return nil
}
func (c *cgroupHook) killAll() error {
if !c.created {
return nil
if c.cgCfg.isUnified {
if c.cgMgrV2 == nil {
return nil
}
} else {
if c.cgMgrV1 == nil {
return nil
}
}
name := c.provider.Name()
readTaskList := func() ([]int, error) {
taskList := []int{}
taskFile, err := os.Open(filepath.Join(c.basePath, c.subsystem, c.baseGroup, name, "tasks"))
if err != nil {
return taskList, err
}
defer taskFile.Close()
scanner := bufio.NewScanner(taskFile)
for scanner.Scan() {
pid, err := strconv.Atoi(scanner.Text())
if err != nil {
return taskList, err
if c.cgCfg.isUnified {
procs, err := c.cgMgrV2.Procs(false)
if (err != nil) {
return []int{}, err
}
for _, proc := range procs {
taskList = append(taskList, int(proc))
}
} else {
taskSet := make(map[int]struct{})
for _, subsys := range(c.cgMgrV1.Subsystems()) {
procs, err := c.cgMgrV1.Processes(subsys.Name(), false)
if err != nil {
return []int{}, err
}
for _, proc := range(procs) {
taskSet[proc.Pid] = struct{}{}
}
}
for proc := range(taskSet) {
taskList = append(taskList, proc)
}
taskList = append(taskList, pid)
}
return taskList, nil
}

查看文件

@@ -3,17 +3,101 @@ package worker
import (
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"testing"
"time"
"errors"
"syscall"
cgv1 "github.com/containerd/cgroups"
cgv2 "github.com/containerd/cgroups/v2"
units "github.com/docker/go-units"
"github.com/moby/moby/pkg/reexec"
. "github.com/smartystreets/goconvey/convey"
)
func init() {
_, testReexec := os.LookupEnv("TESTREEXEC")
if ! testReexec {
reexec.Init()
}
}
func TestReexec(t *testing.T) {
testCase, testReexec := os.LookupEnv("TESTREEXEC")
if ! testReexec {
return
}
for len(os.Args) > 1 {
thisArg := os.Args[1]
os.Args = append([]string{os.Args[0]}, os.Args[2:]...)
if thisArg == "--" {
break
}
}
switch testCase {
case "1":
Convey("Reexec should panic when command not found", t, func(ctx C){
So(func(){
reexec.Init()
}, ShouldPanicWith, exec.ErrNotFound)
})
case "2":
Convey("Reexec should run when fd 3 is not open", t, func(ctx C){
So((func() error{
pipe := os.NewFile(3, "pipe")
if pipe == nil {
return errors.New("pipe is nil")
} else {
_, err := pipe.Stat()
return err
}
})(), ShouldNotBeNil)
So(func(){
reexec.Init()
}, ShouldPanicWith, syscall.ENOEXEC)
})
case "3":
Convey("Reexec should fail when fd 3 is sent with abrt cmd", t, func(ctx C){
So(func(){
reexec.Init()
}, ShouldPanicWith, "Exited on request")
})
case "4":
Convey("Reexec should run when fd 3 is sent with cont cmd", t, func(ctx C){
So(func(){
reexec.Init()
}, ShouldPanicWith, syscall.ENOEXEC)
})
case "5":
Convey("Reexec should not be triggered when argv[0] is not reexec", t, func(ctx C){
So(func(){
reexec.Init()
}, ShouldNotPanic)
})
}
}
func TestCgroup(t *testing.T) {
Convey("Cgroup Should Work", t, func(ctx C) {
var cgcf *cgroupConfig
Convey("init cgroup", t, func(ctx C){
_, useCurrentCgroup := os.LookupEnv("USECURCGROUP")
cgcf = &cgroupConfig{BasePath: "/sys/fs/cgroup", Group: "tunasync", Subsystem: "cpu"}
if useCurrentCgroup {
cgcf.Group = ""
}
err := initCgroup(cgcf)
So(err, ShouldBeNil)
if cgcf.isUnified {
So(cgcf.cgMgrV2, ShouldNotBeNil)
} else {
So(cgcf.cgMgrV1, ShouldNotBeNil)
}
Convey("Cgroup Should Work", func(ctx C) {
tmpDir, err := ioutil.TempDir("", "tunasync")
defer os.RemoveAll(tmpDir)
So(err, ShouldBeNil)
@@ -45,13 +129,13 @@ redirect-std() {
close-fds() {
eval exec {3..255}\>\&-
}
# full daemonization of external command with setsid
daemonize() {
(
redirect-std
cd /
close-fds
redirect-std
cd /
close-fds
exec setsid "$@"
) &
}
@@ -72,14 +156,10 @@ sleep 30
provider, err := newCmdProvider(c)
So(err, ShouldBeNil)
cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync", "cpu", "")
cg := newCgroupHook(provider, *cgcf, 0)
provider.AddHook(cg)
err = cg.preExec()
if err != nil {
logger.Errorf("Failed to create cgroup")
return
}
So(err, ShouldBeNil)
go func() {
@@ -111,7 +191,7 @@ sleep 30
})
Convey("Rsync Memory Should Be Limited", t, func() {
Convey("Rsync Memory Should Be Limited", func() {
tmpDir, err := ioutil.TempDir("", "tunasync")
defer os.RemoveAll(tmpDir)
So(err, ShouldBeNil)
@@ -132,19 +212,112 @@ sleep 30
provider, err := newRsyncProvider(c)
So(err, ShouldBeNil)
cg := newCgroupHook(provider, "/sys/fs/cgroup", "tunasync", "cpu", "512M")
cg := newCgroupHook(provider, *cgcf, 512 * units.MiB)
provider.AddHook(cg)
err = cg.preExec()
if err != nil {
logger.Errorf("Failed to create cgroup")
return
}
if cg.subsystem == "memory" {
memoLimit, err := ioutil.ReadFile(filepath.Join(cg.basePath, "memory", cg.baseGroup, provider.Name(), "memory.limit_in_bytes"))
So(err, ShouldBeNil)
if cgcf.isUnified {
cgpath := filepath.Join(cgcf.BasePath, cgcf.Group, provider.Name())
if useCurrentCgroup {
group, err := cgv2.NestedGroupPath(filepath.Join("..", provider.Name()))
So(err, ShouldBeNil)
cgpath = filepath.Join(cgcf.BasePath, group)
}
memoLimit, err := ioutil.ReadFile(filepath.Join(cgpath, "memory.max"))
So(err, ShouldBeNil)
So(strings.Trim(string(memoLimit), "\n"), ShouldEqual, strconv.Itoa(512*1024*1024))
} else {
for _, subsys := range(cg.cgMgrV1.Subsystems()) {
if subsys.Name() == cgv1.Memory {
cgpath := filepath.Join(cgcf.Group, provider.Name())
if useCurrentCgroup {
p, err := cgv1.NestedPath(filepath.Join("..", provider.Name()))(cgv1.Memory)
So(err, ShouldBeNil)
cgpath = p
}
memoLimit, err := ioutil.ReadFile(filepath.Join(cgcf.BasePath, "memory", cgpath, "memory.limit_in_bytes"))
So(err, ShouldBeNil)
So(strings.Trim(string(memoLimit), "\n"), ShouldEqual, strconv.Itoa(512*1024*1024))
}
}
}
cg.postExec()
So(cg.cgMgrV1, ShouldBeNil)
})
Reset(func() {
if cgcf.isUnified {
if cgcf.Group == "" {
wkrg, err := cgv2.NestedGroupPath("");
So(err, ShouldBeNil)
wkrMgr, err := cgv2.LoadManager("/sys/fs/cgroup", wkrg);
allCtrls, err := wkrMgr.Controllers()
So(err, ShouldBeNil)
err = wkrMgr.ToggleControllers(allCtrls, cgv2.Disable)
So(err, ShouldBeNil)
origMgr := cgcf.cgMgrV2
for {
logger.Debugf("Restoring pids")
procs, err := wkrMgr.Procs(false)
So(err, ShouldBeNil)
if len(procs) == 0 {
break
}
for _, p := range(procs) {
if err := origMgr.AddProc(p); err != nil{
if errors.Is(err, syscall.ESRCH) {
logger.Debugf("Write pid %d to sub group failed: process vanished, ignoring")
} else {
So(err, ShouldBeNil)
}
}
}
}
err = wkrMgr.Delete()
So(err, ShouldBeNil)
}
} else {
if cgcf.Group == "" {
pather := (func(p cgv1.Path) (cgv1.Path){
return func(subsys cgv1.Name) (string, error){
path, err := p(subsys);
if err != nil {
return "", err
}
if path == "/" {
return "", cgv1.ErrControllerNotActive
}
return path, err
}
})(cgv1.NestedPath(""))
wkrMgr, err := cgv1.Load(cgv1.V1, pather, func(cfg *cgv1.InitConfig) error{
cfg.InitCheck = cgv1.AllowAny
return nil
})
So(err, ShouldBeNil)
origMgr := cgcf.cgMgrV1
for _, subsys := range(wkrMgr.Subsystems()){
for {
procs, err := wkrMgr.Processes(subsys.Name(), false)
So(err, ShouldBeNil)
if len(procs) == 0 {
break
}
for _, proc := range(procs) {
if err := origMgr.Add(proc); err != nil {
if errors.Is(err, syscall.ESRCH) {
logger.Debugf("Write pid %d to sub group failed: process vanished, ignoring")
} else {
So(err, ShouldBeNil)
}
}
}
}
}
err = wkrMgr.Delete()
So(err, ShouldBeNil)
}
}
})
})
}

查看文件

@@ -7,6 +7,9 @@ import (
"github.com/BurntSushi/toml"
"github.com/imdario/mergo"
units "github.com/docker/go-units"
cgv1 "github.com/containerd/cgroups"
cgv2 "github.com/containerd/cgroups/v2"
)
type providerEnum uint8
@@ -87,6 +90,9 @@ type cgroupConfig struct {
BasePath string `toml:"base_path"`
Group string `toml:"group"`
Subsystem string `toml:"subsystem"`
isUnified bool
cgMgrV1 cgv1.Cgroup
cgMgrV2 *cgv2.Manager
}
type dockerConfig struct {
@@ -113,6 +119,32 @@ type includedMirrorConfig struct {
Mirrors []mirrorConfig `toml:"mirrors"`
}
type MemBytes int64
// Set sets the value of the MemBytes by passing a string
func (m *MemBytes) Set(value string) error {
val, err := units.RAMInBytes(value)
*m = MemBytes(val)
return err
}
// Type returns the type
func (m *MemBytes) Type() string {
return "bytes"
}
// Value returns the value in int64
func (m *MemBytes) Value() int64 {
return int64(*m)
}
// UnmarshalJSON is the customized unmarshaler for MemBytes
func (m *MemBytes) UnmarshalText(s []byte) error {
val, err := units.RAMInBytes(string(s))
*m = MemBytes(val)
return err
}
type mirrorConfig struct {
Name string `toml:"name"`
Provider providerEnum `toml:"provider"`
@@ -148,7 +180,7 @@ type mirrorConfig struct {
RsyncOverride []string `toml:"rsync_override"`
Stage1Profile string `toml:"stage1_profile"`
MemoryLimit string `toml:"memory_limit"`
MemoryLimit MemBytes `toml:"memory_limit"`
DockerImage string `toml:"docker_image"`
DockerVolumes []string `toml:"docker_volumes"`

查看文件

@@ -7,6 +7,7 @@ import (
"path/filepath"
"testing"
"time"
units "github.com/docker/go-units"
. "github.com/smartystreets/goconvey/convey"
)
@@ -53,12 +54,15 @@ provider = "two-stage-rsync"
stage1_profile = "debian"
upstream = "rsync://ftp.debian.org/debian/"
use_ipv6 = true
memory_limit = "256MiB"
[[mirrors]]
name = "fedora"
provider = "rsync"
upstream = "rsync://ftp.fedoraproject.org/fedora/"
use_ipv6 = true
memory_limit = "128M"
exclude_file = "/etc/tunasync.d/fedora-exclude.txt"
exec_on_failure = [
"bash -c 'echo ${TUNASYNC_JOB_EXIT_STATUS} > ${TUNASYNC_WORKING_DIR}/exit_status'"
@@ -141,17 +145,20 @@ use_ipv6 = true
So(m.Name, ShouldEqual, "debian")
So(m.MirrorDir, ShouldEqual, "")
So(m.Provider, ShouldEqual, provTwoStageRsync)
So(m.MemoryLimit.Value(), ShouldEqual, 256 * units.MiB)
m = cfg.Mirrors[2]
So(m.Name, ShouldEqual, "fedora")
So(m.MirrorDir, ShouldEqual, "")
So(m.Provider, ShouldEqual, provRsync)
So(m.ExcludeFile, ShouldEqual, "/etc/tunasync.d/fedora-exclude.txt")
So(m.MemoryLimit.Value(), ShouldEqual, 128 * units.MiB)
m = cfg.Mirrors[3]
So(m.Name, ShouldEqual, "debian-cd")
So(m.MirrorDir, ShouldEqual, "")
So(m.Provider, ShouldEqual, provTwoStageRsync)
So(m.MemoryLimit.Value(), ShouldEqual, 0)
m = cfg.Mirrors[4]
So(m.Name, ShouldEqual, "debian-security")

查看文件

@@ -13,6 +13,7 @@ type dockerHook struct {
image string
volumes []string
options []string
memoryLimit MemBytes
}
func newDockerHook(p mirrorProvider, gCfg dockerConfig, mCfg mirrorConfig) *dockerHook {
@@ -35,6 +36,7 @@ func newDockerHook(p mirrorProvider, gCfg dockerConfig, mCfg mirrorConfig) *dock
image: mCfg.DockerImage,
volumes: volumes,
options: options,
memoryLimit: mCfg.MemoryLimit,
}
}

查看文件

@@ -8,6 +8,7 @@ import (
"path/filepath"
"testing"
"time"
units "github.com/docker/go-units"
"github.com/codeskyblue/go-sh"
. "github.com/smartystreets/goconvey/convey"
@@ -77,6 +78,7 @@ sleep 20
volumes: []string{
fmt.Sprintf("%s:%s", cmdScript, "/bin/cmd.sh"),
},
memoryLimit: 512 * units.MiB,
}
provider.AddHook(d)
So(provider.Docker(), ShouldNotBeNil)

查看文件

@@ -212,8 +212,7 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
// Add Cgroup Hook
provider.AddHook(
newCgroupHook(
provider, cfg.Cgroup.BasePath, cfg.Cgroup.Group,
cfg.Cgroup.Subsystem, mirror.MemoryLimit,
provider, cfg.Cgroup, mirror.MemoryLimit,
),
)
}

查看文件

@@ -12,6 +12,8 @@ import (
"github.com/codeskyblue/go-sh"
"golang.org/x/sys/unix"
"github.com/moby/moby/pkg/reexec"
cgv1 "github.com/containerd/cgroups"
)
// runner is to run os commands giving command line, env and log file
@@ -56,6 +58,10 @@ func newCmdJob(provider mirrorProvider, cmdAndArgs []string, workingDir string,
kv := fmt.Sprintf("%s=%s", k, v)
args = append(args, "-e", kv)
}
// set memlimit
if d.memoryLimit != 0 {
args = append(args, "-m", fmt.Sprint(d.memoryLimit.Value()))
}
// apply options
args = append(args, d.options...)
// apply image and command
@@ -66,10 +72,7 @@ func newCmdJob(provider mirrorProvider, cmdAndArgs []string, workingDir string,
cmd = exec.Command(c, args...)
} else if provider.Cgroup() != nil {
c := "cgexec"
args := []string{"-g", provider.Cgroup().Cgroup()}
args = append(args, cmdAndArgs...)
cmd = exec.Command(c, args...)
cmd = reexec.Command(append([]string{"tunasync-exec"}, cmdAndArgs...)...)
} else {
if len(cmdAndArgs) == 1 {
@@ -104,9 +107,59 @@ func newCmdJob(provider mirrorProvider, cmdAndArgs []string, workingDir string,
}
func (c *cmdJob) Start() error {
cg := c.provider.Cgroup()
var (
pipeR *os.File
pipeW *os.File
)
if cg != nil {
logger.Debugf("Preparing cgroup sync pipes for job %s", c.provider.Name())
var err error
pipeR, pipeW, err = os.Pipe();
if err != nil {
return err
}
c.cmd.ExtraFiles = []*os.File{pipeR}
defer pipeR.Close()
defer pipeW.Close()
}
logger.Debugf("Command start: %v", c.cmd.Args)
c.finished = make(chan empty, 1)
return c.cmd.Start()
if err := c.cmd.Start(); err != nil {
return err
}
if cg != nil {
if err := pipeR.Close(); err != nil {
return err
}
if c.cmd == nil || c.cmd.Process == nil {
return errProcessNotStarted
}
pid := c.cmd.Process.Pid
if cg.cgCfg.isUnified {
if err := cg.cgMgrV2.AddProc(uint64(pid)); err != nil{
if errors.Is(err, syscall.ESRCH) {
logger.Infof("Write pid %d to cgroup failed: process vanished, ignoring")
} else {
return err
}
}
} else {
if err := cg.cgMgrV1.Add(cgv1.Process{Pid: pid}); err != nil{
if errors.Is(err, syscall.ESRCH) {
logger.Infof("Write pid %d to cgroup failed: process vanished, ignoring")
} else {
return err
}
}
}
if _, err := pipeW.WriteString(string(cmdCont)); err != nil {
return err
}
}
return nil
}
func (c *cmdJob) Wait() error {

查看文件

@@ -54,6 +54,12 @@ func NewTUNASyncWorker(cfg *Config) *Worker {
w.httpClient = httpClient
}
if cfg.Cgroup.Enable {
if err := initCgroup(&cfg.Cgroup); err != nil {
logger.Errorf("Error initializing Cgroup: %s", err.Error())
return nil
}
}
w.initJobs()
w.makeHTTPServer()
return w