diff --git a/src/cli.py b/src/cli.py
index fd73ade..cfcc2aa 100755
--- a/src/cli.py
+++ b/src/cli.py
@@ -9,11 +9,12 @@ from datetime import datetime
 # Přidání adresáře src do sys.path, aby bylo možné importovat moduly
 sys.path.insert(0, os.path.join(os.path.dirname(__file__)))
 
-import data_fetcher
 import database
+import data_fetcher
 import holidays
 import rate_finder
 import rate_reporter
+import data_validator
 
 # Global debug flag
 DEBUG = False
@@ -36,6 +37,7 @@ def set_debug_mode(debug):
     holidays.set_debug_mode(DEBUG)
     rate_finder.set_debug_mode(DEBUG)
     rate_reporter.set_debug_mode(DEBUG)
+    data_validator.set_debug_mode(DEBUG)
 
 
 def format_single_rate_json(
@@ -195,6 +197,22 @@ def main():
         "Pokud je zadán rok, vytvoří kurz pro konkrétní rok. "
         "Pokud není rok zadán, vytvoří kurzy pro všechny roky s dostupnými daty.",
     )
+    parser.add_argument(
+        "--validate",
+        action="store_true",
+        help="Validuje data pro měnu nebo všechny měny. Zkontroluje konzistenci kurzů a detekuje možné chyby.",
+    )
+    parser.add_argument(
+        "--change-threshold",
+        type=float,
+        default=1.0,
+        help="Práh pro detekci změn kurzů v procentech (výchozí: 1.0).",
+    )
+    parser.add_argument(
+        "--no-adaptive",
+        action="store_true",
+        help="Vypne adaptivní učení prahů na základě historických dat.",
+    )
     parser.add_argument(
         "--debug", action="store_true", help="Zobrazí podrobné ladicí informace."
     )
@@ -246,6 +264,23 @@ def main():
 
     # Zde bude logika pro zpracování argumentů
-    if args.year:
+    if args.validate:
+        # Checked first so that --year only narrows the validation instead of starting a download
+        adaptive = not args.no_adaptive
+        if args.currency:
+            debug_print(f"Validuji data pro měnu {args.currency}...")
+            results = data_validator.validate_currency_data(
+                args.currency, args.year, args.change_threshold, adaptive
+            )
+        else:
+            debug_print("Validuji data pro všechny měny...")
+            results = data_validator.validate_all_currencies(
+                args.year, args.change_threshold, adaptive
+            )
+        if args.json:
+            output_json(results)
+        else:
+            print(data_validator.format_validation_text(results))
+    elif args.year:
         debug_print(f"Stahuji roční data pro rok {args.year}...")
         # Ujistěme se, že adresář data existuje
         os.makedirs("data", exist_ok=True)
diff --git a/src/data_validator.py b/src/data_validator.py
new file mode 100644
index 0000000..178f794
--- /dev/null
+++ b/src/data_validator.py
@@ -0,0 +1,398 @@
+"""Validation of stored exchange rates: adaptive thresholds and rate-jump detection."""
+
+import sys
+import os
+import json
+from datetime import datetime, timedelta, timezone
+from collections import defaultdict
+import statistics
+
+# Add the src directory to sys.path so the sibling modules can be imported
+sys.path.insert(0, os.path.join(os.path.dirname(__file__)))
+
+import database
+import holidays
+
+# Global debug flag
+DEBUG = False
+
+# Fallback used when the database cannot report which currencies it holds
+DEFAULT_CURRENCIES = ("USD", "EUR", "GBP", "CHF", "JPY")
+
+
+def debug_print(*args, **kwargs):
+    """Print debug messages only if debug mode is enabled."""
+    if DEBUG:
+        print(*args, **kwargs)
+
+
+def set_debug_mode(debug):
+    """Set the debug mode for this module."""
+    global DEBUG
+    DEBUG = debug
+
+
+def _utc_timestamp():
+    """Return the current UTC time in ISO 8601 format with a ``Z`` suffix."""
+    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
+
+
+def _collect_rates(currency_code, start_date, end_date):
+    """Return ``(date_str, rate)`` for each day in the range that has a stored rate."""
+    rates = []
+    current_date = start_date
+    while current_date <= end_date:
+        date_str = current_date.strftime("%d.%m.%Y")
+        rate = database.get_rate(date_str, currency_code)
+        if rate is not None:
+            # Convert once here so later arithmetic never mixes numeric types
+            rates.append((date_str, float(rate)))
+        current_date += timedelta(days=1)
+    return rates
+
+
+def _percent_change(prev_rate, curr_rate):
+    """Return the absolute change between two rates as a percentage of the first."""
+    return abs((curr_rate - prev_rate) / prev_rate) * 100
+
+
+def _base_threshold_info(base_threshold, data_points, sufficient_data):
+    """Build the analysis result used when the base threshold is left unchanged."""
+    return {
+        "adaptive_threshold": base_threshold,
+        "base_threshold": base_threshold,
+        "volatility_percent": 0.0,
+        "data_points": data_points,
+        "sufficient_data": sufficient_data,
+    }
+
+
+def _currencies_to_validate():
+    """Return the currencies listed by the database, or the defaults if there are none."""
+    try:
+        currencies = database.get_available_currencies()
+    except Exception as e:
+        debug_print(f"Error listing available currencies: {e}")
+        currencies = None
+    return list(currencies) if currencies else list(DEFAULT_CURRENCIES)
+
+
+def calculate_adaptive_threshold(currency_code, base_threshold=1.0, learning_months=3):
+    """
+    Calculates an adaptive threshold from recent day-to-day rate changes.
+
+    The base threshold is multiplied by ``1 + factor``, where ``factor`` is the larger
+    of the standard deviation of the daily changes and half of their 95th percentile,
+    clamped to 0.5-5.0. The result therefore lies between 1.5x and 6x the base.
+
+    :param currency_code: Currency to analyze
+    :param base_threshold: Base threshold percentage
+    :param learning_months: Months of history to analyze (counted as 30 days each)
+    :return: Dict with the adaptive threshold and volatility statistics
+    """
+    try:
+        end_date = datetime.now()
+        start_date = end_date - timedelta(days=learning_months * 30)
+        rates = _collect_rates(currency_code, start_date, end_date)
+
+        if len(rates) < 10:
+            # Too little history to learn from
+            return _base_threshold_info(base_threshold, len(rates), False)
+
+        changes = [
+            _percent_change(prev_rate, curr_rate)
+            for (_, prev_rate), (_, curr_rate) in zip(rates, rates[1:])
+            if prev_rate > 0
+        ]
+        if len(changes) < 2:
+            # stdev() and quantiles() both need at least two samples
+            return _base_threshold_info(base_threshold, len(rates), False)
+
+        std_dev = statistics.stdev(changes)
+        percentile_95 = statistics.quantiles(changes, n=20)[18]  # 95th percentile
+        volatility_factor = min(max(std_dev, percentile_95 / 2, 0.5), 5.0)
+
+        return {
+            "adaptive_threshold": base_threshold * (1 + volatility_factor),
+            "base_threshold": base_threshold,
+            "volatility_percent": std_dev,
+            "percentile_95": percentile_95,
+            "data_points": len(rates),
+            "sufficient_data": True,
+        }
+
+    except Exception as e:
+        debug_print(f"Error calculating adaptive threshold: {e}")
+        info = _base_threshold_info(base_threshold, 0, False)
+        info["error"] = str(e)
+        return info
+
+
+def detect_price_change_violations(
+    currency_code, year=None, base_threshold=1.0, adaptive=True
+):
+    """
+    Detects day-to-day rate changes exceeding the base threshold.
+
+    :param currency_code: Currency to validate
+    :param year: Optional year filter (all years with data when omitted)
+    :param base_threshold: Base threshold percentage
+    :param adaptive: Whether to use adaptive threshold
+    :return: Tuple ``(violations, adaptive_info)``; ``adaptive_info`` gains a
+        ``detection_error`` entry when the scan could not be completed
+    """
+    violations = []
+
+    # Defaults used when adaptive mode is off or fails before being computed
+    adaptive_info = {
+        "adaptive_threshold": base_threshold,
+        "base_threshold": base_threshold,
+        "volatility_percent": 0.0,
+        "sufficient_data": True,
+    }
+
+    try:
+        if adaptive:
+            adaptive_info = calculate_adaptive_threshold(currency_code, base_threshold)
+        effective_threshold = adaptive_info["adaptive_threshold"]
+
+        if year:
+            # The year may arrive from the command line as a string
+            start_year = end_year = int(year)
+        else:
+            years_with_data = database.get_years_with_data()
+            if not years_with_data:
+                return violations, adaptive_info
+            start_year = int(min(years_with_data))
+            end_year = int(max(years_with_data))
+
+        # Never query dates in the future
+        end_date = min(datetime(end_year, 12, 31), datetime.now())
+        rates = _collect_rates(currency_code, datetime(start_year, 1, 1), end_date)
+
+        for (prev_date_str, prev_rate), (curr_date_str, curr_rate) in zip(
+            rates, rates[1:]
+        ):
+            if prev_rate <= 0:
+                continue
+            change_pct = _percent_change(prev_rate, curr_rate)
+            if change_pct <= base_threshold:
+                continue
+
+            if change_pct > effective_threshold * 3:
+                severity = "severe"
+            elif change_pct > effective_threshold:
+                severity = "moderate"
+            else:
+                severity = "minor"
+
+            violation = {
+                "date": curr_date_str,
+                "previous_date": prev_date_str,
+                "previous_rate": prev_rate,
+                "current_rate": curr_rate,
+                "change_percent": round(change_pct, 2),
+                "severity": severity,
+                "threshold_exceeded": (
+                    "adaptive" if change_pct > effective_threshold else "base"
+                ),
+                "effective_threshold": effective_threshold,
+            }
+            if severity == "severe":
+                violation["corruption_risk"] = "high"
+                violation["recommendation"] = (
+                    "Verify data source - potential currency mismatch or data corruption"
+                )
+            violations.append(violation)
+
+    except Exception as e:
+        debug_print(f"Error detecting price changes: {e}")
+        # Surface the failure so an aborted scan is not mistaken for clean data
+        adaptive_info = {**adaptive_info, "detection_error": str(e)}
+
+    return violations, adaptive_info
+
+
+def validate_currency_data(currency_code, year=None, base_threshold=1.0, adaptive=True):
+    """
+    Comprehensive validation for a currency.
+
+    :param currency_code: Currency to validate
+    :param year: Optional year filter
+    :param base_threshold: Base threshold for price changes
+    :param adaptive: Whether to use adaptive thresholds
+    :return: Validation results; ``error`` is set and the quality score is 0 when
+        the validation could not be completed
+    """
+    results = {
+        "currency": currency_code,
+        "validation_year": year,
+        "validation_date": _utc_timestamp(),
+    }
+
+    try:
+        violations, adaptive_info = detect_price_change_violations(
+            currency_code, year, base_threshold, adaptive
+        )
+        results["adaptive_analysis"] = adaptive_info
+        results["price_change_violations"] = violations
+
+        severity_counts = defaultdict(int)
+        for violation in violations:
+            severity_counts[violation["severity"]] += 1
+
+        results["summary"] = {
+            "total_violations": len(violations),
+            "severity_breakdown": dict(severity_counts),
+            "base_threshold": base_threshold,
+            "adaptive_enabled": adaptive,
+        }
+
+        if "detection_error" in adaptive_info:
+            # An aborted scan says nothing about the quality of the data
+            results["error"] = adaptive_info["detection_error"]
+            results["data_quality_score"] = 0
+        else:
+            # Simple heuristic: 5 points per violation, 20 more per severe one
+            results["data_quality_score"] = max(
+                0, 100 - len(violations) * 5 - severity_counts["severe"] * 20
+            )
+
+    except Exception as e:
+        results["error"] = str(e)
+        results["data_quality_score"] = 0
+
+    return results
+
+
+def validate_all_currencies(year=None, base_threshold=1.0, adaptive=True):
+    """
+    Validates all available currencies.
+
+    :param year: Optional year filter
+    :param base_threshold: Base threshold for price changes
+    :param adaptive: Whether to use adaptive thresholds
+    :return: Validation results for all currencies
+    """
+    results = {
+        "validation_type": "all_currencies",
+        "validation_year": year,
+        "base_threshold": base_threshold,
+        "adaptive_enabled": adaptive,
+        "validation_date": _utc_timestamp(),
+        "currency_results": [],
+    }
+
+    try:
+        for currency in _currencies_to_validate():
+            try:
+                currency_result = validate_currency_data(
+                    currency, year, base_threshold, adaptive
+                )
+            except Exception as e:
+                currency_result = {"currency": currency, "error": str(e)}
+            results["currency_results"].append(currency_result)
+
+        summaries = [
+            r["summary"] for r in results["currency_results"] if "summary" in r
+        ]
+        results["overall_summary"] = {
+            "currencies_checked": len(results["currency_results"]),
+            "total_violations": sum(s.get("total_violations", 0) for s in summaries),
+            "severe_violations": sum(
+                s.get("severity_breakdown", {}).get("severe", 0) for s in summaries
+            ),
+        }
+
+    except Exception as e:
+        results["error"] = str(e)
+
+    return results
+
+
+def _format_currency_report(results):
+    """Return the text lines of a single-currency validation report."""
+    lines = [
+        f"Currency Validation: {results['currency']} ({results.get('validation_year') or 'All Years'})",
+        "=" * 60,
+    ]
+    if "error" in results:
+        lines.append(f"\nValidation error: {results['error']}")
+
+    adaptive = results.get("adaptive_analysis", {})
+    summary = results.get("summary", {})
+    base = adaptive.get("base_threshold", 1.0)
+    if not summary.get("adaptive_enabled", True):
+        lines.append(f"\nAdaptive Analysis: Disabled (using base threshold: {base}%)")
+    elif adaptive.get("sufficient_data", False):
+        lines.append("\nAdaptive Analysis (3-month history):")
+        lines.append(
+            f"- Historical volatility: {adaptive.get('volatility_percent', 0):.1f}% std dev"
+        )
+        lines.append(
+            f"- Adaptive threshold: {adaptive.get('adaptive_threshold', 1.0):.1f}% (base: {base}%)"
+        )
+        lines.append(f"- Data points analyzed: {adaptive.get('data_points', 0)}")
+    else:
+        lines.append(
+            f"\nAdaptive Analysis: Insufficient data (using base threshold: {base}%)"
+        )
+
+    violations = results.get("price_change_violations", [])
+    if violations:
+        lines.append("\nPrice Change Violations:")
+        for i, v in enumerate(violations, 1):
+            # change_percent is an absolute value, so take the sign from the rates
+            sign = "-" if v["current_rate"] < v["previous_rate"] else "+"
+            lines.append(
+                f"{i}. [{v['severity'].upper()}] {v['date']}: {v['previous_rate']:.2f} → {v['current_rate']:.2f} ({sign}{v['change_percent']:.2f}%)"
+            )
+            if "recommendation" in v:
+                lines.append(f"   → {v['recommendation']}")
+    else:
+        lines.append("\nPrice Change Violations: None found")
+
+    lines.append(f"\nData Quality Score: {results.get('data_quality_score', 0)}%")
+    lines.append(f"Total violations: {summary.get('total_violations', 0)}")
+    return lines
+
+
+def _format_all_currencies_report(results):
+    """Return the text lines of a multi-currency validation report."""
+    lines = ["Multi-Currency Validation Report", "=" * 60]
+
+    for currency_result in results["currency_results"]:
+        violations = currency_result.get("price_change_violations", [])
+        lines.append(f"\n{currency_result.get('currency', 'Unknown')}:")
+        lines.append(f"  - Violations: {len(violations)}")
+        lines.append(
+            f"  - Quality Score: {currency_result.get('data_quality_score', 0)}%"
+        )
+        if violations:
+            severe_count = sum(1 for v in violations if v["severity"] == "severe")
+            lines.append(f"  - Severe violations: {severe_count}")
+        if "error" in currency_result:
+            lines.append(f"  - Error: {currency_result['error']}")
+
+    overall = results.get("overall_summary", {})
+    lines.append("\nOverall Summary:")
+    lines.append(f"- Currencies checked: {overall.get('currencies_checked', 0)}")
+    lines.append(f"- Total violations: {overall.get('total_violations', 0)}")
+    lines.append(f"- Severe violations: {overall.get('severe_violations', 0)}")
+    return lines
+
+
+def format_validation_text(results):
+    """
+    Format validation results as text output.
+
+    :param results: Result of validate_currency_data() or validate_all_currencies()
+    :return: Multi-line report; an empty string when neither shape is recognized
+    """
+    if "currency" in results:
+        lines = _format_currency_report(results)
+    elif "currency_results" in results:
+        lines = _format_all_currencies_report(results)
+    else:
+        lines = []
+    return "\n".join(lines)