Skip to content

Commit

Permalink
Version 7.0.0-v1.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
borjahernandez committed Apr 12, 2022
1 parent 69a2259 commit 0804d6f
Show file tree
Hide file tree
Showing 22 changed files with 2,041 additions and 293 deletions.
Binary file modified .DS_Store
Binary file not shown.
16 changes: 8 additions & 8 deletions challenge/dotnet-producer-avro/dotnet-producer-avro.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
<noWarn>1591</noWarn>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.4.0" />
<PackageReference Include="Confluent.SchemaRegistry" Version="1.4.0" />
<PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="1.4.0" />
<PackageReference Include="StyleCop.Analyzers" Version="1.1.118">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.4.0" />
<PackageReference Include="Confluent.SchemaRegistry" Version="1.4.0" />
<PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="1.4.0" />
<PackageReference Include="StyleCop.Analyzers" Version="1.1.118">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public static void main(String[] args) throws IOException, InterruptedException
// TODO: write the lat/long position to a Kafka topic
// TODO: print the key and value in the callback lambda
producer.send(???, (md, e) -> {
System.out.println(???
System.out.printf(???
});
Thread.sleep(1000);
pos = (pos + 1) % rows.length;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
{"namespace": "clients.avro",
"type": "record",
"name": "PositionDistance",
"name": "PositionString",
"fields": [
{"name": "latitude", "type": "double"},
{"name": "longitude", "type": "double"},
{"name": "distance", "type": "double"}
{"name": "positionString", "type": "string"}
]
}
86 changes: 40 additions & 46 deletions challenge/java-streams-avro/src/main/java/clients/StreamsApp.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package clients;

import clients.avro.PositionDistance;
import clients.avro.PositionString;
import clients.avro.PositionValue;

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
Expand All @@ -10,8 +10,6 @@
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import net.sf.geographiclib.Geodesic;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
Expand All @@ -20,8 +18,6 @@
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class StreamsApp {
Expand All @@ -37,7 +33,7 @@ public static void main(String[] args) {
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app-1");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
Serdes.String().getClass().getName());
// Disabling caching ensures we get a complete "changelog" from the
// aggregate(...) (i.e. every input event will have a corresponding output event.
// see
Expand Down Expand Up @@ -68,54 +64,52 @@ public static void main(String[] args) {
}

private static Topology getTopology() {

// When you want to override serdes explicitly/selectively
final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
"http://schema-registry:8081");
"http://schema-registry:8081");
final Serde<PositionValue> positionValueSerde = new SpecificAvroSerde<>();
positionValueSerde.configure(serdeConfig, false);
final Serde<PositionDistance> positionDistanceSerde = new SpecificAvroSerde<>();
positionDistanceSerde.configure(serdeConfig, false);
positionValueSerde.configure(serdeConfig, false);
final Serde<PositionString> positionStringSerde = new SpecificAvroSerde<>();
positionStringSerde.configure(serdeConfig, false);

// Create the StreamsBuilder object to create our Topology
final StreamsBuilder builder = new StreamsBuilder();

// create a KStream from the driver-positions-avro topic
// Create a KStream from the `driver-positions-avro` topic
// configure a serdes that can read the string key, and avro value
final KStream<String, PositionValue> positions = builder.stream(
"driver-positions-avro",
Consumed.with(Serdes.String(),
positionValueSerde));


// We do a groupByKey on the ‘positions’ stream which returns an
// intermediate KGroupedStream, we then aggregate to return a KTable.
final KTable<String, PositionDistance> reduced = positions.groupByKey().aggregate(
() -> null,
(aggKey, newValue, aggValue) -> {
final Double newLatitude = newValue.getLatitude();
final Double newLongitude = newValue.getLongitude();

// initial record - no distance to calculate
if (aggValue == null) {
return new PositionDistance(newLatitude, newLongitude, 0.0);
}

// cacluate the distance between the new value and the aggregate value
final Double aggLatitude = aggValue.getLatitude();
final Double aggLongitude = aggValue.getLongitude();
Double aggDistance = aggValue.getDistance();
final Double distance = Geodesic.WGS84.Inverse(aggLatitude, aggLongitude,
newLatitude, newLongitude).s12;
aggDistance += distance;

// return the new value and distance as the new aggregate
return new PositionDistance(newLatitude, newLongitude, aggDistance);
}, Materialized.with(
Serdes.String(),
positionDistanceSerde));

reduced.toStream().to(
"driver-distance-avro",
Produced.with(Serdes.String(), positionDistanceSerde));
"driver-positions-avro",
Consumed.with(Serdes.String(),
positionValueSerde));

// TO-DO: Use filter() method to filter out the events from `driver-2`.
// Define the predicate in the lambda expression of the filter().
final KStream<String, PositionValue> positionsFiltered = positions.filter(
(key,value) -> ???);

// TO-DO: Use mapValues() method to change the value of each
// event from PositionValue to PositionString class.
// You can check the two schemas under src/main/avro/.
// Notice that position_string.avsc contains a new field
// `positionString` as String type.
final KStream<String, PositionString> positionsString = positionsFiltered.mapValues(
value -> {
final Double latitude = ???;
final Double longitude = ???;
final String positionString = "Latitude: " + String.valueOf(???) +
", Longitude: " + String.valueOf(???);
return new PositionString(???, ???, ???);
}
);

// Write the results to topic `driver-positions-string-avro`
// configure a serdes that can write the string key, and new avro value
positionsString.to(
"driver-positions-string-avro",
Produced.with(Serdes.String(), positionStringSerde));

// Build the Topology
final Topology topology = builder.build();
return topology;
}
Expand Down
Loading

0 comments on commit 0804d6f

Please sign in to comment.