Flume sink #225

Closed

nmaillard opened this issue May 24, 2013 · 4 comments

@nmaillard
Contributor

On a regular basis I use Flume to ingest logs in real time.
Flume currently has an HBase sink; a Phoenix HBase sink would probably be an extra layer on top of, or a revamp of, that existing sink.

@jtaylor-sfdc
Contributor

Great idea! That would be a nice addition.

@nmaillard
Contributor Author

I have started looking into this. It's definitely not a priority, but since it's looking like a use case at my company, I might as well do it for everyone.
Essentially, the current HBase sink from Cloudera can mostly be reused, serializers included. The HBase write primitives must be changed to Phoenix statements, and the Phoenix client added to the libs.
I'll try to get a test going shortly.
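
For a rough sense of that change (the table, columns, and JDBC URL below are placeholders, not the eventual implementation), the write path would move from HBase Puts to Phoenix upserts over a plain JDBC connection:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;

    public class PhoenixUpsertSketch {
        public static void main(String[] args) throws Exception {
            // Placeholder URL: Phoenix is reached through JDBC rather than
            // the HBase client API.
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                // Where the HBase sink builds a Put, a Phoenix sink would
                // prepare and execute an UPSERT against the target table.
                try (PreparedStatement stmt = conn.prepareStatement(
                        "UPSERT INTO events (event_time, col1, cf1.col2) VALUES (?, ?, ?)")) {
                    stmt.setDate(1, new java.sql.Date(System.currentTimeMillis()));
                    stmt.setInt(2, 42);
                    stmt.setInt(3, 7);
                    stmt.executeUpdate();
                }
                // Phoenix connections default to auto-commit off, so a sink
                // would commit once per Flume transaction/batch.
                conn.commit();
            }
        }
    }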

@ghost assigned mravi Nov 15, 2013
@mravi
Contributor

mravi commented Nov 23, 2013

Hi,
Below are my thoughts on the Phoenix plugin for Flume:
a) A custom Phoenix Sink that extracts events from the Channel.
b) A regular-expression-based event serializer that extracts data from the Flume event using the regular expression passed through the flume-conf.properties file.

A typical configuration file for the Phoenix sink could look like this:

    agent.sinks.sink1.type = com.salesforce.phoenix.flume.sink.PhoenixSink
    agent.sinks.sink1.channel = ch1
    agent.sinks.sink1.batchSize = 100
    agent.sinks.sink1.table = events
    # or, alternatively:
    agent.sinks.sink1.table.ddl = CREATE TABLE IF NOT EXISTS events (event_time date not null, col1 integer, cf1.col2 integer CONSTRAINT pk PRIMARY KEY (event_time)) SALT_BUCKETS=3;

    agent.sinks.sink1.serializer = com.salesforce.phoenix.flume.serializer.PhoenixEventSerializer
    agent.sinks.sink1.serializer.regex = {regular expression}
    agent.sinks.sink1.serializer.columns = event_time,col1,cf1.col2
    agent.sinks.sink1.serializer.preserveHeaders = true
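
As an illustrative sketch only (class and method names here are hypothetical, not the serializer's final API), the regex serializer's core step would match the event body against the configured expression and map capture groups onto the configured columns:

    import java.nio.charset.StandardCharsets;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    // Hypothetical sketch; the real PhoenixEventSerializer would also
    // handle headers, column types, and batching.
    public class RegexExtractorSketch {

        public static Map<String, String> extract(byte[] eventBody,
                                                   Pattern regex,
                                                   String[] columns) {
            String body = new String(eventBody, StandardCharsets.UTF_8);
            Matcher m = regex.matcher(body);
            // Non-matching events would be dropped (see assumption b below).
            if (!m.matches()) {
                return null;
            }
            // Capture group i+1 maps onto the i-th entry of serializer.columns.
            Map<String, String> row = new LinkedHashMap<String, String>();
            for (int i = 0; i < columns.length && i < m.groupCount(); i++) {
                row.put(columns[i], m.group(i + 1));
            }
            return row;
        }
    }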

Assumptions:
a) When agent.sinks.sink1.serializer.preserveHeaders = true is set, the headers from the Flume event will be inserted as dynamic columns, where the column name is the header name and its value is the header value (see the sketch after this list).
b) If an event doesn't match the regular expression, we drop the event. If this is not acceptable, we need to discuss further how to handle such cases.
c) Users provide either the table name or the DDL for table creation. In the former case, the table is assumed to exist, and we get the column information from its metadata.
d) Users need to explicitly specify the property agent.sinks.sink1.serializer.columns=event_time,col1,cf1.col2 when the order of the columns in the event payload differs from their order in the table.
e) Being a third-party plugin, this component will be added to Flume's classpath by placing the necessary artifacts in the plugins.d directory.
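
To make assumption (a) concrete, here is a hedged sketch (the helper name and the header names are hypothetical) of how the UPSERT text could grow one typed dynamic column per Flume header; Phoenix declares dynamic columns inline, with a type, in the statement's column list:

    import java.util.Collections;
    import java.util.Map;

    // Hypothetical helper, not the proposed API: builds the UPSERT text
    // for assumption (a), adding one VARCHAR dynamic column per header.
    public class DynamicUpsertSketch {

        static String buildUpsert(String table, String[] columns,
                                  Map<String, String> headers) {
            StringBuilder cols = new StringBuilder();
            StringBuilder binds = new StringBuilder();
            for (String c : columns) {
                cols.append(cols.length() == 0 ? "" : ", ").append(c);
                binds.append(binds.length() == 0 ? "" : ", ").append("?");
            }
            // Each preserved header becomes an inline dynamic column.
            for (String header : headers.keySet()) {
                cols.append(", ").append(header).append(" VARCHAR");
                binds.append(", ?");
            }
            return "UPSERT INTO " + table + " (" + cols + ") VALUES (" + binds + ")";
        }

        public static void main(String[] args) {
            String[] columns = {"event_time", "col1", "cf1.col2"};
            Map<String, String> headers = Collections.singletonMap("host", "web01");
            // Prints: UPSERT INTO events (event_time, col1, cf1.col2, host VARCHAR)
            //         VALUES (?, ?, ?, ?)
            System.out.println(buildUpsert("events", columns, headers));
        }
    }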

Features:
a) The sink should be able to send metrics to the JMX platform MBean server for performance monitoring.
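
A minimal sketch of that feature (the counter interface and object name are illustrative, not the plugin's actual metrics contract) would register a standard MBean with the platform server:

    import java.lang.management.ManagementFactory;
    import java.util.concurrent.atomic.AtomicLong;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class SinkMetricsSketch {

        // Standard MBean pattern: implementation X implements interface XMBean.
        public interface SinkCounterMBean {
            long getEventsWritten();
        }

        public static class SinkCounter implements SinkCounterMBean {
            private final AtomicLong eventsWritten = new AtomicLong();
            public void increment() { eventsWritten.incrementAndGet(); }
            public long getEventsWritten() { return eventsWritten.get(); }
        }

        public static void main(String[] args) throws Exception {
            // Registering with the platform MBean server makes the counter
            // visible to tools such as JConsole under the given object name.
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            SinkCounter counter = new SinkCounter();
            server.registerMBean(counter,
                    new ObjectName("phoenix.flume:type=PhoenixSink,name=sink1"));
            counter.increment();
        }
    }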

Suggestions and recommendations are most welcome, and we can keep track of everything through this ticket itself.

@jtaylor-sfdc
Contributor

Implemented by @mravi. Fantastic job - thanks for the contributions.
