diff --git a/.github/workflows/pm-perf.yml b/.github/workflows/pm-perf.yml new file mode 100644 index 000000000..e91c78a48 --- /dev/null +++ b/.github/workflows/pm-perf.yml @@ -0,0 +1,218 @@ +name: utoopm-perf + +on: + push: + branches: [main, next] + pull_request: + branches: [main, next] + workflow_dispatch: + inputs: + registry: + description: 'Registry to test' + required: false + default: 'both' + type: choice + options: + - both + - npmjs + - npmmirror + +permissions: + contents: read + pull-requests: write + +env: + REGISTRY_MODE: ${{ github.event.inputs.registry || 'both' }} + +jobs: + performance: + name: PM Performance Analysis + # Only run for PM-related changes (contains '(pm)' in commit/PR title) + # or manual trigger + if: > + github.event_name == 'workflow_dispatch' || + (github.event_name == 'push' && contains(github.event.head_commit.message, '(pm)')) || + (github.event_name == 'pull_request' && contains(github.event.pull_request.title, '(pm)')) + strategy: + fail-fast: false + matrix: + os: [macos-latest, ubuntu-latest] + runs-on: ${{ matrix.os }} + + steps: + - uses: actions/checkout@v4 + + - name: Init git submodules + run: git submodule update --init --recursive --depth 1 + + - name: Free disk space (Ubuntu) + if: matrix.os == 'ubuntu-latest' + run: | + sudo rm -rf /usr/share/dotnet + sudo rm -rf /usr/local/lib/android + sudo rm -rf /opt/ghc + + - name: Setup Node.js + uses: actions/setup-node@v4 + with: + node-version: 20 + + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + with: + toolchain: nightly-2025-10-27 + + - name: Cache cargo + uses: Swatinem/rust-cache@v2 + with: + shared-key: pm-perf-${{ matrix.os }} + + - name: Build utoo-pm with tracing + run: cargo build --release --bin utoo -p utoo-pm --features tracing-chrome + + - name: Clone test projects + run: | + mkdir -p /tmp/pm-bench + cd /tmp/pm-bench + + # Clone ant-design + if [ ! -d "ant-design" ]; then + echo "Cloning ant-design..." 
+ git clone --depth=1 https://github.com/ant-design/ant-design.git + fi + + # Clone ant-design-x (next branch) + if [ ! -d "ant-design-x" ]; then + echo "Cloning ant-design-x (next branch)..." + git clone --branch next --depth=1 https://github.com/ant-design/x.git ant-design-x + fi + + - name: Setup trace directories + run: mkdir -p /tmp/pm-bench/.trace + + - name: Run traced benchmarks + run: | + UTOO_BIN="${GITHUB_WORKSPACE}/target/release/utoo" + TRACE_DIR="/tmp/pm-bench/.trace" + BENCH_DIR="/tmp/pm-bench" + + # Determine registries based on mode + REGISTRIES=() + if [ "$REGISTRY_MODE" = "both" ] || [ "$REGISTRY_MODE" = "npmjs" ]; then + REGISTRIES+=("https://registry.npmjs.org") + fi + if [ "$REGISTRY_MODE" = "both" ] || [ "$REGISTRY_MODE" = "npmmirror" ]; then + REGISTRIES+=("https://registry.npmmirror.com") + fi + + PROJECTS=("ant-design" "ant-design-x") + + for project in "${PROJECTS[@]}"; do + for registry in "${REGISTRIES[@]}"; do + registry_name=$(echo "$registry" | sed 's|https://registry.||' | sed 's|.org||' | sed 's|.com||') + + echo "==========================================" + echo "Project: $project" + echo "Registry: $registry_name" + echo "==========================================" + + cd "$BENCH_DIR/$project" + + # Cold install (clean cache) + echo "--- Cold Install ---" + git clean -dfx + rm -rf ~/.cache/nm + + trace_file="${TRACE_DIR}/${project}_${registry_name}_cold_${{ matrix.os }}.json" + TRACING_CHROME="$trace_file" "$UTOO_BIN" install --ignore-scripts --registry="$registry" || true + + # Warm install (with cache) + echo "--- Warm Install ---" + git clean -dfx + + trace_file="${TRACE_DIR}/${project}_${registry_name}_warm_${{ matrix.os }}.json" + TRACING_CHROME="$trace_file" "$UTOO_BIN" install --ignore-scripts --registry="$registry" || true + + echo "" + done + done + + - name: Analyze traces + run: | + TRACE_DIR="/tmp/pm-bench/.trace" + + # Generate individual reports + for trace_file in "$TRACE_DIR"/*.json; do + if [ -f "$trace_file" ]; 
then + basename=$(basename "$trace_file" .json) + report_file="${TRACE_DIR}/${basename}_report.md" + echo "Analyzing: $basename" + python3 agents/tools/analyze_pm_trace.py "$trace_file" "$report_file" "$basename" || true + fi + done + + # Generate combined summary report + python3 agents/tools/analyze_pm_trace.py --summary "$TRACE_DIR" "${TRACE_DIR}/summary_${{ matrix.os }}.md" + + - name: Prepare PR comment + run: | + TRACE_DIR="/tmp/pm-bench/.trace" + COMMENT_FILE="${TRACE_DIR}/pr_comment_${{ matrix.os }}.md" + + echo "## utoo-pm Performance Report (${{ matrix.os }})" > "$COMMENT_FILE" + echo "" >> "$COMMENT_FILE" + echo "
<details>" >> "$COMMENT_FILE" + echo "<summary>Click to expand full report</summary>" >> "$COMMENT_FILE" + echo "" >> "$COMMENT_FILE" + + if [ -f "${TRACE_DIR}/summary_${{ matrix.os }}.md" ]; then + cat "${TRACE_DIR}/summary_${{ matrix.os }}.md" >> "$COMMENT_FILE" + fi + + echo "" >> "$COMMENT_FILE" + echo "</details>
" >> "$COMMENT_FILE" + + # Add to Job Summary + cat "$COMMENT_FILE" >> $GITHUB_STEP_SUMMARY + + - name: Find Current PR + id: findPr + if: github.event_name != 'workflow_dispatch' + run: | + PR_NUMBER="" + + if [[ "${{ github.event_name }}" == "pull_request" ]]; then + PR_NUMBER="${{ github.event.pull_request.number }}" + else + # Try to match by branch name + PR_NUMBER=$(gh pr list --head "${{ github.ref_name }}" --state open --json number --jq '.[0].number') + + # If not found, try to match by commit SHA + if [ -z "$PR_NUMBER" ] || [ "$PR_NUMBER" == "null" ]; then + PR_NUMBER=$(gh pr list --search "${{ github.sha }}" --state open --json number --jq '.[0].number') + fi + fi + + if [ -n "$PR_NUMBER" ] && [ "$PR_NUMBER" != "null" ]; then + echo "number=$PR_NUMBER" >> $GITHUB_OUTPUT + fi + env: + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + GH_REPO: ${{ github.repository }} + + - name: Post PR Comment + if: steps.findPr.outputs.number + uses: marocchino/sticky-pull-request-comment@v2 + with: + header: utoopm-perf-report-${{ matrix.os }} + path: /tmp/pm-bench/.trace/pr_comment_${{ matrix.os }}.md + number: ${{ steps.findPr.outputs.number }} + recreate: true + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + - name: Upload Traces + uses: actions/upload-artifact@v4 + with: + name: utoopm-perf-traces-${{ matrix.os }} + path: /tmp/pm-bench/.trace/ + retention-days: 7 diff --git a/Cargo.lock b/Cargo.lock index f77b8ac8b..6d90d20a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9622,6 +9622,7 @@ dependencies = [ "toml", "tracing", "tracing-appender", + "tracing-chrome", "tracing-subscriber", "utoo-ruborist", ] diff --git a/agents/tools/__pycache__/analyze_pm_trace.cpython-314.pyc b/agents/tools/__pycache__/analyze_pm_trace.cpython-314.pyc new file mode 100644 index 000000000..2631070a0 Binary files /dev/null and b/agents/tools/__pycache__/analyze_pm_trace.cpython-314.pyc differ diff --git a/agents/tools/analyze_pm_trace.py b/agents/tools/analyze_pm_trace.py new file mode 100644 index 
000000000..00ab319d4 --- /dev/null +++ b/agents/tools/analyze_pm_trace.py @@ -0,0 +1,550 @@ +#!/usr/bin/env python3 +""" +utoo-pm Performance Analysis Script +Based on utoo-pm Performance Analysis Agent Protocol + +Usage: + Single trace: python3 analyze_pm_trace.py [project_name] + Summary mode: python3 analyze_pm_trace.py --summary + +Analyzes Chrome Trace data from utoo-pm to identify performance bottlenecks +in Network, File I/O, and Decompression operations. +""" + +import json +import sys +import os +import glob +from collections import defaultdict +from datetime import datetime + + +def analyze_trace(trace_path, output_path=None, override_project_name=None): + """Analyze Chrome Trace and generate PM performance report. + Returns metrics dict if output_path is None.""" + + print(f"Loading trace: {trace_path}") + with open(trace_path, 'r') as f: + data = json.load(f) + + events = data if isinstance(data, list) else data.get("traceEvents", []) + print(f"Analyzing {len(events):,} events...") + + # Try to infer project name + project_name = override_project_name or os.environ.get("TRACE_PROJECT") or "Unknown Project" + + # Initialize metrics + min_ts, max_ts = float('inf'), float('-inf') + threads = set() + thread_work = 0.0 + + # Task statistics (only meaningful tasks >= 10us) + meaningful_tasks = defaultdict(lambda: {'count': 0, 'duration': 0.0, 'max': 0.0}) + noise_count = 0 + + # Duration buckets + buckets = { + '<10us': 0, '10us-100us': 0, '100us-1ms': 0, + '1ms-10ms': 0, '10ms-100ms': 0, '>100ms': 0 + } + + # Stack for B/E events + stacks = defaultdict(list) + + # PM-specific metrics + download_stats = {'count': 0, 'duration': 0.0, 'bytes': 0} + clone_stats = {'count': 0, 'duration': 0.0, 'clonefile': 0, 'ficlone': 0, 'copy': 0} + extract_stats = {'count': 0, 'duration': 0.0, 'files': 0} + + def categorize_duration(dur): + if dur < 10: + return '<10us' + elif dur < 100: + return '10us-100us' + elif dur < 1000: + return '100us-1ms' + elif dur < 10000: + 
return '1ms-10ms' + elif dur < 100000: + return '10ms-100ms' + else: + return '>100ms' + + def process_event(name, dur): + nonlocal thread_work, noise_count + thread_work += dur + bucket = categorize_duration(dur) + buckets[bucket] += 1 + + if dur < 10: + noise_count += 1 + else: + meaningful_tasks[name]['count'] += 1 + meaningful_tasks[name]['duration'] += dur + meaningful_tasks[name]['max'] = max(meaningful_tasks[name]['max'], dur) + + # PM-specific tracking + name_lower = name.lower() + if 'download' in name_lower: + download_stats['count'] += 1 + download_stats['duration'] += dur + elif 'clone' in name_lower or 'ficlone' in name_lower: + clone_stats['count'] += 1 + clone_stats['duration'] += dur + if 'clonefile' in name_lower: + clone_stats['clonefile'] += 1 + elif 'ficlone' in name_lower: + clone_stats['ficlone'] += 1 + elif 'copy' in name_lower: + clone_stats['copy'] += 1 + elif 'tar_extract' in name_lower or 'unpack' in name_lower: + extract_stats['count'] += 1 + extract_stats['duration'] += dur + + for event in events: + ph = event.get('ph') + tid = event.get('tid') + ts = event.get('ts', 0) + name = event.get('name', 'unknown') + + if ts > 0: + min_ts = min(min_ts, ts) + max_ts = max(max_ts, ts) + + threads.add(tid) + + if ph == 'X': # Complete event + dur = event.get('dur', 0) + max_ts = max(max_ts, ts + dur) + process_event(name, dur) + + elif ph == 'B': # Begin event + stacks[tid].append((ts, name)) + + elif ph == 'E': # End event + if stacks[tid]: + start_ts, start_name = stacks[tid].pop() + dur = ts - start_ts + process_event(start_name, dur) + + # Calculate metrics + if max_ts <= min_ts: + print("Warning: No valid timestamps found in trace") + wall_time_ms = 0 + parallelism = 0 + utilization = 0 + else: + wall_time_ms = (max_ts - min_ts) / 1000.0 + parallelism = thread_work / (max_ts - min_ts) if (max_ts - min_ts) > 0 else 0 + + thread_work_ms = thread_work / 1000.0 + num_threads = len(threads) if threads else 1 + utilization = (parallelism / 
num_threads) * 100 if num_threads > 0 else 0 + + total_tasks = noise_count + sum(t['count'] for t in meaningful_tasks.values()) + meaningful_count = sum(t['count'] for t in meaningful_tasks.values()) + + # Sort tasks + by_duration = sorted(meaningful_tasks.items(), key=lambda x: x[1]['duration'], reverse=True) + + # PM-specific categories + categories = { + 'P0: Network': ['download', 'http_request', 'ping_registry', 'select_registry'], + 'P1: File I/O': ['clone_package', 'clone_dir', 'clonefile', 'ficlone', 'linux_clone', + 'windows_copy', 'file_write', 'create_dir'], + 'P2: Decompression': ['unpack_stream', 'tar_extract', 'file_write_batch'], + 'P3: Concurrency': ['install_packages', 'semaphore'] + } + + cat_stats = defaultdict(lambda: {'count': 0, 'duration': 0.0}) + for name, stats in meaningful_tasks.items(): + matched = False + lower_name = name.lower() + for cat, keywords in categories.items(): + if any(kw in lower_name for kw in keywords): + cat_stats[cat]['count'] += stats['count'] + cat_stats[cat]['duration'] += stats['duration'] + matched = True + break + if not matched: + cat_stats['Other']['count'] += stats['count'] + cat_stats['Other']['duration'] += stats['duration'] + + # Pre-calculate derived metrics + max_thread_work = max(thread_work, 1) + + # Network stats + network_time_ms = download_stats['duration'] / 1000.0 + network_pct = (download_stats['duration'] * 100) / max_thread_work + + # I/O stats + io_time_ms = clone_stats['duration'] / 1000.0 + io_pct = (clone_stats['duration'] * 100) / max_thread_work + + # Decompression stats + decomp_time_ms = extract_stats['duration'] / 1000.0 + decomp_pct = (extract_stats['duration'] * 100) / max_thread_work + + # Build metrics dict + metrics = { + 'project_name': project_name, + 'wall_time_ms': wall_time_ms, + 'thread_work_ms': thread_work_ms, + 'parallelism': parallelism, + 'utilization': utilization, + 'total_tasks': total_tasks, + 'meaningful_count': meaningful_count, + 'network_time_ms': 
network_time_ms, + 'network_pct': network_pct, + 'download_count': download_stats['count'], + 'io_time_ms': io_time_ms, + 'io_pct': io_pct, + 'clone_count': clone_stats['count'], + 'clone_stats': clone_stats, + 'decomp_time_ms': decomp_time_ms, + 'decomp_pct': decomp_pct, + 'extract_count': extract_stats['count'], + 'cat_stats': dict(cat_stats), + 'by_duration': by_duration[:15], + 'buckets': buckets, + 'num_threads': num_threads, + 'event_count': len(events), + } + + if output_path is None: + return metrics + + # Generate report + generate_report(metrics, trace_path, output_path) + return metrics + + +def generate_report(metrics, trace_path, output_path): + """Generate markdown report from metrics.""" + report_id = f"utoopm_performance_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}" + trace_file = os.path.basename(trace_path) + trace_size_mb = os.path.getsize(trace_path) / (1024 ** 2) + + # Utilization status + utilization = metrics['utilization'] + if utilization < 60: + util_status = 'Suboptimal' + elif utilization > 80: + util_status = 'Good' + else: + util_status = 'Average' + + # Workload Distribution Table Rows + workload_rows = "" + cat_stats = metrics['cat_stats'] + max_thread_work = max(metrics['thread_work_ms'] * 1000, 1) + for cat_name in ['P0: Network', 'P1: File I/O', 'P2: Decompression', 'P3: Concurrency', 'Other']: + stats = cat_stats.get(cat_name, {'count': 0, 'duration': 0.0}) + dur_ms = stats['duration'] / 1000.0 + pct = (stats['duration'] * 100) / max_thread_work + workload_rows += f"| {cat_name} | {stats['count']:,} | {dur_ms:,.1f} | {pct:.1f}% |\n" + + total_tasks_safe = max(metrics['total_tasks'], 1) + + report = f"""# utoo-pm Performance Report + +**Report ID**: `{report_id}` +**Generated**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} +**Trace File**: `{trace_file}` ({trace_size_mb:.1f}MB, {metrics['event_count']/1e3:.1f}K events) +**Test Project**: `{metrics['project_name']}` + +--- + +## Executive Summary + +| Metric | Value | 
Assessment | +|--------|-------|------------| +| Total Wall Time | **{metrics['wall_time_ms']:,.1f} ms** | Baseline | +| Total Thread Work | **{metrics['thread_work_ms']:,.1f} ms** | ~{metrics['parallelism']:.1f}x parallelism | +| Thread Utilization | **{utilization:.1f}%** | {util_status} | +| Total Spans | **{metrics['total_tasks']:,}** | Total count | +| Meaningful Tasks (>= 10us) | **{metrics['meaningful_count']:,}** | ({metrics['meaningful_count']*100/total_tasks_safe:.1f}% of total) | + +### Workload Distribution by Tier + +| Category | Tasks | Total Time (ms) | % of Work | +|----------|-------|-----------------|-----------| +{workload_rows} +--- + +## P0: Network Analysis + +| Metric | Value | +|--------|-------| +| Download Operations | {metrics['download_count']:,} | +| Total Network Time | {metrics['network_time_ms']:,.1f} ms | +| Network % of Work | {metrics['network_pct']:.1f}% | + +**Assessment**: {'Network-bound - consider faster registry or more concurrent downloads' if metrics['network_pct'] > 50 else 'Network overhead is acceptable'} + +--- + +## P1: File I/O Analysis + +| Metric | Value | +|--------|-------| +| Clone Operations | {metrics['clone_count']:,} | +| Total I/O Time | {metrics['io_time_ms']:,.1f} ms | +| I/O % of Work | {metrics['io_pct']:.1f}% | + +**Clone Strategy Distribution**: +- clonefile (macOS CoW): {metrics['clone_stats']['clonefile']} +- FICLONE (Linux CoW): {metrics['clone_stats']['ficlone']} +- Regular copy (fallback): {metrics['clone_stats']['copy']} + +**Assessment**: {'I/O-bound - check filesystem CoW support' if metrics['io_pct'] > 30 else 'I/O performance is acceptable'} + +--- + +## P2: Decompression Analysis + +| Metric | Value | +|--------|-------| +| Extract Operations | {metrics['extract_count']:,} | +| Total Decompress Time | {metrics['decomp_time_ms']:,.1f} ms | +| Decompress % of Work | {metrics['decomp_pct']:.1f}% | + +**Assessment**: {'Decompression-bound - consider pipeline tuning' if metrics['decomp_pct'] > 
30 else 'Decompression performance is acceptable'} + +--- + +## Top 15 Tasks by Duration + +| Total (ms) | Count | Avg (us) | Max (ms) | Task Name | +|------------|-------|----------|----------|-----------| +""" + + for name, stats in metrics['by_duration']: + avg_us = stats['duration'] / stats['count'] if stats['count'] > 0 else 0 + total_ms = stats['duration'] / 1000.0 + max_ms = stats['max'] / 1000.0 + report += f"| {total_ms:,.1f} | {stats['count']:,} | {avg_us:,.0f} | {max_ms:,.1f} | `{name}` |\n" + + buckets = metrics['buckets'] + report += f""" +--- + +## Diagnostic Signals + +| Signal | Status | Finding | +|--------|--------|---------| +| Thread Utilization (P0) | {'**Low**' if utilization < 60 else 'Good'} | {utilization:.1f}% utilization | +| Network Dominance (P0) | {'**High**' if metrics['network_pct'] > 50 else 'Normal'} | {metrics['network_pct']:.1f}% of work | +| I/O Bottleneck (P1) | {'**Detected**' if metrics['io_pct'] > 30 else 'Minimal'} | {metrics['io_pct']:.1f}% of work | +| Decompress Overhead (P2) | {'**High**' if metrics['decomp_pct'] > 30 else 'Normal'} | {metrics['decomp_pct']:.1f}% of work | +| Heavy Tasks (>100ms) | {'**Review**' if buckets['>100ms'] > 5 else 'Minimal'} | {buckets['>100ms']} tasks | + +--- + +## Recommendations + +""" + + if utilization < 70: + report += f"1. **[P0]** Low thread utilization ({utilization:.1f}%) - investigate concurrency bottlenecks\n" + if metrics['network_pct'] > 50: + report += f"2. **[P0]** Network dominates ({metrics['network_pct']:.1f}%) - consider faster registry or parallel downloads\n" + if metrics['io_pct'] > 30: + report += f"3. **[P1]** I/O overhead ({metrics['io_pct']:.1f}%) - verify CoW cloning is working\n" + if metrics['decomp_pct'] > 30: + report += f"4. **[P2]** Decompression overhead ({metrics['decomp_pct']:.1f}%) - tune streaming pipeline\n" + if buckets['>100ms'] > 5: + report += f"5. 
**[P1]** {buckets['>100ms']} heavy tasks (>100ms) detected - consider splitting\n" + + if utilization >= 70 and metrics['network_pct'] <= 50 and metrics['io_pct'] <= 30: + report += "Performance is within acceptable parameters. No immediate action required.\n" + + report += f""" +--- + +*Report generated by utoo-pm Performance Analysis Agent* +*Protocol: utoopm-performance-agent.md* +""" + + print(f"Writing report to {output_path}") + with open(output_path, 'w') as f: + f.write(report) + + print(f"Report generated successfully!") + print(f"\nQuick Summary:") + print(f" Wall Time: {metrics['wall_time_ms']:,.1f} ms") + print(f" Parallelism: {metrics['parallelism']:.1f}x") + print(f" Thread Util: {utilization:.1f}%") + print(f" Network: {metrics['network_pct']:.1f}%") + print(f" I/O: {metrics['io_pct']:.1f}%") + print(f" Decompress: {metrics['decomp_pct']:.1f}%") + + +def generate_summary_report(trace_dir, output_path): + """Generate a combined summary report from multiple trace files.""" + trace_files = glob.glob(os.path.join(trace_dir, "*.json")) + + if not trace_files: + print(f"No trace files found in {trace_dir}") + with open(output_path, 'w') as f: + f.write("# utoo-pm Performance Summary\n\nNo trace files found.\n") + return + + print(f"Found {len(trace_files)} trace files") + + # Analyze all traces + all_metrics = [] + for trace_file in sorted(trace_files): + try: + # Extract info from filename: {project}_{registry}_{type}_{os}.json + basename = os.path.basename(trace_file).replace('.json', '') + metrics = analyze_trace(trace_file, output_path=None, override_project_name=basename) + metrics['trace_file'] = basename + all_metrics.append(metrics) + except Exception as e: + print(f"Error analyzing {trace_file}: {e}") + + if not all_metrics: + with open(output_path, 'w') as f: + f.write("# utoo-pm Performance Summary\n\nNo valid trace files analyzed.\n") + return + + # Generate summary report + report = f"""# utoo-pm Performance Summary + +**Generated**: 
{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} +**Trace Count**: {len(all_metrics)} + +--- + +## Benchmark Results Comparison + +| Scenario | Wall Time | Network | I/O | Decompress | Parallelism | +|----------|-----------|---------|-----|------------|-------------| +""" + + for m in all_metrics: + report += f"| `{m['trace_file']}` | {m['wall_time_ms']:,.0f}ms | {m['network_pct']:.1f}% | {m['io_pct']:.1f}% | {m['decomp_pct']:.1f}% | {m['parallelism']:.1f}x |\n" + + # Group by project for comparison + projects = defaultdict(list) + for m in all_metrics: + # Parse project name from trace_file + parts = m['trace_file'].split('_') + if len(parts) >= 2: + project = parts[0] + if parts[1] in ['design', 'design-x']: + project = f"{parts[0]}-{parts[1]}" + projects[project].append(m) + + report += "\n---\n\n## By Project\n\n" + + for project, metrics_list in sorted(projects.items()): + report += f"### {project}\n\n" + report += "| Type | Registry | Wall Time | Network % | I/O % |\n" + report += "|------|----------|-----------|-----------|-------|\n" + + for m in metrics_list: + trace_name = m['trace_file'] + # Extract registry and type + parts = trace_name.split('_') + registry = "unknown" + install_type = "unknown" + for i, p in enumerate(parts): + if p in ['npmjs', 'npmmirror']: + registry = p + if p in ['cold', 'warm', 'hot']: + install_type = p + + report += f"| {install_type} | {registry} | {m['wall_time_ms']:,.0f}ms | {m['network_pct']:.1f}% | {m['io_pct']:.1f}% |\n" + + report += "\n" + + # Registry comparison + report += "---\n\n## Registry Comparison\n\n" + + registries = defaultdict(list) + for m in all_metrics: + trace_name = m['trace_file'] + if 'npmjs' in trace_name: + registries['npmjs'].append(m) + elif 'npmmirror' in trace_name: + registries['npmmirror'].append(m) + + if len(registries) >= 2: + report += "| Registry | Avg Wall Time | Avg Network % | Scenarios |\n" + report += "|----------|---------------|---------------|----------|\n" + + for reg, 
metrics_list in sorted(registries.items()): + avg_wall = sum(m['wall_time_ms'] for m in metrics_list) / len(metrics_list) + avg_net = sum(m['network_pct'] for m in metrics_list) / len(metrics_list) + report += f"| {reg} | {avg_wall:,.0f}ms | {avg_net:.1f}% | {len(metrics_list)} |\n" + else: + report += "Only one registry tested.\n" + + # Cold vs Warm comparison + report += "\n---\n\n## Cold vs Warm Install\n\n" + + install_types = defaultdict(list) + for m in all_metrics: + if '_cold_' in m['trace_file']: + install_types['cold'].append(m) + elif '_warm_' in m['trace_file']: + install_types['warm'].append(m) + + if len(install_types) >= 2: + report += "| Type | Avg Wall Time | Avg Network % | Avg I/O % | Scenarios |\n" + report += "|------|---------------|---------------|-----------|----------|\n" + + for itype in ['cold', 'warm']: + if itype in install_types: + metrics_list = install_types[itype] + avg_wall = sum(m['wall_time_ms'] for m in metrics_list) / len(metrics_list) + avg_net = sum(m['network_pct'] for m in metrics_list) / len(metrics_list) + avg_io = sum(m['io_pct'] for m in metrics_list) / len(metrics_list) + report += f"| {itype} | {avg_wall:,.0f}ms | {avg_net:.1f}% | {avg_io:.1f}% | {len(metrics_list)} |\n" + + # Calculate speedup + if 'cold' in install_types and 'warm' in install_types: + cold_avg = sum(m['wall_time_ms'] for m in install_types['cold']) / len(install_types['cold']) + warm_avg = sum(m['wall_time_ms'] for m in install_types['warm']) / len(install_types['warm']) + if warm_avg > 0: + speedup = cold_avg / warm_avg + report += f"\n**Cache Speedup**: {speedup:.1f}x faster with warm cache\n" + else: + report += "Need both cold and warm install data for comparison.\n" + + report += f""" +--- + +*Summary generated by utoo-pm Performance Analysis Agent* +""" + + print(f"Writing summary to {output_path}") + with open(output_path, 'w') as f: + f.write(report) + + print("Summary generated successfully!") + + +if __name__ == "__main__": + if 
len(sys.argv) < 3: + print("Usage:") + print(" Single trace: python3 analyze_pm_trace.py <trace.json> <report.md> [project_name]") + print(" Summary mode: python3 analyze_pm_trace.py --summary <trace_dir> <report.md>") + sys.exit(1) + + if sys.argv[1] == "--summary": + trace_dir = sys.argv[2] + report_file = sys.argv[3] + generate_summary_report(trace_dir, report_file) + else: + trace_file = sys.argv[1] + report_file = sys.argv[2] + override_project_name = sys.argv[3] if len(sys.argv) > 3 else None + + if not os.path.exists(trace_file): + print(f"Error: Trace file not found: {trace_file}") + sys.exit(1) + + analyze_trace(trace_file, report_file, override_project_name) diff --git a/agents/utoopm-performance-agent.md b/agents/utoopm-performance-agent.md new file mode 100644 index 000000000..554762a03 --- /dev/null +++ b/agents/utoopm-performance-agent.md @@ -0,0 +1,192 @@ +# utoo-pm Performance Analysis Agent Protocol + +This document defines the specialized diagnostic procedure for analyzing utoo-pm (package manager) performance. It is a universal protocol designed for AI agents to investigate bottlenecks within package installation workflows. + +## Objective + +Empower AI agents to identify and resolve performance bottlenecks in utoo-pm by analyzing Chrome Trace data. Focus on Network, File I/O, and Decompression operations specific to package management. + +--- + +## Step 1: Data Acquisition & Environment Prep + +- **Build**: Run `cargo build --release --bin utoo -p utoo-pm` to compile utoo-pm. +- **Trace Generation**: Run utoo with `TRACING_CHROME=$PWD/.trace/pm_trace_$(date +%Y%m%d_%H%M%S).json`. + - *Example*: `TRACING_CHROME=$PWD/.trace/pm_trace.json utoo install --prefix examples/with-antd` +- **Intermediate Files**: All filtered JSON fragments and analytical results **MUST** be placed in the `./.trace/` directory. Diagnostic scripts are maintained in the `./agents/tools/` directory. +- **Workspace Hygiene**: Ensure `./.trace/` is in `.gitignore`.
Never upload raw trace data (> 500MB) directly; share filtered summaries or key findings. +- **Search Tooling**: Use `ripgrep` (command: `rg`) for all code searches. + +### Tracing Overhead Note + +Chrome Trace instrumentation introduces overhead. Tasks with duration **< 10us** are likely dominated by tracing instrumentation cost rather than actual work. **Exclude these from statistical analysis** to avoid misleading conclusions. + +--- + +## Step 2: Universal Diagnostic Matrix (Tiers P0-P3) + +Follow this tiered hierarchy. Solve P0 before descending to P1, as network latency often dominates cold install time. + +### Tier 1: Network Operations (Priority: P0) + +*Focus: Download latency, throughput, retry frequency, registry selection.* + +- **A. Download Latency (HTTP Round-Trip)** + - **Signal**: Long `download` spans with significant time before data transfer begins. + - **Insight**: DNS resolution, TLS handshake, and connection establishment can dominate for many small packages. + - **Action**: Check if connection pooling is effective. Look for repeated connection setup to the same host. + +- **B. Retry Frequency (Network Failures)** + - **Signal**: Multiple `retry` events for the same URL. + - **Insight**: High retry count indicates unstable network or overloaded registry. + - **Action**: Check registry selection logic. Consider faster mirrors (npmmirror vs npmjs). + +- **C. Registry Selection Latency** + - **Signal**: Slow `ping_registry` spans during startup. + - **Action**: Ensure registry ping is concurrent. Cache selected registry for subsequent installs. + +- **D. Throughput Bottleneck** + - **Signal**: Long download duration with low bytes/second. + - **Action**: Check concurrent download limits. Current limit is 40 concurrent downloads (from semaphore). + +### Tier 2: File I/O Operations (Priority: P1) + +*Focus: Clone/copy performance, directory creation, permission setting.* + +- **E. 
Clone Strategy Distribution** + - **Signal**: Time distribution across `clonefile` (macOS), `ficlone` (Linux), `hardlink`, `copy_file`. + - **Insight**: CoW cloning (clonefile/ficlone) should be near-instant. Fallback to copy indicates filesystem limitation. + - **Platforms**: + - macOS: Native `clonefile()` syscall for instant CoW cloning + - Linux: FICLONE ioctl (reflink), `copy_file_range()` fallback + - Windows: Regular async copy + +- **F. Directory Creation Overhead** + - **Signal**: Many `create_dir` spans with cumulative high duration. + - **Action**: Check if directory creation is deduplicated (DashSet cache should prevent duplicates). + +- **G. Permission Setting Overhead (Unix)** + - **Signal**: Significant time in `set_permissions` spans. + - **Action**: Batch permission setting or use async operations. + +- **H. Hardlink vs Copy Decision** + - **Signal**: Packages with install scripts using `copy_file` instead of `hardlink`. + - **Insight**: Hardlinks are used for packages without install scripts. Packages with scripts require full copies. + +### Tier 3: Decompression Pipeline (Priority: P2) + +*Focus: Gzip decode time, tar extraction throughput, streaming pipeline efficiency.* + +- **I. Gzip Decode Throughput** + - **Signal**: Long `gzip_decode` spans relative to download size. + - **Insight**: Gzip decoding is CPU-bound. Large packages may benefit from parallel decompression. + - **Action**: Check if decompression is happening in a blocking context. + +- **J. Tar Extraction Efficiency** + - **Signal**: Long `tar_extract` spans with many small files. + - **Insight**: Tar extraction is I/O-bound. Many small files cause syscall overhead. + - **Action**: Consider batching file writes (current batch: 100 files or 50MB). + +- **K. Streaming Pipeline Efficiency** + - **Signal**: Gaps between `gzip_decode` and `file_write_batch` spans. + - **Insight**: Two-stage pipeline (extract -> write) should overlap. 
+ - **Action**: Check channel capacity (current: 500 entries). Increase if producer outpaces consumer. + +### Tier 4: Concurrency & Batching (Priority: P3) + +*Focus: Semaphore utilization, batching efficiency.* + +- **L. Semaphore Wait Time** + - **Signal**: Long waits for `semaphore_acquire` spans. + - **Insight**: Indicates concurrency bottleneck. + - **Config**: + - Download semaphore: 40 concurrent downloads + - File write semaphore: 16 concurrent writes + - **Action**: Adjust semaphore limits based on system capabilities. + +- **M. Batch Size Efficiency** + - **Signal**: Many small batches processed sequentially. + - **Current Config**: 100 files or 50MB per batch. + - **Action**: Tune batch size based on workload characteristics. + +--- + +## Step 3: Actionable Diagnostic Workflow + +1. **Quantitative Baseline**: Run the summary script with the **`TRACE_PROJECT`** environment variable. + - *Command*: `TRACE_PROJECT=examples/with-antd python3 agents/tools/analyze_pm_trace.py ` + +2. **Qualitative Timeline Scan**: Open the trace in `chrome://tracing` or `edge://tracing`. Look for: + - Network gaps (idle time between downloads) + - I/O bottlenecks (long sequential file operations) + - Pipeline stalls (gaps in streaming decode -> write) + +3. **Causal Attribution**: Identify the parent span of top bottlenecks to understand *why* they were invoked. + +4. **Final Reporting**: Summarize findings and save the report to `./agents/reports/utoopm_performance_report_YYYYMMDD_HHMMSS.md`. Include specific tiered signals and recommended actions. + +--- + +## Step 4: Cache Analysis + +**Goal**: Ensure warm installs are significantly faster than cold installs. + +- **Manifest Cache**: Check if package metadata is cached (`~/.cache/nm/manifests`). +- **Tarball Cache**: Check if downloaded packages are cached (`~/.cache/nm/tarballs`). +- **Resolved Marker**: Each extracted package has `_resolved` marker to skip re-extraction. 
+ +**Red Flags**: +- Warm install time close to cold install (cache not effective) +- Re-downloading cached packages (cache invalidation issue) + +--- + +## Step 5: Optimization Playbook + +1. **Network Optimization** + - Use concurrent registry pings for faster selection + - Increase download concurrency if bandwidth allows + - Consider HTTP/2 multiplexing for registry requests + +2. **I/O Optimization** + - Prefer CoW cloning (clonefile/ficlone) over copy + - Use hardlinks for packages without install scripts + - Batch directory creation to reduce syscall overhead + +3. **Decompression Optimization** + - Increase streaming buffer size for large packages + - Consider parallel decompression for multi-core utilization + - Tune batch size for optimal memory/throughput tradeoff + +4. **Concurrency Tuning** + - Profile semaphore wait times + - Adjust limits based on system resources (CPU cores, disk IOPS, network bandwidth) + +--- + +## Resource Mapping + +| Operation | File | Key Span Names | +|-----------|------|----------------| +| Download | `util/downloader.rs` | `download`, `http_request` | +| Decompress | `util/downloader.rs` | `gzip_decode`, `tar_extract`, `unpack_stream` | +| Clone/Copy | `util/cloner.rs` | `clone_package`, `clonefile`, `ficlone`, `copy_file` | +| Registry | `util/registry.rs` | `ping_registry`, `select_registry` | +| Install | `service/install.rs` | `install_packages`, `resolve_package` | + +--- + +## Key Metrics to Track + +| Metric | Good | Warning | Critical | +|--------|------|---------|----------| +| Cold install time | < 30s | 30-60s | > 60s | +| Warm install time | < 5s | 5-15s | > 15s | +| Download throughput | > 10 MB/s | 5-10 MB/s | < 5 MB/s | +| Clone success rate | > 95% | 80-95% | < 80% | +| Retry rate | < 1% | 1-5% | > 5% | + +--- + +*Protocol Version: 1.0* +*Last Updated: 2026-02* diff --git a/crates/pm/Cargo.toml b/crates/pm/Cargo.toml index 90874cac0..2c53d5dd1 100644 --- a/crates/pm/Cargo.toml +++ b/crates/pm/Cargo.toml @@ 
-49,6 +49,7 @@ tokio-util = { version = "0.7", features = ["io"] } toml = { workspace = true } tracing = { workspace = true } tracing-appender = "0.2" +tracing-chrome = { version = "0.7.2", optional = true } tracing-subscriber = { workspace = true } utoo-ruborist = { path = "../ruborist" } @@ -61,3 +62,7 @@ openssl = { version = "0.10", features = ["vendored"] } [dev-dependencies] mockito = "1.7.0" + +[features] +default = [] +tracing-chrome = ["dep:tracing-chrome"] diff --git a/crates/pm/src/cmd/deps.rs b/crates/pm/src/cmd/deps.rs index 3de5a3d01..6d5a2cbd1 100644 --- a/crates/pm/src/cmd/deps.rs +++ b/crates/pm/src/cmd/deps.rs @@ -1,5 +1,6 @@ use anyhow::{Context as _, Result}; use std::path::Path; +use tracing::instrument; use utoo_ruborist::lock::PackageLock; use utoo_ruborist::service::build_deps as ruborist_build_deps; @@ -8,6 +9,8 @@ use crate::helper::lock::save_package_lock; use crate::service::workspace::WorkspaceService; use crate::util::logger::{finish_progress_bar, start_progress_bar}; +/// Build dependency tree using ruborist +#[instrument(name = "build_deps", skip_all)] pub async fn build_deps(cwd: &Path) -> Result { start_progress_bar(); diff --git a/crates/pm/src/helper/lock.rs b/crates/pm/src/helper/lock.rs index f42f2f74d..a5ff3b4ed 100644 --- a/crates/pm/src/helper/lock.rs +++ b/crates/pm/src/helper/lock.rs @@ -2,6 +2,7 @@ use anyhow::{Context as _, Result, anyhow}; use serde_json::Value; use std::collections::HashMap; use std::path::{Path, PathBuf}; +use tracing::instrument; use super::fs::Context; use crate::helper::workspace::find_workspaces; @@ -74,6 +75,7 @@ fn deps_map_equals_lock(pkg_deps: &HashMap, lock_field: Option<& *pkg_deps == lock_deps } +#[instrument(name = "ensure_package_lock", skip_all)] pub async fn ensure_package_lock(root_path: &Path) -> Result { // Check package.json exists in project directory if crate::fs::metadata(root_path.join("package.json")) @@ -411,6 +413,7 @@ pub async fn is_pkg_lock_outdated(root_path: &Path) -> 
Result { /// Build dependencies with tgz download. /// Used by `utoo install` command for faster installation. +#[instrument(name = "build_deps_with_download", skip_all)] async fn build_deps_with_download(cwd: &Path) -> Result { let options = Context::build_deps_options(cwd.to_path_buf()).await; ruborist_build_deps(options).await diff --git a/crates/pm/src/service/install.rs b/crates/pm/src/service/install.rs index 8588940d9..9db63f6da 100644 --- a/crates/pm/src/service/install.rs +++ b/crates/pm/src/service/install.rs @@ -7,6 +7,7 @@ use std::path::Path; use std::pin::Pin; use std::sync::Arc; use tokio::sync::Semaphore; +use tracing::instrument; use crate::helper::global_bin::get_global_bin_dir; use crate::helper::lock::{ @@ -238,6 +239,7 @@ async fn clean_unused_packages( Ok(()) } +#[instrument(name = "clean_deps", skip_all)] async fn clean_deps(groups: &HashMap>, cwd: &Path) -> Result<()> { let mut valid_packages = std::collections::HashSet::new(); for packages in groups.values() { @@ -270,6 +272,8 @@ async fn clean_deps(groups: &HashMap>, cwd: &Path) Ok(()) } +/// Install packages by dependency depth levels +#[instrument(name = "install_packages", skip_all, fields(depth_levels = groups.len()))] pub async fn install_packages( groups: &HashMap>, cache_dir: &Path, @@ -482,6 +486,7 @@ impl InstallService { Ok(()) } + #[instrument(name = "install", skip_all)] pub async fn install(ignore_scripts: bool, root_path: &Path) -> Result<()> { // Get PackageLock directly, avoiding redundant disk read/parse operations let package_lock = ensure_package_lock(root_path).await?; diff --git a/crates/pm/src/service/rebuild.rs b/crates/pm/src/service/rebuild.rs index fe217af60..a546fa9bc 100644 --- a/crates/pm/src/service/rebuild.rs +++ b/crates/pm/src/service/rebuild.rs @@ -1,6 +1,7 @@ use crate::service::package::PackageService; use anyhow::Result; use std::path::Path; +use tracing::instrument; use utoo_ruborist::lock::PackageLock; pub struct RebuildService; @@ -12,6 +13,7 @@ impl 
RebuildService { /// * `package_lock` - Package lock information in memory /// * `root_path` - Project root path /// * `bins_only` - Whether to only process binary file linking, skipping script execution + #[instrument(name = "rebuild", skip_all)] pub async fn rebuild( package_lock: &PackageLock, root_path: &Path, diff --git a/crates/pm/src/util/cloner.rs b/crates/pm/src/util/cloner.rs index cc44e7e5d..b7e62fc72 100644 --- a/crates/pm/src/util/cloner.rs +++ b/crates/pm/src/util/cloner.rs @@ -1,5 +1,6 @@ use anyhow::{Context, Result}; use std::path::{Path, PathBuf}; +use tracing::{Instrument, instrument}; use super::json::load_package_json_from_path; #[cfg(any(target_os = "macos", target_os = "linux"))] @@ -348,6 +349,8 @@ pub async fn find_real_src>(src: P) -> Option { None } +/// Internal clone implementation with platform-specific optimizations +#[instrument(name = "clone_dir", skip_all)] async fn clone(src: &Path, dst: &Path, find_real: bool) -> Result<()> { let real_src = if find_real { find_real_src(src) @@ -388,36 +391,46 @@ async fn clone(src: &Path, dst: &Path, find_real: bool) -> Result<()> { let src_c = CString::new(real_src.as_os_str().as_bytes())?; let dst_c = CString::new(dst.as_os_str().as_bytes())?; - Retry::spawn(create_retry_strategy(), || async { - match unsafe { clonefile(src_c.as_ptr(), dst_c.as_ptr(), 0) } { - 0 => { - tracing::debug!("clone {} to {} success", real_src.display(), dst.display()); - Ok(()) - } - _ => { - let _ = fs::remove_dir_all(dst).await.map_err(|e| { + Retry::spawn(create_retry_strategy(), || { + async { + match unsafe { clonefile(src_c.as_ptr(), dst_c.as_ptr(), 0) } { + 0 => { tracing::debug!( - "Failed to clean target directory {}: {}", - dst.display(), - e + "clone {} to {} success", + real_src.display(), + dst.display() ); - }); - Err(anyhow::anyhow!( - "Failed to clone file: {}", - std::io::Error::last_os_error() - )) + Ok(()) + } + _ => { + let _ = fs::remove_dir_all(dst).await.map_err(|e| { + tracing::debug!( + 
"Failed to clean target directory {}: {}", + dst.display(), + e + ); + }); + Err(anyhow::anyhow!( + "Failed to clone file: {}", + std::io::Error::last_os_error() + )) + } } } + .instrument(tracing::trace_span!("clonefile")) }) .await?; } #[cfg(target_os = "linux")] { - Retry::spawn(create_retry_strategy(), || async { - linux_clone::clone_dir(&real_src, dst).await?; - tracing::debug!("clone {} to {} success", real_src.display(), dst.display()); - Ok::<(), anyhow::Error>(()) + Retry::spawn(create_retry_strategy(), || { + async { + linux_clone::clone_dir(&real_src, dst).await?; + tracing::debug!("clone {} to {} success", real_src.display(), dst.display()); + Ok::<(), anyhow::Error>(()) + } + .instrument(tracing::trace_span!("linux_clone")) }) .await?; } @@ -427,10 +440,13 @@ async fn clone(src: &Path, dst: &Path, find_real: bool) -> Result<()> { use super::retry::create_retry_strategy; use tokio_retry::Retry; - Retry::spawn(create_retry_strategy(), || async { - windows_clone::clone_dir(&real_src, dst).await?; - tracing::debug!("clone {} to {} success", real_src.display(), dst.display()); - Ok::<(), anyhow::Error>(()) + Retry::spawn(create_retry_strategy(), || { + async { + windows_clone::clone_dir(&real_src, dst).await?; + tracing::debug!("clone {} to {} success", real_src.display(), dst.display()); + Ok::<(), anyhow::Error>(()) + } + .instrument(tracing::trace_span!("windows_copy")) }) .await?; } @@ -455,6 +471,7 @@ async fn validate_name_version(dst: &Path, name: &str, version: &str) -> bool { } /// Clone a package from cache to destination with name/version validation +#[instrument(name = "clone_package", skip_all, fields(pkg = %format!("{}@{}", name, version)))] pub async fn clone_package(src: &Path, dst: &Path, name: &str, version: &str) -> Result<()> { match crate::fs::try_exists(dst).await? 
{ true if validate_name_version(dst, name, version).await => { diff --git a/crates/pm/src/util/downloader.rs b/crates/pm/src/util/downloader.rs index 8202833c2..188f5579e 100644 --- a/crates/pm/src/util/downloader.rs +++ b/crates/pm/src/util/downloader.rs @@ -11,6 +11,7 @@ use tokio::{fs::File, io::AsyncReadExt}; use tokio_retry::RetryIf; use tokio_tar::Archive; use tokio_util::io::StreamReader; +use tracing::{Instrument, instrument}; use super::retry::build_dns_cached_client; use super::retry::{RetryableError, create_retry_strategy}; @@ -33,6 +34,9 @@ fn lock_key(url: &str, dest: &Path) -> u64 { hasher.finish() } +/// Download and extract a tarball from URL to destination directory. +/// Uses streaming decompression pipeline for memory efficiency. +#[instrument(name = "download", skip_all, fields(url = %url))] pub async fn download(url: &str, dest: &Path) -> Result<()> { let start = std::time::Instant::now(); let key = lock_key(url, dest); @@ -50,35 +54,39 @@ pub async fn download(url: &str, dest: &Path) -> Result<()> { RetryIf::spawn( create_retry_strategy(), - || async { - let response = DOWNLOADER_CLIENT - .get(url) - .send() - .await - .with_context(|| format!("Failed to send HTTP request to {url}")) - .map_err(|e| RetryableError::Temporary(format!("Network error: {e}")))?; - - match response.status() { - StatusCode::OK => { - if let Err(e) = try_unpack_stream_direct(response, dest).await { - tracing::debug!("Stream unpacking failed {}: {:#}", dest.display(), e); - return Err(RetryableError::Temporary(format!( - "Network error during streaming: {e:#}" - ))); + || { + let http_span = tracing::trace_span!("http_request", url = %url); + async { + let response = DOWNLOADER_CLIENT + .get(url) + .send() + .await + .with_context(|| format!("Failed to send HTTP request to {url}")) + .map_err(|e| RetryableError::Temporary(format!("Network error: {e}")))?; + + match response.status() { + StatusCode::OK => { + if let Err(e) = try_unpack_stream_direct(response, 
dest).await { + tracing::debug!("Stream unpacking failed {}: {:#}", dest.display(), e); + return Err(RetryableError::Temporary(format!( + "Network error during streaming: {e:#}" + ))); + } + Ok(()) + } + StatusCode::NOT_FOUND => { + tracing::debug!("URL not found {url}"); + Err(RetryableError::Permanent(format!("URL not found {url}"))) + } + status => { + tracing::debug!("Error: {status}, url: {url}, retrying"); + Err(RetryableError::Temporary(format!( + "HTTP error: {status}, url: {url}" + ))) } - Ok(()) - } - StatusCode::NOT_FOUND => { - tracing::debug!("URL not found {url}"); - Err(RetryableError::Permanent(format!("URL not found {url}"))) - } - status => { - tracing::debug!("Error: {status}, url: {url}, retrying"); - Err(RetryableError::Temporary(format!( - "HTTP error: {status}, url: {url}" - ))) } } + .instrument(http_span) }, |e: &RetryableError| matches!(e, RetryableError::Temporary(_)), ) @@ -90,7 +98,9 @@ pub async fn download(url: &str, dest: &Path) -> Result<()> { Ok(()) } -// Stream-based unpacking directly from HTTP Response +/// Stream-based unpacking directly from HTTP Response. +/// Uses a two-stage pipeline: gzip decode + tar extract -> concurrent file write. 
+#[instrument(name = "unpack_stream", skip_all)] async fn try_unpack_stream_direct(response: Response, dest: &Path) -> Result<()> { use std::sync::Arc; use tokio::sync::{Semaphore, mpsc}; @@ -110,135 +120,148 @@ async fn try_unpack_stream_direct(response: Response, dest: &Path) -> Result<()> let dest = dest.to_path_buf(); - // Stage 1: Streaming tar extraction + // Stage 1: Streaming tar extraction (gzip decode + tar extract) let extraction_task = { let entry_tx = entry_tx.clone(); let dest = dest.clone(); - tokio::spawn(async move { - // Create streaming gzip decoder - let gzip_decoder = GzipDecoder::new(stream_reader); - let mut tar_archive = Archive::new(gzip_decoder); - let mut entries = tar_archive.entries()?; - - while let Some(entry_result) = entries.next().await { - let mut entry = entry_result.with_context(|| "Failed to read tar entry")?; - let path = entry - .path() - .with_context(|| "Failed to get entry path")? - .into_owned(); - let full_path = dest.join(&path); - let is_dir = entry.header().entry_type().is_dir(); - - // Only process files, skip directories (they'll be created when writing files) - if !is_dir { - // Stream file content - let mut content = Vec::new(); - entry - .read_to_end(&mut content) - .await - .with_context(|| format!("Failed to read tar entry: {}", path.display()))?; - - // Extract file permission mode - let mode = entry.header().mode().unwrap_or(0o644); - - let size = content.len(); - let extracted_entry = ExtractedEntry { - path: full_path, - content, - size, - mode, - }; - - if entry_tx.send(extracted_entry).await.is_err() { - break; + tokio::spawn( + async move { + // Create streaming gzip decoder + let gzip_decoder = GzipDecoder::new(stream_reader); + let mut tar_archive = Archive::new(gzip_decoder); + let mut entries = tar_archive.entries()?; + let mut file_count = 0u32; + + while let Some(entry_result) = entries.next().await { + let mut entry = entry_result.with_context(|| "Failed to read tar entry")?; + let path = entry + 
.path() + .with_context(|| "Failed to get entry path")? + .into_owned(); + let full_path = dest.join(&path); + let is_dir = entry.header().entry_type().is_dir(); + + // Only process files, skip directories (they'll be created when writing files) + if !is_dir { + // Stream file content + let mut content = Vec::new(); + entry.read_to_end(&mut content).await.with_context(|| { + format!("Failed to read tar entry: {}", path.display()) + })?; + + // Extract file permission mode + let mode = entry.header().mode().unwrap_or(0o644); + + let size = content.len(); + let extracted_entry = ExtractedEntry { + path: full_path, + content, + size, + mode, + }; + + if entry_tx.send(extracted_entry).await.is_err() { + break; + } + file_count += 1; } } - } - Ok::<(), anyhow::Error>(()) - }) + tracing::trace!(file_count, "tar extraction completed"); + Ok::<(), anyhow::Error>(()) + } + .instrument(tracing::trace_span!("tar_extract")), + ) }; // Stage 2: Concurrent file writing with cached directory creation let file_writing_task = { - tokio::spawn(async move { - use dashmap::DashSet; - - let semaphore = Arc::new(Semaphore::new(16)); - let created_dirs = Arc::new(DashSet::::new()); - let mut write_tasks = Vec::new(); - let mut batch_size = 0; - let mut total_bytes = 0; - const MAX_BATCH_SIZE: usize = 100; - const MAX_BATCH_BYTES: usize = 50 * 1024 * 1024; // 50MB - - while let Some(entry) = entry_rx.recv().await { - let semaphore = Arc::clone(&semaphore); - let created_dirs = Arc::clone(&created_dirs); - batch_size += 1; - total_bytes += entry.size; - - let task = tokio::spawn(async move { - let _permit = semaphore.acquire().await.unwrap(); - - // Ensure parent directory exists using cache - if let Some(parent) = entry.path.parent() { - let parent_path = parent.to_path_buf(); - - // Check cache first to avoid duplicate directory creation - if !created_dirs.contains(&parent_path) { - if let Err(e) = crate::fs::create_dir_all(&parent_path).await { - tracing::debug!( - "Failed to create 
parent dir {}: {}", - parent_path.display(), - e - ); - return Err(anyhow::anyhow!( - "Failed to create parent directory: {e}" - ) - .context(format!("Parent directory: {}", parent_path.display()))); + tokio::spawn( + async move { + use dashmap::DashSet; + + let semaphore = Arc::new(Semaphore::new(16)); + let created_dirs = Arc::new(DashSet::::new()); + let mut write_tasks = Vec::new(); + let mut batch_size = 0; + let mut total_bytes: usize = 0; + let mut total_files: u32 = 0; + const MAX_BATCH_SIZE: usize = 100; + const MAX_BATCH_BYTES: usize = 50 * 1024 * 1024; // 50MB + + while let Some(entry) = entry_rx.recv().await { + let semaphore = Arc::clone(&semaphore); + let created_dirs = Arc::clone(&created_dirs); + batch_size += 1; + total_bytes += entry.size; + total_files += 1; + + let task = tokio::spawn(async move { + let _permit = semaphore.acquire().await.unwrap(); + + // Ensure parent directory exists using cache + if let Some(parent) = entry.path.parent() { + let parent_path = parent.to_path_buf(); + + // Check cache first to avoid duplicate directory creation + if !created_dirs.contains(&parent_path) { + if let Err(e) = crate::fs::create_dir_all(&parent_path).await { + tracing::debug!( + "Failed to create parent dir {}: {}", + parent_path.display(), + e + ); + return Err(anyhow::anyhow!( + "Failed to create parent directory: {e}" + ) + .context(format!( + "Parent directory: {}", + parent_path.display() + ))); + } + + created_dirs.insert(parent_path); } - - created_dirs.insert(parent_path); } - } - // Write file content - if let Err(e) = crate::fs::write(&entry.path, &entry.content).await { - tracing::debug!("Failed to write file {}: {}", entry.path.display(), e); - return Err(anyhow::anyhow!("Write failed: {e}") - .context(format!("File path: {}", entry.path.display()))); - } + // Write file content + if let Err(e) = crate::fs::write(&entry.path, &entry.content).await { + tracing::debug!("Failed to write file {}: {}", entry.path.display(), e); + return 
Err(anyhow::anyhow!("Write failed: {e}") + .context(format!("File path: {}", entry.path.display()))); + } - // Set original file permissions from tar entry (Unix only) - set_file_permissions(&entry.path, entry.mode).await?; + // Set original file permissions from tar entry (Unix only) + set_file_permissions(&entry.path, entry.mode).await?; - Ok::<(), anyhow::Error>(()) - }); + Ok::<(), anyhow::Error>(()) + }); - write_tasks.push(task); + write_tasks.push(task); - // Process in batches to manage memory and concurrency - if batch_size >= MAX_BATCH_SIZE - || total_bytes >= MAX_BATCH_BYTES - || entry_rx.is_empty() - { - for task in write_tasks.drain(..) { - task.await??; + // Process in batches to manage memory and concurrency + if batch_size >= MAX_BATCH_SIZE + || total_bytes >= MAX_BATCH_BYTES + || entry_rx.is_empty() + { + for task in write_tasks.drain(..) { + task.await??; + } + batch_size = 0; + // Reset byte counter too — without this, once 50MB cumulative is passed + // the flush condition is always true and batching degrades to per-entry awaits + total_bytes = 0; } - batch_size = 0; - total_bytes = 0; - } } - // Wait for remaining tasks - for task in write_tasks { - task.await??; - } + // Wait for remaining tasks + for task in write_tasks { + task.await??; + } - Ok::<(), anyhow::Error>(()) - }) + tracing::trace!(total_files, "file writing completed"); + Ok::<(), anyhow::Error>(()) + } + .instrument(tracing::trace_span!("file_write_batch")), + ) }; // Close sender channel diff --git a/crates/pm/src/util/linker.rs b/crates/pm/src/util/linker.rs index 792f442f9..e6cc200aa 100644 --- a/crates/pm/src/util/linker.rs +++ b/crates/pm/src/util/linker.rs @@ -1,6 +1,7 @@ use crate::fs; use anyhow::{Context, Result}; use std::path::{Path, PathBuf}; +use tracing::instrument; /// Convert a path to absolute path fn to_absolute(path: &Path) -> Result { @@ -12,6 +13,7 @@ fn to_absolute(path: &Path) -> Result { } } +#[instrument(name = "link", skip_all)] pub async fn link(src: &Path, dst: &Path) -> Result<()> { // Convert to absolute paths let abs_src = to_absolute(src)?; diff --git a/crates/pm/src/util/logger.rs
b/crates/pm/src/util/logger.rs index d17cc63c0..f424a1d39 100644 --- a/crates/pm/src/util/logger.rs +++ b/crates/pm/src/util/logger.rs @@ -12,6 +12,20 @@ use tracing_subscriber::{ EnvFilter, Layer, Registry, fmt, layer::SubscriberExt, util::SubscriberInitExt, }; +#[cfg(feature = "tracing-chrome")] +use tracing_chrome::FlushGuard as ChromeFlushGuard; + +/// Unified guard for different tracing backends. +/// The inner guards are kept alive via RAII pattern to ensure proper flushing. +#[allow(dead_code)] +pub enum TracingGuard { + /// Standard file logging guard + File(WorkerGuard), + /// Chrome trace format guard (for performance analysis) + #[cfg(feature = "tracing-chrome")] + Chrome(ChromeFlushGuard), +} + pub static PROGRESS_BAR: Lazy = Lazy::new(|| { let pb = ProgressBar::new(0).with_style( ProgressStyle::with_template("{spinner:.blue} +{pos:.green} ~{len:.magenta} {wide_msg}") @@ -27,7 +41,61 @@ static LOG_FILE_PATH: OnceCell = OnceCell::new(); /// Initialize tracing subscriber with console and file output /// Returns (log_path, guard) - the guard must be kept alive for the duration of the program -pub fn init_tracing(verbose: bool) -> Result<(PathBuf, WorkerGuard)> { +/// +/// If `TRACING_CHROME` environment variable is set (requires `tracing-chrome` feature), +/// outputs Chrome Trace format for performance analysis. +/// The trace can be viewed in chrome://tracing. 
+/// + +/// Example: `TRACING_CHROME=./trace.json utoo install` +pub fn init_tracing(verbose: bool) -> Result<(PathBuf, TracingGuard)> { + // Check for Chrome Trace mode (for performance analysis) + #[cfg(feature = "tracing-chrome")] + if let Ok(chrome_file) = env::var("TRACING_CHROME") { + return init_chrome_tracing(&chrome_file); + } + + // Standard tracing mode + init_standard_tracing(verbose) +} + +/// Initialize Chrome Trace format output for performance analysis +#[cfg(feature = "tracing-chrome")] +fn init_chrome_tracing(chrome_file: &str) -> Result<(PathBuf, TracingGuard)> { + use tracing_chrome::ChromeLayerBuilder; + + let mut builder = ChromeLayerBuilder::new().include_args(true); + + // If the value is not "1" or "true", treat it as a file path + let trace_path = if chrome_file != "1" && chrome_file != "true" { + let path = PathBuf::from(chrome_file); + // Ensure parent directory exists + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent)?; + } + builder = builder.file(&path); + path + } else { + // Default to temp directory + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + let path = env::temp_dir().join(format!("utoo-trace-{timestamp}.json")); + // Must tell the builder about the default path too — otherwise + // tracing-chrome writes ./trace-{ts}.json in CWD while we return + // a temp path that never exists + builder = builder.file(&path); + path + }; + + let (chrome_layer, chrome_guard) = builder.build(); + + // Use trace level for Chrome output to capture all spans + Registry::default() + .with(chrome_layer) + .with(EnvFilter::new("utoo=trace")) + .init(); + + Ok((trace_path, TracingGuard::Chrome(chrome_guard))) +} + +/// Initialize standard file and console tracing +fn init_standard_tracing(verbose: bool) -> Result<(PathBuf, TracingGuard)> { + // 1.
Build environment filters // Note: Binary name is "utoo", so module paths start with "utoo::" not "utoo_pm::" @@ -78,7 +146,7 @@ pub fn init_tracing(verbose: bool) -> Result<(PathBuf, WorkerGuard)> { ) .init(); - Ok((log_path, guard)) + Ok((log_path, TracingGuard::File(guard))) } /// Get the path to the current log file diff --git a/crates/pm/src/util/registry.rs b/crates/pm/src/util/registry.rs index 682fe8ae5..a04bfdfdf 100644 --- a/crates/pm/src/util/registry.rs +++ b/crates/pm/src/util/registry.rs @@ -3,6 +3,7 @@ //! Provides functions for selecting the fastest npm registry by ping latency. use colored::Colorize; +use tracing::instrument; /// Default registries for auto-selection pub const REGISTRY_NPMMIRROR: &str = "https://registry.npmmirror.com"; @@ -19,6 +20,7 @@ struct PingResult { } /// Ping a registry and measure latency +#[instrument(name = "ping_registry", skip(client), fields(registry = %registry_url))] async fn ping_registry(client: &reqwest::Client, registry_url: &str) -> PingResult { let ping_url = format!("{}/-/ping", registry_url); let start = std::time::Instant::now(); @@ -38,6 +40,7 @@ async fn ping_registry(client: &reqwest::Client, registry_url: &str) -> PingResu } /// Select fastest registry by concurrent ping +#[instrument(name = "select_registry")] pub async fn select_fastest_registry() -> String { let client = reqwest::Client::builder() .timeout(std::time::Duration::from_millis(PING_TIMEOUT_MS))