Merge branch 'main' into main

This commit is contained in:
Marcelo de Oliveira Rosa Prates
2021-09-13 16:41:31 -03:00
committed by GitHub
103 changed files with 22027 additions and 53 deletions

View File

@@ -1,15 +1,34 @@
from ast import Dict
from functools import reduce
from tokenize import Number, String
from typing import Optional, Union, Tuple
from xmlrpc.client import Boolean
import osmnx as ox
from osmnx import utils_geo
import numpy as np
from shapely.geometry import Point, Polygon, MultiPolygon, MultiLineString
from shapely.geometry import Point, Polygon, MultiPolygon, LineString, MultiLineString
from shapely.ops import unary_union
from geopandas import GeoDataFrame, read_file
# Compute circular or square boundary given point, radius and crs
def get_boundary(point, radius, crs, circle=True, dilate=0):
def get_boundary(
point: Tuple, radius: float, crs: String, circle: Boolean = True, dilate: float = 0
) -> Polygon:
"""
Compute circular or square boundary given point, radius and crs.
Args:
point (Tuple): GPS coordinates
radius (Number): radius in meters
crs (String): Coordinate Reference System
circle (bool, optional): Whether to use a circular (True) or square (False) boundary. Defaults to True.
dilate (int, optional): Dilate the boundary by this much, in meters. Defaults to 0.
Returns:
Polygon: a shapely Polygon representing the boundary
"""
if circle:
return (
ox.project_gdf(GeoDataFrame(geometry=[Point(point[::-1])], crs=crs))
@@ -28,8 +47,17 @@ def get_boundary(point, radius, crs, circle=True, dilate=0):
).buffer(dilate)
# Get perimeter
def get_perimeter(query, by_osmid=False, **kwargs):
def get_perimeter(query, by_osmid: Boolean = False, **kwargs) -> GeoDataFrame:
"""
Fetch perimeter given query
Args:
query (String): Query for the perimeter to be fetched (for example, "France")
by_osmid (bool, optional): Whether to fetch perimeter by OSM Id. Defaults to False.
Returns:
GeoDataFrame: GeoDataFrame representation of the perimeter
"""
return ox.geocode_to_gdf(
query,
by_osmid=by_osmid,
@@ -38,25 +66,39 @@ def get_perimeter(query, by_osmid=False, **kwargs):
)
# Get coastline
def get_coast(perimeter = None, point = None, radius = None, perimeter_tolerance = 0, union = True, buffer = 0, circle = True, dilate = 0, file_location = None):
def get_coast(
perimeter=None,
point=None,
radius=None,
perimeter_tolerance=0,
union=True,
buffer=0,
circle=True,
dilate=0,
file_location=None,
):
if perimeter is not None:
# Boundary defined by polygon (perimeter)
bbox=perimeter.to_crs(3174)
bbox=bbox.buffer(perimeter_tolerance+dilate+buffer)
bbox=bbox.to_crs(4326)
bbox=bbox.envelope
bbox = perimeter.to_crs(3174)
bbox = bbox.buffer(perimeter_tolerance + dilate + buffer)
bbox = bbox.to_crs(4326)
bbox = bbox.envelope
# Load the polygons for the coastline from a file
geometries = read_file(file_location, bbox=bbox)
perimeter = unary_union(ox.project_gdf(perimeter).geometry)
elif (point is not None) and (radius is not None):
# Boundary defined by circle with radius 'radius' around point
north,south,west,east=utils_geo.bbox_from_point(point, dist=radius+dilate+buffer)
bbox=(west,south,east,north)
north, south, west, east = utils_geo.bbox_from_point(
point, dist=radius + dilate + buffer
)
bbox = (west, south, east, north)
# Load the polygons for the coastline from a file
geometries=read_file(file_location, bbox=bbox)
perimeter = get_boundary(point, radius, geometries.crs, circle = circle, dilate = dilate)
geometries = read_file(file_location, bbox=bbox)
perimeter = get_boundary(
point, radius, geometries.crs, circle=circle, dilate=dilate
)
# Project GDF
if len(geometries) > 0:
@@ -66,30 +108,58 @@ def get_coast(perimeter = None, point = None, radius = None, perimeter_tolerance
geometries = geometries.intersection(perimeter)
if union:
geometries = unary_union(reduce(lambda x,y: x+y, [
[x] if type(x) == Polygon else list(x)
for x in geometries if type(x) in [Polygon, MultiPolygon]
], []))
geometries = unary_union(
reduce(
lambda x, y: x + y,
[
[x] if type(x) == Polygon else list(x)
for x in geometries
if type(x) in [Polygon, MultiPolygon]
],
[],
)
)
else:
geometries = MultiPolygon(reduce(lambda x,y: x+y, [
[x] if type(x) == Polygon else list(x)
for x in geometries if type(x) in [Polygon, MultiPolygon]
], []))
geometries = MultiPolygon(
reduce(
lambda x, y: x + y,
[
[x] if type(x) == Polygon else list(x)
for x in geometries
if type(x) in [Polygon, MultiPolygon]
],
[],
)
)
return geometries
# Get geometries
def get_geometries(
perimeter=None,
point=None,
radius=None,
tags={},
perimeter_tolerance=0,
union=True,
buffer=0,
circle=True,
dilate=0,
):
perimeter: Optional[GeoDataFrame] = None,
point: Optional[Tuple] = None,
radius: Optional[float] = None,
tags: Dict = {},
perimeter_tolerance: float = 0,
union: Boolean = True,
circle: Boolean = True,
dilate: float = 0,
) -> Union[Polygon, MultiPolygon]:
"""Get geometries
Args:
perimeter (Optional[GeoDataFrame], optional): Perimeter from within geometries will be fetched. Defaults to None.
point (Optional[Tuple], optional): GPS coordinates. Defaults to None.
radius (Optional[Number], optional): Radius in meters. Defaults to None.
tags (Dict, optional): OpenStreetMap tags for the geometries to be fetched. Defaults to {}.
perimeter_tolerance (Number, optional): Tolerance in meters for fetching geometries that fall outside the perimeter. Defaults to 0.
union (Boolean, optional): Whether to compute the union of all geometries. Defaults to True.
circle (Boolean, optional): Whether to fetch geometries in a circular (True) or square (False) shape. Defaults to True.
dilate (Number, optional): Dilate the boundary by this much in meters. Defaults to 0.
Returns:
[type]: [description]
"""
if perimeter is not None:
# Boundary defined by polygon (perimeter)
@@ -120,7 +190,7 @@ def get_geometries(
geometries = geometries.intersection(perimeter).buffer(0)
if union:
geometries = unary_union(
polys = unary_union(
reduce(
lambda x, y: x + y,
[
@@ -131,6 +201,18 @@ def get_geometries(
[],
)
)
points = unary_union([
x for x in geometries
if isinstance(x, Point)
]).buffer(2)
lines = unary_union([
x for x in geometries
if isinstance(x, LineString)
]).buffer(3)
geometries = unary_union([polys, points, lines])
else:
geometries = MultiPolygon(
reduce(
@@ -147,19 +229,36 @@ def get_geometries(
return geometries
# Get streets
def get_streets(
perimeter=None,
point=None,
radius=None,
layer="streets",
width=6,
custom_filter=None,
buffer=0,
retain_all=False,
circle=True,
dilate=0,
):
perimeter: Optional[GeoDataFrame] = None,
point: Optional[Tuple] = None,
radius: Optional[float] = None,
layer: String = "streets",
width: float = 6,
custom_filter: Optional[str] = None,
buffer: float = 0,
retain_all: Boolean = False,
circle: Boolean = True,
dilate: float = 0,
) -> MultiPolygon:
"""
Get streets
Args:
perimeter (Optional[GeoDataFrame], optional): [description]. Defaults to None.
point (Optional[Tuple], optional): [description]. Defaults to None.
radius (Optional[Number], optional): [description]. Defaults to None.
layer (String, optional): [description]. Defaults to "streets".
width (Number, optional): [description]. Defaults to 6.
custom_filter (Optional[String], optional): [description]. Defaults to None.
buffer (Number, optional): [description]. Defaults to 0.
retain_all (Boolean, optional): [description]. Defaults to False.
circle (Boolean, optional): [description]. Defaults to True.
dilate (Number, optional): [description]. Defaults to 0.
Returns:
MultiPolygon: [description]
"""
if layer == "streets":
layer = "highway"
@@ -229,8 +328,18 @@ def get_streets(
return streets
# Get any layer
def get_layer(layer, **kwargs):
def get_layer(layer: String, **kwargs) -> Union[Polygon, MultiPolygon]:
"""[summary]
Args:
layer (String): [description]
Raises:
Exception: [description]
Returns:
Union[Polygon, MultiPolygon]: [description]
"""
# Fetch perimeter
if layer == "perimeter":
# If perimeter is already provided:
@@ -249,10 +358,10 @@ def get_layer(layer, **kwargs):
else:
raise Exception("Either 'perimeter' or 'point' & 'radius' must be provided")
# Fetch streets or railway
if layer in ['streets', 'railway', 'waterway']:
if layer in ["streets", "railway", "waterway"]:
return get_streets(**kwargs, layer=layer)
# Fetch Coastline
elif layer == 'coastline':
elif layer == "coastline":
return get_coast(**kwargs)
# Fetch geometries
else: