Source code for exactextract.writer

import copy
import os
from typing import Mapping, Optional, Tuple

from ._exactextract import Writer as _Writer
from .feature import GDALFeature, JSONFeature, QGISFeature

__all__ = ["Writer", "JSONWriter", "PandasWriter", "QGISWriter", "GDALWriter"]


[docs] class Writer(_Writer): """Writes the results of summary operations to a desired format""" def __init__(self): super().__init__()
[docs] class JSONWriter(Writer): """ Creates GeoJSON-like features """
[docs] def __init__( self, *, array_type: str = "numpy", map_fields: Optional[Mapping[str, Tuple[str]]] = None ): """ Args: array_type: type that should be used to represent array outputs. either "numpy" (default), "list", or "set" map_fields: an optional dictionary of fields to be created by interpreting one field as keys and another as values, in the format ``{ dst_field : (src_keys, src_vals) }``. for example, the fields "values" and "frac" would be combined into a field called "frac_map" using ``map_fields = {"frac_map": ("values", "frac")}``. """ super().__init__() if array_type not in ("numpy", "list", "set"): raise ValueError("Unsupported array_type: " + array_type) self.array_type = array_type self.feature_list = [] self.map_fields = map_fields or {} self.ops = [] self.remove_temporary_fields = False
[docs] def add_operation(self, op): self.ops.append(op) if op.name.startswith("@delete@"): self.remove_temporary_fields = True
[docs] def write(self, feature): f = JSONFeature() feature.copy_to(f) for op in self.ops: if op.name not in f.feature["properties"]: f.feature["properties"][op.name] = None self._create_map_fields(f) self._convert_arrays(f) self._remove_temporary_fields(f) self.feature_list.append(f.feature)
[docs] def features(self): return self.feature_list
def _convert_arrays(self, f): props = f.feature["properties"] if self.array_type == "list": import numpy as np for k in props: if type(props[k]) is np.ndarray: props[k] = list(props[k]) elif self.array_type == "set": import numpy as np for k in props: if type(props[k]) is np.ndarray: props[k] = set(props[k]) def _fields_for_stat(self, stat): return [o.name for o in self.ops if o.stat == stat] def _create_map_fields(self, f): props = f.feature["properties"] new_fields = {} for field in self.map_fields: key_stat, val_stat = self.map_fields[field] key_fields = self._fields_for_stat(key_stat) val_fields = self._fields_for_stat(val_stat) if not val_fields: continue assert len(key_fields) == len(val_fields) for key_field, val_field in zip(key_fields, val_fields): new_field = val_field.replace(val_stat, field) assert len(props[key_field]) == len(props[val_field]) new_fields[new_field] = { k: v for k, v in zip(props[key_field], props[val_field]) } props.update(new_fields) def _remove_temporary_fields(self, f): if self.remove_temporary_fields: for field in list(f.feature["properties"]): if field.startswith("@delete@"): del f.feature["properties"][field]
[docs] class PandasWriter(Writer): """Creates a (Geo)Pandas DataFrame""" def __init__(self, *, srs_wkt=None): super().__init__() self.fields = {} self.geoms = [] self.srs_wkt = srs_wkt
[docs] def add_operation(self, op): self.fields[op.name] = []
[docs] def add_column(self, col_name): self.fields[col_name] = []
[docs] def add_geometry(self): self.fields["geometry"] = []
[docs] def write(self, feature): f = JSONFeature() feature.copy_to(f) props = f.feature["properties"] for field in self.fields: if field == "geometry" and "geometry" in f.feature: import shapely self.fields["geometry"].append( shapely.geometry.shape(f.feature["geometry"]) ) elif field == "id" and "id" in f.feature: self.fields["id"].append(f.feature["id"]) elif field in props: self.fields[field].append(props[field]) else: self.fields[field].append(None)
[docs] def features(self): if "geometry" in self.fields: import geopandas as gpd return gpd.GeoDataFrame(self.fields, geometry="geometry", crs=self.srs_wkt) else: import pandas as pd return pd.DataFrame(self.fields, copy=False)
[docs] class QGISWriter(Writer): """Creates QGSVectorLayer""" def __init__(self, *, srs_wkt=None): super().__init__() from qgis.core import QgsCoordinateReferenceSystem self.fields = [] self.feats = [] self.geom_type = "unknown" self.crs = QgsCoordinateReferenceSystem() self.crs.createFromWkt(srs_wkt) self.vector_layer = None
[docs] def add_operation(self, op): self.fields.append(op.name)
[docs] def add_column(self, col_name): self.fields.append(col_name)
[docs] def write(self, feature): if not self.vector_layer: # test every property vs fields type and set correct type tmp_feature = JSONFeature() feature.copy_to(tmp_feature) self.fields = self._set_fields_types(self.fields, tmp_feature) self.fields = self._convert_to_QgsFields(self.fields) self.geom_type = self._get_geometry_type(tmp_feature) self.vector_layer, self.data_provider = self._create_vector_layer() qgis_feature = QGISFeature(fields=self.fields) feature.copy_to(qgis_feature) self.data_provider.addFeature(qgis_feature.feature)
[docs] def features(self): self.vector_layer.updateExtents() return self.vector_layer
def _create_vector_layer(self): """ Creates a vector layer with the defined geometry type, CRS and fields. Returns the created layer as well as its data provider. Returns: Tuple[QgsVectorLayer, QgsVectorDataProvider]: initialized vector layer, data provider for created vector layer """ from qgis.core import QgsVectorLayer if self.geom_type == "unknown": raise TypeError("Unknown geometry type!") vector_layer = QgsVectorLayer( self.geom_type + "?crs=" + self.crs.authid(), "temporary_layer", "memory" ) data_provider = vector_layer.dataProvider() # add fields data_provider.addAttributes(self.fields) vector_layer.updateFields() # tell the vector layer to fetch changes from the provider return vector_layer, data_provider @staticmethod def _get_geometry_type(feature: JSONFeature) -> str: """ get geometry type of an input feature geometry Args: feature (JSONFeature): feature with geometry field Returns: str: describes geometry type (point, linestring, polygon, unknown) """ if "geometry" in feature.feature: geom_type = feature.feature["geometry"]["type"].replace("Multi", "").lower() if geom_type in {"point", "linestring", "polygon"}: return geom_type else: return "unknown" @staticmethod def _convert_to_QgsFields(fields_list: list): """ Converts a list with `QgsField` into a `QgsFields` object Args: fields_list (list): list with `QgsField` Returns: QgsFields: `QgsFields` object with the same fields as in input list """ from qgis.core import QgsFields result_fields = QgsFields() for qgs_field in fields_list: result_fields.append(qgs_field) return result_fields @staticmethod def _set_fields_types(fields_list: list[str], feature: JSONFeature) -> list: """ Sets field types based on provided data. If no type is specified it will be set as string by default with type_name set as "Unknown". Args: fields_list (list[str]): list with fields names feature (JSONFeature): feature with values that will be tested Returns: list: list with `QgsField` """ import numpy from qgis.core import QgsField from qgis.PyQt.QtCore import QVariant mod_fields_list = [] for field_name in fields_list: try: value = feature.get(field_name) except KeyError: value = None if type(value) is str: field_type = QVariant.String type_name = "String" elif type(value) is float: field_type = QVariant.Double type_name = "Double" elif type(value) is int: field_type = QVariant.Int type_name = "Int" elif type(value) is numpy.ndarray: field_type = QVariant.String type_name = "Array" else: field_type = QVariant.String type_name = "Unknown" mod_fields_list.append(QgsField(field_name, field_type, type_name)) return mod_fields_list
[docs] class GDALWriter(Writer): """Writes results using GDAL/OGR"""
[docs] def __init__( self, dataset=None, *, filename=None, driver=None, layer_name="", srs_wkt=None ): """ Args: dataset: a ``gdal.Dataset`` or ``ogr.DataSource`` to which results should be created in a new layer filename: file to write results to, if ``dataset`` is ``None`` driver: driver to use when creating ``filename`` layer_name: name of new layer to create in output dataset srs_wkt: spatial reference system to assign to output dataset. No coordinate transformation will be performed. """ super().__init__() from osgeo import gdal, ogr if dataset is not None: assert isinstance(dataset, gdal.Dataset) or isinstance( dataset, ogr.DataSource ) self.ds = dataset elif isinstance(filename, (str, os.PathLike)): from osgeo_utils.auxiliary.util import GetOutputDriverFor if driver is None: driver = GetOutputDriverFor(filename, is_raster=False) ogr_drv = ogr.GetDriverByName(driver) self.ds = ogr_drv.CreateDataSource(str(filename)) else: raise ValueError("Unhandled output type.") self.layer_name = layer_name self.prototype = {"type": "Feature", "properties": {}} self.srs_wkt = srs_wkt self.lyr = None
[docs] def add_operation(self, op): # Create a prototype feature so that field names # match the order they are specified in input # operations. self.prototype["properties"][op.name] = None
[docs] def add_column(self, col_name): self.prototype["properties"][col_name] = None
[docs] def write(self, feature): from osgeo import ogr, osr if self.lyr is None: f = JSONFeature(copy.deepcopy(self.prototype)) feature.copy_to(f) fields = self._collect_fields(f) srs = osr.SpatialReference() if self.srs_wkt: srs.ImportFromWkt(self.srs_wkt) self.lyr = self.ds.CreateLayer(self.layer_name, srs) for field_def in fields.values(): self.lyr.CreateField(field_def) ogr_feature = ogr.Feature(self.lyr.GetLayerDefn()) feature.copy_to(GDALFeature(ogr_feature)) self.lyr.CreateFeature(ogr_feature)
[docs] def features(self): return None
@staticmethod def _collect_fields(feature): import numpy as np from osgeo import ogr ogr_fields = {} for field_name in feature.fields(): if field_name not in ogr_fields: field_type = None value = feature.get(field_name) if type(value) is float: field_type = ogr.OFTReal elif type(value) is int: field_type = ogr.OFTInteger elif type(value) is np.ndarray: if value.dtype == np.int64: field_type = ogr.OFTInteger64List elif value.dtype == np.int32: field_type = ogr.OFTIntegerList elif value.dtype == np.float64: field_type = ogr.OFTRealList else: field_type = ogr.OFTString ogr_fields[field_name] = ogr.FieldDefn(field_name, field_type) return ogr_fields