-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcli.py
More file actions
2783 lines (2365 loc) · 126 KB
/
cli.py
File metadata and controls
2783 lines (2365 loc) · 126 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""
CLI tool for PDF metadata extraction with ground truth validation.
Integrates all existing functions from get_metadata.py and ground_truth_validation.py.
"""
import os
import argparse
import json
import datetime
import time
import threading
import traceback
import logging
import queue
from pathlib import Path
from typing import Dict, Any, List, Tuple, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from google import genai
from google.genai import types
import pandas as pd
import pydantic
from urllib.parse import urlparse, unquote
def get_filename_from_url(url: str) -> str:
    """
    Derive the expected filename from a URL, decoding percent-escapes.

    Used everywhere a URL must be matched against a local file so that
    filename handling stays consistent across the system.
    """
    decoded_path = unquote(urlparse(url).path)
    name = os.path.basename(decoded_path)
    # Defensive second pass: a few common escapes can survive when the URL
    # was double-encoded upstream.
    for escaped, plain in (('%20', ' '), ('%28', '('), ('%29', ')')):
        name = name.replace(escaped, plain)
    return name
# Configure logging: INFO and above go to both cli_errors.log and the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('cli_errors.log'),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)

# Quiet down chatty third-party libraries so our own output stays readable.
for _noisy_logger in ('pikepdf._core', 'google', 'google.genai', 'urllib3', 'requests'):
    logging.getLogger(_noisy_logger).setLevel(logging.WARNING)
def categorize_error(error_type: str, error_message: str, error_details: Optional[Dict] = None) -> Dict[str, Any]:
    """Categorize an error for debugging and retry decisions.

    Args:
        error_type: Exception class name (e.g. 'FileNotFoundError').
        error_message: The error message text.
        error_details: Optional dict with structured info; 'http_status' /
            'status_code' and 'api_code' are recognized when present.

    Returns:
        Dict with keys: 'category', 'severity' ('low'/'medium'/'high'),
        'retryable' (bool), 'suggestion' (str), 'confidence'
        ('high'/'medium'/'low'), and 'http_status' when one was supplied.
    """
    error_msg_lower = error_message.lower()
    # Extract structured details when the caller captured them.
    http_status = None
    api_code = None
    if error_details:
        http_status = error_details.get('http_status') or error_details.get('status_code')
        api_code = error_details.get('api_code')
    # Precise categorization based on HTTP status codes (highest confidence).
    if http_status:
        if http_status == 429:
            return {
                'category': 'rate_limit',
                'severity': 'medium',
                'retryable': True,
                'suggestion': 'Rate limit exceeded. Reduce worker count or add delays',
                'http_status': http_status,
                'confidence': 'high'
            }
        elif http_status in [401, 403]:
            return {
                'category': 'authentication',
                'severity': 'high',
                'retryable': False,
                'suggestion': 'Authentication failed. Check API key and permissions',
                'http_status': http_status,
                'confidence': 'high'
            }
        elif http_status in [503, 502, 504]:
            return {
                'category': 'service_unavailable',
                'severity': 'medium',
                'retryable': True,
                'suggestion': 'Service temporarily unavailable. Retry should resolve',
                'http_status': http_status,
                'confidence': 'high'
            }
        elif http_status >= 500:
            # Any other 5xx (502/503/504 handled above).
            return {
                'category': 'server_error',
                'severity': 'medium',
                'retryable': True,
                'suggestion': 'Server error occurred. Retry recommended',
                'http_status': http_status,
                'confidence': 'high'
            }
        elif http_status in [400, 422]:
            return {
                'category': 'client_error',
                'severity': 'high',
                'retryable': False,
                'suggestion': 'Request format error. Check input data',
                'http_status': http_status,
                'confidence': 'high'
            }
    # Fallback to string-based detection with lower confidence.
    # API-related errors (fallback detection)
    if any(keyword in error_msg_lower for keyword in ['rate limit', '429', 'quota', 'limit exceeded']):
        return {
            'category': 'rate_limit',
            'severity': 'medium',
            'retryable': True,
            'suggestion': 'Reduce worker count or increase delays between requests',
            'confidence': 'medium'
        }
    if any(keyword in error_msg_lower for keyword in ['503', 'service unavailable', 'timeout', 'connection']):
        return {
            'category': 'service_unavailable',
            'severity': 'medium',
            'retryable': True,
            'suggestion': 'Temporary service issue, retry should resolve',
            'confidence': 'medium'
        }
    if any(keyword in error_msg_lower for keyword in ['authentication', 'unauthorized', '401', 'api key']):
        return {
            'category': 'authentication',
            'severity': 'high',
            'retryable': False,
            'suggestion': 'Check API key configuration',
            'confidence': 'medium'
        }
    # File-related errors.
    # CONSISTENCY FIX: the categories below previously omitted the
    # 'confidence' key that all other branches return; callers fell back to
    # displaying 'unknown'.  All branches now report it.
    if error_type == 'FileNotFoundError' or 'not found' in error_msg_lower:
        return {
            'category': 'file_not_found',
            'severity': 'high',
            'retryable': False,
            'suggestion': 'Verify file paths and ensure files exist',
            'confidence': 'medium'
        }
    if any(keyword in error_msg_lower for keyword in ['permission', 'access denied']):
        return {
            'category': 'permission_error',
            'severity': 'high',
            'retryable': False,
            'suggestion': 'Check file permissions and disk space',
            'confidence': 'medium'
        }
    # PDF processing errors
    if any(keyword in error_msg_lower for keyword in ['pdf', 'pikepdf', 'corrupt', 'invalid format']):
        return {
            'category': 'pdf_processing',
            'severity': 'medium',
            'retryable': True,
            'suggestion': 'PDF may be corrupted - check file integrity',
            'confidence': 'medium'
        }
    # Parsing errors
    if any(keyword in error_msg_lower for keyword in ['json', 'validation', 'pydantic']):
        return {
            'category': 'parsing_error',
            'severity': 'medium',
            'retryable': True,
            'suggestion': 'Response format issue - may resolve with retry',
            'confidence': 'medium'
        }
    # Memory/resource errors
    if any(keyword in error_msg_lower for keyword in ['memory', 'out of memory', 'resource']):
        return {
            'category': 'resource_exhaustion',
            'severity': 'high',
            'retryable': False,
            'suggestion': 'Reduce batch size or worker count',
            'confidence': 'medium'
        }
    # Unknown error
    return {
        'category': 'unknown',
        'severity': 'medium',
        'retryable': True,
        'suggestion': 'Check logs for more details',
        'confidence': 'low'
    }
def generate_failure_analysis(failed_items: List[Dict], verbose: bool = False) -> None:
    """Generate comprehensive failure analysis with actionable recommendations.

    Groups `failed_items` by category via categorize_error(), prints a
    console summary (breakdown, suggestions, retry advice) and writes a
    timestamped JSON report to the current directory.

    Args:
        failed_items: Failure records; dicts are expected but any object is
            tolerated (non-dicts are categorized as 'Unknown').
        verbose: When True, also prints up to 3 example failures per category.
    """
    if not failed_items:
        return
    logger.info("Generating failure analysis...")
    print(f"\n📊 FAILURE ANALYSIS")
    print(f"{'='*50}")
    # Categorize all failures
    error_categories = {}
    error_suggestions = set()  # de-duplicated across categories
    for failure in failed_items:
        if isinstance(failure, dict):
            error_type = failure.get('error', failure.get('error_type', 'Unknown'))
            error_msg = failure.get('error_message', str(failure))
            # Extract detailed error info if available
            detailed_info = failure.get('detailed_error_info', {})
        else:
            # Non-dict failures (e.g. bare exception strings) get minimal info.
            error_type = 'Unknown'
            error_msg = str(failure)
            detailed_info = {}
        category_info = categorize_error(error_type, error_msg, detailed_info)
        category = category_info['category']
        if category not in error_categories:
            error_categories[category] = {
                'count': 0,
                'examples': [],
                'info': category_info
            }
        error_categories[category]['count'] += 1
        # Keep at most 3 representative examples per category.
        if len(error_categories[category]['examples']) < 3:
            filename = failure.get('filename', 'unknown') if isinstance(failure, dict) else 'unknown'
            # Enhanced example with HTTP status if available
            error_display = f"{error_type}: {error_msg[:100]}..."
            if detailed_info.get('http_status'):
                error_display = f"HTTP {detailed_info['http_status']}: {error_display}"
            error_categories[category]['examples'].append({
                'filename': filename,
                'error': error_display,
                'http_status': detailed_info.get('http_status'),
                'confidence': category_info.get('confidence', 'unknown')
            })
        error_suggestions.add(category_info['suggestion'])
    print(f"Total failed files: {len(failed_items)}")
    print(f"Error categories found: {len(error_categories)}")
    # Display categories by frequency
    sorted_categories = sorted(error_categories.items(), key=lambda x: x[1]['count'], reverse=True)
    print(f"\n🔍 ERROR BREAKDOWN")
    print(f"{'-'*50}")
    for category, data in sorted_categories:
        count = data['count']
        percentage = (count / len(failed_items)) * 100
        severity = data['info']['severity']
        retryable = data['info']['retryable']
        severity_emoji = {'low': '🟢', 'medium': '🟡', 'high': '🔴'}[severity]
        retry_emoji = '🔄' if retryable else '⛔'
        confidence = data['info'].get('confidence', 'unknown')
        confidence_emoji = {'high': '🎯', 'medium': '🔍', 'low': '❓', 'unknown': '❓'}[confidence]
        print(f"{severity_emoji} {category.replace('_', ' ').title()}: {count} ({percentage:.1f}%) {retry_emoji} {confidence_emoji}")
        if verbose and data['examples']:
            for example in data['examples']:
                status_info = f" [HTTP {example['http_status']}]" if example.get('http_status') else ""
                confidence_info = f" (confidence: {example.get('confidence', 'unknown')})"
                print(f" • {example['filename']}: {example['error']}{status_info}{confidence_info}")
    print(f"\n💡 RECOMMENDED ACTIONS")
    print(f"{'-'*50}")
    for i, suggestion in enumerate(error_suggestions, 1):
        print(f"{i}. {suggestion}")
    # Specific retry recommendations
    retryable_count = sum(data['count'] for data in error_categories.values() if data['info']['retryable'])
    if retryable_count > 0:
        print(f"\n🔄 RETRY RECOMMENDATION")
        print(f"{'-'*30}")
        print(f"{retryable_count} of {len(failed_items)} failures appear retryable.")
        print(f"Consider running: python cli.py --batch --retry-failed --max-retries 5")
    # Save detailed report to file (best-effort: failure only logs an error).
    try:
        report_file = f"failure_analysis_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(report_file, 'w') as f:
            json.dump({
                'timestamp': datetime.datetime.now().isoformat(),
                'total_failures': len(failed_items),
                'categories': error_categories,
                'suggestions': list(error_suggestions),
                'retryable_count': retryable_count,
                'analysis_version': '2.0',
                'features': ['http_status_detection', 'confidence_scoring', 'detailed_error_capture']
            }, f, indent=2)
        print(f"\n📄 Detailed analysis saved to: {report_file}")
    except Exception as e:
        logger.error(f"Failed to save failure analysis: {e}")
# Import existing functions
from get_metadata import (
prepare_and_upload_pdf_subset,
get_metadata_from_gemini,
get_confidence_level,
recommend_action
)
from ground_truth_validation import (
load_ground_truth_metadata,
compare_with_ground_truth,
adjust_confidence_with_ground_truth,
generate_accuracy_report,
track_all_deviations,
export_deviations_to_excel,
print_ground_truth_stats
)
# Rolling workers architecture is the only batch processing method
# Rate limiting classes for Gemini API
@dataclass
class RateLimiter:
    """Thread-safe sliding-window rate limiter for API calls.

    Tracks request timestamps over a rolling 60-second window.  Callers ask
    wait_if_needed() for a delay before each request and sleep that long.
    """
    max_requests_per_minute: int
    _requests: List[float] = None  # timestamps (time.time()) of recent/reserved requests
    _lock: threading.Lock = None

    def __post_init__(self):
        # Dataclass fields default to None; real mutable state is built here
        # so instances never share a list or lock.
        self._requests = []
        self._lock = threading.Lock()

    def wait_if_needed(self) -> float:
        """Reserve a request slot. Returns wait time in seconds (0.0 = go now).

        BUGFIX: when the window is full, the request is now recorded at its
        projected start time (now + wait) before returning.  Previously a
        throttled request was never recorded at all, so callers that slept
        and then proceeded could sustain a rate above the configured limit.
        """
        with self._lock:
            now = time.time()
            # Drop entries older than 1 minute.
            cutoff = now - 60.0
            self._requests = [req_time for req_time in self._requests if req_time > cutoff]
            # Check if we need to wait
            if len(self._requests) >= self.max_requests_per_minute:
                # Wait until the oldest request ages out of the window.
                oldest_request = min(self._requests)
                wait_time = 60.0 - (now - oldest_request) + 0.1  # Add small buffer
                if wait_time > 0:
                    # Reserve the slot at the projected request time.
                    self._requests.append(now + wait_time)
                    return wait_time
            # Record this request
            self._requests.append(now)
            return 0.0

    def get_current_rate(self) -> float:
        """Return the number of requests recorded in the last 60 seconds."""
        with self._lock:
            now = time.time()
            cutoff = now - 60.0
            recent_requests = [req_time for req_time in self._requests if req_time > cutoff]
            return len(recent_requests)
@dataclass
class SearchQuotaTracker:
    """Track daily search quota usage, persisted across runs in a JSON file."""
    max_searches_per_day: int
    quota_file: str = "search_quota.json"  # persistence path; one record per day
    _searches_today: int = 0
    _date_today: str = ""
    _lock: threading.Lock = None  # created in __post_init__

    def __post_init__(self):
        self._lock = threading.Lock()
        self._load_quota()

    def _load_quota(self):
        """Load today's quota usage from file; reset the counter on a new day."""
        today = datetime.date.today().isoformat()
        if os.path.exists(self.quota_file):
            try:
                with open(self.quota_file, 'r') as f:
                    data = json.load(f)
                if data.get('date') == today:
                    self._searches_today = data.get('count', 0)
                    self._date_today = today
                    return
            except (OSError, ValueError):
                # BUGFIX: narrowed from a bare `except:` which also swallowed
                # KeyboardInterrupt/SystemExit.  A corrupt or unreadable quota
                # file (json.JSONDecodeError is a ValueError) still just falls
                # through to a clean reset below.
                pass
        # Reset for new day
        self._searches_today = 0
        self._date_today = today
        self._save_quota()

    def _save_quota(self):
        """Save quota to file (best-effort; failures never block a search)."""
        try:
            data = {
                'date': self._date_today,
                'count': self._searches_today,
                'max_searches': self.max_searches_per_day
            }
            with open(self.quota_file, 'w') as f:
                json.dump(data, f, indent=2)
        except Exception as e:
            print(f"⚠️ Warning: Could not save search quota to file: {e}")
            # Continue anyway - don't let file I/O block the search

    def can_use_search(self) -> bool:
        """Check if we can use search quota.

        NOTE: not thread-safe on its own; call via use_search_quota(), which
        already holds the lock.
        """
        today = datetime.date.today().isoformat()
        if today != self._date_today:
            self._load_quota()  # Reload for new day
        return self._searches_today < self.max_searches_per_day

    def use_search_quota(self) -> bool:
        """Use one search quota. Returns True if successful, False if quota exhausted."""
        with self._lock:
            if self.can_use_search():
                self._searches_today += 1
                self._save_quota()
                return True
            return False

    def get_quota_status(self) -> Dict[str, Any]:
        """Return a snapshot of today's usage for reporting."""
        with self._lock:
            return {
                'used': self._searches_today,
                'max': self.max_searches_per_day,
                'remaining': self.max_searches_per_day - self._searches_today,
                'date': self._date_today
            }
# Global rate limiters (initialized in main)
# Process-wide singletons shared by worker threads; None until main() sets them.
GEMINI_RATE_LIMITER: Optional[RateLimiter] = None
SEARCH_QUOTA_TRACKER: Optional[SearchQuotaTracker] = None
def calculate_optimal_workers(rate_limiter: "RateLimiter", base_workers: int, max_workers: int = 50) -> int:
    """Pick a worker count scaled to current rate-limit utilization.

    Conservative scaling:
      * utilization < 25%  -> up to 200% of base workers
      * utilization < 50%  -> up to 150% of base workers
      * utilization < 75%  -> base workers
      * utilization < 90%  -> 75% of base workers
      * otherwise          -> 50% of base workers

    Args:
        rate_limiter: Limiter to query; falsy -> return base_workers unchanged.
        base_workers: Configured baseline worker count.
        max_workers: Hard upper bound on the returned count.

    Returns:
        Worker count clamped into [1, max_workers].
    """
    if not rate_limiter:
        return base_workers
    max_rate = rate_limiter.max_requests_per_minute
    if max_rate <= 0:
        # Degenerate limiter config: avoid division by zero, keep baseline.
        return base_workers
    utilization = rate_limiter.get_current_rate() / max_rate
    if utilization < 0.25:
        optimal = int(base_workers * 2.0)
    elif utilization < 0.5:
        optimal = int(base_workers * 1.5)
    elif utilization < 0.75:
        optimal = base_workers
    elif utilization < 0.9:
        optimal = int(base_workers * 0.75)
    else:
        optimal = int(base_workers * 0.5)
    # BUGFIX: clamp uniformly.  The original applied the max_workers cap only
    # in the scale-up branches and the floor of 1 only in the scale-down
    # branches, so e.g. base_workers > max_workers leaked through at moderate
    # utilization.
    return max(1, min(optimal, max_workers))
def wait_for_rate_limit(limiter: "RateLimiter", operation: str = "API call") -> None:
    """Block until the rate limiter allows another request.

    Asks `limiter` how long to wait, sleeps that long if needed, and prints
    progress messages.

    Args:
        limiter: The shared RateLimiter to consult.
        operation: Human-readable label used in the progress messages.
    """
    wait_time = limiter.wait_if_needed()
    if wait_time > 0:
        current_rate = limiter.get_current_rate()
        # BUGFIX: report the limiter's actual configured limit instead of a
        # hard-coded "140" RPM, which was wrong for any other configuration.
        print(f"⏳ Rate limiting: {current_rate}/{limiter.max_requests_per_minute} RPM - waiting {wait_time:.1f}s for {operation}...")
        time.sleep(wait_time)
        print(f"✅ Rate limit wait complete, proceeding with {operation}")
# Batch processing state management
@dataclass
class BatchProgress:
    """Persistent record of a batch run: done / failed / pending files."""
    total_files: int
    completed: List[str]
    failed: List[Dict[str, Any]]
    pending: List[str]
    start_time: str
    last_checkpoint: str
    # Export file paths carried across resume so re-runs keep appending to
    # the same output files.
    export_results_path: Optional[str] = None
    export_deviations_path: Optional[str] = None
    export_ground_truth_path: Optional[str] = None

    def save_to_file(self, filepath: str):
        """Serialize progress (plus a derived completion ratio) to JSON."""
        completion = len(self.completed) / self.total_files if self.total_files > 0 else 0.0
        payload = {
            'total_files': self.total_files,
            'completed': self.completed,
            'failed': self.failed,
            'pending': self.pending,
            'start_time': self.start_time,
            'last_checkpoint': self.last_checkpoint,
            'completion_rate': completion,
            # Export paths for resume continuity
            'export_results_path': self.export_results_path,
            'export_deviations_path': self.export_deviations_path,
            'export_ground_truth_path': self.export_ground_truth_path,
        }
        with open(filepath, 'w') as handle:
            json.dump(payload, handle, indent=2)

    @classmethod
    def load_from_file(cls, filepath: str) -> Optional['BatchProgress']:
        """Reconstruct progress from JSON; None if the file is absent or unreadable."""
        if not os.path.exists(filepath):
            return None
        try:
            with open(filepath, 'r') as handle:
                data = json.load(handle)
            return cls(
                total_files=data['total_files'],
                completed=data['completed'],
                failed=data['failed'],
                pending=data['pending'],
                start_time=data['start_time'],
                last_checkpoint=data['last_checkpoint'],
                # Older progress files may predate the export-path fields.
                export_results_path=data.get('export_results_path'),
                export_deviations_path=data.get('export_deviations_path'),
                export_ground_truth_path=data.get('export_ground_truth_path'),
            )
        except Exception as e:
            print(f"Warning: Could not load progress file {filepath}: {e}")
            return None
class BatchResults:
    """Thread-safe collector for batch processing results.

    Accumulates per-PDF result dicts, a deviation log for ground-truth
    discrepancies, and running summary statistics.  All public methods hold
    self._lock, so one instance can be shared across worker threads.
    """
    def __init__(self):
        # Single lock guards results, deviation_log and summary_stats.
        self._lock = threading.Lock()
        self.results = []  # one dict per processed PDF (see add_result)
        self.deviation_log = []  # deviation entries with tracking data
        self.progress = None  # Store reference to BatchProgress for export paths
        self.summary_stats = {
            'total_processed': 0,
            'successful': 0,
            'failed': 0,
            'avg_confidence': 0.0,  # running SUM until get_summary() divides
            'avg_accuracy': 0.0,  # running SUM until get_summary() divides
            'search_resolutions': 0
        }
    def add_result(self, pdf_path: str, result_data: Dict[str, Any]):
        """Thread-safe method to add processing result.

        A result counts as successful when result_data['metadata'] is truthy;
        otherwise it is counted as failed.
        """
        with self._lock:
            self.results.append({
                'pdf_path': pdf_path,
                'filename': Path(pdf_path).name,
                'timestamp': datetime.datetime.now().isoformat(),
                **result_data
            })
            # Update summary statistics
            self.summary_stats['total_processed'] += 1
            if result_data.get('metadata'):
                self.summary_stats['successful'] += 1
                metadata = result_data['metadata']
                if hasattr(metadata, 'overall_confidence') and metadata.overall_confidence:
                    self.summary_stats['avg_confidence'] += metadata.overall_confidence
                comparison_results = result_data.get('comparison_results', {})
                if comparison_results.get('overall_accuracy'):
                    self.summary_stats['avg_accuracy'] += comparison_results['overall_accuracy']
                search_resolved = result_data.get('search_resolution_results', {}).get('resolved')
                if search_resolved:
                    self.summary_stats['search_resolutions'] += len(search_resolved)
            else:
                self.summary_stats['failed'] += 1
            # Add deviation entry if available
            # NOTE(review): if 'deviation_entry' is absent, the .get() default
            # makes this condition True and the next line raises KeyError —
            # callers appear to always supply the key; confirm.
            if result_data.get('deviation_entry', {}).get('status') != 'no_tracking':
                self.deviation_log.append(result_data['deviation_entry'])
    def get_summary(self) -> Dict[str, Any]:
        """Get current summary statistics (sums converted to averages)."""
        with self._lock:
            stats = self.summary_stats.copy()
            if stats['successful'] > 0:
                stats['avg_confidence'] /= stats['successful']
                stats['avg_accuracy'] /= stats['successful']
            return stats
    def calculate_final_stats(self) -> Dict[str, Any]:
        """Calculate final statistics from actual export data (single source of truth)."""
        with self._lock:
            # Count from deviation_log (actual unresolved discrepancies)
            unresolved_discrepancies = sum(len(entry.get("all_deviations", [])) for entry in self.deviation_log if isinstance(entry, dict))
            files_with_discrepancies = sum(1 for entry in self.deviation_log if isinstance(entry, dict) and entry.get("all_deviations"))
            return {
                'unresolved_discrepancies': unresolved_discrepancies,
                'files_with_discrepancies': files_with_discrepancies,
                'search_resolutions': self.summary_stats['search_resolutions']
            }
    def export_results(self, output_path: str, append_mode: bool = False):
        """Export all results to Excel file.

        Args:
            output_path: Path to Excel file
            append_mode: If True, append to existing file instead of overwriting

        Returns:
            output_path on success, None when there is nothing to export.
        """
        with self._lock:
            if not self.results:
                print("No results to export")
                return None
            # Prepare data for export: one flat row per processed file.
            export_data = []
            for result in self.results:
                metadata = result.get('metadata')
                comparison = result.get('comparison_results') or {}
                search_results = result.get('search_resolution_results') or {}
                discrepancies = comparison.get('discrepancies') or {}
                row = {
                    'filename': result['filename'],
                    'pdf_path': result['pdf_path'],
                    'timestamp': result['timestamp'],
                    'success': metadata is not None,
                    'overall_confidence': metadata.overall_confidence if metadata else None,
                    'metadata_completeness': metadata.metadata_completeness if metadata else None,
                    'ground_truth_accuracy': comparison.get('overall_accuracy'),
                    'discrepancies_count': len(discrepancies),
                    'search_resolutions': len(search_results.get('resolved', {}) if search_results else {}),
                    'search_resolution_rate': search_results.get('resolution_rate', 0.0) if search_results else 0.0,
                }
                # Add error information for failed files
                if not metadata:
                    row['error_type'] = result.get('error_type', '')
                    row['error_message'] = result.get('error_message', '')[:500]  # Truncate long error messages
                # Add metadata fields if available, including search resolution info
                if metadata:
                    for field_name in ['title', 'doc_type', 'health_topic', 'creator', 'year', 'country', 'language', 'level']:
                        field = getattr(metadata, field_name, None)
                        if field and hasattr(field, 'value'):
                            # Unwrap enum values to their plain representation.
                            value = field.value.value if hasattr(field.value, 'value') else field.value
                            row[f'{field_name}_extracted'] = value
                            row[f'{field_name}_confidence'] = field.confidence
                            # Add ground truth value and detailed info ONLY for conflicting fields
                            if field_name in discrepancies:
                                discrepancy = discrepancies[field_name]
                                row[f'{field_name}_ground_truth'] = discrepancy.get('reference', '')
                                row[f'{field_name}_evidence'] = field.evidence[:200] if field.evidence else ''  # Truncate
                                row[f'{field_name}_source_page'] = field.source_page
                            # Add search resolution info if this field was resolved
                            if search_results and search_results.get('resolved') and field_name in search_results['resolved']:
                                resolution = search_results['resolved'][field_name]
                                # Derive resolved_value if missing, based on choice and known discrepancy
                                # NOTE(review): this mutates the shared resolution
                                # dict in place (caches the derived value).
                                if 'resolved_value' not in resolution:
                                    choice = resolution.get('choice')
                                    if choice == 'extracted':
                                        resolution['resolved_value'] = (
                                            discrepancies.get(field_name, {}).get('extracted')
                                            if isinstance(discrepancies, dict) and field_name in discrepancies else value
                                        )
                                    elif choice == 'reference':
                                        resolution['resolved_value'] = (
                                            discrepancies.get(field_name, {}).get('reference')
                                            if isinstance(discrepancies, dict) and field_name in discrepancies else None
                                        )
                                row[f'{field_name}_search_resolved'] = resolution.get('resolved_value', '')
                                row[f'{field_name}_search_confidence'] = resolution.get('confidence', 0.0)
                                row[f'{field_name}_search_recommendation'] = resolution.get('recommendation', '')
                                row[f'{field_name}_resolution_reason'] = resolution.get('reasoning', '')[:200]  # Truncate long reasons
                export_data.append(row)
            # Create DataFrame with new data
            new_df = pd.DataFrame(export_data)
            if append_mode and os.path.exists(output_path):
                # Append mode - load existing data and combine
                try:
                    existing_df = pd.read_excel(output_path)
                    # Combine DataFrames, avoiding duplicates based on filename + timestamp
                    combined_df = pd.concat([existing_df, new_df], ignore_index=True)
                    # Remove duplicates based on filename and timestamp
                    combined_df = combined_df.drop_duplicates(subset=['filename', 'timestamp'], keep='last')
                    combined_df.to_excel(output_path, index=False)
                    print(f"Batch results appended to existing file: {output_path}")
                    print(f" • Previous entries: {len(existing_df)}")
                    print(f" • New entries: {len(new_df)}")
                    print(f" • Total entries: {len(combined_df)}")
                except Exception as e:
                    # Best-effort append; fall back to a clean overwrite.
                    print(f"Warning: Could not append to existing file, overwriting: {e}")
                    new_df.to_excel(output_path, index=False)
            else:
                # Normal mode - overwrite file
                new_df.to_excel(output_path, index=False)
            return output_path
    def export_updated_ground_truth(self, output_path: str, ground_truth_path: str = "documents-info.xlsx"):
        """Export results in the exact format of documents-info.xlsx with updated values.

        Rows are matched to results by the filename in 'public_file_url'
        (exact, then stem match).  Matched rows get field updates plus
        'updated'/'flagged_for_review' tracking columns.

        Returns:
            output_path on success, None if there is nothing to export or
            the ground truth file cannot be read.
        """
        with self._lock:
            if not self.results:
                print("No results to export")
                return None
            # Load the original ground truth Excel file
            try:
                original_df = pd.read_excel(ground_truth_path)
                print(f"Loaded ground truth file with {len(original_df)} rows")
            except Exception as e:
                print(f"Error loading ground truth file: {e}")
                return None
            # Create a copy to update
            updated_df = original_df.copy()
            # Add new columns for tracking changes
            updated_df['flagged_for_review'] = False
            updated_df['updated'] = False
            updated_df['updated_fields'] = ''
            updated_df['flagged_fields'] = ''
            # Create lookup dict from results by filename
            results_by_filename = {}
            for result in self.results:
                # Extract just the filename without path
                filename = Path(result['filename']).name
                results_by_filename[filename] = result
            # Track which files actually matched ground truth rows
            matched_files = set()
            # Update each row with extracted/resolved values
            for idx, row in updated_df.iterrows():
                # Try to match by URL filename (handle docx→pdf conversions)
                if pd.notna(row['public_file_url']):
                    url_filename = get_filename_from_url(row['public_file_url'])
                    url_stem = Path(url_filename).stem
                    # First try exact match
                    matched_filename = None
                    if url_filename in results_by_filename:
                        matched_filename = url_filename
                    else:
                        # Try stem match (for docx→pdf conversions)
                        for result_filename in results_by_filename.keys():
                            if Path(result_filename).stem == url_stem:
                                matched_filename = result_filename
                                break
                    if matched_filename:
                        # Track that we matched this file
                        matched_files.add(matched_filename)
                        result = results_by_filename[matched_filename]
                        metadata = result.get('metadata')
                        comparison = result.get('comparison_results') or {}
                        search_results = result.get('search_resolution_results') or {}
                        discrepancies = comparison.get('discrepancies') or {}
                        if metadata:
                            # Update fields with extracted or search-resolved values
                            fields_to_update = {
                                'doc_type': metadata.doc_type,
                                'health_topic': metadata.health_topic,
                                'creator': metadata.creator,
                                'year': metadata.year,
                                'country': metadata.country,
                                'language': metadata.language,
                                'title': metadata.title
                            }
                            flagged = False
                            updated = False
                            updated_fields = []
                            flagged_fields = []
                            for field_name, field in fields_to_update.items():
                                if field and hasattr(field, 'value'):
                                    # Get original value from Excel for comparison
                                    original_value = row.get(field_name)
                                    # Check if this field was resolved by search
                                    if search_results and search_results.get('resolved') and field_name in search_results['resolved']:
                                        resolution = search_results['resolved'][field_name]
                                        # Use search-resolved value if confidence is high
                                        if resolution.get('confidence', 0) >= 0.8:
                                            value = resolution.get('resolved_value')
                                            if value:
                                                # Handle year as integer
                                                if field_name == 'year':
                                                    # NOTE(review): bare except keeps the
                                                    # raw value when int() fails.
                                                    try:
                                                        new_value = int(value)
                                                    except:
                                                        new_value = value
                                                else:
                                                    # Handle enum values
                                                    new_value = value.value if hasattr(value, 'value') else str(value)
                                                # Check if value actually changed
                                                if str(original_value) != str(new_value):
                                                    updated_df.at[idx, field_name] = new_value
                                                    updated = True
                                                    updated_fields.append(field_name)
                                        else:
                                            # Flag for review if search confidence is low
                                            flagged = True
                                            flagged_fields.append(field_name)
                                    elif field_name not in discrepancies:
                                        # Use extracted value if it matches ground truth or no ground truth exists
                                        value = field.value.value if hasattr(field.value, 'value') else field.value
                                        if value:
                                            # Handle year as integer
                                            if field_name == 'year':
                                                try:
                                                    new_value = int(value)
                                                except:
                                                    new_value = value
                                            else:
                                                # Handle enum values
                                                new_value = value.value if hasattr(value, 'value') else str(value)
                                            # Check if value actually changed
                                            if str(original_value) != str(new_value):
                                                updated_df.at[idx, field_name] = new_value
                                                updated = True
                                                updated_fields.append(field_name)
                                    else:
                                        # Field has unresolved discrepancy
                                        flagged = True
                                        flagged_fields.append(field_name)
                            # Set flags and field lists
                            updated_df.at[idx, 'flagged_for_review'] = flagged
                            updated_df.at[idx, 'updated'] = updated
                            updated_df.at[idx, 'updated_fields'] = ', '.join(updated_fields) if updated_fields else ''
                            updated_df.at[idx, 'flagged_fields'] = ', '.join(flagged_fields) if flagged_fields else ''
            # Save the updated Excel file
            updated_df.to_excel(output_path, index=False)
            # Count statistics - use the actual matched files we tracked
            total_processed = len(matched_files)
            flagged_count = updated_df['flagged_for_review'].sum()
            updated_count = updated_df['updated'].sum()
            print(f"\n📊 Ground truth format export complete:")
            print(f" • Original rows: {len(original_df)}")
            print(f" • Files processed: {total_processed}")
            print(f" • Rows with updates: {updated_count}")
            print(f" • Flagged for review: {flagged_count}")
            print(f" • Output saved to: {output_path}")
            return output_path
# Pydantic models for search resolution results
from enum import Enum
class ResolutionChoice(str, Enum):
    """Which candidate value a conflict resolution selects.

    Mixes in ``str`` so members compare equal to their literal values and
    serialize cleanly in JSON / pydantic payloads. The model is forced to
    pick one of these three options and may not invent a new value.
    """

    EXTRACTED = "extracted"  # keep the AI-extracted value
    REFERENCE = "reference"  # keep the ground-truth (reference) value
    NONE = "none"            # search evidence supports neither candidate
class FieldResolution(pydantic.BaseModel):
    """Outcome of resolving one conflicting metadata field.

    One instance is produced per field name that appeared in the
    discrepancy set; ``choice`` points at the winning candidate.
    """

    # Which candidate won — constrained to the ResolutionChoice enum.
    choice: ResolutionChoice = pydantic.Field(description="Which value to use: extracted, reference, or none")
    # Self-reported certainty; pydantic validation clamps it to [0.0, 1.0].
    confidence: float = pydantic.Field(description="Confidence in this choice (0.0-1.0)", ge=0.0, le=1.0)
    # Free-text justification tied to the search evidence.
    reasoning: str = pydantic.Field(description="Explanation of why this choice was made based on search results")
class SearchResolutionResponse(pydantic.BaseModel):
    """Full payload returned by the search-grounding resolution step.

    Bundles the per-field decisions with the evidence and sources the
    model cited, plus a document-level confidence score.
    """

    # Keyed by conflicting field name; one FieldResolution per conflict.
    resolutions: Dict[str, FieldResolution] = pydantic.Field(description="Resolution for each conflicting field")
    # Model's summary of what the search actually showed.
    search_evidence: str = pydantic.Field(description="Key evidence from search results")
    # default_factory avoids a shared mutable default across instances.
    sources: List[str] = pydantic.Field(description="URLs or source descriptions", default_factory=list)
    # Aggregate certainty; validation clamps it to [0.0, 1.0].
    overall_confidence: float = pydantic.Field(description="Overall confidence in resolutions", ge=0.0, le=1.0)
# Search grounding functions for automatic conflict resolution
def query_gemini_with_search(discrepancies: dict, extracted_metadata, pdf_filename: str, client, uploaded_files=None, verbose: bool = True) -> dict:
"""Use ONE search to resolve ALL metadata conflicts for this document."""
if verbose:
print("\n📝 PREPARING SEARCH GROUNDING REQUEST")
print("="*50)
# Build conflict summary
conflict_summary = []
for field_name, conflict_data in discrepancies.items():
conflict_summary.append(f"• {field_name}: Extracted='{conflict_data['extracted']}' vs Reference='{conflict_data['reference']}'")
# Show conflicts without biasing toward extracted values
if verbose:
print(f"Document: {Path(pdf_filename).name}")
print(f"\nConflicts to resolve ({len(discrepancies)}):")
for conflict in conflict_summary:
print(f" {conflict}")
# Build the expected schema for the response
schema_example = {
"resolutions": {},
"search_evidence": "key evidence from search results",
"sources": ["URL or source description"],
"overall_confidence": 0.0
}
# Add resolution structure for each conflicting field
for field_name in discrepancies.keys():
schema_example["resolutions"][field_name] = {
"choice": "extracted|reference|none",
"confidence": 0.0,
"reasoning": "explanation of why this choice was made based on search results"
}
# Build minimal context from ONLY uncontested fields
uncontested_context = []
all_fields = ['title', 'country', 'year', 'creator', 'doc_type', 'health_topic', 'language']
for field in all_fields:
if field not in discrepancies:
value = getattr(extracted_metadata, field, None)
if value and hasattr(value, 'value') and value.value:
uncontested_context.append(f"{field}: {value.value}")
# Build contents list starting with PDF files, then the prompt
contents = []
# Add PDF files first if available
if uploaded_files and uploaded_files[0]: # first_pages_file, last_pages_file
first_pages_file, last_pages_file = uploaded_files[0], uploaded_files[1]
contents.append(first_pages_file)
if last_pages_file:
contents.append(last_pages_file)
# Then add the analysis prompt
analysis_prompt = f"""
I need to resolve metadata conflicts for this health policy document.
{f"The document content is provided above for your direct examination." if uploaded_files and uploaded_files[0] else ""}
The conflicting metadata values found are:
{chr(10).join(conflict_summary)}
{f"Uncontested metadata: {', '.join(uncontested_context)}" if uncontested_context else "No additional context available."}
{"Please examine the document content AND search for authoritative information to resolve these conflicts." if uploaded_files and uploaded_files[0] else "Search for authoritative information to resolve these conflicts."}
Do not assume either the extracted or reference values are correct without verification.
Consider:
1. **Official Sources**: Government websites, institutional publications
2. **Document Catalogs**: Library systems, policy databases
3. **Publication Records**: Official publication dates and titles
4. **Organization Information**: Official names and attributions
**IMPORTANT**: You must choose between the existing values, do NOT create new values.
For each conflict, you have exactly THREE choices:
- "extracted": if search supports the extracted value (maintains categorical structure)
- "reference": if search supports the reference/ground truth value
- "none": if neither value fits what the search reveals
Based on your search results, provide resolutions following this EXACT JSON structure:
```json
{json.dumps(schema_example, indent=2)}
```
**Critical Rules**:
- Only choose between "extracted", "reference", or "none" - do NOT invent new values
- Only provide confidence >0.8 if search results clearly support one specific choice
- Prefer categorical consistency (e.g., if extracted="Agency" and search shows "CDC", choose "extracted")
- Use "none" only when neither value fits the search evidence
**IMPORTANT**: Return your response as a valid JSON object wrapped in ```json``` markdown code blocks.
"""
try:
if verbose:
print("\n🌐 EXECUTING SEARCH GROUNDING")
print("-"*50)
print("Model: gemini-2.5-flash")