diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
new file mode 100644
index 0000000000000..40d7e09edaf16
--- /dev/null
+++ b/external/flume-sink/pom.xml
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>spark-streaming-flume-sink_2.10</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project External Flume Sink</name>
+  <url>http://spark.apache.org/</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+      <version>1.4.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.jboss.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.thrift</groupId>
+          <artifactId>libthrift</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+      <version>1.4.0</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <version>1.7.3</version>
+        <configuration>
+          <stringType>String</stringType>
+          <outputDirectory>${project.basedir}/target/scala-${scala.binary.version}/src_managed/main/compiled_avro</outputDirectory>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>idl-protocol</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/external/flume-sink/src/main/avro/sparkflume.avdl b/external/flume-sink/src/main/avro/sparkflume.avdl
new file mode 100644
index 0000000000000..9dcc709de079a
--- /dev/null
+++ b/external/flume-sink/src/main/avro/sparkflume.avdl
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+@namespace("org.apache.spark.flume")
+
+protocol SparkFlumeProtocol {
+
+ record SparkSinkEvent {
+    map<string> headers;
+ bytes body;
+ }
+
+ record EventBatch {
+ string sequenceNumber;
+    array<SparkSinkEvent> eventBatch;
+ }
+
+ EventBatch getEventBatch (int n);
+
+ void ack (string sequenceNumber);
+
+ void nack (string sequenceNumber);
+
+}
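
The IDL above is what the avro-maven-plugin (and sbt-avro) compiles into the SparkFlumeProtocol, EventBatch and SparkSinkEvent classes used by the sink and receiver below. As a rough sketch of how a client drives this protocol over Avro's Netty IPC (it mirrors what FlumePollingReceiver does later in this patch; the host and port are placeholders):

    import java.net.InetSocketAddress
    import org.apache.avro.ipc.NettyTransceiver
    import org.apache.avro.ipc.specific.SpecificRequestor
    import org.apache.spark.flume.SparkFlumeProtocol

    object SparkSinkClientSketch {
      def main(args: Array[String]) {
        // Connect to a running SparkSink (localhost:9999 is a placeholder address)
        val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", 9999))
        val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
        // Ask the sink for up to 100 events; the sink holds the Flume transaction open
        val batch = client.getEventBatch(100)
        // ... store batch.getEventBatch somewhere durable ...
        // Ack so the sink commits the transaction, or call nack() to have it retried
        client.ack(batch.getSequenceNumber)
        transceiver.close()
      }
    }
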
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala b/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala
new file mode 100644
index 0000000000000..6243463a475b6
--- /dev/null
+++ b/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.flume.sink
+
+import org.apache.flume.sink.AbstractSink
+import java.util.concurrent.locks.ReentrantLock
+import org.apache.flume.Sink.Status
+import org.apache.spark.flume.{SparkSinkEvent, EventBatch, SparkFlumeProtocol}
+import scala.util.control.Breaks
+import java.nio.ByteBuffer
+import org.apache.flume.{FlumeException, Context}
+import org.slf4j.LoggerFactory
+import java.util.concurrent.atomic.AtomicLong
+import org.apache.commons.lang.RandomStringUtils
+import java.util.concurrent._
+import java.util
+import org.apache.flume.conf.{ConfigurationException, Configurable}
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.avro.ipc.{NettyTransceiver, NettyServer}
+import org.apache.avro.ipc.specific.SpecificResponder
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
+import java.net.InetSocketAddress
+
+class SparkSink() extends AbstractSink with Configurable {
+ private val LOG = LoggerFactory.getLogger(this.getClass)
+ private val lock = new ReentrantLock()
+ private val blockingCondition = lock.newCondition()
+
+  // This sink will not persist sequence numbers and reuses them if it gets restarted.
+  // So it is possible to commit a transaction which may have been meant for the sink before the
+  // restart. Since the new txn may not have the same sequence number, we must guard against
+  // accidentally committing a new transaction. To reduce the probability of that happening, a
+  // random string is prepended to the sequence number. This prefix does not change for the life
+  // of the sink.
+ private val seqBase = RandomStringUtils.randomAlphanumeric(8)
+ // Incremented for each transaction
+ private val seqNum = new AtomicLong(0)
+
+ private var transactionExecutorOpt: Option[ExecutorService] = None
+
+ private var numProcessors: Integer = SparkSinkConfig.DEFAULT_PROCESSOR_COUNT
+ private var transactionTimeout = SparkSinkConfig.DEFAULT_TRANSACTION_TIMEOUT
+
+ private val processorMap = new ConcurrentHashMap[CharSequence, TransactionProcessor]()
+
+ private var processorFactory: Option[SparkHandlerFactory] = None
+ private var hostname: String = SparkSinkConfig.DEFAULT_HOSTNAME
+ private var port: Int = 0
+ private var maxThreads: Int = SparkSinkConfig.DEFAULT_MAX_THREADS
+ private var serverOpt: Option[NettyServer] = None
+ private var running = false
+
+ override def start() {
+ transactionExecutorOpt = Option(Executors.newFixedThreadPool(numProcessors,
+ new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("Spark Sink, " + getName + " Processor Thread - %d").build()))
+
+ processorFactory = Option(new SparkHandlerFactory(numProcessors))
+
+ val responder = new SpecificResponder(classOf[SparkFlumeProtocol], new AvroCallbackHandler())
+
+ serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port),
+ new NioServerSocketChannelFactory(
+ Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat(
+ "Spark Sink " + classOf[NettyTransceiver].getSimpleName + " Boss-%d").build),
+ Executors.newFixedThreadPool(maxThreads, new ThreadFactoryBuilder().setNameFormat(
+ "Spark Sink " + classOf[NettyTransceiver].getSimpleName + " I/O Worker-%d").build))))
+
+ serverOpt.map(server => server.start())
+ lock.lock()
+ try {
+ running = true
+ } finally {
+ lock.unlock()
+ }
+ super.start()
+ }
+
+  override def stop() {
+    lock.lock()
+    try {
+      running = false
+      transactionExecutorOpt.map(executor => executor.shutdownNow())
+      // Close the Avro server and wake up the thread blocked in process()
+      serverOpt.map(server => server.close())
+      blockingCondition.signalAll()
+    } finally {
+      lock.unlock()
+    }
+    super.stop()
+  }
+
+ override def configure(ctx: Context) {
+ import SparkSinkConfig._
+ hostname = ctx.getString(CONF_HOSTNAME, DEFAULT_HOSTNAME)
+ val portOpt = Option(ctx.getInteger(CONF_PORT))
+ if(portOpt.isDefined) {
+ port = portOpt.get
+ } else {
+ throw new ConfigurationException("The Port to bind must be specified")
+ }
+ numProcessors = ctx.getInteger(PROCESSOR_COUNT, DEFAULT_PROCESSOR_COUNT)
+ transactionTimeout = ctx.getInteger(CONF_TRANSACTION_TIMEOUT, DEFAULT_TRANSACTION_TIMEOUT)
+ maxThreads = ctx.getInteger(CONF_MAX_THREADS, DEFAULT_MAX_THREADS)
+ }
+
+ override def process(): Status = {
+ // This method is called in a loop by the Flume framework - block it until the sink is
+ // stopped to save CPU resources
+ lock.lock()
+ try {
+ while(running) {
+ blockingCondition.await()
+ }
+ } finally {
+ lock.unlock()
+ }
+ Status.BACKOFF
+ }
+
+ private class AvroCallbackHandler() extends SparkFlumeProtocol {
+
+ override def getEventBatch(n: Int): EventBatch = {
+ val processor = processorFactory.get.checkOut(n)
+ transactionExecutorOpt.map(executor => executor.submit(processor))
+ // Wait until a batch is available - can be null if some error was thrown
+ val eventBatch = Option(processor.eventQueue.take())
+ if (eventBatch.isDefined) {
+ val eventsToBeSent = eventBatch.get
+ processorMap.put(eventsToBeSent.getSequenceNumber, processor)
+ if (LOG.isDebugEnabled) {
+ LOG.debug("Sent " + eventsToBeSent.getEventBatch.size() +
+ " events with sequence number: " + eventsToBeSent.getSequenceNumber)
+ }
+ eventsToBeSent
+ } else {
+ throw new FlumeException("Error while trying to retrieve events from the channel.")
+ }
+ }
+
+ override def ack(sequenceNumber: CharSequence): Void = {
+ completeTransaction(sequenceNumber, success = true)
+ null
+ }
+
+ override def nack(sequenceNumber: CharSequence): Void = {
+ completeTransaction(sequenceNumber, success = false)
+ LOG.info("Spark failed to commit transaction. Will reattempt events.")
+ null
+ }
+
+ def completeTransaction(sequenceNumber: CharSequence, success: Boolean) {
+ val processorOpt = Option(processorMap.remove(sequenceNumber))
+ if (processorOpt.isDefined) {
+ val processor = processorOpt.get
+ processor.resultQueueUpdateLock.lock()
+ try {
+ // Is the sequence number the same as the one the processor is processing? If not,
+          // don't update the result queue.
+ if (processor.eventBatch.getSequenceNumber.equals(sequenceNumber)) {
+ processor.resultQueue.put(success)
+ }
+ } finally {
+ processor.resultQueueUpdateLock.unlock()
+ }
+ }
+ }
+ }
+
+ // Flume forces transactions to be thread-local (horrible, I know!)
+ // So the sink basically spawns a new thread to pull the events out within a transaction.
+ // The thread fills in the event batch object that is set before the thread is scheduled.
+ // After filling it in, the thread waits on a condition - which is released only
+ // when the success message comes back for the specific sequence number for that event batch.
+ /**
+ * This class represents a transaction on the Flume channel. This class runs a separate thread
+ * which owns the transaction. It is blocked until the success call for that transaction comes
+ * back.
+   * @param maxBatchSize Maximum number of events to take from the channel per transaction.
+ */
+ private class TransactionProcessor(var maxBatchSize: Int) extends Callable[Void] {
+ // Must be set to a new event batch before scheduling this!!
+ val eventBatch = new EventBatch("", new util.LinkedList[SparkSinkEvent])
+ val eventQueue = new SynchronousQueue[EventBatch]()
+ val resultQueue = new SynchronousQueue[Boolean]()
+ val resultQueueUpdateLock = new ReentrantLock()
+
+ object Zero {
+ val zero = "0" // Oh, I miss static finals
+ }
+
+
+ override def call(): Void = {
+ val tx = getChannel.getTransaction
+ tx.begin()
+ try {
+ eventBatch.setSequenceNumber(seqBase + seqNum.incrementAndGet())
+ val events = eventBatch.getEventBatch
+ events.clear()
+ val loop = new Breaks
+ loop.breakable {
+ for (i <- 0 until maxBatchSize) {
+ val eventOpt = Option(getChannel.take())
+
+ eventOpt.map(event => {
+ events.add(new SparkSinkEvent(toCharSequenceMap(event
+ .getHeaders),
+ ByteBuffer.wrap(event.getBody)))
+ })
+ if (eventOpt.isEmpty) {
+ loop.break()
+ }
+ }
+ }
+ // Make the data available to the sender thread
+ eventQueue.put(eventBatch)
+
+ // Wait till timeout for the ack/nack
+ val maybeResult = Option(resultQueue.poll(transactionTimeout, TimeUnit.SECONDS))
+ // There is a race condition here.
+ // 1. This times out.
+ // 2. The result is empty, so timeout exception is thrown.
+ // 3. The ack comes in before the finally block is entered
+ // 4. The thread with the ack has a handle to this processor,
+ // and another thread has the same processor checked out
+ // (since the finally block was executed and the processor checked back in)
+ // 5. The thread with the ack now updates the result queue,
+ // so the processor thinks it is the ack for the current batch.
+        // To avoid this, update the sequence number to "0" (with or without a result - it does
+        // not matter). In the ack method, the result queue is updated only if the seq number is
+        // still the same as the processor's. So if the processor updates the seq number first,
+        // the ack/nack never updates the result. If the ack/nack updates the result after the
+        // timeout but before the seq number is reset to "0", it does not matter - the processor
+        // would still time out and the result is cleared before the processor is reused.
+        // Unfortunately, this needs to be done from within a lock to make sure that the new
+        // sequence number is actually visible to the ack thread (happens-before).
+ resultQueueUpdateLock.lock()
+ try {
+ eventBatch.setSequenceNumber(Zero.zero)
+ } finally {
+ resultQueueUpdateLock.unlock()
+ }
+ eventBatch.getEventBatch.clear()
+ // If the batch failed on spark side, throw a FlumeException
+ maybeResult.map(success =>
+ if (!success) {
+            throw new FlumeException(
+              "Spark could not accept events. The transaction will be retried.")
+ }
+ )
+ // If the operation timed out, throw a TimeoutException
+ if (maybeResult.isEmpty) {
+ throw new TimeoutException("Spark did not respond within the timeout period of " +
+            transactionTimeout + " seconds. The transaction will be retried.")
+ }
+        // Spark acked the batch - commit the transaction so the events are removed from
+        // the channel
+        tx.commit()
+        null
+ } catch {
+ case e: Throwable =>
+ try {
+ LOG.warn("Error while attempting to remove events from the channel.", e)
+ tx.rollback()
+ } catch {
+ case e1: Throwable => LOG.error(
+            "Failed to roll back the transaction after it could not be completed.", e1)
+ }
+ null // No point rethrowing the exception
+ } finally {
+ // Must *always* release the caller thread
+ eventQueue.put(null)
+        // If an ack/nack arrived after the timeout but before the seq number was reset, clear
+        // the stale result and remove this processor from the map before checking it back in
+ resultQueue.clear()
+ processorMap.remove(eventBatch.getSequenceNumber)
+ processorFactory.get.checkIn(this)
+ tx.close()
+ }
+ }
+
+ def toCharSequenceMap(inMap: java.util.Map[String, String]): java.util.Map[CharSequence,
+ CharSequence] = {
+ val charSeqMap = new util.HashMap[CharSequence, CharSequence](inMap.size())
+ charSeqMap.putAll(inMap)
+ charSeqMap
+ }
+ }
+
+ private class SparkHandlerFactory(val maxInstances: Int) {
+ val queue = new scala.collection.mutable.Queue[TransactionProcessor]
+ val queueModificationLock = new ReentrantLock()
+ var currentSize = 0
+ val waitForCheckIn = queueModificationLock.newCondition()
+
+ def checkOut(n: Int): TransactionProcessor = {
+ def getProcessor = {
+ val processor = queue.dequeue()
+ processor.maxBatchSize = n
+ processor
+ }
+ queueModificationLock.lock()
+ try {
+ if (queue.size > 0) {
+ getProcessor
+ }
+ else {
+ if (currentSize < maxInstances) {
+ currentSize += 1
+ new TransactionProcessor(n)
+ } else {
+ // No events in queue and cannot initialize more!
+ // Since currentSize never reduces, queue size increasing is the only hope
+ while (queue.size == 0 && currentSize >= maxInstances) {
+ waitForCheckIn.await()
+ }
+ getProcessor
+ }
+ }
+ } finally {
+ queueModificationLock.unlock()
+ }
+ }
+
+ def checkIn(processor: TransactionProcessor) {
+ queueModificationLock.lock()
+ try {
+ queue.enqueue(processor)
+ waitForCheckIn.signal()
+ } finally {
+ queueModificationLock.unlock()
+ }
+ }
+ }
+}
+
+object SparkSinkConfig {
+ val PROCESSOR_COUNT = "processorCount"
+ val DEFAULT_PROCESSOR_COUNT = 10
+
+ val CONF_TRANSACTION_TIMEOUT = "timeout"
+ val DEFAULT_TRANSACTION_TIMEOUT = 60
+
+ val CONF_HOSTNAME = "hostname"
+ val DEFAULT_HOSTNAME = "0.0.0.0"
+
+ val CONF_PORT = "port"
+
+ val CONF_MAX_THREADS = "maxThreads"
+ val DEFAULT_MAX_THREADS = 5
+}
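
For context, SparkSinkConfig above defines the keys the sink reads in configure(), with port being the only mandatory one. A Flume agent configuration wiring this sink up might look roughly like the following sketch (the agent, channel and sink names plus the port value are made up for illustration; the sink type is the class added in this file):

    # Hypothetical Flume agent configuration using the sink from this patch
    agent.channels = memoryChannel
    agent.sinks = spark
    agent.sinks.spark.type = org.apache.spark.flume.sink.SparkSink
    agent.sinks.spark.hostname = 0.0.0.0
    agent.sinks.spark.port = 9999
    # Optional keys read in configure(): timeout, processorCount, maxThreads
    agent.sinks.spark.channel = memoryChannel
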
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
index 6aec215687fe0..93d8ec02ac69e 100644
--- a/external/flume/pom.xml
+++ b/external/flume/pom.xml
@@ -74,6 +74,10 @@
       <artifactId>junit-interface</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming-flume-sink_2.10</artifactId>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
new file mode 100644
index 0000000000000..91f6171d57368
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.io.{ObjectOutput, ObjectInput}
+import org.apache.spark.util.Utils
+import scala.collection.JavaConversions._
+import org.apache.spark.Logging
+
+/**
+ * A simple object that provides the implementation of readExternal and writeExternal for both
+ * the wrapper classes for Flume-style Events.
+ */
+object EventTransformer extends Logging {
+ def readExternal(in: ObjectInput): (java.util.HashMap[CharSequence, CharSequence],
+ Array[Byte]) = {
+ val bodyLength = in.readInt()
+ val bodyBuff = new Array[Byte](bodyLength)
+ in.read(bodyBuff)
+
+ val numHeaders = in.readInt()
+ val headers = new java.util.HashMap[CharSequence, CharSequence]
+
+ for (i <- 0 until numHeaders) {
+ val keyLength = in.readInt()
+ val keyBuff = new Array[Byte](keyLength)
+ in.read(keyBuff)
+ val key: String = Utils.deserialize(keyBuff)
+
+ val valLength = in.readInt()
+ val valBuff = new Array[Byte](valLength)
+ in.read(valBuff)
+ val value: String = Utils.deserialize(valBuff)
+
+ headers.put(key, value)
+ }
+ (headers, bodyBuff)
+ }
+
+ def writeExternal(out: ObjectOutput, headers: java.util.Map[CharSequence, CharSequence],
+ body: Array[Byte]) {
+ out.writeInt(body.length)
+ out.write(body)
+ val numHeaders = headers.size()
+ out.writeInt(numHeaders)
+ for ((k,v) <- headers) {
+ val keyBuff = Utils.serialize(k.toString)
+ out.writeInt(keyBuff.length)
+ out.write(keyBuff)
+ val valBuff = Utils.serialize(v.toString)
+ out.writeInt(valBuff.length)
+ out.write(valBuff)
+ }
+ }
+}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index df7605fe579f8..78715226ab402 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -30,7 +30,6 @@ import org.apache.flume.source.avro.Status
import org.apache.avro.ipc.specific.SpecificResponder
import org.apache.avro.ipc.NettyServer
-import org.apache.spark.util.Utils
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
@@ -61,47 +60,14 @@ class SparkFlumeEvent() extends Externalizable {
/* De-serialize from bytes. */
def readExternal(in: ObjectInput) {
- val bodyLength = in.readInt()
- val bodyBuff = new Array[Byte](bodyLength)
- in.read(bodyBuff)
-
- val numHeaders = in.readInt()
- val headers = new java.util.HashMap[CharSequence, CharSequence]
-
- for (i <- 0 until numHeaders) {
- val keyLength = in.readInt()
- val keyBuff = new Array[Byte](keyLength)
- in.read(keyBuff)
- val key : String = Utils.deserialize(keyBuff)
-
- val valLength = in.readInt()
- val valBuff = new Array[Byte](valLength)
- in.read(valBuff)
- val value : String = Utils.deserialize(valBuff)
-
- headers.put(key, value)
- }
-
+ val (headers, bodyBuff) = EventTransformer.readExternal(in)
event.setBody(ByteBuffer.wrap(bodyBuff))
event.setHeaders(headers)
}
/* Serialize to bytes. */
def writeExternal(out: ObjectOutput) {
- val body = event.getBody.array()
- out.writeInt(body.length)
- out.write(body)
-
- val numHeaders = event.getHeaders.size()
- out.writeInt(numHeaders)
- for ((k, v) <- event.getHeaders) {
- val keyBuff = Utils.serialize(k.toString)
- out.writeInt(keyBuff.length)
- out.write(keyBuff)
- val valBuff = Utils.serialize(v.toString)
- out.writeInt(valBuff.length)
- out.write(valBuff)
- }
+ EventTransformer.writeExternal(out, event.getHeaders, event.getBody.array())
}
}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
new file mode 100644
index 0000000000000..71b0f72f85f53
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume
+
+import scala.reflect.ClassTag
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.Logging
+import java.net.InetSocketAddress
+import java.util.concurrent.{TimeUnit, Executors}
+import org.apache.avro.ipc.NettyTransceiver
+import org.apache.avro.ipc.specific.SpecificRequestor
+import org.apache.spark.flume.{SparkSinkEvent, SparkFlumeProtocol}
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+import java.io.{ObjectOutput, ObjectInput, Externalizable}
+import java.nio.ByteBuffer
+import scala.collection.JavaConversions._
+
+class FlumePollingInputDStream[T: ClassTag](
+ @transient ssc_ : StreamingContext,
+ val host: String,
+ val port: Int,
+ val maxBatchSize: Int,
+ val parallelism: Int,
+ storageLevel: StorageLevel
+) extends ReceiverInputDStream[SparkPollingEvent](ssc_) {
+ /**
+ * Gets the receiver object that will be sent to the worker nodes
+   * to receive data. This method needs to be defined by any specific implementation
+ * of a NetworkInputDStream.
+ */
+ override def getReceiver(): Receiver[SparkPollingEvent] = {
+ new FlumePollingReceiver(host, port, maxBatchSize, parallelism, storageLevel)
+ }
+}
+
+private[streaming] class FlumePollingReceiver(
+ host: String,
+ port: Int,
+ maxBatchSize: Int,
+ parallelism: Int,
+ storageLevel: StorageLevel
+) extends Receiver[SparkPollingEvent](storageLevel) with Logging {
+
+ lazy val channelFactory =
+ new NioClientSocketChannelFactory(Executors.newSingleThreadExecutor(),
+ Executors.newSingleThreadExecutor())
+ lazy val transceiver = new NettyTransceiver(new InetSocketAddress(host, port), channelFactory)
+ lazy val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
+ lazy val receiverExecutor = Executors.newFixedThreadPool(parallelism,
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flume Receiver Thread - %d").build())
+
+ override def onStart(): Unit = {
+ val dataReceiver = new Runnable {
+ override def run(): Unit = {
+ while (true) {
+ val batch = client.getEventBatch(maxBatchSize)
+ val seq = batch.getSequenceNumber
+ val events: java.util.List[SparkSinkEvent] = batch.getEventBatch
+ logDebug("Received batch of " + events.size() + " events with sequence number: " + seq)
+ try {
+ events.foreach(event => store(SparkPollingEvent.fromSparkSinkEvent(event)))
+ client.ack(seq)
+ } catch {
+ case e: Throwable =>
+ client.nack(seq)
+ TimeUnit.SECONDS.sleep(2L) // for now just leave this as a fixed 2 seconds.
+ logWarning("Error while attempting to store events", e)
+ }
+ }
+ }
+ }
+ for (i <- 0 until parallelism) {
+      logInfo("Starting Flume Polling Receiver worker threads..")
+ receiverExecutor.submit(dataReceiver)
+ }
+ }
+
+ override def store(dataItem: SparkPollingEvent) {
+ // Not entirely sure store is thread-safe for all storage levels - so wrap it in synchronized
+ // This takes a performance hit, since the parallelism is useful only for pulling data now.
+ this.synchronized {
+ super.store(dataItem)
+ }
+ }
+
+ override def onStop(): Unit = {
+ logInfo("Shutting down Flume Polling Receiver")
+ receiverExecutor.shutdownNow()
+ transceiver.close()
+ channelFactory.releaseExternalResources()
+ }
+}
+
+private[streaming] object SparkPollingEvent {
+ def fromSparkSinkEvent(in: SparkSinkEvent): SparkPollingEvent = {
+ val event = new SparkPollingEvent()
+ event.event = in
+ event
+ }
+}
+/*
+ * Unfortunately Avro does not allow including pre-compiled classes - so even though
+ * SparkSinkEvent is identical to AvroFlumeEvent, we need to create a new class and a wrapper
+ * around that to make it externalizable.
+ */
+class SparkPollingEvent() extends Externalizable with Logging {
+ var event : SparkSinkEvent = new SparkSinkEvent()
+
+ /* De-serialize from bytes. */
+ def readExternal(in: ObjectInput) {
+ val (headers, bodyBuff) = EventTransformer.readExternal(in)
+ event.setBody(ByteBuffer.wrap(bodyBuff))
+ event.setHeaders(headers)
+ }
+
+ /* Serialize to bytes. */
+ def writeExternal(out: ObjectOutput) {
+ EventTransformer.writeExternal(out, event.getHeaders, event.getBody.array())
+ }
+}
+
+
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 499f3560ef768..f7d9bd3c6e2ab 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -19,8 +19,8 @@ package org.apache.spark.streaming.flume
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaInputDStream, JavaStreamingContext, JavaDStream}
-import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
object FlumeUtils {
/**
@@ -68,4 +68,52 @@ object FlumeUtils {
): JavaReceiverInputDStream[SparkFlumeEvent] = {
createStream(jssc.ssc, hostname, port, storageLevel)
}
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * @param host The host on which the Flume agent is running
+ * @param port The port the Spark Sink is accepting connections on
+ * @param maxBatchSize The maximum number of events to be pulled from the Spark sink in a
+ * single RPC call
+ * @param parallelism Number of concurrent requests this stream should send to the sink. Note
+ * that having a higher number of requests concurrently being pulled will
+ * result in this stream using more threads
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createPollingStream (
+ ssc: StreamingContext,
+ host: String,
+ port: Int,
+ maxBatchSize: Int = 100,
+ parallelism: Int = 5,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): ReceiverInputDStream[SparkPollingEvent] = {
+ new FlumePollingInputDStream[SparkPollingEvent](ssc, host, port, maxBatchSize,
+ parallelism, storageLevel)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * @param host The host on which the Flume agent is running
+ * @param port The port the Spark Sink is accepting connections on
+ * @param maxBatchSize The maximum number of events to be pulled from the Spark sink in a
+ * single RPC call
+ * @param parallelism Number of concurrent requests this stream should send to the sink. Note
+ * that having a higher number of requests concurrently being pulled will
+ * result in this stream using more threads
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createJavaPollingStream (
+      jssc: JavaStreamingContext,
+ host: String,
+ port: Int,
+ maxBatchSize: Int = 100,
+ parallelism: Int = 5,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): JavaReceiverInputDStream[SparkPollingEvent] = {
+    new FlumePollingInputDStream[SparkPollingEvent](jssc.ssc, host, port, maxBatchSize,
+ parallelism, storageLevel)
+ }
}
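
A streaming application would consume the new polling stream roughly as follows (a minimal sketch; the app name, batch interval, and host/port are placeholders, and a SparkSink must already be listening at that address):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    object FlumePollingSketch {
      def main(args: Array[String]) {
        val sparkConf = new SparkConf().setAppName("FlumePollingSketch")
        val ssc = new StreamingContext(sparkConf, Seconds(2))
        // Pull events from the Spark Sink running on the Flume agent at localhost:9999
        val stream = FlumeUtils.createPollingStream(ssc, "localhost", 9999)
        // Each record is a SparkPollingEvent wrapping a SparkSinkEvent (headers + body)
        stream.map(e => new String(e.event.getBody.array())).count().print()
        ssc.start()
        ssc.awaitTermination()
      }
    }
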
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingReceiverSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingReceiverSuite.scala
new file mode 100644
index 0000000000000..579f0b1091df3
--- /dev/null
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingReceiverSuite.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.streaming.flume
+
+import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingContext}
+import org.apache.spark.storage.StorageLevel
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import org.apache.spark.streaming.util.ManualClock
+import java.nio.charset.Charset
+import org.apache.flume.channel.MemoryChannel
+import org.apache.flume.Context
+import org.apache.flume.conf.Configurables
+import org.apache.spark.flume.sink.{SparkSinkConfig, SparkSink}
+import scala.collection.JavaConversions._
+import org.apache.flume.event.EventBuilder
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+class FlumePollingReceiverSuite extends TestSuiteBase {
+
+ val testPort = 9999
+
+ test("flume polling test") {
+ // Set up the streaming context and input streams
+ val ssc = new StreamingContext(conf, batchDuration)
+ val flumeStream: ReceiverInputDStream[SparkPollingEvent] =
+ FlumeUtils.createPollingStream(ssc, "localhost", testPort, 100, 5,
+ StorageLevel.MEMORY_AND_DISK)
+ val outputBuffer = new ArrayBuffer[Seq[SparkPollingEvent]]
+ with SynchronizedBuffer[Seq[SparkPollingEvent]]
+ val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+ outputStream.register()
+
+ // Start the channel and sink.
+ val context = new Context()
+ context.put("capacity", "5000")
+ context.put("transactionCapacity", "1000")
+ context.put("keep-alive", "0")
+ val channel = new MemoryChannel()
+ Configurables.configure(channel, context)
+
+ val sink = new SparkSink()
+ context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+ context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort))
+ Configurables.configure(sink, context)
+ sink.setChannel(channel)
+ sink.start()
+ ssc.start()
+
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ val input = Seq(1, 2, 3, 4, 5)
+ for (i <- 0 until 5) {
+ val tx = channel.getTransaction
+ tx.begin()
+ for (j <- 0 until input.size) {
+ channel.put(EventBuilder.withBody(
+ (String.valueOf(i) + input(j)).getBytes("utf-8"),
+ Map[String, String]("test-" + input(j).toString -> "header")))
+ }
+ tx.commit()
+ tx.close()
+      Thread.sleep(500) // Allow some time for the events to reach the receiver
+ clock.addToTime(batchDuration.milliseconds)
+ }
+ val startTime = System.currentTimeMillis()
+ while (outputBuffer.size < 5 && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
+ logInfo("output.size = " + outputBuffer.size)
+ Thread.sleep(100)
+ }
+ val timeTaken = System.currentTimeMillis() - startTime
+ assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
+ logInfo("Stopping context")
+ ssc.stop()
+
+ val decoder = Charset.forName("UTF-8").newDecoder()
+
+ assert(outputBuffer.size === 5)
+ var counter = 0
+ for (i <- 0 until outputBuffer.size;
+ j <- 0 until outputBuffer(i).size) {
+ counter += 1
+ val eventToVerify = outputBuffer(i)(j).event
+ val str = decoder.decode(eventToVerify.getBody)
+ assert(str.toString === (String.valueOf(i) + input(j)))
+ assert(eventToVerify.getHeaders.get("test-" + input(j).toString) === "header")
+ }
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index 86264d1132ec4..cf7bf3d2ee1ec 100644
--- a/pom.xml
+++ b/pom.xml
@@ -100,6 +100,7 @@
     <module>external/twitter</module>
     <module>external/kafka</module>
     <module>external/flume</module>
+    <module>external/flume-sink</module>
     <module>external/zeromq</module>
     <module>external/mqtt</module>
     <module>examples</module>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 29dcd8678b476..df21813ff983a 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -16,10 +16,15 @@
*/
import sbt._
+import sbt.ClasspathDependency
import sbt.Classpaths.publishTask
+import sbt.ExclusionRule
import sbt.Keys._
+import sbt.Task
import sbtassembly.Plugin._
import AssemblyKeys._
+import sbtavro.SbtAvro._
+import scala.Some
import scala.util.Properties
import org.scalastyle.sbt.ScalastylePlugin.{Settings => ScalaStyleSettings}
import com.typesafe.tools.mima.plugin.MimaKeys.previousArtifact
@@ -140,8 +145,11 @@ object SparkBuild extends Build {
lazy val externalKafka = Project("external-kafka", file("external/kafka"), settings = kafkaSettings)
.dependsOn(streaming % "compile->compile;test->test")
+ lazy val externalFlumeSink = Project("external-flume-sink", file("external/flume-sink"), settings = flumeSinkSettings)
+
lazy val externalFlume = Project("external-flume", file("external/flume"), settings = flumeSettings)
- .dependsOn(streaming % "compile->compile;test->test")
+ .dependsOn(streaming % "compile->compile;test->test").dependsOn(externalFlumeSink)
+
lazy val externalZeromq = Project("external-zeromq", file("external/zeromq"), settings = zeromqSettings)
.dependsOn(streaming % "compile->compile;test->test")
@@ -149,8 +157,8 @@ object SparkBuild extends Build {
lazy val externalMqtt = Project("external-mqtt", file("external/mqtt"), settings = mqttSettings)
.dependsOn(streaming % "compile->compile;test->test")
- lazy val allExternal = Seq[ClasspathDependency](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt)
- lazy val allExternalRefs = Seq[ProjectReference](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt)
+ lazy val allExternal = Seq[ClasspathDependency](externalTwitter, externalKafka, externalFlume, externalFlumeSink, externalZeromq, externalMqtt)
+ lazy val allExternalRefs = Seq[ProjectReference](externalTwitter, externalKafka, externalFlume, externalFlumeSink, externalZeromq, externalMqtt)
lazy val examples = Project("examples", file("examples"), settings = examplesSettings)
.dependsOn(core, mllib, graphx, bagel, streaming, hive) dependsOn(allExternal: _*)
@@ -622,6 +630,18 @@ object SparkBuild extends Build {
)
)
+ def flumeSinkSettings() = {
+ sharedSettings ++ Seq(
+ name := "spark-streaming-flume-sink",
+ previousArtifact := sparkPreviousArtifact("spark-streaming-flume-sink"),
+ libraryDependencies ++= Seq(
+ "org.apache.flume" % "flume-ng-sdk" % "1.4.0" % "compile"
+ excludeAll(excludeJBossNetty, excludeThrift),
+ "org.apache.flume" % "flume-ng-core" % "1.4.0" % "compile"
+ excludeAll(excludeJBossNetty, excludeThrift)
+ )
+ ) ++ sbtavro.SbtAvro.avroSettings
+ }
def zeromqSettings() = sharedSettings ++ Seq(
name := "spark-streaming-zeromq",
previousArtifact := sparkPreviousArtifact("spark-streaming-zeromq"),
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 0cd16fd5bedd4..eadf71707ba19 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -4,6 +4,8 @@ resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.
resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
+resolvers += "sbt-plugins" at "http://repo.scala-sbt.org/scalasbt/sbt-plugin-releases"
+
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.10.2")
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.2.0")
@@ -24,3 +26,5 @@ addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.1.6")
addSbtPlugin("com.alpinenow" % "junit_xml_listener" % "0.5.0")
addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.3.0")
+
+addSbtPlugin("com.cavorite" % "sbt-avro" % "0.3.2")