# wikiget - CLI tool for downloading files from Wikimedia sites # Copyright (C) 2018, 2019 Cody Logan # SPDX-License-Identifier: GPL-3.0-or-later # # Wikiget is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # Wikiget is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Wikiget. If not, see . """Main wikiget functions.""" from __future__ import absolute_import, division, print_function, unicode_literals from builtins import open from future import standard_library standard_library.install_aliases() import argparse import hashlib import logging import os import re import sys from urllib.parse import unquote, urlparse from mwclient import InvalidResponse, Site, __version__ as mwclient_version from requests import ConnectionError from tqdm import tqdm from wikiget.version import __version__ BLOCKSIZE = 65536 DEFAULT_SITE = "en.wikipedia.org" USER_AGENT = "wikiget/{} (https://github.com/clpo13/wikiget) " \ "mwclient/{}".format(__version__, mwclient_version) def main(): """ Main entry point for console script. Automatically compiled by setuptools when installed with `pip install` or `python setup.py install`. """ parser = argparse.ArgumentParser(description=""" A tool for downloading files from MediaWiki sites using the file name or description page URL """, epilog=""" Copyright (C) 2018, 2019 Cody Logan. License GPLv3+: GNU GPL version 3 or later . This is free software; you are free to change and redistribute it under certain conditions. There is NO WARRANTY, to the extent permitted by law. """) parser.add_argument("FILE", help=""" name of the file to download with the File: or Image: prefix, or the URL of its file description page """) parser.add_argument("-V", "--version", action="version", version="%(prog)s {}".format(__version__)) output_options = parser.add_mutually_exclusive_group() output_options.add_argument("-q", "--quiet", help="suppress warning messages", action="store_true") output_options.add_argument("-v", "--verbose", help="print detailed information; use -vv for even more detail", action="count", default=0) parser.add_argument("-f", "--force", help="force overwriting existing files", action="store_true") parser.add_argument("-s", "--site", default=DEFAULT_SITE, help="MediaWiki site to download from (default: %(default)s)") parser.add_argument("-o", "--output", help="write download to OUTPUT") parser.add_argument("-a", "--batch", help="treat FILE as a textfile containing multiple files to download, one URL or filename per line", action="store_true") args = parser.parse_args() # print API and debug messages in verbose mode if args.verbose >= 2: logging.basicConfig(level=logging.DEBUG) elif args.verbose >= 1: logging.basicConfig(level=logging.WARNING) if args.batch: # batch download mode input_file = args.FILE if args.verbose >= 1: print("Info: using batch file '{}'".format(input_file)) try: fd = open(input_file, "r") except IOError as e: print("File could not be read. The following error was encountered:") print(e) sys.exit(1) else: with fd: for _, line in enumerate(fd): line = line.strip() download(line, args) else: # single download mode dl = args.FILE download(dl, args) def download(dl, args): url = urlparse(dl) if url.netloc: filename = url.path site_name = url.netloc if args.site is not DEFAULT_SITE and not args.quiet: # this will work even if the user specifies 'en.wikipedia.org' print("Warning: target is a URL, ignoring site specified with --site") else: filename = dl site_name = args.site file_match = valid_file(filename) site_match = valid_site(site_name) # check for valid site parameter if not site_match: print("Only Wikimedia sites (wikipedia.org and wikimedia.org) are currently supported.") sys.exit(1) # check if this is a valid file if file_match and file_match.group(1): # has File:/Image: prefix and extension filename = file_match.group(2) else: # no file extension and/or prefix, probably an article print("Downloading Wikipedia articles is not currently supported.", end="") if file_match and not file_match.group(1): # file extension detected, but no prefix # TODO: no longer possible to get to this point since file_match is None with no prefix print(" If this is a file, please add the 'File:' prefix.") else: print("\n", end="") sys.exit(1) filename = unquote(filename) # remove URL encoding for special characters dest = args.output or filename if args.verbose >= 2: print("User agent: {}".format(USER_AGENT)) # connect to site and identify ourselves try: site = Site(site_name, clients_useragent=USER_AGENT) except ConnectionError: # usually this means there is no such site, or there's no network connection print("Error: couldn't connect to specified site.") sys.exit(1) except InvalidResponse as e: # site exists, but we couldn't communicate with the API endpoint print(e) sys.exit(1) # get info about the target file file = site.images[filename] if file.imageinfo != {}: # file exists either locally or at Wikimedia Commons file_url = file.imageinfo["url"] file_size = file.imageinfo["size"] file_sha1 = file.imageinfo["sha1"] if args.verbose >= 1: print("Info: downloading '{}' " "({} bytes) from {}".format(filename, file_size, site.host), end="") if args.output: print(" to '{}'".format(dest)) else: print("\n", end="") print("Info: {}".format(file_url)) if os.path.isfile(dest) and not args.force: print("File '{}' already exists, skipping download (use -f to ignore)".format(dest)) else: try: fd = open(dest, "wb") except IOError as e: print("File could not be written. The following error was encountered:") print(e) sys.exit(1) else: # download the file with tqdm(total=file_size, unit="B", unit_scale=True, unit_divisor=1024) as progress_bar: with fd: res = site.connection.get(file_url, stream=True) progress_bar.set_postfix(file=dest, refresh=False) for chunk in res.iter_content(1024): fd.write(chunk) progress_bar.update(len(chunk)) # verify file integrity and optionally print details dl_sha1 = verify_hash(dest) if args.verbose >= 1: print("Info: downloaded file SHA1 is {}".format(dl_sha1)) print("Info: server file SHA1 is {}".format(file_sha1)) if dl_sha1 == file_sha1: if args.verbose >= 1: print("Info: hashes match!") # at this point, we've successfully downloaded the file else: print("Error: hash mismatch! Downloaded file may be corrupt.") sys.exit(1) else: # no file information returned print("Target '{}' does not appear to be a valid file.".format(filename)) sys.exit(1) def valid_file(search_string): """ Determines if the given string contains a valid file name, defined as a string ending with a '.' and at least one character, beginning with 'File:' or 'Image:', the standard file prefixes in MediaWiki. :param search_string: string to validate :returns: a regex Match object if there's a match or None otherwise """ # second group could also restrict to file extensions with three or more # letters with ([^/\r\n\t\f\v]+\.\w{3,}) file_regex = re.compile(r"(File:|Image:)([^/\r\n\t\f\v]+\.\w+)$", re.I) return file_regex.search(search_string) def valid_site(search_string): """ Determines if the given string contains a valid site name, defined as a string ending with 'wikipedia.org' or 'wikimedia.org'. This covers all subdomains of those domains. Eventually, it should be possible to support any MediaWiki site, regardless of domain name. :param search_string: string to validate :returns: a regex Match object if there's a match or None otherwise """ site_regex = re.compile(r"wiki[mp]edia\.org$", re.I) return site_regex.search(search_string) def verify_hash(filename): """ Calculates the SHA1 hash of the given file for comparison with a known value. :param filename: name of the file to calculate a hash for :return: hash digest """ hasher = hashlib.sha1() with open(filename, "rb") as dl: buf = dl.read(BLOCKSIZE) while len(buf) > 0: hasher.update(buf) buf = dl.read(BLOCKSIZE) return hasher.hexdigest()