Change PoW timestamp format.
[brdnet.git] / dht.pas
bloba0ed23d7f73d7ba7f27f3aa19695063cc31640a4
1 unit DHT;
3 implementation of custom dht, based on pastry and kademlia.
4 keyspace is divided into buckets of limited capacity
5 node belongs to bucket, where at least 'depth' bits match 'prefix'
6 old>new,
7 new>dead
8 TODO: weight nodes by IP-Address common prefix length.
11 {used by: messages, fileshare}
13 INTERFACE
14 uses NetAddr,Store1;
15 type tPID=Store1.tFID; {reQ: ids can be shorter}
16 type tPeerPub=object
17 ID :tPID;
18 Addr :tNetAddr;
19 //function IsGood
20 //function GetAge
21 end;
22 var MyID:tPID;
23 procedure NodeBootstrap(const contact:tNetAddr);
24 procedure GetNextNode(var ibkt:pointer; var ix:byte; out peer:tPeerPub);
25 procedure InsertNode(const peer:tPeerPub);
27 IMPLEMENTATION
28 uses ServerLoop,Chat,MemStream,opcode,sha1,ecc,CRAuth;
30 type
31 tPeer=object(tPeerPub)
32 ReqDelta:word;
33 LastMsgFrom,
34 LastResFrom :tMTime;
35 Ban:boolean;
36 Verify: ^CRAuth.tAuth; {nil when verified}
37 procedure VerifyCallback;
38 end;
39 tPeer_ptr=^tPeer;
40 tBucket_ptr=^tBucket;
41 tBucket=object
42 Prefix: tPID;
43 Depth: byte;
44 peer: array [1..4] of tPeer;
45 ModifyTime: tMTime;
46 //ll: ^tll;
47 desperate:word;
48 next: ^tBucket;
49 function MatchPrefix(const tp:tFID):boolean;
50 procedure Refresh;
51 end;
53 var Table:^tBucket;
54 {deepest first}
56 function PrefixLength(const a,b:tFID):byte;
57 var i:byte;
58 var by:byte;
59 var m:byte;
60 begin
61 by:=0;
62 i:=0; while(i<=19) do begin
63 if a[i]<>b[i] then break;
64 inc(i);
65 end;
66 result:=i*8;
67 if i=20 then exit;
68 m:=$80;
69 while(m>0) do begin
70 if (a[i] and m)<>(b[i] and m) then break;
71 m:=m shr 1;
72 inc(result);
73 end;
74 end;
77 function tBucket.MatchPrefix(const tp:tFID):boolean;
78 begin
79 result:=(depth=0)or(PrefixLength(prefix,tp)>=depth);
80 end;
82 function FindBucket(const prefix:tFID):tBucket_ptr;
83 var cur:^tBucket;
84 begin
85 cur:=Table;
86 result:=nil;
87 while (cur<>nil) and (result=nil) do begin
88 if cur^.MatchPrefix(prefix) {first matching is deepest}
89 then result:=cur;
90 cur:=cur^.next;
91 end;
92 end;
94 operator =(const a,b:tFID):boolean;
95 begin
96 result:=CompareWord(a,b,10)=0;
97 end;
99 procedure SplitBucket(ob:tBucket_ptr);
100 procedure Toggle(var prefix:tPID; bit:byte);
101 begin
102 prefix[bit div 8]:= prefix[bit div 8] xor ($80 shr (bit mod 8));
103 end;
104 var nb:tBucket_ptr;
105 var i:byte;
106 begin
107 writeln('DHT: SplitBucket ',string(ob^.prefix),'/',ob^.depth);
108 {find pref to old bucket, in order to unlink}
109 if ob=Table then table:=table^.next else begin
110 nb:=Table;
111 while assigned(nb) and (nb^.next<>ob) do nb:=nb^.next;
112 assert(assigned(nb),'old bucket not in table');
113 {unlink}
114 nb^.next:=nb^.next^.next; nb:=nil;
115 end;
116 {increase depth of this bucket}
117 Inc(ob^.depth);
118 ob^.ModifyTime:=mNow;
119 {create new bucket with toggled bit}
120 New(nb);
121 nb^:=ob^;
122 Toggle(nb^.Prefix,nb^.depth-1);
123 nb^.next:=ob;
124 {clear nodes that do not belong in bucket}
125 for i:=1 to high(tBucket.peer) do begin
126 if ob^.peer[i].addr.isNil then continue;
127 if ob^.MatchPrefix(ob^.peer[i].id)
128 then nb^.peer[i].addr.clear
129 else ob^.peer[i].addr.clear;
130 end;
131 writeln('-> ',string(ob^.prefix),'/',ob^.depth);
132 for i:=1 to high(tBucket.peer) do if not ob^.peer[i].addr.isnil
133 then writeln('-> -> ',string(ob^.peer[i].id));
134 writeln('-> ',string(nb^.prefix),'/',nb^.depth);
135 for i:=1 to high(tBucket.peer) do if not nb^.peer[i].addr.isnil
136 then writeln('-> -> ',string(nb^.peer[i].id));
137 if table=nil then table:=nb else begin
138 ob:=Table;
139 while assigned(ob^.next)and (ob^.next^.depth>nb^.depth) do ob:=ob^.next;
140 ob^.next:=nb;
141 writeln('-> after /',ob^.depth);
142 end;
143 Shedule(2000,@nb^.Refresh);
144 end;
146 procedure VerifyInit(b:tBucket_ptr; i:byte); forward;
148 function CheckNode(const id: tPID; const addr: tNetAddr): boolean;
149 {return false if node is banned}
150 {update or insert}
151 {initiate auth on insert and also on id conflict}
152 {replace only old, banned and free slots}
153 var b:^tBucket;
154 var i,fr:byte;
155 var dup:boolean;
156 label again;
157 begin
158 if id=MyID then exit;
159 CheckNode:=false;
160 again:
161 b:=FindBucket(id);
162 fr:=0; dup:=false;
163 if not assigned(b) then begin
164 New(Table); b:=Table;
165 b^.Prefix:=MyID;
166 b^.Depth:=0;
167 b^.ModifyTime:=mNow;
168 b^.next:=nil;
169 b^.desperate:=3;
170 for i:=1 to high(b^.peer) do b^.peer[i].addr.Clear;
171 for i:=1 to high(b^.peer) do b^.peer[i].ban:=false;
172 Shedule(2000,@b^.Refresh);
173 end;
174 for i:=1 to high(b^.peer) do begin {check for ban and dup}
175 if (b^.peer[i].Ban) and (b^.peer[i].Addr=addr) then exit;
176 if (fr=0)and(b^.peer[i].Addr.isNil) then fr:=i;
177 if (b^.peer[i].ID=id)or(b^.peer[i].Addr=Addr) then begin
178 fr:=i;dup:=(b^.peer[i].ReqDelta<2);break
179 end;
180 end;
181 if fr=0 then for i:=1 to high(b^.peer) do begin {check for old/banned}
182 if (b^.peer[i].ReqDelta>=2) then fr:=i;
183 if (fr=0) and (b^.peer[i].Ban) then fr:=i;
184 end;
185 if fr=0 then begin
186 if b^.MatchPrefix(MyID) then begin
187 SplitBucket(b);
188 goto again;
189 end (*else bucket is full and not splittable*)
190 end else begin
191 if dup then begin
192 if (b^.peer[i].addr=addr) then begin
193 b^.peer[i].LastMsgFrom:=mNow;
194 b^.peer[i].ReqDelta:=0;
195 CheckNode:=true;
196 end else begin
197 {todo conflict}
198 VerifyInit(b,fr);
200 end else begin
201 {add node here}
202 if (not b^.peer[fr].Addr.isNil)and assigned(b^.peer[fr].Verify)
203 then b^.peer[fr].Verify^.Cancel;
204 writeln('DHT: AddNode ',string(id),string(addr),' to ',string(b^.prefix),'/',b^.depth,'#',fr);
205 b^.ModifyTime:=mNow;
206 b^.peer[fr].ID:=ID;
207 b^.peer[fr].Addr:=Addr;
208 b^.peer[fr].LastMsgFrom:=mNow;
209 b^.peer[fr].LastResFrom:=0;
210 b^.peer[fr].ReqDelta:=0;
211 b^.peer[fr].ban:=false;
212 b^.peer[fr].Verify:=nil;
213 VerifyInit(b,fr);
214 CheckNode:=true;
217 end;
219 procedure InsertNode(const peer:tPeerPub);
220 begin
221 CheckNode(peer.id,peer.addr);
222 end;
224 procedure GetNextNode(var ibkt:tBucket_ptr; var ix:byte; const id:tPID; maxrd:word; bans:boolean);
225 var bkt:^tBucket;
226 begin
227 if not assigned(ibkt) then exit;
228 bkt:=ibkt;
229 repeat
230 inc(ix);
231 if ix>high(tBucket.peer) then begin
232 ix:=1;
233 bkt:=bkt^.next;
234 if not assigned(bkt) then break;
235 end;
236 until (not bkt^.peer[ix].Addr.isNil)
237 and(bkt^.peer[ix].ReqDelta<maxrd)
238 and(bans or(bkt^.peer[ix].ban=false));
239 ibkt:=bkt;
240 end;
242 procedure GetNextNode(var ibkt:pointer; var ix:byte; out peer:tPeerPub);
243 begin
244 if ibkt=nil then ibkt:=Table;
245 GetNextNode(ibkt,ix,MyID,3,false);
246 if assigned(ibkt)
247 then peer:=tBucket(ibkt^).peer[ix]
248 else peer.addr.clear;
249 end;
251 {Messages:
252 a)Request: op, SendID, TargetID, caps, adt
253 b)Select : op, caps, addr, TargetID, OrigID, adt (66) [ ]
254 : op, caps, addr, TatgetID, SendID, adt (66) [*]
255 c)Ack : op, SenderID
256 d)Wazzup : op, SenderID
259 procedure RecvRequest(msg:tSMsg);
260 var s:tMemoryStream absolute msg.stream;
261 var sID:^tPID;
262 var rID:^tPID;
263 var caps:byte;
264 var r:tMemoryStream;
265 var bkt:^tBucket;
266 var i,li:byte;
267 var SendCnt:byte;
268 begin
269 s.skip(1);
270 sID:=s.ReadPtr(20);
271 rID:=s.ReadPtr(20);
272 caps:=s.ReadByte;
273 SendCnt:=0;
274 //writeln('DHT: ',string(msg.source^),' Request for ',string(rID^));
275 if not CheckNode(sID^,msg.source^) then exit;
276 {Select peers only from The bucket,
277 if it is broken, send none, but still Ack}
278 bkt:=FindBucket(rID^);
279 r.Init(128);
280 if assigned(bkt) then begin
281 r.WriteByte(opcode.dhtSelect);
282 r.WriteByte(caps);
283 r.Write(msg.Source^,sizeof(tNetAddr));
284 r.Write(rID^,20);
285 r.Write(MyID,20);
286 if (s.RdBufLen>0)and(s.RdBufLen<=8) then r.Write(s.RdBuf^,s.RdBufLen);
287 for i:=1 to high(tBucket.peer) do begin
288 if bkt^.peer[i].addr.isNil then continue;
289 if bkt^.peer[i].addr=msg.source^ then continue;
290 if bkt^.peer[i].ReqDelta>1 then continue;
291 //writeln('-> Select to ',string(bkt^.peer[i].addr));
292 SendMessage(r.base^,r.length,bkt^.peer[i].addr);
293 li:=i;
294 Inc(SendCnt);
295 end;
296 while SendCnt<4 do begin
297 GetNextNode(bkt,li,rID^,3,false);
298 if not assigned(bkt) then break;
299 SendMessage(r.base^,r.length,bkt^.peer[li].addr);
300 Inc(SendCnt);
301 end;
302 r.Seek(0);
303 r.Trunc;
305 //else writeln('-> empty bucket')
307 r.WriteByte(opcode.dhtReqAck);
308 r.Write(MyID,20);
309 //writeln('-> ReqAck to ',string(msg.Source^));
310 SendMessage(r.base^,r.length,msg.source^);
311 FreeMem(r.base,r.size);
312 end;
314 procedure SendRequest(const contact:tNetAddr; const forid: tPID; caps:byte);
315 var r:tMemoryStream;
316 begin
317 r.Init(42);
318 r.WriteByte(opcode.dhtRequest);
319 r.Write(MyID,sizeof(tFID));
320 r.Write(ForID,sizeof(tFID));
321 r.WriteByte(caps);
322 SendMessage(r.base^,r.length,contact);
323 FreeMem(r.base,r.size);
324 end;
326 procedure RecvReqAck(msg:tSMsg);
327 var s:tMemoryStream absolute msg.stream;
328 var hID:^tPID;
329 begin
330 s.skip(1);
331 hID:=s.ReadPtr(20);
332 //writeln('DHT: ',string(msg.source^),' is ',string(hID^),' (ReqAck)');
333 CheckNode(hID^,msg.source^);
334 end;
336 procedure RecvWazzup(msg:tSMsg);
337 var s:tMemoryStream absolute msg.stream;
338 var hID:^tPID;
339 begin
340 s.skip(1);
341 hID:=s.ReadPtr(20);
342 //writeln('DHT: ',string(msg.source^),' is ',string(hID^),' (Wazzup)');
343 if CheckNode(hID^,msg.source^) then
344 {UpdateSearch(hID^,msg.source^)};
345 end;
347 procedure NodeBootstrap(const contact:tNetAddr);
348 begin
349 SendRequest(contact,MyID,0);
350 end;
352 procedure RecvSelect(msg:tSMsg);
353 var s:tMemoryStream absolute msg.stream;
354 var caps:byte;
355 var addr:^tNetAddr;
356 var rID,sID:^tPID;
357 var r:tMemoryStream;
358 begin
359 s.skip(1);
360 caps:=s.ReadByte;
361 addr:=s.ReadPtr(sizeof(tNetAddr));
362 rID:=s.ReadPtr(20);
363 sID:=s.ReadPtr(20);
364 if CheckNode(sID^,msg.source^) then exit;
365 //writeln('DHT: ',string(msg.source^),' Select for ',string(addr^));
366 if rID^=MyID then begin
367 //writeln('-> self');
368 exit end;
369 r.Init(21);
370 r.WriteByte(opcode.dhtWazzup);
371 r.Write(MyID,20);
372 //writeln('-> Wazzup to ',string(addr^));
373 SendMessage(r.base^,r.length,addr^);
374 FreeMem(r.base,r.size);
375 end;
377 const cStichRar=10;
378 procedure tBucket.Refresh;
379 var my,rtr,stich:boolean;
380 var i,ol,rv:byte;
381 var wait:LongWord;
382 var rvb:^tBucket;
383 procedure lSend(var peer:tPeer; const trg:tPID);
384 begin
385 SendRequest(peer.Addr,trg,0);
386 Inc(peer.ReqDelta);
387 end;
388 begin
389 my:=MatchPrefix(MyID);
390 ol:=0; rtr:=false;
391 {1 of 10 times try to contact dead nodes in attempt to recover from network split}
392 stich:=Random(cStichRar)=0;
393 for i:=1 to high(tBucket.peer)
394 do if (not peer[i].Addr.isNil) and (not peer[i].Ban) then begin
395 if peer[i].ReqDelta>0 then begin
396 if (peer[i].ReqDelta<=3)xor stich then begin
397 {this will get rid of half-dead nodes}
398 writeln('DHT: Refresh (R',peer[i].ReqDelta,') ',copy(string(peer[i].id),1,6),string(peer[i].addr));
399 lSend(peer[i],prefix);
400 rtr:=true;
403 else if (ol=0) or (peer[i].LastMsgFrom<peer[ol].LastMsgFrom)
404 then ol:=i;
405 end;
406 {now nudge the most quiet peer, but not too often}
407 if (ol>0) and ((mNow-peer[ol].LastMsgFrom)>10000) then begin
408 //writeln('DHT: Refresh (T',mNow-peer[ol].LastMsgFrom,') #',ol,' ',string(peer[ol].addr));
409 lSend(peer[ol],MyID);
410 end;
411 {try to recover bucket full of bad nodes}
412 if (ol=0){and(not rtr)} then begin
413 rv:=0; rvb:=@self;
414 GetNextNode(rvb,rv,prefix,desperate,false);
415 if not assigned(rvb) then begin
416 rv:=0; rvb:=Table; {in extreme cases, try the whole table}
417 GetNextNode(rvb,rv,prefix,desperate,true);
418 end;
419 if assigned(rvb) then begin
420 writeln('DHT: Recover ',string(prefix),'/',depth,' try ',copy(string(rvb^.peer[rv].id),1,6),string(rvb^.peer[rv].addr));
421 lSend(rvb^.peer[rv],prefix);
422 end else inc(desperate);
423 end else desperate:=3;
424 if my
425 then wait:=18000+(depth*600)
426 else wait:=30000;
427 if rtr and(not stich) then wait:=wait div 3;
428 Shedule(wait,@Refresh);
429 end;
431 {to bootstrap: ping address to get ID and insert to bucket/il
432 ping may get lost: separate bootstrap unit :)
433 now jut Ass-U-Me wont get lost}
435 procedure VerifyInit(b:tBucket_ptr; i:byte);
436 begin
437 with b^.peer[i] do begin
438 if assigned(Verify) then exit;
439 new(Verify);
440 Verify^.Callback:=@VerifyCallback;
441 Verify^.Init(Addr);
442 //writeln('DHT: Starting Verificator for ',string(Addr));
444 end;
445 procedure tPeer.VerifyCallback;
446 begin
447 if Verify^.error>0 then begin
448 writeln('DHT: Verificator error ',string(Addr),Verify^.error);
449 ReqDelta:=3;
450 end else
451 if Verify^.Valid and Verify^.PowValid and (CompareWord(ID,Verify^.RemotePub,10)=0) then
452 Ban:=false
453 else begin
454 Ban:=true;
455 writeln('DHT: Verificator failed for ',string(Addr),Verify^.Valid,Verify^.PoWValid,Verify^.error);
456 end;
457 Verify:=nil; {it will free itelf}
458 end;
460 BEGIN
461 SetMsgHandler(opcode.dhtRequest,@recvRequest);
462 SetMsgHandler(opcode.dhtSelect,@recvSelect);
463 SetMsgHandler(opcode.dhtReqAck,@recvReqAck);
464 SetMsgHandler(opcode.dhtWazzup,@recvWazzup);
465 END.