Job scheduler that supports run-once and periodic schedules. The stop logic is not fully thought through.
Dependents: JobSchedulerDemo Borsch
scheduler.cpp@15:6b8fa5dff770, 2017-08-02 (annotated)
- Committer:
- sgnezdov
- Date:
- Wed Aug 02 22:03:50 2017 +0000
- Revision:
- 15:6b8fa5dff770
- Parent:
- 14:a30cc783ae89
- Child:
- 16:f61b62b119dd
Implemented the JobList method.
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
#include "scheduler.h"

#include <new>

#include "mbed-trace/mbed_trace.h"
sgnezdov | 9:ee21cd055a97 | 4 | #define TRACE_GROUP "schd" |
sgnezdov | 9:ee21cd055a97 | 5 | |
sgnezdov | 3:f08f55827736 | 6 | Timeout WakeOnce; |
sgnezdov | 3:f08f55827736 | 7 | |
sgnezdov | 0:806403f3d0d1 | 8 | namespace JobScheduler { |
sgnezdov | 1:ec6a1d054065 | 9 | |
sgnezdov | 3:f08f55827736 | 10 | const ActionType JobAddAT(1); |
sgnezdov | 3:f08f55827736 | 11 | const ActionType JobRunAT(3); |
sgnezdov | 7:98c8b2eabea3 | 12 | const ActionType JobQuitAT(4); |
sgnezdov | 15:6b8fa5dff770 | 13 | const ActionType JobListAT(5); |
sgnezdov | 3:f08f55827736 | 14 | |
sgnezdov | 2:9bf5366ad5a2 | 15 | bool descendingTimeline(Appointment *a1, Appointment *a2) |
sgnezdov | 2:9bf5366ad5a2 | 16 | { |
sgnezdov | 12:684ddfc57199 | 17 | time_t t1 = a1->GetTime(); |
sgnezdov | 12:684ddfc57199 | 18 | time_t t2 = a2->GetTime(); |
sgnezdov | 12:684ddfc57199 | 19 | bool rv = t1 <= t2; |
sgnezdov | 12:684ddfc57199 | 20 | tr_debug("apt %d:%d <= %d:%d is %d)", a1->GetJob()->GetID(), t1, a2->GetJob()->GetID(), t2, rv); |
sgnezdov | 2:9bf5366ad5a2 | 21 | return rv; |
sgnezdov | 2:9bf5366ad5a2 | 22 | }; |
sgnezdov | 3:f08f55827736 | 23 | |
sgnezdov | 2:9bf5366ad5a2 | 24 | /** |
sgnezdov | 2:9bf5366ad5a2 | 25 | JobAddReq adds new job to the scheduler. |
sgnezdov | 2:9bf5366ad5a2 | 26 | */ |
sgnezdov | 1:ec6a1d054065 | 27 | struct JobAddReq: Action { |
sgnezdov | 2:9bf5366ad5a2 | 28 | Appointment *apt; |
sgnezdov | 1:ec6a1d054065 | 29 | Response<JobID> response; |
sgnezdov | 2:9bf5366ad5a2 | 30 | JobAddReq(Appointment *a) : Action(JobAddAT), apt(a), response(NoError, 0) {} |
sgnezdov | 1:ec6a1d054065 | 31 | }; |
sgnezdov | 15:6b8fa5dff770 | 32 | |
sgnezdov | 15:6b8fa5dff770 | 33 | struct JobListReq: Action { |
sgnezdov | 15:6b8fa5dff770 | 34 | LinkedList<Job>& _jobsToFill; |
sgnezdov | 15:6b8fa5dff770 | 35 | JobListReq(LinkedList<Job>& jobs) : Action(JobListAT), _jobsToFill(jobs) {} |
sgnezdov | 15:6b8fa5dff770 | 36 | }; |
sgnezdov | 7:98c8b2eabea3 | 37 | |
sgnezdov | 0:806403f3d0d1 | 38 | Scheduler::Scheduler(JobService *jobService) |
sgnezdov | 7:98c8b2eabea3 | 39 | : _quitUpdater(false), _quitRunner(false), _jobService(jobService), _nextJobID(1) { } |
sgnezdov | 0:806403f3d0d1 | 40 | |
sgnezdov | 0:806403f3d0d1 | 41 | void Scheduler::updateAdapter(void *thisPointer) { |
sgnezdov | 0:806403f3d0d1 | 42 | Scheduler *self = static_cast<Scheduler*>(thisPointer); |
sgnezdov | 0:806403f3d0d1 | 43 | self->updateHandler(); |
sgnezdov | 0:806403f3d0d1 | 44 | } |
sgnezdov | 4:78bcd5a675e1 | 45 | |
sgnezdov | 4:78bcd5a675e1 | 46 | void Scheduler::runAdapter(void *thisPointer) { |
sgnezdov | 4:78bcd5a675e1 | 47 | Scheduler *self = static_cast<Scheduler*>(thisPointer); |
sgnezdov | 4:78bcd5a675e1 | 48 | self->runHandler(); |
sgnezdov | 4:78bcd5a675e1 | 49 | } |
sgnezdov | 0:806403f3d0d1 | 50 | |
sgnezdov | 0:806403f3d0d1 | 51 | void Scheduler::Start() { |
sgnezdov | 0:806403f3d0d1 | 52 | _updater.start(callback(Scheduler::updateAdapter, this)); |
sgnezdov | 4:78bcd5a675e1 | 53 | _runner.start(callback(Scheduler::runAdapter, this)); |
sgnezdov | 0:806403f3d0d1 | 54 | } |
sgnezdov | 0:806403f3d0d1 | 55 | |
sgnezdov | 0:806403f3d0d1 | 56 | void Scheduler::Stop() { |
sgnezdov | 7:98c8b2eabea3 | 57 | _runs.put(NULL); |
sgnezdov | 7:98c8b2eabea3 | 58 | |
sgnezdov | 7:98c8b2eabea3 | 59 | Action uReq = Action(JobQuitAT); |
sgnezdov | 7:98c8b2eabea3 | 60 | _updates.put(&uReq); |
sgnezdov | 7:98c8b2eabea3 | 61 | uReq.resQueue.get(); |
sgnezdov | 0:806403f3d0d1 | 62 | } |
sgnezdov | 7:98c8b2eabea3 | 63 | |
sgnezdov | 0:806403f3d0d1 | 64 | void Scheduler::WaitToStop() { |
sgnezdov | 0:806403f3d0d1 | 65 | _updater.join(); |
sgnezdov | 4:78bcd5a675e1 | 66 | _runner.join(); |
sgnezdov | 0:806403f3d0d1 | 67 | } |
sgnezdov | 0:806403f3d0d1 | 68 | |
sgnezdov | 2:9bf5366ad5a2 | 69 | Response<JobID> Scheduler::JobAdd(JobTypeID jobTID, ISchedule *schedule, IJobData *data) { |
sgnezdov | 2:9bf5366ad5a2 | 70 | Appointment *apt = new Appointment(jobTID, schedule, data, time_t(0)); |
sgnezdov | 2:9bf5366ad5a2 | 71 | if (NULL == apt) { |
sgnezdov | 10:8cff30b5b90d | 72 | tr_error("[JobAdd] failed to allocate appointment"); |
sgnezdov | 2:9bf5366ad5a2 | 73 | return Response<JobID>(1, 0); |
sgnezdov | 2:9bf5366ad5a2 | 74 | } |
sgnezdov | 5:d8f69ac330f2 | 75 | return this->reschedule(apt); |
sgnezdov | 5:d8f69ac330f2 | 76 | } |
sgnezdov | 5:d8f69ac330f2 | 77 | |
sgnezdov | 5:d8f69ac330f2 | 78 | Response<JobID> Scheduler::reschedule(Appointment *apt) { |
sgnezdov | 2:9bf5366ad5a2 | 79 | JobAddReq req(apt); |
sgnezdov | 6:5baa0e4ec500 | 80 | |
sgnezdov | 6:5baa0e4ec500 | 81 | // set next appointment time |
sgnezdov | 6:5baa0e4ec500 | 82 | time_t now = time(NULL); // now in seconds |
sgnezdov | 6:5baa0e4ec500 | 83 | apt->SetTime(apt->GetJob()->GetSchedule()->NextRunTime(now)); |
sgnezdov | 6:5baa0e4ec500 | 84 | if (apt->GetTime() == 0) { |
sgnezdov | 6:5baa0e4ec500 | 85 | // there is no next run time; delete appointment |
sgnezdov | 10:8cff30b5b90d | 86 | tr_debug("[reschedule] NO next appointment"); |
sgnezdov | 6:5baa0e4ec500 | 87 | delete apt; |
sgnezdov | 6:5baa0e4ec500 | 88 | req.response.error = 2; |
sgnezdov | 6:5baa0e4ec500 | 89 | return req.response; |
sgnezdov | 6:5baa0e4ec500 | 90 | } |
sgnezdov | 6:5baa0e4ec500 | 91 | |
sgnezdov | 10:8cff30b5b90d | 92 | tr_debug("[reschedule] put"); |
sgnezdov | 4:78bcd5a675e1 | 93 | _updates.put(&req); |
sgnezdov | 10:8cff30b5b90d | 94 | tr_debug("[reschedule] get"); |
sgnezdov | 0:806403f3d0d1 | 95 | // default is wait forever |
sgnezdov | 1:ec6a1d054065 | 96 | osEvent evt = req.resQueue.get(); |
sgnezdov | 0:806403f3d0d1 | 97 | if (evt.status == osEventMessage) { |
sgnezdov | 2:9bf5366ad5a2 | 98 | if (evt.value.p != NULL) { |
sgnezdov | 10:8cff30b5b90d | 99 | tr_debug("[reschedule] completed ok"); |
sgnezdov | 2:9bf5366ad5a2 | 100 | } else { |
sgnezdov | 10:8cff30b5b90d | 101 | tr_error("[reschedule] NOT added (C1)"); |
sgnezdov | 2:9bf5366ad5a2 | 102 | } |
sgnezdov | 2:9bf5366ad5a2 | 103 | } else { |
sgnezdov | 2:9bf5366ad5a2 | 104 | // not sure what condition is |
sgnezdov | 10:8cff30b5b90d | 105 | tr_error("[reschedule] NOT added (C2)"); |
sgnezdov | 2:9bf5366ad5a2 | 106 | delete apt; |
sgnezdov | 2:9bf5366ad5a2 | 107 | apt = NULL; |
sgnezdov | 0:806403f3d0d1 | 108 | } |
sgnezdov | 2:9bf5366ad5a2 | 109 | // yes, return a copy of the structure |
sgnezdov | 5:d8f69ac330f2 | 110 | return req.response; |
sgnezdov | 0:806403f3d0d1 | 111 | } |
sgnezdov | 0:806403f3d0d1 | 112 | |
sgnezdov | 2:9bf5366ad5a2 | 113 | void Scheduler::JobRemove(JobID jobID) { |
sgnezdov | 13:6be67ee77861 | 114 | tr_error("JobRemove is not implemented"); |
sgnezdov | 13:6be67ee77861 | 115 | } |
sgnezdov | 13:6be67ee77861 | 116 | |
sgnezdov | 15:6b8fa5dff770 | 117 | void Scheduler::JobList(LinkedList<Job>& jobs) { |
sgnezdov | 15:6b8fa5dff770 | 118 | JobListReq req(jobs); |
sgnezdov | 15:6b8fa5dff770 | 119 | _updates.put(&req); |
sgnezdov | 15:6b8fa5dff770 | 120 | osEvent evt = req.resQueue.get(); |
sgnezdov | 15:6b8fa5dff770 | 121 | if (evt.status != osEventMessage) { |
sgnezdov | 15:6b8fa5dff770 | 122 | // not sure what condition is |
sgnezdov | 15:6b8fa5dff770 | 123 | tr_error("[JobList] status error"); |
sgnezdov | 15:6b8fa5dff770 | 124 | } |
sgnezdov | 15:6b8fa5dff770 | 125 | } |
sgnezdov | 3:f08f55827736 | 126 | |
sgnezdov | 7:98c8b2eabea3 | 127 | static Action jobRunReq(JobRunAT); |
sgnezdov | 3:f08f55827736 | 128 | void Scheduler::onWakeOnce() |
sgnezdov | 3:f08f55827736 | 129 | { |
sgnezdov | 4:78bcd5a675e1 | 130 | _updates.put(&jobRunReq); |
sgnezdov | 3:f08f55827736 | 131 | } |
sgnezdov | 0:806403f3d0d1 | 132 | |
sgnezdov | 0:806403f3d0d1 | 133 | void Scheduler::updateHandler() { |
sgnezdov | 7:98c8b2eabea3 | 134 | while (!_quitUpdater) { |
sgnezdov | 10:8cff30b5b90d | 135 | tr_debug("[updateHandler] waiting for action"); |
sgnezdov | 0:806403f3d0d1 | 136 | // wait forever ... |
sgnezdov | 4:78bcd5a675e1 | 137 | osEvent evt = _updates.get(); |
sgnezdov | 0:806403f3d0d1 | 138 | if (evt.status == osEventMessage) { |
sgnezdov | 10:8cff30b5b90d | 139 | tr_debug("[updateHandler] process action"); |
sgnezdov | 0:806403f3d0d1 | 140 | this->process((Action*)evt.value.p); |
sgnezdov | 0:806403f3d0d1 | 141 | } else { |
sgnezdov | 10:8cff30b5b90d | 142 | tr_error("[updateHandler] NOT osEventMessage"); |
sgnezdov | 0:806403f3d0d1 | 143 | } |
sgnezdov | 0:806403f3d0d1 | 144 | } |
sgnezdov | 0:806403f3d0d1 | 145 | } |
sgnezdov | 3:f08f55827736 | 146 | |
sgnezdov | 0:806403f3d0d1 | 147 | void Scheduler::process(Action *action) |
sgnezdov | 0:806403f3d0d1 | 148 | { |
sgnezdov | 3:f08f55827736 | 149 | time_t now = time(NULL); // now in seconds |
sgnezdov | 1:ec6a1d054065 | 150 | switch(action->type) { |
sgnezdov | 1:ec6a1d054065 | 151 | case JobAddAT: { |
sgnezdov | 10:8cff30b5b90d | 152 | tr_debug("[process] JobAddAT"); |
sgnezdov | 1:ec6a1d054065 | 153 | JobAddReq *req = static_cast<JobAddReq*>(action); |
sgnezdov | 2:9bf5366ad5a2 | 154 | Job *job = req->apt->GetJob(); |
sgnezdov | 2:9bf5366ad5a2 | 155 | if (job->GetID() == 0) { |
sgnezdov | 2:9bf5366ad5a2 | 156 | // assign job its ID |
sgnezdov | 2:9bf5366ad5a2 | 157 | job->Init(_nextJobID++); |
sgnezdov | 12:684ddfc57199 | 158 | tr_debug("assigned new job its id %d", job->GetID()); |
sgnezdov | 12:684ddfc57199 | 159 | } else { |
sgnezdov | 12:684ddfc57199 | 160 | tr_debug("job already has id %d", job->GetID()); |
sgnezdov | 2:9bf5366ad5a2 | 161 | } |
sgnezdov | 2:9bf5366ad5a2 | 162 | node<Appointment> *tmp = _timeline.insertOrdered(req->apt, descendingTimeline); |
sgnezdov | 2:9bf5366ad5a2 | 163 | if (NULL == tmp) { |
sgnezdov | 12:684ddfc57199 | 164 | tr_error("[process] timeline insert failed for job ID %d", job->GetID()); |
sgnezdov | 2:9bf5366ad5a2 | 165 | action->resQueue.put(NULL); |
sgnezdov | 3:f08f55827736 | 166 | // internal state has not changed |
sgnezdov | 2:9bf5366ad5a2 | 167 | return; |
sgnezdov | 2:9bf5366ad5a2 | 168 | } |
sgnezdov | 3:f08f55827736 | 169 | req->response.data = job->GetID(); |
sgnezdov | 10:8cff30b5b90d | 170 | //tr_debug("[process] simulate error"); |
sgnezdov | 2:9bf5366ad5a2 | 171 | //action->resQueue.put(NULL); |
sgnezdov | 1:ec6a1d054065 | 172 | action->resQueue.put(&req->response); |
sgnezdov | 1:ec6a1d054065 | 173 | break; |
sgnezdov | 1:ec6a1d054065 | 174 | } |
sgnezdov | 3:f08f55827736 | 175 | case JobRunAT: { |
sgnezdov | 10:8cff30b5b90d | 176 | tr_debug("[process] JobRunAT"); |
sgnezdov | 3:f08f55827736 | 177 | // execute job run logic after switch |
sgnezdov | 3:f08f55827736 | 178 | break; |
sgnezdov | 3:f08f55827736 | 179 | } |
sgnezdov | 7:98c8b2eabea3 | 180 | case JobQuitAT: { |
sgnezdov | 10:8cff30b5b90d | 181 | tr_debug("[process] JobQuitAT"); |
sgnezdov | 7:98c8b2eabea3 | 182 | action->resQueue.put(NULL); |
sgnezdov | 7:98c8b2eabea3 | 183 | _quitUpdater = true; |
sgnezdov | 7:98c8b2eabea3 | 184 | return; |
sgnezdov | 7:98c8b2eabea3 | 185 | } |
sgnezdov | 15:6b8fa5dff770 | 186 | case JobListAT: { |
sgnezdov | 15:6b8fa5dff770 | 187 | tr_debug("[process] JobListAT"); |
sgnezdov | 15:6b8fa5dff770 | 188 | JobListReq *req = static_cast<JobListReq*>(action); |
sgnezdov | 15:6b8fa5dff770 | 189 | int loc = 1; |
sgnezdov | 15:6b8fa5dff770 | 190 | node<Appointment>* n = _timeline.pop(loc); |
sgnezdov | 15:6b8fa5dff770 | 191 | while (n != NULL) { |
sgnezdov | 15:6b8fa5dff770 | 192 | Job* job = n->data->GetJob(); |
sgnezdov | 15:6b8fa5dff770 | 193 | tr_debug("[process] adding job ID %d to list", job->GetID()); |
sgnezdov | 15:6b8fa5dff770 | 194 | req->_jobsToFill.append(job); |
sgnezdov | 15:6b8fa5dff770 | 195 | ++loc; |
sgnezdov | 15:6b8fa5dff770 | 196 | n = _timeline.pop(loc); |
sgnezdov | 15:6b8fa5dff770 | 197 | } |
sgnezdov | 15:6b8fa5dff770 | 198 | action->resQueue.put(NULL); |
sgnezdov | 15:6b8fa5dff770 | 199 | return; |
sgnezdov | 15:6b8fa5dff770 | 200 | } |
sgnezdov | 1:ec6a1d054065 | 201 | default: |
sgnezdov | 10:8cff30b5b90d | 202 | tr_warn("[process] unknown action type"); |
sgnezdov | 1:ec6a1d054065 | 203 | action->resQueue.put(NULL); |
sgnezdov | 1:ec6a1d054065 | 204 | } |
sgnezdov | 3:f08f55827736 | 205 | node<Appointment> *wakeNode = _timeline.pop(1); |
sgnezdov | 12:684ddfc57199 | 206 | if (wakeNode == NULL) { |
sgnezdov | 12:684ddfc57199 | 207 | tr_debug("[process] found no nodes to run"); |
sgnezdov | 12:684ddfc57199 | 208 | return; |
sgnezdov | 12:684ddfc57199 | 209 | } |
sgnezdov | 3:f08f55827736 | 210 | Appointment *wakeApt = wakeNode->data; |
sgnezdov | 3:f08f55827736 | 211 | Job* wakeJob = wakeApt->GetJob(); |
sgnezdov | 5:d8f69ac330f2 | 212 | if (now < wakeApt->GetTime()) { |
sgnezdov | 3:f08f55827736 | 213 | // request wake up |
sgnezdov | 5:d8f69ac330f2 | 214 | time_t sleepTime = wakeApt->GetTime() - now; |
sgnezdov | 10:8cff30b5b90d | 215 | tr_debug("[process] job %d wake up in %d seconds", wakeJob->GetID(), sleepTime); |
sgnezdov | 3:f08f55827736 | 216 | WakeOnce.attach(callback(this, &Scheduler::onWakeOnce), sleepTime); |
sgnezdov | 3:f08f55827736 | 217 | } else { |
sgnezdov | 3:f08f55827736 | 218 | // process job |
sgnezdov | 10:8cff30b5b90d | 219 | tr_debug("[process] job ID %d ready to run", wakeJob->GetID()); |
sgnezdov | 3:f08f55827736 | 220 | _timeline.remove(1); |
sgnezdov | 4:78bcd5a675e1 | 221 | _runs.put(wakeApt); |
sgnezdov | 12:684ddfc57199 | 222 | // make sure we run this function one more time |
sgnezdov | 12:684ddfc57199 | 223 | // in case there are jobs left to run |
sgnezdov | 12:684ddfc57199 | 224 | // |
sgnezdov | 12:684ddfc57199 | 225 | // don't use WakeOnce, because it appears to work only for |
sgnezdov | 12:684ddfc57199 | 226 | // one wake up request at a time. |
sgnezdov | 12:684ddfc57199 | 227 | _updates.put(&jobRunReq); |
sgnezdov | 3:f08f55827736 | 228 | } |
sgnezdov | 4:78bcd5a675e1 | 229 | } |
sgnezdov | 4:78bcd5a675e1 | 230 | |
sgnezdov | 4:78bcd5a675e1 | 231 | void Scheduler::runHandler() { |
sgnezdov | 7:98c8b2eabea3 | 232 | while (!_quitRunner) { |
sgnezdov | 10:8cff30b5b90d | 233 | tr_debug("[runHandler] waiting for action"); |
sgnezdov | 4:78bcd5a675e1 | 234 | // wait forever ... |
sgnezdov | 4:78bcd5a675e1 | 235 | osEvent evt = _runs.get(); |
sgnezdov | 4:78bcd5a675e1 | 236 | if (evt.status == osEventMessage) { |
sgnezdov | 10:8cff30b5b90d | 237 | tr_debug("[runHandler] run action"); |
sgnezdov | 4:78bcd5a675e1 | 238 | Appointment *apt = (Appointment*)evt.value.p; |
sgnezdov | 7:98c8b2eabea3 | 239 | if (NULL == apt) { |
sgnezdov | 10:8cff30b5b90d | 240 | tr_debug("[runHandler] quit requested"); |
sgnezdov | 7:98c8b2eabea3 | 241 | _quitRunner = true; |
sgnezdov | 7:98c8b2eabea3 | 242 | break; |
sgnezdov | 7:98c8b2eabea3 | 243 | } |
sgnezdov | 5:d8f69ac330f2 | 244 | Job *job = apt->GetJob(); |
sgnezdov | 10:8cff30b5b90d | 245 | JobType *jt = _jobService->GetJob(job->GetTypeID()); |
sgnezdov | 10:8cff30b5b90d | 246 | if (jt == NULL) { |
sgnezdov | 10:8cff30b5b90d | 247 | tr_error("[runHandler] NO FUNC for job type id %d", job->GetTypeID()); |
sgnezdov | 5:d8f69ac330f2 | 248 | // NO reschedule |
sgnezdov | 5:d8f69ac330f2 | 249 | delete apt; |
sgnezdov | 5:d8f69ac330f2 | 250 | continue; |
sgnezdov | 5:d8f69ac330f2 | 251 | } |
sgnezdov | 10:8cff30b5b90d | 252 | tr_debug("Job Started"); |
sgnezdov | 10:8cff30b5b90d | 253 | jt->RunJob(); |
sgnezdov | 10:8cff30b5b90d | 254 | tr_debug("Job Finished"); |
sgnezdov | 5:d8f69ac330f2 | 255 | this->reschedule(apt); |
sgnezdov | 4:78bcd5a675e1 | 256 | } else { |
sgnezdov | 10:8cff30b5b90d | 257 | tr_error("[runHandler] NOT osEventMessage"); |
sgnezdov | 4:78bcd5a675e1 | 258 | } |
sgnezdov | 4:78bcd5a675e1 | 259 | } |
sgnezdov | 4:78bcd5a675e1 | 260 | } |
sgnezdov | 3:f08f55827736 | 261 | |
sgnezdov | 0:806403f3d0d1 | 262 | |
sgnezdov | 0:806403f3d0d1 | 263 | } |