Chris Griffith

Uploading large files by chunking – featuring Python Flask and Dropzone.js

It can be a real pain to upload huge files. Many services limit their upload sizes to a few megabytes, and you don’t want a single connection open forever either. The super simple way to get around that is to send the file in lots of small parts, aka chunking.

Chunking Food – Artwork by Clara Griffith

Finished code example can be downloaded here.

So there are going to be two parts to making this work, the front-end (website) and the back-end (server). Let’s start with what the user will see.

Webpage with Dropzone.js

Beautiful, ain’t it? The best part is, the code powering it is just as succinct.

<!doctype html>
<html lang="en">
<head>

    <meta charset="UTF-8">

    <link rel="stylesheet" 
     href="https://cdnjs.cloudflare.com/ajax/libs/dropzone/5.4.0/min/dropzone.min.css"/>

    <link rel="stylesheet" 
     href="https://cdnjs.cloudflare.com/ajax/libs/dropzone/5.4.0/min/basic.min.css"/>

    <script type="application/javascript" 
     src="https://cdnjs.cloudflare.com/ajax/libs/dropzone/5.4.0/min/dropzone.min.js">
    </script>

    <title>File Dropper</title>
</head>
<body>

<form method="POST" action='/upload' class="dropzone dz-clickable" 
      id="dropper" enctype="multipart/form-data">
</form>


</body>
</html>

This is using the dropzone.js library, which has no additional dependencies and decent CSS included. All you have to do is add the class “dropzone” to a form and it automatically turns it into one of their special drag and drop fields (you can also click and select).

However, by default, dropzone does not chunk files. Luckily, it is really easy to enable. We are going to add some custom JavaScript and insert it between the end of the form and the end of the body.

</form>

<script type="application/javascript">
    Dropzone.options.dropper = {
        paramName: 'file',
        chunking: true,
        forceChunking: true,
        url: '/upload',
        maxFilesize: 1025, // megabytes
        chunkSize: 1000000 // bytes
    }
</script>

</body>

When enabling chunking, it will break up any files larger than the chunkSize and send them to the server over multiple requests. It accomplishes this by adding form data that has information about the chunk (uuid, current chunk, total chunks, chunk size, total size). By default, anything under that size will not have that information sent as part of the form data, and the server would have to have an additional logic path. Thankfully, there is the forceChunking option, which will always send that information, even if it’s a smaller file. Everything else is pretty self-explanatory, but if you want more details about the possible options, just check out their list of configuration options.

Python Flask Server

Onto the backend. I am going to be using Flask, which is currently the most popular Python web framework (by GitHub stars); other good options include Bottle and CherryPy. If you hate yourself or your colleagues, you could also use Django or Pyramid. There are a ton of good example Flask projects and boilerplates to start from. I am going to use one that I created for my own needs, but don’t feel obligated to use it.

This type of upload will work with any real website back-end. You will simply need two routes, one that displays the frontend and another that accepts the file as an upload. First, let’s just view what dropzone is sending us. In this example my project is called ‘pydrop’, and if you’re using my FlaskBootstrap code, this is the views/templated.py file.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import logging
import os

from flask import render_template, Blueprint, request, make_response
from werkzeug.utils import secure_filename

from pydrop.config import config

blueprint = Blueprint('templated', __name__, template_folder='templates')

log = logging.getLogger('pydrop')


@blueprint.route('/')
@blueprint.route('/index')
def index():
    # Route to serve the upload form
    return render_template('index.html',
                           page_name='Main',
                           project_name="pydrop")


@blueprint.route('/upload', methods=['POST'])
def upload():
    # Route to deal with the uploaded chunks
    log.info(request.form)
    log.info(request.files)
    return make_response(('ok', 200))

Run the Flask server and upload a small file (under the chunk size limit). It should log a single instance of a POST to /upload:

[INFO] werkzeug: 127.0.0.1 "POST /upload HTTP/1.1" 200 -

[INFO] pydrop: ImmutableMultiDict([
     ('dzuuid', '807f99b7-7f58-4d9b-ac05-2a20f5e53782'), 
     ('dzchunkindex', '0'), 
     ('dztotalfilesize', '1742'), 
     ('dzchunksize', '1000000'), 
     ('dztotalchunkcount', '1'), 
     ('dzchunkbyteoffset', '0')])

[INFO] pydrop: ImmutableMultiDict([
     ('file', <FileStorage: 'README.md' ('application/octet-stream')>)])

Let’s break down what information we are getting:

dzuuid – Unique identifier of the file being uploaded

dzchunkindex – Which block number we are currently on

dztotalfilesize – The entire file’s size

dzchunksize – The max chunk size set on the frontend (note this may be larger than the actual chunk’s size)

dztotalchunkcount – The number of chunks to expect

dzchunkbyteoffset – The byte offset in the file where this chunk’s data should be written

Next, let’s upload something a bit larger that will need to be chunked into multiple parts:

[INFO] werkzeug: 127.0.0.1 "POST /upload HTTP/1.1" 200 -

[INFO] pydrop: ImmutableMultiDict([
    ('dzuuid', 'b4b2409a-99f0-4300-8602-8becbef24c91'), 
    ('dzchunkindex', '0'), 
    ('dztotalfilesize', '1191708'), 
    ('dzchunksize', '1000000'), 
    ('dztotalchunkcount', '2'), 
    ('dzchunkbyteoffset', '0')])

[INFO] pydrop: ImmutableMultiDict([
    ('file', <FileStorage: '04vfpknzx8z01.png' ('application/octet-stream')>)])



[INFO] werkzeug: 127.0.0.1 "POST /upload HTTP/1.1" 200 -

[INFO] pydrop: ImmutableMultiDict([
    ('dzuuid', 'b4b2409a-99f0-4300-8602-8becbef24c91'), 
    ('dzchunkindex', '1'),
    ('dztotalfilesize', '1191708'),  
    ('dzchunksize', '1000000'), 
    ('dztotalchunkcount', '2'), 
    ('dzchunkbyteoffset', '1000000')])

[INFO] pydrop: ImmutableMultiDict([
    ('file', <FileStorage: '04vfpknzx8z01.png' ('application/octet-stream')>)])

Notice how /upload has been called twice, and that dzchunkindex and dzchunkbyteoffset have been updated accordingly. That means our upload function has to be smart enough to handle both brand new uploads and continuations of in-progress multipart uploads. For continuations we need to open the existing file and write the new data after what is already there, whereas for new uploads we create the file and start at the beginning. Luckily, both can be accomplished with the same code: open the file in append mode, then ‘seek’ to the offset where the chunk belongs (in this case we are relying on dropzone to provide the seek offset).

@blueprint.route('/upload', methods=['POST'])
def upload():
    # Remember the paramName was set to 'file', we can use that here to grab it
    file = request.files['file']

    # secure_filename makes sure the filename isn't unsafe to save
    save_path = os.path.join(config.data_dir, secure_filename(file.filename))

    # We need to append to the file, and write as bytes
    with open(save_path, 'ab') as f:
        # Go to the offset, aka after the chunks we already wrote
        f.seek(int(request.form['dzchunkbyteoffset']))
        f.write(file.stream.read())
       
    # Giving it a 200 means it knows everything is ok
    return make_response(('Uploaded Chunk', 200))

At this point you should have a working upload script, tada!
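If you want to poke at the endpoint without opening a browser, you can fake what dropzone sends with the requests library. This is only a quick testing sketch, assuming the Flask dev server is running locally on its default port 5000 and the filename is made up:

import uuid

import requests

payload = b"hello from a fake chunk"

# A minimal stand-in for the form fields dropzone.js posts for a one-chunk upload
form_data = {
    'dzuuid': str(uuid.uuid4()),
    'dzchunkindex': '0',
    'dztotalfilesize': str(len(payload)),
    'dzchunksize': '1000000',
    'dztotalchunkcount': '1',
    'dzchunkbyteoffset': '0',
}

response = requests.post('http://127.0.0.1:5000/upload',
                         data=form_data,
                         files={'file': ('fake_chunk_test.txt', payload)})
print(response.status_code, response.text)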

But let’s beef this up a little bit. The following improvements make sure we don’t overwrite existing files that have already been uploaded, check that the file size matches what we expect when we’re done, and give a little more output along the way.

@blueprint.route('/upload', methods=['POST'])
def upload():
    file = request.files['file']

    save_path = os.path.join(config.data_dir, secure_filename(file.filename))
    current_chunk = int(request.form['dzchunkindex'])

    # If the file already exists it's ok if we are appending to it,
    # but not if it's a new file that would overwrite the existing one
    if os.path.exists(save_path) and current_chunk == 0:
        # 400 and 500s will tell dropzone that an error occurred and show an error
        return make_response(('File already exists', 400))

    try:
        with open(save_path, 'ab') as f:
            f.seek(int(request.form['dzchunkbyteoffset']))
            f.write(file.stream.read())
    except OSError:
        # log.exception will include the traceback so we can see what's wrong 
        log.exception('Could not write to file')
        return make_response(("Not sure why,"
                              " but we couldn't write the file to disk", 500))

    total_chunks = int(request.form['dztotalchunkcount'])

    if current_chunk + 1 == total_chunks:
        # This was the last chunk, the file should be complete and the size we expect
        if os.path.getsize(save_path) != int(request.form['dztotalfilesize']):
            log.error(f"File {file.filename} was completed, "
                      f"but has a size mismatch."
                      f"Was {os.path.getsize(save_path)} but we"
                      f" expected {request.form['dztotalfilesize']} ")
            return make_response(('Size mismatch', 500))
        else:
            log.info(f'File {file.filename} has been uploaded successfully')
    else:
        log.debug(f'Chunk {current_chunk + 1} of {total_chunks} '
                  f'for file {file.filename} complete')

    return make_response(("Chunk upload successful", 200))

Now let’s give this a try:

[DEBUG] pydrop: Chunk 1 of 6 for file DSC_0051-1.jpg complete
[DEBUG] pydrop: Chunk 2 of 6 for file DSC_0051-1.jpg complete
[DEBUG] pydrop: Chunk 3 of 6 for file DSC_0051-1.jpg complete
[DEBUG] pydrop: Chunk 4 of 6 for file DSC_0051-1.jpg complete
[DEBUG] pydrop: Chunk 5 of 6 for file DSC_0051-1.jpg complete
[INFO] pydrop: File DSC_0051-1.jpg has been uploaded successfully

Sweet! But wait, what if we remove the directories where the files are stored? Or try to upload the same file again?

(Dropzone’s text out of the box is a little hard to read, but it says “File already exists” on the left and “Not sure why, but we couldn’t write the file to disk” on the right. Exactly what we’d expect.)

2018-05-28 14:29:19,311 [ERROR] pydrop: Could not write to file
Traceback (most recent call last):
    ....
FileNotFoundError: [Errno 2] No such file or directory:

We get error messages on the webpage and in the logs, perfect.

I hope you found this information useful and if you have any suggestions on how to improve it, please let me know!

Thinking further down the road

In the long-term I would have a database or some permanent storage option to keep track of file uploads. That way you could see if one fails or stops halfway and be able to remove incomplete ones. I would also save files first into a temp directory based off their UUID and then, when complete, move them to a location based off their file hash. It would also be nice to have a page to see everything uploaded and manage directories or other options, or even password protected uploads.
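As a rough sketch of that last idea (a hypothetical helper, not part of the example project), hashing a finished upload and moving it out of a temp directory could look something like this:

import hashlib
import os
import shutil


def finalize_upload(temp_path, storage_dir):
    # Hash the completed file in blocks so huge uploads don't eat all the RAM
    sha256 = hashlib.sha256()
    with open(temp_path, 'rb') as f:
        for block in iter(lambda: f.read(1024 * 1024), b''):
            sha256.update(block)
    file_hash = sha256.hexdigest()

    # Store it in a directory named after the hash, keeping the original filename
    final_dir = os.path.join(storage_dir, file_hash[:2], file_hash)
    os.makedirs(final_dir, exist_ok=True)
    final_path = os.path.join(final_dir, os.path.basename(temp_path))
    shutil.move(temp_path, final_path)
    return final_path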

 

Interprocess Communications

Inter-Office-Process Communications, Artwork by Clara Griffith

Dealing with communications across programs, processes or even threads can be a real pain in the patella. Each situation usually calls for something slightly different and has to work with a limited set of options. On top of that, a lot of tutorials I see are people simply starting one program from inside the other (for example with subprocess) but that just won’t do for my taste.

Just show me the table of which IPC method I need 

I’m going to go through options that let two independent programs, started on their own, talk with each other. And when I say talk, I mean being able to at least execute tasks in the other’s process and, in most cases, transfer data as well.

Pipes

Probably one of the most old school ways of transferring data is to just pipe it back and forth as needed. This is how terminals and shells interact with programs, by using the standard pipes of stdin, stdout, and stderr (standard in, standard out, standard error).

Every time you print you are writing to stdout, and it is very common to use this in Python when running another program from within it (i.e. if we did subprocess.run('bob.py') we could easily communicate with it via the pipes). These are anonymous pipes that exist only while the program is running. To communicate between different programs you should use a Named Pipe, which creates a file descriptor that you connect to as an entry point.
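For completeness, here is what talking over those anonymous pipes looks like with subprocess. This is just a minimal sketch that spawns another Python process as the child, purely so the example is self-contained:

import subprocess
import sys

# Start a child program and talk to it over its anonymous stdin/stdout pipes
child = subprocess.Popen([sys.executable, '-c', 'print(input().upper())'],
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         universal_newlines=True)

# Send a line to the child's stdin and read its stdout back
out, _ = child.communicate('howdy from the parent process\n')
print(out.strip())  # HOWDY FROM THE PARENT PROCESS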

Here are some short examples showing named pipes in use on Linux. It is also possible on Windows with the pywin32 module, or you can do it yourself with ctypes, but I find it easier to just use other methods on Windows.

Our example will be two programs, the first of which, Alice.py, is simply a text converter. The message sent will be in three parts: a starting identifier, X; four digits denoting the message length; and the message itself. (This is by no means a common standard or particularly practical, just something I made up for a quick example.)

So to send ‘Howdy’, it would look like X0005HOWDY, X being the identifier of a new message, 0005 denoting the length of the message to come, and HOWDY being the message body.

Alice.py

import time 
import os 

# The path of the named pipe on the file system;
# in this scenario both programs are run from the same directory
pipename = 'fifo'
os.mkfifo(pipename)

# For non-blocking pipes, you have to be reading from it 
# before you can have something else writing to it
pipein = os.open(pipename, os.O_NONBLOCK|os.O_RDONLY) 
p = os.fdopen(pipein, 'rb', buffering=0)

# This program will simply make the output of the other program more readable
def converter(message):
    return message.decode('utf-8').replace("_", " ").replace("-", " ").lower()

while True:
    # Wait until we have a message identifier
    new_data = p.read(1)
    if new_data == b'X':
        # Figure out the length of the message 
        raw_length = p.read(4)
        message = p.read(int(raw_length))
        # Read and convert the message 
        print(converter(message))
    elif new_data:
        # If we read a single byte that isn't an identifier, something went wrong
        raise Exception('Out of sync!') 
    else:
        time.sleep(1)

That’s all our conversion server is. It creates a pipe that something else can connect to, and will convert the incoming messages to lower case and replace dashes and underscores with spaces.

So let’s have Bob.py talk to Alice.py, but as Bob is a computer program, he sometimes spits out gobbledygook messages that need some help.

Bob.py

import os 

# Connect to the pipe created by Alice.py
pipename = 'fifo'
pipeout = os.open(pipename, os.O_NONBLOCK|os.O_WRONLY) 
p = os.fdopen(pipeout, 'wb', buffering=0, closefd=False)

def write_message(message):
    """Covert a string into our message format and send it"""
    length = '{0:04d}'.format(len(message))
    p.write(b'X')
    p.write(length.encode('utf-8'))
    p.write(message.encode('utf-8'))
     
write_message('TERriBLe_looking-machine-oUTput')

Start up Alice.py first, then Bob.py.

Alice will print out a pretty message of:  terrible looking machine output

While pipes are super handy for terminal usage and running programs inside each other, it has become uncommon to use named pipes as the actual comms between two independent programs, mainly because of the required setup procedure and non-uniformity across operating systems. This has led to more modern, cross-compatible methods being preferred.

Pros: 

  • Fast and Efficient
  • No external services

Cons:

  • Not cross-platform compatible code
  • Difficult to code well

Files

Another ye olde (yet perfectly valid) way to communicate between programs is to simply create files that each program can interpret. Sometimes it’s as simple as having a lockfile: if the lockfile exists, it serves as a message to other programs to wait until it is removed before doing their own work, or even to stop new instances of the same program from running. For example, running system updates in most Linux environments will create a lock file to make sure that two different update processes aren’t started at the same time.
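A bare-bones version of that idea, as a minimal sketch (the path is made up, and a real implementation would create the file atomically with os.open and O_EXCL):

import os
import sys

LOCK_FILE = '/tmp/my_program.lock'  # hypothetical location

if os.path.exists(LOCK_FILE):
    # Someone else is already running, bail out
    sys.exit('Lock file exists, refusing to start a second instance')

# 'Touch' the lock file so other programs know we are busy
open(LOCK_FILE, 'w').close()
try:
    pass  # ... do the actual work here ...
finally:
    os.unlink(LOCK_FILE)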

It’s possible to take that idea further and share a file or two to actually transfer information. In these examples, the two programs will both work out of the same file, with a lock file for safety. (You can write your own code for file lock control, but I will be using the py-filelock package for brevity.)

There are a lot of possible ways to format the shared file; this example will keep it very basic, giving each command a unique id (used to know if a command has been run before or not), then the command and its arguments. The same dictionary will also leave room for a result or an error message.

The shared JSON file will have the following format:

{
  "commands": { 
    "<random uuid>": {
      "command": "add",
      "args": [2,5],
      "result": 7  # "result" field only exists after server has responded
      # if "error" key exists instead of "result", something bad happened
    }
  }
}

The server, Alice.py, will then keep looping, waiting for the shared file to change. Once it does, Alice will obtain the lock for safety (so there isn’t any corrupt JSON from a write being interrupted), read the file, run the desired command, and save the result. In a real-world scenario the lock would only be obtained during the individual reading and writing phases, to keep it held by any single program for as short a time as possible. But that complicates the code (you would have to do a second read before writing and only update the sections you ran, in case others were added in the meantime) and makes it a bit much for an off-the-shelf example.

Alice.py

import json
import time
import os

from filelock import FileLock

# Keep track of commands that have been run 
completed = []
# Track file size to know when new commands have been added
last_size = 0

lock_file = "shared.json.lock"
lock = FileLock(lock_file)
shared_file = "shared.json"


# Not totally necessary, but if you ever need to raise exceptions
# it's better to create your own than use the built-ins
class AliceBroke(Exception):
    """Custom exception for catching our own errors"""


# Functions that can be executed by Bob.py
def adding(a, b):
    if not isinstance(a, (int, float)) or not isinstance(b, (int, float)):
        raise AliceBroke('Yeah, we are only going to add numbers')
    return a + b


def multiply(a, b):
    if not isinstance(a, (int, float)) or not isinstance(b, (int, float)):
        raise AliceBroke('Yeah, we are only going to multiply numbers')
    return a * b


# Right now just have a way to map the incoming strings from Bob to functions
# This could be expanded to include what arguments it expects for pre-validation
translate = {
    "add": adding,
    "multiply": multiply
}


def main():
    global last_size, completed
    while True:
        # Poor man's file watcher
        file_size = os.path.getsize(shared_file)
        if file_size == last_size:
            # File hasn't changed
            time.sleep(.1)
            continue
        else:
            print(f'File has changed from {last_size} to {file_size} bytes,'
                  f' lets see what they want to do!')
            last_size = file_size

        run_commands()
        last_size = os.path.getsize(shared_file)


def run_commands():
    # Grab the lock, once it is acquired open the file
    with lock, open(shared_file, 'r+') as f:
        data = json.load(f)
        # Iterate over the command keys, if we haven't run it yet, do so now
        for name in data['commands']:
            if name not in completed:

                command = data['commands'][name]['command']
                args = data['commands'][name]['args']
                print(f'running command {command} with args {args}')

                try:
                    data['commands'][name]['result'] = translate[command](*args)
                except AliceBroke as err:
                    # Arguments weren't the type we were expecting
                    data['commands'][name]['error'] = str(err)
                except TypeError:
                    data['commands'][name]['error'] = "Incorrect number of arguments"
                finally:
                    completed.append(name)
        # As we are writing the data back to the same file that is still
        # open, we need to go back to the beginning of it before writing
        f.seek(0)
        json.dump(data, f, indent=2)


if __name__ == '__main__':
    # Create / blank out the shared file
    with open(shared_file, 'w') as f:
        json.dump({"commands": {}}, f)
    last_size = os.path.getsize(shared_file)

    try:
        main()
    finally:
        # Be nice and clean up after ourselves
        os.unlink(shared_file)
        os.unlink(lock_file)

Our client, Bob.py, will ask Alice to run some simple commands she supports and wait until it gets the answer back.

Bob.py

import json
import time
import uuid

from filelock import FileLock

completed = []
last_size = 0

lock = FileLock("shared.json.lock")
shared_file = "shared.json"


def ask_alice(command, *args, wait_for_answer=True):
    # Create a new random ID for the command
    # could be as simple as incremented numbers 
    cmd_uuid = str(uuid.uuid4())
    with lock, open(shared_file, "r+") as f:
        data = json.load(f)
        data['commands'][cmd_uuid] = {'command': command, 'args': args}
        f.seek(0)
        json.dump(data, f)
    if wait_for_answer:
        return get_answer(cmd_uuid)
    return cmd_uuid


def get_answer(cmd_uuid):
    # Wait until we get an answer back for a command
    # Ideally this would be more of an asynchronous callback, but there 
    # are plenty of cases where serialized processes like this must happen
    while True:
        with lock, open(shared_file) as f:
            data = json.load(f)
            command = data['commands'][cmd_uuid]
            if 'result' in command:
                return command['result']
            elif 'error' in command:
                raise Exception(command['error'])
        time.sleep(.2)



print(f"Lets add 2 and 5 {ask_alice('add', 2, 5, wait_for_answer=True)}")

print(f"Lets multiply 8 and 5 {ask_alice('multiply', 2, 5, wait_for_answer=True)}")

print("Lets break it and cause an exception!")
ask_alice('add', 'bad', 'data', wait_for_answer=True)

Start up Alice.py first, then Bob.py.

Alice will return:

File has changed from 16 to 90 bytes, lets see what they want to do!
running command add with args [2, 5]
File has changed from 103 to 184 bytes, lets see what they want to do!
running command multiply with args [8, 5]
File has changed from 198 to 283 bytes, lets see what they want to do!
running command add with args ['bad', 'data']

Bob will return:

Lets add 2 and 5: 7
Lets multiply 8 and 5: 40
Lets break it and cause an exception!
Traceback (most recent call last):
     ...
Exception: Yeah, we are only going to add numbers

 

Pros:

  • Cross-platform compatible
  • Simple to implement and understand

Cons: 

  • Have to worry about File System security if anything sensitive is being shared
  • Programs now responsible to clean up after themselves

 

Message Queue

Welcome to the 21st Century, where message queues serve as quick and efficient ways to transfer commands and information. I like to think of them as always-running middlemen that can survive outside your process.

Here you are spoiled for choice with options: ActiveMQ, ZeroMQ, Redis, RabbitMQ, Sparrow, Starling, Kestrel, Amazon SQS, Beanstalk, Kafka, IronMQ, and POSIX IPC message queues are the ones I know. You can even use NoSQL databases like MongoDB or CouchDB in a similar manner, though for simple IPC I suggest against using those.

I suggest looking into RabbitMQ’s tutorials to get a good in-depth look at how you can write code for it (and across multiple different languages). RabbitMQ is cross-platform and even provides Windows binaries directly, unlike some others.

For my own examples we will use Redis, as they have the best summary I can steal a quote from on their website https://redis.io/, yet have a surprising lack of Python tutorials.

Redis is an open source (BSD licensed), in-memory data structure store, used as a database, cache and message broker. It supports data structures such as strings, hashes, lists, sets, sorted sets with range queries, bitmaps, [etc…]

That’s it: it stores data somewhere that is easily accessed from multiple programs, just what you need for IPC. You can use it as a keystore (for data transfer and storage) or, like I am about to show, in a publisher / subscriber model, which is better for command and control.
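The keystore side is about as simple as it gets. A quick sketch, assuming a Redis server is running locally on the default port and the key name is made up:

import redis

r = redis.StrictRedis()

# One program stores some data...
r.set('latest_status', 'processing job 42')

# ...and any other program can read it back (values come back as bytes)
print(r.get('latest_status').decode('utf-8'))  # processing job 42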

Alice.py is our service and simply subscribes to a channel and waits for any updates and will print them to the screen.

import time

import redis

r = redis.StrictRedis()
p = r.pubsub()


def handler(message):
    print(message['data'].decode('utf-8'))


p.subscribe(my_channel=handler)
thread = p.run_in_thread(sleep_time=0.001)

try:
    # Runs in the background while your program otherwise operates
    time.sleep(1000)
finally:
    # Shut down cleanly when all done
    thread.stop()

Bob.py is another program that simply pushes a message to that channel.

import redis

r = redis.StrictRedis()

r.publish('my_channel', 'example data'.encode('utf-8'))

Start up Alice.py first, then Bob.py.

Alice.py will print example data, and can be stopped by pressing CTRL+C

Pros:

  • Built-in publisher / subscriber methodology, don’t have to do manual checks
  • Built-in background thread running
  • (Can be) Cross-platform compatible
  • State can be saved if a program exits unexpectedly (depending on setup)

Cons: 

  • Requirement for external service
  • More overhead
  • Have to worry about the external service’s security and how you connect to it

Shared Memory

If written correctly, this can be one of the fastest ways to transfer data or execute tasks between programs, but it is also the most low-level, meaning a lot more coding and error handling on your part.

Think of using memory in the same way as the single shared file with a lock. While one program writes to memory, the other has to wait until it finishes, then can read it and write its own thing back. (It’s possible to use multiple sections of memory as well, but that gets to be a lot of example code really fast, so I am holding off.)

So now you have to create the Semaphore (basically a memory lockfile) and mapped memory that each program can access.

On Linux (and Windows with Cygwin) you can use posix_ipc, and check out their example here.

A lot simpler is to just use the built-in mmap module directly and share a file in memory. This example won’t even go as far as creating the lockfile, just showing off the basics.

Alice.py is going to create and write stuff to the memory mapped file

Alice.py

import time
import mmap
import os

# Create the file and fill it with line ends
fd = os.open('mmaptest', os.O_CREAT | os.O_TRUNC | os.O_RDWR)
os.write(fd, b'\n' * mmap.PAGESIZE)

# Map it to memory and write some data to it, then change it 10 seconds later
buf = mmap.mmap(fd, mmap.PAGESIZE, access=mmap.ACCESS_WRITE)
buf.write(b'now we are in memory\n')
time.sleep(10)
buf.seek(0)
buf.write(b'again\n')

Bob.py will simply read the content of the memory-mapped file, showing that it does know when the contents change.

Bob.py

import mmap
import os
import time


# Open the file for reading only
fd = os.open('mmaptest', os.O_RDONLY)
buf = mmap.mmap(fd, mmap.PAGESIZE, access=mmap.ACCESS_READ)

# Print when the content changes
last = b''
while True:
    buf.seek(0)
    msg = buf.readline()
    if msg != last:
        print(msg)
        last = msg
    else:
        time.sleep(1)

This example isn’t the most friendly to run, as you have to start up Alice.py and then Bob.py within ten seconds after that, but it shows the basics of how memory mapping is very similar to just using a file.
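If you did want to make it safer, you could reuse the same py-filelock trick from the Files section around the writes. Here is a rough sketch of what Alice’s write path might look like with that added (an illustration only, not part of the example above):

import mmap
import os

from filelock import FileLock

lock = FileLock('mmaptest.lock')

# Same setup as the Alice.py example
fd = os.open('mmaptest', os.O_CREAT | os.O_TRUNC | os.O_RDWR)
os.write(fd, b'\n' * mmap.PAGESIZE)
buf = mmap.mmap(fd, mmap.PAGESIZE, access=mmap.ACCESS_WRITE)

# Hold the lock only while actually writing, so a reader can grab it in between
with lock:
    buf.seek(0)
    buf.write(b'now we are in memory, safely\n')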

Pros:

  • Fast
  • Cross-platform compatible code

Cons: 

  • Can be difficult to write
  • Limited size to accessible memory
  • Using mmap without posix_ipc will also create a physical file with the same content

 

Signals

On Linux? Don’t want to send information, just need to toggle state on something? Send a signal!

Imagine you have a service, Alice, that anything local can connect to, but you want to be able to tell them if the service goes down or comes back up.

In your clients’ code, they should register their process identification number (PID) with Alice when they first connect to her, and have handlers that capture custom signals so they know Alice‘s current state.

Bob.py

import signal, os

service_running = True

my_pid = os.getpid()

# 'Touch' a file as the name of the program's PID 
# in Alice service's special directory
# Make sure to delete this file when your program exits!
open(f"/etc/my_service/pids/{my_pid}", "w").close()


def service_off(signum, frame):
    global service_running 
    service_running = False

def service_on(signum, frame):
    global service_running 
    service_running = True

signal.signal(signal.SIGUSR1, service_off)
signal.signal(signal.SIGUSR2, service_on)

SIGUSR1 and SIGUSR2 are signals reserved for custom use, so they are safe to use in this manner without fear of secondary actions happening. (For example, if you send something like SIGINT, aka interrupt, it will just kill your program if not caught properly.)
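That PID file Bob created also needs to be removed when he exits. A minimal sketch using atexit, with the same hypothetical directory as above:

import atexit
import os

pid_file = f"/etc/my_service/pids/{os.getpid()}"


def remove_pid_file():
    # Unregister ourselves so Alice doesn't try to signal a dead process
    if os.path.exists(pid_file):
        os.unlink(pid_file)


atexit.register(remove_pid_file)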

Alice.py will then simply go through the directory of PID files when she starts up or shuts down, and send each one the appropriate signal to let them know whether she is back online or going down.

import os
import signal 

def let_them_know(startup=True):
    # SIGUSR2 triggers the clients' service_on handler, SIGUSR1 their service_off
    signal_to_send = signal.SIGUSR2 if startup else signal.SIGUSR1
    for pid_file in os.listdir("/etc/my_service/pids/"):
        # Wrap in a try/except block for production
        os.kill(int(pid_file), signal_to_send)

Pros:

  • Super simple

Cons: 

  • Not cross-platform compatible
  • Cannot transfer data

Sockets

The traditional IPC. Every time I look for different IPC methods, sockets always come up. It’s easy to understand why: they are cross-platform and natively supported by most languages.

However, dealing directly with raw sockets is very low-level and requires a lot more coding and configuration. The Python standard library has a great example of an echo server to get you started.

But this is batteries included, everyone-else-has-already-done-the-work-for-you Python. Sure you could set up everything yourself, or you can use the higher level Listeners and Clients from the multiprocessing library.

Alice.py is hosting a server party, and executing incoming requests.

from multiprocessing.connection import Listener

def function_to_execute(*args):
    """" Our handler function that will run 
         when an incoming request is a list of arguments
    """
    return args[0] * args[1]

with Listener(('localhost', 6000), authkey=b'Do not let eve find us!') as listener:
    # Looping here so that the clients / party goers can 
    # always come back for more than a single request
    while True:
        print('waiting for someone to ask for something')
        with listener.accept() as conn:
            args = conn.recv()
            
            if args == b'stop server':
                print('Goodnight')
                break
            elif isinstance(args, list):  
                # Very basic check, must be more secure in production
                print('Someone wants me to do something')
                result = function_to_execute(*args)
                conn.send(result)
            else:
                conn.send(b'I have no idea what you want me to do')

Bob.py is going to go for just a quick function and then call it a night.

from multiprocessing.connection import Client

with Client(('localhost', 6000), authkey=b'Do not let eve find us!') as conn:
    conn.send([8, 8])
    print(f"What is 8 * 8? Why Alice told me it's {conn.recv()}")

# We have to connect again because Alice can only handle one request at a time
with Client(('localhost', 6000), authkey=b'Do not let eve find us!') as conn:
    # Bob's a party pooper and going to end it for everyone
    conn.send(b'stop server')

Start up Alice.py first, then run Bob.py. Bob will quickly exit with the message What is 8 * 8? Why Alice told me it's 64

Alice will have four total messages:

waiting for someone to ask for something
Someone wants me to do something
waiting for someone to ask for something
Goodnight

Pros:

  • Cross-platform compatible
  • Can be extended to RPC

Cons: 

  • Slower
  • More overhead

 

RPC

Believe it or not, you can use a lot of the methods from remote procedure calls locally. Even sockets and message queues can already be set up to work for either IPC or RPC.

Now, barring using IR receivers and transmitters or laser communications, you are probably going to connect remote programs via the internet. That means most RPC standards are going to be networking based, aka built on sockets. So it’s less about deciding which protocol to use, and more about choosing which data transmission standard to use. Two that I have used are JSONRPC, for normal humans, and XMLRPC, for XML fans who like sniffing glue and running with scissors. There is also SOAP, for XML fans who need their acronyms to also spell something, and Apache Thrift, which I found while doing research for this article but have not touched. Those standards transfer data as text, which makes it easier to read, but inefficient. Newer options, like gRPC, use protocol buffers to serialize the data and reduce overhead.

It’s also very common and easy to just write up a simple HTTP REST interface for your programs, using a lightweight framework like bottle or flask, and communicate that way.
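For example, a tiny Flask route is often all the ‘RPC’ two of your own programs need. A minimal sketch, reusing the kind of math Alice was doing earlier (the route and port are arbitrary):

from flask import Flask, jsonify, request

app = Flask(__name__)


@app.route('/add', methods=['POST'])
def add():
    # Expects JSON like {"a": 2, "b": 5}
    data = request.get_json()
    return jsonify(result=data['a'] + data['b'])


if __name__ == '__main__':
    app.run(port=5001)

The other program then just POSTs JSON to http://localhost:5001/add (with requests, or anything else that speaks HTTP) and reads the result back.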

In the future (if they don’t already exist) I expect to see even more choices with WebSocket or WebRTC based communications.

 

Summary

Let’s wrap this all up with a comparison table:

Method            Cross Platform Compatible   Requires File or FD   Requires External Service   Easy to Write / Read Code
Pipes             No                          Yes                   No                          No
Files             Yes                         Yes                   No                          Yes
Message Queues    Yes                         No                    Yes                         Yes
Shared Memory     Yes                         Yes                   No                          No
Sockets           Yes                         No                    No                          Yes
Signals           No                          No                    No                          Yes

Hopefully that at least clears up why sockets are the go-to IPC method, as they have the most favorable traits. For me though, I usually want something a little more robust, like message queues, or REST APIs that can be used locally or remotely. Which, to be fair, are built on top of sockets.

 

 

ThreadPools explained – In the deep end

Thread and Multiprocessing Pools are an underused feature of Python. In my opinion, they are the easiest way to dip your feet into concurrency, and yet still the method I use most often.

Threads in a Pool, Artwork by Clara Griffith

They allow you to easily offload CPU or I/O bound tasks to a pre-instantiated group (pool) of threads or processes. One of the great things about them is that both the ThreadPool and Pool (Multiprocessing) classes have the same methods, so all the following examples are interchangeable between them. (This article will not get into the differences between Threading and Multiprocessing in Python, as that is worth a post on its own.)

Map

Let’s jump in with a super simple example. We will use a pool of 5 workers to square numbers. We will use the map method, which takes a function as its first argument (make sure it’s not called, no parentheses!), then a list (or iterable) of arguments, each of which will be passed on its own to that function in a worker Thread or Process. That way, multiple instances of that function are working at the same time!

from multiprocessing.pool import ThreadPool, Pool

def square_it(x):
    return x*x

# On Windows, make sure that multiprocessing doesn't start
# until after "if __name__ == '__main__'" 

with Pool(processes=5) as pool:
    results = pool.map(square_it, [5, 4, 3, 2, 1])

print(results) 
# [25, 16, 9, 4, 1]

Think of map as running a for loop over the list and sending each item in it to a worker process as soon as one is free (it’s a little more complicated internally, but we’ll come back to that later). Each process has been told to run the function square_it against that item.

So in this case, all the processes will be running the same function against a set of data, and waiting until all the data has been processed to return. The list of returned data will be in order based on the iterable that was put in. This is super handy if you want to do something like make a lot of requests to different websites and wait for the results, or need to run a lot of calculations. I actually did just that in the Birthday Paradox blog post using the Multiprocessing pool to speed up probability calculations.

Thankfully, Pools are versatile, they have several other handy methods, and you can let most of them run in the background, instead of waiting on it to finish immediately, by going asynchronous.

Async

So let’s use the map style functionality again, but this time we don’t want to bother waiting around for the results, which means we need to use map_async. Let’s say you want to capture a lot of images off of a website. You populate the list of links to the images, then you just need to add those to the pool and download them.

from multiprocessing.pool import ThreadPool
import time
import reusables

# When downloading from a website, be kind with how often 
# and how many requests you are making
tp = ThreadPool(processes=2)

urls = ["https://codecalamity.com/wp-content/uploads/2017/10/birthday.png",
        "https://codecalamity.com/wp-content/uploads/2017/06/Capture.png"]


# Same as the previous map, taking a function and iterable,
# just with an additional callback function 
tp.map_async(reusables.download, urls, callback=print) 
# Also, this is why having print as a function in python 3 is so dang handy


# Do something else 
time.sleep(10) 

# Results are printed when done 
# ['/home/me/birthday.png', '/home/me/Capture.png']

# Not using a context manager means we have to clean house ourselves. 
tp.terminate()
tp.close() 

This is really advantageous in scenarios where you need an instant reply, such as an API call or working with a GUI. Then later a callback can either update the GUI or the database. There is also an error_callback argument it can take, for when the function raises an exception. The error_callback function will receive the Exception the worker raised; that way you can decide if you want to ignore it or raise it in the main Thread.
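Here is a quick sketch of that in action; the worker function just fails on purpose so the error_callback has something to catch:

from multiprocessing.pool import ThreadPool
import time


def flaky(x):
    if x == 3:
        raise ValueError(f"I don't like {x}")
    return x * 2


with ThreadPool(processes=2) as tp:
    tp.map_async(flaky, [1, 2, 3, 4],
                 callback=print,
                 error_callback=lambda exc: print(f"Worker failed with: {exc}"))
    time.sleep(1)  # give the workers a moment before the pool is torn down

# Worker failed with: I don't like 3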

You can also ignore using the callbacks, and deal with the AsyncResult directly. It has the methods:

  • ready – See if the results are available
  • successful – Boolean, True if it didn’t raise an exception
  • wait – takes a timeout, will wait for the results to be ready
  • get – Grab the results, also takes a timeout, will automatically raise the exception if one occurred.

from multiprocessing.pool import ThreadPool
import time

timeout = 25

with ThreadPool(processes=4) as tp:
    async_result = tp.map_async(time.sleep, [5, 4])

    for i in range(timeout):
        time.sleep(1)

        if async_result.ready():
            if async_result.successful():
                print(async_result.get())
                break
    else:
        print("Task did not complete on time, or with errors")

Three of the methods available have corresponding async methods:

  • map – map_async
  • starmap – starmap_async
  • apply – apply_async

Passing additional arguments with “partial” or “starmap”

Now, notice that map will only provide a single argument to a function. So if you have a function that takes more than one argument, you will either need to use partial to redefine the function with default parameters, or use starmap which takes an iterable of tuples.

So let’s use partial from functools first.

from multiprocessing.pool import ThreadPool
from functools import partial
import time
import reusables

urls = ["https://codecalamity.com/wp-content/uploads/2017/10/birthday.png",
        "https://codecalamity.com/wp-content/uploads/2017/06/Capture.png"]


def download_file(url, wait_time):
    time.sleep(wait_time)
    return reusables.download(url)

# Replace the required `wait_time` with a default of 5
down_the_file = partial(download_file, wait_time=5)


with ThreadPool(2) as tp:
    # Notice we are now using the new function, down_the_file we created with partial
    print(tp.map(down_the_file, urls))

Not too difficult, you’re just stuck with using the same setting for everything. If you want to customize it, you can send multiple arguments using starmap.

Starmap

from multiprocessing.pool import ThreadPool
import reusables
import time

# Notice the list now is a list of tuples, that have a second argument, 
# that will be passed in as the second parameter. In this case, as wait_time
urls = [("https://codecalamity.com/wp-content/uploads/2017/10/birthday.png", 4),
        ("https://codecalamity.com/wp-content/uploads/2017/06/Capture.png", 10)]


def download_file(url, wait_time):
    time.sleep(wait_time)
    return reusables.download(url)


with ThreadPool(2) as tp:
    # Using `starmap` instead of just `map`
    print(tp.starmap(download_file, urls))

Apply

Both map and starmap take a single function to run a lot of things against. But there are many times where you just want background workers to take on a variety of different tasks. That’s where apply comes in.

from multiprocessing.pool import Pool
import reusables

pool = Pool(processes=5)

# apply takes `args`, aka arguments, in a tuple format 
# and `kwds`, aka keyword arguments, as a dictionary

print(pool.apply(sum, args=([1, 2, 3, 4, 5], )))
# 15
print(pool.apply(abs, (-5.67, )))
# 5.67
print(pool.apply(reusables.download, 
                 args=("http://example.com", ), 
                 kwds=dict(save_to_file=False)))
# b'<!doctype html>\n<html>\n<head>\n   ...


pool.terminate()
pool.close()

Additional Content

You can of course also mix and match any of the methods while the pool has not been terminated.

from multiprocessing.pool import Pool
import time

pool = Pool(processes=5)

print(pool.apply(sum, args=([1, 2, 3, 4, 5], )))
# 15
pool.apply_async(abs, (-5.67, ), callback=print)
# 5.67
pool.map_async(any, [(True, False), (False, False)], callback=print)
# [True, False]

time.sleep(1)
pool.terminate()
pool.close()

imap

Now remember how I said map basically iterates over the list and sends each item to a worker? Well, that’s not entirely true. imap does that; map can be a lot faster because it breaks the list into chunks first and sends each chunk to a worker’s queue to make sure it always has something in the pipeline.

Ok, cool, so map is faster, what’s the point of imap then? With speed comes the price of a larger memory footprint. When map takes in an iterable, it converts it to a list so it can be chunked out, whereas imap will only pull items out of the iterable as needed (chunksize, aka how many it will pull out at a time, defaults to 1, but it can be increased). It also has the advantage of giving you the results as soon as possible (as an iterable, hence the name imap), while still preserving order. There is also imap_unordered, which will simply give you the results as fast as they come, in the order they finish.

from multiprocessing.pool import ThreadPool
import time

def wait(x):
    time.sleep(x)
    return x

iterable = [0, 4, 5, 2]

with ThreadPool(processes=4) as tp:

    print("map")
    map_start = time.time()
    for map_result in tp.map(wait, iterable):
        print(f"{map_result} took {time.time() - map_start:.0f} seconds")

    print("\nimap")
    imap_start = time.time()
    for imap_result in tp.imap(wait, iterable):
        print(f"{imap_result} took {time.time() - imap_start:.0f} seconds")

    print("\nimap_unordered")
    imap_unordered_start = time.time()
    for imap_un_result in tp.imap_unordered(wait, iterable):
        print(f"{imap_un_result} took" 
              f"{time.time() - imap_unordered_start:.0f} seconds")

Using map it will wait all 5 seconds (the largest wait time in the argument list) to return all the results at once.

map
0 took 5 seconds
4 took 5 seconds
5 took 5 seconds
2 took 5 seconds

imap will immediately return the 0 result, then four seconds later will return the 4, one second later it will return 5 and 2 at the same time.

imap
0 took 0 seconds
4 took 4 seconds
5 took 5 seconds
2 took 5 seconds

imap_unordered will return them as soon as each one finishes. (Notice this won’t always be the shortest one first, as the argument list may be longer than the number of worker processes).

imap_unordered
0 took 0 seconds
2 took 2 seconds
4 took 4 seconds
5 took 5 seconds

The Methods

Here are the methods, their parameters and docstring, and an overview of what they are.

map

map(func, iterable, chunksize=None):
    ''' Apply `func` to each element in `iterable`, collecting the results
        in a list that is returned. '''

map takes an iterable and turns it into a list, then breaks it up to send to worker processes. Each worker process will run the function given as the first argument with a single argument (given to it from the iterable). The results will be collected into a list and returned, in order, when all results finish. Returns a list.

starmap

starmap(func, iterable, chunksize=None):
    ''' Like `map()` method but the elements of the `iterable` are expected to
        be iterables as well and will be unpacked as arguments. Hence
        `func` and (a, b) becomes func(a, b). '''

starmap allows multiple arguments to be given to the function by passing in an iterable of iterables (i.e. list of tuples, list of lists, generator of generators, etc..).  The inner iterables do NOT have to be the same length either, so multiple defaults could be overridden for one item of the list, but not for all of them. Returns a list.
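For example, here is a quick sketch where one call overrides a default wait_time and the other does not (the URLs and function are just placeholders):

from multiprocessing.pool import ThreadPool
import time


def pretend_download(url, wait_time=1):
    time.sleep(wait_time)
    return url


args = [
    ("https://example.com/one.png",),    # single-element tuple, uses the default wait_time
    ("https://example.com/two.png", 5),  # overrides wait_time for this item only
]

with ThreadPool(2) as tp:
    print(tp.starmap(pretend_download, args))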

apply

apply(func, args=(), kwds={}):
    ''' Equivalent of `func(*args, **kwds)`. '''

apply runs a single function with one of the pool’s workers. Returns the result of the function.

imap

imap(func, iterable, chunksize=1):
    ''' Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. '''

imap takes the same arguments as map, however its result is iterable and will start returning results, in order, as soon as they have finished. Returns an iterable.

imap_unordered

imap_unordered(self, func, iterable, chunksize=1):
    ''' Like `imap()` method but ordering of results is arbitrary. '''

imap_unordered is the same as imap except it will return each result as soon as it finishes, not in order. Returns an iterable.

The Asynchronous Methods

The asynchronous versions of map, starmap, and apply all take the same parameters as their original functions, as well as callback and error_callback parameters.

  • callback – function that takes a single argument, that will be the result(s) of the function(s) run.
  • error_callback – function that takes a single argument, which will be the Exception raised (if one occurs).

The async methods immediately return an AsyncResult (also called ApplyResult or sub-classed to MapResult)  that can be directly used to view the results and check on its status. (View the Async section above for an example).

  • ready – See if the results are available
  • successful – Boolean, True if it didn’t raise an exception
  • wait – takes a timeout, will wait for the results to be ready
  • get – Grab the results, also takes a timeout, will automatically raise the exception if one occurred.

map_async

map_async(func, iterable, chunksize=None, callback=None, error_callback=None):
    ''' Asynchronous version of `map()` method. '''

starmap_async

starmap_async(func, iterable, chunksize=None, callback=None, error_callback=None):
    ''' Asynchronous version of `starmap()` method. '''

apply_async

apply_async(func, args=(), kwds={}, callback=None, error_callback=None):
    ''' Asynchronous version of `apply()` method. '''

The Birthday Paradox – The Proof is in the Python

This month is my wife’s, mother’s and my own birthday, all within a span of nine days. What are the odds of that? No idea, I’m no statistician. However, as a developer, I thought it would be fun to prove (or disprove?) the Birthday Paradox with some Python coding.

 

If you haven’t heard of the Birthday Paradox, it states that as soon as you have 23 random people in a room, there is a 50 percent chance two of them have the same birthday. Once the number of people in the room is at least 70, there is a 99.9 percent chance. It sounds counterintuitive, as it takes a full 366 people (a full year + 1) to guarantee a 100% chance.
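For reference, the statisticians get those numbers from a closed-form calculation: the chance that n people all have different birthdays is 365/365 * 364/365 * ... * (365 - n + 1)/365, and the paradox probability is one minus that. A quick sketch of that math (assuming a 365-day year):

def exact_probability(number_of_people):
    # Probability that at least two of n people share a birthday
    no_match = 1.0
    for i in range(number_of_people):
        no_match *= (365 - i) / 365
    return (1 - no_match) * 100


print(f"{exact_probability(23):.1f}%")  # 50.7%
print(f"{exact_probability(70):.1f}%")  # 99.9%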

So instead of doing this from the statistical side, let’s do it from the experimentation angle. We are going to generate 23 random days in a year and see if there is a duplicate day.

First, we create a function that will return a list of N number of datetime objects within a given year. We will then use that list to see if there are duplicates within it. In the first case, with 23 people, roughly half the time we run this function there should be at least one duplicate date.

from random import randint
from datetime import datetime, timedelta

def random_birthdays(number_of_people):
    first_day_of_year = datetime(2017, 1, 1)
    return [first_day_of_year + timedelta(days=randint(0, 365))
            for _ in range(number_of_people)]

We need to run this function a lot of times to get an average of how often there is a duplicate. We’ll create a function that runs it a thousand times and returns the percentage of runs in which a duplicate was found. (As a ratio is between 0 and 1, such as .50, we are multiplying it by 100 so that .5 looks like “50% chance” later.)

def determine_probability(number_of_people, run_amount=1_000):
    dups_found = 0
    for _ in range(run_amount):
        birthdays = random_birthdays(number_of_people)
        duplicates = set(x for x in birthdays if birthdays.count(x) > 1)
        if len(duplicates) >= 1:
            dups_found += 1

    return dups_found/run_amount * 100

Feel free to run it even more than a thousand times for higher accuracy; the max I tried was 100,000, which did have the most consistent results.

Finally, we add that last bit of code that actually calls the function and prints our findings.

if __name__ == '__main__':
    msg = ("{people} Random people have a {chance:.1f}%"
           " chance of having a birthday on the same day")
    for people in (23, 70):
        print(msg.format(people=people, chance=determine_probability(people)))

And wouldn’t ya know it, those fancy statisticians seem to be right.

23 Random people have a 50.4% chance of having a birthday on the same day
70 Random people have a 99.9% chance of having a birthday on the same day

At the top of the post, you saw a plot generated by calculating the first 100 people’s worth of probabilities, with red vertical markers at 23 and 70.  It is rather smooth due to increasing the run_amount to 10,000. Below is a plot of an entire 366 people, but only a 1000 runs per number of people.

 

To accomplish this, I used matplotlib and mixed in some multiprocessing to speed up the probability generation. Here is the final entire file.

from random import randint
from datetime import datetime, timedelta
from multiprocessing import Pool, cpu_count

import matplotlib.pyplot as plt


def random_birthdays(number_of_people):
    first_day_of_year = datetime(2017, 1, 1)
    return [first_day_of_year + timedelta(days=randint(0, 365))
            for _ in range(number_of_people)]


def determine_probability(number_of_people, run_amount=1000):
    dups_found = 0
    print(f"Generating day {number_of_people}")
    for _ in range(run_amount):
        birthdays = random_birthdays(number_of_people)
        duplicates = set(x for x in birthdays if birthdays.count(x) > 1)
        if len(duplicates) >= 1:
            dups_found += 1

    return number_of_people, dups_found/run_amount * 100


def plot_yearly_probabilities(max_people, vertical_markers=(23, 70)):
    with Pool(processes=cpu_count()) as p:
        percent_chances = p.map(determine_probability, range(max_people))

    plt.plot([z[1] for z in sorted(percent_chances, key=lambda x: x[0])])

    plt.xlabel("Number of people")
    plt.ylabel('Chance of sharing a birthday (Percentage)')

    for marker in vertical_markers:
        if max_people >= marker:
            plt.axvline(x=marker, color='red')

    plt.savefig("birthday_paradox.png")


if __name__ == '__main__':
    plot_yearly_probabilities(100)

 

Notice that our generated image lines up nearly perfectly alongside the statically generated image on Wikipedia*.

Hope you enjoyed, and maybe even learned something along the way!

* By Rajkiran g (Own work) [CC BY-SA 3.0 (https://creativecommons.org/licenses/by-sa/3.0) or GFDL (http://www.gnu.org/copyleft/fdl.html)], via Wikimedia Commons

 

Reusables – Part 2: Wrappers

Spice up your code with wrappers! In Python, a wrapper, also known as a decorator, is simply encapsulating a function within other functions.

@wrapper
def my_func(a, b=2):
    print(b)

@meta_decorator(foo="bar")
def my_other_func(**kwargs):
    print(kwargs)
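If you have never built one of these yourself, a meta decorator is just a function that returns a decorator, which in turn returns the wrapping function. A minimal hand-rolled sketch (not the actual Reusables implementation):

from functools import wraps


def meta_decorator(foo="bar"):
    def actual_decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Anything can happen here before or after the real call
            print(f"Calling {func.__name__} with foo={foo}")
            return func(*args, **kwargs)
        return wrapper
    return actual_decorator


@meta_decorator(foo="baz")
def my_other_func(**kwargs):
    print(kwargs)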

In Reusables, all the wrappers take arguments, aka meta decorators, so you will at minimum have to end them with parens ()s. Let’s take a look at the one I use the most first.

time_it

import reusables

@reusables.time_it()
def test_func():
    import time, random
    time.sleep(random.randint(2, 5))

test_func()
# Function 'test_func' took a total of 5.000145769345911 seconds

time_it documentation
The message is printed by default. However, it can also be sent to a log with a customized message. If log=True it will log to the Reusables logger; however, you can also specify a logger by name (string) or pass in a logger object directly.

reusables.add_stream_handler('reusables')

@reusables.time_it(log=True, message="{seconds:.2f} seconds")
def test_time(length):
    time.sleep(length)
    return "slept {0}".format(length)

result = test_time(5)
# 2016-11-09 16:59:39,935 - reusables.wrappers  INFO      5.01 seconds

print(result)
# slept 5

It’s also possible to capture time taken in a list.

my_times = []

@reusables.time_it(log='no_log', append=my_times)
def test_func():
    import time, random
    length = random.random()
    time.sleep(length)
    
for _ in range(10):
    test_func()
    
my_times
# [0.4791555858872698, 0.8963652232890809, 0.05607090172793505, 
# 0.03099917658380491,0.6567622821214627, 0.4333975642063024, 
# 0.21456404424395714, 0.5723555061358638, 0.0734819056269771, 
# 0.13208268856499217]

unique

The oldest wrapper in the Reusables library forces the output of a function to be unique, or else it will raise an exception or, if specified, return an alternative output.

import reusables
import random


@reusables.unique(max_retries=100)
def poor_uuid():
    return random.randint(0, 10)


print([poor_uuid() for _ in range(10)])
# [8, 9, 6, 3, 0, 7, 2, 5, 4, 10]

print([poor_uuid() for _ in range(1000)])
# Exception: No result was unique

unique documentation

lock_it

A very simple wrapper to use while threading. Makes sure a function is only being run once at a time.

import reusables
import time


def func_one(_):
    time.sleep(5)


@reusables.lock_it()
def func_two(_):
    time.sleep(5)


@reusables.time_it(message="test_1 took {0:.2f} seconds")
def test_1():
    reusables.run_in_pool(func_one, (1, 2, 3), threaded=True)


@reusables.time_it(message="test_2 took {0:.2f} seconds")
def test_2():
    reusables.run_in_pool(func_two, (1, 2, 3), threaded=True)


test_1()
test_2()

# test_1 took 5.04 seconds
# test_2 took 15.07 seconds

log_exception

It’s good practice to catch and explicitly log exceptions, but sometimes it’s just easier to let it fail naturally at any point and log it for later refinement or debugging.

import reusables


@reusables.log_exception()
def test():
    raise Exception("Bad")

# 2016-12-26 12:38:01,381 - reusables   ERROR  Exception in test - Bad
# Traceback (most recent call last):
#     File "<input>", line 1, in <module>
#     File "reusables\wrappers.py", line 200, in wrapper
#     raise err
# Exception: Bad

queue_it

Add the result of the function to the specified queue instead of returning it normally.

import reusables
import queue

my_queue = queue.Queue()


@reusables.queue_it(my_queue)
def func(a):
    return a


func(10)

print(my_queue.get())
# 10

New in 0.9

catch_it

Catch specified exceptions and return an alternative output or send it to an exception handler.

import reusables


def handle_error(exception, func, *args, **kwargs):
    print(f"{func.__name__} raised {exception} when called with {args}")

@reusables.catch_it(handler=handle_error)
def will_raise(message="Hello"):
    raise Exception(message)


will_raise("Oh no!")
# will_raise raised Oh no! when called with ('Oh no!',)

retry_it

Once is never enough, keep trying until it works!

import reusables


@reusables.retry_it(exceptions=(Exception, ), tries=10, wait=1)
def may_fail(dont_do=[]):
    dont_do.append("x")
    if len(dont_do) < 6:
        raise OSError("So silly")
    print("Much success!")

may_fail()
# Much success!

 

This post is a follow-up of Reusables – Part 1: Overview and File Management.