6 USES MemStream
,NetAddr
,Store2
,dhtLookup
,Fetch
;
9 type tMutableMeta
=packed record
12 pad
: packed array [1..8] of byte;
16 function SetMutable( var so
:tStoreObject
; out id
: tFID
): boolean;
17 function SetMutable( var so
:tStoreObject
; out meta
:tMutableMeta
): boolean;
18 function GetMutable( id
: tFID
; out meta
:tMutableMeta
): boolean;
19 function GetMutable( id
: tFID
; out fid
: tFID
): boolean;
20 function GetMutable( id
: tFID
; out so
: tStoreObject
): boolean; experimental
;
22 type tMutatorRslt
=record
29 meSearchEnd
=1, meSearchFound
, meSearchInvalid
, meFetchStart
,
30 meFetchLocal
, meFetchSource
, meFetchError
, meCheckOK
,
31 meCheckOld
, meCheckBad
, meNotify
, meSendEnd
34 Target
:tFID
; {id of mutable}
35 OnEvent
:procedure ( ev
:tMutEvt
; ver
:longword
; const fid
:tFID
; const Src
:tNetAddr
) of object;
36 OnComplete
:procedure of object;
37 FinalMeta
:tMutableMeta
;
39 procedure Init(aTarget
:tFID
);
40 procedure Done
; unimplemented
;
42 Search
:^dhtLookup
.tSearch
;
45 Peers
:array [0..5] of tSearchPeer
;
46 CurrentFetch
:tMutatorRslt
;
47 procedure SRslt(const Source
:tNetAddr
; var extra
:tMemoryStream
);
50 procedure DoSendLocals
;
51 procedure DoSendLocals2
;
52 function DoCheck( var so
:tStoreObject
):boolean;
53 procedure Destroy
; experimental
;
57 USES opcode
,ServerLoop
,DHT
,HKVS
,sha512
,ed25519
;
60 function SetMutable( var so
:tStoreObject
; out id
: tFID
): boolean;
61 var meta
:tMutableMeta
;
63 result
:=SetMutable(so
,meta
);
66 function GetMutable( id
: tFID
; out fid
: tFID
): boolean;
67 var meta
:tMutableMeta
;
69 result
:=GetMutable(id
,meta
);
72 function GetMutable( id
: tFID
; out so
: tStoreObject
): boolean;
73 var meta
:tMutableMeta
;
75 result
:=GetMutable(id
,meta
);
76 if result
then so
.Init(meta
.fid
);
80 function SetMutable( var so
:tStoreObject
; out meta
:tMutableMeta
): boolean;
82 var hash
:tSha512Context
;
83 var buf
:packed array [0..1023] of byte;
91 if so
.left
<Sizeof(ph
) then exit
;
93 so
.Read(ph
,sizeof(ph
));
94 if CompareByte(ph
.Magic
,cMutHdrMagic
,4)<>0 then exit
;
95 {calculate id (loginpubhash)}
97 Sha512Update(hash
,ph
.Pub
,sizeof(ph
.Pub
));
98 Sha512Final(hash
,mid
,sizeof(mid
));
99 {check if newer than db}
100 oldis
:=db
.GetVal(mid
,meta
);
103 if DWord(meta
.Ver
)>=DWord(ph
.Ver
) then begin
105 result
:= true; exit
end;
107 {hash for signature check}
109 Sha512Update(hash
,ph
,64);
110 while so
.left
>0 do begin
112 if hbs
>sizeof(buf
) then hbs
:=sizeof(buf
);
114 Sha512Update(hash
,buf
,hbs
);
117 if not ed25519
.Verify2(hash
, ph
.Sig
, ph
.Pub
) then exit
;
118 {update db if all checks passed}
122 if oldis
then Store2
.Reference(oldfid
,-1);
123 {reference the new object and dereference the old one}
129 function GetMutable( id
: tFID
; out meta
:tMutableMeta
): boolean;
131 result
:=db
.GetVal(id
,meta
);
133 (****** Mutator ******)
134 procedure tMutator
.Init(aTarget
:tFID
);
141 search
^.Init(Target
,capMutable
,@SRslt
);
146 procedure tMutator
.Done
;
148 {if called while searching: destroy}
149 {if called while fetching: destroy}
150 {if called while Sending: keep background}
151 {bevare when called from OnEvent}
152 if not Sending
then Destroy
159 procedure tMutator
.SRslt(const Source
:tNetAddr
; var extra
:tMemoryStream
);
160 var p
,n
:^tMutatorRslt
;
166 if Source
.isNil
then begin
168 b
:=0; for a
:=0 to high(Search
^.Peers
) do begin
169 if (b
>high(Peers
))and Search
^.Peers
[a
].addr
.isNil
then break
;
170 Peers
[b
]:=Search
^.Peers
[a
]; inc(b
);
171 end; if b
<=high(Peers
) then Peers
[b
].Addr
.Clear
;
172 if assigned(OnEvent
) then OnEvent(meSearchEnd
,0,tFID(nil^),tNetAddr(nil^));
174 Shedule(10,@FetchStart
);
175 end else if extra
.left
>=24 then begin
177 fid
:=extra
.ReadPtr(20);
178 Ver
:=extra
.ReadWord(4);
179 {store results in linkedlist}
180 if assigned(OnEvent
) then OnEvent(meSearchFound
,Ver
,Fid
^,Source
);
181 {highest version first}
183 while assigned(p
) do begin
184 if p
^.Src
=Source
then exit
; {$hint not effective}
185 if p
^.Ver
<Ver
then break
;
186 {grop same FIDs together}
187 if (p
^.Ver
=Ver
) and (p
^.FID
=FID
^) then break
;
189 new(n
); n
^.next
:=p
; pp
^:=n
;
193 end else if assigned(OnEvent
) then OnEvent(meSearchInvalid
,0,tKey20(nil^),Source
);
195 procedure tMutator
.FetchStart
;
200 if assigned(Found
) then begin {value found}
204 {$hint, do not fetch older or same as DB, proceed to propagate; but it does not hurt}
205 try so
.Init(CurrentFetch
.fid
);
206 except {not found: download}
207 on eObjectNF
do FetchJ
:=FetchObject(p
^.fid
, p
^.Src
, 48, @self
.FetchEvent
);
209 if assigned(FetchJ
) then begin
210 FetchJ
^.SetMaxSize(4096);
211 if assigned(OnEvent
) then OnEvent(meFetchStart
,CurrentFetch
.Ver
,CurrentFetch
.Fid
,p
^.src
);
212 end else if assigned(OnEvent
) then OnEvent(meFetchLocal
,CurrentFetch
.Ver
,CurrentFetch
.Fid
,tNetAddr(nil^));
217 if (p
=nil) or (CurrentFetch
.fid
<>p
^.fid
) then break
;
218 if assigned(FetchJ
) then FetchJ
^.AddSource(p
^.Src
);
219 if assigned(OnEvent
) then OnEvent(meFetchSource
,CurrentFetch
.Ver
,CurrentFetch
.Fid
,p
^.Src
);
221 {value was found, if job is nil, is already opened from store}
222 if FetchJ
=nil then begin
225 if check
then DoSendLocals
;
228 {no more Found results to try, at least send what we have}
231 procedure tMutator
.FetchEvent
;
235 if FetchJ
^.Done
then begin
236 //if assigned(OnEvent) then OnEvent(meFetchDone,CurrentFetch.Ver,CurrentFetch.Fid,tNetAddr(nil^));
237 {download is OK, proceed to check mutable}
238 so
.Init(CurrentFetch
.FID
);
241 so
.Reference(-1); {anyway unref the object here, SetMutable does Ref}
246 {mutable is invalid, try next result}
247 else Shedule(300,@FetchStart
)
250 {download failed, try next result}
251 Shedule(300,@FetchStart
);
253 if assigned(OnEvent
) then OnEvent(meFetchError
,ORD(FetchJ
^.Error
),CurrentFetch
.Fid
,tNetAddr(nil^));
256 function tMutator
.DoCheck( var so
:tStoreObject
):boolean;
258 if SetMutable( so
, FinalMeta
)and(FinalMeta
.Fid
=Target
) then begin
259 {sender may lie about his version, check if >= than expected}
260 if (LongWord(FinalMeta
.Ver
)>=CurrentFetch
.Ver
) then begin
262 if assigned(OnEvent
) then OnEvent(meCheckOK
,FinalMeta
.Ver
,CurrentFetch
.Fid
,tNetAddr(nil^));
264 else if assigned(OnEvent
) then OnEvent(meCheckOld
,FinalMeta
.Ver
,CurrentFetch
.Fid
,tNetAddr(nil^));
266 else if assigned(OnEvent
) then OnEvent(meCheckBad
,0,CurrentFetch
.Fid
,tNetAddr(nil^));
268 procedure tMutator
.DoSendLocals
;
269 var dbMeta
:tMutableMeta
;
271 Sending
:=true;{$note dont Notify if no-result and not in db}
272 if GetMutable(Target
,dbMeta
) then begin
274 {send update to Found and Peers}
275 Shedule(300,@DoSendLocals2
);
276 OnComplete
; {signal success, may destroy self, must be called last}
278 OnComplete
; {signal success, may destroy self, must be called last}
279 if assigned(OnEvent
) then OnEvent(meSendEnd
,0,tFID(nil^),tNetAddr(nil^));
283 procedure tMutator
.DoSendLocals2
;
286 var ded
:boolean=true;
287 procedure SendTo(const trg
:tNetAddr
);
288 var pk
:tMemoryStream
;
290 if assigned(OnEvent
) then OnEvent(meNotify
,FinalMeta
.Ver
,FinalMeta
.Fid
,Trg
);
292 pk
.WriteByte(opcode
.mutableUpdate
);
293 pk
.WriteWord(FinalMeta
.Ver
,4);
295 pk
.Write(FinalMeta
.FID
,20);
296 ServerLoop
.SendMessage(pk
.base
^,pk
.Length
, Trg
);
300 {send update to Found and Peers}
302 if assigned(Found
) then begin
308 end else for i
:=0 to high(Peers
) do if not Peers
[i
].Addr
.isNil
then begin
309 SendTo(Peers
[i
].Addr
);
314 Shedule(250,@DoSendLocals2
);
316 if assigned(OnEvent
) then OnEvent(meSendEnd
,FinalMeta
.VER
,FinalMeta
.FID
,tNetAddr(nil^));
320 procedure tMutator
.Destroy
;
321 var p
,q
:^tMutatorRslt
;
324 if assigned(search
) then search
^.close
;
325 if assigned(FetchJ
) then FetchJ
^.Abort(@FetchEvent
);
328 while assigned(p
) do begin
334 UnShedule(@FetchStart
);
335 UnShedule(@DoSendLocals2
);
337 FreeMem(@self
,sizeof(self
));
340 function CapHMutable(const source
:tNetAddr
; caps
:byte; const Target
:tPID
; var extra
:tMemoryStream
):boolean;
342 var des
:tMutableMeta
;
344 write('Mutable.Cap: ',string(Target
));
345 result
:=db
.GetVal(Target
,des
);
347 assert(caps
=capMutable
);
350 r
.WriteByte(opcode
.dhtCapable
);
351 r
.Write(dht
.MyID
,20);
354 r
.Write(des
.FID
,sizeof(des
.FID
));
355 r
.Write(des
.Ver
,sizeof(des
.Ver
));
356 SendMessage(r
.base
^,r
.length
,source
);
357 FreeMem(r
.base
,r
.size
);
361 (****** Upate on Notify ******)
362 var UpdatesInProgress
:Word;
363 type tMutableUpdate
=object
370 procedure recvUpdate(msg
:tSMsg
);
371 var s
:tMemoryStream
absolute msg
.stream
;
372 var ver
,mver
:LongWord
;
375 var meta
:tMutableMeta
;
376 var o
:^tMutableUpdate
;
383 has
:=GetMutable(mid
^,meta
); mver
:=meta
.ver
;
384 if (not has
) or (mver
<ver
) then begin
385 if UpdatesInProgress
>=16 then begin
386 writeln('Mutable.recvUpdate: too many updates');
388 writeln('Mutable.recvUpdate: ',string(mid
^),' v',ver
,' ',string(fid
^));
389 {Start Fetch from source}
394 O
^.J
:=FetchObject(fid
^, msg
.source
^, 9, @O
^.ev
);
395 if O
^.J
=nil then O
^.EV
; {todo...}
396 end else writeln('Mutable.recvUpdate: ',string(msg
.source
^),' v',ver
,'<=',mver
);
398 procedure tMutableUpdate
.ev
;
400 var meta
:tMutableMeta
;
403 if (J
=nil) or (J
^.Done
) then begin
405 valid
:=SetMutable(so
,meta
);
408 then writeln('Mutable: ',string(meta
.fid
),' updated to v',LongWord(meta
.ver
),' ',string(FID
))
410 writeln('Mutable.Update.ev: ',string(Src
),' MutID mismatch!');
414 else writeln('Mutable.Update.ev: ',string(Src
),' invalid signature!');
417 writeln('Mutable.Update.ev: ',string(Src
),' Fetch failed ',J
^.Error
);
419 FreeMem(@self
,sizeof(self
));
423 db
.Init('mutable.dat',sizeof(tMutableMeta
), 128);
424 writeln('Mutable: Database initialized, valsz=',db
.valsz
,' bktsz=',db
.bucksz
);
425 dht
.RegisterCapability(capMutable
,@CapHMutable
);
426 SetMsgHandler(opcode
.mutableUpdate
,@recvUpdate
);
427 UpdatesInProgress
:=0;