Tutorial

Posts designed as a walk through tutorial of how to achieve desired behavior.

Cartoon or Photo? – Image detection with Python

This all started with me just wanting a fast way to sort through image folders and remove cartoon images. That lead me down a spiraling rabbit hole of possibility. From using OpenCV to do different types of detection, or even training a Machine Learning model from scratch with Keras. This article will go through the multiple options available and learn how accurate they end up being.

What makes an image a cartoon?

Model is Jessica Marie Frye

If we are going to detect the differences between photos and cartoons, first we need to figure out how they are different. Importantly, how they are different in a quantifiable way. These are the measurable differences I could think up:

  • Cartoons have smoother gradients
  • Real images use a larger color palette
  • Cartoons usually have drawn edge outlines

Below we will try each of these options and see how well they fare.

Results First

You’re probably more interested in what will work the best for you and less about how I spent days toiling away at this, so to cut to the chase, here is how everything performed.

The best OpenCV contender turned out to be counting colors, with a combined 75% accuracy.

Overall, Machine Learning Image Classification using the Xception model wiped the floor with a combined 96% accuracy. This probably isn’t even as good as it could get with further fine tuning, but was more than good enough for my own needs.

These results were also with a hard threshold set to force it into either bucket of real or cartoon. I personally modify them for my own use to use a smaller range for absolutes, and then put the “unsure” ones in another folder. Further reducing any errant picks.

Machine Learning with Keras is the obvious pick if you have a good set of data to train with and a computer beefy enough to process it. However it’s not as portable and much longer than simply trying out one of the OpenCV methods.

OpenCV Gradient Differences

The first method we will use is pretty straight forward. We will blur the image a little, and compare it to it’s unaltered form and quantify the difference. First things first is that we will need opencv installed for python. Go into your venv for this and run:

pip install opencv-python

Now open up your IDE and create a new .py file to get started. First thing we are going to do is read the image into opencv which is the cv2 module. We will use a JPEG image as it will be the expected BRG color format.

import cv2

img = cv2.imread("/path/to/my/image.jpg")

Next we will blur the image a little using a bilateral filter, to even out the colors. We will also resize the image to a standard size so the blur across every image is the same.

img = cv2.resize(img, (1024, 1024))
color_blurred = cv2.bilateralFilter(img, 6, 250, 250)

You can check out the result to see how strong the effect is by previewing the image. Press any key to close the window.

# Optional Preview
cv2.imshow("blurred", color_blurred)
cv2.waitKey(0)
cv2.destroyAllWindows()

Then we need to compare this new color_blurred image to the original image. I accomplished this by comparing the histograms. We will have do that for each color individually.

diffs = []
for k, color in enumerate(('b', 'r', 'g')):
    print(f"Comparing histogram for color {color}")
    real_histogram = cv2.calcHist(img, [k], None, [256], [0, 256])
    color_histogram = cv2.calcHist(color_blurred, [k], None, [256], [0, 256])
    diffs.append(cv2.compareHist(real_histogram, color_histogram, cv2.HISTCMP_CORREL))

result = sum(diffs) / 3

compareHist will give us a result between 0 and 1 (one being the most similar.) We will need to set a threshold for how similar we cartoons will be. I have mine set at 0.98 (aka 98% similar.)

if result > 0.98:
    print("It's a cartoon!")
else: 
    print("It's a photo!")

And that’s it! Now you can test it out and see how it works for your images. I found this iteration to work very fast and have about a ~70% proper detection rate.

Let’s put it all together into a usable script!

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from pathlib import Path
from typing import Union
import argparse

import cv2


def is_cartoon(
    image: Union[str, Path],
    threshold: float = 0.98,
    preview: bool = False,
) -> bool:
    # read and resize image
    img = cv2.imread(str(image))
    img = cv2.resize(img, (1024, 1024))

    # blur the image to "even out" the colors
    color_blurred = cv2.bilateralFilter(img, 6, 250, 250)

    if preview:
        cv2.imshow("blurred", color_blurred)
        cv2.waitKey(0)
        cv2.destroyAllWindows()

    # compare the colors from the original image to blurred one.
    diffs = []
    for k, color in enumerate(("b", "r", "g")):
        # print(f"Comparing histogram for color {color}")
        real_histogram = cv2.calcHist(img, [k], None, [256], [0, 256])
        color_histogram = cv2.calcHist(color_blurred, [k], None, [256], [0, 256])
        diffs.append(
            cv2.compareHist(real_histogram, color_histogram, cv2.HISTCMP_CORREL)
        )

    return sum(diffs) / 3 > threshold


def command_line_options():
    args = argparse.ArgumentParser(
        "blur_compare",
        description="Determine if a image is likely a cartoon or photo.",
    )
    args.add_argument(
        "-p",
        "--preview",
        action="store_true",
        help="Show the blurred image",
    )
    args.add_argument(
        "-t",
        "--threshold",
        type=float,
        help="Cutoff threshold",
        default=0.98,
    )
    args.add_argument(
        "image",
        type=Path,
        help="Path to image file",
    )
    return vars(args.parse_args())


if __name__ == "__main__":
    options = command_line_options()
    if not options["image"].exists():
        raise FileNotFoundError(f"No image exists at {options['image'].absolute()}")
    if is_cartoon(**options):
        print(f"{options['image'].name} is a cartoon!")
    else:
        print(f"{options['image'].name} is a photo!")

OpenCV Color Counting

Using a subset of 512 colors in a 1024×1024 image, determine how much of the image can be reproduced

Next possible way to approach the problem was just figuring out how many colors were used for the majority of the image. We will use all the same code to load and resize the image from above, just this time we will add a loop to count all the colors (slow way, faster option below):

    # Find count of each color
    a = {}
    for item in img.flatten():
        value = tuple(item)
        if value not in a:
            a[value] = 1
        else:
            a[value] += 1

Next we will sort the dictionary by the most used color, and add the count of the top 512 images together.

The actual calculation is a lot of functionality in a small block of code. First let’s get a visual preview of what is happening by re-creating the image with only the selected colors with:

mask = numpy.zeros(img.shape[:2], dtype=bool)

for color, _ in sorted(a.items(), key=lambda pair: pair[1], reverse=True)[:512]:
    mask |= (img == color).all(-1)

img[~mask] = (255, 255, 255)

cv2.imshow("img", img)
cv2.waitKey(0)
cv2.destroyAllWindows()

Here’s the code that basically calculates what you are seeing. We divide the sum of all the top colors by the size of the image to get what percent could be recreated with just those colors.

    # Identify the percent of the image that uses the top 512 colors
    most_common_colors = sum([x[1] for x in sorted(a.items(), key=lambda pair: pair[1], reverse=True)[:512]])
    return (most_common_colors / (1024 * 1024)) > 0.3 # new threshold

This script is pretty similar to the last one. And has a slightly higher success rate on my data set at 76%! It is 20% better at detecting what is a cartoon, but 10% worse at photos. It is also much much slower, some 20~50 times slower. (Will take a second per image instead of 0.05 of a second). However, we can speed that up by instead using some numpy trickery.

    # Replace everything after "Find count of each color" with this faster version, but it doesn't work with the preview.
    flattened = numpy.reshape(img, ((1024 * 1024), 3))
    multiplied = numpy.multiply(flattened, [100_000, 100, 1])
    sums = multiplied.sum(axis=1)
    unique, counts = numpy.unique(sums, return_counts=True)

    # Identify the percent of the image that uses the top 512 colors
    most_common_colors = sum(sorted(counts, reverse=True)[:512])
    return (most_common_colors / (1024 * 1024)) > threshold

Here is the entire script with the slower version that works with preview image. However after getting a good threshold I would suggest replacing the code with the faster version above.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from pathlib import Path
from typing import Union
import argparse

import cv2
import numpy


def is_cartoon(
    image: Union[str, Path],
    threshold: float = 0.3,
    preview: bool = False,
) -> bool:
    # read and resize image
    img = cv2.imread(str(image))
    img = cv2.resize(img, (1024, 1024))

    # Find count of each color
    a = {}
    for row in img:
        for item in row:
            value = tuple(item)
            if value not in a:
                a[value] = 1
            else:
                a[value] += 1

    if preview:
        mask = numpy.zeros(img.shape[:2], dtype=bool)

        for color, _ in sorted(a.items(), key=lambda pair: pair[1], reverse=True)[:512]:
            mask |= (img == color).all(-1)

        img[~mask] = (255, 255, 255)

        cv2.imshow("img", img)
        cv2.waitKey(0)
        cv2.destroyAllWindows()

    # Identify the percent of the image that uses the top 512 colors
    most_common_colors = sum(
        [x[1] for x in sorted(a.items(), key=lambda pair: pair[1], reverse=True)[:512]]
    )
    return (most_common_colors / (1024 * 1024)) > threshold


def command_line_options():
    args = argparse.ArgumentParser(
        "blur_compare",
        description="Determine if a image is likely a cartoon or photo.",
    )
    args.add_argument(
        "-p",
        "--preview",
        action="store_true",
        help="Show the blurred image",
    )
    args.add_argument(
        "-t",
        "--threshold",
        type=float,
        help="Cutoff threshold",
        default=0.3,
    )
    args.add_argument(
        "image",
        type=Path,
        help="Path to image file",
    )
    return vars(args.parse_args())


if __name__ == "__main__":
    options = command_line_options()
    if not options["image"].exists():
        raise FileNotFoundError(f"No image exists at {options['image'].absolute()}")
    if is_cartoon(**options):
        print(f"{options['image'].name} is a cartoon!")
    else:
        print(f"{options['image'].name} is a photo!")

OpenCV Edge Detection

This I haven’t had much luck with, only about 55% successful at determine what type of image it is. A coin toss is about as accurate. I don’t recommend using it as is, but if you have any ideas or improvements, let me hear them!

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from pathlib import Path
from typing import Union
import argparse

import cv2
import numpy


def is_cartoon(
    image: Union[str, Path],
    threshold: float = 4500,
    preview: bool = False,
) -> bool:
    # read and resize image
    img = cv2.imread(str(image))
    img = cv2.resize(img, (1024, 1024))

    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    blurred_gray = cv2.medianBlur(gray, 3)

    edges = cv2.adaptiveThreshold(
        gray, 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY, 5, 10
    )
    blurred_edges = cv2.adaptiveThreshold(
        blurred_gray, 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY, 5, 10
    )

    if preview:
        cv2.imshow("edges", edges)
        cv2.imshow("blurred edges", blurred_edges)
        cv2.waitKey(0)
        cv2.destroyAllWindows()

    count_1 = numpy.count_nonzero(edges)
    count_2 = numpy.count_nonzero(blurred_edges)

    return abs(count_2 - count_1) < threshold


def command_line_options():
    args = argparse.ArgumentParser(
        "blur_compare",
        description="Determine if a image is likely a cartoon or photo.",
    )
    args.add_argument(
        "-p",
        "--preview",
        action="store_true",
        help="Show the blurred image",
    )
    args.add_argument(
        "-t",
        "--threshold",
        type=float,
        help="Cutoff threshold",
        default=4500,
    )
    args.add_argument(
        "image",
        type=Path,
        help="Path to image file",
    )
    return vars(args.parse_args())


if __name__ == "__main__":
    options = command_line_options()
    if not options["image"].exists():
        raise FileNotFoundError(f"No image exists at {options['image'].absolute()}")
    if is_cartoon(**options):
        print(f"{options['image'].name} is a cartoon!")
    else:
        print(f"{options['image'].name} is a photo!")

Keras Machine Learning Model

Time to take the kid gloves off, let’s use some ML image detection.

First we have to train a minified Xception model with a lot of hand picked good data. I used 556 cartoon images and 2295 real images in the training datasets. Then used that model for detection between another 2000+ unsorted images.

To get set by step details, please check out the same tutorial I used for this.

import shutil

import numpy as np
import os
import time
from pathlib import Path

import tensorflow as tf

root_dir = Path("/training/")
image_size = (180, 180)
batch_size = 32
epochs = 20
model_name = "my_model"


def make_model(input_shape, num_classes):
    inputs = tf.keras.Input(shape=input_shape)
    # Image augmentation block
    data_augmentation = tf.keras.Sequential(
        [
            tf.keras.layers.RandomFlip("horizontal"),
            tf.keras.layers.RandomRotation(0.1),
        ]
    )
    x = data_augmentation(inputs)

    # Entry block
    x = tf.keras.layers.Rescaling(1.0 / 255)(x)
    x = tf.keras.layers.Conv2D(32, 3, strides=2, padding="same")(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Activation("relu")(x)

    x = tf.keras.layers.Conv2D(64, 3, padding="same")(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Activation("relu")(x)

    previous_block_activation = x  # Set aside residual

    for size in [128, 256, 512, 728]:
        x = tf.keras.layers.Activation("relu")(x)
        x = tf.keras.layers.SeparableConv2D(size, 3, padding="same")(x)
        x = tf.keras.layers.BatchNormalization()(x)

        x = tf.keras.layers.Activation("relu")(x)
        x = tf.keras.layers.SeparableConv2D(size, 3, padding="same")(x)
        x = tf.keras.layers.BatchNormalization()(x)

        x = tf.keras.layers.MaxPooling2D(3, strides=2, padding="same")(x)

        # Project residual
        residual = tf.keras.layers.Conv2D(size, 1, strides=2, padding="same")(
            previous_block_activation
        )
        x = tf.keras.layers.add([x, residual])  # Add back residual
        previous_block_activation = x  # Set aside next residual

    x = tf.keras.layers.SeparableConv2D(1024, 3, padding="same")(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Activation("relu")(x)

    x = tf.keras.layers.GlobalAveragePooling2D()(x)
    if num_classes == 2:
        activation = "sigmoid"
        units = 1
    else:
        activation = "softmax"
        units = num_classes

    x = tf.keras.layers.Dropout(0.5)(x)
    outputs = tf.keras.layers.Dense(units, activation=activation)(x)
    return tf.keras.Model(inputs, outputs)


def train_data():
    train_ds = tf.keras.preprocessing.image_dataset_from_directory(
        str(root_dir),
        validation_split=0.2,
        subset="training",
        seed=1337,
        image_size=image_size,
        batch_size=batch_size,
    )
    val_ds = tf.keras.preprocessing.image_dataset_from_directory(
        str(root_dir),
        validation_split=0.2,
        subset="validation",
        seed=1337,
        image_size=image_size,
        batch_size=batch_size,
    )

    train_ds = train_ds.prefetch(buffer_size=32)
    val_ds = val_ds.prefetch(buffer_size=32)

    model = make_model(input_shape=image_size + (3,), num_classes=2)
    tf.keras.utils.plot_model(model, show_shapes=True)

    callbacks = [
        tf.keras.callbacks.ModelCheckpoint(f"{model_name}_{{epoch}}.h5"),
    ]
    model.compile(
        optimizer=tf.keras.optimizers.Adam(1e-3),
        loss="binary_crossentropy",
        metrics=["accuracy"],
    )
    model.fit(
        train_ds, epochs=epochs, callbacks=callbacks, validation_data=val_ds,
    )


def clean_images():
    move_to = root_dir.parent / "bad_data"  # needs to be not in the training directory
    move_to.mkdir(exist_ok=True)
    moved = 0
    for directory in root_dir.glob("*"):
        if directory.name == move_to.name:
            continue
        if directory.is_dir():
            for i, file in enumerate(directory.glob("*")):
                if not file.name.lower().endswith(("jpg", "jpeg")) or not tf.compat.as_bytes("JFIF") in file.open("rb").read(10):
                    shutil.move(file, move_to / file.name)
                    moved += 1
        print("moved unclean data", moved, "from", directory)


def move_images():
    model = tf.keras.models.load_model(f"{model_name}_{epochs}.h5")
    cartoon_dir = Path("/cartoon/")
    real_dir = Path("/real/")

    real, cartoon, unknown = 0, 0, 0

    for file in Path("/unsorted/").glob("*.[jpg][jpeg][png]"):

        img = tf.keras.preprocessing.image.load_img(
            str(file), target_size=image_size
        )
        img_array = tf.keras.preprocessing.image.img_to_array(img)
        img_array = tf.expand_dims(img_array, 0)  # Create batch axis

        predictions = model.predict(img_array)
        score = predictions[0][0]
        if score > 0.98:
            real += 1
            shutil.move(file, real_dir / file.name)
        elif score < 0.02:
            cartoon += 1
            shutil.move(file, cartoon_dir / file.name)
        else:
            unknown += 1
            print(f"Could not figure out {file} as it was {score * 100}")
    print(f"Moved {real} to real and {cartoon} to cartoon, {unknown} were unmoved")


if __name__ == '__main__':
    clean_images()
    train_data()
    move_images()

I personally think 95% overall accuracy is amazing for no additional tuning. As well as not using this model for not exactly it’s intended use case. Generally we think about classifying items in the image (cat vs dog) not type of image (anime cat vs real cat).

pre-commit – Check yourself before you wreck yourself

pre-commit checks are like prenups. They run before you commit, get the dirty stuff out of the way, and may even save your relationship(s) in the long run. The difference is that pre-commit checks are written by programmers and not lawyers, so they are a lot easier to read and implement.

There are several types of pre-commit checks that can be run. Some common features you could chose are:

  • git sanity checks (no huge files, no bad merge lines, etc…)
  • code style enforcement
  • whitespace fixes
  • python specific checks

Why pre-commit?

Just as you should do a quick check in the mirror before heading out, pre-commits are a quick reflection of your code before making it public. They quickly check everything you’re about to add to git to make sure it’s copasetic. Many even automatically fix common issues for you.

Basically pre-commits make sure your code’s tee-shirt isn’t on inside out.

How to pre-commit

There are three basic steps to get pre-commit working with your code repo.

  1. Install the pre-commit tool onto your system
  2. Add a pre-commit configuration file to your repo
  3. Install the git hook for the repo

1. Install pre-commit

There are several different official ways to install pre-commit. I personally do not like packages polluting my global python site-packages. Instead, I install it with user level only privileges. Then I make sure its install path is added to the system path.

pip install pre-commit --user

You will receive a warning that the script installation location is not on the system path. (Unless you have done this before, and then you can skip the next part.)

If you really don’t care and are very good about using isolated virtual environment, you can just install it globally with pip install pre-commit without the --user flag and skip the next part.

Windows

Installing collected packages: pre-commit
  WARNING: The scripts pre-commit-validate-config.exe, pre-commit-validate-manifest.exe and pre-commit.exe are installed in 'C:\Users\Chris\AppData\Roaming\Python\Python39\Scripts' which is not on PATH.

The above warning is what I received while installing this on windows. So I coped that path C:\Users\Chris\AppData\Roaming\Python\Python39\Scripts to my clipboard and then went through the following process to add it to my system path.

First, search for “edit system” on the window search bar and click the highlighted link.

Second, click on the “Environment Variables…” button at the bottom.

Third, in the bottom section under “System variables” click on “Path” and hit “Edit…”

Finally on the new window hit “New” in the top right and add the path previously copied to the clipboard. Then Hit Ok on all the windows to close them.

You will have to restart any cmd sessions you have opened. When you do, you should be able to run pre-commit just fine.

Linux

It’s a bit simpler to add the custom install location to the system path on linux.

INSTALL_PATH=/home/james/.local/bin  # Change to the path printed in your warning
echo "export PATH=\"${INSTALL_PATH}:\$PATH\"" >> ~/.bashrc 
source ~/.bashrc

2. The config file

In your repo, you will need to create a file named .pre-commit-config.yaml and chose which checks to add for your code. This is a config file that I use (with opinionated formatters and mypy removed). You can find the full list of built-in supported checks at the pre-commit github repo.

# .pre-commit-config.yaml
repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.0.1
    hooks:

    # Identify invalid files
    - id: check-ast                        
    - id: check-yaml                       
    - id: check-json                       
    - id: check-toml                       

    # git checks
    - id: check-merge-conflict             
    - id: check-added-large-files          
      exclude: tests/media/.+
    - id: detect-private-key               
    - id: check-case-conflict              

    # Python checks
    - id: check-docstring-first            
    - id: debug-statements                 
    - id: requirements-txt-fixer           
    - id: fix-encoding-pragma              
    - id: fix-byte-order-marker            

    # General quality checks
    - id: mixed-line-ending                
    - id: trailing-whitespace              
      args: [--markdown-linebreak-ext=md]  
    - id: check-executables-have-shebangs  
    - id: end-of-file-fixer                

Personally I also always use black code formatter. It is not part of the standard checks, but is still easy to add.

repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.0.1
    hooks:
       # ... 
  # Add at same level as the first pre-commit-hooks repo
  - repo: https://github.com/psf/black
    rev: 21.6b0 
    hooks:
      - id: black

3. Adding the pre-commit hook to git

Go into the repo with the config file at it’s root, and simply type:

pre-commit install

The first time you add it you will also want to run it on all the files in the repo to shore them all up (usually it only runs on the changed files).

pre-commit run --all-files

It will take a few minutes to install the virtual env for running these checks, but will be much faster after the first time. It should look something like:

pre-commit run --all-files
[INFO] Initializing environment for https://github.com/pre-commit/pre-commit-hooks.
[INFO] Initializing environment for https://github.com/psf/black.
[INFO] Installing environment for https://github.com/pre-commit/pre-commit-hooks.
[INFO] Once installed this environment will be reused.
[INFO] This may take a few minutes...
[INFO] Installing environment for https://github.com/psf/black.
[INFO] Once installed this environment will be reused.
[INFO] This may take a few minutes...
Check python ast.........................................................Passed
Check Yaml...............................................................Passed
Check JSON...........................................(no files to check)Skipped
Check Toml...............................................................Passed
Check for merge conflicts................................................Passed
Check for added large files..............................................Passed
Detect Private Key.......................................................Passed
Check for case conflicts.................................................Passed
Check docstring is first.................................................Passed
Debug Statements (Python)................................................Passed
Fix requirements.txt.....................................................Passed
Fix python encoding pragma...............................................Passed
fix UTF-8 byte order marker..............................................Passed
Mixed line ending........................................................Passed
Trim Trailing Whitespace.................................................Failed
- hook id: trailing-whitespace
- exit code: 1
- files were modified by this hook

Fixing README.md
Fixing docs/README.md
Fixing docs/build-licenses.txt

Check that executables have shebangs.....................................Passed
Fix End of Files.........................................................Failed
- hook id: end-of-file-fixer
- exit code: 1
- files were modified by this hook

Fixing docs/build-licenses.txt
Fixing .pre-commit-config.yaml

black....................................................................Passed

4. Sit back and relax

Congrats, you are now totally almost protected from the most common silly mistakes you used to make. You now have the opportunity to make completely new mistakes you wouldn’t have thought of before!

Upload large files fast with Dropzone.js

I have previously covered how to upload large files with dropzone.js, but it didn’t allow for parallel chunk uploads. In this article we will go over that new addition, as well as several other improvements.

Download the final executable or view github to see all the code now if you don’t want to read the article.

This is the final result for now, and can be customized if you so chose. Obviously I’m no graphic design, but I’d pick function over style anyway.

Design

Before we dive into code, lets think about the design. We will need to somehow handle multiple parts of a single file being uploaded at the same time in a random order. How do we keep track of that?

Thankfully, Dropzone will provide the server with a few different pieces of information which each chunk, they include:

  • dzuuid – unique ID per upload file
  • dzchunkindex – the chunk number of the current upload
  • dztotalfilesize – Total size of the upload
  • dzchunksize – Max size per chunk
  • dztotalchunkcount – The number of chunks in this file
  • dzchunkbyteoffset – The place in the file this chunk starts

In my mind there are two clear ways to approach the problem. First option is to create a sparse file of the full size to start with, using dztotalchunkcount and then with every incoming chunk, set the position of the file using dzchunkbyteoffset and write the data starting there.

The advantage of this method is that it only requires a single file on disk. The disadvantage is you have to worry about multiple threads accessing the same file at the same time.

The second choice is to write each chunk to a separate file, then when they are all uploaded concatenate them all to a single file and remove the individual chunks. The disadvantage are that that you require twice the space for a short time, and have to deal with cleanup of temporary files.

I personally preferred the second option, as it seemed a bit safer.

Upload Function

As a quick warning, I am now using Bottle instead of Flask for this upload, so a bit of the form syntax has changed since the last post.

from pathlib import Path
from threading import Lock
from collections import defaultdict
import shutil
import uuid

from bottle import route, run, request, error, response, HTTPError, static_file
from werkzeug.utils import secure_filename

lock = Lock()
chucks = defaultdict(list)

chunk_path = Path(__file__).parent / "chunks"
storage_path = Path(__file__).parent / "storage"
chunk_path.mkdir(exist_ok=True, parents=True)
storage_path.mkdir(exist_ok=True, parents=True)

@route("/upload", method="POST")
def upload():
    file = request.files.get("file")
    if not file:
        raise HTTPError(status=400, body="No file provided")

    dz_uuid = request.forms.get("dzuuid")
    if not dz_uuid:
        # Assume this file has not been chunked
        with open(storage_path / f"{uuid.uuid4()}_{secure_filename(file.filename)}", "wb") as f:
            file.save(f)
        return "File Saved"

    # Chunked download
    try:
        current_chunk = int(request.forms["dzchunkindex"])
        total_chunks = int(request.forms["dztotalchunkcount"])
    except KeyError as err:
        raise HTTPError(status=400, body=f"Not all required fields supplied, missing {err}")
    except ValueError:
        raise HTTPError(status=400, body=f"Values provided were not in expected format")
    
    # Create a new directory for this file in the chunks dir, using the UUID as the folder name
    save_dir = chunk_path / dz_uuid
    if not save_dir.exists():
        save_dir.mkdir(exist_ok=True, parents=True)

    # Save the individual chunk
    with open(save_dir / str(request.forms["dzchunkindex"]), "wb") as f:
        file.save(f)

    # See if we have all the chunks downloaded
    with lock:
        chucks[dz_uuid].append(current_chunk)
        completed = len(chucks[dz_uuid]) == total_chunks

    # Concat all the files into the final file when all are downloaded
    if completed:
        with open(storage_path / f"{dz_uuid}_{secure_filename(file.filename)}", "wb") as f:
            for file_number in range(total_chunks):
                f.write((save_dir / str(file_number)).read_bytes())
        print(f"{file.filename} has been uploaded")
        shutil.rmtree(save_dir)

    return "Chunk upload successful"

if __name__ == "__main__":
    run(server="paste")

Hopefully the code is decently self documented. We do a few checks at the start as we pull in the required parameters. Then we prepare the directory for where the temporary chunks will be stored, and write the incoming chunk there. We gather information on all the chunks and when then have been completed in a global dictionary, and when they are all uploaded they are assembled into the final file.

File Downloading

Now that we can put files on the server, what about getting them back? I personally don’t want people to host random files on my server, but others may. To accomplish that, we shouldn’t just list all the files to everyone that visits the site, but only to whoever uploaded it. Thankfully we can just store the uuid in a cookie on the frontend, and then have a very basic download function.

@route("/download/<dz_uuid>")
def download(dz_uuid):
    for file in storage_path.iterdir():
        if file.is_file() and file.name.startswith(dz_uuid):
            return static_file(file.name, root=file.parent.absolute(), download=True)
    return HTTPError(status=404)

This does complicate our frontend a bit, as we want to save both UUID and filename as text fields in a cookie. There are a lot of great libraries out there to make life easier with JavaScript and cookies, but I wanted to keep it simple and pure JS other than Dropzone, making the code a bit more complicated than last time.

Dropzone frontend

Instead of being a standalone file, I have also put this directly into the python file to make using it as a f-string a lot easier, but makes it a little harder to read.

<!doctype html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <link rel="stylesheet" href="{dropzone_cdn.rstrip('/')}/{dropzone_version}/min/dropzone.min.css"/>
    <link rel="stylesheet" href="{dropzone_cdn.rstrip('/')}/{dropzone_version}/min/basic.min.css"/>
    <script type="application/javascript"
        src="{dropzone_cdn.rstrip('/')}/{dropzone_version}/min/dropzone.min.js">
    </script>
    <title>pyfiledrop</title>
</head>
<body>

    <div id="content" style="width: 800px; margin: 0 auto;">
        <h2>Upload new files</h2>
        <form method="POST" action='/upload' class="dropzone dz-clickable" id="dropper" enctype="multipart/form-data">
        </form>

        <h2>
            Uploaded
            <input type="button" value="Clear" onclick="clearCookies()" />
        </h2>
        <div id="uploaded">

        </div>

        <script type="application/javascript">
            function clearCookies() {{
                document.cookie = "files=; Max-Age=0";
                document.getElementById("uploaded").innerHTML = "";
            }}

            function getFilesFromCookie() {{
                try {{ return document.cookie.split("=", 2)[1].split("||");}} catch (error) {{ return []; }}
            }}

            function saveCookie(new_file) {{
                    let all_files = getFilesFromCookie();
                    all_files.push(new_file);
                    document.cookie = `files=${{all_files.join("||")}}`;
            }}

            function generateLink(combo){{
                const uuid = combo.split('|^^|')[0];
                const name = combo.split('|^^|')[1];
                if ({'true' if allow_downloads else 'false'}) {{
                    return `<a href="/download/${{uuid}}" download="${{name}}">${{name}}</a>`;
                }}
                return name;
            }}


            function init() {{

                Dropzone.options.dropper = {{
                    paramName: 'file',
                    chunking: true,
                    forceChunking: {dropzone_force_chunking},
                    url: '/upload',
                    retryChunks: true,
                    parallelChunkUploads: {dropzone_parallel_chunks},
                    timeout: {dropzone_timeout}, 
                    maxFilesize: {dropzone_max_file_size}, 
                    chunkSize: {dropzone_chunk_size}, 
                    init: function () {{
                        this.on("complete", function (file) {{
                            let combo = `${{file.upload.uuid}}|^^|${{file.upload.filename}}`;
                            saveCookie(combo);
                            document.getElementById("uploaded").innerHTML += generateLink(combo)  + "<br />";
                        }});
                    }}
                }}

                if (typeof document.cookie !== 'undefined' ) {{
                    let content = "";
                     getFilesFromCookie().forEach(function (combo) {{
                        content += generateLink(combo) + "<br />";
                    }});

                    document.getElementById("uploaded").innerHTML = content;
                }}
            }}

            init();

        </script>
    </div>
</body>
</html>

Notice we are using a slew of python variables that we are going to allow to be configurable upon launch.

Command line options

import argparse
...

allow_downloads = False
dropzone_cdn = "https://cdnjs.cloudflare.com/ajax/libs/dropzone"
dropzone_version = "5.7.6"
dropzone_timeout = "120000"
dropzone_max_file_size = "100000"
dropzone_chunk_size = "1000000"
dropzone_parallel_chunks = "true"
dropzone_force_chunking = "true"

...


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--port", type=int, default=16273, required=False)
    parser.add_argument("--host", type=str, default="0.0.0.0", required=False)
    parser.add_argument("-s", "--storage", type=str, default=str(storage_path), required=False)
    parser.add_argument("-c", "--chunks", type=str, default=str(chunk_path), required=False)
    parser.add_argument(
        "--max-size",
        type=str,
        default=dropzone_max_file_size,
        help="Max file size (Mb)",
    )
    parser.add_argument(
        "--timeout",
        type=str,
        default=dropzone_timeout,
        help="Timeout (ms) for each chuck upload",
    )
    parser.add_argument("--chunk-size", type=str, default=dropzone_chunk_size, help="Chunk size (bytes)")
    parser.add_argument("--disable-parallel-chunks", required=False, default=False, action="store_true")
    parser.add_argument("--disable-force-chunking", required=False, default=False, action="store_true")
    parser.add_argument("-a", "--allow-downloads", required=False, default=False, action="store_true")
    parser.add_argument("--dz-cdn", type=str, default=None, required=False)
    parser.add_argument("--dz-version", type=str, default=None, required=False)
    return parser.parse_args()


if __name__ == "__main__":

    args = parse_args()
    storage_path = Path(args.storage)
    chunk_path = Path(args.chunks)
    dropzone_chunk_size = args.chunk_size
    dropzone_timeout = args.timeout
    dropzone_max_file_size = args.max_size
    try:
        if int(dropzone_timeout) < 1 or int(dropzone_chunk_size) < 1 or int(dropzone_max_file_size) < 1:
            raise Exception("Invalid dropzone option, make sure max-size, timeout, and chunk-size are all positive")
    except ValueError:
        raise Exception("Invalid dropzone option, make sure max-size, timeout, and chunk-size are all integers")

    if args.dz_cdn:
        dropzone_cdn = args.dz_cdn
    if args.dz_version:
        dropzone_version = args.dz_version
    if args.disable_parallel_chunks:
        dropzone_parallel_chunks = "false"
    if args.disable_force_chunking:
        dropzone_force_chunking = "false"
    if args.allow_downloads:
        allow_downloads = True

    if not storage_path.exists():
        storage_path.mkdir(exist_ok=True)
    if not chunk_path.exists():
        chunk_path.mkdir(exist_ok=True)

    print(f"""Timeout: {int(dropzone_timeout) 
Chunk Size: {int(dropzone_chunk_size) 
Max File Size: {int(dropzone_max_file_size)} Mb
Force Chunking: {dropzone_force_chunking}
Parallel Chunks: {dropzone_parallel_chunks}
Storage Path: {storage_path.absolute()}
Chunk Path: {chunk_path.absolute()}
""")
    run(server="paste", port=args.port, host=args.host)

As this will become an executable, to be configurable we want to pass parameters upon launch.

Favicon

Now this is getting into the realm of silly. But to be an all in one script, we need to provide a binary file (the favicon) in the script itself. Thankfully ico files can be compressed rather easily, so we are going to compress it in the script itself, and decompress it when requested.

@route("/favicon.ico")
def favicon():
    return zlib.decompress(
        b"x\x9c\xedVYN\xc40\x0c5J%[\xe2\xa3|q\x06\x8e1G\xe1(=ZoV"
        b"\xb2\xa7\x89\x97R\x8d\x84\x04\xe4\xa5\xcb(\xc9\xb3\x1do"
        b"\x1d\x80\x17?\x1e\x0f\xf0O\x82\xcfw\x00\x7f\xc1\x87\xbf"
        b"\xfd\x14l\x90\xe6#\xde@\xc1\x966n[z\x85\x11\xa6\xfcc"
        b"\xdfw?s\xc4\x0b\x8e#\xbd\xc2\x08S\xe1111\xf1k\xb1NL"
        b"\xfcU<\x99\xe4T\xf8\xf43|\xaa\x18\xf8\xc3\xbaHFw\xaaj\x94"
        b"\xf4c[F\xc6\xee\xbb\xc2\xc0\x17\xf6\xf4\x12\x160\xf9"
        b"\xa3\xfeQB5\xab@\xf4\x1f\xa55r\xf9\xa4KGG\xee\x16\xdd\xff"
        b"\x8e\x9d\x8by\xc4\xe4\x17\tU\xbdDg\xf1\xeb\xf0Zh\x8e"
        b"\xd3s\x9c\xab\xc3P\n<e\xcb$\x05 b\xd8\x84Q1\x8a\xd6Kt\xe6"
        b"\x85(\x13\xe5\xf3]j\xcf\x06\x88\xe6K\x02\x84\x18\x90"
        b"\xc5\xa7Kz\xd4\x11\xeeEZK\x012\xe9\xab\xa5\xbf\xb3@i\x00"
        b"\xce\xe47\x0b\xb4\xfe\xb1d\xffk\xebh\xd3\xa3\xfd\xa4:`5J"
        b"\xa3\xf1\xf5\xf4\xcf\x02tz\x8c_\xd2\xa1\xee\xe1\xad"
        b"\xaa\xb7n-\xe5\xafoSQ\x14'\x01\xb7\x9b<\x15~\x0e\xf4b"
        b"\x8a\x90k\x8c\xdaO\xfb\x18<H\x9d\xdfj\xab\xd0\xb43\xe1"
        b'\xe3nt\x16\xdf\r\xe6\xa1d\xad\xd0\xc9z\x03"\xc7c\x94v'
        b"\xb6I\xe1\x8f\xf5,\xaa2\x93}\x90\xe0\x94\x1d\xd2\xfcY~f"
        b"\xab\r\xc1\xc8\xc4\xe4\x1f\xed\x03\x1e`\xd6\x02\xda\xc7k"
        b"\x16\x1a\xf4\xcb2Q\x05\xa0\xe6\xb4\x1e\xa4\x84\xc6"
        b"\xcc..`8'\x9a\xc9-\n\xa8\x05]?\xa3\xdfn\x11-\xcc\x0b"
        b"\xb4\x7f67:\x0c\xcf\xd5\xbb\xfd\x89\x9ebG\xf8:\x8bG"
        b"\xc0\xfb\x9dm\xe2\xdf\x80g\xea\xc4\xc45\xbe\x00\x03\xe9\xd6\xbb"
    )

Putting it all together

Here is the culmination of everything we talked about put into a script.

This may not always be the newest version, if you want to use it yourself please download the final executable or view github to see the latest code.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pathlib import Path
from threading import Lock
from collections import defaultdict
import shutil
import argparse
import uuid
import zlib

from bottle import route, run, request, error, response, HTTPError, static_file
from werkzeug.utils import secure_filename

storage_path: Path = Path(__file__).parent / "storage"
chunk_path: Path = Path(__file__).parent / "chunk"

allow_downloads = False
dropzone_cdn = "https://cdnjs.cloudflare.com/ajax/libs/dropzone"
dropzone_version = "5.7.6"
dropzone_timeout = "120000"
dropzone_max_file_size = "100000"
dropzone_chunk_size = "1000000"
dropzone_parallel_chunks = "true"
dropzone_force_chunking = "true"

lock = Lock()
chucks = defaultdict(list)


@error(500)
def handle_500(error_message):
    response.status = 500
    response.body = f"Error: {error_message}"
    return response


@route("/")
def index():
    index_file = Path(__file__) / "index.html"
    if index_file.exists():
        return index_file.read_text()
    return f"""
<!doctype html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <link rel="stylesheet" href="{dropzone_cdn.rstrip('/')}/{dropzone_version}/min/dropzone.min.css"/>
    <link rel="stylesheet" href="{dropzone_cdn.rstrip('/')}/{dropzone_version}/min/basic.min.css"/>
    <script type="application/javascript"
        src="{dropzone_cdn.rstrip('/')}/{dropzone_version}/min/dropzone.min.js">
    </script>
    <title>pyfiledrop</title>
</head>
<body>

    <div id="content" style="width: 800px; margin: 0 auto;">
        <h2>Upload new files</h2>
        <form method="POST" action='/upload' class="dropzone dz-clickable" id="dropper" enctype="multipart/form-data">
        </form>

        <h2>
            Uploaded
            <input type="button" value="Clear" onclick="clearCookies()" />
        </h2>
        <div id="uploaded">

        </div>

        <script type="application/javascript">
            function clearCookies() {{
                document.cookie = "files=; Max-Age=0";
                document.getElementById("uploaded").innerHTML = "";
            }}

            function getFilesFromCookie() {{
                try {{ return document.cookie.split("=", 2)[1].split("||");}} catch (error) {{ return []; }}
            }}

            function saveCookie(new_file) {{
                    let all_files = getFilesFromCookie();
                    all_files.push(new_file);
                    document.cookie = `files=${{all_files.join("||")}}`;
            }}

            function generateLink(combo){{
                const uuid = combo.split('|^^|')[0];
                const name = combo.split('|^^|')[1];
                if ({'true' if allow_downloads else 'false'}) {{
                    return `<a href="/download/${{uuid}}" download="${{name}}">${{name}}</a>`;
                }}
                return name;
            }}


            function init() {{

                Dropzone.options.dropper = {{
                    paramName: 'file',
                    chunking: true,
                    forceChunking: {dropzone_force_chunking},
                    url: '/upload',
                    retryChunks: true,
                    parallelChunkUploads: {dropzone_parallel_chunks},
                    timeout: {dropzone_timeout}, 
                    maxFilesize: {dropzone_max_file_size}, 
                    chunkSize: {dropzone_chunk_size}, 
                    init: function () {{
                        this.on("complete", function (file) {{
                            let combo = `${{file.upload.uuid}}|^^|${{file.upload.filename}}`;
                            saveCookie(combo);
                            document.getElementById("uploaded").innerHTML += generateLink(combo)  + "<br />";
                        }});
                    }}
                }}

                if (typeof document.cookie !== 'undefined' ) {{
                    let content = "";
                     getFilesFromCookie().forEach(function (combo) {{
                        content += generateLink(combo) + "<br />";
                    }});

                    document.getElementById("uploaded").innerHTML = content;
                }}
            }}

            init();

        </script>
    </div>
</body>
</html>
    """


@route("/favicon.ico")
def favicon():
    return zlib.decompress(
        b"x\x9c\xedVYN\xc40\x0c5J%[\xe2\xa3|q\x06\x8e1G\xe1(=ZoV"
        b"\xb2\xa7\x89\x97R\x8d\x84\x04\xe4\xa5\xcb(\xc9\xb3\x1do"
        b"\x1d\x80\x17?\x1e\x0f\xf0O\x82\xcfw\x00\x7f\xc1\x87\xbf"
        b"\xfd\x14l\x90\xe6#\xde@\xc1\x966n[z\x85\x11\xa6\xfcc"
        b"\xdfw?s\xc4\x0b\x8e#\xbd\xc2\x08S\xe1111\xf1k\xb1NL"
        b"\xfcU<\x99\xe4T\xf8\xf43|\xaa\x18\xf8\xc3\xbaHFw\xaaj\x94"
        b"\xf4c[F\xc6\xee\xbb\xc2\xc0\x17\xf6\xf4\x12\x160\xf9"
        b"\xa3\xfeQB5\xab@\xf4\x1f\xa55r\xf9\xa4KGG\xee\x16\xdd\xff"
        b"\x8e\x9d\x8by\xc4\xe4\x17\tU\xbdDg\xf1\xeb\xf0Zh\x8e"
        b"\xd3s\x9c\xab\xc3P\n<e\xcb$\x05 b\xd8\x84Q1\x8a\xd6Kt\xe6"
        b"\x85(\x13\xe5\xf3]j\xcf\x06\x88\xe6K\x02\x84\x18\x90"
        b"\xc5\xa7Kz\xd4\x11\xeeEZK\x012\xe9\xab\xa5\xbf\xb3@i\x00"
        b"\xce\xe47\x0b\xb4\xfe\xb1d\xffk\xebh\xd3\xa3\xfd\xa4:`5J"
        b"\xa3\xf1\xf5\xf4\xcf\x02tz\x8c_\xd2\xa1\xee\xe1\xad"
        b"\xaa\xb7n-\xe5\xafoSQ\x14'\x01\xb7\x9b<\x15~\x0e\xf4b"
        b"\x8a\x90k\x8c\xdaO\xfb\x18<H\x9d\xdfj\xab\xd0\xb43\xe1"
        b'\xe3nt\x16\xdf\r\xe6\xa1d\xad\xd0\xc9z\x03"\xc7c\x94v'
        b"\xb6I\xe1\x8f\xf5,\xaa2\x93}\x90\xe0\x94\x1d\xd2\xfcY~f"
        b"\xab\r\xc1\xc8\xc4\xe4\x1f\xed\x03\x1e`\xd6\x02\xda\xc7k"
        b"\x16\x1a\xf4\xcb2Q\x05\xa0\xe6\xb4\x1e\xa4\x84\xc6"
        b"\xcc..`8'\x9a\xc9-\n\xa8\x05]?\xa3\xdfn\x11-\xcc\x0b"
        b"\xb4\x7f67:\x0c\xcf\xd5\xbb\xfd\x89\x9ebG\xf8:\x8bG"
        b"\xc0\xfb\x9dm\xe2\xdf\x80g\xea\xc4\xc45\xbe\x00\x03\xe9\xd6\xbb"
    )


@route("/upload", method="POST")
def upload():
    file = request.files.get("file")
    if not file:
        raise HTTPError(status=400, body="No file provided")

    dz_uuid = request.forms.get("dzuuid")
    if not dz_uuid:
        # Assume this file has not been chunked
        with open(storage_path / f"{uuid.uuid4()}_{secure_filename(file.filename)}", "wb") as f:
            file.save(f)
        return "File Saved"

    # Chunked download
    try:
        current_chunk = int(request.forms["dzchunkindex"])
        total_chunks = int(request.forms["dztotalchunkcount"])
    except KeyError as err:
        raise HTTPError(status=400, body=f"Not all required fields supplied, missing {err}")
    except ValueError:
        raise HTTPError(status=400, body=f"Values provided were not in expected format")

    save_dir = chunk_path / dz_uuid

    if not save_dir.exists():
        save_dir.mkdir(exist_ok=True, parents=True)

    # Save the individual chunk
    with open(save_dir / str(request.forms["dzchunkindex"]), "wb") as f:
        file.save(f)

    # See if we have all the chunks downloaded
    with lock:
        chucks[dz_uuid].append(current_chunk)
        completed = len(chucks[dz_uuid]) == total_chunks

    # Concat all the files into the final file when all are downloaded
    if completed:
        with open(storage_path / f"{dz_uuid}_{secure_filename(file.filename)}", "wb") as f:
            for file_number in range(total_chunks):
                f.write((save_dir / str(file_number)).read_bytes())
        print(f"{file.filename} has been uploaded")
        shutil.rmtree(save_dir)

    return "Chunk upload successful"


@route("/download/<dz_uuid>")
def download(dz_uuid):
    if not allow_downloads:
        raise HTTPError(status=403)
    for file in storage_path.iterdir():
        if file.is_file() and file.name.startswith(dz_uuid):
            return static_file(file.name, root=file.parent.absolute(), download=True)
    return HTTPError(status=404)


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--port", type=int, default=16273, required=False)
    parser.add_argument("--host", type=str, default="0.0.0.0", required=False)
    parser.add_argument("-s", "--storage", type=str, default=str(storage_path), required=False)
    parser.add_argument("-c", "--chunks", type=str, default=str(chunk_path), required=False)
    parser.add_argument(
        "--max-size",
        type=str,
        default=dropzone_max_file_size,
        help="Max file size (Mb)",
    )
    parser.add_argument(
        "--timeout",
        type=str,
        default=dropzone_timeout,
        help="Timeout (ms) for each chuck upload",
    )
    parser.add_argument("--chunk-size", type=str, default=dropzone_chunk_size, help="Chunk size (bytes)")
    parser.add_argument("--disable-parallel-chunks", required=False, default=False, action="store_true")
    parser.add_argument("--disable-force-chunking", required=False, default=False, action="store_true")
    parser.add_argument("-a", "--allow-downloads", required=False, default=False, action="store_true")
    parser.add_argument("--dz-cdn", type=str, default=None, required=False)
    parser.add_argument("--dz-version", type=str, default=None, required=False)
    return parser.parse_args()


if __name__ == "__main__":

    args = parse_args()
    storage_path = Path(args.storage)
    chunk_path = Path(args.chunks)
    dropzone_chunk_size = args.chunk_size
    dropzone_timeout = args.timeout
    dropzone_max_file_size = args.max_size
    try:
        if int(dropzone_timeout) < 1 or int(dropzone_chunk_size) < 1 or int(dropzone_max_file_size) < 1:
            raise Exception("Invalid dropzone option, make sure max-size, timeout, and chunk-size are all positive")
    except ValueError:
        raise Exception("Invalid dropzone option, make sure max-size, timeout, and chunk-size are all integers")

    if args.dz_cdn:
        dropzone_cdn = args.dz_cdn
    if args.dz_version:
        dropzone_version = args.dz_version
    if args.disable_parallel_chunks:
        dropzone_parallel_chunks = "false"
    if args.disable_force_chunking:
        dropzone_force_chunking = "false"
    if args.allow_downloads:
        allow_downloads = True

    if not storage_path.exists():
        storage_path.mkdir(exist_ok=True)
    if not chunk_path.exists():
        chunk_path.mkdir(exist_ok=True)

    print(
        f"""Timeout: {int(dropzone_timeout) 
Chunk Size: {int(dropzone_chunk_size) 
Max File Size: {int(dropzone_max_file_size)} Mb
Force Chunking: {dropzone_force_chunking}
Parallel Chunks: {dropzone_parallel_chunks}
Storage Path: {storage_path.absolute()}
Chunk Path: {chunk_path.absolute()}
"""
    )
    run(server="paste", port=args.port, host=args.host)

Make it yours, and give back if you can!

What will you add to this script? Set a max time for how long you can see the uploaded files? A way to ensure the file exists on the server before trying to download it? Checksum comparison to avoid using space for duplicate files?

However you make it better, please consider to add a pull request for your features so anyone can benefit from it!

Keep Windows from going to sleep, no power settings needed!

Say you have a long running process that you let go overnight, only to come back the next morning and realize “Oh no, only an hour into it, my computer went to sleep!”. Thanks to some Windows internals, it’s possible to make it realize there is a background task running and should not go to sleep.

Artwork by Clara Griffith

I personally use this exact method in my FastFlix program to make sure the computer doesn’t interrupt the coding process. It should work with any version of Python3, and it boils down too:

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import ctypes
import time

CONTINUOUS = 0x80000000
SYSTEM_REQUIRED = 0x00000001
DISPLAY_REQUIRED = 0x00000002 

ctypes.windll.kernel32.SetThreadExecutionState(CONTINUOUS | SYSTEM_REQUIRED)
try:
    # This is where you would do stuff
    while True:
        time.sleep(600) 
finally:
    ctypes.windll.kernel32.SetThreadExecutionState(CONTINUOUS)

As this example shows, you let this run in the background to always keep your computer from sleeping. Just be careful you don’t abuse your IT policies by allowing your computer to stay on for a lot longer than they want.

It’s pretty straightforward code, the main function being SetThreadExecutionState.

ES_CONTINUOUS 0x80000000Informs the system that the state being set should remain in effect until the next call that uses ES_CONTINUOUS and one of the other state flags is cleared.
ES_DISPLAY_REQUIRED 0x00000002Forces the display to be on by resetting the display idle timer.
ES_SYSTEM_REQUIRED 0x00000001Forces the system to be in the working state by resetting the system idle timer.
Table from https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-setthreadexecutionstate

Notice that it doesn’t take multiple arguments, like what you expect in Python. Instead it accepts a single hex value that represents the culmination of all the options you want to set. Hence why you see us passing them in via a “bitwise or” expression. For example, if you also wanted to force the screen to stay on, just need to add:

ctypes.windll.kernel32.SetThreadExecutionState(CONTINUOUS | SYSTEM_REQUIRED | DISPLAY_REQUIRED)

After this is set, it is extremely important to remember to always reset it back to ES_CONTINUOUS after your work is done.

Use atexit for safety

In our example above, we simply add this in a finally clause. For a more robust program you may want to ensure it’s always called via atexit.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import ctypes
import atexit

CONTINUOUS = 0x80000000
SYSTEM_REQUIRED = 0x00000001
DISPLAY_REQUIRED = 0x00000002 

ctypes.windll.kernel32.SetThreadExecutionState(CONTINUOUS | SYSTEM_REQUIRED)

@atexit.register
def cleanup():
    ctypes.windll.kernel32.SetThreadExecutionState(CONTINUOUS)

# Do Stuff    

Create an image in Task Manager from CPU Usage

A lot of people have pass around a internet video of someone using a crazy high core count computer to display a video via Task Manager’s CPU usage. The one problem with it: it’s probably fake. Task Manager displays the last 60 seconds of activity, not a instant view like shown in the video. But what if we kept the usage the same for sixty seconds, could we at least make an image?

So the first problem, is how do we generate enough load for a CPU usage to noticeably increase? Luckily there is an old goto into the benchmark world that is really simple to implement. Square Roots. Throw a few million square roots at a CPU and watch it light up.

from itertools import count
import math

def run_benchmark():
    # Warning, no way to stop other than Ctrl+C or killing the process
    for num in count():
        math.sqrt(num)

run_benchmark()

The real problem is making sure we have a way to stop it. The easiest way is with a timeout, but just the slight work of gathering system time may throw off how much work we want to do in the future. So lets only do that, say, every 100,000 operations.

from itertools import count
import math
import time 

def run_benchmark(timeout=60):
    start_time = time.perf_counter()
    for num in count():
        if num % 100_000 == 0.0:
            if time.perf_counter() > start_time + timeout:
                return
        math.sqrt(num)

run_benchmark()

Excellent, now we can pump up a core to max workload. However, we currently have no control over which CPU it will run on! We have to set the “affinity” of the process. There is no easy way to do that with Python directly, and in this case we have to go straight to the Win32 API. In this case we will use the pywin32 library to access the win32process.

pip install pywin32

We will then use the SetProcessAffinityMask function to lock our program to a specific CPU, which will also require grabbing some details from GetCurrentProcess Win32 API function. One thing not really documented anywhere I could find, is the fact you set the CPU core affinity not by it’s actual number, but by the mask itself. Which we can create by taking 2 ** cpu_core.

from itertools import count
import math
import time 

import win32process  # pywin32

def run_benchmark(cpu_core=0, timeout=60):
    start_time = time.perf_counter()
    process_id = win32process.GetCurrentProcess()
    win32process.SetProcessAffinityMask(process_id, 2 ** cpu_core)

    for num in count():
        if num % 100_000 == 0.0:
            if time.perf_counter() > start_time + timeout:
                return
        math.sqrt(num)

run_benchmark()

So now every time we run the program, it should only max out the first core. If we want to do this across multiple cores, we will have to create multiple processes at the same time. My favorite way to do that is with multiprocessing maps. So instead of lighting up a single processor, let’s pump them all to the roof.

from itertools import count
import math
import time
from multiprocessing.pool import Pool
from multiprocessing import cpu_count

import win32process  # pywin32


def run_benchmark(cpu_core=0, timeout=60):
    start_time = time.perf_counter()
    process_id = win32process.GetCurrentProcess()
    win32process.SetProcessAffinityMask(process_id, 2 ** cpu_core)

    for num in count():
        if num % 100_000 == 0.0:
            if time.perf_counter() > start_time + timeout:
                return
        math.sqrt(num)


if __name__ == '__main__':
    arguments = [(core, 60) for core in range(cpu_count())]
    # [(0, 60), (1, 60), (2, 60), (3, 60)] 
    # each tuple will be as arguments to run_benchmark as its arguments

    with Pool(processes=cpu_count()) as p:
        p.starmap(run_benchmark, arguments)
    run_benchmark()

We have to throw the multiprocessing after the `if __name__ == ‘__main__’: block due to how CPython starts up on Windows. (It’s also just a good idea for any scripts.)

At this point you could also change up which cores it is running on to see how they correspond on the Task Manager. For example, you could change the range increments to only launch on each other core. range(0, cpu_count(), 2)

On my 8 core machine (so 16 logical cores) I can make a quick X shape by selecting certain cores

arguments = [(core, 100) for core in [0, 3, 5, 6, 9, 10, 12, 15]]

Now remember there are two types of CPU cores according to the operating system, physical and logical. Physical is the number of actual cores on the CPU, but if the CPU has SMT (Simultaneous multi-threading) it will double that. So that means every odd number core is actually a fake one (remember cores start at 0). Which means it has to use some of previous core’s resources. Hence why cores 2, 4, 8 and 14 are showing higher usage.

But what if we want even cooler graphics and don’t want to be limited to just 100% cpu usage? Well then we need to tell the computer to not work for very small amounts of time. Aka sleep. So lets try adding a sleep every 100K ops for oh say, 60 milliseconds.

def run_benchmark(cpu_core=0, timeout=60):
    start_time = time.perf_counter()
    process_id = win32process.GetCurrentProcess()
    win32process.SetProcessAffinityMask(process_id, 2 ** cpu_core)

    for num in count():
        if num % 100_000 == 0.0:
            if time.perf_counter() > start_time + timeout:
                return
            time.sleep(0.06)
        math.sqrt(num)

This time I am also going to run it just on physical cores.

arguments = [(core, 100, 80) for core in range(0, cpu_count(), 2)]

How about that, now it’s using just about 50% usage on each core on my computer. If you’re trying this on your own, this is where the fun begins. I suggest trying out different time offsets to see if you can get a list of times for 10~90% usage. For example, mine is close to:

usage_to_sleep_time = {
    100: 0,
    90: 0.014,
    80: 0.02,
    70: 0.03,
    60: 0.04,
    50: 0.06,
    40: 0.08,
    30: 0.1,
    20: 0.2,
    10: 0.5
}

Then lets throw that into the run_benchmark function to be able to set a precise amount of usage per core.

def run_benchmark(cpu_core=0, timeout=60, usage=100):
    start_time = time.perf_counter()
    process_id = win32process.GetCurrentProcess()
    win32process.SetProcessAffinityMask(process_id, 2 ** cpu_core)

    for num in count():
        if num % 100_000 == 0.0:
            if time.perf_counter() > start_time + timeout:
                return
            if usage != 100:
                time.sleep(usage_to_sleep_time[usage])
        math.sqrt(num)

Then if you have enough cores, you can see them all in action at the same time.

arguments = [(core, 100, usage) for core, usage in enumerate(usage_to_sleep_time)]

(I was impatient and didn’t do this one while the computer was idle, hence the messy lower ones.)

From here I am sure some of you could go crazy making individual cores ramp up and done, and create something truly spectacular, but my whistle was wetted. It is totally possible to have control over exactly how much CPU core usage is being shown in Task Manager with Python.

Here is the full final code, hope you enjoyed!

from itertools import count
import math
import time
from multiprocessing.pool import Pool
from multiprocessing import cpu_count

import win32process  # pywin32

usage_to_sleep_time = {
    100: 0,
    90: 0.014,
    80: 0.02,
    70: 0.03,
    60: 0.04,
    50: 0.06,
    40: 0.08,
    30: 0.1,
    20: 0.2,
    10: 0.5
}

def run_benchmark(cpu_core=0, timeout=60, usage=100):
    start_time = time.perf_counter()
    process_id = win32process.GetCurrentProcess()
    win32process.SetProcessAffinityMask(process_id, 2 ** cpu_core)

    for num in count():
        if num % 100_000 == 0.0:
            if time.perf_counter() > start_time + timeout:
                return num
            if usage != 100:
                time.sleep(usage_to_sleep_time[usage])
        math.sqrt(num)



if __name__ == '__main__':
    arguments = [(core, 100, usage) for core, usage in enumerate(usage_to_sleep_time)]

    with Pool(processes=len(arguments)) as p:
        print(p.starmap(run_benchmark, arguments))