import errno
import functools
import gzip
-import itertools
import io
+import itertools
import json
import locale
import math
import pipes
import platform
import re
-import ssl
import socket
+import ssl
import struct
import subprocess
import sys
import zlib
from .compat import (
+ compat_HTMLParser,
compat_basestring,
compat_chr,
compat_etree_fromstring,
compat_str,
compat_urllib_error,
compat_urllib_parse,
+ compat_urllib_parse_urlencode,
compat_urllib_parse_urlparse,
compat_urllib_request,
compat_urlparse,
+ compat_xpath,
shlex_quote,
)
'wav',
'f4f', 'f4m', 'm3u8', 'smil')
+# needed for sanitizing filenames in restricted mode
+ACCENT_CHARS = dict(zip('ÂÃÄÀÁÅÆÇÈÉÊËÌÍÎÏÐÑÒÓÔÕÖØÙÚÛÜÝÞßàáâãäåæçèéêëìíîïðñòóôõöøùúûüýþÿ',
+ itertools.chain('AAAAAA', ['AE'], 'CEEEEIIIIDNOOOOOOUUUUYP', ['ss'],
+ 'aaaaaa', ['ae'], 'ceeeeiiiionoooooouuuuypy')))
+
def preferredencoding():
"""Get preferred encoding.
return node.find(expr)
else:
def find_xpath_attr(node, xpath, key, val=None):
- # Here comes the crazy part: In 2.6, if the xpath is a unicode,
- # .//node does not match if a node is a direct child of . !
- if isinstance(xpath, compat_str):
- xpath = xpath.encode('ascii')
-
- for f in node.findall(xpath):
+ for f in node.findall(compat_xpath(xpath)):
if key not in f.attrib:
continue
if val is None or f.attrib.get(key) == val:
def xpath_element(node, xpath, name=None, fatal=False, default=NO_DEFAULT):
def _find_xpath(xpath):
- if sys.version_info < (2, 7): # Crazy 2.6
- xpath = xpath.encode('ascii')
- return node.find(xpath)
+ return node.find(compat_xpath(xpath))
if isinstance(xpath, (str, compat_str)):
n = _find_xpath(xpath)
m = re.search(r'''(?xs)
<([a-zA-Z0-9:._-]+)
- (?:\s+[a-zA-Z0-9:._-]+(?:=[a-zA-Z0-9:._-]+|="[^"]+"|='[^']+'))*?
+ (?:\s+[a-zA-Z0-9:._-]+(?:=[a-zA-Z0-9:._-]*|="[^"]*"|='[^']*'))*?
\s+%s=['"]?%s['"]?
- (?:\s+[a-zA-Z0-9:._-]+(?:=[a-zA-Z0-9:._-]+|="[^"]+"|='[^']+'))*?
+ (?:\s+[a-zA-Z0-9:._-]+(?:=[a-zA-Z0-9:._-]*|="[^"]*"|='[^']*'))*?
\s*>
(?P<content>.*?)
</\1>
return unescapeHTML(res)
+class HTMLAttributeParser(compat_HTMLParser):
+ """Trivial HTML parser to gather the attributes for a single element"""
+ def __init__(self):
+ self.attrs = {}
+ compat_HTMLParser.__init__(self)
+
+ def handle_starttag(self, tag, attrs):
+ self.attrs = dict(attrs)
+
+
+def extract_attributes(html_element):
+ """Given a string for an HTML element such as
+ <el
+ a="foo" B="bar" c="&98;az" d=boz
+ empty= noval entity="&"
+ sq='"' dq="'"
+ >
+ Decode and return a dictionary of attributes.
+ {
+ 'a': 'foo', 'b': 'bar', c: 'baz', d: 'boz',
+ 'empty': '', 'noval': None, 'entity': '&',
+ 'sq': '"', 'dq': '\''
+ }.
+ NB HTMLParser is stricter in Python 2.6 & 3.2 than in later versions,
+ but the cases in the unit test will work for all of 2.6, 2.7, 3.2-3.5.
+ """
+ parser = HTMLAttributeParser()
+ parser.feed(html_element)
+ parser.close()
+ return parser.attrs
+
+
def clean_html(html):
"""Clean an HTML snippet into a readable string"""
Set is_id if this is not an arbitrary string, but an ID that should be kept if possible
"""
def replace_insane(char):
+ if restricted and char in ACCENT_CHARS:
+ return ACCENT_CHARS[char]
if char == '?' or ord(char) < 32 or ord(char) == 127:
return ''
elif char == '"':
# Prepend protocol-less URLs with `http:` scheme in order to mitigate the number of
# unwanted failures due to missing protocol
+def sanitize_url(url):
+ return 'http:%s' % url if url.startswith('//') else url
+
+
def sanitized_Request(url, *args, **kwargs):
- return compat_urllib_request.Request(
- 'http:%s' % url if url.startswith('//') else url, *args, **kwargs)
+ return compat_urllib_request.Request(sanitize_url(url), *args, **kwargs)
def orderedSet(iterable):
# Substitute URL if any change after escaping
if url != url_escaped:
- req_type = HEADRequest if req.get_method() == 'HEAD' else compat_urllib_request.Request
- new_req = req_type(
- url_escaped, data=req.data, headers=req.headers,
- origin_req_host=req.origin_req_host, unverifiable=req.unverifiable)
- new_req.timeout = req.timeout
- req = new_req
+ req = update_Request(req, url=url_escaped)
for h, v in std_headers.items():
# Capitalize is needed because of Python bug 2275: http://bugs.python.org/issue2275
def smuggle_url(url, data):
""" Pass additional data in a URL for internal use. """
- sdata = compat_urllib_parse.urlencode(
+ sdata = compat_urllib_parse_urlencode(
{'__youtubedl_smuggle': json.dumps(data)})
return url + '#' + sdata
return '%.2f%s' % (converted, suffix)
+def lookup_unit_table(unit_table, s):
+ units_re = '|'.join(re.escape(u) for u in unit_table)
+ m = re.match(
+ r'(?P<num>[0-9]+(?:[,.][0-9]*)?)\s*(?P<unit>%s)\b' % units_re, s)
+ if not m:
+ return None
+ num_str = m.group('num').replace(',', '.')
+ mult = unit_table[m.group('unit')]
+ return int(float(num_str) * mult)
+
+
def parse_filesize(s):
if s is None:
return None
'Yb': 1000 ** 8,
}
- units_re = '|'.join(re.escape(u) for u in _UNIT_TABLE)
- m = re.match(
- r'(?P<num>[0-9]+(?:[,.][0-9]*)?)\s*(?P<unit>%s)' % units_re, s)
- if not m:
+ return lookup_unit_table(_UNIT_TABLE, s)
+
+
+def parse_count(s):
+ if s is None:
return None
- num_str = m.group('num').replace(',', '.')
- mult = _UNIT_TABLE[m.group('unit')]
- return int(float(num_str) * mult)
+ s = s.strip()
+
+ if re.match(r'^[\d,.]+$', s):
+ return str_to_int(s)
+
+ _UNIT_TABLE = {
+ 'k': 1000,
+ 'K': 1000,
+ 'm': 1000 ** 2,
+ 'M': 1000 ** 2,
+ 'kk': 1000 ** 2,
+ 'KK': 1000 ** 2,
+ }
+
+ return lookup_unit_table(_UNIT_TABLE, s)
def month_by_name(name):
s = s.strip()
- m = re.match(
- r'''(?ix)(?:P?T)?
- (?:
- (?P<only_mins>[0-9.]+)\s*(?:mins?\.?|minutes?)\s*|
- (?P<only_hours>[0-9.]+)\s*(?:hours?)|
-
- \s*(?P<hours_reversed>[0-9]+)\s*(?:[:h]|hours?)\s*(?P<mins_reversed>[0-9]+)\s*(?:[:m]|mins?\.?|minutes?)\s*|
- (?:
+ days, hours, mins, secs, ms = [None] * 5
+ m = re.match(r'(?:(?:(?:(?P<days>[0-9]+):)?(?P<hours>[0-9]+):)?(?P<mins>[0-9]+):)?(?P<secs>[0-9]+)(?P<ms>\.[0-9]+)?$', s)
+ if m:
+ days, hours, mins, secs, ms = m.groups()
+ else:
+ m = re.match(
+ r'''(?ix)(?:P?T)?
(?:
- (?:(?P<days>[0-9]+)\s*(?:[:d]|days?)\s*)?
- (?P<hours>[0-9]+)\s*(?:[:h]|hours?)\s*
+ (?P<days>[0-9]+)\s*d(?:ays?)?\s*
)?
- (?P<mins>[0-9]+)\s*(?:[:m]|mins?|minutes?)\s*
- )?
- (?P<secs>[0-9]+)(?P<ms>\.[0-9]+)?\s*(?:s|secs?|seconds?)?
- )$''', s)
- if not m:
- return None
- res = 0
- if m.group('only_mins'):
- return float_or_none(m.group('only_mins'), invscale=60)
- if m.group('only_hours'):
- return float_or_none(m.group('only_hours'), invscale=60 * 60)
- if m.group('secs'):
- res += int(m.group('secs'))
- if m.group('mins_reversed'):
- res += int(m.group('mins_reversed')) * 60
- if m.group('mins'):
- res += int(m.group('mins')) * 60
- if m.group('hours'):
- res += int(m.group('hours')) * 60 * 60
- if m.group('hours_reversed'):
- res += int(m.group('hours_reversed')) * 60 * 60
- if m.group('days'):
- res += int(m.group('days')) * 24 * 60 * 60
- if m.group('ms'):
- res += float(m.group('ms'))
- return res
+ (?:
+ (?P<hours>[0-9]+)\s*h(?:ours?)?\s*
+ )?
+ (?:
+ (?P<mins>[0-9]+)\s*m(?:in(?:ute)?s?)?\s*
+ )?
+ (?:
+ (?P<secs>[0-9]+)(?P<ms>\.[0-9]+)?\s*s(?:ec(?:ond)?s?)?\s*
+ )?$''', s)
+ if m:
+ days, hours, mins, secs, ms = m.groups()
+ else:
+ m = re.match(r'(?i)(?:(?P<hours>[0-9.]+)\s*(?:hours?)|(?P<mins>[0-9.]+)\s*(?:mins?\.?|minutes?)\s*)$', s)
+ if m:
+ hours, mins = m.groups()
+ else:
+ return None
+
+ duration = 0
+ if secs:
+ duration += float(secs)
+ if mins:
+ duration += float(mins) * 60
+ if hours:
+ duration += float(hours) * 60 * 60
+ if days:
+ duration += float(days) * 24 * 60 * 60
+ if ms:
+ duration += float(ms)
+ return duration
def prepend_extension(filename, ext, expected_real_ext=None):
"""Escape URL as suggested by RFC 3986"""
url_parsed = compat_urllib_parse_urlparse(url)
return url_parsed._replace(
+ netloc=url_parsed.netloc.encode('idna').decode('ascii'),
path=escape_rfc3986(url_parsed.path),
params=escape_rfc3986(url_parsed.params),
query=escape_rfc3986(url_parsed.query),
try:
struct.pack('!I', 0)
except TypeError:
- # In Python 2.6 (and some 2.7 versions), struct requires a bytes argument
+ # In Python 2.6 and 2.7.x < 2.7.7, struct requires a bytes argument
+ # See https://bugs.python.org/issue19099
def struct_pack(spec, *args):
if isinstance(spec, compat_str):
spec = spec.encode('ascii')
def urlencode_postdata(*args, **kargs):
- return compat_urllib_parse.urlencode(*args, **kargs).encode('ascii')
+ return compat_urllib_parse_urlencode(*args, **kargs).encode('ascii')
-def encode_dict(d, encoding='utf-8'):
- def encode(v):
- return v.encode(encoding) if isinstance(v, compat_basestring) else v
- return dict((encode(k), encode(v)) for k, v in d.items())
+def update_url_query(url, query):
+ if not query:
+ return url
+ parsed_url = compat_urlparse.urlparse(url)
+ qs = compat_parse_qs(parsed_url.query)
+ qs.update(query)
+ return compat_urlparse.urlunparse(parsed_url._replace(
+ query=compat_urllib_parse_urlencode(qs, True)))
+
+
+def update_Request(req, url=None, data=None, headers={}, query={}):
+ req_headers = req.headers.copy()
+ req_headers.update(headers)
+ req_data = data or req.data
+ req_url = update_url_query(url or req.get_full_url(), query)
+ req_type = HEADRequest if req.get_method() == 'HEAD' else compat_urllib_request.Request
+ new_req = req_type(
+ req_url, data=req_data, headers=req_headers,
+ origin_req_host=req.origin_req_host, unverifiable=req.unverifiable)
+ if hasattr(req, 'timeout'):
+ new_req.timeout = req.timeout
+ return new_req
def dict_get(d, key_or_keys, default=None, skip_false_values=True):
def mimetype2ext(mt):
+ if mt is None:
+ return None
+
ext = {
'audio/mp4': 'm4a',
}.get(mt)
_x = functools.partial(xpath_with_ns, ns_map={
'ttml': 'http://www.w3.org/ns/ttml',
'ttaf1': 'http://www.w3.org/2006/10/ttaf1',
+ 'ttaf1_0604': 'http://www.w3.org/2006/04/ttaf1',
})
class TTMLPElementParser(object):
dfxp = compat_etree_fromstring(dfxp_data.encode('utf-8'))
out = []
- paras = dfxp.findall(_x('.//ttml:p')) or dfxp.findall(_x('.//ttaf1:p')) or dfxp.findall('.//p')
+ paras = dfxp.findall(_x('.//ttml:p')) or dfxp.findall(_x('.//ttaf1:p')) or dfxp.findall(_x('.//ttaf1_0604:p')) or dfxp.findall('.//p')
if not paras:
raise ValueError('Invalid dfxp/TTML subtitle')