omrelp: support v8 omod interface
authorRainer Gerhards <rgerhards@adiscon.com>
Wed, 13 Nov 2013 10:14:11 +0000 (11:14 +0100)
committerRainer Gerhards <rgerhards@adiscon.com>
Wed, 13 Nov 2013 10:14:11 +0000 (11:14 +0100)
plugins/omrelp/omrelp.c

index 34511e4..3b1584e 100644 (file)
@@ -2,8 +2,17 @@
  *
  * This is the implementation of the RELP output module.
  *
- * NOTE: read comments in module-template.h to understand how this file
- *       works!
+ * Note that when multiple action workers are activated, we currently
+ * also create multiple actions. This may be the source of some mild
+ * message loss (!) if the worker instance is shut down while the
+ * connection to the remote system is in retry state.
+ * TODO: think if we should implement a mode where we do NOT
+ *       support multiple action worker instances. This would be
+ *       slower, but not have this loss opportunity. But it should
+ *       definitely be optional and by default off due to the
+ *       performance implications (and given the fact that message
+ *       loss is pretty unlikely in usual cases).
+ *
  *
  * File begun on 2008-03-13 by RGerhards
  *
@@ -63,13 +72,9 @@ static relpEngine_t *pRelpEngine;    /* our relp engine */
 typedef struct _instanceData {
        uchar *target;
        uchar *port;
-       int bInitialConnect; /* is this the initial connection request of our module? (0-no, 1-yes) */
-       int bIsConnected; /* currently connected to server? 0 - no, 1 - yes */
        int sizeWindow;         /**< the RELP window size - 0=use default */
        unsigned timeout;
        unsigned rebindInterval;
-       unsigned nSent;
-       relpClt_t *pRelpClt; /* relp client for this instance */
        sbool bEnableTLS;
        sbool bEnableTLSZip;
        sbool bHadAuthFail;     /**< set on auth failure, will cause retry to disable action */
@@ -85,11 +90,20 @@ typedef struct _instanceData {
        } permittedPeers;
 } instanceData;
 
+typedef struct wrkrInstanceData {
+       instanceData *pData;
+       int bInitialConnect; /* is this the initial connection request of our module? (0-no, 1-yes) */
+       int bIsConnected; /* currently connected to server? 0 - no, 1 - yes */
+       relpClt_t *pRelpClt; /* relp client for this instance */
+       unsigned nSent; /* number msgs sent - for rebind support */
+} wrkrInstanceData_t;
+
 typedef struct configSettings_s {
        EMPTY_STRUCT
 } configSettings_t;
 static configSettings_t __attribute__((unused)) cs;
 
+static rsRetVal doCreateRelpClient(wrkrInstanceData_t *pWrkrData);
 
 /* tables for interfacing with the v6 config system */
 /* action (instance) parameters */
@@ -135,10 +149,10 @@ static uchar *getRelpPt(instanceData *pData)
 static void
 onErr(void *pUsr, char *objinfo, char* errmesg, __attribute__((unused)) relpRetVal errcode)
 {
-       instanceData *pData = (instanceData*) pUsr;
+       wrkrInstanceData_t *pWrkrData = (wrkrInstanceData_t*) pUsr;
        errmsg.LogError(0, RS_RET_RELP_AUTH_FAIL, "omrelp[%s:%s]: error '%s', object "
                        " '%s' - action may not work as intended",
-                       pData->target, pData->port, errmesg, objinfo);
+                       pWrkrData->pData->target, pWrkrData->pData->port, errmesg, objinfo);
 }
 
 static void
@@ -152,55 +166,58 @@ onGenericErr(char *objinfo, char* errmesg, __attribute__((unused)) relpRetVal er
 static void
 onAuthErr(void *pUsr, char *authinfo, char* errmesg, __attribute__((unused)) relpRetVal errcode)
 {
-       instanceData *pData = (instanceData*) pUsr;
+       instanceData *pData = ((wrkrInstanceData_t*) pUsr)->pData;
        errmsg.LogError(0, RS_RET_RELP_AUTH_FAIL, "omrelp[%s:%s]: authentication error '%s', peer "
                        "is '%s' - DISABLING action", pData->target, pData->port, errmesg, authinfo);
        pData->bHadAuthFail = 1;
 }
 
-static inline rsRetVal
-doCreateRelpClient(instanceData *pData)
+static rsRetVal
+doCreateRelpClient(wrkrInstanceData_t *pWrkrData)
 {
        int i;
+       instanceData *pData;
        DEFiRet;
-       if(relpEngineCltConstruct(pRelpEngine, &pData->pRelpClt) != RELP_RET_OK)
+
+       pData = pWrkrData->pData;
+       if(relpEngineCltConstruct(pRelpEngine, &pWrkrData->pRelpClt) != RELP_RET_OK)
                ABORT_FINALIZE(RS_RET_RELP_ERR);
-       if(relpCltSetTimeout(pData->pRelpClt, pData->timeout) != RELP_RET_OK)
+       if(relpCltSetTimeout(pWrkrData->pRelpClt, pData->timeout) != RELP_RET_OK)
                ABORT_FINALIZE(RS_RET_RELP_ERR);
-       if(relpCltSetWindowSize(pData->pRelpClt, pData->sizeWindow) != RELP_RET_OK)
+       if(relpCltSetWindowSize(pWrkrData->pRelpClt, pData->sizeWindow) != RELP_RET_OK)
                ABORT_FINALIZE(RS_RET_RELP_ERR);
-       if(relpCltSetUsrPtr(pData->pRelpClt, pData) != RELP_RET_OK)
+       if(relpCltSetUsrPtr(pWrkrData->pRelpClt, pWrkrData) != RELP_RET_OK)
                ABORT_FINALIZE(RS_RET_RELP_ERR);
        if(pData->bEnableTLS) {
-               if(relpCltEnableTLS(pData->pRelpClt) != RELP_RET_OK)
+               if(relpCltEnableTLS(pWrkrData->pRelpClt) != RELP_RET_OK)
                        ABORT_FINALIZE(RS_RET_RELP_ERR);
                if(pData->bEnableTLSZip) {
-                       if(relpCltEnableTLSZip(pData->pRelpClt) != RELP_RET_OK)
+                       if(relpCltEnableTLSZip(pWrkrData->pRelpClt) != RELP_RET_OK)
                                ABORT_FINALIZE(RS_RET_RELP_ERR);
                }
-               if(relpCltSetGnuTLSPriString(pData->pRelpClt, (char*) pData->pristring) != RELP_RET_OK)
+               if(relpCltSetGnuTLSPriString(pWrkrData->pRelpClt, (char*) pData->pristring) != RELP_RET_OK)
                        ABORT_FINALIZE(RS_RET_RELP_ERR);
-               if(relpCltSetAuthMode(pData->pRelpClt, (char*) pData->authmode) != RELP_RET_OK) {
+               if(relpCltSetAuthMode(pWrkrData->pRelpClt, (char*) pData->authmode) != RELP_RET_OK) {
                        errmsg.LogError(0, RS_RET_RELP_ERR,
                                        "omrelp: invalid auth mode '%s'\n", pData->authmode);
                        ABORT_FINALIZE(RS_RET_RELP_ERR);
                }
-               if(relpCltSetCACert(pData->pRelpClt, (char*) pData->caCertFile) != RELP_RET_OK)
+               if(relpCltSetCACert(pWrkrData->pRelpClt, (char*) pData->caCertFile) != RELP_RET_OK)
                        ABORT_FINALIZE(RS_RET_RELP_ERR);
-               if(relpCltSetOwnCert(pData->pRelpClt, (char*) pData->myCertFile) != RELP_RET_OK)
+               if(relpCltSetOwnCert(pWrkrData->pRelpClt, (char*) pData->myCertFile) != RELP_RET_OK)
                        ABORT_FINALIZE(RS_RET_RELP_ERR);
-               if(relpCltSetPrivKey(pData->pRelpClt, (char*) pData->myPrivKeyFile) != RELP_RET_OK)
+               if(relpCltSetPrivKey(pWrkrData->pRelpClt, (char*) pData->myPrivKeyFile) != RELP_RET_OK)
                        ABORT_FINALIZE(RS_RET_RELP_ERR);
                for(i = 0 ; i <  pData->permittedPeers.nmemb ; ++i) {
-                       relpCltAddPermittedPeer(pData->pRelpClt, (char*)pData->permittedPeers.name[i]);
+                       relpCltAddPermittedPeer(pWrkrData->pRelpClt, (char*)pData->permittedPeers.name[i]);
                }
        }
        if(glbl.GetSourceIPofLocalClient() == NULL) {   /* ar Do we have a client IP set? */
-               if(relpCltSetClientIP(pData->pRelpClt, glbl.GetSourceIPofLocalClient()) != RELP_RET_OK)
+               if(relpCltSetClientIP(pWrkrData->pRelpClt, glbl.GetSourceIPofLocalClient()) != RELP_RET_OK)
                        ABORT_FINALIZE(RS_RET_RELP_ERR);
        }
-       pData->bInitialConnect = 1;
-       pData->nSent = 0;
+       pWrkrData->bInitialConnect = 1;
+       pWrkrData->nSent = 0;
 finalize_it:
        RETiRet;
 }
@@ -221,11 +238,15 @@ CODESTARTcreateInstance
        pData->permittedPeers.nmemb = 0;
 ENDcreateInstance
 
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+       pWrkrData->pRelpClt = NULL;
+       iRet = doCreateRelpClient(pWrkrData);
+ENDcreateWrkrInstance
+
 BEGINfreeInstance
        int i;
 CODESTARTfreeInstance
-       if(pData->pRelpClt != NULL)
-               relpEngineCltDestruct(pRelpEngine, &pData->pRelpClt);
        free(pData->target);
        free(pData->port);
        free(pData->tplName);
@@ -239,6 +260,12 @@ CODESTARTfreeInstance
        }
 ENDfreeInstance
 
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+       if(pWrkrData->pRelpClt != NULL)
+               relpEngineCltDestruct(pRelpEngine, &pWrkrData->pRelpClt);
+ENDfreeWrkrInstance
+
 static inline void
 setInstParamDefaults(instanceData *pData)
 {
@@ -318,8 +345,6 @@ CODESTARTnewActInst
                            "RSYSLOG_ForwardFormat" : (char*)pData->tplName),
                            OMSR_NO_RQD_TPL_OPTS));
 
-       CHKiRet(doCreateRelpClient(pData));
-
 CODE_STD_FINALIZERnewActInst
        if(pvals != NULL)
                cnfparamvalsDestruct(pvals, &actpblk);
@@ -347,22 +372,23 @@ ENDdbgPrintInstInfo
 /* try to connect to server
  * rgerhards, 2008-03-21
  */
-static rsRetVal doConnect(instanceData *pData)
+static rsRetVal doConnect(wrkrInstanceData_t *pWrkrData)
 {
        DEFiRet;
 
-       if(pData->bInitialConnect) {
-               iRet = relpCltConnect(pData->pRelpClt, glbl.GetDefPFFamily(), pData->port, pData->target);
+       if(pWrkrData->bInitialConnect) {
+               iRet = relpCltConnect(pWrkrData->pRelpClt, glbl.GetDefPFFamily(),
+                                     pWrkrData->pData->port, pWrkrData->pData->target);
                if(iRet == RELP_RET_OK)
-                       pData->bInitialConnect = 0;
+                       pWrkrData->bInitialConnect = 0;
        } else {
-               iRet = relpCltReconnect(pData->pRelpClt);
+               iRet = relpCltReconnect(pWrkrData->pRelpClt);
        }
 
        if(iRet == RELP_RET_OK) {
-               pData->bIsConnected = 1;
+               pWrkrData->bIsConnected = 1;
        } else {
-               pData->bIsConnected = 0;
+               pWrkrData->bIsConnected = 0;
                iRet = RS_RET_SUSPENDED;
        }
 
@@ -372,21 +398,21 @@ static rsRetVal doConnect(instanceData *pData)
 
 BEGINtryResume
 CODESTARTtryResume
-       if(pData->bHadAuthFail) {
+       if(pWrkrData->pData->bHadAuthFail) {
                ABORT_FINALIZE(RS_RET_DISABLE_ACTION);
        }
-       iRet = doConnect(pData);
+       iRet = doConnect(pWrkrData);
 finalize_it:
 ENDtryResume
 
 static inline rsRetVal
-doRebind(instanceData *pData)
+doRebind(wrkrInstanceData_t *pWrkrData)
 {
        DEFiRet;
        DBGPRINTF("omrelp: destructing relp client due to rebindInterval\n");
-       CHKiRet(relpEngineCltDestruct(pRelpEngine, &pData->pRelpClt));
-       pData->bIsConnected = 0;
-       CHKiRet(doCreateRelpClient(pData));
+       CHKiRet(relpEngineCltDestruct(pRelpEngine, &pWrkrData->pRelpClt));
+       pWrkrData->bIsConnected = 0;
+       CHKiRet(doCreateRelpClient(pWrkrData));
 finalize_it:
        RETiRet;
 }
@@ -394,10 +420,10 @@ finalize_it:
 BEGINbeginTransaction
 CODESTARTbeginTransaction
 dbgprintf("omrelp: beginTransaction\n");
-       if(!pData->bIsConnected) {
-               CHKiRet(doConnect(pData));
+       if(!pWrkrData->bIsConnected) {
+               CHKiRet(doConnect(pWrkrData));
        }
-       relpCltHintBurstBegin(pData->pRelpClt);
+       relpCltHintBurstBegin(pWrkrData->pRelpClt);
 finalize_it:
 ENDbeginTransaction
 
@@ -405,11 +431,13 @@ BEGINdoAction
        uchar *pMsg; /* temporary buffering */
        size_t lenMsg;
        relpRetVal ret;
+       instanceData *pData;
 CODESTARTdoAction
+       pData = pWrkrData->pData;
        dbgprintf(" %s:%s/RELP\n", pData->target, getRelpPt(pData));
 
-       if(!pData->bIsConnected) {
-               CHKiRet(doConnect(pData));
+       if(!pWrkrData->bIsConnected) {
+               CHKiRet(doConnect(pWrkrData));
        }
 
        pMsg = ppString[0];
@@ -420,7 +448,7 @@ CODESTARTdoAction
                lenMsg = glbl.GetMaxLine();
 
        /* forward */
-       ret = relpCltSendSyslog(pData->pRelpClt, (uchar*) pMsg, lenMsg);
+       ret = relpCltSendSyslog(pWrkrData->pRelpClt, (uchar*) pMsg, lenMsg);
        if(ret != RELP_RET_OK) {
                /* error! */
                dbgprintf("error forwarding via relp, suspending\n");
@@ -428,8 +456,8 @@ CODESTARTdoAction
        }
 
        if(pData->rebindInterval != 0 &&
-          (++pData->nSent >= pData->rebindInterval)) {
-               doRebind(pData);
+          (++pWrkrData->nSent >= pData->rebindInterval)) {
+               doRebind(pWrkrData);
        }
 finalize_it:
        if(pData->bHadAuthFail)
@@ -448,7 +476,7 @@ ENDdoAction
 BEGINendTransaction
 CODESTARTendTransaction
        dbgprintf("omrelp: endTransaction\n");
-       relpCltHintBurstEnd(pData->pRelpClt);
+       relpCltHintBurstEnd(pWrkrData->pRelpClt);
 ENDendTransaction
 
 BEGINparseSelectorAct
@@ -527,8 +555,6 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
        /* process template */
        CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS, (uchar*) "RSYSLOG_ForwardFormat"));
 
-       CHKiRet(doCreateRelpClient(pData));
-
 CODE_STD_FINALIZERparseSelectorAct
 ENDparseSelectorAct
 
@@ -546,6 +572,7 @@ ENDmodExit
 BEGINqueryEtryPt
 CODESTARTqueryEtryPt
 CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
 CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES 
 CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
 CODEqueryEtryPt_TXIF_OMOD_QUERIES