libevent默认情况下是单线程,每位线程有且仅有一个event_base,对应一个structevent_base结构体,以及赋于其上的风波管理器,拿来安排托管给它的一系列的风波。
当有一个风波发生的时侯redflag linux,event_base会在合适的时间去调用绑定在这个风波上的函数,直至这个函数执行完成,之后在返回安排其他风波。须要注意的是:合适的时间并不是立刻。
比如:
struct event_base *base;
base = event_base_new();//初始化libevent
event_base_new对比epoll,可以理解为epoll里的epoll_create。
event_base内部有一个循环,循环阻塞在epoll调用上,当有一个风波发生的时侯,就会去处理这个风波。其中,这个风波是被绑定在event_base里面的,每一个风波都会对应一个structevent,可以是窃听的fd。
其中structevent使用event_new来创建和绑定,使用event_add来启用,比如:
struct event *listener_event;
listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
参数说明:
base:event_base类型,event_base_new的返回值
listener:窃听的fd,listen的fd
EV_READ|EV_PERSIST:风波的类型及属性
do_accept:绑定的反弹函数
(void*)base:给反弹函数的参数
event_add(listener_event,NULL);
对比epoll:
event_new相当于epoll中的epoll_wait,其中的epoll里的while循环,在libevent里使用event_base_dispatch。
event_add相当于epoll中的epoll_ctl,参数是EPOLL_CTL_ADD,添加风波。
注:libevent支持的风波及属性包括(使用bitfield实现,所以要用|来让它们合体)
风波创建添加以后,就可以处理发生的风波了,相当于epoll里的epoll_wait,在libevent里使用event_base_dispatch启动event_base循环,直至不再有须要关注的风波。
有了前面的剖析,结合之前做的epoll服务端程序,对于一个服务器程序,流程基本是这样的:
1.创建socket,bind,listen,设置为非阻塞模式
2.创建一个event_base,即
struct event_base * event_base_new(void)
3.创建一个event,将该socket托管给event_base,指定要窃听的风波类型,并绑定上相应的反弹函数(及须要给它的参数)。即
struct event * event_new(struct event_base *base, evutil_socket_t fd, short events, void (*cb)(evutil_socket_t, short, void *), void *arg)
4.启用该风波,即
int event_add(struct event *ev, const struct timeval *tv)
5.步入风波循环,即
int event_base_dispatch(struct event_base *event_base)
须要C/C++Linux服务器构架师学习资料私信“资料”(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQLlinux epoll 实现,Redis,fastdfs,MongoDB,ZKlinux epoll 实现,流媒体,CDN,P2P,K8S,Docker,TCP/IP,解释器,DPDK,ffmpeg等),免费分享
有了这种知识储备,看下官网上的demo,这儿引用的事例是Example:Alow-levelROT13serverwithLibevent
首先来翻译下事例里面的一段话:
对于select函数来说,不同的操作系统有不同的取代函数,它包括:poll,epoll,kqueue,evport和/dev/poll。这种函数的性能都比select要好,其中epoll在IO中添加,删掉,通知socket打算好方面性能复杂度为O(1)。
不幸的是,没有一个有效的插口是一个普遍存在的标准,linux下有epoll,BSDS有kqueue,Solaris有evport和/dev/poll,等等。没有任何一个操作系统有它们中所有的,所以假如你想做一个轻便的高性能的异步应用程序,你就须要把这种插口具象的封装上去,但是无论哪一个系统使用它都是最高效的。
这对于你来说就是最低级的libeventAPI,它提供了统一的插口代替了select,当它在计算机上运行的时侯,使用了最有效的版本。
这儿是ROT13服务器的另外一个版本,此次,他使用了libevent取代了select。这意味着我们不再使用fd_sets,取而代之的使用event_base添加和删掉风波,它可能在select,poll,epoll,kqueue等中执行。
代码剖析:
这是一个服务端的程序,可以处理顾客端大并发的联接,当收到顾客端的联接后,将收到的数据做了一个变换,倘若是’a’-‘m’之间的字符,将其降低13,倘若是’n’-‘z’之间的字符,将其降低13,其他字符不变,之后将转换后的数据发送给顾客端。
比如:顾客端发送:Client0sendMessage!
服务端会回复:Pyvrag0fraqZrffntr!
在这个代码中没有使用bufferevent这个强悍的东西linux内存管理,在一个结构体中自己管理了一个缓冲区。结构体为:
struct fd_state {
char buffer[MAX_LINE];//缓冲区的大小
size_t buffer_used;//接收到已经使用的buffer大小,每次将接收到的数据字节数相加,当发送的字节数累计相加和buffer_used都相等时候,将它们都置为1
size_t n_written;//发送的累加字节数
size_t write_upto;//相当于一个临时变量,当遇到换行符的时,将其收到的字节数(换行符除外)赋给该值,当检测到写事件的时候,用已经发送的字节数和该数值做比较,若收到的字节总数小于该值,则发送数据,等于该值,将结构体中3个字节数统计变量都置为1,为什么会置为1呢,因为有一个换行符吧。
struct event *read_event;
struct event *write_event;
};
代码中自己管理了一个缓冲区,用于储存接收到的数据,发送的数据将其转换后也装入该缓冲区中,代码深奥难懂,我也是经过打日志剖析后,才明白点,这个缓冲区自己还得控制好。并且libevent2早已提供了一个利器bufferevent,我们在使用的过程中最好不要自己管理这个缓冲区,之所以剖析这个代码,是为了熟悉libevent做服务端程序的流程及原理。
下边是代码,加有部份注释和日志:
代码:lowlevel_libevent_server.c
//说明,为了使我们的代码兼容win32网络API,我们使用evutil_socket_t代替int,使用evutil_make_socket_nonblocking代替fcntl
/* For sockaddr_in */
#include
/* For socket functions */
#include
/* For fcntl */
#include
#include
#include
#include
#include
#include
#include
#include
#define MAX_LINE 80
void do_read(evutil_socket_t fd, short events, void *arg);
void do_write(evutil_socket_t fd, short events, void *arg);
char rot13_char(char c)
{
/* We don't want to use isalpha here; setting the locale would change
* which characters are considered alphabetical. */
if ((c >= 'a' && c = 'A' && c = 'n' && c = 'N' && c read_event = event_new(base, fd, EV_READ|EV_PERSIST, do_read, state);
if (!state->read_event)
{
free(state);
return NULL;
}
state->write_event = event_new(base, fd, EV_WRITE|EV_PERSIST, do_write, state);
if (!state->write_event)
{
event_free(state->read_event);
free(state);
return NULL;
}
state->buffer_used = state->n_written = state->write_upto = 0;
assert(state->write_event);
return state;
}
void free_fd_state(struct fd_state *state)
{
event_free(state->read_event);
event_free(state->write_event);
free(state);
}
void do_read(evutil_socket_t fd, short events, void *arg)
{
struct fd_state *state = arg;
char buf[20];
int i;
ssize_t result;
printf("ncome in do_read: fd: %d, state->buffer_used: %d, sizeof(state->buffer): %dn", fd, state->buffer_used, size
of(state->buffer));
while (1)
{
assert(state->write_event);
result = recv(fd, buf, sizeof(buf), 0);
if (result <= 0)
break;
printf("recv once, fd: %d, recv size: %d, recv buff: %sn", fd, result, buf);
for (i=0; i buffer_used buffer))//如果读事件的缓冲区还未满,将收到的数据做转换
state->buffer[state->buffer_used++] = rot13_char(buf[i]);
// state->buffer[state->buffer_used++] = buf[i];//接收什么发送什么,不经过转换,测试用
if (buf[i] == 'n') //如果遇到换行,添加写事件,并设置写事件的大小
{
assert(state->write_event);
event_add(state->write_event, NULL);
state->write_upto = state->buffer_used;
printf("遇到换行符,state->write_upto: %d, state->buffer_used: %dn",state->write_upto, state->buffer_use
d);
}
}
printf("recv once, state->buffer_used: %dn", state->buffer_used);
}
//判断最后一次接收的字节数
if (result == 0)
{
free_fd_state(state);
}
else if (result n_written: %d, state->write_upto: %dn",fd, state->n_written, state->write
_upto);
while (state->n_written write_upto)
{
ssize_t result = send(fd, state->buffer + state->n_written, state->write_upto - state->n_written, 0);
if (result n_written += result;
printf("send fd: %d, send size: %d, state->n_written: %dn", fd, result, state->n_written);
}
if (state->n_written == state->buffer_used)
{
printf("state->n_written == state->buffer_used: %dn", state->n_written);
state->n_written = state->write_upto = state->buffer_used = 1;
printf("state->n_written = state->write_upto = state->buffer_used = 1n");
}
event_del(state->write_event);
}
void do_accept(evutil_socket_t listener, short event, void *arg)
{
struct event_base *base = arg;
struct sockaddr_storage ss;
socklen_t slen = sizeof(ss);
int fd = accept(listener, (struct sockaddr*)&ss, &slen);
if (fd FD_SETSIZE)
{
close(fd); // XXX replace all closes with EVUTIL_CLOSESOCKET */
}
else
{
struct fd_state *state;
evutil_make_socket_nonblocking(fd);
state = alloc_fd_state(base, fd);
assert(state); /*XXX err*/
assert(state->write_event);
event_add(state->read_event, NULL);
}
}
void run(void)
{
evutil_socket_t listener;
struct sockaddr_in sin;
struct event_base *base;
struct event *listener_event;
base = event_base_new();//初始化libevent
if (!base)
return; /*XXXerr*/
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = 0;//本机
sin.sin_port = htons(8000);
listener = socket(AF_INET, SOCK_STREAM, 0);
evutil_make_socket_nonblocking(listener);
#ifndef WIN32
{
int one = 1;
setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
}
#endif
if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0)
{
perror("bind");
return;
}
if (listen(listener, 16)<0)
{
perror("listen");
return;
}
listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
/*XXX check it */
event_add(listener_event, NULL);
event_base_dispatch(base);
}
int main(int c, char **v)
{
// setvbuf(stdout, NULL, _IONBF, 0);
run();
return 0;
}
编译:gcc-I/usr/include-otestlowlevel_libevent_server.c-L/usr/local/lib-levent
运行结果: