Ignore:
Timestamp:
02/05/10 08:03:49 (3 years ago)
Author:
Michael Medin <michael@…>
Branches:
master, 0.4.0, 0.4.1, 0.4.2
Children:
f0e6036
Parents:
3080680
Message:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • modules/Scheduler/simple_scheduler.cpp

    r6822839 rc0d7e82  
    22 
    33#include <boost/bind.hpp> 
    4  
     4#include <strEx.h> 
    55#include <unicode_char.hpp> 
    66 
     
    3131  void simple_scheduler::start() { 
    3232    running_ = true; 
    33     if (!queue_.empty()) 
    34       start_thread(); 
     33    start_thread(); 
    3534  } 
    3635  void simple_scheduler::stop() { 
    3736    running_ = false; 
    38     //if (!thread_) 
    39     //  return; 
    4037    stop_requested_ = true; 
    4138    threads_.interrupt_all(); 
    4239    threads_.join_all(); 
    43     /* 
    44     if (!threads.join_all(boost::posix_time::seconds(5))) { 
    45       std::wcout << _T("FAILED TO TERMINATE!!!") << std::endl; 
    46     } else { 
    47       std::wcout << _T("THREAD TERMINATED NICELY!") << std::endl; 
    48     } 
    49     */ 
    5040  } 
    5141 
     
    5747    if (missing_threads > 0 && missing_threads <= thread_count_) { 
    5848      for (int i=0;i<missing_threads;i++) { 
    59         std::wcout << _T("***START_THREAD***") << std::endl; 
    60         threads_.create_thread(boost::bind(&simple_scheduler::thread_proc, this)); 
     49        //std::wcout << _T("***START_THREAD: ") << threads_.size() << std::endl; 
     50        threads_.create_thread(boost::bind(&simple_scheduler::thread_proc, this, i)); 
    6151      } 
    6252    } 
     53    threads_.create_thread(boost::bind(&simple_scheduler::watch_dog, this, 0)); 
    6354    //thread_ = boost::shared_ptr<boost::thread>(new boost::thread(boost::bind(&simple_scheduler::thread_proc, this))); 
    6455  } 
    6556 
    66   void simple_scheduler::thread_proc() { 
     57  void simple_scheduler::watch_dog(int id) { 
     58 
     59    schedule_queue_type::value_type instance; 
     60    while(!stop_requested_) { 
     61      instance = queue_.top(); 
     62      if (instance) { 
     63        boost::posix_time::time_duration off = now() - (*instance).time; 
     64        if (off.total_seconds() > error_threshold_) { 
     65          log_error(_T("NOONE IS HANDLING scheduled item ") + to_wstring((*instance).schedule_id) + _T(" ") + to_wstring(off.total_seconds()) + _T(" seconds to late from thread ") + to_wstring(id)); 
     66        } 
     67      } else { 
     68        log_error(_T("Nothing is scheduled to run")); 
     69      } 
     70 
     71      // add support for checking queue length 
     72      boost::thread::sleep(boost::get_system_time() + boost::posix_time::seconds(5)); 
     73    } 
     74 
     75  } 
     76 
     77  void simple_scheduler::thread_proc(int id) { 
    6778    int iteration = 0; 
    6879    schedule_queue_type::value_type instance; 
    6980    while (!stop_requested_) { 
    7081      instance = queue_.pop(); 
    71       if (!instance) 
    72         return; 
     82      if (!instance) { 
     83        boost::unique_lock<boost::mutex> lock(idle_thread_mutex_); 
     84        idle_thread_cond_.wait(lock); 
     85        continue; 
     86      } 
    7387 
    7488      try { 
    75  
    7689        boost::posix_time::time_duration off = now() - (*instance).time; 
    77         if (off.total_seconds() > 0) { 
    78           std::wcout << _T("MISSED IT!") << off.total_seconds() << std::endl; 
     90        if (off.total_seconds() > error_threshold_) { 
     91          log_error(_T("Ran scheduled item ") + to_wstring((*instance).schedule_id) + _T(" ") + to_wstring(off.total_seconds()) + _T(" seconds to late from thread ") + to_wstring(id)); 
    7992        } 
    8093        boost::thread::sleep((*instance).time); 
    8194      } catch (boost::thread_interrupted  &e) { 
    8295        if (!queue_.push(*instance)) 
    83           std::wcout << _T("ERROR") << std::endl; 
    84         if (stop_requested_) 
     96          log_error(_T("ERROR")); 
     97        if (stop_requested_) { 
     98          log_error(_T("Terminating thread: ") + to_wstring(id)); 
    8599          return; 
     100        } 
    86101        continue; 
    87102      } catch (...) { 
    88103        if (!queue_.push(*instance)) 
    89           std::wcout << _T("ERROR") << std::endl; 
    90         std::wcout << _T("ERROR!!!") << std::endl; 
    91         return; 
     104          log_error(_T("ERROR")); 
     105        continue; 
    92106      } 
    93107 
     
    100114          reschedule(*item,now_time); 
    101115        } catch (...) { 
    102           std::wcout << _T("UNKNOWN ERROR RUNING TASK: ") << std::endl; 
     116          log_error(_T("UNKNOWN ERROR RUNING TASK: ")); 
    103117          reschedule(*item); 
    104118        } 
    105119      } else { 
    106         std::wcout << _T("Task not found: ") << (*instance).schedule_id << std::endl; 
     120        log_error(_T("Task not found: ") + to_wstring((*instance).schedule_id)); 
    107121      } 
    108122    } 
     
    120134    instance.time = next; 
    121135    if (!queue_.push(instance)) { 
    122       std::wcout << _T("ERROR") << std::endl; 
     136      log_error(_T("ERROR")); 
    123137    } 
    124     start_thread(); 
     138    idle_thread_cond_.notify_one(); 
    125139  } 
     140 
     141 
     142  void simple_scheduler::log_error(std::wstring err) { 
     143    if (handler_) 
     144      handler_->on_error(err); 
     145  } 
     146 
    126147} 
    127148 
Note: See TracChangeset for help on using the changeset viewer.