reactor-c 1.0
C Runtime for Lingua Franca
Loading...
Searching...
No Matches
influxdb.h
Go to the documentation of this file.
1
23
24#include <sys/types.h>
25#include <sys/socket.h>
26#include <netinet/in.h>
27#include <arpa/inet.h>
28#include <sys/uio.h>
29#include <netdb.h>
30#include <stdarg.h>
31#include <string.h>
32#include <stdio.h>
33#include <unistd.h>
34#include <curl/curl.h>
35
36#define INFLUX_MEAS(m) IF_TYPE_MEAS, (m)
37#define INFLUX_TAG(k, v) IF_TYPE_TAG, (k), (v)
38#define INFLUX_F_STR(k, v) IF_TYPE_FIELD_STRING, (k), (v)
39#define INFLUX_F_FLT(k, v, p) IF_TYPE_FIELD_FLOAT, (k), (double)(v), (int)(p)
40#define INFLUX_F_INT(k, v) IF_TYPE_FIELD_INTEGER, (k), (long long)(v)
41#define INFLUX_F_BOL(k, v) IF_TYPE_FIELD_BOOLEAN, (k), ((v) ? 1 : 0)
42#define INFLUX_TS(ts) IF_TYPE_TIMESTAMP, (long long)(ts)
43#define INFLUX_END IF_TYPE_ARG_END
44
49typedef struct influx_client_t {
50 char* host;
51 int port;
52 char* db; // http only
53 char* usr; // http only [optional for auth]
54 char* pwd; // http only [optional for auth]
55 char* token; // http only
57
62typedef struct influx_v2_client_t {
63 char* host;
64 int port;
65 char* org;
66 char* bucket;
67 char* precision;
68 char* usr; // http only [optional for auth]
69 char* pwd; // http only [optional for auth]
70 char* token; // http only
72
83int format_line(char** buf, int* len, size_t used, ...);
84
93int post_http(influx_client_t* c, ...);
94
103int send_udp(influx_client_t* c, ...);
104
113int post_curl(influx_v2_client_t* c, ...);
114
115#define IF_TYPE_ARG_END 0
116#define IF_TYPE_MEAS 1
117#define IF_TYPE_TAG 2
118#define IF_TYPE_FIELD_STRING 3
119#define IF_TYPE_FIELD_FLOAT 4
120#define IF_TYPE_FIELD_INTEGER 5
121#define IF_TYPE_FIELD_BOOLEAN 6
122#define IF_TYPE_TIMESTAMP 7
123
124int _escaped_append(char** dest, size_t* len, size_t* used, const char* src, const char* escape_seq);
125int _begin_line(char** buf);
126int _format_line(char** buf, va_list ap);
127int _format_line2(char** buf, va_list ap, size_t*, size_t);
128
138int post_http_send_line(influx_client_t* c, char* buf, int len);
139
149int send_udp_line(influx_client_t* c, char* line, int len);
150
151int post_http_send_line(influx_client_t* c, char* buf, int len) {
152 int sock = -1, ret_code = 0, content_length = 0;
153 struct sockaddr_in addr;
154 struct iovec iv[2];
155 char ch;
156
157 iv[1].iov_base = buf;
158 iv[1].iov_len = len;
159
160 if (!(iv[0].iov_base = (char*)malloc(len = 0x800))) {
161 free(iv[1].iov_base);
162 return -2;
163 }
164
165 for (;;) {
166 iv[0].iov_len =
167 snprintf((char*)iv[0].iov_base, len,
168 "POST /write?db=%s&u=%s&p=%s HTTP/1.1\r\n"
169 "Host: %s\r\n"
170 "Accept: application/json\r\n"
171 "Content-type: text/plain\r\n"
172 "Authorization: Token %s\r\n"
173 "Content-Length: %zd\r\n"
174 "\r\n", // Final blank line is needed.
175 c->db, c->usr ? c->usr : "", c->pwd ? c->pwd : "", c->host, c->token ? c->token : "", iv[1].iov_len);
176 if ((int)iv[0].iov_len >= len && !(iv[0].iov_base = (char*)realloc(iv[0].iov_base, len *= 2))) {
177 free(iv[1].iov_base);
178 free(iv[0].iov_base);
179 return -3;
180 } else
181 break;
182 }
183
184 fprintf(stderr, "influxdb-c::post_http: iv[0] = '%s'\n", (char*)iv[0].iov_base);
185 fprintf(stderr, "influxdb-c::post_http: iv[1] = '%s'\n", (char*)iv[1].iov_base);
186
187 addr.sin_family = AF_INET;
188 addr.sin_port = htons(c->port);
189 // EAL: Rather than just an IP address, allow a hostname, like "localhost"
190 struct hostent* resolved_host = gethostbyname(c->host);
191 if (!resolved_host) {
192 free(iv[1].iov_base);
193 free(iv[0].iov_base);
194 return -4;
195 }
196 memcpy(&addr.sin_addr, resolved_host->h_addr_list[0], resolved_host->h_length);
197 /*
198 if((addr.sin_addr.s_addr = inet_addr(resolved_host->h_addr)) == INADDR_NONE) {
199 free(iv[1].iov_base);
200 free(iv[0].iov_base);
201 return -4;
202 }
203 */
204
205 if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
206 free(iv[1].iov_base);
207 free(iv[0].iov_base);
208 return -5;
209 }
210
211 if (connect(sock, (struct sockaddr*)(&addr), sizeof(addr)) < 0) {
212 ret_code = -6;
213 goto END;
214 }
215
216 if (writev(sock, iv, 2) < (int)(iv[0].iov_len + iv[1].iov_len)) {
217 ret_code = -7;
218 goto END;
219 }
220 iv[0].iov_len = len;
221
222#define _GET_NEXT_CHAR() \
223 (ch = (len >= (int)iv[0].iov_len && \
224 (iv[0].iov_len = recv(sock, iv[0].iov_base, iv[0].iov_len, len = 0)) == (size_t)(-1) \
225 ? 0 \
226 : *((char*)iv[0].iov_base + len++)))
227#define _LOOP_NEXT(statement) \
228 for (;;) { \
229 if (!(_GET_NEXT_CHAR())) { \
230 ret_code = -8; \
231 goto END; \
232 } \
233 statement \
234 }
235#define _UNTIL(c) _LOOP_NEXT(if (ch == c) break;)
236#define _GET_NUMBER(n) _LOOP_NEXT(if (ch >= '0' && ch <= '9') n = n * 10 + (ch - '0'); else break;)
237#define _(c) \
238 if ((_GET_NEXT_CHAR()) != c) \
239 break;
240
241 _UNTIL(' ') _GET_NUMBER(ret_code) for (;;) {
242 _UNTIL('\n')
243 switch (_GET_NEXT_CHAR()) {
244 case 'C':
245 _('o')
246 _('n')
247 _('t')
248 _('e')
249 _('n') _('t') _('-') _('L') _('e') _('n') _('g') _('t') _('h') _(':') _(' ') _GET_NUMBER(content_length) break;
250 case '\r':
251 _('\n')
252 while (content_length-- > 0 && _GET_NEXT_CHAR())
253 ; // printf("%c", ch);
254 goto END;
255 }
256 if (!ch) {
257 ret_code = -10;
258 goto END;
259 }
260 }
261 ret_code = -11;
262END:
263 close(sock);
264 free(iv[0].iov_base);
265 free(iv[1].iov_base);
266 return ret_code / 100 == 2 ? 0 : ret_code;
267}
268#undef _GET_NEXT_CHAR
269#undef _LOOP_NEXT
270#undef _UNTIL
271#undef _GET_NUMBER
272#undef _
273
275 va_list ap;
276 char* line = NULL;
277 int ret_code = 0, len = 0;
278
279 va_start(ap, c);
280 len = _format_line((char**)&line, ap);
281 va_end(ap);
282 if (len < 0)
283 return -1;
284
285 ret_code = post_http_send_line(c, line, len);
286
287 return ret_code;
288}
289
290int send_udp_line(influx_client_t* c, char* line, int len) {
291 int sock = -1, ret = 0;
292 struct sockaddr_in addr;
293
294 addr.sin_family = AF_INET;
295 addr.sin_port = htons(c->port);
296 if ((addr.sin_addr.s_addr = inet_addr(c->host)) == INADDR_NONE) {
297 ret = -2;
298 goto END;
299 }
300
301 if ((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
302 ret = -3;
303 goto END;
304 }
305
306 if (sendto(sock, line, len, 0, (struct sockaddr*)&addr, sizeof(addr)) < len)
307 ret = -4;
308
309END:
310 if (sock >= 0) {
311 close(sock);
312 }
313 return ret;
314}
315
317 int ret = 0, len;
318 va_list ap;
319 char* line = NULL;
320
321 va_start(ap, c);
322 len = _format_line(&line, ap);
323 va_end(ap);
324 if (len < 0)
325 return -1;
326
327 ret = send_udp_line(c, line, len);
328
329 free(line);
330 return ret;
331}
332
334 va_list ap;
335 char* data = NULL;
336 int len = 0;
337 va_start(ap, c);
338 len = _format_line((char**)&data, ap);
339 va_end(ap);
340
341 CURL* curl;
342
343 /* In windows, this will init the winsock stuff */
344 curl_global_init(CURL_GLOBAL_ALL);
345 CURLcode res;
346
347 /* get a curl handle */
348 curl = curl_easy_init();
349 if (!curl) {
350 return CURLE_FAILED_INIT;
351 }
352
353 char* url_string = (char*)malloc(len);
354 snprintf(url_string, len, "http://%s:%d/api/v2/write?org=%s&bucket=%s&precision=%s", c->host ? c->host : "localhost",
355 c->port ? c->port : 8086, c->org, c->bucket, c->precision ? c->precision : "ns");
356
357 curl_easy_setopt(curl, CURLOPT_URL, url_string);
358 free(url_string);
359
360 char token_string[120];
361 snprintf(token_string, sizeof(token_string), "Authorization: Token %s", c->token);
362
363 struct curl_slist* list = NULL;
364 list = curl_slist_append(list, token_string);
365 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, list);
366
367 curl_easy_setopt(curl, CURLOPT_POSTFIELDS, data);
368 curl_easy_setopt(curl, CURLOPT_USERAGENT, "libcurl-agent/1.0");
369
370 /* Perform the request, res will get the return code */
371 res = curl_easy_perform(curl);
372 /* Check for errors */
373 if (res != CURLE_OK) {
374 fprintf(stderr, "curl_easy_perform() failed: %s\n", curl_easy_strerror(res));
375 }
376
377 free(data);
378 curl_easy_cleanup(curl);
379 curl_global_cleanup();
380 return res;
381}
382
383int format_line(char** buf, int* len, size_t used, ...) {
384 va_list ap;
385 va_start(ap, used);
386 used = _format_line2(buf, ap, (size_t*)len, used);
387 va_end(ap);
388 if (*len < 0)
389 return -1;
390 else
391 return used;
392}
393
394int _begin_line(char** buf) {
395 int len = 0x100;
396 if (!(*buf = (char*)malloc(len)))
397 return -1;
398 return len;
399}
400
401int _format_line(char** buf, va_list ap) {
402 size_t len = 0;
403 *buf = NULL;
404 return _format_line2(buf, ap, &len, 0);
405}
406
407int _format_line2(char** buf, va_list ap, size_t* _len, size_t used) {
408#define _APPEND(fmter...) \
409 for (;;) { \
410 if ((written = snprintf(*buf + used, len - used, ##fmter)) < 0) \
411 goto FAIL; \
412 if (used + written >= len && !(*buf = (char*)realloc(*buf, len *= 2))) \
413 return -1; \
414 else { \
415 used += written; \
416 break; \
417 } \
418 }
419
420 size_t len = *_len;
421 int written = 0, type = 0, last_type = 0;
422 unsigned long long i = 0;
423 double d = 0.0;
424
425 if (*buf == NULL) {
426 len = _begin_line(buf);
427 used = 0;
428 }
429
430 type = va_arg(ap, int);
431 while (type != IF_TYPE_ARG_END) {
432 if (type >= IF_TYPE_TAG && type <= IF_TYPE_FIELD_BOOLEAN) {
433 if (last_type < IF_TYPE_MEAS || last_type > (type == IF_TYPE_TAG ? IF_TYPE_TAG : IF_TYPE_FIELD_BOOLEAN))
434 goto FAIL;
435 _APPEND("%c", (last_type <= IF_TYPE_TAG && type > IF_TYPE_TAG) ? ' ' : ',');
436 if (_escaped_append(buf, &len, &used, va_arg(ap, char*), ",= "))
437 return -2;
438 _APPEND("=");
439 }
440 switch (type) {
441 case IF_TYPE_MEAS:
442 if (last_type)
443 _APPEND("\n");
444 if (last_type && last_type <= IF_TYPE_TAG)
445 goto FAIL;
446 if (_escaped_append(buf, &len, &used, va_arg(ap, char*), ", "))
447 return -3;
448 break;
449 case IF_TYPE_TAG:
450 if (_escaped_append(buf, &len, &used, va_arg(ap, char*), ",= "))
451 return -4;
452 break;
454 _APPEND("\"");
455 if (_escaped_append(buf, &len, &used, va_arg(ap, char*), "\""))
456 return -5;
457 _APPEND("\"");
458 break;
460 d = va_arg(ap, double);
461 i = va_arg(ap, int);
462 _APPEND("%.*lf", (int)i, d);
463 break;
465 i = va_arg(ap, long long);
466 _APPEND("%lldi", i);
467 break;
469 i = va_arg(ap, int);
470 _APPEND("%c", i ? 't' : 'f');
471 break;
473 if (last_type < IF_TYPE_FIELD_STRING || last_type > IF_TYPE_FIELD_BOOLEAN)
474 goto FAIL;
475 i = va_arg(ap, long long);
476 _APPEND(" %lld", i);
477 break;
478 default:
479 goto FAIL;
480 }
481 last_type = type;
482 type = va_arg(ap, int);
483 }
484 _APPEND("\n");
485 if (last_type <= IF_TYPE_TAG)
486 goto FAIL;
487 *_len = len;
488 return used;
489FAIL:
490 free(*buf);
491 *buf = NULL;
492 return -1;
493}
494#undef _APPEND
495
496int _escaped_append(char** dest, size_t* len, size_t* used, const char* src, const char* escape_seq) {
497 size_t i = 0;
498
499 for (;;) {
500 if ((i = strcspn(src, escape_seq)) > 0) {
501 if (*used + i > *len && !(*dest = (char*)realloc(*dest, (*len) *= 2)))
502 return -1;
503 strncpy(*dest + *used, src, i);
504 *used += i;
505 src += i;
506 }
507 if (*src) {
508 if (*used + 2 > *len && !(*dest = (char*)realloc(*dest, (*len) *= 2)))
509 return -2;
510 (*dest)[(*used)++] = '\\';
511 (*dest)[(*used)++] = *src++;
512 } else
513 return 0;
514 }
515 return 0;
516}
int format_line(char **buf, int *len, size_t used,...)
Format a line for InfluxDB.
Definition influxdb.h:383
int send_udp(influx_client_t *c,...)
Send a line to InfluxDB via UDP.
Definition influxdb.h:316
int post_http_send_line(influx_client_t *c, char *buf, int len)
Post a line to InfluxDB via HTTP.
Definition influxdb.h:151
int send_udp_line(influx_client_t *c, char *line, int len)
Send a line to InfluxDB via UDP.
Definition influxdb.h:290
int post_curl(influx_v2_client_t *c,...)
Post a line to InfluxDB via HTTP.
Definition influxdb.h:333
int post_http(influx_client_t *c,...)
Post a line to InfluxDB via HTTP.
Definition influxdb.h:274
#define _APPEND(fmter...)
int _format_line2(char **buf, va_list ap, size_t *, size_t)
Definition influxdb.h:407
#define IF_TYPE_TAG
Definition influxdb.h:117
#define IF_TYPE_ARG_END
Definition influxdb.h:115
#define _GET_NEXT_CHAR()
#define IF_TYPE_FIELD_STRING
Definition influxdb.h:118
#define _UNTIL(c)
#define IF_TYPE_FIELD_BOOLEAN
Definition influxdb.h:121
int _escaped_append(char **dest, size_t *len, size_t *used, const char *src, const char *escape_seq)
Definition influxdb.h:496
int _format_line(char **buf, va_list ap)
Definition influxdb.h:401
#define _GET_NUMBER(n)
#define IF_TYPE_FIELD_INTEGER
Definition influxdb.h:120
#define IF_TYPE_TIMESTAMP
Definition influxdb.h:122
#define IF_TYPE_FIELD_FLOAT
Definition influxdb.h:119
#define IF_TYPE_MEAS
Definition influxdb.h:116
int _begin_line(char **buf)
Definition influxdb.h:394
#define _(c)
InfluxDB client.
Definition influxdb.h:49
char * token
Definition influxdb.h:55
char * pwd
Definition influxdb.h:54
int port
Definition influxdb.h:51
char * host
Definition influxdb.h:50
char * db
Definition influxdb.h:52
char * usr
Definition influxdb.h:53
InfluxDB v2 client.
Definition influxdb.h:62
int port
Definition influxdb.h:64
char * pwd
Definition influxdb.h:69
char * token
Definition influxdb.h:70
char * usr
Definition influxdb.h:68
char * precision
Definition influxdb.h:67
char * bucket
Definition influxdb.h:66
char * host
Definition influxdb.h:63
char * org
Definition influxdb.h:65