# wikiget - CLI tool for downloading files from Wikimedia sites
# Copyright (C) 2018, 2019 Cody Logan
# SPDX-License-Identifier: GPL-3.0-or-later
#
# Wikiget is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Wikiget is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Wikiget. If not, see .
"""Main wikiget functions."""
from __future__ import absolute_import, division, print_function, unicode_literals
from builtins import open
from future import standard_library
standard_library.install_aliases()
import argparse
import hashlib
import logging
import os
import re
import sys
from urllib.parse import unquote, urlparse
from mwclient import InvalidResponse, Site, __version__ as mwclient_version
from requests import ConnectionError
from tqdm import tqdm
from wikiget.version import __version__
BLOCKSIZE = 65536
DEFAULT_SITE = "commons.wikimedia.org"
USER_AGENT = "wikiget/{} (https://github.com/clpo13/wikiget) " \
"mwclient/{}".format(__version__, mwclient_version)
def main():
"""
Main entry point for console script. Automatically compiled by setuptools
when installed with `pip install` or `python setup.py install`.
"""
parser = argparse.ArgumentParser(description="""
A tool for downloading files from MediaWiki sites
using the file name or description page URL
""",
epilog="""
Copyright (C) 2018, 2019 Cody Logan. License GPLv3+: GNU GPL
version 3 or later .
This is free software; you are free to change and redistribute
it under certain conditions. There is NO WARRANTY, to the
extent permitted by law.
""")
parser.add_argument("FILE", help="""
name of the file to download with the File: or Image: prefix,
or the URL of its file description page
""")
parser.add_argument("-V", "--version", action="version",
version="%(prog)s {}".format(__version__))
output_options = parser.add_mutually_exclusive_group()
output_options.add_argument("-q", "--quiet", help="suppress warning messages",
action="store_true")
output_options.add_argument("-v", "--verbose",
help="print detailed information; use -vv for even more detail",
action="count", default=0)
parser.add_argument("-f", "--force", help="force overwriting existing files",
action="store_true")
parser.add_argument("-s", "--site", default=DEFAULT_SITE,
help="MediaWiki site to download from (default: %(default)s)")
parser.add_argument("-o", "--output", help="write download to OUTPUT")
parser.add_argument("-a", "--batch", help="treat FILE as a textfile containing multiple files to download, one URL or filename per line",
action="store_true")
args = parser.parse_args()
# print API and debug messages in verbose mode
if args.verbose >= 2:
logging.basicConfig(level=logging.DEBUG)
elif args.verbose >= 1:
logging.basicConfig(level=logging.WARNING)
if args.batch:
# batch download mode
input_file = args.FILE
if args.verbose >= 1:
print("Info: using batch file '{}'".format(input_file))
try:
fd = open(input_file, "r")
except IOError as e:
print("File could not be read. The following error was encountered:")
print(e)
sys.exit(1)
else:
with fd:
for _, line in enumerate(fd):
line = line.strip()
download(line, args)
else:
# single download mode
dl = args.FILE
download(dl, args)
def download(dl, args):
url = urlparse(dl)
if url.netloc:
filename = url.path
site_name = url.netloc
if args.site is not DEFAULT_SITE and not args.quiet:
# this will work even if the user specifies 'commons.wikimedia.org'
print("Warning: target is a URL, ignoring site specified with --site")
else:
filename = dl
site_name = args.site
file_match = valid_file(filename)
site_match = valid_site(site_name)
# check for valid site parameter
if not site_match:
print("Only Wikimedia sites (wikipedia.org and wikimedia.org) are currently supported.")
sys.exit(1)
# check if this is a valid file
if file_match and file_match.group(1):
# has File:/Image: prefix and extension
filename = file_match.group(2)
else:
# no file extension and/or prefix, probably an article
print("Downloading Wikipedia articles is not currently supported.", end="")
if file_match and not file_match.group(1):
# file extension detected, but no prefix
# TODO: no longer possible to get to this point since file_match is None with no prefix
print(" If this is a file, please add the 'File:' prefix.")
else:
print("\n", end="")
sys.exit(1)
filename = unquote(filename) # remove URL encoding for special characters
dest = args.output or filename
if args.verbose >= 2:
print("User agent: {}".format(USER_AGENT))
# connect to site and identify ourselves
try:
site = Site(site_name, clients_useragent=USER_AGENT)
except ConnectionError:
# usually this means there is no such site, or there's no network connection
print("Error: couldn't connect to specified site.")
sys.exit(1)
except InvalidResponse as e:
# site exists, but we couldn't communicate with the API endpoint
print(e)
sys.exit(1)
# get info about the target file
file = site.images[filename]
if file.imageinfo != {}:
# file exists either locally or at Wikimedia Commons
file_url = file.imageinfo["url"]
file_size = file.imageinfo["size"]
file_sha1 = file.imageinfo["sha1"]
if args.verbose >= 1:
print("Info: downloading '{}' "
"({} bytes) from {}".format(filename, file_size, site.host), end="")
if args.output:
print(" to '{}'".format(dest))
else:
print("\n", end="")
print("Info: {}".format(file_url))
if os.path.isfile(dest) and not args.force:
print("File '{}' already exists, skipping download (use -f to ignore)".format(dest))
else:
try:
fd = open(dest, "wb")
except IOError as e:
print("File could not be written. The following error was encountered:")
print(e)
sys.exit(1)
else:
# download the file
with tqdm(total=file_size, unit="B",
unit_scale=True, unit_divisor=1024) as progress_bar:
with fd:
res = site.connection.get(file_url, stream=True)
progress_bar.set_postfix(file=dest, refresh=False)
for chunk in res.iter_content(1024):
fd.write(chunk)
progress_bar.update(len(chunk))
# verify file integrity and optionally print details
dl_sha1 = verify_hash(dest)
if args.verbose >= 1:
print("Info: downloaded file SHA1 is {}".format(dl_sha1))
print("Info: server file SHA1 is {}".format(file_sha1))
if dl_sha1 == file_sha1:
if args.verbose >= 1:
print("Info: hashes match!")
# at this point, we've successfully downloaded the file
else:
print("Error: hash mismatch! Downloaded file may be corrupt.")
sys.exit(1)
else:
# no file information returned
print("Target '{}' does not appear to be a valid file.".format(filename))
sys.exit(1)
def valid_file(search_string):
"""
Determines if the given string contains a valid file name, defined as a string
ending with a '.' and at least one character, beginning with 'File:' or
'Image:', the standard file prefixes in MediaWiki.
:param search_string: string to validate
:returns: a regex Match object if there's a match or None otherwise
"""
# second group could also restrict to file extensions with three or more
# letters with ([^/\r\n\t\f\v]+\.\w{3,})
file_regex = re.compile(r"(File:|Image:)([^/\r\n\t\f\v]+\.\w+)$", re.I)
return file_regex.search(search_string)
def valid_site(search_string):
"""
Determines if the given string contains a valid site name, defined as a string
ending with 'wikipedia.org' or 'wikimedia.org'. This covers all subdomains of
those domains. Eventually, it should be possible to support any MediaWiki site,
regardless of domain name.
:param search_string: string to validate
:returns: a regex Match object if there's a match or None otherwise
"""
site_regex = re.compile(r"wiki[mp]edia\.org$", re.I)
return site_regex.search(search_string)
def verify_hash(filename):
"""
Calculates the SHA1 hash of the given file for comparison with a known value.
:param filename: name of the file to calculate a hash for
:return: hash digest
"""
hasher = hashlib.sha1()
with open(filename, "rb") as dl:
buf = dl.read(BLOCKSIZE)
while len(buf) > 0:
hasher.update(buf)
buf = dl.read(BLOCKSIZE)
return hasher.hexdigest()