Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: lost data due to mysql tow-phase-commit #1054

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,12 @@ func (this *Migrator) Migrate() (err error) {
if err := this.addDMLEventsListener(); err != nil {
return err
}

// write a heartbeat data into the ghc table, and avoid lost data due to MySQL two-phase-commit.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shaohk could you add a bit more background in this comment on "why" writing the heartbeat causes us to avoid lost data due to MySQL two-phase-commit? That will be useful to those without context

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll try to describe the scene:

When using semi-sync and setting rpl_semi_sync_master_wait_point=AFTER_SYNC, if an INSERT statement is being committed but blocks due to an unmet ack count, the data inserted by the transaction is not visible to ReadMigrationRangeValues, so the copy of the existing data in the table does not include the new row inserted by the transaction. However, the binlog event for the transaction is already written to the binlog, so the addDMLEventsListener only captures the binlog event after the transaction, and thus the transaction's binlog event is not captured, resulting in data loss.

If you add a heartbeat, and the transaction commit blocks because the ack is not met, then the heartbeat will not be able to write, so the ReadMigrationRangeValues will not be run. When the heartbeat writes successfully, the ReadMigrationRangeValues will read the newly inserted data, thus Avoiding data loss due to the above problem

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Fanduzi that's my understanding too 👍

I'm hoping we can add some of this context into the code-comment on line 392, so that this is easier to read for those without context

Copy link
Collaborator

@timvaillancourt timvaillancourt Jun 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shaohk two more questions:

  1. Would it make sense to move this to be within the .ReadMigrationRangeValues() func? The goal with the .Migrate() func is to leave a lot of the details to other functions
  2. What do you think about updating a ChangeLogState instead of the heartbeat? .WriteChangelogState() in the applier is used for this. Perhaps a new ChangeLogState like ReadMigrationRangeValues would work here 🤔. Example PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shaohk two more questions:

  1. Would it make sense to move this to be within the .ReadMigrationRangeValues() func? The goal with the .Migrate() func is to leave a lot of the details to other functions
  2. What do you think about updating a ChangeLogState instead of the heartbeat? .WriteChangelogState() in the applier is used for this. Perhaps a new ChangeLogState like ReadMigrationRangeValues would work here 🤔. Example PR

Sorry, I just saw these messages.

  1. I complete the annotation information in the code.
  2. I think the above two suggestions are very good, I will revise the PR and mention it again.

Copy link
Contributor Author

@shaohk shaohk Jun 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shaohk two more questions:

  1. Would it make sense to move this to be within the .ReadMigrationRangeValues() func? The goal with the .Migrate() func is to leave a lot of the details to other functions
  2. What do you think about updating a ChangeLogState instead of the heartbeat? .WriteChangelogState() in the applier is used for this. Perhaps a new ChangeLogState like ReadMigrationRangeValues would work here 🤔. Example PR

@timvaillancourt
Use changelog state instead of the heartbeat PR

Copy link
Contributor Author

@shaohk shaohk Jun 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll try to describe the scene:

When using semi-sync and setting rpl_semi_sync_master_wait_point=AFTER_SYNC, if an INSERT statement is being committed but blocks due to an unmet ack count, the data inserted by the transaction is not visible to ReadMigrationRangeValues, so the copy of the existing data in the table does not include the new row inserted by the transaction. However, the binlog event for the transaction is already written to the binlog, so the addDMLEventsListener only captures the binlog event after the transaction, and thus the transaction's binlog event is not captured, resulting in data loss.

If you add a heartbeat, and the transaction commit blocks because the ack is not met, then the heartbeat will not be able to write, so the ReadMigrationRangeValues will not be run. When the heartbeat writes successfully, the ReadMigrationRangeValues will read the newly inserted data, thus Avoiding data loss due to the above problem

@Fanduzi
Thanks, I added your description to the code comments. #1141

if _, err := this.applier.WriteChangelog("heartbeat", time.Now().Format(time.RFC3339Nano)); err != nil {
return err
}

if err := this.applier.ReadMigrationRangeValues(); err != nil {
return err
}
Expand Down