job scheduler works with run once and run periodic schedules. Stop logic is not fully thought through.
Dependents: JobSchedulerDemo Borsch
Diff: scheduler.cpp
- Revision:
- 3:f08f55827736
- Parent:
- 2:9bf5366ad5a2
- Child:
- 4:78bcd5a675e1
--- a/scheduler.cpp Tue Jul 11 20:35:19 2017 +0000 +++ b/scheduler.cpp Tue Jul 11 21:47:53 2017 +0000 @@ -1,17 +1,22 @@ #include "scheduler.h" +Timeout WakeOnce; + void update(void *target) { }; namespace JobScheduler { + const ActionType JobAddAT(1); + const ActionType JobRunAT(3); + bool descendingTimeline(Appointment *a1, Appointment *a2) { bool rv = a1->GetTime() <= a2->GetTime(); //printf("(%d %d:%d)", *d1, *d2, rv); return rv; }; - + /** JobAddReq adds new job to the scheduler. */ @@ -21,6 +26,10 @@ JobAddReq(Appointment *a) : Action(JobAddAT), apt(a), response(NoError, 0) {} }; + struct JobRunReq: Action { + JobRunReq() : Action(JobRunAT) {} + }; + Scheduler::Scheduler(JobService *jobService) : _jobService(jobService), _nextJobID(1) { } @@ -70,6 +79,12 @@ void Scheduler::JobRemove(JobID jobID) { } + + static JobRunReq jobRunReq; + void Scheduler::onWakeOnce() + { + _actions.put(&jobRunReq); + } void Scheduler::updateHandler() { while (!_quit) { @@ -85,9 +100,10 @@ wait(2); } } - + void Scheduler::process(Action *action) { + time_t now = time(NULL); // now in seconds switch(action->type) { case JobAddAT: { JobAddReq *req = static_cast<JobAddReq*>(action); @@ -96,21 +112,43 @@ // assign job its ID job->Init(_nextJobID++); } + // set next appointment time + req->apt->SetTime(job->GetSchedule()->NextRunTime(now)); node<Appointment> *tmp = _timeline.insertOrdered(req->apt, descendingTimeline); if (NULL == tmp) { printf("[Scheduler::process] timeline insert failed\n"); action->resQueue.put(NULL); + // internal state has not changed return; } + req->response.data = job->GetID(); //printf("[Scheduler::process] simulate error\n"); //action->resQueue.put(NULL); action->resQueue.put(&req->response); break; } + case JobRunAT: { + // execute job run logic after switch + break; + } default: printf("[Scheduler::process] unknown action type\n"); action->resQueue.put(NULL); } + node<Appointment> *wakeNode = _timeline.pop(1); + Appointment *wakeApt = wakeNode->data; + Job* wakeJob = wakeApt->GetJob(); + time_t sleepTime = wakeApt->GetTime() - now; + if (sleepTime > 0) { + // request wake up + printf("[Scheduler::process] job %d wake up in %d seconds\n", wakeJob->GetID(), sleepTime); + WakeOnce.attach(callback(this, &Scheduler::onWakeOnce), sleepTime); + } else { + // process job + printf("[Scheduler::process] running job ID %d\n", wakeJob->GetID()); + _timeline.remove(1); + } + } } \ No newline at end of file