mcst-linux-kernel/patches-2024.06.26/lxd-3.0.0/0048-Notify-the-cluster-lea...

269 lines
8.4 KiB
Diff

From 59e1978b996a21cb2a24488c82ee9c87ce380a22 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka@canonical.com>
Date: Mon, 16 Apr 2018 12:35:12 +0000
Subject: Notify the cluster leader after a node removal, so it can rebalance

Signed-off-by: Free Ekanayaka <free.ekanayaka@canonical.com>
---
lxd/api_cluster.go | 125 +++++++++++++++++++++++++++++++++++++++-
lxd/api_cluster_test.go | 58 +++++++++++++++++++
lxd/api_internal.go | 2 +
3 files changed, 183 insertions(+), 2 deletions(-)
diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go
index 8fb21efe..0fd89ed3 100644
--- a/lxd/api_cluster.go
+++ b/lxd/api_cluster.go
@@ -438,10 +438,12 @@ func clusterNodeDelete(d *Daemon, r *http.Request) Response {
force = 0
}
+ name := mux.Vars(r)["name"]
+ logger.Debugf("Delete node %s from cluster (force=%d)", name, force)
+
// First check that the node is clear from containers and images and
// make it leave the database cluster, if it's part of it.
- name := mux.Vars(r)["name"]
- address, _, err := cluster.Leave(d.State(), d.gateway, name, force == 1)
+ address, err := cluster.Leave(d.State(), d.gateway, name, force == 1)
if err != nil {
return SmartError(err)
}
@@ -483,6 +485,12 @@ func clusterNodeDelete(d *Daemon, r *http.Request) Response {
if err != nil {
return SmartError(errors.Wrap(err, "failed to remove node from database"))
}
+ // Try to notify the leader.
+ err = tryClusterRebalance(d)
+ if err != nil {
+ // This is not a fatal error, so let's just log it.
+ logger.Errorf("Failed to rebalance cluster: %v", err)
+ }
if force != 1 {
// Try to gracefully reset the database on the node.
@@ -502,6 +510,26 @@ func clusterNodeDelete(d *Daemon, r *http.Request) Response {
return EmptySyncResponse
}
+// This function is used to notify the leader that a node was removed; the
+// leader will decide whether to promote a new node as database node.
+func tryClusterRebalance(d *Daemon) error {
+ leader, err := d.gateway.LeaderAddress()
+ if err != nil {
+ // Not a fatal error: the caller just logs it.
+ return errors.Wrap(err, "failed to get current leader node")
+ }
+ cert := d.endpoints.NetworkCert()
+ client, err := cluster.Connect(leader, cert, true)
+ if err != nil {
+ return errors.Wrap(err, "failed to connect to leader node")
+ }
+ _, _, err = client.RawQuery("POST", "/internal/cluster/rebalance", nil, "")
+ if err != nil {
+ return errors.Wrap(err, "request to rebalance cluster failed")
+ }
+ return nil
+}
+
var internalClusterAcceptCmd = Command{name: "cluster/accept", post: internalClusterPostAccept}
func internalClusterPostAccept(d *Daemon, r *http.Request) Response {
@@ -586,6 +614,99 @@ type internalRaftNode struct {
Address string `json:"address" yaml:"address"`
}
+var internalClusterRebalanceCmd = Command{name: "cluster/rebalance", post: internalClusterPostRebalance}
+
+// Used to update the cluster after a database node has been removed, and
+// possibly promote another one as database node.
+func internalClusterPostRebalance(d *Daemon, r *http.Request) Response {
+ // Redirect all requests to the leader, which is the one with
+ // up-to-date knowledge of what nodes are part of the raft cluster.
+ localAddress, err := node.HTTPSAddress(d.db)
+ if err != nil {
+ return SmartError(err)
+ }
+ leader, err := d.gateway.LeaderAddress()
+ if err != nil {
+ return InternalError(err)
+ }
+ if localAddress != leader {
+ logger.Debugf("Redirect cluster rebalance request to %s", leader)
+ url := &url.URL{
+ Scheme: "https",
+ Path: "/internal/cluster/rebalance",
+ Host: leader,
+ }
+ return SyncResponseRedirect(url.String())
+ }
+
+ logger.Debugf("Rebalance cluster")
+
+ // Check if we have a spare node to promote.
+ address, nodes, err := cluster.Rebalance(d.State(), d.gateway)
+ if err != nil {
+ return SmartError(err)
+ }
+ if address == "" {
+ return SyncResponse(true, nil) // Nothing to change
+ }
+
+ // Tell the node to promote itself.
+ post := &internalClusterPostPromoteRequest{}
+ for _, node := range nodes {
+ post.RaftNodes = append(post.RaftNodes, internalRaftNode{
+ ID: node.ID,
+ Address: node.Address,
+ })
+ }
+
+ cert := d.endpoints.NetworkCert()
+ client, err := cluster.Connect(address, cert, false)
+ if err != nil {
+ return SmartError(err)
+ }
+ _, _, err = client.RawQuery("POST", "/internal/cluster/promote", post, "")
+ if err != nil {
+ return SmartError(err)
+ }
+
+ return SyncResponse(true, nil)
+}
+
+var internalClusterPromoteCmd = Command{name: "cluster/promote", post: internalClusterPostPromote}
+
+// Used to promote the local non-database node to be a database one.
+func internalClusterPostPromote(d *Daemon, r *http.Request) Response {
+ req := internalClusterPostPromoteRequest{}
+
+ // Parse the request
+ err := json.NewDecoder(r.Body).Decode(&req)
+ if err != nil {
+ return BadRequest(err)
+ }
+
+ // Sanity checks
+ if len(req.RaftNodes) == 0 {
+ return BadRequest(fmt.Errorf("No raft nodes provided"))
+ }
+
+ nodes := make([]db.RaftNode, len(req.RaftNodes))
+ for i, node := range req.RaftNodes {
+ nodes[i].ID = node.ID
+ nodes[i].Address = node.Address
+ }
+ err = cluster.Promote(d.State(), d.gateway, nodes)
+ if err != nil {
+ return SmartError(err)
+ }
+
+ return SyncResponse(true, nil)
+}
+
+// A request for the /internal/cluster/promote endpoint.
+type internalClusterPostPromoteRequest struct {
+ RaftNodes []internalRaftNode `json:"raft_nodes" yaml:"raft_nodes"`
+}
+
func clusterCheckStoragePoolsMatch(cluster *db.Cluster, reqPools []api.StoragePool) error {
poolNames, err := cluster.StoragePoolsNotPending()
if err != nil && err != db.ErrNoSuchObject {
diff --git a/lxd/api_cluster_test.go b/lxd/api_cluster_test.go
index 569ffb52..b3c64735 100644
--- a/lxd/api_cluster_test.go
+++ b/lxd/api_cluster_test.go
@@ -294,6 +294,42 @@ func TestCluster_LeaveForce(t *testing.T) {
assert.Equal(t, []string{}, images)
}
+// If a spare non-database node is available after a node leaves, it gets
+// promoted as database node.
+func TestCluster_LeaveAndPromote(t *testing.T) {
+ if testing.Short() {
+ t.Skip("skipping cluster promote test in short mode.")
+ }
+ daemons, cleanup := newDaemons(t, 4)
+ defer cleanup()
+
+ f := clusterFixture{t: t}
+ f.FormCluster(daemons)
+
+ // The first three nodes are database nodes, the fourth is not.
+ client := f.ClientUnix(f.Leader())
+ nodes, err := client.GetClusterMembers()
+ require.NoError(t, err)
+ assert.Len(t, nodes, 4)
+ assert.True(t, nodes[0].Database)
+ assert.True(t, nodes[1].Database)
+ assert.True(t, nodes[2].Database)
+ assert.False(t, nodes[3].Database)
+
+ client = f.ClientUnix(daemons[1])
+ err = client.DeleteClusterMember("rusp-0", false)
+ require.NoError(t, err)
+
+ // Only three nodes are left, and they are all database nodes.
+ client = f.ClientUnix(f.Leader())
+ nodes, err = client.GetClusterMembers()
+ require.NoError(t, err)
+ assert.Len(t, nodes, 3)
+ assert.True(t, nodes[0].Database)
+ assert.True(t, nodes[1].Database)
+ assert.True(t, nodes[2].Database)
+}
+
// A LXD node can be renamed.
func TestCluster_NodeRename(t *testing.T) {
daemon, cleanup := newDaemon(t)
@@ -323,10 +359,12 @@ func TestCluster_NodeRename(t *testing.T) {
type clusterFixture struct {
t *testing.T
clients map[*Daemon]lxd.ContainerServer
+ daemons []*Daemon
}
// Form a cluster using the given daemons. The first daemon will be the leader.
func (f *clusterFixture) FormCluster(daemons []*Daemon) {
+ f.daemons = daemons
for i, daemon := range daemons {
password := ""
if i == 0 {
@@ -416,3 +454,23 @@ func (f *clusterFixture) ClientUnix(daemon *Daemon) lxd.ContainerServer {
}
return client
}
+
+// Return the daemon which is currently the leader
+func (f *clusterFixture) Leader() *Daemon {
+ // Retry a few times since an election might still be happening
+ for i := 0; i < 5; i++ {
+ for _, daemon := range f.daemons {
+ address := daemon.endpoints.NetworkAddress()
+ leader, err := daemon.gateway.LeaderAddress()
+ if err != nil {
+ f.t.Fatal("failed to get leader address", err)
+ }
+ if address == leader {
+ return daemon
+ }
+ }
+ time.Sleep(time.Second)
+ }
+ f.t.Fatal("failed to get leader address")
+ return nil
+}
diff --git a/lxd/api_internal.go b/lxd/api_internal.go
index 5f234537..6f044324 100644
--- a/lxd/api_internal.go
+++ b/lxd/api_internal.go
@@ -30,6 +30,8 @@ var apiInternal = []Command{
internalContainersCmd,
internalSQLCmd,
internalClusterAcceptCmd,
+ internalClusterRebalanceCmd,
+ internalClusterPromoteCmd,
internalClusterContainerMovedCmd,
}