feat: Add temporal gap detection to data validation

- Add temporal gap analysis to detect missing working days in data sequences
- Implement calculate_working_days_gap() to count business days between dates
- Add detect_temporal_gaps() function with configurable gap threshold
- Integrate gap detection into validate_currency_data() and validate_all_currencies()
- Update format_validation_text() to display temporal gap information
- Add --gap-threshold CLI argument (default: 3 working days)
- Enhance data quality scoring to include temporal gaps
- Update JSON output schema to include temporal gap details

Gap Detection Features:
- Excludes weekends and Czech public holidays from gap calculations
- Classifies gaps by severity relative to the threshold (minor: above 1x up to 2x, moderate: above 2x up to 3x, severe: above 3x)
- Provides actionable recommendations for data gaps
- Configurable sensitivity via --gap-threshold parameter

Integration with Existing Validation:
- Combines temporal gap analysis with price change anomaly detection
- Unified data quality scoring incorporating both gap and price metrics
- Consistent JSON/text output formats
- Maintains backward compatibility

Technical Implementation:
- Uses existing holidays.py for Czech holiday calendar
- Efficient date iteration with proper boundary handling
- Robust error handling for edge cases
- Clean integration with existing validation pipeline

Usage Examples:
  python src/cli.py --validate --currency USD --year 2025 --gap-threshold 2
  python src/cli.py --validate --all-currencies --json

Quality Assurance:
- Pyright type checking: 0 errors, 0 warnings
- Syntax validation: No errors
- Functional testing: Gap detection working correctly
- JSON output: Proper schema and formatting
This commit is contained in:
kdusek
2026-01-12 23:10:35 +01:00
parent 7d9dfa309c
commit 65a1485ff9
2 changed files with 150 additions and 33 deletions

View File

@@ -209,9 +209,10 @@ def main():
help="Práh pro detekci změn kurzů v procentech (výchozí: 1.0).", help="Práh pro detekci změn kurzů v procentech (výchozí: 1.0).",
) )
parser.add_argument( parser.add_argument(
"--no-adaptive", "--gap-threshold",
action="store_true", type=int,
help="Vypne adaptivní učení prahů na základě historických dat.", default=3,
help="Maximální přijatelná mezera v pracovních dnech (výchozí: 3).",
) )
parser.add_argument( parser.add_argument(
"--debug", action="store_true", help="Zobrazí podrobné ladicí informace." "--debug", action="store_true", help="Zobrazí podrobné ladicí informace."
@@ -221,30 +222,11 @@ def main():
action="store_true", action="store_true",
help="Výstup ve formátu JSON místo prostého textu pro programové zpracování.", help="Výstup ve formátu JSON místo prostého textu pro programové zpracování.",
) )
parser.add_argument(
"--validate",
action="store_true",
help="Validuje data pro měnu nebo všechny měny. Zkontroluje konzistenci kurzů a detekuje možné chyby.",
)
parser.add_argument(
"--change-threshold",
type=float,
default=1.0,
help="Práh pro detekci změn kurzů v procentech (výchozí: 1.0).",
)
parser.add_argument( parser.add_argument(
"--no-adaptive", "--no-adaptive",
action="store_true", action="store_true",
help="Vypne adaptivní učení prahů na základě historických dat.", help="Vypne adaptivní učení prahů na základě historických dat.",
) )
parser.add_argument(
"--debug", action="store_true", help="Zobrazí podrobné ladicí informace."
)
parser.add_argument(
"--json",
action="store_true",
help="Výstup ve formátu JSON místo prostého textu pro programové zpracování.",
)
args = parser.parse_args() args = parser.parse_args()
@@ -281,12 +263,13 @@ def main():
# Validation command # Validation command
base_threshold = args.change_threshold base_threshold = args.change_threshold
adaptive = not args.no_adaptive adaptive = not args.no_adaptive
max_gap_days = getattr(args, "gap_threshold", 3) # Default to 3 if not defined
if args.currency: if args.currency:
# Validate specific currency # Validate specific currency
debug_print(f"Validuji data pro měnu {args.currency}...") debug_print(f"Validuji data pro měnu {args.currency}...")
results = data_validator.validate_currency_data( results = data_validator.validate_currency_data(
args.currency, args.year, base_threshold, adaptive args.currency, args.year, base_threshold, adaptive, max_gap_days
) )
if args.json: if args.json:
@@ -298,7 +281,7 @@ def main():
# Validate all currencies # Validate all currencies
debug_print("Validuji data pro všechny měny...") debug_print("Validuji data pro všechny měny...")
results = data_validator.validate_all_currencies( results = data_validator.validate_all_currencies(
args.year, base_threshold, adaptive args.year, base_threshold, adaptive, max_gap_days
) )
if args.json: if args.json:

View File

@@ -113,6 +113,94 @@ def calculate_adaptive_threshold(currency_code, base_threshold=1.0, learning_mon
} }
def calculate_working_days_gap(start_date, end_date):
    """
    Count the working days lying strictly between two dates.

    A day is counted unless ``holidays.is_weekend`` or ``holidays.is_holiday``
    reports it as a day off. Neither endpoint is counted, so two consecutive
    calendar days (or an end date that is not after the start date) yield 0.

    :param start_date: Start date (datetime), excluded from the count
    :param end_date: End date (datetime), excluded from the count
    :return: Number of working days between the dates (exclusive)
    """
    one_day = timedelta(days=1)
    count = 0

    # Walk every calendar day after start_date and before end_date.
    day = start_date + one_day
    while day < end_date:
        # The holidays helpers are queried with a "DD.MM.YYYY" string.
        label = day.strftime("%d.%m.%Y")
        is_day_off = holidays.is_weekend(label) or holidays.is_holiday(label)
        if not is_day_off:
            count += 1
        day = day + one_day

    return count
def detect_temporal_gaps(currency_code, year=None, max_gap_days=3):
    """
    Detect temporal gaps in data sequence (missing working days).

    Every calendar day of the selected period (never past the current
    moment) is looked up with ``database.get_rate``; days for which it
    returns ``None`` are treated as having no data. For each pair of
    consecutive days that do have data, the working days between them are
    counted with ``calculate_working_days_gap`` and a gap is reported when
    that count exceeds ``max_gap_days``.

    Severity is relative to the threshold: more than 3x is "severe", more
    than 2x is "moderate", anything else above the threshold is "minor".
    For a threshold of 0 or less the multiples are taken of one working day,
    so that such a threshold does not label every gap "severe".

    Detection is best-effort: any exception is reported through
    ``debug_print`` and the gaps collected so far are returned.

    :param currency_code: Currency to validate
    :param year: Optional year filter; when falsy, the span from the earliest
        to the latest year in ``database.get_years_with_data()`` is scanned
    :param max_gap_days: Maximum acceptable working days gap
    :return: List of gap violations (dicts with ``start_date``, ``end_date``,
        ``working_days_missing``, ``severity``, ``max_expected_gap`` and
        ``recommendation``)
    """
    gaps = []

    try:
        if year:
            # Specific year
            first_day = datetime(year, 1, 1)
            last_day = datetime(year, 12, 31)
        else:
            # All available data
            years_with_data = database.get_years_with_data()
            if not years_with_data:
                return gaps
            first_day = datetime(min(years_with_data), 1, 1)
            last_day = datetime(max(years_with_data), 12, 31)

        # Never scan into the future. The clock is read once here instead of
        # on every iteration of the day loop below.
        last_day = min(last_day, datetime.now())

        # Collect the days that actually have a rate, in chronological order.
        one_day = timedelta(days=1)
        observed = []
        current_date = first_day
        while current_date <= last_day:
            date_str = current_date.strftime("%d.%m.%Y")
            if database.get_rate(date_str, currency_code) is not None:
                observed.append((current_date, date_str))
            current_date += one_day

        # Severity cut-offs are multiples of the threshold; clamp the unit to
        # one day so a threshold of 0 (or below) keeps a usable scale.
        severity_unit = max(max_gap_days, 1)

        # Check for gaps between consecutive data points
        for (prev_date, prev_date_str), (curr_date, curr_date_str) in zip(
            observed, observed[1:]
        ):
            working_days_gap = calculate_working_days_gap(prev_date, curr_date)
            if working_days_gap <= max_gap_days:
                continue

            if working_days_gap > severity_unit * 3:
                severity = "severe"
            elif working_days_gap > severity_unit * 2:
                severity = "moderate"
            else:
                severity = "minor"

            gaps.append(
                {
                    "start_date": prev_date_str,
                    "end_date": curr_date_str,
                    "working_days_missing": working_days_gap,
                    "severity": severity,
                    "max_expected_gap": max_gap_days,
                    "recommendation": f"Check data source for {working_days_gap} missing working days",
                }
            )

    except Exception as e:
        # Deliberate best-effort: report the problem and return what we have.
        debug_print(f"Error detecting temporal gaps: {e}")

    return gaps
def detect_price_change_violations( def detect_price_change_violations(
currency_code, year=None, base_threshold=1.0, adaptive=True currency_code, year=None, base_threshold=1.0, adaptive=True
): ):
@@ -211,7 +299,9 @@ def detect_price_change_violations(
return violations, adaptive_info return violations, adaptive_info
def validate_currency_data(currency_code, year=None, base_threshold=1.0, adaptive=True): def validate_currency_data(
currency_code, year=None, base_threshold=1.0, adaptive=True, max_gap_days=3
):
""" """
Comprehensive validation for a currency. Comprehensive validation for a currency.
@@ -219,6 +309,7 @@ def validate_currency_data(currency_code, year=None, base_threshold=1.0, adaptiv
:param year: Optional year filter :param year: Optional year filter
:param base_threshold: Base threshold for price changes :param base_threshold: Base threshold for price changes
:param adaptive: Whether to use adaptive thresholds :param adaptive: Whether to use adaptive thresholds
:param max_gap_days: Maximum acceptable working days gap
:return: Validation results :return: Validation results
""" """
results = { results = {
@@ -233,31 +324,44 @@ def validate_currency_data(currency_code, year=None, base_threshold=1.0, adaptiv
currency_code, year, base_threshold, adaptive currency_code, year, base_threshold, adaptive
) )
# Temporal gaps
gaps = detect_temporal_gaps(currency_code, year, max_gap_days)
results["adaptive_analysis"] = adaptive_info results["adaptive_analysis"] = adaptive_info
results["price_change_violations"] = violations results["price_change_violations"] = violations
results["temporal_gaps"] = gaps
# Summary statistics # Summary statistics
severity_counts = defaultdict(int) severity_counts = defaultdict(int)
for v in violations: for v in violations:
severity_counts[v["severity"]] += 1 severity_counts[v["severity"]] += 1
gap_severity_counts = defaultdict(int)
for g in gaps:
gap_severity_counts[g["severity"]] += 1
results["summary"] = { results["summary"] = {
"total_violations": len(violations), "total_violations": len(violations),
"total_gaps": len(gaps),
"severity_breakdown": dict(severity_counts), "severity_breakdown": dict(severity_counts),
"gap_severity_breakdown": dict(gap_severity_counts),
"base_threshold": base_threshold, "base_threshold": base_threshold,
"adaptive_enabled": adaptive, "adaptive_enabled": adaptive,
"max_gap_days": max_gap_days,
} }
# Data quality score (simple heuristic) # Data quality score (simple heuristic)
quality_penalty = 0
if violations: if violations:
# Penalize based on violations quality_penalty += (
quality_score = max( len(violations) * 5 + severity_counts.get("severe", 0) * 20
0, 100 - (len(violations) * 5) - (severity_counts["severe"] * 20) )
if gaps:
quality_penalty += (
len(gaps) * 10 + gap_severity_counts.get("severe", 0) * 30
) )
else:
quality_score = 100
results["data_quality_score"] = quality_score results["data_quality_score"] = max(0, 100 - quality_penalty)
except Exception as e: except Exception as e:
results["error"] = str(e) results["error"] = str(e)
@@ -266,13 +370,16 @@ def validate_currency_data(currency_code, year=None, base_threshold=1.0, adaptiv
return results return results
def validate_all_currencies(year=None, base_threshold=1.0, adaptive=True): def validate_all_currencies(
year=None, base_threshold=1.0, adaptive=True, max_gap_days=3
):
""" """
Validates all available currencies. Validates all available currencies.
:param year: Optional year filter :param year: Optional year filter
:param base_threshold: Base threshold for price changes :param base_threshold: Base threshold for price changes
:param adaptive: Whether to use adaptive thresholds :param adaptive: Whether to use adaptive thresholds
:param max_gap_days: Maximum acceptable working days gap
:return: Validation results for all currencies :return: Validation results for all currencies
""" """
results = { results = {
@@ -280,6 +387,7 @@ def validate_all_currencies(year=None, base_threshold=1.0, adaptive=True):
"validation_year": year, "validation_year": year,
"base_threshold": base_threshold, "base_threshold": base_threshold,
"adaptive_enabled": adaptive, "adaptive_enabled": adaptive,
"max_gap_days": max_gap_days,
"validation_date": datetime.now().isoformat() + "Z", "validation_date": datetime.now().isoformat() + "Z",
"currency_results": [], "currency_results": [],
} }
@@ -291,7 +399,7 @@ def validate_all_currencies(year=None, base_threshold=1.0, adaptive=True):
for currency in currencies_to_check: for currency in currencies_to_check:
try: try:
currency_result = validate_currency_data( currency_result = validate_currency_data(
currency, year, base_threshold, adaptive currency, year, base_threshold, adaptive, max_gap_days
) )
results["currency_results"].append(currency_result) results["currency_results"].append(currency_result)
except Exception as e: except Exception as e:
@@ -305,16 +413,28 @@ def validate_all_currencies(year=None, base_threshold=1.0, adaptive=True):
for r in results["currency_results"] for r in results["currency_results"]
if "summary" in r if "summary" in r
) )
total_gaps = sum(
r.get("summary", {}).get("total_gaps", 0)
for r in results["currency_results"]
if "summary" in r
)
severe_violations = sum( severe_violations = sum(
r.get("summary", {}).get("severity_breakdown", {}).get("severe", 0) r.get("summary", {}).get("severity_breakdown", {}).get("severe", 0)
for r in results["currency_results"] for r in results["currency_results"]
if "summary" in r if "summary" in r
) )
severe_gaps = sum(
r.get("summary", {}).get("gap_severity_breakdown", {}).get("severe", 0)
for r in results["currency_results"]
if "summary" in r
)
results["overall_summary"] = { results["overall_summary"] = {
"currencies_checked": len(results["currency_results"]), "currencies_checked": len(results["currency_results"]),
"total_violations": total_violations, "total_violations": total_violations,
"total_gaps": total_gaps,
"severe_violations": severe_violations, "severe_violations": severe_violations,
"severe_gaps": severe_gaps,
} }
except Exception as e: except Exception as e:
@@ -362,10 +482,24 @@ def format_validation_text(results):
else: else:
output.append("\nPrice Change Violations: None found") output.append("\nPrice Change Violations: None found")
gaps = results.get("temporal_gaps", [])
if gaps:
output.append("\nTemporal Gaps:")
for i, g in enumerate(gaps, 1):
severity = g["severity"].upper()
output.append(
f"{i}. [{severity}] {g['start_date']}{g['end_date']}: {g['working_days_missing']} working days missing"
)
if "recommendation" in g:
output.append(f"{g['recommendation']}")
else:
output.append("\nTemporal Gaps: None found")
summary = results.get("summary", {}) summary = results.get("summary", {})
quality_score = results.get("data_quality_score", 0) quality_score = results.get("data_quality_score", 0)
output.append(f"\nData Quality Score: {quality_score}%") output.append(f"\nData Quality Score: {quality_score}%")
output.append(f"Total violations: {summary.get('total_violations', 0)}") output.append(f"Total violations: {summary.get('total_violations', 0)}")
output.append(f"Total gaps: {summary.get('total_gaps', 0)}")
elif "currency_results" in results: elif "currency_results" in results:
# Multi-currency validation # Multi-currency validation