Browse Source

Make new subscriptions kick old subscriptions

This is done simply by broadcasting a "kick" message to the instance
topic. A unique ID is used so that a subscription doesn't kick itself.

(This involved moving GUID (or pseudo-GUID) generation to its own
package, since I wanted guids for subscriptions too.)
Michael Bridgen 2 years ago
parent
commit
22d8d2774e
4 changed files with 62 additions and 13 deletions
  1. 17
    0
      guid/guid.go
  2. 2
    13
      jobs/job.go
  3. 16
    0
      platform/rpc/nats/bus.go
  4. 27
    0
      platform/rpc/nats/bus_test.go

+ 17
- 0
guid/guid.go View File

@@ -0,0 +1,17 @@
1
+package guid
2
+
3
+import (
4
+	"fmt"
5
+	"math/rand"
6
+	"time"
7
+)
8
+
9
+func init() {
10
+	rand.Seed(time.Now().UnixNano())
11
+}
12
+
13
+func New() string {
14
+	b := make([]byte, 16)
15
+	rand.Read(b)
16
+	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:])
17
+}

+ 2
- 13
jobs/job.go View File

@@ -3,17 +3,12 @@ package jobs
3 3
 import (
4 4
 	"encoding/json"
5 5
 	"errors"
6
-	"fmt"
7
-	"math/rand"
8 6
 	"time"
9 7
 
10 8
 	"github.com/weaveworks/flux"
9
+	"github.com/weaveworks/flux/guid"
11 10
 )
12 11
 
13
-func init() {
14
-	rand.Seed(time.Now().UnixNano())
15
-}
16
-
17 12
 const (
18 13
 	// DefaultQueue is the queue to use if none is set.
19 14
 	DefaultQueue = "default"
@@ -67,13 +62,7 @@ type JobPopper interface {
67 62
 type JobID string
68 63
 
69 64
 func NewJobID() JobID {
70
-	b := make([]byte, 16)
71
-	rand.Read(b)
72
-	return JobID(fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:]))
73
-}
74
-
75
-func init() {
76
-	rand.Seed(time.Now().UnixNano())
65
+	return JobID(guid.New())
77 66
 }
78 67
 
79 68
 // Job describes a worker job

+ 16
- 0
platform/rpc/nats/bus.go View File

@@ -9,6 +9,7 @@ import (
9 9
 	"github.com/nats-io/nats"
10 10
 
11 11
 	"github.com/weaveworks/flux"
12
+	"github.com/weaveworks/flux/guid"
12 13
 	"github.com/weaveworks/flux/platform"
13 14
 	fluxrpc "github.com/weaveworks/flux/platform/rpc"
14 15
 )
@@ -19,6 +20,7 @@ const (
19 20
 	presenceTick   = 50 * time.Millisecond
20 21
 	encoderType    = nats.JSON_ENCODER
21 22
 
23
+	methodKick         = ".Platform.Kick"
22 24
 	methodPing         = ".Platform.Ping"
23 25
 	methodAllServices  = ".Platform.AllServices"
24 26
 	methodSomeServices = ".Platform.SomeServices"
@@ -211,10 +213,24 @@ func (n *NATS) Subscribe(instID flux.InstanceID, remote platform.Platform, done
211 213
 		return
212 214
 	}
213 215
 
216
+	// It's possible that more than one connection for a particular
217
+	// instance will arrive at the service. To prevent confusion, when
218
+	// a subscription arrives, it sends a "kick" message with a unique
219
+	// ID (so it can recognise its own kick message). Any other
220
+	// subscription for the instance _should_ then exit upon receipt
221
+	// of the kick.
222
+	myID := guid.New()
223
+	n.rcv.Publish(string(instID)+methodKick, []byte(myID))
224
+
214 225
 	go func() {
215 226
 		var err error
216 227
 		for request := range requests {
217 228
 			switch {
229
+			case strings.HasSuffix(request.Subject, methodKick):
230
+				id := string(request.Data)
231
+				if id != myID {
232
+					err = platform.FatalError{errors.New("Kicked by new subscriber " + id)}
233
+				}
218 234
 			case strings.HasSuffix(request.Subject, methodPing):
219 235
 				var p ping
220 236
 				err = encoder.Decode(request.Subject, request.Data, &p)

+ 27
- 0
platform/rpc/nats/bus_test.go View File

@@ -159,3 +159,30 @@ func TestFatalErrorDisconnects(t *testing.T) {
159 159
 		t.Error("timed out waiting for expected error from subscription closing")
160 160
 	}
161 161
 }
162
+
163
+func TestNewConnectionKicks(t *testing.T) {
164
+	bus := setup(t)
165
+
166
+	instA := flux.InstanceID("foo")
167
+
168
+	mockA := &platform.MockPlatform{}
169
+	errA := make(chan error)
170
+	subscribe(t, bus, errA, instA, mockA)
171
+
172
+	mockB := &platform.MockPlatform{}
173
+	errB := make(chan error)
174
+	subscribe(t, bus, errB, instA, mockB)
175
+
176
+	select {
177
+	case <-errA:
178
+		break
179
+	case <-time.After(1 * time.Second):
180
+		t.Error("timed out waiting for connection to be kicked")
181
+	}
182
+
183
+	close(errB)
184
+	err := <-errB
185
+	if err != nil {
186
+		t.Errorf("expected no error from second connection, but got %q", err)
187
+	}
188
+}

Loading…
Cancel
Save