Quick binary expression handling for “test_token_buffering“
[vadmium-streams.git] / yt.py
blobc987965f6e42241b25af8b19ccf925c750c2d059
1 #! /bin/python3
3 from net import request_decoded
4 from contextlib import ExitStack
5 import json
6 from urllib.parse import urlsplit, parse_qs
7 from sys import stderr
8 from datetime import timedelta
9 import re
10 from math import inf
11 from time import strftime, gmtime
13 def main(yt, client='TVHTML5_SIMPLY_EMBEDDED_PLAYER', version=None, kbps=inf,
14 *, cache=False, audio=False, separate=False):
15 CLIENTS = {
16 'TVHTML5_SIMPLY_EMBEDDED_PLAYER': "2.0",
17 'ANDROID': "17.36.4",
18 'TVLITE': '2',
20 if version is None:
21 version = CLIENTS[client]
23 key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
24 client_obj = {
25 "clientName": client,
26 "clientVersion": version
28 if client == 'ANDROID':
29 agent = f'com.google.android.youtube/{version}'
30 client_obj["androidSdkVersion"] = 31
31 else:
32 agent = 'yt'
34 split = urlsplit(yt)
35 if split.netloc:
36 if split.netloc == 'youtu.be':
37 yt = split.path[1:]
38 else:
39 assert ('.' + split.netloc).endswith('.youtube.com')
40 if split.path in {'/watch', '/watch_popup'}:
41 [yt] = parse_qs(split.query)['v']
42 else:
43 assert split.path.startswith((
44 '/shorts/', '/live/', '/v/', '/embed/'))
45 yt = split.path.split('/', 2)[2]
46 assert split.scheme in {'http', 'https'}
47 assert re.fullmatch(r'[\w-]{11}', yt, re.ASCII)
49 with ExitStack() as cleanup:
50 [header, yt] = request_decoded(
51 'https://www.youtube.com/youtubei/v1/player',
52 headers=(
53 ('User-Agent', agent),
54 ('Content-Type', 'application/json'),
55 ('X-Goog-Api-Key', key),
57 data=json.dumps({
58 "context": {"client": client_obj},
59 "videoId": yt}).encode('ascii'),
60 types=('application/json',),
61 cleanup=cleanup, cache=cache)
62 yt = json.load(yt)
64 if 'videoDetails' in yt:
65 d = yt['videoDetails']
66 if 'title' in d:
67 stderr.write(f'{d["title"]}\n')
68 if 'shortDescription' in d:
69 stderr.write(f'{d["shortDescription"]}\n')
70 length = timedelta(seconds=int(d["lengthSeconds"]))
71 stderr.write(f'{length}')
72 if 'viewCount' in d:
73 stderr.write(f', {d["viewCount"]} views, author {d["author"]}')
74 for attr in ('isPrivate', 'isUnpluggedCorpus', 'isLiveContent'):
75 if d[attr]:
76 stderr.write(f', {attr}')
77 for attr in ('isCrawlable', 'allowRatings'):
78 if not d[attr]:
79 stderr.write(f', not {attr}')
80 stderr.write('\n')
82 s = yt['playabilityStatus']
83 if s['status'] != 'OK':
84 stderr.write(f'{s["status"]}: ')
85 if 'reasonTitle' in s:
86 stderr.write(f'{s["reasonTitle"]}\n')
87 if 'reason' in s:
88 stderr.write(f'{s["reason"]}\n')
89 if 'messages' in s:
90 [t] = s['messages']
91 stderr.write(f'{t}\n')
92 if 'reasonTitle' not in s and 'errorScreen' in s \
93 and 'playerErrorMessageRenderer' in s['errorScreen']:
94 s = s['errorScreen']['playerErrorMessageRenderer']
95 for field in ('reason', 'subreason'):
96 if field in s:
97 try:
98 t = ''.join(t['text'] for t in s[field]['runs'])
99 except LookupError:
100 t = s[field]['simpleText']
101 stderr.write(f'{t}\n')
102 if 'learnMore' in s:
103 [t] = s['learnMore']['runs']
104 t = t['navigationEndpoint']['urlEndpoint']['url']
105 stderr.write(f'{t}\n')
106 if 'streamingData' not in yt:
107 raise SystemExit(1)
108 yt = yt['streamingData']
110 mod_min = +inf
111 mod_max = -inf
112 for formats in ('formats', 'adaptiveFormats'):
113 for format in yt.get(formats, ()):
114 mod_min = min(mod_min, int(format['lastModified']))
115 mod_max = max(mod_max, int(format['lastModified']))
116 stderr.write(f'{format_time(mod_min)} to {format_time(mod_max)}\n')
118 if audio or separate:
119 yt = yt['adaptiveFormats']
120 else:
121 if 'formats' not in yt:
122 stderr.write(f'streamingData: {yt.keys()}\n')
123 yt = yt['formats']
124 types = dict()
125 for format in yt:
126 [type, subtype] = format['mimeType'].split('/', 1)
127 types.setdefault(type, list()).append(format)
128 CODECS = r' *[^/]+/[^ ;]+ *; *codecs *= *"([^"\\]+)" *'
129 codec = re.fullmatch(CODECS, format['mimeType']).group(1)
130 best = 0
131 for codec in codec.split(','):
132 [codec, sep, rest] = codec.strip().partition('.')
133 BEST_kbps = {
134 'mp4v': 1600, 'avc1': 900, 'vp9': 600, 'av01': 400,
135 'mp4a': 140, 'opus': 130,
137 best += BEST_kbps[codec]
138 try:
139 bitrate = format['bitrate'] / 1000
140 except LookupError:
141 bitrate = int(format['contentLength']) * 8
142 bitrate /= int(format['approxDurationMs'])
143 format['qv'] = bitrate / best
145 if not audio:
146 print(best_url(types['video'], limit=float(kbps) * 1e3))
147 if audio or separate:
148 print(best_url(types['audio'], limit=float(kbps) * 1e3))
150 def best_url(yt, limit):
151 url = None
152 def key(s):
153 try:
154 abr = s['averageBitrate']
155 except LookupError:
156 try:
157 abr = int(s['contentLength']) * 8
158 abr /= int(s['approxDurationMs']) * 1e-3
159 except LookupError:
160 abr = s['bitrate']
161 # First, prioritize streams within the bitrate limit, followed by
162 # each stream with the lowest bitrate exceeding that limit. Then,
163 # prioritize streams that meet the minimum quality requirement,
164 # followed by each stream with the highest quality under that
165 # minimum. Finally, prioritize each stream with the lowest bitrate.
166 return (+max(abr, limit), -min(s['qv'], 1), +abr)
167 yt = sorted(yt, key=key)
168 for s in yt:
169 stderr.write(f'{s["quality"]}: {s["mimeType"]} ')
170 try:
171 try:
172 abr = s["averageBitrate"] / 1000
173 except LookupError:
174 abr = int(s['contentLength']) * 8
175 abr /= int(s['approxDurationMs'])
176 stderr.write(f'{abr:.0f} ')
177 if 'bitrate' in s:
178 stderr.write('< ')
179 except LookupError:
180 pass
181 if 'bitrate' in s:
182 stderr.write(f'{s["bitrate"] / 1000:.0f} ')
183 stderr.write(f'kb/s {s["qv"]:.1%}\n')
185 if url is None:
186 stderr.write('^^^\n')
187 url = s['url']
188 return url
190 def format_time(us):
191 [secs, us] = divmod(us, 10**6)
192 return f'{strftime("%Y-%m-%d %H:%M:%S", gmtime(secs))}.{us:06d}Z'
194 if __name__ == '__main__':
195 from clifunc import run
196 run(main)