This source file includes the following definitions.
- OnEvent
- OnShutdown
- OnRegistration
- OnModification
- OnUnregistration
- in_shutdown_
- CleanupFDToCBMap
- CleanupTimeToAlarmCBMap
- AddToReadyList
- RemoveFromReadyList
- RegisterFD
- GetFlags
- SetNonblocking
- epoll_wait_impl
- RegisterFDForWrite
- RegisterFDForReadWrite
- RegisterFDForRead
- UnregisterFD
- ModifyCallback
- StopRead
- StartRead
- StopWrite
- StartWrite
- HandleEvent
- WaitForEventsAndExecuteCallbacks
- SetFDReady
- SetFDNotReady
- IsFDReady
- VerifyReadyList
- RegisterAlarm
- UnregisterAlarm
- NumFDsRegistered
- Wake
- NowInUsec
- ApproximateNowInUsec
- EventMaskToString
- LogStateOnCrash
- DelFD
- AddFD
- ModFD
- ModifyFD
- WaitForEventsAndCallHandleEvents
- CallReadyListCallbacks
- CallAndReregisterAlarmEvents
- registered_
- OnAlarm
- OnRegistration
- OnUnregistration
- OnShutdown
- UnregisterIfRegistered
#include "net/tools/epoll_server/epoll_server.h"

#include <errno.h>
#include <stdlib.h>
#include <unistd.h>

#include <algorithm>
#include <limits>
#include <utility>
#include <vector>

#include "base/logging.h"
#include "base/timer/timer.h"
// Size of the stack buffers handed to strerror_r() when turning an errno
// value into text for a fatal log message. (A namespace-scope const int
// already has internal linkage in C++, so no `static` is needed.)
const int kErrorBufferSize = 256;
namespace net {
// Callback installed on the read end of the server's wake-up pipe (see the
// EpollServer constructor). Its only job is to drain the bytes that Wake()
// writes so the descriptor stops reporting readable; every other
// notification is ignored.
class ReadPipeCallback : public EpollCallbackInterface {
 public:
  // Reads sizeof(int) bytes per call until read() returns 0 or an error
  // (e.g. EAGAIN once drained; RegisterFD makes the fd non-blocking).
  virtual void OnEvent(int fd, EpollEvent* event) OVERRIDE {
    DCHECK(event->in_events == EPOLLIN);
    int scratch;
    for (int bytes = 1; bytes > 0; ) {
      bytes = read(fd, &scratch, sizeof(scratch));
    }
  }

  // Nothing to do for the remaining lifecycle notifications.
  virtual void OnShutdown(EpollServer *eps, int fd) OVERRIDE {}
  virtual void OnRegistration(EpollServer*, int, int) OVERRIDE {}
  virtual void OnModification(int, int) OVERRIDE {}
  virtual void OnUnregistration(int, bool) OVERRIDE {}
};
// Creates the epoll instance, initializes both intrusive ready lists, and
// builds a self-pipe whose read end is registered with wake_cb_ so that a
// write from Wake() makes the epoll set report an event. Failure to create
// either the epoll fd or the pipe is fatal.
EpollServer::EpollServer()
    : epoll_fd_(epoll_create(1024)),  // size hint; must be > 0 per epoll_create(2)
      timeout_in_us_(0),
      recorded_now_in_us_(0),
      ready_list_size_(0),
      wake_cb_(new ReadPipeCallback),
      read_fd_(-1),
      write_fd_(-1),
      in_wait_for_events_and_execute_callbacks_(false),
      in_shutdown_(false) {
  CHECK_NE(epoll_fd_, -1);
  LIST_INIT(&ready_list_);
  LIST_INIT(&tmp_list_);

  int wake_pipe[2];
  // pipe(2) returns 0 on success and -1 on failure.
  if (pipe(wake_pipe) != 0) {
    const int pipe_errno = errno;
    char err_buf[kErrorBufferSize];
    LOG(FATAL) << "Error " << pipe_errno
               << " in pipe(): "
               << strerror_r(pipe_errno, err_buf, sizeof(err_buf));
  }
  read_fd_ = wake_pipe[0];   // pipe(2): element 0 is the read end,
  write_fd_ = wake_pipe[1];  // element 1 is the write end.
  RegisterFD(read_fd_, wake_cb_.get(), EPOLLIN);
}
void EpollServer::CleanupFDToCBMap() {
FDToCBMap::iterator cb_iter = cb_map_.begin();
while (cb_iter != cb_map_.end()) {
int fd = cb_iter->fd;
CB* cb = cb_iter->cb;
cb_iter->in_use = true;
if (cb) {
cb->OnShutdown(this, fd);
}
cb_map_.erase(cb_iter);
cb_iter = cb_map_.begin();
}
}
void EpollServer::CleanupTimeToAlarmCBMap() {
TimeToAlarmCBMap::iterator erase_it;
for (TimeToAlarmCBMap::iterator i = alarm_map_.begin();
i != alarm_map_.end();
) {
i->second->OnShutdown(this);
erase_it = i;
++i;
alarm_map_.erase(erase_it);
}
}
// Tears the server down: notifies fd callbacks, resets the ready-list heads
// (their nodes were just erased), notifies alarms, and finally closes the
// wake pipe and epoll descriptors owned by this object.
EpollServer::~EpollServer() {
  DCHECK_EQ(in_shutdown_, false);
  in_shutdown_ = true;
#ifdef EPOLL_SERVER_EVENT_TRACING
  LOG(INFO) << "\n" << event_recorder_;
#endif
  VLOG(2) << "Shutting down epoll server ";
  CleanupFDToCBMap();
  LIST_INIT(&ready_list_);
  LIST_INIT(&tmp_list_);
  CleanupTimeToAlarmCBMap();

  const int owned_fds[] = { read_fd_, write_fd_, epoll_fd_ };
  for (size_t i = 0; i < sizeof(owned_fds) / sizeof(owned_fds[0]); ++i) {
    close(owned_fds[i]);
  }
}
// Pushes |cb_and_mask| onto the head of ready_list_ and bumps
// ready_list_size_, unless it is already linked into a ready list (a
// non-NULL entry.le_prev marks membership), in which case nothing happens.
inline void EpollServer::AddToReadyList(CBAndEventMask* cb_and_mask) {
  if (cb_and_mask->entry.le_prev != NULL) {
    return;  // Already queued.
  }
  LIST_INSERT_HEAD(&ready_list_, cb_and_mask, entry);
  ++ready_list_size_;
}
// Unlinks |cb_and_mask| from whichever ready list holds it (ready_list_ or
// tmp_list_; LIST_REMOVE only needs the element's own links), clears its
// le_prev so it reads as "not queued", and decrements ready_list_size_.
// A no-op for entries that are not queued. When the count reaches zero both
// list heads are DCHECKed to be empty.
inline void EpollServer::RemoveFromReadyList(
    const CBAndEventMask& cb_and_mask) {
  if (cb_and_mask.entry.le_prev == NULL) {
    return;  // Not on any ready list.
  }
  LIST_REMOVE(&cb_and_mask, entry);
  cb_and_mask.entry.le_prev = NULL;
  --ready_list_size_;
  if (ready_list_size_ != 0) {
    return;
  }
  DCHECK(ready_list_.lh_first == NULL);
  DCHECK(tmp_list_.lh_first == NULL);
}
void EpollServer::RegisterFD(int fd, CB* cb, int event_mask) {
CHECK(cb);
VLOG(3) << "RegisterFD fd=" << fd << " event_mask=" << event_mask;
FDToCBMap::iterator fd_i = cb_map_.find(CBAndEventMask(NULL, 0, fd));
if (cb_map_.end() != fd_i) {
CB* other_cb = fd_i->cb;
if (other_cb) {
RemoveFromReadyList(*fd_i);
other_cb->OnUnregistration(fd, true);
ModFD(fd, event_mask);
} else {
AddFD(fd, event_mask);
}
fd_i->cb = cb;
fd_i->event_mask = event_mask;
fd_i->events_to_fake = 0;
} else {
AddFD(fd, event_mask);
cb_map_.insert(CBAndEventMask(cb, event_mask, fd));
}
SetNonblocking(fd);
cb->OnRegistration(this, fd, event_mask);
}
// Returns |fd|'s file status flags via fcntl(F_GETFL), or -1 with errno set
// on failure.
int EpollServer::GetFlags(int fd) {
  const int status_flags = fcntl(fd, F_GETFL, 0);
  return status_flags;
}
void EpollServer::SetNonblocking(int fd) {
int flags = GetFlags(fd);
if (flags == -1) {
int saved_errno = errno;
char buf[kErrorBufferSize];
LOG(FATAL) << "Error " << saved_errno
<< " doing fcntl(" << fd << ", F_GETFL, 0): "
<< strerror_r(saved_errno, buf, sizeof(buf));
}
if (!(flags & O_NONBLOCK)) {
int saved_flags = flags;
flags = SetFlags(fd, flags | O_NONBLOCK);
if (flags == -1) {
int saved_errno = errno;
char buf[kErrorBufferSize];
LOG(FATAL) << "Error " << saved_errno
<< " doing fcntl(" << fd << ", F_SETFL, " << saved_flags << "): "
<< strerror_r(saved_errno, buf, sizeof(buf));
}
}
}
// Thin pass-through to epoll_wait(2). Kept as its own method, presumably so
// it can be overridden in tests (NOTE(review): confirm it is virtual in the
// header).
int EpollServer::epoll_wait_impl(int epfd,
                                 struct epoll_event* events,
                                 int max_events,
                                 int timeout_in_ms) {
  const int ready_count = epoll_wait(epfd, events, max_events, timeout_in_ms);
  return ready_count;
}
// Convenience wrappers around RegisterFD() for the common interest sets.

// Writable notifications only.
void EpollServer::RegisterFDForWrite(int fd, CB* cb) {
  const int interest = EPOLLOUT;
  RegisterFD(fd, cb, interest);
}

// Both readable and writable notifications.
void EpollServer::RegisterFDForReadWrite(int fd, CB* cb) {
  const int interest = EPOLLIN | EPOLLOUT;
  RegisterFD(fd, cb, interest);
}

// Readable notifications only.
void EpollServer::RegisterFDForRead(int fd, CB* cb) {
  const int interest = EPOLLIN;
  RegisterFD(fd, cb, interest);
}
void EpollServer::UnregisterFD(int fd) {
FDToCBMap::iterator fd_i = cb_map_.find(CBAndEventMask(NULL, 0, fd));
if (cb_map_.end() == fd_i || fd_i->cb == NULL) {
return;
}
#ifdef EPOLL_SERVER_EVENT_TRACING
event_recorder_.RecordUnregistration(fd);
#endif
CB* cb = fd_i->cb;
RemoveFromReadyList(*fd_i);
DelFD(fd);
cb->OnUnregistration(fd, false);
if (!fd_i->in_use) {
cb_map_.erase(fd_i);
} else {
fd_i->cb = NULL;
fd_i->event_mask = 0;
fd_i->events_to_fake = 0;
}
}
// Replaces |fd|'s entire event mask: every bit is cleared, then |event_mask|
// is applied.
void EpollServer::ModifyCallback(int fd, int event_mask) {
  const int clear_all = ~0;
  ModifyFD(fd, clear_all, event_mask);
}

// Single-bit interest toggles; each delegates to ModifyFD(fd, remove, add).
void EpollServer::StopRead(int fd) {
  ModifyFD(fd, EPOLLIN, /* add_event= */ 0);
}

void EpollServer::StartRead(int fd) {
  ModifyFD(fd, /* remove_event= */ 0, EPOLLIN);
}

void EpollServer::StopWrite(int fd) {
  ModifyFD(fd, EPOLLOUT, /* add_event= */ 0);
}

void EpollServer::StartWrite(int fd) {
  ModifyFD(fd, /* remove_event= */ 0, EPOLLOUT);
}
// Records the events epoll reported for |fd| (overwriting events_asserted)
// and queues the entry on the ready list. Fds with no live callback are
// ignored.
void EpollServer::HandleEvent(int fd, int event_mask) {
#ifdef EPOLL_SERVER_EVENT_TRACING
  event_recorder_.RecordEpollEvent(fd, event_mask);
#endif
  FDToCBMap::iterator fd_i = cb_map_.find(CBAndEventMask(NULL, 0, fd));
  const bool has_live_cb = fd_i != cb_map_.end() && fd_i->cb != NULL;
  if (has_live_cb) {
    fd_i->events_asserted = event_mask;
    AddToReadyList(const_cast<CBAndEventMask*>(&*fd_i));
  }
}
// Scoped flag: requires *guarded_bool to be false (DCHECKed), sets it to true
// on construction and resets it to false on destruction. Used here as a
// recursion guard and to mark a callback entry as in use during dispatch.
class TrueFalseGuard {
 public:
  explicit TrueFalseGuard(bool* guarded_bool) : flag_(guarded_bool) {
    DCHECK(flag_ != NULL);
    DCHECK(!*flag_);
    *flag_ = true;
  }

  ~TrueFalseGuard() { *flag_ = false; }

 private:
  bool* flag_;
};
// One turn of the event loop: waits for fd events (bounded by timeout_in_us_
// and, when alarms exist, by the earliest alarm deadline), dispatches ready
// callbacks, then fires due alarms. Re-entrant calls are rejected with
// LOG(DFATAL). recorded_now_in_us_ is reset to 0 on exit so
// ApproximateNowInUsec() falls back to the live clock between turns.
void EpollServer::WaitForEventsAndExecuteCallbacks() {
  if (in_wait_for_events_and_execute_callbacks_) {
    LOG(DFATAL) <<
        "Attempting to call WaitForEventsAndExecuteCallbacks"
        " when an ancestor to the current function is already"
        " WaitForEventsAndExecuteCallbacks!";
    return;
  }
  TrueFalseGuard recursion_guard(&in_wait_for_events_and_execute_callbacks_);

  if (alarm_map_.empty()) {
    WaitForEventsAndCallHandleEvents(timeout_in_us_, events_, events_size_);
  } else {
    const int64 now_in_us = NowInUsec();
    const int64 next_alarm_time_in_us = alarm_map_.begin()->first;
    VLOG(4) << "next_alarm_time = " << next_alarm_time_in_us
            << " now = " << now_in_us
            << " timeout_in_us = " << timeout_in_us_;
    const int64 alarm_timeout_in_us = next_alarm_time_in_us - now_in_us;
    // Sleep until the alarm if it is due before the configured timeout, or
    // if the configured timeout is "forever" (negative). Overdue alarms
    // yield a zero wait.
    const bool alarm_bounds_wait =
        timeout_in_us_ < 0 || alarm_timeout_in_us < timeout_in_us_;
    const int64 wait_time_in_us =
        alarm_bounds_wait
            ? std::max(alarm_timeout_in_us, static_cast<int64>(0))
            : timeout_in_us_;
    VLOG(4) << "wait_time_in_us = " << wait_time_in_us;
    WaitForEventsAndCallHandleEvents(wait_time_in_us, events_, events_size_);
    CallAndReregisterAlarmEvents();
  }
  recorded_now_in_us_ = 0;
}
void EpollServer::SetFDReady(int fd, int events_to_fake) {
FDToCBMap::iterator fd_i = cb_map_.find(CBAndEventMask(NULL, 0, fd));
if (cb_map_.end() != fd_i && fd_i->cb != NULL) {
CBAndEventMask* cb_and_mask = const_cast<CBAndEventMask*>(&*fd_i);
cb_and_mask->events_to_fake = events_to_fake;
AddToReadyList(cb_and_mask);
}
}
// Dequeues |fd|'s entry from the ready list if it is known (no-op otherwise
// or if it is not queued).
void EpollServer::SetFDNotReady(int fd) {
  FDToCBMap::iterator fd_i = cb_map_.find(CBAndEventMask(NULL, 0, fd));
  if (fd_i == cb_map_.end()) {
    return;
  }
  RemoveFromReadyList(*fd_i);
}
// True when |fd| has a live callback and its entry is currently linked into
// a ready list.
bool EpollServer::IsFDReady(int fd) const {
  FDToCBMap::const_iterator fd_i = cb_map_.find(CBAndEventMask(NULL, 0, fd));
  if (fd_i == cb_map_.end() || fd_i->cb == NULL) {
    return false;
  }
  return fd_i->entry.le_prev != NULL;
}
void EpollServer::VerifyReadyList() const {
int count = 0;
CBAndEventMask* cur = ready_list_.lh_first;
for (; cur; cur = cur->entry.le_next) {
++count;
}
for (cur = tmp_list_.lh_first; cur; cur = cur->entry.le_next) {
++count;
}
CHECK_EQ(ready_list_size_, count) << "Ready list size does not match count";
}
// Schedules |ac| to fire at absolute time |timeout_time_in_us|. Registering
// an alarm that is already registered is fatal. The resulting map position
// is handed to ac->OnRegistration() as its unregistration token.
void EpollServer::RegisterAlarm(int64 timeout_time_in_us, AlarmCB* ac) {
  CHECK(ac);
  if (ContainsAlarm(ac)) {
    LOG(FATAL) << "Alarm already exists " << ac;
  }
  VLOG(4) << "RegisteringAlarm at : " << timeout_time_in_us;
  const TimeToAlarmCBMap::value_type scheduled(timeout_time_in_us, ac);
  TimeToAlarmCBMap::iterator position = alarm_map_.insert(scheduled);
  all_alarms_.insert(ac);
  ac->OnRegistration(position, this);
}
void EpollServer::UnregisterAlarm(const AlarmRegToken& iterator_token) {
AlarmCB* cb = iterator_token->second;
alarm_map_.erase(iterator_token);
all_alarms_.erase(cb);
cb->OnUnregistration();
}
// Number of entries in cb_map_ minus one; the subtracted entry is presumably
// the wake pipe's read end that the constructor registers. Note that entries
// kept alive with a NULL callback are still counted.
int EpollServer::NumFDsRegistered() const {
  DCHECK_GE(cb_map_.size(), 1u);
  const int entries = cb_map_.size();
  return entries - 1;
}
// Writes one byte to the wake pipe, making its read end readable so a
// blocked epoll_wait() returns; ReadPipeCallback drains the byte.
// NOTE(review): within this file write_fd_ is never made non-blocking, so
// this write could block if the pipe buffer fills before it is drained.
void EpollServer::Wake() {
  const char wake_byte = 'd';
  const int written = write(write_fd_, &wake_byte, 1);
  DCHECK_EQ(written, 1);
}
// Current wall-clock time as base::Time's internal microsecond value (its
// epoch is defined by base::Time, not necessarily the Unix epoch).
int64 EpollServer::NowInUsec() const {
  const base::Time now = base::Time::Now();
  return now.ToInternalValue();
}
// Cheap "now": the time sampled right after the most recent epoll_wait of
// the current loop turn, or a fresh NowInUsec() when no sample is recorded
// (recorded_now_in_us_ == 0).
int64 EpollServer::ApproximateNowInUsec() const {
  return recorded_now_in_us_ != 0 ? recorded_now_in_us_ : this->NowInUsec();
}
// Renders the epoll flag names set in |event_mask|, each followed by a space,
// in a fixed order (e.g. EPOLLIN|EPOLLOUT -> "EPOLLIN EPOLLOUT "). Unknown
// bits are ignored; an empty mask yields "".
std::string EpollServer::EventMaskToString(int event_mask) {
  static const struct {
    unsigned int flag;
    const char* label;
  } kFlagNames[] = {
    { EPOLLIN, "EPOLLIN " },
    { EPOLLPRI, "EPOLLPRI " },
    { EPOLLOUT, "EPOLLOUT " },
    { EPOLLRDNORM, "EPOLLRDNORM " },
    { EPOLLRDBAND, "EPOLLRDBAND " },
    { EPOLLWRNORM, "EPOLLWRNORM " },
    { EPOLLWRBAND, "EPOLLWRBAND " },
    { EPOLLMSG, "EPOLLMSG " },
    { EPOLLERR, "EPOLLERR " },
    { EPOLLHUP, "EPOLLHUP " },
    { EPOLLONESHOT, "EPOLLONESHOT " },
    { EPOLLET, "EPOLLET " },
  };
  const unsigned int bits = static_cast<unsigned int>(event_mask);
  std::string names;
  for (size_t i = 0; i < sizeof(kFlagNames) / sizeof(kFlagNames[0]); ++i) {
    if (bits & kFlagNames[i].flag) {
      names += kFlagNames[i].label;
    }
  }
  return names;
}
void EpollServer::LogStateOnCrash() {
LOG(ERROR) << "----------------------Epoll Server---------------------------";
LOG(ERROR) << "Epoll server " << this << " polling on fd " << epoll_fd_;
LOG(ERROR) << "timeout_in_us_: " << timeout_in_us_;
LOG(ERROR) << alarm_map_.size() << " alarms registered.";
for (TimeToAlarmCBMap::iterator it = alarm_map_.begin();
it != alarm_map_.end();
++it) {
const bool skipped =
alarms_reregistered_and_should_be_skipped_.find(it->second)
!= alarms_reregistered_and_should_be_skipped_.end();
LOG(ERROR) << "Alarm " << it->second << " registered at time " << it->first
<< " and should be skipped = " << skipped;
}
LOG(ERROR) << cb_map_.size() << " fd callbacks registered.";
for (FDToCBMap::iterator it = cb_map_.begin();
it != cb_map_.end();
++it) {
LOG(ERROR) << "fd: " << it->fd << " with mask " << it->event_mask
<< " registered with cb: " << it->cb;
}
LOG(ERROR) << "----------------------/Epoll Server--------------------------";
}
void EpollServer::DelFD(int fd) const {
struct epoll_event ee;
memset(&ee, 0, sizeof(ee));
#ifdef EPOLL_SERVER_EVENT_TRACING
event_recorder_.RecordFDMaskEvent(fd, 0, "DelFD");
#endif
if (epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &ee)) {
int saved_errno = errno;
char buf[kErrorBufferSize];
LOG(FATAL) << "Epoll set removal error for fd " << fd << ": "
<< strerror_r(saved_errno, buf, sizeof(buf));
}
}
void EpollServer::AddFD(int fd, int event_mask) const {
struct epoll_event ee;
memset(&ee, 0, sizeof(ee));
ee.events = event_mask | EPOLLERR | EPOLLHUP;
ee.data.fd = fd;
#ifdef EPOLL_SERVER_EVENT_TRACING
event_recorder_.RecordFDMaskEvent(fd, ee.events, "AddFD");
#endif
if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ee)) {
int saved_errno = errno;
char buf[kErrorBufferSize];
LOG(FATAL) << "Epoll set insertion error for fd " << fd << ": "
<< strerror_r(saved_errno, buf, sizeof(buf));
}
}
void EpollServer::ModFD(int fd, int event_mask) const {
struct epoll_event ee;
memset(&ee, 0, sizeof(ee));
ee.events = event_mask | EPOLLERR | EPOLLHUP;
ee.data.fd = fd;
#ifdef EPOLL_SERVER_EVENT_TRACING
event_recorder_.RecordFDMaskEvent(fd, ee.events, "ModFD");
#endif
VLOG(3) << "modifying fd= " << fd << " "
<< EventMaskToString(ee.events);
if (epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &ee)) {
int saved_errno = errno;
char buf[kErrorBufferSize];
LOG(FATAL) << "Epoll set modification error for fd " << fd << ": "
<< strerror_r(saved_errno, buf, sizeof(buf));
}
}
// Updates |fd|'s stored event mask (clear |remove_event| bits, then set
// |add_event| bits), pushes it to epoll, and notifies the callback via
// OnModification(). Unknown fds are logged at VLOG(2) and ignored; entries
// with no live callback are silently ignored.
void EpollServer::ModifyFD(int fd, int remove_event, int add_event) {
  FDToCBMap::iterator fd_i = cb_map_.find(CBAndEventMask(NULL, 0, fd));
  if (fd_i == cb_map_.end()) {
    VLOG(2) << "Didn't find the fd " << fd << "in internal structures";
    return;
  }
  if (fd_i->cb == NULL) {
    return;
  }
  int& event_mask = fd_i->event_mask;
  VLOG(3) << "fd= " << fd
          << " event_mask before: " << EventMaskToString(event_mask);
  event_mask = (event_mask & ~remove_event) | add_event;
  VLOG(3) << " event_mask after: " << EventMaskToString(event_mask);
  ModFD(fd, event_mask);
  fd_i->cb->OnModification(fd, event_mask);
}
// Performs one epoll_wait() and turns its results into callbacks.
//
// |timeout_in_us|:
//   0, or any value while the ready list is non-empty -> poll, never sleep;
//   < 0 -> block until an event arrives;
//   > 0 -> block for at least 1 ms. The value is converted to whole
//          milliseconds (rounded down) and clamped to the range of an int.
// Each returned event is queued via HandleEvent(), and recorded_now_in_us_ is
// refreshed. If the ready list is then non-empty, CallReadyListCallbacks()
// runs. Any epoll_wait() failure other than EINTR (or errno == 0) is fatal.
void EpollServer::WaitForEventsAndCallHandleEvents(int64 timeout_in_us,
                                                   struct epoll_event events[],
                                                   int events_size) {
  if (timeout_in_us == 0 || ready_list_.lh_first != NULL) {
    // Callbacks are already pending, so do not sleep at all.
    timeout_in_us = 0;
  } else if (timeout_in_us < 0) {
    LOG(INFO) << "Negative epoll timeout: " << timeout_in_us
              << "us; epoll will wait forever for events.";
    // -1000us / 1000 == -1ms, which epoll_wait() treats as "no timeout".
    timeout_in_us = -1000;
  } else if (timeout_in_us < 1000) {
    // Round sub-millisecond waits up so they do not truncate to a 0ms poll.
    timeout_in_us = 1000;
  }

  // epoll_wait() takes its timeout as an int number of milliseconds. A far
  // deadline (e.g. an alarm ~25+ days out while timeout_in_us_ is negative)
  // exceeds INT_MAX ms. Narrowing would wrap it to a negative value, which
  // means "wait forever", so the wait would not end at the deadline. Clamp it.
  const int64 kMaxTimeoutInMs = std::numeric_limits<int>::max();
  const int timeout_in_ms =
      static_cast<int>(std::min(timeout_in_us / 1000, kMaxTimeoutInMs));

  const int nfds = epoll_wait_impl(epoll_fd_,
                                   events,
                                   events_size,
                                   timeout_in_ms);
  // Capture errno immediately: the logging, tracing and clock calls below may
  // overwrite it before the error branch inspects it.
  const int wait_errno = errno;
  VLOG(3) << "nfds=" << nfds;
#ifdef EPOLL_SERVER_EVENT_TRACING
  event_recorder_.RecordEpollWaitEvent(timeout_in_ms, nfds);
#endif
  recorded_now_in_us_ = NowInUsec();

  if (nfds > 0) {
    for (int i = 0; i < nfds; ++i) {
      const int event_mask = events[i].events;
      const int fd = events[i].data.fd;
      HandleEvent(fd, event_mask);
    }
  } else if (nfds < 0) {
    // EINTR only means a signal interrupted the wait. errno == 0 is also
    // tolerated (presumably for epoll_wait_impl overrides that report failure
    // without setting errno).
    if (wait_errno != EINTR && wait_errno != 0) {
      char buf[kErrorBufferSize];
      LOG(FATAL) << "Error " << wait_errno << " in epoll_wait: "
                 << strerror_r(wait_errno, buf, sizeof(buf));
    }
  }

  if (ready_list_.lh_first) {
    CallReadyListCallbacks();
  }
}
// Dispatches OnEvent() to every entry that is on ready_list_ when this is
// called.
//
// The ready list is first moved wholesale onto tmp_list_. Entries that get
// re-queued while this loop runs (AddToReadyList only links entries whose
// le_prev is NULL, i.e. ones already unlinked below) therefore land on the
// now-empty ready_list_. They are handled on a later call, not in this loop.
void EpollServer::CallReadyListCallbacks() {
DCHECK(tmp_list_.lh_first == NULL);
// Only the head pointer moves, so the first node's back-pointer must be
// re-aimed at tmp_list_'s head for LIST_REMOVE to keep working.
std::swap(ready_list_.lh_first, tmp_list_.lh_first);
if (tmp_list_.lh_first) {
tmp_list_.lh_first->entry.le_prev = &tmp_list_.lh_first;
// One event object is reused; every field set below is reset per entry.
EpollEvent event(0, false);
while (tmp_list_.lh_first != NULL) {
DCHECK_GT(ready_list_size_, 0);
CBAndEventMask* cb_and_mask = tmp_list_.lh_first;
// Unlink before the callback so the callback may legitimately re-queue it.
RemoveFromReadyList(*cb_and_mask);
event.out_ready_mask = 0;
// Deliver real epoll events merged with any faked ones, then clear both.
event.in_events =
cb_and_mask->events_asserted | cb_and_mask->events_to_fake;
cb_and_mask->events_asserted = 0;
cb_and_mask->events_to_fake = 0;
{
// While in_use is true, UnregisterFD() blanks this node (cb = NULL)
// instead of erasing it, keeping cb_and_mask valid after OnEvent().
TrueFalseGuard in_use_guard(&(cb_and_mask->in_use));
cb_and_mask->cb->OnEvent(cb_and_mask->fd, &event);
}
if (cb_and_mask->cb == NULL) {
// The callback unregistered this fd; finish the deferred erase.
cb_map_.erase(*cb_and_mask);
} else if (event.out_ready_mask != 0) {
// The callback asked to run again: re-queue with those events faked.
cb_and_mask->events_to_fake = event.out_ready_mask;
AddToReadyList(cb_and_mask);
}
}
}
DCHECK(tmp_list_.lh_first == NULL);
}
// Fires every alarm whose time is <= recorded_now_in_us_ (the time sampled
// right after the last epoll_wait). It walks alarm_map_ from its first entry
// and stops at the first entry that is not yet due.
//
// Each fired alarm is removed from alarm_map_ and all_alarms_. If OnAlarm()
// returns a positive time, the alarm is registered again at that time.
// An alarm re-registered for a time that is already due is added to
// alarms_reregistered_and_should_be_skipped_. That makes this pass skip it
// rather than fire it again. The set is cleared at the end.
//
// NOTE(review): all_alarms_ is updated before OnAlarm(), but the alarm_map_
// entry |i| is erased only afterwards. An OnAlarm() that unregisters its own
// alarm would invalidate |i| before `erase_it = i; ++i;`. Confirm callers
// never do that.
// NOTE(review): alarms newly registered from inside OnAlarm() with a due
// time after the current position appear to be fired in this same pass.
void EpollServer::CallAndReregisterAlarmEvents() {
int64 now_in_us = recorded_now_in_us_;
DCHECK_NE(0, recorded_now_in_us_);
TimeToAlarmCBMap::iterator erase_it;
for (TimeToAlarmCBMap::iterator i = alarm_map_.begin();
i != alarm_map_.end();
) {
// Everything from here on is in the future.
if (i->first > now_in_us) {
break;
}
AlarmCB* cb = i->second;
// Skip alarms that already fired and re-armed themselves during this pass.
const bool added_in_this_round =
alarms_reregistered_and_should_be_skipped_.find(cb)
!= alarms_reregistered_and_should_be_skipped_.end();
if (added_in_this_round) {
++i;
continue;
}
// Dropping it from all_alarms_ first lets OnAlarm() (or the code below)
// register it again without tripping RegisterAlarm's duplicate check.
all_alarms_.erase(cb);
const int64 new_timeout_time_in_us = cb->OnAlarm();
// Advance before erasing so the loop iterator stays valid.
erase_it = i;
++i;
alarm_map_.erase(erase_it);
// A return value <= 0 means "do not reschedule".
if (new_timeout_time_in_us > 0) {
DVLOG(3) << "Reregistering alarm "
<< " " << cb
<< " " << new_timeout_time_in_us
<< " " << now_in_us;
if (new_timeout_time_in_us <= now_in_us) {
alarms_reregistered_and_should_be_skipped_.insert(cb);
}
RegisterAlarm(new_timeout_time_in_us, cb);
}
}
alarms_reregistered_and_should_be_skipped_.clear();
}
// A new alarm is attached to no server and has no pending registration.
EpollAlarm::EpollAlarm()
    : eps_(NULL),
      registered_(false) {
}

// Makes sure a destroyed alarm is not left behind in its server's alarm map.
EpollAlarm::~EpollAlarm() {
  UnregisterIfRegistered();
}
// Default firing behavior: the alarm is no longer registered, and the 0
// return value tells CallAndReregisterAlarmEvents() not to reschedule it
// (only results > 0 are re-registered).
int64 EpollAlarm::OnAlarm() {
  registered_ = false;
  return 0;
}

// Stores the server and the token needed to unregister later.
void EpollAlarm::OnRegistration(const EpollServer::AlarmRegToken& token,
                                EpollServer* eps) {
  DCHECK_EQ(false, registered_);
  eps_ = eps;
  token_ = token;
  registered_ = true;
}

// The server has already removed this alarm; just record that.
void EpollAlarm::OnUnregistration() {
  registered_ = false;
}

// The server is shutting down: forget it and the registration.
void EpollAlarm::OnShutdown(EpollServer* eps) {
  eps_ = NULL;
  registered_ = false;
}
// Cancels the pending registration, if any, via the owning server. The
// server's UnregisterAlarm() calls back into OnUnregistration(), which
// clears registered_.
void EpollAlarm::UnregisterIfRegistered() {
  if (registered_) {
    eps_->UnregisterAlarm(token_);
  }
}
}