diff --git a/worker/cgroup.go b/worker/cgroup.go index 097aaa7..26ae7e9 100644 --- a/worker/cgroup.go +++ b/worker/cgroup.go @@ -2,18 +2,20 @@ package worker import ( "bufio" + "errors" "fmt" "os" "path/filepath" "strconv" "syscall" + "time" "golang.org/x/sys/unix" "github.com/codeskyblue/go-sh" ) -var cgSubsystem string = "cpu" +var cgSubsystem = "cpu" type cgroupHook struct { emptyHook @@ -82,23 +84,43 @@ func (c *cgroupHook) killAll() error { return nil } name := c.provider.Name() - taskFile, err := os.Open(filepath.Join(c.basePath, cgSubsystem, c.baseGroup, name, "tasks")) - if err != nil { - return err + + readTaskList := func() ([]int, error) { + taskList := []int{} + taskFile, err := os.Open(filepath.Join(c.basePath, cgSubsystem, c.baseGroup, name, "tasks")) + if err != nil { + return taskList, err + } + defer taskFile.Close() + + scanner := bufio.NewScanner(taskFile) + for scanner.Scan() { + pid, err := strconv.Atoi(scanner.Text()) + if err != nil { + return taskList, err + } + taskList = append(taskList, pid) + } + return taskList, nil } - defer taskFile.Close() - taskList := []int{} - scanner := bufio.NewScanner(taskFile) - for scanner.Scan() { - pid, err := strconv.Atoi(scanner.Text()) + + for i := 0; i < 4; i++ { + if i == 3 { + return errors.New("Unable to kill all child tasks") + } + taskList, err := readTaskList() if err != nil { return err } - taskList = append(taskList, pid) - } - for _, pid := range taskList { - logger.Debugf("Killing process: %d", pid) - unix.Kill(pid, syscall.SIGKILL) + if len(taskList) == 0 { + return nil + } + for _, pid := range taskList { + logger.Debugf("Killing process: %d", pid) + unix.Kill(pid, syscall.SIGKILL) + } + // sleep 10ms for the first round, and 1.01s, 2.01s, 3.01s for the rest + time.Sleep(time.Duration(i)*time.Second + 10*time.Millisecond) } return nil diff --git a/worker/loglimit_hook.go b/worker/loglimit_hook.go index ad89a84..b3ddffb 100644 --- a/worker/loglimit_hook.go +++ b/worker/loglimit_hook.go @@ -79,7 +79,7 @@ func (l *logLimiter) preExec() error { logLink := filepath.Join(logDir, "latest") - if _, err = os.Stat(logLink); err == nil { + if _, err = os.Lstat(logLink); err == nil { os.Remove(logLink) } os.Symlink(logFileName, logLink) diff --git a/worker/runner.go b/worker/runner.go index 04fc1fb..ff0399a 100644 --- a/worker/runner.go +++ b/worker/runner.go @@ -5,6 +5,7 @@ import ( "os" "os/exec" "strings" + "sync" "syscall" "time" @@ -17,12 +18,14 @@ import ( var errProcessNotStarted = errors.New("Process Not Started") type cmdJob struct { + sync.Mutex cmd *exec.Cmd workingDir string env map[string]string logFile *os.File finished chan empty provider mirrorProvider + retErr error } func newCmdJob(provider mirrorProvider, cmdAndArgs []string, workingDir string, env map[string]string) *cmdJob { @@ -69,9 +72,18 @@ func (c *cmdJob) Start() error { } func (c *cmdJob) Wait() error { - err := c.cmd.Wait() - close(c.finished) - return err + c.Lock() + defer c.Unlock() + + select { + case <-c.finished: + return c.retErr + default: + err := c.cmd.Wait() + c.retErr = err + close(c.finished) + return err + } } func (c *cmdJob) SetLogFile(logFile *os.File) { diff --git a/worker/two_stage_rsync_provider.go b/worker/two_stage_rsync_provider.go index 21324ca..48e5125 100644 --- a/worker/two_stage_rsync_provider.go +++ b/worker/two_stage_rsync_provider.go @@ -108,6 +108,7 @@ func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) { } func (p *twoStageRsyncProvider) Run() error { + defer p.Wait() env := map[string]string{} if p.username != "" {