3 implementation of custom dht, based on pastry and kademlia.
4 keyspace is divided into buckets of limited capacity
5 node belongs to bucket, where at least 'depth' bits match 'prefix'
10 {used by: messages, fileshare}
14 type tPID
=Store1
.tFID
;
16 procedure NodeBootstrap(const contact
:tNetAddr
);
19 uses ServerLoop
,MemStream
,opcode
;
34 peer
: array [1..4] of tPeer
;
38 function MatchPrefix(const tp
:tFID
):boolean;
45 function PrefixLength(const a
,b
:tFID
):byte;
51 i
:=0; while(i
<=19) do begin
52 if a
[i
]<>b
[i
] then break
;
59 if (a
[i
] and m
)<>(b
[i
] and m
) then break
;
66 function tBucket
.MatchPrefix(const tp
:tFID
):boolean;
68 result
:=(depth
=0)or(PrefixLength(prefix
,tp
)>=depth
);
71 function FindBucket(const prefix
:tFID
):tBucket_ptr
;
76 while (cur
<>nil) and (result
=nil) do begin
77 if cur
^.MatchPrefix(prefix
) {first matching is deepest}
83 operator
=(const a
,b
:tFID
):boolean;
85 result
:=CompareWord(a
,b
,10)=0;
88 procedure SplitBucket(ob
:tBucket_ptr
);
89 procedure Toggle(var prefix
:tPID
; bit
:byte);
91 prefix
[bit
div 8]:= prefix
[bit
div 8] xor ($80 shr (bit
mod 8));
96 writeln('DHT: SplitBucket ',string(ob
^.prefix
),'/',ob
^.depth
);
97 {find pref to old bucket, in order to unlink}
98 if ob
=Table
then table
:=table
^.next
else begin
100 while assigned(nb
) and (nb
^.next
<>ob
) do nb
:=nb
^.next
;
101 assert(assigned(nb
),'old bucket not in table');
103 nb
^.next
:=nb
^.next
^.next
; nb
:=nil;
105 {increase depth of this bucket}
107 ob
^.ModifyTime
:=mNow
;
108 {create new bucket with toggled bit}
111 Toggle(nb
^.Prefix
,nb
^.depth
-1);
113 {clear nodes that do not belong in bucket}
114 for i
:=1 to high(tBucket
.peer
) do begin
115 if ob
^.peer
[i
].addr
.isNil
then continue
;
116 if ob
^.MatchPrefix(ob
^.peer
[i
].id
)
117 then nb
^.peer
[i
].addr
.clear
118 else ob
^.peer
[i
].addr
.clear
;
120 writeln('-> ',string(ob
^.prefix
),'/',ob
^.depth
);
121 for i
:=1 to high(tBucket
.peer
) do if not ob
^.peer
[i
].addr
.isnil
122 then writeln('-> -> ',string(ob
^.peer
[i
].id
));
123 writeln('-> ',string(nb
^.prefix
),'/',nb
^.depth
);
124 for i
:=1 to high(tBucket
.peer
) do if not nb
^.peer
[i
].addr
.isnil
125 then writeln('-> -> ',string(nb
^.peer
[i
].id
));
126 if table
=nil then table
:=nb
else begin
128 while assigned(ob
^.next
)and (ob
^.next
^.depth
>nb
^.depth
) do ob
:=ob
^.next
;
130 writeln('-> after /',ob
^.depth
);
132 Shedule(2000,@nb
^.Refresh
);
135 procedure UpdateNode(const id
:tFID
; const addr
:tNetAddr
);
142 if not assigned(bkt
) then begin
147 bkt
^.ModifyTime
:=mNow
;
149 for i
:=1 to high(bkt
^.peer
) do bkt
^.peer
[i
].addr
.Clear
;
150 Shedule(2000,@bkt
^.Refresh
);
153 for i
:=1 to high(bkt
^.peer
)
154 do if (fr
=0)and bkt
^.peer
[i
].addr
.isNil
then fr
:=i
155 //else if bkt^.peer[i].addr=addr then fr:=i
156 else if bkt
^.peer
[i
].id
=id
then begin
157 if bkt
^.peer
[i
].addr
<>addr
then continue
;
158 {found node in the bucket}
159 //writeln('DHT: UpdateNode ',string(id));
160 // ?? bkt^.ModifyTime:=mNow;
161 bkt
^.peer
[i
].LastMsgFrom
:=mNow
;
162 bkt
^.peer
[i
].ReqDelta
:=0;
163 exit
end else if (fr
=0) and (bkt
^.peer
[i
].ReqDelta
>=2)
164 then fr
:=i
{use non-responding as free};
166 if bkt
^.MatchPrefix(MyID
)
170 end; {the bucket is full!}
171 {drop new node and hope nodes in the bucket are good}
173 writeln('DHT: AddNode ',string(id
),' to ',string(bkt
^.prefix
),'/',bkt
^.depth
,'#',fr
);
174 bkt
^.ModifyTime
:=mNow
;
175 bkt
^.peer
[fr
].ID
:=ID
;
176 bkt
^.peer
[fr
].Addr
:=Addr
;
177 bkt
^.peer
[fr
].LastMsgFrom
:=mNow
;
178 bkt
^.peer
[fr
].LastResFrom
:=0;
179 bkt
^.peer
[fr
].ReqDelta
:=0;
183 procedure GetNextNode(var ibkt
:tBucket_ptr
; var ix
:byte; const id
:tPID
);
189 if ix
>high(tBucket
.peer
) then begin
192 if not assigned(bkt
) then break
;
194 until (not bkt
^.peer
[ix
].Addr
.isNil
)and(bkt
^.peer
[ix
].ReqDelta
<3);
198 procedure RecvRequest(msg
:tSMsg
);
199 var s
:tMemoryStream
absolute msg
.stream
;
211 writeln('DHT: ',string(msg
.source
^),' Request for ',string(rID
^));
212 UpdateNode(hID
^,msg
.source
^);
213 {Select peers only from The bucket,
214 if it is broken, send none, but still Ack}
215 bkt
:=FindBucket(rID
^);
217 if assigned(bkt
) then begin
218 r
.WriteByte(opcode
.dhtSelect
);
220 r
.Write(msg
.Source
^,sizeof(tNetAddr
));
223 if (s
.RdBufLen
>0)and(s
.RdBufLen
<=8) then r
.Write(s
.RdBuf
^,s
.RdBufLen
);
224 for i
:=1 to high(tBucket
.peer
) do begin
225 if bkt
^.peer
[i
].addr
.isNil
then continue
;
226 if bkt
^.peer
[i
].addr
=msg
.source
^ then continue
;
227 if bkt
^.peer
[i
].ReqDelta
>1 then continue
;
228 writeln('-> Select to ',string(bkt
^.peer
[i
].addr
));
229 SendMessage(r
.base
^,r
.length
,bkt
^.peer
[i
].addr
);
234 else writeln('-> empty bucket');
235 r
.WriteByte(opcode
.dhtReqAck
);
237 writeln('-> ReqAck to ',string(msg
.Source
^));
238 SendMessage(r
.base
^,r
.length
,msg
.source
^);
239 FreeMem(r
.base
,r
.size
);
242 procedure SendRequest(const contact
:tNetAddr
; const forid
: tPID
; caps
:byte);
246 r
.WriteByte(opcode
.dhtRequest
);
247 r
.Write(MyID
,sizeof(tFID
));
248 r
.Write(ForID
,sizeof(tFID
));
250 SendMessage(r
.base
^,r
.length
,contact
);
251 FreeMem(r
.base
,r
.size
);
254 procedure RecvReqAck(msg
:tSMsg
);
255 var s
:tMemoryStream
absolute msg
.stream
;
260 writeln('DHT: ',string(msg
.source
^),' is ',string(hID
^),' (ReqAck)');
261 UpdateNode(hID
^,msg
.source
^);
264 procedure RecvWazzup(msg
:tSMsg
);
265 var s
:tMemoryStream
absolute msg
.stream
;
270 writeln('DHT: ',string(msg
.source
^),' is ',string(hID
^),' (Wazzup)');
271 UpdateNode(hID
^,msg
.source
^);
272 //UpdateSearch(hID^,msg.source^);
275 procedure NodeBootstrap(const contact
:tNetAddr
);
277 SendRequest(contact
,MyID
,0);
280 procedure RecvSelect(msg
:tSMsg
);
281 var s
:tMemoryStream
absolute msg
.stream
;
289 addr
:=s
.ReadPtr(sizeof(tNetAddr
));
291 //writeln('DHT: ',string(msg.source^),' Select for ',string(addr^));
292 if rID
^=MyID
then begin
293 //writeln('-> self');
296 r
.WriteByte(opcode
.dhtWazzup
);
298 //writeln('-> Wazzup to ',string(addr^));
299 SendMessage(r
.base
^,r
.length
,addr
^);
300 FreeMem(r
.base
,r
.size
);
303 procedure tBucket
.Refresh
;
308 procedure lSend(var peer
:tPeer
; const trg
:tPID
);
310 SendRequest(peer
.Addr
,trg
,0);
314 my
:=MatchPrefix(MyID
);
317 for i
:=1 to high(tBucket
.peer
) do
318 if (not peer
[i
].Addr
.isNil
) and (peer
[i
].ReqDelta
<4) then begin
319 if peer
[i
].ReqDelta
>0 then begin
320 {peer is not responding, but try once more}
321 writeln('DHT: Refresh (R',peer
[i
].ReqDelta
,') #',i
,' ',string(peer
[i
].addr
));
322 lSend(peer
[i
],prefix
);
325 else if (ol
=0) or (peer
[i
].LastMsgFrom
<peer
[ol
].LastMsgFrom
)
328 {now nudge the most quiet peer}
329 if (ol
>0) and (not rtr
) then begin
330 if not rtr
then writeln('DHT: Refresh (T',mNow
-peer
[ol
].LastMsgFrom
,') #',ol
,' ',string(peer
[ol
].addr
));
331 lSend(peer
[ol
],MyID
);
333 if (not rtr
)and(ol
=0) then begin
334 {no usable nodes in this bucket, try to recover from other buckets}
335 writeln('DHT: Refresh BROKEN BUCKET');
337 GetNextNode(rvb
,rv
,prefix
);
338 if assigned(rvb
) then begin
339 writeln('DHT: Refresh (RV) #',rv
,' ',string(rvb
^.peer
[rv
].addr
));
340 lSend(rvb
^.peer
[rv
],prefix
);
344 then wait
:=18000+(depth
*600)
346 if rtr
then wait
:=wait
div 3;
347 Shedule(wait
,@Refresh
);
351 {to bootstrap: ping address to get ID and insert to bucket/il
352 ping may get lost: separate bootstrap unit :)
353 now jut Ass-U-Me wont get lost}
355 procedure LoadIDFromArgs
;
361 assert(OptParamCount(oi
)=1,opt
+'(pid:sha1)');
362 writeln('DHT: set ID to '+paramstr(oi
+1));
363 MyID
:=tPID(paramstr(oi
+1));
368 SetMsgHandler(opcode
.dhtRequest
,@recvRequest
);
369 SetMsgHandler(opcode
.dhtSelect
,@recvSelect
);
370 SetMsgHandler(opcode
.dhtReqAck
,@recvReqAck
);
371 SetMsgHandler(opcode
.dhtWazzup
,@recvWazzup
);