Coverage for censusdis/geography.py: 96%
201 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-04-03 05:39 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2025-04-03 05:39 +0000
1# Copyright (c) 2022 Darren Erik Vengroff
2"""Utilities for managing hierarchies of geographies."""
4import os
5from collections import defaultdict
6from dataclasses import dataclass
7from pathlib import Path
8from typing import (
9 Any,
10 ClassVar,
11 DefaultDict,
12 Dict,
13 Iterable,
14 List,
15 Mapping,
16 Optional,
17 Tuple,
18 Union,
19)
21import requests
23from .impl.exceptions import CensusApiException
24from .impl.fetch import certificates
# Type of a single geography value in a specification: either one string
# or an iterable of strings.
InSpecType = Union[str, Iterable[str]]
class GeoException(CensusApiException):
    """Error type raised by the `censusdis.geography` module."""
class PathSpec:
    """
    A path specification.

    This class is used to represent a path of allowable geographies,
    such as state, county, census tract.

    Instances cannot be constructed directly by callers. They are created
    inside this class and handed out by methods such as `get_path_specs`,
    `by_number`, `full_match` and `empty_path_spec`.
    """

    # We hide this object inside the class to make __init__
    # effectively private. If you don't have access to this
    # key you can't successfully call __init__.
    __init_key = object()

    # Character substitutions used to derive snake-case names from path
    # components: separators become underscores, parentheses are dropped.
    _SNAKE_TRANSLATION = str.maketrans(
        {" ": "_", "/": "_", "-": "_", "(": None, ")": None}
    )

    def __init__(self, path: Iterable[str], init_key: Optional[Any] = None):
        """
        Initialize a path spec.

        Parameters
        ----------
        path
            The names of the path components, outermost first.
        init_key
            The private key held by this class. Any other value is rejected.

        Raises
        ------
        ValueError
            If `init_key` is not the class's private key.
        """
        if init_key is not PathSpec.__init_key:
            raise ValueError(
                "PathSpec cannot be created directly. "
                "Try `PathSpec.partial_matches(**kwargs)` or "
                "`PathSpec.full_match(**kwargs)` instead."
            )

        self._path = list(path)

    def __str__(self):
        """Convert to a string of colon-separated components."""
        return ":".join(self._path)

    def __repr__(self):
        """Generate a representation."""
        quoted_path = (f'"{c}"' for c in self._path)
        return f"PathSpec([{', '.join(quoted_path)}])"

    def __len__(self):
        """How many components are in the path."""
        return len(self._path)

    @property
    def path(self):
        """The path, as the list held by this object (not a copy)."""
        return self._path

    @staticmethod
    def _u2s(**kwargs):
        """Replace underscores with spaces in every keyword name."""
        return {k.replace("_", " "): v for k, v in kwargs.items()}

    def _partial_match(
        self,
        is_prefix: bool = True,
        **kwargs: InSpecType,
    ) -> bool:
        """
        Decide whether the keyword names partially match this path.

        The keywords match when there is at least one of them, every one
        names a component of the path, and they are given in the same order
        as the path. With `is_prefix` set, the first keyword must also name
        the first component of the path.
        """
        # An empty path is matched by no kwargs.
        if not kwargs and not self._path:
            return True

        kwargs = self._u2s(**kwargs)
        path_elements_in_kwargs = [key for key in self._path if key in kwargs]
        keys_from_kwargs = list(kwargs)

        # Comparing the two lists checks both that every keyword is a path
        # component and that the keywords appear in path order.
        match = (len(path_elements_in_kwargs) > 0) and (
            path_elements_in_kwargs == keys_from_kwargs
        )

        if is_prefix:
            return match and path_elements_in_kwargs[0] == self._path[0]

        return match

    def _full_match(self, **kwargs):
        """Decide whether the keywords name every component of this path."""
        return self._partial_match(**kwargs) and len(kwargs) == len(self._path)

    def fill_in(self, **kwargs: InSpecType) -> Dict[str, InSpecType]:
        """
        Fill in missing levels in a partial specification.

        Every component from the start of the path down to the deepest
        component named in `kwargs` is included in the result. Components
        that were not given a value are bound to `"*"`.

        Returns
        -------
            A dictionary from component name to value, in path order.

        Raises
        ------
        ValueError
            If `kwargs` is not at least a (non-prefix) partial match
            for this path.
        """
        if not self._partial_match(is_prefix=False, **kwargs):
            raise ValueError("Must be at least a partial match to fill in.")

        kwargs = self._u2s(**kwargs)

        # Index of the deepest path component that was specified; -1 (and
        # therefore an empty result) when nothing was specified.
        deepest = max(
            (i for i, element in enumerate(self._path) if element in kwargs),
            default=-1,
        )

        return {
            element: kwargs.get(element, "*")
            for element in self._path[: deepest + 1]
        }

    def keys(self) -> List[str]:
        """Get the keys identifying the path components, as a new list."""
        return list(self._path)

    @classmethod
    def partial_matches(
        cls, dataset: str, year: int, is_prefix=True, **kwargs: InSpecType
    ) -> List["BoundGeographyPath"]:
        """
        Find all partial matches for the path.

        Returns one bound path for every path spec of the dataset and year
        that the keywords partially match.
        """
        kwargs = PathSpec._u2s(**kwargs)

        return [
            BoundGeographyPath(num, path_spec, **kwargs)
            for num, path_spec in cls.get_path_specs(dataset, year).items()
            if path_spec._partial_match(is_prefix, **kwargs)
        ]

    @classmethod
    def partial_prefix_match(
        cls, dataset: str, year: int, **kwargs: InSpecType
    ) -> Optional["BoundGeographyPath"]:
        """
        Find the minimal partial prefix match.

        Returns the prefix match with the shortest path (the first one
        found when there are ties), or `None` when nothing matches.
        """
        matches = cls.partial_matches(dataset, year, is_prefix=True, **kwargs)

        return min(matches, key=lambda bgp: len(bgp.path_spec), default=None)

    @classmethod
    def full_match(
        cls, dataset: str, year: int, **kwargs: InSpecType
    ) -> Tuple[Optional[str], Optional["PathSpec"]]:
        """
        Find a full match.

        Returns
        -------
            The `(num, path_spec)` pair of the matching path spec, or
            `(None, None)` when there is no match.

        Raises
        ------
        ValueError
            If more than one path spec matches.
        """
        full_matches = [
            (num, path_spec)
            for num, path_spec in cls.get_path_specs(dataset, year).items()
            if path_spec._full_match(**kwargs)
        ]
        if not full_matches:
            return None, None
        if len(full_matches) > 1:
            raise ValueError(
                f"Internal Error, multiple matches for {dataset} in {year} for {kwargs}."
            )
        return full_matches[0]

    @classmethod
    def by_number(cls, dataset: str, year: int, num: str) -> Optional["PathSpec"]:
        """
        Get the path spec for a given U.S. Census numerical geography code.

        For example, the code '050' represents a state and county specification.
        Returns `None` when the code is not known for the dataset and year.
        """
        return cls.get_path_specs(dataset, year).get(num)

    @staticmethod
    def _geo_url(dataset: str, year: int) -> str:
        """Build the URL of the geography metadata for a dataset."""
        # Only integer vintages appear in the URL; for anything else the
        # year segment is left out.
        if isinstance(year, int):
            return f"https://api.census.gov/data/{year}/{dataset}/geography.json"

        return f"https://api.census.gov/data/{dataset}/geography.json"

    @staticmethod
    def empty_path_spec() -> "PathSpec":
        """Construct an empty path spec."""
        return PathSpec([], PathSpec.__init_key)

    @staticmethod
    def _fetch_path_specs(dataset: str, year: int) -> Dict[str, "PathSpec"]:
        """
        Download and parse the path specs for a dataset and year.

        Returns
        -------
            A dictionary from geography level identifier to path spec.

        Raises
        ------
        GeoException
            If the request does not come back with status 200.
        """
        url = PathSpec._geo_url(dataset, year)

        response = requests.get(
            url, cert=certificates.data_cert, verify=certificates.data_verify
        )

        if response.status_code != 200:
            # Do our best to tell the user something informative.
            raise GeoException(
                f"Census API request to {response.url} failed with status {response.status_code}. {response.text}"
            )

        parsed_json = response.json()

        path_specs = {}

        if "fips" in parsed_json:
            for row in parsed_json["fips"]:
                level = row.get("geoLevelId")
                if level is None:
                    level = row.get("geoLevelDisplay")

                if level is None:
                    # Rows with no level identifier cannot be keyed.
                    continue

                # Build a new list rather than appending to the one inside
                # the parsed response; tolerate a missing or null "requires".
                path = [*(row.get("requires") or []), row["name"]]

                path_specs[level] = PathSpec(path, PathSpec.__init_key)

        return path_specs

    # Cache of path specs, keyed by dataset and then vintage.
    _PATH_SPECS_BY_DATASET_YEAR: DefaultDict[str, Dict[int, Dict[str, "PathSpec"]]] = (
        defaultdict(dict)
    )

    # Snake-case name -> original component name, by dataset and vintage.
    _PATH_SPEC_SNAKE_MAP: DefaultDict[str, Dict[int, Dict[str, str]]] = defaultdict(
        dict
    )
    # Original component name -> snake-case name, by dataset and vintage.
    _PATH_SPEC_SNAKE_INV_MAP: DefaultDict[str, Dict[int, Dict[str, str]]] = defaultdict(
        dict
    )

    # The hard-coded LODES path specs, built on first use.
    _LODES_PATH_SPECS: Optional[Dict[str, "PathSpec"]] = None

    @staticmethod
    def _snake_case(component: str) -> str:
        """Derive the snake-case name of a path component."""
        return component.translate(PathSpec._SNAKE_TRANSLATION).lower()

    @staticmethod
    def _register_snake_maps(
        dataset: str, vintage: int, path_specs: Dict[str, "PathSpec"]
    ) -> None:
        """Record the snake-case name maps for a dataset and vintage."""
        snake_map = {
            PathSpec._snake_case(component): component
            for path_spec in path_specs.values()
            for component in path_spec.path
        }

        PathSpec._PATH_SPEC_SNAKE_MAP[dataset][vintage] = snake_map
        PathSpec._PATH_SPEC_SNAKE_INV_MAP[dataset][vintage] = {
            name: py_name for py_name, name in snake_map.items()
        }

    @staticmethod
    def get_path_specs(dataset: str, vintage: int) -> Dict[str, "PathSpec"]:
        """
        Fetch all the path specifications for the given dataset and vintage.

        Results are cached, so the remote metadata is requested at most once
        per dataset and vintage. The snake-case name maps for the dataset
        and vintage are recorded at the same time.
        """
        if dataset.startswith("lodes/"):
            # Special case for the LODES data sets, which go down a completely
            # different path.
            if PathSpec._LODES_PATH_SPECS is None:
                key = PathSpec.__init_key
                PathSpec._LODES_PATH_SPECS = {
                    "040": PathSpec(["state"], key),
                    "050": PathSpec(["state", "county"], key),
                    "100": PathSpec(["state", "county", "tract", "block"], key),
                    "140": PathSpec(["state", "county", "tract"], key),
                    "150": PathSpec(
                        ["state", "county", "tract", "block group"], key
                    ),
                }

            # These specs are hard-coded rather than fetched, so their names
            # have to be registered here; otherwise the name maps would have
            # no entry for this dataset and vintage.
            if vintage not in PathSpec._PATH_SPEC_SNAKE_MAP[dataset]:
                PathSpec._register_snake_maps(
                    dataset, vintage, PathSpec._LODES_PATH_SPECS
                )

            return PathSpec._LODES_PATH_SPECS

        specs_by_vintage = PathSpec._PATH_SPECS_BY_DATASET_YEAR[dataset]

        if vintage not in specs_by_vintage:
            specs_by_vintage[vintage] = PathSpec._fetch_path_specs(dataset, vintage)
            PathSpec._register_snake_maps(dataset, vintage, specs_by_vintage[vintage])

        return specs_by_vintage[vintage]
class BoundGeographyPath:
    """A path spec paired with its Census code and a value for every level."""

    def __init__(self, num: str, path_spec: PathSpec, **kwargs: InSpecType):
        """
        Bind values to the components of a path spec.

        The path spec is the one the U.S. Census identifies with the
        numeric code `num`, like '050' for state and county. The values
        are completed by the path spec's `fill_in` method.

        Parameters
        ----------
        num
            A numeric code from the U.S. Census. E.g. '050' for state and county.
        path_spec
            The path specification.
        kwargs
            Keyword args specifying the bound values.
        """
        bindings = path_spec.fill_in(**kwargs)

        self._bindings = bindings
        self._path_spec = path_spec
        self._num = num

    @property
    def num(self) -> str:
        """
        The numeric code the U.S. Census uses for this geography.

        For example, '050' for state and county.
        """
        return self._num

    @property
    def path_spec(self) -> PathSpec:
        """The path specification the values are bound to."""
        return self._path_spec

    @property
    def bindings(self) -> Mapping[str, InSpecType]:
        """The value bound to each path element."""
        return self._bindings
class EnvironmentApiKey:
    """
    A small class that holds an API key loaded from the environment.

    There are two places it could come from, the environment variable
    US_CENSUS_API_KEY or a file ~/.censusdis/api_key.txt
    in the current user's home directory. If it is in both, the environment
    variable value is used.
    """

    _env_var = "US_CENSUS_API_KEY"

    # The key found so far. While it is None, every call looks again.
    _api_key = None

    @classmethod
    def api_key(cls):
        """
        Determine the API key we should use.

        This could come from the environment variable US_CENSUS_API_KEY or,
        if that is not set, a value stored in a single line in the file
        `'~/.censusdis/api_key.txt'`. The first non-blank line of the file
        is used, with surrounding whitespace removed.

        If neither of these is set, access to the U.S. Census API may be throttled
        or limited. See https://api.census.gov/data/key_signup.html to sign up for
        a key.

        Returns
        -------
            The API key, or `None` if none could be found. A key that has
            been found is remembered for subsequent calls.
        """
        # Try the env var,
        if cls._api_key is None:
            cls._api_key = os.environ.get(cls._env_var)

        # Try the file.
        if cls._api_key is None:
            path = Path.home() / ".censusdis" / "api_key.txt"

            if path.is_file():
                with path.open("r") as file:
                    stripped_lines = (line.strip() for line in file)
                    # An empty or all-blank file yields no key rather than
                    # raising or producing an empty-string key.
                    cls._api_key = next(
                        (line for line in stripped_lines if line), None
                    )

        return cls._api_key
@dataclass(init=False)
class CensusGeographyQuerySpec:
    """
    A specification for a geography query.

    It combines a dataset, a vintage, the variables to download and a
    bound geography path, and turns them into the URL and query
    parameters of a request.
    """

    dataset: str
    year: int
    variables: List[str]
    bound_path: BoundGeographyPath
    api_key: Optional[str] = None

    _BASE_URL: ClassVar[str] = "https://api.census.gov/data"

    def __init__(
        self,
        dataset: str,
        year: int,
        variables: List[str],
        bound_path: BoundGeographyPath,
        api_key: Optional[str] = None,
    ):
        """
        Construct a geographic query.

        Parameters
        ----------
        dataset
            The dataset to download from. For example `"acs/acs5"`,
            `"dec/pl"`, or `"timeseries/poverty/saipe/schdist"`. There are
            symbolic names for datasets, like `ACS5` for `"acs/acs5"`
            in :py:module:`censusdis.datasets`.
        year
            The vintage to download data for. For most data sets this is
            an integer year, for example, `2020`.
        variables
            The variables to download.
        bound_path
            A bound geographic query.
        api_key
            An optional API key. You may be throttled or prevented from using
            the U.S. Census API if you don't provide one. When omitted, the
            key found by `EnvironmentApiKey.api_key()` (possibly `None`)
            is used.
        """
        self.dataset = dataset
        self.year = year
        self.variables = variables
        self.bound_path = bound_path

        if api_key is None:
            api_key = EnvironmentApiKey.api_key()

        self.api_key = api_key

    @staticmethod
    def _render_value(value) -> str:
        """
        Render a bound value as text for a query clause.

        Strings (and any non-iterable value) are formatted as they are. Any
        other iterable is rendered as its items joined with commas, so that
        a list of codes does not end up in the query as a Python list repr.
        """
        if isinstance(value, (str, bytes)) or not isinstance(value, Iterable):
            return f"{value}"

        return ",".join(f"{item}" for item in value)

    @property
    def for_component(self) -> str:
        """
        The part of the query string that is the `for` clause.

        It is built from the last binding of the bound path. A wildcard
        value is rendered as the bare component name.
        """
        *_, (key, value) = self.bound_path.bindings.items()

        rendered = self._render_value(value)

        if rendered == "*":
            return f"{key}"

        return f"{key}:{rendered}"

    @property
    def in_components(self) -> Optional[str]:
        """
        The part of the query string specifying the `in` components.

        It is built from every binding except the last one. `None` is
        returned when there are no such bindings.
        """
        if not self.bound_path.bindings:
            return None

        *components, _ = self.bound_path.bindings.items()

        if components:
            return " ".join(
                f"{k}:{self._render_value(v)}" for (k, v) in components
            )

        return None

    def table_url(
        self, *, query_filter: Optional[Dict[str, str]] = None
    ) -> Tuple[str, Mapping[str, str]]:
        """
        Construct the URL to query census data.

        Parameters
        ----------
        query_filter
            A dictionary of values to filter on. For example, if
            `query_filter={'NAICS2017': '72251'}` then only rows
            where the variable `NAICS2017` has a value of `'72251'`
            will be returned.

            This filtering is done on the server side, not the client
            side, so it is far more efficient than querying without a
            query filter and then manually filtering the results.

        Returns
        -------
            The URL and the parameters to pass to it.
        """
        # Only integer vintages become a path segment of the URL.
        if isinstance(self.year, int):
            url = "/".join([self._BASE_URL, f"{self.year:04}", self.dataset])
        else:
            url = "/".join([self._BASE_URL, self.dataset])

        params = {
            "get": ",".join(self.variables),
        }

        if self.bound_path.bindings:
            params["for"] = self.for_component

        # The filter is merged before `in` and `key` are set, so those two
        # always come from this object.
        if query_filter is not None:
            params.update(query_filter)

        in_components = self.in_components
        if in_components is not None:
            params["in"] = in_components

        if self.api_key is not None:
            params["key"] = self.api_key

        return url, params
def geo_path_specs(dataset: str, year: int) -> Dict[str, List[str]]:
    """
    Map every known path spec for a data set and year to its components.

    The keys are those returned by `PathSpec.get_path_specs`; each value
    is a new list holding the components of the corresponding path.
    """
    specs_by_name = PathSpec.get_path_specs(dataset, year)

    components_by_name: Dict[str, List[str]] = {}
    for name, path_spec in specs_by_name.items():
        components_by_name[name] = list(path_spec.path)

    return components_by_name
def path_component_to_snake(dataset: str, year: int, component: str) -> str:
    """
    Convert path components to snake case.

    Parameters
    ----------
    dataset
        The dataset the component belongs to.
    year
        The vintage of the dataset.
    component
        The path component name, as it appears in a path spec.

    Returns
    -------
        The snake-case name recorded for the component, or the component
        itself when no name is recorded for it.
    """
    # The name maps are filled in as a side effect of loading the path
    # specs, so make sure that has happened before looking anything up.
    PathSpec.get_path_specs(dataset, year)

    # The maps may still have no entry for this dataset and year; fall back
    # to an empty map instead of raising KeyError.
    inverse_map = PathSpec._PATH_SPEC_SNAKE_INV_MAP[dataset].get(year, {})

    return inverse_map.get(component, component)
def path_component_from_snake(dataset: str, year: int, component: str) -> str:
    """
    Convert path components out of snake case.

    Parameters
    ----------
    dataset
        The dataset the component belongs to.
    year
        The vintage of the dataset.
    component
        The snake-case name of a path component.

    Returns
    -------
        The original component name recorded for the snake-case name, or
        the argument itself when no name is recorded for it.
    """
    # The name maps are filled in as a side effect of loading the path
    # specs, so make sure that has happened before looking anything up.
    PathSpec.get_path_specs(dataset, year)

    # The maps may still have no entry for this dataset and year; fall back
    # to an empty map instead of raising KeyError.
    snake_map = PathSpec._PATH_SPEC_SNAKE_MAP[dataset].get(year, {})

    return snake_map.get(component, component)
def geo_path_snake_specs(dataset: str, year: int) -> Dict[str, List[str]]:
    """
    Construct a map of all known geo path specs with snake-case components.

    The keys are those returned by `PathSpec.get_path_specs`; each value
    lists the components of the corresponding path after conversion with
    `path_component_to_snake`.
    """
    specs_by_name = PathSpec.get_path_specs(dataset, year)

    snake_by_name: Dict[str, List[str]] = {}
    for name, path_spec in specs_by_name.items():
        snake_components = []
        for component in path_spec.path:
            snake_components.append(path_component_to_snake(dataset, year, component))
        snake_by_name[name] = snake_components

    return snake_by_name