Joshua Sundance Bailey
commited on
Commit
·
e9d1bce
1
Parent(s):
9d62f11
not working
Browse files
geospatial-data-converter/kml_tricks.py
CHANGED
@@ -1,19 +1,24 @@
|
|
1 |
import zipfile
|
|
|
2 |
from typing import Any
|
3 |
|
4 |
import bs4
|
5 |
-
import fiona
|
6 |
import geopandas as gpd
|
|
|
7 |
import pandas as pd
|
8 |
|
9 |
-
fiona.drvsupport.supported_drivers["KML"] = "rw"
|
10 |
-
|
11 |
|
12 |
def parse_description_to_gdf(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
|
13 |
-
|
14 |
-
|
15 |
-
|
16 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
17 |
|
18 |
for df in parsed_dataframes:
|
19 |
df.columns = df.iloc[0]
|
@@ -34,12 +39,18 @@ def read_kml_file(path: str) -> Any:
|
|
34 |
"KMZ contains more than one KML. Extract or convert to multiple KMLs.",
|
35 |
)
|
36 |
|
37 |
-
return gpd.read_file(
|
|
|
|
|
|
|
|
|
38 |
|
39 |
|
40 |
def parse_file_to_gdf(path: str) -> gpd.GeoDataFrame:
|
41 |
if path.endswith(".kml"):
|
42 |
-
return parse_description_to_gdf(
|
|
|
|
|
43 |
|
44 |
if path.endswith(".kmz"):
|
45 |
return parse_description_to_gdf(read_kml_file(path))
|
@@ -48,7 +59,7 @@ def parse_file_to_gdf(path: str) -> gpd.GeoDataFrame:
|
|
48 |
|
49 |
|
50 |
def extract_data_from_kml_code(kml_code: str) -> pd.DataFrame:
|
51 |
-
soup = bs4.BeautifulSoup(kml_code, "
|
52 |
rows = soup.find_all("schemadata")
|
53 |
|
54 |
data = (
|
@@ -61,7 +72,6 @@ def extract_data_from_kml_code(kml_code: str) -> pd.DataFrame:
|
|
61 |
|
62 |
def extract_kml_from_file(file_path: str) -> str:
|
63 |
file_extension = file_path.lower().split(".")[-1]
|
64 |
-
kml_files = None
|
65 |
|
66 |
if file_extension == "kml":
|
67 |
with open(file_path, "r") as kml:
|
@@ -70,28 +80,28 @@ def extract_kml_from_file(file_path: str) -> str:
|
|
70 |
if file_extension == "kmz":
|
71 |
with zipfile.ZipFile(file_path) as kmz:
|
72 |
kml_files = [f for f in kmz.namelist() if f.lower().endswith(".kml")]
|
73 |
-
|
74 |
-
|
75 |
-
|
76 |
-
|
77 |
-
|
78 |
-
|
79 |
|
80 |
raise ValueError("File path must end with .kml or .kmz")
|
81 |
|
82 |
|
83 |
-
def extract_data_from_file(file_path: str) ->
|
84 |
df = extract_data_from_kml_code(extract_kml_from_file(file_path))
|
85 |
|
86 |
if file_path.endswith(".kmz"):
|
87 |
file_gdf = read_kml_file(file_path)
|
88 |
else:
|
89 |
-
file_gdf = gpd.read_file(file_path, driver="KML")
|
90 |
|
91 |
return gpd.GeoDataFrame(df, geometry=file_gdf["geometry"], crs=file_gdf.crs)
|
92 |
|
93 |
|
94 |
-
def read_ge_file(file_path: str) ->
|
95 |
try:
|
96 |
return parse_file_to_gdf(file_path)
|
97 |
except (pd.errors.ParserError, ValueError):
|
|
|
1 |
import zipfile
|
2 |
+
from io import StringIO
|
3 |
from typing import Any
|
4 |
|
5 |
import bs4
|
|
|
6 |
import geopandas as gpd
|
7 |
+
import lxml # nosec
|
8 |
import pandas as pd
|
9 |
|
|
|
|
|
10 |
|
11 |
def parse_description_to_gdf(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
|
12 |
+
def _gen():
|
13 |
+
for desc in gdf["Description"]:
|
14 |
+
try:
|
15 |
+
html_df = pd.read_html(StringIO(desc), flavor="lxml")
|
16 |
+
yield html_df[-1].T
|
17 |
+
except (lxml.etree.ParserError, lxml.etree.XMLSyntaxError) as e:
|
18 |
+
print(desc)
|
19 |
+
raise pd.errors.ParserError from e
|
20 |
+
|
21 |
+
parsed_dataframes = list(_gen())
|
22 |
|
23 |
for df in parsed_dataframes:
|
24 |
df.columns = df.iloc[0]
|
|
|
39 |
"KMZ contains more than one KML. Extract or convert to multiple KMLs.",
|
40 |
)
|
41 |
|
42 |
+
return gpd.read_file(
|
43 |
+
f"zip://{path}\\{kml_files[0]}",
|
44 |
+
driver="KML",
|
45 |
+
engine="pyogrio",
|
46 |
+
)
|
47 |
|
48 |
|
49 |
def parse_file_to_gdf(path: str) -> gpd.GeoDataFrame:
|
50 |
if path.endswith(".kml"):
|
51 |
+
return parse_description_to_gdf(
|
52 |
+
gpd.read_file(path, driver="KML", engine="pyogrio"),
|
53 |
+
)
|
54 |
|
55 |
if path.endswith(".kmz"):
|
56 |
return parse_description_to_gdf(read_kml_file(path))
|
|
|
59 |
|
60 |
|
61 |
def extract_data_from_kml_code(kml_code: str) -> pd.DataFrame:
|
62 |
+
soup = bs4.BeautifulSoup(kml_code, features="xml")
|
63 |
rows = soup.find_all("schemadata")
|
64 |
|
65 |
data = (
|
|
|
72 |
|
73 |
def extract_kml_from_file(file_path: str) -> str:
|
74 |
file_extension = file_path.lower().split(".")[-1]
|
|
|
75 |
|
76 |
if file_extension == "kml":
|
77 |
with open(file_path, "r") as kml:
|
|
|
80 |
if file_extension == "kmz":
|
81 |
with zipfile.ZipFile(file_path) as kmz:
|
82 |
kml_files = [f for f in kmz.namelist() if f.lower().endswith(".kml")]
|
83 |
+
if len(kml_files) != 1:
|
84 |
+
raise IndexError(
|
85 |
+
"KMZ contains more than one KML. Extract or convert to multiple KMLs.",
|
86 |
+
)
|
87 |
+
with kmz.open(kml_files[0]) as kml:
|
88 |
+
return kml.read().decode()
|
89 |
|
90 |
raise ValueError("File path must end with .kml or .kmz")
|
91 |
|
92 |
|
93 |
+
def extract_data_from_file(file_path: str) -> gpd.GeoDataFrame:
|
94 |
df = extract_data_from_kml_code(extract_kml_from_file(file_path))
|
95 |
|
96 |
if file_path.endswith(".kmz"):
|
97 |
file_gdf = read_kml_file(file_path)
|
98 |
else:
|
99 |
+
file_gdf = gpd.read_file(file_path, driver="KML", engine="pyogrio")
|
100 |
|
101 |
return gpd.GeoDataFrame(df, geometry=file_gdf["geometry"], crs=file_gdf.crs)
|
102 |
|
103 |
|
104 |
+
def read_ge_file(file_path: str) -> gpd.GeoDataFrame:
|
105 |
try:
|
106 |
return parse_file_to_gdf(file_path)
|
107 |
except (pd.errors.ParserError, ValueError):
|
geospatial-data-converter/utils.py
CHANGED
@@ -6,6 +6,8 @@ from typing import BinaryIO
|
|
6 |
|
7 |
import geopandas as gpd
|
8 |
|
|
|
|
|
9 |
output_format_dict = {
|
10 |
"ESRI Shapefile": ("shp", "zip", "application/zip"), # must be zipped
|
11 |
"OpenFileGDB": ("gdb", "zip", "application/zip"), # must be zipped
|
@@ -17,7 +19,9 @@ output_format_dict = {
|
|
17 |
|
18 |
def read_file(file: BinaryIO, *args, **kwargs) -> gpd.GeoDataFrame:
|
19 |
"""Read a file and return a GeoDataFrame"""
|
20 |
-
|
|
|
|
|
21 |
with TemporaryDirectory() as tmp_dir:
|
22 |
tmp_file_path = os.path.join(tmp_dir, file.name)
|
23 |
with open(tmp_file_path, "wb") as tmp_file:
|
@@ -28,6 +32,12 @@ def read_file(file: BinaryIO, *args, **kwargs) -> gpd.GeoDataFrame:
|
|
28 |
engine="pyogrio",
|
29 |
**kwargs,
|
30 |
)
|
|
|
|
|
|
|
|
|
|
|
|
|
31 |
return gpd.read_file(file, *args, engine="pyogrio", **kwargs)
|
32 |
|
33 |
|
|
|
6 |
|
7 |
import geopandas as gpd
|
8 |
|
9 |
+
from kml_tricks import read_ge_file
|
10 |
+
|
11 |
output_format_dict = {
|
12 |
"ESRI Shapefile": ("shp", "zip", "application/zip"), # must be zipped
|
13 |
"OpenFileGDB": ("gdb", "zip", "application/zip"), # must be zipped
|
|
|
19 |
|
20 |
def read_file(file: BinaryIO, *args, **kwargs) -> gpd.GeoDataFrame:
|
21 |
"""Read a file and return a GeoDataFrame"""
|
22 |
+
basename, ext = os.path.splitext(os.path.basename(file.name))
|
23 |
+
ext = ext.lower().strip(".")
|
24 |
+
if ext == "zip":
|
25 |
with TemporaryDirectory() as tmp_dir:
|
26 |
tmp_file_path = os.path.join(tmp_dir, file.name)
|
27 |
with open(tmp_file_path, "wb") as tmp_file:
|
|
|
32 |
engine="pyogrio",
|
33 |
**kwargs,
|
34 |
)
|
35 |
+
elif ext in ("kml", "kmz"):
|
36 |
+
with TemporaryDirectory() as tmp_dir:
|
37 |
+
tmp_file_path = os.path.join(tmp_dir, file.name)
|
38 |
+
with open(tmp_file_path, "wb") as tmp_file:
|
39 |
+
tmp_file.write(file.read())
|
40 |
+
return read_ge_file(tmp_file_path)
|
41 |
return gpd.read_file(file, *args, engine="pyogrio", **kwargs)
|
42 |
|
43 |
|