Browse Source

Fixing various issues in the new config pipeline in Galley. (#17142)

- Updating the new service entry transform pipeline to use the correct
annotations.
- Updating proto parser to allow unrecognized fields.
- Updating the meshcfg to use the common defaults.
- Relaxing the test predicates of meshcfg test to avoid falkes.
- Fix some of the data races in the snapshotter code
Ozben Evren 3 weeks ago
parent
commit
ccb257b240

+ 0
- 2
galley/pkg/config/meshcfg/defaults.go View File

@@ -23,7 +23,5 @@ import (
23 23
 // Default mesh configuration
24 24
 func Default() *v1alpha1.MeshConfig {
25 25
 	meshconfig := mesh.DefaultMeshConfig()
26
-	meshconfig.IngressClass = "istio"
27
-	meshconfig.IngressControllerMode = v1alpha1.MeshConfig_STRICT
28 26
 	return &meshconfig
29 27
 }

+ 2
- 4
galley/pkg/config/meshcfg/defaults_test.go View File

@@ -19,7 +19,7 @@ import (
19 19
 
20 20
 	. "github.com/onsi/gomega"
21 21
 
22
-	"istio.io/api/mesh/v1alpha1"
22
+	"istio.io/istio/pkg/config/mesh"
23 23
 )
24 24
 
25 25
 func TestDefaults(t *testing.T) {
@@ -27,7 +27,5 @@ func TestDefaults(t *testing.T) {
27 27
 
28 28
 	m := Default()
29 29
 
30
-	// A couple of point-wise checks.
31
-	g.Expect(m.IngressClass).To(Equal("istio"))
32
-	g.Expect(m.IngressControllerMode).To(Equal(v1alpha1.MeshConfig_STRICT))
30
+	g.Expect(*m).To(Equal(mesh.DefaultMeshConfig()))
33 31
 }

+ 2
- 2
galley/pkg/config/meshcfg/fs_test.go View File

@@ -111,7 +111,7 @@ func TestFsSource_NoInitialFile_UpdateAfterStart(t *testing.T) {
111 111
 			Source: IstioMeshconfig,
112 112
 		},
113 113
 	}
114
-	g.Eventually(acc.Events).Should(Equal(expected))
114
+	g.Eventually(acc.Events).Should(ContainElement(expected[0]))
115 115
 }
116 116
 
117 117
 func TestFsSource_InitialFile_UpdateAfterStart(t *testing.T) {
@@ -161,7 +161,7 @@ func TestFsSource_InitialFile_UpdateAfterStart(t *testing.T) {
161 161
 			Source: IstioMeshconfig,
162 162
 		},
163 163
 	}
164
-	g.Eventually(acc.Events).Should(Equal(expected))
164
+	g.Eventually(acc.Events).Should(ContainElement(expected[0]))
165 165
 }
166 166
 
167 167
 func TestFsSource_InitialFile(t *testing.T) {

+ 19
- 6
galley/pkg/config/processing/snapshotter/snapshotter.go View File

@@ -16,6 +16,7 @@ package snapshotter
16 16
 
17 17
 import (
18 18
 	"fmt"
19
+	"sync/atomic"
19 20
 	"time"
20 21
 
21 22
 	"istio.io/istio/galley/pkg/config/collection"
@@ -40,7 +41,7 @@ type Snapshotter struct {
40 41
 	pendingEvents int64
41 42
 
42 43
 	// lastSnapshotTime records the last time a snapshotImpl was published.
43
-	lastSnapshotTime time.Time
44
+	lastSnapshotTime atomic.Value
44 45
 }
45 46
 
46 47
 var _ event.Processor = &Snapshotter{}
@@ -190,10 +191,8 @@ func (s *Snapshotter) publish(o SnapshotOptions) {
190 191
 	set := collection.NewSetFromCollections(collections)
191 192
 	sn := &Snapshot{set: set}
192 193
 
193
-	now := time.Now()
194
-	monitoring.RecordProcessorSnapshotPublished(s.pendingEvents, now.Sub(s.lastSnapshotTime))
195
-	s.lastSnapshotTime = now
196
-	s.pendingEvents = 0
194
+	s.markSnapshotTime()
195
+	atomic.StoreInt64(&s.pendingEvents, 0)
197 196
 	scope.Processing.Infoa("Publishing snapshot for group: ", o.Group)
198 197
 	scope.Processing.Debuga(sn)
199 198
 	o.Distributor.Distribute(o.Group, sn)
@@ -223,6 +222,20 @@ func (s *Snapshotter) Handle(e event.Event) {
223 222
 	now := time.Now()
224 223
 	monitoring.RecordProcessorEventProcessed(now.Sub(s.lastEventTime))
225 224
 	s.lastEventTime = now
226
-	s.pendingEvents++
225
+	atomic.AddInt64(&s.pendingEvents, 1)
227 226
 	s.selector.Handle(e)
228 227
 }
228
+
229
+func (s *Snapshotter) markSnapshotTime() {
230
+	now := time.Now()
231
+	lst := s.lastSnapshotTime.Load()
232
+	if lst == nil {
233
+		lst = time.Time{}
234
+	}
235
+	lastSnapshotTime := lst.(time.Time)
236
+
237
+	pe := atomic.SwapInt64(&s.pendingEvents, 0)
238
+
239
+	monitoring.RecordProcessorSnapshotPublished(pe, now.Sub(lastSnapshotTime))
240
+	s.lastSnapshotTime.Store(lastSnapshotTime)
241
+}

+ 0
- 30
galley/pkg/config/processor/transforms/serviceentry/annotations/annotations.go View File

@@ -1,30 +0,0 @@
1
-// Copyright 2019 Istio Authors
2
-//
3
-// Licensed under the Apache License, Version 2.0 (the "License");
4
-// you may not use this file except in compliance with the License.
5
-// You may obtain a copy of the License at
6
-//
7
-//     http://www.apache.org/licenses/LICENSE-2.0
8
-//
9
-// Unless required by applicable law or agreed to in writing, software
10
-// distributed under the License is distributed on an "AS IS" BASIS,
11
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
-// See the License for the specific language governing permissions and
13
-// limitations under the License.
14
-
15
-package annotations
16
-
17
-const (
18
-	// TODO: Move annotations to istio/api and use from there
19
-
20
-	// ServiceVersion provides the raw resource version from the most recent k8s Service update. This will always
21
-	// be available for synthetic service entries.
22
-	ServiceVersion = "networking.istio.io/serviceVersion"
23
-
24
-	// EndpointsVersion provides the raw resource version of the most recent k8s Endpoints update (if available).
25
-	EndpointsVersion = "networking.istio.io/endpointsVersion"
26
-
27
-	// NotReadyEndpoints is an annotation providing the "NotReadyAddresses" from the Kubernetes Endpoints
28
-	// resource. The value is a comma-separated list of IP:port.
29
-	NotReadyEndpoints = "networking.istio.io/notReadyEndpoints"
30
-)

+ 3
- 4
galley/pkg/config/processor/transforms/serviceentry/converter/instance.go View File

@@ -24,7 +24,6 @@ import (
24 24
 	"istio.io/api/annotation"
25 25
 	networking "istio.io/api/networking/v1alpha3"
26 26
 
27
-	"istio.io/istio/galley/pkg/config/processor/transforms/serviceentry/annotations"
28 27
 	"istio.io/istio/galley/pkg/config/processor/transforms/serviceentry/pod"
29 28
 	"istio.io/istio/galley/pkg/config/resource"
30 29
 	"istio.io/istio/pkg/config/constants"
@@ -112,7 +111,7 @@ func (i *Instance) convertService(service *resource.Entry, outMeta *resource.Met
112 111
 	outMeta.Annotations = service.Metadata.Annotations.CloneOrCreate()
113 112
 
114 113
 	// Add an annotation for the version of the service resource.
115
-	outMeta.Annotations[annotations.ServiceVersion] = string(service.Metadata.Version)
114
+	outMeta.Annotations[annotation.AlphaNetworkingServiceVersion.Name] = string(service.Metadata.Version)
116 115
 }
117 116
 
118 117
 func convertExportTo(annotations resource.StringMap) []string {
@@ -209,11 +208,11 @@ func (i *Instance) convertEndpoints(endpoints *resource.Entry, outMeta *resource
209 208
 
210 209
 	// Add an annotation for the version of the Endpoints resource.
211 210
 	outMeta.Annotations = outMeta.Annotations.CloneOrCreate()
212
-	outMeta.Annotations[annotations.EndpointsVersion] = string(endpoints.Metadata.Version)
211
+	outMeta.Annotations[annotation.AlphaNetworkingEndpointsVersion.Name] = string(endpoints.Metadata.Version)
213 212
 
214 213
 	// Add an annotation for any "not ready" endpoints.
215 214
 	if notReadyBuilder.Len() > 0 {
216
-		outMeta.Annotations[annotations.NotReadyEndpoints] = notReadyBuilder.String()
215
+		outMeta.Annotations[annotation.AlphaNetworkingNotReadyEndpoints.Name] = notReadyBuilder.String()
217 216
 	}
218 217
 }
219 218
 

+ 14
- 15
galley/pkg/config/processor/transforms/serviceentry/converter/instance_test.go View File

@@ -27,7 +27,6 @@ import (
27 27
 	"istio.io/api/annotation"
28 28
 	networking "istio.io/api/networking/v1alpha3"
29 29
 
30
-	"istio.io/istio/galley/pkg/config/processor/transforms/serviceentry/annotations"
31 30
 	"istio.io/istio/galley/pkg/config/processor/transforms/serviceentry/converter"
32 31
 	"istio.io/istio/galley/pkg/config/processor/transforms/serviceentry/pod"
33 32
 	"istio.io/istio/galley/pkg/config/resource"
@@ -92,9 +91,9 @@ func TestServiceDefaults(t *testing.T) {
92 91
 			"l2": "v2",
93 92
 		},
94 93
 		Annotations: resource.StringMap{
95
-			"a1":                       "v1",
96
-			"a2":                       "v2",
97
-			annotations.ServiceVersion: version,
94
+			"a1": "v1",
95
+			"a2": "v2",
96
+			annotation.AlphaNetworkingServiceVersion.Name: version,
98 97
 		},
99 98
 	}
100 99
 	expected := networking.ServiceEntry{
@@ -137,8 +136,8 @@ func TestServiceExportTo(t *testing.T) {
137 136
 		Name:       fullName,
138 137
 		CreateTime: tnow,
139 138
 		Annotations: resource.StringMap{
140
-			annotation.NetworkingExportTo.Name: "c, a, b",
141
-			annotations.ServiceVersion:         "v1",
139
+			annotation.NetworkingExportTo.Name:            "c, a, b",
140
+			annotation.AlphaNetworkingServiceVersion.Name: "v1",
142 141
 		},
143 142
 	}
144 143
 
@@ -175,7 +174,7 @@ func TestNoNamespaceShouldUseDefault(t *testing.T) {
175 174
 		Name:       service.Metadata.Name,
176 175
 		CreateTime: tnow,
177 176
 		Annotations: resource.StringMap{
178
-			annotations.ServiceVersion: "v1",
177
+			annotation.AlphaNetworkingServiceVersion.Name: "v1",
179 178
 		},
180 179
 	}
181 180
 
@@ -247,7 +246,7 @@ func TestServicePorts(t *testing.T) {
247 246
 				Name:       service.Metadata.Name,
248 247
 				CreateTime: tnow,
249 248
 				Annotations: resource.StringMap{
250
-					annotations.ServiceVersion: version,
249
+					annotation.AlphaNetworkingServiceVersion.Name: version,
251 250
 				},
252 251
 			}
253 252
 			expected := networking.ServiceEntry{
@@ -305,7 +304,7 @@ func TestClusterIPWithNoResolution(t *testing.T) {
305 304
 				Name:       service.Metadata.Name,
306 305
 				CreateTime: tnow,
307 306
 				Annotations: resource.StringMap{
308
-					annotations.ServiceVersion: version,
307
+					annotation.AlphaNetworkingServiceVersion.Name: version,
309 308
 				},
310 309
 			}
311 310
 			expected := networking.ServiceEntry{
@@ -351,7 +350,7 @@ func TestExternalService(t *testing.T) {
351 350
 		Name:       service.Metadata.Name,
352 351
 		CreateTime: tnow,
353 352
 		Annotations: resource.StringMap{
354
-			annotations.ServiceVersion: version,
353
+			annotation.AlphaNetworkingServiceVersion.Name: version,
355 354
 		},
356 355
 	}
357 356
 	expected := networking.ServiceEntry{
@@ -395,7 +394,7 @@ func TestEndpointsWithNoSubsets(t *testing.T) {
395 394
 
396 395
 	expectedMeta := resource.Metadata{
397 396
 		Annotations: resource.StringMap{
398
-			annotations.EndpointsVersion: version,
397
+			annotation.AlphaNetworkingEndpointsVersion.Name: version,
399 398
 		},
400 399
 	}
401 400
 	expected := networking.ServiceEntry{
@@ -491,8 +490,8 @@ func TestEndpoints(t *testing.T) {
491 490
 
492 491
 	expectedMeta := resource.Metadata{
493 492
 		Annotations: resource.StringMap{
494
-			annotations.EndpointsVersion: version,
495
-			annotations.NotReadyEndpoints: fmt.Sprintf("%s:%d,%s:%d,%s:%d,%s:%d,%s:%d,%s:%d",
493
+			annotation.AlphaNetworkingEndpointsVersion.Name: version,
494
+			annotation.AlphaNetworkingNotReadyEndpoints.Name: fmt.Sprintf("%s:%d,%s:%d,%s:%d,%s:%d,%s:%d,%s:%d",
496 495
 				ip1, 80,
497 496
 				ip2, 80,
498 497
 				ip3, 80,
@@ -574,7 +573,7 @@ func TestEndpointsPodNotFound(t *testing.T) {
574 573
 
575 574
 	expectedMeta := resource.Metadata{
576 575
 		Annotations: resource.StringMap{
577
-			annotations.EndpointsVersion: version,
576
+			annotation.AlphaNetworkingEndpointsVersion.Name: version,
578 577
 		},
579 578
 	}
580 579
 	expected := networking.ServiceEntry{
@@ -635,7 +634,7 @@ func TestEndpointsNodeNotFound(t *testing.T) {
635 634
 
636 635
 	expectedMeta := resource.Metadata{
637 636
 		Annotations: resource.StringMap{
638
-			annotations.EndpointsVersion: version,
637
+			annotation.AlphaNetworkingEndpointsVersion.Name: version,
639 638
 		},
640 639
 	}
641 640
 	expected := networking.ServiceEntry{

+ 5
- 4
galley/pkg/config/processor/transforms/serviceentry/create_test.go View File

@@ -26,8 +26,10 @@ import (
26 26
 	"github.com/gogo/protobuf/types"
27 27
 	. "github.com/onsi/gomega"
28 28
 
29
+	"istio.io/api/annotation"
29 30
 	mcp "istio.io/api/mcp/v1alpha1"
30 31
 	networking "istio.io/api/networking/v1alpha3"
32
+
31 33
 	"istio.io/istio/galley/pkg/config/collection"
32 34
 	"istio.io/istio/galley/pkg/config/event"
33 35
 	"istio.io/istio/galley/pkg/config/meshcfg"
@@ -36,7 +38,6 @@ import (
36 38
 	"istio.io/istio/galley/pkg/config/processing/snapshotter/strategy"
37 39
 	"istio.io/istio/galley/pkg/config/processor/metadata"
38 40
 	"istio.io/istio/galley/pkg/config/processor/transforms/serviceentry"
39
-	"istio.io/istio/galley/pkg/config/processor/transforms/serviceentry/annotations"
40 41
 	"istio.io/istio/galley/pkg/config/processor/transforms/serviceentry/pod"
41 42
 	"istio.io/istio/galley/pkg/config/resource"
42 43
 	"istio.io/istio/galley/pkg/config/testing/fixtures"
@@ -988,11 +989,11 @@ func (b *metadataBuilder) Build() *mcp.Metadata {
988 989
 	for k, v := range b.service.Metadata.Annotations {
989 990
 		annos[k] = v
990 991
 	}
991
-	annos[annotations.ServiceVersion] = string(b.service.Metadata.Version)
992
+	annos[annotation.AlphaNetworkingServiceVersion.Name] = string(b.service.Metadata.Version)
992 993
 	if b.endpoints != nil {
993
-		annos[annotations.EndpointsVersion] = string(b.endpoints.Metadata.Version)
994
+		annos[annotation.AlphaNetworkingEndpointsVersion.Name] = string(b.endpoints.Metadata.Version)
994 995
 		if len(b.notReadyIPs) > 0 {
995
-			annos[annotations.NotReadyEndpoints] = notReadyAnnotation(b.notReadyIPs...)
996
+			annos[annotation.AlphaNetworkingNotReadyEndpoints.Name] = notReadyAnnotation(b.notReadyIPs...)
996 997
 		}
997 998
 	}
998 999
 

+ 4
- 1
galley/pkg/config/util/pb/proto.go View File

@@ -15,6 +15,8 @@
15 15
 package pb
16 16
 
17 17
 import (
18
+	"strings"
19
+
18 20
 	"github.com/ghodss/yaml"
19 21
 	"github.com/gogo/protobuf/jsonpb"
20 22
 	"github.com/gogo/protobuf/proto"
@@ -25,7 +27,8 @@ import (
25 27
 func UnmarshalData(pb proto.Message, data interface{}) error {
26 28
 	js, err := toJSON(data)
27 29
 	if err == nil {
28
-		err = jsonpb.UnmarshalString(js, pb)
30
+		u := jsonpb.Unmarshaler{AllowUnknownFields: true}
31
+		err = u.Unmarshal(strings.NewReader(js), pb)
29 32
 	}
30 33
 	return err
31 34
 }

+ 16
- 0
galley/pkg/config/util/pb/proto_test.go View File

@@ -46,6 +46,22 @@ func TestToProto_Success(t *testing.T) {
46 46
 	g.Expect(p).To(Equal(expected))
47 47
 }
48 48
 
49
+func TestToProto_UnknownFields(t *testing.T) {
50
+	g := NewGomegaWithT(t)
51
+
52
+	data := map[string]interface{}{
53
+		"foo": "bar",
54
+		"boo": "baz",
55
+	}
56
+
57
+	p := &gogoTypes.Empty{}
58
+	err := UnmarshalData(p, data)
59
+	g.Expect(err).To(BeNil())
60
+	expected := &gogoTypes.Empty{}
61
+
62
+	g.Expect(p).To(Equal(expected))
63
+}
64
+
49 65
 func TestToProto_Error(t *testing.T) {
50 66
 	g := NewGomegaWithT(t)
51 67
 

Loading…
Cancel
Save