Amended version of the "--record-counts" patch.

Changes relative to the original patch: the per-day scan shared by
validate_trading_days_count() and get_record_counts_by_period() is factored
into _collect_year_rates(), the unused rates_data / year_results locals are
dropped, the period counts are tallied in a single pass, and the week sample
is sorted before it is truncated. Leading whitespace was re-derived from the
code structure, and the "index" lines still carry the blob ids of the
original patch.

diff --git a/src/cli.py b/src/cli.py
index 2e6e264..429d5e1 100755
--- a/src/cli.py
+++ b/src/cli.py
@@ -202,6 +202,11 @@ def main():
         action="store_true",
         help="Validuje data pro měnu nebo všechny měny. Zkontroluje konzistenci kurzů a detekuje možné chyby.",
     )
+    parser.add_argument(
+        "--record-counts",
+        action="store_true",
+        help="Zobrazí počet záznamů podle časových období (týden, měsíc, čtvrtletí, pololetí, rok).",
+    )
     parser.add_argument(
         "--change-threshold",
         type=float,
@@ -289,6 +294,68 @@ def main():
         else:
             text_output = data_validator.format_validation_text(results)
             print(text_output)
+    elif args.record_counts:
+        # Record counts command
+        if not args.currency:
+            print(
+                "Chyba: Pro --record-counts je nutné zadat měnu pomocí -c/--currency."
+            )
+            sys.exit(1)
+
+        debug_print(f"Získávám počty záznamů pro měnu {args.currency}...")
+        record_counts = data_validator.get_record_counts_by_period(
+            args.currency, args.year
+        )
+
+        if args.json:
+            output_json({"currency": args.currency, "record_counts": record_counts})
+        else:
+            print(f"Record Counts for {args.currency}:")
+            print("=" * 50)
+
+            for year_key, periods in record_counts.items():
+                print(f"\nYear {year_key}:")
+                print(f" Total records: {periods.get('year', 0)}")
+
+                # Half years
+                half_years = periods.get("half_year", {})
+                if half_years:
+                    print(
+                        f" Half years: H1={half_years.get('H1', 0)}, H2={half_years.get('H2', 0)}"
+                    )
+
+                # Quarters
+                quarters = periods.get("quarter", {})
+                if quarters:
+                    quarter_str = ", ".join(
+                        [f"Q{q}={quarters.get(f'Q{q}', 0)}" for q in range(1, 5)]
+                    )
+                    print(f" Quarters: {quarter_str}")
+
+                # Months
+                months = periods.get("month", {})
+                if months:
+                    month_list = []
+                    for month in range(1, 13):
+                        month_key = f"{month:02d}"
+                        count = months.get(month_key, 0)
+                        month_list.append(f"{month}={count}")
+                    print(f" Months: {', '.join(month_list)}")
+
+                # Weeks summary
+                weeks = periods.get("week", {})
+                if weeks:
+                    total_weeks = len(weeks)
+                    if total_weeks <= 10:
+                        week_list = [f"{w}={weeks[w]}" for w in sorted(weeks)]
+                        print(f" Weeks: {', '.join(week_list)}")
+                    else:
+                        # Sort before slicing so the sample is the five lowest weeks.
+                        sample_weeks = sorted(weeks)[:5]
+                        week_sample = [f"{w}={weeks[w]}" for w in sample_weeks]
+                        print(
+                            f" Weeks: {', '.join(week_sample)}... ({total_weeks} total weeks)"
+                        )
     elif args.year:
         # Validation command
         base_threshold = args.change_threshold
diff --git a/src/data_validator.py b/src/data_validator.py
index 5bad08a..e2ce101 100644
--- a/src/data_validator.py
+++ b/src/data_validator.py
@@ -133,6 +133,173 @@ def calculate_working_days_gap(start_date, end_date):
     return working_days
 
 
+def calculate_expected_trading_days(year):
+    """
+    Calculate the expected number of trading days in a year.
+
+    Every calendar day is classified with ``holidays.is_weekend`` first and
+    ``holidays.is_holiday`` second, so a holiday that falls on a weekend is
+    counted once, as a weekend day.
+
+    :param year: Year to calculate for
+    :return: Dictionary with the keys ``total_days``, ``weekend_days``,
+        ``holiday_days`` and ``expected_trading_days``
+    """
+    import calendar
+
+    total_days = 366 if calendar.isleap(year) else 365
+    weekend_days = 0
+    holiday_days = 0
+
+    # Count weekends and holidays
+    for month in range(1, 13):
+        for day in range(1, calendar.monthrange(year, month)[1] + 1):
+            date_str = f"{day:02d}.{month:02d}.{year}"
+            if holidays.is_weekend(date_str):
+                weekend_days += 1
+            elif holidays.is_holiday(date_str):
+                holiday_days += 1
+
+    expected_trading_days = total_days - weekend_days - holiday_days
+
+    return {
+        "total_days": total_days,
+        "weekend_days": weekend_days,
+        "holiday_days": holiday_days,
+        "expected_trading_days": expected_trading_days,
+    }
+
+
+def _collect_year_rates(currency_code, year):
+    """
+    Collect the stored rates of a currency for every day of a calendar year.
+
+    Each day from 1 January to 31 December is looked up with
+    ``database.get_rate``; days for which it returns ``None`` are skipped.
+
+    :param currency_code: Currency to look up
+    :param year: Calendar year to scan
+    :return: List of ``(datetime, rate)`` tuples in chronological order
+    """
+    data_points = []
+    current_date = datetime(year, 1, 1)
+    end_date = datetime(year, 12, 31)
+
+    while current_date <= end_date:
+        rate = database.get_rate(current_date.strftime("%d.%m.%Y"), currency_code)
+        if rate is not None:
+            data_points.append((current_date, rate))
+        current_date += timedelta(days=1)
+
+    return data_points
+
+
+def validate_trading_days_count(currency_code, year):
+    """
+    Validate that a year has the appropriate number of trading day entries.
+
+    The severity is ``"ok"`` for no deviation from the expected count,
+    ``"minor"`` for a deviation of up to 5 %, ``"moderate"`` for up to 15 %
+    and ``"severe"`` above that.
+
+    :param currency_code: Currency to validate
+    :param year: Year to check
+    :return: Validation result with actual vs expected counts
+    """
+    expected = calculate_expected_trading_days(year)
+    expected_days = expected["expected_trading_days"]
+
+    # Only the number of stored rates matters here, not the rates themselves.
+    actual_count = len(_collect_year_rates(currency_code, year))
+
+    discrepancy_days = actual_count - expected_days
+    if expected_days > 0:
+        discrepancy_percent = (discrepancy_days / expected_days) * 100
+        completeness_percent = round((actual_count / expected_days) * 100, 1)
+    else:
+        # No trading days expected: report 0 instead of dividing by zero.
+        discrepancy_percent = 0
+        completeness_percent = 0
+
+    # Determine severity from the size of the relative deviation
+    deviation = abs(discrepancy_percent)
+    if deviation > 15:
+        severity = "severe"
+    elif deviation > 5:
+        severity = "moderate"
+    elif deviation > 0:
+        severity = "minor"
+    else:
+        severity = "ok"
+
+    return {
+        "expected_trading_days": expected_days,
+        "actual_data_points": actual_count,
+        "discrepancy_days": discrepancy_days,
+        "discrepancy_percent": round(discrepancy_percent, 2),
+        "severity": severity,
+        "total_days": expected["total_days"],
+        "weekend_days_excluded": expected["weekend_days"],
+        "holiday_days_excluded": expected["holiday_days"],
+        "data_completeness_percent": completeness_percent,
+    }
+
+
+def get_record_counts_by_period(currency_code, year=None):
+    """
+    Get record counts for different time periods.
+
+    Half years, quarters and months are always listed, with 0 for empty
+    periods; weeks are listed only when they contain at least one record.
+
+    :param currency_code: Currency to analyze
+    :param year: Optional year filter; when omitted, every year returned by
+        ``database.get_years_with_data`` is analyzed
+    :return: Dictionary keyed by the year as a string; each value holds the
+        keys ``year``, ``half_year``, ``quarter``, ``month`` and ``week``
+    """
+    if year:
+        years_to_check = [year]
+    else:
+        years_to_check = database.get_years_with_data()
+        if not years_to_check:
+            return {}
+
+    results = {}
+
+    for check_year in years_to_check:
+        data_points = _collect_year_rates(currency_code, check_year)
+
+        # Pre-seed the fixed periods so that empty ones are reported as 0.
+        half_years = {"H1": 0, "H2": 0}
+        quarters = {f"Q{quarter}": 0 for quarter in range(1, 5)}
+        months = {f"{month:02d}": 0 for month in range(1, 13)}
+        weeks = {}
+
+        # Tally every period in a single pass over the year's records.
+        for record_date, _rate in data_points:
+            month = record_date.month
+            half_years["H1" if month <= 6 else "H2"] += 1
+            quarters[f"Q{(month - 1) // 3 + 1}"] += 1
+            months[f"{month:02d}"] += 1
+
+            # NOTE(review): isocalendar() yields ISO 8601 week numbers, so
+            # late-December days that belong to ISO week 1 of the following
+            # year are counted under this year's "W01".
+            week_key = f"W{record_date.isocalendar()[1]:02d}"
+            weeks[week_key] = weeks.get(week_key, 0) + 1
+
+        results[str(check_year)] = {
+            "year": len(data_points),
+            "half_year": half_years,
+            "quarter": quarters,
+            "month": months,
+            "week": weeks,
+        }
+
+    return results
+
+
 def detect_temporal_gaps(currency_code, year=None, max_gap_days=3):
     """
     Detect temporal gaps in data sequence (missing working days).
@@ -327,9 +494,19 @@ def validate_currency_data(
         # Temporal gaps
         gaps = detect_temporal_gaps(currency_code, year, max_gap_days)
 
+        # Trading days validation
+        trading_days_validation = None
+        if year:
+            trading_days_validation = validate_trading_days_count(currency_code, year)
+
+        # Record counts by period
+        record_counts = get_record_counts_by_period(currency_code, year)
+
         results["adaptive_analysis"] = adaptive_info
         results["price_change_violations"] = violations
         results["temporal_gaps"] = gaps
+        results["trading_days_validation"] = trading_days_validation
+        results["record_counts_by_period"] = record_counts
 
         # Summary statistics
         severity_counts = defaultdict(int)
@@ -350,7 +527,7 @@ def validate_currency_data(
             "max_gap_days": max_gap_days,
         }
 
-        # Data quality score (simple heuristic)
+        # Data quality score (enhanced heuristic)
         quality_penalty = 0
         if violations:
             quality_penalty += (
@@ -360,6 +537,11 @@ def validate_currency_data(
             quality_penalty += (
                 len(gaps) * 10 + gap_severity_counts.get("severe", 0) * 30
             )
+        if trading_days_validation and trading_days_validation["severity"] != "ok":
+            severity_penalty = {"minor": 5, "moderate": 15, "severe": 30}
+            quality_penalty += severity_penalty.get(
+                trading_days_validation["severity"], 0
+            )
 
         results["data_quality_score"] = max(0, 100 - quality_penalty)
 
@@ -495,6 +677,68 @@ def format_validation_text(results):
     else:
         output.append("\nTemporal Gaps: None found")
 
+    # Trading days validation
+    trading_validation = results.get("trading_days_validation")
+    if trading_validation:
+        output.append("\nTrading Days Validation:")
+        output.append(
+            f"- Expected trading days: {trading_validation['expected_trading_days']} ({trading_validation.get('total_days', 'N/A')} total - {trading_validation.get('weekend_days_excluded', 0)} weekends - {trading_validation.get('holiday_days_excluded', 0)} holidays)"
+        )
+        output.append(
+            f"- Actual data points: {trading_validation['actual_data_points']}"
+        )
+        output.append(
+            f"- Discrepancy: {trading_validation['discrepancy_days']} days ({trading_validation['discrepancy_percent']}%)"
+        )
+        output.append(
+            f"- Data completeness: {trading_validation['data_completeness_percent']}%"
+        )
+        output.append(f"- Status: {trading_validation['severity'].upper()}")
+
+    # Record counts by period
+    record_counts = results.get("record_counts_by_period", {})
+    if record_counts:
+        for year_key, periods in record_counts.items():
+            output.append(f"\nRecord Counts for {year_key}:")
+            output.append(f"- Year total: {periods.get('year', 0)} records")
+
+            # Half years
+            half_years = periods.get("half_year", {})
+            if half_years:
+                output.append(
+                    f"- Half years: H1={half_years.get('H1', 0)}, H2={half_years.get('H2', 0)}"
+                )
+
+            # Quarters
+            quarters = periods.get("quarter", {})
+            if quarters:
+                quarter_str = ", ".join(
+                    [f"Q{q}={quarters.get(f'Q{q}', 0)}" for q in range(1, 5)]
+                )
+                output.append(f"- Quarters: {quarter_str}")
+
+            # Months summary
+            months = periods.get("month", {})
+            if months:
+                month_keys = [f"{month:02d}" for month in range(1, 13)]
+                month_list = [f"{key}={months.get(key, 0)}" for key in month_keys]
+                output.append(f"- Months: {', '.join(month_list)}")
+
+            # Weeks summary (show first few and indicate total)
+            weeks = periods.get("week", {})
+            if weeks:
+                total_weeks = len(weeks)
+                if total_weeks <= 10:
+                    week_list = [f"{w}={weeks[w]}" for w in sorted(weeks)]
+                    output.append(f"- Weeks: {', '.join(week_list)}")
+                else:
+                    # Sort before slicing so the sample is the five lowest weeks.
+                    sample_weeks = sorted(weeks)[:5]
+                    week_sample = [f"{w}={weeks[w]}" for w in sample_weeks]
+                    output.append(
+                        f"- Weeks: {', '.join(week_sample)}... ({total_weeks} total weeks)"
+                    )
+
     summary = results.get("summary", {})
     quality_score = results.get("data_quality_score", 0)
     output.append(f"\nData Quality Score: {quality_score}%")