
Cartoon or Photo? – Image detection with Python

This all started with me just wanting a fast way to sort through image folders and remove cartoon images. That led me down a spiraling rabbit hole of possibility, from using OpenCV for different types of detection to training a Machine Learning model from scratch with Keras. This article will go through the multiple options available and see how accurate each one ends up being.

What makes an image a cartoon?

Model is Jessica Marie Frye

If we are going to detect the differences between photos and cartoons, first we need to figure out how they are different. More importantly, how they are different in a quantifiable way. These are the measurable differences I could think up:

  • Cartoons have smoother gradients
  • Real images use a larger color palette
  • Cartoons usually have drawn edge outlines

Below we will try each of these options and see how well they fare.

Results First

You’re probably more interested in what will work the best for you and less about how I spent days toiling away at this, so to cut to the chase, here is how everything performed.

The best OpenCV contender turned out to be counting colors, with a combined 75% accuracy.

Overall, Machine Learning Image Classification using the Xception model wiped the floor with a combined 96% accuracy. It could probably do even better with further fine tuning, but it was more than good enough for my own needs.

These results were also with a hard threshold set to force every image into one of two buckets, real or cartoon. For my own use I modify the scripts to use a smaller range for absolute calls, and put the “unsure” ones in another folder, further reducing any errant picks.
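As a minimal sketch, that three-bucket approach could look something like this (assuming the detector returns its raw score instead of a boolean; the cutoff values here are illustrative):

# Hypothetical three-way sort using the raw score rather than one hard cutoff
def bucket(score: float) -> str:
    if score > 0.99:  # very confident it is a cartoon
        return "cartoon"
    if score < 0.95:  # very confident it is a photo
        return "photo"
    return "unsure"  # anything in between gets reviewed by hand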

Machine Learning with Keras is the obvious pick if you have a good set of data to train with and a computer beefy enough to process it. However, it’s not as portable and takes much longer than simply trying out one of the OpenCV methods.

OpenCV Gradient Differences

The first method we will use is pretty straightforward. We will blur the image a little, compare it to its unaltered form, and quantify the difference. First things first, we will need OpenCV installed for Python. Go into your venv for this and run:

pip install opencv-python

Now open up your IDE and create a new .py file to get started. The first thing we are going to do is read the image into OpenCV, which is the cv2 module. We will use a JPEG image as it will be in the expected BGR color format.

import cv2

img = cv2.imread("/path/to/my/image.jpg")

Next we will blur the image a little using a bilateral filter, to even out the colors. We will also resize the image to a standard size so the blur across every image is the same.

img = cv2.resize(img, (1024, 1024))
color_blurred = cv2.bilateralFilter(img, 6, 250, 250)

You can check out the result to see how strong the effect is by previewing the image. Press any key to close the window.

# Optional Preview
cv2.imshow("blurred", color_blurred)
cv2.waitKey(0)
cv2.destroyAllWindows()

Then we need to compare this new color_blurred image to the original image. I accomplished this by comparing the histograms. We will have to do that for each color channel individually.

diffs = []
for k, color in enumerate(('b', 'g', 'r')):  # OpenCV stores channels in BGR order
    print(f"Comparing histogram for color {color}")
    real_histogram = cv2.calcHist([img], [k], None, [256], [0, 256])
    color_histogram = cv2.calcHist([color_blurred], [k], None, [256], [0, 256])
    diffs.append(cv2.compareHist(real_histogram, color_histogram, cv2.HISTCMP_CORREL))

result = sum(diffs) / 3

compareHist with the correlation method will give us a result between -1 and 1 (1 being the most similar). We will need to set a threshold for how similar cartoons will be. I have mine set at 0.98 (aka 98% similar).

if result > 0.98:
    print("It's a cartoon!")
else: 
    print("It's a photo!")

And that’s it! Now you can test it out and see how it works for your images. I found this iteration to work very fast and have roughly a 70% proper detection rate.

Let’s put it all together into a usable script!

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from pathlib import Path
from typing import Union
import argparse

import cv2


def is_cartoon(
    image: Union[str, Path],
    threshold: float = 0.98,
    preview: bool = False,
) -> bool:
    # read and resize image
    img = cv2.imread(str(image))
    img = cv2.resize(img, (1024, 1024))

    # blur the image to "even out" the colors
    color_blurred = cv2.bilateralFilter(img, 6, 250, 250)

    if preview:
        cv2.imshow("blurred", color_blurred)
        cv2.waitKey(0)
        cv2.destroyAllWindows()

    # compare the colors from the original image to blurred one.
    diffs = []
    for k, color in enumerate(("b", "g", "r")):  # OpenCV stores channels in BGR order
        # print(f"Comparing histogram for color {color}")
        real_histogram = cv2.calcHist([img], [k], None, [256], [0, 256])
        color_histogram = cv2.calcHist([color_blurred], [k], None, [256], [0, 256])
        diffs.append(
            cv2.compareHist(real_histogram, color_histogram, cv2.HISTCMP_CORREL)
        )

    return sum(diffs) / 3 > threshold


def command_line_options():
    args = argparse.ArgumentParser(
        "blur_compare",
        description="Determine if a image is likely a cartoon or photo.",
    )
    args.add_argument(
        "-p",
        "--preview",
        action="store_true",
        help="Show the blurred image",
    )
    args.add_argument(
        "-t",
        "--threshold",
        type=float,
        help="Cutoff threshold",
        default=0.98,
    )
    args.add_argument(
        "image",
        type=Path,
        help="Path to image file",
    )
    return vars(args.parse_args())


if __name__ == "__main__":
    options = command_line_options()
    if not options["image"].exists():
        raise FileNotFoundError(f"No image exists at {options['image'].absolute()}")
    if is_cartoon(**options):
        print(f"{options['image'].name} is a cartoon!")
    else:
        print(f"{options['image'].name} is a photo!")

OpenCV Color Counting

Using a subset of 512 colors in a 1024×1024 image, determine how much of the image can be reproduced

The next possible way to approach the problem is to figure out how many colors make up the majority of the image. We will use all the same code to load and resize the image from above, just this time we will add a loop to count all the colors (the slow way; a faster option is below):

    # Find count of each color
    a = {}
    for item in img.reshape(-1, 3):  # each item is a single BGR pixel
        value = tuple(item)
        if value not in a:
            a[value] = 1
        else:
            a[value] += 1

Next we will sort the dictionary by the most used color, and add the counts of the top 512 colors together.

The actual calculation packs a lot of functionality into a small block of code. First let’s get a visual preview of what is happening by re-creating the image with only the selected colors:

import numpy

mask = numpy.zeros(img.shape[:2], dtype=bool)

for color, _ in sorted(a.items(), key=lambda pair: pair[1], reverse=True)[:512]:
    mask |= (img == color).all(-1)

img[~mask] = (255, 255, 255)

cv2.imshow("img", img)
cv2.waitKey(0)
cv2.destroyAllWindows()

Here’s the code that basically calculates what you are seeing. We divide the sum of all the top colors by the size of the image to get what percent could be recreated with just those colors.

    # Identify the percent of the image that uses the top 512 colors
    most_common_colors = sum([x[1] for x in sorted(a.items(), key=lambda pair: pair[1], reverse=True)[:512]])
    return (most_common_colors / (1024 * 1024)) > 0.3 # new threshold

This script is pretty similar to the last one, and it has a slightly higher success rate on my data set at 76%! It is 20% better at detecting what is a cartoon, but 10% worse at photos. It is also much, much slower, some 20~50 times slower (taking a second per image instead of 0.05 of a second). However, we can speed that up by instead using some numpy trickery.

    # Replace everything after "Find count of each color" with this faster version,
    # but note it doesn't work with the preview.
    flattened = numpy.reshape(img, ((1024 * 1024), 3))
    # Encode each BGR pixel as one unique number; the multipliers (256**2, 256, 1)
    # keep each channel in its own range so different colors can't collide
    multiplied = numpy.multiply(flattened, [65_536, 256, 1])
    sums = multiplied.sum(axis=1)
    unique, counts = numpy.unique(sums, return_counts=True)

    # Identify the percent of the image that uses the top 512 colors
    most_common_colors = sum(sorted(counts, reverse=True)[:512])
    return (most_common_colors / (1024 * 1024)) > threshold

Here is the entire script with the slower version that works with the image preview. However, after settling on a good threshold, I would suggest replacing the code with the faster version above.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from pathlib import Path
from typing import Union
import argparse

import cv2
import numpy


def is_cartoon(
    image: Union[str, Path],
    threshold: float = 0.3,
    preview: bool = False,
) -> bool:
    # read and resize image
    img = cv2.imread(str(image))
    img = cv2.resize(img, (1024, 1024))

    # Find count of each color
    a = {}
    for row in img:
        for item in row:
            value = tuple(item)
            if value not in a:
                a[value] = 1
            else:
                a[value] += 1

    if preview:
        mask = numpy.zeros(img.shape[:2], dtype=bool)

        for color, _ in sorted(a.items(), key=lambda pair: pair[1], reverse=True)[:512]:
            mask |= (img == color).all(-1)

        img[~mask] = (255, 255, 255)

        cv2.imshow("img", img)
        cv2.waitKey(0)
        cv2.destroyAllWindows()

    # Identify the percent of the image that uses the top 512 colors
    most_common_colors = sum(
        [x[1] for x in sorted(a.items(), key=lambda pair: pair[1], reverse=True)[:512]]
    )
    return (most_common_colors / (1024 * 1024)) > threshold


def command_line_options():
    args = argparse.ArgumentParser(
        "blur_compare",
        description="Determine if a image is likely a cartoon or photo.",
    )
    args.add_argument(
        "-p",
        "--preview",
        action="store_true",
        help="Show the blurred image",
    )
    args.add_argument(
        "-t",
        "--threshold",
        type=float,
        help="Cutoff threshold",
        default=0.3,
    )
    args.add_argument(
        "image",
        type=Path,
        help="Path to image file",
    )
    return vars(args.parse_args())


if __name__ == "__main__":
    options = command_line_options()
    if not options["image"].exists():
        raise FileNotFoundError(f"No image exists at {options['image'].absolute()}")
    if is_cartoon(**options):
        print(f"{options['image'].name} is a cartoon!")
    else:
        print(f"{options['image'].name} is a photo!")

OpenCV Edge Detection

This one I haven’t had much luck with, it being only about 55% successful at determining what type of image it is. A coin toss is about as accurate. I don’t recommend using it as is, but if you have any ideas or improvements, let me hear them!

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from pathlib import Path
from typing import Union
import argparse

import cv2
import numpy


def is_cartoon(
    image: Union[str, Path],
    threshold: float = 4500,
    preview: bool = False,
) -> bool:
    # read and resize image
    img = cv2.imread(str(image))
    img = cv2.resize(img, (1024, 1024))

    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    blurred_gray = cv2.medianBlur(gray, 3)

    edges = cv2.adaptiveThreshold(
        gray, 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY, 5, 10
    )
    blurred_edges = cv2.adaptiveThreshold(
        blurred_gray, 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY, 5, 10
    )

    if preview:
        cv2.imshow("edges", edges)
        cv2.imshow("blurred edges", blurred_edges)
        cv2.waitKey(0)
        cv2.destroyAllWindows()

    count_1 = numpy.count_nonzero(edges)
    count_2 = numpy.count_nonzero(blurred_edges)

    return abs(count_2 - count_1) < threshold


def command_line_options():
    args = argparse.ArgumentParser(
        "blur_compare",
        description="Determine if a image is likely a cartoon or photo.",
    )
    args.add_argument(
        "-p",
        "--preview",
        action="store_true",
        help="Show the blurred image",
    )
    args.add_argument(
        "-t",
        "--threshold",
        type=float,
        help="Cutoff threshold",
        default=4500,
    )
    args.add_argument(
        "image",
        type=Path,
        help="Path to image file",
    )
    return vars(args.parse_args())


if __name__ == "__main__":
    options = command_line_options()
    if not options["image"].exists():
        raise FileNotFoundError(f"No image exists at {options['image'].absolute()}")
    if is_cartoon(**options):
        print(f"{options['image'].name} is a cartoon!")
    else:
        print(f"{options['image'].name} is a photo!")

Keras Machine Learning Model

Time to take the kid gloves off, let’s use some ML image detection.

First we have to train a minified Xception model with a lot of hand-picked good data. I used 556 cartoon images and 2295 real images in the training datasets, then used that model to sort another 2000+ unsorted images.

To get step by step details, please check out the same tutorial I used for this.

import shutil
from pathlib import Path

import tensorflow as tf

root_dir = Path("/training/")
image_size = (180, 180)
batch_size = 32
epochs = 20
model_name = "my_model"


def make_model(input_shape, num_classes):
    inputs = tf.keras.Input(shape=input_shape)
    # Image augmentation block
    data_augmentation = tf.keras.Sequential(
        [
            tf.keras.layers.RandomFlip("horizontal"),
            tf.keras.layers.RandomRotation(0.1),
        ]
    )
    x = data_augmentation(inputs)

    # Entry block
    x = tf.keras.layers.Rescaling(1.0 / 255)(x)
    x = tf.keras.layers.Conv2D(32, 3, strides=2, padding="same")(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Activation("relu")(x)

    x = tf.keras.layers.Conv2D(64, 3, padding="same")(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Activation("relu")(x)

    previous_block_activation = x  # Set aside residual

    for size in [128, 256, 512, 728]:
        x = tf.keras.layers.Activation("relu")(x)
        x = tf.keras.layers.SeparableConv2D(size, 3, padding="same")(x)
        x = tf.keras.layers.BatchNormalization()(x)

        x = tf.keras.layers.Activation("relu")(x)
        x = tf.keras.layers.SeparableConv2D(size, 3, padding="same")(x)
        x = tf.keras.layers.BatchNormalization()(x)

        x = tf.keras.layers.MaxPooling2D(3, strides=2, padding="same")(x)

        # Project residual
        residual = tf.keras.layers.Conv2D(size, 1, strides=2, padding="same")(
            previous_block_activation
        )
        x = tf.keras.layers.add([x, residual])  # Add back residual
        previous_block_activation = x  # Set aside next residual

    x = tf.keras.layers.SeparableConv2D(1024, 3, padding="same")(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Activation("relu")(x)

    x = tf.keras.layers.GlobalAveragePooling2D()(x)
    if num_classes == 2:
        activation = "sigmoid"
        units = 1
    else:
        activation = "softmax"
        units = num_classes

    x = tf.keras.layers.Dropout(0.5)(x)
    outputs = tf.keras.layers.Dense(units, activation=activation)(x)
    return tf.keras.Model(inputs, outputs)


def train_data():
    train_ds = tf.keras.preprocessing.image_dataset_from_directory(
        str(root_dir),
        validation_split=0.2,
        subset="training",
        seed=1337,
        image_size=image_size,
        batch_size=batch_size,
    )
    val_ds = tf.keras.preprocessing.image_dataset_from_directory(
        str(root_dir),
        validation_split=0.2,
        subset="validation",
        seed=1337,
        image_size=image_size,
        batch_size=batch_size,
    )

    train_ds = train_ds.prefetch(buffer_size=32)
    val_ds = val_ds.prefetch(buffer_size=32)

    model = make_model(input_shape=image_size + (3,), num_classes=2)
    tf.keras.utils.plot_model(model, show_shapes=True)

    callbacks = [
        tf.keras.callbacks.ModelCheckpoint(f"{model_name}_{{epoch}}.h5"),
    ]
    model.compile(
        optimizer=tf.keras.optimizers.Adam(1e-3),
        loss="binary_crossentropy",
        metrics=["accuracy"],
    )
    model.fit(
        train_ds, epochs=epochs, callbacks=callbacks, validation_data=val_ds,
    )


def clean_images():
    move_to = root_dir.parent / "bad_data"  # needs to be not in the training directory
    move_to.mkdir(exist_ok=True)
    moved = 0
    for directory in root_dir.glob("*"):
        if directory.name == move_to.name:
            continue
        if directory.is_dir():
            for file in directory.glob("*"):
                # Keep only JPEGs that start with a proper JFIF header
                header = file.read_bytes()[:10]
                if not file.name.lower().endswith(("jpg", "jpeg")) or b"JFIF" not in header:
                    shutil.move(file, move_to / file.name)
                    moved += 1
        print("moved unclean data", moved, "from", directory)


def move_images():
    model = tf.keras.models.load_model(f"{model_name}_{epochs}.h5")
    cartoon_dir = Path("/cartoon/")
    real_dir = Path("/real/")

    real, cartoon, unknown = 0, 0, 0

    for file in Path("/unsorted/").glob("*.[jpg][jpeg][png]"):

        img = tf.keras.preprocessing.image.load_img(
            str(file), target_size=image_size
        )
        img_array = tf.keras.preprocessing.image.img_to_array(img)
        img_array = tf.expand_dims(img_array, 0)  # Create batch axis

        predictions = model.predict(img_array)
        score = predictions[0][0]
        if score > 0.98:
            real += 1
            shutil.move(file, real_dir / file.name)
        elif score < 0.02:
            cartoon += 1
            shutil.move(file, cartoon_dir / file.name)
        else:
            unknown += 1
            print(f"Could not figure out {file} as it was {score * 100}")
    print(f"Moved {real} to real and {cartoon} to cartoon, {unknown} were unmoved")


if __name__ == '__main__':
    clean_images()
    train_data()
    move_images()

I personally think 95% overall accuracy is amazing for no additional tuning, especially since this isn’t exactly the model’s intended use case. Generally we think about classifying items in the image (cat vs dog), not the type of image (anime cat vs real cat).

pre-commit – Check yourself before you wreck yourself

pre-commit checks are like prenups. They run before you commit, get the dirty stuff out of the way, and may even save your relationship(s) in the long run. The difference is that pre-commit checks are written by programmers and not lawyers, so they are a lot easier to read and implement.

There are several types of pre-commit checks that can be run. Some common features you could choose are:

  • git sanity checks (no huge files, no bad merge lines, etc…)
  • code style enforcement
  • whitespace fixes
  • python specific checks

Why pre-commit?

Just as you should do a quick check in the mirror before heading out, pre-commits are a quick reflection of your code before making it public. They quickly check everything you’re about to add to git to make sure it’s copasetic. Many even automatically fix common issues for you.

Basically pre-commits make sure your code’s tee-shirt isn’t on inside out.

How to pre-commit

There are three basic steps to get pre-commit working with your code repo.

  1. Install the pre-commit tool onto your system
  2. Add a pre-commit configuration file to your repo
  3. Install the git hook for the repo

1. Install pre-commit

There are several different official ways to install pre-commit. I personally do not like packages polluting my global Python site-packages, so instead I install it with user level privileges only. Then I make sure its install path is added to the system path.

pip install pre-commit --user

You will receive a warning that the script installation location is not on the system path. (Unless you have done this before, and then you can skip the next part.)

If you really don’t care and are very good about using isolated virtual environments, you can just install it globally with pip install pre-commit without the --user flag and skip the next part.

Windows

Installing collected packages: pre-commit
  WARNING: The scripts pre-commit-validate-config.exe, pre-commit-validate-manifest.exe and pre-commit.exe are installed in 'C:\Users\Chris\AppData\Roaming\Python\Python39\Scripts' which is not on PATH.

The above warning is what I received while installing this on Windows, so I copied that path C:\Users\Chris\AppData\Roaming\Python\Python39\Scripts to my clipboard and then went through the following process to add it to my system path.

First, search for “edit system” in the Windows search bar and click the highlighted link.

Second, click on the “Environment Variables…” button at the bottom.

Third, in the bottom section under “System variables” click on “Path” and hit “Edit…”

Finally, on the new window hit “New” in the top right and add the path previously copied to the clipboard. Then hit OK on all the windows to close them.

You will have to restart any cmd sessions you have opened. When you do, you should be able to run pre-commit just fine.

Linux

It’s a bit simpler to add the custom install location to the system path on Linux.

INSTALL_PATH=/home/james/.local/bin  # Change to the path printed in your warning
echo "export PATH=\"${INSTALL_PATH}:\$PATH\"" >> ~/.bashrc 
source ~/.bashrc

2. The config file

In your repo, you will need to create a file named .pre-commit-config.yaml and choose which checks to add for your code. This is a config file that I use (with opinionated formatters and mypy removed). You can find the full list of built-in supported checks at the pre-commit github repo.

# .pre-commit-config.yaml
repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.0.1
    hooks:

    # Identify invalid files
    - id: check-ast                        
    - id: check-yaml                       
    - id: check-json                       
    - id: check-toml                       

    # git checks
    - id: check-merge-conflict             
    - id: check-added-large-files          
      exclude: tests/media/.+
    - id: detect-private-key               
    - id: check-case-conflict              

    # Python checks
    - id: check-docstring-first            
    - id: debug-statements                 
    - id: requirements-txt-fixer           
    - id: fix-encoding-pragma              
    - id: fix-byte-order-marker            

    # General quality checks
    - id: mixed-line-ending                
    - id: trailing-whitespace              
      args: [--markdown-linebreak-ext=md]  
    - id: check-executables-have-shebangs  
    - id: end-of-file-fixer                

Personally, I also always use the black code formatter. It is not part of the standard checks, but it is still easy to add.

repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.0.1
    hooks:
       # ... 
  # Add at same level as the first pre-commit-hooks repo
  - repo: https://github.com/psf/black
    rev: 21.6b0 
    hooks:
      - id: black

3. Adding the pre-commit hook to git

Go into the repo with the config file at its root, and simply type:

pre-commit install

The first time you add it, you will also want to run it on all the files in the repo to shore them all up (usually it only runs on the changed files).

pre-commit run --all-files

It will take a few minutes to install the virtual env for running these checks, but it will be much faster after the first time. It should look something like:

pre-commit run --all-files
[INFO] Initializing environment for https://github.com/pre-commit/pre-commit-hooks.
[INFO] Initializing environment for https://github.com/psf/black.
[INFO] Installing environment for https://github.com/pre-commit/pre-commit-hooks.
[INFO] Once installed this environment will be reused.
[INFO] This may take a few minutes...
[INFO] Installing environment for https://github.com/psf/black.
[INFO] Once installed this environment will be reused.
[INFO] This may take a few minutes...
Check python ast.........................................................Passed
Check Yaml...............................................................Passed
Check JSON...........................................(no files to check)Skipped
Check Toml...............................................................Passed
Check for merge conflicts................................................Passed
Check for added large files..............................................Passed
Detect Private Key.......................................................Passed
Check for case conflicts.................................................Passed
Check docstring is first.................................................Passed
Debug Statements (Python)................................................Passed
Fix requirements.txt.....................................................Passed
Fix python encoding pragma...............................................Passed
fix UTF-8 byte order marker..............................................Passed
Mixed line ending........................................................Passed
Trim Trailing Whitespace.................................................Failed
- hook id: trailing-whitespace
- exit code: 1
- files were modified by this hook

Fixing README.md
Fixing docs/README.md
Fixing docs/build-licenses.txt

Check that executables have shebangs.....................................Passed
Fix End of Files.........................................................Failed
- hook id: end-of-file-fixer
- exit code: 1
- files were modified by this hook

Fixing docs/build-licenses.txt
Fixing .pre-commit-config.yaml

black....................................................................Passed

4. Sit back and relax

Congrats, you are now totally almost protected from the most common silly mistakes you used to make. You now have the opportunity to make completely new mistakes you wouldn’t have thought of before!

Python’s New Structural Pattern Matching!

It’s finally happened: after years of complaining and being told “it will never happen”, I got my darn switch statement! And honestly, calling it only a “switch” statement is really underselling it. It’s much more powerful than most people will ever use! You might have already heard about this, but I wanted to make sure it made it to at least the 3.10 beta phase, as the alphas are no guarantee you’ll actually see the new feature (it is still possible for it to be removed before the RC phase, but less likely).

Artwork by Clara Griffith

Python 3.10 will introduce “Structural Pattern Matching”, as laid out in PEP 622, which is a crazy advanced switch statement that can recognize patterns. Instead of the keyword switch, Python will use match (get ready to update your regex variable names!). Let’s do a quick comparison between it and JavaScript’s switch statement, starting with a JavaScript example modified from w3schools.

Javascript’s Switch

switch (new Date().getDay()) {
  case 5:
    console.log("It's Friday!");
    break;
  case 0:
  case 6:
    console.log("Woo, Weekend!");
    break;
  default:
    console.log("Work. Work. Work. Work.");
}

Python’s Structural Pattern Matching

We can accomplish the same with Python in fewer lines. Note that in Python the weekday starts at 0 for Monday.

from datetime import datetime

match datetime.today().weekday():  
    case 4:
        print("It's Friday!")
    case 5 | 6:
        print("Woo, Weekend!")
    case _:
        print("Work. Work. Work. Work.")

Similarities and Differences

With Python’s new match statements, the biggest change from tradition is that there is no “drop through” between cases. That is, you don’t have to manually specify break or anything to exit the match block for each case. It does mean that you can’t stack multiple cases to share one result; instead you combine checks with | for “or” operations, which I honestly think is a lot cleaner.

What I don’t like is that there is no default keyword or similar; you have to use the _ wildcard. The underscore is already overused in my opinion. However, match does add a lot of power for more advanced cases. Let’s dive into some more examples.

Matching and Assigning

The very basic things to understand about the new Structural Pattern Matching is the flow (as seen in the example above) and the possibility of assignment. Because not only can you make sure things are equal to another literal, you can detect patterns (as the name suggests) and then work with the variables inside the pattern itself.

Pattern Recognition

Let’s dip our toes into this whole “pattern matching” thing. What does that mean?

Say you have a tuple with two objects that could be in either order, and you always want to print them the right way around.

data_1 = ("Purchase Number", 574)
data_2 = (574, "Purchase Number")

You could have a simple match case to straighten it out so it always prints “Purchase Number 574”, as pattern matching supports type checking.

match data_1:
    case str(x), int(y):
        print(x, y) 
    case int(x), str(y):
        print(y, x)

Match on dict values

Just like with regular value comparisons, you can check dictionary values and still do or operations. In this case, either grade B or C will print out "Welcome to being average!"

match {'grade': 'B'}:
    case {'grade': 'a' | 'A'}:
        print("You're a star!")
    case {'grade': 'b' | 'c' | 'B' | 'C'}:
        print("Welcome to being average!")

Unpacking arbitrary data

You can use the standard * and ** unpackers to capture and unpack variable length lists and dicts.

my_dict = {'data': [{'action': 'move'}, {'action': 'stay'}],
           'location': "square_10"}

match my_dict:
    case {'data': [*options], **remainder }:
        print(options)
        print(remainder)
    case _:
        raise ValueError("Data not as expected")

It will match the first case and print out:

[{'action': 'move'}, {'action': 'stay'}]
{'location': 'square_10'}

That can really come in handy if dealing with variable length data.

Adding the “if”

You can also add in some inline checks.

cords = (30.25100365332043, -97.86221371827051, "Workplace")
cords_2 = (44.40575746530253, 8.947747627912994)

match cords: 
    case lat, lon, *_ if lat > 0:
        print("Northern Hemisphere")
    case lat, lon, *_ if lat < 0: 
        print("Sothern Hemisphere")    

Don’t forget about “as”

Sometimes you might want one of the many literals you are looking for to be usable in the case itself. Let’s put a lot of our knowledge together and check out the added power of the as clause.

We are going to build a really simple command line game to look for random things.

import sys
import random

inventory = []

while True:
    match input().lower().split():
        # Remember `split` makes the input into a list.  
        # These first two cases are functionally the same.
        case ["exit"]:  
            sys.exit(0)
        case "stop", *_: 
            sys.exit(0)
        case "travel", "up" | "down" | "left" | "right" as direction:
            print(f"Going {direction}")
        case "search", "area" | "backpack" as thing, _, *extra:
            item = " ".join(extra)
            if thing == "backpack":
               if item  in inventory:
                   print(f"Yes you have {item}")
               else:
                   print(f"No you don't have {item}")
            elif thing == "area":
                 if random.randint(1, 10) > 5:
                     inventory.append(item)
                     print(f"You found {item}!")
                 else:
                    print(f"No {item} here")
            else:
                print(f"Sorry, you cannot search {thing}")
        case _:
            print("sorry, didn't understand you! type 'exit' or 'stop' to quit")

Let’s do a quick playthrough:

> search area for diamonds
You found diamonds!

> search backpack for diamonds
Yes you have diamonds

> take a short rest and try to eat the diamonds
sorry, didn't understand you! type 'exit' or 'stop' to quit

> Travel Up
Going up

> search area for candy
No candy here

> search backpack for candy
No you don't have candy

> stop
# Process finished with exit code 0

Notice our special case for searching around.

case "search", "area" | "backpack" as thing, _, *extra:

We are looking for the first keyword “search”, then either “area” or “backpack”, saving whichever matched to the variable thing. The _ will ignore the next word, as we expect them to type “for” or something equally useless there. Finally we grab everything else and treat it as a single item.

Classes and More

Using dataclasses with match is super easy.

from dataclasses import dataclass

@dataclass
class Car:
    model: str
    top_speed: int

car_1 = Car("Jaguar F-Type R", 186)
car_2 = Car("Tesla Model S", 163)
car_3 = Car("BMW M4", 155)

match car_1:
    case Car(x, speed) if speed > 180:
        print(f"{x} is a super fast car!")
    case Car(x, speed) if speed > 160:
        print(f"{x} is a pretty fast car")
    case _:
        print("Regular Car")

Using a regular class, you have to be a bit more explicit about the variables that are being used.

class Car:

    def __init__(self, model, top_speed):
        self.model = model
        self.top_speed = top_speed


car_1 = Car("F-Type R", 186)
car_2 = Car("Model S", 163)
car_3 = Car("BMW M4", 155)

match car_1:
    case Car(model=x, top_speed=speed) if speed > 180:
        print(f"{x} is a super fast car!")
    case Car(model=x, top_speed=speed) if speed > 160:
        print(f"{x} is a pretty fast car")
    case _:
        print("Regular Car")

However, there is a way around that! You can add __match_args__ to the class itself to define which attributes you want to use for the pattern recognition.

class Car:

    __match_args__ = ("model", "top_speed")  # must be a tuple of strings
    def __init__(self, model, top_speed):
        self.model = model
        self.top_speed = top_speed


car_1 = Car("F-Type R", 186)
car_2 = Car("Model S", 163)
car_3 = Car("BMW M4", 155)

match car_1:
    case Car(x, speed) if speed > 180:
        print(f"{x} is a super fast car!")
    case Car(x, speed) if speed > 160:
        print(f"{x} is a pretty fast car")
    case _:
        print("Regular Car")

Summary

Structural Pattern Matching is Python’s sexy new “switch” statement that will bring a lot of power and remove a lot of old if elif blocks. The only real downside is that it’s brand new, and it will take a few years before most people can use it with their default interpreter.

Keep Windows from going to sleep, no power settings needed!

Say you have a long running process that you let go overnight, only to come back the next morning and realize “Oh no, only an hour into it, my computer went to sleep!”. Thanks to some Windows internals, it’s possible to make the system realize there is a background task running and that it should not go to sleep.

Artwork by Clara Griffith

I personally use this exact method in my FastFlix program to make sure the computer doesn’t interrupt the encoding process. It should work with any version of Python 3, and it boils down to:

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import ctypes
import time

CONTINUOUS = 0x80000000
SYSTEM_REQUIRED = 0x00000001
DISPLAY_REQUIRED = 0x00000002 

ctypes.windll.kernel32.SetThreadExecutionState(CONTINUOUS | SYSTEM_REQUIRED)
try:
    # This is where you would do stuff
    while True:
        time.sleep(600) 
finally:
    ctypes.windll.kernel32.SetThreadExecutionState(CONTINUOUS)

As this example shows, you can let this run in the background to always keep your computer from sleeping. Just be careful you don’t abuse your IT policies by allowing your computer to stay on a lot longer than they want.

It’s pretty straightforward code, the main function being SetThreadExecutionState.

  • ES_CONTINUOUS (0x80000000): Informs the system that the state being set should remain in effect until the next call that uses ES_CONTINUOUS and one of the other state flags is cleared.
  • ES_SYSTEM_REQUIRED (0x00000001): Forces the system to be in the working state by resetting the system idle timer.
  • ES_DISPLAY_REQUIRED (0x00000002): Forces the display to be on by resetting the display idle timer.

Table from https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-setthreadexecutionstate

Notice that it doesn’t take multiple arguments like you would expect in Python. Instead it accepts a single hex value that represents the combination of all the options you want to set. Hence why you see us passing them in via a “bitwise or” expression. For example, if you also wanted to force the screen to stay on, you just need to add:

ctypes.windll.kernel32.SetThreadExecutionState(CONTINUOUS | SYSTEM_REQUIRED | DISPLAY_REQUIRED)

After this is set, it is extremely important to remember to always reset it back to ES_CONTINUOUS after your work is done.

Use atexit for safety

In our example above, we simply add this in a finally clause. For a more robust program you may want to ensure it’s always called via atexit.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import ctypes
import atexit

CONTINUOUS = 0x80000000
SYSTEM_REQUIRED = 0x00000001
DISPLAY_REQUIRED = 0x00000002 

ctypes.windll.kernel32.SetThreadExecutionState(CONTINUOUS | SYSTEM_REQUIRED)

@atexit.register
def cleanup():
    ctypes.windll.kernel32.SetThreadExecutionState(CONTINUOUS)

# Do Stuff    

Interprocess Communications

Inter-Office-Process Communications, Artwork by Clara Griffith

Dealing with communications across programs, processes, or even threads can be a real pain in the patella. Each situation usually calls for something slightly different and has to work with a limited set of options. On top of that, a lot of tutorials I see simply have one program start the other (for example with subprocess), but that just won’t do for my taste.

Just show me the table of which IPC method I need 

I’m going to go through options that let two independent programs, started on their own, talk with each other. And when I say talk, I mean able to at least execute tasks in the other’s process and, in most cases, transfer data as well.

Pipes

Probably one of the most old school ways of transferring data is to just pipe it back and forth as needed. This is how terminals and shells interact with programs, by using the standard pipes of stdin, stdout, and stderr (standard in, standard out, standard error).

Every time you print you are writing to stdout, and it is very common to use this in Python when running another program from within it (i.e. if we did subprocess.run('bob.py') we could easily communicate with it via the pipes). These are anonymous pipes that exist only while the program is running. To communicate between different programs you should use a Named Pipe, which creates a file descriptor that you connect to as an entry point.
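For instance, here is a quick sketch of driving another program over its anonymous pipes with subprocess (bob.py here is a stand-in for any script that reads stdin and prints a reply):

import subprocess

# Send data to the child's stdin and capture its stdout / stderr pipes
result = subprocess.run(
    ["python", "bob.py"],
    input=b"HOWDY\n",
    capture_output=True,
)
print(result.stdout)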

Here are some short examples showing their use on Linux. It is also possible on Windows with the pywin32 module, or by doing it yourself with ctypes, but I find it easier to just use other methods on Windows.

Our example will be two programs, the first of which, Alice.py, is simply a text converter. The message sent will be in three parts: a starting identifier, X, four digits to denote the message length, and the message itself. (This is by no means a common standard or practical, just something I made up for a quick example.)

So to send ‘Howdy’, it would look like X0005HOWDY: X being the identifier of a new message, 0005 denoting the length of the message to come, and HOWDY being the message body.

Alice.py

import time 
import os 

# This would normally be a full path on the file system,
# but in this scenario both programs are run from the same directory
pipename = 'fifo'
os.mkfifo(pipename)

# For non-blocking pipes, you have to be reading from it 
# before you can have something else writing to it
pipein = os.open(pipename, os.O_NONBLOCK|os.O_RDONLY) 
p = os.fdopen(pipein, 'rb', buffering=0)

# This program will simply make the output of the other program more readable
def converter(message):
    return message.decode('utf-8').replace("_", " ").replace("-", " ").lower()

while True:
    # Wait until we have a message identifier
    new_data = p.read(1)
    if new_data == b'X':
        # Figure out the length of the message 
        raw_length = p.read(4)
        message = p.read(int(raw_length))
        # Read and convert the message 
        print(converter(message))
    elif new_data:
        # If we read a single byte that isn't an identifier, something went wrong
        raise Exception('Out of sync!') 
    else:
        time.sleep(1)

That’s all our conversion server is. It creates a pipe that something else can connect to, and will convert the incoming messages to lower case and replace dashes and underscores with spaces.

So let’s have Bob.py talk to Alice.py. But as Bob is a computer program, he sometimes spits out gobbledygook messages that need some help.

Bob.py

import os 

# Connect to the pipe created by Alice.py
pipename = 'fifo'
pipeout = os.open(pipename, os.O_NONBLOCK|os.O_WRONLY) 
p = os.fdopen(pipeout, 'wb', buffering=0, closefd=False)

def write_message(message):
    """Covert a string into our message format and send it"""
    length = '{0:04d}'.format(len(message))
    p.write(b'X')
    p.write(length.encode('utf-8'))
    p.write(message.encode('utf-8'))
     
write_message('TERriBLe_looking-machine-oUTput')

Start up Alice.py first, then Bob.py.

Alice will print out a pretty message of:  terrible looking machine output

While pipes are super handy for terminal usage and running programs inside each other, it has become uncommon to use named pipes as actual comms between two independent programs, mainly because of the required setup procedure and non-uniformity across operating systems. This has led to more modern, cross-compatible options being preferred.

Pros: 

  • Fast and Efficient
  • No external services

Cons:

  • Not cross-platform compatible code
  • Difficult to code well

Files

Another ye olde (yet perfectly valid) way to communicate between programs is to simply create files that each program can interpret. Sometimes it’s as simple as having a lockfile: if the lockfile exists, it can serve as a message to other programs to let the owner finish before they do something, or even to stop new instances of the same program from running. For example, running system updates in most Linux environments will create a lock file to make sure that two different update processes aren’t started at the same time.
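A minimal sketch of that lockfile idea, stopping a second copy of a program from starting (the file name here is illustrative):

import os
import sys

lock_file = "my_program.lock"  # illustrative name

try:
    # O_EXCL makes creation atomic: this fails if the file already exists
    fd = os.open(lock_file, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
except FileExistsError:
    sys.exit("Another instance is already running!")

try:
    ...  # do the real work here
finally:
    os.close(fd)
    os.unlink(lock_file)  # always clean up the lock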

It’s possible to take that idea further and share a file or two to actually transfer information. In these examples, the two programs will both work out of the same file, with a lock file for safety. (You could write your own file lock control code, but I will be using the py-filelock package for brevity.)

There are a lot of possible ways to format the shared file. This example will keep it very basic, giving each command a unique id (used to know if the command has been run before or not), then the command and its arguments. The same dictionary will also leave room for a result or an error message.

The shared JSON file will have the following format:

{
  "commands": { 
    "<random uuid>": {
      "command": "add",
      "args": [2,5],
      "result": 7  # "result" field only exists after server has responded
      # if "error" key exists instead of "result", something bad happened
    }
  }
}

The server, Alice.py, will then keep looping, waiting for the shared file to change. Once it does, Alice will obtain the lock for safety (so there isn’t any corrupt JSON from writing being interrupted), read the file, run the desired command, and save the result. In a real world scenario the lock would only be obtained during the individual reading and writing phases, to keep the lock held as briefly as possible by a single program. But that complicates the code (you would have to do a second read before writing and only update the sections you ran, in case other ones were added) and makes it a bit much for an off the shelf example, so the full script below keeps the simple version; a rough sketch of the shorter-lock pattern follows.
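Something like this sketch, which reuses lock and shared_file from the script below (process here is a hypothetical helper standing in for the command-running loop):

def run_commands_short_lock():
    # Hold the lock just long enough to read the current commands
    with lock, open(shared_file) as f:
        data = json.load(f)

    # process() is a hypothetical helper that runs any new commands
    # without holding the lock
    results = process(data)

    # Re-acquire the lock, re-read, and merge in only our own results
    # so any commands added in the meantime are preserved
    with lock, open(shared_file, "r+") as f:
        current = json.load(f)
        for name, result in results.items():
            current["commands"][name].update(result)
        f.seek(0)
        json.dump(current, f, indent=2)
        f.truncate()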

Alice.py

import json
import time
import os

from filelock import FileLock

# Keep track of commands that have been run 
completed = []
# Track file size to know when new commands have been added
last_size = 0

lock_file = "shared.json.lock"
lock = FileLock(lock_file)
shared_file = "shared.json"


# Not totally necessary, but if you ever need to raise exceptions
# it's better to create your own than use the built-ins
class AliceBroke(Exception):
    """Custom exception for catching our own errors"""


# Functions that can be executed by Bob.py
def adding(a, b):
    if not isinstance(a, (int, float)) or not isinstance(b, (int, float)):
        raise AliceBroke('Yeah, we are only going to add numbers')
    return a + b


def multiply(a, b):
    if not isinstance(a, (int, float)) or not isinstance(b, (int, float)):
        raise AliceBroke('Yeah, we are only going to multiply numbers')
    return a * b


# Right now just have a way to map the incoming strings from Bob to functions
# This could be expanded to include what arguments it expects for pre-validation
translate = {
    "add": adding,
    "multiply": multiply
}


def main():
    global last_size, completed
    while True:
        # Poor man's file watcher
        file_size = os.path.getsize(shared_file)
        if file_size == last_size:
            # File hasn't changed
            time.sleep(.1)
            continue
        else:
            print(f'File has changed from {last_size} to {file_size} bytes,'
                  f' lets see what they want to do!')
            last_size = file_size

        run_commands()
        last_size = os.path.getsize(shared_file)


def run_commands():
    # Grab the lock, once it is acquired open the file
    with lock, open(shared_file, 'r+') as f:
        data = json.load(f)
        # Iterate over the command keys, if we haven't run it yet, do so now
        for name in data['commands']:
            if name not in completed:

                command = data['commands'][name]['command']
                args = data['commands'][name]['args']
                print(f'running command {command} with args {args}')

                try:
                    data['commands'][name]['result'] = translate[command](*args)
                except AliceBroke as err:
                    # Arguments weren't the type we were expecting
                    data['commands'][name]['error'] = str(err)
                except TypeError:
                    data['commands'][name]['error'] = "Incorrect number of arguments"
                finally:
                    completed.append(name)
        # As we are writing the data back to the same file that is still
        # open, we need to go back to the beginning of it before writing
        f.seek(0)
        json.dump(data, f, indent=2)


if __name__ == '__main__':
    # Create / blank out the shared file
    with open(shared_file, 'w') as f:
        json.dump({"commands": {}}, f)
    last_size = os.path.getsize(shared_file)

    try:
        main()
    finally:
        # Be nice and clean up after ourselves
        os.unlink(shared_file)
        os.unlink(lock_file)

Our client, Bob.py, will ask for some simple commands that Alice supports and wait until it gets the answer back.

Bob.py

import json
import time
import uuid

from filelock import FileLock

completed = []
last_size = 0

lock = FileLock("shared.json.lock")
shared_file = "shared.json"


def ask_alice(command, *args, wait_for_answer=True):
    # Create a new random ID for the command
    # could be as simple as incremented numbers 
    cmd_uuid = str(uuid.uuid4())
    with lock, open(shared_file, "r+") as f:
        data = json.load(f)
        data['commands'][cmd_uuid] = {'command': command, 'args': args}
        f.seek(0)
        json.dump(data, f)
    if wait_for_answer:
        return get_answer(cmd_uuid)
    return cmd_uuid


def get_answer(cmd_uuid):
    # Wait until we get an answer back for a command
    # Ideally this would be more of an asynchronous callback, but there 
    # are plenty of cases where serialized processes like this must happen
    while True:
        with lock, open(shared_file) as f:
            data = json.load(f)
            command = data['commands'][cmd_uuid]
            if 'result' in command:
                return command['result']
            elif 'error' in command:
                raise Exception(command['error'])
        time.sleep(.2)



print(f"Lets add 2 and 5 {ask_alice('add', 2, 5, wait_for_answer=True)}")

print(f"Lets multiply 8 and 5 {ask_alice('multiply', 2, 5, wait_for_answer=True)}")

print("Lets break it and cause an exception!")
ask_alice('add', 'bad', 'data', wait_for_answer=True)

Start up Alice.py first, then Bob.py.

Alice will return:

File has changed from 16 to 90 bytes, lets see what they want to do!
running command add with args [2, 5]
File has changed from 103 to 184 bytes, lets see what they want to do!
running command multiply with args [2, 5]
File has changed from 198 to 283 bytes, lets see what they want to do!
running command add with args ['bad', 'data']

Bob

Lets add 2 and 5: 7
Lets multiply 8 and 5: 40
Lets break it and cause an exception!
Traceback (most recent call last):
     ...
Exception: Yeah, we are only going to add numbers

 

Pros:

  • Cross-platform compatible
  • Simple to implement and understand

Cons: 

  • Have to worry about File System security if anything sensitive is being shared
  • Programs now responsible to clean up after themselves

 

Message Queue

Welcome to the 21st Century, where message queues serve as quick and efficient ways to transfer commands and information. I like to think of them as always running middlemen that can survive outside your process.

Here you are spoiled for choice with options: ActiveMQ, ZeroMQ, Redis, RabbitMQ, Sparrow, Starling, Kestrel, Amazon SQS, Beanstalk, Kafka, IronMQ, and POSIX IPC message queues are the ones I know. You can even use NoSQL databases like MongoDB or CouchDB in a similar manner, though for simple IPC I suggest against using those.

I suggest looking into RabbitMQ’s tutorials to get a good in-depth look at how you can write code for it (and across multiple different languages). RabbitMQ is cross-platform and even provides Windows binaries directly, unlike some others.

For my own examples we will use Redis, as they have the best summary I can quote from their website https://redis.io/, yet there is a surprising lack of Python tutorials for it.

Redis is an open source (BSD licensed), in-memory data structure store, used as a database, cache and message broker. It supports data structures such as strings, hashes, lists, sets, sorted sets with range queries, bitmaps, [etc…]

That’s it: it stores data somewhere that is easily accessed from multiple programs, just what you need for IPC. You can use it as a keystore (for data transfer and storage) or, like I am about to show, as a publisher / subscriber model, better for command and control.
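The keystore side is about as simple as it gets: one program sets a key, and any other program can read it back (the key name here is arbitrary):

import redis

r = redis.StrictRedis()

# One program stores a value...
r.set("status", "processing")

# ...and any other program can read it back
print(r.get("status"))  # b'processing'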

Alice.py is our service: it simply subscribes to a channel, waits for any updates, and prints them to the screen.

import time

import redis

r = redis.StrictRedis()
p = r.pubsub()


def handler(message):
    print(message['data'].decode('utf-8'))


p.subscribe(my_channel=handler)
thread = p.run_in_thread(sleep_time=0.001)

try:
    # Runs in the background while your program otherwise operates
    time.sleep(1000)
finally:
    # Shut down cleanly when all done
    thread.stop()

Bob.py is another program that simply pushes a message to that channel.

import redis

r = redis.StrictRedis()

r.publish('my_channel', 'example data'.encode('utf-8'))

Start up Alice.py first, then Bob.py.

Alice.py will print example data, and can be stopped by pressing CTRL+C

Pros:

  • Built-in publisher / subscriber methodology, don’t have to do manual checks
  • Built-in background thread running
  • (Can be) Cross-platform compatible
  • State can be saved if a program exits unexpectedly (depending on setup)

Cons: 

  • Requirement for external service
  • More overhead
  • Have to worry about the external service’s security and how you connect to it

Shared Memory

If written correctly, this can be one of the fastest ways to transfer data or execute tasks between programs, but it is also the most low-level, meaning a lot more coding and error handling yourself.

Think of using memory in the same way as the single file with a lock. While one program writes to memory, the other has to wait until it finishes, then it can read it and write its own thing back. (It’s possible to use multiple sections of memory, it just turns into a lot of example code really fast, so I am holding off.)

So now you have to create the Semaphore (basically a memory lockfile) and mapped memory that each program can access.

On Linux (and Windows with Cygwin) you can use posix_ipc, and check out their example here.

A lot simpler is to just use the built-in mmap directly and share a file in memory. This example won’t even go as far as creating the lockfile, just showing off the basics.

Alice.py is going to create and write stuff to the memory mapped file

Alice.py

import time
import mmap
import os

# Create the file and fill it with line ends
fd = os.open('mmaptest', os.O_CREAT | os.O_TRUNC | os.O_RDWR)
os.write(fd, b'\n' * mmap.PAGESIZE)

# Map it to memory and write some data to it, then change it 10 seconds later
buf = mmap.mmap(fd, mmap.PAGESIZE, access=mmap.ACCESS_WRITE)
buf.write(b'now we are in memory\n')
time.sleep(10)
buf.seek(0)
buf.write(b'again\n')

Bob.py will simply read the content of the memory mapped file. Showing that it does know when the contents change.

Bob.py

import mmap
import os
import time


# Open the file for reading only
fd = os.open('mmaptest', os.O_RDONLY)
buf = mmap.mmap(fd, mmap.PAGESIZE, access=mmap.ACCESS_READ)

# Print when the content changes
last = b''
while True:
    buf.seek(0)
    msg = buf.readline()
    if msg != last:
        print(msg)
        last = msg
    else:
        time.sleep(1)

This example isn’t the most friendly to run, as you have to start up Alice.py and then Bob.py within ten seconds after that, but it shows the basics of how memory mapping is very similar to just using a file.

Pros:

  • Fast
  • Cross-platform compatible code

Cons: 

  • Can be difficult to write
  • Limited size to accessible memory
  • Using mmap without posix_ipc will also create a physical file with the same content

 

Signals

On Linux? Don’t want to send information, just need to toggle state on something? Send a signal!

Imagine you have a service, Alice, that anything local can connect to, but you want to be able to tell those clients if the service goes down or comes back up.

In your client’s code, it should register its process identification number with Alice when it first connects to her, and have a method to capture a custom signal to know Alice’s current state.

Bob.py

import signal, os

service_running = True

my_pid = os.getpid()

# 'Touch' a file named after the program's PID
# in the Alice service's special directory.
# Make sure to delete this file when your program exits!
open(f"/etc/my_service/pids/{my_pid}", "w").close()

def service_off(signum, frame):
    global service_running 
    service_running = False

def service_on(signum, frame):
    global service_running 
    service_running = True

signal.signal(signal.SIGUSR1, service_off)
signal.signal(signal.SIGUSR2, service_on)

SIGUSR1 and SIGUSR2 are signals reserved for custom use, so they are safe to use in this manner without fear of secondary actions happening. (For example, if you send something like SIGINT, aka interrupt, it will just kill your program if not caught properly.)

Alice.py will then simply go through the directory of PID files when it starts up or shuts down, and send each one the appropriate signal to let them know her current state.

import os
import signal


def let_them_know(startup=True):
    # SIGUSR2 maps to service_on in the clients, SIGUSR1 to service_off
    signal_to_send = signal.SIGUSR2 if startup else signal.SIGUSR1
    for pid_file in os.listdir("/etc/my_service/pids/"):
        # Put in a try catch block for production
        os.kill(int(pid_file), signal_to_send)

Pros:

  • Super simple

Cons: 

  • Not cross-platform compatible
  • Cannot transfer data

Sockets

The traditional IPC. Every time I look for different IPC methods, sockets always come up. It’s easy to understand why: they are cross platform and natively supported by most languages.

However, dealing directly with raw sockets is very low-level and requires a lot more coding and configuration. The Python standard library has a great example of an echo server to get you started.
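To give a flavor of how low-level it gets, here is a bare-bones echo server in the same spirit as that stdlib example (the address and port are arbitrary):

import socket

HOST, PORT = "127.0.0.1", 65432  # arbitrary local address

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.bind((HOST, PORT))
    s.listen()
    conn, addr = s.accept()  # blocks until a client connects
    with conn:
        print("Connected by", addr)
        while True:
            data = conn.recv(1024)
            if not data:
                break
            conn.sendall(data)  # send everything right back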

But this is batteries included, everyone-else-has-already-done-the-work-for-you Python. Sure you could set up everything yourself, or you can use the higher level Listeners and Clients from the multiprocessing library.

Alice.py is hosting a server party, and executing incoming requests.

from multiprocessing.connection import Listener

def function_to_execute(*args):
    """Our handler function that will run
    when an incoming request is a list of arguments
    """
    return args[0] * args[1]

with Listener(('localhost', 6000), authkey=b'Do not let eve find us!') as listener:
    # Looping here so that the clients / party goers can 
    # always come back for more than a single request
    while True:
        print('waiting for someone to ask for something')
        with listener.accept() as conn:
            args = conn.recv()
            
            if args == b'stop server':
                print('Goodnight')
                break
            elif isinstance(args, list):  
                # Very basic check, must be more secure in production
                print('Someone wants me to do something')
                result = function_to_execute(*args)
                conn.send(result)
            else:
                conn.send(b'I have no idea what you want me to do')

Bob.py is going to go for just a quick function and then call it a night.

from multiprocessing.connection import Client

with Client(('localhost', 6000), authkey=b'Do not let eve find us!') as conn:
    conn.send([8, 8])
    print(f"What is 8 * 8? Why Alice told me it's {conn.recv()}")

# We have to connect again because Alice can only handle one request at a time
with Client(('localhost', 6000), authkey=b'Do not let eve find us!') as conn:
    # Bob's a party pooper and going to end it for everyone
    conn.send(b'stop server')

Start up Alice.py first, then run Bob.py. Bob will quickly exit with the message What is 8 * 8? Why Alice told me it's 64

Alice will have four total messages:

waiting for someone to ask for something
Someone wants me to do something
waiting for someone to ask for something
Goodnight

Pros:

  • Cross-platform compatible
  • Can be extended to RPC

Cons: 

  • Slower
  • More overhead

 

RPC

Believe it or not, you can use a lot of the methods from remote procedure calls locally. Even sockets and message queues can already be set up to work for either IPC or RPC.

Now, barring using IR receivers and transmitters or laser communications, you are probably going to connect remote programs via the internet. That means most RPC standards are going to be networking based, aka on sockets. So it’s less about deciding which protocol to use, and more about choosing which data transmission standard to use. Two that I have used are JSONRPC, for normal humans, and XMLRPC, for XML fans who like sniffing glue and running with scissors 1. There is also SOAP, for XML fans who need their acronyms to also spell something, and Apache Thrift, which I found while doing research for this article but have not touched. Those standards transfer data as text, which makes it easier to read, but inefficient. Newer options, like gRPC, use protocol buffers to serialize the data and reduce overhead.
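XMLRPC at least has the advantage of shipping in the standard library. A minimal sketch of a server and client pair (the function name and port are arbitrary):

# server.py
from xmlrpc.server import SimpleXMLRPCServer

def multiply(a, b):
    return a * b

with SimpleXMLRPCServer(("localhost", 8000)) as server:
    server.register_function(multiply)
    server.serve_forever()

# client.py
from xmlrpc.client import ServerProxy

print(ServerProxy("http://localhost:8000").multiply(8, 8))  # 64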

It’s also very common and easy to just write up a simple HTTP REST interface for your programs, using a lightweight framework like bottle or flask, and communicate that way.
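For example, a tiny flask sketch exposing one task over HTTP (the route and port are arbitrary):

from flask import Flask, jsonify

app = Flask(__name__)

@app.route("/add/<int:a>/<int:b>")
def add(a, b):
    # Any local or remote program can now GET /add/2/5 and receive JSON back
    return jsonify(result=a + b)

app.run(port=5000)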

In the future (if they don’t already exist) I expect to see even more choices with WebSocket or WebRTC based communications.

 

Summary

Let’s wrap this all up with a comparison table:

Method          | Cross Platform Compatible | Requires File or File Descriptor | Requires External Service | Easy to write / read code 5
Pipes           | No                        | Yes                              | No                        | No
Files           | Yes                       | Yes                              | No                        | Yes
Message Queues  | Yes 4                     | No                               | Yes                       | Yes
Shared Memory   | Yes 3                     | Yes 2                            | No                        | No
Sockets         | Yes                       | No                               | No                        | Yes
Signals         | No                        | No                               | No                        | Yes

Hopefully that at least clears up why sockets are the go-to IPC method, as they have the most favorable traits. For me though, I usually want something either a little more robust, like message queues, or REST APIs, so that the same code can be used locally or remotely. Which, to be fair, are built on top of sockets.