Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 59 additions & 7 deletions streamonitor/sites/stripchat.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@
import requests
import base64
import hashlib
import logging

from streamonitor.bot import RoomIdBot
from streamonitor.downloaders.hls import getVideoNativeHLS
from streamonitor.enums import Status


logger = logging.getLogger(__name__)


class StripChat(RoomIdBot):
site = 'StripChat'
siteslug = 'SC'
Expand Down Expand Up @@ -66,8 +70,15 @@ def getInitialData(cls):
raise Exception("Failed to fetch main.js from StripChat")
StripChat._main_js_data = r.content.decode('utf-8')

doppio_js_index = re.findall('([0-9]+):"Doppio"', StripChat._main_js_data)[0]
doppio_js_hash = re.findall(f'{doppio_js_index}:\\"([a-zA-Z0-9]{{20}})\\"', StripChat._main_js_data)[0]
try:
doppio_js_index_match = re.findall('([0-9]+):"Doppio"', StripChat._main_js_data)
doppio_js_index = doppio_js_index_match[0]
doppio_js_hash_match = re.findall(f'{doppio_js_index}:\\"([a-zA-Z0-9]{{20}})\\"', StripChat._main_js_data)
doppio_js_hash = doppio_js_hash_match[0]
except Exception as e:
logging.getLogger(__name__).exception("getInitialData: failed to parse main.js for Doppio chunk info: %s", e)
StripChat._doppio_js_data = None
return

r = session.get(f"{mmp_base}/chunk-Doppio-{doppio_js_hash}.js", headers=cls.headers)
if r.status_code != 200:
Expand Down Expand Up @@ -104,17 +115,58 @@ def _decode(encrypted_b64: str, key: str) -> str:

@classmethod
def getMouflonDecKey(cls, pkey):
"""
Defensive lookup for mouflon decryption key:
- preserves previous behavior (returns None when not found)
- tolerates cls._doppio_js_data being None/non-str/bytes
- escapes pkey for safe regex matching
- logs helpful diagnostics
"""
if cls._mouflon_keys is None:
cls._mouflon_keys = {}

if not pkey:
# keep previous behavior of returning None for missing keys
return None

if pkey in cls._mouflon_keys:
return cls._mouflon_keys[pkey]
else:
_pdks = re.findall(f'"{pkey}:(.*?)"', cls._doppio_js_data)
if len(_pdks) > 0:
pdk = cls._mouflon_keys.setdefault(pkey, _pdks[0])

js_data = cls._doppio_js_data
if js_data is None:
# nothing to search
return None

if isinstance(js_data, bytes):
try:
js_data = js_data.decode('utf-8', errors='replace')
except Exception:
# fallback to str() to avoid TypeError in re.findall
js_data = str(js_data)

if not isinstance(js_data, str):
# unexpected type: convert to string for the regex and log what we got
logging.getLogger(__name__).warning(
"getMouflonDecKey: _doppio_js_data has unexpected type %s; converting to str()",
type(js_data).__name__
)
js_data = str(js_data)

# Escape pkey so special regex characters don't break the pattern
escaped_pkey = re.escape(pkey)
pattern = rf'"{escaped_pkey}:(.*?)"'
matches = re.findall(pattern, js_data, flags=re.S)
if matches:
pdk = cls._mouflon_keys.setdefault(pkey, matches[0])
try:
with open(cls._mouflon_cache_filename, 'w') as f:
json.dump(cls._mouflon_keys, f)
return pdk
except Exception:
logging.getLogger(__name__).exception("getMouflonDecKey: failed to write mouflon key cache")
return pdk

# not found
logging.getLogger(__name__).debug("getMouflonDecKey: no match for pkey %r", pkey)
return None

@staticmethod
Expand Down
49 changes: 49 additions & 0 deletions tests/test_stripchat_mouflon.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import json
from pathlib import Path

import pytest

# Import the class under test
from streamonitor.sites.stripchat import StripChat


def setup_stripchat(tmp_path: Path):
"""
Reset shared class state between tests and point cache to a temp file.
"""
StripChat._mouflon_keys = {}
StripChat._cached_keys = None
StripChat._mouflon_cache_filename = str(tmp_path / "mouflon_cache.json")


def test_none_js_data_returns_none(tmp_path):
setup_stripchat(tmp_path)
StripChat._doppio_js_data = None
assert StripChat.getMouflonDecKey("any") is None


def test_bytes_js_data_decoded_and_match(tmp_path):
setup_stripchat(tmp_path)
# bytes payload that decodes to a simple key:value pair
StripChat._doppio_js_data = b'"abc:decoded_value" some other content'
res = StripChat.getMouflonDecKey("abc")
assert res == "decoded_value"
# ensure the cache file was written and contains the key
with open(StripChat._mouflon_cache_filename, "r") as f:
data = json.load(f)
assert data.get("abc") == "decoded_value"


def test_str_js_data_escape_pkey(tmp_path):
setup_stripchat(tmp_path)
# pkey contains regex-special characters
pkey = r'a.b*c+?^$'
StripChat._doppio_js_data = f'"{pkey}:val123"'
res = StripChat.getMouflonDecKey(pkey)
assert res == "val123"


def test_no_match_returns_none(tmp_path):
setup_stripchat(tmp_path)
StripChat._doppio_js_data = '"foo:bar"'
assert StripChat.getMouflonDecKey("nonexistent") is None