#!/usr/bin/python
"""This is the mapper module for running the wikiparser Hadoop scripts.

See main() for usage info.

The org.wikimedia.wikibase.entitysuggester.wikiparser.WikiPageInputFormat class
in wikiparser-0.1.jar is an InputFormat class that Hadoop uses to split the
input XML data dump into chunks of <page>...</page> elements. In simple terms,
if there are N mappers and M chunks, Hadoop divides the M chunks among the N
mappers.

This module contains an abstract class with a public run() method. It reads the
<page>...</page> chunks from stdin and calls _getClaimsAndTitle on a single
chunk, which in turn extracts the claims from a page (if it is a wikibase Item)
and calls _generateKeyValues on the list of claims. _generateKeyValues is
implemented in the different subclasses to yield different kinds of CSV rows
for the different datasets.
"""

from lxml import etree
from itertools import izip
from StringIO import StringIO
import json
import sys
import traceback


def main():
    """The following are 6 types of sample commands.

    Note that the -input and -output parameters in the Hadoop commands are
    paths on HDFS and not on the local filesystem. Replace /path/to with the
    actual paths of the files. wikiparser-0.1.jar, mapper.py and reducer.py
    have to be available in a location that Hadoop has permission to access.

    1. For generating the item-property pairs using Hadoop:
       (Dataset used to suggest global claim properties)

    bin/hadoop jar contrib/streaming/hadoop*streaming*jar -libjars /path/to/wikiparser-0.1.jar \
        -inputformat org.wikimedia.wikibase.entitysuggester.wikiparser.WikiPageInputFormat \
        -input /input/dump.xml -output /output/global-ip-pairs \
        -file /path/to/mapper.py -mapper '/path/to/mapper.py global-ip-pairs' \
        -reducer '/bin/cat'

    2. For generating the claim guid-property pairs for source refs using Hadoop:
       (Dataset used to suggest source ref properties)

    bin/hadoop jar contrib/streaming/hadoop*streaming*jar -libjars /path/to/wikiparser-0.1.jar \
        -inputformat org.wikimedia.wikibase.entitysuggester.wikiparser.WikiPageInputFormat \
        -input /input/dump.xml -output /output/ref-cp-pairs \
        -file /path/to/mapper.py -mapper '/path/to/mapper.py ref-cp-pairs' \
        -reducer '/bin/cat'

    3. For counting frequencies of properties used in source refs:
       (Dataset used to suggest source ref properties for empty source refs)

    bin/hadoop jar contrib/streaming/hadoop*streaming*jar -libjars /path/to/wikiparser-0.1.jar \
        -inputformat org.wikimedia.wikibase.entitysuggester.wikiparser.WikiPageInputFormat \
        -input /input/dump.xml -output /output/ref-props \
        -file /path/to/mapper.py -mapper '/path/to/mapper.py ref-props' \
        -file /path/to/reducer.py -reducer '/path/to/reducer.py ref-props'

    4. For counting global property (used in all claims) frequencies:
       (Dataset used to suggest global claim properties for empty items)

    bin/hadoop jar contrib/streaming/hadoop*streaming*jar -libjars /path/to/wikiparser-0.1.jar \
        -inputformat org.wikimedia.wikibase.entitysuggester.wikiparser.WikiPageInputFormat \
        -input /input/dump.xml -output /output/global-props \
        -file /path/to/mapper.py -mapper '/path/to/mapper.py global-props' \
        -file /path/to/reducer.py -reducer '/path/to/reducer.py global-props'

    5. For counting the frequency of properties used in qualifiers, to find the
       top N popular qualifier properties for each property:
       (Dataset used to suggest qualifiers, given a property)

    bin/hadoop jar contrib/streaming/hadoop*streaming*jar -libjars /path/to/wikiparser-0.1.jar \
        -inputformat org.wikimedia.wikibase.entitysuggester.wikiparser.WikiPageInputFormat \
        -input /input/dump.xml -output /output/qual-props \
        -file /path/to/mapper.py -mapper '/path/to/mapper.py qual-props' \
        -file /path/to/reducer.py -reducer '/path/to/reducer.py qual-props'

    6. For counting the frequency of values used with properties, to find the
       top N popular values for each property:
       (Dataset used to suggest values, given a property)

    bin/hadoop jar contrib/streaming/hadoop*streaming*jar -libjars /path/to/wikiparser-0.1.jar \
        -inputformat org.wikimedia.wikibase.entitysuggester.wikiparser.WikiPageInputFormat \
        -input /input/dump.xml -output /output/prop-values \
        -file /path/to/mapper.py -mapper '/path/to/mapper.py prop-values' \
        -file /path/to/reducer.py -reducer '/path/to/reducer.py prop-values'
    """
    mapperClass = {
        'global-ip-pairs': GlobalIPPairGeneratorMapper,
        'ref-cp-pairs': RefClaimPropPairGeneratorMapper,
        'ref-props': CountRefPropsMapper,
        'global-props': CountGlobalPropsMapper,
        'qual-props': CountQualifierPropsMapper,
        'prop-values': CountPropertyValuesMapper
    }
    try:
        mapper = mapperClass[sys.argv[1]]()
    except (KeyError, IndexError):
        print "Unrecognized/missing mapper argument."
        print "Please enter one of global-ip-pairs, ref-cp-pairs, ref-props, global-props, qual-props, prop-values."
        return
    mapper.run()

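# A quick way to smoke-test a mapper locally, without Hadoop, is to pipe a small
# extract of the XML dump through this script and the reducer. (The file name
# sample-pages.xml below is only an illustration; use any small dump extract.)
#
#   cat sample-pages.xml | ./mapper.py global-props | sort | ./reducer.py global-props
#
# This mirrors what Hadoop Streaming does: the mapper's stdout is sorted by key
# and then fed to the reducer's stdin.
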
class AbstractWikiParserMapper:
    """Abstract class that should be inherited by all mapper classes."""

    def run(self):
        """Main driver method that iterates through the pages read from stdin
        and sends them to the _generateKeyValues method."""
        while True:
            page = ''
            i = sys.stdin.readline()
            if not i:
                break
            if '<page>' in i:
                # Accumulate everything between <page> and </page> into a
                # single string that can be parsed as one XML document.
                i = sys.stdin.readline()
                while '</page>' not in i:
                    page += i
                    i = sys.stdin.readline()
                page = '<page>' + page + '</page>'
                (claims, title) = self._getClaimsAndTitle(page)
                if claims is not None:
                    self._generateKeyValues(claims, title)
                    sys.stdout.flush()

    def _getClaimsAndTitle(self, page):
        """Parses a <page> element and its JSON text to return the list of
        claims and the numeric part of the item's title."""
        tree = etree.parse(StringIO(page))
        page = {child.tag: child.text for child in tree.iter()}
        if page['ns'] == '0':
            title = page['title'][1:]  # Strip the leading 'Q' from e.g. 'Q42'.
            text = json.loads(page['text'])
            if 'claims' in text:
                return text['claims'], title
        return None, None

    def _generateKeyValues(self, claims, title):
        """Iterates through a list of claims, performs some operations and
        writes (key, value) pairs to stdout to be fed into the reducer
        script."""
        raise NotImplementedError("Please implement this method in a subclass.")

    def _pairwise(self, iterable):
        """Groups a flat list into consecutive (key, value) pairs, e.g.
        [a, b, c, d] -> (a, b), (c, d)."""
        a = iter(iterable)
        return izip(a, a)

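# The concrete mappers below rely on the legacy "internal" JSON serialization of
# claims found in older Wikidata XML dumps, as far as it can be inferred from
# the parsing code: each claim is a dict with a GUID under 'g', a mainsnak under
# 'm', qualifiers under 'q' and references under 'refs', and a snak is a flat
# list of alternating keys and values, e.g.
#
#   ['value', 107, 'wikibase-entityid', {'entity-type': 'item', 'numeric-id': 5}]
#
# which _pairwise() turns into {'value': 107, 'wikibase-entityid': {...}}, so
# that statement['value'] is the numeric property id and
# statement['wikibase-entityid']['numeric-id'] is the item value, if any.
# (The example above is illustrative; consult the dump format for the exact
# layout.)
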
class GlobalIPPairGeneratorMapper(AbstractWikiParserMapper):
    """The mapper class for generating the item-property pairs and
    item-(property-value) pairs using Hadoop."""

    def _generateKeyValues(self, claims, title):
        """Iterates through an item's claims, extracts the wikibaseProperties
        and sends the item ID, property ID and a relative score of 1 to stdout,
        separated by commas."""
        ipPair = ipPairOld = None
        for claim in claims:
            statement = {i: j for i, j in self._pairwise(claim['m'])}
            if statement is None:
                continue
            try:
                # The trailing ",1" is the relative score; it is the same (1) for all pairs.
                ipPair = "Q%s,P%s,1" % (str(title), str(statement['value']))
                # The following code may be added in the future to generate CSV
                # rows with values; preferably in a separate subclass:
                ## if 'wikibase-entityid' not in statement: continue
                ## value = str(statement['wikibase-entityid']['numeric-id']).encode("utf-8", 'ignore').strip()
                ## ipvPair = "Q%s,P%s----Q%s,1" % (str(title), str(statement['value']), value)  # itemID,propertyID----valueID rows
                ## pvPair = "P%s,Q%s,1" % (str(statement['value']), value)  # propertyID,valueID rows
                # After that, write ipvPair or pvPair to stdout, whichever is needed.
                if ipPairOld != ipPair:
                    # This check prevents printing duplicate itemID,propertyID pairs.
                    ipPairOld = ipPair
                    sys.stdout.write(ipPair.encode("utf-8", 'ignore').strip() + "\n")
            except KeyError:
                pass


class RefClaimPropPairGeneratorMapper(AbstractWikiParserMapper):
    """The mapper class for generating the claim guid-property pairs for source
    refs using Hadoop."""

    def _generateKeyValues(self, claims, title):
        """Iterates through an item's claims, extracts the claim GUIDs and
        their source ref wikibaseProperties and sends them to stdout with the
        claim GUID as key and the prefixed property ID as value."""
        for claim in claims:
            statement = {i: j for i, j in self._pairwise(claim['m'])}
            if statement is None:
                continue
            try:
                guid = claim['g']
                for ref in claim['refs']:
                    reference = {i: j for i, j in self._pairwise(ref[0])}
                    sys.stdout.write("%s,P%s,1\n" % (guid, str(reference['value'])))
            except KeyError:
                pass


class CountRefPropsMapper(AbstractWikiParserMapper):
    """The mapper class to count frequencies of properties used in source refs
    from the wikidata dump using Hadoop."""

    def _generateKeyValues(self, claims, title):
        """Iterates through an item's claims to extract the properties from the
        source references and sends them to stdout with the property id as the
        key and "1" as the value. This is similar to the MapReduce wordcount."""
        for claim in claims:
            if 'refs' not in claim:
                continue
            for ref in claim['refs']:
                source = {i: j for i, j in self._pairwise(ref[0])}
                if source is None:
                    continue
                try:
                    prop = str(source['value'])
                    sys.stdout.write("P%s\t1\n" % prop)
                except KeyError:
                    pass


class CountGlobalPropsMapper(AbstractWikiParserMapper):
    """The mapper class to count global property frequencies from the wikidata
    dump using Hadoop."""

    def _generateKeyValues(self, claims, title):
        """Iterates through an item's claims to extract the properties from the
        statements and sends them to stdout with the property id as the key and
        "1" as the value. This is similar to the MapReduce wordcount."""
        for claim in claims:
            statement = {i: j for i, j in self._pairwise(claim['m'])}
            if statement is None:
                continue
            try:
                prop = str(statement['value'])
                sys.stdout.write("P%s\t1\n" % prop)
            except KeyError:
                pass

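# For illustration, the wordcount-style mappers above write tab-separated
# key/value lines such as (the property IDs here are made up for the example):
#
#   P31    1
#   P17    1
#   P31    1
#
# Hadoop Streaming sorts these by key before handing them to reducer.py, which
# sums the counts per property.
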
class CountQualifierPropsMapper(AbstractWikiParserMapper):
    """The mapper class to count qualifier property frequencies per statement
    property from the wikidata dump using Hadoop."""

    def _generateKeyValues(self, claims, title):
        """Iterates through an item's claims to extract the properties from the
        statements and the properties of their qualifiers, if any exist, and
        sends them to stdout with the statement's property id as the key and
        the qualifier property id as the value. This is similar to the
        MapReduce wordcount."""
        for claim in claims:
            statement = {i: j for i, j in self._pairwise(claim['m'])}
            if statement is None:
                continue
            try:
                prop = str(statement['value'])
                for q in claim['q']:
                    qualifier = {i: j for i, j in self._pairwise(q)}
                    sys.stdout.write("P%s\tP%s\n" % (prop, str(qualifier['value'])))
            except KeyError:
                pass


class CountPropertyValuesMapper(AbstractWikiParserMapper):
    """The mapper class to count the frequencies of values used with each
    property from the wikidata dump using Hadoop."""

    def _generateKeyValues(self, claims, title):
        """Iterates through an item's claims to extract the wikibaseProperties
        and wikibaseValues from the statements and sends them to stdout with
        the wikibaseProperty id as the key and the wikibaseValue as the value.
        This is similar to the MapReduce wordcount."""
        for claim in claims:
            statement = {i: j for i, j in self._pairwise(claim['m'])}
            if statement is None:
                continue
            try:
                prop = str(statement['value'])
                if 'wikibase-entityid' not in statement:
                    continue
                value = str(statement['wikibase-entityid']['numeric-id']).encode("utf-8", 'ignore').strip()
                sys.stdout.write("P%s\tQ%s\n" % (prop, value))
            except KeyError:
                pass


if __name__ == '__main__':
    main()