102 lines
3.3 KiB
Diff
102 lines
3.3 KiB
Diff
From 9676d5f2a0ecedc4fbfc6c5becd5f624d10ec140 Mon Sep 17 00:00:00 2001
|
|
From: Free Ekanayaka <free.ekanayaka@canonical.com>
|
|
Date: Mon, 16 Apr 2018 12:34:50 +0000
|
|
Subject: Add new cluster.Rebalance function to check if we need to add
|
|
database nodes
|
|
|
|
Signed-off-by: Free Ekanayaka <free.ekanayaka@canonical.com>
|
|
---
|
|
lxd/cluster/membership.go | 79 +++++++++++++++++++++++++++++++++++++++
|
|
1 file changed, 79 insertions(+)
|
|
|
|
diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
|
|
index b21c87d0..8bea809a 100644
|
|
--- a/lxd/cluster/membership.go
|
|
+++ b/lxd/cluster/membership.go
|
|
@@ -432,6 +432,85 @@ func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name stri
|
|
return nil
|
|
}
|
|
|
|
+// Rebalance looks for a spare online node that can be promoted to database
|
|
+// node, which is only attempted if we are below membershipMaxRaftNodes.
|
|
+//
|
|
+// If there's such a spare node, return its address as well as the new list of
|
|
+// raft nodes; otherwise return an empty address and a nil list, with no error.
|
|
+func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, error) {
|
|
+ // First get the current raft members, since this function is meant to be
|
|
+ // called after a node has left.
|
|
+ currentRaftNodes, err := gateway.currentRaftNodes()
|
|
+ if err != nil {
|
|
+ return "", nil, errors.Wrap(err, "failed to get current raft nodes")
|
|
+ }
|
|
+ if len(currentRaftNodes) >= membershipMaxRaftNodes {
|
|
+ // We're already at (or above) full capacity, nothing to promote.
|
|
+ return "", nil, nil
|
|
+ }
|
|
+
|
|
+ currentRaftAddresses := make([]string, len(currentRaftNodes))
|
|
+ for i, node := range currentRaftNodes {
|
|
+ currentRaftAddresses[i] = node.Address
|
|
+ }
|
|
+
|
|
+ // Check if we have a spare node that we can turn into a database one; address stays empty if none is found.
|
|
+ address := ""
|
|
+ err = state.Cluster.Transaction(func(tx *db.ClusterTx) error {
|
|
+ config, err := ConfigLoad(tx)
|
|
+ if err != nil {
|
|
+ return errors.Wrap(err, "failed load cluster configuration") // NOTE(review): message likely meant "failed to load"
|
|
+ }
|
|
+ nodes, err := tx.Nodes()
|
|
+ if err != nil {
|
|
+ return errors.Wrap(err, "failed to get cluster nodes")
|
|
+ }
|
|
+ // Pick the first node that is online and not part of the raft cluster yet.
|
|
+ for _, node := range nodes {
|
|
+ if shared.StringInSlice(node.Address, currentRaftAddresses) {
|
|
+ continue // This is already a database node
|
|
+ }
|
|
+ if node.IsOffline(config.OfflineThreshold()) {
|
|
+ continue // This node is offline
|
|
+ }
|
|
+ logger.Debugf(
|
|
+ "Found spare node %s (%s) to be promoted as database node", node.Name, node.Address)
|
|
+ address = node.Address
|
|
+ break
|
|
+ }
|
|
+
|
|
+ return nil
|
|
+ })
|
|
+ if err != nil {
|
|
+ return "", nil, err
|
|
+ }
|
|
+
|
|
+ if address == "" {
|
|
+ // No node to promote.
|
|
+ return "", nil, nil
|
|
+ }
|
|
+
|
|
+ // Record the new member through gateway.db (RaftNodeAdd, then
|
|
+ // RaftNodesReplace) and build the updated list returned to the caller.
|
|
+ updatedRaftNodes := currentRaftNodes // NOTE(review): aliases currentRaftNodes; the append below may reuse its backing array
|
|
+ err = gateway.db.Transaction(func(tx *db.NodeTx) error {
|
|
+ id, err := tx.RaftNodeAdd(address)
|
|
+ if err != nil {
|
|
+ return errors.Wrap(err, "failed to add new raft node")
|
|
+ }
|
|
+ updatedRaftNodes = append(updatedRaftNodes, db.RaftNode{ID: id, Address: address})
|
|
+ err = tx.RaftNodesReplace(updatedRaftNodes)
|
|
+ if err != nil {
|
|
+ return errors.Wrap(err, "failed to update raft nodes")
|
|
+ }
|
|
+ return nil
|
|
+ })
|
|
+ if err != nil {
|
|
+ return "", nil, err
|
|
+ }
|
|
+ return address, updatedRaftNodes, nil
|
|
+}
|
|
+
|
|
// Promote makes a LXD node which is not a database node, become part of the
|
|
// raft cluster.
|
|
func Promote(state *state.State, gateway *Gateway, nodes []db.RaftNode) error {
|