etter

etter - Natural language geographic query parsing using LLMs.

Parse location queries into structured geographic queries using LLM.

 1"""
 2etter - Natural language geographic query parsing using LLMs.
 3
 4Parse location queries into structured geographic queries using LLM.
 5"""
 6
 7from importlib.metadata import PackageNotFoundError, version
 8
 9try:
10    __version__ = version("etter")
11except PackageNotFoundError:  # running from source without install
12    __version__ = "unknown"
13
14# Main API
15# Exceptions
16# Datasources
17from .datasources import CompositeDataSource, GeoDataSource, IGNBDCartoSource, PostGISDataSource, SwissNames3DSource
18from .exceptions import (
19    GeoFilterError,
20    LowConfidenceError,
21    LowConfidenceWarning,
22    NoReferenceLocationError,
23    ParsingError,
24    UnknownRelationError,
25    ValidationError,
26)
27from .geometry_format import convert_feature_geometry, convert_geometry
28
29# Models (for type hints and result access)
30from .models import (
31    BufferConfig,
32    ConfidenceLevel,
33    ConfidenceScore,
34    GeometryFormat,
35    GeoQuery,
36    ReferenceLocation,
37    SpatialRelation,
38)
39from .parser import GeoFilterParser
40
41# Spatial operations
42from .spatial import apply_spatial_relation
43
44# Configuration
45from .spatial_config import RelationConfig, SpatialRelationConfig
46
47__all__ = [
48    # Main API
49    "GeoFilterParser",
50    # Models
51    "GeoQuery",
52    "SpatialRelation",
53    "ReferenceLocation",
54    "BufferConfig",
55    "ConfidenceScore",
56    "ConfidenceLevel",
57    "GeometryFormat",
58    # Configuration
59    "SpatialRelationConfig",
60    "RelationConfig",
61    # Exceptions
62    "GeoFilterError",
63    "ParsingError",
64    "ValidationError",
65    "NoReferenceLocationError",
66    "UnknownRelationError",
67    "LowConfidenceError",
68    "LowConfidenceWarning",
69    # Datasources
70    "GeoDataSource",
71    "SwissNames3DSource",
72    "IGNBDCartoSource",
73    "CompositeDataSource",
74    "PostGISDataSource",
75    # Spatial
76    "apply_spatial_relation",
77    "convert_geometry",
78    "convert_feature_geometry",
79]
class GeoFilterParser:
    """
    Main entry point for parsing natural language location queries.

    This class orchestrates the entire parsing pipeline:
    1. Initialize LLM with structured output
    2. Build prompt with spatial relations and examples
    3. Parse query through LLM
    4. Validate and enrich with defaults
    5. Return structured GeoQuery

    Examples:
        Basic usage:
        >>> from langchain.chat_models import init_chat_model
        >>> llm = init_chat_model(model="gpt-4o", model_provider="openai", api_key="sk-...")
        >>> parser = GeoFilterParser(llm=llm)
        >>> result = parser.parse("restaurants in Lausanne")
        >>> print(result.reference_location.name)
        'Lausanne'

        With strict confidence mode:
        >>> parser = GeoFilterParser(llm=llm, confidence_threshold=0.8, strict_mode=True)
        >>> result = parser.parse("near the station")  # May raise LowConfidenceError
    """

    def __init__(
        self,
        llm: BaseChatModel,
        spatial_config: SpatialRelationConfig | None = None,
        confidence_threshold: float = 0.6,
        strict_mode: bool = False,
        include_examples: bool = True,
        datasource: GeoDataSource | None = None,
        additional_instructions: str | None = None,
    ):
        """
        Initialize the parser.

        Args:
            llm: LangChain LLM instance (required).
            spatial_config: Spatial relation configuration. If None, uses defaults
            confidence_threshold: Minimum confidence to accept (0-1)
            strict_mode: If True, raise error on low confidence. If False, warn only
            include_examples: Whether to include few-shot examples in prompt
            datasource: Optional GeoDataSource instance. If provided, the LLM will be informed
                       about the concrete types available in that datasource for better type inference.
            additional_instructions: Free-form text injected as a system message after the main
                       system prompt and before few-shot examples. Use this to add caller-specific
                       rules such as region-specific endonyms, domain aliases, or
                       organization-specific place names without forking the default prompt.

        Example:
            >>> from langchain.chat_models import init_chat_model
            >>> from etter.datasources.swissnames3d import SwissNames3DSource
            >>> llm = init_chat_model(model="gpt-4o", model_provider="openai", temperature=0)
            >>> datasource = SwissNames3DSource("data/")
            >>> parser = GeoFilterParser(llm=llm, datasource=datasource)
        """
        self.llm = llm

        # Initialize spatial config
        self.spatial_config = spatial_config or SpatialRelationConfig()

        # Settings
        self.confidence_threshold = confidence_threshold
        self.strict_mode = strict_mode
        self.include_examples = include_examples
        self.datasource = datasource
        self.additional_instructions = additional_instructions

        # Build structured LLM (must happen after settings are stored, since
        # _build_prompt reads self.datasource / self.additional_instructions)
        self.structured_llm = self._build_structured_llm()

        # Build prompt template
        self.prompt = self._build_prompt()

    def _build_structured_llm(self):
        """Create LLM with structured output using Pydantic model."""
        return self.llm.with_structured_output(
            GeoQuery,
            method="function_calling",  # Use function_calling for broader schema support
            include_raw=True,  # For error debugging
        )

    def _build_prompt(self) -> ChatPromptTemplate:
        """Build prompt template with spatial relations, examples, and available types."""
        available_types = None
        if self.datasource is not None:
            available_types = self.datasource.get_available_types()

        return build_prompt_template(
            spatial_config=self.spatial_config,
            include_examples=self.include_examples,
            available_types=available_types,
            additional_instructions=self.additional_instructions,
        )

    def _unpack_response(self, response) -> GeoQuery:
        """
        Extract and validate the GeoQuery from a structured-LLM response.

        Args:
            response: Either a dict (from include_raw=True, with "parsed",
                "raw", "parsing_error" keys) or the parsed object directly.

        Returns:
            The validated GeoQuery instance.

        Raises:
            ParsingError: If the LLM returned nothing parseable, or the
                parsed object is not a GeoQuery.
        """
        parsed = response.get("parsed") if isinstance(response, dict) else response

        if parsed is None:
            raw = response.get("raw", "") if isinstance(response, dict) else ""
            error = response.get("parsing_error") if isinstance(response, dict) else None
            raise ParsingError(
                message="Failed to parse query into structured format. "
                "LLM may have returned invalid JSON or missed required fields.",
                raw_response=str(raw),
                original_error=error,
            )

        # Explicit type check instead of `assert`: asserts are stripped under
        # `python -O`, and the documented contract is that parse failures
        # surface as ParsingError, not AssertionError.
        if not isinstance(parsed, GeoQuery):
            raise ParsingError(
                message=f"Parsed result must be GeoQuery, got {type(parsed).__name__}",
                raw_response=str(parsed),
                original_error=None,
            )
        return parsed

    def _finalize(self, geo_query: GeoQuery, query: str) -> GeoQuery:
        """Set original_query and run the validation pipeline."""
        # The LLM may omit or paraphrase the original query; always pin it
        # to the exact user input before validation.
        if not geo_query.original_query or geo_query.original_query != query:
            geo_query.original_query = query

        return validate_query(
            geo_query,
            self.spatial_config,
            confidence_threshold=self.confidence_threshold,
            strict_mode=self.strict_mode,
        )

    def parse(self, query: str) -> GeoQuery:
        """
        Parse a natural language location query into structured format.

        This is the main method for parsing queries. It:
        1. Invokes the LLM with structured output
        2. Validates the spatial relation is registered
        3. Enriches with default parameters
        4. Checks confidence threshold

        Args:
            query: Natural language query in any language

        Returns:
            GeoQuery: Structured query representation with confidence scores

        Raises:
            ParsingError: If LLM fails to parse query into valid structure
            ValidationError: If parsed query fails business logic validation
            UnknownRelationError: If spatial relation is not registered
            LowConfidenceError: If confidence below threshold (strict mode only)

        Warns:
            LowConfidenceWarning: If confidence below threshold (permissive mode)

        Examples:
            Simple containment query:
            >>> result = parser.parse("in Bern")
            >>> result.reference_location.name
            'Bern'
            >>> result.spatial_relation.relation
            'in'

            Buffer query:
            >>> result = parser.parse("near Lake Geneva")
            >>> result.spatial_relation.relation
            'near'
            >>> result.buffer_config.distance_m
            5000

            Directional query:
            >>> result = parser.parse("north of Lausanne")
            >>> result.spatial_relation.relation
            'north_of'
            >>> result.reference_location.name
            'Lausanne'

            Multilingual:
            >>> result = parser.parse("près de Genève")
            >>> result.spatial_relation.relation
            'near'
            >>> result.reference_location.name
            'Genève'
        """
        formatted_messages = self.prompt.format_messages(query=query)

        try:
            response = self.structured_llm.invoke(formatted_messages)
        except Exception as e:
            # Wrap any transport/provider failure in the package's own
            # exception so callers only need to catch ParsingError.
            raise ParsingError(
                message=f"LLM invocation failed: {str(e)}",
                raw_response="",
                original_error=e,
            ) from e

        return self._finalize(self._unpack_response(response), query)

    async def aparse(self, query: str) -> GeoQuery:
        """
        Asynchronously parse a natural language location query into structured format.

        Async counterpart to :meth:`parse`. Uses ``ainvoke`` on the structured LLM
        so it can be awaited inside event loops (e.g. FastAPI endpoints) without
        blocking. Validation is synchronous and runs after the LLM call.
        """
        formatted_messages = self.prompt.format_messages(query=query)

        try:
            response = await self.structured_llm.ainvoke(formatted_messages)
        except Exception as e:
            raise ParsingError(
                message=f"LLM invocation failed: {str(e)}",
                raw_response="",
                original_error=e,
            ) from e

        return self._finalize(self._unpack_response(response), query)

    async def parse_stream(self, query: str) -> AsyncGenerator[dict, None]:
        # NOTE: return annotation uses the two-parameter AsyncGenerator form;
        # the single-parameter `AsyncGenerator[dict]` requires Python 3.13+
        # (PEP 696 type-parameter defaults) and breaks on earlier versions.
        """
        Parse a natural language location query with streaming reasoning and results.

        This method provides real-time feedback during the parsing process by yielding
        intermediate reasoning steps and the final GeoQuery result. This is useful for
        providing users with transparency into the LLM's decision-making process and
        for building responsive UIs.

        The stream yields dictionaries with the following event types:
        - {"type": "start"} - Stream started
        - {"type": "reasoning", "content": str} - Intermediate processing steps
        - {"type": "data-response", "content": dict} - Final GeoQuery as JSON
        - {"type": "error", "content": str} - Errors encountered during processing
        - {"type": "finish"} - Stream completed successfully

        Args:
            query: Natural language query in any language

        Yields:
            dict: Stream events with type and optional content fields

        Raises:
            ParsingError: If LLM fails to parse query into valid structure
            ValidationError: If parsed query fails business logic validation
            UnknownRelationError: If spatial relation is not registered
            LowConfidenceError: If confidence below threshold (strict mode only)

        Examples:
            Basic usage with async iteration:
            >>> async for event in parser.parse_stream("restaurants near Lake Geneva"):
            ...     if event["type"] == "reasoning":
            ...         print(f"Reasoning: {event['content']}")
            ...     elif event["type"] == "data-response":
            ...         geo_query = event["content"]
            ...         print(f"Location: {geo_query['reference_location']['name']}")
            ...     elif event["type"] == "error":
            ...         print(f"Error: {event['content']}")

            Using in a FastAPI streaming endpoint:
            >>> from fastapi.responses import StreamingResponse
            >>> @app.get("/stream")
            >>> async def stream_endpoint(q: str):
            ...     async def event_stream():
            ...         async for event in parser.parse_stream(q):
            ...             yield f"data: {json.dumps(event)}\\n\\n"
            ...     return StreamingResponse(event_stream(), media_type="text/event-stream")
        """
        try:
            # Signal start of stream
            yield {"type": "start"}

            yield {"type": "reasoning", "content": "Preparing query for LLM processing"}
            formatted_messages = self.prompt.format_messages(query=query)

            yield {"type": "reasoning", "content": "Analyzing spatial relationship and location"}
            try:
                response = await self.structured_llm.ainvoke(formatted_messages)
            except Exception as e:
                yield {"type": "error", "content": f"LLM invocation failed: {str(e)}"}
                raise ParsingError(
                    message=f"LLM invocation failed: {str(e)}",
                    raw_response="",
                    original_error=e,
                ) from e

            yield {"type": "reasoning", "content": "Parsing LLM response into structured format"}
            try:
                geo_query = self._unpack_response(response)
            except ParsingError:
                yield {"type": "error", "content": "Failed to parse response - invalid JSON or missing fields"}
                raise

            if geo_query.confidence_breakdown.reasoning:
                yield {
                    "type": "reasoning",
                    "content": f"LLM reasoning: {geo_query.confidence_breakdown.reasoning}",
                }

            yield {"type": "reasoning", "content": "Validating spatial relation configuration"}
            geo_query = self._finalize(geo_query, query)

            yield {"type": "reasoning", "content": "Query parsing completed successfully"}
            yield {"type": "data-response", "content": geo_query.model_dump()}

            # Signal successful completion
            yield {"type": "finish"}

        except Exception as e:
            # Emit error event before re-raising.
            # NOTE(review): failures already reported above (LLM invocation,
            # unpack) produce a second, generic "error" event here — confirm
            # consumers tolerate duplicate error events before changing this.
            yield {"type": "error", "content": f"Error during parsing: {str(e)}"}
            raise

    def parse_batch(self, queries: list[str]) -> list[GeoQuery]:
        """
        Parse multiple queries in batch.

        Note: This is a simple sequential implementation.
        For true parallelization, consider using async methods or ThreadPoolExecutor.

        Args:
            queries: List of natural language queries

        Returns:
            List of GeoQuery objects (same order as input)

        Raises:
            Same exceptions as parse() for any failing query
        """
        return [self.parse(query) for query in queries]

    def get_available_relations(self, category: RelationCategory | None = None) -> list[str]:
        """
        Get list of available spatial relations.

        Args:
            category: Optional filter by category ("containment", "buffer", "directional")

        Returns:
            List of relation names
        """
        return self.spatial_config.list_relations(category=category)

    def describe_relation(self, relation_name: str) -> str:
        """
        Get description of a spatial relation.

        Args:
            relation_name: Name of the relation

        Returns:
            Human-readable description

        Raises:
            UnknownRelationError: If relation is not registered
        """
        config = self.spatial_config.get_config(relation_name)
        return config.description

Main entry point for parsing natural language location queries.

This class orchestrates the entire parsing pipeline:

  1. Initialize LLM with structured output
  2. Build prompt with spatial relations and examples
  3. Parse query through LLM
  4. Validate and enrich with defaults
  5. Return structured GeoQuery
Examples:

Basic usage:

>>> from langchain.chat_models import init_chat_model
>>> llm = init_chat_model(model="gpt-4o", model_provider="openai", api_key="sk-...")
>>> parser = GeoFilterParser(llm=llm)
>>> result = parser.parse("restaurants in Lausanne")
>>> print(result.reference_location.name)
'Lausanne'

With strict confidence mode:

>>> parser = GeoFilterParser(llm=llm, confidence_threshold=0.8, strict_mode=True)
>>> result = parser.parse("near the station")  # May raise LowConfidenceError
GeoFilterParser( llm: langchain_core.language_models.chat_models.BaseChatModel, spatial_config: SpatialRelationConfig | None = None, confidence_threshold: float = 0.6, strict_mode: bool = False, include_examples: bool = True, datasource: GeoDataSource | None = None, additional_instructions: str | None = None)
44    def __init__(
45        self,
46        llm: BaseChatModel,
47        spatial_config: SpatialRelationConfig | None = None,
48        confidence_threshold: float = 0.6,
49        strict_mode: bool = False,
50        include_examples: bool = True,
51        datasource: GeoDataSource | None = None,
52        additional_instructions: str | None = None,
53    ):
54        """
55        Initialize the parser.
56
57        Args:
58            llm: LangChain LLM instance (required).
59            spatial_config: Spatial relation configuration. If None, uses defaults
60            confidence_threshold: Minimum confidence to accept (0-1)
61            strict_mode: If True, raise error on low confidence. If False, warn only
62            include_examples: Whether to include few-shot examples in prompt
63            datasource: Optional GeoDataSource instance. If provided, the LLM will be informed
64                       about the concrete types available in that datasource for better type inference.
65            additional_instructions: Free-form text injected as a system message after the main
66                       system prompt and before few-shot examples. Use this to add caller-specific
67                       rules such as region-specific endonyms, domain aliases, or
68                       organization-specific place names without forking the default prompt.
69
70        Example:
71            >>> from langchain.chat_models import init_chat_model
72            >>> from etter.datasources.swissnames3d import SwissNames3DSource
73            >>> llm = init_chat_model(model="gpt-4o", model_provider="openai", temperature=0)
74            >>> datasource = SwissNames3DSource("data/")
75            >>> parser = GeoFilterParser(llm=llm, datasource=datasource)
76        """
77        self.llm = llm
78
79        # Initialize spatial config
80        self.spatial_config = spatial_config or SpatialRelationConfig()
81
82        # Settings
83        self.confidence_threshold = confidence_threshold
84        self.strict_mode = strict_mode
85        self.include_examples = include_examples
86        self.datasource = datasource
87        self.additional_instructions = additional_instructions
88
89        # Build structured LLM
90        self.structured_llm = self._build_structured_llm()
91
92        # Build prompt template
93        self.prompt = self._build_prompt()

Initialize the parser.

Arguments:
  • llm: LangChain LLM instance (required).
  • spatial_config: Spatial relation configuration. If None, uses defaults
  • confidence_threshold: Minimum confidence to accept (0-1)
  • strict_mode: If True, raise error on low confidence. If False, warn only
  • include_examples: Whether to include few-shot examples in prompt
  • datasource: Optional GeoDataSource instance. If provided, the LLM will be informed about the concrete types available in that datasource for better type inference.
  • additional_instructions: Free-form text injected as a system message after the main system prompt and before few-shot examples. Use this to add caller-specific rules such as region-specific endonyms, domain aliases, or organization-specific place names without forking the default prompt.
Example:
>>> from langchain.chat_models import init_chat_model
>>> from etter.datasources.swissnames3d import SwissNames3DSource
>>> llm = init_chat_model(model="gpt-4o", model_provider="openai", temperature=0)
>>> datasource = SwissNames3DSource("data/")
>>> parser = GeoFilterParser(llm=llm, datasource=datasource)
Attributes:
  • llm
  • spatial_config
  • confidence_threshold
  • strict_mode
  • include_examples
  • datasource
  • additional_instructions
  • structured_llm
  • prompt
def parse(self, query: str) -> GeoQuery:
146    def parse(self, query: str) -> GeoQuery:
147        """
148        Parse a natural language location query into structured format.
149
150        This is the main method for parsing queries. It:
151        1. Invokes the LLM with structured output
152        2. Validates the spatial relation is registered
153        3. Enriches with default parameters
154        4. Checks confidence threshold
155
156        Args:
157            query: Natural language query in any language
158
159        Returns:
160            GeoQuery: Structured query representation with confidence scores
161
162        Raises:
163            ParsingError: If LLM fails to parse query into valid structure
164            ValidationError: If parsed query fails business logic validation
165            UnknownRelationError: If spatial relation is not registered
166            LowConfidenceError: If confidence below threshold (strict mode only)
167
168        Warns:
169            LowConfidenceWarning: If confidence below threshold (permissive mode)
170
171        Examples:
172            Simple containment query:
173            >>> result = parser.parse("in Bern")
174            >>> result.reference_location.name
175            'Bern'
176            >>> result.spatial_relation.relation
177            'in'
178
179            Buffer query:
180            >>> result = parser.parse("near Lake Geneva")
181            >>> result.spatial_relation.relation
182            'near'
183            >>> result.buffer_config.distance_m
184            5000
185
186            Directional query:
187            >>> result = parser.parse("north of Lausanne")
188            >>> result.spatial_relation.relation
189            'north_of'
190            >>> result.reference_location.name
191            'Lausanne'
192
193            Multilingual:
194            >>> result = parser.parse("près de Genève")
195            >>> result.spatial_relation.relation
196            'near'
197            >>> result.reference_location.name
198            'Genève'
199        """
200        formatted_messages = self.prompt.format_messages(query=query)
201
202        try:
203            response = self.structured_llm.invoke(formatted_messages)
204        except Exception as e:
205            raise ParsingError(
206                message=f"LLM invocation failed: {str(e)}",
207                raw_response="",
208                original_error=e,
209            ) from e
210
211        return self._finalize(self._unpack_response(response), query)

Parse a natural language location query into structured format.

This is the main method for parsing queries. It:

  1. Invokes the LLM with structured output
  2. Validates the spatial relation is registered
  3. Enriches with default parameters
  4. Checks confidence threshold
Arguments:
  • query: Natural language query in any language
Returns:

GeoQuery: Structured query representation with confidence scores

Raises:
  • ParsingError: If LLM fails to parse query into valid structure
  • ValidationError: If parsed query fails business logic validation
  • UnknownRelationError: If spatial relation is not registered
  • LowConfidenceError: If confidence below threshold (strict mode only)
Warns:

LowConfidenceWarning: If confidence below threshold (permissive mode)

Examples:

Simple containment query:

>>> result = parser.parse("in Bern")
>>> result.reference_location.name
'Bern'
>>> result.spatial_relation.relation
'in'

Buffer query:

>>> result = parser.parse("near Lake Geneva")
>>> result.spatial_relation.relation
'near'
>>> result.buffer_config.distance_m
5000

Directional query:

>>> result = parser.parse("north of Lausanne")
>>> result.spatial_relation.relation
'north_of'
>>> result.reference_location.name
'Lausanne'

Multilingual:

>>> result = parser.parse("près de Genève")
>>> result.spatial_relation.relation
'near'
>>> result.reference_location.name
'Genève'
async def aparse(self, query: str) -> GeoQuery:
213    async def aparse(self, query: str) -> GeoQuery:
214        """
215        Asynchronously parse a natural language location query into structured format.
216
217        Async counterpart to :meth:`parse`. Uses ``ainvoke`` on the structured LLM
218        so it can be awaited inside event loops (e.g. FastAPI endpoints) without
219        blocking. Validation is synchronous and runs after the LLM call.
220        """
221        formatted_messages = self.prompt.format_messages(query=query)
222
223        try:
224            response = await self.structured_llm.ainvoke(formatted_messages)
225        except Exception as e:
226            raise ParsingError(
227                message=f"LLM invocation failed: {str(e)}",
228                raw_response="",
229                original_error=e,
230            ) from e
231
232        return self._finalize(self._unpack_response(response), query)

Asynchronously parse a natural language location query into structured format.

Async counterpart to parse(). Uses ainvoke on the structured LLM so it can be awaited inside event loops (e.g. FastAPI endpoints) without blocking. Validation is synchronous and runs after the LLM call.

async def parse_stream(self, query: str) -> AsyncGenerator[dict]:
234    async def parse_stream(self, query: str) -> AsyncGenerator[dict]:
235        """
236        Parse a natural language location query with streaming reasoning and results.
237
238        This method provides real-time feedback during the parsing process by yielding
239        intermediate reasoning steps and the final GeoQuery result. This is useful for
240        providing users with transparency into the LLM's decision-making process and
241        for building responsive UIs.
242
243        The stream yields dictionaries with the following event types:
244        - {"type": "start"} - Stream started
245        - {"type": "reasoning", "content": str} - Intermediate processing steps
246        - {"type": "data-response", "content": dict} - Final GeoQuery as JSON
247        - {"type": "error", "content": str} - Errors encountered during processing
248        - {"type": "finish"} - Stream completed successfully
249
250        Args:
251            query: Natural language query in any language
252
253        Yields:
254            dict: Stream events with type and optional content fields
255
256        Raises:
257            ParsingError: If LLM fails to parse query into valid structure
258            ValidationError: If parsed query fails business logic validation
259            UnknownRelationError: If spatial relation is not registered
260            LowConfidenceError: If confidence below threshold (strict mode only)
261
262        Examples:
263            Basic usage with async iteration:
264            >>> async for event in parser.parse_stream("restaurants near Lake Geneva"):
265            ...     if event["type"] == "reasoning":
266            ...         print(f"Reasoning: {event['content']}")
267            ...     elif event["type"] == "data-response":
268            ...         geo_query = event["content"]
269            ...         print(f"Location: {geo_query['reference_location']['name']}")
270            ...     elif event["type"] == "error":
271            ...         print(f"Error: {event['content']}")
272
273            Using in a FastAPI streaming endpoint:
274            >>> from fastapi.responses import StreamingResponse
275            >>> @app.get("/stream")
276            >>> async def stream_endpoint(q: str):
277            ...     async def event_stream():
278            ...         async for event in parser.parse_stream(q):
279            ...             yield f"data: {json.dumps(event)}\\n\\n"
280            ...     return StreamingResponse(event_stream(), media_type="text/event-stream")
281        """
282        try:
283            # Signal start of stream
284            yield {"type": "start"}
285
286            yield {"type": "reasoning", "content": "Preparing query for LLM processing"}
287            formatted_messages = self.prompt.format_messages(query=query)
288
289            yield {"type": "reasoning", "content": "Analyzing spatial relationship and location"}
290            try:
291                response = await self.structured_llm.ainvoke(formatted_messages)
292            except Exception as e:
293                yield {"type": "error", "content": f"LLM invocation failed: {str(e)}"}
294                raise ParsingError(
295                    message=f"LLM invocation failed: {str(e)}",
296                    raw_response="",
297                    original_error=e,
298                ) from e
299
300            yield {"type": "reasoning", "content": "Parsing LLM response into structured format"}
301            try:
302                geo_query = self._unpack_response(response)
303            except ParsingError:
304                yield {"type": "error", "content": "Failed to parse response - invalid JSON or missing fields"}
305                raise
306
307            if geo_query.confidence_breakdown.reasoning:
308                yield {
309                    "type": "reasoning",
310                    "content": f"LLM reasoning: {geo_query.confidence_breakdown.reasoning}",
311                }
312
313            yield {"type": "reasoning", "content": "Validating spatial relation configuration"}
314            geo_query = self._finalize(geo_query, query)
315
316            yield {"type": "reasoning", "content": "Query parsing completed successfully"}
317            yield {"type": "data-response", "content": geo_query.model_dump()}
318
319            # Signal successful completion
320            yield {"type": "finish"}
321
322        except Exception as e:
323            # Emit error event before re-raising
324            yield {"type": "error", "content": f"Error during parsing: {str(e)}"}
325            raise

Parse a natural language location query with streaming reasoning and results.

This method provides real-time feedback during the parsing process by yielding intermediate reasoning steps and the final GeoQuery result. This is useful for providing users with transparency into the LLM's decision-making process and for building responsive UIs.

The stream yields dictionaries with the following event types:

  • {"type": "start"} - Stream started
  • {"type": "reasoning", "content": str} - Intermediate processing steps
  • {"type": "data-response", "content": dict} - Final GeoQuery as JSON
  • {"type": "error", "content": str} - Errors encountered during processing
  • {"type": "finish"} - Stream completed successfully
Arguments:
  • query: Natural language query in any language
Yields:

dict: Stream events with type and optional content fields

Raises:
  • ParsingError: If LLM fails to parse query into valid structure
  • ValidationError: If parsed query fails business logic validation
  • UnknownRelationError: If spatial relation is not registered
  • LowConfidenceError: If confidence below threshold (strict mode only)
Examples:

Basic usage with async iteration:

>>> async for event in parser.parse_stream("restaurants near Lake Geneva"):
...     if event["type"] == "reasoning":
...         print(f"Reasoning: {event['content']}")
...     elif event["type"] == "data-response":
...         geo_query = event["content"]
...         print(f"Location: {geo_query['reference_location']['name']}")
...     elif event["type"] == "error":
...         print(f"Error: {event['content']}")

Using in a FastAPI streaming endpoint:

>>> from fastapi.responses import StreamingResponse
>>> @app.get("/stream")
>>> async def stream_endpoint(q: str):
...     async def event_stream():
...         async for event in parser.parse_stream(q):
...             yield f"data: {json.dumps(event)}\n\n"
...     return StreamingResponse(event_stream(), media_type="text/event-stream")
def parse_batch(self, queries: list[str]) -> list[GeoQuery]:
327    def parse_batch(self, queries: list[str]) -> list[GeoQuery]:
328        """
329        Parse multiple queries in batch.
330
331        Note: This is a simple sequential implementation.
332        For true parallelization, consider using async methods or ThreadPoolExecutor.
333
334        Args:
335            queries: List of natural language queries
336
337        Returns:
338            List of GeoQuery objects (same order as input)
339
340        Raises:
341            Same exceptions as parse() for any failing query
342        """
343        return [self.parse(query) for query in queries]

Parse multiple queries in batch.

Note: This is a simple sequential implementation. For true parallelization, consider using async methods or ThreadPoolExecutor.

Arguments:
  • queries: List of natural language queries
Returns:

List of GeoQuery objects (same order as input)

Raises:
  • Same exceptions as parse() for any failing query
def get_available_relations( self, category: Optional[Literal['containment', 'buffer', 'directional']] = None) -> list[str]:
345    def get_available_relations(self, category: RelationCategory | None = None) -> list[str]:
346        """
347        Get list of available spatial relations.
348
349        Args:
350            category: Optional filter by category ("containment", "buffer", "directional")
351
352        Returns:
353            List of relation names
354        """
355        return self.spatial_config.list_relations(category=category)

Get list of available spatial relations.

Arguments:
  • category: Optional filter by category ("containment", "buffer", "directional")
Returns:

List of relation names

def describe_relation(self, relation_name: str) -> str:
357    def describe_relation(self, relation_name: str) -> str:
358        """
359        Get description of a spatial relation.
360
361        Args:
362            relation_name: Name of the relation
363
364        Returns:
365            Human-readable description
366
367        Raises:
368            UnknownRelationError: If relation is not registered
369        """
370        config = self.spatial_config.get_config(relation_name)
371        return config.description

Get description of a spatial relation.

Arguments:
  • relation_name: Name of the relation
Returns:

Human-readable description

Raises:
  • UnknownRelationError: If relation is not registered
class GeoQuery(pydantic.main.BaseModel):
117class GeoQuery(BaseModel):
118    """
119    Root model representing a parsed geographic query.
120    This is the main output structure returned by the parser.
121    """
122
123    query_type: Literal["simple", "compound", "split", "boolean"] = Field(
124        "simple",
125        description="Type of query. Phase 1 only supports 'simple'. "
126        "Future: 'compound' = multi-step, 'split' = area division, 'boolean' = AND/OR/NOT operations",
127    )
128    spatial_relation: SpatialRelation = Field(description="Spatial relationship to reference location")
129    reference_location: ReferenceLocation | None = Field(
130        None,
131        description="Reference location for the spatial query. "
132        "None when the query contains no named geographic location.",
133    )
134    buffer_config: BufferConfig | None = Field(
135        None,
136        description="Buffer configuration for buffer and directional relations. "
137        "Auto-generated with defaults by enrich_with_defaults() if not provided. "
138        "Required for 'near', 'around', 'north_of', etc. "
139        "Set to None for containment relations ('in').",
140    )
141    confidence_breakdown: ConfidenceScore = Field(description="Confidence scores for different aspects of the parse")
142    original_query: str = Field(description="Original query text exactly as provided by the user")
143
144    @model_validator(mode="after")
145    def validate_buffer_config_consistency(self) -> "GeoQuery":
146        """Validate buffer_config consistency with relation category."""
147        # Buffer and directional relations must have buffer_config
148        if self.spatial_relation.category in ("buffer", "directional") and self.buffer_config is None:
149            raise ValueError(
150                f"{self.spatial_relation.category} relation '{self.spatial_relation.relation}' requires buffer_config"
151            )
152
153        # Containment relations should not have buffer_config
154        if self.spatial_relation.category == "containment" and self.buffer_config is not None:
155            raise ValueError(
156                f"{self.spatial_relation.category} relation '{self.spatial_relation.relation}' "
157                f"should not have buffer_config"
158            )
159
160        return self

Root model representing a parsed geographic query. This is the main output structure returned by the parser.

query_type: Literal['simple', 'compound', 'split', 'boolean'] = 'simple'

Type of query. Phase 1 only supports 'simple'. Future: 'compound' = multi-step, 'split' = area division, 'boolean' = AND/OR/NOT operations

spatial_relation: SpatialRelation = PydanticUndefined

Spatial relationship to reference location

reference_location: ReferenceLocation | None = None

Reference location for the spatial query. None when the query contains no named geographic location.

buffer_config: BufferConfig | None = None

Buffer configuration for buffer and directional relations. Auto-generated with defaults by enrich_with_defaults() if not provided. Required for 'near', 'around', 'north_of', etc. Set to None for containment relations ('in').

confidence_breakdown: ConfidenceScore = PydanticUndefined

Confidence scores for different aspects of the parse

original_query: str = PydanticUndefined

Original query text exactly as provided by the user

@model_validator(mode='after')
def validate_buffer_config_consistency(self) -> GeoQuery:
144    @model_validator(mode="after")
145    def validate_buffer_config_consistency(self) -> "GeoQuery":
146        """Validate buffer_config consistency with relation category."""
147        # Buffer and directional relations must have buffer_config
148        if self.spatial_relation.category in ("buffer", "directional") and self.buffer_config is None:
149            raise ValueError(
150                f"{self.spatial_relation.category} relation '{self.spatial_relation.relation}' requires buffer_config"
151            )
152
153        # Containment relations should not have buffer_config
154        if self.spatial_relation.category == "containment" and self.buffer_config is not None:
155            raise ValueError(
156                f"{self.spatial_relation.category} relation '{self.spatial_relation.relation}' "
157                f"should not have buffer_config"
158            )
159
160        return self

Validate buffer_config consistency with relation category.

class SpatialRelation(pydantic.main.BaseModel):
 96class SpatialRelation(BaseModel):
 97    """A spatial relationship between target and reference."""
 98
 99    relation: str = Field(
100        description="Spatial relation keyword. Examples: 'in', 'near', 'around', 'north_of', "
101        "'on_shores_of', 'in_the_heart_of', etc. Use the exact relation name from the available list."
102    )
103    category: RelationCategory = Field(
104        description="Category of spatial relation. "
105        "'containment' = exact boundary matching (in), "
106        "'buffer' = proximity or erosion operations (near, around, on_shores_of, in_the_heart_of), "
107        "'directional' = sector-based queries (north_of, south_of, east_of, west_of)"
108    )
109    explicit_distance: float | None = Field(
110        None,
111        description="Distance in meters if explicitly mentioned by user. "
112        "For example: 'within 5km' → 5000, 'within 500 meters' → 500. "
113        "Leave null if not explicitly stated.",
114    )

A spatial relationship between target and reference.

relation: str = PydanticUndefined

Spatial relation keyword. Examples: 'in', 'near', 'around', 'north_of', 'on_shores_of', 'in_the_heart_of', etc. Use the exact relation name from the available list.

category: Literal['containment', 'buffer', 'directional'] = PydanticUndefined

Category of spatial relation. 'containment' = exact boundary matching (in), 'buffer' = proximity or erosion operations (near, around, on_shores_of, in_the_heart_of), 'directional' = sector-based queries (north_of, south_of, east_of, west_of)

explicit_distance: float | None = None

Distance in meters if explicitly mentioned by user. For example: 'within 5km' → 5000, 'within 500 meters' → 500. Leave null if not explicitly stated.

class ReferenceLocation(pydantic.main.BaseModel):
36class ReferenceLocation(BaseModel):
37    """A geographic reference location extracted from the query."""
38
39    name: str = Field(description="Location name as mentioned in the query (e.g., 'Lausanne', 'Lake Geneva')")
40    # FIXME: enum ?
41    type: str | None = Field(
42        None,
43        description="Type hint for geographic feature (city, lake, mountain, canton, country, "
44        "train_station, airport, river, road, etc.). This is a HINT for ranking results, "
45        "NOT a strict filter. For ambiguous cases (e.g., 'Bern' could be city or canton, "
46        "'Rhone' could be river or road), provide your best guess or leave null. "
47        "The datasource will return multiple types ranked by relevance.",
48    )
49    type_confidence: ConfidenceLevel | None = Field(
50        None,
51        description="Confidence in the type inference (0-1). High confidence (>0.8) when type is "
52        "explicit in query (e.g., 'Lake Geneva'). Low confidence (<0.6) when ambiguous "
53        "(e.g., 'Bern', 'Rhone'). Use spatial relation as hint: 'along X' → river/road, "
54        "'in X' → city/region, 'on X' → lake/mountain.",
55    )

A geographic reference location extracted from the query.

name: str = PydanticUndefined

Location name as mentioned in the query (e.g., 'Lausanne', 'Lake Geneva')

type: str | None = None

Type hint for geographic feature (city, lake, mountain, canton, country, train_station, airport, river, road, etc.). This is a HINT for ranking results, NOT a strict filter. For ambiguous cases (e.g., 'Bern' could be city or canton, 'Rhone' could be river or road), provide your best guess or leave null. The datasource will return multiple types ranked by relevance.

type_confidence: Optional[Annotated[float, FieldInfo(annotation=NoneType, required=True, description='Confidence score between 0 and 1', metadata=[Ge(ge=0.0), Le(le=1.0)])]] = None

Confidence in the type inference (0-1). High confidence (>0.8) when type is explicit in query (e.g., 'Lake Geneva'). Low confidence (<0.6) when ambiguous (e.g., 'Bern', 'Rhone'). Use spatial relation as hint: 'along X' → river/road, 'in X' → city/region, 'on X' → lake/mountain.

class BufferConfig(pydantic.main.BaseModel):
58class BufferConfig(BaseModel):
59    """Configuration for buffer-based spatial operations."""
60
61    distance_m: float = Field(
62        description="Buffer distance in meters. Positive values expand outward (proximity), "
63        "negative values erode inward (e.g., 'in the heart of'). "
64        "Examples: 5000 = 5km radius, -500 = 500m erosion"
65    )
66    buffer_from: Literal["center", "boundary"] = Field(
67        description="Buffer origin. 'center' = buffer from centroid point (for proximity), "
68        "'boundary' = buffer from polygon boundary (for shores, along roads, erosion)"
69    )
70    ring_only: bool = Field(
71        False,
72        description="If True, exclude the reference feature itself to create a ring/donut shape. "
73        "Used for queries like 'on the shores of Lake X' (exclude the lake water itself). "
74        "Only valid with buffer_from='boundary'.",
75    )
76    side: Literal["left", "right"] | None = Field(
77        None,
78        description="Side of a linear feature for one-sided buffer. "
79        "'left' = left side relative to line direction, 'right' = right side. "
80        "None = both sides (symmetric buffer). Populated from relation config by enrich_with_defaults().",
81    )
82    inferred: bool = Field(
83        True,
84        description="True if this configuration was inferred from relation defaults. "
85        "False if the user explicitly specified distance or buffer parameters.",
86    )
87
88    @model_validator(mode="after")
89    def validate_ring_only(self) -> "BufferConfig":
90        """Validate that ring_only is only used with boundary buffers."""
91        if self.ring_only and self.buffer_from == "center":
92            raise ValueError("ring_only=True requires buffer_from='boundary' (cannot create ring from center point)")
93        return self

Configuration for buffer-based spatial operations.

distance_m: float = PydanticUndefined

Buffer distance in meters. Positive values expand outward (proximity), negative values erode inward (e.g., 'in the heart of'). Examples: 5000 = 5km radius, -500 = 500m erosion

buffer_from: Literal['center', 'boundary'] = PydanticUndefined

Buffer origin. 'center' = buffer from centroid point (for proximity), 'boundary' = buffer from polygon boundary (for shores, along roads, erosion)

ring_only: bool = False

If True, exclude the reference feature itself to create a ring/donut shape. Used for queries like 'on the shores of Lake X' (exclude the lake water itself). Only valid with buffer_from='boundary'.

side: Optional[Literal['left', 'right']] = None

Side of a linear feature for one-sided buffer. 'left' = left side relative to line direction, 'right' = right side. None = both sides (symmetric buffer). Populated from relation config by enrich_with_defaults().

inferred: bool = True

True if this configuration was inferred from relation defaults. False if the user explicitly specified distance or buffer parameters.

@model_validator(mode='after')
def validate_ring_only(self) -> BufferConfig:
88    @model_validator(mode="after")
89    def validate_ring_only(self) -> "BufferConfig":
90        """Validate that ring_only is only used with boundary buffers."""
91        if self.ring_only and self.buffer_from == "center":
92            raise ValueError("ring_only=True requires buffer_from='boundary' (cannot create ring from center point)")
93        return self

Validate that ring_only is only used with boundary buffers.

class ConfidenceScore(pydantic.main.BaseModel):
16class ConfidenceScore(BaseModel):
17    """Confidence scores for different aspects of the parsed query."""
18
19    overall: ConfidenceLevel = Field(
20        description="Overall confidence score for the entire query parse. "
21        "0.9-1.0 = highly confident, 0.7-0.9 = confident, 0.5-0.7 = uncertain, <0.5 = very uncertain",
22    )
23    location_confidence: ConfidenceLevel = Field(
24        description="Confidence in correctly identifying the reference location",
25    )
26    relation_confidence: ConfidenceLevel = Field(
27        description="Confidence in correctly identifying the spatial relation",
28    )
29    reasoning: str | None = Field(
30        None,
31        description="Explanation for confidence scores. Always include reasoning for clarity and debugging. "
32        "For example: 'Ambiguous location name', 'Unclear spatial relationship', 'High confidence in location matching', etc.",
33    )

Confidence scores for different aspects of the parsed query.

overall: Annotated[float, FieldInfo(annotation=NoneType, required=True, description='Confidence score between 0 and 1', metadata=[Ge(ge=0.0), Le(le=1.0)])] = PydanticUndefined

Overall confidence score for the entire query parse. 0.9-1.0 = highly confident, 0.7-0.9 = confident, 0.5-0.7 = uncertain, <0.5 = very uncertain

location_confidence: Annotated[float, FieldInfo(annotation=NoneType, required=True, description='Confidence score between 0 and 1', metadata=[Ge(ge=0.0), Le(le=1.0)])] = PydanticUndefined

Confidence in correctly identifying the reference location

relation_confidence: Annotated[float, FieldInfo(annotation=NoneType, required=True, description='Confidence score between 0 and 1', metadata=[Ge(ge=0.0), Le(le=1.0)])] = PydanticUndefined

Confidence in correctly identifying the spatial relation

reasoning: str | None = None

Explanation for confidence scores. Always include reasoning for clarity and debugging. For example: 'Ambiguous location name', 'Unclear spatial relationship', 'High confidence in location matching', etc.

ConfidenceLevel = typing.Annotated[float, FieldInfo(annotation=NoneType, required=True, description='Confidence score between 0 and 1', metadata=[Ge(ge=0.0), Le(le=1.0)])]
GeometryFormat = typing.Literal['geojson', 'wkt', 'wkb']
class SpatialRelationConfig:
 40class SpatialRelationConfig:
 41    """
 42    Registry and configuration for spatial relations.
 43
 44    Manages built-in and custom spatial relations with their default parameters.
 45    """
 46
 47    def __init__(self):
 48        """Initialize with built-in spatial relations."""
 49        self.relations: dict[str, RelationConfig] = {}
 50        self._initialize_defaults()
 51
 52    def _initialize_defaults(self):
 53        """Register built-in spatial relations from ARCHITECTURE.md."""
 54
 55        # ===== CONTAINMENT RELATIONS =====
 56        self.register_relation(
 57            RelationConfig(
 58                name="in",
 59                category="containment",
 60                description="Feature is within the reference boundary",
 61            )
 62        )
 63
 64        # ===== BUFFER/PROXIMITY RELATIONS =====
 65        self.register_relation(
 66            RelationConfig(
 67                name="near",
 68                category="buffer",
 69                description="Proximity search with default 5km radius",
 70                default_distance_m=5000,
 71                buffer_from="center",
 72            )
 73        )
 74
 75        self.register_relation(
 76            RelationConfig(
 77                name="on_shores_of",
 78                category="buffer",
 79                description="Ring buffer around lake/water boundary, excluding the water body itself",
 80                default_distance_m=1000,
 81                buffer_from="boundary",
 82                ring_only=True,
 83            )
 84        )
 85
 86        self.register_relation(
 87            RelationConfig(
 88                name="along",
 89                category="buffer",
 90                description="Buffer following a linear feature like a river or road",
 91                default_distance_m=500,
 92                buffer_from="boundary",
 93            )
 94        )
 95
 96        self.register_relation(
 97            RelationConfig(
 98                name="left_bank",
 99                category="buffer",
100                description="Left bank of a linear feature (river, road) relative to its direction/flow",
101                default_distance_m=500,
102                buffer_from="boundary",
103                side="left",
104            )
105        )
106
107        self.register_relation(
108            RelationConfig(
109                name="right_bank",
110                category="buffer",
111                description="Right bank of a linear feature (river, road) relative to its direction/flow",
112                default_distance_m=500,
113                buffer_from="boundary",
114                side="right",
115            )
116        )
117
118        self.register_relation(
119            RelationConfig(
120                name="in_the_heart_of",
121                category="buffer",
122                description="Central area excluding periphery (negative buffer - erosion)",
123                default_distance_m=-500,
124                buffer_from="boundary",
125            )
126        )
127
128        # ===== DIRECTIONAL RELATIONS =====
129        # All directional relations use consistent defaults:
130        # - Distance: 10km radius (default_distance_m=10000)
131        # - Sector: 90° angular wedge (sector_angle_degrees=90)
132        # - Origin: Centroid of reference location (buffer_from="center" set in enrich_with_defaults)
133        # These defaults are applied automatically by enrich_with_defaults() for any directional query.
134        # Convention: 0° = North, angles increase clockwise (90° = East, 180° = South, 270° = West)
135        self.register_relation(
136            RelationConfig(
137                name="north_of",
138                category="directional",
139                description="Directional sector north of reference",
140                default_distance_m=10000,
141                sector_angle_degrees=90,
142                direction_angle_degrees=0,
143            )
144        )
145
146        self.register_relation(
147            RelationConfig(
148                name="south_of",
149                category="directional",
150                description="Directional sector south of reference",
151                default_distance_m=10000,
152                sector_angle_degrees=90,
153                direction_angle_degrees=180,
154            )
155        )
156
157        self.register_relation(
158            RelationConfig(
159                name="east_of",
160                category="directional",
161                description="Directional sector east of reference",
162                default_distance_m=10000,
163                sector_angle_degrees=90,
164                direction_angle_degrees=90,
165            )
166        )
167
168        self.register_relation(
169            RelationConfig(
170                name="west_of",
171                category="directional",
172                description="Directional sector west of reference",
173                default_distance_m=10000,
174                sector_angle_degrees=90,
175                direction_angle_degrees=270,
176            )
177        )
178
179        # ===== DIAGONAL DIRECTIONAL RELATIONS =====
180        self.register_relation(
181            RelationConfig(
182                name="northeast_of",
183                category="directional",
184                description="Directional sector northeast of reference",
185                default_distance_m=10000,
186                sector_angle_degrees=90,
187                direction_angle_degrees=45,
188            )
189        )
190
191        self.register_relation(
192            RelationConfig(
193                name="southeast_of",
194                category="directional",
195                description="Directional sector southeast of reference",
196                default_distance_m=10000,
197                sector_angle_degrees=90,
198                direction_angle_degrees=135,
199            )
200        )
201
202        self.register_relation(
203            RelationConfig(
204                name="southwest_of",
205                category="directional",
206                description="Directional sector southwest of reference",
207                default_distance_m=10000,
208                sector_angle_degrees=90,
209                direction_angle_degrees=225,
210            )
211        )
212
213        self.register_relation(
214            RelationConfig(
215                name="northwest_of",
216                category="directional",
217                description="Directional sector northwest of reference",
218                default_distance_m=10000,
219                sector_angle_degrees=90,
220                direction_angle_degrees=315,
221            )
222        )
223
224    def register_relation(self, config: RelationConfig) -> None:
225        """Register a new spatial relation."""
226        self.relations[config.name] = config
227
228    def has_relation(self, name: str) -> bool:
229        """Check if a relation is registered."""
230        return name in self.relations
231
232    def get_config(self, name: str) -> RelationConfig:
233        """Get configuration for a relation. Raises UnknownRelationError if not found."""
234        if not self.has_relation(name):
235            raise UnknownRelationError(
236                f"Unknown spatial relation: '{name}'. Available relations: {', '.join(sorted(self.relations.keys()))}",
237                relation_name=name,
238            )
239        return self.relations[name]
240
241    def list_relations(self, category: RelationCategory | None = None) -> list[str]:
242        """List available relation names."""
243        if category is None:
244            return sorted(self.relations.keys())
245        return sorted(r.name for r in self.relations.values() if r.category == category)
246
247    def format_for_prompt(self) -> str:
248        """Format relations for inclusion in LLM prompt."""
249        lines = []
250
251        # Group by category
252        for category in get_args(RelationCategory):
253            category_relations = [r for r in self.relations.values() if r.category == category]
254            if not category_relations:
255                continue
256
257            lines.append(f"\n{category.upper()} RELATIONS:")
258
259            for rel in sorted(category_relations, key=lambda r: r.name):
260                # Build distance info
261                dist_info = ""
262                if rel.default_distance_m is not None:
263                    dist_str = f"{abs(rel.default_distance_m)}m"
264                    if rel.default_distance_m < 0:
265                        dist_info = f" (default: {dist_str} erosion)"
266                    else:
267                        dist_info = f" (default: {dist_str})"
268
269                # Build special flags
270                flags = []
271                if rel.ring_only:
272                    flags.append("ring buffer")
273                if rel.buffer_from:
274                    flags.append(f"from {rel.buffer_from}")
275                if rel.side:
276                    flags.append(f"{rel.side} side only")
277                flag_info = f" [{', '.join(flags)}]" if flags else ""
278
279                # Format line
280                lines.append(f"  • {rel.name}{dist_info}{flag_info}")
281                lines.append(f"    {rel.description}")
282
283        # Add notes
284        lines.append("\nNOTES:")
285        lines.append("  • Negative distances indicate erosion/shrinking (e.g., in_the_heart_of)")
286        lines.append("  • Ring buffers exclude the reference feature itself (e.g., shores of lake)")
287        lines.append("  • Buffer from 'center' vs 'boundary' determines buffer origin")
288
289        return "\n".join(lines)

Registry and configuration for spatial relations.

Manages built-in and custom spatial relations with their default parameters.

SpatialRelationConfig()
47    def __init__(self):
48        """Initialize with built-in spatial relations."""
49        self.relations: dict[str, RelationConfig] = {}
50        self._initialize_defaults()

Initialize with built-in spatial relations.

relations: dict[str, RelationConfig]
def register_relation(self, config: RelationConfig) -> None:
224    def register_relation(self, config: RelationConfig) -> None:
225        """Register a new spatial relation."""
226        self.relations[config.name] = config

Register a new spatial relation.

def has_relation(self, name: str) -> bool:
228    def has_relation(self, name: str) -> bool:
229        """Check if a relation is registered."""
230        return name in self.relations

Check if a relation is registered.

def get_config(self, name: str) -> RelationConfig:
232    def get_config(self, name: str) -> RelationConfig:
233        """Get configuration for a relation. Raises UnknownRelationError if not found."""
234        if not self.has_relation(name):
235            raise UnknownRelationError(
236                f"Unknown spatial relation: '{name}'. Available relations: {', '.join(sorted(self.relations.keys()))}",
237                relation_name=name,
238            )
239        return self.relations[name]

Get configuration for a relation. Raises UnknownRelationError if not found.

def list_relations( self, category: Optional[Literal['containment', 'buffer', 'directional']] = None) -> list[str]:
241    def list_relations(self, category: RelationCategory | None = None) -> list[str]:
242        """List available relation names."""
243        if category is None:
244            return sorted(self.relations.keys())
245        return sorted(r.name for r in self.relations.values() if r.category == category)

List available relation names.

def format_for_prompt(self) -> str:
247    def format_for_prompt(self) -> str:
248        """Format relations for inclusion in LLM prompt."""
249        lines = []
250
251        # Group by category
252        for category in get_args(RelationCategory):
253            category_relations = [r for r in self.relations.values() if r.category == category]
254            if not category_relations:
255                continue
256
257            lines.append(f"\n{category.upper()} RELATIONS:")
258
259            for rel in sorted(category_relations, key=lambda r: r.name):
260                # Build distance info
261                dist_info = ""
262                if rel.default_distance_m is not None:
263                    dist_str = f"{abs(rel.default_distance_m)}m"
264                    if rel.default_distance_m < 0:
265                        dist_info = f" (default: {dist_str} erosion)"
266                    else:
267                        dist_info = f" (default: {dist_str})"
268
269                # Build special flags
270                flags = []
271                if rel.ring_only:
272                    flags.append("ring buffer")
273                if rel.buffer_from:
274                    flags.append(f"from {rel.buffer_from}")
275                if rel.side:
276                    flags.append(f"{rel.side} side only")
277                flag_info = f" [{', '.join(flags)}]" if flags else ""
278
279                # Format line
280                lines.append(f"  • {rel.name}{dist_info}{flag_info}")
281                lines.append(f"    {rel.description}")
282
283        # Add notes
284        lines.append("\nNOTES:")
285        lines.append("  • Negative distances indicate erosion/shrinking (e.g., in_the_heart_of)")
286        lines.append("  • Ring buffers exclude the reference feature itself (e.g., shores of lake)")
287        lines.append("  • Buffer from 'center' vs 'boundary' determines buffer origin")
288
289        return "\n".join(lines)

Format relations for inclusion in LLM prompt.

@dataclass
class RelationConfig:
13@dataclass
14class RelationConfig:
15    """
16    Configuration for a single spatial relation.
17
18    Attributes:
19        name: Relation identifier (e.g., "in", "near", "north_of")
20        category: Type of spatial operation
21        description: Human-readable description for LLM prompt
22        default_distance_m: Default buffer distance in meters
23        buffer_from: Buffer origin
24        ring_only: Exclude reference feature to create ring buffer
25        sector_angle_degrees: Angular sector for directional queries
26        direction_angle_degrees: Direction angle in degrees (0=North, 90=East, 180=South, 270=West, clockwise)
27    """
28
29    name: str
30    category: RelationCategory
31    description: str
32    default_distance_m: float | None = None
33    buffer_from: Literal["center", "boundary"] | None = None
34    ring_only: bool = False
35    side: Literal["left", "right"] | None = None
36    sector_angle_degrees: float | None = None
37    direction_angle_degrees: float | None = None

Configuration for a single spatial relation.

Attributes:
  • name: Relation identifier (e.g., "in", "near", "north_of")
  • category: Type of spatial operation
  • description: Human-readable description for LLM prompt
  • default_distance_m: Default buffer distance in meters
  • buffer_from: Buffer origin
  • ring_only: Exclude reference feature to create ring buffer
  • side: Side of a linear feature for a one-sided buffer ('left' or 'right'); None = both sides
  • sector_angle_degrees: Angular sector for directional queries
  • direction_angle_degrees: Direction angle in degrees (0=North, 90=East, 180=South, 270=West, clockwise)
RelationConfig( name: str, category: Literal['containment', 'buffer', 'directional'], description: str, default_distance_m: float | None = None, buffer_from: Optional[Literal['center', 'boundary']] = None, ring_only: bool = False, side: Optional[Literal['left', 'right']] = None, sector_angle_degrees: float | None = None, direction_angle_degrees: float | None = None)
name: str
category: Literal['containment', 'buffer', 'directional']
description: str
default_distance_m: float | None = None
buffer_from: Optional[Literal['center', 'boundary']] = None
ring_only: bool = False
side: Optional[Literal['left', 'right']] = None
sector_angle_degrees: float | None = None
direction_angle_degrees: float | None = None
class GeoFilterError(builtins.Exception):
 7class GeoFilterError(Exception):
 8    """Base exception for all GeoFilter errors."""
 9
10    pass

Base exception for all GeoFilter errors.

class ParsingError(etter.GeoFilterError):
13class ParsingError(GeoFilterError):
14    """LLM failed to parse query into valid structure."""
15
16    def __init__(self, message: str, raw_response: str = "", original_error: Exception | None = None):
17        """
18        Initialize parsing error.
19
20        Args:
21            message: Error description
22            raw_response: Raw response from LLM
23            original_error: Original exception that caused parsing failure
24        """
25        self.raw_response = raw_response
26        self.original_error = original_error
27        super().__init__(message)

LLM failed to parse query into valid structure.

ParsingError( message: str, raw_response: str = '', original_error: Exception | None = None)
16    def __init__(self, message: str, raw_response: str = "", original_error: Exception | None = None):
17        """
18        Initialize parsing error.
19
20        Args:
21            message: Error description
22            raw_response: Raw response from LLM
23            original_error: Original exception that caused parsing failure
24        """
25        self.raw_response = raw_response
26        self.original_error = original_error
27        super().__init__(message)

Initialize parsing error.

Arguments:
  • message: Error description
  • raw_response: Raw response from LLM
  • original_error: Original exception that caused parsing failure
raw_response
original_error
class ValidationError(etter.GeoFilterError):
30class ValidationError(GeoFilterError):
31    """Structured output is valid but fails business logic validation."""
32
33    def __init__(self, message: str, field: str | None = None, detail: str | None = None):
34        """
35        Initialize validation error.
36
37        Args:
38            message: Error description
39            field: Field name that failed validation
40            detail: Additional detail about the validation failure
41        """
42        self.field = field
43        self.detail = detail
44        super().__init__(message)

Structured output is valid but fails business logic validation.

ValidationError(message: str, field: str | None = None, detail: str | None = None)
33    def __init__(self, message: str, field: str | None = None, detail: str | None = None):
34        """
35        Initialize validation error.
36
37        Args:
38            message: Error description
39            field: Field name that failed validation
40            detail: Additional detail about the validation failure
41        """
42        self.field = field
43        self.detail = detail
44        super().__init__(message)

Initialize validation error.

Arguments:
  • message: Error description
  • field: Field name that failed validation
  • detail: Additional detail about the validation failure
field
detail
class NoReferenceLocationError(etter.ValidationError):
47class NoReferenceLocationError(ValidationError):
48    """Query contains no named geographic reference location."""
49
50    def __init__(self, message: str):
51        super().__init__(message, field="reference_location")

Query contains no named geographic reference location.

NoReferenceLocationError(message: str)
50    def __init__(self, message: str):
51        super().__init__(message, field="reference_location")

Initialize the validation error with *field* preset to "reference_location".

Arguments:
  • message: Error description
class UnknownRelationError(etter.ValidationError):
54class UnknownRelationError(ValidationError):
55    """Spatial relation is not registered in configuration."""
56
57    def __init__(self, message: str, relation_name: str):
58        """
59        Initialize unknown relation error.
60
61        Args:
62            message: Error description
63            relation_name: The unknown relation name
64        """
65        self.relation_name = relation_name
66        super().__init__(message, field="spatial_relation")

Spatial relation is not registered in configuration.

UnknownRelationError(message: str, relation_name: str)
57    def __init__(self, message: str, relation_name: str):
58        """
59        Initialize unknown relation error.
60
61        Args:
62            message: Error description
63            relation_name: The unknown relation name
64        """
65        self.relation_name = relation_name
66        super().__init__(message, field="spatial_relation")

Initialize unknown relation error.

Arguments:
  • message: Error description
  • relation_name: The unknown relation name
relation_name
class LowConfidenceError(etter.GeoFilterError):
69class LowConfidenceError(GeoFilterError):
70    """Query confidence is below threshold (strict mode)."""
71
72    def __init__(self, message: str, confidence: float, reasoning: str | None = None):
73        """
74        Initialize low confidence error.
75
76        Args:
77            message: Error description
78            confidence: Confidence score (0-1)
79            reasoning: Optional explanation for low confidence
80        """
81        self.confidence = confidence
82        self.reasoning = reasoning
83        super().__init__(message)

Query confidence is below threshold (strict mode).

LowConfidenceError(message: str, confidence: float, reasoning: str | None = None)
72    def __init__(self, message: str, confidence: float, reasoning: str | None = None):
73        """
74        Initialize low confidence error.
75
76        Args:
77            message: Error description
78            confidence: Confidence score (0-1)
79            reasoning: Optional explanation for low confidence
80        """
81        self.confidence = confidence
82        self.reasoning = reasoning
83        super().__init__(message)

Initialize low confidence error.

Arguments:
  • message: Error description
  • confidence: Confidence score (0-1)
  • reasoning: Optional explanation for low confidence
confidence
reasoning
class LowConfidenceWarning(builtins.UserWarning):
86class LowConfidenceWarning(UserWarning):
87    """Query confidence is below threshold (permissive mode)."""
88
89    def __init__(self, confidence: float, message: str = ""):
90        """
91        Initialize low confidence warning.
92
93        Args:
94            confidence: Confidence score (0-1)
95            message: Warning message
96        """
97        self.confidence = confidence
98        super().__init__(message)

Query confidence is below threshold (permissive mode).

LowConfidenceWarning(confidence: float, message: str = '')
89    def __init__(self, confidence: float, message: str = ""):
90        """
91        Initialize low confidence warning.
92
93        Args:
94            confidence: Confidence score (0-1)
95            message: Warning message
96        """
97        self.confidence = confidence
98        super().__init__(message)

Initialize low confidence warning.

Arguments:
  • confidence: Confidence score (0-1)
  • message: Warning message
confidence
class GeoDataSource(typing.Protocol):
12class GeoDataSource(Protocol):
13    """
14    Protocol for geographic data sources.
15
16    Implementations resolve location names to geographic features.
17    Features are returned as standard GeoJSON Feature objects (dicts) in WGS84 (EPSG:4326).
18
19    Example of returned feature:
20        {
21            "type": "Feature",
22            "id": "uuid-123",
23            "geometry": {"type": "Point", "coordinates": [8.5, 47.3]},
24            "bbox": [8.4, 47.3, 8.6, 47.4],
25            "properties": {
26                "name": "Zürich",
27                "type": "city",
28                "confidence": 1.0,
29                ...
30            }
31        }
32    """
33
34    def search(
35        self,
36        name: str,
37        type: str | None = None,
38        max_results: int = 10,
39    ) -> list[dict[str, Any]]:
40        """
41        Search for geographic features by name.
42
43        Args:
44            name: Location name to search for (e.g., "Lake Geneva", "Bern").
45            type: Optional type hint for filtering/ranking results.
46                  Examples: "lake", "city", "mountain", "canton", "river".
47                  When provided, matching types are ranked higher.
48            max_results: Maximum number of results to return.
49
50        Returns:
51            List of matching GeoJSON Feature dicts, ranked by relevance.
52            Returns empty list if no matches found.
53        """
54        ...
55
56    def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
57        """
58        Get a specific feature by its unique identifier.
59
60        Args:
61            feature_id: Unique identifier from the data source.
62
63        Returns:
64            The matching GeoJSON Feature dict, or None if not found.
65        """
66        ...
67
68    def get_available_types(self) -> list[str]:
69        """
70        Get list of concrete geographic types this datasource can return.
71
72        Returns a list of concrete type values (e.g., "lake", "city", "restaurant")
73        that this datasource uses in the "type" property of returned features.
74        These types can be matched against the location type hierarchy for fuzzy matching.
75
76        The returned types should be a subset of or mapped to the standard location
77        type hierarchy defined in location_types.TYPE_HIERARCHY.
78
79        Returns:
80            List of concrete type strings (e.g., ["lake", "river", "city", "mountain"]).
81            Empty list if this datasource does not provide type information.
82
83        Example:
84            >>> source = SwissNames3DSource("data/")
85            >>> types = source.get_available_types()
86            >>> print(types)
87            ['lake', 'river', 'city', 'mountain', 'peak', 'hill', ...]
88        """
89        ...

Protocol for geographic data sources.

Implementations resolve location names to geographic features. Features are returned as standard GeoJSON Feature objects (dicts) in WGS84 (EPSG:4326).

Example of returned feature:

{ "type": "Feature", "id": "uuid-123", "geometry": {"type": "Point", "coordinates": [8.5, 47.3]}, "bbox": [8.4, 47.3, 8.6, 47.4], "properties": { "name": "Zürich", "type": "city", "confidence": 1.0, ... } }

GeoDataSource(*args, **kwargs)
1960def _no_init_or_replace_init(self, *args, **kwargs):
1961    cls = type(self)
1962
1963    if cls._is_protocol:
1964        raise TypeError('Protocols cannot be instantiated')
1965
1966    # Already using a custom `__init__`. No need to calculate correct
1967    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
1968    if cls.__init__ is not _no_init_or_replace_init:
1969        return
1970
1971    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
1972    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
1973    # searches for a proper new `__init__` in the MRO. The new `__init__`
1974    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
1975    # instantiation of the protocol subclass will thus use the new
1976    # `__init__` and no longer call `_no_init_or_replace_init`.
1977    for base in cls.__mro__:
1978        init = base.__dict__.get('__init__', _no_init_or_replace_init)
1979        if init is not _no_init_or_replace_init:
1980            cls.__init__ = init
1981            break
1982    else:
1983        # should not happen
1984        cls.__init__ = object.__init__
1985
1986    cls.__init__(self, *args, **kwargs)
def search( self, name: str, type: str | None = None, max_results: int = 10) -> list[dict[str, typing.Any]]:
34    def search(
35        self,
36        name: str,
37        type: str | None = None,
38        max_results: int = 10,
39    ) -> list[dict[str, Any]]:
40        """
41        Search for geographic features by name.
42
43        Args:
44            name: Location name to search for (e.g., "Lake Geneva", "Bern").
45            type: Optional type hint for filtering/ranking results.
46                  Examples: "lake", "city", "mountain", "canton", "river".
47                  When provided, matching types are ranked higher.
48            max_results: Maximum number of results to return.
49
50        Returns:
51            List of matching GeoJSON Feature dicts, ranked by relevance.
52            Returns empty list if no matches found.
53        """
54        ...

Search for geographic features by name.

Arguments:
  • name: Location name to search for (e.g., "Lake Geneva", "Bern").
  • type: Optional type hint for filtering/ranking results. Examples: "lake", "city", "mountain", "canton", "river". When provided, matching types are ranked higher.
  • max_results: Maximum number of results to return.
Returns:

List of matching GeoJSON Feature dicts, ranked by relevance. Returns empty list if no matches found.

def get_by_id(self, feature_id: str) -> dict[str, typing.Any] | None:
56    def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
57        """
58        Get a specific feature by its unique identifier.
59
60        Args:
61            feature_id: Unique identifier from the data source.
62
63        Returns:
64            The matching GeoJSON Feature dict, or None if not found.
65        """
66        ...

Get a specific feature by its unique identifier.

Arguments:
  • feature_id: Unique identifier from the data source.
Returns:

The matching GeoJSON Feature dict, or None if not found.

def get_available_types(self) -> list[str]:
68    def get_available_types(self) -> list[str]:
69        """
70        Get list of concrete geographic types this datasource can return.
71
72        Returns a list of concrete type values (e.g., "lake", "city", "restaurant")
73        that this datasource uses in the "type" property of returned features.
74        These types can be matched against the location type hierarchy for fuzzy matching.
75
76        The returned types should be a subset of or mapped to the standard location
77        type hierarchy defined in location_types.TYPE_HIERARCHY.
78
79        Returns:
80            List of concrete type strings (e.g., ["lake", "river", "city", "mountain"]).
81            Empty list if this datasource does not provide type information.
82
83        Example:
84            >>> source = SwissNames3DSource("data/")
85            >>> types = source.get_available_types()
86            >>> print(types)
87            ['lake', 'river', 'city', 'mountain', 'peak', 'hill', ...]
88        """
89        ...

Get list of concrete geographic types this datasource can return.

Returns a list of concrete type values (e.g., "lake", "city", "restaurant") that this datasource uses in the "type" property of returned features. These types can be matched against the location type hierarchy for fuzzy matching.

The returned types should be a subset of or mapped to the standard location type hierarchy defined in location_types.TYPE_HIERARCHY.

Returns:

List of concrete type strings (e.g., ["lake", "river", "city", "mountain"]). Empty list if this datasource does not provide type information.

Example:
>>> source = SwissNames3DSource("data/")
>>> types = source.get_available_types()
>>> print(types)
['lake', 'river', 'city', 'mountain', 'peak', 'hill', ...]
class SwissNames3DSource:
162class SwissNames3DSource:
163    """
164    Geographic data source backed by swisstopo's swissNAMES3D dataset.
165
166    Loads geographic names from a Shapefile, GeoPackage, or ESRI File Geodatabase
167    and provides search by name with optional type filtering.
168
169    If data_path is a directory, automatically loads and concatenates all SwissNames3D
170    shapefiles (swissNAMES3D_PKT, swissNAMES3D_LIN, swissNAMES3D_PLY) found within.
171
172    All geometries are returned as GeoJSON in WGS84 (EPSG:4326).
173
174    Args:
175        data_path: Path to SwissNames3D data file or directory containing SwissNames3D shapefiles.
176        layer: Layer name within the data source (for multi-layer formats like GDB).
177
178    Example:
179        >>> source = SwissNames3DSource("data/")  # Load all 3 geometry types
180        >>> results = source.search("Lac Léman", type="lake")
181        >>> print(results[0]["geometry"])  # GeoJSON in WGS84
182    """
183
184    def __init__(self, data_path: str | Path, layer: str | None = None) -> None:
185        self._data_path = Path(data_path)
186        self._layer = layer
187        self._gdf: gpd.GeoDataFrame | None = None
188        self._name_index: dict[str, list[int]] = {}
189
190    def _ensure_loaded(self) -> None:
191        """Load data lazily on first access."""
192        if self._gdf is not None:
193            return
194        self._load_data()
195
196    def _load_data(self) -> None:
197        """Load SwissNames3D data and build the name index."""
198        # Check if data_path is a directory
199        if self._data_path.is_dir():
200            self._load_from_directory()
201        else:
202            # Load single file
203            kwargs: dict[str, Any] = {}
204            if self._layer is not None:
205                kwargs["layer"] = self._layer
206            self._gdf = gpd.read_file(str(self._data_path), **kwargs)
207
208        self._build_name_index()
209
210    def _load_from_directory(self) -> None:
211        """Load and concatenate all SwissNames3D shapefiles from a directory."""
212        # Look for the 3 standard SwissNames3D shapefiles
213        shapefile_names = ["swissNAMES3D_PKT", "swissNAMES3D_LIN", "swissNAMES3D_PLY"]
214        gdfs: list[gpd.GeoDataFrame] = []
215
216        for name in shapefile_names:
217            shp_path = self._data_path / f"{name}.shp"
218            if shp_path.exists():
219                gdf = gpd.read_file(str(shp_path))
220                gdfs.append(gdf)
221
222        if not gdfs:
223            raise ValueError(
224                f"No SwissNames3D shapefiles found in {self._data_path}. Expected: {', '.join(shapefile_names)}"
225            )
226
227        # Find common columns across all loaded GeoDataFrames
228        common_cols = set(gdfs[0].columns)
229        for gdf in gdfs[1:]:
230            common_cols &= set(gdf.columns)
231
232        # Keep only common columns and concatenate
233        gdfs_filtered = [gdf[sorted(common_cols)] for gdf in gdfs]
234        self._gdf = gpd.GeoDataFrame(
235            gpd.pd.concat(gdfs_filtered, ignore_index=True), crs=gdfs[0].crs, geometry="geometry"
236        )
237
238    def _build_name_index(self) -> None:
239        """Build a normalized name → row indices lookup for fast search."""
240        assert self._gdf is not None
241        self._name_index = {}
242
243        name_col = self._detect_name_column()
244        for idx, name in enumerate(self._gdf[name_col]):
245            if not isinstance(name, str) or not name.strip():
246                continue
247            normalized = _normalize_name(name)
248            if normalized not in self._name_index:
249                self._name_index[normalized] = []
250            self._name_index[normalized].append(idx)
251
252    def _detect_name_column(self) -> str:
253        """Detect the name column in the data."""
254        assert self._gdf is not None
255        for candidate in ("NAME", "name", "Name", "BEZEICHNUNG"):
256            if candidate in self._gdf.columns:
257                return candidate
258        raise ValueError(f"Cannot find name column in data. Available columns: {list(self._gdf.columns)}")
259
260    def _detect_type_column(self) -> str | None:
261        """Detect the feature type column in the data."""
262        assert self._gdf is not None
263        for candidate in ("OBJEKTART", "objektart", "Objektart"):
264            if candidate in self._gdf.columns:
265                return candidate
266        return None
267
268    def _detect_id_column(self) -> str | None:
269        """Detect the unique ID column in the data."""
270        assert self._gdf is not None
271        for candidate in ("UUID", "uuid", "FID", "OBJECTID", "id"):
272            if candidate in self._gdf.columns:
273                return candidate
274        return None
275
276    def _row_to_feature(self, idx: int) -> dict[str, Any]:
277        """Convert a GeoDataFrame row to a GeoJSON Feature dict with WGS84 geometry."""
278        assert self._gdf is not None
279        row = self._gdf.iloc[idx]
280
281        # Get name
282        name_col = self._detect_name_column()
283        name = str(row[name_col])
284
285        # Get type
286        type_col = self._detect_type_column()
287        raw_type = str(row[type_col]) if type_col and row.get(type_col) else "unknown"
288        normalized_type = _objektart_to_type(raw_type)
289
290        # Get ID
291        id_col = self._detect_id_column()
292        feature_id = str(row[id_col]) if id_col and row.get(id_col) else str(idx)
293
294        # Convert geometry to WGS84 GeoJSON
295        geom = row.geometry
296        if geom is None or geom.is_empty:
297            geometry = {"type": "Point", "coordinates": [0, 0]}
298            bbox = None
299        else:
300            # Transform geometry from EPSG:2056 to WGS84 using the module-level transformer
301            # Drop Z coordinates — they are not needed and cause issues with single_sided buffers
302            wgs84_geom = shapely_transform(_TRANSFORMER.transform, force_2d(geom))
303            geometry = mapping(wgs84_geom)
304            bounds = wgs84_geom.bounds  # (minx, miny, maxx, maxy)
305            bbox = (bounds[0], bounds[1], bounds[2], bounds[3])
306
307        # Collect extra properties
308        skip_cols = {name_col, "geometry"}
309        if type_col:
310            skip_cols.add(type_col)
311        if id_col:
312            skip_cols.add(id_col)
313
314        properties: dict[str, Any] = {
315            "name": name,
316            "type": normalized_type,
317            "confidence": 1.0,
318        }
319        for col in self._gdf.columns:
320            if col not in skip_cols:
321                val = row.get(col)
322                if val is not None and str(val) != "nan":
323                    properties[col] = val
324
325        return {
326            "type": "Feature",
327            "id": feature_id,
328            "geometry": geometry,
329            "bbox": bbox,
330            "properties": properties,
331        }
332
333    def search(
334        self,
335        name: str,
336        type: str | None = None,
337        max_results: int = 10,
338    ) -> list[dict[str, Any]]:
339        """
340        Search for geographic features by name.
341
342        Uses case-insensitive, accent-normalized matching with fuzzy fallback.
343        First tries exact matching, then falls back to fuzzy matching if no exact
344        matches found.
345
346        Args:
347            name: Location name to search for.
348            type: Optional type hint to filter results. If provided, only features
349                  of this type are returned.
350            max_results: Maximum number of results to return.
351
352        Returns:
353            List of matching GeoJSON Feature dicts. If type is provided, only
354            features of that type are returned. Empty list if no matches found.
355        """
356        self._ensure_loaded()
357
358        normalized = _normalize_name(name)
359        indices = self._name_index.get(normalized, [])
360
361        # If no exact match, try fuzzy matching
362        if not indices:
363            indices = self._fuzzy_search(normalized)
364
365        features = [self._row_to_feature(idx) for idx in indices]
366
367        # Filter by type if type hint provided.
368        # Expand via the type hierarchy so that category hints (e.g. "water") match
369        # all concrete types within that category ("lake", "river", "pond", ...).
370        if type is not None:
371            matching_types = get_matching_types(type)
372            if matching_types:
373                features = [f for f in features if f["properties"].get("type") in matching_types]
374            else:
375                # Unknown type hint, fall back to exact string match
376                features = [f for f in features if f["properties"].get("type") == type.lower()]
377
378        return features[:max_results]
379
380    def _fuzzy_search(self, normalized: str, threshold: float = 75.0) -> list[int]:
381        """
382        Fuzzy search for names that partially match the search query.
383
384        Uses token matching to find results where at least one token from the
385        query matches a token in the indexed name. This handles cases like:
386        - "venoge" matching "la venoge"
387        - "rhone" matching "rhone valais"
388
389        Args:
390            normalized: The normalized search query.
391            threshold: Minimum fuzzy match score (0-100) to include a result.
392
393        Returns:
394            List of row indices for fuzzy-matched names, sorted by score (descending).
395        """
396        matches: list[tuple[int, float]] = []
397        query_tokens = set(normalized.split())
398
399        for indexed_name, indices in self._name_index.items():
400            indexed_tokens = set(indexed_name.split())
401
402            # Check if any query token matches any indexed token
403            token_overlap = query_tokens & indexed_tokens
404
405            if token_overlap:
406                # Also use token_set_ratio for better matching of partial strings
407                score = fuzz.token_set_ratio(normalized, indexed_name)
408                if score >= threshold:
409                    for idx in indices:
410                        matches.append((idx, score))
411
412        # Sort by score (descending) to return best matches first
413        matches.sort(key=lambda x: x[1], reverse=True)
414        return [idx for idx, _ in matches]
415
416    def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
417        """
418        Get a specific feature by its unique identifier.
419
420        Args:
421            feature_id: Unique identifier (UUID or row index).
422
423        Returns:
424            The matching GeoJSON Feature dict, or None if not found.
425        """
426        self._ensure_loaded()
427        assert self._gdf is not None
428
429        id_col = self._detect_id_column()
430        if id_col:
431            matches = self._gdf[self._gdf[id_col].astype(str) == feature_id]
432            if not matches.empty:
433                return self._row_to_feature(matches.index[0])
434
435        # Fallback: try as row index
436        try:
437            idx = int(feature_id)
438            if 0 <= idx < len(self._gdf):
439                return self._row_to_feature(idx)
440        except ValueError:
441            pass
442
443        return None
444
445    def get_available_types(self) -> list[str]:
446        """
447        Get list of concrete geographic types this datasource can return.
448
449        Returns all normalized types from the OBJEKTART_TYPE_MAP keys,
450        representing all possible types that SwissNames3D data can be classified as.
451
452        Returns:
453            Sorted list of type strings (e.g., ["lake", "city", "river", ...])
454        """
455        return sorted(OBJEKTART_TYPE_MAP.keys())

Geographic data source backed by swisstopo's swissNAMES3D dataset.

Loads geographic names from a Shapefile, GeoPackage, or ESRI File Geodatabase and provides search by name with optional type filtering.

If data_path is a directory, automatically loads and concatenates all SwissNames3D shapefiles (swissNAMES3D_PKT, swissNAMES3D_LIN, swissNAMES3D_PLY) found within.

All geometries are returned as GeoJSON in WGS84 (EPSG:4326).

Arguments:
  • data_path: Path to SwissNames3D data file or directory containing SwissNames3D shapefiles.
  • layer: Layer name within the data source (for multi-layer formats like GDB).
Example:
>>> source = SwissNames3DSource("data/")  # Load all 3 geometry types
>>> results = source.search("Lac Léman", type="lake")
>>> print(results[0]["geometry"])  # GeoJSON in WGS84
SwissNames3DSource(data_path: str | pathlib.Path, layer: str | None = None)
184    def __init__(self, data_path: str | Path, layer: str | None = None) -> None:
185        self._data_path = Path(data_path)
186        self._layer = layer
187        self._gdf: gpd.GeoDataFrame | None = None
188        self._name_index: dict[str, list[int]] = {}
def search( self, name: str, type: str | None = None, max_results: int = 10) -> list[dict[str, typing.Any]]:
333    def search(
334        self,
335        name: str,
336        type: str | None = None,
337        max_results: int = 10,
338    ) -> list[dict[str, Any]]:
339        """
340        Search for geographic features by name.
341
342        Uses case-insensitive, accent-normalized matching with fuzzy fallback.
343        First tries exact matching, then falls back to fuzzy matching if no exact
344        matches found.
345
346        Args:
347            name: Location name to search for.
348            type: Optional type hint to filter results. If provided, only features
349                  of this type are returned.
350            max_results: Maximum number of results to return.
351
352        Returns:
353            List of matching GeoJSON Feature dicts. If type is provided, only
354            features of that type are returned. Empty list if no matches found.
355        """
356        self._ensure_loaded()
357
358        normalized = _normalize_name(name)
359        indices = self._name_index.get(normalized, [])
360
361        # If no exact match, try fuzzy matching
362        if not indices:
363            indices = self._fuzzy_search(normalized)
364
365        features = [self._row_to_feature(idx) for idx in indices]
366
367        # Filter by type if type hint provided.
368        # Expand via the type hierarchy so that category hints (e.g. "water") match
369        # all concrete types within that category ("lake", "river", "pond", ...).
370        if type is not None:
371            matching_types = get_matching_types(type)
372            if matching_types:
373                features = [f for f in features if f["properties"].get("type") in matching_types]
374            else:
375                # Unknown type hint, fall back to exact string match
376                features = [f for f in features if f["properties"].get("type") == type.lower()]
377
378        return features[:max_results]

Search for geographic features by name.

Uses case-insensitive, accent-normalized matching with fuzzy fallback. First tries exact matching, then falls back to fuzzy matching if no exact matches found.

Arguments:
  • name: Location name to search for.
  • type: Optional type hint to filter results. Category hints (e.g. "water") are expanded via the type hierarchy to match their concrete types (e.g. "lake", "river"); unknown hints fall back to an exact string match.
  • max_results: Maximum number of results to return.
Returns:

List of matching GeoJSON Feature dicts. If type is provided, only features of that type are returned. Empty list if no matches found.

def get_by_id(self, feature_id: str) -> dict[str, typing.Any] | None:
416    def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
417        """
418        Get a specific feature by its unique identifier.
419
420        Args:
421            feature_id: Unique identifier (UUID or row index).
422
423        Returns:
424            The matching GeoJSON Feature dict, or None if not found.
425        """
426        self._ensure_loaded()
427        assert self._gdf is not None
428
429        id_col = self._detect_id_column()
430        if id_col:
431            matches = self._gdf[self._gdf[id_col].astype(str) == feature_id]
432            if not matches.empty:
433                return self._row_to_feature(matches.index[0])
434
435        # Fallback: try as row index
436        try:
437            idx = int(feature_id)
438            if 0 <= idx < len(self._gdf):
439                return self._row_to_feature(idx)
440        except ValueError:
441            pass
442
443        return None

Get a specific feature by its unique identifier.

Arguments:
  • feature_id: Unique identifier (UUID or row index).
Returns:

The matching GeoJSON Feature dict, or None if not found.

def get_available_types(self) -> list[str]:
445    def get_available_types(self) -> list[str]:
446        """
447        Get list of concrete geographic types this datasource can return.
448
449        Returns all normalized types from the OBJEKTART_TYPE_MAP keys,
450        representing all possible types that SwissNames3D data can be classified as.
451
452        Returns:
453            Sorted list of type strings (e.g., ["lake", "city", "river", ...])
454        """
455        return sorted(OBJEKTART_TYPE_MAP.keys())

Get list of concrete geographic types this datasource can return.

Returns all normalized types from the OBJEKTART_TYPE_MAP keys, representing all possible types that SwissNames3D data can be classified as.

Returns:

Sorted list of type strings (e.g., ["lake", "city", "river", ...])

class IGNBDCartoSource:
class IGNBDCartoSource:
    """
    Geographic data source backed by IGN's BD-CARTO 5.0 dataset.

    Loads French geographic data from GeoPackage files extracted to a directory.
    Supports administrative boundaries (communes, departments, regions, …),
    hydrography (rivers, lakes, …), named places (quarters, hamlets, …),
    orographic features (peaks, passes, valleys, …) and protected areas.

    Data must first be downloaded with ``make download-data-ign``, which places
    the GeoPackage files in ``data/bdcarto/``.

    All geometries are reprojected from EPSG:2154 (Lambert-93) to WGS84
    (EPSG:4326) and returned as standard GeoJSON Feature dicts.

    Args:
        data_path: Directory containing the ``.gpkg`` files (e.g. ``"data/bdcarto"``),
            or a single GeoJSON fixture file carrying a ``_layer`` column.

    Example:
        >>> source = IGNBDCartoSource("data/bdcarto")
        >>> results = source.search("Ardèche", type="department")
        >>> results = source.search("Lyon", type="city")
        >>> results = source.search("Rhône", type="river")
    """

    def __init__(self, data_path: str | Path) -> None:
        self._data_path = Path(data_path)
        # Dataset is loaded lazily on first query; see _ensure_loaded().
        self._gdf: gpd.GeoDataFrame | None = None
        # Normalized name -> list of row positions, built once after loading.
        self._name_index: dict[str, list[int]] = {}

    def _ensure_loaded(self) -> None:
        """Load the dataset on first use; subsequent calls are no-ops."""
        if self._gdf is not None:
            return
        self._load_data()

    def _load_data(self) -> None:
        """Load from a directory of GeoPackages, or from a single fixture file."""
        if self._data_path.is_dir():
            self._gdf = self._load_from_directory()
        else:
            self._gdf = self._load_from_file(self._data_path)
        self._build_name_index()

    def _load_from_file(self, path: Path) -> gpd.GeoDataFrame:
        """Load from a GeoJSON fixture file. Features must include a ``_layer`` column."""
        full_gdf = gpd.read_file(str(path))
        if "_layer" not in full_gdf.columns:
            raise ValueError(f"GeoJSON fixture {path} must include a '_layer' column")

        gdfs: list[gpd.GeoDataFrame] = []
        for layer_name, cfg in _LAYER_CONFIGS.items():
            rows = full_gdf[full_gdf["_layer"] == layer_name].copy()
            if rows.empty:
                continue
            name_col: str = cfg["name_col"]
            if name_col not in rows.columns:
                continue
            rows[_NAME_COL] = rows[name_col].astype(str)
            # Bind cfg via a default argument so each lambda captures its own layer config.
            rows[_TYPE_COL] = rows.apply(lambda row, c=cfg: _derive_type(row, c), axis=1)
            rows = rows.to_crs("EPSG:4326")
            gdfs.append(rows)

        if not gdfs:
            raise ValueError(f"No matching BD-CARTO features found in {path}")

        combined = pd.concat(gdfs, ignore_index=True)
        return gpd.GeoDataFrame(combined, crs="EPSG:4326", geometry="geometry")

    def _load_from_directory(self) -> gpd.GeoDataFrame:
        """Load and concatenate all configured layers from the data directory."""
        gdfs: list[gpd.GeoDataFrame] = []

        for layer_name, cfg in _LAYER_CONFIGS.items():
            gpkg_path = self._data_path / f"{layer_name}.gpkg"
            if not gpkg_path.exists():
                # Missing layers are skipped silently: a partial download still works.
                continue

            gdf = gpd.read_file(str(gpkg_path))

            name_col: str = cfg["name_col"]
            if name_col not in gdf.columns:
                continue

            gdf[_NAME_COL] = gdf[name_col].astype(str)
            gdf[_TYPE_COL] = gdf.apply(lambda row, c=cfg: _derive_type(row, c), axis=1)
            gdf["_layer"] = layer_name
            gdf = gdf.to_crs("EPSG:4326")

            gdfs.append(gdf)

        if not gdfs:
            raise ValueError(
                f"No BD-CARTO GeoPackage files found in {self._data_path}. "
                f"Run 'make download-data-ign' to download the dataset."
            )

        # ignore_index=True yields a clean RangeIndex, so index labels equal
        # positions — relied upon by _row_to_feature and get_by_id.
        combined = pd.concat(gdfs, ignore_index=True)
        return gpd.GeoDataFrame(combined, crs="EPSG:4326", geometry="geometry")

    def _build_name_index(self) -> None:
        """Build normalized name → row indices lookup (with article-stripped variants)."""
        assert self._gdf is not None
        self._name_index = {}
        for idx, name in enumerate(self._gdf[_NAME_COL]):
            # astype(str) turns missing names into the literal "nan"; skip those.
            if not isinstance(name, str) or not name.strip() or name == "nan":
                continue
            for key in _index_keys(name):
                if key not in self._name_index:
                    self._name_index[key] = []
                self._name_index[key].append(idx)

    def _row_to_feature(self, idx: int) -> dict[str, Any]:
        """Convert a GeoDataFrame row to a GeoJSON Feature dict (WGS84)."""
        assert self._gdf is not None
        row = self._gdf.iloc[idx]

        name = str(row[_NAME_COL])
        normalized_type = str(row[_TYPE_COL]) if pd.notna(row.get(_TYPE_COL)) else "unknown"
        # Prefer BD-CARTO's stable "cleabs" identifier, falling back to row position.
        feature_id = str(row["cleabs"]) if pd.notna(row.get("cleabs")) else str(idx)

        geom = row.geometry
        if geom is None or geom.is_empty:
            # Degenerate placeholder geometry for rows without one.
            geometry: dict[str, Any] = {"type": "Point", "coordinates": [0, 0]}
            bbox: tuple[float, float, float, float] | None = None
        else:
            geometry = mapping(geom)
            bounds = geom.bounds
            bbox = (bounds[0], bounds[1], bounds[2], bounds[3])

        # Copy remaining columns into properties, skipping internals already exposed.
        skip_cols = {_NAME_COL, _TYPE_COL, "geometry", "cleabs"}
        properties: dict[str, Any] = {
            "name": name,
            "type": normalized_type,
            "confidence": 1.0,
        }
        for col in self._gdf.columns:
            if col not in skip_cols:
                val = _to_json_value(row.get(col))
                if val is not None:
                    properties[col] = val

        return {
            "type": "Feature",
            "id": feature_id,
            "geometry": geometry,
            "bbox": bbox,
            "properties": properties,
        }

    def search(
        self,
        name: str,
        type: str | None = None,
        max_results: int = 10,
    ) -> list[dict[str, Any]]:
        """
        Search for geographic features by name.

        Uses case-insensitive, accent-normalized exact matching with fuzzy
        fallback when no exact match is found.

        Args:
            name: Location name to search for (e.g. ``"Ardèche"``, ``"Lyon"``,
                  ``"Rhône"``).
            type: Optional type hint for filtering. Supports both concrete types
                  (``"department"``, ``"city"``, ``"river"``) and category hints
                  (``"administrative"``, ``"water"``).
            max_results: Maximum number of results.

        Returns:
            List of GeoJSON Feature dicts in WGS84. Empty list if no match.
        """
        self._ensure_loaded()

        normalized = _normalize_name(name)
        indices = self._name_index.get(normalized, [])

        # Fuzzy fallback only when the exact index lookup yields nothing.
        if not indices:
            indices = self._fuzzy_search(normalized)

        features = [self._row_to_feature(idx) for idx in indices]

        if type is not None:
            # Expand category hints (e.g. "water") into their concrete types.
            matching_types = get_matching_types(type)
            logger.debug("Filtering results by type hint %r → matching types: %s", type, matching_types)
            if matching_types:
                features = [f for f in features if f["properties"].get("type") in matching_types]
            else:
                # Unknown hint: fall back to exact (lowercased) string match.
                features = [f for f in features if f["properties"].get("type") == type.lower()]

        # Merge multi-segment features (e.g. river segments) before capping.
        features = merge_segments(features)

        return features[:max_results]

    def _fuzzy_search(self, normalized: str, threshold: float = 75.0) -> list[int]:
        """Token-overlap + token_set_ratio fuzzy search.

        A candidate must share at least one whitespace-delimited token with the
        query; survivors are scored with ``fuzz.token_set_ratio`` and returned
        best-first.
        """
        matches: list[tuple[int, float]] = []
        query_tokens = set(normalized.split())

        for indexed_name, indices in self._name_index.items():
            if query_tokens & set(indexed_name.split()):
                score = fuzz.token_set_ratio(normalized, indexed_name)
                if score >= threshold:
                    for idx in indices:
                        matches.append((idx, score))

        matches.sort(key=lambda x: x[1], reverse=True)
        return [idx for idx, _ in matches]

    def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
        """
        Get a feature by its ``cleabs`` identifier or row index.

        Args:
            feature_id: ``cleabs`` string or integer row index.

        Returns:
            Matching GeoJSON Feature dict, or ``None``.
        """
        self._ensure_loaded()
        assert self._gdf is not None

        if "cleabs" in self._gdf.columns:
            matches = self._gdf[self._gdf["cleabs"].astype(str) == feature_id]
            if not matches.empty:
                # Label equals position here because loading used ignore_index=True.
                return self._row_to_feature(matches.index[0])

        # Fallback: interpret the identifier as a positional row index.
        try:
            idx = int(feature_id)
            if 0 <= idx < len(self._gdf):
                return self._row_to_feature(idx)
        except ValueError:
            pass

        return None

    def get_available_types(self) -> list[str]:
        """
        Return the union of all normalized types this source can return.

        Returns:
            Sorted list of type strings.
        """
        types: set[str] = set()
        # Layer configs declare their types in one of three mutually exclusive ways.
        for cfg in _LAYER_CONFIGS.values():
            if cfg.get("commune_flags"):
                types.update({"city", "municipality"})
            elif cfg.get("fixed_type"):
                types.add(cfg["fixed_type"])
            elif cfg.get("type_map"):
                types.update(cfg["type_map"].values())
        return sorted(types)

Geographic data source backed by IGN's BD-CARTO 5.0 dataset.

Loads French geographic data from GeoPackage files extracted to a directory. Supports administrative boundaries (communes, departments, regions, …), hydrography (rivers, lakes, …), named places (quarters, hamlets, …), orographic features (peaks, passes, valleys, …) and protected areas.

Data must first be downloaded with make download-data-ign, which places the GeoPackage files in data/bdcarto/.

All geometries are reprojected from EPSG:2154 (Lambert-93) to WGS84 (EPSG:4326) and returned as standard GeoJSON Feature dicts.

Arguments:
  • data_path: Directory containing the .gpkg files (e.g. "data/bdcarto").
Example:
>>> source = IGNBDCartoSource("data/bdcarto")
>>> results = source.search("Ardèche", type="department")
>>> results = source.search("Lyon", type="city")
>>> results = source.search("Rhône", type="river")
IGNBDCartoSource(data_path: str | pathlib.Path)
291    def __init__(self, data_path: str | Path) -> None:
292        self._data_path = Path(data_path)
293        self._gdf: gpd.GeoDataFrame | None = None
294        self._name_index: dict[str, list[int]] = {}
def search( self, name: str, type: str | None = None, max_results: int = 10) -> list[dict[str, typing.Any]]:
414    def search(
415        self,
416        name: str,
417        type: str | None = None,
418        max_results: int = 10,
419    ) -> list[dict[str, Any]]:
420        """
421        Search for geographic features by name.
422
423        Uses case-insensitive, accent-normalized exact matching with fuzzy
424        fallback when no exact match is found.
425
426        Args:
427            name: Location name to search for (e.g. ``"Ardèche"``, ``"Lyon"``,
428                  ``"Rhône"``).
429            type: Optional type hint for filtering. Supports both concrete types
430                  (``"department"``, ``"city"``, ``"river"``) and category hints
431                  (``"administrative"``, ``"water"``).
432            max_results: Maximum number of results.
433
434        Returns:
435            List of GeoJSON Feature dicts in WGS84. Empty list if no match.
436        """
437        self._ensure_loaded()
438
439        normalized = _normalize_name(name)
440        indices = self._name_index.get(normalized, [])
441
442        if not indices:
443            indices = self._fuzzy_search(normalized)
444
445        features = [self._row_to_feature(idx) for idx in indices]
446
447        if type is not None:
448            matching_types = get_matching_types(type)
449            logger.debug("Filtering results by type hint %r → matching types: %s", type, matching_types)
450            if matching_types:
451                features = [f for f in features if f["properties"].get("type") in matching_types]
452            else:
453                features = [f for f in features if f["properties"].get("type") == type.lower()]
454
455        features = merge_segments(features)
456
457        return features[:max_results]

Search for geographic features by name.

Uses case-insensitive, accent-normalized exact matching with fuzzy fallback when no exact match is found.

Arguments:
  • name: Location name to search for (e.g. "Ardèche", "Lyon", "Rhône").
  • type: Optional type hint for filtering. Supports both concrete types ("department", "city", "river") and category hints ("administrative", "water").
  • max_results: Maximum number of results.
Returns:

List of GeoJSON Feature dicts in WGS84. Empty list if no match.

def get_by_id(self, feature_id: str) -> dict[str, typing.Any] | None:
474    def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
475        """
476        Get a feature by its ``cleabs`` identifier or row index.
477
478        Args:
479            feature_id: ``cleabs`` string or integer row index.
480
481        Returns:
482            Matching GeoJSON Feature dict, or ``None``.
483        """
484        self._ensure_loaded()
485        assert self._gdf is not None
486
487        if "cleabs" in self._gdf.columns:
488            matches = self._gdf[self._gdf["cleabs"].astype(str) == feature_id]
489            if not matches.empty:
490                return self._row_to_feature(matches.index[0])
491
492        try:
493            idx = int(feature_id)
494            if 0 <= idx < len(self._gdf):
495                return self._row_to_feature(idx)
496        except ValueError:
497            pass
498
499        return None

Get a feature by its cleabs identifier or row index.

Arguments:
  • feature_id: cleabs string or integer row index.
Returns:

Matching GeoJSON Feature dict, or None.

def get_available_types(self) -> list[str]:
501    def get_available_types(self) -> list[str]:
502        """
503        Return the union of all normalized types this source can return.
504
505        Returns:
506            Sorted list of type strings.
507        """
508        types: set[str] = set()
509        for cfg in _LAYER_CONFIGS.values():
510            if cfg.get("commune_flags"):
511                types.update({"city", "municipality"})
512            elif cfg.get("fixed_type"):
513                types.add(cfg["fixed_type"])
514            elif cfg.get("type_map"):
515                types.update(cfg["type_map"].values())
516        return sorted(types)

Return the union of all normalized types this source can return.

Returns:

Sorted list of type strings.

class CompositeDataSource:
class CompositeDataSource:
    """
    Fan-out datasource that delegates to an ordered list of GeoDataSource instances.

    ``search`` queries every registered source and merges results in order.

    ``get_by_id`` tries each source in order and returns the first hit.

    ``get_available_types`` returns the union of all sources' types.

    Args:
        sources: One or more GeoDataSource instances.

    Raises:
        ValueError: If no sources are given.

    Example:
        >>> swiss = SwissNames3DSource("data/")
        >>> ign = IGNBDCartoSource("data/")
        >>> combo = CompositeDataSource(swiss, ign)
        >>> results = combo.search("Geneva", type="city")
    """

    def __init__(self, *sources: GeoDataSource) -> None:
        if not sources:
            raise ValueError("At least one datasource is required.")
        self._sources: list[GeoDataSource] = list(sources)

    # Public API (mirrors GeoDataSource protocol)

    def search(
        self,
        name: str,
        type: str | None = None,
        max_results: int = 10,
    ) -> list[dict[str, Any]]:
        """
        Search all registered sources and return merged results.

        Args:
            name: Location name to search for.
            type: Optional type hint passed through to every source.
            max_results: Cap applied both per source and to the merged list;
                merging stops early once the merged list reaches it.

        Returns:
            List of GeoJSON Feature dicts, merged from all sources in
            registration order.
        """
        merged: list[dict[str, Any]] = []

        for source in self._sources:
            for feature in source.search(name, type=type, max_results=max_results):
                merged.append(feature)
                if len(merged) >= max_results:
                    return merged

        return merged

    def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
        """
        Get a feature by ID, trying each source in order.

        Args:
            feature_id: Unique identifier to look up.

        Returns:
            The first matching GeoJSON Feature dict, or None.
        """
        for source in self._sources:
            result = source.get_by_id(feature_id)
            if result is not None:
                return result
        return None

    def get_available_types(self) -> list[str]:
        """
        Return the union of all sources' available types, sorted.

        Returns:
            Sorted list of unique type strings.
        """
        types: set[str] = set()
        for source in self._sources:
            types.update(source.get_available_types())
        return sorted(types)

Fan-out datasource that delegates to an ordered list of GeoDataSource instances.

search queries every registered source and merges results in order.

get_by_id tries each source in order and returns the first hit.

get_available_types returns the union of all sources' types.

Arguments:
  • sources: One or more GeoDataSource instances.
Example:
>>> swiss = SwissNames3DSource("data/")
>>> ign   = IGNBDCartoSource("data/")
>>> combo = CompositeDataSource(swiss, ign)
>>> results = combo.search("Geneva", type="city")
CompositeDataSource(*sources: GeoDataSource)
34    def __init__(self, *sources: GeoDataSource) -> None:
35        if not sources:
36            raise ValueError("At least one datasource is required.")
37        self._sources: list[GeoDataSource] = list(sources)
def search( self, name: str, type: str | None = None, max_results: int = 10) -> list[dict[str, typing.Any]]:
41    def search(
42        self,
43        name: str,
44        type: str | None = None,
45        max_results: int = 10,
46    ) -> list[dict[str, Any]]:
47        """
48        Search all registered sources and return merged.
49
50        Args:
51            name: Location name to search for.
52            type: Optional type hint passed through to every source.
53            max_results: Maximum results per source.
54
55        Returns:
56            List of GeoJSON Feature dicts, merged from all sources.
57        """
58        merged: list[dict[str, Any]] = []
59
60        for source in self._sources:
61            for feature in source.search(name, type=type, max_results=max_results):
62                merged.append(feature)
63                if len(merged) >= max_results:
64                    return merged
65
66        return merged

Search all registered sources and return merged.

Arguments:
  • name: Location name to search for.
  • type: Optional type hint passed through to every source.
  • max_results: Maximum results per source.
Returns:

List of GeoJSON Feature dicts, merged from all sources.

def get_by_id(self, feature_id: str) -> dict[str, typing.Any] | None:
68    def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
69        """
70        Get a feature by ID, trying each source in order.
71
72        Args:
73            feature_id: Unique identifier to look up.
74
75        Returns:
76            The first matching GeoJSON Feature dict, or None.
77        """
78        for source in self._sources:
79            result = source.get_by_id(feature_id)
80            if result is not None:
81                return result
82        return None

Get a feature by ID, trying each source in order.

Arguments:
  • feature_id: Unique identifier to look up.
Returns:

The first matching GeoJSON Feature dict, or None.

def get_available_types(self) -> list[str]:
84    def get_available_types(self) -> list[str]:
85        """
86        Return the union of all sources' available types, sorted.
87
88        Returns:
89            Sorted list of unique type strings.
90        """
91        types: set[str] = set()
92        for source in self._sources:
93            types.update(source.get_available_types())
94        return sorted(types)

Return the union of all sources' available types, sorted.

Returns:

Sorted list of unique type strings.

class PostGISDataSource:
 62class PostGISDataSource:
 63    """
 64    Geographic data source backed by a PostGIS table.
 65
 66    The table must expose at minimum a name column, a geometry column, and
 67    optionally a type column. The expected schema is:
 68
 69    .. code-block:: sql
 70
 71        CREATE TABLE <table> (
 72            id      TEXT PRIMARY KEY,
 73            name    TEXT NOT NULL,
 74            type    TEXT,
 75            geom    GEOMETRY(Geometry, 4326)
 76        );
 77
 78    The ``type`` column may store either:
 79
 80    - **Raw dataset values** (e.g. ``"See"``, ``"Berg"`` for SwissNames3D),
 81      pass ``type_map`` so the datasource can translate between raw values and
 82      the normalized etter type names.
 83    - **Already-normalized values** (e.g. ``"lake"``, ``"mountain"``),
 84      leave ``type_map=None`` (default).
 85
 86    Geometries must be in WGS84 (EPSG:4326) or supply ``crs`` for on-the-fly
 87    reprojection.
 88
 89    Args:
 90        connection: A SQLAlchemy :class:`~sqlalchemy.engine.Engine` **or** a
 91            connection URL string (e.g. ``"postgresql+psycopg2://user:pass@host/db"``).
 92            When a string is provided the engine is created internally.
 93        table: Fully-qualified table name, e.g. ``"public.swissnames3d"``.
 94        name_column: Column used for name-based search (default ``"name"``).
 95        type_column: Column used for type filtering.  Pass ``None`` to disable
 96            type filtering (default ``"type"``).
 97        geometry_column: PostGIS geometry column (default ``"geom"``).
 98        id_column: Primary-key column (default ``"id"``).
 99        crs: CRS of the stored geometries as an EPSG string.  Defaults to
100            ``"EPSG:4326"`` (no reprojection).
101        type_map: Optional mapping from **normalized etter type names** to
102            **lists of raw type column values** present in the database.
103            This is the same format as ``SwissNames3DSource.OBJEKTART_TYPE_MAP``
104            and ``IGNBDCartoSource.IGN_BDCARTO_TYPE_MAP``, so they can be
105            passed directly::
106
107                from etter.datasources.swissnames3d import OBJEKTART_TYPE_MAP
108                source = PostGISDataSource(
109                    engine,
110                    table="public.swissnames3d",
111                    type_map=OBJEKTART_TYPE_MAP,
112                )
113
114            When ``type_map`` is provided the datasource:
115
116            - Translates raw DB values → normalized types in returned features.
117            - Translates user type hints → raw DB values in SQL ``WHERE`` clauses.
118            - Returns normalized type names from ``get_available_types()``.
119
120            When ``None`` (default) the stored values are used as-is.
121        fuzzy_threshold: Minimum ``pg_trgm`` similarity score (0-1) used for
122            fuzzy fallback search when no exact ``ILIKE`` match is found.
123
124    Example: unmodified SwissNames3D table::
125
126        from sqlalchemy import create_engine
127        from etter.datasources import PostGISDataSource
128        from etter.datasources.swissnames3d import OBJEKTART_TYPE_MAP
129
130        engine = create_engine(...)
131        source = PostGISDataSource(
132            engine,
133            table="public.swissnames3d",
134            type_map=OBJEKTART_TYPE_MAP,
135        )
136        results = source.search("Lac Léman", type="lake")
137    """
138
139    def __init__(
140        self,
141        connection: str | Engine,
142        table: str,
143        name_column: str = "name",
144        type_column: str | None = "type",
145        geometry_column: str = "geom",
146        id_column: str = "id",
147        crs: str = "EPSG:4326",
148        type_map: TypeMap | None = None,
149        fuzzy_threshold: float = 0.65,
150    ) -> None:
151        sa = _require_sqlalchemy()
152
153        if isinstance(connection, str):
154            self._engine = sa.create_engine(connection)
155        else:
156            self._engine = connection
157
158        try:
159            with self._engine.connect() as conn:
160                conn.execute(sa.text(f"SELECT 1 FROM {table} LIMIT 1"))
161        except Exception as exc:
162            raise ValueError(f"Failed to connect to database or access table {table!r}") from exc
163
164        self._table = table
165        self._name_col = name_column
166        self._type_col = type_column
167        self._geom_col = geometry_column
168        self._id_col = id_column
169        self._crs = crs
170        self._fuzzy_threshold = fuzzy_threshold
171
172        # Build bidirectional lookup structures from the user-supplied map.
173        if type_map:
174            self._normalized_to_raw: dict[str, list[str]] = {k: list(v) for k, v in type_map.items()}
175            self._raw_to_normalized: dict[str, str] = {
176                raw: normalized for normalized, raws in type_map.items() for raw in raws
177            }
178        else:
179            self._normalized_to_raw = {}
180            self._raw_to_normalized = {}
181
182        self._trgm_available: bool | None = None
183        self._unaccent_available: bool | None = None
184
    def _get_connection(self) -> Any:
        """Return a fresh SQLAlchemy connection from the engine.

        The caller owns the connection and is responsible for closing it.
        """
        return self._engine.connect()
188
189    def _check_trgm(self, conn: Any) -> bool:
190        """Return True if pg_trgm extension is available in the database."""
191        if self._trgm_available is not None:
192            return self._trgm_available
193        sa = _require_sqlalchemy()
194        try:
195            result = conn.execute(sa.text("SELECT 1 FROM pg_extension WHERE extname = 'pg_trgm'"))
196            self._trgm_available = result.fetchone() is not None
197        except Exception:
198            logger.exception("Failed to check pg_trgm availability")
199            self._trgm_available = False
200        return self._trgm_available
201
202    def _check_unaccent(self, conn: Any) -> bool:
203        """Return True if the unaccent extension is available in the database."""
204        if self._unaccent_available is not None:
205            return self._unaccent_available
206        sa = _require_sqlalchemy()
207        try:
208            result = conn.execute(sa.text("SELECT 1 FROM pg_extension WHERE extname = 'unaccent'"))
209            self._unaccent_available = result.fetchone() is not None
210        except Exception:
211            logger.exception("Failed to check unaccent availability")
212            self._unaccent_available = False
213        return self._unaccent_available
214
215    def _normalize_type(self, raw_type: str | None) -> str | None:
216        """Translate a raw DB type value to its normalized etter name.
217
218        If no type_map was supplied the value is returned unchanged.
219        """
220        if raw_type is None:
221            return None
222        return self._raw_to_normalized.get(raw_type, raw_type)
223
224    def _row_to_feature(self, row: Any) -> dict[str, Any]:
225        """Convert a SQLAlchemy Row to a GeoJSON Feature dict."""
226        feature_id = str(row.id)
227        name = str(row.name)
228        raw_type = getattr(row, "type", None)
229        normalized_type = self._normalize_type(raw_type)
230
231        geojson_str = row.geojson
232        if geojson_str:
233            geometry = json.loads(geojson_str)
234        else:
235            geometry = {"type": "Point", "coordinates": [0, 0]}
236
237        bbox = _bbox_from_geojson(geometry)
238
239        properties: dict[str, Any] = {
240            "name": name,
241            "type": normalized_type,
242            "confidence": 1.0,
243        }
244
245        return {
246            "type": "Feature",
247            "id": feature_id,
248            "geometry": geometry,
249            "bbox": bbox,
250            "properties": properties,
251        }
252
253    def _build_select_columns(self) -> str:
254        """Build the SELECT column list as a SQL fragment."""
255        type_expr = f", {self._type_col} AS type" if self._type_col else ", NULL AS type"
256        if self._crs.upper() != "EPSG:4326":
257            geom_expr = f", ST_AsGeoJSON(ST_Transform({self._geom_col}, 4326)) AS geojson"
258        else:
259            geom_expr = f", ST_AsGeoJSON({self._geom_col}) AS geojson"
260        return f"{self._id_col} AS id, {self._name_col} AS name{type_expr}{geom_expr}"
261
262    def search(
263        self,
264        name: str,
265        type: str | None = None,
266        max_results: int = 10,
267    ) -> list[dict[str, Any]]:
268        """
269        Search for geographic features by name.
270
271        Uses a three-step cascade, stopping as soon as any step returns results:
272
273        1. **Normalized exact match**
274        2. **pg_trgm fuzzy with unaccent** (pg_trgm extension required and unaccent extension recommended)
275        3. **ILIKE substring**
276
277        ``merge_segments`` is applied after all rows are fetched so that
278        multi-segment linestrings (rivers, roads) are merged before the
279        ``max_results`` cap is applied.
280
281        Args:
282            name: Location name to search for.
283            type: Optional type hint for filtering results.
284            max_results: Maximum number of results to return.
285
286        Returns:
287            List of matching GeoJSON Feature dicts in WGS84.
288        """
289        sa = _require_sqlalchemy()
290        cols = self._build_select_columns()
291
292        # Resolve type filter to the raw DB values to use in the SQL WHERE clause.
293        type_filter_values: list[str] | None = None
294        if type is not None and self._type_col is not None:
295            matching_types = get_matching_types(type)
296            concrete_types = matching_types if matching_types else [type.lower()]
297            if self._normalized_to_raw:
298                raw_values: list[str] = []
299                for t in concrete_types:
300                    raw_values.extend(self._normalized_to_raw.get(t, [t]))
301                type_filter_values = raw_values if raw_values else concrete_types
302            else:
303                type_filter_values = concrete_types
304
305        # Fetch more rows than requested so that merge_segments has the full
306        # set of segments to work with.  Without this, a SQL LIMIT applied
307        # *before* merging would only return a partial set of linestring
308        # segments, producing incorrect / truncated geometries.
309        # We cap the internal limit at 2000 to avoid unbounded queries.
310        internal_limit = min(max(max_results * 20, 100), 2000)
311
312        with self._get_connection() as conn:
313            features = self._search_normalized(conn, sa, cols, name, type_filter_values, internal_limit)
314
315        if not features:
316            with self._get_connection() as conn:
317                features = self._search_fuzzy(conn, sa, cols, name, type_filter_values, internal_limit)
318
319        if not features:
320            with self._get_connection() as conn:
321                features = self._search_ilike(conn, sa, cols, name, type_filter_values, internal_limit)
322
323        features = merge_segments(features)
324        return features[:max_results]
325
326    def _type_filter_sql(self, values: list[str] | None) -> tuple[str, dict[str, Any]]:
327        """Return a WHERE clause fragment and bind params for type filtering."""
328        if not values or self._type_col is None:
329            return "", {}
330        placeholders = ", ".join(f":type_{i}" for i in range(len(values)))
331        clause = f" AND {self._type_col} IN ({placeholders})"
332        params = {f"type_{i}": v for i, v in enumerate(values)}
333        return clause, params
334
335    def _search_normalized(
336        self,
337        conn: Any,
338        sa: Any,
339        cols: str,
340        name: str,
341        type_filter: list[str] | None,
342        fetch_limit: int,
343    ) -> list[dict[str, Any]]:
344        """
345        Exact accent- and case-insensitive search.
346
347        Accent normalization (NFD decomposition + diacritic strip) is done in
348        Python before the query is sent to the DB.
349        """
350        type_clause, type_params = self._type_filter_sql(type_filter)
351        name_expr = f"lower({self._name_col})"
352        if self._check_unaccent(conn):
353            name_expr = f"unaccent({name_expr})"
354        sql = sa.text(
355            f"SELECT {cols} FROM {self._table} "  # noqa: S608
356            f"WHERE {name_expr} = :query{type_clause} "
357            f"LIMIT :limit"
358        )
359        params: dict[str, Any] = {
360            "query": _normalize_name(name),
361            "limit": fetch_limit,
362            **type_params,
363        }
364        try:
365            result = conn.execute(sql, params)
366            return [self._row_to_feature(row) for row in result]
367        except Exception:
368            logger.exception("Normalized search failed for %r", name)
369            return []
370
371    def _search_ilike(
372        self,
373        conn: Any,
374        sa: Any,
375        cols: str,
376        name: str,
377        type_filter: list[str] | None,
378        fetch_limit: int,
379    ) -> list[dict[str, Any]]:
380        """Case-insensitive substring fallback using ``ILIKE '%name%'``.
381
382        When the ``unaccent`` extension is available, both the stored name column
383        and the pattern are accent-stripped so that e.g. ``"Rhone"`` matches
384        ``"Rhône"``.  Without ``unaccent``, standard ILIKE is used (case-insensitive
385        only).
386        """
387        type_clause, type_params = self._type_filter_sql(type_filter)
388        normalized = _normalize_name(name)
389        if self._check_unaccent(conn):
390            name_expr = f"unaccent(lower({self._name_col}))"
391            pattern = f"%{normalized}%"
392        else:
393            name_expr = self._name_col
394            pattern = f"%{name}%"
395        sql = sa.text(
396            f"SELECT {cols} FROM {self._table} "  # noqa: S608
397            f"WHERE {name_expr} ILIKE :pattern{type_clause} "
398            f"LIMIT :limit"
399        )
400        params: dict[str, Any] = {"pattern": pattern, "limit": fetch_limit, **type_params}
401        try:
402            result = conn.execute(sql, params)
403            return [self._row_to_feature(row) for row in result]
404        except Exception:
405            logger.exception("ILIKE search failed for %r", name)
406            return []
407
408    def _search_fuzzy(
409        self,
410        conn: Any,
411        sa: Any,
412        cols: str,
413        name: str,
414        type_filter: list[str] | None,
415        fetch_limit: int,
416    ) -> list[dict[str, Any]]:
417        """Fuzzy fallback using pg_trgm similarity (if extension is available)."""
418        if not self._check_trgm(conn):
419            logger.warning(
420                "pg_trgm extension not available. Fuzzy search disabled. Install it with: CREATE EXTENSION pg_trgm;"
421            )
422            return []
423        normalized_query = _normalize_name(name)
424        if self._check_unaccent(conn):
425            name_expr = f"unaccent(lower({self._name_col}))"
426        else:
427            logger.warning(
428                "unaccent extension not available. Accent-insensitive fuzzy search degraded. "
429                "Install it with: CREATE EXTENSION unaccent;"
430            )
431            name_expr = f"lower({self._name_col})"
432        type_clause, type_params = self._type_filter_sql(type_filter)
433        sql = sa.text(
434            f"SELECT {cols} FROM {self._table} "  # noqa: S608
435            f"WHERE word_similarity({name_expr}, :query) > :threshold{type_clause} "
436            f"ORDER BY word_similarity({name_expr}, :query) DESC "
437            f"LIMIT :limit"
438        )
439        params: dict[str, Any] = {
440            "query": normalized_query,
441            "threshold": self._fuzzy_threshold,
442            "limit": fetch_limit,
443            **type_params,
444        }
445        try:
446            result = conn.execute(sql, params)
447            return [self._row_to_feature(row) for row in result]
448        except Exception:
449            logger.exception("Fuzzy search failed for %r", name)
450            return []
451
452    def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
453        """
454        Get a specific feature by its unique identifier.
455
456        Args:
457            feature_id: Value of the ``id`` column.
458
459        Returns:
460            The matching GeoJSON Feature dict, or ``None`` if not found.
461        """
462        sa = _require_sqlalchemy()
463        cols = self._build_select_columns()
464        sql = sa.text(
465            f"SELECT {cols} FROM {self._table} WHERE {self._id_col} = :id LIMIT 1"  # noqa: S608
466        )
467        with self._get_connection() as conn:
468            try:
469                result = conn.execute(sql, {"id": feature_id})
470                row = result.fetchone()
471                return self._row_to_feature(row) if row else None
472            except Exception:
473                logger.exception("get_by_id failed for %r", feature_id)
474                return None
475
476    def get_available_types(self) -> list[str]:
477        """
478        Return the distinct ``type`` values present in the table.
479
480        Returns:
481            Sorted list of concrete type strings, or an empty list if the table
482            has no type column.
483        """
484        if self._type_col is None:
485            return []
486        sa = _require_sqlalchemy()
487        sql = sa.text(
488            f"SELECT DISTINCT {self._type_col} AS type FROM {self._table} "  # noqa: S608
489            f"WHERE {self._type_col} IS NOT NULL ORDER BY 1"
490        )
491        with self._get_connection() as conn:
492            try:
493                result = conn.execute(sql)
494                raw_types = [row.type for row in result]
495            except Exception:
496                logger.exception("get_available_types failed")
497                return []
498
499        normalized = {self._normalize_type(t) for t in raw_types if t}
500        return sorted(t for t in normalized if t)

Geographic data source backed by a PostGIS table.

The table must expose an id column, a name column, and a geometry column, plus optionally a type column. The expected schema is:

CREATE TABLE <table> (
    id      TEXT PRIMARY KEY,
    name    TEXT NOT NULL,
    type    TEXT,
    geom    GEOMETRY(Geometry, 4326)
);

The type column may store either:

  • Raw dataset values (e.g. "See", "Berg" for SwissNames3D), pass type_map so the datasource can translate between raw values and the normalized etter type names.
  • Already-normalized values (e.g. "lake", "mountain"), leave type_map=None (default).

Geometries must be in WGS84 (EPSG:4326) or supply crs for on-the-fly reprojection.

Arguments:
  • connection: A SQLAlchemy ~sqlalchemy.engine.Engine or a connection URL string (e.g. "postgresql+psycopg2://user:pass@host/db"). When a string is provided the engine is created internally.
  • table: Fully-qualified table name, e.g. "public.swissnames3d".
  • name_column: Column used for name-based search (default "name").
  • type_column: Column used for type filtering. Pass None to disable type filtering (default "type").
  • geometry_column: PostGIS geometry column (default "geom").
  • id_column: Primary-key column (default "id").
  • crs: CRS of the stored geometries as an EPSG string. Defaults to "EPSG:4326" (no reprojection).
  • type_map: Optional mapping from normalized etter type names to lists of raw type column values present in the database. This is the same format as SwissNames3DSource.OBJEKTART_TYPE_MAP and IGNBDCartoSource.IGN_BDCARTO_TYPE_MAP, so they can be passed directly::

    from etter.datasources.swissnames3d import OBJEKTART_TYPE_MAP
    source = PostGISDataSource(
        engine,
        table="public.swissnames3d",
        type_map=OBJEKTART_TYPE_MAP,
    )
    

    When type_map is provided the datasource:

    • Translates raw DB values → normalized types in returned features.
    • Translates user type hints → raw DB values in SQL WHERE clauses.
    • Returns normalized type names from get_available_types().

    When None (default) the stored values are used as-is.

  • fuzzy_threshold: Minimum pg_trgm similarity score (0-1) used for fuzzy fallback search when no exact ILIKE match is found.

Example: unmodified SwissNames3D table::

from sqlalchemy import create_engine
from etter.datasources import PostGISDataSource
from etter.datasources.swissnames3d import OBJEKTART_TYPE_MAP

engine = create_engine(...)
source = PostGISDataSource(
    engine,
    table="public.swissnames3d",
    type_map=OBJEKTART_TYPE_MAP,
)
results = source.search("Lac Léman", type="lake")
PostGISDataSource( connection: str | sqlalchemy.engine.base.Engine, table: str, name_column: str = 'name', type_column: str | None = 'type', geometry_column: str = 'geom', id_column: str = 'id', crs: str = 'EPSG:4326', type_map: dict[typing.Literal['alpine_pasture', 'airport', 'area', 'arrondissement', 'border_marker', 'boulder', 'bridge', 'building', 'bus_stop', 'boat_stop', 'camping', 'canton', 'cave', 'cemetery', 'city', 'correctional_facility', 'country', 'customs', 'dam', 'department', 'district', 'ditch', 'entrance_exit', 'exit', 'fairground', 'ferry', 'field_name', 'forest', 'fountain', 'glacier', 'hamlet', 'heliport', 'hill', 'historical_site', 'hospital', 'island', 'junction', 'lake', 'leisure_facility', 'landfill', 'lift', 'loading_station', 'local_name', 'massif', 'military_training_area', 'monastery', 'monument', 'mountain', 'municipality', 'nature_reserve', 'park', 'parking', 'pass', 'peak', 'peninsula', 'plain', 'pond', 'power_plant', 'private_driving_area', 'quarry', 'railway', 'railway_area', 'region', 'religious_building', 'rest_area', 'restaurant', 'ridge', 'river', 'road', 'rock_head', 'school', 'spring', 'sports_facility', 'standing_area', 'swimming_pool', 'town', 'tower', 'train_station', 'tunnel', 'unknown', 'valley', 'viewpoint', 'village', 'wastewater_treatment', 'waste_incineration', 'waterfall', 'weir', 'zoo', 'administrative', 'amenity', 'infrastructure', 'landforms', 'natural', 'other', 'settlement', 'transport', 'water'], list[str]] | None = None, fuzzy_threshold: float = 0.65)
139    def __init__(
140        self,
141        connection: str | Engine,
142        table: str,
143        name_column: str = "name",
144        type_column: str | None = "type",
145        geometry_column: str = "geom",
146        id_column: str = "id",
147        crs: str = "EPSG:4326",
148        type_map: TypeMap | None = None,
149        fuzzy_threshold: float = 0.65,
150    ) -> None:
151        sa = _require_sqlalchemy()
152
153        if isinstance(connection, str):
154            self._engine = sa.create_engine(connection)
155        else:
156            self._engine = connection
157
158        try:
159            with self._engine.connect() as conn:
160                conn.execute(sa.text(f"SELECT 1 FROM {table} LIMIT 1"))
161        except Exception as exc:
162            raise ValueError(f"Failed to connect to database or access table {table!r}") from exc
163
164        self._table = table
165        self._name_col = name_column
166        self._type_col = type_column
167        self._geom_col = geometry_column
168        self._id_col = id_column
169        self._crs = crs
170        self._fuzzy_threshold = fuzzy_threshold
171
172        # Build bidirectional lookup structures from the user-supplied map.
173        if type_map:
174            self._normalized_to_raw: dict[str, list[str]] = {k: list(v) for k, v in type_map.items()}
175            self._raw_to_normalized: dict[str, str] = {
176                raw: normalized for normalized, raws in type_map.items() for raw in raws
177            }
178        else:
179            self._normalized_to_raw = {}
180            self._raw_to_normalized = {}
181
182        self._trgm_available: bool | None = None
183        self._unaccent_available: bool | None = None
def search( self, name: str, type: str | None = None, max_results: int = 10) -> list[dict[str, typing.Any]]:
262    def search(
263        self,
264        name: str,
265        type: str | None = None,
266        max_results: int = 10,
267    ) -> list[dict[str, Any]]:
268        """
269        Search for geographic features by name.
270
271        Uses a three-step cascade, stopping as soon as any step returns results:
272
273        1. **Normalized exact match**
274        2. **pg_trgm fuzzy with unaccent** (pg_trgm extension required and unaccent extension recommended)
275        3. **ILIKE substring**
276
277        ``merge_segments`` is applied after all rows are fetched so that
278        multi-segment linestrings (rivers, roads) are merged before the
279        ``max_results`` cap is applied.
280
281        Args:
282            name: Location name to search for.
283            type: Optional type hint for filtering results.
284            max_results: Maximum number of results to return.
285
286        Returns:
287            List of matching GeoJSON Feature dicts in WGS84.
288        """
289        sa = _require_sqlalchemy()
290        cols = self._build_select_columns()
291
292        # Resolve type filter to the raw DB values to use in the SQL WHERE clause.
293        type_filter_values: list[str] | None = None
294        if type is not None and self._type_col is not None:
295            matching_types = get_matching_types(type)
296            concrete_types = matching_types if matching_types else [type.lower()]
297            if self._normalized_to_raw:
298                raw_values: list[str] = []
299                for t in concrete_types:
300                    raw_values.extend(self._normalized_to_raw.get(t, [t]))
301                type_filter_values = raw_values if raw_values else concrete_types
302            else:
303                type_filter_values = concrete_types
304
305        # Fetch more rows than requested so that merge_segments has the full
306        # set of segments to work with.  Without this, a SQL LIMIT applied
307        # *before* merging would only return a partial set of linestring
308        # segments, producing incorrect / truncated geometries.
309        # We cap the internal limit at 2000 to avoid unbounded queries.
310        internal_limit = min(max(max_results * 20, 100), 2000)
311
312        with self._get_connection() as conn:
313            features = self._search_normalized(conn, sa, cols, name, type_filter_values, internal_limit)
314
315        if not features:
316            with self._get_connection() as conn:
317                features = self._search_fuzzy(conn, sa, cols, name, type_filter_values, internal_limit)
318
319        if not features:
320            with self._get_connection() as conn:
321                features = self._search_ilike(conn, sa, cols, name, type_filter_values, internal_limit)
322
323        features = merge_segments(features)
324        return features[:max_results]

Search for geographic features by name.

Uses a three-step cascade, stopping as soon as any step returns results:

  1. Normalized exact match
  2. pg_trgm fuzzy with unaccent (pg_trgm extension required and unaccent extension recommended)
  3. ILIKE substring

merge_segments is applied after all rows are fetched so that multi-segment linestrings (rivers, roads) are merged before the max_results cap is applied.

Arguments:
  • name: Location name to search for.
  • type: Optional type hint for filtering results.
  • max_results: Maximum number of results to return.
Returns:

List of matching GeoJSON Feature dicts in WGS84.

def get_by_id(self, feature_id: str) -> dict[str, typing.Any] | None:
452    def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
453        """
454        Get a specific feature by its unique identifier.
455
456        Args:
457            feature_id: Value of the ``id`` column.
458
459        Returns:
460            The matching GeoJSON Feature dict, or ``None`` if not found.
461        """
462        sa = _require_sqlalchemy()
463        cols = self._build_select_columns()
464        sql = sa.text(
465            f"SELECT {cols} FROM {self._table} WHERE {self._id_col} = :id LIMIT 1"  # noqa: S608
466        )
467        with self._get_connection() as conn:
468            try:
469                result = conn.execute(sql, {"id": feature_id})
470                row = result.fetchone()
471                return self._row_to_feature(row) if row else None
472            except Exception:
473                logger.exception("get_by_id failed for %r", feature_id)
474                return None

Get a specific feature by its unique identifier.

Arguments:
  • feature_id: Value of the id column.
Returns:

The matching GeoJSON Feature dict, or None if not found.

def get_available_types(self) -> list[str]:
476    def get_available_types(self) -> list[str]:
477        """
478        Return the distinct ``type`` values present in the table.
479
480        Returns:
481            Sorted list of concrete type strings, or an empty list if the table
482            has no type column.
483        """
484        if self._type_col is None:
485            return []
486        sa = _require_sqlalchemy()
487        sql = sa.text(
488            f"SELECT DISTINCT {self._type_col} AS type FROM {self._table} "  # noqa: S608
489            f"WHERE {self._type_col} IS NOT NULL ORDER BY 1"
490        )
491        with self._get_connection() as conn:
492            try:
493                result = conn.execute(sql)
494                raw_types = [row.type for row in result]
495            except Exception:
496                logger.exception("get_available_types failed")
497                return []
498
499        normalized = {self._normalize_type(t) for t in raw_types if t}
500        return sorted(t for t in normalized if t)

Return the distinct type values present in the table.

Returns:

Sorted list of concrete type strings, or an empty list if the table has no type column.

def apply_spatial_relation( geometry: dict[str, typing.Any], relation: SpatialRelation, buffer_config: BufferConfig | None = None, spatial_config: SpatialRelationConfig | None = None, geometry_format: Literal['geojson', 'wkt', 'wkb'] = 'geojson') -> dict[str, typing.Any] | str:
26def apply_spatial_relation(
27    geometry: dict[str, Any],
28    relation: SpatialRelation,
29    buffer_config: BufferConfig | None = None,
30    spatial_config: SpatialRelationConfig | None = None,
31    geometry_format: GeometryFormat = "geojson",
32) -> dict[str, Any] | str:
33    """
34    Transform a reference geometry according to a spatial relation.
35
36    Converts the input GeoJSON geometry to a search area based on the
37    spatial relation category:
38    - Containment: returns the original geometry unchanged
39    - Buffer: applies positive (expand), negative (erode), or ring buffer
40    - Directional: creates an angular sector wedge
41
42    Args:
43        geometry: GeoJSON geometry dict in WGS84 (EPSG:4326).
44        relation: Spatial relation to apply.
45        buffer_config: Buffer configuration (required for buffer/directional relations).
46        spatial_config: Spatial relation registry used to look up directional angles.
47            Defaults to the module-level singleton; pass an explicit instance to
48            avoid repeated construction when calling from a hot path.
49        geometry_format: Output format for the geometry. "geojson" (default) returns a
50            GeoJSON dict, "wkt" returns a WKT string, "wkb" returns a hex-encoded WKB string.
51
52    Returns:
53        Transformed geometry in the requested format (GeoJSON dict, WKT string, or WKB hex string).
54
55    Raises:
56        ValueError: If buffer_config is missing for buffer/directional relations,
57                     or if the relation category is unknown.
58
59    Examples:
60        >>> from etter.models import SpatialRelation, BufferConfig
61        >>> # Circular buffer as GeoJSON (default)
62        >>> result = apply_spatial_relation(
63        ...     geometry={"type": "Point", "coordinates": [6.63, 46.52]},
64        ...     relation=SpatialRelation(relation="near", category="buffer"),
65        ...     buffer_config=BufferConfig(distance_m=5000, buffer_from="center"),
66        ... )
67
68        >>> # Same buffer as WKT
69        >>> result = apply_spatial_relation(
70        ...     geometry={"type": "Point", "coordinates": [6.63, 46.52]},
71        ...     relation=SpatialRelation(relation="near", category="buffer"),
72        ...     buffer_config=BufferConfig(distance_m=5000, buffer_from="center"),
73        ...     geometry_format="wkt",
74        ... )
75
76        >>> # Containment (passthrough)
77        >>> result = apply_spatial_relation(
78        ...     geometry=city_polygon,
79        ...     relation=SpatialRelation(relation="in", category="containment"),
80        ... )
81    """
82    if relation.category == "containment":
83        result = _apply_containment(geometry)
84    elif relation.category == "buffer":
85        if buffer_config is None:
86            raise ValueError(f"Buffer relation '{relation.relation}' requires buffer_config")
87        result = _apply_buffer(geometry, buffer_config)
88    elif relation.category == "directional":
89        if buffer_config is None:
90            raise ValueError(f"Directional relation '{relation.relation}' requires buffer_config")
91        cfg = spatial_config if spatial_config is not None else _DEFAULT_SPATIAL_CONFIG
92        relation_config = cfg.get_config(relation.relation)
93        direction = relation_config.direction_angle_degrees or 0
94        sector_angle = relation_config.sector_angle_degrees or 90
95        result = _apply_directional(geometry, buffer_config, direction, sector_angle)
96    else:
97        raise ValueError(f"Unknown relation category: '{relation.category}'")
98
99    return convert_geometry(result, geometry_format)

Transform a reference geometry according to a spatial relation.

Converts the input GeoJSON geometry to a search area based on the spatial relation category:

  • Containment: returns the original geometry unchanged
  • Buffer: applies positive (expand), negative (erode), or ring buffer
  • Directional: creates an angular sector wedge
Arguments:
  • geometry: GeoJSON geometry dict in WGS84 (EPSG:4326).
  • relation: Spatial relation to apply.
  • buffer_config: Buffer configuration (required for buffer/directional relations).
  • spatial_config: Spatial relation registry used to look up directional angles. Defaults to the module-level singleton; pass an explicit instance to avoid repeated construction when calling from a hot path.
  • geometry_format: Output format for the geometry. "geojson" (default) returns a GeoJSON dict, "wkt" returns a WKT string, "wkb" returns a hex-encoded WKB string.
Returns:

Transformed geometry in the requested format (GeoJSON dict, WKT string, or WKB hex string).

Raises:
  • ValueError: If buffer_config is missing for buffer/directional relations, or if the relation category is unknown.
Examples:
>>> from etter.models import SpatialRelation, BufferConfig
>>> # Circular buffer as GeoJSON (default)
>>> result = apply_spatial_relation(
...     geometry={"type": "Point", "coordinates": [6.63, 46.52]},
...     relation=SpatialRelation(relation="near", category="buffer"),
...     buffer_config=BufferConfig(distance_m=5000, buffer_from="center"),
... )
>>> # Same buffer as WKT
>>> result = apply_spatial_relation(
...     geometry={"type": "Point", "coordinates": [6.63, 46.52]},
...     relation=SpatialRelation(relation="near", category="buffer"),
...     buffer_config=BufferConfig(distance_m=5000, buffer_from="center"),
...     geometry_format="wkt",
... )
>>> # Containment (passthrough)
>>> result = apply_spatial_relation(
...     geometry=city_polygon,
...     relation=SpatialRelation(relation="in", category="containment"),
... )
def convert_geometry( geometry: dict[str, typing.Any], fmt: Literal['geojson', 'wkt', 'wkb']) -> dict[str, typing.Any] | str:
13def convert_geometry(geometry: dict[str, Any], fmt: GeometryFormat) -> dict[str, Any] | str:
14    """
15    Convert a GeoJSON geometry dict to the requested format.
16
17    Args:
18        geometry: GeoJSON geometry dict (e.g. {"type": "Point", "coordinates": [...]})
19        fmt: Target format — "geojson" returns the dict unchanged, "wkt" returns a WKT string,
20             "wkb" returns a hex-encoded WKB string.
21
22    Returns:
23        The geometry in the requested format.
24    """
25    if fmt == "geojson":
26        return geometry
27    geom = shape(geometry)
28    if fmt == "wkt":
29        return geom.wkt
30    return geom.wkb_hex

Convert a GeoJSON geometry dict to the requested format.

Arguments:
  • geometry: GeoJSON geometry dict (e.g. {"type": "Point", "coordinates": [...]})
  • fmt: Target format — "geojson" returns the dict unchanged, "wkt" returns a WKT string, "wkb" returns a hex-encoded WKB string.
Returns:

The geometry in the requested format.

def convert_feature_geometry( feature: dict[str, typing.Any], fmt: Literal['geojson', 'wkt', 'wkb']) -> dict[str, typing.Any]:
33def convert_feature_geometry(feature: dict[str, Any], fmt: GeometryFormat) -> dict[str, Any]:
34    """
35    Return a copy of a GeoJSON Feature dict with its geometry converted to the requested format.
36
37    Args:
38        feature: GeoJSON Feature dict with a "geometry" key.
39        fmt: Target geometry format.
40
41    Returns:
42        A new dict identical to the input except the "geometry" value is converted.
43    """
44    if fmt == "geojson":
45        return feature
46    return {**feature, "geometry": convert_geometry(feature["geometry"], fmt)}

Return a copy of a GeoJSON Feature dict with its geometry converted to the requested format.

Arguments:
  • feature: GeoJSON Feature dict with a "geometry" key.
  • fmt: Target geometry format.
Returns:

A new dict identical to the input except the "geometry" value is converted.