Forgot to reset ReqDelta on update.
[brdnet.git] / dht.pas
blob23b680b707dc8d30ab2a8e5513f33b7c82f2d597
1 unit DHT;
 Implementation of a custom DHT, based on Pastry and Kademlia.
 The keyspace is divided into buckets of limited capacity.
 A node belongs to the bucket where at least 'depth' bits of its ID match 'prefix'.
 old>new,
 new>dead
10 {used by: messages, fileshare}
12 INTERFACE
13 uses NetAddr,Store1;
14 type tPID=Store1.tFID;
15 var MyID:tPID;
16 procedure NodeBootstrap(const contact:tNetAddr);
18 IMPLEMENTATION
19 uses ServerLoop,MemStream,opcode;
type
 {One remote node as stored in a bucket slot.}
 tPeer=object
  ID :tPID; {identifier of the node}
  Addr :tNetAddr; {address of the node; a nil address marks the slot as unused}
  ReqDelta:word; {incremented by Refresh for every request sent, reset to 0 by UpdateNode}
  LastMsgFrom, {set to mNow by UpdateNode whenever the node is added or seen again}
  LastResFrom :tMTime; {set to 0 when the node is added}
 end;
 tBucket_ptr=^tBucket;
 {A bucket of the routing table; buckets form a singly linked list via 'next'.}
 tBucket=object
  Prefix: tPID; {IDs in this bucket share at least 'Depth' leading bits with Prefix}
  Depth: byte; {number of significant prefix bits; 0 matches every ID}
  peer: array [1..4] of tPeer; {fixed number of slots per bucket}
  ModifyTime: tMTime; {set to mNow when the bucket is created, split or gets a new node}
  //ll: ^tll;
  next: tBucket_ptr; {next bucket in the table list, nil at the end}
  function MatchPrefix(const tp:tFID):boolean;
  procedure Refresh;
 end;
41 var Table:^tBucket;
42 {deepest first}
function PrefixLength(const a,b:tFID):byte;
 {Returns the number of leading bits that a and b have in common.
  Bits are counted from the most significant bit of the first byte.
  When a and b are equal the result is the full bit width of tFID.}
 var idx:integer;
 var mask:byte;
 begin
 result:=0;
 {count whole matching bytes, never indexing past the end of the ID}
 idx:=low(a);
 while (idx<=high(a)) and (a[idx]=b[idx]) do begin
  inc(idx);
  inc(result,8);
 end;
 {identical IDs: there is no differing byte to inspect}
 if idx>high(a) then exit;
 {count matching bits of the first differing byte, MSB first;
  the bytes differ, so the loop stops before the mask runs out}
 mask:=$80;
 while (a[idx] and mask)=(b[idx] and mask) do begin
  inc(result);
  mask:=mask shr 1;
 end;
end;
function tBucket.MatchPrefix(const tp:tFID):boolean;
 {True when tp belongs to this bucket: a bucket of depth 0 accepts every ID,
  otherwise tp must share at least 'depth' leading bits with 'prefix'.}
 begin
 if depth=0
  then result:=true
  else result:=PrefixLength(prefix,tp)>=depth;
end;
function FindBucket(const prefix:tFID):tBucket_ptr;
 {Returns the first bucket in Table whose prefix matches the given ID,
  or nil when the table is empty or no bucket matches.
  Table is ordered deepest first, so the first match is the deepest one.}
 var cur:tBucket_ptr;
 begin
 {start with "not found"; previously result was preset to the list head,
  which made the search loop dead code}
 result:=nil;
 cur:=Table;
 while assigned(cur) do begin
  if cur^.MatchPrefix(prefix) then begin
   result:=cur;
   exit;
  end;
  cur:=cur^.next;
 end;
end;
operator =(const a,b:tFID):boolean;
 {Two IDs are equal when their first 20 bytes are identical.}
 begin
 result:=(CompareByte(a,b,20)=0);
end;
procedure SplitBucket(ob:tBucket_ptr);
 {Splits bucket ob in two: ob keeps its prefix with depth increased by one,
  and a new bucket gets the same prefix with the last significant bit flipped.
  Peers are divided between the two and both buckets are linked back into
  Table, which is kept ordered deepest first.
  A Refresh of the new bucket is scheduled.}

 {Flip one bit of prefix. Bit 0 is the most significant bit of byte 0,
  the same order in which PrefixLength walks its mask.}
 procedure Toggle(var prefix:tPID; bit:byte);
  begin
  prefix[bit div 8]:= prefix[bit div 8] xor ($80 shr (bit mod 8));
 end;

 var nb,cur:tBucket_ptr;
 var i:byte;
 begin
 writeln('DHT: SplitBucket ',string(ob^.prefix),'/',ob^.depth);
 {unlink the old bucket; it is re-inserted below at the place its new depth requires}
 if ob=Table then Table:=Table^.next else begin
  cur:=Table;
  while assigned(cur) and (cur^.next<>ob) do cur:=cur^.next;
  assert(assigned(cur),'old bucket not in table');
  cur^.next:=ob^.next;
 end;
 {increase depth of this bucket}
 Inc(ob^.depth);
 ob^.ModifyTime:=mNow;
 {create new bucket that differs in the last significant prefix bit;
  with depth d the significant bits are 0..d-1}
 New(nb);
 nb^:=ob^;
 Toggle(nb^.Prefix,ob^.depth-1);
 nb^.next:=ob;
 {every peer stays in exactly one of the two buckets}
 for i:=1 to high(tBucket.peer) do begin
  if ob^.peer[i].addr.isNil then continue;
  if ob^.MatchPrefix(ob^.peer[i].id)
   then nb^.peer[i].addr.clear
   else ob^.peer[i].addr.clear;
 end;
 writeln('-> ',string(ob^.prefix),'/',ob^.depth);
 writeln('-> ',string(nb^.prefix),'/',nb^.depth);
 {insert the pair nb->ob before the first bucket that is not deeper than them;
  ob^.next is always rewritten so no stale link (and no cycle) survives}
 if (Table=nil) or (Table^.depth<=nb^.depth) then begin
  ob^.next:=Table;
  Table:=nb;
 end else begin
  cur:=Table;
  while assigned(cur^.next) and (cur^.next^.depth>nb^.depth) do cur:=cur^.next;
  ob^.next:=cur^.next;
  cur^.next:=nb;
  writeln('-> after /',cur^.depth);
 end;
 Shedule(2000,@nb^.Refresh);
end;
procedure UpdateNode(const id:tFID; const addr:tNetAddr);
 {Records that node 'id' was heard from at 'addr'.
  If the node is already in its bucket with the same address, its
  LastMsgFrom and ReqDelta are refreshed. Otherwise it is stored in a free
  slot, or in the slot of a peer with ReqDelta>=2. When the bucket is full
  and also covers MyID, the bucket is split and the insert is retried;
  when it is full and does not cover MyID, the new node is dropped.}
 var bkt:^tBucket;
 var tail:tBucket_ptr;
 var i,fr:byte;
 label again;
 begin
 again:
 bkt:=FindBucket(id);
 if not assigned(bkt) then begin
  {no bucket accepts this id: create a depth 0 bucket, which matches every id}
  New(bkt);
  bkt^.Prefix:=MyID;
  bkt^.Depth:=0;
  bkt^.ModifyTime:=mNow;
  bkt^.next:=nil;
  for i:=1 to high(bkt^.peer) do bkt^.peer[i].addr.Clear;
  {append instead of overwriting Table, so existing buckets are not orphaned;
   depth 0 is the shallowest possible, so the tail keeps the deepest-first order}
  if Table=nil then Table:=bkt else begin
   tail:=Table;
   while assigned(tail^.next) do tail:=tail^.next;
   tail^.next:=bkt;
  end;
  Shedule(2000,@bkt^.Refresh);
 end;
 fr:=0;
 for i:=1 to high(bkt^.peer) do begin
  if (fr=0)and bkt^.peer[i].addr.isNil then fr:=i
  //else if bkt^.peer[i].addr=addr then fr:=i
  else if bkt^.peer[i].id=id then begin
   {same id at another address is not treated as the same node}
   if bkt^.peer[i].addr<>addr then continue;
   {found node in the bucket}
   writeln('DHT: UpdateNode ',string(id));
   // ?? bkt^.ModifyTime:=mNow;
   bkt^.peer[i].LastMsgFrom:=mNow;
   bkt^.peer[i].ReqDelta:=0;
   exit;
  end
  else if (fr=0) and (bkt^.peer[i].ReqDelta>=2)
   then fr:=i {use non-responding as free};
 end;
 if fr=0 then begin
  {split only while there is still an ID bit left to split on; this bounds
   the retry loop and keeps depth within the bit width of tPID}
  if bkt^.MatchPrefix(MyID) and (bkt^.depth<sizeof(tPID)*8)
  then begin
   SplitBucket(bkt);
   goto again;
  end; {the bucket is full!}
  {drop new node and hope nodes in the bucket are good}
 end else begin
  writeln('DHT: AddNode ',string(id),' to /',bkt^.depth,'#',fr);
  bkt^.ModifyTime:=mNow;
  bkt^.peer[fr].ID:=ID;
  bkt^.peer[fr].Addr:=Addr;
  bkt^.peer[fr].LastMsgFrom:=mNow;
  bkt^.peer[fr].LastResFrom:=0;
  bkt^.peer[fr].ReqDelta:=0;
 end;
end;
procedure RecvRequest(msg:tSMsg);
 {Handler for dhtRequest. Message layout as read here:
  1 byte skipped, 20 byte sender ID, 20 byte requested ID, 1 byte caps,
  then optional trailing bytes.
  The sender is recorded with UpdateNode. A dhtSelect is sent to every peer
  (other than the sender) of the bucket found for the requested ID, and a
  dhtReqAck carrying MyID is always sent back to the sender.}
 var inp:tMemoryStream absolute msg.stream;
 var senderID:^tPID;
 var wantedID:^tPID;
 var caps:byte;
 var reply:tMemoryStream;
 var target:^tBucket;
 var slot:byte;
 begin
 inp.skip(1);
 senderID:=inp.ReadPtr(20);
 wantedID:=inp.ReadPtr(20);
 caps:=inp.ReadByte;
 writeln('DHT: ',string(msg.source^),' Request for ',string(wantedID^));
 UpdateNode(senderID^,msg.source^);
 target:=FindBucket(wantedID^);
 reply.Init(128);
 if not assigned(target) then writeln('-> empty bucket')
 else begin
  {compose the Select once, then send it to each eligible peer}
  reply.WriteByte(opcode.dhtSelect);
  reply.WriteByte(caps);
  reply.Write(msg.Source^,sizeof(tNetAddr));
  reply.Write(wantedID^,20);
  reply.Write(senderID^,20);
  {forward trailing bytes only when there are between 1 and 8 of them}
  if (inp.RdBufLen>0)and(inp.RdBufLen<=8) then reply.Write(inp.RdBuf^,inp.RdBufLen);
  for slot:=1 to high(tBucket.peer) do
   if not target^.peer[slot].addr.isNil then
    if not (target^.peer[slot].addr=msg.source^) then begin
     writeln('-> Select to ',string(target^.peer[slot].addr));
     SendMessage(reply.base^,reply.length,target^.peer[slot].addr);
    end;
  {reuse the same buffer for the ReqAck}
  reply.Seek(0);
  reply.Trunc;
 end;
 reply.WriteByte(opcode.dhtReqAck);
 reply.Write(MyID,20);
 writeln('-> ReqAck to ',string(msg.Source^));
 SendMessage(reply.base^,reply.length,msg.source^);
 FreeMem(reply.base,reply.size);
end;
procedure SendRequest(const contact:tNetAddr; const forid: tPID; caps:byte);
 {Sends a dhtRequest to 'contact': opcode, MyID, the requested ID and caps.}
 var req:tMemoryStream;
 begin
 writeln('DHT: Request to ',string(contact));
 req.Init(42);
 req.WriteByte(opcode.dhtRequest);
 req.Write(MyID,sizeof(tFID));
 req.Write(ForID,sizeof(tFID));
 req.WriteByte(caps);
 SendMessage(req.base^,req.length,contact);
 {the stream buffer is released by hand once the message is handed over}
 FreeMem(req.base,req.size);
end;
procedure RecvReqAck(msg:tSMsg);
 {Handler for dhtReqAck: reads the 20 byte ID that follows the first byte
  and records the sender under that ID.}
 var inp:tMemoryStream absolute msg.stream;
 var peerID:^tPID;
 begin
 inp.skip(1);
 peerID:=inp.ReadPtr(20);
 writeln('DHT: ',string(msg.source^),' is ',string(peerID^),' (ReqAck)');
 UpdateNode(peerID^,msg.source^);
end;
procedure RecvWazzup(msg:tSMsg);
 {Handler for dhtWazzup: reads the 20 byte ID that follows the first byte
  and records the sender under that ID.}
 var inp:tMemoryStream absolute msg.stream;
 var peerID:^tPID;
 begin
 inp.skip(1);
 peerID:=inp.ReadPtr(20);
 writeln('DHT: ',string(msg.source^),' is ',string(peerID^),' (Wazzup)');
 UpdateNode(peerID^,msg.source^);
 //UpdateSearch(hID^,msg.source^);
end;
procedure NodeBootstrap(const contact:tNetAddr);
 {Starts bootstrapping by sending 'contact' a request for our own ID with caps 0.}
 begin
 SendRequest(contact,MyID,0);
end;
procedure RecvSelect(msg:tSMsg);
 {Handler for dhtSelect. Reads caps, an address and a 20 byte ID from the
  message. Unless that ID equals MyID, a dhtWazzup carrying MyID is sent to
  the address taken from the message.}
 var inp:tMemoryStream absolute msg.stream;
 var caps:byte;
 var requester:^tNetAddr;
 var wantedID:^tPID;
 var greet:tMemoryStream;
 begin
 inp.skip(1);
 caps:=inp.ReadByte; {read to advance the stream; the value is not used here}
 requester:=inp.ReadPtr(sizeof(tNetAddr));
 wantedID:=inp.ReadPtr(20);
 writeln('DHT: ',string(msg.source^),' Select for ',string(requester^));
 if wantedID^=MyID then writeln('-> self')
 else begin
  greet.Init(21);
  greet.WriteByte(opcode.dhtWazzup);
  greet.Write(MyID,20);
  writeln('-> Wazzup to ',string(requester^));
  SendMessage(greet.base^,greet.length,requester^);
  FreeMem(greet.base,greet.size);
 end;
end;
procedure tBucket.Refresh;
 {Periodic bucket maintenance.
  Peers with ReqDelta of 3 or more are ignored. Every peer with ReqDelta 2
  gets one more request (for the bucket prefix). If no such retry was sent,
  the peer with the oldest LastMsgFrom gets a request for MyID. Each request
  increments the peer's ReqDelta. The procedure then re-schedules itself:
  sooner for a bucket that covers MyID, later for any other bucket.}
 var own,retried:boolean;
 var slot,oldest:byte;
 begin
 own:=MatchPrefix(MyID);
 oldest:=0;
 retried:=false;
 for slot:=1 to high(tBucket.peer) do begin
  if peer[slot].Addr.isNil then continue;
  if peer[slot].ReqDelta>=3 then continue;
  if peer[slot].ReqDelta>1 then begin
   {peer is not responding, but try once more}
   if not retried then write('DHT: **Refresh (',peer[slot].ReqDelta,')** ');
   SendRequest(peer[slot].Addr,prefix,0);
   inc(peer[slot].ReqDelta);
   retried:=true;
  end
  else if (oldest=0) or (peer[slot].LastMsgFrom<peer[oldest].LastMsgFrom)
   then oldest:=slot;
 end;
 if (oldest>0) and (not retried) then begin
  write('DHT: **Refresh(T)** ');
  SendRequest(peer[oldest].Addr,MyID,0);
  inc(peer[oldest].ReqDelta);
 end;
 if own
  then Shedule(18000+(depth*600),@Refresh)
  else Shedule(30000,@Refresh);
end;
{to bootstrap: ping the address to get its ID and insert it into a bucket/il
 the ping may get lost: handling that belongs in a separate bootstrap unit :)
 for now just Ass-U-Me it won't get lost}
procedure LoadIDFromArgs;
 {Sets MyID from the parameter of the '-id' command line option, if given.}
 const opt='-id';
 var oi:word;
 begin
 oi:=OptIndex(opt);
 {option not present: leave MyID untouched}
 if oi=0 then exit;
 assert(OptParamCount(oi)=1,opt+'(pid:sha1)');
 writeln('DHT: set ID to '+paramstr(oi+1));
 MyID:=tPID(paramstr(oi+1));
end;
BEGIN
 {unit initialization: register one handler per DHT opcode,
  then take the node ID from the command line if it was given}
 SetMsgHandler(opcode.dhtRequest,@recvRequest);
 SetMsgHandler(opcode.dhtSelect,@recvSelect);
 SetMsgHandler(opcode.dhtReqAck,@recvReqAck);
 SetMsgHandler(opcode.dhtWazzup,@recvWazzup);
 LoadIdFromArgs;
END.