Skip to content

Commit 9346463

Browse files
authored
Merge pull request #162 from tobiasschuerg/fix/flushing_timeout
fix: flushing not full buffer when timeout occurs for the first time
2 parents eaee8e3 + bdc1cb8 commit 9346463

File tree

9 files changed

+63
-11
lines changed

9 files changed

+63
-11
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
### Fixes
44
- [#150](https://github.com/tobiasschuerg/InfluxDB-Client-for-Arduino/pull/150) - `HTTPOptions::httpReadTimeout` is also set as the connect timeout for HTTP connection on ESP32. It doesn't work for HTTPS connection yet.
55
- [#156](https://github.com/tobiasschuerg/InfluxDB-Client-for-Arduino/pull/156) - Correctly rounding _writeBufferSize_, when _bufferSize/batchSize >= 256_.
6+
- [#162](https://github.com/tobiasschuerg/InfluxDB-Client-for-Arduino/pull/162) - Fixed flushing of not full buffer after to timeout.
67

78
## 3.8.0 [2021-04-01]
89
### Features

src/InfluxDbClient.cpp

+10-7
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ bool InfluxDBClient::init() {
9898
INFLUXDB_CLIENT_DEBUG("[D] Org: %s\n", _connInfo.org.c_str());
9999
INFLUXDB_CLIENT_DEBUG("[D] Bucket: %s\n", _connInfo.bucket.c_str());
100100
INFLUXDB_CLIENT_DEBUG("[D] Token: %s\n", _connInfo.authToken.c_str());
101-
INFLUXDB_CLIENT_DEBUG("[D] DB version: %d\n", _dbVersion);
101+
INFLUXDB_CLIENT_DEBUG("[D] DB version: %d\n", _connInfo.dbVersion);
102102
if(_connInfo.serverUrl.length() == 0 || (_connInfo.dbVersion == 2 && (_connInfo.org.length() == 0 || _connInfo.bucket.length() == 0 || _connInfo.authToken.length() == 0))) {
103103
INFLUXDB_CLIENT_DEBUG("[E] Invalid parameters\n");
104104
_connInfo.lastError = F("Invalid parameters");
@@ -137,7 +137,7 @@ void InfluxDBClient::clean() {
137137
_service = nullptr;
138138
}
139139
_buckets = nullptr;
140-
_lastFlushed = 0;
140+
_lastFlushed = millis();
141141
_retryTime = 0;
142142
}
143143

@@ -359,11 +359,14 @@ bool InfluxDBClient::checkBuffer() {
359359
// in case we (over)reach batchSize with non full buffer
360360
bool bufferReachedBatchsize = _writeBuffer[_batchPointer] && _writeBuffer[_batchPointer]->isFull();
361361
// or flush interval timed out
362-
bool flushTimeout = _writeOptions._flushInterval > 0 && _lastFlushed > 0 && (millis()/1000 - _lastFlushed) > _writeOptions._flushInterval;
362+
bool flushTimeout = _writeOptions._flushInterval > 0 && ((millis() - _lastFlushed)/1000) >= _writeOptions._flushInterval;
363363

364+
INFLUXDB_CLIENT_DEBUG("[D] Flushing buffer: is oversized %s, is timeout %s, is buffer full %s\n",
365+
bool2string(bufferReachedBatchsize),bool2string(flushTimeout), bool2string(isBufferFull()));
366+
364367
if(bufferReachedBatchsize || flushTimeout || isBufferFull() ) {
365-
INFLUXDB_CLIENT_DEBUG("[D] Flushing buffer: is oversized %s, is timeout %s, is buffer full %s\n", bufferReachedBatchsize?"true":"false",flushTimeout?"true":"false", isBufferFull()?"true":"false");
366-
return flushBufferInternal(true);
368+
369+
return flushBufferInternal(!flushTimeout);
367370
}
368371
return true;
369372
}
@@ -412,7 +415,7 @@ bool InfluxDBClient::flushBufferInternal(bool flashOnlyFull) {
412415
success = statusCode >= 200 && statusCode < 300;
413416
// advance even on message failure x e <300;429)
414417
if(success || !retry) {
415-
_lastFlushed = millis()/1000;
418+
_lastFlushed = millis();
416419
dropCurrentBatch();
417420
} else if(retry) {
418421
_writeBuffer[_batchPointer]->retryCount++;
@@ -542,7 +545,7 @@ FluxQueryResult InfluxDBClient::query(String fluxQuery) {
542545
String header = httpClient->header(TransferEncoding);
543546
chunked = header.equalsIgnoreCase("chunked");
544547
}
545-
INFLUXDB_CLIENT_DEBUG("[D] chunked: %s\n", chunked?"true":"false");
548+
INFLUXDB_CLIENT_DEBUG("[D] chunked: %s\n", bool2string(chunked));
546549
HttpStreamScanner *scanner = new HttpStreamScanner(httpClient, chunked);
547550
reader = new CsvReader(scanner);
548551
return false;

src/InfluxDbClient.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ class InfluxDBClient {
200200
// Index of bath start for next write
201201
uint8_t _batchPointer = 0;
202202
// Last time in sec buffer has been successfully flushed
203-
uint32_t _lastFlushed = 0;
203+
uint32_t _lastFlushed;
204204
// Bucket sub-client
205205
BucketsClient _buckets;
206206
protected:

src/Point.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
#include <Arduino.h>
3131
#include "WritePrecision.h"
32+
#include "util/helpers.h"
3233

3334
/**
3435
* Class Point represents InfluxDB point in line protocol.
@@ -49,7 +50,7 @@ friend class InfluxDBClient;
4950
void addField(String name, unsigned int value) { putField(name, String(value)+"i"); }
5051
void addField(String name, long value) { putField(name, String(value)+"i"); }
5152
void addField(String name, unsigned long value) { putField(name, String(value)+"i"); }
52-
void addField(String name, bool value) { putField(name,value?"true":"false"); }
53+
void addField(String name, bool value) { putField(name, bool2string(value)); }
5354
void addField(String name, String value) { addField(name, value.c_str()); }
5455
void addField(String name, const char *value);
5556
// Set timestamp to `now()` and store it in specified precision, nanoseconds by default. Date and time must be already set. See `configTime` in the device API

src/query/HttpStreamScanner.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
// Uncomment bellow in case of a problem and rebuild sketch
3030
//#define INFLUXDB_CLIENT_DEBUG_ENABLE
3131
#include "util/debug.h"
32+
#include "util/helpers.h"
3233

3334
HttpStreamScanner::HttpStreamScanner(HTTPClient *client, bool chunked)
3435
{
@@ -37,7 +38,7 @@ HttpStreamScanner::HttpStreamScanner(HTTPClient *client, bool chunked)
3738
_chunked = chunked;
3839
_chunkHeader = chunked;
3940
_len = client->getSize();
40-
INFLUXDB_CLIENT_DEBUG("[D] HttpStreamScanner: chunked: %s, size: %d\n", _chunked?"true":"false", _len);
41+
INFLUXDB_CLIENT_DEBUG("[D] HttpStreamScanner: chunked: %s, size: %d\n", bool2string(_chunked), _len);
4142
}
4243

4344
bool HttpStreamScanner::next() {

src/util/helpers.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -161,3 +161,6 @@ bool isValidID(const char *idString) {
161161
return true;
162162
}
163163

164+
const char *bool2string(bool val) {
165+
return (val?"true":"false");
166+
}

src/util/helpers.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ String escapeValue(const char *value);
5050
String urlEncode(const char* src);
5151
// Returns true of string contains valid InfluxDB ID type
5252
bool isValidID(const char *idString);
53-
53+
// Return "true" if val is true, otherwise "false"
54+
const char *bool2string(bool val);
5455

5556
#endif //_INFLUXDB_CLIENT_HELPERS_H

test/Test.cpp

+41
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ void Test::run() {
5656
testFluxParserMissingDatatype();
5757
testFluxParserErrorInRow();
5858
testBasicFunction();
59+
testFlushing();
5960
testInit();
6061
testRepeatedInit();
6162
testV1();
@@ -2076,6 +2077,46 @@ void Test::testBuckets() {
20762077
TEST_END();
20772078
}
20782079

2080+
void Test::testFlushing() {
2081+
TEST_INIT("testFlushing");
2082+
InfluxDBClient client(Test::apiUrl, Test::orgName, Test::bucketName, Test::token);
2083+
TEST_ASSERT(waitServer(Test::managementUrl, true));
2084+
TEST_ASSERT(client.validateConnection());
2085+
TEST_ASSERT(!client.isBufferFull());
2086+
TEST_ASSERT(client.isBufferEmpty());
2087+
client.setWriteOptions(WriteOptions().batchSize(10).bufferSize(30).flushInterval(2));
2088+
2089+
for (int i = 0; i < 5; i++) {
2090+
Point *p = createPoint("test1");
2091+
p->addField("index", i);
2092+
TEST_ASSERT(client.writePoint(*p));
2093+
delete p;
2094+
}
2095+
TEST_ASSERT(!client.isBufferFull());
2096+
TEST_ASSERT(!client.isBufferEmpty());
2097+
client.checkBuffer();
2098+
TEST_ASSERT(!client.isBufferFull());
2099+
TEST_ASSERT(!client.isBufferEmpty());
2100+
String query = "select";
2101+
FluxQueryResult q = client.query(query);
2102+
int count = countLines(q);
2103+
TEST_ASSERTM(q.getError()=="", q.getError());
2104+
TEST_ASSERTM( count == 0, String(count) + " vs 0"); //5 points
2105+
2106+
delay(2100);
2107+
client.checkBuffer();
2108+
TEST_ASSERT(!client.isBufferFull());
2109+
TEST_ASSERT(client.isBufferEmpty());
2110+
2111+
q = client.query(query);
2112+
count = countLines(q);
2113+
TEST_ASSERTM(q.getError()=="", q.getError());
2114+
TEST_ASSERTM( count == 5, String(count) + " vs 0"); //5 points
2115+
2116+
TEST_END();
2117+
deleteAll(Test::apiUrl);
2118+
}
2119+
20792120
void Test::setServerUrl(InfluxDBClient &client, String serverUrl) {
20802121
client._connInfo.serverUrl = serverUrl;
20812122
client._service->_apiURL = serverUrl + "/api/v2/";

test/Test.h

+1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ class Test : public TestBase {
7070
static void testRepeatedInit();
7171
static void testIsValidID();
7272
static void testBuckets();
73+
static void testFlushing();
7374
};
7475

7576
#endif //_TEST_H_

0 commit comments

Comments
 (0)