Skip to content

Commit 7dc0346

Browse files
author
Ben Chen
committed
Updating Spring Boot to v2.3.3 and Kafka Streams to v2.6.0
1 parent 00de29b commit 7dc0346

File tree

3 files changed

+18
-9
lines changed

3 files changed

+18
-9
lines changed

README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Spring Boot (2.3.2) RESTful API with Kafka Streams (2.5.0)
1+
# Spring Boot (2.3.3) RESTful API with Kafka Streams (2.6.0)
22

33
While looking through the Kafka Tutorials to see how I could setup a Spring Boot API project with Kafka Streams, I found it strange that there wasn't a complete or more informative example on how this could be achieved. Most use cases demonstrated how to compute aggregations and how to build simple topologies, but it was difficult to find a concrete example on how to build an API service that could query into these materialized name stores. Anyways, I thought I’d create my own using a more recent version of Spring Boot with Java 14.
44

@@ -71,7 +71,7 @@ After the application runs, navigate to [http://localhost:7001/swagger-ui/index.
7171
```json
7272
{
7373
"movieId": 362,
74-
"rating": 9.0
74+
"rating": 9
7575
}
7676
```
7777

pom.xml

+2-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<groupId>org.springframework.boot</groupId>
77
<artifactId>spring-boot-starter-parent</artifactId>
8-
<version>2.3.2.RELEASE</version>
8+
<version>2.3.3.RELEASE</version>
99
<relativePath/>
1010
</parent>
1111
<groupId>com.example.kafkastreams.restapi</groupId>
@@ -23,7 +23,7 @@
2323
<properties>
2424
<java.version>14</java.version>
2525
<avro.version>1.10.0</avro.version>
26-
<kafka.version>2.5.0</kafka.version>
26+
<kafka.version>2.6.0</kafka.version>
2727
<openapi.version>1.4.3</openapi.version>
2828
</properties>
2929
<dependencies>

src/main/java/com/example/kafkastreams/restapi/springbootapp/controller/v1/MovieController.java

+14-5
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@
2323
import org.slf4j.LoggerFactory;
2424
import org.springframework.beans.factory.annotation.Autowired;
2525
import org.springframework.beans.factory.annotation.Value;
26+
import org.springframework.http.HttpStatus;
27+
import org.springframework.http.ResponseEntity;
2628
import org.springframework.web.bind.annotation.*;
2729

28-
@OpenAPIDefinition(servers = { @Server(url = "http://localhost:7001") }, info = @Info(title = "Sample Spring Boot Kafka Stream API", version = "v1", description = "A demo project using Spring Boot with Kafka Stream", license = @License(name = "MIT License", url = "https://github.com/bchen04/springboot-kafka-streams-rest-api/blob/master/LICENSE"), contact = @Contact(url = "https://www.linkedin.com/in/bchen04/", name = "Ben Chen")))
30+
@OpenAPIDefinition(servers = { @Server(url = "http://localhost:7001") }, info = @Info(title = "Sample Spring Boot Kafka Stream API", version = "v1", description = "A demo project using Spring Boot with Kafka Streams.", license = @License(name = "MIT License", url = "https://github.com/bchen04/springboot-kafka-streams-rest-api/blob/master/LICENSE"), contact = @Contact(url = "https://www.linkedin.com/in/bchen04/", name = "Ben Chen")))
2931
@RestController
3032
@RequestMapping("v1/movie")
3133
public class MovieController {
@@ -42,11 +44,13 @@ public MovieController(KafkaStreams streams) {
4244

4345
@Operation(summary = "Returns the average rating for a particular movie")
4446
@ApiResponses(value = {
45-
@ApiResponse(responseCode = "200", description = "successful operation", content = @Content(schema = @Schema(type = "object"))) })
47+
@ApiResponse(responseCode = "200", description = "successful operation", content = @Content(schema = @Schema(type = "object"))),
48+
@ApiResponse(responseCode = "500", description = "internal server error")})
4649
@GetMapping(value = "{movieId}/rating", produces = { "application/json" })
47-
public MovieAverageRatingResponse getMovieAverageRating(@Parameter(required = true, example = "362") @PathVariable Long movieId) {
50+
public ResponseEntity<MovieAverageRatingResponse> getMovieAverageRating(@Parameter(description = "Movie identifier", required = true, example = "362") @PathVariable Long movieId) {
4851
try {
4952
//find active, standby host list and partition for key
53+
//get the metadata related to the key.
5054
final KeyQueryMetadata keyQueryMetadata = streams.queryMetadataForKey(stateStoreName, movieId, Serdes.Long().serializer());
5155

5256
//use the above information to redirect the query to the host containing the partition for the key
@@ -64,11 +68,16 @@ public MovieAverageRatingResponse getMovieAverageRating(@Parameter(required = tr
6468
//get the value by key
6569
Double result = store.get(movieId);
6670

67-
return new MovieAverageRatingResponse(movieId, result);
71+
return ResponseEntity
72+
.ok()
73+
.body(new MovieAverageRatingResponse(movieId, result));
6874
}
6975
catch(Exception ex) {
7076
logger.error("Failed due to exception: {}", ex.getMessage());
71-
return null;
77+
78+
return ResponseEntity
79+
.status(HttpStatus.INTERNAL_SERVER_ERROR)
80+
.build();
7281
}
7382
}
7483
}

0 commit comments

Comments
 (0)