-
Notifications
You must be signed in to change notification settings - Fork 454
/
Copy pathCheckoutConfirmedStatusConsumer.java
148 lines (127 loc) · 5.86 KB
/
CheckoutConfirmedStatusConsumer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package com.yas.order.kafka.consumer;
import static com.yas.order.kafka.helper.ConsumerHelper.processForEventUpdate;
import static com.yas.order.utils.JsonUtils.getJsonValueOrNull;
import static com.yas.order.utils.JsonUtils.getJsonValueOrThrow;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yas.commonlibrary.exception.BadRequestException;
import com.yas.order.model.Checkout;
import com.yas.order.model.CheckoutItem;
import com.yas.order.model.Order;
import com.yas.order.model.OrderItem;
import com.yas.order.model.enumeration.CheckoutState;
import com.yas.order.model.enumeration.DeliveryStatus;
import com.yas.order.model.enumeration.OrderStatus;
import com.yas.order.service.CheckoutItemService;
import com.yas.order.service.CheckoutService;
import com.yas.order.service.OrderAddressService;
import com.yas.order.service.OrderItemService;
import com.yas.order.service.OrderService;
import com.yas.order.utils.Constants;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
 * Kafka consumer that creates an order once a checkout reaches the PAYMENT_CONFIRMED status.
 *
 * <p>Each change event is inspected for a status transition: when the {@code after} status is
 * {@code PAYMENT_CONFIRMED} and differs from the {@code before} status, an {@link Order} and its
 * {@link OrderItem}s are built from the {@link Checkout} and its {@link CheckoutItem}s, and the
 * checkout state is then set to {@code FULFILLED}. Every other event is logged and ignored.
 */
@Service
@Transactional
@RequiredArgsConstructor
public class CheckoutConfirmedStatusConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(CheckoutConfirmedStatusConsumer.class);

    // NOTE: declaration order is kept as-is because @RequiredArgsConstructor derives the
    // constructor parameter order from it.
    private final OrderService orderService;
    private final OrderItemService orderItemService;
    private final CheckoutService checkoutService;
    private final CheckoutItemService checkoutItemService;
    private final OrderAddressService orderAddressService;
    private final ObjectMapper objectMapper;

    /**
     * Entry point for records of the checkout status topic.
     *
     * <p>The record is handed to {@code ConsumerHelper.processForEventUpdate} together with
     * {@link #handleJsonForUpdateCheckout(JsonNode)} as the callback, the shared
     * {@link ObjectMapper} and this class's logger.
     *
     * @param consumerRecord the raw Kafka record received from the topic
     */
    @KafkaListener(
        topics = "${cdc.event.checkout.status.topic-name}",
        groupId = "${cdc.event.checkout.confirmed.status.group-id}"
    )
    @RetryableTopic
    public void listen(ConsumerRecord<?, ?> consumerRecord) {
        processForEventUpdate(consumerRecord, this::handleJsonForUpdateCheckout, objectMapper, LOGGER);
    }

    /**
     * Handles one change event: creates the order, its items, and marks the checkout as
     * fulfilled when the event represents a transition to PAYMENT_CONFIRMED.
     *
     * @param valueObject JSON node from which the {@code "before"} and {@code "after"} children
     *                    are read
     */
    private void handleJsonForUpdateCheckout(JsonNode valueObject) {
        JsonNode previousRow = valueObject.get("before");
        JsonNode currentRow = valueObject.get("after");

        // The id is resolved first, so a missing id is reported before any status inspection.
        String checkoutId = getJsonValueOrThrow(
            currentRow, Constants.Column.ID_COLUMN, Constants.ErrorCode.ID_NOT_EXISTED);
        String previousStatus = getJsonValueOrNull(previousRow, Constants.Column.STATUS_COLUMN);
        String currentStatus = getJsonValueOrNull(currentRow, Constants.Column.STATUS_COLUMN);

        if (isOrderCreationEvent(previousStatus, currentStatus)) {
            LOGGER.info("Checkout record with ID {} has the status 'PAYMENT_CONFIRMED'", checkoutId);

            Checkout confirmedCheckout = checkoutService.findCheckoutById(checkoutId);
            List<CheckoutItem> purchasedItems =
                checkoutItemService.getAllByCheckoutId(confirmedCheckout.getId());

            Order savedOrder = createOrder(confirmedCheckout, purchasedItems);
            createOrderItems(savedOrder, purchasedItems);
            updateCheckoutStatus(confirmedCheckout);
        } else {
            LOGGER.info("It's not an event to create Order with Checkout Id {}", checkoutId);
        }
    }

    /**
     * Tells whether a status change should trigger order creation.
     *
     * @param beforeStatus status value taken from the {@code before} node, may be {@code null}
     * @param afterStatus  status value taken from the {@code after} node, may be {@code null}
     * @return {@code true} only when {@code afterStatus} is non-null, differs from
     *         {@code beforeStatus} and equals {@code PAYMENT_CONFIRMED}
     */
    private static boolean isOrderCreationEvent(String beforeStatus, String afterStatus) {
        return Objects.nonNull(afterStatus)
            && !afterStatus.equals(beforeStatus)
            && CheckoutState.PAYMENT_CONFIRMED.name().equals(afterStatus);
    }

    /**
     * Builds an order from the checkout and passes it to {@code orderService.updateOrder}.
     *
     * @param checkout         the checkout the order is derived from
     * @param checkoutItemList items of the checkout; only its size is used here
     * @return whatever {@code orderService.updateOrder} returns for the built order
     * @throws BadRequestException when the shipping or billing address id is {@code null}, or
     *                             when the address lookup yields {@code null}
     */
    private Order createOrder(Checkout checkout, List<CheckoutItem> checkoutItemList) {
        Order draft = Order.builder()
            // Identity and free-form data
            .checkoutId(checkout.getId())
            .customerId(checkout.getCustomerId())
            .email(checkout.getEmail())
            .note(checkout.getNote())
            .couponCode(checkout.getCouponCode())
            .numberItem(checkoutItemList.size())
            // Amounts
            .totalPrice(checkout.getTotalAmount())
            .tax(checkout.getTotalTax())
            .discount(checkout.getTotalDiscountAmount())
            .deliveryFee(checkout.getTotalShipmentFee())
            .totalShipmentTax(checkout.getTotalShipmentTax())
            // Status and delivery
            .orderStatus(OrderStatus.PAYMENT_CONFIRMED)
            .deliveryStatus(DeliveryStatus.PREPARING)
            .deliveryMethod(checkout.getShipmentMethodId())
            // Addresses: the shipping lookup runs before the billing lookup
            .shippingAddressId(
                Optional.ofNullable(checkout.getShippingAddressId())
                    .map(orderAddressService::findOrderAddressById)
                    .orElseThrow(() -> new BadRequestException(
                        "Shipping Address Id is not existed: {}", checkout.getShippingAddressId()))
            )
            .billingAddressId(
                Optional.ofNullable(checkout.getBillingAddressId())
                    .map(orderAddressService::findOrderAddressById)
                    .orElseThrow(() -> new BadRequestException(
                        "Billing Address Id is not existed: {}", checkout.getBillingAddressId()))
            )
            .build();

        return orderService.updateOrder(draft);
    }

    /**
     * Converts every checkout item into an order item bound to the given order and saves them
     * in a single {@code saveAll} call.
     *
     * @param order            the order whose id is stamped on each order item
     * @param checkoutItemList the checkout items to convert
     */
    private void createOrderItems(Order order, List<CheckoutItem> checkoutItemList) {
        List<OrderItem> convertedItems = checkoutItemList.stream()
            .map(checkoutItem -> toOrderItem(order, checkoutItem))
            .toList();

        orderItemService.saveAll(convertedItems);
    }

    /**
     * Copies product id, name, quantity, price and note from a checkout item into a new order
     * item that references the given order's id.
     *
     * @param order        the owning order
     * @param checkoutItem the checkout item to copy from
     * @return the newly built order item
     */
    private OrderItem toOrderItem(Order order, CheckoutItem checkoutItem) {
        return OrderItem.builder()
            .orderId(order.getId())
            .productId(checkoutItem.getProductId())
            .productName(checkoutItem.getProductName())
            .productPrice(checkoutItem.getProductPrice())
            .quantity(checkoutItem.getQuantity())
            .note(checkoutItem.getNote())
            .build();
    }

    /**
     * Sets the checkout state to {@code FULFILLED} and passes the checkout to
     * {@code checkoutService.updateCheckout}.
     *
     * @param checkout the checkout to update
     */
    private void updateCheckoutStatus(Checkout checkout) {
        checkout.setCheckoutState(CheckoutState.FULFILLED);
        checkoutService.updateCheckout(checkout);
    }
}