Make chunked transfers use gzip also
[opentracker.git] / ot_mutex.c
blobb61b9151d9612244a1bc259110e9461385f56246
/* This software was written by Dirk Engling <erdgeist@erdgeist.org>
   It is considered beerware. Prost. Skol. Cheers or whatever.

   $id$ */
6 /* System */
7 #include <pthread.h>
8 #include <stdio.h>
9 #include <stdlib.h>
10 #include <sys/mman.h>
11 #include <sys/uio.h>
13 /* Libowfat */
14 #include "byte.h"
15 #include "io.h"
16 #include "uint32.h"
18 /* Opentracker */
19 #include "trackerlogic.h"
20 #include "ot_iovec.h"
21 #include "ot_mutex.h"
22 #include "ot_stats.h"
/* Debug tracing hook for the mutex code. It expands to nothing by default;
   to get trace output on stderr, define it like this instead:
     #define MTX_DBG( STRING ) fprintf( stderr, STRING )
*/
#define MTX_DBG(STRING)
27 /* Our global all torrents list */
28 static ot_vector all_torrents[OT_BUCKET_COUNT];
29 static pthread_mutex_t bucket_mutex[OT_BUCKET_COUNT];
30 static size_t g_torrent_count;
32 /* Self pipe from opentracker.c */
33 extern int g_self_pipe[2];
35 ot_vector *mutex_bucket_lock( int bucket ) {
36 pthread_mutex_lock(bucket_mutex + bucket );
37 return all_torrents + bucket;
40 ot_vector *mutex_bucket_lock_by_hash( ot_hash const hash ) {
41 return mutex_bucket_lock( uint32_read_big( (const char*)hash ) >> OT_BUCKET_COUNT_SHIFT );
44 void mutex_bucket_unlock( int bucket, int delta_torrentcount ) {
45 pthread_mutex_unlock(bucket_mutex + bucket);
46 g_torrent_count += delta_torrentcount;
49 void mutex_bucket_unlock_by_hash( ot_hash const hash, int delta_torrentcount ) {
50 mutex_bucket_unlock( uint32_read_big( (char*)hash ) >> OT_BUCKET_COUNT_SHIFT, delta_torrentcount );
53 size_t mutex_get_torrent_count( ) {
54 return g_torrent_count;
57 /* TaskQueue Magic */
59 struct ot_task {
60 ot_taskid taskid;
61 ot_tasktype tasktype;
62 int64 sock;
63 int iovec_entries;
64 struct iovec *iovec;
65 struct ot_task *next;
68 static ot_taskid next_free_taskid = 1;
69 static struct ot_task *tasklist;
70 static pthread_mutex_t tasklist_mutex;
71 static pthread_cond_t tasklist_being_filled;
73 int mutex_workqueue_pushtask( int64 sock, ot_tasktype tasktype ) {
74 struct ot_task ** tmptask, * task;
76 task = malloc(sizeof( struct ot_task));
77 if( !task )
78 return -1;
80 task->taskid = 0;
81 task->tasktype = tasktype;
82 task->sock = sock;
83 task->iovec_entries = 0;
84 task->iovec = NULL;
85 task->next = 0;
87 /* Want exclusive access to tasklist */
88 pthread_mutex_lock( &tasklist_mutex );
90 /* Skip to end of list */
91 tmptask = &tasklist;
92 while( *tmptask )
93 tmptask = &(*tmptask)->next;
94 *tmptask = task;
96 /* Inform waiting workers and release lock */
97 pthread_cond_broadcast( &tasklist_being_filled );
98 pthread_mutex_unlock( &tasklist_mutex );
99 return 0;
102 void mutex_workqueue_canceltask( int64 sock ) {
103 struct ot_task ** task;
105 /* Want exclusive access to tasklist */
106 pthread_mutex_lock( &tasklist_mutex );
108 for (task = &tasklist; *task; task = &((*task)->next))
109 if ((*task)->sock == sock) {
110 struct iovec *iovec = (*task)->iovec;
111 struct ot_task *ptask = *task;
112 int i;
114 /* Free task's iovec */
115 for( i=0; i<(*task)->iovec_entries; ++i )
116 free( iovec[i].iov_base );
118 *task = (*task)->next;
119 free( ptask );
120 break;
123 /* Release lock */
124 pthread_mutex_unlock( &tasklist_mutex );
127 ot_taskid mutex_workqueue_poptask( ot_tasktype *tasktype ) {
128 struct ot_task * task;
129 ot_taskid taskid = 0;
131 /* Want exclusive access to tasklist */
132 pthread_mutex_lock( &tasklist_mutex );
134 while( !taskid ) {
135 /* Skip to the first unassigned task this worker wants to do */
136 for (task = tasklist; task; task = task->next)
137 if (!task->taskid && ( TASK_CLASS_MASK & task->tasktype ) == *tasktype) {
138 /* If we found an outstanding task, assign a taskid to it
139 and leave the loop */
140 task->taskid = taskid = ++next_free_taskid;
141 *tasktype = task->tasktype;
142 break;
145 /* Wait until the next task is being fed */
146 if (!taskid)
147 pthread_cond_wait( &tasklist_being_filled, &tasklist_mutex );
150 /* Release lock */
151 pthread_mutex_unlock( &tasklist_mutex );
153 return taskid;
156 void mutex_workqueue_pushsuccess( ot_taskid taskid ) {
157 struct ot_task ** task;
159 /* Want exclusive access to tasklist */
160 pthread_mutex_lock( &tasklist_mutex );
162 for (task = &tasklist; *task; task = &((*task)->next))
163 if ((*task)->taskid == taskid) {
164 struct ot_task *ptask = *task;
165 *task = (*task)->next;
166 free( ptask );
167 break;
170 /* Release lock */
171 pthread_mutex_unlock( &tasklist_mutex );
174 int mutex_workqueue_pushresult( ot_taskid taskid, int iovec_entries, struct iovec *iovec ) {
175 struct ot_task * task;
176 const char byte = 'o';
178 /* Want exclusive access to tasklist */
179 pthread_mutex_lock( &tasklist_mutex );
181 for (task = tasklist; task; task = task->next)
182 if (task->taskid == taskid) {
183 task->iovec_entries = iovec_entries;
184 task->iovec = iovec;
185 task->tasktype = TASK_DONE;
186 break;
189 /* Release lock */
190 pthread_mutex_unlock( &tasklist_mutex );
192 io_trywrite( g_self_pipe[1], &byte, 1 );
194 /* Indicate whether the worker has to throw away results */
195 return task ? 0 : -1;
198 int mutex_workqueue_pushchunked(ot_taskid taskid, struct iovec *iovec) {
199 struct ot_task * task;
200 const char byte = 'o';
202 /* Want exclusive access to tasklist */
203 pthread_mutex_lock( &tasklist_mutex );
205 for (task = tasklist; task; task = task->next)
206 if (task->taskid == taskid) {
207 if( iovec ) {
208 fprintf(stderr, "mutex_workqueue_pushchunked pushing on taskid %d\n", taskid);
209 if (!iovec_append(&task->iovec_entries, &task->iovec, iovec) )
210 return -1;
211 task->tasktype = TASK_DONE_PARTIAL;
212 } else {
213 fprintf(stderr, "mutex_workqueue_pushchunked finished taskid %d\n", taskid);
214 task->tasktype = TASK_DONE;
216 break;
219 /* Release lock */
220 pthread_mutex_unlock( &tasklist_mutex );
222 io_trywrite( g_self_pipe[1], &byte, 1 );
223 if(!task)
224 fprintf(stderr, "mutex_workqueue_pushchunked taskid %d not found\n", taskid);
226 /* Indicate whether the worker has to throw away results */
227 return task ? 0 : -1;
231 int64 mutex_workqueue_popresult( int *iovec_entries, struct iovec ** iovec, int *is_partial ) {
232 struct ot_task ** task;
233 int64 sock = -1;
235 *is_partial = 0;
237 /* Want exclusive access to tasklist */
238 pthread_mutex_lock( &tasklist_mutex );
240 for (task = &tasklist; *task; task = &((*task)->next))
241 if (((*task)->tasktype & TASK_CLASS_MASK ) == TASK_DONE) {
242 struct ot_task *ptask = *task;
243 fprintf(stderr, "Got task %d type %d with %d entries\n", (*task)->taskid, (*task)->tasktype, ptask->iovec_entries);
244 *iovec_entries = ptask->iovec_entries;
245 *iovec = ptask->iovec;
246 sock = ptask->sock;
248 if ((*task)->tasktype == TASK_DONE) {
249 *task = ptask->next;
250 free( ptask );
251 } else {
252 ptask->iovec_entries = 0;
253 ptask->iovec = NULL;
254 *is_partial = 1;
255 /* Prevent task from showing up immediately again unless new data was added */
256 (*task)->tasktype = TASK_FULLSCRAPE;
258 break;
261 /* Release lock */
262 pthread_mutex_unlock( &tasklist_mutex );
263 return sock;
266 void mutex_init( ) {
267 int i;
268 pthread_mutex_init(&tasklist_mutex, NULL);
269 pthread_cond_init (&tasklist_being_filled, NULL);
270 for (i=0; i < OT_BUCKET_COUNT; ++i)
271 pthread_mutex_init(bucket_mutex + i, NULL);
272 byte_zero( all_torrents, sizeof( all_torrents ) );
275 void mutex_deinit( ) {
276 int i;
277 for (i=0; i < OT_BUCKET_COUNT; ++i)
278 pthread_mutex_destroy(bucket_mutex + i);
279 pthread_mutex_destroy(&tasklist_mutex);
280 pthread_cond_destroy(&tasklist_being_filled);
281 byte_zero( all_torrents, sizeof( all_torrents ) );
/* Version string of this file; the $...$ markers are keyword
   placeholders that are shipped verbatim unless expanded */
const char *g_version_mutex_c =
  "$Source$: $Revision$\n";