Wyobraź sobie e-commerce gdzie klient składa zamówienie. System musi wysłać email, zaktualizować magazyn, wystawić fakturę i powiadomić kuriera. Gdyby każda operacja blokowała poprzednią, użytkownik czekałby wieczność na potwierdzenie. RabbitMQ rozwiązuje ten problem przez asynchroniczną komunikację.
Dlaczego to ważne?
W realnych systemach aplikacje muszą komunikować się ze sobą, ale nie zawsze mogą czekać na natychmiastową odpowiedź. RabbitMQ umożliwia luźne połączenie między komponentami – jeśli jeden serwis jest przeciążony lub niedostępny, reszta systemu nadal działa.
Co się nauczysz:
- Czym jest RabbitMQ i kiedy go używać
- Podstawowe koncepty: kolejki, exchange, routing
- Implementacja producenta i konsumenta w Javie
- Wzorce komunikacji: work queues, publish/subscribe
- Integracja z Spring Boot
- Monitorowanie i zarządzanie kolejkami
Czym jest RabbitMQ?
Podstawowe koncepty
Producer (Producent)
Aplikacja która wysyła wiadomości. W naszym e-commerce to może być serwis obsługujący zamówienia.
Queue (Kolejka)
Buffor przechowujący wiadomości. Wiadomości czekają w kolejce aż zostaną odebrane przez konsumenta.
Consumer (Konsument)
Aplikacja która odbiera i przetwarza wiadomości z kolejki.
Exchange
Komponent odpowiedzialny za routing wiadomości. Decyduje do której kolejki trafi wiadomość.
Typ Exchange | Routing | Zastosowanie |
---|---|---|
Direct | Exact match klucza | Point-to-point komunikacja |
Topic | Pattern matching | Selective routing |
Fanout | Broadcast do wszystkich | Publish/Subscribe |
Headers | Na podstawie nagłówków | Complex routing rules |
Instalacja i konfiguracja
Instalacja RabbitMQ (Ubuntu/Debian):
# Dodaj klucz i repozytorium wget -O- https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc | sudo apt-key add - echo "deb https://dl.bintray.com/rabbitmq/debian xenial main" | sudo tee /etc/apt/sources.list.d/bintray.rabbitmq.list # Instalacja sudo apt-get update sudo apt-get install rabbitmq-server # Uruchomienie i włączenie auto-start sudo systemctl start rabbitmq-server sudo systemctl enable rabbitmq-server # Włączenie management plugin sudo rabbitmq-plugins enable rabbitmq_management
Pierwszy przykład – Work Queue
Konfiguracja projektu Maven:
com.rabbitmq amqp-client 3.6.5 org.slf4j slf4j-simple 1.7.21
Producer – wysyłanie wiadomości:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class OrderProducer { private final static String QUEUE_NAME = "order_processing"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // Deklaracja kolejki (tworzy jeśli nie istnieje) channel.queueDeclare(QUEUE_NAME, true, false, false, null); // Symulacja zamówień for (int i = 1; i <= 10; i++) { String message = "Order #" + i + " - Processing required"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); Thread.sleep(1000); // Symulacja czasu między zamówieniami } } } }
Consumer - odbieranie wiadomości:
import com.rabbitmq.client.*; import java.io.IOException; public class OrderConsumer { private final static String QUEUE_NAME = "order_processing"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // Callback do przetwarzania wiadomości Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { // Symulacja przetwarzania zamówienia processOrder(message); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { // Potwierdzenie przetworzenia channel.basicAck(envelope.getDeliveryTag(), false); } } }; // Ustawienie manual acknowledgment channel.basicConsume(QUEUE_NAME, false, consumer); } private static void processOrder(String orderData) throws InterruptedException { // Symulacja czasochłonnej operacji Thread.sleep(3000); System.out.println(" [✓] Order processed: " + orderData); } }
Publish/Subscribe pattern
W tym wzorcu jedna wiadomość trafia do wielu konsumentów jednocześnie:
Publisher:
public class NewsPublisher { private static final String EXCHANGE_NAME = "news_broadcast"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // Deklaracja fanout exchange channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String[] newsTypes = {"SPORT", "TECH", "BUSINESS"}; for (String type : newsTypes) { String message = "Breaking news in " + type + " - Important update!"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); Thread.sleep(2000); } } } }
Subscriber:
public class NewsSubscriber { private static final String EXCHANGE_NAME = "news_broadcast"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // Tworzymy tymczasową kolejkę dla tego konsumenta String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for news. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received news: '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }
Integracja ze Spring Boot
Konfiguracja Spring Boot:
org.springframework.boot spring-boot-starter-amqp
Konfiguracja aplikacji:
# application.properties spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/
Configuration class:
@Configuration @EnableRabbit public class RabbitConfig { @Bean public Queue orderQueue() { return new Queue("order.processing", true); } @Bean public TopicExchange orderExchange() { return new TopicExchange("order.exchange"); } @Bean public Binding orderBinding() { return BindingBuilder .bind(orderQueue()) .to(orderExchange()) .with("order.#"); } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMessageConverter(new Jackson2JsonMessageConverter()); return template; } }
Monitorowanie i debugging
Management Console - kluczowe metryki:
- **Message rates:** ile wiadomości/sekundę
- **Queue length:** ile wiadomości czeka w kolejce
- **Consumer utilisation:** jak efektywnie konsumenci przetwarzają wiadomości
- **Connection details:** aktywne połączenia i ich stan
Monitoring przez kod:
// Sprawdzenie stanu kolejki AMQP.Queue.DeclareOk response = channel.queueDeclarePassive("order_processing"); int messageCount = response.getMessageCount(); int consumerCount = response.getConsumerCount(); System.out.println("Messages in queue: " + messageCount); System.out.println("Active consumers: " + consumerCount);
Najczęstsze problemy i rozwiązania
Typowe pułapki:
- Memory leak: Niezamknięte połączenia - zawsze używaj try-with-resources
- Message loss: Brak acknowledgment - wiadomości mogą zniknąć przy crashu
- Duplicate processing: Konsument przetwarza tę samą wiadomość wielokrotnie
- Dead letters: Wiadomości które nie mogą być przetworzone blokują kolejkę
Best practices:
- Zawsze używaj durable queues w produkcji
- Implementuj dead letter exchanges dla failed messages
- Monitoruj queue depth - długie kolejki oznaczają problemy
- Używaj connection pooling w aplikacjach wielowątkowych
- Testuj zachowanie przy restart RabbitMQ
RabbitMQ gdy nie potrzebujesz natychmiastowej odpowiedzi, chcesz oddzielić systemy od siebie, lub obsłużyć traffic bursts. REST API gdy potrzebujesz synchronicznej komunikacji i natychmiastowej odpowiedzi.
Durable queues i persistent messages przetrwają restart. Non-durable queues i transient messages zostaną utracone. Dlatego w produkcji zawsze używaj durable konfiguracji.
Skonfiguruj Dead Letter Exchange (DLExchange). Wiadomości które nie mogą być przetworzone trafią tam zamiast blokować główną kolejkę. Możesz je później przeanalizować i ponownie przetworzyć.
Teoretycznie bez limitu, ale praktycznie zależy od zasobów. RabbitMQ automatycznie dystrybuuje wiadomości między konsumentów (round-robin). Monitoruj throughput i dodawaj konsumentów stopniowo.
Tak, ale tylko w ramach jednej kolejki i jednego konsumenta. Z wieloma konsumentami kolejność może się zmienić. Jeśli kolejność jest krytyczna, używaj jednego konsumenta lub routing keys.
Utwórz dedykowanych użytkowników z ograniczonymi uprawnieniami, używaj SSL/TLS dla połączeń, skonfiguruj virtual hosts dla separacji, regularnie aktualizuj RabbitMQ i monitoruj logi bezpieczeństwa.
Przydatne zasoby:
🚀 Zadanie dla Ciebie
Stwórz system powiadomień dla e-commerce: jeden serwis publikuje zdarzenia (zamówienia, płatności), a różne konsumenty obsługują email, SMS, aktualizację magazynu. Użyj topic exchange do selective routing. Przetestuj co się dzieje gdy jeden z konsumentów się zawiesza.
Jakie były Twoje pierwsze doświadczenia z RabbitMQ? Czy udało Ci się skonfigurować komunikację między aplikacjami? Podziel się w komentarzu!