Changeset c0d7e82 in nscp for modules/Scheduler/simple_scheduler.cpp
- Timestamp:
- 02/05/10 08:03:49 (3 years ago)
- Branches:
- master, 0.4.0, 0.4.1, 0.4.2
- Children:
- f0e6036
- Parents:
- 3080680
- File:
-
- 1 edited
-
modules/Scheduler/simple_scheduler.cpp (modified) (5 diffs)
Legend:
- Unmodified
- Added
- Removed
-
modules/Scheduler/simple_scheduler.cpp
r6822839 rc0d7e82 2 2 3 3 #include <boost/bind.hpp> 4 4 #include <strEx.h> 5 5 #include <unicode_char.hpp> 6 6 … … 31 31 void simple_scheduler::start() { 32 32 running_ = true; 33 if (!queue_.empty()) 34 start_thread(); 33 start_thread(); 35 34 } 36 35 void simple_scheduler::stop() { 37 36 running_ = false; 38 //if (!thread_)39 // return;40 37 stop_requested_ = true; 41 38 threads_.interrupt_all(); 42 39 threads_.join_all(); 43 /*44 if (!threads.join_all(boost::posix_time::seconds(5))) {45 std::wcout << _T("FAILED TO TERMINATE!!!") << std::endl;46 } else {47 std::wcout << _T("THREAD TERMINATED NICELY!") << std::endl;48 }49 */50 40 } 51 41 … … 57 47 if (missing_threads > 0 && missing_threads <= thread_count_) { 58 48 for (int i=0;i<missing_threads;i++) { 59 std::wcout << _T("***START_THREAD***") << std::endl;60 threads_.create_thread(boost::bind(&simple_scheduler::thread_proc, this ));49 //std::wcout << _T("***START_THREAD: ") << threads_.size() << std::endl; 50 threads_.create_thread(boost::bind(&simple_scheduler::thread_proc, this, i)); 61 51 } 62 52 } 53 threads_.create_thread(boost::bind(&simple_scheduler::watch_dog, this, 0)); 63 54 //thread_ = boost::shared_ptr<boost::thread>(new boost::thread(boost::bind(&simple_scheduler::thread_proc, this))); 64 55 } 65 56 66 void simple_scheduler::thread_proc() { 57 void simple_scheduler::watch_dog(int id) { 58 59 schedule_queue_type::value_type instance; 60 while(!stop_requested_) { 61 instance = queue_.top(); 62 if (instance) { 63 boost::posix_time::time_duration off = now() - (*instance).time; 64 if (off.total_seconds() > error_threshold_) { 65 log_error(_T("NOONE IS HANDLING scheduled item ") + to_wstring((*instance).schedule_id) + _T(" ") + to_wstring(off.total_seconds()) + _T(" seconds to late from thread ") + to_wstring(id)); 66 } 67 } else { 68 log_error(_T("Nothing is scheduled to run")); 69 } 70 71 // add support for checking queue length 72 boost::thread::sleep(boost::get_system_time() + boost::posix_time::seconds(5)); 73 } 74 75 } 76 77 void simple_scheduler::thread_proc(int id) { 67 78 int iteration = 0; 68 79 schedule_queue_type::value_type instance; 69 80 while (!stop_requested_) { 70 81 instance = queue_.pop(); 71 if (!instance) 72 return; 82 if (!instance) { 83 boost::unique_lock<boost::mutex> lock(idle_thread_mutex_); 84 idle_thread_cond_.wait(lock); 85 continue; 86 } 73 87 74 88 try { 75 76 89 boost::posix_time::time_duration off = now() - (*instance).time; 77 if (off.total_seconds() > 0) {78 std::wcout << _T("MISSED IT!") << off.total_seconds() << std::endl;90 if (off.total_seconds() > error_threshold_) { 91 log_error(_T("Ran scheduled item ") + to_wstring((*instance).schedule_id) + _T(" ") + to_wstring(off.total_seconds()) + _T(" seconds to late from thread ") + to_wstring(id)); 79 92 } 80 93 boost::thread::sleep((*instance).time); 81 94 } catch (boost::thread_interrupted &e) { 82 95 if (!queue_.push(*instance)) 83 std::wcout << _T("ERROR") << std::endl; 84 if (stop_requested_) 96 log_error(_T("ERROR")); 97 if (stop_requested_) { 98 log_error(_T("Terminating thread: ") + to_wstring(id)); 85 99 return; 100 } 86 101 continue; 87 102 } catch (...) { 88 103 if (!queue_.push(*instance)) 89 std::wcout << _T("ERROR") << std::endl; 90 std::wcout << _T("ERROR!!!") << std::endl; 91 return; 104 log_error(_T("ERROR")); 105 continue; 92 106 } 93 107 … … 100 114 reschedule(*item,now_time); 101 115 } catch (...) { 102 std::wcout << _T("UNKNOWN ERROR RUNING TASK: ") << std::endl;116 log_error(_T("UNKNOWN ERROR RUNING TASK: ")); 103 117 reschedule(*item); 104 118 } 105 119 } else { 106 std::wcout << _T("Task not found: ") << (*instance).schedule_id << std::endl;120 log_error(_T("Task not found: ") + to_wstring((*instance).schedule_id)); 107 121 } 108 122 } … … 120 134 instance.time = next; 121 135 if (!queue_.push(instance)) { 122 std::wcout << _T("ERROR") << std::endl;136 log_error(_T("ERROR")); 123 137 } 124 start_thread();138 idle_thread_cond_.notify_one(); 125 139 } 140 141 142 void simple_scheduler::log_error(std::wstring err) { 143 if (handler_) 144 handler_->on_error(err); 145 } 146 126 147 } 127 148
Note: See TracChangeset
for help on using the changeset viewer.








