Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
wyxxxcat committed Jan 14, 2025
1 parent 4624dea commit 126ad13
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.binlog;

public class BinlogLagInfo {
private long lag;
private long firstCommitSeq;
private long lastCommitSeq;
private long firstCommitTs;
private long lastCommitTs;

public BinlogLagInfo(long lag, long firstCommitSeq, long lastCommitSeq, long firstCommitTs, long lastCommitTs) {
this.lag = lag;
this.firstCommitSeq = firstCommitSeq;
this.lastCommitSeq = lastCommitSeq;
this.firstCommitTs = firstCommitTs;
this.lastCommitTs = lastCommitTs;
}

public BinlogLagInfo(long lag, long lastCommitSeq, long lastCommitTs) {
this.lag = lag;
this.lastCommitSeq = lastCommitSeq;
this.lastCommitTs = lastCommitTs;
}

public BinlogLagInfo() {
lag = 0;
firstCommitSeq = 0;
lastCommitSeq = 0;
firstCommitTs = 0;
lastCommitTs = 0;
}

public long getLag() {
return lag;
}

public long getFirstCommitSeq() {
return firstCommitSeq;
}

public long getLastCommitSeq() {
return lastCommitSeq;
}

public long getFirstCommitTs() {
return firstCommitTs;
}

public long getLastCommitTs() {
return lastCommitTs;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long prevCommit
}

// get binlog by dbId, return first binlog.version > version
public Pair<TStatus, Long> getBinlogLag(long dbId, long tableId, long prevCommitSeq) {
public Pair<TStatus, BinlogLagInfo> getBinlogLag(long dbId, long tableId, long prevCommitSeq) {
TStatus status = new TStatus(TStatusCode.OK);
lock.readLock().lock();
try {
Expand All @@ -502,7 +502,6 @@ public Pair<TStatus, Long> getBinlogLag(long dbId, long tableId, long prevCommit
LOG.warn("dbBinlog not found. dbId: {}", dbId);
return Pair.of(status, null);
}

return dbBinlog.getBinlogLag(tableId, prevCommitSeq);
} finally {
lock.readLock().unlock();
Expand Down
22 changes: 15 additions & 7 deletions fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,25 +50,33 @@ public static Pair<TStatus, TBinlog> getBinlog(TreeSet<TBinlog> binlogs, long pr
}
}

public static Pair<TStatus, Long> getBinlogLag(TreeSet<TBinlog> binlogs, long prevCommitSeq) {
public static Pair<TStatus, BinlogLagInfo> getBinlogLag(TreeSet<TBinlog> binlogs, long prevCommitSeq) {
TStatus status = new TStatus(TStatusCode.OK);
TBinlog firstBinlog = binlogs.first();

if (firstBinlog.getCommitSeq() > prevCommitSeq) {
return Pair.of(status, Long.valueOf(binlogs.size()));
BinlogLagInfo lagInfo = new BinlogLagInfo(binlogs.size(), firstBinlog.getCommitSeq(),
firstBinlog.getTimestamp());
return Pair.of(status, lagInfo);
}

// find first binlog whose commitSeq > commitSeq
TBinlog guard = new TBinlog();
guard.setCommitSeq(prevCommitSeq);
TBinlog binlog = binlogs.higher(guard);
TBinlog firstTBinlog = binlogs.higher(guard);
TBinlog lastBinlog = binlogs.last();

// all prevCommitSeq <= commitSeq
if (binlog == null) {
return Pair.of(status, 0L);
} else {
return Pair.of(status, Long.valueOf(binlogs.tailSet(binlog).size()));
long lag = 0;
long lastCommitSeq = 0;
long lastCommitTs = 0;
if (lastBinlog != null) {
lag = binlogs.tailSet(firstTBinlog).size();
lastCommitSeq = lastBinlog.getCommitSeq();
lastCommitTs = lastBinlog.getTimestamp();
}
return Pair.of(status, new BinlogLagInfo(lag, firstTBinlog.getCommitSeq(), lastCommitSeq,
firstBinlog.getTimestamp(), lastCommitTs));
}

public static TBinlog newDummyBinlog(long dbId, long tableId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ public List<Long> getDroppedIndexes() {
}
}

public Pair<TStatus, Long> getBinlogLag(long tableId, long prevCommitSeq) {
public Pair<TStatus, BinlogLagInfo> getBinlogLag(long tableId, long prevCommitSeq) {
TStatus status = new TStatus(TStatusCode.OK);
lock.readLock().lock();
try {
Expand Down Expand Up @@ -723,4 +723,5 @@ private void recordDroppedOrRecoveredResources(TBinlogType binlogType, long comm
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public Pair<TStatus, TBinlog> getBinlog(long prevCommitSeq) {
}
}

public Pair<TStatus, Long> getBinlogLag(long prevCommitSeq) {
public Pair<TStatus, BinlogLagInfo> getBinlogLag(long prevCommitSeq) {
lock.readLock().lock();
try {
return BinlogUtils.getBinlogLag(binlogs, prevCommitSeq);
Expand Down Expand Up @@ -353,4 +353,5 @@ public void getBinlogInfo(Database db, BaseProcResult result) {
lock.readLock().unlock();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.doris.analysis.TableRef;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.backup.Snapshot;
import org.apache.doris.binlog.BinlogLagInfo;
import org.apache.doris.catalog.AutoIncrementGenerator;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
Expand Down Expand Up @@ -3291,16 +3292,19 @@ private TGetBinlogLagResult getBinlogLagImpl(TGetBinlogRequest request, String c
result.setStatus(new TStatus(TStatusCode.OK));
long prevCommitSeq = request.getPrevCommitSeq();

Pair<TStatus, Long> statusLagPair = env.getBinlogManager().getBinlogLag(dbId, tableId, prevCommitSeq);
TStatus status = statusLagPair.first;
Pair<TStatus, BinlogLagInfo> binlogLagInfo = env.getBinlogManager().getBinlogLag(dbId, tableId, prevCommitSeq);
TStatus status = binlogLagInfo.first;
if (status != null && status.getStatusCode() != TStatusCode.OK) {
result.setStatus(status);
}
Long binlogLag = statusLagPair.second;
if (binlogLag != null) {
result.setLag(binlogLag);
BinlogLagInfo lagInfo = binlogLagInfo.second;
if (lagInfo != null) {
result.setLag(lagInfo.getLag());
result.setFirstCommitSeq(lagInfo.getFirstCommitSeq());
result.setLastCommitSeq(lagInfo.getLastCommitSeq());
result.setFirstBinlogTimestamp(lagInfo.getFirstCommitTs());
result.setLastBinlogTimestamp(lagInfo.getLastCommitTs());
}

return result;
}

Expand Down
4 changes: 4 additions & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1252,6 +1252,10 @@ struct TGetBinlogLagResult {
1: optional Status.TStatus status
2: optional i64 lag
3: optional Types.TNetworkAddress master_address
4: optional i64 first_commit_seq
5: optional i64 last_commit_seq
6: optional i64 first_binlog_timestamp
7: optional i64 last_binlog_timestamp
}

struct TUpdateFollowerStatsCacheRequest {
Expand Down

0 comments on commit 126ad13

Please sign in to comment.