Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Docs #207

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open

Docs #207

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .github/scripts/create-web-site.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/usr/bin/env bash

set -e

echo "GIT_DEPLOY_KEY: $GIT_DEPLOY_KEY"

sbt docs/docusaurusPublishGhpages
62 changes: 62 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
name: build

on: [push, pull_request]

jobs:

tests:
name: scala-${{ matrix.scala }} jdk-${{ matrix.java }} tests
runs-on: ubuntu-latest

strategy:
fail-fast: true
matrix:
java: [8]
scala: [2.11.12, 2.12.10]

steps:
- uses: actions/checkout@v2
- uses: olafurpg/setup-scala@v7
with:
java-version: "adopt@1.${{ matrix.java }}"

- name: Cache SBT Coursier directory
uses: actions/cache@v1
with:
path: ~/.cache/coursier/v1
key: ${{ runner.os }}-coursier-${{ hashFiles('**/*.sbt') }}
restore-keys: |
${{ runner.os }}-coursier-
- name: Cache SBT directory
uses: actions/cache@v1
with:
path: ~/.sbt
key: |
${{ runner.os }}-sbt-${{ hashFiles('project/build.properties') }}-${{ hashFiles('project/plugins.sbt') }}
restore-keys: ${{ runner.os }}-sbt-

- name: Run Tests for Java ${{ matrix.java }}, Scala ${{ matrix.scala }}
run: sbt ci

publish:
name: Publish
needs: [ tests ]
if: github.event_name == 'push' && github.ref == 'refs/heads/master'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- uses: olafurpg/setup-scala@v2
- uses: olafurpg/setup-gpg@v2
- name: Publish release ${{ github.ref }}
run: sbt ci-release
env:
PGP_PASSPHRASE: ${{ secrets.PGP_PASSPHRASE }}
PGP_SECRET: ${{ secrets.PGP_SECRET }}
SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }}
SONATYPE_USERNAME: ${{ secrets.SONATYPE_USERNAME }}
- name: Publis documentation web page
run: |
./.github/scripts/create-web-site.sh
env:
GIT_DEPLOY_KEY: ${{ secrets.GIT_DEPLOY_KEY }}

8 changes: 8 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,11 @@ project/plugins/project/
.scala_dependencies
.worksheet
.idea

#website
/website/blog/
/website/build/
/website/node_modules/
/website/static/api/
/website/variables.js
/website/yarn.lock
3 changes: 3 additions & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,6 @@ https://github.com/poslegm

Vasiliy Efimov
https://github.com/voidconductor

Pau Alarcón
https://github.com/paualarco
255 changes: 4 additions & 251 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,260 +1,13 @@
# Monix-Kafka
# Monix Kafka

<img src="/website/static/img/kafka.png" width="125" align="right">

[![Build Status](https://travis-ci.org/monix/monix-kafka.svg?branch=master)](https://travis-ci.org/monix/monix-kafka)
[![Maven Central](https://img.shields.io/maven-central/v/io.monix/monix-kafka-1x_2.12.svg)](https://search.maven.org/search?q=g:io.monix%20AND%20a:monix-kafka-1x_2.12)
[![Scala Steward badge](https://img.shields.io/badge/Scala_Steward-helping-brightgreen.svg?style=flat&logo=)](https://scala-steward.org)

[![Gitter](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/monix/monix?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)

Monix integration with Kafka

Work in progress!

## Table of Contents
1. [Getting Started with Kafka 1.0.x or above](#getting-started-with-kafka-10x-or-above)
2. [Getting Started with Kafka 0.11.x](#getting-started-with-kafka-011x)
3. [Getting Started with Kafka 0.10.x](#getting-started-with-kafka-010x)
4. [Getting Started with Kafka 0.9.x](#getting-started-with-kafka-09x)
5. [Getting Started with Kafka 0.8.x (no longer supported)](#getting-started-with-kafka-08x)
6. [Usage](#usage)
7. [How can I contribute to Monix-Kafka?](#how-can-i-contribute-to-monix-kafka?)
8. [Maintainers](#maintainers)
9. [License](#license)

## Getting Started with Kafka 1.0.x or above

In SBT:

```scala
libraryDependencies += "io.monix" %% "monix-kafka-1x" % "1.0.0-RC5"
```

For `kafka` versions higher than `1.0.x` also add a dependency override:

```scala
dependencyOverrides += "org.apache.kafka" % "kafka" % "2.1.0"
```

Or in case you're interested in running the tests of this project, it
now supports embedded kafka for integration testing. You can simply run:

```bash
sbt kafka1x/test
```

## Getting Started with Kafka 0.11.x

In SBT:

```scala
libraryDependencies += "io.monix" %% "monix-kafka-11" % "1.0.0-RC5"
```

Or in case you're interested in running the tests of this project, it
now supports embedded kafka for integration testing. You can simply run:

```bash
sbt kafka11/test
```

## Getting Started with Kafka 0.10.x

In SBT:

```scala
libraryDependencies += "io.monix" %% "monix-kafka-10" % "1.0.0-RC5"
```

Or in case you're interested in running the tests of this project, it
now supports embedded kafka for integration testing. You can simply run:

```bash
sbt kafka10/test
```

## Getting Started with Kafka 0.9.x

Please note that `EmbeddedKafka` is not supported for Kafka `0.9.x`

In SBT:

```scala
libraryDependencies += "io.monix" %% "monix-kafka-9" % "1.0.0-RC5"
```

Or in case you're interested in running the tests of this project,
first download the Kafka server, version `0.9.x` from their
[download page](https://kafka.apache.org/downloads.html) (note that
`0.10.x` or higher do not work with `0.9`), then as the
[quick start](https://kafka.apache.org/090/documentation.html#quickstart)
section says, open a terminal window and first start Zookeeper:

```bash
bin/zookeeper-server-start.sh config/zookeeper.properties
```

Then start Kafka:

```bash
bin/kafka-server-start.sh config/server.properties
```

Create the topic we need for our tests:

```bash
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 \
--topic monix-kafka-tests
```

And run the tests:

```bash
sbt kafka9/test
```

## Getting Started with Kafka 0.8.x

Please note that support for Kafka `0.8.x` is dropped and the last available version with this dependency is `0.14`.

In SBT:

```scala
libraryDependencies += "io.monix" %% "monix-kafka-8" % "0.14"
```

Or in case you're interested in running the tests of this project,
first download the Kafka server, version `0.8.x` from their
[download page](https://kafka.apache.org/downloads.html) (note that
`0.9.x` or higher do not work with `0.8`), then as the
[quick start](https://kafka.apache.org/082/documentation.html#quickstart)
section says, open a terminal window and first start Zookeeper:

```bash
bin/zookeeper-server-start.sh config/zookeeper.properties
```

Then start Kafka:

```bash
bin/kafka-server-start.sh config/server.properties
```

Create the topics we need for our tests:

```bash
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 \
--topic monix-kafka-tests
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 \
--topic monix-kafka-manual-commit-tests
```

And run the tests:

```bash
sbt kafka8/test
```

## Usage

### Producer

```scala
import monix.kafka._
import monix.execution.Scheduler

implicit val scheduler: Scheduler = monix.execution.Scheduler.global

// Init
val producerCfg = KafkaProducerConfig.default.copy(
bootstrapServers = List("127.0.0.1:9092")
)

val producer = KafkaProducer[String,String](producerCfg, scheduler)

// For sending one message
val recordMetadataF = producer.send("my-topic", "my-message").runToFuture

// For closing the producer connection
val closeF = producer.close().runToFuture
```

Calling `producer.send` returns a [Task](https://monix.io/docs/3x/eval/task.html) of `Option[RecordMetadata]` which can then be run and transformed into a `Future`.

If the `Task` completes with `None` it means that `producer.send` method was called after the producer was closed and that the message wasn't successfully acknowledged by the Kafka broker. In case of the failure of the underlying Kafka client the producer will bubble up the exception and fail the `Task`. All successfully delivered messages will complete with `Some[RecordMetadata]`.

For pushing an entire `Observable` to Apache Kafka:

```scala
import monix.kafka._
import monix.execution.Scheduler
import monix.reactive.Observable
import org.apache.kafka.clients.producer.ProducerRecord

implicit val scheduler: Scheduler = monix.execution.Scheduler.global

// Initializing the producer
val producerCfg = KafkaProducerConfig.default.copy(
bootstrapServers = List("127.0.0.1:9092")
)

val producer = KafkaProducerSink[String,String](producerCfg, scheduler)

// Lets pretend we have this observable of records
val observable: Observable[ProducerRecord[String,String]] = ???

observable
// on overflow, start dropping incoming events
.whileBusyDrop
// buffers into batches if the consumer is busy, up to a max size
.bufferIntrospective(1024)
// consume everything by pushing into Apache Kafka
.consumeWith(producer)
// ready, set, go!
.runToFuture
```

### Consumer

There are several ways for consuming from Apache Kafka (Version 0.11.x and above):

Consumer which commits offsets itself:
```scala
import monix.kafka._

val consumerCfg = KafkaConsumerConfig.default.copy(
bootstrapServers = List("127.0.0.1:9092"),
groupId = "kafka-tests"
// you can use this settings for At Most Once semantics:
// observableCommitOrder = ObservableCommitOrder.BeforeAck
)

val observable =
KafkaConsumerObservable[String,String](consumerCfg, List("my-topic"))
.take(10000)
.map(_.value())
```

Consumer which allows you to commit offsets manually:
```scala
import monix.kafka._

val consumerCfg = KafkaConsumerConfig.default.copy(
bootstrapServers = List("127.0.0.1:9092"),
groupId = "kafka-tests"
)

val observable =
KafkaConsumerObservable.manualCommit[String,String](consumerCfg, List("my-topic"))
.map(message => message.record.value() -> message.committableOffset)
.mapEval { case (value, offset) => performBusinessLogic(value).map(_ => offset) }
.bufferTimedAndCounted(1.second, 1000)
.mapEval(offsets => CommittableOffsetBatch(offsets).commitSync())
```

Enjoy!
See the [__documentation web site__](https://monix.github.io/monix-kafka) to get started.

### Caveats

Expand Down
Loading