Skip to content

Commit

Permalink
Remove redundant default broker configurations
Browse files Browse the repository at this point in the history
- Remove config based routing configs because the config based
  routing was removed long time ago
- Remove HelixBrokerStarter.getZkAddressForBroker() because it
  does not apply to the current implementation
- Replace config key strings with constants
- Change timeout for default broker and broker in integration
  test to 60s
- Replace PropertiesConfiguration with light weight
  BaseConfiguration if not read from config file
  • Loading branch information
Jackie-Jiang committed Apr 16, 2019
1 parent 88081bd commit 13890c3
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 271 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang.StringUtils;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixConstants.ChangeType;
import org.apache.helix.HelixDataAccessor;
Expand All @@ -51,7 +51,6 @@
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.NetUtil;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -62,14 +61,15 @@
*
*/
public class HelixBrokerStarter {
private static final String PROPERTY_STORE = "PROPERTYSTORE";
private static final Logger LOGGER = LoggerFactory.getLogger(HelixBrokerStarter.class);
private static final String ROUTING_TABLE_PARAMS_SUBSET_KEY = "pinot.broker.routing.table";

// Spectator Helix manager handles the custom change listeners, properties read/write
private final HelixManager _spectatorHelixManager;
// Participant Helix manager handles Helix functionality such as state transitions and messages
private final HelixManager _participantHelixManager;

private final Configuration _pinotHelixProperties;
private final Configuration _brokerConf;
private final HelixAdmin _helixAdmin;
private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
private final HelixDataAccessor _helixDataAccessor;
Expand All @@ -84,31 +84,27 @@ public class HelixBrokerStarter {
// Set after broker is started, which is actually in the constructor.
private AccessControlFactory _accessControlFactory;

private static final Logger LOGGER = LoggerFactory.getLogger(HelixBrokerStarter.class);

private static final String ROUTING_TABLE_PARAMS_SUBSET_KEY = "pinot.broker.routing.table";

public HelixBrokerStarter(String helixClusterName, String zkServer, Configuration pinotHelixProperties)
public HelixBrokerStarter(String helixClusterName, String zkServer, Configuration brokerConf)
throws Exception {
this(null, helixClusterName, zkServer, pinotHelixProperties);
this(null, helixClusterName, zkServer, brokerConf);
}

public HelixBrokerStarter(String brokerHost, String helixClusterName, String zkServer,
Configuration pinotHelixProperties)
public HelixBrokerStarter(@Nullable String brokerHost, String helixClusterName, String zkServer,
Configuration brokerConf)
throws Exception {
LOGGER.info("Starting Pinot broker");

_pinotHelixProperties = DefaultHelixBrokerConfig.getDefaultBrokerConf(pinotHelixProperties);
_brokerConf = brokerConf;

if (brokerHost == null) {
brokerHost = NetUtil.getHostAddress();
}

final String brokerId = _pinotHelixProperties.getString(CommonConstants.Helix.Instance.INSTANCE_ID_KEY,
CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE + brokerHost + "_" + _pinotHelixProperties
String brokerId = _brokerConf.getString(CommonConstants.Helix.Instance.INSTANCE_ID_KEY,
CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE + brokerHost + "_" + _brokerConf
.getInt(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT));

_pinotHelixProperties.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ID, brokerId);
_brokerConf.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ID, brokerId);
setupHelixSystemProperties();

// Remove all white-spaces from the list of zkServers (if any).
Expand All @@ -123,10 +119,10 @@ public HelixBrokerStarter(String brokerHost, String helixClusterName, String zkS
_propertyStore = _spectatorHelixManager.getHelixPropertyStore();
_helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor();
_helixExternalViewBasedRouting = new HelixExternalViewBasedRouting(_propertyStore, _spectatorHelixManager,
pinotHelixProperties.subset(ROUTING_TABLE_PARAMS_SUBSET_KEY));
brokerConf.subset(ROUTING_TABLE_PARAMS_SUBSET_KEY));
_tableQueryQuotaManager = new TableQueryQuotaManager(_spectatorHelixManager);
_liveInstanceChangeHandler = new LiveInstanceChangeHandler(_spectatorHelixManager);
_brokerServerBuilder = startBroker(_pinotHelixProperties);
_brokerServerBuilder = startBroker(_brokerConf);
_metricsRegistry = _brokerServerBuilder.getMetricsRegistry();

// Initialize cluster change mediator
Expand Down Expand Up @@ -160,16 +156,16 @@ public HelixBrokerStarter(String brokerHost, String helixClusterName, String zkS
stateMachineEngine
.registerStateModelFactory(BrokerResourceOnlineOfflineStateModelFactory.getStateModelDef(), stateModelFactory);
_participantHelixManager.connect();
_tbiMessageHandler = new TimeboundaryRefreshMessageHandlerFactory(_helixExternalViewBasedRouting,
_pinotHelixProperties.getLong(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL,
_tbiMessageHandler = new TimeboundaryRefreshMessageHandlerFactory(_helixExternalViewBasedRouting, _brokerConf
.getLong(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL,
CommonConstants.Broker.DEFAULT_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL_MS));
_participantHelixManager.getMessagingService()
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), _tbiMessageHandler);

addInstanceTagIfNeeded(helixClusterName, brokerId);

// Register the service status handler
double minResourcePercentForStartup = _pinotHelixProperties
double minResourcePercentForStartup = _brokerConf
.getDouble(CommonConstants.Broker.CONFIG_OF_BROKER_MIN_RESOURCE_PERCENT_FOR_START,
CommonConstants.Broker.DEFAULT_BROKER_MIN_RESOURCE_PERCENT_FOR_START);
ServiceStatus.setServiceStatusCallback(new ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList
Expand All @@ -189,7 +185,7 @@ private void setupHelixSystemProperties() {
// NOTE: Helix will disconnect the manager and disable the instance if it detects flapping (too frequent disconnect
// from ZooKeeper). Setting flapping time window to a small value can avoid this from happening. Helix ignores the
// non-positive value, so set the default value as 1.
System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW, _pinotHelixProperties
System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW, _brokerConf
.getString(CommonConstants.Helix.CONFIG_OF_BROKER_FLAPPING_TIME_WINDOW_MS,
CommonConstants.Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS));
}
Expand All @@ -209,9 +205,6 @@ private void addInstanceTagIfNeeded(String clusterName, String instanceName) {
}

private BrokerServerBuilder startBroker(Configuration config) {
if (config == null) {
config = DefaultHelixBrokerConfig.getDefaultBrokerConf();
}
BrokerServerBuilder brokerServerBuilder = new BrokerServerBuilder(config, _helixExternalViewBasedRouting,
_helixExternalViewBasedRouting.getTimeBoundaryService(), _liveInstanceChangeHandler, _tableQueryQuotaManager);
_accessControlFactory = brokerServerBuilder.getAccessControlFactory();
Expand Down Expand Up @@ -278,32 +271,6 @@ public AccessControlFactory getAccessControlFactory() {
return _accessControlFactory;
}

/**
* The zk string format should be 127.0.0.1:3000,127.0.0.1:3001/app/a which applies
* the /helixClusterName/PROPERTY_STORE after chroot to all servers.
* Expected output for this method is:
* 127.0.0.1:3000/app/a/helixClusterName/PROPERTY_STORE,127.0.0.1:3001/app/a/helixClusterName/PROPERTY_STORE
*
* @param zkServers
* @param helixClusterName
* @return the full property store path
*
* @see org.apache.zookeeper.ZooKeeper#ZooKeeper(String, int, org.apache.zookeeper.Watcher)
*/
public static String getZkAddressForBroker(String zkServers, String helixClusterName) {
List tokens = new ArrayList<String>();
String[] zkSplit = zkServers.split("/", 2);
String zkHosts = zkSplit[0];
String zkPathSuffix = StringUtil.join("/", helixClusterName, PROPERTY_STORE);
if (zkSplit.length > 1) {
zkPathSuffix = zkSplit[1] + "/" + zkPathSuffix;
}
for (String token : zkHosts.split(",")) {
tokens.add(StringUtil.join("/", StringUtils.chomp(token, "/"), zkPathSuffix));
}
return StringUtils.join(tokens, ",");
}

public HelixManager getSpectatorHelixManager() {
return _spectatorHelixManager;
}
Expand All @@ -318,14 +285,11 @@ public BrokerServerBuilder getBrokerServerBuilder() {

public static HelixBrokerStarter startDefault()
throws Exception {
Configuration configuration = new PropertiesConfiguration();
Configuration brokerConf = new BaseConfiguration();
int port = 5001;
configuration.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, port);
configuration.addProperty("pinot.broker.timeoutMs", 500 * 1000L);

final HelixBrokerStarter pinotHelixBrokerStarter =
new HelixBrokerStarter(null, "quickstart", "localhost:2122", configuration);
return pinotHelixBrokerStarter;
brokerConf.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, port);
brokerConf.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_TIMEOUT_MS, 60 * 1000L);
return new HelixBrokerStarter(null, "quickstart", "localhost:2122", brokerConf);
}

public void shutdown() {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.pinot.broker.broker.helix.DefaultHelixBrokerConfig;
import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
import org.apache.pinot.broker.routing.TimeBoundaryService;
Expand All @@ -57,7 +57,8 @@ public class HelixBrokerStarterTest extends ControllerTest {
private static final String RAW_DINING_TABLE_NAME = "dining";
private static final String DINING_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_DINING_TABLE_NAME);
private static final String COFFEE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType("coffee");
private final Configuration _pinotHelixBrokerProperties = DefaultHelixBrokerConfig.getDefaultBrokerConf();

private final Configuration _brokerConf = new BaseConfiguration();

private ZkClient _zkClient;
private HelixBrokerStarter _helixBrokerStarter;
Expand All @@ -71,11 +72,9 @@ public void setUp()

startController();

_pinotHelixBrokerProperties.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, 8943);
_pinotHelixBrokerProperties
.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL, 100L);
_helixBrokerStarter =
new HelixBrokerStarter(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, _pinotHelixBrokerProperties);
_brokerConf.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, 8943);
_brokerConf.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL, 100L);
_helixBrokerStarter = new HelixBrokerStarter(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, _brokerConf);

ControllerRequestBuilderUtil
.addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, 5, true);
Expand Down Expand Up @@ -139,8 +138,10 @@ public void testResourceAndTagAssignment()
throws Exception {
IdealState idealState;

Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size(), 6);
idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size(),
6);
idealState =
_helixAdmin.getResourceIdealState(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
Assert.assertEquals(idealState.getInstanceSet(DINING_TABLE_NAME).size(), SEGMENT_COUNT);

ExternalView externalView =
Expand Down Expand Up @@ -173,8 +174,10 @@ public Boolean call()
.setBrokerTenant("testBroker").setServerTenant("testServer").build();
_helixResourceManager.addTable(tableConfig);

Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size(), 6);
idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size(),
6);
idealState =
_helixAdmin.getResourceIdealState(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
Assert.assertEquals(idealState.getInstanceSet(COFFEE_TABLE_NAME).size(), SEGMENT_COUNT);
Assert.assertEquals(idealState.getInstanceSet(DINING_TABLE_NAME).size(), SEGMENT_COUNT);

Expand All @@ -183,8 +186,9 @@ public Boolean call()
@Override
public Boolean call()
throws Exception {
return _helixAdmin.getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
.getStateMap(COFFEE_TABLE_NAME).size() == SEGMENT_COUNT;
return
_helixAdmin.getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
.getStateMap(COFFEE_TABLE_NAME).size() == SEGMENT_COUNT;
}
}, 30000L);

Expand Down Expand Up @@ -273,8 +277,7 @@ public void testTimeBoundaryUpdate()
TimeBoundaryService.TimeBoundaryInfo timeBoundaryInfo = _helixBrokerStarter.getHelixExternalViewBasedRouting().
getTimeBoundaryService().getTimeBoundaryInfoFor(DINING_TABLE_NAME);
return currentTimeBoundary < Long.parseLong(timeBoundaryInfo.getTimeValue());
}, 5 * _pinotHelixBrokerProperties
.getLong(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL));
}, 5 * _brokerConf.getLong(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL));
tbi = _helixBrokerStarter.getHelixExternalViewBasedRouting().
getTimeBoundaryService().getTimeBoundaryInfoFor(DINING_TABLE_NAME);
Assert.assertTrue(currentTimeBoundary < Long.parseLong(tbi.getTimeValue()));
Expand Down
Loading

0 comments on commit 13890c3

Please sign in to comment.