diff --git a/streamonitor/sites/stripchat.py b/streamonitor/sites/stripchat.py index dfe290a7..70061707 100644 --- a/streamonitor/sites/stripchat.py +++ b/streamonitor/sites/stripchat.py @@ -1,3 +1,9 @@ +import os +from dotenv import load_dotenv + +# Load environment variables from .env +load_dotenv() + import itertools import json import os.path @@ -13,25 +19,26 @@ class StripChat(Bot): - site = 'StripChat' - siteslug = 'SC' + site = "StripChat" + siteslug = "SC" _static_data = None _main_js_data = None _doppio_js_data = None - _mouflon_cache_filename = 'stripchat_mouflon_keys.json' - _mouflon_keys: dict = None + _mouflon_keys: dict = {"Zeechoej4aleeshi": "ubahjae7goPoodi6"} _cached_keys: dict[str, bytes] = None - if os.path.exists(_mouflon_cache_filename): - with open(_mouflon_cache_filename) as f: - try: - if not isinstance(_mouflon_keys, dict): - _mouflon_keys = {} - _mouflon_keys.update(json.load(f)) - print('Loaded StripChat mouflon key cache') - except Exception as e: - print('Error loading mouflon key cache:', e) + @classmethod + def get_proxies(cls): + http = os.getenv("HTTP_PROXY") + https = os.getenv("HTTPS_PROXY") + + if http or https: + return { + "http": http, + "https": https + } + return None def __init__(self, username): if StripChat._static_data is None: @@ -39,94 +46,146 @@ def __init__(self, username): try: self.getInitialData() except Exception as e: - print('Error initializing StripChat static data:', e) + StripChat._static_data = None + raise e + + while StripChat._static_data == {}: + time.sleep(1) super().__init__(username) self.vr = False - self.getVideo = lambda _, url, filename: getVideoNativeHLS(self, url, filename, StripChat.m3u_decoder) + self.getVideo = lambda _, url, filename: getVideoNativeHLS( + self, url, filename, StripChat.m3u_decoder + ) @classmethod def getInitialData(cls): - session = requests.Session() - r = session.get('https://hu.stripchat.com/api/front/v3/config/static', headers=cls.headers) + r = requests.get( + "https://hu.stripchat.com/api/front/v3/config/static", + headers=cls.headers, + proxies=cls.get_proxies() + ) + if r.status_code != 200: raise Exception("Failed to fetch static data from StripChat") - StripChat._static_data = r.json().get('static') - mmp_origin = StripChat._static_data['features']['MMPExternalSourceOrigin'] - mmp_version = StripChat._static_data['featuresV2']['playerModuleExternalLoading']['mmpVersion'] + StripChat._static_data = r.json().get("static") + + mmp_origin = StripChat._static_data["features"]["MMPExternalSourceOrigin"] + mmp_version = StripChat._static_data["featuresV2"][ + "playerModuleExternalLoading" + ]["mmpVersion"] mmp_base = f"{mmp_origin}/v{mmp_version}" - r = session.get(f"{mmp_base}/main.js", headers=cls.headers) + r = requests.get( + f"{mmp_base}/main.js", + headers=cls.headers, + proxies=cls.get_proxies() + ) + if r.status_code != 200: raise Exception("Failed to fetch main.js from StripChat") - StripChat._main_js_data = r.content.decode('utf-8') - doppio_js_index = re.findall('([0-9]+):"Doppio"', StripChat._main_js_data)[0] - doppio_js_hash = re.findall(f'{doppio_js_index}:\\"([a-zA-Z0-9]{{20}})\\"', StripChat._main_js_data)[0] + StripChat._main_js_data = r.content.decode("utf-8") + + doppio_js_index = re.findall( + '([0-9]+):"Doppio"', StripChat._main_js_data + )[0] + + doppio_js_hash = re.findall( + f'{doppio_js_index}:\\"([a-zA-Z0-9]{{20}})\\"', + StripChat._main_js_data + )[0] + + r = requests.get( + f"{mmp_base}/chunk-Doppio-{doppio_js_hash}.js", + headers=cls.headers, + proxies=cls.get_proxies() + ) - r = session.get(f"{mmp_base}/chunk-Doppio-{doppio_js_hash}.js", headers=cls.headers) if r.status_code != 200: raise Exception("Failed to fetch doppio.js from StripChat") - StripChat._doppio_js_data = r.content.decode('utf-8') + + StripChat._doppio_js_data = r.content.decode("utf-8") @classmethod def m3u_decoder(cls, content): _mouflon_file_attr = "#EXT-X-MOUFLON:FILE:" - _mouflon_filename = 'media.mp4' + _mouflon_filename = "media.mp4" def _decode(encrypted_b64: str, key: str) -> str: if cls._cached_keys is None: cls._cached_keys = {} - hash_bytes = cls._cached_keys[key] if key in cls._cached_keys \ - else cls._cached_keys.setdefault(key, hashlib.sha256(key.encode("utf-8")).digest()) + + hash_bytes = ( + cls._cached_keys[key] + if key in cls._cached_keys + else cls._cached_keys.setdefault( + key, hashlib.sha256(key.encode("utf-8")).digest() + ) + ) + encrypted_data = base64.b64decode(encrypted_b64 + "==") - return bytes(a ^ b for (a, b) in zip(encrypted_data, itertools.cycle(hash_bytes))).decode("utf-8") + + return bytes( + a ^ b for (a, b) in zip(encrypted_data, itertools.cycle(hash_bytes)) + ).decode("utf-8") psch, pkey, pdkey = StripChat._getMouflonFromM3U(content) - decoded = '' + decoded = "" lines = content.splitlines() last_decoded_file = None + for line in lines: if line.startswith(_mouflon_file_attr): last_decoded_file = _decode(line[len(_mouflon_file_attr):], pdkey) elif line.endswith(_mouflon_filename) and last_decoded_file: - decoded += (line.replace(_mouflon_filename, last_decoded_file)) + '\n' + decoded += (line.replace(_mouflon_filename, last_decoded_file)) + "\n" last_decoded_file = None else: - decoded += line + '\n' + decoded += line + "\n" + return decoded @classmethod def getMouflonDecKey(cls, pkey): if cls._mouflon_keys is None: cls._mouflon_keys = {} + if pkey in cls._mouflon_keys: return cls._mouflon_keys[pkey] - else: - _pdks = re.findall(f'"{pkey}:(.*?)"', cls._doppio_js_data) - if len(_pdks) > 0: - pdk = cls._mouflon_keys.setdefault(pkey, _pdks[0]) - with open(cls._mouflon_cache_filename, 'w') as f: - json.dump(cls._mouflon_keys, f) - return pdk + + _pdks = re.findall(f'"{pkey}:(.*?)"', cls._doppio_js_data) + + if len(_pdks) > 0: + return cls._mouflon_keys.setdefault(pkey, _pdks[0]) + return None @staticmethod def _getMouflonFromM3U(m3u8_doc): _start = 0 - _needle = '#EXT-X-MOUFLON:' + _needle = "#EXT-X-MOUFLON:" + while _needle in (_doc := m3u8_doc[_start:]): _mouflon_start = _doc.find(_needle) + if _mouflon_start > 0: - _mouflon = _doc[_mouflon_start:m3u8_doc.find('\n', _mouflon_start)].strip().split(':') + _mouflon = ( + _doc[_mouflon_start: m3u8_doc.find("\n", _mouflon_start)] + .strip() + .split(":") + ) + psch = _mouflon[2] pkey = _mouflon[3] pdkey = StripChat.getMouflonDecKey(pkey) - if pdkey: - return psch, pkey, pdkey + + return psch, pkey, pdkey + _start += _mouflon_start + len(_needle) + return None, None, None def getWebsiteURL(self): @@ -137,61 +196,78 @@ def getVideoUrl(self): def getPlaylistVariants(self, url): url = "https://edge-hls.{host}/hls/{id}{vr}/master/{id}{vr}{auto}.m3u8".format( - host='doppiocdn.' + random.choice(['org', 'com', 'net']), - id=self.lastInfo["streamName"], - vr='_vr' if self.vr else '', - auto='_auto' if not self.vr else '' - ) - result = self.session.get(url, headers=self.headers, cookies=self.cookies) + host="doppiocdn." + random.choice(["org", "com", "net"]), + id=self.lastInfo["streamName"], + vr="_vr" if self.vr else "", + auto="_auto" if not self.vr else "", + ) + + result = requests.get( + url, + headers=self.headers, + cookies=self.cookies, + proxies=self.get_proxies() + ) + m3u8_doc = result.content.decode("utf-8") psch, pkey, pdkey = StripChat._getMouflonFromM3U(m3u8_doc) - if pdkey is None: - self.log(f'Failed to get mouflon decryption key') - return [] + + self.debug(f"Extracted key {psch}, {pkey}, {pdkey}") + variants = super().getPlaylistVariants(m3u_data=m3u8_doc) - return [variant | {'url': f'{variant["url"]}{"&" if "?" in variant["url"] else "?"}psch={psch}&pkey={pkey}'} - for variant in variants] + + return [ + variant | { + "url": f"{variant['url']}{'&' if '?' in variant['url'] else '?'}psch={psch}&pkey={pkey}" + } + for variant in variants + ] @staticmethod def uniq(length=16): - chars = ''.join(chr(i) for i in range(ord('a'), ord('z')+1)) - chars += ''.join(chr(i) for i in range(ord('0'), ord('9')+1)) - return ''.join(random.choice(chars) for _ in range(length)) + chars = "".join(chr(i) for i in range(ord("a"), ord("z") + 1)) + chars += "".join(chr(i) for i in range(ord("0"), ord("9") + 1)) + return "".join(random.choice(chars) for _ in range(length)) def getStatus(self): - r = self.session.get( - f'https://stripchat.com/api/front/v2/models/username/{self.username}/cam?uniq={StripChat.uniq()}', - headers=self.headers + r = requests.get( + f"https://stripchat.com/api/front/v2/models/username/{self.username}/cam?uniq={StripChat.uniq()}", + headers=self.headers, + proxies=self.get_proxies() ) try: data = r.json() except requests.exceptions.JSONDecodeError: - self.log('Failed to parse JSON response') + self.log("Failed to parse JSON response") return Status.UNKNOWN - if 'cam' not in data: - if 'error' in data: - error = data['error'] - if error == 'Not Found': - return Status.NOTEXIST - self.logger.warn(f'Status returned error: {error}') + if "cam" not in data: + if "error" in data and data["error"] == "Not Found": + return Status.NOTEXIST return Status.UNKNOWN - self.lastInfo = {'model': data['user']['user']} - if isinstance(data['cam'], dict): - self.lastInfo |= data['cam'] + self.lastInfo = {"model": data["user"]["user"]} + + if isinstance(data["cam"], dict): + self.lastInfo |= data["cam"] + + status = self.lastInfo["model"].get("status") - status = self.lastInfo['model'].get('status') if status == "public" and self.lastInfo["isCamAvailable"] and self.lastInfo["isCamActive"]: return Status.PUBLIC + if status in ["private", "groupShow", "p2p", "virtualPrivate", "p2pVoice"]: return Status.PRIVATE + if status in ["off", "idle"]: return Status.OFFLINE - if self.lastInfo['model'].get('isDeleted') is True: + + if self.lastInfo["model"].get("isDeleted") is True: return Status.NOTEXIST - if data['user'].get('isGeoBanned') is True: + + if data["user"].get("isGeoBanned") is True: return Status.RESTRICTED - self.logger.warn(f'Got unknown status: {status}') + + self.logger.warn(f"Got unknown status: {status}") return Status.UNKNOWN