diff --git a/lib/fluent/clock.rb b/lib/fluent/clock.rb index 25125cd59c..1a565c71f9 100644 --- a/lib/fluent/clock.rb +++ b/lib/fluent/clock.rb @@ -43,6 +43,10 @@ def self.now_raw Process.clock_gettime(CLOCK_ID) end + def self.real_now(unit = :second) + Process.clock_gettime(Process::CLOCK_REALTIME, unit) + end + def self.dst_clock_from_time(time) diff_sec = Time.now - time now_raw - diff_sec diff --git a/lib/fluent/plugin/buffer/chunk.rb b/lib/fluent/plugin/buffer/chunk.rb index dc6b9f40bd..95e9a3a803 100644 --- a/lib/fluent/plugin/buffer/chunk.rb +++ b/lib/fluent/plugin/buffer/chunk.rb @@ -57,13 +57,31 @@ def initialize(metadata, compress: :text) @state = :unstaged @size = 0 - @created_at = Time.now - @modified_at = Time.now + @created_at = Fluent::Clock.real_now + @modified_at = Fluent::Clock.real_now extend Decompressable if compress == :gzip end - attr_reader :unique_id, :metadata, :created_at, :modified_at, :state + attr_reader :unique_id, :metadata, :state + + def raw_create_at + @created_at + end + + def raw_modified_at + @modified_at + end + + # for compatibility + def created_at + @created_at_object ||= Time.at(@created_at) + end + + # for compatibility + def modified_at + @modified_at_object ||= Time.at(@modified_at) + end # data is array of formatted record string def append(data, **kwargs) diff --git a/lib/fluent/plugin/buffer/file_chunk.rb b/lib/fluent/plugin/buffer/file_chunk.rb index 9f2708ec3b..00d7bb87b7 100644 --- a/lib/fluent/plugin/buffer/file_chunk.rb +++ b/lib/fluent/plugin/buffer/file_chunk.rb @@ -74,7 +74,8 @@ def commit @size += @adding_size @bytesize += @adding_bytes @adding_bytes = @adding_size = 0 - @modified_at = Time.now + @modified_at = Fluent::Clock.real_now + @modified_at_object = nil true end @@ -216,12 +217,12 @@ def self.unique_id_from_path(path) def restore_metadata(bindata) data = msgpack_unpacker(symbolize_keys: true).feed(bindata).read rescue {} - now = Time.now + now = Fluent::Clock.real_now @unique_id = data[:id] || self.class.unique_id_from_path(@path) || @unique_id @size = data[:s] || 0 - @created_at = Time.at(data.fetch(:c, now.to_i)) - @modified_at = Time.at(data.fetch(:m, now.to_i)) + @created_at = data.fetch(:c, now.to_i) + @modified_at = data.fetch(:m, now.to_i) @metadata.timekey = data[:timekey] @metadata.tag = data[:tag] @@ -231,8 +232,8 @@ def restore_metadata(bindata) def restore_metadata_partially(chunk) @unique_id = self.class.unique_id_from_path(chunk.path) || @unique_id @size = 0 - @created_at = chunk.ctime # birthtime isn't supported on Windows (and Travis?) - @modified_at = chunk.mtime + @created_at = chunk.ctime.to_i # birthtime isn't supported on Windows (and Travis?) + @modified_at = chunk.mtime.to_i @metadata.timekey = nil @metadata.tag = nil @@ -243,8 +244,8 @@ def write_metadata(update: true) data = @metadata.to_h.merge({ id: @unique_id, s: (update ? @size + @adding_size : @size), - c: @created_at.to_i, - m: (update ? Time.now : @modified_at).to_i, + c: @created_at, + m: (update ? Fluent::Clock.real_now : @modified_at), }) bin = msgpack_packer.pack(data).to_s @meta.seek(0, IO::SEEK_SET) diff --git a/lib/fluent/plugin/buffer/memory_chunk.rb b/lib/fluent/plugin/buffer/memory_chunk.rb index ddb16e4a65..556c8c8a3d 100644 --- a/lib/fluent/plugin/buffer/memory_chunk.rb +++ b/lib/fluent/plugin/buffer/memory_chunk.rb @@ -43,7 +43,8 @@ def commit @chunk_bytes += @adding_bytes @adding_bytes = @adding_size = 0 - @modified_at = Time.now + @modified_at = Fluent::Clock.real_now + @modified_at_object = nil true end diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index e427c78274..823e91081f 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -1376,7 +1376,7 @@ def enqueue_thread_run # This block should be done by integer values. # If both of flush_interval & flush_thread_interval are 1s, expected actual flush timing is 1.5s. # If we use integered values for this comparison, expected actual flush timing is 1.0s. - @buffer.enqueue_all{ |metadata, chunk| chunk.created_at.to_i + flush_interval <= now_int } + @buffer.enqueue_all{ |metadata, chunk| chunk.raw_create_at + flush_interval <= now_int } end if @chunk_key_time diff --git a/lib/fluent/time.rb b/lib/fluent/time.rb index e5722c2fa2..52eed0b1f5 100644 --- a/lib/fluent/time.rb +++ b/lib/fluent/time.rb @@ -111,7 +111,9 @@ def self.eq?(a, b) end def self.now - from_time(Time.now) + # This method is called many time. so call Process.clock_gettime directly instead of Fluent::Clock.real_now + now = Process.clock_gettime(Process::CLOCK_REALTIME, :nanosecond) + Fluent::EventTime.new(now / 1_000_000_000, now % 1_000_000_000) end def self.parse(*args) diff --git a/test/helper.rb b/test/helper.rb index a373ede5db..323821be83 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -50,6 +50,7 @@ def to_masked_element require 'fluent/time' require 'serverengine' require 'helpers/fuzzy_assert' +require 'helpers/process_extenstion' module Fluent module Plugin diff --git a/test/helpers/process_extenstion.rb b/test/helpers/process_extenstion.rb new file mode 100644 index 0000000000..b420276258 --- /dev/null +++ b/test/helpers/process_extenstion.rb @@ -0,0 +1,33 @@ +require 'timecop' + +module Process + class << self + alias_method :clock_gettime_original, :clock_gettime + + def clock_gettime(clock_id, unit = :float_second) + # now only support CLOCK_REALTIME + if Process::CLOCK_REALTIME == clock_id + t = Time.now + + case unit + when :float_second + t.to_i + t.nsec / 1_000_000_000.0 + when :float_millisecond + t.to_i * 1_000 + t.nsec / 1_000_000.0 + when :float_microsecond + t.to_i * 1_000_000 + t.nsec / 1_000.0 + when :second + t.to_i + when :millisecond + t.to_i * 1000 + t.nsec / 1_000_000 + when :microsecond + t.to_i * 1_000_000 + t.nsec / 1_000 + when :nanosecond + t.to_i * 1_000_000_000 + t.nsec + end + else + Process.clock_gettime_original(clock_id, unit) + end + end + end +end