1 /* This software was written by Dirk Engling <erdgeist@erdgeist.org>
2 It is considered beerware. Prost. Skol. Cheers or whatever.
11 #include <sys/types.h>
24 #include "trackerlogic.h"
26 #ifdef WANT_SYNC_BATCH
28 #define OT_SYNC_CHUNK_SIZE (512 * 1024)
30 /* Import Changeset from an external authority
31 format: d4:syncd[..]ee
32 [..]: ( 20:01234567890abcdefghij16:XXXXYYYY )+
34 int add_changeset_to_tracker(uint8_t *data
, size_t len
) {
36 uint8_t *end
= data
+ len
;
37 unsigned long peer_count
;
39 /* We do know, that the string is \n terminated, so it cant
41 if (byte_diff(data
, 8, "d4:syncd"))
46 if (byte_diff(data
, 3, "20:")) {
47 if (byte_diff(data
, 2, "ee"))
52 hash
= (ot_hash
*)data
;
53 data
+= sizeof(ot_hash
);
55 /* Scan string length indicator */
56 data
+= (len
= scan_ulong((char *)data
, &peer_count
));
58 /* If no long was scanned, it is not divisible by 8, it is not
59 followed by a colon or claims to need to much memory, we fail */
60 if (!len
|| !peer_count
|| (peer_count
& 7) || (*data
++ != ':') || (data
+ peer_count
> end
))
63 while (peer_count
> 0) {
64 add_peer_to_torrent(hash
, (ot_peer
*)data
, 1);
72 /* Proposed output format
73 d4:syncd20:<info_hash>8*N:(xxxxyyyy)*Nee
75 static void sync_make(int *iovec_entries
, struct iovec
**iovector
) {
79 /* Setup return vector... */
82 if (!(r
= iovec_increase(iovec_entries
, iovector
, OT_SYNC_CHUNK_SIZE
)))
85 /* ... and pointer to end of current output buffer.
86 This works as a low watermark */
87 re
= r
+ OT_SYNC_CHUNK_SIZE
;
89 memmove(r
, "d4:syncd", 8);
92 /* For each bucket... */
93 for (bucket
= 0; bucket
< OT_BUCKET_COUNT
; ++bucket
) {
94 /* Get exclusive access to that bucket */
95 ot_vector
*torrents_list
= mutex_bucket_lock(bucket
);
98 /* For each torrent in this bucket.. */
99 for (tor_offset
= 0; tor_offset
< torrents_list
->size
; ++tor_offset
) {
100 /* Address torrents members */
101 ot_peerlist
*peer_list
= (((ot_torrent
*)(torrents_list
->data
))[tor_offset
]).peer_list
;
102 ot_hash
*hash
= &(((ot_torrent
*)(torrents_list
->data
))[tor_offset
]).hash
;
103 const size_t byte_count
= sizeof(ot_peer
) * peer_list
->changeset
.size
;
105 /* If we reached our low watermark in buffer... */
106 if (re
- r
<= (ssize_t
)(/* strlen( "20:" ) == */ 3 + sizeof(ot_hash
) + /* strlen_max( "%zd" ) == */ 12 + byte_count
)) {
108 /* Allocate a fresh output buffer at the end of our buffers list
109 release bucket and return, if that fails */
110 if (!(r
= iovec_fix_increase_or_free(iovec_entries
, iovector
, r
, OT_SYNC_CHUNK_SIZE
)))
111 return mutex_bucket_unlock(bucket
);
113 /* Adjust new end of output buffer */
114 re
= r
+ OT_SYNC_CHUNK_SIZE
;
120 memmove(r
, hash
, sizeof(ot_hash
));
121 r
+= sizeof(ot_hash
);
122 r
+= sprintf(r
, "%zd:", byte_count
);
123 memmove(r
, peer_list
->changeset
.data
, byte_count
);
127 /* All torrents done: release lock on currenct bucket */
128 mutex_bucket_unlock(bucket
);
131 /* Close bencoded sync dictionary */
135 /* Release unused memory in current output buffer */
136 iovec_fixlast(iovec_entries
, iovector
, r
);
139 /* This is the entry point into this worker thread
140 It grabs tasks from mutex_tasklist and delivers results back
142 static void *sync_worker(void *args
) {
144 struct iovec
*iovector
;
149 ot_tasktype tasktype
= TASK_SYNC_OUT
;
150 ot_taskid taskid
= mutex_workqueue_poptask(&tasktype
);
151 sync_make(&iovec_entries
, &iovector
);
152 stats_issue_event(EVENT_SYNC_OUT
, FLAG_TCP
, iovec_length(&iovec_entries
, &iovector
));
153 if (mutex_workqueue_pushresult(taskid
, iovec_entries
, iovector
))
154 iovec_free(&iovec_entries
, &iovector
);
159 static pthread_t thread_id
;
161 pthread_create( &thread_id
, NULL
, sync_worker
, NULL
);
164 void sync_deinit( ) {
165 pthread_cancel( thread_id
);
168 void sync_deliver( int64 socket
) {
169 mutex_workqueue_pushtask( socket
, TASK_SYNC_OUT
);