Skip to content

Commit 0162ff9

Browse files
authored
Add optional Arrow deserialization support (#86)
1 parent c42377c commit 0162ff9

File tree

7 files changed

+64
-3
lines changed

7 files changed

+64
-3
lines changed

docs/guide/configuration.asciidoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ When using the `ignore_status` parameter the error response will be returned ser
242242
[[serializer]]
243243
=== Serializers
244244

245-
Serializers transform bytes on the wire into native Python objects and vice-versa. By default the client ships with serializers for `application/json`, `application/x-ndjson`, `text/*`, and `application/mapbox-vector-tile`.
245+
Serializers transform bytes on the wire into native Python objects and vice-versa. By default the client ships with serializers for `application/json`, `application/x-ndjson`, `text/*`, `application/vnd.apache.arrow.stream` and `application/mapbox-vector-tile`.
246246

247247
You can define custom serializers via the `serializers` parameter:
248248

elasticsearch_serverless/serializer.py

+34
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,13 @@
4848
except ImportError:
4949
_OrjsonSerializer = None # type: ignore[assignment,misc]
5050

51+
try:
52+
import pyarrow as pa
53+
54+
__all__.append("PyArrowSerializer")
55+
except ImportError:
56+
pa = None
57+
5158

5259
class JsonSerializer(_JsonSerializer):
5360
mimetype: ClassVar[str] = "application/json"
@@ -114,6 +121,29 @@ def dumps(self, data: bytes) -> bytes:
114121
raise SerializationError(f"Cannot serialize {data!r} into a MapBox vector tile")
115122

116123

124+
if pa is not None:
125+
126+
class PyArrowSerializer(Serializer):
127+
"""PyArrow serializer for deserializing Arrow Stream data."""
128+
129+
mimetype: ClassVar[str] = "application/vnd.apache.arrow.stream"
130+
131+
def loads(self, data: bytes) -> pa.Table:
132+
try:
133+
with pa.ipc.open_stream(data) as reader:
134+
return reader.read_all()
135+
except pa.ArrowException as e:
136+
raise SerializationError(
137+
message=f"Unable to deserialize as Arrow stream: {data!r}",
138+
errors=(e,),
139+
)
140+
141+
def dumps(self, data: Any) -> bytes:
142+
raise SerializationError(
143+
message="Elasticsearch does not accept Arrow input data"
144+
)
145+
146+
117147
DEFAULT_SERIALIZERS: Dict[str, Serializer] = {
118148
JsonSerializer.mimetype: JsonSerializer(),
119149
MapboxVectorTileSerializer.mimetype: MapboxVectorTileSerializer(),
@@ -122,6 +152,10 @@ def dumps(self, data: bytes) -> bytes:
122152
CompatibilityModeNdjsonSerializer.mimetype: CompatibilityModeNdjsonSerializer(),
123153
}
124154

155+
if pa is not None:
156+
DEFAULT_SERIALIZERS[PyArrowSerializer.mimetype] = PyArrowSerializer()
157+
158+
125159
# Alias for backwards compatibility
126160
JSONSerializer = JsonSerializer
127161

noxfile.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,7 @@ def lint(session):
8585
session.run("flake8", *SOURCE_FILES)
8686
session.run("python", "utils/license-headers.py", "check", *SOURCE_FILES)
8787

88-
# Workaround to make '-r' to still work despite uninstalling aiohttp below.
89-
session.install(".[async,requests,orjson]", env=INSTALL_ENV)
88+
session.install(".[async,requests,orjson,pyarrow]", env=INSTALL_ENV)
9089

9190
# Run mypy on the package and then the type examples separately for
9291
# the two different mypy use-cases, ourselves and our users.

pyproject.toml

+2
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ dependencies = [
4747
async = ["aiohttp>=3,<4"]
4848
requests = ["requests>=2.4.0, <3.0.0" ]
4949
orjson = ["orjson>=3"]
50+
pyarrow = ["pyarrow>=1"]
5051
dev = [
5152
"requests>=2, <3",
5253
"aiohttp",
@@ -65,6 +66,7 @@ dev = [
6566
"nox",
6667
"orjson",
6768
"numpy",
69+
"pyarrow",
6870
"pandas",
6971
"mapbox-vector-tile",
7072
]

test_elasticsearch_serverless/test_client/test_deprecated_options.py

+2
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ class CustomSerializer(JsonSerializer):
7373
"application/x-ndjson",
7474
"application/json",
7575
"text/*",
76+
"application/vnd.apache.arrow.stream",
7677
"application/vnd.elasticsearch+json",
7778
"application/vnd.elasticsearch+x-ndjson",
7879
}
@@ -93,6 +94,7 @@ class CustomSerializer(JsonSerializer):
9394
"application/x-ndjson",
9495
"application/json",
9596
"text/*",
97+
"application/vnd.apache.arrow.stream",
9698
"application/vnd.elasticsearch+json",
9799
"application/vnd.elasticsearch+x-ndjson",
98100
"application/cbor",

test_elasticsearch_serverless/test_client/test_serializers.py

+3
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ class CustomSerializer:
7171
"application/json",
7272
"text/*",
7373
"application/x-ndjson",
74+
"application/vnd.apache.arrow.stream",
7475
"application/vnd.mapbox-vector-tile",
7576
"application/vnd.elasticsearch+json",
7677
"application/vnd.elasticsearch+x-ndjson",
@@ -98,6 +99,7 @@ class CustomSerializer:
9899
"application/json",
99100
"text/*",
100101
"application/x-ndjson",
102+
"application/vnd.apache.arrow.stream",
101103
"application/vnd.mapbox-vector-tile",
102104
"application/vnd.elasticsearch+json",
103105
"application/vnd.elasticsearch+x-ndjson",
@@ -117,6 +119,7 @@ class CustomSerializer:
117119
"application/json",
118120
"text/*",
119121
"application/x-ndjson",
122+
"application/vnd.apache.arrow.stream",
120123
"application/vnd.mapbox-vector-tile",
121124
"application/vnd.elasticsearch+json",
122125
"application/vnd.elasticsearch+x-ndjson",

test_elasticsearch_serverless/test_serializer.py

+21
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from datetime import datetime
2121
from decimal import Decimal
2222

23+
import pyarrow as pa
2324
import pytest
2425

2526
try:
@@ -35,6 +36,7 @@
3536
from elasticsearch_serverless.serializer import (
3637
JSONSerializer,
3738
OrjsonSerializer,
39+
PyArrowSerializer,
3840
TextSerializer,
3941
)
4042

@@ -161,6 +163,25 @@ def test_serializes_pandas_category(json_serializer):
161163
assert b'{"d":[1,2,3]}' == json_serializer.dumps({"d": cat})
162164

163165

166+
def test_pyarrow_loads():
167+
data = [
168+
pa.array([1, 2, 3, 4]),
169+
pa.array(["foo", "bar", "baz", None]),
170+
pa.array([True, None, False, True]),
171+
]
172+
batch = pa.record_batch(data, names=["f0", "f1", "f2"])
173+
sink = pa.BufferOutputStream()
174+
with pa.ipc.new_stream(sink, batch.schema) as writer:
175+
writer.write_batch(batch)
176+
177+
serializer = PyArrowSerializer()
178+
assert serializer.loads(sink.getvalue()).to_pydict() == {
179+
"f0": [1, 2, 3, 4],
180+
"f1": ["foo", "bar", "baz", None],
181+
"f2": [True, None, False, True],
182+
}
183+
184+
164185
def test_json_raises_serialization_error_on_dump_error(json_serializer):
165186
with pytest.raises(SerializationError):
166187
json_serializer.dumps(object())

0 commit comments

Comments
 (0)