From cea7eda619d293516fc9be60a2efb066bec1db0b Mon Sep 17 00:00:00 2001 From: =?utf8?q?Tom=C3=A1=C5=A1=20Brada?= Date: Tue, 15 Dec 2015 16:12:33 +0100 Subject: [PATCH] DHT Persistance and static bootstrap. --- brodnetd.pas | 3 +- dht.pas | 33 ++++++++++++++-- dhtBootStatic.pas | 47 +++++++++++++++++++++++ dhtPersist.pas | 111 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 190 insertions(+), 4 deletions(-) create mode 100644 dhtBootStatic.pas create mode 100644 dhtPersist.pas diff --git a/brodnetd.pas b/brodnetd.pas index b200393..2d7055c 100644 --- a/brodnetd.pas +++ b/brodnetd.pas @@ -9,7 +9,8 @@ uses cthreads,ServerLoop ,Download ,TestFS ,dht - ,TestDHT + ,dhtBootStatic + ,dhtPersist ; BEGIN diff --git a/dht.pas b/dht.pas index cb70a37..11dfbaa 100644 --- a/dht.pas +++ b/dht.pas @@ -5,6 +5,7 @@ unit DHT; node belongs to bucket, where at least 'depth' bits match 'prefix' old>new, new>dead + TODO: weight nodes by IP-Address common prefix length. } {used by: messages, fileshare} @@ -12,16 +13,22 @@ unit DHT; INTERFACE uses NetAddr,Store1; type tPID=Store1.tFID; +type tPeerPub=object + ID :tPID; + Addr :tNetAddr; + //function IsGood + //function GetAge + end; var MyID:tPID; procedure NodeBootstrap(const contact:tNetAddr); +procedure GetNextNode(var ibkt:pointer; var ix:byte; out peer:tPeerPub); +procedure InsertNode(const peer:tPeerPub); IMPLEMENTATION uses ServerLoop,MemStream,opcode; type - tPeer=object - ID :tPID; - Addr :tNetAddr; + tPeer=object(tPeerPub) ReqDelta:word; LastMsgFrom, LastResFrom :tMTime; @@ -137,6 +144,7 @@ procedure UpdateNode(const id:tFID; const addr:tNetAddr); var i,fr:byte; label again; begin + if id=MyID then exit; again: bkt:=FindBucket(id); if not assigned(bkt) then begin @@ -180,9 +188,15 @@ procedure UpdateNode(const id:tFID; const addr:tNetAddr); end; end; +procedure InsertNode(const peer:tPeerPub); + begin + UpdateNode(peer.id,peer.addr); +end; + procedure GetNextNode(var ibkt:tBucket_ptr; var ix:byte; const id:tPID); var bkt:^tBucket; begin + if not assigned(ibkt) then exit; bkt:=ibkt; repeat inc(ix); @@ -195,6 +209,15 @@ procedure GetNextNode(var ibkt:tBucket_ptr; var ix:byte; const id:tPID); ibkt:=bkt; end; +procedure GetNextNode(var ibkt:pointer; var ix:byte; out peer:tPeerPub); + begin + if ibkt=nil then ibkt:=Table; + GetNextNode(ibkt,ix,MyID); + if assigned(ibkt) + then peer:=tBucket(ibkt^).peer[ix] + else peer.addr.clear; +end; + procedure RecvRequest(msg:tSMsg); var s:tMemoryStream absolute msg.stream; var hID:^tPID; @@ -335,6 +358,10 @@ procedure tBucket.Refresh; writeln('DHT: Refresh BROKEN BUCKET'); rv:=0; rvb:=@self; GetNextNode(rvb,rv,prefix); + if not assigned(rvb) then begin + rv:=0; rvb:=Table; {in extreme cases, try the whole table} + GetNextNode(rvb,rv,prefix); + end; if assigned(rvb) then begin writeln('DHT: Refresh (RV) #',rv,' ',string(rvb^.peer[rv].addr)); lSend(rvb^.peer[rv],prefix); diff --git a/dhtBootStatic.pas b/dhtBootStatic.pas new file mode 100644 index 0000000..19b482b --- /dev/null +++ b/dhtBootStatic.pas @@ -0,0 +1,47 @@ +unit dhtBootStatic; +{ + Bootstrap the dht from set of static nodes + from file. +} + +INTERFACE + +IMPLEMENTATION +uses NetAddr,ServerLoop,DHT,SysUtils; + +type t=object + procedure Boot; +end; + +procedure t.Boot; + var bs:TextFile; + const bsfn='bootstrap.txt'; + var line:string; + var addr:tNetAddr; + begin + assign(bs,bsfn); + try + reset(bs); + except + writeln('BootStatic: Error opening file '+bsfn); + exit; + end; + try + while not eof(bs) do begin + readln(bs,line); + try addr.FromString(line); + except on eConvertError do begin + writeln('BootStatic: ConvertError ',line,' to tNetAddr'); + continue; + end end; + DHT.NodeBootstrap(addr); + end; + finally + close(bs); + end; +end; + +var o:t; +BEGIN + Shedule(1000,@o.boot); +END. \ No newline at end of file diff --git a/dhtPersist.pas b/dhtPersist.pas new file mode 100644 index 0000000..066590d --- /dev/null +++ b/dhtPersist.pas @@ -0,0 +1,111 @@ +unit dhtPersist; +{ + Initialize DHT local node ID from file. + Restore DHT routing table from file. + Save DHT routing table on exit and periodically. +} + +INTERFACE + +IMPLEMENTATION +uses NetAddr,ServerLoop,DHT,SysUtils,Store1; + +const ndfn='nodes.dat'; +const idfn='idhash.txt'; + +procedure Save; + var nd: FILE of tPeerPub; + var node:tPeerPub; + var nnp:pointer=nil; + var nni:byte=0; + begin + assign(nd,'_'+ndfn); + ReWrite(nd); + try + repeat + dht.GetNextNode(nnp,nni,node); + if node.addr.isNil then break; + writeln('dhtPersist: save ',string(node.addr)); + write(nd,node); + until false; + finally + close(nd); + end; + rename(nd,ndfn); +end; + +procedure Load; + var nd: FILE of tPeerPub; + var node:tPeerPub; + var pos:Word; + begin + assign(nd,ndfn); + try + ReSet(nd); + except + writeln('dhtPersist: can not open state file ',ndfn); + exit end; + {need to read the file backwards} + pos:=FileSize(nd); + if pos=0 then exit; + for pos:=pos-1 downto 0 do begin + Seek(nd,pos); + Read(nd,node); + writeln('dhtPersist: load ',string(node.addr)); + dht.InsertNode(node); + end; + close(nd); +end; + +procedure LoadID; + var nd: TextFile; + var line:string; + begin + assign(nd,idfn); + try + ReSet(nd); + except + writeln('dhtPersist: can not open id file ',idfn); + exit end; + readln(nd,line); + writeln('dhtPersist: nodeID ',line); + dht.MyID:=line; + close(nd); +end; + +type t=object + pot:procedure; + procedure Init; + procedure doPeriodic; + procedure doSoon; +end; + +procedure doLast; + begin + Save; +end; + +procedure t.doSoon; + begin + Load; + LoadID; +end; + +procedure t.doPeriodic; + begin + Save; + Shedule(40000,@doPeriodic); +end; + +procedure t.Init; + begin + Shedule(500,@doSoon); + Shedule(40000,@doPeriodic); + pot:=ServerLoop.OnTerminate; + ServerLoop.OnTerminate:=@doLast; +end; + +var o:t; +BEGIN + o.init; +END. \ No newline at end of file -- 2.11.4.GIT