44
44
)
45
45
from opentelemetry .sdk .resources import Resource as SDKResource
46
46
from opentelemetry .sdk .util .instrumentation import InstrumentationScope
47
+ from opentelemetry .test .concurrency_test import ConcurrencyTestBase
47
48
from opentelemetry .trace import TraceFlags
48
49
from opentelemetry .trace .span import INVALID_SPAN_CONTEXT
49
50
@@ -559,7 +560,7 @@ def bulk_log_and_flush(num_logs):
559
560
hasattr (os , "fork" ),
560
561
"needs *nix" ,
561
562
)
562
- def test_batch_log_record_processor_fork (self ):
563
+ def test_batch_log_record_processor_fork_clears_logs_from_child (self ):
563
564
exporter = InMemoryLogExporter ()
564
565
log_record_processor = BatchLogRecordProcessor (
565
566
exporter ,
@@ -572,15 +573,13 @@ def test_batch_log_record_processor_fork(self):
572
573
for _ in range (10 ):
573
574
log_record_processor .emit (EMPTY_LOG )
574
575
576
+ # The below test also needs this, but it can only be set once.
575
577
multiprocessing .set_start_method ("fork" )
576
578
577
579
def child (conn ):
578
- for _ in range (100 ):
579
- log_record_processor .emit (EMPTY_LOG )
580
580
log_record_processor .force_flush ()
581
-
582
581
logs = exporter .get_finished_logs ()
583
- conn .send (len (logs ) == 100 )
582
+ conn .send (len (logs ) == 0 )
584
583
conn .close ()
585
584
586
585
parent_conn , child_conn = multiprocessing .Pipe ()
@@ -591,6 +590,34 @@ def child(conn):
591
590
log_record_processor .force_flush ()
592
591
self .assertTrue (len (exporter .get_finished_logs ()) == 10 )
593
592
593
+ @unittest .skipUnless (
594
+ hasattr (os , "fork" ),
595
+ "needs *nix" ,
596
+ )
597
+ def test_batch_log_record_processor_fork_doesnot_deadlock (self ):
598
+ exporter = InMemoryLogExporter ()
599
+ log_record_processor = BatchLogRecordProcessor (
600
+ exporter ,
601
+ max_export_batch_size = 64 ,
602
+ schedule_delay_millis = 30000 ,
603
+ )
604
+
605
+ def child (conn ):
606
+ def _target ():
607
+ log_record_processor .emit (EMPTY_LOG )
608
+
609
+ ConcurrencyTestBase .run_with_many_threads (_target , 100 )
610
+ log_record_processor .force_flush ()
611
+ logs = exporter .get_finished_logs ()
612
+ conn .send (len (logs ) == 100 )
613
+ conn .close ()
614
+
615
+ parent_conn , child_conn = multiprocessing .Pipe ()
616
+ process = multiprocessing .Process (target = child , args = (child_conn ,))
617
+ process .start ()
618
+ self .assertTrue (parent_conn .recv ())
619
+ process .join ()
620
+
594
621
595
622
class TestConsoleLogExporter (unittest .TestCase ):
596
623
def test_export (self ): # pylint: disable=no-self-use
0 commit comments