Enhaced VersionString. The build script is too complex.
[brdnet.git] / Mutable.pas
blobee1f788ba135b9e705206a59e7d8314894bc721d
1 UNIT Mutable;
5 INTERFACE
6 USES MemStream,NetAddr,Store2,dhtLookup,Fetch;
7 type tFID=Store2.tFID;
8 tProfileID=tFID;
9 type tMutableMeta=packed record
10 Fid:tFID;
11 Ver:Word4;
12 pad: packed array [1..8] of byte;
13 end;
14 {$I Mutable-file.pas}
16 function SetMutable( var so:tStoreObject; out id: tFID ): boolean;
17 function SetMutable( var so:tStoreObject; out meta:tMutableMeta ): boolean;
18 function GetMutable( id: tFID; out meta:tMutableMeta ): boolean;
19 function GetMutable( id: tFID; out fid: tFID ): boolean;
20 function GetMutable( id: tFID; out so: tStoreObject ): boolean; experimental;
22 type tMutatorRslt=record
23 Ver:LongWord;
24 Fid:tFID;
25 Src:tNetAddr;
26 next:^tMutatorRslt;
27 end;
28 type tMutEvt=(
29 meSearchEnd=1, meSearchFound, meSearchInvalid, meFetchStart,
30 meFetchLocal, meFetchSource, meFetchError, meCheckOK,
31 meCheckOld, meCheckBad, meNotify, meSendEnd
33 type tMutator=object
34 Target:tFID; {id of mutable}
35 OnEvent:procedure ( ev:tMutEvt; ver:longword; const fid:tFID; const Src:tNetAddr ) of object;
36 OnComplete:procedure of object;
37 FinalMeta:tMutableMeta;
38 Sending:boolean;
39 procedure Init(aTarget:tFID);
40 procedure Done; unimplemented;
41 private
42 Search:^dhtLookup.tSearch;
43 Fetchj:^tFetch;
44 Found:^tMutatorRslt;
45 Peers:array [0..5] of tSearchPeer;
46 CurrentFetch:tMutatorRslt;
47 procedure SRslt(const Source:tNetAddr; var extra:tMemoryStream);
48 procedure FetchStart;
49 procedure FetchEvent;
50 procedure DoSendLocals;
51 procedure DoSendLocals2;
52 function DoCheck( var so:tStoreObject ):boolean;
53 procedure Destroy; experimental;
54 end;
56 IMPLEMENTATION
57 USES opcode,ServerLoop,DHT,HKVS,sha512,ed25519;
58 var db:tHKVS;
60 function SetMutable( var so:tStoreObject; out id: tFID ): boolean;
61 var meta:tMutableMeta;
62 begin
63 result:=SetMutable(so,meta);
64 id:=meta.Fid;
65 end;
66 function GetMutable( id: tFID; out fid: tFID ): boolean;
67 var meta:tMutableMeta;
68 begin
69 result:=GetMutable(id,meta);
70 fid:=meta.fid;
71 end;
72 function GetMutable( id: tFID; out so: tStoreObject ): boolean;
73 var meta:tMutableMeta;
74 begin
75 result:=GetMutable(id,meta);
76 if result then so.Init(meta.fid);
77 end;
80 function SetMutable( var so:tStoreObject; out meta:tMutableMeta ): boolean;
81 var ph:tMutHdr;
82 var hash:tSha512Context;
83 var buf:packed array [0..1023] of byte;
84 var oldfid:tFID;
85 var oldis:boolean;
86 var mid:tFID;
87 var hbs:LongInt;
88 begin
89 result:=false;
90 so.Seek(0);
91 if so.left<Sizeof(ph) then exit;
92 {read the header}
93 so.Read(ph,sizeof(ph));
94 if CompareByte(ph.Magic,cMutHdrMagic,4)<>0 then exit;
95 {calculate id (loginpubhash)}
96 Sha512Init(hash);
97 Sha512Update(hash,ph.Pub,sizeof(ph.Pub));
98 Sha512Final(hash,mid,sizeof(mid));
99 {check if newer than db}
100 oldis:=db.GetVal(mid,meta);
101 if oldis then begin
102 oldfid:=meta.fid;
103 if DWord(meta.Ver)>=DWord(ph.Ver) then begin
104 meta.fid:=mid;
105 result:= true; exit end;
106 end;
107 {hash for signature check}
108 Sha512Init(hash);
109 Sha512Update(hash,ph,64);
110 while so.left>0 do begin
111 hbs:=so.left;
112 if hbs>sizeof(buf) then hbs:=sizeof(buf);
113 so.Read(buf,hbs);
114 Sha512Update(hash,buf,hbs);
115 end;
116 {load signature}
117 if not ed25519.Verify2(hash, ph.Sig, ph.Pub) then exit;
118 {update db if all checks passed}
119 meta.FID:=so.fid;
120 meta.Ver:=ph.Ver;
121 db.SetVal(mid,meta);
122 if oldis then Store2.Reference(oldfid,-1);
123 {reference the new object and dereference the old one}
124 so.Reference(+1);
125 meta.fid:=mid;
126 result:=true;
127 end;
129 function GetMutable( id: tFID; out meta:tMutableMeta ): boolean;
130 begin
131 result:=db.GetVal(id,meta);
132 end;
133 (****** Mutator ******)
134 procedure tMutator.Init(aTarget:tFID);
135 begin
136 New(search);
137 Sending:=false;
138 Target:=aTarget;
139 OnEvent:=nil;
140 OnComplete:=nil;
141 search^.Init(Target,capMutable,@SRslt);
142 Fetchj:=nil;
143 search^.Start;
144 Found:=nil;
145 end;
146 procedure tMutator.Done;
147 begin
148 {if called while searching: destroy}
149 {if called while fetching: destroy}
150 {if called while Sending: keep background}
151 {bevare when called from OnEvent}
152 if not Sending then Destroy
153 else begin
154 OnComplete:=nil;
155 OnEvent:=nil;
157 end;
158 end;
159 procedure tMutator.SRslt(const Source:tNetAddr; var extra:tMemoryStream);
160 var p,n:^tMutatorRslt;
161 var pp:^pointer;
162 var fid:^tfID;
163 var Ver:LongWord;
164 var a,b:shortint;
165 begin
166 if Source.isNil then begin
167 {copy closest peers}
168 b:=0; for a:=0 to high(Search^.Peers) do begin
169 if (b>high(Peers))and Search^.Peers[a].addr.isNil then break;
170 Peers[b]:=Search^.Peers[a]; inc(b);
171 end; if b<=high(Peers) then Peers[b].Addr.Clear;
172 if assigned(OnEvent) then OnEvent(meSearchEnd,0,tFID(nil^),tNetAddr(nil^));
173 search:=nil;
174 Shedule(10,@FetchStart);
175 end else if extra.left>=24 then begin
176 {real result}
177 fid:=extra.ReadPtr(20);
178 Ver:=extra.ReadWord(4);
179 {store results in linkedlist}
180 if assigned(OnEvent) then OnEvent(meSearchFound,Ver,Fid^,Source);
181 {highest version first}
182 pp:=@Found; p:=pp^;
183 while assigned(p) do begin
184 if p^.Src=Source then exit; {$hint not effective}
185 if p^.Ver<Ver then break;
186 {grop same FIDs together}
187 if (p^.Ver=Ver) and (p^.FID=FID^) then break;
188 end;
189 new(n); n^.next:=p; pp^:=n;
190 n^.Ver:=ver;
191 n^.Fid:=fid^;
192 n^.Src:=Source;
193 end else if assigned(OnEvent) then OnEvent(meSearchInvalid,0,tKey20(nil^),Source);
194 end;
195 procedure tMutator.FetchStart;
196 var p:^tMutatorRslt;
197 var so:tStoreObject;
198 var check:boolean;
199 begin
200 if assigned(Found) then begin {value found}
201 p:=Found;
202 CurrentFetch:=p^;
203 FetchJ:=nil;
204 {$hint, do not fetch older or same as DB, proceed to propagate; but it does not hurt}
205 try so.Init(CurrentFetch.fid);
206 except {not found: download}
207 on eObjectNF do FetchJ:=FetchObject(p^.fid, p^.Src, 48, @self.FetchEvent);
208 end;
209 if assigned(FetchJ) then begin
210 FetchJ^.SetMaxSize(4096);
211 if assigned(OnEvent) then OnEvent(meFetchStart,CurrentFetch.Ver,CurrentFetch.Fid,p^.src);
212 end else if assigned(OnEvent) then OnEvent(meFetchLocal,CurrentFetch.Ver,CurrentFetch.Fid,tNetAddr(nil^));
213 repeat
214 p:=p^.next;
215 Dispose(Found);
216 Found:=p;
217 if (p=nil) or (CurrentFetch.fid<>p^.fid) then break;
218 if assigned(FetchJ) then FetchJ^.AddSource(p^.Src);
219 if assigned(OnEvent) then OnEvent(meFetchSource,CurrentFetch.Ver,CurrentFetch.Fid,p^.Src);
220 until false;
221 {value was found, if job is nil, is already opened from store}
222 if FetchJ=nil then begin
223 check:=DoCheck(so);
224 so.Close;
225 if check then DoSendLocals;
226 end;
228 {no more Found results to try, at least send what we have}
229 else DoSendLocals;
230 end;
231 procedure tMutator.FetchEvent;
232 var so:tStoreObject;
233 var check:boolean;
234 begin
235 if FetchJ^.Done then begin
236 //if assigned(OnEvent) then OnEvent(meFetchDone,CurrentFetch.Ver,CurrentFetch.Fid,tNetAddr(nil^));
237 {download is OK, proceed to check mutable}
238 so.Init(CurrentFetch.FID);
239 FetchJ:=nil;
240 check:=DoCheck(so);
241 so.Reference(-1); {anyway unref the object here, SetMutable does Ref}
242 so.Close;
243 if check
244 {mutable is OK}
245 then DoSendLocals
246 {mutable is invalid, try next result}
247 else Shedule(300,@FetchStart)
249 end else begin
250 {download failed, try next result}
251 Shedule(300,@FetchStart);
252 FetchJ:=nil;
253 if assigned(OnEvent) then OnEvent(meFetchError,ORD(FetchJ^.Error),CurrentFetch.Fid,tNetAddr(nil^));
254 end;
255 end;
256 function tMutator.DoCheck( var so:tStoreObject ):boolean;
257 begin
258 if SetMutable( so, FinalMeta )and(FinalMeta.Fid=Target) then begin
259 {sender may lie about his version, check if >= than expected}
260 if (LongWord(FinalMeta.Ver)>=CurrentFetch.Ver) then begin
261 Result:=true;
262 if assigned(OnEvent) then OnEvent(meCheckOK,FinalMeta.Ver,CurrentFetch.Fid,tNetAddr(nil^));
264 else if assigned(OnEvent) then OnEvent(meCheckOld,FinalMeta.Ver,CurrentFetch.Fid,tNetAddr(nil^));
266 else if assigned(OnEvent) then OnEvent(meCheckBad,0,CurrentFetch.Fid,tNetAddr(nil^));
267 end;
268 procedure tMutator.DoSendLocals;
269 var dbMeta:tMutableMeta;
270 begin
271 Sending:=true;{$note dont Notify if no-result and not in db}
272 if GetMutable(Target,dbMeta) then begin
273 FinalMeta:=dbMeta;
274 {send update to Found and Peers}
275 Shedule(300,@DoSendLocals2);
276 OnComplete; {signal success, may destroy self, must be called last}
277 end else begin
278 OnComplete; {signal success, may destroy self, must be called last}
279 if assigned(OnEvent) then OnEvent(meSendEnd,0,tFID(nil^),tNetAddr(nil^));
280 Destroy;
281 end;
282 end;
283 procedure tMutator.DoSendLocals2;
284 var p:^tMutatorRslt;
285 var i:integer;
286 var ded:boolean=true;
287 procedure SendTo(const trg:tNetAddr);
288 var pk:tMemoryStream;
289 begin
290 if assigned(OnEvent) then OnEvent(meNotify,FinalMeta.Ver,FinalMeta.Fid,Trg);
291 pk.Init(45);
292 pk.WriteByte(opcode.mutableUpdate);
293 pk.WriteWord(FinalMeta.Ver,4);
294 pk.Write(Target,20);
295 pk.Write(FinalMeta.FID,20);
296 ServerLoop.SendMessage(pk.base^,pk.Length, Trg );
297 pk.Free;
298 end;
299 begin
300 {send update to Found and Peers}
301 {first Founds}
302 if assigned(Found) then begin
303 p:=Found;
304 Found:=p^.Next;
305 SendTo(p^.Src);
306 Dispose(p);
307 ded:=false;
308 end else for i:=0 to high(Peers) do if not Peers[i].Addr.isNil then begin
309 SendTo(Peers[i].Addr);
310 Peers[i].Addr.Clear;
311 ded:=false;
312 break;
313 end;
314 Shedule(250,@DoSendLocals2);
315 if ded then begin
316 if assigned(OnEvent) then OnEvent(meSendEnd,FinalMeta.VER,FinalMeta.FID,tNetAddr(nil^));
317 Destroy;
318 end;
319 end;
320 procedure tMutator.Destroy;
321 var p,q:^tMutatorRslt;
322 begin
323 {kill subworkers}
324 if assigned(search) then search^.close;
325 if assigned(FetchJ) then FetchJ^.Abort(@FetchEvent);
326 {free Found list}
327 p:=Found;
328 while assigned(p) do begin
329 q:=p;
330 p:=p^.next;
331 Dispose(q);
332 end;
333 {remove timers}
334 UnShedule(@FetchStart);
335 UnShedule(@DoSendLocals2);
336 {ded}
337 FreeMem(@self,sizeof(self));
338 end;
340 function CapHMutable(const source:tNetAddr; caps:byte; const Target:tPID; var extra:tMemoryStream):boolean;
341 var r:tMemoryStream;
342 var des:tMutableMeta;
343 begin
344 write('Mutable.Cap: ',string(Target));
345 result:=db.GetVal(Target,des);
346 writeln(' ',result);
347 assert(caps=capMutable);
348 if result then begin
349 r.Init(200);
350 r.WriteByte(opcode.dhtCapable);
351 r.Write(dht.MyID,20);
352 r.Write(Target,20);
353 r.WriteByte(caps);
354 r.Write(des.FID,sizeof(des.FID));
355 r.Write(des.Ver,sizeof(des.Ver));
356 SendMessage(r.base^,r.length,source);
357 FreeMem(r.base,r.size);
358 end;
359 end;
361 (****** Upate on Notify ******)
362 var UpdatesInProgress:Word;
363 type tMutableUpdate=object
364 J:^tFetch;
365 FID,MID:tFID;
366 Src:tNetAddr;
367 procedure ev;
368 end;
370 procedure recvUpdate(msg:tSMsg);
371 var s:tMemoryStream absolute msg.stream;
372 var ver,mver:LongWord;
373 var has:boolean;
374 var fid,mid:^tFID;
375 var meta:tMutableMeta;
376 var o:^tMutableUpdate;
377 begin
378 s.skip(1);
379 ver:=s.readword(4);
380 mid:=s.readPtr(20);
381 fid:=s.readPtr(20);
382 {Consult DB}
383 has:=GetMutable(mid^,meta); mver:=meta.ver;
384 if (not has) or (mver<ver) then begin
385 if UpdatesInProgress>=16 then begin
386 writeln('Mutable.recvUpdate: too many updates');
387 exit end;
388 writeln('Mutable.recvUpdate: ',string(mid^),' v',ver,' ',string(fid^));
389 {Start Fetch from source}
390 new(O);
391 O^.MID:=mid^;
392 O^.FID:=fid^;
393 O^.Src:=msg.source^;
394 O^.J:=FetchObject(fid^, msg.source^, 9, @O^.ev);
395 if O^.J=nil then O^.EV; {todo...}
396 end else writeln('Mutable.recvUpdate: ',string(msg.source^),' v',ver,'<=',mver);
397 end;
398 procedure tMutableUpdate.ev;
399 var so:tStoreObject;
400 var meta:tMutableMeta;
401 var valid:boolean;
402 begin
403 if (J=nil) or (J^.Done) then begin
404 so.Init(FID);
405 valid:=SetMutable(so,meta);
406 if valid then begin
407 if meta.fid=mid
408 then writeln('Mutable: ',string(meta.fid),' updated to v',LongWord(meta.ver),' ',string(FID))
409 else begin
410 writeln('Mutable.Update.ev: ',string(Src),' MutID mismatch!');
411 {...delete?}
412 end;
414 else writeln('Mutable.Update.ev: ',string(Src),' invalid signature!');
415 so.Reference(-1);
416 end else begin
417 writeln('Mutable.Update.ev: ',string(Src),' Fetch failed ',J^.Error);
418 end;
419 FreeMem(@self,sizeof(self));
420 end;
422 BEGIN
423 db.Init('mutable.dat',sizeof(tMutableMeta), 128);
424 writeln('Mutable: Database initialized, valsz=',db.valsz,' bktsz=',db.bucksz);
425 dht.RegisterCapability(capMutable,@CapHMutable);
426 SetMsgHandler(opcode.mutableUpdate,@recvUpdate);
427 UpdatesInProgress:=0;
428 END.