#!/usr/bin/python
""" The mapper script to count global property frequencies from the wikidata dump using Hadoop.
Sample command to run this on Hadoop:
bin/hadoop jar contrib/streaming/hadoop*streaming*jar -libjars /tmp/entity-suggester-client.jar \
-inputformat org.wikimedia.wikibase.entitysuggester.wikiparser.WikiPageInputFormat \
-input /input/dump.xml -output /output \
-file /tmp/topproperties.py -mapper /tmp/topproperties.py \
-file /tmp/topproperties_r.py -reducer '/tmp/topproperties_r.py'
bin/hadoop dfs -cat /output/part-* | sort -t, -k1,1nr > sortedoutput
Please see topproperties_r.py too
"""
from lxml import etree
from itertools import izip
from StringIO import StringIO
import json
import sys
import traceback
page = ''
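
# Each map input record is expected to be a single <page> ... </page> element from the
# dump (as produced by WikiPageInputFormat), delivered on stdin one line at a time;
# main() below reassembles the element before handing it to parsePage().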
def main():
    """Iterates through stdin and sends each reassembled <page> element to parsePage()."""
    while True:
        page = ''
        i = sys.stdin.readline()
        if not i:
            break
        if '<page>' in i:
            i = sys.stdin.readline()
            # Collect lines until the closing tag (or EOF, in case of a truncated record).
            while i and '</page>' not in i:
                page += i
                i = sys.stdin.readline()
            page = '<page>' + page + '</page>'
            parsePage(page)
def pairwise(iterable):
    """s -> (s0, s1), (s2, s3), (s4, s5), ..."""
    a = iter(iterable)
    return izip(a, a)
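
# Purely illustrative example, assuming the old-format mainsnak list described in
# parsePage() below:
#     pairwise(["value", 107, "wikibase-entityid", {"entity-type": "item", "numeric-id": 5}])
#     -> ("value", 107), ("wikibase-entityid", {"entity-type": "item", "numeric-id": 5})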
def parsePage(page):
    """Parses a page and its JSON text to extract the properties from the statements and sends them
    to stdout with the property id as the key and "1" as the value, similar to MapReduce wordcount.
    """
    tree = etree.parse(StringIO(page))
    page = {child.tag: child.text for child in tree.iter()}
    title = None
    try:
        # Only main-namespace (ns 0) pages carry item JSON; skip everything else.
        if page['ns'] != '0':
            return
        title = page['title'][1:]
        text = json.loads(page['text'])
        statement = None
        if 'claims' not in text:
            return
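        # Each claim is assumed to follow the old dump serialization, where the mainsnak
        # a['m'] is a flat list such as
        #     ["value", 107, "wikibase-entityid", {"entity-type": "item", "numeric-id": 5}]
        # (or just ["novalue", 107] / ["somevalue", 107]). pairwise() turns it into a dict,
        # so statement['value'] holds the numeric property id; snaks without a 'value'
        # entry raise KeyError and are skipped below.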
        for a in text['claims']:
            statement = {i: j for i, j in pairwise(a['m'])}
            if statement is not None:
                try:
                    # Emit "<property id>\t1" for every statement, wordcount-style.
                    prop = str(statement['value']).encode("utf-8", 'ignore').strip()
                    sys.stdout.write(prop + "\t" + "1" + "\n")
                except KeyError:
                    # Snak has no 'value' entry; skip it.
                    pass
    except (KeyError, ValueError, TypeError):
        sys.stderr.write("Error occurred for page: " + str(title) + ", ns = " + str(page.get('ns')) + "\n")
        sys.stderr.write(traceback.format_exc() + "\n")
if __name__ == '__main__':
    main()