etter

etter - Natural Language Geographic Query Parsing

Parse location queries into structured geographic queries using LLM.

"""
etter - Natural Language Geographic Query Parsing

Parse location queries into structured geographic queries using LLM.
"""
 6
 7# Main API
 8# Exceptions
 9# Datasources
10from .datasources import CompositeDataSource, GeoDataSource, IGNBDCartoSource, PostGISDataSource, SwissNames3DSource
11from .exceptions import (
12    GeoFilterError,
13    LowConfidenceError,
14    LowConfidenceWarning,
15    ParsingError,
16    UnknownRelationError,
17    ValidationError,
18)
19
20# Models (for type hints and result access)
21from .models import (
22    BufferConfig,
23    ConfidenceLevel,
24    ConfidenceScore,
25    GeoQuery,
26    ReferenceLocation,
27    SpatialRelation,
28)
29from .parser import GeoFilterParser
30
31# Spatial operations
32from .spatial import apply_spatial_relation
33
34# Configuration
35from .spatial_config import RelationConfig, SpatialRelationConfig
36
37__all__ = [
38    # Main API
39    "GeoFilterParser",
40    # Models
41    "GeoQuery",
42    "SpatialRelation",
43    "ReferenceLocation",
44    "BufferConfig",
45    "ConfidenceScore",
46    "ConfidenceLevel",
47    # Configuration
48    "SpatialRelationConfig",
49    "RelationConfig",
50    # Exceptions
51    "GeoFilterError",
52    "ParsingError",
53    "ValidationError",
54    "UnknownRelationError",
55    "LowConfidenceError",
56    "LowConfidenceWarning",
57    # Datasources
58    "GeoDataSource",
59    "SwissNames3DSource",
60    "IGNBDCartoSource",
61    "CompositeDataSource",
62    "PostGISDataSource",
63    # Spatial
64    "apply_spatial_relation",
65]
class GeoFilterParser:
    """
    Main entry point for parsing natural language location queries.

    This class orchestrates the entire parsing pipeline:
    1. Initialize LLM with structured output
    2. Build prompt with spatial relations and examples
    3. Parse query through LLM
    4. Validate and enrich with defaults
    5. Return structured GeoQuery

    Examples:
        Basic usage:
        >>> from langchain.chat_models import init_chat_model
        >>> llm = init_chat_model(model="gpt-4o", model_provider="openai", api_key="sk-...")
        >>> parser = GeoFilterParser(llm=llm)
        >>> result = parser.parse("restaurants in Lausanne")
        >>> print(result.reference_location.name)
        'Lausanne'

        With strict confidence mode:
        >>> parser = GeoFilterParser(llm=llm, confidence_threshold=0.8, strict_mode=True)
        >>> result = parser.parse("near the station")  # May raise LowConfidenceError
    """

    def __init__(
        self,
        llm: BaseChatModel,
        spatial_config: SpatialRelationConfig | None = None,
        confidence_threshold: float = 0.6,
        strict_mode: bool = False,
        include_examples: bool = True,
        datasource: GeoDataSource | None = None,
    ):
        """
        Initialize the parser.

        Args:
            llm: LangChain LLM instance (required).
            spatial_config: Spatial relation configuration. If None, uses defaults
            confidence_threshold: Minimum confidence to accept (0-1)
            strict_mode: If True, raise error on low confidence. If False, warn only
            include_examples: Whether to include few-shot examples in prompt
            datasource: Optional GeoDataSource instance. If provided, the LLM will be informed
                       about the concrete types available in that datasource for better type inference.

        Example:
            >>> from langchain.chat_models import init_chat_model
            >>> from etter.datasources.swissnames3d import SwissNames3DSource
            >>> llm = init_chat_model(model="gpt-4o", model_provider="openai", temperature=0)
            >>> datasource = SwissNames3DSource("data/")
            >>> parser = GeoFilterParser(llm=llm, datasource=datasource)
        """
        self.llm = llm

        # Fall back to the default relation registry when none is supplied.
        self.spatial_config = spatial_config or SpatialRelationConfig()

        # Settings
        self.confidence_threshold = confidence_threshold
        self.strict_mode = strict_mode
        self.include_examples = include_examples
        self.datasource = datasource

        # Built once at construction time; reused for every parse call.
        self.structured_llm = self._build_structured_llm()
        self.prompt = self._build_prompt()

    def _build_structured_llm(self):
        """Create LLM with structured output using the GeoQuery Pydantic model."""
        return self.llm.with_structured_output(
            GeoQuery,
            method="function_calling",  # Use function_calling for broader schema support
            include_raw=True,  # For error debugging
        )

    def _build_prompt(self) -> ChatPromptTemplate:
        """Build prompt template with spatial relations, examples, and available types."""
        available_types = None
        if self.datasource is not None:
            # Inform the LLM about the concrete feature types the datasource offers.
            available_types = self.datasource.get_available_types()

        return build_prompt_template(
            spatial_config=self.spatial_config,
            include_examples=self.include_examples,
            available_types=available_types,
        )

    def parse(self, query: str) -> GeoQuery:
        """
        Parse a natural language location query into structured format.

        This is the main method for parsing queries. It:
        1. Invokes the LLM with structured output
        2. Validates the spatial relation is registered
        3. Enriches with default parameters
        4. Checks confidence threshold

        Args:
            query: Natural language query in any language

        Returns:
            GeoQuery: Structured query representation with confidence scores

        Raises:
            ParsingError: If LLM fails to parse query into valid structure
            ValidationError: If parsed query fails business logic validation
            UnknownRelationError: If spatial relation is not registered
            LowConfidenceError: If confidence below threshold (strict mode only)

        Warns:
            LowConfidenceWarning: If confidence below threshold (permissive mode)

        Examples:
            Simple containment query:
            >>> result = parser.parse("in Bern")
            >>> result.reference_location.name
            'Bern'
            >>> result.spatial_relation.relation
            'in'

            Buffer query:
            >>> result = parser.parse("near Lake Geneva")
            >>> result.spatial_relation.relation
            'near'
            >>> result.buffer_config.distance_m
            5000

            Directional query:
            >>> result = parser.parse("north of Lausanne")
            >>> result.spatial_relation.relation
            'north_of'
            >>> result.reference_location.name
            'Lausanne'

            Multilingual:
            >>> result = parser.parse("près de Genève")
            >>> result.spatial_relation.relation
            'near'
            >>> result.reference_location.name
            'Genève'
        """
        # Format prompt with query
        formatted_messages = self.prompt.format_messages(query=query)

        # Invoke LLM with structured output
        try:
            response = self.structured_llm.invoke(formatted_messages)
        except Exception as e:
            raise ParsingError(
                message=f"LLM invocation failed: {str(e)}",
                raw_response="",
                original_error=e,
            ) from e

        # With include_raw=True the LLM wrapper returns a dict holding both
        # the raw message and the parsed model; unwrap accordingly.
        parsed = response.get("parsed") if isinstance(response, dict) else response

        if parsed is None:
            raw = response.get("raw", "") if isinstance(response, dict) else ""
            error = response.get("parsing_error") if isinstance(response, dict) else None
            raise ParsingError(
                message="Failed to parse query into structured format. "
                "LLM may have returned invalid JSON or missed required fields.",
                raw_response=str(raw),
                original_error=error,
            )

        geo_query = parsed
        # Explicit check instead of `assert`: asserts are stripped under -O,
        # and a malformed result should surface as the documented ParsingError.
        if not isinstance(geo_query, GeoQuery):
            raise ParsingError(
                message="Parsed result is not a GeoQuery instance.",
                raw_response=str(geo_query),
                original_error=None,
            )

        # Ensure original_query reflects the exact input the caller supplied.
        if geo_query.original_query != query:
            geo_query.original_query = query

        # Run validation pipeline (relation registration, defaults, confidence).
        geo_query = validate_query(
            geo_query,
            self.spatial_config,
            confidence_threshold=self.confidence_threshold,
            strict_mode=self.strict_mode,
        )

        return geo_query

    async def parse_stream(self, query: str) -> AsyncGenerator[dict, None]:
        """
        Parse a natural language location query with streaming reasoning and results.

        This method provides real-time feedback during the parsing process by yielding
        intermediate reasoning steps and the final GeoQuery result. This is useful for
        providing users with transparency into the LLM's decision-making process and
        for building responsive UIs.

        The stream yields dictionaries with the following event types:
        - {"type": "start"} - Stream started
        - {"type": "reasoning", "content": str} - Intermediate processing steps
        - {"type": "data-response", "content": dict} - Final GeoQuery as JSON
        - {"type": "error", "content": str} - Errors encountered during processing
        - {"type": "finish"} - Stream completed successfully

        Args:
            query: Natural language query in any language

        Yields:
            dict: Stream events with type and optional content fields

        Raises:
            ParsingError: If LLM fails to parse query into valid structure
            ValidationError: If parsed query fails business logic validation
            UnknownRelationError: If spatial relation is not registered
            LowConfidenceError: If confidence below threshold (strict mode only)

        Examples:
            Basic usage with async iteration:
            >>> async for event in parser.parse_stream("restaurants near Lake Geneva"):
            ...     if event["type"] == "reasoning":
            ...         print(f"Reasoning: {event['content']}")
            ...     elif event["type"] == "data-response":
            ...         geo_query = event["content"]
            ...         print(f"Location: {geo_query['reference_location']['name']}")
            ...     elif event["type"] == "error":
            ...         print(f"Error: {event['content']}")

            Using in a FastAPI streaming endpoint:
            >>> from fastapi.responses import StreamingResponse
            >>> @app.get("/stream")
            >>> async def stream_endpoint(q: str):
            ...     async def event_stream():
            ...         async for event in parser.parse_stream(q):
            ...             yield f"data: {json.dumps(event)}\\n\\n"
            ...     return StreamingResponse(event_stream(), media_type="text/event-stream")
        """
        # Tracks whether a specific error event was already emitted, so the
        # outer handler does not yield a duplicate, generic error event.
        error_reported = False
        try:
            # Signal start of stream
            yield {"type": "start"}

            yield {"type": "reasoning", "content": "Preparing query for LLM processing"}
            formatted_messages = self.prompt.format_messages(query=query)

            yield {"type": "reasoning", "content": "Analyzing spatial relationship and location"}
            try:
                response = await self.structured_llm.ainvoke(formatted_messages)
            except Exception as e:
                error_reported = True
                yield {"type": "error", "content": f"LLM invocation failed: {str(e)}"}
                raise ParsingError(
                    message=f"LLM invocation failed: {str(e)}",
                    raw_response="",
                    original_error=e,
                ) from e

            yield {"type": "reasoning", "content": "Parsing LLM response into structured format"}
            # With include_raw=True the wrapper returns a dict; unwrap it.
            parsed = response.get("parsed") if isinstance(response, dict) else response

            if parsed is None:
                raw = response.get("raw", "") if isinstance(response, dict) else ""
                error = response.get("parsing_error") if isinstance(response, dict) else None
                error_reported = True
                yield {"type": "error", "content": "Failed to parse response - invalid JSON or missing fields"}
                raise ParsingError(
                    message="Failed to parse query into structured format. "
                    "LLM may have returned invalid JSON or missed required fields.",
                    raw_response=str(raw),
                    original_error=error,
                )

            geo_query = parsed
            # Explicit check instead of `assert` (stripped under -O).
            if not isinstance(geo_query, GeoQuery):
                raise ParsingError(
                    message="Parsed result is not a GeoQuery instance.",
                    raw_response=str(geo_query),
                    original_error=None,
                )

            # Ensure original_query reflects the exact input the caller supplied.
            if geo_query.original_query != query:
                geo_query.original_query = query

            # Surface the model's own reasoning (when present) as a stream event.
            if geo_query.confidence_breakdown.reasoning:
                yield {
                    "type": "reasoning",
                    "content": f"LLM reasoning: {geo_query.confidence_breakdown.reasoning}",
                }

            yield {"type": "reasoning", "content": "Validating spatial relation configuration"}
            geo_query = validate_query(
                geo_query,
                self.spatial_config,
                confidence_threshold=self.confidence_threshold,
                strict_mode=self.strict_mode,
            )

            yield {"type": "reasoning", "content": "Query parsing completed successfully"}
            yield {"type": "data-response", "content": geo_query.model_dump()}

            # Signal successful completion
            yield {"type": "finish"}

        except Exception as e:
            # Emit an error event before re-raising, unless a more specific
            # one was already emitted above for this same failure.
            if not error_reported:
                yield {"type": "error", "content": f"Error during parsing: {str(e)}"}
            raise

    def parse_batch(self, queries: list[str]) -> list[GeoQuery]:
        """
        Parse multiple queries in batch.

        Note: This is a simple sequential implementation.
        For true parallelization, consider using async methods or ThreadPoolExecutor.

        Args:
            queries: List of natural language queries

        Returns:
            List of GeoQuery objects (same order as input)

        Raises:
            Same exceptions as parse() for any failing query
        """
        return [self.parse(query) for query in queries]

    def get_available_relations(self, category: RelationCategory | None = None) -> list[str]:
        """
        Get list of available spatial relations.

        Args:
            category: Optional filter by category ("containment", "buffer", "directional")

        Returns:
            List of relation names
        """
        return self.spatial_config.list_relations(category=category)

    def describe_relation(self, relation_name: str) -> str:
        """
        Get description of a spatial relation.

        Args:
            relation_name: Name of the relation

        Returns:
            Human-readable description

        Raises:
            UnknownRelationError: If relation is not registered
        """
        config = self.spatial_config.get_config(relation_name)
        return config.description

Main entry point for parsing natural language location queries.

This class orchestrates the entire parsing pipeline:

  1. Initialize LLM with structured output
  2. Build prompt with spatial relations and examples
  3. Parse query through LLM
  4. Validate and enrich with defaults
  5. Return structured GeoQuery
Examples:

Basic usage:

>>> from langchain.chat_models import init_chat_model
>>> llm = init_chat_model(model="gpt-4o", model_provider="openai", api_key="sk-...")
>>> parser = GeoFilterParser(llm=llm)
>>> result = parser.parse("restaurants in Lausanne")
>>> print(result.reference_location.name)
'Lausanne'

With strict confidence mode:

>>> parser = GeoFilterParser(llm=llm, confidence_threshold=0.8, strict_mode=True)
>>> result = parser.parse("near the station")  # May raise LowConfidenceError
GeoFilterParser(llm: langchain_core.language_models.chat_models.BaseChatModel, spatial_config: SpatialRelationConfig | None = None, confidence_threshold: float = 0.6, strict_mode: bool = False, include_examples: bool = True, datasource: GeoDataSource | None = None)
44    def __init__(
45        self,
46        llm: BaseChatModel,
47        spatial_config: SpatialRelationConfig | None = None,
48        confidence_threshold: float = 0.6,
49        strict_mode: bool = False,
50        include_examples: bool = True,
51        datasource: GeoDataSource | None = None,
52    ):
53        """
54        Initialize the parser.
55
56        Args:
57            llm: LangChain LLM instance (required).
58            spatial_config: Spatial relation configuration. If None, uses defaults
59            confidence_threshold: Minimum confidence to accept (0-1)
60            strict_mode: If True, raise error on low confidence. If False, warn only
61            include_examples: Whether to include few-shot examples in prompt
62            datasource: Optional GeoDataSource instance. If provided, the LLM will be informed
63                       about the concrete types available in that datasource for better type inference.
64
65        Example:
66            >>> from langchain.chat_models import init_chat_model
67            >>> from etter.datasources.swissnames3d import SwissNames3DSource
68            >>> llm = init_chat_model(model="gpt-4o", model_provider="openai", temperature=0)
69            >>> datasource = SwissNames3DSource("data/")
70            >>> parser = GeoFilterParser(llm=llm, datasource=datasource)
71        """
72        self.llm = llm
73
74        # Initialize spatial config
75        self.spatial_config = spatial_config or SpatialRelationConfig()
76
77        # Settings
78        self.confidence_threshold = confidence_threshold
79        self.strict_mode = strict_mode
80        self.include_examples = include_examples
81        self.datasource = datasource
82
83        # Build structured LLM
84        self.structured_llm = self._build_structured_llm()
85
86        # Build prompt template
87        self.prompt = self._build_prompt()

Initialize the parser.

Arguments:
  • llm: LangChain LLM instance (required).
  • spatial_config: Spatial relation configuration. If None, uses defaults
  • confidence_threshold: Minimum confidence to accept (0-1)
  • strict_mode: If True, raise error on low confidence. If False, warn only
  • include_examples: Whether to include few-shot examples in prompt
  • datasource: Optional GeoDataSource instance. If provided, the LLM will be informed about the concrete types available in that datasource for better type inference.
Example:
>>> from langchain.chat_models import init_chat_model
>>> from etter.datasources.swissnames3d import SwissNames3DSource
>>> llm = init_chat_model(model="gpt-4o", model_provider="openai", temperature=0)
>>> datasource = SwissNames3DSource("data/")
>>> parser = GeoFilterParser(llm=llm, datasource=datasource)
llm
spatial_config
confidence_threshold
strict_mode
include_examples
datasource
structured_llm
prompt
def parse(self, query: str) -> GeoQuery:
110    def parse(self, query: str) -> GeoQuery:
111        """
112        Parse a natural language location query into structured format.
113
114        This is the main method for parsing queries. It:
115        1. Invokes the LLM with structured output
116        2. Validates the spatial relation is registered
117        3. Enriches with default parameters
118        4. Checks confidence threshold
119
120        Args:
121            query: Natural language query in any language
122
123        Returns:
124            GeoQuery: Structured query representation with confidence scores
125
126        Raises:
127            ParsingError: If LLM fails to parse query into valid structure
128            ValidationError: If parsed query fails business logic validation
129            UnknownRelationError: If spatial relation is not registered
130            LowConfidenceError: If confidence below threshold (strict mode only)
131
132        Warns:
133            LowConfidenceWarning: If confidence below threshold (permissive mode)
134
135        Examples:
136            Simple containment query:
137            >>> result = parser.parse("in Bern")
138            >>> result.reference_location.name
139            'Bern'
140            >>> result.spatial_relation.relation
141            'in'
142
143            Buffer query:
144            >>> result = parser.parse("near Lake Geneva")
145            >>> result.spatial_relation.relation
146            'near'
147            >>> result.buffer_config.distance_m
148            5000
149
150            Directional query:
151            >>> result = parser.parse("north of Lausanne")
152            >>> result.spatial_relation.relation
153            'north_of'
154            >>> result.reference_location.name
155            'Lausanne'
156
157            Multilingual:
158            >>> result = parser.parse("près de Genève")
159            >>> result.spatial_relation.relation
160            'near'
161            >>> result.reference_location.name
162            'Genève'
163        """
164        # Format prompt with query
165        formatted_messages = self.prompt.format_messages(query=query)
166
167        # Invoke LLM with structured output
168        try:
169            response = self.structured_llm.invoke(formatted_messages)
170        except Exception as e:
171            raise ParsingError(
172                message=f"LLM invocation failed: {str(e)}",
173                raw_response="",
174                original_error=e,
175            ) from e
176
177        # Check for parsing errors
178        parsed = response.get("parsed") if isinstance(response, dict) else response
179
180        if parsed is None:
181            raw = response.get("raw", "") if isinstance(response, dict) else ""
182            error = response.get("parsing_error") if isinstance(response, dict) else None
183            raise ParsingError(
184                message="Failed to parse query into structured format. "
185                "LLM may have returned invalid JSON or missed required fields.",
186                raw_response=str(raw),
187                original_error=error,
188            )
189
190        geo_query = parsed
191        assert isinstance(geo_query, GeoQuery), "Parsed result must be GeoQuery"
192
193        # Ensure original_query is set correctly
194        if not geo_query.original_query or geo_query.original_query != query:
195            geo_query.original_query = query
196
197        # Run validation pipeline
198        geo_query = validate_query(
199            geo_query,
200            self.spatial_config,
201            confidence_threshold=self.confidence_threshold,
202            strict_mode=self.strict_mode,
203        )
204
205        return geo_query

Parse a natural language location query into structured format.

This is the main method for parsing queries. It:

  1. Invokes the LLM with structured output
  2. Validates the spatial relation is registered
  3. Enriches with default parameters
  4. Checks confidence threshold
Arguments:
  • query: Natural language query in any language
Returns:

GeoQuery: Structured query representation with confidence scores

Raises:
  • ParsingError: If LLM fails to parse query into valid structure
  • ValidationError: If parsed query fails business logic validation
  • UnknownRelationError: If spatial relation is not registered
  • LowConfidenceError: If confidence below threshold (strict mode only)
Warns:

LowConfidenceWarning: If confidence below threshold (permissive mode)

Examples:

Simple containment query:

>>> result = parser.parse("in Bern")
>>> result.reference_location.name
'Bern'
>>> result.spatial_relation.relation
'in'

Buffer query:

>>> result = parser.parse("near Lake Geneva")
>>> result.spatial_relation.relation
'near'
>>> result.buffer_config.distance_m
5000

Directional query:

>>> result = parser.parse("north of Lausanne")
>>> result.spatial_relation.relation
'north_of'
>>> result.reference_location.name
'Lausanne'

Multilingual:

>>> result = parser.parse("près de Genève")
>>> result.spatial_relation.relation
'near'
>>> result.reference_location.name
'Genève'
async def parse_stream(self, query: str) -> AsyncGenerator[dict]:
207    async def parse_stream(self, query: str) -> AsyncGenerator[dict]:
208        """
209        Parse a natural language location query with streaming reasoning and results.
210
211        This method provides real-time feedback during the parsing process by yielding
212        intermediate reasoning steps and the final GeoQuery result. This is useful for
213        providing users with transparency into the LLM's decision-making process and
214        for building responsive UIs.
215
216        The stream yields dictionaries with the following event types:
217        - {"type": "start"} - Stream started
218        - {"type": "reasoning", "content": str} - Intermediate processing steps
219        - {"type": "data-response", "content": dict} - Final GeoQuery as JSON
220        - {"type": "error", "content": str} - Errors encountered during processing
221        - {"type": "finish"} - Stream completed successfully
222
223        Args:
224            query: Natural language query in any language
225
226        Yields:
227            dict: Stream events with type and optional content fields
228
229        Raises:
230            ParsingError: If LLM fails to parse query into valid structure
231            ValidationError: If parsed query fails business logic validation
232            UnknownRelationError: If spatial relation is not registered
233            LowConfidenceError: If confidence below threshold (strict mode only)
234
235        Examples:
236            Basic usage with async iteration:
237            >>> async for event in parser.parse_stream("restaurants near Lake Geneva"):
238            ...     if event["type"] == "reasoning":
239            ...         print(f"Reasoning: {event['content']}")
240            ...     elif event["type"] == "data-response":
241            ...         geo_query = event["content"]
242            ...         print(f"Location: {geo_query['reference_location']['name']}")
243            ...     elif event["type"] == "error":
244            ...         print(f"Error: {event['content']}")
245
246            Using in a FastAPI streaming endpoint:
247            >>> from fastapi.responses import StreamingResponse
248            >>> @app.get("/stream")
249            >>> async def stream_endpoint(q: str):
250            ...     async def event_stream():
251            ...         async for event in parser.parse_stream(q):
252            ...             yield f"data: {json.dumps(event)}\\n\\n"
253            ...     return StreamingResponse(event_stream(), media_type="text/event-stream")
254        """
255        try:
256            # Signal start of stream
257            yield {"type": "start"}
258
259            yield {"type": "reasoning", "content": "Preparing query for LLM processing"}
260            formatted_messages = self.prompt.format_messages(query=query)
261
262            yield {"type": "reasoning", "content": "Analyzing spatial relationship and location"}
263            try:
264                response = await self.structured_llm.ainvoke(formatted_messages)
265            except Exception as e:
266                yield {"type": "error", "content": f"LLM invocation failed: {str(e)}"}
267                raise ParsingError(
268                    message=f"LLM invocation failed: {str(e)}",
269                    raw_response="",
270                    original_error=e,
271                ) from e
272
273            yield {"type": "reasoning", "content": "Parsing LLM response into structured format"}
274            parsed = response.get("parsed") if isinstance(response, dict) else response
275
276            if parsed is None:
277                raw = response.get("raw", "") if isinstance(response, dict) else ""
278                error = response.get("parsing_error") if isinstance(response, dict) else None
279                yield {"type": "error", "content": "Failed to parse response - invalid JSON or missing fields"}
280                raise ParsingError(
281                    message="Failed to parse query into structured format. "
282                    "LLM may have returned invalid JSON or missed required fields.",
283                    raw_response=str(raw),
284                    original_error=error,
285                )
286
287            geo_query = parsed
288            assert isinstance(geo_query, GeoQuery), "Parsed result must be GeoQuery"
289
290            # Ensure original_query is set correctly
291            if not geo_query.original_query or geo_query.original_query != query:
292                geo_query.original_query = query
293
294            if geo_query.confidence_breakdown.reasoning:
295                yield {
296                    "type": "reasoning",
297                    "content": f"LLM reasoning: {geo_query.confidence_breakdown.reasoning}",
298                }
299
300            yield {"type": "reasoning", "content": "Validating spatial relation configuration"}
301            geo_query = validate_query(
302                geo_query,
303                self.spatial_config,
304                confidence_threshold=self.confidence_threshold,
305                strict_mode=self.strict_mode,
306            )
307
308            yield {"type": "reasoning", "content": "Query parsing completed successfully"}
309            yield {"type": "data-response", "content": geo_query.model_dump()}
310
311            # Signal successful completion
312            yield {"type": "finish"}
313
314        except Exception as e:
315            # Emit error event before re-raising
316            yield {"type": "error", "content": f"Error during parsing: {str(e)}"}
317            raise

Parse a natural language location query with streaming reasoning and results.

This method provides real-time feedback during the parsing process by yielding intermediate reasoning steps and the final GeoQuery result. This is useful for providing users with transparency into the LLM's decision-making process and for building responsive UIs.

The stream yields dictionaries with the following event types:

  • {"type": "start"} - Stream started
  • {"type": "reasoning", "content": str} - Intermediate processing steps
  • {"type": "data-response", "content": dict} - Final GeoQuery as JSON
  • {"type": "error", "content": str} - Errors encountered during processing
  • {"type": "finish"} - Stream completed successfully
Arguments:
  • query: Natural language query in any language
Yields:

dict: Stream events with type and optional content fields

Raises:
  • ParsingError: If LLM fails to parse query into valid structure
  • ValidationError: If parsed query fails business logic validation
  • UnknownRelationError: If spatial relation is not registered
  • LowConfidenceError: If confidence below threshold (strict mode only)
Examples:

Basic usage with async iteration:

>>> async for event in parser.parse_stream("restaurants near Lake Geneva"):
...     if event["type"] == "reasoning":
...         print(f"Reasoning: {event['content']}")
...     elif event["type"] == "data-response":
...         geo_query = event["content"]
...         print(f"Location: {geo_query['reference_location']['name']}")
...     elif event["type"] == "error":
...         print(f"Error: {event['content']}")

Using in a FastAPI streaming endpoint:

>>> from fastapi.responses import StreamingResponse
>>> @app.get("/stream")
>>> async def stream_endpoint(q: str):
...     async def event_stream():
...         async for event in parser.parse_stream(q):
...             yield f"data: {json.dumps(event)}\n\n"
...     return StreamingResponse(event_stream(), media_type="text/event-stream")
def parse_batch(self, queries: list[str]) -> list[GeoQuery]:
319    def parse_batch(self, queries: list[str]) -> list[GeoQuery]:
320        """
321        Parse multiple queries in batch.
322
323        Note: This is a simple sequential implementation.
324        For true parallelization, consider using async methods or ThreadPoolExecutor.
325
326        Args:
327            queries: List of natural language queries
328
329        Returns:
330            List of GeoQuery objects (same order as input)
331
332        Raises:
333            Same exceptions as parse() for any failing query
334        """
335        return [self.parse(query) for query in queries]

Parse multiple queries in batch.

Note: This is a simple sequential implementation. For true parallelization, consider using async methods or ThreadPoolExecutor.

Arguments:
  • queries: List of natural language queries
Returns:

List of GeoQuery objects (same order as input)

Raises:
  • Same exceptions as parse() for any failing query
def get_available_relations(self, category: RelationCategory | None = None) -> list[str]:
337    def get_available_relations(self, category: RelationCategory | None = None) -> list[str]:
338        """
339        Get list of available spatial relations.
340
341        Args:
342            category: Optional filter by category ("containment", "buffer", "directional")
343
344        Returns:
345            List of relation names
346        """
347        return self.spatial_config.list_relations(category=category)

Get list of available spatial relations.

Arguments:
  • category: Optional filter by category ("containment", "buffer", "directional")
Returns:

List of relation names

def describe_relation(self, relation_name: str) -> str:
349    def describe_relation(self, relation_name: str) -> str:
350        """
351        Get description of a spatial relation.
352
353        Args:
354            relation_name: Name of the relation
355
356        Returns:
357            Human-readable description
358
359        Raises:
360            UnknownRelationError: If relation is not registered
361        """
362        config = self.spatial_config.get_config(relation_name)
363        return config.description

Get description of a spatial relation.

Arguments:
  • relation_name: Name of the relation
Returns:

Human-readable description

Raises:
  • UnknownRelationError: If relation is not registered
class GeoQuery(pydantic.main.BaseModel):
117class GeoQuery(BaseModel):
118    """
119    Root model representing a parsed geographic query.
120    This is the main output structure returned by the parser.
121    """
122
123    query_type: Literal["simple", "compound", "split", "boolean"] = Field(
124        "simple",
125        description="Type of query. Phase 1 only supports 'simple'. "
126        "Future: 'compound' = multi-step, 'split' = area division, 'boolean' = AND/OR/NOT operations",
127    )
128    spatial_relation: SpatialRelation = Field(description="Spatial relationship to reference location")
129    reference_location: ReferenceLocation = Field(description="Reference location for the spatial query")
130    buffer_config: BufferConfig | None = Field(
131        None,
132        description="Buffer configuration for buffer and directional relations. "
133        "Auto-generated with defaults by enrich_with_defaults() if not provided. "
134        "Required for 'near', 'around', 'north_of', etc. "
135        "Set to None for containment relations ('in').",
136    )
137    confidence_breakdown: ConfidenceScore = Field(description="Confidence scores for different aspects of the parse")
138    original_query: str = Field(description="Original query text exactly as provided by the user")
139
140    @model_validator(mode="after")
141    def validate_buffer_config_consistency(self) -> "GeoQuery":
142        """Validate buffer_config consistency with relation category."""
143        # Buffer and directional relations must have buffer_config
144        if self.spatial_relation.category in ("buffer", "directional") and self.buffer_config is None:
145            raise ValueError(
146                f"{self.spatial_relation.category} relation '{self.spatial_relation.relation}' requires buffer_config"
147            )
148
149        # Containment relations should not have buffer_config
150        if self.spatial_relation.category == "containment" and self.buffer_config is not None:
151            raise ValueError(
152                f"{self.spatial_relation.category} relation '{self.spatial_relation.relation}' "
153                f"should not have buffer_config"
154            )
155
156        return self

Root model representing a parsed geographic query. This is the main output structure returned by the parser.

query_type: Literal['simple', 'compound', 'split', 'boolean'] = 'simple'

Type of query. Phase 1 only supports 'simple'. Future: 'compound' = multi-step, 'split' = area division, 'boolean' = AND/OR/NOT operations

spatial_relation: SpatialRelation = PydanticUndefined

Spatial relationship to reference location

reference_location: ReferenceLocation = PydanticUndefined

Reference location for the spatial query

buffer_config: BufferConfig | None = None

Buffer configuration for buffer and directional relations. Auto-generated with defaults by enrich_with_defaults() if not provided. Required for 'near', 'around', 'north_of', etc. Set to None for containment relations ('in').

confidence_breakdown: ConfidenceScore = PydanticUndefined

Confidence scores for different aspects of the parse

original_query: str = PydanticUndefined

Original query text exactly as provided by the user

@model_validator(mode='after')
def validate_buffer_config_consistency(self) -> GeoQuery:
140    @model_validator(mode="after")
141    def validate_buffer_config_consistency(self) -> "GeoQuery":
142        """Validate buffer_config consistency with relation category."""
143        # Buffer and directional relations must have buffer_config
144        if self.spatial_relation.category in ("buffer", "directional") and self.buffer_config is None:
145            raise ValueError(
146                f"{self.spatial_relation.category} relation '{self.spatial_relation.relation}' requires buffer_config"
147            )
148
149        # Containment relations should not have buffer_config
150        if self.spatial_relation.category == "containment" and self.buffer_config is not None:
151            raise ValueError(
152                f"{self.spatial_relation.category} relation '{self.spatial_relation.relation}' "
153                f"should not have buffer_config"
154            )
155
156        return self

Validate buffer_config consistency with relation category.

class SpatialRelation(pydantic.main.BaseModel):
 96class SpatialRelation(BaseModel):
 97    """A spatial relationship between target and reference."""
 98
 99    relation: str = Field(
100        description="Spatial relation keyword. Examples: 'in', 'near', 'around', 'north_of', "
101        "'on_shores_of', 'in_the_heart_of', etc. Use the exact relation name from the available list."
102    )
103    category: RelationCategory = Field(
104        description="Category of spatial relation. "
105        "'containment' = exact boundary matching (in), "
106        "'buffer' = proximity or erosion operations (near, around, on_shores_of, in_the_heart_of), "
107        "'directional' = sector-based queries (north_of, south_of, east_of, west_of)"
108    )
109    explicit_distance: float | None = Field(
110        None,
111        description="Distance in meters if explicitly mentioned by user. "
112        "For example: 'within 5km' → 5000, 'within 500 meters' → 500. "
113        "Leave null if not explicitly stated.",
114    )

A spatial relationship between target and reference.

relation: str = PydanticUndefined

Spatial relation keyword. Examples: 'in', 'near', 'around', 'north_of', 'on_shores_of', 'in_the_heart_of', etc. Use the exact relation name from the available list.

category: RelationCategory = PydanticUndefined

Category of spatial relation. 'containment' = exact boundary matching (in), 'buffer' = proximity or erosion operations (near, around, on_shores_of, in_the_heart_of), 'directional' = sector-based queries (north_of, south_of, east_of, west_of)

explicit_distance: float | None = None

Distance in meters if explicitly mentioned by user. For example: 'within 5km' → 5000, 'within 500 meters' → 500. Leave null if not explicitly stated.

class ReferenceLocation(pydantic.main.BaseModel):
36class ReferenceLocation(BaseModel):
37    """A geographic reference location extracted from the query."""
38
39    name: str = Field(description="Location name as mentioned in the query (e.g., 'Lausanne', 'Lake Geneva')")
40    # FIXME: enum ?
41    type: str | None = Field(
42        None,
43        description="Type hint for geographic feature (city, lake, mountain, canton, country, "
44        "train_station, airport, river, road, etc.). This is a HINT for ranking results, "
45        "NOT a strict filter. For ambiguous cases (e.g., 'Bern' could be city or canton, "
46        "'Rhone' could be river or road), provide your best guess or leave null. "
47        "The datasource will return multiple types ranked by relevance.",
48    )
49    type_confidence: ConfidenceLevel | None = Field(
50        None,
51        description="Confidence in the type inference (0-1). High confidence (>0.8) when type is "
52        "explicit in query (e.g., 'Lake Geneva'). Low confidence (<0.6) when ambiguous "
53        "(e.g., 'Bern', 'Rhone'). Use spatial relation as hint: 'along X' → river/road, "
54        "'in X' → city/region, 'on X' → lake/mountain.",
55    )

A geographic reference location extracted from the query.

name: str = PydanticUndefined

Location name as mentioned in the query (e.g., 'Lausanne', 'Lake Geneva')

type: str | None = None

Type hint for geographic feature (city, lake, mountain, canton, country, train_station, airport, river, road, etc.). This is a HINT for ranking results, NOT a strict filter. For ambiguous cases (e.g., 'Bern' could be city or canton, 'Rhone' could be river or road), provide your best guess or leave null. The datasource will return multiple types ranked by relevance.

type_confidence: ConfidenceLevel | None = None

Confidence in the type inference (0-1). High confidence (>0.8) when type is explicit in query (e.g., 'Lake Geneva'). Low confidence (<0.6) when ambiguous (e.g., 'Bern', 'Rhone'). Use spatial relation as hint: 'along X' → river/road, 'in X' → city/region, 'on X' → lake/mountain.

class BufferConfig(pydantic.main.BaseModel):
58class BufferConfig(BaseModel):
59    """Configuration for buffer-based spatial operations."""
60
61    distance_m: float = Field(
62        description="Buffer distance in meters. Positive values expand outward (proximity), "
63        "negative values erode inward (e.g., 'in the heart of'). "
64        "Examples: 5000 = 5km radius, -500 = 500m erosion"
65    )
66    buffer_from: Literal["center", "boundary"] = Field(
67        description="Buffer origin. 'center' = buffer from centroid point (for proximity), "
68        "'boundary' = buffer from polygon boundary (for shores, along roads, erosion)"
69    )
70    ring_only: bool = Field(
71        False,
72        description="If True, exclude the reference feature itself to create a ring/donut shape. "
73        "Used for queries like 'on the shores of Lake X' (exclude the lake water itself). "
74        "Only valid with buffer_from='boundary'.",
75    )
76    side: Literal["left", "right"] | None = Field(
77        None,
78        description="Side of a linear feature for one-sided buffer. "
79        "'left' = left side relative to line direction, 'right' = right side. "
80        "None = both sides (symmetric buffer). Populated from relation config by enrich_with_defaults().",
81    )
82    inferred: bool = Field(
83        True,
84        description="True if this configuration was inferred from relation defaults. "
85        "False if the user explicitly specified distance or buffer parameters.",
86    )
87
88    @model_validator(mode="after")
89    def validate_ring_only(self) -> "BufferConfig":
90        """Validate that ring_only is only used with boundary buffers."""
91        if self.ring_only and self.buffer_from == "center":
92            raise ValueError("ring_only=True requires buffer_from='boundary' (cannot create ring from center point)")
93        return self

Configuration for buffer-based spatial operations.

distance_m: float = PydanticUndefined

Buffer distance in meters. Positive values expand outward (proximity), negative values erode inward (e.g., 'in the heart of'). Examples: 5000 = 5km radius, -500 = 500m erosion

buffer_from: Literal['center', 'boundary'] = PydanticUndefined

Buffer origin. 'center' = buffer from centroid point (for proximity), 'boundary' = buffer from polygon boundary (for shores, along roads, erosion)

ring_only: bool = False

If True, exclude the reference feature itself to create a ring/donut shape. Used for queries like 'on the shores of Lake X' (exclude the lake water itself). Only valid with buffer_from='boundary'.

side: Optional[Literal['left', 'right']] = None

Side of a linear feature for one-sided buffer. 'left' = left side relative to line direction, 'right' = right side. None = both sides (symmetric buffer). Populated from relation config by enrich_with_defaults().

inferred: bool = True

True if this configuration was inferred from relation defaults. False if the user explicitly specified distance or buffer parameters.

@model_validator(mode='after')
def validate_ring_only(self) -> BufferConfig:
88    @model_validator(mode="after")
89    def validate_ring_only(self) -> "BufferConfig":
90        """Validate that ring_only is only used with boundary buffers."""
91        if self.ring_only and self.buffer_from == "center":
92            raise ValueError("ring_only=True requires buffer_from='boundary' (cannot create ring from center point)")
93        return self

Validate that ring_only is only used with boundary buffers.

class ConfidenceScore(pydantic.main.BaseModel):
16class ConfidenceScore(BaseModel):
17    """Confidence scores for different aspects of the parsed query."""
18
19    overall: ConfidenceLevel = Field(
20        description="Overall confidence score for the entire query parse. "
21        "0.9-1.0 = highly confident, 0.7-0.9 = confident, 0.5-0.7 = uncertain, <0.5 = very uncertain",
22    )
23    location_confidence: ConfidenceLevel = Field(
24        description="Confidence in correctly identifying the reference location",
25    )
26    relation_confidence: ConfidenceLevel = Field(
27        description="Confidence in correctly identifying the spatial relation",
28    )
29    reasoning: str | None = Field(
30        None,
31        description="Explanation for confidence scores. Always include reasoning for clarity and debugging. "
32        "For example: 'Ambiguous location name', 'Unclear spatial relationship', 'High confidence in location matching', etc.",
33    )

Confidence scores for different aspects of the parsed query.

overall: ConfidenceLevel = PydanticUndefined

Overall confidence score for the entire query parse. 0.9-1.0 = highly confident, 0.7-0.9 = confident, 0.5-0.7 = uncertain, <0.5 = very uncertain

location_confidence: ConfidenceLevel = PydanticUndefined

Confidence in correctly identifying the reference location

relation_confidence: ConfidenceLevel = PydanticUndefined

Confidence in correctly identifying the spatial relation

reasoning: str | None = None

Explanation for confidence scores. Always include reasoning for clarity and debugging. For example: 'Ambiguous location name', 'Unclear spatial relationship', 'High confidence in location matching', etc.

ConfidenceLevel = Annotated[float, Field(ge=0.0, le=1.0, description='Confidence score between 0 and 1')]
class SpatialRelationConfig:
 40class SpatialRelationConfig:
 41    """
 42    Registry and configuration for spatial relations.
 43
 44    Manages built-in and custom spatial relations with their default parameters.
 45    """
 46
 47    def __init__(self):
 48        """Initialize with built-in spatial relations."""
 49        self.relations: dict[str, RelationConfig] = {}
 50        self._initialize_defaults()
 51
 52    def _initialize_defaults(self):
 53        """Register built-in spatial relations from ARCHITECTURE.md."""
 54
 55        # ===== CONTAINMENT RELATIONS =====
 56        self.register_relation(
 57            RelationConfig(
 58                name="in",
 59                category="containment",
 60                description="Feature is within the reference boundary",
 61            )
 62        )
 63
 64        # ===== BUFFER/PROXIMITY RELATIONS =====
 65        self.register_relation(
 66            RelationConfig(
 67                name="near",
 68                category="buffer",
 69                description="Proximity search with default 5km radius",
 70                default_distance_m=5000,
 71                buffer_from="center",
 72            )
 73        )
 74
 75        self.register_relation(
 76            RelationConfig(
 77                name="on_shores_of",
 78                category="buffer",
 79                description="Ring buffer around lake/water boundary, excluding the water body itself",
 80                default_distance_m=1000,
 81                buffer_from="boundary",
 82                ring_only=True,
 83            )
 84        )
 85
 86        self.register_relation(
 87            RelationConfig(
 88                name="along",
 89                category="buffer",
 90                description="Buffer following a linear feature like a river or road",
 91                default_distance_m=500,
 92                buffer_from="boundary",
 93            )
 94        )
 95
 96        self.register_relation(
 97            RelationConfig(
 98                name="left_bank",
 99                category="buffer",
100                description="Left bank of a linear feature (river, road) relative to its direction/flow",
101                default_distance_m=500,
102                buffer_from="boundary",
103                side="left",
104            )
105        )
106
107        self.register_relation(
108            RelationConfig(
109                name="right_bank",
110                category="buffer",
111                description="Right bank of a linear feature (river, road) relative to its direction/flow",
112                default_distance_m=500,
113                buffer_from="boundary",
114                side="right",
115            )
116        )
117
118        self.register_relation(
119            RelationConfig(
120                name="in_the_heart_of",
121                category="buffer",
122                description="Central area excluding periphery (negative buffer - erosion)",
123                default_distance_m=-500,
124                buffer_from="boundary",
125            )
126        )
127
128        # ===== DIRECTIONAL RELATIONS =====
129        # All directional relations use consistent defaults:
130        # - Distance: 10km radius (default_distance_m=10000)
131        # - Sector: 90° angular wedge (sector_angle_degrees=90)
132        # - Origin: Centroid of reference location (buffer_from="center" set in enrich_with_defaults)
133        # These defaults are applied automatically by enrich_with_defaults() for any directional query.
134        # Convention: 0° = North, angles increase clockwise (90° = East, 180° = South, 270° = West)
135        self.register_relation(
136            RelationConfig(
137                name="north_of",
138                category="directional",
139                description="Directional sector north of reference",
140                default_distance_m=10000,
141                sector_angle_degrees=90,
142                direction_angle_degrees=0,
143            )
144        )
145
146        self.register_relation(
147            RelationConfig(
148                name="south_of",
149                category="directional",
150                description="Directional sector south of reference",
151                default_distance_m=10000,
152                sector_angle_degrees=90,
153                direction_angle_degrees=180,
154            )
155        )
156
157        self.register_relation(
158            RelationConfig(
159                name="east_of",
160                category="directional",
161                description="Directional sector east of reference",
162                default_distance_m=10000,
163                sector_angle_degrees=90,
164                direction_angle_degrees=90,
165            )
166        )
167
168        self.register_relation(
169            RelationConfig(
170                name="west_of",
171                category="directional",
172                description="Directional sector west of reference",
173                default_distance_m=10000,
174                sector_angle_degrees=90,
175                direction_angle_degrees=270,
176            )
177        )
178
179        # ===== DIAGONAL DIRECTIONAL RELATIONS =====
180        self.register_relation(
181            RelationConfig(
182                name="northeast_of",
183                category="directional",
184                description="Directional sector northeast of reference",
185                default_distance_m=10000,
186                sector_angle_degrees=90,
187                direction_angle_degrees=45,
188            )
189        )
190
191        self.register_relation(
192            RelationConfig(
193                name="southeast_of",
194                category="directional",
195                description="Directional sector southeast of reference",
196                default_distance_m=10000,
197                sector_angle_degrees=90,
198                direction_angle_degrees=135,
199            )
200        )
201
202        self.register_relation(
203            RelationConfig(
204                name="southwest_of",
205                category="directional",
206                description="Directional sector southwest of reference",
207                default_distance_m=10000,
208                sector_angle_degrees=90,
209                direction_angle_degrees=225,
210            )
211        )
212
213        self.register_relation(
214            RelationConfig(
215                name="northwest_of",
216                category="directional",
217                description="Directional sector northwest of reference",
218                default_distance_m=10000,
219                sector_angle_degrees=90,
220                direction_angle_degrees=315,
221            )
222        )
223
224    def register_relation(self, config: RelationConfig) -> None:
225        """Register a new spatial relation."""
226        self.relations[config.name] = config
227
228    def has_relation(self, name: str) -> bool:
229        """Check if a relation is registered."""
230        return name in self.relations
231
232    def get_config(self, name: str) -> RelationConfig:
233        """Get configuration for a relation. Raises UnknownRelationError if not found."""
234        if not self.has_relation(name):
235            raise UnknownRelationError(
236                f"Unknown spatial relation: '{name}'. Available relations: {', '.join(sorted(self.relations.keys()))}",
237                relation_name=name,
238            )
239        return self.relations[name]
240
241    def list_relations(self, category: RelationCategory | None = None) -> list[str]:
242        """List available relation names."""
243        if category is None:
244            return sorted(self.relations.keys())
245        return sorted(r.name for r in self.relations.values() if r.category == category)
246
247    def format_for_prompt(self) -> str:
248        """Format relations for inclusion in LLM prompt."""
249        lines = []
250
251        # Group by category
252        for category in get_args(RelationCategory):
253            category_relations = [r for r in self.relations.values() if r.category == category]
254            if not category_relations:
255                continue
256
257            lines.append(f"\n{category.upper()} RELATIONS:")
258
259            for rel in sorted(category_relations, key=lambda r: r.name):
260                # Build distance info
261                dist_info = ""
262                if rel.default_distance_m is not None:
263                    dist_str = f"{abs(rel.default_distance_m)}m"
264                    if rel.default_distance_m < 0:
265                        dist_info = f" (default: {dist_str} erosion)"
266                    else:
267                        dist_info = f" (default: {dist_str})"
268
269                # Build special flags
270                flags = []
271                if rel.ring_only:
272                    flags.append("ring buffer")
273                if rel.buffer_from:
274                    flags.append(f"from {rel.buffer_from}")
275                if rel.side:
276                    flags.append(f"{rel.side} side only")
277                flag_info = f" [{', '.join(flags)}]" if flags else ""
278
279                # Format line
280                lines.append(f"  • {rel.name}{dist_info}{flag_info}")
281                lines.append(f"    {rel.description}")
282
283        # Add notes
284        lines.append("\nNOTES:")
285        lines.append("  • Negative distances indicate erosion/shrinking (e.g., in_the_heart_of)")
286        lines.append("  • Ring buffers exclude the reference feature itself (e.g., shores of lake)")
287        lines.append("  • Buffer from 'center' vs 'boundary' determines buffer origin")
288
289        return "\n".join(lines)

Registry and configuration for spatial relations.

Manages built-in and custom spatial relations with their default parameters.

SpatialRelationConfig()
47    def __init__(self):
48        """Initialize with built-in spatial relations."""
49        self.relations: dict[str, RelationConfig] = {}
50        self._initialize_defaults()

Initialize with built-in spatial relations.

relations: dict[str, RelationConfig]
def register_relation(self, config: RelationConfig) -> None:
224    def register_relation(self, config: RelationConfig) -> None:
225        """Register a new spatial relation."""
226        self.relations[config.name] = config

Register a new spatial relation.

def has_relation(self, name: str) -> bool:
228    def has_relation(self, name: str) -> bool:
229        """Check if a relation is registered."""
230        return name in self.relations

Check if a relation is registered.

def get_config(self, name: str) -> RelationConfig:
232    def get_config(self, name: str) -> RelationConfig:
233        """Get configuration for a relation. Raises UnknownRelationError if not found."""
234        if not self.has_relation(name):
235            raise UnknownRelationError(
236                f"Unknown spatial relation: '{name}'. Available relations: {', '.join(sorted(self.relations.keys()))}",
237                relation_name=name,
238            )
239        return self.relations[name]

Get configuration for a relation. Raises UnknownRelationError if not found.

def list_relations(self, category: RelationCategory | None = None) -> list[str]:
241    def list_relations(self, category: RelationCategory | None = None) -> list[str]:
242        """List available relation names."""
243        if category is None:
244            return sorted(self.relations.keys())
245        return sorted(r.name for r in self.relations.values() if r.category == category)

List available relation names.

def format_for_prompt(self) -> str:
247    def format_for_prompt(self) -> str:
248        """Format relations for inclusion in LLM prompt."""
249        lines = []
250
251        # Group by category
252        for category in get_args(RelationCategory):
253            category_relations = [r for r in self.relations.values() if r.category == category]
254            if not category_relations:
255                continue
256
257            lines.append(f"\n{category.upper()} RELATIONS:")
258
259            for rel in sorted(category_relations, key=lambda r: r.name):
260                # Build distance info
261                dist_info = ""
262                if rel.default_distance_m is not None:
263                    dist_str = f"{abs(rel.default_distance_m)}m"
264                    if rel.default_distance_m < 0:
265                        dist_info = f" (default: {dist_str} erosion)"
266                    else:
267                        dist_info = f" (default: {dist_str})"
268
269                # Build special flags
270                flags = []
271                if rel.ring_only:
272                    flags.append("ring buffer")
273                if rel.buffer_from:
274                    flags.append(f"from {rel.buffer_from}")
275                if rel.side:
276                    flags.append(f"{rel.side} side only")
277                flag_info = f" [{', '.join(flags)}]" if flags else ""
278
279                # Format line
280                lines.append(f"  • {rel.name}{dist_info}{flag_info}")
281                lines.append(f"    {rel.description}")
282
283        # Add notes
284        lines.append("\nNOTES:")
285        lines.append("  • Negative distances indicate erosion/shrinking (e.g., in_the_heart_of)")
286        lines.append("  • Ring buffers exclude the reference feature itself (e.g., shores of lake)")
287        lines.append("  • Buffer from 'center' vs 'boundary' determines buffer origin")
288
289        return "\n".join(lines)

Format relations for inclusion in LLM prompt.

@dataclass
class RelationConfig:
13@dataclass
14class RelationConfig:
15    """
16    Configuration for a single spatial relation.
17
18    Attributes:
19        name: Relation identifier (e.g., "in", "near", "north_of")
20        category: Type of spatial operation
21        description: Human-readable description for LLM prompt
22        default_distance_m: Default buffer distance in meters
23        buffer_from: Buffer origin
24        ring_only: Exclude reference feature to create ring buffer
25        sector_angle_degrees: Angular sector for directional queries
26        direction_angle_degrees: Direction angle in degrees (0=North, 90=East, 180=South, 270=West, clockwise)
27    """
28
29    name: str
30    category: RelationCategory
31    description: str
32    default_distance_m: float | None = None
33    buffer_from: Literal["center", "boundary"] | None = None
34    ring_only: bool = False
35    side: Literal["left", "right"] | None = None
36    sector_angle_degrees: float | None = None
37    direction_angle_degrees: float | None = None

Configuration for a single spatial relation.

Attributes:
  • name: Relation identifier (e.g., "in", "near", "north_of")
  • category: Type of spatial operation
  • description: Human-readable description for LLM prompt
  • default_distance_m: Default buffer distance in meters
  • buffer_from: Buffer origin
  • ring_only: Exclude reference feature to create ring buffer
  • sector_angle_degrees: Angular sector for directional queries
  • direction_angle_degrees: Direction angle in degrees (0=North, 90=East, 180=South, 270=West, clockwise)
RelationConfig(name: str, category: RelationCategory, description: str, default_distance_m: float | None = None, buffer_from: Literal['center', 'boundary'] | None = None, ring_only: bool = False, side: Literal['left', 'right'] | None = None, sector_angle_degrees: float | None = None, direction_angle_degrees: float | None = None)
name: str
category: Literal['containment', 'buffer', 'directional']
description: str
default_distance_m: float | None = None
buffer_from: Optional[Literal['center', 'boundary']] = None
ring_only: bool = False
side: Optional[Literal['left', 'right']] = None
sector_angle_degrees: float | None = None
direction_angle_degrees: float | None = None
class GeoFilterError(builtins.Exception):
 7class GeoFilterError(Exception):
 8    """Base exception for all GeoFilter errors."""
 9
10    pass

Base exception for all GeoFilter errors.

class ParsingError(etter.GeoFilterError):
13class ParsingError(GeoFilterError):
14    """LLM failed to parse query into valid structure."""
15
16    def __init__(self, message: str, raw_response: str = "", original_error: Exception | None = None):
17        """
18        Initialize parsing error.
19
20        Args:
21            message: Error description
22            raw_response: Raw response from LLM
23            original_error: Original exception that caused parsing failure
24        """
25        self.raw_response = raw_response
26        self.original_error = original_error
27        super().__init__(message)

LLM failed to parse query into valid structure.

ParsingError( message: str, raw_response: str = '', original_error: Exception | None = None)
16    def __init__(self, message: str, raw_response: str = "", original_error: Exception | None = None):
17        """
18        Initialize parsing error.
19
20        Args:
21            message: Error description
22            raw_response: Raw response from LLM
23            original_error: Original exception that caused parsing failure
24        """
25        self.raw_response = raw_response
26        self.original_error = original_error
27        super().__init__(message)

Initialize parsing error.

Arguments:
  • message: Error description
  • raw_response: Raw response from LLM
  • original_error: Original exception that caused parsing failure
raw_response
original_error
class ValidationError(etter.GeoFilterError):
30class ValidationError(GeoFilterError):
31    """Structured output is valid but fails business logic validation."""
32
33    def __init__(self, message: str, field: str | None = None, detail: str | None = None):
34        """
35        Initialize validation error.
36
37        Args:
38            message: Error description
39            field: Field name that failed validation
40            detail: Additional detail about the validation failure
41        """
42        self.field = field
43        self.detail = detail
44        super().__init__(message)

Structured output is valid but fails business logic validation.

ValidationError(message: str, field: str | None = None, detail: str | None = None)
33    def __init__(self, message: str, field: str | None = None, detail: str | None = None):
34        """
35        Initialize validation error.
36
37        Args:
38            message: Error description
39            field: Field name that failed validation
40            detail: Additional detail about the validation failure
41        """
42        self.field = field
43        self.detail = detail
44        super().__init__(message)

Initialize validation error.

Arguments:
  • message: Error description
  • field: Field name that failed validation
  • detail: Additional detail about the validation failure
field
detail
class UnknownRelationError(etter.ValidationError):
47class UnknownRelationError(ValidationError):
48    """Spatial relation is not registered in configuration."""
49
50    def __init__(self, message: str, relation_name: str):
51        """
52        Initialize unknown relation error.
53
54        Args:
55            message: Error description
56            relation_name: The unknown relation name
57        """
58        self.relation_name = relation_name
59        super().__init__(message, field="spatial_relation")

Spatial relation is not registered in configuration.

UnknownRelationError(message: str, relation_name: str)
50    def __init__(self, message: str, relation_name: str):
51        """
52        Initialize unknown relation error.
53
54        Args:
55            message: Error description
56            relation_name: The unknown relation name
57        """
58        self.relation_name = relation_name
59        super().__init__(message, field="spatial_relation")

Initialize unknown relation error.

Arguments:
  • message: Error description
  • relation_name: The unknown relation name
relation_name
class LowConfidenceError(etter.GeoFilterError):
62class LowConfidenceError(GeoFilterError):
63    """Query confidence is below threshold (strict mode)."""
64
65    def __init__(self, message: str, confidence: float, reasoning: str | None = None):
66        """
67        Initialize low confidence error.
68
69        Args:
70            message: Error description
71            confidence: Confidence score (0-1)
72            reasoning: Optional explanation for low confidence
73        """
74        self.confidence = confidence
75        self.reasoning = reasoning
76        super().__init__(message)

Query confidence is below threshold (strict mode).

LowConfidenceError(message: str, confidence: float, reasoning: str | None = None)
65    def __init__(self, message: str, confidence: float, reasoning: str | None = None):
66        """
67        Initialize low confidence error.
68
69        Args:
70            message: Error description
71            confidence: Confidence score (0-1)
72            reasoning: Optional explanation for low confidence
73        """
74        self.confidence = confidence
75        self.reasoning = reasoning
76        super().__init__(message)

Initialize low confidence error.

Arguments:
  • message: Error description
  • confidence: Confidence score (0-1)
  • reasoning: Optional explanation for low confidence
confidence
reasoning
class LowConfidenceWarning(builtins.UserWarning):
79class LowConfidenceWarning(UserWarning):
80    """Query confidence is below threshold (permissive mode)."""
81
82    def __init__(self, confidence: float, message: str = ""):
83        """
84        Initialize low confidence warning.
85
86        Args:
87            confidence: Confidence score (0-1)
88            message: Warning message
89        """
90        self.confidence = confidence
91        super().__init__(message)

Query confidence is below threshold (permissive mode).

LowConfidenceWarning(confidence: float, message: str = '')
82    def __init__(self, confidence: float, message: str = ""):
83        """
84        Initialize low confidence warning.
85
86        Args:
87            confidence: Confidence score (0-1)
88            message: Warning message
89        """
90        self.confidence = confidence
91        super().__init__(message)

Initialize low confidence warning.

Arguments:
  • confidence: Confidence score (0-1)
  • message: Warning message
confidence
class GeoDataSource(typing.Protocol):
12class GeoDataSource(Protocol):
13    """
14    Protocol for geographic data sources.
15
16    Implementations resolve location names to geographic features.
17    Features are returned as standard GeoJSON Feature objects (dicts) in WGS84 (EPSG:4326).
18
19    Example of returned feature:
20        {
21            "type": "Feature",
22            "id": "uuid-123",
23            "geometry": {"type": "Point", "coordinates": [8.5, 47.3]},
24            "bbox": [8.4, 47.3, 8.6, 47.4],
25            "properties": {
26                "name": "Zürich",
27                "type": "city",
28                "confidence": 1.0,
29                ...
30            }
31        }
32    """
33
34    def search(
35        self,
36        name: str,
37        type: str | None = None,
38        max_results: int = 10,
39    ) -> list[dict[str, Any]]:
40        """
41        Search for geographic features by name.
42
43        Args:
44            name: Location name to search for (e.g., "Lake Geneva", "Bern").
45            type: Optional type hint for filtering/ranking results.
46                  Examples: "lake", "city", "mountain", "canton", "river".
47                  When provided, matching types are ranked higher.
48            max_results: Maximum number of results to return.
49
50        Returns:
51            List of matching GeoJSON Feature dicts, ranked by relevance.
52            Returns empty list if no matches found.
53        """
54        ...
55
56    def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
57        """
58        Get a specific feature by its unique identifier.
59
60        Args:
61            feature_id: Unique identifier from the data source.
62
63        Returns:
64            The matching GeoJSON Feature dict, or None if not found.
65        """
66        ...
67
68    def get_available_types(self) -> list[str]:
69        """
70        Get list of concrete geographic types this datasource can return.
71
72        Returns a list of concrete type values (e.g., "lake", "city", "restaurant")
73        that this datasource uses in the "type" property of returned features.
74        These types can be matched against the location type hierarchy for fuzzy matching.
75
76        The returned types should be a subset of or mapped to the standard location
77        type hierarchy defined in location_types.TYPE_HIERARCHY.
78
79        Returns:
80            List of concrete type strings (e.g., ["lake", "river", "city", "mountain"]).
81            Empty list if this datasource does not provide type information.
82
83        Example:
84            >>> source = SwissNames3DSource("data/")
85            >>> types = source.get_available_types()
86            >>> print(types)
87            ['lake', 'river', 'city', 'mountain', 'peak', 'hill', ...]
88        """
89        ...

Protocol for geographic data sources.

Implementations resolve location names to geographic features. Features are returned as standard GeoJSON Feature objects (dicts) in WGS84 (EPSG:4326).

Example of returned feature:

{
    "type": "Feature",
    "id": "uuid-123",
    "geometry": {"type": "Point", "coordinates": [8.5, 47.3]},
    "bbox": [8.4, 47.3, 8.6, 47.4],
    "properties": {
        "name": "Zürich",
        "type": "city",
        "confidence": 1.0,
        ...
    }
}

GeoDataSource(*args, **kwargs)
1431def _no_init_or_replace_init(self, *args, **kwargs):
1432    cls = type(self)
1433
1434    if cls._is_protocol:
1435        raise TypeError('Protocols cannot be instantiated')
1436
1437    # Already using a custom `__init__`. No need to calculate correct
1438    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
1439    if cls.__init__ is not _no_init_or_replace_init:
1440        return
1441
1442    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
1443    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
1444    # searches for a proper new `__init__` in the MRO. The new `__init__`
1445    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
1446    # instantiation of the protocol subclass will thus use the new
1447    # `__init__` and no longer call `_no_init_or_replace_init`.
1448    for base in cls.__mro__:
1449        init = base.__dict__.get('__init__', _no_init_or_replace_init)
1450        if init is not _no_init_or_replace_init:
1451            cls.__init__ = init
1452            break
1453    else:
1454        # should not happen
1455        cls.__init__ = object.__init__
1456
1457    cls.__init__(self, *args, **kwargs)
def search( self, name: str, type: str | None = None, max_results: int = 10) -> list[dict[str, typing.Any]]:
34    def search(
35        self,
36        name: str,
37        type: str | None = None,
38        max_results: int = 10,
39    ) -> list[dict[str, Any]]:
40        """
41        Search for geographic features by name.
42
43        Args:
44            name: Location name to search for (e.g., "Lake Geneva", "Bern").
45            type: Optional type hint for filtering/ranking results.
46                  Examples: "lake", "city", "mountain", "canton", "river".
47                  When provided, matching types are ranked higher.
48            max_results: Maximum number of results to return.
49
50        Returns:
51            List of matching GeoJSON Feature dicts, ranked by relevance.
52            Returns empty list if no matches found.
53        """
54        ...

Search for geographic features by name.

Arguments:
  • name: Location name to search for (e.g., "Lake Geneva", "Bern").
  • type: Optional type hint for filtering/ranking results. Examples: "lake", "city", "mountain", "canton", "river". When provided, matching types are ranked higher.
  • max_results: Maximum number of results to return.
Returns:

List of matching GeoJSON Feature dicts, ranked by relevance. Returns empty list if no matches found.

def get_by_id(self, feature_id: str) -> dict[str, typing.Any] | None:
56    def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
57        """
58        Get a specific feature by its unique identifier.
59
60        Args:
61            feature_id: Unique identifier from the data source.
62
63        Returns:
64            The matching GeoJSON Feature dict, or None if not found.
65        """
66        ...

Get a specific feature by its unique identifier.

Arguments:
  • feature_id: Unique identifier from the data source.
Returns:

The matching GeoJSON Feature dict, or None if not found.

def get_available_types(self) -> list[str]:
68    def get_available_types(self) -> list[str]:
69        """
70        Get list of concrete geographic types this datasource can return.
71
72        Returns a list of concrete type values (e.g., "lake", "city", "restaurant")
73        that this datasource uses in the "type" property of returned features.
74        These types can be matched against the location type hierarchy for fuzzy matching.
75
76        The returned types should be a subset of or mapped to the standard location
77        type hierarchy defined in location_types.TYPE_HIERARCHY.
78
79        Returns:
80            List of concrete type strings (e.g., ["lake", "river", "city", "mountain"]).
81            Empty list if this datasource does not provide type information.
82
83        Example:
84            >>> source = SwissNames3DSource("data/")
85            >>> types = source.get_available_types()
86            >>> print(types)
87            ['lake', 'river', 'city', 'mountain', 'peak', 'hill', ...]
88        """
89        ...

Get list of concrete geographic types this datasource can return.

Returns a list of concrete type values (e.g., "lake", "city", "restaurant") that this datasource uses in the "type" property of returned features. These types can be matched against the location type hierarchy for fuzzy matching.

The returned types should be a subset of or mapped to the standard location type hierarchy defined in location_types.TYPE_HIERARCHY.

Returns:

List of concrete type strings (e.g., ["lake", "river", "city", "mountain"]). Empty list if this datasource does not provide type information.

Example:
>>> source = SwissNames3DSource("data/")
>>> types = source.get_available_types()
>>> print(types)
['lake', 'river', 'city', 'mountain', 'peak', 'hill', ...]
class SwissNames3DSource:
171class SwissNames3DSource:
172    """
173    Geographic data source backed by swisstopo's swissNAMES3D dataset.
174
175    Loads geographic names from a Shapefile, GeoPackage, or ESRI File Geodatabase
176    and provides search by name with optional type filtering.
177
178    If data_path is a directory, automatically loads and concatenates all SwissNames3D
179    shapefiles (swissNAMES3D_PKT, swissNAMES3D_LIN, swissNAMES3D_PLY) found within.
180
181    All geometries are returned as GeoJSON in WGS84 (EPSG:4326).
182
183    Args:
184        data_path: Path to SwissNames3D data file or directory containing SwissNames3D shapefiles.
185        layer: Layer name within the data source (for multi-layer formats like GDB).
186
187    Example:
188        >>> source = SwissNames3DSource("data/")  # Load all 3 geometry types
189        >>> results = source.search("Lac Léman", type="lake")
190        >>> print(results[0]["geometry"])  # GeoJSON in WGS84
191    """
192
193    def __init__(self, data_path: str | Path, layer: str | None = None) -> None:
194        self._data_path = Path(data_path)
195        self._layer = layer
196        self._gdf: gpd.GeoDataFrame | None = None
197        self._name_index: dict[str, list[int]] = {}
198
199    def _ensure_loaded(self) -> None:
200        """Load data lazily on first access."""
201        if self._gdf is not None:
202            return
203        self._load_data()
204
205    def _load_data(self) -> None:
206        """Load SwissNames3D data and build the name index."""
207        # Check if data_path is a directory
208        if self._data_path.is_dir():
209            self._load_from_directory()
210        else:
211            # Load single file
212            kwargs: dict[str, Any] = {}
213            if self._layer is not None:
214                kwargs["layer"] = self._layer
215            self._gdf = gpd.read_file(str(self._data_path), **kwargs)
216
217        self._build_name_index()
218
219    def _load_from_directory(self) -> None:
220        """Load and concatenate all SwissNames3D shapefiles from a directory."""
221        # Look for the 3 standard SwissNames3D shapefiles
222        shapefile_names = ["swissNAMES3D_PKT", "swissNAMES3D_LIN", "swissNAMES3D_PLY"]
223        gdfs: list[gpd.GeoDataFrame] = []
224
225        for name in shapefile_names:
226            shp_path = self._data_path / f"{name}.shp"
227            if shp_path.exists():
228                gdf = gpd.read_file(str(shp_path))
229                gdfs.append(gdf)
230
231        if not gdfs:
232            raise ValueError(
233                f"No SwissNames3D shapefiles found in {self._data_path}. Expected: {', '.join(shapefile_names)}"
234            )
235
236        # Find common columns across all loaded GeoDataFrames
237        common_cols = set(gdfs[0].columns)
238        for gdf in gdfs[1:]:
239            common_cols &= set(gdf.columns)
240
241        # Keep only common columns and concatenate
242        gdfs_filtered = [gdf[sorted(common_cols)] for gdf in gdfs]
243        self._gdf = gpd.GeoDataFrame(
244            gpd.pd.concat(gdfs_filtered, ignore_index=True), crs=gdfs[0].crs, geometry="geometry"
245        )
246
247    def _build_name_index(self) -> None:
248        """Build a normalized name → row indices lookup for fast search."""
249        assert self._gdf is not None
250        self._name_index = {}
251
252        name_col = self._detect_name_column()
253        for idx, name in enumerate(self._gdf[name_col]):
254            if not isinstance(name, str) or not name.strip():
255                continue
256            normalized = _normalize_name(name)
257            if normalized not in self._name_index:
258                self._name_index[normalized] = []
259            self._name_index[normalized].append(idx)
260
261    def _detect_name_column(self) -> str:
262        """Detect the name column in the data."""
263        assert self._gdf is not None
264        for candidate in ("NAME", "name", "Name", "BEZEICHNUNG"):
265            if candidate in self._gdf.columns:
266                return candidate
267        raise ValueError(f"Cannot find name column in data. Available columns: {list(self._gdf.columns)}")
268
269    def _detect_type_column(self) -> str | None:
270        """Detect the feature type column in the data."""
271        assert self._gdf is not None
272        for candidate in ("OBJEKTART", "objektart", "Objektart"):
273            if candidate in self._gdf.columns:
274                return candidate
275        return None
276
277    def _detect_id_column(self) -> str | None:
278        """Detect the unique ID column in the data."""
279        assert self._gdf is not None
280        for candidate in ("UUID", "uuid", "FID", "OBJECTID", "id"):
281            if candidate in self._gdf.columns:
282                return candidate
283        return None
284
285    def _row_to_feature(self, idx: int) -> dict[str, Any]:
286        """Convert a GeoDataFrame row to a GeoJSON Feature dict with WGS84 geometry."""
287        assert self._gdf is not None
288        row = self._gdf.iloc[idx]
289
290        # Get name
291        name_col = self._detect_name_column()
292        name = str(row[name_col])
293
294        # Get type
295        type_col = self._detect_type_column()
296        raw_type = str(row[type_col]) if type_col and row.get(type_col) else "unknown"
297        normalized_type = _objektart_to_type(raw_type)
298
299        # Get ID
300        id_col = self._detect_id_column()
301        feature_id = str(row[id_col]) if id_col and row.get(id_col) else str(idx)
302
303        # Convert geometry to WGS84 GeoJSON
304        geom = row.geometry
305        if geom is None or geom.is_empty:
306            geometry = {"type": "Point", "coordinates": [0, 0]}
307            bbox = None
308        else:
309            # Transform geometry from EPSG:2056 to WGS84 using the module-level transformer
310            # Drop Z coordinates — they are not needed and cause issues with single_sided buffers
311            wgs84_geom = shapely_transform(_TRANSFORMER.transform, force_2d(geom))
312            geometry = mapping(wgs84_geom)
313            bounds = wgs84_geom.bounds  # (minx, miny, maxx, maxy)
314            bbox = (bounds[0], bounds[1], bounds[2], bounds[3])
315
316        # Collect extra properties
317        skip_cols = {name_col, "geometry"}
318        if type_col:
319            skip_cols.add(type_col)
320        if id_col:
321            skip_cols.add(id_col)
322
323        properties: dict[str, Any] = {
324            "name": name,
325            "type": normalized_type,
326            "confidence": 1.0,
327        }
328        for col in self._gdf.columns:
329            if col not in skip_cols:
330                val = row.get(col)
331                if val is not None and str(val) != "nan":
332                    properties[col] = val
333
334        return {
335            "type": "Feature",
336            "id": feature_id,
337            "geometry": geometry,
338            "bbox": bbox,
339            "properties": properties,
340        }
341
342    def search(
343        self,
344        name: str,
345        type: str | None = None,
346        max_results: int = 10,
347    ) -> list[dict[str, Any]]:
348        """
349        Search for geographic features by name.
350
351        Uses case-insensitive, accent-normalized matching with fuzzy fallback.
352        First tries exact matching, then falls back to fuzzy matching if no exact
353        matches found.
354
355        Args:
356            name: Location name to search for.
357            type: Optional type hint to filter results. If provided, only features
358                  of this type are returned.
359            max_results: Maximum number of results to return.
360
361        Returns:
362            List of matching GeoJSON Feature dicts. If type is provided, only
363            features of that type are returned. Empty list if no matches found.
364        """
365        self._ensure_loaded()
366
367        normalized = _normalize_name(name)
368        indices = self._name_index.get(normalized, [])
369
370        # If no exact match, try fuzzy matching
371        if not indices:
372            indices = self._fuzzy_search(normalized)
373
374        features = [self._row_to_feature(idx) for idx in indices]
375
376        # Filter by type if type hint provided.
377        # Expand via the type hierarchy so that category hints (e.g. "water") match
378        # all concrete types within that category ("lake", "river", "pond", ...).
379        if type is not None:
380            matching_types = get_matching_types(type)
381            if matching_types:
382                features = [f for f in features if f["properties"].get("type") in matching_types]
383            else:
384                # Unknown type hint, fall back to exact string match
385                features = [f for f in features if f["properties"].get("type") == type.lower()]
386
387        return features[:max_results]
388
389    def _fuzzy_search(self, normalized: str, threshold: float = 75.0) -> list[int]:
390        """
391        Fuzzy search for names that partially match the search query.
392
393        Uses token matching to find results where at least one token from the
394        query matches a token in the indexed name. This handles cases like:
395        - "venoge" matching "la venoge"
396        - "rhone" matching "rhone valais"
397
398        Args:
399            normalized: The normalized search query.
400            threshold: Minimum fuzzy match score (0-100) to include a result.
401
402        Returns:
403            List of row indices for fuzzy-matched names, sorted by score (descending).
404        """
405        matches: list[tuple[int, float]] = []
406        query_tokens = set(normalized.split())
407
408        for indexed_name, indices in self._name_index.items():
409            indexed_tokens = set(indexed_name.split())
410
411            # Check if any query token matches any indexed token
412            token_overlap = query_tokens & indexed_tokens
413
414            if token_overlap:
415                # Also use token_set_ratio for better matching of partial strings
416                score = fuzz.token_set_ratio(normalized, indexed_name)
417                if score >= threshold:
418                    for idx in indices:
419                        matches.append((idx, score))
420
421        # Sort by score (descending) to return best matches first
422        matches.sort(key=lambda x: x[1], reverse=True)
423        return [idx for idx, _ in matches]
424
425    def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
426        """
427        Get a specific feature by its unique identifier.
428
429        Args:
430            feature_id: Unique identifier (UUID or row index).
431
432        Returns:
433            The matching GeoJSON Feature dict, or None if not found.
434        """
435        self._ensure_loaded()
436        assert self._gdf is not None
437
438        id_col = self._detect_id_column()
439        if id_col:
440            matches = self._gdf[self._gdf[id_col].astype(str) == feature_id]
441            if not matches.empty:
442                return self._row_to_feature(matches.index[0])
443
444        # Fallback: try as row index
445        try:
446            idx = int(feature_id)
447            if 0 <= idx < len(self._gdf):
448                return self._row_to_feature(idx)
449        except ValueError:
450            pass
451
452        return None
453
454    def get_available_types(self) -> list[str]:
455        """
456        Get list of concrete geographic types this datasource can return.
457
458        Returns all normalized types from the OBJEKTART_TYPE_MAP keys,
459        representing all possible types that SwissNames3D data can be classified as.
460
461        Returns:
462            Sorted list of type strings (e.g., ["lake", "city", "river", ...])
463        """
464        return sorted(OBJEKTART_TYPE_MAP.keys())

Geographic data source backed by swisstopo's swissNAMES3D dataset.

Loads geographic names from a Shapefile, GeoPackage, or ESRI File Geodatabase and provides search by name with optional type filtering.

If data_path is a directory, automatically loads and concatenates all SwissNames3D shapefiles (swissNAMES3D_PKT, swissNAMES3D_LIN, swissNAMES3D_PLY) found within.

All geometries are returned as GeoJSON in WGS84 (EPSG:4326).

Arguments:
  • data_path: Path to SwissNames3D data file or directory containing SwissNames3D shapefiles.
  • layer: Layer name within the data source (for multi-layer formats like GDB).
Example:
>>> source = SwissNames3DSource("data/")  # Load all 3 geometry types
>>> results = source.search("Lac Léman", type="lake")
>>> print(results[0]["geometry"])  # GeoJSON in WGS84
SwissNames3DSource(data_path: str | pathlib.Path, layer: str | None = None)
193    def __init__(self, data_path: str | Path, layer: str | None = None) -> None:
194        self._data_path = Path(data_path)
195        self._layer = layer
196        self._gdf: gpd.GeoDataFrame | None = None
197        self._name_index: dict[str, list[int]] = {}
def search( self, name: str, type: str | None = None, max_results: int = 10) -> list[dict[str, typing.Any]]:
342    def search(
343        self,
344        name: str,
345        type: str | None = None,
346        max_results: int = 10,
347    ) -> list[dict[str, Any]]:
348        """
349        Search for geographic features by name.
350
351        Uses case-insensitive, accent-normalized matching with fuzzy fallback.
352        First tries exact matching, then falls back to fuzzy matching if no exact
353        matches found.
354
355        Args:
356            name: Location name to search for.
357            type: Optional type hint to filter results. If provided, only features
358                  of this type are returned.
359            max_results: Maximum number of results to return.
360
361        Returns:
362            List of matching GeoJSON Feature dicts. If type is provided, only
363            features of that type are returned. Empty list if no matches found.
364        """
365        self._ensure_loaded()
366
367        normalized = _normalize_name(name)
368        indices = self._name_index.get(normalized, [])
369
370        # If no exact match, try fuzzy matching
371        if not indices:
372            indices = self._fuzzy_search(normalized)
373
374        features = [self._row_to_feature(idx) for idx in indices]
375
376        # Filter by type if type hint provided.
377        # Expand via the type hierarchy so that category hints (e.g. "water") match
378        # all concrete types within that category ("lake", "river", "pond", ...).
379        if type is not None:
380            matching_types = get_matching_types(type)
381            if matching_types:
382                features = [f for f in features if f["properties"].get("type") in matching_types]
383            else:
384                # Unknown type hint, fall back to exact string match
385                features = [f for f in features if f["properties"].get("type") == type.lower()]
386
387        return features[:max_results]

Search for geographic features by name.

Uses case-insensitive, accent-normalized matching with fuzzy fallback. First tries exact matching, then falls back to fuzzy matching if no exact matches found.

Arguments:
  • name: Location name to search for.
  • type: Optional type hint to filter results. If provided, only features of this type are returned.
  • max_results: Maximum number of results to return.
Returns:

List of matching GeoJSON Feature dicts. If type is provided, only features of that type are returned. Empty list if no matches found.

def get_by_id(self, feature_id: str) -> dict[str, typing.Any] | None:
425    def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
426        """
427        Get a specific feature by its unique identifier.
428
429        Args:
430            feature_id: Unique identifier (UUID or row index).
431
432        Returns:
433            The matching GeoJSON Feature dict, or None if not found.
434        """
435        self._ensure_loaded()
436        assert self._gdf is not None
437
438        id_col = self._detect_id_column()
439        if id_col:
440            matches = self._gdf[self._gdf[id_col].astype(str) == feature_id]
441            if not matches.empty:
442                return self._row_to_feature(matches.index[0])
443
444        # Fallback: try as row index
445        try:
446            idx = int(feature_id)
447            if 0 <= idx < len(self._gdf):
448                return self._row_to_feature(idx)
449        except ValueError:
450            pass
451
452        return None

Get a specific feature by its unique identifier.

Arguments:
  • feature_id: Unique identifier (UUID or row index).
Returns:

The matching GeoJSON Feature dict, or None if not found.

def get_available_types(self) -> list[str]:
454    def get_available_types(self) -> list[str]:
455        """
456        Get list of concrete geographic types this datasource can return.
457
458        Returns all normalized types from the OBJEKTART_TYPE_MAP keys,
459        representing all possible types that SwissNames3D data can be classified as.
460
461        Returns:
462            Sorted list of type strings (e.g., ["city", "lake", "river", ...])
463        """
464        return sorted(OBJEKTART_TYPE_MAP.keys())

Get list of concrete geographic types this datasource can return.

Returns all normalized types from the OBJEKTART_TYPE_MAP keys, representing all possible types that SwissNames3D data can be classified as.

Returns:

Sorted list of type strings (e.g., ["city", "lake", "river", ...])

class IGNBDCartoSource:
272class IGNBDCartoSource:
273    """
274    Geographic data source backed by IGN's BD-CARTO 5.0 dataset.
275
276    Loads French geographic data from GeoPackage files extracted to a directory.
277    Supports administrative boundaries (communes, departments, regions, …),
278    hydrography (rivers, lakes, …), named places (quarters, hamlets, …),
279    orographic features (peaks, passes, valleys, …) and protected areas.
280
281    Data must first be downloaded with ``make download-data-ign``, which places
282    the GeoPackage files in ``data/bdcarto/``.
283
284    All geometries are reprojected from EPSG:2154 (Lambert-93) to WGS84
285    (EPSG:4326) and returned as standard GeoJSON Feature dicts.
286
287    Args:
288        data_path: Directory containing the ``.gpkg`` files (e.g. ``"data/bdcarto"``).
289
290    Example:
291        >>> source = IGNBDCartoSource("data/bdcarto")
292        >>> results = source.search("Ardèche", type="department")
293        >>> results = source.search("Lyon", type="city")
294        >>> results = source.search("Rhône", type="river")
295    """
296
297    def __init__(self, data_path: str | Path) -> None:
298        self._data_path = Path(data_path)
299        self._gdf: gpd.GeoDataFrame | None = None
300        self._name_index: dict[str, list[int]] = {}
301
302    def _ensure_loaded(self) -> None:
303        if self._gdf is not None:
304            return
305        self._load_data()
306
307    def _load_data(self) -> None:
308        if self._data_path.is_dir():
309            self._gdf = self._load_from_directory()
310        else:
311            self._gdf = self._load_from_file(self._data_path)
312        self._build_name_index()
313
314    def _load_from_file(self, path: Path) -> gpd.GeoDataFrame:
315        """Load from a GeoJSON fixture file. Features must include a ``_layer`` column."""
316        full_gdf = gpd.read_file(str(path))
317        if "_layer" not in full_gdf.columns:
318            raise ValueError(f"GeoJSON fixture {path} must include a '_layer' column")
319
320        gdfs: list[gpd.GeoDataFrame] = []
321        for layer_name, cfg in _LAYER_CONFIGS.items():
322            rows = full_gdf[full_gdf["_layer"] == layer_name].copy()
323            if rows.empty:
324                continue
325            name_col: str = cfg["name_col"]
326            if name_col not in rows.columns:
327                continue
328            rows[_NAME_COL] = rows[name_col].astype(str)
329            rows[_TYPE_COL] = rows.apply(lambda row, c=cfg: _derive_type(row, c), axis=1)
330            rows = rows.to_crs("EPSG:4326")
331            gdfs.append(rows)
332
333        if not gdfs:
334            raise ValueError(f"No matching BD-CARTO features found in {path}")
335
336        combined = pd.concat(gdfs, ignore_index=True)
337        return gpd.GeoDataFrame(combined, crs="EPSG:4326", geometry="geometry")
338
339    def _load_from_directory(self) -> gpd.GeoDataFrame:
340        """Load and concatenate all configured layers from the data directory."""
341        gdfs: list[gpd.GeoDataFrame] = []
342
343        for layer_name, cfg in _LAYER_CONFIGS.items():
344            gpkg_path = self._data_path / f"{layer_name}.gpkg"
345            if not gpkg_path.exists():
346                continue
347
348            gdf = gpd.read_file(str(gpkg_path))
349
350            name_col: str = cfg["name_col"]
351            if name_col not in gdf.columns:
352                continue
353
354            gdf[_NAME_COL] = gdf[name_col].astype(str)
355            gdf[_TYPE_COL] = gdf.apply(lambda row, c=cfg: _derive_type(row, c), axis=1)
356            gdf["_layer"] = layer_name
357            gdf = gdf.to_crs("EPSG:4326")
358
359            gdfs.append(gdf)
360
361        if not gdfs:
362            raise ValueError(
363                f"No BD-CARTO GeoPackage files found in {self._data_path}. "
364                f"Run 'make download-data-ign' to download the dataset."
365            )
366
367        combined = pd.concat(gdfs, ignore_index=True)
368        return gpd.GeoDataFrame(combined, crs="EPSG:4326", geometry="geometry")
369
370    def _build_name_index(self) -> None:
371        """Build normalized name → row indices lookup (with article-stripped variants)."""
372        assert self._gdf is not None
373        self._name_index = {}
374        for idx, name in enumerate(self._gdf[_NAME_COL]):
375            if not isinstance(name, str) or not name.strip() or name == "nan":
376                continue
377            for key in _index_keys(name):
378                if key not in self._name_index:
379                    self._name_index[key] = []
380                self._name_index[key].append(idx)
381
382    def _row_to_feature(self, idx: int) -> dict[str, Any]:
383        """Convert a GeoDataFrame row to a GeoJSON Feature dict (WGS84)."""
384        assert self._gdf is not None
385        row = self._gdf.iloc[idx]
386
387        name = str(row[_NAME_COL])
388        normalized_type = str(row[_TYPE_COL]) if pd.notna(row.get(_TYPE_COL)) else "unknown"
389        feature_id = str(row["cleabs"]) if pd.notna(row.get("cleabs")) else str(idx)
390
391        geom = row.geometry
392        if geom is None or geom.is_empty:
393            geometry: dict[str, Any] = {"type": "Point", "coordinates": [0, 0]}
394            bbox = None
395        else:
396            geometry = mapping(geom)
397            bounds = geom.bounds
398            bbox: tuple[float, float, float, float] | None = (bounds[0], bounds[1], bounds[2], bounds[3])
399
400        skip_cols = {_NAME_COL, _TYPE_COL, "geometry", "cleabs"}
401        properties: dict[str, Any] = {
402            "name": name,
403            "type": normalized_type,
404            "confidence": 1.0,
405        }
406        for col in self._gdf.columns:
407            if col not in skip_cols:
408                val = _to_json_value(row.get(col))
409                if val is not None:
410                    properties[col] = val
411
412        return {
413            "type": "Feature",
414            "id": feature_id,
415            "geometry": geometry,
416            "bbox": bbox,
417            "properties": properties,
418        }
419
420    def search(
421        self,
422        name: str,
423        type: str | None = None,
424        max_results: int = 10,
425    ) -> list[dict[str, Any]]:
426        """
427        Search for geographic features by name.
428
429        Uses case-insensitive, accent-normalized exact matching with fuzzy
430        fallback when no exact match is found.
431
432        Args:
433            name: Location name to search for (e.g. ``"Ardèche"``, ``"Lyon"``,
434                  ``"Rhône"``).
435            type: Optional type hint for filtering. Supports both concrete types
436                  (``"department"``, ``"city"``, ``"river"``) and category hints
437                  (``"administrative"``, ``"water"``).
438            max_results: Maximum number of results.
439
440        Returns:
441            List of GeoJSON Feature dicts in WGS84. Empty list if no match.
442        """
443        self._ensure_loaded()
444
445        normalized = _normalize_name(name)
446        indices = self._name_index.get(normalized, [])
447
448        if not indices:
449            indices = self._fuzzy_search(normalized)
450
451        features = [self._row_to_feature(idx) for idx in indices]
452
453        if type is not None:
454            matching_types = get_matching_types(type)
455            print(f"Filtering results by type hint '{type}' → matching types: {matching_types}")
456            if matching_types:
457                features = [f for f in features if f["properties"].get("type") in matching_types]
458            else:
459                features = [f for f in features if f["properties"].get("type") == type.lower()]
460
461        features = merge_segments(features)
462
463        return features[:max_results]
464
465    def _fuzzy_search(self, normalized: str, threshold: float = 75.0) -> list[int]:
466        """Token-overlap + token_set_ratio fuzzy search."""
467        matches: list[tuple[int, float]] = []
468        query_tokens = set(normalized.split())
469
470        for indexed_name, indices in self._name_index.items():
471            if query_tokens & set(indexed_name.split()):
472                score = fuzz.token_set_ratio(normalized, indexed_name)
473                if score >= threshold:
474                    for idx in indices:
475                        matches.append((idx, score))
476
477        matches.sort(key=lambda x: x[1], reverse=True)
478        return [idx for idx, _ in matches]
479
480    def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
481        """
482        Get a feature by its ``cleabs`` identifier or row index.
483
484        Args:
485            feature_id: ``cleabs`` string or integer row index.
486
487        Returns:
488            Matching GeoJSON Feature dict, or ``None``.
489        """
490        self._ensure_loaded()
491        assert self._gdf is not None
492
493        if "cleabs" in self._gdf.columns:
494            matches = self._gdf[self._gdf["cleabs"].astype(str) == feature_id]
495            if not matches.empty:
496                return self._row_to_feature(matches.index[0])
497
498        try:
499            idx = int(feature_id)
500            if 0 <= idx < len(self._gdf):
501                return self._row_to_feature(idx)
502        except ValueError:
503            pass
504
505        return None
506
507    def get_available_types(self) -> list[str]:
508        """
509        Return the union of all normalized types this source can return.
510
511        Returns:
512            Sorted list of type strings.
513        """
514        types: set[str] = set()
515        for cfg in _LAYER_CONFIGS.values():
516            if cfg.get("commune_flags"):
517                types.update({"city", "municipality"})
518            elif cfg.get("fixed_type"):
519                types.add(cfg["fixed_type"])
520            elif cfg.get("type_map"):
521                types.update(cfg["type_map"].values())
522        return sorted(types)

Geographic data source backed by IGN's BD-CARTO 5.0 dataset.

Loads French geographic data from GeoPackage files extracted to a directory. Supports administrative boundaries (communes, departments, regions, …), hydrography (rivers, lakes, …), named places (quarters, hamlets, …), orographic features (peaks, passes, valleys, …) and protected areas.

Data must first be downloaded with make download-data-ign, which places the GeoPackage files in data/bdcarto/.

All geometries are reprojected from EPSG:2154 (Lambert-93) to WGS84 (EPSG:4326) and returned as standard GeoJSON Feature dicts.

Arguments:
  • data_path: Directory containing the .gpkg files (e.g. "data/bdcarto").
Example:
>>> source = IGNBDCartoSource("data/bdcarto")
>>> results = source.search("Ardèche", type="department")
>>> results = source.search("Lyon", type="city")
>>> results = source.search("Rhône", type="river")
IGNBDCartoSource(data_path: str | pathlib.Path)
297    def __init__(self, data_path: str | Path) -> None:
298        self._data_path = Path(data_path)
299        self._gdf: gpd.GeoDataFrame | None = None
300        self._name_index: dict[str, list[int]] = {}
def search( self, name: str, type: str | None = None, max_results: int = 10) -> list[dict[str, typing.Any]]:
420    def search(
421        self,
422        name: str,
423        type: str | None = None,
424        max_results: int = 10,
425    ) -> list[dict[str, Any]]:
426        """
427        Search for geographic features by name.
428
429        Uses case-insensitive, accent-normalized exact matching with fuzzy
430        fallback when no exact match is found.
431
432        Args:
433            name: Location name to search for (e.g. ``"Ardèche"``, ``"Lyon"``,
434                  ``"Rhône"``).
435            type: Optional type hint for filtering. Supports both concrete types
436                  (``"department"``, ``"city"``, ``"river"``) and category hints
437                  (``"administrative"``, ``"water"``).
438            max_results: Maximum number of results.
439
440        Returns:
441            List of GeoJSON Feature dicts in WGS84. Empty list if no match.
442        """
443        self._ensure_loaded()
444
445        normalized = _normalize_name(name)
446        indices = self._name_index.get(normalized, [])
447
448        if not indices:
449            indices = self._fuzzy_search(normalized)
450
451        features = [self._row_to_feature(idx) for idx in indices]
452
453        if type is not None:
454            matching_types = get_matching_types(type)
455            print(f"Filtering results by type hint '{type}' → matching types: {matching_types}")
456            if matching_types:
457                features = [f for f in features if f["properties"].get("type") in matching_types]
458            else:
459                features = [f for f in features if f["properties"].get("type") == type.lower()]
460
461        features = merge_segments(features)
462
463        return features[:max_results]

Search for geographic features by name.

Uses case-insensitive, accent-normalized exact matching with fuzzy fallback when no exact match is found.

Arguments:
  • name: Location name to search for (e.g. "Ardèche", "Lyon", "Rhône").
  • type: Optional type hint for filtering. Supports both concrete types ("department", "city", "river") and category hints ("administrative", "water").
  • max_results: Maximum number of results.
Returns:

List of GeoJSON Feature dicts in WGS84. Empty list if no match.

def get_by_id(self, feature_id: str) -> dict[str, typing.Any] | None:
480    def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
481        """
482        Get a feature by its ``cleabs`` identifier or row index.
483
484        Args:
485            feature_id: ``cleabs`` string or integer row index.
486
487        Returns:
488            Matching GeoJSON Feature dict, or ``None``.
489        """
490        self._ensure_loaded()
491        assert self._gdf is not None
492
493        if "cleabs" in self._gdf.columns:
494            matches = self._gdf[self._gdf["cleabs"].astype(str) == feature_id]
495            if not matches.empty:
496                return self._row_to_feature(matches.index[0])
497
498        try:
499            idx = int(feature_id)
500            if 0 <= idx < len(self._gdf):
501                return self._row_to_feature(idx)
502        except ValueError:
503            pass
504
505        return None

Get a feature by its cleabs identifier or row index.

Arguments:
  • feature_id: cleabs string or integer row index.
Returns:

Matching GeoJSON Feature dict, or None.

def get_available_types(self) -> list[str]:
507    def get_available_types(self) -> list[str]:
508        """
509        Return the union of all normalized types this source can return.
510
511        Returns:
512            Sorted list of type strings.
513        """
514        types: set[str] = set()
515        for cfg in _LAYER_CONFIGS.values():
516            if cfg.get("commune_flags"):
517                types.update({"city", "municipality"})
518            elif cfg.get("fixed_type"):
519                types.add(cfg["fixed_type"])
520            elif cfg.get("type_map"):
521                types.update(cfg["type_map"].values())
522        return sorted(types)

Return the union of all normalized types this source can return.

Returns:

Sorted list of type strings.

class CompositeDataSource:
14class CompositeDataSource:
15    """
16    Fan-out datasource that delegates to an ordered list of GeoDataSource instances.
17
18    ``search`` queries every registered source and merges results in order.
19
20    ``get_by_id`` tries each source in order and returns the first hit.
21
22    ``get_available_types`` returns the union of all sources' types.
23
24    Args:
25        sources: One or more GeoDataSource instances.
26
27    Example:
28        >>> swiss = SwissNames3DSource("data/")
 29        >>> ign   = IGNBDCartoSource("data/")
30        >>> combo = CompositeDataSource(swiss, ign)
31        >>> results = combo.search("Geneva", type="city")
32    """
33
34    def __init__(self, *sources: GeoDataSource) -> None:
35        if not sources:
36            raise ValueError("At least one datasource is required.")
37        self._sources: list[GeoDataSource] = list(sources)
38
39    # Public API (mirrors GeoDataSource protocol)
40
41    def search(
42        self,
43        name: str,
44        type: str | None = None,
45        max_results: int = 10,
46    ) -> list[dict[str, Any]]:
47        """
 48        Search all registered sources and return merged results.
49
50        Args:
51            name: Location name to search for.
52            type: Optional type hint passed through to every source.
 53            max_results: Maximum results per source; also caps the merged total.
54
55        Returns:
56            List of GeoJSON Feature dicts, merged from all sources.
57        """
58        merged: list[dict[str, Any]] = []
59
60        for source in self._sources:
61            for feature in source.search(name, type=type, max_results=max_results):
62                merged.append(feature)
63                if len(merged) >= max_results:
64                    return merged
65
66        return merged
67
68    def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
69        """
70        Get a feature by ID, trying each source in order.
71
72        Args:
73            feature_id: Unique identifier to look up.
74
75        Returns:
76            The first matching GeoJSON Feature dict, or None.
77        """
78        for source in self._sources:
79            result = source.get_by_id(feature_id)
80            if result is not None:
81                return result
82        return None
83
84    def get_available_types(self) -> list[str]:
85        """
86        Return the union of all sources' available types, sorted.
87
88        Returns:
89            Sorted list of unique type strings.
90        """
91        types: set[str] = set()
92        for source in self._sources:
93            types.update(source.get_available_types())
94        return sorted(types)

Fan-out datasource that delegates to an ordered list of GeoDataSource instances.

search queries every registered source and merges results in order.

get_by_id tries each source in order and returns the first hit.

get_available_types returns the union of all sources' types.

Arguments:
  • sources: One or more GeoDataSource instances.
Example:
>>> swiss = SwissNames3DSource("data/")
>>> ign   = IGNBDCartoSource("data/")
>>> combo = CompositeDataSource(swiss, ign)
>>> results = combo.search("Geneva", type="city")
CompositeDataSource(*sources: GeoDataSource)
34    def __init__(self, *sources: GeoDataSource) -> None:
35        if not sources:
36            raise ValueError("At least one datasource is required.")
37        self._sources: list[GeoDataSource] = list(sources)
def search( self, name: str, type: str | None = None, max_results: int = 10) -> list[dict[str, typing.Any]]:
41    def search(
42        self,
43        name: str,
44        type: str | None = None,
45        max_results: int = 10,
46    ) -> list[dict[str, Any]]:
47        """
 48        Search all registered sources and return merged results.
49
50        Args:
51            name: Location name to search for.
52            type: Optional type hint passed through to every source.
 53            max_results: Maximum results per source; also caps the merged total.
54
55        Returns:
56            List of GeoJSON Feature dicts, merged from all sources.
57        """
58        merged: list[dict[str, Any]] = []
59
60        for source in self._sources:
61            for feature in source.search(name, type=type, max_results=max_results):
62                merged.append(feature)
63                if len(merged) >= max_results:
64                    return merged
65
66        return merged

Search all registered sources and return merged results.

Arguments:
  • name: Location name to search for.
  • type: Optional type hint passed through to every source.
  • max_results: Maximum results per source; also caps the merged total.
Returns:

List of GeoJSON Feature dicts, merged from all sources.

def get_by_id(self, feature_id: str) -> dict[str, typing.Any] | None:
68    def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
69        """
70        Get a feature by ID, trying each source in order.
71
72        Args:
73            feature_id: Unique identifier to look up.
74
75        Returns:
76            The first matching GeoJSON Feature dict, or None.
77        """
78        for source in self._sources:
79            result = source.get_by_id(feature_id)
80            if result is not None:
81                return result
82        return None

Get a feature by ID, trying each source in order.

Arguments:
  • feature_id: Unique identifier to look up.
Returns:

The first matching GeoJSON Feature dict, or None.

def get_available_types(self) -> list[str]:
84    def get_available_types(self) -> list[str]:
85        """
86        Return the union of all sources' available types, sorted.
87
88        Returns:
89            Sorted list of unique type strings.
90        """
91        types: set[str] = set()
92        for source in self._sources:
93            types.update(source.get_available_types())
94        return sorted(types)

Return the union of all sources' available types, sorted.

Returns:

Sorted list of unique type strings.

class PostGISDataSource:
 62class PostGISDataSource:
 63    """
 64    Geographic data source backed by a PostGIS table.
 65
 66    The table must expose at minimum a name column, a geometry column, and
 67    optionally a type column. The expected schema is:
 68
 69    .. code-block:: sql
 70
 71        CREATE TABLE <table> (
 72            id      TEXT PRIMARY KEY,
 73            name    TEXT NOT NULL,
 74            type    TEXT,
 75            geom    GEOMETRY(Geometry, 4326)
 76        );
 77
 78    The ``type`` column may store either:
 79
 80    - **Raw dataset values** (e.g. ``"See"``, ``"Berg"`` for SwissNames3D),
 81      pass ``type_map`` so the datasource can translate between raw values and
 82      the normalized etter type names.
 83    - **Already-normalized values** (e.g. ``"lake"``, ``"mountain"``),
 84      leave ``type_map=None`` (default).
 85
 86    Geometries must be in WGS84 (EPSG:4326) or supply ``crs`` for on-the-fly
 87    reprojection.
 88
 89    Args:
 90        connection: A SQLAlchemy :class:`~sqlalchemy.engine.Engine` **or** a
 91            connection URL string (e.g. ``"postgresql+psycopg2://user:pass@host/db"``).
 92            When a string is provided the engine is created internally.
 93        table: Fully-qualified table name, e.g. ``"public.swissnames3d"``.
 94        name_column: Column used for name-based search (default ``"name"``).
 95        type_column: Column used for type filtering.  Pass ``None`` to disable
 96            type filtering (default ``"type"``).
 97        geometry_column: PostGIS geometry column (default ``"geom"``).
 98        id_column: Primary-key column (default ``"id"``).
 99        crs: CRS of the stored geometries as an EPSG string.  Defaults to
100            ``"EPSG:4326"`` (no reprojection).
101        type_map: Optional mapping from **normalized etter type names** to
102            **lists of raw type column values** present in the database.
103            This is the same format as ``SwissNames3DSource.OBJEKTART_TYPE_MAP``
104            and ``IGNBDCartoSource.IGN_BDCARTO_TYPE_MAP``, so they can be
105            passed directly::
106
107                from etter.datasources.swissnames3d import OBJEKTART_TYPE_MAP
108                source = PostGISDataSource(
109                    engine,
110                    table="public.swissnames3d",
111                    type_map=OBJEKTART_TYPE_MAP,
112                )
113
114            When ``type_map`` is provided the datasource:
115
116            - Translates raw DB values → normalized types in returned features.
117            - Translates user type hints → raw DB values in SQL ``WHERE`` clauses.
118            - Returns normalized type names from ``get_available_types()``.
119
120            When ``None`` (default) the stored values are used as-is.
121        fuzzy_threshold: Minimum ``pg_trgm`` similarity score (0-1) used for
122            fuzzy fallback search when no exact ``ILIKE`` match is found.
123
124    Example: unmodified SwissNames3D table::
125
126        from sqlalchemy import create_engine
127        from etter.datasources import PostGISDataSource
128        from etter.datasources.swissnames3d import OBJEKTART_TYPE_MAP
129
130        engine = create_engine(...)
131        source = PostGISDataSource(
132            engine,
133            table="public.swissnames3d",
134            type_map=OBJEKTART_TYPE_MAP,
135        )
136        results = source.search("Lac Léman", type="lake")
137    """
138
139    def __init__(
140        self,
141        connection: str | Engine,
142        table: str,
143        name_column: str = "name",
144        type_column: str | None = "type",
145        geometry_column: str = "geom",
146        id_column: str = "id",
147        crs: str = "EPSG:4326",
148        type_map: dict[str, list[str]] | None = None,
149        fuzzy_threshold: float = 0.65,
150    ) -> None:
151        sa = _require_sqlalchemy()
152
153        if isinstance(connection, str):
154            self._engine = sa.create_engine(connection)
155        else:
156            self._engine = connection
157
158        try:
159            with self._engine.connect() as conn:
160                conn.execute(sa.text(f"SELECT 1 FROM {table} LIMIT 1"))
161        except Exception as exc:
162            raise ValueError(f"Failed to connect to database or access table {table!r}") from exc
163
164        self._table = table
165        self._name_col = name_column
166        self._type_col = type_column
167        self._geom_col = geometry_column
168        self._id_col = id_column
169        self._crs = crs
170        self._fuzzy_threshold = fuzzy_threshold
171
172        # Build bidirectional lookup structures from the user-supplied map.
173        if type_map:
174            self._normalized_to_raw: dict[str, list[str]] = dict(type_map)
175            self._raw_to_normalized: dict[str, str] = {
176                raw: normalized for normalized, raws in type_map.items() for raw in raws
177            }
178        else:
179            self._normalized_to_raw = {}
180            self._raw_to_normalized = {}
181
182        self._trgm_available: bool | None = None
183        self._unaccent_available: bool | None = None
184
185    def _get_connection(self) -> Any:
186        """Return a SQLAlchemy connection from the engine."""
187        return self._engine.connect()
188
189    def _check_trgm(self, conn: Any) -> bool:
190        """Return True if pg_trgm extension is available in the database."""
191        if self._trgm_available is not None:
192            return self._trgm_available
193        sa = _require_sqlalchemy()
194        try:
195            result = conn.execute(sa.text("SELECT 1 FROM pg_extension WHERE extname = 'pg_trgm'"))
196            self._trgm_available = result.fetchone() is not None
197        except Exception:
198            logger.exception("Failed to check pg_trgm availability")
199            self._trgm_available = False
200        return self._trgm_available
201
202    def _check_unaccent(self, conn: Any) -> bool:
203        """Return True if the unaccent extension is available in the database."""
204        if self._unaccent_available is not None:
205            return self._unaccent_available
206        sa = _require_sqlalchemy()
207        try:
208            result = conn.execute(sa.text("SELECT 1 FROM pg_extension WHERE extname = 'unaccent'"))
209            self._unaccent_available = result.fetchone() is not None
210        except Exception:
211            logger.exception("Failed to check unaccent availability")
212            self._unaccent_available = False
213        return self._unaccent_available
214
215    def _normalize_type(self, raw_type: str | None) -> str | None:
216        """Translate a raw DB type value to its normalized etter name.
217
218        If no type_map was supplied the value is returned unchanged.
219        """
220        if raw_type is None:
221            return None
222        return self._raw_to_normalized.get(raw_type, raw_type)
223
224    def _row_to_feature(self, row: Any) -> dict[str, Any]:
225        """Convert a SQLAlchemy Row to a GeoJSON Feature dict."""
226        feature_id = str(row.id)
227        name = str(row.name)
228        raw_type = getattr(row, "type", None)
229        normalized_type = self._normalize_type(raw_type)
230
231        geojson_str = row.geojson
232        if geojson_str:
233            geometry = json.loads(geojson_str)
234        else:
235            geometry = {"type": "Point", "coordinates": [0, 0]}
236
237        bbox = _bbox_from_geojson(geometry)
238
239        properties: dict[str, Any] = {
240            "name": name,
241            "type": normalized_type,
242            "confidence": 1.0,
243        }
244
245        return {
246            "type": "Feature",
247            "id": feature_id,
248            "geometry": geometry,
249            "bbox": bbox,
250            "properties": properties,
251        }
252
253    def _build_select_columns(self) -> str:
254        """Build the SELECT column list as a SQL fragment."""
255        type_expr = f", {self._type_col} AS type" if self._type_col else ", NULL AS type"
256        if self._crs.upper() != "EPSG:4326":
257            geom_expr = f", ST_AsGeoJSON(ST_Transform({self._geom_col}, 4326)) AS geojson"
258        else:
259            geom_expr = f", ST_AsGeoJSON({self._geom_col}) AS geojson"
260        return f"{self._id_col} AS id, {self._name_col} AS name{type_expr}{geom_expr}"
261
262    def search(
263        self,
264        name: str,
265        type: str | None = None,
266        max_results: int = 10,
267    ) -> list[dict[str, Any]]:
268        """
269        Search for geographic features by name.
270
271        Uses a three-step cascade, stopping as soon as any step returns results:
272
273        1. **Normalized exact match**
274        2. **pg_trgm fuzzy with unaccent** (pg_trgm extension required and unaccent extension recommended)
275        3. **ILIKE substring**
276
277        ``merge_segments`` is applied after all rows are fetched so that
278        multi-segment linestrings (rivers, roads) are merged before the
279        ``max_results`` cap is applied.
280
281        Args:
282            name: Location name to search for.
283            type: Optional type hint for filtering results.
284            max_results: Maximum number of results to return.
285
286        Returns:
287            List of matching GeoJSON Feature dicts in WGS84.
288        """
289        sa = _require_sqlalchemy()
290        cols = self._build_select_columns()
291
292        # Resolve type filter to the raw DB values to use in the SQL WHERE clause.
293        type_filter_values: list[str] | None = None
294        if type is not None and self._type_col is not None:
295            matching_types = get_matching_types(type)
296            concrete_types = matching_types if matching_types else [type.lower()]
297            if self._normalized_to_raw:
298                raw_values: list[str] = []
299                for t in concrete_types:
300                    raw_values.extend(self._normalized_to_raw.get(t, [t]))
301                type_filter_values = raw_values if raw_values else concrete_types
302            else:
303                type_filter_values = concrete_types
304
305        # Fetch more rows than requested so that merge_segments has the full
306        # set of segments to work with.  Without this, a SQL LIMIT applied
307        # *before* merging would only return a partial set of linestring
308        # segments, producing incorrect / truncated geometries.
309        # We cap the internal limit at 2000 to avoid unbounded queries.
310        internal_limit = min(max(max_results * 20, 100), 2000)
311
312        with self._get_connection() as conn:
313            features = self._search_normalized(conn, sa, cols, name, type_filter_values, internal_limit)
314
315        if not features:
316            with self._get_connection() as conn:
317                features = self._search_fuzzy(conn, sa, cols, name, type_filter_values, internal_limit)
318
319        if not features:
320            with self._get_connection() as conn:
321                features = self._search_ilike(conn, sa, cols, name, type_filter_values, internal_limit)
322
323        features = merge_segments(features)
324        return features[:max_results]
325
326    def _type_filter_sql(self, values: list[str] | None) -> tuple[str, dict[str, Any]]:
327        """Return a WHERE clause fragment and bind params for type filtering."""
328        if not values or self._type_col is None:
329            return "", {}
330        placeholders = ", ".join(f":type_{i}" for i in range(len(values)))
331        clause = f" AND {self._type_col} IN ({placeholders})"
332        params = {f"type_{i}": v for i, v in enumerate(values)}
333        return clause, params
334
335    def _search_normalized(
336        self,
337        conn: Any,
338        sa: Any,
339        cols: str,
340        name: str,
341        type_filter: list[str] | None,
342        fetch_limit: int,
343    ) -> list[dict[str, Any]]:
344        """
345        Exact accent- and case-insensitive search.
346
347        Accent normalization (NFD decomposition + diacritic strip) is done in
348        Python before the query is sent to the DB.
349        """
350        type_clause, type_params = self._type_filter_sql(type_filter)
351        name_expr = f"lower({self._name_col})"
352        if self._check_unaccent(conn):
353            name_expr = f"unaccent({name_expr})"
354        sql = sa.text(
355            f"SELECT {cols} FROM {self._table} "  # noqa: S608
356            f"WHERE {name_expr} = :query{type_clause} "
357            f"LIMIT :limit"
358        )
359        params: dict[str, Any] = {
360            "query": _normalize_name(name),
361            "limit": fetch_limit,
362            **type_params,
363        }
364        try:
365            result = conn.execute(sql, params)
366            return [self._row_to_feature(row) for row in result]
367        except Exception:
368            logger.exception("Normalized search failed for %r", name)
369            return []
370
371    def _search_ilike(
372        self,
373        conn: Any,
374        sa: Any,
375        cols: str,
376        name: str,
377        type_filter: list[str] | None,
378        fetch_limit: int,
379    ) -> list[dict[str, Any]]:
380        """Case-insensitive substring fallback using ``ILIKE '%name%'``.
381
382        When the ``unaccent`` extension is available, both the stored name column
383        and the pattern are accent-stripped so that e.g. ``"Rhone"`` matches
384        ``"Rhône"``.  Without ``unaccent``, standard ILIKE is used (case-insensitive
385        only).
386        """
387        type_clause, type_params = self._type_filter_sql(type_filter)
388        normalized = _normalize_name(name)
389        if self._check_unaccent(conn):
390            name_expr = f"unaccent(lower({self._name_col}))"
391            pattern = f"%{normalized}%"
392        else:
393            name_expr = self._name_col
394            pattern = f"%{name}%"
395        sql = sa.text(
396            f"SELECT {cols} FROM {self._table} "  # noqa: S608
397            f"WHERE {name_expr} ILIKE :pattern{type_clause} "
398            f"LIMIT :limit"
399        )
400        params: dict[str, Any] = {"pattern": pattern, "limit": fetch_limit, **type_params}
401        try:
402            result = conn.execute(sql, params)
403            return [self._row_to_feature(row) for row in result]
404        except Exception:
405            logger.exception("ILIKE search failed for %r", name)
406            return []
407
408    def _search_fuzzy(
409        self,
410        conn: Any,
411        sa: Any,
412        cols: str,
413        name: str,
414        type_filter: list[str] | None,
415        fetch_limit: int,
416    ) -> list[dict[str, Any]]:
417        """Fuzzy fallback using pg_trgm similarity (if extension is available)."""
418        if not self._check_trgm(conn):
419            logger.warning(
420                "pg_trgm extension not available. Fuzzy search disabled. Install it with: CREATE EXTENSION pg_trgm;"
421            )
422            return []
423        normalized_query = _normalize_name(name)
424        if self._check_unaccent(conn):
425            name_expr = f"unaccent(lower({self._name_col}))"
426        else:
427            logger.warning(
428                "unaccent extension not available. Accent-insensitive fuzzy search degraded. "
429                "Install it with: CREATE EXTENSION unaccent;"
430            )
431            name_expr = f"lower({self._name_col})"
432        type_clause, type_params = self._type_filter_sql(type_filter)
433        sql = sa.text(
434            f"SELECT {cols} FROM {self._table} "  # noqa: S608
435            f"WHERE word_similarity({name_expr}, :query) > :threshold{type_clause} "
436            f"ORDER BY word_similarity({name_expr}, :query) DESC "
437            f"LIMIT :limit"
438        )
439        params: dict[str, Any] = {
440            "query": normalized_query,
441            "threshold": self._fuzzy_threshold,
442            "limit": fetch_limit,
443            **type_params,
444        }
445        try:
446            result = conn.execute(sql, params)
447            return [self._row_to_feature(row) for row in result]
448        except Exception:
449            logger.exception("Fuzzy search failed for %r", name)
450            return []
451
452    def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
453        """
454        Get a specific feature by its unique identifier.
455
456        Args:
457            feature_id: Value of the ``id`` column.
458
459        Returns:
460            The matching GeoJSON Feature dict, or ``None`` if not found.
461        """
462        sa = _require_sqlalchemy()
463        cols = self._build_select_columns()
464        sql = sa.text(
465            f"SELECT {cols} FROM {self._table} WHERE {self._id_col} = :id LIMIT 1"  # noqa: S608
466        )
467        with self._get_connection() as conn:
468            try:
469                result = conn.execute(sql, {"id": feature_id})
470                row = result.fetchone()
471                return self._row_to_feature(row) if row else None
472            except Exception:
473                logger.exception("get_by_id failed for %r", feature_id)
474                return None
475
476    def get_available_types(self) -> list[str]:
477        """
478        Return the distinct ``type`` values present in the table.
479
480        Returns:
481            Sorted list of concrete type strings, or an empty list if the table
482            has no type column.
483        """
484        if self._type_col is None:
485            return []
486        sa = _require_sqlalchemy()
487        sql = sa.text(
488            f"SELECT DISTINCT {self._type_col} AS type FROM {self._table} "  # noqa: S608
489            f"WHERE {self._type_col} IS NOT NULL ORDER BY 1"
490        )
491        with self._get_connection() as conn:
492            try:
493                result = conn.execute(sql)
494                raw_types = [row.type for row in result]
495            except Exception:
496                logger.exception("get_available_types failed")
497                return []
498
499        normalized = {self._normalize_type(t) for t in raw_types if t}
500        return sorted(t for t in normalized if t)

Geographic data source backed by a PostGIS table.

The table must expose a name column and a geometry column; a type column is optional. The expected schema is:

CREATE TABLE <table> (
    id      TEXT PRIMARY KEY,
    name    TEXT NOT NULL,
    type    TEXT,
    geom    GEOMETRY(Geometry, 4326)
);

The type column may store either:

  • Raw dataset values (e.g. "See", "Berg" for SwissNames3D), pass type_map so the datasource can translate between raw values and the normalized etter type names.
  • Already-normalized values (e.g. "lake", "mountain"), leave type_map=None (default).

Geometries must be stored in WGS84 (EPSG:4326); for any other CRS, supply crs so they are reprojected on the fly.

Arguments:
  • connection: A SQLAlchemy ~sqlalchemy.engine.Engine or a connection URL string (e.g. "postgresql+psycopg2://user:pass@host/db"). When a string is provided the engine is created internally.
  • table: Fully-qualified table name, e.g. "public.swissnames3d".
  • name_column: Column used for name-based search (default "name").
  • type_column: Column used for type filtering. Pass None to disable type filtering (default "type").
  • geometry_column: PostGIS geometry column (default "geom").
  • id_column: Primary-key column (default "id").
  • crs: CRS of the stored geometries as an EPSG string. Defaults to "EPSG:4326" (no reprojection).
  • type_map: Optional mapping from normalized etter type names to lists of raw type column values present in the database. This is the same format as SwissNames3DSource.OBJEKTART_TYPE_MAP and IGNBDCartoSource.IGN_BDCARTO_TYPE_MAP, so they can be passed directly::

    from etter.datasources.swissnames3d import OBJEKTART_TYPE_MAP
    source = PostGISDataSource(
        engine,
        table="public.swissnames3d",
        type_map=OBJEKTART_TYPE_MAP,
    )
    

    When type_map is provided the datasource:

    • Translates raw DB values → normalized types in returned features.
    • Translates user type hints → raw DB values in SQL WHERE clauses.
    • Returns normalized type names from get_available_types().

    When None (default) the stored values are used as-is.

  • fuzzy_threshold: Minimum pg_trgm similarity score (0-1) used by the fuzzy search step, which runs when the normalized exact match returns no results.

Example: unmodified SwissNames3D table::

from sqlalchemy import create_engine
from etter.datasources import PostGISDataSource
from etter.datasources.swissnames3d import OBJEKTART_TYPE_MAP

engine = create_engine(...)
source = PostGISDataSource(
    engine,
    table="public.swissnames3d",
    type_map=OBJEKTART_TYPE_MAP,
)
results = source.search("Lac Léman", type="lake")
PostGISDataSource( connection: str | sqlalchemy.engine.base.Engine, table: str, name_column: str = 'name', type_column: str | None = 'type', geometry_column: str = 'geom', id_column: str = 'id', crs: str = 'EPSG:4326', type_map: dict[str, list[str]] | None = None, fuzzy_threshold: float = 0.65)
139    def __init__(
140        self,
141        connection: str | Engine,
142        table: str,
143        name_column: str = "name",
144        type_column: str | None = "type",
145        geometry_column: str = "geom",
146        id_column: str = "id",
147        crs: str = "EPSG:4326",
148        type_map: dict[str, list[str]] | None = None,
149        fuzzy_threshold: float = 0.65,
150    ) -> None:
151        sa = _require_sqlalchemy()
152
153        if isinstance(connection, str):
154            self._engine = sa.create_engine(connection)
155        else:
156            self._engine = connection
157
158        try:
159            with self._engine.connect() as conn:
160                conn.execute(sa.text(f"SELECT 1 FROM {table} LIMIT 1"))
161        except Exception as exc:
162            raise ValueError(f"Failed to connect to database or access table {table!r}") from exc
163
164        self._table = table
165        self._name_col = name_column
166        self._type_col = type_column
167        self._geom_col = geometry_column
168        self._id_col = id_column
169        self._crs = crs
170        self._fuzzy_threshold = fuzzy_threshold
171
172        # Build bidirectional lookup structures from the user-supplied map.
173        if type_map:
174            self._normalized_to_raw: dict[str, list[str]] = dict(type_map)
175            self._raw_to_normalized: dict[str, str] = {
176                raw: normalized for normalized, raws in type_map.items() for raw in raws
177            }
178        else:
179            self._normalized_to_raw = {}
180            self._raw_to_normalized = {}
181
182        self._trgm_available: bool | None = None
183        self._unaccent_available: bool | None = None
def search( self, name: str, type: str | None = None, max_results: int = 10) -> list[dict[str, typing.Any]]:
262    def search(
263        self,
264        name: str,
265        type: str | None = None,
266        max_results: int = 10,
267    ) -> list[dict[str, Any]]:
268        """
269        Search for geographic features by name.
270
271        Uses a three-step cascade, stopping as soon as any step returns results:
272
273        1. **Normalized exact match**
274        2. **pg_trgm fuzzy with unaccent** (pg_trgm extension required and unaccent extension recommended)
275        3. **ILIKE substring**
276
277        ``merge_segments`` is applied after all rows are fetched so that
278        multi-segment linestrings (rivers, roads) are merged before the
279        ``max_results`` cap is applied.
280
281        Args:
282            name: Location name to search for.
283            type: Optional type hint for filtering results.
284            max_results: Maximum number of results to return.
285
286        Returns:
287            List of matching GeoJSON Feature dicts in WGS84.
288        """
289        sa = _require_sqlalchemy()
290        cols = self._build_select_columns()
291
292        # Resolve type filter to the raw DB values to use in the SQL WHERE clause.
293        type_filter_values: list[str] | None = None
294        if type is not None and self._type_col is not None:
295            matching_types = get_matching_types(type)
296            concrete_types = matching_types if matching_types else [type.lower()]
297            if self._normalized_to_raw:
298                raw_values: list[str] = []
299                for t in concrete_types:
300                    raw_values.extend(self._normalized_to_raw.get(t, [t]))
301                type_filter_values = raw_values if raw_values else concrete_types
302            else:
303                type_filter_values = concrete_types
304
305        # Fetch more rows than requested so that merge_segments has the full
306        # set of segments to work with.  Without this, a SQL LIMIT applied
307        # *before* merging would only return a partial set of linestring
308        # segments, producing incorrect / truncated geometries.
309        # We cap the internal limit at 2000 to avoid unbounded queries.
310        internal_limit = min(max(max_results * 20, 100), 2000)
311
312        with self._get_connection() as conn:
313            features = self._search_normalized(conn, sa, cols, name, type_filter_values, internal_limit)
314
315        if not features:
316            with self._get_connection() as conn:
317                features = self._search_fuzzy(conn, sa, cols, name, type_filter_values, internal_limit)
318
319        if not features:
320            with self._get_connection() as conn:
321                features = self._search_ilike(conn, sa, cols, name, type_filter_values, internal_limit)
322
323        features = merge_segments(features)
324        return features[:max_results]

Search for geographic features by name.

Uses a three-step cascade, stopping as soon as any step returns results:

  1. Normalized exact match
  2. pg_trgm fuzzy with unaccent (pg_trgm extension required and unaccent extension recommended)
  3. ILIKE substring

merge_segments is applied after all rows are fetched so that multi-segment linestrings (rivers, roads) are merged before the max_results cap is applied.

Arguments:
  • name: Location name to search for.
  • type: Optional type hint for filtering results.
  • max_results: Maximum number of results to return.
Returns:

List of matching GeoJSON Feature dicts in WGS84.

def get_by_id(self, feature_id: str) -> dict[str, typing.Any] | None:
452    def get_by_id(self, feature_id: str) -> dict[str, Any] | None:
453        """
454        Get a specific feature by its unique identifier.
455
456        Args:
457            feature_id: Value of the ``id`` column.
458
459        Returns:
460            The matching GeoJSON Feature dict, or ``None`` if not found.
461        """
462        sa = _require_sqlalchemy()
463        cols = self._build_select_columns()
464        sql = sa.text(
465            f"SELECT {cols} FROM {self._table} WHERE {self._id_col} = :id LIMIT 1"  # noqa: S608
466        )
467        with self._get_connection() as conn:
468            try:
469                result = conn.execute(sql, {"id": feature_id})
470                row = result.fetchone()
471                return self._row_to_feature(row) if row else None
472            except Exception:
473                logger.exception("get_by_id failed for %r", feature_id)
474                return None

Get a specific feature by its unique identifier.

Arguments:
  • feature_id: Value of the id column.
Returns:

The matching GeoJSON Feature dict, or None if not found.

def get_available_types(self) -> list[str]:
476    def get_available_types(self) -> list[str]:
477        """
478        Return the distinct ``type`` values present in the table.
479
480        Returns:
481            Sorted list of concrete type strings, or an empty list if the table
482            has no type column.
483        """
484        if self._type_col is None:
485            return []
486        sa = _require_sqlalchemy()
487        sql = sa.text(
488            f"SELECT DISTINCT {self._type_col} AS type FROM {self._table} "  # noqa: S608
489            f"WHERE {self._type_col} IS NOT NULL ORDER BY 1"
490        )
491        with self._get_connection() as conn:
492            try:
493                result = conn.execute(sql)
494                raw_types = [row.type for row in result]
495            except Exception:
496                logger.exception("get_available_types failed")
497                return []
498
499        normalized = {self._normalize_type(t) for t in raw_types if t}
500        return sorted(t for t in normalized if t)

Return the distinct type values present in the table.

Returns:

Sorted list of concrete type strings, or an empty list if the table has no type column.

def apply_spatial_relation( geometry: dict[str, typing.Any], relation: SpatialRelation, buffer_config: BufferConfig | None = None, spatial_config: SpatialRelationConfig | None = None) -> dict[str, typing.Any]:
def apply_spatial_relation(
    geometry: dict[str, Any],
    relation: SpatialRelation,
    buffer_config: BufferConfig | None = None,
    spatial_config: SpatialRelationConfig | None = None,
) -> dict[str, Any]:
    """
    Transform a reference geometry according to a spatial relation.

    Converts the input GeoJSON geometry to a search area based on the
    spatial relation category:
    - Containment: returns the original geometry unchanged
    - Buffer: applies positive (expand), negative (erode), or ring buffer
    - Directional: creates an angular sector wedge

    Args:
        geometry: GeoJSON geometry dict in WGS84 (EPSG:4326).
        relation: Spatial relation to apply.
        buffer_config: Buffer configuration (required for buffer/directional relations).
        spatial_config: Spatial relation registry used to look up directional angles.
            Defaults to the module-level singleton; pass an explicit instance to
            avoid repeated construction when calling from a hot path.

    Returns:
        Transformed GeoJSON geometry dict in WGS84.

    Raises:
        ValueError: If buffer_config is missing for buffer/directional relations,
            or if the relation category is unknown.

    Examples:
        >>> from etter.models import SpatialRelation, BufferConfig
        >>> # Circular buffer
        >>> result = apply_spatial_relation(
        ...     geometry={"type": "Point", "coordinates": [6.63, 46.52]},
        ...     relation=SpatialRelation(relation="near", category="buffer"),
        ...     buffer_config=BufferConfig(distance_m=5000, buffer_from="center"),
        ... )

        >>> # Containment (passthrough)
        >>> result = apply_spatial_relation(
        ...     geometry=city_polygon,
        ...     relation=SpatialRelation(relation="in", category="containment"),
        ... )
    """
    category = relation.category
    if category == "containment":
        # Containment searches within the reference geometry itself.
        return _apply_containment(geometry)
    if category == "buffer":
        if buffer_config is None:
            raise ValueError(f"Buffer relation '{relation.relation}' requires buffer_config")
        return _apply_buffer(geometry, buffer_config)
    if category == "directional":
        if buffer_config is None:
            raise ValueError(f"Directional relation '{relation.relation}' requires buffer_config")
        registry = _DEFAULT_SPATIAL_CONFIG if spatial_config is None else spatial_config
        relation_config = registry.get_config(relation.relation)
        # Fall back to sensible defaults when the registry entry leaves the
        # angles unset: due-north heading, 90-degree wedge.
        heading = relation_config.direction_angle_degrees or 0
        spread = relation_config.sector_angle_degrees or 90
        return _apply_directional(geometry, buffer_config, heading, spread)
    raise ValueError(f"Unknown relation category: '{relation.category}'")

Transform a reference geometry according to a spatial relation.

Converts the input GeoJSON geometry to a search area based on the spatial relation category:

  • Containment: returns the original geometry unchanged
  • Buffer: applies positive (expand), negative (erode), or ring buffer
  • Directional: creates an angular sector wedge
Arguments:
  • geometry: GeoJSON geometry dict in WGS84 (EPSG:4326).
  • relation: Spatial relation to apply.
  • buffer_config: Buffer configuration (required for buffer/directional relations).
  • spatial_config: Spatial relation registry used to look up directional angles. Defaults to the module-level singleton; pass an explicit instance to avoid repeated construction when calling from a hot path.
Returns:

Transformed GeoJSON geometry dict in WGS84.

Raises:
  • ValueError: If buffer_config is missing for buffer/directional relations, or if the relation category is unknown.
Examples:
>>> from etter.models import SpatialRelation, BufferConfig
>>> # Circular buffer
>>> result = apply_spatial_relation(
...     geometry={"type": "Point", "coordinates": [6.63, 46.52]},
...     relation=SpatialRelation(relation="near", category="buffer"),
...     buffer_config=BufferConfig(distance_m=5000, buffer_from="center"),
... )
>>> # Containment (passthrough)
>>> result = apply_spatial_relation(
...     geometry=city_polygon,
...     relation=SpatialRelation(relation="in", category="containment"),
... )