iv.ssarray: it is now possible to "extract" data pages, and re-insert them
[iv.d.git] / multidown.d
blob1eb62995d9b962cfecdaacff8c646cf10bb5cdf8
1 /* Invisible Vector Library
2 * coded by Ketmar // Invisible Vector <ketmar@ketmar.no-ip.org>
3 * Understanding is not required. Only obedience.
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 // multithreaded massive file downloading engine with fancy progress
19 module iv.multidown /*is aliced*/;
20 private:
22 pragma(lib, "curl");
23 import std.concurrency;
24 import std.net.curl;
26 import iv.alice;
27 import iv.rawtty;
28 import iv.strex;
29 import iv.timer;
32 // ////////////////////////////////////////////////////////////////////////// //
// pbar0: fast changing (current file download, for example)
// pbar1: slow changing (total number of files to download, for example)
//
// Two-counter progress bar drawn on one tty line: the text prefix, a
// right-aligned percentage for counter [0], and colored background spans
// whose widths are proportional to `cur[idx]/total[idx]`.
struct PBar2 {
  immutable usize ttyWdt; // drawing width, clamped to [6..512]

  string text; // prefix text (truncated so the percentage field always fits)
  usize[2] total; // 0 means "total is unknown"
  usize[2] cur;
  usize[2] len; // for tty: number of columns covered by each counter
  usize prc0; // last percentage computed for counter [0]
  bool dirty; // `true` when `draw()` has something new to show

  @disable this ();

  // atext: prefix text
  // cur1, tot1: initial value and total for counter [1] (counter [0] starts at 0 with unknown total)
  this (string atext, usize cur1, usize tot1) {
    import std.algorithm : min, max;
    ttyWdt = max(6, min(ttyWidth, 512));
    text = atext;
    // reserve 5 columns for the percentage field ("100%" plus one spare column);
    // the check and the slice must use the same limit, or the field ends up off-screen
    if (text.length > ttyWdt-5) text = text[0..ttyWdt-5];
    total[0] = 0;
    total[1] = tot1;
    cur[0] = 0;
    cur[1] = cur1+1; // make `cur[1]` differ, so the assignment below is not a no-op
    len[] = 0;
    prc0 = 0;
    dirty = true;
    this[1] = cur1;
  }

  // set new total for counter [0] and recalculate its length/percentage
  void setTotal0 (usize tot0) { setTotalAt(0, tot0); }

  // set new total for counter [1] and recalculate its length
  void setTotal1 (usize tot1) { setTotalAt(1, tot1); }

  // shared implementation of `setTotal0()`/`setTotal1()`
  void setTotalAt (usize idx, usize tot) {
    if (total[idx] != tot) {
      dirty = true;
      total[idx] = tot;
      immutable c0 = cur[idx];
      ++cur[idx]; // make `cur[idx]` differ, so the assignment below is not a no-op
      this[idx] = c0;
    }
  }

  // pbar[idx] = acur: set current value of counter `idx` (clamped to its total)
  void opIndexAssign (usize acur, usize idx) {
    if (acur > total[idx]) acur = total[idx];
    if (cur[idx] == acur) return; // nothing to do
    cur[idx] = acur;
    if (total[idx] == 0) return; // total is unknown
    if (idx == 0) {
      // percents for first counter; 64-bit math, so `100*cur` cannot overflow on 32-bit targets
      immutable usize newprc = cast(usize)(100UL*cur[idx]/total[idx]);
      if (newprc != prc0) {
        prc0 = newprc;
        dirty = true;
      }
    }
    // len; 64-bit math for the same reason as above
    immutable usize newlen = cast(usize)(cast(ulong)ttyWdt*cur[idx]/total[idx]);
    if (newlen != len[idx]) {
      len[idx] = newlen;
      dirty = true;
    }
  }

  // redraw the current tty line (hides the cursor); does nothing if nothing changed
  void draw () nothrow @nogc {
    import std.algorithm : min;
    if (!dirty) return;
    dirty = false;
    char[1024] buf = ' ';
    buf[0..text.length] = text[];
    usize bufpos = text.length;
    // pad percents (right-aligned in 3 columns)
    usize prc = prc0;
    foreach_reverse (immutable idx; 0..3) {
      buf[bufpos+idx] = '0'+prc%10;
      if ((prc /= 10) == 0) break;
    }
    buf[bufpos+3] = '%';
    const wrt = buf[0..ttyWdt];
    // first write the part covered by both [0] and [1] progress
    usize cpos = min(len[0], len[1]);
    // no cursor
    ttyRawWrite("\x1b[?25l");
    // green
    ttyRawWrite("\r\x1b[0;1;42m");
    ttyRawWrite(wrt[0..cpos]);
    if (cpos < len[0]) {
      // haz more [0]
      // magenta
      ttyRawWrite("\x1b[1;45m");
      ttyRawWrite(wrt[cpos..len[0]]);
      cpos = len[0];
    } else if (cpos < len[1]) {
      // haz more [1]
      // brown
      ttyRawWrite("\x1b[1;43m");
      ttyRawWrite(wrt[cpos..len[1]]);
      cpos = len[1];
    }
    // what is left is emptiness
    ttyRawWrite("\x1b[0m");
    ttyRawWrite(wrt[cpos..$]);
  }
}
146 // ////////////////////////////////////////////////////////////////////////// //
// Bookkeeping for the per-thread status lines at the bottom of the terminal.
__gshared usize curInfoLine = usize.max; // line the cursor is on (0: bottom; 1: one above it; etc.); `usize.max`: not positioned yet
__gshared usize prevThreadCount; // number of status lines already on screen
__gshared usize threadCount; // number of spawned download threads

// Put the tty cursor on the status line of thread `tnum`
// (thread 0 owns the bottom line, thread 1 the one above it, and so on).
// Missing status lines are appended as "IDLE" lines first.
// WARNING! CALL MUST BE SYNCHRONIZED
void cursorToInfoLine (usize tnum) {
  // very first call: treat the current line as the bottom one
  if (curInfoLine == usize.max) {
    if (threadCount == 0) assert(0); // the thing that should not be
    curInfoLine = 0;
  }
  // go down to the bottom line
  if (curInfoLine != 0) {
    ttyRawWrite("\x1b[");
    ttyRawWriteInt(curInfoLine);
    ttyRawWrite("B");
  }
  // grow the status area until there is one line per thread
  for (; prevThreadCount < threadCount; ++prevThreadCount) {
    ttyRawWrite("\r\x1b[0;1;33mIDLE\x1b[0m\x1b[K\n");
  }
  // go up to the requested line
  if (tnum != 0) {
    ttyRawWrite("\x1b[");
    ttyRawWriteInt(tnum);
    ttyRawWrite("A");
  }
  curInfoLine = tnum;
}
// Erase all per-thread status lines (bottom line included), leave the cursor
// at the start of the bottom line, and show the cursor again.
// Afterwards the bookkeeping (`threadCount`, `curInfoLine`) is reset, so the
// status area can be rebuilt from scratch later.
// Call only when no download thread can draw anymore.
void removeInfoLines () {
  if (curInfoLine == usize.max) return; // status lines were never drawn
  // move cursor to bottom
  if (curInfoLine) { ttyRawWrite("\x1b["); ttyRawWriteInt(curInfoLine); ttyRawWrite("B"); }
  // erase info lines above the bottom one, moving up; a separate counter
  // avoids underflowing `threadCount` when it is 0
  for (usize left = threadCount; left > 1; --left) ttyRawWrite("\r\x1b[0m\x1b[K\x1b[A");
  // erase the last (topmost) line and show the cursor
  ttyRawWrite("\r\x1b[0m\x1b[K\x1b[?25h");
  threadCount = 0;
  curInfoLine = usize.max;
}
184 // ////////////////////////////////////////////////////////////////////////// //
185 import core.atomic;
// user agent string sent with every request
public __gshared string mdUserAgent = "Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; Trident/6.0)";

// fill this with urls to download (before calling `downloadAll()`)
public __gshared string[] urlList;

// maps an url to its disk path; an empty result means "skip this url"
public __gshared string delegate (string url) url2path;

// called after a download is complete (may be `null`);
// the call is made inside a `synchronized` statement
public __gshared void delegate (string url) urldone;

// number of urls already handed out to download threads; always accessed atomically
shared usize urlDone;

// one unique url to download, together with its disk path
struct UrlInfo {
  string url;
  string diskpath;
}

// deduplicated work list built by `downloadAll()` from `urlList`
private __gshared UrlInfo[] urlinfo;
// Body of a download worker thread.
//   tnum: index of this worker; also the status line it draws on (see `cursorToInfoLine()`)
//   ownerTid: thread that hands out jobs
// Protocol: the worker waits for a `usize` message. An index inside `urlinfo`
// means "download that entry"; any other value (e.g. `usize.max`) means "stop".
// After handling every message the worker sends `tnum` (as `usize`) back to the owner.
void downloadThread (usize tnum, Tid ownerTid) {
  // abort the whole program (via `ExitError` when the runtime provides it)
  static void fatalExit () {
    static if (is(typeof(() { import core.exception : ExitError; }()))) {
      import core.exception : ExitError;
      throw new ExitError();
    } else {
      assert(0);
    }
  }

  bool done = false;
  while (!done) {
    // update status
    synchronized {
      cursorToInfoLine(tnum);
      ttyRawWrite("\r\x1b[0;1;33mIDLE\x1b[0m\x1b[K");
    }

    UrlInfo uinfo;
    string url;
    usize utotal;
    receive(
      // usize: url index to download
      (usize unum) {
        synchronized {
          if (unum >= urlinfo.length) {
            // url index too big? done with it all
            done = true;
            cursorToInfoLine(tnum);
            ttyRawWrite("\r\x1b[0;1;31mDONE\x1b[0m\x1b[K");
          } else {
            uinfo = urlinfo[unum];
            url = uinfo.url;
            utotal = urlinfo.length;
          }
        }
      }
    );

    // download file
    if (!done) {
      import std.exception : collectException;
      import std.file : mkdirRecurse;
      import std.path : baseName, dirName;
      string upath = uinfo.diskpath;
      if (upath.length == 0) fatalExit();
      immutable string ddir = upath.dirName;
      immutable string dname = upath.baseName;
      while (!done) {
        try {
          // one iteration per url; a redirect restarts with the new url
          redirect: for (;;) {
            import std.conv : to;
            // redirect target, set by the header callback; must start empty on every attempt
            string location = null;
            // status prefix: "tnum: [scheduled/total] path ... " (rebuilt, not appended, on every attempt)
            string line = to!string(tnum)~": [";
            auto cs = to!string(atomicLoad(urlDone));
            auto ts = to!string(utotal);
            foreach (immutable _; cs.length..ts.length) line ~= ' ';
            line ~= cs~"/"~ts~"] "~upath~" ... ";

            auto pbar = PBar2(line, atomicLoad(urlDone), utotal);
            bool wasProgress = false;
            bool showProgress = true;
            auto conn = HTTP();
            conn.maxRedirects = 64;
            conn.setUserAgent(mdUserAgent);
            conn.onReceiveHeader = (in char[] key, in char[] value) {
              if (wasProgress) {
                wasProgress = false;
                showProgress = true;
              }
              if (strEquCI(key, "Location") && value.length) {
                showProgress = false;
                location = value.idup;
                // abort this transfer; the retry loop below restarts with `location`
                if (location.length > 0) throw new Exception("boo!");
              }
            };
            conn.onProgress = (scope usize dlTotal, scope usize dlNow, scope usize ulTotal, scope usize ulNow) {
              wasProgress = true;
              if (showProgress) {
                if (dlTotal > 0) {
                  pbar.setTotal0(dlTotal);
                  pbar[0] = dlNow;
                }
                synchronized {
                  pbar[1] = atomicLoad(urlDone);
                  cursorToInfoLine(tnum);
                  pbar.draw();
                }
              }
              return 0;
            };
            collectException(mkdirRecurse(ddir));
            string fname = ddir~"/"~dname;
            int retries = 8;
            for (;;) {
              bool ok = false;
              try {
                download(url, fname, conn);
                ok = true;
              } catch (Exception e) {
                ok = false;
              }
              if (location.length) {
                url = location;
                continue redirect;
              }
              if (ok) break;
              if (--retries <= 0) {
                ttyRawWrite("\n\n\n\n\x1b[0mFUCK!\n");
                fatalExit();
              }
            }
            break; // downloaded
          }
          // NOTE(review): `url` is the final (possibly redirected) url, not `uinfo.url` -- confirm callers expect this
          if (urldone !is null) {
            synchronized urldone(url);
          }
          done = true;
        } catch (Exception e) {
          // unexpected failure outside the download itself: try this url again
        }
      }
      done = false; // this job is finished; wait for the next one
    }
    // signal parent that we are idle
    ownerTid.send(tnum);
  }
}
365 // ////////////////////////////////////////////////////////////////////////// //
// Main-thread view of one download worker.
struct ThreadInfo {
  Tid tid; // worker thread id
  bool idle; // `true` while the worker has no job assigned
  usize uindex; // `urlinfo` index of the last job sent to this worker
}

// one entry per worker, indexed by worker number; used only by the thread that calls `downloadAll()`
ThreadInfo[] threads;
// Spawn one download worker per `threads` entry; every worker starts out idle.
// The entry index is passed to the worker as its thread number.
void startThreads () {
  foreach (immutable usize tnum, ref ThreadInfo ti; threads) {
    synchronized ++threadCount;
    ti.idle = true;
    ti.tid = spawn(&downloadThread, tnum, thisTid);
    ti.tid.setMaxMailboxSize(2, OnCrowding.block);
  }
}
// Wait until every busy worker reports idle, send all workers the 'stop'
// signal (`usize.max`), wait until each of them confirms, then drop `threads`.
// Must run on the thread that spawned the workers: they send their replies to it.
void stopThreads () {
  // receive worker notifications until every `threads` entry is idle
  void waitAllIdle () {
    usize idleCount = 0;
    foreach (ref trd; threads) if (trd.idle) ++idleCount;
    while (idleCount < threads.length) {
      receive(
        // workers send their index as `usize`; a `uint` handler never matches
        // a `ulong` message, so it would block forever on 64-bit targets
        (usize tnum) {
          if (!threads[tnum].idle) {
            threads[tnum].idle = true;
            ++idleCount;
          }
        }
      );
    }
  }

  waitAllIdle();
  // send 'stop' signal to all threads
  foreach (ref trd; threads) {
    trd.idle = false;
    trd.tid.send(usize.max); // 'stop' signal
  }
  // wait for completion
  waitAllIdle();
  threads = null;
}
419 // ////////////////////////////////////////////////////////////////////////// //
// raised by the SIGINT handler, polled by the scheduling loop
shared bool ctrlC = false;

// SIGINT handler: its only job is to raise the `ctrlC` flag
extern(C) void sigtermh (int snum) nothrow @nogc {
  atomicStore!(MemoryOrder.seq)(ctrlC, true);
}
427 // ////////////////////////////////////////////////////////////////////////// //
// Download every url from `urlList` using `tcount` worker threads (1..64).
// Fill `urlList` and set `url2path` first! Each distinct url is handled once;
// urls for which `url2path()` returns an empty path are skipped.
// Ctrl+C (SIGINT) stops scheduling new downloads; running ones are finished.
// Returns a short human-readable summary.
// WARNING! DON'T CALL THIS TWICE!
public string downloadAll (uint tcount=4) {
  if (tcount < 1 || tcount > 64) assert(0);
  if (urlList.length == 0) return "nothing to do";
  if (url2path is null) assert(0, "set `url2path` before calling `downloadAll()`");
  // build the list of distinct urls with their disk paths
  // (`delete` is gone from modern D; dropping the reference lets the GC reclaim the old list)
  urlinfo = null;
  {
    bool[string] seen;
    foreach (string url; urlList) {
      if (url in seen) continue;
      seen[url] = true;
      auto path = url2path(url); // ask only once per url
      if (path.length) urlinfo ~= UrlInfo(url, path);
    }
  }
  { import core.memory : GC; GC.collect(); }
  import core.stdc.signal;
  auto oldh = signal(SIGINT, &sigtermh);
  scope(exit) signal(SIGINT, oldh);
  scope(exit) { ttyRawWrite("\x1b[?25h"); }
  // do it!
  threads = new ThreadInfo[](tcount);
  prevThreadCount = 1; // we already have one empty line
  startThreads();
  ulong toCollect = 0;
  auto timer = Timer(Timer.State.Running);
  atomicStore(urlDone, 0);
  while (atomicLoad(urlDone) < urlinfo.length) {
    // force periodical collect to keep CURL happy
    if (toCollect-- == 0) {
      import core.memory : GC;
      GC.collect();
      toCollect = 128;
    }
    if (atomicLoad(ctrlC)) break;
    // find idle thread and send it url index
    usize freeTNum;
    for (freeTNum = 0; freeTNum < threads.length; ++freeTNum) if (threads[freeTNum].idle) break;
    if (freeTNum == threads.length) {
      // no idle thread found, wait for completion message
      import core.time;
      for (;;) {
        bool got = receiveTimeout(50.msecs,
          // workers send their index as `usize`; a `uint` handler never matches
          // a `ulong` message, so this would spin forever on 64-bit targets
          (usize tnum) {
            threads[tnum].idle = true;
            freeTNum = tnum;
          }
        );
        if (got || atomicLoad(ctrlC)) break;
      }
      if (atomicLoad(ctrlC)) break;
    }
    usize uidx = atomicOp!"+="(urlDone, 1);
    --uidx; // 'cause `atomicOp()` returns op result
    with (threads[freeTNum]) {
      idle = false;
      uindex = uidx;
      tid.send(uidx);
    }
  }
  // all downloads scheduled; wait for completion
  stopThreads();
  timer.stop();
  removeInfoLines();
  { ttyRawWrite("\r\x1b[0m\x1b[K\x1b[?25h"); }
  import std.string : format;
  return format("%s files downloaded; time: %s", atomicLoad(urlDone), timer.toString);
}