etter

etter - Natural language geographic query parsing using LLMs.

Parse location queries into structured geographic queries using LLM.

 1"""
 2etter - Natural language geographic query parsing using LLMs.
 3
 4Parse location queries into structured geographic queries using LLM.
 5"""
 6
 7from importlib.metadata import PackageNotFoundError, version
 8
 9try:
10    __version__ = version("etter")
11except PackageNotFoundError:  # running from source without install
12    __version__ = "unknown"
13
14# Main API
15# Exceptions
16# Datasources
17from .datasources import CompositeDataSource, GeoDataSource, IGNBDCartoSource, PostGISDataSource, SwissNames3DSource
18from .exceptions import (
19    GeoFilterError,
20    LowConfidenceError,
21    LowConfidenceWarning,
22    NoReferenceLocationError,
23    ParsingError,
24    UnknownRelationError,
25    ValidationError,
26)
27from .geometry_format import convert_feature_geometry, convert_geometry
28
29# Models (for type hints and result access)
30from .models import (
31    BufferConfig,
32    ConfidenceLevel,
33    ConfidenceScore,
34    GeometryFormat,
35    GeoQuery,
36    ReferenceLocation,
37    SpatialRelation,
38)
39from .parser import GeoFilterParser
40
41# Spatial operations
42from .spatial import apply_spatial_relation
43
44# Configuration
45from .spatial_config import RelationConfig, SpatialRelationConfig
46
47__all__ = [
48    # Main API
49    "GeoFilterParser",
50    # Models
51    "GeoQuery",
52    "SpatialRelation",
53    "ReferenceLocation",
54    "BufferConfig",
55    "ConfidenceScore",
56    "ConfidenceLevel",
57    "GeometryFormat",
58    # Configuration
59    "SpatialRelationConfig",
60    "RelationConfig",
61    # Exceptions
62    "GeoFilterError",
63    "ParsingError",
64    "ValidationError",
65    "NoReferenceLocationError",
66    "UnknownRelationError",
67    "LowConfidenceError",
68    "LowConfidenceWarning",
69    # Datasources
70    "GeoDataSource",
71    "SwissNames3DSource",
72    "IGNBDCartoSource",
73    "CompositeDataSource",
74    "PostGISDataSource",
75    # Spatial
76    "apply_spatial_relation",
77    "convert_geometry",
78    "convert_feature_geometry",
79]
class GeoFilterParser:
 19class GeoFilterParser:
 20    """
 21    Main entry point for parsing natural language location queries.
 22
 23    This class orchestrates the entire parsing pipeline:
 24    1. Initialize LLM with structured output
 25    2. Build prompt with spatial relations and examples
 26    3. Parse query through LLM
 27    4. Validate and enrich with defaults
 28    5. Return structured GeoQuery
 29
 30    Examples:
 31        Basic usage:
 32        >>> from langchain.chat_models import init_chat_model
 33        >>> llm = init_chat_model(model="gpt-4o", model_provider="openai", api_key="sk-...")
 34        >>> parser = GeoFilterParser(llm=llm)
 35        >>> result = parser.parse("restaurants in Lausanne")
 36        >>> print(result.reference_location.name)
 37        'Lausanne'
 38
 39        With strict confidence mode:
 40        >>> parser = GeoFilterParser(llm=llm, confidence_threshold=0.8, strict_mode=True)
 41        >>> result = parser.parse("near the station")  # May raise LowConfidenceError
 42    """
 43
 44    def __init__(
 45        self,
 46        llm: BaseChatModel,
 47        spatial_config: SpatialRelationConfig | None = None,
 48        confidence_threshold: float = 0.6,
 49        strict_mode: bool = False,
 50        include_examples: bool = True,
 51        datasource: GeoDataSource | None = None,
 52        additional_instructions: str | None = None,
 53    ):
 54        """
 55        Initialize the parser.
 56
 57        Args:
 58            llm: LangChain LLM instance (required).
 59            spatial_config: Spatial relation configuration. If None, uses defaults
 60            confidence_threshold: Minimum confidence to accept (0-1)
 61            strict_mode: If True, raise error on low confidence. If False, warn only
 62            include_examples: Whether to include few-shot examples in prompt
 63            datasource: Optional GeoDataSource instance. If provided, the LLM will be informed
 64                       about the concrete types available in that datasource for better type inference.
 65            additional_instructions: Free-form text injected as a system message after the main
 66                       system prompt and before few-shot examples. Use this to add caller-specific
 67                       rules such as region-specific endonyms, domain aliases, or
 68                       organization-specific place names without forking the default prompt.
 69
 70        Example:
 71            >>> from langchain.chat_models import init_chat_model
 72            >>> from etter.datasources.swissnames3d import SwissNames3DSource
 73            >>> llm = init_chat_model(model="gpt-4o", model_provider="openai", temperature=0)
 74            >>> datasource = SwissNames3DSource("data/")
 75            >>> parser = GeoFilterParser(llm=llm, datasource=datasource)
 76        """
 77        self.llm = llm
 78
 79        # Initialize spatial config
 80        self.spatial_config = spatial_config or SpatialRelationConfig()
 81
 82        # Settings
 83        self.confidence_threshold = confidence_threshold
 84        self.strict_mode = strict_mode
 85        self.include_examples = include_examples
 86        self.datasource = datasource
 87        self.additional_instructions = additional_instructions
 88
 89        # Build structured LLM
 90        self.structured_llm = self._build_structured_llm()
 91
 92        # Build prompt template
 93        self.prompt = self._build_prompt()
 94
 95    def _build_structured_llm(self):
 96        """Create LLM with structured output using Pydantic model."""
 97
 98        return self.llm.with_structured_output(
 99            GeoQuery,
100            method="function_calling",  # Use function_calling for broader schema support
101            include_raw=True,  # For error debugging
102        )
103
104    def _build_prompt(self) -> ChatPromptTemplate:
105        """Build prompt template with spatial relations, examples, and available types."""
106        available_types = None
107        if self.datasource is not None:
108            available_types = self.datasource.get_available_types()
109
110        return build_prompt_template(
111            spatial_config=self.spatial_config,
112            include_examples=self.include_examples,
113            available_types=available_types,
114            additional_instructions=self.additional_instructions,
115        )
116
117    def _unpack_response(self, response) -> GeoQuery:
118        """Extract and validate the GeoQuery from a structured-LLM response."""
119        parsed = response.get("parsed") if isinstance(response, dict) else response
120
121        if parsed is None:
122            raw = response.get("raw", "") if isinstance(response, dict) else ""
123            error = response.get("parsing_error") if isinstance(response, dict) else None
124            raise ParsingError(
125                message="Failed to parse query into structured format. "
126                "LLM may have returned invalid JSON or missed required fields.",
127                raw_response=str(raw),
128                original_error=error,
129            )
130
131        assert isinstance(parsed, GeoQuery), "Parsed result must be GeoQuery"
132        return parsed
133
134    def _finalize(self, geo_query: GeoQuery, query: str) -> GeoQuery:
135        """Set original_query and run the validation pipeline."""
136        geo_query.original_query = query
137
138        return validate_query(
139            geo_query,
140            self.spatial_config,
141            confidence_threshold=self.confidence_threshold,
142            strict_mode=self.strict_mode,
143        )
144
145    def parse(self, query: str) -> GeoQuery:
146        """
147        Parse a natural language location query into structured format.
148
149        This is the main method for parsing queries. It:
150        1. Invokes the LLM with structured output
151        2. Validates the spatial relation is registered
152        3. Enriches with default parameters
153        4. Checks confidence threshold
154
155        Args:
156            query: Natural language query in any language
157
158        Returns:
159            GeoQuery: Structured query representation with confidence scores
160
161        Raises:
162            ParsingError: If LLM fails to parse query into valid structure
163            ValidationError: If parsed query fails business logic validation
164            UnknownRelationError: If spatial relation is not registered
165            LowConfidenceError: If confidence below threshold (strict mode only)
166
167        Warns:
168            LowConfidenceWarning: If confidence below threshold (permissive mode)
169
170        Examples:
171            Simple containment query:
172            >>> result = parser.parse("in Bern")
173            >>> result.reference_location.name
174            'Bern'
175            >>> result.spatial_relation.relation
176            'in'
177
178            Buffer query:
179            >>> result = parser.parse("near Lake Geneva")
180            >>> result.spatial_relation.relation
181            'near'
182            >>> result.buffer_config.distance_m
183            5000
184
185            Directional query:
186            >>> result = parser.parse("north of Lausanne")
187            >>> result.spatial_relation.relation
188            'north_of'
189            >>> result.reference_location.name
190            'Lausanne'
191
192            Multilingual:
193            >>> result = parser.parse("près de Genève")
194            >>> result.spatial_relation.relation
195            'near'
196            >>> result.reference_location.name
197            'Genève'
198        """
199        formatted_messages = self.prompt.format_messages(query=query)
200
201        try:
202            response = self.structured_llm.invoke(formatted_messages)
203        except Exception as e:
204            raise ParsingError(
205                message=f"LLM invocation failed: {str(e)}",
206                raw_response="",
207                original_error=e,
208            ) from e
209
210        return self._finalize(self._unpack_response(response), query)
211
212    async def aparse(self, query: str) -> GeoQuery:
213        """
214        Asynchronously parse a natural language location query into structured format.
215
216        Async counterpart to :meth:`parse`. Uses ``ainvoke`` on the structured LLM
217        so it can be awaited inside event loops (e.g. FastAPI endpoints) without
218        blocking. Validation is synchronous and runs after the LLM call.
219        """
220        formatted_messages = self.prompt.format_messages(query=query)
221
222        try:
223            response = await self.structured_llm.ainvoke(formatted_messages)
224        except Exception as e:
225            raise ParsingError(
226                message=f"LLM invocation failed: {str(e)}",
227                raw_response="",
228                original_error=e,
229            ) from e
230
231        return self._finalize(self._unpack_response(response), query)
232
233    async def parse_stream(self, query: str) -> AsyncGenerator[dict]:
234        """
235        Parse a natural language location query with streaming reasoning and results.
236
237        This method provides real-time feedback during the parsing process by yielding
238        intermediate reasoning steps and the final GeoQuery result. This is useful for
239        providing users with transparency into the LLM's decision-making process and
240        for building responsive UIs.
241
242        The stream yields dictionaries with the following event types:
243        - {"type": "start"} - Stream started
244        - {"type": "reasoning", "content": str} - Intermediate processing steps
245        - {"type": "data-response", "content": dict} - Final GeoQuery as JSON
246        - {"type": "error", "content": str} - Errors encountered during processing
247        - {"type": "finish"} - Stream completed successfully
248
249        Args:
250            query: Natural language query in any language
251
252        Yields:
253            dict: Stream events with type and optional content fields
254
255        Raises:
256            ParsingError: If LLM fails to parse query into valid structure
257            ValidationError: If parsed query fails business logic validation
258            UnknownRelationError: If spatial relation is not registered
259            LowConfidenceError: If confidence below threshold (strict mode only)
260
261        Examples:
262            Basic usage with async iteration:
263            >>> async for event in parser.parse_stream("restaurants near Lake Geneva"):
264            ...     if event["type"] == "reasoning":
265            ...         print(f"Reasoning: {event['content']}")
266            ...     elif event["type"] == "data-response":
267            ...         geo_query = event["content"]
268            ...         print(f"Location: {geo_query['reference_location']['name']}")
269            ...     elif event["type"] == "error":
270            ...         print(f"Error: {event['content']}")
271
272            Using in a FastAPI streaming endpoint:
273            >>> from fastapi.responses import StreamingResponse
274            >>> @app.get("/stream")
275            >>> async def stream_endpoint(q: str):
276            ...     async def event_stream():
277            ...         async for event in parser.parse_stream(q):
278            ...             yield f"data: {json.dumps(event)}\\n\\n"
279            ...     return StreamingResponse(event_stream(), media_type="text/event-stream")
280        """
281        try:
282            # Signal start of stream
283            yield {"type": "start"}
284
285            yield {"type": "reasoning", "content": "Preparing query for LLM processing"}
286            formatted_messages = self.prompt.format_messages(query=query)
287
288            yield {"type": "reasoning", "content": "Analyzing spatial relationship and location"}
289            try:
290                response = await self.structured_llm.ainvoke(formatted_messages)
291            except Exception as e:
292                yield {"type": "error", "content": f"LLM invocation failed: {str(e)}"}
293                raise ParsingError(
294                    message=f"LLM invocation failed: {str(e)}",
295                    raw_response="",
296                    original_error=e,
297                ) from e
298
299            yield {"type": "reasoning", "content": "Parsing LLM response into structured format"}
300            try:
301                geo_query = self._unpack_response(response)
302            except ParsingError:
303                yield {"type": "error", "content": "Failed to parse response - invalid JSON or missing fields"}
304                raise
305
306            if geo_query.confidence_breakdown.reasoning:
307                yield {
308                    "type": "reasoning",
309                    "content": f"LLM reasoning: {geo_query.confidence_breakdown.reasoning}",
310                }
311
312            yield {"type": "reasoning", "content": "Validating spatial relation configuration"}
313            geo_query = self._finalize(geo_query, query)
314
315            yield {"type": "reasoning", "content": "Query parsing completed successfully"}
316            yield {"type": "data-response", "content": geo_query.model_dump()}
317
318            # Signal successful completion
319            yield {"type": "finish"}
320
321        except Exception as e:
322            # Emit error event before re-raising
323            yield {"type": "error", "content": f"Error during parsing: {str(e)}"}
324            raise
325
326    def parse_batch(self, queries: list[str]) -> list[GeoQuery]:
327        """
328        Parse multiple queries in batch.
329
330        Note: This is a simple sequential implementation.
331        For true parallelization, consider using async methods or ThreadPoolExecutor.
332
333        Args:
334            queries: List of natural language queries
335
336        Returns:
337            List of GeoQuery objects (same order as input)
338
339        Raises:
340            Same exceptions as parse() for any failing query
341        """
342        return [self.parse(query) for query in queries]
343
344    def get_available_relations(self, category: RelationCategory | None = None) -> list[str]:
345        """
346        Get list of available spatial relations.
347
348        Args:
349            category: Optional filter by category ("containment", "buffer", "directional")
350
351        Returns:
352            List of relation names
353        """
354        return self.spatial_config.list_relations(category=category)
355
356    def describe_relation(self, relation_name: str) -> str:
357        """
358        Get description of a spatial relation.
359
360        Args:
361            relation_name: Name of the relation
362
363        Returns:
364            Human-readable description
365
366        Raises:
367            UnknownRelationError: If relation is not registered
368        """
369        config = self.spatial_config.get_config(relation_name)
370        return config.description

Main entry point for parsing natural language location queries.

This class orchestrates the entire parsing pipeline:

  1. Initialize LLM with structured output
  2. Build prompt with spatial relations and examples
  3. Parse query through LLM
  4. Validate and enrich with defaults
  5. Return structured GeoQuery
Examples:

Basic usage:

>>> from langchain.chat_models import init_chat_model
>>> llm = init_chat_model(model="gpt-4o", model_provider="openai", api_key="sk-...")
>>> parser = GeoFilterParser(llm=llm)
>>> result = parser.parse("restaurants in Lausanne")
>>> print(result.reference_location.name)
'Lausanne'

With strict confidence mode:

>>> parser = GeoFilterParser(llm=llm, confidence_threshold=0.8, strict_mode=True)
>>> result = parser.parse("near the station")  # May raise LowConfidenceError
GeoFilterParser( llm: langchain_core.language_models.chat_models.BaseChatModel, spatial_config: SpatialRelationConfig | None = None, confidence_threshold: float = 0.6, strict_mode: bool = False, include_examples: bool = True, datasource: GeoDataSource | None = None, additional_instructions: str | None = None)
44    def __init__(
45        self,
46        llm: BaseChatModel,
47        spatial_config: SpatialRelationConfig | None = None,
48        confidence_threshold: float = 0.6,
49        strict_mode: bool = False,
50        include_examples: bool = True,
51        datasource: GeoDataSource | None = None,
52        additional_instructions: str | None = None,
53    ):
54        """
55        Initialize the parser.
56
57        Args:
58            llm: LangChain LLM instance (required).
59            spatial_config: Spatial relation configuration. If None, uses defaults
60            confidence_threshold: Minimum confidence to accept (0-1)
61            strict_mode: If True, raise error on low confidence. If False, warn only
62            include_examples: Whether to include few-shot examples in prompt
63            datasource: Optional GeoDataSource instance. If provided, the LLM will be informed
64                       about the concrete types available in that datasource for better type inference.
65            additional_instructions: Free-form text injected as a system message after the main
66                       system prompt and before few-shot examples. Use this to add caller-specific
67                       rules such as region-specific endonyms, domain aliases, or
68                       organization-specific place names without forking the default prompt.
69
70        Example:
71            >>> from langchain.chat_models import init_chat_model
72            >>> from etter.datasources.swissnames3d import SwissNames3DSource
73            >>> llm = init_chat_model(model="gpt-4o", model_provider="openai", temperature=0)
74            >>> datasource = SwissNames3DSource("data/")
75            >>> parser = GeoFilterParser(llm=llm, datasource=datasource)
76        """
77        self.llm = llm
78
79        # Initialize spatial config
80        self.spatial_config = spatial_config or SpatialRelationConfig()
81
82        # Settings
83        self.confidence_threshold = confidence_threshold
84        self.strict_mode = strict_mode
85        self.include_examples = include_examples
86        self.datasource = datasource
87        self.additional_instructions = additional_instructions
88
89        # Build structured LLM
90        self.structured_llm = self._build_structured_llm()
91
92        # Build prompt template
93        self.prompt = self._build_prompt()

Initialize the parser.

Arguments:
  • llm: LangChain LLM instance (required).
  • spatial_config: Spatial relation configuration. If None, uses defaults
  • confidence_threshold: Minimum confidence to accept (0-1)
  • strict_mode: If True, raise error on low confidence. If False, warn only
  • include_examples: Whether to include few-shot examples in prompt
  • datasource: Optional GeoDataSource instance. If provided, the LLM will be informed about the concrete types available in that datasource for better type inference.
  • additional_instructions: Free-form text injected as a system message after the main system prompt and before few-shot examples. Use this to add caller-specific rules such as region-specific endonyms, domain aliases, or organization-specific place names without forking the default prompt.
Example:
>>> from langchain.chat_models import init_chat_model
>>> from etter.datasources.swissnames3d import SwissNames3DSource
>>> llm = init_chat_model(model="gpt-4o", model_provider="openai", temperature=0)
>>> datasource = SwissNames3DSource("data/")
>>> parser = GeoFilterParser(llm=llm, datasource=datasource)
llm
spatial_config
confidence_threshold
strict_mode
include_examples
datasource
additional_instructions
structured_llm
prompt
def parse(self, query: str) -> GeoQuery:
145    def parse(self, query: str) -> GeoQuery:
146        """
147        Parse a natural language location query into structured format.
148
149        This is the main method for parsing queries. It:
150        1. Invokes the LLM with structured output
151        2. Validates the spatial relation is registered
152        3. Enriches with default parameters
153        4. Checks confidence threshold
154
155        Args:
156            query: Natural language query in any language
157
158        Returns:
159            GeoQuery: Structured query representation with confidence scores
160
161        Raises:
162            ParsingError: If LLM fails to parse query into valid structure
163            ValidationError: If parsed query fails business logic validation
164            UnknownRelationError: If spatial relation is not registered
165            LowConfidenceError: If confidence below threshold (strict mode only)
166
167        Warns:
168            LowConfidenceWarning: If confidence below threshold (permissive mode)
169
170        Examples:
171            Simple containment query:
172            >>> result = parser.parse("in Bern")
173            >>> result.reference_location.name
174            'Bern'
175            >>> result.spatial_relation.relation
176            'in'
177
178            Buffer query:
179            >>> result = parser.parse("near Lake Geneva")
180            >>> result.spatial_relation.relation
181            'near'
182            >>> result.buffer_config.distance_m
183            5000
184
185            Directional query:
186            >>> result = parser.parse("north of Lausanne")
187            >>> result.spatial_relation.relation
188            'north_of'
189            >>> result.reference_location.name
190            'Lausanne'
191
192            Multilingual:
193            >>> result = parser.parse("près de Genève")
194            >>> result.spatial_relation.relation
195            'near'
196            >>> result.reference_location.name
197            'Genève'
198        """
199        formatted_messages = self.prompt.format_messages(query=query)
200
201        try:
202            response = self.structured_llm.invoke(formatted_messages)
203        except Exception as e:
204            raise ParsingError(
205                message=f"LLM invocation failed: {str(e)}",
206                raw_response="",
207                original_error=e,
208            ) from e
209
210        return self._finalize(self._unpack_response(response), query)

Parse a natural language location query into structured format.

This is the main method for parsing queries. It:

  1. Invokes the LLM with structured output
  2. Validates the spatial relation is registered
  3. Enriches with default parameters
  4. Checks confidence threshold
Arguments:
  • query: Natural language query in any language
Returns:

GeoQuery: Structured query representation with confidence scores

Raises:
  • ParsingError: If LLM fails to parse query into valid structure
  • ValidationError: If parsed query fails business logic validation
  • UnknownRelationError: If spatial relation is not registered
  • LowConfidenceError: If confidence below threshold (strict mode only)
Warns:

LowConfidenceWarning: If confidence below threshold (permissive mode)

Examples:

Simple containment query:

>>> result = parser.parse("in Bern")
>>> result.reference_location.name
'Bern'
>>> result.spatial_relation.relation
'in'

Buffer query:

>>> result = parser.parse("near Lake Geneva")
>>> result.spatial_relation.relation
'near'
>>> result.buffer_config.distance_m
5000

Directional query:

>>> result = parser.parse("north of Lausanne")
>>> result.spatial_relation.relation
'north_of'
>>> result.reference_location.name
'Lausanne'

Multilingual:

>>> result = parser.parse("près de Genève")
>>> result.spatial_relation.relation
'near'
>>> result.reference_location.name
'Genève'
async def aparse(self, query: str) -> GeoQuery:
212    async def aparse(self, query: str) -> GeoQuery:
213        """
214        Asynchronously parse a natural language location query into structured format.
215
216        Async counterpart to :meth:`parse`. Uses ``ainvoke`` on the structured LLM
217        so it can be awaited inside event loops (e.g. FastAPI endpoints) without
218        blocking. Validation is synchronous and runs after the LLM call.
219        """
220        formatted_messages = self.prompt.format_messages(query=query)
221
222        try:
223            response = await self.structured_llm.ainvoke(formatted_messages)
224        except Exception as e:
225            raise ParsingError(
226                message=f"LLM invocation failed: {str(e)}",
227                raw_response="",
228                original_error=e,
229            ) from e
230
231        return self._finalize(self._unpack_response(response), query)

Asynchronously parse a natural language location query into structured format.

Async counterpart to parse(). Uses ainvoke on the structured LLM so it can be awaited inside event loops (e.g. FastAPI endpoints) without blocking. Validation is synchronous and runs after the LLM call.

async def parse_stream(self, query: str) -> AsyncGenerator[dict]:
233    async def parse_stream(self, query: str) -> AsyncGenerator[dict]:
234        """
235        Parse a natural language location query with streaming reasoning and results.
236
237        This method provides real-time feedback during the parsing process by yielding
238        intermediate reasoning steps and the final GeoQuery result. This is useful for
239        providing users with transparency into the LLM's decision-making process and
240        for building responsive UIs.
241
242        The stream yields dictionaries with the following event types:
243        - {"type": "start"} - Stream started
244        - {"type": "reasoning", "content": str} - Intermediate processing steps
245        - {"type": "data-response", "content": dict} - Final GeoQuery as JSON
246        - {"type": "error", "content": str} - Errors encountered during processing
247        - {"type": "finish"} - Stream completed successfully
248
249        Args:
250            query: Natural language query in any language
251
252        Yields:
253            dict: Stream events with type and optional content fields
254
255        Raises:
256            ParsingError: If LLM fails to parse query into valid structure
257            ValidationError: If parsed query fails business logic validation
258            UnknownRelationError: If spatial relation is not registered
259            LowConfidenceError: If confidence below threshold (strict mode only)
260
261        Examples:
262            Basic usage with async iteration:
263            >>> async for event in parser.parse_stream("restaurants near Lake Geneva"):
264            ...     if event["type"] == "reasoning":
265            ...         print(f"Reasoning: {event['content']}")
266            ...     elif event["type"] == "data-response":
267            ...         geo_query = event["content"]
268            ...         print(f"Location: {geo_query['reference_location']['name']}")
269            ...     elif event["type"] == "error":
270            ...         print(f"Error: {event['content']}")
271
272            Using in a FastAPI streaming endpoint:
273            >>> from fastapi.responses import StreamingResponse
274            >>> @app.get("/stream")
275            >>> async def stream_endpoint(q: str):
276            ...     async def event_stream():
277            ...         async for event in parser.parse_stream(q):
278            ...             yield f"data: {json.dumps(event)}\\n\\n"
279            ...     return StreamingResponse(event_stream(), media_type="text/event-stream")
280        """
281        try:
282            # Signal start of stream
283            yield {"type": "start"}
284
285            yield {"type": "reasoning", "content": "Preparing query for LLM processing"}
286            formatted_messages = self.prompt.format_messages(query=query)
287
288            yield {"type": "reasoning", "content": "Analyzing spatial relationship and location"}
289            try:
290                response = await self.structured_llm.ainvoke(formatted_messages)
291            except Exception as e:
292                yield {"type": "error", "content": f"LLM invocation failed: {str(e)}"}
293                raise ParsingError(
294                    message=f"LLM invocation failed: {str(e)}",
295                    raw_response="",
296                    original_error=e,
297                ) from e
298
299            yield {"type": "reasoning", "content": "Parsing LLM response into structured format"}
300            try:
301                geo_query = self._unpack_response(response)
302            except ParsingError:
303                yield {"type": "error", "content": "Failed to parse response - invalid JSON or missing fields"}
304                raise
305
306            if geo_query.confidence_breakdown.reasoning:
307                yield {
308                    "type": "reasoning",
309                    "content": f"LLM reasoning: {geo_query.confidence_breakdown.reasoning}",
310                }
311
312            yield {"type": "reasoning", "content": "Validating spatial relation configuration"}
313            geo_query = self._finalize(geo_query, query)
314
315            yield {"type": "reasoning", "content": "Query parsing completed successfully"}
316            yield {"type": "data-response", "content": geo_query.model_dump()}
317
318            # Signal successful completion
319            yield {"type": "finish"}
320
321        except Exception as e:
322            # Emit error event before re-raising
323            yield {"type": "error", "content": f"Error during parsing: {str(e)}"}
324            raise

Parse a natural language location query with streaming reasoning and results.

This method provides real-time feedback during the parsing process by yielding intermediate reasoning steps and the final GeoQuery result. This is useful for providing users with transparency into the LLM's decision-making process and for building responsive UIs.

The stream yields dictionaries with the following event types:

  • {"type": "start"} - Stream started
  • {"type": "reasoning", "content": str} - Intermediate processing steps
  • {"type": "data-response", "content": dict} - Final GeoQuery as a dict (the result of model_dump())
  • {"type": "error", "content": str} - Errors encountered during processing
  • {"type": "finish"} - Stream completed successfully
Arguments:
  • query: Natural language query in any language
Yields:

dict: Stream events with type and optional content fields

Raises:
  • ParsingError: If LLM fails to parse query into valid structure
  • ValidationError: If parsed query fails business logic validation
  • UnknownRelationError: If spatial relation is not registered
  • LowConfidenceError: If confidence below threshold (strict mode only)
Examples:

Basic usage with async iteration:

>>> async for event in parser.parse_stream("restaurants near Lake Geneva"):
...     if event["type"] == "reasoning":
...         print(f"Reasoning: {event['content']}")
...     elif event["type"] == "data-response":
...         geo_query = event["content"]
...         print(f"Location: {geo_query['reference_location']['name']}")
...     elif event["type"] == "error":
...         print(f"Error: {event['content']}")

Using in a FastAPI streaming endpoint:

>>> from fastapi.responses import StreamingResponse
>>> @app.get("/stream")
... async def stream_endpoint(q: str):
...     async def event_stream():
...         async for event in parser.parse_stream(q):
...             yield f"data: {json.dumps(event)}\n\n"
...     return StreamingResponse(event_stream(), media_type="text/event-stream")
def parse_batch(self, queries: list[str]) -> list[GeoQuery]:
326    def parse_batch(self, queries: list[str]) -> list[GeoQuery]:
327        """
328        Parse multiple queries in batch.
329
330        Note: This is a simple sequential implementation.
331        For true parallelization, consider using async methods or ThreadPoolExecutor.
332
333        Args:
334            queries: List of natural language queries
335
336        Returns:
337            List of GeoQuery objects (same order as input)
338
339        Raises:
340            Same exceptions as parse(); the first failing query aborts the batch (no partial results)
341        """
342        return [self.parse(query) for query in queries]

Parse multiple queries in batch.

Note: This is a simple sequential implementation. For true parallelization, consider using async methods or ThreadPoolExecutor.

Arguments:
  • queries: List of natural language queries
Returns:

List of GeoQuery objects (same order as input)

Raises:
  • Same exceptions as parse(); the first failing query aborts the batch (no partial results)
def get_available_relations( self, category: Optional[Literal['containment', 'buffer', 'directional', 'clipping']] = None) -> list[str]:
344    def get_available_relations(self, category: RelationCategory | None = None) -> list[str]:
345        """
346        Get list of available spatial relations.
347
348        Args:
349            category: Optional filter by category ("containment", "buffer", "directional", "clipping")
350
351        Returns:
352            List of relation names
353        """
354        return self.spatial_config.list_relations(category=category)

Get list of available spatial relations.

Arguments:
  • category: Optional filter by category ("containment", "buffer", "directional", "clipping")
Returns:

List of relation names

def describe_relation(self, relation_name: str) -> str:
356    def describe_relation(self, relation_name: str) -> str:
357        """
358        Get description of a spatial relation.
359
360        Args:
361            relation_name: Name of the relation
362
363        Returns:
364            Human-readable description
365
366        Raises:
367            UnknownRelationError: If relation is not registered
368        """
369        config = self.spatial_config.get_config(relation_name)
370        return config.description

Get description of a spatial relation.

Arguments:
  • relation_name: Name of the relation
Returns:

Human-readable description

Raises:
  • UnknownRelationError: If relation is not registered
class GeoQuery(pydantic.main.BaseModel):
123class GeoQuery(BaseModel):
124    """
125    Root model representing a parsed geographic query.
126    This is the main output structure returned by the parser.
127    """
128
129    query_type: Literal["simple", "compound", "split", "boolean"] = Field(
130        "simple",
131        description="Type of query. Phase 1 only supports 'simple'. "
132        "Future: 'compound' = multi-step, 'split' = area division, 'boolean' = AND/OR/NOT operations",
133    )
134    spatial_relation: SpatialRelation = Field(description="Spatial relationship to reference location")
135    reference_location: ReferenceLocation | None = Field(
136        None,
137        description="Reference location for the spatial query. "
138        "None when the query contains no named geographic location.",
139    )
140    buffer_config: BufferConfig | None = Field(
141        None,
142        description="Buffer configuration for buffer and directional relations. "
143        "Auto-generated with defaults by enrich_with_defaults() if not provided. "
144        "Required for 'near', 'around', 'north_of', etc. "
145        "Set to None for containment relations ('in').",
146    )
147    confidence_breakdown: ConfidenceScore = Field(description="Confidence scores for different aspects of the parse")
148    original_query: str = Field(
149        default="",
150        description="Original query text exactly as provided by the user",
151    )
152
153    @model_validator(mode="after")
154    def validate_buffer_config_consistency(self) -> "GeoQuery":
155        """Validate buffer_config consistency with relation category."""
156        # Buffer and directional relations must have buffer_config
157        if self.spatial_relation.category in ("buffer", "directional") and self.buffer_config is None:
158            raise ValueError(
159                f"{self.spatial_relation.category} relation '{self.spatial_relation.relation}' requires buffer_config"
160            )
161
162        # Containment and clipping relations should not have buffer_config
163        if self.spatial_relation.category in ("containment", "clipping") and self.buffer_config is not None:
164            raise ValueError(
165                f"{self.spatial_relation.category} relation '{self.spatial_relation.relation}' "
166                f"should not have buffer_config"
167            )
168
169        return self

Root model representing a parsed geographic query. This is the main output structure returned by the parser.

query_type: Literal['simple', 'compound', 'split', 'boolean'] = 'simple'

Type of query. Phase 1 only supports 'simple'. Future: 'compound' = multi-step, 'split' = area division, 'boolean' = AND/OR/NOT operations

spatial_relation: SpatialRelation = PydanticUndefined

Spatial relationship to reference location

reference_location: ReferenceLocation | None = None

Reference location for the spatial query. None when the query contains no named geographic location.

buffer_config: BufferConfig | None = None

Buffer configuration for buffer and directional relations. Auto-generated with defaults by enrich_with_defaults() if not provided. Required for 'near', 'around', 'north_of', etc. Set to None for containment relations ('in').

confidence_breakdown: ConfidenceScore = PydanticUndefined

Confidence scores for different aspects of the parse

original_query: str = ''

Original query text exactly as provided by the user

@model_validator(mode='after')
def validate_buffer_config_consistency(self) -> GeoQuery:
153    @model_validator(mode="after")
154    def validate_buffer_config_consistency(self) -> "GeoQuery":
155        """Validate buffer_config consistency with relation category."""
156        # Buffer and directional relations must have buffer_config
157        if self.spatial_relation.category in ("buffer", "directional") and self.buffer_config is None:
158            raise ValueError(
159                f"{self.spatial_relation.category} relation '{self.spatial_relation.relation}' requires buffer_config"
160            )
161
162        # Containment and clipping relations should not have buffer_config
163        if self.spatial_relation.category in ("containment", "clipping") and self.buffer_config is not None:
164            raise ValueError(
165                f"{self.spatial_relation.category} relation '{self.spatial_relation.relation}' "
166                f"should not have buffer_config"
167            )
168
169        return self

Validate buffer_config consistency with relation category.

class SpatialRelation(pydantic.main.BaseModel):
101class SpatialRelation(BaseModel):
102    """A spatial relationship between target and reference."""
103
104    relation: str = Field(
105        description="Spatial relation keyword. Examples: 'in', 'near', 'around', 'north_of', "
106        "'on_shores_of', 'in_the_heart_of', etc. Use the exact relation name from the available list."
107    )
108    category: RelationCategory = Field(
109        description="Category of spatial relation. "
110        "'containment' = exact boundary matching (in), "
111        "'buffer' = proximity or erosion operations (near, around, on_shores_of, in_the_heart_of, bordering), "
112        "'directional' = sector-based queries (north_of, south_of, east_of, west_of), "
113        "'clipping' = clip reference to a directional half (northern_part_of, southern_part_of, etc.)"
114    )
115    explicit_distance: float | None = Field(
116        None,
117        description="Distance in meters if explicitly mentioned by user. "
118        "For example: 'within 5km' → 5000, 'within 500 meters' → 500. "
119        "Leave null if not explicitly stated.",
120    )

A spatial relationship between target and reference.

relation: str = PydanticUndefined

Spatial relation keyword. Examples: 'in', 'near', 'around', 'north_of', 'on_shores_of', 'in_the_heart_of', etc. Use the exact relation name from the available list.

category: Literal['containment', 'buffer', 'directional', 'clipping'] = PydanticUndefined

Category of spatial relation. 'containment' = exact boundary matching (in), 'buffer' = proximity or erosion operations (near, around, on_shores_of, in_the_heart_of, bordering), 'directional' = sector-based queries (north_of, south_of, east_of, west_of), 'clipping' = clip reference to a directional half (northern_part_of, southern_part_of, etc.)

explicit_distance: float | None = None

Distance in meters if explicitly mentioned by user. For example: 'within 5km' → 5000, 'within 500 meters' → 500. Leave null if not explicitly stated.

class ReferenceLocation(pydantic.main.BaseModel):
41class ReferenceLocation(BaseModel):
42    """A geographic reference location extracted from the query."""
43
44    name: str = Field(description="Location name as mentioned in the query (e.g., 'Lausanne', 'Lake Geneva')")
45    # FIXME: decide whether `type` should be an enum of known feature types instead of a free-form string
46    type: str | None = Field(
47        None,
48        description="Type hint for geographic feature (city, lake, mountain, canton, country, "
49        "train_station, airport, river, road, etc.). This is a HINT for ranking results, "
50        "NOT a strict filter. For ambiguous cases (e.g., 'Bern' could be city or canton, "
51        "'Rhone' could be river or road), provide your best guess or leave null. "
52        "The datasource will return multiple types ranked by relevance.",
53    )
54    type_confidence: ConfidenceLevel | None = Field(
55        None,
56        description="Confidence in the type inference (0-1). High confidence (>0.8) when type is "
57        "explicit in query (e.g., 'Lake Geneva'). Low confidence (<0.6) when ambiguous "
58        "(e.g., 'Bern', 'Rhone'). Use spatial relation as hint: 'along X' → river/road, "
59        "'in X' → city/region, 'on X' → lake/mountain.",
60    )

A geographic reference location extracted from the query.

name: str = PydanticUndefined

Location name as mentioned in the query (e.g., 'Lausanne', 'Lake Geneva')

type: str | None = None

Type hint for geographic feature (city, lake, mountain, canton, country, train_station, airport, river, road, etc.). This is a HINT for ranking results, NOT a strict filter. For ambiguous cases (e.g., 'Bern' could be city or canton, 'Rhone' could be river or road), provide your best guess or leave null. The datasource will return multiple types ranked by relevance.

type_confidence: Optional[Annotated[float, FieldInfo(annotation=NoneType, required=True, description='Confidence score between 0 and 1', metadata=[Ge(ge=0.0), Le(le=1.0)])]] = None

Confidence in the type inference (0-1). High confidence (>0.8) when type is explicit in query (e.g., 'Lake Geneva'). Low confidence (<0.6) when ambiguous (e.g., 'Bern', 'Rhone'). Use spatial relation as hint: 'along X' → river/road, 'in X' → city/region, 'on X' → lake/mountain.

class BufferConfig(pydantic.main.BaseModel):
63class BufferConfig(BaseModel):
64    """Configuration for buffer-based spatial operations."""
65
66    distance_m: float = Field(
67        description="Buffer distance in meters. Positive values expand outward (proximity), "
68        "negative values erode inward (e.g., 'in the heart of'). "
69        "Examples: 5000 = 5km radius, -500 = 500m erosion"
70    )
71    buffer_from: Literal["center", "boundary"] = Field(
72        description="Buffer origin. 'center' = buffer from centroid point (for proximity), "
73        "'boundary' = buffer from polygon boundary (for shores, along roads, erosion)"
74    )
75    ring_only: bool = Field(
76        False,
77        description="If True, exclude the reference feature itself to create a ring/donut shape. "
78        "Used for queries like 'on the shores of Lake X' (exclude the lake water itself). "
79        "Only valid with buffer_from='boundary'.",
80    )
81    side: Literal["left", "right"] | None = Field(
82        None,
83        description="Side of a linear feature for one-sided buffer. "
84        "'left' = left side relative to line direction, 'right' = right side. "
85        "None = both sides (symmetric buffer). Populated from relation config by enrich_with_defaults().",
86    )
87    inferred: bool = Field(
88        True,
89        description="True if this configuration was inferred from relation defaults. "
90        "False if the user explicitly specified distance or buffer parameters.",
91    )
92
93    @model_validator(mode="after")
94    def validate_ring_only(self) -> "BufferConfig":
95        """Validate that ring_only is only used with boundary buffers."""
96        if self.ring_only and self.buffer_from == "center":
97            raise ValueError("ring_only=True requires buffer_from='boundary' (cannot create ring from center point)")
98        return self

Configuration for buffer-based spatial operations.

distance_m: float = PydanticUndefined

Buffer distance in meters. Positive values expand outward (proximity), negative values erode inward (e.g., 'in the heart of'). Examples: 5000 = 5km radius, -500 = 500m erosion

buffer_from: Literal['center', 'boundary'] = PydanticUndefined

Buffer origin. 'center' = buffer from centroid point (for proximity), 'boundary' = buffer from polygon boundary (for shores, along roads, erosion)

ring_only: bool = False

If True, exclude the reference feature itself to create a ring/donut shape. Used for queries like 'on the shores of Lake X' (exclude the lake water itself). Only valid with buffer_from='boundary'.

side: Optional[Literal['left', 'right']] = None

Side of a linear feature for one-sided buffer. 'left' = left side relative to line direction, 'right' = right side. None = both sides (symmetric buffer). Populated from relation config by enrich_with_defaults().

inferred: bool = True

True if this configuration was inferred from relation defaults. False if the user explicitly specified distance or buffer parameters.

@model_validator(mode='after')
def validate_ring_only(self) -> BufferConfig:
93    @model_validator(mode="after")
94    def validate_ring_only(self) -> "BufferConfig":
95        """Validate that ring_only is only used with boundary buffers."""
96        if self.ring_only and self.buffer_from == "center":
97            raise ValueError("ring_only=True requires buffer_from='boundary' (cannot create ring from center point)")
98        return self

Validate that ring_only is only used with boundary buffers.

class ConfidenceScore(pydantic.main.BaseModel):
21class ConfidenceScore(BaseModel):
22    """Confidence scores for different aspects of the parsed query."""
23
24    overall: ConfidenceLevel = Field(
25        description="Overall confidence score for the entire query parse. "
26        "0.9-1.0 = highly confident, 0.7-0.9 = confident, 0.5-0.7 = uncertain, <0.5 = very uncertain",
27    )
28    location_confidence: ConfidenceLevel = Field(
29        description="Confidence in correctly identifying the reference location",
30    )
31    relation_confidence: ConfidenceLevel = Field(
32        description="Confidence in correctly identifying the spatial relation",
33    )
34    reasoning: str | None = Field(
35        None,
36        description="Explanation for confidence scores. Always include reasoning for clarity and debugging. "
37        "For example: 'Ambiguous location name', 'Unclear spatial relationship', 'High confidence in location matching', etc.",
38    )

Confidence scores for different aspects of the parsed query.

overall: Annotated[float, FieldInfo(annotation=NoneType, required=True, description='Confidence score between 0 and 1', metadata=[Ge(ge=0.0), Le(le=1.0)])] = PydanticUndefined

Overall confidence score for the entire query parse. 0.9-1.0 = highly confident, 0.7-0.9 = confident, 0.5-0.7 = uncertain, <0.5 = very uncertain

location_confidence: Annotated[float, FieldInfo(annotation=NoneType, required=True, description='Confidence score between 0 and 1', metadata=[Ge(ge=0.0), Le(le=1.0)])] = PydanticUndefined

Confidence in correctly identifying the reference location

relation_confidence: Annotated[float, FieldInfo(annotation=NoneType, required=True, description='Confidence score between 0 and 1', metadata=[Ge(ge=0.0), Le(le=1.0)])] = PydanticUndefined

Confidence in correctly identifying the spatial relation

reasoning: str | None = None

Explanation for confidence scores. Always include reasoning for clarity and debugging. For example: 'Ambiguous location name', 'Unclear spatial relationship', 'High confidence in location matching', etc.

ConfidenceLevel = typing.Annotated[float, FieldInfo(annotation=NoneType, required=True, description='Confidence score between 0 and 1', metadata=[Ge(ge=0.0), Le(le=1.0)])]
GeometryFormat = typing.Literal['geojson', 'wkt', 'wkb']
class SpatialRelationConfig:
 41class SpatialRelationConfig:
 42    """
 43    Registry and configuration for spatial relations.
 44
 45    Manages built-in and custom spatial relations with their default parameters.
 46    """
 47
 48    def __init__(self):
 49        """Initialize with built-in spatial relations."""
 50        self.relations: dict[str, RelationConfig] = {}
 51        self._initialize_defaults()
 52
 53    def _initialize_defaults(self):
 54        """Register built-in spatial relations from ARCHITECTURE.md."""
 55
 56        # ===== CONTAINMENT RELATIONS =====
 57        self.register_relation(
 58            RelationConfig(
 59                name="in",
 60                category="containment",
 61                description="Feature is within the reference boundary",
 62            )
 63        )
 64
 65        # ===== BUFFER/PROXIMITY RELATIONS =====
 66        self.register_relation(
 67            RelationConfig(
 68                name="near",
 69                category="buffer",
 70                description="Proximity search with default 5km radius",
 71                default_distance_m=5000,
 72                buffer_from="center",
 73            )
 74        )
 75
 76        self.register_relation(
 77            RelationConfig(
 78                name="on_shores_of",
 79                category="buffer",
 80                description="Ring buffer around lake/water boundary, excluding the water body itself",
 81                default_distance_m=1000,
 82                buffer_from="boundary",
 83                ring_only=True,
 84            )
 85        )
 86
 87        self.register_relation(
 88            RelationConfig(
 89                name="along",
 90                category="buffer",
 91                description="Buffer following a linear feature like a river or road",
 92                default_distance_m=500,
 93                buffer_from="boundary",
 94            )
 95        )
 96
 97        self.register_relation(
 98            RelationConfig(
 99                name="left_bank",
100                category="buffer",
101                description="Left bank of a linear feature (river, road) relative to its direction/flow",
102                default_distance_m=500,
103                buffer_from="boundary",
104                side="left",
105            )
106        )
107
108        self.register_relation(
109            RelationConfig(
110                name="right_bank",
111                category="buffer",
112                description="Right bank of a linear feature (river, road) relative to its direction/flow",
113                default_distance_m=500,
114                buffer_from="boundary",
115                side="right",
116            )
117        )
118
119        self.register_relation(
120            RelationConfig(
121                name="in_the_heart_of",
122                category="buffer",
123                description="Central area excluding periphery (negative buffer - erosion)",
124                default_distance_m=-500,
125                buffer_from="boundary",
126            )
127        )
128
129        self.register_relation(
130            RelationConfig(
131                name="bordering",
132                category="buffer",
133                description="Thin ring just outside the reference boundary, for land-border adjacency queries (e.g. 'cities bordering Germany')",
134                default_distance_m=2000,
135                buffer_from="boundary",
136                ring_only=True,
137            )
138        )
139
140        # ===== CLIPPING RELATIONS =====
141        # Clip the reference geometry to a directional half-plane using bbox intersection.
142        # These answer "what is in the northern/southern/eastern/western portion of X?"
143        # as opposed to directional relations which answer "what is north/south/etc. of X?".
144        self.register_relation(
145            RelationConfig(
146                name="northern_part_of",
147                category="clipping",
148                description="Northern half of the reference geometry (bbox clip to upper half)",
149                clip_direction="north",
150            )
151        )
152
153        self.register_relation(
154            RelationConfig(
155                name="southern_part_of",
156                category="clipping",
157                description="Southern half of the reference geometry (bbox clip to lower half)",
158                clip_direction="south",
159            )
160        )
161
162        self.register_relation(
163            RelationConfig(
164                name="eastern_part_of",
165                category="clipping",
166                description="Eastern half of the reference geometry (bbox clip to right half)",
167                clip_direction="east",
168            )
169        )
170
171        self.register_relation(
172            RelationConfig(
173                name="western_part_of",
174                category="clipping",
175                description="Western half of the reference geometry (bbox clip to left half)",
176                clip_direction="west",
177            )
178        )
179
180        # ===== DIRECTIONAL RELATIONS =====
181        # All directional relations use consistent defaults:
182        # - Distance: 10km radius (default_distance_m=10000)
183        # - Sector: 90° angular wedge (sector_angle_degrees=90)
184        # - Origin: Centroid of reference location (buffer_from="center" set in enrich_with_defaults)
185        # These defaults are applied automatically by enrich_with_defaults() for any directional query.
186        # Convention: 0° = North, angles increase clockwise (90° = East, 180° = South, 270° = West)
187        self.register_relation(
188            RelationConfig(
189                name="north_of",
190                category="directional",
191                description="Directional sector north of reference",
192                default_distance_m=10000,
193                sector_angle_degrees=90,
194                direction_angle_degrees=0,
195            )
196        )
197
198        self.register_relation(
199            RelationConfig(
200                name="south_of",
201                category="directional",
202                description="Directional sector south of reference",
203                default_distance_m=10000,
204                sector_angle_degrees=90,
205                direction_angle_degrees=180,
206            )
207        )
208
209        self.register_relation(
210            RelationConfig(
211                name="east_of",
212                category="directional",
213                description="Directional sector east of reference",
214                default_distance_m=10000,
215                sector_angle_degrees=90,
216                direction_angle_degrees=90,
217            )
218        )
219
220        self.register_relation(
221            RelationConfig(
222                name="west_of",
223                category="directional",
224                description="Directional sector west of reference",
225                default_distance_m=10000,
226                sector_angle_degrees=90,
227                direction_angle_degrees=270,
228            )
229        )
230
231        # ===== DIAGONAL DIRECTIONAL RELATIONS =====
232        self.register_relation(
233            RelationConfig(
234                name="northeast_of",
235                category="directional",
236                description="Directional sector northeast of reference",
237                default_distance_m=10000,
238                sector_angle_degrees=90,
239                direction_angle_degrees=45,
240            )
241        )
242
243        self.register_relation(
244            RelationConfig(
245                name="southeast_of",
246                category="directional",
247                description="Directional sector southeast of reference",
248                default_distance_m=10000,
249                sector_angle_degrees=90,
250                direction_angle_degrees=135,
251            )
252        )
253
254        self.register_relation(
255            RelationConfig(
256                name="southwest_of",
257                category="directional",
258                description="Directional sector southwest of reference",
259                default_distance_m=10000,
260                sector_angle_degrees=90,
261                direction_angle_degrees=225,
262            )
263        )
264
265        self.register_relation(
266            RelationConfig(
267                name="northwest_of",
268                category="directional",
269                description="Directional sector northwest of reference",
270                default_distance_m=10000,
271                sector_angle_degrees=90,
272                direction_angle_degrees=315,
273            )
274        )
275
276    def register_relation(self, config: RelationConfig) -> None:
277        """Register a new spatial relation."""
278        self.relations[config.name] = config
279
280    def has_relation(self, name: str) -> bool:
281        """Check if a relation is registered."""
282        return name in self.relations
283
284    def get_config(self, name: str) -> RelationConfig:
285        """Get configuration for a relation. Raises UnknownRelationError if not found."""
286        if not self.has_relation(name):
287            raise UnknownRelationError(
288                f"Unknown spatial relation: '{name}'. Available relations: {', '.join(sorted(self.relations.keys()))}",
289                relation_name=name,
290            )
291        return self.relations[name]
292
293    def list_relations(self, category: RelationCategory | None = None) -> list[str]:
294        """List available relation names."""
295        if category is None:
296            return sorted(self.relations.keys())
297        return sorted(r.name for r in self.relations.values() if r.category == category)
298
299    def format_for_prompt(self) -> str:
300        """Format relations for inclusion in LLM prompt."""
301        lines = []
302
303        # Group by category
304        for category in get_args(RelationCategory):
305            category_relations = [r for r in self.relations.values() if r.category == category]
306            if not category_relations:
307                continue
308
309            lines.append(f"\n{category.upper()} RELATIONS:")
310
311            for rel in sorted(category_relations, key=lambda r: r.name):
312                # Build distance info
313                dist_info = ""
314                if rel.default_distance_m is not None:
315                    dist_str = f"{abs(rel.default_distance_m)}m"
316                    if rel.default_distance_m < 0:
317                        dist_info = f" (default: {dist_str} erosion)"
318                    else:
319                        dist_info = f" (default: {dist_str})"
320
321                # Build special flags
322                flags = []
323                if rel.ring_only:
324                    flags.append("ring buffer")
325                if rel.buffer_from:
326                    flags.append(f"from {rel.buffer_from}")
327                if rel.side:
328                    flags.append(f"{rel.side} side only")
329                flag_info = f" [{', '.join(flags)}]" if flags else ""
330
331                # Format line
332                lines.append(f"  • {rel.name}{dist_info}{flag_info}")
333                lines.append(f"    {rel.description}")
334
335        # Add notes
336        lines.append("\nNOTES:")
337        lines.append("  • Negative distances indicate erosion/shrinking (e.g., in_the_heart_of)")
338        lines.append("  • Ring buffers exclude the reference feature itself (e.g., shores of lake, bordering)")
339        lines.append("  • Buffer from 'center' vs 'boundary' determines buffer origin")
340        lines.append("  • Clipping relations return a sub-area of the reference geometry (not a buffer outward)")
341
342        return "\n".join(lines)

Registry and configuration for spatial relations.

Manages built-in and custom spatial relations with their default parameters.

SpatialRelationConfig()
48    def __init__(self):
49        """Initialize with built-in spatial relations."""
50        self.relations: dict[str, RelationConfig] = {}
51        self._initialize_defaults()

Initialize with built-in spatial relations.

relations: dict[str, RelationConfig]
def register_relation(self, config: RelationConfig) -> None:
276    def register_relation(self, config: RelationConfig) -> None:
277        """Register a new spatial relation."""
278        self.relations[config.name] = config

Register a new spatial relation.

def has_relation(self, name: str) -> bool:
280    def has_relation(self, name: str) -> bool:
281        """Check if a relation is registered."""
282        return name in self.relations

Check if a relation is registered.

def get_config(self, name: str) -> RelationConfig:
284    def get_config(self, name: str) -> RelationConfig:
285        """Get configuration for a relation. Raises UnknownRelationError if not found."""
286        if not self.has_relation(name):
287            raise UnknownRelationError(
288                f"Unknown spatial relation: '{name}'. Available relations: {', '.join(sorted(self.relations.keys()))}",
289                relation_name=name,
290            )
291        return self.relations[name]

Get configuration for a relation. Raises UnknownRelationError if not found.

def list_relations( self, category: Optional[Literal['containment', 'buffer', 'directional', 'clipping']] = None) -> list[str]:
293    def list_relations(self, category: RelationCategory | None = None) -> list[str]:
294        """List available relation names."""
295        if category is None:
296            return sorted(self.relations.keys())
297        return sorted(r.name for r in self.relations.values() if r.category == category)

List available relation names.

def format_for_prompt(self) -> str:
299    def format_for_prompt(self) -> str:
300        """Format relations for inclusion in LLM prompt."""
301        lines = []
302
303        # Group by category
304        for category in get_args(RelationCategory):
305            category_relations = [r for r in self.relations.values() if r.category == category]
306            if not category_relations:
307                continue
308
309            lines.append(f"\n{category.upper()} RELATIONS:")
310
311            for rel in sorted(category_relations, key=lambda r: r.name):
312                # Build distance info
313                dist_info = ""
314                if rel.default_distance_m is not None:
315                    dist_str = f"{abs(rel.default_distance_m)}m"
316                    if rel.default_distance_m < 0:
317                        dist_info = f" (default: {dist_str} erosion)"
318                    else:
319                        dist_info = f" (default: {dist_str})"
320
321                # Build special flags
322                flags = []
323                if rel.ring_only:
324                    flags.append("ring buffer")
325                if rel.buffer_from:
326                    flags.append(f"from {rel.buffer_from}")
327                if rel.side:
328                    flags.append(f"{rel.side} side only")
329                flag_info = f" [{', '.join(flags)}]" if flags else ""
330
331                # Format line
332                lines.append(f"  • {rel.name}{dist_info}{flag_info}")
333                lines.append(f"    {rel.description}")
334
335        # Add notes
336        lines.append("\nNOTES:")
337        lines.append("  • Negative distances indicate erosion/shrinking (e.g., in_the_heart_of)")
338        lines.append("  • Ring buffers exclude the reference feature itself (e.g., shores of lake, bordering)")
339        lines.append("  • Buffer from 'center' vs 'boundary' determines buffer origin")
340        lines.append("  • Clipping relations return a sub-area of the reference geometry (not a buffer outward)")
341
342        return "\n".join(lines)

Format relations for inclusion in LLM prompt.

@dataclass
class RelationConfig:
@dataclass
class RelationConfig:
    """
    Configuration for a single spatial relation.

    Attributes:
        name: Relation identifier (e.g., "in", "near", "north_of")
        category: Type of spatial operation
        description: Human-readable description for LLM prompt
        default_distance_m: Default buffer distance in meters
        buffer_from: Buffer origin ("center" or "boundary")
        ring_only: Exclude reference feature to create ring buffer
        side: Restrict the relation to one side ("left" or "right"); None when not side-specific
        sector_angle_degrees: Angular sector for directional queries
        direction_angle_degrees: Direction angle in degrees (0=North, 90=East, 180=South, 270=West, clockwise)
        clip_direction: Compass direction ("north", "south", "east", "west"); presumably the part of the
            reference geometry a clipping relation keeps (confirm against the spatial module)
    """

    name: str
    category: RelationCategory
    description: str
    default_distance_m: float | None = None
    buffer_from: Literal["center", "boundary"] | None = None
    ring_only: bool = False
    side: Literal["left", "right"] | None = None
    sector_angle_degrees: float | None = None
    direction_angle_degrees: float | None = None
    clip_direction: Literal["north", "south", "east", "west"] | None = None

Configuration for a single spatial relation.

Attributes:
  • name: Relation identifier (e.g., "in", "near", "north_of")
  • category: Type of spatial operation
  • description: Human-readable description for LLM prompt
  • default_distance_m: Default buffer distance in meters
  • buffer_from: Buffer origin
  • ring_only: Exclude reference feature to create ring buffer
  • sector_angle_degrees: Angular sector for directional queries
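  • direction_angle_degrees: Direction angle in degrees (0=North, 90=East, 180=South, 270=West, clockwise)
  • side: Restricts the relation to one side ("left" or "right"); None when not side-specific
  • clip_direction: Compass direction ("north", "south", "east", "west") for clipping relations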
RelationConfig( name: str, category: Literal['containment', 'buffer', 'directional', 'clipping'], description: str, default_distance_m: float | None = None, buffer_from: Optional[Literal['center', 'boundary']] = None, ring_only: bool = False, side: Optional[Literal['left', 'right']] = None, sector_angle_degrees: float | None = None, direction_angle_degrees: float | None = None, clip_direction: Optional[Literal['north', 'south', 'east', 'west']] = None)
name: str
category: Literal['containment', 'buffer', 'directional', 'clipping']
description: str
default_distance_m: float | None = None
buffer_from: Optional[Literal['center', 'boundary']] = None
ring_only: bool = False
side: Optional[Literal['left', 'right']] = None
sector_angle_degrees: float | None = None
direction_angle_degrees: float | None = None
clip_direction: Optional[Literal['north', 'south', 'east', 'west']] = None
class GeoFilterError(builtins.Exception):
 7class GeoFilterError(Exception):
 8    """Base exception for all GeoFilter errors."""
 9
10    pass

Base exception for all GeoFilter errors.

class ParsingError(etter.GeoFilterError):
class ParsingError(GeoFilterError):
    """Raised when the LLM output cannot be parsed into a valid structure."""

    def __init__(self, message: str, raw_response: str = "", original_error: Exception | None = None):
        """
        Build the error and keep the parsing context on the instance.

        Args:
            message: Error description
            raw_response: Raw response from LLM
            original_error: Original exception that caused parsing failure
        """
        super().__init__(message)
        self.raw_response, self.original_error = raw_response, original_error

LLM failed to parse query into valid structure.

ParsingError( message: str, raw_response: str = '', original_error: Exception | None = None)
16    def __init__(self, message: str, raw_response: str = "", original_error: Exception | None = None):
17        """
18        Initialize parsing error.
19
20        Args:
21            message: Error description
22            raw_response: Raw response from LLM
23            original_error: Original exception that caused parsing failure
24        """
25        self.raw_response = raw_response
26        self.original_error = original_error
27        super().__init__(message)

Initialize parsing error.

Arguments:
  • message: Error description
  • raw_response: Raw response from LLM
  • original_error: Original exception that caused parsing failure
raw_response
original_error
class ValidationError(etter.GeoFilterError):
class ValidationError(GeoFilterError):
    """Raised when well-formed structured output violates business rules."""

    def __init__(self, message: str, field: str | None = None, detail: str | None = None):
        """
        Build the error and record which field failed.

        Args:
            message: Error description
            field: Field name that failed validation
            detail: Additional detail about the validation failure
        """
        super().__init__(message)
        self.field, self.detail = field, detail

Structured output is valid but fails business logic validation.

ValidationError(message: str, field: str | None = None, detail: str | None = None)
33    def __init__(self, message: str, field: str | None = None, detail: str | None = None):
34        """
35        Initialize validation error.
36
37        Args:
38            message: Error description
39            field: Field name that failed validation
40            detail: Additional detail about the validation failure
41        """
42        self.field = field
43        self.detail = detail
44        super().__init__(message)

Initialize validation error.

Arguments:
  • message: Error description
  • field: Field name that failed validation
  • detail: Additional detail about the validation failure
field
detail
class NoReferenceLocationError(etter.ValidationError):
class NoReferenceLocationError(ValidationError):
    """Raised when a query names no geographic reference location."""

    def __init__(self, message: str):
        """
        Build the error with ``field`` fixed to "reference_location".

        Args:
            message: Error description
        """
        failed_field = "reference_location"
        super().__init__(message, field=failed_field)

Query contains no named geographic reference location.

NoReferenceLocationError(message: str)
50    def __init__(self, message: str):
51        super().__init__(message, field="reference_location")

Initialize the error; `field` is always set to "reference_location".

Arguments:
  • message: Error description
class UnknownRelationError(etter.ValidationError):
class UnknownRelationError(ValidationError):
    """Raised when a spatial relation name is missing from the configuration."""

    def __init__(self, message: str, relation_name: str):
        """
        Build the error with ``field`` fixed to "spatial_relation".

        Args:
            message: Error description
            relation_name: The unknown relation name
        """
        super().__init__(message, field="spatial_relation")
        self.relation_name = relation_name

Spatial relation is not registered in configuration.

UnknownRelationError(message: str, relation_name: str)
57    def __init__(self, message: str, relation_name: str):
58        """
59        Initialize unknown relation error.
60
61        Args:
62            message: Error description
63            relation_name: The unknown relation name
64        """
65        self.relation_name = relation_name
66        super().__init__(message, field="spatial_relation")

Initialize unknown relation error.

Arguments:
  • message: Error description
  • relation_name: The unknown relation name
relation_name
class LowConfidenceError(etter.GeoFilterError):
class LowConfidenceError(GeoFilterError):
    """Raised in strict mode when query confidence is below the threshold."""

    def __init__(self, message: str, confidence: float, reasoning: str | None = None):
        """
        Build the error and keep the confidence details on the instance.

        Args:
            message: Error description
            confidence: Confidence score (0-1)
            reasoning: Optional explanation for low confidence
        """
        super().__init__(message)
        self.confidence, self.reasoning = confidence, reasoning

Query confidence is below threshold (strict mode).

LowConfidenceError(message: str, confidence: float, reasoning: str | None = None)
72    def __init__(self, message: str, confidence: float, reasoning: str | None = None):
73        """
74        Initialize low confidence error.
75
76        Args:
77            message: Error description
78            confidence: Confidence score (0-1)
79            reasoning: Optional explanation for low confidence
80        """
81        self.confidence = confidence
82        self.reasoning = reasoning
83        super().__init__(message)

Initialize low confidence error.

Arguments:
  • message: Error description
  • confidence: Confidence score (0-1)
  • reasoning: Optional explanation for low confidence
confidence
reasoning
class LowConfidenceWarning(builtins.UserWarning):
class LowConfidenceWarning(UserWarning):
    """Warning issued in permissive mode when query confidence is below the threshold."""

    def __init__(self, confidence: float, message: str = ""):
        """
        Build the warning and keep the score on the instance.

        Args:
            confidence: Confidence score (0-1)
            message: Warning message
        """
        super().__init__(message)
        self.confidence = confidence

Query confidence is below threshold (permissive mode).

LowConfidenceWarning(confidence: float, message: str = '')
89    def __init__(self, confidence: float, message: str = ""):
90        """
91        Initialize low confidence warning.
92
93        Args:
94            confidence: Confidence score (0-1)
95            message: Warning message
96        """
97        self.confidence = confidence
98        super().__init__(message)

Initialize low confidence warning.

Arguments:
  • confidence: Confidence score (0-1)
  • message: Warning message
confidence
class GeoDataSource(typing.Protocol):
class GeoDataSource(Protocol):
    """
    Structural interface that every geographic data source implements.

    A data source resolves location names to geographic features. Each feature
    is a standard GeoJSON Feature object (dict) in WGS84 (EPSG:4326).

    Example of returned feature:
        {
            "type": "Feature",
            "id": "uuid-123",
            "geometry": {"type": "Point", "coordinates": [8.5, 47.3]},
            "bbox": [8.4, 47.3, 8.6, 47.4],
            "properties": {
                "name": "Zürich",
                "type": "city",
                "confidence": 1.0,
                ...
            }
        }
    """

    def search(
        self,
        name: str,
        type: str | None = None,
        max_results: int = 10,
    ) -> list[Feature]:
        """
        Find geographic features whose name matches ``name``.

        Args:
            name: Location name to search for (e.g., "Lake Geneva", "Bern").
            type: Optional type hint for filtering/ranking results.
                  Examples: "lake", "city", "mountain", "canton", "river".
                  When provided, matching types are ranked higher.
            max_results: Maximum number of results to return.

        Returns:
            Matching GeoJSON Feature dicts ranked by relevance; an empty list
            when nothing matches.
        """
        ...

    def get_by_id(self, feature_id: str) -> Feature | None:
        """
        Fetch one feature by its unique identifier.

        Args:
            feature_id: Unique identifier from the data source.

        Returns:
            The matching GeoJSON Feature dict, or None if not found.
        """
        ...

    def get_available_types(self) -> list[str]:
        """
        List the concrete geographic types this datasource can return.

        These are the values (e.g., "lake", "city", "restaurant") that the
        datasource puts in the "type" property of returned features. They can
        be matched against the location type hierarchy for fuzzy matching, and
        should be a subset of or mapped to the standard hierarchy defined in
        location_types.TYPE_HIERARCHY.

        Returns:
            List of concrete type strings (e.g., ["lake", "river", "city", "mountain"]).
            Empty list if this datasource does not provide type information.

        Example:
            >>> source = SwissNames3DSource("data/")
            >>> types = source.get_available_types()
            >>> print(types)
            ['lake', 'river', 'city', 'mountain', 'peak', 'hill', ...]
        """
        ...

Protocol for geographic data sources.

Implementations resolve location names to geographic features. Features are returned as standard GeoJSON Feature objects (dicts) in WGS84 (EPSG:4326).

Example of returned feature:

{ "type": "Feature", "id": "uuid-123", "geometry": {"type": "Point", "coordinates": [8.5, 47.3]}, "bbox": [8.4, 47.3, 8.6, 47.4], "properties": { "name": "Zürich", "type": "city", "confidence": 1.0, ... } }

GeoDataSource(*args, **kwargs)
def _no_init_or_replace_init(self, *args, **kwargs):
    # Placeholder __init__ for protocol classes: refuses to instantiate a
    # protocol itself, and for a protocol subclass finds the real __init__ in
    # the MRO, installs it on the class, and calls it.
    cls = type(self)

    if cls._is_protocol:
        raise TypeError('Protocols cannot be instantiated')

    # Already using a custom `__init__`. No need to calculate correct
    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
    if cls.__init__ is not _no_init_or_replace_init:
        return

    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
    # searches for a proper new `__init__` in the MRO. The new `__init__`
    # replaces the subclass' old `__init__` (i.e. `_no_init_or_replace_init`). Subsequent
    # instantiation of the protocol subclass will thus use the new
    # `__init__` and no longer call `_no_init_or_replace_init`.
    for base in cls.__mro__:
        init = base.__dict__.get('__init__', _no_init_or_replace_init)
        if init is not _no_init_or_replace_init:
            cls.__init__ = init
            break
    else:
        # should not happen
        cls.__init__ = object.__init__

    # Run the initializer that was just installed on the class.
    cls.__init__(self, *args, **kwargs)
def search( self, name: str, type: str | None = None, max_results: int = 10) -> list[geojson.feature.Feature]:
36    def search(
37        self,
38        name: str,
39        type: str | None = None,
40        max_results: int = 10,
41    ) -> list[Feature]:
42        """
43        Search for geographic features by name.
44
45        Args:
46            name: Location name to search for (e.g., "Lake Geneva", "Bern").
47            type: Optional type hint for filtering/ranking results.
48                  Examples: "lake", "city", "mountain", "canton", "river".
49                  When provided, matching types are ranked higher.
50            max_results: Maximum number of results to return.
51
52        Returns:
53            List of matching GeoJSON Feature dicts, ranked by relevance.
54            Returns empty list if no matches found.
55        """
56        ...

Search for geographic features by name.

Arguments:
  • name: Location name to search for (e.g., "Lake Geneva", "Bern").
  • type: Optional type hint for filtering/ranking results. Examples: "lake", "city", "mountain", "canton", "river". When provided, matching types are ranked higher.
  • max_results: Maximum number of results to return.
Returns:

List of matching GeoJSON Feature dicts, ranked by relevance. Returns empty list if no matches found.

def get_by_id(self, feature_id: str) -> geojson.feature.Feature | None:
58    def get_by_id(self, feature_id: str) -> Feature | None:
59        """
60        Get a specific feature by its unique identifier.
61
62        Args:
63            feature_id: Unique identifier from the data source.
64
65        Returns:
66            The matching GeoJSON Feature dict, or None if not found.
67        """
68        ...

Get a specific feature by its unique identifier.

Arguments:
  • feature_id: Unique identifier from the data source.
Returns:

The matching GeoJSON Feature dict, or None if not found.

def get_available_types(self) -> list[str]:
70    def get_available_types(self) -> list[str]:
71        """
72        Get list of concrete geographic types this datasource can return.
73
74        Returns a list of concrete type values (e.g., "lake", "city", "restaurant")
75        that this datasource uses in the "type" property of returned features.
76        These types can be matched against the location type hierarchy for fuzzy matching.
77
78        The returned types should be a subset of or mapped to the standard location
79        type hierarchy defined in location_types.TYPE_HIERARCHY.
80
81        Returns:
82            List of concrete type strings (e.g., ["lake", "river", "city", "mountain"]).
83            Empty list if this datasource does not provide type information.
84
85        Example:
86            >>> source = SwissNames3DSource("data/")
87            >>> types = source.get_available_types()
88            >>> print(types)
89            ['lake', 'river', 'city', 'mountain', 'peak', 'hill', ...]
90        """
91        ...

Get list of concrete geographic types this datasource can return.

Returns a list of concrete type values (e.g., "lake", "city", "restaurant") that this datasource uses in the "type" property of returned features. These types can be matched against the location type hierarchy for fuzzy matching.

The returned types should be a subset of or mapped to the standard location type hierarchy defined in location_types.TYPE_HIERARCHY.

Returns:

List of concrete type strings (e.g., ["lake", "river", "city", "mountain"]). Empty list if this datasource does not provide type information.

Example:
>>> source = SwissNames3DSource("data/")
>>> types = source.get_available_types()
>>> print(types)
['lake', 'river', 'city', 'mountain', 'peak', 'hill', ...]
class SwissNames3DSource:
class SwissNames3DSource:
    """
    Geographic data source backed by swisstopo's swissNAMES3D dataset.

    Loads geographic names from a Shapefile, GeoPackage, or ESRI File Geodatabase
    and provides search by name with optional type filtering.

    If data_path is a directory, automatically loads and concatenates all SwissNames3D
    shapefiles (swissNAMES3D_PKT, swissNAMES3D_LIN, swissNAMES3D_PLY) found within.

    All geometries are returned as GeoJSON in WGS84 (EPSG:4326).

    Args:
        data_path: Path to SwissNames3D data file or directory containing SwissNames3D shapefiles.
        layer: Layer name within the data source (for multi-layer formats like GDB).

    Example:
        >>> source = SwissNames3DSource("data/")  # Load all 3 geometry types
        >>> results = source.search("Lac Léman", type="lake")
        >>> print(results[0].geometry)  # GeoJSON in WGS84
    """

    def __init__(self, data_path: str | Path, layer: str | None = None) -> None:
        self._data_path = Path(data_path)
        self._layer = layer
        # Data is read lazily on the first search()/get_by_id() call.
        self._gdf: gpd.GeoDataFrame | None = None
        # Normalized name -> positional row numbers in self._gdf.
        self._name_index: dict[str, list[int]] = {}

    @staticmethod
    def _has_value(value: Any) -> bool:
        """Return True unless ``value`` is None, a float NaN, or an empty string.

        Unlike a plain truthiness test this accepts 0 as a real value, and it
        rejects NaN, which is truthy.
        """
        if value is None:
            return False
        if isinstance(value, float) and value != value:  # NaN is the only float unequal to itself
            return False
        return not (isinstance(value, str) and value == "")

    def _ensure_loaded(self) -> None:
        """Load data lazily on first access."""
        if self._gdf is not None:
            return
        self._load_data()

    def _load_data(self) -> None:
        """Load SwissNames3D data and build the name index."""
        if self._data_path.is_dir():
            self._load_from_directory()
        else:
            # Single file: pass the layer only when one was requested.
            kwargs: dict[str, Any] = {}
            if self._layer is not None:
                kwargs["layer"] = self._layer
            self._gdf = gpd.read_file(str(self._data_path), **kwargs)

        self._build_name_index()

    def _load_from_directory(self) -> None:
        """Load and concatenate all SwissNames3D shapefiles from a directory.

        Raises:
            ValueError: If none of the expected shapefiles exists in the directory.
        """
        # The 3 standard SwissNames3D shapefiles (the PKT / LIN / PLY variants).
        shapefile_names = ["swissNAMES3D_PKT", "swissNAMES3D_LIN", "swissNAMES3D_PLY"]
        gdfs: list[gpd.GeoDataFrame] = []

        for name in shapefile_names:
            shp_path = self._data_path / f"{name}.shp"
            if shp_path.exists():
                gdfs.append(gpd.read_file(str(shp_path)))

        if not gdfs:
            raise ValueError(
                f"No SwissNames3D shapefiles found in {self._data_path}. Expected: {', '.join(shapefile_names)}"
            )

        # Only columns present in every loaded file survive the concatenation.
        common_cols = set(gdfs[0].columns)
        for gdf in gdfs[1:]:
            common_cols &= set(gdf.columns)

        # ignore_index=True renumbers rows 0..n-1, so labels equal positions.
        gdfs_filtered = [gdf[sorted(common_cols)] for gdf in gdfs]
        self._gdf = gpd.GeoDataFrame(
            gpd.pd.concat(gdfs_filtered, ignore_index=True), crs=gdfs[0].crs, geometry="geometry"
        )

    def _build_name_index(self) -> None:
        """Build a normalized name → row indices lookup for fast search."""
        assert self._gdf is not None
        self._name_index = {}

        name_col = self._detect_name_column()
        for idx, name in enumerate(self._gdf[name_col]):
            # Skip missing (non-string) and blank names.
            if not isinstance(name, str) or not name.strip():
                continue
            self._name_index.setdefault(_normalize_name(name), []).append(idx)

    def _detect_name_column(self) -> str:
        """Detect the name column in the data.

        Raises:
            ValueError: If none of the known name columns is present.
        """
        assert self._gdf is not None
        for candidate in ("NAME", "name", "Name", "BEZEICHNUNG"):
            if candidate in self._gdf.columns:
                return candidate
        raise ValueError(f"Cannot find name column in data. Available columns: {list(self._gdf.columns)}")

    def _detect_type_column(self) -> str | None:
        """Detect the feature type column in the data, or None if absent."""
        assert self._gdf is not None
        for candidate in ("OBJEKTART", "objektart", "Objektart"):
            if candidate in self._gdf.columns:
                return candidate
        return None

    def _detect_id_column(self) -> str | None:
        """Detect the unique ID column in the data, or None if absent."""
        assert self._gdf is not None
        for candidate in ("UUID", "uuid", "FID", "OBJECTID", "id"):
            if candidate in self._gdf.columns:
                return candidate
        return None

    def _row_to_feature(self, idx: int) -> Feature:
        """Convert a GeoDataFrame row to a GeoJSON Feature dict with WGS84 geometry.

        Args:
            idx: Positional row number (used with ``iloc``), not an index label.
        """
        assert self._gdf is not None
        row = self._gdf.iloc[idx]

        name_col = self._detect_name_column()
        name = str(row[name_col])

        # A missing type value (None / NaN / "") maps to "unknown".
        type_col = self._detect_type_column()
        raw_type = str(row[type_col]) if type_col and self._has_value(row.get(type_col)) else "unknown"
        normalized_type = _objektart_to_type(raw_type)

        # Use the stored identifier whenever there is one (including 0); only a
        # missing value falls back to the row number. get_by_id() matches on the
        # stored identifier first, so the two must agree.
        id_col = self._detect_id_column()
        feature_id = str(row[id_col]) if id_col and self._has_value(row.get(id_col)) else str(idx)

        geom = row.geometry
        if geom is None or geom.is_empty:
            # Placeholder geometry for rows without one.
            geometry = {"type": "Point", "coordinates": [0, 0]}
            bbox = None
        else:
            # Transform geometry from EPSG:2056 to WGS84 using the module-level transformer
            # Drop Z coordinates — they are not needed and cause issues with single_sided buffers
            wgs84_geom = shapely_transform(_TRANSFORMER.transform, force_2d(geom))
            geometry = mapping(wgs84_geom)
            bounds = wgs84_geom.bounds  # (minx, miny, maxx, maxy)
            bbox = (bounds[0], bounds[1], bounds[2], bounds[3])

        # Columns already represented by name/type/id/geometry are not repeated.
        skip_cols = {name_col, "geometry"}
        if type_col:
            skip_cols.add(type_col)
        if id_col:
            skip_cols.add(id_col)

        properties: dict[str, Any] = {
            "name": name,
            "type": normalized_type,
            "confidence": 1.0,
        }
        for col in self._gdf.columns:
            if col not in skip_cols:
                val = row.get(col)
                if val is not None and str(val) != "nan":
                    properties[col] = val

        return Feature(geometry=geometry, properties=properties, id=feature_id, bbox=bbox)

    def search(
        self,
        name: str,
        type: str | None = None,
        max_results: int = 10,
    ) -> list[Feature]:
        """
        Search for geographic features by name.

        Looks the normalized name up in the index first, and falls back to
        fuzzy matching only when there is no exact match.

        Args:
            name: Location name to search for.
            type: Optional type hint to filter results. If provided, only features
                  of this type are returned.
            max_results: Maximum number of results to return.

        Returns:
            List of matching GeoJSON Feature dicts. If type is provided, only
            features of that type are returned. Empty list if no matches found.
        """
        self._ensure_loaded()

        normalized = _normalize_name(name)
        indices = self._name_index.get(normalized, [])

        if not indices:
            indices = self._fuzzy_search(normalized)

        features = [self._row_to_feature(idx) for idx in indices]

        # Filter by type if type hint provided.
        # Expand via the type hierarchy so that category hints (e.g. "water") match
        # all concrete types within that category ("lake", "river", "pond", ...).
        if type is not None:
            matching_types = get_matching_types(type)
            if matching_types:
                features = [f for f in features if f["properties"].get("type") in matching_types]
            else:
                # Unknown type hint, fall back to exact string match
                features = [f for f in features if f["properties"].get("type") == type.lower()]

        return features[:max_results]

    def _fuzzy_search(self, normalized: str, threshold: float = 75.0) -> list[int]:
        """
        Fuzzy search for names that partially match the search query.

        A name is considered only if it shares at least one whitespace-separated
        token with the query; it is kept when its ``token_set_ratio`` score
        reaches ``threshold``. This handles cases like:
        - "venoge" matching "la venoge"
        - "rhone" matching "rhone valais"

        Args:
            normalized: The normalized search query.
            threshold: Minimum fuzzy match score (0-100) to include a result.

        Returns:
            List of row indices for fuzzy-matched names, sorted by score (descending).
        """
        matches: list[tuple[int, float]] = []
        query_tokens = set(normalized.split())

        for indexed_name, indices in self._name_index.items():
            # Cheap token-overlap pre-filter before scoring.
            if not query_tokens & set(indexed_name.split()):
                continue
            score = fuzz.token_set_ratio(normalized, indexed_name)
            if score >= threshold:
                matches.extend((idx, score) for idx in indices)

        # Sort by score (descending) to return best matches first
        matches.sort(key=lambda x: x[1], reverse=True)
        return [idx for idx, _ in matches]

    def get_by_id(self, feature_id: str) -> Feature | None:
        """
        Get a specific feature by its unique identifier.

        The identifier is compared against the ID column first; if nothing
        matches, it is interpreted as a positional row number.

        Args:
            feature_id: Unique identifier (UUID or row index).

        Returns:
            The matching GeoJSON Feature dict, or None if not found.
        """
        self._ensure_loaded()
        assert self._gdf is not None

        id_col = self._detect_id_column()
        if id_col:
            # _row_to_feature() indexes with iloc, so convert the boolean mask to
            # positions rather than taking index labels from a filtered frame.
            hits = (self._gdf[id_col].astype(str) == feature_id).to_numpy().nonzero()[0]
            if hits.size:
                return self._row_to_feature(int(hits[0]))

        # Fallback: try as row index
        try:
            idx = int(feature_id)
        except ValueError:
            return None
        if 0 <= idx < len(self._gdf):
            return self._row_to_feature(idx)

        return None

    def get_available_types(self) -> list[str]:
        """
        Get list of concrete geographic types this datasource can return.

        Returns:
            The keys of OBJEKTART_TYPE_MAP in sorted order
            (e.g., ["city", "lake", "river", ...]).
        """
        return sorted(OBJEKTART_TYPE_MAP.keys())

Geographic data source backed by swisstopo's swissNAMES3D dataset.

Loads geographic names from a Shapefile, GeoPackage, or ESRI File Geodatabase and provides search by name with optional type filtering.

If data_path is a directory, automatically loads and concatenates all SwissNames3D shapefiles (swissNAMES3D_PKT, swissNAMES3D_LIN, swissNAMES3D_PLY) found within.

All geometries are returned as GeoJSON in WGS84 (EPSG:4326).

Arguments:
  • data_path: Path to SwissNames3D data file or directory containing SwissNames3D shapefiles.
  • layer: Layer name within the data source (for multi-layer formats like GDB).
Example:
>>> source = SwissNames3DSource("data/")  # Load all 3 geometry types
>>> results = source.search("Lac Léman", type="lake")
>>> print(results[0].geometry)  # GeoJSON in WGS84
SwissNames3DSource(data_path: str | pathlib._local.Path, layer: str | None = None)
185    def __init__(self, data_path: str | Path, layer: str | None = None) -> None:
186        self._data_path = Path(data_path)
187        self._layer = layer
188        self._gdf: gpd.GeoDataFrame | None = None
189        self._name_index: dict[str, list[int]] = {}
def search( self, name: str, type: str | None = None, max_results: int = 10) -> list[geojson.feature.Feature]:
def search(
    self,
    name: str,
    type: str | None = None,
    max_results: int = 10,
) -> list[Feature]:
    """
    Search for geographic features by name.

    Uses case-insensitive, accent-normalized matching with fuzzy fallback.
    First tries exact matching, then falls back to fuzzy matching if no exact
    matches found.

    Args:
        name: Location name to search for.
        type: Optional type hint to filter results. If provided, only features
              of this type are returned.
        max_results: Maximum number of results to return. Zero or a negative
              value yields an empty list.

    Returns:
        List of matching GeoJSON Feature dicts. If type is provided, only
        features of that type are returned. Empty list if no matches found.
    """
    self._ensure_loaded()

    # A negative limit would otherwise slice from the end of the result list
    # (e.g. ``features[:-1]``) and return almost every match instead of none.
    if max_results <= 0:
        return []

    normalized = _normalize_name(name)
    indices = self._name_index.get(normalized, [])

    # If no exact match, try fuzzy matching
    if not indices:
        indices = self._fuzzy_search(normalized)

    features = [self._row_to_feature(idx) for idx in indices]

    # Filter by type if type hint provided.
    # Expand via the type hierarchy so that category hints (e.g. "water") match
    # all concrete types within that category ("lake", "river", "pond", ...).
    if type is not None:
        matching_types = get_matching_types(type)
        if matching_types:
            features = [f for f in features if f["properties"].get("type") in matching_types]
        else:
            # Unknown type hint, fall back to exact string match
            wanted = type.lower()
            features = [f for f in features if f["properties"].get("type") == wanted]

    return features[:max_results]

Search for geographic features by name.

Uses case-insensitive, accent-normalized matching with fuzzy fallback. First tries exact matching, then falls back to fuzzy matching if no exact matches found.

Arguments:
  • name: Location name to search for.
  • type: Optional type hint to filter results. If provided, only features of this type are returned.
  • max_results: Maximum number of results to return.
Returns:

List of matching GeoJSON Feature dicts. If type is provided, only features of that type are returned. Empty list if no matches found.

def get_by_id(self, feature_id: str) -> geojson.feature.Feature | None:
def get_by_id(self, feature_id: str) -> Feature | None:
    """
    Look up a single feature by its identifier.

    The identifier column reported by ``_detect_id_column`` is compared first
    (as strings). If that yields nothing, ``feature_id`` is interpreted as an
    integer row index.

    Args:
        feature_id: Unique identifier (UUID or row index).

    Returns:
        The matching GeoJSON Feature dict, or None if not found.
    """
    self._ensure_loaded()
    assert self._gdf is not None
    frame = self._gdf

    # Preferred lookup: match against the dataset's identifier column.
    id_column = self._detect_id_column()
    if id_column:
        hits = frame[frame[id_column].astype(str) == feature_id]
        if not hits.empty:
            return self._row_to_feature(hits.index[0])

    # Fallback: treat the identifier as a row index; anything that is not an
    # in-range integer means "not found".
    try:
        position = int(feature_id)
        in_range = 0 <= position < len(frame)
        return self._row_to_feature(position) if in_range else None
    except ValueError:
        return None

Get a specific feature by its unique identifier.

Arguments:
  • feature_id: Unique identifier (UUID or row index).
Returns:

The matching GeoJSON Feature dict, or None if not found.

def get_available_types(self) -> list[str]:
def get_available_types(self) -> list[str]:
    """
    List the concrete geographic types this datasource can return.

    The result is built from the keys of ``OBJEKTART_TYPE_MAP``, i.e. every
    normalized type that SwissNames3D data can be classified as. The loaded
    dataset itself is not consulted.

    Returns:
        Sorted list of type strings (e.g., ["city", "lake", "river", ...])
    """
    type_names = list(OBJEKTART_TYPE_MAP)
    type_names.sort()
    return type_names

Get list of concrete geographic types this datasource can return.

Returns all normalized types from the OBJEKTART_TYPE_MAP keys, representing all possible types that SwissNames3D data can be classified as.

Returns:

Sorted list of type strings (e.g., ["city", "lake", "river", ...])

class IGNBDCartoSource:
class IGNBDCartoSource:
    """
    Geographic data source backed by IGN's BD-CARTO 5.0 dataset.

    Loads French geographic data from GeoPackage files extracted to a directory.
    Supports administrative boundaries (communes, departments, regions, …),
    hydrography (rivers, lakes, …), named places (quarters, hamlets, …),
    orographic features (peaks, passes, valleys, …) and protected areas.

    Data must first be downloaded with ``make download-data-ign``, which places
    the GeoPackage files in ``data/bdcarto/``.

    All geometries are reprojected from EPSG:2154 (Lambert-93) to WGS84
    (EPSG:4326) and returned as standard GeoJSON Feature dicts.

    Nothing is read in the constructor; the data is loaded on the first call
    to ``search`` or ``get_by_id``.

    Args:
        data_path: Directory containing the ``.gpkg`` files (e.g. ``"data/bdcarto"``).
            A path to a single GeoJSON fixture file whose features carry a
            ``_layer`` column is also accepted.

    Example:
        >>> source = IGNBDCartoSource("data/bdcarto")
        >>> results = source.search("Ardèche", type="department")
        >>> results = source.search("Lyon", type="city")
        >>> results = source.search("Rhône", type="river")
    """

    def __init__(self, data_path: str | Path) -> None:
        self._data_path = Path(data_path)
        # Both are filled in by ``_load_data`` on first use.
        self._gdf: gpd.GeoDataFrame | None = None
        self._name_index: dict[str, list[int]] = {}

    def _ensure_loaded(self) -> None:
        """Load the dataset on first use; later calls return immediately."""
        if self._gdf is None:
            self._load_data()

    def _load_data(self) -> None:
        """Read the data from a directory or a single file, then index the names."""
        if self._data_path.is_dir():
            self._gdf = self._load_from_directory()
        else:
            self._gdf = self._load_from_file(self._data_path)
        self._build_name_index()

    @staticmethod
    def _prepare_layer(layer_gdf: gpd.GeoDataFrame, cfg: dict[str, Any]) -> gpd.GeoDataFrame | None:
        """Add the normalized name/type columns to one layer and reproject it to WGS84.

        The derived columns are written onto ``layer_gdf`` itself.

        Returns:
            The reprojected frame, or ``None`` when the layer has no rows or
            lacks its configured name column (the caller then skips it).
        """
        if layer_gdf.empty:
            return None
        name_col: str = cfg["name_col"]
        if name_col not in layer_gdf.columns:
            return None
        layer_gdf[_NAME_COL] = layer_gdf[name_col].astype(str)
        layer_gdf[_TYPE_COL] = layer_gdf.apply(lambda row: _derive_type(row, cfg), axis=1)
        return layer_gdf.to_crs("EPSG:4326")

    def _load_from_file(self, path: Path) -> gpd.GeoDataFrame:
        """Load from a GeoJSON fixture file. Features must include a ``_layer`` column.

        Raises:
            ValueError: If the ``_layer`` column is missing, or if no row
                belongs to a configured layer.
        """
        full_gdf = gpd.read_file(str(path))
        if "_layer" not in full_gdf.columns:
            raise ValueError(f"GeoJSON fixture {path} must include a '_layer' column")

        gdfs: list[gpd.GeoDataFrame] = []
        for layer_name, cfg in _LAYER_CONFIGS.items():
            # Copy so the derived columns are not written onto a slice of full_gdf.
            rows = full_gdf[full_gdf["_layer"] == layer_name].copy()
            prepared = self._prepare_layer(rows, cfg)
            if prepared is not None:
                gdfs.append(prepared)

        if not gdfs:
            raise ValueError(f"No matching BD-CARTO features found in {path}")

        combined = pd.concat(gdfs, ignore_index=True)
        return gpd.GeoDataFrame(combined, crs="EPSG:4326", geometry="geometry")

    def _load_from_directory(self) -> gpd.GeoDataFrame:
        """Load and concatenate all configured layers from the data directory.

        Each layer is read from ``<layer_name>.gpkg``; missing files are skipped.

        Raises:
            ValueError: If no configured layer could be loaded.
        """
        gdfs: list[gpd.GeoDataFrame] = []

        for layer_name, cfg in _LAYER_CONFIGS.items():
            gpkg_path = self._data_path / f"{layer_name}.gpkg"
            if not gpkg_path.exists():
                continue

            prepared = self._prepare_layer(gpd.read_file(str(gpkg_path)), cfg)
            if prepared is None:
                continue

            # Record which layer each row came from.
            prepared["_layer"] = layer_name
            gdfs.append(prepared)

        if not gdfs:
            raise ValueError(
                f"No BD-CARTO GeoPackage files found in {self._data_path}. "
                f"Run 'make download-data-ign' to download the dataset."
            )

        combined = pd.concat(gdfs, ignore_index=True)
        return gpd.GeoDataFrame(combined, crs="EPSG:4326", geometry="geometry")

    def _build_name_index(self) -> None:
        """Build normalized name → row indices lookup (with article-stripped variants)."""
        assert self._gdf is not None
        index: dict[str, list[int]] = {}
        for idx, name in enumerate(self._gdf[_NAME_COL]):
            # Missing names arrive as the string "nan" after ``astype(str)``.
            if not isinstance(name, str) or not name.strip() or name == "nan":
                continue
            for key in _index_keys(name):
                bucket = index.setdefault(key, [])
                # Rows are visited in ascending order, so a repeated key for the
                # same name can only duplicate the last entry.
                if not bucket or bucket[-1] != idx:
                    bucket.append(idx)
        self._name_index = index

    def _row_to_feature(self, idx: int) -> Feature:
        """Convert a GeoDataFrame row to a GeoJSON Feature dict (WGS84).

        Args:
            idx: Positional row index into the loaded GeoDataFrame.

        Returns:
            Feature whose ``id`` is the row's ``cleabs`` value (or the row index
            when absent) and whose properties hold ``name``, ``type``,
            ``confidence`` plus the remaining non-null columns.
        """
        assert self._gdf is not None
        row = self._gdf.iloc[idx]

        name = str(row[_NAME_COL])
        normalized_type = str(row[_TYPE_COL]) if pd.notna(row.get(_TYPE_COL)) else "unknown"
        feature_id = str(row["cleabs"]) if pd.notna(row.get("cleabs")) else str(idx)

        geom = row.geometry
        geometry: dict[str, Any]
        bbox: tuple[float, float, float, float] | None
        if geom is None or geom.is_empty:
            # Placeholder point so the Feature always carries a geometry.
            geometry = {"type": "Point", "coordinates": [0, 0]}
            bbox = None
        else:
            geometry = mapping(geom)
            bounds = geom.bounds
            bbox = (bounds[0], bounds[1], bounds[2], bounds[3])

        # These columns are already represented by name/type/geometry/id.
        skip_cols = {_NAME_COL, _TYPE_COL, "geometry", "cleabs"}
        properties: dict[str, Any] = {
            "name": name,
            "type": normalized_type,
            "confidence": 1.0,
        }
        for col in self._gdf.columns:
            if col in skip_cols:
                continue
            val = _to_json_value(row.get(col))
            if val is not None:
                properties[col] = val

        return Feature(geometry=geometry, properties=properties, id=feature_id, bbox=bbox)

    def search(
        self,
        name: str,
        type: str | None = None,
        max_results: int = 10,
    ) -> list[Feature]:
        """
        Search for geographic features by name.

        Uses case-insensitive, accent-normalized exact matching with fuzzy
        fallback when no exact match is found.

        Args:
            name: Location name to search for (e.g. ``"Ardèche"``, ``"Lyon"``,
                  ``"Rhône"``).
            type: Optional type hint for filtering. Supports both concrete types
                  (``"department"``, ``"city"``, ``"river"``) and category hints
                  (``"administrative"``, ``"water"``).
            max_results: Maximum number of results. Zero or a negative value
                  yields an empty list.

        Returns:
            List of GeoJSON Feature dicts in WGS84. Empty list if no match.
        """
        self._ensure_loaded()

        # A negative limit would otherwise slice from the end of the result list
        # and return almost every match instead of none.
        if max_results <= 0:
            return []

        normalized = _normalize_name(name)
        indices = self._name_index.get(normalized, [])

        if not indices:
            indices = self._fuzzy_search(normalized)

        features = [self._row_to_feature(idx) for idx in indices]

        if type is not None:
            matching_types = get_matching_types(type)
            logger.debug("Filtering results by type hint %r → matching types: %s", type, matching_types)
            if matching_types:
                features = [f for f in features if f["properties"].get("type") in matching_types]
            else:
                # Unknown type hint: fall back to an exact string comparison.
                wanted = type.lower()
                features = [f for f in features if f["properties"].get("type") == wanted]

        # Merge before truncating so the limit counts merged features.
        features = merge_segments(features)

        return features[:max_results]

    def _fuzzy_search(self, normalized: str, threshold: float = 75.0) -> list[int]:
        """Token-overlap + token_set_ratio fuzzy search.

        Only indexed names sharing at least one whitespace-separated token with
        the query are scored. A row indexed under several keys is returned once,
        ranked by its best score.

        Args:
            normalized: Already-normalized query string.
            threshold: Minimum ``token_set_ratio`` score for a match.

        Returns:
            Row indices ordered by descending score.
        """
        query_tokens = set(normalized.split())
        best_scores: dict[int, float] = {}

        for indexed_name, indices in self._name_index.items():
            if query_tokens.isdisjoint(indexed_name.split()):
                continue
            score = fuzz.token_set_ratio(normalized, indexed_name)
            if score < threshold:
                continue
            for idx in indices:
                if score > best_scores.get(idx, -1.0):
                    best_scores[idx] = score

        return sorted(best_scores, key=best_scores.__getitem__, reverse=True)

    def get_by_id(self, feature_id: str) -> Feature | None:
        """
        Get a feature by its ``cleabs`` identifier or row index.

        Args:
            feature_id: ``cleabs`` string or integer row index.

        Returns:
            Matching GeoJSON Feature dict, or ``None``.
        """
        self._ensure_loaded()
        assert self._gdf is not None

        if "cleabs" in self._gdf.columns:
            matches = self._gdf[self._gdf["cleabs"].astype(str) == feature_id]
            if not matches.empty:
                # The frame is built with ignore_index=True, so index labels
                # coincide with the positions ``_row_to_feature`` expects.
                return self._row_to_feature(matches.index[0])

        # Fallback: interpret the identifier as a row index.
        try:
            idx = int(feature_id)
            if 0 <= idx < len(self._gdf):
                return self._row_to_feature(idx)
        except ValueError:
            pass

        return None

    def get_available_types(self) -> list[str]:
        """
        Return the union of all normalized types this source can return.

        The types are derived from ``_LAYER_CONFIGS`` alone; no data is loaded.

        Returns:
            Sorted list of type strings.
        """
        types: set[str] = set()
        for cfg in _LAYER_CONFIGS.values():
            if cfg.get("commune_flags"):
                types.update({"city", "municipality"})
            elif cfg.get("fixed_type"):
                types.add(cfg["fixed_type"])
            elif cfg.get("type_map"):
                types.update(cfg["type_map"].values())
        return sorted(types)

Geographic data source backed by IGN's BD-CARTO 5.0 dataset.

Loads French geographic data from GeoPackage files extracted to a directory. Supports administrative boundaries (communes, departments, regions, …), hydrography (rivers, lakes, …), named places (quarters, hamlets, …), orographic features (peaks, passes, valleys, …) and protected areas.

Data must first be downloaded with make download-data-ign, which places the GeoPackage files in data/bdcarto/.

All geometries are reprojected from EPSG:2154 (Lambert-93) to WGS84 (EPSG:4326) and returned as standard GeoJSON Feature dicts.

Arguments:
  • data_path: Directory containing the .gpkg files (e.g. "data/bdcarto"). A path to a single GeoJSON fixture file whose features carry a _layer column is also accepted.
Example:
>>> source = IGNBDCartoSource("data/bdcarto")
>>> results = source.search("Ardèche", type="department")
>>> results = source.search("Lyon", type="city")
>>> results = source.search("Rhône", type="river")
IGNBDCartoSource(data_path: str | pathlib.Path)
def __init__(self, data_path: str | Path) -> None:
    """
    Remember where the BD-CARTO data lives; no file is read here.

    Args:
        data_path: Location of the data, converted to a ``Path``.
    """
    self._data_path = Path(data_path)
    # Data-dependent state starts out empty.
    self._name_index: dict[str, list[int]] = {}
    self._gdf: gpd.GeoDataFrame | None = None
def search( self, name: str, type: str | None = None, max_results: int = 10) -> list[geojson.feature.Feature]:
def search(
    self,
    name: str,
    type: str | None = None,
    max_results: int = 10,
) -> list[Feature]:
    """
    Search for geographic features by name.

    Uses case-insensitive, accent-normalized exact matching with fuzzy
    fallback when no exact match is found.

    Args:
        name: Location name to search for (e.g. ``"Ardèche"``, ``"Lyon"``,
              ``"Rhône"``).
        type: Optional type hint for filtering. Supports both concrete types
              (``"department"``, ``"city"``, ``"river"``) and category hints
              (``"administrative"``, ``"water"``).
        max_results: Maximum number of results. Zero or a negative value
              yields an empty list.

    Returns:
        List of GeoJSON Feature dicts in WGS84. Empty list if no match.
    """
    self._ensure_loaded()

    # A negative limit would otherwise slice from the end of the result list
    # and return almost every match instead of none.
    if max_results <= 0:
        return []

    normalized = _normalize_name(name)
    indices = self._name_index.get(normalized, [])

    if not indices:
        indices = self._fuzzy_search(normalized)

    features = [self._row_to_feature(idx) for idx in indices]

    if type is not None:
        matching_types = get_matching_types(type)
        logger.debug("Filtering results by type hint %r → matching types: %s", type, matching_types)
        if matching_types:
            features = [f for f in features if f["properties"].get("type") in matching_types]
        else:
            # Unknown type hint: fall back to an exact string comparison.
            wanted = type.lower()
            features = [f for f in features if f["properties"].get("type") == wanted]

    # Merge before truncating so the limit counts merged features.
    features = merge_segments(features)

    return features[:max_results]

Search for geographic features by name.

Uses case-insensitive, accent-normalized exact matching with fuzzy fallback when no exact match is found.

Arguments:
  • name: Location name to search for (e.g. "Ardèche", "Lyon", "Rhône").
  • type: Optional type hint for filtering. Supports both concrete types ("department", "city", "river") and category hints ("administrative", "water").
  • max_results: Maximum number of results.
Returns:

List of GeoJSON Feature dicts in WGS84. Empty list if no match.

def get_by_id(self, feature_id: str) -> geojson.feature.Feature | None:
def get_by_id(self, feature_id: str) -> Feature | None:
    """
    Look up a feature by its ``cleabs`` identifier or row index.

    The ``cleabs`` column (when present) is compared first, as strings. If
    that yields nothing, ``feature_id`` is interpreted as an integer row index.

    Args:
        feature_id: ``cleabs`` string or integer row index.

    Returns:
        Matching GeoJSON Feature dict, or ``None``.
    """
    self._ensure_loaded()
    assert self._gdf is not None
    frame = self._gdf

    # Preferred lookup: match against the ``cleabs`` column.
    if "cleabs" in frame.columns:
        hits = frame[frame["cleabs"].astype(str) == feature_id]
        if not hits.empty:
            return self._row_to_feature(hits.index[0])

    # Fallback: treat the identifier as a row index; anything that is not an
    # in-range integer means "not found".
    try:
        position = int(feature_id)
        in_range = 0 <= position < len(frame)
        return self._row_to_feature(position) if in_range else None
    except ValueError:
        return None

Get a feature by its cleabs identifier or row index.

Arguments:
  • feature_id: cleabs string or integer row index.
Returns:

Matching GeoJSON Feature dict, or None.

def get_available_types(self) -> list[str]:
def get_available_types(self) -> list[str]:
    """
    Collect every normalized type this source can return.

    Each entry of ``_LAYER_CONFIGS`` contributes through the first of these
    settings that is truthy: ``commune_flags`` (adds "city" and
    "municipality"), ``fixed_type`` (adds that type), or ``type_map`` (adds
    all of its values).

    Returns:
        Sorted list of type strings.
    """
    collected: set[str] = set()
    for layer_cfg in _LAYER_CONFIGS.values():
        if layer_cfg.get("commune_flags"):
            collected |= {"city", "municipality"}
            continue
        if layer_cfg.get("fixed_type"):
            collected.add(layer_cfg["fixed_type"])
            continue
        if layer_cfg.get("type_map"):
            collected.update(layer_cfg["type_map"].values())
    return sorted(collected)

Return the union of all normalized types this source can return.

Returns:

Sorted list of type strings.

class CompositeDataSource:
class CompositeDataSource:
    """
    Fan-out datasource that delegates to an ordered list of GeoDataSource instances.

    ``search`` queries the registered sources in order and concatenates their
    results, stopping as soon as ``max_results`` features have been collected.

    ``get_by_id`` tries each source in order and returns the first hit.

    ``get_available_types`` returns the union of all sources' types.

    Args:
        sources: One or more GeoDataSource instances.

    Raises:
        ValueError: If no source is given.

    Example:
        >>> swiss = SwissNames3DSource("data/")
        >>> ign   = IGNBDCartoSource("data/bdcarto")
        >>> combo = CompositeDataSource(swiss, ign)
        >>> results = combo.search("Geneva", type="city")
    """

    def __init__(self, *sources: GeoDataSource) -> None:
        if not sources:
            raise ValueError("At least one datasource is required.")
        self._sources: list[GeoDataSource] = list(sources)

    # Public API (mirrors GeoDataSource protocol)

    def search(
        self,
        name: str,
        type: str | None = None,
        max_results: int = 10,
    ) -> list[Feature]:
        """
        Search the registered sources in order and return the merged results.

        Args:
            name: Location name to search for.
            type: Optional type hint passed through to every source.
            max_results: Maximum number of features requested from each source,
                and also the cap on the merged list. Zero or a negative value
                yields an empty list.

        Returns:
            List of GeoJSON Feature dicts, merged from the sources in
            registration order and holding at most ``max_results`` entries.
        """
        # Without this guard a negative limit still returned one feature,
        # because ``len(merged) >= max_results`` holds after the first append.
        if max_results <= 0:
            return []

        merged: list[Feature] = []
        for source in self._sources:
            merged.extend(source.search(name, type=type, max_results=max_results))
            # Later sources are not queried once the limit is reached.
            if len(merged) >= max_results:
                return merged[:max_results]

        return merged

    def get_by_id(self, feature_id: str) -> Feature | None:
        """
        Get a feature by ID, trying each source in order.

        Args:
            feature_id: Unique identifier to look up.

        Returns:
            The first matching GeoJSON Feature dict, or None.
        """
        for source in self._sources:
            result = source.get_by_id(feature_id)
            if result is not None:
                return result
        return None

    def get_available_types(self) -> list[str]:
        """
        Return the union of all sources' available types, sorted.

        Returns:
            Sorted list of unique type strings.
        """
        types: set[str] = set()
        for source in self._sources:
            types.update(source.get_available_types())
        return sorted(types)

Fan-out datasource that delegates to an ordered list of GeoDataSource instances.

search queries the registered sources in order and merges their results, stopping once max_results features have been collected.

get_by_id tries each source in order and returns the first hit.

get_available_types returns the union of all sources' types.

Arguments:
  • sources: One or more GeoDataSource instances.
Example:
>>> swiss = SwissNames3DSource("data/")
>>> ign   = IGNBDCartoSource("data/bdcarto")
>>> combo = CompositeDataSource(swiss, ign)
>>> results = combo.search("Geneva", type="city")
CompositeDataSource(*sources: GeoDataSource)
def __init__(self, *sources: GeoDataSource) -> None:
    """
    Register the datasources to delegate to, preserving their order.

    Args:
        sources: One or more GeoDataSource instances.

    Raises:
        ValueError: If no source is given.
    """
    registered: list[GeoDataSource] = [*sources]
    if len(registered) == 0:
        raise ValueError("At least one datasource is required.")
    self._sources = registered
def search( self, name: str, type: str | None = None, max_results: int = 10) -> list[geojson.feature.Feature]:
def search(
    self,
    name: str,
    type: str | None = None,
    max_results: int = 10,
) -> list[Feature]:
    """
    Search the registered sources in order and return the merged results.

    Args:
        name: Location name to search for.
        type: Optional type hint passed through to every source.
        max_results: Maximum number of features requested from each source,
            and also the cap on the merged list. Zero or a negative value
            yields an empty list.

    Returns:
        List of GeoJSON Feature dicts, merged from the sources in
        registration order and holding at most ``max_results`` entries.
    """
    # Without this guard a negative limit still returned one feature,
    # because ``len(merged) >= max_results`` holds after the first append.
    if max_results <= 0:
        return []

    merged: list[Feature] = []
    for source in self._sources:
        merged.extend(source.search(name, type=type, max_results=max_results))
        # Later sources are not queried once the limit is reached.
        if len(merged) >= max_results:
            return merged[:max_results]

    return merged

Search all registered sources in order and return the merged results.

Arguments:
  • name: Location name to search for.
  • type: Optional type hint passed through to every source.
  • max_results: Maximum number of results requested from each source; the merged list is also capped at this size.
Returns:

List of GeoJSON Feature dicts, merged from all sources.

def get_by_id(self, feature_id: str) -> geojson.feature.Feature | None:
def get_by_id(self, feature_id: str) -> Feature | None:
    """
    Resolve a feature ID by asking each source in registration order.

    Sources after the first one that returns a feature are not consulted.

    Args:
        feature_id: Unique identifier to look up.

    Returns:
        The first matching GeoJSON Feature dict, or None.
    """
    # Lazy generator: a source is only queried when the previous ones missed.
    lookups = (source.get_by_id(feature_id) for source in self._sources)
    return next((hit for hit in lookups if hit is not None), None)

Get a feature by ID, trying each source in order.

Arguments:
  • feature_id: Unique identifier to look up.
Returns:

The first matching GeoJSON Feature dict, or None.

def get_available_types(self) -> list[str]:
def get_available_types(self) -> list[str]:
    """
    Combine the available types of every registered source.

    Returns:
        Sorted list of unique type strings.
    """
    unique_types = {
        type_name
        for source in self._sources
        for type_name in source.get_available_types()
    }
    return sorted(unique_types)

Return the union of all sources' available types, sorted.

Returns:

Sorted list of unique type strings.

class PostGISDataSource:
 67class PostGISDataSource:
 68    """
 69    Geographic data source backed by a PostGIS table.
 70
 71    The table must expose at minimum a name column, a geometry column, and
 72    optionally a type column. The expected schema is:
 73
 74    .. code-block:: sql
 75
 76        CREATE TABLE <table> (
 77            id      TEXT PRIMARY KEY,
 78            name    TEXT NOT NULL,
 79            type    TEXT,
 80            geom    GEOMETRY(Geometry, 4326)
 81        );
 82
 83    The ``type`` column may store either:
 84
 85    - **Raw dataset values** (e.g. ``"See"``, ``"Berg"`` for SwissNames3D),
 86      pass ``type_map`` so the datasource can translate between raw values and
 87      the normalized etter type names.
 88    - **Already-normalized values** (e.g. ``"lake"``, ``"mountain"``),
 89      leave ``type_map=None`` (default).
 90
 91    Geometries must be in WGS84 (EPSG:4326) or supply ``crs`` for on-the-fly
 92    reprojection.
 93
 94    Args:
 95        connection: A SQLAlchemy :class:`~sqlalchemy.engine.Engine` **or** a
 96            connection URL string (e.g. ``"postgresql+psycopg2://user:pass@host/db"``).
 97            When a string is provided the engine is created internally.
 98        table: Fully-qualified table name, e.g. ``"public.swissnames3d"``.
 99        name_column: Column used for name-based search (default ``"name"``).
100        type_column: Column used for type filtering.  Pass ``None`` to disable
101            type filtering (default ``"type"``).
102        geometry_column: PostGIS geometry column (default ``"geom"``).
103        id_column: Primary-key column (default ``"id"``).
104        crs: CRS of the stored geometries as an EPSG string.  Defaults to
105            ``"EPSG:4326"`` (no reprojection).
106        type_map: Optional mapping from **normalized etter type names** to
107            **lists of raw type column values** present in the database.
108            This is the same format as ``SwissNames3DSource.OBJEKTART_TYPE_MAP``
109            and ``IGNBDCartoSource.IGN_BDCARTO_TYPE_MAP``, so they can be
110            passed directly::
111
112                from etter.datasources.swissnames3d import OBJEKTART_TYPE_MAP
113                source = PostGISDataSource(
114                    engine,
115                    table="public.swissnames3d",
116                    type_map=OBJEKTART_TYPE_MAP,
117                )
118
119            When ``type_map`` is provided the datasource:
120
121            - Translates raw DB values → normalized types in returned features.
122            - Translates user type hints → raw DB values in SQL ``WHERE`` clauses.
123            - Returns normalized type names from ``get_available_types()``.
124
125            When ``None`` (default) the stored values are used as-is.
126        fuzzy_threshold: Minimum ``pg_trgm`` similarity score (0-1) used for
127            fuzzy fallback search when no exact ``ILIKE`` match is found.
128
129    Example: unmodified SwissNames3D table::
130
131        from sqlalchemy import create_engine
132        from etter.datasources import PostGISDataSource
133        from etter.datasources.swissnames3d import OBJEKTART_TYPE_MAP
134
135        engine = create_engine(...)
136        source = PostGISDataSource(
137            engine,
138            table="public.swissnames3d",
139            type_map=OBJEKTART_TYPE_MAP,
140        )
141        results = source.search("Lac Léman", type="lake")
142    """
143
144    def __init__(
145        self,
146        connection: str | Engine,
147        table: str,
148        name_column: str = "name",
149        type_column: str | None = "type",
150        geometry_column: str = "geom",
151        id_column: str = "id",
152        crs: str = "EPSG:4326",
153        type_map: TypeMap | None = None,
154        fuzzy_threshold: float = 0.65,
155    ) -> None:
156        sa = _require_sqlalchemy()
157
158        if isinstance(connection, str):
159            self._engine = sa.create_engine(connection)
160        else:
161            self._engine = connection
162
163        try:
164            with self._engine.connect() as conn:
165                conn.execute(sa.text(f"SELECT 1 FROM {table} LIMIT 1"))
166        except Exception as exc:
167            raise ValueError(f"Failed to connect to database or access table {table!r}") from exc
168
169        self._table = table
170        self._name_col = name_column
171        self._type_col = type_column
172        self._geom_col = geometry_column
173        self._id_col = id_column
174        self._crs = crs
175        self._fuzzy_threshold = fuzzy_threshold
176
177        # Build bidirectional lookup structures from the user-supplied map.
178        if type_map:
179            self._normalized_to_raw: dict[str, list[str]] = {k: list(v) for k, v in type_map.items()}
180            self._raw_to_normalized: dict[str, str] = {
181                raw: normalized for normalized, raws in type_map.items() for raw in raws
182            }
183        else:
184            self._normalized_to_raw = {}
185            self._raw_to_normalized = {}
186
187        self._trgm_available: bool | None = None
188        self._unaccent_available: bool | None = None
189
190    def _get_connection(self) -> Connection:
191        """Return a SQLAlchemy connection from the engine."""
192        return self._engine.connect()
193
194    def _check_trgm(self, conn: Connection) -> bool:
195        """Return True if pg_trgm extension is available in the database."""
196        if self._trgm_available is not None:
197            return self._trgm_available
198        sa = _require_sqlalchemy()
199        try:
200            result = conn.execute(sa.text("SELECT 1 FROM pg_extension WHERE extname = 'pg_trgm'"))
201            self._trgm_available = result.fetchone() is not None
202        except Exception:
203            logger.exception("Failed to check pg_trgm availability")
204            self._trgm_available = False
205        return self._trgm_available
206
207    def _check_unaccent(self, conn: Connection) -> bool:
208        """Return True if the unaccent extension is available in the database."""
209        if self._unaccent_available is not None:
210            return self._unaccent_available
211        sa = _require_sqlalchemy()
212        try:
213            result = conn.execute(sa.text("SELECT 1 FROM pg_extension WHERE extname = 'unaccent'"))
214            self._unaccent_available = result.fetchone() is not None
215        except Exception:
216            logger.exception("Failed to check unaccent availability")
217            self._unaccent_available = False
218        return self._unaccent_available
219
220    def _normalize_type(self, raw_type: str | None) -> str | None:
221        """Translate a raw DB type value to its normalized etter name.
222
223        If no type_map was supplied the value is returned unchanged.
224        """
225        if raw_type is None:
226            return None
227        return self._raw_to_normalized.get(raw_type, raw_type)
228
229    def _row_to_feature(self, row: Row) -> Feature:
230        """Convert a SQLAlchemy Row to a GeoJSON Feature dict."""
231        feature_id = str(row.id)
232        name = str(row.name)
233        raw_type = getattr(row, "type", None)
234        normalized_type = self._normalize_type(raw_type)
235
236        geojson_str = row.geojson
237        if geojson_str:
238            geometry = json.loads(geojson_str)
239        else:
240            geometry = {"type": "Point", "coordinates": [0, 0]}
241
242        bbox = _bbox_from_geojson(geometry)
243
244        properties: dict[str, Any] = {
245            "name": name,
246            "type": normalized_type,
247            "confidence": 1.0,
248        }
249
250        return Feature(geometry=geometry, properties=properties, id=feature_id, bbox=bbox)
251
252    def _build_select_columns(self) -> str:
253        """Build the SELECT column list as a SQL fragment."""
254        type_expr = f", {self._type_col} AS type" if self._type_col else ", NULL AS type"
255        if self._crs.upper() != "EPSG:4326":
256            geom_expr = f", ST_AsGeoJSON(ST_Transform({self._geom_col}, 4326)) AS geojson"
257        else:
258            geom_expr = f", ST_AsGeoJSON({self._geom_col}) AS geojson"
259        return f"{self._id_col} AS id, {self._name_col} AS name{type_expr}{geom_expr}"
260
261    def search(
262        self,
263        name: str,
264        type: str | None = None,
265        max_results: int = 10,
266    ) -> list[Feature]:
267        """
268        Search for geographic features by name.
269
270        Uses a three-step cascade, stopping as soon as any step returns results:
271
272        1. **Normalized exact match**
273        2. **pg_trgm fuzzy with unaccent** (pg_trgm extension required and unaccent extension recommended)
274        3. **ILIKE substring**
275
276        ``merge_segments`` is applied after all rows are fetched so that
277        multi-segment linestrings (rivers, roads) are merged before the
278        ``max_results`` cap is applied.
279
280        Args:
281            name: Location name to search for.
282            type: Optional type hint for filtering results.
283            max_results: Maximum number of results to return.
284
285        Returns:
286            List of matching GeoJSON Feature dicts in WGS84.
287        """
288        sa = _require_sqlalchemy()
289        cols = self._build_select_columns()
290
291        # Resolve type filter to the raw DB values to use in the SQL WHERE clause.
292        type_filter_values: list[str] | None = None
293        if type is not None and self._type_col is not None:
294            matching_types = get_matching_types(type)
295            concrete_types = matching_types if matching_types else [type.lower()]
296            if self._normalized_to_raw:
297                raw_values: list[str] = []
298                for t in concrete_types:
299                    raw_values.extend(self._normalized_to_raw.get(t, [t]))
300                type_filter_values = raw_values if raw_values else concrete_types
301            else:
302                type_filter_values = concrete_types
303
304        # Fetch more rows than requested so that merge_segments has the full
305        # set of segments to work with.  Without this, a SQL LIMIT applied
306        # *before* merging would only return a partial set of linestring
307        # segments, producing incorrect / truncated geometries.
308        # We cap the internal limit at 2000 to avoid unbounded queries.
309        internal_limit = min(max(max_results * 20, 100), 2000)
310
311        with self._get_connection() as conn:
312            features = self._search_normalized(conn, sa, cols, name, type_filter_values, internal_limit)
313
314        if not features:
315            with self._get_connection() as conn:
316                features = self._search_fuzzy(conn, sa, cols, name, type_filter_values, internal_limit)
317
318        if not features:
319            with self._get_connection() as conn:
320                features = self._search_ilike(conn, sa, cols, name, type_filter_values, internal_limit)
321
322        features = merge_segments(features)
323        return features[:max_results]
324
325    def _type_filter_sql(self, values: list[str] | None) -> tuple[str, dict[str, Any]]:
326        """Return a WHERE clause fragment and bind params for type filtering."""
327        if not values or self._type_col is None:
328            return "", {}
329        placeholders = ", ".join(f":type_{i}" for i in range(len(values)))
330        clause = f" AND {self._type_col} IN ({placeholders})"
331        params = {f"type_{i}": v for i, v in enumerate(values)}
332        return clause, params
333
334    def _search_normalized(
335        self,
336        conn: Connection,
337        sa: types.ModuleType,
338        cols: str,
339        name: str,
340        type_filter: list[str] | None,
341        fetch_limit: int,
342    ) -> list[Feature]:
343        """
344        Exact accent- and case-insensitive search.
345
346        Accent normalization (NFD decomposition + diacritic strip) is done in
347        Python before the query is sent to the DB.
348        """
349        type_clause, type_params = self._type_filter_sql(type_filter)
350        name_expr = f"lower({self._name_col})"
351        if self._check_unaccent(conn):
352            name_expr = f"unaccent({name_expr})"
353        sql = sa.text(
354            f"SELECT {cols} FROM {self._table} "  # noqa: S608
355            f"WHERE {name_expr} = :query{type_clause} "
356            f"LIMIT :limit"
357        )
358        params: dict[str, Any] = {
359            "query": _normalize_name(name),
360            "limit": fetch_limit,
361            **type_params,
362        }
363        try:
364            result = conn.execute(sql, params)
365            return [self._row_to_feature(row) for row in result]
366        except Exception:
367            logger.exception("Normalized search failed for %r", name)
368            return []
369
370    def _search_ilike(
371        self,
372        conn: Connection,
373        sa: types.ModuleType,
374        cols: str,
375        name: str,
376        type_filter: list[str] | None,
377        fetch_limit: int,
378    ) -> list[Feature]:
379        """Case-insensitive substring fallback using ``ILIKE '%name%'``.
380
381        When the ``unaccent`` extension is available, both the stored name column
382        and the pattern are accent-stripped so that e.g. ``"Rhone"`` matches
383        ``"Rhône"``.  Without ``unaccent``, standard ILIKE is used (case-insensitive
384        only).
385        """
386        type_clause, type_params = self._type_filter_sql(type_filter)
387        normalized = _normalize_name(name)
388        if self._check_unaccent(conn):
389            name_expr = f"unaccent(lower({self._name_col}))"
390            pattern = f"%{normalized}%"
391        else:
392            name_expr = self._name_col
393            pattern = f"%{name}%"
394        sql = sa.text(
395            f"SELECT {cols} FROM {self._table} "  # noqa: S608
396            f"WHERE {name_expr} ILIKE :pattern{type_clause} "
397            f"LIMIT :limit"
398        )
399        params: dict[str, Any] = {"pattern": pattern, "limit": fetch_limit, **type_params}
400        try:
401            result = conn.execute(sql, params)
402            return [self._row_to_feature(row) for row in result]
403        except Exception:
404            logger.exception("ILIKE search failed for %r", name)
405            return []
406
407    def _search_fuzzy(
408        self,
409        conn: Connection,
410        sa: types.ModuleType,
411        cols: str,
412        name: str,
413        type_filter: list[str] | None,
414        fetch_limit: int,
415    ) -> list[Feature]:
416        """Fuzzy fallback using pg_trgm similarity (if extension is available)."""
417        if not self._check_trgm(conn):
418            logger.warning(
419                "pg_trgm extension not available. Fuzzy search disabled. Install it with: CREATE EXTENSION pg_trgm;"
420            )
421            return []
422        normalized_query = _normalize_name(name)
423        if self._check_unaccent(conn):
424            name_expr = f"unaccent(lower({self._name_col}))"
425        else:
426            logger.warning(
427                "unaccent extension not available. Accent-insensitive fuzzy search degraded. "
428                "Install it with: CREATE EXTENSION unaccent;"
429            )
430            name_expr = f"lower({self._name_col})"
431        type_clause, type_params = self._type_filter_sql(type_filter)
432        sql = sa.text(
433            f"SELECT {cols} FROM {self._table} "  # noqa: S608
434            f"WHERE word_similarity({name_expr}, :query) > :threshold{type_clause} "
435            f"ORDER BY word_similarity({name_expr}, :query) DESC "
436            f"LIMIT :limit"
437        )
438        params: dict[str, Any] = {
439            "query": normalized_query,
440            "threshold": self._fuzzy_threshold,
441            "limit": fetch_limit,
442            **type_params,
443        }
444        try:
445            result = conn.execute(sql, params)
446            return [self._row_to_feature(row) for row in result]
447        except Exception:
448            logger.exception("Fuzzy search failed for %r", name)
449            return []
450
451    def get_by_id(self, feature_id: str) -> Feature | None:
452        """
453        Get a specific feature by its unique identifier.
454
455        Args:
456            feature_id: Value of the ``id`` column.
457
458        Returns:
459            The matching GeoJSON Feature dict, or ``None`` if not found.
460        """
461        sa = _require_sqlalchemy()
462        cols = self._build_select_columns()
463        sql = sa.text(
464            f"SELECT {cols} FROM {self._table} WHERE {self._id_col} = :id LIMIT 1"  # noqa: S608
465        )
466        with self._get_connection() as conn:
467            try:
468                result = conn.execute(sql, {"id": feature_id})
469                row = result.fetchone()
470                return self._row_to_feature(row) if row else None
471            except Exception:
472                logger.exception("get_by_id failed for %r", feature_id)
473                return None
474
475    def get_available_types(self) -> list[str]:
476        """
477        Return the distinct ``type`` values present in the table.
478
479        Returns:
480            Sorted list of concrete type strings, or an empty list if the table
481            has no type column.
482        """
483        if self._type_col is None:
484            return []
485        sa = _require_sqlalchemy()
486        sql = sa.text(
487            f"SELECT DISTINCT {self._type_col} AS type FROM {self._table} "  # noqa: S608
488            f"WHERE {self._type_col} IS NOT NULL ORDER BY 1"
489        )
490        with self._get_connection() as conn:
491            try:
492                result = conn.execute(sql)
493                raw_types = [row.type for row in result]
494            except Exception:
495                logger.exception("get_available_types failed")
496                return []
497
498        normalized = {self._normalize_type(t) for t in raw_types if t}
499        return sorted(t for t in normalized if t)

Geographic data source backed by a PostGIS table.

The table must expose at minimum a name column, a geometry column, and optionally a type column. The expected schema is:

CREATE TABLE <table> (
    id      TEXT PRIMARY KEY,
    name    TEXT NOT NULL,
    type    TEXT,
    geom    GEOMETRY(Geometry, 4326)
);

The type column may store either:

  • Raw dataset values (e.g. "See", "Berg" for SwissNames3D), pass type_map so the datasource can translate between raw values and the normalized etter type names.
  • Already-normalized values (e.g. "lake", "mountain"), leave type_map=None (default).

Geometries must be in WGS84 (EPSG:4326) or supply crs for on-the-fly reprojection.

Arguments:
  • connection: A SQLAlchemy Engine (sqlalchemy.engine.Engine) or a connection URL string (e.g. "postgresql+psycopg2://user:pass@host/db"). When a string is provided the engine is created internally.
  • table: Fully-qualified table name, e.g. "public.swissnames3d".
  • name_column: Column used for name-based search (default "name").
  • type_column: Column used for type filtering. Pass None to disable type filtering (default "type").
  • geometry_column: PostGIS geometry column (default "geom").
  • id_column: Primary-key column (default "id").
  • crs: CRS of the stored geometries as an EPSG string. Defaults to "EPSG:4326" (no reprojection).
  • type_map: Optional mapping from normalized etter type names to lists of raw type column values present in the database. This is the same format as SwissNames3DSource.OBJEKTART_TYPE_MAP and IGNBDCartoSource.IGN_BDCARTO_TYPE_MAP, so they can be passed directly:

    from etter.datasources.swissnames3d import OBJEKTART_TYPE_MAP
    source = PostGISDataSource(
        engine,
        table="public.swissnames3d",
        type_map=OBJEKTART_TYPE_MAP,
    )
    

    When type_map is provided the datasource:

    • Translates raw DB values → normalized types in returned features.
    • Translates user type hints → raw DB values in SQL WHERE clauses.
    • Returns normalized type names from get_available_types().

    When None (default) the stored values are used as-is.

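  • fuzzy_threshold: pg_trgm word_similarity score (0-1) a name must exceed to be returned by the fuzzy search step. That step runs only when the normalized exact match returns no results, and before the ILIKE substring fallback.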

Example: unmodified SwissNames3D table:

from sqlalchemy import create_engine
from etter.datasources import PostGISDataSource
from etter.datasources.swissnames3d import OBJEKTART_TYPE_MAP

engine = create_engine(...)
source = PostGISDataSource(
    engine,
    table="public.swissnames3d",
    type_map=OBJEKTART_TYPE_MAP,
)
results = source.search("Lac Léman", type="lake")
PostGISDataSource( connection: str | sqlalchemy.engine.base.Engine, table: str, name_column: str = 'name', type_column: str | None = 'type', geometry_column: str = 'geom', id_column: str = 'id', crs: str = 'EPSG:4326', type_map: dict[typing.Literal['alpine_pasture', 'airport', 'area', 'arrondissement', 'border_marker', 'boulder', 'bridge', 'building', 'bus_stop', 'boat_stop', 'camping', 'canton', 'cave', 'cemetery', 'city', 'correctional_facility', 'country', 'customs', 'dam', 'department', 'district', 'ditch', 'entrance_exit', 'exit', 'fairground', 'ferry', 'field_name', 'forest', 'fountain', 'glacier', 'hamlet', 'heliport', 'hill', 'historical_site', 'hospital', 'island', 'junction', 'lake', 'leisure_facility', 'landfill', 'lift', 'loading_station', 'local_name', 'massif', 'military_training_area', 'monastery', 'monument', 'mountain', 'municipality', 'nature_reserve', 'park', 'parking', 'pass', 'peak', 'peninsula', 'plain', 'pond', 'power_plant', 'private_driving_area', 'quarry', 'railway', 'railway_area', 'region', 'religious_building', 'rest_area', 'restaurant', 'ridge', 'river', 'road', 'rock_head', 'school', 'spring', 'sports_facility', 'standing_area', 'swimming_pool', 'town', 'tower', 'train_station', 'tunnel', 'unknown', 'valley', 'viewpoint', 'village', 'wastewater_treatment', 'waste_incineration', 'waterfall', 'weir', 'zoo', 'administrative', 'amenity', 'infrastructure', 'landforms', 'natural', 'other', 'settlement', 'transport', 'water'], list[str]] | None = None, fuzzy_threshold: float = 0.65)
144    def __init__(
145        self,
146        connection: str | Engine,
147        table: str,
148        name_column: str = "name",
149        type_column: str | None = "type",
150        geometry_column: str = "geom",
151        id_column: str = "id",
152        crs: str = "EPSG:4326",
153        type_map: TypeMap | None = None,
154        fuzzy_threshold: float = 0.65,
155    ) -> None:
156        sa = _require_sqlalchemy()
157
158        if isinstance(connection, str):
159            self._engine = sa.create_engine(connection)
160        else:
161            self._engine = connection
162
163        try:
164            with self._engine.connect() as conn:
165                conn.execute(sa.text(f"SELECT 1 FROM {table} LIMIT 1"))
166        except Exception as exc:
167            raise ValueError(f"Failed to connect to database or access table {table!r}") from exc
168
169        self._table = table
170        self._name_col = name_column
171        self._type_col = type_column
172        self._geom_col = geometry_column
173        self._id_col = id_column
174        self._crs = crs
175        self._fuzzy_threshold = fuzzy_threshold
176
177        # Build bidirectional lookup structures from the user-supplied map.
178        if type_map:
179            self._normalized_to_raw: dict[str, list[str]] = {k: list(v) for k, v in type_map.items()}
180            self._raw_to_normalized: dict[str, str] = {
181                raw: normalized for normalized, raws in type_map.items() for raw in raws
182            }
183        else:
184            self._normalized_to_raw = {}
185            self._raw_to_normalized = {}
186
187        self._trgm_available: bool | None = None
188        self._unaccent_available: bool | None = None
def search( self, name: str, type: str | None = None, max_results: int = 10) -> list[geojson.feature.Feature]:
261    def search(
262        self,
263        name: str,
264        type: str | None = None,
265        max_results: int = 10,
266    ) -> list[Feature]:
267        """
268        Search for geographic features by name.
269
270        Uses a three-step cascade, stopping as soon as any step returns results:
271
272        1. **Normalized exact match**
273        2. **pg_trgm fuzzy with unaccent** (pg_trgm extension required and unaccent extension recommended)
274        3. **ILIKE substring**
275
276        ``merge_segments`` is applied after all rows are fetched so that
277        multi-segment linestrings (rivers, roads) are merged before the
278        ``max_results`` cap is applied.
279
280        Args:
281            name: Location name to search for.
282            type: Optional type hint for filtering results.
283            max_results: Maximum number of results to return.
284
285        Returns:
286            List of matching GeoJSON Feature dicts in WGS84.
287        """
288        sa = _require_sqlalchemy()
289        cols = self._build_select_columns()
290
291        # Resolve type filter to the raw DB values to use in the SQL WHERE clause.
292        type_filter_values: list[str] | None = None
293        if type is not None and self._type_col is not None:
294            matching_types = get_matching_types(type)
295            concrete_types = matching_types if matching_types else [type.lower()]
296            if self._normalized_to_raw:
297                raw_values: list[str] = []
298                for t in concrete_types:
299                    raw_values.extend(self._normalized_to_raw.get(t, [t]))
300                type_filter_values = raw_values if raw_values else concrete_types
301            else:
302                type_filter_values = concrete_types
303
304        # Fetch more rows than requested so that merge_segments has the full
305        # set of segments to work with.  Without this, a SQL LIMIT applied
306        # *before* merging would only return a partial set of linestring
307        # segments, producing incorrect / truncated geometries.
308        # We cap the internal limit at 2000 to avoid unbounded queries.
309        internal_limit = min(max(max_results * 20, 100), 2000)
310
311        with self._get_connection() as conn:
312            features = self._search_normalized(conn, sa, cols, name, type_filter_values, internal_limit)
313
314        if not features:
315            with self._get_connection() as conn:
316                features = self._search_fuzzy(conn, sa, cols, name, type_filter_values, internal_limit)
317
318        if not features:
319            with self._get_connection() as conn:
320                features = self._search_ilike(conn, sa, cols, name, type_filter_values, internal_limit)
321
322        features = merge_segments(features)
323        return features[:max_results]

Search for geographic features by name.

Uses a three-step cascade, stopping as soon as any step returns results:

  1. Normalized exact match
  2. pg_trgm fuzzy with unaccent (pg_trgm extension required and unaccent extension recommended)
  3. ILIKE substring

merge_segments is applied after all rows are fetched so that multi-segment linestrings (rivers, roads) are merged before the max_results cap is applied.

Arguments:
  • name: Location name to search for.
  • type: Optional type hint for filtering results.
  • max_results: Maximum number of results to return.
Returns:

List of matching GeoJSON Feature dicts in WGS84.

def get_by_id(self, feature_id: str) -> geojson.feature.Feature | None:
451    def get_by_id(self, feature_id: str) -> Feature | None:
452        """
453        Get a specific feature by its unique identifier.
454
455        Args:
456            feature_id: Value of the ``id`` column.
457
458        Returns:
459            The matching GeoJSON Feature dict, or ``None`` if not found.
460        """
461        sa = _require_sqlalchemy()
462        cols = self._build_select_columns()
463        sql = sa.text(
464            f"SELECT {cols} FROM {self._table} WHERE {self._id_col} = :id LIMIT 1"  # noqa: S608
465        )
466        with self._get_connection() as conn:
467            try:
468                result = conn.execute(sql, {"id": feature_id})
469                row = result.fetchone()
470                return self._row_to_feature(row) if row else None
471            except Exception:
472                logger.exception("get_by_id failed for %r", feature_id)
473                return None

Get a specific feature by its unique identifier.

Arguments:
  • feature_id: Value of the id column.
Returns:

The matching GeoJSON Feature dict, or None if not found.

def get_available_types(self) -> list[str]:
475    def get_available_types(self) -> list[str]:
476        """
477        Return the distinct ``type`` values present in the table.
478
479        Returns:
480            Sorted list of concrete type strings, or an empty list if the table
481            has no type column.
482        """
483        if self._type_col is None:
484            return []
485        sa = _require_sqlalchemy()
486        sql = sa.text(
487            f"SELECT DISTINCT {self._type_col} AS type FROM {self._table} "  # noqa: S608
488            f"WHERE {self._type_col} IS NOT NULL ORDER BY 1"
489        )
490        with self._get_connection() as conn:
491            try:
492                result = conn.execute(sql)
493                raw_types = [row.type for row in result]
494            except Exception:
495                logger.exception("get_available_types failed")
496                return []
497
498        normalized = {self._normalize_type(t) for t in raw_types if t}
499        return sorted(t for t in normalized if t)

Return the distinct type values present in the table, translated to normalized etter type names when a type_map was supplied.

Returns:

Sorted list of concrete type strings, or an empty list if the table has no type column.

def apply_spatial_relation(geometry: dict[str, typing.Any] | list[dict[str, typing.Any]], relation: SpatialRelation, buffer_config: BufferConfig | None = None, spatial_config: SpatialRelationConfig | None = None, geometry_format: Literal['geojson', 'wkt', 'wkb'] = 'geojson') -> dict[str, typing.Any] | str:
def apply_spatial_relation(
    geometry: GeoJsonGeometry | list[GeoJsonGeometry],
    relation: SpatialRelation,
    buffer_config: BufferConfig | None = None,
    spatial_config: SpatialRelationConfig | None = None,
    geometry_format: GeometryFormat = "geojson",
) -> GeoJsonGeometry | str:
    """Build the search geometry for a spatial relation around reference geometries.

    When a list is given, its members are merged with ``unary_union`` first, so
    a feature stored as several records (e.g. a river in segments) is treated
    as one reference shape.

    If ``buffer_config.inferred`` is true, the config is passed through
    ``_refine_buffer_config`` together with the merged shape and the relation
    before the relation is applied (intended to adapt the inferred distance to
    the size of the geometry).

    Args:
        geometry: GeoJSON geometry dict or non-empty list of dicts (WGS84).
        relation: Spatial relation to apply; its ``category`` selects the
            transformation.
        buffer_config: Required for "buffer" and "directional" relations.
        spatial_config: Relation registry; the module-level default is used
            when omitted.
        geometry_format: "geojson" (default), "wkt", or "wkb".

    Returns:
        Transformed geometry in the requested format.

    Raises:
        ValueError: If ``geometry`` is an empty list, if a buffer or
            directional relation is given without ``buffer_config``, or if
            the relation category is not recognised.
    """
    if not isinstance(geometry, list):
        reference_shape = shape(geometry)
        reference_geojson: GeoJsonGeometry = geometry
    elif geometry:
        reference_shape = unary_union([shape(part) for part in geometry])
        reference_geojson = mapping(reference_shape)
    else:
        raise ValueError("geometry list must not be empty")

    # Inferred distances are refined before any category-specific handling.
    if buffer_config is not None and buffer_config.inferred:
        buffer_config = _refine_buffer_config(reference_shape, buffer_config, relation)

    category = relation.category
    if category == "containment":
        # Containment uses the reference geometry itself as the search area.
        transformed = reference_geojson
    elif category == "buffer":
        if buffer_config is None:
            raise ValueError(f"Buffer relation '{relation.relation}' requires buffer_config")
        transformed = _apply_buffer(reference_shape, buffer_config)
    elif category in ("directional", "clipping"):
        is_directional = category == "directional"
        # The buffer_config check must precede the registry lookup so that the
        # missing-config error wins over any lookup failure.
        if is_directional and buffer_config is None:
            raise ValueError(f"Directional relation '{relation.relation}' requires buffer_config")
        registry = _DEFAULT_SPATIAL_CONFIG if spatial_config is None else spatial_config
        settings = registry.get_config(relation.relation)
        if is_directional:
            heading = settings.direction_angle_degrees or 0
            sector_width = settings.sector_angle_degrees or 90
            transformed = _apply_directional(reference_shape, buffer_config, heading, sector_width)
        else:
            side = settings.clip_direction or "north"
            transformed = _apply_clipping(reference_shape, side)
    else:
        raise ValueError(f"Unknown relation category: '{category}'")

    return convert_geometry(transformed, geometry_format)

Transform one or more reference geometries according to a spatial relation.

A list of geometries is unioned into one before the transformation, so that features split across multiple datasource records (e.g. a river in segments) produce a single coherent search area.

When buffer_config.inferred is True (i.e. no explicit distance was stated), the buffer distance is refined from the actual geometry area so that small features receive small buffers and large regions receive large ones.

Arguments:
  • geometry: GeoJSON geometry dict or non-empty list of dicts (WGS84).
  • relation: Spatial relation to apply.
  • buffer_config: Required for buffer/directional relations.
  • spatial_config: Relation registry; defaults to the module-level singleton.
  • geometry_format: "geojson" (default), "wkt", or "wkb".
Returns:

Transformed geometry in the requested format.

def convert_geometry(geometry: dict[str, typing.Any], fmt: Literal['geojson', 'wkt', 'wkb']) -> dict[str, typing.Any] | str:
def convert_geometry(geometry: GeoJsonGeometry, fmt: GeometryFormat) -> GeoJsonGeometry | str:
    """
    Convert a GeoJSON geometry dict to the requested format.

    Args:
        geometry: GeoJSON geometry dict (e.g. {"type": "Point", "coordinates": [...]})
        fmt: Target format — "geojson" returns the dict unchanged (same object),
             "wkt" returns a WKT string, "wkb" returns a hex-encoded WKB string.

    Returns:
        The geometry in the requested format.

    Raises:
        ValueError: If ``fmt`` is not one of "geojson", "wkt" or "wkb".
    """
    if fmt == "geojson":
        return geometry
    # Reject unknown formats explicitly; otherwise a typo such as "WKT" would
    # silently fall through and produce hex WKB.
    if fmt not in ("wkt", "wkb"):
        raise ValueError(f"Unsupported geometry format: {fmt!r}")
    geom = shape(geometry)
    return geom.wkt if fmt == "wkt" else geom.wkb_hex

Convert a GeoJSON geometry dict to the requested format.

Arguments:
  • geometry: GeoJSON geometry dict (e.g. {"type": "Point", "coordinates": [...]})
  • fmt: Target format — "geojson" returns the dict unchanged, "wkt" returns a WKT string, "wkb" returns a hex-encoded WKB string.
Returns:

The geometry in the requested format.

def convert_feature_geometry(feature: geojson.feature.Feature, fmt: Literal['geojson', 'wkt', 'wkb']) -> geojson.feature.Feature | dict:
def convert_feature_geometry(feature: Feature, fmt: GeometryFormat) -> Feature | dict:
    """
    Return a GeoJSON Feature with its geometry converted to the requested format.

    Args:
        feature: GeoJSON Feature dict with a "geometry" key. The geometry may be
            ``None`` (an unlocated Feature).
        fmt: Target geometry format.

    Returns:
        When fmt is "geojson", the input Feature itself, unchanged (no copy is made).
        Otherwise a new plain dict identical to the input except that the
        "geometry" value is converted (it becomes a string); a ``None`` geometry
        is kept as ``None``.

    Raises:
        KeyError: If ``feature`` has no "geometry" key and fmt is not "geojson".
    """
    if fmt == "geojson":
        return feature
    geometry = feature["geometry"]
    # A null geometry is valid GeoJSON for unlocated Features; there is nothing
    # to convert, so keep it as None instead of failing in the converter.
    converted = None if geometry is None else convert_geometry(geometry, fmt)
    return {**feature, "geometry": converted}

Return a copy of a GeoJSON Feature dict with its geometry converted to the requested format.

Arguments:
  • feature: GeoJSON Feature dict with a "geometry" key.
  • fmt: Target geometry format.
Returns:

A new dict identical to the input except that the "geometry" value is converted. When fmt is "geojson" the input Feature itself is returned unchanged (no copy is made); otherwise a plain dict is returned (geometry becomes a string).