work.
[brdnet.git] / dht.pas
blobcb70a37ba2bd9e49b5011128f76da2c4304cafef
1 unit DHT;
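Implementation of a custom DHT, based on Pastry and Kademlia.
The keyspace is divided into buckets of limited capacity.
A node belongs to the bucket whose 'prefix' matches at least 'depth' leading bits of the node's ID.
When a bucket is full, old (known) nodes are preferred over new ones,
but new nodes replace dead (non-responding) ones.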
10 {used by: messages, fileshare}
12 INTERFACE
13 uses NetAddr,Store1;
type
 {DHT node ID; same representation as Store1.tFID}
 tPID=Store1.tFID;
var
 {ID of this node; LoadIDFromArgs sets it from "-id" during unit initialization}
 MyID:tPID;
{Bootstrap: send a DHT request for MyID to contact}
procedure NodeBootstrap(const contact:tNetAddr);
18 IMPLEMENTATION
19 uses ServerLoop,MemStream,opcode;
type
 {one routing-table slot}
 tPeer=object
  ID :tPID;             {node ID of the peer}
  Addr :tNetAddr;       {peer address; a nil address marks an unused slot}
  ReqDelta:word;        {requests sent since the peer was last heard from; UpdateNode resets it to 0}
  LastMsgFrom,          {time the last message from this peer was received}
  LastResFrom :tMTime;  {set to 0 when the peer is added; not otherwise updated in this unit}
 end;
 tPeer_ptr=^tPeer;
 tBucket_ptr=^tBucket;
 {a bucket holds peers whose IDs share the first Depth bits with Prefix}
 tBucket=object
  Prefix: tPID;
  Depth: byte;
  peer: array [1..4] of tPeer;  {fixed capacity: 4 peers}
  ModifyTime: tMTime;           {updated when the bucket is created, split, or gains a peer}
  //ll: ^tll;
  next: tBucket_ptr;            {next bucket in Table}
  function MatchPrefix(const tp:tFID):boolean;
  procedure Refresh;
 end;

{linked list of buckets; meant to be ordered deepest first,
 because FindBucket returns the first matching bucket}
var Table:^tBucket;
{Number of leading bits (0..160) that a and b have in common.
 Byte 0 is compared first and, inside a byte, the most significant bit first,
 matching the bit numbering used by SplitBucket's Toggle.}
function PrefixLength(const a,b:tFID):byte;
var i:byte;
var m:byte;
begin
 {count whole bytes that are equal; relies on short-circuit "and" so a[20] is never read}
 i:=0;
 while (i<=19) and (a[i]=b[i]) do inc(i);
 result:=i*8;
 if i=20 then exit;
 {a[i]<>b[i]: count equal bits of this byte from the top until the first difference}
 m:=$80;
 while (m>0) and ((a[i] and m)=(b[i] and m)) do begin
  m:=m shr 1;
  inc(result);
 end;
end;
{True when tp falls into this bucket: its first Depth bits equal those of Prefix.
 A bucket of depth 0 covers the whole keyspace.}
function tBucket.MatchPrefix(const tp:tFID):boolean;
begin
 if depth=0
 then result:=true
 else result:=PrefixLength(prefix,tp)>=depth;
end;
{Return the first bucket of Table that covers prefix, or nil if none does.
 With Table ordered deepest first, the first match is the most specific bucket.}
function FindBucket(const prefix:tFID):tBucket_ptr;
var b:tBucket_ptr;
begin
 result:=nil;
 b:=Table;
 while assigned(b) do begin
  if b^.MatchPrefix(prefix) then begin
   result:=b;
   exit;
  end;
  b:=b^.next;
 end;
end;
{Two IDs are equal when all 20 of their bytes are equal.}
operator =(const a,b:tFID):boolean;
begin
 result:=CompareByte(a,b,20)=0;
end;
{Split bucket ob into two buckets that are one bit deeper.
 ob keeps its prefix and the peers that still match it. A new sibling bucket,
 with prefix bit (depth-1) toggled, keeps the other peers. The pair
 (new bucket, then ob) is linked back into Table so that:
  - the list stays ordered deepest first, and
  - no other bucket is lost and no cycle is created.
 The original version overwrote the successor pointer at the insertion point.
 That dropped the rest of the list, or formed a cycle when ob had been the head.}
procedure SplitBucket(ob:tBucket_ptr);

 {flip one bit of prefix; bit 0 is the most significant bit of byte 0}
 procedure Toggle(var prefix:tPID; bit:byte);
 begin
  prefix[bit div 8]:= prefix[bit div 8] xor ($80 shr (bit mod 8));
 end;

var nb:tBucket_ptr;
var cur:tBucket_ptr;
var i:byte;
begin
 writeln('DHT: SplitBucket ',string(ob^.prefix),'/',ob^.depth);
 {unlink the old bucket from the table}
 if ob=Table then Table:=Table^.next else begin
  cur:=Table;
  while assigned(cur) and (cur^.next<>ob) do cur:=cur^.next;
  assert(assigned(cur),'old bucket not in table');
  cur^.next:=ob^.next;
 end;
 {ob is now detached; its stale successor must not survive the re-insert}
 ob^.next:=nil;
 {increase depth of this bucket}
 Inc(ob^.depth);
 ob^.ModifyTime:=mNow;
 {create new bucket with toggled bit}
 New(nb);
 nb^:=ob^;
 Toggle(nb^.Prefix,nb^.depth-1);
 nb^.next:=ob;
 {each live peer stays in exactly one of the two buckets}
 for i:=1 to high(tBucket.peer) do begin
  if ob^.peer[i].addr.isNil then continue;
  if ob^.MatchPrefix(ob^.peer[i].id)
  then nb^.peer[i].addr.clear
  else ob^.peer[i].addr.clear;
 end;
 writeln('-> ',string(ob^.prefix),'/',ob^.depth);
 for i:=1 to high(tBucket.peer) do if not ob^.peer[i].addr.isnil
  then writeln('-> -> ',string(ob^.peer[i].id));
 writeln('-> ',string(nb^.prefix),'/',nb^.depth);
 for i:=1 to high(tBucket.peer) do if not nb^.peer[i].addr.isnil
  then writeln('-> -> ',string(nb^.peer[i].id));
 {insert the pair nb->ob after every strictly deeper bucket and
  re-attach the remainder of the list behind ob}
 if (Table=nil) or (Table^.depth<=nb^.depth) then begin
  ob^.next:=Table;
  Table:=nb;
 end else begin
  cur:=Table;
  while assigned(cur^.next) and (cur^.next^.depth>nb^.depth) do cur:=cur^.next;
  ob^.next:=cur^.next;
  cur^.next:=nb;
  writeln('-> after /',cur^.depth);
 end;
 Shedule(2000,@nb^.Refresh);
end;
{Record that node id was seen at address addr.
 - A known (id,addr) entry only has its liveness refreshed
   (LastMsgFrom:=mNow, ReqDelta:=0).
 - Otherwise the node takes the first empty slot of its bucket, or the slot of
   a peer with ReqDelta>=2 (not responding).
 - If the bucket is full and covers MyID, it is split and the insertion retried.
   Splitting stops once the prefix is as long as the ID itself: beyond 160 bits
   SplitBucket would index past the end of the prefix.
 - Otherwise the new node is dropped.}
procedure UpdateNode(const id:tFID; const addr:tNetAddr);
const cMaxDepth=sizeof(tPID)*8;
var bkt:^tBucket;
var i,fr:byte;
begin
 repeat
  bkt:=FindBucket(id);
  if not assigned(bkt) then begin
   {no bucket covers id: create a root bucket covering the whole keyspace}
   New(Table); //todo: if Table was not nil, the old buckets become unreachable
   bkt:=Table;
   bkt^.Prefix:=MyID;
   bkt^.Depth:=0;
   bkt^.ModifyTime:=mNow;
   bkt^.next:=nil;
   for i:=1 to high(bkt^.peer) do bkt^.peer[i].addr.Clear;
   Shedule(2000,@bkt^.Refresh);
  end;
  fr:=0;
  for i:=1 to high(bkt^.peer) do begin
   if (fr=0) and bkt^.peer[i].addr.isNil then fr:=i
   //else if bkt^.peer[i].addr=addr then fr:=i
   else if bkt^.peer[i].id=id then begin
    {same ID at another address is treated as a different entry}
    if bkt^.peer[i].addr<>addr then continue;
    {found node in the bucket}
    bkt^.peer[i].LastMsgFrom:=mNow;
    bkt^.peer[i].ReqDelta:=0;
    exit;
   end
   else if (fr=0) and (bkt^.peer[i].ReqDelta>=2)
   then fr:=i; {use non-responding as free}
  end;
  if fr>0 then break;
  {the bucket is full: only the bucket covering our own ID may be split, and only
   while prefix bits remain; otherwise drop the new node and hope the nodes in
   the bucket are good}
  if (not bkt^.MatchPrefix(MyID)) or (bkt^.Depth>=cMaxDepth) then exit;
  SplitBucket(bkt);
 until false;
 writeln('DHT: AddNode ',string(id),' to ',string(bkt^.prefix),'/',bkt^.depth,'#',fr);
 bkt^.ModifyTime:=mNow;
 bkt^.peer[fr].ID:=ID;
 bkt^.peer[fr].Addr:=Addr;
 bkt^.peer[fr].LastMsgFrom:=mNow;
 bkt^.peer[fr].LastResFrom:=0;
 bkt^.peer[fr].ReqDelta:=0;
end;
{Advance the cursor (ibkt, ix) to the next usable peer.
 A usable peer has a non-nil address and fewer than 3 unanswered requests.
 The search starts at slot ix+1 of ibkt and continues through the following
 buckets of the list. If the end of the list is reached, ibkt becomes nil and
 ix is left at 1. ibkt must not be nil on entry; id is currently unused.}
procedure GetNextNode(var ibkt:tBucket_ptr; var ix:byte; const id:tPID);
var cur:tBucket_ptr;
begin
 cur:=ibkt;
 while true do begin
  Inc(ix);
  if ix>high(tBucket.peer) then begin
   {wrap to the first slot of the next bucket}
   ix:=1;
   cur:=cur^.next;
   if cur=nil then break;
  end;
  if (not cur^.peer[ix].Addr.isNil) and (cur^.peer[ix].ReqDelta<3) then break;
 end;
 ibkt:=cur;
end;
{Handle dhtRequest: <opcode> <sender ID:20> <requested ID:20> <caps:1> [extra].
 The sender is recorded via UpdateNode.
 If a bucket covers the requested ID, a dhtSelect is forwarded to each of its
 peers that is not the requester and has ReqDelta<=1. The Select carries caps,
 the requester's address, both IDs, and the trailing bytes when there are 1..8
 of them. The requester always gets a dhtReqAck carrying MyID.}
procedure RecvRequest(msg:tSMsg);
var s:tMemoryStream absolute msg.stream;
var senderID,targetID:^tPID;
var caps:byte;
var r:tMemoryStream;
var bkt:^tBucket;
var p:tPeer_ptr;
var i:byte;
begin
 s.skip(1); {opcode}
 senderID:=s.ReadPtr(20);
 targetID:=s.ReadPtr(20);
 caps:=s.ReadByte;
 writeln('DHT: ',string(msg.source^),' Request for ',string(targetID^));
 UpdateNode(senderID^,msg.source^);
 bkt:=FindBucket(targetID^);
 r.Init(128);
 if not assigned(bkt) then
  writeln('-> empty bucket')
 else begin
  {build the Select once and fan it out}
  r.WriteByte(opcode.dhtSelect);
  r.WriteByte(caps);
  r.Write(msg.Source^,sizeof(tNetAddr));
  r.Write(targetID^,20);
  r.Write(senderID^,20);
  if (s.RdBufLen>0) and (s.RdBufLen<=8) then r.Write(s.RdBuf^,s.RdBufLen);
  for i:=1 to high(tBucket.peer) do begin
   p:=@bkt^.peer[i];
   if (not p^.addr.isNil) and (not (p^.addr=msg.source^)) and (p^.ReqDelta<=1) then begin
    writeln('-> Select to ',string(p^.addr));
    SendMessage(r.base^,r.length,p^.addr);
   end;
  end;
  {rewind: the same buffer is reused for the acknowledgement}
  r.Seek(0);
  r.Trunc;
 end;
 r.WriteByte(opcode.dhtReqAck);
 r.Write(MyID,20);
 writeln('-> ReqAck to ',string(msg.Source^));
 SendMessage(r.base^,r.length,msg.source^);
 FreeMem(r.base,r.size);
end;
{Send dhtRequest <opcode> <MyID> <forid> <caps> to contact.}
procedure SendRequest(const contact:tNetAddr; const forid: tPID; caps:byte);
var buf:tMemoryStream;
begin
 buf.Init(42); {room for opcode, two IDs and caps}
 buf.WriteByte(opcode.dhtRequest);
 buf.Write(MyID,sizeof(tFID));
 buf.Write(forid,sizeof(tFID));
 buf.WriteByte(caps);
 SendMessage(buf.base^,buf.length,contact);
 FreeMem(buf.base,buf.size);
end;
{Handle dhtReqAck: <opcode> <sender ID:20>; record the sender via UpdateNode.}
procedure RecvReqAck(msg:tSMsg);
var s:tMemoryStream absolute msg.stream;
var senderID:^tPID;
begin
 s.skip(1); {opcode}
 senderID:=s.ReadPtr(20);
 writeln('DHT: ',string(msg.source^),' is ',string(senderID^),' (ReqAck)');
 UpdateNode(senderID^,msg.source^);
end;
{Handle dhtWazzup: <opcode> <sender ID:20>; record the sender via UpdateNode.}
procedure RecvWazzup(msg:tSMsg);
var s:tMemoryStream absolute msg.stream;
var senderID:^tPID;
begin
 s.skip(1); {opcode}
 senderID:=s.ReadPtr(20);
 writeln('DHT: ',string(msg.source^),' is ',string(senderID^),' (Wazzup)');
 UpdateNode(senderID^,msg.source^);
 //UpdateSearch(senderID^,msg.source^);
end;
{Ask contact to look up our own ID (caps 0).
 Replies reach the routing table through RecvReqAck / RecvWazzup.}
procedure NodeBootstrap(const contact:tNetAddr);
const noCaps=0;
begin
 SendRequest(contact,MyID,noCaps);
end;
{Handle dhtSelect: <opcode> <caps:1> <requester address> <requested ID:20> ...
 Unless the requested ID is our own, send a dhtWazzup carrying MyID
 straight to the requester's address.}
procedure RecvSelect(msg:tSMsg);
var s:tMemoryStream absolute msg.stream;
var caps:byte;
var requester:^tNetAddr;
var wanted:^tPID;
var r:tMemoryStream;
begin
 s.skip(1); {opcode}
 caps:=s.ReadByte; {read to advance the stream; the value is not used here}
 requester:=s.ReadPtr(sizeof(tNetAddr));
 wanted:=s.ReadPtr(20);
 if not (wanted^=MyID) then begin
  r.Init(21);
  r.WriteByte(opcode.dhtWazzup);
  r.Write(MyID,20);
  SendMessage(r.base^,r.length,requester^);
  FreeMem(r.base,r.size);
 end;
end;
{Periodic maintenance of one bucket; it re-schedules itself.
 - Every live peer with 1..3 unanswered requests gets another request for the
   bucket prefix.
 - If none needed a retry, the responsive peer heard from least recently gets
   a request for MyID.
 - If the bucket has no usable peer, GetNextNode picks a peer from a following
   bucket, which is asked for the prefix.
 Delay until the next run, in Shedule's units:
 - 18000+600*depth for the bucket covering MyID, 30000 for any other bucket;
 - divided by 3 after retries.}
procedure tBucket.Refresh;
var ownBucket,retried:boolean;
var i,quiet,ri:byte;
var wait:LongWord;
var rb:^tBucket;

 {send a request for target to p and count it as unanswered}
 procedure Poke(var p:tPeer; const target:tPID);
 begin
  SendRequest(p.Addr,target,0);
  Inc(p.ReqDelta);
 end;

begin
 ownBucket:=MatchPrefix(MyID);
 retried:=false;
 quiet:=0;
 for i:=1 to high(tBucket.peer) do begin
  if peer[i].Addr.isNil or (peer[i].ReqDelta>=4) then continue;
  if peer[i].ReqDelta=0 then begin
   if (quiet=0) or (peer[i].LastMsgFrom<peer[quiet].LastMsgFrom) then quiet:=i;
  end else begin
   {peer is not responding, but try once more}
   writeln('DHT: Refresh (R',peer[i].ReqDelta,') #',i,' ',string(peer[i].addr));
   Poke(peer[i],prefix);
   retried:=true;
  end;
 end;
 if not retried then begin
  if quiet>0 then begin
   {nudge the most quiet peer}
   writeln('DHT: Refresh (T',mNow-peer[quiet].LastMsgFrom,') #',quiet,' ',string(peer[quiet].addr));
   Poke(peer[quiet],MyID);
  end else begin
   {no usable nodes in this bucket, try to recover from other buckets}
   writeln('DHT: Refresh BROKEN BUCKET');
   ri:=0; rb:=@self;
   GetNextNode(rb,ri,prefix);
   if assigned(rb) then begin
    writeln('DHT: Refresh (RV) #',ri,' ',string(rb^.peer[ri].addr));
    Poke(rb^.peer[ri],prefix);
   end;
  end;
 end;
 if ownBucket then wait:=18000+(depth*600) else wait:=30000;
 if retried then wait:=wait div 3;
 Shedule(wait,@Refresh);
end;
{To bootstrap: ping an address to learn its ID, then insert it into a bucket/il.
 The ping may get lost; a separate bootstrap unit could handle that :)
 For now, just assume it won't get lost.}
{If "-id <pid>" is given on the command line, set MyID from it.
 Without the option, MyID is not assigned here.}
procedure LoadIDFromArgs;
const opt='-id';
var idx:word;
begin
 idx:=OptIndex(opt);
 if idx=0 then exit;
 assert(OptParamCount(idx)=1,opt+'(pid:sha1)');
 writeln('DHT: set ID to '+paramstr(idx+1));
 MyID:=tPID(paramstr(idx+1));
end;
BEGIN
 {route the four DHT message opcodes to their handlers}
 SetMsgHandler(opcode.dhtRequest,@RecvRequest);
 SetMsgHandler(opcode.dhtSelect,@RecvSelect);
 SetMsgHandler(opcode.dhtReqAck,@RecvReqAck);
 SetMsgHandler(opcode.dhtWazzup,@RecvWazzup);
 {then pick up an explicit node ID, if one was given}
 LoadIDFromArgs;
END.