From b077db1d0bbecb771eb63ed142b882bb8fd395b2 Mon Sep 17 00:00:00 2001 From: bigeagle Date: Sun, 24 Apr 2016 20:23:44 +0800 Subject: [PATCH] feature(worker): job schedule --- worker/job.go | 65 ++++++++++++++++++++++++------------- worker/job_test.go | 21 ++++++------ worker/main.go | 17 ++++++++++ worker/schedule.go | 71 +++++++++++++++++++++++++++++++++++++++++ worker/schedule_test.go | 50 +++++++++++++++++++++++++++++ 5 files changed, 191 insertions(+), 33 deletions(-) create mode 100644 worker/main.go create mode 100644 worker/schedule.go create mode 100644 worker/schedule_test.go diff --git a/worker/job.go b/worker/job.go index cae589a..50d4c3d 100644 --- a/worker/job.go +++ b/worker/job.go @@ -25,6 +25,24 @@ type jobMessage struct { msg string } +type mirrorJob struct { + provider mirrorProvider + ctrlChan chan ctrlAction + enabled bool +} + +func newMirrorJob(provider mirrorProvider) *mirrorJob { + return &mirrorJob{ + provider: provider, + ctrlChan: make(chan ctrlAction, 1), + enabled: true, + } +} + +func (m *mirrorJob) Name() string { + return m.provider.Name() +} + // runMirrorJob is the goroutine where syncing job runs in // arguments: // provider: mirror provider object @@ -32,7 +50,9 @@ type jobMessage struct { // managerChan: push messages to the manager, this channel should have a larger buffer // sempaphore: make sure the concurrent running syncing job won't explode // TODO: message struct for managerChan -func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerChan chan<- jobMessage, semaphore chan empty) error { +func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) error { + + provider := m.provider // to make code shorter runHooks := func(Hooks []jobHook, action func(h jobHook) error, hookname string) error { @@ -40,10 +60,10 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh if err := action(hook); err != nil { logger.Error( "failed at %s hooks for %s: %s", - hookname, provider.Name(), err.Error(), + hookname, m.Name(), err.Error(), ) managerChan <- jobMessage{ - tunasync.Failed, provider.Name(), + tunasync.Failed, m.Name(), fmt.Sprintf("error exec hook %s: %s", hookname, err.Error()), } return err @@ -55,8 +75,8 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh runJobWrapper := func(kill <-chan empty, jobDone chan<- empty) error { defer close(jobDone) - managerChan <- jobMessage{tunasync.PreSyncing, provider.Name(), ""} - logger.Info("start syncing: %s", provider.Name()) + managerChan <- jobMessage{tunasync.PreSyncing, m.Name(), ""} + logger.Info("start syncing: %s", m.Name()) Hooks := provider.Hooks() rHooks := []jobHook{} @@ -74,7 +94,7 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh stopASAP := false // stop job as soon as possible if retry > 0 { - logger.Info("retry syncing: %s, retry: %d", provider.Name(), retry) + logger.Info("retry syncing: %s, retry: %d", m.Name(), retry) } err := runHooks(Hooks, func(h jobHook) error { return h.preExec() }, "pre-exec") if err != nil { @@ -82,12 +102,12 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh } // start syncing - managerChan <- jobMessage{tunasync.Syncing, provider.Name(), ""} + managerChan <- jobMessage{tunasync.Syncing, m.Name(), ""} err = provider.Start() if err != nil { logger.Error( "failed to start syncing job for %s: %s", - provider.Name(), err.Error(), + m.Name(), err.Error(), ) return err } @@ -108,7 +128,7 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh stopASAP = true err := provider.Terminate() if err != nil { - logger.Error("failed to terminate provider %s: %s", provider.Name(), err.Error()) + logger.Error("failed to terminate provider %s: %s", m.Name(), err.Error()) return err } syncErr = errors.New("killed by manager") @@ -122,8 +142,8 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh if syncErr == nil { // syncing success - logger.Info("succeeded syncing %s", provider.Name()) - managerChan <- jobMessage{tunasync.Success, provider.Name(), ""} + logger.Info("succeeded syncing %s", m.Name()) + managerChan <- jobMessage{tunasync.Success, m.Name(), ""} // post-success hooks err := runHooks(rHooks, func(h jobHook) error { return h.postSuccess() }, "post-success") if err != nil { @@ -134,8 +154,8 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh } // syncing failed - logger.Warning("failed syncing %s: %s", provider.Name(), syncErr.Error()) - managerChan <- jobMessage{tunasync.Failed, provider.Name(), syncErr.Error()} + logger.Warning("failed syncing %s: %s", m.Name(), syncErr.Error()) + managerChan <- jobMessage{tunasync.Failed, m.Name(), syncErr.Error()} // post-fail hooks logger.Debug("post-fail hooks") @@ -164,9 +184,8 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh } } - enabled := true // whether this job is stopped by the manager for { - if enabled { + if m.enabled { kill := make(chan empty) jobDone := make(chan empty) go runJob(kill, jobDone) @@ -175,10 +194,10 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh select { case <-jobDone: logger.Debug("job done") - case ctrl := <-ctrlChan: + case ctrl := <-m.ctrlChan: switch ctrl { case jobStop: - enabled = false + m.enabled = false close(kill) <-jobDone case jobDisable: @@ -186,12 +205,12 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh <-jobDone return nil case jobRestart: - enabled = true + m.enabled = true close(kill) <-jobDone continue case jobStart: - enabled = true + m.enabled = true goto _wait_for_job default: // TODO: implement this @@ -201,16 +220,16 @@ func runMirrorJob(provider mirrorProvider, ctrlChan <-chan ctrlAction, managerCh } } - ctrl := <-ctrlChan + ctrl := <-m.ctrlChan switch ctrl { case jobStop: - enabled = false + m.enabled = false case jobDisable: return nil case jobRestart: - enabled = true + m.enabled = true case jobStart: - enabled = true + m.enabled = true default: // TODO return nil diff --git a/worker/job_test.go b/worker/job_test.go index f5e9382..065d10e 100644 --- a/worker/job_test.go +++ b/worker/job_test.go @@ -63,11 +63,11 @@ func TestMirrorJob(t *testing.T) { So(readedScriptContent, ShouldResemble, []byte(scriptContent)) Convey("If we let it run several times", func(ctx C) { - ctrlChan := make(chan ctrlAction) managerChan := make(chan jobMessage, 10) semaphore := make(chan empty, 1) + job := newMirrorJob(provider) - go runMirrorJob(provider, ctrlChan, managerChan, semaphore) + go job.Run(managerChan, semaphore) for i := 0; i < 2; i++ { msg := <-managerChan So(msg.status, ShouldEqual, PreSyncing) @@ -78,7 +78,7 @@ func TestMirrorJob(t *testing.T) { loggedContent, err := ioutil.ReadFile(provider.LogFile()) So(err, ShouldBeNil) So(string(loggedContent), ShouldEqual, exceptedOutput) - ctrlChan <- jobStart + job.ctrlChan <- jobStart } select { case msg := <-managerChan: @@ -92,7 +92,7 @@ func TestMirrorJob(t *testing.T) { So(0, ShouldEqual, 1) } - ctrlChan <- jobDisable + job.ctrlChan <- jobDisable select { case <-managerChan: So(0, ShouldEqual, 1) // made this fail @@ -112,12 +112,12 @@ echo $TUNASYNC_WORKING_DIR err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755) So(err, ShouldBeNil) - ctrlChan := make(chan ctrlAction) managerChan := make(chan jobMessage, 10) semaphore := make(chan empty, 1) + job := newMirrorJob(provider) Convey("If we kill it", func(ctx C) { - go runMirrorJob(provider, ctrlChan, managerChan, semaphore) + go job.Run(managerChan, semaphore) time.Sleep(1 * time.Second) msg := <-managerChan @@ -125,7 +125,7 @@ echo $TUNASYNC_WORKING_DIR msg = <-managerChan So(msg.status, ShouldEqual, Syncing) - ctrlChan <- jobStop + job.ctrlChan <- jobStop msg = <-managerChan So(msg.status, ShouldEqual, Failed) @@ -134,11 +134,12 @@ echo $TUNASYNC_WORKING_DIR loggedContent, err := ioutil.ReadFile(provider.LogFile()) So(err, ShouldBeNil) So(string(loggedContent), ShouldEqual, exceptedOutput) - ctrlChan <- jobDisable + job.ctrlChan <- jobDisable }) Convey("If we don't kill it", func(ctx C) { - go runMirrorJob(provider, ctrlChan, managerChan, semaphore) + go job.Run(managerChan, semaphore) + msg := <-managerChan So(msg.status, ShouldEqual, PreSyncing) msg = <-managerChan @@ -154,7 +155,7 @@ echo $TUNASYNC_WORKING_DIR loggedContent, err := ioutil.ReadFile(provider.LogFile()) So(err, ShouldBeNil) So(string(loggedContent), ShouldEqual, exceptedOutput) - ctrlChan <- jobDisable + job.ctrlChan <- jobDisable }) }) diff --git a/worker/main.go b/worker/main.go new file mode 100644 index 0000000..b1d1453 --- /dev/null +++ b/worker/main.go @@ -0,0 +1,17 @@ +package worker + +import "time" + +// toplevel module for workers + +func main() { + + for { + // if time.Now().After() { + // + // } + + time.Sleep(1 * time.Second) + } + +} diff --git a/worker/schedule.go b/worker/schedule.go new file mode 100644 index 0000000..cb95a5d --- /dev/null +++ b/worker/schedule.go @@ -0,0 +1,71 @@ +package worker + +// schedule queue for jobs + +import ( + "sync" + "time" + + "github.com/ryszard/goskiplist/skiplist" +) + +type scheduleQueue struct { + sync.Mutex + list *skiplist.SkipList +} + +func timeLessThan(l, r interface{}) bool { + tl := l.(time.Time) + tr := r.(time.Time) + return tl.Before(tr) +} + +func newScheduleQueue() *scheduleQueue { + queue := new(scheduleQueue) + queue.list = skiplist.NewCustomMap(timeLessThan) + return queue +} + +func (q *scheduleQueue) AddJob(schedTime time.Time, job *mirrorJob) { + q.Lock() + defer q.Unlock() + q.list.Set(schedTime, job) +} + +// pop out the first job if it's time to run it +func (q *scheduleQueue) Pop() *mirrorJob { + q.Lock() + defer q.Unlock() + + first := q.list.SeekToFirst() + if first == nil { + return nil + } + defer first.Close() + + t := first.Key().(time.Time) + if t.Before(time.Now()) { + job := first.Value().(*mirrorJob) + q.list.Delete(first.Key()) + return job + } + return nil +} + +// remove job +func (q *scheduleQueue) Remove(name string) bool { + q.Lock() + defer q.Unlock() + + cur := q.list.Iterator() + defer cur.Close() + + for cur.Next() { + cj := cur.Value().(*mirrorJob) + if cj.Name() == name { + q.list.Delete(cur.Key()) + return true + } + } + return false +} diff --git a/worker/schedule_test.go b/worker/schedule_test.go new file mode 100644 index 0000000..8bf3bc5 --- /dev/null +++ b/worker/schedule_test.go @@ -0,0 +1,50 @@ +package worker + +import ( + "testing" + "time" + + . "github.com/smartystreets/goconvey/convey" +) + +func TestSchedule(t *testing.T) { + + Convey("MirrorJobSchedule should work", t, func(ctx C) { + schedule := newScheduleQueue() + + Convey("When poping on empty schedule", func() { + job := schedule.Pop() + So(job, ShouldBeNil) + }) + + Convey("When adding some jobs", func() { + c := cmdConfig{ + name: "schedule_test", + } + provider, _ := newCmdProvider(c) + job := newMirrorJob(provider) + sched := time.Now().Add(1 * time.Second) + + schedule.AddJob(sched, job) + So(schedule.Pop(), ShouldBeNil) + time.Sleep(1200 * time.Millisecond) + So(schedule.Pop(), ShouldEqual, job) + + }) + Convey("When removing jobs", func() { + c := cmdConfig{ + name: "schedule_test", + } + provider, _ := newCmdProvider(c) + job := newMirrorJob(provider) + sched := time.Now().Add(1 * time.Second) + + schedule.AddJob(sched, job) + So(schedule.Remove("something"), ShouldBeFalse) + So(schedule.Remove("schedule_test"), ShouldBeTrue) + time.Sleep(1200 * time.Millisecond) + So(schedule.Pop(), ShouldBeNil) + }) + + }) +}