imjournal: add ratelimiting capability
authorRainer Gerhards <rgerhards@adiscon.com>
Sat, 15 Jun 2013 10:49:38 +0000 (12:49 +0200)
committerRainer Gerhards <rgerhards@adiscon.com>
Sat, 15 Jun 2013 10:49:38 +0000 (12:49 +0200)
The original imjournal code did not support ratelimiting at all. We
now have our own ratelimiter. This can mitigate against journal
database corruption, when the journal re-sends old data. This is a
current bug in systemd journal, but we won't outrule this to happen
in the future again. So it is better to have a safeguard in place.
By default, we permit 20,000 messages witin 10 minutes. This may
be a bit restrictive, but given the risk potential it seems reasonable.
Users requiring larger traffic flows can always adjust the value.

ChangeLog
plugins/imjournal/imjournal.c

index 037de05..d730b08 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,14 @@
 ---------------------------------------------------------------------------
 Version 7.4.1  [v7.4-stable] 2013-06-??
+- imjournal: add ratelimiting capability
+  The original imjournal code did not support ratelimiting at all. We
+  now have our own ratelimiter. This can mitigate against journal
+  database corruption, when the journal re-sends old data. This is a
+  current bug in systemd journal, but we won't outrule this to happen
+  in the future again. So it is better to have a safeguard in place.
+  By default, we permit 20,000 messages witin 10 minutes. This may
+  be a bit restrictive, but given the risk potential it seems reasonable.
+  Users requiring larger traffic flows can always adjust the value.
 - bugfix: potential loop in rate limiting
   if the message that tells about rate-limiting gets rate-limited itself,
   it will potentially create and endless loop
index 26b385c..f67cfa9 100755 (executable)
@@ -3,7 +3,7 @@
  * To test under Linux:
  * emmit log message into systemd journal
  *
- * Copyright (C) 2008-2012 Adiscon GmbH
+ * Copyright (C) 2008-2013 Adiscon GmbH
  *
  * This file is part of rsyslog.
  *
@@ -33,6 +33,7 @@
 #include <sys/poll.h>
 #include <sys/socket.h>
 #include <errno.h>
+#include <systemd/sd-journal.h>
 
 #include "dirty.h"
 #include "cfsysline.h"
@@ -47,7 +48,7 @@
 #include "errmsg.h"
 #include "srUtils.h"
 #include "unicode-helper.h"
-#include <systemd/sd-journal.h>
+#include "ratelimit.h"
 
 MODULE_TYPE_INPUT
 MODULE_TYPE_NOKEEP
@@ -64,11 +65,15 @@ DEFobjCurrIf(errmsg)
 static struct configSettings_s {
        char *stateFile;
        int iPersistStateInterval;
+       int ratelimitInterval;
+       int ratelimitBurst;
 } cs;
 
-/* module-gloval parameters */
+/* module-global parameters */
 static struct cnfparamdescr modpdescr[] = {
        { "statefile", eCmdHdlrGetWord, 0 },
+       { "ratelimit.interval", eCmdHdlrInt, 0 },
+       { "ratelimit.burst", eCmdHdlrInt, 0 },
        { "persiststateinterval", eCmdHdlrInt, 0 }
 };
 static struct cnfparamblk modpblk =
@@ -84,6 +89,7 @@ static int bLegacyCnfModGlobalsPermitted = 0;/* are legacy module-global config
 static prop_t *pInputName = NULL;      /* there is only one global inputName for all messages generated by this module */
 static prop_t *pLocalHostIP = NULL;    /* a pseudo-constant propterty for 127.0.0.1 */
 
+static ratelimit_t *ratelimiter = NULL;
 static sd_journal *j;
 
 /* enqueue the the journal message into the message queue.
@@ -121,7 +127,7 @@ enqMsg(uchar *msg, uchar *pszTag, int iFacility, int iSeverity, struct timeval *
                msgAddJSON(pMsg, (uchar*)"!", json);
        }
 
-       CHKiRet(submitMsg2(pMsg));
+       CHKiRet(ratelimitAddMsg(ratelimiter, NULL, pMsg));
 
 finalize_it:
        RETiRet;
@@ -249,7 +255,7 @@ readjournal() {
                if (equal_sign == NULL) {
                        errmsg.LogError(0, RS_RET_ERR,"SD_JOURNAL_FOREACH_DATA()"
                                " returned a malformed field (has no '='): '%s'",
-                               get);
+                               (char*)get);
                        continue; /* skip the entry */
                }
 
@@ -486,14 +492,16 @@ finalize_it:
 
 BEGINrunInput
 CODESTARTrunInput
-       /* this is an endless loop - it is terminated when the thread is
-        * signalled to do so. This, however, is handled by the framework.
-        */
+       CHKiRet(ratelimitNew(&ratelimiter, "imjournal", NULL));
+       ratelimitSetLinuxLike(ratelimiter, cs.ratelimitInterval, cs.ratelimitBurst);
 
        if (cs.stateFile) {
                CHKiRet(loadJournalState());
        }
 
+       /* this is an endless loop - it is terminated when the thread is
+        * signalled to do so. This, however, is handled by the framework.
+        */
        while (glbl.GetGlobalInputTermState() == 0) {
                int count = 0, r;
 
@@ -534,6 +542,8 @@ CODESTARTbeginCnfLoad
 
        cs.iPersistStateInterval = DFLT_persiststateinterval;
        cs.stateFile = NULL;
+       cs.ratelimitBurst = 20000;
+       cs.ratelimitInterval = 600;
 ENDbeginCnfLoad
 
 
@@ -578,6 +588,7 @@ ENDafterRun
 
 BEGINmodExit
 CODESTARTmodExit
+       ratelimitDestruct(ratelimiter);
        if(pInputName != NULL)
                prop.Destruct(&pInputName);
        if(pLocalHostIP != NULL)
@@ -615,6 +626,10 @@ CODESTARTsetModCnf
                        cs.iPersistStateInterval = (int) pvals[i].val.d.n;
                } else if (!strcmp(modpblk.descr[i].name, "statefile")) {
                        cs.stateFile = (char *)es_str2cstr(pvals[i].val.d.estr, NULL);
+               } else if(!strcmp(modpblk.descr[i].name, "ratelimit.burst")) {
+                       cs.ratelimitBurst = (int) pvals[i].val.d.n;
+               } else if(!strcmp(modpblk.descr[i].name, "ratelimit.interval")) {
+                       cs.ratelimitInterval = (int) pvals[i].val.d.n;
                } else {
                        dbgprintf("imjournal: program error, non-handled "
                                "param '%s' in beginCnfLoad\n", modpblk.descr[i].name);