Skip to content

Commit

Permalink
Optimize DefaultTpsLimiter (#3654)
Browse files Browse the repository at this point in the history
  • Loading branch information
LiZhenNet authored and ralf0131 committed Mar 19, 2019
1 parent 32d59f7 commit 660624c
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,24 @@
*/
public class DefaultTPSLimiter implements TPSLimiter {

private final ConcurrentMap<String, StatItem> stats
= new ConcurrentHashMap<String, StatItem>();
private final ConcurrentMap<String, StatItem> stats = new ConcurrentHashMap<String, StatItem>();

@Override
public boolean isAllowable(URL url, Invocation invocation) {
int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1);
long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY,
Constants.DEFAULT_TPS_LIMIT_INTERVAL);
long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY, Constants.DEFAULT_TPS_LIMIT_INTERVAL);
String serviceKey = url.getServiceKey();
if (rate > 0) {
StatItem statItem = stats.get(serviceKey);
if (statItem == null) {
stats.putIfAbsent(serviceKey,
new StatItem(serviceKey, rate, interval));
stats.putIfAbsent(serviceKey, new StatItem(serviceKey, rate, interval));
statItem = stats.get(serviceKey);
} else {
//rate or interval has changed, rebuild
if (statItem.getRate() != rate || statItem.getInterval() != interval) {
stats.put(serviceKey, new StatItem(serviceKey, rate, interval));
statItem = stats.get(serviceKey);
}
}
return statItem.isAllowable();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.dubbo.rpc.filter.tps;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;

/**
* Judge whether a particular invocation of service provider method should be allowed within a configured time interval.
Expand All @@ -30,7 +30,7 @@ class StatItem {

private long interval;

private AtomicInteger token;
private LongAdder token;

private int rate;

Expand All @@ -39,32 +39,39 @@ class StatItem {
this.rate = rate;
this.interval = interval;
this.lastResetTime = System.currentTimeMillis();
this.token = new AtomicInteger(rate);
this.token = buildLongAdder(rate);
}

public boolean isAllowable() {
long now = System.currentTimeMillis();
if (now > lastResetTime + interval) {
token.set(rate);
token = buildLongAdder(rate);
lastResetTime = now;
}

int value = token.get();
boolean flag = false;
while (value > 0 && !flag) {
flag = token.compareAndSet(value, value - 1);
value = token.get();
if (token.sum() < 0) {
return false;
}
token.decrement();
return true;
}

public long getInterval() {
return interval;
}


return flag;
public int getRate() {
return rate;
}


long getLastResetTime() {
return lastResetTime;
}

int getToken() {
return token.get();
long getToken() {
return token.sum();
}

@Override
Expand All @@ -76,4 +83,10 @@ public String toString() {
.toString();
}

private LongAdder buildLongAdder(int rate) {
LongAdder adder = new LongAdder();
adder.add(rate);
return adder;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.filter.tps;

import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.support.MockInvocation;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/**
* @author: lizhen
* @since: 2019-03-19
* @description:
*/
public class DefaultTPSLimiterTest {

private DefaultTPSLimiter defaultTPSLimiter = new DefaultTPSLimiter();

@Test
public void testIsAllowable() throws Exception {
Invocation invocation = new MockInvocation();
URL url = URL.valueOf("test://test");
url = url.addParameter(Constants.INTERFACE_KEY, "org.apache.dubbo.rpc.file.TpsService");
url = url.addParameter(Constants.TPS_LIMIT_RATE_KEY, 2);
url = url.addParameter(Constants.TPS_LIMIT_INTERVAL_KEY, 1000);
for (int i = 0; i < 3; i++) {
Assertions.assertTrue(defaultTPSLimiter.isAllowable(url, invocation));
}
}

@Test
public void testIsNotAllowable() throws Exception {
Invocation invocation = new MockInvocation();
URL url = URL.valueOf("test://test");
url = url.addParameter(Constants.INTERFACE_KEY, "org.apache.dubbo.rpc.file.TpsService");
url = url.addParameter(Constants.TPS_LIMIT_RATE_KEY, 2);
url = url.addParameter(Constants.TPS_LIMIT_INTERVAL_KEY, 1000);
for (int i = 0; i < 4; i++) {
if (i == 3) {
Assertions.assertFalse(defaultTPSLimiter.isAllowable(url, invocation));
} else {
Assertions.assertTrue(defaultTPSLimiter.isAllowable(url, invocation));
}
}
}


@Test
public void testConfigChange() throws Exception {
Invocation invocation = new MockInvocation();
URL url = URL.valueOf("test://test");
url = url.addParameter(Constants.INTERFACE_KEY, "org.apache.dubbo.rpc.file.TpsService");
url = url.addParameter(Constants.TPS_LIMIT_RATE_KEY, 2);
url = url.addParameter(Constants.TPS_LIMIT_INTERVAL_KEY, 1000);
for (int i = 0; i < 3; i++) {
Assertions.assertTrue(defaultTPSLimiter.isAllowable(url, invocation));
}
url = url.addParameter(Constants.TPS_LIMIT_RATE_KEY, 2000);
for (int i = 0; i < 3; i++) {
Assertions.assertTrue(defaultTPSLimiter.isAllowable(url, invocation));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.filter;
package org.apache.dubbo.rpc.filter.tps;

import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.filter.TpsLimitFilter;
import org.apache.dubbo.rpc.support.MockInvocation;
import org.apache.dubbo.rpc.support.MyInvoker;

Expand Down

0 comments on commit 660624c

Please sign in to comment.