Chris Griffith

pre-commit – Check yourself before you wreck yourself

pre-commit checks are like prenups. They run before you commit, get the dirty stuff out of the way, and may even save your relationship(s) in the long run. The difference is that pre-commit checks are written by programmers and not lawyers, so they are a lot easier to read and implement.

There are several types of pre-commit checks that can be run. Some common features you could choose are:

  • git sanity checks (no huge files, no bad merge lines, etc…)
  • code style enforcement
  • whitespace fixes
  • python specific checks

Why pre-commit?

Just as you should do a quick check in the mirror before heading out, pre-commits are a quick reflection of your code before making it public. They quickly check everything you’re about to add to git to make sure it’s copasetic. Many even automatically fix common issues for you.

Basically pre-commits make sure your code’s tee-shirt isn’t on inside out.

How to pre-commit

There are three basic steps to get pre-commit working with your code repo.

  1. Install the pre-commit tool onto your system
  2. Add a pre-commit configuration file to your repo
  3. Install the git hook for the repo

1. Install pre-commit

There are several different official ways to install pre-commit. I personally do not like packages polluting my global python site-packages. Instead, I install it with user level only privileges. Then I make sure its install path is added to the system path.

pip install pre-commit --user

You will receive a warning that the script installation location is not on the system path. (Unless you have done this before, and then you can skip the next part.)

If you really don’t care and are very good about using isolated virtual environments, you can just install it globally with pip install pre-commit without the --user flag and skip the next part.

Windows

Installing collected packages: pre-commit
  WARNING: The scripts pre-commit-validate-config.exe, pre-commit-validate-manifest.exe and pre-commit.exe are installed in 'C:\Users\Chris\AppData\Roaming\Python\Python39\Scripts' which is not on PATH.

The above warning is what I received while installing this on Windows. So I copied that path C:\Users\Chris\AppData\Roaming\Python\Python39\Scripts to my clipboard and then went through the following process to add it to my system path.

First, search for “edit system” on the Windows search bar and click the highlighted link.

Second, click on the “Environment Variables…” button at the bottom.

Third, in the bottom section under “System variables” click on “Path” and hit “Edit…”

Finally, on the new window hit “New” in the top right and add the path previously copied to the clipboard. Then hit OK on all the windows to close them.

You will have to restart any cmd sessions you have opened. When you do, you should be able to run pre-commit just fine.

Linux

It’s a bit simpler to add the custom install location to the system path on Linux.

INSTALL_PATH=/home/james/.local/bin  # Change to the path printed in your warning
echo "export PATH=\"${INSTALL_PATH}:\$PATH\"" >> ~/.bashrc 
source ~/.bashrc

2. The config file

In your repo, you will need to create a file named .pre-commit-config.yaml and choose which checks to add for your code. This is a config file that I use (with opinionated formatters and mypy removed). You can find the full list of built-in supported checks at the pre-commit github repo.

# .pre-commit-config.yaml
repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.0.1
    hooks:

    # Identify invalid files
    - id: check-ast                        
    - id: check-yaml                       
    - id: check-json                       
    - id: check-toml                       

    # git checks
    - id: check-merge-conflict             
    - id: check-added-large-files          
      exclude: tests/media/.+
    - id: detect-private-key               
    - id: check-case-conflict              

    # Python checks
    - id: check-docstring-first            
    - id: debug-statements                 
    - id: requirements-txt-fixer           
    - id: fix-encoding-pragma              
    - id: fix-byte-order-marker            

    # General quality checks
    - id: mixed-line-ending                
    - id: trailing-whitespace              
      args: [--markdown-linebreak-ext=md]  
    - id: check-executables-have-shebangs  
    - id: end-of-file-fixer                

Personally I also always use the black code formatter. It is not part of the standard checks, but is still easy to add.

repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.0.1
    hooks:
       # ... 
  # Add at same level as the first pre-commit-hooks repo
  - repo: https://github.com/psf/black
    rev: 21.6b0 
    hooks:
      - id: black

3. Adding the pre-commit hook to git

Go into the repo with the config file at its root, and simply type:

pre-commit install

The first time you add it you will also want to run it on all the files in the repo to shore them all up (usually it only runs on the changed files).

pre-commit run --all-files

It will take a few minutes to install the virtual env for running these checks, but will be much faster after the first time. It should look something like:

pre-commit run --all-files
[INFO] Initializing environment for https://github.com/pre-commit/pre-commit-hooks.
[INFO] Initializing environment for https://github.com/psf/black.
[INFO] Installing environment for https://github.com/pre-commit/pre-commit-hooks.
[INFO] Once installed this environment will be reused.
[INFO] This may take a few minutes...
[INFO] Installing environment for https://github.com/psf/black.
[INFO] Once installed this environment will be reused.
[INFO] This may take a few minutes...
Check python ast.........................................................Passed
Check Yaml...............................................................Passed
Check JSON...........................................(no files to check)Skipped
Check Toml...............................................................Passed
Check for merge conflicts................................................Passed
Check for added large files..............................................Passed
Detect Private Key.......................................................Passed
Check for case conflicts.................................................Passed
Check docstring is first.................................................Passed
Debug Statements (Python)................................................Passed
Fix requirements.txt.....................................................Passed
Fix python encoding pragma...............................................Passed
fix UTF-8 byte order marker..............................................Passed
Mixed line ending........................................................Passed
Trim Trailing Whitespace.................................................Failed
- hook id: trailing-whitespace
- exit code: 1
- files were modified by this hook

Fixing README.md
Fixing docs/README.md
Fixing docs/build-licenses.txt

Check that executables have shebangs.....................................Passed
Fix End of Files.........................................................Failed
- hook id: end-of-file-fixer
- exit code: 1
- files were modified by this hook

Fixing docs/build-licenses.txt
Fixing .pre-commit-config.yaml

black....................................................................Passed
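
The two failed hooks above did nothing scary: they rewrote the files in place and left the result for you to stage. As a rough mental model, the fixes boil down to something like the sketch below. This is a simplification and not the actual pre-commit-hooks implementation, and both function names are made up for illustration.

```python
def trim_trailing_whitespace(text: str) -> str:
    """Strip spaces and tabs from the end of every line (hypothetical helper)."""
    lines = text.split("\n")
    return "\n".join(line.rstrip(" \t") for line in lines)


def fix_end_of_file(text: str) -> str:
    """Make a non-empty file end with exactly one newline (hypothetical helper)."""
    stripped = text.rstrip("\n")
    return stripped + "\n" if stripped else ""


before = "# Title  \nsome text\t\n\n\n"
after = fix_end_of_file(trim_trailing_whitespace(before))
print(repr(after))
```

Note that with the --markdown-linebreak-ext=md argument from the config above, the real trailing-whitespace hook keeps Markdown hard line breaks in .md files, which this sketch ignores.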

4. Sit back and relax

Congrats, you are now totally almost protected from the most common silly mistakes you used to make. You now have the opportunity to make completely new mistakes you wouldn’t have thought of before!

Python’s New Structural Pattern Matching!

It’s finally happened: after years of complaining and being told “it will never happen”, I got my darn switch statement! And honestly, calling it only a “switch” statement is really underselling it. It’s much more powerful than most people will ever use! You might have already heard about this, but I wanted to make sure it made it to at least the 3.10 beta phase, as the alphas are no guarantee you’ll actually see the new feature (still possible to remove before the RC phase, but less likely).

Artwork by Clara Griffith

Python 3.10 will introduce “Structural Pattern Matching”, first proposed in PEP 622 and since superseded by PEPs 634, 635 and 636, which is a crazy advanced switch statement that can recognize patterns. Instead of the keyword switch, Python will use match (get ready to side-eye your regex variable names, although match is only a soft keyword, so they will keep working!). Let’s do a quick comparison with JavaScript’s switch statement, starting with a JavaScript example modified from w3schools.

Javascript’s Switch

switch (new Date().getDay()) {
  case 5:
    console.log("It's Friday!");
    break;
  case 0:
  case 6:
    console.log("Woo, Weekend!");
    break;
  default:
    console.log("Work. Work. Work. Work.");
}

Python’s Structural Pattern Matching

We can accomplish the same with Python in fewer lines. Note that in Python the weekday starts at 0 for Monday.

from datetime import datetime

match datetime.today().weekday():  
    case 4:
        print("It's Friday!")
    case 5 | 6:
        print("Woo, Weekend!")
    case _:
        print("Work. Work. Work. Work.")

Similarities and Differences

With Python’s new match statement, the biggest change from tradition is that there is no “fall through” between cases. In other words, you don’t have to write break or anything else to exit the match after each case. That does mean you can’t stack several case lines on top of one shared block, the way the JavaScript example does with case 0 and case 6. Instead you combine the checks with | for “or” operations, which I honestly think is a lot cleaner.

What I don’t like is there is no default keyword or similar, and you have to use the _ wildcard. The underscore is already overused in my opinion. However it does add a lot of power for more advanced cases. Let’s dive into some more examples.

Matching and Assigning

The very basic things to understand about the new Structural Pattern Matching are the flow (as seen in the example above) and the possibility of assignment. Not only can you make sure things are equal to a literal, you can also detect patterns (as the name suggests) and then work with the variables inside the pattern itself.

Pattern Recognition

Let’s dip our toes in on this whole “pattern matching” thing. What does that mean?

Say you have a tuple with two objects, that could be in either order and you always want to print it right.

data_1 = ("Purchase Number", 574)
data_2 = (574, "Purchase Number")

You could have a simple match case to straighten it out so it always prints “Purchase Number 574”, as pattern matching supports type checking.

match data_1:
    case str(x), int(y):
        print(x, y) 
    case int(x), str(y):
        print(y, x)

Match on dict values

Just like with regular value comparisons, you can check dictionary values and still do or operations. In this case, either grade b or c will print out "Welcome to being average!"

match {'grade': 'B'}:
    case {'grade': 'a' | 'A'}:
        print("You're a star!")
    case {'grade': 'b' | 'c' | 'B' | 'C'}:
        print("Welcome to being average!")

Unpacking arbitrary data

You can use the standard * and ** unpackers to capture and unpack variable length lists and dicts.

my_dict = {'data': [{'action': 'move'}, {'action': 'stay'}],
           'location': "square_10"}

match my_dict:
    case {'data': [*options], **remainder }:
        print(options)
        print(remainder)
    case _:
        raise ValueError("Data not as expected")

This will match the first case and print out:

[{'action': 'move'}, {'action': 'stay'}]
{'location': 'square_10'}

That can really come in handy if dealing with variable length data.

Adding the “if”

You can also add in some inline checks.

cords = (30.25100365332043, -97.86221371827051, "Workplace")
cords_2 = (44.40575746530253, 8.947747627912994)

match cords: 
    case lat, lon, *_ if lat > 0:
        print("Northern Hemisphere")
    case lat, lon, *_ if lat < 0: 
        print("Southern Hemisphere")

Don’t forget about “as”

Sometimes you might want one of the many literals you are looking for to be usable in the case itself. Let’s put a lot of our knowledge together and check out the added power of the as clause.

We are going to build a really simple command line game to look for random things.

import sys
import random

inventory = []

while True:
    match input().lower().split():
        # Remember `split` makes the input into a list.  
        # These first two cases are functionally the same.
        case ["exit"]:  
            sys.exit(0)
        case "stop", *_: 
            sys.exit(0)
        case "travel", "up" | "down" | "left" | "right" as direction:
            print(f"Going {direction}")
        case "search", "area" | "backpack" as thing, _, *extra:
            item = " ".join(extra)
            if thing == "backpack":
                if item in inventory:
                    print(f"Yes you have {item}")
                else:
                    print(f"No you don't have {item}")
            elif thing == "area":
                if random.randint(1, 10) > 5:
                    inventory.append(item)
                    print(f"You found {item}!")
                else:
                    print(f"No {item} here")
            else:
                print(f"Sorry, you cannot search {thing}")
        case _:
            print("sorry, didn't understand you! type 'exit' or 'stop' to quit")

Let’s do a quick playthrough:

> search area for diamonds
You found diamonds!

> search backpack for diamonds
Yes you have diamonds

> take a short rest and try to eat the diamonds
sorry, didn't understand you! type 'exit' or 'stop' to quit

> Travel Up
Going up

> search area for candy
No candy here

> search backpack for candy
No you don't have candy

> stop
# Process finished with exit code 0

Notice our special case for searching around.

case "search", "area" | "backpack" as thing, _, *extra:

We are looking for the first keyword “search”, then either “area” or “backpack”, and saving whichever one it was to the variable thing. The _ ignores the next word, as we expect them to type “for” or something equally useless there. Finally, we grab everything else and treat it as a single item.

Classes and More

Using dataclasses with match is super easy.

from dataclasses import dataclass

@dataclass
class Car:
    model: str
    top_speed: int

car_1 = Car("Jaguar F-Type R", 186)
car_2 = Car("Tesla Model S", 163)
car_3 = Car("BMW M4", 155)

match car_1:
    case Car(x, speed) if speed > 180:
        print(f"{x} is a super fast car!")
    case Car(x, speed) if speed > 160:
        print(f"{x} is a pretty fast car")
    case _:
        print("Regular Car")

Using a regular class, you have to be a bit more explicit about the variables that are being used.

class Car:

    def __init__(self, model, top_speed):
        self.model = model
        self.top_speed = top_speed


car_1 = Car("F-Type R", 186)
car_2 = Car("Model S", 163)
car_3 = Car("BMW M4", 155)

match car_1:
    case Car(model=x, top_speed=speed) if speed > 180:
        print(f"{x} is a super fast car!")
    case Car(model=x, top_speed=speed) if speed > 160:
        print(f"{x} is a pretty fast car")
    case _:
        print("Regular Car")

However, there is a way around that! You can add __match_args__ to the class itself to define which arguments you will want to use for the pattern recognition.

class Car:

    __match_args__ = ("model", "top_speed")  # must be a tuple, a list raises TypeError
    def __init__(self, model, top_speed):
        self.model = model
        self.top_speed = top_speed


car_1 = Car("F-Type R", 186)
car_2 = Car("Model S", 163)
car_3 = Car("BMW M4", 155)

match car_1:
    case Car(x, speed) if speed > 180:
        print(f"{x} is a super fast car!")
    case Car(x, speed) if speed > 160:
        print(f"{x} is a pretty fast car")
    case _:
        print("Regular Car")

Summary

Structural Pattern Matching is Python’s sexy new “switch” statement that will bring a lot of power, and removal of a lot of old if elif blocks. The only real downside is that it’s brand new, and will take a few years for most people to be able to use it with their default interpreter.

Stop Re-Encoding Videos!

I know this may sound like a weird statement coming from the author of FastFlix, but from the bottom of my heart, please stop the needless pixel loss! Every time you re-encode (aka transcode) a video it loses information, which lowers the quality and makes it a lot worse if you ever need to do it again. Re-encoding makes you a pixel killer!

Why am I saying re-encoding and not just “encoding” when you may have the original video? Sadly, to even fit onto your computer or device, the video you are working with has already been encoded in a highly compressed manner. Even phones and professional cameras like the RED series use real-time compression. Raw video takes too much bandwidth for standard storage media. For example, imagine recording raw video using a video sensor at 16-bit 60FPS. A single minute of UHD footage would be near 60GB! That translates into a bitrate of over 7,500,000 kbps. Compare that to a re-encoded YouTube video of HDR 60FPS 4K footage at around 30,000 kbps: roughly 250 times less data!
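
If you want to check those numbers yourself, here is the back-of-the-envelope math as a short script. It assumes UHD means 3840x2160 and that the sensor stores one 16-bit sample per pixel. Those are my assumptions for the estimate, not measurements from a specific camera.

```python
WIDTH, HEIGHT = 3840, 2160  # UHD resolution (assumed)
BYTES_PER_SAMPLE = 2        # 16 bits, assuming one raw sample per pixel
FPS = 60

bytes_per_second = WIDTH * HEIGHT * BYTES_PER_SAMPLE * FPS
gigabytes_per_minute = bytes_per_second * 60 / 1e9
raw_kbps = bytes_per_second * 8 / 1000
youtube_kbps = 30_000       # the YouTube figure quoted above
ratio = raw_kbps / youtube_kbps

print(f"{gigabytes_per_minute:.1f} GB per minute of raw footage")
print(f"{raw_kbps:,.0f} kbps raw bitrate")
print(f"about {ratio:.0f} times the YouTube bitrate")
```

Under those assumptions it works out to just under 60 GB per minute and a little under 8,000,000 kbps, in line with the figures above.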

Why does it matter?

Everyone has seen potato quality images and videos being re-shared again and again. The quality loss is due to needless re-encoding.

Websites generally always re-encode videos for a variety of reasons, such as adding watermarks or forcing certain bitrates. It may be required in some instances, but for most cases it’s overkill. If I had the power I would challenge websites to instead publish what encoding targets are required and allow for direct playback of the original file.

But while we don’t have control over that, that makes it even more important to not make it worse when you have control over it.

In the above example I took a short video and encoded it again and again and again using the same settings each time. The video still looks good while being watched, but if you zoom in you can see the entire thing has essentially become blurred. Don’t do this to your own videos, save the pixels!

Future Proofing

Another thing to consider is not just the result you have now, but how it will look in ten or twenty years. Sure, the single 1080p or 4K re-encode you did now may look perfect. But what about when 16K TVs are standard? What about when your phone or monitor’s pixel density is four to eight times higher than it is now? Behind the scenes of good TVs and devices is an “AI” chip that upscales videos to look better on newer screens. With less detail to start with, it won’t look as good as if you didn’t re-encode.

Remember that amazing video you took with your flip phone all those years ago that you can’t even tell what is happening in it anymore? You don’t want that to happen to your videos now.

There are some cases that it cannot be avoided that we will cover later, but even common operations like trimming videos and rotating them can be accomplished without re-encoding.

Trim and Rotate without re-encoding

Two very common reasons people want to re-encode videos are to shorten them to a particular section, or to rotate them the proper direction. Thankfully you can do both of those without modifying the original video stream. I am using the command line tool ffmpeg to accomplish this, which is available for free.

Rotate

To rotate the video, you just have to add some metadata to the container the video is in. This is how phones set videos to portrait or landscape mode without having to change their encoder settings.

ffmpeg -i your_video.mp4 -map 0 -c copy -metadata:s:v:0 rotate=90 rotated_video.mp4

In the above example, replace your_video.mp4 with the video file you need rotated. It will be copied with the new metadata to rotated_video.mp4 and now should be rotated properly.

Trim

To cut out a section of the video, you simply need to “copy” everything between the two desired points. For example if you want to cut out a 48 second section between 1:02 and 1:50 (one minute two seconds and one minute fifty seconds), use the following command.

ffmpeg -ss 1:02 -to 1:50 -i your_video.mp4 -map 0 -c copy trimmed_video.mp4
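
If you trim videos often, it can be handy to build that command from a script. The sketch below only assembles and prints the argument list. The function name and the output file name are made up for illustration, and ffmpeg itself is not called here.

```python
import shlex


def build_trim_command(source: str, output: str, start: str, end: str) -> list[str]:
    """Build an ffmpeg command that copies the streams between two timestamps."""
    return [
        "ffmpeg",
        "-ss", start,   # seek to the start point before reading the input
        "-to", end,     # stop at the end point
        "-i", source,
        "-map", "0",    # keep every stream from the input
        "-c", "copy",   # copy the streams as-is, no re-encode
        output,
    ]


command = build_trim_command("your_video.mp4", "trimmed_video.mp4", "1:02", "1:50")
print(shlex.join(command))
# To actually run it: subprocess.run(command, check=True)
```

Keep in mind that a stream copy can only cut at keyframes, so the trimmed clip may start slightly off from the timestamp you asked for.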

Are there any exceptions?

There are a few cases where you simply cannot avoid re-encoding. If you need to add effects, crop, make any actual modifications to the video, you will need to re-encode.

However if you are keeping the video as is, there are only three instances you should consider re-encoding for:

  • Limited bandwidth scenarios
  • Device compatibility
  • Cannot provide required storage space

Limited bandwidth scenario

ISPs still like to pretend upload speeds don’t matter. Even if an ISP provides 1Gbps down most still have less than 45Mbps upload peak. Things get even worse for mobile, where the average upload rate is around 10Mbps. Then on top of that, a general rule of thumb is your video bitrate should be around half or less of available bandwidth to ensure there isn’t stuttering.

That means if you’re planning to share videos in real time with the world, you simply have to re-encode (transcode) it.
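
As a quick sanity check of that rule of thumb, here is the math for the two upload speeds mentioned above. The function name and the 0.5 headroom default are just my way of writing down “half or less”.

```python
def max_stream_bitrate_mbps(upload_mbps: float, headroom: float = 0.5) -> float:
    """Rule-of-thumb ceiling for a live video bitrate given an upload speed."""
    return upload_mbps * headroom


for label, upload in (("home connection", 45), ("mobile", 10)):
    limit = max_stream_bitrate_mbps(upload)
    print(f"{label}: {upload} Mbps up, aim for {limit} Mbps or less")
```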

Device compatibility

There are some large companies that have terrible compatibility with commonly accepted formats to be anti-competition. That means if you or a loved one is enslaved to some large fruit company, you may need to tinker with your videos to please the orchard overlords.

Storage space

“Storage is cheap” is a phrase I hear a little too much in the coding world. In the real world without a large quarterly tech budget, you have to count your pennies. If you can re-encode a video so that it’s near visually lossless while saving on storage space for your needs, go for it!

16TB Showdown – WD HC550 vs Seagate EXOS 16 vs Toshiba MG08

Hard Drive Internals

The three cheapest SATA 16TB drives available at the moment (March 2021) are all around $350: the Seagate EXOS 16, the WD Ultrastar DC HC550 and the Toshiba MG08 Series. I planned on using two of each in my home NAS, until a shocking discovery while doing a quick benchmark on them! To spoil the suspense, the Toshiba was either defective or was designed with different workloads in mind.

A quick note: these were all purchased out of pocket for my own setup. I do not have any sponsors, nor any advertising on my website. These results are purely my own findings. I simply wanted to share to those who may be interested.

The Contenders

As you may have guessed, these 16TB drives have a lot in common. Each drive is helium filled, 7200rpm and SATA III (6.0Gb/s). The three of them also offer 2.5M hours of Mean Time Between Failures and max 550TB/yr workload. There are a few subtle differences though.

Spec     Seagate EXOS 16    WD Ultrastar DC HC550    Toshiba MG08 Series
Cache    256 MB             512 MB                   512 MB
Block    512e / 4Kn         512e / 4Kn               512e OR 4Kn
Tech     CMR                CMR / EAMR               CMR

They also sport a few physical design differences. The WD seems to be “upside down” compared to the others, and the standard middle side screw hole did not line up in my case.

For those who are design conscious, the EXOS is the only one offering a splash of color, and none of them offer RGB lights. Though if you are looking for RGB lights on a HDD, you might need a mental recalibration via wrench to the side of the head. HDDs are like construction workers, they work all day and are only noticed when they are slowing things down.

The Test Machine

This system was not designed to pump out the best benchmark performance. This is a real world NAS I will be using for my house. That said, it may not eke out the most performance on these drives, but it does give us a standard base for comparison.

  • Operating System: TrueNAS SCALE 21.02 Alpha
  • CPU: Ryzen 9 5900X 12-core 3.7GHz
  • Motherboard: ASRock X570 PHANTOM GAMING 4
  • Memory: NEMIX RAM 64GB DDR4-3200 P 2Rx8 ECC Unbuffered Memory
  • PSU: CORSAIR RM Series RM750
  • HBA: Dell H310 LSI IT mode 9211-8i

The hard drives were all connected to the HBA card and tested one at a time. They were set up as single ZFS pools using the TrueNAS UI with record sizes set to 512K.

All three drives are the exact same size (perfect for a NAS setup) and using the same 512e block size.

14.55 TiB, 16000900661248 bytes, 31251759104 sectors
Units: sectors of 1 * 512 = 512 bytes
Sector size (logical/physical): 512 bytes / 4096 bytes
I/O size (minimum/optimal): 4096 bytes / 4096 bytes

Benchmark Results

Read Speeds

These were gathered by running hdparm -tT twice on each drive and taking the better of the two (they were extremely similar and didn’t seem to merit more tests).

They were all so close we really need to “zoom in” on the differences.

That’s a bit better. However, it exaggerates the real world difference because of the lack of scale, so keep that in mind (never trust a graph that doesn’t start at zero!). Out of the three, the Seagate EXOS 16 looks to be the sweet spot for cached and buffered read speeds. The other two both put in great numbers for spinning platter disks, and you may want to stick to the WD if you will be working with frequently cached data.

Write Speeds

Each write speed benchmark was run three times using the dd command to pump output from /dev/urandom to each drive. With this design, the tests will not benefit from compression or caching during the runs. This means these numbers are for comparison to each other only and not max speed. Each drive was in a single ZFS vdev by itself with a record size of 512k.

Large blocks

Here is where the Toshiba MG08 started to really worry me. Its write speed of large 1M block sizes is half that of the others.

dd if=/dev/urandom  of=/mnt/<drive>/tmp.file  bs=1M  count=1000  oflag=dsync

Medium blocks

The Seagate and WD are still leading by a considerable amount, but this seems to be the most favorable benchmark for the Toshiba.

dd if=/dev/urandom  of=/mnt/<drive>/tmp.file  bs=4096k  count=1000  oflag=dsync

Small blocks

Full on nosedive for the Toshiba, over four times slower than the other drives.

dd if=/dev/urandom  of=/mnt/<drive>/tmp.file  bs=4096  count=1000  oflag=dsync

After reviewing the data, I am convinced that the drive is either defective or is designed for a different type of workload than what is in my setup. I did reach out to other enthusiasts on the /r/homelab discord and nothing seemed to cause that dip due to methodology at least.

Conclusions

Both the WD Ultrastar HC550 and Seagate EXOS 16 seem to be good choices for a home NAS. Others seem to have great performance with the Toshiba MG08 as well, which I was not able to duplicate in this instance. I wish I had the capability to grab another Toshiba to see if this was just a defective drive or my setup, but I can’t just throw cash at it.

For now I am putting together a combo of WDs and Seagates into my NAS and calling it good. Hope you found this information useful!

Upload large files fast with Dropzone.js

I have previously covered how to upload large files with dropzone.js, but it didn’t allow for parallel chunk uploads. In this article we will go over that new addition, as well as several other improvements.

Download the final executable or view github to see all the code now if you don’t want to read the article.

This is the final result for now, and can be customized if you so choose. Obviously I’m no graphic designer, but I’d pick function over style anyway.

Design

Before we dive into code, let’s think about the design. We will need to somehow handle multiple parts of a single file being uploaded at the same time in a random order. How do we keep track of that?

Thankfully, Dropzone will provide the server with a few different pieces of information with each chunk. They include:

  • dzuuid – unique ID per upload file
  • dzchunkindex – the chunk number of the current upload
  • dztotalfilesize – Total size of the upload
  • dzchunksize – Max size per chunk
  • dztotalchunkcount – The number of chunks in this file
  • dzchunkbyteoffset – The place in the file this chunk starts

In my mind there are two clear ways to approach the problem. The first option is to create a sparse file of the full size to start with, using dztotalfilesize, and then with every incoming chunk, set the position in the file using dzchunkbyteoffset and write the data starting there.

The advantage of this method is that it only requires a single file on disk. The disadvantage is you have to worry about multiple threads accessing the same file at the same time.
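
For reference, a minimal sketch of that first option could look like the following. It is not the code used in this article. The function name and the single global lock are my own simplifications, and a real server would need to think harder about how much work happens while the lock is held.

```python
import tempfile
from pathlib import Path
from threading import Lock

write_lock = Lock()  # hypothetical lock shared by all upload threads


def write_chunk(target: Path, total_size: int, offset: int, data: bytes) -> None:
    """Write one chunk into its final position inside a preallocated file."""
    with write_lock:
        if not target.exists():
            # Reserve the full size up front (dztotalfilesize)
            with open(target, "wb") as f:
                f.truncate(total_size)
        with open(target, "r+b") as f:
            f.seek(offset)  # dzchunkbyteoffset
            f.write(data)


# Chunks can arrive in any order and still land in the right place
with tempfile.TemporaryDirectory() as tmp:
    demo_file = Path(tmp) / "upload.bin"
    write_chunk(demo_file, 6, 3, b"def")
    write_chunk(demo_file, 6, 0, b"abc")
    print(demo_file.read_bytes())
```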

The second choice is to write each chunk to a separate file, then when they are all uploaded, concatenate them into a single file and remove the individual chunks. The disadvantages are that you require twice the space for a short time, and have to deal with cleanup of temporary files.

I personally preferred the second option, as it seemed a bit safer.

Upload Function

As a quick warning, I am now using Bottle instead of Flask for this upload, so a bit of the form syntax has changed since the last post.

from pathlib import Path
from threading import Lock
from collections import defaultdict
import shutil
import uuid

from bottle import route, run, request, error, response, HTTPError, static_file
from werkzeug.utils import secure_filename

lock = Lock()
chucks = defaultdict(list)

chunk_path = Path(__file__).parent / "chunks"
storage_path = Path(__file__).parent / "storage"
chunk_path.mkdir(exist_ok=True, parents=True)
storage_path.mkdir(exist_ok=True, parents=True)

@route("/upload", method="POST")
def upload():
    file = request.files.get("file")
    if not file:
        raise HTTPError(status=400, body="No file provided")

    dz_uuid = request.forms.get("dzuuid")
    if not dz_uuid:
        # Assume this file has not been chunked
        with open(storage_path / f"{uuid.uuid4()}_{secure_filename(file.filename)}", "wb") as f:
            file.save(f)
        return "File Saved"

    # Chunked upload
    try:
        current_chunk = int(request.forms["dzchunkindex"])
        total_chunks = int(request.forms["dztotalchunkcount"])
    except KeyError as err:
        raise HTTPError(status=400, body=f"Not all required fields supplied, missing {err}")
    except ValueError:
        raise HTTPError(status=400, body="Values provided were not in expected format")
    
    # Create a new directory for this file in the chunks dir, using the UUID as the folder name
    save_dir = chunk_path / dz_uuid
    if not save_dir.exists():
        save_dir.mkdir(exist_ok=True, parents=True)

    # Save the individual chunk
    with open(save_dir / str(request.forms["dzchunkindex"]), "wb") as f:
        file.save(f)

    # See if we have all the chunks uploaded
    with lock:
        chucks[dz_uuid].append(current_chunk)
        completed = len(chucks[dz_uuid]) == total_chunks

    # Concat all the files into the final file when all are uploaded
    if completed:
        with open(storage_path / f"{dz_uuid}_{secure_filename(file.filename)}", "wb") as f:
            for file_number in range(total_chunks):
                f.write((save_dir / str(file_number)).read_bytes())
        print(f"{file.filename} has been uploaded")
        shutil.rmtree(save_dir)

    return "Chunk upload successful"

if __name__ == "__main__":
    run(server="paste")

Hopefully the code is decently self-documenting. We do a few checks at the start as we pull in the required parameters. Then we prepare the directory where the temporary chunks will be stored, and write the incoming chunk there. We track which chunks have been completed in a global dictionary, and when they have all been uploaded they are assembled into the final file.
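
One thing the list-based bookkeeping does not account for is the same chunk arriving twice, for example if a client retries a chunk it thinks failed. A possible variation, sketched below with a made-up ChunkTracker class, is to record chunk indexes in a set so duplicates are only counted once. Treat it as an idea to adapt rather than a drop-in replacement.

```python
from collections import defaultdict
from threading import Lock


class ChunkTracker:
    """Thread-safe record of which chunk indexes have arrived per upload (hypothetical helper)."""

    def __init__(self) -> None:
        self._lock = Lock()
        self._seen: dict[str, set[int]] = defaultdict(set)

    def mark_received(self, upload_id: str, index: int, total: int) -> bool:
        """Record a chunk and return True once `total` distinct chunks have been seen."""
        with self._lock:
            seen = self._seen[upload_id]
            seen.add(index)  # adding the same index twice has no effect
            done = len(seen) == total
            if done:
                # Forget finished uploads so the dictionary does not grow forever
                del self._seen[upload_id]
            return done


tracker = ChunkTracker()
print(tracker.mark_received("some-uuid", 1, 2))
print(tracker.mark_received("some-uuid", 1, 2))  # retried chunk, still incomplete
print(tracker.mark_received("some-uuid", 0, 2))
```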

File Downloading

Now that we can put files on the server, what about getting them back? I personally don’t want people to host random files on my server, but others may. To accomplish that, we shouldn’t just list all the files to everyone that visits the site, but only to whoever uploaded it. Thankfully we can just store the uuid in a cookie on the frontend, and then have a very basic download function.

@route("/download/<dz_uuid>")
def download(dz_uuid):
    for file in storage_path.iterdir():
        if file.is_file() and file.name.startswith(dz_uuid):
            return static_file(file.name, root=file.parent.absolute(), download=True)
    return HTTPError(status=404)

This does complicate our frontend a bit, as we want to save both UUID and filename as text fields in a cookie. There are a lot of great libraries out there to make life easier with JavaScript and cookies, but I wanted to keep it simple and pure JS other than Dropzone, making the code a bit more complicated than last time.
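To make the cookie layout concrete, here is a small Python sketch of the same text format the frontend uses: entries joined with "||", and each entry holding the UUID and filename joined with "|^^|". The real work happens in JavaScript; encode_entries and decode_entries are hypothetical helpers that only illustrate the format.

```python
ENTRY_SEPARATOR = "||"    # between uploaded files
FIELD_SEPARATOR = "|^^|"  # between a file's UUID and its name


def encode_entries(entries):
    """Turn [(uuid, filename), ...] into the cookie value."""
    return ENTRY_SEPARATOR.join(f"{uid}{FIELD_SEPARATOR}{name}" for uid, name in entries)


def decode_entries(value):
    """Turn the cookie value back into [(uuid, filename), ...]."""
    if not value:
        return []
    return [tuple(item.split(FIELD_SEPARATOR, 1)) for item in value.split(ENTRY_SEPARATOR)]


value = encode_entries([("a1", "x.txt"), ("b2", "y.bin")])
print(value)
print(decode_entries(value))
```

A filename that itself contains one of the separators would break this scheme, which is part of the price of keeping it simple.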

Dropzone frontend

Instead of living in a standalone file, the page is also embedded directly in the Python file. That makes it a lot easier to use as an f-string, but a little harder to read.

<!doctype html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <link rel="stylesheet" href="{dropzone_cdn.rstrip('/')}/{dropzone_version}/min/dropzone.min.css"/>
    <link rel="stylesheet" href="{dropzone_cdn.rstrip('/')}/{dropzone_version}/min/basic.min.css"/>
    <script type="application/javascript"
        src="{dropzone_cdn.rstrip('/')}/{dropzone_version}/min/dropzone.min.js">
    </script>
    <title>pyfiledrop</title>
</head>
<body>

    <div id="content" style="width: 800px; margin: 0 auto;">
        <h2>Upload new files</h2>
        <form method="POST" action='/upload' class="dropzone dz-clickable" id="dropper" enctype="multipart/form-data">
        </form>

        <h2>
            Uploaded
            <input type="button" value="Clear" onclick="clearCookies()" />
        </h2>
        <div id="uploaded">

        </div>

        <script type="application/javascript">
            function clearCookies() {{
                document.cookie = "files=; Max-Age=0";
                document.getElementById("uploaded").innerHTML = "";
            }}

            function getFilesFromCookie() {{
                try {{ return document.cookie.split("=", 2)[1].split("||");}} catch (error) {{ return []; }}
            }}

            function saveCookie(new_file) {{
                    let all_files = getFilesFromCookie();
                    all_files.push(new_file);
                    document.cookie = `files=${{all_files.join("||")}}`;
            }}

            function generateLink(combo){{
                const uuid = combo.split('|^^|')[0];
                const name = combo.split('|^^|')[1];
                if ({'true' if allow_downloads else 'false'}) {{
                    return `<a href="/download/${{uuid}}" download="${{name}}">${{name}}</a>`;
                }}
                return name;
            }}


            function init() {{

                Dropzone.options.dropper = {{
                    paramName: 'file',
                    chunking: true,
                    forceChunking: {dropzone_force_chunking},
                    url: '/upload',
                    retryChunks: true,
                    parallelChunkUploads: {dropzone_parallel_chunks},
                    timeout: {dropzone_timeout}, 
                    maxFilesize: {dropzone_max_file_size}, 
                    chunkSize: {dropzone_chunk_size}, 
                    init: function () {{
                        this.on("complete", function (file) {{
                            let combo = `${{file.upload.uuid}}|^^|${{file.upload.filename}}`;
                            saveCookie(combo);
                            document.getElementById("uploaded").innerHTML += generateLink(combo)  + "<br />";
                        }});
                    }}
                }}

                if (typeof document.cookie !== 'undefined' ) {{
                    let content = "";
                     getFilesFromCookie().forEach(function (combo) {{
                        content += generateLink(combo) + "<br />";
                    }});

                    document.getElementById("uploaded").innerHTML = content;
                }}
            }}

            init();

        </script>
    </div>
</body>
</html>

Notice we are using a slew of Python variables that will be configurable at launch.
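If the doubled braces in the page look odd, that is the cost of the f-string: a single brace pair means "insert a Python value", so every literal JavaScript brace has to be doubled. A tiny sketch of the idea, using a trimmed-down snippet rather than the real page:

```python
dropzone_chunk_size = "1000000"  # same default the script uses

# {{ and }} come out as literal braces, {name} is replaced by the Python value,
# and ${{...}} comes out as a JavaScript template placeholder ${...}
snippet = f"""
Dropzone.options.dropper = {{
    chunkSize: {dropzone_chunk_size},
    init: function () {{
        console.log(`chunk size: ${{this.options.chunkSize}}`);
    }}
}}
"""
print(snippet)
```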

Command line options

import argparse
...

allow_downloads = False
dropzone_cdn = "https://cdnjs.cloudflare.com/ajax/libs/dropzone"
dropzone_version = "5.7.6"
dropzone_timeout = "120000"
dropzone_max_file_size = "100000"
dropzone_chunk_size = "1000000"
dropzone_parallel_chunks = "true"
dropzone_force_chunking = "true"

...


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--port", type=int, default=16273, required=False)
    parser.add_argument("--host", type=str, default="0.0.0.0", required=False)
    parser.add_argument("-s", "--storage", type=str, default=str(storage_path), required=False)
    parser.add_argument("-c", "--chunks", type=str, default=str(chunk_path), required=False)
    parser.add_argument(
        "--max-size",
        type=str,
        default=dropzone_max_file_size,
        help="Max file size (Mb)",
    )
    parser.add_argument(
        "--timeout",
        type=str,
        default=dropzone_timeout,
        help="Timeout (ms) for each chunk upload",
    )
    parser.add_argument("--chunk-size", type=str, default=dropzone_chunk_size, help="Chunk size (bytes)")
    parser.add_argument("--disable-parallel-chunks", required=False, default=False, action="store_true")
    parser.add_argument("--disable-force-chunking", required=False, default=False, action="store_true")
    parser.add_argument("-a", "--allow-downloads", required=False, default=False, action="store_true")
    parser.add_argument("--dz-cdn", type=str, default=None, required=False)
    parser.add_argument("--dz-version", type=str, default=None, required=False)
    return parser.parse_args()


if __name__ == "__main__":

    args = parse_args()
    storage_path = Path(args.storage)
    chunk_path = Path(args.chunks)
    dropzone_chunk_size = args.chunk_size
    dropzone_timeout = args.timeout
    dropzone_max_file_size = args.max_size
    try:
        if int(dropzone_timeout) < 1 or int(dropzone_chunk_size) < 1 or int(dropzone_max_file_size) < 1:
            raise Exception("Invalid dropzone option, make sure max-size, timeout, and chunk-size are all positive")
    except ValueError:
        raise Exception("Invalid dropzone option, make sure max-size, timeout, and chunk-size are all integers")

    if args.dz_cdn:
        dropzone_cdn = args.dz_cdn
    if args.dz_version:
        dropzone_version = args.dz_version
    if args.disable_parallel_chunks:
        dropzone_parallel_chunks = "false"
    if args.disable_force_chunking:
        dropzone_force_chunking = "false"
    if args.allow_downloads:
        allow_downloads = True

    if not storage_path.exists():
        storage_path.mkdir(exist_ok=True)
    if not chunk_path.exists():
        chunk_path.mkdir(exist_ok=True)

    print(f"""Timeout: {int(dropzone_timeout)} ms
Chunk Size: {int(dropzone_chunk_size)} bytes
Max File Size: {int(dropzone_max_file_size)} Mb
Force Chunking: {dropzone_force_chunking}
Parallel Chunks: {dropzone_parallel_chunks}
Storage Path: {storage_path.absolute()}
Chunk Path: {chunk_path.absolute()}
""")
    run(server="paste", port=args.port, host=args.host)

As this will become an executable, we want it to be configurable through parameters passed at launch.
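The script keeps these options as strings and checks them by hand after parsing. One alternative, shown here only as a sketch, is to let argparse do the validation with a custom type function. The positive_int and build_parser names are my own for this example, and only a few of the options are reproduced.

```python
import argparse


def positive_int(value: str) -> int:
    """argparse type that accepts whole numbers of 1 or more."""
    number = int(value)  # a ValueError here is reported by argparse as a usage error
    if number < 1:
        raise argparse.ArgumentTypeError(f"{value} is not a positive integer")
    return number


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser()
    parser.add_argument("--timeout", type=positive_int, default=120000, help="Timeout (ms) for each chunk upload")
    parser.add_argument("--chunk-size", type=positive_int, default=1000000, help="Chunk size (bytes)")
    parser.add_argument("--disable-parallel-chunks", action="store_true")
    return parser


# Passing a list makes the example runnable without a real command line
args = build_parser().parse_args(["--timeout", "5000", "--disable-parallel-chunks"])

# The page template needs JavaScript literals, so the flag becomes "true" or "false"
parallel_chunks = "false" if args.disable_parallel_chunks else "true"
print(args.timeout, args.chunk_size, parallel_chunks)
```

With this approach a bad value stops the program with the usual argparse usage message instead of a raised Exception.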

Favicon

Now this is getting into the realm of silly. But to be an all in one script, we need to provide a binary file (the favicon) in the script itself. Thankfully ico files can be compressed rather easily, so we are going to compress it in the script itself, and decompress it when requested.

@route("/favicon.ico")
def favicon():
    return zlib.decompress(
        b"x\x9c\xedVYN\xc40\x0c5J%[\xe2\xa3|q\x06\x8e1G\xe1(=ZoV"
        b"\xb2\xa7\x89\x97R\x8d\x84\x04\xe4\xa5\xcb(\xc9\xb3\x1do"
        b"\x1d\x80\x17?\x1e\x0f\xf0O\x82\xcfw\x00\x7f\xc1\x87\xbf"
        b"\xfd\x14l\x90\xe6#\xde@\xc1\x966n[z\x85\x11\xa6\xfcc"
        b"\xdfw?s\xc4\x0b\x8e#\xbd\xc2\x08S\xe1111\xf1k\xb1NL"
        b"\xfcU<\x99\xe4T\xf8\xf43|\xaa\x18\xf8\xc3\xbaHFw\xaaj\x94"
        b"\xf4c[F\xc6\xee\xbb\xc2\xc0\x17\xf6\xf4\x12\x160\xf9"
        b"\xa3\xfeQB5\xab@\xf4\x1f\xa55r\xf9\xa4KGG\xee\x16\xdd\xff"
        b"\x8e\x9d\x8by\xc4\xe4\x17\tU\xbdDg\xf1\xeb\xf0Zh\x8e"
        b"\xd3s\x9c\xab\xc3P\n<e\xcb$\x05 b\xd8\x84Q1\x8a\xd6Kt\xe6"
        b"\x85(\x13\xe5\xf3]j\xcf\x06\x88\xe6K\x02\x84\x18\x90"
        b"\xc5\xa7Kz\xd4\x11\xeeEZK\x012\xe9\xab\xa5\xbf\xb3@i\x00"
        b"\xce\xe47\x0b\xb4\xfe\xb1d\xffk\xebh\xd3\xa3\xfd\xa4:`5J"
        b"\xa3\xf1\xf5\xf4\xcf\x02tz\x8c_\xd2\xa1\xee\xe1\xad"
        b"\xaa\xb7n-\xe5\xafoSQ\x14'\x01\xb7\x9b<\x15~\x0e\xf4b"
        b"\x8a\x90k\x8c\xdaO\xfb\x18<H\x9d\xdfj\xab\xd0\xb43\xe1"
        b'\xe3nt\x16\xdf\r\xe6\xa1d\xad\xd0\xc9z\x03"\xc7c\x94v'
        b"\xb6I\xe1\x8f\xf5,\xaa2\x93}\x90\xe0\x94\x1d\xd2\xfcY~f"
        b"\xab\r\xc1\xc8\xc4\xe4\x1f\xed\x03\x1e`\xd6\x02\xda\xc7k"
        b"\x16\x1a\xf4\xcb2Q\x05\xa0\xe6\xb4\x1e\xa4\x84\xc6"
        b"\xcc..`8'\x9a\xc9-\n\xa8\x05]?\xa3\xdfn\x11-\xcc\x0b"
        b"\xb4\x7f67:\x0c\xcf\xd5\xbb\xfd\x89\x9ebG\xf8:\x8bG"
        b"\xc0\xfb\x9dm\xe2\xdf\x80g\xea\xc4\xc45\xbe\x00\x03\xe9\xd6\xbb"
    )
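If you want to swap in your own icon, the bytes literal can be produced with the other half of zlib. This is a sketch of how such a literal could be generated, not necessarily how the one above was made. The embed_bytes helper is a name I made up, and the placeholder data stands in for reading a real favicon.ico.

```python
import zlib


def embed_bytes(raw: bytes) -> bytes:
    """Compress raw file contents so they can be pasted into a script."""
    return zlib.compress(raw, 9)  # 9 is the highest compression level


if __name__ == "__main__":
    # Placeholder data standing in for Path("favicon.ico").read_bytes()
    icon = bytes(1024)
    packed = embed_bytes(icon)
    print(repr(packed))  # paste this literal into the favicon() route
    # Round trip to confirm nothing was lost
    assert zlib.decompress(packed) == icon
```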

Putting it all together

Here is the culmination of everything we talked about put into a script.

This may not always be the newest version. If you want to use it yourself, please download the final executable or view GitHub to see the latest code.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pathlib import Path
from threading import Lock
from collections import defaultdict
import shutil
import argparse
import uuid
import zlib

from bottle import route, run, request, error, response, HTTPError, static_file
from werkzeug.utils import secure_filename

storage_path: Path = Path(__file__).parent / "storage"
chunk_path: Path = Path(__file__).parent / "chunk"

allow_downloads = False
dropzone_cdn = "https://cdnjs.cloudflare.com/ajax/libs/dropzone"
dropzone_version = "5.7.6"
dropzone_timeout = "120000"
dropzone_max_file_size = "100000"
dropzone_chunk_size = "1000000"
dropzone_parallel_chunks = "true"
dropzone_force_chunking = "true"

lock = Lock()
chucks = defaultdict(list)


@error(500)
def handle_500(error_message):
    response.status = 500
    response.body = f"Error: {error_message}"
    return response


@route("/")
def index():
    index_file = Path(__file__).parent / "index.html"
    if index_file.exists():
        return index_file.read_text()
    return f"""
<!doctype html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <link rel="stylesheet" href="{dropzone_cdn.rstrip('/')}/{dropzone_version}/min/dropzone.min.css"/>
    <link rel="stylesheet" href="{dropzone_cdn.rstrip('/')}/{dropzone_version}/min/basic.min.css"/>
    <script type="application/javascript"
        src="{dropzone_cdn.rstrip('/')}/{dropzone_version}/min/dropzone.min.js">
    </script>
    <title>pyfiledrop</title>
</head>
<body>

    <div id="content" style="width: 800px; margin: 0 auto;">
        <h2>Upload new files</h2>
        <form method="POST" action='/upload' class="dropzone dz-clickable" id="dropper" enctype="multipart/form-data">
        </form>

        <h2>
            Uploaded
            <input type="button" value="Clear" onclick="clearCookies()" />
        </h2>
        <div id="uploaded">

        </div>

        <script type="application/javascript">
            function clearCookies() {{
                document.cookie = "files=; Max-Age=0";
                document.getElementById("uploaded").innerHTML = "";
            }}

            function getFilesFromCookie() {{
                try {{ return document.cookie.split("=", 2)[1].split("||");}} catch (error) {{ return []; }}
            }}

            function saveCookie(new_file) {{
                    let all_files = getFilesFromCookie();
                    all_files.push(new_file);
                    document.cookie = `files=${{all_files.join("||")}}`;
            }}

            function generateLink(combo){{
                const uuid = combo.split('|^^|')[0];
                const name = combo.split('|^^|')[1];
                if ({'true' if allow_downloads else 'false'}) {{
                    return `<a href="/download/${{uuid}}" download="${{name}}">${{name}}</a>`;
                }}
                return name;
            }}


            function init() {{

                Dropzone.options.dropper = {{
                    paramName: 'file',
                    chunking: true,
                    forceChunking: {dropzone_force_chunking},
                    url: '/upload',
                    retryChunks: true,
                    parallelChunkUploads: {dropzone_parallel_chunks},
                    timeout: {dropzone_timeout}, 
                    maxFilesize: {dropzone_max_file_size}, 
                    chunkSize: {dropzone_chunk_size}, 
                    init: function () {{
                        this.on("complete", function (file) {{
                            let combo = `${{file.upload.uuid}}|^^|${{file.upload.filename}}`;
                            saveCookie(combo);
                            document.getElementById("uploaded").innerHTML += generateLink(combo)  + "<br />";
                        }});
                    }}
                }}

                if (typeof document.cookie !== 'undefined' ) {{
                    let content = "";
                     getFilesFromCookie().forEach(function (combo) {{
                        content += generateLink(combo) + "<br />";
                    }});

                    document.getElementById("uploaded").innerHTML = content;
                }}
            }}

            init();

        </script>
    </div>
</body>
</html>
    """


@route("/favicon.ico")
def favicon():
    return zlib.decompress(
        b"x\x9c\xedVYN\xc40\x0c5J%[\xe2\xa3|q\x06\x8e1G\xe1(=ZoV"
        b"\xb2\xa7\x89\x97R\x8d\x84\x04\xe4\xa5\xcb(\xc9\xb3\x1do"
        b"\x1d\x80\x17?\x1e\x0f\xf0O\x82\xcfw\x00\x7f\xc1\x87\xbf"
        b"\xfd\x14l\x90\xe6#\xde@\xc1\x966n[z\x85\x11\xa6\xfcc"
        b"\xdfw?s\xc4\x0b\x8e#\xbd\xc2\x08S\xe1111\xf1k\xb1NL"
        b"\xfcU<\x99\xe4T\xf8\xf43|\xaa\x18\xf8\xc3\xbaHFw\xaaj\x94"
        b"\xf4c[F\xc6\xee\xbb\xc2\xc0\x17\xf6\xf4\x12\x160\xf9"
        b"\xa3\xfeQB5\xab@\xf4\x1f\xa55r\xf9\xa4KGG\xee\x16\xdd\xff"
        b"\x8e\x9d\x8by\xc4\xe4\x17\tU\xbdDg\xf1\xeb\xf0Zh\x8e"
        b"\xd3s\x9c\xab\xc3P\n<e\xcb$\x05 b\xd8\x84Q1\x8a\xd6Kt\xe6"
        b"\x85(\x13\xe5\xf3]j\xcf\x06\x88\xe6K\x02\x84\x18\x90"
        b"\xc5\xa7Kz\xd4\x11\xeeEZK\x012\xe9\xab\xa5\xbf\xb3@i\x00"
        b"\xce\xe47\x0b\xb4\xfe\xb1d\xffk\xebh\xd3\xa3\xfd\xa4:`5J"
        b"\xa3\xf1\xf5\xf4\xcf\x02tz\x8c_\xd2\xa1\xee\xe1\xad"
        b"\xaa\xb7n-\xe5\xafoSQ\x14'\x01\xb7\x9b<\x15~\x0e\xf4b"
        b"\x8a\x90k\x8c\xdaO\xfb\x18<H\x9d\xdfj\xab\xd0\xb43\xe1"
        b'\xe3nt\x16\xdf\r\xe6\xa1d\xad\xd0\xc9z\x03"\xc7c\x94v'
        b"\xb6I\xe1\x8f\xf5,\xaa2\x93}\x90\xe0\x94\x1d\xd2\xfcY~f"
        b"\xab\r\xc1\xc8\xc4\xe4\x1f\xed\x03\x1e`\xd6\x02\xda\xc7k"
        b"\x16\x1a\xf4\xcb2Q\x05\xa0\xe6\xb4\x1e\xa4\x84\xc6"
        b"\xcc..`8'\x9a\xc9-\n\xa8\x05]?\xa3\xdfn\x11-\xcc\x0b"
        b"\xb4\x7f67:\x0c\xcf\xd5\xbb\xfd\x89\x9ebG\xf8:\x8bG"
        b"\xc0\xfb\x9dm\xe2\xdf\x80g\xea\xc4\xc45\xbe\x00\x03\xe9\xd6\xbb"
    )


@route("/upload", method="POST")
def upload():
    file = request.files.get("file")
    if not file:
        raise HTTPError(status=400, body="No file provided")

    dz_uuid = request.forms.get("dzuuid")
    if not dz_uuid:
        # Assume this file has not been chunked
        with open(storage_path / f"{uuid.uuid4()}_{secure_filename(file.filename)}", "wb") as f:
            file.save(f)
        return "File Saved"

    # Chunked download
    try:
        current_chunk = int(request.forms["dzchunkindex"])
        total_chunks = int(request.forms["dztotalchunkcount"])
    except KeyError as err:
        raise HTTPError(status=400, body=f"Not all required fields supplied, missing {err}")
    except ValueError:
        raise HTTPError(status=400, body="Values provided were not in expected format")

    save_dir = chunk_path / dz_uuid

    if not save_dir.exists():
        save_dir.mkdir(exist_ok=True, parents=True)

    # Save the individual chunk
    with open(save_dir / str(request.forms["dzchunkindex"]), "wb") as f:
        file.save(f)

    # See if we have all the chunks downloaded
    with lock:
        chucks[dz_uuid].append(current_chunk)
        completed = len(chucks[dz_uuid]) == total_chunks

    # Concat all the files into the final file when all are downloaded
    if completed:
        with open(storage_path / f"{dz_uuid}_{secure_filename(file.filename)}", "wb") as f:
            for file_number in range(total_chunks):
                f.write((save_dir / str(file_number)).read_bytes())
        print(f"{file.filename} has been uploaded")
        shutil.rmtree(save_dir)

    return "Chunk upload successful"


@route("/download/<dz_uuid>")
def download(dz_uuid):
    if not allow_downloads:
        raise HTTPError(status=403)
    for file in storage_path.iterdir():
        if file.is_file() and file.name.startswith(dz_uuid):
            return static_file(file.name, root=file.parent.absolute(), download=True)
    return HTTPError(status=404)


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--port", type=int, default=16273, required=False)
    parser.add_argument("--host", type=str, default="0.0.0.0", required=False)
    parser.add_argument("-s", "--storage", type=str, default=str(storage_path), required=False)
    parser.add_argument("-c", "--chunks", type=str, default=str(chunk_path), required=False)
    parser.add_argument(
        "--max-size",
        type=str,
        default=dropzone_max_file_size,
        help="Max file size (Mb)",
    )
    parser.add_argument(
        "--timeout",
        type=str,
        default=dropzone_timeout,
        help="Timeout (ms) for each chunk upload",
    )
    parser.add_argument("--chunk-size", type=str, default=dropzone_chunk_size, help="Chunk size (bytes)")
    parser.add_argument("--disable-parallel-chunks", required=False, default=False, action="store_true")
    parser.add_argument("--disable-force-chunking", required=False, default=False, action="store_true")
    parser.add_argument("-a", "--allow-downloads", required=False, default=False, action="store_true")
    parser.add_argument("--dz-cdn", type=str, default=None, required=False)
    parser.add_argument("--dz-version", type=str, default=None, required=False)
    return parser.parse_args()


if __name__ == "__main__":

    args = parse_args()
    storage_path = Path(args.storage)
    chunk_path = Path(args.chunks)
    dropzone_chunk_size = args.chunk_size
    dropzone_timeout = args.timeout
    dropzone_max_file_size = args.max_size
    try:
        if int(dropzone_timeout) < 1 or int(dropzone_chunk_size) < 1 or int(dropzone_max_file_size) < 1:
            raise Exception("Invalid dropzone option, make sure max-size, timeout, and chunk-size are all positive")
    except ValueError:
        raise Exception("Invalid dropzone option, make sure max-size, timeout, and chunk-size are all integers")

    if args.dz_cdn:
        dropzone_cdn = args.dz_cdn
    if args.dz_version:
        dropzone_version = args.dz_version
    if args.disable_parallel_chunks:
        dropzone_parallel_chunks = "false"
    if args.disable_force_chunking:
        dropzone_force_chunking = "false"
    if args.allow_downloads:
        allow_downloads = True

    if not storage_path.exists():
        storage_path.mkdir(exist_ok=True)
    if not chunk_path.exists():
        chunk_path.mkdir(exist_ok=True)

    print(
        f"""Timeout: {int(dropzone_timeout)} ms
Chunk Size: {int(dropzone_chunk_size)} bytes
Max File Size: {int(dropzone_max_file_size)} Mb
Force Chunking: {dropzone_force_chunking}
Parallel Chunks: {dropzone_parallel_chunks}
Storage Path: {storage_path.absolute()}
Chunk Path: {chunk_path.absolute()}
"""
    )
    run(server="paste", port=args.port, host=args.host)

Make it yours, and give back if you can!

What will you add to this script? Set a max time for how long you can see the uploaded files? A way to ensure the file exists on the server before trying to download it? Checksum comparison to avoid using space for duplicate files?
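As a starting point for the checksum idea, here is a rough sketch using hashlib from the standard library. Nothing like this exists in the script today. file_digest and find_duplicate are hypothetical names, and hashing every stored file on each upload would get slow on a large storage directory.

```python
import hashlib
from pathlib import Path
from tempfile import TemporaryDirectory


def file_digest(path: Path, block_size: int = 1024 * 1024) -> str:
    """Return the SHA-256 hex digest of a file, read in blocks to limit memory use."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for block in iter(lambda: f.read(block_size), b""):
            digest.update(block)
    return digest.hexdigest()


def find_duplicate(new_file: Path, storage: Path):
    """Return an existing file in storage with the same contents, or None."""
    wanted = file_digest(new_file)
    for existing in storage.iterdir():
        if existing.is_file() and existing != new_file and file_digest(existing) == wanted:
            return existing
    return None


if __name__ == "__main__":
    with TemporaryDirectory() as tmp:
        storage = Path(tmp)
        (storage / "first.bin").write_bytes(b"same contents")
        (storage / "second.bin").write_bytes(b"same contents")
        print(find_duplicate(storage / "second.bin", storage))
```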

However you make it better, please consider opening a pull request for your features so everyone can benefit from them!