Upload REKTing.
[brdnet.git] / UploadThread.pas
blob0beaece2665e2af8bf0e38a2690ae6d18111ad15
1 unit UploadThread;
2 {coprocessor to Upload unit. Move I/O out of main thread}
3 INTERFACE
4 uses Store1,Sockets,NetAddr;
{A contiguous range of a stored object that is still to be sent.}
type tSegment=record
 base:LongWord; {offset passed to ReadSeg and written into the packet header}
 len:LongWord;  {amount remaining in this range; reduced after every packet}
 end;
{One upload channel: a stack of pending segments of a single stored object
 plus the weighting state used by the channel scheduler in tUploadThr.Main.}
type tChannel=record
 s:array [1..24] of tSegment; {pending segments}
 seg:byte;             {index into s of the segment being sent; 0 = nothing to send.
                        Decremented when the segment is exhausted.}
 weight:Word;          {value wcur is reloaded with once it has reached 0}
 wcur:Word;            {packets this channel may still send before it is skipped}
 oi:tStoreObjectInfo;  {source object the segment data is read from}
 end;
{State of one upload worker thread. Main is the thread body; Init, Start and
 Done are called from the owning (non-worker) code.}
type tUploadThr=object
 thrid:tThreadID;            {id returned by BeginThread in Start}
 crit:tRtlCriticalSection;   {held by Main while it picks a channel and reads a segment}
 socket:tSocket;             {socket the packets are sent on}
 remote:tSockAddrL;          {destination address for fpSendTo}
 size1:Word;                 {packet size limit for ordinary (tcdata) packets}
 size2:Word;                 {if non-zero: size limit for one tcdataimm packet, then reset to 0}
 mark:Byte;                  {copied into the header of every packet}
 rate:Single;                {divisor for MarkData when computing the send delay;
                              presumably bytes per second - confirm against caller}
 MarkTime:LongWord;{ms}
 MarkData:LongWord;          {sum of packet lengths sent since Start}
 chans:array [0..11] of ^tChannel; {nil = slot unused}
 curc:byte;                  {index of the channel currently being served}
 buffer:array [0..2047] of byte; {packet assembly buffer}
 stop:boolean; {the thread is stopped or stopping}
 wait:boolean; {the thread is waiting for data}
 waitc:byte;                 {number of consecutive idle polls; reset when a packet is built}

 procedure Main;
 procedure Init(source:tNetAddr);
 procedure Start;
 procedure Done;
 end;
39 IMPLEMENTATION
40 uses MemStream,ServerLoop,SysUtils,opcode;
procedure tUploadThr.Init(source:tNetAddr);
{Prepare the object for use: create the critical section, resolve the
 destination address and socket from source, and put every scheduler field
 into a defined idle state. No thread is started here (stop=true); call Start.
 size1, size2, mark and rate are not touched and must be set by the caller.}
var i:integer;
begin
 InitCriticalSection(crit);
 source.ToSocket(remote);
 socket:=GetSocket(source);
 MarkData:=0;
 MarkTime:=0;
 stop:=true;
 wait:=false;
 {Main indexes chans[curc] and tests waitc on its very first pass, so both
  must start from a known value; object memory is not zero-filled.}
 curc:=0;
 waitc:=0;
 for i:=0 to high(chans) do chans[i]:=nil;
end;
procedure tUploadThr.Main;
{Thread body. Until stop is set it repeatedly:
  1. picks the next channel that has a segment and remaining weight,
  2. builds one data packet from that channel's current segment,
  3. sends it to remote and sleeps to keep to the configured rate.
 When no channel is usable it sets wait and polls every 200 ms; wait is only
 cleared by Start. After more than 10 consecutive idle polls the thread sets
 stop itself and terminates.}
var pch:byte;
var s:tMemoryStream;
var sz:Word;
var txwait,delta:single;//msec
var LastTime:tDateTime;//days
var chan:^tChannel;
var seg:^tSegment;
begin
 txwait:=0;
 delta:=0;
 while not stop do begin
  EnterCriticalSection(crit);
  pch:=0;
  {find usable channel}
  while (chans[curc]=nil)or(chans[curc]^.wcur=0)or(chans[curc]^.seg=0) do begin
   {a channel that used up its weight gets it back for the next round}
   if assigned(chans[curc])and(chans[curc]^.WCur=0) then chans[curc]^.WCur:=chans[curc]^.weight;
   inc(curc);
   inc(pch);
   if curc>high(chans) then curc:=0;
   {one step more than a full round, so a channel whose weight was just
    reloaded is looked at again before giving up}
   if pch>(high(chans)+1) then begin wait:=true; break; end;
  end;
  if wait then begin
   LeaveCriticalSection(crit);
   if waitc>10 then begin
    {Leave at once. Sleeping after stop is set would let Start see stop=true,
     clear it and spawn a second thread while this one is still alive; this
     one would then wake up, see stop=false and carry on in parallel.}
    stop:=true;
    break;
   end;
   inc(waitc);
   sleep(200);
   continue;
  end;
  waitc:=0;
  LastTime:=SysUtils.Now;
  chan:=chans[curc];
  seg:=@chan^.s[chan^.seg];
  s.Init(@buffer,0,high(buffer));
  {prepare header}
  if size2>s.size then size2:=0;
  if size2=0 then begin
   sz:=size1; if size1>s.size then sz:=s.size;
   s.WriteByte(opcode.tcdata);
  end else begin
   {size2 is a one-shot request: used for this packet only}
   sz:=size2; if sz>s.size then sz:=s.size;
   s.WriteByte(opcode.tcdataimm);
   size2:=0;
  end;
  Assert(seg^.len>0);
  s.WriteByte(mark);
  s.WriteByte(curc);
  s.WriteWord(seg^.base,4);
  {sz was the whole-packet limit; from here on it is the payload size.
   NOTE(review): sz is a Word, so this wraps if the limit is smaller than the
   header already written - confirm callers never set size1/size2 that low.}
  dec(sz,s.length);
  if sz>seg^.Len then sz:=seg^.Len;
  assert(sz<=seg^.len);
  chan^.oi.ReadSeg(s.WrBuf,seg^.base,sz);
  Assert(chan^.oi.rc=0,'IO error reading segment');
  s.WrEnd(sz);
  assert((Seg^.Len-sz)>=0);
  {consume what was just read from the segment and one unit of weight}
  Dec(Seg^.Len,sz);
  Dec(chan^.WCur);
  if Seg^.Len=0 then Dec(chan^.seg)
  else Inc(Seg^.Base,sz);
  LeaveCriticalSection(crit);
  {the send and the rate-limiting sleep happen outside the lock}
  fpSendTo(socket,s.base,s.length,0,@remote,sizeof(remote));
  {time the data sent so far should have taken, minus time actually elapsed.
   NOTE(review): Rate is not checked for zero before the division.}
  txwait:=((MarkData/Rate)*1000)-(MarkTime);
  MarkData:=MarkData+s.length;
  if txWait>1000 then begin writeln('!!! txwait=',round(txWait)); txWait:=1000;end;
  if txWait>0 then Sleep(round(txWait));
  {add the duration of this iteration to MarkTime in whole milliseconds,
   carrying the fractional part over in Delta}
  Delta:=Delta+((SysUtils.Now-LastTime)*MSecsPerDay);
  if Delta>5000 then Delta:=3000;
  if Delta<0 then Delta:=0;
  MarkTime:=MarkTime+trunc(Delta);
  Delta:=frac(Delta);
 end;
end;
function thrfunc(p:pointer):PtrInt;
{Thread entry point given to BeginThread: p is the address of the tUploadThr
 whose Main is to be run. Returns 9 once Main has finished.}
var worker:^tUploadThr;
begin
 worker:=p;
 worker^.Main;
 thrfunc:=9;
end;
procedure tUploadThr.Start;
{Wake the worker. The wait flag is always cleared; if the thread is not
 running (stop is set), the rate counters are reset and a new thread is
 started on this object.}
begin
 wait:=false;
 if stop then begin
  stop:=false;
  MarkData:=0;
  MarkTime:=0;
  thrid:=BeginThread(@ThrFunc,@self);
 end;
end;
procedure tUploadThr.Done;
{Shut down a running worker: set stop under the lock, wait for the thread to
 terminate (timeout argument 65535) and then destroy the critical section.
 Does nothing at all when stop is already set.
 NOTE(review): in that case the critical section created by Init is not
 destroyed either - confirm whether this is intended.}
begin
 if not stop then begin
  EnterCriticalSection(crit);
  stop:=true;
  LeaveCriticalSection(crit);
  WaitForThreadterminate(thrid,65535);
  DoneCriticalSection(crit);
 end;
end;
153 END.