1 /* Invisible Vector Library
2 * coded by Ketmar // Invisible Vector <ketmar@ketmar.no-ip.org>
3 * Understanding is not required. Only obedience.
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 // multithreaded massive file downloading engine with fancy progress
19 module iv
.multidown
/*is aliced*/;
23 import std
.concurrency
;
32 // ////////////////////////////////////////////////////////////////////////// //
33 // pbar0: fast changing (current file download, for example)
34 // pbar1: slow changing (total number of files to download, for example)
36 immutable usize ttyWdt
;
41 usize
[2] len
; // for tty
46 this (string atext
, usize cur1
, usize tot1
) {
47 import std
.algorithm
: min
, max
;
48 ttyWdt
= max(6, min(ttyWidth
, 512));
50 if (text
.length
> ttyWdt
+5) text
= text
[0..ttyWdt
-5];
61 void setTotal0 (usize tot0
) {
62 if (total
[0] != tot0
) {
65 immutable c0
= cur
[0];
71 void setTotal1 (usize tot1
) {
72 if (total
[1] != tot1
) {
75 immutable c0
= cur
[1];
81 void opIndexAssign (usize acur
, usize idx
) {
82 if (acur
> total
[idx
]) acur
= total
[idx
];
83 if (cur
[idx
] == acur
) return; // nothing to do
85 if (total
[idx
] == 0) return; // total is unknown
87 // percents for first counter
88 usize newprc
= 100*cur
[idx
]/total
[idx
];
95 usize newlen
= ttyWdt
*cur
[idx
]/total
[idx
];
96 if (newlen
!= len
[idx
]) {
102 void draw () nothrow @nogc {
103 import std
.algorithm
: min
;
106 char[1024] buf
= ' ';
107 buf
[0..text
.length
] = text
[];
108 usize bufpos
= text
.length
;
111 foreach_reverse (immutable idx
; 0..3) {
112 buf
[bufpos
+idx
] = '0'+prc
%10;
113 if ((prc
/= 10) == 0) break;
116 const wrt
= buf
[0..ttyWdt
];
117 // first write [0] and [1] progress
118 usize cpos
= min(len
[0], len
[1]);
120 ttyRawWrite("\x1b[?25l");
122 ttyRawWrite("\r\x1b[0;1;42m");
123 ttyRawWrite(wrt
[0..cpos
]);
127 ttyRawWrite("\x1b[1;45m");
128 ttyRawWrite(wrt
[cpos
..len
[0]]);
130 } else if (cpos
< len
[1]) {
133 ttyRawWrite("\x1b[1;43m");
134 ttyRawWrite(wrt
[cpos
..len
[1]]);
137 // what is left is emptiness
138 ttyRawWrite("\x1b[0m");
139 ttyRawWrite(wrt
[cpos
..$]);
141 //write("\x1b[K\r\x1b[", text.length+4, "C");
146 // ////////////////////////////////////////////////////////////////////////// //
147 // move cursor to tnum's thread info line
// Global bookkeeping for the per-thread status lines drawn at the bottom of the tty.
// All three are __gshared (one instance for the whole process, no implicit locking);
// cursorToInfoLine, which reads and updates them, is marked "CALL MUST BE SYNCHRONIZED".
148 __gshared usize threadCount
; // number of running threads; incremented in startThreads, counted down by the loop in removeInfoLines
149 __gshared usize prevThreadCount
; // number of running threads on previous call; cursorToInfoLine prints extra status lines while this is < threadCount
150 __gshared usize curInfoLine
= usize
.max
; // 0: bottom; 1: one before bottom; etc. usize.max is special-cased by cursorToInfoLine and removeInfoLines (presumably "cursor position not established yet" -- confirm)
153 // WARNING! CALL MUST BE SYNCHRONIZED
// Positions the tty cursor on the status line that belongs to worker `tnum`.
// Visible steps: (1) special-case curInfoLine == usize.max (asserts that at least one
// thread exists); (2) move the cursor down by curInfoLine rows ("\x1b[<n>B") to reach the
// bottom line; (3) print an "IDLE" line for every thread that has no status line yet;
// (4) move the cursor up by tnum rows ("\x1b[<n>A").
// Params: tnum = worker index; 0 addresses the bottom line.
// NOTE(review): some statements of this function are not visible in this view (e.g. the
// updates of prevThreadCount / curInfoLine) -- confirm against the full source.
154 void cursorToInfoLine (usize tnum
) {
155 if (curInfoLine
== usize
.max
) {
156 if (threadCount
== 0) assert(0); // the thing that should not be
159 // move cursor to bottom
// "\x1b[<n>B" is the ANSI "cursor down n rows" sequence; skipped when already at the bottom (curInfoLine == 0)
160 if (curInfoLine
) { ttyRawWrite("\x1b["); ttyRawWriteInt(curInfoLine
); ttyRawWrite("B"); }
161 // add status lines if necessary
162 while (prevThreadCount
< threadCount
) {
// yellow "IDLE", reset attributes, erase to end of line, then newline to create the next row
164 ttyRawWrite("\r\x1b[0;1;33mIDLE\x1b[0m\x1b[K\n");
167 // move cursor to required line from bottom
// "\x1b[<n>A" is the ANSI "cursor up n rows" sequence
168 if (tnum
> 0) { ttyRawWrite("\x1b["); ttyRawWriteInt(tnum
); ttyRawWrite("A"); }
// Erases the per-thread status lines and makes the cursor visible again.
// Does nothing visible here when curInfoLine == usize.max.
// Otherwise: moves the cursor to the bottom line, clears one line and steps up one row
// for each thread beyond the first, then clears the remaining line and re-enables the
// cursor ("\x1b[?25h").
// NOTE(review): the loop condition post-decrements the global threadCount, so this call
// mutates it; because threadCount is an unsigned usize, entering with threadCount == 0
// would wrap it to usize.max -- confirm callers never do that.
173 void removeInfoLines () {
174 if (curInfoLine
!= usize
.max
) {
175 // move cursor to bottom
176 if (curInfoLine
) { ttyRawWrite("\x1b["); ttyRawWriteInt(curInfoLine
); ttyRawWrite("B"); }
// per extra line: carriage return, reset attributes, erase line, cursor up one row
178 while (threadCount
-- > 1) ttyRawWrite("\r\x1b[0m\x1b[K\x1b[A");
// last line: carriage return, reset attributes, erase line, show cursor
179 ttyRawWrite("\r\x1b[0m\x1b[K\x1b[?25h");
184 // ////////////////////////////////////////////////////////////////////////// //
// User-Agent string passed to conn.setUserAgent() in downloadThread.
// Public and mutable (__gshared), so it can be reassigned by the caller.
187 public __gshared string mdUserAgent
= "Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; Trident/6.0)";
189 // fill this with URLs to download before calling downloadAll (it returns "nothing to do" when the list is empty)
190 public __gshared string
[] urlList
; // NOTE(review): the original comment here read "the following is protected by `synchronized`", but the visible code touches urlDone only through atomicLoad/atomicStore/atomicOp -- confirm which is intended
// urlDone: reset to 0 by downloadAll, then bumped with atomicOp!"+=" each time a URL
// index is handed out; also read by downloadThread for the "[done/total]" progress text.
191 shared usize urlDone
;
193 // this will be called to get the disk path for a URL
// downloadAll calls it for URLs from urlList; a URL for which it returns an empty
// string is not added to the download queue (urlinfo).
// NOTE(review): no null check is visible before the call in downloadAll -- presumably
// the caller must assign this delegate first; verify.
194 public __gshared string
delegate (string url
) url2path
;
196 // this will be called when a download is complete
197 // the call is made inside a `synchronized` statement
// Optional: downloadThread invokes it only when it is not null.
199 public __gshared
void delegate (string url
) urldone
;
// One queued download: the source URL and the disk path it is saved to
// (the path is the value returned by url2path, see downloadAll).
202 struct UrlInfo
{ string url
, diskpath
; }
// Download queue built by downloadAll from urlList; downloadThread looks entries up
// by the index it receives and treats an index >= urlinfo.length as "done with it all".
203 private __gshared UrlInfo
[] urlinfo
;
206 void downloadThread (usize tnum
, Tid ownerTid
) {
212 cursorToInfoLine(tnum);
213 ttyRawWrite("\r\x1b[0;1;33mIDLE\x1b[0m\x1b[K");
220 // usize: url index to download
223 if (unum
>= urlinfo
.length
) {
224 // url index too big? done with it all
226 cursorToInfoLine(tnum
);
227 ttyRawWrite("\r\x1b[0;1;31mDONE\x1b[0m\x1b[K");
229 uinfo
= urlinfo
[unum
];
231 utotal
= urlinfo
.length
;
238 import std
.exception
: collectException
;
239 import std
.file
: mkdirRecurse
;
240 import std
.path
: baseName
, dirName
;
241 string line
, upath
, ddir
, dname
;
242 //upath = url2path(url);
243 upath
= uinfo
.diskpath
;
244 if (upath
.length
== 0) {
245 static if (is(typeof(() { import core
.exception
: ExitError
; }()))) {
246 import core
.exception
: ExitError
;
247 throw new ExitError();
252 ddir
= upath
.dirName
;
253 dname
= upath
.baseName
;
258 import std
.conv
: to
;
259 line
~= to
!string(tnum
)~": [";
260 auto cs
= to
!string(atomicLoad(urlDone
));
261 auto ts
= to
!string(utotal
);
262 foreach (immutable _
; cs
.length
..ts
.length
) line
~= ' ';
263 line
~= cs
~"/"~ts
~"] "~upath
~" ... ";
265 auto pbar
= PBar2(line
, atomicLoad(urlDone
), utotal
);
267 //ttyRawWrite("\r", line, " 0%");
269 //string location = null;
270 //bool hdrChecked = false;
271 //bool hasLocation = false;
272 bool wasProgress
= false;
273 bool showProgress
= true;
274 int oldPrc
= -1, oldPos
= -1;
278 auto fo
= File("/tmp/zzz", "a");
279 fo
.writeln("====================================================");
282 conn
.maxRedirects
= 64;
283 //conn.setUserAgent("Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; Trident/6.0)");
284 conn
.setUserAgent(mdUserAgent
);
285 conn
.onReceiveHeader
= (in char[] key
, in char[] value
) {
292 auto fo
= File("/tmp/zzz", "a");
293 fo
.writeln(key
, "\t", value
);
295 if (strEquCI(key
, "Location") && value
.length
) {
296 //hasLocation = true;
297 showProgress
= false;
299 location = value.idup;
300 if (location.length > 0) {
302 throw new Exception("boo!");
307 conn
.onProgress
= (scope usize dlTotal
, scope usize dlNow
, scope usize ulTotal
, scope usize ulNow
) {
311 pbar
.setTotal0(dlTotal
);
315 pbar
[1] = atomicLoad(urlDone
);
316 cursorToInfoLine(tnum
);
322 collectException(mkdirRecurse(ddir
));
323 string fname
= ddir
~"/"~dname
;
328 download(url
, fname
, conn
);
330 } catch (Exception e
) {
334 if (location.length) {
340 if (--retries
<= 0) {
341 ttyRawWrite("\n\n\n\n\x1b[0mFUCK!\n");
342 static if (is(typeof(() { import core
.exception
: ExitError
; }()))) {
343 import core
.exception
: ExitError
;
344 throw new ExitError();
350 if (urldone
!is null) {
351 synchronized urldone(url
);
354 } catch (Exception e
) {
359 // signal parent that we are idle
365 // ////////////////////////////////////////////////////////////////////////// //
// Worker table: allocated by downloadAll with `tcount` entries; startThreads fills in
// each entry's `idle` flag and `tid`. (ThreadInfo itself is declared outside this view.)
372 ThreadInfo
[] threads
;
// Spawns one downloadThread per entry of `threads`.
// For each slot: increments the global threadCount, marks the slot idle, spawns the
// worker with its slot index and this thread's Tid, and limits the worker's mailbox.
375 void startThreads () {
376 foreach (immutable usize idx
; 0..threads
.length
) {
// NOTE(review): an argument-less `synchronized` statement serializes only this
// statement; confirm that is sufficient for the other accesses to threadCount.
377 synchronized ++threadCount
;
378 threads
[idx
].idle
= true;
// the worker receives its own index (used as `tnum`) and the owner's Tid
379 threads
[idx
].tid
= spawn(&downloadThread
, idx
, thisTid
);
// at most 2 pending messages; OnCrowding.block makes a sender wait while the mailbox is full
380 threads
[idx
].tid
.setMaxMailboxSize(2, OnCrowding
.block
);
385 void stopThreads () {
387 foreach (ref trd
; threads
) if (trd
.idle
) ++idleCount
;
388 while (idleCount
< threads
.length
) {
391 if (!threads
[tnum
].idle
) {
392 threads
[tnum
].idle
= true;
398 // send 'stop' signal to all threads
399 foreach (ref trd
; threads
) {
401 trd
.tid
.send(usize
.max
); // 'stop' signal
403 // wait for completion
405 while (idleCount
< threads
.length
) {
408 if (!threads
[tnum
].idle
) {
409 threads
[tnum
].idle
= true;
419 // ////////////////////////////////////////////////////////////////////////// //
// Interrupt flag: set to true by the sigtermh signal handler and polled with
// atomicLoad in downloadAll's scheduling loop, which breaks out once it is set.
420 shared bool ctrlC
= false;
// C-linkage signal handler; downloadAll installs it for SIGINT via signal() and
// restores the previous handler on scope exit.
// Only records the request by atomically setting ctrlC.
// Params: snum = signal number (not used in the visible body).
422 extern(C
) void sigtermh (int snum
) nothrow @nogc {
423 atomicStore(ctrlC
, true);
427 // ////////////////////////////////////////////////////////////////////////// //
428 // pass number of threads
429 // fill `urlList` first!
430 // WARNING! DON'T CALL THIS TWICE!
431 public string
downloadAll (uint tcount
=4) {
432 if (tcount
< 1 || tcount
> 64) assert(0);
433 if (urlList
.length
== 0) return "nothing to do";
437 foreach (string url
; urlList
) {
439 foreach (const ref ui
; urlinfo
) if (url
== ui
.url
) { found
= true; break; }
441 auto path
= url2path(url
);
442 if (path
.length
) urlinfo
~= UrlInfo(url
, url2path(url
));
446 { import core
.memory
: GC
; GC
.collect(); }
447 import core
.stdc
.signal
;
448 auto oldh
= signal(SIGINT
, &sigtermh
);
449 scope(exit
) signal(SIGINT
, oldh
);
450 scope(exit
) { ttyRawWrite("\x1b[?25h"); }
452 //auto oldTTYMode = ttySetRaw();
453 //scope(exit) ttySetMode(oldTTYMode);
454 threads
= new ThreadInfo
[](tcount
);
455 prevThreadCount
= 1; // we already has one empty line
458 auto timer
= Timer(Timer
.State
.Running
);
459 atomicStore(urlDone
, 0);
460 while (atomicLoad(urlDone
) < urlinfo
.length
) {
461 // force a periodic GC collection to keep CURL happy
462 if (toCollect
-- == 0) {
463 import core
.memory
: GC
;
467 if (atomicLoad(ctrlC
)) break;
468 // find idle thread and send it url index
470 for (freeTNum
= 0; freeTNum
< threads
.length
; ++freeTNum
) if (threads
[freeTNum
].idle
) break;
471 if (freeTNum
== threads
.length
) {
472 // no idle thread found, wait for completion message
475 bool got
= receiveTimeout(50.msecs
,
477 threads
[tnum
].idle
= true;
481 if (got ||
atomicLoad(ctrlC
)) break;
483 if (atomicLoad(ctrlC
)) break;
485 usize uidx
= atomicOp
!"+="(urlDone
, 1);
486 --uidx
; // 'cause `atomicOp()` returns op result
487 with (threads
[freeTNum
]) {
493 // all downloads scheduled; wait for completion
497 { ttyRawWrite("\r\x1b[0m\x1b[K\x1b[?25h"); }
499 import std
.string
: format
;
500 return format("%s files downloaded; time: %s", atomicLoad(urlDone
), timer
.toString
);