diff --git a/cmd/tunasync/tunasync.go b/cmd/tunasync/tunasync.go
index aadf868..afcc5c3 100644
--- a/cmd/tunasync/tunasync.go
+++ b/cmd/tunasync/tunasync.go
@@ -61,8 +61,9 @@ func startWorker(c *cli.Context) {
 		time.Sleep(1 * time.Second)
 		sigChan := make(chan os.Signal, 1)
 		signal.Notify(sigChan, syscall.SIGHUP)
-		for {
-			s := <-sigChan
+		signal.Notify(sigChan, syscall.SIGINT)
+		signal.Notify(sigChan, syscall.SIGTERM)
+		for s := range sigChan {
 			switch s {
 			case syscall.SIGHUP:
 				logger.Info("Received reload signal")
@@ -71,6 +72,8 @@ func startWorker(c *cli.Context) {
 					logger.Errorf("Error loading config: %s", err.Error())
 				}
 				w.ReloadMirrorConfig(newCfg.Mirrors)
+			case syscall.SIGINT, syscall.SIGTERM:
+				w.Halt()
 			}
 		}
 	}()
diff --git a/worker/job.go b/worker/job.go
index d8462fd..a5feca1 100644
--- a/worker/job.go
+++ b/worker/job.go
@@ -3,6 +3,7 @@ package worker
 import (
 	"errors"
 	"fmt"
+	"sync"
 	"sync/atomic"
 
 	tunasync "github.com/tuna/tunasync/internal"
@@ -18,6 +19,7 @@ const (
 	jobDisable // disable the job (stops goroutine)
 	jobRestart // restart syncing
 	jobPing    // ensure the goroutine is alive
+	jobHalt    // worker halts
 )
 
 type jobMessage struct {
@@ -36,8 +38,14 @@ const (
 	statePaused
 	// disabled by jobDisable
 	stateDisabled
+	// worker is halting
+	stateHalting
 )
 
+// used to ensure all jobs are finished before
+// worker exit
+var jobsDone sync.WaitGroup
+
 type mirrorJob struct {
 	provider mirrorProvider
 	ctrlChan chan ctrlAction
@@ -82,11 +90,11 @@ func (m *mirrorJob) SetProvider(provider mirrorProvider) error {
 // sempaphore: make sure the concurrent running syncing job won't explode
 // TODO: message struct for managerChan
 func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) error {
-
+	jobsDone.Add(1)
 	m.disabled = make(chan empty)
 	defer func() {
 		close(m.disabled)
-		m.SetState(stateDisabled)
+		jobsDone.Done()
 	}()
 
 	provider := m.provider
@@ -244,6 +252,11 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
 			case jobStart:
 				m.SetState(stateReady)
 				goto _wait_for_job
+			case jobHalt:
+				m.SetState(stateHalting)
+				close(kill)
+				<-jobDone
+				return nil
 			default:
 				// TODO: implement this
 				close(kill)
diff --git a/worker/worker.go b/worker/worker.go
index c68c6d1..61daf21 100644
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -20,6 +20,7 @@ type Worker struct {
 
 	managerChan chan jobMessage
 	semaphore   chan empty
+	exit        chan empty
 
 	schedule   *scheduleQueue
 	httpEngine *gin.Engine
@@ -38,6 +39,7 @@ func GetTUNASyncWorker(cfg *Config) *Worker {
 
 		managerChan: make(chan jobMessage, 32),
 		semaphore:   make(chan empty, cfg.Global.Concurrent),
+		exit:        make(chan empty),
 
 		schedule: newScheduleQueue(),
 	}
@@ -57,12 +59,26 @@ func GetTUNASyncWorker(cfg *Config) *Worker {
 	return w
 }
 
-func (w *Worker) initJobs() {
-	for _, mirror := range w.cfg.Mirrors {
-		// Create Provider
-		provider := newMirrorProvider(mirror, w.cfg)
-		w.jobs[provider.Name()] = newMirrorJob(provider)
+// Run runs worker forever
+func (w *Worker) Run() {
+	w.registorWorker()
+	go w.runHTTPServer()
+	w.runSchedule()
+}
+
+// Halt stops all jobs
+func (w *Worker) Halt() {
+	w.L.Lock()
+	logger.Notice("Stopping all the jobs")
+	for _, job := range w.jobs {
+		if job.State() != stateDisabled {
+			job.ctrlChan <- jobHalt
+		}
 	}
+	jobsDone.Wait()
+	logger.Notice("All the jobs are stopped")
+	w.L.Unlock()
+	close(w.exit)
 }
 
 // ReloadMirrorConfig refresh the providers and jobs
@@ -132,7 +148,14 @@ func (w *Worker) ReloadMirrorConfig(newMirrors []mirrorConfig) {
 	}
 
 	w.cfg.Mirrors = newMirrors
+}
 
+func (w *Worker) initJobs() {
+	for _, mirror := range w.cfg.Mirrors {
+		// Create Provider
+		provider := newMirrorProvider(mirror, w.cfg)
+		w.jobs[provider.Name()] = newMirrorJob(provider)
+	}
 }
 
 func (w *Worker) disableJob(job *mirrorJob) {
@@ -222,13 +245,6 @@ func (w *Worker) runHTTPServer() {
 	}
 }
 
-// Run runs worker forever
-func (w *Worker) Run() {
-	w.registorWorker()
-	go w.runHTTPServer()
-	w.runSchedule()
-}
-
 func (w *Worker) runSchedule() {
 	w.L.Lock()
 
@@ -284,7 +300,7 @@ func (w *Worker) runSchedule() {
 				continue
 			}
 
-			if job.State() != stateReady {
+			if (job.State() != stateReady) && (job.State() != stateHalting) {
 				logger.Infof("Job %s state is not ready, skip adding new schedule", jobMsg.name)
 				continue
 			}
@@ -312,6 +328,25 @@ func (w *Worker) runSchedule() {
 			if job := w.schedule.Pop(); job != nil {
 				job.ctrlChan <- jobStart
 			}
+		case <-w.exit:
+			// flush status update messages
+			w.L.Lock()
+			defer w.L.Unlock()
+			for {
+				select {
+				case jobMsg := <-w.managerChan:
+					logger.Debugf("status update from %s", jobMsg.name)
+					job, ok := w.jobs[jobMsg.name]
+					if !ok {
+						continue
+					}
+					if jobMsg.status == Failed || jobMsg.status == Success {
+						w.updateStatus(job, jobMsg)
+					}
+				default:
+					return
+				}
+			}
 		}
 	}
 }
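
Note on the shutdown path this patch introduces: startWorker traps SIGINT/SIGTERM and calls Worker.Halt; Halt sends jobHalt to every job whose goroutine is still alive, then blocks on the package-level jobsDone WaitGroup; each mirrorJob.Run registers itself on that WaitGroup and signals Done when it returns. The standalone sketch below shows the same signal-plus-WaitGroup pattern in miniature. It is illustrative only: runJob, halt, and the sample job names are invented for the example, not tunasync identifiers.

package main

import (
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

// jobsDone plays the role of the package-level WaitGroup in worker/job.go:
// main blocks on it until every job goroutine has returned.
var jobsDone sync.WaitGroup

// runJob is a stand-in for mirrorJob.Run: it "syncs" until the halt
// channel is closed, then reports completion via the WaitGroup.
func runJob(name string, halt <-chan struct{}) {
	defer jobsDone.Done()
	for {
		select {
		case <-halt:
			fmt.Printf("job %s: halting\n", name)
			return
		case <-time.After(300 * time.Millisecond):
			fmt.Printf("job %s: syncing\n", name)
		}
	}
}

func main() {
	halt := make(chan struct{})
	for _, name := range []string{"debian", "ubuntu"} {
		jobsDone.Add(1) // register before spawning so Wait cannot miss the job
		go runJob(name, halt)
	}

	// As in startWorker, one buffered channel receives all the signals.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
	for s := range sigChan {
		switch s {
		case syscall.SIGHUP:
			fmt.Println("reload requested") // the patch reloads mirror config here
		case syscall.SIGINT, syscall.SIGTERM:
			close(halt)     // broadcast halt to every job
			jobsDone.Wait() // block until all jobs have returned
			fmt.Println("all jobs stopped")
			return
		}
	}
}

Two deliberate differences from the patch: the sketch broadcasts by closing one shared channel, while the patch sends jobHalt down each job's ctrlChan so that Run can complete its close(kill) / <-jobDone handshake first; and the sketch calls jobsDone.Add(1) before spawning the goroutine, whereas the patch increments inside Run itself.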
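
The new case <-w.exit branch in runSchedule relies on a non-blocking drain idiom: a select with a default arm keeps consuming whatever status messages are already buffered on managerChan and returns the moment the channel is empty, so the final flush can never hang. A minimal sketch of that idiom follows; the channel type and messages are invented for illustration.

package main

import "fmt"

// drain flushes whatever is already buffered on statusChan without ever
// blocking: the default branch fires once no message is immediately ready.
func drain(statusChan <-chan string) {
	for {
		select {
		case msg := <-statusChan:
			fmt.Println("flushed:", msg)
		default:
			return // channel empty; nothing left to flush
		}
	}
}

func main() {
	ch := make(chan string, 4)
	ch <- "job a: success"
	ch <- "job b: failed"
	drain(ch) // prints both queued updates, then returns immediately
}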