async def parse_cog_header(self, url: str) -> CogMetadata | None:
    """Parse a (Big)TIFF/COG header via HTTP range requests.

    Reads the TIFF header and first IFD from ``url`` using
    ``self._fetch_byte_range``, decodes the tags needed by Rasteret
    (dimensions, tile layout, dtype, compression, georeferencing,
    nodata) and returns them as a ``CogMetadata``.

    Args:
        url: Location of the GeoTIFF/COG to inspect.

    Returns:
        CogMetadata with a PixelIsArea-style affine transform
        ``(scale_x, translate_x, scale_y, translate_y)``.

    Raises:
        ValueError: Malformed header, unsupported TIFF version, or
            missing required tags.
        NotImplementedError: Untiled TIFF, JPEG compression, or a
            rotated/sheared geotransform.
        IOError: Propagated from the underlying byte-range fetch.
    """
    try:
        # 16 bytes covers both the classic TIFF header (8 bytes) and
        # the BigTIFF header (16 bytes).
        header_bytes = await self._fetch_byte_range(url, 0, 16)
        if len(header_bytes) < 8:
            raise ValueError(
                f"Truncated TIFF header: got {len(header_bytes)} bytes"
            )
        # Byte order marker: b"II" = little-endian, b"MM" = big-endian.
        # Anything else means this is not a TIFF (e.g. an HTML error
        # page returned by the server) — reject it instead of silently
        # assuming little-endian.
        byte_order = header_bytes[0:2]
        if byte_order == b"MM":
            endian = ">"
        elif byte_order == b"II":
            endian = "<"
        else:
            raise ValueError(
                f"Not a TIFF: invalid byte-order marker {byte_order!r}"
            )
        # Version 42 = classic TIFF (4-byte offsets, 12-byte IFD
        # entries); version 43 = BigTIFF (8-byte offsets, 20-byte
        # entries).
        version = struct.unpack(f"{endian}H", header_bytes[2:4])[0]
        if version == 42:
            ifd_offset = struct.unpack(f"{endian}L", header_bytes[4:8])[0]
            entry_size = 12
            offset_size = 4
        elif version == 43:
            if len(header_bytes) < 16:
                raise ValueError("Truncated BigTIFF header")
            # BigTIFF spec: bytes 4-6 hold the offset size (always 8)
            # and bytes 6-8 are reserved (always 0). Validate so a
            # corrupt header cannot yield a garbage IFD offset.
            bytesize, reserved = struct.unpack(f"{endian}HH", header_bytes[4:8])
            if bytesize != 8 or reserved != 0:
                raise ValueError(
                    f"Invalid BigTIFF header: offset size {bytesize}, "
                    f"reserved {reserved}"
                )
            ifd_offset = struct.unpack(f"{endian}Q", header_bytes[8:16])[0]
            entry_size = 20
            offset_size = 8
        else:
            raise ValueError(f"Unsupported TIFF version: {version}")

        # Read the IFD: entry count (2 bytes classic / 8 bytes BigTIFF)
        # followed by the packed tag entries.
        ifd_count_size = 2 if version == 42 else 8
        ifd_count_bytes = await self._fetch_byte_range(
            url, ifd_offset, ifd_count_size
        )
        entry_count = (
            struct.unpack(f"{endian}H", ifd_count_bytes)[0]
            if version == 42
            else struct.unpack(f"{endian}Q", ifd_count_bytes)[0]
        )
        ifd_bytes = await self._fetch_byte_range(
            url, ifd_offset + ifd_count_size, entry_count * entry_size
        )

        # Decode every entry. Values that do not fit inline are
        # resolved (with extra fetches) by _parse_tiff_tag_value.
        tags = {}
        for i in range(entry_count):
            entry = ifd_bytes[i * entry_size : (i + 1) * entry_size]
            tag = struct.unpack(f"{endian}H", entry[0:2])[0]
            type_id = struct.unpack(f"{endian}H", entry[2:4])[0]
            if version == 42:
                count = struct.unpack(f"{endian}L", entry[4:8])[0]
                value_or_offset = entry[8:12]
            else:
                count = struct.unpack(f"{endian}Q", entry[4:12])[0]
                value_or_offset = entry[12:20]
            tags[tag] = await self._parse_tiff_tag_value(
                url,
                tag,
                type_id,
                int(count),
                value_or_offset,
                endian,
                offset_size=offset_size,
            )

        # Required dimensions. Fail with a clear message instead of
        # the cryptic "'NoneType' object is not subscriptable" that
        # tags.get(...)[0] would raise when a tag is absent.
        width_values = tags.get(TAG_IMAGE_WIDTH)
        height_values = tags.get(TAG_IMAGE_LENGTH)
        if not width_values or not height_values:
            raise ValueError(
                "Missing required TIFF tags: ImageWidth/ImageLength"
            )
        image_width = width_values[0]
        image_height = height_values[0]
        # Untiled TIFFs have no TileWidth/TileLength; default to the
        # full image (the tiled-layout check below still rejects them).
        tile_width = tags.get(TAG_TILE_WIDTH, [image_width])[0]
        tile_height = tags.get(TAG_TILE_LENGTH, [image_height])[0]
        compression = tags.get(TAG_COMPRESSION, (1,))[0]  # 1 = none
        predictor = tags.get(TAG_PREDICTOR, (1,))[0]  # 1 = none

        # Data type from (SampleFormat, BitsPerSample). NOTE(review):
        # only the first sample's bit depth is inspected — assumes all
        # bands share one dtype, which holds for typical COGs.
        sample_format = tags.get(TAG_SAMPLE_FORMAT, (1,))[0]
        bits_per_sample = tags.get(TAG_BITS_PER_SAMPLE, (8,))[0]
        dtype_key = (sample_format, bits_per_sample)
        dtype = self.dtype_map.get(dtype_key)
        if dtype is None:
            raise NotImplementedError(
                "Unsupported TIFF dtype: "
                f"SampleFormat={sample_format}, BitsPerSample={bits_per_sample}"
            )

        # Band/sample layout.
        samples_per_pixel = tags.get(TAG_SAMPLES_PER_PIXEL, (1,))[0]
        planar_configuration = tags.get(TAG_PLANAR_CONFIGURATION, (1,))[0]
        photometric = tags.get(TAG_PHOTOMETRIC, (None,))[0]
        extra_samples = tags.get(TAG_EXTRA_SAMPLES)

        # GDAL nodata (ASCII string like "-128" or "nan").
        nodata = None
        raw_nodata = tags.get(TAG_GDAL_NODATA)
        if raw_nodata is not None:
            nodata_str = (
                raw_nodata[0] if isinstance(raw_nodata, tuple) else raw_nodata
            )
            if isinstance(nodata_str, (bytes, bytearray)):
                try:
                    nodata_str = nodata_str.decode("ascii", errors="ignore")
                except (UnicodeDecodeError, AttributeError):
                    nodata_str = ""
            if isinstance(nodata_str, str) and nodata_str:
                nodata = _parse_nodata(nodata_str)
        else:
            # Some GeoTIFFs store nodata in GDALMetadata XML
            # instead of the dedicated GDAL_NODATA tag.
            raw_xml = tags.get(TAG_GDAL_METADATA)
            xml_str = raw_xml[0] if isinstance(raw_xml, tuple) else raw_xml
            if isinstance(xml_str, (bytes, bytearray)):
                try:
                    xml_str = xml_str.decode("utf-8", errors="ignore")
                except (UnicodeDecodeError, AttributeError):
                    xml_str = ""
            if isinstance(xml_str, str) and xml_str.strip():
                try:
                    import xml.etree.ElementTree as ET

                    root = ET.fromstring(xml_str)
                    for item in root.findall(".//Item"):
                        name = item.attrib.get("name", "")
                        if name in {"NODATA_VALUES", "NODATA_VALUE"}:
                            text = (item.text or "").strip()
                            if text:
                                # NODATA_VALUES may contain multiple values; take the first.
                                nodata = _parse_nodata(text.split()[0])
                            break
                except ET.ParseError:
                    # Best-effort: ignore malformed GDALMetadata XML.
                    logger.debug(
                        "Malformed GDALMetadata XML, skipping nodata extraction"
                    )

        # Tile layout (only present in tiled TIFFs/COGs).
        tile_offsets = list(tags.get(TAG_TILE_OFFSETS, []))
        tile_byte_counts = list(tags.get(TAG_TILE_BYTE_COUNTS, []))
        if not tile_offsets or not tile_byte_counts:
            raise NotImplementedError(
                "Rasteret requires a tiled GeoTIFF/COG "
                "(no TileOffsets/TileByteCounts)."
            )
        if compression == COMPRESSION_JPEG:
            raise NotImplementedError(
                "TIFF JPEG compression is not supported yet. "
                "Some TIFFs also use shared JPEGTables (tag 347), which requires "
                "concatenating the tables with each tile stream during decode."
            )

        # Georeferencing: either a full ModelTransformationTag matrix,
        # or the ModelPixelScaleTag + ModelTiepointTag pair.
        pixel_scale = tags.get(TAG_MODEL_PIXEL_SCALE)
        tiepoint = tags.get(TAG_MODEL_TIEPOINT)
        model_transform = tags.get(TAG_MODEL_TRANSFORM)

        transform = None
        if model_transform:
            # GeoTIFF ModelTransformationTag is a 4x4 matrix mapping raster
            # coordinates (col, row, z, 1) to model space (x, y, z, 1).
            #
            # Rasteret supports axis-aligned transforms only (north-up or
            # south-up, no rotation/shear). For those cases:
            #   x = m00 * col + m03
            #   y = m11 * row + m13
            try:
                values = list(model_transform)
                if len(values) != 16:
                    raise ValueError(
                        f"ModelTransformationTag must have 16 values, got {len(values)}"
                    )
                m00, m01, m02, m03 = values[0:4]
                m10, m11, m12, m13 = values[4:8]
                m20, m21, m22, m23 = values[8:12]
                m30, m31, m32, m33 = values[12:16]
                # Any non-zero off-diagonal term means rotation/shear.
                rotated = any(
                    abs(float(v)) > 1e-12
                    for v in (m01, m02, m10, m12, m20, m21, m23, m30, m31, m32)
                )
                if rotated or abs(float(m33) - 1.0) > 1e-12:
                    raise ValueError("rotated/sheared ModelTransformationTag")
                transform = (
                    float(m00),
                    float(m03),
                    float(m11),
                    float(m13),
                )
            except (TypeError, ValueError) as exc:
                raise NotImplementedError(
                    f"Unsupported ModelTransformationTag (rotation/shear): {exc}"
                ) from exc
        elif pixel_scale and tiepoint:
            # Negative y-scale: raster rows grow downward, model y grows up.
            scale_x = float(pixel_scale[0])
            scale_y = -float(pixel_scale[1])
            # ModelTiepointTag values are (I, J, K, X, Y, Z) tuples. The
            # tiepoint may refer to a raster point (I, J) that is not the
            # origin. Convert to the PixelIsArea-style affine where
            # pixel (0, 0) maps to (translate_x, translate_y).
            try:
                n_tp = len(tiepoint)
            except TypeError as exc:
                raise ValueError(
                    "Invalid ModelTiepointTag: expected a sequence of values"
                ) from exc
            if n_tp < 6 or n_tp % 6 != 0:
                raise ValueError(
                    "Invalid ModelTiepointTag: expected 6*N values "
                    f"(I, J, K, X, Y, Z), got {n_tp}"
                )
            translate_x = None
            translate_y = None
            for idx in range(0, n_tp, 6):
                i0 = float(tiepoint[idx + 0])
                j0 = float(tiepoint[idx + 1])
                x0 = float(tiepoint[idx + 3])
                y0 = float(tiepoint[idx + 4])
                tx = x0 - i0 * scale_x
                ty = y0 - j0 * scale_y
                if translate_x is None:
                    translate_x, translate_y = tx, ty
                else:
                    assert translate_y is not None
                    # Multiple tiepoints must imply the same origin for an
                    # axis-aligned PixelScale+Tiepoint transform.
                    if abs(tx - translate_x) > 1e-6 or abs(ty - translate_y) > 1e-6:
                        raise NotImplementedError(
                            "GeoTIFF ModelTiepointTag contains multiple tiepoints "
                            "that do not imply a single consistent origin. "
                            "Rasteret supports axis-aligned transforms only."
                        )
            assert translate_x is not None and translate_y is not None
            transform = (scale_x, translate_x, scale_y, translate_y)
        else:
            raise NotImplementedError(
                "Missing GeoTIFF georeferencing tags (no ModelTransformationTag "
                "and no ModelPixelScaleTag+ModelTiepointTag)."
            )

        # --- PixelIsPoint correction (GDAL RFC 33) ---
        # When GTRasterTypeGeoKey == 2, the tiepoint references the pixel
        # centre, not the upper-left corner. Shift the origin by half a
        # pixel so that CogMetadata.transform is always PixelIsArea-based.
        if transform is not None:
            raster_type = get_raster_type_from_geokeys(tags)
            if raster_type == 2:  # RasterPixelIsPoint
                scale_x, translate_x, scale_y, translate_y = transform
                translate_x -= scale_x / 2
                translate_y -= scale_y / 2
                transform = (scale_x, translate_x, scale_y, translate_y)
                logger.debug("Applied PixelIsPoint correction for %s", url)

        crs = get_crs_from_tiff_tags(tags)
        return CogMetadata(
            width=image_width,
            height=image_height,
            tile_width=tile_width,
            tile_height=tile_height,
            dtype=dtype,
            transform=transform,
            predictor=predictor,
            compression=compression,
            tile_offsets=tile_offsets,
            tile_byte_counts=tile_byte_counts,
            crs=crs,
            pixel_scale=pixel_scale,
            tiepoint=tiepoint,
            nodata=nodata,
            samples_per_pixel=samples_per_pixel,
            planar_configuration=planar_configuration,
            photometric=photometric,
            extra_samples=extra_samples,
        )
    except NotImplementedError:
        # Deliberate "unsupported feature" signals pass through unlogged.
        raise
    except (
        struct.error,
        KeyError,
        IndexError,
        TypeError,
        ValueError,
        IOError,
    ) as e:
        logger.exception("Failed to parse header for %s: %s", url, e)
        raise