etter

etter - Natural language geographic query parsing using LLMs.

Parse natural-language location queries into structured geographic queries using an LLM.

 1"""
 2etter - Natural language geographic query parsing using LLMs.
 3
 4Parse location queries into structured geographic queries using LLM.
 5"""
 6
 7from importlib.metadata import PackageNotFoundError, version
 8
 9try:
10    __version__ = version("etter")
11except PackageNotFoundError:  # running from source without install
12    __version__ = "unknown"
13
14# Main API
15# Exceptions
16# Datasources
17from .datasources import (
18    CompositeDataSource,
19    GeoDataSource,
20    IGNBDCartoSource,
21    PostGISDataSource,
22    SwissBoundaries3DSource,
23    SwissNames3DSource,
24)
25from .exceptions import (
26    GeoFilterError,
27    LowConfidenceError,
28    LowConfidenceWarning,
29    NoReferenceLocationError,
30    ParsingError,
31    UnknownRelationError,
32    ValidationError,
33)
34from .geometry_format import convert_feature_geometry, convert_geometry
35
36# Models (for type hints and result access)
37from .models import (
38    BufferConfig,
39    ConfidenceLevel,
40    ConfidenceScore,
41    GeometryFormat,
42    GeoQuery,
43    ReferenceLocation,
44    SpatialRelation,
45)
46from .parser import GeoFilterParser
47
48# Spatial operations
49from .spatial import apply_spatial_relation
50
51# Configuration
52from .spatial_config import RelationConfig, SpatialRelationConfig
53
54__all__ = [
55    # Main API
56    "GeoFilterParser",
57    # Models
58    "GeoQuery",
59    "SpatialRelation",
60    "ReferenceLocation",
61    "BufferConfig",
62    "ConfidenceScore",
63    "ConfidenceLevel",
64    "GeometryFormat",
65    # Configuration
66    "SpatialRelationConfig",
67    "RelationConfig",
68    # Exceptions
69    "GeoFilterError",
70    "ParsingError",
71    "ValidationError",
72    "NoReferenceLocationError",
73    "UnknownRelationError",
74    "LowConfidenceError",
75    "LowConfidenceWarning",
76    # Datasources
77    "GeoDataSource",
78    "SwissNames3DSource",
79    "SwissBoundaries3DSource",
80    "IGNBDCartoSource",
81    "CompositeDataSource",
82    "PostGISDataSource",
83    # Spatial
84    "apply_spatial_relation",
85    "convert_geometry",
86    "convert_feature_geometry",
87]
class GeoFilterParser:
 19class GeoFilterParser:
 20    """
 21    Main entry point for parsing natural language location queries.
 22
 23    This class orchestrates the entire parsing pipeline:
 24    1. Initialize LLM with structured output
 25    2. Build prompt with spatial relations and examples
 26    3. Parse query through LLM
 27    4. Validate and enrich with defaults
 28    5. Return structured GeoQuery
 29
 30    Examples:
 31        Basic usage:
 32        >>> from langchain.chat_models import init_chat_model
 33        >>> llm = init_chat_model(model="gpt-4o", model_provider="openai", api_key="sk-...")
 34        >>> parser = GeoFilterParser(llm=llm)
 35        >>> result = parser.parse("restaurants in Lausanne")
 36        >>> print(result.reference_location.name)
 37        'Lausanne'
 38
 39        With strict confidence mode:
 40        >>> parser = GeoFilterParser(llm=llm, confidence_threshold=0.8, strict_mode=True)
 41        >>> result = parser.parse("near the station")  # May raise LowConfidenceError
 42    """
 43
 44    def __init__(
 45        self,
 46        llm: BaseChatModel,
 47        spatial_config: SpatialRelationConfig | None = None,
 48        confidence_threshold: float = 0.6,
 49        strict_mode: bool = False,
 50        include_examples: bool = True,
 51        datasource: GeoDataSource | None = None,
 52        additional_instructions: str | None = None,
 53    ):
 54        """
 55        Initialize the parser.
 56
 57        Args:
 58            llm: LangChain LLM instance (required).
 59            spatial_config: Spatial relation configuration. If None, uses defaults
 60            confidence_threshold: Minimum confidence to accept (0-1)
 61            strict_mode: If True, raise error on low confidence. If False, warn only
 62            include_examples: Whether to include few-shot examples in prompt
 63            datasource: Optional GeoDataSource instance. If provided, the LLM will be informed
 64                       about the concrete types available in that datasource for better type inference.
 65            additional_instructions: Free-form text injected as a system message after the main
 66                       system prompt and before few-shot examples. Use this to add caller-specific
 67                       rules such as region-specific endonyms, domain aliases, or
 68                       organization-specific place names without forking the default prompt.
 69
 70        Example:
 71            >>> from langchain.chat_models import init_chat_model
 72            >>> from etter.datasources.swissnames3d import SwissNames3DSource
 73            >>> llm = init_chat_model(model="gpt-4o", model_provider="openai", temperature=0)
 74            >>> datasource = SwissNames3DSource("data/")
 75            >>> parser = GeoFilterParser(llm=llm, datasource=datasource)
 76        """
 77        self.llm = llm
 78
 79        # Initialize spatial config
 80        self.spatial_config = spatial_config or SpatialRelationConfig()
 81
 82        # Settings
 83        self.confidence_threshold = confidence_threshold
 84        self.strict_mode = strict_mode
 85        self.include_examples = include_examples
 86        self.datasource = datasource
 87        self.additional_instructions = additional_instructions
 88
 89        # Build structured LLM
 90        self.structured_llm = self._build_structured_llm()
 91
 92        # Build prompt template
 93        self.prompt = self._build_prompt()
 94
 95    def _build_structured_llm(self):
 96        """Create LLM with structured output using Pydantic model."""
 97
 98        return self.llm.with_structured_output(
 99            GeoQuery,
100            method="function_calling",  # Use function_calling for broader schema support
101            include_raw=True,  # For error debugging
102        )
103
104    def _build_prompt(self) -> ChatPromptTemplate:
105        """Build prompt template with spatial relations, examples, and available types."""
106        available_types = None
107        if self.datasource is not None:
108            available_types = self.datasource.get_available_types()
109
110        return build_prompt_template(
111            spatial_config=self.spatial_config,
112            include_examples=self.include_examples,
113            available_types=available_types,
114            additional_instructions=self.additional_instructions,
115        )
116
117    def _unpack_response(self, response) -> GeoQuery:
118        """Extract and validate the GeoQuery from a structured-LLM response."""
119        parsed = response.get("parsed") if isinstance(response, dict) else response
120
121        if parsed is None:
122            raw = response.get("raw", "") if isinstance(response, dict) else ""
123            error = response.get("parsing_error") if isinstance(response, dict) else None
124            raise ParsingError(
125                message="Failed to parse query into structured format. "
126                "LLM may have returned invalid JSON or missed required fields.",
127                raw_response=str(raw),
128                original_error=error,
129            )
130
131        if not isinstance(parsed, GeoQuery):
132            raise ParsingError(
133                message=f"Expected GeoQuery, got {type(parsed).__name__}",
134                raw_response=str(parsed),
135            )
136        return parsed
137
138    def _finalize(self, geo_query: GeoQuery, query: str) -> GeoQuery:
139        """Set original_query and run the validation pipeline."""
140        geo_query.original_query = query
141
142        return validate_query(
143            geo_query,
144            self.spatial_config,
145            confidence_threshold=self.confidence_threshold,
146            strict_mode=self.strict_mode,
147        )
148
149    def parse(self, query: str) -> GeoQuery:
150        """
151        Parse a natural language location query into structured format.
152
153        This is the main method for parsing queries. It:
154        1. Invokes the LLM with structured output
155        2. Validates the spatial relation is registered
156        3. Enriches with default parameters
157        4. Checks confidence threshold
158
159        Args:
160            query: Natural language query in any language
161
162        Returns:
163            GeoQuery: Structured query representation with confidence scores
164
165        Raises:
166            ParsingError: If LLM fails to parse query into valid structure
167            ValidationError: If parsed query fails business logic validation
168            UnknownRelationError: If spatial relation is not registered
169            LowConfidenceError: If confidence below threshold (strict mode only)
170
171        Warns:
172            LowConfidenceWarning: If confidence below threshold (permissive mode)
173
174        Examples:
175            Simple containment query:
176            >>> result = parser.parse("in Bern")
177            >>> result.reference_location.name
178            'Bern'
179            >>> result.spatial_relation.relation
180            'in'
181
182            Buffer query:
183            >>> result = parser.parse("near Lake Geneva")
184            >>> result.spatial_relation.relation
185            'near'
186            >>> result.buffer_config.distance_m
187            5000
188
189            Directional query:
190            >>> result = parser.parse("north of Lausanne")
191            >>> result.spatial_relation.relation
192            'north_of'
193            >>> result.reference_location.name
194            'Lausanne'
195
196            Multilingual:
197            >>> result = parser.parse("près de Genève")
198            >>> result.spatial_relation.relation
199            'near'
200            >>> result.reference_location.name
201            'Genève'
202        """
203        formatted_messages = self.prompt.format_messages(query=query)
204
205        try:
206            response = self.structured_llm.invoke(formatted_messages)
207        except Exception as e:
208            raise ParsingError(
209                message=f"LLM invocation failed: {str(e)}",
210                raw_response="",
211                original_error=e,
212            ) from e
213
214        return self._finalize(self._unpack_response(response), query)
215
216    async def aparse(self, query: str) -> GeoQuery:
217        """
218        Asynchronously parse a natural language location query into structured format.
219
220        Async counterpart to :meth:`parse`. Uses ``ainvoke`` on the structured LLM
221        so it can be awaited inside event loops (e.g. FastAPI endpoints) without
222        blocking. Validation is synchronous and runs after the LLM call.
223        """
224        formatted_messages = self.prompt.format_messages(query=query)
225
226        try:
227            response = await self.structured_llm.ainvoke(formatted_messages)
228        except Exception as e:
229            raise ParsingError(
230                message=f"LLM invocation failed: {str(e)}",
231                raw_response="",
232                original_error=e,
233            ) from e
234
235        return self._finalize(self._unpack_response(response), query)
236
237    async def parse_stream(self, query: str) -> AsyncGenerator[dict]:
238        """
239        Parse a natural language location query with streaming reasoning and results.
240
241        This method provides real-time feedback during the parsing process by yielding
242        intermediate reasoning steps and the final GeoQuery result. This is useful for
243        providing users with transparency into the LLM's decision-making process and
244        for building responsive UIs.
245
246        The stream yields dictionaries with the following event types:
247        - {"type": "start"} - Stream started
248        - {"type": "reasoning", "content": str} - Intermediate processing steps
249        - {"type": "data-response", "content": dict} - Final GeoQuery as JSON
250        - {"type": "error", "content": str} - Errors encountered during processing
251        - {"type": "finish"} - Stream completed successfully
252
253        Args:
254            query: Natural language query in any language
255
256        Yields:
257            dict: Stream events with type and optional content fields
258
259        Raises:
260            ParsingError: If LLM fails to parse query into valid structure
261            ValidationError: If parsed query fails business logic validation
262            UnknownRelationError: If spatial relation is not registered
263            LowConfidenceError: If confidence below threshold (strict mode only)
264
265        Examples:
266            Basic usage with async iteration:
267            >>> async for event in parser.parse_stream("restaurants near Lake Geneva"):
268            ...     if event["type"] == "reasoning":
269            ...         print(f"Reasoning: {event['content']}")
270            ...     elif event["type"] == "data-response":
271            ...         geo_query = event["content"]
272            ...         print(f"Location: {geo_query['reference_location']['name']}")
273            ...     elif event["type"] == "error":
274            ...         print(f"Error: {event['content']}")
275
276            Using in a FastAPI streaming endpoint:
277            >>> from fastapi.responses import StreamingResponse
278            >>> @app.get("/stream")
279            >>> async def stream_endpoint(q: str):
280            ...     async def event_stream():
281            ...         async for event in parser.parse_stream(q):
282            ...             yield f"data: {json.dumps(event)}\\n\\n"
283            ...     return StreamingResponse(event_stream(), media_type="text/event-stream")
284        """
285        try:
286            # Signal start of stream
287            yield {"type": "start"}
288
289            yield {"type": "reasoning", "content": "Preparing query for LLM processing"}
290            formatted_messages = self.prompt.format_messages(query=query)
291
292            yield {"type": "reasoning", "content": "Analyzing spatial relationship and location"}
293            try:
294                response = await self.structured_llm.ainvoke(formatted_messages)
295            except Exception as e:
296                yield {"type": "error", "content": f"LLM invocation failed: {str(e)}"}
297                raise ParsingError(
298                    message=f"LLM invocation failed: {str(e)}",
299                    raw_response="",
300                    original_error=e,
301                ) from e
302
303            yield {"type": "reasoning", "content": "Parsing LLM response into structured format"}
304            try:
305                geo_query = self._unpack_response(response)
306            except ParsingError:
307                yield {"type": "error", "content": "Failed to parse response - invalid JSON or missing fields"}
308                raise
309
310            if geo_query.confidence_breakdown.reasoning:
311                yield {
312                    "type": "reasoning",
313                    "content": f"LLM reasoning: {geo_query.confidence_breakdown.reasoning}",
314                }
315
316            yield {"type": "reasoning", "content": "Validating spatial relation configuration"}
317            geo_query = self._finalize(geo_query, query)
318
319            yield {"type": "reasoning", "content": "Query parsing completed successfully"}
320            yield {"type": "data-response", "content": geo_query.model_dump()}
321
322            # Signal successful completion
323            yield {"type": "finish"}
324
325        except Exception as e:
326            # Emit error event before re-raising
327            yield {"type": "error", "content": f"Error during parsing: {str(e)}"}
328            raise
329
330    def parse_batch(self, queries: list[str]) -> list[GeoQuery]:
331        """
332        Parse multiple queries in batch.
333
334        Note: This is a simple sequential implementation.
335        For true parallelization, consider using async methods or ThreadPoolExecutor.
336
337        Args:
338            queries: List of natural language queries
339
340        Returns:
341            List of GeoQuery objects (same order as input)
342
343        Raises:
344            Same exceptions as parse() for any failing query
345        """
346        return [self.parse(query) for query in queries]
347
348    def get_available_relations(self, category: RelationCategory | None = None) -> list[str]:
349        """
350        Get list of available spatial relations.
351
352        Args:
353            category: Optional filter by category ("containment", "buffer", "directional")
354
355        Returns:
356            List of relation names
357        """
358        return self.spatial_config.list_relations(category=category)
359
360    def describe_relation(self, relation_name: str) -> str:
361        """
362        Get description of a spatial relation.
363
364        Args:
365            relation_name: Name of the relation
366
367        Returns:
368            Human-readable description
369
370        Raises:
371            UnknownRelationError: If relation is not registered
372        """
373        config = self.spatial_config.get_config(relation_name)
374        return config.description

Main entry point for parsing natural language location queries.

This class orchestrates the entire parsing pipeline:

  1. Initialize LLM with structured output
  2. Build prompt with spatial relations and examples
  3. Parse query through LLM
  4. Validate and enrich with defaults
  5. Return structured GeoQuery
Examples:

Basic usage:

>>> from langchain.chat_models import init_chat_model
>>> llm = init_chat_model(model="gpt-4o", model_provider="openai", api_key="sk-...")
>>> parser = GeoFilterParser(llm=llm)
>>> result = parser.parse("restaurants in Lausanne")
>>> print(result.reference_location.name)
'Lausanne'

With strict confidence mode:

>>> parser = GeoFilterParser(llm=llm, confidence_threshold=0.8, strict_mode=True)
>>> result = parser.parse("near the station")  # May raise LowConfidenceError
GeoFilterParser( llm: langchain_core.language_models.chat_models.BaseChatModel, spatial_config: SpatialRelationConfig | None = None, confidence_threshold: float = 0.6, strict_mode: bool = False, include_examples: bool = True, datasource: GeoDataSource | None = None, additional_instructions: str | None = None)
44    def __init__(
45        self,
46        llm: BaseChatModel,
47        spatial_config: SpatialRelationConfig | None = None,
48        confidence_threshold: float = 0.6,
49        strict_mode: bool = False,
50        include_examples: bool = True,
51        datasource: GeoDataSource | None = None,
52        additional_instructions: str | None = None,
53    ):
54        """
55        Initialize the parser.
56
57        Args:
58            llm: LangChain LLM instance (required).
59            spatial_config: Spatial relation configuration. If None, uses defaults
60            confidence_threshold: Minimum confidence to accept (0-1)
61            strict_mode: If True, raise error on low confidence. If False, warn only
62            include_examples: Whether to include few-shot examples in prompt
63            datasource: Optional GeoDataSource instance. If provided, the LLM will be informed
64                       about the concrete types available in that datasource for better type inference.
65            additional_instructions: Free-form text injected as a system message after the main
66                       system prompt and before few-shot examples. Use this to add caller-specific
67                       rules such as region-specific endonyms, domain aliases, or
68                       organization-specific place names without forking the default prompt.
69
70        Example:
71            >>> from langchain.chat_models import init_chat_model
72            >>> from etter.datasources.swissnames3d import SwissNames3DSource
73            >>> llm = init_chat_model(model="gpt-4o", model_provider="openai", temperature=0)
74            >>> datasource = SwissNames3DSource("data/")
75            >>> parser = GeoFilterParser(llm=llm, datasource=datasource)
76        """
77        self.llm = llm
78
79        # Initialize spatial config
80        self.spatial_config = spatial_config or SpatialRelationConfig()
81
82        # Settings
83        self.confidence_threshold = confidence_threshold
84        self.strict_mode = strict_mode
85        self.include_examples = include_examples
86        self.datasource = datasource
87        self.additional_instructions = additional_instructions
88
89        # Build structured LLM
90        self.structured_llm = self._build_structured_llm()
91
92        # Build prompt template
93        self.prompt = self._build_prompt()

Initialize the parser.

Arguments:
  • llm: LangChain LLM instance (required).
  • spatial_config: Spatial relation configuration. If None, uses defaults
  • confidence_threshold: Minimum confidence to accept (0-1)
  • strict_mode: If True, raise error on low confidence. If False, warn only
  • include_examples: Whether to include few-shot examples in prompt
  • datasource: Optional GeoDataSource instance. If provided, the LLM will be informed about the concrete types available in that datasource for better type inference.
  • additional_instructions: Free-form text injected as a system message after the main system prompt and before few-shot examples. Use this to add caller-specific rules such as region-specific endonyms, domain aliases, or organization-specific place names without forking the default prompt.
Example:
>>> from langchain.chat_models import init_chat_model
>>> from etter.datasources.swissnames3d import SwissNames3DSource
>>> llm = init_chat_model(model="gpt-4o", model_provider="openai", temperature=0)
>>> datasource = SwissNames3DSource("data/")
>>> parser = GeoFilterParser(llm=llm, datasource=datasource)
llm
spatial_config
confidence_threshold
strict_mode
include_examples
datasource
additional_instructions
structured_llm
prompt
def parse(self, query: str) -> GeoQuery:
149    def parse(self, query: str) -> GeoQuery:
150        """
151        Parse a natural language location query into structured format.
152
153        This is the main method for parsing queries. It:
154        1. Invokes the LLM with structured output
155        2. Validates the spatial relation is registered
156        3. Enriches with default parameters
157        4. Checks confidence threshold
158
159        Args:
160            query: Natural language query in any language
161
162        Returns:
163            GeoQuery: Structured query representation with confidence scores
164
165        Raises:
166            ParsingError: If LLM fails to parse query into valid structure
167            ValidationError: If parsed query fails business logic validation
168            UnknownRelationError: If spatial relation is not registered
169            LowConfidenceError: If confidence below threshold (strict mode only)
170
171        Warns:
172            LowConfidenceWarning: If confidence below threshold (permissive mode)
173
174        Examples:
175            Simple containment query:
176            >>> result = parser.parse("in Bern")
177            >>> result.reference_location.name
178            'Bern'
179            >>> result.spatial_relation.relation
180            'in'
181
182            Buffer query:
183            >>> result = parser.parse("near Lake Geneva")
184            >>> result.spatial_relation.relation
185            'near'
186            >>> result.buffer_config.distance_m
187            5000
188
189            Directional query:
190            >>> result = parser.parse("north of Lausanne")
191            >>> result.spatial_relation.relation
192            'north_of'
193            >>> result.reference_location.name
194            'Lausanne'
195
196            Multilingual:
197            >>> result = parser.parse("près de Genève")
198            >>> result.spatial_relation.relation
199            'near'
200            >>> result.reference_location.name
201            'Genève'
202        """
203        formatted_messages = self.prompt.format_messages(query=query)
204
205        try:
206            response = self.structured_llm.invoke(formatted_messages)
207        except Exception as e:
208            raise ParsingError(
209                message=f"LLM invocation failed: {str(e)}",
210                raw_response="",
211                original_error=e,
212            ) from e
213
214        return self._finalize(self._unpack_response(response), query)

Parse a natural language location query into structured format.

This is the main method for parsing queries. It:

  1. Invokes the LLM with structured output
  2. Validates the spatial relation is registered
  3. Enriches with default parameters
  4. Checks confidence threshold
Arguments:
  • query: Natural language query in any language
Returns:

GeoQuery: Structured query representation with confidence scores

Raises:
  • ParsingError: If LLM fails to parse query into valid structure
  • ValidationError: If parsed query fails business logic validation
  • UnknownRelationError: If spatial relation is not registered
  • LowConfidenceError: If confidence below threshold (strict mode only)
Warns:

LowConfidenceWarning: If confidence below threshold (permissive mode)

Examples:

Simple containment query:

>>> result = parser.parse("in Bern")
>>> result.reference_location.name
'Bern'
>>> result.spatial_relation.relation
'in'

Buffer query:

>>> result = parser.parse("near Lake Geneva")
>>> result.spatial_relation.relation
'near'
>>> result.buffer_config.distance_m
5000

Directional query:

>>> result = parser.parse("north of Lausanne")
>>> result.spatial_relation.relation
'north_of'
>>> result.reference_location.name
'Lausanne'

Multilingual:

>>> result = parser.parse("près de Genève")
>>> result.spatial_relation.relation
'near'
>>> result.reference_location.name
'Genève'
async def aparse(self, query: str) -> GeoQuery:
216    async def aparse(self, query: str) -> GeoQuery:
217        """
218        Asynchronously parse a natural language location query into structured format.
219
220        Async counterpart to :meth:`parse`. Uses ``ainvoke`` on the structured LLM
221        so it can be awaited inside event loops (e.g. FastAPI endpoints) without
222        blocking. Validation is synchronous and runs after the LLM call.
223        """
224        formatted_messages = self.prompt.format_messages(query=query)
225
226        try:
227            response = await self.structured_llm.ainvoke(formatted_messages)
228        except Exception as e:
229            raise ParsingError(
230                message=f"LLM invocation failed: {str(e)}",
231                raw_response="",
232                original_error=e,
233            ) from e
234
235        return self._finalize(self._unpack_response(response), query)

Asynchronously parse a natural language location query into structured format.

Async counterpart to parse(). Uses ainvoke on the structured LLM so it can be awaited inside event loops (e.g. FastAPI endpoints) without blocking. Validation is synchronous and runs after the LLM call.

async def parse_stream(self, query: str) -> AsyncGenerator[dict]:
237    async def parse_stream(self, query: str) -> AsyncGenerator[dict]:
238        """
239        Parse a natural language location query with streaming reasoning and results.
240
241        This method provides real-time feedback during the parsing process by yielding
242        intermediate reasoning steps and the final GeoQuery result. This is useful for
243        providing users with transparency into the LLM's decision-making process and
244        for building responsive UIs.
245
246        The stream yields dictionaries with the following event types:
247        - {"type": "start"} - Stream started
248        - {"type": "reasoning", "content": str} - Intermediate processing steps
249        - {"type": "data-response", "content": dict} - Final GeoQuery as JSON
250        - {"type": "error", "content": str} - Errors encountered during processing
251        - {"type": "finish"} - Stream completed successfully
252
253        Args:
254            query: Natural language query in any language
255
256        Yields:
257            dict: Stream events with type and optional content fields
258
259        Raises:
260            ParsingError: If LLM fails to parse query into valid structure
261            ValidationError: If parsed query fails business logic validation
262            UnknownRelationError: If spatial relation is not registered
263            LowConfidenceError: If confidence below threshold (strict mode only)
264
265        Examples:
266            Basic usage with async iteration:
267            >>> async for event in parser.parse_stream("restaurants near Lake Geneva"):
268            ...     if event["type"] == "reasoning":
269            ...         print(f"Reasoning: {event['content']}")
270            ...     elif event["type"] == "data-response":
271            ...         geo_query = event["content"]
272            ...         print(f"Location: {geo_query['reference_location']['name']}")
273            ...     elif event["type"] == "error":
274            ...         print(f"Error: {event['content']}")
275
276            Using in a FastAPI streaming endpoint:
277            >>> from fastapi.responses import StreamingResponse
278            >>> @app.get("/stream")
279            >>> async def stream_endpoint(q: str):
280            ...     async def event_stream():
281            ...         async for event in parser.parse_stream(q):
282            ...             yield f"data: {json.dumps(event)}\\n\\n"
283            ...     return StreamingResponse(event_stream(), media_type="text/event-stream")
284        """
285        try:
286            # Signal start of stream
287            yield {"type": "start"}
288
289            yield {"type": "reasoning", "content": "Preparing query for LLM processing"}
290            formatted_messages = self.prompt.format_messages(query=query)
291
292            yield {"type": "reasoning", "content": "Analyzing spatial relationship and location"}
293            try:
294                response = await self.structured_llm.ainvoke(formatted_messages)
295            except Exception as e:
296                yield {"type": "error", "content": f"LLM invocation failed: {str(e)}"}
297                raise ParsingError(
298                    message=f"LLM invocation failed: {str(e)}",
299                    raw_response="",
300                    original_error=e,
301                ) from e
302
303            yield {"type": "reasoning", "content": "Parsing LLM response into structured format"}
304            try:
305                geo_query = self._unpack_response(response)
306            except ParsingError:
307                yield {"type": "error", "content": "Failed to parse response - invalid JSON or missing fields"}
308                raise
309
310            if geo_query.confidence_breakdown.reasoning:
311                yield {
312                    "type": "reasoning",
313                    "content": f"LLM reasoning: {geo_query.confidence_breakdown.reasoning}",
314                }
315
316            yield {"type": "reasoning", "content": "Validating spatial relation configuration"}
317            geo_query = self._finalize(geo_query, query)
318
319            yield {"type": "reasoning", "content": "Query parsing completed successfully"}
320            yield {"type": "data-response", "content": geo_query.model_dump()}
321
322            # Signal successful completion
323            yield {"type": "finish"}
324
325        except Exception as e:
326            # Emit error event before re-raising
327            yield {"type": "error", "content": f"Error during parsing: {str(e)}"}
328            raise

Parse a natural language location query with streaming reasoning and results.

This method provides real-time feedback during the parsing process by yielding intermediate reasoning steps and the final GeoQuery result. This is useful for providing users with transparency into the LLM's decision-making process and for building responsive UIs.

The stream yields dictionaries with the following event types:

  • {"type": "start"} - Stream started
  • {"type": "reasoning", "content": str} - Intermediate processing steps
  • {"type": "data-response", "content": dict} - Final GeoQuery as a dict (the result of model_dump())
  • {"type": "error", "content": str} - Errors encountered during processing
  • {"type": "finish"} - Stream completed successfully
Arguments:
  • query: Natural language query in any language
Yields:

dict: Stream events with type and optional content fields

Raises:
  • ParsingError: If LLM fails to parse query into valid structure
  • ValidationError: If parsed query fails business logic validation
  • UnknownRelationError: If spatial relation is not registered
  • LowConfidenceError: If confidence below threshold (strict mode only)
Examples:

Basic usage with async iteration:

>>> async for event in parser.parse_stream("restaurants near Lake Geneva"):
...     if event["type"] == "reasoning":
...         print(f"Reasoning: {event['content']}")
...     elif event["type"] == "data-response":
...         geo_query = event["content"]
...         print(f"Location: {geo_query['reference_location']['name']}")
...     elif event["type"] == "error":
...         print(f"Error: {event['content']}")

Using in a FastAPI streaming endpoint:

>>> from fastapi.responses import StreamingResponse
>>> @app.get("/stream")
>>> async def stream_endpoint(q: str):
...     async def event_stream():
...         async for event in parser.parse_stream(q):
...             yield f"data: {json.dumps(event)}\n\n"
...     return StreamingResponse(event_stream(), media_type="text/event-stream")
def parse_batch(self, queries: list[str]) -> list[GeoQuery]:
330    def parse_batch(self, queries: list[str]) -> list[GeoQuery]:
331        """
332        Parse multiple queries in batch.
333
334        Note: This is a simple sequential implementation.
335        For true parallelization, consider using async methods or ThreadPoolExecutor.
336
337        Args:
338            queries: List of natural language queries
339
340        Returns:
341            List of GeoQuery objects (same order as input)
342
343        Raises:
344            Same exceptions as parse() for any failing query
345        """
346        return [self.parse(query) for query in queries]

Parse multiple queries in batch.

Note: This is a simple sequential implementation. For true parallelization, consider using async methods or ThreadPoolExecutor.

Arguments:
  • queries: List of natural language queries
Returns:

List of GeoQuery objects (same order as input)

Raises:
  • Same exceptions as parse() for any failing query
def get_available_relations( self, category: Optional[Literal['containment', 'buffer', 'directional', 'clipping']] = None) -> list[str]:
348    def get_available_relations(self, category: RelationCategory | None = None) -> list[str]:
349        """
350        Get list of available spatial relations.
351
352        Args:
353            category: Optional filter by category ("containment", "buffer", "directional")
354
355        Returns:
356            List of relation names
357        """
358        return self.spatial_config.list_relations(category=category)

Get list of available spatial relations.

Arguments:
  • category: Optional filter by category ("containment", "buffer", "directional", "clipping")
Returns:

List of relation names

def describe_relation(self, relation_name: str) -> str:
360    def describe_relation(self, relation_name: str) -> str:
361        """
362        Get description of a spatial relation.
363
364        Args:
365            relation_name: Name of the relation
366
367        Returns:
368            Human-readable description
369
370        Raises:
371            UnknownRelationError: If relation is not registered
372        """
373        config = self.spatial_config.get_config(relation_name)
374        return config.description

Get description of a spatial relation.

Arguments:
  • relation_name: Name of the relation
Returns:

Human-readable description

Raises:
  • UnknownRelationError: If relation is not registered
class GeoQuery(pydantic.main.BaseModel):
class GeoQuery(BaseModel):
    """
    Root model representing a parsed geographic query.
    This is the main output structure returned by the parser.
    """

    # The Field descriptions are runtime strings, not comments, and are kept verbatim.
    query_type: Literal["simple", "compound", "split", "boolean"] = Field(
        "simple",
        description="Type of query. Phase 1 only supports 'simple'. "
        "Future: 'compound' = multi-step, 'split' = area division, 'boolean' = AND/OR/NOT operations",
    )
    spatial_relation: SpatialRelation = Field(description="Spatial relationship to reference location")
    reference_location: ReferenceLocation | None = Field(
        None,
        description="Reference location for the spatial query. "
        "None when the query contains no named geographic location.",
    )
    buffer_config: BufferConfig | None = Field(
        None,
        description="Buffer configuration for buffer and directional relations. "
        "Auto-generated with defaults by enrich_with_defaults() if not provided. "
        "Required for 'near', 'around', 'north_of', etc. "
        "Set to None for containment relations ('in').",
    )
    confidence_breakdown: ConfidenceScore = Field(description="Confidence scores for different aspects of the parse")
    original_query: str = Field(
        default="",
        description="Original query text exactly as provided by the user",
    )

    @model_validator(mode="after")
    def validate_buffer_config_consistency(self) -> "GeoQuery":
        """Check that buffer_config is present exactly for the categories that need it."""
        category = self.spatial_relation.category
        relation = self.spatial_relation.relation
        has_buffer = self.buffer_config is not None

        # Buffer and directional relations must come with a buffer_config.
        if not has_buffer and category in ("buffer", "directional"):
            raise ValueError(f"{category} relation '{relation}' requires buffer_config")

        # Containment and clipping relations must not carry one.
        if has_buffer and category in ("containment", "clipping"):
            raise ValueError(f"{category} relation '{relation}' should not have buffer_config")

        return self

Root model representing a parsed geographic query. This is the main output structure returned by the parser.

query_type: Literal['simple', 'compound', 'split', 'boolean'] = 'simple'

Type of query. Phase 1 only supports 'simple'. Future: 'compound' = multi-step, 'split' = area division, 'boolean' = AND/OR/NOT operations

spatial_relation: SpatialRelation = PydanticUndefined

Spatial relationship to reference location

reference_location: ReferenceLocation | None = None

Reference location for the spatial query. None when the query contains no named geographic location.

buffer_config: BufferConfig | None = None

Buffer configuration for buffer and directional relations. Auto-generated with defaults by enrich_with_defaults() if not provided. Required for 'near', 'around', 'north_of', etc. Set to None for containment relations ('in').

confidence_breakdown: ConfidenceScore = PydanticUndefined

Confidence scores for different aspects of the parse

original_query: str = ''

Original query text exactly as provided by the user

@model_validator(mode='after')
def validate_buffer_config_consistency(self) -> GeoQuery:
153    @model_validator(mode="after")
154    def validate_buffer_config_consistency(self) -> "GeoQuery":
155        """Validate buffer_config consistency with relation category."""
156        # Buffer and directional relations must have buffer_config
157        if self.spatial_relation.category in ("buffer", "directional") and self.buffer_config is None:
158            raise ValueError(
159                f"{self.spatial_relation.category} relation '{self.spatial_relation.relation}' requires buffer_config"
160            )
161
162        # Containment and clipping relations should not have buffer_config
163        if self.spatial_relation.category in ("containment", "clipping") and self.buffer_config is not None:
164            raise ValueError(
165                f"{self.spatial_relation.category} relation '{self.spatial_relation.relation}' "
166                f"should not have buffer_config"
167            )
168
169        return self

Validate buffer_config consistency with relation category.

class SpatialRelation(pydantic.main.BaseModel):
class SpatialRelation(BaseModel):
    """A spatial relationship between target and reference."""

    # NOTE(review): the Field descriptions below are runtime strings, not
    # comments. They read like instructions to the LLM that fills this model
    # (presumably via the generated schema - confirm), so reword them with the
    # same care as code.

    # Relation keyword such as "in" or "near". Typed as a plain str: this class
    # does not check the value against any registry of known relations.
    relation: str = Field(
        description="Spatial relation keyword. Examples: 'in', 'near', 'around', 'north_of', "
        "'on_shores_of', 'in_the_heart_of', etc. Use the exact relation name from the available list."
    )
    # One of the RelationCategory values. GeoQuery's validator reads this to
    # decide whether a buffer_config is required or forbidden.
    category: RelationCategory = Field(
        description="Category of spatial relation. "
        "'containment' = exact boundary matching (in), "
        "'buffer' = proximity or erosion operations (near, around, on_shores_of, in_the_heart_of, bordering), "
        "'directional' = sector-based queries (north_of, south_of, east_of, west_of), "
        "'clipping' = clip reference to a directional half (northern_part_of, southern_part_of, etc.)"
    )
    # Distance in meters taken from the query text; None (the default) when the
    # user stated no distance.
    explicit_distance: float | None = Field(
        None,
        description="Distance in meters if explicitly mentioned by user. "
        "For example: 'within 5km' → 5000, 'within 500 meters' → 500. "
        "Leave null if not explicitly stated.",
    )

A spatial relationship between target and reference.

relation: str = PydanticUndefined

Spatial relation keyword. Examples: 'in', 'near', 'around', 'north_of', 'on_shores_of', 'in_the_heart_of', etc. Use the exact relation name from the available list.

category: Literal['containment', 'buffer', 'directional', 'clipping'] = PydanticUndefined

Category of spatial relation. 'containment' = exact boundary matching (in), 'buffer' = proximity or erosion operations (near, around, on_shores_of, in_the_heart_of, bordering), 'directional' = sector-based queries (north_of, south_of, east_of, west_of), 'clipping' = clip reference to a directional half (northern_part_of, southern_part_of, etc.)

explicit_distance: float | None = None

Distance in meters if explicitly mentioned by user. For example: 'within 5km' → 5000, 'within 500 meters' → 500. Leave null if not explicitly stated.

class ReferenceLocation(pydantic.main.BaseModel):
class ReferenceLocation(BaseModel):
    """A geographic reference location extracted from the query."""

    # NOTE(review): the Field descriptions are runtime strings that read like
    # LLM instructions (presumably exposed through the schema - confirm);
    # they are not comments.

    # Location name exactly as it appears in the query; the only required field.
    name: str = Field(description="Location name as mentioned in the query (e.g., 'Lausanne', 'Lake Geneva')")
    # FIXME: should `type` be constrained to an enum of known feature types
    # instead of a free-form string?
    # Optional feature-type hint; per its description it is used for ranking,
    # not as a strict filter.
    type: str | None = Field(
        None,
        description="Type hint for geographic feature (city, lake, mountain, canton, country, "
        "train_station, airport, river, road, etc.). This is a HINT for ranking results, "
        "NOT a strict filter. For ambiguous cases (e.g., 'Bern' could be city or canton, "
        "'Rhone' could be river or road), provide your best guess or leave null. "
        "The datasource will return multiple types ranked by relevance.",
    )
    # Optional confidence in the `type` hint; ConfidenceLevel is a float
    # constrained to the range 0..1.
    type_confidence: ConfidenceLevel | None = Field(
        None,
        description="Confidence in the type inference (0-1). High confidence (>0.8) when type is "
        "explicit in query (e.g., 'Lake Geneva'). Low confidence (<0.6) when ambiguous "
        "(e.g., 'Bern', 'Rhone'). Use spatial relation as hint: 'along X' → river/road, "
        "'in X' → city/region, 'on X' → lake/mountain.",
    )

A geographic reference location extracted from the query.

name: str = PydanticUndefined

Location name as mentioned in the query (e.g., 'Lausanne', 'Lake Geneva')

type: str | None = None

Type hint for geographic feature (city, lake, mountain, canton, country, train_station, airport, river, road, etc.). This is a HINT for ranking results, NOT a strict filter. For ambiguous cases (e.g., 'Bern' could be city or canton, 'Rhone' could be river or road), provide your best guess or leave null. The datasource will return multiple types ranked by relevance.

type_confidence: Optional[Annotated[float, FieldInfo(annotation=NoneType, required=True, description='Confidence score between 0 and 1', metadata=[Ge(ge=0.0), Le(le=1.0)])]] = None

Confidence in the type inference (0-1). High confidence (>0.8) when type is explicit in query (e.g., 'Lake Geneva'). Low confidence (<0.6) when ambiguous (e.g., 'Bern', 'Rhone'). Use spatial relation as hint: 'along X' → river/road, 'in X' → city/region, 'on X' → lake/mountain.

class BufferConfig(pydantic.main.BaseModel):
class BufferConfig(BaseModel):
    """Configuration for buffer-based spatial operations."""

    # The Field descriptions are runtime strings, not comments, and are kept verbatim.
    distance_m: float = Field(
        description="Buffer distance in meters. Positive values expand outward (proximity), "
        "negative values erode inward (e.g., 'in the heart of'). "
        "Examples: 5000 = 5km radius, -500 = 500m erosion"
    )
    buffer_from: Literal["center", "boundary"] = Field(
        description="Buffer origin. 'center' = buffer from centroid point (for proximity), "
        "'boundary' = buffer from polygon boundary (for shores, along roads, erosion)"
    )
    ring_only: bool = Field(
        False,
        description="If True, exclude the reference feature itself to create a ring/donut shape. "
        "Used for queries like 'on the shores of Lake X' (exclude the lake water itself). "
        "Only valid with buffer_from='boundary'.",
    )
    side: Literal["left", "right"] | None = Field(
        None,
        description="Side of a linear feature for one-sided buffer. "
        "'left' = left side relative to line direction, 'right' = right side. "
        "None = both sides (symmetric buffer). Populated from relation config by enrich_with_defaults().",
    )
    inferred: bool = Field(
        True,
        description="True if this configuration was inferred from relation defaults. "
        "False if the user explicitly specified distance or buffer parameters.",
    )

    @model_validator(mode="after")
    def validate_ring_only(self) -> "BufferConfig":
        """Reject a ring-only buffer that is measured from the center point."""
        # Without ring_only there is nothing to check.
        if not self.ring_only:
            return self
        if self.buffer_from == "center":
            raise ValueError("ring_only=True requires buffer_from='boundary' (cannot create ring from center point)")
        return self

Configuration for buffer-based spatial operations.

distance_m: float = PydanticUndefined

Buffer distance in meters. Positive values expand outward (proximity), negative values erode inward (e.g., 'in the heart of'). Examples: 5000 = 5km radius, -500 = 500m erosion

buffer_from: Literal['center', 'boundary'] = PydanticUndefined

Buffer origin. 'center' = buffer from centroid point (for proximity), 'boundary' = buffer from polygon boundary (for shores, along roads, erosion)

ring_only: bool = False

If True, exclude the reference feature itself to create a ring/donut shape. Used for queries like 'on the shores of Lake X' (exclude the lake water itself). Only valid with buffer_from='boundary'.

side: Optional[Literal['left', 'right']] = None

Side of a linear feature for one-sided buffer. 'left' = left side relative to line direction, 'right' = right side. None = both sides (symmetric buffer). Populated from relation config by enrich_with_defaults().

inferred: bool = True

True if this configuration was inferred from relation defaults. False if the user explicitly specified distance or buffer parameters.

@model_validator(mode='after')
def validate_ring_only(self) -> BufferConfig:
93    @model_validator(mode="after")
94    def validate_ring_only(self) -> "BufferConfig":
95        """Validate that ring_only is only used with boundary buffers."""
96        if self.ring_only and self.buffer_from == "center":
97            raise ValueError("ring_only=True requires buffer_from='boundary' (cannot create ring from center point)")
98        return self

Validate that ring_only is only used with boundary buffers.

class ConfidenceScore(pydantic.main.BaseModel):
class ConfidenceScore(BaseModel):
    """Confidence scores for different aspects of the parsed query."""

    # NOTE(review): the Field descriptions are runtime strings that read like
    # LLM instructions (presumably exposed through the schema - confirm);
    # they are not comments.
    # The three score fields are required; ConfidenceLevel is a float
    # constrained to the range 0..1.

    # Score for the parse as a whole.
    overall: ConfidenceLevel = Field(
        description="Overall confidence score for the entire query parse. "
        "0.9-1.0 = highly confident, 0.7-0.9 = confident, 0.5-0.7 = uncertain, <0.5 = very uncertain",
    )
    # Score for the reference-location identification.
    location_confidence: ConfidenceLevel = Field(
        description="Confidence in correctly identifying the reference location",
    )
    # Score for the spatial-relation identification.
    relation_confidence: ConfidenceLevel = Field(
        description="Confidence in correctly identifying the spatial relation",
    )
    # Optional free-text explanation of the scores; defaults to None even though
    # the description asks for it to always be supplied.
    reasoning: str | None = Field(
        None,
        description="Explanation for confidence scores. Always include reasoning for clarity and debugging. "
        "For example: 'Ambiguous location name', 'Unclear spatial relationship', 'High confidence in location matching', etc.",
    )

Confidence scores for different aspects of the parsed query.

overall: Annotated[float, FieldInfo(annotation=NoneType, required=True, description='Confidence score between 0 and 1', metadata=[Ge(ge=0.0), Le(le=1.0)])] = PydanticUndefined

Overall confidence score for the entire query parse. 0.9-1.0 = highly confident, 0.7-0.9 = confident, 0.5-0.7 = uncertain, <0.5 = very uncertain

location_confidence: Annotated[float, FieldInfo(annotation=NoneType, required=True, description='Confidence score between 0 and 1', metadata=[Ge(ge=0.0), Le(le=1.0)])] = PydanticUndefined

Confidence in correctly identifying the reference location

relation_confidence: Annotated[float, FieldInfo(annotation=NoneType, required=True, description='Confidence score between 0 and 1', metadata=[Ge(ge=0.0), Le(le=1.0)])] = PydanticUndefined

Confidence in correctly identifying the spatial relation

reasoning: str | None = None

Explanation for confidence scores. Always include reasoning for clarity and debugging. For example: 'Ambiguous location name', 'Unclear spatial relationship', 'High confidence in location matching', etc.

ConfidenceLevel = typing.Annotated[float, FieldInfo(annotation=NoneType, required=True, description='Confidence score between 0 and 1', metadata=[Ge(ge=0.0), Le(le=1.0)])]
GeometryFormat = typing.Literal['geojson', 'wkt', 'wkb']
class SpatialRelationConfig:
 41class SpatialRelationConfig:
 42    """
 43    Registry and configuration for spatial relations.
 44
 45    Manages built-in and custom spatial relations with their default parameters.
 46    """
 47
 48    def __init__(self):
 49        """Initialize with built-in spatial relations."""
 50        self.relations: dict[str, RelationConfig] = {}
 51        self._initialize_defaults()
 52
 53    def _initialize_defaults(self):
 54        """Register built-in spatial relations from ARCHITECTURE.md."""
 55
 56        # ===== CONTAINMENT RELATIONS =====
 57        self.register_relation(
 58            RelationConfig(
 59                name="in",
 60                category="containment",
 61                description="Feature is within the reference boundary",
 62            )
 63        )
 64
 65        # ===== BUFFER/PROXIMITY RELATIONS =====
 66        self.register_relation(
 67            RelationConfig(
 68                name="around",
 69                category="buffer",
 70                description="Proximity search around a point with default 1km radius",
 71                default_distance_m=1000,
 72                buffer_from="center",
 73            )
 74        )
 75
 76        self.register_relation(
 77            RelationConfig(
 78                name="near",
 79                category="buffer",
 80                description="Proximity search with default 5km radius",
 81                default_distance_m=5000,
 82                buffer_from="center",
 83            )
 84        )
 85
 86        self.register_relation(
 87            RelationConfig(
 88                name="on_shores_of",
 89                category="buffer",
 90                description="Ring buffer around lake/water boundary, excluding the water body itself",
 91                default_distance_m=1000,
 92                buffer_from="boundary",
 93                ring_only=True,
 94            )
 95        )
 96
 97        self.register_relation(
 98            RelationConfig(
 99                name="along",
100                category="buffer",
101                description="Buffer following a linear feature like a river or road",
102                default_distance_m=500,
103                buffer_from="boundary",
104            )
105        )
106
107        self.register_relation(
108            RelationConfig(
109                name="left_bank",
110                category="buffer",
111                description="Left bank of a linear feature (river, road) relative to its direction/flow",
112                default_distance_m=500,
113                buffer_from="boundary",
114                side="left",
115            )
116        )
117
118        self.register_relation(
119            RelationConfig(
120                name="right_bank",
121                category="buffer",
122                description="Right bank of a linear feature (river, road) relative to its direction/flow",
123                default_distance_m=500,
124                buffer_from="boundary",
125                side="right",
126            )
127        )
128
129        self.register_relation(
130            RelationConfig(
131                name="in_the_heart_of",
132                category="buffer",
133                description="Central area excluding periphery (negative buffer - erosion)",
134                default_distance_m=-500,
135                buffer_from="boundary",
136            )
137        )
138
139        self.register_relation(
140            RelationConfig(
141                name="bordering",
142                category="buffer",
143                description="Thin ring just outside the reference boundary, for land-border adjacency queries (e.g. 'cities bordering Germany')",
144                default_distance_m=2000,
145                buffer_from="boundary",
146                ring_only=True,
147            )
148        )
149
150        # ===== CLIPPING RELATIONS =====
151        # Clip the reference geometry to a directional half-plane using bbox intersection.
152        # These answer "what is in the northern/southern/eastern/western portion of X?"
153        # as opposed to directional relations which answer "what is north/south/etc. of X?".
154        self.register_relation(
155            RelationConfig(
156                name="northern_part_of",
157                category="clipping",
158                description="Northern half of the reference geometry (bbox clip to upper half)",
159                clip_direction="north",
160            )
161        )
162
163        self.register_relation(
164            RelationConfig(
165                name="southern_part_of",
166                category="clipping",
167                description="Southern half of the reference geometry (bbox clip to lower half)",
168                clip_direction="south",
169            )
170        )
171
172        self.register_relation(
173            RelationConfig(
174                name="eastern_part_of",
175                category="clipping",
176                description="Eastern half of the reference geometry (bbox clip to right half)",
177                clip_direction="east",
178            )
179        )
180
181        self.register_relation(
182            RelationConfig(
183                name="western_part_of",
184                category="clipping",
185                description="Western half of the reference geometry (bbox clip to left half)",
186                clip_direction="west",
187            )
188        )
189
190        # ===== DIRECTIONAL RELATIONS =====
191        # All directional relations use consistent defaults:
192        # - Distance: 10km radius (default_distance_m=10000)
193        # - Sector: 90° angular wedge (sector_angle_degrees=90)
194        # - Origin: Centroid of reference location (buffer_from="center" set in enrich_with_defaults)
195        # These defaults are applied automatically by enrich_with_defaults() for any directional query.
196        # Convention: 0° = North, angles increase clockwise (90° = East, 180° = South, 270° = West)
197        self.register_relation(
198            RelationConfig(
199                name="north_of",
200                category="directional",
201                description="Directional sector north of reference",
202                default_distance_m=10000,
203                sector_angle_degrees=90,
204                direction_angle_degrees=0,
205            )
206        )
207
208        self.register_relation(
209            RelationConfig(
210                name="south_of",
211                category="directional",
212                description="Directional sector south of reference",
213                default_distance_m=10000,
214                sector_angle_degrees=90,
215                direction_angle_degrees=180,
216            )
217        )
218
219        self.register_relation(
220            RelationConfig(
221                name="east_of",
222                category="directional",
223                description="Directional sector east of reference",
224                default_distance_m=10000,
225                sector_angle_degrees=90,
226                direction_angle_degrees=90,
227            )
228        )
229
230        self.register_relation(
231            RelationConfig(
232                name="west_of",
233                category="directional",
234                description="Directional sector west of reference",
235                default_distance_m=10000,
236                sector_angle_degrees=90,
237                direction_angle_degrees=270,
238            )
239        )
240
241        # ===== DIAGONAL DIRECTIONAL RELATIONS =====
242        self.register_relation(
243            RelationConfig(
244                name="northeast_of",
245                category="directional",
246                description="Directional sector northeast of reference",
247                default_distance_m=10000,
248                sector_angle_degrees=90,
249                direction_angle_degrees=45,
250            )
251        )
252
253        self.register_relation(
254            RelationConfig(
255                name="southeast_of",
256                category="directional",
257                description="Directional sector southeast of reference",
258                default_distance_m=10000,
259                sector_angle_degrees=90,
260                direction_angle_degrees=135,
261            )
262        )
263
264        self.register_relation(
265            RelationConfig(
266                name="southwest_of",
267                category="directional",
268                description="Directional sector southwest of reference",
269                default_distance_m=10000,
270                sector_angle_degrees=90,
271                direction_angle_degrees=225,
272            )
273        )
274
275        self.register_relation(
276            RelationConfig(
277                name="northwest_of",
278                category="directional",
279                description="Directional sector northwest of reference",
280                default_distance_m=10000,
281                sector_angle_degrees=90,
282                direction_angle_degrees=315,
283            )
284        )
285
286    def register_relation(self, config: RelationConfig) -> None:
287        """Register a new spatial relation."""
288        self.relations[config.name] = config
289
290    def has_relation(self, name: str) -> bool:
291        """Check if a relation is registered."""
292        return name in self.relations
293
294    def get_config(self, name: str) -> RelationConfig:
295        """Get configuration for a relation. Raises UnknownRelationError if not found."""
296        if not self.has_relation(name):
297            raise UnknownRelationError(
298                f"Unknown spatial relation: '{name}'. Available relations: {', '.join(sorted(self.relations.keys()))}",
299                relation_name=name,
300            )
301        return self.relations[name]
302
303    def list_relations(self, category: RelationCategory | None = None) -> list[str]:
304        """List available relation names."""
305        if category is None:
306            return sorted(self.relations.keys())
307        return sorted(r.name for r in self.relations.values() if r.category == category)
308
309    def format_for_prompt(self) -> str:
310        """Format relations for inclusion in LLM prompt."""
311        lines = []
312
313        # Group by category
314        for category in get_args(RelationCategory):
315            category_relations = [r for r in self.relations.values() if r.category == category]
316            if not category_relations:
317                continue
318
319            lines.append(f"\n{category.upper()} RELATIONS:")
320
321            for rel in sorted(category_relations, key=lambda r: r.name):
322                # Build distance info
323                dist_info = ""
324                if rel.default_distance_m is not None:
325                    dist_str = f"{abs(rel.default_distance_m)}m"
326                    if rel.default_distance_m < 0:
327                        dist_info = f" (default: {dist_str} erosion)"
328                    else:
329                        dist_info = f" (default: {dist_str})"
330
331                # Build special flags
332                flags = []
333                if rel.ring_only:
334                    flags.append("ring buffer")
335                if rel.buffer_from:
336                    flags.append(f"from {rel.buffer_from}")
337                if rel.side:
338                    flags.append(f"{rel.side} side only")
339                flag_info = f" [{', '.join(flags)}]" if flags else ""
340
341                # Format line
342                lines.append(f"  • {rel.name}{dist_info}{flag_info}")
343                lines.append(f"    {rel.description}")
344
345        # Add notes
346        lines.append("\nNOTES:")
347        lines.append("  • Negative distances indicate erosion/shrinking (e.g., in_the_heart_of)")
348        lines.append("  • Ring buffers exclude the reference feature itself (e.g., shores of lake, bordering)")
349        lines.append("  • Buffer from 'center' vs 'boundary' determines buffer origin")
350        lines.append("  • Clipping relations return a sub-area of the reference geometry (not a buffer outward)")
351
352        return "\n".join(lines)

Registry and configuration for spatial relations.

Manages built-in and custom spatial relations with their default parameters.

SpatialRelationConfig()
    def __init__(self) -> None:
        """Initialize with built-in spatial relations."""
        # Registry keyed by relation name; entries are added by register_relation().
        self.relations: dict[str, RelationConfig] = {}
        # Must run after self.relations exists: the built-ins are registered into it.
        self._initialize_defaults()

Initialize with built-in spatial relations.

relations: dict[str, RelationConfig]
def register_relation(self, config: RelationConfig) -> None:
286    def register_relation(self, config: RelationConfig) -> None:
287        """Register a new spatial relation."""
288        self.relations[config.name] = config

Register a new spatial relation.

def has_relation(self, name: str) -> bool:
290    def has_relation(self, name: str) -> bool:
291        """Check if a relation is registered."""
292        return name in self.relations

Check if a relation is registered.

def get_config(self, name: str) -> RelationConfig:
294    def get_config(self, name: str) -> RelationConfig:
295        """Get configuration for a relation. Raises UnknownRelationError if not found."""
296        if not self.has_relation(name):
297            raise UnknownRelationError(
298                f"Unknown spatial relation: '{name}'. Available relations: {', '.join(sorted(self.relations.keys()))}",
299                relation_name=name,
300            )
301        return self.relations[name]

Get configuration for a relation. Raises UnknownRelationError if not found.

def list_relations( self, category: Optional[Literal['containment', 'buffer', 'directional', 'clipping']] = None) -> list[str]:
303    def list_relations(self, category: RelationCategory | None = None) -> list[str]:
304        """List available relation names."""
305        if category is None:
306            return sorted(self.relations.keys())
307        return sorted(r.name for r in self.relations.values() if r.category == category)

List available relation names.

def format_for_prompt(self) -> str:
309    def format_for_prompt(self) -> str:
310        """Format relations for inclusion in LLM prompt."""
311        lines = []
312
313        # Group by category
314        for category in get_args(RelationCategory):
315            category_relations = [r for r in self.relations.values() if r.category == category]
316            if not category_relations:
317                continue
318
319            lines.append(f"\n{category.upper()} RELATIONS:")
320
321            for rel in sorted(category_relations, key=lambda r: r.name):
322                # Build distance info
323                dist_info = ""
324                if rel.default_distance_m is not None:
325                    dist_str = f"{abs(rel.default_distance_m)}m"
326                    if rel.default_distance_m < 0:
327                        dist_info = f" (default: {dist_str} erosion)"
328                    else:
329                        dist_info = f" (default: {dist_str})"
330
331                # Build special flags
332                flags = []
333                if rel.ring_only:
334                    flags.append("ring buffer")
335                if rel.buffer_from:
336                    flags.append(f"from {rel.buffer_from}")
337                if rel.side:
338                    flags.append(f"{rel.side} side only")
339                flag_info = f" [{', '.join(flags)}]" if flags else ""
340
341                # Format line
342                lines.append(f"  • {rel.name}{dist_info}{flag_info}")
343                lines.append(f"    {rel.description}")
344
345        # Add notes
346        lines.append("\nNOTES:")
347        lines.append("  • Negative distances indicate erosion/shrinking (e.g., in_the_heart_of)")
348        lines.append("  • Ring buffers exclude the reference feature itself (e.g., shores of lake, bordering)")
349        lines.append("  • Buffer from 'center' vs 'boundary' determines buffer origin")
350        lines.append("  • Clipping relations return a sub-area of the reference geometry (not a buffer outward)")
351
352        return "\n".join(lines)

Format relations for inclusion in LLM prompt.

@dataclass
class RelationConfig:
@dataclass
class RelationConfig:
    """
    Settings describing one spatial relation.

    Only ``name``, ``category`` and ``description`` are required; every other
    field is optional and defaults to "not set".

    Attributes:
        name: Identifier of the relation (e.g., "in", "near", "north_of").
        category: Kind of spatial operation the relation belongs to.
        description: Human-readable text shown to the LLM in the prompt.
        default_distance_m: Default buffer distance, in meters.
        buffer_from: Origin of the buffer ("center" or "boundary").
        ring_only: When True, the reference feature is excluded so the result
            is a ring buffer.
        side: Restricts the relation to the "left" or "right" side.
        sector_angle_degrees: Angular sector used for directional queries.
        direction_angle_degrees: Direction in degrees, measured clockwise
            (0=North, 90=East, 180=South, 270=West).
        clip_direction: Cardinal direction ("north", "south", "east", "west");
            presumably the part of the reference geometry kept by clipping
            relations - confirm against the spatial implementation.
    """

    # Required fields
    name: str
    category: RelationCategory
    description: str

    # Optional tuning, unset by default
    default_distance_m: float | None = None
    buffer_from: Literal["center", "boundary"] | None = None
    ring_only: bool = False
    side: Literal["left", "right"] | None = None
    sector_angle_degrees: float | None = None
    direction_angle_degrees: float | None = None
    clip_direction: Literal["north", "south", "east", "west"] | None = None

Configuration for a single spatial relation.

Attributes:
  • name: Relation identifier (e.g., "in", "near", "north_of")
  • category: Type of spatial operation
  • description: Human-readable description for LLM prompt
  • default_distance_m: Default buffer distance in meters
  • buffer_from: Buffer origin
  • ring_only: Exclude reference feature to create ring buffer
  • sector_angle_degrees: Angular sector for directional queries
  • direction_angle_degrees: Direction angle in degrees (0=North, 90=East, 180=South, 270=West, clockwise)
RelationConfig( name: str, category: Literal['containment', 'buffer', 'directional', 'clipping'], description: str, default_distance_m: float | None = None, buffer_from: Optional[Literal['center', 'boundary']] = None, ring_only: bool = False, side: Optional[Literal['left', 'right']] = None, sector_angle_degrees: float | None = None, direction_angle_degrees: float | None = None, clip_direction: Optional[Literal['north', 'south', 'east', 'west']] = None)
name: str
category: Literal['containment', 'buffer', 'directional', 'clipping']
description: str
default_distance_m: float | None = None
buffer_from: Optional[Literal['center', 'boundary']] = None
ring_only: bool = False
side: Optional[Literal['left', 'right']] = None
sector_angle_degrees: float | None = None
direction_angle_degrees: float | None = None
clip_direction: Optional[Literal['north', 'south', 'east', 'west']] = None
class GeoFilterError(builtins.Exception):
 7class GeoFilterError(Exception):
 8    """Base exception for all GeoFilter errors."""
 9
10    pass

Base exception for all GeoFilter errors.

class ParsingError(etter.GeoFilterError):
class ParsingError(GeoFilterError):
    """Raised when the LLM output could not be parsed into a valid structure."""

    def __init__(self, message: str, raw_response: str = "", original_error: Exception | None = None):
        """
        Build the error and keep the parsing context on the instance.

        Args:
            message: Error description (becomes the exception message).
            raw_response: Raw response from the LLM, stored as ``raw_response``.
            original_error: Exception that caused the parsing failure, stored
                as ``original_error``.
        """
        super().__init__(message)
        self.raw_response = raw_response
        self.original_error = original_error

LLM failed to parse query into valid structure.

ParsingError( message: str, raw_response: str = '', original_error: Exception | None = None)
16    def __init__(self, message: str, raw_response: str = "", original_error: Exception | None = None):
17        """
18        Initialize parsing error.
19
20        Args:
21            message: Error description
22            raw_response: Raw response from LLM
23            original_error: Original exception that caused parsing failure
24        """
25        self.raw_response = raw_response
26        self.original_error = original_error
27        super().__init__(message)

Initialize parsing error.

Arguments:
  • message: Error description
  • raw_response: Raw response from LLM
  • original_error: Original exception that caused parsing failure
raw_response
original_error
class ValidationError(etter.GeoFilterError):
class ValidationError(GeoFilterError):
    """Raised when well-formed structured output violates business rules."""

    def __init__(self, message: str, field: str | None = None, detail: str | None = None):
        """
        Build the error and record which field failed.

        Args:
            message: Error description (becomes the exception message).
            field: Name of the field that failed validation, stored as ``field``.
            detail: Extra information about the failure, stored as ``detail``.
        """
        super().__init__(message)
        self.field = field
        self.detail = detail

Structured output is valid but fails business logic validation.

ValidationError(message: str, field: str | None = None, detail: str | None = None)
33    def __init__(self, message: str, field: str | None = None, detail: str | None = None):
34        """
35        Initialize validation error.
36
37        Args:
38            message: Error description
39            field: Field name that failed validation
40            detail: Additional detail about the validation failure
41        """
42        self.field = field
43        self.detail = detail
44        super().__init__(message)

Initialize validation error.

Arguments:
  • message: Error description
  • field: Field name that failed validation
  • detail: Additional detail about the validation failure
field
detail
class NoReferenceLocationError(etter.ValidationError):
class NoReferenceLocationError(ValidationError):
    """Raised when a query names no geographic reference location."""

    def __init__(self, message: str):
        """
        Build the error for the ``reference_location`` field.

        Args:
            message: Error description. The failing field passed to the parent
                class is always "reference_location".
        """
        failing_field = "reference_location"
        super().__init__(message, field=failing_field)

Query contains no named geographic reference location.

NoReferenceLocationError(message: str)
50    def __init__(self, message: str):
51        super().__init__(message, field="reference_location")

Initialize the error for a query without a reference location.

Arguments:
  • message: Error description
  • field: Not a constructor argument here; it is always set to "reference_location"
  • detail: Not a constructor argument here; it keeps the parent default (None)
class UnknownRelationError(etter.ValidationError):
class UnknownRelationError(ValidationError):
    """Spatial relation is not registered in configuration."""

    def __init__(self, message: str, relation_name: str):
        """
        Initialize unknown relation error.

        Args:
            message: Error description
            relation_name: The unknown relation name
        """
        self.relation_name = relation_name
        super().__init__(message, field="spatial_relation")

    def __reduce__(self):
        """
        Support ``copy`` and ``pickle`` despite the extra required argument.

        The default exception reduction rebuilds the object from ``self.args``,
        which only holds the message, so reconstruction would fail with a
        TypeError for the missing ``relation_name``. Returning the full
        constructor arguments (plus the instance dict as state) avoids that.
        """
        message = self.args[0] if self.args else ""
        return (type(self), (message, self.relation_name), self.__dict__)

Spatial relation is not registered in configuration.

UnknownRelationError(message: str, relation_name: str)
57    def __init__(self, message: str, relation_name: str):
58        """
59        Initialize unknown relation error.
60
61        Args:
62            message: Error description
63            relation_name: The unknown relation name
64        """
65        self.relation_name = relation_name
66        super().__init__(message, field="spatial_relation")

Initialize unknown relation error.

Arguments:
  • message: Error description
  • relation_name: The unknown relation name
relation_name
class LowConfidenceError(etter.GeoFilterError):
class LowConfidenceError(GeoFilterError):
    """Query confidence is below threshold (strict mode)."""

    def __init__(self, message: str, confidence: float, reasoning: str | None = None):
        """
        Initialize low confidence error.

        Args:
            message: Error description
            confidence: Confidence score (0-1)
            reasoning: Optional explanation for low confidence
        """
        self.confidence = confidence
        self.reasoning = reasoning
        super().__init__(message)

    def __reduce__(self):
        """
        Support ``copy`` and ``pickle`` despite the extra required argument.

        The default exception reduction rebuilds the object from ``self.args``,
        which only holds the message, so reconstruction would fail with a
        TypeError for the missing ``confidence``. Returning the full
        constructor arguments (plus the instance dict as state) avoids that.
        """
        message = self.args[0] if self.args else ""
        return (type(self), (message, self.confidence, self.reasoning), self.__dict__)

Query confidence is below threshold (strict mode).

LowConfidenceError(message: str, confidence: float, reasoning: str | None = None)
72    def __init__(self, message: str, confidence: float, reasoning: str | None = None):
73        """
74        Initialize low confidence error.
75
76        Args:
77            message: Error description
78            confidence: Confidence score (0-1)
79            reasoning: Optional explanation for low confidence
80        """
81        self.confidence = confidence
82        self.reasoning = reasoning
83        super().__init__(message)

Initialize low confidence error.

Arguments:
  • message: Error description
  • confidence: Confidence score (0-1)
  • reasoning: Optional explanation for low confidence
confidence
reasoning
class LowConfidenceWarning(builtins.UserWarning):
class LowConfidenceWarning(UserWarning):
    """Query confidence is below threshold (permissive mode)."""

    def __init__(self, confidence: float, message: str = ""):
        """
        Initialize low confidence warning.

        Args:
            confidence: Confidence score (0-1)
            message: Warning message
        """
        self.confidence = confidence
        super().__init__(message)

    def __reduce__(self):
        """
        Support ``copy`` and ``pickle`` without losing the warning message.

        The default exception reduction rebuilds the object from ``self.args``
        (just the message). Because ``confidence`` is the first constructor
        parameter, the message would be passed as the confidence and the
        rebuilt warning would carry an empty message. Returning the
        constructor arguments in their real order fixes that; the instance
        dict is passed along as state.
        """
        message = self.args[0] if self.args else ""
        return (type(self), (self.confidence, message), self.__dict__)

Query confidence is below threshold (permissive mode).

LowConfidenceWarning(confidence: float, message: str = '')
89    def __init__(self, confidence: float, message: str = ""):
90        """
91        Initialize low confidence warning.
92
93        Args:
94            confidence: Confidence score (0-1)
95            message: Warning message
96        """
97        self.confidence = confidence
98        super().__init__(message)

Initialize low confidence warning.

Arguments:
  • confidence: Confidence score (0-1)
  • message: Warning message
confidence
class GeoDataSource(typing.Protocol):
class GeoDataSource(Protocol):
    """
    Structural interface every geographic data source must satisfy.

    A data source turns location names into geographic features. Features are
    standard GeoJSON Feature objects (dicts) expressed in WGS84 (EPSG:4326).

    Shape of a returned feature:
        {
            "type": "Feature",
            "id": "uuid-123",
            "geometry": {"type": "Point", "coordinates": [8.5, 47.3]},
            "bbox": [8.4, 47.3, 8.6, 47.4],
            "properties": {
                "name": "Zürich",
                "type": "city",
                "confidence": 1.0,
                ...
            }
        }
    """

    def search(
        self,
        name: str,
        type: str | None = None,
        max_results: int = 10,
    ) -> list[Feature]:
        """
        Look up geographic features by name.

        Args:
            name: Location name to look for (e.g., "Lake Geneva", "Bern").
            type: Optional type hint used to filter or rank the results,
                  such as "lake", "city", "mountain", "canton" or "river".
                  Features of a matching type are ranked higher.
            max_results: Upper bound on the number of returned features.

        Returns:
            Matching GeoJSON Feature dicts ordered by relevance; an empty list
            when nothing matches.
        """
        ...

    def get_by_id(self, feature_id: str) -> Feature | None:
        """
        Fetch a single feature by its unique identifier.

        Args:
            feature_id: Identifier as issued by the data source.

        Returns:
            The GeoJSON Feature dict with that identifier, or None when no such
            feature exists.
        """
        ...

    def get_available_types(self) -> list[str]:
        """
        List the concrete geographic types this data source can return.

        These are the values (e.g., "lake", "city", "restaurant") the data
        source puts in the "type" property of its features. They can be
        matched against the location type hierarchy for fuzzy matching, and
        should be a subset of, or mapped to, the standard hierarchy defined in
        location_types.TYPE_HIERARCHY.

        Returns:
            Concrete type strings (e.g., ["lake", "river", "city", "mountain"]),
            or an empty list when the data source has no type information.

        Example:
            >>> source = SwissNames3DSource("data/")
            >>> types = source.get_available_types()
            >>> print(types)
            ['lake', 'river', 'city', 'mountain', 'peak', 'hill', ...]
        """
        ...

Protocol for geographic data sources.

Implementations resolve location names to geographic features. Features are returned as standard GeoJSON Feature objects (dicts) in WGS84 (EPSG:4326).

Example of returned feature:

{ "type": "Feature", "id": "uuid-123", "geometry": {"type": "Point", "coordinates": [8.5, 47.3]}, "bbox": [8.4, 47.3, 8.6, 47.4], "properties": { "name": "Zürich", "type": "city", "confidence": 1.0, ... } }

GeoDataSource(*args, **kwargs)
def _no_init_or_replace_init(self, *args, **kwargs):
    # Placeholder __init__: refuses to instantiate protocol classes and, for
    # other classes still using it, swaps in the first real __init__ found in
    # the MRO before delegating to it.
    placeholder = _no_init_or_replace_init
    klass = type(self)

    if klass._is_protocol:
        raise TypeError('Protocols cannot be instantiated')

    # The class already has its own `__init__`, so there is nothing to
    # resolve. Returning here also avoids a possible RecursionError
    # (see bpo-45121).
    if klass.__init__ is not placeholder:
        return

    # First instantiation of a class whose `__init__` is still the
    # placeholder: walk the MRO for the first real `__init__` and install it
    # on the class, so later instantiations call it directly and never come
    # back here. `object.__init__` is the fallback and should not be needed.
    inherited = (vars(ancestor).get('__init__', placeholder) for ancestor in klass.__mro__)
    klass.__init__ = next((init for init in inherited if init is not placeholder), object.__init__)

    klass.__init__(self, *args, **kwargs)
def search( self, name: str, type: str | None = None, max_results: int = 10) -> list[geojson.feature.Feature]:
36    def search(
37        self,
38        name: str,
39        type: str | None = None,
40        max_results: int = 10,
41    ) -> list[Feature]:
42        """
43        Search for geographic features by name.
44
45        Args:
46            name: Location name to search for (e.g., "Lake Geneva", "Bern").
47            type: Optional type hint for filtering/ranking results.
48                  Examples: "lake", "city", "mountain", "canton", "river".
49                  When provided, matching types are ranked higher.
50            max_results: Maximum number of results to return.
51
52        Returns:
53            List of matching GeoJSON Feature dicts, ranked by relevance.
54            Returns empty list if no matches found.
55        """
56        ...

Search for geographic features by name.

Arguments:
  • name: Location name to search for (e.g., "Lake Geneva", "Bern").
  • type: Optional type hint for filtering/ranking results. Examples: "lake", "city", "mountain", "canton", "river". When provided, matching types are ranked higher.
  • max_results: Maximum number of results to return.
Returns:

List of matching GeoJSON Feature dicts, ranked by relevance. Returns empty list if no matches found.

def get_by_id(self, feature_id: str) -> geojson.feature.Feature | None:
58    def get_by_id(self, feature_id: str) -> Feature | None:
59        """
60        Get a specific feature by its unique identifier.
61
62        Args:
63            feature_id: Unique identifier from the data source.
64
65        Returns:
66            The matching GeoJSON Feature dict, or None if not found.
67        """
68        ...

Get a specific feature by its unique identifier.

Arguments:
  • feature_id: Unique identifier from the data source.
Returns:

The matching GeoJSON Feature dict, or None if not found.

def get_available_types(self) -> list[str]:
70    def get_available_types(self) -> list[str]:
71        """
72        Get list of concrete geographic types this datasource can return.
73
74        Returns a list of concrete type values (e.g., "lake", "city", "restaurant")
75        that this datasource uses in the "type" property of returned features.
76        These types can be matched against the location type hierarchy for fuzzy matching.
77
78        The returned types should be a subset of or mapped to the standard location
79        type hierarchy defined in location_types.TYPE_HIERARCHY.
80
81        Returns:
82            List of concrete type strings (e.g., ["lake", "river", "city", "mountain"]).
83            Empty list if this datasource does not provide type information.
84
85        Example:
86            >>> source = SwissNames3DSource("data/")
87            >>> types = source.get_available_types()
88            >>> print(types)
89            ['lake', 'river', 'city', 'mountain', 'peak', 'hill', ...]
90        """
91        ...

Get list of concrete geographic types this datasource can return.

Returns a list of concrete type values (e.g., "lake", "city", "restaurant") that this datasource uses in the "type" property of returned features. These types can be matched against the location type hierarchy for fuzzy matching.

The returned types should be a subset of or mapped to the standard location type hierarchy defined in location_types.TYPE_HIERARCHY.

Returns:

List of concrete type strings (e.g., ["lake", "river", "city", "mountain"]). Empty list if this datasource does not provide type information.

Example:
>>> source = SwissNames3DSource("data/")
>>> types = source.get_available_types()
>>> print(types)
['lake', 'river', 'city', 'mountain', 'peak', 'hill', ...]
class SwissNames3DSource:
class SwissNames3DSource:
    """
    Data source serving geographic names from swisstopo's swissNAMES3D dataset.

    The names are read from a Shapefile, GeoPackage, or ESRI File Geodatabase
    and can be searched by name, optionally filtered by type.

    When data_path points to a directory, every SwissNames3D shapefile found in
    it (swissNAMES3D_PKT, swissNAMES3D_LIN, swissNAMES3D_PLY) is loaded and the
    results are concatenated.

    Data is loaded lazily on first use (or eagerly via preload()). All
    geometries are returned as GeoJSON in WGS84 (EPSG:4326).

    Args:
        data_path: Path to a SwissNames3D data file, or to a directory holding
            the SwissNames3D shapefiles.
        layer: Layer name inside the data source (for multi-layer formats like GDB).

    Example:
        >>> source = SwissNames3DSource("data/")  # Load all 3 geometry types
        >>> results = source.search("Lac Léman", type="lake")
        >>> print(results[0].geometry)  # GeoJSON in WGS84
    """

    def __init__(self, data_path: str | Path, layer: str | None = None) -> None:
        self._data_path = Path(data_path)
        self._layer = layer
        # Everything below stays empty until _load_data() runs
        self._gdf: gpd.GeoDataFrame | None = None
        self._name_index: dict[str, list[int]] = {}
        self._token_index: dict[str, set[str]] = {}
        self._name_col: str = ""
        self._type_col: str | None = None
        self._id_col: str | None = None
        self._extra_cols: list[str] = []

    def preload(self) -> None:
        """Load the data right away, so the first query does not pay for it."""
        self._ensure_loaded()

    def _ensure_loaded(self) -> None:
        """Trigger loading unless the data is already in memory."""
        if self._gdf is None:
            self._load_data()

    def _load_data(self) -> None:
        """Read the SwissNames3D data, normalize it and build the name index."""
        if self._data_path.is_dir():
            self._load_from_directory()
        else:
            read_kwargs: dict[str, Any] = {} if self._layer is None else {"layer": self._layer}
            self._gdf = gpd.read_file(str(self._data_path), **read_kwargs)

        assert self._gdf is not None

        # Strip Z coordinates in one vectorized pass: the source carries LN02
        # heights and single_sided buffers reject 3D geometries
        self._gdf.geometry = force_2d(self._gdf.geometry.values)

        # One-time reprojection to WGS84, so queries need no coordinate transform
        self._gdf = self._gdf.to_crs("EPSG:4326")

        # Resolve column names once; _row_to_feature() reuses them on every call
        self._name_col = self._detect_name_column()
        self._type_col = self._detect_type_column()
        self._id_col = self._detect_id_column()
        reserved = {"geometry", self._name_col}
        reserved.update(col for col in (self._type_col, self._id_col) if col)
        self._extra_cols = [col for col in self._gdf.columns if col not in reserved]

        self._build_name_index()

    def _load_from_directory(self) -> None:
        """Read every SwissNames3D shapefile in the directory and concatenate them."""
        # The 3 standard SwissNames3D shapefiles
        shapefile_names = ["swissNAMES3D_PKT", "swissNAMES3D_LIN", "swissNAMES3D_PLY"]
        candidate_paths = (self._data_path / f"{stem}.shp" for stem in shapefile_names)
        frames: list[gpd.GeoDataFrame] = [gpd.read_file(str(path)) for path in candidate_paths if path.exists()]

        if not frames:
            raise ValueError(
                f"No SwissNames3D shapefiles found in {self._data_path}. Expected: {', '.join(shapefile_names)}"
            )

        # Only columns present in every loaded frame survive the concatenation
        shared = set(frames[0].columns).intersection(*(frame.columns for frame in frames[1:]))
        ordered_cols = sorted(shared)
        merged = pd.concat([frame[ordered_cols] for frame in frames], ignore_index=True)
        self._gdf = gpd.GeoDataFrame(merged, crs=frames[0].crs, geometry="geometry")

    def _build_name_index(self) -> None:
        """Index rows by normalized name, and normalized names by their tokens."""
        assert self._gdf is not None
        by_name = self._name_index = {}
        by_token = self._token_index = {}

        for row_pos, raw_name in enumerate(self._gdf[self._name_col]):
            # Skip missing, non-string and blank names
            if not (isinstance(raw_name, str) and raw_name.strip()):
                continue
            key = _normalize_name(raw_name)
            by_name.setdefault(key, []).append(row_pos)
            for token in key.split():
                by_token.setdefault(token, set()).add(key)

    def _detect_name_column(self) -> str:
        """Find the column holding feature names (NAME or BEZEICHNUNG, any case)."""
        assert self._gdf is not None
        accepted = ("NAME", "BEZEICHNUNG")
        found = next((col for col in self._gdf.columns if col.upper() in accepted), None)
        if found is None:
            raise ValueError(f"Cannot find name column in data. Available columns: {list(self._gdf.columns)}")
        return found

    def _detect_type_column(self) -> str | None:
        """Find the feature type column (OBJEKTART, any case), if there is one."""
        assert self._gdf is not None
        return next((col for col in self._gdf.columns if col.upper() == "OBJEKTART"), None)

    def _detect_id_column(self) -> str | None:
        """Find the unique ID column, preferring UUID, then FID, OBJECTID, ID."""
        assert self._gdf is not None
        preference = ("UUID", "FID", "OBJECTID", "ID")
        matches = (col for wanted in preference for col in self._gdf.columns if col.upper() == wanted)
        return next(matches, None)

    def _row_to_feature(self, idx: int) -> Feature:
        """Build a GeoJSON Feature (WGS84 geometry) from the row at position idx."""
        assert self._gdf is not None
        row = self._gdf.iloc[idx]

        name = str(row[self._name_col])

        # A missing or falsy type value is reported as "unknown" before normalization
        if self._type_col and row.get(self._type_col):
            raw_type = str(row[self._type_col])
        else:
            raw_type = "unknown"
        normalized_type = _objektart_to_type(raw_type)

        # Without a usable ID value the row position serves as identifier
        if self._id_col and row.get(self._id_col):
            feature_id = str(row[self._id_col])
        else:
            feature_id = str(idx)

        # The geometry was already flattened to 2D and reprojected at load time
        geom = row.geometry
        if geom is not None and not geom.is_empty:
            geometry = mapping(geom)
            bounds = geom.bounds
            bbox = (bounds[0], bounds[1], bounds[2], bounds[3])
        else:
            # Missing or empty geometry: placeholder point at the origin, no bbox
            geometry = {"type": "Point", "coordinates": [0, 0]}
            bbox = None

        properties: dict[str, Any] = {
            "name": name,
            "type": normalized_type,
            "confidence": 1.0,
        }
        # Copy the remaining columns, leaving out None and "nan" values
        extras = ((col, row.get(col)) for col in self._extra_cols)
        properties.update((col, val) for col, val in extras if val is not None and str(val) != "nan")

        return Feature(geometry=geometry, properties=properties, id=feature_id, bbox=bbox)

    def search(
        self,
        name: str,
        type: str | None = None,
        max_results: int = 10,
    ) -> list[Feature]:
        """
        Look up geographic features by name.

        The name is normalized before lookup (case-insensitive,
        accent-normalized matching). An exact match on the normalized name is
        tried first; fuzzy matching is used only when it yields nothing.

        Args:
            name: Location name to look for.
            type: Optional type hint. When given, only features of that type
                  (or of a type within that category) are kept.
            max_results: Upper bound on the number of returned features.

        Returns:
            Matching GeoJSON Feature dicts, restricted to the requested type
            when one is given. Empty list when nothing matches.
        """
        self._ensure_loaded()

        normalized = _normalize_name(name)
        # Exact hit first; fuzzy fallback only when the exact lookup is empty
        indices = self._name_index.get(normalized, []) or self._fuzzy_search(normalized)

        features = [self._row_to_feature(idx) for idx in indices]

        if type is not None:
            # Expand the hint through the type hierarchy so that a category
            # hint (e.g. "water") matches every concrete type inside it
            # ("lake", "river", "pond", ...).
            matching_types = get_matching_types(type)

            def _type_matches(candidate):
                if matching_types:
                    return candidate in matching_types
                # Unknown hint: fall back to an exact string comparison
                return candidate == type.lower()

            features = [feat for feat in features if _type_matches(feat["properties"].get("type"))]

        return features[:max_results]

    def _fuzzy_search(self, normalized: str, threshold: float = 75.0) -> list[int]:
        """Delegate fuzzy lookup of a normalized name to fuzzy_search_index()."""
        return fuzzy_search_index(normalized, self._token_index, self._name_index, threshold)

    def get_by_id(self, feature_id: str) -> Feature | None:
        """
        Fetch a single feature by its unique identifier.

        The ID column is consulted first (when one was detected); if it has no
        match, feature_id is interpreted as a row index.

        Args:
            feature_id: Unique identifier (UUID or row index).

        Returns:
            The GeoJSON Feature dict with that identifier, or None if not found.
        """
        self._ensure_loaded()
        assert self._gdf is not None

        if self._id_col:
            id_values = self._gdf[self._id_col].astype(str)
            hits = self._gdf[id_values == feature_id]
            if not hits.empty:
                return self._row_to_feature(hits.index[0])

        # Fallback: a numeric string is treated as a row index
        try:
            position = int(feature_id)
            if 0 <= position < len(self._gdf):
                return self._row_to_feature(position)
        except ValueError:
            pass

        return None

    def get_available_types(self) -> list[str]:
        """
        List the concrete geographic types this data source can return.

        The list is made of the keys of OBJEKTART_TYPE_MAP, i.e. every type a
        SwissNames3D record can be classified as.

        Returns:
            Sorted list of type strings (e.g., ["lake", "city", "river", ...])
        """
        type_names = list(OBJEKTART_TYPE_MAP.keys())
        type_names.sort()
        return type_names

Geographic data source backed by swisstopo's swissNAMES3D dataset.

Loads geographic names from a Shapefile, GeoPackage, or ESRI File Geodatabase and provides search by name with optional type filtering.

If data_path is a directory, automatically loads and concatenates all SwissNames3D shapefiles (swissNAMES3D_PKT, swissNAMES3D_LIN, swissNAMES3D_PLY) found within.

All geometries are returned as GeoJSON in WGS84 (EPSG:4326).

Arguments:
  • data_path: Path to SwissNames3D data file or directory containing SwissNames3D shapefiles.
  • layer: Layer name within the data source (for multi-layer formats like GDB).
Example:
>>> source = SwissNames3DSource("data/")  # Load all 3 geometry types
>>> results = source.search("Lac Léman", type="lake")
>>> print(results[0].geometry)  # GeoJSON in WGS84
SwissNames3DSource(data_path: str | pathlib._local.Path, layer: str | None = None)
180    def __init__(self, data_path: str | Path, layer: str | None = None) -> None:
181        self._data_path = Path(data_path)
182        self._layer = layer
183        self._gdf: gpd.GeoDataFrame | None = None
184        self._name_index: dict[str, list[int]] = {}
185        self._token_index: dict[str, set[str]] = {}
186        self._name_col: str = ""
187        self._type_col: str | None = None
188        self._id_col: str | None = None
189        self._extra_cols: list[str] = []
def preload(self) -> None:
191    def preload(self) -> None:
192        """Eagerly load data. Call at startup to avoid first-query latency."""
193        self._ensure_loaded()

Eagerly load data. Call at startup to avoid first-query latency.

def search( self, name: str, type: str | None = None, max_results: int = 10) -> list[geojson.feature.Feature]:
336    def search(
337        self,
338        name: str,
339        type: str | None = None,
340        max_results: int = 10,
341    ) -> list[Feature]:
342        """
343        Search for geographic features by name.
344
345        Uses case-insensitive, accent-normalized matching with fuzzy fallback.
346        First tries exact matching, then falls back to fuzzy matching if no exact
347        matches found.
348
349        Args:
350            name: Location name to search for.
351            type: Optional type hint to filter results. If provided, only features
352                  of this type are returned.
353            max_results: Maximum number of results to return.
354
355        Returns:
356            List of matching GeoJSON Feature dicts. If type is provided, only
357            features of that type are returned. Empty list if no matches found.
358        """
359        self._ensure_loaded()
360
361        normalized = _normalize_name(name)
362        indices = self._name_index.get(normalized, [])
363
364        # If no exact match, try fuzzy matching
365        if not indices:
366            indices = self._fuzzy_search(normalized)
367
368        features = [self._row_to_feature(idx) for idx in indices]
369
370        # Filter by type if type hint provided.
371        # Expand via the type hierarchy so that category hints (e.g. "water") match
372        # all concrete types within that category ("lake", "river", "pond", ...).
373        if type is not None:
374            matching_types = get_matching_types(type)
375            if matching_types:
376                features = [f for f in features if f["properties"].get("type") in matching_types]
377            else:
378                # Unknown type hint, fall back to exact string match
379                features = [f for f in features if f["properties"].get("type") == type.lower()]
380
381        return features[:max_results]

Search for geographic features by name.

Uses case-insensitive, accent-normalized matching with fuzzy fallback. First tries exact matching, then falls back to fuzzy matching if no exact matches found.

Arguments:
  • name: Location name to search for.
  • type: Optional type hint to filter results. If provided, only features of this type are returned.
  • max_results: Maximum number of results to return.
Returns:

List of matching GeoJSON Feature dicts. If type is provided, only features of that type are returned. Empty list if no matches found.

def get_by_id(self, feature_id: str) -> geojson.feature.Feature | None:
386    def get_by_id(self, feature_id: str) -> Feature | None:
387        """
388        Get a specific feature by its unique identifier.
389
390        Args:
391            feature_id: Unique identifier (UUID or row index).
392
393        Returns:
394            The matching GeoJSON Feature dict, or None if not found.
395        """
396        self._ensure_loaded()
397        assert self._gdf is not None
398
399        if self._id_col:
400            matches = self._gdf[self._gdf[self._id_col].astype(str) == feature_id]
401            if not matches.empty:
402                return self._row_to_feature(matches.index[0])
403
404        # Fallback: try as row index
405        try:
406            idx = int(feature_id)
407            if 0 <= idx < len(self._gdf):
408                return self._row_to_feature(idx)
409        except ValueError:
410            pass
411
412        return None

Get a specific feature by its unique identifier.

Arguments:
  • feature_id: Unique identifier (UUID or row index).
Returns:

The matching GeoJSON Feature dict, or None if not found.

def get_available_types(self) -> list[str]:
414    def get_available_types(self) -> list[str]:
415        """
416        Get list of concrete geographic types this datasource can return.
417
418        Returns all normalized types from the OBJEKTART_TYPE_MAP keys,
419        representing all possible types that SwissNames3D data can be classified as.
420
421        Returns:
422            Sorted list of type strings (e.g., ["lake", "city", "river", ...])
423        """
424        return sorted(OBJEKTART_TYPE_MAP.keys())

Get list of concrete geographic types this datasource can return.

Returns all normalized types from the OBJEKTART_TYPE_MAP keys, representing all possible types that SwissNames3D data can be classified as.

Returns:

Sorted list of type strings (e.g., ["city", "lake", "river", ...])

class SwissBoundaries3DSource:
 67class SwissBoundaries3DSource:
 68    """
 69    Geographic data source backed by swisstopo's swissBOUNDARIES3D dataset.
 70
 71    Loads geographic names from a Shapefile, GeoPackage, or ESRI File Geodatabase
 72    and provides search by name with optional type filtering.
 73
 74    If data_path is a directory, automatically loads and concatenates all swissBoundaries3D
 75    shapefiles (swissBOUNDARIES3D_1_5_TLM_BEZIRKSGEBIET, swissBOUNDARIES3D_1_5_TLM_HOHEITSGEBIET, swissBOUNDARIES3D_1_5_TLM_KANTONSGEBIET) found within.
 76
 77    IMPORTANT:
 78        The swissBOUNDARIES3D_1_5_TLM_LANDESGEBIET is NOT read because it contains enclaves of Germany which are not relevant for Swiss geographic names.
 79        The swissBOUNDARIES3D_1_5_TLM_HOHEITSGRENZE is NOT read because it contains lines already in swissBOUNDARIES3D_1_5_TLM_HOHEITSGEBIET.
 80    All geometries are returned as GeoJSON in WGS84 (EPSG:4326).
 81
 82    Args:
 83        data_path: Path to swissBoundaries3D data file or directory containing swissBoundaries3D shapefiles.
 84        layer: Layer name within the data source (for multi-layer formats like GDB).
 85
 86    Example:
 87        >>> source = SwissBoundaries3DSource("data/")  # Load all 3 geometry types
 88        >>> results = source.search("Bern", type="canton")
 89        >>> print(results[0].geometry)  # GeoJSON in WGS84
 90    """
 91
 92    def __init__(self, data_path: str | Path, layer: str | None = None) -> None:
 93        self._data_path = Path(data_path)
 94        self._layer = layer
 95        self._gdf: gpd.GeoDataFrame | None = None
 96        self._name_index: dict[str, list[int]] = {}
 97        self._token_index: dict[str, set[str]] = {}
 98        self._name_col: str = ""
 99        self._type_col: str | None = None
100        self._id_col: str | None = None
101        self._extra_cols: list[str] = []
102
103    def preload(self) -> None:
104        """Eagerly load data. Call at startup to avoid first-query latency."""
105        self._ensure_loaded()
106
107    def _ensure_loaded(self) -> None:
108        """Load data lazily on first access."""
109        if self._gdf is not None:
110            return
111        self._load_data()
112
113    def _load_data(self) -> None:
114        """Load swissBoundaries3D data and build the name index."""
115        if self._data_path.is_dir():
116            self._load_from_directory()
117        else:
118            kwargs: dict[str, Any] = {}
119            if self._layer is not None:
120                kwargs["layer"] = self._layer
121            self._gdf = gpd.read_file(str(self._data_path), **kwargs)
122
123        assert self._gdf is not None
124
125        # Drop Z coordinates once — vectorized; the source has LN02 height and
126        # single_sided buffers reject 3D geometries
127        self._gdf.geometry = force_2d(self._gdf.geometry.values)
128
129        # Reproject to WGS84 once — avoids per-query coordinate transform
130        self._gdf = self._gdf.to_crs("EPSG:4326")
131
132        # Cache column names once — reused on every _row_to_feature() call
133        self._name_col = self._detect_name_column()
134        self._type_col = self._detect_type_column()
135        self._id_col = self._detect_id_column()
136        skip = {self._name_col, "geometry"}
137        if self._type_col:
138            skip.add(self._type_col)
139        if self._id_col:
140            skip.add(self._id_col)
141        self._extra_cols = [c for c in self._gdf.columns if c not in skip]
142
143        self._build_name_index()
144
145    def _load_from_directory(self) -> None:
146        """Load and concatenate all swissBoundaries3D shapefiles from a directory."""
147        # Look for the 3 standard swissBoundaries3D shapefiles
148        shapefile_names = [
149            "swissBOUNDARIES3D_1_5_TLM_BEZIRKSGEBIET",
150            "swissBOUNDARIES3D_1_5_TLM_HOHEITSGEBIET",
151            "swissBOUNDARIES3D_1_5_TLM_KANTONSGEBIET",
152        ]
153        gdfs: list[gpd.GeoDataFrame] = []
154
155        for name in shapefile_names:
156            shp_path = self._data_path / f"{name}.shp"
157            if shp_path.exists():
158                gdf = gpd.read_file(str(shp_path))
159                gdfs.append(gdf)
160
161        if not gdfs:
162            raise ValueError(
163                f"No swissBoundaries3D shapefiles found in {self._data_path}. Expected: {', '.join(shapefile_names)}"
164            )
165
166        # Find common columns across all loaded GeoDataFrames
167        common_cols = set(gdfs[0].columns)
168        for gdf in gdfs[1:]:
169            common_cols &= set(gdf.columns)
170
171        # Keep only common columns and concatenate
172        gdfs_filtered = [gdf[sorted(common_cols)] for gdf in gdfs]
173        self._gdf = gpd.GeoDataFrame(pd.concat(gdfs_filtered, ignore_index=True), crs=gdfs[0].crs, geometry="geometry")
174
175    def _build_name_index(self) -> None:
176        """Build normalized name → row indices and token → candidate names indexes."""
177        assert self._gdf is not None
178        self._name_index = {}
179        self._token_index = {}
180
181        for idx, name in enumerate(self._gdf[self._name_col]):
182            if not isinstance(name, str) or not name.strip():
183                continue
184            normalized = _normalize_name(name)
185            if normalized not in self._name_index:
186                self._name_index[normalized] = []
187            self._name_index[normalized].append(idx)
188            for token in normalized.split():
189                if token not in self._token_index:
190                    self._token_index[token] = set()
191                self._token_index[token].add(normalized)
192
193    def _detect_name_column(self) -> str:
194        """Detect the name column in the data."""
195        assert self._gdf is not None
196        for col in self._gdf.columns:
197            if col.upper() in ("NAME", "BEZEICHNUNG"):
198                return col
199        raise ValueError(f"Cannot find name column in data. Available columns: {list(self._gdf.columns)}")
200
201    def _detect_type_column(self) -> str | None:
202        """Detect the feature type column in the data."""
203        assert self._gdf is not None
204        for col in self._gdf.columns:
205            if col.upper() == "OBJEKTART":
206                return col
207        return None
208
209    def _detect_id_column(self) -> str | None:
210        """Detect the unique ID column in the data."""
211        assert self._gdf is not None
212        for candidate in ("UUID", "FID", "OBJECTID", "ID"):
213            for col in self._gdf.columns:
214                if col.upper() == candidate:
215                    return col
216        return None
217
218    def _row_to_feature(self, idx: int) -> Feature:
219        """Convert a GeoDataFrame row to a GeoJSON Feature dict with WGS84 geometry."""
220        assert self._gdf is not None
221        row = self._gdf.iloc[idx]
222
223        name = str(row[self._name_col])
224
225        raw_type = str(row[self._type_col]) if self._type_col and row.get(self._type_col) else "unknown"
226        normalized_type = _objektart_to_type(raw_type)
227
228        feature_id = str(row[self._id_col]) if self._id_col and row.get(self._id_col) else str(idx)
229
230        # Geometry is already in WGS84 (2D) — pre-converted at load time
231        geom = row.geometry
232        if geom is None or geom.is_empty:
233            geometry = {"type": "Point", "coordinates": [0, 0]}
234            bbox = None
235        else:
236            geometry = mapping(geom)
237            bounds = geom.bounds
238            bbox = (bounds[0], bounds[1], bounds[2], bounds[3])
239
240        properties: dict[str, Any] = {
241            "name": name,
242            "type": normalized_type,
243            "confidence": 1.0,
244        }
245        for col in self._extra_cols:
246            val = row.get(col)
247            if val is not None and str(val) != "nan":
248                properties[col] = val
249
250        return Feature(geometry=geometry, properties=properties, id=feature_id, bbox=bbox)
251
252    def search(
253        self,
254        name: str,
255        type: str | None = None,
256        max_results: int = 10,
257    ) -> list[Feature]:
258        """
259        Search for geographic features by name.
260
261        Uses case-insensitive, accent-normalized matching with fuzzy fallback.
262        First tries exact matching, then falls back to fuzzy matching if no exact
263        matches found.
264
265        Args:
266            name: Location name to search for.
267            type: Optional type hint to filter results. If provided, only features
268                  of this type are returned.
269            max_results: Maximum number of results to return.
270
271        Returns:
272            List of matching GeoJSON Feature dicts. If type is provided, only
273            features of that type are returned. Empty list if no matches found.
274        """
275        self._ensure_loaded()
276
277        normalized = _normalize_name(name)
278        indices = self._name_index.get(normalized, [])
279
280        # If no exact match, try fuzzy matching
281        if not indices:
282            indices = self._fuzzy_search(normalized)
283
284        features = [self._row_to_feature(idx) for idx in indices]
285
286        # Filter by type if type hint provided.
287        # Expand via the type hierarchy so that category hints (e.g. "water") match
288        # all concrete types within that category ("lake", "river", "pond", ...).
289        if type is not None:
290            matching_types = get_matching_types(type)
291            if matching_types:
292                features = [f for f in features if f["properties"].get("type") in matching_types]
293            else:
294                # Unknown type hint, fall back to exact string match
295                features = [f for f in features if f["properties"].get("type") == type.lower()]
296
297        return features[:max_results]
298
299    def _fuzzy_search(self, normalized: str, threshold: float = 75.0) -> list[int]:
300        return fuzzy_search_index(normalized, self._token_index, self._name_index, threshold)
301
302    def get_by_id(self, feature_id: str) -> Feature | None:
303        """
304        Get a specific feature by its unique identifier.
305
306        Args:
307            feature_id: Unique identifier (UUID or row index).
308
309        Returns:
310            The matching GeoJSON Feature dict, or None if not found.
311        """
312        self._ensure_loaded()
313        assert self._gdf is not None
314
315        if self._id_col:
316            matches = self._gdf[self._gdf[self._id_col].astype(str) == feature_id]
317            if not matches.empty:
318                return self._row_to_feature(matches.index[0])
319
320        # Fallback: try as row index
321        try:
322            idx = int(feature_id)
323            if 0 <= idx < len(self._gdf):
324                return self._row_to_feature(idx)
325        except ValueError:
326            pass
327
328        return None
329
330    def get_available_types(self) -> list[str]:
331        """
332        Get list of concrete geographic types this datasource can return.
333
334        Returns all normalized types from the OBJEKTART_TYPE_MAP keys,
335        representing all possible types that swissBoundaries3D data can be classified as.
336
337        Returns:
338            Sorted list of type strings (e.g., ["canton", "municipality", "district", ...])
339        """
340        return sorted(OBJEKTART_TYPE_MAP.keys())

Geographic data source backed by swisstopo's swissBOUNDARIES3D dataset.

Loads geographic names from a Shapefile, GeoPackage, or ESRI File Geodatabase and provides search by name with optional type filtering.

If data_path is a directory, automatically loads and concatenates all swissBoundaries3D shapefiles (swissBOUNDARIES3D_1_5_TLM_BEZIRKSGEBIET, swissBOUNDARIES3D_1_5_TLM_HOHEITSGEBIET, swissBOUNDARIES3D_1_5_TLM_KANTONSGEBIET) found within.

IMPORTANT:

The swissBOUNDARIES3D_1_5_TLM_LANDESGEBIET is NOT read because it contains enclaves of Germany which are not relevant for Swiss geographic names. The swissBOUNDARIES3D_1_5_TLM_HOHEITSGRENZE is NOT read because it contains lines already in swissBOUNDARIES3D_1_5_TLM_HOHEITSGEBIET.

All geometries are returned as GeoJSON in WGS84 (EPSG:4326).

Arguments:
  • data_path: Path to swissBoundaries3D data file or directory containing swissBoundaries3D shapefiles.
  • layer: Layer name within the data source (for multi-layer formats like GDB).
Example:
>>> source = SwissBoundaries3DSource("data/")  # Load all 3 boundary layers
>>> results = source.search("Bern", type="canton")
>>> print(results[0].geometry)  # GeoJSON in WGS84
SwissBoundaries3DSource(data_path: str | pathlib.Path, layer: str | None = None)
 92    def __init__(self, data_path: str | Path, layer: str | None = None) -> None:
 93        self._data_path = Path(data_path)
 94        self._layer = layer
 95        self._gdf: gpd.GeoDataFrame | None = None
 96        self._name_index: dict[str, list[int]] = {}
 97        self._token_index: dict[str, set[str]] = {}
 98        self._name_col: str = ""
 99        self._type_col: str | None = None
100        self._id_col: str | None = None
101        self._extra_cols: list[str] = []
def preload(self) -> None:
103    def preload(self) -> None:
104        """Eagerly load data. Call at startup to avoid first-query latency."""
105        self._ensure_loaded()

Eagerly load data. Call at startup to avoid first-query latency.

def search( self, name: str, type: str | None = None, max_results: int = 10) -> list[geojson.feature.Feature]:
252    def search(
253        self,
254        name: str,
255        type: str | None = None,
256        max_results: int = 10,
257    ) -> list[Feature]:
258        """
259        Search for geographic features by name.
260
261        Uses case-insensitive, accent-normalized matching with fuzzy fallback.
262        First tries exact matching, then falls back to fuzzy matching if no exact
263        matches found.
264
265        Args:
266            name: Location name to search for.
267            type: Optional type hint to filter results. If provided, only features
268                  of this type are returned.
269            max_results: Maximum number of results to return.
270
271        Returns:
272            List of matching GeoJSON Feature dicts. If type is provided, only
273            features of that type are returned. Empty list if no matches found.
274        """
275        self._ensure_loaded()
276
277        normalized = _normalize_name(name)
278        indices = self._name_index.get(normalized, [])
279
280        # If no exact match, try fuzzy matching
281        if not indices:
282            indices = self._fuzzy_search(normalized)
283
284        features = [self._row_to_feature(idx) for idx in indices]
285
286        # Filter by type if type hint provided.
287        # Expand via the type hierarchy so that category hints (e.g. "water") match
288        # all concrete types within that category ("lake", "river", "pond", ...).
289        if type is not None:
290            matching_types = get_matching_types(type)
291            if matching_types:
292                features = [f for f in features if f["properties"].get("type") in matching_types]
293            else:
294                # Unknown type hint, fall back to exact string match
295                features = [f for f in features if f["properties"].get("type") == type.lower()]
296
297        return features[:max_results]

Search for geographic features by name.

Uses case-insensitive, accent-normalized matching with fuzzy fallback. First tries exact matching, then falls back to fuzzy matching if no exact matches found.

Arguments:
  • name: Location name to search for.
  • type: Optional type hint to filter results. If provided, only features of this type are returned.
  • max_results: Maximum number of results to return.
Returns:

List of matching GeoJSON Feature dicts. If type is provided, only features of that type are returned. Empty list if no matches found.

def get_by_id(self, feature_id: str) -> geojson.feature.Feature | None:
302    def get_by_id(self, feature_id: str) -> Feature | None:
303        """
304        Get a specific feature by its unique identifier.
305
306        Args:
307            feature_id: Unique identifier (UUID or row index).
308
309        Returns:
310            The matching GeoJSON Feature dict, or None if not found.
311        """
312        self._ensure_loaded()
313        assert self._gdf is not None
314
315        if self._id_col:
316            matches = self._gdf[self._gdf[self._id_col].astype(str) == feature_id]
317            if not matches.empty:
318                return self._row_to_feature(matches.index[0])
319
320        # Fallback: try as row index
321        try:
322            idx = int(feature_id)
323            if 0 <= idx < len(self._gdf):
324                return self._row_to_feature(idx)
325        except ValueError:
326            pass
327
328        return None

Get a specific feature by its unique identifier.

Arguments:
  • feature_id: Unique identifier (UUID or row index).
Returns:

The matching GeoJSON Feature dict, or None if not found.

def get_available_types(self) -> list[str]:
330    def get_available_types(self) -> list[str]:
331        """
332        Get list of concrete geographic types this datasource can return.
333
334        Returns all normalized types from the OBJEKTART_TYPE_MAP keys,
335        representing all possible types that swissBoundaries3D data can be classified as.
336
337        Returns:
338            Sorted list of type strings (e.g., ["canton", "municipality", "district", ...])
339        """
340        return sorted(OBJEKTART_TYPE_MAP.keys())

Get list of concrete geographic types this datasource can return.

Returns all normalized types from the OBJEKTART_TYPE_MAP keys, representing all possible types that swissBoundaries3D data can be classified as.

Returns:

Sorted list of type strings (e.g., ["canton", "district", "municipality", ...])

class IGNBDCartoSource:
class IGNBDCartoSource:
    """
    Geographic data source backed by IGN's BD-CARTO 5.0 dataset.

    Reads French geographic data from GeoPackage files extracted to a
    directory. Covers administrative boundaries (communes, departments,
    regions, …), hydrography (rivers, lakes, …), named places (quarters,
    hamlets, …), orographic features (peaks, passes, valleys, …) and
    protected areas.

    The data has to be downloaded beforehand with ``make download-data-ign``,
    which places the GeoPackage files in ``data/bdcarto/``.

    All geometries are reprojected from EPSG:2154 (Lambert-93) to WGS84
    (EPSG:4326) and returned as standard GeoJSON Feature dicts.

    Args:
        data_path: Directory containing the ``.gpkg`` files (e.g. ``"data/bdcarto"``).

    Example:
        >>> source = IGNBDCartoSource("data/bdcarto")
        >>> results = source.search("Ardèche", type="department")
        >>> results = source.search("Lyon", type="city")
        >>> results = source.search("Rhône", type="river")
    """

    def __init__(self, data_path: str | Path) -> None:
        self._data_path = Path(data_path)
        # Filled in by _load_data() on first use.
        self._gdf: gpd.GeoDataFrame | None = None
        # index key -> row positions, and token -> index keys.
        self._name_index: dict[str, list[int]] = {}
        self._token_index: dict[str, set[str]] = {}

    def preload(self) -> None:
        """Load the data immediately; call at startup to avoid first-query latency."""
        self._ensure_loaded()

    def _ensure_loaded(self) -> None:
        """Load the data on first access; do nothing once it is in memory."""
        if self._gdf is None:
            self._load_data()

    def _load_data(self) -> None:
        """Load from a directory of GeoPackages or a single file, then index names."""
        location = self._data_path
        self._gdf = self._load_from_directory() if location.is_dir() else self._load_from_file(location)
        self._build_name_index()

    def _load_from_file(self, path: Path) -> gpd.GeoDataFrame:
        """Load from a GeoJSON fixture file. Features must include a ``_layer`` column.

        Raises:
            ValueError: If the ``_layer`` column is missing or no configured
                layer has usable rows.
        """
        fixture = gpd.read_file(str(path))
        if "_layer" not in fixture.columns:
            raise ValueError(f"GeoJSON fixture {path} must include a '_layer' column")

        per_layer: list[gpd.GeoDataFrame] = []
        for layer_name, cfg in _LAYER_CONFIGS.items():
            subset = fixture[fixture["_layer"] == layer_name].copy()
            # Skip layers with no rows, or whose configured name column is absent.
            if subset.empty or cfg["name_col"] not in subset.columns:
                continue
            subset[_NAME_COL] = subset[cfg["name_col"]].astype(str)
            _assign_type_col(subset, cfg)
            per_layer.append(subset.to_crs("EPSG:4326"))

        if not per_layer:
            raise ValueError(f"No matching BD-CARTO features found in {path}")

        merged = pd.concat(per_layer, ignore_index=True)
        return gpd.GeoDataFrame(merged, crs="EPSG:4326", geometry="geometry")

    def _load_from_directory(self) -> gpd.GeoDataFrame:
        """Load and concatenate every configured layer present in the data directory.

        Raises:
            ValueError: If none of the configured layers could be loaded.
        """
        per_layer: list[gpd.GeoDataFrame] = []

        for layer_name, cfg in _LAYER_CONFIGS.items():
            gpkg_file = self._data_path / f"{layer_name}.gpkg"
            if not gpkg_file.exists():
                continue

            layer_gdf = gpd.read_file(str(gpkg_file))
            if cfg["name_col"] not in layer_gdf.columns:
                continue

            layer_gdf[_NAME_COL] = layer_gdf[cfg["name_col"]].astype(str)
            _assign_type_col(layer_gdf, cfg)
            # Remember which layer each row came from.
            layer_gdf["_layer"] = layer_name
            per_layer.append(layer_gdf.to_crs("EPSG:4326"))

        if not per_layer:
            raise ValueError(
                f"No BD-CARTO GeoPackage files found in {self._data_path}. "
                f"Run 'make download-data-ign' to download the dataset."
            )

        merged = pd.concat(per_layer, ignore_index=True)
        return gpd.GeoDataFrame(merged, crs="EPSG:4326", geometry="geometry")

    def _build_name_index(self) -> None:
        """Build the normalized name → row indices and token → candidate names indexes."""
        assert self._gdf is not None
        self._name_index, self._token_index = {}, {}
        by_name, by_token = self._name_index, self._token_index

        for row_pos, raw_name in enumerate(self._gdf[_NAME_COL]):
            # Skip non-strings, blanks, and the literal "nan" that astype(str)
            # produces for missing values.
            if not (isinstance(raw_name, str) and raw_name.strip() and raw_name != "nan"):
                continue
            for key in _index_keys(raw_name):
                by_name.setdefault(key, []).append(row_pos)
                for token in key.split():
                    by_token.setdefault(token, set()).add(key)

    def _row_to_feature(self, idx: int) -> Feature:
        """Turn the row at position ``idx`` into a GeoJSON Feature dict (WGS84)."""
        assert self._gdf is not None
        record = self._gdf.iloc[idx]

        display_name = str(record[_NAME_COL])

        if pd.notna(record.get(_TYPE_COL)):
            normalized_type = str(record[_TYPE_COL])
        else:
            normalized_type = "unknown"

        # Without a ``cleabs`` value the row position doubles as the identifier.
        if pd.notna(record.get("cleabs")):
            feature_id = str(record["cleabs"])
        else:
            feature_id = str(idx)

        # Start from the placeholder used for missing/empty geometries and
        # replace it when a real geometry is available.
        geom = record.geometry
        geometry: dict[str, Any] = {"type": "Point", "coordinates": [0, 0]}
        bbox: tuple[float, float, float, float] | None = None
        if geom is not None and not geom.is_empty:
            geometry = mapping(geom)
            min_x, min_y, max_x, max_y = geom.bounds
            bbox = (min_x, min_y, max_x, max_y)

        properties: dict[str, Any] = {
            "name": display_name,
            "type": normalized_type,
            "confidence": 1.0,
        }
        reserved = {_NAME_COL, _TYPE_COL, "geometry", "cleabs"}
        for col in self._gdf.columns:
            if col in reserved:
                continue
            value = _to_json_value(record.get(col))
            if value is not None:
                properties[col] = value

        return Feature(geometry=geometry, properties=properties, id=feature_id, bbox=bbox)

    def search(
        self,
        name: str,
        type: str | None = None,
        max_results: int = 10,
    ) -> list[Feature]:
        """
        Look up geographic features by name.

        The name is normalized with ``_normalize_name`` and looked up in the
        exact-name index; fuzzy matching is used only when that yields
        nothing. After optional type filtering the results are passed through
        ``merge_segments`` and then truncated.

        Args:
            name: Location name to search for (e.g. ``"Ardèche"``, ``"Lyon"``,
                  ``"Rhône"``).
            type: Optional type hint for filtering. Supports both concrete types
                  (``"department"``, ``"city"``, ``"river"``) and category hints
                  (``"administrative"``, ``"water"``).
            max_results: Maximum number of results.

        Returns:
            List of GeoJSON Feature dicts in WGS84. Empty list if no match.
        """
        self._ensure_loaded()

        key = _normalize_name(name)
        # Exact lookup first; an empty result falls through to fuzzy matching.
        row_ids = self._name_index.get(key, []) or self._fuzzy_search(key)
        candidates = [self._row_to_feature(row_id) for row_id in row_ids]

        if type is not None:
            allowed = get_matching_types(type)
            logger.debug("Filtering results by type hint %r → matching types: %s", type, allowed)
            if allowed:

                def accepts(kind):
                    return kind in allowed

            else:
                # Unknown hint: compare against the lower-cased hint literally.
                def accepts(kind):
                    return kind == type.lower()

            candidates = [c for c in candidates if accepts(c["properties"].get("type"))]

        # Merging happens before truncation, so max_results counts merged features.
        return merge_segments(candidates)[:max_results]

    def _fuzzy_search(self, normalized: str, threshold: float = 75.0) -> list[int]:
        """Delegate fuzzy lookup to ``fuzzy_search_index`` using this source's indexes."""
        return fuzzy_search_index(normalized, self._token_index, self._name_index, threshold)

    def get_by_id(self, feature_id: str) -> Feature | None:
        """
        Fetch a feature by its ``cleabs`` identifier or row index.

        The ``cleabs`` column (when present) is compared as strings first; if
        nothing matches, the identifier is interpreted as an integer row
        position.

        Args:
            feature_id: ``cleabs`` string or integer row index.

        Returns:
            Matching GeoJSON Feature dict, or ``None``.
        """
        self._ensure_loaded()
        assert self._gdf is not None
        table = self._gdf

        if "cleabs" in table.columns:
            id_mask = table["cleabs"].astype(str) == feature_id
            hits = table[id_mask]
            if not hits.empty:
                return self._row_to_feature(hits.index[0])

        # A non-numeric identifier makes int() raise ValueError: "not found".
        try:
            position = int(feature_id)
            if 0 <= position < len(table):
                return self._row_to_feature(position)
        except ValueError:
            pass

        return None

    def get_available_types(self) -> list[str]:
        """
        Return the union of all normalized types this source can return.

        The types are derived from ``_LAYER_CONFIGS`` alone: per layer, the
        first truthy entry among ``commune_flags``, ``fixed_type`` and
        ``type_map`` decides which types that layer contributes.

        Returns:
            Sorted list of type strings.
        """

        def layer_types(cfg):
            # Same precedence as an if/elif chain: only one source per layer.
            if cfg.get("commune_flags"):
                return ("city", "municipality")
            if cfg.get("fixed_type"):
                return (cfg["fixed_type"],)
            if cfg.get("type_map"):
                return cfg["type_map"].values()
            return ()

        return sorted({kind for cfg in _LAYER_CONFIGS.values() for kind in layer_types(cfg)})

Geographic data source backed by IGN's BD-CARTO 5.0 dataset.

Loads French geographic data from GeoPackage files extracted to a directory. Supports administrative boundaries (communes, departments, regions, …), hydrography (rivers, lakes, …), named places (quarters, hamlets, …), orographic features (peaks, passes, valleys, …) and protected areas.

Data must first be downloaded with make download-data-ign, which places the GeoPackage files in data/bdcarto/.

All geometries are reprojected from EPSG:2154 (Lambert-93) to WGS84 (EPSG:4326) and returned as standard GeoJSON Feature dicts.

Arguments:
  • data_path: Directory containing the .gpkg files (e.g. "data/bdcarto").
Example:
>>> source = IGNBDCartoSource("data/bdcarto")
>>> results = source.search("Ardèche", type="department")
>>> results = source.search("Lyon", type="city")
>>> results = source.search("Rhône", type="river")
IGNBDCartoSource(data_path: str | pathlib.Path)
289    def __init__(self, data_path: str | Path) -> None:
290        self._data_path = Path(data_path)
291        self._gdf: gpd.GeoDataFrame | None = None
292        self._name_index: dict[str, list[int]] = {}
293        self._token_index: dict[str, set[str]] = {}
def preload(self) -> None:
295    def preload(self) -> None:
296        """Eagerly load data. Call at startup to avoid first-query latency."""
297        self._ensure_loaded()

Eagerly load data. Call at startup to avoid first-query latency.

def search( self, name: str, type: str | None = None, max_results: int = 10) -> list[geojson.feature.Feature]:
417    def search(
418        self,
419        name: str,
420        type: str | None = None,
421        max_results: int = 10,
422    ) -> list[Feature]:
423        """
424        Search for geographic features by name.
425
426        Uses case-insensitive, accent-normalized exact matching with fuzzy
427        fallback when no exact match is found.
428
429        Args:
430            name: Location name to search for (e.g. ``"Ardèche"``, ``"Lyon"``,
431                  ``"Rhône"``).
432            type: Optional type hint for filtering. Supports both concrete types
433                  (``"department"``, ``"city"``, ``"river"``) and category hints
434                  (``"administrative"``, ``"water"``).
435            max_results: Maximum number of results.
436
437        Returns:
438            List of GeoJSON Feature dicts in WGS84. Empty list if no match.
439        """
440        self._ensure_loaded()
441
442        normalized = _normalize_name(name)
443        indices = self._name_index.get(normalized, [])
444
445        if not indices:
446            indices = self._fuzzy_search(normalized)
447
448        features = [self._row_to_feature(idx) for idx in indices]
449
450        if type is not None:
451            matching_types = get_matching_types(type)
452            logger.debug("Filtering results by type hint %r → matching types: %s", type, matching_types)
453            if matching_types:
454                features = [f for f in features if f["properties"].get("type") in matching_types]
455            else:
456                features = [f for f in features if f["properties"].get("type") == type.lower()]
457
458        features = merge_segments(features)
459
460        return features[:max_results]

Search for geographic features by name.

Uses case-insensitive, accent-normalized exact matching with fuzzy fallback when no exact match is found.

Arguments:
  • name: Location name to search for (e.g. "Ardèche", "Lyon", "Rhône").
  • type: Optional type hint for filtering. Supports both concrete types ("department", "city", "river") and category hints ("administrative", "water").
  • max_results: Maximum number of results.
Returns:

List of GeoJSON Feature dicts in WGS84. Empty list if no match.

def get_by_id(self, feature_id: str) -> geojson.feature.Feature | None:
465    def get_by_id(self, feature_id: str) -> Feature | None:
466        """
467        Get a feature by its ``cleabs`` identifier or row index.
468
469        Args:
470            feature_id: ``cleabs`` string or integer row index.
471
472        Returns:
473            Matching GeoJSON Feature dict, or ``None``.
474        """
475        self._ensure_loaded()
476        assert self._gdf is not None
477
478        if "cleabs" in self._gdf.columns:
479            matches = self._gdf[self._gdf["cleabs"].astype(str) == feature_id]
480            if not matches.empty:
481                return self._row_to_feature(matches.index[0])
482
483        try:
484            idx = int(feature_id)
485            if 0 <= idx < len(self._gdf):
486                return self._row_to_feature(idx)
487        except ValueError:
488            pass
489
490        return None

Get a feature by its cleabs identifier or row index.

Arguments:
  • feature_id: cleabs string or integer row index.
Returns:

Matching GeoJSON Feature dict, or None.

def get_available_types(self) -> list[str]:
492    def get_available_types(self) -> list[str]:
493        """
494        Return the union of all normalized types this source can return.
495
496        Returns:
497            Sorted list of type strings.
498        """
499        types: set[str] = set()
500        for cfg in _LAYER_CONFIGS.values():
501            if cfg.get("commune_flags"):
502                types.update({"city", "municipality"})
503            elif cfg.get("fixed_type"):
504                types.add(cfg["fixed_type"])
505            elif cfg.get("type_map"):
506                types.update(cfg["type_map"].values())
507        return sorted(types)

Return the union of all normalized types this source can return.

Returns:

Sorted list of type strings.

class CompositeDataSource:
 20class CompositeDataSource:
 21    """
 22    Fan-out datasource that delegates to an ordered list of GeoDataSource instances.
 23
 24    ``search`` queries every registered source and merges results in order.
 25
 26    ``get_by_id`` tries each source in order and returns the first hit.
 27
 28    ``get_available_types`` returns the union of all sources' types.
 29
 30    Args:
 31        sources: One or more GeoDataSource instances.
 32
 33    Example:
 34        >>> swiss = SwissNames3DSource("data/")
 35        >>> ign   = IGNBDTopoSource("data/")
 36        >>> combo = CompositeDataSource(swiss, ign)
 37        >>> results = combo.search("Geneva", type="city")
 38    """
 39
 40    def __init__(self, *sources: GeoDataSource) -> None:
 41        if not sources:
 42            raise ValueError("At least one datasource is required.")
 43        self._sources: list[GeoDataSource] = list(sources)
 44
 45    def preload(self) -> None:
 46        """Eagerly load all sources that support preloading."""
 47        for source in self._sources:
 48            if isinstance(source, _Preloadable):
 49                source.preload()
 50
 51    # Public API (mirrors GeoDataSource protocol)
 52
 53    def search(
 54        self,
 55        name: str,
 56        type: str | None = None,
 57        max_results: int = 10,
 58    ) -> list[Feature]:
 59        """
 60        Search all registered sources and return merged.
 61
 62        Args:
 63            name: Location name to search for.
 64            type: Optional type hint passed through to every source.
 65            max_results: Maximum results per source.
 66
 67        Returns:
 68            List of GeoJSON Feature dicts, merged from all sources.
 69        """
 70        merged: list[Feature] = []
 71
 72        for source in self._sources:
 73            merged.extend(source.search(name, type=type, max_results=max_results))
 74
 75        return merged
 76
 77    def get_by_id(self, feature_id: str) -> Feature | None:
 78        """
 79        Get a feature by ID, trying each source in order.
 80
 81        Args:
 82            feature_id: Unique identifier to look up.
 83
 84        Returns:
 85            The first matching GeoJSON Feature dict, or None.
 86        """
 87        for source in self._sources:
 88            result = source.get_by_id(feature_id)
 89            if result is not None:
 90                return result
 91        return None
 92
 93    def get_available_types(self) -> list[str]:
 94        """
 95        Return the union of all sources' available types, sorted.
 96
 97        Returns:
 98            Sorted list of unique type strings.
 99        """
100        types: set[str] = set()
101        for source in self._sources:
102            types.update(source.get_available_types())
103        return sorted(types)

Fan-out datasource that delegates to an ordered list of GeoDataSource instances.

search queries every registered source and merges results in order.

get_by_id tries each source in order and returns the first hit.

get_available_types returns the union of all sources' types.

Arguments:
  • sources: One or more GeoDataSource instances.
Example:
>>> swiss = SwissNames3DSource("data/")
>>> ign   = IGNBDCartoSource("data/bdcarto")
>>> combo = CompositeDataSource(swiss, ign)
>>> results = combo.search("Geneva", type="city")
CompositeDataSource(*sources: GeoDataSource)
40    def __init__(self, *sources: GeoDataSource) -> None:
41        if not sources:
42            raise ValueError("At least one datasource is required.")
43        self._sources: list[GeoDataSource] = list(sources)
def preload(self) -> None:
45    def preload(self) -> None:
46        """Eagerly load all sources that support preloading."""
47        for source in self._sources:
48            if isinstance(source, _Preloadable):
49                source.preload()

Eagerly load all sources that support preloading.

def search( self, name: str, type: str | None = None, max_results: int = 10) -> list[geojson.feature.Feature]:
53    def search(
54        self,
55        name: str,
56        type: str | None = None,
57        max_results: int = 10,
58    ) -> list[Feature]:
59        """
60        Search all registered sources and return merged.
61
62        Args:
63            name: Location name to search for.
64            type: Optional type hint passed through to every source.
65            max_results: Maximum results per source.
66
67        Returns:
68            List of GeoJSON Feature dicts, merged from all sources.
69        """
70        merged: list[Feature] = []
71
72        for source in self._sources:
73            merged.extend(source.search(name, type=type, max_results=max_results))
74
75        return merged

Search all registered sources and return the merged results.

Arguments:
  • name: Location name to search for.
  • type: Optional type hint passed through to every source.
  • max_results: Maximum results per source.
Returns:

List of GeoJSON Feature dicts, merged from all sources.

def get_by_id(self, feature_id: str) -> geojson.feature.Feature | None:
77    def get_by_id(self, feature_id: str) -> Feature | None:
78        """
79        Get a feature by ID, trying each source in order.
80
81        Args:
82            feature_id: Unique identifier to look up.
83
84        Returns:
85            The first matching GeoJSON Feature dict, or None.
86        """
87        for source in self._sources:
88            result = source.get_by_id(feature_id)
89            if result is not None:
90                return result
91        return None

Get a feature by ID, trying each source in order.

Arguments:
  • feature_id: Unique identifier to look up.
Returns:

The first matching GeoJSON Feature dict, or None.

def get_available_types(self) -> list[str]:
 93    def get_available_types(self) -> list[str]:
 94        """
 95        Return the union of all sources' available types, sorted.
 96
 97        Returns:
 98            Sorted list of unique type strings.
 99        """
100        types: set[str] = set()
101        for source in self._sources:
102            types.update(source.get_available_types())
103        return sorted(types)

Return the union of all sources' available types, sorted.

Returns:

Sorted list of unique type strings.

class PostGISDataSource:
 67class PostGISDataSource:
 68    """
 69    Geographic data source backed by a PostGIS table.
 70
 71    The table must expose at minimum a name column, a geometry column, and
 72    optionally a type column. The expected schema is:
 73
 74    .. code-block:: sql
 75
 76        CREATE TABLE <table> (
 77            id      TEXT PRIMARY KEY,
 78            name    TEXT NOT NULL,
 79            type    TEXT,
 80            geom    GEOMETRY(Geometry, 4326)
 81        );
 82
 83    The ``type`` column may store either:
 84
 85    - **Raw dataset values** (e.g. ``"See"``, ``"Berg"`` for SwissNames3D),
 86      pass ``type_map`` so the datasource can translate between raw values and
 87      the normalized etter type names.
 88    - **Already-normalized values** (e.g. ``"lake"``, ``"mountain"``),
 89      leave ``type_map=None`` (default).
 90
 91    Geometries must be in WGS84 (EPSG:4326) or supply ``crs`` for on-the-fly
 92    reprojection.
 93
 94    Args:
 95        connection: A SQLAlchemy :class:`~sqlalchemy.engine.Engine` **or** a
 96            connection URL string (e.g. ``"postgresql+psycopg2://user:pass@host/db"``).
 97            When a string is provided the engine is created internally.
 98        table: Fully-qualified table name, e.g. ``"public.swissnames3d"``.
 99        name_column: Column used for name-based search (default ``"name"``).
100        type_column: Column used for type filtering.  Pass ``None`` to disable
101            type filtering (default ``"type"``).
102        geometry_column: PostGIS geometry column (default ``"geom"``).
103        id_column: Primary-key column (default ``"id"``).
104        crs: CRS of the stored geometries as an EPSG string.  Defaults to
105            ``"EPSG:4326"`` (no reprojection).
106        type_map: Optional mapping from **normalized etter type names** to
107            **lists of raw type column values** present in the database.
108            This is the same format as ``SwissNames3DSource.OBJEKTART_TYPE_MAP``
109            and ``IGNBDCartoSource.IGN_BDCARTO_TYPE_MAP``, so they can be
110            passed directly::
111
112                from etter.datasources.swissnames3d import OBJEKTART_TYPE_MAP
113                source = PostGISDataSource(
114                    engine,
115                    table="public.swissnames3d",
116                    type_map=OBJEKTART_TYPE_MAP,
117                )
118
119            When ``type_map`` is provided the datasource:
120
121            - Translates raw DB values → normalized types in returned features.
122            - Translates user type hints → raw DB values in SQL ``WHERE`` clauses.
123            - Returns normalized type names from ``get_available_types()``.
124
125            When ``None`` (default) the stored values are used as-is.
126        fuzzy_threshold: Minimum ``pg_trgm`` similarity score (0-1) used for
127            fuzzy fallback search when no exact ``ILIKE`` match is found.
128
129    Example: unmodified SwissNames3D table::
130
131        from sqlalchemy import create_engine
132        from etter.datasources import PostGISDataSource
133        from etter.datasources.swissnames3d import OBJEKTART_TYPE_MAP
134
135        engine = create_engine(...)
136        source = PostGISDataSource(
137            engine,
138            table="public.swissnames3d",
139            type_map=OBJEKTART_TYPE_MAP,
140        )
141        results = source.search("Lac Léman", type="lake")
142    """
143
144    def __init__(
145        self,
146        connection: str | Engine,
147        table: str,
148        name_column: str = "name",
149        type_column: str | None = "type",
150        geometry_column: str = "geom",
151        id_column: str = "id",
152        crs: str = "EPSG:4326",
153        type_map: TypeMap | None = None,
154        fuzzy_threshold: float = 0.65,
155    ) -> None:
156        sa = _require_sqlalchemy()
157
158        if isinstance(connection, str):
159            self._engine = sa.create_engine(connection)
160        else:
161            self._engine = connection
162
163        try:
164            with self._engine.connect() as conn:
165                conn.execute(sa.text(f"SELECT 1 FROM {table} LIMIT 1"))
166        except Exception as exc:
167            raise ValueError(f"Failed to connect to database or access table {table!r}") from exc
168
169        self._table = table
170        self._name_col = name_column
171        self._type_col = type_column
172        self._geom_col = geometry_column
173        self._id_col = id_column
174        self._crs = crs
175        self._fuzzy_threshold = fuzzy_threshold
176
177        # Build bidirectional lookup structures from the user-supplied map.
178        if type_map:
179            self._normalized_to_raw: dict[str, list[str]] = {k: list(v) for k, v in type_map.items()}
180            self._raw_to_normalized: dict[str, str] = {
181                raw: normalized for normalized, raws in type_map.items() for raw in raws
182            }
183        else:
184            self._normalized_to_raw = {}
185            self._raw_to_normalized = {}
186
187        self._trgm_available: bool | None = None
188        self._unaccent_available: bool | None = None
189
190    def _get_connection(self) -> Connection:
191        """Return a SQLAlchemy connection from the engine."""
192        return self._engine.connect()
193
194    def _check_trgm(self, conn: Connection) -> bool:
195        """Return True if pg_trgm extension is available in the database."""
196        if self._trgm_available is not None:
197            return self._trgm_available
198        sa = _require_sqlalchemy()
199        try:
200            result = conn.execute(sa.text("SELECT 1 FROM pg_extension WHERE extname = 'pg_trgm'"))
201            self._trgm_available = result.fetchone() is not None
202        except Exception:
203            logger.exception("Failed to check pg_trgm availability")
204            self._trgm_available = False
205        return self._trgm_available
206
207    def _check_unaccent(self, conn: Connection) -> bool:
208        """Return True if the unaccent extension is available in the database."""
209        if self._unaccent_available is not None:
210            return self._unaccent_available
211        sa = _require_sqlalchemy()
212        try:
213            result = conn.execute(sa.text("SELECT 1 FROM pg_extension WHERE extname = 'unaccent'"))
214            self._unaccent_available = result.fetchone() is not None
215        except Exception:
216            logger.exception("Failed to check unaccent availability")
217            self._unaccent_available = False
218        return self._unaccent_available
219
220    def _normalize_type(self, raw_type: str | None) -> str | None:
221        """Translate a raw DB type value to its normalized etter name.
222
223        If no type_map was supplied the value is returned unchanged.
224        """
225        if raw_type is None:
226            return None
227        return self._raw_to_normalized.get(raw_type, raw_type)
228
229    def _row_to_feature(self, row: Row) -> Feature:
230        """Convert a SQLAlchemy Row to a GeoJSON Feature dict."""
231        feature_id = str(row.id)
232        name = str(row.name)
233        raw_type = getattr(row, "type", None)
234        normalized_type = self._normalize_type(raw_type)
235
236        geojson_str = row.geojson
237        if geojson_str:
238            geometry = json.loads(geojson_str)
239        else:
240            geometry = {"type": "Point", "coordinates": [0, 0]}
241
242        bbox = _bbox_from_geojson(geometry)
243
244        properties: dict[str, Any] = {
245            "name": name,
246            "type": normalized_type,
247            "confidence": 1.0,
248        }
249
250        return Feature(geometry=geometry, properties=properties, id=feature_id, bbox=bbox)
251
252    def _build_select_columns(self) -> str:
253        """Build the SELECT column list as a SQL fragment."""
254        type_expr = f", {self._type_col} AS type" if self._type_col else ", NULL AS type"
255        if self._crs.upper() != "EPSG:4326":
256            geom_expr = f", ST_AsGeoJSON(ST_Transform({self._geom_col}, 4326)) AS geojson"
257        else:
258            geom_expr = f", ST_AsGeoJSON({self._geom_col}) AS geojson"
259        return f"{self._id_col} AS id, {self._name_col} AS name{type_expr}{geom_expr}"
260
261    def search(
262        self,
263        name: str,
264        type: str | None = None,
265        max_results: int = 10,
266    ) -> list[Feature]:
267        """
268        Search for geographic features by name.
269
270        Uses a three-step cascade, stopping as soon as any step returns results:
271
272        1. **Normalized exact match**
273        2. **pg_trgm fuzzy with unaccent** (pg_trgm extension required and unaccent extension recommended)
274        3. **ILIKE substring**
275
276        ``merge_segments`` is applied after all rows are fetched so that
277        multi-segment linestrings (rivers, roads) are merged before the
278        ``max_results`` cap is applied.
279
280        Args:
281            name: Location name to search for.
282            type: Optional type hint for filtering results.
283            max_results: Maximum number of results to return.
284
285        Returns:
286            List of matching GeoJSON Feature dicts in WGS84.
287        """
288        sa = _require_sqlalchemy()
289        cols = self._build_select_columns()
290
291        # Resolve type filter to the raw DB values to use in the SQL WHERE clause.
292        type_filter_values: list[str] | None = None
293        if type is not None and self._type_col is not None:
294            matching_types = get_matching_types(type)
295            concrete_types = matching_types if matching_types else [type.lower()]
296            if self._normalized_to_raw:
297                raw_values: list[str] = []
298                for t in concrete_types:
299                    raw_values.extend(self._normalized_to_raw.get(t, [t]))
300                type_filter_values = raw_values if raw_values else concrete_types
301            else:
302                type_filter_values = concrete_types
303
304        # Fetch more rows than requested so that merge_segments has the full
305        # set of segments to work with.  Without this, a SQL LIMIT applied
306        # *before* merging would only return a partial set of linestring
307        # segments, producing incorrect / truncated geometries.
308        # We cap the internal limit at 2000 to avoid unbounded queries.
309        internal_limit = min(max(max_results * 20, 100), 2000)
310
311        with self._get_connection() as conn:
312            features = self._search_normalized(conn, sa, cols, name, type_filter_values, internal_limit)
313
314        if not features:
315            with self._get_connection() as conn:
316                features = self._search_fuzzy(conn, sa, cols, name, type_filter_values, internal_limit)
317
318        if not features:
319            with self._get_connection() as conn:
320                features = self._search_ilike(conn, sa, cols, name, type_filter_values, internal_limit)
321
322        features = merge_segments(features)
323        return features[:max_results]
324
325    def _type_filter_sql(self, values: list[str] | None) -> tuple[str, dict[str, Any]]:
326        """Return a WHERE clause fragment and bind params for type filtering."""
327        if not values or self._type_col is None:
328            return "", {}
329        placeholders = ", ".join(f":type_{i}" for i in range(len(values)))
330        clause = f" AND {self._type_col} IN ({placeholders})"
331        params = {f"type_{i}": v for i, v in enumerate(values)}
332        return clause, params
333
334    def _search_normalized(
335        self,
336        conn: Connection,
337        sa: types.ModuleType,
338        cols: str,
339        name: str,
340        type_filter: list[str] | None,
341        fetch_limit: int,
342    ) -> list[Feature]:
343        """
344        Exact accent- and case-insensitive search.
345
346        Accent normalization (NFD decomposition + diacritic strip) is done in
347        Python before the query is sent to the DB.
348        """
349        type_clause, type_params = self._type_filter_sql(type_filter)
350        name_expr = f"lower({self._name_col})"
351        if self._check_unaccent(conn):
352            name_expr = f"unaccent({name_expr})"
353        sql = sa.text(
354            f"SELECT {cols} FROM {self._table} "  # noqa: S608
355            f"WHERE {name_expr} = :query{type_clause} "
356            f"LIMIT :limit"
357        )
358        params: dict[str, Any] = {
359            "query": _normalize_name(name),
360            "limit": fetch_limit,
361            **type_params,
362        }
363        try:
364            result = conn.execute(sql, params)
365            return [self._row_to_feature(row) for row in result]
366        except Exception:
367            logger.exception("Normalized search failed for %r", name)
368            return []
369
370    def _search_ilike(
371        self,
372        conn: Connection,
373        sa: types.ModuleType,
374        cols: str,
375        name: str,
376        type_filter: list[str] | None,
377        fetch_limit: int,
378    ) -> list[Feature]:
379        """Case-insensitive substring fallback using ``ILIKE '%name%'``.
380
381        When the ``unaccent`` extension is available, both the stored name column
382        and the pattern are accent-stripped so that e.g. ``"Rhone"`` matches
383        ``"Rhône"``.  Without ``unaccent``, standard ILIKE is used (case-insensitive
384        only).
385        """
386        type_clause, type_params = self._type_filter_sql(type_filter)
387        normalized = _normalize_name(name)
388        if self._check_unaccent(conn):
389            name_expr = f"unaccent(lower({self._name_col}))"
390            pattern = f"%{normalized}%"
391        else:
392            name_expr = self._name_col
393            pattern = f"%{name}%"
394        sql = sa.text(
395            f"SELECT {cols} FROM {self._table} "  # noqa: S608
396            f"WHERE {name_expr} ILIKE :pattern{type_clause} "
397            f"LIMIT :limit"
398        )
399        params: dict[str, Any] = {"pattern": pattern, "limit": fetch_limit, **type_params}
400        try:
401            result = conn.execute(sql, params)
402            return [self._row_to_feature(row) for row in result]
403        except Exception:
404            logger.exception("ILIKE search failed for %r", name)
405            return []
406
407    def _search_fuzzy(
408        self,
409        conn: Connection,
410        sa: types.ModuleType,
411        cols: str,
412        name: str,
413        type_filter: list[str] | None,
414        fetch_limit: int,
415    ) -> list[Feature]:
416        """Fuzzy fallback using pg_trgm similarity (if extension is available)."""
417        if not self._check_trgm(conn):
418            logger.warning(
419                "pg_trgm extension not available. Fuzzy search disabled. Install it with: CREATE EXTENSION pg_trgm;"
420            )
421            return []
422        normalized_query = _normalize_name(name)
423        if self._check_unaccent(conn):
424            name_expr = f"unaccent(lower({self._name_col}))"
425        else:
426            logger.warning(
427                "unaccent extension not available. Accent-insensitive fuzzy search degraded. "
428                "Install it with: CREATE EXTENSION unaccent;"
429            )
430            name_expr = f"lower({self._name_col})"
431        type_clause, type_params = self._type_filter_sql(type_filter)
432        sql = sa.text(
433            f"SELECT {cols} FROM {self._table} "  # noqa: S608
434            f"WHERE word_similarity({name_expr}, :query) > :threshold{type_clause} "
435            f"ORDER BY word_similarity({name_expr}, :query) DESC "
436            f"LIMIT :limit"
437        )
438        params: dict[str, Any] = {
439            "query": normalized_query,
440            "threshold": self._fuzzy_threshold,
441            "limit": fetch_limit,
442            **type_params,
443        }
444        try:
445            result = conn.execute(sql, params)
446            return [self._row_to_feature(row) for row in result]
447        except Exception:
448            logger.exception("Fuzzy search failed for %r", name)
449            return []
450
451    def get_by_id(self, feature_id: str) -> Feature | None:
452        """
453        Get a specific feature by its unique identifier.
454
455        Args:
456            feature_id: Value of the ``id`` column.
457
458        Returns:
459            The matching GeoJSON Feature dict, or ``None`` if not found.
460        """
461        sa = _require_sqlalchemy()
462        cols = self._build_select_columns()
463        sql = sa.text(
464            f"SELECT {cols} FROM {self._table} WHERE {self._id_col} = :id LIMIT 1"  # noqa: S608
465        )
466        with self._get_connection() as conn:
467            try:
468                result = conn.execute(sql, {"id": feature_id})
469                row = result.fetchone()
470                return self._row_to_feature(row) if row else None
471            except Exception:
472                logger.exception("get_by_id failed for %r", feature_id)
473                return None
474
475    def get_available_types(self) -> list[str]:
476        """
477        Return the distinct ``type`` values present in the table.
478
479        Returns:
480            Sorted list of concrete type strings, or an empty list if the table
481            has no type column.
482        """
483        if self._type_col is None:
484            return []
485        sa = _require_sqlalchemy()
486        sql = sa.text(
487            f"SELECT DISTINCT {self._type_col} AS type FROM {self._table} "  # noqa: S608
488            f"WHERE {self._type_col} IS NOT NULL ORDER BY 1"
489        )
490        with self._get_connection() as conn:
491            try:
492                result = conn.execute(sql)
493                raw_types = [row.type for row in result]
494            except Exception:
495                logger.exception("get_available_types failed")
496                return []
497
498        normalized = {self._normalize_type(t) for t in raw_types if t}
499        return sorted(t for t in normalized if t)

Geographic data source backed by a PostGIS table.

The table must expose at minimum a name column, a geometry column, and optionally a type column. The expected schema is:

CREATE TABLE <table> (
    id      TEXT PRIMARY KEY,
    name    TEXT NOT NULL,
    type    TEXT,
    geom    GEOMETRY(Geometry, 4326)
);

The type column may store either:

  • Raw dataset values (e.g. "See", "Berg" for SwissNames3D), pass type_map so the datasource can translate between raw values and the normalized etter type names.
  • Already-normalized values (e.g. "lake", "mountain"), leave type_map=None (default).

Geometries must be in WGS84 (EPSG:4326) or supply crs for on-the-fly reprojection.

Arguments:
  • connection: A SQLAlchemy ~sqlalchemy.engine.Engine or a connection URL string (e.g. "postgresql+psycopg2://user:pass@host/db"). When a string is provided the engine is created internally.
  • table: Fully-qualified table name, e.g. "public.swissnames3d".
  • name_column: Column used for name-based search (default "name").
  • type_column: Column used for type filtering. Pass None to disable type filtering (default "type").
  • geometry_column: PostGIS geometry column (default "geom").
  • id_column: Primary-key column (default "id").
  • crs: CRS of the stored geometries as an EPSG string. Defaults to "EPSG:4326" (no reprojection).
  • type_map: Optional mapping from normalized etter type names to lists of raw type column values present in the database. This is the same format as SwissNames3DSource.OBJEKTART_TYPE_MAP and IGNBDCartoSource.IGN_BDCARTO_TYPE_MAP, so they can be passed directly::

    from etter.datasources.swissnames3d import OBJEKTART_TYPE_MAP
    source = PostGISDataSource(
        engine,
        table="public.swissnames3d",
        type_map=OBJEKTART_TYPE_MAP,
    )
    

    When type_map is provided the datasource:

    • Translates raw DB values → normalized types in returned features.
    • Translates user type hints → raw DB values in SQL WHERE clauses.
    • Returns normalized type names from get_available_types().

    When None (default) the stored values are used as-is.

  • fuzzy_threshold: Minimum pg_trgm word-similarity score (0-1) a name must exceed in the fuzzy fallback step, which runs when the normalized exact match returns nothing.

Example: unmodified SwissNames3D table::

from sqlalchemy import create_engine
from etter.datasources import PostGISDataSource
from etter.datasources.swissnames3d import OBJEKTART_TYPE_MAP

engine = create_engine(...)
source = PostGISDataSource(
    engine,
    table="public.swissnames3d",
    type_map=OBJEKTART_TYPE_MAP,
)
results = source.search("Lac Léman", type="lake")
PostGISDataSource( connection: str | sqlalchemy.engine.base.Engine, table: str, name_column: str = 'name', type_column: str | None = 'type', geometry_column: str = 'geom', id_column: str = 'id', crs: str = 'EPSG:4326', type_map: dict[typing.Literal['alpine_pasture', 'airport', 'area', 'arrondissement', 'border_marker', 'boulder', 'bridge', 'building', 'bus_stop', 'boat_stop', 'camping', 'canton', 'cave', 'cemetery', 'city', 'correctional_facility', 'country', 'customs', 'dam', 'department', 'district', 'ditch', 'entrance_exit', 'exit', 'fairground', 'ferry', 'field_name', 'forest', 'fountain', 'glacier', 'hamlet', 'heliport', 'hill', 'historical_site', 'hospital', 'island', 'junction', 'lake', 'leisure_facility', 'landfill', 'lift', 'loading_station', 'local_name', 'massif', 'military_training_area', 'monastery', 'monument', 'mountain', 'municipality', 'nature_reserve', 'park', 'parking', 'pass', 'peak', 'peninsula', 'plain', 'pond', 'power_plant', 'private_driving_area', 'quarry', 'railway', 'railway_area', 'region', 'religious_building', 'rest_area', 'restaurant', 'ridge', 'river', 'road', 'rock_head', 'school', 'spring', 'sports_facility', 'standing_area', 'swimming_pool', 'town', 'tower', 'train_station', 'tunnel', 'unknown', 'valley', 'viewpoint', 'village', 'wastewater_treatment', 'waste_incineration', 'waterfall', 'weir', 'zoo', 'administrative', 'amenity', 'infrastructure', 'landforms', 'natural', 'other', 'settlement', 'transport', 'water'], list[str]] | None = None, fuzzy_threshold: float = 0.65)
144    def __init__(
145        self,
146        connection: str | Engine,
147        table: str,
148        name_column: str = "name",
149        type_column: str | None = "type",
150        geometry_column: str = "geom",
151        id_column: str = "id",
152        crs: str = "EPSG:4326",
153        type_map: TypeMap | None = None,
154        fuzzy_threshold: float = 0.65,
155    ) -> None:
156        sa = _require_sqlalchemy()
157
158        if isinstance(connection, str):
159            self._engine = sa.create_engine(connection)
160        else:
161            self._engine = connection
162
163        try:
164            with self._engine.connect() as conn:
165                conn.execute(sa.text(f"SELECT 1 FROM {table} LIMIT 1"))
166        except Exception as exc:
167            raise ValueError(f"Failed to connect to database or access table {table!r}") from exc
168
169        self._table = table
170        self._name_col = name_column
171        self._type_col = type_column
172        self._geom_col = geometry_column
173        self._id_col = id_column
174        self._crs = crs
175        self._fuzzy_threshold = fuzzy_threshold
176
177        # Build bidirectional lookup structures from the user-supplied map.
178        if type_map:
179            self._normalized_to_raw: dict[str, list[str]] = {k: list(v) for k, v in type_map.items()}
180            self._raw_to_normalized: dict[str, str] = {
181                raw: normalized for normalized, raws in type_map.items() for raw in raws
182            }
183        else:
184            self._normalized_to_raw = {}
185            self._raw_to_normalized = {}
186
187        self._trgm_available: bool | None = None
188        self._unaccent_available: bool | None = None
def search( self, name: str, type: str | None = None, max_results: int = 10) -> list[geojson.feature.Feature]:
261    def search(
262        self,
263        name: str,
264        type: str | None = None,
265        max_results: int = 10,
266    ) -> list[Feature]:
267        """
268        Search for geographic features by name.
269
270        Uses a three-step cascade, stopping as soon as any step returns results:
271
272        1. **Normalized exact match**
273        2. **pg_trgm fuzzy with unaccent** (pg_trgm extension required and unaccent extension recommended)
274        3. **ILIKE substring**
275
276        ``merge_segments`` is applied after all rows are fetched so that
277        multi-segment linestrings (rivers, roads) are merged before the
278        ``max_results`` cap is applied.
279
280        Args:
281            name: Location name to search for.
282            type: Optional type hint for filtering results.
283            max_results: Maximum number of results to return.
284
285        Returns:
286            List of matching GeoJSON Feature dicts in WGS84.
287        """
288        sa = _require_sqlalchemy()
289        cols = self._build_select_columns()
290
291        # Resolve type filter to the raw DB values to use in the SQL WHERE clause.
292        type_filter_values: list[str] | None = None
293        if type is not None and self._type_col is not None:
294            matching_types = get_matching_types(type)
295            concrete_types = matching_types if matching_types else [type.lower()]
296            if self._normalized_to_raw:
297                raw_values: list[str] = []
298                for t in concrete_types:
299                    raw_values.extend(self._normalized_to_raw.get(t, [t]))
300                type_filter_values = raw_values if raw_values else concrete_types
301            else:
302                type_filter_values = concrete_types
303
304        # Fetch more rows than requested so that merge_segments has the full
305        # set of segments to work with.  Without this, a SQL LIMIT applied
306        # *before* merging would only return a partial set of linestring
307        # segments, producing incorrect / truncated geometries.
308        # We cap the internal limit at 2000 to avoid unbounded queries.
309        internal_limit = min(max(max_results * 20, 100), 2000)
310
311        with self._get_connection() as conn:
312            features = self._search_normalized(conn, sa, cols, name, type_filter_values, internal_limit)
313
314        if not features:
315            with self._get_connection() as conn:
316                features = self._search_fuzzy(conn, sa, cols, name, type_filter_values, internal_limit)
317
318        if not features:
319            with self._get_connection() as conn:
320                features = self._search_ilike(conn, sa, cols, name, type_filter_values, internal_limit)
321
322        features = merge_segments(features)
323        return features[:max_results]

Search for geographic features by name.

Uses a three-step cascade, stopping as soon as any step returns results:

  1. Normalized exact match
  2. pg_trgm fuzzy with unaccent (pg_trgm extension required and unaccent extension recommended)
  3. ILIKE substring

merge_segments is applied after all rows are fetched so that multi-segment linestrings (rivers, roads) are merged before the max_results cap is applied.

Arguments:
  • name: Location name to search for.
  • type: Optional type hint for filtering results.
  • max_results: Maximum number of results to return.
Returns:

List of matching GeoJSON Feature dicts in WGS84.

def get_by_id(self, feature_id: str) -> geojson.feature.Feature | None:
451    def get_by_id(self, feature_id: str) -> Feature | None:
452        """
453        Get a specific feature by its unique identifier.
454
455        Args:
456            feature_id: Value of the ``id`` column.
457
458        Returns:
459            The matching GeoJSON Feature dict, or ``None`` if not found.
460        """
461        sa = _require_sqlalchemy()
462        cols = self._build_select_columns()
463        sql = sa.text(
464            f"SELECT {cols} FROM {self._table} WHERE {self._id_col} = :id LIMIT 1"  # noqa: S608
465        )
466        with self._get_connection() as conn:
467            try:
468                result = conn.execute(sql, {"id": feature_id})
469                row = result.fetchone()
470                return self._row_to_feature(row) if row else None
471            except Exception:
472                logger.exception("get_by_id failed for %r", feature_id)
473                return None

Get a specific feature by its unique identifier.

Arguments:
  • feature_id: Value of the id column.
Returns:

The matching GeoJSON Feature dict, or None if not found.

def get_available_types(self) -> list[str]:
475    def get_available_types(self) -> list[str]:
476        """
477        Return the distinct ``type`` values present in the table.
478
479        Returns:
480            Sorted list of concrete type strings, or an empty list if the table
481            has no type column.
482        """
483        if self._type_col is None:
484            return []
485        sa = _require_sqlalchemy()
486        sql = sa.text(
487            f"SELECT DISTINCT {self._type_col} AS type FROM {self._table} "  # noqa: S608
488            f"WHERE {self._type_col} IS NOT NULL ORDER BY 1"
489        )
490        with self._get_connection() as conn:
491            try:
492                result = conn.execute(sql)
493                raw_types = [row.type for row in result]
494            except Exception:
495                logger.exception("get_available_types failed")
496                return []
497
498        normalized = {self._normalize_type(t) for t in raw_types if t}
499        return sorted(t for t in normalized if t)

Return the distinct type values present in the table, as normalized type names when a type_map is configured.

Returns:

Sorted list of concrete type strings, or an empty list if the table has no type column.

def apply_spatial_relation( geometry: dict[str, typing.Any] | list[dict[str, typing.Any]], relation: SpatialRelation, buffer_config: BufferConfig | None = None, spatial_config: SpatialRelationConfig | None = None, geometry_format: Literal['geojson', 'wkt', 'wkb'] = 'geojson') -> dict[str, typing.Any] | str:
def apply_spatial_relation(
    geometry: GeoJsonGeometry | list[GeoJsonGeometry],
    relation: SpatialRelation,
    buffer_config: BufferConfig | None = None,
    spatial_config: SpatialRelationConfig | None = None,
    geometry_format: GeometryFormat = "geojson",
) -> GeoJsonGeometry | str:
    """Turn reference geometry into a search area for the given spatial relation.

    If a list is passed, its geometries are unioned first, so a feature stored
    as several datasource records (e.g. a river in segments) yields one
    coherent area.

    If ``buffer_config.inferred`` is True, the buffer configuration is refined
    from the actual geometry via ``_refine_buffer_config`` before the relation
    is dispatched.

    Args:
        geometry: GeoJSON geometry dict or non-empty list of dicts (WGS84).
        relation: Spatial relation to apply; its ``category`` selects the branch.
        buffer_config: Required for buffer and directional relations.
        spatial_config: Relation registry; defaults to the module-level singleton.
        geometry_format: "geojson" (default), "wkt", or "wkb".

    Returns:
        Transformed geometry in the requested format.

    Raises:
        ValueError: If ``geometry`` is an empty list, if a buffer or
            directional relation is given without ``buffer_config``, or if
            the relation category is not recognised.
    """
    is_collection = isinstance(geometry, list)
    if is_collection and not geometry:
        raise ValueError("geometry list must not be empty")

    if is_collection:
        geom = unary_union([shape(part) for part in geometry])
        geom_dict: GeoJsonGeometry = mapping(geom)
    else:
        geom = shape(geometry)
        geom_dict = geometry

    # Inferred buffers are refined from the geometry before any dispatching.
    if buffer_config is not None and buffer_config.inferred:
        buffer_config = _refine_buffer_config(geom, buffer_config, relation)

    category = relation.category
    if category == "containment":
        # Containment uses the reference geometry as-is.
        result = geom_dict
    elif category == "buffer":
        if buffer_config is None:
            raise ValueError(f"Buffer relation '{relation.relation}' requires buffer_config")
        result = _apply_buffer(geom, buffer_config)
    elif category in ("directional", "clipping"):
        # The buffer_config check must precede the registry lookup.
        if category == "directional" and buffer_config is None:
            raise ValueError(f"Directional relation '{relation.relation}' requires buffer_config")
        registry = _DEFAULT_SPATIAL_CONFIG if spatial_config is None else spatial_config
        settings = registry.get_config(relation.relation)
        if category == "directional":
            # Unset (falsy) angles fall back to 0 degrees direction / 90 degrees sector.
            result = _apply_directional(
                geom,
                buffer_config,
                settings.direction_angle_degrees or 0,
                settings.sector_angle_degrees or 90,
            )
        else:
            # An unset clip direction falls back to "north".
            result = _apply_clipping(geom, settings.clip_direction or "north")
    else:
        raise ValueError(f"Unknown relation category: '{relation.category}'")

    return convert_geometry(result, geometry_format)

Transform one or more reference geometries according to a spatial relation.

A list of geometries is unioned into one before the transformation, so that features split across multiple datasource records (e.g. a river in segments) produce a single coherent search area.

When buffer_config.inferred is True (i.e. no explicit distance was stated), the buffer distance is refined from the actual geometry area so that small features receive small buffers and large regions receive large ones.

Arguments:
  • geometry: GeoJSON geometry dict or non-empty list of dicts (WGS84).
  • relation: Spatial relation to apply.
  • buffer_config: Required for buffer/directional relations.
  • spatial_config: Relation registry; defaults to the module-level singleton.
  • geometry_format: "geojson" (default), "wkt", or "wkb".
Returns:

Transformed geometry in the requested format.

def convert_geometry( geometry: dict[str, typing.Any], fmt: Literal['geojson', 'wkt', 'wkb']) -> dict[str, typing.Any] | str:
def convert_geometry(geometry: GeoJsonGeometry, fmt: GeometryFormat) -> GeoJsonGeometry | str:
    """
    Render a GeoJSON geometry dict in the requested output format.

    Args:
        geometry: GeoJSON geometry dict (e.g. {"type": "Point", "coordinates": [...]})
        fmt: Target format. "geojson" hands back the very same dict, "wkt"
             yields a WKT string, and anything else yields a hex-encoded WKB string.

    Returns:
        The geometry in the requested format.
    """
    if fmt == "geojson":
        # No conversion needed; the input object itself is returned.
        return geometry
    shapely_geom = shape(geometry)
    return shapely_geom.wkt if fmt == "wkt" else shapely_geom.wkb_hex

Convert a GeoJSON geometry dict to the requested format.

Arguments:
  • geometry: GeoJSON geometry dict (e.g. {"type": "Point", "coordinates": [...]})
  • fmt: Target format — "geojson" returns the dict unchanged, "wkt" returns a WKT string, "wkb" returns a hex-encoded WKB string.
Returns:

The geometry in the requested format.

def convert_feature_geometry( feature: geojson.feature.Feature, fmt: Literal['geojson', 'wkt', 'wkb']) -> geojson.feature.Feature | dict:
def convert_feature_geometry(feature: Feature, fmt: GeometryFormat) -> Feature | dict:
    """
    Produce a GeoJSON Feature whose geometry is expressed in the requested format.

    Args:
        feature: GeoJSON Feature dict with a "geometry" key.
        fmt: Target geometry format.

    Returns:
        For "geojson", the input feature itself (no copy is made).  For any
        other format, a new plain dict with the same items as the input
        except that "geometry" holds the converted value.
    """
    if fmt != "geojson":
        converted_geometry = convert_geometry(feature["geometry"], fmt)
        return {**feature, "geometry": converted_geometry}
    return feature

Return a copy of a GeoJSON Feature dict with its geometry converted to the requested format.

Arguments:
  • feature: GeoJSON Feature dict with a "geometry" key.
  • fmt: Target geometry format.
Returns:

A new dict identical to the input except the "geometry" value is converted (the geometry becomes a string). When fmt is "geojson", the input Feature itself is returned unchanged rather than a copy.