etter
etter - Natural language geographic query parsing using LLMs.
Parse location queries into structured geographic queries using LLM.
"""
etter - Natural language geographic query parsing using LLMs.

Parse location queries into structured geographic queries using LLM.

This package module only re-exports the public API listed in ``__all__`` and
resolves ``__version__`` from the installed distribution metadata.
"""

from importlib.metadata import PackageNotFoundError, version

# Resolve the version from the installed "etter" distribution; fall back to a
# placeholder when no distribution metadata is available.
try:
    __version__ = version("etter")
except PackageNotFoundError:  # running from source without install
    __version__ = "unknown"

# Datasources
from .datasources import CompositeDataSource, GeoDataSource, IGNBDCartoSource, PostGISDataSource, SwissNames3DSource

# Exceptions
from .exceptions import (
    GeoFilterError,
    LowConfidenceError,
    LowConfidenceWarning,
    NoReferenceLocationError,
    ParsingError,
    UnknownRelationError,
    ValidationError,
)

# Geometry conversion helpers (listed under "Spatial" in __all__)
from .geometry_format import convert_feature_geometry, convert_geometry

# Models (for type hints and result access)
from .models import (
    BufferConfig,
    ConfidenceLevel,
    ConfidenceScore,
    GeometryFormat,
    GeoQuery,
    ReferenceLocation,
    SpatialRelation,
)

# Main API
from .parser import GeoFilterParser

# Spatial operations
from .spatial import apply_spatial_relation

# Configuration
from .spatial_config import RelationConfig, SpatialRelationConfig

# Explicit public API of the package, grouped by concern.
__all__ = [
    # Main API
    "GeoFilterParser",
    # Models
    "GeoQuery",
    "SpatialRelation",
    "ReferenceLocation",
    "BufferConfig",
    "ConfidenceScore",
    "ConfidenceLevel",
    "GeometryFormat",
    # Configuration
    "SpatialRelationConfig",
    "RelationConfig",
    # Exceptions
    "GeoFilterError",
    "ParsingError",
    "ValidationError",
    "NoReferenceLocationError",
    "UnknownRelationError",
    "LowConfidenceError",
    "LowConfidenceWarning",
    # Datasources
    "GeoDataSource",
    "SwissNames3DSource",
    "IGNBDCartoSource",
    "CompositeDataSource",
    "PostGISDataSource",
    # Spatial
    "apply_spatial_relation",
    "convert_geometry",
    "convert_feature_geometry",
]
class GeoFilterParser:
    """
    Main entry point for parsing natural language location queries.

    This class orchestrates the entire parsing pipeline:
    1. Initialize LLM with structured output
    2. Build prompt with spatial relations and examples
    3. Parse query through LLM
    4. Validate and enrich with defaults
    5. Return structured GeoQuery

    Examples:
        Basic usage:
        >>> from langchain.chat_models import init_chat_model
        >>> llm = init_chat_model(model="gpt-4o", model_provider="openai", api_key="sk-...")
        >>> parser = GeoFilterParser(llm=llm)
        >>> result = parser.parse("restaurants in Lausanne")
        >>> result.reference_location.name
        'Lausanne'

        With strict confidence mode:
        >>> parser = GeoFilterParser(llm=llm, confidence_threshold=0.8, strict_mode=True)
        >>> result = parser.parse("near the station")  # May raise LowConfidenceError
    """

    def __init__(
        self,
        llm: BaseChatModel,
        spatial_config: SpatialRelationConfig | None = None,
        confidence_threshold: float = 0.6,
        strict_mode: bool = False,
        include_examples: bool = True,
        datasource: GeoDataSource | None = None,
        additional_instructions: str | None = None,
    ):
        """
        Initialize the parser.

        Args:
            llm: LangChain LLM instance (required).
            spatial_config: Spatial relation configuration. If None, uses defaults
            confidence_threshold: Minimum confidence to accept (0-1)
            strict_mode: If True, raise error on low confidence. If False, warn only
            include_examples: Whether to include few-shot examples in prompt
            datasource: Optional GeoDataSource instance. If provided, the LLM will be informed
                about the concrete types available in that datasource for better type inference.
            additional_instructions: Free-form text injected as a system message after the main
                system prompt and before few-shot examples. Use this to add caller-specific
                rules such as region-specific endonyms, domain aliases, or
                organization-specific place names without forking the default prompt.

        Example:
            >>> from langchain.chat_models import init_chat_model
            >>> from etter.datasources.swissnames3d import SwissNames3DSource
            >>> llm = init_chat_model(model="gpt-4o", model_provider="openai", temperature=0)
            >>> datasource = SwissNames3DSource("data/")
            >>> parser = GeoFilterParser(llm=llm, datasource=datasource)
        """
        self.llm = llm

        # Initialize spatial config
        self.spatial_config = spatial_config or SpatialRelationConfig()

        # Settings
        self.confidence_threshold = confidence_threshold
        self.strict_mode = strict_mode
        self.include_examples = include_examples
        self.datasource = datasource
        self.additional_instructions = additional_instructions

        # Both builders read the attributes assigned above, so they must run last.
        self.structured_llm = self._build_structured_llm()
        self.prompt = self._build_prompt()

    def _build_structured_llm(self):
        """Create LLM with structured output using Pydantic model."""
        return self.llm.with_structured_output(
            GeoQuery,
            method="function_calling",  # Use function_calling for broader schema support
            include_raw=True,  # For error debugging
        )

    def _build_prompt(self) -> ChatPromptTemplate:
        """Build prompt template with spatial relations, examples, and available types."""
        available_types = None
        if self.datasource is not None:
            available_types = self.datasource.get_available_types()

        return build_prompt_template(
            spatial_config=self.spatial_config,
            include_examples=self.include_examples,
            available_types=available_types,
            additional_instructions=self.additional_instructions,
        )

    @staticmethod
    def _invocation_error(exc: Exception) -> ParsingError:
        """Wrap an exception raised by the LLM call in a ParsingError."""
        return ParsingError(
            message=f"LLM invocation failed: {str(exc)}",
            raw_response="",
            original_error=exc,
        )

    def _unpack_response(self, response) -> GeoQuery:
        """
        Extract and validate the GeoQuery from a structured-LLM response.

        Args:
            response: Either a dict carrying the "parsed", "raw" and
                "parsing_error" keys, or the parsed object itself.

        Returns:
            The parsed GeoQuery.

        Raises:
            ParsingError: If nothing was parsed, or the parsed object is not a GeoQuery.
        """
        if isinstance(response, dict):
            parsed = response.get("parsed")
            raw = response.get("raw", "")
            error = response.get("parsing_error")
        else:
            parsed, raw, error = response, "", None

        if parsed is None:
            raise ParsingError(
                message="Failed to parse query into structured format. "
                "LLM may have returned invalid JSON or missed required fields.",
                raw_response=str(raw),
                original_error=error,
            )

        # Explicit check instead of `assert`: assertions are removed under `python -O`,
        # which would let a wrongly-typed object flow into validation.
        if not isinstance(parsed, GeoQuery):
            raise ParsingError(
                message=f"Structured output has unexpected type {type(parsed).__name__}; expected GeoQuery.",
                raw_response=str(raw),
                original_error=error,
            )
        return parsed

    def _finalize(self, geo_query: GeoQuery, query: str) -> GeoQuery:
        """Set original_query and run the validation pipeline."""
        geo_query.original_query = query

        return validate_query(
            geo_query,
            self.spatial_config,
            confidence_threshold=self.confidence_threshold,
            strict_mode=self.strict_mode,
        )

    def parse(self, query: str) -> GeoQuery:
        """
        Parse a natural language location query into structured format.

        This is the main method for parsing queries. It:
        1. Invokes the LLM with structured output
        2. Validates the spatial relation is registered
        3. Enriches with default parameters
        4. Checks confidence threshold

        Args:
            query: Natural language query in any language

        Returns:
            GeoQuery: Structured query representation with confidence scores

        Raises:
            ParsingError: If LLM fails to parse query into valid structure
            ValidationError: If parsed query fails business logic validation
            UnknownRelationError: If spatial relation is not registered
            LowConfidenceError: If confidence below threshold (strict mode only)

        Warns:
            LowConfidenceWarning: If confidence below threshold (permissive mode)

        Examples:
            Simple containment query:
            >>> result = parser.parse("in Bern")
            >>> result.reference_location.name
            'Bern'
            >>> result.spatial_relation.relation
            'in'

            Buffer query:
            >>> result = parser.parse("near Lake Geneva")
            >>> result.spatial_relation.relation
            'near'
            >>> result.buffer_config.distance_m
            5000

            Directional query:
            >>> result = parser.parse("north of Lausanne")
            >>> result.spatial_relation.relation
            'north_of'
            >>> result.reference_location.name
            'Lausanne'

            Multilingual:
            >>> result = parser.parse("près de Genève")
            >>> result.spatial_relation.relation
            'near'
            >>> result.reference_location.name
            'Genève'
        """
        formatted_messages = self.prompt.format_messages(query=query)

        try:
            response = self.structured_llm.invoke(formatted_messages)
        except Exception as e:
            raise self._invocation_error(e) from e

        return self._finalize(self._unpack_response(response), query)

    async def aparse(self, query: str) -> GeoQuery:
        """
        Asynchronously parse a natural language location query into structured format.

        Async counterpart to :meth:`parse`. Uses ``ainvoke`` on the structured LLM
        so it can be awaited inside event loops (e.g. FastAPI endpoints) without
        blocking. Validation is synchronous and runs after the LLM call.
        """
        formatted_messages = self.prompt.format_messages(query=query)

        try:
            response = await self.structured_llm.ainvoke(formatted_messages)
        except Exception as e:
            raise self._invocation_error(e) from e

        return self._finalize(self._unpack_response(response), query)

    async def parse_stream(self, query: str) -> AsyncGenerator[dict, None]:
        """
        Parse a natural language location query with streaming reasoning and results.

        This method provides real-time feedback during the parsing process by yielding
        intermediate reasoning steps and the final GeoQuery result. This is useful for
        providing users with transparency into the LLM's decision-making process and
        for building responsive UIs.

        The stream yields dictionaries with the following event types:
        - {"type": "start"} - Stream started
        - {"type": "reasoning", "content": str} - Intermediate processing steps
        - {"type": "data-response", "content": dict} - Final GeoQuery as JSON
        - {"type": "error", "content": str} - Errors encountered during processing
        - {"type": "finish"} - Stream completed successfully

        On failure exactly one "error" event is yielded, after which the
        exception propagates to the consumer.

        Args:
            query: Natural language query in any language

        Yields:
            dict: Stream events with type and optional content fields

        Raises:
            ParsingError: If LLM fails to parse query into valid structure
            ValidationError: If parsed query fails business logic validation
            UnknownRelationError: If spatial relation is not registered
            LowConfidenceError: If confidence below threshold (strict mode only)

        Examples:
            Basic usage with async iteration:
            >>> async for event in parser.parse_stream("restaurants near Lake Geneva"):
            ...     if event["type"] == "reasoning":
            ...         print(f"Reasoning: {event['content']}")
            ...     elif event["type"] == "data-response":
            ...         geo_query = event["content"]
            ...         print(f"Location: {geo_query['reference_location']['name']}")
            ...     elif event["type"] == "error":
            ...         print(f"Error: {event['content']}")

            Using in a FastAPI streaming endpoint:
            >>> from fastapi.responses import StreamingResponse
            >>> @app.get("/stream")
            ... async def stream_endpoint(q: str):
            ...     async def event_stream():
            ...         async for event in parser.parse_stream(q):
            ...             yield f"data: {json.dumps(event)}\\n\\n"
            ...     return StreamingResponse(event_stream(), media_type="text/event-stream")
        """
        # Set before a stage-specific error event is yielded so the generic handler
        # at the bottom does not emit a second event for the same failure.
        error_reported = False
        try:
            # Signal start of stream
            yield {"type": "start"}

            yield {"type": "reasoning", "content": "Preparing query for LLM processing"}
            formatted_messages = self.prompt.format_messages(query=query)

            yield {"type": "reasoning", "content": "Analyzing spatial relationship and location"}
            try:
                response = await self.structured_llm.ainvoke(formatted_messages)
            except Exception as e:
                error_reported = True
                yield {"type": "error", "content": f"LLM invocation failed: {str(e)}"}
                raise self._invocation_error(e) from e

            yield {"type": "reasoning", "content": "Parsing LLM response into structured format"}
            try:
                geo_query = self._unpack_response(response)
            except ParsingError:
                error_reported = True
                yield {"type": "error", "content": "Failed to parse response - invalid JSON or missing fields"}
                raise

            if geo_query.confidence_breakdown.reasoning:
                yield {
                    "type": "reasoning",
                    "content": f"LLM reasoning: {geo_query.confidence_breakdown.reasoning}",
                }

            yield {"type": "reasoning", "content": "Validating spatial relation configuration"}
            geo_query = self._finalize(geo_query, query)

            yield {"type": "reasoning", "content": "Query parsing completed successfully"}
            yield {"type": "data-response", "content": geo_query.model_dump()}

            # Signal successful completion
            yield {"type": "finish"}

        except Exception as e:
            # Emit an error event before re-raising, unless a more specific one was already sent.
            if not error_reported:
                yield {"type": "error", "content": f"Error during parsing: {str(e)}"}
            raise

    def parse_batch(self, queries: list[str]) -> list[GeoQuery]:
        """
        Parse multiple queries in batch.

        Note: This is a simple sequential implementation.
        For true parallelization, consider using async methods or ThreadPoolExecutor.

        Args:
            queries: List of natural language queries

        Returns:
            List of GeoQuery objects (same order as input)

        Raises:
            Same exceptions as parse() for any failing query
        """
        return [self.parse(query) for query in queries]

    def get_available_relations(self, category: RelationCategory | None = None) -> list[str]:
        """
        Get list of available spatial relations.

        Args:
            category: Optional filter by category ("containment", "buffer", "directional")

        Returns:
            List of relation names
        """
        return self.spatial_config.list_relations(category=category)

    def describe_relation(self, relation_name: str) -> str:
        """
        Get description of a spatial relation.

        Args:
            relation_name: Name of the relation

        Returns:
            Human-readable description

        Raises:
            UnknownRelationError: If relation is not registered
        """
        config = self.spatial_config.get_config(relation_name)
        return config.description
Main entry point for parsing natural language location queries.
This class orchestrates the entire parsing pipeline:
- Initialize LLM with structured output
- Build prompt with spatial relations and examples
- Parse query through LLM
- Validate and enrich with defaults
- Return structured GeoQuery
Examples:
Basic usage:
>>> from langchain.chat_models import init_chat_model
>>> llm = init_chat_model(model="gpt-4o", model_provider="openai", api_key="sk-...")
>>> parser = GeoFilterParser(llm=llm)
>>> result = parser.parse("restaurants in Lausanne")
>>> print(result.reference_location.name)
Lausanne

With strict confidence mode:
>>> parser = GeoFilterParser(llm=llm, confidence_threshold=0.8, strict_mode=True)
>>> result = parser.parse("near the station")  # May raise LowConfidenceError
def __init__(
    self,
    llm: BaseChatModel,
    spatial_config: SpatialRelationConfig | None = None,
    confidence_threshold: float = 0.6,
    strict_mode: bool = False,
    include_examples: bool = True,
    datasource: GeoDataSource | None = None,
    additional_instructions: str | None = None,
):
    """
    Set up a parser around the given LLM.

    Stores the settings on the instance, then builds the structured-output
    LLM wrapper and the prompt template from them.

    Args:
        llm: LangChain LLM instance (required).
        spatial_config: Spatial relation configuration. A default
            ``SpatialRelationConfig()`` is created when this is falsy.
        confidence_threshold: Minimum confidence to accept (0-1).
        strict_mode: If True, raise error on low confidence. If False, warn only.
        include_examples: Whether to include few-shot examples in the prompt.
        datasource: Optional GeoDataSource instance. If provided, the LLM will be
            informed about the concrete types available in that datasource.
        additional_instructions: Free-form text injected as a system message after
            the main system prompt and before few-shot examples, for caller-specific
            rules (region-specific endonyms, domain aliases, organization-specific
            place names) without forking the default prompt.

    Example:
        >>> from langchain.chat_models import init_chat_model
        >>> from etter.datasources.swissnames3d import SwissNames3DSource
        >>> llm = init_chat_model(model="gpt-4o", model_provider="openai", temperature=0)
        >>> datasource = SwissNames3DSource("data/")
        >>> parser = GeoFilterParser(llm=llm, datasource=datasource)
    """
    # Collaborators
    self.llm = llm
    self.spatial_config = spatial_config if spatial_config else SpatialRelationConfig()

    # Validation settings
    self.confidence_threshold = confidence_threshold
    self.strict_mode = strict_mode

    # Prompt settings
    self.include_examples = include_examples
    self.datasource = datasource
    self.additional_instructions = additional_instructions

    # Derived objects: both builders read the attributes set above.
    self.structured_llm = self._build_structured_llm()
    self.prompt = self._build_prompt()
Initialize the parser.
Arguments:
- llm: LangChain LLM instance (required).
- spatial_config: Spatial relation configuration. If None, uses defaults
- confidence_threshold: Minimum confidence to accept (0-1)
- strict_mode: If True, raise error on low confidence. If False, warn only
- include_examples: Whether to include few-shot examples in prompt
- datasource: Optional GeoDataSource instance. If provided, the LLM will be informed about the concrete types available in that datasource for better type inference.
- additional_instructions: Free-form text injected as a system message after the main system prompt and before few-shot examples. Use this to add caller-specific rules such as region-specific endonyms, domain aliases, or organization-specific place names without forking the default prompt.
Example:
>>> from langchain.chat_models import init_chat_model
>>> from etter.datasources.swissnames3d import SwissNames3DSource
>>> llm = init_chat_model(model="gpt-4o", model_provider="openai", temperature=0)
>>> datasource = SwissNames3DSource("data/")
>>> parser = GeoFilterParser(llm=llm, datasource=datasource)
def parse(self, query: str) -> GeoQuery:
    """
    Turn a natural language location query into a structured GeoQuery.

    Steps performed:
    1. Format the prompt with the query and invoke the structured LLM
    2. Unpack the GeoQuery from the LLM response
    3. Record the original query text and run validation (relation
       registration, default enrichment, confidence threshold)

    Args:
        query: Natural language query in any language

    Returns:
        GeoQuery: Structured query representation with confidence scores

    Raises:
        ParsingError: If the LLM call raises, or its output cannot be unpacked
        ValidationError: If parsed query fails business logic validation
        UnknownRelationError: If spatial relation is not registered
        LowConfidenceError: If confidence below threshold (strict mode only)

    Warns:
        LowConfidenceWarning: If confidence below threshold (permissive mode)

    Examples:
        Simple containment query:
        >>> result = parser.parse("in Bern")
        >>> result.reference_location.name
        'Bern'
        >>> result.spatial_relation.relation
        'in'

        Buffer query:
        >>> result = parser.parse("near Lake Geneva")
        >>> result.spatial_relation.relation
        'near'
        >>> result.buffer_config.distance_m
        5000

        Directional query:
        >>> result = parser.parse("north of Lausanne")
        >>> result.spatial_relation.relation
        'north_of'
        >>> result.reference_location.name
        'Lausanne'

        Multilingual:
        >>> result = parser.parse("près de Genève")
        >>> result.spatial_relation.relation
        'near'
        >>> result.reference_location.name
        'Genève'
    """
    messages = self.prompt.format_messages(query=query)

    try:
        llm_response = self.structured_llm.invoke(messages)
    except Exception as exc:
        # Any failure of the LLM call itself is reported as a ParsingError.
        failure = ParsingError(
            message=f"LLM invocation failed: {str(exc)}",
            raw_response="",
            original_error=exc,
        )
        raise failure from exc

    unpacked = self._unpack_response(llm_response)
    return self._finalize(unpacked, query)
Parse a natural language location query into structured format.
This is the main method for parsing queries. It:
- Invokes the LLM with structured output
- Validates the spatial relation is registered
- Enriches with default parameters
- Checks confidence threshold
Arguments:
- query: Natural language query in any language
Returns:
GeoQuery: Structured query representation with confidence scores
Raises:
- ParsingError: If LLM fails to parse query into valid structure
- ValidationError: If parsed query fails business logic validation
- UnknownRelationError: If spatial relation is not registered
- LowConfidenceError: If confidence below threshold (strict mode only)
Warns:
LowConfidenceWarning: If confidence below threshold (permissive mode)
Examples:
Simple containment query:
>>> result = parser.parse("in Bern")
>>> result.reference_location.name
'Bern'
>>> result.spatial_relation.relation
'in'

Buffer query:
>>> result = parser.parse("near Lake Geneva")
>>> result.spatial_relation.relation
'near'
>>> result.buffer_config.distance_m
5000

Directional query:
>>> result = parser.parse("north of Lausanne")
>>> result.spatial_relation.relation
'north_of'
>>> result.reference_location.name
'Lausanne'

Multilingual:
>>> result = parser.parse("près de Genève")
>>> result.spatial_relation.relation
'near'
>>> result.reference_location.name
'Genève'
async def aparse(self, query: str) -> GeoQuery:
    """
    Awaitable variant of :meth:`parse`.

    Formats the prompt, awaits ``ainvoke`` on the structured LLM so the call
    can run inside an event loop (e.g. a FastAPI endpoint) without blocking,
    then unpacks and validates the result synchronously.

    Args:
        query: Natural language query to parse.

    Returns:
        The validated GeoQuery.

    Raises:
        ParsingError: If the LLM call raises, or its output cannot be unpacked.
    """
    messages = self.prompt.format_messages(query=query)

    try:
        llm_response = await self.structured_llm.ainvoke(messages)
    except Exception as exc:
        # Any failure of the LLM call itself is reported as a ParsingError.
        failure = ParsingError(
            message=f"LLM invocation failed: {str(exc)}",
            raw_response="",
            original_error=exc,
        )
        raise failure from exc

    unpacked = self._unpack_response(llm_response)
    return self._finalize(unpacked, query)
Asynchronously parse a natural language location query into structured format.
Async counterpart to parse(). Uses ainvoke on the structured LLM
so it can be awaited inside event loops (e.g. FastAPI endpoints) without
blocking. Validation is synchronous and runs after the LLM call.
async def parse_stream(self, query: str) -> AsyncGenerator[dict, None]:
    """
    Parse a natural language location query with streaming reasoning and results.

    This method provides real-time feedback during the parsing process by yielding
    intermediate reasoning steps and the final GeoQuery result. This is useful for
    providing users with transparency into the LLM's decision-making process and
    for building responsive UIs.

    The stream yields dictionaries with the following event types:
    - {"type": "start"} - Stream started
    - {"type": "reasoning", "content": str} - Intermediate processing steps
    - {"type": "data-response", "content": dict} - Final GeoQuery as JSON
    - {"type": "error", "content": str} - Errors encountered during processing
    - {"type": "finish"} - Stream completed successfully

    On failure exactly one "error" event is yielded, after which the
    exception propagates to the consumer.

    Args:
        query: Natural language query in any language

    Yields:
        dict: Stream events with type and optional content fields

    Raises:
        ParsingError: If LLM fails to parse query into valid structure
        ValidationError: If parsed query fails business logic validation
        UnknownRelationError: If spatial relation is not registered
        LowConfidenceError: If confidence below threshold (strict mode only)

    Examples:
        Basic usage with async iteration:
        >>> async for event in parser.parse_stream("restaurants near Lake Geneva"):
        ...     if event["type"] == "reasoning":
        ...         print(f"Reasoning: {event['content']}")
        ...     elif event["type"] == "data-response":
        ...         geo_query = event["content"]
        ...         print(f"Location: {geo_query['reference_location']['name']}")
        ...     elif event["type"] == "error":
        ...         print(f"Error: {event['content']}")

        Using in a FastAPI streaming endpoint:
        >>> from fastapi.responses import StreamingResponse
        >>> @app.get("/stream")
        ... async def stream_endpoint(q: str):
        ...     async def event_stream():
        ...         async for event in parser.parse_stream(q):
        ...             yield f"data: {json.dumps(event)}\\n\\n"
        ...     return StreamingResponse(event_stream(), media_type="text/event-stream")
    """
    # Set before a stage-specific error event is yielded so the generic handler
    # at the bottom does not emit a second event for the same failure.
    error_reported = False
    try:
        # Signal start of stream
        yield {"type": "start"}

        yield {"type": "reasoning", "content": "Preparing query for LLM processing"}
        formatted_messages = self.prompt.format_messages(query=query)

        yield {"type": "reasoning", "content": "Analyzing spatial relationship and location"}
        try:
            response = await self.structured_llm.ainvoke(formatted_messages)
        except Exception as e:
            error_reported = True
            yield {"type": "error", "content": f"LLM invocation failed: {str(e)}"}
            raise ParsingError(
                message=f"LLM invocation failed: {str(e)}",
                raw_response="",
                original_error=e,
            ) from e

        yield {"type": "reasoning", "content": "Parsing LLM response into structured format"}
        try:
            geo_query = self._unpack_response(response)
        except ParsingError:
            error_reported = True
            yield {"type": "error", "content": "Failed to parse response - invalid JSON or missing fields"}
            raise

        if geo_query.confidence_breakdown.reasoning:
            yield {
                "type": "reasoning",
                "content": f"LLM reasoning: {geo_query.confidence_breakdown.reasoning}",
            }

        yield {"type": "reasoning", "content": "Validating spatial relation configuration"}
        geo_query = self._finalize(geo_query, query)

        yield {"type": "reasoning", "content": "Query parsing completed successfully"}
        yield {"type": "data-response", "content": geo_query.model_dump()}

        # Signal successful completion
        yield {"type": "finish"}

    except Exception as e:
        # Emit an error event before re-raising, unless a more specific one was already sent.
        if not error_reported:
            yield {"type": "error", "content": f"Error during parsing: {str(e)}"}
        raise
Parse a natural language location query with streaming reasoning and results.
This method provides real-time feedback during the parsing process by yielding intermediate reasoning steps and the final GeoQuery result. This is useful for providing users with transparency into the LLM's decision-making process and for building responsive UIs.
The stream yields dictionaries with the following event types:
- {"type": "start"} - Stream started
- {"type": "reasoning", "content": str} - Intermediate processing steps
- {"type": "data-response", "content": dict} - Final GeoQuery as JSON
- {"type": "error", "content": str} - Errors encountered during processing
- {"type": "finish"} - Stream completed successfully
Arguments:
- query: Natural language query in any language
Yields:
dict: Stream events with type and optional content fields
Raises:
- ParsingError: If LLM fails to parse query into valid structure
- ValidationError: If parsed query fails business logic validation
- UnknownRelationError: If spatial relation is not registered
- LowConfidenceError: If confidence below threshold (strict mode only)
Examples:
Basic usage with async iteration:
>>> async for event in parser.parse_stream("restaurants near Lake Geneva"):
...     if event["type"] == "reasoning":
...         print(f"Reasoning: {event['content']}")
...     elif event["type"] == "data-response":
...         geo_query = event["content"]
...         print(f"Location: {geo_query['reference_location']['name']}")
...     elif event["type"] == "error":
...         print(f"Error: {event['content']}")

Using in a FastAPI streaming endpoint:
>>> from fastapi.responses import StreamingResponse
>>> @app.get("/stream")
... async def stream_endpoint(q: str):
...     async def event_stream():
...         async for event in parser.parse_stream(q):
...             yield f"data: {json.dumps(event)}\n\n"
...     return StreamingResponse(event_stream(), media_type="text/event-stream")
def parse_batch(self, queries: list[str]) -> list[GeoQuery]:
    """
    Parse several queries one after another.

    Each query is passed to :meth:`parse` in input order; the first failing
    query aborts the batch by propagating its exception. No parallelism is
    attempted here; use the async methods or a ThreadPoolExecutor for that.

    Args:
        queries: List of natural language queries

    Returns:
        List of GeoQuery objects (same order as input)

    Raises:
        Same exceptions as parse() for any failing query
    """
    results: list[GeoQuery] = []
    for text in queries:
        results.append(self.parse(text))
    return results
Parse multiple queries in batch.
Note: This is a simple sequential implementation. For true parallelization, consider using async methods or ThreadPoolExecutor.
Arguments:
- queries: List of natural language queries
Returns:
List of GeoQuery objects (same order as input)
Raises:
- Same exceptions as parse() for any failing query
def get_available_relations(self, category: RelationCategory | None = None) -> list[str]:
    """
    List the spatial relations known to this parser's configuration.

    Delegates to ``self.spatial_config.list_relations``.

    Args:
        category: Optional filter by category ("containment", "buffer", "directional")

    Returns:
        List of relation names
    """
    relation_names = self.spatial_config.list_relations(category=category)
    return relation_names
Get list of available spatial relations.
Arguments:
- category: Optional filter by category ("containment", "buffer", "directional")
Returns:
List of relation names
def describe_relation(self, relation_name: str) -> str:
    """
    Return the description stored for a spatial relation.

    Looks the relation up via ``self.spatial_config.get_config`` and returns
    its ``description`` attribute.

    Args:
        relation_name: Name of the relation

    Returns:
        Human-readable description

    Raises:
        UnknownRelationError: If relation is not registered
    """
    return self.spatial_config.get_config(relation_name).description
Get description of a spatial relation.
Arguments:
- relation_name: Name of the relation
Returns:
Human-readable description
Raises:
- UnknownRelationError: If relation is not registered
# NOTE(review): this model is passed to `with_structured_output` by the parser, so the
# class docstring and the Field descriptions below are presumably surfaced to the LLM
# as schema text — treat them as runtime strings and confirm before rewording.
class GeoQuery(BaseModel):
    """
    Root model representing a parsed geographic query.
    This is the main output structure returned by the parser.
    """

    # Kind of query; defaults to "simple".
    query_type: Literal["simple", "compound", "split", "boolean"] = Field(
        "simple",
        description="Type of query. Phase 1 only supports 'simple'. "
        "Future: 'compound' = multi-step, 'split' = area division, 'boolean' = AND/OR/NOT operations",
    )
    # Required: the relation (name + category) linking the target to the reference.
    spatial_relation: SpatialRelation = Field(description="Spatial relationship to reference location")
    # Optional: defaults to None.
    reference_location: ReferenceLocation | None = Field(
        None,
        description="Reference location for the spatial query. "
        "None when the query contains no named geographic location.",
    )
    # Optional at the field level, but cross-checked against the relation category
    # by validate_buffer_config_consistency below.
    # NOTE(review): the description says defaults are auto-generated by
    # enrich_with_defaults() when missing, yet the validator below rejects a missing
    # buffer_config for buffer/directional relations — confirm which is intended.
    buffer_config: BufferConfig | None = Field(
        None,
        description="Buffer configuration for buffer and directional relations. "
        "Auto-generated with defaults by enrich_with_defaults() if not provided. "
        "Required for 'near', 'around', 'north_of', etc. "
        "Set to None for containment relations ('in').",
    )
    # Required: confidence information for the parse.
    confidence_breakdown: ConfidenceScore = Field(description="Confidence scores for different aspects of the parse")
    # Defaults to an empty string.
    original_query: str = Field(
        default="",
        description="Original query text exactly as provided by the user",
    )

    @model_validator(mode="after")
    def validate_buffer_config_consistency(self) -> "GeoQuery":
        """
        Validate buffer_config consistency with relation category.

        Returns:
            The model instance itself when the combination is consistent.

        Raises:
            ValueError: If a "buffer"/"directional" relation has no buffer_config, or a
                "containment"/"clipping" relation has one.
        """
        # Buffer and directional relations must have buffer_config
        if self.spatial_relation.category in ("buffer", "directional") and self.buffer_config is None:
            raise ValueError(
                f"{self.spatial_relation.category} relation '{self.spatial_relation.relation}' requires buffer_config"
            )

        # Containment and clipping relations should not have buffer_config
        if self.spatial_relation.category in ("containment", "clipping") and self.buffer_config is not None:
            raise ValueError(
                f"{self.spatial_relation.category} relation '{self.spatial_relation.relation}' "
                f"should not have buffer_config"
            )

        return self
Root model representing a parsed geographic query. This is the main output structure returned by the parser.
Type of query. Phase 1 only supports 'simple'. Future: 'compound' = multi-step, 'split' = area division, 'boolean' = AND/OR/NOT operations
Reference location for the spatial query. None when the query contains no named geographic location.
Buffer configuration for buffer and directional relations. Auto-generated with defaults by enrich_with_defaults() if not provided. Required for 'near', 'around', 'north_of', etc. Set to None for containment relations ('in').
@model_validator(mode="after")
def validate_buffer_config_consistency(self) -> "GeoQuery":
    """
    Check that the presence of buffer_config matches the relation category.

    Returns:
        The model instance itself when the combination is consistent.

    Raises:
        ValueError: If a "buffer" or "directional" relation has no
            buffer_config, or a "containment" or "clipping" relation has one.
    """
    category = self.spatial_relation.category
    relation = self.spatial_relation.relation
    has_buffer = self.buffer_config is not None

    # These categories cannot be evaluated without a buffer definition.
    if category in ("buffer", "directional") and not has_buffer:
        raise ValueError(f"{category} relation '{relation}' requires buffer_config")

    # These categories work on the reference geometry itself, so a buffer is rejected.
    if category in ("containment", "clipping") and has_buffer:
        raise ValueError(f"{category} relation '{relation}' should not have buffer_config")

    return self
Validate buffer_config consistency with relation category.
class SpatialRelation(BaseModel):
    """A spatial relationship between target and reference."""

    # Plain string: this model does not itself check the value against any
    # list of registered relation names.
    relation: str = Field(
        description="Spatial relation keyword. Examples: 'in', 'near', 'around', 'north_of', "
        "'on_shores_of', 'in_the_heart_of', etc. Use the exact relation name from the available list."
    )
    # Required field; the description enumerates four categories.
    category: RelationCategory = Field(
        description="Category of spatial relation. "
        "'containment' = exact boundary matching (in), "
        "'buffer' = proximity or erosion operations (near, around, on_shores_of, in_the_heart_of, bordering), "
        "'directional' = sector-based queries (north_of, south_of, east_of, west_of), "
        "'clipping' = clip reference to a directional half (northern_part_of, southern_part_of, etc.)"
    )
    # Optional, defaults to None; expressed in meters per the description.
    explicit_distance: float | None = Field(
        None,
        description="Distance in meters if explicitly mentioned by user. "
        "For example: 'within 5km' → 5000, 'within 500 meters' → 500. "
        "Leave null if not explicitly stated.",
    )
A spatial relationship between target and reference.
Spatial relation keyword. Examples: 'in', 'near', 'around', 'north_of', 'on_shores_of', 'in_the_heart_of', etc. Use the exact relation name from the available list.
Category of spatial relation. 'containment' = exact boundary matching (in), 'buffer' = proximity or erosion operations (near, around, on_shores_of, in_the_heart_of, bordering), 'directional' = sector-based queries (north_of, south_of, east_of, west_of), 'clipping' = clip reference to a directional half (northern_part_of, southern_part_of, etc.)
class ReferenceLocation(BaseModel):
    """A geographic reference location extracted from the query."""

    # Required: the place name as written in the query.
    name: str = Field(description="Location name as mentioned in the query (e.g., 'Lausanne', 'Lake Geneva')")
    # FIXME: enum ?
    # Optional free-form string, defaults to None; described as a ranking hint, not a filter.
    type: str | None = Field(
        None,
        description="Type hint for geographic feature (city, lake, mountain, canton, country, "
        "train_station, airport, river, road, etc.). This is a HINT for ranking results, "
        "NOT a strict filter. For ambiguous cases (e.g., 'Bern' could be city or canton, "
        "'Rhone' could be river or road), provide your best guess or leave null. "
        "The datasource will return multiple types ranked by relevance.",
    )
    # Optional, defaults to None.
    # NOTE(review): the description states a 0-1 range; presumably ConfidenceLevel
    # enforces it - confirm at its definition.
    type_confidence: ConfidenceLevel | None = Field(
        None,
        description="Confidence in the type inference (0-1). High confidence (>0.8) when type is "
        "explicit in query (e.g., 'Lake Geneva'). Low confidence (<0.6) when ambiguous "
        "(e.g., 'Bern', 'Rhone'). Use spatial relation as hint: 'along X' → river/road, "
        "'in X' → city/region, 'on X' → lake/mountain.",
    )
A geographic reference location extracted from the query.
Location name as mentioned in the query (e.g., 'Lausanne', 'Lake Geneva')
Type hint for geographic feature (city, lake, mountain, canton, country, train_station, airport, river, road, etc.). This is a HINT for ranking results, NOT a strict filter. For ambiguous cases (e.g., 'Bern' could be city or canton, 'Rhone' could be river or road), provide your best guess or leave null. The datasource will return multiple types ranked by relevance.
Confidence in the type inference (0-1). High confidence (>0.8) when type is explicit in query (e.g., 'Lake Geneva'). Low confidence (<0.6) when ambiguous (e.g., 'Bern', 'Rhone'). Use spatial relation as hint: 'along X' → river/road, 'in X' → city/region, 'on X' → lake/mountain.
class BufferConfig(BaseModel):
    """Configuration for buffer-based spatial operations."""

    # Required. The sign carries meaning: positive expands, negative erodes.
    distance_m: float = Field(
        description="Buffer distance in meters. Positive values expand outward (proximity), "
        "negative values erode inward (e.g., 'in the heart of'). "
        "Examples: 5000 = 5km radius, -500 = 500m erosion"
    )
    # Required. Only these two origins are accepted.
    buffer_from: Literal["center", "boundary"] = Field(
        description="Buffer origin. 'center' = buffer from centroid point (for proximity), "
        "'boundary' = buffer from polygon boundary (for shores, along roads, erosion)"
    )
    ring_only: bool = Field(
        default=False,
        description="If True, exclude the reference feature itself to create a ring/donut shape. "
        "Used for queries like 'on the shores of Lake X' (exclude the lake water itself). "
        "Only valid with buffer_from='boundary'.",
    )
    side: Literal["left", "right"] | None = Field(
        default=None,
        description="Side of a linear feature for one-sided buffer. "
        "'left' = left side relative to line direction, 'right' = right side. "
        "None = both sides (symmetric buffer). Populated from relation config by enrich_with_defaults().",
    )
    inferred: bool = Field(
        default=True,
        description="True if this configuration was inferred from relation defaults. "
        "False if the user explicitly specified distance or buffer parameters.",
    )

    @model_validator(mode="after")
    def validate_ring_only(self) -> "BufferConfig":
        """
        Reject a ring buffer that is measured from the center point.

        Returns:
            The model instance itself when the combination is allowed.

        Raises:
            ValueError: If ring_only is set while buffer_from is "center".
        """
        # Nothing to check unless a ring was requested.
        if not self.ring_only:
            return self
        if self.buffer_from == "center":
            raise ValueError("ring_only=True requires buffer_from='boundary' (cannot create ring from center point)")
        return self
Configuration for buffer-based spatial operations.
Buffer distance in meters. Positive values expand outward (proximity), negative values erode inward (e.g., 'in the heart of'). Examples: 5000 = 5km radius, -500 = 500m erosion
Buffer origin. 'center' = buffer from centroid point (for proximity), 'boundary' = buffer from polygon boundary (for shores, along roads, erosion)
If True, exclude the reference feature itself to create a ring/donut shape. Used for queries like 'on the shores of Lake X' (exclude the lake water itself). Only valid with buffer_from='boundary'.
Side of a linear feature for one-sided buffer. 'left' = left side relative to line direction, 'right' = right side. None = both sides (symmetric buffer). Populated from relation config by enrich_with_defaults().
True if this configuration was inferred from relation defaults. False if the user explicitly specified distance or buffer parameters.
@model_validator(mode="after")
def validate_ring_only(self) -> "BufferConfig":
    """
    Reject a ring buffer that is measured from the center point.

    Returns:
        The model instance itself when the combination is allowed.

    Raises:
        ValueError: If ring_only is set while buffer_from is "center".
    """
    # Nothing to check unless a ring was requested.
    if not self.ring_only:
        return self
    if self.buffer_from == "center":
        raise ValueError("ring_only=True requires buffer_from='boundary' (cannot create ring from center point)")
    return self
Validate that ring_only is only used with boundary buffers.
class ConfidenceScore(BaseModel):
    """Confidence scores for different aspects of the parsed query."""

    # The three score fields below are required (no default is given).
    overall: ConfidenceLevel = Field(
        description="Overall confidence score for the entire query parse. "
        "0.9-1.0 = highly confident, 0.7-0.9 = confident, 0.5-0.7 = uncertain, <0.5 = very uncertain",
    )
    location_confidence: ConfidenceLevel = Field(
        description="Confidence in correctly identifying the reference location",
    )
    relation_confidence: ConfidenceLevel = Field(
        description="Confidence in correctly identifying the spatial relation",
    )
    # Optional at the type level (defaults to None), although the description
    # asks for it to always be supplied.
    reasoning: str | None = Field(
        None,
        description="Explanation for confidence scores. Always include reasoning for clarity and debugging. "
        "For example: 'Ambiguous location name', 'Unclear spatial relationship', 'High confidence in location matching', etc.",
    )
Confidence scores for different aspects of the parsed query.
Overall confidence score for the entire query parse. 0.9-1.0 = highly confident, 0.7-0.9 = confident, 0.5-0.7 = uncertain, <0.5 = very uncertain
Confidence in correctly identifying the reference location
class SpatialRelationConfig:
    """
    Registry of spatial relations and their default parameters.

    Holds a name -> RelationConfig mapping in ``relations``. The built-in
    relations are registered on construction; more can be added (or existing
    ones replaced) with ``register_relation``.
    """

    def __init__(self):
        """Create the registry and fill it with the built-in relations."""
        self.relations: dict[str, RelationConfig] = {}
        self._initialize_defaults()

    def _initialize_defaults(self):
        """Register built-in spatial relations from ARCHITECTURE.md."""

        # ===== CONTAINMENT RELATIONS =====
        self.register_relation(
            RelationConfig(
                name="in",
                category="containment",
                description="Feature is within the reference boundary",
            )
        )

        # ===== BUFFER/PROXIMITY RELATIONS =====
        # Each entry carries only the keyword arguments that relation sets;
        # anything omitted keeps the RelationConfig default.
        buffer_defaults = [
            {
                "name": "near",
                "description": "Proximity search with default 5km radius",
                "default_distance_m": 5000,
                "buffer_from": "center",
            },
            {
                "name": "on_shores_of",
                "description": "Ring buffer around lake/water boundary, excluding the water body itself",
                "default_distance_m": 1000,
                "buffer_from": "boundary",
                "ring_only": True,
            },
            {
                "name": "along",
                "description": "Buffer following a linear feature like a river or road",
                "default_distance_m": 500,
                "buffer_from": "boundary",
            },
            {
                "name": "left_bank",
                "description": "Left bank of a linear feature (river, road) relative to its direction/flow",
                "default_distance_m": 500,
                "buffer_from": "boundary",
                "side": "left",
            },
            {
                "name": "right_bank",
                "description": "Right bank of a linear feature (river, road) relative to its direction/flow",
                "default_distance_m": 500,
                "buffer_from": "boundary",
                "side": "right",
            },
            {
                "name": "in_the_heart_of",
                "description": "Central area excluding periphery (negative buffer - erosion)",
                "default_distance_m": -500,
                "buffer_from": "boundary",
            },
            {
                "name": "bordering",
                "description": "Thin ring just outside the reference boundary, for land-border adjacency queries (e.g. 'cities bordering Germany')",
                "default_distance_m": 2000,
                "buffer_from": "boundary",
                "ring_only": True,
            },
        ]
        for buffer_kwargs in buffer_defaults:
            self.register_relation(RelationConfig(category="buffer", **buffer_kwargs))

        # ===== CLIPPING RELATIONS =====
        # Clip the reference geometry to a directional half-plane using bbox intersection.
        # These answer "what is in the northern/southern/eastern/western portion of X?"
        # as opposed to directional relations which answer "what is north/south/etc. of X?".
        clipping_defaults = [
            ("northern_part_of", "Northern half of the reference geometry (bbox clip to upper half)", "north"),
            ("southern_part_of", "Southern half of the reference geometry (bbox clip to lower half)", "south"),
            ("eastern_part_of", "Eastern half of the reference geometry (bbox clip to right half)", "east"),
            ("western_part_of", "Western half of the reference geometry (bbox clip to left half)", "west"),
        ]
        for clip_name, clip_description, clip_direction in clipping_defaults:
            self.register_relation(
                RelationConfig(
                    name=clip_name,
                    category="clipping",
                    description=clip_description,
                    clip_direction=clip_direction,
                )
            )

        # ===== DIRECTIONAL RELATIONS (cardinal first, then diagonal) =====
        # All directional relations use consistent defaults:
        # - Distance: 10km radius (default_distance_m=10000)
        # - Sector: 90° angular wedge (sector_angle_degrees=90)
        # - Origin: Centroid of reference location (buffer_from="center" set in enrich_with_defaults)
        # These defaults are applied automatically by enrich_with_defaults() for any directional query.
        # Convention: 0° = North, angles increase clockwise (90° = East, 180° = South, 270° = West)
        directional_defaults = [
            ("north_of", "Directional sector north of reference", 0),
            ("south_of", "Directional sector south of reference", 180),
            ("east_of", "Directional sector east of reference", 90),
            ("west_of", "Directional sector west of reference", 270),
            ("northeast_of", "Directional sector northeast of reference", 45),
            ("southeast_of", "Directional sector southeast of reference", 135),
            ("southwest_of", "Directional sector southwest of reference", 225),
            ("northwest_of", "Directional sector northwest of reference", 315),
        ]
        for sector_name, sector_description, bearing in directional_defaults:
            self.register_relation(
                RelationConfig(
                    name=sector_name,
                    category="directional",
                    description=sector_description,
                    default_distance_m=10000,
                    sector_angle_degrees=90,
                    direction_angle_degrees=bearing,
                )
            )

    def register_relation(self, config: RelationConfig) -> None:
        """Store ``config`` under ``config.name``, replacing any entry with that name."""
        self.relations[config.name] = config

    def has_relation(self, name: str) -> bool:
        """Return True when ``name`` is a key of the registry."""
        return name in self.relations

    def get_config(self, name: str) -> RelationConfig:
        """
        Look up the configuration registered under ``name``.

        Raises:
            UnknownRelationError: If no relation with that name is registered;
                the message lists the available names in sorted order.
        """
        if self.has_relation(name):
            return self.relations[name]
        raise UnknownRelationError(
            f"Unknown spatial relation: '{name}'. Available relations: {', '.join(sorted(self.relations.keys()))}",
            relation_name=name,
        )

    def list_relations(self, category: RelationCategory | None = None) -> list[str]:
        """
        Return relation names in sorted order.

        Args:
            category: When given, only relations of this category are listed.
        """
        if category is not None:
            return sorted(cfg.name for cfg in self.relations.values() if cfg.category == category)
        return sorted(self.relations.keys())

    def format_for_prompt(self) -> str:
        """
        Render the registry as text for inclusion in an LLM prompt.

        Relations are grouped by category (in the order of RelationCategory's
        arguments), sorted by name within each group, and followed by a fixed
        NOTES section. Categories with no relations are skipped.
        """
        # NOTE(review): leading whitespace inside the literals below is reproduced
        # as visible in the reviewed source, where runs of spaces may have been
        # collapsed - confirm against the repository before applying.
        output: list[str] = []

        for category in get_args(RelationCategory):
            members = sorted(
                (cfg for cfg in self.relations.values() if cfg.category == category),
                key=lambda cfg: cfg.name,
            )
            if not members:
                continue

            output.append(f"\n{category.upper()} RELATIONS:")

            for cfg in members:
                # Distance is shown as a magnitude; the sign only selects the wording.
                distance = cfg.default_distance_m
                if distance is None:
                    dist_info = ""
                elif distance < 0:
                    dist_info = f" (default: {abs(distance)}m erosion)"
                else:
                    dist_info = f" (default: {abs(distance)}m)"

                # Keep only the labels whose source attribute is truthy.
                candidate_flags = (
                    (cfg.ring_only, "ring buffer"),
                    (cfg.buffer_from, f"from {cfg.buffer_from}"),
                    (cfg.side, f"{cfg.side} side only"),
                )
                flags = [label for enabled, label in candidate_flags if enabled]
                flag_info = f" [{', '.join(flags)}]" if flags else ""

                output.append(f" • {cfg.name}{dist_info}{flag_info}")
                output.append(f" {cfg.description}")

        output.extend(
            [
                "\nNOTES:",
                " • Negative distances indicate erosion/shrinking (e.g., in_the_heart_of)",
                " • Ring buffers exclude the reference feature itself (e.g., shores of lake, bordering)",
                " • Buffer from 'center' vs 'boundary' determines buffer origin",
                " • Clipping relations return a sub-area of the reference geometry (not a buffer outward)",
            ]
        )

        return "\n".join(output)
Registry and configuration for spatial relations.
Manages built-in and custom spatial relations with their default parameters.
def __init__(self) -> None:
    """
    Initialize with built-in spatial relations.

    Creates the empty ``relations`` mapping (relation name -> RelationConfig)
    and then calls ``_initialize_defaults()`` to populate it.
    """
    # The mapping must exist before the defaults are registered into it.
    self.relations: dict[str, RelationConfig] = {}
    self._initialize_defaults()
Initialize with built-in spatial relations.
def register_relation(self, config: RelationConfig) -> None:
    """
    Register a new spatial relation.

    Args:
        config: Relation to store; it is keyed by ``config.name``.
    """
    # Plain dict assignment: an existing relation with the same name is
    # replaced without any duplicate check.
    self.relations[config.name] = config
Register a new spatial relation.
def has_relation(self, name: str) -> bool:
    """
    Check if a relation is registered.

    Args:
        name: Relation name to look for.

    Returns:
        True when ``name`` is a key of ``self.relations`` (exact match).
    """
    return name in self.relations
Check if a relation is registered.
def get_config(self, name: str) -> RelationConfig:
    """
    Look up the configuration registered under ``name``.

    Raises:
        UnknownRelationError: If no relation with that name is registered;
            the message lists the available names in sorted order.
    """
    if self.has_relation(name):
        return self.relations[name]
    raise UnknownRelationError(
        f"Unknown spatial relation: '{name}'. Available relations: {', '.join(sorted(self.relations.keys()))}",
        relation_name=name,
    )
Get configuration for a relation. Raises UnknownRelationError if not found.
def list_relations(self, category: RelationCategory | None = None) -> list[str]:
    """
    Return relation names in sorted order.

    Args:
        category: When given, only relations of this category are listed.
            When None, every registered name is returned.
    """
    if category is not None:
        return sorted(cfg.name for cfg in self.relations.values() if cfg.category == category)
    return sorted(self.relations.keys())
List available relation names.
def format_for_prompt(self) -> str:
    """
    Render the registry as text for inclusion in an LLM prompt.

    Relations are grouped by category (in the order of RelationCategory's
    arguments), sorted by name within each group, and followed by a fixed
    NOTES section. Categories with no relations are skipped.
    """
    # NOTE(review): leading whitespace inside the literals below is reproduced
    # as visible in the reviewed source, where runs of spaces may have been
    # collapsed - confirm against the repository before applying.
    output: list[str] = []

    for category in get_args(RelationCategory):
        members = sorted(
            (cfg for cfg in self.relations.values() if cfg.category == category),
            key=lambda cfg: cfg.name,
        )
        if not members:
            continue

        output.append(f"\n{category.upper()} RELATIONS:")

        for cfg in members:
            # Distance is shown as a magnitude; the sign only selects the wording.
            distance = cfg.default_distance_m
            if distance is None:
                dist_info = ""
            elif distance < 0:
                dist_info = f" (default: {abs(distance)}m erosion)"
            else:
                dist_info = f" (default: {abs(distance)}m)"

            # Keep only the labels whose source attribute is truthy.
            candidate_flags = (
                (cfg.ring_only, "ring buffer"),
                (cfg.buffer_from, f"from {cfg.buffer_from}"),
                (cfg.side, f"{cfg.side} side only"),
            )
            flags = [label for enabled, label in candidate_flags if enabled]
            flag_info = f" [{', '.join(flags)}]" if flags else ""

            output.append(f" • {cfg.name}{dist_info}{flag_info}")
            output.append(f" {cfg.description}")

    output.extend(
        [
            "\nNOTES:",
            " • Negative distances indicate erosion/shrinking (e.g., in_the_heart_of)",
            " • Ring buffers exclude the reference feature itself (e.g., shores of lake, bordering)",
            " • Buffer from 'center' vs 'boundary' determines buffer origin",
            " • Clipping relations return a sub-area of the reference geometry (not a buffer outward)",
        ]
    )

    return "\n".join(output)
Format relations for inclusion in LLM prompt.
@dataclass
class RelationConfig:
    """
    Configuration for a single spatial relation.

    Only ``name``, ``category`` and ``description`` are required; every other
    attribute defaults to None (or False for ``ring_only``).

    Attributes:
        name: Relation identifier (e.g., "in", "near", "north_of")
        category: Type of spatial operation
        description: Human-readable description for LLM prompt
        default_distance_m: Default buffer distance in meters
        buffer_from: Buffer origin ("center" or "boundary")
        ring_only: Exclude reference feature to create ring buffer
        side: Side of a linear feature for a one-sided buffer ("left" or "right")
        sector_angle_degrees: Angular sector for directional queries
        direction_angle_degrees: Direction angle in degrees (0=North, 90=East, 180=South, 270=West, clockwise)
        clip_direction: Direction of a clipping relation ("north", "south", "east" or "west")
    """

    name: str
    category: RelationCategory
    description: str
    default_distance_m: float | None = None
    buffer_from: Literal["center", "boundary"] | None = None
    ring_only: bool = False
    side: Literal["left", "right"] | None = None
    sector_angle_degrees: float | None = None
    direction_angle_degrees: float | None = None
    clip_direction: Literal["north", "south", "east", "west"] | None = None
Configuration for a single spatial relation.
Attributes:
- name: Relation identifier (e.g., "in", "near", "north_of")
- category: Type of spatial operation
- description: Human-readable description for LLM prompt
- default_distance_m: Default buffer distance in meters (a negative value means inward erosion, e.g. -500 for "in_the_heart_of")
- buffer_from: Buffer origin, either "center" or "boundary"
- ring_only: Exclude reference feature to create ring buffer
- sector_angle_degrees: Angular sector for directional queries
- direction_angle_degrees: Direction angle in degrees (0=North, 90=East, 180=South, 270=West, clockwise)
Base exception for all GeoFilter errors.
class ParsingError(GeoFilterError):
    """Raised when the LLM output could not be parsed into a valid structure."""

    def __init__(self, message: str, raw_response: str = "", original_error: Exception | None = None):
        """
        Build the error and keep the failure context on the instance.

        Args:
            message: Error description, passed on to the base exception
            raw_response: Raw response from LLM (empty string when not supplied)
            original_error: Exception that caused the parsing failure, if any
        """
        # Context attributes are set before the base initializer runs.
        self.raw_response, self.original_error = raw_response, original_error
        super().__init__(message)
LLM failed to parse query into valid structure.
def __init__(self, message: str, raw_response: str = "", original_error: Exception | None = None):
    """
    Build the error and keep the failure context on the instance.

    Args:
        message: Error description, passed on to the base exception
        raw_response: Raw response from LLM (empty string when not supplied)
        original_error: Exception that caused the parsing failure, if any
    """
    # Context attributes are set before the base initializer runs.
    self.raw_response, self.original_error = raw_response, original_error
    super().__init__(message)
Initialize parsing error.
Arguments:
- message: Error description
- raw_response: Raw response from LLM
- original_error: Original exception that caused parsing failure
class ValidationError(GeoFilterError):
    """Raised when structurally valid output fails business logic validation."""

    def __init__(self, message: str, field: str | None = None, detail: str | None = None):
        """
        Build the error and record which field failed.

        Args:
            message: Error description, passed on to the base exception
            field: Name of the field that failed validation, if known
            detail: Additional detail about the validation failure, if any
        """
        # Context attributes are set before the base initializer runs.
        self.field, self.detail = field, detail
        super().__init__(message)
Structured output is valid but fails business logic validation.
def __init__(self, message: str, field: str | None = None, detail: str | None = None):
    """
    Build the error and record which field failed.

    Args:
        message: Error description, passed on to the base exception
        field: Name of the field that failed validation, if known
        detail: Additional detail about the validation failure, if any
    """
    # Context attributes are set before the base initializer runs.
    self.field, self.detail = field, detail
    super().__init__(message)
Initialize validation error.
Arguments:
- message: Error description
- field: Field name that failed validation
- detail: Additional detail about the validation failure
class NoReferenceLocationError(ValidationError):
    """Query contains no named geographic reference location."""

    def __init__(self, message: str):
        """
        Initialize the error for a missing reference location.

        Args:
            message: Error description
        """
        # The failing field is always reported as "reference_location".
        super().__init__(message, field="reference_location")
Query contains no named geographic reference location.
class UnknownRelationError(ValidationError):
    """Spatial relation is not registered in configuration."""

    def __init__(self, message: str, relation_name: str):
        """
        Initialize unknown relation error.

        Args:
            message: Error description
            relation_name: The unknown relation name
        """
        self.relation_name = relation_name
        # The failing field is always reported as "spatial_relation".
        super().__init__(message, field="spatial_relation")

    def __reduce__(self):
        """
        Make the exception picklable and copyable.

        Only ``message`` ends up in ``args``, so the default exception
        reduction would rebuild the object as ``cls(message)`` and fail on the
        required ``relation_name`` argument. Replaying both constructor
        arguments lets the error cross process boundaries and survive
        ``copy.copy`` / ``copy.deepcopy``.

        Returns:
            A ``(callable, args, state)`` tuple as expected by pickle.
        """
        message = self.args[0] if self.args else ""
        return (type(self), (message, self.relation_name), dict(vars(self)))
Spatial relation is not registered in configuration.
def __init__(self, message: str, relation_name: str):
    """
    Initialize unknown relation error.

    Args:
        message: Error description
        relation_name: The unknown relation name; stored as ``self.relation_name``
    """
    self.relation_name = relation_name
    # The failing field is always reported as "spatial_relation".
    super().__init__(message, field="spatial_relation")
Initialize unknown relation error.
Arguments:
- message: Error description
- relation_name: The unknown relation name
class LowConfidenceError(GeoFilterError):
    """Query confidence is below threshold (strict mode)."""

    def __init__(self, message: str, confidence: float, reasoning: str | None = None):
        """
        Initialize low confidence error.

        Args:
            message: Error description
            confidence: Confidence score (0-1)
            reasoning: Optional explanation for low confidence
        """
        self.confidence = confidence
        self.reasoning = reasoning
        super().__init__(message)

    def __reduce__(self):
        """
        Make the exception picklable and copyable.

        Only ``message`` ends up in ``args``, so the default exception
        reduction would rebuild the object as ``cls(message)`` and fail on the
        required ``confidence`` argument. Replaying all constructor arguments
        lets the error cross process boundaries and survive ``copy.copy`` /
        ``copy.deepcopy``.

        Returns:
            A ``(callable, args, state)`` tuple as expected by pickle.
        """
        message = self.args[0] if self.args else ""
        return (type(self), (message, self.confidence, self.reasoning), dict(vars(self)))
Query confidence is below threshold (strict mode).
def __init__(self, message: str, confidence: float, reasoning: str | None = None):
    """
    Build the error and keep the score that triggered it.

    Args:
        message: Error description, passed on to the base exception
        confidence: Confidence score (0-1)
        reasoning: Optional explanation for the low confidence
    """
    # Context attributes are set before the base initializer runs.
    self.confidence, self.reasoning = confidence, reasoning
    super().__init__(message)
Initialize low confidence error.
Arguments:
- message: Error description
- confidence: Confidence score (0-1)
- reasoning: Optional explanation for low confidence
class LowConfidenceWarning(UserWarning):
    """Query confidence is below threshold (permissive mode)."""

    def __init__(self, confidence: float, message: str = ""):
        """
        Initialize low confidence warning.

        Args:
            confidence: Confidence score (0-1)
            message: Warning message
        """
        self.confidence = confidence
        super().__init__(message)

    def __reduce__(self):
        """
        Make the warning picklable and copyable without losing its message.

        Only ``message`` ends up in ``args``, so the default exception
        reduction would rebuild the object as ``cls(message)``: the text would
        be bound to ``confidence`` and the warning message reset to the empty
        default. Replaying both constructor arguments in order avoids that.

        Returns:
            A ``(callable, args, state)`` tuple as expected by pickle.
        """
        message = self.args[0] if self.args else ""
        return (type(self), (self.confidence, message), dict(vars(self)))
Query confidence is below threshold (permissive mode).
def __init__(self, confidence: float, message: str = ""):
    """
    Build the warning and keep the score that triggered it.

    Args:
        confidence: Confidence score (0-1); stored as ``self.confidence``
        message: Warning text, passed on to the base warning class
    """
    super().__init__(message)
    self.confidence = confidence
Initialize low confidence warning.
Arguments:
- confidence: Confidence score (0-1)
- message: Warning message
class GeoDataSource(Protocol):
    """
    Protocol for geographic data sources.

    Implementations resolve location names to geographic features.
    Features are returned as standard GeoJSON Feature objects (dicts) in WGS84 (EPSG:4326).

    All three methods are declared here as stubs (their bodies are ``...``).

    Example of returned feature:
        {
            "type": "Feature",
            "id": "uuid-123",
            "geometry": {"type": "Point", "coordinates": [8.5, 47.3]},
            "bbox": [8.4, 47.3, 8.6, 47.4],
            "properties": {
                "name": "Zürich",
                "type": "city",
                "confidence": 1.0,
                ...
            }
        }
    """

    # NOTE: the parameter name ``type`` shadows the builtin inside implementations;
    # it is part of the protocol signature.
    def search(
        self,
        name: str,
        type: str | None = None,
        max_results: int = 10,
    ) -> list[Feature]:
        """
        Search for geographic features by name.

        Args:
            name: Location name to search for (e.g., "Lake Geneva", "Bern").
            type: Optional type hint for filtering/ranking results.
                Examples: "lake", "city", "mountain", "canton", "river".
                When provided, matching types are ranked higher.
            max_results: Maximum number of results to return.

        Returns:
            List of matching GeoJSON Feature dicts, ranked by relevance.
            Returns empty list if no matches found.
        """
        ...

    def get_by_id(self, feature_id: str) -> Feature | None:
        """
        Get a specific feature by its unique identifier.

        Args:
            feature_id: Unique identifier from the data source.

        Returns:
            The matching GeoJSON Feature dict, or None if not found.
        """
        ...

    def get_available_types(self) -> list[str]:
        """
        Get list of concrete geographic types this datasource can return.

        Returns a list of concrete type values (e.g., "lake", "city", "restaurant")
        that this datasource uses in the "type" property of returned features.
        These types can be matched against the location type hierarchy for fuzzy matching.

        The returned types should be a subset of or mapped to the standard location
        type hierarchy defined in location_types.TYPE_HIERARCHY.

        Returns:
            List of concrete type strings (e.g., ["lake", "river", "city", "mountain"]).
            Empty list if this datasource does not provide type information.

        Example:
            >>> source = SwissNames3DSource("data/")
            >>> types = source.get_available_types()
            >>> print(types)
            ['lake', 'river', 'city', 'mountain', 'peak', 'hill', ...]
        """
        ...
Protocol for geographic data sources.
Implementations resolve location names to geographic features. Features are returned as standard GeoJSON Feature objects (dicts) in WGS84 (EPSG:4326).
Example of returned feature:
{ "type": "Feature", "id": "uuid-123", "geometry": {"type": "Point", "coordinates": [8.5, 47.3]}, "bbox": [8.4, 47.3, 8.6, 47.4], "properties": { "name": "Zürich", "type": "city", "confidence": 1.0, ... } }
def _no_init_or_replace_init(self, *args, **kwargs):
    """
    Placeholder ``__init__`` that blocks protocol instantiation or swaps itself out.

    Raises TypeError when the instance's class is flagged ``_is_protocol``.
    Otherwise, if this function is still the class's ``__init__``, it finds the
    first ``__init__`` in the MRO that is not this placeholder, installs it on
    the class, and calls it with the given arguments.
    """
    cls = type(self)

    if cls._is_protocol:
        raise TypeError('Protocols cannot be instantiated')

    # Already using a custom `__init__`. No need to calculate correct
    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
    if cls.__init__ is not _no_init_or_replace_init:
        return

    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
    # searches for a proper new `__init__` in the MRO. The new `__init__`
    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
    # instantiation of the protocol subclass will thus use the new
    # `__init__` and no longer call `_no_init_or_replace_init`.
    for base in cls.__mro__:
        # Look only at what each class defines itself, not at inherited attributes.
        init = base.__dict__.get('__init__', _no_init_or_replace_init)
        if init is not _no_init_or_replace_init:
            cls.__init__ = init
            break
    else:
        # should not happen
        cls.__init__ = object.__init__

    cls.__init__(self, *args, **kwargs)
# NOTE: the parameter name ``type`` shadows the builtin inside implementations;
# it is part of the protocol signature.
def search(
    self,
    name: str,
    type: str | None = None,
    max_results: int = 10,
) -> list[Feature]:
    """
    Search for geographic features by name.

    Protocol stub: the body is ``...``.

    Args:
        name: Location name to search for (e.g., "Lake Geneva", "Bern").
        type: Optional type hint for filtering/ranking results.
            Examples: "lake", "city", "mountain", "canton", "river".
            When provided, matching types are ranked higher.
        max_results: Maximum number of results to return (defaults to 10).

    Returns:
        List of matching GeoJSON Feature dicts, ranked by relevance.
        Returns empty list if no matches found.
    """
    ...
Search for geographic features by name.
Arguments:
- name: Location name to search for (e.g., "Lake Geneva", "Bern").
- type: Optional type hint for filtering/ranking results. Examples: "lake", "city", "mountain", "canton", "river". When provided, matching types are ranked higher.
- max_results: Maximum number of results to return.
Returns:
List of matching GeoJSON Feature dicts, ranked by relevance. Returns empty list if no matches found.
def get_by_id(self, feature_id: str) -> Feature | None:
    """
    Get a specific feature by its unique identifier.

    Interface stub: the body is intentionally empty (``...``); concrete
    datasources supply the implementation.

    Args:
        feature_id: Unique identifier from the data source.

    Returns:
        The matching GeoJSON Feature dict, or None if not found.
    """
    ...
Get a specific feature by its unique identifier.
Arguments:
- feature_id: Unique identifier from the data source.
Returns:
The matching GeoJSON Feature dict, or None if not found.
def get_available_types(self) -> list[str]:
    """
    Get list of concrete geographic types this datasource can return.

    Interface stub: the body is intentionally empty (``...``); concrete
    datasources supply the implementation.

    Returns a list of concrete type values (e.g., "lake", "city", "restaurant")
    that this datasource uses in the "type" property of returned features.
    These types can be matched against the location type hierarchy for fuzzy matching.

    The returned types should be a subset of or mapped to the standard location
    type hierarchy defined in location_types.TYPE_HIERARCHY.

    Returns:
        List of concrete type strings (e.g., ["lake", "river", "city", "mountain"]).
        Empty list if this datasource does not provide type information.

    Example:
        >>> source = SwissNames3DSource("data/")
        >>> types = source.get_available_types()
        >>> print(types)
        ['lake', 'river', 'city', 'mountain', 'peak', 'hill', ...]
    """
    ...
Get list of concrete geographic types this datasource can return.
Returns a list of concrete type values (e.g., "lake", "city", "restaurant") that this datasource uses in the "type" property of returned features. These types can be matched against the location type hierarchy for fuzzy matching.
The returned types should be a subset of or mapped to the standard location type hierarchy defined in location_types.TYPE_HIERARCHY.
Returns:
List of concrete type strings (e.g., ["lake", "river", "city", "mountain"]). Empty list if this datasource does not provide type information.
Example:
>>> source = SwissNames3DSource("data/") >>> types = source.get_available_types() >>> print(types) ['lake', 'river', 'city', 'mountain', 'peak', 'hill', ...]
class SwissNames3DSource:
    """
    Geographic data source backed by swisstopo's swissNAMES3D dataset.

    Loads geographic names from a Shapefile, GeoPackage, or ESRI File Geodatabase
    and provides search by name with optional type filtering.

    If data_path is a directory, automatically loads and concatenates all SwissNames3D
    shapefiles (swissNAMES3D_PKT, swissNAMES3D_LIN, swissNAMES3D_PLY) found within.

    All geometries are returned as GeoJSON in WGS84 (EPSG:4326).

    Args:
        data_path: Path to SwissNames3D data file or directory containing SwissNames3D shapefiles.
        layer: Layer name within the data source (for multi-layer formats like GDB).

    Example:
        >>> source = SwissNames3DSource("data/")  # Load all 3 geometry types
        >>> results = source.search("Lac Léman", type="lake")
        >>> print(results[0].geometry)  # GeoJSON in WGS84
    """

    def __init__(self, data_path: str | Path, layer: str | None = None) -> None:
        self._data_path = Path(data_path)
        self._layer = layer
        # Populated lazily by _ensure_loaded(); None means "not loaded yet".
        self._gdf: gpd.GeoDataFrame | None = None
        # Normalized name -> positional row indices into self._gdf.
        self._name_index: dict[str, list[int]] = {}

    def _ensure_loaded(self) -> None:
        """Load data lazily on first access."""
        if self._gdf is None:
            self._load_data()

    def _load_data(self) -> None:
        """Load SwissNames3D data and build the name index."""
        if self._data_path.is_dir():
            self._load_from_directory()
        else:
            # Single file: only forward `layer` when the caller supplied one.
            kwargs: dict[str, Any] = {}
            if self._layer is not None:
                kwargs["layer"] = self._layer
            self._gdf = gpd.read_file(str(self._data_path), **kwargs)

        self._build_name_index()

    def _load_from_directory(self) -> None:
        """
        Load and concatenate all SwissNames3D shapefiles from a directory.

        Raises:
            ValueError: If none of the expected shapefiles exists in the directory.
        """
        # Imported here rather than reached through `gpd.pd`, which is an
        # undocumented re-export; pandas itself is a hard dependency of geopandas.
        import pandas as pd

        # The 3 standard SwissNames3D shapefiles (points, lines, polygons).
        shapefile_names = ["swissNAMES3D_PKT", "swissNAMES3D_LIN", "swissNAMES3D_PLY"]
        gdfs: list[gpd.GeoDataFrame] = []

        for name in shapefile_names:
            shp_path = self._data_path / f"{name}.shp"
            if shp_path.exists():
                gdfs.append(gpd.read_file(str(shp_path)))

        if not gdfs:
            raise ValueError(
                f"No SwissNames3D shapefiles found in {self._data_path}. Expected: {', '.join(shapefile_names)}"
            )

        # Only columns present in every loaded frame survive the concatenation.
        common_cols = set(gdfs[0].columns)
        for gdf in gdfs[1:]:
            common_cols &= set(gdf.columns)
        ordered_cols = sorted(common_cols)

        # ignore_index=True yields a fresh 0..n-1 index, so positions and labels agree.
        self._gdf = gpd.GeoDataFrame(
            pd.concat([gdf[ordered_cols] for gdf in gdfs], ignore_index=True),
            crs=gdfs[0].crs,
            geometry="geometry",
        )

    def _build_name_index(self) -> None:
        """Build a normalized name → row indices lookup for fast search."""
        assert self._gdf is not None
        self._name_index = {}

        name_col = self._detect_name_column()
        for idx, name in enumerate(self._gdf[name_col]):
            # Skip missing (non-string) and blank names.
            if not isinstance(name, str) or not name.strip():
                continue
            self._name_index.setdefault(_normalize_name(name), []).append(idx)

    def _detect_name_column(self) -> str:
        """
        Detect the name column in the data.

        Raises:
            ValueError: If none of the known name columns is present.
        """
        assert self._gdf is not None
        for candidate in ("NAME", "name", "Name", "BEZEICHNUNG"):
            if candidate in self._gdf.columns:
                return candidate
        raise ValueError(f"Cannot find name column in data. Available columns: {list(self._gdf.columns)}")

    def _detect_type_column(self) -> str | None:
        """Detect the feature type column in the data, or None if absent."""
        assert self._gdf is not None
        for candidate in ("OBJEKTART", "objektart", "Objektart"):
            if candidate in self._gdf.columns:
                return candidate
        return None

    def _detect_id_column(self) -> str | None:
        """Detect the unique ID column in the data, or None if absent."""
        assert self._gdf is not None
        for candidate in ("UUID", "uuid", "FID", "OBJECTID", "id"):
            if candidate in self._gdf.columns:
                return candidate
        return None

    @staticmethod
    def _is_present(value: Any) -> bool:
        """Return True unless *value* is None, a float NaN, or an empty string."""
        if value is None:
            return False
        if isinstance(value, float) and value != value:  # NaN is the only float != itself
            return False
        return value != ""

    def _row_to_feature(self, idx: int) -> Feature:
        """
        Convert a GeoDataFrame row to a GeoJSON Feature dict with WGS84 geometry.

        Args:
            idx: Positional (``iloc``) row index.

        Returns:
            Feature with ``name``, ``type`` and ``confidence`` properties plus every
            remaining non-null column of the row.
        """
        assert self._gdf is not None
        row = self._gdf.iloc[idx]

        name_col = self._detect_name_column()
        name = str(row[name_col])

        # A plain truthiness test would discard a legitimate id of 0 and would
        # accept NaN (which is truthy) as the string "nan"; test for presence instead.
        type_col = self._detect_type_column()
        raw_type = str(row[type_col]) if type_col and self._is_present(row.get(type_col)) else "unknown"
        normalized_type = _objektart_to_type(raw_type)

        id_col = self._detect_id_column()
        feature_id = str(row[id_col]) if id_col and self._is_present(row.get(id_col)) else str(idx)

        geom = row.geometry
        if geom is None or geom.is_empty:
            # Placeholder geometry for rows without a usable shape.
            geometry = {"type": "Point", "coordinates": [0, 0]}
            bbox = None
        else:
            # Transform geometry from EPSG:2056 to WGS84 using the module-level transformer
            # Drop Z coordinates — they are not needed and cause issues with single_sided buffers
            wgs84_geom = shapely_transform(_TRANSFORMER.transform, force_2d(geom))
            geometry = mapping(wgs84_geom)
            bounds = wgs84_geom.bounds  # (minx, miny, maxx, maxy)
            bbox = (bounds[0], bounds[1], bounds[2], bounds[3])

        # Columns already surfaced as name/type/id/geometry are not repeated.
        skip_cols = {name_col, "geometry"}
        if type_col:
            skip_cols.add(type_col)
        if id_col:
            skip_cols.add(id_col)

        properties: dict[str, Any] = {
            "name": name,
            "type": normalized_type,
            "confidence": 1.0,
        }
        for col in self._gdf.columns:
            if col in skip_cols:
                continue
            val = row.get(col)
            if val is not None and str(val) != "nan":
                properties[col] = val

        return Feature(geometry=geometry, properties=properties, id=feature_id, bbox=bbox)

    def search(
        self,
        name: str,
        type: str | None = None,
        max_results: int = 10,
    ) -> list[Feature]:
        """
        Search for geographic features by name.

        Uses case-insensitive, accent-normalized matching with fuzzy fallback.
        First tries exact matching, then falls back to fuzzy matching if no exact
        matches found.

        Args:
            name: Location name to search for.
            type: Optional type hint to filter results. If provided, only features
                of this type are returned.
            max_results: Maximum number of results to return. Zero or a negative
                value yields an empty list.

        Returns:
            List of matching GeoJSON Feature dicts. If type is provided, only
            features of that type are returned. Empty list if no matches found.
        """
        self._ensure_loaded()

        # A negative limit used to slice results off the end; treat it like 0.
        if max_results <= 0:
            return []

        normalized = _normalize_name(name)
        indices = self._name_index.get(normalized) or self._fuzzy_search(normalized)

        # Expand via the type hierarchy so that category hints (e.g. "water") match
        # all concrete types within that category ("lake", "river", "pond", ...).
        # An unknown hint falls back to an exact string match.
        accepted = None
        if type is not None:
            accepted = get_matching_types(type) or {type.lower()}

        # Convert rows one at a time and stop at the limit, so a broad fuzzy
        # match does not reproject rows that would be thrown away afterwards.
        features: list[Feature] = []
        for idx in indices:
            feature = self._row_to_feature(idx)
            if accepted is not None and feature["properties"].get("type") not in accepted:
                continue
            features.append(feature)
            if len(features) >= max_results:
                break
        return features

    def _fuzzy_search(self, normalized: str, threshold: float = 75.0) -> list[int]:
        """
        Fuzzy search for names that partially match the search query.

        Uses token matching to find results where at least one token from the
        query matches a token in the indexed name. This handles cases like:
        - "venoge" matching "la venoge"
        - "rhone" matching "rhone valais"

        Args:
            normalized: The normalized search query.
            threshold: Minimum fuzzy match score (0-100) to include a result.

        Returns:
            List of row indices for fuzzy-matched names, sorted by score (descending).
        """
        matches: list[tuple[int, float]] = []
        query_tokens = set(normalized.split())

        for indexed_name, indices in self._name_index.items():
            # Cheap pre-filter: require at least one shared token before scoring.
            if not query_tokens & set(indexed_name.split()):
                continue
            score = fuzz.token_set_ratio(normalized, indexed_name)
            if score >= threshold:
                matches.extend((idx, score) for idx in indices)

        # Stable sort: equal scores keep index-iteration order.
        matches.sort(key=lambda x: x[1], reverse=True)
        return [idx for idx, _ in matches]

    def get_by_id(self, feature_id: str) -> Feature | None:
        """
        Get a specific feature by its unique identifier.

        Args:
            feature_id: Unique identifier (UUID or row index).

        Returns:
            The matching GeoJSON Feature dict, or None if not found.
        """
        self._ensure_loaded()
        assert self._gdf is not None

        id_col = self._detect_id_column()
        if id_col:
            # _row_to_feature is positional (iloc), so look up the position of the
            # first match rather than its index label.
            hits = (self._gdf[id_col].astype(str) == feature_id).to_numpy().nonzero()[0]
            if hits.size:
                return self._row_to_feature(int(hits[0]))

        # Fallback: try as row index
        try:
            idx = int(feature_id)
        except ValueError:
            return None
        if 0 <= idx < len(self._gdf):
            return self._row_to_feature(idx)
        return None

    def get_available_types(self) -> list[str]:
        """
        Get list of concrete geographic types this datasource can return.

        Returns:
            Sorted list of the keys of the module-level OBJEKTART_TYPE_MAP.
        """
        return sorted(OBJEKTART_TYPE_MAP.keys())
Geographic data source backed by swisstopo's swissNAMES3D dataset.
Loads geographic names from a Shapefile, GeoPackage, or ESRI File Geodatabase and provides search by name with optional type filtering.
If data_path is a directory, automatically loads and concatenates all SwissNames3D shapefiles (swissNAMES3D_PKT, swissNAMES3D_LIN, swissNAMES3D_PLY) found within.
All geometries are returned as GeoJSON in WGS84 (EPSG:4326).
Arguments:
- data_path: Path to SwissNames3D data file or directory containing SwissNames3D shapefiles.
- layer: Layer name within the data source (for multi-layer formats like GDB).
Example:
>>> source = SwissNames3DSource("data/") # Load all 3 geometry types >>> results = source.search("Lac Léman", type="lake") >>> print(results[0].geometry) # GeoJSON in WGS84
def search(
    self,
    name: str,
    type: str | None = None,
    max_results: int = 10,
) -> list[Feature]:
    """
    Search for geographic features by name.

    Uses case-insensitive, accent-normalized matching with fuzzy fallback.
    First tries exact matching, then falls back to fuzzy matching if no exact
    matches found.

    Args:
        name: Location name to search for.
        type: Optional type hint to filter results. If provided, only features
            of this type are returned.
        max_results: Maximum number of results to return. Zero or a negative
            value yields an empty list.

    Returns:
        List of matching GeoJSON Feature dicts. If type is provided, only
        features of that type are returned. Empty list if no matches found.
    """
    self._ensure_loaded()

    # A negative limit used to slice results off the end; treat it like 0.
    if max_results <= 0:
        return []

    normalized = _normalize_name(name)
    indices = self._name_index.get(normalized) or self._fuzzy_search(normalized)

    # Expand via the type hierarchy so that category hints (e.g. "water") match
    # all concrete types within that category ("lake", "river", "pond", ...).
    # An unknown hint falls back to an exact string match.
    accepted = None
    if type is not None:
        accepted = get_matching_types(type) or {type.lower()}

    # Convert rows one at a time and stop at the limit, so a broad fuzzy
    # match does not reproject rows that would be thrown away afterwards.
    features: list[Feature] = []
    for idx in indices:
        feature = self._row_to_feature(idx)
        if accepted is not None and feature["properties"].get("type") not in accepted:
            continue
        features.append(feature)
        if len(features) >= max_results:
            break
    return features
Search for geographic features by name.
Uses case-insensitive, accent-normalized matching with fuzzy fallback. First tries exact matching, then falls back to fuzzy matching if no exact matches found.
Arguments:
- name: Location name to search for.
- type: Optional type hint to filter results. If provided, only features of this type are returned.
- max_results: Maximum number of results to return.
Returns:
List of matching GeoJSON Feature dicts. If type is provided, only features of that type are returned. Empty list if no matches found.
def get_by_id(self, feature_id: str) -> Feature | None:
    """
    Get a specific feature by its unique identifier.

    Args:
        feature_id: Unique identifier (UUID or row index).

    Returns:
        The matching GeoJSON Feature dict, or None if not found.
    """
    self._ensure_loaded()
    assert self._gdf is not None

    id_col = self._detect_id_column()
    if id_col:
        # _row_to_feature is positional (iloc), so look up the position of the
        # first match rather than its index label; the two only coincide while
        # the frame has a default 0..n-1 index.
        hits = (self._gdf[id_col].astype(str) == feature_id).to_numpy().nonzero()[0]
        if hits.size:
            return self._row_to_feature(int(hits[0]))

    # Fallback: try as row index
    try:
        idx = int(feature_id)
    except ValueError:
        return None
    if 0 <= idx < len(self._gdf):
        return self._row_to_feature(idx)
    return None
Get a specific feature by its unique identifier.
Arguments:
- feature_id: Unique identifier (UUID or row index).
Returns:
The matching GeoJSON Feature dict, or None if not found.
def get_available_types(self) -> list[str]:
    """
    List the type strings this datasource advertises.

    The values are the keys of the module-level OBJEKTART_TYPE_MAP.

    Returns:
        The keys of OBJEKTART_TYPE_MAP in sorted order.
    """
    type_names = list(OBJEKTART_TYPE_MAP)
    type_names.sort()
    return type_names
Get list of concrete geographic types this datasource can return.
Returns all normalized types from the OBJEKTART_TYPE_MAP keys, representing all possible types that SwissNames3D data can be classified as.
Returns:
Sorted list of type strings (e.g., ["lake", "city", "river", ...])
class IGNBDCartoSource:
    """
    Geographic data source backed by IGN's BD-CARTO 5.0 dataset.

    Loads French geographic data from GeoPackage files extracted to a directory.
    Supports administrative boundaries (communes, departments, regions, …),
    hydrography (rivers, lakes, …), named places (quarters, hamlets, …),
    orographic features (peaks, passes, valleys, …) and protected areas.

    Data must first be downloaded with ``make download-data-ign``, which places
    the GeoPackage files in ``data/bdcarto/``.

    All geometries are reprojected from EPSG:2154 (Lambert-93) to WGS84
    (EPSG:4326) and returned as standard GeoJSON Feature dicts.

    Args:
        data_path: Directory containing the ``.gpkg`` files (e.g. ``"data/bdcarto"``).
            A path to a single file is loaded as a GeoJSON fixture instead.

    Example:
        >>> source = IGNBDCartoSource("data/bdcarto")
        >>> results = source.search("Ardèche", type="department")
        >>> results = source.search("Lyon", type="city")
        >>> results = source.search("Rhône", type="river")
    """

    def __init__(self, data_path: str | Path) -> None:
        self._data_path = Path(data_path)
        # Populated lazily by _ensure_loaded(); None means "not loaded yet".
        self._gdf: gpd.GeoDataFrame | None = None
        # Normalized name key -> positional row indices into self._gdf.
        self._name_index: dict[str, list[int]] = {}

    def _ensure_loaded(self) -> None:
        """Load the dataset on first use."""
        if self._gdf is None:
            self._load_data()

    def _load_data(self) -> None:
        """Load from a directory of GeoPackages or a single fixture file, then index names."""
        if self._data_path.is_dir():
            self._gdf = self._load_from_directory()
        else:
            self._gdf = self._load_from_file(self._data_path)
        self._build_name_index()

    def _load_from_file(self, path: Path) -> gpd.GeoDataFrame:
        """
        Load from a GeoJSON fixture file. Features must include a ``_layer`` column.

        Raises:
            ValueError: If the ``_layer`` column is missing or no configured layer
                contributes any rows.
        """
        full_gdf = gpd.read_file(str(path))
        if "_layer" not in full_gdf.columns:
            raise ValueError(f"GeoJSON fixture {path} must include a '_layer' column")

        gdfs: list[gpd.GeoDataFrame] = []
        for layer_name, cfg in _LAYER_CONFIGS.items():
            rows = full_gdf[full_gdf["_layer"] == layer_name].copy()
            if rows.empty:
                continue
            name_col: str = cfg["name_col"]
            if name_col not in rows.columns:
                continue
            rows[_NAME_COL] = rows[name_col].astype(str)
            # `c=cfg` binds the current config as a default to avoid late binding.
            rows[_TYPE_COL] = rows.apply(lambda row, c=cfg: _derive_type(row, c), axis=1)
            rows = rows.to_crs("EPSG:4326")
            gdfs.append(rows)

        if not gdfs:
            raise ValueError(f"No matching BD-CARTO features found in {path}")

        combined = pd.concat(gdfs, ignore_index=True)
        return gpd.GeoDataFrame(combined, crs="EPSG:4326", geometry="geometry")

    def _load_from_directory(self) -> gpd.GeoDataFrame:
        """
        Load and concatenate all configured layers from the data directory.

        Raises:
            ValueError: If no configured layer could be loaded.
        """
        gdfs: list[gpd.GeoDataFrame] = []

        for layer_name, cfg in _LAYER_CONFIGS.items():
            gpkg_path = self._data_path / f"{layer_name}.gpkg"
            if not gpkg_path.exists():
                continue

            gdf = gpd.read_file(str(gpkg_path))

            name_col: str = cfg["name_col"]
            if name_col not in gdf.columns:
                continue

            gdf[_NAME_COL] = gdf[name_col].astype(str)
            # `c=cfg` binds the current config as a default to avoid late binding.
            gdf[_TYPE_COL] = gdf.apply(lambda row, c=cfg: _derive_type(row, c), axis=1)
            gdf["_layer"] = layer_name
            gdf = gdf.to_crs("EPSG:4326")

            gdfs.append(gdf)

        if not gdfs:
            raise ValueError(
                f"No BD-CARTO GeoPackage files found in {self._data_path}. "
                f"Run 'make download-data-ign' to download the dataset."
            )

        combined = pd.concat(gdfs, ignore_index=True)
        return gpd.GeoDataFrame(combined, crs="EPSG:4326", geometry="geometry")

    def _build_name_index(self) -> None:
        """Build normalized name → row indices lookup (with article-stripped variants)."""
        assert self._gdf is not None
        self._name_index = {}
        for idx, name in enumerate(self._gdf[_NAME_COL]):
            # "nan" is what astype(str) produces for missing names.
            if not isinstance(name, str) or not name.strip() or name == "nan":
                continue
            # dict.fromkeys drops repeated keys (keeping order) so a row is never
            # listed twice under the same key, should _index_keys repeat one.
            for key in dict.fromkeys(_index_keys(name)):
                self._name_index.setdefault(key, []).append(idx)

    def _row_to_feature(self, idx: int) -> Feature:
        """
        Convert a GeoDataFrame row to a GeoJSON Feature dict (WGS84).

        Args:
            idx: Positional (``iloc``) row index.
        """
        assert self._gdf is not None
        row = self._gdf.iloc[idx]

        name = str(row[_NAME_COL])
        normalized_type = str(row[_TYPE_COL]) if pd.notna(row.get(_TYPE_COL)) else "unknown"
        feature_id = str(row["cleabs"]) if pd.notna(row.get("cleabs")) else str(idx)

        geometry: dict[str, Any]
        bbox: tuple[float, float, float, float] | None
        geom = row.geometry
        if geom is None or geom.is_empty:
            # Placeholder geometry for rows without a usable shape.
            geometry = {"type": "Point", "coordinates": [0, 0]}
            bbox = None
        else:
            geometry = mapping(geom)
            bounds = geom.bounds
            bbox = (bounds[0], bounds[1], bounds[2], bounds[3])

        # Columns already surfaced as name/type/id/geometry are not repeated.
        skip_cols = {_NAME_COL, _TYPE_COL, "geometry", "cleabs"}
        properties: dict[str, Any] = {
            "name": name,
            "type": normalized_type,
            "confidence": 1.0,
        }
        for col in self._gdf.columns:
            if col in skip_cols:
                continue
            val = _to_json_value(row.get(col))
            if val is not None:
                properties[col] = val

        return Feature(geometry=geometry, properties=properties, id=feature_id, bbox=bbox)

    def search(
        self,
        name: str,
        type: str | None = None,
        max_results: int = 10,
    ) -> list[Feature]:
        """
        Search for geographic features by name.

        Uses case-insensitive, accent-normalized exact matching with fuzzy
        fallback when no exact match is found.

        Args:
            name: Location name to search for (e.g. ``"Ardèche"``, ``"Lyon"``,
                ``"Rhône"``).
            type: Optional type hint for filtering. Supports both concrete types
                (``"department"``, ``"city"``, ``"river"``) and category hints
                (``"administrative"``, ``"water"``).
            max_results: Maximum number of results. Zero or a negative value
                yields an empty list.

        Returns:
            List of GeoJSON Feature dicts in WGS84. Empty list if no match.
        """
        self._ensure_loaded()

        # A negative limit used to slice results off the end; treat it like 0.
        if max_results <= 0:
            return []

        normalized = _normalize_name(name)
        indices = self._name_index.get(normalized, [])

        if not indices:
            indices = self._fuzzy_search(normalized)

        features = [self._row_to_feature(idx) for idx in indices]

        if type is not None:
            matching_types = get_matching_types(type)
            logger.debug("Filtering results by type hint %r → matching types: %s", type, matching_types)
            if matching_types:
                features = [f for f in features if f["properties"].get("type") in matching_types]
            else:
                # Unknown hint: fall back to an exact string match.
                features = [f for f in features if f["properties"].get("type") == type.lower()]

        # Merging runs on the full filtered list, before the limit is applied.
        features = merge_segments(features)

        return features[:max_results]

    def _fuzzy_search(self, normalized: str, threshold: float = 75.0) -> list[int]:
        """
        Token-overlap + token_set_ratio fuzzy search.

        A row is indexed under several keys (see ``_build_name_index``), so it can
        match more than once; each row is returned a single time, ranked by its
        best score.

        Args:
            normalized: The normalized search query.
            threshold: Minimum fuzzy match score (0-100) to include a result.

        Returns:
            Row indices sorted by score (descending), without duplicates.
        """
        query_tokens = set(normalized.split())
        best_score: dict[int, float] = {}

        for indexed_name, indices in self._name_index.items():
            # Cheap pre-filter: require at least one shared token before scoring.
            if not query_tokens & set(indexed_name.split()):
                continue
            score = fuzz.token_set_ratio(normalized, indexed_name)
            if score < threshold:
                continue
            for idx in indices:
                if idx not in best_score or score > best_score[idx]:
                    best_score[idx] = score

        # Stable sort over insertion order: ties keep first-seen order.
        return sorted(best_score, key=best_score.__getitem__, reverse=True)

    def get_by_id(self, feature_id: str) -> Feature | None:
        """
        Get a feature by its ``cleabs`` identifier or row index.

        Args:
            feature_id: ``cleabs`` string or integer row index.

        Returns:
            Matching GeoJSON Feature dict, or ``None``.
        """
        self._ensure_loaded()
        assert self._gdf is not None

        if "cleabs" in self._gdf.columns:
            # _row_to_feature is positional (iloc), so look up the position of the
            # first match rather than its index label.
            hits = (self._gdf["cleabs"].astype(str) == feature_id).to_numpy().nonzero()[0]
            if hits.size:
                return self._row_to_feature(int(hits[0]))

        # Fallback: interpret the id as a positional row index.
        try:
            idx = int(feature_id)
        except ValueError:
            return None
        if 0 <= idx < len(self._gdf):
            return self._row_to_feature(idx)
        return None

    def get_available_types(self) -> list[str]:
        """
        Return the union of all normalized types this source can return.

        For each layer config the first applicable rule wins: ``commune_flags``
        contributes "city" and "municipality", else ``fixed_type`` contributes
        that type, else ``type_map`` contributes its values.

        Returns:
            Sorted list of type strings.
        """
        types: set[str] = set()
        for cfg in _LAYER_CONFIGS.values():
            if cfg.get("commune_flags"):
                types.update({"city", "municipality"})
            elif cfg.get("fixed_type"):
                types.add(cfg["fixed_type"])
            elif cfg.get("type_map"):
                types.update(cfg["type_map"].values())
        return sorted(types)
Geographic data source backed by IGN's BD-CARTO 5.0 dataset.
Loads French geographic data from GeoPackage files extracted to a directory. Supports administrative boundaries (communes, departments, regions, …), hydrography (rivers, lakes, …), named places (quarters, hamlets, …), orographic features (peaks, passes, valleys, …) and protected areas.
Data must first be downloaded with `make download-data-ign`, which places the GeoPackage files in `data/bdcarto/`.
All geometries are reprojected from EPSG:2154 (Lambert-93) to WGS84 (EPSG:4326) and returned as standard GeoJSON Feature dicts.
Arguments:
- data_path: Directory containing the `.gpkg` files (e.g. `"data/bdcarto"`).
Example:
>>> source = IGNBDCartoSource("data/bdcarto") >>> results = source.search("Ardèche", type="department") >>> results = source.search("Lyon", type="city") >>> results = source.search("Rhône", type="river")
def search(
    self,
    name: str,
    type: str | None = None,
    max_results: int = 10,
) -> list[Feature]:
    """
    Search for geographic features by name.

    Uses case-insensitive, accent-normalized exact matching with fuzzy
    fallback when no exact match is found.

    Args:
        name: Location name to search for (e.g. ``"Ardèche"``, ``"Lyon"``,
            ``"Rhône"``).
        type: Optional type hint for filtering. Supports both concrete types
            (``"department"``, ``"city"``, ``"river"``) and category hints
            (``"administrative"``, ``"water"``).
        max_results: Maximum number of results. Zero or a negative value
            yields an empty list.

    Returns:
        List of GeoJSON Feature dicts in WGS84. Empty list if no match.
    """
    self._ensure_loaded()

    # A negative limit used to slice results off the end; treat it like 0.
    if max_results <= 0:
        return []

    normalized = _normalize_name(name)
    indices = self._name_index.get(normalized, [])

    if not indices:
        indices = self._fuzzy_search(normalized)

    features = [self._row_to_feature(idx) for idx in indices]

    if type is not None:
        matching_types = get_matching_types(type)
        logger.debug("Filtering results by type hint %r → matching types: %s", type, matching_types)
        if matching_types:
            features = [f for f in features if f["properties"].get("type") in matching_types]
        else:
            # Unknown hint: fall back to an exact string match.
            features = [f for f in features if f["properties"].get("type") == type.lower()]

    # Merging runs on the full filtered list, before the limit is applied.
    features = merge_segments(features)

    return features[:max_results]
Search for geographic features by name.
Uses case-insensitive, accent-normalized exact matching with fuzzy fallback when no exact match is found.
Arguments:
- name: Location name to search for (e.g. `"Ardèche"`, `"Lyon"`, `"Rhône"`).
- type: Optional type hint for filtering. Supports both concrete types (`"department"`, `"city"`, `"river"`) and category hints (`"administrative"`, `"water"`).
- max_results: Maximum number of results.
Returns:
List of GeoJSON Feature dicts in WGS84. Empty list if no match.
def get_by_id(self, feature_id: str) -> Feature | None:
    """
    Get a feature by its ``cleabs`` identifier or row index.

    Args:
        feature_id: ``cleabs`` string or integer row index.

    Returns:
        Matching GeoJSON Feature dict, or ``None``.
    """
    self._ensure_loaded()
    assert self._gdf is not None

    if "cleabs" in self._gdf.columns:
        # _row_to_feature is positional (iloc), so look up the position of the
        # first match rather than its index label; the two only coincide while
        # the frame has a default 0..n-1 index.
        hits = (self._gdf["cleabs"].astype(str) == feature_id).to_numpy().nonzero()[0]
        if hits.size:
            return self._row_to_feature(int(hits[0]))

    # Fallback: interpret the id as a positional row index.
    try:
        idx = int(feature_id)
    except ValueError:
        return None
    if 0 <= idx < len(self._gdf):
        return self._row_to_feature(idx)
    return None
Get a feature by its cleabs identifier or row index.
Arguments:
- feature_id: `cleabs` string or integer row index.
Returns:
Matching GeoJSON Feature dict, or `None`.
def get_available_types(self) -> list[str]:
    """
    Collect every normalized type the configured layers can produce.

    For each layer config the first applicable rule wins: ``commune_flags``
    contributes "city" and "municipality", else ``fixed_type`` contributes
    that type, else ``type_map`` contributes its values.

    Returns:
        Sorted list of type strings.
    """
    collected: set[str] = set()
    for layer_cfg in _LAYER_CONFIGS.values():
        if layer_cfg.get("commune_flags"):
            collected |= {"city", "municipality"}
            continue
        if layer_cfg.get("fixed_type"):
            collected.add(layer_cfg["fixed_type"])
            continue
        if layer_cfg.get("type_map"):
            collected.update(layer_cfg["type_map"].values())
    ordered = list(collected)
    ordered.sort()
    return ordered
Return the union of all normalized types this source can return.
Returns:
Sorted list of type strings.
class CompositeDataSource:
    """
    Fan-out datasource that delegates to an ordered list of GeoDataSource instances.

    ``search`` queries the registered sources in order and merges their results,
    stopping once ``max_results`` features have been collected.

    ``get_by_id`` tries each source in order and returns the first hit.

    ``get_available_types`` returns the union of all sources' types.

    Args:
        sources: One or more GeoDataSource instances.

    Raises:
        ValueError: If no source is given.

    Example:
        >>> swiss = SwissNames3DSource("data/")
        >>> ign = IGNBDCartoSource("data/bdcarto")
        >>> combo = CompositeDataSource(swiss, ign)
        >>> results = combo.search("Geneva", type="city")
    """

    def __init__(self, *sources: GeoDataSource) -> None:
        if not sources:
            raise ValueError("At least one datasource is required.")
        # Order matters: earlier sources win in search() and get_by_id().
        self._sources: list[GeoDataSource] = list(sources)

    # Public API (mirrors GeoDataSource protocol)

    def search(
        self,
        name: str,
        type: str | None = None,
        max_results: int = 10,
    ) -> list[Feature]:
        """
        Search the registered sources in order and return the merged results.

        Args:
            name: Location name to search for.
            type: Optional type hint passed through to every source.
            max_results: Limit passed to each source and also the cap on the
                merged list. Zero or a negative value yields an empty list.

        Returns:
            List of at most ``max_results`` GeoJSON Feature dicts, in source order.
        """
        # Without this guard a negative limit returned exactly one feature,
        # because the cap check below is already true after the first append.
        if max_results <= 0:
            return []

        merged: list[Feature] = []
        for source in self._sources:
            for feature in source.search(name, type=type, max_results=max_results):
                merged.append(feature)
                if len(merged) >= max_results:
                    # Cap reached: later sources are not queried at all.
                    return merged
        return merged

    def get_by_id(self, feature_id: str) -> Feature | None:
        """
        Get a feature by ID, trying each source in order.

        Args:
            feature_id: Unique identifier to look up.

        Returns:
            The first matching GeoJSON Feature dict, or None.
        """
        for source in self._sources:
            result = source.get_by_id(feature_id)
            if result is not None:
                return result
        return None

    def get_available_types(self) -> list[str]:
        """
        Return the union of all sources' available types, sorted.

        Returns:
            Sorted list of unique type strings.
        """
        types: set[str] = set()
        for source in self._sources:
            types.update(source.get_available_types())
        return sorted(types)
Fan-out datasource that delegates to an ordered list of GeoDataSource instances.
search queries every registered source and merges results in order.
get_by_id tries each source in order and returns the first hit.
get_available_types returns the union of all sources' types.
Arguments:
- sources: One or more GeoDataSource instances.
Example:
>>> swiss = SwissNames3DSource("data/")
>>> ign = IGNBDCartoSource("data/")
>>> combo = CompositeDataSource(swiss, ign)
>>> results = combo.search("Geneva", type="city")
41 def search( 42 self, 43 name: str, 44 type: str | None = None, 45 max_results: int = 10, 46 ) -> list[Feature]: 47 """ 48 Search all registered sources and return merged. 49 50 Args: 51 name: Location name to search for. 52 type: Optional type hint passed through to every source. 53 max_results: Maximum results per source. 54 55 Returns: 56 List of GeoJSON Feature dicts, merged from all sources. 57 """ 58 merged: list[Feature] = [] 59 60 for source in self._sources: 61 for feature in source.search(name, type=type, max_results=max_results): 62 merged.append(feature) 63 if len(merged) >= max_results: 64 return merged 65 66 return merged
Search the registered sources in order and return the merged results.
Arguments:
- name: Location name to search for.
- type: Optional type hint passed through to every source.
- max_results: Maximum number of results returned overall; the same value is also passed to each source as its own limit.
Returns:
List of GeoJSON Feature dicts, merged from all sources.
68 def get_by_id(self, feature_id: str) -> Feature | None: 69 """ 70 Get a feature by ID, trying each source in order. 71 72 Args: 73 feature_id: Unique identifier to look up. 74 75 Returns: 76 The first matching GeoJSON Feature dict, or None. 77 """ 78 for source in self._sources: 79 result = source.get_by_id(feature_id) 80 if result is not None: 81 return result 82 return None
Get a feature by ID, trying each source in order.
Arguments:
- feature_id: Unique identifier to look up.
Returns:
The first matching GeoJSON Feature dict, or None.
def get_available_types(self) -> list[str]:
    """
    Collect every type advertised by any registered source.

    Returns:
        Sorted list of unique type strings.
    """
    per_source = [source.get_available_types() for source in self._sources]
    return sorted(set().union(*per_source))
Return the union of all sources' available types, sorted.
Returns:
Sorted list of unique type strings.
67class PostGISDataSource: 68 """ 69 Geographic data source backed by a PostGIS table. 70 71 The table must expose at minimum a name column, a geometry column, and 72 optionally a type column. The expected schema is: 73 74 .. code-block:: sql 75 76 CREATE TABLE <table> ( 77 id TEXT PRIMARY KEY, 78 name TEXT NOT NULL, 79 type TEXT, 80 geom GEOMETRY(Geometry, 4326) 81 ); 82 83 The ``type`` column may store either: 84 85 - **Raw dataset values** (e.g. ``"See"``, ``"Berg"`` for SwissNames3D), 86 pass ``type_map`` so the datasource can translate between raw values and 87 the normalized etter type names. 88 - **Already-normalized values** (e.g. ``"lake"``, ``"mountain"``), 89 leave ``type_map=None`` (default). 90 91 Geometries must be in WGS84 (EPSG:4326) or supply ``crs`` for on-the-fly 92 reprojection. 93 94 Args: 95 connection: A SQLAlchemy :class:`~sqlalchemy.engine.Engine` **or** a 96 connection URL string (e.g. ``"postgresql+psycopg2://user:pass@host/db"``). 97 When a string is provided the engine is created internally. 98 table: Fully-qualified table name, e.g. ``"public.swissnames3d"``. 99 name_column: Column used for name-based search (default ``"name"``). 100 type_column: Column used for type filtering. Pass ``None`` to disable 101 type filtering (default ``"type"``). 102 geometry_column: PostGIS geometry column (default ``"geom"``). 103 id_column: Primary-key column (default ``"id"``). 104 crs: CRS of the stored geometries as an EPSG string. Defaults to 105 ``"EPSG:4326"`` (no reprojection). 106 type_map: Optional mapping from **normalized etter type names** to 107 **lists of raw type column values** present in the database. 
108 This is the same format as ``SwissNames3DSource.OBJEKTART_TYPE_MAP`` 109 and ``IGNBDCartoSource.IGN_BDCARTO_TYPE_MAP``, so they can be 110 passed directly:: 111 112 from etter.datasources.swissnames3d import OBJEKTART_TYPE_MAP 113 source = PostGISDataSource( 114 engine, 115 table="public.swissnames3d", 116 type_map=OBJEKTART_TYPE_MAP, 117 ) 118 119 When ``type_map`` is provided the datasource: 120 121 - Translates raw DB values → normalized types in returned features. 122 - Translates user type hints → raw DB values in SQL ``WHERE`` clauses. 123 - Returns normalized type names from ``get_available_types()``. 124 125 When ``None`` (default) the stored values are used as-is. 126 fuzzy_threshold: Minimum ``pg_trgm`` similarity score (0-1) used for 127 fuzzy fallback search when no exact ``ILIKE`` match is found. 128 129 Example: unmodified SwissNames3D table:: 130 131 from sqlalchemy import create_engine 132 from etter.datasources import PostGISDataSource 133 from etter.datasources.swissnames3d import OBJEKTART_TYPE_MAP 134 135 engine = create_engine(...) 
136 source = PostGISDataSource( 137 engine, 138 table="public.swissnames3d", 139 type_map=OBJEKTART_TYPE_MAP, 140 ) 141 results = source.search("Lac Léman", type="lake") 142 """ 143 144 def __init__( 145 self, 146 connection: str | Engine, 147 table: str, 148 name_column: str = "name", 149 type_column: str | None = "type", 150 geometry_column: str = "geom", 151 id_column: str = "id", 152 crs: str = "EPSG:4326", 153 type_map: TypeMap | None = None, 154 fuzzy_threshold: float = 0.65, 155 ) -> None: 156 sa = _require_sqlalchemy() 157 158 if isinstance(connection, str): 159 self._engine = sa.create_engine(connection) 160 else: 161 self._engine = connection 162 163 try: 164 with self._engine.connect() as conn: 165 conn.execute(sa.text(f"SELECT 1 FROM {table} LIMIT 1")) 166 except Exception as exc: 167 raise ValueError(f"Failed to connect to database or access table {table!r}") from exc 168 169 self._table = table 170 self._name_col = name_column 171 self._type_col = type_column 172 self._geom_col = geometry_column 173 self._id_col = id_column 174 self._crs = crs 175 self._fuzzy_threshold = fuzzy_threshold 176 177 # Build bidirectional lookup structures from the user-supplied map. 
178 if type_map: 179 self._normalized_to_raw: dict[str, list[str]] = {k: list(v) for k, v in type_map.items()} 180 self._raw_to_normalized: dict[str, str] = { 181 raw: normalized for normalized, raws in type_map.items() for raw in raws 182 } 183 else: 184 self._normalized_to_raw = {} 185 self._raw_to_normalized = {} 186 187 self._trgm_available: bool | None = None 188 self._unaccent_available: bool | None = None 189 190 def _get_connection(self) -> Connection: 191 """Return a SQLAlchemy connection from the engine.""" 192 return self._engine.connect() 193 194 def _check_trgm(self, conn: Connection) -> bool: 195 """Return True if pg_trgm extension is available in the database.""" 196 if self._trgm_available is not None: 197 return self._trgm_available 198 sa = _require_sqlalchemy() 199 try: 200 result = conn.execute(sa.text("SELECT 1 FROM pg_extension WHERE extname = 'pg_trgm'")) 201 self._trgm_available = result.fetchone() is not None 202 except Exception: 203 logger.exception("Failed to check pg_trgm availability") 204 self._trgm_available = False 205 return self._trgm_available 206 207 def _check_unaccent(self, conn: Connection) -> bool: 208 """Return True if the unaccent extension is available in the database.""" 209 if self._unaccent_available is not None: 210 return self._unaccent_available 211 sa = _require_sqlalchemy() 212 try: 213 result = conn.execute(sa.text("SELECT 1 FROM pg_extension WHERE extname = 'unaccent'")) 214 self._unaccent_available = result.fetchone() is not None 215 except Exception: 216 logger.exception("Failed to check unaccent availability") 217 self._unaccent_available = False 218 return self._unaccent_available 219 220 def _normalize_type(self, raw_type: str | None) -> str | None: 221 """Translate a raw DB type value to its normalized etter name. 222 223 If no type_map was supplied the value is returned unchanged. 
224 """ 225 if raw_type is None: 226 return None 227 return self._raw_to_normalized.get(raw_type, raw_type) 228 229 def _row_to_feature(self, row: Row) -> Feature: 230 """Convert a SQLAlchemy Row to a GeoJSON Feature dict.""" 231 feature_id = str(row.id) 232 name = str(row.name) 233 raw_type = getattr(row, "type", None) 234 normalized_type = self._normalize_type(raw_type) 235 236 geojson_str = row.geojson 237 if geojson_str: 238 geometry = json.loads(geojson_str) 239 else: 240 geometry = {"type": "Point", "coordinates": [0, 0]} 241 242 bbox = _bbox_from_geojson(geometry) 243 244 properties: dict[str, Any] = { 245 "name": name, 246 "type": normalized_type, 247 "confidence": 1.0, 248 } 249 250 return Feature(geometry=geometry, properties=properties, id=feature_id, bbox=bbox) 251 252 def _build_select_columns(self) -> str: 253 """Build the SELECT column list as a SQL fragment.""" 254 type_expr = f", {self._type_col} AS type" if self._type_col else ", NULL AS type" 255 if self._crs.upper() != "EPSG:4326": 256 geom_expr = f", ST_AsGeoJSON(ST_Transform({self._geom_col}, 4326)) AS geojson" 257 else: 258 geom_expr = f", ST_AsGeoJSON({self._geom_col}) AS geojson" 259 return f"{self._id_col} AS id, {self._name_col} AS name{type_expr}{geom_expr}" 260 261 def search( 262 self, 263 name: str, 264 type: str | None = None, 265 max_results: int = 10, 266 ) -> list[Feature]: 267 """ 268 Search for geographic features by name. 269 270 Uses a three-step cascade, stopping as soon as any step returns results: 271 272 1. **Normalized exact match** 273 2. **pg_trgm fuzzy with unaccent** (pg_trgm extension required and unaccent extension recommended) 274 3. **ILIKE substring** 275 276 ``merge_segments`` is applied after all rows are fetched so that 277 multi-segment linestrings (rivers, roads) are merged before the 278 ``max_results`` cap is applied. 279 280 Args: 281 name: Location name to search for. 282 type: Optional type hint for filtering results. 
283 max_results: Maximum number of results to return. 284 285 Returns: 286 List of matching GeoJSON Feature dicts in WGS84. 287 """ 288 sa = _require_sqlalchemy() 289 cols = self._build_select_columns() 290 291 # Resolve type filter to the raw DB values to use in the SQL WHERE clause. 292 type_filter_values: list[str] | None = None 293 if type is not None and self._type_col is not None: 294 matching_types = get_matching_types(type) 295 concrete_types = matching_types if matching_types else [type.lower()] 296 if self._normalized_to_raw: 297 raw_values: list[str] = [] 298 for t in concrete_types: 299 raw_values.extend(self._normalized_to_raw.get(t, [t])) 300 type_filter_values = raw_values if raw_values else concrete_types 301 else: 302 type_filter_values = concrete_types 303 304 # Fetch more rows than requested so that merge_segments has the full 305 # set of segments to work with. Without this, a SQL LIMIT applied 306 # *before* merging would only return a partial set of linestring 307 # segments, producing incorrect / truncated geometries. 308 # We cap the internal limit at 2000 to avoid unbounded queries. 
309 internal_limit = min(max(max_results * 20, 100), 2000) 310 311 with self._get_connection() as conn: 312 features = self._search_normalized(conn, sa, cols, name, type_filter_values, internal_limit) 313 314 if not features: 315 with self._get_connection() as conn: 316 features = self._search_fuzzy(conn, sa, cols, name, type_filter_values, internal_limit) 317 318 if not features: 319 with self._get_connection() as conn: 320 features = self._search_ilike(conn, sa, cols, name, type_filter_values, internal_limit) 321 322 features = merge_segments(features) 323 return features[:max_results] 324 325 def _type_filter_sql(self, values: list[str] | None) -> tuple[str, dict[str, Any]]: 326 """Return a WHERE clause fragment and bind params for type filtering.""" 327 if not values or self._type_col is None: 328 return "", {} 329 placeholders = ", ".join(f":type_{i}" for i in range(len(values))) 330 clause = f" AND {self._type_col} IN ({placeholders})" 331 params = {f"type_{i}": v for i, v in enumerate(values)} 332 return clause, params 333 334 def _search_normalized( 335 self, 336 conn: Connection, 337 sa: types.ModuleType, 338 cols: str, 339 name: str, 340 type_filter: list[str] | None, 341 fetch_limit: int, 342 ) -> list[Feature]: 343 """ 344 Exact accent- and case-insensitive search. 345 346 Accent normalization (NFD decomposition + diacritic strip) is done in 347 Python before the query is sent to the DB. 
348 """ 349 type_clause, type_params = self._type_filter_sql(type_filter) 350 name_expr = f"lower({self._name_col})" 351 if self._check_unaccent(conn): 352 name_expr = f"unaccent({name_expr})" 353 sql = sa.text( 354 f"SELECT {cols} FROM {self._table} " # noqa: S608 355 f"WHERE {name_expr} = :query{type_clause} " 356 f"LIMIT :limit" 357 ) 358 params: dict[str, Any] = { 359 "query": _normalize_name(name), 360 "limit": fetch_limit, 361 **type_params, 362 } 363 try: 364 result = conn.execute(sql, params) 365 return [self._row_to_feature(row) for row in result] 366 except Exception: 367 logger.exception("Normalized search failed for %r", name) 368 return [] 369 370 def _search_ilike( 371 self, 372 conn: Connection, 373 sa: types.ModuleType, 374 cols: str, 375 name: str, 376 type_filter: list[str] | None, 377 fetch_limit: int, 378 ) -> list[Feature]: 379 """Case-insensitive substring fallback using ``ILIKE '%name%'``. 380 381 When the ``unaccent`` extension is available, both the stored name column 382 and the pattern are accent-stripped so that e.g. ``"Rhone"`` matches 383 ``"Rhône"``. Without ``unaccent``, standard ILIKE is used (case-insensitive 384 only). 
385 """ 386 type_clause, type_params = self._type_filter_sql(type_filter) 387 normalized = _normalize_name(name) 388 if self._check_unaccent(conn): 389 name_expr = f"unaccent(lower({self._name_col}))" 390 pattern = f"%{normalized}%" 391 else: 392 name_expr = self._name_col 393 pattern = f"%{name}%" 394 sql = sa.text( 395 f"SELECT {cols} FROM {self._table} " # noqa: S608 396 f"WHERE {name_expr} ILIKE :pattern{type_clause} " 397 f"LIMIT :limit" 398 ) 399 params: dict[str, Any] = {"pattern": pattern, "limit": fetch_limit, **type_params} 400 try: 401 result = conn.execute(sql, params) 402 return [self._row_to_feature(row) for row in result] 403 except Exception: 404 logger.exception("ILIKE search failed for %r", name) 405 return [] 406 407 def _search_fuzzy( 408 self, 409 conn: Connection, 410 sa: types.ModuleType, 411 cols: str, 412 name: str, 413 type_filter: list[str] | None, 414 fetch_limit: int, 415 ) -> list[Feature]: 416 """Fuzzy fallback using pg_trgm similarity (if extension is available).""" 417 if not self._check_trgm(conn): 418 logger.warning( 419 "pg_trgm extension not available. Fuzzy search disabled. Install it with: CREATE EXTENSION pg_trgm;" 420 ) 421 return [] 422 normalized_query = _normalize_name(name) 423 if self._check_unaccent(conn): 424 name_expr = f"unaccent(lower({self._name_col}))" 425 else: 426 logger.warning( 427 "unaccent extension not available. Accent-insensitive fuzzy search degraded. 
" 428 "Install it with: CREATE EXTENSION unaccent;" 429 ) 430 name_expr = f"lower({self._name_col})" 431 type_clause, type_params = self._type_filter_sql(type_filter) 432 sql = sa.text( 433 f"SELECT {cols} FROM {self._table} " # noqa: S608 434 f"WHERE word_similarity({name_expr}, :query) > :threshold{type_clause} " 435 f"ORDER BY word_similarity({name_expr}, :query) DESC " 436 f"LIMIT :limit" 437 ) 438 params: dict[str, Any] = { 439 "query": normalized_query, 440 "threshold": self._fuzzy_threshold, 441 "limit": fetch_limit, 442 **type_params, 443 } 444 try: 445 result = conn.execute(sql, params) 446 return [self._row_to_feature(row) for row in result] 447 except Exception: 448 logger.exception("Fuzzy search failed for %r", name) 449 return [] 450 451 def get_by_id(self, feature_id: str) -> Feature | None: 452 """ 453 Get a specific feature by its unique identifier. 454 455 Args: 456 feature_id: Value of the ``id`` column. 457 458 Returns: 459 The matching GeoJSON Feature dict, or ``None`` if not found. 460 """ 461 sa = _require_sqlalchemy() 462 cols = self._build_select_columns() 463 sql = sa.text( 464 f"SELECT {cols} FROM {self._table} WHERE {self._id_col} = :id LIMIT 1" # noqa: S608 465 ) 466 with self._get_connection() as conn: 467 try: 468 result = conn.execute(sql, {"id": feature_id}) 469 row = result.fetchone() 470 return self._row_to_feature(row) if row else None 471 except Exception: 472 logger.exception("get_by_id failed for %r", feature_id) 473 return None 474 475 def get_available_types(self) -> list[str]: 476 """ 477 Return the distinct ``type`` values present in the table. 478 479 Returns: 480 Sorted list of concrete type strings, or an empty list if the table 481 has no type column. 
482 """ 483 if self._type_col is None: 484 return [] 485 sa = _require_sqlalchemy() 486 sql = sa.text( 487 f"SELECT DISTINCT {self._type_col} AS type FROM {self._table} " # noqa: S608 488 f"WHERE {self._type_col} IS NOT NULL ORDER BY 1" 489 ) 490 with self._get_connection() as conn: 491 try: 492 result = conn.execute(sql) 493 raw_types = [row.type for row in result] 494 except Exception: 495 logger.exception("get_available_types failed") 496 return [] 497 498 normalized = {self._normalize_type(t) for t in raw_types if t} 499 return sorted(t for t in normalized if t)
Geographic data source backed by a PostGIS table.
The table must expose at minimum a name column, a geometry column, and optionally a type column. The expected schema is:
CREATE TABLE <table> (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
type TEXT,
geom GEOMETRY(Geometry, 4326)
);
The type column may store either:
- Raw dataset values (e.g. "See", "Berg" for SwissNames3D): pass type_map so the datasource can translate between raw values and the normalized etter type names.
- Already-normalized values (e.g. "lake", "mountain"): leave type_map=None (default).
Geometries must be in WGS84 (EPSG:4326) or supply crs for on-the-fly
reprojection.
Arguments:
- connection: A SQLAlchemy sqlalchemy.engine.Engine or a connection URL string (e.g. "postgresql+psycopg2://user:pass@host/db"). When a string is provided the engine is created internally.
- table: Fully-qualified table name, e.g. "public.swissnames3d".
- name_column: Column used for name-based search (default "name").
- type_column: Column used for type filtering. Pass None to disable type filtering (default "type").
- geometry_column: PostGIS geometry column (default "geom").
- id_column: Primary-key column (default "id").
- crs: CRS of the stored geometries as an EPSG string. Defaults to "EPSG:4326" (no reprojection).
- type_map: Optional mapping from normalized etter type names to lists of raw type column values present in the database. This is the same format as SwissNames3DSource.OBJEKTART_TYPE_MAP and IGNBDCartoSource.IGN_BDCARTO_TYPE_MAP, so they can be passed directly:

      from etter.datasources.swissnames3d import OBJEKTART_TYPE_MAP
      source = PostGISDataSource(
          engine,
          table="public.swissnames3d",
          type_map=OBJEKTART_TYPE_MAP,
      )

  When type_map is provided the datasource:
  - Translates raw DB values → normalized types in returned features.
  - Translates user type hints → raw DB values in SQL WHERE clauses.
  - Returns normalized type names from get_available_types().

  When None (default) the stored values are used as-is.
- fuzzy_threshold: Minimum pg_trgm similarity score (0-1) used for the fuzzy fallback search, which runs when the normalized exact match finds nothing.
Example: unmodified SwissNames3D table::
from sqlalchemy import create_engine
from etter.datasources import PostGISDataSource
from etter.datasources.swissnames3d import OBJEKTART_TYPE_MAP
engine = create_engine(...)
source = PostGISDataSource(
engine,
table="public.swissnames3d",
type_map=OBJEKTART_TYPE_MAP,
)
results = source.search("Lac Léman", type="lake")
def __init__(
    self,
    connection: str | Engine,
    table: str,
    name_column: str = "name",
    type_column: str | None = "type",
    geometry_column: str = "geom",
    id_column: str = "id",
    crs: str = "EPSG:4326",
    type_map: TypeMap | None = None,
    fuzzy_threshold: float = 0.65,
) -> None:
    """Bind the datasource to a table and verify that it can be queried.

    Raises:
        ValueError: If connecting or the probe query against ``table`` fails.
    """
    sa = _require_sqlalchemy()

    # A URL string is turned into an engine; anything else is used as-is.
    self._engine = sa.create_engine(connection) if isinstance(connection, str) else connection

    # Probe the table once so misconfiguration surfaces here, not on first use.
    try:
        with self._engine.connect() as probe:
            probe.execute(sa.text(f"SELECT 1 FROM {table} LIMIT 1"))
    except Exception as exc:
        raise ValueError(f"Failed to connect to database or access table {table!r}") from exc

    self._table = table
    self._name_col = name_column
    self._type_col = type_column
    self._geom_col = geometry_column
    self._id_col = id_column
    self._crs = crs
    self._fuzzy_threshold = fuzzy_threshold

    # Two lookup tables derived from the user-supplied map:
    # normalized name -> raw values, and raw value -> normalized name.
    forward: dict[str, list[str]] = {}
    backward: dict[str, str] = {}
    if type_map:
        forward = {normalized: list(raws) for normalized, raws in type_map.items()}
        for normalized, raws in type_map.items():
            for raw in raws:
                backward[raw] = normalized
    self._normalized_to_raw = forward
    self._raw_to_normalized = backward

    # Extension probes are lazy; None means "not checked yet".
    self._trgm_available: bool | None = None
    self._unaccent_available: bool | None = None
261 def search( 262 self, 263 name: str, 264 type: str | None = None, 265 max_results: int = 10, 266 ) -> list[Feature]: 267 """ 268 Search for geographic features by name. 269 270 Uses a three-step cascade, stopping as soon as any step returns results: 271 272 1. **Normalized exact match** 273 2. **pg_trgm fuzzy with unaccent** (pg_trgm extension required and unaccent extension recommended) 274 3. **ILIKE substring** 275 276 ``merge_segments`` is applied after all rows are fetched so that 277 multi-segment linestrings (rivers, roads) are merged before the 278 ``max_results`` cap is applied. 279 280 Args: 281 name: Location name to search for. 282 type: Optional type hint for filtering results. 283 max_results: Maximum number of results to return. 284 285 Returns: 286 List of matching GeoJSON Feature dicts in WGS84. 287 """ 288 sa = _require_sqlalchemy() 289 cols = self._build_select_columns() 290 291 # Resolve type filter to the raw DB values to use in the SQL WHERE clause. 292 type_filter_values: list[str] | None = None 293 if type is not None and self._type_col is not None: 294 matching_types = get_matching_types(type) 295 concrete_types = matching_types if matching_types else [type.lower()] 296 if self._normalized_to_raw: 297 raw_values: list[str] = [] 298 for t in concrete_types: 299 raw_values.extend(self._normalized_to_raw.get(t, [t])) 300 type_filter_values = raw_values if raw_values else concrete_types 301 else: 302 type_filter_values = concrete_types 303 304 # Fetch more rows than requested so that merge_segments has the full 305 # set of segments to work with. Without this, a SQL LIMIT applied 306 # *before* merging would only return a partial set of linestring 307 # segments, producing incorrect / truncated geometries. 308 # We cap the internal limit at 2000 to avoid unbounded queries. 
309 internal_limit = min(max(max_results * 20, 100), 2000) 310 311 with self._get_connection() as conn: 312 features = self._search_normalized(conn, sa, cols, name, type_filter_values, internal_limit) 313 314 if not features: 315 with self._get_connection() as conn: 316 features = self._search_fuzzy(conn, sa, cols, name, type_filter_values, internal_limit) 317 318 if not features: 319 with self._get_connection() as conn: 320 features = self._search_ilike(conn, sa, cols, name, type_filter_values, internal_limit) 321 322 features = merge_segments(features) 323 return features[:max_results]
Search for geographic features by name.
Uses a three-step cascade, stopping as soon as any step returns results:
- Normalized exact match
- pg_trgm fuzzy with unaccent (pg_trgm extension required and unaccent extension recommended)
- ILIKE substring
merge_segments is applied after all rows are fetched so that
multi-segment linestrings (rivers, roads) are merged before the
max_results cap is applied.
Arguments:
- name: Location name to search for.
- type: Optional type hint for filtering results.
- max_results: Maximum number of results to return.
Returns:
List of matching GeoJSON Feature dicts in WGS84.
def get_by_id(self, feature_id: str) -> Feature | None:
    """
    Fetch a single feature by the value of its ``id`` column.

    Args:
        feature_id: Value of the ``id`` column.

    Returns:
        The matching GeoJSON Feature dict, or ``None`` if no row matches or
        the query fails (failures are logged, not raised).
    """
    sa = _require_sqlalchemy()
    cols = self._build_select_columns()
    statement = sa.text(
        f"SELECT {cols} FROM {self._table} WHERE {self._id_col} = :id LIMIT 1"  # noqa: S608
    )
    with self._get_connection() as conn:
        # Only query/conversion errors are swallowed; a failure to open the
        # connection still propagates to the caller.
        try:
            row = conn.execute(statement, {"id": feature_id}).fetchone()
            if not row:
                return None
            return self._row_to_feature(row)
        except Exception:
            logger.exception("get_by_id failed for %r", feature_id)
            return None
Get a specific feature by its unique identifier.
Arguments:
- feature_id: Value of the id column.
Returns:
The matching GeoJSON Feature dict, or None if not found.
def get_available_types(self) -> list[str]:
    """
    List the distinct types present in the table.

    Raw column values are passed through ``_normalize_type``, so they are
    reported under their normalized name when a type map covers them.

    Returns:
        Sorted list of unique type strings, or an empty list if the
        datasource has no type column or the query fails.
    """
    type_col = self._type_col
    if type_col is None:
        return []

    sa = _require_sqlalchemy()
    statement = sa.text(
        f"SELECT DISTINCT {type_col} AS type FROM {self._table} "  # noqa: S608
        f"WHERE {type_col} IS NOT NULL ORDER BY 1"
    )
    with self._get_connection() as conn:
        try:
            raw_types = [row.type for row in conn.execute(statement)]
        except Exception:
            logger.exception("get_available_types failed")
            return []

    # Collapse raw values that share a normalized name; drop empty values.
    unique: set[str] = set()
    for raw in raw_types:
        if not raw:
            continue
        mapped = self._normalize_type(raw)
        if mapped:
            unique.add(mapped)
    return sorted(unique)
Return the distinct type values present in the table, translated to normalized type names when a type_map was supplied.
Returns:
Sorted list of concrete type strings, or an empty list if the table has no type column.
111def apply_spatial_relation( 112 geometry: GeoJsonGeometry | list[GeoJsonGeometry], 113 relation: SpatialRelation, 114 buffer_config: BufferConfig | None = None, 115 spatial_config: SpatialRelationConfig | None = None, 116 geometry_format: GeometryFormat = "geojson", 117) -> GeoJsonGeometry | str: 118 """Transform one or more reference geometries according to a spatial relation. 119 120 A list of geometries is unioned into one before the transformation, so that 121 features split across multiple datasource records (e.g. a river in segments) 122 produce a single coherent search area. 123 124 When ``buffer_config.inferred`` is True (i.e. no explicit distance was 125 stated), the buffer distance is refined from the actual geometry area so 126 that small features receive small buffers and large regions receive large 127 ones. 128 129 Args: 130 geometry: GeoJSON geometry dict or non-empty list of dicts (WGS84). 131 relation: Spatial relation to apply. 132 buffer_config: Required for buffer/directional relations. 133 spatial_config: Relation registry; defaults to the module-level singleton. 134 geometry_format: "geojson" (default), "wkt", or "wkb". 135 136 Returns: 137 Transformed geometry in the requested format. 138 """ 139 if isinstance(geometry, list): 140 if not geometry: 141 raise ValueError("geometry list must not be empty") 142 geom = unary_union([shape(g) for g in geometry]) 143 geom_dict: GeoJsonGeometry = mapping(geom) 144 else: 145 geom = shape(geometry) 146 geom_dict = geometry 147 148 # Refine inferred buffer distance from geometry area before dispatching. 
149 if buffer_config is not None and buffer_config.inferred: 150 buffer_config = _refine_buffer_config(geom, buffer_config, relation) 151 152 if relation.category == "containment": 153 result = geom_dict 154 elif relation.category == "buffer": 155 if buffer_config is None: 156 raise ValueError(f"Buffer relation '{relation.relation}' requires buffer_config") 157 result = _apply_buffer(geom, buffer_config) 158 elif relation.category == "directional": 159 if buffer_config is None: 160 raise ValueError(f"Directional relation '{relation.relation}' requires buffer_config") 161 cfg = spatial_config if spatial_config is not None else _DEFAULT_SPATIAL_CONFIG 162 relation_config = cfg.get_config(relation.relation) 163 direction = relation_config.direction_angle_degrees or 0 164 sector_angle = relation_config.sector_angle_degrees or 90 165 result = _apply_directional(geom, buffer_config, direction, sector_angle) 166 elif relation.category == "clipping": 167 cfg = spatial_config if spatial_config is not None else _DEFAULT_SPATIAL_CONFIG 168 relation_config = cfg.get_config(relation.relation) 169 clip_direction = relation_config.clip_direction or "north" 170 result = _apply_clipping(geom, clip_direction) 171 else: 172 raise ValueError(f"Unknown relation category: '{relation.category}'") 173 174 return convert_geometry(result, geometry_format)
Transform one or more reference geometries according to a spatial relation.
A list of geometries is unioned into one before the transformation, so that features split across multiple datasource records (e.g. a river in segments) produce a single coherent search area.
When buffer_config.inferred is True (i.e. no explicit distance was
stated), the buffer distance is refined from the actual geometry area so
that small features receive small buffers and large regions receive large
ones.
Arguments:
- geometry: GeoJSON geometry dict or non-empty list of dicts (WGS84).
- relation: Spatial relation to apply.
- buffer_config: Required for buffer/directional relations.
- spatial_config: Relation registry; defaults to the module-level singleton.
- geometry_format: "geojson" (default), "wkt", or "wkb".
Returns:
Transformed geometry in the requested format.
11def convert_geometry(geometry: GeoJsonGeometry, fmt: GeometryFormat) -> GeoJsonGeometry | str: 12 """ 13 Convert a GeoJSON geometry dict to the requested format. 14 15 Args: 16 geometry: GeoJSON geometry dict (e.g. {"type": "Point", "coordinates": [...]}) 17 fmt: Target format — "geojson" returns the dict unchanged, "wkt" returns a WKT string, 18 "wkb" returns a hex-encoded WKB string. 19 20 Returns: 21 The geometry in the requested format. 22 """ 23 if fmt == "geojson": 24 return geometry 25 geom = shape(geometry) 26 if fmt == "wkt": 27 return geom.wkt 28 return geom.wkb_hex
Convert a GeoJSON geometry dict to the requested format.
Arguments:
- geometry: GeoJSON geometry dict (e.g. {"type": "Point", "coordinates": [...]})
- fmt: Target format — "geojson" returns the dict unchanged, "wkt" returns a WKT string, "wkb" returns a hex-encoded WKB string.
Returns:
The geometry in the requested format.
31def convert_feature_geometry(feature: Feature, fmt: GeometryFormat) -> Feature | dict: 32 """ 33 Return a copy of a GeoJSON Feature dict with its geometry converted to the requested format. 34 35 Args: 36 feature: GeoJSON Feature dict with a "geometry" key. 37 fmt: Target geometry format. 38 39 Returns: 40 A new dict identical to the input except the "geometry" value is converted. 41 Returns a Feature when fmt is "geojson"; a plain dict otherwise (geometry becomes a string). 42 """ 43 if fmt == "geojson": 44 return feature 45 return {**feature, "geometry": convert_geometry(feature["geometry"], fmt)}
Return a copy of a GeoJSON Feature dict with its geometry converted to the requested format.
Arguments:
- feature: GeoJSON Feature dict with a "geometry" key.
- fmt: Target geometry format.
Returns:
A new dict identical to the input except the "geometry" value is converted. Returns a Feature when fmt is "geojson"; a plain dict otherwise (geometry becomes a string).