Automating HLS-Stream-Creator (Python)
My HLS Stream Creator script takes input videos and outputs adaptive HLS streams. However, it was written as an experiment and developed over time, so it has always been geared more towards manual runs.
This snippet gives an example of wrapping automation around the script. A slightly different version is in active use on one of my muxers, fetching muxing jobs from an upstream API (hence the slightly odd input to triggerMux).
It may need a little adaptation for use-cases that differ from mine. For example, the output directory is written into the same path as the input video, and all input videos exist under the same base directory (basically, an NFS mount point). However, it should serve to show the basic concept.
Details
- Language: Python
- License: BSD-3-Clause
Snippet
#!/usr/bin/env python
import json
import subprocess
from subprocess import call
import os
import time
import re
# Path to the HLS Stream Creator script; triggerMux runs it as the muxing executable
hls_enc_path="/usr/local/bin/HLS-Stream-Creator.sh"
# Base directory that job paths are appended to (plain string concatenation, so keep
# the trailing slash). Output directories are created alongside the source videos.
base_dir='/mnt/videos/HLS/'
# HLS segment length, passed to the script via -s (presumably seconds -- confirm against the script)
seg_length=5
# Flags appended to the ffmpeg compatibility pass and also exported to the script
# through the FFMPEG_FLAGS environment variable
FFMPEG_FLAGS='-preset fast -hide_banner -strict -2 -loglevel quiet'
# Exported to the script through the NUMTHREADS environment variable
NUMTHREADS=2
# When calculating bitrates, which fractions of the source bitrate should we create
# (in addition to 1, the source rate itself, which is always included)
output_rates = [0.75,0.5,0.4]
# Temporary file written by the initial compatibility pass and then used as the
# input to the muxing script; it is removed once the mux has run
TEMPFILE="/tmp/tmp.mp4"
def sorted_nicely( l ):
    """ Sorts the given iterable in natural ("human") order, so that
    'file2' sorts before 'file10'.

    Required arguments:
    l -- The iterable of strings to be sorted.

    Returns a new sorted list; the input is not modified.
    """
    def alphanum_key(key):
        # Splitting on a capturing group always alternates: plain text at the
        # even positions, the captured digit runs at the odd positions. Only
        # the captured runs are converted, so the key never depends on
        # str.isdigit() (which accepts characters such as superscripts that
        # int() rejects) and text is only ever compared with text.
        chunks = re.split(r'([0-9]+)', key)
        return [int(chunk) if pos % 2 else chunk for pos, chunk in enumerate(chunks)]

    return sorted(l, key=alphanum_key)
def probe_file(filename):
    """ Probe the input file with ffprobe to get stream information

    Required arguments:
    filename -- Path of the media file to inspect.

    Returns the parsed ffprobe JSON output (a dict), or False if ffprobe
    could not be run, reported an error, or produced unparseable output.
    """
    cmnd = ['ffprobe', '-show_streams', '-print_format', 'json', '-loglevel', 'quiet', filename]

    try:
        p = subprocess.Popen(cmnd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except OSError as e:
        # ffprobe is missing or not executable
        print("========= error ========")
        print(e)
        return False

    out, err = p.communicate()

    # '-loglevel quiet' silences stderr, so the exit status is the only
    # reliable failure signal; stderr is still checked in case it is used.
    if err or p.returncode != 0:
        print("========= error ========")
        print(err if err else "ffprobe exited with status %s" % (p.returncode,))
        return False

    try:
        return json.loads(out)
    except ValueError as e:
        # Empty or malformed output
        print("========= error ========")
        print(e)
        return False
def calcBitrates(j, output_rates):
    """ Calculate the output bitrates based on the source input

    Required arguments:
    j -- Parsed ffprobe output (a dict with a 'streams' list), or a falsy
         value if probing failed.
    output_rates -- Iterable of factors of the source bitrate to generate,
                    in addition to the source rate itself.

    Returns a list of bitrates in kb/s as strings, in ascending order, based
    on the first video stream that reports a bit_rate. Returns False if there
    is no such stream (or no probe data at all).
    """
    if not j:
        # Probing failed upstream, nothing to calculate from
        return False

    for s in j.get('streams', []):
        if s.get('codec_type') != 'video' or "bit_rate" not in s:
            continue

        br = int(s['bit_rate'])

        # Convert to kb/s, dropping any decimal places
        rates = [br // 1000]
        rates.extend(int((br * mod) / 1000) for mod in output_rates)

        # Make sure there's a 200kb/s option. Check the lowest rate rather
        # than the last one calculated, so the order of output_rates
        # doesn't matter.
        if min(rates) > 250:
            rates.append(200)

        # Numeric sort, then convert to strings so we don't break join().
        # This is the same order a natural sort of the digit strings gives.
        return [str(rate) for rate in sorted(rates)]

    return False
def _remove_if_present(path):
    """ Best-effort removal of a temporary file; a missing file is not an error """
    try:
        os.remove(path)
    except OSError:
        # Deliberately ignored: the file was never created or is already gone
        pass


def triggerMux(job):
    """
    Run a muxing job: probe the source, make a compatibility copy with ffmpeg
    and then hand that copy to HLS Stream Creator.

    job['job']['path'] = Path to source media (appended to base_dir)

    Muxed version will be written into a directory using the original filename as a name
    but with .hls appended.

    i.e. /mnt/video/foo.mp4 would create /mnt/video/foo.mp4.hls/

    Status changes are reported through notify_change() as the job progresses.

    Returns False if the job failed (after sending a 'failed' notification with
    a reason), and None once the 'complete' notification has been sent.
    """
    notify_change('inprocess',job)

    path = "%s%s" % (base_dir,job['job']['path'])
    viddir = "%s.hls" % (path)

    if os.path.isdir(viddir):
        # Don't try and redo work that's already done
        print("already seem to have %s" % (viddir,))
        notify_change('failed',job,{'reason':'DirectoryExists'})
        return False

    if not os.path.isfile(path):
        # Do nothing if the source file is missing
        print("File doesn't exist %s" % (path,))
        notify_change('failed',job,{'reason':'MissingFile'})
        return False

    # Create the output directory
    os.mkdir(viddir)

    # Get video information and calculate variant rates.
    # probe_file() returns False on error, so don't pass that on blindly.
    stream_info = probe_file(path)
    brs = calcBitrates(stream_info, output_rates) if stream_info else False
    if not brs:
        print("Couldn't ascertain source bitrate")
        notify_change('failed',job,{'reason':'CantGetBitRate'})
        return False
    bitrates = ','.join(brs)

    # Set up the environment
    my_env = os.environ.copy()
    my_env["FFMPEG_FLAGS"] = FFMPEG_FLAGS
    my_env["NUMTHREADS"] = str(NUMTHREADS)

    # Build the CMD. split() with no argument ignores repeated spaces, so a
    # stray double space in FFMPEG_FLAGS doesn't become an empty argument.
    cmd = ['ffmpeg','-i',str(path)] + FFMPEG_FLAGS.split() + [TEMPFILE]

    # Make a copy of the file to strip any unsupported tracks
    notify_change('compatrun',job)
    proc = subprocess.Popen(cmd,env=my_env)
    if proc.wait() != 0:
        # The compatibility pass failed, so there is nothing valid to mux.
        # Clear up any partial output and report it rather than carrying on.
        _remove_if_present(TEMPFILE)
        print("FFMpeg exited %s" % (path,))
        notify_change('failed',job,{'reason':'FFmpegErr'})
        return False

    # Get ready to start the actual mux
    # Make a note of when it started
    start = int(time.time())

    # Trigger the mux
    notify_change('muxstarted',job)
    try:
        proc = subprocess.Popen([hls_enc_path,'-i',TEMPFILE,'-o',str(viddir),'-b',str(bitrates),'-p','manifest','-t','media','-s',str(seg_length)],env=my_env)
        proc.wait()
    finally:
        # Always clear the tempfile, even if the script could not be started
        _remove_if_present(TEMPFILE)

    if (int(time.time()) - start) < 5:
        # Completed too quick, ffmpeg errored
        print("FFMpeg exited %s" % (path,))
        notify_change('failed',job,{'reason':'FFmpegErr'})
        return False

    notify_change('complete',job)
def notify_change(state,job,data=False):
    """ Placeholder hook for reporting task status

    Replace the body with whatever is needed to notify another system that
    a job has changed state. As shipped it does nothing with its arguments.

    state -- Name of the state the job has moved to
    job -- The job the state change relates to
    data -- Optional extra detail about the change

    Always returns True.
    """
    return True
Usage Example
#!/usr/bin/env python
#
# Simple wrapper to call HLS Stream Creator against multiple video files
#
import HLSSCWrapper

# Override the wrapper's defaults: where the script lives and the base
# directory the input paths below are appended to
HLSSCWrapper.hls_enc_path="/home/ben/repos/HLS-Stream-Creator/HLS-Stream-Creator.sh"
HLSSCWrapper.base_dir='/mnt/HLS-Out/'

inputfiles=['example1.mp4','a/b/c/d/foo.mp4','b/B/big_buck_bunny.mp4']

for inp in inputfiles:
    # triggerMux expects the same structure the upstream job API provides
    job = {
        "job": {
            "path": inp
        }
    }
    HLSSCWrapper.triggerMux(job)