add elasticsearch output module
[rsyslog.git] / plugins / omelasticsearch / omelasticsearch.c
1 /* omelasticsearch.c
2  * This is the http://www.elasticsearch.org/ output module.
3  *
4  * NOTE: read comments in module-template.h for more specifics!
5  *
6  * Copyright 2011 Nathan Scott.
7  * Copyright 2009 Rainer Gerhards and Adiscon GmbH.
8  *
9  * This file is part of rsyslog.
10  *
11  * Rsyslog is free software: you can redistribute it and/or modify
12  * it under the terms of the GNU General Public License as published by
13  * the Free Software Foundation, either version 3 of the License, or
14  * (at your option) any later version.
15  *
16  * Rsyslog is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  * GNU General Public License for more details.
20  *
21  * You should have received a copy of the GNU General Public License
22  * along with Rsyslog.  If not, see <http://www.gnu.org/licenses/>.
23  *
24  * A copy of the GPL can be found in the file "COPYING" in this distribution.
25  */
26 #include "config.h"
27 #include "rsyslog.h"
28 #include <stdio.h>
29 #include <stdarg.h>
30 #include <stdlib.h>
31 #include <string.h>
32 #include <curl/curl.h>
33 #include <curl/types.h>
34 #include <curl/easy.h>
35 #include <assert.h>
36 #include <signal.h>
37 #include <errno.h>
38 #include <time.h>
39 #include "conf.h"
40 #include "syslogd-types.h"
41 #include "srUtils.h"
42 #include "template.h"
43 #include "module-template.h"
44 #include "errmsg.h"
45 #include "statsobj.h"
46 #include "cfsysline.h"
47
48 MODULE_TYPE_OUTPUT
49 MODULE_TYPE_NOKEEP
50
51 /* internal structures */
52 DEF_OMOD_STATIC_DATA
53 DEFobjCurrIf(errmsg)
54 DEFobjCurrIf(statsobj)
55
56 statsobj_t *indexStats;
57 STATSCOUNTER_DEF(indexConFail, mutIndexConFail)
58 STATSCOUNTER_DEF(indexSubmit, mutIndexSubmit)
59 STATSCOUNTER_DEF(indexFailed, mutIndexFailed)
60 STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess)
61
62 /* REST API for elasticsearch hits this URL:
63  * http://<hostName>:<restPort>/<searchIndex>/<searchType>
64  */
65 typedef struct curl_slist HEADER;
66 typedef struct _instanceData {
67         CURL    *curlHandle;    /* libcurl session handle */
68         HEADER  *postHeader;    /* json POST request info */
69 } instanceData;
70
71 /* config variables */
72 static int restPort = 9200;
73 static char *hostName = "localhost";
74 static char *searchIndex = "system";
75 static char *searchType = "events";
76
77 BEGINcreateInstance
78 CODESTARTcreateInstance
79 ENDcreateInstance
80
81 BEGINisCompatibleWithFeature
82 CODESTARTisCompatibleWithFeature
83         if(eFeat == sFEATURERepeatedMsgReduction)
84                 iRet = RS_RET_OK;
85 ENDisCompatibleWithFeature
86
87 BEGINfreeInstance
88 CODESTARTfreeInstance
89         if (pData->postHeader) {
90                 curl_slist_free_all(pData->postHeader);
91                 pData->postHeader = NULL;
92         }
93         if (pData->curlHandle) {
94                 curl_easy_cleanup(pData->curlHandle);
95                 pData->curlHandle = NULL;
96         }
97 ENDfreeInstance
98
99 BEGINdbgPrintInstInfo
100 CODESTARTdbgPrintInstInfo
101 ENDdbgPrintInstInfo
102
103 BEGINtryResume
104 CODESTARTtryResume
105 ENDtryResume
106
107 rsRetVal
108 curlPost(instanceData *instance, uchar *message)
109 {
110         CURLcode code;
111         CURL *curl = instance->curlHandle;
112         int length = strlen((char *)message);
113
114         curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message);
115         curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); 
116         curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, length); 
117         code = curl_easy_perform(curl);
118         switch (code) {
119                 case CURLE_COULDNT_RESOLVE_HOST:
120                 case CURLE_COULDNT_RESOLVE_PROXY:
121                 case CURLE_COULDNT_CONNECT:
122                 case CURLE_WRITE_ERROR:
123                         STATSCOUNTER_INC(indexConFail, mutIndexConFail);
124                         return RS_RET_SUSPENDED;
125                 default:
126                         STATSCOUNTER_INC(indexSubmit, mutIndexSubmit);
127                         return RS_RET_OK;
128         }
129 }
130
131 BEGINdoAction
132 CODESTARTdoAction
133         CHKiRet(curlPost(pData, ppString[0]));
134 finalize_it:
135 ENDdoAction
136
137 /* elasticsearch POST result string ... useful for debugging */
138 size_t
139 curlResult(void *ptr, size_t size, size_t nmemb, void *userdata)
140 {
141         unsigned int i;
142         char *p = (char *)ptr;
143         char *jsonData = (char *)userdata;
144         static char ok[] = "{\"ok\":true,";
145
146         ASSERT(size == 1);
147
148         if (size == 1 &&
149             nmemb > sizeof(ok)-1 &&
150             strncmp(p, ok, sizeof(ok)-1) == 0) {
151                 STATSCOUNTER_INC(indexSuccess, mutIndexSuccess);
152         } else {
153                 STATSCOUNTER_INC(indexFailed, mutIndexFailed);
154                 if (Debug) {
155                         DBGPRINTF("omelasticsearch request: %s\n", jsonData);
156                         DBGPRINTF("omelasticsearch result: ");
157                         for (i = 0; i < nmemb; i++)
158                                 DBGPRINTF("%c", p[i]);
159                         DBGPRINTF("\n");
160                 }
161         }
162         return size * nmemb;
163 }
164
165 static rsRetVal
166 curlSetup(instanceData *instance)
167 {
168         char restURL[2048];     /* libcurl makes a copy, using the stack here is OK */
169         HEADER *header;
170         CURL *handle;
171
172         handle = curl_easy_init();
173         if (handle == NULL) {
174                 return RS_RET_OBJ_CREATION_FAILED;
175         }
176
177         snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s",
178                 hostName, restPort, searchIndex, searchType);
179         header = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8");
180
181         curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult);
182         curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header); 
183         curl_easy_setopt(handle, CURLOPT_URL, restURL); 
184         curl_easy_setopt(handle, CURLOPT_POST, 1); 
185
186         instance->curlHandle = handle;
187         instance->postHeader = header;
188
189         DBGPRINTF("omelasticsearch setup, using REST URL: %s\n", restURL);
190         return RS_RET_OK;
191 }
192
193 BEGINparseSelectorAct
194 CODESTARTparseSelectorAct
195 CODE_STD_STRING_REQUESTparseSelectorAct(1)
196         if(strncmp((char*) p, ":omelasticsearch:", sizeof(":omelasticsearch:") - 1)) {
197                 ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
198         }
199         p += sizeof(":omelasticsearch:") - 1; /* eat indicator sequence  (-1 because of '\0'!) */
200         CHKiRet(createInstance(&pData));
201
202         /* check if a non-standard template is to be applied */
203         if(*(p-1) == ';')
204                 --p;
205         CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS, (uchar*) " StdJSONFmt"));
206
207         /* all good, we can now initialise our private data */
208         CHKiRet(curlSetup(pData));
209 CODE_STD_FINALIZERparseSelectorAct
210 ENDparseSelectorAct
211
212 BEGINmodExit
213 CODESTARTmodExit
214         curl_global_cleanup();
215         statsobj.Destruct(&indexStats);
216         objRelease(errmsg, CORE_COMPONENT);
217         objRelease(statsobj, CORE_COMPONENT);
218 ENDmodExit
219
220 BEGINqueryEtryPt
221 CODESTARTqueryEtryPt
222 CODEqueryEtryPt_STD_OMOD_QUERIES
223 CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
224 ENDqueryEtryPt
225
226 static rsRetVal
227 resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
228 {
229         DEFiRet;
230         restPort = 9200;
231         hostName = "localhost";
232         searchIndex = "system";
233         searchType = "events";
234         RETiRet;
235 }
236
237 BEGINmodInit()
238 CODESTARTmodInit
239         *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
240 CODEmodInit_QueryRegCFSLineHdlr
241         CHKiRet(objUse(errmsg, CORE_COMPONENT));
242         CHKiRet(objUse(statsobj, CORE_COMPONENT));
243
244         /* register config file handlers */
245         CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchindex", 0, eCmdHdlrGetWord, NULL, &searchIndex, STD_LOADABLE_MODULE_ID));
246         CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchtype", 0, eCmdHdlrGetWord, NULL, &searchType, STD_LOADABLE_MODULE_ID));
247         CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchhost", 0, eCmdHdlrGetWord, NULL, &hostName, STD_LOADABLE_MODULE_ID));
248         CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchport", 0, eCmdHdlrInt, NULL, &restPort, STD_LOADABLE_MODULE_ID));
249         CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
250
251         if (curl_global_init(CURL_GLOBAL_ALL) != 0) {
252                 errmsg.LogError(0, RS_RET_OBJ_CREATION_FAILED, "CURL fail. -elasticsearch indexing disabled");
253                 ABORT_FINALIZE(RS_RET_OBJ_CREATION_FAILED);
254         }
255
256         /* support statistics gathering */
257         CHKiRet(statsobj.Construct(&indexStats));
258         CHKiRet(statsobj.SetName(indexStats, (uchar *)"elasticsearch"));
259         CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"connfail",
260                 ctrType_IntCtr, &indexConFail));
261         CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"submits",
262                 ctrType_IntCtr, &indexSubmit));
263         CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed",
264                 ctrType_IntCtr, &indexFailed));
265         CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"success",
266                 ctrType_IntCtr, &indexSuccess));
267         CHKiRet(statsobj.ConstructFinalize(indexStats));
268 ENDmodInit
269
270 /* vi:set ai:
271  */