some updates
[iv.d.git] / multidown.d
blob8c106c20f63c2c36c5c91a8016730f7a2bfa6641
1 /* Invisible Vector Library
2 * coded by Ketmar // Invisible Vector <ketmar@ketmar.no-ip.org>
3 * Understanding is not required. Only obedience.
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, version 3 of the License ONLY.
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 // multithreaded massive file downloading engine with fancy progress
18 module iv.multidown /*is aliced*/;
19 private:
21 pragma(lib, "curl");
22 import std.concurrency;
23 import std.net.curl;
25 import iv.alice;
26 import iv.rawtty;
27 import iv.strex;
28 import iv.timer;
31 // ////////////////////////////////////////////////////////////////////////// //
32 // pbar0: fast changing (current file download, for example)
33 // pbar1: slow changing (total number of files to download, for example)
/// Two-level text progress bar drawn on a single tty line.
/// Counter 0 is the fast one (current file), counter 1 is the slow one (file list).
struct PBar2 {
  immutable usize ttyWdt; // bar width in columns: tty width clamped to [6..512]

  string text; // label; the percentage is printed right after it
  usize[2] total; // totals for both counters; 0 means "unknown"
  usize[2] cur; // current values; never above the corresponding total
  usize[2] len; // for tty: number of highlighted columns for each counter
  usize prc0; // percentage of counter 0
  bool dirty; // something visible has changed since the last `draw()`

  @disable this ();

  /// Create a bar labeled with `atext`; counter 1 starts at `cur1` of `tot1`.
  this (string atext, usize cur1, usize tot1) {
    import std.algorithm : min, max;
    ttyWdt = max(6, min(ttyWidth, 512));
    text = atext;
    // reserve room for "NNN%" after the label, so the percentage stays on screen
    // (`ttyWdt` is at least 6, so the subtraction cannot wrap)
    if (text.length > ttyWdt-5) text = text[0..ttyWdt-5];
    total[0] = 0;
    total[1] = tot1;
    cur[0] = 0;
    cur[1] = cur1+1; // differ from `cur1` on purpose, to force recalculation below
    len[] = 0;
    prc0 = 0;
    dirty = true;
    this[1] = cur1;
  }

  /// Change the total of counter 0 and recalculate its percentage and length.
  void setTotal0 (usize tot0) {
    if (total[0] != tot0) {
      dirty = true;
      total[0] = tot0;
      // bump the current value, so the assignment below is not short-circuited
      immutable c0 = cur[0];
      ++cur[0];
      this[0] = c0;
    }
  }

  /// Change the total of counter 1 and recalculate its length.
  void setTotal1 (usize tot1) {
    if (total[1] != tot1) {
      dirty = true;
      total[1] = tot1;
      // bump the current value, so the assignment below is not short-circuited
      immutable c0 = cur[1];
      ++cur[1];
      this[1] = c0;
    }
  }

  /// `bar[idx] = acur`: set the current value of counter `idx` (clamped to its total).
  void opIndexAssign (usize acur, usize idx) {
    if (acur > total[idx]) acur = total[idx];
    if (cur[idx] == acur) return; // nothing to do
    cur[idx] = acur;
    if (total[idx] == 0) return; // total is unknown
    if (idx == 0) {
      // percents for first counter
      // 64-bit math: `100*cur` overflows 32-bit `usize` for big downloads
      usize newprc = cast(usize)(100UL*cur[idx]/total[idx]);
      if (newprc != prc0) {
        prc0 = newprc;
        dirty = true;
      }
    }
    // len (64-bit math for the same reason as above)
    usize newlen = cast(usize)(cast(ulong)ttyWdt*cur[idx]/total[idx]);
    if (newlen != len[idx]) {
      len[idx] = newlen;
      dirty = true;
    }
  }

  /// Redraw the bar on the current tty line; does nothing if nothing has changed.
  void draw () nothrow @nogc {
    import std.algorithm : min;
    if (!dirty) return;
    dirty = false;
    char[1024] buf = ' ';
    buf[0..text.length] = text[];
    usize bufpos = text.length;
    // pad percents: right-aligned in 3 columns, unused columns stay blank
    usize prc = prc0;
    foreach_reverse (immutable idx; 0..3) {
      buf[bufpos+idx] = '0'+prc%10;
      if ((prc /= 10) == 0) break;
    }
    buf[bufpos+3] = '%';
    const wrt = buf[0..ttyWdt];
    // first write [0] and [1] progress
    usize cpos = min(len[0], len[1]);
    // no cursor
    ttyRawWrite("\x1b[?25l");
    // green
    ttyRawWrite("\r\x1b[0;1;42m");
    ttyRawWrite(wrt[0..cpos]);
    if (cpos < len[0]) {
      // haz more [0]
      // magenta
      ttyRawWrite("\x1b[1;45m");
      ttyRawWrite(wrt[cpos..len[0]]);
      cpos = len[0];
    } else if (cpos < len[1]) {
      // haz more [1]
      // brown
      ttyRawWrite("\x1b[1;43m");
      ttyRawWrite(wrt[cpos..len[1]]);
      cpos = len[1];
    }
    // what is left is emptiness
    ttyRawWrite("\x1b[0m");
    ttyRawWrite(wrt[cpos..$]);
    // and return cursor
    //write("\x1b[K\r\x1b[", text.length+4, "C");
  }
}
145 // ////////////////////////////////////////////////////////////////////////// //
__gshared usize threadCount; // # of running threads
__gshared usize prevThreadCount; // # of running threads on previous call
__gshared usize curInfoLine = usize.max; // 0: bottom; 1: one before bottom; etc.


// move cursor to tnum's thread info line
// (info lines are counted from the bottom; new "IDLE" lines are added on demand)
// WARNING! CALL MUST BE SYNCHRONIZED
void cursorToInfoLine (usize tnum) {
  // very first call: the cursor is considered to be on the bottom line
  if (curInfoLine == usize.max) {
    if (threadCount == 0) assert(0); // the thing that should not be
    curInfoLine = 0;
  }

  // go down to the bottom line
  if (curInfoLine != 0) {
    ttyRawWrite("\x1b[");
    ttyRawWriteInt(curInfoLine);
    ttyRawWrite("B");
  }

  // emit one idle status line for each thread that has no line yet
  for (; prevThreadCount < threadCount; ++prevThreadCount) {
    ttyRawWrite("\r\x1b[0;1;33mIDLE\x1b[0m\x1b[K\n");
  }

  // go up from the bottom to the requested line
  if (tnum != 0) {
    ttyRawWrite("\x1b[");
    ttyRawWriteInt(tnum);
    ttyRawWrite("A");
  }
  curInfoLine = tnum;
}
// erase all thread info lines and make the cursor visible again
// does nothing if `cursorToInfoLine()` was never called
// note: this counts `threadCount` down while erasing
void removeInfoLines () {
  if (curInfoLine == usize.max) return; // nothing was drawn

  // go down to the bottom line
  if (curInfoLine != 0) {
    ttyRawWrite("\x1b[");
    ttyRawWriteInt(curInfoLine);
    ttyRawWrite("B");
  }

  // clear each line and step up; the topmost line is cleared without moving
  while (threadCount-- > 1) ttyRawWrite("\r\x1b[0m\x1b[K\x1b[A");
  ttyRawWrite("\r\x1b[0m\x1b[K\x1b[?25h");
}
183 // ////////////////////////////////////////////////////////////////////////// //
184 import core.atomic;
// value passed to `setUserAgent()` of each HTTP connection
public __gshared string mdUserAgent = "Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; Trident/6.0)";

// fill this with urls to download
public __gshared string[] urlList;

// the following is protected by `synchronized`
// (the counter itself is read and bumped with `core.atomic` operations)
shared usize urlDone;

// this will be called to get path from url
public __gshared string delegate (string url) url2path;

// this will be called when download is complete
// call is synchronized
// can be `null`
public __gshared void delegate (string url) urldone;


// one download job: source url and destination path on disk
struct UrlInfo {
  string url;
  string diskpath;
}

// jobs built by `downloadAll()` from `urlList`
private __gshared UrlInfo[] urlinfo;
// Worker thread body: `tnum` is this thread's number (also its info line), `ownerTid` is the thread to report to.
// Each pass marks the thread as IDLE on its info line, then waits for a `usize` url index.
// An index past the end of `urlinfo` is the stop signal: the thread prints DONE and leaves the loop.
// Otherwise the url is downloaded to `uinfo.diskpath` with progress drawn via `PBar2`.
// After each pass (including the stop one) `tnum` is sent to `ownerTid`.
// NOTE(review): this listing has lost its brace-only and delimiter-only lines, so nesting is inferred from the line numbers -- confirm against the real file.
205 void downloadThread (usize tnum, Tid ownerTid) {
206 bool done = false;
207 while (!done) {
208 // update status
210 synchronized {
211 cursorToInfoLine(tnum);
212 ttyRawWrite("\r\x1b[0;1;33mIDLE\x1b[0m\x1b[K");
215 UrlInfo uinfo;
216 string url;
217 usize utotal;
// wait for the next job from the owner
218 receive(
219 // usize: url index to download
220 (usize unum) {
221 synchronized {
222 if (unum >= urlinfo.length) {
223 // url index too big? done with it all
224 done = true;
225 cursorToInfoLine(tnum);
226 ttyRawWrite("\r\x1b[0;1;31mDONE\x1b[0m\x1b[K");
227 } else {
228 uinfo = urlinfo[unum];
229 url = uinfo.url;
230 utotal = urlinfo.length;
235 // download file
236 if (!done) {
237 import std.exception : collectException;
238 import std.file : mkdirRecurse;
239 import std.path : baseName, dirName;
240 string line, upath, ddir, dname;
241 //upath = url2path(url);
242 upath = uinfo.diskpath;
// a job without a destination path is fatal: ExitError if that class exists, otherwise assert(0)
243 if (upath.length == 0) {
244 static if (is(typeof(() { import core.exception : ExitError; }()))) {
245 import core.exception : ExitError;
246 throw new ExitError();
247 } else {
248 assert(0);
251 ddir = upath.dirName;
252 dname = upath.baseName;
// `done` is reused here as the "this file is finished" flag; it is reset after the loop
253 while (!done) {
254 try {
255 doItAgain:
// build the progress bar label: thread number, done/total counters, destination path
// NOTE(review): `line` is only appended to and is never cleared in the visible code, so a repeated pass would duplicate the label -- confirm
257 import std.conv : to;
258 line ~= to!string(tnum)~": [";
259 auto cs = to!string(atomicLoad(urlDone));
260 auto ts = to!string(utotal);
261 foreach (immutable _; cs.length..ts.length) line ~= ' ';
262 line ~= cs~"/"~ts~"] "~upath~" ... ";
264 auto pbar = PBar2(line, atomicLoad(urlDone), utotal);
265 //pbar.draw();
266 //ttyRawWrite("\r", line, " 0%");
267 // down it
268 //string location = null;
269 //bool hdrChecked = false;
270 //bool hasLocation = false;
271 bool wasProgress = false;
272 bool showProgress = true;
273 int oldPrc = -1, oldPos = -1;
274 auto conn = HTTP();
275 version(none) {
276 import std.stdio;
277 auto fo = File("/tmp/zzz", "a");
278 fo.writeln("====================================================");
279 fo.writeln(url);
281 conn.maxRedirects = 64;
282 //conn.setUserAgent("Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; Trident/6.0)");
283 conn.setUserAgent(mdUserAgent);
// a header arriving after progress callbacks re-enables progress display; a non-empty "Location" header disables it
284 conn.onReceiveHeader = (in char[] key, in char[] value) {
285 if (wasProgress) {
286 wasProgress = false;
287 showProgress = true;
289 version(none) {
290 import std.stdio;
291 auto fo = File("/tmp/zzz", "a");
292 fo.writeln(key, "\t", value);
294 if (strEquCI(key, "Location") && value.length) {
295 //hasLocation = true;
296 showProgress = false;
// NOTE(review): `location` has no visible declaration (the one above is commented out); listing lines 297 and 303 are missing, so this part may sit inside a block comment -- confirm
298 location = value.idup;
299 if (location.length > 0) {
300 //url = location;
301 throw new Exception("boo!");
// progress callback: updates both counters and redraws this thread's info line under `synchronized`
306 conn.onProgress = (scope usize dlTotal, scope usize dlNow, scope usize ulTotal, scope usize ulNow) {
307 wasProgress = true;
308 if (showProgress) {
309 if (dlTotal > 0) {
310 pbar.setTotal0(dlTotal);
311 pbar[0] = dlNow;
313 synchronized {
314 pbar[1] = atomicLoad(urlDone);
315 cursorToInfoLine(tnum);
316 pbar.draw();
319 return 0;
// failure to create the destination directory is ignored here
321 collectException(mkdirRecurse(ddir));
322 string fname = ddir~"/"~dname;
// retry the download; give up for good once the retry counter runs out
323 int retries = 8;
324 for (;;) {
325 bool ok = false;
326 try {
327 download(url, fname, conn);
328 ok = true;
329 } catch (Exception e) {
330 ok = false;
// NOTE(review): listing lines 332 and 337 are missing; this `location` part may be commented out as well -- confirm
333 if (location.length) {
334 url = location;
335 goto doItAgain;
338 if (ok) break;
339 if (--retries <= 0) {
340 ttyRawWrite("\n\n\n\n\x1b[0mFUCK!\n");
341 static if (is(typeof(() { import core.exception : ExitError; }()))) {
342 import core.exception : ExitError;
343 throw new ExitError();
344 } else {
345 assert(0);
// notify the user callback (if any); the call is synchronized
349 if (urldone !is null) {
350 synchronized urldone(url);
352 done = true;
353 } catch (Exception e) {
// presumably placed after the inner loop: clears `done`, so the thread goes on to wait for the next job -- confirm
356 done = false;
358 // signal parent that we are idle
359 ownerTid.send(tnum);
364 // ////////////////////////////////////////////////////////////////////////// //
// owner-side bookkeeping for one download thread
struct ThreadInfo {
  Tid tid; // thread id, as returned by `spawn()`
  bool idle; // `true` while the thread has no job assigned
  usize uindex; // index of the last url sent to this thread
}

// all download threads; indexed by thread number
ThreadInfo[] threads;
// spawn one download thread for each slot in `threads`
// each thread gets its slot index as the thread number, and starts as idle
void startThreads () {
  foreach (usize tnum, ref slot; threads) {
    synchronized { ++threadCount; }
    slot.idle = true;
    slot.tid = spawn(&downloadThread, tnum, thisTid);
    // at most two queued messages; senders block when the mailbox is full
    slot.tid.setMaxMailboxSize(2, OnCrowding.block);
  }
}
// wait for all busy threads, tell every thread to stop, then wait until they all confirm
void stopThreads () {
  // block until the number of idle threads reaches `threads.length`
  // threads report with their number, which is sent as `usize`; the handler
  // must take exactly that type: on 64-bit targets a `uint` handler never
  // matches such a message, and `receive()` would block forever
  void waitAllIdle (usize idleCount) {
    while (idleCount < threads.length) {
      receive(
        (usize tnum) {
          if (!threads[tnum].idle) {
            threads[tnum].idle = true;
            ++idleCount;
          }
        }
      );
    }
  }

  // let running downloads finish
  usize idleCount = 0;
  foreach (ref trd; threads) if (trd.idle) ++idleCount;
  waitAllIdle(idleCount);

  // send 'stop' signal to all threads
  foreach (ref trd; threads) {
    trd.idle = false;
    trd.tid.send(usize.max); // 'stop' signal
  }

  // wait for completion
  waitAllIdle(0);
  threads = null;
}
418 // ////////////////////////////////////////////////////////////////////////// //
// set by the signal handler; polled by `downloadAll()`
shared bool ctrlC = false;

// signal handler installed by `downloadAll()` for SIGINT: only raises the flag
extern(C) void sigtermh (int snum) nothrow @nogc {
  atomicStore(ctrlC, true);
}
426 // ////////////////////////////////////////////////////////////////////////// //
427 // pass number of threads
428 // fill `urlList` first!
429 // WARNING! DON'T CALL THIS TWICE!
// download everything from `urlList`, using `tcount` threads (1..64)
// duplicate urls and urls for which `url2path` returns an empty path are skipped
// SIGINT stops scheduling; downloads that are already running are waited for
// returns a human-readable summary string
public string downloadAll (uint tcount=4) {
  if (tcount < 1 || tcount > 64) assert(0);
  if (urlList.length == 0) return "nothing to do";

  // build the job list; the old list (if any) is simply dropped, the GC is
  // run right after this (the `delete` operator is deprecated)
  urlinfo = null;
  bool[string] queued; // urls that are already in `urlinfo`
  foreach (string url; urlList) {
    if (url in queued) continue;
    auto path = url2path(url); // ask only once per attempt
    if (path.length) {
      urlinfo ~= UrlInfo(url, path);
      queued[url] = true;
    }
  }

  { import core.memory : GC; GC.collect(); }
  import core.stdc.signal;
  auto oldh = signal(SIGINT, &sigtermh);
  scope(exit) signal(SIGINT, oldh);
  scope(exit) { ttyRawWrite("\x1b[?25h"); }
  // do it!
  //auto oldTTYMode = ttySetRaw();
  //scope(exit) ttySetMode(oldTTYMode);
  threads = new ThreadInfo[](tcount);
  prevThreadCount = 1; // we already have one empty line
  startThreads();
  ulong toCollect = 0;
  auto timer = Timer(Timer.State.Running);
  atomicStore(urlDone, 0);
  while (atomicLoad(urlDone) < urlinfo.length) {
    // force periodical collect to keep CURL happy
    if (toCollect-- == 0) {
      import core.memory : GC;
      GC.collect();
      toCollect = 128;
    }
    if (atomicLoad(ctrlC)) break;
    // find idle thread and send it url index
    usize freeTNum;
    for (freeTNum = 0; freeTNum < threads.length; ++freeTNum) if (threads[freeTNum].idle) break;
    if (freeTNum == threads.length) {
      // no idle thread found, wait for completion message
      import core.time;
      for (;;) {
        // threads send their number as `usize`; the handler must take the
        // same type, or the message is never matched on 64-bit targets
        bool got = receiveTimeout(50.msecs,
          (usize tnum) {
            threads[tnum].idle = true;
            freeTNum = tnum;
          }
        );
        if (got || atomicLoad(ctrlC)) break;
      }
      if (atomicLoad(ctrlC)) break;
    }
    usize uidx = atomicOp!"+="(urlDone, 1);
    --uidx; // 'cause `atomicOp()` returns op result
    with (threads[freeTNum]) {
      idle = false;
      uindex = uidx;
      tid.send(uidx);
    }
  }
  // all downloads scheduled; wait for completion
  stopThreads();
  timer.stop();
  removeInfoLines();
  { ttyRawWrite("\r\x1b[0m\x1b[K\x1b[?25h"); }

  import std.string : format;
  return format("%s files downloaded; time: %s", atomicLoad(urlDone), timer.toString);
}