RabbitMQ – asynchroniczna komunikacja w praktyce

TL;DR: RabbitMQ to broker wiadomości który działa jak poczta między aplikacjami. Jedna aplikacja wysyła wiadomość do kolejki, druga ją odbiera – niezależnie od siebie i w swoim tempie. Idealne do systemów które muszą być skalowalne i odporne na awarie.

Wyobraź sobie e-commerce gdzie klient składa zamówienie. System musi wysłać email, zaktualizować magazyn, wystawić fakturę i powiadomić kuriera. Gdyby każda operacja blokowała poprzednią, użytkownik czekałby wieczność na potwierdzenie. RabbitMQ rozwiązuje ten problem przez asynchroniczną komunikację.

Dlaczego to ważne?

W realnych systemach aplikacje muszą komunikować się ze sobą, ale nie zawsze mogą czekać na natychmiastową odpowiedź. RabbitMQ umożliwia luźne połączenie między komponentami – jeśli jeden serwis jest przeciążony lub niedostępny, reszta systemu nadal działa.

Co się nauczysz:

  • Czym jest RabbitMQ i kiedy go używać
  • Podstawowe koncepty: kolejki, exchange, routing
  • Implementacja producenta i konsumenta w Javie
  • Wzorce komunikacji: work queues, publish/subscribe
  • Integracja z Spring Boot
  • Monitorowanie i zarządzanie kolejkami
Wymagania wstępne: Znajomość Javy i Spring Boot, podstawy programowania sieciowego, pojęcie wielowątkowości, doświadczenie z REST API.

Czym jest RabbitMQ?

RabbitMQ – broker wiadomości (message broker) implementujący protokół AMQP, który pośredniczy w komunikacji między aplikacjami przez kolejki wiadomości.
RabbitMQ to jak poczta w biurowcu. Aplikacje wysyłają listy (wiadomości) do skrzynek pocztowych (kolejek). Poczta (RabbitMQ) dostarcza listy do odpowiednich odbiorców, nawet jeśli akurat nie ma ich w biurze.

Podstawowe koncepty

Producer (Producent)

Aplikacja która wysyła wiadomości. W naszym e-commerce to może być serwis obsługujący zamówienia.

Queue (Kolejka)

Buffor przechowujący wiadomości. Wiadomości czekają w kolejce aż zostaną odebrane przez konsumenta.

Consumer (Konsument)

Aplikacja która odbiera i przetwarza wiadomości z kolejki.

Exchange

Komponent odpowiedzialny za routing wiadomości. Decyduje do której kolejki trafi wiadomość.

Typ ExchangeRoutingZastosowanie
DirectExact match kluczaPoint-to-point komunikacja
TopicPattern matchingSelective routing
FanoutBroadcast do wszystkichPublish/Subscribe
HeadersNa podstawie nagłówkówComplex routing rules

Instalacja i konfiguracja

Instalacja RabbitMQ (Ubuntu/Debian):

# Dodaj klucz i repozytorium
wget -O- https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc | sudo apt-key add -
echo "deb https://dl.bintray.com/rabbitmq/debian xenial main" | sudo tee /etc/apt/sources.list.d/bintray.rabbitmq.list

# Instalacja
sudo apt-get update
sudo apt-get install rabbitmq-server

# Uruchomienie i włączenie auto-start
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server

# Włączenie management plugin
sudo rabbitmq-plugins enable rabbitmq_management

Po instalacji Management Console jest dostępny pod adresem http://localhost:15672 (guest/guest)

Pierwszy przykład – Work Queue

Konfiguracja projektu Maven:


    
        com.rabbitmq
        amqp-client
        3.6.5
    
    
        org.slf4j
        slf4j-simple
        1.7.21
    

Producer – wysyłanie wiadomości:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class OrderProducer {
    private final static String QUEUE_NAME = "order_processing";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            
            // Deklaracja kolejki (tworzy jeśli nie istnieje)
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            
            // Symulacja zamówień
            for (int i = 1; i <= 10; i++) {
                String message = "Order #" + i + " - Processing required";
                
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
                
                Thread.sleep(1000); // Symulacja czasu między zamówieniami
            }
        }
    }
}

Consumer - odbieranie wiadomości:

import com.rabbitmq.client.*;

import java.io.IOException;

public class OrderConsumer {
    private final static String QUEUE_NAME = "order_processing";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        
        // Callback do przetwarzania wiadomości
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                     AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                
                System.out.println(" [x] Received '" + message + "'");
                
                try {
                    // Symulacja przetwarzania zamówienia
                    processOrder(message);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    // Potwierdzenie przetworzenia
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        
        // Ustawienie manual acknowledgment
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
    
    private static void processOrder(String orderData) throws InterruptedException {
        // Symulacja czasochłonnej operacji
        Thread.sleep(3000);
        System.out.println(" [✓] Order processed: " + orderData);
    }
}
Pro tip: Użycie basicAck zapewnia że wiadomość zostanie usunięta z kolejki dopiero po potwierdzeniu przetworzenia. Jeśli konsument się wysypie, wiadomość wróci do kolejki.

Publish/Subscribe pattern

W tym wzorcu jedna wiadomość trafia do wielu konsumentów jednocześnie:

Publisher:

public class NewsPublisher {
    private static final String EXCHANGE_NAME = "news_broadcast";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            
            // Deklaracja fanout exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            
            String[] newsTypes = {"SPORT", "TECH", "BUSINESS"};
            
            for (String type : newsTypes) {
                String message = "Breaking news in " + type + " - Important update!";
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
                Thread.sleep(2000);
            }
        }
    }
}

Subscriber:

public class NewsSubscriber {
    private static final String EXCHANGE_NAME = "news_broadcast";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        
        // Tworzymy tymczasową kolejkę dla tego konsumenta
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        
        System.out.println(" [*] Waiting for news. To exit press CTRL+C");
        
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                     AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received news: '" + message + "'");
            }
        };
        
        channel.basicConsume(queueName, true, consumer);
    }
}

Integracja ze Spring Boot

Konfiguracja Spring Boot:


    org.springframework.boot
    spring-boot-starter-amqp

Konfiguracja aplikacji:

# application.properties
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/

Configuration class:

@Configuration
@EnableRabbit
public class RabbitConfig {

    @Bean
    public Queue orderQueue() {
        return new Queue("order.processing", true);
    }

    @Bean
    public TopicExchange orderExchange() {
        return new TopicExchange("order.exchange");
    }

    @Bean
    public Binding orderBinding() {
        return BindingBuilder
                .bind(orderQueue())
                .to(orderExchange())
                .with("order.#");
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }
}
Uwaga: W produkcji zawsze używaj dedykowanych użytkowników zamiast domyślnego guest/guest. Guest ma dostęp tylko z localhost.

Monitorowanie i debugging

Management Console - kluczowe metryki:

- **Message rates:** ile wiadomości/sekundę
- **Queue length:** ile wiadomości czeka w kolejce
- **Consumer utilisation:** jak efektywnie konsumenci przetwarzają wiadomości
- **Connection details:** aktywne połączenia i ich stan

Monitoring przez kod:

// Sprawdzenie stanu kolejki
AMQP.Queue.DeclareOk response = channel.queueDeclarePassive("order_processing");
int messageCount = response.getMessageCount();
int consumerCount = response.getConsumerCount();

System.out.println("Messages in queue: " + messageCount);
System.out.println("Active consumers: " + consumerCount);

Najczęstsze problemy i rozwiązania

Typowe pułapki:

  • Memory leak: Niezamknięte połączenia - zawsze używaj try-with-resources
  • Message loss: Brak acknowledgment - wiadomości mogą zniknąć przy crashu
  • Duplicate processing: Konsument przetwarza tę samą wiadomość wielokrotnie
  • Dead letters: Wiadomości które nie mogą być przetworzone blokują kolejkę

Best practices:

Best practices 2016:

  • Zawsze używaj durable queues w produkcji
  • Implementuj dead letter exchanges dla failed messages
  • Monitoruj queue depth - długie kolejki oznaczają problemy
  • Używaj connection pooling w aplikacjach wielowątkowych
  • Testuj zachowanie przy restart RabbitMQ
Kiedy używać RabbitMQ zamiast REST API?

RabbitMQ gdy nie potrzebujesz natychmiastowej odpowiedzi, chcesz oddzielić systemy od siebie, lub obsłużyć traffic bursts. REST API gdy potrzebujesz synchronicznej komunikacji i natychmiastowej odpowiedzi.

Co się dzieje gdy RabbitMQ się restartuje?

Durable queues i persistent messages przetrwają restart. Non-durable queues i transient messages zostaną utracone. Dlatego w produkcji zawsze używaj durable konfiguracji.

Jak obsłużyć failed messages?

Skonfiguruj Dead Letter Exchange (DLExchange). Wiadomości które nie mogą być przetworzone trafią tam zamiast blokować główną kolejkę. Możesz je później przeanalizować i ponownie przetworzyć.

Ile konsumentów może obsługiwać jedna kolejka?

Teoretycznie bez limitu, ale praktycznie zależy od zasobów. RabbitMQ automatycznie dystrybuuje wiadomości między konsumentów (round-robin). Monitoruj throughput i dodawaj konsumentów stopniowo.

Czy RabbitMQ gwarantuje kolejność wiadomości?

Tak, ale tylko w ramach jednej kolejki i jednego konsumenta. Z wieloma konsumentami kolejność może się zmienić. Jeśli kolejność jest krytyczna, używaj jednego konsumenta lub routing keys.

Jak zabezpieczyć RabbitMQ w produkcji?

Utwórz dedykowanych użytkowników z ograniczonymi uprawnieniami, używaj SSL/TLS dla połączeń, skonfiguruj virtual hosts dla separacji, regularnie aktualizuj RabbitMQ i monitoruj logi bezpieczeństwa.

Przydatne zasoby:

🚀 Zadanie dla Ciebie

Stwórz system powiadomień dla e-commerce: jeden serwis publikuje zdarzenia (zamówienia, płatności), a różne konsumenty obsługują email, SMS, aktualizację magazynu. Użyj topic exchange do selective routing. Przetestuj co się dzieje gdy jeden z konsumentów się zawiesza.

Jakie były Twoje pierwsze doświadczenia z RabbitMQ? Czy udało Ci się skonfigurować komunikację między aplikacjami? Podziel się w komentarzu!

Zostaw komentarz

Twój adres e-mail nie zostanie opublikowany. Wymagane pola są oznaczone *

Przewijanie do góry