Skip to content

Commit 5c6ba3f

Browse files
authored
Infer an http store for bare https urls (#575)
Follow up for #552. Also closes #492. Closes #551.
1 parent cff8093 commit 5c6ba3f

File tree

14 files changed

+387
-72
lines changed

14 files changed

+387
-72
lines changed

python/core/python/geoarrow/rust/core/_rust.pyi

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1466,8 +1466,8 @@ def read_flatgeobuf(
14661466
) -> GeoTable: ...
14671467
async def read_flatgeobuf_async(
14681468
path: str,
1469-
fs: ObjectStore,
14701469
*,
1470+
fs: Optional[ObjectStore] = None,
14711471
batch_size: int = 65536,
14721472
bbox: Tuple[float, float, float, float] | None = None,
14731473
) -> GeoTable: ...
@@ -1479,7 +1479,12 @@ def read_geojson_lines(
14791479
) -> GeoTable: ...
14801480
def read_ipc(file: Union[str, Path, BinaryIO]) -> GeoTable: ...
14811481
def read_ipc_stream(file: Union[str, Path, BinaryIO]) -> GeoTable: ...
1482-
def read_parquet(path: str, *, batch_size: int = 65536) -> GeoTable: ...
1482+
def read_parquet(
1483+
path: str, *, fs: Optional[ObjectStore] = None, batch_size: int = 65536
1484+
) -> GeoTable: ...
1485+
async def read_parquet_async(
1486+
path: str, *, fs: Optional[ObjectStore] = None, batch_size: int = 65536
1487+
) -> GeoTable: ...
14831488
def read_postgis(connection_url: str, sql: str) -> Optional[GeoTable]: ...
14841489
async def read_postgis_async(connection_url: str, sql: str) -> Optional[GeoTable]: ...
14851490
def read_pyogrio(

python/core/src/error.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,19 @@ use pyo3::prelude::*;
44
/// Crate-level error type for the Python bindings.
///
/// Wraps every error that can surface while reading/writing geo data so each
/// can be converted into a Python exception (see `From<PyGeoArrowError> for PyErr`).
pub enum PyGeoArrowError {
    // Errors bubbled up from the core `geoarrow` crate.
    GeoArrowError(geoarrow::error::GeoArrowError),
    // An already-constructed Python exception; passed through unchanged.
    PyErr(PyErr),
    // Errors from the `object_store` crate (remote/local storage access).
    ObjectStoreError(object_store::Error),
    // Errors parsing an `object_store` path.
    ObjectStorePathError(object_store::path::Error),
    // Errors parsing a URL string (from the `url` crate).
    UrlParseError(url::ParseError),
}
811

912
impl From<PyGeoArrowError> for PyErr {
1013
fn from(error: PyGeoArrowError) -> Self {
1114
match error {
1215
PyGeoArrowError::GeoArrowError(err) => PyException::new_err(err.to_string()),
1316
PyGeoArrowError::PyErr(err) => err,
17+
PyGeoArrowError::ObjectStoreError(err) => PyException::new_err(err.to_string()),
18+
PyGeoArrowError::ObjectStorePathError(err) => PyException::new_err(err.to_string()),
19+
PyGeoArrowError::UrlParseError(err) => PyException::new_err(err.to_string()),
1420
}
1521
}
1622
}
@@ -21,6 +27,24 @@ impl From<geoarrow::error::GeoArrowError> for PyGeoArrowError {
2127
}
2228
}
2329

30+
impl From<object_store::Error> for PyGeoArrowError {
31+
fn from(other: object_store::Error) -> Self {
32+
Self::ObjectStoreError(other)
33+
}
34+
}
35+
36+
impl From<object_store::path::Error> for PyGeoArrowError {
37+
fn from(other: object_store::path::Error) -> Self {
38+
Self::ObjectStorePathError(other)
39+
}
40+
}
41+
42+
impl From<url::ParseError> for PyGeoArrowError {
43+
fn from(other: url::ParseError) -> Self {
44+
Self::UrlParseError(other)
45+
}
46+
}
47+
2448
impl From<PyTypeError> for PyGeoArrowError {
2549
fn from(other: PyTypeError) -> Self {
2650
Self::PyErr((&other).into())

python/core/src/io/csv.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::error::PyGeoArrowResult;
2-
use crate::io::file::{BinaryFileReader, BinaryFileWriter};
2+
use crate::io::input::sync::{BinaryFileReader, BinaryFileWriter};
33
use crate::table::GeoTable;
44
use geoarrow::io::csv::read_csv as _read_csv;
55
use geoarrow::io::csv::write_csv as _write_csv;

python/core/src/io/flatgeobuf.rs

Lines changed: 94 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,56 @@
11
use crate::error::{PyGeoArrowError, PyGeoArrowResult};
2-
use crate::io::file::{BinaryFileReader, BinaryFileWriter};
2+
use crate::io::input::sync::BinaryFileWriter;
3+
use crate::io::input::{construct_reader, FileReader};
34
use crate::io::object_store::PyObjectStore;
45
use crate::table::GeoTable;
56
use flatgeobuf::FgbWriterOptions;
67
use geoarrow::io::flatgeobuf::read_flatgeobuf_async as _read_flatgeobuf_async;
78
use geoarrow::io::flatgeobuf::write_flatgeobuf_with_options as _write_flatgeobuf;
89
use geoarrow::io::flatgeobuf::{read_flatgeobuf as _read_flatgeobuf, FlatGeobufReaderOptions};
10+
use pyo3::exceptions::PyValueError;
911
use pyo3::prelude::*;
1012

11-
/// Read a FlatGeobuf file from a path on disk into a GeoTable.
13+
/// Read a FlatGeobuf file from a path on disk or a remote location into a GeoTable.
1214
///
1315
/// Example:
1416
///
15-
/// Reading a remote file on an S3 bucket.
17+
/// Reading from a local path:
1618
///
1719
/// ```py
18-
/// from geoarrow.rust.core import ObjectStore, read_flatgeobuf_async
20+
/// from geoarrow.rust.core import read_flatgeobuf
21+
/// table = read_flatgeobuf("path/to/file.fgb")
22+
/// ```
23+
///
24+
/// Reading from a Python file object:
25+
///
26+
/// ```py
27+
/// from geoarrow.rust.core import read_flatgeobuf
28+
///
29+
/// with open("path/to/file.fgb", "rb") as file:
30+
/// table = read_flatgeobuf(file)
31+
/// ```
32+
///
33+
/// Reading from an HTTP(S) url:
34+
///
35+
/// ```py
36+
/// from geoarrow.rust.core import read_flatgeobuf
37+
///
38+
/// url = "http://flatgeobuf.org/test/data/UScounties.fgb"
39+
/// table = read_flatgeobuf(url)
40+
/// ```
41+
///
42+
/// Reading from a remote file on an S3 bucket.
43+
///
44+
/// ```py
45+
/// from geoarrow.rust.core import ObjectStore, read_flatgeobuf
1946
///
2047
/// options = {
2148
/// "aws_access_key_id": "...",
2249
/// "aws_secret_access_key": "...",
2350
/// "aws_region": "..."
2451
/// }
2552
/// fs = ObjectStore('s3://bucket', options=options)
26-
/// table = read_flatgeobuf("path/in/bucket.fgb", fs)
53+
/// table = read_flatgeobuf("path/in/bucket.fgb", fs=fs)
2754
/// ```
2855
///
2956
/// Args:
@@ -47,80 +74,102 @@ pub fn read_flatgeobuf(
4774
batch_size: usize,
4875
bbox: Option<(f64, f64, f64, f64)>,
4976
) -> PyGeoArrowResult<GeoTable> {
50-
if let Some(fs) = fs {
51-
fs.rt.block_on(async move {
77+
let reader = construct_reader(py, file, fs)?;
78+
match reader {
79+
FileReader::Async(async_reader) => async_reader.runtime.block_on(async move {
5280
let options = FlatGeobufReaderOptions {
5381
batch_size: Some(batch_size),
5482
bbox,
5583
..Default::default()
5684
};
57-
let path = file.extract::<String>(py)?;
58-
let table = _read_flatgeobuf_async(fs.inner, path.into(), options)
85+
let table = _read_flatgeobuf_async(async_reader.store, async_reader.path, options)
5986
.await
6087
.map_err(PyGeoArrowError::GeoArrowError)?;
6188

6289
Ok(GeoTable(table))
63-
})
64-
} else {
65-
let mut reader = file.extract::<BinaryFileReader>(py)?;
66-
let options = FlatGeobufReaderOptions {
67-
batch_size: Some(batch_size),
68-
bbox,
69-
..Default::default()
70-
};
71-
let table = _read_flatgeobuf(&mut reader, options)?;
72-
Ok(GeoTable(table))
90+
}),
91+
FileReader::Sync(mut sync_reader) => {
92+
let options = FlatGeobufReaderOptions {
93+
batch_size: Some(batch_size),
94+
bbox,
95+
..Default::default()
96+
};
97+
let table = _read_flatgeobuf(&mut sync_reader, options)?;
98+
Ok(GeoTable(table))
99+
}
73100
}
74101
}
75102

76103
/// Read a FlatGeobuf file from a url into a GeoTable.
77104
///
78105
/// Example:
79106
///
80-
/// ```py
81-
/// from geoarrow.rust.core import ObjectStore, read_flatgeobuf_async
107+
/// Reading from an HTTP(S) url:
108+
///
109+
/// ```py
110+
/// from geoarrow.rust.core import read_flatgeobuf_async
111+
///
112+
/// url = "http://flatgeobuf.org/test/data/UScounties.fgb"
113+
/// table = await read_flatgeobuf_async(url)
114+
/// ```
82115
///
83-
/// options = {
84-
/// "aws_access_key_id": "...",
85-
/// "aws_secret_access_key": "...",
86-
/// }
87-
/// fs = ObjectStore('s3://bucket', options=options)
88-
/// table = await read_flatgeobuf_async("path/in/bucket.fgb", fs)
89-
/// ```
116+
/// Reading from an S3 bucket:
117+
///
118+
/// ```py
119+
/// from geoarrow.rust.core import ObjectStore, read_flatgeobuf_async
120+
///
121+
/// options = {
122+
/// "aws_access_key_id": "...",
123+
/// "aws_secret_access_key": "...",
124+
/// "aws_region": "..."
125+
/// }
126+
/// fs = ObjectStore('s3://bucket', options=options)
127+
/// table = await read_flatgeobuf_async("path/in/bucket.fgb", fs=fs)
128+
/// ```
90129
///
91130
/// Args:
92-
/// url: the url to a remote FlatGeobuf file
93-
/// fs: an ObjectStore instance for this url.
131+
/// path: the url or relative path to a remote FlatGeobuf file. If an argument is passed for
132+
/// `fs`, this should be a path fragment relative to the root passed to the `ObjectStore`
133+
/// constructor.
94134
///
95135
/// Other args:
136+
/// fs: an ObjectStore instance for this url. This is required for non-HTTP urls.
96137
/// batch_size: the number of rows to include in each internal batch of the table.
97138
/// bbox: A spatial filter for reading rows, of the format (minx, miny, maxx, maxy). If set to
98139
/// `None`, no spatial filtering will be performed.
99140
///
100141
/// Returns:
101142
/// Table from FlatGeobuf file.
102143
#[pyfunction]
103-
#[pyo3(signature = (path, fs, *, batch_size=65536, bbox=None))]
144+
#[pyo3(signature = (path, *, fs=None, batch_size=65536, bbox=None))]
104145
pub fn read_flatgeobuf_async(
105146
py: Python,
106-
path: String,
107-
fs: PyObjectStore,
147+
path: PyObject,
148+
fs: Option<PyObjectStore>,
108149
batch_size: usize,
109150
bbox: Option<(f64, f64, f64, f64)>,
110151
) -> PyGeoArrowResult<PyObject> {
111-
let fut = pyo3_asyncio::tokio::future_into_py(py, async move {
112-
let options = FlatGeobufReaderOptions {
113-
batch_size: Some(batch_size),
114-
bbox,
115-
..Default::default()
116-
};
117-
let table = _read_flatgeobuf_async(fs.inner, path.into(), options)
118-
.await
119-
.map_err(PyGeoArrowError::GeoArrowError)?;
152+
let reader = construct_reader(py, path, fs)?;
153+
match reader {
154+
FileReader::Async(async_reader) => {
155+
let fut = pyo3_asyncio::tokio::future_into_py(py, async move {
156+
let options = FlatGeobufReaderOptions {
157+
batch_size: Some(batch_size),
158+
bbox,
159+
..Default::default()
160+
};
161+
let table = _read_flatgeobuf_async(async_reader.store, async_reader.path, options)
162+
.await
163+
.map_err(PyGeoArrowError::GeoArrowError)?;
120164

121-
Ok(GeoTable(table))
122-
})?;
123-
Ok(fut.into())
165+
Ok(GeoTable(table))
166+
})?;
167+
Ok(fut.into())
168+
}
169+
FileReader::Sync(_) => {
170+
Err(PyValueError::new_err("Local file paths not supported in async reader.").into())
171+
}
172+
}
124173
}
125174

126175
/// Write a GeoTable to a FlatGeobuf file on disk.

python/core/src/io/geojson.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::error::PyGeoArrowResult;
2-
use crate::io::file::{BinaryFileReader, BinaryFileWriter};
2+
use crate::io::input::sync::{BinaryFileReader, BinaryFileWriter};
33
use crate::table::GeoTable;
44
use geoarrow::io::geojson::read_geojson as _read_geojson;
55
use geoarrow::io::geojson::write_geojson as _write_geojson;

python/core/src/io/geojson_lines.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::error::PyGeoArrowResult;
2-
use crate::io::file::{BinaryFileReader, BinaryFileWriter};
2+
use crate::io::input::sync::{BinaryFileReader, BinaryFileWriter};
33
use crate::table::GeoTable;
44
use geoarrow::io::geojson_lines::read_geojson_lines as _read_geojson_lines;
55
use geoarrow::io::geojson_lines::write_geojson_lines as _write_geojson_lines;

python/core/src/io/input/mod.rs

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
pub mod sync;
2+
3+
use std::sync::Arc;
4+
5+
use crate::error::PyGeoArrowResult;
6+
use crate::io::object_store::PyObjectStore;
7+
use object_store::http::HttpBuilder;
8+
use object_store::path::Path;
9+
use object_store::{ClientOptions, ObjectStore};
10+
use pyo3::exceptions::PyValueError;
11+
use sync::BinaryFileReader;
12+
13+
use pyo3::prelude::*;
14+
use tokio::runtime::Runtime;
15+
use url::Url;
16+
17+
/// Everything needed to read a file asynchronously through `object_store`.
pub struct AsyncFileReader {
    /// The object store instance (e.g. HTTP, S3) that serves the bytes.
    pub store: Arc<dyn ObjectStore>,
    /// Location of the file within the store (path fragment; no scheme/host).
    pub path: Path,
    /// Tokio runtime used to drive the async reads from sync entry points.
    pub runtime: Arc<Runtime>,
}
22+
23+
/// A user-provided input resolved to either a synchronous reader (local file
/// or Python file object) or an asynchronous object-store reader.
pub enum FileReader {
    /// Synchronous access via a local path or Python binary file object.
    Sync(BinaryFileReader),
    /// Asynchronous access via an `object_store` instance.
    Async(AsyncFileReader),
}
27+
28+
/// Construct a reader for the user that can be either synchronous or asynchronous
29+
///
30+
/// If the user has not passed in an object store instance but the `file` points to a http address,
31+
/// an HTTPStore will be created for it.
32+
pub fn construct_reader(
33+
py: Python,
34+
file: PyObject,
35+
fs: Option<PyObjectStore>,
36+
) -> PyGeoArrowResult<FileReader> {
37+
// If the user passed an object store instance, use that
38+
if let Some(fs) = fs {
39+
let path = file.extract::<String>(py)?;
40+
let async_reader = AsyncFileReader {
41+
store: fs.inner,
42+
runtime: fs.rt,
43+
path: path.into(),
44+
};
45+
Ok(FileReader::Async(async_reader))
46+
} else {
47+
// If the user's path is a "known" URL (i.e. http(s)) then construct an object store
48+
// instance for them.
49+
if let Ok(path_or_url) = file.extract::<String>(py) {
50+
if path_or_url.starts_with("http") {
51+
let url = Url::parse(&path_or_url)?;
52+
// Expecting that the url input is something like
53+
let store_input = format!("{}://{}", url.scheme(), url.domain().unwrap());
54+
55+
let options = ClientOptions::new().with_allow_http(true);
56+
let store = HttpBuilder::new()
57+
.with_url(store_input)
58+
.with_client_options(options)
59+
.build()?;
60+
let path = url.path().trim_start_matches('/');
61+
62+
let runtime = Arc::new(
63+
tokio::runtime::Runtime::new()
64+
.map_err(|err| PyValueError::new_err(err.to_string()))?,
65+
);
66+
let async_reader = AsyncFileReader {
67+
store: Arc::new(store),
68+
runtime,
69+
path: path.into(),
70+
};
71+
return Ok(FileReader::Async(async_reader));
72+
}
73+
}
74+
75+
Ok(FileReader::Sync(file.extract(py)?))
76+
}
77+
}

0 commit comments

Comments
 (0)