Automating HLS-Stream-Creator (Python)

My HLS Stream Creator script takes input videos and outputs adaptive HLS streams. However, it was written as an experiment and developed over time, so it has always been focused more on manual runs.

This snippet gives an example of wrapping automation around the script. A slightly different version of it is in active use on one of my muxers, fetching muxing jobs from an upstream API (hence the slightly odd input to triggerMux).

It may need a little adaptation for use-cases that differ from mine. For example, the output directory is written into the same path as the input video, and all input videos exist under the same base directory (basically, an NFS mount point). It should, however, serve to show the basic concept.

Details

Snippet

#!/usr/bin/env python
import json
import subprocess
from subprocess import call
import os
import time
import re

# Path to the HLS Stream Creator script that triggerMux() executes
hls_enc_path="/usr/local/bin/HLS-Stream-Creator.sh"

# Base directory that job paths are appended to. Output is written alongside
# the source video, in a directory named after the source with ".hls" appended
base_dir='/mnt/videos/HLS/'

# HLS segment length, passed to HLS Stream Creator as its -s argument
# (presumably seconds -- confirm against the HLS Stream Creator docs)
seg_length=5

# Flags appended to the ffmpeg command used for the compatibility pass. Both
# values are also exported into the environment of the child processes as
# FFMPEG_FLAGS and NUMTHREADS
FFMPEG_FLAGS='-preset fast -hide_banner -strict -2 -loglevel quiet'
NUMTHREADS=2

# Multipliers applied to the source video bitrate to derive the variant
# bitrates (the source bitrate itself, i.e. a factor of 1, is always included)
output_rates = [0.75,0.5,0.4]

# Temporary file written by the initial compatibility pass and then used as
# the input for the mux. It is deleted once the mux has run.
# NOTE(review): this is a single fixed path, so it looks like two jobs running
# at the same time on one host would overwrite each other's file -- confirm
TEMPFILE="/tmp/tmp.mp4"

def sorted_nicely(l):
    """ Return the items of an iterable in natural ("human") order.

    Runs of digits are compared by numeric value rather than character by
    character, so '9' sorts before '10'.

    Required arguments:
    l -- The iterable of strings to be sorted.

    Returns a new sorted list; the input is not modified.
    """
    def natural_key(item):
        # Splitting on a capturing group keeps the digit runs in the result,
        # giving alternating text / number chunks
        parts = []
        for chunk in re.split('([0-9]+)', item):
            if chunk.isdigit():
                parts.append(int(chunk))
            else:
                parts.append(chunk)
        return parts

    return sorted(l, key=natural_key)

def probe_file(filename):
    """ Probe the input file to get stream information

    Runs ffprobe against the file and decodes the JSON report it prints.

    Required arguments:
    filename -- Path of the media file to inspect.

    Returns the decoded ffprobe report (a dict containing a 'streams' list)
    on success. Returns False if ffprobe could not be started, exited with a
    non-zero status, wrote to stderr, or printed something that is not JSON.
    """
    cmnd = ['ffprobe', '-show_streams', '-print_format', 'json', '-loglevel', 'quiet', filename]

    try:
        p = subprocess.Popen(cmnd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = p.communicate()
    except OSError as e:
        # Raised when the ffprobe binary is missing or cannot be executed
        print("========= error ========")
        print("Unable to run ffprobe: %s" % (e,))
        return False

    # "-loglevel quiet" silences stderr, so the exit status is the only
    # reliable indication that the probe failed
    if err or p.returncode != 0:
        print("========= error ========")
        print("ffprobe exited with status %s" % (p.returncode,))
        if err:
            print(err)
        return False

    try:
        # Decode explicitly so this behaves the same whether the pipe hands
        # back bytes or str. UnicodeDecodeError is a subclass of ValueError
        return json.loads(out.decode('utf-8'))
    except ValueError as e:
        print("========= error ========")
        print("Unable to parse ffprobe output: %s" % (e,))
        return False

def calcBitrates(j, output_rates):
    """ Calculate the output bitrates based on the source input

    The first video stream that reports a usable bit_rate is used as the
    reference. The result contains the source bitrate itself plus one entry
    per factor in output_rates, all in kb/s. If the lowest of those is above
    250kb/s, a 200kb/s variant is added as well.

    Required arguments:
    j -- Decoded ffprobe report, as returned by probe_file(). A failed probe
         (False) or a report without any streams is accepted.
    output_rates -- Iterable of multipliers to apply to the source bitrate.

    Returns a list of bitrates as strings (so they can be passed straight to
    join()), sorted in ascending numeric order with duplicates removed.
    Returns False if no video bitrate could be determined.
    """
    if not isinstance(j, dict):
        # probe_file() hands back False when the probe failed
        return False

    for s in j.get('streams') or []:
        if s.get('codec_type') != 'video' or 'bit_rate' not in s:
            continue

        try:
            br = int(s['bit_rate'])
        except (TypeError, ValueError):
            # Not a usable number, see whether another stream has one
            continue

        # Convert to kb/s. Casting to int removes any decimal places
        rates = [br // 1000]
        for mod in output_rates:
            rates.append(int((br * mod) / 1000))

        # Compare against the lowest rate rather than the last one generated,
        # so the check doesn't depend on the ordering of output_rates
        if min(rates) > 250:
            # Make sure there's a 200kb/s option
            rates.append(200)

        # Sort numerically and only then convert to strings, dropping any
        # duplicate rates so the same variant is never requested twice
        return [str(r) for r in sorted(set(rates))]

    return False

def _fail_job(job, reason, viddir=None):
    """ Report a failed job and tidy up after it.

    Required arguments:
    job -- The job dict that was passed to triggerMux.
    reason -- Short failure code handed to notify_change.

    Optional arguments:
    viddir -- Output directory created for this job. When given, it is
              removed again, but only if it is still empty.

    Always returns False so callers can "return _fail_job(...)".
    """
    notify_change('failed',job,{'reason':reason})

    if viddir is not None:
        try:
            # rmdir refuses to delete a directory that has content, so partial
            # output is never thrown away. Removing the empty directory stops
            # a later retry being rejected with DirectoryExists
            os.rmdir(viddir)
        except OSError:
            pass

    return False


def triggerMux(job):
    """ Run a single muxing job

    job['job']['path'] = Path to source media (relative to base_dir)

    Muxed version will be written into a directory using the original filename as a name
    but with .hls appended.

    i.e. /mnt/video/foo.mp4 would create /mnt/video/foo.mp4.hls/

    The source is first re-encoded into TEMPFILE to strip any unsupported
    tracks, and that copy is then fed to HLS Stream Creator. Progress is
    reported through notify_change ('inprocess', 'compatrun', 'muxstarted',
    then 'complete' or 'failed' with a reason).

    Returns True when the mux completed and False when the job failed.
    """
    notify_change('inprocess',job)

    # A leading slash on the job path would make os.path.join discard base_dir
    path = os.path.join(base_dir, job['job']['path'].lstrip('/'))
    viddir = "%s.hls" % (path,)

    if os.path.isdir(viddir):
        # Don't try and redo work that's already done
        print("already seem to have %s" % (viddir,))
        return _fail_job(job, 'DirectoryExists')

    if not os.path.isfile(path):
        # Do nothing if the source file is missing
        print("File doesn't exist %s" % (path,))
        return _fail_job(job, 'MissingFile')

    # Create the output directory
    os.mkdir(viddir)

    # Get video information and calculate variant rates. Only hand a usable
    # report to calcBitrates, probe_file gives back False when probing failed
    probe = probe_file(path)
    brs = False
    if isinstance(probe, dict) and probe.get('streams'):
        brs = calcBitrates(probe, output_rates)

    if not brs:
        print("Couldn't ascertain source bitrate")
        return _fail_job(job, 'CantGetBitRate', viddir)

    bitrates = ','.join(brs)

    # Set up the environment
    my_env = os.environ.copy()
    my_env["FFMPEG_FLAGS"] = FFMPEG_FLAGS
    my_env["NUMTHREADS"] = str(NUMTHREADS)

    # Build the commands. -y stops ffmpeg waiting for confirmation if a stale
    # TEMPFILE was left behind, and split() without an argument avoids empty
    # arguments when the flags contain repeated spaces
    compat_cmd = ['ffmpeg', '-y', '-i', path] + FFMPEG_FLAGS.split() + [TEMPFILE]
    mux_cmd = [hls_enc_path, '-i', TEMPFILE, '-o', viddir, '-b', bitrates,
               '-p', 'manifest', '-t', 'media', '-s', str(seg_length)]

    try:
        # Make a copy of the file to strip any unsupported tracks
        notify_change('compatrun',job)
        if subprocess.call(compat_cmd, env=my_env) != 0:
            print("FFMpeg exited %s" % (path,))
            return _fail_job(job, 'FFmpegErr', viddir)

        # Get ready to start the actual mux
        # Make a note of when it started
        start = int(time.time())

        # Trigger the mux
        notify_change('muxstarted',job)
        subprocess.call(mux_cmd, env=my_env)
        elapsed = int(time.time()) - start
    finally:
        # The temp file won't exist if the compat pass failed before writing it
        if os.path.exists(TEMPFILE):
            os.remove(TEMPFILE)

    # NOTE(review): failure of the mux itself is still inferred from how long
    # it ran rather than from the script's exit status -- confirm whether
    # HLS Stream Creator's exit status can be relied on instead
    if elapsed < 5:
        # Completed too quick, ffmpeg errored
        print("FFMpeg exited %s" % (path,))
        return _fail_job(job, 'FFmpegErr', viddir)

    notify_change('complete',job)
    return True

def notify_change(state,job,data=False):
    ''' Hook for reporting job status (no-op by default)

    Replace the body with whatever is needed to tell another system about
    the progress of a job.

    Required arguments:
    state -- Name of the state the job has moved to.
    job -- The job the state applies to.

    Optional arguments:
    data -- Extra detail about the state change (default False).

    Always returns True.
    '''
    # Nothing is notified in this placeholder implementation
    return True

Usage Example

#!/usr/bin/env python
#
# Simple wrapper to call HLS Stream Creator against multiple video files
#
import HLSSCWrapper

# Override the module defaults for this machine
HLSSCWrapper.hls_enc_path="/home/ben/repos/HLS-Stream-Creator/HLS-Stream-Creator.sh"
HLSSCWrapper.base_dir='/mnt/HLS-Out/'

# Source videos, given relative to HLSSCWrapper.base_dir
inputfiles=['example1.mp4','a/b/c/d/foo.mp4','b/B/big_buck_bunny.mp4']

for inp in inputfiles:

    # triggerMux reads the source path from job['job']['path']
    job = {
        "job": {
            "path": inp
        }
    }

    # triggerMux returns False when a job fails, so report it rather than
    # silently moving on to the next file
    if HLSSCWrapper.triggerMux(job) is False:
        print("Failed to mux %s" % (inp,))