ale/main.py
2023-03-17 22:36:21 -04:00

235 lines
7.1 KiB
Python

import argparse
import hashlib
import multiprocessing
import sys
import time
from functools import partial
from pathlib import Path
import structlog
from src.utils.prereq import check_ffmpeg
# Run the ffmpeg prerequisite check before the remaining project modules are imported.
# NOTE(review): presumably placed here because the imports below rely on ffmpeg
# being available at import time -- confirm before reordering the imports.
check_ffmpeg()
from src.editors.amplitude.editor import AmplitudeEditor
from src.editors.passthrough.editor import PassthroughEditor
from src.editors.sentiment.editor import SentimentEditor
from src.math.cost import quadratic_loss
from src.mediautils.audio import extract_audio_from_video
from src.mediautils.video import render_moments
# Structured logger shared by everything in this module.
log = structlog.get_logger()

# Editor classes, keyed by the sub-command name that selects them on the CLI.
EDITORS = dict(
    amplitude=AmplitudeEditor,
    sentiment=SentimentEditor,
    passthrough=PassthroughEditor,
)

# Cost functions, keyed by the value accepted by ``--cost-function``.
ERROR_FUNCS = dict(quadratic=quadratic_loss)
def _existing_file_or_exit(raw_path, error_message):
    """Return ``raw_path`` as a ``Path``; log ``error_message`` and exit(-1) if it is not a file."""
    candidate = Path(raw_path)
    if not candidate.is_file():
        log.error(error_message, path=raw_path)
        sys.exit(-1)
    return candidate


def _sha1_hexdigest(path, chunk_size=1655360):
    """Return the SHA-1 hex digest of the file at ``path``, read ``chunk_size`` bytes at a time."""
    digest = hashlib.sha1()
    with open(path, "rb") as stream:
        # iter() with a sentinel stops at the first empty read, i.e. end of file.
        for chunk in iter(partial(stream.read, chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def main(args):
    """Validate the inputs, run the selected editor and render the resulting edit.

    Args:
        args: Parsed command-line namespace. The attributes read here are
            ``file``, ``introvidpath``, ``outrovidpath``, ``editor``, ``cost``,
            ``duration``, ``destination`` and ``parallelism``. A
            ``temp_file_name`` entry is added to the namespace before it is
            handed to the editor.

    This function never returns normally; it always ends in ``sys.exit``:
        * ``0``  after the render completes,
        * ``-1`` when the input, intro or outro file is missing, or when the
          editor raises during ``full_edit``,
        * ``-2`` when the editor returns an empty result.
    """
    # Check video existence
    input_file = args.file
    in_vid_path = _existing_file_or_exit(
        input_file, "the specified input path does not exist"
    )
    log.info("preparing video", input_video=input_file)

    intro_file = args.introvidpath
    if intro_file is not None:
        # Validate the intro path itself (the original re-checked the input video here).
        _existing_file_or_exit(
            intro_file, "the specified input path does not exist for the intro"
        )
        log.info("found intro", input_video=intro_file)

    outro_file = args.outrovidpath
    if outro_file is not None:
        _existing_file_or_exit(
            outro_file, "the specified input path does not exist for the outro"
        )
        log.info("found outro", input_video=outro_file)

    # Hash the video, we use this to see if we have processed this video before
    # and as a simple way to generate temp file names
    video_hash = _sha1_hexdigest(in_vid_path)
    log.info("hash computed", hash=video_hash)
    temp_file_name = f"ale-{video_hash}"

    source_video = str(in_vid_path.resolve())

    # Prepare the input video
    audio_path, audio_cached = extract_audio_from_video(source_video, temp_file_name)
    if audio_cached:
        log.info("using cached audio file", cache_path=audio_path)
    else:
        log.info("extracted audio", cache_path=audio_path)

    # vars() returns the namespace's own dict, so this also adds
    # ``temp_file_name`` to ``args``.
    params = vars(args)
    params["temp_file_name"] = temp_file_name

    # Initialize editor
    log.info("initializing editor", editor=args.editor)
    editor = EDITORS[args.editor](source_video, audio_path, params)
    log.info("initialized editor", editor=args.editor)

    costfunc = ERROR_FUNCS[args.cost]
    desired = args.duration
    try:
        result = editor.full_edit(costfunc, desired, params)
    except Exception as e:
        log.fatal("there was an error during editing the video", error=e)
        sys.exit(-1)

    if len(result) == 0:
        log.fatal(
            "no viable edit was found for the provided parameters, please try again with different values"
        )
        sys.exit(-2)

    log.info(
        "found edit within target duration",
        target=desired,
    )

    out_path = Path(args.destination)
    rendered_to = str(out_path.resolve())
    log.info("rendering...")
    start = time.time()
    # NOTE(review): the outro path is validated above but is not passed to
    # render_moments -- confirm whether render_moments accepts an outro argument.
    render_moments(
        result,
        source_video,
        rendered_to,
        intro_path=intro_file,
        parallelism=args.parallelism,
    )
    log.info(
        "render complete",
        duration=time.time() - start,
        output=rendered_to,
    )
    sys.exit(0)
# Script entry point: build the command-line interface, parse it and run main().
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog="ALE",
        description="ALE: Automatic Linear Editor.",
        formatter_class=partial(argparse.HelpFormatter, width=100),
    )
    parser.add_argument("file", help="Path to the video file to edit")
    parser.add_argument(
        "duration", help="Target length of the edit, in seconds", type=int
    )
    parser.add_argument("destination", help="Edited video save location")

    # One sub-command per editor; the chosen name is stored in ``args.editor``
    # and used by main() to look up the editor class.
    subparsers = parser.add_subparsers(
        dest="editor", help="The editing algorithm to use"
    )
    parser_audio_amp = subparsers.add_parser(
        "amplitude",
        help="The amplitude editor uses audio amplitude moving averages to find swings from relatively quiet moments to loud moments. This is useful in videos where long moments of quiet are interspersed with loud action filled moments.",
    )
    parser_audio_amp.add_argument(
        "--factor",
        default=16000,
        help="Subsampling factor",
        dest="factor",
        type=int,
    )
    parser_sentiment = subparsers.add_parser(
        "sentiment",
        help="The sentiment editor transcribes the speech in a video and runs sentiment analysis on the resulting text. Using moving averages, large swings in sentiment can be correlated to controversial or exciting moments. A GPU with CUDA is recommended for fast results.",
    )
    parser_sentiment.add_argument(
        "--model",
        default="base",
        help="The size of the sentiment analysis model being used. Larger models increase computation time.",
        dest="model_size",
        choices=["base", "tiny", "small", "medium", "large"],
    )
    parser_passthrough = subparsers.add_parser(
        "passthrough",
        help="The passthrough editor simply cuts the video to the target length provided",
    )

    parser.add_argument(
        "-p",
        "--parallelism",
        dest="parallelism",
        type=int,
        # Clamp to 1 so machines with one or two cores do not default to 0 or less.
        default=max(1, multiprocessing.cpu_count() - 2),
        help="The number of cores to use, defaults to N - 2 cores (at least 1).",
    )
    parser.add_argument(
        "--cost-function",
        dest="cost",
        choices=list(ERROR_FUNCS),
        default="quadratic",
        help="The cost function passed to the editor",
    )
    parser.add_argument(
        "--min-duration",
        default=8,
        help="Minimum clip duration",
        dest="mindur",
        type=int,
    )
    parser.add_argument(
        "--max-duration",
        default=15,
        help="Maximum clip duration",
        dest="maxdur",
        type=int,
    )
    parser.add_argument(
        "--intro-video",
        default=None,
        help="Path to a video file to use as an intro",
        dest="introvidpath",
        type=str,
    )
    parser.add_argument(
        "--outro-video",
        default=None,
        help="Path to a video file to use as an outro",
        dest="outrovidpath",
        type=str,
    )
    # The two title-slide options get their own destinations. They previously
    # shared dest="outrovidpath", so passing either one overwrote the outro path.
    parser.add_argument(
        "--title-slide-text",
        default=None,
        help="Text to show during a title slide. The title slide is played after any provided intro. If there is no intro, the title slide is the intro",
        dest="titleslidetext",
        type=str,
    )
    parser.add_argument(
        "--title-slide-duration",
        default=5,
        help="Time, in seconds, to show a title slide for",
        dest="titleslideduration",
        type=int,
    )

    args = parser.parse_args()
    if args.editor is None:
        # Without a sub-command main() would fail on EDITORS[None]; report a
        # normal usage error instead (parser.error exits with status 2).
        parser.error(
            "an editing algorithm must be specified: " + ", ".join(EDITORS)
        )

    try:
        main(args)
    except Exception:
        # Top-level boundary: log the traceback and exit with a failure status.
        log.exception("uncaught error!")
        sys.exit(-2)
    sys.exit(0)