Skip to content

Commit bfcbcf6

Browse files
authored
Merge pull request #172 from tobiasschuerg/feat/stream_batch
feat: streaming write
2 parents 87e74b3 + d621b07 commit bfcbcf6

13 files changed

+365
-41
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
### Features
44
- [167](https://github.com/tobiasschuerg/InfluxDB-Client-for-Arduino/pull/167) - Added `InfluxDBClient::writeRecord(const char *record)`.
55
- [167](https://github.com/tobiasschuerg/InfluxDB-Client-for-Arduino/pull/167) - Added possibility to disable retrying by setting `maxRetryAttempts` to zero: `client.setWriteOptions(WriteOptions().maxRetryAttempts(0));`
6+
- [172](https://github.com/tobiasschuerg/InfluxDB-Client-for-Arduino/pull/170) - Added directly streaming batch for write. It can be enable by `InfluxDBClient::setStreamWrite(bool enable = true)`. Writing by streaming lines of batch saves RAM as it sends data without allocating a buffer. On the other hand, this way of writing is about half times slower than the classic way, when allocating the buffer for writing the whole batch.
7+
- [172](https://github.com/tobiasschuerg/InfluxDB-Client-for-Arduino/pull/170) - Allowing larger batch size, > 255.
68

79
## 3.9.0 [2021-09-17]
810
### Features

README.md

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ This library doesn't support using those devices as a peripheral.
2323
- [Timestamp](#timestamp)
2424
- [Configure Time](#configure-time)
2525
- [Batch Size](#batch-size)
26+
- [Large Batch Size](#large-batch-size)
27+
- [Write Modes](#write-modes)
2628
- [Buffer Handling and Retrying](#buffer-handling-and-retrying)
2729
- [Write Options](#write-options)
2830
- [HTTP Options](#http-options)
@@ -176,7 +178,7 @@ client.setWriteOptions(WriteOptions().writePrecision(WritePrecision::MS));
176178
When a write precision is configured, the client will automatically assign the current time to the timestamp of each written point which doesn't have a timestamp assigned.
177179

178180
If you want to manage timestamp on your own, there are several ways to set the timestamp explicitly.
179-
- `setTime(WritePrecision writePrecision)` - Sets the timestamp to the actual time in the desired precision
181+
- `setTime(WritePrecision writePrecision)` - Sets the timestamp to the actual time in the desired precision. The same precision must set in WriteOptions.
180182
- `setTime(unsigned long long timestamp)` - Sets the timestamp to an offset since the epoch. Correct precision must be set InfluxDBClient::setWriteOptions.
181183
- `setTime(String timestamp)` - Sets the timestamp to an offset since the epoch. Correct precision must be set InfluxDBClient::setWriteOptions.
182184

@@ -229,13 +231,13 @@ void timeSync(const char *tzInfo, const char* ntpServer1, const char* ntpServer2
229231
Setting batch size depends on data gathering and DB updating strategy.
230232

231233
If data is written in short periods (seconds), the batch size should be set according to your expected write periods and update frequency requirements.
232-
For example, if you would like to see updates (on the dashboard or in processing) each minute and you are measuring a single value (1 point) every 10s (6 points per minute), the batch size should be 6. If it is sufficient to update each hour and you are creating 1 point each minute, your batch size should be 60. The maximum recommended batch size is 200. Maximum batch size depends on the RAM of the device (80KB for ESP8266 and 512KB for ESP32).
234+
For example, if you would like to see updates (on the dashboard or in processing) each minute and you are measuring a single value (1 point) every 10s (6 points per minute), the batch size should be 6. If it is sufficient to update each hour and you are creating 1 point each minute, your batch size should be 60.
233235

234236
In cases where the data should be written in longer periods and gathered data consists of several points, the batch size should be set to the expected number of points to be gathered.
235237

236238
To set the batch size we use `WriteOptions` object and [setWriteOptions](#write-options) function:
237239
```cpp
238-
// Enable messages batching
240+
// Enable lines batching
239241
client.setWriteOptions(WriteOptions().batchSize(10));
240242
```
241243
Writing the point will add a point to the underlying buffer until the batch size is reached:
@@ -257,6 +259,28 @@ if(!client.writePoint(point10)) {
257259

258260
In case cases where the number of points is not always the same, set the batch size to the maximum number of points and use the `flushBuffer()` function to force writing to the database. See [Buffer Handling](#buffer-handling-and-retrying) for more details.
259261

262+
### Large batch size
263+
The maximum batch size depends on the available RAM of the device (~45KB for ESP8266 and ~260KB for ESP32). Larger batch size, >100 for ESP8255, >2000 for ESP32, must be chosen carefully to not crash the app with out of memory error. The Stream write mode must be used, see [Write Modes](#write-modes)
264+
265+
Always determine your typical line length using `client.pointToLineProtocol(point).length()`. For example, ESP32 can handle 2048 lines with an average length of 69. When the length of line or batch size is increased, the device becomes unstable, even there is more than 76k, it cannot send data or even crashes. ESP8266 handles successfully 330 of such lines.
266+
267+
:warning: Thoroughly test your app when using large batch files.
268+
269+
### Write Modes
270+
Client has two modes of writing:
271+
- Buffer (default)
272+
- Stream
273+
274+
Writing is performed the way that client keeps written lines (points) separately and when a batch is completed, it allocates a data buffer for sending to a server via WiFi Client.
275+
This is the fastest way to write data but requires some amount of free memory. Thus a big batch size cannot be used.
276+
277+
Another way of writing is *stream write*.
278+
```cpp
279+
// Enables stream write
280+
client.setStreamWrite(true);
281+
```
282+
In this mode client continuously streams lines from batch to WiFi Client. No buffer allocation. As lines are allocated separately, it avoids problems with max allocable block size. The downside is, that writing is about 50% slower than in the Buffer mode.
283+
260284
## Buffer Handling and Retrying
261285
InfluxDB contains an underlying buffer for handling writing in batches and automatic retrying on server back-pressure and connection failure.
262286

@@ -300,12 +324,15 @@ Writing points can be controlled via `WriteOptions`, which is set in the `setWri
300324
| batchSize | `1` | Number of points that will be written to the database at once |
301325
| bufferSize | `5` | Maximum number of points in buffer. Buffer contains new data that will be written to the database and also data that failed to be written due to network failure or server overloading |
302326
| flushInterval | `60` | Maximum time(in seconds) data will be held in buffer before points are written to the db |
327+
| retryInterval | `5` | Default retry interval in sec, if not sent by server. Value `0` disables retrying |
328+
| maxRetryInterval | `300` | Maximum retry interval in sec |
329+
| maxRetryAttempts | `3` | Maximum count of retry attempts of failed writes |
303330
304331
## HTTP Options
305332
`HTTPOptions` controls some aspects of HTTP communication and they are set via `setHTTPOptions` function:
306333
| Parameter | Default Value | Meaning |
307334
|-----------|---------------|---------|
308-
| reuseConnection | `false` | Whether HTTP connection should be kept open after initial communication. Usable for frequent writes/queries. |
335+
| connectionReuse | `false` | Whether HTTP connection should be kept open after initial communication. Usable for frequent writes/queries. |
309336
| httpReadTimeout | `5000` | Timeout (ms) for reading server response |
310337
311338
## Secure Connection

src/BucketsClient.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,8 @@ friend class E2ETest;
9797
// Returns true if a bucket exists
9898
bool checkBucketExists(const char *bucketName);
9999
// Returns a Bucket instance if a bucket is found.
100-
// Returned instance must be manually deleted at the end of usage.
101100
Bucket findBucket(const char *bucketName);
102101
// Creates a bucket with given name and optional retention policy. 0 means infinite.
103-
// Returned instance must be manually deleted at the end of usage.
104102
Bucket createBucket(const char *bucketName, uint32_t expiresSec = 0);
105103
// Delete a bucket with given id. Use findBucket to get a bucket with id.
106104
bool deleteBucket(const char *id);

src/HTTPService.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,18 @@ bool HTTPService::doPOST(const char *url, const char *data, const char *contentT
159159
return afterRequest(expectedCode, cb);
160160
}
161161

162+
bool HTTPService::doPOST(const char *url, Stream *stream, const char *contentType, int expectedCode, httpResponseCallback cb) {
163+
INFLUXDB_CLIENT_DEBUG("[D] POST request - %s, data: %dbytes, type %s\n", url, stream->available(), contentType);
164+
if(!beforeRequest(url)) {
165+
return false;
166+
}
167+
if(contentType) {
168+
_httpClient->addHeader(F("Content-Type"), FPSTR(contentType));
169+
}
170+
_lastStatusCode = _httpClient->sendRequest("POST", stream, stream->available());
171+
return afterRequest(expectedCode, cb);
172+
}
173+
162174
bool HTTPService::doGET(const char *url, int expectedCode, httpResponseCallback cb) {
163175
INFLUXDB_CLIENT_DEBUG("[D] GET request - %s\n", url);
164176
if(!beforeRequest(url)) {

src/HTTPService.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ friend class Test;
113113
HTTPOptions &getHTTPOptions() { return _httpOptions; }
114114
// Performs HTTP POST by sending data. On success calls response call back
115115
bool doPOST(const char *url, const char *data, const char *contentType, int expectedCode, httpResponseCallback cb);
116+
// Performs HTTP POST by sending stream. On success calls response call back
117+
bool doPOST(const char *url, Stream *stream, const char *contentType, int expectedCode, httpResponseCallback cb);
116118
// Performs HTTP GET. On success calls response call back
117119
bool doGET(const char *url, int expectedCode, httpResponseCallback cb);
118120
// Performs HTTP DELETE. On success calls response call back

src/InfluxDbClient.cpp

Lines changed: 141 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ static String precisionToString(WritePrecision precision, uint8_t version = 2) {
5050
}
5151
}
5252

53+
5354
InfluxDBClient::InfluxDBClient() {
5455
resetBuffer();
5556
}
@@ -122,6 +123,9 @@ bool InfluxDBClient::init() {
122123

123124
InfluxDBClient::~InfluxDBClient() {
124125
if(_writeBuffer) {
126+
for(int i=0;i<_writeBufferSize;i++) {
127+
delete _writeBuffer[i];
128+
}
125129
delete [] _writeBuffer;
126130
_writeBuffer = nullptr;
127131
_bufferPointer = 0;
@@ -294,15 +298,34 @@ bool InfluxDBClient::writePoint(Point & point) {
294298
return false;
295299
}
296300

301+
InfluxDBClient::Batch::Batch(uint16_t size):_size(size) {
302+
buffer = new char*[size];
303+
for(int i=0;i< _size; i++) {
304+
buffer[i] = nullptr;
305+
}
306+
}
307+
308+
309+
InfluxDBClient::Batch::~Batch() {
310+
clear();
311+
delete [] buffer;
312+
buffer = nullptr;
313+
}
314+
315+
void InfluxDBClient::Batch::clear() {
316+
for(int i=0;i< _size; i++) {
317+
free(buffer[i]);
318+
buffer[i] = nullptr;
319+
}
320+
}
321+
297322
bool InfluxDBClient::Batch::append(const char *line) {
298323
if(pointer == _size) {
299324
//overwriting, clean buffer
300-
for(int i=0;i< _size; i++) {
301-
buffer[i] = (const char *)nullptr;
302-
}
325+
clear();
303326
pointer = 0;
304327
}
305-
buffer[pointer] = line;
328+
buffer[pointer] = strdup(line);
306329
++pointer;
307330
return isFull();
308331
}
@@ -311,7 +334,7 @@ char * InfluxDBClient::Batch::createData() {
311334
int length = 0;
312335
char *buff = nullptr;
313336
for(int c=0; c < pointer; c++) {
314-
length += buffer[c].length();
337+
length += strlen(buffer[c]);
315338
yield();
316339
}
317340
//create buffer for all lines including new line char and terminating char
@@ -320,7 +343,7 @@ char * InfluxDBClient::Batch::createData() {
320343
if(buff) {
321344
buff[0] = 0;
322345
for(int c=0; c < pointer; c++) {
323-
strcat(buff+strlen(buff), buffer[c].c_str());
346+
strcat(buff+strlen(buff), buffer[c]);
324347
strcat(buff+strlen(buff), "\n");
325348
yield();
326349
}
@@ -402,7 +425,6 @@ bool InfluxDBClient::flushBufferInternal(bool flashOnlyFull) {
402425
bool success = true;
403426
// send all batches, It could happen there was long network outage and buffer is full
404427
while(_writeBuffer[_batchPointer] && (!flashOnlyFull || _writeBuffer[_batchPointer]->isFull())) {
405-
data = _writeBuffer[_batchPointer]->createData();
406428
if(!_writeBuffer[_batchPointer]->isFull() && _writeBuffer[_batchPointer]->retryCount == 0 ) { //do not increase pointer in case of retrying
407429
// points will be written so increase _bufferPointer as it happen when buffer is flushed when is full
408430
if(++_bufferPointer == _writeBufferSize) {
@@ -411,9 +433,15 @@ bool InfluxDBClient::flushBufferInternal(bool flashOnlyFull) {
411433
}
412434

413435
INFLUXDB_CLIENT_DEBUG("[D] Writing batch, batchpointer: %d, size %d\n", _batchPointer, _writeBuffer[_batchPointer]->pointer);
414-
if(data) {
415-
int statusCode = postData(data);
416-
delete [] data;
436+
if(!_writeBuffer[_batchPointer]->isEmpty()) {
437+
int statusCode = 0;
438+
if(_streamWrite) {
439+
statusCode = postData(_writeBuffer[_batchPointer]);
440+
} else {
441+
data = _writeBuffer[_batchPointer]->createData();
442+
statusCode = postData(data);
443+
delete [] data;
444+
}
417445
// retry on unsuccessfull connection or retryable status codes
418446
bool retry = (statusCode < 0 || statusCode >= 429) && _writeOptions._maxRetryAttempts > 0;
419447
success = statusCode >= 200 && statusCode < 300;
@@ -490,7 +518,11 @@ bool InfluxDBClient::validateConnection() {
490518
}
491519
INFLUXDB_CLIENT_DEBUG("[D] Validating connection to %s\n", url.c_str());
492520

493-
return _service->doGET(url.c_str(), 200, nullptr);
521+
bool ret = _service->doGET(url.c_str(), 200, nullptr);
522+
if(!ret) {
523+
INFLUXDB_CLIENT_DEBUG("[D] error %d: %s\n", _service->getLastStatusCode(), _service->getLastErrorMessage().c_str());
524+
}
525+
return ret;
494526
}
495527

496528
int InfluxDBClient::postData(const char *data) {
@@ -500,14 +532,35 @@ int InfluxDBClient::postData(const char *data) {
500532
if(data) {
501533
INFLUXDB_CLIENT_DEBUG("[D] Writing to %s\n", _writeUrl.c_str());
502534
INFLUXDB_CLIENT_DEBUG("[D] Sending:\n%s\n", data);
503-
504-
_service->doPOST(_writeUrl.c_str(), data, PSTR("text/plain"), 204, nullptr);
535+
if(!_service->doPOST(_writeUrl.c_str(), data, PSTR("text/plain"), 204, nullptr)) {
536+
INFLUXDB_CLIENT_DEBUG("[D] error %d: %s\n", _service->getLastStatusCode(), _service->getLastErrorMessage().c_str());
537+
}
505538
_retryTime = _service->getLastRetryAfter();
506539
return _service->getLastStatusCode();
507540
}
508541
return 0;
509542
}
510543

544+
int InfluxDBClient::postData(Batch *batch) {
545+
if(!_service && !init()) {
546+
return 0;
547+
}
548+
549+
BatchStreamer *bs = new BatchStreamer(batch);
550+
INFLUXDB_CLIENT_DEBUG("[D] Writing to %s\n", _writeUrl.c_str());
551+
INFLUXDB_CLIENT_DEBUG("[D] Sending:\n");
552+
553+
if(!_service->doPOST(_writeUrl.c_str(), bs, PSTR("text/plain"), 204, nullptr)) {
554+
INFLUXDB_CLIENT_DEBUG("[D] error %d: %s\n", _service->getLastStatusCode(), _service->getLastErrorMessage().c_str());
555+
}
556+
delete bs;
557+
_retryTime = _service->getLastRetryAfter();
558+
return _service->getLastStatusCode();
559+
}
560+
561+
void InfluxDBClient::setStreamWrite(bool enable) {
562+
_streamWrite = enable;
563+
}
511564

512565

513566
static const char QueryDialect[] PROGMEM = "\
@@ -598,3 +651,78 @@ static String escapeJSONString(String &value) {
598651
}
599652
return ret;
600653
}
654+
655+
InfluxDBClient::BatchStreamer::BatchStreamer(InfluxDBClient::Batch *batch) {
656+
_batch = batch;
657+
_read = 0;
658+
_length = 0;
659+
_pointer = 0;
660+
_linePointer = 0;
661+
for(int i=0;i<_batch->pointer;i++) {
662+
_length += strlen(_batch->buffer[i])+1;
663+
}
664+
}
665+
666+
int InfluxDBClient::BatchStreamer::available() {
667+
return _length-_read;
668+
}
669+
670+
int InfluxDBClient::BatchStreamer::availableForWrite() {
671+
return 0;
672+
}
673+
674+
#if defined(ESP8266)
675+
int InfluxDBClient::BatchStreamer::read(uint8_t* buffer, size_t len) {
676+
INFLUXDB_CLIENT_DEBUG("BatchStream::read %d\n", len);
677+
return readBytes((char *)buffer, len);
678+
}
679+
#endif
680+
size_t InfluxDBClient::BatchStreamer::readBytes(char* buffer, size_t len) {
681+
682+
INFLUXDB_CLIENT_DEBUG("BatchStream::readBytes %d\n", len);
683+
int r=0;
684+
for(int i=0;i<len;i++) {
685+
if(available()) {
686+
buffer[i] = read();
687+
r++;
688+
} else {
689+
break;
690+
}
691+
}
692+
return r;
693+
}
694+
695+
int InfluxDBClient::BatchStreamer::read() {
696+
int r = peek();
697+
if(r > 0) {
698+
++_read;
699+
++_linePointer;
700+
if(!_batch->buffer[_pointer][_linePointer-1]) {
701+
++_pointer;
702+
_linePointer = 0;
703+
}
704+
}
705+
return r;
706+
}
707+
708+
int InfluxDBClient::BatchStreamer::peek() {
709+
if(_pointer == _batch->pointer) {
710+
//This should not happen
711+
return -1;
712+
}
713+
714+
int r;
715+
if(!_batch->buffer[_pointer][_linePointer]) {
716+
r = '\n';
717+
} else {
718+
r = _batch->buffer[_pointer][_linePointer];
719+
}
720+
// #ifdef INFLUXDB_CLIENT_DEBUG_ENABLE
721+
// Serial.printf_P(PSTR("%c"), r);
722+
// #endif
723+
return r;
724+
}
725+
726+
size_t InfluxDBClient::BatchStreamer::write(uint8_t data) {
727+
return 0;
728+
}

0 commit comments

Comments
 (0)