Changes in UpdateNode and RefreshBucket.
[brdnet.git] / dht.pas
blob6a12150c6da6572e99022755094023f8dfcf1f3c
1 unit DHT;
3 implementation of custom dht, based on pastry and kademlia.
4 keyspace is divided into buckets of limited capacity
5 node belongs to bucket, where at least 'depth' bits match 'prefix'
6 old>new,
7 new>dead
8 TODO: weight nodes by IP-Address common prefix length.
11 {used by: messages, fileshare}
13 INTERFACE
14 uses NetAddr,Store1;
15 type tPID=Store1.tFID;
16 type tPeerPub=object
17 ID :tPID;
18 Addr :tNetAddr;
19 //function IsGood
20 //function GetAge
21 end;
22 var MyID:tPID;
23 procedure NodeBootstrap(const contact:tNetAddr);
24 procedure GetNextNode(var ibkt:pointer; var ix:byte; out peer:tPeerPub);
25 procedure InsertNode(const peer:tPeerPub);
27 IMPLEMENTATION
28 uses ServerLoop,MemStream,opcode;
30 type
31 tPeer=object(tPeerPub)
32 ReqDelta:word;
33 LastMsgFrom,
34 LastResFrom :tMTime;
35 end;
36 tPeer_ptr=^tPeer;
37 tBucket_ptr=^tBucket;
38 tBucket=object
39 Prefix: tPID;
40 Depth: byte;
41 peer: array [1..4] of tPeer;
42 ModifyTime: tMTime;
43 //ll: ^tll;
44 desperate:word;
45 next: tBucket_ptr;
46 function MatchPrefix(const tp:tFID):boolean;
47 procedure Refresh;
48 end;
50 var Table:^tBucket;
51 {deepest first}
53 function PrefixLength(const a,b:tFID):byte;
54 var i:byte;
55 var by:byte;
56 var m:byte;
57 begin
58 by:=0;
59 i:=0; while(i<=19) do begin
60 if a[i]<>b[i] then break;
61 inc(i);
62 end;
63 result:=i*8;
64 if i=20 then exit;
65 m:=$80;
66 while(m>0) do begin
67 if (a[i] and m)<>(b[i] and m) then break;
68 m:=m shr 1;
69 inc(result);
70 end;
71 end;
74 function tBucket.MatchPrefix(const tp:tFID):boolean;
75 begin
76 result:=(depth=0)or(PrefixLength(prefix,tp)>=depth);
77 end;
79 function FindBucket(const prefix:tFID):tBucket_ptr;
80 var cur:^tBucket;
81 begin
82 cur:=Table;
83 result:=nil;
84 while (cur<>nil) and (result=nil) do begin
85 if cur^.MatchPrefix(prefix) {first matching is deepest}
86 then result:=cur;
87 cur:=cur^.next;
88 end;
89 end;
91 operator =(const a,b:tFID):boolean;
92 begin
93 result:=CompareWord(a,b,10)=0;
94 end;
96 procedure SplitBucket(ob:tBucket_ptr);
97 procedure Toggle(var prefix:tPID; bit:byte);
98 begin
99 prefix[bit div 8]:= prefix[bit div 8] xor ($80 shr (bit mod 8));
100 end;
101 var nb:tBucket_ptr;
102 var i:byte;
103 begin
104 writeln('DHT: SplitBucket ',string(ob^.prefix),'/',ob^.depth);
105 {find pref to old bucket, in order to unlink}
106 if ob=Table then table:=table^.next else begin
107 nb:=Table;
108 while assigned(nb) and (nb^.next<>ob) do nb:=nb^.next;
109 assert(assigned(nb),'old bucket not in table');
110 {unlink}
111 nb^.next:=nb^.next^.next; nb:=nil;
112 end;
113 {increase depth of this bucket}
114 Inc(ob^.depth);
115 ob^.ModifyTime:=mNow;
116 {create new bucket with toggled bit}
117 New(nb);
118 nb^:=ob^;
119 Toggle(nb^.Prefix,nb^.depth-1);
120 nb^.next:=ob;
121 {clear nodes that do not belong in bucket}
122 for i:=1 to high(tBucket.peer) do begin
123 if ob^.peer[i].addr.isNil then continue;
124 if ob^.MatchPrefix(ob^.peer[i].id)
125 then nb^.peer[i].addr.clear
126 else ob^.peer[i].addr.clear;
127 end;
128 writeln('-> ',string(ob^.prefix),'/',ob^.depth);
129 for i:=1 to high(tBucket.peer) do if not ob^.peer[i].addr.isnil
130 then writeln('-> -> ',string(ob^.peer[i].id));
131 writeln('-> ',string(nb^.prefix),'/',nb^.depth);
132 for i:=1 to high(tBucket.peer) do if not nb^.peer[i].addr.isnil
133 then writeln('-> -> ',string(nb^.peer[i].id));
134 if table=nil then table:=nb else begin
135 ob:=Table;
136 while assigned(ob^.next)and (ob^.next^.depth>nb^.depth) do ob:=ob^.next;
137 ob^.next:=nb;
138 writeln('-> after /',ob^.depth);
139 end;
140 Shedule(2000,@nb^.Refresh);
141 end;
143 procedure UpdateNode(const id:tFID; const addr:tNetAddr);
144 var bkt:^tBucket;
145 var i,fr:byte;
146 label again;
147 begin
148 if id=MyID then exit;
149 again:
150 bkt:=FindBucket(id);
151 if not assigned(bkt) then begin
152 New(Table); //todo
153 bkt:=Table;
154 bkt^.Prefix:=MyID;
155 bkt^.Depth:=0;
156 bkt^.ModifyTime:=mNow;
157 bkt^.next:=nil;
158 bkt^.desperate:=3;
159 for i:=1 to high(bkt^.peer) do bkt^.peer[i].addr.Clear;
160 Shedule(2000,@bkt^.Refresh);
161 end;
162 fr:=0;
163 for i:=1 to high(bkt^.peer)
164 do if (fr=0)and bkt^.peer[i].addr.isNil then fr:=i
165 else if (bkt^.peer[i].ReqDelta<2) then begin
166 {found node in the bucket}
167 if (bkt^.peer[i].id=id) then begin
168 bkt^.peer[i].LastMsgFrom:=mNow;
169 bkt^.peer[i].ReqDelta:=0;
170 exit
171 end;
172 if bkt^.peer[i].addr=addr then exit;
174 else if (fr=0)or (bkt^.peer[i].id=id)
175 then fr:=i;
176 if fr=0 then begin
177 if bkt^.MatchPrefix(MyID)
178 then begin
179 SplitBucket(bkt);
180 goto again;
181 end; {the bucket is full!}
182 {drop new node and hope nodes in the bucket are good}
183 end else begin
184 writeln('DHT: AddNode ',string(id),string(addr),' to ',string(bkt^.prefix),'/',bkt^.depth,'#',fr);
185 bkt^.ModifyTime:=mNow;
186 bkt^.peer[fr].ID:=ID;
187 bkt^.peer[fr].Addr:=Addr;
188 bkt^.peer[fr].LastMsgFrom:=mNow;
189 bkt^.peer[fr].LastResFrom:=0;
190 bkt^.peer[fr].ReqDelta:=0;
191 end;
192 end;
194 procedure InsertNode(const peer:tPeerPub);
195 begin
196 UpdateNode(peer.id,peer.addr);
197 end;
199 procedure GetNextNode(var ibkt:tBucket_ptr; var ix:byte; const id:tPID; maxrd:word);
200 var bkt:^tBucket;
201 begin
202 if not assigned(ibkt) then exit;
203 bkt:=ibkt;
204 repeat
205 inc(ix);
206 if ix>high(tBucket.peer) then begin
207 ix:=1;
208 bkt:=bkt^.next;
209 if not assigned(bkt) then break;
210 end;
211 until (not bkt^.peer[ix].Addr.isNil)and(bkt^.peer[ix].ReqDelta<maxrd);
212 ibkt:=bkt;
213 end;
215 procedure GetNextNode(var ibkt:pointer; var ix:byte; out peer:tPeerPub);
216 begin
217 if ibkt=nil then ibkt:=Table;
218 GetNextNode(ibkt,ix,MyID,3);
219 if assigned(ibkt)
220 then peer:=tBucket(ibkt^).peer[ix]
221 else peer.addr.clear;
222 end;
224 procedure RecvRequest(msg:tSMsg);
225 var s:tMemoryStream absolute msg.stream;
226 var hID:^tPID;
227 var rID:^tPID;
228 var caps:byte;
229 var r:tMemoryStream;
230 var bkt:^tBucket;
231 var i:byte;
232 begin
233 s.skip(1);
234 hID:=s.ReadPtr(20);
235 rID:=s.ReadPtr(20);
236 caps:=s.ReadByte;
237 //writeln('DHT: ',string(msg.source^),' Request for ',string(rID^));
238 UpdateNode(hID^,msg.source^);
239 {Select peers only from The bucket,
240 if it is broken, send none, but still Ack}
241 bkt:=FindBucket(rID^);
242 r.Init(128);
243 if assigned(bkt) then begin
244 r.WriteByte(opcode.dhtSelect);
245 r.WriteByte(caps);
246 r.Write(msg.Source^,sizeof(tNetAddr));
247 r.Write(rID^,20);
248 r.Write(hID^,20);
249 if (s.RdBufLen>0)and(s.RdBufLen<=8) then r.Write(s.RdBuf^,s.RdBufLen);
250 for i:=1 to high(tBucket.peer) do begin
251 if bkt^.peer[i].addr.isNil then continue;
252 if bkt^.peer[i].addr=msg.source^ then continue;
253 if bkt^.peer[i].ReqDelta>1 then continue;
254 //writeln('-> Select to ',string(bkt^.peer[i].addr));
255 SendMessage(r.base^,r.length,bkt^.peer[i].addr);
256 end;
257 r.Seek(0);
258 r.Trunc;
260 //else writeln('-> empty bucket')
262 r.WriteByte(opcode.dhtReqAck);
263 r.Write(MyID,20);
264 //writeln('-> ReqAck to ',string(msg.Source^));
265 SendMessage(r.base^,r.length,msg.source^);
266 FreeMem(r.base,r.size);
267 end;
269 procedure SendRequest(const contact:tNetAddr; const forid: tPID; caps:byte);
270 var r:tMemoryStream;
271 begin
272 r.Init(42);
273 r.WriteByte(opcode.dhtRequest);
274 r.Write(MyID,sizeof(tFID));
275 r.Write(ForID,sizeof(tFID));
276 r.WriteByte(caps);
277 SendMessage(r.base^,r.length,contact);
278 FreeMem(r.base,r.size);
279 end;
281 procedure RecvReqAck(msg:tSMsg);
282 var s:tMemoryStream absolute msg.stream;
283 var hID:^tPID;
284 begin
285 s.skip(1);
286 hID:=s.ReadPtr(20);
287 //writeln('DHT: ',string(msg.source^),' is ',string(hID^),' (ReqAck)');
288 UpdateNode(hID^,msg.source^);
289 end;
291 procedure RecvWazzup(msg:tSMsg);
292 var s:tMemoryStream absolute msg.stream;
293 var hID:^tPID;
294 begin
295 s.skip(1);
296 hID:=s.ReadPtr(20);
297 //writeln('DHT: ',string(msg.source^),' is ',string(hID^),' (Wazzup)');
298 UpdateNode(hID^,msg.source^);
299 //UpdateSearch(hID^,msg.source^);
300 end;
302 procedure NodeBootstrap(const contact:tNetAddr);
303 begin
304 SendRequest(contact,MyID,0);
305 end;
307 procedure RecvSelect(msg:tSMsg);
308 var s:tMemoryStream absolute msg.stream;
309 var caps:byte;
310 var addr:^tNetAddr;
311 var rID:^tPID;
312 var r:tMemoryStream;
313 begin
314 s.skip(1);
315 caps:=s.ReadByte;
316 addr:=s.ReadPtr(sizeof(tNetAddr));
317 rID:=s.ReadPtr(20);
318 //writeln('DHT: ',string(msg.source^),' Select for ',string(addr^));
319 if rID^=MyID then begin
320 //writeln('-> self');
321 exit end;
322 r.Init(21);
323 r.WriteByte(opcode.dhtWazzup);
324 r.Write(MyID,20);
325 //writeln('-> Wazzup to ',string(addr^));
326 SendMessage(r.base^,r.length,addr^);
327 FreeMem(r.base,r.size);
328 end;
330 const cStichRar=10;
331 procedure tBucket.Refresh;
332 var my,rtr,stich:boolean;
333 var i,ol,rv:byte;
334 var wait:LongWord;
335 var rvb:^tBucket;
336 procedure lSend(var peer:tPeer; const trg:tPID);
337 begin
338 SendRequest(peer.Addr,trg,0);
339 Inc(peer.ReqDelta);
340 end;
341 begin
342 my:=MatchPrefix(MyID);
343 ol:=0; rtr:=false;
344 {1 of 10 times try to contact dead nodes in attempt to recover from network split}
345 stich:=Random(cStichRar)=0;
346 for i:=1 to high(tBucket.peer)
347 do if (not peer[i].Addr.isNil) then begin
348 if peer[i].ReqDelta>0 then begin
349 if (peer[i].ReqDelta<=3)xor stich then begin
350 {this will get rid of half-dead nodes}
351 writeln('DHT: Refresh (R',peer[i].ReqDelta,') ',copy(string(peer[i].id),1,6),string(peer[i].addr));
352 lSend(peer[i],prefix);
353 rtr:=true;
356 else if (ol=0) or (peer[i].LastMsgFrom<peer[ol].LastMsgFrom)
357 then ol:=i;
358 end;
359 {now nudge the most quiet peer, but not too often}
360 if (ol>0) and ((mNow-peer[ol].LastMsgFrom)>10000) then begin
361 //writeln('DHT: Refresh (T',mNow-peer[ol].LastMsgFrom,') #',ol,' ',string(peer[ol].addr));
362 lSend(peer[ol],MyID);
363 end;
364 {try to recover bucket full of bad nodes}
365 if (ol=0)and(not rtr) then begin
366 rv:=0; rvb:=@self;
367 GetNextNode(rvb,rv,prefix,desperate);
368 if not assigned(rvb) then begin
369 rv:=0; rvb:=Table; {in extreme cases, try the whole table}
370 GetNextNode(rvb,rv,prefix,desperate);
371 end;
372 if assigned(rvb) then begin
373 writeln('DHT: Recover ',string(prefix),'/',depth,' try ',copy(string(rvb^.peer[rv].id),1,6),string(rvb^.peer[rv].addr));
374 lSend(rvb^.peer[rv],prefix);
375 end else inc(desperate);
376 end else desperate:=3;
377 if my
378 then wait:=18000+(depth*600)
379 else wait:=30000;
380 if rtr then wait:=wait div 3;
381 Shedule(wait,@Refresh);
382 end;
385 {to bootstrap: ping address to get ID and insert to bucket/il
386 ping may get lost: separate bootstrap unit :)
387 now jut Ass-U-Me wont get lost}
389 BEGIN
390 SetMsgHandler(opcode.dhtRequest,@recvRequest);
391 SetMsgHandler(opcode.dhtSelect,@recvSelect);
392 SetMsgHandler(opcode.dhtReqAck,@recvReqAck);
393 SetMsgHandler(opcode.dhtWazzup,@recvWazzup);
394 END.