1
1
import copy
2
+ import time
2
3
from datetime import datetime
3
4
from datetime import timedelta
4
5
from datetime import timezone
@@ -465,11 +466,13 @@ def _fetch_document_batches(
465
466
# number of documents/errors yielded
466
467
yield_count = 0
467
468
469
+ end = (end or time .time ()) + 3600 * 24
468
470
checkpoint = copy .deepcopy (checkpoint )
469
471
prev_doc_ids = checkpoint .last_seen_doc_ids
470
472
checkpoint .last_seen_doc_ids = []
471
473
# use "start" when last_updated is 0
472
- page_query = self ._construct_page_query (checkpoint .last_updated or start , end )
474
+ start_ts = checkpoint .last_updated or start
475
+ page_query = self ._construct_page_query (start_ts , end )
473
476
logger .debug (f"page_query: { page_query } " )
474
477
475
478
# most requests will include a few pages to skip, so we limit each page to
@@ -480,13 +483,23 @@ def _fetch_document_batches(
480
483
limit = 2 * self .batch_size ,
481
484
):
482
485
# create checkpoint after enough documents have been processed
483
- if yield_count >= self .batch_size :
484
- return checkpoint
486
+ # if yield_count >= self.batch_size:
487
+ # return checkpoint
485
488
486
489
if page ["id" ] in prev_doc_ids :
487
490
# There are a few seconds of fuzziness in the request,
488
491
# so we skip if we saw this page on the last run
489
492
continue
493
+
494
+ ts = datetime_from_string (page ["version" ]["when" ]).timestamp ()
495
+
496
+ if ts < checkpoint .last_updated :
497
+ logger .warning (
498
+ f"Confluence Returned results out of order. Request start time: { start_ts } , "
499
+ f"current item time: { ts } , checkpoint.last_updated: { checkpoint .last_updated } "
500
+ )
501
+ continue
502
+
490
503
# Build doc from page
491
504
doc_or_failure = self ._convert_page_to_document (page )
492
505
yield_count += 1
@@ -495,9 +508,7 @@ def _fetch_document_batches(
495
508
yield doc_or_failure
496
509
continue
497
510
498
- checkpoint .last_updated = datetime_from_string (
499
- page ["version" ]["when" ]
500
- ).timestamp ()
511
+ checkpoint .last_updated = ts
501
512
502
513
# Now get attachments for that page:
503
514
doc_or_failure = self ._fetch_page_attachments (page , doc_or_failure )
0 commit comments