Files
2025-12-09 12:13:01 +01:00

376 lines
13 KiB
Python

"""
Streaming parser for large HTML documents.
"""
import io
from typing import Dict, Any, TYPE_CHECKING
from lxml import etree
from lxml.html import HtmlElement
from edgar.documents.config import ParserConfig
from edgar.documents.exceptions import HTMLParsingError, DocumentTooLargeError
# Use TYPE_CHECKING to avoid circular imports
if TYPE_CHECKING:
from edgar.documents.document import Document, DocumentMetadata
from edgar.documents.nodes import DocumentNode, HeadingNode, ParagraphNode, TextNode, SectionNode, ContainerNode
from edgar.documents.table_nodes import TableNode
from edgar.documents.types import SemanticType
class StreamingParser:
    """
    Streaming parser for large HTML documents.

    Walks the document with ``lxml.etree.iterparse`` and turns headings,
    paragraphs, tables and ``<section>`` elements into document nodes.
    lxml elements are released once they have been fully processed so the
    intermediate lxml tree stays small.
    """

    # Chunk size for streaming (1MB). Not referenced inside this class.
    CHUNK_SIZE = 1024 * 1024

    # Maximum node buffer before flush
    MAX_NODE_BUFFER = 1000

    # Heading tags that are converted to HeadingNode instances.
    _HEADING_TAGS = frozenset(('h1', 'h2', 'h3', 'h4', 'h5', 'h6'))

    # Elements whose complete subtree is read when their closing tag arrives.
    # Nothing below them may be cleared (and the node buffer may not be
    # flushed) until the outermost one has been closed.
    _RETAINED_TAGS = _HEADING_TAGS | frozenset(('p', 'table'))

    # <meta name="..."> values mapped to the metadata attribute they populate.
    _META_FIELDS = {
        'company': 'company',
        'filing-type': 'form',
        'cik': 'cik',
        'filing-date': 'filing_date',
        'accession-number': 'accession_number',
    }

    def __init__(self, config: ParserConfig, strategies: Dict[str, Any]):
        """
        Initialize streaming parser.

        Args:
            config: Parser configuration
            strategies: Parsing strategies to use, keyed by name. This class
                looks up 'header_detection', 'style_parser' and
                'table_processing'.
        """
        self.config = config
        self.strategies = strategies
        self._reset_state()

    def _reset_state(self):
        """Reset parser state so the instance can parse a new document."""
        # Import here to avoid circular import
        from edgar.documents.document import DocumentMetadata
        from edgar.documents.nodes import DocumentNode

        self.current_section = None
        self.node_buffer = []
        self.metadata = DocumentMetadata()
        self.root = DocumentNode()
        self.current_parent = self.root
        self.tag_stack = []
        self.text_buffer = []
        self.in_table = False
        self.table_buffer = []
        self.table_elem = None
        self.bytes_processed = 0
        # Number of currently open elements listed in _RETAINED_TAGS.
        self._retain_depth = 0
        # Enclosing sections of the currently open section (None = no section).
        self._section_stack = []

    def parse(self, html: str) -> "Document":
        """
        Parse HTML in streaming mode.

        Args:
            html: HTML content to parse

        Returns:
            Parsed Document

        Raises:
            DocumentTooLargeError: If document exceeds size limit
            HTMLParsingError: If parsing fails
        """
        self._reset_state()

        try:
            data = html.encode('utf-8')

            # The input is already fully in memory, so the size limit can be
            # enforced once, up front, instead of re-serialising every element
            # on every event (which is costly and counts content repeatedly).
            self.bytes_processed = len(data)
            if self.bytes_processed > self.config.max_document_size:
                raise DocumentTooLargeError(self.bytes_processed, self.config.max_document_size)

            # Create streaming parser
            events = etree.iterparse(
                io.BytesIO(data),
                events=('start', 'end'),
                html=True,
                recover=True,
                encoding='utf-8'
            )

            for event, elem in events:
                self._process_event(event, elem)

                # Flush and free memory only at a closing tag that is outside
                # every retained subtree: flushing earlier would detach a
                # heading/paragraph node before its text is filled in, and
                # clearing earlier would wipe content its ancestor still needs.
                if event == 'end' and self._retain_depth == 0:
                    if len(self.node_buffer) >= self.MAX_NODE_BUFFER:
                        self._flush_buffer()
                    self._release_element(elem)

            # Final flush
            self._flush_buffer()

            # Store original HTML in metadata for section detection (TOC analysis)
            self.metadata.original_html = html

            # Create document (import here to avoid circular import)
            from edgar.documents.document import Document
            document = Document(root=self.root, metadata=self.metadata)

            # Store config reference (required for section detection)
            document._config = self.config

            # Apply post-processing
            from edgar.documents.processors.postprocessor import DocumentPostprocessor
            postprocessor = DocumentPostprocessor(self.config)
            return postprocessor.process(document)

        except (DocumentTooLargeError, HTMLParsingError):
            # Already the documented error types; let them through untouched.
            raise
        except etree.ParseError as e:
            raise HTMLParsingError(f"Streaming parse failed: {str(e)}") from e
        except Exception as e:
            raise HTMLParsingError(f"Unexpected error during streaming parse: {str(e)}") from e

    def _release_element(self, elem: HtmlElement):
        """Clear a fully processed element and drop its preceding siblings."""
        elem.clear()
        parent = elem.getparent()
        if parent is None:
            return
        while elem.getprevious() is not None:
            del parent[0]

    def _process_event(self, event: str, elem: HtmlElement):
        """Dispatch a parse event to the start/end tag handler."""
        if event == 'start':
            self._handle_start_tag(elem)
        elif event == 'end':
            self._handle_end_tag(elem)

    def _handle_start_tag(self, elem: HtmlElement):
        """Handle opening tag."""
        # Import node types at runtime to avoid circular imports
        from edgar.documents.nodes import ContainerNode

        tag = elem.tag.lower()

        # Track tag stack
        self.tag_stack.append(tag)

        if tag in self._RETAINED_TAGS:
            self._retain_depth += 1

        # Attributes are available on the start event, so <meta> can be read
        # here. <title> needs its text and is handled on the end event.
        if tag == 'meta':
            self._extract_meta_metadata(elem)

        # Handle specific tags
        if tag == 'body':
            # Create a container for body content
            body_container = ContainerNode(tag_name='body')
            self.root.add_child(body_container)
            self.current_parent = body_container
        elif tag in self._HEADING_TAGS:
            self._start_heading(elem)
        elif tag == 'p':
            self._start_paragraph(elem)
        elif tag == 'table':
            self._start_table(elem)
        elif tag == 'section':
            self._start_section(elem)

    def _handle_end_tag(self, elem: HtmlElement):
        """Handle closing tag."""
        tag = elem.tag.lower()

        # Remove from tag stack
        if self.tag_stack and self.tag_stack[-1] == tag:
            self.tag_stack.pop()

        if tag in self._RETAINED_TAGS and self._retain_depth > 0:
            self._retain_depth -= 1

        # Handle specific tags
        if tag == 'title':
            # Element text is only guaranteed to be complete on the end event.
            if elem.text:
                self._extract_title_metadata(elem.text)
        elif tag in self._HEADING_TAGS:
            self._end_heading(elem)
        elif tag == 'p':
            self._end_paragraph(elem)
        elif tag == 'table':
            self._end_table(elem)
        elif tag == 'section':
            self._end_section(elem)
        elif tag == 'body':
            # When body ends, flush any remaining nodes
            self._flush_buffer()

        # Handle text content
        # NOTE(review): text_buffer is only ever cleared (at the end of a
        # heading/paragraph) and never read in this class, so it keeps growing
        # for text outside those elements -- confirm whether it is still needed.
        if elem.text:
            self.text_buffer.append(elem.text.strip())
        if elem.tail:
            self.text_buffer.append(elem.tail.strip())

    def _start_heading(self, elem: HtmlElement):
        """Start processing a heading: buffer a HeadingNode for it."""
        # Import node types at runtime to avoid circular imports
        from edgar.documents.nodes import HeadingNode

        # Tag is 'h1'..'h6'; the digit is the heading level.
        level = int(elem.tag[1])

        # The text may still be incomplete on the start event; it is
        # finalised, and header detection is run, in _end_heading.
        heading = HeadingNode(
            level=level,
            content=self._get_text_content(elem)
        )
        self.node_buffer.append(heading)

    def _end_heading(self, elem: HtmlElement):
        """End processing a heading: finalise its text and classify it."""
        # Import node types at runtime to avoid circular imports
        from edgar.documents.nodes import HeadingNode

        heading = self.node_buffer[-1] if self.node_buffer else None
        if isinstance(heading, HeadingNode):
            # Get text content from the now complete element
            text = self._get_text_content(elem)
            if text:
                heading.content = text

            # Check if this is a section header
            detector = self.strategies.get('header_detection')
            if detector and detector.is_section_header(heading.content, elem):
                # Imported at runtime: the module-level import of SemanticType
                # sits in the TYPE_CHECKING-only import group.
                from edgar.documents.types import SemanticType
                heading.semantic_type = SemanticType.SECTION_HEADER

        # Clear any accumulated text buffer
        self.text_buffer.clear()

    def _start_paragraph(self, elem: HtmlElement):
        """Start processing a paragraph: buffer an (initially empty) ParagraphNode."""
        # Import node types at runtime to avoid circular imports
        from edgar.documents.nodes import ParagraphNode

        para = ParagraphNode()

        # Get style if present
        style_attr = elem.get('style')
        if style_attr and self.strategies.get('style_parser'):
            style_parser = self.strategies['style_parser']
            para.style = style_parser.parse(style_attr)

        self.node_buffer.append(para)

    def _end_paragraph(self, elem: HtmlElement):
        """End processing a paragraph: attach its text as a TextNode child."""
        # Import node types at runtime to avoid circular imports
        from edgar.documents.nodes import ParagraphNode, TextNode

        # Get text content from element
        text = self._get_text_content(elem)
        if text and self.node_buffer and isinstance(self.node_buffer[-1], ParagraphNode):
            self.node_buffer[-1].add_child(TextNode(content=text))

        # Clear any accumulated text buffer
        self.text_buffer.clear()

    def _start_table(self, elem: HtmlElement):
        """Start processing a table."""
        # NOTE(review): in_table is a plain flag, so the end of a nested table
        # resets it while the outer table is still open.
        self.in_table = True
        self.table_buffer = []
        # Store table element for later processing
        self.table_elem = elem

    def _end_table(self, elem: HtmlElement):
        """End processing a table: buffer a node built from the complete element."""
        # Import node types at runtime to avoid circular imports
        from edgar.documents.table_nodes import TableNode

        self.in_table = False

        # Process table with table processor if available
        processor = self.strategies.get('table_processing')
        if processor:
            table_node = processor.process(elem)
            if table_node:
                self.node_buffer.append(table_node)
        else:
            # Basic table node
            self.node_buffer.append(TableNode())

        self.table_buffer.clear()
        # Drop the reference so the lxml element can be reclaimed.
        self.table_elem = None

    def _start_section(self, elem: HtmlElement):
        """Start processing a section: attach a SectionNode and make it current."""
        # Import node types at runtime to avoid circular imports
        from edgar.documents.nodes import SectionNode

        # Nodes buffered so far precede this section; place them first so
        # they are not adopted by it when the buffer is flushed later.
        self._flush_buffer()

        section = SectionNode()

        # Get section attributes
        section_id = elem.get('id')
        if section_id:
            section.metadata['id'] = section_id
        section_class = elem.get('class')
        if section_class:
            section.metadata['class'] = section_class

        # Attach the section right away. Routing it through node_buffer could
        # flush it while it is the current section, making it its own child.
        enclosing = self.current_section
        target = enclosing if enclosing is not None else self.current_parent
        target.add_child(section)

        self._section_stack.append(enclosing)
        self.current_section = section

    def _end_section(self, elem: HtmlElement):
        """End processing a section: flush its nodes and restore the outer one."""
        # Everything still buffered was created inside this section.
        self._flush_buffer()
        self.current_section = self._section_stack.pop() if self._section_stack else None

    def _flush_buffer(self):
        """Flush node buffer into the current section, or the current parent."""
        target = self.current_section
        if target is None:
            target = self.current_parent
        for node in self.node_buffer:
            target.add_child(node)
        self.node_buffer.clear()

    def _get_text_content(self, elem: HtmlElement) -> str:
        """
        Extract text content from element.

        Collects the stripped text of the element and, recursively, of its
        child elements, plus the stripped tail of every child, and joins the
        non-empty fragments with single spaces.

        Args:
            elem: Element to read

        Returns:
            Joined text, or an empty string if there is none
        """
        parts = []

        own_text = (elem.text or '').strip()
        if own_text:
            parts.append(own_text)

        for child in elem:
            # Comments and processing instructions have a non-string tag;
            # their own text is not content, but the tail after them is.
            if isinstance(child.tag, str):
                child_text = self._get_text_content(child)
                if child_text:
                    parts.append(child_text)
            tail = (child.tail or '').strip()
            if tail:
                parts.append(tail)

        return ' '.join(parts)

    def _extract_title_metadata(self, title: str):
        """
        Extract metadata from title.

        Expects ' - ' separated fields, e.g. "APPLE INC - 10-K - 2023-09-30":
        company, form and (optionally) filing date. Titles with fewer than
        two fields are ignored.
        """
        parts = title.split(' - ')
        if len(parts) < 2:
            return
        self.metadata.company = parts[0].strip()
        self.metadata.form = parts[1].strip()
        if len(parts) >= 3:
            self.metadata.filing_date = parts[2].strip()

    def _extract_meta_metadata(self, elem: HtmlElement):
        """
        Extract metadata from meta tags.

        Copies the ``content`` attribute into the metadata attribute mapped
        from the (case-insensitive) ``name`` attribute; unknown names and
        empty contents are ignored.
        """
        name = elem.get('name', '').lower()
        content = elem.get('content', '')
        if not (name and content):
            return
        attribute = self._META_FIELDS.get(name)
        if attribute is not None:
            setattr(self.metadata, attribute, content)