Skip to content

Commit cea4cdc

Browse files
committed
chore: gut BlobReadChannel
In order to facilitate migrating any RestorableState<ReadChannel> customers might have, we leave the existing class hierarchy in place and update BlobReadChannel.StateImpl#restore() to produce a new BlobReadChannelV2. In the next major version this compatibility path will be removed. To test compatibility a serialized instance of a BlobReadChannel from v2.16.0 has been created and serialized into blobWriteChannel.ser.properties along with a comment describing how the ser was generated.
1 parent c6fce61 commit cea4cdc

File tree

4 files changed

+156
-491
lines changed

4 files changed

+156
-491
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannel.java

+45-202
Original file line numberDiff line numberDiff line change
@@ -16,262 +16,105 @@
1616

1717
package com.google.cloud.storage;
1818

19-
import static com.google.cloud.RetryHelper.runWithRetries;
20-
21-
import com.google.api.client.util.Preconditions;
22-
import com.google.api.gax.retrying.ResultRetryAlgorithm;
2319
import com.google.api.services.storage.model.StorageObject;
2420
import com.google.cloud.ReadChannel;
2521
import com.google.cloud.RestorableState;
26-
import com.google.cloud.RetryHelper;
27-
import com.google.cloud.Tuple;
22+
import com.google.cloud.storage.BlobReadChannelV2.BlobReadChannelContext;
2823
import com.google.cloud.storage.spi.v1.StorageRpc;
24+
import com.google.common.annotations.VisibleForTesting;
2925
import com.google.common.base.MoreObjects;
3026
import java.io.IOException;
3127
import java.io.Serializable;
3228
import java.nio.ByteBuffer;
33-
import java.nio.channels.ClosedChannelException;
3429
import java.util.Map;
3530
import java.util.Objects;
3631

37-
/** Default implementation for ReadChannel. */
32+
/**
33+
* Hierarchy retained for {@link RestorableState#restore()}. Will be removed in next major version!
34+
*/
35+
@Deprecated
3836
class BlobReadChannel implements ReadChannel {
3937

40-
private static final int DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024;
41-
42-
private final HttpStorageOptions serviceOptions;
43-
private final BlobId blob;
44-
private final Map<StorageRpc.Option, ?> requestOptions;
45-
private final HttpRetryAlgorithmManager retryAlgorithmManager;
46-
private String lastEtag;
47-
private long position;
48-
private boolean isOpen;
49-
private boolean endOfStream;
50-
private int chunkSize = DEFAULT_CHUNK_SIZE;
51-
52-
private final StorageRpc storageRpc;
53-
private final StorageObject storageObject;
54-
private int bufferPos;
55-
private byte[] buffer;
56-
private long limit;
57-
58-
BlobReadChannel(
59-
HttpStorageOptions serviceOptions, BlobId blob, Map<StorageRpc.Option, ?> requestOptions) {
60-
this.serviceOptions = serviceOptions;
61-
this.blob = blob;
62-
this.requestOptions = requestOptions;
63-
this.retryAlgorithmManager = serviceOptions.getRetryAlgorithmManager();
64-
isOpen = true;
65-
storageRpc = serviceOptions.getStorageRpcV1();
66-
storageObject = Conversions.apiary().blobId().encode(blob);
67-
this.limit = Long.MAX_VALUE;
68-
}
38+
private BlobReadChannel() {}
6939

7040
@Override
7141
public RestorableState<ReadChannel> capture() {
72-
StateImpl.Builder builder =
73-
StateImpl.builder(serviceOptions, blob, requestOptions)
74-
.setPosition(position)
75-
.setIsOpen(isOpen)
76-
.setEndOfStream(endOfStream)
77-
.setChunkSize(chunkSize)
78-
.setLimit(limit);
79-
if (buffer != null) {
80-
builder.setPosition(position + bufferPos);
81-
builder.setEndOfStream(false);
82-
}
83-
return builder.build();
42+
throw new IllegalStateException("Illegal method access");
8443
}
8544

8645
@Override
8746
public boolean isOpen() {
88-
return isOpen;
47+
throw new IllegalStateException("Illegal method access");
8948
}
9049

9150
@Override
9251
public void close() {
93-
if (isOpen) {
94-
buffer = null;
95-
isOpen = false;
96-
}
97-
}
98-
99-
private void validateOpen() throws ClosedChannelException {
100-
if (!isOpen) {
101-
throw new ClosedChannelException();
102-
}
52+
throw new IllegalStateException("Illegal method access");
10353
}
10454

10555
@Override
10656
public void seek(long position) throws IOException {
107-
validateOpen();
108-
this.position = position;
109-
buffer = null;
110-
bufferPos = 0;
111-
endOfStream = false;
57+
throw new IllegalStateException("Illegal method access");
11258
}
11359

11460
@Override
11561
public void setChunkSize(int chunkSize) {
116-
this.chunkSize = chunkSize <= 0 ? DEFAULT_CHUNK_SIZE : chunkSize;
62+
throw new IllegalStateException("Illegal method access");
11763
}
11864

11965
@Override
12066
public int read(ByteBuffer byteBuffer) throws IOException {
121-
validateOpen();
122-
if (buffer == null) {
123-
if (endOfStream) {
124-
return -1;
125-
}
126-
final int toRead =
127-
Math.toIntExact(Math.min(limit - position, Math.max(byteBuffer.remaining(), chunkSize)));
128-
if (toRead <= 0) {
129-
endOfStream = true;
130-
return -1;
131-
}
132-
try {
133-
ResultRetryAlgorithm<?> algorithm =
134-
retryAlgorithmManager.getForObjectsGet(storageObject, requestOptions);
135-
Tuple<String, byte[]> result =
136-
runWithRetries(
137-
() -> storageRpc.read(storageObject, requestOptions, position, toRead),
138-
serviceOptions.getRetrySettings(),
139-
algorithm,
140-
serviceOptions.getClock());
141-
String etag = result.x();
142-
byte[] bytes = result.y();
143-
if (bytes.length > 0 && lastEtag != null && !Objects.equals(etag, lastEtag)) {
144-
throw new IOException("Blob " + blob + " was updated while reading");
145-
}
146-
lastEtag = etag;
147-
buffer = bytes;
148-
} catch (RetryHelper.RetryHelperException e) {
149-
throw new IOException(e.getCause());
150-
}
151-
if (toRead > buffer.length) {
152-
endOfStream = true;
153-
if (buffer.length == 0) {
154-
buffer = null;
155-
return -1;
156-
}
157-
}
158-
}
159-
int toWrite = Math.min(buffer.length - bufferPos, byteBuffer.remaining());
160-
byteBuffer.put(buffer, bufferPos, toWrite);
161-
bufferPos += toWrite;
162-
if (bufferPos >= buffer.length) {
163-
position += buffer.length;
164-
buffer = null;
165-
bufferPos = 0;
166-
}
167-
return toWrite;
67+
throw new IllegalStateException("Illegal method access");
16868
}
16969

17070
@Override
17171
public ReadChannel limit(long limit) {
172-
Preconditions.checkArgument(limit >= 0, "Limit must be >= 0");
173-
this.limit = limit;
174-
return this;
72+
throw new IllegalStateException("Illegal method access");
17573
}
17674

17775
@Override
17876
public long limit() {
179-
return limit;
77+
throw new IllegalStateException("Illegal method access");
18078
}
18179

80+
/** Retained for binary compatibility. Will be removed at next major version! */
81+
@SuppressWarnings("unused")
82+
@Deprecated
83+
@VisibleForTesting
18284
static class StateImpl implements RestorableState<ReadChannel>, Serializable {
18385

18486
private static final long serialVersionUID = 7784852608213694645L;
18587

186-
private final HttpStorageOptions serviceOptions;
187-
private final BlobId blob;
188-
private final Map<StorageRpc.Option, ?> requestOptions;
189-
private final String lastEtag;
190-
private final long position;
191-
private final boolean isOpen;
192-
private final boolean endOfStream;
193-
private final int chunkSize;
194-
private final long limit;
195-
196-
StateImpl(Builder builder) {
197-
this.serviceOptions = builder.serviceOptions;
198-
this.blob = builder.blob;
199-
this.requestOptions = builder.requestOptions;
200-
this.lastEtag = builder.lastEtag;
201-
this.position = builder.position;
202-
this.isOpen = builder.isOpen;
203-
this.endOfStream = builder.endOfStream;
204-
this.chunkSize = builder.chunkSize;
205-
this.limit = builder.limit;
206-
}
207-
208-
static class Builder {
209-
private final HttpStorageOptions serviceOptions;
210-
private final BlobId blob;
211-
private final Map<StorageRpc.Option, ?> requestOptions;
212-
private String lastEtag;
213-
private long position;
214-
private boolean isOpen;
215-
private boolean endOfStream;
216-
private int chunkSize;
217-
private long limit;
218-
219-
private Builder(
220-
HttpStorageOptions options, BlobId blob, Map<StorageRpc.Option, ?> reqOptions) {
221-
this.serviceOptions = options;
222-
this.blob = blob;
223-
this.requestOptions = reqOptions;
224-
}
225-
226-
Builder setLastEtag(String lastEtag) {
227-
this.lastEtag = lastEtag;
228-
return this;
229-
}
230-
231-
Builder setPosition(long position) {
232-
this.position = position;
233-
return this;
234-
}
235-
236-
Builder setIsOpen(boolean isOpen) {
237-
this.isOpen = isOpen;
238-
return this;
239-
}
240-
241-
Builder setEndOfStream(boolean endOfStream) {
242-
this.endOfStream = endOfStream;
243-
return this;
244-
}
245-
246-
Builder setChunkSize(int chunkSize) {
247-
this.chunkSize = chunkSize;
248-
return this;
249-
}
250-
251-
Builder setLimit(long limit) {
252-
this.limit = limit;
253-
return this;
254-
}
88+
// the following fields are dangling as they are only set via object deserialization, and only
89+
// read in #restore()
90+
private HttpStorageOptions serviceOptions;
91+
private BlobId blob;
92+
private Map<StorageRpc.Option, ?> requestOptions;
93+
private String lastEtag;
94+
private long position;
95+
private boolean isOpen;
96+
private boolean endOfStream;
97+
private int chunkSize;
98+
private long limit;
25599

256-
RestorableState<ReadChannel> build() {
257-
return new StateImpl(this);
258-
}
259-
}
260-
261-
static Builder builder(
262-
HttpStorageOptions options, BlobId blob, Map<StorageRpc.Option, ?> reqOptions) {
263-
return new Builder(options, blob, reqOptions);
264-
}
100+
private StateImpl() {}
265101

266102
@Override
267103
public ReadChannel restore() {
268-
BlobReadChannel channel = new BlobReadChannel(serviceOptions, blob, requestOptions);
269-
channel.lastEtag = lastEtag;
270-
channel.position = position;
271-
channel.isOpen = isOpen;
272-
channel.endOfStream = endOfStream;
273-
channel.chunkSize = chunkSize;
274-
channel.limit = limit;
104+
StorageObject encode = Conversions.apiary().blobId().encode(blob);
105+
BlobReadChannelV2 channel =
106+
new BlobReadChannelV2(
107+
encode, requestOptions, BlobReadChannelContext.from(serviceOptions));
108+
try {
109+
channel.seek(position);
110+
channel.limit(limit);
111+
channel.setChunkSize(chunkSize);
112+
if (!isOpen) {
113+
channel.close();
114+
}
115+
} catch (IOException e) {
116+
throw StorageException.coalesce(e);
117+
}
275118
return channel;
276119
}
277120

0 commit comments

Comments
 (0)