etter
etter - Natural language geographic query parsing using LLMs.
Parse location queries into structured geographic queries using LLM.
"""
etter - Natural language geographic query parsing using LLMs.

Parse location queries into structured geographic queries using LLM.

This package initializer resolves ``__version__`` from the installed
distribution metadata and re-exports the public names listed in ``__all__``.
"""

from importlib.metadata import PackageNotFoundError, version

try:
    __version__ = version("etter")
except PackageNotFoundError:  # running from source without install
    __version__ = "unknown"

# Datasources
from .datasources import (
    CompositeDataSource,
    GeoDataSource,
    IGNBDCartoSource,
    PostGISDataSource,
    SwissBoundaries3DSource,
    SwissNames3DSource,
)

# Exceptions
from .exceptions import (
    GeoFilterError,
    LowConfidenceError,
    LowConfidenceWarning,
    NoReferenceLocationError,
    ParsingError,
    UnknownRelationError,
    ValidationError,
)

# Geometry format helpers
from .geometry_format import convert_feature_geometry, convert_geometry

# Models (for type hints and result access)
from .models import (
    BufferConfig,
    ConfidenceLevel,
    ConfidenceScore,
    GeometryFormat,
    GeoQuery,
    ReferenceLocation,
    SpatialRelation,
)

# Main API
from .parser import GeoFilterParser

# Spatial operations
from .spatial import apply_spatial_relation

# Configuration
from .spatial_config import RelationConfig, SpatialRelationConfig

# Names exported by ``from etter import *``; grouped by the same sections as
# the imports above.
__all__ = [
    # Main API
    "GeoFilterParser",
    # Models
    "GeoQuery",
    "SpatialRelation",
    "ReferenceLocation",
    "BufferConfig",
    "ConfidenceScore",
    "ConfidenceLevel",
    "GeometryFormat",
    # Configuration
    "SpatialRelationConfig",
    "RelationConfig",
    # Exceptions
    "GeoFilterError",
    "ParsingError",
    "ValidationError",
    "NoReferenceLocationError",
    "UnknownRelationError",
    "LowConfidenceError",
    "LowConfidenceWarning",
    # Datasources
    "GeoDataSource",
    "SwissNames3DSource",
    "SwissBoundaries3DSource",
    "IGNBDCartoSource",
    "CompositeDataSource",
    "PostGISDataSource",
    # Spatial
    "apply_spatial_relation",
    "convert_geometry",
    "convert_feature_geometry",
]
class GeoFilterParser:
    """
    Main entry point for parsing natural language location queries.

    This class orchestrates the entire parsing pipeline:
    1. Initialize LLM with structured output
    2. Build prompt with spatial relations and examples
    3. Parse query through LLM
    4. Validate and enrich with defaults
    5. Return structured GeoQuery

    Examples:
        Basic usage:
        >>> from langchain.chat_models import init_chat_model
        >>> llm = init_chat_model(model="gpt-4o", model_provider="openai", api_key="sk-...")
        >>> parser = GeoFilterParser(llm=llm)
        >>> result = parser.parse("restaurants in Lausanne")
        >>> result.reference_location.name
        'Lausanne'

        With strict confidence mode:
        >>> parser = GeoFilterParser(llm=llm, confidence_threshold=0.8, strict_mode=True)
        >>> result = parser.parse("near the station")  # May raise LowConfidenceError
    """

    def __init__(
        self,
        llm: BaseChatModel,
        spatial_config: SpatialRelationConfig | None = None,
        confidence_threshold: float = 0.6,
        strict_mode: bool = False,
        include_examples: bool = True,
        datasource: GeoDataSource | None = None,
        additional_instructions: str | None = None,
    ):
        """
        Initialize the parser.

        Args:
            llm: LangChain LLM instance (required).
            spatial_config: Spatial relation configuration. If None, uses defaults
            confidence_threshold: Minimum confidence to accept (0-1)
            strict_mode: If True, raise error on low confidence. If False, warn only
            include_examples: Whether to include few-shot examples in prompt
            datasource: Optional GeoDataSource instance. If provided, the LLM will be informed
                about the concrete types available in that datasource for better type inference.
            additional_instructions: Free-form text injected as a system message after the main
                system prompt and before few-shot examples. Use this to add caller-specific
                rules such as region-specific endonyms, domain aliases, or
                organization-specific place names without forking the default prompt.

        Example:
            >>> from langchain.chat_models import init_chat_model
            >>> from etter.datasources.swissnames3d import SwissNames3DSource
            >>> llm = init_chat_model(model="gpt-4o", model_provider="openai", temperature=0)
            >>> datasource = SwissNames3DSource("data/")
            >>> parser = GeoFilterParser(llm=llm, datasource=datasource)
        """
        self.llm = llm

        # Only fall back to the default configuration when none was supplied;
        # an explicit (even falsy) config object is kept as-is.
        self.spatial_config = SpatialRelationConfig() if spatial_config is None else spatial_config

        # Settings
        self.confidence_threshold = confidence_threshold
        self.strict_mode = strict_mode
        self.include_examples = include_examples
        self.datasource = datasource
        self.additional_instructions = additional_instructions

        # Both builders read the settings stored above, so they run last.
        self.structured_llm = self._build_structured_llm()
        self.prompt = self._build_prompt()

    def _build_structured_llm(self):
        """Create LLM with structured output using Pydantic model."""
        return self.llm.with_structured_output(
            GeoQuery,
            method="function_calling",  # Use function_calling for broader schema support
            include_raw=True,  # For error debugging
        )

    def _build_prompt(self) -> ChatPromptTemplate:
        """Build prompt template with spatial relations, examples, and available types."""
        available_types = None
        if self.datasource is not None:
            available_types = self.datasource.get_available_types()

        return build_prompt_template(
            spatial_config=self.spatial_config,
            include_examples=self.include_examples,
            available_types=available_types,
            additional_instructions=self.additional_instructions,
        )

    @staticmethod
    def _invocation_error(exc: Exception) -> ParsingError:
        """Wrap an exception raised by the LLM call in a ParsingError."""
        return ParsingError(
            message=f"LLM invocation failed: {exc}",
            raw_response="",
            original_error=exc,
        )

    def _unpack_response(self, response) -> GeoQuery:
        """
        Extract and validate the GeoQuery from a structured-LLM response.

        Args:
            response: Either a dict carrying "parsed" / "raw" / "parsing_error"
                keys, or the parsed object itself.

        Returns:
            The parsed GeoQuery.

        Raises:
            ParsingError: If nothing was parsed, or the parsed object is not a GeoQuery.
        """
        is_mapping = isinstance(response, dict)
        parsed = response.get("parsed") if is_mapping else response

        if parsed is None:
            raw = response.get("raw", "") if is_mapping else ""
            error = response.get("parsing_error") if is_mapping else None
            raise ParsingError(
                message="Failed to parse query into structured format. "
                "LLM may have returned invalid JSON or missed required fields.",
                raw_response=str(raw),
                original_error=error,
            )

        if not isinstance(parsed, GeoQuery):
            raise ParsingError(
                message=f"Expected GeoQuery, got {type(parsed).__name__}",
                raw_response=str(parsed),
            )
        return parsed

    def _finalize(self, geo_query: GeoQuery, query: str) -> GeoQuery:
        """Set original_query on the parsed result and run the validation pipeline."""
        geo_query.original_query = query

        return validate_query(
            geo_query,
            self.spatial_config,
            confidence_threshold=self.confidence_threshold,
            strict_mode=self.strict_mode,
        )

    def parse(self, query: str) -> GeoQuery:
        """
        Parse a natural language location query into structured format.

        This is the main method for parsing queries. It:
        1. Invokes the LLM with structured output
        2. Validates the spatial relation is registered
        3. Enriches with default parameters
        4. Checks confidence threshold

        Args:
            query: Natural language query in any language

        Returns:
            GeoQuery: Structured query representation with confidence scores

        Raises:
            ParsingError: If LLM fails to parse query into valid structure
            ValidationError: If parsed query fails business logic validation
            UnknownRelationError: If spatial relation is not registered
            LowConfidenceError: If confidence below threshold (strict mode only)

        Warns:
            LowConfidenceWarning: If confidence below threshold (permissive mode)

        Examples:
            Simple containment query:
            >>> result = parser.parse("in Bern")
            >>> result.reference_location.name
            'Bern'
            >>> result.spatial_relation.relation
            'in'

            Buffer query:
            >>> result = parser.parse("near Lake Geneva")
            >>> result.spatial_relation.relation
            'near'
            >>> result.buffer_config.distance_m
            5000

            Directional query:
            >>> result = parser.parse("north of Lausanne")
            >>> result.spatial_relation.relation
            'north_of'
            >>> result.reference_location.name
            'Lausanne'

            Multilingual:
            >>> result = parser.parse("près de Genève")
            >>> result.spatial_relation.relation
            'near'
            >>> result.reference_location.name
            'Genève'
        """
        formatted_messages = self.prompt.format_messages(query=query)

        try:
            response = self.structured_llm.invoke(formatted_messages)
        except Exception as e:
            raise self._invocation_error(e) from e

        return self._finalize(self._unpack_response(response), query)

    async def aparse(self, query: str) -> GeoQuery:
        """
        Asynchronously parse a natural language location query into structured format.

        Async counterpart to :meth:`parse`. Uses ``ainvoke`` on the structured LLM
        so it can be awaited inside event loops (e.g. FastAPI endpoints) without
        blocking. Validation is synchronous and runs after the LLM call.

        Args:
            query: Natural language query in any language

        Returns:
            GeoQuery: Structured query representation with confidence scores

        Raises:
            ParsingError: If the LLM call fails or its output cannot be unpacked.
                Validation errors from :meth:`parse` apply here as well.
        """
        formatted_messages = self.prompt.format_messages(query=query)

        try:
            response = await self.structured_llm.ainvoke(formatted_messages)
        except Exception as e:
            raise self._invocation_error(e) from e

        return self._finalize(self._unpack_response(response), query)

    async def parse_stream(self, query: str) -> AsyncGenerator[dict, None]:
        """
        Parse a natural language location query with streaming reasoning and results.

        This method provides real-time feedback during the parsing process by yielding
        intermediate reasoning steps and the final GeoQuery result. This is useful for
        providing users with transparency into the LLM's decision-making process and
        for building responsive UIs.

        The stream yields dictionaries with the following event types:
        - {"type": "start"} - Stream started
        - {"type": "reasoning", "content": str} - Intermediate processing steps
        - {"type": "data-response", "content": dict} - Final GeoQuery (``model_dump()`` output)
        - {"type": "error", "content": str} - Error encountered during processing
        - {"type": "finish"} - Stream completed successfully

        Exactly one "error" event is emitted per failure; the underlying exception
        is then re-raised to the consumer on the next iteration.

        Args:
            query: Natural language query in any language

        Yields:
            dict: Stream events with type and optional content fields

        Raises:
            ParsingError: If LLM fails to parse query into valid structure
            ValidationError: If parsed query fails business logic validation
            UnknownRelationError: If spatial relation is not registered
            LowConfidenceError: If confidence below threshold (strict mode only)

        Examples:
            Basic usage with async iteration:
            >>> async for event in parser.parse_stream("restaurants near Lake Geneva"):
            ...     if event["type"] == "reasoning":
            ...         print(f"Reasoning: {event['content']}")
            ...     elif event["type"] == "data-response":
            ...         geo_query = event["content"]
            ...         print(f"Location: {geo_query['reference_location']['name']}")
            ...     elif event["type"] == "error":
            ...         print(f"Error: {event['content']}")

            Using in a FastAPI streaming endpoint:
            >>> from fastapi.responses import StreamingResponse
            >>> @app.get("/stream")
            >>> async def stream_endpoint(q: str):
            ...     async def event_stream():
            ...         async for event in parser.parse_stream(q):
            ...             yield f"data: {json.dumps(event)}\\n\\n"
            ...     return StreamingResponse(event_stream(), media_type="text/event-stream")
        """
        # Set once a stage-specific error event has been yielded, so the
        # catch-all handler below does not emit a second one for the same failure.
        error_reported = False
        try:
            # Signal start of stream
            yield {"type": "start"}

            yield {"type": "reasoning", "content": "Preparing query for LLM processing"}
            formatted_messages = self.prompt.format_messages(query=query)

            yield {"type": "reasoning", "content": "Analyzing spatial relationship and location"}
            try:
                response = await self.structured_llm.ainvoke(formatted_messages)
            except Exception as e:
                error_reported = True
                yield {"type": "error", "content": f"LLM invocation failed: {e}"}
                raise self._invocation_error(e) from e

            yield {"type": "reasoning", "content": "Parsing LLM response into structured format"}
            try:
                geo_query = self._unpack_response(response)
            except ParsingError:
                error_reported = True
                yield {"type": "error", "content": "Failed to parse response - invalid JSON or missing fields"}
                raise

            if geo_query.confidence_breakdown.reasoning:
                yield {
                    "type": "reasoning",
                    "content": f"LLM reasoning: {geo_query.confidence_breakdown.reasoning}",
                }

            yield {"type": "reasoning", "content": "Validating spatial relation configuration"}
            geo_query = self._finalize(geo_query, query)

            yield {"type": "reasoning", "content": "Query parsing completed successfully"}
            # NOTE(review): model_dump() is used in its default mode; confirm every
            # field is JSON-serializable if consumers pass this event to json.dumps.
            yield {"type": "data-response", "content": geo_query.model_dump()}

            # Signal successful completion
            yield {"type": "finish"}

        except Exception as e:
            # Emit an error event before re-raising, unless one was already sent.
            if not error_reported:
                yield {"type": "error", "content": f"Error during parsing: {e}"}
            raise

    def parse_batch(self, queries: list[str]) -> list[GeoQuery]:
        """
        Parse multiple queries in batch.

        Note: This is a simple sequential implementation.
        For true parallelization, consider using async methods or ThreadPoolExecutor.

        Args:
            queries: List of natural language queries

        Returns:
            List of GeoQuery objects (same order as input)

        Raises:
            Same exceptions as parse() for any failing query
        """
        return [self.parse(query) for query in queries]

    def get_available_relations(self, category: RelationCategory | None = None) -> list[str]:
        """
        Get list of available spatial relations.

        Args:
            category: Optional filter by category ("containment", "buffer", "directional")

        Returns:
            List of relation names
        """
        return self.spatial_config.list_relations(category=category)

    def describe_relation(self, relation_name: str) -> str:
        """
        Get description of a spatial relation.

        Args:
            relation_name: Name of the relation

        Returns:
            Human-readable description

        Raises:
            UnknownRelationError: If relation is not registered
        """
        config = self.spatial_config.get_config(relation_name)
        return config.description
Main entry point for parsing natural language location queries.
This class orchestrates the entire parsing pipeline:
- Initialize LLM with structured output
- Build prompt with spatial relations and examples
- Parse query through LLM
- Validate and enrich with defaults
- Return structured GeoQuery
Examples:
Basic usage:
>>> from langchain.chat_models import init_chat_model
>>> llm = init_chat_model(model="gpt-4o", model_provider="openai", api_key="sk-...")
>>> parser = GeoFilterParser(llm=llm)
>>> result = parser.parse("restaurants in Lausanne")
>>> print(result.reference_location.name)
Lausanne

With strict confidence mode:
>>> parser = GeoFilterParser(llm=llm, confidence_threshold=0.8, strict_mode=True) >>> result = parser.parse("near the station") # May raise LowConfidenceError
def __init__(
    self,
    llm: BaseChatModel,
    spatial_config: SpatialRelationConfig | None = None,
    confidence_threshold: float = 0.6,
    strict_mode: bool = False,
    include_examples: bool = True,
    datasource: GeoDataSource | None = None,
    additional_instructions: str | None = None,
):
    """
    Initialize the parser.

    Args:
        llm: LangChain LLM instance (required).
        spatial_config: Spatial relation configuration. If None, uses defaults
        confidence_threshold: Minimum confidence to accept (0-1)
        strict_mode: If True, raise error on low confidence. If False, warn only
        include_examples: Whether to include few-shot examples in prompt
        datasource: Optional GeoDataSource instance. If provided, the LLM will be informed
            about the concrete types available in that datasource for better type inference.
        additional_instructions: Free-form text injected as a system message after the main
            system prompt and before few-shot examples. Use this to add caller-specific
            rules such as region-specific endonyms, domain aliases, or
            organization-specific place names without forking the default prompt.

    Example:
        >>> from langchain.chat_models import init_chat_model
        >>> from etter.datasources.swissnames3d import SwissNames3DSource
        >>> llm = init_chat_model(model="gpt-4o", model_provider="openai", temperature=0)
        >>> datasource = SwissNames3DSource("data/")
        >>> parser = GeoFilterParser(llm=llm, datasource=datasource)
    """
    self.llm = llm

    # Only fall back to the default configuration when none was supplied;
    # an explicit (even falsy) config object is kept as-is.
    self.spatial_config = SpatialRelationConfig() if spatial_config is None else spatial_config

    # Settings
    self.confidence_threshold = confidence_threshold
    self.strict_mode = strict_mode
    self.include_examples = include_examples
    self.datasource = datasource
    self.additional_instructions = additional_instructions

    # Both builders read the settings stored above, so they run last.
    self.structured_llm = self._build_structured_llm()
    self.prompt = self._build_prompt()
Initialize the parser.
Arguments:
- llm: LangChain LLM instance (required).
- spatial_config: Spatial relation configuration. If None, uses defaults
- confidence_threshold: Minimum confidence to accept (0-1)
- strict_mode: If True, raise error on low confidence. If False, warn only
- include_examples: Whether to include few-shot examples in prompt
- datasource: Optional GeoDataSource instance. If provided, the LLM will be informed about the concrete types available in that datasource for better type inference.
- additional_instructions: Free-form text injected as a system message after the main system prompt and before few-shot examples. Use this to add caller-specific rules such as region-specific endonyms, domain aliases, or organization-specific place names without forking the default prompt.
Example:
>>> from langchain.chat_models import init_chat_model >>> from etter.datasources.swissnames3d import SwissNames3DSource >>> llm = init_chat_model(model="gpt-4o", model_provider="openai", temperature=0) >>> datasource = SwissNames3DSource("data/") >>> parser = GeoFilterParser(llm=llm, datasource=datasource)
def parse(self, query: str) -> GeoQuery:
    """
    Turn a natural language location query into a structured GeoQuery.

    Steps performed:
    1. Format the prompt with the query and invoke the structured LLM
    2. Unpack the LLM response into a GeoQuery
    3. Record the original query and run validation (relation lookup,
       default enrichment, confidence threshold)

    Args:
        query: Natural language query in any language

    Returns:
        GeoQuery: Structured query representation with confidence scores

    Raises:
        ParsingError: If LLM fails to parse query into valid structure
        ValidationError: If parsed query fails business logic validation
        UnknownRelationError: If spatial relation is not registered
        LowConfidenceError: If confidence below threshold (strict mode only)

    Warns:
        LowConfidenceWarning: If confidence below threshold (permissive mode)

    Examples:
        Simple containment query:
        >>> result = parser.parse("in Bern")
        >>> result.reference_location.name
        'Bern'
        >>> result.spatial_relation.relation
        'in'

        Buffer query:
        >>> result = parser.parse("near Lake Geneva")
        >>> result.spatial_relation.relation
        'near'
        >>> result.buffer_config.distance_m
        5000

        Directional query:
        >>> result = parser.parse("north of Lausanne")
        >>> result.spatial_relation.relation
        'north_of'
        >>> result.reference_location.name
        'Lausanne'

        Multilingual:
        >>> result = parser.parse("près de Genève")
        >>> result.spatial_relation.relation
        'near'
        >>> result.reference_location.name
        'Genève'
    """
    messages = self.prompt.format_messages(query=query)

    try:
        llm_output = self.structured_llm.invoke(messages)
    except Exception as exc:
        # Any failure of the LLM call itself is surfaced as a ParsingError.
        failure = ParsingError(
            message=f"LLM invocation failed: {str(exc)}",
            raw_response="",
            original_error=exc,
        )
        raise failure from exc

    unpacked = self._unpack_response(llm_output)
    return self._finalize(unpacked, query)
Parse a natural language location query into structured format.
This is the main method for parsing queries. It:
- Invokes the LLM with structured output
- Validates the spatial relation is registered
- Enriches with default parameters
- Checks confidence threshold
Arguments:
- query: Natural language query in any language
Returns:
GeoQuery: Structured query representation with confidence scores
Raises:
- ParsingError: If LLM fails to parse query into valid structure
- ValidationError: If parsed query fails business logic validation
- UnknownRelationError: If spatial relation is not registered
- LowConfidenceError: If confidence below threshold (strict mode only)
Warns:
LowConfidenceWarning: If confidence below threshold (permissive mode)
Examples:
Simple containment query:
>>> result = parser.parse("in Bern")
>>> result.reference_location.name
'Bern'
>>> result.spatial_relation.relation
'in'

Buffer query:
>>> result = parser.parse("near Lake Geneva")
>>> result.spatial_relation.relation
'near'
>>> result.buffer_config.distance_m
5000

Directional query:
>>> result = parser.parse("north of Lausanne")
>>> result.spatial_relation.relation
'north_of'
>>> result.reference_location.name
'Lausanne'

Multilingual:
>>> result = parser.parse("près de Genève")
>>> result.spatial_relation.relation
'near'
>>> result.reference_location.name
'Genève'
async def aparse(self, query: str) -> GeoQuery:
    """
    Asynchronous variant of :meth:`parse`.

    Awaits ``ainvoke`` on the structured LLM so the call can run inside an
    event loop (e.g. FastAPI endpoints) without blocking. Unpacking and
    validation are synchronous and happen after the LLM call returns.

    Args:
        query: Natural language query in any language

    Returns:
        GeoQuery: The unpacked result after ``_finalize`` has run on it

    Raises:
        ParsingError: If the LLM call raises; the original exception is chained
            and stored as ``original_error``. Errors raised while unpacking or
            finalizing the response propagate unchanged.
    """
    messages = self.prompt.format_messages(query=query)

    try:
        llm_output = await self.structured_llm.ainvoke(messages)
    except Exception as exc:
        # Any failure of the LLM call itself is surfaced as a ParsingError.
        failure = ParsingError(
            message=f"LLM invocation failed: {str(exc)}",
            raw_response="",
            original_error=exc,
        )
        raise failure from exc

    unpacked = self._unpack_response(llm_output)
    return self._finalize(unpacked, query)
Asynchronously parse a natural language location query into structured format.
Async counterpart to parse(). Uses ainvoke on the structured LLM
so it can be awaited inside event loops (e.g. FastAPI endpoints) without
blocking. Validation is synchronous and runs after the LLM call.
async def parse_stream(self, query: str) -> AsyncGenerator[dict, None]:
    """
    Parse a natural language location query with streaming reasoning and results.

    This method provides real-time feedback during the parsing process by yielding
    intermediate reasoning steps and the final GeoQuery result. This is useful for
    providing users with transparency into the LLM's decision-making process and
    for building responsive UIs.

    The stream yields dictionaries with the following event types:
    - {"type": "start"} - Stream started
    - {"type": "reasoning", "content": str} - Intermediate processing steps
    - {"type": "data-response", "content": dict} - Final GeoQuery (``model_dump()`` output)
    - {"type": "error", "content": str} - Error encountered during processing
    - {"type": "finish"} - Stream completed successfully

    Exactly one "error" event is emitted per failure; the underlying exception
    is then re-raised to the consumer on the next iteration.

    Args:
        query: Natural language query in any language

    Yields:
        dict: Stream events with type and optional content fields

    Raises:
        ParsingError: If LLM fails to parse query into valid structure
        ValidationError: If parsed query fails business logic validation
        UnknownRelationError: If spatial relation is not registered
        LowConfidenceError: If confidence below threshold (strict mode only)

    Examples:
        Basic usage with async iteration:
        >>> async for event in parser.parse_stream("restaurants near Lake Geneva"):
        ...     if event["type"] == "reasoning":
        ...         print(f"Reasoning: {event['content']}")
        ...     elif event["type"] == "data-response":
        ...         geo_query = event["content"]
        ...         print(f"Location: {geo_query['reference_location']['name']}")
        ...     elif event["type"] == "error":
        ...         print(f"Error: {event['content']}")

        Using in a FastAPI streaming endpoint:
        >>> from fastapi.responses import StreamingResponse
        >>> @app.get("/stream")
        >>> async def stream_endpoint(q: str):
        ...     async def event_stream():
        ...         async for event in parser.parse_stream(q):
        ...             yield f"data: {json.dumps(event)}\\n\\n"
        ...     return StreamingResponse(event_stream(), media_type="text/event-stream")
    """
    # Set once a stage-specific error event has been yielded, so the
    # catch-all handler below does not emit a second one for the same failure.
    error_reported = False
    try:
        # Signal start of stream
        yield {"type": "start"}

        yield {"type": "reasoning", "content": "Preparing query for LLM processing"}
        formatted_messages = self.prompt.format_messages(query=query)

        yield {"type": "reasoning", "content": "Analyzing spatial relationship and location"}
        try:
            response = await self.structured_llm.ainvoke(formatted_messages)
        except Exception as e:
            error_reported = True
            yield {"type": "error", "content": f"LLM invocation failed: {e}"}
            raise ParsingError(
                message=f"LLM invocation failed: {e}",
                raw_response="",
                original_error=e,
            ) from e

        yield {"type": "reasoning", "content": "Parsing LLM response into structured format"}
        try:
            geo_query = self._unpack_response(response)
        except ParsingError:
            error_reported = True
            yield {"type": "error", "content": "Failed to parse response - invalid JSON or missing fields"}
            raise

        if geo_query.confidence_breakdown.reasoning:
            yield {
                "type": "reasoning",
                "content": f"LLM reasoning: {geo_query.confidence_breakdown.reasoning}",
            }

        yield {"type": "reasoning", "content": "Validating spatial relation configuration"}
        geo_query = self._finalize(geo_query, query)

        yield {"type": "reasoning", "content": "Query parsing completed successfully"}
        # NOTE(review): model_dump() is used in its default mode; confirm every
        # field is JSON-serializable if consumers pass this event to json.dumps.
        yield {"type": "data-response", "content": geo_query.model_dump()}

        # Signal successful completion
        yield {"type": "finish"}

    except Exception as e:
        # Emit an error event before re-raising, unless one was already sent.
        if not error_reported:
            yield {"type": "error", "content": f"Error during parsing: {e}"}
        raise
Parse a natural language location query with streaming reasoning and results.
This method provides real-time feedback during the parsing process by yielding intermediate reasoning steps and the final GeoQuery result. This is useful for providing users with transparency into the LLM's decision-making process and for building responsive UIs.
The stream yields dictionaries with the following event types:
- {"type": "start"} - Stream started
- {"type": "reasoning", "content": str} - Intermediate processing steps
- {"type": "data-response", "content": dict} - Final GeoQuery as JSON
- {"type": "error", "content": str} - Errors encountered during processing
- {"type": "finish"} - Stream completed successfully
Arguments:
- query: Natural language query in any language
Yields:
dict: Stream events with type and optional content fields
Raises:
- ParsingError: If LLM fails to parse query into valid structure
- ValidationError: If parsed query fails business logic validation
- UnknownRelationError: If spatial relation is not registered
- LowConfidenceError: If confidence below threshold (strict mode only)
Examples:
Basic usage with async iteration:
>>> async for event in parser.parse_stream("restaurants near Lake Geneva"):
...     if event["type"] == "reasoning":
...         print(f"Reasoning: {event['content']}")
...     elif event["type"] == "data-response":
...         geo_query = event["content"]
...         print(f"Location: {geo_query['reference_location']['name']}")
...     elif event["type"] == "error":
...         print(f"Error: {event['content']}")

Using in a FastAPI streaming endpoint:
>>> from fastapi.responses import StreamingResponse
>>> @app.get("/stream")
>>> async def stream_endpoint(q: str):
...     async def event_stream():
...         async for event in parser.parse_stream(q):
...             yield f"data: {json.dumps(event)}\n\n"
...     return StreamingResponse(event_stream(), media_type="text/event-stream")
def parse_batch(self, queries: list[str]) -> list[GeoQuery]:
    """
    Parse several queries one after another.

    Each query is passed to :meth:`parse` in input order; the first failure
    propagates and aborts the batch. The implementation is sequential — for
    parallel execution consider the async methods or a ThreadPoolExecutor.

    Args:
        queries: List of natural language queries

    Returns:
        List of GeoQuery objects (same order as input)

    Raises:
        Same exceptions as parse() for any failing query
    """
    results = []
    for text in queries:
        results.append(self.parse(text))
    return results
Parse multiple queries in batch.
Note: This is a simple sequential implementation. For true parallelization, consider using async methods or ThreadPoolExecutor.
Arguments:
- queries: List of natural language queries
Returns:
List of GeoQuery objects (same order as input)
Raises:
- Same exceptions as parse() for any failing query
def get_available_relations(self, category: RelationCategory | None = None) -> list[str]:
    """
    List the spatial relations known to this parser's configuration.

    Delegates to ``self.spatial_config.list_relations``.

    Args:
        category: Optional filter by category ("containment", "buffer", "directional")

    Returns:
        List of relation names
    """
    relation_names = self.spatial_config.list_relations(category=category)
    return relation_names
Get list of available spatial relations.
Arguments:
- category: Optional filter by category ("containment", "buffer", "directional")
Returns:
List of relation names
def describe_relation(self, relation_name: str) -> str:
    """
    Return the description of a spatial relation.

    Looks the relation up via ``self.spatial_config.get_config`` and returns
    the ``description`` attribute of the result.

    Args:
        relation_name: Name of the relation

    Returns:
        Human-readable description

    Raises:
        UnknownRelationError: If relation is not registered
    """
    return self.spatial_config.get_config(relation_name).description
Get description of a spatial relation.
Arguments:
- relation_name: Name of the relation
Returns:
Human-readable description
Raises:
- UnknownRelationError: If relation is not registered
class GeoQuery(BaseModel):
    """
    Root model representing a parsed geographic query.

    This is the main output structure returned by the parser. After field
    validation, a model-level validator checks that ``buffer_config`` is
    present or absent as required by the spatial relation's category.
    """

    query_type: Literal["simple", "compound", "split", "boolean"] = Field(
        default="simple",
        description="Type of query. Phase 1 only supports 'simple'. "
        "Future: 'compound' = multi-step, 'split' = area division, 'boolean' = AND/OR/NOT operations",
    )
    spatial_relation: SpatialRelation = Field(description="Spatial relationship to reference location")
    reference_location: ReferenceLocation | None = Field(
        default=None,
        description="Reference location for the spatial query. "
        "None when the query contains no named geographic location.",
    )
    # NOTE(review): the description mentions auto-generation by
    # enrich_with_defaults(), while the validator below rejects a missing
    # buffer_config for buffer/directional relations — confirm the intended order.
    buffer_config: BufferConfig | None = Field(
        default=None,
        description="Buffer configuration for buffer and directional relations. "
        "Auto-generated with defaults by enrich_with_defaults() if not provided. "
        "Required for 'near', 'around', 'north_of', etc. "
        "Set to None for containment relations ('in').",
    )
    confidence_breakdown: ConfidenceScore = Field(description="Confidence scores for different aspects of the parse")
    original_query: str = Field(
        default="",
        description="Original query text exactly as provided by the user",
    )

    @model_validator(mode="after")
    def validate_buffer_config_consistency(self) -> "GeoQuery":
        """
        Check that buffer_config matches the relation category.

        Returns:
            The model instance itself when the combination is consistent.

        Raises:
            ValueError: If a buffer/directional relation has no buffer_config, or
                a containment/clipping relation has one.
        """
        category = self.spatial_relation.category
        relation = self.spatial_relation.relation
        has_buffer = self.buffer_config is not None

        # Buffer and directional relations must have buffer_config
        if category in ("buffer", "directional") and not has_buffer:
            raise ValueError(f"{category} relation '{relation}' requires buffer_config")

        # Containment and clipping relations should not have buffer_config
        if category in ("containment", "clipping") and has_buffer:
            raise ValueError(f"{category} relation '{relation}' should not have buffer_config")

        return self
Root model representing a parsed geographic query. This is the main output structure returned by the parser.
Type of query. Phase 1 only supports 'simple'. Future: 'compound' = multi-step, 'split' = area division, 'boolean' = AND/OR/NOT operations
Reference location for the spatial query. None when the query contains no named geographic location.
Buffer configuration for buffer and directional relations. Auto-generated with defaults by enrich_with_defaults() if not provided. Required for 'near', 'around', 'north_of', etc. Set to None for containment relations ('in').
@model_validator(mode="after")
def validate_buffer_config_consistency(self) -> "GeoQuery":
    """
    Check that the presence of ``buffer_config`` matches the relation category.

    Returns:
        The validated model itself.

    Raises:
        ValueError: If a buffer/directional relation has no buffer_config, or a
            containment/clipping relation carries one.
    """
    relation = self.spatial_relation
    has_buffer = self.buffer_config is not None

    # Buffer and directional relations cannot be evaluated without a buffer.
    if not has_buffer and relation.category in ("buffer", "directional"):
        raise ValueError(f"{relation.category} relation '{relation.relation}' requires buffer_config")

    # Containment and clipping relations work on the reference geometry as-is.
    if has_buffer and relation.category in ("containment", "clipping"):
        raise ValueError(f"{relation.category} relation '{relation.relation}' should not have buffer_config")

    return self
Validate buffer_config consistency with relation category.
# Model for the spatial relation part of a parsed query.
# The field descriptions below are phrased as instructions to the LLM
# (presumably surfaced through the model schema — confirm), so treat them as
# runtime text rather than as comments.
class SpatialRelation(BaseModel):
    """A spatial relationship between target and reference."""

    # Relation keyword, e.g. 'in' or 'near'; free-form string, not an enum.
    relation: str = Field(
        description="Spatial relation keyword. Examples: 'in', 'near', 'around', 'north_of', "
        "'on_shores_of', 'in_the_heart_of', etc. Use the exact relation name from the available list."
    )
    # One of the RelationCategory values: containment, buffer, directional, clipping.
    category: RelationCategory = Field(
        description="Category of spatial relation. "
        "'containment' = exact boundary matching (in), "
        "'buffer' = proximity or erosion operations (near, around, on_shores_of, in_the_heart_of, bordering), "
        "'directional' = sector-based queries (north_of, south_of, east_of, west_of), "
        "'clipping' = clip reference to a directional half (northern_part_of, southern_part_of, etc.)"
    )
    # Distance in meters, only when the user stated one; None otherwise.
    explicit_distance: float | None = Field(
        None,
        description="Distance in meters if explicitly mentioned by user. "
        "For example: 'within 5km' → 5000, 'within 500 meters' → 500. "
        "Leave null if not explicitly stated.",
    )
A spatial relationship between target and reference.
Spatial relation keyword. Examples: 'in', 'near', 'around', 'north_of', 'on_shores_of', 'in_the_heart_of', etc. Use the exact relation name from the available list.
Category of spatial relation. 'containment' = exact boundary matching (in), 'buffer' = proximity or erosion operations (near, around, on_shores_of, in_the_heart_of, bordering), 'directional' = sector-based queries (north_of, south_of, east_of, west_of), 'clipping' = clip reference to a directional half (northern_part_of, southern_part_of, etc.)
# Model for the named place a query refers to.
# The field descriptions are phrased as instructions to the LLM (presumably
# surfaced through the model schema — confirm), so treat them as runtime text.
class ReferenceLocation(BaseModel):
    """A geographic reference location extracted from the query."""

    # Place name as written in the query.
    name: str = Field(description="Location name as mentioned in the query (e.g., 'Lausanne', 'Lake Geneva')")
    # FIXME: enum ?
    # Optional feature-type hint; per its description it ranks results and does not filter them.
    type: str | None = Field(
        None,
        description="Type hint for geographic feature (city, lake, mountain, canton, country, "
        "train_station, airport, river, road, etc.). This is a HINT for ranking results, "
        "NOT a strict filter. For ambiguous cases (e.g., 'Bern' could be city or canton, "
        "'Rhone' could be river or road), provide your best guess or leave null. "
        "The datasource will return multiple types ranked by relevance.",
    )
    # Optional confidence for the type hint above, expressed as a ConfidenceLevel.
    type_confidence: ConfidenceLevel | None = Field(
        None,
        description="Confidence in the type inference (0-1). High confidence (>0.8) when type is "
        "explicit in query (e.g., 'Lake Geneva'). Low confidence (<0.6) when ambiguous "
        "(e.g., 'Bern', 'Rhone'). Use spatial relation as hint: 'along X' → river/road, "
        "'in X' → city/region, 'on X' → lake/mountain.",
    )
A geographic reference location extracted from the query.
Location name as mentioned in the query (e.g., 'Lausanne', 'Lake Geneva')
Type hint for geographic feature (city, lake, mountain, canton, country, train_station, airport, river, road, etc.). This is a HINT for ranking results, NOT a strict filter. For ambiguous cases (e.g., 'Bern' could be city or canton, 'Rhone' could be river or road), provide your best guess or leave null. The datasource will return multiple types ranked by relevance.
Confidence in the type inference (0-1). High confidence (>0.8) when type is explicit in query (e.g., 'Lake Geneva'). Low confidence (<0.6) when ambiguous (e.g., 'Bern', 'Rhone'). Use spatial relation as hint: 'along X' → river/road, 'in X' → city/region, 'on X' → lake/mountain.
class BufferConfig(BaseModel):
    """Configuration for buffer-based spatial operations."""

    # NOTE: the class docstring and every Field description are kept verbatim on
    # purpose; pydantic publishes them in the model's JSON schema.

    distance_m: float = Field(
        description="Buffer distance in meters. Positive values expand outward (proximity), "
        "negative values erode inward (e.g., 'in the heart of'). "
        "Examples: 5000 = 5km radius, -500 = 500m erosion"
    )
    buffer_from: Literal["center", "boundary"] = Field(
        description="Buffer origin. 'center' = buffer from centroid point (for proximity), "
        "'boundary' = buffer from polygon boundary (for shores, along roads, erosion)"
    )
    ring_only: bool = Field(
        default=False,
        description="If True, exclude the reference feature itself to create a ring/donut shape. "
        "Used for queries like 'on the shores of Lake X' (exclude the lake water itself). "
        "Only valid with buffer_from='boundary'.",
    )
    side: Literal["left", "right"] | None = Field(
        default=None,
        description="Side of a linear feature for one-sided buffer. "
        "'left' = left side relative to line direction, 'right' = right side. "
        "None = both sides (symmetric buffer). Populated from relation config by enrich_with_defaults().",
    )
    inferred: bool = Field(
        default=True,
        description="True if this configuration was inferred from relation defaults. "
        "False if the user explicitly specified distance or buffer parameters.",
    )

    @model_validator(mode="after")
    def validate_ring_only(self) -> "BufferConfig":
        """
        Reject ring buffers that are anchored on the centroid.

        Returns:
            The validated model itself.

        Raises:
            ValueError: If ``ring_only`` is set while ``buffer_from`` is "center".
        """
        anchored_on_center = self.buffer_from == "center"
        if not (self.ring_only and anchored_on_center):
            return self
        raise ValueError("ring_only=True requires buffer_from='boundary' (cannot create ring from center point)")
Configuration for buffer-based spatial operations.
Buffer distance in meters. Positive values expand outward (proximity), negative values erode inward (e.g., 'in the heart of'). Examples: 5000 = 5km radius, -500 = 500m erosion
Buffer origin. 'center' = buffer from centroid point (for proximity), 'boundary' = buffer from polygon boundary (for shores, along roads, erosion)
If True, exclude the reference feature itself to create a ring/donut shape. Used for queries like 'on the shores of Lake X' (exclude the lake water itself). Only valid with buffer_from='boundary'.
Side of a linear feature for one-sided buffer. 'left' = left side relative to line direction, 'right' = right side. None = both sides (symmetric buffer). Populated from relation config by enrich_with_defaults().
True if this configuration was inferred from relation defaults. False if the user explicitly specified distance or buffer parameters.
@model_validator(mode="after")
def validate_ring_only(self) -> "BufferConfig":
    """
    Reject ring buffers that are anchored on the centroid.

    Returns:
        The validated model itself.

    Raises:
        ValueError: If ``ring_only`` is set while ``buffer_from`` is "center".
    """
    anchored_on_center = self.buffer_from == "center"
    if not (self.ring_only and anchored_on_center):
        return self
    raise ValueError("ring_only=True requires buffer_from='boundary' (cannot create ring from center point)")
Validate that ring_only is only used with boundary buffers.
# Model holding the confidence values reported for a parse.
# The field descriptions are phrased as instructions to the LLM (presumably
# surfaced through the model schema — confirm), so treat them as runtime text.
class ConfidenceScore(BaseModel):
    """Confidence scores for different aspects of the parsed query."""

    # Confidence for the parse as a whole.
    overall: ConfidenceLevel = Field(
        description="Overall confidence score for the entire query parse. "
        "0.9-1.0 = highly confident, 0.7-0.9 = confident, 0.5-0.7 = uncertain, <0.5 = very uncertain",
    )
    # Confidence that the reference location was identified correctly.
    location_confidence: ConfidenceLevel = Field(
        description="Confidence in correctly identifying the reference location",
    )
    # Confidence that the spatial relation was identified correctly.
    relation_confidence: ConfidenceLevel = Field(
        description="Confidence in correctly identifying the spatial relation",
    )
    # Optional free-text justification for the scores above.
    reasoning: str | None = Field(
        None,
        description="Explanation for confidence scores. Always include reasoning for clarity and debugging. "
        "For example: 'Ambiguous location name', 'Unclear spatial relationship', 'High confidence in location matching', etc.",
    )
Confidence scores for different aspects of the parsed query.
Overall confidence score for the entire query parse. 0.9-1.0 = highly confident, 0.7-0.9 = confident, 0.5-0.7 = uncertain, <0.5 = very uncertain
Confidence in correctly identifying the reference location
class SpatialRelationConfig:
    """
    Registry of the spatial relations the parser understands.

    Holds the built-in relations plus any custom ones registered afterwards,
    each with its default parameters, keyed by relation name.
    """

    def __init__(self):
        """Create the registry and fill it with the built-in relations."""
        self.relations: dict[str, RelationConfig] = {}
        self._initialize_defaults()

    def _initialize_defaults(self):
        """Register built-in spatial relations from ARCHITECTURE.md."""
        add = self.register_relation

        # ----- containment -----
        add(
            RelationConfig(
                name="in",
                category="containment",
                description="Feature is within the reference boundary",
            )
        )

        # ----- buffer / proximity -----
        add(
            RelationConfig(
                name="around",
                category="buffer",
                description="Proximity search around a point with default 1km radius",
                default_distance_m=1000,
                buffer_from="center",
            )
        )
        add(
            RelationConfig(
                name="near",
                category="buffer",
                description="Proximity search with default 5km radius",
                default_distance_m=5000,
                buffer_from="center",
            )
        )
        add(
            RelationConfig(
                name="on_shores_of",
                category="buffer",
                description="Ring buffer around lake/water boundary, excluding the water body itself",
                default_distance_m=1000,
                buffer_from="boundary",
                ring_only=True,
            )
        )
        add(
            RelationConfig(
                name="along",
                category="buffer",
                description="Buffer following a linear feature like a river or road",
                default_distance_m=500,
                buffer_from="boundary",
            )
        )
        add(
            RelationConfig(
                name="left_bank",
                category="buffer",
                description="Left bank of a linear feature (river, road) relative to its direction/flow",
                default_distance_m=500,
                buffer_from="boundary",
                side="left",
            )
        )
        add(
            RelationConfig(
                name="right_bank",
                category="buffer",
                description="Right bank of a linear feature (river, road) relative to its direction/flow",
                default_distance_m=500,
                buffer_from="boundary",
                side="right",
            )
        )
        add(
            RelationConfig(
                name="in_the_heart_of",
                category="buffer",
                description="Central area excluding periphery (negative buffer - erosion)",
                default_distance_m=-500,
                buffer_from="boundary",
            )
        )
        add(
            RelationConfig(
                name="bordering",
                category="buffer",
                description="Thin ring just outside the reference boundary, for land-border adjacency queries (e.g. 'cities bordering Germany')",
                default_distance_m=2000,
                buffer_from="boundary",
                ring_only=True,
            )
        )

        # ----- clipping -----
        # Clip the reference geometry to a directional half using a bbox intersection.
        # These answer "what is in the northern/southern/... portion of X?", unlike
        # the directional relations, which answer "what is north/south/... of X?".
        clip_halves = (("north", "upper"), ("south", "lower"), ("east", "right"), ("west", "left"))
        for direction, half in clip_halves:
            add(
                RelationConfig(
                    name=f"{direction}ern_part_of",
                    category="clipping",
                    description=f"{direction.capitalize()}ern half of the reference geometry (bbox clip to {half} half)",
                    clip_direction=direction,
                )
            )

        # ----- directional (cardinal first, then diagonal) -----
        # Shared defaults: 10km radius and a 90° sector. The origin is the centroid of
        # the reference location (buffer_from="center" is set in enrich_with_defaults).
        # Convention: 0° = North, angles increase clockwise (90° = East, 180° = South,
        # 270° = West).
        bearings = (
            ("north", 0),
            ("south", 180),
            ("east", 90),
            ("west", 270),
            ("northeast", 45),
            ("southeast", 135),
            ("southwest", 225),
            ("northwest", 315),
        )
        for direction, bearing in bearings:
            add(
                RelationConfig(
                    name=f"{direction}_of",
                    category="directional",
                    description=f"Directional sector {direction} of reference",
                    default_distance_m=10000,
                    sector_angle_degrees=90,
                    direction_angle_degrees=bearing,
                )
            )

    def register_relation(self, config: RelationConfig) -> None:
        """Store ``config`` under its name, replacing any relation already registered with that name."""
        self.relations[config.name] = config

    def has_relation(self, name: str) -> bool:
        """Return True when a relation called ``name`` is registered."""
        return name in self.relations

    def get_config(self, name: str) -> RelationConfig:
        """
        Look up the configuration registered under ``name``.

        Raises:
            UnknownRelationError: If no relation with that name is registered.
        """
        if self.has_relation(name):
            return self.relations[name]

        known = ", ".join(sorted(self.relations.keys()))
        raise UnknownRelationError(
            f"Unknown spatial relation: '{name}'. Available relations: {known}",
            relation_name=name,
        )

    def list_relations(self, category: RelationCategory | None = None) -> list[str]:
        """Return registered relation names in sorted order, optionally restricted to one category."""
        if category is not None:
            return sorted(cfg.name for cfg in self.relations.values() if cfg.category == category)
        return sorted(self.relations.keys())

    def format_for_prompt(self) -> str:
        """Render the registered relations, grouped by category, as text for the LLM prompt."""
        out = []

        for category in get_args(RelationCategory):
            members = sorted(
                (cfg for cfg in self.relations.values() if cfg.category == category),
                key=lambda cfg: cfg.name,
            )
            if not members:
                continue  # no heading for a category without relations

            out.append(f"\n{category.upper()} RELATIONS:")

            for cfg in members:
                # Default distance; a negative value is reported as erosion.
                distance = cfg.default_distance_m
                if distance is None:
                    dist_info = ""
                elif distance < 0:
                    dist_info = f" (default: {abs(distance)}m erosion)"
                else:
                    dist_info = f" (default: {abs(distance)}m)"

                # Optional flags, in fixed order: ring, origin, side.
                candidates = (
                    (cfg.ring_only, "ring buffer"),
                    (cfg.buffer_from, f"from {cfg.buffer_from}"),
                    (cfg.side, f"{cfg.side} side only"),
                )
                flags = [label for enabled, label in candidates if enabled]
                flag_info = f" [{', '.join(flags)}]" if flags else ""

                out.append(f" • {cfg.name}{dist_info}{flag_info}")
                out.append(f" {cfg.description}")

        out.extend(
            [
                "\nNOTES:",
                " • Negative distances indicate erosion/shrinking (e.g., in_the_heart_of)",
                " • Ring buffers exclude the reference feature itself (e.g., shores of lake, bordering)",
                " • Buffer from 'center' vs 'boundary' determines buffer origin",
                " • Clipping relations return a sub-area of the reference geometry (not a buffer outward)",
            ]
        )

        return "\n".join(out)
Registry and configuration for spatial relations.
Manages built-in and custom spatial relations with their default parameters.
def __init__(self):
    """
    Initialize with built-in spatial relations.

    Creates the empty name -> RelationConfig mapping and then delegates to
    ``_initialize_defaults()`` to populate it.
    """
    # The mapping must exist before _initialize_defaults() runs.
    self.relations: dict[str, RelationConfig] = {}
    self._initialize_defaults()
Initialize with built-in spatial relations.
def register_relation(self, config: RelationConfig) -> None:
    """
    Register a new spatial relation.

    Args:
        config: Relation to store; it is keyed by ``config.name``.
    """
    # Plain dict assignment: an existing entry with the same name is replaced.
    self.relations[config.name] = config
Register a new spatial relation.
def has_relation(self, name: str) -> bool:
    """
    Check if a relation is registered.

    Args:
        name: Relation name to look for (exact key match).

    Returns:
        True if ``name`` is a key of ``self.relations``, False otherwise.
    """
    return name in self.relations
Check if a relation is registered.
def get_config(self, name: str) -> RelationConfig:
    """
    Look up the configuration registered under ``name``.

    Args:
        name: Relation name to resolve.

    Returns:
        The RelationConfig stored for ``name``.

    Raises:
        UnknownRelationError: If no relation with that name is registered; the
            message lists the registered names in sorted order.
    """
    if self.has_relation(name):
        return self.relations[name]

    known = ", ".join(sorted(self.relations.keys()))
    raise UnknownRelationError(
        f"Unknown spatial relation: '{name}'. Available relations: {known}",
        relation_name=name,
    )
Get configuration for a relation. Raises UnknownRelationError if not found.
def list_relations(self, category: RelationCategory | None = None) -> list[str]:
    """
    Return registered relation names in sorted order.

    Args:
        category: When given, only relations of this category are listed.

    Returns:
        Sorted list of relation names.
    """
    if category is not None:
        return sorted(cfg.name for cfg in self.relations.values() if cfg.category == category)
    return sorted(self.relations.keys())
List available relation names.
def format_for_prompt(self) -> str:
    """
    Render the registered relations as text for the LLM prompt.

    Relations are grouped under one heading per category (in the order given
    by ``RelationCategory``), sorted by name within a group, and followed by a
    fixed NOTES section.

    Returns:
        The prompt fragment as a single newline-joined string.
    """
    out = []

    for category in get_args(RelationCategory):
        members = sorted(
            (cfg for cfg in self.relations.values() if cfg.category == category),
            key=lambda cfg: cfg.name,
        )
        if not members:
            continue  # no heading for a category without relations

        out.append(f"\n{category.upper()} RELATIONS:")

        for cfg in members:
            # Default distance; a negative value is reported as erosion.
            distance = cfg.default_distance_m
            if distance is None:
                dist_info = ""
            elif distance < 0:
                dist_info = f" (default: {abs(distance)}m erosion)"
            else:
                dist_info = f" (default: {abs(distance)}m)"

            # Optional flags, in fixed order: ring, origin, side.
            candidates = (
                (cfg.ring_only, "ring buffer"),
                (cfg.buffer_from, f"from {cfg.buffer_from}"),
                (cfg.side, f"{cfg.side} side only"),
            )
            flags = [label for enabled, label in candidates if enabled]
            flag_info = f" [{', '.join(flags)}]" if flags else ""

            out.append(f" • {cfg.name}{dist_info}{flag_info}")
            out.append(f" {cfg.description}")

    out.extend(
        [
            "\nNOTES:",
            " • Negative distances indicate erosion/shrinking (e.g., in_the_heart_of)",
            " • Ring buffers exclude the reference feature itself (e.g., shores of lake, bordering)",
            " • Buffer from 'center' vs 'boundary' determines buffer origin",
            " • Clipping relations return a sub-area of the reference geometry (not a buffer outward)",
        ]
    )

    return "\n".join(out)
Format relations for inclusion in LLM prompt.
@dataclass
class RelationConfig:
    """
    Configuration for a single spatial relation.

    Only ``name``, ``category`` and ``description`` are required; every other
    field defaults to "not set" (None, or False for ``ring_only``).

    Attributes:
        name: Relation identifier (e.g., "in", "near", "north_of")
        category: Type of spatial operation
        description: Human-readable description for LLM prompt
        default_distance_m: Default buffer distance in meters
        buffer_from: Buffer origin ("center" or "boundary")
        ring_only: Exclude reference feature to create ring buffer
        side: Side of a linear feature for a one-sided buffer ("left" or "right")
        sector_angle_degrees: Angular sector for directional queries
        direction_angle_degrees: Direction angle in degrees (0=North, 90=East, 180=South, 270=West, clockwise)
        clip_direction: Half of the reference geometry kept by a clipping relation
            ("north", "south", "east" or "west")
    """

    name: str
    category: RelationCategory
    description: str
    default_distance_m: float | None = None
    buffer_from: Literal["center", "boundary"] | None = None
    ring_only: bool = False
    side: Literal["left", "right"] | None = None
    sector_angle_degrees: float | None = None
    direction_angle_degrees: float | None = None
    clip_direction: Literal["north", "south", "east", "west"] | None = None
Configuration for a single spatial relation.
Attributes:
- name: Relation identifier (e.g., "in", "near", "north_of")
- category: Type of spatial operation
- description: Human-readable description for LLM prompt
- default_distance_m: Default buffer distance in meters
- buffer_from: Buffer origin
- ring_only: Exclude reference feature to create ring buffer
- sector_angle_degrees: Angular sector for directional queries
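- direction_angle_degrees: Direction angle in degrees (0=North, 90=East, 180=South, 270=West, clockwise)
- side: Side of a linear feature for a one-sided buffer ("left" or "right")
- clip_direction: Half of the reference geometry kept by a clipping relation ("north", "south", "east" or "west")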
Base exception for all GeoFilter errors.
class ParsingError(GeoFilterError):
    """
    Raised when the LLM output cannot be parsed into a valid structure.

    Attributes:
        raw_response: Raw response from the LLM ("" when not supplied).
        original_error: Exception that caused the parsing failure, if any.
    """

    def __init__(self, message: str, raw_response: str = "", original_error: Exception | None = None):
        """
        Build the error and keep the diagnostic context on the instance.

        Args:
            message: Error description
            raw_response: Raw response from LLM
            original_error: Original exception that caused parsing failure
        """
        self.raw_response, self.original_error = raw_response, original_error
        super().__init__(message)
LLM failed to parse query into valid structure.
def __init__(self, message: str, raw_response: str = "", original_error: Exception | None = None):
    """
    Build the error and keep the diagnostic context on the instance.

    Args:
        message: Error description
        raw_response: Raw response from LLM
        original_error: Original exception that caused parsing failure
    """
    self.raw_response, self.original_error = raw_response, original_error
    super().__init__(message)
Initialize parsing error.
Arguments:
- message: Error description
- raw_response: Raw response from LLM
- original_error: Original exception that caused parsing failure
class ValidationError(GeoFilterError):
    """
    Raised when structured output is well-formed but fails business-logic validation.

    Attributes:
        field: Name of the field that failed validation, if known.
        detail: Additional detail about the failure, if any.
    """

    def __init__(self, message: str, field: str | None = None, detail: str | None = None):
        """
        Build the error and record which field failed.

        Args:
            message: Error description
            field: Field name that failed validation
            detail: Additional detail about the validation failure
        """
        self.field, self.detail = field, detail
        super().__init__(message)
Structured output is valid but fails business logic validation.
def __init__(self, message: str, field: str | None = None, detail: str | None = None):
    """
    Build the error and record which field failed.

    Args:
        message: Error description
        field: Field name that failed validation
        detail: Additional detail about the validation failure
    """
    self.field, self.detail = field, detail
    super().__init__(message)
Initialize validation error.
Arguments:
- message: Error description
- field: Field name that failed validation
- detail: Additional detail about the validation failure
class NoReferenceLocationError(ValidationError):
    """
    Query contains no named geographic reference location.

    A ValidationError whose ``field`` is always "reference_location".
    """

    def __init__(self, message: str):
        """
        Initialize the error.

        Args:
            message: Error description
        """
        # The failing field is fixed for this error type, so callers pass only the message.
        super().__init__(message, field="reference_location")
Query contains no named geographic reference location.
class UnknownRelationError(ValidationError):
    """
    Spatial relation is not registered in configuration.

    A ValidationError whose ``field`` is always "spatial_relation".

    Attributes:
        relation_name: The relation name that could not be resolved.
    """

    def __init__(self, message: str, relation_name: str):
        """
        Initialize unknown relation error.

        Args:
            message: Error description
            relation_name: The unknown relation name
        """
        self.relation_name = relation_name
        super().__init__(message, field="spatial_relation")

    def __reduce__(self):
        """
        Support ``pickle`` and ``copy`` round trips.

        The default exception reduction rebuilds the object from ``self.args``,
        which holds only the message; that call fails here because
        ``relation_name`` is a required constructor argument. Supplying both
        arguments explicitly makes the error reconstructable; the instance
        dict is passed as state so other attributes are restored as well.
        """
        message = self.args[0] if self.args else ""
        return (type(self), (message, self.relation_name), dict(vars(self)))
Spatial relation is not registered in configuration.
def __init__(self, message: str, relation_name: str):
    """
    Initialize unknown relation error.

    Args:
        message: Error description
        relation_name: The unknown relation name
    """
    # Kept on the instance so handlers can read the offending name directly.
    self.relation_name = relation_name
    # The failing field is fixed for this error type.
    super().__init__(message, field="spatial_relation")
Initialize unknown relation error.
Arguments:
- message: Error description
- relation_name: The unknown relation name
class LowConfidenceError(GeoFilterError):
    """
    Query confidence is below threshold (strict mode).

    Attributes:
        confidence: Confidence score (0-1) that triggered the error.
        reasoning: Optional explanation for the low confidence.
    """

    def __init__(self, message: str, confidence: float, reasoning: str | None = None):
        """
        Initialize low confidence error.

        Args:
            message: Error description
            confidence: Confidence score (0-1)
            reasoning: Optional explanation for low confidence
        """
        self.confidence = confidence
        self.reasoning = reasoning
        super().__init__(message)

    def __reduce__(self):
        """
        Support ``pickle`` and ``copy`` round trips.

        The default exception reduction rebuilds the object from ``self.args``,
        which holds only the message; that call fails here because
        ``confidence`` is a required constructor argument. Supplying all
        constructor arguments explicitly makes the error reconstructable; the
        instance dict is passed as state so other attributes are restored too.
        """
        message = self.args[0] if self.args else ""
        return (type(self), (message, self.confidence, self.reasoning), dict(vars(self)))
Query confidence is below threshold (strict mode).
def __init__(self, message: str, confidence: float, reasoning: str | None = None):
    """
    Build the error and keep the score that triggered it.

    Args:
        message: Error description
        confidence: Confidence score (0-1)
        reasoning: Optional explanation for low confidence
    """
    self.confidence, self.reasoning = confidence, reasoning
    super().__init__(message)
Initialize low confidence error.
Arguments:
- message: Error description
- confidence: Confidence score (0-1)
- reasoning: Optional explanation for low confidence
class LowConfidenceWarning(UserWarning):
    """
    Query confidence is below threshold (permissive mode).

    Attributes:
        confidence: Confidence score (0-1) that triggered the warning.
    """

    def __init__(self, confidence: float, message: str = ""):
        """
        Initialize low confidence warning.

        Args:
            confidence: Confidence score (0-1)
            message: Warning message
        """
        self.confidence = confidence
        super().__init__(message)

    def __reduce__(self):
        """
        Support ``pickle`` and ``copy`` round trips.

        The default exception reduction rebuilds the object from ``self.args``,
        which holds only the message. Because ``confidence`` is the first
        constructor parameter, that call would put the message in the
        confidence slot and reset the message to "". Supplying both arguments
        in constructor order keeps the copy identical to the original.
        """
        message = self.args[0] if self.args else ""
        return (type(self), (self.confidence, message), dict(vars(self)))
Query confidence is below threshold (permissive mode).
def __init__(self, confidence: float, message: str = ""):
    """
    Initialize low confidence warning.

    Note that, unlike the exception classes, ``confidence`` comes first and
    ``message`` is optional.

    Args:
        confidence: Confidence score (0-1)
        message: Warning message
    """
    # Kept on the instance so warning filters/handlers can inspect the score.
    self.confidence = confidence
    super().__init__(message)
Initialize low confidence warning.
Arguments:
- confidence: Confidence score (0-1)
- message: Warning message
class GeoDataSource(Protocol):
    """
    Protocol for geographic data sources.

    Implementations resolve location names to geographic features.
    Features are returned as standard GeoJSON Feature objects (dicts) in WGS84 (EPSG:4326).

    Being a Protocol, this class only declares the method signatures a data
    source has to provide; the method bodies here are placeholders.

    Example of returned feature:
        {
            "type": "Feature",
            "id": "uuid-123",
            "geometry": {"type": "Point", "coordinates": [8.5, 47.3]},
            "bbox": [8.4, 47.3, 8.6, 47.4],
            "properties": {
                "name": "Zürich",
                "type": "city",
                "confidence": 1.0,
                ...
            }
        }
    """

    def search(
        self,
        name: str,
        type: str | None = None,
        max_results: int = 10,
    ) -> list[Feature]:
        """
        Search for geographic features by name.

        Args:
            name: Location name to search for (e.g., "Lake Geneva", "Bern").
            type: Optional type hint for filtering/ranking results.
                Examples: "lake", "city", "mountain", "canton", "river".
                When provided, matching types are ranked higher.
            max_results: Maximum number of results to return.

        Returns:
            List of matching GeoJSON Feature dicts, ranked by relevance.
            Returns empty list if no matches found.
        """
        ...

    def get_by_id(self, feature_id: str) -> Feature | None:
        """
        Get a specific feature by its unique identifier.

        Args:
            feature_id: Unique identifier from the data source.

        Returns:
            The matching GeoJSON Feature dict, or None if not found.
        """
        ...

    def get_available_types(self) -> list[str]:
        """
        Get list of concrete geographic types this datasource can return.

        Returns a list of concrete type values (e.g., "lake", "city", "restaurant")
        that this datasource uses in the "type" property of returned features.
        These types can be matched against the location type hierarchy for fuzzy matching.

        The returned types should be a subset of or mapped to the standard location
        type hierarchy defined in location_types.TYPE_HIERARCHY.

        Returns:
            List of concrete type strings (e.g., ["lake", "river", "city", "mountain"]).
            Empty list if this datasource does not provide type information.

        Example:
            >>> source = SwissNames3DSource("data/")
            >>> types = source.get_available_types()
            >>> print(types)
            ['lake', 'river', 'city', 'mountain', 'peak', 'hill', ...]
        """
        ...
Protocol for geographic data sources.
Implementations resolve location names to geographic features. Features are returned as standard GeoJSON Feature objects (dicts) in WGS84 (EPSG:4326).
Example of returned feature:
{ "type": "Feature", "id": "uuid-123", "geometry": {"type": "Point", "coordinates": [8.5, 47.3]}, "bbox": [8.4, 47.3, 8.6, 47.4], "properties": { "name": "Zürich", "type": "city", "confidence": 1.0, ... } }
def _no_init_or_replace_init(self, *args, **kwargs):
    """
    Placeholder ``__init__`` that either refuses instantiation or installs the real one.

    For a class flagged ``_is_protocol`` it raises ``TypeError``. Otherwise, if
    the class still has this function as its ``__init__``, it searches the MRO
    for the first ``__init__`` that is not this function, installs it on the
    class, and calls it with the arguments it received.

    Raises:
        TypeError: If the class of ``self`` is itself a protocol.
    """
    cls = type(self)

    if cls._is_protocol:
        raise TypeError('Protocols cannot be instantiated')

    # Already using a custom `__init__`. No need to calculate correct
    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
    if cls.__init__ is not _no_init_or_replace_init:
        return

    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
    # searches for a proper new `__init__` in the MRO. The new `__init__`
    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
    # instantiation of the protocol subclass will thus use the new
    # `__init__` and no longer call `_no_init_or_replace_init`.
    for base in cls.__mro__:
        # Look only at what each base defines itself, not at inherited attributes.
        init = base.__dict__.get('__init__', _no_init_or_replace_init)
        if init is not _no_init_or_replace_init:
            cls.__init__ = init
            break
    else:
        # should not happen
        cls.__init__ = object.__init__

    # Finish the current instantiation with the `__init__` that was just installed.
    cls.__init__(self, *args, **kwargs)
def search(
    self,
    name: str,
    type: str | None = None,
    max_results: int = 10,
) -> list[Feature]:
    """
    Search for geographic features by name.

    Protocol method: the body is a placeholder, implementations supply the lookup.

    Args:
        name: Location name to search for (e.g., "Lake Geneva", "Bern").
        type: Optional type hint for filtering/ranking results.
            Examples: "lake", "city", "mountain", "canton", "river".
            When provided, matching types are ranked higher.
        max_results: Maximum number of results to return.

    Returns:
        List of matching GeoJSON Feature dicts, ranked by relevance.
        Returns empty list if no matches found.
    """
    ...
Search for geographic features by name.
Arguments:
- name: Location name to search for (e.g., "Lake Geneva", "Bern").
- type: Optional type hint for filtering/ranking results. Examples: "lake", "city", "mountain", "canton", "river". When provided, matching types are ranked higher.
- max_results: Maximum number of results to return.
Returns:
List of matching GeoJSON Feature dicts, ranked by relevance. Returns an empty list if no matches are found.
def get_by_id(self, feature_id: str) -> Feature | None:
    """
    Get a specific feature by its unique identifier.

    This is an interface stub: the body is ``...`` and implementations
    provide the actual lookup.

    Args:
        feature_id: Unique identifier from the data source.

    Returns:
        The matching GeoJSON Feature dict, or None if not found.
    """
    ...
Get a specific feature by its unique identifier.
Arguments:
- feature_id: Unique identifier from the data source.
Returns:
The matching GeoJSON Feature dict, or None if not found.
def get_available_types(self) -> list[str]:
    """
    Get list of concrete geographic types this datasource can return.

    Returns a list of concrete type values (e.g., "lake", "city", "restaurant")
    that this datasource uses in the "type" property of returned features.
    These types can be matched against the location type hierarchy for fuzzy matching.

    The returned types should be a subset of or mapped to the standard location
    type hierarchy defined in location_types.TYPE_HIERARCHY.

    This is an interface stub: the body is ``...``.

    Returns:
        List of concrete type strings (e.g., ["lake", "river", "city", "mountain"]).
        Empty list if this datasource does not provide type information.

    Example:
        >>> source = SwissNames3DSource("data/")
        >>> types = source.get_available_types()
        >>> print(types)
        ['lake', 'river', 'city', 'mountain', 'peak', 'hill', ...]
    """
    ...
Get list of concrete geographic types this datasource can return.
Returns a list of concrete type values (e.g., "lake", "city", "restaurant") that this datasource uses in the "type" property of returned features. These types can be matched against the location type hierarchy for fuzzy matching.
The returned types should be a subset of or mapped to the standard location type hierarchy defined in location_types.TYPE_HIERARCHY.
Returns:
List of concrete type strings (e.g., ["lake", "river", "city", "mountain"]). Empty list if this datasource does not provide type information.
Example:
>>> source = SwissNames3DSource("data/") >>> types = source.get_available_types() >>> print(types) ['lake', 'river', 'city', 'mountain', 'peak', 'hill', ...]
class SwissNames3DSource:
    """
    Geographic data source backed by swisstopo's swissNAMES3D dataset.

    Loads geographic names from a Shapefile, GeoPackage, or ESRI File Geodatabase
    and provides search by name with optional type filtering.

    If data_path is a directory, automatically loads and concatenates all SwissNames3D
    shapefiles (swissNAMES3D_PKT, swissNAMES3D_LIN, swissNAMES3D_PLY) found within.

    Data is loaded lazily on first use, or eagerly via ``preload()``.
    All geometries are returned as GeoJSON in WGS84 (EPSG:4326).

    Args:
        data_path: Path to SwissNames3D data file or directory containing SwissNames3D shapefiles.
        layer: Layer name within the data source (for multi-layer formats like GDB).

    Example:
        >>> source = SwissNames3DSource("data/")  # Load all 3 geometry types
        >>> results = source.search("Lac Léman", type="lake")
        >>> print(results[0]["geometry"])  # GeoJSON in WGS84
    """

    def __init__(self, data_path: str | Path, layer: str | None = None) -> None:
        self._data_path = Path(data_path)
        self._layer = layer
        # Populated by _load_data(); None means "not loaded yet".
        self._gdf: gpd.GeoDataFrame | None = None
        # normalized name -> positional row indices
        self._name_index: dict[str, list[int]] = {}
        # token -> normalized names containing that token
        self._token_index: dict[str, set[str]] = {}
        self._name_col: str = ""
        self._type_col: str | None = None
        self._id_col: str | None = None
        self._extra_cols: list[str] = []

    def preload(self) -> None:
        """Eagerly load data. Call at startup to avoid first-query latency."""
        self._ensure_loaded()

    def _ensure_loaded(self) -> None:
        """Load data lazily on first access."""
        if self._gdf is not None:
            return
        self._load_data()

    def _load_data(self) -> None:
        """Load SwissNames3D data and build the name index.

        If any step fails, the partially loaded state is discarded before the
        exception propagates, so a later call retries the load instead of
        serving an unindexed frame.
        """
        try:
            if self._data_path.is_dir():
                self._load_from_directory()
            else:
                kwargs: dict[str, Any] = {}
                if self._layer is not None:
                    kwargs["layer"] = self._layer
                self._gdf = gpd.read_file(str(self._data_path), **kwargs)

            assert self._gdf is not None

            # Drop Z coordinates once — vectorized; the source has LN02 height and
            # single_sided buffers reject 3D geometries
            self._gdf.geometry = force_2d(self._gdf.geometry.values)

            # Reproject to WGS84 once — avoids per-query coordinate transform
            self._gdf = self._gdf.to_crs("EPSG:4326")

            # Cache column names once — reused on every _row_to_feature() call
            self._name_col = self._detect_name_column()
            self._type_col = self._detect_type_column()
            self._id_col = self._detect_id_column()
            skip = {self._name_col, "geometry"}
            if self._type_col:
                skip.add(self._type_col)
            if self._id_col:
                skip.add(self._id_col)
            self._extra_cols = [c for c in self._gdf.columns if c not in skip]

            self._build_name_index()
        except Exception:
            # _ensure_loaded() keys off _gdf; leaving it set here would make
            # every later search run against empty indexes.
            self._gdf = None
            self._name_index = {}
            self._token_index = {}
            raise

    def _load_from_directory(self) -> None:
        """Load and concatenate all SwissNames3D shapefiles from a directory.

        Raises:
            ValueError: If none of the expected shapefiles exist in the directory.
        """
        # Look for the 3 standard SwissNames3D shapefiles
        shapefile_names = ["swissNAMES3D_PKT", "swissNAMES3D_LIN", "swissNAMES3D_PLY"]
        gdfs: list[gpd.GeoDataFrame] = []

        for name in shapefile_names:
            shp_path = self._data_path / f"{name}.shp"
            if shp_path.exists():
                gdfs.append(gpd.read_file(str(shp_path)))

        if not gdfs:
            raise ValueError(
                f"No SwissNames3D shapefiles found in {self._data_path}. Expected: {', '.join(shapefile_names)}"
            )

        # Find common columns across all loaded GeoDataFrames
        common_cols = set(gdfs[0].columns)
        for gdf in gdfs[1:]:
            common_cols &= set(gdf.columns)

        # Keep only common columns and concatenate
        gdfs_filtered = [gdf[sorted(common_cols)] for gdf in gdfs]
        self._gdf = gpd.GeoDataFrame(pd.concat(gdfs_filtered, ignore_index=True), crs=gdfs[0].crs, geometry="geometry")

    def _build_name_index(self) -> None:
        """Build normalized name → row indices and token → candidate names indexes."""
        assert self._gdf is not None
        self._name_index = {}
        self._token_index = {}

        for idx, name in enumerate(self._gdf[self._name_col]):
            # Skip missing (non-string) and blank names.
            if not isinstance(name, str) or not name.strip():
                continue
            normalized = _normalize_name(name)
            self._name_index.setdefault(normalized, []).append(idx)
            for token in normalized.split():
                self._token_index.setdefault(token, set()).add(normalized)

    def _detect_name_column(self) -> str:
        """Detect the name column in the data.

        Raises:
            ValueError: If no column is named NAME or BEZEICHNUNG (case-insensitive).
        """
        assert self._gdf is not None
        for col in self._gdf.columns:
            if col.upper() in ("NAME", "BEZEICHNUNG"):
                return col
        raise ValueError(f"Cannot find name column in data. Available columns: {list(self._gdf.columns)}")

    def _detect_type_column(self) -> str | None:
        """Detect the feature type column (OBJEKTART, case-insensitive), if any."""
        assert self._gdf is not None
        for col in self._gdf.columns:
            if col.upper() == "OBJEKTART":
                return col
        return None

    def _detect_id_column(self) -> str | None:
        """Detect the unique ID column; candidates are tried in priority order."""
        assert self._gdf is not None
        for candidate in ("UUID", "FID", "OBJECTID", "ID"):
            for col in self._gdf.columns:
                if col.upper() == candidate:
                    return col
        return None

    @staticmethod
    def _is_missing(value: Any) -> bool:
        """Return True for None and scalar NA markers (NaN, NaT, pd.NA)."""
        if value is None:
            return True
        # is_scalar guards pd.isna against array-like cells, for which it
        # would return an array rather than a bool.
        return bool(pd.api.types.is_scalar(value) and pd.isna(value))

    def _row_to_feature(self, idx: int) -> Feature:
        """Convert a GeoDataFrame row to a GeoJSON Feature dict with WGS84 geometry.

        Args:
            idx: Positional row index into the loaded GeoDataFrame.

        Returns:
            A Feature with ``name``, ``type`` and ``confidence`` properties plus
            every non-missing extra column.
        """
        assert self._gdf is not None
        row = self._gdf.iloc[idx]

        name = str(row[self._name_col])

        # Missing values must be tested explicitly: NaN is truthy, so a plain
        # truthiness test would turn it into the literal type "nan".
        type_value = row.get(self._type_col) if self._type_col else None
        if self._is_missing(type_value) or not type_value:
            raw_type = "unknown"
        else:
            raw_type = str(type_value)
        normalized_type = _objektart_to_type(raw_type)

        # A numeric id of 0 is a valid identifier; only missing or empty
        # values fall back to the row index.
        id_value = row.get(self._id_col) if self._id_col else None
        if self._is_missing(id_value) or str(id_value) == "":
            feature_id = str(idx)
        else:
            feature_id = str(id_value)

        # Geometry is already in WGS84 (2D) — pre-converted at load time
        geom = row.geometry
        if geom is None or geom.is_empty:
            geometry = {"type": "Point", "coordinates": [0, 0]}
            bbox = None
        else:
            geometry = mapping(geom)
            bounds = geom.bounds
            bbox = (bounds[0], bounds[1], bounds[2], bounds[3])

        properties: dict[str, Any] = {
            "name": name,
            "type": normalized_type,
            "confidence": 1.0,
        }
        for col in self._extra_cols:
            val = row.get(col)
            if self._is_missing(val) or str(val) == "nan":
                continue
            properties[col] = val

        return Feature(geometry=geometry, properties=properties, id=feature_id, bbox=bbox)

    def search(
        self,
        name: str,
        type: str | None = None,
        max_results: int = 10,
    ) -> list[Feature]:
        """
        Search for geographic features by name.

        Uses the normalized-name index for exact matching first, then falls
        back to fuzzy matching if no exact matches are found.

        Args:
            name: Location name to search for.
            type: Optional type hint to filter results. If provided, only features
                of this type are returned.
            max_results: Maximum number of results to return. Values <= 0 yield
                an empty list.

        Returns:
            List of matching GeoJSON Feature dicts. If type is provided, only
            features of that type are returned. Empty list if no matches found.
        """
        if max_results <= 0:
            return []

        self._ensure_loaded()

        normalized = _normalize_name(name)
        indices = self._name_index.get(normalized, [])

        # If no exact match, try fuzzy matching
        if not indices:
            indices = self._fuzzy_search(normalized)

        if type is None:
            # Truncate before conversion so unused rows are never converted.
            return [self._row_to_feature(idx) for idx in indices[:max_results]]

        # Expand via the type hierarchy so that category hints (e.g. "water") match
        # all concrete types within that category ("lake", "river", "pond", ...).
        # An unknown type hint falls back to an exact string match.
        allowed_types = get_matching_types(type) or {type.lower()}

        features: list[Feature] = []
        for idx in indices:
            feature = self._row_to_feature(idx)
            if feature["properties"].get("type") in allowed_types:
                features.append(feature)
                if len(features) >= max_results:
                    break
        return features

    def _fuzzy_search(self, normalized: str, threshold: float = 75.0) -> list[int]:
        """Delegate fuzzy lookup to ``fuzzy_search_index`` over this source's indexes."""
        return fuzzy_search_index(normalized, self._token_index, self._name_index, threshold)

    def get_by_id(self, feature_id: str) -> Feature | None:
        """
        Get a specific feature by its unique identifier.

        Args:
            feature_id: Unique identifier (UUID or row index).

        Returns:
            The matching GeoJSON Feature dict, or None if not found.
        """
        self._ensure_loaded()
        assert self._gdf is not None

        if self._id_col:
            # _row_to_feature() is positional (iloc), so resolve the match to
            # a position rather than an index label.
            id_matches = (self._gdf[self._id_col].astype(str) == feature_id).to_numpy()
            positions = id_matches.nonzero()[0]
            if positions.size:
                return self._row_to_feature(int(positions[0]))

        # Fallback: try as row index
        try:
            idx = int(feature_id)
        except ValueError:
            return None
        if 0 <= idx < len(self._gdf):
            return self._row_to_feature(idx)
        return None

    def get_available_types(self) -> list[str]:
        """
        Get list of concrete geographic types this datasource can return.

        Returns:
            The keys of OBJEKTART_TYPE_MAP, sorted.
        """
        return sorted(OBJEKTART_TYPE_MAP.keys())
Geographic data source backed by swisstopo's swissNAMES3D dataset.
Loads geographic names from a Shapefile, GeoPackage, or ESRI File Geodatabase and provides search by name with optional type filtering.
If data_path is a directory, automatically loads and concatenates all SwissNames3D shapefiles (swissNAMES3D_PKT, swissNAMES3D_LIN, swissNAMES3D_PLY) found within.
All geometries are returned as GeoJSON in WGS84 (EPSG:4326).
Arguments:
- data_path: Path to SwissNames3D data file or directory containing SwissNames3D shapefiles.
- layer: Layer name within the data source (for multi-layer formats like GDB).
Example:
>>> source = SwissNames3DSource("data/") # Load all 3 geometry types >>> results = source.search("Lac Léman", type="lake") >>> print(results[0].geometry) # GeoJSON in WGS84
def __init__(self, data_path: str | Path, layer: str | None = None) -> None:
    # Only records configuration and creates empty caches; nothing is read
    # from disk here.
    self._data_path = Path(data_path)
    self._layer = layer
    # None until the data has been loaded.
    self._gdf: gpd.GeoDataFrame | None = None
    # normalized name -> row indices
    self._name_index: dict[str, list[int]] = {}
    # token -> normalized names
    self._token_index: dict[str, set[str]] = {}
    # Column names detected at load time.
    self._name_col: str = ""
    self._type_col: str | None = None
    self._id_col: str | None = None
    self._extra_cols: list[str] = []
def preload(self) -> None:
    """Eagerly load data. Call at startup to avoid first-query latency."""
    # Same code path as the lazy load triggered by the first query.
    self._ensure_loaded()
Eagerly load data. Call at startup to avoid first-query latency.
def search(
    self,
    name: str,
    type: str | None = None,
    max_results: int = 10,
) -> list[Feature]:
    """
    Search for geographic features by name.

    Uses the normalized-name index for exact matching first, then falls
    back to fuzzy matching if no exact matches are found.

    Args:
        name: Location name to search for.
        type: Optional type hint to filter results. If provided, only features
            of this type are returned.
        max_results: Maximum number of results to return. Values <= 0 yield
            an empty list.

    Returns:
        List of matching GeoJSON Feature dicts. If type is provided, only
        features of that type are returned. Empty list if no matches found.
    """
    # A negative limit would otherwise slice from the end of the result list.
    if max_results <= 0:
        return []

    self._ensure_loaded()

    normalized = _normalize_name(name)
    indices = self._name_index.get(normalized, [])

    # If no exact match, try fuzzy matching
    if not indices:
        indices = self._fuzzy_search(normalized)

    if type is None:
        # Truncate before conversion so unused rows are never converted.
        return [self._row_to_feature(idx) for idx in indices[:max_results]]

    # Expand via the type hierarchy so that category hints (e.g. "water") match
    # all concrete types within that category ("lake", "river", "pond", ...).
    # An unknown type hint falls back to an exact string match.
    allowed_types = get_matching_types(type) or {type.lower()}

    features: list[Feature] = []
    for idx in indices:
        feature = self._row_to_feature(idx)
        if feature["properties"].get("type") in allowed_types:
            features.append(feature)
            if len(features) >= max_results:
                break
    return features
Search for geographic features by name.
Uses case-insensitive, accent-normalized matching with fuzzy fallback. First tries exact matching, then falls back to fuzzy matching if no exact matches found.
Arguments:
- name: Location name to search for.
- type: Optional type hint to filter results. If provided, only features of this type are returned.
- max_results: Maximum number of results to return.
Returns:
List of matching GeoJSON Feature dicts. If type is provided, only features of that type are returned. Empty list if no matches found.
def get_by_id(self, feature_id: str) -> Feature | None:
    """
    Get a specific feature by its unique identifier.

    Args:
        feature_id: Unique identifier (UUID or row index).

    Returns:
        The matching GeoJSON Feature dict, or None if not found.
    """
    self._ensure_loaded()
    assert self._gdf is not None

    if self._id_col:
        # _row_to_feature() is positional, so resolve the match to a position
        # rather than an index label. This also avoids building a filtered
        # copy of the frame.
        id_matches = (self._gdf[self._id_col].astype(str) == feature_id).to_numpy()
        positions = id_matches.nonzero()[0]
        if positions.size:
            return self._row_to_feature(int(positions[0]))

    # Fallback: try as row index
    try:
        idx = int(feature_id)
    except ValueError:
        return None
    if 0 <= idx < len(self._gdf):
        return self._row_to_feature(idx)
    return None
Get a specific feature by its unique identifier.
Arguments:
- feature_id: Unique identifier (UUID or row index).
Returns:
The matching GeoJSON Feature dict, or None if not found.
def get_available_types(self) -> list[str]:
    """
    List the geographic types this datasource can report.

    Returns:
        The keys of OBJEKTART_TYPE_MAP in sorted order.
    """
    # Iterating a mapping yields its keys.
    type_names = sorted(OBJEKTART_TYPE_MAP)
    return type_names
Get list of concrete geographic types this datasource can return.
Returns all normalized types from the OBJEKTART_TYPE_MAP keys, representing all possible types that SwissNames3D data can be classified as.
Returns:
Sorted list of type strings (e.g., ["lake", "city", "river", ...])
class SwissBoundaries3DSource:
    """
    Geographic data source backed by swisstopo's swissBOUNDARIES3D dataset.

    Loads geographic names from a Shapefile, GeoPackage, or ESRI File Geodatabase
    and provides search by name with optional type filtering.

    If data_path is a directory, automatically loads and concatenates all swissBoundaries3D
    shapefiles (swissBOUNDARIES3D_1_5_TLM_BEZIRKSGEBIET, swissBOUNDARIES3D_1_5_TLM_HOHEITSGEBIET,
    swissBOUNDARIES3D_1_5_TLM_KANTONSGEBIET) found within.

    IMPORTANT:
        The swissBOUNDARIES3D_1_5_TLM_LANDESGEBIET is NOT read because it contains
        German enclaves, which are not relevant for Swiss geographic names.
        The swissBOUNDARIES3D_1_5_TLM_HOHEITSGRENZE is NOT read because it contains
        lines already in swissBOUNDARIES3D_1_5_TLM_HOHEITSGEBIET.

    Data is loaded lazily on first use, or eagerly via ``preload()``.
    All geometries are returned as GeoJSON in WGS84 (EPSG:4326).

    Args:
        data_path: Path to swissBoundaries3D data file or directory containing swissBoundaries3D shapefiles.
        layer: Layer name within the data source (for multi-layer formats like GDB).

    Example:
        >>> source = SwissBoundaries3DSource("data/")  # Load all 3 boundary layers
        >>> results = source.search("Bern", type="canton")
        >>> print(results[0]["geometry"])  # GeoJSON in WGS84
    """

    def __init__(self, data_path: str | Path, layer: str | None = None) -> None:
        self._data_path = Path(data_path)
        self._layer = layer
        # Populated by _load_data(); None means "not loaded yet".
        self._gdf: gpd.GeoDataFrame | None = None
        # normalized name -> positional row indices
        self._name_index: dict[str, list[int]] = {}
        # token -> normalized names containing that token
        self._token_index: dict[str, set[str]] = {}
        self._name_col: str = ""
        self._type_col: str | None = None
        self._id_col: str | None = None
        self._extra_cols: list[str] = []

    def preload(self) -> None:
        """Eagerly load data. Call at startup to avoid first-query latency."""
        self._ensure_loaded()

    def _ensure_loaded(self) -> None:
        """Load data lazily on first access."""
        if self._gdf is not None:
            return
        self._load_data()

    def _load_data(self) -> None:
        """Load swissBoundaries3D data and build the name index.

        If any step fails, the partially loaded state is discarded before the
        exception propagates, so a later call retries the load instead of
        serving an unindexed frame.
        """
        try:
            if self._data_path.is_dir():
                self._load_from_directory()
            else:
                kwargs: dict[str, Any] = {}
                if self._layer is not None:
                    kwargs["layer"] = self._layer
                self._gdf = gpd.read_file(str(self._data_path), **kwargs)

            assert self._gdf is not None

            # Drop Z coordinates once — vectorized; the source has LN02 height and
            # single_sided buffers reject 3D geometries
            self._gdf.geometry = force_2d(self._gdf.geometry.values)

            # Reproject to WGS84 once — avoids per-query coordinate transform
            self._gdf = self._gdf.to_crs("EPSG:4326")

            # Cache column names once — reused on every _row_to_feature() call
            self._name_col = self._detect_name_column()
            self._type_col = self._detect_type_column()
            self._id_col = self._detect_id_column()
            skip = {self._name_col, "geometry"}
            if self._type_col:
                skip.add(self._type_col)
            if self._id_col:
                skip.add(self._id_col)
            self._extra_cols = [c for c in self._gdf.columns if c not in skip]

            self._build_name_index()
        except Exception:
            # _ensure_loaded() keys off _gdf; leaving it set here would make
            # every later search run against empty indexes.
            self._gdf = None
            self._name_index = {}
            self._token_index = {}
            raise

    def _load_from_directory(self) -> None:
        """Load and concatenate all swissBoundaries3D shapefiles from a directory.

        Raises:
            ValueError: If none of the expected shapefiles exist in the directory.
        """
        # Look for the 3 standard swissBoundaries3D shapefiles
        shapefile_names = [
            "swissBOUNDARIES3D_1_5_TLM_BEZIRKSGEBIET",
            "swissBOUNDARIES3D_1_5_TLM_HOHEITSGEBIET",
            "swissBOUNDARIES3D_1_5_TLM_KANTONSGEBIET",
        ]
        gdfs: list[gpd.GeoDataFrame] = []

        for name in shapefile_names:
            shp_path = self._data_path / f"{name}.shp"
            if shp_path.exists():
                gdfs.append(gpd.read_file(str(shp_path)))

        if not gdfs:
            raise ValueError(
                f"No swissBoundaries3D shapefiles found in {self._data_path}. Expected: {', '.join(shapefile_names)}"
            )

        # Find common columns across all loaded GeoDataFrames
        common_cols = set(gdfs[0].columns)
        for gdf in gdfs[1:]:
            common_cols &= set(gdf.columns)

        # Keep only common columns and concatenate
        gdfs_filtered = [gdf[sorted(common_cols)] for gdf in gdfs]
        self._gdf = gpd.GeoDataFrame(pd.concat(gdfs_filtered, ignore_index=True), crs=gdfs[0].crs, geometry="geometry")

    def _build_name_index(self) -> None:
        """Build normalized name → row indices and token → candidate names indexes."""
        assert self._gdf is not None
        self._name_index = {}
        self._token_index = {}

        for idx, name in enumerate(self._gdf[self._name_col]):
            # Skip missing (non-string) and blank names.
            if not isinstance(name, str) or not name.strip():
                continue
            normalized = _normalize_name(name)
            self._name_index.setdefault(normalized, []).append(idx)
            for token in normalized.split():
                self._token_index.setdefault(token, set()).add(normalized)

    def _detect_name_column(self) -> str:
        """Detect the name column in the data.

        Raises:
            ValueError: If no column is named NAME or BEZEICHNUNG (case-insensitive).
        """
        assert self._gdf is not None
        for col in self._gdf.columns:
            if col.upper() in ("NAME", "BEZEICHNUNG"):
                return col
        raise ValueError(f"Cannot find name column in data. Available columns: {list(self._gdf.columns)}")

    def _detect_type_column(self) -> str | None:
        """Detect the feature type column (OBJEKTART, case-insensitive), if any."""
        assert self._gdf is not None
        for col in self._gdf.columns:
            if col.upper() == "OBJEKTART":
                return col
        return None

    def _detect_id_column(self) -> str | None:
        """Detect the unique ID column; candidates are tried in priority order."""
        assert self._gdf is not None
        for candidate in ("UUID", "FID", "OBJECTID", "ID"):
            for col in self._gdf.columns:
                if col.upper() == candidate:
                    return col
        return None

    @staticmethod
    def _is_missing(value: Any) -> bool:
        """Return True for None and scalar NA markers (NaN, NaT, pd.NA)."""
        if value is None:
            return True
        # is_scalar guards pd.isna against array-like cells, for which it
        # would return an array rather than a bool.
        return bool(pd.api.types.is_scalar(value) and pd.isna(value))

    def _row_to_feature(self, idx: int) -> Feature:
        """Convert a GeoDataFrame row to a GeoJSON Feature dict with WGS84 geometry.

        Args:
            idx: Positional row index into the loaded GeoDataFrame.

        Returns:
            A Feature with ``name``, ``type`` and ``confidence`` properties plus
            every non-missing extra column.
        """
        assert self._gdf is not None
        row = self._gdf.iloc[idx]

        name = str(row[self._name_col])

        # Missing values must be tested explicitly: NaN is truthy, so a plain
        # truthiness test would turn it into the literal type "nan".
        type_value = row.get(self._type_col) if self._type_col else None
        if self._is_missing(type_value) or not type_value:
            raw_type = "unknown"
        else:
            raw_type = str(type_value)
        normalized_type = _objektart_to_type(raw_type)

        # A numeric id of 0 is a valid identifier; only missing or empty
        # values fall back to the row index.
        id_value = row.get(self._id_col) if self._id_col else None
        if self._is_missing(id_value) or str(id_value) == "":
            feature_id = str(idx)
        else:
            feature_id = str(id_value)

        # Geometry is already in WGS84 (2D) — pre-converted at load time
        geom = row.geometry
        if geom is None or geom.is_empty:
            geometry = {"type": "Point", "coordinates": [0, 0]}
            bbox = None
        else:
            geometry = mapping(geom)
            bounds = geom.bounds
            bbox = (bounds[0], bounds[1], bounds[2], bounds[3])

        properties: dict[str, Any] = {
            "name": name,
            "type": normalized_type,
            "confidence": 1.0,
        }
        for col in self._extra_cols:
            val = row.get(col)
            if self._is_missing(val) or str(val) == "nan":
                continue
            properties[col] = val

        return Feature(geometry=geometry, properties=properties, id=feature_id, bbox=bbox)

    def search(
        self,
        name: str,
        type: str | None = None,
        max_results: int = 10,
    ) -> list[Feature]:
        """
        Search for geographic features by name.

        Uses the normalized-name index for exact matching first, then falls
        back to fuzzy matching if no exact matches are found.

        Args:
            name: Location name to search for.
            type: Optional type hint to filter results. If provided, only features
                of this type are returned.
            max_results: Maximum number of results to return. Values <= 0 yield
                an empty list.

        Returns:
            List of matching GeoJSON Feature dicts. If type is provided, only
            features of that type are returned. Empty list if no matches found.
        """
        if max_results <= 0:
            return []

        self._ensure_loaded()

        normalized = _normalize_name(name)
        indices = self._name_index.get(normalized, [])

        # If no exact match, try fuzzy matching
        if not indices:
            indices = self._fuzzy_search(normalized)

        if type is None:
            # Truncate before conversion so unused rows are never converted.
            return [self._row_to_feature(idx) for idx in indices[:max_results]]

        # Expand via the type hierarchy so that category hints match all
        # concrete types within that category. An unknown type hint falls
        # back to an exact string match.
        allowed_types = get_matching_types(type) or {type.lower()}

        features: list[Feature] = []
        for idx in indices:
            feature = self._row_to_feature(idx)
            if feature["properties"].get("type") in allowed_types:
                features.append(feature)
                if len(features) >= max_results:
                    break
        return features

    def _fuzzy_search(self, normalized: str, threshold: float = 75.0) -> list[int]:
        """Delegate fuzzy lookup to ``fuzzy_search_index`` over this source's indexes."""
        return fuzzy_search_index(normalized, self._token_index, self._name_index, threshold)

    def get_by_id(self, feature_id: str) -> Feature | None:
        """
        Get a specific feature by its unique identifier.

        Args:
            feature_id: Unique identifier (UUID or row index).

        Returns:
            The matching GeoJSON Feature dict, or None if not found.
        """
        self._ensure_loaded()
        assert self._gdf is not None

        if self._id_col:
            # _row_to_feature() is positional (iloc), so resolve the match to
            # a position rather than an index label.
            id_matches = (self._gdf[self._id_col].astype(str) == feature_id).to_numpy()
            positions = id_matches.nonzero()[0]
            if positions.size:
                return self._row_to_feature(int(positions[0]))

        # Fallback: try as row index
        try:
            idx = int(feature_id)
        except ValueError:
            return None
        if 0 <= idx < len(self._gdf):
            return self._row_to_feature(idx)
        return None

    def get_available_types(self) -> list[str]:
        """
        Get list of concrete geographic types this datasource can return.

        Returns:
            The keys of OBJEKTART_TYPE_MAP, sorted.
        """
        return sorted(OBJEKTART_TYPE_MAP.keys())
Geographic data source backed by swisstopo's swissBOUNDARIES3D dataset.
Loads geographic names from a Shapefile, GeoPackage, or ESRI File Geodatabase and provides search by name with optional type filtering.
If data_path is a directory, automatically loads and concatenates all swissBoundaries3D shapefiles (swissBOUNDARIES3D_1_5_TLM_BEZIRKSGEBIET, swissBOUNDARIES3D_1_5_TLM_HOHEITSGEBIET, swissBOUNDARIES3D_1_5_TLM_KANTONSGEBIET) found within.
IMPORTANT:
The swissBOUNDARIES3D_1_5_TLM_LANDESGEBIET layer is NOT read because it contains German enclaves, which are not relevant for Swiss geographic names. The swissBOUNDARIES3D_1_5_TLM_HOHEITSGRENZE layer is NOT read because it contains lines already in swissBOUNDARIES3D_1_5_TLM_HOHEITSGEBIET.
All geometries are returned as GeoJSON in WGS84 (EPSG:4326).
Arguments:
- data_path: Path to swissBoundaries3D data file or directory containing swissBoundaries3D shapefiles.
- layer: Layer name within the data source (for multi-layer formats like GDB).
Example:
>>> source = SwissBoundaries3DSource("data/") # Load all 3 boundary layers >>> results = source.search("Bern", type="canton") >>> print(results[0].geometry) # GeoJSON in WGS84
def __init__(self, data_path: str | Path, layer: str | None = None) -> None:
    # Only records configuration and creates empty caches; nothing is read
    # from disk here.
    self._data_path = Path(data_path)
    self._layer = layer
    # None until the data has been loaded.
    self._gdf: gpd.GeoDataFrame | None = None
    # normalized name -> row indices
    self._name_index: dict[str, list[int]] = {}
    # token -> normalized names
    self._token_index: dict[str, set[str]] = {}
    # Column names detected at load time.
    self._name_col: str = ""
    self._type_col: str | None = None
    self._id_col: str | None = None
    self._extra_cols: list[str] = []
def preload(self) -> None:
    """Eagerly load data. Call at startup to avoid first-query latency."""
    # Same code path as the lazy load triggered by the first query.
    self._ensure_loaded()
Eagerly load data. Call at startup to avoid first-query latency.
def search(
    self,
    name: str,
    type: str | None = None,
    max_results: int = 10,
) -> list[Feature]:
    """
    Search for geographic features by name.

    Uses the normalized-name index for exact matching first, then falls
    back to fuzzy matching if no exact matches are found.

    Args:
        name: Location name to search for.
        type: Optional type hint to filter results. If provided, only features
            of this type are returned.
        max_results: Maximum number of results to return. Values <= 0 yield
            an empty list.

    Returns:
        List of matching GeoJSON Feature dicts. If type is provided, only
        features of that type are returned. Empty list if no matches found.
    """
    # A negative limit would otherwise slice from the end of the result list.
    if max_results <= 0:
        return []

    self._ensure_loaded()

    normalized = _normalize_name(name)
    indices = self._name_index.get(normalized, [])

    # If no exact match, try fuzzy matching
    if not indices:
        indices = self._fuzzy_search(normalized)

    if type is None:
        # Truncate before conversion so unused rows are never converted.
        return [self._row_to_feature(idx) for idx in indices[:max_results]]

    # Expand via the type hierarchy so that category hints match all concrete
    # types within that category. An unknown type hint falls back to an exact
    # string match.
    allowed_types = get_matching_types(type) or {type.lower()}

    features: list[Feature] = []
    for idx in indices:
        feature = self._row_to_feature(idx)
        if feature["properties"].get("type") in allowed_types:
            features.append(feature)
            if len(features) >= max_results:
                break
    return features
Search for geographic features by name.
Uses case-insensitive, accent-normalized matching with fuzzy fallback. First tries exact matching, then falls back to fuzzy matching if no exact matches found.
Arguments:
- name: Location name to search for.
- type: Optional type hint to filter results. If provided, only features of this type are returned.
- max_results: Maximum number of results to return.
Returns:
List of matching GeoJSON Feature dicts. If type is provided, only features of that type are returned. Empty list if no matches found.
def get_by_id(self, feature_id: str) -> Feature | None:
    """
    Get a specific feature by its unique identifier.

    Args:
        feature_id: Unique identifier (UUID or row index).

    Returns:
        The matching GeoJSON Feature dict, or None if not found.
    """
    self._ensure_loaded()
    assert self._gdf is not None

    if self._id_col:
        # _row_to_feature() is positional, so resolve the match to a position
        # rather than an index label. This also avoids building a filtered
        # copy of the frame.
        id_matches = (self._gdf[self._id_col].astype(str) == feature_id).to_numpy()
        positions = id_matches.nonzero()[0]
        if positions.size:
            return self._row_to_feature(int(positions[0]))

    # Fallback: try as row index
    try:
        idx = int(feature_id)
    except ValueError:
        return None
    if 0 <= idx < len(self._gdf):
        return self._row_to_feature(idx)
    return None
Get a specific feature by its unique identifier.
Arguments:
- feature_id: Unique identifier (UUID or row index).
Returns:
The matching GeoJSON Feature dict, or None if not found.
def get_available_types(self) -> list[str]:
    """
    List the geographic types this datasource can report.

    Returns:
        The keys of OBJEKTART_TYPE_MAP in sorted order.
    """
    # Iterating a mapping yields its keys.
    type_names = sorted(OBJEKTART_TYPE_MAP)
    return type_names
Get list of concrete geographic types this datasource can return.
Returns all normalized types from the OBJEKTART_TYPE_MAP keys, representing all possible types that swissBoundaries3D data can be classified as.
Returns:
Sorted list of type strings (e.g., ["canton", "district", "municipality", ...])
class IGNBDCartoSource:
    """
    Geographic data source backed by IGN's BD-CARTO 5.0 dataset.

    Loads French geographic data from GeoPackage files extracted to a directory.
    Supports administrative boundaries (communes, departments, regions, …),
    hydrography (rivers, lakes, …), named places (quarters, hamlets, …),
    orographic features (peaks, passes, valleys, …) and protected areas.

    Data must first be downloaded with ``make download-data-ign``, which places
    the GeoPackage files in ``data/bdcarto/``.

    All geometries are reprojected to WGS84 (EPSG:4326) and returned as
    standard GeoJSON Feature dicts.

    Args:
        data_path: Directory containing the ``.gpkg`` files (e.g. ``"data/bdcarto"``).
            A path that is not a directory is read as a single fixture file.

    Example:
        >>> source = IGNBDCartoSource("data/bdcarto")
        >>> results = source.search("Ardèche", type="department")
        >>> results = source.search("Lyon", type="city")
        >>> results = source.search("Rhône", type="river")
    """

    def __init__(self, data_path: str | Path) -> None:
        self._data_path = Path(data_path)
        # Data is loaded lazily; ``None`` means "not loaded yet".
        self._gdf: gpd.GeoDataFrame | None = None
        self._name_index: dict[str, list[int]] = {}
        self._token_index: dict[str, set[str]] = {}

    def preload(self) -> None:
        """Load the data now instead of on the first query."""
        self._ensure_loaded()

    def _ensure_loaded(self) -> None:
        """Load the data on first use; do nothing once it is loaded."""
        if self._gdf is None:
            self._load_data()

    def _load_data(self) -> None:
        """Read the dataset (directory or single file) and build the indexes."""
        source = self._data_path
        self._gdf = self._load_from_directory() if source.is_dir() else self._load_from_file(source)
        self._build_name_index()

    @staticmethod
    def _concat_layers(layers: list[gpd.GeoDataFrame]) -> gpd.GeoDataFrame:
        """Stack per-layer frames into one WGS84 GeoDataFrame with a fresh index."""
        stacked = pd.concat(layers, ignore_index=True)
        return gpd.GeoDataFrame(stacked, crs="EPSG:4326", geometry="geometry")

    def _load_from_file(self, path: Path) -> gpd.GeoDataFrame:
        """Load from a GeoJSON fixture file. Features must include a ``_layer`` column."""
        everything = gpd.read_file(str(path))
        if "_layer" not in everything.columns:
            raise ValueError(f"GeoJSON fixture {path} must include a '_layer' column")

        layers: list[gpd.GeoDataFrame] = []
        for layer_name, layer_cfg in _LAYER_CONFIGS.items():
            subset = everything[everything["_layer"] == layer_name].copy()
            if subset.empty:
                continue
            source_col: str = layer_cfg["name_col"]
            if source_col not in subset.columns:
                continue
            subset[_NAME_COL] = subset[source_col].astype(str)
            _assign_type_col(subset, layer_cfg)
            layers.append(subset.to_crs("EPSG:4326"))

        if not layers:
            raise ValueError(f"No matching BD-CARTO features found in {path}")

        return self._concat_layers(layers)

    def _load_from_directory(self) -> gpd.GeoDataFrame:
        """Load and concatenate all configured layers from the data directory."""
        layers: list[gpd.GeoDataFrame] = []

        for layer_name, layer_cfg in _LAYER_CONFIGS.items():
            gpkg_file = self._data_path / f"{layer_name}.gpkg"
            if not gpkg_file.exists():
                # Layers without a file on disk are skipped.
                continue

            layer = gpd.read_file(str(gpkg_file))

            source_col: str = layer_cfg["name_col"]
            if source_col not in layer.columns:
                continue

            layer[_NAME_COL] = layer[source_col].astype(str)
            _assign_type_col(layer, layer_cfg)
            layer["_layer"] = layer_name
            layers.append(layer.to_crs("EPSG:4326"))

        if not layers:
            raise ValueError(
                f"No BD-CARTO GeoPackage files found in {self._data_path}. "
                f"Run 'make download-data-ign' to download the dataset."
            )

        return self._concat_layers(layers)

    def _build_name_index(self) -> None:
        """Build normalized name → row indices and token → candidate names indexes."""
        assert self._gdf is not None
        by_name = self._name_index = {}
        by_token = self._token_index = {}
        for position, raw_name in enumerate(self._gdf[_NAME_COL]):
            # Skip blanks and the "nan" text produced by casting missing names to str.
            if isinstance(raw_name, str) and raw_name.strip() and raw_name != "nan":
                for key in _index_keys(raw_name):
                    by_name.setdefault(key, []).append(position)
                    for token in key.split():
                        by_token.setdefault(token, set()).add(key)

    def _row_to_feature(self, idx: int) -> Feature:
        """Convert a GeoDataFrame row to a GeoJSON Feature dict (WGS84)."""
        assert self._gdf is not None
        record = self._gdf.iloc[idx]

        label = str(record[_NAME_COL])
        kind = str(record[_TYPE_COL]) if pd.notna(record.get(_TYPE_COL)) else "unknown"
        # Without a ``cleabs`` value the row position is used as identifier.
        identifier = str(record["cleabs"]) if pd.notna(record.get("cleabs")) else str(idx)

        shape = record.geometry
        bbox: tuple[float, float, float, float] | None = None
        if shape is not None and not shape.is_empty:
            geometry: dict[str, Any] = mapping(shape)
            extent = shape.bounds
            bbox = (extent[0], extent[1], extent[2], extent[3])
        else:
            # Missing or empty geometry is replaced by a placeholder point.
            geometry = {"type": "Point", "coordinates": [0, 0]}

        properties: dict[str, Any] = {
            "name": label,
            "type": kind,
            "confidence": 1.0,
        }
        reserved = {_NAME_COL, _TYPE_COL, "geometry", "cleabs"}
        for column in self._gdf.columns:
            if column in reserved:
                continue
            value = _to_json_value(record.get(column))
            if value is not None:
                properties[column] = value

        return Feature(geometry=geometry, properties=properties, id=identifier, bbox=bbox)

    def search(
        self,
        name: str,
        type: str | None = None,
        max_results: int = 10,
    ) -> list[Feature]:
        """
        Search for geographic features by name.

        The normalized name is looked up in the name index; fuzzy matching is
        used only when that lookup yields nothing.

        Args:
            name: Location name to search for (e.g. ``"Ardèche"``, ``"Lyon"``,
                ``"Rhône"``).
            type: Optional type hint for filtering. Supports both concrete types
                (``"department"``, ``"city"``, ``"river"``) and category hints
                (``"administrative"``, ``"water"``).
            max_results: Maximum number of results.

        Returns:
            List of GeoJSON Feature dicts in WGS84. Empty list if no match.
        """
        self._ensure_loaded()

        lookup_key = _normalize_name(name)
        row_ids = self._name_index.get(lookup_key, []) or self._fuzzy_search(lookup_key)
        found = [self._row_to_feature(row_id) for row_id in row_ids]

        if type is not None:
            matching_types = get_matching_types(type)
            logger.debug("Filtering results by type hint %r → matching types: %s", type, matching_types)
            if matching_types:
                found = [feat for feat in found if feat["properties"].get("type") in matching_types]
            else:
                # Unknown hint: compare against the lower-cased hint literally.
                found = [feat for feat in found if feat["properties"].get("type") == type.lower()]

        # Segments are merged before truncating so the cap applies to merged features.
        return merge_segments(found)[:max_results]

    def _fuzzy_search(self, normalized: str, threshold: float = 75.0) -> list[int]:
        """Delegate fuzzy lookup to ``fuzzy_search_index`` using this source's indexes."""
        return fuzzy_search_index(normalized, self._token_index, self._name_index, threshold)

    def get_by_id(self, feature_id: str) -> Feature | None:
        """
        Get a feature by its ``cleabs`` identifier or row index.

        Args:
            feature_id: ``cleabs`` string or integer row index.

        Returns:
            Matching GeoJSON Feature dict, or ``None``.
        """
        self._ensure_loaded()
        assert self._gdf is not None
        frame = self._gdf

        if "cleabs" in frame.columns:
            hits = frame[frame["cleabs"].astype(str) == feature_id]
            if not hits.empty:
                return self._row_to_feature(hits.index[0])

        # Fallback: treat the identifier as a row position.
        try:
            position = int(feature_id)
            if 0 <= position < len(frame):
                return self._row_to_feature(position)
        except ValueError:
            pass

        return None

    def get_available_types(self) -> list[str]:
        """
        Return the union of all normalized types this source can return.

        Returns:
            Sorted list of type strings.
        """
        collected: set[str] = set()
        for layer_cfg in _LAYER_CONFIGS.values():
            if layer_cfg.get("commune_flags"):
                collected.update({"city", "municipality"})
                continue
            fixed = layer_cfg.get("fixed_type")
            if fixed:
                collected.add(fixed)
                continue
            type_map = layer_cfg.get("type_map")
            if type_map:
                collected.update(type_map.values())
        return sorted(collected)
Geographic data source backed by IGN's BD-CARTO 5.0 dataset.
Loads French geographic data from GeoPackage files extracted to a directory. Supports administrative boundaries (communes, departments, regions, …), hydrography (rivers, lakes, …), named places (quarters, hamlets, …), orographic features (peaks, passes, valleys, …) and protected areas.
Data must first be downloaded with make download-data-ign, which places
the GeoPackage files in data/bdcarto/.
All geometries are reprojected from EPSG:2154 (Lambert-93) to WGS84 (EPSG:4326) and returned as standard GeoJSON Feature dicts.
Arguments:
- data_path: Directory containing the `.gpkg` files (e.g. `"data/bdcarto"`).
Example:
>>> source = IGNBDCartoSource("data/bdcarto")
>>> results = source.search("Ardèche", type="department")
>>> results = source.search("Lyon", type="city")
>>> results = source.search("Rhône", type="river")
def preload(self) -> None:
    """Load the data now rather than on the first query (useful at startup)."""
    self._ensure_loaded()
Eagerly load data. Call at startup to avoid first-query latency.
def search(self, name: str, type: str | None = None, max_results: int = 10) -> list[Feature]:
    """
    Search for geographic features by name.

    The normalized name is looked up in the name index; fuzzy matching is
    used only when that lookup yields nothing.

    Args:
        name: Location name to search for (e.g. ``"Ardèche"``, ``"Lyon"``,
            ``"Rhône"``).
        type: Optional type hint for filtering. Supports both concrete types
            (``"department"``, ``"city"``, ``"river"``) and category hints
            (``"administrative"``, ``"water"``).
        max_results: Maximum number of results.

    Returns:
        List of GeoJSON Feature dicts in WGS84. Empty list if no match.
    """
    self._ensure_loaded()

    lookup_key = _normalize_name(name)
    row_ids = self._name_index.get(lookup_key, []) or self._fuzzy_search(lookup_key)
    found = [self._row_to_feature(row_id) for row_id in row_ids]

    if type is not None:
        matching_types = get_matching_types(type)
        logger.debug("Filtering results by type hint %r → matching types: %s", type, matching_types)
        if matching_types:
            found = [feat for feat in found if feat["properties"].get("type") in matching_types]
        else:
            # Unknown hint: compare against the lower-cased hint literally.
            found = [feat for feat in found if feat["properties"].get("type") == type.lower()]

    # Segments are merged before truncating so the cap applies to merged features.
    return merge_segments(found)[:max_results]
Search for geographic features by name.
Uses case-insensitive, accent-normalized exact matching with fuzzy fallback when no exact match is found.
Arguments:
- name: Location name to search for (e.g. `"Ardèche"`, `"Lyon"`, `"Rhône"`).
- type: Optional type hint for filtering. Supports both concrete types (`"department"`, `"city"`, `"river"`) and category hints (`"administrative"`, `"water"`).
- max_results: Maximum number of results.
Returns:
List of GeoJSON Feature dicts in WGS84. Empty list if no match.
def get_by_id(self, feature_id: str) -> Feature | None:
    """
    Get a feature by its ``cleabs`` identifier or row index.

    The ``cleabs`` column (when present) is compared as text first. If that
    gives no hit, ``feature_id`` is interpreted as an integer row position.

    Args:
        feature_id: ``cleabs`` string or integer row index.

    Returns:
        Matching GeoJSON Feature dict, or ``None``.
    """
    self._ensure_loaded()
    assert self._gdf is not None
    frame = self._gdf

    if "cleabs" in frame.columns:
        hits = frame[frame["cleabs"].astype(str) == feature_id]
        if not hits.empty:
            return self._row_to_feature(hits.index[0])

    # Fallback: treat the identifier as a row position.
    try:
        position = int(feature_id)
        if 0 <= position < len(frame):
            return self._row_to_feature(position)
    except ValueError:
        # Not an integer, so there is nothing left to try.
        pass

    return None
Get a feature by its cleabs identifier or row index.
Arguments:
- feature_id: `cleabs` string or integer row index.
Returns:
Matching GeoJSON Feature dict, or `None`.
def get_available_types(self) -> list[str]:
    """
    Return the union of all normalized types this source can return.

    Each entry of ``_LAYER_CONFIGS`` contributes through the first of its
    ``commune_flags``, ``fixed_type`` or ``type_map`` settings that is set.

    Returns:
        Sorted list of type strings.
    """
    collected: set[str] = set()
    for layer_cfg in _LAYER_CONFIGS.values():
        if layer_cfg.get("commune_flags"):
            collected.update({"city", "municipality"})
            continue
        fixed = layer_cfg.get("fixed_type")
        if fixed:
            collected.add(fixed)
            continue
        type_map = layer_cfg.get("type_map")
        if type_map:
            collected.update(type_map.values())
    return sorted(collected)
Return the union of all normalized types this source can return.
Returns:
Sorted list of type strings.
class CompositeDataSource:
    """
    Fan-out datasource that delegates to an ordered list of GeoDataSource instances.

    ``search`` queries every registered source and concatenates the results
    in registration order.

    ``get_by_id`` tries each source in order and returns the first hit.

    ``get_available_types`` returns the union of all sources' types.

    Args:
        sources: One or more GeoDataSource instances.

    Raises:
        ValueError: If no source is given.

    Example:
        >>> swiss = SwissNames3DSource("data/")
        >>> ign = IGNBDCartoSource("data/bdcarto")
        >>> combo = CompositeDataSource(swiss, ign)
        >>> results = combo.search("Geneva", type="city")
    """

    def __init__(self, *sources: GeoDataSource) -> None:
        if not sources:
            raise ValueError("At least one datasource is required.")
        # Keep a private copy; the order determines lookup precedence.
        self._sources: list[GeoDataSource] = list(sources)

    def preload(self) -> None:
        """Eagerly load all sources that support preloading."""
        for source in self._sources:
            # Sources that are not ``_Preloadable`` are left alone.
            if isinstance(source, _Preloadable):
                source.preload()

    # Public API (mirrors GeoDataSource protocol)

    def search(
        self,
        name: str,
        type: str | None = None,
        max_results: int = 10,
    ) -> list[Feature]:
        """
        Search all registered sources and return the merged results.

        Args:
            name: Location name to search for.
            type: Optional type hint passed through to every source.
            max_results: Maximum results per source. The limit is forwarded
                to each source, not applied to the merged list.

        Returns:
            List of GeoJSON Feature dicts, concatenated in source order. It
            may therefore contain more than ``max_results`` items.
        """
        return [
            feature
            for source in self._sources
            for feature in source.search(name, type=type, max_results=max_results)
        ]

    def get_by_id(self, feature_id: str) -> Feature | None:
        """
        Get a feature by ID, trying each source in order.

        Args:
            feature_id: Unique identifier to look up.

        Returns:
            The first matching GeoJSON Feature dict, or None.
        """
        for source in self._sources:
            hit = source.get_by_id(feature_id)
            if hit is not None:
                return hit
        return None

    def get_available_types(self) -> list[str]:
        """
        Return the union of all sources' available types, sorted.

        Returns:
            Sorted list of unique type strings.
        """
        combined: set[str] = set()
        for source in self._sources:
            combined.update(source.get_available_types())
        return sorted(combined)
Fan-out datasource that delegates to an ordered list of GeoDataSource instances.
search queries every registered source and merges results in order.
get_by_id tries each source in order and returns the first hit.
get_available_types returns the union of all sources' types.
Arguments:
- sources: One or more GeoDataSource instances.
Example:
>>> swiss = SwissNames3DSource("data/")
>>> ign = IGNBDCartoSource("data/bdcarto")
>>> combo = CompositeDataSource(swiss, ign)
>>> results = combo.search("Geneva", type="city")
def preload(self) -> None:
    """Preload every registered source that is ``_Preloadable``; skip the others."""
    preloadable = (src for src in self._sources if isinstance(src, _Preloadable))
    for src in preloadable:
        src.preload()
Eagerly load all sources that support preloading.
def search(self, name: str, type: str | None = None, max_results: int = 10) -> list[Feature]:
    """
    Search all registered sources and return the merged results.

    Args:
        name: Location name to search for.
        type: Optional type hint passed through to every source.
        max_results: Maximum results per source.

    Returns:
        List of GeoJSON Feature dicts, concatenated in source order.
    """
    return [
        feature
        for source in self._sources
        for feature in source.search(name, type=type, max_results=max_results)
    ]
Search all registered sources and return the merged results.
Arguments:
- name: Location name to search for.
- type: Optional type hint passed through to every source.
- max_results: Maximum results per source.
Returns:
List of GeoJSON Feature dicts, merged from all sources.
def get_by_id(self, feature_id: str) -> Feature | None:
    """
    Look a feature up by ID, asking each source in registration order.

    Args:
        feature_id: Unique identifier to look up.

    Returns:
        The first non-None GeoJSON Feature dict a source returns, or None.
    """
    lookups = (source.get_by_id(feature_id) for source in self._sources)
    # The generator is lazy, so sources after the first hit are never queried.
    return next((hit for hit in lookups if hit is not None), None)
Get a feature by ID, trying each source in order.
Arguments:
- feature_id: Unique identifier to look up.
Returns:
The first matching GeoJSON Feature dict, or None.
def get_available_types(self) -> list[str]:
    """
    Collect the available types of every source into one sorted list.

    Returns:
        Sorted list of unique type strings.
    """
    return sorted({kind for source in self._sources for kind in source.get_available_types()})
Return the union of all sources' available types, sorted.
Returns:
Sorted list of unique type strings.
class PostGISDataSource:
    """
    Geographic data source backed by a PostGIS table.

    The table must expose at minimum a name column, a geometry column, and
    optionally a type column. The expected schema is:

    .. code-block:: sql

        CREATE TABLE <table> (
            id      TEXT PRIMARY KEY,
            name    TEXT NOT NULL,
            type    TEXT,
            geom    GEOMETRY(Geometry, 4326)
        );

    The ``type`` column may store either:

    - **Raw dataset values** (e.g. ``"See"``, ``"Berg"`` for SwissNames3D):
      pass ``type_map`` so the datasource can translate between raw values and
      the normalized etter type names.
    - **Already-normalized values** (e.g. ``"lake"``, ``"mountain"``):
      leave ``type_map=None`` (default).

    Geometries must be in WGS84 (EPSG:4326); otherwise supply ``crs`` for
    on-the-fly reprojection.

    Args:
        connection: A SQLAlchemy :class:`~sqlalchemy.engine.Engine` **or** a
            connection URL string (e.g. ``"postgresql+psycopg2://user:pass@host/db"``).
            When a string is provided the engine is created internally.
        table: Fully-qualified table name, e.g. ``"public.swissnames3d"``.
        name_column: Column used for name-based search (default ``"name"``).
        type_column: Column used for type filtering. Pass ``None`` to disable
            type filtering (default ``"type"``).
        geometry_column: PostGIS geometry column (default ``"geom"``).
        id_column: Primary-key column (default ``"id"``).
        crs: CRS of the stored geometries as an EPSG string. Defaults to
            ``"EPSG:4326"`` (no reprojection).
        type_map: Optional mapping from **normalized etter type names** to
            **lists of raw type column values** present in the database.
            This is the same format as ``SwissNames3DSource.OBJEKTART_TYPE_MAP``
            and ``IGNBDCartoSource.IGN_BDCARTO_TYPE_MAP``, so they can be
            passed directly::

                from etter.datasources.swissnames3d import OBJEKTART_TYPE_MAP
                source = PostGISDataSource(
                    engine,
                    table="public.swissnames3d",
                    type_map=OBJEKTART_TYPE_MAP,
                )

            When ``type_map`` is provided the datasource:

            - Translates raw DB values → normalized types in returned features.
            - Translates user type hints → raw DB values in SQL ``WHERE`` clauses.
            - Returns normalized type names from ``get_available_types()``.

            When ``None`` (default) the stored values are used as-is.
        fuzzy_threshold: Minimum ``pg_trgm`` ``word_similarity`` score (0-1)
            a name must exceed in the fuzzy step of ``search``. That step runs
            when the normalized exact match returns nothing, before the
            ``ILIKE`` substring fallback.

    Raises:
        ValueError: If the probe query against ``table`` fails at construction
            time (connection failure or inaccessible table).

    Example: unmodified SwissNames3D table::

        from sqlalchemy import create_engine
        from etter.datasources import PostGISDataSource
        from etter.datasources.swissnames3d import OBJEKTART_TYPE_MAP

        engine = create_engine(...)
        source = PostGISDataSource(
            engine,
            table="public.swissnames3d",
            type_map=OBJEKTART_TYPE_MAP,
        )
        results = source.search("Lac Léman", type="lake")
    """

    def __init__(
        self,
        connection: str | Engine,
        table: str,
        name_column: str = "name",
        type_column: str | None = "type",
        geometry_column: str = "geom",
        id_column: str = "id",
        crs: str = "EPSG:4326",
        type_map: TypeMap | None = None,
        fuzzy_threshold: float = 0.65,
    ) -> None:
        """Create the engine if needed, probe the table, and store the configuration.

        See the class docstring for the meaning of each argument.

        Raises:
            ValueError: If the probe ``SELECT 1 FROM <table> LIMIT 1`` fails
                for any reason; the original exception is chained as the cause.
        """
        sa = _require_sqlalchemy()

        if isinstance(connection, str):
            self._engine = sa.create_engine(connection)
        else:
            self._engine = connection

        # Fail fast: verify connectivity and table access at construction time.
        try:
            with self._engine.connect() as conn:
                conn.execute(sa.text(f"SELECT 1 FROM {table} LIMIT 1"))
        except Exception as exc:
            raise ValueError(f"Failed to connect to database or access table {table!r}") from exc

        # NOTE(review): table and column names are interpolated into SQL text
        # throughout this class (identifiers cannot be bound as parameters), so
        # they must come from trusted configuration, never from user input.
        self._table = table
        self._name_col = name_column
        self._type_col = type_column
        self._geom_col = geometry_column
        self._id_col = id_column
        self._crs = crs
        self._fuzzy_threshold = fuzzy_threshold

        # Build bidirectional lookup structures from the user-supplied map.
        if type_map:
            self._normalized_to_raw: dict[str, list[str]] = {k: list(v) for k, v in type_map.items()}
            # If a raw value appears under several normalized names, the last one wins.
            self._raw_to_normalized: dict[str, str] = {
                raw: normalized for normalized, raws in type_map.items() for raw in raws
            }
        else:
            self._normalized_to_raw = {}
            self._raw_to_normalized = {}

        # Extension availability is probed lazily and cached; None means "not checked yet".
        self._trgm_available: bool | None = None
        self._unaccent_available: bool | None = None

    def _get_connection(self) -> Connection:
        """Return a SQLAlchemy connection from the engine."""
        return self._engine.connect()

    def _check_trgm(self, conn: Connection) -> bool:
        """Return True if pg_trgm extension is available in the database.

        The result (including a failed probe, recorded as False) is cached on
        the instance, so the catalog is queried at most once.
        """
        if self._trgm_available is not None:
            return self._trgm_available
        sa = _require_sqlalchemy()
        try:
            result = conn.execute(sa.text("SELECT 1 FROM pg_extension WHERE extname = 'pg_trgm'"))
            self._trgm_available = result.fetchone() is not None
        except Exception:
            logger.exception("Failed to check pg_trgm availability")
            self._trgm_available = False
        return self._trgm_available

    def _check_unaccent(self, conn: Connection) -> bool:
        """Return True if the unaccent extension is available in the database.

        The result (including a failed probe, recorded as False) is cached on
        the instance, so the catalog is queried at most once.
        """
        if self._unaccent_available is not None:
            return self._unaccent_available
        sa = _require_sqlalchemy()
        try:
            result = conn.execute(sa.text("SELECT 1 FROM pg_extension WHERE extname = 'unaccent'"))
            self._unaccent_available = result.fetchone() is not None
        except Exception:
            logger.exception("Failed to check unaccent availability")
            self._unaccent_available = False
        return self._unaccent_available

    def _normalize_type(self, raw_type: str | None) -> str | None:
        """Translate a raw DB type value to its normalized etter name.

        If no type_map was supplied, or the value is not in the map, the value
        is returned unchanged. ``None`` stays ``None``.
        """
        if raw_type is None:
            return None
        return self._raw_to_normalized.get(raw_type, raw_type)

    def _row_to_feature(self, row: Row) -> Feature:
        """Convert a SQLAlchemy Row to a GeoJSON Feature dict.

        The row is expected to carry the aliases produced by
        ``_build_select_columns``: ``id``, ``name``, ``type`` and ``geojson``.
        """
        feature_id = str(row.id)
        name = str(row.name)
        raw_type = getattr(row, "type", None)
        normalized_type = self._normalize_type(raw_type)

        geojson_str = row.geojson
        if geojson_str:
            geometry = json.loads(geojson_str)
        else:
            # Rows without geometry get a placeholder point.
            geometry = {"type": "Point", "coordinates": [0, 0]}

        bbox = _bbox_from_geojson(geometry)

        properties: dict[str, Any] = {
            "name": name,
            "type": normalized_type,
            "confidence": 1.0,
        }

        return Feature(geometry=geometry, properties=properties, id=feature_id, bbox=bbox)

    def _build_select_columns(self) -> str:
        """Build the SELECT column list as a SQL fragment.

        Columns are aliased to ``id``, ``name``, ``type`` and ``geojson``.
        ``type`` is ``NULL`` when no type column is configured, and the
        geometry is transformed to SRID 4326 unless ``crs`` is already EPSG:4326.
        """
        type_expr = f", {self._type_col} AS type" if self._type_col else ", NULL AS type"
        if self._crs.upper() != "EPSG:4326":
            geom_expr = f", ST_AsGeoJSON(ST_Transform({self._geom_col}, 4326)) AS geojson"
        else:
            geom_expr = f", ST_AsGeoJSON({self._geom_col}) AS geojson"
        return f"{self._id_col} AS id, {self._name_col} AS name{type_expr}{geom_expr}"

    def search(
        self,
        name: str,
        type: str | None = None,
        max_results: int = 10,
    ) -> list[Feature]:
        """
        Search for geographic features by name.

        Uses a three-step cascade, stopping as soon as any step returns results:

        1. **Normalized exact match**
        2. **pg_trgm fuzzy with unaccent** (pg_trgm extension required and unaccent extension recommended)
        3. **ILIKE substring**

        ``merge_segments`` is applied after all rows are fetched so that
        multi-segment linestrings (rivers, roads) are merged before the
        ``max_results`` cap is applied.

        A step whose query raises is logged and treated as "no results", so
        the cascade continues with the next step.

        Args:
            name: Location name to search for.
            type: Optional type hint for filtering results. Ignored when the
                datasource was created with ``type_column=None``.
            max_results: Maximum number of results to return.

        Returns:
            List of matching GeoJSON Feature dicts in WGS84.
        """
        sa = _require_sqlalchemy()
        cols = self._build_select_columns()

        # Resolve type filter to the raw DB values to use in the SQL WHERE clause.
        type_filter_values: list[str] | None = None
        if type is not None and self._type_col is not None:
            matching_types = get_matching_types(type)
            # Unknown hints fall back to the lower-cased hint itself.
            concrete_types = matching_types if matching_types else [type.lower()]
            if self._normalized_to_raw:
                raw_values: list[str] = []
                for t in concrete_types:
                    # Types missing from the map are passed through unchanged.
                    raw_values.extend(self._normalized_to_raw.get(t, [t]))
                type_filter_values = raw_values if raw_values else concrete_types
            else:
                type_filter_values = concrete_types

        # Fetch more rows than requested so that merge_segments has the full
        # set of segments to work with.  Without this, a SQL LIMIT applied
        # *before* merging would only return a partial set of linestring
        # segments, producing incorrect / truncated geometries.
        # We cap the internal limit at 2000 to avoid unbounded queries.
        internal_limit = min(max(max_results * 20, 100), 2000)

        # Each cascade step runs on its own connection.
        with self._get_connection() as conn:
            features = self._search_normalized(conn, sa, cols, name, type_filter_values, internal_limit)

        if not features:
            with self._get_connection() as conn:
                features = self._search_fuzzy(conn, sa, cols, name, type_filter_values, internal_limit)

        if not features:
            with self._get_connection() as conn:
                features = self._search_ilike(conn, sa, cols, name, type_filter_values, internal_limit)

        features = merge_segments(features)
        return features[:max_results]

    def _type_filter_sql(self, values: list[str] | None) -> tuple[str, dict[str, Any]]:
        """Return a WHERE clause fragment and bind params for type filtering.

        Returns ``("", {})`` when there is nothing to filter on. Otherwise the
        fragment is `` AND <type_col> IN (:type_0, :type_1, ...)`` with one
        bind parameter per value.
        """
        if not values or self._type_col is None:
            return "", {}
        placeholders = ", ".join(f":type_{i}" for i in range(len(values)))
        clause = f" AND {self._type_col} IN ({placeholders})"
        params = {f"type_{i}": v for i, v in enumerate(values)}
        return clause, params

    def _search_normalized(
        self,
        conn: Connection,
        sa: types.ModuleType,
        cols: str,
        name: str,
        type_filter: list[str] | None,
        fetch_limit: int,
    ) -> list[Feature]:
        """
        Exact accent- and case-insensitive search.

        The query string is normalized in Python with ``_normalize_name``
        before it is sent to the DB. The stored name is compared as
        ``lower(name)``, wrapped in ``unaccent(...)`` when that extension is
        available.

        NOTE(review): presumably ``_normalize_name`` strips diacritics; if so,
        without ``unaccent`` a stored accented name cannot equal the query and
        the match is left to the later cascade steps. Confirm against
        ``_normalize_name``.

        Returns:
            Matching features, or an empty list if the query raises (the
            error is logged).
        """
        type_clause, type_params = self._type_filter_sql(type_filter)
        name_expr = f"lower({self._name_col})"
        if self._check_unaccent(conn):
            name_expr = f"unaccent({name_expr})"
        sql = sa.text(
            f"SELECT {cols} FROM {self._table} "  # noqa: S608
            f"WHERE {name_expr} = :query{type_clause} "
            f"LIMIT :limit"
        )
        params: dict[str, Any] = {
            "query": _normalize_name(name),
            "limit": fetch_limit,
            **type_params,
        }
        try:
            result = conn.execute(sql, params)
            return [self._row_to_feature(row) for row in result]
        except Exception:
            logger.exception("Normalized search failed for %r", name)
            return []

    def _search_ilike(
        self,
        conn: Connection,
        sa: types.ModuleType,
        cols: str,
        name: str,
        type_filter: list[str] | None,
        fetch_limit: int,
    ) -> list[Feature]:
        """Case-insensitive substring fallback using ``ILIKE '%name%'``.

        When the ``unaccent`` extension is available, both the stored name column
        and the pattern are accent-stripped so that e.g. ``"Rhone"`` matches
        ``"Rhône"``.  Without ``unaccent``, standard ILIKE is used (case-insensitive
        only).

        ``%`` and ``_`` inside ``name`` are not escaped, so they act as LIKE
        wildcards.

        Returns:
            Matching features, or an empty list if the query raises (the
            error is logged).
        """
        type_clause, type_params = self._type_filter_sql(type_filter)
        normalized = _normalize_name(name)
        if self._check_unaccent(conn):
            name_expr = f"unaccent(lower({self._name_col}))"
            pattern = f"%{normalized}%"
        else:
            # Without unaccent the raw name is used, so accents must match.
            name_expr = self._name_col
            pattern = f"%{name}%"
        sql = sa.text(
            f"SELECT {cols} FROM {self._table} "  # noqa: S608
            f"WHERE {name_expr} ILIKE :pattern{type_clause} "
            f"LIMIT :limit"
        )
        params: dict[str, Any] = {"pattern": pattern, "limit": fetch_limit, **type_params}
        try:
            result = conn.execute(sql, params)
            return [self._row_to_feature(row) for row in result]
        except Exception:
            logger.exception("ILIKE search failed for %r", name)
            return []

    def _search_fuzzy(
        self,
        conn: Connection,
        sa: types.ModuleType,
        cols: str,
        name: str,
        type_filter: list[str] | None,
        fetch_limit: int,
    ) -> list[Feature]:
        """Fuzzy fallback using pg_trgm similarity (if extension is available).

        Rows are kept when ``word_similarity(<name expr>, query)`` exceeds
        ``fuzzy_threshold`` and are ordered by that score, best first.

        Returns:
            Matching features. Empty if ``pg_trgm`` is not installed (a
            warning is logged) or if the query raises (the error is logged).
        """
        if not self._check_trgm(conn):
            logger.warning(
                "pg_trgm extension not available. Fuzzy search disabled. Install it with: CREATE EXTENSION pg_trgm;"
            )
            return []
        normalized_query = _normalize_name(name)
        if self._check_unaccent(conn):
            name_expr = f"unaccent(lower({self._name_col}))"
        else:
            logger.warning(
                "unaccent extension not available. Accent-insensitive fuzzy search degraded. "
                "Install it with: CREATE EXTENSION unaccent;"
            )
            name_expr = f"lower({self._name_col})"
        type_clause, type_params = self._type_filter_sql(type_filter)
        sql = sa.text(
            f"SELECT {cols} FROM {self._table} "  # noqa: S608
            f"WHERE word_similarity({name_expr}, :query) > :threshold{type_clause} "
            f"ORDER BY word_similarity({name_expr}, :query) DESC "
            f"LIMIT :limit"
        )
        params: dict[str, Any] = {
            "query": normalized_query,
            "threshold": self._fuzzy_threshold,
            "limit": fetch_limit,
            **type_params,
        }
        try:
            result = conn.execute(sql, params)
            return [self._row_to_feature(row) for row in result]
        except Exception:
            logger.exception("Fuzzy search failed for %r", name)
            return []

    def get_by_id(self, feature_id: str) -> Feature | None:
        """
        Get a specific feature by its unique identifier.

        Args:
            feature_id: Value of the configured id column.

        Returns:
            The matching GeoJSON Feature dict, or ``None`` if not found or if
            the query raises (the error is logged).
        """
        sa = _require_sqlalchemy()
        cols = self._build_select_columns()
        sql = sa.text(
            f"SELECT {cols} FROM {self._table} WHERE {self._id_col} = :id LIMIT 1"  # noqa: S608
        )
        with self._get_connection() as conn:
            try:
                result = conn.execute(sql, {"id": feature_id})
                row = result.fetchone()
                return self._row_to_feature(row) if row else None
            except Exception:
                logger.exception("get_by_id failed for %r", feature_id)
                return None

    def get_available_types(self) -> list[str]:
        """
        Return the distinct ``type`` values present in the table.

        Raw values are passed through ``_normalize_type``, so normalized names
        are returned when a ``type_map`` was supplied.

        Returns:
            Sorted list of concrete type strings. Empty if the datasource has
            no type column or if the query raises (the error is logged).
        """
        if self._type_col is None:
            return []
        sa = _require_sqlalchemy()
        sql = sa.text(
            f"SELECT DISTINCT {self._type_col} AS type FROM {self._table} "  # noqa: S608
            f"WHERE {self._type_col} IS NOT NULL ORDER BY 1"
        )
        with self._get_connection() as conn:
            try:
                result = conn.execute(sql)
                raw_types = [row.type for row in result]
            except Exception:
                logger.exception("get_available_types failed")
                return []

        # Several raw values can map to one normalized name; the set removes duplicates.
        normalized = {self._normalize_type(t) for t in raw_types if t}
        return sorted(t for t in normalized if t)
Geographic data source backed by a PostGIS table.
The table must expose at minimum a name column, a geometry column, and optionally a type column. The expected schema is:
CREATE TABLE <table> (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
type TEXT,
geom GEOMETRY(Geometry, 4326)
);
The type column may store either:
- Raw dataset values (e.g. `"See"`, `"Berg"` for SwissNames3D): pass `type_map` so the datasource can translate between raw values and the normalized etter type names.
- Already-normalized values (e.g. `"lake"`, `"mountain"`): leave `type_map=None` (default).
Geometries must be stored in WGS84 (EPSG:4326); otherwise supply `crs` for on-the-fly
reprojection.
Arguments:
- connection: A SQLAlchemy `sqlalchemy.engine.Engine` or a connection URL string (e.g. `"postgresql+psycopg2://user:pass@host/db"`). When a string is provided the engine is created internally.
- table: Fully-qualified table name, e.g. `"public.swissnames3d"`.
- name_column: Column used for name-based search (default `"name"`).
- type_column: Column used for type filtering. Pass `None` to disable type filtering (default `"type"`).
- geometry_column: PostGIS geometry column (default `"geom"`).
- id_column: Primary-key column (default `"id"`).
- crs: CRS of the stored geometries as an EPSG string. Defaults to `"EPSG:4326"` (no reprojection).
- type_map: Optional mapping from normalized etter type names to lists of raw type column values present in the database. This is the same format as `SwissNames3DSource.OBJEKTART_TYPE_MAP` and `IGNBDCartoSource.IGN_BDCARTO_TYPE_MAP`, so they can be passed directly::

      from etter.datasources.swissnames3d import OBJEKTART_TYPE_MAP

      source = PostGISDataSource(
          engine,
          table="public.swissnames3d",
          type_map=OBJEKTART_TYPE_MAP,
      )

  When `type_map` is provided the datasource:
  - Translates raw DB values → normalized types in returned features.
  - Translates user type hints → raw DB values in SQL `WHERE` clauses.
  - Returns normalized type names from `get_available_types()`.

  When `None` (default) the stored values are used as-is.
- fuzzy_threshold: Minimum `pg_trgm` similarity score (0-1) a row must exceed in the fuzzy search step, which runs when the normalized exact match returns nothing (default `0.65`).
Example: unmodified SwissNames3D table::
from sqlalchemy import create_engine
from etter.datasources import PostGISDataSource
from etter.datasources.swissnames3d import OBJEKTART_TYPE_MAP
engine = create_engine(...)
source = PostGISDataSource(
engine,
table="public.swissnames3d",
type_map=OBJEKTART_TYPE_MAP,
)
results = source.search("Lac Léman", type="lake")
def __init__(
    self,
    connection: str | Engine,
    table: str,
    name_column: str = "name",
    type_column: str | None = "type",
    geometry_column: str = "geom",
    id_column: str = "id",
    crs: str = "EPSG:4326",
    type_map: TypeMap | None = None,
    fuzzy_threshold: float = 0.65,
) -> None:
    """
    Set up the datasource and verify that the target table can be queried.

    Args:
        connection: Connection URL string (an engine is created from it) or
            an existing engine object, which is used as-is.
        table: Name of the table to query.
        name_column: Column holding feature names.
        type_column: Column holding feature types, or ``None``.
        geometry_column: Column holding geometries.
        id_column: Column holding feature identifiers.
        crs: CRS identifier of the stored geometries.
        type_map: Optional mapping from normalized type name to the raw
            column values it covers.
        fuzzy_threshold: Similarity threshold stored for fuzzy search.

    Raises:
        ValueError: If the probe query against ``table`` fails for any reason
            (the original exception is chained as the cause).
    """
    sa = _require_sqlalchemy()

    # A URL string is turned into an engine here; anything else is assumed to
    # already be an engine and is stored untouched.
    self._engine = sa.create_engine(connection) if isinstance(connection, str) else connection

    # Fail fast: run one trivial query so a bad URL or table name surfaces at
    # construction time instead of on the first search.
    try:
        with self._engine.connect() as probe_conn:
            probe_conn.execute(sa.text(f"SELECT 1 FROM {table} LIMIT 1"))
    except Exception as exc:
        raise ValueError(f"Failed to connect to database or access table {table!r}") from exc

    self._table = table
    self._name_col = name_column
    self._type_col = type_column
    self._geom_col = geometry_column
    self._id_col = id_column
    self._crs = crs
    self._fuzzy_threshold = fuzzy_threshold

    # Build both lookup directions from the user-supplied map. A missing or
    # empty map leaves both dictionaries empty.
    forward: dict[str, list[str]] = {}
    reverse: dict[str, str] = {}
    if type_map:
        for normalized, raws in type_map.items():
            forward[normalized] = list(raws)
        for normalized, raws in type_map.items():
            for raw in raws:
                # If a raw value is listed under several normalized names,
                # the last one encountered wins.
                reverse[raw] = normalized
    self._normalized_to_raw: dict[str, list[str]] = forward
    self._raw_to_normalized: dict[str, str] = reverse

    # Tri-state flags, initialised to None here; nothing in this method sets
    # them to True or False.
    self._trgm_available: bool | None = None
    self._unaccent_available: bool | None = None
def search(
    self,
    name: str,
    type: str | None = None,
    max_results: int = 10,
) -> list[Feature]:
    """
    Search for geographic features by name.

    Runs up to three lookup steps in order and keeps the result of the first
    one that returns anything:

    1. **Normalized exact match**
    2. **pg_trgm fuzzy with unaccent** (pg_trgm extension required and unaccent extension recommended)
    3. **ILIKE substring**

    Each step runs on its own connection. ``merge_segments`` is applied to
    the fetched rows before the ``max_results`` cap, so multi-segment
    linestrings (rivers, roads) are merged first.

    Args:
        name: Location name to search for.
        type: Optional type hint for filtering results.
        max_results: Maximum number of results to return.

    Returns:
        List of matching GeoJSON Feature dicts in WGS84.
    """
    sa = _require_sqlalchemy()
    cols = self._build_select_columns()

    # Translate the type hint into the values to match in the SQL WHERE
    # clause. No hint, or no type column, means no type filtering.
    type_filter_values: list[str] | None = None
    if type is not None and self._type_col is not None:
        concrete_types = get_matching_types(type) or [type.lower()]
        if self._normalized_to_raw:
            # Normalized names without a mapping entry are passed through.
            raw_values: list[str] = [
                raw
                for concrete in concrete_types
                for raw in self._normalized_to_raw.get(concrete, [concrete])
            ]
            type_filter_values = raw_values or concrete_types
        else:
            type_filter_values = concrete_types

    # Over-fetch so merge_segments sees the full set of segments; a SQL LIMIT
    # of max_results applied before merging could truncate geometries.
    # Bounded to [100, 2000] to avoid unbounded queries.
    internal_limit = min(max(max_results * 20, 100), 2000)

    features: list[Feature] = []
    cascade = (self._search_normalized, self._search_fuzzy, self._search_ilike)
    for run_step in cascade:
        with self._get_connection() as conn:
            features = run_step(conn, sa, cols, name, type_filter_values, internal_limit)
        if features:
            break

    return merge_segments(features)[:max_results]
Search for geographic features by name.
Uses a three-step cascade, stopping as soon as any step returns results:
- Normalized exact match
- pg_trgm fuzzy with unaccent (pg_trgm extension required and unaccent extension recommended)
- ILIKE substring
merge_segments is applied after all rows are fetched so that
multi-segment linestrings (rivers, roads) are merged before the
max_results cap is applied.
Arguments:
- name: Location name to search for.
- type: Optional type hint for filtering results.
- max_results: Maximum number of results to return.
Returns:
List of matching GeoJSON Feature dicts in WGS84.
def get_by_id(self, feature_id: str) -> Feature | None:
    """
    Look up a single feature by its identifier.

    Args:
        feature_id: Value to match against the configured id column.

    Returns:
        The matching GeoJSON Feature dict, or ``None`` when no row matches or
        the query fails (failures are logged, not raised).
    """
    sa = _require_sqlalchemy()
    select_list = self._build_select_columns()
    query = sa.text(
        f"SELECT {select_list} FROM {self._table} WHERE {self._id_col} = :id LIMIT 1"  # noqa: S608
    )
    with self._get_connection() as conn:
        try:
            row = conn.execute(query, {"id": feature_id}).fetchone()
            if row:
                return self._row_to_feature(row)
            return None
        except Exception:
            # Best-effort lookup: log with traceback and report "not found".
            logger.exception("get_by_id failed for %r", feature_id)
            return None
Get a specific feature by its unique identifier.
Arguments:
- feature_id: Value of the `id` column.
Returns:
The matching GeoJSON Feature dict, or `None` if not found.
def get_available_types(self) -> list[str]:
    """
    List the distinct feature types found in the table.

    Every non-empty value of the type column is passed through
    ``self._normalize_type``; empty results are dropped and duplicates
    collapsed.

    Returns:
        Sorted list of type strings. Empty when no type column is configured
        or when the query fails (the failure is logged, not raised).
    """
    if self._type_col is None:
        return []

    sa = _require_sqlalchemy()
    query = sa.text(
        f"SELECT DISTINCT {self._type_col} AS type FROM {self._table} "  # noqa: S608
        f"WHERE {self._type_col} IS NOT NULL ORDER BY 1"
    )
    with self._get_connection() as conn:
        try:
            raw_types = [row.type for row in conn.execute(query)]
        except Exception:
            logger.exception("get_available_types failed")
            return []

    distinct: set[str] = set()
    for raw in raw_types:
        if not raw:
            continue
        label = self._normalize_type(raw)
        if label:
            distinct.add(label)
    return sorted(distinct)
Return the distinct type values present in the table.
Returns:
Sorted list of concrete type strings, or an empty list if the table has no type column.
def apply_spatial_relation(
    geometry: GeoJsonGeometry | list[GeoJsonGeometry],
    relation: SpatialRelation,
    buffer_config: BufferConfig | None = None,
    spatial_config: SpatialRelationConfig | None = None,
    geometry_format: GeometryFormat = "geojson",
) -> GeoJsonGeometry | str:
    """Turn reference geometry into a search area for the given spatial relation.

    A list of geometries is unioned into one shape first, so a feature stored
    as several records (e.g. a river in segments) yields one search area.

    If ``buffer_config.inferred`` is true, the config is passed through
    ``_refine_buffer_config`` together with the geometry and relation before
    the relation is applied.

    Args:
        geometry: GeoJSON geometry dict or non-empty list of dicts (WGS84).
        relation: Spatial relation to apply; its ``category`` selects the
            transformation.
        buffer_config: Required for buffer and directional relations.
        spatial_config: Relation registry; the module-level default is used
            when ``None``.
        geometry_format: Output format forwarded to ``convert_geometry``.

    Returns:
        Transformed geometry in the requested format.

    Raises:
        ValueError: If ``geometry`` is an empty list, if a buffer or
            directional relation is given without ``buffer_config``, or if
            the relation category is not recognised.
    """
    if not isinstance(geometry, list):
        geom = shape(geometry)
        geom_dict: GeoJsonGeometry = geometry
    elif geometry:
        geom = unary_union([shape(part) for part in geometry])
        geom_dict = mapping(geom)
    else:
        raise ValueError("geometry list must not be empty")

    # Refinement happens before dispatch, whatever the relation category.
    if buffer_config is not None and buffer_config.inferred:
        buffer_config = _refine_buffer_config(geom, buffer_config, relation)

    def _relation_settings():
        # Resolve the registry lazily: only some categories need it.
        registry = _DEFAULT_SPATIAL_CONFIG if spatial_config is None else spatial_config
        return registry.get_config(relation.relation)

    if relation.category == "containment":
        # Containment uses the (possibly unioned) reference geometry as-is.
        return convert_geometry(geom_dict, geometry_format)

    if relation.category == "buffer":
        if buffer_config is None:
            raise ValueError(f"Buffer relation '{relation.relation}' requires buffer_config")
        return convert_geometry(_apply_buffer(geom, buffer_config), geometry_format)

    if relation.category == "directional":
        if buffer_config is None:
            raise ValueError(f"Directional relation '{relation.relation}' requires buffer_config")
        settings = _relation_settings()
        # Missing (or zero) registry values fall back to 0 degrees / 90 degrees.
        direction = settings.direction_angle_degrees or 0
        sector_angle = settings.sector_angle_degrees or 90
        sector = _apply_directional(geom, buffer_config, direction, sector_angle)
        return convert_geometry(sector, geometry_format)

    if relation.category == "clipping":
        clip_direction = _relation_settings().clip_direction or "north"
        return convert_geometry(_apply_clipping(geom, clip_direction), geometry_format)

    raise ValueError(f"Unknown relation category: '{relation.category}'")
Transform one or more reference geometries according to a spatial relation.
A list of geometries is unioned into one before the transformation, so that features split across multiple datasource records (e.g. a river in segments) produce a single coherent search area.
When buffer_config.inferred is True (i.e. no explicit distance was
stated), the buffer distance is refined from the actual geometry area so
that small features receive small buffers and large regions receive large
ones.
Arguments:
- geometry: GeoJSON geometry dict or non-empty list of dicts (WGS84).
- relation: Spatial relation to apply.
- buffer_config: Required for buffer/directional relations.
- spatial_config: Relation registry; defaults to the module-level singleton.
- geometry_format: "geojson" (default), "wkt", or "wkb".
Returns:
Transformed geometry in the requested format.
def convert_geometry(geometry: GeoJsonGeometry, fmt: GeometryFormat) -> GeoJsonGeometry | str:
    """
    Convert a GeoJSON geometry dict to the requested format.

    Args:
        geometry: GeoJSON geometry dict (e.g. {"type": "Point", "coordinates": [...]})
        fmt: Target format — "geojson" returns the dict unchanged, "wkt" returns a WKT string,
            "wkb" returns a hex-encoded WKB string.

    Returns:
        The geometry in the requested format.

    Raises:
        ValueError: If ``fmt`` is not one of "geojson", "wkt" or "wkb".
    """
    if fmt == "geojson":
        return geometry
    # Reject unknown formats explicitly; previously any unrecognised value
    # (e.g. "WKT" or "kml") silently produced hex WKB. Checked before parsing
    # so a bad format never costs a geometry conversion.
    if fmt not in ("wkt", "wkb"):
        raise ValueError(f"Unsupported geometry format: {fmt!r}")
    geom = shape(geometry)
    return geom.wkt if fmt == "wkt" else geom.wkb_hex
Convert a GeoJSON geometry dict to the requested format.
Arguments:
- geometry: GeoJSON geometry dict (e.g. {"type": "Point", "coordinates": [...]})
- fmt: Target format — "geojson" returns the dict unchanged, "wkt" returns a WKT string, "wkb" returns a hex-encoded WKB string.
Returns:
The geometry in the requested format.
def convert_feature_geometry(feature: Feature, fmt: GeometryFormat) -> Feature | dict:
    """
    Return a copy of a GeoJSON Feature dict with its geometry converted to the requested format.

    The copy is shallow: the top-level dict is new, while nested values such
    as ``properties`` are shared with the input.

    Args:
        feature: GeoJSON Feature dict with a "geometry" key.
        fmt: Target geometry format.

    Returns:
        A new dict identical to the input except the "geometry" value is converted.
        Returns a Feature when fmt is "geojson"; a plain dict otherwise (geometry becomes a string).
    """
    if fmt == "geojson":
        # Return a copy here too, so every format honours the "new dict"
        # contract and callers can edit the result without touching the input.
        return feature.copy()
    return {**feature, "geometry": convert_geometry(feature["geometry"], fmt)}
Return a copy of a GeoJSON Feature dict with its geometry converted to the requested format.
Arguments:
- feature: GeoJSON Feature dict with a "geometry" key.
- fmt: Target geometry format.
Returns:
A new dict identical to the input except the "geometry" value is converted. Returns a Feature when fmt is "geojson"; a plain dict otherwise (geometry becomes a string).