From db2a433fc827f8fa1faf0f9063acf96f68ffd5bd Mon Sep 17 00:00:00 2001 From: unconst Date: Thu, 18 Jun 2026 12:12:12 -0600 Subject: [PATCH 1/6] Add `btcli deriv` local long/short sandbox simulator Adds a deterministic, in-process sandbox (no chain, no wallet) for the covered continuous-unwind long/short model. Implements the spec's closed-form math against a single shared CPMM pool so users can preview, open, top-up, advance time on, and close short/long positions and see how carry, break-even, close cost, and pool price interact. New command group `deriv` (alias `d`): quote / open / topup / close / advance / status / reset Pure engine in src/commands/deriv/sim.py (no btcli deps); rich render layer in deriv.py; wired into cli.py. Co-authored-by: Cursor --- bittensor_cli/cli.py | 112 ++++ bittensor_cli/src/commands/deriv/__init__.py | 0 bittensor_cli/src/commands/deriv/deriv.py | 279 ++++++++++ bittensor_cli/src/commands/deriv/sim.py | 550 +++++++++++++++++++ 4 files changed, 941 insertions(+) create mode 100644 bittensor_cli/src/commands/deriv/__init__.py create mode 100644 bittensor_cli/src/commands/deriv/deriv.py create mode 100644 bittensor_cli/src/commands/deriv/sim.py diff --git a/bittensor_cli/cli.py b/bittensor_cli/cli.py index 86ff00b15..99a287366 100755 --- a/bittensor_cli/cli.py +++ b/bittensor_cli/cli.py @@ -78,6 +78,7 @@ from bittensor_cli.src.commands import sudo, wallets, view from bittensor_cli.src.commands import weights as weights_cmds from bittensor_cli.src.commands.liquidity import liquidity +from bittensor_cli.src.commands.deriv import deriv, sim from bittensor_cli.src.commands.crowd import ( contribute as crowd_contribute, create as create_crowdloan, @@ -885,6 +886,7 @@ def __init__(self): self.weights_app = typer.Typer(epilog=_epilog) self.view_app = typer.Typer(epilog=_epilog) self.liquidity_app = typer.Typer(epilog=_epilog) + self.deriv_app = typer.Typer(epilog=_epilog) self.crowd_app = typer.Typer(epilog=_epilog) self.utils_app = typer.Typer(epilog=_epilog) self.axon_app = typer.Typer(epilog=_epilog) @@ -1418,6 +1420,27 @@ def __init__(self): "remove", rich_help_panel=HELP_PANELS["LIQUIDITY"]["LIQUIDITY_MGMT"] )(self.liquidity_remove) + # Deriv sandbox (local long/short simulator, no chain) + _DERIV_QUOTE = "Preview" + _DERIV_LIFE = "Position lifecycle" + _DERIV_SIM = "Sandbox" + self.app.add_typer( + self.deriv_app, + name="deriv", + short_help="local long/short sandbox simulator, aliases: `d`", + no_args_is_help=True, + ) + self.app.add_typer( + self.deriv_app, name="d", hidden=True, no_args_is_help=True + ) + self.deriv_app.command("quote", rich_help_panel=_DERIV_QUOTE)(self.deriv_quote) + self.deriv_app.command("open", rich_help_panel=_DERIV_LIFE)(self.deriv_open) + self.deriv_app.command("topup", rich_help_panel=_DERIV_LIFE)(self.deriv_topup) + self.deriv_app.command("close", rich_help_panel=_DERIV_LIFE)(self.deriv_close) + self.deriv_app.command("advance", rich_help_panel=_DERIV_SIM)(self.deriv_advance) + self.deriv_app.command("status", rich_help_panel=_DERIV_SIM)(self.deriv_status) + self.deriv_app.command("reset", rich_help_panel=_DERIV_SIM)(self.deriv_reset) + # utils app self.utils_app.command("convert")(self.convert) self.utils_app.command("latency")(self.best_connection) @@ -9060,6 +9083,95 @@ def view_dashboard( ) ) + # Deriv sandbox (local long/short simulator) + + @staticmethod + def _deriv_state_opt() -> str: + return typer.Option( + sim.DEFAULT_STATE_PATH, + "--state", + help="Path to the local sandbox state file.", + ) + + def deriv_quote( + self, + p: float = typer.Argument(..., help="Position input P (capital you supply)."), + side: str = typer.Option( + "short", "--side", help="Position side: short or long." + ), + state_path: str = _deriv_state_opt(), + ): + """Preview an open: retained proceeds, liability, effective LTV, carry, close cost, break-even. No state change.""" + if side not in ("short", "long"): + print_error("side must be 'short' or 'long'") + return + deriv.quote(state_path, side, p) + + def deriv_open( + self, + p: float = typer.Argument(..., help="Position input P (capital you supply)."), + side: str = typer.Option( + "short", "--side", help="Position side: short or long." + ), + state_path: str = _deriv_state_opt(), + ): + """Open a long/short position in the sandbox.""" + if side not in ("short", "long"): + print_error("side must be 'short' or 'long'") + return + deriv.open_(state_path, side, p) + + def deriv_topup( + self, + position_id: int = typer.Argument(..., help="Position id to top up."), + amount: float = typer.Argument(..., help="Amount to add to the carry buffer R."), + state_path: str = _deriv_state_opt(), + ): + """Top up a position's carry buffer (delays default; does not change liability).""" + deriv.top_up(state_path, position_id, amount) + + def deriv_close( + self, + position_id: int = typer.Argument(..., help="Position id to close."), + fraction: float = typer.Option( + 1.0, "--fraction", "-f", help="Fraction to close, in (0, 1]." + ), + state_path: str = _deriv_state_opt(), + ): + """Close (or partially close) a position and see PnL.""" + deriv.close(state_path, position_id, fraction) + + def deriv_advance( + self, + duration: str = typer.Argument( + ..., help="Time to advance, e.g. '30d', '12h', '7200b' or a block count." + ), + state_path: str = _deriv_state_opt(), + ): + """Advance simulated time: decay carry, run restoration zaps, process defaults.""" + deriv.advance(state_path, duration) + + def deriv_status( + self, + state_path: str = _deriv_state_opt(), + ): + """Show the pool, per-side utilization/carry/capacity, and your open positions.""" + deriv.status(state_path) + + def deriv_reset( + self, + tao: float = typer.Option(1000.0, "--tao", help="Initial pool TAO reserve."), + alpha: float = typer.Option( + 100_000.0, "--alpha", help="Initial pool Alpha reserve." + ), + enable_longs: bool = typer.Option( + True, "--enable-longs/--shorts-only", help="Enable the long side." + ), + state_path: str = _deriv_state_opt(), + ): + """Reset the sandbox to a fresh pool (clears all positions).""" + deriv.reset(state_path, tao, alpha, enable_longs) + # Liquidity def liquidity_add( diff --git a/bittensor_cli/src/commands/deriv/__init__.py b/bittensor_cli/src/commands/deriv/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/bittensor_cli/src/commands/deriv/deriv.py b/bittensor_cli/src/commands/deriv/deriv.py new file mode 100644 index 000000000..1ffdd5113 --- /dev/null +++ b/bittensor_cli/src/commands/deriv/deriv.py @@ -0,0 +1,279 @@ +""" +Command handlers for the `btcli deriv` sandbox: render layer over `sim.py`. + +These commands are fully local (no chain, no wallet). They load a JSON state +file, mutate it through the simulator, render a rich view, and save it back. +""" + +from __future__ import annotations + +from rich.table import Table + +from bittensor_cli.src import COLORS +from bittensor_cli.src.bittensor.utils import console, print_error +from bittensor_cli.src.commands.deriv import sim + +C = COLORS.G # general palette +_SHORT = "#C25E7C" # rose +_LONG = "#53B5A0" # teal + + +def _fmt(x: float, dp: int = 4) -> str: + if x == float("inf"): + return "inf" + return f"{x:,.{dp}f}" + + +def _days(x: float) -> str: + if x == float("inf"): + return "never" + if x >= 365: + return f"{x / 365:.2f} yr" + return f"{x:.1f} d" + + +def _side_color(side: str) -> str: + return _SHORT if side == "short" else _LONG + + +# --------------------------------------------------------------------------- +def reset( + state_path: str, + tao: float, + alpha: float, + enable_longs: bool, +) -> None: + cfg = sim.Config(long_side_enabled=enable_longs) + state = sim.State.new(tao=tao, alpha=alpha, config=cfg) + sim.save_state(state, state_path) + console.print( + f"[{C.SUCCESS}]Sandbox reset.[/{C.SUCCESS}] " + f"Pool: [{C.BAL}]{_fmt(tao, 2)} TAO[/{C.BAL}] / " + f"[{_LONG}]{_fmt(alpha, 2)} Alpha[/{_LONG}] " + f"price=[{C.SYM}]{_fmt(state.pool.price, 6)}[/{C.SYM}] " + f"longs={'on' if enable_longs else 'off'}" + ) + + +def quote(state_path: str, side: str, p_input: float) -> None: + state = sim.load_state(state_path) + try: + q = sim.quote(state, side, p_input) # type: ignore[arg-type] + except sim.SimError as e: + print_error(str(e)) + return + + liab_unit = "Alpha" if side == "short" else "TAO" + floor_unit = "TAO" if side == "short" else "Alpha" + buf_unit = floor_unit # buffer is denominated in the floor asset + + t = Table( + title=f"[bold {_side_color(side)}]{side.upper()}[/] open quote " + f"(P = {_fmt(p_input, 4)} {floor_unit})", + show_header=False, + title_justify="left", + ) + t.add_column(style=C.SUBHEAD) + t.add_column(style="white", justify="right") + + t.add_row("Position input P", f"{_fmt(q.p_input)} {floor_unit}") + t.add_row("Gross collateral C", f"{_fmt(q.gross_collateral)} {floor_unit}") + t.add_row("Retained proceeds N (→ R0)", f"{_fmt(q.retained_proceeds)} {buf_unit}") + t.add_row("Effective LTV", f"{q.effective_ltv * 100:.2f}%") + t.add_row(f"Fixed liability {'Q' if side == 'short' else 'D'}", f"{_fmt(q.liability)} {liab_unit}") + t.add_row("Linked escrow E", f"{_fmt(q.escrow)}") + t.add_row("Pool fraction φ", f"{q.pool_fraction * 100:.3f}%") + t.add_row("Price impact", f"{q.price_impact * 100:.3f}%") + t.add_row(" price before → after", f"{_fmt(q.price_before, 6)} → {_fmt(q.price_after, 6)}") + t.add_row("Daily carry (now)", f"{q.daily_carry * 100:.3f}%/day") + t.add_row("Time to dust min / max", f"{_days(q.min_days_to_dust)} / {_days(q.max_days_to_dust)}") + if side == "short": + t.add_row("Est. close cost K(Q)", f"{_fmt(q.est_close_cost)} TAO") + t.add_row("Break-even close price", f"{_fmt(q.break_even_price, 6)} TAO/Alpha") + t.add_row( + "Profitable if close cost <", + f"R = {_fmt(q.retained_proceeds)} | rational if < P+R = {_fmt(q.p_input + q.retained_proceeds)}", + ) + else: + t.add_row("Close: repay liability D", f"{_fmt(q.est_close_cost)} TAO") + t.add_row("Break-even Alpha price", f"{_fmt(q.break_even_price, 6)} TAO/Alpha") + console.print(t) + + +def open_(state_path: str, side: str, p_input: float) -> None: + state = sim.load_state(state_path) + try: + pos = sim.open_position(state, side, p_input) # type: ignore[arg-type] + except sim.SimError as e: + print_error(str(e)) + return + sim.save_state(state, state_path) + console.print( + f"[{C.SUCCESS}]Opened[/{C.SUCCESS}] [{_side_color(side)}]{side}[/] position " + f"[bold]#{pos.id}[/] P={_fmt(pos.p)} " + f"{'Q' if side == 'short' else 'D'}={_fmt(pos.liability)} R0={_fmt(pos.r_stored)}" + ) + _print_status(state) + + +def top_up(state_path: str, position_id: int, amount: float) -> None: + state = sim.load_state(state_path) + try: + pos = sim.top_up(state, position_id, amount) + except sim.SimError as e: + print_error(str(e)) + return + sim.save_state(state, state_path) + console.print( + f"[{C.SUCCESS}]Topped up[/{C.SUCCESS}] #{pos.id} by {_fmt(amount)} → " + f"buffer R = {_fmt(pos.r_stored)}" + ) + + +def close(state_path: str, position_id: int, fraction: float) -> None: + state = sim.load_state(state_path) + try: + res = sim.close_position(state, position_id, fraction) + except sim.SimError as e: + print_error(str(e)) + return + sim.save_state(state, state_path) + pnl_color = C.SUCCESS if res.pnl >= 0 else _SHORT + verb = "Closed" if res.fully_closed else f"Partially closed ({fraction:.0%})" + payout_unit = "TAO" if res.side == "short" else "Alpha" + repay_unit = "Alpha" if res.side == "short" else "TAO" + console.print( + f"[{C.SUCCESS}]{verb}[/{C.SUCCESS}] #{res.position_id} " + f"repaid {_fmt(res.repaid)} {repay_unit} " + f"close-cost {_fmt(res.close_cost)} TAO " + f"payout {_fmt(res.payout)} {payout_unit} " + f"PnL [{pnl_color}]{_fmt(res.pnl)} TAO[/{pnl_color}]" + ) + _print_status(state) + + +def advance(state_path: str, duration: str) -> None: + state = sim.load_state(state_path) + try: + blocks = sim.parse_duration(duration) + rep = sim.advance(state, blocks) + except (sim.SimError, ValueError) as e: + print_error(str(e)) + return + sim.save_state(state, state_path) + console.print( + f"[{C.SUBHEAD}]Advanced[/{C.SUBHEAD}] {rep.blocks} blocks ({rep.days:.2f} d). " + f"restored: [{_SHORT}]{_fmt(rep.restored_tao)} TAO[/{_SHORT}] (short) / " + f"[{_LONG}]{_fmt(rep.restored_alpha)} Alpha[/{_LONG}] (long). " + f"price {_fmt(rep.price_before, 6)} → {_fmt(rep.price_after, 6)}" + ) + if rep.defaults: + console.print(f"[{_SHORT}]Defaulted positions: {rep.defaults}[/{_SHORT}]") + _print_status(state) + + +def status(state_path: str) -> None: + state = sim.load_state(state_path) + _print_status(state) + + +# --------------------------------------------------------------------------- +def _print_status(state: sim.State) -> None: + pool = state.pool + cfg = state.config + + # Pool + per-side aggregate / capacity view (the "interaction" dashboard). + u_s = sim._utilization(state, "short") + u_l = sim._utilization(state, "long") + cap_s = cfg.kappa_short * min(pool.t, pool.t_ema) + cap_l = cfg.kappa_long * min(pool.a, pool.a_ema) + carry_s = cfg.d_min + (cfg.d_max - cfg.d_min) * u_s * u_s + carry_l = cfg.d_min + (cfg.d_max - cfg.d_min) * u_l * u_l + + pt = Table( + title=f"[bold {C.HEADER}]Sandbox pool[/] block {state.block} ({state.block / sim.BLOCKS_PER_DAY:.2f} d)", + title_justify="left", + show_header=True, + header_style=C.SUBHEAD_MAIN, + ) + pt.add_column("") + pt.add_column("TAO", justify="right") + pt.add_column("Alpha", justify="right") + pt.add_row("Reserves (live)", _fmt(pool.t, 2), _fmt(pool.a, 2)) + pt.add_row("Reserves (EMA)", _fmt(pool.t_ema, 2), _fmt(pool.a_ema, 2)) + pt.add_row("Price (TAO/Alpha)", _fmt(pool.price, 6), "") + console.print(pt) + + st = Table(show_header=True, header_style=C.SUBHEAD_MAIN, title_justify="left") + st.add_column("Side") + st.add_column("Util u", justify="right") + st.add_column("Footprint S", justify="right") + st.add_column("Capacity Smax", justify="right") + st.add_column("Carry/day", justify="right") + st.add_column("Σ buffer R", justify="right") + st.add_column("Σ escrow E", justify="right") + st.add_column("Σ liability", justify="right") + st.add_row( + f"[{_SHORT}]short[/{_SHORT}]", + f"{u_s * 100:.1f}%", + _fmt(state.b_sigma_short, 3), + _fmt(cap_s, 3), + f"{carry_s * 100:.3f}%", + _fmt(state.r_sigma_short, 3), + _fmt(state.e_sigma_short, 3), + f"{_fmt(state.q_sigma_short, 2)} α", + ) + st.add_row( + f"[{_LONG}]long[/{_LONG}]", + f"{u_l * 100:.1f}%", + _fmt(state.b_sigma_long, 3), + _fmt(cap_l, 3), + f"{carry_l * 100:.3f}%", + _fmt(state.r_sigma_long, 3), + _fmt(state.e_sigma_long, 3), + f"{_fmt(state.d_sigma_long, 2)} τ", + ) + console.print(st) + + open_positions = state.open_positions() + if not open_positions: + console.print(f"[{C.SUBHEAD_EX_1}]No open positions.[/{C.SUBHEAD_EX_1}]") + return + + pos_t = Table( + title="[bold]Open positions[/]", + title_justify="left", + show_header=True, + header_style=C.SUBHEAD_MAIN, + ) + pos_t.add_column("#", justify="right") + pos_t.add_column("Side") + pos_t.add_column("Floor P", justify="right") + pos_t.add_column("Buffer R", justify="right") + pos_t.add_column("Liability", justify="right") + pos_t.add_column("Close cost", justify="right") + pos_t.add_column("Status / PnL if closed", justify="right") + for p in open_positions: + r, e, b = p.materialized(state.omega(p.side)) + if p.side == "short": + if p.liability < pool.a: + cost = pool.t * p.liability / (pool.a - p.liability) + else: + cost = float("inf") + pnl = (p.p + r) - cost - p.p # = R - close_cost + liab_str = f"{_fmt(p.liability, 2)} α" + else: + cost = p.liability # repay D tao + pnl = (p.p + r) * pool.price - cost - p.p * pool.price + liab_str = f"{_fmt(p.liability, 2)} τ" + pnl_color = C.SUCCESS if pnl >= 0 else _SHORT + pos_t.add_row( + str(p.id), + f"[{_side_color(p.side)}]{p.side}[/]", + _fmt(p.p, 3), + _fmt(r, 3), + liab_str, + _fmt(cost, 3), + f"[{pnl_color}]{_fmt(pnl, 3)} TAO[/{pnl_color}]", + ) + console.print(pos_t) diff --git a/bittensor_cli/src/commands/deriv/sim.py b/bittensor_cli/src/commands/deriv/sim.py new file mode 100644 index 000000000..32499da2f --- /dev/null +++ b/bittensor_cli/src/commands/deriv/sim.py @@ -0,0 +1,550 @@ +""" +Local simulator for the Fixed-Liability Covered Continuous-Unwind model (spec v3.6.1). + +This is a deterministic, in-process sandbox: no chain, no wallet, no signing. It +implements the closed-form math from the spec (see appendix A) against a single +fake CPMM pool so a user can open / top-up / close short and long positions, +advance simulated time, and watch carry, break-even, close cost and pool price +push on each other. + +Simplifications relative to production (intentional, see spec section 14.6): + * No pool fees. The closed-form "no-fee CPMM core" is used everywhere. + * Single pool (one subnet). Positions share that pool, which is the whole point: + it lets you see how shorts and longs interact through price and utilization. + * No real defaults scheduling / MEV / drand. Buffer-reaches-dust default is + processed deterministically on advance. + * Long side is enabled by default here so both sides can be explored. The spec + launches shorts-first with longs gated; that flag lives in `Config`. +""" + +from __future__ import annotations + +import json +import math +import os +from dataclasses import dataclass, field, asdict +from pathlib import Path +from typing import Literal, Optional + +Side = Literal["short", "long"] + +BLOCKS_PER_DAY = 7200 # 12s blocks +DEFAULT_STATE_PATH = os.path.expanduser("~/.bittensor/deriv_sim.json") + + +class SimError(Exception): + """Raised when an operation violates a spec rule (rejected open, bad fraction...).""" + + +@dataclass +class Config: + """Policy parameters. Defaults follow the spec's conservative starting set (section 14.1).""" + + lambda_short: float = 0.50 + lambda_long: float = 0.50 + kappa_short: float = 0.33 # active-footprint cap fraction (~ phi_cap 1/3) + kappa_long: float = 0.25 + d_min: float = 0.001 # 0.1%/day at zero utilization + d_max: float = 0.015 # 1.5%/day at full utilization + r_dust: float = 1.0 # buffer dust threshold (in the side's buffer asset) + ema_halflife_blocks: int = BLOCKS_PER_DAY # lagged EMA reference + long_side_enabled: bool = True # spec default is False (shorts-first); on here to explore both + + +@dataclass +class Position: + id: int + side: Side + # Non-decaying floor supplied by the trader (TAO for short, Alpha for long). + p: float + # Fixed liability: Alpha (Q) for a short, TAO (D) for a long. Does not decay. + liability: float + # Stored (last-materialized) decaying components. + r_stored: float # retained-proceeds buffer + e_stored: float # linked escrow + b_stored: float # utilization footprint + omega_entry: float + status: str = "open" # open | closed | defaulted + + def materialized(self, omega_side: float) -> tuple[float, float, float]: + """Return current (R, E, B) given the side accumulator, without mutating.""" + f = math.exp(-(omega_side - self.omega_entry)) + return self.r_stored * f, self.e_stored * f, self.b_stored * f + + +@dataclass +class Pool: + # Live CPMM reserves. + a: float # Alpha reserve + t: float # TAO reserve + # Lagged EMA references for risk sizing. + a_ema: float + t_ema: float + + @property + def price(self) -> float: + """Alpha price in TAO = TAO reserve / Alpha reserve.""" + return self.t / self.a + + +@dataclass +class State: + pool: Pool + config: Config + # Side accumulators (monotonic) and aggregate current components. + omega_short: float = 0.0 + omega_long: float = 0.0 + r_sigma_short: float = 0.0 + e_sigma_short: float = 0.0 + b_sigma_short: float = 0.0 + r_sigma_long: float = 0.0 + e_sigma_long: float = 0.0 + b_sigma_long: float = 0.0 + q_sigma_short: float = 0.0 # aggregate fixed Alpha liability + d_sigma_long: float = 0.0 # aggregate fixed TAO liability + block: int = 0 + next_id: int = 1 + positions: list[Position] = field(default_factory=list) + + # ---- persistence ------------------------------------------------------- + def to_json(self) -> str: + d = asdict(self) + return json.dumps(d, indent=2) + + @classmethod + def from_json(cls, raw: str) -> "State": + d = json.loads(raw) + pool = Pool(**d["pool"]) + config = Config(**d["config"]) + positions = [Position(**p) for p in d["positions"]] + d.update(pool=pool, config=config, positions=positions) + return cls(**d) + + @classmethod + def new( + cls, + tao: float = 1000.0, + alpha: float = 100_000.0, + config: Optional[Config] = None, + ) -> "State": + pool = Pool(a=alpha, t=tao, a_ema=alpha, t_ema=tao) + return cls(pool=pool, config=config or Config()) + + # ---- side helpers ------------------------------------------------------ + def omega(self, side: Side) -> float: + return self.omega_short if side == "short" else self.omega_long + + def open_positions(self, side: Optional[Side] = None) -> list[Position]: + return [ + p + for p in self.positions + if p.status == "open" and (side is None or p.side == side) + ] + + +def load_state(path: str = DEFAULT_STATE_PATH) -> State: + p = Path(path) + if not p.exists(): + state = State.new() + save_state(state, path) + return state + return State.from_json(p.read_text()) + + +def save_state(state: State, path: str = DEFAULT_STATE_PATH) -> None: + p = Path(path) + p.parent.mkdir(parents=True, exist_ok=True) + p.write_text(state.to_json()) + + +# --------------------------------------------------------------------------- +# Pure pricing helpers (spec appendix A) +# --------------------------------------------------------------------------- +def _solve_collateral(p_input: float, lam: float, s: float, ref: float) -> float: + """Solve gross collateral C from user input P (spec 4.2). Positive root.""" + a = lam * lam / ref + b = 1.0 - lam + 2.0 * lam * s / ref + return (-b + math.sqrt(b * b + 4.0 * a * p_input)) / (2.0 * a) + + +def _phi_from_n(n: float, live_ref: float) -> float: + """Pool fraction that generates retained proceeds N (spec 4.3). Smaller root.""" + inner = 1.0 - 4.0 * n / live_ref + if inner < 0: + raise SimError( + f"remove-and-sell-back domain failed: 4N ({4 * n:.4f}) > live reserve ({live_ref:.4f})" + ) + return (1.0 - math.sqrt(inner)) / 2.0 + + +@dataclass +class Quote: + """Pre-trade preview (spec 1.2: what the trader should see before opening).""" + + side: Side + p_input: float + gross_collateral: float + retained_proceeds: float # N -> R0 + effective_ltv: float + pool_fraction: float # phi + price_impact: float # delta + liability: float # Q (alpha) for short, D (tao) for long + escrow: float + footprint: float # B + daily_carry: float # at current utilization + min_days_to_dust: float # decay at d_max + max_days_to_dust: float # decay at d_min + est_close_cost: float # short: TAO to buy Q alpha; long: D tao + break_even_price: float # alpha price at which close breaks even + price_before: float + price_after: float + + +def _days_to_dust(r0: float, daily_decay: float, r_dust: float) -> float: + if r0 <= r_dust: + return 0.0 + if daily_decay <= 0: + return math.inf + return math.log(r_dust / r0) / math.log(1.0 - daily_decay) + + +def quote(state: State, side: Side, p_input: float) -> Quote: + """Compute an open preview without mutating state.""" + cfg = state.config + pool = state.pool + if side == "long" and not cfg.long_side_enabled: + raise SimError("long side is disabled (spec launch posture is shorts-first)") + if p_input <= 0: + raise SimError("position input P must be positive") + + if side == "short": + lam, kappa = cfg.lambda_short, cfg.kappa_short + ref = min(pool.t, pool.t_ema) # T_ref + s = state.b_sigma_short + live_ref = pool.t + else: + lam, kappa = cfg.lambda_long, cfg.kappa_long + ref = min(pool.a, pool.a_ema) # A_ref + s = state.b_sigma_long + live_ref = pool.a + + c = _solve_collateral(p_input, lam, s, ref) + n = c - p_input + if n <= 0: + raise SimError( + f"open rejected: effective LTV <= 0 at this utilization (N={n:.4f})" + ) + b = lam * c + if s + b > kappa * ref: + raise SimError( + f"open rejected: footprint cap exceeded (S+B={s + b:.4f} > kappa*ref={kappa * ref:.4f})" + ) + phi = _phi_from_n(n, live_ref) + + if side == "short": + liability = phi * pool.a # Q alpha + escrow = phi * pool.t # E tao + price_impact = 1.0 - (1.0 - phi) ** 2 # delta_S (downward) + price_after = pool.t * (1.0 - phi) ** 2 / pool.a + # close cost: TAO to buy Q alpha out of the pool (trader holds none) + est_close = ( + pool.t * liability / (pool.a - liability) + if liability < pool.a + else math.inf + ) + break_even_price = est_close / liability if liability > 0 else 0.0 + else: + liability = phi * pool.t # D tao + escrow = phi * pool.a # E alpha + price_impact = 1.0 / (1.0 - phi) ** 2 - 1.0 # delta_L (upward) + price_after = pool.t / (pool.a * (1.0 - phi) ** 2) + # long break-even: alpha price at which returned (P+R) alpha covers liability D + break_even_price = liability / (p_input + n) if (p_input + n) > 0 else 0.0 + est_close = liability # repay D tao + + u = _utilization(state, side) + daily = cfg.d_min + (cfg.d_max - cfg.d_min) * u * u + return Quote( + side=side, + p_input=p_input, + gross_collateral=c, + retained_proceeds=n, + effective_ltv=n / c, + pool_fraction=phi, + price_impact=price_impact, + liability=liability, + escrow=escrow, + footprint=b, + daily_carry=daily, + min_days_to_dust=_days_to_dust(n, cfg.d_max, cfg.r_dust), + max_days_to_dust=_days_to_dust(n, cfg.d_min, cfg.r_dust), + est_close_cost=est_close, + break_even_price=break_even_price, + price_before=pool.price, + price_after=price_after, + ) + + +def _utilization(state: State, side: Side) -> float: + cfg = state.config + if side == "short": + denom = cfg.kappa_short * state.pool.t_ema + s = state.b_sigma_short + else: + denom = cfg.kappa_long * state.pool.a_ema + s = state.b_sigma_long + if denom <= 0: + return 0.0 + return min(1.0, s / denom) + + +# --------------------------------------------------------------------------- +# Mutating operations +# --------------------------------------------------------------------------- +def open_position(state: State, side: Side, p_input: float) -> Position: + q = quote(state, side, p_input) + pool = state.pool + if side == "short": + # remove-and-sell-back: A unchanged, T -> (1-phi)^2 T + pool.t = pool.t * (1.0 - q.pool_fraction) ** 2 + state.r_sigma_short += q.retained_proceeds + state.e_sigma_short += q.escrow + state.b_sigma_short += q.footprint + state.q_sigma_short += q.liability + omega_entry = state.omega_short + else: + # mirror: T unchanged, A -> (1-phi)^2 A + pool.a = pool.a * (1.0 - q.pool_fraction) ** 2 + state.r_sigma_long += q.retained_proceeds + state.e_sigma_long += q.escrow + state.b_sigma_long += q.footprint + state.d_sigma_long += q.liability + omega_entry = state.omega_long + + pos = Position( + id=state.next_id, + side=side, + p=p_input, + liability=q.liability, + r_stored=q.retained_proceeds, + e_stored=q.escrow, + b_stored=q.footprint, + omega_entry=omega_entry, + ) + state.next_id += 1 + state.positions.append(pos) + return pos + + +def _get_open(state: State, position_id: int) -> Position: + for p in state.positions: + if p.id == position_id: + if p.status != "open": + raise SimError(f"position {position_id} is {p.status}, not open") + return p + raise SimError(f"no open position with id {position_id}") + + +def _materialize(state: State, pos: Position) -> None: + """Fold elapsed decay into a single position's stored components.""" + omega = state.omega(pos.side) + f = math.exp(-(omega - pos.omega_entry)) + pos.r_stored *= f + pos.e_stored *= f + pos.b_stored *= f + pos.omega_entry = omega + + +def top_up(state: State, position_id: int, amount: float) -> Position: + if amount <= 0: + raise SimError("top-up amount must be positive") + pos = _get_open(state, position_id) + if pos.side == "long" and not state.config.long_side_enabled: + raise SimError("long side is disabled") + _materialize(state, pos) + pos.r_stored += amount + if pos.side == "short": + state.r_sigma_short += amount + else: + state.r_sigma_long += amount + return pos + + +@dataclass +class CloseResult: + position_id: int + fraction: float + side: Side + repaid: float # alpha (short) or tao (long) returned to cover liability + payout: float # P+R slice returned to the trader (TAO short / Alpha long) + close_cost: float # market cost to source the repaid liability asset + pnl: float # payout - close_cost - capital_consumed_for_this_slice + fully_closed: bool + + +def close_position( + state: State, position_id: int, fraction: float = 1.0 +) -> CloseResult: + if fraction <= 0 or fraction > 1: + raise SimError("fraction must be in (0, 1]") + pos = _get_open(state, position_id) + _materialize(state, pos) + pool = state.pool + rho = fraction + + repaid = rho * pos.liability + payout = rho * (pos.p + pos.r_stored) + + if pos.side == "short": + # close cost: buy `repaid` alpha out of the pool + if repaid >= pool.a: + raise SimError("close cost unbounded: liability exceeds pool Alpha") + close_cost = pool.t * repaid / (pool.a - repaid) + # settlement zap injects (alpha=repaid, tao=rho*E) into the pool + pool.a += repaid + pool.t += rho * pos.e_stored + state.q_sigma_short -= repaid + state.r_sigma_short -= rho * pos.r_stored + state.e_sigma_short -= rho * pos.e_stored + state.b_sigma_short -= rho * pos.b_stored + pnl = payout - close_cost - rho * pos.p + else: + # long repays `repaid` TAO; settlement zap injects (alpha=rho*E, tao=repaid) + close_cost = repaid # D tao + pool.a += rho * pos.e_stored + pool.t += repaid + state.d_sigma_long -= repaid + state.r_sigma_long -= rho * pos.r_stored + state.e_sigma_long -= rho * pos.e_stored + state.b_sigma_long -= rho * pos.b_stored + # payout is Alpha; value vs capital consumed in TAO terms at current price + pnl = payout * pool.price - close_cost - rho * pos.p * pool.price + + pos.p *= 1.0 - rho + pos.liability *= 1.0 - rho + pos.r_stored *= 1.0 - rho + pos.e_stored *= 1.0 - rho + pos.b_stored *= 1.0 - rho + fully = rho >= 1.0 or pos.p <= 1e-12 + if fully: + pos.status = "closed" + return CloseResult( + position_id=position_id, + fraction=rho, + side=pos.side, + repaid=repaid, + payout=payout, + close_cost=close_cost, + pnl=pnl, + fully_closed=fully, + ) + + +@dataclass +class AdvanceReport: + blocks: int + days: float + restored_tao: float # short-side restoration injected into pool TAO + restored_alpha: float # long-side restoration injected into pool Alpha + defaults: list[int] = field(default_factory=list) + price_before: float = 0.0 + price_after: float = 0.0 + + +def _decay_side(state: State, side: Side, blocks: int) -> float: + """Apply aggregate block-step unwind for one side, return restoration amount.""" + cfg = state.config + u = _utilization(state, side) + d_day = cfg.d_min + (cfg.d_max - cfg.d_min) * u * u + g = (1.0 - d_day) ** (blocks / BLOCKS_PER_DAY) + if side == "short": + r, e, b = ( + state.r_sigma_short, + state.e_sigma_short, + state.b_sigma_short, + ) + restored = (r + e) * (1.0 - g) + state.r_sigma_short = r * g + state.e_sigma_short = e * g + state.b_sigma_short = b * g + state.omega_short += -math.log(g) if g > 0 else 0.0 + state.pool.t += restored # restoration zap nets to one-sided TAO injection + else: + r, e, b = ( + state.r_sigma_long, + state.e_sigma_long, + state.b_sigma_long, + ) + restored = (r + e) * (1.0 - g) + state.r_sigma_long = r * g + state.e_sigma_long = e * g + state.b_sigma_long = b * g + state.omega_long += -math.log(g) if g > 0 else 0.0 + state.pool.a += restored # mirror: one-sided Alpha injection + return restored + + +def _update_ema(state: State, blocks: int) -> None: + cfg = state.config + pool = state.pool + alpha = 1.0 - math.exp(-blocks / max(1, cfg.ema_halflife_blocks)) + pool.t_ema += (pool.t - pool.t_ema) * alpha + pool.a_ema += (pool.a - pool.a_ema) * alpha + + +def _process_defaults(state: State) -> list[int]: + defaulted = [] + for pos in state.open_positions(): + _materialize(state, pos) + if pos.r_stored <= state.config.r_dust: + if pos.side == "short": + state.pool.t += pos.r_stored + pos.e_stored + state.r_sigma_short -= pos.r_stored + state.e_sigma_short -= pos.e_stored + state.b_sigma_short -= pos.b_stored + state.q_sigma_short -= pos.liability + else: + state.pool.a += pos.r_stored + pos.e_stored + state.r_sigma_long -= pos.r_stored + state.e_sigma_long -= pos.e_stored + state.b_sigma_long -= pos.b_stored + state.d_sigma_long -= pos.liability + # floor P is recycled out of the pool (lost to the trader) + pos.r_stored = pos.e_stored = pos.b_stored = 0.0 + pos.liability = 0.0 + pos.p = 0.0 + pos.status = "defaulted" + defaulted.append(pos.id) + return defaulted + + +def advance(state: State, blocks: int) -> AdvanceReport: + if blocks <= 0: + raise SimError("blocks must be positive") + price_before = state.pool.price + restored_tao = _decay_side(state, "short", blocks) + restored_alpha = _decay_side(state, "long", blocks) + _update_ema(state, blocks) + defaults = _process_defaults(state) + state.block += blocks + return AdvanceReport( + blocks=blocks, + days=blocks / BLOCKS_PER_DAY, + restored_tao=restored_tao, + restored_alpha=restored_alpha, + defaults=defaults, + price_before=price_before, + price_after=state.pool.price, + ) + + +def parse_duration(text: str) -> int: + """Parse '7200', '30d', '12h', '100b' into a block count.""" + t = text.strip().lower() + if t.endswith("d"): + return int(round(float(t[:-1]) * BLOCKS_PER_DAY)) + if t.endswith("h"): + return int(round(float(t[:-1]) * BLOCKS_PER_DAY / 24)) + if t.endswith("b"): + return int(float(t[:-1])) + return int(float(t)) From 5a3a1f949beaed12649044924a9a14cb7ad185a9 Mon Sep 17 00:00:00 2001 From: unconst Date: Thu, 18 Jun 2026 12:27:41 -0600 Subject: [PATCH 2/6] Make `btcli deriv` operate on-chain (replace sandbox) Replaces the local simulator with real on-chain commands that drive the pallet-subtensor covered continuous-unwind extrinsics and read the DerivativesRuntimeApi. Commands (`deriv`, alias `d`), all over --network / wallet / --netuid: quote preview open (quote_open_short/long) positions list your positions (get_*_positions / get_*_position) market per-subnet market state (get_subnet_*_state) open open_short / open_long topup top_up_short / top_up_long close close_short / close_long (partial via --fraction) default permissionless default_short / default_long Removes the sandbox sim engine. Co-authored-by: Cursor --- bittensor_cli/cli.py | 254 +++++++--- bittensor_cli/src/commands/deriv/deriv.py | 548 ++++++++++++--------- bittensor_cli/src/commands/deriv/sim.py | 550 ---------------------- 3 files changed, 505 insertions(+), 847 deletions(-) delete mode 100644 bittensor_cli/src/commands/deriv/sim.py diff --git a/bittensor_cli/cli.py b/bittensor_cli/cli.py index 99a287366..ab3f2aae3 100755 --- a/bittensor_cli/cli.py +++ b/bittensor_cli/cli.py @@ -78,7 +78,7 @@ from bittensor_cli.src.commands import sudo, wallets, view from bittensor_cli.src.commands import weights as weights_cmds from bittensor_cli.src.commands.liquidity import liquidity -from bittensor_cli.src.commands.deriv import deriv, sim +from bittensor_cli.src.commands.deriv import deriv from bittensor_cli.src.commands.crowd import ( contribute as crowd_contribute, create as create_crowdloan, @@ -1420,26 +1420,25 @@ def __init__(self): "remove", rich_help_panel=HELP_PANELS["LIQUIDITY"]["LIQUIDITY_MGMT"] )(self.liquidity_remove) - # Deriv sandbox (local long/short simulator, no chain) - _DERIV_QUOTE = "Preview" + # Deriv (on-chain covered long/short derivatives) + _DERIV_INFO = "Info" _DERIV_LIFE = "Position lifecycle" - _DERIV_SIM = "Sandbox" self.app.add_typer( self.deriv_app, name="deriv", - short_help="local long/short sandbox simulator, aliases: `d`", + short_help="covered long/short derivative commands, aliases: `d`", no_args_is_help=True, ) self.app.add_typer( self.deriv_app, name="d", hidden=True, no_args_is_help=True ) - self.deriv_app.command("quote", rich_help_panel=_DERIV_QUOTE)(self.deriv_quote) + self.deriv_app.command("quote", rich_help_panel=_DERIV_INFO)(self.deriv_quote) + self.deriv_app.command("positions", rich_help_panel=_DERIV_INFO)(self.deriv_positions) + self.deriv_app.command("market", rich_help_panel=_DERIV_INFO)(self.deriv_market) self.deriv_app.command("open", rich_help_panel=_DERIV_LIFE)(self.deriv_open) self.deriv_app.command("topup", rich_help_panel=_DERIV_LIFE)(self.deriv_topup) self.deriv_app.command("close", rich_help_panel=_DERIV_LIFE)(self.deriv_close) - self.deriv_app.command("advance", rich_help_panel=_DERIV_SIM)(self.deriv_advance) - self.deriv_app.command("status", rich_help_panel=_DERIV_SIM)(self.deriv_status) - self.deriv_app.command("reset", rich_help_panel=_DERIV_SIM)(self.deriv_reset) + self.deriv_app.command("default", rich_help_panel=_DERIV_LIFE)(self.deriv_default) # utils app self.utils_app.command("convert")(self.convert) @@ -9083,94 +9082,217 @@ def view_dashboard( ) ) - # Deriv sandbox (local long/short simulator) + # Deriv (on-chain covered long/short derivatives) @staticmethod - def _deriv_state_opt() -> str: - return typer.Option( - sim.DEFAULT_STATE_PATH, - "--state", - help="Path to the local sandbox state file.", - ) + def _deriv_side(side: str) -> Optional[str]: + if side not in ("short", "long"): + print_error("--side must be 'short' or 'long'") + return None + return side def deriv_quote( self, - p: float = typer.Argument(..., help="Position input P (capital you supply)."), - side: str = typer.Option( - "short", "--side", help="Position side: short or long." + network: Optional[list[str]] = Options.network, + netuid: int = Options.netuid, + side: str = typer.Option("short", "--side", help="Position side: short or long."), + amount: float = typer.Option( + ..., "--amount", "-a", help="Position input P (TAO for short, Alpha for long)." ), - state_path: str = _deriv_state_opt(), + quiet: bool = Options.quiet, + verbose: bool = Options.verbose, + json_output: bool = Options.json_output, ): - """Preview an open: retained proceeds, liability, effective LTV, carry, close cost, break-even. No state change.""" - if side not in ("short", "long"): - print_error("side must be 'short' or 'long'") + """Preview opening a covered position: retained proceeds, liability, effective LTV, carry, close cost.""" + self.verbosity_handler(quiet, verbose, json_output, prompt=False) + if not (side := self._deriv_side(side)): return - deriv.quote(state_path, side, p) + return self._run_command( + deriv.quote_open( + subtensor=self.initialize_chain(network), + netuid=netuid, side=side, amount=amount, json_output=json_output, + ) + ) - def deriv_open( + def deriv_positions( self, - p: float = typer.Argument(..., help="Position input P (capital you supply)."), - side: str = typer.Option( - "short", "--side", help="Position side: short or long." + network: Optional[list[str]] = Options.network, + wallet_name: str = Options.wallet_name, + wallet_path: str = Options.wallet_path, + wallet_hotkey: str = Options.wallet_hotkey, + side: str = typer.Option("short", "--side", help="Position side: short or long."), + netuid: Optional[int] = Options.netuid_not_req, + coldkey_ss58: Optional[str] = typer.Option( + None, "--coldkey-ss58", "--ss58", help="Coldkey to inspect (defaults to your wallet)." ), - state_path: str = _deriv_state_opt(), + quiet: bool = Options.quiet, + verbose: bool = Options.verbose, + json_output: bool = Options.json_output, ): - """Open a long/short position in the sandbox.""" - if side not in ("short", "long"): - print_error("side must be 'short' or 'long'") + """List your covered positions (a side, optionally a single subnet).""" + self.verbosity_handler(quiet, verbose, json_output, prompt=False) + if not (side := self._deriv_side(side)): return - deriv.open_(state_path, side, p) + if not coldkey_ss58: + wallet = self.wallet_ask( + wallet_name, wallet_path, wallet_hotkey, + ask_for=[WO.NAME, WO.PATH], validate=WV.WALLET, + ) + coldkey_ss58 = wallet.coldkeypub.ss58_address + return self._run_command( + deriv.show_positions( + subtensor=self.initialize_chain(network), + coldkey_ss58=coldkey_ss58, side=side, netuid=netuid, + json_output=json_output, + ) + ) - def deriv_topup( + def deriv_market( self, - position_id: int = typer.Argument(..., help="Position id to top up."), - amount: float = typer.Argument(..., help="Amount to add to the carry buffer R."), - state_path: str = _deriv_state_opt(), + network: Optional[list[str]] = Options.network, + netuid: int = Options.netuid, + side: str = typer.Option("short", "--side", help="Position side: short or long."), + quiet: bool = Options.quiet, + verbose: bool = Options.verbose, + json_output: bool = Options.json_output, ): - """Top up a position's carry buffer (delays default; does not change liability).""" - deriv.top_up(state_path, position_id, amount) + """Show the per-subnet derivative market state (caps, utilization, carry, open interest).""" + self.verbosity_handler(quiet, verbose, json_output, prompt=False) + if not (side := self._deriv_side(side)): + return + return self._run_command( + deriv.show_market( + subtensor=self.initialize_chain(network), + netuid=netuid, side=side, json_output=json_output, + ) + ) - def deriv_close( + def deriv_open( self, - position_id: int = typer.Argument(..., help="Position id to close."), - fraction: float = typer.Option( - 1.0, "--fraction", "-f", help="Fraction to close, in (0, 1]." + network: Optional[list[str]] = Options.network, + wallet_name: str = Options.wallet_name, + wallet_path: str = Options.wallet_path, + wallet_hotkey: str = Options.wallet_hotkey, + netuid: int = Options.netuid, + side: str = typer.Option("short", "--side", help="Position side: short or long."), + amount: float = typer.Option( + ..., "--amount", "-a", help="Position input P (TAO for short, Alpha for long)." ), - state_path: str = _deriv_state_opt(), + prompt: bool = Options.prompt, + quiet: bool = Options.quiet, + verbose: bool = Options.verbose, + json_output: bool = Options.json_output, ): - """Close (or partially close) a position and see PnL.""" - deriv.close(state_path, position_id, fraction) + """Open a covered short/long position on a subnet.""" + self.verbosity_handler(quiet, verbose, json_output, prompt) + if not (side := self._deriv_side(side)): + return + wallet, hotkey = self.wallet_ask( + wallet_name, wallet_path, wallet_hotkey, + ask_for=[WO.NAME, WO.HOTKEY, WO.PATH], validate=WV.WALLET, + return_wallet_and_hotkey=True, + ) + return self._run_command( + deriv.open_position( + subtensor=self.initialize_chain(network), + wallet=wallet, hotkey_ss58=hotkey, netuid=netuid, side=side, + amount=amount, prompt=prompt, json_output=json_output, + ) + ) - def deriv_advance( + def deriv_topup( self, - duration: str = typer.Argument( - ..., help="Time to advance, e.g. '30d', '12h', '7200b' or a block count." + network: Optional[list[str]] = Options.network, + wallet_name: str = Options.wallet_name, + wallet_path: str = Options.wallet_path, + wallet_hotkey: str = Options.wallet_hotkey, + netuid: int = Options.netuid, + side: str = typer.Option("short", "--side", help="Position side: short or long."), + amount: float = typer.Option( + ..., "--amount", "-a", help="Amount to add to the carry buffer (TAO short / Alpha long)." ), - state_path: str = _deriv_state_opt(), + quiet: bool = Options.quiet, + verbose: bool = Options.verbose, + json_output: bool = Options.json_output, ): - """Advance simulated time: decay carry, run restoration zaps, process defaults.""" - deriv.advance(state_path, duration) + """Top up a position's carry buffer (delays default; does not change the fixed liability).""" + self.verbosity_handler(quiet, verbose, json_output, prompt=False) + if not (side := self._deriv_side(side)): + return + wallet = self.wallet_ask( + wallet_name, wallet_path, wallet_hotkey, + ask_for=[WO.NAME, WO.PATH], validate=WV.WALLET, + ) + return self._run_command( + deriv.top_up( + subtensor=self.initialize_chain(network), + wallet=wallet, netuid=netuid, side=side, amount=amount, + json_output=json_output, + ) + ) - def deriv_status( + def deriv_close( self, - state_path: str = _deriv_state_opt(), + network: Optional[list[str]] = Options.network, + wallet_name: str = Options.wallet_name, + wallet_path: str = Options.wallet_path, + wallet_hotkey: str = Options.wallet_hotkey, + netuid: int = Options.netuid, + side: str = typer.Option("short", "--side", help="Position side: short or long."), + fraction: float = typer.Option( + 1.0, "--fraction", "-f", help="Fraction to close, in (0, 1] (1 = full close)." + ), + prompt: bool = Options.prompt, + quiet: bool = Options.quiet, + verbose: bool = Options.verbose, + json_output: bool = Options.json_output, ): - """Show the pool, per-side utilization/carry/capacity, and your open positions.""" - deriv.status(state_path) + """Close (or partially close) a covered position and repay the fixed liability.""" + self.verbosity_handler(quiet, verbose, json_output, prompt) + if not (side := self._deriv_side(side)): + return + wallet = self.wallet_ask( + wallet_name, wallet_path, wallet_hotkey, + ask_for=[WO.NAME, WO.PATH], validate=WV.WALLET, + ) + return self._run_command( + deriv.close_position( + subtensor=self.initialize_chain(network), + wallet=wallet, netuid=netuid, side=side, fraction=fraction, + prompt=prompt, json_output=json_output, + ) + ) - def deriv_reset( + def deriv_default( self, - tao: float = typer.Option(1000.0, "--tao", help="Initial pool TAO reserve."), - alpha: float = typer.Option( - 100_000.0, "--alpha", help="Initial pool Alpha reserve." - ), - enable_longs: bool = typer.Option( - True, "--enable-longs/--shorts-only", help="Enable the long side." + network: Optional[list[str]] = Options.network, + wallet_name: str = Options.wallet_name, + wallet_path: str = Options.wallet_path, + wallet_hotkey: str = Options.wallet_hotkey, + netuid: int = Options.netuid, + side: str = typer.Option("short", "--side", help="Position side: short or long."), + coldkey_ss58: str = typer.Option( + ..., "--coldkey-ss58", "--ss58", help="Coldkey of the position to default." ), - state_path: str = _deriv_state_opt(), + quiet: bool = Options.quiet, + verbose: bool = Options.verbose, + json_output: bool = Options.json_output, ): - """Reset the sandbox to a fresh pool (clears all positions).""" - deriv.reset(state_path, tao, alpha, enable_longs) + """Permissionlessly default a covered position whose buffer has reached dust.""" + self.verbosity_handler(quiet, verbose, json_output, prompt=False) + if not (side := self._deriv_side(side)): + return + wallet = self.wallet_ask( + wallet_name, wallet_path, wallet_hotkey, + ask_for=[WO.NAME, WO.PATH], validate=WV.WALLET, + ) + return self._run_command( + deriv.default_position( + subtensor=self.initialize_chain(network), + wallet=wallet, coldkey_ss58=coldkey_ss58, netuid=netuid, side=side, + json_output=json_output, + ) + ) # Liquidity diff --git a/bittensor_cli/src/commands/deriv/deriv.py b/bittensor_cli/src/commands/deriv/deriv.py index 1ffdd5113..4a5b814cb 100644 --- a/bittensor_cli/src/commands/deriv/deriv.py +++ b/bittensor_cli/src/commands/deriv/deriv.py @@ -1,279 +1,365 @@ """ -Command handlers for the `btcli deriv` sandbox: render layer over `sim.py`. - -These commands are fully local (no chain, no wallet). They load a JSON state -file, mutate it through the simulator, render a rich view, and save it back. +On-chain covered long/short derivatives (`btcli deriv`). + +Drives the real `pallet-subtensor` covered continuous-unwind extrinsics +(`open_short` / `close_short` / `top_up_short` / `default_short` and the long +mirrors) and reads the `DerivativesRuntimeApi` quotes, positions, and per-subnet +market state. The economics: position input `P` (floor) + retained-proceeds +buffer `R` (decays as carry) + fixed liability `Q` (Alpha, short) / `D` (TAO, +long) that you repay to close. """ -from __future__ import annotations +import json +from typing import TYPE_CHECKING, Optional from rich.table import Table from bittensor_cli.src import COLORS -from bittensor_cli.src.bittensor.utils import console, print_error -from bittensor_cli.src.commands.deriv import sim - -C = COLORS.G # general palette +from bittensor_cli.src.bittensor.balances import Balance +from bittensor_cli.src.bittensor.utils import ( + console, + json_console, + print_error, + confirm_action, + unlock_key, +) + +if TYPE_CHECKING: + from bittensor_wallet import Wallet + from bittensor_cli.src.bittensor.subtensor_interface import SubtensorInterface + +PPB = 1_000_000_000 +C = COLORS.G _SHORT = "#C25E7C" # rose _LONG = "#53B5A0" # teal +# Per-side dispatch metadata: extrinsic suffix, runtime-api suffix, and which +# asset denominates the floor/buffer (`base`) vs the fixed liability (`liab`). +_SIDES = { + "short": {"base": "TAO", "liab": "Alpha", "liab_field": "alpha_liability"}, + "long": {"base": "Alpha", "liab": "TAO", "liab_field": "tao_liability"}, +} -def _fmt(x: float, dp: int = 4) -> str: - if x == float("inf"): - return "inf" - return f"{x:,.{dp}f}" +def _color(side: str) -> str: + return _SHORT if side == "short" else _LONG -def _days(x: float) -> str: - if x == float("inf"): - return "never" - if x >= 365: - return f"{x / 365:.2f} yr" - return f"{x:.1f} d" +def _amt(rao: Optional[int], unit: str) -> str: + if rao is None: + return "-" + return f"{rao / 1e9:,.4f} {unit}" -def _side_color(side: str) -> str: - return _SHORT if side == "short" else _LONG +def _pct(ppb: Optional[int]) -> str: + if ppb is None: + return "-" + return f"{ppb / 1e7:.3f}%" -# --------------------------------------------------------------------------- -def reset( - state_path: str, - tao: float, - alpha: float, - enable_longs: bool, -) -> None: - cfg = sim.Config(long_side_enabled=enable_longs) - state = sim.State.new(tao=tao, alpha=alpha, config=cfg) - sim.save_state(state, state_path) - console.print( - f"[{C.SUCCESS}]Sandbox reset.[/{C.SUCCESS}] " - f"Pool: [{C.BAL}]{_fmt(tao, 2)} TAO[/{C.BAL}] / " - f"[{_LONG}]{_fmt(alpha, 2)} Alpha[/{_LONG}] " - f"price=[{C.SYM}]{_fmt(state.pool.price, 6)}[/{C.SYM}] " - f"longs={'on' if enable_longs else 'off'}" - ) +async def _api(subtensor: "SubtensorInterface", method: str, params: list): + """Call a DerivativesRuntimeApi method, returning the decoded value or None.""" + res = await subtensor.query_runtime_api("DerivativesRuntimeApi", method, params) + return getattr(res, "value", res) -def quote(state_path: str, side: str, p_input: float) -> None: - state = sim.load_state(state_path) - try: - q = sim.quote(state, side, p_input) # type: ignore[arg-type] - except sim.SimError as e: - print_error(str(e)) - return - liab_unit = "Alpha" if side == "short" else "TAO" - floor_unit = "TAO" if side == "short" else "Alpha" - buf_unit = floor_unit # buffer is denominated in the floor asset +def _amount_to_rao(amount: float) -> int: + return int(round(amount * 1e9)) + +# --------------------------------------------------------------------------- +# Reads +# --------------------------------------------------------------------------- +def _render_open_quote(side: str, netuid: int, p_rao: int, q: dict) -> None: + meta = _SIDES[side] + base, liab = meta["base"], meta["liab"] t = Table( - title=f"[bold {_side_color(side)}]{side.upper()}[/] open quote " - f"(P = {_fmt(p_input, 4)} {floor_unit})", + title=f"[bold {_color(side)}]{side.upper()}[/] open quote " + f"netuid {netuid} P={_amt(p_rao, base)}", show_header=False, title_justify="left", ) t.add_column(style=C.SUBHEAD) t.add_column(style="white", justify="right") - - t.add_row("Position input P", f"{_fmt(q.p_input)} {floor_unit}") - t.add_row("Gross collateral C", f"{_fmt(q.gross_collateral)} {floor_unit}") - t.add_row("Retained proceeds N (→ R0)", f"{_fmt(q.retained_proceeds)} {buf_unit}") - t.add_row("Effective LTV", f"{q.effective_ltv * 100:.2f}%") - t.add_row(f"Fixed liability {'Q' if side == 'short' else 'D'}", f"{_fmt(q.liability)} {liab_unit}") - t.add_row("Linked escrow E", f"{_fmt(q.escrow)}") - t.add_row("Pool fraction φ", f"{q.pool_fraction * 100:.3f}%") - t.add_row("Price impact", f"{q.price_impact * 100:.3f}%") - t.add_row(" price before → after", f"{_fmt(q.price_before, 6)} → {_fmt(q.price_after, 6)}") - t.add_row("Daily carry (now)", f"{q.daily_carry * 100:.3f}%/day") - t.add_row("Time to dust min / max", f"{_days(q.min_days_to_dust)} / {_days(q.max_days_to_dust)}") - if side == "short": - t.add_row("Est. close cost K(Q)", f"{_fmt(q.est_close_cost)} TAO") - t.add_row("Break-even close price", f"{_fmt(q.break_even_price, 6)} TAO/Alpha") - t.add_row( - "Profitable if close cost <", - f"R = {_fmt(q.retained_proceeds)} | rational if < P+R = {_fmt(q.p_input + q.retained_proceeds)}", - ) - else: - t.add_row("Close: repay liability D", f"{_fmt(q.est_close_cost)} TAO") - t.add_row("Break-even Alpha price", f"{_fmt(q.break_even_price, 6)} TAO/Alpha") + t.add_row("Gross collateral C", _amt(q.get("gross_collateral"), base)) + t.add_row("Retained proceeds N (→ R0)", _amt(q.get("retained_proceeds"), base)) + t.add_row("Effective LTV", _pct(q.get("effective_ltv"))) + t.add_row( + f"Fixed liability {'Q' if side == 'short' else 'D'}", + _amt(q.get(meta["liab_field"]), liab), + ) + t.add_row("Linked escrow E", _amt(q.get("escrow"), base)) + t.add_row("Daily carry (now)", _pct(q.get("daily_decay"))) + if "est_close_cost" in q: + t.add_row("Est. close cost", _amt(q.get("est_close_cost"), "TAO")) console.print(t) -def open_(state_path: str, side: str, p_input: float) -> None: - state = sim.load_state(state_path) - try: - pos = sim.open_position(state, side, p_input) # type: ignore[arg-type] - except sim.SimError as e: - print_error(str(e)) +async def quote_open( + subtensor: "SubtensorInterface", + netuid: int, + side: str, + amount: float, + json_output: bool, +) -> None: + p_rao = _amount_to_rao(amount) + method = f"quote_open_{side}" + q = await _api(subtensor, method, [netuid, p_rao]) + if json_output: + json_console.print(json.dumps({"netuid": netuid, "side": side, "quote": q})) return - sim.save_state(state, state_path) - console.print( - f"[{C.SUCCESS}]Opened[/{C.SUCCESS}] [{_side_color(side)}]{side}[/] position " - f"[bold]#{pos.id}[/] P={_fmt(pos.p)} " - f"{'Q' if side == 'short' else 'D'}={_fmt(pos.liability)} R0={_fmt(pos.r_stored)}" - ) - _print_status(state) + if not q: + print_error( + f"No quote returned for a {side} on netuid {netuid}. " + f"Is the subnet dynamic, the side enabled, and P ≥ min input?" + ) + return + _render_open_quote(side, netuid, p_rao, q) -def top_up(state_path: str, position_id: int, amount: float) -> None: - state = sim.load_state(state_path) - try: - pos = sim.top_up(state, position_id, amount) - except sim.SimError as e: - print_error(str(e)) - return - sim.save_state(state, state_path) - console.print( - f"[{C.SUCCESS}]Topped up[/{C.SUCCESS}] #{pos.id} by {_fmt(amount)} → " - f"buffer R = {_fmt(pos.r_stored)}" +def _render_positions(side: str, positions: list) -> None: + meta = _SIDES[side] + base, liab = meta["base"], meta["liab"] + close_field = "est_close_cost" if side == "short" else "tao_to_close" + table = Table( + title=f"[bold {_color(side)}]{side.upper()}[/] positions", + title_justify="left", + show_header=True, + header_style=C.SUBHEAD_MAIN, ) + for col in ("netuid", "floor P", "buffer R", f"liability ({liab})", + "collateral P+R", "close cost", "carry/day", "→dust", "defaultable"): + table.add_column(col, justify="right") + for p in positions: + table.add_row( + str(p.get("netuid")), + _amt(p.get("floor"), base), + _amt(p.get("buffer"), base), + _amt(p.get(meta["liab_field"]), liab), + _amt(p.get("collateral_claim"), base), + _amt(p.get(close_field), "TAO"), + _pct(p.get("daily_decay")), + ("yes" if p.get("default_eligible") else "no"), + str(p.get("defaultable_at_block")), + ) + console.print(table) -def close(state_path: str, position_id: int, fraction: float) -> None: - state = sim.load_state(state_path) - try: - res = sim.close_position(state, position_id, fraction) - except sim.SimError as e: - print_error(str(e)) - return - sim.save_state(state, state_path) - pnl_color = C.SUCCESS if res.pnl >= 0 else _SHORT - verb = "Closed" if res.fully_closed else f"Partially closed ({fraction:.0%})" - payout_unit = "TAO" if res.side == "short" else "Alpha" - repay_unit = "Alpha" if res.side == "short" else "TAO" - console.print( - f"[{C.SUCCESS}]{verb}[/{C.SUCCESS}] #{res.position_id} " - f"repaid {_fmt(res.repaid)} {repay_unit} " - f"close-cost {_fmt(res.close_cost)} TAO " - f"payout {_fmt(res.payout)} {payout_unit} " - f"PnL [{pnl_color}]{_fmt(res.pnl)} TAO[/{pnl_color}]" - ) - _print_status(state) +def _render_close_quote(side: str, fraction: float, cq: dict) -> None: + if side == "short": + body = ( + f"repay {_amt(cq.get('repay_alpha'), 'Alpha')} · " + f"return {_amt(cq.get('returned_tao'), 'TAO')} · " + f"buyback ~{_amt(cq.get('est_buyback_cost'), 'TAO')} " + f"(held {_amt(cq.get('alpha_held'), 'Alpha')}, " + f"need {_amt(cq.get('alpha_needed'), 'Alpha')})" + ) + else: + body = ( + f"repay {_amt(cq.get('repay_tao'), 'TAO')} · " + f"return {_amt(cq.get('returned_alpha'), 'Alpha')} · " + f"escrow settled {_amt(cq.get('escrow_settled'), 'Alpha')}" + ) + console.print(f"[{C.SUBHEAD}]Close {fraction:.0%} quote:[/{C.SUBHEAD}] {body}") -def advance(state_path: str, duration: str) -> None: - state = sim.load_state(state_path) - try: - blocks = sim.parse_duration(duration) - rep = sim.advance(state, blocks) - except (sim.SimError, ValueError) as e: - print_error(str(e)) +async def show_positions( + subtensor: "SubtensorInterface", + coldkey_ss58: str, + side: str, + netuid: Optional[int], + json_output: bool, +) -> None: + if netuid is not None: + pos = await _api(subtensor, f"get_{side}_position", [coldkey_ss58, netuid]) + positions = [pos] if pos else [] + else: + positions = await _api(subtensor, f"get_{side}_positions", [coldkey_ss58]) or [] + if json_output: + json_console.print(json.dumps({"side": side, "positions": positions})) return - sim.save_state(state, state_path) - console.print( - f"[{C.SUBHEAD}]Advanced[/{C.SUBHEAD}] {rep.blocks} blocks ({rep.days:.2f} d). " - f"restored: [{_SHORT}]{_fmt(rep.restored_tao)} TAO[/{_SHORT}] (short) / " - f"[{_LONG}]{_fmt(rep.restored_alpha)} Alpha[/{_LONG}] (long). " - f"price {_fmt(rep.price_before, 6)} → {_fmt(rep.price_after, 6)}" - ) - if rep.defaults: - console.print(f"[{_SHORT}]Defaulted positions: {rep.defaults}[/{_SHORT}]") - _print_status(state) + if not positions: + console.print(f"[{C.SUBHEAD_EX_1}]No open {side} positions.[/{C.SUBHEAD_EX_1}]") + return + _render_positions(side, positions) -def status(state_path: str) -> None: - state = sim.load_state(state_path) - _print_status(state) +async def show_market( + subtensor: "SubtensorInterface", + netuid: int, + side: str, + json_output: bool, +) -> None: + st = await _api(subtensor, f"get_subnet_{side}_state", [netuid]) + if json_output: + json_console.print(json.dumps({"netuid": netuid, "side": side, "state": st})) + return + if not st: + print_error(f"No {side} market state for netuid {netuid} (subnet may not exist).") + return + meta = _SIDES[side] + ref_label = "T_ref" if side == "short" else "A_ref" + ref_unit = "TAO" if side == "short" else "Alpha" + oi_field = "open_interest_alpha" if side == "short" else "open_interest_tao" + oi_unit = "Alpha" if side == "short" else "TAO" + t = Table( + title=f"[bold {_color(side)}]{side.upper()}[/] market netuid {netuid}", + show_header=False, + title_justify="left", + ) + t.add_column(style=C.SUBHEAD) + t.add_column(style="white", justify="right") + t.add_row("Enabled", str(st.get(f"{side}s_enabled"))) + t.add_row("Base LTV λ", _pct(st.get("base_ltv"))) + t.add_row("Footprint cap κ", _pct(st.get("kappa"))) + t.add_row(f"Reference reserve {ref_label}", _amt(st.get("t_ref" if side == "short" else "a_ref"), ref_unit)) + t.add_row("Footprint used / cap", f"{_amt(st.get('footprint_used'), ref_unit)} / {_amt(st.get('footprint_cap'), ref_unit)}") + t.add_row("Footprint remaining", _amt(st.get("footprint_remaining"), ref_unit)) + t.add_row("Current daily carry", _pct(st.get("current_daily_decay"))) + t.add_row("Decay min / max", f"{_pct(st.get('decay_min'))} / {_pct(st.get('decay_max'))}") + t.add_row("Aggregate buffer R", _amt(st.get("buffer_total"), ref_unit)) + t.add_row("Aggregate escrow E", _amt(st.get("escrow_total"), ref_unit)) + t.add_row(f"Open interest ({oi_unit})", _amt(st.get(oi_field), oi_unit)) + t.add_row("Min input", _amt(st.get("min_input"), ref_unit)) + t.add_row("Dust threshold", _amt(st.get("dust_threshold"), ref_unit)) + t.add_row("Default grace (blocks)", str(st.get("default_grace"))) + console.print(t) # --------------------------------------------------------------------------- -def _print_status(state: sim.State) -> None: - pool = state.pool - cfg = state.config - - # Pool + per-side aggregate / capacity view (the "interaction" dashboard). - u_s = sim._utilization(state, "short") - u_l = sim._utilization(state, "long") - cap_s = cfg.kappa_short * min(pool.t, pool.t_ema) - cap_l = cfg.kappa_long * min(pool.a, pool.a_ema) - carry_s = cfg.d_min + (cfg.d_max - cfg.d_min) * u_s * u_s - carry_l = cfg.d_min + (cfg.d_max - cfg.d_min) * u_l * u_l - - pt = Table( - title=f"[bold {C.HEADER}]Sandbox pool[/] block {state.block} ({state.block / sim.BLOCKS_PER_DAY:.2f} d)", - title_justify="left", - show_header=True, - header_style=C.SUBHEAD_MAIN, +# Writes (extrinsics) +# --------------------------------------------------------------------------- +def _report(success: bool, message: str, ok_msg: str, json_output: bool) -> tuple: + if json_output: + json_console.print(json.dumps({"success": success, "message": message})) + elif success: + console.print(f"[{C.SUCCESS}]{ok_msg}[/{C.SUCCESS}]") + else: + print_error(f"Error: {message}") + return success, message + + +async def open_position( + subtensor: "SubtensorInterface", + wallet: "Wallet", + hotkey_ss58: str, + netuid: int, + side: str, + amount: float, + prompt: bool, + json_output: bool, + wait_for_inclusion: bool = True, + wait_for_finalization: bool = False, +) -> tuple: + base = _SIDES[side]["base"] + p_rao = _amount_to_rao(amount) + q = await _api(subtensor, f"quote_open_{side}", [netuid, p_rao]) + if not q: + return _report(False, f"No quote for {side} on netuid {netuid}", "", json_output) + if not json_output: + _render_open_quote(side, netuid, p_rao, q) + if prompt and not confirm_action( + f"Open a {side} on netuid {netuid} with P = {_amt(p_rao, base)}?" + ): + return False, "Cancelled" + if not (unlock := unlock_key(wallet)).success: + return _report(False, unlock.message, "", json_output) + + call = await subtensor.substrate.compose_call( + call_module="SubtensorModule", + call_function=f"open_{side}", + call_params={"hotkey": hotkey_ss58, "netuid": netuid, "position_input": p_rao}, ) - pt.add_column("") - pt.add_column("TAO", justify="right") - pt.add_column("Alpha", justify="right") - pt.add_row("Reserves (live)", _fmt(pool.t, 2), _fmt(pool.a, 2)) - pt.add_row("Reserves (EMA)", _fmt(pool.t_ema, 2), _fmt(pool.a_ema, 2)) - pt.add_row("Price (TAO/Alpha)", _fmt(pool.price, 6), "") - console.print(pt) - - st = Table(show_header=True, header_style=C.SUBHEAD_MAIN, title_justify="left") - st.add_column("Side") - st.add_column("Util u", justify="right") - st.add_column("Footprint S", justify="right") - st.add_column("Capacity Smax", justify="right") - st.add_column("Carry/day", justify="right") - st.add_column("Σ buffer R", justify="right") - st.add_column("Σ escrow E", justify="right") - st.add_column("Σ liability", justify="right") - st.add_row( - f"[{_SHORT}]short[/{_SHORT}]", - f"{u_s * 100:.1f}%", - _fmt(state.b_sigma_short, 3), - _fmt(cap_s, 3), - f"{carry_s * 100:.3f}%", - _fmt(state.r_sigma_short, 3), - _fmt(state.e_sigma_short, 3), - f"{_fmt(state.q_sigma_short, 2)} α", + success, message, _ = await subtensor.sign_and_send_extrinsic( + call=call, wallet=wallet, + wait_for_inclusion=wait_for_inclusion, + wait_for_finalization=wait_for_finalization, ) - st.add_row( - f"[{_LONG}]long[/{_LONG}]", - f"{u_l * 100:.1f}%", - _fmt(state.b_sigma_long, 3), - _fmt(cap_l, 3), - f"{carry_l * 100:.3f}%", - _fmt(state.r_sigma_long, 3), - _fmt(state.e_sigma_long, 3), - f"{_fmt(state.d_sigma_long, 2)} τ", + return _report(success, message, f"Opened {side} position on netuid {netuid}.", json_output) + + +async def top_up( + subtensor: "SubtensorInterface", + wallet: "Wallet", + netuid: int, + side: str, + amount: float, + json_output: bool, + wait_for_inclusion: bool = True, + wait_for_finalization: bool = False, +) -> tuple: + if not (unlock := unlock_key(wallet)).success: + return _report(False, unlock.message, "", json_output) + call = await subtensor.substrate.compose_call( + call_module="SubtensorModule", + call_function=f"top_up_{side}", + call_params={"netuid": netuid, "amount": _amount_to_rao(amount)}, ) - console.print(st) - - open_positions = state.open_positions() - if not open_positions: - console.print(f"[{C.SUBHEAD_EX_1}]No open positions.[/{C.SUBHEAD_EX_1}]") - return - - pos_t = Table( - title="[bold]Open positions[/]", - title_justify="left", - show_header=True, - header_style=C.SUBHEAD_MAIN, + success, message, _ = await subtensor.sign_and_send_extrinsic( + call=call, wallet=wallet, + wait_for_inclusion=wait_for_inclusion, + wait_for_finalization=wait_for_finalization, ) - pos_t.add_column("#", justify="right") - pos_t.add_column("Side") - pos_t.add_column("Floor P", justify="right") - pos_t.add_column("Buffer R", justify="right") - pos_t.add_column("Liability", justify="right") - pos_t.add_column("Close cost", justify="right") - pos_t.add_column("Status / PnL if closed", justify="right") - for p in open_positions: - r, e, b = p.materialized(state.omega(p.side)) - if p.side == "short": - if p.liability < pool.a: - cost = pool.t * p.liability / (pool.a - p.liability) - else: - cost = float("inf") - pnl = (p.p + r) - cost - p.p # = R - close_cost - liab_str = f"{_fmt(p.liability, 2)} α" - else: - cost = p.liability # repay D tao - pnl = (p.p + r) * pool.price - cost - p.p * pool.price - liab_str = f"{_fmt(p.liability, 2)} τ" - pnl_color = C.SUCCESS if pnl >= 0 else _SHORT - pos_t.add_row( - str(p.id), - f"[{_side_color(p.side)}]{p.side}[/]", - _fmt(p.p, 3), - _fmt(r, 3), - liab_str, - _fmt(cost, 3), - f"[{pnl_color}]{_fmt(pnl, 3)} TAO[/{pnl_color}]", - ) - console.print(pos_t) + return _report(success, message, f"Topped up {side} buffer on netuid {netuid}.", json_output) + + +async def close_position( + subtensor: "SubtensorInterface", + wallet: "Wallet", + netuid: int, + side: str, + fraction: float, + prompt: bool, + json_output: bool, + wait_for_inclusion: bool = True, + wait_for_finalization: bool = False, +) -> tuple: + if not (0 < fraction <= 1): + return _report(False, "fraction must be in (0, 1]", "", json_output) + fraction_ppb = int(round(fraction * PPB)) + ck = wallet.coldkeypub.ss58_address + cq = await _api(subtensor, f"quote_close_{side}", [ck, netuid, fraction_ppb]) + if cq and not json_output: + _render_close_quote(side, fraction, cq) + if prompt and not confirm_action( + f"Close {fraction:.0%} of your {side} on netuid {netuid}?" + ): + return False, "Cancelled" + if not (unlock := unlock_key(wallet)).success: + return _report(False, unlock.message, "", json_output) + call = await subtensor.substrate.compose_call( + call_module="SubtensorModule", + call_function=f"close_{side}", + call_params={"netuid": netuid, "fraction_ppb": fraction_ppb}, + ) + success, message, _ = await subtensor.sign_and_send_extrinsic( + call=call, wallet=wallet, + wait_for_inclusion=wait_for_inclusion, + wait_for_finalization=wait_for_finalization, + ) + return _report(success, message, f"Closed {fraction:.0%} of {side} on netuid {netuid}.", json_output) + + +async def default_position( + subtensor: "SubtensorInterface", + wallet: "Wallet", + coldkey_ss58: str, + netuid: int, + side: str, + json_output: bool, + wait_for_inclusion: bool = True, + wait_for_finalization: bool = False, +) -> tuple: + if not (unlock := unlock_key(wallet)).success: + return _report(False, unlock.message, "", json_output) + call = await subtensor.substrate.compose_call( + call_module="SubtensorModule", + call_function=f"default_{side}", + call_params={"coldkey": coldkey_ss58, "netuid": netuid}, + ) + success, message, _ = await subtensor.sign_and_send_extrinsic( + call=call, wallet=wallet, + wait_for_inclusion=wait_for_inclusion, + wait_for_finalization=wait_for_finalization, + ) + return _report(success, message, f"Defaulted {side} position on netuid {netuid}.", json_output) diff --git a/bittensor_cli/src/commands/deriv/sim.py b/bittensor_cli/src/commands/deriv/sim.py deleted file mode 100644 index 32499da2f..000000000 --- a/bittensor_cli/src/commands/deriv/sim.py +++ /dev/null @@ -1,550 +0,0 @@ -""" -Local simulator for the Fixed-Liability Covered Continuous-Unwind model (spec v3.6.1). - -This is a deterministic, in-process sandbox: no chain, no wallet, no signing. It -implements the closed-form math from the spec (see appendix A) against a single -fake CPMM pool so a user can open / top-up / close short and long positions, -advance simulated time, and watch carry, break-even, close cost and pool price -push on each other. - -Simplifications relative to production (intentional, see spec section 14.6): - * No pool fees. The closed-form "no-fee CPMM core" is used everywhere. - * Single pool (one subnet). Positions share that pool, which is the whole point: - it lets you see how shorts and longs interact through price and utilization. - * No real defaults scheduling / MEV / drand. Buffer-reaches-dust default is - processed deterministically on advance. - * Long side is enabled by default here so both sides can be explored. The spec - launches shorts-first with longs gated; that flag lives in `Config`. -""" - -from __future__ import annotations - -import json -import math -import os -from dataclasses import dataclass, field, asdict -from pathlib import Path -from typing import Literal, Optional - -Side = Literal["short", "long"] - -BLOCKS_PER_DAY = 7200 # 12s blocks -DEFAULT_STATE_PATH = os.path.expanduser("~/.bittensor/deriv_sim.json") - - -class SimError(Exception): - """Raised when an operation violates a spec rule (rejected open, bad fraction...).""" - - -@dataclass -class Config: - """Policy parameters. Defaults follow the spec's conservative starting set (section 14.1).""" - - lambda_short: float = 0.50 - lambda_long: float = 0.50 - kappa_short: float = 0.33 # active-footprint cap fraction (~ phi_cap 1/3) - kappa_long: float = 0.25 - d_min: float = 0.001 # 0.1%/day at zero utilization - d_max: float = 0.015 # 1.5%/day at full utilization - r_dust: float = 1.0 # buffer dust threshold (in the side's buffer asset) - ema_halflife_blocks: int = BLOCKS_PER_DAY # lagged EMA reference - long_side_enabled: bool = True # spec default is False (shorts-first); on here to explore both - - -@dataclass -class Position: - id: int - side: Side - # Non-decaying floor supplied by the trader (TAO for short, Alpha for long). - p: float - # Fixed liability: Alpha (Q) for a short, TAO (D) for a long. Does not decay. - liability: float - # Stored (last-materialized) decaying components. - r_stored: float # retained-proceeds buffer - e_stored: float # linked escrow - b_stored: float # utilization footprint - omega_entry: float - status: str = "open" # open | closed | defaulted - - def materialized(self, omega_side: float) -> tuple[float, float, float]: - """Return current (R, E, B) given the side accumulator, without mutating.""" - f = math.exp(-(omega_side - self.omega_entry)) - return self.r_stored * f, self.e_stored * f, self.b_stored * f - - -@dataclass -class Pool: - # Live CPMM reserves. - a: float # Alpha reserve - t: float # TAO reserve - # Lagged EMA references for risk sizing. - a_ema: float - t_ema: float - - @property - def price(self) -> float: - """Alpha price in TAO = TAO reserve / Alpha reserve.""" - return self.t / self.a - - -@dataclass -class State: - pool: Pool - config: Config - # Side accumulators (monotonic) and aggregate current components. - omega_short: float = 0.0 - omega_long: float = 0.0 - r_sigma_short: float = 0.0 - e_sigma_short: float = 0.0 - b_sigma_short: float = 0.0 - r_sigma_long: float = 0.0 - e_sigma_long: float = 0.0 - b_sigma_long: float = 0.0 - q_sigma_short: float = 0.0 # aggregate fixed Alpha liability - d_sigma_long: float = 0.0 # aggregate fixed TAO liability - block: int = 0 - next_id: int = 1 - positions: list[Position] = field(default_factory=list) - - # ---- persistence ------------------------------------------------------- - def to_json(self) -> str: - d = asdict(self) - return json.dumps(d, indent=2) - - @classmethod - def from_json(cls, raw: str) -> "State": - d = json.loads(raw) - pool = Pool(**d["pool"]) - config = Config(**d["config"]) - positions = [Position(**p) for p in d["positions"]] - d.update(pool=pool, config=config, positions=positions) - return cls(**d) - - @classmethod - def new( - cls, - tao: float = 1000.0, - alpha: float = 100_000.0, - config: Optional[Config] = None, - ) -> "State": - pool = Pool(a=alpha, t=tao, a_ema=alpha, t_ema=tao) - return cls(pool=pool, config=config or Config()) - - # ---- side helpers ------------------------------------------------------ - def omega(self, side: Side) -> float: - return self.omega_short if side == "short" else self.omega_long - - def open_positions(self, side: Optional[Side] = None) -> list[Position]: - return [ - p - for p in self.positions - if p.status == "open" and (side is None or p.side == side) - ] - - -def load_state(path: str = DEFAULT_STATE_PATH) -> State: - p = Path(path) - if not p.exists(): - state = State.new() - save_state(state, path) - return state - return State.from_json(p.read_text()) - - -def save_state(state: State, path: str = DEFAULT_STATE_PATH) -> None: - p = Path(path) - p.parent.mkdir(parents=True, exist_ok=True) - p.write_text(state.to_json()) - - -# --------------------------------------------------------------------------- -# Pure pricing helpers (spec appendix A) -# --------------------------------------------------------------------------- -def _solve_collateral(p_input: float, lam: float, s: float, ref: float) -> float: - """Solve gross collateral C from user input P (spec 4.2). Positive root.""" - a = lam * lam / ref - b = 1.0 - lam + 2.0 * lam * s / ref - return (-b + math.sqrt(b * b + 4.0 * a * p_input)) / (2.0 * a) - - -def _phi_from_n(n: float, live_ref: float) -> float: - """Pool fraction that generates retained proceeds N (spec 4.3). Smaller root.""" - inner = 1.0 - 4.0 * n / live_ref - if inner < 0: - raise SimError( - f"remove-and-sell-back domain failed: 4N ({4 * n:.4f}) > live reserve ({live_ref:.4f})" - ) - return (1.0 - math.sqrt(inner)) / 2.0 - - -@dataclass -class Quote: - """Pre-trade preview (spec 1.2: what the trader should see before opening).""" - - side: Side - p_input: float - gross_collateral: float - retained_proceeds: float # N -> R0 - effective_ltv: float - pool_fraction: float # phi - price_impact: float # delta - liability: float # Q (alpha) for short, D (tao) for long - escrow: float - footprint: float # B - daily_carry: float # at current utilization - min_days_to_dust: float # decay at d_max - max_days_to_dust: float # decay at d_min - est_close_cost: float # short: TAO to buy Q alpha; long: D tao - break_even_price: float # alpha price at which close breaks even - price_before: float - price_after: float - - -def _days_to_dust(r0: float, daily_decay: float, r_dust: float) -> float: - if r0 <= r_dust: - return 0.0 - if daily_decay <= 0: - return math.inf - return math.log(r_dust / r0) / math.log(1.0 - daily_decay) - - -def quote(state: State, side: Side, p_input: float) -> Quote: - """Compute an open preview without mutating state.""" - cfg = state.config - pool = state.pool - if side == "long" and not cfg.long_side_enabled: - raise SimError("long side is disabled (spec launch posture is shorts-first)") - if p_input <= 0: - raise SimError("position input P must be positive") - - if side == "short": - lam, kappa = cfg.lambda_short, cfg.kappa_short - ref = min(pool.t, pool.t_ema) # T_ref - s = state.b_sigma_short - live_ref = pool.t - else: - lam, kappa = cfg.lambda_long, cfg.kappa_long - ref = min(pool.a, pool.a_ema) # A_ref - s = state.b_sigma_long - live_ref = pool.a - - c = _solve_collateral(p_input, lam, s, ref) - n = c - p_input - if n <= 0: - raise SimError( - f"open rejected: effective LTV <= 0 at this utilization (N={n:.4f})" - ) - b = lam * c - if s + b > kappa * ref: - raise SimError( - f"open rejected: footprint cap exceeded (S+B={s + b:.4f} > kappa*ref={kappa * ref:.4f})" - ) - phi = _phi_from_n(n, live_ref) - - if side == "short": - liability = phi * pool.a # Q alpha - escrow = phi * pool.t # E tao - price_impact = 1.0 - (1.0 - phi) ** 2 # delta_S (downward) - price_after = pool.t * (1.0 - phi) ** 2 / pool.a - # close cost: TAO to buy Q alpha out of the pool (trader holds none) - est_close = ( - pool.t * liability / (pool.a - liability) - if liability < pool.a - else math.inf - ) - break_even_price = est_close / liability if liability > 0 else 0.0 - else: - liability = phi * pool.t # D tao - escrow = phi * pool.a # E alpha - price_impact = 1.0 / (1.0 - phi) ** 2 - 1.0 # delta_L (upward) - price_after = pool.t / (pool.a * (1.0 - phi) ** 2) - # long break-even: alpha price at which returned (P+R) alpha covers liability D - break_even_price = liability / (p_input + n) if (p_input + n) > 0 else 0.0 - est_close = liability # repay D tao - - u = _utilization(state, side) - daily = cfg.d_min + (cfg.d_max - cfg.d_min) * u * u - return Quote( - side=side, - p_input=p_input, - gross_collateral=c, - retained_proceeds=n, - effective_ltv=n / c, - pool_fraction=phi, - price_impact=price_impact, - liability=liability, - escrow=escrow, - footprint=b, - daily_carry=daily, - min_days_to_dust=_days_to_dust(n, cfg.d_max, cfg.r_dust), - max_days_to_dust=_days_to_dust(n, cfg.d_min, cfg.r_dust), - est_close_cost=est_close, - break_even_price=break_even_price, - price_before=pool.price, - price_after=price_after, - ) - - -def _utilization(state: State, side: Side) -> float: - cfg = state.config - if side == "short": - denom = cfg.kappa_short * state.pool.t_ema - s = state.b_sigma_short - else: - denom = cfg.kappa_long * state.pool.a_ema - s = state.b_sigma_long - if denom <= 0: - return 0.0 - return min(1.0, s / denom) - - -# --------------------------------------------------------------------------- -# Mutating operations -# --------------------------------------------------------------------------- -def open_position(state: State, side: Side, p_input: float) -> Position: - q = quote(state, side, p_input) - pool = state.pool - if side == "short": - # remove-and-sell-back: A unchanged, T -> (1-phi)^2 T - pool.t = pool.t * (1.0 - q.pool_fraction) ** 2 - state.r_sigma_short += q.retained_proceeds - state.e_sigma_short += q.escrow - state.b_sigma_short += q.footprint - state.q_sigma_short += q.liability - omega_entry = state.omega_short - else: - # mirror: T unchanged, A -> (1-phi)^2 A - pool.a = pool.a * (1.0 - q.pool_fraction) ** 2 - state.r_sigma_long += q.retained_proceeds - state.e_sigma_long += q.escrow - state.b_sigma_long += q.footprint - state.d_sigma_long += q.liability - omega_entry = state.omega_long - - pos = Position( - id=state.next_id, - side=side, - p=p_input, - liability=q.liability, - r_stored=q.retained_proceeds, - e_stored=q.escrow, - b_stored=q.footprint, - omega_entry=omega_entry, - ) - state.next_id += 1 - state.positions.append(pos) - return pos - - -def _get_open(state: State, position_id: int) -> Position: - for p in state.positions: - if p.id == position_id: - if p.status != "open": - raise SimError(f"position {position_id} is {p.status}, not open") - return p - raise SimError(f"no open position with id {position_id}") - - -def _materialize(state: State, pos: Position) -> None: - """Fold elapsed decay into a single position's stored components.""" - omega = state.omega(pos.side) - f = math.exp(-(omega - pos.omega_entry)) - pos.r_stored *= f - pos.e_stored *= f - pos.b_stored *= f - pos.omega_entry = omega - - -def top_up(state: State, position_id: int, amount: float) -> Position: - if amount <= 0: - raise SimError("top-up amount must be positive") - pos = _get_open(state, position_id) - if pos.side == "long" and not state.config.long_side_enabled: - raise SimError("long side is disabled") - _materialize(state, pos) - pos.r_stored += amount - if pos.side == "short": - state.r_sigma_short += amount - else: - state.r_sigma_long += amount - return pos - - -@dataclass -class CloseResult: - position_id: int - fraction: float - side: Side - repaid: float # alpha (short) or tao (long) returned to cover liability - payout: float # P+R slice returned to the trader (TAO short / Alpha long) - close_cost: float # market cost to source the repaid liability asset - pnl: float # payout - close_cost - capital_consumed_for_this_slice - fully_closed: bool - - -def close_position( - state: State, position_id: int, fraction: float = 1.0 -) -> CloseResult: - if fraction <= 0 or fraction > 1: - raise SimError("fraction must be in (0, 1]") - pos = _get_open(state, position_id) - _materialize(state, pos) - pool = state.pool - rho = fraction - - repaid = rho * pos.liability - payout = rho * (pos.p + pos.r_stored) - - if pos.side == "short": - # close cost: buy `repaid` alpha out of the pool - if repaid >= pool.a: - raise SimError("close cost unbounded: liability exceeds pool Alpha") - close_cost = pool.t * repaid / (pool.a - repaid) - # settlement zap injects (alpha=repaid, tao=rho*E) into the pool - pool.a += repaid - pool.t += rho * pos.e_stored - state.q_sigma_short -= repaid - state.r_sigma_short -= rho * pos.r_stored - state.e_sigma_short -= rho * pos.e_stored - state.b_sigma_short -= rho * pos.b_stored - pnl = payout - close_cost - rho * pos.p - else: - # long repays `repaid` TAO; settlement zap injects (alpha=rho*E, tao=repaid) - close_cost = repaid # D tao - pool.a += rho * pos.e_stored - pool.t += repaid - state.d_sigma_long -= repaid - state.r_sigma_long -= rho * pos.r_stored - state.e_sigma_long -= rho * pos.e_stored - state.b_sigma_long -= rho * pos.b_stored - # payout is Alpha; value vs capital consumed in TAO terms at current price - pnl = payout * pool.price - close_cost - rho * pos.p * pool.price - - pos.p *= 1.0 - rho - pos.liability *= 1.0 - rho - pos.r_stored *= 1.0 - rho - pos.e_stored *= 1.0 - rho - pos.b_stored *= 1.0 - rho - fully = rho >= 1.0 or pos.p <= 1e-12 - if fully: - pos.status = "closed" - return CloseResult( - position_id=position_id, - fraction=rho, - side=pos.side, - repaid=repaid, - payout=payout, - close_cost=close_cost, - pnl=pnl, - fully_closed=fully, - ) - - -@dataclass -class AdvanceReport: - blocks: int - days: float - restored_tao: float # short-side restoration injected into pool TAO - restored_alpha: float # long-side restoration injected into pool Alpha - defaults: list[int] = field(default_factory=list) - price_before: float = 0.0 - price_after: float = 0.0 - - -def _decay_side(state: State, side: Side, blocks: int) -> float: - """Apply aggregate block-step unwind for one side, return restoration amount.""" - cfg = state.config - u = _utilization(state, side) - d_day = cfg.d_min + (cfg.d_max - cfg.d_min) * u * u - g = (1.0 - d_day) ** (blocks / BLOCKS_PER_DAY) - if side == "short": - r, e, b = ( - state.r_sigma_short, - state.e_sigma_short, - state.b_sigma_short, - ) - restored = (r + e) * (1.0 - g) - state.r_sigma_short = r * g - state.e_sigma_short = e * g - state.b_sigma_short = b * g - state.omega_short += -math.log(g) if g > 0 else 0.0 - state.pool.t += restored # restoration zap nets to one-sided TAO injection - else: - r, e, b = ( - state.r_sigma_long, - state.e_sigma_long, - state.b_sigma_long, - ) - restored = (r + e) * (1.0 - g) - state.r_sigma_long = r * g - state.e_sigma_long = e * g - state.b_sigma_long = b * g - state.omega_long += -math.log(g) if g > 0 else 0.0 - state.pool.a += restored # mirror: one-sided Alpha injection - return restored - - -def _update_ema(state: State, blocks: int) -> None: - cfg = state.config - pool = state.pool - alpha = 1.0 - math.exp(-blocks / max(1, cfg.ema_halflife_blocks)) - pool.t_ema += (pool.t - pool.t_ema) * alpha - pool.a_ema += (pool.a - pool.a_ema) * alpha - - -def _process_defaults(state: State) -> list[int]: - defaulted = [] - for pos in state.open_positions(): - _materialize(state, pos) - if pos.r_stored <= state.config.r_dust: - if pos.side == "short": - state.pool.t += pos.r_stored + pos.e_stored - state.r_sigma_short -= pos.r_stored - state.e_sigma_short -= pos.e_stored - state.b_sigma_short -= pos.b_stored - state.q_sigma_short -= pos.liability - else: - state.pool.a += pos.r_stored + pos.e_stored - state.r_sigma_long -= pos.r_stored - state.e_sigma_long -= pos.e_stored - state.b_sigma_long -= pos.b_stored - state.d_sigma_long -= pos.liability - # floor P is recycled out of the pool (lost to the trader) - pos.r_stored = pos.e_stored = pos.b_stored = 0.0 - pos.liability = 0.0 - pos.p = 0.0 - pos.status = "defaulted" - defaulted.append(pos.id) - return defaulted - - -def advance(state: State, blocks: int) -> AdvanceReport: - if blocks <= 0: - raise SimError("blocks must be positive") - price_before = state.pool.price - restored_tao = _decay_side(state, "short", blocks) - restored_alpha = _decay_side(state, "long", blocks) - _update_ema(state, blocks) - defaults = _process_defaults(state) - state.block += blocks - return AdvanceReport( - blocks=blocks, - days=blocks / BLOCKS_PER_DAY, - restored_tao=restored_tao, - restored_alpha=restored_alpha, - defaults=defaults, - price_before=price_before, - price_after=state.pool.price, - ) - - -def parse_duration(text: str) -> int: - """Parse '7200', '30d', '12h', '100b' into a block count.""" - t = text.strip().lower() - if t.endswith("d"): - return int(round(float(t[:-1]) * BLOCKS_PER_DAY)) - if t.endswith("h"): - return int(round(float(t[:-1]) * BLOCKS_PER_DAY / 24)) - if t.endswith("b"): - return int(float(t[:-1])) - return int(float(t)) From 33a7ac2524304bad3936e1ac9704dca6c2071e8f Mon Sep 17 00:00:00 2001 From: unconst Date: Thu, 18 Jun 2026 12:33:05 -0600 Subject: [PATCH 3/6] Simplify deriv commands for expressivity with simplicity - Drop `default` from the CLI surface: it's a permissionless keeper op on other accounts' dusted positions, not something a user runs to manage their own positions. The on-chain extrinsic remains available. - Trim `positions` from 9 (truncating) columns to 7 plain-language ones: you paid P / buffer R / you owe / you'd get P+R / close cost / health. - Trim `market` to the 5 fields you need to decide to open: open allowed, base LTV, capacity left, min input, current carry. - Trim `quote` to plain terms (you pay / buffer / you owe / LTV / carry / close cost); drop internal gross-collateral and escrow rows. Core surface: quote, positions, market, open, topup, close. Co-authored-by: Cursor --- bittensor_cli/cli.py | 34 +-------- bittensor_cli/src/commands/deriv/deriv.py | 84 +++++++---------------- 2 files changed, 26 insertions(+), 92 deletions(-) diff --git a/bittensor_cli/cli.py b/bittensor_cli/cli.py index ab3f2aae3..c9ca102af 100755 --- a/bittensor_cli/cli.py +++ b/bittensor_cli/cli.py @@ -1438,7 +1438,6 @@ def __init__(self): self.deriv_app.command("open", rich_help_panel=_DERIV_LIFE)(self.deriv_open) self.deriv_app.command("topup", rich_help_panel=_DERIV_LIFE)(self.deriv_topup) self.deriv_app.command("close", rich_help_panel=_DERIV_LIFE)(self.deriv_close) - self.deriv_app.command("default", rich_help_panel=_DERIV_LIFE)(self.deriv_default) # utils app self.utils_app.command("convert")(self.convert) @@ -9156,7 +9155,7 @@ def deriv_market( verbose: bool = Options.verbose, json_output: bool = Options.json_output, ): - """Show the per-subnet derivative market state (caps, utilization, carry, open interest).""" + """Show a subnet's derivative market: whether opens are allowed, LTV, capacity left, min input, carry.""" self.verbosity_handler(quiet, verbose, json_output, prompt=False) if not (side := self._deriv_side(side)): return @@ -9263,37 +9262,6 @@ def deriv_close( ) ) - def deriv_default( - self, - network: Optional[list[str]] = Options.network, - wallet_name: str = Options.wallet_name, - wallet_path: str = Options.wallet_path, - wallet_hotkey: str = Options.wallet_hotkey, - netuid: int = Options.netuid, - side: str = typer.Option("short", "--side", help="Position side: short or long."), - coldkey_ss58: str = typer.Option( - ..., "--coldkey-ss58", "--ss58", help="Coldkey of the position to default." - ), - quiet: bool = Options.quiet, - verbose: bool = Options.verbose, - json_output: bool = Options.json_output, - ): - """Permissionlessly default a covered position whose buffer has reached dust.""" - self.verbosity_handler(quiet, verbose, json_output, prompt=False) - if not (side := self._deriv_side(side)): - return - wallet = self.wallet_ask( - wallet_name, wallet_path, wallet_hotkey, - ask_for=[WO.NAME, WO.PATH], validate=WV.WALLET, - ) - return self._run_command( - deriv.default_position( - subtensor=self.initialize_chain(network), - wallet=wallet, coldkey_ss58=coldkey_ss58, netuid=netuid, side=side, - json_output=json_output, - ) - ) - # Liquidity def liquidity_add( diff --git a/bittensor_cli/src/commands/deriv/deriv.py b/bittensor_cli/src/commands/deriv/deriv.py index 4a5b814cb..00b556078 100644 --- a/bittensor_cli/src/commands/deriv/deriv.py +++ b/bittensor_cli/src/commands/deriv/deriv.py @@ -2,11 +2,12 @@ On-chain covered long/short derivatives (`btcli deriv`). Drives the real `pallet-subtensor` covered continuous-unwind extrinsics -(`open_short` / `close_short` / `top_up_short` / `default_short` and the long -mirrors) and reads the `DerivativesRuntimeApi` quotes, positions, and per-subnet -market state. The economics: position input `P` (floor) + retained-proceeds -buffer `R` (decays as carry) + fixed liability `Q` (Alpha, short) / `D` (TAO, -long) that you repay to close. +(`open_short` / `top_up_short` / `close_short` and the long mirrors) and reads +the `DerivativesRuntimeApi` quotes, positions, and per-subnet market state. + +The economics in three terms: position input `P` (your capital / floor) + +retained-proceeds buffer `R` (decays over time as carry) + a fixed liability +`Q` (Alpha, short) / `D` (TAO, long) that you repay to close. """ import json @@ -81,16 +82,15 @@ def _render_open_quote(side: str, netuid: int, p_rao: int, q: dict) -> None: ) t.add_column(style=C.SUBHEAD) t.add_column(style="white", justify="right") - t.add_row("Gross collateral C", _amt(q.get("gross_collateral"), base)) - t.add_row("Retained proceeds N (→ R0)", _amt(q.get("retained_proceeds"), base)) - t.add_row("Effective LTV", _pct(q.get("effective_ltv"))) + t.add_row("You pay P", _amt(p_rao, base)) + t.add_row("Retained buffer R", _amt(q.get("retained_proceeds"), base)) t.add_row( - f"Fixed liability {'Q' if side == 'short' else 'D'}", + f"You owe to close {'Q' if side == 'short' else 'D'}", _amt(q.get(meta["liab_field"]), liab), ) - t.add_row("Linked escrow E", _amt(q.get("escrow"), base)) - t.add_row("Daily carry (now)", _pct(q.get("daily_decay"))) - if "est_close_cost" in q: + t.add_row("Effective LTV", _pct(q.get("effective_ltv"))) + t.add_row("Daily carry", _pct(q.get("daily_decay"))) + if q.get("est_close_cost") is not None: t.add_row("Est. close cost", _amt(q.get("est_close_cost"), "TAO")) console.print(t) @@ -127,10 +127,12 @@ def _render_positions(side: str, positions: list) -> None: show_header=True, header_style=C.SUBHEAD_MAIN, ) - for col in ("netuid", "floor P", "buffer R", f"liability ({liab})", - "collateral P+R", "close cost", "carry/day", "→dust", "defaultable"): + for col in ("netuid", "you paid P", "buffer R", f"you owe ({liab})", + "you'd get P+R", "close cost", "health"): table.add_column(col, justify="right") for p in positions: + eligible = p.get("default_eligible") + health = f"[{_SHORT}]DUST[/{_SHORT}]" if eligible else f"[{C.SUCCESS}]ok[/{C.SUCCESS}]" table.add_row( str(p.get("netuid")), _amt(p.get("floor"), base), @@ -138,9 +140,7 @@ def _render_positions(side: str, positions: list) -> None: _amt(p.get(meta["liab_field"]), liab), _amt(p.get("collateral_claim"), base), _amt(p.get(close_field), "TAO"), - _pct(p.get("daily_decay")), - ("yes" if p.get("default_eligible") else "no"), - str(p.get("defaultable_at_block")), + health, ) console.print(table) @@ -197,11 +197,8 @@ async def show_market( if not st: print_error(f"No {side} market state for netuid {netuid} (subnet may not exist).") return - meta = _SIDES[side] - ref_label = "T_ref" if side == "short" else "A_ref" ref_unit = "TAO" if side == "short" else "Alpha" - oi_field = "open_interest_alpha" if side == "short" else "open_interest_tao" - oi_unit = "Alpha" if side == "short" else "TAO" + enabled = st.get(f"{side}s_enabled") t = Table( title=f"[bold {_color(side)}]{side.upper()}[/] market netuid {netuid}", show_header=False, @@ -209,20 +206,14 @@ async def show_market( ) t.add_column(style=C.SUBHEAD) t.add_column(style="white", justify="right") - t.add_row("Enabled", str(st.get(f"{side}s_enabled"))) - t.add_row("Base LTV λ", _pct(st.get("base_ltv"))) - t.add_row("Footprint cap κ", _pct(st.get("kappa"))) - t.add_row(f"Reference reserve {ref_label}", _amt(st.get("t_ref" if side == "short" else "a_ref"), ref_unit)) - t.add_row("Footprint used / cap", f"{_amt(st.get('footprint_used'), ref_unit)} / {_amt(st.get('footprint_cap'), ref_unit)}") - t.add_row("Footprint remaining", _amt(st.get("footprint_remaining"), ref_unit)) - t.add_row("Current daily carry", _pct(st.get("current_daily_decay"))) - t.add_row("Decay min / max", f"{_pct(st.get('decay_min'))} / {_pct(st.get('decay_max'))}") - t.add_row("Aggregate buffer R", _amt(st.get("buffer_total"), ref_unit)) - t.add_row("Aggregate escrow E", _amt(st.get("escrow_total"), ref_unit)) - t.add_row(f"Open interest ({oi_unit})", _amt(st.get(oi_field), oi_unit)) + t.add_row( + "Open allowed", + f"[{C.SUCCESS}]yes[/{C.SUCCESS}]" if enabled else f"[{_SHORT}]no (disabled)[/{_SHORT}]", + ) + t.add_row("Base LTV", _pct(st.get("base_ltv"))) + t.add_row("Capacity left (max open)", _amt(st.get("footprint_remaining"), ref_unit)) t.add_row("Min input", _amt(st.get("min_input"), ref_unit)) - t.add_row("Dust threshold", _amt(st.get("dust_threshold"), ref_unit)) - t.add_row("Default grace (blocks)", str(st.get("default_grace"))) + t.add_row("Daily carry now", _pct(st.get("current_daily_decay"))) console.print(t) @@ -338,28 +329,3 @@ async def close_position( wait_for_finalization=wait_for_finalization, ) return _report(success, message, f"Closed {fraction:.0%} of {side} on netuid {netuid}.", json_output) - - -async def default_position( - subtensor: "SubtensorInterface", - wallet: "Wallet", - coldkey_ss58: str, - netuid: int, - side: str, - json_output: bool, - wait_for_inclusion: bool = True, - wait_for_finalization: bool = False, -) -> tuple: - if not (unlock := unlock_key(wallet)).success: - return _report(False, unlock.message, "", json_output) - call = await subtensor.substrate.compose_call( - call_module="SubtensorModule", - call_function=f"default_{side}", - call_params={"coldkey": coldkey_ss58, "netuid": netuid}, - ) - success, message, _ = await subtensor.sign_and_send_extrinsic( - call=call, wallet=wallet, - wait_for_inclusion=wait_for_inclusion, - wait_for_finalization=wait_for_finalization, - ) - return _report(success, message, f"Defaulted {side} position on netuid {netuid}.", json_output) From ccdbb384f99d015877f855af9642884bb98b7eb0 Mon Sep 17 00:00:00 2001 From: unconst Date: Thu, 18 Jun 2026 13:07:44 -0600 Subject: [PATCH 4/6] deriv open: hold the liability on any hotkey (name or SS58) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `open` now takes the hotkey the same way as other btcli commands (`--hotkey` / `--hotkey-ss58` / `--wallet.hotkey`, plus a `--validator` alias). When an SS58 is given, only the coldkey is validated — so the liability hotkey need not be your own or registered to you. You can stake to a validator and have the position's Alpha/ TAO liability held (and repaid at close) on that validator hotkey. Co-authored-by: Cursor --- bittensor_cli/cli.py | 34 +++++++++++++++++++++++++++------- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/bittensor_cli/cli.py b/bittensor_cli/cli.py index c9ca102af..5fd80f733 100755 --- a/bittensor_cli/cli.py +++ b/bittensor_cli/cli.py @@ -9171,12 +9171,23 @@ def deriv_open( network: Optional[list[str]] = Options.network, wallet_name: str = Options.wallet_name, wallet_path: str = Options.wallet_path, - wallet_hotkey: str = Options.wallet_hotkey, netuid: int = Options.netuid, side: str = typer.Option("short", "--side", help="Position side: short or long."), amount: float = typer.Option( ..., "--amount", "-a", help="Position input P (TAO for short, Alpha for long)." ), + wallet_hotkey: Optional[str] = typer.Option( + None, + "--hotkey", + "--hotkey-ss58", + "--wallet-hotkey", + "--wallet.hotkey", + "--validator", + "-H", + help="Hotkey/validator (name or SS58) that holds the position's liability " + "stake. It need not be your own or registered to you — stake to a validator " + "and the liability is held (and repaid at close) on that hotkey.", + ), prompt: bool = Options.prompt, quiet: bool = Options.quiet, verbose: bool = Options.verbose, @@ -9186,15 +9197,24 @@ def deriv_open( self.verbosity_handler(quiet, verbose, json_output, prompt) if not (side := self._deriv_side(side)): return - wallet, hotkey = self.wallet_ask( - wallet_name, wallet_path, wallet_hotkey, - ask_for=[WO.NAME, WO.HOTKEY, WO.PATH], validate=WV.WALLET, - return_wallet_and_hotkey=True, - ) + # Hotkey may be an SS58 (e.g. a validator you stake to) or a wallet hotkey + # name. SS58 path validates only the coldkey, so the hotkey need not be yours. + if wallet_hotkey and is_valid_ss58_address(wallet_hotkey): + hotkey_ss58 = wallet_hotkey + wallet = self.wallet_ask( + wallet_name, wallet_path, None, + ask_for=[WO.NAME, WO.PATH], validate=WV.WALLET, + ) + else: + wallet, hotkey_ss58 = self.wallet_ask( + wallet_name, wallet_path, wallet_hotkey, + ask_for=[WO.NAME, WO.HOTKEY, WO.PATH], validate=WV.WALLET_AND_HOTKEY, + return_wallet_and_hotkey=True, + ) return self._run_command( deriv.open_position( subtensor=self.initialize_chain(network), - wallet=wallet, hotkey_ss58=hotkey, netuid=netuid, side=side, + wallet=wallet, hotkey_ss58=hotkey_ss58, netuid=netuid, side=side, amount=amount, prompt=prompt, json_output=json_output, ) ) From 42922468afc433fe15bdcf1016a4269b421452f1 Mon Sep 17 00:00:00 2001 From: unconst Date: Tue, 23 Jun 2026 17:19:28 -0600 Subject: [PATCH 5/6] deriv close: default to cash-settled self-cover Make `btcli deriv close` use the self-covering extrinsics (`close_short_self` / `close_long_self`) by default so closing needs no pre-held Alpha (short) or TAO (long): the protocol covers the liability from the pool and charges it against the position's floor+buffer. Add a `--from-holdings` opt-out for the repay-it-yourself path, a short-side underwater pre-check so we don't broadcast a guaranteed-to-fail close, and a cash-settled close-quote preview. Co-authored-by: Cursor --- bittensor_cli/cli.py | 18 ++++- bittensor_cli/src/commands/deriv/deriv.py | 91 +++++++++++++++++++---- 2 files changed, 91 insertions(+), 18 deletions(-) diff --git a/bittensor_cli/cli.py b/bittensor_cli/cli.py index 5fd80f733..39222ca1d 100755 --- a/bittensor_cli/cli.py +++ b/bittensor_cli/cli.py @@ -9261,12 +9261,26 @@ def deriv_close( fraction: float = typer.Option( 1.0, "--fraction", "-f", help="Fraction to close, in (0, 1] (1 = full close)." ), + from_holdings: bool = typer.Option( + False, + "--from-holdings/--self-cover", + help="Repay the fixed liability from your own holdings (Alpha for a " + "short, TAO for a long) instead of the default cash-settled close. " + "The default (--self-cover) needs no pre-held Alpha/TAO: the protocol " + "covers the liability from the pool and charges it against your " + "floor+buffer, returning the remainder. Cash-settled closes are " + "rejected if the position is underwater.", + ), prompt: bool = Options.prompt, quiet: bool = Options.quiet, verbose: bool = Options.verbose, json_output: bool = Options.json_output, ): - """Close (or partially close) a covered position and repay the fixed liability.""" + """Close (or partially close) a covered position. + + Defaults to a cash-settled (self-covering) close that needs no pre-held + Alpha/TAO; use --from-holdings to repay the liability yourself. + """ self.verbosity_handler(quiet, verbose, json_output, prompt) if not (side := self._deriv_side(side)): return @@ -9278,7 +9292,7 @@ def deriv_close( deriv.close_position( subtensor=self.initialize_chain(network), wallet=wallet, netuid=netuid, side=side, fraction=fraction, - prompt=prompt, json_output=json_output, + from_holdings=from_holdings, prompt=prompt, json_output=json_output, ) ) diff --git a/bittensor_cli/src/commands/deriv/deriv.py b/bittensor_cli/src/commands/deriv/deriv.py index 00b556078..c45646994 100644 --- a/bittensor_cli/src/commands/deriv/deriv.py +++ b/bittensor_cli/src/commands/deriv/deriv.py @@ -8,6 +8,12 @@ The economics in three terms: position input `P` (your capital / floor) + retained-proceeds buffer `R` (decays over time as carry) + a fixed liability `Q` (Alpha, short) / `D` (TAO, long) that you repay to close. + +Closing is cash-settled by default (`close_short_self` / `close_long_self`): the +protocol covers the fixed liability straight from the pool and charges the cost +against your own floor+buffer, so you need no pre-held Alpha (short) or TAO +(long). Pass `from_holdings=True` to instead repay the liability yourself via +`close_short` / `close_long`. """ import json @@ -145,21 +151,52 @@ def _render_positions(side: str, positions: list) -> None: console.print(table) -def _render_close_quote(side: str, fraction: float, cq: dict) -> None: +def _short_self_underwater(cq: dict) -> bool: + """True when a short's pool buyback cost exceeds the floor+buffer claim, so a + cash-settled close would be rejected on-chain (`CloseCostExceedsClaim`).""" + buyback = cq.get("est_buyback_cost") + claim = cq.get("returned_tao") + return buyback is not None and claim is not None and buyback > claim + + +def _render_close_quote(side: str, fraction: float, cq: dict, self_cover: bool) -> None: if side == "short": - body = ( - f"repay {_amt(cq.get('repay_alpha'), 'Alpha')} · " - f"return {_amt(cq.get('returned_tao'), 'TAO')} · " - f"buyback ~{_amt(cq.get('est_buyback_cost'), 'TAO')} " - f"(held {_amt(cq.get('alpha_held'), 'Alpha')}, " - f"need {_amt(cq.get('alpha_needed'), 'Alpha')})" - ) + buyback = cq.get("est_buyback_cost") + claim = cq.get("returned_tao") + if self_cover: + net = None + if buyback is not None and claim is not None: + net = max(0, claim - buyback) + body = ( + f"cash-settled · buyback ~{_amt(buyback, 'TAO')} from the pool · " + f"you receive ~{_amt(net, 'TAO')} " + f"(claim {_amt(claim, 'TAO')} − buyback) · no Alpha needed" + ) + if _short_self_underwater(cq): + body += f" · [{_SHORT}]UNDERWATER (would be rejected)[/{_SHORT}]" + else: + body = ( + f"repay {_amt(cq.get('repay_alpha'), 'Alpha')} · " + f"return {_amt(claim, 'TAO')} · " + f"buyback ~{_amt(buyback, 'TAO')} " + f"(held {_amt(cq.get('alpha_held'), 'Alpha')}, " + f"need {_amt(cq.get('alpha_needed'), 'Alpha')})" + ) else: - body = ( - f"repay {_amt(cq.get('repay_tao'), 'TAO')} · " - f"return {_amt(cq.get('returned_alpha'), 'Alpha')} · " - f"escrow settled {_amt(cq.get('escrow_settled'), 'Alpha')}" - ) + claim = cq.get("returned_alpha") + if self_cover: + body = ( + f"cash-settled · sell part of your Alpha claim to raise " + f"{_amt(cq.get('repay_tao'), 'TAO')} · return the remaining Alpha " + f"(claim {_amt(claim, 'Alpha')}) · " + f"escrow settled {_amt(cq.get('escrow_settled'), 'Alpha')} · no TAO needed" + ) + else: + body = ( + f"repay {_amt(cq.get('repay_tao'), 'TAO')} · " + f"return {_amt(claim, 'Alpha')} · " + f"escrow settled {_amt(cq.get('escrow_settled'), 'Alpha')}" + ) console.print(f"[{C.SUBHEAD}]Close {fraction:.0%} quote:[/{C.SUBHEAD}] {body}") @@ -302,25 +339,47 @@ async def close_position( fraction: float, prompt: bool, json_output: bool, + from_holdings: bool = False, wait_for_inclusion: bool = True, wait_for_finalization: bool = False, ) -> tuple: if not (0 < fraction <= 1): return _report(False, "fraction must be in (0, 1]", "", json_output) fraction_ppb = int(round(fraction * PPB)) + # Default: cash-settled self-cover (no pre-held Alpha/TAO). Opt out with + # `from_holdings` to repay the liability from your own balance. + self_cover = not from_holdings ck = wallet.coldkeypub.ss58_address cq = await _api(subtensor, f"quote_close_{side}", [ck, netuid, fraction_ppb]) if cq and not json_output: - _render_close_quote(side, fraction, cq) + _render_close_quote(side, fraction, cq, self_cover) + # A cash-settled close is rejected on-chain when the pool buyback cost exceeds + # the floor+buffer claim (underwater). Catch the detectable short-side case up + # front so we don't broadcast a guaranteed-to-fail extrinsic. + if self_cover and cq and side == "short" and _short_self_underwater(cq): + return _report( + False, + "Position is underwater: the pool buyback cost exceeds your floor+buffer " + "claim, so a cash-settled close would be rejected. Close with " + "--from-holdings (repay the Alpha yourself) or let it default.", + "", + json_output, + ) + mode = ( + "cash-settled, no Alpha/TAO needed" + if self_cover + else f"repaying the liability from your {_SIDES[side]['liab']} holdings" + ) if prompt and not confirm_action( - f"Close {fraction:.0%} of your {side} on netuid {netuid}?" + f"Close {fraction:.0%} of your {side} on netuid {netuid} ({mode})?" ): return False, "Cancelled" if not (unlock := unlock_key(wallet)).success: return _report(False, unlock.message, "", json_output) + call_function = f"close_{side}_self" if self_cover else f"close_{side}" call = await subtensor.substrate.compose_call( call_module="SubtensorModule", - call_function=f"close_{side}", + call_function=call_function, call_params={"netuid": netuid, "fraction_ppb": fraction_ppb}, ) success, message, _ = await subtensor.sign_and_send_extrinsic( From ecf5c0108915194c48810f8276c468d04c562427 Mon Sep 17 00:00:00 2001 From: unconst Date: Wed, 24 Jun 2026 12:55:06 -0600 Subject: [PATCH 6/6] deriv: add --slippage/--limit-price guard rails to open & close MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds caller-supplied slippage protection (default: none) to `btcli deriv open` and `btcli deriv close`. `--slippage ` reads the live pool price and derives an on-chain `limit_price` in the adverse direction; `--limit-price` sets an absolute bound (TAO per Alpha). The runtime reverts with SlippageExceeded if the post-trade executable price breaches the bound, protecting against sandwiching/MEV. All deriv extrinsics now forward `limit_price` (top-up passes None — it has no pool interaction). Co-authored-by: Cursor --- bittensor_cli/cli.py | 29 +++++++++++ bittensor_cli/src/commands/deriv/deriv.py | 61 +++++++++++++++++++++-- 2 files changed, 87 insertions(+), 3 deletions(-) diff --git a/bittensor_cli/cli.py b/bittensor_cli/cli.py index 39222ca1d..2ce569c1e 100755 --- a/bittensor_cli/cli.py +++ b/bittensor_cli/cli.py @@ -9188,6 +9188,20 @@ def deriv_open( "stake. It need not be your own or registered to you — stake to a validator " "and the liability is held (and repaid at close) on that hotkey.", ), + slippage: Optional[float] = typer.Option( + None, + "--slippage", + "--tolerance", + help="Slippage tolerance in percent (e.g. 0.5). The CLI converts it to " + "an on-chain limit price in the adverse direction; the open reverts if " + "the executed price breaches it. Default: no protection.", + ), + limit_price: Optional[float] = typer.Option( + None, + "--limit-price", + help="Absolute limit price (TAO per Alpha) instead of --slippage. " + "Open reverts if the post-trade price is worse than this bound.", + ), prompt: bool = Options.prompt, quiet: bool = Options.quiet, verbose: bool = Options.verbose, @@ -9216,6 +9230,7 @@ def deriv_open( subtensor=self.initialize_chain(network), wallet=wallet, hotkey_ss58=hotkey_ss58, netuid=netuid, side=side, amount=amount, prompt=prompt, json_output=json_output, + slippage=slippage, limit_price=limit_price, ) ) @@ -9271,6 +9286,19 @@ def deriv_close( "floor+buffer, returning the remainder. Cash-settled closes are " "rejected if the position is underwater.", ), + slippage: Optional[float] = typer.Option( + None, + "--slippage", + "--tolerance", + help="Slippage tolerance in percent (e.g. 0.5). The CLI converts it to " + "an on-chain limit price in the adverse direction; the close reverts if " + "the executed price breaches it. Default: no protection.", + ), + limit_price: Optional[float] = typer.Option( + None, + "--limit-price", + help="Absolute limit price (TAO per Alpha) instead of --slippage.", + ), prompt: bool = Options.prompt, quiet: bool = Options.quiet, verbose: bool = Options.verbose, @@ -9293,6 +9321,7 @@ def deriv_close( subtensor=self.initialize_chain(network), wallet=wallet, netuid=netuid, side=side, fraction=fraction, from_holdings=from_holdings, prompt=prompt, json_output=json_output, + slippage=slippage, limit_price=limit_price, ) ) diff --git a/bittensor_cli/src/commands/deriv/deriv.py b/bittensor_cli/src/commands/deriv/deriv.py index c45646994..78fe37b92 100644 --- a/bittensor_cli/src/commands/deriv/deriv.py +++ b/bittensor_cli/src/commands/deriv/deriv.py @@ -74,6 +74,49 @@ def _amount_to_rao(amount: float) -> int: return int(round(amount * 1e9)) +async def _current_price_ppb(subtensor: "SubtensorInterface", netuid: int) -> Optional[int]: + """Executable alpha price (TAO/alpha) scaled by 1e9, from live pool reserves. + Returns None if reserves can't be read.""" + try: + tao = await subtensor.substrate.query("SubtensorModule", "SubnetTAO", [netuid]) + alpha = await subtensor.substrate.query("SubtensorModule", "SubnetAlphaIn", [netuid]) + t = int(getattr(tao, "value", tao)) + a = int(getattr(alpha, "value", alpha)) + if a <= 0: + return None + return (t * 1_000_000_000) // a + except Exception: + return None + + +async def _resolve_limit_price( + subtensor: "SubtensorInterface", + netuid: int, + side: str, + op: str, + slippage: Optional[float], + limit_price: Optional[float], +) -> Optional[int]: + """Resolve the on-chain `limit_price` (price ppb, None = no protection). + + Direction (adverse move to protect against): + - short open / long close -> price floor (reject if price ends below). + - long open / short close -> price ceiling (reject if price ends above). + - top-up has no pool interaction (bound is a no-op on chain). + """ + if limit_price is not None: + return int(round(limit_price * 1e9)) + if slippage is None or op == "topup": + return None + cur = await _current_price_ppb(subtensor, netuid) + if cur is None: + console.print(f"[{C.SUBHEAD_EX_1}]Could not read pool price; sending without slippage bound.[/{C.SUBHEAD_EX_1}]") + return None + tol = max(0.0, slippage) / 100.0 + is_floor = (op == "open" and side == "short") or (op == "close" and side == "long") + return int(cur * (1 - tol)) if is_floor else int(cur * (1 + tol)) + + # --------------------------------------------------------------------------- # Reads # --------------------------------------------------------------------------- @@ -276,6 +319,8 @@ async def open_position( amount: float, prompt: bool, json_output: bool, + slippage: Optional[float] = None, + limit_price: Optional[float] = None, wait_for_inclusion: bool = True, wait_for_finalization: bool = False, ) -> tuple: @@ -293,10 +338,16 @@ async def open_position( if not (unlock := unlock_key(wallet)).success: return _report(False, unlock.message, "", json_output) + limit_ppb = await _resolve_limit_price(subtensor, netuid, side, "open", slippage, limit_price) call = await subtensor.substrate.compose_call( call_module="SubtensorModule", call_function=f"open_{side}", - call_params={"hotkey": hotkey_ss58, "netuid": netuid, "position_input": p_rao}, + call_params={ + "hotkey": hotkey_ss58, + "netuid": netuid, + "position_input": p_rao, + "limit_price": limit_ppb, + }, ) success, message, _ = await subtensor.sign_and_send_extrinsic( call=call, wallet=wallet, @@ -321,7 +372,8 @@ async def top_up( call = await subtensor.substrate.compose_call( call_module="SubtensorModule", call_function=f"top_up_{side}", - call_params={"netuid": netuid, "amount": _amount_to_rao(amount)}, + # top-up never touches the pool, so the bound is a no-op on chain. + call_params={"netuid": netuid, "amount": _amount_to_rao(amount), "limit_price": None}, ) success, message, _ = await subtensor.sign_and_send_extrinsic( call=call, wallet=wallet, @@ -340,6 +392,8 @@ async def close_position( prompt: bool, json_output: bool, from_holdings: bool = False, + slippage: Optional[float] = None, + limit_price: Optional[float] = None, wait_for_inclusion: bool = True, wait_for_finalization: bool = False, ) -> tuple: @@ -376,11 +430,12 @@ async def close_position( return False, "Cancelled" if not (unlock := unlock_key(wallet)).success: return _report(False, unlock.message, "", json_output) + limit_ppb = await _resolve_limit_price(subtensor, netuid, side, "close", slippage, limit_price) call_function = f"close_{side}_self" if self_cover else f"close_{side}" call = await subtensor.substrate.compose_call( call_module="SubtensorModule", call_function=call_function, - call_params={"netuid": netuid, "fraction_ppb": fraction_ppb}, + call_params={"netuid": netuid, "fraction_ppb": fraction_ppb, "limit_price": limit_ppb}, ) success, message, _ = await subtensor.sign_and_send_extrinsic( call=call, wallet=wallet,