1 #define MODULE_LOG_PREFIX "work"
4 #include "module-cacheex.h"
5 #include "oscam-client.h"
8 #include "oscam-lock.h"
10 #include "oscam-reader.h"
11 #include "oscam-string.h"
12 #include "oscam-work.h"
13 #include "reader-common.h"
14 #include "module-cccam-data.h"
15 #include "module-cccshare.h"
16 #include "oscam-time.h"
18 extern CS_MUTEX_LOCK system_lock
;
19 extern int32_t thread_pipe
[2];
/*
 * free_job_data - release resources attached to a queued job.
 *
 * data: the job to clean up.
 *
 * Visible behaviour: for a job that carries a payload (non-zero len and a
 * non-NULL ptr) whose action is ACTION_ECM_ANSWER_CACHE, the payload is
 * treated as a struct s_write_from_cache and its er_cache member is
 * released with NULLFREE.
 *
 * NOTE(review): this listing omits several original lines of the function
 * (braces and any further statements); the nesting of the two conditions is
 * inferred from the embedded line numbers only -- confirm against the
 * complete file.
 */
31 static void free_job_data(struct job_data
*data
)
/* A payload is only inspected when both a length and a pointer are set. */
35 if(data
->len
&& data
->ptr
)
/* Cache answers get special handling for their er_cache member. */
38 if(data
->action
==ACTION_ECM_ANSWER_CACHE
)
40 NULLFREE(((struct s_write_from_cache
*)data
->ptr
)->er_cache
);
/*
 * free_joblist - tear down the job queue state of a client.
 *
 * cl: client whose joblist, pending work_job_data and thread_lock are
 *     released.
 *
 * Visible sequence: try to take cl->thread_lock without blocking, walk the
 * joblist, destroy the list, free the job that work_thread was still
 * tracking in cl->work_job_data, unlock, and finally destroy the mutex.
 *
 * NOTE(review): the loop body and the condition guarding the unlock are not
 * visible in this listing; the unlock presumably depends on lock_status --
 * confirm against the complete file.
 */
48 void free_joblist(struct s_client
*cl
)
/* Non-blocking attempt; the result is kept in lock_status. */
50 int32_t lock_status
= pthread_mutex_trylock(&cl
->thread_lock
);
/* Iterate over every job still queued for this client. */
52 LL_ITER it
= ll_iter_create(cl
->joblist
);
53 struct job_data
*data
;
54 while((data
= ll_iter_next(&it
)))
/* Release the list container itself. */
58 ll_destroy(&cl
->joblist
);
60 if(cl
->work_job_data
) // Free job_data that was not freed by work_thread
61 { free_job_data(cl
->work_job_data
); }
/* Clear the tracking pointer so it cannot be freed a second time. */
62 cl
->work_job_data
= NULL
;
65 { SAFE_MUTEX_UNLOCK(&cl
->thread_lock
); }
/* The mutex is destroyed last, after it has been released above. */
67 pthread_mutex_destroy(&cl
->thread_lock
);
71 Work threads are named like this:
72 w[r|c]XX-[rdr->label|client->username]
74 w - work thread prefix
75 [r|c] - depending on whether the action is related to a reader or a client
76 XX - two digit action code from enum actions
77 label - reader label or client username (see username() function)
/*
 * set_work_thread_name - give the calling thread a name derived from a job.
 *
 * data: the job whose action selects the name.
 *
 * The name is formatted as "w%c%02d-%s" into a 17-byte buffer (16
 * characters plus the terminator). The %c field is 'r' when the action is
 * below ACTION_CLIENT_FIRST and 'c' otherwise. snprintf bounds the write to
 * the buffer size, so a long label is truncated rather than overflowing.
 *
 * NOTE(review): the remaining snprintf arguments (for %02d and %s) are not
 * visible in this listing.
 */
79 static void set_work_thread_name(struct job_data
*data
)
81 char thread_name
[16 + 1];
82 snprintf(thread_name
, sizeof(thread_name
), "w%c%02d-%s",
/* Actions below ACTION_CLIENT_FIRST are labelled as reader actions. */
83 data
->action
< ACTION_CLIENT_FIRST
? 'r' : 'c',
/* Apply the formatted name via set_thread_name. */
87 set_thread_name(thread_name
);
/*
 * __free_job_data(client, job_data) - drop the job currently being handled.
 *
 * Clears client->work_job_data, then frees job_data through free_job_data()
 * unless it is NULL or is the address of tmp_data. The macro therefore
 * requires an identifier named tmp_data to be in scope at the expansion
 * site; work_thread declares one.
 *
 * NOTE(review): the arguments are not parenthesised in the expansion, and
 * the tail of the macro is not visible in this listing.
 */
90 #define __free_job_data(client, job_data) \
92 client->work_job_data = NULL; \
93 if (job_data && job_data != &tmp_data) { \
94 free_job_data(job_data); \
/*
 * work_thread - thread entry point that executes jobs for one client.
 *
 * ptr: the first job (struct job_data *) to run; its cl member identifies
 *      the client this thread serves.
 *
 * Visible flow: register the client for this thread, allocate a receive
 * buffer, then loop while cl->thread_active is set. Each round either takes
 * the next job from cl->joblist or polls the client's descriptor for input,
 * and then handles the job's action in a list of case labels.
 *
 * NOTE(review): this listing omits many original lines (braces, switch
 * header, break/continue/return statements and some conditions). The
 * comments below describe only what the visible lines show.
 */
99 void *work_thread(void *ptr
)
101 struct job_data
*data
= (struct job_data
*)ptr
;
102 struct s_client
*cl
= data
->cl
;
103 struct s_reader
*reader
= cl
->reader
;
104 struct timeb start
, end
; // start time poll, end time poll
/* tmp_data is the local job that __free_job_data refuses to free. */
106 struct job_data tmp_data
;
107 struct pollfd pfd
[1];
/* Bind the client to this thread and mark the worker as running. */
109 SAFE_SETSPECIFIC(getclient
, cl
);
110 cl
->thread
= pthread_self();
111 cl
->thread_active
= 1;
113 set_work_thread_name(data
);
115 struct s_module
*module
= get_module(cl
);
116 uint16_t bufsize
= module
->bufsize
; //CCCam needs more than 1024bytes!
/* Fallback buffer size; the guarding condition is not visible here. */
118 { bufsize
= DEFAULT_MODULE_BUFSIZE
; }
121 if(!cs_malloc(&mbuf
, bufsize
))
123 cl
->work_mbuf
= mbuf
; // Track locally allocated data, because some callback may call cs_exit/cs_disconect_client/pthread_exit and then mbuf would be leaked
124 int32_t n
= 0, rc
= 0, i
, idx
, s
;
126 int8_t restart_reader
= 0;
127 while(cl
->thread_active
)
129 cs_ftime(&start
); // register start time
130 while(cl
->thread_active
)
/*
 * Shutdown path for a killed or invalid client.
 * NOTE(review): cl is dereferenced by the loop condition above before this
 * !cl test runs.
 */
132 if(!cl
|| cl
->kill
|| !is_valid_client(cl
))
134 SAFE_MUTEX_LOCK(&cl
->thread_lock
);
135 cl
->thread_active
= 0;
136 SAFE_MUTEX_UNLOCK(&cl
->thread_lock
);
137 cs_log_dbg(D_TRACE
, "ending thread (kill)");
138 __free_job_data(cl
, data
);
139 cl
->work_mbuf
= NULL
; // Prevent free_client from freeing mbuf (->work_mbuf)
142 { restart_cardreader(reader
, 0); }
/* Trace every job except the health check. */
148 if(data
&& data
->action
!= ACTION_READER_CHECK_HEALTH
)
149 { cs_log_dbg(D_TRACE
, "data from add_job action=%d client %c %s", data
->action
, cl
->typ
, username(cl
)); }
153 if(!cl
->kill
&& cl
->typ
!= 'r')
154 { client_check_status(cl
); } // do not call for physical readers as this might cause an endless job loop
/* Under the lock, remove the next queued job from the joblist. */
155 SAFE_MUTEX_LOCK(&cl
->thread_lock
);
156 if(cl
->joblist
&& ll_count(cl
->joblist
) > 0)
158 LL_ITER itr
= ll_iter_create(cl
->joblist
);
159 data
= ll_iter_next_remove(&itr
);
161 { set_work_thread_name(data
); }
162 //cs_log_dbg(D_TRACE, "start next job from list action=%d", data->action);
164 SAFE_MUTEX_UNLOCK(&cl
->thread_lock
);
169 /* for serial client cl->pfd is file descriptor for serial port not socket
170 for example: pfd=open("/dev/ttyUSB0"); */
171 if(!cl
->pfd
|| module
->listenertype
== LIS_SERIAL
)
174 pfd
[0].events
= POLLIN
| POLLPRI
;
/*
 * thread_active is set to 2 only for the duration of poll(); add_job sends
 * OSCAM_SIGNAL_WAKEUP to the thread when it sees that value. The poll
 * timeout argument is 3000 (milliseconds per the poll() interface).
 */
176 SAFE_MUTEX_LOCK(&cl
->thread_lock
);
177 cl
->thread_active
= 2;
178 SAFE_MUTEX_UNLOCK(&cl
->thread_lock
);
179 rc
= poll(pfd
, 1, 3000);
180 SAFE_MUTEX_LOCK(&cl
->thread_lock
);
181 cl
->thread_active
= 1;
182 SAFE_MUTEX_UNLOCK(&cl
->thread_lock
);
185 cs_ftime(&end
); // register end time
186 cs_log_dbg(D_TRACE
, "[OSCAM-WORK] new event %d occurred on fd %d after %"PRId64
" ms inactivity", pfd
[0].revents
,
187 pfd
[0].fd
, comp_timeb(&end
, &start
));
190 cs_ftime(&start
); // register start time for new poll next run
/*
 * One of three actions is assigned to the job for the polled input; the
 * conditions that choose between them are not visible in this listing.
 */
193 { data
->action
= ACTION_READER_REMOTE
; }
198 data
->action
= ACTION_CLIENT_UDP
;
203 { data
->action
= ACTION_CLIENT_TCP
; }
/* Error or hang-up events reported by poll(). */
204 if(pfd
[0].revents
& (POLLHUP
| POLLNVAL
| POLLERR
))
/* A reader action without a reader cannot run; the job is dropped. */
213 if(!reader
&& data
->action
< ACTION_CLIENT_FIRST
)
215 __free_job_data(cl
, data
);
/*
 * Queued jobs (anything but tmp_data) older than cfg.ctimeout + 1000, as
 * measured by comp_timeb against data->time, are dropped.
 */
222 struct timeb actualtime
;
223 cs_ftime(&actualtime
);
224 int64_t gone
= comp_timeb(&actualtime
, &data
->time
);
225 if(data
!= &tmp_data
&& gone
> (int) cfg
.ctimeout
+1000)
227 cs_log_dbg(D_TRACE
, "dropping client data for %s time %"PRId64
" ms", username(cl
), gone
);
228 __free_job_data(cl
, data
);
232 if(data
!= &tmp_data
)
233 { cl
->work_job_data
= data
; } // Track the current job_data
/* Per-action handlers follow; the switch statement itself is not visible. */
236 case ACTION_READER_IDLE
:
237 reader_do_idle(reader
);
239 case ACTION_READER_REMOTE
:
240 s
= check_fd_for_data(cl
->pfd
);
241 if(s
== 0) // no data, another thread already read from fd?
245 if(reader
->ph
.type
== MOD_CONN_TCP
)
246 { network_tcp_connection_close(reader
, "disconnect"); }
/* Receive into mbuf through the reader's protocol handler. */
249 rc
= reader
->ph
.recv(cl
, mbuf
, bufsize
);
252 if(reader
->ph
.type
== MOD_CONN_TCP
)
253 { network_tcp_connection_close(reader
, "disconnect on receive"); }
256 cl
->last
= time(NULL
); // *********************************** TO BE REPLACE BY CS_FTIME() LATER ****************
257 idx
= reader
->ph
.c_recv_chk(cl
, dcw
, &rc
, mbuf
, rc
);
258 if(idx
< 0) { break; } // no dcw received
259 if(!idx
) { idx
= cl
->last_idx
; }
260 reader
->last_g
= time(NULL
); // *********************************** TO BE REPLACE BY CS_FTIME() LATER **************** // for reconnect timeout
/* Scan cl->ecmtask for the entry whose idx matches; stops once n is non-zero. */
261 for(i
= 0, n
= 0; i
< cfg
.max_pending
&& n
== 0; i
++)
263 if(cl
->ecmtask
[i
].idx
== idx
)
266 casc_check_dcw(reader
, i
, rc
, dcw
);
271 case ACTION_READER_RESET
:
272 cardreader_do_reset(reader
);
274 case ACTION_READER_ECM_REQUEST
:
275 reader_get_ecm(reader
, data
->ptr
);
277 case ACTION_READER_EMM
:
278 reader_do_emm(reader
, data
->ptr
);
280 case ACTION_READER_CARDINFO
:
281 reader_do_card_info(reader
);
283 case ACTION_READER_POLL_STATUS
:
284 cardreader_poll_status(reader
);
286 case ACTION_READER_INIT
:
288 { reader_init(reader
); }
290 case ACTION_READER_RESTART
:
294 case ACTION_READER_RESET_FAST
:
295 reader
->card_status
= CARD_NEED_INIT
;
296 cardreader_do_reset(reader
);
298 case ACTION_READER_CHECK_HEALTH
:
299 cardreader_do_checkhealth(reader
);
301 case ACTION_READER_CAPMT_NOTIFY
:
302 if(reader
->ph
.c_capmt
) { reader
->ph
.c_capmt(cl
, data
->ptr
); }
/* UDP input is received into the job's own buffer (data->ptr, data->len). */
304 case ACTION_CLIENT_UDP
:
305 n
= module
->recv(cl
, data
->ptr
, data
->len
);
307 module
->s_handler(cl
, data
->ptr
, n
);
/* TCP input is received into the thread's mbuf. */
309 case ACTION_CLIENT_TCP
:
310 s
= check_fd_for_data(cl
->pfd
);
311 if(s
== 0) // no data, another thread already read from fd?
313 if(s
< 0) // system error or fd wants to be closed
315 cl
->kill
= 1; // kill client on next run
318 n
= module
->recv(cl
, mbuf
, bufsize
);
321 cl
->kill
= 1; // kill client on next run
324 module
->s_handler(cl
, mbuf
, n
);
326 case ACTION_CACHEEX1_DELAY
:
327 cacheex_mode1_delay(data
->ptr
);
329 case ACTION_CACHEEX_TIMEOUT
:
330 cacheex_timeout(data
->ptr
);
332 case ACTION_FALLBACK_TIMEOUT
:
333 fallback_timeout(data
->ptr
);
335 case ACTION_CLIENT_TIMEOUT
:
336 ecm_timeout(data
->ptr
);
338 case ACTION_ECM_ANSWER_READER
:
341 case ACTION_ECM_ANSWER_CACHE
:
342 write_ecm_answer_fromcache(data
->ptr
);
344 case ACTION_CLIENT_INIT
:
346 { module
->s_init(cl
); }
347 cl
->is_udp
= module
->type
== MOD_CONN_UDP
;
350 case ACTION_CLIENT_IDLE
:
352 { module
->s_idle(cl
); }
355 cs_log("user %s reached %d sec idle limit.", username(cl
), cfg
.cmaxidle
);
359 case ACTION_CACHE_PUSH_OUT
:
361 cacheex_push_out(cl
, data
->ptr
);
364 case ACTION_CLIENT_KILL
:
367 case ACTION_CLIENT_SEND_MSG
:
369 if (config_enabled(MODULE_CCCAM
))
371 struct s_clientmsg
*clientmsg
= (struct s_clientmsg
*)data
->ptr
;
372 cc_cmd_send(cl
, clientmsg
->msg
, clientmsg
->len
, clientmsg
->cmd
);
376 case ACTION_PEER_IDLE
:
377 if(module
->s_peer_idle
)
378 { module
->s_peer_idle(cl
); }
/* The handled job is released here. */
382 __free_job_data(cl
, data
);
/* Write the first byte of mbuf to thread_pipe[1] when that byte is non-zero. */
385 if(thread_pipe
[1] && (mbuf
[0] != 0x00))
387 cs_log_dump_dbg(D_TRACE
, mbuf
, 1, "[OSCAM-WORK] Write to pipe:");
388 if(write(thread_pipe
[1], mbuf
, 1) == -1) // wakeup client check
390 cs_log_dbg(D_TRACE
, "[OSCAM-WORK] Writing to pipe failed (errno=%d %s)", errno
, strerror(errno
));
394 // Check for some race condition where while we ended, another thread added a job
395 SAFE_MUTEX_LOCK(&cl
->thread_lock
);
396 if(cl
->joblist
&& ll_count(cl
->joblist
) > 0)
398 SAFE_MUTEX_UNLOCK(&cl
->thread_lock
);
403 cl
->thread_active
= 0;
404 SAFE_MUTEX_UNLOCK(&cl
->thread_lock
);
/* Final state: worker marked inactive and mbuf no longer tracked on the client. */
408 cl
->thread_active
= 0;
409 cl
->work_mbuf
= NULL
; // Prevent free_client from freeing mbuf (->work_mbuf)
416 * adds a job to the job queue
417 * if ptr should be free() after use, set len to the size
/*
 * add_job - queue a job for a client, or start a worker thread for it.
 *
 * cl:     client the job belongs to.
 * action: the action code stored in the job.
 * ptr:    payload for the action.
 * len:    per the existing comment above this function, set to the size of
 *         ptr when ptr should be freed after use.
 *
 * Visible flow: allocate a job_data and timestamp it. If the client already
 * has an active worker, append the job to cl->joblist, waking the worker
 * with OSCAM_SIGNAL_WAKEUP when thread_active == 2. Otherwise start a new
 * work_thread with the job as its argument.
 *
 * NOTE(review): return statements, several conditions and the end of the
 * function are not visible in this listing, so return values are not
 * documented here.
 */
420 int32_t add_job(struct s_client
*cl
, enum actions action
, void *ptr
, int32_t len
)
425 { cs_log("WARNING: add_job failed. Client killed!"); } // Ignore jobs for killed clients
/* Cache push-out jobs are additionally subject to a queue-length check. */
431 if(action
== ACTION_CACHE_PUSH_OUT
&& cacheex_check_queue_length(cl
))
438 struct job_data
*data
;
439 if(!cs_malloc(&data
, sizeof(struct job_data
)))
446 data
->action
= action
;
/* Creation time; work_thread compares it against cfg.ctimeout. */
450 cs_ftime(&data
->time
);
/*
 * NOTE(review): cl is dereferenced for the lock before the cl test in the
 * condition below.
 */
452 SAFE_MUTEX_LOCK(&cl
->thread_lock
);
453 if(cl
&& !cl
->kill
&& cl
->thread_active
)
/* The list is created here; the guarding condition is not visible. */
456 { cl
->joblist
= ll_create("joblist"); }
457 ll_append(cl
->joblist
, data
);
/* thread_active == 2 is the value work_thread sets around poll(). */
458 if(cl
->thread_active
== 2)
459 { pthread_kill(cl
->thread
, OSCAM_SIGNAL_WAKEUP
); }
460 SAFE_MUTEX_UNLOCK(&cl
->thread_lock
);
/*
 * NOTE(review): cl->joblist is read for this log line after the unlock. The
 * label uses "> ACTION_CLIENT_FIRST" while set_work_thread_name uses
 * "< ACTION_CLIENT_FIRST", so action == ACTION_CLIENT_FIRST is logged as
 * "reader" here but named 'c' there.
 */
461 cs_log_dbg(D_TRACE
, "add %s job action %d queue length %d %s",
462 action
> ACTION_CLIENT_FIRST
? "client" : "reader", action
,
463 ll_count(cl
->joblist
), username(cl
));
467 /* pcsc doesn't like this; segfaults on x86, x86_64 */
/* modify_stacksize stays 0 only for a reader client whose reader type is R_PCSC. */
468 int8_t modify_stacksize
= 0;
469 struct s_reader
*rdr
= cl
->reader
;
470 if(cl
->typ
!= 'r' || !rdr
|| rdr
->typ
!= R_PCSC
)
471 { modify_stacksize
= 1; }
473 if(action
!= ACTION_READER_CHECK_HEALTH
)
475 cs_log_dbg(D_TRACE
, "start %s thread action %d",
476 action
> ACTION_CLIENT_FIRST
? "client" : "reader", action
);
/* The new job is passed to work_thread as its argument. */
479 int32_t ret
= start_thread("client work", work_thread
, (void *)data
, &cl
->thread
, 1, modify_stacksize
);
/* ret is passed to strerror, i.e. it is treated as an errno-style code. */
482 cs_log("ERROR: can't create thread for %s (errno=%d %s)",
483 action
> ACTION_CLIENT_FIRST
? "client" : "reader", ret
, strerror(ret
));
487 cl
->thread_active
= 1;
488 SAFE_MUTEX_UNLOCK(&cl
->thread_lock
);