-
Notifications
You must be signed in to change notification settings - Fork 228
Customized View Aggregation
Helix supports different state models, e.g. "MASTER/SLAVE", "ONLINE/OFFLINE", etc. But each Helix cluster can only be associated with a single state model. All the resources on that cluster will use the same state model specified for that cluster. At any time, each replica of a partition of a resource can only be in one of the states defined in the state model. To support a better view of the states, Helix provides CURRENT_STATE at participant level, and EXTERNAL_VIEW at cluster level. CURRENT_STATE is a per resource aggregation of states for each participant, denoting what is the current state for a particular resource across all partitions on a certain participant. CURRENT_STATE, although already aggregated per resource, is still inconvenient for Helix customers to get an overview of all partition states. Helix provides a further aggregated view called EXTERNAL_VIEW. EXTERNAL_VIEW provides the aggregation of CURRENT_STATE across all Helix participants, still per resource, though. The possible states in EXTERNAL_VIEW are exactly the same as those in CURRENT_STATE.
However, in many scenarios, a Helix partition can not be fully described by a single state. There are increasing requests from our customers to have a mechanism to store their own defined customized states for different purposes. And in many cases, customers would also like an aggregated view of these states instead of raw inputs for easier consumption. Helix, as a centralized control engine for customer's clusters, would be a good fit to store the per partition customized states, aggregate them, and provide the results to customers.
Helix aims to provide a generic solution for all the customers to support customized state. Helix will work as a tunnel for customers to provide inputs, and generate an aggregated outputs based on the inputs, both in a defined format. Then customers then can consume these aggregated views according to their needs. This section will provider a high level overview of the solution and compare Helix based solution and customer application based solution in different aspects. The implementation details is described in the following section.
Helix will provide all customers the ability to define their own states for each partition, which are independent of Helix states. As these states are not in Helix state model, they will not participate in any Helix state transition either. The only requirement from Helix is that the customized state should be a per partition state, meaning each replica of a partition of a resource needs a corresponding state. This is to ensure the aggregation logic works properly on the customized states to generate an aggregated view. The state itself can be any arbitrary string. If a customer would like to use a json node instead of a simple string as the state, e.g., to support a state history record, it will need to encode the json file as a string, and decode it when using it.
The above figure shows the whole pipeline of Helix customized state view aggregation.
To use Helix customized state and aggregated view, customers should firstly call a Helix REST API or directly call a Helix java API to set a cluster level config, called aggregation config. If the customer does not config this field, they can still use Helix to record their customized states, but Helix will by default skip the aggregation process, as the aggregation will take a fair amount computing and storage resources. Only when customers specifically notify Helix that they want the aggregation by adding the state name in the aggregation config, Helix will do the aggregation and output the results to Zookeeper.
Then customers will be responsible for updating customized states in their application code. Helix provides a singleton factory, and customers can instantiate it when it needs to update customized state. After instantiation, customers will call the function in the factory with user defined parameters to build a customized state provider object, which is also a singleton. Helix has a detailed implementation for customize state provider which mainly store customized states to corresponding Zookeeper path in an efficient way. Customers only need to call the "update" function in customized state provider to update customized state to Zookeeper whenever they want.
After customers output their raw data to Zookeeper, Helix will listen to the changes of those states that need to be aggregated according to the aggregation config. Once there is any change, Helix will pass the raw data to aggregation service for aggregating them. Please note that Helix will only provide an aggregation function instead of any particular application specific logic, like merging states, filtering states, doing calculation, etc. This is to ensure the generality of the feature. Customers may perform their own logic before or after the state aggregation.
Finally, Helix will write the aggregated data back to Zookeeper, and the format is similar to existing EXTERNAL_VIEW, which is aggregated per resource. Helix will have a snapshot service which provides output for customers to consume. Currently, routing table provider can be leveraged to realize snapshot function.
There are a couple of scenarios that should be noted.
When a customer would like to drop a certain instance by calling Helix delete instance API, Helix will delete the instance as well as all subpaths under it with recursive deletion. Therefore, the CUSTOMIZED_STATE will also be deleted, and CUSTOMIZED_VIEW will be updated with the instance gone.
When a customer would like to drop a certain resource by calling Helix delete resource API, customer will be responsible for deleting CUSTOMIZED_STATE of all partitions for that resource across all instances.
When Helix rebalance happens, and a certain partition on a certain instance will be moved to another instance, customers will handle the cleanup in the callback function currently provided by Helix in the state transition logic.
When an unexpected disconnection happened in client side from Zookeeper, but does not trigger rebalance, Helix will still keep the CUSTOMIZED_STATE as it is and wait for the connection to be reset.
The data model defines the data contract between Helix and customers. We allow customers to update customized states at per partition granularity. While for data organization, we will organize the customized state the same way as CURRENT_STATE, meaning that each znode will aggregate all partitions for the same resource on the same instance. The difference is that CURRENT_STATE data is updated by Helix, but the new znode data is updated by customer. The most important advantage of this option is that the data size stored in each znode will be comparable to CURRENT_STATE data. Therefore we will for sure avoid the explosion of znode data. Another advantage is that during aggregation, we would be able to leverage the way how external view is calculated for customized way aggregation. The format of the customized state would be like the following.
We also need to determine where to put the customer states input and the aggregated view output. After investigation on a few different options, we made the following decision. We decide to put customized states at the same level with CURRENT_STATE under each instance. We will have a new znode called "CUSTOMIZED_STATE". Under this znode, each different customized state will be an individual child node with the name of that state. We choose not to expose the customized data at the instance level because too many customized data under instance makes it very difficult to find Helix related data, which is generally more fundamental and important for debugging. We decide to put the output aggregated data at the same level with EXTERNAL_VIEW. This is straightforward and no other alternative. Helix provides EXTERNAL_VIEW as a znode. Under this znode, there are a list of child znodes, and each child znode represents a resource. The data of a child znode contains an aggregation of current states across all instances for the partitions in a resource. The customized view znode will be adjacent to External view znode, and under this znode, each individual aggregated customized view will be a child znode. Under each aggregated customized view, each resource will be a child node. In each resource znode, there is a map view for each partition and their corresponding states in each instance
To get the customized state from customers, we may choose push model or pull model. Push model means customers provide Helix status update immediately when they need. Pull model means Helix call customers periodically to get the state updates. For example., in current health report implementation, Helix periodically query clients for health status update. Push model and pull model both have pros and cons and suit for different use cases.
Push model makes sure anything changed in clients' side will be updated to Helix on time. It provides more accurate information for Helix to make decision. But it will cause more traffic to ZK and is more suitable for latency sensitive case. Pull model ensures Helix gets latest data from clients when it needs and also reduce the traffic to ZK. However, due to the periodical query, some clients' updates may be missed. For latency insensitive use case, we may choose pull model to save resources.
Based on the above comparison, depending on the nature of the customized state, we may choose different model.
As mentioned before, not all customized states need aggregation, and we should not listen to them if they do not need aggregation in order to save resources. To turn on or turn off aggregation for a certain customized state is simply to update a Helix cluster level config. To avoid the interference with current cluster config, and also to enable the later separation of view aggregating service, we will put the aggregation config as a new config under config path of a cluster. The following sample code shows how the aggregation config will be defined.
Currently, Helix generic controller may serve the purpose of aggregating customized states, as it is already aggregating Helix CURRENT_STATE. The computing logic would be similar to the existing one which aggregates CURRENT_STATE to EXTERNAL_VIEW. In the future, we may move out the aggregation logic to be in a separate controller and make Helix controllers also partitioned. We can also deploy the view aggregation service as a separate service so that we can achieve better controller performance, and satisfy the scalability requirement when we expand the system to have more nodes.
To compute the aggregated view from customized states, extra logic is needed in Helix generic controller. First, we need to add a new ClusterEventType called CustomizedStateChange. Depending on the aggregation config, controller knows which paths it needs to listen to, and any change will trigger the CustomizedStateChange event. To handle this event, we need several new stages, e.g, customizedDataRefresh stage, and aggregatedCustomizedViewPipeline stage. The first stage is a synchronous stage, while the second one is an asynchronous stage. Then in the controller, we need to add a new pipeline: CustomizedAggregationPipeline. In this pipeline, only the two new stages will be executed. Adding a new pipeline will avoid adding the new stage to existing stages, which is unnecessary and increase cycles for other pipelines.
For Helix snapshot service, we find the routing table provider in Helix is a good candidate to implement the customized aggregated view output function. Currently, Helix Spectator/metadata repository listens to the External View changes and gets notified if there are any changes in the External View, and updates itself. We will implement similar functions for customized view. Routing table provider will need to listen to the change of the customized view, and update its local cached data accordingly. Customers will consume the data output from routing table in their application code. Please note that for different routers, Helix will ensure eventually consistent results delivered because different routing table providers may function at different speeds. To keep the aggregated customized view generic enough, Helix will only perform the aggregation logic instead of any other merging/filter functions for the client. If the client need special handling for their aggregated customized data output, it will need to embed the logic in their application code either before or after the aggregation happens.
Pull Request Description Template
ZooKeeper API module for Apache Helix
DataAccessor for Assignment Metadata
Concurrency and Parallelism for BucketDataAccessor
WAGED Rebalance Pipeline Redesign
WAGED rebalancer Hard Constraint Scope Expansion
IdealState Dependency Removal Progression Remove requested state in Task Framework