File Sharing download/upload units impl
[brdnet.git] / DownloadTC.pas
blob01ac4c731cf883eb6e1679ce6a68965359dca780
1 {Include file}
{ Download aggregate for one remote peer: receives the tcdata/tcdataimm
  messages from that peer, measures the incoming rate and hands the payload
  to one of the job slots. }
tAggr=object
 Rate:Real;                    {last rate computed by MsgDATA (ByteCnt/elapsed*1000)}
 ByteCnt:LongWord;             {bytes counted in the current measuring window}
 DgrCnt:LongWord;              {datagrams counted in the current measuring window}
 CurMark:byte;                 {mark of the window being measured}
 PrvMark:byte;                 {mark of the window before it; its datagrams are not counted}
 StartT:tMTime;                {mNow at the start of the current window}
 Jobs:array [0..15] of ^tJob;  {job slots, looked up by the channel byte in Recv}
 refs:byte;
 acnt:byte;                    {number of jobs marked active by Start/Stop}
 ChanOfs:byte;                 {random value chosen in Init}
 DgrCntCheck:LongWord;         {datagrams seen since the last Periodic check}
 remote:tNetAddr;              {peer address the message handlers are bound to}
 next:tAggr_ptr;               {next aggregate in AggrChain}
 procedure Init(const src:tNetAddr);
 procedure MsgDATA(sz:Word;mark:byte);
 procedure MsgIMME(sz:Word;mark:byte);
 procedure Recv(msg:tSMsg);
 procedure Periodic;
 procedure Done;
 procedure Start(ix:byte);
 procedure Stop(ix:byte);
end;

{ Head of the singly linked list of aggregates (linked through tAggr.next). }
var AggrChain:^tAggr;
{ Finds the aggregate whose remote address equals "remote".
  On a hit the node is unlinked, re-inserted at the head of AggrChain
  (move-to-front) and returned. Returns nil when no aggregate matches. }
function GetAggr(const remote:tNetAddr):tAggr_ptr;
 var a:^tAggr;
 var p:^pointer; {address of the link that currently points at a}
 begin
 p:=@AggrChain;
 a:=AggrChain;
 while assigned(a) do begin
  if a^.remote=remote then begin
   GetAggr:=a;
   {unlink a from its current position...}
   p^:=a^.next;
   {...and make it the new head of the chain}
   a^.next:=AggrChain;
   AggrChain:=a;
   exit;
  end;
  {advance to the next node; without this the loop never terminates}
  p:=@a^.next;
  a:=a^.next;
 end;
 GetAggr:=nil;
end;
{ Resets all counters and job slots, stores the peer address and registers
  Recv as handler for the tcdata and tcdataimm opcodes coming from it.
  src: address of the remote peer this aggregate serves. }
procedure tAggr.Init(const src:tNetAddr);
 var slot:integer;
 begin
 acnt:=0;
 Rate:=0;
 ByteCnt:=0;
 DgrCnt:=0;
 CurMark:=0;PrvMark:=0;
 StartT:=mNow;
 {clear every slot, including slot 0}
 for slot:=low(Jobs) to high(Jobs) do Jobs[slot]:=nil;
 refs:=0;
 ChanOfs:=Random(255-high(Jobs));
 DgrCntCheck:=0;
 remote:=src;
 SetMsgHandler(opcode.tcdata,remote,@Recv);
 SetMsgHandler(opcode.tcdataimm,remote,@Recv);
end;
{ Message handler for tcdata/tcdataimm.
  Reads opcode and mark, acknowledges immediate data via MsgIMME, updates the
  rate accounting via MsgDATA, then reads the channel byte and a 4-byte base
  and passes the rest of the read buffer to the job in that slot, if any. }
procedure tAggr.Recv(msg:tSMsg);
 var kind,tag,slot:byte;
 var offset:DWORD;
 begin
 kind:=msg.stream.readbyte;
 tag:=msg.stream.readbyte;
 if kind=opcode.tcdataimm then begin
  MsgIMME(msg.length,tag);
 end;
 MsgDATA(msg.length,tag);
 slot:=msg.stream.readbyte;
 offset:=msg.stream.ReadWord(4);
 {drop data for channels outside the table or without a job}
 if slot>high(Jobs) then exit;
 if Jobs[slot]=nil then exit;
 Jobs[slot]^.MsgDATA(offset,msg.stream.RDBufLen,msg.stream.RDBuf);
end;
{ Sends a tceack reply to the peer, carrying the mark and the size of the
  message being acknowledged.
  sz:   length of the received message
  mark: mark byte taken from the received message }
procedure tAggr.MsgIMME(sz:Word; mark:byte);
 var pkt:array [1..4] of byte; {backing storage for the reply stream}
 var ack:tMemoryStream;
 begin
 ack.Init(@pkt,0,sizeof(pkt));
 ack.WriteByte(opcode.tceack);
 ack.WriteByte(mark);
 ack.WriteWord(sz,2);
 SendMessage(ack.base^,ack.length,remote);
end;
{ Accounts one received datagram and, once enough data has been seen,
  reports the measured rate to the peer with a tccont message.
  sz:   length of the received message
  mark: mark byte taken from the received message
  Datagrams carrying the previous mark are not counted. A mark that is neither
  the previous nor the current one starts a new measuring window. A report is
  sent only after at least 8 datagrams and an elapsed time of at least 400. }
procedure tAggr.MsgDATA(sz:Word; mark:byte);
 var reply:tMemoryStream;
 var rate64:DWord; {BytesPerSecond shr 6 (=64)}
 var pkt:array [1..6] of byte; {backing storage for the reply stream}
 var elapsed:tMTime;
 begin
 if mark<>PrvMark then begin
  if mark=CurMark then begin
   Inc(ByteCnt,sz);
   Inc(DgrCnt);
  end else begin
   {new mark: rotate marks and restart the window}
   PrvMark:=CurMark;
   CurMark:=mark;
   StartT:=mNow;
   ByteCnt:=1;
   DgrCnt:=1;
  end;
  inc(DgrCntCheck);
 end;
 //writeln('Download: got ',DgrCnt,'dg,',ByteCnt,'B in ',elapsed,'ms');
 if DgrCnt<8 then exit;
 elapsed:=(mNow-StartT){*MSecsPerDay};
 if elapsed<400 then exit;
 rate:=(ByteCnt/elapsed)*1000;
 //writeln('Download: rate ',(rate/1024):7:1, 'kB/s');
 rate64:=round((rate)/64);
 {restart the window before reporting}
 StartT:=mNow;
 ByteCnt:=1;
 DgrCnt:=0;
 reply.Init(@pkt,0,sizeof(pkt));
 reply.WriteByte(opcode.tccont);
 reply.WriteByte(mark);
 reply.WriteWord(rate64,4);
 SendMessage(reply.base^,reply.length,remote);
end;
{ Scheduled liveness check: if more than one datagram was counted since the
  previous check, clears the counter and re-schedules itself in 5000 units;
  otherwise only prints a diagnostic (handling is not implemented). }
procedure tAggr.Periodic;
 begin
 if DgrCntCheck<=1 then begin
  writeln('Download: Periodic check failed, unimplemented!');
  //todo do
  exit;
 end;
 DgrCntCheck:=0;
 Shedule(5000,@Periodic);
end;
{ Tears the aggregate down: unlinks it from AggrChain (if present), cancels
  the Periodic schedule, removes both message handlers and releases the
  memory of the object itself. The instance must not be used afterwards. }
procedure tAggr.Done;
 var a:^tAggr;
 var p:^pointer; {address of the link that currently points at a}
 begin
 p:=@AggrChain;
 a:=AggrChain;
 while assigned(a) do begin
  if a=@self then begin
   p^:=next;
   break;
  end;
  {advance to the next node; without this the loop never terminates
   unless self happens to be the head of the chain}
  p:=@a^.next;
  a:=a^.next;
 end;
 UnShedule(@Periodic);
 SetMsgHandler(opcode.tcdata,remote,nil);
 SetMsgHandler(opcode.tcdataimm,remote,nil);
 FreeMem(@self,sizeof(self));
end;
{ Marks the job in slot ix as active. The first active job also schedules
  the Periodic check. Does nothing when the job is already active.
  ix: index into Jobs; the slot is dereferenced without a nil check. }
procedure tAggr.Start(ix:byte);
 var job:^tJob;
 begin
 job:=Jobs[ix];
 if not job^.active then begin
  if acnt=0 then Shedule(5000,@Periodic);
  inc(acnt);
  job^.active:=true;
 end;
end;
{ Marks the job in slot ix as inactive. When the last active job stops, the
  Periodic check is unscheduled. Does nothing when the job is not active.
  ix: index into Jobs; the slot is dereferenced without a nil check. }
procedure tAggr.Stop(ix:byte);
 var job:^tJob;
 begin
 job:=Jobs[ix];
 if job^.active then begin
  dec(acnt);
  job^.active:=false;
  if acnt=0 then UnShedule(@Periodic);
 end;
end;