Skip to content

rasteret.core.raster_accessor

Data-loading handle for a single Parquet row (record). Each record in a Collection gets a RasterAccessor via Collection.iterate_rasters().

raster_accessor

Classes

BandResolutionError

Bases: ValueError

Raised when a requested band cannot be mapped to a readable COG asset.

RasterAccessor

RasterAccessor(info: RasterInfo, data_source: str)

Data-loading handle for a single Parquet record (row) in a Collection.

Each record in a Rasteret Collection represents one raster item: typically a satellite scene, but could be a drone image, derived product, or any tiled GeoTIFF. RasterAccessor wraps that record's metadata and provides methods to load band data as arrays.

Handles: - Async band data loading via cached COG metadata - Tile management and geometry masking - Multi-band concurrent fetching

Initialize from a record's metadata.

Parameters:

Name Type Description Default
info RasterInfo

Record metadata including URLs and COG metadata.

required
data_source str

Data source identifier for band mapping.

required
Source code in src/rasteret/core/raster_accessor.py
def __init__(self, info: RasterInfo, data_source: str) -> None:
    """Initialize from a record's metadata.

    Parameters
    ----------
    info : RasterInfo
        Record metadata including URLs and COG metadata.
    data_source : str
        Data source identifier for band mapping.
    """
    # Mirror the record's metadata fields onto this accessor, one-to-one.
    for field in (
        "id",
        "datetime",
        "bbox",
        "footprint",
        "crs",
        "cloud_cover",
        "assets",
        "band_metadata",
        "collection",
    ):
        setattr(self, field, getattr(info, field))
    self.data_source = data_source
Attributes
geometry property
geometry

Alias for footprint.

available_bands property
available_bands: list[str]

List available band keys for this record.

Functions
try_get_band_cog_metadata
try_get_band_cog_metadata(
    band_code: str,
) -> tuple[CogMetadata | None, str | None, int | None]

Return tiled GeoTIFF/COG metadata and URL for band_code.

Returns (None, None, None) when the asset or required per-band metadata is missing.

Source code in src/rasteret/core/raster_accessor.py
def try_get_band_cog_metadata(
    self,
    band_code: str,
) -> tuple[CogMetadata | None, str | None, int | None]:
    """Return tiled GeoTIFF/COG metadata, URL, and band index for *band_code*.

    Returns ``(None, None, None)`` when the asset or required per-band
    metadata is missing.
    """

    # Support both legacy asset-key conventions:
    # - Old STAC-backed Collections often use STAC asset keys (e.g. "blue")
    # - Newer/normalized Collections use logical band codes (e.g. "B02")
    #
    # Resolve by trying: direct band code, registry forward map (B02->blue),
    # then registry reverse map ("blue"->B02), taking the first key that exists.
    candidates: list[str] = [band_code]
    band_map = BandRegistry.get(self.data_source)
    # Guard all uses of band_map: the original code checked `if band_map`
    # before the reverse lookup, implying BandRegistry.get can return a
    # falsy/None mapping for unknown data sources. An unconditional
    # `band_map.get(...)` would raise AttributeError in that case.
    if band_map:
        forward = band_map.get(band_code)
        if forward:
            candidates.append(forward)
        if band_code in band_map.values():
            reverse = {v: k for k, v in band_map.items()}
            back = reverse.get(band_code)
            if back:
                candidates.append(back)

    asset_key = next((c for c in candidates if c in self.assets), None)
    if asset_key is None:
        return None, None, None

    asset = self.assets[asset_key]

    url = self._extract_asset_href(asset)
    band_index = asset.get("band_index") if isinstance(asset, dict) else None

    # Band metadata key could be either band_code or resolved asset_key
    metadata_keys = [f"{band_code}_metadata", f"{asset_key}_metadata"]
    raw_metadata = None
    for key in metadata_keys:
        if key in self.band_metadata:
            raw_metadata = self.band_metadata[key]
            break

    if raw_metadata is None or url is None:
        return None, None, None

    try:
        cog_metadata = CogMetadata.from_dict(raw_metadata, crs=self.crs)
        # band_index comes from asset dicts and may be any serialized form;
        # coerce to int, silently dropping unparseable values.
        idx = None
        if band_index is not None:
            try:
                idx = int(band_index)
            except (TypeError, ValueError):
                idx = None
        return cog_metadata, url, idx
    except KeyError:
        # CogMetadata.from_dict signals missing required keys via KeyError.
        return None, None, None
intersects
intersects(geometry) -> bool

Return True if this record's bbox overlaps geometry's bbox.

Source code in src/rasteret/core/raster_accessor.py
def intersects(self, geometry) -> bool:
    """Return ``True`` if this record's bbox overlaps *geometry*'s bbox."""
    # Early exit: without a record bbox the answer is always False, so skip
    # the geometry coercion and bbox extraction entirely. The original
    # computed the geometry bbox first and only then checked self.bbox.
    if not self.bbox:
        return False
    record_bbox = tuple(self.bbox)

    from rasteret.core.geometry import (
        bbox_array,
        bbox_intersects,
        coerce_to_geoarrow,
    )

    geo_arr = coerce_to_geoarrow(geometry)
    xmin, ymin, xmax, ymax = bbox_array(geo_arr)
    geom_bbox = (xmin[0].as_py(), ymin[0].as_py(), xmax[0].as_py(), ymax[0].as_py())
    return bbox_intersects(record_bbox, geom_bbox)
sample_points async
sample_points(
    *,
    points: Array,
    band_codes: list[str],
    point_indices: list[int] | None = None,
    max_concurrent: int = 50,
    backend: object | None = None,
    reader: object | None = None,
    geometry_crs: int | None = 4326,
    method: str = "nearest",
    max_distance_pixels: int = 0,
    return_neighbourhood: Literal[
        "off", "always", "if_center_nodata"
    ] = "off",
) -> Table

Sample point values for this record.

Parameters:

Name Type Description Default
points Array

GeoArrow-native point array.

required
band_codes list of str

Band codes to sample.

required
point_indices list of int

Absolute point indices corresponding to points. When omitted, emitted rows use the local point positions.

None
max_concurrent int

Maximum concurrent HTTP requests.

50
backend object

Pluggable I/O backend.

None
reader COGReader

Active shared COG reader for connection/session reuse across records. When omitted, this method creates and owns a reader.

None
geometry_crs int

CRS EPSG code of input points. Defaults to EPSG:4326.

4326
method str

Sampling method. Only "nearest" is currently supported.

'nearest'
max_distance_pixels int

Maximum pixel distance for nodata fallback search, measured in Chebyshev distance (square rings). Rasteret samples the base pixel containing the point first; when that pixel is nodata and this is > 0, Rasteret searches outward in square rings up to this distance and picks the closest candidate by exact point-to-pixel-rectangle distance. 0 disables fallback and returns the base pixel value as-is.

0
return_neighbourhood ('off', 'always', 'if_center_nodata')

Controls whether a neighbourhood window is returned: "off" omits the window column. "always" returns the full window for every sampled row. "if_center_nodata" returns the full window only when the center pixel is nodata/NaN; other rows have a NULL window.

"off"

Returns:

Type Description
Table

One row per (point, band) sample for this record.

Source code in src/rasteret/core/raster_accessor.py
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
async def sample_points(
    self,
    *,
    points: pa.Array,
    band_codes: list[str],
    point_indices: list[int] | None = None,
    max_concurrent: int = 50,
    backend: object | None = None,
    reader: object | None = None,
    geometry_crs: int | None = 4326,
    method: str = "nearest",
    max_distance_pixels: int = 0,
    return_neighbourhood: Literal["off", "always", "if_center_nodata"] = "off",
) -> pa.Table:
    """Sample point values for this record.

    Parameters
    ----------
    points : pa.Array
        GeoArrow-native point array.
    band_codes : list of str
        Band codes to sample.
    point_indices : list of int, optional
        Absolute point indices corresponding to *points*. When omitted,
        emitted rows use the local point positions.
    max_concurrent : int
        Maximum concurrent HTTP requests.
    backend : object, optional
        Pluggable I/O backend.
    reader : COGReader, optional
        Active shared COG reader for connection/session reuse across
        records. When omitted, this method creates and owns a reader.
    geometry_crs : int, optional
        CRS EPSG code of input points. Defaults to EPSG:4326.
    method : str
        Sampling method. Only ``"nearest"`` is currently supported.
    max_distance_pixels : int
        Maximum pixel distance for nodata fallback search, measured in
        Chebyshev distance (square rings). Rasteret samples the base pixel
        containing the point first; when that pixel is nodata and this is
        > 0, Rasteret searches outward in square rings up to this distance
        and picks the closest candidate by exact
        point-to-pixel-rectangle distance. ``0`` disables fallback and
        returns the base pixel value as-is.
    return_neighbourhood : {"off", "always", "if_center_nodata"}
        Controls whether a neighbourhood window is returned:
        ``"off"`` omits the window column.
        ``"always"`` returns the full window for every sampled row.
        ``"if_center_nodata"`` returns the full window only when the center
        pixel is nodata/NaN; other rows have a NULL window.

    Returns
    -------
    pyarrow.Table
        One row per ``(point, band)`` sample for this record.
    """
    if method != "nearest":
        raise ValueError("Only nearest point sampling is supported currently.")
    if max_distance_pixels < 0:
        raise ValueError("max_distance_pixels must be >= 0")
    if return_neighbourhood not in {"off", "always", "if_center_nodata"}:
        raise ValueError(
            "return_neighbourhood must be one of: 'off', 'always', 'if_center_nodata'"
        )
    if return_neighbourhood != "off" and max_distance_pixels <= 0:
        raise ValueError(
            "max_distance_pixels must be > 0 when return_neighbourhood is enabled"
        )

    include_neighbourhood = return_neighbourhood != "off"
    neighbourhood_always = return_neighbourhood == "always"
    neighbourhood_if_center_nodata = return_neighbourhood == "if_center_nodata"

    output_schema = (
        POINT_SAMPLES_NEIGHBORHOOD_SCHEMA
        if include_neighbourhood
        else POINT_SAMPLES_SCHEMA
    )

    type_name = getattr(points.type, "extension_name", "") or ""
    if "geoarrow.point" not in type_name:
        from rasteret.core.geometry import UnsupportedGeometryError

        raise UnsupportedGeometryError(
            "Point sampling requires Point geometries. "
            "Use get_xarray/get_numpy/get_gdf for Polygon/MultiPolygon AOIs."
        )

    import geoarrow.pyarrow as ga

    from rasteret.fetch.cog import COGReader, COGTileRequest

    transformer = None
    if (
        geometry_crs is not None
        and self.crs is not None
        and geometry_crs != self.crs
    ):
        from pyproj import Transformer

        transformer = Transformer.from_crs(geometry_crs, self.crs, always_xy=True)

    record_datetime_us: np.datetime64 | None = None
    if self.datetime is not None:
        try:
            record_datetime_us = np.datetime64(
                pd.Timestamp(self.datetime).to_datetime64(), "us"
            )
        except (OverflowError, TypeError, ValueError):
            logger.debug("Could not normalize record datetime for %s", self.id)

    point_crs_value = int(geometry_crs) if geometry_crs is not None else None
    raster_crs_value = int(self.crs) if self.crs is not None else None
    point_indices_arr: np.ndarray | None = None
    point_xs_arr, point_ys_arr = ga.point_coords(points)
    point_xs = point_xs_arr.to_numpy(zero_copy_only=False)
    point_ys = point_ys_arr.to_numpy(zero_copy_only=False)
    if point_indices is not None:
        point_indices_arr = np.asarray(point_indices, dtype=np.int64)
        if len(point_indices_arr) != len(point_xs):
            raise ValueError("point_indices length must match the number of points")
    sample_xs, sample_ys = point_xs, point_ys
    if transformer is not None:
        sample_xs, sample_ys = transformer.transform(point_xs, point_ys)

    def _constant_int32_array(value: int | None, row_count: int) -> pa.Array:
        if value is None:
            return pa.nulls(row_count, type=pa.int32())
        return pa.array(np.full(row_count, value, dtype=np.int32), type=pa.int32())

    def _constant_timestamp_array(
        value: np.datetime64 | None, row_count: int
    ) -> pa.Array:
        if value is None:
            return pa.nulls(row_count, type=pa.timestamp("us"))
        return pa.array(
            np.full(row_count, value, dtype="datetime64[us]"),
            type=pa.timestamp("us"),
        )

    def _constant_string_array(value: str, row_count: int) -> pa.Array:
        return pa.array(np.full(row_count, value, dtype=object), type=pa.string())

    band_sources: list[dict[str, object]] = []
    for band_code in band_codes:
        cog_meta, url, band_index = self.try_get_band_cog_metadata(band_code)
        if cog_meta is None or url is None or cog_meta.transform is None:
            raise self._missing_band_error(band_code)

        scale_x, trans_x, scale_y, trans_y = normalize_transform(cog_meta.transform)
        src_transform = Affine(
            float(scale_x),
            0.0,
            float(trans_x),
            0.0,
            float(scale_y),
            float(trans_y),
        )
        band_sources.append(
            {
                "band_code": band_code,
                "metadata": cog_meta,
                "url": url,
                "band_index": band_index,
                "transform": src_transform,
                "group_key": (
                    url,
                    tuple(float(value) for value in cog_meta.transform),
                    int(cog_meta.width),
                    int(cog_meta.height),
                    int(cog_meta.tile_width),
                    int(cog_meta.tile_height),
                    str(np.dtype(cog_meta.dtype)),
                    int(getattr(cog_meta, "samples_per_pixel", 1) or 1),
                    int(getattr(cog_meta, "planar_configuration", 1) or 1),
                ),
            }
        )

    async def _sample_with_reader(shared_reader: COGReader) -> pa.Table:
        record_batches: list[pa.RecordBatch] = []
        grouped_sources: dict[object, list[dict[str, object]]] = {}
        for source in band_sources:
            grouped_sources.setdefault(source["group_key"], []).append(source)

        neighborhood_row_offsets: np.ndarray | None = None
        neighborhood_col_offsets: np.ndarray | None = None
        neighborhood_size = 0
        if include_neighbourhood:
            neighborhood_row_offsets, neighborhood_col_offsets = (
                square_neighborhood_offsets(max_distance_pixels)
            )
            neighborhood_size = int(neighborhood_row_offsets.size)

        for source_group in grouped_sources.values():
            # Phase 1: map -> pixel -> tile grouping.
            # This is the key point-sampling-specific planning step that lets us
            # fetch each tile once and then do fast local indexing.
            first_source = source_group[0]
            cog_meta = first_source["metadata"]
            url = str(first_source["url"])
            src_transform = first_source["transform"]

            (
                point_rows,
                point_cols,
                point_row_fs,
                point_col_fs,
                tile_groups,
                tiles,
            ) = plan_point_tile_groups(
                np.asarray(sample_xs, dtype=np.float64),
                np.asarray(sample_ys, dtype=np.float64),
                scale_x=float(src_transform.a),
                translate_x=float(src_transform.c),
                scale_y=float(src_transform.e),
                translate_y=float(src_transform.f),
                raster_height=int(cog_meta.height),
                raster_width=int(cog_meta.width),
                tile_height=int(cog_meta.tile_height),
                tile_width=int(cog_meta.tile_width),
            )
            if not tiles:
                continue

            # Phase 2: fetch base tiles and sample nearest pixel per point.
            source_values: list[np.ndarray] = [
                np.full(len(point_xs), np.nan, dtype=np.float64)
                for _ in source_group
            ]
            source_sampled_masks: list[np.ndarray] = [
                np.zeros(len(point_xs), dtype=bool) for _ in source_group
            ]
            source_tile_cache: list[dict[tuple[int, int], np.ndarray]] = [
                {} for _ in source_group
            ]
            source_neighborhood_values: list[np.ndarray | None] = [
                (
                    np.full(
                        (len(point_xs), neighborhood_size),
                        np.nan,
                        dtype=np.float64,
                    )
                    if include_neighbourhood
                    else None
                )
                for _ in source_group
            ]
            source_neighborhood_computed: list[np.ndarray | None] = [
                (
                    np.zeros(len(point_xs), dtype=bool)
                    if include_neighbourhood
                    else None
                )
                for _ in source_group
            ]
            # When `return_neighbourhood="if_center_nodata"`, we must key off the
            # original center-pixel sample, not the post-fallback value. The
            # fallback path mutates `source_values` for unresolved points.
            source_center_invalid_masks: list[np.ndarray | None] = [
                (None if neighbourhood_if_center_nodata else None)
                for _ in source_group
            ]

            async def _cache_candidate_tiles(
                *,
                candidate_rows: np.ndarray,
                candidate_cols: np.ndarray,
                in_bounds: np.ndarray,
                tile_cache: dict[tuple[int, int], np.ndarray],
                source_meta: CogMetadata,
                source_band_index: int,
                tiles_x: int,
                tiles_y: int,
            ) -> None:
                unique_tile_pairs = unique_tile_pairs_for_candidates(
                    candidate_rows,
                    candidate_cols,
                    in_bounds,
                    tile_height=int(source_meta.tile_height),
                    tile_width=int(source_meta.tile_width),
                )
                if not unique_tile_pairs.size:
                    return

                extra_requests: list[COGTileRequest] = []
                for (
                    tile_row,
                    tile_col,
                    offset,
                    size,
                ) in tile_request_specs_for_pairs(
                    unique_tile_pairs,
                    source_meta.tile_offsets,
                    source_meta.tile_byte_counts,
                    tiles_x=tiles_x,
                    tiles_y=tiles_y,
                ):
                    if (tile_row, tile_col) in tile_cache:
                        continue
                    extra_requests.append(
                        COGTileRequest(
                            url=url,
                            offset=offset,
                            size=size,
                            row=tile_row,
                            col=tile_col,
                            metadata=source_meta,
                            band_index=source_band_index,
                        )
                    )

                if not extra_requests:
                    return

                extra_map = await shared_reader.read_merged_tiles(extra_requests)
                for (
                    tile_row,
                    tile_col,
                    band_index,
                ), tile_data in extra_map.items():
                    if band_index != source_band_index:
                        continue
                    tile_cache[(tile_row, tile_col)] = tile_data

            # Batch tile reads to keep request lists bounded. `read_merged_tiles()`
            # works on (tile-offset, byte-count) ranges and materializes whole
            # tiles, so the main limiter here is the size of the request list,
            # not per-pixel reads.
            #
            # Use `max_concurrent` as a proxy for how much outstanding I/O we
            # can efficiently schedule, and cap to avoid huge batches that
            # increase range-merging overhead.
            requests_per_tile = max(1, len(source_group))
            max_requests_per_batch = max(64, min(4096, int(max_concurrent) * 8))
            tiles_per_batch = max(1, max_requests_per_batch // requests_per_tile)
            for start in range(0, len(tiles), tiles_per_batch):
                batch = tiles[start : start + tiles_per_batch]
                sample_requests: list[COGTileRequest] = []
                for source in source_group:
                    source_meta = source["metadata"]
                    source_band_index = source["band_index"]
                    tile_offsets = source_meta.tile_offsets
                    tile_byte_counts = source_meta.tile_byte_counts
                    if tile_offsets is None or tile_byte_counts is None:
                        continue

                    tiles_x, tiles_y = tile_grid_shape(
                        raster_width=int(source_meta.width),
                        raster_height=int(source_meta.height),
                        tile_width=int(source_meta.tile_width),
                        tile_height=int(source_meta.tile_height),
                    )

                    for (
                        tile_row,
                        tile_col,
                        offset,
                        size,
                    ) in tile_request_specs_for_pairs(
                        batch,
                        tile_offsets,
                        tile_byte_counts,
                        tiles_x=tiles_x,
                        tiles_y=tiles_y,
                    ):
                        sample_requests.append(
                            COGTileRequest(
                                url=url,
                                offset=offset,
                                size=size,
                                row=tile_row,
                                col=tile_col,
                                metadata=source_meta,
                                band_index=source_band_index,
                            )
                        )

                tile_arrays_map = (
                    await shared_reader.read_merged_tiles(sample_requests)
                    if sample_requests
                    else {}
                )

                for tile_row, tile_col in batch:
                    tile_arrays: list[np.ndarray] = []
                    missing_band = False
                    for source_idx, source in enumerate(source_group):
                        band_index = source["band_index"]
                        tile_data = tile_arrays_map.get(
                            (tile_row, tile_col, band_index)
                        )
                        if tile_data is None:
                            missing_band = True
                            break
                        tile_arrays.append(tile_data)
                        source_tile_cache[source_idx][(tile_row, tile_col)] = (
                            tile_data
                        )
                    if missing_band or not tile_arrays:
                        continue

                    sample_matrix = tile_groups.get((tile_row, tile_col))
                    if sample_matrix is None or sample_matrix.size == 0:
                        continue
                    row_start = tile_row * int(cog_meta.tile_height)
                    col_start = tile_col * int(cog_meta.tile_width)

                    local_point_indices = sample_matrix[:, 0]
                    local_rows = sample_matrix[:, 1] - row_start
                    local_cols = sample_matrix[:, 2] - col_start

                    for source_idx, tile_data in enumerate(tile_arrays):
                        values = np.asarray(
                            tile_data[local_rows, local_cols], dtype=np.float64
                        )
                        source_values[source_idx][local_point_indices] = values
                        source_sampled_masks[source_idx][local_point_indices] = True

            if max_distance_pixels > 0:
                # Phase 3: optional nodata fallback (nearest non-nodata pixel).
                # For points whose nearest sample is nodata, search outward in
                # Chebyshev rings up to `max_distance_pixels`. Within the first ring
                # that yields valid candidates, pick the candidate with minimum
                # exact distance from the query point to the candidate pixel
                # footprint (scaled by pixel width/height).
                #
                # This mirrors PostGIS' ST_NearestValue behavior (perimeter scan
                # of expanding extents + minimum point-to-pixel distance), but
                # we cap the search radius to avoid unbounded remote I/O.
                for source_idx, source in enumerate(source_group):
                    source_meta = source["metadata"]
                    source_band_index = source["band_index"]
                    nodata_value = getattr(source_meta, "nodata", None)
                    if (
                        source_meta.tile_offsets is None
                        or source_meta.tile_byte_counts is None
                    ):
                        continue

                    center_invalid = source_sampled_masks[source_idx] & nodata_mask(
                        source_values[source_idx], nodata_value
                    )
                    if neighbourhood_if_center_nodata:
                        source_center_invalid_masks[source_idx] = (
                            center_invalid.copy()
                        )

                    unresolved = center_invalid.copy()
                    if not np.any(unresolved):
                        continue

                    tile_height = int(source_meta.tile_height)
                    tile_width = int(source_meta.tile_width)
                    raster_height = int(source_meta.height)
                    raster_width = int(source_meta.width)
                    pixel_width = float(src_transform.a)
                    pixel_height = float(src_transform.e)
                    tiles_x, tiles_y = tile_grid_shape(
                        raster_width=raster_width,
                        raster_height=raster_height,
                        tile_width=tile_width,
                        tile_height=tile_height,
                    )
                    tile_cache = source_tile_cache[source_idx]

                    for radius in range(1, max_distance_pixels + 1):
                        if not np.any(unresolved):
                            break
                        row_offsets, col_offsets = chebyshev_ring_offsets(radius)
                        if row_offsets.size == 0:
                            continue

                        unresolved_indices = np.nonzero(unresolved)[0]
                        candidate_rows, candidate_cols, in_bounds = (
                            ring_candidate_grid(
                                point_rows[unresolved_indices],
                                point_cols[unresolved_indices],
                                row_offsets,
                                col_offsets,
                                raster_height=raster_height,
                                raster_width=raster_width,
                            )
                        )
                        if not np.any(in_bounds):
                            continue

                        await _cache_candidate_tiles(
                            candidate_rows=candidate_rows,
                            candidate_cols=candidate_cols,
                            in_bounds=in_bounds,
                            tile_cache=tile_cache,
                            source_meta=source_meta,
                            source_band_index=source_band_index,
                            tiles_x=tiles_x,
                            tiles_y=tiles_y,
                        )

                        candidate_values, candidate_sampled = (
                            candidate_values_from_tile_cache(
                                candidate_rows,
                                candidate_cols,
                                in_bounds,
                                tile_cache=tile_cache,
                                tile_height=tile_height,
                                tile_width=tile_width,
                                tiles_x=tiles_x,
                            )
                        )
                        valid_candidates = candidate_sampled & ~nodata_mask(
                            candidate_values, nodata_value
                        )
                        if not np.any(valid_candidates):
                            continue

                        distance_sq = pixel_rect_distance_sq(
                            point_row_fs[unresolved_indices],
                            point_col_fs[unresolved_indices],
                            candidate_rows,
                            candidate_cols,
                            pixel_width=pixel_width,
                            pixel_height=pixel_height,
                        )
                        distance_sq = np.where(
                            valid_candidates, distance_sq, np.inf
                        )
                        best_offset = np.argmin(distance_sq, axis=1)
                        best_distance_sq = distance_sq[
                            np.arange(unresolved_indices.size),
                            best_offset,
                        ]
                        resolved_here = np.isfinite(best_distance_sq)
                        if not np.any(resolved_here):
                            continue

                        accepted_indices = unresolved_indices[resolved_here]
                        accepted_offsets = best_offset[resolved_here]
                        source_values[source_idx][accepted_indices] = (
                            candidate_values[
                                resolved_here,
                                accepted_offsets,
                            ]
                        )
                        unresolved[accepted_indices] = False

            if include_neighbourhood:
                assert neighborhood_row_offsets is not None
                assert neighborhood_col_offsets is not None
                for source_idx, source in enumerate(source_group):
                    if neighbourhood_always:
                        local_point_indices = np.nonzero(
                            source_sampled_masks[source_idx]
                        )[0]
                    elif neighbourhood_if_center_nodata:
                        center_invalid = source_center_invalid_masks[source_idx]
                        if center_invalid is None:
                            source_meta = source["metadata"]
                            nodata_value = getattr(source_meta, "nodata", None)
                            center_invalid = source_sampled_masks[
                                source_idx
                            ] & nodata_mask(source_values[source_idx], nodata_value)
                        local_point_indices = np.nonzero(center_invalid)[0]
                    else:
                        local_point_indices = np.empty(0, dtype=np.int64)
                    if local_point_indices.size == 0:
                        continue

                    source_meta = source["metadata"]
                    source_band_index = source["band_index"]
                    if (
                        source_meta.tile_offsets is None
                        or source_meta.tile_byte_counts is None
                    ):
                        continue

                    tile_height = int(source_meta.tile_height)
                    tile_width = int(source_meta.tile_width)
                    raster_height = int(source_meta.height)
                    raster_width = int(source_meta.width)
                    tiles_x, tiles_y = tile_grid_shape(
                        raster_width=raster_width,
                        raster_height=raster_height,
                        tile_width=tile_width,
                        tile_height=tile_height,
                    )
                    tile_cache = source_tile_cache[source_idx]

                    candidate_rows, candidate_cols, in_bounds = ring_candidate_grid(
                        point_rows[local_point_indices],
                        point_cols[local_point_indices],
                        neighborhood_row_offsets,
                        neighborhood_col_offsets,
                        raster_height=raster_height,
                        raster_width=raster_width,
                    )

                    await _cache_candidate_tiles(
                        candidate_rows=candidate_rows,
                        candidate_cols=candidate_cols,
                        in_bounds=in_bounds,
                        tile_cache=tile_cache,
                        source_meta=source_meta,
                        source_band_index=source_band_index,
                        tiles_x=tiles_x,
                        tiles_y=tiles_y,
                    )

                    neighborhood_values, _ = candidate_values_from_tile_cache(
                        candidate_rows,
                        candidate_cols,
                        in_bounds,
                        tile_cache=tile_cache,
                        tile_height=tile_height,
                        tile_width=tile_width,
                        tiles_x=tiles_x,
                    )
                    neighborhood_store = source_neighborhood_values[source_idx]
                    assert neighborhood_store is not None
                    neighborhood_store[local_point_indices] = neighborhood_values
                    computed_store = source_neighborhood_computed[source_idx]
                    assert computed_store is not None
                    computed_store[local_point_indices] = True

            # Phase 4: build Arrow record batches: one row per (point, band).
            for source_idx, source in enumerate(source_group):
                local_point_indices = np.nonzero(source_sampled_masks[source_idx])[
                    0
                ]
                if local_point_indices.size == 0:
                    continue
                output_point_indices = (
                    point_indices_arr[local_point_indices]
                    if point_indices_arr is not None
                    else local_point_indices
                )
                point_x_values = point_xs[local_point_indices]
                point_y_values = point_ys[local_point_indices]
                row_count = int(local_point_indices.size)
                point_index_arr = pa.array(output_point_indices, type=pa.int64())
                point_x_arr = pa.array(
                    np.asarray(point_x_values, dtype=np.float64),
                    type=pa.float64(),
                )
                point_y_arr = pa.array(
                    np.asarray(point_y_values, dtype=np.float64),
                    type=pa.float64(),
                )
                point_crs_arr = _constant_int32_array(point_crs_value, row_count)
                record_id_arr = _constant_string_array(str(self.id), row_count)
                datetime_arr = _constant_timestamp_array(
                    record_datetime_us, row_count
                )
                collection_arr = _constant_string_array(
                    str(self.collection), row_count
                )
                raster_crs_arr = _constant_int32_array(raster_crs_value, row_count)
                batch_columns = {
                    "point_index": point_index_arr,
                    "point_x": point_x_arr,
                    "point_y": point_y_arr,
                    "point_crs": point_crs_arr,
                    "record_id": record_id_arr,
                    "datetime": datetime_arr,
                    "collection": collection_arr,
                    "band": _constant_string_array(
                        str(source["band_code"]), row_count
                    ),
                    "value": pa.array(
                        source_values[source_idx][local_point_indices],
                        type=pa.float64(),
                    ),
                    "raster_crs": raster_crs_arr,
                }
                if include_neighbourhood:
                    neighborhood_store = source_neighborhood_values[source_idx]
                    assert neighborhood_store is not None
                    computed_store = source_neighborhood_computed[source_idx]
                    assert computed_store is not None
                    batch_columns["neighbourhood_values"] = pa.array(
                        [
                            neighborhood_store[i].tolist()
                            if bool(computed_store[i])
                            else None
                            for i in local_point_indices.tolist()
                        ],
                        type=pa.list_(pa.float64()),
                    )
                record_batches.append(
                    pa.record_batch(
                        [batch_columns[field.name] for field in output_schema],
                        schema=output_schema,
                    )
                )

        if not record_batches:
            return (
                empty_point_samples_neighborhood_table()
                if include_neighbourhood
                else empty_point_samples_table()
            )
        return pa.Table.from_batches(record_batches, schema=output_schema)

    if reader is not None:
        return await _sample_with_reader(reader)

    async with COGReader(max_concurrent=max_concurrent, backend=backend) as owned:
        return await _sample_with_reader(owned)
load_bands async
load_bands(
    geometries: Array,
    band_codes: list[str],
    max_concurrent: int = 50,
    for_xarray: bool = True,
    for_numpy: bool = False,
    progress: bool = False,
    backend: object | None = None,
    target_crs: int | None = None,
    geometry_crs: int | None = 4326,
    all_touched: bool = False,
)

Load bands for all geometries with parallel processing.

Parameters:

Name Type Description Default
geometries Array

GeoArrow native array of areas of interest.

required
band_codes list of str

Band codes to load.

required
max_concurrent int

Maximum concurrent HTTP requests.

50
for_xarray bool

If True, return xr.Dataset; otherwise gpd.GeoDataFrame.

True
for_numpy bool

If True, return raw per-geometry band results for NumPy assembly without constructing GeoPandas objects.

False
progress bool

If True, display per-record and per-geometry progress bars during loading.

False
backend object

Pluggable I/O backend.

None
target_crs int

Reproject results to this CRS.

None
geometry_crs int

CRS of the geometries input (default EPSG:4326).

4326
all_touched bool

Passed through to polygon masking behavior. False matches rasterio default semantics.

False

Returns:

Type Description
Dataset, GeoDataFrame, or list

Data is returned in the native COG dtype (e.g. uint16, int8, float32). Integer arrays promote to float32 only when geometry masking requires NaN and no nodata value is declared in the COG metadata. When for_numpy is True, raw per-geometry band results are returned as a list instead of an xarray or GeoPandas object.

Source code in src/rasteret/core/raster_accessor.py
async def load_bands(
    self,
    geometries: pa.Array,
    band_codes: list[str],
    max_concurrent: int = 50,
    for_xarray: bool = True,
    for_numpy: bool = False,
    progress: bool = False,
    backend: object | None = None,
    target_crs: int | None = None,
    geometry_crs: int | None = 4326,
    all_touched: bool = False,
):
    """Load bands for all geometries with parallel processing.

    Parameters
    ----------
    geometries : pa.Array
        GeoArrow native array of areas of interest.
    band_codes : list of str
        Band codes to load.
    max_concurrent : int
        Maximum concurrent HTTP requests.
    for_xarray : bool
        If ``True``, return ``xr.Dataset``; otherwise ``gpd.GeoDataFrame``.
    for_numpy : bool
        If ``True``, return raw per-geometry band results for NumPy assembly
        without constructing GeoPandas objects.
    progress : bool
        If ``True``, display per-record and per-geometry ``tqdm``
        progress bars while loading.
    backend : object, optional
        Pluggable I/O backend.
    target_crs : int, optional
        Reproject results to this CRS.
    geometry_crs : int, optional
        CRS of the *geometries* input (default EPSG:4326).
    all_touched : bool
        Passed through to polygon masking behavior. ``False`` matches
        rasterio default semantics.

    Returns
    -------
    xarray.Dataset or geopandas.GeoDataFrame or list
        Data is returned in the native COG dtype (e.g. ``uint16``,
        ``int8``, ``float32``). Integer arrays promote to ``float32``
        only when geometry masking requires NaN and no nodata value is
        declared in the COG metadata. When ``for_numpy`` is ``True``,
        the raw ``(band_results, geom_id)`` pairs are returned as a
        list instead.

    Raises
    ------
    ValueError
        If both ``for_xarray`` and ``for_numpy`` are requested.
    UnsupportedGeometryError
        For non-Polygon/MultiPolygon geometries (fail-fast: this is a
        deterministic input error, never a transient read failure).
    RuntimeError
        If *every* band read for a geometry, or *every* geometry task,
        fails; the first underlying failure is chained as the cause.
    """
    # Imported lazily to avoid a module-level import cycle with the fetch layer.
    from rasteret.fetch.cog import COGReader

    # The two output modes are mutually exclusive; reject contradictory flags
    # up front instead of silently preferring one.
    if for_xarray and for_numpy:
        raise ValueError(
            "load_bands() cannot request xarray and numpy outputs together"
        )

    n_geoms = len(geometries)
    logger.debug(f"Loading {len(band_codes)} bands for {n_geoms} geometries")

    geom_progress = None
    if progress:
        geom_progress = tqdm(total=n_geoms, desc=f"Record {self.id}")

    # One shared reader (connection pool / backend) for all geometry tasks.
    async with COGReader(max_concurrent=max_concurrent, backend=backend) as reader:

        async def process_geometry(geom_idx: int, geom_id: int):
            # Fetch all requested bands for one geometry concurrently and
            # classify per-band failures. Returns
            # (valid_results, geom_id, failed_band_codes, first_error).
            band_progress = None
            if progress:
                band_progress = tqdm(
                    total=len(band_codes), desc=f"Geom {geom_id}", leave=False
                )

            band_tasks = []
            for band_code in band_codes:
                task = self._load_single_band(
                    geometries,
                    geom_idx,
                    band_code,
                    max_concurrent,
                    reader=reader,
                    geometry_crs=geometry_crs,
                    all_touched=all_touched,
                )
                band_tasks.append(task)

            # return_exceptions=True: collect per-band failures instead of
            # letting one failed band cancel the whole geometry.
            raw_results = await asyncio.gather(*band_tasks, return_exceptions=True)
            results = []
            first_error: BaseException | None = None
            failed_band_codes: list[str] = []
            for band_code, r in zip(band_codes, raw_results):
                if isinstance(r, Exception):
                    from rasteret.core.geometry import UnsupportedGeometryError

                    if isinstance(r, UnsupportedGeometryError):
                        # Deterministic user input issue: fail fast.
                        raise UnsupportedGeometryError(
                            "Unsupported geometry type for Rasteret sampling "
                            f"(record_id='{self.id}', geometry_index={geom_id}). "
                            "Rasteret currently supports Polygon and MultiPolygon geometries "
                            "for masking-based sampling via get_xarray/get_numpy/get_gdf. "
                            "Use sample_points() for Point geometries."
                        ) from r
                    # Transient/IO-style failure: record and keep the other bands.
                    failed_band_codes.append(band_code)
                    if first_error is None:
                        first_error = r
                    logger.error(
                        "Band load failed (record_id=%s, geometry_index=%s, band=%s): %s",
                        self.id,
                        geom_id,
                        band_code,
                        r,
                    )
                else:
                    results.append(r)
            if band_progress is not None:
                band_progress.update(len(band_codes))
                band_progress.close()
            if geom_progress is not None:
                geom_progress.update(1)

            # A band may legitimately return None (e.g. no coverage); only
            # escalate when nothing succeeded AND something actually errored.
            valid = [r for r in results if r is not None]
            if not valid and first_error is not None:
                from rasteret.core.geometry import UnsupportedGeometryError

                if isinstance(first_error, UnsupportedGeometryError):
                    raise UnsupportedGeometryError(
                        f"Unsupported geometry type for Rasteret sampling "
                        f"(record_id='{self.id}', geometry_index={geom_id}). "
                        "Rasteret currently supports Polygon and MultiPolygon geometries "
                        "for masking-based sampling via get_xarray/get_numpy/get_gdf. "
                        "Use sample_points() for Point geometries."
                    ) from first_error
                raise RuntimeError(
                    "All band reads failed for the requested geometry "
                    f"(record_id='{self.id}', geometry_index={geom_id}). "
                    "See the chained exception for the first failure."
                ) from first_error
            # Reproject only when a different target CRS was requested and
            # there is data to reproject.
            if target_crs is not None and target_crs != self.crs and valid:
                valid = self._reproject_band_results(valid, target_crs)
            return valid, geom_id, failed_band_codes, first_error

        # Process geometries concurrently with semaphore
        sem = asyncio.Semaphore(max_concurrent)

        async def bounded_process(geom_idx: int, geom_id: int):
            async with sem:
                return await process_geometry(geom_idx, geom_id)

        # geom_id is 1-based for user-facing messages; geom_idx is the
        # 0-based index into `geometries`.
        tasks = [bounded_process(idx, idx + 1) for idx in range(n_geoms)]
        raw_geom_results = await asyncio.gather(*tasks, return_exceptions=True)

    # Aggregate per-geometry outcomes: successes, whole-geometry failures,
    # and geometries that succeeded with some bands missing.
    results: list[tuple[list[dict], int]] = []
    first_error: BaseException | None = None
    geom_failures = 0
    partial_band_failures: list[tuple[int, list[str], BaseException | None]] = []
    for r in raw_geom_results:
        if isinstance(r, Exception):
            from rasteret.core.geometry import UnsupportedGeometryError

            if isinstance(r, UnsupportedGeometryError):
                # Geometry-type errors are deterministic user input issues.
                # Fail fast so they do not become a misleading
                # "No valid data found" downstream.
                raise r
            geom_failures += 1
            if first_error is None:
                first_error = r
            logger.error("Geometry processing failed: %s", r)
        else:
            band_results, geom_id, failed_band_codes, band_first_error = r
            results.append((band_results, geom_id))
            if failed_band_codes:
                partial_band_failures.append(
                    (geom_id, failed_band_codes, band_first_error)
                )

    if geom_progress is not None:
        geom_progress.close()

    # Nothing succeeded at all -> hard error with the first cause chained.
    if not results and first_error is not None:
        raise RuntimeError(
            f"All geometry reads failed for record_id='{self.id}'. "
            "See the chained exception for the first failure."
        ) from first_error
    # Partial success -> warn (not raise) so callers still get usable data,
    # with enough detail to diagnose the first failure.
    if results and (geom_failures or partial_band_failures):
        parts = []
        if geom_failures:
            parts.append(f"{geom_failures}/{n_geoms} geometry task(s) failed")
        if partial_band_failures:
            n_failed_bands = sum(
                len(bands) for _gid, bands, _err in partial_band_failures
            )
            parts.append(
                f"{n_failed_bands} band read(s) failed across {len(partial_band_failures)} geometries"
            )

        first_detail = None
        if geom_failures and first_error is not None:
            first_detail = f"first geometry failure: {first_error}"
        elif partial_band_failures:
            gid, bands, err = partial_band_failures[0]
            band = bands[0] if bands else "<unknown>"
            if err is not None:
                first_detail = f"first band failure: geometry_index={gid}, band='{band}': {err}"
            else:
                first_detail = (
                    f"first band failure: geometry_index={gid}, band='{band}'"
                )

        msg = (
            f"Partial read failures for record_id='{self.id}': "
            + "; ".join(parts)
            + (f"; {first_detail}" if first_detail else "")
            + "."
        )
        warnings.warn(msg, RuntimeWarning, stacklevel=2)

    # Process results
    if for_numpy:
        return results
    if for_xarray:
        return self._merge_xarray_results(results, target_crs=target_crs)
    else:
        return self._merge_geodataframe_results(
            results,
            geometries,
            geometry_crs=geometry_crs,
            target_crs=target_crs,
        )

Functions