From 25fd16822a857d48788f01631cc1bb594908d804 Mon Sep 17 00:00:00 2001
From: Jan-Niclas Walther
Date: Sat, 4 May 2019 14:42:48 +0200
Subject: [PATCH] First commit

---
 README.md                        |   4 +-
 setup.py                         |  48 +++++
 urlloader/__init__.py            |   0
 urlloader/test_data/bad file.dat |   1 +
 urlloader/test_urlloader.py      | 316 +++++++++++++++++++++++++++++++
 urlloader/urlloader.py           |  85 +++++++++
 6 files changed, 452 insertions(+), 2 deletions(-)
 create mode 100644 setup.py
 create mode 100644 urlloader/__init__.py
 create mode 100644 urlloader/test_data/bad file.dat
 create mode 100644 urlloader/test_urlloader.py
 create mode 100644 urlloader/urlloader.py

diff --git a/README.md b/README.md
index 0917c3b..f58b554 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,3 @@
-# urlloader
+# urlloader: URL downloader script
 
-URL
\ No newline at end of file
+Script written in Python 2 for downloading files from a list of URLs given in a plaintext input file.
\ No newline at end of file
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..76a2a36
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,48 @@
+from __future__ import print_function
+from setuptools import setup, find_packages
+from setuptools.command.test import test as TestCommand
+import io
+import codecs
+import os
+import sys
+
+here = os.path.abspath(os.path.dirname(__file__))
+
+def read(filename):
+    with io.open(os.path.join(here, filename), encoding='utf-8') as f:
+        return f.read()
+
+long_description = read('README.md')
+
+class PyTest(TestCommand):
+    def finalize_options(self):
+        TestCommand.finalize_options(self)
+        self.test_args = []
+        self.test_suite = True
+
+    def run_tests(self):
+        import pytest
+        errcode = pytest.main(self.test_args)
+        sys.exit(errcode)
+
+setup(
+    name = 'urlloader',
+    version = '0.1',
+    url = 'https://github.com/kngbuzzo/testing123',
+    author = 'Jan-Niclas Walther',
+    tests_require=['pytest'],
+    install_requires=[],
+    cmdclass={'test': PyTest},
+    author_email='jan-niclasw@web.de',
+    description='Script for downloading URLs',
+    long_description=long_description,
+    packages=['urlloader'],
+    include_package_data=True,
+    platforms='any',
+    classifiers = [
+        'Programming Language :: Python :: 2',
+    ],
+    extras_require={
+        'testing': ['pytest'],
+    }
+)
\ No newline at end of file
diff --git a/urlloader/__init__.py b/urlloader/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/urlloader/test_data/bad file.dat b/urlloader/test_data/bad file.dat
new file mode 100644
index 0000000..3f4cb2b
--- /dev/null
+++ b/urlloader/test_data/bad file.dat
@@ -0,0 +1 @@
+I'm a bad file.
\ No newline at end of file
diff --git a/urlloader/test_urlloader.py b/urlloader/test_urlloader.py
new file mode 100644
index 0000000..6017a18
--- /dev/null
+++ b/urlloader/test_urlloader.py
@@ -0,0 +1,316 @@
+# -*- coding: utf-8 -*-
+'''
+Test suite for urlloader module
+'''
+from __future__ import print_function
+
+import os
+import pytest
+import urllib2
+import subprocess
+import sys
+
+import urlloader
+
+WORKING_PATH = {'url': (r'https://upload.wikimedia.org/wikipedia/commons/'
+                        r'thumb/7/75/Yuan_Zi_-_panda.jpg/170px-Yuan_Zi_-_panda.jpg'),
+                'file': '170px-Yuan_Zi_-_panda.jpg'}
+BROKEN_PATH = {'url': r'http://www.bogus.adr/Not_there.zip',
+               'file': 'Not_there.zip'}
+LOCAL_PATH = {'url': r'file:///' + os.path.join(os.path.dirname(__file__), r'./test_data/bad file.dat'),
+              'file': 'bad file.dat'}
+MUTLITPLE_PATHS = {
+    'url': [
+        (r'https://upload.wikimedia.org/wikipedia/commons/thumb/7/75/'
+         r'Yuan_Zi_-_panda.jpg/170px-Yuan_Zi_-_panda.jpg'),
+        (r'https://upload.wikimedia.org/wikipedia/commons/thumb/6/66/'
+         r'Polar_Bear_-_Alaska_%28cropped%29.jpg/220px-Polar_Bear_-_Alaska_%28cropped%29.jpg'),
+        (r'https://upload.wikimedia.org/wikipedia/commons/thumb/7/7f/'
+         r'European_Brown_Bear.jpg/250px-European_Brown_Bear.jpg'),
+    ],
+    'file': [
+        '170px-Yuan_Zi_-_panda.jpg',
+        '220px-Polar_Bear_-_Alaska_(cropped).jpg',
+        '250px-European_Brown_Bear.jpg',
+    ]
+}
+
+
+##============================================
+## Test case base classes (no actual tests!)
+##============================================
+
+class __TestBase__(object):
+    '''
+    Abstract baseclass for all test cases providing test methods.
+
+    Set attributes in method initialize.
+    '''
+    @classmethod
+    def setup_class(cls):
+        '''
+        Initialize test attributes; the URL itself is passed as input
+        '''
+        cls.expect_error = None
+        cls.initialize()
+        cls.input_name = cls.url
+
+    def _check_file(self, file_path, dirname=None):
+        '''
+        Check if file exists and remove it afterwards
+        '''
+        if dirname:
+            file_path = os.path.join(dirname, file_path)
+        if os.path.basename(file_path):
+            return_value = os.path.exists(file_path) != bool(self.expect_error)
+            if os.path.exists(file_path):
+                os.remove(file_path)
+            return return_value
+        return True
+
+    @staticmethod
+    def _run_function(*args, **kwargs):
+        '''
+        Defines the function to be tested
+        '''
+        return urlloader.save_url_content(*args, **kwargs)
+
+    def test_file_exists(self):
+        '''
+        Test for input file download using default keyword parameters
+        '''
+        self._run_function(self.input_name)
+        assert self._check_file(self.file_out)
+
+    def test_nondefault_dir(self):
+        '''
+        Test for input file download using nondefault output directory
+        '''
+        dirname = '.temp_dir'
+        self._run_function(self.input_name, dirname)
+        assert self._check_file(self.file_out, dirname)
+        os.rmdir(dirname)
+
+    @pytest.mark.xfail(raises=(urllib2.URLError, ValueError))
+    def test_error_raised(self):
+        '''
+        Test for input file download with errors being raised for broken URLs
+        '''
+        try:
+            self._run_function(self.input_name, skip_broken=False)
+        except (urllib2.URLError, ValueError) as e:
+            assert isinstance(e, self.expect_error)
+            raise  # bare raise keeps the original traceback for the xfail report
+        assert self._check_file(self.file_out)
+
+class __TestFromFile__(object):
+    '''
+    Modifier class to generate and test example input test files
+    '''
+    @classmethod
+    def setup_class(cls):
+        '''
+        Create temporary txt input file
+        '''
+        cls.input_name = r'.urlloader_urls_tmp.txt'
+        cls.expect_error = None
+        cls.initialize()
+        with open(cls.input_name, 'w') as f:
+            f.write(cls.url)
+
+    @staticmethod
+    def _run_function(*args, **kwargs):
+        '''
+        Defines the function to be tested
+        '''
+        return urlloader.save_from_url_input_file(*args, **kwargs)
+
+    @classmethod
+    def teardown_class(cls):
+        '''
+        Removes temporary txt input file
+        '''
+        os.remove(cls.input_name)
+
+class __TestBatchCall__(object):
+    '''
+    Modifier class to overload _run_function method for calling urlloader from shell
+    '''
+    def _run_function(self, *args, **kwargs):
+        script_path = os.path.join(os.path.dirname(__file__), 'urlloader.py')
+
+        cmd = [sys.executable, script_path, args[0]]
+
+        output_dir = args[1] if len(args)>=2 else kwargs.get('output_dir', False)
+        skip_broken = args[2] if len(args)>=3 else kwargs.get('skip_broken', False)
+
+        if output_dir:
+            cmd.extend(['-o', output_dir])
+        if not skip_broken:
+            cmd.append('-e')
+
+        proc = subprocess.call(cmd, stdout=sys.stdout)
+
+        assert proc == (1 if self.expect_error else 0)
+
+class __TestMultipleURLs__(object):
+    '''
+    Modifier class to overload _check_file to check a list of files
+    '''
+    def _check_file(self, file_path_lst, dirname=None):
+        files_exist = [super(__TestMultipleURLs__, self)._check_file(file_path, dirname)
+                       for file_path in file_path_lst]
+        return all(files_exist) != bool(self.expect_error)
+
+
+##================================
+## URL processing test cases
+##================================
+
+class TestWorking(__TestBase__):
+    '''
+    Test class for properly working URL
+    '''
+    @classmethod
+    def initialize(cls):
+        cls.url = WORKING_PATH['url']
+        cls.file_out = WORKING_PATH['file']
+
+class TestBroken(__TestBase__):
+    '''
+    Test class for nonexisting URL
+    '''
+    @classmethod
+    def initialize(cls):
+        cls.url = BROKEN_PATH['url']
+        cls.file_out = BROKEN_PATH['file']
+        cls.expect_error = urllib2.URLError
+
+class TestSchemeMissing(__TestBase__):
+    '''
+    Test class for URL missing scheme
+    '''
+    @classmethod
+    def initialize(cls):
+        cls.url = WORKING_PATH['url'][8:]
+        cls.file_out = WORKING_PATH['file']
+        cls.expect_error = ValueError
+
+class TestFileMissing(__TestBase__):
+    '''
+    Test class for URL with its file name component removed
+    '''
+    @classmethod
+    def initialize(cls):
+        cls.url = WORKING_PATH['url'].rsplit('/',1)[0]
+        cls.file_out = WORKING_PATH['file']
+        cls.expect_error = urllib2.URLError
+
+class TestQuery(__TestBase__):
+    '''
+    Test class for URL with an additional query
+    '''
+    @classmethod
+    def initialize(cls):
+        cls.url = WORKING_PATH['url'] + '?foo=bar'
+        cls.file_out = WORKING_PATH['file']
+
+class TestFragment(__TestBase__):
+    '''
+    Test class for URL with an additional fragment
+    '''
+    @classmethod
+    def initialize(cls):
+        cls.url = WORKING_PATH['url'] + '#foo=bar'
+        cls.file_out = WORKING_PATH['file']
+
+class TestEmpty(__TestBase__):
+    '''
+    Test class for empty input file
+    '''
+    @classmethod
+    def initialize(cls):
+        cls.url = ''
+        cls.file_out = ''
+
+class TestNonsense(__TestBase__):
+    '''
+    Test class for nonsense input
+    '''
+    @classmethod
+    def initialize(cls):
+        cls.url = 'kjldfhaiuhvi\nauvapiarjbv ipue83dbslvkbbsw'
+        cls.file_out = ''
+        cls.expect_error = ValueError
+
+class TestSpaces(__TestBase__):
+    '''
+    Test class for input containing spaces
+    '''
+    @classmethod
+    def initialize(cls):
+        cls.url = LOCAL_PATH['url']
+        cls.file_out = LOCAL_PATH['file']
+
+
+##================================
+## Input file test cases
+##================================
+
+class TestWorkingFile(__TestFromFile__, TestWorking):
+    '''
+    Test class for processing a file containing a working URL
+    '''
+    pass
+
+class TestBrokenFile(__TestFromFile__, TestBroken):
+    '''
+    Test class for processing a file containing a broken URL
+    '''
+    pass
+
+class TestMultipleFiles(__TestMultipleURLs__, __TestFromFile__, __TestBase__):
+    '''
+    Test class for list of properly working URLs
+    '''
+    @classmethod
+    def initialize(cls):
+        cls.url = '\n'.join(MUTLITPLE_PATHS['url'])
+        cls.file_out = MUTLITPLE_PATHS['file']
+
+class TestMultipleFilesBroken(__TestMultipleURLs__, __TestFromFile__, __TestBase__):
+    '''
+    Test class for list of working URLs followed by a broken URL
+    '''
+    @classmethod
+    def initialize(cls):
+        cls.url = '\n'.join(MUTLITPLE_PATHS['url']+[BROKEN_PATH['url']])
+        cls.file_out = MUTLITPLE_PATHS['file']+[BROKEN_PATH['file']]
+        cls.expect_error = urllib2.URLError
+
+
+##================================
+## Batch control test cases
+##================================
+
+class TestBatchCallWorking(__TestBatchCall__, TestWorkingFile):
+    '''
+    Test class for properly working URL called from shell
+    '''
+    pass
+
+class TestBatchCallBroken(__TestBatchCall__, TestBrokenFile):
+    '''
+    Test class for broken URL called from shell
+    '''
+    pass
+
+class TestBatchCallMultipleFiles(__TestBatchCall__, TestMultipleFiles):
+    '''
+    Test class for list of properly working URLs called from shell
+    '''
+    pass
+
+
+if __name__ == '__main__':
+    pytest.main()
diff --git a/urlloader/urlloader.py b/urlloader/urlloader.py
new file mode 100644
index 0000000..07dd7ea
--- /dev/null
+++ b/urlloader/urlloader.py
@@ -0,0 +1,85 @@
+# -*- coding: utf-8 -*-
+'''
+URL downloader script
+
+Downloads files from a list of URLs given in a plaintext input file
+'''
+from __future__ import print_function
+
+import argparse
+import contextlib
+import os
+import shutil
+import urllib2
+import urlparse
+import warnings
+
+def save_from_url_input_file(url_input_file, output_dir='.', skip_broken=True):
+    '''
+    Reads URLs from text file and stores contents in output directory
+
+    Parameters:
+    -----------
+    url_input_file : str
+        Path to plain text file containing URLs (one per line)
+    output_dir : str
+        Output directory path
+    skip_broken : bool
+        Continue with warning if URL can not be opened. If False, an exception is raised.
+    '''
+    with open(url_input_file, 'r') as urls:
+        for url in urls:
+            save_url_content(url.strip(), output_dir, skip_broken)
+
+def save_url_content(url, output_dir='.', skip_broken=True):
+    '''
+    Saves contents from URL in output directory
+
+    Parameters:
+    -----------
+    url : str
+        URL input string
+    output_dir : str
+        Output directory path (created if it does not exist, even for an empty URL)
+    skip_broken : bool
+        Continue with warning if URL can not be opened. If False, an exception is raised.
+    '''
+    if not os.path.exists(output_dir):
+        os.makedirs(output_dir)
+    url = urllib2.quote(urllib2.unquote(url),':/#?=\\')
+    if not url:
+        return # skip empty lines
+    file_name = os.path.basename(urllib2.unquote(urlparse.urlparse(url).path))
+    try:
+        with contextlib.closing(urllib2.urlopen(url)) as content:
+            with open(os.path.join(output_dir, file_name),'wb') as f:
+                shutil.copyfileobj(content, f)
+    except (urllib2.URLError, ValueError) as e:
+        if skip_broken:
+            warnings.warn('Could not open URL "%s".\n%s: %s'%(url, type(e).__name__, e))
+            return
+        raise  # bare raise re-raises with the original traceback intact
+
+
+def parse_shell_cmd():
+    '''
+    Parses command line arguments
+
+    usage: urlloader.py [-h] [-o OUTPUT] [-e] [input]
+    '''
+    parser = argparse.ArgumentParser(description=__doc__,
+                                     formatter_class=argparse.RawTextHelpFormatter)
+    parser.add_argument('input', action='store', nargs=r'?', default='addrs.txt',
+                        help='Specify input file path')
+    parser.add_argument('-o', '--output', action='store', nargs=1, default=[r'.'],
+                        help='Specify output directory path')
+    parser.add_argument('-e', '--errors', action='store_false',
+                        help='Raise errors instead of warnings')
+
+    args = parser.parse_args()
+    return args
+
+
+if __name__ == '__main__':
+    args = parse_shell_cmd()
+    save_from_url_input_file(args.input, args.output[0], args.errors)