password: Password for authentication purposes.
usenetrc: Use netrc for authentication instead.
quiet: Do not print messages to stdout.
+ simulate: Do not download the video files.
format: Video format code.
outtmpl: Template for output names.
"""
else:
exponent = long(math.log(float(bytes), 1024.0))
suffix = 'bkMGTPEZY'[exponent]
- if exponent == 0:
- return '%s%s' % (bytes, suffix)
converted = float(bytes) / float(1024**exponent)
return '%.2f%s' % (converted, suffix)
def calc_speed(start, now, bytes):
dif = now - start
if bytes == 0 or dif < 0.001: # One millisecond
- return '%9s' % 'N/A b/s'
+ return '%10s' % '---b/s'
return '%10s' % ('%s/s' % FileDownloader.format_bytes(float(bytes) / dif))
@staticmethod
self._ies.append(ie)
ie.set_downloader(self)
+ def to_stdout(self, message, skip_eol=False):
+ """Print message to stdout if not in quiet mode."""
+ if not self._params.get('quiet', False):
+ sys.stdout.write('%s%s' % (message, ['\n', ''][skip_eol]))
+ sys.stdout.flush()
+
+ def to_stderr(self, message):
+ """Print message to stderr."""
+ sys.stderr.write('%s\n' % message)
+
def download(self, url_list):
"""Download a given list of URLs."""
for url in url_list:
if (len(url_list) > 1 or len(results) > 1) and re.search(r'%\(.+?\)s', self._params['outtmpl']) is None:
sys.exit('ERROR: fixed output name but more than one file to download')
+ if self._params.get('simulate', False):
+ continue
+
for result in results:
try:
filename = self._params['outtmpl'] % result
except (KeyError), err:
- sys.stderr.write('ERROR: invalid output template: %s\n' % str(err))
+ self.to_stderr('ERROR: invalid output template: %s' % str(err))
continue
try:
self.pmkdir(filename)
except (OSError, IOError), err:
- sys.stderr.write('ERROR: unable to create directories: %s\n' % str(err))
+ self.to_stderr('ERROR: unable to create directories: %s' % str(err))
continue
try:
outstream = open(filename, 'wb')
except (OSError, IOError), err:
- sys.stderr.write('ERROR: unable to open for writing: %s\n' % str(err))
+ self.to_stderr('ERROR: unable to open for writing: %s' % str(err))
continue
try:
self._do_download(outstream, result['url'])
outstream.close()
except (OSError, IOError), err:
- sys.stderr.write('ERROR: unable to write video data: %s\n' % str(err))
+ self.to_stderr('ERROR: unable to write video data: %s' % str(err))
continue
except (urllib2.URLError, httplib.HTTPException, socket.error), err:
- sys.stderr.write('ERROR: unable to download video data: %s\n' % str(err))
+ self.to_stderr('ERROR: unable to download video data: %s' % str(err))
continue
break
if not suitable_found:
- sys.stderr.write('ERROR: no suitable InfoExtractor: %s\n' % url)
+ self.to_stderr('ERROR: no suitable InfoExtractor: %s' % url)
def _do_download(self, stream, url):
request = urllib2.Request(url, None, std_headers)
percent_str = self.calc_percent(byte_counter, data_len)
eta_str = self.calc_eta(start, time.time(), data_len, byte_counter)
speed_str = self.calc_speed(start, time.time(), byte_counter)
-
- if not self._params.get('quiet', False):
- sys.stdout.write('\r[download] %s of %s at %s ETA %s' %
- (percent_str, data_len_str, speed_str, eta_str))
- sys.stdout.flush()
+ self.to_stdout('\r[download] %s of %s at %s ETA %s' %
+ (percent_str, data_len_str, speed_str, eta_str), skip_eol=True)
before = time.time()
data_block = data.read(block_size)
stream.write(data_block)
block_size = self.best_block_size(after - before, data_block_len)
- if not self._params.get('quiet', False):
- print
-
+ self.to_stdout('')
if data_len is not None and str(byte_counter) != data_len:
raise ValueError('Content too short: %s/%s bytes' % (byte_counter, data_len))
return
# Log in
- login_form = { 'current_form': 'loginForm',
+ login_form = {
+ 'current_form': 'loginForm',
'next': '/',
'action_login': 'Log In',
'username': username,
- 'password': password, }
+ 'password': password,
+ }
request = urllib2.Request(self._LOGIN_URL, urllib.urlencode(login_form), std_headers)
try:
self.to_stdout('[youtube] Logging in')
return
# Confirm age
- age_form = { 'next_url': '/',
- 'action_confirm': 'Confirm', }
+ age_form = {
+ 'next_url': '/',
+ 'action_confirm': 'Confirm',
+ }
request = urllib2.Request(self._AGE_URL, urllib.urlencode(age_form), std_headers)
try:
self.to_stdout('[youtube] Confirming age')
video_real_url = 'http://www.youtube.com/get_video?video_id=%s&t=%s' % (video_id, mobj.group(1))
if format_param is not None:
video_real_url = '%s&fmt=%s' % (video_real_url, format_param)
+ self.to_stdout('[youtube] %s: URL: %s' % (video_id, video_real_url))
# uploader
mobj = re.search(r'More From: ([^<]*)<', video_webpage)
simple_title = simple_title.strip(u'_')
# Return information
- return [{ 'id': video_id,
- 'url': video_real_url,
- 'uploader': video_uploader,
- 'title': video_title,
- 'stitle': simple_title,
- 'ext': video_extension,
- }]
+ return [{
+ 'id': video_id,
+ 'url': video_real_url,
+ 'uploader': video_uploader,
+ 'title': video_title,
+ 'stitle': simple_title,
+ 'ext': video_extension,
+ }]
if __name__ == '__main__':
try:
youtube_ie = YoutubeIE()
# File downloader
- fd = FileDownloader({ 'usenetrc': False,
- 'username': None,
- 'password': None,
- 'quiet': False,
- 'format': None,
- 'outtmpl': '%(id)s.%(ext)s'
- })
+ fd = FileDownloader({
+ 'usenetrc': False,
+ 'username': None,
+ 'password': None,
+ 'quiet': False,
+ 'simulate': True,
+ 'format': None,
+ 'outtmpl': '%(id)s.%(ext)s'
+ })
fd.add_info_extractor(youtube_ie)
- fd.download([ 'http://www.youtube.com/watch?v=t7qdwI7TVe8',
- 'http://www.youtube.com/watch?v=IJyn3pRcy_Q',
- 'http://www.youtube.com/watch?v=DZRXe1wtC-M', ])
+ fd.download([
+ 'http://www.youtube.com/watch?v=t7qdwI7TVe8',
+ 'http://www.youtube.com/watch?v=IJyn3pRcy_Q',
+ 'http://www.youtube.com/watch?v=DZRXe1wtC-M',
+ ])
except KeyboardInterrupt:
sys.exit('\nERROR: Interrupted by user')