etter
etter - Natural language geographic query parsing using LLMs.
Parse location queries into structured geographic queries using LLM.
"""etter - Natural language geographic query parsing using LLMs.

Parse location queries into structured geographic queries using LLM.
"""

from importlib.metadata import PackageNotFoundError, version

try:
    __version__ = version("etter")
except PackageNotFoundError:  # running from source without install
    __version__ = "unknown"

# Datasource backends
from .datasources import (
    CompositeDataSource,
    GeoDataSource,
    IGNBDCartoSource,
    PostGISDataSource,
    SwissNames3DSource,
)

# Exception hierarchy
from .exceptions import (
    GeoFilterError,
    LowConfidenceError,
    LowConfidenceWarning,
    NoReferenceLocationError,
    ParsingError,
    UnknownRelationError,
    ValidationError,
)

# Geometry output conversion helpers
from .geometry_format import convert_feature_geometry, convert_geometry

# Models (for type hints and result access)
from .models import (
    BufferConfig,
    ConfidenceLevel,
    ConfidenceScore,
    GeometryFormat,
    GeoQuery,
    ReferenceLocation,
    SpatialRelation,
)

# Main parser entry point
from .parser import GeoFilterParser

# Spatial operations
from .spatial import apply_spatial_relation

# Configuration
from .spatial_config import RelationConfig, SpatialRelationConfig

__all__ = [
    # Main API
    "GeoFilterParser",
    # Models
    "GeoQuery",
    "SpatialRelation",
    "ReferenceLocation",
    "BufferConfig",
    "ConfidenceScore",
    "ConfidenceLevel",
    "GeometryFormat",
    # Configuration
    "SpatialRelationConfig",
    "RelationConfig",
    # Exceptions
    "GeoFilterError",
    "ParsingError",
    "ValidationError",
    "NoReferenceLocationError",
    "UnknownRelationError",
    "LowConfidenceError",
    "LowConfidenceWarning",
    # Datasources
    "GeoDataSource",
    "SwissNames3DSource",
    "IGNBDCartoSource",
    "CompositeDataSource",
    "PostGISDataSource",
    # Spatial
    "apply_spatial_relation",
    "convert_geometry",
    "convert_feature_geometry",
]
class GeoFilterParser:
    """
    Main entry point for parsing natural language location queries.

    This class orchestrates the entire parsing pipeline:
    1. Initialize LLM with structured output
    2. Build prompt with spatial relations and examples
    3. Parse query through LLM
    4. Validate and enrich with defaults
    5. Return structured GeoQuery

    Examples:
        Basic usage:
        >>> from langchain.chat_models import init_chat_model
        >>> llm = init_chat_model(model="gpt-4o", model_provider="openai", api_key="sk-...")
        >>> parser = GeoFilterParser(llm=llm)
        >>> result = parser.parse("restaurants in Lausanne")
        >>> print(result.reference_location.name)
        'Lausanne'

        With strict confidence mode:
        >>> parser = GeoFilterParser(llm=llm, confidence_threshold=0.8, strict_mode=True)
        >>> result = parser.parse("near the station")  # May raise LowConfidenceError
    """

    def __init__(
        self,
        llm: BaseChatModel,
        spatial_config: SpatialRelationConfig | None = None,
        confidence_threshold: float = 0.6,
        strict_mode: bool = False,
        include_examples: bool = True,
        datasource: GeoDataSource | None = None,
        additional_instructions: str | None = None,
    ):
        """
        Initialize the parser.

        Args:
            llm: LangChain LLM instance (required).
            spatial_config: Spatial relation configuration. If None, uses defaults
            confidence_threshold: Minimum confidence to accept (0-1)
            strict_mode: If True, raise error on low confidence. If False, warn only
            include_examples: Whether to include few-shot examples in prompt
            datasource: Optional GeoDataSource instance. If provided, the LLM will be informed
                about the concrete types available in that datasource for better type inference.
            additional_instructions: Free-form text injected as a system message after the main
                system prompt and before few-shot examples. Use this to add caller-specific
                rules such as region-specific endonyms, domain aliases, or
                organization-specific place names without forking the default prompt.

        Example:
            >>> from langchain.chat_models import init_chat_model
            >>> from etter.datasources.swissnames3d import SwissNames3DSource
            >>> llm = init_chat_model(model="gpt-4o", model_provider="openai", temperature=0)
            >>> datasource = SwissNames3DSource("data/")
            >>> parser = GeoFilterParser(llm=llm, datasource=datasource)
        """
        self.llm = llm

        # Initialize spatial config
        self.spatial_config = spatial_config or SpatialRelationConfig()

        # Settings
        self.confidence_threshold = confidence_threshold
        self.strict_mode = strict_mode
        self.include_examples = include_examples
        self.datasource = datasource
        self.additional_instructions = additional_instructions

        # Build structured LLM
        self.structured_llm = self._build_structured_llm()

        # Build prompt template
        self.prompt = self._build_prompt()

    def _build_structured_llm(self):
        """Create LLM with structured output using Pydantic model."""
        return self.llm.with_structured_output(
            GeoQuery,
            method="function_calling",  # Use function_calling for broader schema support
            include_raw=True,  # For error debugging
        )

    def _build_prompt(self) -> ChatPromptTemplate:
        """Build prompt template with spatial relations, examples, and available types."""
        available_types = None
        if self.datasource is not None:
            available_types = self.datasource.get_available_types()

        return build_prompt_template(
            spatial_config=self.spatial_config,
            include_examples=self.include_examples,
            available_types=available_types,
            additional_instructions=self.additional_instructions,
        )

    def _unpack_response(self, response) -> GeoQuery:
        """Extract and validate the GeoQuery from a structured-LLM response."""
        parsed = response.get("parsed") if isinstance(response, dict) else response

        if parsed is None:
            raw = response.get("raw", "") if isinstance(response, dict) else ""
            error = response.get("parsing_error") if isinstance(response, dict) else None
            raise ParsingError(
                message="Failed to parse query into structured format. "
                "LLM may have returned invalid JSON or missed required fields.",
                raw_response=str(raw),
                original_error=error,
            )

        # Explicit type check instead of `assert`: asserts are stripped under -O,
        # and a wrong payload type should surface as the documented ParsingError.
        if not isinstance(parsed, GeoQuery):
            raise ParsingError(
                message=f"Expected GeoQuery from structured output, got {type(parsed).__name__}",
                raw_response=str(response),
                original_error=None,
            )
        return parsed

    def _finalize(self, geo_query: GeoQuery, query: str) -> GeoQuery:
        """Set original_query and run the validation pipeline."""
        # Always mirror the exact user query; the LLM echo may be missing or lossy.
        if geo_query.original_query != query:
            geo_query.original_query = query

        return validate_query(
            geo_query,
            self.spatial_config,
            confidence_threshold=self.confidence_threshold,
            strict_mode=self.strict_mode,
        )

    def parse(self, query: str) -> GeoQuery:
        """
        Parse a natural language location query into structured format.

        This is the main method for parsing queries. It:
        1. Invokes the LLM with structured output
        2. Validates the spatial relation is registered
        3. Enriches with default parameters
        4. Checks confidence threshold

        Args:
            query: Natural language query in any language

        Returns:
            GeoQuery: Structured query representation with confidence scores

        Raises:
            ParsingError: If LLM fails to parse query into valid structure
            ValidationError: If parsed query fails business logic validation
            UnknownRelationError: If spatial relation is not registered
            LowConfidenceError: If confidence below threshold (strict mode only)

        Warns:
            LowConfidenceWarning: If confidence below threshold (permissive mode)

        Examples:
            Simple containment query:
            >>> result = parser.parse("in Bern")
            >>> result.reference_location.name
            'Bern'
            >>> result.spatial_relation.relation
            'in'

            Buffer query:
            >>> result = parser.parse("near Lake Geneva")
            >>> result.spatial_relation.relation
            'near'
            >>> result.buffer_config.distance_m
            5000

            Directional query:
            >>> result = parser.parse("north of Lausanne")
            >>> result.spatial_relation.relation
            'north_of'
            >>> result.reference_location.name
            'Lausanne'

            Multilingual:
            >>> result = parser.parse("près de Genève")
            >>> result.spatial_relation.relation
            'near'
            >>> result.reference_location.name
            'Genève'
        """
        formatted_messages = self.prompt.format_messages(query=query)

        try:
            response = self.structured_llm.invoke(formatted_messages)
        except Exception as e:
            raise ParsingError(
                message=f"LLM invocation failed: {str(e)}",
                raw_response="",
                original_error=e,
            ) from e

        return self._finalize(self._unpack_response(response), query)

    async def aparse(self, query: str) -> GeoQuery:
        """
        Asynchronously parse a natural language location query into structured format.

        Async counterpart to :meth:`parse`. Uses ``ainvoke`` on the structured LLM
        so it can be awaited inside event loops (e.g. FastAPI endpoints) without
        blocking. Validation is synchronous and runs after the LLM call.
        """
        formatted_messages = self.prompt.format_messages(query=query)

        try:
            response = await self.structured_llm.ainvoke(formatted_messages)
        except Exception as e:
            raise ParsingError(
                message=f"LLM invocation failed: {str(e)}",
                raw_response="",
                original_error=e,
            ) from e

        return self._finalize(self._unpack_response(response), query)

    async def parse_stream(self, query: str) -> AsyncGenerator[dict, None]:
        """
        Parse a natural language location query with streaming reasoning and results.

        This method provides real-time feedback during the parsing process by yielding
        intermediate reasoning steps and the final GeoQuery result. This is useful for
        providing users with transparency into the LLM's decision-making process and
        for building responsive UIs.

        The stream yields dictionaries with the following event types:
        - {"type": "start"} - Stream started
        - {"type": "reasoning", "content": str} - Intermediate processing steps
        - {"type": "data-response", "content": dict} - Final GeoQuery as JSON
        - {"type": "error", "content": str} - Errors encountered during processing
          (emitted at most once per failure)
        - {"type": "finish"} - Stream completed successfully

        Args:
            query: Natural language query in any language

        Yields:
            dict: Stream events with type and optional content fields

        Raises:
            ParsingError: If LLM fails to parse query into valid structure
            ValidationError: If parsed query fails business logic validation
            UnknownRelationError: If spatial relation is not registered
            LowConfidenceError: If confidence below threshold (strict mode only)

        Examples:
            Basic usage with async iteration:
            >>> async for event in parser.parse_stream("restaurants near Lake Geneva"):
            ...     if event["type"] == "reasoning":
            ...         print(f"Reasoning: {event['content']}")
            ...     elif event["type"] == "data-response":
            ...         geo_query = event["content"]
            ...         print(f"Location: {geo_query['reference_location']['name']}")
            ...     elif event["type"] == "error":
            ...         print(f"Error: {event['content']}")

            Using in a FastAPI streaming endpoint:
            >>> from fastapi.responses import StreamingResponse
            >>> @app.get("/stream")
            >>> async def stream_endpoint(q: str):
            ...     async def event_stream():
            ...         async for event in parser.parse_stream(q):
            ...             yield f"data: {json.dumps(event)}\\n\\n"
            ...     return StreamingResponse(event_stream(), media_type="text/event-stream")
        """
        # Tracks whether a specific error event was already yielded, so the
        # outer handler does not emit a second, duplicate error event for the
        # same failure (previously both handlers fired for LLM/parse errors).
        error_reported = False
        try:
            # Signal start of stream
            yield {"type": "start"}

            yield {"type": "reasoning", "content": "Preparing query for LLM processing"}
            formatted_messages = self.prompt.format_messages(query=query)

            yield {"type": "reasoning", "content": "Analyzing spatial relationship and location"}
            try:
                response = await self.structured_llm.ainvoke(formatted_messages)
            except Exception as e:
                yield {"type": "error", "content": f"LLM invocation failed: {str(e)}"}
                error_reported = True
                raise ParsingError(
                    message=f"LLM invocation failed: {str(e)}",
                    raw_response="",
                    original_error=e,
                ) from e

            yield {"type": "reasoning", "content": "Parsing LLM response into structured format"}
            try:
                geo_query = self._unpack_response(response)
            except ParsingError:
                yield {"type": "error", "content": "Failed to parse response - invalid JSON or missing fields"}
                error_reported = True
                raise

            if geo_query.confidence_breakdown.reasoning:
                yield {
                    "type": "reasoning",
                    "content": f"LLM reasoning: {geo_query.confidence_breakdown.reasoning}",
                }

            yield {"type": "reasoning", "content": "Validating spatial relation configuration"}
            geo_query = self._finalize(geo_query, query)

            yield {"type": "reasoning", "content": "Query parsing completed successfully"}
            yield {"type": "data-response", "content": geo_query.model_dump()}

            # Signal successful completion
            yield {"type": "finish"}

        except Exception as e:
            # Emit a single error event before re-raising (skip if the inner
            # handlers already reported this failure with a specific message).
            if not error_reported:
                yield {"type": "error", "content": f"Error during parsing: {str(e)}"}
            raise

    def parse_batch(self, queries: list[str]) -> list[GeoQuery]:
        """
        Parse multiple queries in batch.

        Note: This is a simple sequential implementation.
        For true parallelization, consider using async methods or ThreadPoolExecutor.

        Args:
            queries: List of natural language queries

        Returns:
            List of GeoQuery objects (same order as input)

        Raises:
            Same exceptions as parse() for any failing query
        """
        return [self.parse(query) for query in queries]

    def get_available_relations(self, category: RelationCategory | None = None) -> list[str]:
        """
        Get list of available spatial relations.

        Args:
            category: Optional filter by category ("containment", "buffer", "directional")

        Returns:
            List of relation names
        """
        return self.spatial_config.list_relations(category=category)

    def describe_relation(self, relation_name: str) -> str:
        """
        Get description of a spatial relation.

        Args:
            relation_name: Name of the relation

        Returns:
            Human-readable description

        Raises:
            UnknownRelationError: If relation is not registered
        """
        config = self.spatial_config.get_config(relation_name)
        return config.description
Main entry point for parsing natural language location queries.
This class orchestrates the entire parsing pipeline:
- Initialize LLM with structured output
- Build prompt with spatial relations and examples
- Parse query through LLM
- Validate and enrich with defaults
- Return structured GeoQuery
Examples:
Basic usage:
>>> from langchain.chat_models import init_chat_model
>>> llm = init_chat_model(model="gpt-4o", model_provider="openai", api_key="sk-...")
>>> parser = GeoFilterParser(llm=llm)
>>> result = parser.parse("restaurants in Lausanne")
>>> print(result.reference_location.name)
'Lausanne'

With strict confidence mode:
>>> parser = GeoFilterParser(llm=llm, confidence_threshold=0.8, strict_mode=True) >>> result = parser.parse("near the station") # May raise LowConfidenceError
44 def __init__( 45 self, 46 llm: BaseChatModel, 47 spatial_config: SpatialRelationConfig | None = None, 48 confidence_threshold: float = 0.6, 49 strict_mode: bool = False, 50 include_examples: bool = True, 51 datasource: GeoDataSource | None = None, 52 additional_instructions: str | None = None, 53 ): 54 """ 55 Initialize the parser. 56 57 Args: 58 llm: LangChain LLM instance (required). 59 spatial_config: Spatial relation configuration. If None, uses defaults 60 confidence_threshold: Minimum confidence to accept (0-1) 61 strict_mode: If True, raise error on low confidence. If False, warn only 62 include_examples: Whether to include few-shot examples in prompt 63 datasource: Optional GeoDataSource instance. If provided, the LLM will be informed 64 about the concrete types available in that datasource for better type inference. 65 additional_instructions: Free-form text injected as a system message after the main 66 system prompt and before few-shot examples. Use this to add caller-specific 67 rules such as region-specific endonyms, domain aliases, or 68 organization-specific place names without forking the default prompt. 69 70 Example: 71 >>> from langchain.chat_models import init_chat_model 72 >>> from etter.datasources.swissnames3d import SwissNames3DSource 73 >>> llm = init_chat_model(model="gpt-4o", model_provider="openai", temperature=0) 74 >>> datasource = SwissNames3DSource("data/") 75 >>> parser = GeoFilterParser(llm=llm, datasource=datasource) 76 """ 77 self.llm = llm 78 79 # Initialize spatial config 80 self.spatial_config = spatial_config or SpatialRelationConfig() 81 82 # Settings 83 self.confidence_threshold = confidence_threshold 84 self.strict_mode = strict_mode 85 self.include_examples = include_examples 86 self.datasource = datasource 87 self.additional_instructions = additional_instructions 88 89 # Build structured LLM 90 self.structured_llm = self._build_structured_llm() 91 92 # Build prompt template 93 self.prompt = self._build_prompt()
Initialize the parser.
Arguments:
- llm: LangChain LLM instance (required).
- spatial_config: Spatial relation configuration. If None, uses defaults
- confidence_threshold: Minimum confidence to accept (0-1)
- strict_mode: If True, raise error on low confidence. If False, warn only
- include_examples: Whether to include few-shot examples in prompt
- datasource: Optional GeoDataSource instance. If provided, the LLM will be informed about the concrete types available in that datasource for better type inference.
- additional_instructions: Free-form text injected as a system message after the main system prompt and before few-shot examples. Use this to add caller-specific rules such as region-specific endonyms, domain aliases, or organization-specific place names without forking the default prompt.
Example:
>>> from langchain.chat_models import init_chat_model >>> from etter.datasources.swissnames3d import SwissNames3DSource >>> llm = init_chat_model(model="gpt-4o", model_provider="openai", temperature=0) >>> datasource = SwissNames3DSource("data/") >>> parser = GeoFilterParser(llm=llm, datasource=datasource)
146 def parse(self, query: str) -> GeoQuery: 147 """ 148 Parse a natural language location query into structured format. 149 150 This is the main method for parsing queries. It: 151 1. Invokes the LLM with structured output 152 2. Validates the spatial relation is registered 153 3. Enriches with default parameters 154 4. Checks confidence threshold 155 156 Args: 157 query: Natural language query in any language 158 159 Returns: 160 GeoQuery: Structured query representation with confidence scores 161 162 Raises: 163 ParsingError: If LLM fails to parse query into valid structure 164 ValidationError: If parsed query fails business logic validation 165 UnknownRelationError: If spatial relation is not registered 166 LowConfidenceError: If confidence below threshold (strict mode only) 167 168 Warns: 169 LowConfidenceWarning: If confidence below threshold (permissive mode) 170 171 Examples: 172 Simple containment query: 173 >>> result = parser.parse("in Bern") 174 >>> result.reference_location.name 175 'Bern' 176 >>> result.spatial_relation.relation 177 'in' 178 179 Buffer query: 180 >>> result = parser.parse("near Lake Geneva") 181 >>> result.spatial_relation.relation 182 'near' 183 >>> result.buffer_config.distance_m 184 5000 185 186 Directional query: 187 >>> result = parser.parse("north of Lausanne") 188 >>> result.spatial_relation.relation 189 'north_of' 190 >>> result.reference_location.name 191 'Lausanne' 192 193 Multilingual: 194 >>> result = parser.parse("près de Genève") 195 >>> result.spatial_relation.relation 196 'near' 197 >>> result.reference_location.name 198 'Genève' 199 """ 200 formatted_messages = self.prompt.format_messages(query=query) 201 202 try: 203 response = self.structured_llm.invoke(formatted_messages) 204 except Exception as e: 205 raise ParsingError( 206 message=f"LLM invocation failed: {str(e)}", 207 raw_response="", 208 original_error=e, 209 ) from e 210 211 return self._finalize(self._unpack_response(response), query)
Parse a natural language location query into structured format.
This is the main method for parsing queries. It:
- Invokes the LLM with structured output
- Validates the spatial relation is registered
- Enriches with default parameters
- Checks confidence threshold
Arguments:
- query: Natural language query in any language
Returns:
GeoQuery: Structured query representation with confidence scores
Raises:
- ParsingError: If LLM fails to parse query into valid structure
- ValidationError: If parsed query fails business logic validation
- UnknownRelationError: If spatial relation is not registered
- LowConfidenceError: If confidence below threshold (strict mode only)
Warns:
LowConfidenceWarning: If confidence below threshold (permissive mode)
Examples:
Simple containment query:
>>> result = parser.parse("in Bern")
>>> result.reference_location.name
'Bern'
>>> result.spatial_relation.relation
'in'

Buffer query:
>>> result = parser.parse("near Lake Geneva")
>>> result.spatial_relation.relation
'near'
>>> result.buffer_config.distance_m
5000

Directional query:
>>> result = parser.parse("north of Lausanne")
>>> result.spatial_relation.relation
'north_of'
>>> result.reference_location.name
'Lausanne'

Multilingual:
>>> result = parser.parse("près de Genève") >>> result.spatial_relation.relation 'near' >>> result.reference_location.name 'Genève'
213 async def aparse(self, query: str) -> GeoQuery: 214 """ 215 Asynchronously parse a natural language location query into structured format. 216 217 Async counterpart to :meth:`parse`. Uses ``ainvoke`` on the structured LLM 218 so it can be awaited inside event loops (e.g. FastAPI endpoints) without 219 blocking. Validation is synchronous and runs after the LLM call. 220 """ 221 formatted_messages = self.prompt.format_messages(query=query) 222 223 try: 224 response = await self.structured_llm.ainvoke(formatted_messages) 225 except Exception as e: 226 raise ParsingError( 227 message=f"LLM invocation failed: {str(e)}", 228 raw_response="", 229 original_error=e, 230 ) from e 231 232 return self._finalize(self._unpack_response(response), query)
Asynchronously parse a natural language location query into structured format.
Async counterpart to parse(). Uses ainvoke on the structured LLM
so it can be awaited inside event loops (e.g. FastAPI endpoints) without
blocking. Validation is synchronous and runs after the LLM call.
234 async def parse_stream(self, query: str) -> AsyncGenerator[dict]: 235 """ 236 Parse a natural language location query with streaming reasoning and results. 237 238 This method provides real-time feedback during the parsing process by yielding 239 intermediate reasoning steps and the final GeoQuery result. This is useful for 240 providing users with transparency into the LLM's decision-making process and 241 for building responsive UIs. 242 243 The stream yields dictionaries with the following event types: 244 - {"type": "start"} - Stream started 245 - {"type": "reasoning", "content": str} - Intermediate processing steps 246 - {"type": "data-response", "content": dict} - Final GeoQuery as JSON 247 - {"type": "error", "content": str} - Errors encountered during processing 248 - {"type": "finish"} - Stream completed successfully 249 250 Args: 251 query: Natural language query in any language 252 253 Yields: 254 dict: Stream events with type and optional content fields 255 256 Raises: 257 ParsingError: If LLM fails to parse query into valid structure 258 ValidationError: If parsed query fails business logic validation 259 UnknownRelationError: If spatial relation is not registered 260 LowConfidenceError: If confidence below threshold (strict mode only) 261 262 Examples: 263 Basic usage with async iteration: 264 >>> async for event in parser.parse_stream("restaurants near Lake Geneva"): 265 ... if event["type"] == "reasoning": 266 ... print(f"Reasoning: {event['content']}") 267 ... elif event["type"] == "data-response": 268 ... geo_query = event["content"] 269 ... print(f"Location: {geo_query['reference_location']['name']}") 270 ... elif event["type"] == "error": 271 ... print(f"Error: {event['content']}") 272 273 Using in a FastAPI streaming endpoint: 274 >>> from fastapi.responses import StreamingResponse 275 >>> @app.get("/stream") 276 >>> async def stream_endpoint(q: str): 277 ... async def event_stream(): 278 ... 
async for event in parser.parse_stream(q): 279 ... yield f"data: {json.dumps(event)}\\n\\n" 280 ... return StreamingResponse(event_stream(), media_type="text/event-stream") 281 """ 282 try: 283 # Signal start of stream 284 yield {"type": "start"} 285 286 yield {"type": "reasoning", "content": "Preparing query for LLM processing"} 287 formatted_messages = self.prompt.format_messages(query=query) 288 289 yield {"type": "reasoning", "content": "Analyzing spatial relationship and location"} 290 try: 291 response = await self.structured_llm.ainvoke(formatted_messages) 292 except Exception as e: 293 yield {"type": "error", "content": f"LLM invocation failed: {str(e)}"} 294 raise ParsingError( 295 message=f"LLM invocation failed: {str(e)}", 296 raw_response="", 297 original_error=e, 298 ) from e 299 300 yield {"type": "reasoning", "content": "Parsing LLM response into structured format"} 301 try: 302 geo_query = self._unpack_response(response) 303 except ParsingError: 304 yield {"type": "error", "content": "Failed to parse response - invalid JSON or missing fields"} 305 raise 306 307 if geo_query.confidence_breakdown.reasoning: 308 yield { 309 "type": "reasoning", 310 "content": f"LLM reasoning: {geo_query.confidence_breakdown.reasoning}", 311 } 312 313 yield {"type": "reasoning", "content": "Validating spatial relation configuration"} 314 geo_query = self._finalize(geo_query, query) 315 316 yield {"type": "reasoning", "content": "Query parsing completed successfully"} 317 yield {"type": "data-response", "content": geo_query.model_dump()} 318 319 # Signal successful completion 320 yield {"type": "finish"} 321 322 except Exception as e: 323 # Emit error event before re-raising 324 yield {"type": "error", "content": f"Error during parsing: {str(e)}"} 325 raise
Parse a natural language location query with streaming reasoning and results.
This method provides real-time feedback during the parsing process by yielding intermediate reasoning steps and the final GeoQuery result. This is useful for providing users with transparency into the LLM's decision-making process and for building responsive UIs.
The stream yields dictionaries with the following event types:
- {"type": "start"} - Stream started
- {"type": "reasoning", "content": str} - Intermediate processing steps
- {"type": "data-response", "content": dict} - Final GeoQuery as JSON
- {"type": "error", "content": str} - Errors encountered during processing
- {"type": "finish"} - Stream completed successfully
Arguments:
- query: Natural language query in any language
Yields:
dict: Stream events with type and optional content fields
Raises:
- ParsingError: If LLM fails to parse query into valid structure
- ValidationError: If parsed query fails business logic validation
- UnknownRelationError: If spatial relation is not registered
- LowConfidenceError: If confidence below threshold (strict mode only)
Examples:
Basic usage with async iteration:
>>> async for event in parser.parse_stream("restaurants near Lake Geneva"):
...     if event["type"] == "reasoning":
...         print(f"Reasoning: {event['content']}")
...     elif event["type"] == "data-response":
...         geo_query = event["content"]
...         print(f"Location: {geo_query['reference_location']['name']}")
...     elif event["type"] == "error":
...         print(f"Error: {event['content']}")

Using in a FastAPI streaming endpoint:
>>> from fastapi.responses import StreamingResponse >>> @app.get("/stream") >>> async def stream_endpoint(q: str): ... async def event_stream(): ... async for event in parser.parse_stream(q): ... yield f"data: {json.dumps(event)}\n\n" ... return StreamingResponse(event_stream(), media_type="text/event-stream")
327 def parse_batch(self, queries: list[str]) -> list[GeoQuery]: 328 """ 329 Parse multiple queries in batch. 330 331 Note: This is a simple sequential implementation. 332 For true parallelization, consider using async methods or ThreadPoolExecutor. 333 334 Args: 335 queries: List of natural language queries 336 337 Returns: 338 List of GeoQuery objects (same order as input) 339 340 Raises: 341 Same exceptions as parse() for any failing query 342 """ 343 return [self.parse(query) for query in queries]
Parse multiple queries in batch.
Note: This is a simple sequential implementation. For true parallelization, consider using async methods or ThreadPoolExecutor.
Arguments:
- queries: List of natural language queries
Returns:
List of GeoQuery objects (same order as input)
Raises:
- Same exceptions as parse() for any failing query
345 def get_available_relations(self, category: RelationCategory | None = None) -> list[str]: 346 """ 347 Get list of available spatial relations. 348 349 Args: 350 category: Optional filter by category ("containment", "buffer", "directional") 351 352 Returns: 353 List of relation names 354 """ 355 return self.spatial_config.list_relations(category=category)
Get list of available spatial relations.
Arguments:
- category: Optional filter by category ("containment", "buffer", "directional")
Returns:
List of relation names
357 def describe_relation(self, relation_name: str) -> str: 358 """ 359 Get description of a spatial relation. 360 361 Args: 362 relation_name: Name of the relation 363 364 Returns: 365 Human-readable description 366 367 Raises: 368 UnknownRelationError: If relation is not registered 369 """ 370 config = self.spatial_config.get_config(relation_name) 371 return config.description
Get description of a spatial relation.
Arguments:
- relation_name: Name of the relation
Returns:
Human-readable description
Raises:
- UnknownRelationError: If relation is not registered
class GeoQuery(BaseModel):
    """
    Root model representing a parsed geographic query.
    This is the main output structure returned by the parser.
    """

    query_type: Literal["simple", "compound", "split", "boolean"] = Field(
        "simple",
        description="Type of query. Phase 1 only supports 'simple'. "
        "Future: 'compound' = multi-step, 'split' = area division, 'boolean' = AND/OR/NOT operations",
    )
    spatial_relation: SpatialRelation = Field(description="Spatial relationship to reference location")
    reference_location: ReferenceLocation | None = Field(
        None,
        description="Reference location for the spatial query. "
        "None when the query contains no named geographic location.",
    )
    buffer_config: BufferConfig | None = Field(
        None,
        description="Buffer configuration for buffer and directional relations. "
        "Auto-generated with defaults by enrich_with_defaults() if not provided. "
        "Required for 'near', 'around', 'north_of', etc. "
        "Set to None for containment relations ('in').",
    )
    confidence_breakdown: ConfidenceScore = Field(description="Confidence scores for different aspects of the parse")
    original_query: str = Field(description="Original query text exactly as provided by the user")

    @model_validator(mode="after")
    def validate_buffer_config_consistency(self) -> "GeoQuery":
        """Validate buffer_config consistency with relation category."""
        category = self.spatial_relation.category
        relation = self.spatial_relation.relation
        has_buffer = self.buffer_config is not None

        # Buffer and directional relations must have buffer_config
        if category in ("buffer", "directional") and not has_buffer:
            raise ValueError(f"{category} relation '{relation}' requires buffer_config")

        # Containment relations should not have buffer_config
        if category == "containment" and has_buffer:
            raise ValueError(f"{category} relation '{relation}' should not have buffer_config")

        return self
Root model representing a parsed geographic query. This is the main output structure returned by the parser.
Type of query. Phase 1 only supports 'simple'. Future: 'compound' = multi-step, 'split' = area division, 'boolean' = AND/OR/NOT operations
Reference location for the spatial query. None when the query contains no named geographic location.
Buffer configuration for buffer and directional relations. Auto-generated with defaults by enrich_with_defaults() if not provided. Required for 'near', 'around', 'north_of', etc. Set to None for containment relations ('in').
144 @model_validator(mode="after") 145 def validate_buffer_config_consistency(self) -> "GeoQuery": 146 """Validate buffer_config consistency with relation category.""" 147 # Buffer and directional relations must have buffer_config 148 if self.spatial_relation.category in ("buffer", "directional") and self.buffer_config is None: 149 raise ValueError( 150 f"{self.spatial_relation.category} relation '{self.spatial_relation.relation}' requires buffer_config" 151 ) 152 153 # Containment relations should not have buffer_config 154 if self.spatial_relation.category == "containment" and self.buffer_config is not None: 155 raise ValueError( 156 f"{self.spatial_relation.category} relation '{self.spatial_relation.relation}' " 157 f"should not have buffer_config" 158 ) 159 160 return self
Validate buffer_config consistency with relation category.
class SpatialRelation(BaseModel):
    """A spatial relationship between target and reference."""

    relation: str = Field(
        description="Spatial relation keyword. Examples: 'in', 'near', 'around', 'north_of', "
        "'on_shores_of', 'in_the_heart_of', etc. Use the exact relation name from the available list."
    )
    category: RelationCategory = Field(
        description="Category of spatial relation. "
        "'containment' = exact boundary matching (in), "
        "'buffer' = proximity or erosion operations (near, around, on_shores_of, in_the_heart_of), "
        "'directional' = sector-based queries (north_of, south_of, east_of, west_of)"
    )
    # Distance stays None unless the user stated one; defaults come from
    # the relation registry (see SpatialRelationConfig) during enrichment.
    explicit_distance: float | None = Field(
        None,
        description="Distance in meters if explicitly mentioned by user. "
        "For example: 'within 5km' → 5000, 'within 500 meters' → 500. "
        "Leave null if not explicitly stated.",
    )
class ReferenceLocation(BaseModel):
    """A geographic reference location extracted from the query."""

    name: str = Field(description="Location name as mentioned in the query (e.g., 'Lausanne', 'Lake Geneva')")
    # FIXME: enum ?
    type: str | None = Field(
        None,
        description="Type hint for geographic feature (city, lake, mountain, canton, country, "
        "train_station, airport, river, road, etc.). This is a HINT for ranking results, "
        "NOT a strict filter. For ambiguous cases (e.g., 'Bern' could be city or canton, "
        "'Rhone' could be river or road), provide your best guess or leave null. "
        "The datasource will return multiple types ranked by relevance.",
    )
    type_confidence: ConfidenceLevel | None = Field(
        None,
        description="Confidence in the type inference (0-1). High confidence (>0.8) when type is "
        "explicit in query (e.g., 'Lake Geneva'). Low confidence (<0.6) when ambiguous "
        "(e.g., 'Bern', 'Rhone'). Use spatial relation as hint: 'along X' → river/road, "
        "'in X' → city/region, 'on X' → lake/mountain.",
    )
class BufferConfig(BaseModel):
    """Configuration for buffer-based spatial operations."""

    distance_m: float = Field(
        description="Buffer distance in meters. Positive values expand outward (proximity), "
        "negative values erode inward (e.g., 'in the heart of'). "
        "Examples: 5000 = 5km radius, -500 = 500m erosion"
    )
    buffer_from: Literal["center", "boundary"] = Field(
        description="Buffer origin. 'center' = buffer from centroid point (for proximity), "
        "'boundary' = buffer from polygon boundary (for shores, along roads, erosion)"
    )
    ring_only: bool = Field(
        False,
        description="If True, exclude the reference feature itself to create a ring/donut shape. "
        "Used for queries like 'on the shores of Lake X' (exclude the lake water itself). "
        "Only valid with buffer_from='boundary'.",
    )
    side: Literal["left", "right"] | None = Field(
        None,
        description="Side of a linear feature for one-sided buffer. "
        "'left' = left side relative to line direction, 'right' = right side. "
        "None = both sides (symmetric buffer). Populated from relation config by enrich_with_defaults().",
    )
    inferred: bool = Field(
        True,
        description="True if this configuration was inferred from relation defaults. "
        "False if the user explicitly specified distance or buffer parameters.",
    )

    @model_validator(mode="after")
    def validate_ring_only(self) -> "BufferConfig":
        """Validate that ring_only is only used with boundary buffers.

        Raises:
            ValueError: If ring_only=True is combined with buffer_from='center'.
        """
        # A ring requires a polygon boundary to hollow out; a centroid point has none.
        if self.ring_only and self.buffer_from == "center":
            raise ValueError("ring_only=True requires buffer_from='boundary' (cannot create ring from center point)")
        return self
class ConfidenceScore(BaseModel):
    """Confidence scores for different aspects of the parsed query."""

    overall: ConfidenceLevel = Field(
        description="Overall confidence score for the entire query parse. "
        "0.9-1.0 = highly confident, 0.7-0.9 = confident, 0.5-0.7 = uncertain, <0.5 = very uncertain",
    )
    location_confidence: ConfidenceLevel = Field(
        description="Confidence in correctly identifying the reference location",
    )
    relation_confidence: ConfidenceLevel = Field(
        description="Confidence in correctly identifying the spatial relation",
    )
    reasoning: str | None = Field(
        None,
        description="Explanation for confidence scores. Always include reasoning for clarity and debugging. "
        "For example: 'Ambiguous location name', 'Unclear spatial relationship', 'High confidence in location matching', etc.",
    )
class SpatialRelationConfig:
    """
    Registry and configuration for spatial relations.

    Manages built-in and custom spatial relations with their default parameters.
    """

    def __init__(self):
        """Initialize with built-in spatial relations."""
        self.relations: dict[str, RelationConfig] = {}
        self._initialize_defaults()

    def _initialize_defaults(self):
        """Register built-in spatial relations from ARCHITECTURE.md."""

        # ===== CONTAINMENT RELATIONS =====
        self.register_relation(
            RelationConfig(
                name="in",
                category="containment",
                description="Feature is within the reference boundary",
            )
        )

        # ===== BUFFER/PROXIMITY RELATIONS =====
        self.register_relation(
            RelationConfig(
                name="near",
                category="buffer",
                description="Proximity search with default 5km radius",
                default_distance_m=5000,
                buffer_from="center",
            )
        )

        self.register_relation(
            RelationConfig(
                name="on_shores_of",
                category="buffer",
                description="Ring buffer around lake/water boundary, excluding the water body itself",
                default_distance_m=1000,
                buffer_from="boundary",
                ring_only=True,
            )
        )

        self.register_relation(
            RelationConfig(
                name="along",
                category="buffer",
                description="Buffer following a linear feature like a river or road",
                default_distance_m=500,
                buffer_from="boundary",
            )
        )

        self.register_relation(
            RelationConfig(
                name="left_bank",
                category="buffer",
                description="Left bank of a linear feature (river, road) relative to its direction/flow",
                default_distance_m=500,
                buffer_from="boundary",
                side="left",
            )
        )

        self.register_relation(
            RelationConfig(
                name="right_bank",
                category="buffer",
                description="Right bank of a linear feature (river, road) relative to its direction/flow",
                default_distance_m=500,
                buffer_from="boundary",
                side="right",
            )
        )

        # Negative distance = erosion: shrink the reference polygon inward.
        self.register_relation(
            RelationConfig(
                name="in_the_heart_of",
                category="buffer",
                description="Central area excluding periphery (negative buffer - erosion)",
                default_distance_m=-500,
                buffer_from="boundary",
            )
        )

        # ===== DIRECTIONAL RELATIONS =====
        # All directional relations use consistent defaults:
        # - Distance: 10km radius (default_distance_m=10000)
        # - Sector: 90° angular wedge (sector_angle_degrees=90)
        # - Origin: Centroid of reference location (buffer_from="center" set in enrich_with_defaults)
        # These defaults are applied automatically by enrich_with_defaults() for any directional query.
        # Convention: 0° = North, angles increase clockwise (90° = East, 180° = South, 270° = West)
        self.register_relation(
            RelationConfig(
                name="north_of",
                category="directional",
                description="Directional sector north of reference",
                default_distance_m=10000,
                sector_angle_degrees=90,
                direction_angle_degrees=0,
            )
        )

        self.register_relation(
            RelationConfig(
                name="south_of",
                category="directional",
                description="Directional sector south of reference",
                default_distance_m=10000,
                sector_angle_degrees=90,
                direction_angle_degrees=180,
            )
        )

        self.register_relation(
            RelationConfig(
                name="east_of",
                category="directional",
                description="Directional sector east of reference",
                default_distance_m=10000,
                sector_angle_degrees=90,
                direction_angle_degrees=90,
            )
        )

        self.register_relation(
            RelationConfig(
                name="west_of",
                category="directional",
                description="Directional sector west of reference",
                default_distance_m=10000,
                sector_angle_degrees=90,
                direction_angle_degrees=270,
            )
        )

        # ===== DIAGONAL DIRECTIONAL RELATIONS =====
        self.register_relation(
            RelationConfig(
                name="northeast_of",
                category="directional",
                description="Directional sector northeast of reference",
                default_distance_m=10000,
                sector_angle_degrees=90,
                direction_angle_degrees=45,
            )
        )

        self.register_relation(
            RelationConfig(
                name="southeast_of",
                category="directional",
                description="Directional sector southeast of reference",
                default_distance_m=10000,
                sector_angle_degrees=90,
                direction_angle_degrees=135,
            )
        )

        self.register_relation(
            RelationConfig(
                name="southwest_of",
                category="directional",
                description="Directional sector southwest of reference",
                default_distance_m=10000,
                sector_angle_degrees=90,
                direction_angle_degrees=225,
            )
        )

        self.register_relation(
            RelationConfig(
                name="northwest_of",
                category="directional",
                description="Directional sector northwest of reference",
                default_distance_m=10000,
                sector_angle_degrees=90,
                direction_angle_degrees=315,
            )
        )

    def register_relation(self, config: RelationConfig) -> None:
        """Register a new spatial relation.

        Re-registering an existing name overwrites the previous config.
        """
        self.relations[config.name] = config

    def has_relation(self, name: str) -> bool:
        """Check if a relation is registered."""
        return name in self.relations

    def get_config(self, name: str) -> RelationConfig:
        """Get configuration for a relation.

        Raises:
            UnknownRelationError: If the relation is not registered.
        """
        if not self.has_relation(name):
            raise UnknownRelationError(
                f"Unknown spatial relation: '{name}'. Available relations: {', '.join(sorted(self.relations.keys()))}",
                relation_name=name,
            )
        return self.relations[name]

    def list_relations(self, category: RelationCategory | None = None) -> list[str]:
        """List available relation names, optionally filtered by category."""
        if category is None:
            return sorted(self.relations.keys())
        return sorted(r.name for r in self.relations.values() if r.category == category)

    def format_for_prompt(self) -> str:
        """Format relations for inclusion in LLM prompt.

        Returns a human-readable listing grouped by category, with default
        distances, buffer flags, and usage notes.
        """
        lines: list[str] = []

        # Group by category (order follows the RelationCategory Literal declaration)
        for category in get_args(RelationCategory):
            category_relations = [r for r in self.relations.values() if r.category == category]
            if not category_relations:
                continue

            lines.append(f"\n{category.upper()} RELATIONS:")

            for rel in sorted(category_relations, key=lambda r: r.name):
                # Build distance info
                dist_info = ""
                if rel.default_distance_m is not None:
                    dist_str = f"{abs(rel.default_distance_m)}m"
                    if rel.default_distance_m < 0:
                        dist_info = f" (default: {dist_str} erosion)"
                    else:
                        dist_info = f" (default: {dist_str})"

                # Build special flags
                flags = []
                if rel.ring_only:
                    flags.append("ring buffer")
                if rel.buffer_from:
                    flags.append(f"from {rel.buffer_from}")
                if rel.side:
                    flags.append(f"{rel.side} side only")
                flag_info = f" [{', '.join(flags)}]" if flags else ""

                # Format line
                lines.append(f" • {rel.name}{dist_info}{flag_info}")
                lines.append(f" {rel.description}")

        # Add notes
        lines.append("\nNOTES:")
        lines.append(" • Negative distances indicate erosion/shrinking (e.g., in_the_heart_of)")
        lines.append(" • Ring buffers exclude the reference feature itself (e.g., shores of lake)")
        lines.append(" • Buffer from 'center' vs 'boundary' determines buffer origin")

        return "\n".join(lines)
@dataclass
class RelationConfig:
    """
    Configuration for a single spatial relation.

    Attributes:
        name: Relation identifier (e.g., "in", "near", "north_of")
        category: Type of spatial operation
        description: Human-readable description for LLM prompt
        default_distance_m: Default buffer distance in meters
        buffer_from: Buffer origin
        ring_only: Exclude reference feature to create ring buffer
        side: Side of a linear feature for one-sided buffers ("left"/"right"); None = both sides
        sector_angle_degrees: Angular sector for directional queries
        direction_angle_degrees: Direction angle in degrees (0=North, 90=East, 180=South, 270=West, clockwise)
    """

    name: str
    category: RelationCategory
    description: str
    default_distance_m: float | None = None
    buffer_from: Literal["center", "boundary"] | None = None
    ring_only: bool = False
    side: Literal["left", "right"] | None = None
    sector_angle_degrees: float | None = None
    direction_angle_degrees: float | None = None
Base exception for all GeoFilter errors.
class ParsingError(GeoFilterError):
    """Raised when the LLM fails to parse a query into a valid structure.

    Attributes:
        raw_response: Raw response text received from the LLM.
        original_error: Underlying exception that caused the failure, if any.
    """

    def __init__(self, message: str, raw_response: str = "", original_error: Exception | None = None):
        """Capture LLM context alongside the error message.

        Args:
            message: Error description.
            raw_response: Raw response from LLM.
            original_error: Original exception that caused parsing failure.
        """
        # Keep both diagnostics on the instance so callers can inspect them.
        self.original_error = original_error
        self.raw_response = raw_response
        super().__init__(message)
class ValidationError(GeoFilterError):
    """Structured output parsed correctly but violates business-logic rules.

    Attributes:
        field: Name of the field that failed validation, if known.
        detail: Additional detail about the validation failure, if any.
    """

    def __init__(self, message: str, field: str | None = None, detail: str | None = None):
        """Record which field failed and why, then delegate to the base class.

        Args:
            message: Error description.
            field: Field name that failed validation.
            detail: Additional detail about the validation failure.
        """
        self.detail = detail
        self.field = field
        super().__init__(message)
class NoReferenceLocationError(ValidationError):
    """Query contains no named geographic reference location."""

    def __init__(self, message: str):
        """
        Initialize the error.

        Args:
            message: Error description.
        """
        # Pin the failing field so callers can inspect err.field.
        super().__init__(message, field="reference_location")
class UnknownRelationError(ValidationError):
    """Spatial relation name is not present in the relation registry.

    Attributes:
        relation_name: The relation name that could not be resolved.
    """

    def __init__(self, message: str, relation_name: str):
        """Record the unresolved relation and tag the failing field.

        Args:
            message: Error description.
            relation_name: The unknown relation name.
        """
        self.relation_name = relation_name
        # Always attributed to the spatial_relation field of the parse.
        super().__init__(message, field="spatial_relation")
class LowConfidenceError(GeoFilterError):
    """Raised in strict mode when query confidence falls below the threshold.

    Attributes:
        confidence: Confidence score (0-1) that triggered the error.
        reasoning: Optional explanation for the low confidence.
    """

    def __init__(self, message: str, confidence: float, reasoning: str | None = None):
        """Store the confidence diagnostics, then delegate to the base class.

        Args:
            message: Error description.
            confidence: Confidence score (0-1).
            reasoning: Optional explanation for low confidence.
        """
        self.reasoning = reasoning
        self.confidence = confidence
        super().__init__(message)
class LowConfidenceWarning(UserWarning):
    """Warning emitted when query confidence is below the threshold (permissive mode).

    Attributes:
        confidence: Confidence score (0-1) that triggered the warning.
    """

    def __init__(self, confidence: float, message: str = ""):
        """Record the offending confidence score and delegate to UserWarning.

        Args:
            confidence: Confidence score (0-1).
            message: Warning message.
        """
        super().__init__(message)
        self.confidence = confidence
class GeoDataSource(Protocol):
    """
    Protocol for geographic data sources.

    Implementations resolve location names to geographic features.
    Features are returned as standard GeoJSON Feature objects (dicts) in WGS84 (EPSG:4326).

    Example of returned feature:
        {
            "type": "Feature",
            "id": "uuid-123",
            "geometry": {"type": "Point", "coordinates": [8.5, 47.3]},
            "bbox": [8.4, 47.3, 8.6, 47.4],
            "properties": {
                "name": "Zürich",
                "type": "city",
                "confidence": 1.0,
                ...
            }
        }
    """

    def search(
        self,
        name: str,
        # NOTE: parameter intentionally named `type` (shadows the builtin)
        # because it is part of the public protocol signature.
        type: str | None = None,
        max_results: int = 10,
    ) -> list[dict[str, Any]]:
        """
        Search for geographic features by name.

        Args:
            name: Location name to search for (e.g., "Lake Geneva", "Bern").
            type: Optional type hint for filtering/ranking results.
                Examples: "lake", "city", "mountain", "canton", "river".
                When provided, matching types are ranked higher.
            max_results: Maximum number of results to return.

        Returns:
            List of matching GeoJSON Feature dicts, ranked by relevance.
            Returns empty list if no matches found.
        """
        ...

    def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
        """
        Get a specific feature by its unique identifier.

        Args:
            feature_id: Unique identifier from the data source.

        Returns:
            The matching GeoJSON Feature dict, or None if not found.
        """
        ...

    def get_available_types(self) -> list[str]:
        """
        Get list of concrete geographic types this datasource can return.

        Returns a list of concrete type values (e.g., "lake", "city", "restaurant")
        that this datasource uses in the "type" property of returned features.
        These types can be matched against the location type hierarchy for fuzzy matching.

        The returned types should be a subset of or mapped to the standard location
        type hierarchy defined in location_types.TYPE_HIERARCHY.

        Returns:
            List of concrete type strings (e.g., ["lake", "river", "city", "mountain"]).
            Empty list if this datasource does not provide type information.

        Example:
            >>> source = SwissNames3DSource("data/")
            >>> types = source.get_available_types()
            >>> print(types)
            ['lake', 'river', 'city', 'mountain', 'peak', 'hill', ...]
        """
        ...
1960def _no_init_or_replace_init(self, *args, **kwargs): 1961 cls = type(self) 1962 1963 if cls._is_protocol: 1964 raise TypeError('Protocols cannot be instantiated') 1965 1966 # Already using a custom `__init__`. No need to calculate correct 1967 # `__init__` to call. This can lead to RecursionError. See bpo-45121. 1968 if cls.__init__ is not _no_init_or_replace_init: 1969 return 1970 1971 # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`. 1972 # The first instantiation of the subclass will call `_no_init_or_replace_init` which 1973 # searches for a proper new `__init__` in the MRO. The new `__init__` 1974 # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent 1975 # instantiation of the protocol subclass will thus use the new 1976 # `__init__` and no longer call `_no_init_or_replace_init`. 1977 for base in cls.__mro__: 1978 init = base.__dict__.get('__init__', _no_init_or_replace_init) 1979 if init is not _no_init_or_replace_init: 1980 cls.__init__ = init 1981 break 1982 else: 1983 # should not happen 1984 cls.__init__ = object.__init__ 1985 1986 cls.__init__(self, *args, **kwargs)
34 def search( 35 self, 36 name: str, 37 type: str | None = None, 38 max_results: int = 10, 39 ) -> list[dict[str, Any]]: 40 """ 41 Search for geographic features by name. 42 43 Args: 44 name: Location name to search for (e.g., "Lake Geneva", "Bern"). 45 type: Optional type hint for filtering/ranking results. 46 Examples: "lake", "city", "mountain", "canton", "river". 47 When provided, matching types are ranked higher. 48 max_results: Maximum number of results to return. 49 50 Returns: 51 List of matching GeoJSON Feature dicts, ranked by relevance. 52 Returns empty list if no matches found. 53 """ 54 ...
Search for geographic features by name.
Arguments:
- name: Location name to search for (e.g., "Lake Geneva", "Bern").
- type: Optional type hint for filtering/ranking results. Examples: "lake", "city", "mountain", "canton", "river". When provided, matching types are ranked higher.
- max_results: Maximum number of results to return.
Returns:
List of matching GeoJSON Feature dicts, ranked by relevance. Returns empty list if no matches found.
56 def get_by_id(self, feature_id: str) -> dict[str, Any] | None: 57 """ 58 Get a specific feature by its unique identifier. 59 60 Args: 61 feature_id: Unique identifier from the data source. 62 63 Returns: 64 The matching GeoJSON Feature dict, or None if not found. 65 """ 66 ...
Get a specific feature by its unique identifier.
Arguments:
- feature_id: Unique identifier from the data source.
Returns:
The matching GeoJSON Feature dict, or None if not found.
68 def get_available_types(self) -> list[str]: 69 """ 70 Get list of concrete geographic types this datasource can return. 71 72 Returns a list of concrete type values (e.g., "lake", "city", "restaurant") 73 that this datasource uses in the "type" property of returned features. 74 These types can be matched against the location type hierarchy for fuzzy matching. 75 76 The returned types should be a subset of or mapped to the standard location 77 type hierarchy defined in location_types.TYPE_HIERARCHY. 78 79 Returns: 80 List of concrete type strings (e.g., ["lake", "river", "city", "mountain"]). 81 Empty list if this datasource does not provide type information. 82 83 Example: 84 >>> source = SwissNames3DSource("data/") 85 >>> types = source.get_available_types() 86 >>> print(types) 87 ['lake', 'river', 'city', 'mountain', 'peak', 'hill', ...] 88 """ 89 ...
Get list of concrete geographic types this datasource can return.
Returns a list of concrete type values (e.g., "lake", "city", "restaurant") that this datasource uses in the "type" property of returned features. These types can be matched against the location type hierarchy for fuzzy matching.
The returned types should be a subset of or mapped to the standard location type hierarchy defined in location_types.TYPE_HIERARCHY.
Returns:
List of concrete type strings (e.g., ["lake", "river", "city", "mountain"]). Empty list if this datasource does not provide type information.
Example:
>>> source = SwissNames3DSource("data/") >>> types = source.get_available_types() >>> print(types) ['lake', 'river', 'city', 'mountain', 'peak', 'hill', ...]
class SwissNames3DSource:
    """
    Geographic data source backed by swisstopo's swissNAMES3D dataset.

    Loads geographic names from a Shapefile, GeoPackage, or ESRI File Geodatabase
    and provides search by name with optional type filtering.

    If data_path is a directory, automatically loads and concatenates all SwissNames3D
    shapefiles (swissNAMES3D_PKT, swissNAMES3D_LIN, swissNAMES3D_PLY) found within.

    All geometries are returned as GeoJSON in WGS84 (EPSG:4326).

    Args:
        data_path: Path to SwissNames3D data file or directory containing SwissNames3D shapefiles.
        layer: Layer name within the data source (for multi-layer formats like GDB).

    Example:
        >>> source = SwissNames3DSource("data/")  # Load all 3 geometry types
        >>> results = source.search("Lac Léman", type="lake")
        >>> print(results[0].geometry)  # GeoJSON in WGS84
    """

    def __init__(self, data_path: str | Path, layer: str | None = None) -> None:
        self._data_path = Path(data_path)
        self._layer = layer
        # Loaded lazily by _ensure_loaded(); None until first search/get_by_id.
        self._gdf: gpd.GeoDataFrame | None = None
        # Normalized name -> list of positional row indices into self._gdf.
        self._name_index: dict[str, list[int]] = {}

    def _ensure_loaded(self) -> None:
        """Load data lazily on first access."""
        if self._gdf is not None:
            return
        self._load_data()

    def _load_data(self) -> None:
        """Load SwissNames3D data and build the name index."""
        # Check if data_path is a directory
        if self._data_path.is_dir():
            self._load_from_directory()
        else:
            # Load single file
            kwargs: dict[str, Any] = {}
            if self._layer is not None:
                kwargs["layer"] = self._layer
            self._gdf = gpd.read_file(str(self._data_path), **kwargs)

        self._build_name_index()

    def _load_from_directory(self) -> None:
        """Load and concatenate all SwissNames3D shapefiles from a directory."""
        # Look for the 3 standard SwissNames3D shapefiles
        shapefile_names = ["swissNAMES3D_PKT", "swissNAMES3D_LIN", "swissNAMES3D_PLY"]
        gdfs: list[gpd.GeoDataFrame] = []

        for name in shapefile_names:
            shp_path = self._data_path / f"{name}.shp"
            if shp_path.exists():
                gdf = gpd.read_file(str(shp_path))
                gdfs.append(gdf)

        if not gdfs:
            raise ValueError(
                f"No SwissNames3D shapefiles found in {self._data_path}. Expected: {', '.join(shapefile_names)}"
            )

        # Find common columns across all loaded GeoDataFrames
        common_cols = set(gdfs[0].columns)
        for gdf in gdfs[1:]:
            common_cols &= set(gdf.columns)

        # Keep only common columns and concatenate
        # NOTE(review): pandas is reached through geopandas' module attribute
        # (gpd.pd) instead of a direct import — works today, but relies on
        # geopandas re-exporting pandas; confirm before upgrading geopandas.
        gdfs_filtered = [gdf[sorted(common_cols)] for gdf in gdfs]
        self._gdf = gpd.GeoDataFrame(
            gpd.pd.concat(gdfs_filtered, ignore_index=True), crs=gdfs[0].crs, geometry="geometry"
        )

    def _build_name_index(self) -> None:
        """Build a normalized name → row indices lookup for fast search."""
        assert self._gdf is not None
        self._name_index = {}

        name_col = self._detect_name_column()
        for idx, name in enumerate(self._gdf[name_col]):
            # Skip missing/blank names (non-str covers NaN values).
            if not isinstance(name, str) or not name.strip():
                continue
            normalized = _normalize_name(name)
            if normalized not in self._name_index:
                self._name_index[normalized] = []
            self._name_index[normalized].append(idx)

    def _detect_name_column(self) -> str:
        """Detect the name column in the data."""
        assert self._gdf is not None
        for candidate in ("NAME", "name", "Name", "BEZEICHNUNG"):
            if candidate in self._gdf.columns:
                return candidate
        raise ValueError(f"Cannot find name column in data. Available columns: {list(self._gdf.columns)}")

    def _detect_type_column(self) -> str | None:
        """Detect the feature type column in the data."""
        assert self._gdf is not None
        for candidate in ("OBJEKTART", "objektart", "Objektart"):
            if candidate in self._gdf.columns:
                return candidate
        return None

    def _detect_id_column(self) -> str | None:
        """Detect the unique ID column in the data."""
        assert self._gdf is not None
        for candidate in ("UUID", "uuid", "FID", "OBJECTID", "id"):
            if candidate in self._gdf.columns:
                return candidate
        return None

    def _row_to_feature(self, idx: int) -> dict[str, Any]:
        """Convert a GeoDataFrame row to a GeoJSON Feature dict with WGS84 geometry."""
        assert self._gdf is not None
        row = self._gdf.iloc[idx]

        # Get name
        name_col = self._detect_name_column()
        name = str(row[name_col])

        # Get type
        # NOTE(review): float("nan") is truthy, so a NaN type value would pass
        # this check and stringify to "nan" — assumed not to occur in the
        # dataset; verify against the source data.
        type_col = self._detect_type_column()
        raw_type = str(row[type_col]) if type_col and row.get(type_col) else "unknown"
        normalized_type = _objektart_to_type(raw_type)

        # Get ID (falls back to the positional row index when no ID column exists)
        id_col = self._detect_id_column()
        feature_id = str(row[id_col]) if id_col and row.get(id_col) else str(idx)

        # Convert geometry to WGS84 GeoJSON
        geom = row.geometry
        if geom is None or geom.is_empty:
            # Placeholder for missing geometry; bbox left unset.
            geometry = {"type": "Point", "coordinates": [0, 0]}
            bbox = None
        else:
            # Transform geometry from EPSG:2056 to WGS84 using the module-level transformer
            # Drop Z coordinates — they are not needed and cause issues with single_sided buffers
            wgs84_geom = shapely_transform(_TRANSFORMER.transform, force_2d(geom))
            geometry = mapping(wgs84_geom)
            bounds = wgs84_geom.bounds  # (minx, miny, maxx, maxy)
            bbox = (bounds[0], bounds[1], bounds[2], bounds[3])

        # Collect extra properties (everything except name/type/id/geometry columns)
        skip_cols = {name_col, "geometry"}
        if type_col:
            skip_cols.add(type_col)
        if id_col:
            skip_cols.add(id_col)

        properties: dict[str, Any] = {
            "name": name,
            "type": normalized_type,
            "confidence": 1.0,
        }
        for col in self._gdf.columns:
            if col not in skip_cols:
                val = row.get(col)
                # String compare filters pandas NaN without importing pandas here.
                if val is not None and str(val) != "nan":
                    properties[col] = val

        return {
            "type": "Feature",
            "id": feature_id,
            "geometry": geometry,
            "bbox": bbox,
            "properties": properties,
        }

    def search(
        self,
        name: str,
        type: str | None = None,
        max_results: int = 10,
    ) -> list[dict[str, Any]]:
        """
        Search for geographic features by name.

        Uses case-insensitive, accent-normalized matching with fuzzy fallback.
        First tries exact matching, then falls back to fuzzy matching if no exact
        matches found.

        Args:
            name: Location name to search for.
            type: Optional type hint to filter results. If provided, only features
                of this type are returned.
            max_results: Maximum number of results to return.

        Returns:
            List of matching GeoJSON Feature dicts. If type is provided, only
            features of that type are returned. Empty list if no matches found.
        """
        self._ensure_loaded()

        normalized = _normalize_name(name)
        indices = self._name_index.get(normalized, [])

        # If no exact match, try fuzzy matching
        if not indices:
            indices = self._fuzzy_search(normalized)

        features = [self._row_to_feature(idx) for idx in indices]

        # Filter by type if type hint provided.
        # Expand via the type hierarchy so that category hints (e.g. "water") match
        # all concrete types within that category ("lake", "river", "pond", ...).
        if type is not None:
            matching_types = get_matching_types(type)
            if matching_types:
                features = [f for f in features if f["properties"].get("type") in matching_types]
            else:
                # Unknown type hint, fall back to exact string match
                features = [f for f in features if f["properties"].get("type") == type.lower()]

        return features[:max_results]

    def _fuzzy_search(self, normalized: str, threshold: float = 75.0) -> list[int]:
        """
        Fuzzy search for names that partially match the search query.

        Uses token matching to find results where at least one token from the
        query matches a token in the indexed name. This handles cases like:
        - "venoge" matching "la venoge"
        - "rhone" matching "rhone valais"

        Args:
            normalized: The normalized search query.
            threshold: Minimum fuzzy match score (0-100) to include a result.

        Returns:
            List of row indices for fuzzy-matched names, sorted by score (descending).
        """
        matches: list[tuple[int, float]] = []
        query_tokens = set(normalized.split())

        for indexed_name, indices in self._name_index.items():
            indexed_tokens = set(indexed_name.split())

            # Check if any query token matches any indexed token
            token_overlap = query_tokens & indexed_tokens

            if token_overlap:
                # Also use token_set_ratio for better matching of partial strings
                score = fuzz.token_set_ratio(normalized, indexed_name)
                if score >= threshold:
                    for idx in indices:
                        matches.append((idx, score))

        # Sort by score (descending) to return best matches first
        matches.sort(key=lambda x: x[1], reverse=True)
        return [idx for idx, _ in matches]

    def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
        """
        Get a specific feature by its unique identifier.

        Args:
            feature_id: Unique identifier (UUID or row index).

        Returns:
            The matching GeoJSON Feature dict, or None if not found.
        """
        self._ensure_loaded()
        assert self._gdf is not None

        id_col = self._detect_id_column()
        if id_col:
            # NOTE(review): passes a *label* index to _row_to_feature, which
            # uses .iloc (positional). The two coincide after the
            # ignore_index concat in _load_from_directory — confirm this also
            # holds for single-file loads.
            matches = self._gdf[self._gdf[id_col].astype(str) == feature_id]
            if not matches.empty:
                return self._row_to_feature(matches.index[0])

        # Fallback: try as row index
        try:
            idx = int(feature_id)
            if 0 <= idx < len(self._gdf):
                return self._row_to_feature(idx)
        except ValueError:
            pass

        return None

    def get_available_types(self) -> list[str]:
        """
        Get list of concrete geographic types this datasource can return.

        Returns all normalized types from the OBJEKTART_TYPE_MAP keys,
        representing all possible types that SwissNames3D data can be classified as.

        Returns:
            Sorted list of type strings (e.g., ["lake", "city", "river", ...])
        """
        return sorted(OBJEKTART_TYPE_MAP.keys())
Geographic data source backed by swisstopo's swissNAMES3D dataset.
Loads geographic names from a Shapefile, GeoPackage, or ESRI File Geodatabase and provides search by name with optional type filtering.
If data_path is a directory, automatically loads and concatenates all SwissNames3D shapefiles (swissNAMES3D_PKT, swissNAMES3D_LIN, swissNAMES3D_PLY) found within.
All geometries are returned as GeoJSON in WGS84 (EPSG:4326).
Arguments:
- data_path: Path to SwissNames3D data file or directory containing SwissNames3D shapefiles.
- layer: Layer name within the data source (for multi-layer formats like GDB).
Example:
>>> source = SwissNames3DSource("data/") # Load all 3 geometry types >>> results = source.search("Lac Léman", type="lake") >>> print(results[0].geometry) # GeoJSON in WGS84
333 def search( 334 self, 335 name: str, 336 type: str | None = None, 337 max_results: int = 10, 338 ) -> list[dict[str, Any]]: 339 """ 340 Search for geographic features by name. 341 342 Uses case-insensitive, accent-normalized matching with fuzzy fallback. 343 First tries exact matching, then falls back to fuzzy matching if no exact 344 matches found. 345 346 Args: 347 name: Location name to search for. 348 type: Optional type hint to filter results. If provided, only features 349 of this type are returned. 350 max_results: Maximum number of results to return. 351 352 Returns: 353 List of matching GeoJSON Feature dicts. If type is provided, only 354 features of that type are returned. Empty list if no matches found. 355 """ 356 self._ensure_loaded() 357 358 normalized = _normalize_name(name) 359 indices = self._name_index.get(normalized, []) 360 361 # If no exact match, try fuzzy matching 362 if not indices: 363 indices = self._fuzzy_search(normalized) 364 365 features = [self._row_to_feature(idx) for idx in indices] 366 367 # Filter by type if type hint provided. 368 # Expand via the type hierarchy so that category hints (e.g. "water") match 369 # all concrete types within that category ("lake", "river", "pond", ...). 370 if type is not None: 371 matching_types = get_matching_types(type) 372 if matching_types: 373 features = [f for f in features if f["properties"].get("type") in matching_types] 374 else: 375 # Unknown type hint, fall back to exact string match 376 features = [f for f in features if f["properties"].get("type") == type.lower()] 377 378 return features[:max_results]
Search for geographic features by name.
Uses case-insensitive, accent-normalized matching with fuzzy fallback. First tries exact matching, then falls back to fuzzy matching if no exact matches found.
Arguments:
- name: Location name to search for.
- type: Optional type hint to filter results. If provided, only features of this type are returned.
- max_results: Maximum number of results to return.
Returns:
List of matching GeoJSON Feature dicts. If type is provided, only features of that type are returned. Empty list if no matches found.
416 def get_by_id(self, feature_id: str) -> dict[str, Any] | None: 417 """ 418 Get a specific feature by its unique identifier. 419 420 Args: 421 feature_id: Unique identifier (UUID or row index). 422 423 Returns: 424 The matching GeoJSON Feature dict, or None if not found. 425 """ 426 self._ensure_loaded() 427 assert self._gdf is not None 428 429 id_col = self._detect_id_column() 430 if id_col: 431 matches = self._gdf[self._gdf[id_col].astype(str) == feature_id] 432 if not matches.empty: 433 return self._row_to_feature(matches.index[0]) 434 435 # Fallback: try as row index 436 try: 437 idx = int(feature_id) 438 if 0 <= idx < len(self._gdf): 439 return self._row_to_feature(idx) 440 except ValueError: 441 pass 442 443 return None
Get a specific feature by its unique identifier.
Arguments:
- feature_id: Unique identifier (UUID or row index).
Returns:
The matching GeoJSON Feature dict, or None if not found.
445 def get_available_types(self) -> list[str]: 446 """ 447 Get list of concrete geographic types this datasource can return. 448 449 Returns all normalized types from the OBJEKTART_TYPE_MAP keys, 450 representing all possible types that SwissNames3D data can be classified as. 451 452 Returns: 453 Sorted list of type strings (e.g., ["lake", "city", "river", ...]) 454 """ 455 return sorted(OBJEKTART_TYPE_MAP.keys())
Get list of concrete geographic types this datasource can return.
Returns all normalized types from the OBJEKTART_TYPE_MAP keys, representing all possible types that SwissNames3D data can be classified as.
Returns:
Sorted list of type strings (e.g., ["lake", "city", "river", ...])
class IGNBDCartoSource:
    """
    Geographic data source backed by IGN's BD-CARTO 5.0 dataset.

    Loads French geographic data from GeoPackage files extracted to a directory.
    Supports administrative boundaries (communes, departments, regions, …),
    hydrography (rivers, lakes, …), named places (quarters, hamlets, …),
    orographic features (peaks, passes, valleys, …) and protected areas.

    Data must first be downloaded with ``make download-data-ign``, which places
    the GeoPackage files in ``data/bdcarto/``.

    All geometries are reprojected from EPSG:2154 (Lambert-93) to WGS84
    (EPSG:4326) and returned as standard GeoJSON Feature dicts.

    Args:
        data_path: Directory containing the ``.gpkg`` files (e.g. ``"data/bdcarto"``).

    Example:
        >>> source = IGNBDCartoSource("data/bdcarto")
        >>> results = source.search("Ardèche", type="department")
        >>> results = source.search("Lyon", type="city")
        >>> results = source.search("Rhône", type="river")
    """

    def __init__(self, data_path: str | Path) -> None:
        self._data_path = Path(data_path)
        # Loaded lazily on first access; None until then.
        self._gdf: gpd.GeoDataFrame | None = None
        # Normalized name (and article-stripped variants) -> positional row indices.
        self._name_index: dict[str, list[int]] = {}

    def _ensure_loaded(self) -> None:
        # Lazy loading: only read files on first use.
        if self._gdf is not None:
            return
        self._load_data()

    def _load_data(self) -> None:
        # A directory means the real BD-CARTO GeoPackages; a single file is
        # treated as a GeoJSON test fixture (see _load_from_file).
        if self._data_path.is_dir():
            self._gdf = self._load_from_directory()
        else:
            self._gdf = self._load_from_file(self._data_path)
        self._build_name_index()

    def _load_from_file(self, path: Path) -> gpd.GeoDataFrame:
        """Load from a GeoJSON fixture file. Features must include a ``_layer`` column."""
        full_gdf = gpd.read_file(str(path))
        if "_layer" not in full_gdf.columns:
            raise ValueError(f"GeoJSON fixture {path} must include a '_layer' column")

        gdfs: list[gpd.GeoDataFrame] = []
        for layer_name, cfg in _LAYER_CONFIGS.items():
            rows = full_gdf[full_gdf["_layer"] == layer_name].copy()
            if rows.empty:
                continue
            name_col: str = cfg["name_col"]
            if name_col not in rows.columns:
                continue
            rows[_NAME_COL] = rows[name_col].astype(str)
            # Bind cfg as a default arg so each lambda sees its own layer config.
            rows[_TYPE_COL] = rows.apply(lambda row, c=cfg: _derive_type(row, c), axis=1)
            rows = rows.to_crs("EPSG:4326")
            gdfs.append(rows)

        if not gdfs:
            raise ValueError(f"No matching BD-CARTO features found in {path}")

        combined = pd.concat(gdfs, ignore_index=True)
        return gpd.GeoDataFrame(combined, crs="EPSG:4326", geometry="geometry")

    def _load_from_directory(self) -> gpd.GeoDataFrame:
        """Load and concatenate all configured layers from the data directory."""
        gdfs: list[gpd.GeoDataFrame] = []

        for layer_name, cfg in _LAYER_CONFIGS.items():
            gpkg_path = self._data_path / f"{layer_name}.gpkg"
            if not gpkg_path.exists():
                # Missing layers are skipped silently — partial datasets are allowed.
                continue

            gdf = gpd.read_file(str(gpkg_path))

            name_col: str = cfg["name_col"]
            if name_col not in gdf.columns:
                continue

            gdf[_NAME_COL] = gdf[name_col].astype(str)
            # Bind cfg as a default arg so each lambda sees its own layer config.
            gdf[_TYPE_COL] = gdf.apply(lambda row, c=cfg: _derive_type(row, c), axis=1)
            gdf["_layer"] = layer_name
            gdf = gdf.to_crs("EPSG:4326")

            gdfs.append(gdf)

        if not gdfs:
            raise ValueError(
                f"No BD-CARTO GeoPackage files found in {self._data_path}. "
                f"Run 'make download-data-ign' to download the dataset."
            )

        combined = pd.concat(gdfs, ignore_index=True)
        return gpd.GeoDataFrame(combined, crs="EPSG:4326", geometry="geometry")

    def _build_name_index(self) -> None:
        """Build normalized name → row indices lookup (with article-stripped variants)."""
        assert self._gdf is not None
        self._name_index = {}
        for idx, name in enumerate(self._gdf[_NAME_COL]):
            # "nan" can appear because names were astype(str)-converted above.
            if not isinstance(name, str) or not name.strip() or name == "nan":
                continue
            for key in _index_keys(name):
                if key not in self._name_index:
                    self._name_index[key] = []
                self._name_index[key].append(idx)

    def _row_to_feature(self, idx: int) -> dict[str, Any]:
        """Convert a GeoDataFrame row to a GeoJSON Feature dict (WGS84)."""
        assert self._gdf is not None
        row = self._gdf.iloc[idx]

        name = str(row[_NAME_COL])
        normalized_type = str(row[_TYPE_COL]) if pd.notna(row.get(_TYPE_COL)) else "unknown"
        # Fall back to the positional row index when cleabs is missing.
        feature_id = str(row["cleabs"]) if pd.notna(row.get("cleabs")) else str(idx)

        geom = row.geometry
        if geom is None or geom.is_empty:
            # Placeholder geometry for rows without one; bbox left unset.
            geometry: dict[str, Any] = {"type": "Point", "coordinates": [0, 0]}
            bbox = None
        else:
            geometry = mapping(geom)
            bounds = geom.bounds
            bbox: tuple[float, float, float, float] | None = (bounds[0], bounds[1], bounds[2], bounds[3])

        skip_cols = {_NAME_COL, _TYPE_COL, "geometry", "cleabs"}
        properties: dict[str, Any] = {
            "name": name,
            "type": normalized_type,
            "confidence": 1.0,
        }
        for col in self._gdf.columns:
            if col not in skip_cols:
                val = _to_json_value(row.get(col))
                if val is not None:
                    properties[col] = val

        return {
            "type": "Feature",
            "id": feature_id,
            "geometry": geometry,
            "bbox": bbox,
            "properties": properties,
        }

    def search(
        self,
        name: str,
        type: str | None = None,
        max_results: int = 10,
    ) -> list[dict[str, Any]]:
        """
        Search for geographic features by name.

        Uses case-insensitive, accent-normalized exact matching with fuzzy
        fallback when no exact match is found.

        Args:
            name: Location name to search for (e.g. ``"Ardèche"``, ``"Lyon"``,
                ``"Rhône"``).
            type: Optional type hint for filtering. Supports both concrete types
                (``"department"``, ``"city"``, ``"river"``) and category hints
                (``"administrative"``, ``"water"``).
            max_results: Maximum number of results.

        Returns:
            List of GeoJSON Feature dicts in WGS84. Empty list if no match.
        """
        self._ensure_loaded()

        normalized = _normalize_name(name)
        indices = self._name_index.get(normalized, [])

        if not indices:
            indices = self._fuzzy_search(normalized)

        features = [self._row_to_feature(idx) for idx in indices]

        if type is not None:
            matching_types = get_matching_types(type)
            logger.debug("Filtering results by type hint %r → matching types: %s", type, matching_types)
            if matching_types:
                features = [f for f in features if f["properties"].get("type") in matching_types]
            else:
                # Unknown type hint: exact string match as a fallback.
                features = [f for f in features if f["properties"].get("type") == type.lower()]

        # NOTE(review): merge_segments presumably coalesces multi-row linear
        # features (e.g. a river split into segments) into single features —
        # confirm its contract in the module where it is defined.
        features = merge_segments(features)

        return features[:max_results]

    def _fuzzy_search(self, normalized: str, threshold: float = 75.0) -> list[int]:
        """Token-overlap + token_set_ratio fuzzy search."""
        matches: list[tuple[int, float]] = []
        query_tokens = set(normalized.split())

        for indexed_name, indices in self._name_index.items():
            # Cheap token-overlap prefilter before the fuzzy score.
            if query_tokens & set(indexed_name.split()):
                score = fuzz.token_set_ratio(normalized, indexed_name)
                if score >= threshold:
                    for idx in indices:
                        matches.append((idx, score))

        # Best matches first.
        matches.sort(key=lambda x: x[1], reverse=True)
        return [idx for idx, _ in matches]

    def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
        """
        Get a feature by its ``cleabs`` identifier or row index.

        Args:
            feature_id: ``cleabs`` string or integer row index.

        Returns:
            Matching GeoJSON Feature dict, or ``None``.
        """
        self._ensure_loaded()
        assert self._gdf is not None

        if "cleabs" in self._gdf.columns:
            # NOTE(review): passes a *label* index to _row_to_feature (which
            # uses .iloc). The two coincide after the ignore_index concat in
            # the loaders — verify this invariant is maintained.
            matches = self._gdf[self._gdf["cleabs"].astype(str) == feature_id]
            if not matches.empty:
                return self._row_to_feature(matches.index[0])

        try:
            idx = int(feature_id)
            if 0 <= idx < len(self._gdf):
                return self._row_to_feature(idx)
        except ValueError:
            pass

        return None

    def get_available_types(self) -> list[str]:
        """
        Return the union of all normalized types this source can return.

        Returns:
            Sorted list of type strings.
        """
        types: set[str] = set()
        for cfg in _LAYER_CONFIGS.values():
            if cfg.get("commune_flags"):
                types.update({"city", "municipality"})
            elif cfg.get("fixed_type"):
                types.add(cfg["fixed_type"])
            elif cfg.get("type_map"):
                types.update(cfg["type_map"].values())
        return sorted(types)
Geographic data source backed by IGN's BD-CARTO 5.0 dataset.
Loads French geographic data from GeoPackage files extracted to a directory. Supports administrative boundaries (communes, departments, regions, …), hydrography (rivers, lakes, …), named places (quarters, hamlets, …), orographic features (peaks, passes, valleys, …) and protected areas.
Data must first be downloaded with `make download-data-ign`, which places the GeoPackage files in `data/bdcarto/`.
All geometries are reprojected from EPSG:2154 (Lambert-93) to WGS84 (EPSG:4326) and returned as standard GeoJSON Feature dicts.
Arguments:
- data_path: Directory containing the `.gpkg` files (e.g. `"data/bdcarto"`).
Example:
>>> source = IGNBDCartoSource("data/bdcarto") >>> results = source.search("Ardèche", type="department") >>> results = source.search("Lyon", type="city") >>> results = source.search("Rhône", type="river")
414 def search( 415 self, 416 name: str, 417 type: str | None = None, 418 max_results: int = 10, 419 ) -> list[dict[str, Any]]: 420 """ 421 Search for geographic features by name. 422 423 Uses case-insensitive, accent-normalized exact matching with fuzzy 424 fallback when no exact match is found. 425 426 Args: 427 name: Location name to search for (e.g. ``"Ardèche"``, ``"Lyon"``, 428 ``"Rhône"``). 429 type: Optional type hint for filtering. Supports both concrete types 430 (``"department"``, ``"city"``, ``"river"``) and category hints 431 (``"administrative"``, ``"water"``). 432 max_results: Maximum number of results. 433 434 Returns: 435 List of GeoJSON Feature dicts in WGS84. Empty list if no match. 436 """ 437 self._ensure_loaded() 438 439 normalized = _normalize_name(name) 440 indices = self._name_index.get(normalized, []) 441 442 if not indices: 443 indices = self._fuzzy_search(normalized) 444 445 features = [self._row_to_feature(idx) for idx in indices] 446 447 if type is not None: 448 matching_types = get_matching_types(type) 449 logger.debug("Filtering results by type hint %r → matching types: %s", type, matching_types) 450 if matching_types: 451 features = [f for f in features if f["properties"].get("type") in matching_types] 452 else: 453 features = [f for f in features if f["properties"].get("type") == type.lower()] 454 455 features = merge_segments(features) 456 457 return features[:max_results]
Search for geographic features by name.
Uses case-insensitive, accent-normalized exact matching with fuzzy fallback when no exact match is found.
Arguments:
- name: Location name to search for (e.g. `"Ardèche"`, `"Lyon"`, `"Rhône"`).
- type: Optional type hint for filtering. Supports both concrete types (`"department"`, `"city"`, `"river"`) and category hints (`"administrative"`, `"water"`).
- max_results: Maximum number of results.
Returns:
List of GeoJSON Feature dicts in WGS84. Empty list if no match.
474 def get_by_id(self, feature_id: str) -> dict[str, Any] | None: 475 """ 476 Get a feature by its ``cleabs`` identifier or row index. 477 478 Args: 479 feature_id: ``cleabs`` string or integer row index. 480 481 Returns: 482 Matching GeoJSON Feature dict, or ``None``. 483 """ 484 self._ensure_loaded() 485 assert self._gdf is not None 486 487 if "cleabs" in self._gdf.columns: 488 matches = self._gdf[self._gdf["cleabs"].astype(str) == feature_id] 489 if not matches.empty: 490 return self._row_to_feature(matches.index[0]) 491 492 try: 493 idx = int(feature_id) 494 if 0 <= idx < len(self._gdf): 495 return self._row_to_feature(idx) 496 except ValueError: 497 pass 498 499 return None
Get a feature by its cleabs identifier or row index.
Arguments:
- feature_id: `cleabs` string or integer row index.
Returns:
Matching GeoJSON Feature dict, or `None`.
501 def get_available_types(self) -> list[str]: 502 """ 503 Return the union of all normalized types this source can return. 504 505 Returns: 506 Sorted list of type strings. 507 """ 508 types: set[str] = set() 509 for cfg in _LAYER_CONFIGS.values(): 510 if cfg.get("commune_flags"): 511 types.update({"city", "municipality"}) 512 elif cfg.get("fixed_type"): 513 types.add(cfg["fixed_type"]) 514 elif cfg.get("type_map"): 515 types.update(cfg["type_map"].values()) 516 return sorted(types)
Return the union of all normalized types this source can return.
Returns:
Sorted list of type strings.
class CompositeDataSource:
    """
    Fan-out datasource that delegates to an ordered list of GeoDataSource instances.

    ``search`` queries every registered source in order and merges the results,
    stopping once ``max_results`` features have been collected overall.

    ``get_by_id`` tries each source in order and returns the first hit.

    ``get_available_types`` returns the union of all sources' types.

    Args:
        sources: One or more GeoDataSource instances.

    Example:
        >>> swiss = SwissNames3DSource("data/")
        >>> ign = IGNBDTopoSource("data/")
        >>> combo = CompositeDataSource(swiss, ign)
        >>> results = combo.search("Geneva", type="city")
    """

    def __init__(self, *sources: GeoDataSource) -> None:
        if not sources:
            raise ValueError("At least one datasource is required.")
        self._sources: list[GeoDataSource] = list(sources)

    # Public API (mirrors GeoDataSource protocol)

    def search(
        self,
        name: str,
        type: str | None = None,
        max_results: int = 10,
    ) -> list[dict[str, Any]]:
        """
        Query every registered source and return the merged feature list.

        Args:
            name: Location name to search for.
            type: Optional type hint forwarded to each source.
            max_results: Cap applied both per source and to the merged total;
                once the total is reached, remaining sources are not queried.

        Returns:
            GeoJSON Feature dicts from the sources, in registration order.
        """
        collected: list[dict[str, Any]] = []
        for src in self._sources:
            for feat in src.search(name, type=type, max_results=max_results):
                collected.append(feat)
                if len(collected) >= max_results:
                    return collected
        return collected

    def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
        """
        Look up a feature ID in each source, returning the first match.

        Args:
            feature_id: Unique identifier to look up.

        Returns:
            The first matching GeoJSON Feature dict, or None.
        """
        for src in self._sources:
            hit = src.get_by_id(feature_id)
            if hit is not None:
                return hit
        return None

    def get_available_types(self) -> list[str]:
        """
        Return the sorted union of every source's available types.

        Returns:
            Sorted list of unique type strings.
        """
        seen: set[str] = set()
        for src in self._sources:
            seen.update(src.get_available_types())
        return sorted(seen)
Fan-out datasource that delegates to an ordered list of GeoDataSource instances.
search queries every registered source and merges results in order.
get_by_id tries each source in order and returns the first hit.
get_available_types returns the union of all sources' types.
Arguments:
- sources: One or more GeoDataSource instances.
Example:
>>> swiss = SwissNames3DSource("data/") >>> ign = IGNBDTopoSource("data/") >>> combo = CompositeDataSource(swiss, ign) >>> results = combo.search("Geneva", type="city")
41 def search( 42 self, 43 name: str, 44 type: str | None = None, 45 max_results: int = 10, 46 ) -> list[dict[str, Any]]: 47 """ 48 Search all registered sources and return merged. 49 50 Args: 51 name: Location name to search for. 52 type: Optional type hint passed through to every source. 53 max_results: Maximum results per source. 54 55 Returns: 56 List of GeoJSON Feature dicts, merged from all sources. 57 """ 58 merged: list[dict[str, Any]] = [] 59 60 for source in self._sources: 61 for feature in source.search(name, type=type, max_results=max_results): 62 merged.append(feature) 63 if len(merged) >= max_results: 64 return merged 65 66 return merged
Search all registered sources and return merged.
Arguments:
- name: Location name to search for.
- type: Optional type hint passed through to every source.
- max_results: Maximum number of results requested from each source; the merged list is also capped at this total.
Returns:
List of GeoJSON Feature dicts, merged from all sources.
68 def get_by_id(self, feature_id: str) -> dict[str, Any] | None: 69 """ 70 Get a feature by ID, trying each source in order. 71 72 Args: 73 feature_id: Unique identifier to look up. 74 75 Returns: 76 The first matching GeoJSON Feature dict, or None. 77 """ 78 for source in self._sources: 79 result = source.get_by_id(feature_id) 80 if result is not None: 81 return result 82 return None
Get a feature by ID, trying each source in order.
Arguments:
- feature_id: Unique identifier to look up.
Returns:
The first matching GeoJSON Feature dict, or None.
def get_available_types(self) -> list[str]:
    """
    Collect the union of every source's available types.

    Returns:
        Sorted list of unique type strings.
    """
    return sorted({t for src in self._sources for t in src.get_available_types()})
Return the union of all sources' available types, sorted.
Returns:
Sorted list of unique type strings.
class PostGISDataSource:
    """
    Geographic data source backed by a PostGIS table.

    The table must expose at minimum a name column, a geometry column, and
    optionally a type column. The expected schema is:

    .. code-block:: sql

        CREATE TABLE <table> (
            id TEXT PRIMARY KEY,
            name TEXT NOT NULL,
            type TEXT,
            geom GEOMETRY(Geometry, 4326)
        );

    The ``type`` column may store either:

    - **Raw dataset values** (e.g. ``"See"``, ``"Berg"`` for SwissNames3D),
      pass ``type_map`` so the datasource can translate between raw values and
      the normalized etter type names.
    - **Already-normalized values** (e.g. ``"lake"``, ``"mountain"``),
      leave ``type_map=None`` (default).

    Geometries must be in WGS84 (EPSG:4326) or supply ``crs`` for on-the-fly
    reprojection.

    Args:
        connection: A SQLAlchemy :class:`~sqlalchemy.engine.Engine` **or** a
            connection URL string (e.g. ``"postgresql+psycopg2://user:pass@host/db"``).
            When a string is provided the engine is created internally.
        table: Fully-qualified table name, e.g. ``"public.swissnames3d"``.
        name_column: Column used for name-based search (default ``"name"``).
        type_column: Column used for type filtering. Pass ``None`` to disable
            type filtering (default ``"type"``).
        geometry_column: PostGIS geometry column (default ``"geom"``).
        id_column: Primary-key column (default ``"id"``).
        crs: CRS of the stored geometries as an EPSG string. Defaults to
            ``"EPSG:4326"`` (no reprojection).
        type_map: Optional mapping from **normalized etter type names** to
            **lists of raw type column values** present in the database.
            This is the same format as ``SwissNames3DSource.OBJEKTART_TYPE_MAP``
            and ``IGNBDCartoSource.IGN_BDCARTO_TYPE_MAP``, so they can be
            passed directly::

                from etter.datasources.swissnames3d import OBJEKTART_TYPE_MAP
                source = PostGISDataSource(
                    engine,
                    table="public.swissnames3d",
                    type_map=OBJEKTART_TYPE_MAP,
                )

            When ``type_map`` is provided the datasource:

            - Translates raw DB values → normalized types in returned features.
            - Translates user type hints → raw DB values in SQL ``WHERE`` clauses.
            - Returns normalized type names from ``get_available_types()``.

            When ``None`` (default) the stored values are used as-is.
        fuzzy_threshold: Minimum ``pg_trgm`` similarity score (0-1) used for
            fuzzy fallback search when no exact ``ILIKE`` match is found.

    Example: unmodified SwissNames3D table::

        from sqlalchemy import create_engine
        from etter.datasources import PostGISDataSource
        from etter.datasources.swissnames3d import OBJEKTART_TYPE_MAP

        engine = create_engine(...)
        source = PostGISDataSource(
            engine,
            table="public.swissnames3d",
            type_map=OBJEKTART_TYPE_MAP,
        )
        results = source.search("Lac Léman", type="lake")
    """

    def __init__(
        self,
        connection: str | Engine,
        table: str,
        name_column: str = "name",
        type_column: str | None = "type",
        geometry_column: str = "geom",
        id_column: str = "id",
        crs: str = "EPSG:4326",
        type_map: TypeMap | None = None,
        fuzzy_threshold: float = 0.65,
    ) -> None:
        sa = _require_sqlalchemy()

        if isinstance(connection, str):
            self._engine = sa.create_engine(connection)
        else:
            self._engine = connection

        # Fail fast: verify connectivity and table access at construction time
        # rather than on the first search.
        try:
            with self._engine.connect() as conn:
                conn.execute(sa.text(f"SELECT 1 FROM {table} LIMIT 1"))
        except Exception as exc:
            raise ValueError(f"Failed to connect to database or access table {table!r}") from exc

        self._table = table
        self._name_col = name_column
        self._type_col = type_column
        self._geom_col = geometry_column
        self._id_col = id_column
        self._crs = crs
        self._fuzzy_threshold = fuzzy_threshold

        # Build bidirectional lookup structures from the user-supplied map.
        # If a raw value appears under several normalized names, the last
        # normalized name in iteration order wins in _raw_to_normalized.
        if type_map:
            self._normalized_to_raw: dict[str, list[str]] = {k: list(v) for k, v in type_map.items()}
            self._raw_to_normalized: dict[str, str] = {
                raw: normalized for normalized, raws in type_map.items() for raw in raws
            }
        else:
            self._normalized_to_raw = {}
            self._raw_to_normalized = {}

        # Lazily-probed extension availability caches; None = not yet checked.
        self._trgm_available: bool | None = None
        self._unaccent_available: bool | None = None

    def _get_connection(self) -> Any:
        """Return a SQLAlchemy connection from the engine."""
        return self._engine.connect()

    def _check_trgm(self, conn: Any) -> bool:
        """Return True if pg_trgm extension is available in the database."""
        # Cached per instance; a failed probe is cached as False.
        if self._trgm_available is not None:
            return self._trgm_available
        sa = _require_sqlalchemy()
        try:
            result = conn.execute(sa.text("SELECT 1 FROM pg_extension WHERE extname = 'pg_trgm'"))
            self._trgm_available = result.fetchone() is not None
        except Exception:
            logger.exception("Failed to check pg_trgm availability")
            self._trgm_available = False
        return self._trgm_available

    def _check_unaccent(self, conn: Any) -> bool:
        """Return True if the unaccent extension is available in the database."""
        # Cached per instance; a failed probe is cached as False.
        if self._unaccent_available is not None:
            return self._unaccent_available
        sa = _require_sqlalchemy()
        try:
            result = conn.execute(sa.text("SELECT 1 FROM pg_extension WHERE extname = 'unaccent'"))
            self._unaccent_available = result.fetchone() is not None
        except Exception:
            logger.exception("Failed to check unaccent availability")
            self._unaccent_available = False
        return self._unaccent_available

    def _normalize_type(self, raw_type: str | None) -> str | None:
        """Translate a raw DB type value to its normalized etter name.

        If no type_map was supplied the value is returned unchanged.
        """
        if raw_type is None:
            return None
        return self._raw_to_normalized.get(raw_type, raw_type)

    def _row_to_feature(self, row: Any) -> dict[str, Any]:
        """Convert a SQLAlchemy Row to a GeoJSON Feature dict."""
        # Relies on the column aliases (id, name, type, geojson) set up by
        # _build_select_columns.
        feature_id = str(row.id)
        name = str(row.name)
        raw_type = getattr(row, "type", None)
        normalized_type = self._normalize_type(raw_type)

        geojson_str = row.geojson
        if geojson_str:
            geometry = json.loads(geojson_str)
        else:
            # NULL geometry fallback: a (0, 0) point placeholder, not a real
            # location.
            geometry = {"type": "Point", "coordinates": [0, 0]}

        bbox = _bbox_from_geojson(geometry)

        properties: dict[str, Any] = {
            "name": name,
            "type": normalized_type,
            "confidence": 1.0,
        }

        return {
            "type": "Feature",
            "id": feature_id,
            "geometry": geometry,
            "bbox": bbox,
            "properties": properties,
        }

    def _build_select_columns(self) -> str:
        """Build the SELECT column list as a SQL fragment."""
        # Geometries stored in a non-WGS84 CRS are reprojected in-database.
        type_expr = f", {self._type_col} AS type" if self._type_col else ", NULL AS type"
        if self._crs.upper() != "EPSG:4326":
            geom_expr = f", ST_AsGeoJSON(ST_Transform({self._geom_col}, 4326)) AS geojson"
        else:
            geom_expr = f", ST_AsGeoJSON({self._geom_col}) AS geojson"
        return f"{self._id_col} AS id, {self._name_col} AS name{type_expr}{geom_expr}"

    def search(
        self,
        name: str,
        type: str | None = None,
        max_results: int = 10,
    ) -> list[dict[str, Any]]:
        """
        Search for geographic features by name.

        Uses a three-step cascade, stopping as soon as any step returns results:

        1. **Normalized exact match**
        2. **pg_trgm fuzzy with unaccent** (pg_trgm extension required and unaccent extension recommended)
        3. **ILIKE substring**

        ``merge_segments`` is applied after all rows are fetched so that
        multi-segment linestrings (rivers, roads) are merged before the
        ``max_results`` cap is applied.

        Args:
            name: Location name to search for.
            type: Optional type hint for filtering results.
            max_results: Maximum number of results to return.

        Returns:
            List of matching GeoJSON Feature dicts in WGS84.
        """
        sa = _require_sqlalchemy()
        cols = self._build_select_columns()

        # Resolve type filter to the raw DB values to use in the SQL WHERE clause.
        type_filter_values: list[str] | None = None
        if type is not None and self._type_col is not None:
            matching_types = get_matching_types(type)
            concrete_types = matching_types if matching_types else [type.lower()]
            if self._normalized_to_raw:
                raw_values: list[str] = []
                for t in concrete_types:
                    # Unmapped types fall through unchanged.
                    raw_values.extend(self._normalized_to_raw.get(t, [t]))
                type_filter_values = raw_values if raw_values else concrete_types
            else:
                type_filter_values = concrete_types

        # Fetch more rows than requested so that merge_segments has the full
        # set of segments to work with. Without this, a SQL LIMIT applied
        # *before* merging would only return a partial set of linestring
        # segments, producing incorrect / truncated geometries.
        # We cap the internal limit at 2000 to avoid unbounded queries.
        internal_limit = min(max(max_results * 20, 100), 2000)

        with self._get_connection() as conn:
            features = self._search_normalized(conn, sa, cols, name, type_filter_values, internal_limit)

        if not features:
            with self._get_connection() as conn:
                features = self._search_fuzzy(conn, sa, cols, name, type_filter_values, internal_limit)

        if not features:
            with self._get_connection() as conn:
                features = self._search_ilike(conn, sa, cols, name, type_filter_values, internal_limit)

        features = merge_segments(features)
        return features[:max_results]

    def _type_filter_sql(self, values: list[str] | None) -> tuple[str, dict[str, Any]]:
        """Return a WHERE clause fragment and bind params for type filtering."""
        # Values are bound parameters; only the column name is interpolated
        # (constructor-supplied, not user query input).
        if not values or self._type_col is None:
            return "", {}
        placeholders = ", ".join(f":type_{i}" for i in range(len(values)))
        clause = f" AND {self._type_col} IN ({placeholders})"
        params = {f"type_{i}": v for i, v in enumerate(values)}
        return clause, params

    def _search_normalized(
        self,
        conn: Any,
        sa: Any,
        cols: str,
        name: str,
        type_filter: list[str] | None,
        fetch_limit: int,
    ) -> list[dict[str, Any]]:
        """
        Exact accent- and case-insensitive search.

        Accent normalization (NFD decomposition + diacritic strip) is done in
        Python before the query is sent to the DB.
        """
        type_clause, type_params = self._type_filter_sql(type_filter)
        name_expr = f"lower({self._name_col})"
        if self._check_unaccent(conn):
            # Strip accents on the DB side too so both sides of the equality
            # are comparable.
            name_expr = f"unaccent({name_expr})"
        sql = sa.text(
            f"SELECT {cols} FROM {self._table} "  # noqa: S608
            f"WHERE {name_expr} = :query{type_clause} "
            f"LIMIT :limit"
        )
        params: dict[str, Any] = {
            "query": _normalize_name(name),
            "limit": fetch_limit,
            **type_params,
        }
        try:
            result = conn.execute(sql, params)
            return [self._row_to_feature(row) for row in result]
        except Exception:
            logger.exception("Normalized search failed for %r", name)
            return []

    def _search_ilike(
        self,
        conn: Any,
        sa: Any,
        cols: str,
        name: str,
        type_filter: list[str] | None,
        fetch_limit: int,
    ) -> list[dict[str, Any]]:
        """Case-insensitive substring fallback using ``ILIKE '%name%'``.

        When the ``unaccent`` extension is available, both the stored name column
        and the pattern are accent-stripped so that e.g. ``"Rhone"`` matches
        ``"Rhône"``. Without ``unaccent``, standard ILIKE is used (case-insensitive
        only).
        """
        type_clause, type_params = self._type_filter_sql(type_filter)
        normalized = _normalize_name(name)
        if self._check_unaccent(conn):
            name_expr = f"unaccent(lower({self._name_col}))"
            pattern = f"%{normalized}%"
        else:
            name_expr = self._name_col
            pattern = f"%{name}%"
        sql = sa.text(
            f"SELECT {cols} FROM {self._table} "  # noqa: S608
            f"WHERE {name_expr} ILIKE :pattern{type_clause} "
            f"LIMIT :limit"
        )
        params: dict[str, Any] = {"pattern": pattern, "limit": fetch_limit, **type_params}
        try:
            result = conn.execute(sql, params)
            return [self._row_to_feature(row) for row in result]
        except Exception:
            logger.exception("ILIKE search failed for %r", name)
            return []

    def _search_fuzzy(
        self,
        conn: Any,
        sa: Any,
        cols: str,
        name: str,
        type_filter: list[str] | None,
        fetch_limit: int,
    ) -> list[dict[str, Any]]:
        """Fuzzy fallback using pg_trgm similarity (if extension is available)."""
        if not self._check_trgm(conn):
            logger.warning(
                "pg_trgm extension not available. Fuzzy search disabled. Install it with: CREATE EXTENSION pg_trgm;"
            )
            return []
        normalized_query = _normalize_name(name)
        if self._check_unaccent(conn):
            name_expr = f"unaccent(lower({self._name_col}))"
        else:
            logger.warning(
                "unaccent extension not available. Accent-insensitive fuzzy search degraded. "
                "Install it with: CREATE EXTENSION unaccent;"
            )
            name_expr = f"lower({self._name_col})"
        type_clause, type_params = self._type_filter_sql(type_filter)
        # Best matches first: ordered by descending word_similarity score.
        sql = sa.text(
            f"SELECT {cols} FROM {self._table} "  # noqa: S608
            f"WHERE word_similarity({name_expr}, :query) > :threshold{type_clause} "
            f"ORDER BY word_similarity({name_expr}, :query) DESC "
            f"LIMIT :limit"
        )
        params: dict[str, Any] = {
            "query": normalized_query,
            "threshold": self._fuzzy_threshold,
            "limit": fetch_limit,
            **type_params,
        }
        try:
            result = conn.execute(sql, params)
            return [self._row_to_feature(row) for row in result]
        except Exception:
            logger.exception("Fuzzy search failed for %r", name)
            return []

    def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
        """
        Get a specific feature by its unique identifier.

        Args:
            feature_id: Value of the ``id`` column.

        Returns:
            The matching GeoJSON Feature dict, or ``None`` if not found.
        """
        sa = _require_sqlalchemy()
        cols = self._build_select_columns()
        sql = sa.text(
            f"SELECT {cols} FROM {self._table} WHERE {self._id_col} = :id LIMIT 1"  # noqa: S608
        )
        with self._get_connection() as conn:
            try:
                result = conn.execute(sql, {"id": feature_id})
                row = result.fetchone()
                return self._row_to_feature(row) if row else None
            except Exception:
                # Any DB or row-conversion failure is treated as "not found".
                logger.exception("get_by_id failed for %r", feature_id)
                return None

    def get_available_types(self) -> list[str]:
        """
        Return the distinct ``type`` values present in the table.

        Returns:
            Sorted list of concrete type strings, or an empty list if the table
            has no type column.
        """
        if self._type_col is None:
            return []
        sa = _require_sqlalchemy()
        sql = sa.text(
            f"SELECT DISTINCT {self._type_col} AS type FROM {self._table} "  # noqa: S608
            f"WHERE {self._type_col} IS NOT NULL ORDER BY 1"
        )
        with self._get_connection() as conn:
            try:
                result = conn.execute(sql)
                raw_types = [row.type for row in result]
            except Exception:
                logger.exception("get_available_types failed")
                return []

        # Translate through the type map (no-op when none was supplied) and
        # drop empty values.
        normalized = {self._normalize_type(t) for t in raw_types if t}
        return sorted(t for t in normalized if t)
Geographic data source backed by a PostGIS table.
The table must expose at minimum a name column, a geometry column, and optionally a type column. The expected schema is:
CREATE TABLE <table> (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
type TEXT,
geom GEOMETRY(Geometry, 4326)
);
The `type` column may store either:

- Raw dataset values (e.g. `"See"`, `"Berg"` for SwissNames3D); pass `type_map` so the datasource can translate between raw values and the normalized etter type names.
- Already-normalized values (e.g. `"lake"`, `"mountain"`); leave `type_map=None` (default).
Geometries must be in WGS84 (EPSG:4326) or supply crs for on-the-fly
reprojection.
Arguments:
- connection: A SQLAlchemy `Engine` or a connection URL string (e.g. `"postgresql+psycopg2://user:pass@host/db"`). When a string is provided the engine is created internally.
- table: Fully-qualified table name, e.g. `"public.swissnames3d"`.
- name_column: Column used for name-based search (default `"name"`).
- type_column: Column used for type filtering. Pass `None` to disable type filtering (default `"type"`).
- geometry_column: PostGIS geometry column (default `"geom"`).
- id_column: Primary-key column (default `"id"`).
- crs: CRS of the stored geometries as an EPSG string. Defaults to `"EPSG:4326"` (no reprojection).
- type_map: Optional mapping from normalized etter type names to lists of raw type column values present in the database. This is the same format as `SwissNames3DSource.OBJEKTART_TYPE_MAP` and `IGNBDCartoSource.IGN_BDCARTO_TYPE_MAP`, so they can be passed directly. When `type_map` is provided the datasource translates raw DB values to normalized types in returned features, translates user type hints to raw DB values in SQL `WHERE` clauses, and returns normalized type names from `get_available_types()`. When `None` (default) the stored values are used as-is.
- fuzzy_threshold: Minimum `pg_trgm` similarity score (0-1) used for fuzzy fallback search when no exact `ILIKE` match is found.
Example: unmodified SwissNames3D table::
from sqlalchemy import create_engine
from etter.datasources import PostGISDataSource
from etter.datasources.swissnames3d import OBJEKTART_TYPE_MAP
engine = create_engine(...)
source = PostGISDataSource(
engine,
table="public.swissnames3d",
type_map=OBJEKTART_TYPE_MAP,
)
results = source.search("Lac Léman", type="lake")
def __init__(
    self,
    connection: str | Engine,
    table: str,
    name_column: str = "name",
    type_column: str | None = "type",
    geometry_column: str = "geom",
    id_column: str = "id",
    crs: str = "EPSG:4326",
    type_map: TypeMap | None = None,
    fuzzy_threshold: float = 0.65,
) -> None:
    """Create the datasource and verify the table is reachable.

    Raises:
        ValueError: If the database cannot be reached or the table is
            inaccessible.
    """
    sa = _require_sqlalchemy()

    # Accept either a ready-made engine or a URL string.
    self._engine = sa.create_engine(connection) if isinstance(connection, str) else connection

    # Fail fast: probe the table once at construction time.
    try:
        with self._engine.connect() as conn:
            conn.execute(sa.text(f"SELECT 1 FROM {table} LIMIT 1"))
    except Exception as exc:
        raise ValueError(f"Failed to connect to database or access table {table!r}") from exc

    self._table = table
    self._name_col = name_column
    self._type_col = type_column
    self._geom_col = geometry_column
    self._id_col = id_column
    self._crs = crs
    self._fuzzy_threshold = fuzzy_threshold

    # Derive both lookup directions from the user-supplied map in one pass.
    self._normalized_to_raw: dict[str, list[str]] = {}
    self._raw_to_normalized: dict[str, str] = {}
    if type_map:
        for normalized, raws in type_map.items():
            self._normalized_to_raw[normalized] = list(raws)
            for raw in raws:
                self._raw_to_normalized[raw] = normalized

    # Extension availability is probed lazily; None means "not yet checked".
    self._trgm_available: bool | None = None
    self._unaccent_available: bool | None = None
def search(
    self,
    name: str,
    type: str | None = None,
    max_results: int = 10,
) -> list[dict[str, Any]]:
    """
    Search for geographic features by name.

    Uses a three-step cascade, stopping as soon as any step returns results:

    1. **Normalized exact match**
    2. **pg_trgm fuzzy with unaccent** (pg_trgm extension required and unaccent extension recommended)
    3. **ILIKE substring**

    ``merge_segments`` is applied after all rows are fetched so that
    multi-segment linestrings (rivers, roads) are merged before the
    ``max_results`` cap is applied.

    Args:
        name: Location name to search for.
        type: Optional type hint for filtering results.
        max_results: Maximum number of results to return.

    Returns:
        List of matching GeoJSON Feature dicts in WGS84.
    """
    sa = _require_sqlalchemy()
    cols = self._build_select_columns()

    # Translate the user's type hint into the raw DB values used in the
    # SQL WHERE clause.
    type_values: list[str] | None = None
    if type is not None and self._type_col is not None:
        expanded = get_matching_types(type)
        concrete = expanded if expanded else [type.lower()]
        if self._normalized_to_raw:
            raws: list[str] = []
            for candidate in concrete:
                # Unmapped types pass through unchanged.
                raws.extend(self._normalized_to_raw.get(candidate, [candidate]))
            type_values = raws if raws else concrete
        else:
            type_values = concrete

    # Over-fetch so merge_segments sees the full set of linestring segments
    # before the cap is applied; a premature SQL LIMIT would truncate
    # multi-segment geometries. Hard cap of 2000 avoids unbounded queries.
    fetch_limit = min(max(max_results * 20, 100), 2000)

    # Cascade: each strategy gets a fresh connection; stop at the first hit.
    features: list[dict[str, Any]] = []
    for strategy in (self._search_normalized, self._search_fuzzy, self._search_ilike):
        with self._get_connection() as conn:
            features = strategy(conn, sa, cols, name, type_values, fetch_limit)
        if features:
            break

    return merge_segments(features)[:max_results]
Search for geographic features by name.
Uses a three-step cascade, stopping as soon as any step returns results:
- Normalized exact match
- pg_trgm fuzzy with unaccent (pg_trgm extension required and unaccent extension recommended)
- ILIKE substring
merge_segments is applied after all rows are fetched so that
multi-segment linestrings (rivers, roads) are merged before the
max_results cap is applied.
Arguments:
- name: Location name to search for.
- type: Optional type hint for filtering results.
- max_results: Maximum number of results to return.
Returns:
List of matching GeoJSON Feature dicts in WGS84.
def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
    """
    Get a specific feature by its unique identifier.

    Args:
        feature_id: Value of the ``id`` column.

    Returns:
        The matching GeoJSON Feature dict, or ``None`` if not found.
    """
    sa = _require_sqlalchemy()
    query = sa.text(
        f"SELECT {self._build_select_columns()} FROM {self._table} WHERE {self._id_col} = :id LIMIT 1"  # noqa: S608
    )
    with self._get_connection() as conn:
        try:
            fetched = conn.execute(query, {"id": feature_id}).fetchone()
            # Row conversion stays inside the try so conversion failures are
            # also reported as "not found".
            return self._row_to_feature(fetched) if fetched else None
        except Exception:
            logger.exception("get_by_id failed for %r", feature_id)
            return None
Get a specific feature by its unique identifier.
Arguments:
- feature_id: Value of the `id` column.

Returns:
The matching GeoJSON Feature dict, or `None` if not found.
def get_available_types(self) -> list[str]:
    """
    Return the distinct ``type`` values present in the table.

    Returns:
        Sorted list of concrete type strings, or an empty list if the table
        has no type column.
    """
    if self._type_col is None:
        # No type column configured: nothing to report.
        return []
    sa = _require_sqlalchemy()
    query = sa.text(
        f"SELECT DISTINCT {self._type_col} AS type FROM {self._table} "  # noqa: S608
        f"WHERE {self._type_col} IS NOT NULL ORDER BY 1"
    )
    with self._get_connection() as conn:
        try:
            raw_values = [record.type for record in conn.execute(query)]
        except Exception:
            logger.exception("get_available_types failed")
            return []

    # Map raw DB values to normalized names (identity when no map is set)
    # and drop empties.
    unique = {self._normalize_type(value) for value in raw_values if value}
    return sorted(value for value in unique if value)
Return the distinct type values present in the table.
Returns:
Sorted list of concrete type strings, or an empty list if the table has no type column.
def apply_spatial_relation(
    geometry: dict[str, Any],
    relation: SpatialRelation,
    buffer_config: BufferConfig | None = None,
    spatial_config: SpatialRelationConfig | None = None,
    geometry_format: GeometryFormat = "geojson",
) -> dict[str, Any] | str:
    """
    Transform a reference geometry according to a spatial relation.

    Converts the input GeoJSON geometry to a search area based on the
    spatial relation category:
    - Containment: returns the original geometry unchanged
    - Buffer: applies positive (expand), negative (erode), or ring buffer
    - Directional: creates an angular sector wedge

    Args:
        geometry: GeoJSON geometry dict in WGS84 (EPSG:4326).
        relation: Spatial relation to apply.
        buffer_config: Buffer configuration (required for buffer/directional relations).
        spatial_config: Spatial relation registry used to look up directional angles.
            Defaults to the module-level singleton; pass an explicit instance to
            avoid repeated construction when calling from a hot path.
        geometry_format: Output format for the geometry. "geojson" (default) returns a
            GeoJSON dict, "wkt" returns a WKT string, "wkb" returns a hex-encoded WKB string.

    Returns:
        Transformed geometry in the requested format (GeoJSON dict, WKT string, or WKB hex string).

    Raises:
        ValueError: If buffer_config is missing for buffer/directional relations,
            or if the relation category is unknown.

    Examples:
        >>> from etter.models import SpatialRelation, BufferConfig
        >>> result = apply_spatial_relation(
        ...     geometry={"type": "Point", "coordinates": [6.63, 46.52]},
        ...     relation=SpatialRelation(relation="near", category="buffer"),
        ...     buffer_config=BufferConfig(distance_m=5000, buffer_from="center"),
        ... )
    """
    category = relation.category
    if category == "containment":
        transformed = _apply_containment(geometry)
    elif category == "buffer":
        if buffer_config is None:
            raise ValueError(f"Buffer relation '{relation.relation}' requires buffer_config")
        transformed = _apply_buffer(geometry, buffer_config)
    elif category == "directional":
        if buffer_config is None:
            raise ValueError(f"Directional relation '{relation.relation}' requires buffer_config")
        # Explicit None check: a falsy-but-present config must not be replaced.
        registry = spatial_config if spatial_config is not None else _DEFAULT_SPATIAL_CONFIG
        entry = registry.get_config(relation.relation)
        transformed = _apply_directional(
            geometry,
            buffer_config,
            entry.direction_angle_degrees or 0,
            entry.sector_angle_degrees or 90,
        )
    else:
        raise ValueError(f"Unknown relation category: '{relation.category}'")

    return convert_geometry(transformed, geometry_format)
Transform a reference geometry according to a spatial relation.
Converts the input GeoJSON geometry to a search area based on the spatial relation category:
- Containment: returns the original geometry unchanged
- Buffer: applies positive (expand), negative (erode), or ring buffer
- Directional: creates an angular sector wedge
Arguments:
- geometry: GeoJSON geometry dict in WGS84 (EPSG:4326).
- relation: Spatial relation to apply.
- buffer_config: Buffer configuration (required for buffer/directional relations).
- spatial_config: Spatial relation registry used to look up directional angles. Defaults to the module-level singleton; pass an explicit instance to avoid repeated construction when calling from a hot path.
- geometry_format: Output format for the geometry. "geojson" (default) returns a GeoJSON dict, "wkt" returns a WKT string, "wkb" returns a hex-encoded WKB string.
Returns:
Transformed geometry in the requested format (GeoJSON dict, WKT string, or WKB hex string).
Raises:
- ValueError: If buffer_config is missing for buffer/directional relations, or if the relation category is unknown.
Examples:
>>> from etter.models import SpatialRelation, BufferConfig
>>> # Circular buffer as GeoJSON (default)
>>> result = apply_spatial_relation(
...     geometry={"type": "Point", "coordinates": [6.63, 46.52]},
...     relation=SpatialRelation(relation="near", category="buffer"),
...     buffer_config=BufferConfig(distance_m=5000, buffer_from="center"),
... )
>>> # Same buffer as WKT
>>> result = apply_spatial_relation(
...     geometry={"type": "Point", "coordinates": [6.63, 46.52]},
...     relation=SpatialRelation(relation="near", category="buffer"),
...     buffer_config=BufferConfig(distance_m=5000, buffer_from="center"),
...     geometry_format="wkt",
... )
>>> # Containment (passthrough)
>>> result = apply_spatial_relation(
...     geometry=city_polygon,
...     relation=SpatialRelation(relation="in", category="containment"),
... )
def convert_geometry(geometry: dict[str, Any], fmt: GeometryFormat) -> dict[str, Any] | str:
    """
    Convert a GeoJSON geometry dict to the requested format.

    Args:
        geometry: GeoJSON geometry dict (e.g. {"type": "Point", "coordinates": [...]})
        fmt: Target format — "geojson" returns the dict unchanged, "wkt" returns a WKT string,
            "wkb" returns a hex-encoded WKB string.

    Returns:
        The geometry in the requested format.
    """
    # GeoJSON is a passthrough; anything else round-trips through shapely.
    if fmt == "geojson":
        return geometry
    shapely_geom = shape(geometry)
    return shapely_geom.wkt if fmt == "wkt" else shapely_geom.wkb_hex
Convert a GeoJSON geometry dict to the requested format.
Arguments:
- geometry: GeoJSON geometry dict (e.g. {"type": "Point", "coordinates": [...]})
- fmt: Target format — "geojson" returns the dict unchanged, "wkt" returns a WKT string, "wkb" returns a hex-encoded WKB string.
Returns:
The geometry in the requested format.
def convert_feature_geometry(feature: dict[str, Any], fmt: GeometryFormat) -> dict[str, Any]:
    """
    Return a copy of a GeoJSON Feature dict with its geometry converted to the requested format.

    Args:
        feature: GeoJSON Feature dict with a "geometry" key.
        fmt: Target geometry format.

    Returns:
        A new dict identical to the input except the "geometry" value is converted.
    """
    # The default format needs no work — return the feature as-is.
    if fmt == "geojson":
        return feature
    converted = dict(feature)
    converted["geometry"] = convert_geometry(feature["geometry"], fmt)
    return converted
Return a copy of a GeoJSON Feature dict with its geometry converted to the requested format.
Arguments:
- feature: GeoJSON Feature dict with a "geometry" key.
- fmt: Target geometry format.
Returns:
A new dict identical to the input except the "geometry" value is converted.