3 implementation of custom dht, based on pastry and kademlia.
4 keyspace is divided into buckets of limited capacity
5 node belongs to bucket, where at least 'depth' bits match 'prefix'
8 TODO: weight nodes by IP-Address common prefix length.
11 {used by: messages, fileshare}
15 type tPID
=Store1
.tFID
;
23 procedure NodeBootstrap(const contact
:tNetAddr
);
24 procedure GetNextNode(var ibkt
:pointer; var ix
:byte; out peer
:tPeerPub
);
25 procedure InsertNode(const peer
:tPeerPub
);
28 uses ServerLoop
,MemStream
,opcode
;
31 tPeer
=object(tPeerPub
)
41 peer
: array [1..4] of tPeer
;
46 function MatchPrefix(const tp
:tFID
):boolean;
53 function PrefixLength(const a
,b
:tFID
):byte;
59 i
:=0; while(i
<=19) do begin
60 if a
[i
]<>b
[i
] then break
;
67 if (a
[i
] and m
)<>(b
[i
] and m
) then break
;
74 function tBucket
.MatchPrefix(const tp
:tFID
):boolean;
76 result
:=(depth
=0)or(PrefixLength(prefix
,tp
)>=depth
);
79 function FindBucket(const prefix
:tFID
):tBucket_ptr
;
84 while (cur
<>nil) and (result
=nil) do begin
85 if cur
^.MatchPrefix(prefix
) {first matching is deepest}
91 operator
=(const a
,b
:tFID
):boolean;
93 result
:=CompareWord(a
,b
,10)=0;
96 procedure SplitBucket(ob
:tBucket_ptr
);
97 procedure Toggle(var prefix
:tPID
; bit
:byte);
99 prefix
[bit
div 8]:= prefix
[bit
div 8] xor ($80 shr (bit
mod 8));
104 writeln('DHT: SplitBucket ',string(ob
^.prefix
),'/',ob
^.depth
);
105 {find pref to old bucket, in order to unlink}
106 if ob
=Table
then table
:=table
^.next
else begin
108 while assigned(nb
) and (nb
^.next
<>ob
) do nb
:=nb
^.next
;
109 assert(assigned(nb
),'old bucket not in table');
111 nb
^.next
:=nb
^.next
^.next
; nb
:=nil;
113 {increase depth of this bucket}
115 ob
^.ModifyTime
:=mNow
;
116 {create new bucket with toggled bit}
119 Toggle(nb
^.Prefix
,nb
^.depth
-1);
121 {clear nodes that do not belong in bucket}
122 for i
:=1 to high(tBucket
.peer
) do begin
123 if ob
^.peer
[i
].addr
.isNil
then continue
;
124 if ob
^.MatchPrefix(ob
^.peer
[i
].id
)
125 then nb
^.peer
[i
].addr
.clear
126 else ob
^.peer
[i
].addr
.clear
;
128 writeln('-> ',string(ob
^.prefix
),'/',ob
^.depth
);
129 for i
:=1 to high(tBucket
.peer
) do if not ob
^.peer
[i
].addr
.isnil
130 then writeln('-> -> ',string(ob
^.peer
[i
].id
));
131 writeln('-> ',string(nb
^.prefix
),'/',nb
^.depth
);
132 for i
:=1 to high(tBucket
.peer
) do if not nb
^.peer
[i
].addr
.isnil
133 then writeln('-> -> ',string(nb
^.peer
[i
].id
));
134 if table
=nil then table
:=nb
else begin
136 while assigned(ob
^.next
)and (ob
^.next
^.depth
>nb
^.depth
) do ob
:=ob
^.next
;
138 writeln('-> after /',ob
^.depth
);
140 Shedule(2000,@nb
^.Refresh
);
143 procedure UpdateNode(const id
:tFID
; const addr
:tNetAddr
);
148 if id
=MyID
then exit
;
151 if not assigned(bkt
) then begin
156 bkt
^.ModifyTime
:=mNow
;
159 for i
:=1 to high(bkt
^.peer
) do bkt
^.peer
[i
].addr
.Clear
;
160 Shedule(2000,@bkt
^.Refresh
);
163 for i
:=1 to high(bkt
^.peer
)
164 do if (fr
=0)and bkt
^.peer
[i
].addr
.isNil
then fr
:=i
165 else if (bkt
^.peer
[i
].ReqDelta
<2) then begin
166 {found node in the bucket}
167 if (bkt
^.peer
[i
].id
=id
) then begin
168 bkt
^.peer
[i
].LastMsgFrom
:=mNow
;
169 bkt
^.peer
[i
].ReqDelta
:=0;
172 if bkt
^.peer
[i
].addr
=addr
then exit
;
174 else if (fr
=0)or (bkt
^.peer
[i
].id
=id
)
177 if bkt
^.MatchPrefix(MyID
)
181 end; {the bucket is full!}
182 {drop new node and hope nodes in the bucket are good}
184 writeln('DHT: AddNode ',string(id
),string(addr
),' to ',string(bkt
^.prefix
),'/',bkt
^.depth
,'#',fr
);
185 bkt
^.ModifyTime
:=mNow
;
186 bkt
^.peer
[fr
].ID
:=ID
;
187 bkt
^.peer
[fr
].Addr
:=Addr
;
188 bkt
^.peer
[fr
].LastMsgFrom
:=mNow
;
189 bkt
^.peer
[fr
].LastResFrom
:=0;
190 bkt
^.peer
[fr
].ReqDelta
:=0;
194 procedure InsertNode(const peer
:tPeerPub
);
196 UpdateNode(peer
.id
,peer
.addr
);
199 procedure GetNextNode(var ibkt
:tBucket_ptr
; var ix
:byte; const id
:tPID
; maxrd
:word);
202 if not assigned(ibkt
) then exit
;
206 if ix
>high(tBucket
.peer
) then begin
209 if not assigned(bkt
) then break
;
211 until (not bkt
^.peer
[ix
].Addr
.isNil
)and(bkt
^.peer
[ix
].ReqDelta
<maxrd
);
215 procedure GetNextNode(var ibkt
:pointer; var ix
:byte; out peer
:tPeerPub
);
217 if ibkt
=nil then ibkt
:=Table
;
218 GetNextNode(ibkt
,ix
,MyID
,3);
220 then peer
:=tBucket(ibkt
^).peer
[ix
]
221 else peer
.addr
.clear
;
225 a)Request: op, SendID, TargetID, caps, adt
226 b)Select : op, caps, addr, TargetID, OrigID, adt (66)
227 : op, caps, addr, TatgetID, SendID, adt (66)
229 d)Wazzup : op, SenderID
232 procedure RecvRequest(msg
:tSMsg
);
233 var s
:tMemoryStream
absolute msg
.stream
;
247 //writeln('DHT: ',string(msg.source^),' Request for ',string(rID^));
248 UpdateNode(sID
^,msg
.source
^);
249 {Select peers only from The bucket,
250 if it is broken, send none, but still Ack}
251 bkt
:=FindBucket(rID
^);
253 if assigned(bkt
) then begin
254 r
.WriteByte(opcode
.dhtSelect
);
256 r
.Write(msg
.Source
^,sizeof(tNetAddr
));
259 if (s
.RdBufLen
>0)and(s
.RdBufLen
<=8) then r
.Write(s
.RdBuf
^,s
.RdBufLen
);
260 for i
:=1 to high(tBucket
.peer
) do begin
261 if bkt
^.peer
[i
].addr
.isNil
then continue
;
262 if bkt
^.peer
[i
].addr
=msg
.source
^ then continue
;
263 if bkt
^.peer
[i
].ReqDelta
>1 then continue
;
264 //writeln('-> Select to ',string(bkt^.peer[i].addr));
265 SendMessage(r
.base
^,r
.length
,bkt
^.peer
[i
].addr
);
269 while SendCnt
<4 do begin
270 GetNextNode(bkt
,li
,rID
^,3);
271 if not assigned(bkt
) then break
;
272 SendMessage(r
.base
^,r
.length
,bkt
^.peer
[li
].addr
);
278 //else writeln('-> empty bucket')
280 r
.WriteByte(opcode
.dhtReqAck
);
282 //writeln('-> ReqAck to ',string(msg.Source^));
283 SendMessage(r
.base
^,r
.length
,msg
.source
^);
284 FreeMem(r
.base
,r
.size
);
287 procedure SendRequest(const contact
:tNetAddr
; const forid
: tPID
; caps
:byte);
291 r
.WriteByte(opcode
.dhtRequest
);
292 r
.Write(MyID
,sizeof(tFID
));
293 r
.Write(ForID
,sizeof(tFID
));
295 SendMessage(r
.base
^,r
.length
,contact
);
296 FreeMem(r
.base
,r
.size
);
299 procedure RecvReqAck(msg
:tSMsg
);
300 var s
:tMemoryStream
absolute msg
.stream
;
305 //writeln('DHT: ',string(msg.source^),' is ',string(hID^),' (ReqAck)');
306 UpdateNode(hID
^,msg
.source
^);
309 procedure RecvWazzup(msg
:tSMsg
);
310 var s
:tMemoryStream
absolute msg
.stream
;
315 //writeln('DHT: ',string(msg.source^),' is ',string(hID^),' (Wazzup)');
316 UpdateNode(hID
^,msg
.source
^);
317 //UpdateSearch(hID^,msg.source^);
320 procedure NodeBootstrap(const contact
:tNetAddr
);
322 SendRequest(contact
,MyID
,0);
325 procedure RecvSelect(msg
:tSMsg
);
326 var s
:tMemoryStream
absolute msg
.stream
;
334 addr
:=s
.ReadPtr(sizeof(tNetAddr
));
337 //UpdateNode(sID^,msg.source^);
338 //writeln('DHT: ',string(msg.source^),' Select for ',string(addr^));
339 if rID
^=MyID
then begin
340 //writeln('-> self');
343 r
.WriteByte(opcode
.dhtWazzup
);
345 //writeln('-> Wazzup to ',string(addr^));
346 SendMessage(r
.base
^,r
.length
,addr
^);
347 FreeMem(r
.base
,r
.size
);
351 procedure tBucket
.Refresh
;
352 var my
,rtr
,stich
:boolean;
356 procedure lSend(var peer
:tPeer
; const trg
:tPID
);
358 SendRequest(peer
.Addr
,trg
,0);
362 my
:=MatchPrefix(MyID
);
364 {1 of 10 times try to contact dead nodes in attempt to recover from network split}
365 stich
:=Random(cStichRar
)=0;
366 for i
:=1 to high(tBucket
.peer
)
367 do if (not peer
[i
].Addr
.isNil
) then begin
368 if peer
[i
].ReqDelta
>0 then begin
369 if (peer
[i
].ReqDelta
<=3)xor stich
then begin
370 {this will get rid of half-dead nodes}
371 writeln('DHT: Refresh (R',peer
[i
].ReqDelta
,') ',copy(string(peer
[i
].id
),1,6),string(peer
[i
].addr
));
372 lSend(peer
[i
],prefix
);
376 else if (ol
=0) or (peer
[i
].LastMsgFrom
<peer
[ol
].LastMsgFrom
)
379 {now nudge the most quiet peer, but not too often}
380 if (ol
>0) and ((mNow
-peer
[ol
].LastMsgFrom
)>10000) then begin
381 //writeln('DHT: Refresh (T',mNow-peer[ol].LastMsgFrom,') #',ol,' ',string(peer[ol].addr));
382 lSend(peer
[ol
],MyID
);
384 {try to recover bucket full of bad nodes}
385 if (ol
=0){and(not rtr)} then begin
387 GetNextNode(rvb
,rv
,prefix
,desperate
);
388 if not assigned(rvb
) then begin
389 rv
:=0; rvb
:=Table
; {in extreme cases, try the whole table}
390 GetNextNode(rvb
,rv
,prefix
,desperate
);
392 if assigned(rvb
) then begin
393 writeln('DHT: Recover ',string(prefix
),'/',depth
,' try ',copy(string(rvb
^.peer
[rv
].id
),1,6),string(rvb
^.peer
[rv
].addr
));
394 lSend(rvb
^.peer
[rv
],prefix
);
395 end else inc(desperate
);
396 end else desperate
:=3;
398 then wait
:=18000+(depth
*600)
400 if rtr
and(not stich
) then wait
:=wait
div 3;
401 Shedule(wait
,@Refresh
);
405 {to bootstrap: ping address to get ID and insert to bucket/il
406 ping may get lost: separate bootstrap unit :)
407 now jut Ass-U-Me wont get lost}
410 SetMsgHandler(opcode
.dhtRequest
,@recvRequest
);
411 SetMsgHandler(opcode
.dhtSelect
,@recvSelect
);
412 SetMsgHandler(opcode
.dhtReqAck
,@recvReqAck
);
413 SetMsgHandler(opcode
.dhtWazzup
,@recvWazzup
);