1 /* Invisible Vector Library
2 * coded by Ketmar // Invisible Vector <ketmar@ketmar.no-ip.org>
3 * Understanding is not required. Only obedience.
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, version 3 of the License ONLY.
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 // multithreaded massive file downloading engine with fancy progress
18 module iv
.multidown
/*is aliced*/;
22 import std
.concurrency
;
31 // ////////////////////////////////////////////////////////////////////////// //
32 // pbar0: fast changing (current file download, for example)
33 // pbar1: slow changing (total number of files to download, for example)
35 immutable usize ttyWdt
;
40 usize
[2] len
; // for tty
45 this (string atext
, usize cur1
, usize tot1
) {
46 import std
.algorithm
: min
, max
;
47 ttyWdt
= max(6, min(ttyWidth
, 512));
49 if (text
.length
> ttyWdt
+5) text
= text
[0..ttyWdt
-5];
60 void setTotal0 (usize tot0
) {
61 if (total
[0] != tot0
) {
64 immutable c0
= cur
[0];
70 void setTotal1 (usize tot1
) {
71 if (total
[1] != tot1
) {
74 immutable c0
= cur
[1];
80 void opIndexAssign (usize acur
, usize idx
) {
81 if (acur
> total
[idx
]) acur
= total
[idx
];
82 if (cur
[idx
] == acur
) return; // nothing to do
84 if (total
[idx
] == 0) return; // total is unknown
86 // percents for first counter
87 usize newprc
= 100*cur
[idx
]/total
[idx
];
94 usize newlen
= ttyWdt
*cur
[idx
]/total
[idx
];
95 if (newlen
!= len
[idx
]) {
101 void draw () nothrow @nogc {
102 import std
.algorithm
: min
;
105 char[1024] buf
= ' ';
106 buf
[0..text
.length
] = text
[];
107 usize bufpos
= text
.length
;
110 foreach_reverse (immutable idx
; 0..3) {
111 buf
[bufpos
+idx
] = '0'+prc
%10;
112 if ((prc
/= 10) == 0) break;
115 const wrt
= buf
[0..ttyWdt
];
116 // first write [0] and [1] progress
117 usize cpos
= min(len
[0], len
[1]);
119 ttyRawWrite("\x1b[?25l");
121 ttyRawWrite("\r\x1b[0;1;42m");
122 ttyRawWrite(wrt
[0..cpos
]);
126 ttyRawWrite("\x1b[1;45m");
127 ttyRawWrite(wrt
[cpos
..len
[0]]);
129 } else if (cpos
< len
[1]) {
132 ttyRawWrite("\x1b[1;43m");
133 ttyRawWrite(wrt
[cpos
..len
[1]]);
136 // what is left is emptiness
137 ttyRawWrite("\x1b[0m");
138 ttyRawWrite(wrt
[cpos
..$]);
140 //write("\x1b[K\r\x1b[", text.length+4, "C");
145 // ////////////////////////////////////////////////////////////////////////// //
146 // move cursor to tnum's thread info line
147 __gshared usize threadCount
; // # of running threads
148 __gshared usize prevThreadCount
; // # of running threads on previous call
149 __gshared usize curInfoLine
= usize
.max
; // 0: bottom; 1: one before bottom; etc.
152 // WARNING! CALL MUST BE SYNCHRONIZED
153 void cursorToInfoLine (usize tnum
) {
154 if (curInfoLine
== usize
.max
) {
155 if (threadCount
== 0) assert(0); // the thing that should not be
158 // move cursor to bottom
159 if (curInfoLine
) { ttyRawWrite("\x1b["); ttyRawWriteInt(curInfoLine
); ttyRawWrite("B"); }
160 // add status lines if necessary
161 while (prevThreadCount
< threadCount
) {
163 ttyRawWrite("\r\x1b[0;1;33mIDLE\x1b[0m\x1b[K\n");
166 // move cursor to required line from bottom
167 if (tnum
> 0) { ttyRawWrite("\x1b["); ttyRawWriteInt(tnum
); ttyRawWrite("A"); }
172 void removeInfoLines () {
173 if (curInfoLine
!= usize
.max
) {
174 // move cursor to bottom
175 if (curInfoLine
) { ttyRawWrite("\x1b["); ttyRawWriteInt(curInfoLine
); ttyRawWrite("B"); }
177 while (threadCount
-- > 1) ttyRawWrite("\r\x1b[0m\x1b[K\x1b[A");
178 ttyRawWrite("\r\x1b[0m\x1b[K\x1b[?25h");
183 // ////////////////////////////////////////////////////////////////////////// //
186 public __gshared string mdUserAgent
= "Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; Trident/6.0)";
188 // fill this with urls to download
189 public __gshared string
[] urlList
; // the following is protected by `synchronized`
190 shared usize urlDone
;
192 // this will be called to get path from url
193 public __gshared string
delegate (string url
) url2path
;
195 // this will be called when download is complete
196 // call is synchronized
198 public __gshared
void delegate (string url
) urldone
;
201 struct UrlInfo
{ string url
, diskpath
; }
202 private __gshared UrlInfo
[] urlinfo
;
205 void downloadThread (usize tnum
, Tid ownerTid
) {
211 cursorToInfoLine(tnum);
212 ttyRawWrite("\r\x1b[0;1;33mIDLE\x1b[0m\x1b[K");
219 // usize: url index to download
222 if (unum
>= urlinfo
.length
) {
223 // url index too big? done with it all
225 cursorToInfoLine(tnum
);
226 ttyRawWrite("\r\x1b[0;1;31mDONE\x1b[0m\x1b[K");
228 uinfo
= urlinfo
[unum
];
230 utotal
= urlinfo
.length
;
237 import std
.exception
: collectException
;
238 import std
.file
: mkdirRecurse
;
239 import std
.path
: baseName
, dirName
;
240 string line
, upath
, ddir
, dname
;
241 //upath = url2path(url);
242 upath
= uinfo
.diskpath
;
243 if (upath
.length
== 0) {
244 static if (is(typeof(() { import core
.exception
: ExitError
; }()))) {
245 import core
.exception
: ExitError
;
246 throw new ExitError();
251 ddir
= upath
.dirName
;
252 dname
= upath
.baseName
;
257 import std
.conv
: to
;
258 line
~= to
!string(tnum
)~": [";
259 auto cs
= to
!string(atomicLoad(urlDone
));
260 auto ts
= to
!string(utotal
);
261 foreach (immutable _
; cs
.length
..ts
.length
) line
~= ' ';
262 line
~= cs
~"/"~ts
~"] "~upath
~" ... ";
264 auto pbar
= PBar2(line
, atomicLoad(urlDone
), utotal
);
266 //ttyRawWrite("\r", line, " 0%");
268 //string location = null;
269 //bool hdrChecked = false;
270 //bool hasLocation = false;
271 bool wasProgress
= false;
272 bool showProgress
= true;
273 int oldPrc
= -1, oldPos
= -1;
277 auto fo
= File("/tmp/zzz", "a");
278 fo
.writeln("====================================================");
281 conn
.maxRedirects
= 64;
282 //conn.setUserAgent("Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; Trident/6.0)");
283 conn
.setUserAgent(mdUserAgent
);
284 conn
.onReceiveHeader
= (in char[] key
, in char[] value
) {
291 auto fo
= File("/tmp/zzz", "a");
292 fo
.writeln(key
, "\t", value
);
294 if (strEquCI(key
, "Location") && value
.length
) {
295 //hasLocation = true;
296 showProgress
= false;
298 location = value.idup;
299 if (location.length > 0) {
301 throw new Exception("boo!");
306 conn
.onProgress
= (scope usize dlTotal
, scope usize dlNow
, scope usize ulTotal
, scope usize ulNow
) {
310 pbar
.setTotal0(dlTotal
);
314 pbar
[1] = atomicLoad(urlDone
);
315 cursorToInfoLine(tnum
);
321 collectException(mkdirRecurse(ddir
));
322 string fname
= ddir
~"/"~dname
;
327 download(url
, fname
, conn
);
329 } catch (Exception e
) {
333 if (location.length) {
339 if (--retries
<= 0) {
340 ttyRawWrite("\n\n\n\n\x1b[0mFUCK!\n");
341 static if (is(typeof(() { import core
.exception
: ExitError
; }()))) {
342 import core
.exception
: ExitError
;
343 throw new ExitError();
349 if (urldone
!is null) {
350 synchronized urldone(url
);
353 } catch (Exception e
) {
358 // signal parent that we are idle
364 // ////////////////////////////////////////////////////////////////////////// //
371 ThreadInfo
[] threads
;
374 void startThreads () {
375 foreach (immutable usize idx
; 0..threads
.length
) {
376 synchronized ++threadCount
;
377 threads
[idx
].idle
= true;
378 threads
[idx
].tid
= spawn(&downloadThread
, idx
, thisTid
);
379 threads
[idx
].tid
.setMaxMailboxSize(2, OnCrowding
.block
);
384 void stopThreads () {
386 foreach (ref trd
; threads
) if (trd
.idle
) ++idleCount
;
387 while (idleCount
< threads
.length
) {
390 if (!threads
[tnum
].idle
) {
391 threads
[tnum
].idle
= true;
397 // send 'stop' signal to all threads
398 foreach (ref trd
; threads
) {
400 trd
.tid
.send(usize
.max
); // 'stop' signal
402 // wait for completion
404 while (idleCount
< threads
.length
) {
407 if (!threads
[tnum
].idle
) {
408 threads
[tnum
].idle
= true;
418 // ////////////////////////////////////////////////////////////////////////// //
419 shared bool ctrlC
= false;
421 extern(C
) void sigtermh (int snum
) nothrow @nogc {
422 atomicStore(ctrlC
, true);
426 // ////////////////////////////////////////////////////////////////////////// //
427 // pass number of threads
428 // fill `urlList` first!
429 // WARNING! DON'T CALL THIS TWICE!
430 public string
downloadAll (uint tcount
=4) {
431 if (tcount
< 1 || tcount
> 64) assert(0);
432 if (urlList
.length
== 0) return "nothing to do";
436 foreach (string url
; urlList
) {
438 foreach (const ref ui
; urlinfo
) if (url
== ui
.url
) { found
= true; break; }
440 auto path
= url2path(url
);
441 if (path
.length
) urlinfo
~= UrlInfo(url
, url2path(url
));
445 { import core
.memory
: GC
; GC
.collect(); }
446 import core
.stdc
.signal
;
447 auto oldh
= signal(SIGINT
, &sigtermh
);
448 scope(exit
) signal(SIGINT
, oldh
);
449 scope(exit
) { ttyRawWrite("\x1b[?25h"); }
451 //auto oldTTYMode = ttySetRaw();
452 //scope(exit) ttySetMode(oldTTYMode);
453 threads
= new ThreadInfo
[](tcount
);
454 prevThreadCount
= 1; // we already has one empty line
457 auto timer
= Timer(Timer
.State
.Running
);
458 atomicStore(urlDone
, 0);
459 while (atomicLoad(urlDone
) < urlinfo
.length
) {
460 // force periodical collect to keep CURL happy
461 if (toCollect
-- == 0) {
462 import core
.memory
: GC
;
466 if (atomicLoad(ctrlC
)) break;
467 // find idle thread and send it url index
469 for (freeTNum
= 0; freeTNum
< threads
.length
; ++freeTNum
) if (threads
[freeTNum
].idle
) break;
470 if (freeTNum
== threads
.length
) {
471 // no idle thread found, wait for completion message
474 bool got
= receiveTimeout(50.msecs
,
476 threads
[tnum
].idle
= true;
480 if (got ||
atomicLoad(ctrlC
)) break;
482 if (atomicLoad(ctrlC
)) break;
484 usize uidx
= atomicOp
!"+="(urlDone
, 1);
485 --uidx
; // 'cause `atomicOp()` returns op result
486 with (threads
[freeTNum
]) {
492 // all downloads sheduled; wait for completion
496 { ttyRawWrite("\r\x1b[0m\x1b[K\x1b[?25h"); }
498 import std
.string
: format
;
499 return format("%s files downloaded; time: %s", atomicLoad(urlDone
), timer
.toString
);