- fix Building without Nagra not possible at Nagra_Merlin https://trac.streamboard...
[oscam.git] / module-stat.c
blobee179b7942177ac2c7df3dad8875bcf874b8e9c1
1 #define MODULE_LOG_PREFIX "stat"
3 #include "globals.h"
5 #ifdef WITH_LB
6 #include "cscrypt/md5.h"
7 #include "module-cacheex.h"
8 #include "module-cccam.h"
9 #include "oscam-array.h"
10 #include "oscam-cache.h"
11 #include "oscam-conf-chk.h"
12 #include "oscam-chk.h"
13 #include "oscam-client.h"
14 #include "oscam-ecm.h"
15 #include "oscam-files.h"
16 #include "oscam-lock.h"
17 #include "oscam-string.h"
18 #include "oscam-time.h"
20 #define UNDEF_AVG_TIME 99999 // NOT set here 0 or small value! Could cause there reader get selected
21 #define MAX_ECM_SEND_CACHE 16
23 #define LB_NONE 0
24 #define LB_FASTEST_READER_FIRST 1
25 #define LB_OLDEST_READER_FIRST 2
26 #define LB_LOWEST_USAGELEVEL 3
28 #define DEFAULT_LOCK_TIMEOUT 1000000
30 extern CS_MUTEX_LOCK ecmcache_lock;
31 extern struct ecm_request_t *ecmcwcache;
33 static int32_t stat_load_save;
35 static struct timeb last_housekeeping;
37 void init_stat(void)
39 stat_load_save = -100;
41 //checking config
42 if(cfg.lb_nbest_readers < 2)
43 { cfg.lb_nbest_readers = DEFAULT_NBEST; }
44 if(cfg.lb_nfb_readers < 2)
45 { cfg.lb_nfb_readers = DEFAULT_NFB; }
46 if(cfg.lb_min_ecmcount < 2)
47 { cfg.lb_min_ecmcount = DEFAULT_MIN_ECM_COUNT; }
48 if(cfg.lb_max_ecmcount < 3)
49 { cfg.lb_max_ecmcount = DEFAULT_MAX_ECM_COUNT; }
50 if(cfg.lb_reopen_seconds < 10)
51 { cfg.lb_reopen_seconds = DEFAULT_REOPEN_SECONDS; }
52 if(cfg.lb_retrylimit < 0)
53 { cfg.lb_retrylimit = DEFAULT_RETRYLIMIT; }
54 if(cfg.lb_stat_cleanup <= 0)
55 { cfg.lb_stat_cleanup = DEFAULT_LB_STAT_CLEANUP; }
58 #define LINESIZE 1024
60 static uint32_t get_prid(uint16_t caid, uint32_t prid)
62 int32_t i;
63 for(i = 0; i < cfg.lb_noproviderforcaid.ctnum; i++)
65 CAIDTAB_DATA *d = &cfg.lb_noproviderforcaid.ctdata[i];
66 uint16_t tcaid = d->caid;
67 if(!tcaid) { break; }
68 if((tcaid == caid) || (tcaid < 0x0100 && (caid >> 8) == tcaid))
70 prid = 0;
71 break;
75 return prid;
78 static void get_stat_query(ECM_REQUEST *er, STAT_QUERY *q)
80 memset(q, 0, sizeof(STAT_QUERY));
82 q->caid = er->caid;
83 q->prid = get_prid(er->caid, er->prid);
84 q->srvid = er->srvid;
85 q->chid = er->chid;
86 q->ecmlen = er->ecmlen;
89 void load_stat_from_file(void)
91 stat_load_save = 0;
92 char buf[256];
93 char *line;
94 char *fname;
95 FILE *file;
97 if(!cfg.lb_savepath)
99 get_tmp_dir_filename(buf, sizeof(buf), "stat");
100 fname = buf;
102 else
103 { fname = cfg.lb_savepath; }
105 file = fopen(fname, "r");
106 if(!file)
108 cs_log_dbg(D_LB, "loadbalancer: could not open %s for reading (errno=%d %s)", fname, errno, strerror(errno));
109 return;
112 if(!cs_malloc(&line, LINESIZE))
114 fclose(file);
115 return;
118 cs_log_dbg(D_LB, "loadbalancer: load statistics from %s", fname);
120 struct timeb ts, te;
121 cs_ftime(&ts);
123 struct s_reader *rdr = NULL;
124 READER_STAT *s;
126 int32_t i = 1;
127 int32_t valid = 0;
128 int32_t count = 0;
129 int32_t type = 0;
130 char *ptr, *saveptr1 = NULL;
131 char *split[12];
133 while(fgets(line, LINESIZE, file))
135 if(!line[0] || line[0] == '#' || line[0] == ';')
136 { continue; }
138 if(!cs_malloc(&s, sizeof(READER_STAT)))
139 { continue; }
141 //get type by evaluating first line:
142 if(type == 0)
144 if(strstr(line, " rc ")) { type = 2; }
145 else { type = 1; }
148 if(type == 1) // New format - faster parsing:
150 for(i = 0, ptr = strtok_r(line, ",", &saveptr1); ptr && i < 12 ; ptr = strtok_r(NULL, ",", &saveptr1), i++)
151 { split[i] = ptr; }
152 valid = (i == 11);
153 if(valid)
155 cs_strncpy(buf, split[0], sizeof(buf));
156 s->rc = atoi(split[1]);
157 s->caid = a2i(split[2], 4);
158 s->prid = a2i(split[3], 6);
159 s->srvid = a2i(split[4], 4);
160 s->chid = a2i(split[5], 4);
161 s->time_avg = atoi(split[6]);
162 s->ecm_count = atoi(split[7]);
163 s->last_received.time = atol(split[8]);
164 s->fail_factor = atoi(split[9]);
165 s->ecmlen = a2i(split[10], 2);
168 else // Old format - keep for compatibility:
170 i = sscanf(line, "%255s rc %04d caid %04hX prid %06X srvid %04hX time avg %d ms ecms %d last %ld fail %d len %02hX\n",
171 buf, &s->rc, &s->caid, &s->prid, &s->srvid,
172 &s->time_avg, &s->ecm_count, &s->last_received.time, &s->fail_factor, &s->ecmlen);
173 valid = i > 5;
176 if(valid && s->ecmlen > 0)
178 if(rdr == NULL || strcmp(buf, rdr->label) != 0)
180 LL_ITER itr = ll_iter_create(configured_readers);
181 while((rdr = ll_iter_next(&itr)))
183 if(strcmp(rdr->label, buf) == 0)
185 break;
190 if(rdr != NULL && strcmp(buf, rdr->label) == 0)
192 if(!rdr->lb_stat)
194 rdr->lb_stat = ll_create("lb_stat");
195 cs_lock_create(__func__, &rdr->lb_stat_lock, rdr->label, DEFAULT_LOCK_TIMEOUT);
198 ll_append(rdr->lb_stat, s);
199 count++;
201 else
203 cs_log("loadbalancer: statistics could not be loaded for %s", buf);
204 NULLFREE(s);
207 else
209 cs_log_dbg(D_LB, "loadbalancer: statistics ERROR: %s rc=%d i=%d", buf, s->rc, i);
210 NULLFREE(s);
213 fclose(file);
214 NULLFREE(line);
216 cs_ftime(&te);
217 #ifdef WITH_DEBUG
218 int64_t load_time = comp_timeb(&te, &ts);
220 cs_log_dbg(D_LB, "loadbalancer: statistics loaded %d records in %"PRId64" ms", count, load_time);
221 #endif
224 void lb_destroy_stats(struct s_reader *rdr)
226 if(!rdr->lb_stat)
227 return;
228 cs_lock_destroy(__func__, &rdr->lb_stat_lock);
229 ll_destroy_data(&rdr->lb_stat);
233 * get statistic values for reader ridx and caid/prid/srvid/ecmlen
235 static READER_STAT *get_stat_lock(struct s_reader *rdr, STAT_QUERY *q, int8_t lock)
237 if(!rdr->lb_stat)
239 rdr->lb_stat = ll_create("lb_stat");
240 cs_lock_create(__func__, &rdr->lb_stat_lock, rdr->label, DEFAULT_LOCK_TIMEOUT);
243 if(lock) { cs_readlock(__func__, &rdr->lb_stat_lock); }
245 LL_ITER it = ll_iter_create(rdr->lb_stat);
246 READER_STAT *s;
247 int32_t i = 0;
248 while((s = ll_iter_next(&it)))
250 i++;
251 if(s->caid == q->caid && s->prid == q->prid && s->srvid == q->srvid && s->chid == q->chid)
253 if(s->ecmlen == q->ecmlen)
254 { break; }
255 if(!s->ecmlen)
257 s->ecmlen = q->ecmlen;
258 break;
260 if(!q->ecmlen) // Query without ecmlen from dvbapi
261 { break; }
264 if(lock) { cs_readunlock(__func__, &rdr->lb_stat_lock); }
266 // Move stat to list start for faster access:
267 if (i > 10 && s && !rdr->lb_stat_busy) {
268 if (lock) cs_writelock(__func__, &rdr->lb_stat_lock);
269 ll_iter_move_first(&it);
270 if (lock) cs_writeunlock(__func__, &rdr->lb_stat_lock);
273 return s;
277 * get statistic values for reader ridx and caid/prid/srvid/ecmlen
279 static READER_STAT *get_stat(struct s_reader *rdr, STAT_QUERY *q)
281 return get_stat_lock(rdr, q, 1);
285 * Calculates average time
287 static void calc_stat(READER_STAT *s)
289 int32_t i, c = 0, t = 0;
290 for(i = 0; i < LB_MAX_STAT_TIME; i++)
292 if(s->time_stat[i] > 0)
294 t += (int32_t)s->time_stat[i];
295 c++;
298 if(!c)
299 { s->time_avg = UNDEF_AVG_TIME; }
300 else
301 { s->time_avg = t / c; }
305 * Saves statistik to /tmp/.oscam/stat.n where n is reader-index
307 static void save_stat_to_file_thread(void)
309 stat_load_save = 0;
310 char buf[256];
312 set_thread_name(__func__);
314 char *fname;
315 if(!cfg.lb_savepath)
317 get_tmp_dir_filename(buf, sizeof(buf), "stat");
318 fname = buf;
320 else
321 { fname = cfg.lb_savepath; }
323 FILE *file = fopen(fname, "w");
325 if(!file)
327 cs_log("can't write to file %s", fname);
328 return;
331 struct timeb ts, te;
332 cs_ftime(&ts);
334 int32_t cleanup_timeout = (cfg.lb_stat_cleanup * 60 * 60 * 1000);
336 int32_t count = 0;
337 struct s_reader *rdr;
338 LL_ITER itr = ll_iter_create(configured_readers);
339 while((rdr = ll_iter_next(&itr)))
342 if(rdr->lb_stat)
344 rdr->lb_stat_busy = 1;
346 cs_writelock(__func__, &rdr->lb_stat_lock);
347 LL_ITER it = ll_iter_create(rdr->lb_stat);
348 READER_STAT *s;
349 while((s = ll_iter_next(&it)))
351 int64_t gone = comp_timeb(&ts, &s->last_received);
352 if(gone > cleanup_timeout || !s->ecmlen) // cleanup old stats
354 ll_iter_remove_data(&it);
355 continue;
358 //Old version, too slow to parse:
359 //fprintf(file, "%s rc %d caid %04hX prid %06X srvid %04hX time avg %d ms ecms %d last %ld fail %d len %02hX\n",
360 // rdr->label, s->rc, s->caid, s->prid,
361 // s->srvid, s->time_avg, s->ecm_count, s->last_received, s->fail_factor, s->ecmlen);
363 //New version:
364 fprintf(file, "%s,%d,%04hX,%06X,%04hX,%04hX,%d,%d,%ld,%d,%02hX\n",
365 rdr->label, s->rc, s->caid, s->prid,
366 s->srvid, (uint16_t)s->chid, s->time_avg, s->ecm_count, s->last_received.time, s->fail_factor, s->ecmlen);
368 count++;
369 //if(count % 500 == 0) { // Saving stats is using too much cpu and causes high file load. so we need a break
370 // cs_readunlock(__func__, &rdr->lb_stat_lock);
371 // cs_sleepms(100);
372 // cs_readlock(__func__, &rdr->lb_stat_lock);
375 cs_writeunlock(__func__, &rdr->lb_stat_lock);
377 rdr->lb_stat_busy = 0;
381 fclose(file);
383 cs_ftime(&te);
384 int64_t load_time = comp_timeb(&te, &ts);
387 cs_log("loadbalancer: statistic saved %d records to %s in %"PRId64" ms", count, fname, load_time);
390 void save_stat_to_file(int32_t thread)
392 stat_load_save = 0;
393 if(thread)
394 { start_thread("save lb stats", (void *)&save_stat_to_file_thread, NULL, NULL, 1, 1); }
395 else
396 { save_stat_to_file_thread(); }
400 * fail_factor is multiplied to the reopen_time. This function increases the fail_factor
402 static void inc_fail(READER_STAT *s)
404 if(s->fail_factor <= 0)
405 { s->fail_factor = 1; }
406 else
407 { s->fail_factor++; } // inc by one at the time
410 static READER_STAT *get_add_stat(struct s_reader *rdr, STAT_QUERY *q)
412 if (rdr->lb_stat_busy)
413 return NULL;
415 if(!rdr->lb_stat)
417 rdr->lb_stat = ll_create("lb_stat");
418 cs_lock_create(__func__, &rdr->lb_stat_lock, rdr->label, DEFAULT_LOCK_TIMEOUT);
421 cs_writelock(__func__, &rdr->lb_stat_lock);
423 READER_STAT *s = get_stat_lock(rdr, q, 0);
424 if(!s)
426 if(cs_malloc(&s, sizeof(READER_STAT)))
428 s->caid = q->caid;
429 s->prid = q->prid;
430 s->srvid = q->srvid;
431 s->chid = q->chid;
432 s->ecmlen = q->ecmlen;
433 s->time_avg = UNDEF_AVG_TIME; // dummy placeholder
434 s->rc = E_FOUND; // set to found--> do not change!
435 cs_ftime(&s->last_received);
436 s->fail_factor = 0;
437 s->ecm_count = 0;
438 ll_prepend(rdr->lb_stat, s);
441 cs_writeunlock(__func__, &rdr->lb_stat_lock);
443 return s;
446 static void housekeeping_stat(int32_t force);
448 void readerinfofix_get_stat_query(ECM_REQUEST *er, STAT_QUERY *q)
450 get_stat_query(er, q);
453 void readerinfofix_inc_fail(READER_STAT *s)
455 inc_fail(s);
458 READER_STAT *readerinfofix_get_add_stat(struct s_reader *rdr, STAT_QUERY *q)
460 return get_add_stat(rdr, q);
463 static int32_t get_reopen_seconds(READER_STAT *s)
465 int32_t max = (INT_MAX / cfg.lb_reopen_seconds);
466 if(max > 9999) { max = 9999; }
467 if(s->fail_factor > max)
468 { s->fail_factor = max; }
469 if(!s->fail_factor)
470 { return cfg.lb_reopen_seconds; }
471 return s->fail_factor * cfg.lb_reopen_seconds;
475 * Adds caid/prid/srvid/ecmlen to stat-list for reader ridx with time/rc
477 static void add_stat(struct s_reader *rdr, ECM_REQUEST *er, int32_t ecm_time, int32_t rc, uint8_t rcEx)
479 //inc ecm_count if found, drop to 0 if not found:
480 // rc codes:
481 // 0 = found +
482 // 1 = cache1 #
483 // 2 = cache2 #
484 // 3 = cacheex #
485 // 4 = not found -
486 // 5 = timeout -
487 // 6 = sleeping #
488 // 7 = fake -
489 // 8 = invalid -
490 // 9 = corrupt #
491 // 10= no card #
492 // 11= expdate #
493 // 12= disabled #
494 // 13= stopped #
495 // 100= unhandled #
496 // + = adds statistic values
497 // # = ignored because of duplicate values, temporary failures or softblocks
498 // - = causes loadbalancer to block this reader for this caid/prov/sid
501 if(!rdr || !er || !cfg.lb_mode || !er->ecmlen || !er->client || rdr->lb_stat_busy)
502 { return; }
504 struct s_client *cl = rdr->client;
505 if(!check_client(cl))
506 { return; }
509 // IGNORE stats for fallback reader with lb_force_fallback parameter
510 if(chk_is_fixed_fallback(rdr, er) && rdr->lb_force_fallback)
511 { return; }
514 // IGNORE fails for ratelimit check
515 if(rc == E_NOTFOUND && rcEx == E2_RATELIMIT)
517 #ifdef WITH_DEBUG
518 if((D_LB & cs_dblevel))
520 char buf[ECM_FMT_LEN];
521 format_ecm(er, buf, ECM_FMT_LEN);
522 cs_log_dbg(D_LB, "loadbalancer: NOT adding stat (blocking) for reader %s because fails ratelimit checks!", rdr->label);
524 #endif
525 return;
529 // IGNORE fails when reader has positive services defined in new lb_whitelist_services parameter! See ticket #3310,#3311
530 if(rc >= E_NOTFOUND && has_lb_srvid(cl, er))
532 #ifdef WITH_DEBUG
533 if((D_LB & cs_dblevel))
535 char buf[ECM_FMT_LEN];
536 format_ecm(er, buf, ECM_FMT_LEN);
537 cs_log_dbg(D_LB, "loadbalancer: NOT adding stat (blocking) for reader %s because has positive srvid: rc %d %s time %d ms",
538 rdr->label, rc, buf, ecm_time);
540 #endif
541 return;
545 // IGNORE fails for sleep CMD08
546 if(rc == E_NOTFOUND && rdr->client->stopped==2)
548 #ifdef WITH_DEBUG
549 if((D_LB & cs_dblevel))
551 char buf[ECM_FMT_LEN];
552 format_ecm(er, buf, ECM_FMT_LEN);
553 cs_log_dbg(D_LB, "loadbalancer: NOT adding stat (no block) for reader %s because CMD08 sleep command!", rdr->label);
555 #endif
556 return;
559 // IGNORE timeouts on local readers (they could be busy handling an emm or entitlement refresh)
560 if(rc == E_TIMEOUT && !is_network_reader(rdr))
562 #ifdef WITH_DEBUG
563 if((D_LB & cs_dblevel))
565 cs_log_dbg(D_LB, "loadbalancer: NOT adding stat (no block) for reader %s because timeout on local reader", rdr->label);
567 #endif
568 return;
571 // IGNORE unhandled ecmresponses
572 if(rc == E_UNHANDLED)
574 #ifdef WITH_DEBUG
575 if((D_LB & cs_dblevel))
577 cs_log_dbg(D_LB, "loadbalancer: NOT adding stat (no block) for reader %s because unhandled reponse", rdr->label);
579 #endif
580 return;
583 // ignore too old ecms
584 if((uint32_t)ecm_time >= 3 * cfg.ctimeout)
585 { return; }
587 if((uint32_t)ecm_time >= cfg.ctimeout)
588 { rc = E_TIMEOUT;}
590 STAT_QUERY q;
591 get_stat_query(er, &q);
592 READER_STAT *s;
593 s = get_add_stat(rdr, &q);
594 if (!s) return;
596 struct timeb now;
597 cs_ftime(&now);
599 cs_ftime(&s->last_received);
601 if(rc == E_FOUND) // found
604 s->rc = E_FOUND;
605 s->ecm_count++;
606 s->fail_factor = 0;
608 // FASTEST READER:
609 s->time_idx++;
610 if(s->time_idx >= LB_MAX_STAT_TIME)
611 { s->time_idx = 0; }
612 s->time_stat[s->time_idx] = ecm_time;
613 calc_stat(s);
615 // OLDEST READER now set by get best reader!
618 // USAGELEVEL:
619 /* Assign a value to rdr->lb_usagelevel_ecmcount,
620 because no determined value was assigned before. */
621 if(rdr->lb_usagelevel_ecmcount < 0)
622 { rdr->lb_usagelevel_ecmcount = 0; }
624 rdr->lb_usagelevel_ecmcount++; /* ecm is found so counter should increase */
625 if((rdr->lb_usagelevel_ecmcount % cfg.lb_min_ecmcount) == 0) //update every MIN_ECM_COUNT usagelevel:
627 int64_t t = comp_timeb(&now, &rdr->lb_usagelevel_time) / 1000;
628 rdr->lb_usagelevel = cfg.lb_min_ecmcount * 1000 / (t < 1 ? 1 : t);
629 /* Reset of usagelevel time and counter */
630 rdr->lb_usagelevel_time = now;
631 rdr->lb_usagelevel_ecmcount = 0;
635 else if(rc == E_NOTFOUND || rc == E_TIMEOUT || rc == E_FAKE) // not found / timeout /fake
637 inc_fail(s);
638 s->rc = rc;
640 else if(rc == E_INVALID) // invalid
642 s->rc = rc;
644 else
646 #ifdef WITH_DEBUG
647 if(rc >= E_FOUND && (D_LB & cs_dblevel))
649 char buf[ECM_FMT_LEN];
650 format_ecm(er, buf, ECM_FMT_LEN);
651 cs_log_dbg(D_LB, "loadbalancer: not handled stat for reader %s: rc %d %s time %d ms",
652 rdr->label, rc, buf, ecm_time);
654 #endif
655 return;
658 housekeeping_stat(0);
660 #ifdef WITH_DEBUG
661 if(D_LB & cs_dblevel)
663 char buf[ECM_FMT_LEN];
664 format_ecm(er, buf, ECM_FMT_LEN);
665 cs_log_dbg(D_LB, "loadbalancer: adding stat for reader %s: rc %d %s time %d ms fail %d",
666 rdr->label, rc, buf, ecm_time, s->fail_factor);
668 #endif
670 if(cfg.lb_save)
672 stat_load_save++;
673 if(stat_load_save > cfg.lb_save)
674 { save_stat_to_file(1); }
679 int32_t clean_stat_by_rc(struct s_reader *rdr, int8_t rc, int8_t inverse)
681 int32_t count = 0;
682 if(rdr && rdr->lb_stat)
684 if (rdr->lb_stat_busy) return 0;
685 rdr->lb_stat_busy = 1;
686 cs_writelock(__func__, &rdr->lb_stat_lock);
687 READER_STAT *s;
688 LL_ITER itr = ll_iter_create(rdr->lb_stat);
689 while((s = ll_iter_next(&itr)))
691 if((!inverse && s->rc == rc) || (inverse && s->rc != rc))
693 ll_iter_remove_data(&itr);
694 count++;
697 cs_writeunlock(__func__, &rdr->lb_stat_lock);
698 rdr->lb_stat_busy = 0;
700 return count;
703 int32_t clean_all_stats_by_rc(int8_t rc, int8_t inverse)
705 int32_t count = 0;
706 LL_ITER itr = ll_iter_create(configured_readers);
707 struct s_reader *rdr;
708 while((rdr = ll_iter_next(&itr)))
710 count += clean_stat_by_rc(rdr, rc, inverse);
712 save_stat_to_file(0);
713 return count;
716 int32_t clean_stat_by_id(struct s_reader *rdr, uint16_t caid, uint32_t prid, uint16_t srvid, uint16_t chid, uint16_t ecmlen)
718 int32_t count = 0;
719 if(rdr && rdr->lb_stat)
721 if (rdr->lb_stat_busy) return 0;
723 rdr->lb_stat_busy = 1;
724 cs_writelock(__func__, &rdr->lb_stat_lock);
725 READER_STAT *s;
726 LL_ITER itr = ll_iter_create(rdr->lb_stat);
727 while((s = ll_iter_next(&itr)))
729 if(s->caid == caid &&
730 s->prid == prid &&
731 s->srvid == srvid &&
732 s->chid == chid &&
733 s->ecmlen == ecmlen)
735 ll_iter_remove_data(&itr);
736 count++;
737 break; // because the entry should unique we can left here
740 cs_writeunlock(__func__, &rdr->lb_stat_lock);
741 rdr->lb_stat_busy = 0;
743 return count;
747 static int32_t has_ident(FTAB *ftab, ECM_REQUEST *er) {
749 if (!ftab || !ftab->filts)
750 return 0;
752 int32_t j, k;
754 for (j = 0; j < ftab->nfilts; j++) {
755 if (ftab->filts[j].caid) {
756 if (ftab->filts[j].caid==er->caid) { //caid matches!
757 int32_t nprids = ftab->filts[j].nprids;
758 if (!nprids) // No Provider ->Ok
759 return 1;
761 for (k = 0; k < nprids; k++) {
762 uint32_t prid = ftab->filts[j].prids[k];
763 if (prid == er->prid) { //Provider matches
764 return 1;
770 return 0; //No match!
773 static int32_t get_retrylimit(ECM_REQUEST *er)
775 return caidvaluetab_get_value(&cfg.lb_retrylimittab, er->caid, cfg.lb_retrylimit);
778 static int32_t get_nfb_readers(ECM_REQUEST *er)
780 int32_t nfb_readers = er->client->account->lb_nfb_readers == -1 ? cfg.lb_nfb_readers : er->client->account->lb_nfb_readers;
782 if(nfb_readers <= 0) { nfb_readers = 1; }
784 return nfb_readers;
787 static int32_t get_nbest_readers(ECM_REQUEST *er)
789 int32_t nbest_readers = er->client->account->lb_nbest_readers == -1 ? cfg.lb_nbest_readers : er->client->account->lb_nbest_readers;
790 CAIDVALUETAB *nbest_readers_tab = er->client->account->lb_nbest_readers_tab.cvnum == 0 ? &cfg.lb_nbest_readers_tab : &er->client->account->lb_nbest_readers_tab;
791 if(nbest_readers <= 0) { nbest_readers = 1; }
792 return caidvaluetab_get_value(nbest_readers_tab, er->caid, nbest_readers);
795 static void convert_to_beta_int(ECM_REQUEST *er, uint16_t caid_to)
797 uint8_t md5tmp[MD5_DIGEST_LENGTH];
798 convert_to_beta(er->client, er, caid_to);
799 // update ecmd5 for store ECM in cache
800 memcpy(er->ecmd5, MD5(er->ecm + 13, er->ecmlen - 13, md5tmp), CS_ECMSTORESIZE);
801 cacheex_update_hash(er);
802 er->btun = 2; // marked as auto-betatunnel converted. Also for fixing recursive lock in get_cw
805 static void convert_to_nagra_int(ECM_REQUEST *er, uint16_t caid_to)
807 uint8_t md5tmp[MD5_DIGEST_LENGTH];
808 convert_to_nagra(er->client, er, caid_to);
809 // update ecmd5 for store ECM in cache
810 memcpy(er->ecmd5, MD5(er->ecm + 3, er->ecmlen - 3, md5tmp), CS_ECMSTORESIZE);
811 cacheex_update_hash(er);
812 er->btun = 2; // marked as auto-betatunnel converted. Also for fixing recursive lock in get_cw
815 static int32_t lb_valid_btun(ECM_REQUEST *er, uint16_t caidto)
817 STAT_QUERY q;
818 READER_STAT *s;
819 struct s_reader *rdr;
821 get_stat_query(er, &q);
822 q.caid = caidto;
824 cs_readlock(__func__, &readerlist_lock);
825 for(rdr = first_active_reader; rdr ; rdr = rdr->next)
827 if(rdr->lb_stat && rdr->client)
829 s = get_stat(rdr, &q);
830 if(s && s->rc == E_FOUND)
832 cs_readunlock(__func__, &readerlist_lock);
833 return 1;
837 cs_readunlock(__func__, &readerlist_lock);
838 return 0;
841 static uint16_t __lb_get_betatunnel_caid_to(uint16_t caid)
843 int32_t lbbm = cfg.lb_auto_betatunnel_mode;
844 if(lbbm <= 3)
846 if(caid == 0x1801) { return 0x1722; }
847 if(caid == 0x1833) { return 0x1702; }
848 if(caid == 0x1834) { return 0x1722; }
849 if(caid == 0x1835) { return 0x1722; }
851 if(lbbm >= 1)
853 if(caid == 0x1702) { return 0x1833; }
855 if(lbbm == 1 || lbbm == 4)
857 if(caid == 0x1722) { return 0x1801; }
859 else if(lbbm == 2 || lbbm == 5)
861 if(caid == 0x1722) { return 0x1834; }
863 else if(lbbm == 3 || lbbm == 6)
865 if(caid == 0x1722) { return 0x1835; }
867 return 0;
870 uint16_t lb_get_betatunnel_caid_to(ECM_REQUEST *er)
872 if(!cfg.lb_auto_betatunnel)
873 return 0;
874 uint16_t caidto = __lb_get_betatunnel_caid_to(er->caid);
875 if(lb_valid_btun(er, caidto))
876 return caidto;
877 return 0;
880 void check_lb_auto_betatunnel_mode(ECM_REQUEST *er)
882 int32_t lbbm = cfg.lb_auto_betatunnel_mode;
883 if(lbbm == 1 || lbbm == 4)
885 er->caid = 0x1801;
887 else if(lbbm == 2 || lbbm == 5)
889 er->caid = 0x1834;
891 else if(lbbm == 3 || lbbm == 6)
893 er->caid = 0x1835;
895 // no other way to autodetect 1801, 1834 or 1835
898 uint16_t get_rdr_caid(struct s_reader *rdr)
900 if(is_network_reader(rdr) || rdr->typ == R_EMU)
902 return 0; // reader caid is not real caid
904 else
906 return rdr->caid;
910 static void reset_ecmcount_reader(READER_STAT *s, struct s_reader *rdr)
912 cs_readlock(__func__, &rdr->lb_stat_lock);
913 if(rdr->lb_stat && rdr->client)
915 if(s)
917 s->ecm_count = 0;
920 cs_readunlock(__func__, &rdr->lb_stat_lock);
923 static void reset_avgtime_reader(READER_STAT *s, struct s_reader *rdr)
925 cs_readlock(__func__, &rdr->lb_stat_lock);
926 if(rdr->lb_stat && rdr->client)
928 if(!s) { return; }
929 int32_t i;
930 for(i = 0; i < LB_MAX_STAT_TIME; i++)
932 if(s->time_stat[i] > 0) { s->time_stat[i] = 0; }
934 s->time_avg = UNDEF_AVG_TIME;
936 cs_readunlock(__func__, &rdr->lb_stat_lock);
939 /* force_reopen=1 -> force opening of block readers
940 * force_reopen=0 -> no force opening of block readers, use reopen_seconds
942 static void try_open_blocked_readers(ECM_REQUEST *er, STAT_QUERY *q, int32_t *max_reopen, int32_t *force_reopen)
944 struct s_ecm_answer *ea;
945 READER_STAT *s;
946 struct s_reader *rdr;
948 for(ea = er->matching_rdr; ea; ea = ea->next)
950 if((ea->status & READER_FALLBACK) || (ea->status & READER_ACTIVE)) { continue; }
951 rdr = ea->reader;
952 s = get_stat(rdr, q);
953 if(!s) { continue; }
955 if(!cfg.lb_reopen_invalid && s->rc == E_INVALID){
956 cs_log_dbg(D_LB, "loadbalancer: reader %s blocked because INVALID sent! It will be blocked until stats cleaned!", rdr->label);
957 continue;
960 // if force_reopen we must active the "valid" reader
961 if(s->rc != E_FOUND && (*force_reopen) && cfg.lb_force_reopen_always)
963 cs_log_dbg(D_LB, "loadbalancer: force opening reader %s and reset fail_factor! --> ACTIVE", rdr->label);
964 ea->status |= READER_ACTIVE;
965 s->fail_factor = 0;
966 continue;
969 //active readers reach get_reopen_seconds(s)
970 struct timeb now;
971 cs_ftime(&now);
972 int64_t gone = comp_timeb(&now, &s->last_received);
973 int32_t reopenseconds = get_reopen_seconds(s);
974 if(s->rc != E_FOUND && gone > reopenseconds*1000 )
976 if(*max_reopen)
978 cs_log_dbg(D_LB, "loadbalancer: reader %s reaches %d seconds for reopening (fail_factor %d) --> ACTIVE", rdr->label, reopenseconds, s->fail_factor);
979 ea->status |= READER_ACTIVE;
980 (*max_reopen)--;
982 else
984 cs_log_dbg(D_LB, "loadbalancer: reader %s reaches %d seconds for reopening (fail_factor %d), but max_reopen reached!", rdr->label, reopenseconds, s->fail_factor);
986 continue;
989 if(s->rc != E_FOUND) // for debug output
991 cs_log_dbg(D_LB, "loadbalancer: reader %s blocked for %d seconds (fail_factor %d), retrying in %d seconds", rdr->label, get_reopen_seconds(s), s->fail_factor, (uint) (reopenseconds - (gone/1000)));
992 continue;
995 if(s->rc == E_FOUND) // for debug output
996 { cs_log_dbg(D_LB, "loadbalancer: reader %s \"e_found\" but not selected for lbvalue check", rdr->label); }
1002 * Gets best reader for caid/prid/srvid/ecmlen.
1003 * Best reader is evaluated by lowest avg time but only if ecm_count > cfg.lb_min_ecmcount (5)
1004 * Also the reader is asked if he is "available"
1005 * returns ridx when found or -1 when not found
1007 void stat_get_best_reader(ECM_REQUEST *er)
1009 if(!cfg.lb_mode || cfg.lb_mode > 3)
1010 { return; }
1012 if(!er->reader_avail)
1013 { return; }
1015 struct s_reader *rdr;
1016 struct s_ecm_answer *ea;
1018 //preferred card forwarding (CCcam client):
1019 if(cccam_forward_origin_card(er))
1020 { return; }
1022 STAT_QUERY q;
1023 get_stat_query(er, &q);
1025 // auto-betatunnel: The trick is: "let the loadbalancer decide"!
1026 if(cfg.lb_auto_betatunnel && caid_is_nagra(er->caid) && er->ecmlen) // nagra
1028 uint16_t caid_to = __lb_get_betatunnel_caid_to(er->caid);
1029 if(caid_to)
1031 int8_t needs_stats_nagra = 1, needs_stats_beta = 1;
1033 // Clone query parameters for beta:
1034 STAT_QUERY qbeta = q;
1035 qbeta.caid = caid_to;
1036 qbeta.prid = 0;
1037 qbeta.ecmlen = er->ecm[2] + 3 + 10;
1039 int32_t time_nagra = 0;
1040 int32_t time_beta = 0;
1041 int32_t weight;
1042 int32_t ntime;
1044 READER_STAT *stat_nagra = NULL;
1045 READER_STAT *stat_beta = NULL;
1047 // What is faster? nagra or beta?
1048 int8_t isn;
1049 int8_t isb;
1050 int8_t overall_valid = 0;
1051 int8_t overall_nvalid = 0;
1052 for(ea = er->matching_rdr; ea; ea = ea->next)
1054 isn = 0;
1055 isb = 0;
1056 rdr = ea->reader;
1057 weight = rdr->lb_weight;
1058 if(weight <= 0) { weight = 1; }
1060 // Check if betatunnel is allowed on this reader:
1061 int8_t valid = chk_ctab(caid_to, &rdr->ctab) //Check caid
1062 && chk_rfilter2(caid_to, 0, rdr) //Ident
1063 && chk_srvid_by_caid_prov_rdr(rdr, caid_to, 0) //Services
1064 && (!get_rdr_caid(rdr) || chk_caid_rdr(rdr, caid_to)); //rdr-caid
1065 if(valid)
1067 stat_beta = get_stat(rdr, &qbeta);
1068 overall_valid = 1;
1070 //else
1071 //stat_beta = NULL;
1073 // Check if nagra is allowed on this reader:
1074 int8_t nvalid = chk_ctab(er->caid, &rdr->ctab)//Check caid
1075 && chk_rfilter2(er->caid, 0, rdr) //Ident
1076 && chk_srvid_by_caid_prov_rdr(rdr, er->caid, 0) //Services
1077 && (!get_rdr_caid(rdr) || chk_caid_rdr(rdr, er->caid)); //rdr-caid
1078 if(nvalid)
1080 stat_nagra = get_stat(rdr, &q);
1081 overall_nvalid = 1;
1084 // calculate nagra data:
1085 if(stat_nagra && stat_nagra->rc == E_FOUND)
1087 ntime = stat_nagra->time_avg * 100 / weight;
1088 if(!time_nagra || ntime < time_nagra)
1089 { time_nagra = ntime; }
1092 // calculate beta data:
1093 if(stat_beta && stat_beta->rc == E_FOUND)
1095 ntime = stat_beta->time_avg * 100 / weight;
1096 if(!time_beta || ntime < time_beta)
1097 { time_beta = ntime; }
1100 // Uncomplete reader evaluation, we need more stats!
1101 if(stat_nagra)
1103 needs_stats_nagra = 0;
1104 isn = 1;
1106 if(stat_beta)
1108 needs_stats_beta = 0;
1109 isb = 1;
1111 cs_log_dbg(D_LB, "loadbalancer-betatunnel valid %d, stat_nagra %d, stat_beta %d, (%04X,%04X)", valid, isn, isb , get_rdr_caid(rdr), caid_to);
1114 if(!overall_valid) // we have no valid betatunnel reader also we don't needs stats (converted)
1115 { needs_stats_beta = 0; }
1117 if(!overall_nvalid) // we have no valid reader also we don't needs stats (unconverted)
1118 { needs_stats_nagra = 0; }
1120 if(cfg.lb_auto_betatunnel_prefer_beta && time_beta)
1122 time_beta = time_beta * cfg.lb_auto_betatunnel_prefer_beta / 100;
1123 if(time_beta <= 0)
1124 { time_beta = 1; }
1127 if(needs_stats_nagra || needs_stats_beta)
1129 cs_log_dbg(D_LB, "loadbalancer-betatunnel %04X:%04X (%d/%d) needs more statistics...", er->caid, caid_to,
1130 needs_stats_nagra, needs_stats_beta);
1131 if(needs_stats_beta) // try beta first
1134 convert_to_beta_int(er, caid_to);
1135 get_stat_query(er, &q);
1138 else if(time_beta && (!time_nagra || time_beta <= time_nagra))
1140 cs_log_dbg(D_LB, "loadbalancer-betatunnel %04X:%04X selected beta: n%d ms > b%d ms", er->caid, caid_to, time_nagra, time_beta);
1141 convert_to_beta_int(er, caid_to);
1142 get_stat_query(er, &q);
1144 else
1146 cs_log_dbg(D_LB, "loadbalancer-betatunnel %04X:%04X selected nagra: n%d ms < b%d ms", er->caid, caid_to, time_nagra, time_beta);
1148 // else nagra is faster or no beta, so continue unmodified
1151 else
1153 if(cfg.lb_auto_betatunnel && (er->caid == 0x1702 || er->caid == 0x1722) && er->ocaid == 0x0000 && er->ecmlen) // beta
1155 uint16_t caid_to = __lb_get_betatunnel_caid_to(er->caid);
1156 if(caid_to)
1158 int8_t needs_stats_nagra = 1, needs_stats_beta = 1;
1160 // Clone query parameters for beta:
1161 STAT_QUERY qnagra = q;
1162 qnagra.caid = caid_to;
1163 qnagra.prid = 0;
1164 qnagra.ecmlen = er->ecm[2] - 7;
1166 int32_t time_nagra = 0;
1167 int32_t time_beta = 0;
1168 int32_t weight;
1169 int32_t avg_time;
1171 READER_STAT *stat_nagra = NULL;
1172 READER_STAT *stat_beta = NULL;
1173 //What is faster? nagra or beta?
1174 int8_t isb;
1175 int8_t isn;
1176 int8_t overall_valid = 0;
1177 int8_t overall_bvalid = 0;
1178 for(ea = er->matching_rdr; ea; ea = ea->next)
1180 isb = 0;
1181 isn = 0;
1182 rdr = ea->reader;
1183 weight = rdr->lb_weight;
1184 if(weight <= 0) { weight = 1; }
1186 //Check if reverse betatunnel is allowed on this reader:
1187 int8_t valid = chk_ctab(caid_to, &rdr->ctab)//, rdr->typ) //Check caid
1188 && chk_rfilter2(caid_to, 0, rdr) //Ident
1189 && chk_srvid_by_caid_prov_rdr(rdr, caid_to, 0) //Services
1190 && (!get_rdr_caid(rdr) || chk_caid_rdr(rdr, caid_to)); //rdr-caid
1191 if(valid)
1193 stat_nagra = get_stat(rdr, &qnagra);
1194 overall_valid = 1;
1196 //else
1197 //stat_nagra = NULL;
1199 // Check if beta is allowed on this reader:
1200 int8_t bvalid = chk_ctab(er->caid, &rdr->ctab)//, rdr->typ) //Check caid
1201 && chk_rfilter2(er->caid, 0, rdr) //Ident
1202 && chk_srvid_by_caid_prov_rdr(rdr, er->caid, 0) //Services
1203 && (!get_rdr_caid(rdr) || chk_caid_rdr(rdr, er->caid)); //rdr-caid
1204 if(bvalid)
1206 stat_beta = get_stat(rdr, &q);
1207 overall_bvalid = 1;
1210 // calculate nagra data:
1211 if(stat_nagra && stat_nagra->rc == E_FOUND)
1213 avg_time = stat_nagra->time_avg * 100 / weight;
1214 if(!time_nagra || avg_time < time_nagra)
1215 { time_nagra = avg_time; }
1218 // calculate beta data:
1219 if(stat_beta && stat_beta->rc == E_FOUND)
1221 avg_time = stat_beta->time_avg * 100 / weight;
1222 if(!time_beta || avg_time < time_beta)
1223 { time_beta = avg_time; }
1226 // Uncomplete reader evaluation, we need more stats!
1227 if(stat_beta)
1229 needs_stats_beta = 0;
1230 isb = 1;
1232 if(stat_nagra)
1234 needs_stats_nagra = 0;
1235 isn = 1;
1237 cs_log_dbg(D_LB, "loadbalancer-betatunnel valid %d, stat_beta %d, stat_nagra %d, (%04X,%04X)", valid, isb, isn , get_rdr_caid(rdr), caid_to);
1240 if(!overall_valid) // we have no valid reverse betatunnel reader also we don't needs stats (converted)
1241 { needs_stats_nagra = 0; }
1243 if(!overall_bvalid) // we have no valid reader also we don't needs stats (unconverted)
1244 { needs_stats_beta = 0; }
1246 if(cfg.lb_auto_betatunnel_prefer_beta && time_beta)
1248 time_beta = time_beta * cfg.lb_auto_betatunnel_prefer_beta / 100;
1249 if(time_beta < 0)
1250 { time_beta = 0; }
1253 // if we needs stats, we send 2 ecm requests: 18xx and 17xx:
1254 if(needs_stats_nagra || needs_stats_beta)
1256 cs_log_dbg(D_LB, "loadbalancer-betatunnel %04X:%04X (%d/%d) needs more statistics...", er->caid, caid_to,
1257 needs_stats_beta, needs_stats_nagra);
1258 if(needs_stats_nagra) // try nagra frist
1261 convert_to_nagra_int(er, caid_to);
1262 get_stat_query(er, &q);
1266 else if(time_nagra && (!time_beta || time_nagra <= time_beta))
1268 cs_log_dbg(D_LB, "loadbalancer-betatunnel %04X:%04X selected nagra: b%d ms > n%d ms", er->caid, caid_to, time_beta, time_nagra);
1269 convert_to_nagra_int(er, caid_to);
1270 get_stat_query(er, &q);
1272 else
1274 cs_log_dbg(D_LB, "loadbalancer-betatunnel %04X:%04X selected beta: b%d ms < n%d ms", er->caid, caid_to, time_beta, time_nagra);
1281 if(cfg.lb_auto_betatunnel && chk_is_betatunnel_caid(er->caid))
1283 // check again is caid valied to reader
1284 // with both caid on local readers or with proxy
1285 // (both caid will setup to reader for make tunnel caid in share (ccc) visible)
1286 // make sure dosn't send a beta ecm to nagra reader (or reverse)
1287 struct s_ecm_answer *prv = NULL;
1288 for(ea = er->matching_rdr; ea; ea = ea->next)
1290 rdr = ea->reader;
1291 if(is_network_reader(rdr) || rdr->typ == R_EMU) // reader caid is not real caid
1293 prv = ea;
1294 continue; // proxy can convert or reject
1296 cs_log_dbg(D_LB, "check again caid %04X on reader %s", er->caid, rdr->label);
1298 if(!get_rdr_caid(ea->reader) || chk_caid_rdr(ea->reader, er->caid))
1300 prv = ea;
1302 else
1304 if(!chk_is_fixed_fallback(rdr, er)) { er->reader_avail--; }
1305 cs_log_dbg(D_LB, "caid %04X not found in caidlist, reader %s removed from request reader list", er->caid, rdr->label);
1306 if(prv)
1308 prv->next = ea->next;
1310 else
1311 { er->matching_rdr = ea->next; }
1315 if(!er->reader_avail)
1316 { return; }
1319 struct timeb check_time;
1320 cs_ftime(&check_time);
1321 int64_t current = -1;
1322 READER_STAT *s = NULL;
1323 int32_t retrylimit = get_retrylimit(er);
1324 int32_t nlocal_readers = 0;
1326 int32_t nbest_readers = get_nbest_readers(er); // Number of NON fallback readers ecm requests go (minimum 1)
1327 int32_t nfb_readers = get_nfb_readers(er); // Number of fallback readers ecm requests go (minimum 1)
1328 int32_t nreaders = cfg.lb_max_readers; // lb_max_readers is limit lb uses while learning
1331 if(!nreaders) // if is configured zero -> replace it by -1 (default means unlimited!)
1332 { nreaders = -1; }
1333 else if(nreaders <= nbest_readers)
1334 { nreaders = nbest_readers + 1; } // nreaders must cover nbest more 1 reader for try to unblock/add stats
1336 int32_t reader_active = 0;
1337 int32_t max_reopen = nreaders - nbest_readers; // if nreaders=-1, we try to reopen all readers
1340 #ifdef WITH_DEBUG
1341 if(cs_dblevel & D_LB)
1343 //loadbalancer debug output:
1344 int32_t nr = 0;
1345 char buf[512];
1346 int n, l = 512;
1347 char *rptr = buf;
1348 *rptr = 0;
1350 for(ea = er->matching_rdr; ea; ea = ea->next)
1352 nr++;
1353 if(nr > 5) { continue; }
1355 if(!(ea->status & READER_FALLBACK))
1356 { n = snprintf(rptr, l, "%s%s%s ", ea->reader->label, (ea->status & READER_CACHEEX) ? "*" : "", (ea->status & READER_LOCAL) ? "L" : ""); }
1357 else
1358 { n = snprintf(rptr, l, "[%s%s%s] ", ea->reader->label, (ea->status & READER_CACHEEX) ? "*" : "", (ea->status & READER_LOCAL) ? "L" : ""); }
1359 rptr += n;
1360 l -= n;
1363 if(nr > 5)
1364 { snprintf(rptr, l, "...(%d more)", nr - 5); }
1366 char ecmbuf[ECM_FMT_LEN];
1367 format_ecm(er, ecmbuf, ECM_FMT_LEN);
1369 cs_log_dbg(D_LB, "loadbalancer: client %s for %s: n=%d valid readers: %s",
1370 username(er->client), ecmbuf, nr, buf);
1372 #endif
1374 //Deactive all matching readers and set ea->value = 0;
1375 for(ea = er->matching_rdr; ea; ea = ea->next)
1377 ea->status &= ~(READER_ACTIVE | READER_FALLBACK);
1378 ea->value = 0;
1381 cs_log_dbg(D_LB, "loadbalancer: --------------------------------------------");
1382 if(max_reopen < 1) { cs_log_dbg(D_LB, "loadbalancer: mode %d, nbest %d, nfb %d, max_reopen ALL, retrylimit %d ms", cfg.lb_mode, nbest_readers, nfb_readers, retrylimit); }
1383 else { cs_log_dbg(D_LB, "loadbalancer: mode %d, nbest %d, nfb %d, max_reopen %d, retrylimit %d ms", cfg.lb_mode, nbest_readers, nfb_readers, max_reopen, retrylimit); }
1386 // Here evaluate lbvalue for readers with valid statistics
1387 for(ea = er->matching_rdr; ea; ea = ea->next)
1389 rdr = ea->reader;
1390 s = get_stat(rdr, &q);
1392 int32_t weight = rdr->lb_weight <= 0 ? 100 : rdr->lb_weight;
1393 //struct s_client *cl = rdr->client;
1395 if(s && s->rc == E_FOUND
1396 && s->ecm_count >= cfg.lb_min_ecmcount
1397 && (s->ecm_count <= cfg.lb_max_ecmcount || (retrylimit && s->time_avg <= retrylimit)))
1399 // Reader can decode this service (rc==0) and has lb_min_ecmcount ecms:
1400 if(er->preferlocalcards && (ea->status & READER_LOCAL))
1401 { nlocal_readers++; } // Prefer local readers!
1403 switch(cfg.lb_mode)
1405 case LB_FASTEST_READER_FIRST:
1406 current = s->time_avg * 100 / weight;
1407 break;
1409 case LB_OLDEST_READER_FIRST:
1410 if(!rdr->lb_last.time)
1411 { rdr->lb_last = check_time; }
1413 //current is negative here!
1414 current = comp_timeb(&rdr->lb_last, &check_time);
1415 current = current * weight / 100;
1416 if(!current) { current = -1; }
1418 //handle retrylimit
1419 if(retrylimit)
1421 if(s->time_avg > retrylimit) // set lowest value for reader with time-avg>retrylimit
1423 current = s->time_avg; // in this way, it will choose best time-avg reader among the worst ones
1425 else
1427 current = current - 1; // so when all have same current, it prioritizes the one with s->time_avg<=retrylimit! This avoid a loop!
1430 break;
1432 case LB_LOWEST_USAGELEVEL:
1433 current = rdr->lb_usagelevel * 100 / weight;
1435 //handle retrylimit
1436 if(retrylimit)
1438 if(s->time_avg > retrylimit)
1439 { current = 1000; } //set lowest value for reader with time-avg>retrylimit
1440 else
1441 { current = current - 1; } //so when all reaches retrylimit (all have lb_value=1000) or all have same current, it prioritizes the one with s->time_avg<=retrylimit! This avoid a loop!
1443 break;
1446 if(cfg.lb_mode != LB_OLDEST_READER_FIRST) // Adjust selection to reader load:
1448 /*if(rdr->ph.c_available && !rdr->ph.c_available(rdr, AVAIL_CHECK_LOADBALANCE, er))
1450 current=current*2;
1453 if(cl && cl->pending)
1454 current=current*cl->pending;
1456 if(current < 1)
1457 { current = 1; }
1460 cs_log_dbg(D_LB, "loadbalancer: reader %s lbvalue = %d (time-avg %d)", rdr->label, (int) llabs(current), s->time_avg);
1462 #if defined(WEBIF) || defined(LCDSUPPORT)
1463 rdr->lbvalue = llabs(current);
1464 #endif
1465 ea->value = current;
1466 ea->time = s->time_avg;
1470 // check for local readers
1471 if(nlocal_readers > nbest_readers) // if we have local readers, we prefer them!
1473 nlocal_readers = nbest_readers;
1474 nbest_readers = 0;
1476 else
1478 nbest_readers = nbest_readers - nlocal_readers;
1481 struct s_reader *best_rdr = NULL;
1482 struct s_reader *best_rdri = NULL;
1483 int32_t best_time = 0;
1485 // Here choose nbest readers. We evaluate only readers with valid stats (they have ea->value>0, calculated above)
1486 while(1)
1488 struct s_ecm_answer *best = NULL;
1490 for(ea = er->matching_rdr; ea; ea = ea->next)
1492 if(nlocal_readers && !(ea->status & READER_LOCAL))
1493 { continue; }
1495 if(ea->value && (!best || ea->value < best->value))
1496 { best = ea; }
1499 if(!best)
1500 { break; }
1502 best_rdri = best->reader;
1503 if(!best_rdr)
1505 best_rdr = best_rdri;
1506 best_time = best->time;
1509 if(nlocal_readers) // primary readers, local
1511 nlocal_readers--;
1512 reader_active++;
1513 best->status |= READER_ACTIVE;
1514 best->value = 0;
1515 cs_log_dbg(D_LB, "loadbalancer: reader %s --> ACTIVE", best_rdri->label);
1517 else if(nbest_readers) // primary readers, other
1519 nbest_readers--;
1520 reader_active++;
1521 best->status |= READER_ACTIVE;
1522 best->value = 0;
1523 cs_log_dbg(D_LB, "loadbalancer: reader %s --> ACTIVE", best_rdri->label);
1525 else
1526 { break; }
1529 /* Here choose nfb_readers
1530 * Select fallbacks reader until nfb_readers reached using this priority:
1531 * 1. forced (lb_force_fallback=1) fixed fallback
1532 * 2. "normal" fixed fallback
1533 * 3. best ea->value remaining reader;
1535 //check for fixed fallbacks
1536 int32_t n_fixed_fb = chk_has_fixed_fallback(er);
1537 if(n_fixed_fb)
1539 //check before for lb_force_fallback=1 readers
1540 for(ea = er->matching_rdr; ea && nfb_readers; ea = ea->next)
1542 rdr = ea->reader;
1543 if(chk_is_fixed_fallback(rdr, er) && rdr->lb_force_fallback && !(ea->status & READER_ACTIVE)){
1544 nfb_readers--;
1545 ea->status |= (READER_ACTIVE | READER_FALLBACK);
1546 cs_log_dbg(D_LB, "loadbalancer: reader %s --> FALLBACK (FIXED with force)", rdr->label);
1550 //check for "normal" fixed fallback with valid stats
1551 for(ea = er->matching_rdr; ea && nfb_readers; ea = ea->next)
1553 rdr = ea->reader;
1554 if(chk_is_fixed_fallback(rdr, er) && !rdr->lb_force_fallback && !(ea->status & READER_ACTIVE)){
1556 s = get_stat(rdr, &q);
1557 if(s && s->rc == E_FOUND
1558 && s->ecm_count >= cfg.lb_min_ecmcount
1559 && (s->ecm_count <= cfg.lb_max_ecmcount || (retrylimit && s->time_avg <= retrylimit)))
1561 nfb_readers--;
1562 ea->status |= (READER_ACTIVE | READER_FALLBACK);
1563 cs_log_dbg(D_LB, "loadbalancer: reader %s --> FALLBACK (FIXED)", rdr->label);
1569 //check for remaining best ea->value readers as fallbacks
1570 while(nfb_readers)
1572 struct s_ecm_answer *best = NULL;
1574 for(ea = er->matching_rdr; ea; ea = ea->next)
1576 if((ea->status & READER_ACTIVE))
1577 { continue; }
1579 if(ea->value && (!best || ea->value < best->value))
1580 { best = ea; }
1582 if(!best)
1583 { break; }
1585 nfb_readers--;
1586 best->status |= (READER_ACTIVE | READER_FALLBACK);
1587 best->value = 0;
1588 cs_log_dbg(D_LB, "loadbalancer: reader %s --> FALLBACK", best->reader->label);
1590 // end fallback readers
1592 // ACTIVE readers with no stats, or with no lb_min_ecmcount, or lb_max_ecmcount reached --> NO use max_reopen for these readers, always open!
1593 for(ea = er->matching_rdr; ea; ea = ea->next)
1595 rdr = ea->reader;
1596 s = get_stat(rdr, &q);
1598 #ifdef CS_CACHEEX
1599 // if cacheex reader, always active and no stats
1600 if(rdr->cacheex.mode == 1)
1602 ea->status |= READER_ACTIVE;
1603 continue;
1605 #endif
1606 // ignore fixed fallback with lb_force_fallback=1: no need stats, always used as fallaback!
1607 if(chk_is_fixed_fallback(rdr, er) && rdr->lb_force_fallback)
1608 continue;
1610 // active readers with no stats
1611 if(!s)
1613 cs_log_dbg(D_LB, "loadbalancer: reader %s need starting statistics --> ACTIVE", rdr->label);
1614 ea->status |= READER_ACTIVE;
1615 reader_active++;
1616 continue;
1619 // active readers with no lb_min_ecmcount reached
1620 if(s->rc == E_FOUND && s->ecm_count < cfg.lb_min_ecmcount)
1622 cs_log_dbg(D_LB, "loadbalancer: reader %s needs to reach lb_min_ecmcount(%d), now %d --> ACTIVE", rdr->label, cfg.lb_min_ecmcount, s->ecm_count);
1623 ea->status |= READER_ACTIVE;
1624 reader_active++;
1625 continue;
1628 // reset stats and active readers reach cfg.lb_max_ecmcount and time_avg > retrylimit.
1629 if(s->rc == E_FOUND && s->ecm_count > cfg.lb_max_ecmcount && (!retrylimit || s->time_avg > retrylimit))
1631 cs_log_dbg(D_LB, "loadbalancer: reader %s reaches max ecms (%d), resetting statistics --> ACTIVE", rdr->label, cfg.lb_max_ecmcount);
1632 reset_ecmcount_reader(s, rdr); // ecm_count=0
1633 reset_avgtime_reader(s, rdr); // time_avg=0
1634 ea->status |= READER_ACTIVE;
1635 reader_active++;
1636 continue;
1639 struct timeb now;
1640 cs_ftime(&now);
1641 int64_t gone = comp_timeb(&now, &s->last_received);
1642 // reset avg-time and active reader with s->last_received older than 5 min and avg-time>retrylimit
1643 if(retrylimit && s->rc == E_FOUND && (gone >= 300*1000) && s->time_avg > retrylimit)
1645 cs_log_dbg(D_LB, "loadbalancer: reader %s has time-avg>retrylimit and last received older than 5 minutes, resetting avg-time --> ACTIVE", rdr->label);
1646 reset_avgtime_reader(s, rdr); // time_avg=0
1647 ea->status &= ~(READER_ACTIVE | READER_FALLBACK); // It could be activated as fallback above because has lb_vlaue>0, so remove fallback state!
1648 ea->status |= READER_ACTIVE;
1649 reader_active++;
1650 continue;
1654 int32_t force_reopen = 0;
1656 //no reader active --> force to reopen matching readers
1657 if(reader_active == 0)
1659 cs_log_dbg(D_LB, "loadbalancer: NO VALID MATCHING READER FOUND!");
1660 force_reopen = 1;
1662 else if(retrylimit)
1665 * check for lbretrylimit!
1667 * if best_time > retrylimit we need to reset avg times of all computed above matching readers, so we can re-evaluated lbvalue!
1668 * More, we force open blocked reader!
1670 int32_t retrylimit_reached = best_time && best_time > retrylimit;
1671 if(retrylimit_reached)
1673 cs_log_dbg(D_LB, "loadbalancer: best reader %s (avg_time %d ms) reaches RETRYLIMIT (%d ms), resetting avg times and ACTIVE all (valid and blocked) matching readers!", best_rdr->label, best_time, retrylimit);
1674 for(ea = er->matching_rdr; ea; ea = ea->next)
1676 rdr = ea->reader;
1677 #ifdef CS_CACHEEX
1678 if(rdr->cacheex.mode == 1) { continue; }
1679 #endif
1680 s = get_stat(rdr, &q);
1682 //reset avg time and ACTIVE all valid lbvalue readers
1683 if(s && s->rc == E_FOUND
1684 && s->ecm_count >= cfg.lb_min_ecmcount
1685 && (s->ecm_count <= cfg.lb_max_ecmcount || s->time_avg <= retrylimit))
1687 if((ea->status & READER_FALLBACK)) { cs_log_dbg(D_LB, "loadbalancer: reader %s selected as FALLBACK --> ACTIVE", rdr->label); }
1688 else if(!(ea->status & READER_ACTIVE)) { cs_log_dbg(D_LB, "loadbalancer: reader %s --> ACTIVE", rdr->label); }
1689 ea->status &= ~(READER_ACTIVE | READER_FALLBACK); //remove active and fallback
1690 ea->status |= READER_ACTIVE; //add active
1691 reset_avgtime_reader(s, rdr);
1694 //reset avg time all blocked "valid" readers. We active them by force_reopen=1
1695 if(s && s->rc != E_FOUND)
1697 reset_avgtime_reader(s, rdr);
1701 force_reopen = 1; //force reopen blocked readers
1705 //try to reopen max_reopen blocked readers (readers with last ecm not "e_found"); if force_reopen=1, force reopen valid blocked readers!
1706 try_open_blocked_readers(er, &q, &max_reopen, &force_reopen);
1708 cs_log_dbg(D_LB, "loadbalancer: --------------------------------------------");
1710 #ifdef WITH_DEBUG
1711 if(cs_dblevel & D_LB)
1713 //loadbalancer debug output:
1714 int32_t nr = 0;
1715 char buf[512];
1716 int32_t l = 512;
1717 char *rptr = buf;
1718 *rptr = 0;
1719 int32_t n = 0;
1721 for(ea = er->matching_rdr; ea; ea = ea->next)
1723 if(!(ea->status & READER_ACTIVE))
1724 { continue; }
1726 nr++;
1727 if(nr > 5) { continue; }
1729 if(!(ea->status & READER_FALLBACK))
1730 { n = snprintf(rptr, l, "%s%s%s ", ea->reader->label, (ea->status & READER_CACHEEX) ? "*" : "", (ea->status & READER_LOCAL) ? "L" : ""); }
1731 else
1732 { n = snprintf(rptr, l, "[%s%s%s] ", ea->reader->label, (ea->status & READER_CACHEEX) ? "*" : "", (ea->status & READER_LOCAL) ? "L" : ""); }
1733 rptr += n;
1734 l -= n;
1737 if(nr > 5)
1738 { snprintf(rptr, l, "...(%d more)", nr - 5); }
1740 if(cs_dblevel & D_LB)
1742 char ecmbuf[ECM_FMT_LEN];
1743 format_ecm(er, ecmbuf, ECM_FMT_LEN);
1745 cs_log_dbg(D_LB, "loadbalancer: client %s for %s: n=%d selected readers: %s",
1746 username(er->client), ecmbuf, nr, buf);
1749 #endif
1750 return;
1754 * clears statistic of reader ridx.
1756 void clear_reader_stat(struct s_reader *rdr)
1758 if(!rdr->lb_stat)
1759 { return; }
1761 ll_clear_data(rdr->lb_stat);
1764 void clear_all_stat(void)
1766 struct s_reader *rdr;
1767 LL_ITER itr = ll_iter_create(configured_readers);
1768 while((rdr = ll_iter_next(&itr)))
1770 clear_reader_stat(rdr);
1774 static void housekeeping_stat_thread(void)
1776 struct timeb now;
1777 cs_ftime(&now);
1778 int32_t cleanup_timeout = cfg.lb_stat_cleanup * 60 * 60 * 1000;
1779 int32_t cleaned = 0;
1780 struct s_reader *rdr;
1781 set_thread_name(__func__);
1782 LL_ITER itr = ll_iter_create(configured_readers);
1783 cs_readlock(__func__, &readerlist_lock); // this avoids cleaning a reading during writing
1784 while((rdr = ll_iter_next(&itr)))
1786 if(rdr->lb_stat)
1788 rdr->lb_stat_busy = 1;
1789 cs_writelock(__func__, &rdr->lb_stat_lock);
1790 LL_ITER it = ll_iter_create(rdr->lb_stat);
1791 READER_STAT *s;
1793 while((s = ll_iter_next(&it)))
1795 int64_t gone = comp_timeb(&now, &s->last_received);
1796 if(gone > cleanup_timeout)
1798 ll_iter_remove_data(&it);
1799 cleaned++;
1802 cs_writeunlock(__func__, &rdr->lb_stat_lock);
1803 rdr->lb_stat_busy = 0;
1806 cs_readunlock(__func__, &readerlist_lock);
1807 cs_log_dbg(D_LB, "loadbalancer cleanup: removed %d entries", cleaned);
1810 static void housekeeping_stat(int32_t force)
1812 struct timeb now;
1813 cs_ftime(&now);
1814 int64_t gone = comp_timeb(&now, &last_housekeeping);
1815 if(!force && (gone < 60 * 60 * 1000)) // only clean once in an hour
1816 { return; }
1818 last_housekeeping = now;
1819 start_thread("housekeeping lb stats", (void *)&housekeeping_stat_thread, NULL, NULL, 1, 1);
1822 static int compare_stat(READER_STAT **ps1, READER_STAT **ps2)
1824 READER_STAT *s1 = (*ps1), *s2 = (*ps2);
1825 int64_t res = s1->rc - s2->rc;
1826 if(res) { return res; }
1827 res = s1->caid - s2->caid;
1828 if(res) { return res; }
1829 res = s1->prid - s2->prid;
1830 if(res) { return res; }
1831 res = s1->srvid - s2->srvid;
1832 if(res) { return res; }
1833 res = s1->chid - s2->chid;
1834 if(res) { return res; }
1835 res = s1->ecmlen - s2->ecmlen;
1836 if(res) { return res; }
1837 res = comp_timeb(&s1->last_received, &s2->last_received);
1838 return res;
1841 static int compare_stat_r(READER_STAT **ps1, READER_STAT **ps2)
1843 return -compare_stat(ps1, ps2);
1846 READER_STAT **get_sorted_stat_copy(struct s_reader *rdr, int32_t reverse, int32_t *size)
1848 if(reverse)
1849 { return (READER_STAT **)ll_sort(rdr->lb_stat, compare_stat_r, size); }
1850 else
1851 { return (READER_STAT **)ll_sort(rdr->lb_stat, compare_stat, size); }
1854 static int8_t stat_in_ecmlen(struct s_reader *rdr, READER_STAT *s)
1856 int32_t i;
1857 for (i = 0; i < rdr->ecm_whitelist.ewnum; i++)
1859 ECM_WHITELIST_DATA *d = &rdr->ecm_whitelist.ewdata[i];
1860 if ((d->caid == 0 || d->caid == s->caid) && (d->ident == 0 || d->ident == s->prid) && (d->len == s->ecmlen))
1861 return 1;
1863 return 0;
1866 static int8_t add_to_ecmlen(struct s_reader *rdr, READER_STAT *s)
1868 int32_t i;
1869 for (i = 0; i < rdr->ecm_whitelist.ewnum; i++)
1871 ECM_WHITELIST_DATA *d = &rdr->ecm_whitelist.ewdata[i];
1872 if ((d->caid == s->caid) && (d->ident == s->prid) && (d->len == s->ecmlen))
1873 return 1;
1875 ECM_WHITELIST_DATA d = { .caid = s->caid, .ident = s->prid, .len = s->ecmlen };
1876 ecm_whitelist_add(&rdr->ecm_whitelist, &d);
1877 return 0;
1880 void update_ecmlen_from_stat(struct s_reader *rdr)
1882 if(!rdr || !rdr->lb_stat)
1883 { return; }
1885 cs_readlock(__func__, &rdr->lb_stat_lock);
1886 LL_ITER it = ll_iter_create(rdr->lb_stat);
1887 READER_STAT *s;
1888 while((s = ll_iter_next(&it)))
1890 if(s->rc == E_FOUND)
1892 if(!stat_in_ecmlen(rdr, s))
1893 { add_to_ecmlen(rdr, s); }
1896 cs_readunlock(__func__, &rdr->lb_stat_lock);
1900 * mark as last reader after checked for cache requests:
1902 void lb_mark_last_reader(ECM_REQUEST *er)
1904 //OLDEST_READER: set lb_last
1905 struct s_ecm_answer *ea;
1906 for(ea = er->matching_rdr; ea; ea = ea->next)
1908 if((ea->status & (READER_ACTIVE | READER_FALLBACK)) == READER_ACTIVE)
1909 { cs_ftime(&ea->reader->lb_last); }
1914 * Automatic timeout feature depending on statistik values
1916 static uint32_t __lb_auto_timeout(ECM_REQUEST *er, uint32_t ctimeout)
1918 STAT_QUERY q;
1919 READER_STAT *s = NULL;
1921 struct s_reader *rdr = NULL;
1922 struct s_ecm_answer *ea;
1924 for(ea = er->matching_rdr; ea; ea = ea->next)
1926 if((ea->status & (READER_ACTIVE | READER_FALLBACK)) == READER_ACTIVE)
1928 rdr = ea->reader;
1929 get_stat_query(er, &q);
1930 s = get_stat(rdr, &q);
1931 if(s) { break; }
1935 if(!s) { return ctimeout; }
1937 uint32_t t;
1938 if(s->rc == E_TIMEOUT)
1939 { t = ctimeout / 2; } //timeout known, early timeout!
1940 else
1942 if(s->ecm_count < cfg.lb_min_ecmcount) { return ctimeout; }
1944 t = s->time_avg * (100 + cfg.lb_auto_timeout_p) / 100;
1945 if((int32_t)(t - s->time_avg) < cfg.lb_auto_timeout_t) { t = s->time_avg + cfg.lb_auto_timeout_t; }
1948 if(t > ctimeout) { t = ctimeout; }
1950 #ifdef WITH_DEBUG
1951 if(D_TRACE & cs_dblevel)
1953 char buf[ECM_FMT_LEN];
1954 format_ecm(er, buf, ECM_FMT_LEN);
1955 cs_log_dbg(D_TRACE, "auto-timeout for %s %s set rdr %s to %d", username(er->client), buf, rdr->label, t);
1957 #endif
1958 return t;
1961 uint32_t lb_auto_timeout(ECM_REQUEST *er, uint32_t timeout)
1963 if(cfg.lb_auto_timeout)
1964 return __lb_auto_timeout(er, timeout);
1965 return timeout;
1968 bool lb_check_auto_betatunnel(ECM_REQUEST *er, struct s_reader *rdr)
1970 if(!cfg.lb_auto_betatunnel)
1971 return 0;
1973 bool match = 0;
1974 uint16_t caid = __lb_get_betatunnel_caid_to(er->caid);
1975 if(caid)
1977 uint16_t save_caid = er->caid;
1978 er->caid = caid;
1979 match = matching_reader(er, rdr); // matching
1980 er->caid = save_caid;
1982 return match;
1986 * search for same ecm hash with same readers
1988 static struct ecm_request_t *check_same_ecm(ECM_REQUEST *er)
1990 struct ecm_request_t *ecm;
1991 time_t timeout;
1992 struct s_ecm_answer *ea_ecm = NULL, *ea_er = NULL;
1993 uint8_t rdrs = 0;
1995 cs_readlock(__func__, &ecmcache_lock);
1996 for(ecm = ecmcwcache; ecm; ecm = ecm->next)
1998 timeout = time(NULL) - ((cfg.ctimeout + 500) / 1000);
2000 if(ecm->tps.time <= timeout)
2001 { break; }
2003 if(ecm == er) { continue; }
2005 if(er->caid != ecm->caid || memcmp(ecm->ecmd5, er->ecmd5, CS_ECMSTORESIZE))
2006 { continue; }
2008 if(!er->readers || !ecm->readers || er->readers != ecm->readers)
2009 { continue; }
2011 ea_ecm = ecm->matching_rdr;
2012 ea_er = er->matching_rdr;
2013 rdrs = er->readers;
2015 while(rdrs && ea_ecm && ea_er)
2017 if(ea_ecm->reader != ea_er->reader)
2018 { break; }
2019 ea_ecm = ea_ecm->next;
2020 ea_er = ea_er->next;
2021 rdrs--;
2024 if(!rdrs)
2026 cs_readunlock(__func__, &ecmcache_lock);
2027 return ecm;
2030 cs_readunlock(__func__, &ecmcache_lock);
2031 return NULL; // nothing found so return null
2034 static void use_same_readers(ECM_REQUEST *er_new, ECM_REQUEST *er_cache)
2036 struct s_ecm_answer *ea_new = er_new->matching_rdr;
2037 struct s_ecm_answer *ea_cache = er_cache->matching_rdr;
2038 uint8_t rdrs = er_new->readers;
2039 while(rdrs)
2041 ea_new->status &= ~(READER_ACTIVE | READER_FALLBACK);
2042 if((ea_cache->status & READER_ACTIVE))
2044 if(!(ea_cache->status & READER_FALLBACK))
2046 ea_new->status |= READER_ACTIVE;
2048 else
2050 ea_new->status |= (READER_ACTIVE | READER_FALLBACK);
2054 ea_new = ea_new->next;
2055 ea_cache = ea_cache->next;
2056 rdrs--;
2060 void lb_set_best_reader(ECM_REQUEST *er)
2062 if(!cfg.lb_mode)
2063 return;
2065 // cache2 is handled by readers queue, so, if a same ecm hash with same readers, use these same readers to get cache2 from them! Not ask other readers!
2066 struct ecm_request_t *ecm_eq = NULL;
2067 ecm_eq = check_same_ecm(er);
2068 if(ecm_eq)
2070 // set all readers used by ecm_eq, so we get cache2 from them!
2071 use_same_readers(er, ecm_eq);
2072 cs_log_dbg(D_LB, "{client %s, caid %04X, prid %06X, srvid %04X} [get_cw] found same ecm with same readers from client %s, use them!", (check_client(er->client) ? er->client->account->usr : "-"), er->caid, er->prid, er->srvid, (check_client(ecm_eq->client) ? ecm_eq->client->account->usr : "-"));
2074 else
2076 // FILTER readers by loadbalancing
2077 stat_get_best_reader(er);
2081 void lb_update_last(struct s_ecm_answer *ea_er, struct s_reader *reader)
2083 // for lb oldest reader mode - not use for fallback readers
2084 if (!(ea_er->status & READER_FALLBACK))
2085 cs_ftime(&reader->lb_last);
2088 void send_reader_stat(struct s_reader *rdr, ECM_REQUEST *er, struct s_ecm_answer *ea, int8_t rc)
2090 if(rc >= E_99 || cacheex_reader(rdr))
2091 { return; }
2093 int32_t ecm_time = cfg.ctimeout;
2094 if(ea->ecm_time && ea->rc <= E_NOTFOUND)
2095 { ecm_time = ea->ecm_time; }
2097 add_stat(rdr, er, ecm_time, rc, ea->rcEx);
2100 void stat_finish(void)
2102 if(cfg.lb_mode && cfg.lb_save)
2104 save_stat_to_file(0);
2105 if(cfg.lb_savepath)
2106 { cs_log("stats saved to file %s", cfg.lb_savepath); }
2107 cfg.lb_save = 0; //this is for avoiding duplicate saves
2111 #endif