From 59e1978b996a21cb2a24488c82ee9c87ce380a22 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka@canonical.com>
Date: Mon, 16 Apr 2018 12:35:12 +0000
Subject: Notify the cluster leader after a node removal, so it can rebalance

Signed-off-by: Free Ekanayaka <free.ekanayaka@canonical.com>
---
lxd/api_cluster.go | 125 +++++++++++++++++++++++++++++++++++++++-
lxd/api_cluster_test.go | 58 +++++++++++++++++++
lxd/api_internal.go | 2 +
3 files changed, 183 insertions(+), 2 deletions(-)

diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go
index 8fb21efe..0fd89ed3 100644
--- a/lxd/api_cluster.go
+++ b/lxd/api_cluster.go
@@ -438,10 +438,12 @@ func clusterNodeDelete(d *Daemon, r *http.Request) Response {
force = 0
}

+ name := mux.Vars(r)["name"]
+ logger.Debugf("Delete node %s from cluster (force=%d)", name, force)
+
// First check that the node is clear from containers and images and
// make it leave the database cluster, if it's part of it.
- name := mux.Vars(r)["name"]
- address, _, err := cluster.Leave(d.State(), d.gateway, name, force == 1)
+ address, err := cluster.Leave(d.State(), d.gateway, name, force == 1)
if err != nil {
return SmartError(err)
}
@@ -483,6 +485,12 @@ func clusterNodeDelete(d *Daemon, r *http.Request) Response {
if err != nil {
return SmartError(errors.Wrap(err, "failed to remove node from database"))
}
+ // Try to notify the leader.
+ err = tryClusterRebalance(d)
+ if err != nil {
+ // This is not a fatal error, so let's just log it.
+ logger.Errorf("Failed to rebalance cluster: %v", err)
+ }

if force != 1 {
// Try to gracefully reset the database on the node.
@@ -502,6 +510,26 @@ func clusterNodeDelete(d *Daemon, r *http.Request) Response {
return EmptySyncResponse
}

+// This function is used to notify the leader that a node was removed, so it
+// can decide whether to promote a new node as a database node.
+func tryClusterRebalance(d *Daemon) error {
+ leader, err := d.gateway.LeaderAddress()
+ if err != nil {
+ // This is not a fatal error, so let's just log it.
+ return errors.Wrap(err, "failed to get current leader node")
+ }
+ cert := d.endpoints.NetworkCert()
+ client, err := cluster.Connect(leader, cert, true)
+ if err != nil {
+ return errors.Wrap(err, "failed to connect to leader node")
+ }
+ _, _, err = client.RawQuery("POST", "/internal/cluster/rebalance", nil, "")
+ if err != nil {
+ return errors.Wrap(err, "request to rebalance cluster failed")
+ }
+ return nil
+}
+
var internalClusterAcceptCmd = Command{name: "cluster/accept", post: internalClusterPostAccept}

func internalClusterPostAccept(d *Daemon, r *http.Request) Response {
@@ -586,6 +614,99 @@ type internalRaftNode struct {
Address string `json:"address" yaml:"address"`
}

+var internalClusterRebalanceCmd = Command{name: "cluster/rebalance", post: internalClusterPostRebalance}
+
+// Used to update the cluster after a database node has been removed, and
+// possibly promote another one as a database node.
+func internalClusterPostRebalance(d *Daemon, r *http.Request) Response {
+ // Redirect all requests to the leader, which is the one with
+ // up-to-date knowledge of what nodes are part of the raft cluster.
+ localAddress, err := node.HTTPSAddress(d.db)
+ if err != nil {
+ return SmartError(err)
+ }
+ leader, err := d.gateway.LeaderAddress()
+ if err != nil {
+ return InternalError(err)
+ }
+ if localAddress != leader {
+ logger.Debugf("Redirect cluster rebalance request to %s", leader)
+ url := &url.URL{
+ Scheme: "https",
+ Path: "/internal/cluster/rebalance",
+ Host: leader,
+ }
+ return SyncResponseRedirect(url.String())
+ }
+
+ logger.Debugf("Rebalance cluster")
+
+ // Check if we have a spare node to promote.
+ address, nodes, err := cluster.Rebalance(d.State(), d.gateway)
+ if err != nil {
+ return SmartError(err)
+ }
+ if address == "" {
+ return SyncResponse(true, nil) // Nothing to change
+ }
+
+ // Tell the node to promote itself.
+ post := &internalClusterPostPromoteRequest{}
+ for _, node := range nodes {
+ post.RaftNodes = append(post.RaftNodes, internalRaftNode{
+ ID: node.ID,
+ Address: node.Address,
+ })
+ }
+
+ cert := d.endpoints.NetworkCert()
+ client, err := cluster.Connect(address, cert, false)
+ if err != nil {
+ return SmartError(err)
+ }
+ _, _, err = client.RawQuery("POST", "/internal/cluster/promote", post, "")
+ if err != nil {
+ return SmartError(err)
+ }
+
+ return SyncResponse(true, nil)
+}
+
+var internalClusterPromoteCmd = Command{name: "cluster/promote", post: internalClusterPostPromote}
+
+// Used to promote the local non-database node to be a database one.
+func internalClusterPostPromote(d *Daemon, r *http.Request) Response {
+ req := internalClusterPostPromoteRequest{}
+
+ // Parse the request
+ err := json.NewDecoder(r.Body).Decode(&req)
+ if err != nil {
+ return BadRequest(err)
+ }
+
+ // Sanity checks
+ if len(req.RaftNodes) == 0 {
+ return BadRequest(fmt.Errorf("No raft nodes provided"))
+ }
+
+ nodes := make([]db.RaftNode, len(req.RaftNodes))
+ for i, node := range req.RaftNodes {
+ nodes[i].ID = node.ID
+ nodes[i].Address = node.Address
+ }
+ err = cluster.Promote(d.State(), d.gateway, nodes)
+ if err != nil {
+ return SmartError(err)
+ }
+
+ return SyncResponse(true, nil)
+}
+
+// A request for the /internal/cluster/promote endpoint.
+type internalClusterPostPromoteRequest struct {
+ RaftNodes []internalRaftNode `json:"raft_nodes" yaml:"raft_nodes"`
+}
+
func clusterCheckStoragePoolsMatch(cluster *db.Cluster, reqPools []api.StoragePool) error {
poolNames, err := cluster.StoragePoolsNotPending()
if err != nil && err != db.ErrNoSuchObject {
diff --git a/lxd/api_cluster_test.go b/lxd/api_cluster_test.go
index 569ffb52..b3c64735 100644
--- a/lxd/api_cluster_test.go
+++ b/lxd/api_cluster_test.go
@@ -294,6 +294,42 @@ func TestCluster_LeaveForce(t *testing.T) {
assert.Equal(t, []string{}, images)
}

+// If a spare non-database node is available after a node leaves, it gets
+// promoted as a database node.
+func TestCluster_LeaveAndPromote(t *testing.T) {
+ if testing.Short() {
+ t.Skip("skipping cluster promote test in short mode.")
+ }
+ daemons, cleanup := newDaemons(t, 4)
+ defer cleanup()
+
+ f := clusterFixture{t: t}
+ f.FormCluster(daemons)
+
+ // The first three nodes are database nodes, the fourth is not.
+ client := f.ClientUnix(f.Leader())
+ nodes, err := client.GetClusterMembers()
+ require.NoError(t, err)
+ assert.Len(t, nodes, 4)
+ assert.True(t, nodes[0].Database)
+ assert.True(t, nodes[1].Database)
+ assert.True(t, nodes[2].Database)
+ assert.False(t, nodes[3].Database)
+
+ client = f.ClientUnix(daemons[1])
+ err = client.DeleteClusterMember("rusp-0", false)
+ require.NoError(t, err)
+
+ // Only three nodes are left, and they are all database nodes.
+ client = f.ClientUnix(f.Leader())
+ nodes, err = client.GetClusterMembers()
+ require.NoError(t, err)
+ assert.Len(t, nodes, 3)
+ assert.True(t, nodes[0].Database)
+ assert.True(t, nodes[1].Database)
+ assert.True(t, nodes[2].Database)
+}
+
// A LXD node can be renamed.
func TestCluster_NodeRename(t *testing.T) {
daemon, cleanup := newDaemon(t)
@@ -323,10 +359,12 @@ func TestCluster_NodeRename(t *testing.T) {
type clusterFixture struct {
t *testing.T
clients map[*Daemon]lxd.ContainerServer
+ daemons []*Daemon
}

// Form a cluster using the given daemons. The first daemon will be the leader.
func (f *clusterFixture) FormCluster(daemons []*Daemon) {
+ f.daemons = daemons
for i, daemon := range daemons {
password := ""
if i == 0 {
@@ -416,3 +454,23 @@ func (f *clusterFixture) ClientUnix(daemon *Daemon) lxd.ContainerServer {
}
return client
}
+
+// Return the daemon which is currently the leader
+func (f *clusterFixture) Leader() *Daemon {
+ // Retry a few times since an election might still be happening
+ for i := 0; i < 5; i++ {
+ for _, daemon := range f.daemons {
+ address := daemon.endpoints.NetworkAddress()
+ leader, err := daemon.gateway.LeaderAddress()
+ if err != nil {
+ f.t.Fatal("failed to get leader address", err)
+ }
+ if address == leader {
+ return daemon
+ }
+ }
+ time.Sleep(time.Second)
+ }
+ f.t.Fatal("failed to get leader address")
+ return nil
+}
diff --git a/lxd/api_internal.go b/lxd/api_internal.go
index 5f234537..6f044324 100644
--- a/lxd/api_internal.go
+++ b/lxd/api_internal.go
@@ -30,6 +30,8 @@ var apiInternal = []Command{
internalContainersCmd,
internalSQLCmd,
internalClusterAcceptCmd,
+ internalClusterRebalanceCmd,
+ internalClusterPromoteCmd,
internalClusterContainerMovedCmd,
}