Stream Init helper to Chat.
[brdnet.git] / upmgr.pas
blobe524c6d6321df1690c03c238634022102dcb8f26
1 UNIT UPMGR;
2 {Upload Manager for brodnetd}
4 {mission:
5 - read files
6 - add demux info
7 - add sequence/offset
8 - handle retransmit requests
9 - handle all requests
10 - deprioritize/cancel uploads
11 keep one TC connection per peer (+expire-delete)
12 => need 'chat' protocol
15 INTERFACE
16 USES Chat,TC,opcode,ServerLoop,MemStream,NetAddr;
18 IMPLEMENTATION
20 type
21 tAggr_ptr=^tAggr;
22 tPrv=object
23 u:byte;
24 end;
25 tPrv_ptr=^tPrv;
26 tAggr=object
27 idle: boolean;
28 tcs: tTCS;
29 ch:^tChat;
30 prv: array [0..15] of ^tPrv;
31 cprv:byte;{current}
32 next,prev: tAggr_ptr;
33 procedure OnCont;
34 procedure OnMsg(msg: tSMsg; data: boolean);
35 procedure Init(var nchat:tChat; msg:tSMsg);
36 procedure ForceClose;
37 procedure Done;
38 procedure IdleTimeout; procedure TCTimeout; procedure CHTimeout(willwait:LongWOrd);
39 procedure SendTestReply;
40 procedure ExpandPrv(last:byte);
41 end;
43 var Peers:^tAggr;
45 {Requests
46 Close();
47 GET(channel:byte; filehash:20; baseHi:word2; base:word4; limit:word4);
48 SEG(channel:byte; baseHi:word2; base:word4; limit:word4);
49 FIN(channel:byte; avail:byte);
50 }{Responses
51 INFO(channel:byte; struct);
52 FAIL(channel:byte; code:byte);
53 DONE(channel:byte);
56 procedure tAggr.OnMsg(msg: tSMsg; data: boolean);
57 var op:byte;
58 begin
59 if data then begin
60 op:=msg.stream.readbyte;
61 case op of
62 opcode.upClose: begin
63 ch^.Ack;
64 Done;
65 FreeMem(@self,sizeof(self));
66 end;
67 opcode.upGET: ReqGET(msg);
68 99: SendTestReply;
69 end{case};
70 end{data};
71 end;
73 procedure tAggr.ReqGET(msg:tSMsg);
74 var chan:byte;
75 var filehash: array [1..20] of byte;
76 var basehi:word;
77 var base:LongWord;
78 var limit:LongWord;
79 begin
80 if msg.stream.RdBufLen<31 then begin
81 SendError(opcode.upErrMalformed); exit end;
82 chan:=msg.stream.ReadByte;
83 if chan>high(prv) then begin
84 SendError(opcode.upErrHiChan,chan); exit end;
85 if assigned(prv[chan]) then begin
86 SendError(opcode.upErrChanInUse,chan); exit end;
87 msg.stream.Read(FileHash,20);
88 basehi:=msg.stream.ReadWord(2);
89 base:=msg.stream.Read(4);
90 limit:=msg.stream.Read(4);
91 New(prv[chan]);
92 ch^.Ack;
93 with prv[chan]^ do begin
94 channel:=chan;
95 aggr:=@self;
96 Init(filehash,basehi,base,limit);
97 end;
98 end;
100 procedure tAggr.OnCont;
101 var pprv:byte;
102 begin
103 pprv:=cprv;
104 repeat
105 repeat
106 if cprv>=length(prv) then cprv:=0 else inc(cprv);
107 if cprv=pprv then begin
108 idle:=true;
109 Shedule(15000,@IdleTimeout);
110 exit;
111 end;
112 until prv[cprv].u>0;
114 until tcs.txLastSize>0;
115 end;
117 procedure tAggr.SendTestReply;
118 var s:tMemoryStream;
119 begin
120 writeln('upmgr: test');
121 s.Init(GetMem(56),0,56);
122 ch^.AddHeaders(s);
123 s.WriteByte(98);
124 s.WriteByte(42);
125 ch^.send(s);
126 end;
128 procedure tAggr.Init(var nchat:tChat; msg:tSMsg);
129 var i:byte;
130 begin
131 writeln('upmgr: init');
132 next:=Peers;
133 prev:=nil;
134 if assigned(Peers) then Peers^.prev:=@self;
135 Peers:=@self;
136 ch:=@nchat;
137 tcs.Init(msg.source^);
138 tcs.CanSend:=@OnCont;
139 tcs.maxTimeout:=8;
140 tcs.OnTimeout:=@TCTimeout;
141 for i:=0 to high(prv) do prv[i]:=nil;
142 cprv:=0;
143 ch^.Callback:=@OnMsg;
144 ch^.TMHook:=@CHTimeout;
145 writeln('upmgr: send ack to init');
146 ch^.Ack;
147 idle:=true;
148 Shedule(15000,@IdleTimeout);
149 end;
151 procedure tAggr.IdleTimeout;
152 begin if not idle then exit;
153 writeln('Idle Timeout');
154 ForceClose end;
155 procedure tAggr.TCTimeout;
156 begin
157 writeln('TCTimeout');
158 ForceClose end;
159 procedure tAggr.CHTimeout(willwait:LongWOrd);
160 begin if willwait<30000 then exit;
161 writeln('ChatTimeout');
162 ForceClose end;
164 procedure tAggr.ForceClose;
165 var s:tMemoryStream;
166 begin
167 writeln('upmgr: force close');
168 s.Init(GetMem(56),0,56);
169 ch^.AddHeaders(s);
170 s.WriteByte(opcode.upClose);
171 s.WriteByte(22);
173 ch^.send(s);
174 except end;
175 Done; {fixme sheduler}
176 FreeMem(@self,sizeof(self));
177 end;
179 procedure tAggr.Done;
180 begin
181 writeln('upmgr: close');
182 ch^.Close;
183 tcs.Done;
184 UnShedule(@IdleTimeout);
185 if assigned(prev) then prev^.next:=next else Peers:=next;
186 if assigned(next) then next^.prev:=prev;
187 end;
189 function FindAggr({const} addr:tNetAddr): tAggr_ptr;
190 begin
191 result:=Peers;
192 while assigned(result) do begin
193 if result^.tcs.remote=addr then exit;
194 assert(result^.prev=result);
195 result:=result^.next;
196 end;
197 end;
199 procedure ChatHandler(var nchat:tChat; msg:tSMsg);
200 var dup:^tAggr;
201 begin
202 {check dup}
203 dup:=FindAggr(msg.source^);
204 if assigned(dup) then begin
205 Dup^.ForceClose;
206 Dup^.Done;
207 end else begin
208 New(dup);
209 end;
210 Dup^.Init(nchat,msg);
211 end;
213 BEGIN
214 SetChatHandler(opcode.upFileServer,@ChatHandler);
215 END.