Quick binary expression handling for “test_token_buffering”
[vadmium-streams.git] / datacopy.py
blob103ca854f68b8123596f87fbc99dc487cd2c1d0a
1 from sys import stderr, argv
2 import os#, os.path
3 from io import UnsupportedOperation
4 from warnings import warn
5 import time
6 from io import SEEK_END
7 from contextlib import ExitStack
8 from math import floor, ceil, log10, exp
10 import csv
11 from time import strftime
def main(input, output=''):
    """Append the not-yet-copied tail of file ``input`` to file ``output``.

    The destination is opened in append mode; its current length is used
    as the offset at which reading of the source resumes.  Progress is
    reported through a ``Progress`` meter writing to a ``TerminalLog``.

    Raises ``EOFError`` if the source yields no data before the size
    recorded at start-up has been reached.
    """
    #~ output = os.path.join(output, os.path.basename(input))
    chunk_size = 0x10000
    with ExitStack() as cleanup:
        # Unbuffered source: each read() maps onto one raw read.
        source = cleanup.enter_context(open(input, 'rb', 0))
        size = os.fstat(source.fileno()).st_size

        dest = cleanup.enter_context(open(output, 'ab'))
        copied = dest.seek(0, SEEK_END)
        source.seek(copied)

        log = TerminalLog()
        # Registered before the meter is created so the status line is
        # wiped on every exit path, including exceptions.
        cleanup.callback(Progress.close, log)
        progress = Progress(log, size, copied)

        while copied < size:
            chunk = source.read(chunk_size)
            if chunk:
                dest.write(chunk)
                copied += len(chunk)
                progress.update(copied)
            else:
                raise EOFError('Unexpected end of file')
class TerminalLog:
    """Write status text to ``stderr``, using terminal controls if available.

    Attributes:
        tty: True when ``stderr.buffer`` reports that it is a terminal.
        curses: the ``curses`` module once ``setupterm`` has succeeded,
            otherwise ``None``.
        flushed: False while text written through ``write`` may still be
            pending in the ``stderr`` text layer.
    """

    def __init__(self):
        # Defaults
        self.tty = False
        self.curses = None
        # Start out "dirty": nothing is known about what was written to
        # stderr before this object existed, and flush() must be callable
        # before the first write().
        self.flushed = False

        if not stderr:
            return
        try:
            if not stderr.buffer.isatty():
                raise UnsupportedOperation()
        except (AttributeError, UnsupportedOperation):
            return

        self.tty = True
        try:
            import curses
        except ImportError:
            return
        self.flush()
        termstr = os.getenv("TERM", "")
        fd = stderr.buffer.fileno()
        try:
            curses.setupterm(termstr, fd)
        except curses.error as err:
            # Without a successful setupterm() the tigetstr() calls made by
            # tput() would fail, so stay on the plain-tty fallback.
            warn(str(err))
            return
        self.curses = curses

    def write(self, text):
        """Write ``text`` to ``stderr`` (if there is one) and mark it unflushed."""
        if stderr:
            stderr.write(text)
            self.flushed = False

    def flush(self):
        """Flush ``stderr`` unless nothing was written since the last flush."""
        if stderr and not self.flushed:
            stderr.flush()
            self.flushed = True

    def carriage_return(self):
        """Return to the start of the line, or start a new line without a tty."""
        if self.curses:
            self.tput("cr")
        elif self.tty:
            self.write("\r")
        else:
            self.write("\n")

    def clear_eol(self):
        """Emit the ``el`` capability; does nothing without curses."""
        return self.tput("el")

    def tput(self, capname):
        """Write the terminfo string capability ``capname`` to ``stderr.buffer``.

        Does nothing when curses is unavailable or the terminal does not
        define the capability.
        """
        if not self.curses:
            return
        string = self.curses.tigetstr(capname)
        if not string:
            # tigetstr() yields None for an absent capability.
            return
        # Drop every "$<...>" segment (terminfo padding/delay markers) and
        # keep the text around them.
        segs = string.split(b"$<")
        string = segs[0]
        string += bytes().join(s.split(b">", 1)[1] for s in segs[1:])
        # Push pending text out first so it stays ordered ahead of the
        # control bytes written directly to the binary layer.
        self.flush()
        stderr.buffer.write(string)
class Progress:
    """Single-line progress meter: percentage, smoothed byte rate and ETA.

    ``log`` must provide ``carriage_return()``, ``clear_eol()``,
    ``write(text)`` and ``flush()``.
    """

    TIME_CONST = 1
    SAMPLES = 30
    # TODO: try keeping samples for say up to 10 s, but drop samples
    # that are older than 10 s rather than having a fixed # of samples

    # Multiplier suffixes, one per power of 1000 above the bare unit.
    PREFIXES = 'kMGTPE'
    # Largest ETA in seconds that is displayed (9999m59s); anything beyond
    # it is shown as the placeholder 9999m99s.
    MAX_ETA = 9999 * 60 + 59

    def __init__(self, log, total, progress=0):
        """Start metering at ``progress`` out of ``total``."""
        self.log = log
        self.total = total
        self.last = time.monotonic()
        #~ self.csv = csv.writer(open(strftime('progress.%Y%m%dT%H%M%S.csv'), 'at', encoding='ascii', newline=''))
        #~ self.csv.writerow(('time', 'progress'))
        self.last_progress = progress
        self.last_rate = None
        #~ self.samples = [(self.last, progress)] * self.SAMPLES
        #~ self.sample = 0
        # Index of the multiplier currently displayed (0 = none); kept
        # between updates so the formatter can stay on the same multiplier.
        self.rate_prefix = 0

    def update(self, progress):
        """Redraw the status line for the new absolute ``progress`` value.

        Calls arriving less than 0.1 s after the last accepted one are
        ignored.
        """
        now = time.monotonic()
        #~ self.csv.writerow((now, progress))
        interval = now - self.last
        if interval < 0.1:
            return
        #~ [then, prev] = self.samples[self.sample]
        rate = (progress - self.last_progress) / interval
        if self.last_rate is not None:
            # Blend with the previous rate; the previous rate's weight
            # decays exponentially with the elapsed interval.
            rate += exp(-interval/self.TIME_CONST) * (self.last_rate - rate)
        self.last = now
        self.last_progress = progress
        self.last_rate = rate
        # TODO: detect non terminal, including IDLE; allow this determination to be overridden

        eta = None
        if rate:
            scaled = self._format_rate(rate)
            # Range-check the quotient as a float before converting: a
            # vanishing rate makes it infinite, which ceil() cannot
            # convert, and a negative value is not a meaningful ETA.
            remaining = (self.total - progress) / rate
            if 0 <= remaining <= self.MAX_ETA:
                eta = ceil(remaining)
        else:
            scaled = '0'
            self.rate_prefix = 0

        if eta is None:
            minutes = 9999
            sec = 99
        else:
            [minutes, sec] = divmod(eta, 60)

        if progress == self.total:
            percent = '100'
        else:
            # Tenths of a percent, rounded down.
            tenths = floor(progress / self.total * 100*10)
            [whole, frac] = divmod(tenths, 10)
            percent = f'{whole:2}.{frac:01}'
        self.log.carriage_return()
        self.log.clear_eol()
        self.log.write("{:>4}%{:>7}B/s{:>5}m{:02}s".format(
            percent, scaled, f'-{minutes}', sec))
        self.log.flush()

    def _format_rate(self, rate):
        """Return non-zero ``rate`` as short text, updating ``rate_prefix``."""
        prefixes = self.PREFIXES
        if 0.1 < rate < 10000 * 1e3**len(prefixes):
            # Prefer a four-digit integer under the multiplier already on
            # display over switching to the next multiplier.
            scaled = round(rate / 1000**self.rate_prefix)
            if 1000 <= scaled < 10000:
                scaled = format(scaled)
            else:
                self.rate_prefix = min(int(log10(rate)) // 3, len(prefixes))
                scaled = format(rate / 1000**self.rate_prefix, '#.3g')
                if 'e' in scaled:
                    # Three-digit rounding produced exponent notation.
                    if self.rate_prefix < len(prefixes):
                        self.rate_prefix += 1
                        scaled = format(
                            rate / 1000**self.rate_prefix, '#.3g')
                    else:
                        # Already at the largest multiplier.
                        scaled = round(rate / 1000**self.rate_prefix)
                        if 1000 <= scaled < 10000:
                            scaled = format(scaled)
                        else:
                            self.rate_prefix = -1
        else:
            self.rate_prefix = -1
        if self.rate_prefix < 0:
            # No multiplier fits: one-digit exponent notation, or plain
            # thousandths when the exponent is -3 or above.
            scaled = format(rate, '.0e')
            if 0 < rate < 1:
                expon = scaled.partition('e')[2]
                if expon and int(expon) >= -3:
                    scaled = format(rate, '.3f')
            self.rate_prefix = 0
        # The '#.3g' format can leave a bare trailing point ("100.").
        scaled = scaled.rstrip('.')
        if self.rate_prefix > 0:
            scaled += prefixes[self.rate_prefix - 1]
        return scaled

    @classmethod
    def close(cls, log):
        """Return to the start of the status line on ``log`` and clear it."""
        log.carriage_return()
        log.clear_eol()
if __name__ == '__main__':
    # Replay a recorded "time,progress" CSV (named by the first command
    # line argument) through Progress at the recorded pace.
    from time import sleep

    with open(argv[1], 'rt', encoding='ascii', newline='') as samples:
        reader = csv.reader(samples)
        # Read the header outside of any assert: under "python -O" an
        # assert is removed together with the next() call inside it, and
        # the header row would then be parsed as a sample.
        header = next(reader, None)
        if header != ['time', 'progress']:
            raise ValueError(f'Unexpected CSV header: {header!r}')
        samples = tuple(reader)
    if not samples:
        raise ValueError('CSV file contains no samples')

    # The last recorded progress value is used as the total.
    total = int(samples[-1][1])
    log = TerminalLog()
    samples = iter(samples)
    [start, sample] = next(samples)
    start = float(start)
    sample = int(sample)
    real_start = time.monotonic()
    progress = Progress(log, total, sample)
    for [t, sample] in samples:
        sample = int(sample)
        # Time left until this sample is due, relative to replay start.
        t = float(t) - start + real_start - time.monotonic()
        if t > 0:
            sleep(t)
        progress.update(sample)
    log.write('\n')
    #~ main(*argv[1:])