Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test_out_exec_filter: add a sleep to ensure the stream is emitted #4755

Merged
merged 1 commit into from
Jan 6, 2025

Conversation

Watson1978
Copy link
Contributor

Which issue(s) this PR fixes:
Fixes #4754

What this PR does / why we need it:
This PR will stable the tests.

Docs Changes:

Release Note:

@Watson1978 Watson1978 added the CI Test/CI issues label Jan 5, 2025
@daipom
Copy link
Contributor

daipom commented Jan 6, 2025

Thanks for this fix!
The use of the driver in this test is wrong because Fluent::Test::Driver::BaseOwner keeps @event_streams.
expect_emits should be as follows.

d.run(default_tag: 'test', expect_emits: 1, ...
d.run(default_tag: 'test', expect_emits: 2, ...
d.run(default_tag: 'test', expect_emits: 3, ...
d.run(default_tag: 'test', expect_emits: 4, ...

The following would avoid the need to manage start and shutdown.
Could you try the following?

diff --git a/test/plugin/test_out_exec_filter.rb b/test/plugin/test_out_exec_filter.rb
index 5a7dc710..c808eda0 100644
--- a/test/plugin/test_out_exec_filter.rb
+++ b/test/plugin/test_out_exec_filter.rb
@@ -500,10 +500,15 @@ class ExecFilterOutputTest < Test::Unit::TestCase
     d = create_driver(conf)
     time = event_time('2011-01-02 13:14:15')
 
-    d.run(default_tag: 'test', expect_emits: 1, timeout: 10, start: true,  shutdown: false){ d.feed(time, {"k1" => 0}) }
-    d.run(default_tag: 'test', expect_emits: 1, timeout: 10, start: false, shutdown: false){ d.feed(time, {"k1" => 1}) }
-    d.run(default_tag: 'test', expect_emits: 1, timeout: 10, start: false, shutdown: false){ d.feed(time, {"k1" => 2}) }
-    d.run(default_tag: 'test', expect_emits: 1, timeout: 10, start: false, shutdown: false){ d.feed(time, {"k1" => 3}) }
+    d.run(default_tag: 'test', expect_emits: 4) do
+      d.feed(time, {"k1" => 0})
+      d.flush
+      d.feed(time, {"k1" => 1})
+      d.flush
+      d.feed(time, {"k1" => 2})
+      d.flush
+      d.feed(time, {"k1" => 3})
+    end
 
     assert_equal "2011-01-02 13:14:15\ttest\t0\n", d.formatted[0]
     assert_equal "2011-01-02 13:14:15\ttest\t1\n", d.formatted[1]
@@ -524,9 +529,6 @@ class ExecFilterOutputTest < Test::Unit::TestCase
     assert_equal pid_list[1], events[1][2]['child_pid']
     assert_equal pid_list[0], events[2][2]['child_pid']
     assert_equal pid_list[1], events[3][2]['child_pid']
-
-  ensure
-    d.run(start: false, shutdown: true)
   end
 
   # child process exits per 3 lines

@Watson1978
Copy link
Contributor Author

@daipom I tried your suggession. However, unfortunatry it causes failure as following...

ExecFilterOutputTest:
  test: using child processes by round robin[with sections]:                                                                    F
====================================================================================================================================================
Failure: test: using child processes by round robin[with sections](ExecFilterOutputTest)
test/plugin/test_out_exec_filter.rb:530:in `block in <class:ExecFilterOutputTest>'
     527:
     528:     assert_equal pid_list[0], events[0][2]['child_pid']
     529:     assert_equal pid_list[1], events[1][2]['child_pid']
  => 530:     assert_equal pid_list[0], events[2][2]['child_pid']
     531:     assert_equal pid_list[1], events[3][2]['child_pid']
     532:   end
     533:
<"8132"> expected but was
<"11304">

diff:
? 8132
? 1  04
? ?  ?
====================================================================================================================================================
: (4.781393)

Finished in 4.7823206 seconds.
----------------------------------------------------------------------------------------------------------------------------------------------------
1 tests, 9 assertions, 1 failures, 0 errors, 0 pendings, 0 omissions, 0 notifications
0% passed
----------------------------------------------------------------------------------------------------------------------------------------------------
0.21 tests/s, 1.88 assertions/s
rake aborted!
Command failed with status (1): [bundle exec ruby -w -v -I"lib:test" -Eascii-8bit:ascii-8bit test/plugin/test_out_exec_filter.rb -v --name="test: using child processes by round robin[with sections]"]
C:/src/fluentd/runner.rake:4:in `block (2 levels) in <top (required)>'
C:/src/fluentd/runner.rake:2:in `times'
C:/src/fluentd/runner.rake:2:in `block in <top (required)>'
Tasks: TOP => default
(See full trace by running task with --trace)

@daipom
Copy link
Contributor

daipom commented Jan 6, 2025

@daipom I tried your suggession. However, unfortunatry it causes failure as following...

Hmm, I can't reproduce it.
Is it unstable?

Ubuntu focal, Ruby 3.2.2

  data(
    'with sections' => CONFIG_ROUND_ROBIN,
    'traditional' => CONFIG_ROUND_ROBIN_COMPAT,
  )
  test 'using child processes by round robin' do |conf|
    d = create_driver(conf)
    time = event_time('2011-01-02 13:14:15')

    d.run(default_tag: 'test', expect_emits: 4) do
      d.feed(time, {"k1" => 0})
      d.flush
      d.feed(time, {"k1" => 1})
      d.flush
      d.feed(time, {"k1" => 2})
      d.flush
      d.feed(time, {"k1" => 3})
    end

    assert_equal "2011-01-02 13:14:15\ttest\t0\n", d.formatted[0]
    assert_equal "2011-01-02 13:14:15\ttest\t1\n", d.formatted[1]
    assert_equal "2011-01-02 13:14:15\ttest\t2\n", d.formatted[2]
    assert_equal "2011-01-02 13:14:15\ttest\t3\n", d.formatted[3]

    events = d.events
    assert_equal 4, events.length

    pid_list = []
    events.each do |event|
      pid = event[2]['child_pid']
      pid_list << pid unless pid_list.include?(pid)
    end
    assert_equal 2, pid_list.size, "the number of pids should be same with number of child processes: #{pid_list.inspect}"

    assert_equal pid_list[0], events[0][2]['child_pid']
    assert_equal pid_list[1], events[1][2]['child_pid']
    assert_equal pid_list[0], events[2][2]['child_pid']
    assert_equal pid_list[1], events[3][2]['child_pid']
  end

@Watson1978
Copy link
Contributor Author

Watson1978 commented Jan 6, 2025

Is it unstable?

In my environment, it fails with 5 ~ 10 %.
It should be repeated at least 20 times to confirm the results.

I tryed on Windows on virtual machine when I got the error.

@daipom
Copy link
Contributor

daipom commented Jan 6, 2025

Thanks!
Indeed, it sometimes fails on Windows.
Looks like we need to add some sleep to wait Output#write completion.

    d.run(default_tag: 'test', expect_emits: 4) do
      d.feed(time, {"k1" => 0})
      d.flush
      sleep 0.5
      d.feed(time, {"k1" => 1})
      d.flush
      sleep 0.5
      d.feed(time, {"k1" => 2})
      d.flush
      sleep 0.5
      d.feed(time, {"k1" => 3})
    end

Signed-off-by: Shizuo Fujita <fujita@clear-code.com>
@Watson1978 Watson1978 force-pushed the test_out_exec_filter branch from d39f3c7 to a4e733a Compare January 6, 2025 08:44
@Watson1978
Copy link
Contributor Author

Thanks. I updated this PR with your suggession.

@daipom
Copy link
Contributor

daipom commented Jan 6, 2025

Thanks!! Waiting CI...

Note: The reason why we need sleep here is as follows.

Fluent::Test::Driver::Output#wait_flush_completion does not accurately wait flush completion.

def wait_flush_completion
buffer_queue = ->(){ @instance.buffer && @instance.buffer.queue.size > 0 }
dequeued_chunks = ->(){
@instance.dequeued_chunks_mutex &&
@instance.dequeued_chunks &&
@instance.dequeued_chunks_mutex.synchronize{ @instance.dequeued_chunks.size > 0 }
}
Timeout.timeout(10) do
while buffer_queue.call || dequeued_chunks.call
sleep 0.1
end
end
end

It waits for buffer.queue to become empty.
However, dequeue happens at the beginning of the flush process.

def try_flush
chunk = @buffer.dequeue_chunk

So, it does not wait flush completion.

This test depends on the order of the flushes. Therefore, this makes the test unstable.
In the future, we should improve Fluent::Test::Driver::Output#wait_flush_completion instead of using sleep.

@daipom daipom added the backport to LTS We will backport this fix to the LTS branch label Jan 6, 2025
Copy link
Contributor

@daipom daipom left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks!

@daipom daipom merged commit abe335a into fluent:master Jan 6, 2025
13 checks passed
@Watson1978 Watson1978 deleted the test_out_exec_filter branch January 6, 2025 09:53
@daipom daipom added this to the v1.19.0 milestone Jan 28, 2025
kenhys pushed a commit that referenced this pull request Jan 29, 2025
)

**Which issue(s) this PR fixes**:
Fixes #4754

**What this PR does / why we need it**:
This PR will stable the tests.

**Docs Changes**:

**Release Note**:

Signed-off-by: Shizuo Fujita <fujita@clear-code.com>
Signed-off-by: Kentaro Hayashi <hayashi@clear-code.com>
daipom pushed a commit that referenced this pull request Jan 29, 2025
…m is emitted (#4755) (#4801)

**Which issue(s) this PR fixes**:

Backport #4755

**What this PR does / why we need it**:

This PR will stable the tests.

**Docs Changes**:

**Release Note**:

Signed-off-by: Shizuo Fujita <fujita@clear-code.com>
Signed-off-by: Kentaro Hayashi <hayashi@clear-code.com>
Co-authored-by: Shizuo Fujita <fujita@clear-code.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
backport to LTS We will backport this fix to the LTS branch CI Test/CI issues
Projects
None yet
Development

Successfully merging this pull request may close these issues.

CI: Fails "test: using child processes by round robin[with sections](ExecFilterOutputTest)" test
2 participants