Recover broken bucket from progressively more bad nodes from other buckets.
[brdnet.git] / dht.pas
blob7b57a5c6d54f455f65b5ffed40f5609e2e44c730
1 unit DHT;
{
 Implementation of a custom DHT, based on Pastry and Kademlia.
 The keyspace is divided into buckets of limited capacity.
 A node belongs to the bucket where at least 'depth' bits of its ID match 'prefix'.
 Replacement preference: old>new,
 new>dead.
 TODO: weight nodes by IP-address common prefix length.
}
11 {used by: messages, fileshare}
13 INTERFACE
14 uses NetAddr,Store1;
type
  { Peer identifiers use the same representation as Store1 file identifiers. }
  tPID = Store1.tFID;

  { Publicly visible part of a routing-table entry. }
  tPeerPub = object
    ID   : tPID;     { identifier of the peer }
    Addr : tNetAddr; { network address of the peer }
    //function IsGood
    //function GetAge
  end;

var
  { Identifier of the local node; may be overridden with the -id option. }
  MyID: tPID;

{ Sends a request for our own ID to the given contact. }
procedure NodeBootstrap(const contact:tNetAddr);
{ Iterates over usable peers of the table; pass ibkt=nil to start from the
  head of the table. On exhaustion ibkt is nil and peer.addr is cleared. }
procedure GetNextNode(var ibkt:pointer; var ix:byte; out peer:tPeerPub);
{ Adds the peer to the routing table, or refreshes it if already present. }
procedure InsertNode(const peer:tPeerPub);
27 IMPLEMENTATION
28 uses ServerLoop,MemStream,opcode;
type
  { Routing-table entry: public peer data plus liveness bookkeeping. }
  tPeer = object(tPeerPub)
    { Incremented for every request sent by tBucket.Refresh and reset to 0
      by UpdateNode when a message from the peer arrives. }
    ReqDelta    : word;
    { Time of the last message received from the peer (set in UpdateNode). }
    LastMsgFrom : tMTime;
    { Set to 0 when the peer is added; not otherwise updated in this unit. }
    LastResFrom : tMTime;
  end;
  tPeer_ptr = ^tPeer;

  tBucket_ptr = ^tBucket;

  { One bucket of the routing table; buckets form a singly linked list. }
  tBucket = object
    Prefix     : tPID;                  { identifier prefix covered by the bucket }
    Depth      : byte;                  { number of leading bits of Prefix that must match }
    peer       : array [1..4] of tPeer; { slots; a slot with a nil address is free }
    ModifyTime : tMTime;                { time of the last structural change }
    //ll: ^tll;
    desperate  : word;                  { ReqDelta limit used when recovering from other buckets }
    next       : tBucket_ptr;           { next bucket in the table list }
    function MatchPrefix(const tp:tFID):boolean;
    procedure Refresh;
  end;

var
  { Head of the bucket list (intended order: deepest first). }
  Table: ^tBucket;
{ Returns the number of leading bits that a and b have in common.
  Only the first 20 bytes are compared, so the result is in 0..160. }
function PrefixLength(const a,b:tFID):byte;
var
  idx  : byte; { index of the first byte that differs, or 20 if none does }
  mask : byte; { bit under test, most significant bit first }
begin
  { count whole matching bytes }
  idx:=0;
  while (idx<20) and (a[idx]=b[idx]) do inc(idx);
  result:=idx*8;
  if idx=20 then exit;
  { count matching leading bits inside the first differing byte }
  mask:=$80;
  while (mask>0) and ((a[idx] and mask)=(b[idx] and mask)) do begin
    inc(result);
    mask:=mask shr 1;
  end;
end;
{ True when tp belongs to this bucket: a bucket of depth 0 accepts every
  identifier, otherwise at least 'depth' leading bits must equal the prefix. }
function tBucket.MatchPrefix(const tp:tFID):boolean;
begin
  if depth=0
    then result:=true
    else result:=PrefixLength(prefix,tp)>=depth;
end;
{ Returns the first bucket in the table list whose prefix matches the given
  identifier, or nil when no bucket matches (for example an empty table). }
function FindBucket(const prefix:tFID):tBucket_ptr;
var
  node: ^tBucket;
begin
  node:=Table;
  while node<>nil do begin
    if node^.MatchPrefix(prefix) then begin
      {first matching is deepest}
      result:=node;
      exit;
    end;
    node:=node^.next;
  end;
  result:=nil;
end;
{ Identifier equality: true when the first 20 bytes of a and b are identical. }
operator =(const a,b:tFID):boolean;
begin
  result:=CompareByte(a,b,20)=0;
end;
{ Splits bucket ob in two: ob keeps its prefix with depth increased by one,
  and a newly allocated sibling gets the same prefix with the new last bit
  flipped. Each peer stays only in the bucket whose prefix it matches.
  Both buckets are linked back into the table and a Refresh of the new
  bucket is scheduled. }
procedure SplitBucket(ob:tBucket_ptr);

  { Flips one bit of prefix; bit 0 is the most significant bit of byte 0. }
  procedure Toggle(var prefix:tPID; bit:byte);
  var
    mask: byte;
  begin
    mask:=$80 shr (bit mod 8);
    prefix[bit div 8]:=prefix[bit div 8] xor mask;
  end;

  { Logs the prefix/depth of a bucket followed by the IDs of its used slots. }
  procedure Dump(b:tBucket_ptr);
  var
    k: byte;
  begin
    writeln('-> ',string(b^.prefix),'/',b^.depth);
    for k:=1 to high(tBucket.peer) do
      if not b^.peer[k].addr.isnil
        then writeln('-> -> ',string(b^.peer[k].id));
  end;

var
  sibling : tBucket_ptr; { the newly created bucket }
  cursor  : tBucket_ptr; { list walker used for unlinking and relinking }
  slot    : byte;
begin
  writeln('DHT: SplitBucket ',string(ob^.prefix),'/',ob^.depth);
  { unlink the old bucket from the table; ob^.next itself is left untouched }
  if ob=Table then Table:=Table^.next
  else begin
    cursor:=Table;
    while assigned(cursor) and (cursor^.next<>ob) do cursor:=cursor^.next;
    assert(assigned(cursor),'old bucket not in table');
    cursor^.next:=cursor^.next^.next;
  end;
  { the old bucket becomes one bit deeper }
  Inc(ob^.depth);
  ob^.ModifyTime:=mNow;
  { the sibling starts as a copy and differs in the newly significant bit }
  New(sibling);
  sibling^:=ob^;
  Toggle(sibling^.Prefix,sibling^.depth-1);
  sibling^.next:=ob;
  { every used slot survives in exactly one of the two buckets }
  for slot:=1 to high(tBucket.peer) do
    if not ob^.peer[slot].addr.isNil then begin
      if ob^.MatchPrefix(ob^.peer[slot].id)
        then sibling^.peer[slot].addr.clear
        else ob^.peer[slot].addr.clear;
    end;
  Dump(ob);
  Dump(sibling);
  { relink: the sibling (followed by ob) goes after the last bucket that is
    followed by a strictly deeper one, or becomes the head of an empty table }
  if Table=nil then Table:=sibling
  else begin
    cursor:=Table;
    while assigned(cursor^.next) and (cursor^.next^.depth>sibling^.depth) do
      cursor:=cursor^.next;
    cursor^.next:=sibling;
    writeln('-> after /',cursor^.depth);
  end;
  Shedule(2000,@sibling^.Refresh);
end;
{ Records that a message from node 'id' arrived from address 'addr'.
  - Our own ID is ignored.
  - A known node (same ID and same address) has its LastMsgFrom refreshed
    and its ReqDelta reset.
  - An unknown node takes the first free slot of its bucket, or the first
    slot whose peer has ReqDelta>=2 if no free slot precedes it.
  - If the bucket is full and covers MyID it is split and the insertion is
    retried; otherwise the new node is dropped. }
procedure UpdateNode(const id:tFID; const addr:tNetAddr);
var
  target     : ^tBucket;
  slot, free : byte;
begin
  if id=MyID then exit;
  while true do begin
    target:=FindBucket(id);
    if target=nil then begin
      { no bucket matches: create the initial bucket covering everything }
      New(Table); //todo
      target:=Table;
      target^.Prefix:=MyID;
      target^.Depth:=0;
      target^.ModifyTime:=mNow;
      target^.next:=nil;
      target^.desperate:=3;
      for slot:=1 to high(target^.peer) do target^.peer[slot].addr.Clear;
      Shedule(2000,@target^.Refresh);
    end;
    free:=0;
    for slot:=1 to high(target^.peer) do begin
      if (free=0) and target^.peer[slot].addr.isNil then begin
        free:=slot;
        continue;
      end;
      //if target^.peer[slot].addr=addr then free:=slot
      if target^.peer[slot].id=id then begin
        { same ID from a different address is not treated as this node }
        if target^.peer[slot].addr<>addr then continue;
        {found node in the bucket}
        //writeln('DHT: UpdateNode ',string(id));
        // ?? target^.ModifyTime:=mNow;
        target^.peer[slot].LastMsgFrom:=mNow;
        target^.peer[slot].ReqDelta:=0;
        exit;
      end;
      {use non-responding as free}
      if (free=0) and (target^.peer[slot].ReqDelta>=2) then free:=slot;
    end;
    if free<>0 then begin
      writeln('DHT: AddNode ',string(id),' to ',string(target^.prefix),'/',target^.depth,'#',free);
      target^.ModifyTime:=mNow;
      target^.peer[free].ID:=ID;
      target^.peer[free].Addr:=Addr;
      target^.peer[free].LastMsgFrom:=mNow;
      target^.peer[free].LastResFrom:=0;
      target^.peer[free].ReqDelta:=0;
      exit;
    end;
    {the bucket is full!}
    {drop new node and hope nodes in the bucket are good}
    if not target^.MatchPrefix(MyID) then exit;
    { our own bucket may be split; then look the node up again }
    SplitBucket(target);
  end;
end;
{ Public entry point: feeds the peer's ID and address to UpdateNode. }
procedure InsertNode(const peer:tPeerPub);
begin
  with peer do UpdateNode(id,addr);
end;
{ Advances the cursor (ibkt, ix) to the next slot that holds a peer with a
  non-nil address and ReqDelta below maxrd, moving on to following buckets
  as needed. When the list is exhausted ibkt becomes nil. A nil ibkt on
  entry is left as is and ix is not touched. The id parameter is not used. }
procedure GetNextNode(var ibkt:tBucket_ptr; var ix:byte; const id:tPID; maxrd:word);
var
  cur: ^tBucket;
begin
  cur:=ibkt;
  while assigned(cur) do begin
    inc(ix);
    if ix>high(tBucket.peer) then begin
      { past the last slot: continue with the first slot of the next bucket }
      ix:=1;
      cur:=cur^.next;
      if not assigned(cur) then break;
    end;
    if (not cur^.peer[ix].Addr.isNil) and (cur^.peer[ix].ReqDelta<maxrd)
      then break;
  end;
  ibkt:=cur;
end;
{ Public iterator over peers with ReqDelta below 3. A nil ibkt starts the
  walk at the head of the table. When no further peer exists ibkt is nil
  and peer.addr is cleared; otherwise peer receives the public part of the
  entry found. }
procedure GetNextNode(var ibkt:pointer; var ix:byte; out peer:tPeerPub);
begin
  if not assigned(ibkt) then ibkt:=Table;
  GetNextNode(ibkt,ix,MyID,3);
  if ibkt=nil then begin
    peer.addr.clear;
    exit;
  end;
  peer:=tBucket(ibkt^).peer[ix];
end;
{ Handler for dhtRequest messages.
  Incoming layout as read here: 1 byte skipped, 20-byte sender ID, 20-byte
  requested ID, 1 caps byte, then optional trailing data.
  The sender is recorded via UpdateNode. If a bucket matches the requested
  ID, a dhtSelect message (caps, sender address, requested ID, sender ID and
  up to 8 bytes of trailing data) is sent to every peer of that bucket that
  is not the sender and has ReqDelta<=1. A dhtReqAck carrying MyID is always
  sent back to the sender. }
procedure RecvRequest(msg:tSMsg);
var s:tMemoryStream absolute msg.stream;
var hID:^tPID;
var rID:^tPID;
var caps:byte;
var r:tMemoryStream;
var bkt:^tBucket;
var i:byte;
begin
  s.skip(1);
  hID:=s.ReadPtr(20);
  rID:=s.ReadPtr(20);
  caps:=s.ReadByte;
  writeln('DHT: ',string(msg.source^),' Request for ',string(rID^));
  UpdateNode(hID^,msg.source^);
  {Select peers only from The bucket,
   if it is broken, send none, but still Ack}
  bkt:=FindBucket(rID^);
  r.Init(128);
  if assigned(bkt) then begin
    r.WriteByte(opcode.dhtSelect);
    r.WriteByte(caps);
    r.Write(msg.Source^,sizeof(tNetAddr));
    r.Write(rID^,20);
    r.Write(hID^,20);
    { forward short trailing data unchanged }
    if (s.RdBufLen>0)and(s.RdBufLen<=8) then r.Write(s.RdBuf^,s.RdBufLen);
    for i:=1 to high(tBucket.peer) do begin
      if bkt^.peer[i].addr.isNil then continue;
      if bkt^.peer[i].addr=msg.source^ then continue;
      if bkt^.peer[i].ReqDelta>1 then continue;
      writeln('-> Select to ',string(bkt^.peer[i].addr));
      SendMessage(r.base^,r.length,bkt^.peer[i].addr);
    end;
    { reuse the same buffer for the acknowledgement }
    r.Seek(0);
    r.Trunc;
  end
  else writeln('-> empty bucket');
  r.WriteByte(opcode.dhtReqAck);
  r.Write(MyID,20);
  writeln('-> ReqAck to ',string(msg.Source^));
  SendMessage(r.base^,r.length,msg.source^);
  FreeMem(r.base,r.size);
end;
{ Builds and sends a dhtRequest to contact: opcode byte, MyID, the
  identifier being looked up (forid) and the caps byte. }
procedure SendRequest(const contact:tNetAddr; const forid: tPID; caps:byte);
var
  pkt: tMemoryStream;
begin
  pkt.Init(42);
  pkt.WriteByte(opcode.dhtRequest);
  pkt.Write(MyID,sizeof(tFID));
  pkt.Write(ForID,sizeof(tFID));
  pkt.WriteByte(caps);
  SendMessage(pkt.base^,pkt.length,contact);
  FreeMem(pkt.base,pkt.size);
end;
{ Handler for dhtReqAck: skips one byte, reads the 20-byte sender ID and
  records the sender through UpdateNode. }
procedure RecvReqAck(msg:tSMsg);
var
  inp      : tMemoryStream absolute msg.stream;
  senderID : ^tPID;
begin
  inp.skip(1);
  senderID:=inp.ReadPtr(20);
  writeln('DHT: ',string(msg.source^),' is ',string(senderID^),' (ReqAck)');
  UpdateNode(senderID^,msg.source^);
end;
{ Handler for dhtWazzup: skips one byte, reads the 20-byte sender ID and
  records the sender through UpdateNode. }
procedure RecvWazzup(msg:tSMsg);
var
  inp      : tMemoryStream absolute msg.stream;
  senderID : ^tPID;
begin
  inp.skip(1);
  senderID:=inp.ReadPtr(20);
  writeln('DHT: ',string(msg.source^),' is ',string(senderID^),' (Wazzup)');
  UpdateNode(senderID^,msg.source^);
  //UpdateSearch(senderID^,msg.source^);
end;
{ Bootstraps from a known address by sending it a request for our own ID
  with a caps byte of 0. }
procedure NodeBootstrap(const contact:tNetAddr);
const
  NoCaps = 0;
begin
  SendRequest(contact,MyID,NoCaps);
end;
{ Handler for dhtSelect: reads the caps byte, an address and a 20-byte ID
  from the message. Unless that ID equals MyID, a dhtWazzup carrying MyID
  is sent to the address taken from the message. }
procedure RecvSelect(msg:tSMsg);
var
  inp    : tMemoryStream absolute msg.stream;
  caps   : byte;
  target : ^tNetAddr;
  reqID  : ^tPID;
  reply  : tMemoryStream;
begin
  inp.skip(1);
  caps:=inp.ReadByte; { read to advance the stream; the value is not used }
  target:=inp.ReadPtr(sizeof(tNetAddr));
  reqID:=inp.ReadPtr(20);
  //writeln('DHT: ',string(msg.source^),' Select for ',string(target^));
  if not (reqID^=MyID) then begin
    reply.Init(21);
    reply.WriteByte(opcode.dhtWazzup);
    reply.Write(MyID,20);
    //writeln('-> Wazzup to ',string(target^));
    SendMessage(reply.base^,reply.length,target^);
    FreeMem(reply.base,reply.size);
  end;
  //else writeln('-> self');
end;
{ Periodic maintenance of one bucket; reschedules itself through Shedule.
  1. Every used slot with 0<ReqDelta<4 gets another request for the bucket
     prefix (retry of a peer that has not answered yet).
  2. If nothing was retried, the peer with the oldest LastMsgFrom among
     those with ReqDelta=0 is sent a request for MyID.
  3. If there was neither a retry nor such a peer, a peer is borrowed from
     the following buckets, or from the whole table, accepting peers with
     ReqDelta below 'desperate'. 'desperate' grows on every such round and
     is reset to 3 as soon as the bucket has a usable peer again.
  The delay until the next run is 18000+depth*600 for a bucket covering
  MyID and 30000 otherwise, divided by 3 after a retry (presumably
  milliseconds; confirm against Shedule). }
procedure tBucket.Refresh;
var my,rtr:boolean;
var i,ol,rv:byte;
var wait:LongWord;
var rvb:^tBucket;

  { Sends a request for trg to the peer and counts it as unanswered. }
  procedure lSend(var peer:tPeer; const trg:tPID);
  begin
    SendRequest(peer.Addr,trg,0);
    Inc(peer.ReqDelta);
  end;

begin
  my:=MatchPrefix(MyID);
  ol:=0;
  rtr:=false;
  for i:=1 to high(tBucket.peer) do
    if (not peer[i].Addr.isNil) and (peer[i].ReqDelta<4) then begin
      if peer[i].ReqDelta>0 then begin
        {peer is not responding, but try once more}
        writeln('DHT: Refresh (R',peer[i].ReqDelta,') #',i,' ',string(peer[i].addr));
        lSend(peer[i],prefix);
        rtr:=true;
      end
      else if (ol=0) or (peer[i].LastMsgFrom<peer[ol].LastMsgFrom)
        then ol:=i;
    end;
  {now nudge the most quiet peer}
  if (ol>0) and (not rtr) then begin
    writeln('DHT: Refresh (T',mNow-peer[ol].LastMsgFrom,') #',ol,' ',string(peer[ol].addr));
    lSend(peer[ol],MyID);
  end;
  if (not rtr)and(ol=0) then begin
    {no usable nodes in this bucket, try to recover from other buckets}
    rv:=0; rvb:=@self;
    GetNextNode(rvb,rv,prefix,desperate);
    inc(desperate);
    if not assigned(rvb) then begin
      rv:=0; rvb:=Table; {in extreme cases, try the whole table}
      GetNextNode(rvb,rv,prefix,desperate);
    end;
    if assigned(rvb) then begin
      writeln('DHT: Refresh (RV) #',rv,' ',string(rvb^.peer[rv].addr));
      lSend(rvb^.peer[rv],prefix);
    end;
  end else desperate:=3;
  if my
    then wait:=18000+(depth*600)
    else wait:=30000;
  if rtr then wait:=wait div 3;
  Shedule(wait,@Refresh);
end;
{To bootstrap: ping an address to get its ID and insert it into a bucket/il.
 The ping may get lost: that belongs in a separate bootstrap unit :)
 For now just Ass-U-Me it won't get lost.}
{ Sets MyID from the command line when the -id option is present; the
  option must have exactly one parameter, which is converted to tPID. }
procedure LoadIDFromArgs;
const
  opt='-id';
var
  oi: word;
begin
  oi:=OptIndex(opt);
  if oi=0 then exit; { option not given: MyID is left untouched }
  assert(OptParamCount(oi)=1,opt+'(pid:sha1)');
  writeln('DHT: set ID to '+paramstr(oi+1));
  MyID:=tPID(paramstr(oi+1));
end;
{ Unit start-up: register the handlers for the four DHT opcodes, then take
  the node ID from the command line if one was given. }
INITIALIZATION
  SetMsgHandler(opcode.dhtRequest,@recvRequest);
  SetMsgHandler(opcode.dhtSelect,@recvSelect);
  SetMsgHandler(opcode.dhtReqAck,@recvReqAck);
  SetMsgHandler(opcode.dhtWazzup,@recvWazzup);
  LoadIdFromArgs;
END.