Back to Repositories

Testing Download Functionality in youtube-dl

This test suite implements comprehensive unit testing for the youtube-dl download functionality, focusing on video extraction and download verification across various scenarios. The tests validate core download capabilities, file integrity, and metadata extraction while handling both single videos and playlists.

Test Coverage Overview

The test suite provides extensive coverage of youtube-dl’s download functionality, encompassing both single video and playlist downloads.

Key areas tested include:
  • Video extraction and download verification
  • Playlist handling and count validation
  • File integrity checks including MD5 verification
  • Metadata extraction and JSON output validation
  • Error handling and retry mechanisms

Implementation Analysis

The testing approach employs Python’s unittest framework with dynamic test generation for different test cases. The implementation utilizes a custom YoutubeDL class that extends the main application class, allowing for a controlled testing environment and detailed monitoring of the download process.

Notable patterns include:
  • Dynamic test case generation using a generator function
  • Parallel test execution support
  • Comprehensive assertion checks for file size, content, and metadata

Technical Details

Testing tools and configuration:
  • Python unittest framework
  • Custom test helpers for warnings and parameter management
  • File system operations for cleanup and verification
  • JSON parsing for metadata validation
  • MD5 hash verification for downloaded files
  • Retry mechanism with 3 attempts for network-related failures

Best Practices Demonstrated

The test suite exemplifies several testing best practices for complex download operations.

Notable practices include:
  • Thorough cleanup of test artifacts
  • Proper separation of test setup and execution
  • Comprehensive error handling and reporting
  • Flexible test parameterization
  • Modular test case definition
  • Proper resource management and cleanup

ytdl-org/youtube-dl

test/test_download.py

            
#!/usr/bin/env python

from __future__ import unicode_literals

# Allow direct execution
import os
import sys
import unittest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from test.helper import (
    expect_warnings,
    get_params,
    gettestcases,
    expect_info_dict,
    try_rm,
    report_warning,
)


import hashlib
import json
import socket

import youtube_dl.YoutubeDL
from youtube_dl.compat import (
    compat_http_client,
    compat_HTTPError,
    compat_open as open,
    compat_urllib_error,
)
from youtube_dl.utils import (
    DownloadError,
    ExtractorError,
    error_to_compat_str,
    format_bytes,
    IDENTITY,
    preferredencoding,
    UnavailableVideoError,
)
from youtube_dl.extractor import get_info_extractor

# Number of attempts made per test before network failures give up
RETRIES = 3

# Some unittest APIs require actual str
if isinstance('TEST', str):
    _encode_str = IDENTITY
else:
    def _encode_str(s):
        return s.encode(preferredencoding())


class YoutubeDL(youtube_dl.YoutubeDL):
    """YoutubeDL variant used by the tests.

    It records every info dict handed to `process_info` and turns any
    reported warning into an `ExtractorError`.
    """

    def __init__(self, *args, **kwargs):
        # Both attributes are assigned before the base initialiser runs
        self.processed_info_dicts = []
        self.to_stderr = self.to_screen
        super(YoutubeDL, self).__init__(*args, **kwargs)

    def report_warning(self, message):
        """Raise instead of reporting: warnings are not accepted during tests."""
        raise ExtractorError(message)

    def process_info(self, info_dict):
        """Remember `info_dict`, then defer to the base implementation."""
        self.processed_info_dicts.append(info_dict)
        outcome = super(YoutubeDL, self).process_info(info_dict)
        return outcome


def _file_md5(fn):
    with open(fn, 'rb') as f:
        return hashlib.md5(f.read()).hexdigest()


# Test case definitions from test.helper; the code below reads each one as a
# dict carrying at least a 'name' key
defs = gettestcases()


class TestDownload(unittest.TestCase):
    """Holder for the download tests that are attached to it dynamically."""

    # Parallel testing in nosetests. See
    # http://nose.readthedocs.org/en/latest/doc_tests/test_multiprocess/multiprocess.html
    _multiprocess_shared_ = True

    # No length limit on the diffs shown for failed assertions
    maxDiff = None

    def __str__(self):
        """Describe the test, tagging it with its `add_ie` attribute when set."""
        klass = self.__class__
        # Same '<module>.<class>' form as unittest's own strclass helper,
        # which is not importable under that name on every Python version
        qualified_name = '%s.%s' % (klass.__module__, klass.__name__)

        add_ie = getattr(self, self._testMethodName).add_ie
        if add_ie:
            ie_tag = ' [%s]' % add_ie
        else:
            ie_tag = ''
        return '%s (%s)%s:' % (self._testMethodName, qualified_name, ie_tag)

    def setUp(self):
        # Expose the module-level test definitions on the instance
        self.defs = defs

# Dynamically generate tests


def generator(test_case, tname):
    """Build the test method for one test case definition.

    `test_case` is the definition dict. The keys read here are 'name' and
    'url', plus the optional 'add_ie', 'playlist', 'playlist_mincount',
    'playlist_maxcount', 'playlist_count', 'playlist_duration_sum', 'params',
    'skip', 'expected_warnings' and 'info_dict'. Each individual case
    (the definition itself, or the members of its 'playlist' list) may also
    carry 'md5' and 'file_minsize'.

    `tname` is the name of the generated test; it prefixes the output
    template and is included in error messages.

    Returns the function `test_template(self)`, meant to be attached to a
    `unittest.TestCase` class as a test method.
    """

    def test_template(self):
        ie = youtube_dl.extractor.get_info_extractor(test_case['name'])()
        other_ies = [get_info_extractor(ie_key)() for ie_key in test_case.get('add_ie', [])]
        # Any key starting with 'playlist' marks the definition as a playlist test
        is_playlist = any(k.startswith('playlist') for k in test_case)
        # Explicit playlist entries if given; otherwise the definition itself
        # (or nothing at all for a count-only playlist test)
        test_cases = test_case.get(
            'playlist', [] if is_playlist else [test_case])

        def print_skipping(reason):
            print('Skipping %s: %s' % (test_case['name'], reason))
            self.skipTest(_encode_str(reason))

        if not ie.working():
            print_skipping('IE marked as not _WORKING')

        # 'id' and 'ext' are needed to work out each case's output filename
        for tc in test_cases:
            info_dict = tc.get('info_dict', {})
            if not (info_dict.get('id') and info_dict.get('ext')):
                raise Exception('Test definition (%s) requires both \'id\' and \'ext\' keys present to define the output file' % (tname, ))

        if 'skip' in test_case:
            print_skipping(test_case['skip'])

        for other_ie in other_ies:
            if not other_ie.working():
                print_skipping('test depends on %sIE, marked as not WORKING' % other_ie.ie_key())

        params = get_params(test_case.get('params', {}))
        params['outtmpl'] = tname + '_' + params['outtmpl']
        if is_playlist and 'playlist' not in test_case:
            # Count-only playlist test: no entries to download, so extract
            # flat and stop just past the expected maximum (or at the minimum)
            params.setdefault('extract_flat', 'in_playlist')
            params.setdefault('playlistend',
                              test_case['playlist_maxcount'] + 1
                              if test_case.get('playlist_maxcount')
                              else test_case.get('playlist_mincount'))
            params.setdefault('skip_download', True)

        ydl = YoutubeDL(params, auto_init=False)
        ydl.add_default_info_extractors()
        # Filenames for which the progress hook reported status 'finished'
        finished_hook_called = set()

        def _hook(status):
            if status['status'] == 'finished':
                finished_hook_called.add(status['filename'])
        ydl.add_progress_hook(_hook)
        expect_warnings(ydl, test_case.get('expected_warnings', []))

        def get_tc_filename(tc):
            return ydl.prepare_filename(tc.get('info_dict', {}))

        res_dict = None

        def try_rm_tcs_files(tcs=None):
            # Remove the output file, its '.part' file and its '.info.json'
            # for every given case (defaults to this test's cases)
            if tcs is None:
                tcs = test_cases
            for tc in tcs:
                tc_filename = get_tc_filename(tc)
                try_rm(tc_filename)
                try_rm(tc_filename + '.part')
                try_rm(os.path.splitext(tc_filename)[0] + '.info.json')

        # Start from a clean slate in case an earlier run left files behind
        try_rm_tcs_files()
        try:
            try_num = 1
            while True:
                try:
                    # We're not using .download here since that is just a shim
                    # for outside error handling, and returns the exit code
                    # instead of the result dict.
                    res_dict = ydl.extract_info(
                        test_case['url'],
                        force_generic_extractor=params.get('force_generic_extractor', False))
                except (DownloadError, ExtractorError) as err:
                    # Check if the exception is not a network related one
                    # NOTE(review): `or` binds looser than `not in`, so an
                    # HTTP 503 error is re-raised here rather than retried -
                    # confirm that this is the intended behaviour
                    if err.exc_info[0] not in (
                            compat_urllib_error.URLError,
                            socket.timeout,
                            UnavailableVideoError,
                            compat_http_client.BadStatusLine,
                    ) or (err.exc_info[0] == compat_HTTPError and err.exc_info[1].code == 503):
                        msg = getattr(err, 'msg', error_to_compat_str(err))
                        err.msg = '%s (%s)' % (msg, tname, )
                        raise err

                    # Out of attempts: report the problem and end the test
                    # without failing it
                    if try_num == RETRIES:
                        report_warning('%s failed due to network errors, skipping...' % tname)
                        return

                    print('Retrying: {0} failed tries\n\n##########\n\n'.format(try_num))

                    try_num += 1
                else:
                    break

            if is_playlist:
                self.assertTrue(res_dict['_type'] in ['playlist', 'multi_video'])
                self.assertTrue('entries' in res_dict)
                expect_info_dict(self, res_dict, test_case.get('info_dict', {}))

            if 'playlist_mincount' in test_case:
                self.assertGreaterEqual(
                    len(res_dict['entries']),
                    test_case['playlist_mincount'],
                    'Expected at least %d in playlist %s, but got only %d' % (
                        test_case['playlist_mincount'], test_case['url'],
                        len(res_dict['entries'])))
            if 'playlist_maxcount' in test_case:
                self.assertLessEqual(
                    len(res_dict['entries']),
                    test_case['playlist_maxcount'],
                    'Expected at most %d in playlist %s, but got %d' % (
                        test_case['playlist_maxcount'], test_case['url'],
                        len(res_dict['entries'])))
            if 'playlist_count' in test_case:
                self.assertEqual(
                    len(res_dict['entries']),
                    test_case['playlist_count'],
                    'Expected %d entries in playlist %s, but got %d.' % (
                        test_case['playlist_count'],
                        test_case['url'],
                        len(res_dict['entries']),
                    ))
            if 'playlist_duration_sum' in test_case:
                got_duration = sum(e['duration'] for e in res_dict['entries'])
                self.assertEqual(
                    test_case['playlist_duration_sum'], got_duration)

            # Generalize both playlists and single videos to unified format for
            # simplicity
            if 'entries' not in res_dict:
                res_dict['entries'] = [res_dict]

            for tc_num, tc in enumerate(test_cases):
                tc_res_dict = res_dict['entries'][tc_num]
                # First, check test cases' data against extracted data alone
                expect_info_dict(self, tc_res_dict, tc.get('info_dict', {}))
                # Now, check downloaded file consistency
                # support test-case with volatile ID, signalled by regexp value
                if tc.get('info_dict', {}).get('id', '').startswith('re:'):
                    # Temporarily substitute the extracted ID so that the
                    # expected filename can be computed
                    test_id = tc['info_dict']['id']
                    tc['info_dict']['id'] = tc_res_dict['id']
                else:
                    test_id = None
                tc_filename = get_tc_filename(tc)
                if test_id:
                    # Put the regexp ID back into the definition
                    tc['info_dict']['id'] = test_id
                if not test_case.get('params', {}).get('skip_download', False):
                    self.assertTrue(os.path.exists(tc_filename), msg='Missing file ' + tc_filename)
                    self.assertTrue(tc_filename in finished_hook_called)
                    # An explicit file_minsize of None disables the size check
                    expected_minsize = tc.get('file_minsize', 10000)
                    if expected_minsize is not None:
                        if params.get('test'):
                            expected_minsize = max(expected_minsize, 10000)
                        got_fsize = os.path.getsize(tc_filename)
                        self.assertGreaterEqual(
                            got_fsize, expected_minsize,
                            'Expected %s to be at least %s, but it\'s only %s ' %
                            (tc_filename, format_bytes(expected_minsize),
                                format_bytes(got_fsize)))
                    if 'md5' in tc:
                        md5_for_file = _file_md5(tc_filename)
                        self.assertEqual(tc['md5'], md5_for_file)
                # Finally, check test cases' data again but this time against
                # extracted data from info JSON file written during processing
                info_json_fn = os.path.splitext(tc_filename)[0] + '.info.json'
                self.assertTrue(
                    os.path.exists(info_json_fn),
                    'Missing info file %s' % info_json_fn)
                with open(info_json_fn, encoding='utf-8') as infof:
                    info_dict = json.load(infof)
                expect_info_dict(self, info_dict, tc.get('info_dict', {}))
        finally:
            try_rm_tcs_files()
            if is_playlist and res_dict is not None and res_dict.get('entries'):
                # Remove all other files that may have been extracted if the
                # extractor returns full results even with extract_flat
                res_tcs = [{'info_dict': e} for e in res_dict['entries']]
                try_rm_tcs_files(res_tcs)

    return test_template


# And add them to TestDownload
for n, test_case in enumerate(defs):
    tname = 'test_' + str(test_case['name'])
    # Several definitions may share a name: append a numeric suffix until the
    # method name is not yet taken on the class
    i = 1
    while hasattr(TestDownload, tname):
        tname = 'test_%s_%d' % (test_case['name'], i)
        i += 1
    test_method = generator(test_case, tname)
    test_method.__name__ = str(tname)
    # add_ie is read back by TestDownload.__str__; it stays falsy (None or an
    # empty list) when the definition names no additional extractors
    ie_list = test_case.get('add_ie')
    test_method.add_ie = ie_list and ','.join(ie_list)
    setattr(TestDownload, test_method.__name__, test_method)
    # Drop the loop-local reference to the function just attached
    del test_method


# Run the generated tests when this file is executed directly
if __name__ == '__main__':
    unittest.main()