Coverage for censusdis/geography.py: 96%

201 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2025-04-03 05:39 +0000

1# Copyright (c) 2022 Darren Erik Vengroff 

2"""Utilities for managing hierarchies of geographies.""" 

3 

4import os 

5from collections import defaultdict 

6from dataclasses import dataclass 

7from pathlib import Path 

8from typing import ( 

9 Any, 

10 ClassVar, 

11 DefaultDict, 

12 Dict, 

13 Iterable, 

14 List, 

15 Mapping, 

16 Optional, 

17 Tuple, 

18 Union, 

19) 

20 

21import requests 

22 

23from .impl.exceptions import CensusApiException 

24from .impl.fetch import certificates 

25 

26 

# The type of a value bound to a single geography level in a query:
# either one string or an iterable of strings.
InSpecType = Union[str, Iterable[str]]

28 

29 

class GeoException(CensusApiException):
    """
    Error type for failures in the `censusdis.geography` module.

    It specializes `CensusApiException` so callers can tell geography
    problems apart from other Census API errors.
    """

32 

33 

class PathSpec:
    """
    A path specification.

    This class is used to represent a path of allowable geographies,
    such as state, county, census tract.

    Instances cannot be constructed directly by client code. They are
    created internally, either from the geography metadata returned by the
    U.S. Census API or from the hard-coded LODES hierarchy, and are obtained
    through `get_path_specs`, `by_number`, `full_match`, `partial_matches`,
    `partial_prefix_match` or `empty_path_spec`.
    """

    # We hide this object inside the class to make __init__
    # effectively private. If you don't have access to this
    # key you can't successfully call __init__.
    __init_key = object()

    def __init__(self, path: Iterable[str], init_key: Optional[Any] = None):
        """
        Initialize a path spec. Not for use outside this class.

        Parameters
        ----------
        path
            The names of the path components, outermost first.
        init_key
            The private key that proves the caller is this class.

        Raises
        ------
        ValueError
            If `init_key` is not the private class key.
        """
        if init_key is not PathSpec.__init_key:
            raise ValueError(
                "PathSpec objects cannot be created directly. "
                "Try `PathSpec.partial_matches(**kwargs)` or "
                "`PathSpec.full_match(**kwargs)` instead."
            )

        self._path = list(path)

    def __str__(self):
        """Convert to a string."""
        return ":".join(self._path)

    def __repr__(self):
        """Generate a representation."""
        quoted_path = (f'"{c}"' for c in self._path)
        return f"PathSpec([{', '.join(quoted_path)}])"

    def __len__(self):
        """How many components are in the path."""
        return len(self._path)

    @property
    def path(self):
        """
        The path.

        This is the internal list itself, not a copy. Use `keys` to get
        a copy that is safe to modify.
        """
        return self._path

    @staticmethod
    def _u2s(**kwargs):
        """Replace underscores with spaces in every keyword name."""
        return {k.replace("_", " "): v for k, v in kwargs.items()}

    def _partial_match(
        self,
        is_prefix: bool = True,
        **kwargs: InSpecType,
    ) -> bool:
        """
        Decide whether the keyword names match part of this path.

        The match succeeds when every keyword names a component of the path
        and the keywords are given in the same order as in the path.

        Parameters
        ----------
        is_prefix
            If `True`, the first keyword must also be the first component
            of the path.
        kwargs
            Keyword args naming path components. Underscores in the names
            are treated as spaces.

        Returns
        -------
            `True` if the keywords match, `False` otherwise.
        """
        # An empty path is matched by no kwargs.
        if not kwargs and not self._path:
            return True

        kwargs = self._u2s(**kwargs)
        path_elements_in_kwargs = [key for key in self._path if key in kwargs]
        keys_from_kwargs = list(kwargs)

        # Comparing the two lists checks both that no keyword is foreign
        # to the path and that the keywords come in path order.
        match = (len(path_elements_in_kwargs) > 0) and (
            path_elements_in_kwargs == keys_from_kwargs
        )

        if is_prefix:
            return match and path_elements_in_kwargs[0] == self._path[0]

        return match

    def _full_match(self, **kwargs):
        """Decide whether the keywords name every component of the path."""
        return self._partial_match(**kwargs) and len(kwargs) == len(self._path)

    def fill_in(self, **kwargs: InSpecType) -> Dict[str, InSpecType]:
        """
        Fill in missing levels in a partial specification.

        Every component of the path from the first one down to the
        innermost one named in `kwargs` is included in the result.
        Components in that range that were not given are bound to `"*"`.

        Parameters
        ----------
        kwargs
            Keyword args naming path components and their values.

        Returns
        -------
            A dictionary, in path order, from component name to value.

        Raises
        ------
        ValueError
            If `kwargs` is not at least a partial match for this path.
        """
        if not self._partial_match(is_prefix=False, **kwargs):
            raise ValueError("Must be at least a partial match to fill in.")

        kwargs = self._u2s(**kwargs)

        # Walk from the innermost component outward. Once we have seen
        # the innermost component that was specified, everything further
        # out is part of the result.
        reversed_result = {}
        matching = False

        for element in reversed(self._path):
            matching = matching or element in kwargs
            if matching:
                reversed_result[element] = kwargs.get(element, "*")

        return {k: reversed_result[k] for k in reversed(list(reversed_result))}

    def keys(self) -> List[str]:
        """Get the keys identifying the path components."""
        return list(self._path)

    @classmethod
    def partial_matches(
        cls, dataset: str, year: int, is_prefix=True, **kwargs: InSpecType
    ) -> List["BoundGeographyPath"]:
        """
        Find all partial matches for the path.

        Parameters
        ----------
        dataset
            The dataset whose path specs are searched.
        year
            The vintage whose path specs are searched.
        is_prefix
            If `True`, only paths whose first component is the first
            keyword are matched.
        kwargs
            Keyword args naming path components and their values.

        Returns
        -------
            One bound path for each path spec that matches.
        """
        kwargs = cls._u2s(**kwargs)

        return [
            BoundGeographyPath(num, path_spec, **kwargs)
            for num, path_spec in cls.get_path_specs(dataset, year).items()
            if path_spec._partial_match(is_prefix, **kwargs)
        ]

    @classmethod
    def partial_prefix_match(
        cls, dataset: str, year: int, **kwargs: InSpecType
    ) -> Optional["BoundGeographyPath"]:
        """
        Find the minimal partial prefix match.

        Returns
        -------
            The matching bound path with the fewest components, or `None`
            if nothing matches. The first one found wins a tie.
        """
        matches = cls.partial_matches(dataset, year, is_prefix=True, **kwargs)

        min_bgp = None

        for bgp in matches:
            if min_bgp is None or len(bgp.path_spec) < len(min_bgp.path_spec):
                min_bgp = bgp

        return min_bgp

    @classmethod
    def full_match(
        cls, dataset: str, year: int, **kwargs: InSpecType
    ) -> Tuple[Optional[str], Optional["PathSpec"]]:
        """
        Find a full match.

        Returns
        -------
            The code and path spec of the one path whose components are
            exactly those named in `kwargs`, or `(None, None)` if there
            is no such path.

        Raises
        ------
        ValueError
            If more than one path spec matches.
        """
        full_matches = [
            (num, path_spec)
            for num, path_spec in cls.get_path_specs(dataset, year).items()
            if path_spec._full_match(**kwargs)
        ]
        if not full_matches:
            return None, None
        if len(full_matches) > 1:
            raise ValueError(
                f"Internal Error, multiple matches for {dataset} in {year} for {kwargs}."
            )
        return full_matches[0]

    @classmethod
    def by_number(cls, dataset: str, year: int, num: str) -> Optional["PathSpec"]:
        """
        Get the path spec for a given U.S. Census numerical geography code.

        For example, the code '050' represents a state and county specification.

        Returns
        -------
            The path spec, or `None` if the code is not known.
        """
        return cls.get_path_specs(dataset, year).get(num, None)

    @staticmethod
    def _geo_url(dataset: str, year: int) -> str:
        """Construct the URL of the geography metadata for a dataset."""
        # Only integer vintages appear in the URL. Any other kind of
        # `year` value is left out of it.
        if isinstance(year, int):
            return f"https://api.census.gov/data/{year}/{dataset}/geography.json"
        else:
            return f"https://api.census.gov/data/{dataset}/geography.json"

    @staticmethod
    def empty_path_spec() -> "PathSpec":
        """Construct an empty path spec."""
        return PathSpec([], PathSpec.__init_key)

    @staticmethod
    def _fetch_path_specs(dataset: str, year: int) -> Dict[str, "PathSpec"]:
        """
        Download the geography metadata and build path specs from it.

        Returns
        -------
            A dictionary from geography level to path spec. It is empty
            if the response has no `"fips"` entry.

        Raises
        ------
        GeoException
            If the HTTP status of the response is not 200.
        """
        url = PathSpec._geo_url(dataset, year)

        response = requests.get(
            url, cert=certificates.data_cert, verify=certificates.data_verify
        )

        if response.status_code != 200:
            # Do our best to tell the user something informative.
            raise GeoException(
                f"Census API request to {response.url} failed with status {response.status_code}. {response.text}"
            )

        parsed_json = response.json()

        path_specs = {}

        for row in parsed_json.get("fips", []) if "fips" in parsed_json else []:
            # Prefer the level id. Fall back on the display form.
            level = row.get("geoLevelId", None)
            if level is None:
                level = row.get("geoLevelDisplay", None)

            if level is not None:
                # Copy so that we never modify the parsed response, and
                # tolerate an explicit null for "requires".
                path = list(row.get("requires") or [])
                path.append(row["name"])

                path_specs[level] = PathSpec(path, PathSpec.__init_key)

        return path_specs

    # Cache of path specs, by dataset and then vintage.
    _PATH_SPECS_BY_DATASET_YEAR: DefaultDict[str, Dict[int, Dict[str, "PathSpec"]]] = (
        defaultdict(dict)
    )

    # Snake case name -> component name, by dataset and then vintage.
    _PATH_SPEC_SNAKE_MAP: DefaultDict[str, Dict[int, Dict[str, str]]] = defaultdict(
        dict
    )
    # Component name -> snake case name, by dataset and then vintage.
    _PATH_SPEC_SNAKE_INV_MAP: DefaultDict[str, Dict[int, Dict[str, str]]] = defaultdict(
        dict
    )

    # The LODES path specs, constructed the first time they are needed.
    _LODES_PATH_SPECS: Optional[Dict[str, "PathSpec"]] = None

    @staticmethod
    def _snake_case(component: str) -> str:
        """Convert the name of a path component to snake case."""
        return (
            component.replace(" ", "_")
            .replace("/", "_")
            .replace("-", "_")
            .replace("(", "")
            .replace(")", "")
            .lower()
        )

    @staticmethod
    def _register_snake_maps(
        dataset: str, vintage: int, path_specs: Mapping[str, "PathSpec"]
    ) -> None:
        """Record both snake case maps for the components of the path specs."""
        snake_map = {
            PathSpec._snake_case(component): component
            for path_spec in path_specs.values()
            for component in path_spec.path
        }
        PathSpec._PATH_SPEC_SNAKE_MAP[dataset][vintage] = snake_map
        PathSpec._PATH_SPEC_SNAKE_INV_MAP[dataset][vintage] = {
            name: py_name for py_name, name in snake_map.items()
        }

    @staticmethod
    def get_path_specs(dataset: str, vintage: int) -> Dict[str, "PathSpec"]:
        """
        Fetch all the path specifications for the given dataset and vintage.

        Results are cached, so the metadata for a given dataset and
        vintage is downloaded at most once. As a side effect, the snake
        case maps for the dataset and vintage are populated.

        Returns
        -------
            A dictionary from geography level to path spec.
        """
        if dataset.startswith("lodes/"):
            # Special case for the LODES data sets, which go down a completely
            # different path.
            if PathSpec._LODES_PATH_SPECS is None:
                PathSpec._LODES_PATH_SPECS = {
                    "040": PathSpec(["state"], PathSpec.__init_key),
                    "050": PathSpec(["state", "county"], PathSpec.__init_key),
                    "100": PathSpec(
                        ["state", "county", "tract", "block"], PathSpec.__init_key
                    ),
                    "140": PathSpec(["state", "county", "tract"], PathSpec.__init_key),
                    "150": PathSpec(
                        ["state", "county", "tract", "block group"], PathSpec.__init_key
                    ),
                }

            # The snake case maps are needed for LODES too. Without them
            # the snake case lookups for these datasets have nothing to
            # look in.
            if vintage not in PathSpec._PATH_SPEC_SNAKE_MAP[dataset]:
                PathSpec._register_snake_maps(
                    dataset, vintage, PathSpec._LODES_PATH_SPECS
                )

            return PathSpec._LODES_PATH_SPECS

        if vintage not in PathSpec._PATH_SPECS_BY_DATASET_YEAR[dataset]:
            path_specs = PathSpec._fetch_path_specs(dataset, vintage)
            PathSpec._PATH_SPECS_BY_DATASET_YEAR[dataset][vintage] = path_specs
            PathSpec._register_snake_maps(dataset, vintage, path_specs)

        return PathSpec._PATH_SPECS_BY_DATASET_YEAR[dataset][vintage]

285 

286 

class BoundGeographyPath:
    """
    A path spec together with its Census code and bound values.

    The values are produced by the path spec's `fill_in` method from the
    keyword args given at construction time.
    """

    def __init__(self, num: str, path_spec: PathSpec, **kwargs: InSpecType):
        """
        Bind values to the components of a path spec.

        Parameters
        ----------
        num
            A numeric code from the U.S. Census. E.g. '050' for state and county.
        path_spec
            The path specification being bound.
        kwargs
            Keyword args giving the values to bind. They are passed
            to `path_spec.fill_in` to produce the bindings.
        """
        self._num = num
        self._path_spec = path_spec

        filled_in = path_spec.fill_in(**kwargs)
        self._bindings = filled_in

    @property
    def num(self) -> str:
        """
        The numeric U.S. Census code this path was constructed with.

        For example, '050' for state and county.
        """
        return self._num

    @property
    def path_spec(self) -> PathSpec:
        """The path specification this path was constructed with."""
        return self._path_spec

    @property
    def bindings(self) -> Mapping[str, InSpecType]:
        """The value bound to each path element."""
        return self._bindings

330 

331 

class EnvironmentApiKey:
    """
    A small class that holds an API key loaded from the environment.

    There are two places it could come from, the environment variable
    US_CENSUS_API_KEY or a file ~/.censusdis/api_key.txt
    in the current user's home directory. If it is in both, the environment
    variable value is used.

    Once a key has been found it is cached on the class and neither the
    environment variable nor the file is consulted again.
    """

    _env_var = "US_CENSUS_API_KEY"

    # The cached key. `None` means no key has been found so far.
    _api_key = None

    @classmethod
    def api_key(cls):
        """
        Determine the API key we should use.

        This could come from the environment variable US_CENSUS_API_KEY or,
        if that is not set, a value stored in a single line in the file
        `'~/.censusdis/api_key.txt'`.

        If neither of these is set, access to the U.S. Census API may be throttled
        or limited. See https://api.census.gov/data/key_signup.html to sign up for
        a key.

        Returns
        -------
            The API key, or `None` if none could be found. A key file
            that is empty or contains only whitespace counts as not found.
        """
        # Try the env var,
        if cls._api_key is None:
            cls._api_key = os.environ.get(cls._env_var, None)

        # Try the file.
        if cls._api_key is None:
            path = Path.home() / ".censusdis" / "api_key.txt"

            if path.is_file():
                with path.open("r", encoding="utf-8") as file:
                    # Use the first line that has any content, without
                    # the surrounding whitespace. An empty file simply
                    # yields no key instead of raising.
                    for line in file:
                        file_key = line.strip()
                        if file_key:
                            cls._api_key = file_key
                            break

        return cls._api_key

373 

374 

@dataclass(init=False)
class CensusGeographyQuerySpec:
    """Everything needed to construct a U.S. Census API geography query."""

    dataset: str
    year: int
    variables: List[str]
    bound_path: BoundGeographyPath
    api_key: Optional[str] = None

    _BASE_URL: ClassVar[str] = "https://api.census.gov/data"

    def __init__(
        self,
        dataset: str,
        year: int,
        variables: List[str],
        bound_path: BoundGeographyPath,
        api_key: Optional[str] = None,
    ):
        """
        Construct a geographic query.

        Parameters
        ----------
        dataset
            The dataset to download from. For example `"acs/acs5"`,
            `"dec/pl"`, or `"timeseries/poverty/saipe/schdist"`. There are
            symbolic names for datasets, like `ACS5` for `"acs/acs5"`
            in :py:mod:`censusdis.datasets`.
        year
            The vintage to download data for. For most data sets this is
            an integer year, for example, `2020`.
        variables
            The variables to download.
        bound_path
            A bound geographic query.
        api_key
            An optional API key. If it is `None`, the key is looked up
            with `EnvironmentApiKey.api_key()`. You may be throttled or
            prevented from using the U.S. Census API without one.
        """
        self.dataset = dataset
        self.year = year
        self.variables = variables
        self.bound_path = bound_path
        self.api_key = EnvironmentApiKey.api_key() if api_key is None else api_key

    @property
    def for_component(self) -> str:
        """
        The `for` clause of the query string.

        It is built from the last binding. A value of `"*"` is left out,
        so only the name of the component remains.
        """
        *_, (name, bound_value) = self.bound_path.bindings.items()
        return f"{name}" if bound_value == "*" else f"{name}:{bound_value}"

    @property
    def in_components(self) -> Optional[str]:
        """
        The `in` clause of the query string.

        It is built from every binding except the last. If there are no
        such bindings, it is `None`.
        """
        bindings = self.bound_path.bindings
        if not bindings:
            return None

        # Everything but the last binding, which is the `for` clause.
        outer = list(bindings.items())[:-1]
        if not outer:
            return None

        return " ".join(f"{name}:{bound_value}" for name, bound_value in outer)

    def table_url(
        self, *, query_filter: Optional[Dict[str, str]] = None
    ) -> Tuple[str, Mapping[str, str]]:
        """
        Construct the URL to query census data.

        Parameters
        ----------
        query_filter
            A dictionary of values to filter on. For example, if
            `query_filter={'NAICS2017': '72251'}` then only rows
            where the variable `NAICS2017` has a value of `'72251'`
            will be returned.

            This filtering is done on the server side, not the client
            side, so it is far more efficient than querying without a
            query filter and then manually filtering the results.

        Returns
        -------
            The URL and the parameters to pass to it.
        """
        # Only an integer vintage becomes part of the URL.
        url_parts = [self._BASE_URL]
        if isinstance(self.year, int):
            url_parts.append(f"{self.year:04}")
        url_parts.append(self.dataset)
        url = "/".join(url_parts)

        params = {"get": ",".join(self.variables)}

        if self.bound_path.bindings:
            params["for"] = self.for_component

        # The filter goes in before `in` and `key`, so those two
        # take precedence over filter entries with the same name.
        if query_filter is not None:
            params.update(query_filter)

        in_clause = self.in_components
        if in_clause is not None:
            params["in"] = in_clause

        if self.api_key is not None:
            params["key"] = self.api_key

        return url, params

492 

493 

def geo_path_specs(dataset: str, year: int) -> Dict[str, List[str]]:
    """
    Build a map of all known path specs for a given data set and year.

    Returns
    -------
        A dictionary from each key of `PathSpec.get_path_specs(dataset, year)`
        to a new list of the components of the corresponding path.
    """
    specs_by_name: Dict[str, List[str]] = {}

    for name, path_spec in PathSpec.get_path_specs(dataset, year).items():
        # Copy, so callers cannot modify the path spec through the result.
        specs_by_name[name] = list(path_spec.path)

    return specs_by_name

500 

501 

def path_component_to_snake(dataset: str, year: int, component: str) -> str:
    """
    Convert path components to snake case.

    Parameters
    ----------
    dataset
        The dataset the component belongs to.
    year
        The vintage the component belongs to.
    component
        The name of the path component.

    Returns
    -------
        The snake case name recorded for the component. If no name is
        recorded, either because the component is not known or because
        nothing has been recorded for the dataset and year, the
        component is returned unchanged.
    """
    # Look up with `get` at every level. Indexing would raise `KeyError`
    # for a year with nothing recorded, and would also add an empty
    # entry for the dataset to the underlying defaultdict.
    inverse_map = PathSpec._PATH_SPEC_SNAKE_INV_MAP.get(dataset, {}).get(year, {})
    return inverse_map.get(component, component)

505 

506 

def path_component_from_snake(dataset: str, year: int, component: str) -> str:
    """
    Convert path components out of snake case.

    Parameters
    ----------
    dataset
        The dataset the component belongs to.
    year
        The vintage the component belongs to.
    component
        The snake case name of the path component.

    Returns
    -------
        The component name recorded for the snake case name. If no name
        is recorded, either because the snake case name is not known or
        because nothing has been recorded for the dataset and year, the
        argument is returned unchanged.
    """
    # Look up with `get` at every level. Indexing would raise `KeyError`
    # for a year with nothing recorded, and would also add an empty
    # entry for the dataset to the underlying defaultdict.
    snake_map = PathSpec._PATH_SPEC_SNAKE_MAP.get(dataset, {}).get(year, {})
    return snake_map.get(component, component)

510 

511 

def geo_path_snake_specs(dataset: str, year: int) -> Dict[str, List[str]]:
    """
    Construct a map of all known geo path specs, with components in snake case.

    Returns
    -------
        A dictionary from each key of `PathSpec.get_path_specs(dataset, year)`
        to the components of the corresponding path, each one converted
        with `path_component_to_snake`.
    """
    snake_specs: Dict[str, List[str]] = {}

    for name, path_spec in PathSpec.get_path_specs(dataset, year).items():
        snake_specs[name] = [
            path_component_to_snake(dataset, year, component)
            for component in path_spec.path
        ]

    return snake_specs