"""
Company Dataset Builder for EdgarTools
Builds high-performance company datasets from SEC submissions data with two output formats:
1. PyArrow Parquet (5-20 MB) - Fast filtering with PyArrow compute API
2. DuckDB (287 MB) - Optional SQL interface for power users
Performance:
- Build time: ~30 seconds (optimized with orjson + company filtering)
- Records: ~562,413 companies (40% individual filers filtered)
- Query speed: <1ms (DuckDB) or <100ms (Parquet)
Example:
>>> from edgar.reference import get_company_dataset
>>> import pyarrow.compute as pc
>>>
>>> # Load dataset (builds on first use)
>>> companies = get_company_dataset()
>>>
>>> # Filter pharmaceutical companies
>>> pharma = companies.filter(pc.field('sic').between(2834, 2836))
>>> print(f"Found {len(pharma)} pharma companies")
"""
from pathlib import Path

import pyarrow as pa
import pyarrow.parquet as pq
from tqdm import tqdm

from edgar.core import get_edgar_data_directory, log

# Try to import orjson for performance, fall back to stdlib json
try:
    import orjson

    def load_json(path: Path) -> dict:
        """Load JSON file using orjson (1.55x faster)"""
        return orjson.loads(path.read_bytes())

    JSON_PARSER = "orjson"
except ImportError:
    import json

    def load_json(path: Path) -> dict:
        """Load JSON file using stdlib json"""
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)

    JSON_PARSER = "json (stdlib)"

# Company dataset schema
COMPANY_SCHEMA = pa.schema([
    ('cik', pa.string()),  # Keep as string to preserve leading zeros
    ('name', pa.string()),
    ('sic', pa.int32()),  # Nullable - some companies have no SIC
    ('sic_description', pa.string()),
    ('tickers', pa.string()),  # Pipe-delimited (e.g., "AAPL|APPLE")
    ('exchanges', pa.string()),  # Pipe-delimited (e.g., "Nasdaq|NYSE")
    ('state_of_incorporation', pa.string()),
    ('state_of_incorporation_description', pa.string()),
    ('fiscal_year_end', pa.string()),  # MMDD format
    ('entity_type', pa.string()),
    ('ein', pa.string()),
])
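
# Illustrative row for the schema above (values are made up, not taken from SEC data):
#
#     {
#         'cik': '0000012345',              # zero-padded string, leading zeros preserved
#         'name': 'EXAMPLE PHARMA CORP',
#         'sic': 2834,                      # None when the filer reports no SIC
#         'sic_description': 'Pharmaceutical Preparations',
#         'tickers': 'EXPH',                # pipe-delimited when there are several
#         'exchanges': 'Nasdaq',            # pipe-delimited when there are several
#         'state_of_incorporation': 'DE',
#         'state_of_incorporation_description': 'DELAWARE',
#         'fiscal_year_end': '1231',        # MMDD
#         'entity_type': 'operating',
#         'ein': '123456789',
#     }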

def is_individual_from_json(data: dict) -> bool:
    """
    Determine if entity is an individual filer vs a company.

    Uses the same logic as edgar.entity.data:478 (is_individual property).

    Companies typically have:
    - Tickers or exchanges
    - State of incorporation
    - Entity type other than '' or 'other'
    - Company-specific filings (10-K, 10-Q, 8-K, etc.)

    Args:
        data: Parsed JSON submission data

    Returns:
        True if individual filer, False if company

    Example:
        >>> data = {'cik': '0001318605', 'tickers': ['TSLA']}
        >>> is_individual_from_json(data)
        False
        >>> data = {'cik': '0001078519', 'name': 'JOHN DOE'}
        >>> is_individual_from_json(data)
        True
    """
    # Has ticker or exchange → company
    if data.get('tickers') or data.get('exchanges'):
        return False

    # Has state of incorporation → company (with exceptions)
    state = data.get('stateOfIncorporation', '')
    if state and state != '':
        # Reed Hastings exception (individual with state of incorporation)
        if data.get('cik') == '0001033331':
            return True
        return False

    # Has entity type (not '' or 'other') → company
    entity_type = data.get('entityType', '')
    if entity_type and entity_type not in ['', 'other']:
        return False

    # Files company forms (10-K, 10-Q, etc.) → company
    filings = data.get('filings', {})
    if filings:
        recent = filings.get('recent', {})
        forms = recent.get('form', [])
        company_forms = {'10-K', '10-Q', '8-K', '10-K/A', '10-Q/A', '20-F', 'S-1'}
        if any(form in company_forms for form in forms):
            return False

    # Default: individual
    return True

def build_company_dataset_parquet(
    submissions_dir: Path,
    output_path: Path,
    filter_individuals: bool = True,
    show_progress: bool = True
) -> pa.Table:
    """
    Build PyArrow Parquet dataset from submissions directory (companies only).

    This function processes all CIK*.json files in the submissions directory,
    filters out individual filers (optional), and creates a compressed Parquet file.

    Performance:
    - ~30 seconds for 562,413 companies (with orjson + filtering)
    - Output size: ~5-20 MB (zstd compressed)
    - Memory usage: ~100-200 MB during build

    Args:
        submissions_dir: Directory containing CIK*.json files
        output_path: Where to save the .pq file
        filter_individuals: Skip individual filers (default: True)
        show_progress: Show progress bar (default: True)

    Returns:
        PyArrow Table with company data

    Raises:
        FileNotFoundError: If submissions_dir doesn't exist

    Example:
        >>> from pathlib import Path
        >>> submissions_dir = Path.home() / '.edgar' / 'submissions'
        >>> output_path = Path.home() / '.edgar' / 'companies.pq'
        >>> table = build_company_dataset_parquet(submissions_dir, output_path)
        >>> print(f"Built dataset: {len(table):,} companies")
    """
    if not submissions_dir.exists():
        raise FileNotFoundError(
            f"Submissions directory not found: {submissions_dir}\n\n"
            "Please download submissions data first:\n"
            " from edgar.storage import download_submissions\n"
            " download_submissions()\n"
        )

    # Get all submission JSON files
    json_files = list(submissions_dir.glob("CIK*.json"))
    if len(json_files) == 0:
        raise FileNotFoundError(
            f"No submission files found in: {submissions_dir}\n"
            "Expected CIK*.json files"
        )

    log.info(f"Building company dataset from {len(json_files):,} submission files")
    log.info(f"Using JSON parser: {JSON_PARSER}")

    companies = []
    errors = 0
    individuals_skipped = 0

    # Process each file with progress bar
    iterator = tqdm(json_files, desc="Processing submissions", disable=not show_progress)
    for json_file in iterator:
        try:
            data = load_json(json_file)

            # Skip individuals if filtering enabled
            if filter_individuals and is_individual_from_json(data):
                individuals_skipped += 1
                continue

            # Extract SIC (handle empty strings)
            sic = data.get('sic')
            sic_int = int(sic) if sic and sic != '' else None

            # Extract tickers and exchanges (filter None values)
            tickers = data.get('tickers', [])
            exchanges = data.get('exchanges', [])

            companies.append({
                'cik': data.get('cik'),
                'name': data.get('name'),
                'sic': sic_int,
                'sic_description': data.get('sicDescription'),
                'tickers': '|'.join(filter(None, tickers)) if tickers else None,
                'exchanges': '|'.join(filter(None, exchanges)) if exchanges else None,
                'state_of_incorporation': data.get('stateOfIncorporation'),
                'state_of_incorporation_description': data.get('stateOfIncorporationDescription'),
                'fiscal_year_end': data.get('fiscalYearEnd'),
                'entity_type': data.get('entityType'),
                'ein': data.get('ein'),
            })
        except Exception as e:
            errors += 1
            log.debug(f"Error processing {json_file.name}: {e}")
            continue

    # Log statistics
    log.info(f"Processed {len(json_files):,} files:")
    log.info(f" - Companies: {len(companies):,}")
    if filter_individuals:
        log.info(f" - Individuals skipped: {individuals_skipped:,}")
    if errors > 0:
        log.warning(f" - Errors: {errors:,}")

    # Create PyArrow Table
    table = pa.Table.from_pylist(companies, schema=COMPANY_SCHEMA)

    # Write to Parquet with compression
    output_path.parent.mkdir(parents=True, exist_ok=True)
    pq.write_table(
        table,
        output_path,
        compression='zstd',
        compression_level=9,
        use_dictionary=True
    )

    file_size_mb = output_path.stat().st_size / (1024 * 1024)
    log.info(f"Saved Parquet file: {output_path} ({file_size_mb:.1f} MB)")

    return table
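
# Illustrative post-build check (a sketch, not part of the library API; column
# names follow COMPANY_SCHEMA above, paths are as in the docstring example):
#
#     import pyarrow.compute as pc
#     table = build_company_dataset_parquet(submissions_dir, output_path)
#     listed = table.filter(pc.field('tickers').is_valid())
#     log.info(f"Companies with at least one ticker: {len(listed):,}")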

def build_company_dataset_duckdb(
    submissions_dir: Path,
    output_path: Path,
    filter_individuals: bool = True,
    create_indexes: bool = True,
    show_progress: bool = True
) -> None:
    """
    Build DuckDB database from submissions directory (companies only).

    This function creates a DuckDB database with a 'companies' table and
    optional indexes on key columns for fast querying.

    Performance:
    - ~30 seconds for 562,413 companies (with orjson + filtering)
    - Output size: ~287 MB
    - Query speed: <1ms with indexes

    Args:
        submissions_dir: Directory containing CIK*.json files
        output_path: Where to save the .duckdb file
        filter_individuals: Skip individual filers (default: True)
        create_indexes: Create indexes on cik, sic, name (default: True)
        show_progress: Show progress bar (default: True)

    Raises:
        FileNotFoundError: If submissions_dir doesn't exist
        ImportError: If duckdb package not installed

    Example:
        >>> from pathlib import Path
        >>> submissions_dir = Path.home() / '.edgar' / 'submissions'
        >>> output_path = Path.home() / '.edgar' / 'companies.duckdb'
        >>> build_company_dataset_duckdb(submissions_dir, output_path)
        >>>
        >>> import duckdb
        >>> con = duckdb.connect(str(output_path))
        >>> result = con.execute("SELECT COUNT(*) FROM companies").fetchone()
        >>> print(f"Companies: {result[0]:,}")
    """
    try:
        import duckdb
    except ImportError:
        raise ImportError(
            "DuckDB export requires duckdb package.\n"
            "Install with: pip install duckdb"
        )

    if not submissions_dir.exists():
        raise FileNotFoundError(
            f"Submissions directory not found: {submissions_dir}\n\n"
            "Please download submissions data first:\n"
            " from edgar.storage import download_submissions\n"
            " download_submissions()\n"
        )

    # Get all submission JSON files
    json_files = list(submissions_dir.glob("CIK*.json"))
    if len(json_files) == 0:
        raise FileNotFoundError(
            f"No submission files found in: {submissions_dir}\n"
            "Expected CIK*.json files"
        )

    log.info(f"Building DuckDB database from {len(json_files):,} submission files")
    log.info(f"Using JSON parser: {JSON_PARSER}")

    companies = []
    errors = 0
    individuals_skipped = 0

    # Process each file with progress bar
    iterator = tqdm(json_files, desc="Processing submissions", disable=not show_progress)
    for json_file in iterator:
        try:
            data = load_json(json_file)

            # Skip individuals if filtering enabled
            if filter_individuals and is_individual_from_json(data):
                individuals_skipped += 1
                continue

            # Extract SIC (handle empty strings)
            sic = data.get('sic')
            sic_int = int(sic) if sic and sic != '' else None

            # Extract tickers and exchanges (filter None values)
            tickers = data.get('tickers', [])
            exchanges = data.get('exchanges', [])

            companies.append({
                'cik': data.get('cik'),
                'name': data.get('name'),
                'sic': sic_int,
                'sic_description': data.get('sicDescription'),
                'tickers': '|'.join(filter(None, tickers)) if tickers else None,
                'exchanges': '|'.join(filter(None, exchanges)) if exchanges else None,
                'state_of_incorporation': data.get('stateOfIncorporation'),
                'state_of_incorporation_description': data.get('stateOfIncorporationDescription'),
                'fiscal_year_end': data.get('fiscalYearEnd'),
                'entity_type': data.get('entityType'),
                'ein': data.get('ein'),
            })
        except Exception as e:
            errors += 1
            log.debug(f"Error processing {json_file.name}: {e}")
            continue

    # Log statistics
    log.info(f"Processed {len(json_files):,} files:")
    log.info(f" - Companies: {len(companies):,}")
    if filter_individuals:
        log.info(f" - Individuals skipped: {individuals_skipped:,}")
    if errors > 0:
        log.warning(f" - Errors: {errors:,}")

    # Create DuckDB database
    import pandas as pd
    output_path.parent.mkdir(parents=True, exist_ok=True)
    con = duckdb.connect(str(output_path))

    # Create table from DataFrame
    df = pd.DataFrame(companies)
    con.execute("CREATE TABLE companies AS SELECT * FROM df")

    # Create indexes
    if create_indexes:
        log.info("Creating indexes...")
        con.execute("CREATE INDEX idx_cik ON companies(cik)")
        con.execute("CREATE INDEX idx_sic ON companies(sic)")
        con.execute("CREATE INDEX idx_name ON companies(name)")
    # Add metadata table
    # Note: COUNT(CASE ...) counts non-null matches per row; a DISTINCT here would
    # collapse the result to 0 or 1.
    con.execute("""
        CREATE TABLE metadata AS
        SELECT
            CURRENT_TIMESTAMP as created_at,
            COUNT(*) as total_companies,
            COUNT(DISTINCT sic) as unique_sic_codes,
            COUNT(CASE WHEN tickers IS NOT NULL THEN 1 END) as companies_with_tickers,
            COUNT(CASE WHEN exchanges IS NOT NULL THEN 1 END) as companies_with_exchanges
        FROM companies
    """)

    con.close()

    file_size_mb = output_path.stat().st_size / (1024 * 1024)
    log.info(f"Saved DuckDB database: {output_path} ({file_size_mb:.1f} MB)")

def load_company_dataset_parquet(parquet_path: Path) -> pa.Table:
    """
    Load company dataset from Parquet file.

    This is a simple wrapper around pyarrow.parquet.read_table() with
    logging for consistency.

    Performance: <100ms for typical dataset

    Args:
        parquet_path: Path to .pq file

    Returns:
        PyArrow Table with company data

    Raises:
        FileNotFoundError: If the Parquet file doesn't exist

    Example:
        >>> from pathlib import Path
        >>> path = Path.home() / '.edgar' / 'companies.pq'
        >>> companies = load_company_dataset_parquet(path)
        >>> print(f"Loaded {len(companies):,} companies")
    """
    if not parquet_path.exists():
        raise FileNotFoundError(f"Parquet file not found: {parquet_path}")

    table = pq.read_table(parquet_path)
    log.debug(f"Loaded {len(table):,} companies from {parquet_path}")

    return table

def to_duckdb(
    parquet_path: Path,
    duckdb_path: Path,
    create_indexes: bool = True
) -> None:
    """
    Convert Parquet dataset to DuckDB database.

    This provides an easy way to export the Parquet dataset to DuckDB
    for users who want SQL query capabilities.

    Performance: <5 seconds for typical dataset

    Args:
        parquet_path: Path to source .pq file
        duckdb_path: Path to output .duckdb file
        create_indexes: Create indexes on key columns (default: True)

    Example:
        >>> from pathlib import Path
        >>> parquet_path = Path.home() / '.edgar' / 'companies.pq'
        >>> duckdb_path = Path.home() / '.edgar' / 'companies.duckdb'
        >>> to_duckdb(parquet_path, duckdb_path)
        >>>
        >>> import duckdb
        >>> con = duckdb.connect(str(duckdb_path))
        >>> result = con.execute(
        ...     "SELECT * FROM companies WHERE sic = 2834"
        ... ).fetchdf()
    """
    try:
        import duckdb
    except ImportError:
        raise ImportError(
            "DuckDB export requires duckdb package.\n"
            "Install with: pip install duckdb"
        )

    if not parquet_path.exists():
        raise FileNotFoundError(f"Parquet file not found: {parquet_path}")

    log.info(f"Converting Parquet to DuckDB: {parquet_path} -> {duckdb_path}")

    # Read Parquet file and convert to pandas
    table = pq.read_table(parquet_path)
    import pandas as pd
    df = table.to_pandas()

    # Create DuckDB database
    duckdb_path.parent.mkdir(parents=True, exist_ok=True)
    con = duckdb.connect(str(duckdb_path))

    # Create table from DataFrame
    con.execute("CREATE TABLE companies AS SELECT * FROM df")

    # Create indexes
    if create_indexes:
        log.info("Creating indexes...")
        con.execute("CREATE INDEX idx_cik ON companies(cik)")
        con.execute("CREATE INDEX idx_sic ON companies(sic)")
        con.execute("CREATE INDEX idx_name ON companies(name)")
    # Add metadata
    # Note: COUNT(CASE ...) counts non-null matches per row; a DISTINCT here would
    # collapse the result to 0 or 1.
    con.execute("""
        CREATE TABLE metadata AS
        SELECT
            CURRENT_TIMESTAMP as created_at,
            COUNT(*) as total_companies,
            COUNT(DISTINCT sic) as unique_sic_codes,
            COUNT(CASE WHEN tickers IS NOT NULL THEN 1 END) as companies_with_tickers,
            COUNT(CASE WHEN exchanges IS NOT NULL THEN 1 END) as companies_with_exchanges
        FROM companies
    """)

    con.close()

    file_size_mb = duckdb_path.stat().st_size / (1024 * 1024)
    log.info(f"Exported to DuckDB: {duckdb_path} ({file_size_mb:.1f} MB)")

# In-memory cache for dataset
_CACHE = {}

def get_company_dataset(rebuild: bool = False) -> pa.Table:
    """
    Get company dataset, building from submissions if needed.

    This function checks for a cached dataset at ~/.edgar/companies.pq.
    If not found, it automatically builds the dataset from submissions data.

    On first use, this will take ~30 seconds to build the dataset. Subsequent
    calls load from cache in <100ms.

    Args:
        rebuild: Force rebuild even if cache exists (default: False)

    Returns:
        PyArrow Table with company data (~562,413 companies)

    Raises:
        FileNotFoundError: If submissions directory not found or incomplete

    Performance:
    - First use: ~30 seconds (builds dataset)
    - Cached: <100ms (loads from disk)
    - Memory: ~20-50 MB

    Example:
        >>> from edgar.reference import get_company_dataset
        >>> import pyarrow.compute as pc
        >>>
        >>> # First call builds dataset (takes ~30s)
        >>> companies = get_company_dataset()
        >>> print(f"Loaded {len(companies):,} companies")
        >>>
        >>> # Subsequent calls are fast (<100ms)
        >>> companies = get_company_dataset()
        >>>
        >>> # Filter pharmaceutical companies (SIC 2834-2836)
        >>> pharma = companies.filter(
        ...     pc.field('sic').between(2834, 2836)
        ... )
        >>> print(f"Found {len(pharma)} pharma companies")
        >>>
        >>> # Filter by exchange
        >>> nasdaq = companies.filter(
        ...     pc.field('exchanges').contains('Nasdaq')
        ... )
        >>>
        >>> # Force rebuild with latest data
        >>> companies = get_company_dataset(rebuild=True)
    """
    # Check in-memory cache first
    if not rebuild and 'companies' in _CACHE:
        return _CACHE['companies']

    # Check disk cache
    cache_path = get_edgar_data_directory() / 'companies.pq'
    if cache_path.exists() and not rebuild:
        # Load from cache
        log.info(f"Loading company dataset from cache: {cache_path}")
        table = load_company_dataset_parquet(cache_path)
        _CACHE['companies'] = table
        return table

    # Need to build dataset
    log.info("Building company dataset from submissions (this may take ~30 seconds)...")

    submissions_dir = get_edgar_data_directory() / 'submissions'
    if not submissions_dir.exists() or len(list(submissions_dir.glob('CIK*.json'))) < 100000:
        raise FileNotFoundError(
            f"Submissions directory not found or incomplete: {submissions_dir}\n\n"
            "Please download submissions data first:\n"
            " from edgar.storage import download_submissions\n"
            " download_submissions()\n\n"
            "This is a one-time download (~500 MB compressed)."
        )

    # Build dataset
    table = build_company_dataset_parquet(
        submissions_dir,
        cache_path,
        filter_individuals=True
    )

    log.info(f"✅ Built dataset: {len(table):,} companies, cached at {cache_path}")
    _CACHE['companies'] = table

    return table
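

if __name__ == "__main__":
    # Minimal manual smoke test: an illustrative sketch, not part of the library API.
    # Assumes submissions data has already been downloaded via
    # edgar.storage.download_submissions().
    import pyarrow.compute as pc

    companies = get_company_dataset()
    print(f"Loaded {len(companies):,} companies")

    # Pharmaceutical preparations and related SIC codes (2834-2836), written with
    # explicit comparisons on PyArrow expressions.
    pharma = companies.filter((pc.field('sic') >= 2834) & (pc.field('sic') <= 2836))
    print(f"Pharma companies: {len(pharma):,}")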