How many times does cat appear in the cat Wikipedia page?

In this post, I’m going to hit the Wikipedia API to get data into a database because so often as data engineers our job is to hit an arbitrary API and extract, clean, and store the data. The point of this post is not to showcase the best Python or software engineering skills ever. The point of this post is to talk through some common themes you may see in data engineering projects and to distract myself from my growing desire to cut my own hair. pleasehelpitkeepsgrowing.

The theoretical business ask is to record the number of instances of the word “Cat” and the most common word in the Wikipedia page on cats every day.

Okay first lesson: the art of good engineering is good Googling. I’m not promoting plagiarism; if you use someone’s code you should always credit them, but don’t reinvent the wheel. Your boss does not care if you wrote the code from scratch, your boss cares if it works.

So our first step is to go to the Wikipedia page on their API. There we find the sample code for getting the content of a page, which we can slightly alter for our use case:



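"""
    MediaWiki API Demos
    Demo of `Parse` module: Parse content of a page

    MIT License
"""

import requests

S = requests.Session()

# The endpoint is blank in this copy of the sample; set it to the MediaWiki API endpoint before running.
URL = ""

PARAMS = {
    "action": "parse",
    "page": "Cat",
    "format": "json"
}
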
R = S.get(url=URL, params=PARAMS)
DATA = R.json()


Now we get a bunch of HTML. It’s so ugly and long that we can’t tell what part we are on.
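To make the shape of that response a bit less mysterious, here is a small offline sketch. The sample_response dictionary is a hand-written stand-in, not real API output, and it only includes the two keys this post relies on (the title and the HTML under "text" and "*"). The extract_html helper is a name I made up for illustration.

```python
# A hand-written stand-in for the JSON the API returns (not real output).
sample_response = {
    "parse": {
        "title": "Cat",
        "text": {"*": "<p>The <b>cat</b> is a domestic species.</p>"},
    }
}


def extract_html(data, expected_title):
    """Return the page HTML, failing loudly if the response is for another page."""
    parsed = data["parse"]
    if parsed["title"] != expected_title:
        raise ValueError(
            f"expected page {expected_title!r}, got {parsed['title']!r}"
        )
    return parsed["text"]["*"]


html = extract_html(sample_response, "Cat")
print(html)
```

Checking the title before touching the HTML is cheap insurance against silently counting words on the wrong page.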


This isn’t going to work. We want to clean this text up a bit. There actually are libraries for parsing XML/HTML such as beautifulsoup. However, since we don’t need info from a specific tag (what is the header? What is the title?), but rather just to count instances of “cat”, I figured we could just brute force it.
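For comparison, here is roughly what the parser route could look like. This is not what I did, and it uses the standard library's html.parser rather than beautifulsoup so that it runs without installing anything. TextExtractor and html_to_text are illustrative names, and the sample HTML is made up.

```python
from html.parser import HTMLParser


class TextExtractor(HTMLParser):
    """Collects the text nodes of an HTML fragment and ignores the tags."""

    def __init__(self):
        super().__init__()
        self.chunks = []

    def handle_data(self, data):
        # Called once for every run of text between tags.
        self.chunks.append(data)


def html_to_text(html):
    parser = TextExtractor()
    parser.feed(html)
    parser.close()
    # Join the pieces, then collapse any run of whitespace to one space.
    return " ".join("".join(parser.chunks).split())


sample_html = (
    '<p>The <b>cat</b> is a '
    '<a href="/wiki/Domestication">domesticated</a> species.</p>'
)
plain_text = html_to_text(sample_html)
print(plain_text)
```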

This is what I ended up doing:

## from 
## strip and encode to UTF-8

convert_to_string = str(DATA["parse"]["text"]["*"].encode('utf-8').strip()) 

import re

## from
## remove anything within an html tag
convert_to_string = re.sub('<[^<]+?>', '', convert_to_string)
convert_to_string = re.sub('&#[^<]+?;[^<]+?;', '', convert_to_string)
## split on punctuation (note that %-( inside the brackets is a range covering % & ' and ( )
convert_to_string = re.sub('[?., %-()"^;:/]', ' ', convert_to_string)

convert_to_string = re.sub('\\n', ' ', convert_to_string)
convert_to_string = re.sub(' +', ' ', convert_to_string)

## convert to lower
convert_to_string = convert_to_string.lower()

That was the extent of my brute force clean up. This gives us:

Slightly Cleaned Up
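If you want to poke at the clean-up steps without hitting the API, here is a compact sketch that loosely follows them on a made-up snippet. It is a variation, not a copy: it works on a plain Python 3 string (no encode step), swaps tags for a space instead of an empty string, and uses slightly broader patterns. clean_html is a name I invented for this example.

```python
import re


def clean_html(html):
    """Strip tags and punctuation, collapse whitespace, and lowercase."""
    text = re.sub(r"<[^<]+?>", " ", html)   # drop anything inside an HTML tag
    text = re.sub(r"&#?\w+;", " ", text)    # drop entities such as &#160; or &amp;
    text = re.sub(r"[^\w\s]", " ", text)    # treat remaining punctuation as a separator
    text = re.sub(r"\s+", " ", text)        # collapse whitespace, newlines included
    return text.strip().lower()


sample = (
    "<p>The <b>Cat</b> (Felis catus) is a small,&#160;domesticated species.</p>\n"
)
cleaned = clean_html(sample)
print(cleaned)
```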

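Because Wikipedia has citations and links at the bottom of the page, I decided to cut off the string at the last “see also” (lowercase, because the string was lowercased in the previous step):

end = convert_to_string.rfind("see also")
## rfind returns -1 when the marker is missing, so only slice on a real match
if end != -1:
    convert_to_string = convert_to_string[:end]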
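Two things are worth watching with rfind here: it returns -1 when the marker isn't found, and the text was lowercased in the previous step, so the marker has to be lowercase too. A tiny sketch on a made-up string shows both (cut_at_last is just an illustrative helper name):

```python
def cut_at_last(text, marker):
    """Return text up to the last occurrence of marker, or all of it if absent."""
    end = text.rfind(marker)
    return text if end == -1 else text[:end]


page = "the cat is a pet see also dog references 1 2 3"

body = cut_at_last(page, "see also")       # lowercase marker matches lowercased text
untouched = cut_at_last(page, "See also")  # wrong case: no match, text comes back whole

# Without the guard, slicing with -1 silently drops only the last character.
sliced_anyway = page[: page.rfind("See also")]
```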

So once we’ve cleaned up the string of the whole page, you may be tempted to do something like “how many times does the string contain cat?” but that would give us false positives for words like domestiCATed. To get around this, we’ll change the string into an array, splitting on any space (notice that earlier in the code we replaced all punctuation with a space and any run of spaces with one space).

word_list = []
word_list = convert_to_string.split(' ')

Now we have an array of words.
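Here is the false-positive problem in miniature, on a sentence I made up for the purpose. Counting the substring picks up the cats hiding inside other words, while counting entries in the split list does not.

```python
text = (
    "the cat is a domesticated animal and the cat "
    "can communicate with another cat"
)

# Substring counting also matches domestiCATed and communiCATe.
substring_hits = text.count("cat")

# Splitting first means only the standalone word is counted.
word_list = text.split(" ")
whole_word_hits = word_list.count("cat")

print(substring_hits, whole_word_hits)
```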


Your instinct may be to loop through and increment a variable named cat_counter every time you see a cat. Good instinct. But what about getting the most common word used? How would we do that?

Whenever I’m tempted to do a for loop and a counter, I ask myself if I can create a dictionary instead. I did this by:

word_dict = {}
for word in word_list:
    word_dict[word] = word_dict.get(word, 0) + 1

This allows us to, every time we see a distinct word in the list, create a new entry for it in the dictionary. If the word has already been seen, we just increment that entry by 1. This works because when accessing a value associated with a key in a dictionary, you can do either word_dict[word], which raises a KeyError if the key is missing, or word_dict.get(word, 0), which returns the default (here 0) instead.
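The two ways of reading a value out of a dictionary behave differently when the key is missing, which is the whole trick. A quick sketch with a toy dictionary (the words and counts are made up):

```python
word_dict = {"cat": 2}

# Square brackets raise KeyError for a key that isn't there yet.
try:
    word_dict["dog"]
    missing_raises = False
except KeyError:
    missing_raises = True

# .get returns the supplied default instead of raising.
default_for_missing = word_dict.get("dog", 0)
existing = word_dict.get("cat", 0)

# So the same line handles both the first sighting and every repeat.
word_dict["dog"] = word_dict.get("dog", 0) + 1  # 0 + 1
word_dict["cat"] = word_dict.get("cat", 0) + 1  # 2 + 1
```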

So the first time we see a word, we get 0 + 1 = 1. Then every time after that, we increment by 1. We then get this dictionary:


I actually sorted this dictionary to help us find the most common word.

sorted_word_dict = sorted(word_dict.items(), key=lambda x: x[1], reverse=True)

We passed a lambda function (an anonymous function) that tells the sorted function to sort by x[1], where x is a (key, value) pair from word_dict.items() and x[1] is the value (as opposed to the key). The result is a list of pairs rather than a dictionary. reverse=True will make the most popular (biggest) value go first, so that way we can easily do the following:

cat_count = word_dict["cat"]
most_popular_word = sorted_word_dict[0][0]

Which gives us 264 times and the word “the”.
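To see the sort on something small enough to check by eye, here is the same approach on a toy sentence, plus two standard-library shortcuts (max with a key, and collections.Counter) that get the top word without sorting everything. The shortcuts are alternatives I'm mentioning, not what the code above uses.

```python
from collections import Counter

word_list = "the cat sat on the mat with the other cat".split(" ")

word_dict = {}
for word in word_list:
    word_dict[word] = word_dict.get(word, 0) + 1

# Each x handed to the lambda is a (word, count) tuple, so x[1] is the count.
sorted_word_dict = sorted(word_dict.items(), key=lambda x: x[1], reverse=True)
most_popular_word = sorted_word_dict[0][0]

# Shortcuts when only the single most common word is needed.
top_by_max = max(word_dict, key=word_dict.get)
top_by_counter = Counter(word_list).most_common(1)[0][0]
```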

The only thing left to do is to write to our database, either directly via the Python connector or through a library such as SQLAlchemy. I am just going to create the INSERT statement:

EPOCH = str(time.time()).split('.')[0]
INSERT_STATEMENT = """
INSERT INTO cat_wiki_count (datetime_checked, cat_count, most_popular_word) VALUES ('{datetime_checked}', {cat_count}, '{most_popular_word}');
""".format(datetime_checked=EPOCH, cat_count=cat_count, most_popular_word=most_popular_word)
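Building SQL with string formatting is fine for a demo, but a word containing a quote would break the statement. Most database drivers accept placeholders instead. The sketch below uses the standard library's sqlite3 with an in-memory database purely as a stand-in for the real warehouse, so treat the connection and the placeholder syntax (which varies by driver) as assumptions. The table and column names and the 264 and “the” values come from above.

```python
import sqlite3
import time

# In-memory SQLite database standing in for the real target (assumption).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE cat_wiki_count "
    "(datetime_checked TEXT, cat_count INTEGER, most_popular_word TEXT)"
)

epoch = str(int(time.time()))
cat_count = 264
most_popular_word = "the"

# The ? placeholders let the driver handle quoting of the values.
conn.execute(
    "INSERT INTO cat_wiki_count "
    "(datetime_checked, cat_count, most_popular_word) VALUES (?, ?, ?)",
    (epoch, cat_count, most_popular_word),
)
conn.commit()

stored = conn.execute(
    "SELECT cat_count, most_popular_word FROM cat_wiki_count"
).fetchall()
```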

Things to note here:

Let’s say your boss wants you to only record the most recent data you find from the page and never the history. This is a bad idea; storage in databases is typically cheap and there is no reason why you can’t record all of history in this table and then get the most recent with SQL. But if it has to be done, just remember: never delete until you successfully insert.

So let’s say you did this:


    TRUNCATE TABLE cat_wiki_count;
    INSERT INTO cat_wiki_count (datetime_checked, cat_count, most_popular_word) VALUES ('{datetime_checked}', {cat_count}, '{most_popular_word}'); 

Now let’s say your insert statement failed. What are you left with? An empty table. Your boss wants the most recent, so if an insert failed, the truncate never should have happened. So how do we handle this?

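We create a transaction using sqlalchemy and roll back if either the truncate or the insert fails:

engine = snowflake.engine
connection = snowflake.connect_engine() 
transaction = connection.begin() 
try:
    connection.execute(TRUNCATE_STATEMENT)  # assumed name; the truncate and insert lines are missing from this copy
    connection.execute(INSERT_STATEMENT)
    transaction.commit()
except Exception as e:
    transaction.rollback()
    raise ValueError(e) 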

This leverages:

Again, I find it easier to just write from Python into Snowflake or any database with a timestamp and then deduplicate later in the SQL, but if you must delete records, be careful about it.
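If you want to convince yourself that the “never delete until you successfully insert” rule holds, here is a runnable sketch of the delete-then-insert pattern. It uses the standard library's sqlite3 as a stand-in, so DELETE FROM replaces TRUNCATE and the connection is an assumption, not the Snowflake setup above. replace_latest is a made-up helper name, and the timestamps and the 270 count are illustrative.

```python
import sqlite3


def replace_latest(conn, row):
    """Swap the table contents for row, or leave the table untouched on failure."""
    try:
        with conn:  # commits on success, rolls back if an exception escapes
            conn.execute("DELETE FROM cat_wiki_count")
            conn.execute(
                "INSERT INTO cat_wiki_count "
                "(datetime_checked, cat_count, most_popular_word) "
                "VALUES (?, ?, ?)",
                row,
            )
        return True
    except sqlite3.Error:
        return False


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE cat_wiki_count (datetime_checked TEXT, "
    "cat_count INTEGER NOT NULL, most_popular_word TEXT)"
)
conn.execute("INSERT INTO cat_wiki_count VALUES ('1588000000', 264, 'the')")
conn.commit()

# A NULL count violates NOT NULL, so the insert fails after the delete ran.
ok_bad = replace_latest(conn, ("1588086400", None, "the"))
rows_after_failure = conn.execute("SELECT * FROM cat_wiki_count").fetchall()

# A valid row replaces the old one.
ok_good = replace_latest(conn, ("1588086400", 270, "the"))
rows_after_success = conn.execute("SELECT * FROM cat_wiki_count").fetchall()
```

After the failed attempt the old row is still there, which is exactly the behavior you want when the boss only cares about the most recent record.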

In this case, I fulfilled the requirement but let’s take a step back and think about scalability, validation, and flexibility.

Since Snowflake is great at storing semi-structured data and storage is cheap, is there any reason not to just store the entire dictionary of word usage as a variant? The benefits are:

import requests
import logging
import re
import time
import sys
import json
PAGES_TO_SEARCH = ["Cat", "Dog"]
URL = ""
for page in PAGES_TO_SEARCH:
   EPOCH = str(time.time()).split(".")[0]
   word_list = []
   word_dict = {}
   sorted_word_dict = {}
   ## from
   S = requests.Session()
   PARAMS = {"action": "parse", "page": page, "format": "json"}
   R = S.get(url=URL, params=PARAMS)
   DATA = R.json()
   ## ensure the title of the page we got back matches the page we asked for
   if DATA["parse"]["title"] != page:
       print("something very bad happened")
   ## from
   convert_to_string = str(DATA["parse"]["text"]["*"].encode("utf-8").strip())
   ## from
   convert_to_string = re.sub("<[^<]+?>", "", convert_to_string)
   convert_to_string = re.sub("&#[^<]+?;[^<]+?;", "", convert_to_string)
   ## split on punctuation
   convert_to_string = re.sub('[?., %-()"^;:/]', " ", convert_to_string)
   convert_to_string = re.sub("\\n", " ", convert_to_string)
   convert_to_string = re.sub(" +", " ", convert_to_string)
   convert_to_string = convert_to_string.lower()
   ## the string is already lowercase, so search for the lowercase marker
   end = convert_to_string.rfind("see also")
   if end != -1:
       convert_to_string = convert_to_string[:end]
   convert_to_string = convert_to_string.replace(r"\\u", "")
   word_list = convert_to_string.split(" ")
   for word in word_list:
       if word.isalpha():
           word_dict[word] = word_dict.get(word, 0) + 1
   word_dict = json.dumps(word_dict)
   INSERT_STATEMENT = """
   INSERT INTO wiki_word_count (datetime_checked, page_name, word_count_json)
   (SELECT '{datetime_checked}', '{page}', PARSE_JSON('{word_count}'));
   """.format(
       datetime_checked=EPOCH, page=page.lower(), word_count=word_dict
   )

The table looks like this:

Raw Results
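As a rough Python-side picture of what ends up in word_count_json, here is the round trip on a toy dictionary: dump the counts to a JSON string on the way in, load them back on the way out, then pull the count for the page name and the most common word. The counts are made up, and reading the column back in Python is only for illustration.

```python
import json

word_dict = {"the": 3, "cat": 2, "sat": 1}

# The JSON string that would be handed to PARSE_JSON on insert.
word_count_json = json.dumps(word_dict)

# What a reader of that column could reconstruct later.
loaded = json.loads(word_count_json)

page_name = "cat"
page_name_count = loaded.get(page_name, 0)
most_common_word = max(loaded, key=loaded.get)
```

Keeping the whole dictionary means new questions (a different word, the top ten words) don't require re-scraping the page.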

We can get the count of instances of the page name and the most common word with this query:

SELECT
   word_count_json[page_name] AS page_name_count,
   LAST_VALUE(f.key) OVER (PARTITION BY page_name, datetime_checked ORDER BY f.value asc)
FROM wiki_word_count,
LATERAL FLATTEN(input => word_count_json) f

Most Common Word

And there we have it– “the” is a pretty common word. If you learned anything from this post, I hope it was not that.