source: nscp/modules/Scheduler/simple_scheduler.cpp @ 1bfe6f0

0.4.00.4.10.4.2
Last change on this file since 1bfe6f0 was 1bfe6f0, checked in by Michael Medin <michael@…>, 3 years ago

Finnished Scheduler and added basic "Notify" support (no sinks yet)

  • Property mode set to 100644
File size: 3.5 KB
Line 
1#include "simple_scheduler.hpp"
2#include <unicode_char.hpp>
3
4namespace scheduler {
5
6        int simple_scheduler::add_task(target item) {
7                {
8                        boost::mutex::scoped_lock l(mutex_);
9                        item.id = ++target_id_;
10                        targets_[item.id] = item;
11                }
12                reschedule(item);
13                return item.id;
14        }
15        void simple_scheduler::remove_task(int id) {
16                boost::mutex::scoped_lock l(mutex_);
17                target_list_type::iterator it = targets_.find(id);
18                targets_.erase(it);
19        }
20        boost::optional<target> simple_scheduler::get_task(int id) {
21                boost::mutex::scoped_lock l(mutex_);
22                target_list_type::iterator it = targets_.find(id);
23                if (it == targets_.end())
24                        return boost::optional<target>();
25                return boost::optional<target>((*it).second);
26        }
27
28        void simple_scheduler::start() {
29                running_ = true;
30                if (!queue_.empty())
31                        start_thread();
32        }
33        void simple_scheduler::stop() {
34                running_ = false;
35                //if (!thread_)
36                //      return;
37                stop_requested_ = true;
38                threads_.interrupt_all();
39                threads_.join_all();
40                /*
41                if (!threads.join_all(boost::posix_time::seconds(5))) {
42                        std::wcout << _T("FAILED TO TERMINATE!!!") << std::endl;
43                } else {
44                        std::wcout << _T("THREAD TERMINATED NICELY!") << std::endl;
45                }
46                */
47        }
48
49        void simple_scheduler::start_thread() {
50                if (!running_)
51                        return;
52                stop_requested_ = false;
53                int missing_threads = thread_count_ - threads_.size();
54                if (missing_threads > 0 && missing_threads <= thread_count_) {
55                        for (int i=0;i<missing_threads;i++) {
56                                std::wcout << _T("***START_THREAD***") << std::endl;
57                                threads_.create_thread(boost::bind(&simple_scheduler::thread_proc, this));
58                        }
59                }
60                //thread_ = boost::shared_ptr<boost::thread>(new boost::thread(boost::bind(&simple_scheduler::thread_proc, this)));
61        }
62
63        void simple_scheduler::thread_proc() {
64                int iteration = 0;
65                schedule_queue_type::value_type instance;
66                while (!stop_requested_) {
67                        instance = queue_.pop();
68                        if (!instance)
69                                return;
70
71                        try {
72
73                                boost::posix_time::time_duration off = now() - (*instance).time;
74                                if (off.total_seconds() > 0) {
75                                        std::wcout << _T("MISSED IT!") << off.total_seconds() << std::endl;
76                                }
77                                boost::thread::sleep((*instance).time);
78                        } catch (boost::thread_interrupted  &e) {
79                                if (!queue_.push(*instance))
80                                        std::wcout << _T("ERROR") << std::endl;
81                                if (stop_requested_)
82                                        return;
83                                continue;
84                        } catch (...) {
85                                if (!queue_.push(*instance))
86                                        std::wcout << _T("ERROR") << std::endl;
87                                std::wcout << _T("ERROR!!!") << std::endl;
88                                return;
89                        }
90
91                        boost::posix_time::ptime now_time = now();
92                        boost::optional<target> item = get_task((*instance).schedule_id);
93                        if (item) {
94                                try {
95                                        if (handler_)
96                                                handler_->handle_schedule(*item);
97                                        reschedule(*item,now_time);
98                                } catch (...) {
99                                        std::wcout << _T("UNKNOWN ERROR RUNING TASK: ") << std::endl;
100                                        reschedule(*item);
101                                }
102                        } else {
103                                std::wcout << _T("Task not found: ") << (*instance).schedule_id << std::endl;
104                        }
105                }
106        }
107
108        void simple_scheduler::reschedule(target item) {
109                reschedule_wnext(item, now() + boost::posix_time::seconds(rand()%item.duration.total_seconds()));
110        }
111        void simple_scheduler::reschedule(target item, boost::posix_time::ptime now) {
112                reschedule_wnext(item, now + item.duration);
113        }
114        void simple_scheduler::reschedule_wnext(target item, boost::posix_time::ptime next) {
115                schedule_instance instance;
116                instance.schedule_id = item.id;
117                instance.time = next;
118                if (!queue_.push(instance)) {
119                        std::wcout << _T("ERROR") << std::endl;
120                }
121                start_thread();
122        }
123}
124
125
126
Note: See TracBrowser for help on using the repository browser.