
Interprocess Communications

Inter-Office-Process Communications, Artwork by Clara Griffith

Dealing with communications across programs, processes, or even threads can be a real pain in the patella. Each situation usually calls for something slightly different and has to work within a limited set of options. On top of that, a lot of the tutorials I see simply have one program start the other from inside itself (for example with subprocess), but that just won’t do for my taste.

Just show me the table of which IPC method I need 

I’m going to go through options that let two independent programs, started on their own, talk with each other. And when I say talk, I mean able to at least execute tasks in the other’s process and, in most cases, transfer data as well.

Pipes

Probably one of the most old school ways of transferring data is to just pipe it back and forth as needed. This is how terminals and shells interact with programs, by using the standard pipes of stdin, stdout, and stderr (standard in, standard out, standard error).

Every time you print you are writing to stdout and it is very common to use this in Python when running another program from within it (i.e. if we did subprocess.run('bob.py') we could easily communicate with it via the pipes). These are anonymous pipes, that exist only while the program is running. To communicate between different programs you should use a Named Pipe which creates a file descriptor that you connect to as an entry point.

Here are some short examples showing their use on Linux. It is also possible on Windows with the pywin32 module or do it yourself with ctypes. But I find it easier to just use other methods on Windows.
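As a quick aside, here is roughly what communicating over those anonymous pipes with subprocess can look like. This is just a sketch: child.py is a hypothetical script that reads a line from stdin and prints a reply, and it is unrelated to the named-pipe example below.

import subprocess

# child.py is a stand-in for any script that reads stdin and writes to stdout
result = subprocess.run(
    ["python", "child.py"],
    input="hello child\n",
    stdout=subprocess.PIPE,
    universal_newlines=True,
)
print(result.stdout)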

Our example will be two programs, the first of which, Alice.py, is simply a text converter. The message sent will be in three parts: a starting identifier, X, a four-digit message length, and the message itself. (This is by no means a common standard or particularly practical, just something I made up for a quick example.)

So to send ‘Howdy’, it would look like X0005HOWDY, X being the identifier of a new message, 0005 denoting the length of the message to come, and HOWDY being the message body.

Alice.py

import time 
import os 

# This is a path on the file system;
# in this scenario both programs are run from the same directory
pipename = 'fifo'
os.mkfifo(pipename)

# For non-blocking pipes, you have to be reading from it 
# before you can have something else writing to it
pipein = os.open(pipename, os.O_NONBLOCK|os.O_RDONLY) 
p = os.fdopen(pipein, 'rb', buffering=0)

# This program will simply make the output of the other program more readable
def converter(message):
    return message.decode('utf-8').replace("_", " ").replace("-", " ").lower()

while True:
    # Wait until we have a message identifier
    new_data = p.read(1)
    if new_data == b'X':
        # Figure out the length of the message 
        raw_length = p.read(4)
        message = p.read(int(raw_length))
        # Read and convert the message 
        print(converter(message))
    elif new_data:
        # If we read a single byte that isn't an identifier, something went wrong
        raise Exception('Out of sync!') 
    else:
        time.sleep(1)

That’s all our conversion server is. It creates a pipe that something else can connect to, and will convert the incoming messages to lower case and replace dashes and underscores with spaces.

So let’s have Bob.py talk to Alice.py, but as Bob is a computer program, he sometimes spits out gobbledygook messages that need some help.

Bob.py

import os 

# Connect to the pipe created by Alice.py
pipename = 'fifo'
pipeout = os.open(pipename, os.O_NONBLOCK|os.O_WRONLY) 
p = os.fdopen(pipeout, 'wb', buffering=0, closefd=False)

def write_message(message):
    """Covert a string into our message format and send it"""
    length = '{0:04d}'.format(len(message))
    p.write(b'X')
    p.write(length.encode('utf-8'))
    p.write(message.encode('utf-8'))
     
write_message('TERriBLe_looking-machine-oUTput')

Start up Alice.py first, then Bob.py.

Alice will print out a pretty message of:  terrible looking machine output

While pipes are super handy for terminal usage and for running programs inside each other, it has become uncommon to use named pipes as the actual comms between two independent programs, mainly because of the required setup procedure and the lack of uniformity across operating systems. This has led to more modern, cross-compatible methods being preferred.

Pros: 

  • Fast and Efficient
  • No external services

Cons:

  • Not cross-platform compatible code
  • Difficult to code well

Files

Another ye olde (yet perfectly valid) way to communicate between programs is to simply create files that each program can interpret. Sometimes it’s as simple as having a lockfile. If the lockfile exists, it can serve as a message to other programs to let it finish before they do something, or even to stop new instances of itself from running. For example, running system updates in most Linux environments will create a lock file to make sure that two different update processes aren’t started at the same time.
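A minimal sketch of that lockfile idea might look like the following (the path is made up for illustration, and this naive check-then-create is not race-proof; it just shows the concept):

import os
import sys

LOCK_FILE = "/tmp/my_updater.lock"   # hypothetical location

if os.path.exists(LOCK_FILE):
    sys.exit("Another instance appears to be running, quitting.")

# 'Touch' the lock file so other instances back off
open(LOCK_FILE, "w").close()
try:
    pass  # ... do the actual work here ...
finally:
    os.unlink(LOCK_FILE)  # always clean up the lock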

It’s possible to take that idea further and share a file or two to actually transfer information. In these examples, the two programs will both work out of the same file, with a lock file for safety. (You can write your own code for file lock control, but I will be using the py-filelock package for brevity.)

There are a lot of possible ways to format the shared file; this example keeps it very basic, giving each command a unique id (used to know whether a command has been run yet), then the command and its arguments. The same dictionary also leaves room for a result or an error message.

The shared JSON file will have the following format:

{
  "commands": { 
    "<random uuid>": {
      "command": "add",
      "args": [2,5],
      "result": 7  # "result" field only exists after server has responded
      # if "error" key exists instead of "result", something bad happened
    }
  }
}

The server, Alice.py, will then keep looping, waiting for the shared file to change. Once it does, Alice will obtain the lock for safety (so there isn’t any corrupt JSON from a write being interrupted), read the file, run the desired commands, and save the results. In a real-world scenario the lock would only be obtained during the individual reading and writing phases, so that no single program holds it longer than necessary. But that complicates the code (you would have to do a second read before writing and only update the sections you ran, in case other ones were added in the meantime) and makes it a bit much for an off-the-shelf example.
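For the curious, a rough sketch of that narrower locking pattern might look like this (run_command is a hypothetical callable standing in for the dispatch logic shown in the full example below):

import json
from filelock import FileLock

lock = FileLock("shared.json.lock")
shared_file = "shared.json"

def process_new_commands(run_command):
    with lock, open(shared_file) as f:        # lock held only for the read
        data = json.load(f)

    # Do the slow work with the lock released
    results = {name: run_command(cmd) for name, cmd in data["commands"].items()
               if "result" not in cmd and "error" not in cmd}

    with lock, open(shared_file, "r+") as f:  # re-read and merge before writing
        data = json.load(f)
        for name, result in results.items():
            data["commands"][name]["result"] = result
        f.seek(0)
        json.dump(data, f, indent=2)
        f.truncate()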

Alice.py

import json
import time
import os

from filelock import FileLock

# Keep track of commands that have been run 
completed = []
# Track file size to know when new commands have been added
last_size = 0

lock_file = "shared.json.lock"
lock = FileLock(lock_file)
shared_file = "shared.json"


# Not totally necessary, but if you ever need to raise exceptions
# it's better to create your own than use the built-ins
class AliceBroke(Exception):
    """Custom exception for catching our own errors"""


# Functions that can be executed by Bob.py
def adding(a, b):
    if not isinstance(a, (int, float)) or not isinstance(b, (int, float)):
        raise AliceBroke('Yeah, we are only going to add numbers')
    return a + b


def multiply(a, b):
    if not isinstance(a, (int, float)) or not isinstance(b, (int, float)):
        raise AliceBroke('Yeah, we are only going to multiply numbers')
    return a * b


# Right now just have a way to map the incoming strings from Bob to functions
# This could be expanded to include what arguments it expects for pre-validation
translate = {
    "add": adding,
    "multiply": multiply
}


def main():
    global last_size, completed
    while True:
        # Poor man's file watcher
        file_size = os.path.getsize(shared_file)
        if file_size == last_size:
            # File hasn't changed
            time.sleep(.1)
            continue
        else:
            print(f'File has changed from {last_size} to {file_size} bytes,'
                  f' lets see what they want to do!')
            last_size = file_size

        run_commands()
        last_size = os.path.getsize(shared_file)


def run_commands():
    # Grab the lock, once it is acquired open the file
    with lock, open(shared_file, 'r+') as f:
        data = json.load(f)
        # Iterate over the command keys, if we haven't run it yet, do so now
        for name in data['commands']:
            if name not in completed:

                command = data['commands'][name]['command']
                args = data['commands'][name]['args']
                print(f'running command {command} with args {args}')

                try:
                    data['commands'][name]['result'] = translate[command](*args)
                except AliceBroke as err:
                    # Arguments weren't the type we were expecting
                    data['commands'][name]['error'] = str(err)
                except TypeError:
                    data['commands'][name]['error'] = "Incorrect number of arguments"
                finally:
                    completed.append(name)
        # As we are writing the data back to the same file that is still
        # open, we need to go back to the beginning of it before writing
        f.seek(0)
        json.dump(data, f, indent=2)


if __name__ == '__main__':
    # Create / blank out the shared file
    with open(shared_file, 'w') as f:
        json.dump({"commands": {}}, f)
    last_size = os.path.getsize(shared_file)

    try:
        main()
    finally:
        # Be nice and clean up after ourselves
        os.unlink(shared_file)
        os.unlink(lock_file)

Our client, Bob.py, will ask Alice to run some simple commands and wait until it gets the answers back.

Bob.py

import json
import time
import uuid

from filelock import FileLock


lock = FileLock("shared.json.lock")
shared_file = "shared.json"


def ask_alice(command, *args, wait_for_answer=True):
    # Create a new random ID for the command
    # could be as simple as incremented numbers 
    cmd_uuid = str(uuid.uuid4())
    with lock, open(shared_file, "r+") as f:
        data = json.load(f)
        data['commands'][cmd_uuid] = {'command': command, 'args': args}
        f.seek(0)
        json.dump(data, f)
    if wait_for_answer:
        return get_answer(cmd_uuid)
    return cmd_uuid


def get_answer(cmd_uuid):
    # Wait until we get an answer back for a command
    # Ideally this would be more of an asynchronous callback, but there 
    # are plenty of cases where serialized processes like this must happen
    while True:
        with lock, open(shared_file) as f:
            data = json.load(f)
            command = data['commands'][cmd_uuid]
            if 'result' in command:
                return command['result']
            elif 'error' in command:
                raise Exception(command['error'])
        time.sleep(.2)



print(f"Lets add 2 and 5 {ask_alice('add', 2, 5, wait_for_answer=True)}")

print(f"Lets multiply 8 and 5 {ask_alice('multiply', 2, 5, wait_for_answer=True)}")

print("Lets break it and cause an exception!")
ask_alice('add', 'bad', 'data', wait_for_answer=True)

Start up Alice.py first, then Bob.py.

Alice will return:

File has changed from 16 to 90 bytes, lets see what they want to do!
running command add with args [2, 5]
File has changed from 103 to 184 bytes, lets see what they want to do!
running command multiply with args [8, 5]
File has changed from 198 to 283 bytes, lets see what they want to do!
running command add with args ['bad', 'data']

Bob will print:

Lets add 2 and 5: 7
Lets multiply 8 and 5: 40
Lets break it and cause an exception!
Traceback (most recent call last):
     ...
Exception: Yeah, we are only going to add numbers

 

Pros:

  • Cross-platform compatible
  • Simple to implement and understand

Cons: 

  • Have to worry about File System security if anything sensitive is being shared
  • Programs are now responsible for cleaning up after themselves

 

Message Queue

Welcome to the 21st century, where message queues serve as quick and efficient ways to transfer commands and information. I like to think of them as always-running middlemen that survive outside your process.

Here you are spoiled for choice with options: ActiveMQ, ZeroMQ, Redis, RabbitMQ, Sparrow, Starling, Kestrel, Amazon SQS, Beanstalk, Kafka, IronMQ, and POSIX IPC message queues are the ones I know. You can even use NoSQL databases like MongoDB or CouchDB in a similar manner, though for simple IPC I suggest against using those.

I suggest looking into RabbitMQ’s tutorials to get a good in-depth look at how you can write code for it (and across multiple different languages). RabbitMQ is cross-platform and even provides Windows binaries directly, unlike some others.

For my own examples we will use Redis, as it has the best summary I can quote from its website (https://redis.io/), yet a surprising lack of Python tutorials.

Redis is an open source (BSD licensed), in-memory data structure store, used as a database, cache and message broker. It supports data structures such as strings, hashes, lists, sets, sorted sets with range queries, bitmaps, [etc…]

That’s it: it stores data somewhere that is easily accessed from multiple programs, just what you need for IPC. You can use it as a key-value store (for data transfer and storage) or, as I am about to show, with a publisher / subscriber model, which is better for command and control.
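Before the pub/sub example, here is the key-value flavor in a couple of lines, assuming a Redis server is running locally with default settings (the key name is made up):

import redis

r = redis.StrictRedis()

# One program stores a value...
r.set("job:1234:status", "done")

# ...and any other program on the machine can read it back
print(r.get("job:1234:status"))  # b'done'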

Alice.py is our service and simply subscribes to a channel and waits for any updates and will print them to the screen.

import time

import redis

r = redis.StrictRedis()
p = r.pubsub()


def handler(message):
    print(message['data'].decode('utf-8'))


p.subscribe(my_channel=handler)
thread = p.run_in_thread(sleep_time=0.001)

try:
    # Runs in the background while your program otherwise operates
    time.sleep(1000)
finally:
    # Shut down cleanly when all done
    thread.stop()

Bob.py is another program that simply pushes a message to that channel.

import redis

r = redis.StrictRedis()

r.publish('my_channel', 'example data'.encode('utf-8'))

Start up Alice.py first, then Bob.py.

Alice.py will print example data, and can be stopped by pressing CTRL+C

Pros:

  • Built-in publisher / subscriber methodology, don’t have to do manual checks
  • Built-in background thread running
  • (Can be) Cross-platform compatible
  • State can be saved if a program exits unexpectedly (depending on setup)

Cons: 

  • Requirement for external service
  • More overhead
  • Have to worry about the external service’s security and how you connect to it

Shared Memory

If written correctly, this can be one of the fastest ways to transfer data or execute tasks between programs, but it is also the most low-level option, meaning a lot more coding and error handling on your part.

Think of using memory in the same way as the single file with using a lock. While one program writes to memory, the other has to wait until it finishes, then can read it and write its own thing back. (It’s possible to also use multiple sections of memory, just it gets to be a lot of example code really fast so I am holding off.)

So now you have to create the Semaphore (basically a memory lockfile) and mapped memory that each program can access.

On Linux (and Windows with Cygwin) you can use posix_ipc, and check out their example here.
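A rough sketch of that pattern with posix_ipc might look like the following (the names here are made up, and this is only the writing side; check the package’s own demo for the full, battle-tested version):

import mmap
import posix_ipc

# Create (or open) a named semaphore and a shared memory segment
sem = posix_ipc.Semaphore("/alice_sem", posix_ipc.O_CREAT, initial_value=1)
shm = posix_ipc.SharedMemory("/alice_shm", posix_ipc.O_CREAT, size=mmap.PAGESIZE)

# Map the shared memory, then close the descriptor we no longer need
buf = mmap.mmap(shm.fd, shm.size)
shm.close_fd()

# Hold the semaphore while writing so readers never see a half-written message
sem.acquire()
try:
    buf.seek(0)
    buf.write(b"hello from shared memory\n")
finally:
    sem.release()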

A lot simpler is to just use the built-in mmap module directly and share a file in memory. This example won’t even go as far as creating the lockfile; it just shows off the basics.

Alice.py is going to create and write stuff to the memory mapped file

Alice.py

import time
import mmap
import os

# Create the file and fill it with line ends
fd = os.open('mmaptest', os.O_CREAT | os.O_TRUNC | os.O_RDWR)
os.write(fd, b'\n' * mmap.PAGESIZE)

# Map it to memory and write some data to it, then change it 10 seconds later
buf = mmap.mmap(fd, mmap.PAGESIZE, access=mmap.ACCESS_WRITE)
buf.write(b'now we are in memory\n')
time.sleep(10)
buf.seek(0)
buf.write(b'again\n')

Bob.py will simply read the content of the memory mapped file. Showing that it does know when the contents change.

Bob.py

import mmap
import os
import time


# Open the file for reading only
fd = os.open('mmaptest', os.O_RDONLY)
buf = mmap.mmap(fd, mmap.PAGESIZE, access=mmap.ACCESS_READ)

# Print when the content changes
last = b''
while True:
    buf.seek(0)
    msg = buf.readline()
    if msg != last:
        print(msg)
        last = msg
    else:
        time.sleep(1)

This example isn’t the most friendly to run, as you have to start-up Alice.py and then Bob.py within ten seconds after that, but it shows the basics of how memory mapping is very similar to just using a file.

Pros:

  • Fast
  • Cross-platform compatible code

Cons: 

  • Can be difficult to write
  • Limited size to accessible memory
  • Using mmap without posix_ipc will also create a physical file with the same content

 

Signals

On Linux? Don’t want to send information, just need to toggle state on something? Send a signal!

Imagine you have a service, Alice, that anything local can connect to, but you want to be able to tell them if the service goes down or comes back up.

In your clients’ code, they should register their process identification number (PID) with Alice when they first connect to her, and have handlers to capture a custom signal so they know Alice’s current state.

Bob.py

import os
import signal

service_running = True

my_pid = os.getpid()

# 'Touch' a file as the name of the program's PID 
# in Alice service's special directory
# Make sure to delete this file when your program exits!
open(f"/etc/my_service/pids/{my_pid}", "w").close()


    
def service_off(signum, frame):
    global service_running 
    service_running = False

def service_on(signum, frame):
    global service_running 
    service_running = True

signal.signal(signal.SIGUSR1, service_off)
signal.signal(signal.SIGUSR2, service_on)

SIGUSR1 and SIGUSR2 are signals reserved for user-defined purposes, so they are safe to use in this manner without fear of secondary actions happening. (For example, if you instead sent something like SIGINT, aka interrupt, it would just kill your program if not caught properly.)

Alice.py will then simply go through the directory of PID files when it starts up or shuts down, and send each process the appropriate signal to let it know her current state.

import os
import signal 

def let_them_know(startup=True):
    # The clients registered SIGUSR2 as "service on" and SIGUSR1 as "service off"
    signal_to_send = signal.SIGUSR2 if startup else signal.SIGUSR1
    for pid_file in os.listdir("/etc/my_service/pids/"):
        # Put in a try/except block for production
        os.kill(int(pid_file), signal_to_send)

Pros:

  • Super simple

Cons: 

  • Not cross-platform compatible
  • Cannot transfer data

Sockets

The traditional IPC. Every time I look for different IPC methods, sockets always come up. It’s easy to understand why: they are cross-platform and natively supported by most languages.

However, dealing directly with raw sockets is very low-level and requires a lot more coding and configuration. The Python standard library has a great example of an echo server to get you started.
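For reference, a bare-bones TCP echo server in roughly the shape of that stdlib example looks like this (host and port chosen arbitrarily):

import socket

HOST = "127.0.0.1"
PORT = 65432

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.bind((HOST, PORT))
    s.listen()
    conn, addr = s.accept()
    with conn:
        while True:
            data = conn.recv(1024)
            if not data:
                break
            conn.sendall(data)  # echo whatever the client sent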

But this is batteries included, everyone-else-has-already-done-the-work-for-you Python. Sure you could set up everything yourself, or you can use the higher level Listeners and Clients from the multiprocessing library.

Alice.py is hosting a server party, and executing incoming requests.

from multiprocessing.connection import Listener

def function_to_execute(*args):
    """" Our handler function that will run 
         when an incoming request is a list of arguments
    """
    return args[0] * args[1]

with Listener(('localhost', 6000), authkey=b'Do not let eve find us!') as listener:
    # Looping here so that the clients / party goers can 
    # always come back for more than a single request
    while True:
        print('waiting for someone to ask for something')
        with listener.accept() as conn:
            args = conn.recv()
            
            if args == b'stop server':
                print('Goodnight')
                break
            elif isinstance(args, list):  
                # Very basic check, must be more secure in production
                print('Someone wants me to do something')
                result = function_to_execute(*args)
                conn.send(result)
            else:
                conn.send(b'I have no idea what you want me to do')

Bob.py is going to go for just a quick function and then call it a night.

from multiprocessing.connection import Client

with Client(('localhost', 6000), authkey=b'Do not let eve find us!') as conn:
    conn.send([8, 8])
    print(f"What is 8 * 8? Why Alice told me it's {conn.recv()}")

# We have to connect again because Alice can only handle one request at a time
with Client(('localhost', 6000), authkey=b'Do not let eve find us!') as conn:
    # Bob's a party pooper and going to end it for everyone
    conn.send(b'stop server')

Start up Alice.py first, then run Bob.py. Bob will quickly exit with the message What is 8 * 8? Why Alice told me it's 64

Alice will have four total messages:

waiting for someone to ask for something
Someone wants me to do something
waiting for someone to ask for something
Goodnight

Pros:

  • Cross-platform compatible
  • Can be extended to RPC

Cons: 

  • Slower
  • More overhead

 

RPC

Believe it or not, you can use a lot of the methods from remote procedure calls locally. Even sockets and message queues can already be set up to work for either IPC or RPC.

Now, barring using IR receivers and transmitters or laser communications, you are probably going to connect remote programs via the internet. That means most RPC standards are going to be networking based, aka built on sockets. So it’s less about deciding which protocol to use, and more about choosing which data transmission standard to use. Two that I have used are JSONRPC, for normal humans, and XMLRPC, for XML fans who like sniffing glue and running with scissors. There is also SOAP, for XML fans who need their acronyms to also spell something, and Apache Thrift, which I found while doing research for this article and have not touched. Those standards transfer data as text, which makes them easier to read but inefficient. Newer options, like gRPC, use protocol buffers to serialize the data and reduce overhead.
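As a taste, the standard library ships with everything needed for XMLRPC; a minimal sketch (port picked arbitrarily) could look like this:

# server.py
from xmlrpc.server import SimpleXMLRPCServer

def multiply(a, b):
    return a * b

server = SimpleXMLRPCServer(("localhost", 8000))
server.register_function(multiply)
server.serve_forever()

# client.py
from xmlrpc.client import ServerProxy

proxy = ServerProxy("http://localhost:8000")
print(proxy.multiply(8, 8))  # 64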

It’s also very common and easy to just write up a simple HTTP REST interface for your programs, using a lightweight framework like bottle or flask, and communicate that way.
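A tiny sketch of that approach, assuming Flask is installed (the route and port are arbitrary):

from flask import Flask, jsonify, request

app = Flask(__name__)

@app.route("/add", methods=["POST"])
def add():
    data = request.get_json()
    return jsonify(result=data["a"] + data["b"])

if __name__ == "__main__":
    app.run(port=5000)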

In the future (if they don’t already exist) I expect to see even more choices with WebSocket or WebRTC based communications.

 

Summary

Let’s wrap this all up with a comparison table:

Method         | Cross-Platform Compatible | Requires File or File Descriptor | Requires External Service | Easy to Write / Read Code
Pipes          | No                        | Yes                              | No                        | No
Files          | Yes                       | Yes                              | No                        | Yes
Message Queues | Yes                       | No                               | Yes                       | Yes
Shared Memory  | Yes                       | Yes                              | No                        | No
Sockets        | Yes                       | No                               | No                        | Yes
Signals        | No                        | No                               | No                        | Yes

Hopefully that at least clears up why sockets are the go-to IPC method, as they have the most favorable traits. For me though, I usually want something a little more robust, like message queues, or REST APIs that can be used locally or remotely. Which, to be fair, are built on top of sockets.

 

 

ThreadPools explained – In the deep end

Thread and Multiprocessing Pools are an underused feature of Python. In my opinion, they are the easiest way to dip your feet into concurrency, and yet still the method I use most often.

Threads in a Pool, Artwork by Clara Griffith

They allow you to easily offload CPU or I/O bound tasks to a pre-instantiated group (pool) of threads or processes. One of the great things about them, is that both the ThreadPool and Pool (Multiprocessing) classes have the same methods, so all the following examples are interchangeable between them. (This article will not get into the differences between Threading and Multiprocessing in Python, as that is worth a post on its own.)

Map

Let’s jump in with a super simple example. We will use a pool of 5 workers to square numbers. We will use the map method, which takes a function as its first argument (make sure it’s not called, no parentheses!), then a list (or other iterable) of arguments, which will be passed one at a time to that function in a worker thread or process. That way, multiple instances of that function are working at the same time!

from multiprocessing.pool import ThreadPool, Pool

def square_it(x):
    return x*x

# On Windows, make sure that multiprocessing doesn't start
# until after "if __name__ == '__main__'" 

with Pool(processes=5) as pool:
    results = pool.map(square_it, [5, 4, 3, 2, 1])

print(results) 
# [25, 16, 9, 4, 1]

Think of map as running a for loop over the list and sending each item in it to a worker process as soon as it is free (little more complicated internally, but we’ll come back to that later). Each process has been told to run the function square_it against that item.

So in this case, all the processes will be running the same function against a set of data, and waiting until all the data has been processed to return. The list of returned data will be in order based on the iterable that was put in. This is super handy if you want to do something like make a lot of requests to different websites and wait for the results, or need to run a lot of calculations. I actually did just that in the Birthday Paradox blog post using the Multiprocessing pool to speed up probability calculations.

Thankfully, Pools are versatile. They have several other handy methods, and you can let most of them run in the background, instead of waiting on them to finish immediately, by going asynchronous.

Async

So let’s use the map style functionality again, but this time we don’t want to bother waiting around for the results, which means we need map_async. Say you want to capture a lot of images off of a website. You populate the list of links to the images, then you just need to add those to the pool and download them.

from multiprocessing.pool import ThreadPool
import time
import reusables

# When downloading from a website, be kind with how often 
# and how many requests you are making
tp = ThreadPool(processes=2)

urls = ["https://codecalamity.com/wp-content/uploads/2017/10/birthday.png",
        "https://codecalamity.com/wp-content/uploads/2017/06/Capture.png"]


# Same as the previous map, taking a function and iterable,
# just with an additional callback function 
tp.map_async(reusables.download, urls, callback=print) 
# Also, this is why having print as a function in python 3 is so dang handy


# Do something else 
time.sleep(10) 

# Results are printed when done 
# ['/home/me/birthday.png', '/home/me/Capture.png']

# Not using a context manager means we have to clean house ourselves. 
tp.terminate()
tp.close() 

This is really advantageous in scenarios where you need an instant reply, such as an API call or working with a GUI; later, the callback can update the GUI or database. There is also an error_callback argument for when the function raises an exception. The error_callback function will receive the Exception the worker raised, so you can decide whether to ignore it or re-raise it in the main thread.
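A tiny sketch of error_callback in action (the function names here are made up for illustration):

from multiprocessing.pool import ThreadPool

def fragile(x):
    if x == 0:
        raise ValueError("zero is not welcome here")
    return 10 / x

def on_error(exc):
    # Called with the exception as its only argument if any worker raises
    print(f"worker failed: {exc!r}")

with ThreadPool(processes=2) as tp:
    # The callback never fires here because one of the items blows up
    tp.map_async(fragile, [5, 2, 0], callback=print, error_callback=on_error).wait()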

You can also ignore using the callbacks, and deal with the AsyncResult directly. It has the methods:

  • ready – See if the results are available
  • successful – Boolean, True if it didn’t raise an exception
  • wait – takes a timeout, will wait for the results to be ready
  • get – Grab the results, also takes a timeout, will automatically raise the exception if one occurred.
from multiprocessing.pool import ThreadPool
import time

timeout = 25

with ThreadPool(processes=4) as tp:
    async_result = tp.map_async(time.sleep, [5, 4])

    for i in range(timeout):
        time.sleep(1)

        if async_result.ready():
            if async_result.successful():
                print(async_result.get())
                break
    else:
        print("Task did not complete on time, or with errors")

Three of the methods available have corresponding async versions:

  • map – map_async
  • starmap – starmap_async
  • apply – apply_async

Passing additional arguments with “partial” or “starmap”

Now, notice that map will only provide a single argument to a function. So if you have a function that takes more than one argument, you will either need to use partial to redefine the function with default parameters, or use starmap, which takes an iterable of tuples.

So let’s use partial from functools first.

from multiprocessing.pool import ThreadPool
from functools import partial
import time
import reusables

urls = ["https://codecalamity.com/wp-content/uploads/2017/10/birthday.png",
        "https://codecalamity.com/wp-content/uploads/2017/06/Capture.png"]


def download_file(url, wait_time):
    time.sleep(wait_time)
    return reusables.download(url)

# Replace the required `wait_time` with a default of 5
down_the_file = partial(download_file, wait_time=5)


with ThreadPool(2) as tp:
    # Notice we are now using the new function, down_the_file we created with partial
    print(tp.map(down_the_file, urls))

Not too difficult, you’re just stuck with using the same setting for everything. If you want to customize the extra argument per call, you can send multiple arguments using starmap.

Starmap

from multiprocessing.pool import ThreadPool
import reusables
import time

# Notice the list now is a list of tuples, that have a second argument, 
# that will be passed in as the second parameter. In this case, as wait_time
urls = [("https://codecalamity.com/wp-content/uploads/2017/10/birthday.png", 4),
        ("https://codecalamity.com/wp-content/uploads/2017/06/Capture.png", 10)]


def download_file(url, wait_time):
    time.sleep(wait_time)
    return reusables.download(url)


with ThreadPool(2) as tp:
    # Using `starmap` instead of just `map`
    print(tp.starmap(download_file, urls))

Apply

Both map and starmap take a single function to run a lot of things against. But there are many times where you just want background workers to take on a variety of different tasks. That’s where apply comes in.

from multiprocessing.pool import Pool
import reusables

pool = Pool(processes=5)

# apply takes `args`, aka arguments, in a tuple format 
# and `kwds`, aka keyword arguments, as a dictionary

print(pool.apply(sum, args=([1, 2, 3, 4, 5], )))
# 15
print(pool.apply(abs, (-5.67, )))
# 5.67
print(pool.apply(reusables.download, 
                 args=("http://example.com", ), 
                 kwds=dict(save_to_file=False)))
# b'<!doctype html>\n<html>\n<head>\n   ...


pool.terminate()
pool.close()

Additional Content

You can of course also mix and match any methods while the pool is still not terminated.

from multiprocessing.pool import Pool
import time

pool = Pool(processes=5)

print(pool.apply(sum, args=([1, 2, 3, 4, 5], )))
# 15
pool.apply_async(abs, (-5.67, ), callback=print)
# 5.67
pool.map_async(any, [(True, False), (False, False)], callback=print)
# [True, False]

time.sleep(1)
pool.terminate()
pool.close()

imap

Now remember how I said map basically iterates over the list and sends each item to a worker? Well, that’s not entirely true. imap does that; map can be a lot faster because it breaks the list into chunks first and sends each chunk to a worker’s queue, making sure each worker always has something in the pipeline.

Ok, cool, so map is faster, so what’s the point of imap then? With speed comes the price of a larger memory footprint. When map takes in an iterable, it converts it to a list so it can be chunked out, whereas imap will only pull items out of the iterable as needed (its chunksize defaults to 1, aka how many it pulls out at a time, but it can be increased). imap also has the advantage of giving you the results as soon as possible (as an iterable, hence the name), while still preserving order. There is also imap_unordered, which will simply give you the results as fast as they come, in the order they finish.

from multiprocessing.pool import ThreadPool
import time

def wait(x):
    time.sleep(x)
    return x

iterable = [0, 4, 5, 2]

with ThreadPool(processes=4) as tp:

    print("map")
    map_start = time.time()
    for map_result in tp.map(wait, iterable):
        print(f"{map_result} took {time.time() - map_start:.0f} seconds")

    print("\nimap")
    imap_start = time.time()
    for imap_result in tp.imap(wait, iterable):
        print(f"{imap_result} took {time.time() - imap_start:.0f} seconds")

    print("\nimap_unordered")
    imap_unordered_start = time.time()
    for imap_un_result in tp.imap_unordered(wait, iterable):
        print(f"{imap_un_result} took" 
              f"{time.time() - imap_unordered_start:.0f} seconds")

Using map, it will wait all 5 seconds (the largest wait time in the argument list) and then return all the results at once.

map
0 took 5 seconds
4 took 5 seconds
5 took 5 seconds
2 took 5 seconds

imap will immediately return the 0 result, then four seconds later it will return the 4, and one second after that it will return 5 and 2 at the same time.

imap
0 took 0 seconds
4 took 4 seconds
5 took 5 seconds
2 took 5 seconds

imap_unordered will return them as soon as each one finishes. (Notice this won’t always be the shortest one first, as the argument list may be longer than the number of worker processes).

imap_unordered
0 took 0 seconds
2 took 2 seconds
4 took 4 seconds
5 took 5 seconds

The Methods

Here are the methods, their parameters and docstring, and an overview of what they are.

map

map(func, iterable, chunksize=None):
    ''' Apply `func` to each element in `iterable`, collecting the results
        in a list that is returned. '''

map takes an iterable and turns it into a list, then breaks it up to send to worker processes. Each worker process will run the function given as the first argument with a single argument (given to it from the iterable). The results will be collected into a list and returned, in order, when all of them finish. Returns a list.

starmap

starmap(func, iterable, chunksize=None):
    ''' Like `map()` method but the elements of the `iterable` are expected to
        be iterables as well and will be unpacked as arguments. Hence
        `func` and (a, b) becomes func(a, b). '''

starmap allows multiple arguments to be given to the function by passing in an iterable of iterables (i.e. list of tuples, list of lists, generator of generators, etc.). The inner iterables do NOT have to be the same length either, so defaults can be overridden for some items of the list but not for others. Returns a list.
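For example, a quick sketch of that mixed-length behavior (function and data made up for illustration):

from multiprocessing.pool import ThreadPool

def greet(name, greeting="Hello"):
    return f"{greeting}, {name}!"

with ThreadPool(processes=2) as tp:
    # The second tuple overrides the default greeting for that one call only
    print(tp.starmap(greet, [("Alice",), ("Bob", "Howdy")]))
    # ['Hello, Alice!', 'Howdy, Bob!']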

apply

apply(func, args=(), kwds={}):
    ''' Equivalent of `func(*args, **kwds)`. '''

apply runs a single function with one of the pool’s workers. Returns the result of the function.

imap

imap(func, iterable, chunksize=1):
    ''' Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. '''

imap takes the same arguments as map, however its result is iterable and will start returning results, in order, as soon as they have finished. Returns an iterable.

imap_unordered

imap_unordered(self, func, iterable, chunksize=1):
    ''' Like `imap()` method but ordering of results is arbitrary. '''

imap_unordered is the same as imap except it will return each result as soon as it finishes, not in order. Returns an iterable.

The Asynchronous Methods

The asynchronous versions of map, starmap, and apply all take the same parameters as their original functions, as well as callback and error_callback parameters.

  • callback – function that takes a single argument, that will be the result(s) of the function(s) run.
  • error_callback – function that takes a single argument, which will be the Exception raised (if one occurs).

The async methods immediately return an AsyncResult (also called ApplyResult or sub-classed to MapResult)  that can be directly used to view the results and check on its status. (View the Async section above for an example).

  • ready – See if the results are available
  • successful – Boolean, True if it didn’t raise an exception
  • wait – takes a timeout, will wait for the results to be ready
  • get – Grab the results, also takes a timeout, will automatically raise the exception if one occurred.

map_async

map_async(func, iterable, chunksize=None, callback=None, error_callback=None):
    ''' Asynchronous version of `map()` method. '''

starmap_async

starmap_async(func, iterable, chunksize=None, callback=None, error_callback=None):
    ''' Asynchronous version of `starmap()` method. '''

apply_async

apply_async(func, args=(), kwds={}, callback=None, error_callback=None):
    ''' Asynchronous version of `apply()` method. '''

Is it easier to ask for forgiveness in Python?

Yes, we are looking for graphic artists

Is Python more suited for EAFP (easier to ask for forgiveness than permission) or LBYL (look before you leap) coding styles? It has been debated across message boards, email chains, Stack Overflow, Twitter, and workplaces. It’s not as heated as some other battles, like how spaces are better than tabs, or that nano is indisputably the best terminal editor, but people seem to have their own preference on the subject. What many will gloss over is the fact that Python as a language doesn’t have a preference. Rather, different design patterns lend themselves better to one or the other.

I disagree with the position that EAFP is better than LBYL, or “generally recommended” by Python – Guido van Rossum

If you haven’t already, I suggest taking a look at Brett Cannon’s blog post about how EAFP is a valid method with Python that programmers from other languages may not be familiar with. The benefits of EAFP are simple: explicit code, fail fast, succeed faster, and DRY (don’t repeat yourself). In general I find myself using it more than LBYL, but that doesn’t mean it’s always the right way.

Use Case Dependent

First, a little code comparison to see the differences between them. Let’s try to work with a list of lists of strings. Our goal: if the inner list is at least three items long, copy its third item into another list.

messages = [["hi there", "how are you?", "I'm doing fine"]]
out = []
 
# LBYL
if messages and messages[0] and len(messages[0]) >= 3:
    out.append(messages[0][2])

# EAFP
try:
    out.append(messages[0][2])
except IndexError as err:
    print(err)  # In the real world, this should be a logging `.exception()`

Both pieces of code are short, and it is easy to follow what they are accomplishing. However, when there are at least three items in the inner list, LBYL will take almost twice as long! When there aren’t any errors happening, the checks are just additional weight slowing the process down.

But if we prune the messages down to [["hi there", "how are you?"]], then EAFP will be much slower, because it will always try a bad lookup, fail, and then have to catch that failure. Whereas LBYL will only perform the check, see it can’t do it, and move on. Check out my speed comparison gist yourself.

Some people might think putting both the append and the lookup in the same try block is wrong. However, it is both faster (we only have to do the lookup once) and the proper way (we are only catching the possible IndexError, not anything that could arise from the append).

The real takeaway from this is to know which side of the coin you are dealing with beforehand. Is your incoming data almost always going to be correct? Then I’d lean towards EAFP, whereas if it’s a crap shoot, LBYL may make more sense.

Sometimes one is always faster

There are some instances when one is almost always faster than the other (though speed isn’t everything). For example, file operations are faster with EAFP, because the less IO waiting there is, the better.

Let’s try making a directory when we are not sure whether it has been created yet, while pretending it’s still ye olde times and os.makedirs(..., exist_ok=True) doesn’t exist yet.

import os 

# LBYL 
if not os.path.exists("folder"):
    os.mkdir("folder")

# EAFP 
try:
    os.mkdir("folder")
except FileExistsError:
    pass

In this case, it’s always preferable to use EAFP, as it’s faster (even when executed on an M.2 SSD), there are no side effects, and the error is highly specific, so there is no need for special handling.

Be careful though: many times when dealing with files you don’t want to leave something in a bad state; in those cases, you should use LBYL.
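A small sketch of that defensive style (file names made up): make the checks and the backup before doing anything destructive, so a failure cannot leave us without a good copy.

import os
import shutil

source = "data.csv"
backup = "data.csv.bak"

# LBYL: confirm the source exists and take a backup *before* overwriting anything
if os.path.exists(source):
    shutil.copy2(source, backup)
    with open(source, "w") as f:
        f.write("fresh data\n")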

What bad things can happen when you don’t ask permission?

Side effects

In many cases, if something fails to execute, the state of the program or associated files might have changed. If it’s not easy to revert to a known good state in the exception clause, EAFP should be avoided.

# DO NOT DO

with open("file.txt", "w") as f:
    try: 
        f.write("Hi there!\n") 
        f.write(message[3])   
    except IndexError: 
        pass  # Don't do, need to be able to get file back to original state

Catching too much

If you are wrapping something with except Exception or, worse, the dreaded bare except:, then you shouldn’t be using EAFP (and if you’re still using a bare except:, you need to read up on why that’s a bad idea and stop it!)

# DO NOT DO

try:
    do_something()
except:  # If others see this you will be reported to the Python Secret Police
    pass 

Also, sometimes errors seem specific, like an OSError, but what is raised could be one of multiple different child errors that must be told apart.

# DO
import errno
import logging
import os

log = logging.getLogger(__name__)

try:
    os.scandir("secret_files")
except FileNotFoundError: # A child of OSError
    # could be put as 'elif err.errno == errno.ENOENT' below
    log.error("Yo dawg, that directory doesn't exist, "
              "we'll try the backup folder")
except OSError as err:
    if err.errno in (errno.EACCES, errno.EPERM):
        log.error("We don't have permission to access that capt'n!"
                  "We'll try the backup folder")
    else:
        log.exception(f"Unexpected OSError: {err}") 
        raise err # If you don't expect an error, don't keep going. 
                  # This isn't Javascript

When to use what?

EAFP (Easier to Ask for Forgiveness than Permission)

  • IO operations (Hard drive and Networking)
  • Actions that will almost always be successful
  • Database operations (when dealing with transactions and can rollback)
  • Fast prototyping in a throw away environment

LBYL (Look Before You Leap):

  • Irrevocable actions, or anything that may have a side effect
  • Operation that may fail more times than succeed
  • When an exception that needs special attention could be easily caught beforehand

Sometimes you might even find yourself using both for complex or mission critical applications. Like programming the space shuttle, or getting your code above that 500 lines the teacher wants.

 

First steps into GUI design with Kivy

Boring Background

I have always had an interest in making a local photo organization tool that supports albums and tagging the way I want to do it, maybe down the line even allowing others to connect and use it too. So I started on one a few years ago, using what I knew best: a Python REST back-end with an AngularJS web-based front end.

And it worked decently well.

However, I was never happy with its performance past 10K images (and I am dealing with hundreds of thousands), and halfway through development I tried to switch to Angular 2. And everything blew up.

Now that I am older and wiser (read: less stupid), I can face the fact that local data is best served by a native desktop application. It also removes the need to keep up with ever-changing web standards and libraries, in exchange for ones that might last through an entire development process (crazy thought, I know).

Choosing Kivy

To be honest, I didn’t even think about using Kivy for a desktop application at first. I went straight to PySide, as it has less restrictive licensing than PyQt. However, at the time, it didn’t want to play ball with a halfway modern Python version: 3.5 is the oldest I will create new content on, with 3.7-dev preferred, or 3.6 for stable. I then looked into alternatives, such as wx and Kivy. Let’s just say a quick look at both of their home pages tells you plenty about which you would trust more with front-end design off the bat.

Even though Kivy seems to be aimed more at mobile development, including touch capabilities, I have found it to be extremely capable as a desktop application framework. It is also very appealing to me as it is no-BS, MIT licensed.

First Steps

Just like any good Python GUI, Kivy was a pain in the arse to install. On Windows, I just ended up grabbing the pre-compiled binaries from a well-loved unofficial binaries page. On my Linux VirtualBox VM, I ended up having to disable 3D acceleration for it to start.

However, once I was able to start coding, it became stupidly simple to get content on the screen fast and to manipulate it. After about four hours of coding in one night, I had this:

An image viewer app that could display any directory of images. It included a preview bar that you could select past or upcoming images from, and control with either mouse clicks or keyboard presses. The actual code for it can be found here (just be aware it isn’t ready for the limelight yet, plenty of fixes to go).

Application Layout

The application is composed of six widget classes overall. Here is a simple visual diagram of what they look like, and the Kivy widget classes they are using. (The padding is for visual ease only, not part of the program or to scale).

First Lessons learned

The two biggest issues I ran into were proper widget alignment, and the fact that there is no built-in ‘on_hover’ like I am used to with CSS. I wrote up a quick class to do the latter, and I am including it here as it should be a drop-in for others’ Kivy projects.

Kivy Mouse Over Code

from kivy.factory import Factory
from kivy.properties import BooleanProperty, ObjectProperty
from kivy.core.window import Window
from kivy.uix.widget import Widget


class MouseOver(Widget):

    def __init__(self, **kwargs):
        Window.bind(mouse_pos=self._mouse_move)
        self.hovering = BooleanProperty(False)
        self.poi = ObjectProperty(None)
        self.register_event_type('on_hover')
        self.register_event_type('on_exit')
        super(MouseOver, self).__init__(**kwargs)

    def _mouse_move(self, *args):
        if not self.get_root_window():
            return
        is_collide = self.collide_point(*self.to_widget(*args[1]))
        if self.hovering == is_collide:
            return
        self.poi = args[1]
        self.hovering = is_collide
        self.dispatch('on_hover' if is_collide else 'on_exit')

    def on_hover(self):
        """Mouse over"""

    def on_exit(self):
        """Mouse leaves"""


Factory.register('MouseOver', MouseOver)

Then you simply have to subclass it as well as the original widget class you want your object to be, and override the on_hover and on_exit methods.

class Selector(Button, MouseOver):
    """ Base class for Prev and Next Image Buttons"""

    def on_hover(self):
        self.opacity = .8

    def on_exit(self):
        self.opacity = 0

Widget alignment

Proper widget alignment was fun to figure out, because Kivy has both hard-set width and height properties for objects, as well as a looser size_hint feature. By default, everything has a size_hint of (1, 1). Think of this as a percentage of the screen, so (1, 1) == (100% width, 100% height).

Simple, right? Well, now add two of those objects to a base layout. Some layouts, like BoxLayout and GridLayout, will then make each of them 50% of the screen. Others, like FloatLayout, will let each of them take up the entire layer, so one will be hidden. Even more fun: to disable the hinting, you have to manually set size_hint to (None, None).
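Here is a tiny sketch of hinted versus absolute sizing side by side (the widget names and sizes are arbitrary):

from kivy.app import App
from kivy.uix.boxlayout import BoxLayout
from kivy.uix.button import Button


class SizingApp(App):
    def build(self):
        layout = BoxLayout()
        # Default hint: takes whatever share of the layout it is given
        layout.add_widget(Button(text="stretchy"))
        # Opt out of hinting entirely and use absolute pixels instead
        layout.add_widget(Button(text="fixed",
                                 size_hint=(None, None), size=(100, 100)))
        return layout


if __name__ == "__main__":
    SizingApp().run()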

Then on top of that, you can start mixing and matching hints and absolutes with different elements on the screen. I chose to do that in this case, because I always wanted the preview bar of images at the bottom to be only 100px tall, but I wanted the large image to expand to the full height. The catch: if you set the bar to 100px tall, leave the image at (1, 1), and just raise its starting position 100px from the bottom, the image now runs 100px off the top of the page. So on top of the size_hint you need to add an on_size method that reduces the height of the image object every time the window is resized, like so:

    def on_size(self, obj, size):
        """Make sure all children sizes adjust properly"""
        self.image.height = self.height - 100
        self.next_button.pos = (self.width - 99, self.next_button.hpos)

You will notice we also have to make sure that the right-hand button always looks attached to the right side of the screen. (Which I later learned could probably be accomplished with a RelativeLayout, but I am still unsure how that would work compared to everything else I already have with my FloatLayout.)

So the takeaway for me was, even without CSS, you’re going to have fun with alignment issues.

Next Steps

So I have an image viewer, great! But images need to be organized into more manageable groups, so next I want to create a folder / album view to be able to select at a higher level what to look at.

After that I am sure I will want to create a database system to store tags, album info and thumbnails (for faster preview loading).

Python Decorators

Often times in Python there comes a need to have multiple functions act in a similar manner. It could be anything from making sure that similar functions output the right type of result, to having them all log when they are called, or all report exceptions in the same way.

An easy and repeatable way to accomplish this is decorators. They look like:

@decorator
def my_function(print_string="Hello World"):
    print(print_string)

my_function()
# Hello World

A decorator simply wraps one function with another function. In other words, it is exactly the same as doing:

def my_function(print_string="Hello World"):
    print(print_string)

decorator(my_function)("My message")
# My message

So what does a decorator look like?

Decorator Template

from functools import wraps

def decorator(func):
    """This is an example decorators"""
    @wraps(func)
    def container(*args, **kwargs):
        # perform actions before running contained function
        output = func(*args, **kwargs)
        # actions to run after contained function
        return output
    return container

Running the code above does nothing extra currently. It is showing how a decorator runs another function within itself.

@decorator
def my_function():
    print("Hello World")

my_function()
# "Hello World"

Line by line breakdown

def decorator(func):

The name of the decorator function, which takes the wrapped function as its single argument.

@wraps(func)

wraps is a decorator from functools that makes the wrapper keep the wrapped function’s name and docstring instead of its own; the section after the line-by-line breakdown explains why this is necessary.

def container(*args, **kwargs):

This inner function collects the parameters and keyword parameters that are going to be passed to the original function. This allows the decorator access to incoming arguments to verify or modify before the function is ever run.

output = func(*args, **kwargs)

This runs the function with the original arguments and captures the output. It is also possible to return a completely different result. It is more common to check or modify the output, like if you wanted to make sure everything returned was an integer.

return output

Don’t forget to actually return the original function’s output (or a custom one). Otherwise the function will simply return None.

return container

The container function is the actual function being called, hence why *args, **kwargs are passed to it. It is necessary to return it from the outer decorator function so it can be called.

The importance of Wraps

We need to incorporate wraps so that the function name and docstring appear to be from the wrapped function, and not those of the wrapper itself.

@decorator
def my_func():
    """Example function"""
    return "Hello"

@decorator_no_wrap
def second_func():
    """Some awesome docstring"""
    return "World"

help(my_func)
# Help on function my_func:

# my_func()
#    Example function

help(second_func)
# Help on function container:

# container(*args, **kwargs)

It is possible, though more work, to accomplish the same thing yourself.

def decorator(func):
    """This is an example decorators"""
    def container(*args, **kwargs):
        return func(*args, **kwargs)
    container.__name__ = func.__name__
    container.__doc__ = func.__doc__
    return container

Useful Example

Now let’s turn it into something useful. In this example we will make sure that the function returns the expected type of result. Otherwise it will raise an exception for us, so there are no hidden complications down the road.

from functools import wraps

def isint(func):
    """ 
    This decorator will make sure the resulting value 
    is an integer or else it will raise an exception.
    """
    @wraps(func)
    def container(*args, **kwargs):
        output = func(*args, **kwargs)
        if not isinstance(output, int):
            raise TypeError("function did not return integer")
        return output
    return container

@isint
def add(num1, num2):
    """Add two numbers together and return the results"""
    return num1 + num2

print(add(1, 2))
# 3

print(add("this", "that"))
# TypeError: function did not return integer

Regular decorators are applied as soon as the function is defined, and you do not need to add parentheses after their name, such as @isint. However, if the decorator accepts arguments, aka a meta-decorator, it will require parentheses even if nothing additional is passed to it.

Meta-decorators

Passing arguments to a decorator turns it into a meta-decorator. To pass these arguments in, it requires either another function wrapped around the decorator or turning the entire thing into a class.

from functools import wraps

def istype(instance_type=int):
    def decorator(func):
        """ 
        This decorator will make sure the resulting value is the 
        type specified or else it will raise an exception. 
        """
        @wraps(func)
        def container(*args, **kwargs):
            output = func(*args, **kwargs)
            if not isinstance(output, instance_type):
                raise TypeError("function did not return proper type")
            return output
        return container
    return decorator

@istype()
def add(num1, num2):
    """Add two numbers together and return the results"""
    return num1 + num2

@istype(str)
def reverse(forward_string):
    """Reverse and return incoming string"""
    return forward_string[::-1]


print(add(1, 2))
# 3

print(reverse("Hello"))
# "olleH"

print(add("this", "that"))
# Type Error: function did not return proper type

Remember running a decorator is equivalent to:

decorator(my_function)("My message")

Running a meta-decorator adds an additional layer.

reversed_string = istype(str)(reverse)("Reverse Me")

Hence why @decorator doesn’t require to be called when put above a function, but @istype() does.

You can also create this meta-decorator as a class instead of another function.

from functools import wraps

class IsType:  # Instead of 'def istype'

    def __init__(self, inc_type=int):
        self.inc_type = inc_type

    def __call__(self, func): # Replaces 'def decorator(func)'
        @wraps(func)
        def container(*args, **kwargs):
            output = func(*args, **kwargs)
            if not isinstance(output, self.inc_type):
                raise TypeError("function did not return proper type")
            return output
        return container

In functionality they are the same, but be aware they are technically different types. This will really only impact code inspectors and those trying to manually navigate the code, so it is not a huge issue, but it is something to be aware of.

type(IsType)
<class 'type'>

type(istype)
<class 'function'>

Things to keep in mind

  1. Order of operation does matter. If you have multiple decorators around a single function, keep in mind they are applied from the bottom up, and at call time the outermost (top) wrapper runs first (see the sketch below).
  2. Once a function is wrapped, it cannot be run without the wrapper.
  3. Meta-decorators require the additional parentheses; regular decorators do not.
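A quick sketch showing that ordering with two trivial decorators:

from functools import wraps

def shout(func):
    @wraps(func)
    def container(*args, **kwargs):
        return func(*args, **kwargs).upper()
    return container

def exclaim(func):
    @wraps(func)
    def container(*args, **kwargs):
        return func(*args, **kwargs) + "!"
    return container

@shout      # applied last, so its wrapper runs first when greet() is called
@exclaim    # applied first (closest to the function)
def greet():
    return "hello"

print(greet())
# HELLO!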