123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296 |
- #ifndef _FY_HANDLER_HPP_
- #define _FY_HANDLER_HPP_
- #include <unistd.h>
- #include <errno.h>
- #include <string.h>
- #include <sys/eventfd.h>
- #include <sys/epoll.h>
- #include <pthread.h>
- #include <list>
- #include <system/Thread.h>
- #include "time.hpp"
- #include "utils/Log.h"
/**
 * Evaluates `exp` again for as long as it yields -1 with errno == EINTR and
 * produces the final result as the value of the whole expression.
 *
 * The result variable takes the type of `exp` (via __typeof__), so a wide
 * result such as the ssize_t returned by read()/write() is not narrowed to
 * int. Relies on the GNU statement-expression extension.
 */
#define TEMP_FAILURE_RETRY2(exp) ({ \
    __typeof__(exp) _rc; \
    do { \
        _rc = (exp); \
    } while (_rc == -1 && errno == EINTR); \
    _rc; })
- namespace base {
/* Declared ahead of message so that message can befriend it. */
template <typename T = void*>
class handler;

/**
 * A unit of work queued on a handler<T>.
 *
 * `what` is a caller-defined code and `obj` an optional payload of type T.
 * The private fields (due time, repeat period, attached runnable and its
 * user data) are written and read only by handler<T>, which is a friend.
 */
template <typename T = void*>
class message {
public:
    /**
     * Builds a message that carries only a code.
     * `obj` is value-initialized so that copying the message never reads an
     * indeterminate payload.
     */
    message(int what)
        : what(what), obj(), when_(0), period_(0), runable_(NULL), user_data_(NULL) {
    }

    /** Builds a message that carries a code and a payload. */
    message(int what, T obj)
        : what(what), obj(obj), when_(0), period_(0), runable_(NULL), user_data_(NULL) {
    }

    int what;
    T obj;

private:
    int64_t when_;           /* due time, on the clock handler<T> uses; 0 by default */
    int64_t period_;         /* repeat interval; 0 by default */
    void* runable_;          /* type-erased runnable callback, or NULL */
    const void* user_data_;  /* argument passed to the runnable */

    friend class handler<T>;
};
- template <typename T>
- class handler {
- public:
- typedef void (*func_message_handler)(const message<T>* msg, const void* user_data);
- typedef void (*func_runable)(const void* user_data);
- public:
- explicit handler(func_message_handler func_handler, const void* user_data)
- :looper_(this), user_data_((void*)user_data) {
- initialize();
- message_handler_ = func_handler;
- }
- explicit handler():looper_(this), user_data_(NULL) {
- initialize();
- }
- ~handler() {
- exit_ = true;
- wake();
- looper_.requestExitAndWait();
- close(epoll_fd_);
- close(wake_event_fd_);
- message_queue_.clear();
- }
- /**
- * typedef void (*func_message_handler)(const message* msg, const void* user_data);
- */
- void set_message_handler(func_message_handler func, const void* user_data) {
- message_handler_ = func;
- user_data_ = (void*)user_data;
- }
- void send_message(int what) {
- message<T> msg(what);
- enqueue_with_lock(msg);
- }
- void send_message(message<T> &msg) {
- enqueue_with_lock(msg);
- }
- void send_message_delayed(int what, int delay_millis) {
- message<T> msg(what);
- msg.when_ = base::time::uptime() + delay_millis;
- enqueue_with_lock(msg);
- }
- void send_message_delayed(message<T> &msg, int delay_millis) {
- msg.when_ = base::time::uptime() + delay_millis;
- enqueue_with_lock(msg);
- }
- void post(func_runable runable, const void* user_data) {
- message<T> msg(0);
- msg.runable_ = (void*)runable;
- msg.user_data_ = user_data;
- enqueue_with_lock(msg);
- }
- /**
- * typedef void (*func_runable)(const void* user_data);
- */
- void post_delayed(func_runable runable, const void* user_data, int delay_millis) {
- message<T> msg(0);
- msg.runable_ = (void*)runable;
- msg.user_data_ = user_data;
- msg.when_ = base::time::uptime() + delay_millis;
- enqueue_with_lock(msg);
- }
- void schedule(int what, int period_millis, int delay_millis) {
- message<T> msg(what);
- msg.when_ = base::time::uptime() + delay_millis;
- msg.period_ = period_millis;
- enqueue_with_lock(msg);
- }
- void schedule(message<T> &msg, int period_millis, int delay_millis) {
- msg.when_ = base::time::uptime() + delay_millis;
- msg.period_ = period_millis;
- enqueue_with_lock(msg);
- }
- void remove_messages(int what) {
- Mutex::Autolock lock(mutex_);
- typename std::list<message<T> >::iterator it;
- for (it = message_queue_.begin(); it != message_queue_.end(); ) {
- if (it->what == what) {
- it = message_queue_.erase(it);
- } else {
- ++it;
- }
- }
- }
- typedef bool (*remove_filter)(const message<T>& msg, void* user_data);
- void remove_messages(remove_filter filter, void* user_data) {
- Mutex::Autolock lock(mutex_);
- typename std::list<message<T> >::iterator it;
- for (it = message_queue_.begin(); it != message_queue_.end(); ) {
- if (filter(*it, user_data)) {
- it = message_queue_.erase(it);
- } else {
- ++it;
- }
- }
- }
- bool has_message(int what) {
- Mutex::Autolock lock(mutex_);
- typename std::list<message<T> >::iterator it;
- for (it = message_queue_.begin(); it != message_queue_.end(); ) {
- if (it->what == what) {
- return true;
- }
- }
- return false;
- }
- private:
- void initialize() {
- init_ = false;
- wake_event_fd_ = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
- message_handler_ = NULL;
- const int kEpollSizeHint = 1;
- epoll_fd_ = epoll_create(kEpollSizeHint);
- if (epoll_fd_ < 0) {
- LOGE("%s %d epoll_create failed", __func__, __LINE__);
- return;
- }
- struct epoll_event eventItem;
- memset(&eventItem, 0, sizeof(epoll_event));
- eventItem.events = EPOLLIN | EPOLLET;
- eventItem.data.fd = wake_event_fd_;
- int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, wake_event_fd_, &eventItem);
- if (result < 0) {
- LOGE("%s %d epoll_ctl failed", __func__, __LINE__);
- return;
- }
- exit_ = false;
- init_ = true;
- looper_.run("sup.handler");
- }
- void loop() {
- while (!exit_) {
- if (message_queue_.empty()) {
- wait(10 * 1000);
- awoken();
- continue;
- }
- mutex_.lock();
- base::message<T> front = message_queue_.front();
- int64_t now = base::time::uptime();
- if (front.when_ <= now) {
- message_queue_.pop_front();
- if (front.period_ > 0) {
- front.when_ = base::time::uptime() + front.period_;
- enqueue(front);
- }
- mutex_.unlock();
- if (front.runable_) {
- ((func_runable)front.runable_)(front.user_data_);
- } else if (message_handler_) {
- message_handler_(&front, user_data_);
- }
- continue;
- }
- mutex_.unlock();
- wait(front.when_ - now);
- awoken();
- }
- }
- void wait(int timeout_millis) {
- const int kEpollMaxEvents = 1;
- struct epoll_event eventItems[kEpollMaxEvents];
- int event_count = epoll_wait(epoll_fd_, eventItems, kEpollMaxEvents, timeout_millis);
- if (event_count < 0) {
- LOGE("%s %d epoll_wait failed", __func__, __LINE__);
- }
- }
- void wake() {
- uint64_t inc = 1;
- ssize_t nWrite = TEMP_FAILURE_RETRY2(write(wake_event_fd_, &inc, sizeof(uint64_t)));
- if (nWrite != sizeof(uint64_t)) {
- if (errno != EAGAIN) {
- LOGE("Could not write wake signal, errno=%d", errno);
- }
- }
- }
- void awoken() {
- uint64_t counter = 0;
- TEMP_FAILURE_RETRY2(read(wake_event_fd_, &counter, sizeof(uint64_t)));
- }
- void enqueue(const message<T> &msg) {
- typename std::list<message<T> >::iterator it;
- for (it = message_queue_.begin(); it != message_queue_.end(); ++it) {
- if (msg.when_ < it->when_) {
- it = message_queue_.insert(it, msg);
- if (it == message_queue_.begin()) {
- wake();
- }
- return;
- }
- }
- message_queue_.push_back(msg);
- if (message_queue_.size() == 1) {
- wake();
- }
- }
- void enqueue_with_lock(const message<T> &msg) {
- Mutex::Autolock lock(mutex_);
- enqueue(msg);
- }
- class Looper : public Thread {
- public:
- Looper(handler<T> *handler):handler_(handler){}
- virtual bool threadLoop() {
- handler_->loop();
- return false;
- }
- handler<T>* handler_;
- };
- Looper looper_;
- std::list<message<T> > message_queue_;
- func_message_handler message_handler_;
- void* user_data_;
- int wake_event_fd_;
- int epoll_fd_;
- bool init_;
- volatile bool exit_;
- Mutex mutex_;
- };
- } /* namespace base */
- #endif /* _FY_HANDLER_HPP_ */
|