Chat API update. Breaks everything.
[brdnet.git] / dht.pas
blob0836611e3c5aadc5c825e901e3f595580fef6090
1 unit DHT;
{
 Implementation of a custom DHT, based on Pastry and Kademlia.
 The keyspace is divided into buckets of limited capacity.
 A node belongs to the bucket where at least 'depth' bits of its ID match 'prefix'.
 old>new,
 new>dead
 TODO: weight nodes by IP-Address common prefix length.
}
11 {used by: messages, fileshare}
13 INTERFACE
14 uses NetAddr,Store1;
type
 {peer identifier; shares its representation with Store1.tFID}
 tPID=Store1.tFID;

 {part of a peer record that is visible to other units}
 tPeerPub=object
  ID   :tPID;     {identifier of the peer}
  Addr :tNetAddr; {network address of the peer}
  //function IsGood
  //function GetAge
 end;

{identifier of the local node; UpdateNode never stores it in the table}
var MyID:tPID;

{send a request for our own ID to contact}
procedure NodeBootstrap(const contact:tNetAddr);
{step an iterator (ibkt,ix) to the next usable peer; peer.addr is cleared at the end}
procedure GetNextNode(var ibkt:pointer; var ix:byte; out peer:tPeerPub);
{offer a peer to the routing table}
procedure InsertNode(const peer:tPeerPub);
27 IMPLEMENTATION
28 uses ServerLoop,MemStream,opcode;
type
 {routing table entry: the public peer data plus liveness bookkeeping}
 tPeer=object(tPeerPub)
  ReqDelta:word; {incremented for every request sent by tBucket.Refresh, reset to 0 by UpdateNode}
  LastMsgFrom,   {set to mNow by UpdateNode whenever the peer is seen}
  LastResFrom :tMTime;
 end;
 tPeer_ptr=^tPeer;

 tBucket_ptr=^tBucket;
 {one bucket of the routing table; buckets form a singly linked list}
 tBucket=object
  Prefix: tPID;                {only the first Depth bits are significant}
  Depth: byte;                 {number of prefix bits an ID must share to belong here}
  peer: array [1..4] of tPeer; {a slot is free when its addr.isNil}
  ModifyTime: tMTime;
  //ll: ^tll;
  desperate:word;              {ReqDelta limit used by Refresh when looking for recovery peers}
  next: tBucket_ptr;
  function MatchPrefix(const tp:tFID):boolean;
  procedure Refresh;
 end;

{head of the bucket list}
var Table:^tBucket;
{deepest first}
{ Count the leading bits that a and b have in common.
  Both IDs are read as 20 bytes, byte 0 first, most significant bit first.
  Returns 0..160; 160 means the IDs are identical. }
function PrefixLength(const a,b:tFID):byte;
 var i:byte;
 var m:byte;
 begin
 {skip whole bytes that are equal}
 i:=0;
 while (i<=19) do begin
  if a[i]<>b[i] then break;
  inc(i);
 end;
 result:=i*8;
 if i=20 then exit; {no differing byte found}
 {walk the first differing byte from its top bit downwards}
 m:=$80;
 while (m>0) do begin
  if (a[i] and m)<>(b[i] and m) then break;
  m:=m shr 1;
  inc(result);
 end;
end;
{ True when tp belongs to this bucket, i.e. when it shares at least
  'depth' leading bits with 'prefix'. }
function tBucket.MatchPrefix(const tp:tFID):boolean;
 begin
 if depth=0
  then result:=true {a bucket of depth 0 accepts every ID}
  else result:=PrefixLength(prefix,tp)>=depth;
end;
{ Return the first bucket in Table whose prefix matches the given ID,
  or nil when no bucket matches (or the table is empty). }
function FindBucket(const prefix:tFID):tBucket_ptr;
 begin
 result:=Table;
 while assigned(result) do begin
  {first matching is deepest}
  if result^.MatchPrefix(prefix) then exit;
  result:=result^.next;
 end;
 {fell off the end of the list: result is nil here}
end;
{ Equality of two IDs: compares their first 20 bytes. }
operator =(const a,b:tFID):boolean;
 begin
 result:=CompareByte(a,b,20)=0;
end;
{ Split bucket ob into two buckets that are one bit deeper.
  ob keeps its prefix; a new bucket receives the same prefix with the bit
  at the old depth toggled. Every peer stays only in the half whose prefix
  it matches. The pair is linked back into Table so that deeper buckets
  precede shallower ones, and a Refresh is scheduled for the new bucket. }
procedure SplitBucket(ob:tBucket_ptr);
 {flip one bit of prefix; bit 0 is the most significant bit of byte 0}
 procedure Toggle(var prefix:tPID; bit:byte);
  begin
  prefix[bit div 8]:= prefix[bit div 8] xor ($80 shr (bit mod 8));
 end;
 var nb:tBucket_ptr;
 var cur:tBucket_ptr;
 var i:byte;
 begin
 writeln('DHT: SplitBucket ',string(ob^.prefix),'/',ob^.depth);
 {find predecessor of the old bucket, in order to unlink}
 if ob=Table then Table:=Table^.next else begin
  cur:=Table;
  while assigned(cur) and (cur^.next<>ob) do cur:=cur^.next;
  assert(assigned(cur),'old bucket not in table');
  {unlink}
  cur^.next:=ob^.next;
 end;
 {the old link must not survive: it would resurrect a stale tail or form a cycle}
 ob^.next:=nil;
 {increase depth of this bucket}
 Inc(ob^.depth);
 ob^.ModifyTime:=mNow;
 {create new bucket with toggled bit}
 New(nb);
 nb^:=ob^;
 Toggle(nb^.Prefix,nb^.depth-1);
 nb^.next:=ob;
 {clear nodes that do not belong in bucket}
 for i:=1 to high(tBucket.peer) do begin
  if ob^.peer[i].addr.isNil then continue;
  if ob^.MatchPrefix(ob^.peer[i].id)
   then nb^.peer[i].addr.clear
   else ob^.peer[i].addr.clear;
 end;
 writeln('-> ',string(ob^.prefix),'/',ob^.depth);
 for i:=1 to high(tBucket.peer) do if not ob^.peer[i].addr.isnil
  then writeln('-> -> ',string(ob^.peer[i].id));
 writeln('-> ',string(nb^.prefix),'/',nb^.depth);
 for i:=1 to high(tBucket.peer) do if not nb^.peer[i].addr.isnil
  then writeln('-> -> ',string(nb^.peer[i].id));
 {link the pair nb->ob back in, keeping the list ordered deepest first
  and preserving everything that follows the insertion point}
 if (Table=nil) or (Table^.depth<=nb^.depth) then begin
  ob^.next:=Table;
  Table:=nb;
 end else begin
  cur:=Table;
  while assigned(cur^.next) and (cur^.next^.depth>nb^.depth) do cur:=cur^.next;
  ob^.next:=cur^.next;
  cur^.next:=nb;
  writeln('-> after /',cur^.depth);
 end;
 Shedule(2000,@nb^.Refresh);
end;
{ Record that the peer (id,addr) was heard from.
  - Our own ID is ignored.
  - A peer already stored with ReqDelta<2 is refreshed (LastMsgFrom, ReqDelta:=0).
  - Otherwise the peer is written to the first free slot, or to a slot whose
    occupant has ReqDelta>=2.
  - When the bucket is full and it also covers MyID, it is split and the
    insertion is retried; other full buckets drop the new peer. }
procedure UpdateNode(const id:tFID; const addr:tNetAddr);
 var bkt:^tBucket;
 var i,fr:byte;
 label again;
 begin
 if id=MyID then exit;
 again:
 bkt:=FindBucket(id);
 if not assigned(bkt) then begin
  {no bucket matches: create the root bucket, which accepts every ID}
  New(Table); //todo
  bkt:=Table;
  bkt^.Prefix:=MyID;
  bkt^.Depth:=0;
  bkt^.ModifyTime:=mNow;
  bkt^.next:=nil;
  bkt^.desperate:=3;
  for i:=1 to high(bkt^.peer) do bkt^.peer[i].addr.Clear;
  Shedule(2000,@bkt^.Refresh);
 end;
 {fr = slot the peer will be written to, 0 while none was found}
 fr:=0;
 for i:=1 to high(bkt^.peer) do begin
  if bkt^.peer[i].addr.isNil then begin
   {free slot: remember the first one. Its id and ReqDelta are stale
    or uninitialised and must not be compared against}
   if fr=0 then fr:=i;
   continue;
  end;
  if (bkt^.peer[i].ReqDelta<2) then begin
   {found node in the bucket}
   if (bkt^.peer[i].id=id) then begin
    bkt^.peer[i].LastMsgFrom:=mNow;
    bkt^.peer[i].ReqDelta:=0;
    exit
   end;
   {a responsive peer already occupies this address}
   if bkt^.peer[i].addr=addr then exit;
  end
  {unresponsive occupant: reuse its slot, preferring the one with our id}
  else if (fr=0)or (bkt^.peer[i].id=id)
  then fr:=i;
 end;
 if fr=0 then begin
  if bkt^.MatchPrefix(MyID)
  then begin
   SplitBucket(bkt);
   goto again;
  end; {the bucket is full!}
  {drop new node and hope nodes in the bucket are good}
 end else begin
  writeln('DHT: AddNode ',string(id),string(addr),' to ',string(bkt^.prefix),'/',bkt^.depth,'#',fr);
  bkt^.ModifyTime:=mNow;
  bkt^.peer[fr].ID:=ID;
  bkt^.peer[fr].Addr:=Addr;
  bkt^.peer[fr].LastMsgFrom:=mNow;
  bkt^.peer[fr].LastResFrom:=0;
  bkt^.peer[fr].ReqDelta:=0;
 end;
end;
{ Public entry point: hand the peer's id and address to UpdateNode. }
procedure InsertNode(const peer:tPeerPub);
 begin
 UpdateNode(peer.id, peer.addr);
end;
{ Advance the iterator (ibkt,ix) to the next occupied slot whose ReqDelta
  is below maxrd, moving on to following buckets as needed.
  ibkt becomes nil when the list is exhausted; a nil ibkt on entry is left
  untouched. The id parameter is not used by this routine. }
procedure GetNextNode(var ibkt:tBucket_ptr; var ix:byte; const id:tPID; maxrd:word);
 var cur:^tBucket;
 begin
 if ibkt=nil then exit;
 cur:=ibkt;
 while true do begin
  inc(ix);
  if ix>high(tBucket.peer) then begin
   {past the last slot: continue with slot 1 of the next bucket}
   ix:=1;
   cur:=cur^.next;
   if cur=nil then break;
  end;
  if cur^.peer[ix].Addr.isNil then continue;
  if cur^.peer[ix].ReqDelta<maxrd then break;
 end;
 ibkt:=cur;
end;
{ Public iterator over the routing table.
  A nil ibkt starts at the head of Table. On return peer holds the next
  peer with ReqDelta<3, or peer.addr is cleared when there are no more. }
procedure GetNextNode(var ibkt:pointer; var ix:byte; out peer:tPeerPub);
 begin
 if not assigned(ibkt) then ibkt:=Table;
 GetNextNode(ibkt,ix,MyID,3);
 if ibkt=nil then begin
  peer.addr.clear;
  exit;
 end;
 peer:=tBucket_ptr(ibkt)^.peer[ix];
end;
{Messages:
 a)Request: op, SendID, TargetID, caps, adt
 b)Select : op, caps, addr, TargetID, OrigID, adt (66)
          : op, caps, addr, TargetID, SendID, adt (66)
 c)Ack    : op, SenderID
 d)Wazzup : op, SenderID
}
{ Handler for dhtRequest.
  Reads SenderID, TargetID and caps, records the sender via UpdateNode,
  forwards a dhtSelect to up to four peers near TargetID, and always
  answers the sender with a dhtReqAck carrying MyID. }
procedure RecvRequest(msg:tSMsg);
 var s:tMemoryStream absolute msg.stream;
 var sID:^tPID;
 var rID:^tPID;
 var caps:byte;
 var r:tMemoryStream;
 var bkt:^tBucket;
 var i,li:byte;
 var SendCnt:byte;
 begin
 s.skip(1); {opcode}
 sID:=s.ReadPtr(20);
 rID:=s.ReadPtr(20);
 caps:=s.ReadByte;
 SendCnt:=0;
 {li is the last slot used; it must be defined even when the loop below
  sends nothing, because GetNextNode increments it}
 li:=0;
 //writeln('DHT: ',string(msg.source^),' Request for ',string(rID^));
 UpdateNode(sID^,msg.source^);
 {Select peers only from The bucket,
 if it is broken, send none, but still Ack}
 bkt:=FindBucket(rID^);
 r.Init(128);
 if assigned(bkt) then begin
  r.WriteByte(opcode.dhtSelect);
  r.WriteByte(caps);
  r.Write(msg.Source^,sizeof(tNetAddr));
  r.Write(rID^,20);
  r.Write(MyID,20);
  {pass along up to 8 bytes of trailing data from the request}
  if (s.RdBufLen>0)and(s.RdBufLen<=8) then r.Write(s.RdBuf^,s.RdBufLen);
  for i:=1 to high(tBucket.peer) do begin
   if bkt^.peer[i].addr.isNil then continue;
   if bkt^.peer[i].addr=msg.source^ then continue;
   if bkt^.peer[i].ReqDelta>1 then continue;
   //writeln('-> Select to ',string(bkt^.peer[i].addr));
   SendMessage(r.base^,r.length,bkt^.peer[i].addr);
   li:=i;
   Inc(SendCnt);
  end;
  {fewer than four sent: continue with following slots and buckets}
  while SendCnt<4 do begin
   GetNextNode(bkt,li,rID^,3);
   if not assigned(bkt) then break;
   SendMessage(r.base^,r.length,bkt^.peer[li].addr);
   Inc(SendCnt);
  end;
  {reuse the buffer for the acknowledgement}
  r.Seek(0);
  r.Trunc;
 end;
 //else writeln('-> empty bucket')
 r.WriteByte(opcode.dhtReqAck);
 r.Write(MyID,20);
 //writeln('-> ReqAck to ',string(msg.Source^));
 SendMessage(r.base^,r.length,msg.source^);
 FreeMem(r.base,r.size);
end;
{ Build a dhtRequest (opcode, MyID, forid, caps) and send it to contact. }
procedure SendRequest(const contact:tNetAddr; const forid: tPID; caps:byte);
 var req:tMemoryStream;
 begin
 req.Init(42);
 {header}
 req.WriteByte(opcode.dhtRequest);
 {sender, then the ID being looked up}
 req.Write(MyID, sizeof(tFID));
 req.Write(ForID, sizeof(tFID));
 req.WriteByte(caps);
 SendMessage(req.base^, req.length, contact);
 {release the buffer allocated by Init}
 FreeMem(req.base, req.size);
end;
{ Handler for dhtReqAck: the sender reports its ID, which is recorded
  together with the source address of the message. }
procedure RecvReqAck(msg:tSMsg);
 var s:tMemoryStream absolute msg.stream;
 var senderID:^tPID;
 begin
 s.skip(1); {opcode}
 senderID:=s.ReadPtr(20);
 //writeln('DHT: ',string(msg.source^),' is ',string(hID^),' (ReqAck)');
 UpdateNode(senderID^, msg.source^);
end;
{ Handler for dhtWazzup: the sender reports its ID, which is recorded
  together with the source address of the message. }
procedure RecvWazzup(msg:tSMsg);
 var s:tMemoryStream absolute msg.stream;
 var senderID:^tPID;
 begin
 s.skip(1); {opcode}
 senderID:=s.ReadPtr(20);
 //writeln('DHT: ',string(msg.source^),' is ',string(hID^),' (Wazzup)');
 UpdateNode(senderID^, msg.source^);
 //UpdateSearch(hID^,msg.source^);
end;
{ Bootstrap from a known address by sending it a request for our own ID
  with caps 0. }
procedure NodeBootstrap(const contact:tNetAddr);
 begin
 SendRequest(contact, MyID, 0);
end;
{ Handler for dhtSelect.
  Reads caps, the requester's address, the target ID and the sender ID.
  Unless the target ID is our own, a dhtWazzup carrying MyID is sent to
  the requester's address. }
procedure RecvSelect(msg:tSMsg);
 var s:tMemoryStream absolute msg.stream;
 var caps:byte;
 var requester:^tNetAddr;
 var rID,sID:^tPID;
 var reply:tMemoryStream;
 begin
 s.skip(1); {opcode}
 caps:=s.ReadByte;
 requester:=s.ReadPtr(sizeof(tNetAddr));
 rID:=s.ReadPtr(20);
 sID:=s.ReadPtr(20);
 //UpdateNode(sID^,msg.source^);
 //writeln('DHT: ',string(msg.source^),' Select for ',string(addr^));
 {target is our own ID: nothing to send}
 //writeln('-> self');
 if rID^=MyID then exit;
 reply.Init(21);
 reply.WriteByte(opcode.dhtWazzup);
 reply.Write(MyID,20);
 //writeln('-> Wazzup to ',string(addr^));
 SendMessage(reply.base^, reply.length, requester^);
 FreeMem(reply.base, reply.size);
end;
{Refresh contacts the non-recent ("dead") peers on average once in cStichRar runs}
const cStichRar=10;

{ Periodic maintenance of one bucket; reschedules itself on every run.
  - Peers with outstanding requests (ReqDelta>0) are asked again.
  - Of the remaining peers, the one heard from least recently is contacted
    when it has been silent for more than 10000 time units.
  - When no such peer exists, a peer from the following buckets (or from the
    whole table) is asked about this bucket's prefix. }
procedure tBucket.Refresh;
 var my,rtr,stich:boolean;
 var i,ol,rv:byte;
 var wait:LongWord;
 var rvb:^tBucket;
 {send a request for trg to peer and count it as outstanding}
 procedure lSend(var peer:tPeer; const trg:tPID);
  begin
  SendRequest(peer.Addr,trg,0);
  Inc(peer.ReqDelta);
 end;
 begin
 my:=MatchPrefix(MyID);
 {ol = index of the quietest peer with ReqDelta=0; rtr = a retry was sent}
 ol:=0; rtr:=false;
 {1 of 10 times try to contact dead nodes in attempt to recover from network split}
 stich:=Random(cStichRar)=0;
 for i:=1 to high(tBucket.peer)
 do if (not peer[i].Addr.isNil) then begin
  if peer[i].ReqDelta>0 then begin
   {normal run: retry peers with ReqDelta 1..3; stich run: only those above 3}
   if (peer[i].ReqDelta<=3)xor stich then begin
    {this will get rid of half-dead nodes}
    writeln('DHT: Refresh (R',peer[i].ReqDelta,') ',copy(string(peer[i].id),1,6),string(peer[i].addr));
    lSend(peer[i],prefix);
    rtr:=true;
   end
  end
  else if (ol=0) or (peer[i].LastMsgFrom<peer[ol].LastMsgFrom)
  then ol:=i;
 end;
 {now nudge the most quiet peer, but not too often}
 if (ol>0) and ((mNow-peer[ol].LastMsgFrom)>10000) then begin
  //writeln('DHT: Refresh (T',mNow-peer[ol].LastMsgFrom,') #',ol,' ',string(peer[ol].addr));
  lSend(peer[ol],MyID);
 end;
 {try to recover bucket full of bad nodes}
 if (ol=0){and(not rtr)} then begin
  rv:=0; rvb:=@self;
  GetNextNode(rvb,rv,prefix,desperate);
  if not assigned(rvb) then begin
   rv:=0; rvb:=Table; {in extreme cases, try the whole table}
   GetNextNode(rvb,rv,prefix,desperate);
  end;
  if assigned(rvb) then begin
   writeln('DHT: Recover ',string(prefix),'/',depth,' try ',copy(string(rvb^.peer[rv].id),1,6),string(rvb^.peer[rv].addr));
   lSend(rvb^.peer[rv],prefix);
  end else inc(desperate); {nobody eligible: accept peers with a higher ReqDelta next time}
 end else desperate:=3;
 {pick the delay until the next run}
 if my
  then wait:=18000+(depth*600)
  else wait:=30000;
 {come back sooner while ordinary retries are pending}
 if rtr and(not stich) then wait:=wait div 3;
 Shedule(wait,@Refresh);
end;
{to bootstrap: ping address to get ID and insert to bucket/il
 ping may get lost: separate bootstrap unit :)
 now just Ass-U-Me it won't get lost}
{unit initialization: register one handler per DHT opcode}
BEGIN
 SetMsgHandler(opcode.dhtRequest, @RecvRequest);
 SetMsgHandler(opcode.dhtSelect,  @RecvSelect);
 SetMsgHandler(opcode.dhtReqAck,  @RecvReqAck);
 SetMsgHandler(opcode.dhtWazzup,  @RecvWazzup);
END.