source: nscp/modules/Scheduler/simple_scheduler.cpp @ e11d494

0.4.00.4.10.4.2
Last change on this file since e11d494 was acf0660, checked in by Michael Medin <michael@…>, 3 years ago

renamed to_string as it clashed with some boost classes, as well as refactoring of the cmake build scripts to work better..

  • Property mode set to 100644
File size: 4.5 KB
Line 
1#include "simple_scheduler.hpp"
2
3#include <boost/bind.hpp>
4#include <strEx.h>
5#include <unicode_char.hpp>
6
7using namespace nscp::helpers;
8
9namespace scheduler {
10
11        int simple_scheduler::add_task(target item) {
12                {
13                        boost::mutex::scoped_lock l(mutex_);
14                        item.id = ++target_id_;
15                        targets_[item.id] = item;
16                }
17                reschedule(item);
18                return item.id;
19        }
20        void simple_scheduler::remove_task(int id) {
21                boost::mutex::scoped_lock l(mutex_);
22                target_list_type::iterator it = targets_.find(id);
23                targets_.erase(it);
24        }
25        boost::optional<target> simple_scheduler::get_task(int id) {
26                boost::mutex::scoped_lock l(mutex_);
27                target_list_type::iterator it = targets_.find(id);
28                if (it == targets_.end())
29                        return boost::optional<target>();
30                return boost::optional<target>((*it).second);
31        }
32
33        void simple_scheduler::start() {
34                running_ = true;
35                start_thread();
36        }
37        void simple_scheduler::stop() {
38                running_ = false;
39                stop_requested_ = true;
40                threads_.interrupt_all();
41                threads_.join_all();
42        }
43
44        void simple_scheduler::start_thread() {
45                if (!running_)
46                        return;
47                stop_requested_ = false;
48                int missing_threads = thread_count_ - threads_.size();
49                if (missing_threads > 0 && missing_threads <= thread_count_) {
50                        for (int i=0;i<missing_threads;i++) {
51                                //std::wcout << _T("***START_THREAD: ") << threads_.size() << std::endl;
52                                threads_.create_thread(boost::bind(&simple_scheduler::thread_proc, this, i));
53                        }
54                }
55                threads_.create_thread(boost::bind(&simple_scheduler::watch_dog, this, 0));
56                //thread_ = boost::shared_ptr<boost::thread>(new boost::thread(boost::bind(&simple_scheduler::thread_proc, this)));
57        }
58
59        void simple_scheduler::watch_dog(int id) {
60
61                schedule_queue_type::value_type instance;
62                while(!stop_requested_) {
63                        instance = queue_.top();
64                        if (instance) {
65                                boost::posix_time::time_duration off = now() - (*instance).time;
66                                if (off.total_seconds() > error_threshold_) {
67                                        log_error(_T("NOONE IS HANDLING scheduled item ") + to_wstring((*instance).schedule_id) + _T(" ") + to_wstring(off.total_seconds()) + _T(" seconds to late from thread ") + to_wstring(id));
68                                }
69                        } else {
70                                log_error(_T("Nothing is scheduled to run"));
71                        }
72
73                        // add support for checking queue length
74                        boost::thread::sleep(boost::get_system_time() + boost::posix_time::seconds(5));
75                }
76
77        }
78
79        void simple_scheduler::thread_proc(int id) {
80                int iteration = 0;
81                schedule_queue_type::value_type instance;
82                while (!stop_requested_) {
83                        instance = queue_.pop();
84                        if (!instance) {
85                                boost::unique_lock<boost::mutex> lock(idle_thread_mutex_);
86                                idle_thread_cond_.wait(lock);
87                                continue;
88                        }
89
90                        try {
91                                boost::posix_time::time_duration off = now() - (*instance).time;
92                                if (off.total_seconds() > error_threshold_) {
93                                        log_error(_T("Ran scheduled item ") + to_wstring((*instance).schedule_id) + _T(" ") + to_wstring(off.total_seconds()) + _T(" seconds to late from thread ") + to_wstring(id));
94                                }
95                                boost::thread::sleep((*instance).time);
96                        } catch (boost::thread_interrupted  &e) {
97                                if (!queue_.push(*instance))
98                                        log_error(_T("ERROR"));
99                                if (stop_requested_) {
100                                        log_error(_T("Terminating thread: ") + to_wstring(id));
101                                        return;
102                                }
103                                continue;
104                        } catch (...) {
105                                if (!queue_.push(*instance))
106                                        log_error(_T("ERROR"));
107                                continue;
108                        }
109
110                        boost::posix_time::ptime now_time = now();
111                        boost::optional<target> item = get_task((*instance).schedule_id);
112                        if (item) {
113                                try {
114                                        if (handler_)
115                                                handler_->handle_schedule(*item);
116                                        reschedule(*item,now_time);
117                                } catch (...) {
118                                        log_error(_T("UNKNOWN ERROR RUNING TASK: "));
119                                        reschedule(*item);
120                                }
121                        } else {
122                                log_error(_T("Task not found: ") + to_wstring((*instance).schedule_id));
123                        }
124                }
125        }
126
127        void simple_scheduler::reschedule(target item) {
128                reschedule_wnext(item, now() + boost::posix_time::seconds(rand()%item.duration.total_seconds()));
129        }
130        void simple_scheduler::reschedule(target item, boost::posix_time::ptime now) {
131                reschedule_wnext(item, now + item.duration);
132        }
133        void simple_scheduler::reschedule_wnext(target item, boost::posix_time::ptime next) {
134                schedule_instance instance;
135                instance.schedule_id = item.id;
136                instance.time = next;
137                if (!queue_.push(instance)) {
138                        log_error(_T("ERROR"));
139                }
140                idle_thread_cond_.notify_one();
141        }
142
143
144        void simple_scheduler::log_error(std::wstring err) {
145                if (handler_)
146                        handler_->on_error(err);
147        }
148
149}
150
151
152
Note: See TracBrowser for help on using the repository browser.