Large event streams from an emit will be split into multiple chunks
This is done by splitting and re-joining event streams.

Chunk locking and releasing were also improved as part of this change.
tagomoris committed Jul 19, 2016
1 parent 0012d46 commit b5f2e9f
220 changes: 165 additions & 55 deletions lib/fluent/plugin/buffer.rb
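The core idea of this change is the property noted in the comments on write_step_by_step later in the diff: (es1 + es2).to_msgpack_stream == es1.to_msgpack_stream + es2.to_msgpack_stream, so an oversized event stream can be sliced into splits, written chunk by chunk, and the serialized bytes still add up to the same stream. A standalone illustration of that property using the msgpack gem (plain hashes instead of fluentd's EventStream classes):

require 'msgpack'

events = (1..10).map { |i| { "seq" => i } }

# serializing the whole stream at once...
whole = events.map(&:to_msgpack).join

# ...yields the same bytes as serializing each split and concatenating
splits = events.each_slice(3).to_a
joined = splits.map { |split| split.map(&:to_msgpack).join }.join

raise "streams differ" unless whole == joined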
@@ -180,26 +180,38 @@ def write(metadata_and_data, format: nil, enqueue: false)

staged_bytesize = 0
operated_chunks = []
unstaged_chunks = {} # metadata => [chunk, chunk, ...]
enqueued_chunks = []

begin
metadata_and_data.each do |metadata, data|
write_once(metadata, data, format: format) do |chunk, adding_bytesize|
chunk.mon_enter # lock the chunk to prevent it from being committed/rolled back by other threads
operated_chunks << chunk
if chunk.unstaged?
unstaged_chunks[metadata] ||= []
unstaged_chunks[metadata] << chunk
end
# this value includes bytes of unstaged chunks, but it will be removed by enqueue_unstaged_chunk
staged_bytesize += adding_bytesize
end
end

return if operated_chunks.empty?

# Now, this thread holds locks for many chunks, so taking the buffer-global lock here could cause a deadlock.
# Any operation that needs the buffer-global lock (including enqueueing) must be done after these locks are released.

first_chunk = operated_chunks.shift
# The commits for the remaining chunks can also be expected to finish successfully if the first
# commit operation finishes without any exception.
# In most cases, #commit requires very little disk space, so the major failure causes are
# permission errors, disk failures and other permanent (fatal) errors.
begin
first_chunk.commit
enqueue_chunk(first_chunk.metadata) if enqueue || chunk_size_full?(first_chunk)
if enqueue || first_chunk.unstaged? || chunk_size_full?(first_chunk)
enqueued_chunks << first_chunk
end
first_chunk.mon_exit
rescue
operated_chunks.unshift(first_chunk)
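The comments above state the lock-ordering rule this method follows: per-chunk monitor locks are taken first, and the buffer-global lock may only be taken after every chunk lock has been released. A minimal sketch of that ordering with plain Ruby monitors (buffer_lock and chunks are illustrative stand-ins, not fluentd's objects):

require 'monitor'

buffer_lock = Monitor.new
chunks = Array.new(3) { Object.new.extend(MonitorMixin) }

# phase 1: hold only fine-grained per-chunk locks while committing
chunks.each(&:mon_enter)
begin
  # ... chunk.commit for each chunk ...
ensure
  chunks.each(&:mon_exit) # release all chunk locks before the global lock
end

# phase 2: take the buffer-global lock only when no chunk lock is held, so a
# thread can never wait for the global lock while blocking others on a chunk
buffer_lock.synchronize do
  # ... enqueueing operations ...
end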
@@ -211,17 +223,43 @@ def write(metadata_and_data, format: nil, enqueue: false)
operated_chunks.each do |chunk|
begin
chunk.commit
enqueue_chunk(chunk.metadata) if enqueue || chunk_size_full?(chunk)
if enqueue || chunk.unstaged? || chunk_size_full?(chunk)
enqueued_chunks << chunk
end
chunk.mon_exit
rescue => e
chunk.rollback
chunk.mon_exit
errors << e
end
end
operated_chunks.clear if errors.empty?

@stage_size += staged_bytesize
# All locks about chunks are released.

synchronize do
# At this point, staged chunks may already have been enqueued by other threads.
@stage_size += staged_bytesize

enqueued_chunks.each do |c|
if c.staged? && (enqueue || chunk_size_full?(c))
m = c.metadata
enqueue_chunk(m)
if unstaged_chunks[m]
u = unstaged_chunks[m].pop
if u.unstaged? && !chunk_size_full?(u)
@stage[m] = u.staged!
end
end
elsif c.unstaged?
enqueue_unstaged_chunk(c)
else
# the previously staged chunk has already been enqueued, closed or purged:
# nothing to do.
end
end
end

operated_chunks.clear if errors.empty?

if errors.size > 0
log.warn "error occurs in committing chunks: only first one raised", errors: errors.map(&:class)
@@ -230,6 +268,9 @@ def write(metadata_and_data, format: nil, enqueue: false)
ensure
operated_chunks.each do |chunk|
chunk.rollback rescue nil # nothing can be done about a #rollback failure
if chunk.unstaged?
chunk.purge rescue nil # to prevent leakage of unstaged chunks
end
chunk.mon_exit rescue nil # this may raise ThreadError for chunks already committed
end
end
@@ -271,6 +312,18 @@ def enqueue_chunk(metadata)
nil
end

def enqueue_unstaged_chunk(chunk)
synchronize do
chunk.synchronize do
metadata = chunk.metadata
@queue << chunk
@queued_num[metadata] = @queued_num.fetch(metadata, 0) + 1
chunk.enqueued! if chunk.respond_to?(:enqueued!)
end
@queue_size += chunk.bytesize
end
end

def enqueue_all
synchronize do
if block_given?
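enqueue_chunk(metadata) looks the chunk up in @stage, which cannot work for chunks that were never staged; the new enqueue_unstaged_chunk(chunk) therefore takes the chunk object itself and applies the same queue bookkeeping. A simplified model of that bookkeeping (plain arrays and hashes, not fluentd's API):

queue      = []
queued_num = Hash.new(0) # metadata => number of queued chunks
queue_size = 0

enqueue_unstaged = lambda do |chunk|
  queue << chunk
  queued_num[chunk[:metadata]] += 1
  queue_size += chunk[:bytesize]
end

enqueue_unstaged.call({ metadata: "tag:web", bytesize: 4096 })
queued_num["tag:web"] # => 1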
@@ -357,15 +410,20 @@ def chunk_size_full?(chunk)

class ShouldRetry < StandardError; end

# write once into a chunk
# 1. append the whole data into the existing chunk
# 2. commit it & return unless chunk_size_over?
# 3. enqueue the existing chunk & retry the whole method if the chunk was not empty
# 4. fall back to step-by-step writing

def write_once(metadata, data, format: nil, &block)
return if data.empty?

stored = false
adding_bytesize = nil

chunk = synchronize { @stage[metadata] ||= generate_chunk(metadata) }
enqueue_list = []

chunk = synchronize { @stage[metadata] ||= generate_chunk(metadata).staged! }
enqueue_chunk_before_retry = false
chunk.synchronize do
# retry this method if chunk is already queued (between getting chunk and entering critical section)
raise ShouldRetry unless chunk.staged?
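For a formatted event stream, the four steps in the comment above reduce to one decision per call: if the whole stream fits, commit it; if it does not fit but the chunk already holds data, enqueue that chunk and retry on a fresh one; if it cannot fit even into an empty chunk, fall back to step-by-step writing. A toy model of that decision (sizes in bytes, limit chosen arbitrarily):

CHUNK_LIMIT = 10 # illustrative limit, far below any real default

def whole_append_outcome(chunk_bytes, stream_bytes)
  if chunk_bytes + stream_bytes <= CHUNK_LIMIT
    :stored             # step 2: fits, commit and return
  elsif chunk_bytes > 0
    :enqueue_and_retry  # step 3: flush the existing chunk, retry on a fresh one
  else
    :step_by_step       # step 4: too large even for an empty chunk
  end
end

whole_append_outcome(6, 3)  # => :stored
whole_append_outcome(6, 8)  # => :enqueue_and_retry
whole_append_outcome(0, 42) # => :step_by_step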
@@ -376,18 +434,25 @@ def write_once(metadata, data, format: nil, &block)
begin
if format
serialized = format.call(data)
chunk.concat(content, data.size)
chunk.concat(serialized, data.size)
else
chunk.append(data)
end
adding_bytesize = chunk.bytesize - original_bytesize

if chunk_size_over?(chunk)
if empty_chunk && format ## TODO: re-split data into chunks
log.warn "chunk bytes limit exceeds for a bulk event stream: #{bulk.bytesize}bytes"
stored = true
else
chunk.rollback
if format && empty_chunk
log.warn "chunk bytes limit exceeds for an emitted event stream: #{adding_bytesize}bytes"
end
chunk.rollback

if format && !empty_chunk
# Event streams should be appended into a chunk in a single operation as far as
# possible, to keep formatting efficient.
# The event stream may be a MessagePackEventStream, which we don't want to split into
# two or more chunks (except when the event stream itself is larger than the chunk limit).
enqueue_chunk_before_retry = true
raise ShouldRetry
end
else
stored = true
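The slice-size heuristic in write_step_by_step below aims for roughly splits_count even slices; when the stream size is not divisible it divides by splits_count - 1 instead, which slightly overshoots the requested count. A standalone reproduction of the arithmetic (with the oversize branch folded into the initial clamp):

def slice_sizes(data_size, splits_count)
  splits_count = data_size if splits_count > data_size
  slice_size = if data_size % splits_count == 0
                 data_size / splits_count
               else
                 data_size / (splits_count - 1)
               end
  sizes = []
  origin = 0
  while origin < data_size
    sizes << [slice_size, data_size - origin].min
    origin += slice_size
  end
  sizes
end

slice_sizes(100, 10) # => ten slices of 10 (evenly divisible)
slice_sizes(25, 10)  # => 25 / 9 == 2: twelve slices of 2 plus one of 1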
Expand All @@ -399,74 +464,119 @@ def write_once(metadata, data, format: nil, &block)

if stored
block.call(chunk, adding_bytesize)
elsif bulk
# this metadata might be enqueued already by other threads
# but #enqueue_chunk does nothing in such case
enqueue_list << metadata
raise ShouldRetry
end
end

unless stored
# try step-by-step appending if data can't be stored into an existing chunk in non-bulk mode
write_step_by_step(metadata, data, data.size / 3, &block)
write_step_by_step(metadata, data, format, 10, &block)
end
rescue ShouldRetry
enqueue_list.each do |m|
enqueue_chunk(m)
end
enqueue_chunk(metadata) if enqueue_chunk_before_retry
retry
end

def write_step_by_step(metadata, data, attempt_records, &block)
while data.size > 0
if attempt_records < MINIMUM_APPEND_ATTEMPT_RECORDS
attempt_records = MINIMUM_APPEND_ATTEMPT_RECORDS
end
# An EventStream can be split into many streams
# because (es1 + es2).to_msgpack_stream == es1.to_msgpack_stream + es2.to_msgpack_stream

# 1. split the event stream into many (10 -> 100 -> 1000 -> ...) splits
# 2. append the splits into the staged chunk as much as possible
# 3. create an unstaged chunk and append the remaining splits -> repeat for all splits

def write_step_by_step(metadata, data, format, splits_count, &block)
splits = []
if splits_count > data.size
splits_count = data.size
end
slice_size = if splits_count > data.size
data.size
elsif data.size % splits_count == 0
data.size / splits_count
else
data.size / (splits_count - 1)
end
slice_origin = 0
while slice_origin < data.size
splits << data.slice(slice_origin, slice_size)
slice_origin += slice_size
end

# This method appends events into the staged chunk first.
# Then it generates unstaged (not queued) chunks to append the rest of the data.
staged_chunk_used = false
modified_chunks = []
get_next_chunk = ->(){
c = if staged_chunk_used
# Staging a new chunk here is a bad idea:
# recovering the whole state including newly staged chunks is much harder than in the current implementation.
generate_chunk(metadata)
else
synchronize{ @stage[metadata] ||= generate_chunk(metadata).staged! }
end
modified_chunks << c
c
}

writing_splits_index = 0
enqueue_chunk_before_retry = false

while writing_splits_index < splits.size
chunk = get_next_chunk.call
chunk.synchronize do
raise ShouldRetry unless chunk.writable?
staged_chunk_used = true if chunk.staged?

chunk = synchronize{ @stage[metadata] ||= generate_chunk(metadata) }
chunk.synchronize do # critical section for chunk (chunk append/commit/rollback)
raise ShouldRetry unless chunk.staged?
original_bytesize = chunk.bytesize
begin
empty_chunk = chunk.empty?
original_bytesize = chunk.bytesize
while writing_splits_index < splits.size
split = splits[writing_splits_index]
if format
chunk.concat(format.call(split), split.size)
else
chunk.append(split)
end

attempt = data.slice(0, attempt_records)
chunk.append(attempt)
adding_bytesize = (chunk.bytesize - original_bytesize)
if chunk_size_over?(chunk) # split size is larger than difference between size_full? and size_over?
chunk.rollback

if chunk_size_over?(chunk)
chunk.rollback
if split.size == 1 && original_bytesize == 0
big_record_size = format ? format.call(split).bytesize : split.first.bytesize
raise BufferChunkOverflowError, "a #{big_record_size}bytes record is larger than buffer chunk limit size"
end

if attempt_records <= MINIMUM_APPEND_ATTEMPT_RECORDS
if empty_chunk # record is too large even for empty chunk
raise BufferChunkOverflowError, "minimum append batch '#{attempt.size}' records exceeds chunk bytes limit"
if chunk_size_full?(chunk) || split.size == 1
enqueue_chunk_before_retry = true
else
splits_count *= 10
end
# no more records for this chunk -> enqueue -> to be flushed
enqueue_chunk(metadata) # `chunk` will be removed from stage
attempt_records = data.size # fresh chunk may have enough space
else
# the whole data can be processed in two operations
# (by using attempt /= 2, odd record counts would require 3 operations)
attempt_records = (attempt_records / 2) + 1

raise ShouldRetry
end

next
end
writing_splits_index += 1

block.call(chunk, adding_bytesize)
data.slice!(0, attempt_records)
# same attempt size
nil # discard return value of data.slice!() immediately
if chunk_size_full?(chunk)
break
end
end
rescue
chunk.rollback
chunk.purge if chunk.unstaged? # unstaged chunks will leak unless purged
raise
end

block.call(chunk, chunk.bytesize - original_bytesize)
end
end
rescue ShouldRetry
modified_chunks.each do |mc|
mc.rollback rescue nil
if mc.unstaged?
mc.purge rescue nil
end
end
enqueue_chunk(metadata) if enqueue_chunk_before_retry
retry
end # write_step_by_step
end
end
end
end
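When a multi-record split still overflows its chunk, write_step_by_step multiplies splits_count by 10 and retries from the top, re-slicing the stream ever more finely until every slice fits or a single oversized record raises BufferChunkOverflowError. A self-contained sketch of that escalation loop (toy byte limit, short strings standing in for events):

class ShouldRetry < StandardError; end

CHUNK_LIMIT = 8 # toy limit in bytes

data = Array.new(100) { |i| "e#{i}" } # stand-ins for events
splits_count = 10

begin
  slice_size = (data.size.to_f / splits_count).ceil
  data.each_slice(slice_size) do |split|
    # a multi-record split exceeding the limit forces a finer re-split
    raise ShouldRetry if split.join.bytesize > CHUNK_LIMIT && split.size > 1
  end
rescue ShouldRetry
  splits_count *= 10 # 10 -> 100 -> 1000 -> ..., as in the diff above
  retry
end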
