""" Query interface for the Entity Facts API. This module provides a fluent query builder for filtering and analyzing financial facts with AI-ready features. """ import re from collections import defaultdict from datetime import date, datetime from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union import pandas as pd from rich.box import SIMPLE, SIMPLE_HEAVY from rich.console import Group from rich.padding import Padding from rich.panel import Panel from rich.table import Table from rich.text import Text from edgar.entity.models import DataQuality, FinancialFact if TYPE_CHECKING: from edgar.entity.statement import FinancialStatement from edgar.enums import PeriodType class FactQuery: """ Fluent query builder for financial facts with AI-ready features. This class provides a chainable interface for building complex queries against financial facts, with support for filtering, sorting, and transforming results. """ def __init__(self, facts: List[FinancialFact], indices: Dict[str, Dict]): """ Initialize the query builder. Args: facts: List of all available facts indices: Pre-computed indices for efficient filtering """ self._all_facts = facts self._indices = indices self._filters: List[Callable] = [] self._sort_field: Optional[str] = None self._sort_ascending: bool = True self._limit: Optional[int] = None # Concept filtering def by_concept(self, concept: str, exact: bool = False) -> 'FactQuery': """ Filter by concept name or pattern. Args: concept: Concept name or label to match exact: If True, require exact match; otherwise, use fuzzy matching Returns: Self for method chaining """ if exact: # Use index for exact matching matching_facts = self._indices['by_concept'].get(concept, []) fact_ids = {id(f) for f in matching_facts} self._filters.append(lambda f: id(f) in fact_ids) else: # Case-insensitive partial matching concept_lower = concept.lower() self._filters.append( lambda f: concept_lower in f.concept.lower() or (f.label and concept_lower in f.label.lower()) ) return self def by_label(self, label: str, fuzzy: bool = True) -> 'FactQuery': """ Filter by human-readable label. Args: label: Label to match fuzzy: If True, use fuzzy matching; otherwise, exact match Returns: Self for method chaining """ if fuzzy: label_lower = label.lower() self._filters.append(lambda f: f.label and label_lower in f.label.lower()) else: self._filters.append(lambda f: f.label == label) return self def by_text(self, pattern: str) -> 'FactQuery': """ Search across concept names, labels, and other text fields for a pattern. This is a flexible search that looks for the pattern in all relevant text fields of the financial facts using case-insensitive regex matching. Args: pattern: Pattern to search for in various text fields (supports regex) Returns: Self for method chaining Example: # Find all revenue-related facts facts.query().by_text("revenue|sales") # Find facts with "cash" in any text field facts.query().by_text("cash") """ regex = re.compile(pattern, re.IGNORECASE) def text_filter(fact): # Search in concept name if fact.concept and regex.search(fact.concept): return True # Search in label if fact.label and regex.search(fact.label): return True # Search in taxonomy (namespace) if fact.taxonomy and regex.search(fact.taxonomy): return True # Search in business context if available if hasattr(fact, 'business_context') and fact.business_context and regex.search(fact.business_context): return True # Search in statement type if available if fact.statement_type and regex.search(fact.statement_type): return True return False self._filters.append(text_filter) return self # Time-based filtering def by_fiscal_year(self, year: int) -> 'FactQuery': """ Filter by fiscal year. Args: year: Fiscal year to filter by Returns: Self for method chaining """ matching_facts = self._indices['by_fiscal_year'].get(year, []) fact_ids = {id(f) for f in matching_facts} self._filters.append(lambda f: id(f) in fact_ids) return self def by_fiscal_period(self, period: str) -> 'FactQuery': """ Filter by fiscal period (FY, Q1, Q2, Q3, Q4). Args: period: Fiscal period to filter by Returns: Self for method chaining """ matching_facts = self._indices['by_fiscal_period'].get(period, []) fact_ids = {id(f) for f in matching_facts} self._filters.append(lambda f: id(f) in fact_ids) return self def by_period_length(self, months: int) -> 'FactQuery': """ Filter by period length in months. This is useful to ensure you're comparing comparable periods (e.g., only quarterly data or only annual data). Args: months: Number of months (3 for quarterly, 9 for YTD, 12 for annual) Returns: Self for method chaining Example: # Get only quarterly (3-month) income statements facts.query().by_statement_type('IncomeStatement').by_period_length(3) """ def matches_period_length(fact): if fact.period_start and fact.period_type == 'duration': month_diff = (fact.period_end.year - fact.period_start.year) * 12 month_diff += fact.period_end.month - fact.period_start.month + 1 # Allow for slight variations (e.g., 90-92 days counts as 3 months) return abs(month_diff - months) <= 1 return False self._filters.append(matches_period_length) return self def by_period_type(self, period_type: Union[str, 'PeriodType']) -> 'FactQuery': """ Filter by period type using PeriodType enum for enhanced developer experience. This method provides a convenient way to filter facts by common period types like annual, quarterly, and monthly periods using either PeriodType enum values or string equivalents. Args: period_type: Period type - either PeriodType enum or string equivalent ('annual', 'quarterly', 'monthly') Returns: Self for method chaining Example: # Using PeriodType enum (recommended) from edgar.enums import PeriodType annual_facts = facts.query().by_period_type(PeriodType.ANNUAL).get() quarterly_facts = facts.query().by_period_type(PeriodType.QUARTERLY).get() # Using string equivalents (also supported) annual_facts = facts.query().by_period_type('annual').get() quarterly_facts = facts.query().by_period_type('quarterly').get() Note: TTM and YTD period types require special calculation logic and are not yet supported by this method. Use .by_period_length(12) for 12-month periods or implement custom TTM/YTD calculation logic. """ # Import here to avoid circular imports try: from edgar.enums import validate_period_type except ImportError: # Fallback if enums not available def validate_period_type(p): if isinstance(p, str) and p.lower() in ['annual', 'quarterly', 'monthly']: return p.lower() raise ValueError(f"Invalid period type: {p}") validated_period = validate_period_type(period_type) # Map period types to period lengths (in months) period_mapping = { 'annual': 12, 'quarterly': 3, 'monthly': 1 } if validated_period in period_mapping: # Delegate to existing by_period_length method return self.by_period_length(period_mapping[validated_period]) elif validated_period in ['ttm', 'ytd']: # TTM and YTD require special calculation logic not yet implemented raise NotImplementedError( f"Period type '{validated_period}' requires calculation logic not yet implemented. " f"For trailing twelve months data, use .by_period_length(12) to get 12-month periods, " f"or use facts.income_statement(annual=False, periods=4) for quarterly aggregation." ) else: # This shouldn't happen if validate_period_type works correctly raise ValueError(f"Unsupported period type: {validated_period}") def date_range(self, start: Union[date, str, None] = None, end: Union[date, str, None] = None) -> 'FactQuery': """ Filter by date range. Args: start: Start date (inclusive). Can be a date object or string in 'YYYY-MM-DD' format end: End date (inclusive). Can be a date object or string in 'YYYY-MM-DD' format Returns: Self for method chaining Raises: ValueError: If neither start nor end is provided, or if date string format is invalid """ if start is None and end is None: raise ValueError("At least one of start or end date must be provided") # Parse string dates to date objects def parse_date(date_value: Union[date, str, None]) -> Optional[date]: if date_value is None: return None if isinstance(date_value, date): return date_value if isinstance(date_value, str): try: return datetime.strptime(date_value, '%Y-%m-%d').date() except ValueError: raise ValueError(f"Invalid date format '{date_value}'. Expected 'YYYY-MM-DD'") from None raise ValueError(f"Invalid date type: {type(date_value)}. Expected date object or string") parsed_start = parse_date(start) parsed_end = parse_date(end) # Create filter based on provided dates if parsed_start is not None and parsed_end is not None: # Both start and end provided self._filters.append( lambda f: f.period_end and parsed_start <= f.period_end <= parsed_end ) elif parsed_start is not None: # Only start provided - filter for dates >= start self._filters.append( lambda f: f.period_end and f.period_end >= parsed_start ) else: # Only end provided - filter for dates <= end self._filters.append( lambda f: f.period_end and f.period_end <= parsed_end ) return self def as_of(self, as_of_date: date) -> 'FactQuery': """ Get facts as of a specific date (point-in-time). Args: as_of_date: Date to get facts as of Returns: Self for method chaining """ self._filters.append( lambda f: f.filing_date and f.filing_date <= as_of_date ) return self # Quality filtering def high_quality_only(self) -> 'FactQuery': """ Filter to only high-quality, audited facts. Returns: Self for method chaining """ self._filters.append( lambda f: f.data_quality == DataQuality.HIGH and f.is_audited ) return self def min_confidence(self, threshold: float) -> 'FactQuery': """ Filter by minimum confidence score. Args: threshold: Minimum confidence score (0.0 to 1.0) Returns: Self for method chaining """ self._filters.append(lambda f: f.confidence_score >= threshold) return self # Statement and form filtering def by_statement_type(self, statement_type: str) -> 'FactQuery': """ Filter by financial statement type. Args: statement_type: Statement type (BalanceSheet, IncomeStatement, CashFlow) Returns: Self for method chaining """ matching_facts = self._indices['by_statement'].get(statement_type, []) fact_ids = {id(f) for f in matching_facts} self._filters.append(lambda f: id(f) in fact_ids) return self def by_form_type(self, form_type: Union[str, List[str]]) -> 'FactQuery': """ Filter by SEC form type. Args: form_type: Form type(s) to filter by Returns: Self for method chaining """ if isinstance(form_type, str): form_types = [form_type] else: form_types = form_type # Collect all matching facts from index matching_facts = [] for form in form_types: matching_facts.extend(self._indices['by_form'].get(form, [])) fact_ids = {id(f) for f in matching_facts} self._filters.append(lambda f: id(f) in fact_ids) return self # Special queries def latest_instant(self) -> 'FactQuery': """ Filter to only the most recent instant facts (for balance sheet items). Returns: Self for method chaining """ self._filters.append(lambda f: f.period_type == 'instant') self._sort_field = 'period_end' self._sort_ascending = False # Group by concept and keep only latest def keep_latest(facts: List[FinancialFact]) -> List[FinancialFact]: latest_by_concept = {} for fact in facts: key = fact.concept if key not in latest_by_concept or fact.period_end > latest_by_concept[key].period_end: latest_by_concept[key] = fact return list(latest_by_concept.values()) # We'll apply this in execute() self._post_filter = keep_latest return self def latest_periods(self, n: int = 4, annual: bool = True) -> 'FactQuery': """ Get facts from the n most recent periods. Args: n: Number of recent periods to include annual: If True, only use annual (FY) periods; if False, use all period types Returns: Self for method chaining """ # First, get all unique periods all_facts = self._apply_current_filters() # Group facts by unique periods and calculate period info period_info = {} for fact in all_facts: period_key = (fact.fiscal_year, fact.fiscal_period) if period_key not in period_info: # Calculate period length if we have duration facts period_months = 12 # Default for FY if fact.period_start and fact.period_type == 'duration' and fact.period_end: period_months = (fact.period_end.year - fact.period_start.year) * 12 period_months += fact.period_end.month - fact.period_start.month + 1 period_info[period_key] = { 'end_date': fact.period_end or date.max, 'period_months': period_months, 'is_annual': fact.fiscal_period == 'FY', 'filing_date': fact.filing_date or date.min } # Create list of periods with their metadata period_list = [] for period_key, info in period_info.items(): period_list.append((period_key, info)) if annual: # When annual=True, only use annual periods - no backfilling with interim periods annual_periods = [(pk, info) for pk, info in period_list if info['is_annual']] # Sort annual periods by fiscal year (newest first) annual_periods.sort(key=lambda x: x[0][0], reverse=True) # Sort by fiscal_year # Select only annual periods, up to n selected_periods = [pk for pk, _ in annual_periods[:n]] else: # Sort all periods by end date (newest first) period_list.sort(key=lambda x: x[1]['end_date'], reverse=True) selected_periods = [pk for pk, _ in period_list[:n]] # Filter to only these periods self._filters.append( lambda f: (f.fiscal_year, f.fiscal_period) in selected_periods ) return self # Enhanced filtering methods for structural metadata def by_section(self, section: str) -> 'FactQuery': """ Filter by statement section (e.g., 'Current Assets', 'Operating Activities'). Args: section: Section name to filter by Returns: Self for method chaining """ section_lower = section.lower() self._filters.append( lambda f: f.section and section_lower in f.section.lower() ) return self def by_depth(self, max_depth: int) -> 'FactQuery': """ Filter by hierarchy depth in statement. Args: max_depth: Maximum depth to include Returns: Self for method chaining """ self._filters.append( lambda f: f.depth is not None and f.depth <= max_depth ) return self def totals_only(self) -> 'FactQuery': """ Get only total/sum concepts. Returns: Self for method chaining """ self._filters.append(lambda f: f.is_total) return self def concrete_only(self) -> 'FactQuery': """ Exclude abstract/header concepts. Returns: Self for method chaining """ self._filters.append(lambda f: not f.is_abstract) return self def abstracts_only(self) -> 'FactQuery': """ Get only abstract/header concepts. Returns: Self for method chaining """ self._filters.append(lambda f: f.is_abstract) return self def with_parent(self, parent_concept: str) -> 'FactQuery': """ Filter by parent concept in hierarchy. Args: parent_concept: Parent concept name Returns: Self for method chaining """ self._filters.append( lambda f: f.parent_concept and parent_concept in f.parent_concept ) return self def root_items_only(self) -> 'FactQuery': """ Get only root level items (no parent). Returns: Self for method chaining """ self._filters.append( lambda f: f.parent_concept is None or f.depth == 0 ) return self # Sorting and limiting def sort_by(self, field: str, ascending: bool = True) -> 'FactQuery': """ Sort results by field. Args: field: Field name to sort by ascending: Sort order Returns: Self for method chaining """ self._sort_field = field self._sort_ascending = ascending return self def latest(self, n: int = 1) -> List[FinancialFact]: """ Get the n most recent facts. Args: n: Number of facts to return Returns: List of facts """ self._sort_field = 'filing_date' self._sort_ascending = False self._limit = n return self.execute() # Execution methods def execute(self) -> List[FinancialFact]: """ Execute query and return matching facts. Returns: List of facts matching all filters """ results = self._apply_current_filters() # Apply post-filter if set (e.g., for latest_instant) if hasattr(self, '_post_filter'): results = self._post_filter(results) # Apply sorting if self._sort_field: try: results.sort( key=lambda f: getattr(f, self._sort_field) or (date.min if self._sort_field.endswith('date') else 0), reverse=not self._sort_ascending ) except AttributeError: pass # Ignore if field doesn't exist # Apply limit if self._limit is not None: results = results[:self._limit] return results def with_hierarchy(self) -> 'HierarchicalFactsResult': """ Return facts organized hierarchically based on parent-child relationships. Returns: HierarchicalFactsResult with tree structure """ facts = self.execute() return HierarchicalFactsResult(facts) def to_dataframe(self, *columns) -> pd.DataFrame: """ Convert results to pandas DataFrame. Args: columns: Optional list of columns to include Returns: DataFrame with query results """ facts = self.execute() if not facts: return pd.DataFrame() # Convert to records records = [] for fact in facts: record = { 'concept': fact.concept, 'label': fact.label, 'value': fact.value, 'numeric_value': fact.numeric_value, 'unit': fact.unit, 'scale': fact.scale, 'period_start': fact.period_start, 'period_end': fact.period_end, 'period_type': fact.period_type, 'fiscal_year': fact.fiscal_year, 'fiscal_period': fact.fiscal_period, 'filing_date': fact.filing_date, 'form_type': fact.form_type, 'accession': fact.accession, 'data_quality': fact.data_quality.value, 'confidence_score': fact.confidence_score, 'is_audited': fact.is_audited, 'is_estimated': fact.is_estimated, 'statement_type': fact.statement_type } records.append(record) df = pd.DataFrame(records) # Select columns if specified if columns: available_columns = [col for col in columns if col in df.columns] if available_columns: # Only select if there are matching columns df = df[available_columns] return df def to_llm_context(self) -> List[Dict[str, Any]]: """ Convert results to LLM-friendly context. Returns: List of fact contexts for LLM consumption """ facts = self.execute() return [f.to_llm_context() for f in facts] def pivot_by_period(self, return_statement: bool = True) -> Union['FinancialStatement', pd.DataFrame]: """ Pivot facts to show concepts as rows and periods as columns. This method automatically deduplicates facts to ensure each concept has only one value per period in the resulting pivot table. Args: return_statement: If True, return FinancialStatement wrapper; if False, return raw DataFrame Returns: FinancialStatement or DataFrame with concepts as rows and periods as columns """ # First deduplicate the facts to avoid pivot conflicts facts = self.execute() deduplicated_facts = self._deduplicate_facts(facts) if not deduplicated_facts: return pd.DataFrame() # Convert to DataFrame for pivoting records = [] for fact in deduplicated_facts: # Generate professional period label period_label = self._format_period_label(fact) records.append({ 'label': fact.label, 'numeric_value': fact.numeric_value, 'period_key': period_label, 'period_end': fact.period_end, 'fiscal_period': fact.fiscal_period }) df = pd.DataFrame(records) if df.empty: return df # Pivot table pivot = df.pivot_table( index='label', columns='period_key', values='numeric_value', aggfunc='first' # Should be unique after deduplication ) # Sort columns by period (newest first) # Create a mapping of column names to sort keys column_sort_keys = {} for _, row in df[['period_key', 'period_end', 'fiscal_period']].drop_duplicates().iterrows(): key = row['period_key'] end_date = row['period_end'] fiscal_period = row['fiscal_period'] # Sort by date, with annual periods last # Handle None dates if end_date is None: sort_key = (date.min, 0) elif fiscal_period == 'FY': sort_key = (end_date, 5) else: sort_key = (end_date, 0) column_sort_keys[key] = sort_key # Sort columns by date (newest first) sorted_columns = sorted(pivot.columns, key=lambda x: column_sort_keys.get(x, (date.min, 0)), reverse=True) pivot = pivot[sorted_columns] # Check for period consistency based on ACTUAL displayed periods, not all facts displayed_period_types = set() for col in pivot.columns: if 'FY' in col: displayed_period_types.add('12M') elif any(q in col for q in ['Q1', 'Q2', 'Q3', 'Q4']): displayed_period_types.add('3M') elif '9M' in col: displayed_period_types.add('9M') elif '6M' in col: displayed_period_types.add('6M') else: # Try to infer from the fiscal_period in the original data matching_rows = df[df['period_key'] == col] if not matching_rows.empty: fp = matching_rows.iloc[0]['fiscal_period'] if fp == 'FY': displayed_period_types.add('12M') elif fp in ['Q1', 'Q2', 'Q3', 'Q4']: displayed_period_types.add('3M') # Only warn if there are actually mixed period types in the displayed data if len(displayed_period_types) > 1: pivot.attrs['mixed_periods'] = True pivot.attrs['period_lengths'] = sorted(list(displayed_period_types)) else: pivot.attrs['mixed_periods'] = False pivot.attrs['period_lengths'] = list(displayed_period_types) if displayed_period_types else [] # Return appropriate format if return_statement: from edgar.entity.statement import FinancialStatement # Determine statement type from facts statement_types = {f.statement_type for f in deduplicated_facts if f.statement_type} statement_type = list(statement_types)[0] if len(statement_types) == 1 else "Statement" # Get entity name from facts (if available) entity_name = "" # Could be passed in or extracted from facts return FinancialStatement( data=pivot, statement_type=statement_type, entity_name=entity_name, period_lengths=pivot.attrs.get('period_lengths', []), mixed_periods=pivot.attrs.get('mixed_periods', False) ) else: # Set display format to avoid scientific notation for raw DataFrame pd.options.display.float_format = '{:,.0f}'.format return pivot def _format_period_label(self, fact: FinancialFact) -> str: """ Format period label for professional investors. Hedge funds and institutional investors typically expect: - Quarterly (3M): "Q2 2024" - Year-to-date (9M): "9M 2024" or "YTD Q3 2024" - Annual (12M): "FY 2024" - Clear indication of period length Args: fact: The financial fact to format Returns: Professional period label """ if not fact.period_end: return f"{fact.fiscal_period} {fact.fiscal_year}" # Get the end date components end_date = fact.period_end year = end_date.year # PRIORITY: If the fiscal_period is explicitly "FY", trust it if fact.fiscal_period == 'FY': return f"FY {year}" # Calculate period length in months if we have start date for duration periods if fact.period_start and fact.period_type == 'duration': # Calculate the number of months in the period months_diff = (fact.period_end.year - fact.period_start.year) * 12 months_diff += fact.period_end.month - fact.period_start.month # Add 1 to include both start and end months months_diff += 1 # Determine period type based on length if months_diff <= 3: # Standard quarterly period (3 months) end_month = end_date.month if end_month in [1, 2, 3]: quarter = 'Q1' elif end_month in [4, 5, 6]: quarter = 'Q2' elif end_month in [7, 8, 9]: quarter = 'Q3' else: quarter = 'Q4' return f"{quarter} {year}" elif months_diff <= 6: # Half-year period return f"6M {year}" elif months_diff <= 9: # Year-to-date through Q3 (9 months) return f"9M {year}" elif months_diff >= 11: # Full year (allow 11-13 months for fiscal year variations) return f"FY {year}" else: # Non-standard period - show actual months return f"{months_diff}M {year}" # Fallback for instant facts or when no start date - use calendar-based quarters if fact.fiscal_period in ['Q1', 'Q2', 'Q3', 'Q4']: # Use calendar-based quarter determination from end date end_month = end_date.month if end_month in [1, 2, 3]: quarter = 'Q1' elif end_month in [4, 5, 6]: quarter = 'Q2' elif end_month in [7, 8, 9]: quarter = 'Q3' else: quarter = 'Q4' return f"{quarter} {year}" elif fact.fiscal_period == 'FY': return f"FY {year}" else: return f"{fact.fiscal_period} {year}" # Helper methods def _apply_current_filters(self) -> List[FinancialFact]: """Apply all current filters to the facts""" results = self._all_facts for filter_func in self._filters: results = [f for f in results if filter_func(f)] return results def count(self) -> int: """ Get count of facts matching current filters. Returns: Number of matching facts """ return len(self._apply_current_filters()) def _deduplicate_facts(self, facts: List[FinancialFact]) -> List[FinancialFact]: """ Remove duplicate facts for the same concept and period. When multiple facts exist for the same concept and period, this method selects the most appropriate one based on: 1. Most recent filing date 2. Preference for audited (10-K) over unaudited (10-Q) forms 3. Original forms over amendments Args: facts: List of facts that may contain duplicates Returns: List of deduplicated facts """ from collections import defaultdict # Group facts by concept and period grouped = defaultdict(list) for fact in facts: # Create a key that uniquely identifies the concept and period if fact.period_type == 'instant': period_key = (fact.concept, fact.period_end, 'instant') else: period_key = (fact.concept, fact.period_start, fact.period_end, 'duration') grouped[period_key].append(fact) # Select the best fact from each group deduplicated = [] for group_facts in grouped.values(): if len(group_facts) == 1: deduplicated.append(group_facts[0]) else: # Sort by criteria (descending priority): # 1. Filing date (most recent first) # 2. Form type (10-K preferred over 10-Q) # 3. Non-amendments preferred sorted_facts = sorted( group_facts, key=lambda f: ( f.filing_date or date.min, 1 if f.form_type == '10-K' else 0, 0 if '/A' in f.form_type else 1 ), reverse=True ) deduplicated.append(sorted_facts[0]) return deduplicated def __rich__(self): """Creates a rich representation showing the most useful facts information.""" # Get the facts for this query facts = self.execute() # Title with count title = Text.assemble( "🔍 ", ("Query Results", "bold blue"), f" ({len(facts):,} facts)" ) if not facts: # Empty results empty_panel = Panel( Text("No facts matching the current filters", style="dim"), title=title, border_style="blue" ) return empty_panel # Limit results for display (show first 20, indicate if more exist) display_limit = 40 display_facts = facts[:display_limit] has_more = len(facts) > display_limit # Create main results table results_table = Table(box=SIMPLE, show_header=True, padding=(0, 1)) results_table.add_column("Concept", style="bold", max_width=80) results_table.add_column("Label", style="bold", max_width=80) results_table.add_column("Value", justify="right", max_width=15) results_table.add_column("Start") results_table.add_column("End", max_width=10) # Add rows for fact in display_facts: results_table.add_row( fact.concept, fact.label, str(fact.value) if fact.value else "N/A", str(fact.period_start) if fact.period_start else "N/A", str(fact.period_end) if fact.period_end else "N/A", ) # Summary stats table stats_table = Table(box=SIMPLE_HEAVY, show_header=False, padding=(0, 1)) stats_table.add_column("Metric", style="dim") stats_table.add_column("Value", style="bold") # Calculate stats unique_concepts = len(set(f.concept for f in facts)) unique_periods = len(set((f.fiscal_year, f.fiscal_period) for f in facts if f.fiscal_year and f.fiscal_period)) form_types = set(f.form_type for f in facts if f.form_type) # Get date range dates = [f.filing_date for f in facts if f.filing_date] if dates: date_range = f"{min(dates).strftime('%Y-%m-%d')} to {max(dates).strftime('%Y-%m-%d')}" else: date_range = "N/A" stats_table.add_row("Total Facts", f"{len(facts):,}") stats_table.add_row("Unique Concepts", f"{unique_concepts:,}") stats_table.add_row("Unique Periods", f"{unique_periods:,}") stats_table.add_row("Form Types", ", ".join(sorted(form_types)[:3]) + ("..." if len(form_types) > 3 else "")) stats_table.add_row("Date Range", date_range) stats_panel = Panel( stats_table, title="📊 Query Summary", border_style="bright_black" ) # Main results panel if has_more: subtitle = f"Showing first {display_limit:,} of {len(facts):,} facts • Use .to_dataframe() for all results" else: subtitle = f"All {len(facts):,} facts shown" results_panel = Panel( results_table, title="📋 Facts", subtitle=subtitle, border_style="bright_black" ) # Combine panels content = Group( Padding("", (1, 0, 0, 0)), stats_panel, results_panel ) return Panel( content, title=title, border_style="blue" ) def __repr__(self) -> str: """String representation using rich formatting.""" from edgar.richtools import repr_rich return repr_rich(self.__rich__()) class HierarchicalFactsResult: """ Results organized in hierarchical tree structure. This class organizes facts based on parent-child relationships to provide a tree view of the data. """ def __init__(self, facts: List[FinancialFact]): """ Initialize with flat list of facts. Args: facts: List of financial facts to organize """ self.facts = facts self._build_hierarchy() def _build_hierarchy(self): """Build hierarchical structure from facts.""" # Create lookup maps self.fact_map = {} self.children_map = defaultdict(list) self.roots = [] # First pass: create map and identify relationships for fact in self.facts: concept = fact.concept.split(':')[-1] if ':' in fact.concept else fact.concept self.fact_map[concept] = fact if fact.parent_concept: self.children_map[fact.parent_concept].append(concept) elif fact.depth == 0 or fact.depth is None: self.roots.append(concept) def to_dict(self) -> Dict[str, Any]: """Convert to nested dictionary structure.""" def build_node(concept: str) -> Dict[str, Any]: fact = self.fact_map.get(concept) if not fact: return {'concept': concept, 'error': 'Fact not found'} node = { 'concept': concept, 'label': fact.label, 'value': fact.numeric_value, 'is_abstract': fact.is_abstract, 'is_total': fact.is_total, 'section': fact.section, 'depth': fact.depth } # Add children children = self.children_map.get(concept, []) if children: node['children'] = [build_node(child) for child in children] return node return { 'roots': [build_node(root) for root in self.roots], 'total_facts': len(self.facts) } def to_dataframe(self, include_hierarchy: bool = True) -> pd.DataFrame: """ Convert to DataFrame with optional hierarchy indicators. Args: include_hierarchy: Whether to include hierarchy columns Returns: DataFrame with facts and hierarchy info """ records = [] def process_node(concept: str, level: int = 0, parent: str = None): fact = self.fact_map.get(concept) if not fact: return record = { 'concept': concept, 'label': fact.label, 'value': fact.numeric_value, 'unit': fact.unit, 'fiscal_year': fact.fiscal_year, 'fiscal_period': fact.fiscal_period } if include_hierarchy: record['level'] = level record['parent'] = parent record['is_abstract'] = fact.is_abstract record['is_total'] = fact.is_total record['section'] = fact.section records.append(record) # Process children for child in self.children_map.get(concept, []): process_node(child, level + 1, concept) # Process all roots for root in self.roots: process_node(root) # Add orphaned facts (not in hierarchy) processed = set(r['concept'] for r in records) for fact in self.facts: concept = fact.concept.split(':')[-1] if ':' in fact.concept else fact.concept if concept not in processed: record = { 'concept': concept, 'label': fact.label, 'value': fact.numeric_value, 'unit': fact.unit, 'fiscal_year': fact.fiscal_year, 'fiscal_period': fact.fiscal_period } if include_hierarchy: record['level'] = 0 record['parent'] = None record['is_abstract'] = fact.is_abstract record['is_total'] = fact.is_total record['section'] = fact.section records.append(record) return pd.DataFrame(records) if records else pd.DataFrame()