etter
etter - Natural Language Geographic Query Parsing
Parse location queries into structured geographic queries using LLM.
"""
etter - Natural Language Geographic Query Parsing

Parse location queries into structured geographic queries using LLM.
"""

# Datasources: pluggable geographic lookup backends
from .datasources import CompositeDataSource, GeoDataSource, IGNBDCartoSource, PostGISDataSource, SwissNames3DSource

# Exceptions: package error hierarchy (GeoFilterError is the common base)
from .exceptions import (
    GeoFilterError,
    LowConfidenceError,
    LowConfidenceWarning,
    ParsingError,
    UnknownRelationError,
    ValidationError,
)

# Models (for type hints and result access)
from .models import (
    BufferConfig,
    ConfidenceLevel,
    ConfidenceScore,
    GeoQuery,
    ReferenceLocation,
    SpatialRelation,
)

# Main API entry point
from .parser import GeoFilterParser

# Spatial operations
from .spatial import apply_spatial_relation

# Configuration
from .spatial_config import RelationConfig, SpatialRelationConfig

# Public API surface of the package; grouped to mirror the import sections above.
__all__ = [
    # Main API
    "GeoFilterParser",
    # Models
    "GeoQuery",
    "SpatialRelation",
    "ReferenceLocation",
    "BufferConfig",
    "ConfidenceScore",
    "ConfidenceLevel",
    # Configuration
    "SpatialRelationConfig",
    "RelationConfig",
    # Exceptions
    "GeoFilterError",
    "ParsingError",
    "ValidationError",
    "UnknownRelationError",
    "LowConfidenceError",
    "LowConfidenceWarning",
    # Datasources
    "GeoDataSource",
    "SwissNames3DSource",
    "IGNBDCartoSource",
    "CompositeDataSource",
    "PostGISDataSource",
    # Spatial
    "apply_spatial_relation",
]
class GeoFilterParser:
    """
    Main entry point for parsing natural language location queries.

    This class orchestrates the entire parsing pipeline:
    1. Initialize LLM with structured output
    2. Build prompt with spatial relations and examples
    3. Parse query through LLM
    4. Validate and enrich with defaults
    5. Return structured GeoQuery

    Examples:
        Basic usage:
        >>> from langchain.chat_models import init_chat_model
        >>> llm = init_chat_model(model="gpt-4o", model_provider="openai", api_key="sk-...")
        >>> parser = GeoFilterParser(llm=llm)
        >>> result = parser.parse("restaurants in Lausanne")
        >>> print(result.reference_location.name)
        'Lausanne'

        With strict confidence mode:
        >>> parser = GeoFilterParser(llm=llm, confidence_threshold=0.8, strict_mode=True)
        >>> result = parser.parse("near the station")  # May raise LowConfidenceError
    """

    def __init__(
        self,
        llm: BaseChatModel,
        spatial_config: SpatialRelationConfig | None = None,
        confidence_threshold: float = 0.6,
        strict_mode: bool = False,
        include_examples: bool = True,
        datasource: GeoDataSource | None = None,
    ):
        """
        Initialize the parser.

        Args:
            llm: LangChain LLM instance (required).
            spatial_config: Spatial relation configuration. If None, uses defaults.
            confidence_threshold: Minimum confidence to accept (0-1).
            strict_mode: If True, raise error on low confidence. If False, warn only.
            include_examples: Whether to include few-shot examples in prompt.
            datasource: Optional GeoDataSource instance. If provided, the LLM will be
                informed about the concrete types available in that datasource for
                better type inference.

        Example:
            >>> from langchain.chat_models import init_chat_model
            >>> from etter.datasources.swissnames3d import SwissNames3DSource
            >>> llm = init_chat_model(model="gpt-4o", model_provider="openai", temperature=0)
            >>> datasource = SwissNames3DSource("data/")
            >>> parser = GeoFilterParser(llm=llm, datasource=datasource)
        """
        self.llm = llm

        # Initialize spatial config (library defaults when none supplied)
        self.spatial_config = spatial_config or SpatialRelationConfig()

        # Settings
        self.confidence_threshold = confidence_threshold
        self.strict_mode = strict_mode
        self.include_examples = include_examples
        self.datasource = datasource

        # Build structured LLM and prompt template once, up front; the prompt
        # builder reads include_examples and datasource, so keep this last.
        self.structured_llm = self._build_structured_llm()
        self.prompt = self._build_prompt()

    def _build_structured_llm(self):
        """Create LLM with structured output using the GeoQuery Pydantic model."""
        return self.llm.with_structured_output(
            GeoQuery,
            method="function_calling",  # Use function_calling for broader schema support
            include_raw=True,  # Keep raw response around for error debugging
        )

    def _build_prompt(self) -> ChatPromptTemplate:
        """Build prompt template with spatial relations, examples, and available types."""
        available_types = None
        if self.datasource is not None:
            available_types = self.datasource.get_available_types()

        return build_prompt_template(
            spatial_config=self.spatial_config,
            include_examples=self.include_examples,
            available_types=available_types,
        )

    def _coerce_response(self, response, query: str) -> GeoQuery:
        """
        Extract the GeoQuery from a structured-output response and stamp the query.

        Args:
            response: Raw return value of the structured LLM. With include_raw=True
                this is a dict with 'parsed', 'raw' and 'parsing_error' keys;
                otherwise the parsed object itself.
            query: Original user query, copied onto the result's original_query.

        Returns:
            GeoQuery: The parsed query object.

        Raises:
            ParsingError: If the response contains no valid GeoQuery.
        """
        parsed = response.get("parsed") if isinstance(response, dict) else response

        if parsed is None:
            raw = response.get("raw", "") if isinstance(response, dict) else ""
            error = response.get("parsing_error") if isinstance(response, dict) else None
            raise ParsingError(
                message="Failed to parse query into structured format. "
                "LLM may have returned invalid JSON or missed required fields.",
                raw_response=str(raw),
                original_error=error,
            )

        if not isinstance(parsed, GeoQuery):
            # Explicit check instead of `assert`: assertions are stripped under
            # `python -O`, which would silently let a malformed result through.
            raise ParsingError(
                message=f"Parsed result must be GeoQuery, got {type(parsed).__name__}",
                raw_response=str(parsed),
                original_error=None,
            )

        # Ensure original_query is set to exactly what the user submitted
        if not parsed.original_query or parsed.original_query != query:
            parsed.original_query = query

        return parsed

    def parse(self, query: str) -> GeoQuery:
        """
        Parse a natural language location query into structured format.

        This is the main method for parsing queries. It:
        1. Invokes the LLM with structured output
        2. Validates the spatial relation is registered
        3. Enriches with default parameters
        4. Checks confidence threshold

        Args:
            query: Natural language query in any language

        Returns:
            GeoQuery: Structured query representation with confidence scores

        Raises:
            ParsingError: If LLM fails to parse query into valid structure
            ValidationError: If parsed query fails business logic validation
            UnknownRelationError: If spatial relation is not registered
            LowConfidenceError: If confidence below threshold (strict mode only)

        Warns:
            LowConfidenceWarning: If confidence below threshold (permissive mode)

        Examples:
            >>> result = parser.parse("in Bern")
            >>> result.reference_location.name
            'Bern'
            >>> result = parser.parse("près de Genève")
            >>> result.spatial_relation.relation
            'near'
        """
        # Format prompt with query
        formatted_messages = self.prompt.format_messages(query=query)

        # Invoke LLM with structured output
        try:
            response = self.structured_llm.invoke(formatted_messages)
        except Exception as e:
            raise ParsingError(
                message=f"LLM invocation failed: {str(e)}",
                raw_response="",
                original_error=e,
            ) from e

        geo_query = self._coerce_response(response, query)

        # Run validation pipeline (relation registration, defaults, confidence gate)
        return validate_query(
            geo_query,
            self.spatial_config,
            confidence_threshold=self.confidence_threshold,
            strict_mode=self.strict_mode,
        )

    async def parse_stream(self, query: str) -> AsyncGenerator[dict, None]:
        """
        Parse a natural language location query with streaming reasoning and results.

        This method provides real-time feedback during the parsing process by yielding
        intermediate reasoning steps and the final GeoQuery result. This is useful for
        providing users with transparency into the LLM's decision-making process and
        for building responsive UIs.

        The stream yields dictionaries with the following event types:
        - {"type": "start"} - Stream started
        - {"type": "reasoning", "content": str} - Intermediate processing steps
        - {"type": "data-response", "content": dict} - Final GeoQuery as JSON
        - {"type": "error", "content": str} - Errors encountered during processing
        - {"type": "finish"} - Stream completed successfully

        Args:
            query: Natural language query in any language

        Yields:
            dict: Stream events with type and optional content fields

        Raises:
            ParsingError: If LLM fails to parse query into valid structure
            ValidationError: If parsed query fails business logic validation
            UnknownRelationError: If spatial relation is not registered
            LowConfidenceError: If confidence below threshold (strict mode only)

        Examples:
            >>> async for event in parser.parse_stream("restaurants near Lake Geneva"):
            ...     if event["type"] == "data-response":
            ...         geo_query = event["content"]
        """
        # Tracks whether a specific error event was already emitted, so the
        # outer handler does not double-report the same failure.
        error_reported = False
        try:
            # Signal start of stream
            yield {"type": "start"}

            yield {"type": "reasoning", "content": "Preparing query for LLM processing"}
            formatted_messages = self.prompt.format_messages(query=query)

            yield {"type": "reasoning", "content": "Analyzing spatial relationship and location"}
            try:
                response = await self.structured_llm.ainvoke(formatted_messages)
            except Exception as e:
                yield {"type": "error", "content": f"LLM invocation failed: {str(e)}"}
                error_reported = True
                raise ParsingError(
                    message=f"LLM invocation failed: {str(e)}",
                    raw_response="",
                    original_error=e,
                ) from e

            yield {"type": "reasoning", "content": "Parsing LLM response into structured format"}
            try:
                geo_query = self._coerce_response(response, query)
            except ParsingError:
                yield {"type": "error", "content": "Failed to parse response - invalid JSON or missing fields"}
                error_reported = True
                raise

            if geo_query.confidence_breakdown.reasoning:
                yield {
                    "type": "reasoning",
                    "content": f"LLM reasoning: {geo_query.confidence_breakdown.reasoning}",
                }

            yield {"type": "reasoning", "content": "Validating spatial relation configuration"}
            geo_query = validate_query(
                geo_query,
                self.spatial_config,
                confidence_threshold=self.confidence_threshold,
                strict_mode=self.strict_mode,
            )

            yield {"type": "reasoning", "content": "Query parsing completed successfully"}
            yield {"type": "data-response", "content": geo_query.model_dump()}

            # Signal successful completion
            yield {"type": "finish"}

        except Exception as e:
            # Emit a single error event before re-raising; specific handlers
            # above have already reported their own failures.
            if not error_reported:
                yield {"type": "error", "content": f"Error during parsing: {str(e)}"}
            raise

    def parse_batch(self, queries: list[str]) -> list[GeoQuery]:
        """
        Parse multiple queries in batch.

        Note: This is a simple sequential implementation.
        For true parallelization, consider using async methods or ThreadPoolExecutor.

        Args:
            queries: List of natural language queries

        Returns:
            List of GeoQuery objects (same order as input)

        Raises:
            Same exceptions as parse() for any failing query
        """
        return [self.parse(query) for query in queries]

    def get_available_relations(self, category: RelationCategory | None = None) -> list[str]:
        """
        Get list of available spatial relations.

        Args:
            category: Optional filter by category ("containment", "buffer", "directional")

        Returns:
            List of relation names
        """
        return self.spatial_config.list_relations(category=category)

    def describe_relation(self, relation_name: str) -> str:
        """
        Get description of a spatial relation.

        Args:
            relation_name: Name of the relation

        Returns:
            Human-readable description

        Raises:
            UnknownRelationError: If relation is not registered
        """
        config = self.spatial_config.get_config(relation_name)
        return config.description
Main entry point for parsing natural language location queries.
This class orchestrates the entire parsing pipeline:
- Initialize LLM with structured output
- Build prompt with spatial relations and examples
- Parse query through LLM
- Validate and enrich with defaults
- Return structured GeoQuery
Examples:
Basic usage:
>>> from langchain.chat_models import init_chat_model
>>> llm = init_chat_model(model="gpt-4o", model_provider="openai", api_key="sk-...")
>>> parser = GeoFilterParser(llm=llm)
>>> result = parser.parse("restaurants in Lausanne")
>>> print(result.reference_location.name)
'Lausanne'

With strict confidence mode:
>>> parser = GeoFilterParser(llm=llm, confidence_threshold=0.8, strict_mode=True) >>> result = parser.parse("near the station") # May raise LowConfidenceError
def __init__(
    self,
    llm: BaseChatModel,
    spatial_config: SpatialRelationConfig | None = None,
    confidence_threshold: float = 0.6,
    strict_mode: bool = False,
    include_examples: bool = True,
    datasource: GeoDataSource | None = None,
):
    """
    Initialize the parser.

    Args:
        llm: LangChain LLM instance (required).
        spatial_config: Spatial relation configuration; defaults are used when None.
        confidence_threshold: Minimum confidence to accept (0-1).
        strict_mode: If True, raise on low confidence; otherwise only warn.
        include_examples: Whether to include few-shot examples in the prompt.
        datasource: Optional GeoDataSource instance. When provided, the LLM is
            told which concrete feature types that datasource offers, improving
            type inference.

    Example:
        >>> from langchain.chat_models import init_chat_model
        >>> from etter.datasources.swissnames3d import SwissNames3DSource
        >>> llm = init_chat_model(model="gpt-4o", model_provider="openai", temperature=0)
        >>> datasource = SwissNames3DSource("data/")
        >>> parser = GeoFilterParser(llm=llm, datasource=datasource)
    """
    # Core collaborators
    self.llm = llm
    self.datasource = datasource
    self.spatial_config = spatial_config or SpatialRelationConfig()

    # Behavioural settings
    self.confidence_threshold = confidence_threshold
    self.strict_mode = strict_mode
    self.include_examples = include_examples

    # Derived objects — built last because they read the settings above
    self.structured_llm = self._build_structured_llm()
    self.prompt = self._build_prompt()
Initialize the parser.
Arguments:
- llm: LangChain LLM instance (required).
- spatial_config: Spatial relation configuration. If None, uses defaults
- confidence_threshold: Minimum confidence to accept (0-1)
- strict_mode: If True, raise error on low confidence. If False, warn only
- include_examples: Whether to include few-shot examples in prompt
- datasource: Optional GeoDataSource instance. If provided, the LLM will be informed about the concrete types available in that datasource for better type inference.
Example:
>>> from langchain.chat_models import init_chat_model >>> from etter.datasources.swissnames3d import SwissNames3DSource >>> llm = init_chat_model(model="gpt-4o", model_provider="openai", temperature=0) >>> datasource = SwissNames3DSource("data/") >>> parser = GeoFilterParser(llm=llm, datasource=datasource)
def parse(self, query: str) -> GeoQuery:
    """
    Parse a natural language location query into structured format.

    This is the main method for parsing queries. It:
    1. Invokes the LLM with structured output
    2. Validates the spatial relation is registered
    3. Enriches with default parameters
    4. Checks confidence threshold

    Args:
        query: Natural language query in any language

    Returns:
        GeoQuery: Structured query representation with confidence scores

    Raises:
        ParsingError: If LLM fails to parse query into valid structure
        ValidationError: If parsed query fails business logic validation
        UnknownRelationError: If spatial relation is not registered
        LowConfidenceError: If confidence below threshold (strict mode only)

    Warns:
        LowConfidenceWarning: If confidence below threshold (permissive mode)

    Examples:
        >>> result = parser.parse("in Bern")
        >>> result.reference_location.name
        'Bern'
        >>> result = parser.parse("près de Genève")
        >>> result.spatial_relation.relation
        'near'
    """
    # Format prompt with query
    formatted_messages = self.prompt.format_messages(query=query)

    # Invoke LLM with structured output
    try:
        response = self.structured_llm.invoke(formatted_messages)
    except Exception as e:
        raise ParsingError(
            message=f"LLM invocation failed: {str(e)}",
            raw_response="",
            original_error=e,
        ) from e

    # include_raw=True yields a dict with 'parsed'/'raw'/'parsing_error';
    # otherwise treat the response itself as the parsed object.
    parsed = response.get("parsed") if isinstance(response, dict) else response

    if parsed is None:
        raw = response.get("raw", "") if isinstance(response, dict) else ""
        error = response.get("parsing_error") if isinstance(response, dict) else None
        raise ParsingError(
            message="Failed to parse query into structured format. "
            "LLM may have returned invalid JSON or missed required fields.",
            raw_response=str(raw),
            original_error=error,
        )

    geo_query = parsed
    if not isinstance(geo_query, GeoQuery):
        # Explicit check instead of `assert`: assertions are stripped under
        # `python -O`, which would let a malformed result slip through.
        raise ParsingError(
            message=f"Parsed result must be GeoQuery, got {type(geo_query).__name__}",
            raw_response=str(geo_query),
            original_error=None,
        )

    # Ensure original_query is set correctly
    if not geo_query.original_query or geo_query.original_query != query:
        geo_query.original_query = query

    # Run validation pipeline
    geo_query = validate_query(
        geo_query,
        self.spatial_config,
        confidence_threshold=self.confidence_threshold,
        strict_mode=self.strict_mode,
    )

    return geo_query
Parse a natural language location query into structured format.
This is the main method for parsing queries. It:
- Invokes the LLM with structured output
- Validates the spatial relation is registered
- Enriches with default parameters
- Checks confidence threshold
Arguments:
- query: Natural language query in any language
Returns:
GeoQuery: Structured query representation with confidence scores
Raises:
- ParsingError: If LLM fails to parse query into valid structure
- ValidationError: If parsed query fails business logic validation
- UnknownRelationError: If spatial relation is not registered
- LowConfidenceError: If confidence below threshold (strict mode only)
Warns:
LowConfidenceWarning: If confidence below threshold (permissive mode)
Examples:
Simple containment query:
>>> result = parser.parse("in Bern")
>>> result.reference_location.name
'Bern'
>>> result.spatial_relation.relation
'in'

Buffer query:
>>> result = parser.parse("near Lake Geneva")
>>> result.spatial_relation.relation
'near'
>>> result.buffer_config.distance_m
5000

Directional query:
>>> result = parser.parse("north of Lausanne")
>>> result.spatial_relation.relation
'north_of'
>>> result.reference_location.name
'Lausanne'

Multilingual:
>>> result = parser.parse("près de Genève") >>> result.spatial_relation.relation 'near' >>> result.reference_location.name 'Genève'
async def parse_stream(self, query: str) -> AsyncGenerator[dict, None]:
    """
    Parse a natural language location query with streaming reasoning and results.

    This method provides real-time feedback during the parsing process by yielding
    intermediate reasoning steps and the final GeoQuery result. This is useful for
    providing users with transparency into the LLM's decision-making process and
    for building responsive UIs.

    The stream yields dictionaries with the following event types:
    - {"type": "start"} - Stream started
    - {"type": "reasoning", "content": str} - Intermediate processing steps
    - {"type": "data-response", "content": dict} - Final GeoQuery as JSON
    - {"type": "error", "content": str} - Errors encountered during processing
    - {"type": "finish"} - Stream completed successfully

    Args:
        query: Natural language query in any language

    Yields:
        dict: Stream events with type and optional content fields

    Raises:
        ParsingError: If LLM fails to parse query into valid structure
        ValidationError: If parsed query fails business logic validation
        UnknownRelationError: If spatial relation is not registered
        LowConfidenceError: If confidence below threshold (strict mode only)

    Examples:
        >>> async for event in parser.parse_stream("restaurants near Lake Geneva"):
        ...     if event["type"] == "reasoning":
        ...         print(f"Reasoning: {event['content']}")
        ...     elif event["type"] == "data-response":
        ...         geo_query = event["content"]
    """
    # Tracks whether a specific error event was already emitted, so the outer
    # handler does not emit a second, duplicate event for the same failure.
    error_reported = False
    try:
        # Signal start of stream
        yield {"type": "start"}

        yield {"type": "reasoning", "content": "Preparing query for LLM processing"}
        formatted_messages = self.prompt.format_messages(query=query)

        yield {"type": "reasoning", "content": "Analyzing spatial relationship and location"}
        try:
            response = await self.structured_llm.ainvoke(formatted_messages)
        except Exception as e:
            yield {"type": "error", "content": f"LLM invocation failed: {str(e)}"}
            error_reported = True
            raise ParsingError(
                message=f"LLM invocation failed: {str(e)}",
                raw_response="",
                original_error=e,
            ) from e

        yield {"type": "reasoning", "content": "Parsing LLM response into structured format"}
        parsed = response.get("parsed") if isinstance(response, dict) else response

        if parsed is None:
            raw = response.get("raw", "") if isinstance(response, dict) else ""
            error = response.get("parsing_error") if isinstance(response, dict) else None
            yield {"type": "error", "content": "Failed to parse response - invalid JSON or missing fields"}
            error_reported = True
            raise ParsingError(
                message="Failed to parse query into structured format. "
                "LLM may have returned invalid JSON or missed required fields.",
                raw_response=str(raw),
                original_error=error,
            )

        geo_query = parsed
        if not isinstance(geo_query, GeoQuery):
            # Explicit check instead of `assert`, which is stripped under -O.
            yield {"type": "error", "content": "LLM returned an unexpected result type"}
            error_reported = True
            raise ParsingError(
                message=f"Parsed result must be GeoQuery, got {type(geo_query).__name__}",
                raw_response=str(geo_query),
                original_error=None,
            )

        # Ensure original_query is set correctly
        if not geo_query.original_query or geo_query.original_query != query:
            geo_query.original_query = query

        if geo_query.confidence_breakdown.reasoning:
            yield {
                "type": "reasoning",
                "content": f"LLM reasoning: {geo_query.confidence_breakdown.reasoning}",
            }

        yield {"type": "reasoning", "content": "Validating spatial relation configuration"}
        geo_query = validate_query(
            geo_query,
            self.spatial_config,
            confidence_threshold=self.confidence_threshold,
            strict_mode=self.strict_mode,
        )

        yield {"type": "reasoning", "content": "Query parsing completed successfully"}
        yield {"type": "data-response", "content": geo_query.model_dump()}

        # Signal successful completion
        yield {"type": "finish"}

    except Exception as e:
        # Emit a single error event before re-raising; the specific handlers
        # above have already reported their own failures.
        if not error_reported:
            yield {"type": "error", "content": f"Error during parsing: {str(e)}"}
        raise
Parse a natural language location query with streaming reasoning and results.
This method provides real-time feedback during the parsing process by yielding intermediate reasoning steps and the final GeoQuery result. This is useful for providing users with transparency into the LLM's decision-making process and for building responsive UIs.
The stream yields dictionaries with the following event types:
- {"type": "start"} - Stream started
- {"type": "reasoning", "content": str} - Intermediate processing steps
- {"type": "data-response", "content": dict} - Final GeoQuery as JSON
- {"type": "error", "content": str} - Errors encountered during processing
- {"type": "finish"} - Stream completed successfully
Arguments:
- query: Natural language query in any language
Yields:
dict: Stream events with type and optional content fields
Raises:
- ParsingError: If LLM fails to parse query into valid structure
- ValidationError: If parsed query fails business logic validation
- UnknownRelationError: If spatial relation is not registered
- LowConfidenceError: If confidence below threshold (strict mode only)
Examples:
Basic usage with async iteration:
>>> async for event in parser.parse_stream("restaurants near Lake Geneva"):
...     if event["type"] == "reasoning":
...         print(f"Reasoning: {event['content']}")
...     elif event["type"] == "data-response":
...         geo_query = event["content"]
...         print(f"Location: {geo_query['reference_location']['name']}")
...     elif event["type"] == "error":
...         print(f"Error: {event['content']}")

Using in a FastAPI streaming endpoint:
>>> from fastapi.responses import StreamingResponse >>> @app.get("/stream") >>> async def stream_endpoint(q: str): ... async def event_stream(): ... async for event in parser.parse_stream(q): ... yield f"data: {json.dumps(event)}\n\n" ... return StreamingResponse(event_stream(), media_type="text/event-stream")
def parse_batch(self, queries: list[str]) -> list[GeoQuery]:
    """
    Parse several queries one after another.

    Note: This is a simple sequential implementation.
    For true parallelization, consider using async methods or ThreadPoolExecutor.

    Args:
        queries: List of natural language queries

    Returns:
        List of GeoQuery objects (same order as input)

    Raises:
        Same exceptions as parse() for any failing query
    """
    # Sequential fan-out over parse(); preserves input order.
    return list(map(self.parse, queries))
Parse multiple queries in batch.
Note: This is a simple sequential implementation. For true parallelization, consider using async methods or ThreadPoolExecutor.
Arguments:
- queries: List of natural language queries
Returns:
List of GeoQuery objects (same order as input)
Raises:
- Same exceptions as parse() for any failing query
def get_available_relations(self, category: RelationCategory | None = None) -> list[str]:
    """
    List the spatial relations currently registered with this parser.

    Args:
        category: Optional filter by category ("containment", "buffer", "directional")

    Returns:
        List of relation names
    """
    # Delegate to the spatial configuration, which owns the registry.
    relations = self.spatial_config.list_relations(category=category)
    return relations
Get list of available spatial relations.
Arguments:
- category: Optional filter by category ("containment", "buffer", "directional")
Returns:
List of relation names
def describe_relation(self, relation_name: str) -> str:
    """
    Return the human-readable description of a spatial relation.

    Args:
        relation_name: Name of the relation

    Returns:
        Human-readable description

    Raises:
        UnknownRelationError: If relation is not registered
    """
    # get_config() raises UnknownRelationError for unregistered relations.
    return self.spatial_config.get_config(relation_name).description
Get description of a spatial relation.
Arguments:
- relation_name: Name of the relation
Returns:
Human-readable description
Raises:
- UnknownRelationError: If relation is not registered
class GeoQuery(BaseModel):
    """
    Root model representing a parsed geographic query.
    This is the main output structure returned by the parser.
    """

    # Kind of query; only "simple" is produced in Phase 1.
    query_type: Literal["simple", "compound", "split", "boolean"] = Field(
        "simple",
        description="Type of query. Phase 1 only supports 'simple'. "
        "Future: 'compound' = multi-step, 'split' = area division, 'boolean' = AND/OR/NOT operations",
    )
    # The spatial operator ("in", "near", "north_of", ...) plus its category.
    spatial_relation: SpatialRelation = Field(description="Spatial relationship to reference location")
    # The place the relation anchors to (e.g. "Lausanne", "Lake Geneva").
    reference_location: ReferenceLocation = Field(description="Reference location for the spatial query")
    # Required for buffer/directional relations, forbidden for containment —
    # see validate_buffer_config_consistency below.
    buffer_config: BufferConfig | None = Field(
        None,
        description="Buffer configuration for buffer and directional relations. "
        "Auto-generated with defaults by enrich_with_defaults() if not provided. "
        "Required for 'near', 'around', 'north_of', etc. "
        "Set to None for containment relations ('in').",
    )
    # Per-aspect confidence scores reported by the LLM.
    confidence_breakdown: ConfidenceScore = Field(description="Confidence scores for different aspects of the parse")
    # Verbatim user input; the parser overwrites this if the LLM altered it.
    original_query: str = Field(description="Original query text exactly as provided by the user")

    @model_validator(mode="after")
    def validate_buffer_config_consistency(self) -> "GeoQuery":
        """Validate buffer_config consistency with relation category."""
        # Buffer and directional relations must have buffer_config
        if self.spatial_relation.category in ("buffer", "directional") and self.buffer_config is None:
            raise ValueError(
                f"{self.spatial_relation.category} relation '{self.spatial_relation.relation}' requires buffer_config"
            )

        # Containment relations should not have buffer_config
        if self.spatial_relation.category == "containment" and self.buffer_config is not None:
            raise ValueError(
                f"{self.spatial_relation.category} relation '{self.spatial_relation.relation}' "
                f"should not have buffer_config"
            )

        return self
Root model representing a parsed geographic query. This is the main output structure returned by the parser.
Type of query. Phase 1 only supports 'simple'. Future: 'compound' = multi-step, 'split' = area division, 'boolean' = AND/OR/NOT operations
Buffer configuration for buffer and directional relations. Auto-generated with defaults by enrich_with_defaults() if not provided. Required for 'near', 'around', 'north_of', etc. Set to None for containment relations ('in').
@model_validator(mode="after")
def validate_buffer_config_consistency(self) -> "GeoQuery":
    """Check that buffer_config presence matches the spatial relation category."""
    category = self.spatial_relation.category
    relation = self.spatial_relation.relation

    # Buffer and directional operations are impossible without a buffer_config.
    if category in ("buffer", "directional") and self.buffer_config is None:
        raise ValueError(f"{category} relation '{relation}' requires buffer_config")

    # Containment relations, by contrast, must not carry one.
    if category == "containment" and self.buffer_config is not None:
        raise ValueError(f"{category} relation '{relation}' should not have buffer_config")

    return self
Validate buffer_config consistency with relation category.
class SpatialRelation(BaseModel):
    """A spatial relationship between target and reference."""

    relation: str = Field(
        description="Spatial relation keyword. Examples: 'in', 'near', 'around', 'north_of', "
        "'on_shores_of', 'in_the_heart_of', etc. Use the exact relation name from the available list."
    )
    category: RelationCategory = Field(
        description="Category of spatial relation. "
        "'containment' = exact boundary matching (in), "
        "'buffer' = proximity or erosion operations (near, around, on_shores_of, in_the_heart_of), "
        "'directional' = sector-based queries (north_of, south_of, east_of, west_of)"
    )
    # Populated only when the user states a distance (e.g., "within 5km");
    # None means relation defaults will be applied downstream.
    explicit_distance: float | None = Field(
        None,
        description="Distance in meters if explicitly mentioned by user. "
        "For example: 'within 5km' → 5000, 'within 500 meters' → 500. "
        "Leave null if not explicitly stated.",
    )
A spatial relationship between target and reference.
Spatial relation keyword. Examples: 'in', 'near', 'around', 'north_of', 'on_shores_of', 'in_the_heart_of', etc. Use the exact relation name from the available list.
Category of spatial relation. 'containment' = exact boundary matching (in), 'buffer' = proximity or erosion operations (near, around, on_shores_of, in_the_heart_of), 'directional' = sector-based queries (north_of, south_of, east_of, west_of)
class ReferenceLocation(BaseModel):
    """A geographic reference location extracted from the query."""

    name: str = Field(description="Location name as mentioned in the query (e.g., 'Lausanne', 'Lake Geneva')")
    # FIXME: enum ?
    # Kept as a free-form string so the LLM can emit types outside a fixed set.
    type: str | None = Field(
        None,
        description="Type hint for geographic feature (city, lake, mountain, canton, country, "
        "train_station, airport, river, road, etc.). This is a HINT for ranking results, "
        "NOT a strict filter. For ambiguous cases (e.g., 'Bern' could be city or canton, "
        "'Rhone' could be river or road), provide your best guess or leave null. "
        "The datasource will return multiple types ranked by relevance.",
    )
    # NOTE(review): annotated as ConfidenceLevel while the description reads as
    # a raw 0-1 score — presumably ConfidenceLevel is a constrained-float alias;
    # confirm against models module.
    type_confidence: ConfidenceLevel | None = Field(
        None,
        description="Confidence in the type inference (0-1). High confidence (>0.8) when type is "
        "explicit in query (e.g., 'Lake Geneva'). Low confidence (<0.6) when ambiguous "
        "(e.g., 'Bern', 'Rhone'). Use spatial relation as hint: 'along X' → river/road, "
        "'in X' → city/region, 'on X' → lake/mountain.",
    )
A geographic reference location extracted from the query.
Location name as mentioned in the query (e.g., 'Lausanne', 'Lake Geneva')
Type hint for geographic feature (city, lake, mountain, canton, country, train_station, airport, river, road, etc.). This is a HINT for ranking results, NOT a strict filter. For ambiguous cases (e.g., 'Bern' could be city or canton, 'Rhone' could be river or road), provide your best guess or leave null. The datasource will return multiple types ranked by relevance.
Confidence in the type inference (0-1). High confidence (>0.8) when type is explicit in query (e.g., 'Lake Geneva'). Low confidence (<0.6) when ambiguous (e.g., 'Bern', 'Rhone'). Use spatial relation as hint: 'along X' → river/road, 'in X' → city/region, 'on X' → lake/mountain.
class BufferConfig(BaseModel):
    """Configuration for buffer-based spatial operations."""

    # Sign convention: positive = dilation (proximity), negative = erosion.
    distance_m: float = Field(
        description="Buffer distance in meters. Positive values expand outward (proximity), "
        "negative values erode inward (e.g., 'in the heart of'). "
        "Examples: 5000 = 5km radius, -500 = 500m erosion"
    )
    buffer_from: Literal["center", "boundary"] = Field(
        description="Buffer origin. 'center' = buffer from centroid point (for proximity), "
        "'boundary' = buffer from polygon boundary (for shores, along roads, erosion)"
    )
    ring_only: bool = Field(
        False,
        description="If True, exclude the reference feature itself to create a ring/donut shape. "
        "Used for queries like 'on the shores of Lake X' (exclude the lake water itself). "
        "Only valid with buffer_from='boundary'.",
    )
    side: Literal["left", "right"] | None = Field(
        None,
        description="Side of a linear feature for one-sided buffer. "
        "'left' = left side relative to line direction, 'right' = right side. "
        "None = both sides (symmetric buffer). Populated from relation config by enrich_with_defaults().",
    )
    # True when distance/origin came from relation defaults rather than the user.
    inferred: bool = Field(
        True,
        description="True if this configuration was inferred from relation defaults. "
        "False if the user explicitly specified distance or buffer parameters.",
    )

    @model_validator(mode="after")
    def validate_ring_only(self) -> "BufferConfig":
        """Validate that ring_only is only used with boundary buffers.

        Raises:
            ValueError: If ring_only=True is combined with buffer_from='center'.
        """
        # A ring needs an interior to exclude; a point-centered buffer has none.
        if self.ring_only and self.buffer_from == "center":
            raise ValueError("ring_only=True requires buffer_from='boundary' (cannot create ring from center point)")
        return self
Configuration for buffer-based spatial operations.
Buffer distance in meters. Positive values expand outward (proximity), negative values erode inward (e.g., 'in the heart of'). Examples: 5000 = 5km radius, -500 = 500m erosion
Buffer origin. 'center' = buffer from centroid point (for proximity), 'boundary' = buffer from polygon boundary (for shores, along roads, erosion)
If True, exclude the reference feature itself to create a ring/donut shape. Used for queries like 'on the shores of Lake X' (exclude the lake water itself). Only valid with buffer_from='boundary'.
Side of a linear feature for one-sided buffer. 'left' = left side relative to line direction, 'right' = right side. None = both sides (symmetric buffer). Populated from relation config by enrich_with_defaults().
True if this configuration was inferred from relation defaults. False if the user explicitly specified distance or buffer parameters.
88 @model_validator(mode="after") 89 def validate_ring_only(self) -> "BufferConfig": 90 """Validate that ring_only is only used with boundary buffers.""" 91 if self.ring_only and self.buffer_from == "center": 92 raise ValueError("ring_only=True requires buffer_from='boundary' (cannot create ring from center point)") 93 return self
Validate that ring_only is only used with boundary buffers.
class ConfidenceScore(BaseModel):
    """Confidence scores for different aspects of the parsed query."""

    overall: ConfidenceLevel = Field(
        description="Overall confidence score for the entire query parse. "
        "0.9-1.0 = highly confident, 0.7-0.9 = confident, 0.5-0.7 = uncertain, <0.5 = very uncertain",
    )
    location_confidence: ConfidenceLevel = Field(
        description="Confidence in correctly identifying the reference location",
    )
    relation_confidence: ConfidenceLevel = Field(
        description="Confidence in correctly identifying the spatial relation",
    )
    # Free-text justification; optional but strongly encouraged for debugging.
    reasoning: str | None = Field(
        None,
        description="Explanation for confidence scores. Always include reasoning for clarity and debugging. "
        "For example: 'Ambiguous location name', 'Unclear spatial relationship', 'High confidence in location matching', etc.",
    )
Confidence scores for different aspects of the parsed query.
Overall confidence score for the entire query parse. 0.9-1.0 = highly confident, 0.7-0.9 = confident, 0.5-0.7 = uncertain, <0.5 = very uncertain
Confidence in correctly identifying the reference location
class SpatialRelationConfig:
    """
    Registry and configuration for spatial relations.

    Manages built-in and custom spatial relations with their default parameters.
    """

    def __init__(self):
        """Initialize with built-in spatial relations."""
        self.relations: dict[str, RelationConfig] = {}
        self._initialize_defaults()

    def _initialize_defaults(self):
        """Register built-in spatial relations from ARCHITECTURE.md."""

        # ===== CONTAINMENT RELATIONS =====
        self.register_relation(
            RelationConfig(
                name="in",
                category="containment",
                description="Feature is within the reference boundary",
            )
        )

        # ===== BUFFER/PROXIMITY RELATIONS =====
        self.register_relation(
            RelationConfig(
                name="near",
                category="buffer",
                description="Proximity search with default 5km radius",
                default_distance_m=5000,
                buffer_from="center",
            )
        )

        # FIX: 'around' is documented in the model field descriptions
        # (GeoQuery.buffer_config, SpatialRelation.relation) but was never
        # registered, so get_config("around") raised UnknownRelationError.
        # Register it as a proximity relation equivalent to 'near'.
        self.register_relation(
            RelationConfig(
                name="around",
                category="buffer",
                description="Proximity search around the reference with default 5km radius",
                default_distance_m=5000,
                buffer_from="center",
            )
        )

        self.register_relation(
            RelationConfig(
                name="on_shores_of",
                category="buffer",
                description="Ring buffer around lake/water boundary, excluding the water body itself",
                default_distance_m=1000,
                buffer_from="boundary",
                ring_only=True,
            )
        )

        self.register_relation(
            RelationConfig(
                name="along",
                category="buffer",
                description="Buffer following a linear feature like a river or road",
                default_distance_m=500,
                buffer_from="boundary",
            )
        )

        self.register_relation(
            RelationConfig(
                name="left_bank",
                category="buffer",
                description="Left bank of a linear feature (river, road) relative to its direction/flow",
                default_distance_m=500,
                buffer_from="boundary",
                side="left",
            )
        )

        self.register_relation(
            RelationConfig(
                name="right_bank",
                category="buffer",
                description="Right bank of a linear feature (river, road) relative to its direction/flow",
                default_distance_m=500,
                buffer_from="boundary",
                side="right",
            )
        )

        self.register_relation(
            RelationConfig(
                name="in_the_heart_of",
                category="buffer",
                description="Central area excluding periphery (negative buffer - erosion)",
                default_distance_m=-500,
                buffer_from="boundary",
            )
        )

        # ===== DIRECTIONAL RELATIONS =====
        # All directional relations use consistent defaults:
        # - Distance: 10km radius (default_distance_m=10000)
        # - Sector: 90° angular wedge (sector_angle_degrees=90)
        # - Origin: Centroid of reference location (buffer_from="center" set in enrich_with_defaults)
        # These defaults are applied automatically by enrich_with_defaults() for any directional query.
        # Convention: 0° = North, angles increase clockwise (90° = East, 180° = South, 270° = West)
        self.register_relation(
            RelationConfig(
                name="north_of",
                category="directional",
                description="Directional sector north of reference",
                default_distance_m=10000,
                sector_angle_degrees=90,
                direction_angle_degrees=0,
            )
        )

        self.register_relation(
            RelationConfig(
                name="south_of",
                category="directional",
                description="Directional sector south of reference",
                default_distance_m=10000,
                sector_angle_degrees=90,
                direction_angle_degrees=180,
            )
        )

        self.register_relation(
            RelationConfig(
                name="east_of",
                category="directional",
                description="Directional sector east of reference",
                default_distance_m=10000,
                sector_angle_degrees=90,
                direction_angle_degrees=90,
            )
        )

        self.register_relation(
            RelationConfig(
                name="west_of",
                category="directional",
                description="Directional sector west of reference",
                default_distance_m=10000,
                sector_angle_degrees=90,
                direction_angle_degrees=270,
            )
        )

        # ===== DIAGONAL DIRECTIONAL RELATIONS =====
        self.register_relation(
            RelationConfig(
                name="northeast_of",
                category="directional",
                description="Directional sector northeast of reference",
                default_distance_m=10000,
                sector_angle_degrees=90,
                direction_angle_degrees=45,
            )
        )

        self.register_relation(
            RelationConfig(
                name="southeast_of",
                category="directional",
                description="Directional sector southeast of reference",
                default_distance_m=10000,
                sector_angle_degrees=90,
                direction_angle_degrees=135,
            )
        )

        self.register_relation(
            RelationConfig(
                name="southwest_of",
                category="directional",
                description="Directional sector southwest of reference",
                default_distance_m=10000,
                sector_angle_degrees=90,
                direction_angle_degrees=225,
            )
        )

        self.register_relation(
            RelationConfig(
                name="northwest_of",
                category="directional",
                description="Directional sector northwest of reference",
                default_distance_m=10000,
                sector_angle_degrees=90,
                direction_angle_degrees=315,
            )
        )

    def register_relation(self, config: RelationConfig) -> None:
        """Register a new spatial relation (overwrites any existing relation of the same name)."""
        self.relations[config.name] = config

    def has_relation(self, name: str) -> bool:
        """Check if a relation is registered."""
        return name in self.relations

    def get_config(self, name: str) -> RelationConfig:
        """Get configuration for a relation. Raises UnknownRelationError if not found."""
        if not self.has_relation(name):
            raise UnknownRelationError(
                f"Unknown spatial relation: '{name}'. Available relations: {', '.join(sorted(self.relations.keys()))}",
                relation_name=name,
            )
        return self.relations[name]

    def list_relations(self, category: RelationCategory | None = None) -> list[str]:
        """List available relation names, optionally filtered by category."""
        if category is None:
            return sorted(self.relations.keys())
        return sorted(r.name for r in self.relations.values() if r.category == category)

    def format_for_prompt(self) -> str:
        """Format relations for inclusion in LLM prompt.

        Returns:
            A multi-line string grouping relations by category, with default
            distances, special flags (ring/side/origin), and usage notes.
        """
        lines = []

        # Group by category (order comes from the RelationCategory Literal)
        for category in get_args(RelationCategory):
            category_relations = [r for r in self.relations.values() if r.category == category]
            if not category_relations:
                continue

            lines.append(f"\n{category.upper()} RELATIONS:")

            for rel in sorted(category_relations, key=lambda r: r.name):
                # Build distance info (negative distances rendered as erosion)
                dist_info = ""
                if rel.default_distance_m is not None:
                    dist_str = f"{abs(rel.default_distance_m)}m"
                    if rel.default_distance_m < 0:
                        dist_info = f" (default: {dist_str} erosion)"
                    else:
                        dist_info = f" (default: {dist_str})"

                # Build special flags
                flags = []
                if rel.ring_only:
                    flags.append("ring buffer")
                if rel.buffer_from:
                    flags.append(f"from {rel.buffer_from}")
                if rel.side:
                    flags.append(f"{rel.side} side only")
                flag_info = f" [{', '.join(flags)}]" if flags else ""

                # Format line
                lines.append(f"  • {rel.name}{dist_info}{flag_info}")
                lines.append(f"    {rel.description}")

        # Add notes
        lines.append("\nNOTES:")
        lines.append("  • Negative distances indicate erosion/shrinking (e.g., in_the_heart_of)")
        lines.append("  • Ring buffers exclude the reference feature itself (e.g., shores of lake)")
        lines.append("  • Buffer from 'center' vs 'boundary' determines buffer origin")

        return "\n".join(lines)
Registry and configuration for spatial relations.
Manages built-in and custom spatial relations with their default parameters.
47 def __init__(self): 48 """Initialize with built-in spatial relations.""" 49 self.relations: dict[str, RelationConfig] = {} 50 self._initialize_defaults()
Initialize with built-in spatial relations.
224 def register_relation(self, config: RelationConfig) -> None: 225 """Register a new spatial relation.""" 226 self.relations[config.name] = config
Register a new spatial relation.
228 def has_relation(self, name: str) -> bool: 229 """Check if a relation is registered.""" 230 return name in self.relations
Check if a relation is registered.
232 def get_config(self, name: str) -> RelationConfig: 233 """Get configuration for a relation. Raises UnknownRelationError if not found.""" 234 if not self.has_relation(name): 235 raise UnknownRelationError( 236 f"Unknown spatial relation: '{name}'. Available relations: {', '.join(sorted(self.relations.keys()))}", 237 relation_name=name, 238 ) 239 return self.relations[name]
Get configuration for a relation. Raises UnknownRelationError if not found.
241 def list_relations(self, category: RelationCategory | None = None) -> list[str]: 242 """List available relation names.""" 243 if category is None: 244 return sorted(self.relations.keys()) 245 return sorted(r.name for r in self.relations.values() if r.category == category)
List available relation names.
247 def format_for_prompt(self) -> str: 248 """Format relations for inclusion in LLM prompt.""" 249 lines = [] 250 251 # Group by category 252 for category in get_args(RelationCategory): 253 category_relations = [r for r in self.relations.values() if r.category == category] 254 if not category_relations: 255 continue 256 257 lines.append(f"\n{category.upper()} RELATIONS:") 258 259 for rel in sorted(category_relations, key=lambda r: r.name): 260 # Build distance info 261 dist_info = "" 262 if rel.default_distance_m is not None: 263 dist_str = f"{abs(rel.default_distance_m)}m" 264 if rel.default_distance_m < 0: 265 dist_info = f" (default: {dist_str} erosion)" 266 else: 267 dist_info = f" (default: {dist_str})" 268 269 # Build special flags 270 flags = [] 271 if rel.ring_only: 272 flags.append("ring buffer") 273 if rel.buffer_from: 274 flags.append(f"from {rel.buffer_from}") 275 if rel.side: 276 flags.append(f"{rel.side} side only") 277 flag_info = f" [{', '.join(flags)}]" if flags else "" 278 279 # Format line 280 lines.append(f" • {rel.name}{dist_info}{flag_info}") 281 lines.append(f" {rel.description}") 282 283 # Add notes 284 lines.append("\nNOTES:") 285 lines.append(" • Negative distances indicate erosion/shrinking (e.g., in_the_heart_of)") 286 lines.append(" • Ring buffers exclude the reference feature itself (e.g., shores of lake)") 287 lines.append(" • Buffer from 'center' vs 'boundary' determines buffer origin") 288 289 return "\n".join(lines)
Format relations for inclusion in LLM prompt.
@dataclass
class RelationConfig:
    """
    Configuration for a single spatial relation.

    Attributes:
        name: Relation identifier (e.g., "in", "near", "north_of")
        category: Type of spatial operation
        description: Human-readable description for LLM prompt
        default_distance_m: Default buffer distance in meters
        buffer_from: Buffer origin
        ring_only: Exclude reference feature to create ring buffer
        side: Side of a linear feature for one-sided buffers ("left"/"right"; None = both sides)
        sector_angle_degrees: Angular sector for directional queries
        direction_angle_degrees: Direction angle in degrees (0=North, 90=East, 180=South, 270=West, clockwise)
    """

    name: str
    category: RelationCategory
    description: str
    default_distance_m: float | None = None
    buffer_from: Literal["center", "boundary"] | None = None
    ring_only: bool = False
    side: Literal["left", "right"] | None = None
    sector_angle_degrees: float | None = None
    direction_angle_degrees: float | None = None
Configuration for a single spatial relation.
Attributes:
- name: Relation identifier (e.g., "in", "near", "north_of")
- category: Type of spatial operation
- description: Human-readable description for LLM prompt
- default_distance_m: Default buffer distance in meters
- buffer_from: Buffer origin
- ring_only: Exclude reference feature to create ring buffer
- sector_angle_degrees: Angular sector for directional queries
- direction_angle_degrees: Direction angle in degrees (0=North, 90=East, 180=South, 270=West, clockwise)
Base exception for all GeoFilter errors.
class ParsingError(GeoFilterError):
    """LLM failed to parse query into valid structure."""

    def __init__(self, message: str, raw_response: str = "", original_error: Exception | None = None):
        """Create a parsing error.

        Args:
            message: Human-readable error description.
            raw_response: Raw response text received from the LLM.
            original_error: Underlying exception that caused the failure, if any.
        """
        super().__init__(message)
        # Preserve the raw LLM output and root cause for diagnostics.
        self.raw_response = raw_response
        self.original_error = original_error
LLM failed to parse query into valid structure.
16 def __init__(self, message: str, raw_response: str = "", original_error: Exception | None = None): 17 """ 18 Initialize parsing error. 19 20 Args: 21 message: Error description 22 raw_response: Raw response from LLM 23 original_error: Original exception that caused parsing failure 24 """ 25 self.raw_response = raw_response 26 self.original_error = original_error 27 super().__init__(message)
Initialize parsing error.
Arguments:
- message: Error description
- raw_response: Raw response from LLM
- original_error: Original exception that caused parsing failure
class ValidationError(GeoFilterError):
    """Structured output is valid but fails business logic validation."""

    def __init__(self, message: str, field: str | None = None, detail: str | None = None):
        """Create a validation error.

        Args:
            message: Human-readable error description.
            field: Name of the field that failed validation, if known.
            detail: Extra context about why validation failed.
        """
        super().__init__(message)
        # Keep field/detail available for programmatic error handling.
        self.field = field
        self.detail = detail
Structured output is valid but fails business logic validation.
33 def __init__(self, message: str, field: str | None = None, detail: str | None = None): 34 """ 35 Initialize validation error. 36 37 Args: 38 message: Error description 39 field: Field name that failed validation 40 detail: Additional detail about the validation failure 41 """ 42 self.field = field 43 self.detail = detail 44 super().__init__(message)
Initialize validation error.
Arguments:
- message: Error description
- field: Field name that failed validation
- detail: Additional detail about the validation failure
class UnknownRelationError(ValidationError):
    """Spatial relation is not registered in configuration."""

    def __init__(self, message: str, relation_name: str):
        """Create an unknown-relation error.

        Args:
            message: Human-readable error description.
            relation_name: The relation name that was not recognized.
        """
        # Record the offending name, then report as a spatial_relation field error.
        self.relation_name = relation_name
        super().__init__(message, field="spatial_relation")
Spatial relation is not registered in configuration.
50 def __init__(self, message: str, relation_name: str): 51 """ 52 Initialize unknown relation error. 53 54 Args: 55 message: Error description 56 relation_name: The unknown relation name 57 """ 58 self.relation_name = relation_name 59 super().__init__(message, field="spatial_relation")
Initialize unknown relation error.
Arguments:
- message: Error description
- relation_name: The unknown relation name
class LowConfidenceError(GeoFilterError):
    """Query confidence is below threshold (strict mode)."""

    def __init__(self, message: str, confidence: float, reasoning: str | None = None):
        """Create a low-confidence error.

        Args:
            message: Human-readable error description.
            confidence: The confidence score (0-1) that fell below threshold.
            reasoning: Optional explanation for the low confidence.
        """
        super().__init__(message)
        # Expose the score and rationale so callers can log or retry.
        self.confidence = confidence
        self.reasoning = reasoning
Query confidence is below threshold (strict mode).
65 def __init__(self, message: str, confidence: float, reasoning: str | None = None): 66 """ 67 Initialize low confidence error. 68 69 Args: 70 message: Error description 71 confidence: Confidence score (0-1) 72 reasoning: Optional explanation for low confidence 73 """ 74 self.confidence = confidence 75 self.reasoning = reasoning 76 super().__init__(message)
Initialize low confidence error.
Arguments:
- message: Error description
- confidence: Confidence score (0-1)
- reasoning: Optional explanation for low confidence
class LowConfidenceWarning(UserWarning):
    """Query confidence is below threshold (permissive mode)."""

    def __init__(self, confidence: float, message: str = ""):
        """Create a low-confidence warning.

        Args:
            confidence: The confidence score (0-1) that fell below threshold.
            message: Optional warning text.
        """
        super().__init__(message)
        # Carried so warning filters/handlers can inspect the score.
        self.confidence = confidence
Query confidence is below threshold (permissive mode).
82 def __init__(self, confidence: float, message: str = ""): 83 """ 84 Initialize low confidence warning. 85 86 Args: 87 confidence: Confidence score (0-1) 88 message: Warning message 89 """ 90 self.confidence = confidence 91 super().__init__(message)
Initialize low confidence warning.
Arguments:
- confidence: Confidence score (0-1)
- message: Warning message
class GeoDataSource(Protocol):
    """
    Protocol for geographic data sources.

    Implementations resolve location names to geographic features.
    Features are returned as standard GeoJSON Feature objects (dicts) in WGS84 (EPSG:4326).

    Example of returned feature:
        {
            "type": "Feature",
            "id": "uuid-123",
            "geometry": {"type": "Point", "coordinates": [8.5, 47.3]},
            "bbox": [8.4, 47.3, 8.6, 47.4],
            "properties": {
                "name": "Zürich",
                "type": "city",
                "confidence": 1.0,
                ...
            }
        }
    """

    def search(
        self,
        name: str,
        type: str | None = None,
        max_results: int = 10,
    ) -> list[dict[str, Any]]:
        """
        Search for geographic features by name.

        Args:
            name: Location name to search for (e.g., "Lake Geneva", "Bern").
            type: Optional type hint for filtering/ranking results.
                Examples: "lake", "city", "mountain", "canton", "river".
                When provided, matching types are ranked higher.
            max_results: Maximum number of results to return.

        Returns:
            List of matching GeoJSON Feature dicts, ranked by relevance.
            Returns empty list if no matches found.
        """
        ...

    def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
        """
        Get a specific feature by its unique identifier.

        Args:
            feature_id: Unique identifier from the data source.

        Returns:
            The matching GeoJSON Feature dict, or None if not found.
        """
        ...

    def get_available_types(self) -> list[str]:
        """
        Get list of concrete geographic types this datasource can return.

        Returns a list of concrete type values (e.g., "lake", "city", "restaurant")
        that this datasource uses in the "type" property of returned features.
        These types can be matched against the location type hierarchy for fuzzy matching.

        The returned types should be a subset of or mapped to the standard location
        type hierarchy defined in location_types.TYPE_HIERARCHY.

        Returns:
            List of concrete type strings (e.g., ["lake", "river", "city", "mountain"]).
            Empty list if this datasource does not provide type information.

        Example:
            >>> source = SwissNames3DSource("data/")
            >>> types = source.get_available_types()
            >>> print(types)
            ['lake', 'river', 'city', 'mountain', 'peak', 'hill', ...]
        """
        ...
Protocol for geographic data sources.
Implementations resolve location names to geographic features. Features are returned as standard GeoJSON Feature objects (dicts) in WGS84 (EPSG:4326).
Example of returned feature:
{ "type": "Feature", "id": "uuid-123", "geometry": {"type": "Point", "coordinates": [8.5, 47.3]}, "bbox": [8.4, 47.3, 8.6, 47.4], "properties": { "name": "Zürich", "type": "city", "confidence": 1.0, ... } }
1431def _no_init_or_replace_init(self, *args, **kwargs): 1432 cls = type(self) 1433 1434 if cls._is_protocol: 1435 raise TypeError('Protocols cannot be instantiated') 1436 1437 # Already using a custom `__init__`. No need to calculate correct 1438 # `__init__` to call. This can lead to RecursionError. See bpo-45121. 1439 if cls.__init__ is not _no_init_or_replace_init: 1440 return 1441 1442 # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`. 1443 # The first instantiation of the subclass will call `_no_init_or_replace_init` which 1444 # searches for a proper new `__init__` in the MRO. The new `__init__` 1445 # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent 1446 # instantiation of the protocol subclass will thus use the new 1447 # `__init__` and no longer call `_no_init_or_replace_init`. 1448 for base in cls.__mro__: 1449 init = base.__dict__.get('__init__', _no_init_or_replace_init) 1450 if init is not _no_init_or_replace_init: 1451 cls.__init__ = init 1452 break 1453 else: 1454 # should not happen 1455 cls.__init__ = object.__init__ 1456 1457 cls.__init__(self, *args, **kwargs)
34 def search( 35 self, 36 name: str, 37 type: str | None = None, 38 max_results: int = 10, 39 ) -> list[dict[str, Any]]: 40 """ 41 Search for geographic features by name. 42 43 Args: 44 name: Location name to search for (e.g., "Lake Geneva", "Bern"). 45 type: Optional type hint for filtering/ranking results. 46 Examples: "lake", "city", "mountain", "canton", "river". 47 When provided, matching types are ranked higher. 48 max_results: Maximum number of results to return. 49 50 Returns: 51 List of matching GeoJSON Feature dicts, ranked by relevance. 52 Returns empty list if no matches found. 53 """ 54 ...
Search for geographic features by name.
Arguments:
- name: Location name to search for (e.g., "Lake Geneva", "Bern").
- type: Optional type hint for filtering/ranking results. Examples: "lake", "city", "mountain", "canton", "river". When provided, matching types are ranked higher.
- max_results: Maximum number of results to return.
Returns:
List of matching GeoJSON Feature dicts, ranked by relevance. Returns empty list if no matches found.
56 def get_by_id(self, feature_id: str) -> dict[str, Any] | None: 57 """ 58 Get a specific feature by its unique identifier. 59 60 Args: 61 feature_id: Unique identifier from the data source. 62 63 Returns: 64 The matching GeoJSON Feature dict, or None if not found. 65 """ 66 ...
Get a specific feature by its unique identifier.
Arguments:
- feature_id: Unique identifier from the data source.
Returns:
The matching GeoJSON Feature dict, or None if not found.
68 def get_available_types(self) -> list[str]: 69 """ 70 Get list of concrete geographic types this datasource can return. 71 72 Returns a list of concrete type values (e.g., "lake", "city", "restaurant") 73 that this datasource uses in the "type" property of returned features. 74 These types can be matched against the location type hierarchy for fuzzy matching. 75 76 The returned types should be a subset of or mapped to the standard location 77 type hierarchy defined in location_types.TYPE_HIERARCHY. 78 79 Returns: 80 List of concrete type strings (e.g., ["lake", "river", "city", "mountain"]). 81 Empty list if this datasource does not provide type information. 82 83 Example: 84 >>> source = SwissNames3DSource("data/") 85 >>> types = source.get_available_types() 86 >>> print(types) 87 ['lake', 'river', 'city', 'mountain', 'peak', 'hill', ...] 88 """ 89 ...
Get list of concrete geographic types this datasource can return.
Returns a list of concrete type values (e.g., "lake", "city", "restaurant") that this datasource uses in the "type" property of returned features. These types can be matched against the location type hierarchy for fuzzy matching.
The returned types should be a subset of or mapped to the standard location type hierarchy defined in location_types.TYPE_HIERARCHY.
Returns:
List of concrete type strings (e.g., ["lake", "river", "city", "mountain"]). Empty list if this datasource does not provide type information.
Example:
>>> source = SwissNames3DSource("data/") >>> types = source.get_available_types() >>> print(types) ['lake', 'river', 'city', 'mountain', 'peak', 'hill', ...]
class SwissNames3DSource:
    """
    Geographic data source backed by swisstopo's swissNAMES3D dataset.

    Loads geographic names from a Shapefile, GeoPackage, or ESRI File Geodatabase
    and provides search by name with optional type filtering.

    If data_path is a directory, automatically loads and concatenates all SwissNames3D
    shapefiles (swissNAMES3D_PKT, swissNAMES3D_LIN, swissNAMES3D_PLY) found within.

    All geometries are returned as GeoJSON in WGS84 (EPSG:4326).

    Args:
        data_path: Path to SwissNames3D data file or directory containing SwissNames3D shapefiles.
        layer: Layer name within the data source (for multi-layer formats like GDB).

    Example:
        >>> source = SwissNames3DSource("data/")  # Load all 3 geometry types
        >>> results = source.search("Lac Léman", type="lake")
        >>> print(results[0].geometry)  # GeoJSON in WGS84
    """

    def __init__(self, data_path: str | Path, layer: str | None = None) -> None:
        self._data_path = Path(data_path)
        self._layer = layer
        # Lazily loaded; stays None until the first search/get call.
        self._gdf: gpd.GeoDataFrame | None = None
        # Normalized name -> list of row positions in self._gdf.
        self._name_index: dict[str, list[int]] = {}

    def _ensure_loaded(self) -> None:
        """Load data lazily on first access."""
        if self._gdf is not None:
            return
        self._load_data()

    def _load_data(self) -> None:
        """Load SwissNames3D data and build the name index."""
        # Check if data_path is a directory
        if self._data_path.is_dir():
            self._load_from_directory()
        else:
            # Load single file
            kwargs: dict[str, Any] = {}
            if self._layer is not None:
                kwargs["layer"] = self._layer
            self._gdf = gpd.read_file(str(self._data_path), **kwargs)

        self._build_name_index()

    def _load_from_directory(self) -> None:
        """Load and concatenate all SwissNames3D shapefiles from a directory."""
        # Look for the 3 standard SwissNames3D shapefiles
        shapefile_names = ["swissNAMES3D_PKT", "swissNAMES3D_LIN", "swissNAMES3D_PLY"]
        gdfs: list[gpd.GeoDataFrame] = []

        for name in shapefile_names:
            shp_path = self._data_path / f"{name}.shp"
            if shp_path.exists():
                gdf = gpd.read_file(str(shp_path))
                gdfs.append(gdf)

        if not gdfs:
            raise ValueError(
                f"No SwissNames3D shapefiles found in {self._data_path}. Expected: {', '.join(shapefile_names)}"
            )

        # Find common columns across all loaded GeoDataFrames
        common_cols = set(gdfs[0].columns)
        for gdf in gdfs[1:]:
            common_cols &= set(gdf.columns)

        # Keep only common columns and concatenate
        gdfs_filtered = [gdf[sorted(common_cols)] for gdf in gdfs]
        # NOTE(review): gpd.pd is geopandas's internal pandas alias; it works,
        # but a direct `pandas` import would be more conventional.
        self._gdf = gpd.GeoDataFrame(
            gpd.pd.concat(gdfs_filtered, ignore_index=True), crs=gdfs[0].crs, geometry="geometry"
        )

    def _build_name_index(self) -> None:
        """Build a normalized name → row indices lookup for fast search."""
        assert self._gdf is not None
        self._name_index = {}

        name_col = self._detect_name_column()
        for idx, name in enumerate(self._gdf[name_col]):
            # Skip missing or blank names.
            if not isinstance(name, str) or not name.strip():
                continue
            normalized = _normalize_name(name)
            if normalized not in self._name_index:
                self._name_index[normalized] = []
            self._name_index[normalized].append(idx)

    def _detect_name_column(self) -> str:
        """Detect the name column in the data."""
        assert self._gdf is not None
        for candidate in ("NAME", "name", "Name", "BEZEICHNUNG"):
            if candidate in self._gdf.columns:
                return candidate
        raise ValueError(f"Cannot find name column in data. Available columns: {list(self._gdf.columns)}")

    def _detect_type_column(self) -> str | None:
        """Detect the feature type column in the data."""
        assert self._gdf is not None
        for candidate in ("OBJEKTART", "objektart", "Objektart"):
            if candidate in self._gdf.columns:
                return candidate
        return None

    def _detect_id_column(self) -> str | None:
        """Detect the unique ID column in the data."""
        assert self._gdf is not None
        for candidate in ("UUID", "uuid", "FID", "OBJECTID", "id"):
            if candidate in self._gdf.columns:
                return candidate
        return None

    def _row_to_feature(self, idx: int) -> dict[str, Any]:
        """Convert a GeoDataFrame row to a GeoJSON Feature dict with WGS84 geometry."""
        assert self._gdf is not None
        row = self._gdf.iloc[idx]

        # Get name
        name_col = self._detect_name_column()
        name = str(row[name_col])

        # Get type (falsy raw values fall through to "unknown")
        type_col = self._detect_type_column()
        raw_type = str(row[type_col]) if type_col and row.get(type_col) else "unknown"
        normalized_type = _objektart_to_type(raw_type)

        # Get ID (falls back to the positional row index)
        id_col = self._detect_id_column()
        feature_id = str(row[id_col]) if id_col and row.get(id_col) else str(idx)

        # Convert geometry to WGS84 GeoJSON
        geom = row.geometry
        if geom is None or geom.is_empty:
            geometry = {"type": "Point", "coordinates": [0, 0]}
            bbox = None
        else:
            # Transform geometry from EPSG:2056 to WGS84 using the module-level transformer
            # Drop Z coordinates — they are not needed and cause issues with single_sided buffers
            wgs84_geom = shapely_transform(_TRANSFORMER.transform, force_2d(geom))
            geometry = mapping(wgs84_geom)
            bounds = wgs84_geom.bounds  # (minx, miny, maxx, maxy)
            bbox = (bounds[0], bounds[1], bounds[2], bounds[3])

        # Collect extra properties
        skip_cols = {name_col, "geometry"}
        if type_col:
            skip_cols.add(type_col)
        if id_col:
            skip_cols.add(id_col)

        properties: dict[str, Any] = {
            "name": name,
            "type": normalized_type,
            "confidence": 1.0,
        }
        for col in self._gdf.columns:
            if col not in skip_cols:
                val = row.get(col)
                # str(val) != "nan" filters pandas NaN values.
                if val is not None and str(val) != "nan":
                    properties[col] = val

        return {
            "type": "Feature",
            "id": feature_id,
            "geometry": geometry,
            "bbox": bbox,
            "properties": properties,
        }

    def search(
        self,
        name: str,
        type: str | None = None,
        max_results: int = 10,
    ) -> list[dict[str, Any]]:
        """
        Search for geographic features by name.

        Uses case-insensitive, accent-normalized matching with fuzzy fallback.
        First tries exact matching, then falls back to fuzzy matching if no exact
        matches found.

        Args:
            name: Location name to search for.
            type: Optional type hint to filter results. If provided, only features
                of this type are returned.
            max_results: Maximum number of results to return.

        Returns:
            List of matching GeoJSON Feature dicts. If type is provided, only
            features of that type are returned. Empty list if no matches found.
        """
        self._ensure_loaded()

        normalized = _normalize_name(name)
        indices = self._name_index.get(normalized, [])

        # If no exact match, try fuzzy matching
        if not indices:
            indices = self._fuzzy_search(normalized)

        features = [self._row_to_feature(idx) for idx in indices]

        # Filter by type if type hint provided.
        # Expand via the type hierarchy so that category hints (e.g. "water") match
        # all concrete types within that category ("lake", "river", "pond", ...).
        if type is not None:
            matching_types = get_matching_types(type)
            if matching_types:
                features = [f for f in features if f["properties"].get("type") in matching_types]
            else:
                # Unknown type hint, fall back to exact string match
                features = [f for f in features if f["properties"].get("type") == type.lower()]

        return features[:max_results]

    def _fuzzy_search(self, normalized: str, threshold: float = 75.0) -> list[int]:
        """
        Fuzzy search for names that partially match the search query.

        Uses token matching to find results where at least one token from the
        query matches a token in the indexed name. This handles cases like:
        - "venoge" matching "la venoge"
        - "rhone" matching "rhone valais"

        Args:
            normalized: The normalized search query.
            threshold: Minimum fuzzy match score (0-100) to include a result.

        Returns:
            List of row indices for fuzzy-matched names, sorted by score (descending).
        """
        matches: list[tuple[int, float]] = []
        query_tokens = set(normalized.split())

        for indexed_name, indices in self._name_index.items():
            indexed_tokens = set(indexed_name.split())

            # Check if any query token matches any indexed token
            token_overlap = query_tokens & indexed_tokens

            if token_overlap:
                # Also use token_set_ratio for better matching of partial strings
                score = fuzz.token_set_ratio(normalized, indexed_name)
                if score >= threshold:
                    for idx in indices:
                        matches.append((idx, score))

        # Sort by score (descending) to return best matches first
        matches.sort(key=lambda x: x[1], reverse=True)
        return [idx for idx, _ in matches]

    def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
        """
        Get a specific feature by its unique identifier.

        Args:
            feature_id: Unique identifier (UUID or row index).

        Returns:
            The matching GeoJSON Feature dict, or None if not found.
        """
        self._ensure_loaded()
        assert self._gdf is not None

        id_col = self._detect_id_column()
        if id_col:
            matches = self._gdf[self._gdf[id_col].astype(str) == feature_id]
            if not matches.empty:
                # NOTE(review): matches.index[0] is a pandas index label, while
                # _row_to_feature indexes positionally with .iloc — this assumes
                # the default RangeIndex (label == position), which holds for the
                # ignore_index=True concat above; confirm for single-file loads.
                return self._row_to_feature(matches.index[0])

        # Fallback: try as row index
        try:
            idx = int(feature_id)
            if 0 <= idx < len(self._gdf):
                return self._row_to_feature(idx)
        except ValueError:
            pass

        return None

    def get_available_types(self) -> list[str]:
        """
        Get list of concrete geographic types this datasource can return.

        Returns all normalized types from the OBJEKTART_TYPE_MAP keys,
        representing all possible types that SwissNames3D data can be classified as.

        Returns:
            Sorted list of type strings (e.g., ["lake", "city", "river", ...])
        """
        return sorted(OBJEKTART_TYPE_MAP.keys())
Geographic data source backed by swisstopo's swissNAMES3D dataset.
Loads geographic names from a Shapefile, GeoPackage, or ESRI File Geodatabase and provides search by name with optional type filtering.
If data_path is a directory, automatically loads and concatenates all SwissNames3D shapefiles (swissNAMES3D_PKT, swissNAMES3D_LIN, swissNAMES3D_PLY) found within.
All geometries are returned as GeoJSON in WGS84 (EPSG:4326).
Arguments:
- data_path: Path to SwissNames3D data file or directory containing SwissNames3D shapefiles.
- layer: Layer name within the data source (for multi-layer formats like GDB).
Example:
>>> source = SwissNames3DSource("data/")  # Load all 3 geometry types
>>> results = source.search("Lac Léman", type="lake")
>>> print(results[0].geometry)  # GeoJSON in WGS84
    def search(
        self,
        name: str,
        type: str | None = None,
        max_results: int = 10,
    ) -> list[dict[str, Any]]:
        """
        Search for geographic features by name.

        Uses case-insensitive, accent-normalized matching with fuzzy fallback.
        First tries exact matching, then falls back to fuzzy matching if no exact
        matches found.

        Args:
            name: Location name to search for.
            type: Optional type hint to filter results. If provided, only features
                of this type are returned.
            max_results: Maximum number of results to return.

        Returns:
            List of matching GeoJSON Feature dicts. If type is provided, only
            features of that type are returned. Empty list if no matches found.
        """
        self._ensure_loaded()

        normalized = _normalize_name(name)
        indices = self._name_index.get(normalized, [])

        # If no exact match, try fuzzy matching
        if not indices:
            indices = self._fuzzy_search(normalized)

        features = [self._row_to_feature(idx) for idx in indices]

        # Filter by type if type hint provided.
        # Expand via the type hierarchy so that category hints (e.g. "water") match
        # all concrete types within that category ("lake", "river", "pond", ...).
        if type is not None:
            matching_types = get_matching_types(type)
            if matching_types:
                features = [f for f in features if f["properties"].get("type") in matching_types]
            else:
                # Unknown type hint, fall back to exact string match
                features = [f for f in features if f["properties"].get("type") == type.lower()]

        # Truncate after filtering so the cap applies to the filtered set.
        return features[:max_results]
Search for geographic features by name.
Uses case-insensitive, accent-normalized matching with fuzzy fallback. First tries exact matching, then falls back to fuzzy matching if no exact matches found.
Arguments:
- name: Location name to search for.
- type: Optional type hint to filter results. If provided, only features of this type are returned.
- max_results: Maximum number of results to return.
Returns:
List of matching GeoJSON Feature dicts. If type is provided, only features of that type are returned. Empty list if no matches found.
    def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
        """
        Get a specific feature by its unique identifier.

        Args:
            feature_id: Unique identifier (UUID or row index).

        Returns:
            The matching GeoJSON Feature dict, or None if not found.
        """
        self._ensure_loaded()
        assert self._gdf is not None

        id_col = self._detect_id_column()
        if id_col:
            matches = self._gdf[self._gdf[id_col].astype(str) == feature_id]
            if not matches.empty:
                # NOTE(review): matches.index[0] is a pandas index label, while
                # _row_to_feature indexes positionally with .iloc — assumes the
                # default RangeIndex (label == position); confirm for re-indexed
                # frames.
                return self._row_to_feature(matches.index[0])

        # Fallback: try as row index
        try:
            idx = int(feature_id)
            if 0 <= idx < len(self._gdf):
                return self._row_to_feature(idx)
        except ValueError:
            pass

        return None
Get a specific feature by its unique identifier.
Arguments:
- feature_id: Unique identifier (UUID or row index).
Returns:
The matching GeoJSON Feature dict, or None if not found.
454 def get_available_types(self) -> list[str]: 455 """ 456 Get list of concrete geographic types this datasource can return. 457 458 Returns all normalized types from the OBJEKTART_TYPE_MAP keys, 459 representing all possible types that SwissNames3D data can be classified as. 460 461 Returns: 462 Sorted list of type strings (e.g., ["lake", "city", "river", ...]) 463 """ 464 return sorted(OBJEKTART_TYPE_MAP.keys())
Get list of concrete geographic types this datasource can return.
Returns all normalized types from the OBJEKTART_TYPE_MAP keys, representing all possible types that SwissNames3D data can be classified as.
Returns:
Sorted list of type strings (e.g., ["lake", "city", "river", ...])
class IGNBDCartoSource:
    """
    Geographic data source backed by IGN's BD-CARTO 5.0 dataset.

    Loads French geographic data from GeoPackage files extracted to a directory.
    Supports administrative boundaries (communes, departments, regions, …),
    hydrography (rivers, lakes, …), named places (quarters, hamlets, …),
    orographic features (peaks, passes, valleys, …) and protected areas.

    Data must first be downloaded with ``make download-data-ign``, which places
    the GeoPackage files in ``data/bdcarto/``.

    All geometries are reprojected from EPSG:2154 (Lambert-93) to WGS84
    (EPSG:4326) and returned as standard GeoJSON Feature dicts.

    Args:
        data_path: Directory containing the ``.gpkg`` files (e.g. ``"data/bdcarto"``).

    Example:
        >>> source = IGNBDCartoSource("data/bdcarto")
        >>> results = source.search("Ardèche", type="department")
        >>> results = source.search("Lyon", type="city")
        >>> results = source.search("Rhône", type="river")
    """

    def __init__(self, data_path: str | Path) -> None:
        self._data_path = Path(data_path)
        # Lazily loaded; stays None until the first search/get call.
        self._gdf: gpd.GeoDataFrame | None = None
        # Normalized name -> list of row positions in self._gdf.
        self._name_index: dict[str, list[int]] = {}

    def _ensure_loaded(self) -> None:
        """Load data lazily on first access."""
        if self._gdf is not None:
            return
        self._load_data()

    def _load_data(self) -> None:
        """Load BD-CARTO data (directory of GeoPackages or a fixture file) and index it."""
        if self._data_path.is_dir():
            self._gdf = self._load_from_directory()
        else:
            self._gdf = self._load_from_file(self._data_path)
        self._build_name_index()

    def _load_from_file(self, path: Path) -> gpd.GeoDataFrame:
        """Load from a GeoJSON fixture file. Features must include a ``_layer`` column."""
        full_gdf = gpd.read_file(str(path))
        if "_layer" not in full_gdf.columns:
            raise ValueError(f"GeoJSON fixture {path} must include a '_layer' column")

        gdfs: list[gpd.GeoDataFrame] = []
        for layer_name, cfg in _LAYER_CONFIGS.items():
            rows = full_gdf[full_gdf["_layer"] == layer_name].copy()
            if rows.empty:
                continue
            name_col: str = cfg["name_col"]
            if name_col not in rows.columns:
                continue
            rows[_NAME_COL] = rows[name_col].astype(str)
            # Bind cfg as a default arg so each row lambda sees its own layer config.
            rows[_TYPE_COL] = rows.apply(lambda row, c=cfg: _derive_type(row, c), axis=1)
            rows = rows.to_crs("EPSG:4326")
            gdfs.append(rows)

        if not gdfs:
            raise ValueError(f"No matching BD-CARTO features found in {path}")

        combined = pd.concat(gdfs, ignore_index=True)
        return gpd.GeoDataFrame(combined, crs="EPSG:4326", geometry="geometry")

    def _load_from_directory(self) -> gpd.GeoDataFrame:
        """Load and concatenate all configured layers from the data directory."""
        gdfs: list[gpd.GeoDataFrame] = []

        for layer_name, cfg in _LAYER_CONFIGS.items():
            gpkg_path = self._data_path / f"{layer_name}.gpkg"
            if not gpkg_path.exists():
                continue

            gdf = gpd.read_file(str(gpkg_path))

            name_col: str = cfg["name_col"]
            if name_col not in gdf.columns:
                continue

            gdf[_NAME_COL] = gdf[name_col].astype(str)
            gdf[_TYPE_COL] = gdf.apply(lambda row, c=cfg: _derive_type(row, c), axis=1)
            gdf["_layer"] = layer_name
            gdf = gdf.to_crs("EPSG:4326")

            gdfs.append(gdf)

        if not gdfs:
            raise ValueError(
                f"No BD-CARTO GeoPackage files found in {self._data_path}. "
                f"Run 'make download-data-ign' to download the dataset."
            )

        combined = pd.concat(gdfs, ignore_index=True)
        return gpd.GeoDataFrame(combined, crs="EPSG:4326", geometry="geometry")

    def _build_name_index(self) -> None:
        """Build normalized name → row indices lookup (with article-stripped variants)."""
        assert self._gdf is not None
        self._name_index = {}
        for idx, name in enumerate(self._gdf[_NAME_COL]):
            # Skip missing/blank names and the string "nan" left by astype(str).
            if not isinstance(name, str) or not name.strip() or name == "nan":
                continue
            for key in _index_keys(name):
                if key not in self._name_index:
                    self._name_index[key] = []
                self._name_index[key].append(idx)

    def _row_to_feature(self, idx: int) -> dict[str, Any]:
        """Convert a GeoDataFrame row to a GeoJSON Feature dict (WGS84)."""
        assert self._gdf is not None
        row = self._gdf.iloc[idx]

        name = str(row[_NAME_COL])
        normalized_type = str(row[_TYPE_COL]) if pd.notna(row.get(_TYPE_COL)) else "unknown"
        feature_id = str(row["cleabs"]) if pd.notna(row.get("cleabs")) else str(idx)

        # Declare once so both branches agree on the types (the original
        # re-annotated inside the else-branch, which confuses type checkers).
        geometry: dict[str, Any]
        bbox: tuple[float, float, float, float] | None
        geom = row.geometry
        if geom is None or geom.is_empty:
            # Degenerate placeholder for missing geometries.
            geometry = {"type": "Point", "coordinates": [0, 0]}
            bbox = None
        else:
            geometry = mapping(geom)
            bounds = geom.bounds  # (minx, miny, maxx, maxy)
            bbox = (bounds[0], bounds[1], bounds[2], bounds[3])

        skip_cols = {_NAME_COL, _TYPE_COL, "geometry", "cleabs"}
        properties: dict[str, Any] = {
            "name": name,
            "type": normalized_type,
            "confidence": 1.0,
        }
        for col in self._gdf.columns:
            if col not in skip_cols:
                val = _to_json_value(row.get(col))
                if val is not None:
                    properties[col] = val

        return {
            "type": "Feature",
            "id": feature_id,
            "geometry": geometry,
            "bbox": bbox,
            "properties": properties,
        }

    def search(
        self,
        name: str,
        type: str | None = None,
        max_results: int = 10,
    ) -> list[dict[str, Any]]:
        """
        Search for geographic features by name.

        Uses case-insensitive, accent-normalized exact matching with fuzzy
        fallback when no exact match is found.

        Args:
            name: Location name to search for (e.g. ``"Ardèche"``, ``"Lyon"``,
                ``"Rhône"``).
            type: Optional type hint for filtering. Supports both concrete types
                (``"department"``, ``"city"``, ``"river"``) and category hints
                (``"administrative"``, ``"water"``).
            max_results: Maximum number of results.

        Returns:
            List of GeoJSON Feature dicts in WGS84. Empty list if no match.
        """
        self._ensure_loaded()

        normalized = _normalize_name(name)
        indices = self._name_index.get(normalized, [])

        # Fuzzy fallback when no exact normalized match exists.
        if not indices:
            indices = self._fuzzy_search(normalized)

        features = [self._row_to_feature(idx) for idx in indices]

        if type is not None:
            # Expand category hints (e.g. "water") to their concrete types.
            # NOTE: removed a leftover debug print() here — library code must
            # not write to stdout.
            matching_types = get_matching_types(type)
            if matching_types:
                features = [f for f in features if f["properties"].get("type") in matching_types]
            else:
                # Unknown hint: fall back to an exact lower-cased string match.
                features = [f for f in features if f["properties"].get("type") == type.lower()]

        # Consolidate segmented features before truncating to max_results.
        features = merge_segments(features)

        return features[:max_results]

    def _fuzzy_search(self, normalized: str, threshold: float = 75.0) -> list[int]:
        """Token-overlap + token_set_ratio fuzzy search."""
        matches: list[tuple[int, float]] = []
        query_tokens = set(normalized.split())

        for indexed_name, indices in self._name_index.items():
            # Cheap token-overlap prefilter before the fuzzy score.
            if query_tokens & set(indexed_name.split()):
                score = fuzz.token_set_ratio(normalized, indexed_name)
                if score >= threshold:
                    for idx in indices:
                        matches.append((idx, score))

        # Best matches first.
        matches.sort(key=lambda x: x[1], reverse=True)
        return [idx for idx, _ in matches]

    def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
        """
        Get a feature by its ``cleabs`` identifier or row index.

        Args:
            feature_id: ``cleabs`` string or integer row index.

        Returns:
            Matching GeoJSON Feature dict, or ``None``.
        """
        self._ensure_loaded()
        assert self._gdf is not None

        if "cleabs" in self._gdf.columns:
            matches = self._gdf[self._gdf["cleabs"].astype(str) == feature_id]
            if not matches.empty:
                # matches.index[0] is a label; _row_to_feature uses .iloc. The
                # loaders build the frame with ignore_index=True, so labels and
                # positions coincide here.
                return self._row_to_feature(matches.index[0])

        # Fallback: interpret the identifier as a positional row index.
        try:
            idx = int(feature_id)
            if 0 <= idx < len(self._gdf):
                return self._row_to_feature(idx)
        except ValueError:
            pass

        return None

    def get_available_types(self) -> list[str]:
        """
        Return the union of all normalized types this source can return.

        Returns:
            Sorted list of type strings.
        """
        types: set[str] = set()
        for cfg in _LAYER_CONFIGS.values():
            if cfg.get("commune_flags"):
                types.update({"city", "municipality"})
            elif cfg.get("fixed_type"):
                types.add(cfg["fixed_type"])
            elif cfg.get("type_map"):
                types.update(cfg["type_map"].values())
        return sorted(types)
Geographic data source backed by IGN's BD-CARTO 5.0 dataset.
Loads French geographic data from GeoPackage files extracted to a directory. Supports administrative boundaries (communes, departments, regions, …), hydrography (rivers, lakes, …), named places (quarters, hamlets, …), orographic features (peaks, passes, valleys, …) and protected areas.
Data must first be downloaded with make download-data-ign, which places
the GeoPackage files in data/bdcarto/.
All geometries are reprojected from EPSG:2154 (Lambert-93) to WGS84 (EPSG:4326) and returned as standard GeoJSON Feature dicts.
Arguments:
- data_path: Directory containing the `.gpkg` files (e.g. `"data/bdcarto"`).
Example:
>>> source = IGNBDCartoSource("data/bdcarto")
>>> results = source.search("Ardèche", type="department")
>>> results = source.search("Lyon", type="city")
>>> results = source.search("Rhône", type="river")
420 def search( 421 self, 422 name: str, 423 type: str | None = None, 424 max_results: int = 10, 425 ) -> list[dict[str, Any]]: 426 """ 427 Search for geographic features by name. 428 429 Uses case-insensitive, accent-normalized exact matching with fuzzy 430 fallback when no exact match is found. 431 432 Args: 433 name: Location name to search for (e.g. ``"Ardèche"``, ``"Lyon"``, 434 ``"Rhône"``). 435 type: Optional type hint for filtering. Supports both concrete types 436 (``"department"``, ``"city"``, ``"river"``) and category hints 437 (``"administrative"``, ``"water"``). 438 max_results: Maximum number of results. 439 440 Returns: 441 List of GeoJSON Feature dicts in WGS84. Empty list if no match. 442 """ 443 self._ensure_loaded() 444 445 normalized = _normalize_name(name) 446 indices = self._name_index.get(normalized, []) 447 448 if not indices: 449 indices = self._fuzzy_search(normalized) 450 451 features = [self._row_to_feature(idx) for idx in indices] 452 453 if type is not None: 454 matching_types = get_matching_types(type) 455 print(f"Filtering results by type hint '{type}' → matching types: {matching_types}") 456 if matching_types: 457 features = [f for f in features if f["properties"].get("type") in matching_types] 458 else: 459 features = [f for f in features if f["properties"].get("type") == type.lower()] 460 461 features = merge_segments(features) 462 463 return features[:max_results]
Search for geographic features by name.
Uses case-insensitive, accent-normalized exact matching with fuzzy fallback when no exact match is found.
Arguments:
- name: Location name to search for (e.g. "Ardèche", "Lyon", "Rhône").
- type: Optional type hint for filtering. Supports both concrete types ("department", "city", "river") and category hints ("administrative", "water").
- max_results: Maximum number of results.
Returns:
List of GeoJSON Feature dicts in WGS84. Empty list if no match.
    def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
        """
        Get a feature by its ``cleabs`` identifier or row index.

        Args:
            feature_id: ``cleabs`` string or integer row index.

        Returns:
            Matching GeoJSON Feature dict, or ``None``.
        """
        self._ensure_loaded()
        assert self._gdf is not None

        # Primary lookup: match on the "cleabs" identifier column.
        if "cleabs" in self._gdf.columns:
            matches = self._gdf[self._gdf["cleabs"].astype(str) == feature_id]
            if not matches.empty:
                # NOTE(review): matches.index[0] is a pandas index label, while
                # _row_to_feature indexes positionally with .iloc — assumes the
                # default RangeIndex (label == position); confirm for re-indexed
                # frames.
                return self._row_to_feature(matches.index[0])

        # Fallback: interpret the identifier as a positional row index.
        try:
            idx = int(feature_id)
            if 0 <= idx < len(self._gdf):
                return self._row_to_feature(idx)
        except ValueError:
            pass

        return None
Get a feature by its cleabs identifier or row index.
Arguments:
- feature_id: `cleabs` string or integer row index.
Returns:
Matching GeoJSON Feature dict, or `None`.
507 def get_available_types(self) -> list[str]: 508 """ 509 Return the union of all normalized types this source can return. 510 511 Returns: 512 Sorted list of type strings. 513 """ 514 types: set[str] = set() 515 for cfg in _LAYER_CONFIGS.values(): 516 if cfg.get("commune_flags"): 517 types.update({"city", "municipality"}) 518 elif cfg.get("fixed_type"): 519 types.add(cfg["fixed_type"]) 520 elif cfg.get("type_map"): 521 types.update(cfg["type_map"].values()) 522 return sorted(types)
Return the union of all normalized types this source can return.
Returns:
Sorted list of type strings.
class CompositeDataSource:
    """
    Fan-out datasource that delegates to an ordered list of GeoDataSource instances.

    ``search`` queries every registered source and merges results in order.

    ``get_by_id`` tries each source in order and returns the first hit.

    ``get_available_types`` returns the union of all sources' types.

    Args:
        sources: One or more GeoDataSource instances.

    Example:
        >>> swiss = SwissNames3DSource("data/")
        >>> ign = IGNBDCartoSource("data/")
        >>> combo = CompositeDataSource(swiss, ign)
        >>> results = combo.search("Geneva", type="city")
    """

    def __init__(self, *sources: GeoDataSource) -> None:
        if not sources:
            raise ValueError("At least one datasource is required.")
        self._sources: list[GeoDataSource] = list(sources)

    # Public API (mirrors GeoDataSource protocol)

    def search(
        self,
        name: str,
        type: str | None = None,
        max_results: int = 10,
    ) -> list[dict[str, Any]]:
        """
        Query every registered source and merge the results in order.

        Args:
            name: Location name to search for.
            type: Optional type hint passed through to every source.
            max_results: Maximum results per source (also caps the merged list).

        Returns:
            List of GeoJSON Feature dicts, merged from all sources.
        """
        collected: list[dict[str, Any]] = []
        for src in self._sources:
            for hit in src.search(name, type=type, max_results=max_results):
                collected.append(hit)
                # Stop as soon as the merged list reaches the cap.
                if len(collected) >= max_results:
                    return collected
        return collected

    def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
        """
        Look up a feature by ID, delegating to each source in order.

        Args:
            feature_id: Unique identifier to look up.

        Returns:
            The first matching GeoJSON Feature dict, or None.
        """
        for src in self._sources:
            hit = src.get_by_id(feature_id)
            if hit is not None:
                return hit
        return None

    def get_available_types(self) -> list[str]:
        """
        Return the union of all sources' available types, sorted.

        Returns:
            Sorted list of unique type strings.
        """
        union: set[str] = set()
        for src in self._sources:
            union |= set(src.get_available_types())
        return sorted(union)
Fan-out datasource that delegates to an ordered list of GeoDataSource instances.
search queries every registered source and merges results in order.
get_by_id tries each source in order and returns the first hit.
get_available_types returns the union of all sources' types.
Arguments:
- sources: One or more GeoDataSource instances.
Example:
>>> swiss = SwissNames3DSource("data/")
>>> ign = IGNBDCartoSource("data/")
>>> combo = CompositeDataSource(swiss, ign)
>>> results = combo.search("Geneva", type="city")
41 def search( 42 self, 43 name: str, 44 type: str | None = None, 45 max_results: int = 10, 46 ) -> list[dict[str, Any]]: 47 """ 48 Search all registered sources and return merged. 49 50 Args: 51 name: Location name to search for. 52 type: Optional type hint passed through to every source. 53 max_results: Maximum results per source. 54 55 Returns: 56 List of GeoJSON Feature dicts, merged from all sources. 57 """ 58 merged: list[dict[str, Any]] = [] 59 60 for source in self._sources: 61 for feature in source.search(name, type=type, max_results=max_results): 62 merged.append(feature) 63 if len(merged) >= max_results: 64 return merged 65 66 return merged
Search all registered sources and return merged.
Arguments:
- name: Location name to search for.
- type: Optional type hint passed through to every source.
- max_results: Maximum results per source.
Returns:
List of GeoJSON Feature dicts, merged from all sources.
68 def get_by_id(self, feature_id: str) -> dict[str, Any] | None: 69 """ 70 Get a feature by ID, trying each source in order. 71 72 Args: 73 feature_id: Unique identifier to look up. 74 75 Returns: 76 The first matching GeoJSON Feature dict, or None. 77 """ 78 for source in self._sources: 79 result = source.get_by_id(feature_id) 80 if result is not None: 81 return result 82 return None
Get a feature by ID, trying each source in order.
Arguments:
- feature_id: Unique identifier to look up.
Returns:
The first matching GeoJSON Feature dict, or None.
84 def get_available_types(self) -> list[str]: 85 """ 86 Return the union of all sources' available types, sorted. 87 88 Returns: 89 Sorted list of unique type strings. 90 """ 91 types: set[str] = set() 92 for source in self._sources: 93 types.update(source.get_available_types()) 94 return sorted(types)
Return the union of all sources' available types, sorted.
Returns:
Sorted list of unique type strings.
class PostGISDataSource:
    """
    Geographic data source backed by a PostGIS table.

    The table must expose at minimum a name column, a geometry column, and
    optionally a type column. The expected schema is:

    .. code-block:: sql

        CREATE TABLE <table> (
            id TEXT PRIMARY KEY,
            name TEXT NOT NULL,
            type TEXT,
            geom GEOMETRY(Geometry, 4326)
        );

    The ``type`` column may store either:

    - **Raw dataset values** (e.g. ``"See"``, ``"Berg"`` for SwissNames3D),
      pass ``type_map`` so the datasource can translate between raw values and
      the normalized etter type names.
    - **Already-normalized values** (e.g. ``"lake"``, ``"mountain"``),
      leave ``type_map=None`` (default).

    Geometries must be in WGS84 (EPSG:4326) or supply ``crs`` for on-the-fly
    reprojection.

    Args:
        connection: A SQLAlchemy :class:`~sqlalchemy.engine.Engine` **or** a
            connection URL string (e.g. ``"postgresql+psycopg2://user:pass@host/db"``).
            When a string is provided the engine is created internally.
        table: Fully-qualified table name, e.g. ``"public.swissnames3d"``.
        name_column: Column used for name-based search (default ``"name"``).
        type_column: Column used for type filtering. Pass ``None`` to disable
            type filtering (default ``"type"``).
        geometry_column: PostGIS geometry column (default ``"geom"``).
        id_column: Primary-key column (default ``"id"``).
        crs: CRS of the stored geometries as an EPSG string. Defaults to
            ``"EPSG:4326"`` (no reprojection).
        type_map: Optional mapping from **normalized etter type names** to
            **lists of raw type column values** present in the database.
            This is the same format as ``SwissNames3DSource.OBJEKTART_TYPE_MAP``
            and ``IGNBDCartoSource.IGN_BDCARTO_TYPE_MAP``, so they can be
            passed directly::

                from etter.datasources.swissnames3d import OBJEKTART_TYPE_MAP
                source = PostGISDataSource(
                    engine,
                    table="public.swissnames3d",
                    type_map=OBJEKTART_TYPE_MAP,
                )

            When ``type_map`` is provided the datasource:

            - Translates raw DB values → normalized types in returned features.
            - Translates user type hints → raw DB values in SQL ``WHERE`` clauses.
            - Returns normalized type names from ``get_available_types()``.

            When ``None`` (default) the stored values are used as-is.
        fuzzy_threshold: Minimum ``pg_trgm`` similarity score (0-1) used for
            the fuzzy fallback search when the normalized exact match finds
            nothing (the fuzzy step runs *before* the ``ILIKE`` fallback).

    Example: unmodified SwissNames3D table::

        from sqlalchemy import create_engine
        from etter.datasources import PostGISDataSource
        from etter.datasources.swissnames3d import OBJEKTART_TYPE_MAP

        engine = create_engine(...)
        source = PostGISDataSource(
            engine,
            table="public.swissnames3d",
            type_map=OBJEKTART_TYPE_MAP,
        )
        results = source.search("Lac Léman", type="lake")
    """

    def __init__(
        self,
        connection: str | Engine,
        table: str,
        name_column: str = "name",
        type_column: str | None = "type",
        geometry_column: str = "geom",
        id_column: str = "id",
        crs: str = "EPSG:4326",
        type_map: dict[str, list[str]] | None = None,
        fuzzy_threshold: float = 0.65,
    ) -> None:
        sa = _require_sqlalchemy()

        if isinstance(connection, str):
            self._engine = sa.create_engine(connection)
        else:
            self._engine = connection

        # Fail fast: probe the table once so a bad URL or a missing/forbidden
        # table surfaces at construction time rather than on the first search.
        try:
            with self._engine.connect() as conn:
                conn.execute(sa.text(f"SELECT 1 FROM {table} LIMIT 1"))
        except Exception as exc:
            raise ValueError(f"Failed to connect to database or access table {table!r}") from exc

        self._table = table
        self._name_col = name_column
        self._type_col = type_column
        self._geom_col = geometry_column
        self._id_col = id_column
        self._crs = crs
        self._fuzzy_threshold = fuzzy_threshold

        # Build bidirectional lookup structures from the user-supplied map.
        if type_map:
            self._normalized_to_raw: dict[str, list[str]] = dict(type_map)
            self._raw_to_normalized: dict[str, str] = {
                raw: normalized for normalized, raws in type_map.items() for raw in raws
            }
        else:
            self._normalized_to_raw = {}
            self._raw_to_normalized = {}

        # Extension availability is probed lazily and cached; None = unknown.
        self._trgm_available: bool | None = None
        self._unaccent_available: bool | None = None

    def _get_connection(self) -> Any:
        """Return a SQLAlchemy connection from the engine."""
        return self._engine.connect()

    def _check_trgm(self, conn: Any) -> bool:
        """Return True if pg_trgm extension is available in the database."""
        # Cached on the instance after the first probe.
        if self._trgm_available is not None:
            return self._trgm_available
        sa = _require_sqlalchemy()
        try:
            result = conn.execute(sa.text("SELECT 1 FROM pg_extension WHERE extname = 'pg_trgm'"))
            self._trgm_available = result.fetchone() is not None
        except Exception:
            # A failed probe is treated as "extension unavailable".
            logger.exception("Failed to check pg_trgm availability")
            self._trgm_available = False
        return self._trgm_available

    def _check_unaccent(self, conn: Any) -> bool:
        """Return True if the unaccent extension is available in the database."""
        # Cached on the instance after the first probe.
        if self._unaccent_available is not None:
            return self._unaccent_available
        sa = _require_sqlalchemy()
        try:
            result = conn.execute(sa.text("SELECT 1 FROM pg_extension WHERE extname = 'unaccent'"))
            self._unaccent_available = result.fetchone() is not None
        except Exception:
            # A failed probe is treated as "extension unavailable".
            logger.exception("Failed to check unaccent availability")
            self._unaccent_available = False
        return self._unaccent_available

    def _normalize_type(self, raw_type: str | None) -> str | None:
        """Translate a raw DB type value to its normalized etter name.

        If no type_map was supplied the value is returned unchanged.
        """
        if raw_type is None:
            return None
        return self._raw_to_normalized.get(raw_type, raw_type)

    def _row_to_feature(self, row: Any) -> dict[str, Any]:
        """Convert a SQLAlchemy Row to a GeoJSON Feature dict."""
        feature_id = str(row.id)
        name = str(row.name)
        raw_type = getattr(row, "type", None)
        normalized_type = self._normalize_type(raw_type)

        # NULL/empty geometries fall back to a placeholder point at (0, 0).
        geojson_str = row.geojson
        if geojson_str:
            geometry = json.loads(geojson_str)
        else:
            geometry = {"type": "Point", "coordinates": [0, 0]}

        bbox = _bbox_from_geojson(geometry)

        properties: dict[str, Any] = {
            "name": name,
            "type": normalized_type,
            "confidence": 1.0,
        }

        return {
            "type": "Feature",
            "id": feature_id,
            "geometry": geometry,
            "bbox": bbox,
            "properties": properties,
        }

    def _build_select_columns(self) -> str:
        """Build the SELECT column list as a SQL fragment."""
        type_expr = f", {self._type_col} AS type" if self._type_col else ", NULL AS type"
        # Reproject to WGS84 in SQL when the stored CRS differs from EPSG:4326.
        if self._crs.upper() != "EPSG:4326":
            geom_expr = f", ST_AsGeoJSON(ST_Transform({self._geom_col}, 4326)) AS geojson"
        else:
            geom_expr = f", ST_AsGeoJSON({self._geom_col}) AS geojson"
        return f"{self._id_col} AS id, {self._name_col} AS name{type_expr}{geom_expr}"

    def search(
        self,
        name: str,
        type: str | None = None,
        max_results: int = 10,
    ) -> list[dict[str, Any]]:
        """
        Search for geographic features by name.

        Uses a three-step cascade, stopping as soon as any step returns results:

        1. **Normalized exact match**
        2. **pg_trgm fuzzy with unaccent** (pg_trgm extension required and unaccent extension recommended)
        3. **ILIKE substring**

        ``merge_segments`` is applied after all rows are fetched so that
        multi-segment linestrings (rivers, roads) are merged before the
        ``max_results`` cap is applied.

        Args:
            name: Location name to search for.
            type: Optional type hint for filtering results.
            max_results: Maximum number of results to return.

        Returns:
            List of matching GeoJSON Feature dicts in WGS84.
        """
        sa = _require_sqlalchemy()
        cols = self._build_select_columns()

        # Resolve type filter to the raw DB values to use in the SQL WHERE clause.
        type_filter_values: list[str] | None = None
        if type is not None and self._type_col is not None:
            matching_types = get_matching_types(type)
            concrete_types = matching_types if matching_types else [type.lower()]
            if self._normalized_to_raw:
                raw_values: list[str] = []
                for t in concrete_types:
                    raw_values.extend(self._normalized_to_raw.get(t, [t]))
                type_filter_values = raw_values if raw_values else concrete_types
            else:
                type_filter_values = concrete_types

        # Fetch more rows than requested so that merge_segments has the full
        # set of segments to work with. Without this, a SQL LIMIT applied
        # *before* merging would only return a partial set of linestring
        # segments, producing incorrect / truncated geometries.
        # We cap the internal limit at 2000 to avoid unbounded queries.
        internal_limit = min(max(max_results * 20, 100), 2000)

        with self._get_connection() as conn:
            features = self._search_normalized(conn, sa, cols, name, type_filter_values, internal_limit)

        if not features:
            with self._get_connection() as conn:
                features = self._search_fuzzy(conn, sa, cols, name, type_filter_values, internal_limit)

        if not features:
            with self._get_connection() as conn:
                features = self._search_ilike(conn, sa, cols, name, type_filter_values, internal_limit)

        features = merge_segments(features)
        return features[:max_results]

    def _type_filter_sql(self, values: list[str] | None) -> tuple[str, dict[str, Any]]:
        """Return a WHERE clause fragment and bind params for type filtering."""
        if not values or self._type_col is None:
            return "", {}
        placeholders = ", ".join(f":type_{i}" for i in range(len(values)))
        clause = f" AND {self._type_col} IN ({placeholders})"
        params = {f"type_{i}": v for i, v in enumerate(values)}
        return clause, params

    def _search_normalized(
        self,
        conn: Any,
        sa: Any,
        cols: str,
        name: str,
        type_filter: list[str] | None,
        fetch_limit: int,
    ) -> list[dict[str, Any]]:
        """
        Exact accent- and case-insensitive search.

        Accent normalization (NFD decomposition + diacritic strip) is done in
        Python before the query is sent to the DB.
        """
        type_clause, type_params = self._type_filter_sql(type_filter)
        name_expr = f"lower({self._name_col})"
        if self._check_unaccent(conn):
            name_expr = f"unaccent({name_expr})"
        sql = sa.text(
            f"SELECT {cols} FROM {self._table} "  # noqa: S608
            f"WHERE {name_expr} = :query{type_clause} "
            f"LIMIT :limit"
        )
        params: dict[str, Any] = {
            "query": _normalize_name(name),
            "limit": fetch_limit,
            **type_params,
        }
        try:
            result = conn.execute(sql, params)
            return [self._row_to_feature(row) for row in result]
        except Exception:
            # Search steps are best-effort: log and let the cascade continue.
            logger.exception("Normalized search failed for %r", name)
            return []

    def _search_ilike(
        self,
        conn: Any,
        sa: Any,
        cols: str,
        name: str,
        type_filter: list[str] | None,
        fetch_limit: int,
    ) -> list[dict[str, Any]]:
        """Case-insensitive substring fallback using ``ILIKE '%name%'``.

        When the ``unaccent`` extension is available, both the stored name column
        and the pattern are accent-stripped so that e.g. ``"Rhone"`` matches
        ``"Rhône"``. Without ``unaccent``, standard ILIKE is used (case-insensitive
        only).
        """
        type_clause, type_params = self._type_filter_sql(type_filter)
        normalized = _normalize_name(name)
        if self._check_unaccent(conn):
            name_expr = f"unaccent(lower({self._name_col}))"
            pattern = f"%{normalized}%"
        else:
            name_expr = self._name_col
            pattern = f"%{name}%"
        sql = sa.text(
            f"SELECT {cols} FROM {self._table} "  # noqa: S608
            f"WHERE {name_expr} ILIKE :pattern{type_clause} "
            f"LIMIT :limit"
        )
        params: dict[str, Any] = {"pattern": pattern, "limit": fetch_limit, **type_params}
        try:
            result = conn.execute(sql, params)
            return [self._row_to_feature(row) for row in result]
        except Exception:
            # Search steps are best-effort: log and return no matches.
            logger.exception("ILIKE search failed for %r", name)
            return []

    def _search_fuzzy(
        self,
        conn: Any,
        sa: Any,
        cols: str,
        name: str,
        type_filter: list[str] | None,
        fetch_limit: int,
    ) -> list[dict[str, Any]]:
        """Fuzzy fallback using pg_trgm similarity (if extension is available)."""
        if not self._check_trgm(conn):
            logger.warning(
                "pg_trgm extension not available. Fuzzy search disabled. Install it with: CREATE EXTENSION pg_trgm;"
            )
            return []
        normalized_query = _normalize_name(name)
        if self._check_unaccent(conn):
            name_expr = f"unaccent(lower({self._name_col}))"
        else:
            logger.warning(
                "unaccent extension not available. Accent-insensitive fuzzy search degraded. "
                "Install it with: CREATE EXTENSION unaccent;"
            )
            name_expr = f"lower({self._name_col})"
        type_clause, type_params = self._type_filter_sql(type_filter)
        # Best matches first: rows ordered by descending trigram word similarity.
        sql = sa.text(
            f"SELECT {cols} FROM {self._table} "  # noqa: S608
            f"WHERE word_similarity({name_expr}, :query) > :threshold{type_clause} "
            f"ORDER BY word_similarity({name_expr}, :query) DESC "
            f"LIMIT :limit"
        )
        params: dict[str, Any] = {
            "query": normalized_query,
            "threshold": self._fuzzy_threshold,
            "limit": fetch_limit,
            **type_params,
        }
        try:
            result = conn.execute(sql, params)
            return [self._row_to_feature(row) for row in result]
        except Exception:
            # Search steps are best-effort: log and return no matches.
            logger.exception("Fuzzy search failed for %r", name)
            return []

    def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
        """
        Get a specific feature by its unique identifier.

        Args:
            feature_id: Value of the ``id`` column.

        Returns:
            The matching GeoJSON Feature dict, or ``None`` if not found.
        """
        sa = _require_sqlalchemy()
        cols = self._build_select_columns()
        sql = sa.text(
            f"SELECT {cols} FROM {self._table} WHERE {self._id_col} = :id LIMIT 1"  # noqa: S608
        )
        with self._get_connection() as conn:
            try:
                result = conn.execute(sql, {"id": feature_id})
                row = result.fetchone()
                return self._row_to_feature(row) if row else None
            except Exception:
                # Lookup failures are logged and reported as "not found".
                logger.exception("get_by_id failed for %r", feature_id)
                return None

    def get_available_types(self) -> list[str]:
        """
        Return the distinct ``type`` values present in the table.

        Returns:
            Sorted list of concrete type strings, or an empty list if the table
            has no type column.
        """
        if self._type_col is None:
            return []
        sa = _require_sqlalchemy()
        sql = sa.text(
            f"SELECT DISTINCT {self._type_col} AS type FROM {self._table} "  # noqa: S608
            f"WHERE {self._type_col} IS NOT NULL ORDER BY 1"
        )
        with self._get_connection() as conn:
            try:
                result = conn.execute(sql)
                raw_types = [row.type for row in result]
            except Exception:
                logger.exception("get_available_types failed")
                return []

        # Map raw DB values through the type_map (no-op when none was supplied),
        # dropping NULL-ish entries before sorting.
        normalized = {self._normalize_type(t) for t in raw_types if t}
        return sorted(t for t in normalized if t)
Geographic data source backed by a PostGIS table.
The table must expose at minimum a name column, a geometry column, and optionally a type column. The expected schema is:
CREATE TABLE <table> (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
type TEXT,
geom GEOMETRY(Geometry, 4326)
);
The type column may store either:
- Raw dataset values (e.g. "See", "Berg" for SwissNames3D): pass `type_map` so the datasource can translate between raw values and the normalized etter type names.
- Already-normalized values (e.g. "lake", "mountain"): leave `type_map=None` (default).
Geometries must be in WGS84 (EPSG:4326) or supply crs for on-the-fly
reprojection.
Arguments:
- connection: A SQLAlchemy `Engine` or a connection URL string (e.g. `"postgresql+psycopg2://user:pass@host/db"`). When a string is provided the engine is created internally.
- table: Fully-qualified table name, e.g. `"public.swissnames3d"`.
- name_column: Column used for name-based search (default `"name"`).
- type_column: Column used for type filtering. Pass `None` to disable type filtering (default `"type"`).
- geometry_column: PostGIS geometry column (default `"geom"`).
- id_column: Primary-key column (default `"id"`).
- crs: CRS of the stored geometries as an EPSG string. Defaults to `"EPSG:4326"` (no reprojection).
- type_map: Optional mapping from normalized etter type names to lists of raw type column values present in the database. This is the same format as `SwissNames3DSource.OBJEKTART_TYPE_MAP` and `IGNBDCartoSource.IGN_BDCARTO_TYPE_MAP`, so they can be passed directly:

      from etter.datasources.swissnames3d import OBJEKTART_TYPE_MAP
      source = PostGISDataSource(
          engine,
          table="public.swissnames3d",
          type_map=OBJEKTART_TYPE_MAP,
      )

  When `type_map` is provided the datasource:
  - Translates raw DB values → normalized types in returned features.
  - Translates user type hints → raw DB values in SQL `WHERE` clauses.
  - Returns normalized type names from `get_available_types()`.

  When `None` (default) the stored values are used as-is.
- fuzzy_threshold: Minimum `pg_trgm` similarity score (0-1) used for fuzzy fallback search when no exact match is found.
Example: unmodified SwissNames3D table::
from sqlalchemy import create_engine
from etter.datasources import PostGISDataSource
from etter.datasources.swissnames3d import OBJEKTART_TYPE_MAP
engine = create_engine(...)
source = PostGISDataSource(
engine,
table="public.swissnames3d",
type_map=OBJEKTART_TYPE_MAP,
)
results = source.search("Lac Léman", type="lake")
139 def __init__( 140 self, 141 connection: str | Engine, 142 table: str, 143 name_column: str = "name", 144 type_column: str | None = "type", 145 geometry_column: str = "geom", 146 id_column: str = "id", 147 crs: str = "EPSG:4326", 148 type_map: dict[str, list[str]] | None = None, 149 fuzzy_threshold: float = 0.65, 150 ) -> None: 151 sa = _require_sqlalchemy() 152 153 if isinstance(connection, str): 154 self._engine = sa.create_engine(connection) 155 else: 156 self._engine = connection 157 158 try: 159 with self._engine.connect() as conn: 160 conn.execute(sa.text(f"SELECT 1 FROM {table} LIMIT 1")) 161 except Exception as exc: 162 raise ValueError(f"Failed to connect to database or access table {table!r}") from exc 163 164 self._table = table 165 self._name_col = name_column 166 self._type_col = type_column 167 self._geom_col = geometry_column 168 self._id_col = id_column 169 self._crs = crs 170 self._fuzzy_threshold = fuzzy_threshold 171 172 # Build bidirectional lookup structures from the user-supplied map. 173 if type_map: 174 self._normalized_to_raw: dict[str, list[str]] = dict(type_map) 175 self._raw_to_normalized: dict[str, str] = { 176 raw: normalized for normalized, raws in type_map.items() for raw in raws 177 } 178 else: 179 self._normalized_to_raw = {} 180 self._raw_to_normalized = {} 181 182 self._trgm_available: bool | None = None 183 self._unaccent_available: bool | None = None
262 def search( 263 self, 264 name: str, 265 type: str | None = None, 266 max_results: int = 10, 267 ) -> list[dict[str, Any]]: 268 """ 269 Search for geographic features by name. 270 271 Uses a three-step cascade, stopping as soon as any step returns results: 272 273 1. **Normalized exact match** 274 2. **pg_trgm fuzzy with unaccent** (pg_trgm extension required and unaccent extension recommended) 275 3. **ILIKE substring** 276 277 ``merge_segments`` is applied after all rows are fetched so that 278 multi-segment linestrings (rivers, roads) are merged before the 279 ``max_results`` cap is applied. 280 281 Args: 282 name: Location name to search for. 283 type: Optional type hint for filtering results. 284 max_results: Maximum number of results to return. 285 286 Returns: 287 List of matching GeoJSON Feature dicts in WGS84. 288 """ 289 sa = _require_sqlalchemy() 290 cols = self._build_select_columns() 291 292 # Resolve type filter to the raw DB values to use in the SQL WHERE clause. 293 type_filter_values: list[str] | None = None 294 if type is not None and self._type_col is not None: 295 matching_types = get_matching_types(type) 296 concrete_types = matching_types if matching_types else [type.lower()] 297 if self._normalized_to_raw: 298 raw_values: list[str] = [] 299 for t in concrete_types: 300 raw_values.extend(self._normalized_to_raw.get(t, [t])) 301 type_filter_values = raw_values if raw_values else concrete_types 302 else: 303 type_filter_values = concrete_types 304 305 # Fetch more rows than requested so that merge_segments has the full 306 # set of segments to work with. Without this, a SQL LIMIT applied 307 # *before* merging would only return a partial set of linestring 308 # segments, producing incorrect / truncated geometries. 309 # We cap the internal limit at 2000 to avoid unbounded queries. 
310 internal_limit = min(max(max_results * 20, 100), 2000) 311 312 with self._get_connection() as conn: 313 features = self._search_normalized(conn, sa, cols, name, type_filter_values, internal_limit) 314 315 if not features: 316 with self._get_connection() as conn: 317 features = self._search_fuzzy(conn, sa, cols, name, type_filter_values, internal_limit) 318 319 if not features: 320 with self._get_connection() as conn: 321 features = self._search_ilike(conn, sa, cols, name, type_filter_values, internal_limit) 322 323 features = merge_segments(features) 324 return features[:max_results]
Search for geographic features by name.
Uses a three-step cascade, stopping as soon as any step returns results:
- Normalized exact match
- pg_trgm fuzzy with unaccent (pg_trgm extension required and unaccent extension recommended)
- ILIKE substring
merge_segments is applied after all rows are fetched so that
multi-segment linestrings (rivers, roads) are merged before the
max_results cap is applied.
Arguments:
- name: Location name to search for.
- type: Optional type hint for filtering results.
- max_results: Maximum number of results to return.
Returns:
List of matching GeoJSON Feature dicts in WGS84.
452 def get_by_id(self, feature_id: str) -> dict[str, Any] | None: 453 """ 454 Get a specific feature by its unique identifier. 455 456 Args: 457 feature_id: Value of the ``id`` column. 458 459 Returns: 460 The matching GeoJSON Feature dict, or ``None`` if not found. 461 """ 462 sa = _require_sqlalchemy() 463 cols = self._build_select_columns() 464 sql = sa.text( 465 f"SELECT {cols} FROM {self._table} WHERE {self._id_col} = :id LIMIT 1" # noqa: S608 466 ) 467 with self._get_connection() as conn: 468 try: 469 result = conn.execute(sql, {"id": feature_id}) 470 row = result.fetchone() 471 return self._row_to_feature(row) if row else None 472 except Exception: 473 logger.exception("get_by_id failed for %r", feature_id) 474 return None
Get a specific feature by its unique identifier.
Arguments:
- feature_id: Value of the `id` column.
Returns:
The matching GeoJSON Feature dict, or `None` if not found.
476 def get_available_types(self) -> list[str]: 477 """ 478 Return the distinct ``type`` values present in the table. 479 480 Returns: 481 Sorted list of concrete type strings, or an empty list if the table 482 has no type column. 483 """ 484 if self._type_col is None: 485 return [] 486 sa = _require_sqlalchemy() 487 sql = sa.text( 488 f"SELECT DISTINCT {self._type_col} AS type FROM {self._table} " # noqa: S608 489 f"WHERE {self._type_col} IS NOT NULL ORDER BY 1" 490 ) 491 with self._get_connection() as conn: 492 try: 493 result = conn.execute(sql) 494 raw_types = [row.type for row in result] 495 except Exception: 496 logger.exception("get_available_types failed") 497 return [] 498 499 normalized = {self._normalize_type(t) for t in raw_types if t} 500 return sorted(t for t in normalized if t)
Return the distinct type values present in the table.
Returns:
Sorted list of concrete type strings, or an empty list if the table has no type column.
25def apply_spatial_relation( 26 geometry: dict[str, Any], 27 relation: SpatialRelation, 28 buffer_config: BufferConfig | None = None, 29 spatial_config: SpatialRelationConfig | None = None, 30) -> dict[str, Any]: 31 """ 32 Transform a reference geometry according to a spatial relation. 33 34 Converts the input GeoJSON geometry to a search area based on the 35 spatial relation category: 36 - Containment: returns the original geometry unchanged 37 - Buffer: applies positive (expand), negative (erode), or ring buffer 38 - Directional: creates an angular sector wedge 39 40 Args: 41 geometry: GeoJSON geometry dict in WGS84 (EPSG:4326). 42 relation: Spatial relation to apply. 43 buffer_config: Buffer configuration (required for buffer/directional relations). 44 spatial_config: Spatial relation registry used to look up directional angles. 45 Defaults to the module-level singleton; pass an explicit instance to 46 avoid repeated construction when calling from a hot path. 47 48 Returns: 49 Transformed GeoJSON geometry dict in WGS84. 50 51 Raises: 52 ValueError: If buffer_config is missing for buffer/directional relations, 53 or if the relation category is unknown. 54 55 Examples: 56 >>> from etter.models import SpatialRelation, BufferConfig 57 >>> # Circular buffer 58 >>> result = apply_spatial_relation( 59 ... geometry={"type": "Point", "coordinates": [6.63, 46.52]}, 60 ... relation=SpatialRelation(relation="near", category="buffer"), 61 ... buffer_config=BufferConfig(distance_m=5000, buffer_from="center"), 62 ... ) 63 64 >>> # Containment (passthrough) 65 >>> result = apply_spatial_relation( 66 ... geometry=city_polygon, 67 ... relation=SpatialRelation(relation="in", category="containment"), 68 ... 
) 69 """ 70 if relation.category == "containment": 71 return _apply_containment(geometry) 72 elif relation.category == "buffer": 73 if buffer_config is None: 74 raise ValueError(f"Buffer relation '{relation.relation}' requires buffer_config") 75 return _apply_buffer(geometry, buffer_config) 76 elif relation.category == "directional": 77 if buffer_config is None: 78 raise ValueError(f"Directional relation '{relation.relation}' requires buffer_config") 79 cfg = spatial_config if spatial_config is not None else _DEFAULT_SPATIAL_CONFIG 80 relation_config = cfg.get_config(relation.relation) 81 direction = relation_config.direction_angle_degrees or 0 82 sector_angle = relation_config.sector_angle_degrees or 90 83 return _apply_directional(geometry, buffer_config, direction, sector_angle) 84 else: 85 raise ValueError(f"Unknown relation category: '{relation.category}'")
Transform a reference geometry according to a spatial relation.
Converts the input GeoJSON geometry to a search area based on the spatial relation category:
- Containment: returns the original geometry unchanged
- Buffer: applies positive (expand), negative (erode), or ring buffer
- Directional: creates an angular sector wedge
Arguments:
- geometry: GeoJSON geometry dict in WGS84 (EPSG:4326).
- relation: Spatial relation to apply.
- buffer_config: Buffer configuration (required for buffer/directional relations).
- spatial_config: Spatial relation registry used to look up directional angles. Defaults to the module-level singleton; pass an explicit instance to avoid repeated construction when calling from a hot path.
Returns:
Transformed GeoJSON geometry dict in WGS84.
Raises:
- ValueError: If buffer_config is missing for buffer/directional relations, or if the relation category is unknown.
Examples:
    >>> from etter.models import SpatialRelation, BufferConfig
    >>> # Circular buffer
    >>> result = apply_spatial_relation(
    ...     geometry={"type": "Point", "coordinates": [6.63, 46.52]},
    ...     relation=SpatialRelation(relation="near", category="buffer"),
    ...     buffer_config=BufferConfig(distance_m=5000, buffer_from="center"),
    ... )
    >>> # Containment (passthrough)
    >>> result = apply_spatial_relation(
    ...     geometry=city_polygon,
    ...     relation=SpatialRelation(relation="in", category="containment"),
    ... )