-
Notifications
You must be signed in to change notification settings - Fork 0
/
EventLoop.cpp
122 lines (100 loc) · 2.75 KB
/
EventLoop.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
//
// Created by Yuchen Qin on 2021/9/18.
#include "EventLoop.h"
#include "PollPoller.h"
#include "Channel.h"
#include <cerrno>
#include <iostream>
#include <memory>
#include <unistd.h>
// Per-thread pointer to the EventLoop registered for the current thread.
// Set by the EventLoop constructor, cleared by its destructor, and returned
// by EventLoop::getEventLoopOfCurrentThread().
__thread EventLoop *loopInThisThread{nullptr};

// Timeout value handed to poller_->poll() on every iteration of
// EventLoop::loop() (the name suggests milliseconds).
constexpr int kPollTimeMs = 10000;
// Constructs the event loop for the calling thread.
//
// poller: selects the poller backend. 0 selects PollPoller; any other value
//         is reported on stderr and also falls back to PollPoller, because
//         no other backend is available in this file and a null poller_
//         would be dereferenced by loop()/updateChannel().
//
// If a loop is already registered for this thread, abortNotInLoopThread()
// is called and nothing else is set up. Otherwise a pipe is created whose
// read end is watched through wakeupChannel_, so that wakeup() can make the
// poller return. The process exits with status 1 if the pipe cannot be
// created.
EventLoop::EventLoop(int poller): looping(false), poller_(nullptr),
threadId_(std::this_thread::get_id()), quit_(false) {
    if (loopInThisThread != nullptr) {
        abortNotInLoopThread();
        return;
    }

    // Not covered by the initializer list above; give it a defined value
    // before doPendingFunctors() toggles it.
    callingPendingFunctors_ = false;

    if (poller != 0) {
        std::cerr << "Unknown poller type " << poller
                  << ", falling back to PollPoller" << std::endl;
    }
    poller_.reset(new PollPoller(this));
    loopInThisThread = this;

    if (pipe(wakeupPipe) == -1) {
        std::cerr << "Error when piping" << std::endl;
        exit(1);
    }

    // Watch the read end of the pipe; handleRead() consumes the wakeup byte.
    wakeupChannel_.reset(new Channel(this, wakeupPipe[0]));
    wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
    wakeupChannel_->enableReading();
}
// Tears the loop down: unregisters the wakeup channel, closes both ends of
// the wakeup pipe, releases the poller and clears the per-thread loop
// pointer.
EventLoop::~EventLoop() {
    // wakeupChannel_ (and the pipe) only exist when the constructor got past
    // its "loop already registered for this thread" check.
    if (wakeupChannel_) {
        wakeupChannel_->disableAll();
        wakeupChannel_->remove();
        // The pipe is created with pipe() in the constructor and is not
        // closed anywhere else in this file.
        // NOTE(review): assumes Channel does not close its fd itself - confirm.
        ::close(wakeupPipe[0]);
        ::close(wakeupPipe[1]);
    }
    looping = false;
    quit_ = true;
    poller_.reset(nullptr);
    // Only clear the registration if it actually refers to this loop.
    if (loopInThisThread == this) {
        loopInThisThread = nullptr;
    }
}
// Returns the loop registered for the calling thread, or nullptr when the
// per-thread pointer has not been set.
EventLoop *EventLoop::getEventLoopOfCurrentThread() {
    EventLoop *const current = loopInThisThread;
    return current;
}
// Runs the loop until quit_ becomes true. Each iteration polls for active
// channels, dispatches handleEvent() on each of them, then runs the queued
// functors. Must not be entered while already looping.
void EventLoop::loop() {
    assert(!looping);
    assertInLoopThread();

    looping = true;
    quit_ = false;

    while (!quit_) {
        // Start every round with an empty result list for the poller to fill.
        activeChannels_.clear();
        poller_->poll(kPollTimeMs, &activeChannels_);

        for (const auto &readyChannel : activeChannels_) {
            readyChannel->handleEvent();
        }

        doPendingFunctors();
    }

    looping = false;
}
// Forwards a channel update to the poller. The channel must belong to this
// loop, and assertInLoopThread() is checked before forwarding.
void EventLoop::updateChannel(Channel *channel) {
    assert(channel->ownerLoop() == this);
    assertInLoopThread();

    auto &backend = *poller_;
    backend.updateChannel(channel);
}
// Runs cb on the loop.
//
// When isInLoopThread() is true the callback is invoked immediately.
// Otherwise a copy of cb is appended to pendingFunctors_ under mtx_ and the
// loop is woken up so that doPendingFunctors() picks it up.
void EventLoop::runInLoop(const std::function<void()> &cb) {
    if (isInLoopThread()) {
        cb();
        return;
    }

    {
        std::lock_guard<std::mutex> guard(mtx_);
        pendingFunctors_.push_back(cb);
    }

    // We are not on the loop thread here, so a wakeup is always required;
    // no further condition needs to be checked.
    wakeup();
}
// Executes every functor queued in pendingFunctors_. The queue is swapped
// into a local vector while holding mtx_, and the functors are invoked after
// the lock has been released. callingPendingFunctors_ is true for the
// duration of the call.
void EventLoop::doPendingFunctors() {
    std::vector<std::function<void()>> drained;
    callingPendingFunctors_ = true;

    {
        std::lock_guard<std::mutex> guard(mtx_);
        drained.swap(pendingFunctors_);
    }

    for (const auto &task : drained) {
        task();
    }

    callingPendingFunctors_ = false;
}
// Wakes the loop by writing a single byte to the write end of the wakeup
// pipe; the read end is watched by wakeupChannel_. The write is retried when
// interrupted by a signal, and any other failure is reported on stderr
// instead of being silently dropped.
void EventLoop::wakeup() {
    const char token = 'A';
    ssize_t written;
    do {
        written = ::write(wakeupPipe[1], &token, sizeof token);
    } while (written == -1 && errno == EINTR);

    if (written != 1) {
        // Capture errno before the stream operations can overwrite it.
        const int err = errno;
        std::cerr << "EventLoop::wakeup() write returned " << written
                  << " (errno " << err << ")" << std::endl;
    }
}
// Read callback of wakeupChannel_: consumes one wakeup byte from the read
// end of the wakeup pipe. The read is retried when interrupted by a signal,
// and any other outcome than one byte read is reported on stderr instead of
// being silently dropped.
void EventLoop::handleRead() {
    char token = 0;
    ssize_t consumed;
    do {
        consumed = ::read(wakeupPipe[0], &token, sizeof token);
    } while (consumed == -1 && errno == EINTR);

    if (consumed != 1) {
        // Capture errno before the stream operations can overwrite it.
        const int err = errno;
        std::cerr << "EventLoop::handleRead() read returned " << consumed
                  << " (errno " << err << ")" << std::endl;
    }
}
// Asks the poller whether it knows the given channel and returns its answer.
bool EventLoop::hasChannel(Channel *channel) {
    const bool registered = poller_->hasChannel(channel);
    return registered;
}
// Forwards the removal of a channel to the poller.
void EventLoop::removeChannel(Channel *channel) {
    auto &backend = *poller_;
    backend.removeChannel(channel);
}
//