udp tcp epoll serv
[socket_samples.git] / socketserver.c
blobf6d8d4c0d9a09017771cd8de39f65a587cb7cefa
1 //下面的代码离生产环境还差内存池和logger哦!
2 #include "socketserver.h"
3 #include <dirent.h>
4 #include <regex.h>
5 #define DIGIT_PATTERN_STRING "^[0-9]+$"
6 void * epollWorkerRoutine(void *);
7 void * blockingSendEpollerRoutine(void *);
8 int isDigitStr(const char *str){
9 int ret=-1;
10 regex_t regex;
11 regmatch_t matchs[1];
12 if(!regcomp(&regex,DIGIT_PATTERN_STRING,REG_EXTENDED/*这里不要传0哦,否则nomatch*/)){
13 ret=!regexec(&regex,str, 1,matchs,0);
14 regfree(&regex);
16 return ret;
19 static int setNonBlocking(int sock)
21 int opts;
22 opts=fcntl(sock,F_GETFL);
23 if(opts==-1)
25 perror("fcntl(sock,GETFL) failed!\n");
26 return opts;
28 opts = opts|O_NONBLOCK;
29 opts=fcntl(sock,F_SETFL,opts);
30 if(opts==-1)
32 perror("fcntl(sock,SETFL,opts) failed!\n");
33 return opts;
35 return 1;
38 static void adjustQSSWorkerLimits(QSocketServer *qss){
39 //to adjust availabe size.
41 typedef struct{
42 QSocketServer * qss;
43 pthread_t th;
44 }QSSWORKER_PARAM;
46 static WORD addQSSWorker(QSocketServer *qss,WORD addCounter){
47 WORD res=0;
48 if(qss->workerCounter<qss->minThreads||(qss->currentBusyWorkers==qss->workerCounter&&qss->workerCounter<qss->maxThreads))
50 QSSWORKER_PARAM * pParam=NULL;
51 int i=0;
52 pthread_spin_lock(&qss->g_spinlock);
53 if(qss->workerCounter+addCounter<=qss->maxThreads)
54 for(;i<addCounter;i++)
56 pParam=malloc(sizeof(QSSWORKER_PARAM));
58 if(pParam){
59 pthread_create(&pParam->th,NULL,epollWorkerRoutine,pParam);
60 pParam->qss=qss;
61 qss->workerCounter++,res++;
64 pthread_spin_unlock(&qss->g_spinlock);
66 return res;
69 static void SOlogger(const char * msg,SOCKET s){
70 perror(msg);
71 if(s>0)
72 close(s);
75 static int _InternalProtocolHandler(struct epoll_event * event,BlockingSender_t _blockingSender,void * senderBase){
76 QSSEPollEvent *qssEPEvent=event->data.ptr;
77 int ret;
78 printf("_InternalProtocolHandler START pollRes==1,err:%d, ...cs:%d,,,,,th:%lu,\n",errno,qssEPEvent->client_s,pthread_self());
79 if((ret=recv(qssEPEvent->client_s,qssEPEvent->buf,MAX_BUF_SIZE,0))>0){
80 //sleep(5);
81 ret=_blockingSender(senderBase,qssEPEvent->client_s,qssEPEvent->buf,ret);
83 printf("_InternalProtocolHandler END ret=%d,err:%d,%s, ...cs:%d,,,,,th:%lu,\n",ret,errno,strerror(errno),qssEPEvent->client_s,pthread_self());
84 return ret;
87 int createSocketServer(QSocketServer ** qss_ptr,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,int workerWaitTimeout)
90 QSocketServer * qss=malloc(sizeof(QSocketServer));
91 qss->passive=passive;
92 qss->port=port;
93 qss->minThreads=minThreads;
94 qss->maxThreads=maxThreads;
95 qss->workerWaitTimeout=workerWaitTimeout;
96 qss->lifecycleStatus=0;
97 pthread_spin_init(&qss->g_spinlock,PTHREAD_PROCESS_PRIVATE);
98 qss->workerCounter=0;
99 qss->currentBusyWorkers=0;
100 qss->CSocketsCounter=0;
101 qss->cslifecb=cslifecb,qss->protoHandler=protoHandler;
102 if(!qss->protoHandler)
103 qss->protoHandler=_InternalProtocolHandler;
104 adjustQSSWorkerLimits(qss);
105 *qss_ptr=qss;
106 return 1;
109 int startSocketServer(QSocketServer *qss)
111 if(qss==NULL)
112 return 0;
113 else{
114 pthread_spin_lock(&qss->g_spinlock);
115 if(qss->lifecycleStatus==0){
116 qss->lifecycleStatus=1;
117 pthread_spin_unlock(&qss->g_spinlock);
118 }else{
119 pthread_spin_unlock(&qss->g_spinlock);
120 return 0;
123 //bzero(&qss->serv_addr, sizeof(qss->serv_addr));
125 qss->serv_addr.sin_family=AF_INET;
126 qss->serv_addr.sin_port=htons(qss->port);
127 inet_aton("127.0.0.1",&(qss->serv_addr.sin_addr));
128 //qss->serv_addr.sin_addr.s_addr=INADDR_ANY;//inet_addr("127.0.0.1");
130 qss->server_s=socket(AF_INET,SOCK_STREAM,IPPROTO_IP);
131 if(setNonBlocking(qss->server_s)==-1)
133 SOlogger("setNonBlocking server_s failed.\n",0);
134 return 0;
137 if(qss->server_s==INVALID_SOCKET)
139 SOlogger("socket failed.\n",0);
140 return 0;
143 if(bind(qss->server_s,(struct sockaddr *)&qss->serv_addr,sizeof(SOCKADDR_IN))==SOCKET_ERROR)
145 SOlogger("bind failed.\n",qss->server_s);
146 return 0;
149 if(listen(qss->server_s,SOMAXCONN/*这个宏windows也有,这里是128,当然你可以设的小些,它影响开销的*/)==SOCKET_ERROR)
151 SOlogger("listen failed.\n",qss->server_s);
152 return 0;
154 qss->epollFD=epoll_create1(0);/*这里不是epoll_create(size)哦,你可能不知道如何设置size,所以忽略它吧*/
155 if(qss->epollFD==-1){
156 SOlogger("epoll_create1 0, main epollFD failed.\n",qss->server_s);
157 return 0;
159 qss->BSendEpollFD=epoll_create1(0);//for blocking send.
160 if(qss->BSendEpollFD==-1){
161 SOlogger("epoll_create1 0,BSendEpollFD failed.\n",qss->server_s);
162 return 0;
165 {//ADD ACCEPT EVENT
166 struct epoll_event _epEvent;
167 QSSEPollEvent *qssEPEvent=malloc(sizeof(QSSEPollEvent));
168 qssEPEvent->client_s=qss->server_s;
169 _epEvent.events=qssEPEvent->curEvents=EPOLLIN|EPOLLET;
170 _epEvent.data.ptr=qssEPEvent;
171 if(epoll_ctl(qss->epollFD,EPOLL_CTL_ADD,qss->server_s,&_epEvent)==-1){
172 SOlogger("epoll_ctl server_s to accept failed.\n",qss->server_s);
173 free(qssEPEvent);
174 return 0;
177 {//starup blocking send epoller.
178 QSSWORKER_PARAM * pParam=malloc(sizeof(QSSWORKER_PARAM));
179 pParam->qss=qss;
180 pthread_create(&pParam->th,NULL,blockingSendEpollerRoutine,pParam);
183 //initialize worker for epoll events.
184 addQSSWorker(qss,qss->minThreads);
185 qss->lifecycleStatus=2;
186 return 1;
189 int shutdownSocketServer(QSocketServer *qss){
190 //change qss->lifecycleStatus
191 if(qss==NULL)
192 return 0;
193 else{
194 pthread_spin_lock(&qss->g_spinlock);
195 if(qss->lifecycleStatus==2){
196 qss->lifecycleStatus=3;
197 pthread_spin_unlock(&qss->g_spinlock);
198 }else{
199 pthread_spin_unlock(&qss->g_spinlock);
200 return 0;
203 /*shutdown server-listening socket,这里优雅的做法是shutdown--notify-->epoll-->close.记得shutdown会发送EOF的哦*/
204 shutdown(qss->server_s,SHUT_RDWR);
206 // /proc/getpid/fd shutdown all socket cs != serv_s
208 char dirBuf[64];
209 struct dirent * de;
210 DIR *pd=NULL;
211 int sockFD;
212 sprintf(dirBuf,"/proc/%d/fd/",getpid());
213 pd=opendir(dirBuf);
214 if(pd!=NULL){
215 while((de=readdir(pd))!=NULL){
216 if(isDigitStr(de->d_name)){
217 sockFD=atoi(de->d_name);
218 if(isfdtype(sockFD,S_IFSOCK))
219 shutdown(sockFD,SHUT_RDWR);
222 closedir(pd);
224 /*fstat(ret,&_stat);S_ISSOCK(_stat.st_mode)======isfdtype(sockFD,S_IFSOCK)*/
226 return 1;
229 static int onAcceptRoutine(QSocketServer * qss)
231 SOCKADDR_IN client_addr;
232 unsigned int client_addr_leng=sizeof(SOCKADDR_IN);
233 SOCKET cs;
234 struct epoll_event _epEvent;
235 QSSEPollEvent *qssEPEvent=NULL;
236 cs=accept(qss->server_s,(struct sockaddr *)&client_addr,&client_addr_leng);
237 if(cs==INVALID_SOCKET)
239 printf("onAccept failed:%d,%s\n",errno,strerror(errno));
240 epoll_ctl(qss->epollFD,EPOLL_CTL_DEL,qss->server_s,NULL);//EINVAL 22 Invalid argument
241 return 0;
243 if(setNonBlocking(cs)==-1)
245 printf("onAccept setNonBlocking client_s failed.cs:%d\n",cs);
246 return 0;
249 {// set keepalive option
250 int keepAlive = 1;
251 int keepIdle = QSS_SIO_KEEPALIVE_VALS_TIMEOUT;
252 int keepInterval = QSS_SIO_KEEPALIVE_VALS_INTERVAL;
253 int keepCount = QSS_SIO_KEEPALIVE_VALS_COUNT;
254 if(setsockopt(cs, SOL_SOCKET, SO_KEEPALIVE, (void *)&keepAlive, sizeof(keepAlive))||
255 setsockopt(cs, SOL_TCP, TCP_KEEPIDLE, (void *)&keepIdle, sizeof(keepIdle))||
256 setsockopt(cs, SOL_TCP, TCP_KEEPINTVL, (void *)&keepInterval, sizeof(keepInterval))||
257 setsockopt(cs, SOL_TCP, TCP_KEEPCNT, (void *)&keepCount, sizeof(keepCount)))
259 printf("onAccept set keepalive option client_s failed.cs:%d,err:%s\n",cs,strerror(errno));
260 return 0;
263 qssEPEvent=malloc(sizeof(QSSEPollEvent));
264 qssEPEvent->client_s=cs;
266 _epEvent.events=qssEPEvent->curEvents=EPOLLIN|EPOLLET|EPOLLONESHOT;
267 qssEPEvent->BSendEpollFDRelated=0;
268 _epEvent.data.ptr=qssEPEvent;/*这里又和教科的不一样哦,真正的user data用ptr,而不是单一的fd*/
269 if(epoll_ctl(qss->epollFD,EPOLL_CTL_ADD,cs,&_epEvent)==-1){
270 printf("onAccept epoll_ctl client_s failed.cs:%d,err:%d\n",cs,errno);
271 free(qssEPEvent);
272 return 0;
273 }else{
274 pthread_spin_lock(&qss->g_spinlock);
275 qss->CSocketsCounter++;
276 pthread_spin_unlock(&qss->g_spinlock);
277 if(qss->cslifecb)
278 qss->cslifecb(cs,0);
281 printf("onAccepted flags:err:%d ,cs:%d.\n",errno,cs);
282 return 1;
285 typedef struct{
286 QSocketServer * qss;
287 QSSEPollEvent * event;
288 }InternalSenderBase_t;
290 static int internalBlockingSender(void * senderBase,int cs, void * _buf, size_t nbs){
291 InternalSenderBase_t *sb=(InternalSenderBase_t *)senderBase;
292 char * _sbuf=_buf;
293 int ret=0,sum=0,curEpoll_ctl_opt,*errno_ptr=&errno;
295 QSSEPollEvent *qssEPEvent=NULL;
296 struct epoll_event _epEvent;
298 struct timespec sendTimeo;
300 while(1){
301 *errno_ptr=0;
302 while(sum<nbs&&(ret=send(cs,_sbuf,nbs-sum,0))>0)
303 sum+=ret,_sbuf+=ret;
304 if(sum==nbs||ret==0)
305 break;
306 else if(ret==-1){
307 if(errno==EAGAIN&&sum<nbs){
308 qssEPEvent=sb->event;
309 _epEvent.data.ptr=qssEPEvent;
310 _epEvent.events=EPOLLOUT|EPOLLET|EPOLLONESHOT;
311 if(qssEPEvent->BSendEpollFDRelated==0){
312 pthread_mutex_init(&qssEPEvent->writableLock,NULL);
313 pthread_cond_init(&qssEPEvent->writableMonitor,NULL);
314 qssEPEvent->BSendEpollFDRelated=1;
315 curEpoll_ctl_opt=EPOLL_CTL_ADD;
316 }else{
317 curEpoll_ctl_opt=EPOLL_CTL_MOD;
320 {//wait writable.
321 int flag=0;
322 pthread_mutex_lock(&qssEPEvent->writableLock);
323 if(epoll_ctl(sb->qss->BSendEpollFD,curEpoll_ctl_opt,qssEPEvent->client_s,&_epEvent)==0){
324 sendTimeo.tv_nsec=0,sendTimeo.tv_sec=time(NULL)+BLOCKING_SEND_TIMEOUT;
325 int err=pthread_cond_timedwait(&qssEPEvent->writableMonitor,&qssEPEvent->writableLock,&sendTimeo);
326 if(err)
327 flag=-1;
328 }else
329 flag=-1;
330 pthread_mutex_unlock(&qssEPEvent->writableLock);
331 if(flag==-1)
332 break;
335 }else{
336 if(errno==EAGAIN&&sum==nbs)
337 ret=nbs;//it is ok;
338 break;
341 }//end while.
342 return ret;
344 void * blockingSendEpollerRoutine(void *_param){
345 QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)_param;
346 QSocketServer * qss=pParam->qss;
347 //pthread_t * curThread=&pParam->th;
348 struct epoll_event epEvents[qss->maxThreads];
349 QSSEPollEvent *qssEPEvent=NULL;
350 int pollRes,*errno_ptr=&errno;
352 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);
354 free(pParam);
355 while(1){
357 pollRes=epoll_wait(qss->BSendEpollFD,epEvents,qss->maxThreads,-1);
358 if(pollRes>=1){
359 int i=0;
360 for(;i<pollRes;i++)
361 if(epEvents[i].events&EPOLLOUT){//这个epollfd只应该做以下的事情,少做为快!
362 qssEPEvent=epEvents[i].data.ptr;
363 pthread_mutex_lock(&qssEPEvent->writableLock);
364 pthread_cond_signal(&qssEPEvent->writableMonitor);
365 pthread_mutex_unlock(&qssEPEvent->writableLock);
368 }else if(pollRes==-1){//errno
369 printf("blockingSendEpollerRoutine pollRes==-1,err:%d, errno...%s\n",*errno_ptr,strerror(*errno_ptr));
370 break;
375 return NULL;
378 void * epollWorkerRoutine(void * _param){
379 QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)_param;
380 QSocketServer * qss=pParam->qss;
381 pthread_t * curThread=&pParam->th;
382 struct epoll_event _epEvent;
383 QSSEPollEvent *qssEPEvent=NULL;
384 InternalSenderBase_t _senderBase;
385 int pollRes=0,handleCode=0,exitCode=0,SOErrOccurred=0,*errno_ptr=&errno;
386 _senderBase.qss=qss;
387 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);
389 free(pParam);
390 while(!exitCode){
392 *errno_ptr=0,SOErrOccurred=0,qssEPEvent=NULL;
393 pollRes=epoll_wait(qss->epollFD,&_epEvent,1,qss->workerWaitTimeout);
394 if(pollRes==1){
395 qssEPEvent=(QSSEPollEvent *)_epEvent.data.ptr;
397 if(qssEPEvent->client_s==qss->server_s)
398 {//Accepted Socket.
399 onAcceptRoutine(qss);
400 continue;
401 }else{
402 if(qss->protoHandler){
403 _senderBase.event=_epEvent.data.ptr;
404 pthread_spin_lock(&qss->g_spinlock);
405 qss->currentBusyWorkers++;
406 pthread_spin_unlock(&qss->g_spinlock);
408 addQSSWorker(qss,1);
409 handleCode=qss->protoHandler(&_epEvent,internalBlockingSender,&_senderBase);
411 pthread_spin_lock(&qss->g_spinlock);
412 qss->currentBusyWorkers--;
413 pthread_spin_unlock(&qss->g_spinlock);
415 if(handleCode>0){
416 _epEvent.events=EPOLLIN|EPOLLET|EPOLLONESHOT;
417 if(epoll_ctl(qss->epollFD,EPOLL_CTL_MOD,qssEPEvent->client_s,&_epEvent)==-1)
418 SOErrOccurred=2;
419 }else{
420 SOErrOccurred=1;//maybe socket closed 0. Or -1 socket error.
425 }else if(pollRes==0){//timeout
426 printf("pollRes==0,err:%d, timeout...th:%lu\n",*errno_ptr,*curThread);
427 if(qss->lifecycleStatus<=3&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads)
429 pthread_spin_lock(&qss->g_spinlock);
430 if(qss->lifecycleStatus<=3&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads){
431 qss->workerCounter--;//until qss->workerCounter decrease to qss->minThreads
432 exitCode=2;
434 pthread_spin_unlock(&qss->g_spinlock);
435 }else if(qss->lifecycleStatus>=4)
436 exitCode=4;
438 }else if(pollRes==-1){//errno
439 printf("pollRes==-1,err:%d, errno...%s\n",*errno_ptr,strerror(*errno_ptr));
440 exitCode=1;
443 if(SOErrOccurred){
444 if(qss->cslifecb)
445 qss->cslifecb(qssEPEvent->client_s,-1);
446 /*if(qssEPEvent)*/{
447 epoll_ctl(qss->epollFD,EPOLL_CTL_DEL,qssEPEvent->client_s,NULL);
448 epoll_ctl(qss->BSendEpollFD,EPOLL_CTL_DEL,qssEPEvent->client_s,NULL);
449 close(qssEPEvent->client_s);
450 if(qssEPEvent->BSendEpollFDRelated){
451 pthread_cond_destroy(&qssEPEvent->writableMonitor);
452 pthread_mutex_destroy(&qssEPEvent->writableLock);
454 free(qssEPEvent);
456 pthread_spin_lock(&qss->g_spinlock);
457 if(--qss->CSocketsCounter==0&&qss->lifecycleStatus>=3){
458 //for qss workerSize,
459 qss->lifecycleStatus=4;
460 exitCode=3;
462 pthread_spin_unlock(&qss->g_spinlock);
463 }//SOErrOccurred handle;
465 }//end main while.
467 if(exitCode!=2){
468 int clearup=0;
469 pthread_spin_lock(&qss->g_spinlock);
470 if(!--qss->workerCounter&&qss->lifecycleStatus>=4){//clearup QSS
471 clearup=1;
473 pthread_spin_unlock(&qss->g_spinlock);
474 if(clearup){
475 close(qss->epollFD);
476 close(qss->BSendEpollFD);
477 pthread_spin_destroy(&qss->g_spinlock);
478 free(qss);
480 }//exitCode handle;
481 return NULL;