Enhaced VersionString. The build script is too complex.
[brdnet.git] / ObjTrans.pas
blob3398f715229db373b251bffa9868307aef32c6d9
1 UNIT ObjTrans;
3 Object Transfer Client
5 INTERFACE
6 uses NetAddr,Store2,ServerLoop,MemStream;
8 type
9 tAggr_ptr=^tAggr;
10 tJob=object
11 public {writable}
12 Weight:0..127;
13 Callback:procedure of object;
14 public {read-only}
15 Received:LongWord;
16 Total:LongWord;
17 error:word;{0progress,1done,2ioerror,3timeout,4full,OT-DLEs}
18 dataf:file of byte; {do not touch while active}
19 FID:tFID;
20 procedure Init(const srce:tNetAddr; const iFID:tFID);
21 procedure Start;
22 procedure Done;
23 procedure MakeRequest(diag:byte);
24 private
25 aggr:tAggr_ptr;
26 ch:byte;
27 FirstSeg:pointer;
28 MaxReqCnt:word;
29 RetryCounter:word;
30 DgrCntSinceReq:LongWord;
31 procedure OnData(msg:tMemoryStream);
32 procedure OnTimeout; {nothing is flowing thru aggr or afterrequest check}
33 end;
35 tAggr=object
36 Remote:tNetAddr;
37 private
38 channel: array [1..16] of ^tJob;
39 refc,ncpt,idletick: byte;
40 next:tAggr_ptr;
41 ByteCnt:LongWord;
42 DgrCnt:LongWord;
43 DgrCntCheck:LongWord;
44 StartT:tMTime;
45 procedure Init(const src:tNetAddr);
46 procedure Add(var job: tJob);
47 procedure Del(var job: tJob);
48 procedure OnData(msg:tSMsg);
49 procedure Periodic;
50 end;
52 IMPLEMENTATION
53 uses opcode;
55 var AggrChain:^tAggr;
57 type
58 tSeg=object
59 first,after:LongWord;
60 end;
61 tSegItem=object(tSeg)
62 next:^tSegItem;
63 end;
65 function GetAggr(const remote:tNetAddr):tAggr_ptr;
66 var a:^tAggr;
67 var p:^pointer;
68 begin
69 p:=@AggrChain;
70 a:=AggrChain;
71 while assigned(a) do begin
72 if a^.remote=remote then begin
73 GetAggr:=a;
74 p^:=a^.next;
75 a^.next:=AggrChain;
76 AggrChain:=a^.next;
77 exit;
78 end;
79 end;
80 GetAggr:=nil;
81 end;
83 procedure tJob.Init(const srce:tNetAddr; const iFID:tFID);
84 begin
85 Weight:=32;
86 Callback:=nil;
87 Received:=0;
88 Total:=$FFFFFFFF;
89 MaxReqCnt:=99;
90 RetryCounter:=0;
91 FID:=iFID;
92 AssignTempObject(dataf,fid,'prt');
93 {$I-}ReWrite(dataf,1);
94 Seek(dataf,cObjHeaderSize);{$I+}
95 if IOResult>0 then begin error:=2; exit end;
96 {$note Open SEG file and read it ToDo}
97 aggr:=GetAggr(srce);
98 if not assigned(aggr) then begin
99 new(aggr);
100 aggr^.next:=AggrChain;
101 AggrChain:=aggr;
102 aggr^.Init(srce);
103 end;
104 ch:=0;
105 aggr^.Add(self);
106 if ch=0 then begin
107 error:=4;
108 Done; exit;
109 end;
110 error:=0;
111 end;
113 procedure tJob.Start;
114 begin
115 Assert( (Weight+opcode.otReq)<=high(byte));
116 Assert(assigned(callback));
117 MakeRequest(1);
118 end;
120 procedure tJob.Done;
121 begin
122 if error<>1 then Close(dataf);
123 UnShedule(@OnTimeout);
124 aggr^.Del(self);
125 end;
127 {Strategy for fixing holes without waiting for DONE message:
128 * stuff all small segments to LSEG
129 * put large one at end
130 * when datagrams from the large arrive, Repeat
133 procedure tJob.MakeRequest(diag:byte);
134 var s:tMemoryStream;
135 var b,l,ReqLen:LongWord;
136 var ReqCnt:byte;
137 var seg:^tSegItem;
138 const ReqLenLim=20000000;
139 begin
140 write('ObjTrans.',string(aggr^.remote),'#',ch,'.MakeRequest',diag);
141 seg:=nil;
142 ReqLen:=0;
143 ReqCnt:=0;
144 s.Init(180);
145 s.WriteByte(opcode.otCtrl);
146 s.WriteByte(opcode.otReq+Weight);
147 s.WriteByte(ch);
148 s.Write(FID,20);
149 repeat
150 if seg=nil then begin
151 b:=0;
152 seg:=FirstSeg;
153 if assigned(seg) and (seg^.first=0) then begin
154 b:=seg^.after;
155 seg:=seg^.next;
156 end;
157 end else begin
158 b:=seg^.after;
159 seg:=seg^.next;
160 end;
161 if assigned(seg) then l:=seg^.first-b else l:=Total-b;
162 if l=0 then break;
163 if (ReqLen+l)>ReqLenLim then l:=ReqLenLim-ReqLen;
164 write(' ',b,'+',l);
165 s.WriteByte(0);
166 s.WriteWord(b,4);
167 s.WriteWord(l,4);
168 inc(ReqLen,l);
169 inc(ReqCnt);
170 until (s.WrBufLen<9)or(ReqLen>=ReqLenLim)or(ReqCnt>=MaxReqCnt);
171 if ReqLen=0 then begin
172 writeln(' done');
173 FreeMem(s.base,s.size);
174 error:=1; Done;
175 if assigned(callback) then Callback;
176 end else begin
177 writeln(' send ',s.Length);
178 SendMessage(s.Base^,s.Length,aggr^.Remote);
179 FreeMem(s.base,s.size);
180 UnShedule(@OnTimeout);
181 Shedule(750,@OnTimeout);
182 DgrCntSinceReq:=0;
183 //HighestRequestBase:=b;
184 end;
185 end;
187 procedure tJob.OnData(msg:tMemoryStream);
188 var hiOfs:byte;
189 var Offset:LongWord;
190 var dtlen:word;
191 procedure SetSegment(first,after:LongWord);
192 var p:^pointer;
193 var c,k:^tSegItem;
194 begin
195 p:=@FirstSeg; c:=p^; k:=nil;
196 while assigned(c) do begin
197 {merge cur to new}
198 if (c^.first<first)and(c^.after>=first) then first:=c^.first;
199 if (c^.after>after)and(c^.first<=after) then after:=c^.after;
200 {remove cur if fully contained in new}
201 if (first<=c^.first)and(after>=c^.after) then begin
202 p^:=c^.next;
203 if assigned(k) then dispose(c) else k:=c;
204 c:=p^; continue;
205 end;
206 p:=@c^.next; c:=p^;
207 if first>=after then exit;
208 end;
209 {merge completed, insert new segment}
210 if not assigned(k) then new(k);
211 k^.first:=first;
212 k^.after:=after;
213 p:=@FirstSeg; c:=p^; while assigned(c) and (c^.first<first)
214 do begin p:=@c^.next; c:=p^ end;
215 k^.next:=c;
216 p^:=k;
217 end;
218 begin
219 hiOfs:=msg.ReadByte;
220 if hiOfs<otInfo then begin
221 Offset:=msg.ReadWord(4);
222 dtlen:=msg.RdBufLen;
223 Seek(dataf,Offset+cObjHeaderSize);
224 BlockWrite(dataf,msg.RdBuf^,dtlen);{$note iocheck todo}
225 SetSegment(Offset,Offset+dtlen);
226 RetryCounter:=0;
227 Inc(DgrCntSinceReq);
228 Inc(Received,dtLen);
229 {otSINC handled in Aggr}
230 end else if hiOfs=otInfo then begin
231 hiOfs:=msg.ReadByte;
232 Offset:=msg.ReadWord(4);
233 MaxReqCnt:=msg.ReadByte;
234 Total:=Offset; {$hint dangerous}
235 end else if (hiOfs=otFail) or (hiOfs=otNotFound) then begin
236 error:=hiOfs;
237 Done;
238 if assigned(callback) then Callback;
240 else if hiOfs=otEoT then MakeRequest(3);
241 end;
243 procedure tJob.OnTimeout;
244 {called by aggr or resheduled in MakeRequest}
245 begin
246 if DgrCntSinceReq=0 then begin
247 if RetryCounter>=13 then begin
248 error:=3;
249 Done;
250 if assigned(callback) then Callback;
251 end else begin
252 MakeRequest(4);
253 Inc(RetryCounter);
254 end;
256 else DgrCntSinceReq:=0; {next time aggr calls its probably serious}
257 end;
259 procedure tAggr.OnData(msg:tSMsg);
260 var chn,oh:byte;
261 var sm:Word;
262 var slen:LongWord;
263 var rate:single;
264 var s:tMemoryStream absolute msg.stream;
265 var debugmsg:string[127];
266 begin
267 s.Skip(1);
268 chn:=s.ReadByte;
269 oh:=s.ReadByte;
270 assert(chn<=high(channel));
271 if DgrCnt=0 then StartT:=mNow;
272 Inc(ByteCnt,s.Length);
273 Inc(DgrCnt); Inc(DgrCntCheck);
274 if oh=otRateInfo then begin
275 SetLength(debugmsg,s.RdBufLen);
276 s.Read(debugmsg[1],s.RdBufLen);
277 writeln('ObjTrans.',string(remote),'#',chn,'.ServerDebug: '+debugmsg);
278 exit end;
279 if chn=0 then exit;
280 if not assigned(channel[chn]) then begin
281 s.Seek(0); s.Trunc;
282 s.WriteByte(otCtrl);
283 s.WriteByte(otFin);
284 s.WriteByte(chn);
285 SendMessage(s.base^,s.length,Remote);
286 exit end;
287 if oh=otSINC then s.Read(sm,2) else s.seek(s.position-1);
288 channel[chn]^.OnData(s);
289 if oh=otSINC then begin
290 slen:=s.Length;
291 s.Seek(0); s.Trunc;
292 s.WriteByte(otCtrl); s.WriteByte(otSIACK);
293 s.Write(sm,2);
294 s.WriteWord(slen,2);
295 SendMessage(s.base^,s.length,Remote);
296 end else if (DgrCnt>=8) and ((mNow-StartT)>=400) then begin
297 s.Seek(0); s.Trunc;
298 s.WriteByte(otCtrl); s.WriteByte(otSPEED);
299 rate:=(ByteCnt/(mNow-StartT))*16;
300 s.WriteWord(round(rate),4);
301 ByteCnt:=1;
302 DgrCnt:=0;
303 SendMessage(s.base^,s.length,Remote);
304 end;
305 end;
308 procedure tAggr.Init(const src:tNetAddr);
309 var i:integer;
310 begin
311 Remote:=src;
312 refc:=0; ncpt:=0;
313 ServerLoop.SetMsgHandler(opcode.otData,Remote,@OnData);
314 Shedule(915,@Periodic);
315 for i:=1 to high(channel) do channel[i]:=nil;
316 ByteCnt:=0;
317 DgrCnt:=0;
318 DgrCntCheck:=0;
319 StartT:=0;
320 end;
322 procedure tAggr.Add(var job: tJob);
323 var i:integer;
324 begin
325 Assert(job.ch=0);
326 i:=ncpt;
327 repeat
328 if i=high(channel) then i:=1 else i:=i+1;
329 if channel[i]=nil then begin
330 job.ch:=i; ncpt:=i;
331 channel[job.ch]:=@job;
332 inc(refc);
333 idletick:=0;
334 break;
335 end;
336 until i=ncpt;
337 end;
338 procedure tAggr.Del(var job: tJob);
339 begin
340 Assert(job.ch>0);
341 Assert(assigned(channel[job.ch]));
342 dec(refc);
343 channel[job.ch]:=nil;
344 job.ch:=0;
345 end;
347 procedure tAggr.Periodic;
348 var a:^tAggr;
349 var p:^pointer;
350 var i:integer (*absolute p*);
351 begin
352 if refc>0 then begin
353 {check DgrCntCheck and issue Timeout}
354 if DgrCntCheck=0 then begin
355 for i:=1 to high(channel) do if assigned(channel[i])
356 then channel[i]^.OnTimeout;
357 end else DgrCntCheck:=0;
358 end else begin
359 {check idle time and delete self}
360 if idletick>17 then begin
361 p:=@AggrChain;a:=p^; while assigned(a) do begin
362 if a=@self then begin p^:=next; break end;
363 p:=@a^.next;a:=p^;
364 end;
365 SetMsgHandler(otData,Remote,nil);
366 FreeMem(@self,sizeof(self)); EXIT;
367 end else inc(idletick);
368 end;
369 Shedule(700,@Periodic);
370 end;
372 END.