sanitized_Request,
unescapeHTML,
unified_strdate,
+ unified_timestamp,
url_basename,
xpath_element,
xpath_text,
mimetype2ext,
update_Request,
update_url_query,
+ parse_m3u8_attributes,
+ extract_attributes,
+ parse_codecs,
)
* "height" (optional, int)
* "resolution" (optional, string "{width}x{height"},
deprecated)
+ * "filesize" (optional, int)
thumbnail: Full URL to a video thumbnail image.
description: Full video description.
uploader: Full name of the video uploader.
return self._og_search_property('url', html, **kargs)
def _html_search_meta(self, name, html, display_name=None, fatal=False, **kwargs):
+ if not isinstance(name, (list, tuple)):
+ name = [name]
if display_name is None:
- display_name = name
+ display_name = name[0]
return self._html_search_regex(
- self._meta_regex(name),
+ [self._meta_regex(n) for n in name],
html, display_name, fatal=fatal, group='content', **kwargs)
def _dc_search_uploader(self, html):
return self._html_search_meta('twitter:player', html,
'twitter card player')
- def _search_json_ld(self, html, video_id, **kwargs):
+ def _search_json_ld(self, html, video_id, expected_type=None, **kwargs):
json_ld = self._search_regex(
r'(?s)<script[^>]+type=(["\'])application/ld\+json\1[^>]*>(?P<json_ld>.+?)</script>',
html, 'JSON-LD', group='json_ld', **kwargs)
if not json_ld:
return {}
- return self._json_ld(json_ld, video_id, fatal=kwargs.get('fatal', True))
+ return self._json_ld(
+ json_ld, video_id, fatal=kwargs.get('fatal', True),
+ expected_type=expected_type)
- def _json_ld(self, json_ld, video_id, fatal=True):
+ def _json_ld(self, json_ld, video_id, fatal=True, expected_type=None):
if isinstance(json_ld, compat_str):
json_ld = self._parse_json(json_ld, video_id, fatal=fatal)
if not json_ld:
info = {}
if json_ld.get('@context') == 'http://schema.org':
item_type = json_ld.get('@type')
+ if expected_type is not None and expected_type != item_type:
+ return info
if item_type == 'TVEpisode':
info.update({
'episode': unescapeHTML(json_ld.get('name')),
'title': unescapeHTML(json_ld.get('headline')),
'description': unescapeHTML(json_ld.get('articleBody')),
})
+ elif item_type == 'VideoObject':
+ info.update({
+ 'url': json_ld.get('contentUrl'),
+ 'title': unescapeHTML(json_ld.get('name')),
+ 'description': unescapeHTML(json_ld.get('description')),
+ 'thumbnail': json_ld.get('thumbnailUrl'),
+ 'duration': parse_duration(json_ld.get('duration')),
+ 'timestamp': unified_timestamp(json_ld.get('uploadDate')),
+ 'filesize': float_or_none(json_ld.get('contentSize')),
+ 'tbr': int_or_none(json_ld.get('bitrate')),
+ 'width': int_or_none(json_ld.get('width')),
+ 'height': int_or_none(json_ld.get('height')),
+ })
return dict((k, v) for k, v in info.items() if v is not None)
@staticmethod
f['ext'] = determine_ext(f['url'])
if isinstance(field_preference, (list, tuple)):
- return tuple(f.get(field) if f.get(field) is not None else -1 for field in field_preference)
+ return tuple(
+ f.get(field)
+ if f.get(field) is not None
+ else ('' if field == 'format_id' else -1)
+ for field in field_preference)
preference = f.get('preference')
if preference is None:
}]
last_info = None
last_media = None
- kv_rex = re.compile(
- r'(?P<key>[a-zA-Z_-]+)=(?P<val>"[^"]+"|[^",]+)(?:,|$)')
for line in m3u8_doc.splitlines():
if line.startswith('#EXT-X-STREAM-INF:'):
- last_info = {}
- for m in kv_rex.finditer(line):
- v = m.group('val')
- if v.startswith('"'):
- v = v[1:-1]
- last_info[m.group('key')] = v
+ last_info = parse_m3u8_attributes(line)
elif line.startswith('#EXT-X-MEDIA:'):
- last_media = {}
- for m in kv_rex.finditer(line):
- v = m.group('val')
- if v.startswith('"'):
- v = v[1:-1]
- last_media[m.group('key')] = v
+ last_media = parse_m3u8_attributes(line)
elif line.startswith('#') or not line.strip():
continue
else:
'url': format_url(line.strip()),
'tbr': tbr,
'ext': ext,
+ 'fps': float_or_none(last_info.get('FRAME-RATE')),
'protocol': entry_protocol,
'preference': preference,
}
width_str, height_str = resolution.split('x')
f['width'] = int(width_str)
f['height'] = int(height_str)
- codecs = last_info.get('CODECS')
- if codecs:
- vcodec, acodec = [None] * 2
- va_codecs = codecs.split(',')
- if len(va_codecs) == 1:
- # Audio only entries usually come with single codec and
- # no resolution. For more robustness we also check it to
- # be mp4 audio.
- if not resolution and va_codecs[0].startswith('mp4a'):
- vcodec, acodec = 'none', va_codecs[0]
- else:
- vcodec = va_codecs[0]
- else:
- vcodec, acodec = va_codecs[:2]
+ # Unified Streaming Platform
+ mobj = re.search(
+ r'audio.*?(?:%3D|=)(\d+)(?:-video.*?(?:%3D|=)(\d+))?', f['url'])
+ if mobj:
+ abr, vbr = mobj.groups()
+ abr, vbr = float_or_none(abr, 1000), float_or_none(vbr, 1000)
f.update({
- 'acodec': acodec,
- 'vcodec': vcodec,
+ 'vbr': vbr,
+ 'abr': abr,
})
+ f.update(parse_codecs(last_info.get('CODECS')))
if last_media is not None:
f['m3u8_media'] = last_media
last_media = None
self.report_warning('Unknown MIME type %s in DASH manifest' % mime_type)
return formats
+ def _parse_html5_media_entries(self, base_url, webpage):
+ def absolute_url(video_url):
+ return compat_urlparse.urljoin(base_url, video_url)
+
+ def parse_content_type(content_type):
+ if not content_type:
+ return {}
+ ctr = re.search(r'(?P<mimetype>[^/]+/[^;]+)(?:;\s*codecs="?(?P<codecs>[^"]+))?', content_type)
+ if ctr:
+ mimetype, codecs = ctr.groups()
+ f = parse_codecs(codecs)
+ f['ext'] = mimetype2ext(mimetype)
+ return f
+ return {}
+
+ entries = []
+ for media_tag, media_type, media_content in re.findall(r'(?s)(<(?P<tag>video|audio)[^>]*>)(.*?)</(?P=tag)>', webpage):
+ media_info = {
+ 'formats': [],
+ 'subtitles': {},
+ }
+ media_attributes = extract_attributes(media_tag)
+ src = media_attributes.get('src')
+ if src:
+ media_info['formats'].append({
+ 'url': absolute_url(src),
+ 'vcodec': 'none' if media_type == 'audio' else None,
+ })
+ media_info['thumbnail'] = media_attributes.get('poster')
+ if media_content:
+ for source_tag in re.findall(r'<source[^>]+>', media_content):
+ source_attributes = extract_attributes(source_tag)
+ src = source_attributes.get('src')
+ if not src:
+ continue
+ f = parse_content_type(source_attributes.get('type'))
+ f.update({
+ 'url': absolute_url(src),
+ 'vcodec': 'none' if media_type == 'audio' else None,
+ })
+ media_info['formats'].append(f)
+ for track_tag in re.findall(r'<track[^>]+>', media_content):
+ track_attributes = extract_attributes(track_tag)
+ kind = track_attributes.get('kind')
+ if not kind or kind == 'subtitles':
+ src = track_attributes.get('src')
+ if not src:
+ continue
+ lang = track_attributes.get('srclang') or track_attributes.get('lang') or track_attributes.get('label')
+ media_info['subtitles'].setdefault(lang, []).append({
+ 'url': absolute_url(src),
+ })
+ if media_info['formats']:
+ entries.append(media_info)
+ return entries
+
def _live_title(self, name):
""" Generate the title for a live video """
now = datetime.datetime.now()
def _mark_watched(self, *args, **kwargs):
raise NotImplementedError('This method must be implemented by subclasses')
+ def geo_verification_headers(self):
+ headers = {}
+ geo_verification_proxy = self._downloader.params.get('geo_verification_proxy')
+ if geo_verification_proxy:
+ headers['Ytdl-request-proxy'] = geo_verification_proxy
+ return headers
+
class SearchInfoExtractor(InfoExtractor):
"""