format all code

This commit is contained in:
2023-02-19 10:47:13 -05:00
parent 8b97fa5ebd
commit 937c938837
11 changed files with 146 additions and 62 deletions

View File

@ -4,6 +4,7 @@ from ..common import find_moving_average_highlights
import numpy as np
import structlog
class AmplitudeEditor:
def __init__(self, video_path, audio_path, params):
self.logger = structlog.get_logger("amplitude")
@ -21,8 +22,13 @@ class AmplitudeEditor:
def edit(self, large_window, small_window, params):
window_factor = self.bitrate / self.factor
long_ma = np_moving_average(self.squared_subsample, large_window * window_factor)
short_ma = np_moving_average(self.squared_subsample, small_window * window_factor)
highlights = find_moving_average_highlights(short_ma, long_ma, self.factor / self.bitrate)
long_ma = np_moving_average(
self.squared_subsample, large_window * window_factor
)
short_ma = np_moving_average(
self.squared_subsample, small_window * window_factor
)
highlights = find_moving_average_highlights(
short_ma, long_ma, self.factor / self.bitrate
)
return highlights, large_window, small_window

View File

@ -1,5 +1,6 @@
from ..models.moment import Moment
def find_moving_average_highlights(short_ma, long_ma, scaling_factor=1):
in_a_clip = False
m = None

View File

@ -12,12 +12,14 @@ from flair.data import Sentence
from ...math.average import np_moving_average
from ..common import find_moving_average_highlights
@dataclass
class TextGlob:
start:float
stop:float
text:str
sentiment:float
start: float
stop: float
text: str
sentiment: float
class SentimentEditor:
def __init__(self, video_path, audio_path, params):
@ -30,20 +32,24 @@ class SentimentEditor:
self.logger.info("transcribing audio", path=audio_path)
self.result = self.model.transcribe(audio_path)
with open(dest_location, 'w') as fp:
with open(dest_location, "w") as fp:
json.dump(self.result, fp)
else:
self.logger.info("cached transcription found", path=dest_location)
with open(dest_location, 'r') as f:
self.result = json.load(f)
with open(dest_location, "r") as f:
self.result = json.load(f)
self.segments = []
for segment in self.result['segments']:
self.segments.append(TextGlob(segment['start'], segment['end'], segment['text'], 0))
classifier = TextClassifier.load('en-sentiment')
for segment in self.result["segments"]:
self.segments.append(
TextGlob(segment["start"], segment["end"], segment["text"], 0)
)
classifier = TextClassifier.load("en-sentiment")
self.sentiments = []
self.logger.info("calculating sentiment on segments", segments=len(self.segments))
self.logger.info(
"calculating sentiment on segments", segments=len(self.segments)
)
for segment in self.segments:
sentence = Sentence(segment.text)
classifier.predict(sentence)
@ -53,11 +59,13 @@ class SentimentEditor:
segment.sentiment = sentsum
self.sentiments.append(sentsum)
self.sentiments = np.array(self.sentiments)
def edit(self, large_window, small_window, params):
end_time = self.segments[-1].stop
window_factor = len(self.sentiments) / end_time
long_ma = np_moving_average(self.sentiments, large_window)
short_ma = np_moving_average(self.sentiments, small_window)
highlights = find_moving_average_highlights(short_ma, long_ma, 1.0 / window_factor)
highlights = find_moving_average_highlights(
short_ma, long_ma, 1.0 / window_factor
)
return highlights, large_window, small_window

View File

@ -1,4 +1,5 @@
import numpy as np
def np_moving_average(data: int, window: int) -> np.ndarray:
return np.convolve(data, np.ones(int(window)), "valid") / window

View File

@ -3,5 +3,6 @@ Functions in this file should always target for 0 to be the
lowest possible error -> smaller values win
"""
def quadratic_loss(target, result):
return (target - result)**2.0
return (target - result) ** 2.0

View File

@ -1,5 +1,6 @@
import numpy as np
def create_distribution(center, spread, count):
high = center * (1.0 + spread)
low = center - (center * spread)

View File

@ -4,10 +4,8 @@ from pathlib import Path
import numpy as np
import scipy.io.wavfile as wav
def extract_audio_from_video(
video_path: str,
filename: str
):
def extract_audio_from_video(video_path: str, filename: str):
tempdir = tempfile.gettempdir()
dest_location = f"{tempdir}/{filename}.wav"
@ -19,6 +17,7 @@ def extract_audio_from_video(
vid.close()
return dest_location, False
def process_audio(source_audio_path):
rate, data_raw = wav.read(source_audio_path)
data_raw = data_raw.astype(np.int32)
@ -26,5 +25,6 @@ def process_audio(source_audio_path):
duration = len(mono) / rate
return mono, duration, rate
def resample(data: np.ndarray, factor: int) -> np.ndarray:
return data[::factor].copy()

View File

@ -9,16 +9,27 @@ def get_subclips(source_video_path, moments):
return clips, vid
def render_moments(moments, input_video_path, output_path, intro_path=None, outro_path=None, parallelism=1):
def render_moments(
moments,
input_video_path,
output_path,
intro_path=None,
outro_path=None,
parallelism=1,
):
clips, _ = get_subclips(input_video_path, moments)
if intro_path is not None:
size = clips[0].size
iclip = mp.VideoFileClip(intro_path)
iclip.resize(height=size[1])
clips.insert(0, iclip)
composite = mp.concatenate_videoclips(clips, method='compose')
composite = mp.concatenate_videoclips(clips, method="compose")
composite.write_videofile(output_path, logger=None, threads=parallelism)
def filter_moments(moments, min_length, max_length):
return [m for m in moments if m.get_duration() > min_length and m.get_duration() < max_length]
return [
m
for m in moments
if m.get_duration() > min_length and m.get_duration() < max_length
]

View File

@ -1,6 +1,4 @@
class Moment:
def __init__(self, start, stop):
self.start = start
self.stop = stop

View File

@ -9,12 +9,18 @@ log = structlog.get_logger()
def install_ffmpeg():
if not click.confirm('Do you want to install ffmpeg? It is required for ALE.', default=False):
log.warn("ffmpeg not installed. Please install it manually or restart ALE. Exiting...")
if not click.confirm(
"Do you want to install ffmpeg? It is required for ALE.", default=False
):
log.warn(
"ffmpeg not installed. Please install it manually or restart ALE. Exiting..."
)
sys.exit(0)
system = platform.system().lower()
if system == "linux":
package_manager = "apt-get" if os.path.exists("/etc/apt/sources.list") else "yum"
package_manager = (
"apt-get" if os.path.exists("/etc/apt/sources.list") else "yum"
)
command = f"sudo {package_manager} install -y ffmpeg"
elif system == "darwin":
command = "brew install ffmpeg"