Mirror of an open source Kubernetes-native API gateway for microservices built on the Envoy Proxy https://www.getambassador.io
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

config.py 13KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403
  1. # Copyright 2018 Datawire. All rights reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License
  14. import sys
  15. from typing import Any, ClassVar, Dict, Iterable, List, Optional, Tuple, Union
  16. from typing import cast as typecast
  17. import json
  18. import logging
  19. import os
  20. import jsonschema
  21. from pkg_resources import Requirement, resource_filename
  22. from ..utils import RichStatus
  23. from .acresource import ACResource
  24. from .acmapping import ACMapping
  25. #from .VERSION import Version
  26. #############################################################################
  27. ## config.py -- the main configuration parser for Ambassador
  28. ##
  29. ## Ambassador configures itself by creating a new Config object, which calls
  30. ## Config.__init__().
  31. # Custom types
  32. # StringOrList is either a string or a list of strings.
  33. StringOrList = Union[str, List[str]]
class Config:
    """
    The parsed Ambassador configuration.

    A Config takes ACResources (see load_all() and process()), validates them
    against on-disk JSON schemas, and files them into named storage buckets.
    It also keeps track of where each resource came from and of any errors
    posted against it.
    """

    # CLASS VARIABLES

    # When using multiple Ambassadors in one cluster, use AMBASSADOR_ID to distinguish them.
    # The environment is read once, when this class body executes; 'default' is
    # used if AMBASSADOR_ID is not set.
    ambassador_id: ClassVar[str] = os.environ.get('AMBASSADOR_ID', 'default')

    # INSTANCE VARIABLES

    # The resource that process() is currently working on; post_error() falls
    # back to it when no resource is passed explicitly.
    current_resource: Optional[ACResource] = None

    # XXX flat wrong
    # Schema cache, keyed by "<apiVersion>-<kind>" (see validate_object()).
    schemas: Dict[str, dict]

    # storage name => (resource name => ACResource); filled by safe_store().
    config: Dict[str, Dict[str, ACResource]]

    # Reset to empty dicts by _reset(); not otherwise touched in this part of the file.
    breakers: Dict[str, ACResource]
    outliers: Dict[str, ACResource]

    # rkey => ACResource
    sources: Dict[str, ACResource]

    # rkey => list of errors posted against that resource.
    # NOTE(review): post_error() appends rc.as_dict() to these lists, so the
    # element type is presumably a dict rather than a str -- confirm against
    # RichStatus.as_dict() before tightening this annotation.
    errors: Dict[str, List[str]]

    # Checked by load_all() after loading: a non-zero value aborts startup.
    fatal_errors: int

    # Reset to 0 by _reset(); no other use is visible in this part of the file.
    object_errors: int
  50. def __init__(self, schema_dir_path: Optional[str]=None) -> None:
  51. if not schema_dir_path:
  52. # Note that this "resource_filename" has to do with setuptool packages, not
  53. # with our ACResource class.
  54. schema_dir_path = resource_filename(Requirement.parse("ambassador"), "schemas")
  55. self.schema_dir_path = schema_dir_path
  56. self.logger = logging.getLogger("ambassador.config")
  57. self.logger.debug("SCHEMA DIR %s" % os.path.abspath(self.schema_dir_path))
  58. self._reset()
  59. def _reset(self) -> None:
  60. """
  61. Resets this Config to the empty, default state so it can load a new config.
  62. """
  63. self.logger.debug("RESET")
  64. self.current_resource = None
  65. self.schemas = {}
  66. self.config = {}
  67. self.breakers = {}
  68. self.outliers = {}
  69. self.sources = {}
  70. # Save our magic internal sources.
  71. self.save_source(ACResource.internal_resource())
  72. self.save_source(ACResource.diagnostics_resource())
  73. self.errors = {}
  74. self.fatal_errors = 0
  75. self.object_errors = 0
  76. def __str__(self) -> str:
  77. s = [ "<Config:" ]
  78. for kind, configs in self.config.items():
  79. s.append(" %s:" % kind)
  80. for rkey, resource in configs.items():
  81. s.append(" %s" % resource)
  82. s.append(">")
  83. return "\n".join(s)
  84. def as_dict(self) -> Dict[str, Any]:
  85. od = {
  86. '_errors': self.errors,
  87. '_sources': self.sources,
  88. }
  89. for kind, configs in self.config.items():
  90. od[kind] = {}
  91. for rkey, config in configs.items():
  92. od[kind][rkey] = config.as_dict()
  93. return od
  94. def as_json(self):
  95. return json.dumps(self.as_dict(), sort_keys=True, indent=4)
  96. def save_source(self, resource: ACResource) -> None:
  97. """
  98. Save a given ACResource as a source of Ambassador config information.
  99. """
  100. self.sources[resource.rkey] = resource
  101. def load_all(self, resources: Iterable[ACResource]) -> None:
  102. """
  103. Loads all of a set of ACResources. It is the caller's responsibility to arrange for
  104. the set of ACResources to be sorted in some way that makes sense.
  105. """
  106. for resource in resources:
  107. # Is an ambassador_id present in this object?
  108. allowed_ids: StringOrList = resource.get('ambassador_id', 'default')
  109. if allowed_ids:
  110. # Make sure it's a list. Yes, this is Draconian,
  111. # but the jsonschema will allow only a string or a list,
  112. # and guess what? Strings are Iterables.
  113. if type(allowed_ids) != list:
  114. allowed_ids = typecast(StringOrList, [ allowed_ids ])
  115. if Config.ambassador_id not in allowed_ids:
  116. self.logger.debug("LOAD_ALL: skip %s; id %s not in %s" %
  117. (resource, Config.ambassador_id, allowed_ids))
  118. continue
  119. # self.logger.debug("LOAD_ALL: %s @ %s" % (resource, resource.location))
  120. rc = self.process(resource)
  121. if not rc:
  122. # Object error. Not good but we'll allow the system to start.
  123. self.post_error(rc, resource=resource)
  124. if self.fatal_errors:
  125. # Kaboom.
  126. raise Exception("ERROR ERROR ERROR Unparseable configuration; exiting")
  127. if self.errors:
  128. self.logger.error("ERROR ERROR ERROR Starting with configuration errors")
  129. def post_error(self, rc: RichStatus, resource: ACResource=None):
  130. if not resource:
  131. resource = self.current_resource
  132. if not resource:
  133. raise Exception("FATAL: trying to post an error from a totally unknown resource??")
  134. self.save_source(resource)
  135. resource.post_error(rc)
  136. # XXX Probably don't need this data structure, since we can walk the source
  137. # list and get them all.
  138. errors = self.errors.setdefault(resource.rkey, [])
  139. errors.append(rc.as_dict())
  140. self.logger.error("%s: %s" % (resource, rc))
  141. def process(self, resource: ACResource) -> RichStatus:
  142. # This should be impossible.
  143. if not resource:
  144. return RichStatus.fromError("undefined object???")
  145. self.current_resource = resource
  146. if not resource.apiVersion:
  147. return RichStatus.fromError("need apiVersion")
  148. if not resource.kind:
  149. return RichStatus.fromError("need kind")
  150. # Make sure this resource has a name...
  151. if 'name' not in resource:
  152. return RichStatus.fromError("need name")
  153. # ...and off we go. Save the source info...
  154. self.save_source(resource)
  155. # ...and figure out if this thing is OK.
  156. rc = self.validate_object(resource)
  157. if not rc:
  158. # Well that's no good.
  159. return rc
  160. # OK, so far so good. Grab the handler for this object type.
  161. handler_name = "handle_%s" % resource.kind.lower()
  162. handler = getattr(self, handler_name, None)
  163. if not handler:
  164. handler = self.save_object
  165. self.logger.warning("%s: no handler for %s, just saving" % (resource, resource.kind))
  166. # else:
  167. # self.logger.debug("%s: handling %s..." % (resource, resource.kind))
  168. try:
  169. handler(resource)
  170. except Exception as e:
  171. # Bzzzt.
  172. raise
  173. return RichStatus.fromError("%s: could not process %s object: %s" % (resource, resource.kind, e))
  174. # OK, all's well.
  175. self.current_resource = None
  176. return RichStatus.OK(msg="%s object processed successfully" % resource.kind)
  177. def validate_object(self, resource: ACResource) -> RichStatus:
  178. # This is basically "impossible"
  179. if not (("apiVersion" in resource) and ("kind" in resource) and ("name" in resource)):
  180. return RichStatus.fromError("must have apiVersion, kind, and name")
  181. apiVersion = resource.apiVersion
  182. # Ditch the leading ambassador/ that really needs to be there.
  183. if apiVersion.startswith("ambassador/"):
  184. apiVersion = apiVersion.split('/')[1]
  185. else:
  186. return RichStatus.fromError("apiVersion %s unsupported" % apiVersion)
  187. # Do we already have this schema loaded?
  188. schema_key = "%s-%s" % (apiVersion, resource.kind)
  189. schema = self.schemas.get(schema_key, None)
  190. if not schema:
  191. # Not loaded. Go find it on disk.
  192. schema_path = os.path.join(self.schema_dir_path, apiVersion,
  193. "%s.schema" % resource.kind)
  194. try:
  195. # Load it up...
  196. schema = json.load(open(schema_path, "r"))
  197. # ...and then cache it, if it exists. Note that we'll never
  198. # get here if we find something that doesn't parse.
  199. if schema:
  200. self.schemas[schema_key] = typecast(Dict[Any, Any], schema)
  201. except OSError:
  202. self.logger.debug("no schema at %s, not validating" % schema_path)
  203. except json.decoder.JSONDecodeError as e:
  204. self.logger.warning("corrupt schema at %s, skipping (%s)" %
  205. (schema_path, e))
  206. if schema:
  207. # We have a schema. Does the object validate OK?
  208. try:
  209. jsonschema.validate(resource.as_dict(), schema)
  210. except jsonschema.exceptions.ValidationError as e:
  211. # Nope. Bzzzzt.
  212. return RichStatus.fromError("not a valid %s: %s" % (resource.kind, e))
  213. # All good. Return an OK.
  214. return RichStatus.OK(msg="valid %s" % resource.kind)
  215. def safe_store(self, storage_name: str, resource: ACResource, allow_log: bool=True) -> None:
  216. """
  217. Safely store a ACResource under a given storage name. The storage_name is separate
  218. because we may need to e.g. store a Module under the 'ratelimit' name or the like.
  219. Within a storage_name bucket, the ACResource will be stored under its name.
  220. :param storage_name: where shall we file this?
  221. :param resource: what shall we file?
  222. :param allow_log: if True, logs that we're saving this thing.
  223. """
  224. storage = self.config.setdefault(storage_name, {})
  225. if resource.name in storage:
  226. # Oooops.
  227. raise Exception("%s defines %s %s, which is already present" %
  228. (resource, resource.kind, resource.name))
  229. if allow_log:
  230. self.logger.debug("%s: saving %s %s" %
  231. (resource, resource.kind, resource.name))
  232. storage[resource.name] = resource
  233. def save_object(self, resource: ACResource, allow_log: bool=False) -> None:
  234. """
  235. Saves a ACResource using its kind as the storage class name. Sort of the
  236. defaulted version of safe_store.
  237. :param resource: what shall we file?
  238. :param allow_log: if True, logs that we're saving this thing.
  239. """
  240. self.safe_store(resource.kind, resource, allow_log=allow_log)
  241. def get_config(self, key: str) -> Optional[Dict[str, ACResource]]:
  242. return self.config.get(key, None)
  243. def get_module(self, module_name: str) -> Optional[ACResource]:
  244. """
  245. Fetch a module from the module store. Can return None if no
  246. such module exists.
  247. :param module_name: name of the module you want.
  248. """
  249. modules = self.get_config("modules")
  250. if modules:
  251. return modules.get(module_name, None)
  252. else:
  253. return None
  254. def module_lookup(self, module_name: str, key: str, default: Any=None) -> Any:
  255. """
  256. Look up a specific key in a given module. If the named module doesn't
  257. exist, or if the key doesn't exist in the module, return the default.
  258. :param module_name: name of the module you want.
  259. :param key: key to look up within the module
  260. :param default: default value if the module is missing or has no such key
  261. """
  262. module = self.get_module(module_name)
  263. if module:
  264. return module.get(key, default)
  265. return default
  266. def handle_module(self, resource: ACResource) -> None:
  267. """
  268. Handles a Module resource.
  269. """
  270. # Make a new ACResource from the 'config' element of this ACResource
  271. # Note that we leave the original serialization intact, since it will
  272. # indeed show a human the YAML that defined this module.
  273. #
  274. # XXX This should be Module.from_resource()...
  275. module_resource = ACResource.from_resource(resource, kind="Module", **resource.config)
  276. self.safe_store("modules", module_resource)
  277. def handle_ratelimitservice(self, resource: ACResource) -> None:
  278. """
  279. Handles a RateLimitService resource.
  280. """
  281. self.safe_store("ratelimit_configs", resource)
  282. def handle_tracingservice(self, resource: ACResource) -> None:
  283. """
  284. Handles a TracingService resource.
  285. """
  286. self.safe_store("tracing_configs", resource)
  287. def handle_authservice(self, resource: ACResource) -> None:
  288. """
  289. Handles an AuthService resource.
  290. """
  291. self.safe_store("auth_configs", resource)
  292. def handle_mapping(self, resource: ACMapping) -> None:
  293. """
  294. Handles a ACMapping resource.
  295. Mappings are complex things, so a lot of stuff gets buried in a ACMapping
  296. object.
  297. """
  298. self.safe_store("mappings", resource)