Files
2025-12-09 12:13:01 +01:00

132 lines
3.9 KiB
Python

"""
Utility functions for entity processing.
This module contains utility functions used throughout the entity package
for data processing, normalization, and validation.
"""
from typing import TYPE_CHECKING, Union
if TYPE_CHECKING:
import pyarrow
from edgar.entity.constants import COMPANY_FORMS
def has_company_filings(filings_form_array: 'pyarrow.ChunkedArray', max_filings: int = 50) -> bool:
    """
    Check whether any of the leading forms in a ChunkedArray is a company-only form.

    Only the first ``max_filings`` entries are inspected, walking the array
    chunk by chunk, so that very long filing histories stay cheap to test.

    Args:
        filings_form_array: PyArrow ChunkedArray containing form values
            (null entries are skipped).
        max_filings: Maximum number of leading filings to check.

    Returns:
        True if any inspected form is in COMPANY_FORMS, False otherwise
        (including when the array is empty or contains only nulls).
    """
    total_filings = filings_form_array.length()

    # Early exit when there is nothing to match: an empty array or one that
    # holds only nulls. ``length`` is a method and must be called; comparing
    # ``null_count`` with the bound method itself can never be equal.
    if filings_form_array.null_count == total_filings:
        return False

    # Number of entries we are still allowed to inspect.
    remaining = min(total_filings, max_filings)

    for chunk in filings_form_array.chunks:
        if remaining <= 0:
            break

        # Trim the chunk that crosses the limit so we never look past it.
        window = chunk if len(chunk) <= remaining else chunk.slice(0, remaining)

        # Convert once per chunk rather than once per element.
        if any(form is not None and form in COMPANY_FORMS for form in window.to_pylist()):
            return True

        remaining -= len(window)

    return False
def normalize_cik(cik_or_identifier: Union[str, int]) -> int:
    """
    Normalize a CIK to an integer by removing leading zeros.

    Args:
        cik_or_identifier: CIK as an integer, or as a string that may be
            zero-padded (e.g. ``'0000320193'``).

    Returns:
        The CIK as an integer. Integers are returned unchanged; an empty or
        all-zero string yields ``0``.

    Raises:
        ValueError: If the string cannot be parsed as an integer, or if the
            argument is neither a string nor an integer.
    """
    if isinstance(cik_or_identifier, int):
        return cik_or_identifier

    if not isinstance(cik_or_identifier, str):
        raise ValueError(f"CIK must be string or integer, got {type(cik_or_identifier)}")

    # Drop the zero padding; fall back to '0' so '' and '000' still parse.
    stripped = cik_or_identifier.lstrip('0') or '0'
    try:
        return int(stripped)
    except ValueError as err:
        # Chain explicitly so the traceback shows the parse failure as the cause.
        raise ValueError(f"Invalid CIK format: {cik_or_identifier}") from err
def validate_cik(cik: int) -> bool:
    """
    Validate that a CIK is an integer within the expected range.

    Args:
        cik: CIK to validate

    Returns:
        True if ``cik`` is an ``int`` (not a ``bool``) between 1 and
        2,000,000,000 inclusive, False otherwise.
    """
    # bool is a subclass of int, so without this guard True would be
    # accepted as if it were CIK 1.
    if isinstance(cik, bool) or not isinstance(cik, int):
        return False

    # CIKs are typically 1-10 digits, with valid range roughly 1 to 2,000,000,000
    return 1 <= cik <= 2_000_000_000
def format_cik(cik: Union[str, int], zero_pad: int = 10) -> str:
    """
    Render a CIK as a zero-padded string for display or API calls.

    The input is first passed through ``normalize_cik`` and the resulting
    value is then left-padded with zeros up to ``zero_pad`` characters.

    Args:
        cik: CIK to format, as a string or an integer
        zero_pad: Minimum width of the result (default 10)

    Returns:
        Zero-padded CIK string

    Example:
        >>> format_cik(320193)
        '0000320193'
        >>> format_cik('320193', zero_pad=6)
        '320193'
    """
    cik_text = str(normalize_cik(cik))
    return cik_text.zfill(zero_pad)