FYI:http://www.wangafu.net/~nickm/libevent-book/
This lib is a integral of asynchronous IO. we should change the concept from blocking PRO to nonblocking PRO.
Example: A simple blocking HTTP client
1 /* For sockaddr_in */ 2 #include3 /* For socket functions */ 4 #include 5 /* For gethostbyname */ 6 #include 7 8 #include 9 #include 10 #include 11 12 int main(int c, char **v)13 {14 const char query[] =15 "GET / HTTP/1.0\r\n"16 "Host: www.google.com\r\n"17 "\r\n";18 const char hostname[] = "www.google.com";19 struct sockaddr_in sin;20 struct hostent *h;21 const char *cp;22 int fd;23 ssize_t n_written, remaining;24 char buf[1024];25 26 /* Look up the IP address for the hostname. Watch out; this isn't27 threadsafe on most platforms. */28 h = gethostbyname(hostname);29 if (!h) {30 fprintf(stderr, "Couldn't lookup %s: %s", hostname, hstrerror(h_errno));31 return 1;32 }33 if (h->h_addrtype != AF_INET) {34 fprintf(stderr, "No ipv6 support, sorry.");35 return 1;36 }37 38 /* Allocate a new socket */39 fd = socket(AF_INET, SOCK_STREAM, 0);40 if (fd < 0) {41 perror("socket");42 return 1;43 }44 45 /* Connect to the remote host. */46 sin.sin_family = AF_INET;47 sin.sin_port = htons(80);48 sin.sin_addr = *(struct in_addr*)h->h_addr;49 if (connect(fd, (struct sockaddr*) &sin, sizeof(sin))) {50 perror("connect");51 close(fd);52 return 1;53 }54 55 /* Write the query. */56 /* XXX Can send succeed partially? */57 cp = query;58 remaining = strlen(query);59 while (remaining) {60 n_written = send(fd, cp, remaining, 0);61 if (n_written <= 0) {62 perror("send");63 return 1;64 }65 remaining -= n_written;66 cp += n_written;67 }68 69 /* Get an answer back. */70 while (1) {71 ssize_t result = recv(fd, buf, sizeof(buf), 0);72 if (result == 0) {73 break;74 } else if (result < 0) {75 perror("recv");76 close(fd);77 return 1;78 }79 fwrite(buf, 1, result, stdout);80 }81 82 close(fd);83 return 0;84 }
All the network calls are in the code above are blocking, gethostbyname, connect, recv, send. This makes the code cannot work effectively. To work with multiple IO, please see following code with fork()
Example: Forking ROT13 server:
1 /* For sockaddr_in */ 2 #include3 /* For socket functions */ 4 #include 5 6 #include 7 #include 8 #include 9 #include 10 11 #define MAX_LINE 16384 12 13 char 14 rot13_char(char c) 15 { 16 /* We don't want to use isalpha here; setting the locale would change 17 * which characters are considered alphabetical. */ 18 if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M')) 19 return c + 13; 20 else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z')) 21 return c - 13; 22 else 23 return c; 24 } 25 26 void 27 child(int fd) 28 { 29 char outbuf[MAX_LINE+1]; 30 size_t outbuf_used = 0; 31 ssize_t result; 32 33 while (1) { 34 char ch; 35 result = recv(fd, &ch, 1, 0); 36 if (result == 0) { 37 break; 38 } else if (result == -1) { 39 perror("read"); 40 break; 41 } 42 43 /* We do this test to keep the user from overflowing the buffer. */ 44 if (outbuf_used < sizeof(outbuf)) { 45 outbuf[outbuf_used++] = rot13_char(ch); 46 } 47 48 if (ch == '\n') { 49 send(fd, outbuf, outbuf_used, 0); 50 outbuf_used = 0; 51 continue; 52 } 53 } 54 } 55 56 void 57 run(void) 58 { 59 int listener; 60 struct sockaddr_in sin; 61 62 sin.sin_family = AF_INET; 63 sin.sin_addr.s_addr = 0; 64 sin.sin_port = htons(40713); 65 66 listener = socket(AF_INET, SOCK_STREAM, 0); 67 68 #ifndef WIN32 69 { 70 int one = 1; 71 setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); 72 } 73 #endif 74 75 if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) { 76 perror("bind"); 77 return; 78 } 79 80 if (listen(listener, 16)<0) { 81 perror("listen"); 82 return; 83 } 84 85 86 87 while (1) { 88 struct sockaddr_storage ss; 89 socklen_t slen = sizeof(ss); 90 int fd = accept(listener, (struct sockaddr*)&ss, &slen); 91 if (fd < 0) { 92 perror("accept"); 93 } else { 94 if (fork() == 0) { 95 child(fd); 96 exit(0); 97 } 98 } 99 }100 }101 102 int103 main(int c, char **v)104 {105 run();106 return 0;107 }
Perfect? Not quite. Process creation (and even thread creation) can be pretty expensive on some platforms. A thread pool is the answer to having multiple connections.
First, set sockets nonblocking. Call
fcntl(fd, F_SETFL, O_NONBLOCK);
Once nonblocking is set to fd (the socket), return of the fd call is complete the operation immediately or a special error code.
For example:
1 /* This will work, but the performance will be unforgivably bad. */ 2 int i, n; 3 char buf[1024]; 4 for (i=0; i < n_sockets; ++i) 5 fcntl(fd[i], F_SETFL, O_NONBLOCK); 6 7 while (i_still_want_to_read()) { 8 for (i=0; i < n_sockets; ++i) { 9 n = recv(fd[i], buf, sizeof(buf), 0);10 if (n == 0) {11 handle_close(fd[i]);12 } else if (n < 0) {13 if (errno == EAGAIN)14 ; /* The kernel didn't have any data for us to read. */15 else16 handle_error(fd[i], errno);17 } else {18 handle_input(fd[i], buf, n);19 }20 }21 }
Using nonblocking sockets, the code would work, but only barely. The performance will be awful, for two reasons.
- First, when there is no data to read on either connection the loop will spin indefinitely, using up all your CPU cycles.
- Second, the delay is proportional to the number of users.
So what we need is a way to tell the kernel "wait until one of these sockets is ready to give me some data, and tell me which ones are ready."
The oldest solution that people still use for this problem is select(). Here’s a reimplementation of our ROT13 server, using select() this time.
Example: select()-based ROT13 server
1 /* For sockaddr_in */ 2 #include3 /* For socket functions */ 4 #include 5 /* For fcntl */ 6 #include 7 /* for select */ 8 #include 9 10 #include 11 #include 12 #include 13 #include 14 #include 15 #include 16 17 #define MAX_LINE 16384 18 19 char 20 rot13_char(char c) 21 { 22 /* We don't want to use isalpha here; setting the locale would change 23 * which characters are considered alphabetical. */ 24 if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M')) 25 return c + 13; 26 else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z')) 27 return c - 13; 28 else 29 return c; 30 } 31 32 struct fd_state { 33 char buffer[MAX_LINE]; 34 size_t buffer_used; 35 36 int writing; 37 size_t n_written; 38 size_t write_upto; 39 }; 40 41 struct fd_state * 42 alloc_fd_state(void) 43 { 44 struct fd_state *state = malloc(sizeof(struct fd_state)); 45 if (!state) 46 return NULL; 47 state->buffer_used = state->n_written = state->writing = 48 state->write_upto = 0; 49 return state; 50 } 51 52 void 53 free_fd_state(struct fd_state *state) 54 { 55 free(state); 56 } 57 58 void 59 make_nonblocking(int fd) 60 { 61 fcntl(fd, F_SETFL, O_NONBLOCK); 62 } 63 64 int 65 do_read(int fd, struct fd_state *state) 66 { 67 char buf[1024]; 68 int i; 69 ssize_t result; 70 while (1) { 71 result = recv(fd, buf, sizeof(buf), 0); 72 if (result <= 0) 73 break; 74 75 for (i=0; i < result; ++i) { 76 if (state->buffer_used < sizeof(state->buffer)) 77 state->buffer[state->buffer_used++] = rot13_char(buf[i]); 78 if (buf[i] == '\n') { 79 state->writing = 1; 80 state->write_upto = state->buffer_used; 81 } 82 } 83 } 84 85 if (result == 0) { 86 return 1; 87 } else if (result < 0) { 88 if (errno == EAGAIN) 89 return 0; 90 return -1; 91 } 92 93 return 0; 94 } 95 96 int 97 do_write(int fd, struct fd_state *state) 98 { 99 while (state->n_written < state->write_upto) {100 ssize_t result = send(fd, state->buffer + state->n_written,101 state->write_upto - state->n_written, 0);102 if (result < 0) {103 if (errno == EAGAIN)104 return 0;105 return -1;106 }107 assert(result != 0);108 109 state->n_written += result;110 }111 112 if (state->n_written == state->buffer_used)113 state->n_written = state->write_upto = state->buffer_used = 0;114 115 state->writing = 0;116 117 return 0;118 }119 120 void121 run(void)122 {123 int listener;124 struct fd_state *state[FD_SETSIZE];125 struct sockaddr_in sin;126 int i, maxfd;127 fd_set readset, writeset, exset;128 129 sin.sin_family = AF_INET;130 sin.sin_addr.s_addr = 0;131 sin.sin_port = htons(40713);132 133 for (i = 0; i < FD_SETSIZE; ++i)134 state[i] = NULL;135 136 listener = socket(AF_INET, SOCK_STREAM, 0);137 make_nonblocking(listener);138 139 #ifndef WIN32140 {141 int one = 1;142 setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));143 }144 #endif145 146 if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {147 perror("bind");148 return;149 }150 151 if (listen(listener, 16)<0) {152 perror("listen");153 return;154 }155 156 FD_ZERO(&readset);157 FD_ZERO(&writeset);158 FD_ZERO(&exset);159 160 while (1) {161 maxfd = listener;162 163 FD_ZERO(&readset);164 FD_ZERO(&writeset);165 FD_ZERO(&exset);166 167 FD_SET(listener, &readset);168 169 for (i=0; i < FD_SETSIZE; ++i) {170 if (state[i]) {171 if (i > maxfd)172 maxfd = i;173 FD_SET(i, &readset);174 if (state[i]->writing) {175 FD_SET(i, &writeset);176 }177 }178 }179 180 if (select(maxfd+1, &readset, &writeset, &exset, NULL) < 0) {181 perror("select");182 return;183 }184 185 if (FD_ISSET(listener, &readset)) {186 struct sockaddr_storage ss;187 socklen_t slen = sizeof(ss);188 int fd = accept(listener, (struct sockaddr*)&ss, &slen);189 if (fd < 0) {190 perror("accept");191 } else if (fd > FD_SETSIZE) {192 close(fd);193 } else {194 make_nonblocking(fd);195 state[fd] = alloc_fd_state();196 assert(state[fd]);/*XXX*/197 }198 }199 200 for (i=0; i < maxfd+1; ++i) {201 int r = 0;202 if (i == listener)203 continue;204 205 if (FD_ISSET(i, &readset)) {206 r = do_read(i, state[i]);207 }208 if (r == 0 && FD_ISSET(i, &writeset)) {209 r = do_write(i, state[i]);210 }211 if (r) {212 free_fd_state(state[i]);213 state[i] = NULL;214 close(i);215 }216 }217 }218 }219 220 int221 main(int c, char **v)222 {223 setvbuf(stdout, NULL, _IONBF, 0);224 225 run();226 return 0;227 }
Problem: generating and reading the select() bit arrays takes time proportional to the largest fd that you provided for select(), the select() call scales terribly when the number of sockets is high.
Solution: diversity repalcement functions are comming out in different operating systems. Unfortunately, none of the efficient interfaces is a ubiquitous standard.
Linux: epoll(),
BSDs (including Darwin): kqueue(),
Solaris: evports and /dev/poll…
Libevent API is an abstraction that wraps all of these interfaces, and provides whichever one of them is the most efficient.