Skip to content

Commit 8396c53

Browse files
authored
event: change row size calculation method (#337)
* change row size calculation Signed-off-by: dongmen <414110582@qq.com> * simplify the code Signed-off-by: dongmen <414110582@qq.com> * hack schema store Signed-off-by: dongmen <414110582@qq.com> * use memoryUsage Signed-off-by: dongmen <414110582@qq.com> * Revert "hack schema store" This reverts commit 7011002. * revert useless change Signed-off-by: dongmen <414110582@qq.com> --------- Signed-off-by: dongmen <414110582@qq.com>
1 parent 4c3751b commit 8396c53

File tree

12 files changed

+38
-30
lines changed

12 files changed

+38
-30
lines changed

downstreamadapter/sink/helper/eventrouter/partition/columns.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func newColumnsPartitionGenerator(columns []string) *ColumnsPartitionGenerator {
4242
}
4343
}
4444

45-
func (r *ColumnsPartitionGenerator) GeneratePartitionIndexAndKey(row *common.RowDelta, partitionNum int32, tableInfo *common.TableInfo, commitTs uint64) (int32, string, error) {
45+
func (r *ColumnsPartitionGenerator) GeneratePartitionIndexAndKey(row *common.RowChange, partitionNum int32, tableInfo *common.TableInfo, commitTs uint64) (int32, string, error) {
4646
r.lock.Lock()
4747
defer r.lock.Unlock()
4848
r.hasher.Reset()

downstreamadapter/sink/helper/eventrouter/partition/generator.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
type PartitionGenerator interface {
2525
// GeneratePartitionIndexAndKey returns an index of partitions or a partition key for event.
2626
// Concurrency Note: This method is thread-safe.
27-
GeneratePartitionIndexAndKey(row *common.RowDelta, partitionNum int32, tableInfo *common.TableInfo, commitTs uint64) (int32, string, error)
27+
GeneratePartitionIndexAndKey(row *common.RowChange, partitionNum int32, tableInfo *common.TableInfo, commitTs uint64) (int32, string, error)
2828
}
2929

3030
func GetPartitionGenerator(rule string, scheme string, indexName string, columns []string) PartitionGenerator {

downstreamadapter/sink/helper/eventrouter/partition/index_value.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func newIndexValuePartitionGenerator(indexName string) *IndexValuePartitionGener
3939
}
4040
}
4141

42-
func (r *IndexValuePartitionGenerator) GeneratePartitionIndexAndKey(row *common.RowDelta,
42+
func (r *IndexValuePartitionGenerator) GeneratePartitionIndexAndKey(row *common.RowChange,
4343
partitionNum int32,
4444
tableInfo *common.TableInfo,
4545
commitTs uint64,

downstreamadapter/sink/helper/eventrouter/partition/key.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func newKeyPartitionGenerator(partitionKey string) *KeyPartitionGenerator {
2525
}
2626
}
2727

28-
func (t *KeyPartitionGenerator) GeneratePartitionIndexAndKey(row *common.RowDelta,
28+
func (t *KeyPartitionGenerator) GeneratePartitionIndexAndKey(row *common.RowChange,
2929
partitionNum int32,
3030
tableInfo *common.TableInfo,
3131
commitTs uint64,

downstreamadapter/sink/helper/eventrouter/partition/table.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func newTablePartitionGenerator() *TablePartitionGenerator {
3535

3636
// GeneratePartitionIndexAndKey returns the target partition to which a row changed event should be dispatched.
3737
func (t *TablePartitionGenerator) GeneratePartitionIndexAndKey(
38-
row *common.RowDelta,
38+
row *common.RowChange,
3939
partitionNum int32,
4040
tableInfo *common.TableInfo,
4141
commitTs uint64,

downstreamadapter/sink/helper/eventrouter/partition/ts.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func newTsPartitionGenerator() *TsPartitionGenerator {
2828
}
2929

3030
func (t *TsPartitionGenerator) GeneratePartitionIndexAndKey(
31-
row *common.RowDelta,
31+
row *common.RowChange,
3232
partitionNum int32,
3333
tableInfo *common.TableInfo,
3434
commitTs uint64,

downstreamadapter/writer/mysql_writer.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ func (w *MysqlWriter) prepareDMLs(events []*common.DMLEvent) *preparedDMLs {
227227
}
228228
// For metrics and logging.
229229
rowCount += event.Len()
230-
approximateSize += event.Rows.MemoryUsage()
230+
approximateSize += event.GetSize()
231231
if len(startTs) == 0 || startTs[len(startTs)-1] != event.StartTs {
232232
startTs = append(startTs, event.StartTs)
233233
}

downstreamadapter/writer/sql_builder.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ type preparedDMLs struct {
3535
// sql: `REPLACE INTO `test`.`t` VALUES (?,?,?)`
3636
func buildInsert(
3737
tableInfo *common.TableInfo,
38-
row common.RowDelta,
38+
row common.RowChange,
3939
safeMode bool,
4040
) (string, []interface{}) {
4141
args, err := getArgs(&row.Row, tableInfo)
@@ -64,7 +64,7 @@ func buildInsert(
6464

6565
// buildDelete builds a parametric DELETE statement as follows
6666
// sql: `DELETE FROM `test`.`t` WHERE x = ? AND y >= ? LIMIT 1`
67-
func buildDelete(tableInfo *common.TableInfo, row common.RowDelta) (string, []interface{}) {
67+
func buildDelete(tableInfo *common.TableInfo, row common.RowChange) (string, []interface{}) {
6868
var builder strings.Builder
6969
quoteTable := tableInfo.TableName.QuoteString()
7070
builder.WriteString("DELETE FROM ")
@@ -94,7 +94,7 @@ func buildDelete(tableInfo *common.TableInfo, row common.RowDelta) (string, []in
9494
return sql, args
9595
}
9696

97-
func buildUpdate(tableInfo *common.TableInfo, row common.RowDelta) (string, []interface{}) {
97+
func buildUpdate(tableInfo *common.TableInfo, row common.RowChange) (string, []interface{}) {
9898
var builder strings.Builder
9999
if tableInfo.GetPreUpdateSQL() == "" {
100100
log.Panic("PreUpdateSQL should not be empty")

downstreamadapter/writer/sql_builder_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ var preInsertDataSQL = `insert into t values (
100100
'测试', "中国", "上海", "你好,世界", 0xC4E3BAC3CAC0BDE7
101101
);`
102102

103-
func getRowForTest(t testing.TB) (insert, delete, update common.RowDelta, tableInfo *common.TableInfo) {
103+
func getRowForTest(t testing.TB) (insert, delete, update common.RowChange, tableInfo *common.TableInfo) {
104104
helper := mounter.NewEventTestHelper(t)
105105
defer helper.Close()
106106

@@ -122,7 +122,7 @@ func getRowForTest(t testing.TB) (insert, delete, update common.RowDelta, tableI
122122
update.PreRow = insert.Row
123123
update.RowType = common.RowTypeUpdate
124124

125-
delete = common.RowDelta{
125+
delete = common.RowChange{
126126
PreRow: insert.Row,
127127
RowType: common.RowTypeDelete,
128128
}

logservice/schemastore/persist_storage.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -693,7 +693,9 @@ func completePersistedDDLEvent(
693693
// TODO: is the following SchemaName and TableName correct?
694694
event.SchemaName = getSchemaName(event.SchemaID)
695695
event.TableName = event.TableInfo.Name.O
696-
case model.ActionCreateView, model.ActionCreateTables:
696+
case model.ActionCreateView,
697+
// FIXME: support create tables
698+
model.ActionCreateTables:
697699

698700
// ignore
699701
default:

pkg/common/event.go

+22-16
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ type Event interface {
3131
GetDispatcherID() DispatcherID
3232
GetCommitTs() Ts
3333
GetStartTs() Ts
34-
GetChunkSize() int64
34+
// GetSize returns the approximate size of the event in bytes.
35+
// It's used for memory control and monitoring.
36+
GetSize() int64
3537
}
3638

3739
// FlushableEvent is an event that can be flushed to downstream by a dispatcher.
@@ -109,7 +111,8 @@ func (b *BatchResolvedEvent) Unmarshal(data []byte) error {
109111
return nil
110112
}
111113

112-
func (b *BatchResolvedEvent) GetChunkSize() int64 {
114+
// GetSize always returns 0; it exists only to satisfy the Event interface and is not expected to be called.
115+
func (b *BatchResolvedEvent) GetSize() int64 {
113116
return 0
114117
}
115118

@@ -155,7 +158,8 @@ func (e ResolvedEvent) String() string {
155158
return fmt.Sprintf("ResolvedEvent{DispatcherID: %s, ResolvedTs: %d}", e.DispatcherID, e.ResolvedTs)
156159
}
157160

158-
func (e ResolvedEvent) GetChunkSize() int64 {
161+
// GetSize always returns 0; it exists only to satisfy the Event interface and is not expected to be called.
162+
func (e ResolvedEvent) GetSize() int64 {
159163
return 0
160164
}
161165

@@ -170,9 +174,10 @@ type DMLEvent struct {
170174
Offset int `json:"offset"`
171175
len int
172176

173-
TableInfo *TableInfo `json:"table_info"`
174-
Rows *chunk.Chunk `json:"rows"`
175-
RowTypes []RowType `json:"row_types"`
177+
TableInfo *TableInfo `json:"table_info"`
178+
Rows *chunk.Chunk `json:"rows"`
179+
RowTypes []RowType `json:"row_types"`
180+
ApproximateSize int64 `json:"approximate_size"`
176181

177182
// The following fields are set and used by dispatcher.
178183
ReplicatingTs uint64 `json:"replicating_ts"`
@@ -221,6 +226,7 @@ func (t *DMLEvent) AppendRow(raw *RawKVEntry,
221226
t.RowTypes = append(t.RowTypes, RowType, RowType)
222227
}
223228
t.len += 1
229+
t.ApproximateSize += int64(len(raw.Key) + len(raw.Value) + len(raw.OldValue))
224230
return nil
225231
}
226232

@@ -250,28 +256,28 @@ func (t *DMLEvent) AddPostFlushFunc(f func()) {
250256
t.PostTxnFlushed = append(t.PostTxnFlushed, f)
251257
}
252258

253-
func (t *DMLEvent) GetNextRow() (RowDelta, bool) {
259+
func (t *DMLEvent) GetNextRow() (RowChange, bool) {
254260
if t.Offset >= len(t.RowTypes) {
255-
return RowDelta{}, false
261+
return RowChange{}, false
256262
}
257263
rowType := t.RowTypes[t.Offset]
258264
switch rowType {
259265
case RowTypeInsert:
260-
row := RowDelta{
266+
row := RowChange{
261267
Row: t.Rows.GetRow(t.Offset),
262268
RowType: rowType,
263269
}
264270
t.Offset++
265271
return row, true
266272
case RowTypeDelete:
267-
row := RowDelta{
273+
row := RowChange{
268274
PreRow: t.Rows.GetRow(t.Offset),
269275
RowType: rowType,
270276
}
271277
t.Offset++
272278
return row, true
273279
case RowTypeUpdate:
274-
row := RowDelta{
280+
row := RowChange{
275281
PreRow: t.Rows.GetRow(t.Offset),
276282
Row: t.Rows.GetRow(t.Offset + 1),
277283
RowType: rowType,
@@ -281,7 +287,7 @@ func (t *DMLEvent) GetNextRow() (RowDelta, bool) {
281287
default:
282288
log.Panic("TEvent.GetNextRow: invalid row type")
283289
}
284-
return RowDelta{}, false
290+
return RowChange{}, false
285291
}
286292

287293
// Len returns the number of row change events in the transaction.
@@ -303,11 +309,11 @@ func (t *DMLEvent) Unmarshal(data []byte) error {
303309
return nil
304310
}
305311

306-
func (t *DMLEvent) GetChunkSize() int64 {
307-
return t.Rows.MemoryUsage()
312+
func (t *DMLEvent) GetSize() int64 {
313+
return t.ApproximateSize
308314
}
309315

310-
type RowDelta struct {
316+
type RowChange struct {
311317
PreRow chunk.Row
312318
Row chunk.Row
313319
RowType RowType
@@ -454,7 +460,7 @@ func (t *DDLEvent) Unmarshal(data []byte) error {
454460
}
455461

456462
// TODO: fix it
457-
func (t *DDLEvent) GetChunkSize() int64 {
463+
func (t *DDLEvent) GetSize() int64 {
458464
return 0
459465
}
460466

pkg/common/row_change.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ type MQRowEvent struct {
264264
type RowEvent struct {
265265
TableInfo *TableInfo
266266
CommitTs uint64
267-
Event RowDelta
267+
Event RowChange
268268
ColumnSelector Selector
269269
Callback func()
270270
}

0 commit comments

Comments
 (0)