Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 85 additions & 50 deletions streamonitor/sites/stripchat.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@


class StripChat(Bot):
site = 'StripChat'
siteslug = 'SC'
site = "StripChat"
siteslug = "SC"

_static_data = None
_main_js_data = None
_doppio_js_data = None
_mouflon_keys: dict = None
_mouflon_keys: dict = {"Zeechoej4aleeshi": "ubahjae7goPoodi6"}
_cached_keys: dict[str, bytes] = None

def __init__(self, username):
Expand All @@ -33,63 +33,83 @@ def __init__(self, username):
time.sleep(1)
super().__init__(username)
self.vr = False
self.getVideo = lambda _, url, filename: getVideoNativeHLS(self, url, filename, StripChat.m3u_decoder)
self.getVideo = lambda _, url, filename: getVideoNativeHLS(
self, url, filename, StripChat.m3u_decoder
)

@classmethod
def getInitialData(cls):
r = requests.get('https://hu.stripchat.com/api/front/v3/config/static', headers=cls.headers)
r = requests.get(
"https://hu.stripchat.com/api/front/v3/config/static", headers=cls.headers
)
if r.status_code != 200:
raise Exception("Failed to fetch static data from StripChat")
StripChat._static_data = r.json().get('static')
StripChat._static_data = r.json().get("static")

mmp_origin = StripChat._static_data['features']['MMPExternalSourceOrigin']
mmp_version = StripChat._static_data['featuresV2']['playerModuleExternalLoading']['mmpVersion']
mmp_origin = StripChat._static_data["features"]["MMPExternalSourceOrigin"]
mmp_version = StripChat._static_data["featuresV2"][
"playerModuleExternalLoading"
]["mmpVersion"]
mmp_base = f"{mmp_origin}/v{mmp_version}"

r = requests.get(f"{mmp_base}/main.js", headers=cls.headers)
if r.status_code != 200:
raise Exception("Failed to fetch main.js from StripChat")
StripChat._main_js_data = r.content.decode('utf-8')
StripChat._main_js_data = r.content.decode("utf-8")

doppio_js_name = re.findall('require[(]"./(Doppio.*?[.]js)"[)]', StripChat._main_js_data)[0]
doppio_js_index = re.findall('([0-9]+):"Doppio"', StripChat._main_js_data)[0]
doppio_js_hash = re.findall(
f'{doppio_js_index}:\\"([a-zA-Z0-9]{{20}})\\"', StripChat._main_js_data
)[0]

r = requests.get(
f"{mmp_base}/chunk-Doppio-{doppio_js_hash}.js", headers=cls.headers
)

r = requests.get(f"{mmp_base}/{doppio_js_name}", headers=cls.headers)
if r.status_code != 200:
raise Exception("Failed to fetch doppio.js from StripChat")
StripChat._doppio_js_data = r.content.decode('utf-8')
StripChat._doppio_js_data = r.content.decode("utf-8")

@classmethod
def m3u_decoder(cls, content):
_mouflon_file_attr = "#EXT-X-MOUFLON:FILE:"
_mouflon_filename = 'media.mp4'
_mouflon_filename = "media.mp4"

def _decode(encrypted_b64: str, key: str) -> str:
if cls._cached_keys is None:
cls._cached_keys = {}
hash_bytes = cls._cached_keys[key] if key in cls._cached_keys \
else cls._cached_keys.setdefault(key, hashlib.sha256(key.encode("utf-8")).digest())
hash_bytes = (
cls._cached_keys[key]
if key in cls._cached_keys
else cls._cached_keys.setdefault(
key, hashlib.sha256(key.encode("utf-8")).digest()
)
)
encrypted_data = base64.b64decode(encrypted_b64 + "==")
return bytes(a ^ b for (a, b) in zip(encrypted_data, itertools.cycle(hash_bytes))).decode("utf-8")
return bytes(
a ^ b for (a, b) in zip(encrypted_data, itertools.cycle(hash_bytes))
).decode("utf-8")

psch, pkey, pdkey = StripChat._getMouflonFromM3U(content)

decoded = ''
decoded = ""
lines = content.splitlines()
last_decoded_file = None
for line in lines:
if line.startswith(_mouflon_file_attr):
last_decoded_file = _decode(line[len(_mouflon_file_attr):], pdkey)
last_decoded_file = _decode(line[len(_mouflon_file_attr) :], pdkey)
elif line.endswith(_mouflon_filename) and last_decoded_file:
decoded += (line.replace(_mouflon_filename, last_decoded_file)) + '\n'
decoded += (line.replace(_mouflon_filename, last_decoded_file)) + "\n"
last_decoded_file = None
else:
decoded += line + '\n'
decoded += line + "\n"
return decoded

@classmethod
def getMouflonDecKey(cls, pkey):
if cls._mouflon_keys is None:
cls._mouflon_keys = {}

if pkey in cls._mouflon_keys:
return cls._mouflon_keys[pkey]
else:
Expand All @@ -100,17 +120,22 @@ def getMouflonDecKey(cls, pkey):

@staticmethod
def _getMouflonFromM3U(m3u8_doc):
# print(f"Moufloun: {m3u8_doc}")
_start = 0
_needle = '#EXT-X-MOUFLON:'
_needle = "#EXT-X-MOUFLON:"
while _needle in (_doc := m3u8_doc[_start:]):
_mouflon_start = _doc.find(_needle)
if _mouflon_start > 0:
_mouflon = _doc[_mouflon_start:m3u8_doc.find('\n', _mouflon_start)].strip().split(':')
_mouflon = (
_doc[_mouflon_start : m3u8_doc.find("\n", _mouflon_start)]
.strip()
.split(":")
)
psch = _mouflon[2]
pkey = _mouflon[3]
pdkey = StripChat.getMouflonDecKey(pkey)
if pdkey:
return psch, pkey, pdkey

return psch, pkey, pdkey
_start += _mouflon_start + len(_needle)
return None, None, None

Expand All @@ -122,58 +147,68 @@ def getVideoUrl(self):

def getPlaylistVariants(self, url):
url = "https://edge-hls.{host}/hls/{id}{vr}/master/{id}{vr}{auto}.m3u8".format(
host='doppiocdn.' + random.choice(['org', 'com', 'net']),
id=self.lastInfo["streamName"],
vr='_vr' if self.vr else '',
auto='_auto' if not self.vr else ''
)
host="doppiocdn." + random.choice(["org", "com", "net"]),
id=self.lastInfo["streamName"],
vr="_vr" if self.vr else "",
auto="_auto" if not self.vr else "",
)
result = requests.get(url, headers=self.headers, cookies=self.cookies)
m3u8_doc = result.content.decode("utf-8")
psch, pkey, pdkey = StripChat._getMouflonFromM3U(m3u8_doc)
self.debug(f"Extracted key {psch}, {pkey}, {pdkey}")
variants = super().getPlaylistVariants(m3u_data=m3u8_doc)
return [variant | {'url': f'{variant["url"]}{"&" if "?" in variant["url"] else "?"}psch={psch}&pkey={pkey}'}
for variant in variants]
return [
variant
| {
"url": f"{variant['url']}{'&' if '?' in variant['url'] else '?'}psch={psch}&pkey={pkey}"
}
for variant in variants
]

@staticmethod
def uniq(length=16):
chars = ''.join(chr(i) for i in range(ord('a'), ord('z')+1))
chars += ''.join(chr(i) for i in range(ord('0'), ord('9')+1))
return ''.join(random.choice(chars) for _ in range(length))
chars = "".join(chr(i) for i in range(ord("a"), ord("z") + 1))
chars += "".join(chr(i) for i in range(ord("0"), ord("9") + 1))
return "".join(random.choice(chars) for _ in range(length))

def getStatus(self):
r = requests.get(
f'https://stripchat.com/api/front/v2/models/username/{self.username}/cam?uniq={StripChat.uniq()}',
headers=self.headers
f"https://stripchat.com/api/front/v2/models/username/{self.username}/cam?uniq={StripChat.uniq()}",
headers=self.headers,
)

try:
data = r.json()
except requests.exceptions.JSONDecodeError:
self.log('Failed to parse JSON response')
self.log(f"Failed to parse JSON response")
return Status.UNKNOWN

if 'cam' not in data:
if 'error' in data:
error = data['error']
if error == 'Not Found':
if "cam" not in data:
if "error" in data:
error = data["error"]
if error == "Not Found":
return Status.NOTEXIST
self.logger.warn(f'Status returned error: {error}')
self.logger.warn(f"Status returned error: {error}")
return Status.UNKNOWN

self.lastInfo = {'model': data['user']['user']}
if isinstance(data['cam'], dict):
self.lastInfo |= data['cam']
self.lastInfo = {"model": data["user"]["user"]}
if isinstance(data["cam"], dict):
self.lastInfo |= data["cam"]

status = self.lastInfo['model'].get('status')
if status == "public" and self.lastInfo["isCamAvailable"] and self.lastInfo["isCamActive"]:
status = self.lastInfo["model"].get("status")
if (
status == "public"
and self.lastInfo["isCamAvailable"]
and self.lastInfo["isCamActive"]
):
return Status.PUBLIC
if status in ["private", "groupShow", "p2p", "virtualPrivate", "p2pVoice"]:
return Status.PRIVATE
if status in ["off", "idle"]:
return Status.OFFLINE
if self.lastInfo['model'].get('isDeleted') is True:
if self.lastInfo["model"].get("isDeleted") is True:
return Status.NOTEXIST
if data['user'].get('isGeoBanned') is True:
if data["user"].get("isGeoBanned") is True:
return Status.RESTRICTED
self.logger.warn(f'Got unknown status: {status}')
self.logger.warn(f"Got unknown status: {status}")
return Status.UNKNOWN