-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathembeddings.py
More file actions
306 lines (245 loc) · 9.62 KB
/
embeddings.py
File metadata and controls
306 lines (245 loc) · 9.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
"""High-dimensional embedding generation with optional hyperbolic projection."""
import numpy as np
from typing import List, Dict, Any, Optional, Union
from dataclasses import dataclass
import torch
from tqdm import tqdm
@dataclass
class EmbeddedChunk:
    """A text chunk paired with its embedding vector(s).

    Equality is implemented by hand: the comparison that ``@dataclass``
    would generate evaluates ``ndarray == ndarray`` in a boolean context,
    which raises ``ValueError`` for multi-element arrays. Here arrays are
    compared element-wise with ``np.array_equal`` instead. Instances remain
    unhashable, exactly as with the generated ``__eq__``.
    """

    chunk_id: str
    content: str
    embedding: np.ndarray
    metadata: Dict[str, Any]
    # Optional hyperbolic embedding; None when no projection is attached.
    hyperbolic_embedding: Optional[np.ndarray] = None

    @staticmethod
    def _same_array(left: Optional[np.ndarray], right: Optional[np.ndarray]) -> bool:
        """Return True when both values are the same object or equal arrays."""
        if left is right:
            # Covers "both None" and keeps the identity shortcut that tuple
            # comparison applied before.
            return True
        if left is None or right is None:
            return False
        return bool(np.array_equal(left, right))

    def __eq__(self, other: object) -> bool:
        """Compare field by field, using element-wise equality for arrays."""
        if other.__class__ is not self.__class__:
            return NotImplemented
        return bool(
            self.chunk_id == other.chunk_id
            and self.content == other.content
            and self._same_array(self.embedding, other.embedding)
            and self.metadata == other.metadata
            and self._same_array(
                self.hyperbolic_embedding, other.hyperbolic_embedding
            )
        )
class EmbeddingEngine:
    """
    High-dimensional embedding engine using Sentence Transformers.

    Supports:
    - Dense semantic embeddings (BGE, E5, etc.)
    - Optional hyperbolic projection for hierarchical data
    - Batch processing for 800+ page books

    The SentenceTransformer model is created lazily, the first time the
    ``model`` property is read.
    """

    def __init__(
        self,
        model_name: str = "BAAI/bge-large-en-v1.5",
        device: str = "cuda",
        normalize: bool = True,
        use_hyperbolic: bool = False
    ):
        """
        Store the configuration; no model is loaded here.

        Args:
            model_name: Name passed to ``SentenceTransformer``.
            device: Requested device. A CUDA request falls back to "cpu"
                when CUDA is unavailable; any other device is kept as given.
            normalize: Forwarded as ``normalize_embeddings`` when encoding.
            use_hyperbolic: Enables ``project_to_hyperbolic``.
        """
        self.model_name = model_name
        # Only downgrade CUDA requests. Previously every device (e.g. "mps")
        # was replaced by "cpu" whenever CUDA was missing.
        if str(device).startswith("cuda") and not torch.cuda.is_available():
            device = "cpu"
        self.device = device
        self.normalize = normalize
        self.use_hyperbolic = use_hyperbolic
        self._model = None
        self._hyperbolic_mapper = None

    @property
    def model(self):
        """Lazy load the embedding model."""
        if self._model is None:
            # Imported here so the dependency is only needed once a model is used.
            from sentence_transformers import SentenceTransformer
            self._model = SentenceTransformer(self.model_name, device=self.device)
            print(f"Loaded embedding model: {self.model_name}")
            print(f"Embedding dimension: {self._model.get_sentence_embedding_dimension()}")
        return self._model

    @property
    def dimension(self) -> int:
        """Get the embedding dimension (loads the model if necessary)."""
        return self.model.get_sentence_embedding_dimension()

    def embed_texts(
        self,
        texts: List[str],
        batch_size: int = 32,
        show_progress: bool = True
    ) -> np.ndarray:
        """
        Generate embeddings for a list of texts.

        Args:
            texts: List of text strings to embed
            batch_size: Batch size for processing
            show_progress: Whether to show progress bar

        Returns:
            numpy array of shape (len(texts), embedding_dim)
        """
        # Add instruction prefix for BGE models (improves retrieval).
        # NOTE(review): the upstream BGE guidance appears to recommend an
        # instruction for queries only — confirm before changing, because any
        # stored vectors depend on this exact prefix.
        if "bge" in self.model_name.lower():
            texts = [f"Represent this cybersecurity document for retrieval: {t}" for t in texts]

        embeddings = self.model.encode(
            texts,
            batch_size=batch_size,
            show_progress_bar=show_progress,
            normalize_embeddings=self.normalize,
            convert_to_numpy=True
        )
        return embeddings

    def embed_query(self, query: str) -> np.ndarray:
        """
        Generate embedding for a query (with query-specific prefix).

        Args:
            query: The search query

        Returns:
            numpy array of shape (embedding_dim,)
        """
        # BGE models use different prefixes for queries
        if "bge" in self.model_name.lower():
            query = f"Represent this question for searching relevant passages: {query}"

        embedding = self.model.encode(
            query,
            normalize_embeddings=self.normalize,
            convert_to_numpy=True
        )
        return embedding

    def project_to_hyperbolic(self, embeddings: np.ndarray) -> np.ndarray:
        """
        Project Euclidean embeddings to Poincaré ball (hyperbolic space).

        This is useful for hierarchical cybersecurity concepts where
        relationships form tree-like structures.

        Args:
            embeddings: One vector of shape (dim,) or a batch of shape
                (n, dim); the projection works along the last axis.

        Returns:
            The projected array with the same shape as the input. The input
            is returned unchanged when ``use_hyperbolic`` is False, when
            ``geoopt`` is not installed, or when the input is empty.
        """
        if not self.use_hyperbolic:
            return embeddings

        try:
            import geoopt
        except ImportError:
            print("Warning: geoopt not installed, skipping hyperbolic projection")
            return embeddings

        # Nothing to project (e.g. the result of encoding an empty list).
        if np.asarray(embeddings).size == 0:
            return embeddings

        # Create Poincaré ball manifold
        ball = geoopt.PoincareBall()

        # Convert to tensor
        emb_tensor = torch.tensor(embeddings, dtype=torch.float32)

        # Normalize each vector to unit length along the last axis, so that a
        # single 1-D embedding is handled as well as a 2-D batch.
        norms = torch.norm(emb_tensor, dim=-1, keepdim=True)
        normalized = emb_tensor / (norms + 1e-8)

        # Scale to fit in Poincaré ball (radius < 1)
        scaled = normalized * 0.9  # Leave margin from boundary

        # Apply exponential map from origin
        origin = torch.zeros(scaled.shape[-1], dtype=scaled.dtype)
        hyperbolic = ball.expmap(origin, scaled)

        return hyperbolic.numpy()
class ContextualEmbedder:
    """
    Embedder that adds contextual information to improve retrieval.

    Implements the "Contextual Breadcrumbs" strategy from the plan: structural
    context (chapter, section, topics, summary) is prepended to each chunk's
    text before it is embedded.
    """

    def __init__(self, base_engine: "EmbeddingEngine"):
        """Wrap the engine that performs the actual embedding."""
        self.engine = base_engine

    def create_contextual_text(
        self,
        content: str,
        chapter: Optional[str] = None,
        section: Optional[str] = None,
        summary: Optional[str] = None,
        categories: Optional[List[str]] = None
    ) -> str:
        """
        Create enriched text with contextual information prepended.

        This "anchors" the embedding in the semantic space by adding
        structural context that helps with retrieval.

        Args:
            content: The chunk text.
            chapter: Chapter title, if any.
            section: Section title, if any.
            summary: Short summary, if any.
            categories: Topic tags, joined with ", ".

        Returns:
            ``"[<parts joined by ' | '>]\\n\\n<content>"`` when at least one
            context value is truthy, otherwise ``content`` unchanged.
        """
        context_parts = []

        # Hierarchical breadcrumbs come first, then tags, then the summary.
        if chapter:
            context_parts.append(f"Chapter: {chapter}")
        if section:
            context_parts.append(f"Section: {section}")
        if categories:
            context_parts.append(f"Topics: {', '.join(categories)}")
        if summary:
            context_parts.append(f"Summary: {summary}")

        if not context_parts:
            return content

        context = " | ".join(context_parts)
        return f"[{context}]\n\n{content}"

    def embed_chunks(
        self,
        chunks: List[Any],  # List of Chunk objects
        batch_size: int = 32
    ) -> List["EmbeddedChunk"]:
        """
        Embed chunks with contextual enrichment.

        Each chunk must expose ``id``, ``content``, ``chapter``, ``section``,
        ``summary``, a ``metadata`` mapping and a ``to_dict()`` method.

        Args:
            chunks: Chunk objects to embed.
            batch_size: Batch size forwarded to the engine.

        Returns:
            One ``EmbeddedChunk`` per input chunk, in input order. An empty
            input yields an empty list without touching the engine.
        """
        # Avoid loading the model and encoding when there is nothing to embed.
        if not chunks:
            return []

        # Create contextual texts
        texts = [
            self.create_contextual_text(
                content=chunk.content,
                chapter=chunk.chapter,
                section=chunk.section,
                summary=chunk.summary,
                categories=chunk.metadata.get('categories')
            )
            for chunk in chunks
        ]

        # Generate embeddings
        embeddings = self.engine.embed_texts(texts, batch_size=batch_size)

        # Optionally project to hyperbolic space
        hyperbolic_embeddings = None
        if self.engine.use_hyperbolic:
            hyperbolic_embeddings = self.engine.project_to_hyperbolic(embeddings)

        # Create EmbeddedChunk objects; indexing (rather than zip) keeps a
        # length mismatch between chunks and embeddings loud.
        return [
            EmbeddedChunk(
                chunk_id=chunk.id,
                content=chunk.content,
                embedding=embeddings[i],
                metadata=chunk.to_dict(),
                hyperbolic_embedding=(
                    hyperbolic_embeddings[i]
                    if hyperbolic_embeddings is not None
                    else None
                )
            )
            for i, chunk in enumerate(chunks)
        ]
class VectorArithmetic:
    """
    Enables semantic vector arithmetic like: King - Man + Woman = Queen

    For cybersecurity: Zero Trust + Cloud = Cloud-Native Identity

    Concept vectors are obtained from the engine's ``embed_query`` and cached
    per concept string for the lifetime of this object.
    """

    def __init__(self, engine: "EmbeddingEngine"):
        """Keep the engine and start with an empty concept cache."""
        self.engine = engine
        self._concept_cache: Dict[str, np.ndarray] = {}

    def get_concept_vector(self, concept: str) -> np.ndarray:
        """Get or compute vector for a concept."""
        if concept not in self._concept_cache:
            self._concept_cache[concept] = self.engine.embed_query(concept)
        return self._concept_cache[concept]

    @staticmethod
    def _split_terms(expression: str) -> List[tuple]:
        """Split an expression into ``(sign, concept)`` pairs.

        ``+`` is always an operator. ``-`` is an operator only at a word
        boundary (start or end of the expression, next to whitespace, or
        directly after another operator); a ``-`` between two non-space
        characters is a hyphen and stays inside the concept ("On-Premise").
        Empty terms are skipped, so of several consecutive operators the
        last one decides the sign.
        """
        terms = []
        buffer = []
        sign = 1
        after_operator = False
        last = len(expression) - 1

        for index, char in enumerate(expression):
            if char == "+":
                is_operator = True
            elif char == "-":
                opens = index == 0 or after_operator or expression[index - 1].isspace()
                closes = index == last or expression[index + 1].isspace()
                is_operator = opens or closes
            else:
                is_operator = False

            if not is_operator:
                buffer.append(char)
                after_operator = False
                continue

            # Operator reached: flush the term collected so far.
            term = "".join(buffer).strip()
            if term:
                terms.append((sign, term))
            buffer = []
            sign = 1 if char == "+" else -1
            after_operator = True

        # Flush the final term.
        term = "".join(buffer).strip()
        if term:
            terms.append((sign, term))
        return terms

    def compute(self, expression: str) -> np.ndarray:
        """
        Compute vector arithmetic from expression.

        Example: "Zero Trust + Cloud Architecture - On-Premise"

        Hyphenated concepts are kept intact; write subtraction with
        whitespace around the minus sign.

        Args:
            expression: Concepts joined by ``+`` and ``-``.

        Returns:
            The signed sum of the concept vectors, scaled to (approximately)
            unit length. An expression without any concept yields a zero
            vector of length ``engine.dimension``.
        """
        parts = self._split_terms(expression.strip())

        # Compute result vector
        result = np.zeros(self.engine.dimension)
        for sign, concept in parts:
            result += sign * self.get_concept_vector(concept)

        # Normalize result; the epsilon avoids division by zero.
        result = result / (np.linalg.norm(result) + 1e-8)
        return result

    def analogy(self, a: str, b: str, c: str) -> np.ndarray:
        """
        Compute analogy: A is to B as C is to ?

        Example: "SQL Injection" is to "Web Application" as "Database" is to "?"
        Formula: B - A + C

        Returns:
            The vector ``B - A + C``, scaled to (approximately) unit length.
        """
        vec_a = self.get_concept_vector(a)
        vec_b = self.get_concept_vector(b)
        vec_c = self.get_concept_vector(c)

        result = vec_b - vec_a + vec_c
        result = result / (np.linalg.norm(result) + 1e-8)
        return result