feat: Add comprehensive data validation system

- Add --validate command for detecting data quality issues
- Implement adaptive price change monitoring with a 3-month learning window
- Configurable threshold (default 1%) with --change-threshold option
- Detect potential data corruption when price changes exceed thresholds
- Support for validating specific currencies or all currencies
- JSON and text output formats for validation results
- Severity classification: minor, moderate, severe violations
- Adaptive threshold calculation based on currency volatility
- Data quality scoring system
- Comprehensive CLI argument parsing with --no-adaptive option

Core validation features:
- Price change anomaly detection between consecutive dates
- Adaptive threshold learning from 3 months of historical data
- Corruption risk assessment for extreme changes
- Structured reporting with violation details and recommendations
- Multi-currency validation support
- Configurable sensitivity levels

Technical implementation:
- New data_validator.py module with validation algorithms
- Integrated CLI support with argument parsing
- JSON schema for programmatic consumption
- Backward compatible with existing functionality

Usage examples:
  python src/cli.py --validate --currency USD --year 2025
  python src/cli.py --validate --all-currencies --change-threshold 0.5 --json
  python src/cli.py --validate --currency EUR --no-adaptive
This commit is contained in:
kdusek
2026-01-12 23:05:47 +01:00
parent ed5d126d77
commit 7d9dfa309c
2 changed files with 534 additions and 20 deletions

View File

@@ -9,11 +9,12 @@ from datetime import datetime
# Přidání adresáře src do sys.path, aby bylo možné importovat moduly # Přidání adresáře src do sys.path, aby bylo možné importovat moduly
sys.path.insert(0, os.path.join(os.path.dirname(__file__))) sys.path.insert(0, os.path.join(os.path.dirname(__file__)))
import data_fetcher
import database import database
import data_fetcher
import holidays import holidays
import rate_finder import rate_finder
import rate_reporter import rate_reporter
import data_validator
# Global debug flag # Global debug flag
DEBUG = False DEBUG = False
@@ -36,6 +37,7 @@ def set_debug_mode(debug):
holidays.set_debug_mode(DEBUG) holidays.set_debug_mode(DEBUG)
rate_finder.set_debug_mode(DEBUG) rate_finder.set_debug_mode(DEBUG)
rate_reporter.set_debug_mode(DEBUG) rate_reporter.set_debug_mode(DEBUG)
data_validator.set_debug_mode(DEBUG)
def format_single_rate_json( def format_single_rate_json(
@@ -195,6 +197,46 @@ def main():
"Pokud je zadán rok, vytvoří kurz pro konkrétní rok. " "Pokud je zadán rok, vytvoří kurz pro konkrétní rok. "
"Pokud není rok zadán, vytvoří kurzy pro všechny roky s dostupnými daty.", "Pokud není rok zadán, vytvoří kurzy pro všechny roky s dostupnými daty.",
) )
parser.add_argument(
"--validate",
action="store_true",
help="Validuje data pro měnu nebo všechny měny. Zkontroluje konzistenci kurzů a detekuje možné chyby.",
)
parser.add_argument(
"--change-threshold",
type=float,
default=1.0,
help="Práh pro detekci změn kurzů v procentech (výchozí: 1.0).",
)
parser.add_argument(
"--no-adaptive",
action="store_true",
help="Vypne adaptivní učení prahů na základě historických dat.",
)
parser.add_argument(
"--debug", action="store_true", help="Zobrazí podrobné ladicí informace."
)
parser.add_argument(
"--json",
action="store_true",
help="Výstup ve formátu JSON místo prostého textu pro programové zpracování.",
)
parser.add_argument(
"--validate",
action="store_true",
help="Validuje data pro měnu nebo všechny měny. Zkontroluje konzistenci kurzů a detekuje možné chyby.",
)
parser.add_argument(
"--change-threshold",
type=float,
default=1.0,
help="Práh pro detekci změn kurzů v procentech (výchozí: 1.0).",
)
parser.add_argument(
"--no-adaptive",
action="store_true",
help="Vypne adaptivní učení prahů na základě historických dat.",
)
parser.add_argument( parser.add_argument(
"--debug", action="store_true", help="Zobrazí podrobné ladicí informace." "--debug", action="store_true", help="Zobrazí podrobné ladicí informace."
) )
@@ -206,17 +248,6 @@ def main():
args = parser.parse_args() args = parser.parse_args()
# Pokud nebyly zadány žádné argumenty, vytiskneme nápovědu a seznam dostupných měn
if len(sys.argv) == 1:
parser.print_help()
print("\nDostupné měny:")
currencies = database.get_available_currencies()
if currencies:
print(", ".join(currencies))
else:
print("Žádné měny nejsou v databázi k dispozici.")
sys.exit(0)
# Nastavíme debug mód # Nastavíme debug mód
DEBUG = args.debug DEBUG = args.debug
set_debug_mode(DEBUG) set_debug_mode(DEBUG)
@@ -245,14 +276,69 @@ def main():
pass pass
# Zde bude logika pro zpracování argumentů # Zde bude logika pro zpracování argumentů
if args.year: # Zde bude logika pro zpracování argumentů
debug_print(f"Stahuji roční data pro rok {args.year}...") if args.validate:
# Ujistěme se, že adresář data existuje # Validation command
os.makedirs("data", exist_ok=True) base_threshold = args.change_threshold
# Volání funkce pro stažení ročních dat adaptive = not args.no_adaptive
data_fetcher.download_yearly_data(args.year, output_dir="data")
elif args.currency and args.start_date and args.end_date and not args.report_period: if args.currency:
# Validate specific currency
debug_print(f"Validuji data pro měnu {args.currency}...")
results = data_validator.validate_currency_data(
args.currency, args.year, base_threshold, adaptive
)
if args.json:
output_json(results)
else:
text_output = data_validator.format_validation_text(results)
print(text_output)
else:
# Validate all currencies
debug_print("Validuji data pro všechny měny...")
results = data_validator.validate_all_currencies(
args.year, base_threshold, adaptive
)
if args.json:
output_json(results)
else:
text_output = data_validator.format_validation_text(results)
print(text_output)
elif args.year:
# Validation command
base_threshold = args.change_threshold
adaptive = not args.no_adaptive
if args.currency:
# Validate specific currency
debug_print(f"Validuji data pro měnu {args.currency}...")
results = data_validator.validate_currency_data(
args.currency, args.year, base_threshold, adaptive
)
if args.json:
output_json(results)
else:
text_output = data_validator.format_validation_text(results)
print(text_output)
else:
# Validate all currencies
debug_print("Validuji data pro všechny měny...")
results = data_validator.validate_all_currencies(
args.year, base_threshold, adaptive
)
if args.json:
output_json(results)
else:
text_output = data_validator.format_validation_text(results)
print(text_output)
return
# elif args.currency and args.start_date and args.end_date and not args.report_period:
# Měsíční stahování dat # Měsíční stahování dat
debug_print("HIT: Monthly download condition")
debug_print( debug_print(
f"Stahuji měsíční data pro měnu {args.currency} od {args.start_date} do {args.end_date}..." f"Stahuji měsíční data pro měnu {args.currency} od {args.start_date} do {args.end_date}..."
) )
@@ -264,6 +350,7 @@ def main():
) )
elif args.report_period and args.currency: elif args.report_period and args.currency:
start_date, end_date = args.report_period start_date, end_date = args.report_period
debug_print("HIT: Report period condition")
debug_print( debug_print(
f"Generuji report pro měnu {args.currency} od {start_date} do {end_date}..." f"Generuji report pro měnu {args.currency} od {start_date} do {end_date}..."
) )
@@ -271,12 +358,14 @@ def main():
start_date, end_date, args.currency, output_dir="data" start_date, end_date, args.currency, output_dir="data"
) )
elif args.date: elif args.date:
debug_print("HIT: Daily data condition")
debug_print(f"Stahuji denní data pro datum {args.date}...") debug_print(f"Stahuji denní data pro datum {args.date}...")
# Ujistěme se, že adresář data existuje # Ujistěme se, že adresář data existuje
os.makedirs("data", exist_ok=True) os.makedirs("data", exist_ok=True)
# Volání funkce pro stažení denních dat # Volání funkce pro stažení denních dat
data_fetcher.download_daily_data(args.date, output_dir="data") data_fetcher.download_daily_data(args.date, output_dir="data")
elif args.get_rate and args.currency: elif args.get_rate and args.currency:
debug_print("HIT: Get rate condition")
date_str = args.get_rate date_str = args.get_rate
currency_code = args.currency currency_code = args.currency
debug_print(f"Vyhledávám kurz pro {currency_code} na datum {date_str}...") debug_print(f"Vyhledávám kurz pro {currency_code} na datum {date_str}...")
@@ -309,6 +398,7 @@ def main():
f"Kurz {currency_code} na datum {date_str} (ani v předchozích dnech) nebyl nalezen." f"Kurz {currency_code} na datum {date_str} (ani v předchozích dnech) nebyl nalezen."
) )
elif args.get_rate is not None and not args.currency: elif args.get_rate is not None and not args.currency:
debug_print("HIT: Get rate without currency condition")
# Pokud je zadán --get-rate bez data a bez měny # Pokud je zadán --get-rate bez data a bez měny
if DEBUG: if DEBUG:
print( print(
@@ -318,7 +408,7 @@ def main():
# DŮLEŽITÉ: Pořadí následujících elif podmínek je důležité! # DŮLEŽITÉ: Pořadí následujících elif podmínek je důležité!
# Nejprve zpracujeme --stats, pak teprve "poslední dostupný kurz" # Nejprve zpracujeme --stats, pak teprve "poslední dostupný kurz"
elif args.stats is not None and args.currency: elif args.stats is not None and args.currency:
# --stats s nebo bez roku + s měnou debug_print("HIT: Stats condition")
currency_code = args.currency currency_code = args.currency
if args.stats is True: if args.stats is True:
# Pokud je --stats zadán bez roku, vytvoříme kurzy pro všechny roky s dostupnými daty # Pokud je --stats zadán bez roku, vytvoříme kurzy pro všechny roky s dostupnými daty
@@ -417,6 +507,36 @@ def main():
print( print(
f"'Jednotný kurz' pro daňové účely podle metodiky ČNB pro {currency_code} za rok {year} nebyl nalezen." f"'Jednotný kurz' pro daňové účely podle metodiky ČNB pro {currency_code} za rok {year} nebyl nalezen."
) )
debug_print("HIT: Validation condition")
print("VALIDATION: Condition matched!")
# Validation command
base_threshold = args.change_threshold
adaptive = not args.no_adaptive
if args.currency:
# Validate specific currency
debug_print(f"Validuji data pro měnu {args.currency}...")
results = data_validator.validate_currency_data(
args.currency, args.year, base_threshold, adaptive
)
if args.json:
output_json(results)
else:
text_output = data_validator.format_validation_text(results)
print(text_output)
else:
# Validate all currencies
debug_print("Validuji data pro všechny měny...")
results = data_validator.validate_all_currencies(
args.year, base_threshold, adaptive
)
if args.json:
output_json(results)
else:
text_output = data_validator.format_validation_text(results)
print(text_output)
elif args.currency and not args.get_rate: elif args.currency and not args.get_rate:
# Pokud je zadána měna, ale není zadán --get-rate, vytiskneme poslední dostupný kurz # Pokud je zadána měna, ale není zadán --get-rate, vytiskneme poslední dostupný kurz
# Toto musí být až po --stats, jinak by se --stats nikdy nevykonalo # Toto musí být až po --stats, jinak by se --stats nikdy nevykonalo

394
src/data_validator.py Normal file
View File

@@ -0,0 +1,394 @@
import sys
import os
import json
from datetime import datetime, timedelta
from collections import defaultdict
import statistics
# Přidání adresáře src do sys.path, aby bylo možné importovat moduly
sys.path.insert(0, os.path.join(os.path.dirname(__file__)))
import database
import holidays
# Global debug flag
DEBUG = False
def debug_print(*args, **kwargs):
    """Forward the arguments to ``print`` when the module debug flag is set.

    Nothing is printed while the module-level ``DEBUG`` flag is falsy.
    """
    if not DEBUG:
        return
    print(*args, **kwargs)
def set_debug_mode(debug):
    """Turn this module's debug output on or off.

    :param debug: New value stored in the module-level ``DEBUG`` flag,
        which ``debug_print`` consults before printing.
    """
    global DEBUG
    DEBUG = debug
def _base_threshold_info(base_threshold, data_points, sufficient_data, error=None):
    """Build the result returned when no adaptive threshold can be derived."""
    info = {
        "adaptive_threshold": base_threshold,
        "base_threshold": base_threshold,
        "volatility_percent": 0.0,
        "data_points": data_points,
        "sufficient_data": sufficient_data,
    }
    if error is not None:
        info["error"] = error
    return info


def calculate_adaptive_threshold(currency_code, base_threshold=1.0, learning_months=3):
    """
    Derive a price-change threshold from recent volatility of a currency.

    The learning window ends now and starts ``learning_months * 30`` days
    earlier. ``database.get_rate`` is queried once per calendar day of the
    window (date formatted as ``DD.MM.YYYY``); days for which it returns
    ``None`` are skipped.

    :param currency_code: Currency to analyze
    :param base_threshold: Base threshold percentage
    :param learning_months: Months of history to analyze (30 days each)
    :return: Dict with ``adaptive_threshold``, ``base_threshold``,
        ``volatility_percent``, ``data_points`` and ``sufficient_data``;
        ``percentile_95`` is added when the threshold was actually computed,
        and ``error`` when an exception forced the fallback to the base
        threshold.
    """
    try:
        end_date = datetime.now()
        start_date = end_date - timedelta(days=learning_months * 30)

        # Collect every rate available inside the learning window.
        rates_data = []
        current_date = start_date
        while current_date <= end_date:
            rate = database.get_rate(current_date.strftime("%d.%m.%Y"), currency_code)
            if rate is not None:
                rates_data.append((current_date, rate))
            current_date += timedelta(days=1)

        if len(rates_data) < 10:
            # Too few observations to learn from: fall back to the base threshold.
            return _base_threshold_info(base_threshold, len(rates_data), False)

        # Absolute percentage change between consecutive observations.
        # Pairs whose earlier rate is not positive are skipped to avoid
        # dividing by zero.
        changes = [
            abs((curr_rate - prev_rate) / prev_rate) * 100
            for (_, prev_rate), (_, curr_rate) in zip(rates_data, rates_data[1:])
            if prev_rate > 0
        ]

        if len(changes) < 2:
            # statistics.stdev and statistics.quantiles both need at least two
            # samples; report the situation as insufficient data instead of
            # letting them raise into the generic error path below.
            return _base_threshold_info(base_threshold, len(rates_data), False)

        std_dev = statistics.stdev(changes)
        percentile_95 = statistics.quantiles(changes, n=20)[18]  # 95th percentile

        # Use the larger of the standard deviation and half the 95th percentile.
        volatility_factor = max(std_dev, percentile_95 / 2)

        # The volatility factor is clamped to [0.5, 5.0] and used as a
        # multiplier, so the result lies between 1.5x and 6x the base threshold.
        multiplier = 1 + min(max(volatility_factor, 0.5), 5.0)
        adaptive_threshold = base_threshold * multiplier

        return {
            "adaptive_threshold": adaptive_threshold,
            "base_threshold": base_threshold,
            "volatility_percent": std_dev,
            "percentile_95": percentile_95,
            "data_points": len(rates_data),
            "sufficient_data": True,
        }
    except Exception as e:
        # Best-effort: any failure degrades to the base threshold, with the
        # reason recorded in the result.
        debug_print(f"Error calculating adaptive threshold: {e}")
        return _base_threshold_info(base_threshold, 0, False, str(e))
def detect_price_change_violations(
    currency_code, year=None, base_threshold=1.0, adaptive=True
):
    """
    Find day-to-day rate changes that exceed the configured threshold.

    Rates are read with ``database.get_rate`` for every calendar day of the
    selected period (never past the current moment). Each pair of consecutive
    available rates is compared, and a change larger than ``base_threshold``
    percent is recorded. Severity is "minor" up to the effective threshold,
    "moderate" above it and "severe" above three times it.

    Any exception is logged through ``debug_print`` and whatever was collected
    so far is returned.

    :param currency_code: Currency to validate
    :param year: Optional year filter; when falsy, the range spans the first
        to the last year reported by ``database.get_years_with_data``
    :param base_threshold: Base threshold percentage
    :param adaptive: Whether to use adaptive threshold
    :return: Tuple ``(violations, adaptive_info)``
    """
    found = []
    # Used as-is when adaptive mode is off, or when an exception occurs before
    # the adaptive calculation has produced its own description.
    adaptive_info = {
        "adaptive_threshold": base_threshold,
        "base_threshold": base_threshold,
        "volatility_percent": 0.0,
        "sufficient_data": True,
    }

    try:
        if adaptive:
            adaptive_info = calculate_adaptive_threshold(currency_code, base_threshold)
        limit = adaptive_info["adaptive_threshold"]

        # Work out the first and last day to inspect.
        if year:
            first_day = datetime(year, 1, 1)
            last_day = datetime(year, 12, 31)
        else:
            known_years = database.get_years_with_data()
            if not known_years:
                return found, adaptive_info
            earliest = min(known_years)
            latest = max(known_years)
            first_day = datetime(earliest, 1, 1)
            last_day = datetime(latest, 12, 31)

        # Gather (date label, rate) for each day that has a stored rate.
        observations = []
        day = first_day
        while day <= datetime.now() and day <= last_day:
            label = day.strftime("%d.%m.%Y")
            value = database.get_rate(label, currency_code)
            if value is not None:
                observations.append((label, value))
            day += timedelta(days=1)

        # Compare each observation with the one that precedes it.
        for (prev_label, prev_rate), (curr_label, curr_rate) in zip(
            observations, observations[1:]
        ):
            if not prev_rate > 0:
                # A non-positive previous rate cannot be used as a divisor.
                continue

            change_pct = abs((curr_rate - prev_rate) / prev_rate) * 100

            if change_pct > limit * 3:
                severity = "severe"
            elif change_pct > limit:
                severity = "moderate"
            else:
                severity = "minor"

            # Only changes above the base threshold are reported at all.
            if not change_pct > base_threshold:
                continue

            entry = {
                "date": curr_label,
                "previous_date": prev_label,
                "previous_rate": float(prev_rate),
                "current_rate": float(curr_rate),
                "change_percent": round(change_pct, 2),
                "severity": severity,
                "threshold_exceeded": "adaptive" if change_pct > limit else "base",
                "effective_threshold": limit,
            }

            # Severe jumps additionally carry a corruption warning.
            if severity == "severe":
                entry["corruption_risk"] = "high"
                entry["recommendation"] = (
                    "Verify data source - potential currency mismatch or data corruption"
                )

            found.append(entry)
    except Exception as e:
        debug_print(f"Error detecting price changes: {e}")

    return found, adaptive_info
def validate_currency_data(currency_code, year=None, base_threshold=1.0, adaptive=True):
    """
    Run the price-change validation for one currency and summarize it.

    The result always contains ``currency``, ``validation_year``,
    ``validation_date`` (UTC, ISO 8601 with a trailing "Z") and
    ``data_quality_score``. On success it also contains
    ``adaptive_analysis``, ``price_change_violations`` and ``summary``; if an
    exception is raised, ``error`` holds its message and the score is 0.

    The quality score starts at 100, loses 5 points per violation and a
    further 20 per severe violation, and never drops below 0.

    :param currency_code: Currency to validate
    :param year: Optional year filter
    :param base_threshold: Base threshold for price changes
    :param adaptive: Whether to use adaptive thresholds
    :return: Validation results
    """
    # Imported locally so this function does not depend on the module's
    # top-level ``from datetime import ...`` line listing ``timezone``.
    from datetime import timezone

    # The trailing "Z" denotes UTC, so the timestamp must be taken in UTC
    # rather than in local time.
    utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
    results = {
        "currency": currency_code,
        "validation_year": year,
        "validation_date": utc_now.isoformat() + "Z",
    }

    try:
        violations, adaptive_info = detect_price_change_violations(
            currency_code, year, base_threshold, adaptive
        )

        results["adaptive_analysis"] = adaptive_info
        results["price_change_violations"] = violations

        # Count violations per severity level.
        severity_counts = defaultdict(int)
        for v in violations:
            severity_counts[v["severity"]] += 1

        results["summary"] = {
            "total_violations": len(violations),
            "severity_breakdown": dict(severity_counts),
            "base_threshold": base_threshold,
            "adaptive_enabled": adaptive,
        }

        # Simple heuristic; with no violations the expression is exactly 100.
        severe_count = severity_counts.get("severe", 0)
        results["data_quality_score"] = max(
            0, 100 - (len(violations) * 5) - (severe_count * 20)
        )
    except Exception as e:
        results["error"] = str(e)
        results["data_quality_score"] = 0

    return results
def validate_all_currencies(year=None, base_threshold=1.0, adaptive=True):
    """
    Validate every known currency and aggregate the results.

    The set of currencies is a fixed list of common codes (USD, EUR, GBP,
    CHF, JPY) extended by whatever ``database.get_available_currencies``
    reports. If that lookup fails, only the fixed list is validated.

    :param year: Optional year filter
    :param base_threshold: Base threshold for price changes
    :param adaptive: Whether to use adaptive thresholds
    :return: Dict with the request parameters, ``validation_date`` (UTC, ISO
        8601 with a trailing "Z"), one entry per currency in
        ``currency_results`` and an ``overall_summary``; ``error`` is set if
        the run as a whole failed.
    """
    # Imported locally so this function does not depend on the module's
    # top-level ``from datetime import ...`` line listing ``timezone``.
    from datetime import timezone

    # The trailing "Z" denotes UTC, so the timestamp must be taken in UTC
    # rather than in local time.
    utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
    results = {
        "validation_type": "all_currencies",
        "validation_year": year,
        "base_threshold": base_threshold,
        "adaptive_enabled": adaptive,
        "validation_date": utc_now.isoformat() + "Z",
        "currency_results": [],
    }

    try:
        # Always check a few well-known currencies, then add any others the
        # database knows about (order preserved, no duplicates).
        currencies_to_check = ["USD", "EUR", "GBP", "CHF", "JPY"]
        try:
            stored_currencies = database.get_available_currencies() or []
        except Exception as e:
            # Best-effort: keep validating the well-known list.
            debug_print(f"Could not list currencies from the database: {e}")
            stored_currencies = []
        for code in stored_currencies:
            if code not in currencies_to_check:
                currencies_to_check.append(code)

        for currency in currencies_to_check:
            try:
                currency_result = validate_currency_data(
                    currency, year, base_threshold, adaptive
                )
                results["currency_results"].append(currency_result)
            except Exception as e:
                results["currency_results"].append(
                    {"currency": currency, "error": str(e)}
                )

        # Only entries that produced a summary contribute to the totals.
        summaries = [
            r["summary"] for r in results["currency_results"] if "summary" in r
        ]
        total_violations = sum(s.get("total_violations", 0) for s in summaries)
        severe_violations = sum(
            s.get("severity_breakdown", {}).get("severe", 0) for s in summaries
        )

        results["overall_summary"] = {
            "currencies_checked": len(results["currency_results"]),
            "total_violations": total_violations,
            "severe_violations": severe_violations,
        }
    except Exception as e:
        results["error"] = str(e)

    return results
def _format_single_currency_lines(results):
    """Return the report lines for a single-currency validation result."""
    # The key is normally present with value None when no year was requested,
    # so a .get() default alone would print "None".
    year_label = results.get("validation_year") or "All Years"
    lines = [
        f"Currency Validation: {results['currency']} ({year_label})",
        "=" * 60,
    ]

    adaptive = results.get("adaptive_analysis", {})
    if adaptive.get("sufficient_data", False):
        lines.append("\nAdaptive Analysis (3-month history):")
        lines.append(
            f"- Historical volatility: {adaptive.get('volatility_percent', 0):.1f}% std dev"
        )
        lines.append(
            f"- Adaptive threshold: {adaptive.get('adaptive_threshold', 1.0):.1f}% (base: {adaptive.get('base_threshold', 1.0)}%)"
        )
        lines.append(f"- Data points analyzed: {adaptive.get('data_points', 0)}")
    else:
        lines.append(
            f"\nAdaptive Analysis: Insufficient data (using base threshold: {adaptive.get('base_threshold', 1.0)}%)"
        )

    violations = results.get("price_change_violations", [])
    if violations:
        lines.append("\nPrice Change Violations:")
        for index, v in enumerate(violations, 1):
            sign = "+" if v["change_percent"] > 0 else ""
            # Separate the two rates so they do not run together.
            lines.append(
                f"{index}. [{v['severity'].upper()}] {v['date']}: "
                f"{v['previous_rate']:.2f} -> {v['current_rate']:.2f} "
                f"({sign}{v['change_percent']:.2f}%)"
            )
            if "recommendation" in v:
                lines.append(f"{v['recommendation']}")
    else:
        lines.append("\nPrice Change Violations: None found")

    summary = results.get("summary", {})
    lines.append(f"\nData Quality Score: {results.get('data_quality_score', 0)}%")
    lines.append(f"Total violations: {summary.get('total_violations', 0)}")
    return lines


def _format_multi_currency_lines(results):
    """Return the report lines for an all-currencies validation result."""
    lines = ["Multi-Currency Validation Report", "=" * 60]

    for currency_result in results["currency_results"]:
        violations = currency_result.get("price_change_violations", [])
        lines.append(f"\n{currency_result.get('currency', 'Unknown')}:")
        lines.append(f" - Violations: {len(violations)}")
        lines.append(f" - Quality Score: {currency_result.get('data_quality_score', 0)}%")
        if violations:
            severe_count = sum(1 for v in violations if v["severity"] == "severe")
            lines.append(f" - Severe violations: {severe_count}")

    overall = results.get("overall_summary", {})
    lines.append("\nOverall Summary:")
    lines.append(f"- Currencies checked: {overall.get('currencies_checked', 0)}")
    lines.append(f"- Total violations: {overall.get('total_violations', 0)}")
    lines.append(f"- Severe violations: {overall.get('severe_violations', 0)}")
    return lines


def format_validation_text(results):
    """Format validation results as text output.

    A dict containing ``currency`` is rendered as a single-currency report;
    otherwise a dict containing ``currency_results`` is rendered as a
    multi-currency report. Any other input yields an empty string.

    :param results: Result dict shaped like the output of
        ``validate_currency_data`` or ``validate_all_currencies``
    :return: The report as one newline-joined string
    """
    if "currency" in results:
        lines = _format_single_currency_lines(results)
    elif "currency_results" in results:
        lines = _format_multi_currency_lines(results)
    else:
        lines = []
    return "\n".join(lines)