import itertools import json import pickle import re import webbrowser from contextlib import nullcontext from dataclasses import dataclass from datetime import datetime from functools import cached_property, lru_cache from io import BytesIO from os import PathLike from pathlib import Path from typing import Any, Dict, List, Optional, Tuple, Union, cast import httpx import numpy as np import pandas as pd import pyarrow as pa import pyarrow.compute as pc import pyarrow.csv as pa_csv import pyarrow.parquet as pq from rich import box from rich.columns import Columns from rich.console import Group from rich.panel import Panel from rich.status import Status from rich.table import Table from rich.text import Text from edgar._markdown import text_to_markdown from edgar._party import Address from edgar.attachments import Attachment, Attachments, AttachmentServer, FilingHomepage from edgar.core import ( DataPager, IntString, PagingState, Quarters, YearAndQuarter, YearAndQuarters, Years, cache_except_none, current_year_and_quarter, filing_date_to_year_quarters, is_probably_html, is_start_of_quarter, listify, log, parallel_thread_map, quarters_in_year, sec_edgar, ) from edgar.dates import InvalidDateException from edgar.files.html import Document from edgar.files.html_documents import get_clean_html from edgar.files.htmltools import html_sections from edgar.files.markdown import to_markdown from edgar.filtering import filter_by_accession_number, filter_by_cik, filter_by_date, filter_by_exchange, filter_by_form, filter_by_ticker from edgar.formatting import accession_number_text, display_size from edgar.headers import FilingDirectory, IndexHeaders from edgar.httprequests import download_file, download_text, download_text_between_tags from edgar.reference import describe_form from edgar.reference.tickers import Exchange, find_ticker, find_ticker_safe from edgar.richtools import Docs, print_rich, repr_rich, rich_to_text from edgar.search import BM25Search, RegexSearch from edgar.sgml import FilingHeader, FilingSGML, Reports, Statements from edgar.storage import is_using_local_storage, local_filing_path from edgar.xbrl import XBRL, XBRLFilingWithNoXbrlData """ Contain functionality for working with SEC filing indexes and filings The module contains the following functions - `get_filings(year, quarter, index)` """ __all__ = [ 'Filing', 'Filings', 'get_filings', 'FilingHeader', 'PagingState', 'Attachment', 'Attachments', 'FilingHomepage', 'available_quarters', 'get_by_accession_number', 'filing_date_to_year_quarters' ] full_index_url = "https://www.sec.gov/Archives/edgar/full-index/{}/QTR{}/{}.{}" daily_index_url = "https://www.sec.gov/Archives/edgar/daily-index/{}/QTR{}/{}.{}.idx" filing_homepage_url_re = re.compile(f"{sec_edgar}/data/[0-9]{1,}/[0-9]{10}-[0-9]{2}-[0-9]{4}-index.html") full_or_daily = ['daily', 'full'] index_types = ['form', 'company', 'xbrl'] file_types = ['gz', 'idx'] form_index = "form" xbrl_index = "xbrl" company_index = "company" index_field_delimiter_re = re.compile(r" {2,}") max_concurrent_http_connections = 10 accession_number_re = re.compile(r"\d{10}-\d{2}-\d{6}$") xbrl_document_types = ['XBRL INSTANCE DOCUMENT', 'XBRL INSTANCE FILE', 'EXTRACTED XBRL INSTANCE DOCUMENT'] def is_valid_filing_date(filing_date: str) -> bool: if ":" in filing_date: # Check for only one colon if filing_date.count(":") > 1: return False start_date, end_date = filing_date.split(":") if start_date: if not is_valid_date(start_date): return False if end_date: if not is_valid_date(end_date): return False else: if not is_valid_date(filing_date): return False return True def is_valid_date(date_str: str, date_format: str = "%Y-%m-%d") -> bool: pattern = r"^\d{4}-\d{2}-\d{2}$" if not re.match(pattern, date_str): return False try: datetime.strptime(date_str, date_format) return True except ValueError: return False def get_previous_quarter(year, quarter) -> Tuple[int, int]: if not quarter: _, quarter = current_year_and_quarter() # Given a year and quarter return the previous quarter if quarter == 1: return year - 1, 4 else: return year, quarter - 1 @lru_cache(maxsize=1) def available_quarters() -> YearAndQuarters: """ Get a list of year and quarter tuples :return: """ current_year, current_quarter = current_year_and_quarter() start_quarters = [(1994, 3), (1994, 4)] in_between_quarters = list(itertools.product(range(1995, current_year), range(1, 5))) end_quarters = list(itertools.product([current_year], range(1, current_quarter + 1))) return start_quarters + in_between_quarters + end_quarters def expand_quarters(year: Union[int, List[int]], quarter: Optional[Union[int, List[int]]] = None) -> YearAndQuarters: """ Expand the list of years and a list of quarters to a full list of tuples covering the full range :param year: The year or years :param quarter: The quarter or quarters :return: """ years = listify(year) quarters = listify(quarter) if quarter else quarters_in_year return [yq for yq in itertools.product(years, quarters) if yq in available_quarters() ] class FileSpecs: """ A specification for a fixed width file """ def __init__(self, specs: List[Tuple[str, Tuple[int, int], pa.lib.DataType]]): self._spec_type = specs[0][0].title() self.splits = list(zip(*specs, strict=False))[1] self.schema = pa.schema( [ pa.field(name, datatype) for name, _, datatype in specs ] ) def __str__(self): return f"{self._spec_type} File Specs" form_specs = FileSpecs( [("form", (0, 12), pa.string()), ("company", (12, 74), pa.string()), ("cik", (74, 82), pa.int32()), ("filing_date", (85, 97), pa.string()), ("accession_number", (97, 141), pa.string()) ] ) company_specs = FileSpecs( [("company", (0, 62), pa.string()), ("form", (62, 74), pa.string()), ("cik", (74, 82), pa.int32()), ("filing_date", (85, 97), pa.string()), ("accession_number", (97, 141), pa.string()) ] ) FORM_INDEX_FORM_COLUMN = 0 COMPANY_INDEX_FORM_COLUMN = -4 INDEX_COLUMN_NAMES = ['form', 'company', 'cik', 'filing_date', 'accession_number'] def read_fixed_width_index(index_text: str, file_specs: FileSpecs) -> pa.Table: """ Read the index text as a fixed width file :param index_text: The index text as downloaded from SEC Edgar :param file_specs: The file specs containing the column definitions :return: """ # Treat as a single array lines = index_text.rstrip('\n').split('\n') # Find where the data starts data_start = 0 for index, line in enumerate(lines): if line.startswith("-----"): data_start = index + 1 break data_lines = lines[data_start:] array = pa.array(data_lines) # Then split into separate arrays by file specs arrays = [ pc.utf8_trim_whitespace( pc.utf8_slice_codeunits(array, start=start, stop=stop)) for start, stop, in file_specs.splits ] # Change the CIK to int arrays[2] = pa.compute.cast(arrays[2], pa.int32()) # Convert filingdate from string to date # Some files have %Y%m-%d other %Y%m%d date_format = '%Y-%m-%d' if len(arrays[3][0].as_py()) == 10 else '%Y%m%d' arrays[3] = pc.cast(pc.strptime(arrays[3], date_format, 'us'), pa.date32()) # Get the accession number from the file directory_or_file arrays[4] = pa.compute.utf8_slice_codeunits( pa.compute.utf8_rtrim(arrays[4], characters=".txt"), start=-20) return pa.Table.from_arrays( arrays=arrays, names=list(file_specs.schema.names), ) def read_index_file(index_text: str, form_column: int = FORM_INDEX_FORM_COLUMN, filing_date_format:str="%Y-%m-%d") -> pa.Table: """ Read the index text using multiple spaces as delimiter """ # Split into lines and find the data start lines = index_text.rstrip('\n').split('\n') data_start = 0 for index, line in enumerate(lines): if line.startswith("-----"): data_start = index + 1 break # Process data lines data_lines = lines[data_start:] # Handle empty lines if not data_lines: return _empty_filing_index() # The form and company name can both contain spaces the remaining fields cannot. # It is assumed that the form will only contain runs of a single space (e.g. "1-A POS") # so splitting on runs of 2 spaces or more will keep form names intact. rows = [re.split(index_field_delimiter_re, line.strip()) for line in data_lines if line.strip()] # Form names are in a different column depending on the index type. forms = pa.array([row[form_column] for row in rows]) # CIKs are always the third-to-last field ciks = pa.array([int(row[-3]) for row in rows], type=pa.int32()) # Dates are always second-to-last field dates = pc.strptime(pa.array([row[-2] for row in rows]), filing_date_format, 'us') dates = pc.cast(dates, pa.date32()) # Accession numbers are in the file path accession_numbers = pa.array([row[-1][-24:-4] for row in rows]) # Company names may have runs of more than one space so anything which hasn't already # been extracted is concatenated to form the company name. if form_column == 0: companies = pa.array([" ".join(row[1:-3]) for row in rows]) else: companies = pa.array([" ".join(row[0:form_column]) for row in rows]) return pa.Table.from_arrays( [forms, companies, ciks, dates, accession_numbers], names=INDEX_COLUMN_NAMES ) def read_form_index_file(index_text: str) -> pa.Table: """Read the form index file""" return read_index_file(index_text, form_column=FORM_INDEX_FORM_COLUMN) def read_company_index_file(index_text: str) -> pa.Table: """Read the company index file""" return read_index_file(index_text, form_column=COMPANY_INDEX_FORM_COLUMN) def read_pipe_delimited_index(index_text: str) -> pa.Table: """ Read the index file as a pipe delimited index :param index_text: The index text as read from SEC Edgar :return: The index data as a pyarrow table """ index_table = pa_csv.read_csv( BytesIO(index_text.encode()), parse_options=pa_csv.ParseOptions(delimiter="|"), read_options=pa_csv.ReadOptions(skip_rows=10, column_names=['cik', 'company', 'form', 'filing_date', 'accession_number']) ) index_table = index_table.set_column( 0, "cik", pa.compute.cast(index_table[0], pa.int32()) ).set_column(4, "accession_number", pc.utf8_slice_codeunits(index_table[4], start=-24, stop=-4)) return index_table def fetch_filing_index(year_and_quarter: YearAndQuarter, index: str ): year, quarter = year_and_quarter url = full_index_url.format(year, quarter, index, "gz") try: index_table = fetch_filing_index_at_url(url, index) return (year, quarter), index_table except httpx.HTTPStatusError as e: if is_start_of_quarter() and e.response.status_code == 403: # Return an empty filing index return (year, quarter), _empty_filing_index() else: raise def fetch_daily_filing_index(date: str, index: str = 'form'): year, month, day = date.split("-") quarter = (int(month) - 1) // 3 + 1 url = daily_index_url.format(year, quarter, index, date.replace("-", "")) index_table = fetch_filing_index_at_url(url, index, filing_date_format='%Y%m%d') return index_table def fetch_filing_index_at_url(url: str, index: str, filing_date_format:str='%Y-%m-%d') -> Optional[pa.Table]: index_text = download_text(url=url) assert index_text is not None if index == "xbrl": index_table: pa.Table = read_pipe_delimited_index(str(index_text)) else: # Read as a fixed width index file form_column = FORM_INDEX_FORM_COLUMN if index == "form" else COMPANY_INDEX_FORM_COLUMN index_table: pa.Table = read_index_file(index_text, form_column=form_column, filing_date_format=filing_date_format) return index_table def _empty_filing_index(): schema = pa.schema([ ('form', pa.string()), ('company', pa.string()), ('cik', pa.int32()), ('filing_date', pa.date32()), ('accession_number', pa.string()), ]) # Create an empty table with the defined schema return pa.Table.from_arrays([ pa.array([], type=pa.string()), pa.array([], type=pa.string()), pa.array([], type=pa.int32()), pa.array([], type=pa.date32()), pa.array([], type=pa.string()), ], schema=schema) def get_filings_for_quarters(year_and_quarters: YearAndQuarters, index="form") -> pa.Table: """ Get the filings for the quarters :param year_and_quarters: :param index: The index to use - "form", "company", or "xbrl" :return: The filings as a pyarrow table """ if len(year_and_quarters) == 1: _, final_index_table = fetch_filing_index(year_and_quarter=year_and_quarters[0], index=index) else: quarters_and_indexes = parallel_thread_map( lambda yq: fetch_filing_index(year_and_quarter=yq, index=index), year_and_quarters ) quarter_and_indexes_sorted = sorted(quarters_and_indexes, key=lambda d: d[0]) index_tables = [fd[1] for fd in quarter_and_indexes_sorted] final_index_table: pa.Table = pa.concat_tables(index_tables, mode="default") return final_index_table class Filings: """ A container for filings """ def __init__(self, filing_index: pa.Table, original_state: Optional[PagingState] = None): self.data: pa.Table = filing_index self.data_pager = DataPager(self.data) # This keeps track of where the index should start in case this is just a page in the Filings self._original_state = original_state or PagingState(0, len(self.data)) self._hash = None @property def docs(self): return Docs(self) def to_pandas(self, *columns) -> pd.DataFrame: """Return the filing index as a python dataframe""" df = self.data.to_pandas() return df.filter(columns) if len(columns) > 0 else df def save_parquet(self, location: str): """Save the filing index as parquet""" pq.write_table(self.data, location) def save(self, location: str): """Save the filing index as parquet""" self.save_parquet(location) def download(self, data_directory: Optional[str] = None): """ Download the filings based on the accession numbers in this Filings object. This is a convenience method that calls `download_filings` with this object as the `filings` parameter. Args: data_directory: Directory to save the downloaded files. Defaults to the Edgar data directory. """ from edgar.storage import download_filings download_filings(data_directory=data_directory, overwrite_existing=True, filings=self) def get_filing_at(self, item: int, enrich: bool = True): """Get filing at index, optionally enriching with related entities""" # Get the primary filing data accession_no = self.data['accession_number'][item].as_py() related_entities = [] if enrich: # Use PyArrow to find all entities with same accession number # Limit search to nearby entries for performance (+/- 10 positions) start = max(0, item - 10) end = min(len(self.data), item + 11) # Slice the data and search efficiently slice_data = self.data.slice(start, end - start) mask = pc.equal(slice_data['accession_number'], accession_no) for idx in range(len(mask)): if mask[idx].as_py(): actual_idx = start + idx if actual_idx != item: # Skip the primary filing related_entities.append({ 'cik': slice_data['cik'][idx].as_py(), 'company': slice_data['company'][idx].as_py() }) # Create Filing with related entities return Filing( cik=self.data['cik'][item].as_py(), company=self.data['company'][item].as_py(), form=self.data['form'][item].as_py(), filing_date=self.data['filing_date'][item].as_py(), accession_no=accession_no, related_entities=related_entities ) @property def date_range(self) -> Tuple[datetime, datetime]: """Return a tuple of the start and end dates in the filing index""" min_max_dates: dict[str, datetime] = pc.min_max(self.data['filing_date']).as_py() return min_max_dates['min'], min_max_dates['max'] @property def start_date(self) -> Optional[str]: """Return the start date for the filings""" return str(self.date_range[0]) if self.date_range[0] else self.date_range[0] @property def end_date(self) -> str: """Return the end date for the filings""" return str(self.date_range[1]) if self.date_range[1] else self.date_range[1] def latest(self, n: int = 1): """Get the latest n filings""" sort_indices = pc.sort_indices(self.data, sort_keys=[("filing_date", "descending")]) sort_indices_top = sort_indices[:min(n, len(sort_indices))] latest_filing_index = pc.take(data=self.data, indices=sort_indices_top) filings = Filings(latest_filing_index) if len(filings) == 1: return filings[0] return filings def filter(self, *, form: Optional[Union[str, List[IntString]]] = None, amendments: bool = None, filing_date: Optional[str] = None, date: Optional[str] = None, cik: Union[IntString, List[IntString]] = None, exchange: Union[str, List[str], Exchange, List[Exchange]] = None, ticker: Union[str, List[str]] = None, accession_number: Union[str, List[str]] = None) -> 'Filings': """ Get some filings >>> filings = get_filings() Filter the filings On a date >>> filings.filter(date="2020-01-01") Up to a date >>> filings.filter(date=":2020-03-01") From a date >>> filings.filter(date="2020-01-01:") # Between dates >>> filings.filter(date="2020-01-01:2020-03-01") :param form: The form or list of forms to filter by :param amendments: Whether to include amendments to the forms e.g. include "10-K/A" if filtering for "10-K" :param filing_date: The filing date :param date: An alias for the filing date :param cik: The CIK or list of CIKs to filter by :param exchange: The exchange or list of exchanges to filter by :param ticker: The ticker or list of tickers to filter by :param accession_number: The accession number or list of accession numbers to filter by :return: The filtered filings """ filing_index = self.data forms = form if isinstance(forms, list): forms = [str(f) for f in forms] # Filter by form if forms: filing_index = filter_by_form(filing_index, form=forms, amendments=amendments) elif amendments is not None: # Get the unique values of the form as a pylist forms = list(set([form.replace("/A", "") for form in pc.unique(filing_index['form']).to_pylist()])) filing_index = filter_by_form(filing_index, form=forms, amendments=amendments) # filing_date and date are aliases filing_date = filing_date or date if filing_date: try: filing_index = filter_by_date(filing_index, filing_date, 'filing_date') except InvalidDateException as e: log.error(e) return Filings(_empty_filing_index()) # Filter by cik if cik: filing_index = filter_by_cik(filing_index, cik) # Filter by exchange if exchange: filing_index = filter_by_exchange(filing_index, exchange) if ticker: filing_index = filter_by_ticker(filing_index, ticker) # Filter by accession number if accession_number: filing_index = filter_by_accession_number(filing_index, accession_number=accession_number) return Filings(filing_index) def _head(self, n): assert n > 0, "The number of filings to select - `n`, should be greater than 0" return self.data.slice(0, min(n, len(self.data))) def head(self, n: int): """Get the first n filings""" selection = self._head(n) return Filings(selection) def _tail(self, n): assert n > 0, "The number of filings to select - `n`, should be greater than 0" return self.data.slice(max(0, len(self.data) - n), len(self.data)) def tail(self, n: int): """Get the last n filings""" selection = self._tail(n) return Filings(selection) def _sample(self, n: int): assert len(self) >= n > 0, \ "The number of filings to select - `n`, should be greater than 0 and less than the number of filings" return self.data.take(np.random.choice(len(self), n, replace=False)).sort_by([("filing_date", "descending")]) def sample(self, n: int): """Get a random sample of n filings""" selection = self._sample(n) return Filings(selection) @property def empty(self) -> bool: return len(self.data) == 0 def current(self): """Display the current page ... which is the default for this filings object""" return self def next(self): """Show the next page""" data_page = self.data_pager.next() if data_page is None: log.warning("End of data .. use previous() \u2190 ") return None start_index, _ = self.data_pager._current_range filings_state = PagingState(page_start=start_index, num_records=len(self)) return Filings(data_page, original_state=filings_state) def previous(self): """ Show the previous page of the data :return: """ data_page = self.data_pager.previous() if data_page is None: log.warning(" No previous data .. use next() \u2192 ") return None start_index, _ = self.data_pager._current_range filings_state = PagingState(page_start=start_index, num_records=len(self)) return Filings(data_page, original_state=filings_state) def _get_by_accession_number(self, accession_number: str): mask = pc.equal(self.data['accession_number'], accession_number) idx = mask.index(True).as_py() if idx > -1: return self.get_filing_at(idx) def get(self, index_or_accession_number: IntString): """ First, get some filings >>> filings = get_filings() Get the Filing at that index location or that has the accession number >>> filings.get(100) >>> filings.get("0001721868-22-000010") :param index_or_accession_number: :return: """ if isinstance(index_or_accession_number, int) or index_or_accession_number.isdigit(): return self.get_filing_at(int(index_or_accession_number)) else: accession_number = index_or_accession_number.strip() mask = pc.equal(self.data['accession_number'], accession_number) idx = mask.index(True).as_py() if idx > -1: return self.get_filing_at(idx) if not accession_number_re.match(accession_number): log.warning( f"Invalid accession number [{accession_number}]" "\n valid accession number [0000000000-00-000000]" ) def find(self, company_search_str: str): from edgar.entity import find_company # Search for the company search_results = find_company(company_search_str) return self.filter(cik=search_results.ciks) def to_dict(self, max_rows: int = 1000) -> Dict[str, Any]: """Return the filings as a json string but only the first max_rows records""" return cast(Dict[str, Any], self.to_pandas().head(max_rows).to_dict(orient="records")) def __getitem__(self, item): return self.get_filing_at(item) def __len__(self): return len(self.data) def __iter__(self): self.n = 0 return self def __next__(self): if self.n < len(self.data): filing: Filing = self[self.n] self.n += 1 return filing else: raise StopIteration @property def summary(self): return (f"Showing {self.data_pager.page_size} of " f"{self._original_state.num_records:,} filings") def _page_index(self) -> range: """Create the range index to set on the page dataframe depending on where in the data we are """ if self._original_state: return range(self._original_state.page_start, self._original_state.page_start + min(self.data_pager.page_size, len(self.data))) # set the index to the size of the page else: return range(*self.data_pager._current_range) def __eq__(self, other): # Check if other is Filings or subclass of Filings if not isinstance(other, self.__class__) and not issubclass(other.__class__, self.__class__): return False if len(self) != len(other): return False if self.start_date != other.start_date or self.end_date != other.end_date: return False # Handle empty tables if len(self) == 0: return True # Two empty tables with same dates are equal # Compare just accession_number columns return self.data['accession_number'].equals(other.data['accession_number']) def __hash__(self): if self._hash is None: # Base hash components hash_components = [self.__class__.__name__, len(self), self.start_date, self.end_date] # Only add accession numbers if table is not empty if len(self) > 0: # Handle different table sizes appropriately if len(self) == 1: hash_components.append(self.data['accession_number'][0].as_py()) elif len(self) == 2: hash_components.append(self.data['accession_number'][0].as_py()) hash_components.append(self.data['accession_number'][1].as_py()) else: hash_components.append(self.data['accession_number'][0].as_py()) hash_components.append(self.data['accession_number'][len(self) // 2].as_py()) hash_components.append(self.data['accession_number'][len(self) - 1].as_py()) self._hash = hash(tuple(hash_components)) return self._hash def __rich__(self) -> Panel: # Create table with appropriate columns and styling table = Table( show_header=True, header_style="bold", show_edge=True, expand=False, padding=(0, 1), box=box.SIMPLE, row_styles=["", "bold"] ) # Add columns with specific styling and alignment table.add_column("#", style="dim", justify="right") table.add_column("Form", width=10) table.add_column("CIK", style="dim", width=10, justify="right") table.add_column("Ticker", width=6, style="yellow") table.add_column("Company", style="bold green", width=38, no_wrap=True) table.add_column("Filing Date", width=11) table.add_column("Accession Number", width=20) table.add_column(" ", width=1, style="cyan dim") # Group indicator column # Get current page from data pager current_page = self.data_pager.current() # Calculate start index for proper indexing start_idx = self._original_state.page_start if self._original_state else self.data_pager.start_index # Identify groups of consecutive filings with same accession number groups = {} accession_numbers = [current_page['accession_number'][i].as_py() for i in range(len(current_page))] for i in range(len(accession_numbers)): acc_no = accession_numbers[i] # Check previous and next accession numbers prev_acc = accession_numbers[i-1] if i > 0 else None next_acc = accession_numbers[i+1] if i < len(accession_numbers)-1 else None if acc_no != prev_acc and acc_no == next_acc: groups[i] = '┐' # Start of group elif acc_no == prev_acc and acc_no == next_acc: groups[i] = '│' # Middle of group elif acc_no == prev_acc and acc_no != next_acc: groups[i] = '┘' # End of group else: groups[i] = ' ' # Standalone filing # Iterate through rows in current page for i in range(len(current_page)): cik = current_page['cik'][i].as_py() ticker = find_ticker(cik) row = [ str(start_idx + i), current_page['form'][i].as_py(), str(cik), ticker, current_page['company'][i].as_py(), str(current_page['filing_date'][i].as_py()), accession_number_text(current_page['accession_number'][i].as_py()), groups.get(i, ' ') # Add group indicator ] table.add_row(*row) # Show paging information only if there are multiple pages elements = [table] if self.data_pager.total_pages > 1: total_filings = self._original_state.num_records current_count = len(current_page) start_num = start_idx + 1 end_num = start_idx + current_count page_info = Text.assemble( ("Showing ", "dim"), (f"{start_num:,}", "bold red"), (" to ", "dim"), (f"{end_num:,}", "bold red"), (" of ", "dim"), (f"{total_filings:,}", "bold"), (" filings.", "dim"), (" Page using ", "dim"), ("← prev()", "bold gray54"), (" and ", "dim"), ("next() →", "bold gray54") ) elements.extend([Text("\n"), page_info]) # Get the subtitle start_date, end_date = self.date_range subtitle = f"SEC Filings between {start_date:%Y-%m-%d} and {end_date:%Y-%m-%d}" if start_date else "" return Panel( Group(*elements), title="SEC Filings", subtitle=subtitle, border_style="bold grey54", expand=False ) def __repr__(self): return repr_rich(self.__rich__()) def sort_filings_by_priority(filing_table: pa.Table, priority_forms: Optional[List[str]] = None) -> pa.Table: """ Sort a filings table by date (descending) and form priority. Args: filing_table: PyArrow table containing filings data priority_forms: List of forms in priority order. Forms not in list will be sorted alphabetically after priority forms. Defaults to common forms if None. Returns: PyArrow table sorted by date and form priority """ if priority_forms is None: priority_forms = ['10-Q', '10-Q/A', '10-K', '10-K/A', '8-K', '8-K/A', '6-K', '6-K/A', '13F-HR', '144', '4', 'D', 'SC 13D', 'SC 13G'] # Create form priority values forms_array = filing_table['form'] priorities = [] for form_type in forms_array.to_pylist(): try: priority = priority_forms.index(form_type) except ValueError: priority = len(priority_forms) priorities.append(priority) # Add priority column with_priority = filing_table.append_column( 'form_priority', pa.array(priorities, type=pa.int32()) ) # Sort by date (descending), priority (ascending), form name (ascending) sorted_table = with_priority.sort_by([ ("filing_date", "descending"), ("form_priority", "ascending"), ("form", "ascending") ]) # Remove temporary priority column return sorted_table.drop(['form_priority']) def get_filings(year: Optional[Years] = None, quarter: Optional[Quarters] = None, form: Optional[Union[str, List[IntString]]] = None, amendments: bool = True, filing_date: Optional[str] = None, index="form", priority_sorted_forms: Optional[List[str]] = None) -> Optional[Filings]: """ Downloads the filing index for a given year or list of years, and a quarter or list of quarters. So you can download for 2020, [2020,2021,2022] or range(2020, 2023) Examples >>> from edgar import get_filings >>> filings_ = get_filings(2021) # Get filings for 2021 >>> filings_ = get_filings(2021, 4) # Get filings for 2021 Q4 >>> filings_ = get_filings(2021, [3,4]) # Get filings for 2021 Q3 and Q4 >>> filings_ = get_filings([2020, 2021]) # Get filings for 2020 and 2021 >>> filings_ = get_filings([2020, 2021], 4) # Get filings for Q4 of 2020 and 2021 >>> filings_ = get_filings(range(2010, 2021)) # Get filings between 2010 and 2021 - does not include 2021 >>> filings_ = get_filings(2021, 4, form="D") # Get filings for 2021 Q4 for form D >>> filings_ = get_filings(2021, 4, filing_date="2021-10-01") # Get filings for 2021 Q4 on "2021-10-01" >>> filings_ = get_filings(2021, 4, filing_date="2021-10-01:2021-10-10") # Get filings for 2021 Q4 between # "2021-10-01" and "2021-10-10" :param year The year of the filing :param quarter The quarter of the filing :param form The form or forms as a string e.g. "10-K" or a List ["10-K", "8-K"] :param amendments If True will expand the list of forms to include amendments e.g. "10-K/A" :param filing_date The filing date to filter by in YYYY-MM-DD format e.g. filing_date="2022-01-17" or filing_date="2022-01-17:2022-02-28" :param index The index type - "form" or "company" or "xbrl" :param priority_sorted_forms: A list of forms to sort by priority. This presents these forms first for each day. :return: """ # Check if defaults were used defaults_used = (year is None and quarter is None and form is None and amendments is True and filing_date is None and index == "form" and priority_sorted_forms is None) if filing_date: if not is_valid_filing_date(filing_date): log.warning("""Provide a valid filing date in the format YYYY-MM-DD or YYYY-MM-DD:YYYY-MM-DD""") return None year_and_quarters = filing_date_to_year_quarters(filing_date) elif not year: # If no year specified, take the current year and quarter. (We need the quarter later) year, quarter = current_year_and_quarter() # Expand quarters for the year to date so use expand_quarters(year, quarter=None) year_and_quarters: YearAndQuarters = expand_quarters(year, quarter=None) else: year_and_quarters: YearAndQuarters = expand_quarters(year, quarter) if len(year_and_quarters) == 0: log.warning(f""" Provide a year between 1994 and {datetime.now().year} and optionally a quarter (1-4) for which the SEC has filings. e.g. filings = get_filings(2023) OR filings = get_filings(2023, 1) (You specified the year {year} and quarter {quarter}) """) return None filing_index = get_filings_for_quarters(year_and_quarters, index=index) filings = Filings(filing_index) if form or filing_date: filings = filings.filter(form=form, amendments=amendments, filing_date=filing_date) if not filings: if defaults_used: # Ensure at least some data is returned previous_quarter = [get_previous_quarter(year, quarter)] filing_index = get_filings_for_quarters(previous_quarter, index=index) filings = Filings(filing_index) sorted_filing_index = sort_filings_by_priority(filings.data, priority_sorted_forms) return Filings(sorted_filing_index) # Return an empty filings object return Filings(_empty_filing_index()) # Sort the filings using the separate sort function sorted_filing_index = sort_filings_by_priority(filings.data, priority_sorted_forms) return Filings(sorted_filing_index) @lru_cache(maxsize=8) def _get_cached_filings(year: Optional[Years] = None, quarter: Optional[Quarters] = None, form: Optional[Union[str, List[IntString]]] = None, amendments: bool = True, filing_date: Optional[str] = None, index="form") -> Union[Filings, None]: # Get the filings but cache the result return get_filings(year=year, quarter=quarter, form=form, amendments=amendments, filing_date=filing_date, index=index) def parse_filing_header(content): data = {} current_key = None lines = content.split('\n') for line in lines: if line.endswith(':'): current_key = line[:-1] # Remove the trailing colon data[current_key] = {} elif current_key and ':' in line: key, value = map(str.strip, line.split(':', 1)) data[current_key][key] = value return data def _create_address_table(business_address: Address, mailing_address: Address): address_table = Table("Type", "Street1", "Street2", "City", "State", "Zipcode", title="\U0001F4EC Addresses", box=box.SIMPLE) if business_address: address_table.add_row("\U0001F3E2 Business Address", business_address.street1, business_address.street2, business_address.city, business_address.state_or_country, business_address.zipcode) if mailing_address: address_table.add_row("\U0001F4ED Mailing Address", mailing_address.street1, mailing_address.street2, mailing_address.city, mailing_address.state_or_country, mailing_address.zipcode) return address_table class Filing: """ A single SEC filing. Allow you to access the documents and data for that filing """ def __init__(self, cik: int, company: str, form: str, filing_date: str, accession_no: str, related_entities: List[Dict] = None): self.cik = cik self.company = company self.form = form self.filing_date = filing_date self.accession_no = accession_no self._filing_homepage = None self._sgml = None # New: Store related entities from index self._related_entities = related_entities or [] @property def docs(self): return Docs(self) @property def accession_number(self): return self.accession_no @property def all_ciks(self) -> List[int]: """Get all CIKs including related entities""" # If we have related entities from the index, use those if self._related_entities: ciks = [self.cik] ciks.extend(e['cik'] for e in self._related_entities) return sorted(list(set(ciks))) # Otherwise, check the header for all filers try: header = self.header if header and header.filers and len(header.filers) > 1: # Multiple filers in header ciks = [] for filer in header.filers: if filer.company_information and filer.company_information.cik: # Convert CIK string to int, removing leading zeros cik_int = int(filer.company_information.cik.lstrip('0')) ciks.append(cik_int) if ciks: return sorted(list(set(ciks))) except Exception as e: # Log warning when header access fails log.warning( f"Could not access header for multi-entity detection in Filing " f"(accession_no={self.accession_no}, cik={self.cik}): {str(e)}. " f"This may occur if the accession number is invalid or the filing doesn't exist on EDGAR." ) return [self.cik] @property def all_entities(self) -> List[Dict[str, Any]]: """Get all entity information""" # If we have related entities from the index, use those if self._related_entities: entities = [{'cik': self.cik, 'company': self.company}] entities.extend(self._related_entities) return entities # Otherwise, check the header for all filers try: header = self.header if header and header.filers and len(header.filers) > 1: # Multiple filers in header entities = [] for filer in header.filers: if filer.company_information and filer.company_information.cik: # Convert CIK string to int, removing leading zeros cik_int = int(filer.company_information.cik.lstrip('0')) entities.append({ 'cik': cik_int, 'company': filer.company_information.name or f'CIK {cik_int}' }) if entities: return entities except Exception as e: # Log warning when header access fails log.warning( f"Could not access header for entity information in Filing " f"(accession_no={self.accession_no}, cik={self.cik}): {str(e)}. " f"This may occur if the accession number is invalid or the filing doesn't exist on EDGAR." ) return [{'cik': self.cik, 'company': self.company}] @property def is_multi_entity(self) -> bool: """Check if this filing has multiple entities""" # First check if we have related entities from the index if len(self._related_entities) > 0: return True # Otherwise, check the header for multiple filers try: header = self.header if header and header.filers and len(header.filers) > 1: return True except Exception as e: # Log warning when header access fails log.warning( f"Could not access header for multi-entity check in Filing " f"(accession_no={self.accession_no}, cik={self.cik}): {str(e)}. " f"This may occur if the accession number is invalid or the filing doesn't exist on EDGAR." ) return False @property def document(self): """ :return: The primary display document on the filing, generally HTML but can be XHTML """ document = self.sgml().attachments.primary_html_document # If the document is not in the SGML then we have to go to the homepage if document: if document.extension == '.paper': # If the document is a paper filing, we return the scanned document if it exists attachments = self.homepage.attachments scanned_documents = attachments.query("document == 'scanned.pdf'") if len(scanned_documents) > 0: return scanned_documents.get_by_index(0) return self.homepage.primary_html_document return document return self.homepage.primary_html_document @property def primary_documents(self): """ :return: a list of the primary documents on the filing, generally HTML or XHTML and optionally XML """ documents = self.sgml().attachments.primary_documents if len(documents) == 0: documents = self.homepage.primary_documents return documents @property def period_of_report(self): """ Get the period of report for the filing """ return self.sgml().period_of_report @property def attachments(self): # Return all the attachments on the filing sgml_filing: FilingSGML = self.sgml() return sgml_filing.attachments @property def exhibits(self): # Return all the exhibits on the filing return self.attachments.exhibits @lru_cache(maxsize=4) def html(self) -> Optional[str]: """Returns the html contents of the primary document if it is html""" sgml = self.sgml() html = sgml.html() if not html: document:Attachment = self.homepage.primary_html_document if document.empty or document.is_binary(): return None return self.homepage.primary_html_document.download() if html.endswith(""): return None if html.startswith("", "") return f"