1 /* This software was written by Dirk Engling <erdgeist@erdgeist.org>
2 It is considered beerware. Prost. Skol. Cheers or whatever.
19 #include "trackerlogic.h"
/* Debug trace hook for this file. The active definition below expands to
   nothing; the commented-out variant would print STRING to stderr instead. */
24 /* #define MTX_DBG( STRING ) fprintf( stderr, STRING ) */
25 #define MTX_DBG( STRING )
27 /* Our global all torrents list */
/* One torrent vector per bucket; mutex_bucket_lock() returns a pointer into
   this array. */
28 static ot_vector all_torrents
[OT_BUCKET_COUNT
];
/* One mutex per bucket, indexed by the same bucket number as all_torrents. */
29 static pthread_mutex_t bucket_mutex
[OT_BUCKET_COUNT
];
/* Running torrent total; mutex_bucket_unlock() adds its delta argument to it
   and mutex_get_torrent_count() reads it. */
30 static size_t g_torrent_count
;
32 /* Self pipe from opentracker.c */
/* The result-pushing functions in this file write a single byte to index 1. */
33 extern int g_self_pipe
[2];
35 ot_vector
*mutex_bucket_lock( int bucket
) {
36 pthread_mutex_lock(bucket_mutex
+ bucket
);
37 return all_torrents
+ bucket
;
40 ot_vector
*mutex_bucket_lock_by_hash( ot_hash
const hash
) {
41 return mutex_bucket_lock( uint32_read_big( (const char*)hash
) >> OT_BUCKET_COUNT_SHIFT
);
44 void mutex_bucket_unlock( int bucket
, int delta_torrentcount
) {
45 pthread_mutex_unlock(bucket_mutex
+ bucket
);
46 g_torrent_count
+= delta_torrentcount
;
49 void mutex_bucket_unlock_by_hash( ot_hash
const hash
, int delta_torrentcount
) {
50 mutex_bucket_unlock( uint32_read_big( (char*)hash
) >> OT_BUCKET_COUNT_SHIFT
, delta_torrentcount
);
53 size_t mutex_get_torrent_count( ) {
54 return g_torrent_count
;
/* Task id counter: mutex_workqueue_poptask() pre-increments it to produce
   each new id, and treats a task whose taskid is 0 as not yet assigned. */
68 static ot_taskid next_free_taskid
= 1;
/* Head of the singly linked task list (linked through the next member). */
69 static struct ot_task
*tasklist
;
/* Guards every access to tasklist in the mutex_workqueue_* functions. */
70 static pthread_mutex_t tasklist_mutex
;
/* Broadcast by mutex_workqueue_pushtask(); waited on by
   mutex_workqueue_poptask(). */
71 static pthread_cond_t tasklist_being_filled
;
/* Queues a new task for the worker threads.
   Allocates a struct ot_task, records the requested tasktype and zero iovec
   entries, then takes tasklist_mutex, wakes every thread waiting on
   tasklist_being_filled and releases the lock.
   NOTE(review): this view of the function looks incomplete -- nothing
   visible checks the malloc result, stores sock, initialises tmptask, links
   the new task into the list or returns a value. Confirm against the full
   file. */
73 int mutex_workqueue_pushtask( int64 sock
, ot_tasktype tasktype
) {
74 struct ot_task
** tmptask
, * task
;
76 task
= malloc(sizeof( struct ot_task
));
/* The new task carries the caller's type and starts without result buffers. */
81 task
->tasktype
= tasktype
;
83 task
->iovec_entries
= 0;
87 /* Want exclusive access to tasklist */
88 pthread_mutex_lock( &tasklist_mutex
);
90 /* Skip to end of list */
/* tmptask advances to the address of the current node's next link. */
93 tmptask
= &(*tmptask
)->next
;
96 /* Inform waiting workers and release lock */
97 pthread_cond_broadcast( &tasklist_being_filled
);
98 pthread_mutex_unlock( &tasklist_mutex
);
/* Cancels queued work belonging to a socket.
   With tasklist_mutex held, scans the task list for an entry whose sock
   equals the argument, frees every iov_base in that entry's iovec array and
   unlinks the entry from the list.
   NOTE(review): the declaration of i, whatever releases the iovec array and
   the task itself, and the end of the matching branch are not visible in
   this view -- confirm against the full file. */
102 void mutex_workqueue_canceltask( int64 sock
) {
/* task points at the link that leads to the current node (the list head or a
   predecessor's next member), so a node can be unlinked through it. */
103 struct ot_task
** task
;
105 /* Want exclusive access to tasklist */
106 pthread_mutex_lock( &tasklist_mutex
);
108 for (task
= &tasklist
; *task
; task
= &((*task
)->next
))
109 if ((*task
)->sock
== sock
) {
/* Keep hold of the buffers and the node before the link is rewritten. */
110 struct iovec
*iovec
= (*task
)->iovec
;
111 struct ot_task
*ptask
= *task
;
114 /* Free task's iovec */
115 for( i
=0; i
<(*task
)->iovec_entries
; ++i
)
116 free( iovec
[i
].iov_base
);
/* Unlink: the incoming link now skips this node; ptask still refers to it. */
118 *task
= (*task
)->next
;
124 pthread_mutex_unlock( &tasklist_mutex
);
/* Hands an unassigned task of the wanted class to a worker.
   tasktype is in/out: on entry it names the class the worker accepts; when a
   matching task is found it is overwritten with that task's full tasktype
   and the task receives a freshly incremented id.
   NOTE(review): the loop that repeats the scan after pthread_cond_wait and
   the final return are not visible in this view; presumably the assigned
   taskid is returned -- confirm against the full file. */
127 ot_taskid
mutex_workqueue_poptask( ot_tasktype
*tasktype
) {
128 struct ot_task
* task
;
/* 0 doubles as the "nothing assigned yet" value. */
129 ot_taskid taskid
= 0;
131 /* Want exclusive access to tasklist */
132 pthread_mutex_lock( &tasklist_mutex
);
135 /* Skip to the first unassigned task this worker wants to do */
136 for (task
= tasklist
; task
; task
= task
->next
)
137 if (!task
->taskid
&& ( TASK_CLASS_MASK
& task
->tasktype
) == *tasktype
) {
138 /* If we found an outstanding task, assign a taskid to it
139 and leave the loop */
140 task
->taskid
= taskid
= ++next_free_taskid
;
/* Report the task's full type, not just its class, back to the caller. */
141 *tasktype
= task
->tasktype
;
145 /* Wait until the next task is being fed */
/* pthread_cond_wait releases tasklist_mutex while blocked and holds it again
   when it returns. */
147 pthread_cond_wait( &tasklist_being_filled
, &tasklist_mutex
);
151 pthread_mutex_unlock( &tasklist_mutex
);
/* Takes the task with the given id out of the task list.
   With tasklist_mutex held, scans for an entry whose taskid equals the
   argument and unlinks it.
   NOTE(review): what happens to the unlinked node afterwards (presumably it
   is freed) and the end of the matching branch are not visible in this view
   -- confirm against the full file. */
156 void mutex_workqueue_pushsuccess( ot_taskid taskid
) {
/* Pointer to the link leading to the current node, so it can be unlinked. */
157 struct ot_task
** task
;
159 /* Want exclusive access to tasklist */
160 pthread_mutex_lock( &tasklist_mutex
);
162 for (task
= &tasklist
; *task
; task
= &((*task
)->next
))
163 if ((*task
)->taskid
== taskid
) {
/* Remember the node, then make the incoming link skip it. */
164 struct ot_task
*ptask
= *task
;
165 *task
= (*task
)->next
;
171 pthread_mutex_unlock( &tasklist_mutex
);
/* Stores a worker's result on the task with the given id and marks it done.
   With tasklist_mutex held, the matching task gets iovec_entries recorded
   and its tasktype set to TASK_DONE. After the lock is released, one byte is
   written to g_self_pipe[1].
   Returns 0 when task is non-NULL after the search (presumably: a matching
   task was found), -1 otherwise.
   NOTE(review): the statement storing the iovec pointer on the task and the
   exit from the search loop are not visible in this view -- confirm against
   the full file. */
174 int mutex_workqueue_pushresult( ot_taskid taskid
, int iovec_entries
, struct iovec
*iovec
) {
175 struct ot_task
* task
;
/* Payload of the self-pipe write below; only its arrival is likely relevant
   (assumption -- the reader is not in this file). */
176 const char byte
= 'o';
178 /* Want exclusive access to tasklist */
179 pthread_mutex_lock( &tasklist_mutex
);
181 for (task
= tasklist
; task
; task
= task
->next
)
182 if (task
->taskid
== taskid
) {
183 task
->iovec_entries
= iovec_entries
;
185 task
->tasktype
= TASK_DONE
;
190 pthread_mutex_unlock( &tasklist_mutex
);
/* Performed after the unlock, so the pipe write happens outside the lock. */
192 io_trywrite( g_self_pipe
[1], &byte
, 1 );
194 /* Indicate whether the worker has to throw away results */
195 return task
? 0 : -1;
/* Adds one chunk of result data to the task with the given id.
   With tasklist_mutex held, the matching task has iovec appended through
   iovec_append(); the visible code sets its tasktype to TASK_DONE_PARTIAL
   and to TASK_DONE, and writes progress messages to stderr. After the lock
   is released one byte is written to g_self_pipe[1].
   Returns 0 when task is non-NULL after the search (presumably: a matching
   task was found), -1 otherwise.
   NOTE(review): the conditions selecting between the TASK_DONE_PARTIAL and
   TASK_DONE assignments, and the one guarding the "not found" message, are
   not visible in this view -- confirm against the full file.
   NOTE(review): the messages print taskid with %d; confirm that ot_taskid is
   int-compatible. */
198 int mutex_workqueue_pushchunked(ot_taskid taskid
, struct iovec
*iovec
) {
199 struct ot_task
* task
;
/* Payload of the self-pipe write below. */
200 const char byte
= 'o';
202 /* Want exclusive access to tasklist */
203 pthread_mutex_lock( &tasklist_mutex
);
205 for (task
= tasklist
; task
; task
= task
->next
)
206 if (task
->taskid
== taskid
) {
208 fprintf(stderr
, "mutex_workqueue_pushchunked pushing on taskid %d\n", taskid
);
/* iovec_append receives the task's entry count and array by address, so it
   can update both. */
209 if (!iovec_append(&task
->iovec_entries
, &task
->iovec
, iovec
) )
211 task
->tasktype
= TASK_DONE_PARTIAL
;
213 fprintf(stderr
, "mutex_workqueue_pushchunked finished taskid %d\n", taskid
);
214 task
->tasktype
= TASK_DONE
;
220 pthread_mutex_unlock( &tasklist_mutex
);
/* Performed after the unlock, so the pipe write happens outside the lock. */
222 io_trywrite( g_self_pipe
[1], &byte
, 1 );
224 fprintf(stderr
, "mutex_workqueue_pushchunked taskid %d not found\n", taskid
);
226 /* Indicate whether the worker has to throw away results */
227 return task
? 0 : -1;
/* Fetches a finished result from the task list.
   With tasklist_mutex held, scans for a task whose tasktype masked with
   TASK_CLASS_MASK equals TASK_DONE, logs it to stderr and copies its iovec
   count and iovec pointer to *iovec_entries and *iovec. The visible code
   then tests whether the type is exactly TASK_DONE; also visible are a reset
   of the task's iovec_entries to 0 and a retag to TASK_FULLSCRAPE.
   NOTE(review): the return value, every write to *is_partial, and which
   branch the reset/retag statements belong to are not visible in this view
   -- confirm against the full file. */
231 int64
mutex_workqueue_popresult( int *iovec_entries
, struct iovec
** iovec
, int *is_partial
) {
/* Pointer to the link leading to the current node. */
232 struct ot_task
** task
;
237 /* Want exclusive access to tasklist */
238 pthread_mutex_lock( &tasklist_mutex
);
240 for (task
= &tasklist
; *task
; task
= &((*task
)->next
))
/* Matches on the task class, so any type whose masked value is TASK_DONE
   qualifies, not only TASK_DONE itself. */
241 if (((*task
)->tasktype
& TASK_CLASS_MASK
) == TASK_DONE
) {
242 struct ot_task
*ptask
= *task
;
243 fprintf(stderr
, "Got task %d type %d with %d entries\n", (*task
)->taskid
, (*task
)->tasktype
, ptask
->iovec_entries
);
/* Hand the buffers to the caller through the out parameters. */
244 *iovec_entries
= ptask
->iovec_entries
;
245 *iovec
= ptask
->iovec
;
248 if ((*task
)->tasktype
== TASK_DONE
) {
/* The task keeps no entry count once its buffers have been handed out. */
252 ptask
->iovec_entries
= 0;
255 /* Prevent task from showing up immediately again unless new data was added */
256 (*task
)->tasktype
= TASK_FULLSCRAPE
;
262 pthread_mutex_unlock( &tasklist_mutex
);
/* Initialisation sequence: sets up tasklist_mutex and tasklist_being_filled
   with default attributes, initialises one mutex per bucket and zeroes the
   all_torrents array.
   NOTE(review): the enclosing function header and the declaration of i are
   not visible in this view; presumably this is the body of the counterpart
   to mutex_deinit() -- confirm against the full file. */
268 pthread_mutex_init(&tasklist_mutex
, NULL
);
269 pthread_cond_init (&tasklist_being_filled
, NULL
);
270 for (i
=0; i
< OT_BUCKET_COUNT
; ++i
)
271 pthread_mutex_init(bucket_mutex
+ i
, NULL
);
/* sizeof is taken of the array itself, so every bucket's vector is cleared. */
272 byte_zero( all_torrents
, sizeof( all_torrents
) );
275 void mutex_deinit( ) {
277 for (i
=0; i
< OT_BUCKET_COUNT
; ++i
)
278 pthread_mutex_destroy(bucket_mutex
+ i
);
279 pthread_mutex_destroy(&tasklist_mutex
);
280 pthread_cond_destroy(&tasklist_being_filled
);
281 byte_zero( all_torrents
, sizeof( all_torrents
) );
/* Version string for this source file. $Source$ and $Revision$ look like
   version-control keyword placeholders (assumption -- confirm how they are
   expanded in the build). */
284 const char *g_version_mutex_c
= "$Source$: $Revision$\n";