ThreadPools explained – In the deep end

Thread and Multiprocessing Pools are an underused feature of Python. In my opinion, they are the easiest way to dip your feet into concurrency, and yet still the method I use most often.

Threads in a Pool, Artwork by Clara Griffith

They allow you to easily offload CPU- or I/O-bound tasks to a pre-instantiated group (pool) of threads or processes. One of the great things about them is that both the ThreadPool and Pool (Multiprocessing) classes have the same methods, so all the following examples are interchangeable between them. (This article will not get into the differences between Threading and Multiprocessing in Python, as that is worth a post on its own.)

Map

Let’s jump in with a super simple example. We will use a pool of 5 workers to square numbers. We will use the map method, which takes a function as its first argument (make sure it’s not called, no parentheses!) and then a list (or iterable) of arguments, each of which is passed on its own to that function in a Thread or Process. That way, multiple instances of that function are working at the same time!

from multiprocessing.pool import ThreadPool, Pool

def square_it(x):
    return x*x

# On Windows, make sure the multiprocessing code only runs
# after the "if __name__ == '__main__'" guard
if __name__ == '__main__':
    with Pool(processes=5) as pool:
        results = pool.map(square_it, [5, 4, 3, 2, 1])

    print(results)
    # [25, 16, 9, 4, 1]

Think of map as running a for loop over the list and sending each item in it to a worker process as soon as one is free (it’s a little more complicated internally, but we’ll come back to that later). Each process has been told to run the function square_it against that item.

So in this case, all the processes will be running the same function against a set of data, and waiting until all the data has been processed to return. The list of returned data will be in order based on the iterable that was put in. This is super handy if you want to do something like make a lot of requests to different websites and wait for the results, or need to run a lot of calculations. I actually did just that in the Birthday Paradox blog post using the Multiprocessing pool to speed up probability calculations.

Thankfully, Pools are versatile: they have several other handy methods, and by going asynchronous you can let most of them run in the background instead of waiting around for them to finish.

Async

So let’s use the map-style functionality again, but this time we don’t want to bother waiting around for the results, which means we need to use map_async. Let’s say you want to capture a lot of images off of a website. You populate the list of links to the images, then you just need to add those to the pool and download them.

from multiprocessing.pool import ThreadPool
import time
import reusables

# When downloading from a website, be kind with how often 
# and how many requests you are making
tp = ThreadPool(processes=2)

urls = ["https://codecalamity.com/wp-content/uploads/2017/10/birthday.png",
        "https://codecalamity.com/wp-content/uploads/2017/06/Capture.png"]


# Same as the previous map, taking a function and iterable,
# just with an additional callback function 
tp.map_async(reusables.download, urls, callback=print) 
# Also, this is why having print as a function in python 3 is so dang handy


# Do something else 
time.sleep(10) 

# Results are printed when done 
# ['/home/me/birthday.png', '/home/me/Capture.png']

# Not using a context manager means we have to clean house ourselves.
tp.close()
tp.join()

This is really advantageous in scenarios where you need an instant reply, such as an API call or working with a GUI; a callback can then update the GUI or a database later. There is also an error_callback argument it can take, for when the function raises an exception. The error_callback function will receive the Exception the worker raised, so you can decide whether to ignore it or re-raise it in the main Thread.
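
Here is a minimal sketch of that in action (divide and on_error are just made-up names for illustration):

from multiprocessing.pool import ThreadPool
import time

def divide(x):
    return 10 / x

def on_error(exc):
    # Receives the exception the worker raised
    print(f"Worker failed with: {exc!r}")

with ThreadPool(processes=2) as tp:
    # If any item fails, error_callback gets the exception and callback is skipped
    tp.map_async(divide, [5, 0, 2], callback=print, error_callback=on_error)
    time.sleep(1)

# Worker failed with: ZeroDivisionError('division by zero')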

You can also ignore using the callbacks, and deal with the AsyncResult directly. It has the methods:

  • ready – See if the results are available
  • successful – Boolean, True if it didn’t raise an exception
  • wait – takes a timeout, will wait for the results to be ready
  • get – Grab the results; also takes a timeout, and will automatically raise the exception if one occurred.

from multiprocessing.pool import ThreadPool
import time

timeout = 25

with ThreadPool(processes=4) as tp:
    async_result = tp.map_async(time.sleep, [5, 4])

    for i in range(timeout):
        time.sleep(1)

        if async_result.ready():
            if async_result.successful():
                print(async_result.get())
                break
    else:
        print("Task did not complete on time, or with errors")

Three of the methods available have corresponding async methods; a quick sketch of the two we haven’t used yet follows the list:

  • map – map_async
  • starmap – starmap_async
  • apply – apply_async
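
Here is a minimal sketch of starmap_async and apply_async, with the callbacks simply printing whatever the workers return:

from multiprocessing.pool import ThreadPool
import operator
import time

with ThreadPool(processes=2) as tp:
    # starmap_async unpacks each tuple into the function's arguments
    tp.starmap_async(operator.mul, [(2, 3), (4, 5)], callback=print)
    # apply_async runs a single call in a worker and hands the result to the callback
    tp.apply_async(operator.add, args=(1, 2), callback=print)
    time.sleep(1)

# [6, 20]
# 3
# (output order may vary)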

Passing additional arguments with “partial” or “starmap”

Now, notice that map will only provide a single argument to a function. So if you have a function that takes more than one argument, you will either need to use partial to redefine the function with default parameters, or use starmap which takes an iterable of tuples.

So let’s use partial from functools first.

from multiprocessing.pool import ThreadPool
from functools import partial
import time
import reusables

urls = ["https://codecalamity.com/wp-content/uploads/2017/10/birthday.png",
        "https://codecalamity.com/wp-content/uploads/2017/06/Capture.png"]


def download_file(url, wait_time):
    time.sleep(wait_time)
    return reusables.download(url)

# Replace the required `wait_time` with a default of 5
down_the_file = partial(download_file, wait_time=5)


with ThreadPool(2) as tp:
    # Notice we are now using the new function, down_the_file we created with partial
    print(tp.map(down_the_file, urls))

Not too difficult, but you’re stuck using the same setting for everything. If you want to customize it per item, you can send multiple arguments using starmap.

Starmap

from multiprocessing.pool import ThreadPool
import reusables
import time

# Notice the list now is a list of tuples, that have a second argument, 
# that will be passed in as the second parameter. In this case, as wait_time
urls = [("https://codecalamity.com/wp-content/uploads/2017/10/birthday.png", 4),
        ("https://codecalamity.com/wp-content/uploads/2017/06/Capture.png", 10)]


def download_file(url, wait_time):
    time.sleep(wait_time)
    return reusables.download(url)


with ThreadPool(2) as tp:
    # Using `starmap` instead of just `map`
    print(tp.starmap(download_file, urls))

Apply

Both map and starmap take a single function to run a lot of things against. But there are many times when you just want the pool’s workers to take on a variety of different tasks. That’s where apply comes in.

from multiprocessing.pool import Pool
import reusables

pool = Pool(processes=5)

# apply takes `args`, aka arguments, in a tuple format 
# and `kwds`, aka keyword arguments, as a dictionary

print(pool.apply(sum, args=([1, 2, 3, 4, 5], )))
# 15
print(pool.apply(abs, (-5.67, )))
# 5.67
print(pool.apply(reusables.download, 
                 args=("http://example.com", ), 
                 kwds=dict(save_to_file=False)))
# b'<!doctype html>\n<html>\n<head>\n   ...


pool.close()
pool.join()

Additional Content

You can of course also mix and match any of the methods, as long as the pool has not been terminated.

from multiprocessing.pool import Pool
import time

pool = Pool(processes=5)

print(pool.apply(sum, args=([1, 2, 3, 4, 5], )))
# 15
pool.apply_async(abs, (-5.67, ), callback=print)
# 5.67
pool.map_async(any, [(True, False), (False, False)], callback=print)
# [True, False]

time.sleep(1)
pool.close()
pool.join()

imap

Now remember how I said map basically iterates over the list and sends each item to a worker? Well, that’s not entirely true. imap does that; map can be a lot faster because it breaks the list into chunks first and sends each chunk to the workers’ queue, so there is always something in the pipeline.

Ok, cool, so map is faster, what’s the point of imap then? With speed comes the price of a larger memory footprint. When map takes in an iterable, it converts it to a list so it can be chunked out, whereas imap will only pull items out of the iterable as needed (its chunksize defaults to 1, aka how many it will pull out at a time, but that can be increased). imap also has the advantage of giving you the results as soon as possible (as an iterable, hence the name imap), while still preserving order. There is also imap_unordered, which will simply give you the results as fast as they come, in the order they finish.
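
To make the memory difference concrete, here is a small sketch (square and number_stream are just illustrative names): map drains the generator into a list before farming it out, while imap pulls from it as needed.

from multiprocessing.pool import ThreadPool

def square(x):
    return x * x

def number_stream(n):
    # A generator, so items only exist once something pulls them out
    for i in range(n):
        yield i

with ThreadPool(processes=4) as tp:
    # map exhausts the generator into a list up front, then chunks it out
    print(tp.map(square, number_stream(10), chunksize=2))

    # imap pulls items from the generator as needed and yields each
    # result, in order, as soon as it is ready
    print(list(tp.imap(square, number_stream(10), chunksize=2)))

# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

With that in mind, here is a timing comparison of map, imap, and imap_unordered: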

from multiprocessing.pool import ThreadPool
import time

def wait(x):
    time.sleep(x)
    return x

iterable = [0, 4, 5, 2]

with ThreadPool(processes=4) as tp:

    print("map")
    map_start = time.time()
    for map_result in tp.map(wait, iterable):
        print(f"{map_result} took {time.time() - map_start:.0f} seconds")

    print("\nimap")
    imap_start = time.time()
    for imap_result in tp.imap(wait, iterable):
        print(f"{imap_result} took {time.time() - imap_start:.0f} seconds")

    print("\nimap_unordered")
    imap_unordered_start = time.time()
    for imap_un_result in tp.imap_unordered(wait, iterable):
        print(f"{imap_un_result} took" 
              f"{time.time() - imap_unordered_start:.0f} seconds")

Using map it will wait all 5 seconds (the largest wait time in the argument list) to return all the results at once.

map
0 took 5 seconds
4 took 5 seconds
5 took 5 seconds
2 took 5 seconds

imap will immediately return the 0 result, then four seconds later it will return the 4, and one second after that it will return the 5 and the 2 at the same time.

imap
0 took 0 seconds
4 took 4 seconds
5 took 5 seconds
2 took 5 seconds

imap_unordered will return them as soon as each one finishes. (Notice this won’t always be the shortest one first, as the argument list may be longer than the number of worker processes).

imap_unordered
0 took 0 seconds
2 took 2 seconds
4 took 4 seconds
5 took 5 seconds

The Methods

Here are the methods, their parameters and docstrings, and an overview of what they do.

map

map(func, iterable, chunksize=None):
    ''' Apply `func` to each element in `iterable`, collecting the results
        in a list that is returned. '''

map takes an iterable and turns it into a list, then breaks it up to send to worker processes. Each worker process runs the function given as the first argument with a single argument (given to it from the iterable). The results are collected into a list and returned, in order, once everything has finished. Returns a list.

starmap

starmap(func, iterable, chunksize=None):
    ''' Like `map()` method but the elements of the `iterable` are expected to
        be iterables as well and will be unpacked as arguments. Hence
        `func` and (a, b) becomes func(a, b). '''

starmap allows multiple arguments to be given to the function by passing in an iterable of iterables (i.e. list of tuples, list of lists, generator of generators, etc.). The inner iterables do NOT have to be the same length either, so defaults can be overridden for one item of the list but left alone for the others. Returns a list.
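
For example, a quick sketch (greet is a made-up function) where the inner tuples are different lengths and the missing values fall back to the defaults:

from multiprocessing.pool import ThreadPool

def greet(name, greeting="Hello", punctuation="!"):
    return f"{greeting}, {name}{punctuation}"

# Each tuple only overrides as many parameters as it provides
jobs = [("Alice",), ("Bob", "Howdy"), ("Carol", "Hi", "?")]

with ThreadPool(processes=3) as tp:
    print(tp.starmap(greet, jobs))

# ['Hello, Alice!', 'Howdy, Bob!', 'Hi, Carol?']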

apply

apply(func, args=(), kwds={}):
    ''' Equivalent of `func(*args, **kwds)`. '''

apply runs a single function with one of the pool’s workers. Returns the result of the function.

imap

imap(func, iterable, chunksize=1):
    ''' Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. '''

imap takes the same arguments as map; however, its result is iterable and will start returning results, in order, as soon as they have finished. Returns an iterable.

imap_unordered

imap_unordered(func, iterable, chunksize=1):
    ''' Like `imap()` method but ordering of results is arbitrary. '''

imap_unordered is the same as imap, except it will return each result as soon as it finishes, not in order. Returns an iterable.

The Asynchronous Methods

The asynchronous versions of map, starmap, and apply all take the same parameters as their original functions, as well as callback and error_callback parameters.

  • callback – function that takes a single argument, that will be the result(s) of the function(s) run.
  • error_callback – function that takes a single argument, which will be the Exception raised (if one occurs).

The async methods immediately return an AsyncResult (also called ApplyResult, or subclassed as MapResult) that can be used directly to view the results and check on their status. (View the Async section above for an example.)

  • ready – See if the results are available
  • successful – Boolean, True if it didn’t raise an exception
  • wait – takes a timeout, will wait for the results to be ready
  • get – Grab the results; also takes a timeout, and will automatically raise the exception if one occurred.

map_async

map_async(func, iterable, chunksize=None, callback=None, error_callback=None):
    ''' Asynchronous version of `map()` method. '''

starmap_async

starmap_async(func, iterable, chunksize=None, callback=None, error_callback=None):
    ''' Asynchronous version of `starmap()` method. '''

apply_async

apply_async(func, args=(), kwds={}, callback=None, error_callback=None):
    ''' Asynchronous version of `apply()` method. '''

The Birthday Paradox – The Proof is in the Python

This month holds my wife’s, my mother’s, and my own birthdays, all within a span of nine days. What are the odds of that? No idea, I’m no statistician. However, as a developer, I thought it would be fun to prove (or disprove?) the Birthday Paradox with some Python coding.

[Plot: chance of a shared birthday for the first 100 people, with red vertical markers at 23 and 70]

If you haven’t heard of the Birthday Paradox, it states that as soon as you have 23 random people in a room, there is a 50 percent chance two of them have the same birthday. Once the number of people in the room is at least 70, there is a 99.9 percent chance. It sounds counterintuitive, as it takes a full 366 people (a full year + 1) to have a 100% chance.
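
For reference, here is a quick sketch of the closed-form calculation the statisticians would use (we will ignore it and experiment instead):

# Chance that at least two of `people` share a birthday, assuming 365
# equally likely days: 1 minus the chance that nobody shares one
def shared_birthday_chance(people, days=365):
    no_match = 1.0
    for i in range(people):
        no_match *= (days - i) / days
    return (1 - no_match) * 100

print(f"{shared_birthday_chance(23):.1f}%")  # ~50.7%
print(f"{shared_birthday_chance(70):.1f}%")  # ~99.9%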

So instead of doing this from the statistical side, let’s do it from the experimentation angle. We are going to generate 23 random days in a year and see if there is a duplicate day.

First, we create a function that will return a list of N datetime objects within a given year. We will then use that list to see if there are any duplicates in it. In the first case, with 23 people, about half the time we run this function there should be at least one duplicate date.

from random import randint
from datetime import datetime, timedelta

def random_birthdays(number_of_people):
    first_day_of_year = datetime(2017, 1, 1)
    return [first_day_of_year + timedelta(days=randint(0, 365))
            for _ in range(number_of_people)]

We need to run this function a lot of times to get an average of how often there is a duplicate. We’ll create a function that runs it a thousand times and returns the percentage of runs in which a duplicate was found. (As the raw proportion is between 0 and 1, such as .50, we multiply it by 100 so that .5 shows up as a “50% chance” later.)

def determine_probability(number_of_people, run_amount=1_000):
    dups_found = 0
    for _ in range(run_amount):
        birthdays = random_birthdays(number_of_people)
        duplicates = set(x for x in birthdays if birthdays.count(x) > 1)
        if len(duplicates) >= 1:
            dups_found += 1

    return dups_found/run_amount * 100

Feel free to run it even more than 1,000 times for higher accuracy; the most I tried was 100,000, which gave the most consistent results.

Finally, we add that last bit of code that actually calls the function and prints our findings.

if __name__ == '__main__':
    msg = ("{people} Random people have a {chance:.1f}%"
           " chance of having a birthday on the same day")
    for people in (23, 70):
        print(msg.format(people=people, chance=determine_probability(people)))

And wouldn’t ya know it, those fancy statisticians seem to be right.

23 Random people have a 50.4% chance of having a birthday on the same day
70 Random people have a 99.9% chance of having a birthday on the same day

At the top of the post, you saw a plot generated by calculating the probabilities for the first 100 people, with red vertical markers at 23 and 70. It is rather smooth due to increasing the run_amount to 10,000. Below is a plot of an entire 366 people, but with only 1,000 runs per number of people.

[Plot: chance of a shared birthday for 1–366 people]

To accomplish this, I used matplotlib and mixed in some multiprocessing to speed up the probability generation. Here is the entire final file.

from random import randint
from datetime import datetime, timedelta
from multiprocessing import Pool, cpu_count

import matplotlib.pyplot as plt


def random_birthdays(number_of_people):
    first_day_of_year = datetime(2017, 1, 1)
    return [first_day_of_year + timedelta(days=randint(0, 365))
            for _ in range(number_of_people)]


def determine_probability(number_of_people, run_amount=1000):
    dups_found = 0
    print(f"Generating day {number_of_people}")
    for _ in range(run_amount):
        birthdays = random_birthdays(number_of_people)
        duplicates = set(x for x in birthdays if birthdays.count(x) > 1)
        if len(duplicates) >= 1:
            dups_found += 1

    return number_of_people, dups_found/run_amount * 100


def plot_yearly_probabilities(max_people, vertical_markers=(23, 70)):
    with Pool(processes=cpu_count()) as p:
        percent_chances = p.map(determine_probability, range(max_people))

    plt.plot([z[1] for z in sorted(percent_chances, key=lambda x: x[0])])

    plt.xlabel("Number of people")
    plt.ylabel('Chance of sharing a birthday (Percentage)')

    for marker in vertical_markers:
        if max_people >= marker:
            plt.axvline(x=marker, color='red')

    plt.savefig("birthday_paradox.png")


if __name__ == '__main__':
    plot_yearly_probabilities(100)

 

Notice that our generated image lines up near perfectly alongside the statistically generated image on Wikipedia*.

Hope you enjoyed, and maybe even learned something along the way!

* By Rajkiran g (Own work) [CC BY-SA 3.0 (https://creativecommons.org/licenses/by-sa/3.0) or GFDL (http://www.gnu.org/copyleft/fdl.html)], via Wikimedia Commons

 

Reusables – Part 2: Wrappers

Spice up your code with wrappers! In Python, a wrapper, also known as a decorator, simply encapsulates one function within another function.

@wrapper
def my_func(a, b=2):
    print(b)

@meta_decorator(foo="bar")
def my_other_func(**kwargs):
    print(kwargs)
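
To see the shape of the pattern, here is a rough sketch of what those two could look like (illustrative only, not how Reusables implements its wrappers):

import functools

def wrapper(func):
    # A plain decorator: takes a function, returns a replacement for it
    @functools.wraps(func)
    def inner(*args, **kwargs):
        print(f"Calling {func.__name__}")
        return func(*args, **kwargs)
    return inner

def meta_decorator(foo="baz"):
    # A "meta decorator" takes arguments and returns a decorator,
    # which is why it always has to be called with parentheses
    def decorator(func):
        @functools.wraps(func)
        def inner(*args, **kwargs):
            kwargs.setdefault("foo", foo)
            return func(*args, **kwargs)
        return inner
    return decorator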

In Reusables, all the wrappers take arguments (aka they are meta decorators), so at minimum you will have to end them with parentheses (). Let’s take a look at the one I use the most first.

time_it

import reusables

@reusables.time_it()
def test_func():
    import time, random
    time.sleep(random.randint(2, 5))

test_func()
# Function 'test_func' took a total of 5.000145769345911 seconds

time_it documentation
The message is printed by default. However, it can also be sent to a log with a customized message. If log=True it will log to the Reusables logger; you can also specify a logger by name (string) or pass in a logger object directly.

import time
import reusables

reusables.add_stream_handler('reusables')

@reusables.time_it(log=True, message="{seconds:.2f} seconds")
def test_time(length):
    time.sleep(length)
    return "slept {0}".format(length)

result = test_time(5)
# 2016-11-09 16:59:39,935 - reusables.wrappers  INFO      5.01 seconds

print(result)
# slept 5

It’s also possible to capture time taken in a list.

my_times = []

@reusables.time_it(log='no_log', append=my_times)
def test_func():
    import time, random
    length = random.random()
    time.sleep(length)
    
for _ in range(10):
    test_func()
    
my_times
# [0.4791555858872698, 0.8963652232890809, 0.05607090172793505, 
# 0.03099917658380491,0.6567622821214627, 0.4333975642063024, 
# 0.21456404424395714, 0.5723555061358638, 0.0734819056269771, 
# 0.13208268856499217]

unique

The oldest wrapper in the Reusables library forces the output of a function to be unique, or else it will raise an exception or, if specified, return an alternative output.

import reusables
import random


@reusables.unique(max_retries=100)
def poor_uuid():
    return random.randint(0, 10)


print([poor_uuid() for _ in range(10)])
# [8, 9, 6, 3, 0, 7, 2, 5, 4, 10]

print([poor_uuid() for _ in range(1000)])
# Exception: No result was unique

unique documentation

lock_it

A very simple wrapper to use while threading. Makes sure a function is only being run once at a time.

import reusables
import time


def func_one(_):
    time.sleep(5)


@reusables.lock_it()
def func_two(_):
    time.sleep(5)


@reusables.time_it(message="test_1 took {0:.2f} seconds")
def test_1():
    reusables.run_in_pool(func_one, (1, 2, 3), threaded=True)


@reusables.time_it(message="test_2 took {0:.2f} seconds")
def test_2():
    reusables.run_in_pool(func_two, (1, 2, 3), threaded=True)


test_1()
test_2()

# test_1 took 5.04 seconds
# test_2 took 15.07 seconds

log_exception

It’s good practice to catch and explicitly log exceptions, but sometimes it’s just easier to let it fail naturally at any point and log it for later refinement or debugging.

@reusables.log_exception()
def test():
    raise Exception("Bad")

# 2016-12-26 12:38:01,381 - reusables   ERROR  Exception in test - Bad
# Traceback (most recent call last):
#     File "<input>", line 1, in <module>
#     File "reusables\wrappers.py", line 200, in wrapper
#     raise err
# Exception: Bad

queue_it

Add the result of the function to the specified queue instead of returning it normally.

import reusables
import queue

my_queue = queue.Queue()


@reusables.queue_it(my_queue)
def func(a):
    return a


func(10)

print(my_queue.get())
# 10

New in 0.9

catch_it

Catch specified exceptions and return an alternative output or send it to an exception handler.

def handle_error(exception, func, *args, **kwargs):
    print(f"{func.__name__} raised {exception} when called with {args}")

@reusables.catch_it(handler=handle_error)
def will_raise(message="Hello"):
    raise Exception(message)


will_raise("Oh no!")
# will_raise raised Oh no! when called with ('Oh no!',)

retry_it

Once is never enough, keep trying until it works!

@reusables.retry_it(exceptions=(Exception, ), tries=10, wait=1)
def may_fail(dont_do=[]):
    dont_do.append("x")
    if len(dont_do) < 6:
        raise OSError("So silly")
    print("Much success!")

may_fail()
# Much success!

 

This post is a follow-up to Reusables – Part 1: Overview and File Management.

Exploit the work of others for profit! (Vega 64 Edition)

As I sit here, anxious for the new AMD Vega 64 to be released, I decided to keep myself busy writing some Python code… designed to text me as soon as a new result for the “rx vega 64” search term showed up on Amazon (I have the patience of a child on Christmas Eve, so sue me).

When writing code, I try to be as lazy efficient as possible. That means I look for others to do the hard part for me. Other people might phrase it more kindly, like “don’t reinvent the wheel,” but let’s be real: you are receiving benefit for no cost. So next time a project saves your bacon, consider sending a little cash or cryptocoin to the dev(s). Or throw your hat into the open source community and provide dev work yourself; it’s a great way to learn a lot more about the coding community and gain a lot of experience along the way while still giving back.

Back to the Vega 64 stock tracking tool. It would totally be possible to do it all with the Python standard library: using urllib and re to download and find stuff on the page, then using email to send a message to my phone’s SMS. But that would take forever, and is honestly stupid to do. There are much better tools for that at this point, like requests and BeautifulSoup, plus some gmail or other common email provider library.

But as Amazon is a rather big website with an API available, there are Python libraries for that API. There are also different ways to easily send a text message to yourself via online services, like Twilio. In the end, I created the script using Python 3.6 on Windows (should be cross-platform compatible), and the libraries I used for this were:

python-amazon-simple-product-api
twilio
reusables
python-box

If you are interested in using this as well (it comes as-is, no promises it works or won’t bite you), before using the script you will need to get AWS access keys and sign up for Twilio, then fill in the appropriate variables at the top of the script.

from datetime import datetime
from time import sleep

from amazon.api import AmazonAPI
from twilio.rest import Client
from box import Box, BoxList
from reusables import setup_logger

amazon_access_key = "XXXXXXXXXXXXXXXXXXXX"
amazon_secret = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
amazon_associate_tag = "codecalamity-20" 

twilio_from = "+"  # Twilio phone number
twilio_to = "+"  # Phone number to text
twilio_key = "ACXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
twilio_secret = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"

search_product_keywords = "rx vega 64"
search_product_name_includes = "vega"
search_product_brand = "amd"
search_index = 'Electronics'
search_region = 'US'

log = setup_logger("vega")


def update(search, saved_data):
    new_prods = BoxList()
    for x in (product for product in search if
              search_product_name_includes in product.title.lower()
              and (search_product_brand in product.brand.lower() or
                   search_product_brand in product.manufacturer.lower())):
        if x.title not in saved_data:
            data = Box({"price": float(x.price_and_currency[0]),
                        "url": x.detail_page_url,
                        "updated": datetime.now().isoformat()})
            log.info(f"New item: {x.title} - {data}")
            new_prods.append((x.title, data))
            saved_data[x.title] = data

    saved_data.to_json(filename="products.json")
    return new_prods


def format_message(new_prods):
    product_list = [f"{prod[0]}-{prod[1].price}-{prod[1].url}"
                    for prod in new_prods]
    return f"New Prods: {', '.join(product_list)}"


def send_message(client, message):
    log.info(f"About to send message: '{message}'")
    client.messages.create(to=twilio_to,
                           from_=twilio_from,
                           body=message)


def main():
    amazon = AmazonAPI(amazon_access_key, amazon_secret, amazon_associate_tag)
    # Only search the first two pages to not spam the server
    products = amazon.search_n(2, Keywords=search_product_keywords,
                               SearchIndex=search_index, region=search_region)
    twilio_client = Client(twilio_key, twilio_secret)

    try:
        prods = Box.from_json(filename="products.json")
    except FileNotFoundError:
        prods = Box()

    while True:
        new_prods = update(products, prods)
        if new_prods:
            message = format_message(new_prods)
            send_message(twilio_client, message)
        sleep(60)


if __name__ == '__main__':
    main()

So now that possibly huge, pure-standard-library, multifaceted application has turned into fifty lines of code (not counting imports / globals) designed to do nothing but feed my anxiety as efficiently as possible. Luckily it’s self-testing, as it will find the result for the Vega Frontier Edition first, so if you choose to use it, make sure you get that text.

2017-08-07 22:09:23,938 - vega             INFO      About to send message: 
'New Prods: Radeon Vega Frontier Edition Liquid Retail-1579.48-
https://www.amazon.com/Radeon-Vega-Frontier-Liquid-Retail/dp/B072XLR2K7?SubscriptionId=AKIAIF3WXFESZ53UZKDQ&tag=codecalamity-20&linkCode=xm2&camp=2025&creative=165953&creativeASIN=B072XLR2K7'

Be warned that this may spam your phone a lot when the product does drop, and that the code doesn’t have a lot of safety checks as-is, so it may fail and stop reporting. It also doesn’t check whether the card is available to buy yet, just that the product page for the Vega 64 exists.

If you don’t want to have your own script constantly running, try out sites like nowinstock.net, as they have great options to text or email you when products are available.

Is it easier to ask for forgiveness in Python?

Yes, we are looking for graphic artists

Is Python more suited for EAFP[1] or LBYL[2] coding styles? It has been debated across message boards, email chains, Stack Overflow, Twitter, and workplaces. It’s not as heated as some other battles, like how spaces are better than tabs, or that nano is indisputably the best terminal editor[3], but people seem to have their own preference on the subject. What many will gloss over is the fact that Python as a language doesn’t have a preference. Rather, different design patterns lend themselves better to one or the other.

“I disagree with the position that EAFP is better than LBYL, or ‘generally recommended’ by Python” – Guido van Rossum[4]

If you haven’t already, I suggest taking a look at Brett Cannon’s blog post about how EAFP is a valid method with Python that programmers from other languages may not be familiar with. The benefits of EAFP are simple: explicit code, fail fast, succeed faster, and DRY (don’t repeat yourself). In general I find myself using it more than LBYL, but that doesn’t mean it’s always the right way.

Use Case Dependent

First, a little code comparison to see the differences between them. Let’s work with a list of lists of strings. Our goal: if the inner list is at least three items long, copy its third item into another list.

messages = [["hi there", "how are you?", "I'm doing fine"]]
out = []
 
# LBYL
if messages and messages[0] and len(messages[0]) >= 3:
    out.append(messages[0][2])

# EAFP
try:
    out.append(messages[0][2])
except IndexError as err:
    print(err)  # In the real world, this should be a logging `.exception()`

Both pieces of code are short, and it’s easy to follow what they are accomplishing. However, when there are at least three items in the inner list, LBYL will take almost twice as long! When there aren’t any errors happening, the checks are just additional weight slowing the process down.

But if we prune the messages down to [["hi there", "how are you?"]], then EAFP will be much slower, because it will always try a bad lookup, fail, and then have to catch that failure, whereas LBYL will only perform the check, see it can’t do the append, and move on. Check out my speed comparison gist yourself.
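
If you don’t want to dig through the gist, a rough timeit sketch along these lines (exact numbers will vary per machine) shows the difference:

import timeit

setup = 'messages = [["hi there", "how are you?", "I am doing fine"]]; out = []'

lbyl = """
if messages and messages[0] and len(messages[0]) >= 3:
    out.append(messages[0][2])
"""

eafp = """
try:
    out.append(messages[0][2])
except IndexError:
    pass
"""

# With three items present, the EAFP version usually wins;
# empty out the inner list and the numbers flip
print("LBYL:", timeit.timeit(lbyl, setup=setup, number=1_000_000))
print("EAFP:", timeit.timeit(eafp, setup=setup, number=1_000_000))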

Some people might think putting both the append and the lookup in the same try block is wrong. However, it is both faster (we only have to do the lookup once) and the proper way (we are only catching the possible IndexError, not anything that would arise from the append).

The real takeaway from this is to know which side of the coin you are dealing with beforehand. Is your incoming data almost always going to be correct? Then I’d lean towards EAFP, whereas if it’s a crap shoot, LBYL may make more sense.

Sometimes one is always faster

There are some instances where one is almost always faster than the other (though speed isn’t everything). For example, file operations are faster with EAFP, because the less time spent waiting on IO, the better.

Let’s try making a directory when we are not sure whether it has been created yet, while pretending it’s still ye olde times and os.makedirs(..., exist_ok=True) doesn’t exist yet.

import os 

# LBYL 
if not os.path.exists("folder"):
    os.mkdir("folder")

# EAFP 
try:
    os.mkdir("folder")
except FileExistsError:
    pass

In this case, it’s always preferable to use EAFP, as it’s faster (even when executed on an M.2 SSD), there are no side effects, and the error is highly specific, so there is no need for special handling.

Be careful though: many times when dealing with files you don’t want to leave something in a bad state, and in those cases you would use LBYL.
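
For instance, a tiny sketch (report.csv is just a made-up file name) where checking first avoids clobbering a file we could not restore:

import os

# If the file already exists, overwriting it and then failing part way
# through would leave us with no good copy, so look before we leap
if not os.path.exists("report.csv"):
    with open("report.csv", "w") as f:
        f.write("id,value\n")
else:
    print("report.csv already exists, leaving it alone")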

What bad things can happen when you don’t ask permission?

Side effects

In many cases, if something fails to execute, the state of the program or associated files might have changed. If it’s not easy to revert to a known good state in the exception clause, EAFP should be avoided.

# DO NOT DO

with open("file.txt", "w") as f:
    try: 
        f.write("Hi there!\n") 
        f.write(message[3])   
    except IndexError: 
        pass  # Don't do, need to be able to get file back to original state

Catching too much

If you are wrapping something with except Exception or, worse, the dreaded blank except:, then you shouldn’t be using EAFP (if you’re still using a blank except:, you need to read up more on that and stop it!).

# DO NOT DO

try:
    do_something()
except:  # If others see this you will be reported to the Python Secret Police
    pass 

Also, sometimes an error seems specific, like an OSError, but it covers multiple different child errors that may need to be handled separately.

# DO 
import errno
import os

try:
    os.scandir("secret_files")
except FileNotFoundError: # A child of OSError
    # could be put as 'elif err.errno == errno.ENOENT' below
    log.error("Yo dawg, that directory doesn't exist, "
              "we'll try the backup folder")
except OSError as err:
    if err.errno in (errno.EACCES, errno.EPERM):
        log.error("We don't have permission to access that capt'n!"
                  "We'll try the backup folder")
    else:
        log.exception(f"Unexpected OSError: {err}") 
        raise err # If you don't expect an error, don't keep going. 
                  # This isn't Javascript

When to use what?

EAFP (Easier to Ask for Forgiveness than Permission)

  • IO operations (Hard drive and Networking)
  • Actions that will almost always be successful
  • Database operations (when dealing with transactions and can rollback)
  • Fast prototyping in a throw away environment

LBYL (Look Before You Leap):

  • Irrevocable actions, or anything that may have a side effect
  • Operation that may fail more times than succeed
  • When an exception that needs special attention could be easily caught beforehand

Sometimes you might even find yourself using both for complex or mission-critical applications, like programming the space shuttle, or getting your code above the 500 lines the teacher wants.