Skip to content

Commit

Permalink
Merge pull request #307 from shawnsmith/cassandra_dogstream
Browse files Browse the repository at this point in the history
Seems fine. Merging it.
Thanks @shawnsmith !
  • Loading branch information
Remi Hakim committed Dec 19, 2012
2 parents 8565a55 + cd7fa35 commit b481b3f
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 3 deletions.
6 changes: 4 additions & 2 deletions dogstream/cassandra.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@
DATE_FORMAT = '%Y-%m-%d %H:%M:%S,%f'
LEGACY_DATE_FORMAT = '%Y-%m-%d %H:%M:%S'

# Parse Cassandra default system.log log4j pattern: %5p [%t] %d{ISO8601} %F (line %L) %m%n
LOG_PATTERN = re.compile(r"".join([
r"\s*(?P<priority>%s)\s+" % "|".join("(%s)" % p for p in LOG4J_PRIORITY),
r"(\[CompactionExecutor:\d*\])?\s*", # thread name and number
r"(\[CompactionExecutor:\d*\]\s+)?", # optional thread name and number
r"((?P<timestamp>\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2},\d*)|",
r"(?P<time>\d{2}:\d{2}:\d{2},\d*))\s+",
r"(\w+\.java \(line \d+\)\s+)?", # optional source file and line
r"(?P<msg>Compact(ed|ing) .*)\s*",
]))

Expand All @@ -45,7 +47,7 @@ def parse_date(timestamp):
except ValueError:
# Only Python >= 2.6 supports %f in the date string
timestamp, _ = timestamp.split(',')
common.parse_date(timestamp, LEGACY_DATE_FORMAT)
return common.parse_date(timestamp, LEGACY_DATE_FORMAT)

def parse_cassandra(log, line):
matched = LOG_PATTERN.match(line)
Expand Down
58 changes: 57 additions & 1 deletion tests/test_cassandra.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from nose.plugins.attrib import attr

from checks.cassandra import Cassandra
from dogstream.cassandra import parse_cassandra



Expand Down Expand Up @@ -66,6 +67,61 @@ def testParseTpstats(self):
res = {}
self.c._parseTpstats(self.tpstats, res)
self.assertNotEquals(len(res.keys()), 0)


class TestCassandraDogstream(unittest.TestCase):
@attr('cassandra')
def testStart(self):
events = parse_cassandra(logger, " INFO [main] 2012-12-11 21:46:26,995 StorageService.java (line 687) Bootstrap/Replace/Move completed! Now serving reads.")
self.assertTrue(events is None)

@attr('cassandra')
def testInfo(self):
events = parse_cassandra(logger, " INFO [CompactionExecutor:35] 2012-12-02 21:15:03,738 AutoSavingCache.java (line 268) Saved KeyCache (5 items) in 3 ms")
self.assertTrue(events is None)

@attr('cassandra')
def testWarn(self):
events = parse_cassandra(logger, " WARN [MemoryMeter:1] 2012-12-03 20:07:47,158 Memtable.java (line 197) setting live ratio to minimum of 1.0 instead of 0.9416553595658074")
self.assertTrue(events is None)

@attr('cassandra')
def testError(self):
for line in """\
ERROR [CompactionExecutor:518] 2012-12-11 21:35:29,686 AbstractCassandraDaemon.java (line 135) Exception in thread Thread[CompactionExecutor:518,1,RMI Runtime]
java.util.concurrent.RejectedExecutionException
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1768)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:767)
at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:215)
at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:397)
at java.util.concurrent.ScheduledThreadPoolExecutor.submit(ScheduledThreadPoolExecutor.java:470)
at org.apache.cassandra.io.sstable.SSTableDeletingTask.schedule(SSTableDeletingTask.java:67)
at org.apache.cassandra.io.sstable.SSTableReader.releaseReference(SSTableReader.java:806)
at org.apache.cassandra.db.DataTracker.removeOldSSTablesSize(DataTracker.java:358)
at org.apache.cassandra.db.DataTracker.postReplace(DataTracker.java:330)
at org.apache.cassandra.db.DataTracker.replace(DataTracker.java:324)
at org.apache.cassandra.db.DataTracker.replaceCompactedSSTables(DataTracker.java:253)
at org.apache.cassandra.db.ColumnFamilyStore.replaceCompactedSSTables(ColumnFamilyStore.java:992)
at org.apache.cassandra.db.compaction.CompactionTask.execute(CompactionTask.java:200)
at org.apache.cassandra.db.compaction.CompactionManager$1.runMayThrow(CompactionManager.java:154)
at org.apache.cassandra.utils.WrappedRunnable.run(WrappedRunnable.java:30)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)""".splitlines():
events = parse_cassandra(logger, line)
self.assertTrue(events is None)

@attr('cassandra')
def testCompactionStart(self):
events = parse_cassandra(logger, " INFO [CompactionExecutor:2] 2012-12-11 21:46:27,012 CompactionTask.java (line 109) Compacting [SSTableReader(path='/var/lib/cassandra/data/system/LocationInfo/system-LocationInfo-he-11-Data.db'), SSTableReader(path='/var/lib/cassandra/data/system/LocationInfo/system-LocationInfo-he-9-Data.db'), SSTableReader(path='/var/lib/cassandra/data/system/LocationInfo/system-LocationInfo-he-12-Data.db'), SSTableReader(path='/var/lib/cassandra/data/system/LocationInfo/system-LocationInfo-he-10-Data.db')]")
self.assertEquals(events, [{'alert_type': 'info', 'event_type': 'cassandra.compaction', 'timestamp': 1355262387, 'msg_title': "Compacting [SSTableReader(path='/var/lib/cassandra/data/system/LocationInfo/system-LocationInfo-he-1", 'msg_text': "Compacting [SSTableReader(path='/var/lib/cassandra/data/system/LocationInfo/system-LocationInfo-he-11-Data.db'), SSTableReader(path='/var/lib/cassandra/data/system/LocationInfo/system-LocationInfo-he-9-Data.db'), SSTableReader(path='/var/lib/cassandra/data/system/LocationInfo/system-LocationInfo-he-12-Data.db'), SSTableReader(path='/var/lib/cassandra/data/system/LocationInfo/system-LocationInfo-he-10-Data.db')]", 'auto_priority': 0}])

@attr('cassandra')
def testCompactionEnd(self):
events = parse_cassandra(logger, "INFO [CompactionExecutor:2] 2012-12-11 21:46:27,095 CompactionTask.java (line 221) Compacted to [/var/lib/cassandra/data/system/LocationInfo/system-LocationInfo-he-13-Data.db,]. 880 to 583 (~66% of original) bytes for 4 keys at 0.007831MB/s. Time: 71ms.")
self.assertEquals(events, [{'alert_type': 'info', 'event_type': 'cassandra.compaction', 'timestamp': 1355262387, 'msg_title': 'Compacted to [/var/lib/cassandra/data/system/LocationInfo/system-LocationInfo-he-13-Data.db,]. 880 ', 'msg_text': 'Compacted to [/var/lib/cassandra/data/system/LocationInfo/system-LocationInfo-he-13-Data.db,]. 880 to 583 (~66% of original) bytes for 4 keys at 0.007831MB/s. Time: 71ms.', 'auto_priority': 0}])

if __name__ == '__main__':
unittest.main()

0 comments on commit b481b3f

Please sign in to comment.