From 6cbe91b4f1c29611ed2a257e6f7ef72fa8c1cfa7 Mon Sep 17 00:00:00 2001 From: Yuxiang Zhang Date: Wed, 30 May 2018 16:07:07 +0800 Subject: [PATCH] new command: jobForceStart --- worker/job.go | 34 +++++++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/worker/job.go b/worker/job.go index e07af45..a4c23dc 100644 --- a/worker/job.go +++ b/worker/job.go @@ -15,12 +15,13 @@ import ( type ctrlAction uint8 const ( - jobStart ctrlAction = iota - jobStop // stop syncing keep the job - jobDisable // disable the job (stops goroutine) - jobRestart // restart syncing - jobPing // ensure the goroutine is alive - jobHalt // worker halts + jobStart ctrlAction = iota + jobStop // stop syncing keep the job + jobDisable // disable the job (stops goroutine) + jobRestart // restart syncing + jobPing // ensure the goroutine is alive + jobHalt // worker halts + jobForceStart // ignore concurrent limit ) type jobMessage struct { @@ -211,22 +212,25 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err return nil } - runJob := func(kill <-chan empty, jobDone chan<- empty) { + runJob := func(kill <-chan empty, jobDone chan<- empty, bypassSemaphore <-chan empty) { select { case semaphore <- empty{}: defer func() { <-semaphore }() runJobWrapper(kill, jobDone) + case <-bypassSemaphore: + runJobWrapper(kill, jobDone) case <-kill: jobDone <- empty{} return } } + bypassSemaphore := make(chan empty, 1) for { if m.State() == stateReady { kill := make(chan empty) jobDone := make(chan empty) - go runJob(kill, jobDone) + go runJob(kill, jobDone, bypassSemaphore) _wait_for_job: select { @@ -249,6 +253,12 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err <-jobDone time.Sleep(time.Second) // Restart may fail if the process was not exited yet continue + case jobForceStart: + select { //non-blocking + default: + case bypassSemaphore <- empty{}: + } + fallthrough case jobStart: m.SetState(stateReady) goto _wait_for_job @@ -272,8 +282,14 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err case jobDisable: m.SetState(stateDisabled) return nil + case jobForceStart: + select { //non-blocking + default: + case bypassSemaphore <- empty{}: + } + fallthrough case jobRestart: - m.SetState(stateReady) + fallthrough case jobStart: m.SetState(stateReady) default: