# document_loader.py
"""Document loading and preprocessing for PDF/text books."""
import copy
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional

import fitz  # PyMuPDF
@dataclass
class DocumentSection:
    """A block of book text together with the heading context it sits under.

    Attributes:
        content: The text of the block.
        page_number: Page the block was found on. ``BookLoader`` numbers PDF
            pages from 1 and only estimates pages for plain-text input.
        section_title: Title of the most recent section heading, if any.
        chapter: Title of the most recent chapter heading, if any.
        hierarchy_level: Nesting depth of the block (see the field comment).
        metadata: Free-form extra information about the block; every
            instance gets its own fresh dict by default.
    """

    content: str
    page_number: int
    section_title: Optional[str] = None
    chapter: Optional[str] = None
    # 0=chapter, 1=section, 2=subsection. BookLoader assigns 2 when a section
    # heading is active, 1 when only a chapter is active, and 0 otherwise.
    hierarchy_level: int = 0
    metadata: Dict[str, Any] = field(default_factory=dict)
class BookLoader:
    """Load and preprocess books while preserving structure.

    A loader walks the text blocks of a PDF or plain-text book, remembers the
    most recent chapter/section heading it has seen, and emits one
    ``DocumentSection`` per non-heading block, tagged with that heading
    context and with cybersecurity metadata (CVE ids, severity, categories).
    Very small consecutive blocks are merged before the result is returned.

    Heading and keyword detection are heuristics driven by the class-level
    tables below; subclasses may override those tables.
    """

    # Common chapter/section patterns in technical books. Chapter patterns are
    # matched with re.IGNORECASE, so a single pattern covers "Chapter" and
    # "CHAPTER".
    CHAPTER_PATTERNS = [
        r'^Chapter\s+(\d+)[:\s]+(.+)$',
        # (?-i:...) switches case-insensitivity off for the first title
        # character, so lower-case numbered list items such as
        # "1. install the package" are not mistaken for chapter headings.
        r'^(\d+)\.\s+((?-i:[A-Z])[^.]+)$',
        r'^Part\s+(\d+|[IVX]+)[:\s]+(.+)$',
    ]
    SECTION_PATTERNS = [
        r'^(\d+\.\d+)\s+(.+)$',
        r'^(\d+\.\d+\.\d+)\s+(.+)$',
        r'^Section\s+(\d+)[:\s]+(.+)$',
    ]

    # Cybersecurity-specific patterns
    CVE_PATTERN = r'CVE-\d{4}-\d{4,}'
    # Ordered from most to least severe; the first level with a keyword hit
    # wins, and the same order ranks severities when sections are merged.
    SEVERITY_KEYWORDS = {
        'critical': ['critical', 'severe', 'emergency', 'zero-day', '0-day'],
        'high': ['high', 'important', 'significant', 'serious'],
        'medium': ['medium', 'moderate', 'notable'],
        'low': ['low', 'minor', 'informational']
    }
    # Keywords that mark a block as belonging to a technical category.
    CATEGORY_KEYWORDS = {
        'network_security': ['firewall', 'ids', 'ips', 'packet', 'tcp', 'udp', 'dns', 'vpn'],
        'web_security': ['xss', 'sql injection', 'csrf', 'owasp', 'http', 'api'],
        'cryptography': ['encryption', 'decrypt', 'hash', 'aes', 'rsa', 'certificate', 'tls', 'ssl'],
        'malware': ['malware', 'virus', 'trojan', 'ransomware', 'rootkit', 'botnet'],
        'authentication': ['authentication', 'authorization', 'password', 'mfa', '2fa', 'oauth'],
        'incident_response': ['incident', 'forensics', 'investigation', 'breach', 'response'],
        'compliance': ['compliance', 'gdpr', 'hipaa', 'pci', 'iso 27001', 'nist'],
        'cloud_security': ['cloud', 'aws', 'azure', 'gcp', 'container', 'kubernetes', 'docker'],
        'memory_safety': ['buffer overflow', 'memory corruption', 'heap', 'stack', 'use-after-free'],
    }

    # Plain-text books have no pages; assume this many paragraphs per page.
    PARAGRAPHS_PER_PAGE = 40
    # A section shorter than this many characters absorbs the next section.
    MIN_SECTION_CHARS = 200

    def __init__(self):
        """Create an empty loader and compile the detection tables."""
        self.sections: List[DocumentSection] = []
        self.current_chapter = None
        self.current_section = None

        # Compile through ``self`` so tables overridden in a subclass are
        # honoured, and so the work is done once instead of per text block.
        self._chapter_res = [re.compile(p, re.IGNORECASE) for p in self.CHAPTER_PATTERNS]
        self._section_res = [re.compile(p) for p in self.SECTION_PATTERNS]
        self._cve_re = re.compile(self.CVE_PATTERN)
        self._severity_res = {
            level: self._compile_keywords(words, whole_word=True)
            for level, words in self.SEVERITY_KEYWORDS.items()
        }
        self._category_res = {
            category: self._compile_keywords(words, whole_word=False)
            for category, words in self.CATEGORY_KEYWORDS.items()
        }

    @staticmethod
    def _compile_keywords(keywords, whole_word: bool):
        """Build one case-insensitive regex matching any of *keywords*.

        A keyword only matches at the start of a word, which stops short
        keywords from firing inside unrelated words ("rsa" in "adversary",
        "aws" in "flaws"). With ``whole_word=True`` it must also end at a word
        boundary ("high" no longer matches "highlight"); otherwise suffixes
        are allowed so inflections such as "firewalls" or "hashing" still hit.
        """
        if not keywords:
            # An empty alternation would match everywhere; match nothing.
            return re.compile(r'(?!)')
        alternation = '|'.join(re.escape(kw) for kw in keywords)
        tail = r'(?!\w)' if whole_word else ''
        return re.compile(r'(?<!\w)(?:' + alternation + ')' + tail, re.IGNORECASE)

    def _reset(self) -> None:
        """Forget sections and heading context left over from a previous load."""
        self.sections = []
        self.current_chapter = None
        self.current_section = None

    def load_pdf(self, pdf_path: str) -> "List[DocumentSection]":
        """Load PDF and extract structured sections.

        Args:
            pdf_path: Path of the PDF file to read.

        Returns:
            The extracted sections, with very small neighbours merged.

        Raises:
            FileNotFoundError: If *pdf_path* does not exist.
        """
        path = Path(pdf_path)
        if not path.exists():
            raise FileNotFoundError(f"Book not found: {pdf_path}")

        doc = fitz.open(pdf_path)
        try:
            self._reset()
            for page_num, page in enumerate(doc, start=1):
                for block in page.get_text("blocks"):
                    # Index 6 is the block type (0 = text) and index 4 holds
                    # the block's text.
                    if block[6] != 0:
                        continue
                    block_text = block[4].strip()
                    if block_text:
                        self._process_block(block_text, page_num)
        finally:
            # Release the document even if a block fails to process.
            doc.close()

        return self._merge_small_sections()

    def load_text(self, text_path: str) -> "List[DocumentSection]":
        """Load plain text file with structure detection.

        Args:
            text_path: Path of a UTF-8 text file (a leading BOM is tolerated).

        Returns:
            The extracted sections, with very small neighbours merged.

        Raises:
            FileNotFoundError: If *text_path* does not exist.
        """
        path = Path(text_path)
        if not path.exists():
            raise FileNotFoundError(f"Book not found: {text_path}")

        # 'utf-8-sig' also reads BOM-less UTF-8, and strips a BOM that would
        # otherwise stop a heading on the first line from matching '^'.
        content = path.read_text(encoding='utf-8-sig')

        self._reset()
        # Split by blank lines to get paragraphs
        for index, para in enumerate(re.split(r'\n\s*\n', content)):
            para = para.strip()
            if para:
                # Estimate pages from the paragraph position
                page = index // self.PARAGRAPHS_PER_PAGE + 1
                self._process_block(para, page_number=page)

        return self._merge_small_sections()

    def _match_heading(self, line: str) -> Optional[tuple]:
        """Classify a single line as a heading.

        Returns:
            ``('chapter', title)`` or ``('section', title)`` when *line*
            matches one of the heading patterns, otherwise ``None``. Chapter
            patterns are tried first.
        """
        for kind, regexes in (('chapter', self._chapter_res), ('section', self._section_res)):
            for regex in regexes:
                match = regex.match(line)
                if match:
                    # Patterns with a single group carry no separate title.
                    title = match.group(2) if len(match.groups()) > 1 else line
                    return kind, title
        return None

    def _process_block(self, text: str, page_number: int) -> None:
        """Process a text block and detect structure.

        Leading heading lines update the current chapter/section. Whatever
        text follows them is stored as a ``DocumentSection`` instead of being
        dropped along with the heading.
        """
        remaining = text
        # Iterate rather than recurse: a table-of-contents block can consist
        # of hundreds of consecutive heading lines.
        while remaining:
            first_line, _, rest = remaining.partition('\n')
            heading = self._match_heading(first_line.strip())
            if heading is None:
                break
            kind, title = heading
            if kind == 'chapter':
                self.current_chapter = title
                # A new chapter ends whatever section was open.
                self.current_section = None
            else:
                self.current_section = title
            remaining = rest.strip()

        if not remaining:
            return

        # Regular content
        if self.current_section:
            level = 2
        elif self.current_chapter:
            level = 1
        else:
            level = 0
        self.sections.append(DocumentSection(
            content=remaining,
            page_number=page_number,
            section_title=self.current_section,
            chapter=self.current_chapter,
            hierarchy_level=level,
            metadata=self._extract_metadata(remaining),
        ))

    def _extract_metadata(self, text: str) -> Dict[str, Any]:
        """Extract cybersecurity-specific metadata.

        Returns:
            A dict that may contain ``'cves'`` (sorted unique CVE ids),
            ``'severity'`` (the most severe level whose keyword appears as a
            whole word) and ``'categories'`` (see ``_detect_categories``).
            Keys with nothing to report are omitted.
        """
        metadata: Dict[str, Any] = {}

        cves = self._cve_re.findall(text)
        if cves:
            # Sorted so the result is deterministic across runs.
            metadata['cves'] = sorted(set(cves))

        for severity, regex in self._severity_res.items():
            if regex.search(text):
                metadata['severity'] = severity
                break

        categories = self._detect_categories(text)
        if categories:
            metadata['categories'] = categories

        return metadata

    def _detect_categories(self, text: str) -> List[str]:
        """Detect cybersecurity categories in text.

        Matching is case-insensitive and anchored at the start of a word.

        Returns:
            Category names in ``CATEGORY_KEYWORDS`` order.
        """
        return [
            category
            for category, regex in self._category_res.items()
            if regex.search(text)
        ]

    def _severity_rank(self, severity) -> int:
        """Return the position of *severity* in ``SEVERITY_KEYWORDS`` (0 = worst)."""
        order = list(self.SEVERITY_KEYWORDS)
        return order.index(severity) if severity in order else len(order)

    def _merge_metadata(self, target: Dict[str, Any], incoming: Dict[str, Any]) -> None:
        """Fold *incoming* into *target* without losing information.

        CVE ids and categories are unioned, the more severe of two severities
        wins, and any other key takes the incoming value. New lists are built
        rather than extended in place, so lists shared with other sections
        are never modified.
        """
        for key, value in incoming.items():
            if key == 'cves':
                target[key] = sorted(set(target.get(key, [])) | set(value))
            elif key == 'categories':
                target[key] = list(dict.fromkeys([*target.get(key, []), *value]))
            elif key == 'severity' and key in target:
                target[key] = min(target[key], value, key=self._severity_rank)
            else:
                target[key] = value

    def _merge_small_sections(self) -> "List[DocumentSection]":
        """Merge very small consecutive sections.

        A section shorter than ``MIN_SECTION_CHARS`` absorbs the following
        section when both share the same chapter and section title. The
        objects in ``self.sections`` are left untouched; merged results are
        shallow copies with their own metadata dict.
        """
        merged = []
        current = None

        for section in self.sections:
            if (
                current is not None
                and len(current.content) < self.MIN_SECTION_CHARS
                and current.chapter == section.chapter
                and current.section_title == section.section_title
            ):
                current.content += "\n\n" + section.content
                self._merge_metadata(current.metadata, section.metadata)
                continue

            if current is not None:
                merged.append(current)
            current = copy.copy(section)
            current.metadata = dict(section.metadata)

        if current is not None:
            merged.append(current)

        return merged
def load_book(path: str) -> List[DocumentSection]:
    """Load a book with a fresh ``BookLoader``, picking the reader by extension.

    A ``.pdf`` suffix (compared case-insensitively) is read with
    ``BookLoader.load_pdf``; every other path is read as plain text with
    ``BookLoader.load_text``.
    """
    book_loader = BookLoader()
    is_pdf = Path(path).suffix.lower() == '.pdf'
    reader = book_loader.load_pdf if is_pdf else book_loader.load_text
    return reader(path)