-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathencoders.py
More file actions
144 lines (130 loc) · 5.54 KB
/
encoders.py
File metadata and controls
144 lines (130 loc) · 5.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import os
import platform
import fs
import re
from subprocess import run
from mux import apply_video_settings, apply_audio_settings, apply_video_info_x265
from scenes import generate_scenes
from vstools import core
# Helper module: thin wrappers that run the usual encoders and then apply the
# encoder parameters to the output stream.

# Passed to apply_video_settings / apply_video_info_x265 by the video wrappers below.
encoder_name = "SwareJonge" # replace with your own name
def aom_get_binary_version():
    """Return the encoder banner reported by the ``aomenc`` binary.

    Runs ``aomenc --help`` and scans its standard output line by line for the
    "AOMedia Project AV1 Encoder ..." banner (an optional " Psy101" tag is
    accepted by the pattern).

    Returns:
        The matched encoder name/version string.

    Raises:
        subprocess.CalledProcessError: If ``aomenc --help`` exits with a
            non-zero status (``check=True``).
        SystemExit: If no line of the output matches the banner pattern. The
            process then exits with a non-zero status and the message is
            written to stderr.
    """
    res = run(
        ["aomenc", "--help"],
        capture_output=True,
        text=True,
        check=True,
        # The original author enables the shell on Windows only.
        shell=platform.system() == "Windows",
    )

    # Pattern that extracts the banner that follows a leading "- ".
    banner_pattern = re.compile(
        r"-\s+(AOMedia Project AV1 Encoder(?: Psy101)? [\d\.]+-[\w\d-]+)"
    )
    for line in res.stdout.splitlines():
        match = banner_pattern.search(line)
        if match:
            return match.group(1)

    # A missing banner is a failure: exit non-zero instead of reporting
    # success, and do not rely on the site-provided ``exit`` helper.
    raise SystemExit("couldn't find AOM encoder version")
def svt_get_binary_version():
    """Return the first line of ``SvtAv1EncApp --version`` without "(release)".

    The command's standard output is captured as text; only its first line is
    used, and any "(release)" marker (with preceding whitespace) is removed.
    """
    use_shell = platform.system() == "Windows"
    completed = run(
        ["SvtAv1EncApp", "--version"],
        capture_output=True,
        text=True,
        shell=use_shell,
    )
    banner = completed.stdout.splitlines()[0]
    return re.sub(r"\s*\(release\)", "", banner)
def opusenc_get_version():
    """Build the "writing application" string from ``opusenc --version``.

    Only the first output line is parsed. It is split on whitespace; the third
    token is used as the opus-tools version and the sixth token, minus its
    final character, as the libopus version.
    NOTE(review): assumes the line looks like
    ``opusenc opus-tools X (using libopus Y)`` -- confirm against your build.
    """
    use_shell = platform.system() == "Windows"
    completed = run(
        ["opusenc", "--version"],
        capture_output=True,
        text=True,
        shell=use_shell,
    )
    tokens = completed.stdout.splitlines()[0].split()
    opus_tools_version = tokens[2]
    # Drop the trailing character of the token (presumably a closing parenthesis).
    libopus_version = tokens[5][:-1]
    return f'opusenc from opus-tools {opus_tools_version} using libopus {libopus_version}'
def opusenc_encode(in_path: str, out_path: str, opusenc_settings):
    """Encode an audio file that is natively supported by opusenc.

    Steps: run ``opusenc`` into a temporary ``.opus`` file next to
    ``out_path``, remux that file into ``out_path`` with ``mkvmerge``, remove
    the temporary file, then call ``apply_audio_settings`` on the result.

    Args:
        in_path: Input audio file handed to opusenc.
        out_path: Final output path (normally ``*.mka``).
        opusenc_settings: Extra opusenc options. A string is split into
            separate arguments with shell-like rules (``shlex.split``), which
            matches how ``opusenc_ffmpeg_encode`` treats the same string; any
            other iterable is used as a ready-made argument list. The original
            value is forwarded unchanged to ``apply_audio_settings``.
    """
    import shlex  # local import: only needed to tokenize the settings string

    # Swap only the final extension. A blanket ``replace(".mka", ".opus")``
    # would also rewrite directory names, and would leave the path unchanged
    # for a non-.mka output -- the temp file would then BE the output and get
    # deleted below.
    stem, ext = os.path.splitext(out_path)
    if ext.lower() == ".opus":
        tmp_out_path = f"{stem}.tmp.opus"
    else:
        tmp_out_path = f"{stem}.opus"

    # Each option must be its own argv element; a multi-option string passed
    # as one element is seen by opusenc as a single bogus argument.
    if isinstance(opusenc_settings, str):
        extra_args = shlex.split(opusenc_settings)
    else:
        extra_args = list(opusenc_settings)

    run([
        "opusenc",
        in_path,
        tmp_out_path,
        *extra_args,
    ])
    run([
        "mkvmerge",
        "--output", out_path,
        '(', tmp_out_path, ')',
    ])
    fs.remove_file(tmp_out_path)
    apply_audio_settings(out_path, opusenc_get_version(), opusenc_settings)
def opusenc_ffmpeg_encode(in_path: str, out_path: str, opusenc_settings:str, stream_no: int=0, ffmpeg_codec: str="copy", ffmpeg_format: str="wav", ffmpeg_seek_settings: str=""):
    """Encode an audio file with opusenc with help from ffmpeg.

    ffmpeg extracts audio stream ``stream_no`` of ``in_path`` and writes it to
    its stdout, which is piped straight into ``opusenc``. The resulting
    temporary ``.opus`` file is remuxed into ``out_path`` with ``mkvmerge``,
    removed, and ``apply_audio_settings`` is called on the result.

    Args:
        in_path: Input media file.
        out_path: Final output path (normally ``*.mka``).
        opusenc_settings: Extra opusenc options as one shell-like string.
        stream_no: Index of the audio stream to map (``-map 0:a:N``).
        ffmpeg_codec: Value for ffmpeg's ``-c:a``.
        ffmpeg_format: Value for ffmpeg's ``-f``.
        ffmpeg_seek_settings: Extra ffmpeg options placed before ``-i``.

    The option strings are tokenized with ``shlex.split`` (POSIX rules), while
    both paths are passed as single arguments without any shell, so paths
    containing quotes, ``$`` or backticks no longer break the command.
    """
    import shlex
    from subprocess import PIPE, Popen

    # Swap only the final extension. A blanket ``replace(".mka", ".opus")``
    # would leave a non-.mka path unchanged, making the temp file the output
    # itself, which is deleted below.
    stem, ext = os.path.splitext(out_path)
    if ext.lower() == ".opus":
        tmp_out_path = f"{stem}.tmp.opus"
    else:
        tmp_out_path = f"{stem}.opus"

    ffmpeg_cmd = [
        "ffmpeg",
        *shlex.split(f"{ffmpeg_seek_settings}"),
        "-i", in_path,
        # Tokenize the non-path part as one string, as the shell used to.
        *shlex.split(f"-map 0:a:{stream_no} -c:a {ffmpeg_codec} -f {ffmpeg_format} -"),
    ]
    opusenc_cmd = [
        "opusenc",
        "-",
        tmp_out_path,
        *shlex.split(f"{opusenc_settings}"),
    ]

    # ffmpeg | opusenc without a shell. Leaving the ``with`` block closes our
    # copy of the pipe's read end and waits for ffmpeg, so ffmpeg cannot hang
    # on a full pipe if opusenc stops early.
    with Popen(ffmpeg_cmd, stdout=PIPE) as ffmpeg_proc:
        run(opusenc_cmd, stdin=ffmpeg_proc.stdout)

    run([
        "mkvmerge",
        "--output", out_path,
        '(', tmp_out_path, ')',
    ])
    fs.remove_file(tmp_out_path)
    apply_audio_settings(out_path, opusenc_get_version(), opusenc_settings)
def svt_av1_encode_standalone(in_path: str, out_path: str, encoder_settings, extra_encoder_settings:str =""):
    """Encode a vapoursynth script with vspipe and SVT-AV1.

    ``vspipe`` renders ``in_path`` as y4m to its stdout, which is piped into
    ``SvtAv1EncApp``; the encoder writes ``<out_path>.ivf``. That file is then
    remuxed into ``out_path`` with ``mkvmerge`` and ``apply_video_settings`` is
    called on the result.

    Args:
        in_path: Script handed to vspipe.
        out_path: Final output path; the intermediate file is ``out_path + ".ivf"``.
        encoder_settings: SvtAv1EncApp options as one shell-like string; also
            forwarded unchanged to ``apply_video_settings``.
        extra_encoder_settings: Additional SvtAv1EncApp options placed before
            ``encoder_settings`` and not forwarded to ``apply_video_settings``.

    The option strings are tokenized with ``shlex.split`` (POSIX rules), while
    both paths are passed as single arguments without any shell, so paths
    containing quotes, ``$`` or backticks no longer break the command.
    """
    import shlex
    from subprocess import PIPE, Popen

    ivf_path = f"{out_path}.ivf"
    vspipe_cmd = ["vspipe", in_path, "-c", "y4m", "-"]
    encoder_cmd = [
        "SvtAv1EncApp",
        "-i", "-",
        "-b", ivf_path,
        # Tokenize both settings strings together, as the shell used to.
        *shlex.split(f"{extra_encoder_settings} {encoder_settings}"),
    ]

    # vspipe | SvtAv1EncApp without a shell. Leaving the ``with`` block closes
    # our copy of the pipe's read end and waits for vspipe, so vspipe cannot
    # hang on a full pipe if the encoder stops early.
    with Popen(vspipe_cmd, stdout=PIPE) as vspipe_proc:
        run(encoder_cmd, stdin=vspipe_proc.stdout)

    run([
        "mkvmerge",
        "--output", out_path,
        '(', ivf_path, ')',
    ])
    # NOTE(review): the intermediate .ivf is left on disk, unlike the opus
    # wrappers, which remove their temporary file -- confirm this is intended.
    apply_video_settings(out_path, svt_get_binary_version(), encoder_settings, encoder_name)
# Deprecated by av1_encode
def svt_av1_encode(
    in_path: str,
    out_path: str,
    encoder_settings: str,
    worker_count: int = 0,
    thread_affinity: int = 0,
    scene_path: str = "",
    extra_av1an_flags: list[str] = None,
):
    """Encode a file (vapoursynth script or video file) with av1an and SVT-AV1.

    Thin wrapper: forwards every argument to ``av1_encode`` with the encoder
    fixed to ``"svt-av1"``.
    """
    av1_encode(
        in_path,
        out_path,
        "svt-av1",
        encoder_settings,
        worker_count=worker_count,
        thread_affinity=thread_affinity,
        scene_path=scene_path,
        extra_av1an_flags=extra_av1an_flags,
    )
def av1_encode(in_path: str, out_path: str, encoder: str, encoder_settings: str, worker_count: int=0, thread_affinity: int=0, scene_path :str="", extra_av1an_flags: list[str]=None):
    """Encode a file (vapoursynth script or video file) with av1an and an AV1 encoder.

    Supported encoders are aomenc and SVT-AV1 (sorry rav1e). The encoder
    version is looked up first: ``svt_get_binary_version()`` when ``encoder``
    is ``"svt-av1"``, otherwise ``aom_get_binary_version()``. av1an is then run
    with mkvmerge as the concat method (``-c mkvmerge``), and
    ``apply_video_settings`` is called on the output.

    Args:
        in_path: Input passed to av1an ``-i``.
        out_path: Output passed to av1an ``-o``.
        encoder: Value for av1an ``-e``.
        encoder_settings: Encoder parameter string passed as av1an ``-v``.
        worker_count: Value for av1an ``-w``.
        thread_affinity: Value for av1an ``--set-thread-affinity``.
        scene_path: Scene file for av1an ``--scenes``; the option is omitted
            when this is empty, rather than passing an empty path.
        extra_av1an_flags: Additional av1an arguments appended verbatim;
            ``None`` (the default) means no extra flags.
    """
    ver = svt_get_binary_version() if encoder == "svt-av1" else aom_get_binary_version()
    print(f"Encoding using {ver}")

    command = [
        "av1an",
        "-i", in_path,
        "-o", out_path,
    ]
    if scene_path:
        command += ["--scenes", scene_path]
    command += [
        "-w", str(worker_count),
        "--set-thread-affinity", str(thread_affinity),
        "-c", "mkvmerge",
        "-e", encoder,
        "-v", encoder_settings,
    ]
    # The default is None, which cannot be star-unpacked; treat it as "no flags".
    command.extend(extra_av1an_flags or ())

    run(command)
    apply_video_settings(out_path, ver, encoder_settings, encoder_name)
def x265_encode(in_path: str, out_path: str, settings: str, scene_path: str, worker_count: int=0, thread_affinity: int=0, extra_av1an_flags: list[str]=None):
    """Encode a file (vapoursynth script or video file) with av1an and x265.

    Runs av1an with ``-e x265`` and mkvmerge as the concat method, then calls
    ``apply_video_info_x265`` on the output.

    Args:
        in_path: Input passed to av1an ``-i``.
        out_path: Output passed to av1an ``-o``.
        settings: x265 parameter string passed as av1an ``-v``.
        scene_path: Scene file passed to av1an ``--scenes``.
        worker_count: Value for av1an ``-w``.
        thread_affinity: Value for av1an ``--set-thread-affinity``.
        extra_av1an_flags: Additional av1an arguments appended verbatim;
            ``None`` (the default) means no extra flags.
    """
    command = [
        "av1an",
        "-i", in_path,
        "-o", out_path,
        "--scenes", scene_path,
        "-w", str(worker_count),
        "--set-thread-affinity", str(thread_affinity),
        "-c", "mkvmerge",
        "-e", "x265",
        "-v", settings,
    ]
    # The default is None, which cannot be star-unpacked; treat it as "no flags".
    command.extend(extra_av1an_flags or ())

    run(command)
    apply_video_info_x265(out_path, encoder_name)
# example usage
def luma_boost_encode(
    in_path: str,
    out_path: str,
    zones_path: str,
    scene_path: str,
    encoder_settings: str,
):
    """Example pipeline: create the scene file if needed, then encode with SVT-AV1.

    When ``scene_path`` does not exist yet, the source is opened with
    ``core.bs.VideoSource`` and handed to ``generate_scenes`` together with
    ``zones_path``, the fixed name ``"scenechanges.pickle"``, ``scene_path``
    and ``encoder_settings``. The encode then runs through ``svt_av1_encode``
    with 8 workers, a thread affinity of 2 and the ``--keep``/``--resume``
    av1an flags.
    """
    scenes_missing = not os.path.exists(scene_path)
    if scenes_missing:
        clip = core.bs.VideoSource(in_path)
        generate_scenes(clip, zones_path, "scenechanges.pickle", scene_path, encoder_settings)

    svt_av1_encode(
        in_path,
        out_path,
        encoder_settings,
        worker_count=8,
        thread_affinity=2,
        scene_path=scene_path,
        extra_av1an_flags=["--keep", "--resume"],
    )