Hazelcast - coś więcej niż rozproszony cache

19.08.2019 | Wojciech Kurda

Artykuł możecie znaleźć również w lipcowym wydaniu Magazynu Programista.

Wstęp

Przetwarzanie równoległe i konieczność synchronizacji wątków nigdy nie była najprostszym zadaniem. Sprawa komplikuje się jeszcze bardziej, gdy konieczność synchronizacji nie dotyczy pojedynczej instancji aplikacji, a całego klastra. W poniższym artykule zaprezentuję, jak produkt pozornie zupełnie innego przeznaczenia może rozwiązać wspomniane zagadnienie.

Use case

Zacznijmy od podstawowych założeń. System przetwarza różnego typu zdarzenia. Każde zdarzenie przypisane jest do logicznej grupy zdarzeń. W jednej chwili w ramach danej grupy powinno być przetwarzane co najwyżej jedno zdarzenie (ale oczywiście dobrzy by było, gdyby zdarzenia z dwóch różnych grup mogły być przetwarzane równolegle). Na początek wystarczy.

Najprościej – powołać dedykowany wątek dla każdej grupy zdarzeń. Ten będzie blokował swoją grupę (można użyć albo dedykowanego dla grupy locka, albo wspólnej mapy, konkretnie IMap, i blokować klucz – wtedy w wartości dla klucza można trzymać jakąś wartość, może się przydać). Samo źródło zdarzeń to naturalnie kolejka, Hazelcast dostarcza implementację BlockingQueue, która w tym przypadku idealnie się sprawdza, bo wątek może czekać na zdarzenia. Z dobrych praktyk – najlepiej ograniczać czasy blokad i czekania na eventy i wprowadzić mechanizm heartbeatów. Wtedy wiadomo, że system działa i nie mamy do czynienia z deadlockiem. Czasy oczywiście trzeba dobrać do swoich potrzeb biznesowych (ile dany typ zdarzenia może maksymalnie się przetwarzać).

Żeby nie przedłużać, poniżej przykładowy kod (oczywiście odpowiednio uproszczony i okrojony ze zbędnych sekcji):

Listing 1. Przykładowa implementacja procesora zdarzeń

public class EventProcessor implements Runnable {
    // oczywiscie produkcyjnie trzeba odpowiednio zarzadzac instancjami 
    protected HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
    protected String eventGroup = "...";
    protected BlockingQueue queue = hazelcastInstance.getQueue(eventGroup);
    protected IMap locks = hazelcastInstance.getMap("locks");

    @Override
    public void run() {
        while (running) {
            // lock na grupe, na czas maksymalnego czekania + maksymalnego przetwarzania
            locks.lock(eventGroup, MAX_POLL_AWAIT + MAX_PROCESSING_TIME, TimeUnit.MILLISECONDS);
            try {
                Object event = queue.poll(MAX_POLL_AWAIT, TimeUnit.MILLISECONDS);
                if (event != null) {
                    process(event);
                }
                heartbeat();
            } catch (Exception e) {
                // log 
            } finally {
                try {
                    locks.unlock(eventGroup);
                } catch (IllegalMonitorStateException e) {
                    // log
                }
            }
        }
    }
}

Use case - zaawansowany

Pewnie dla większości przypadków powyższy przykład jest w zupełności wystarczający (i dobrze, bo po co komplikować), ale pójdźmy krok dalej. Istnieje dedykowana grupa zdarzeń, które wymagają zablokowania przetwarzania wszystkich innych zdarzeń (w ramach całego klastra aplikacyjnego). Gdy wpada takie zdarzenie, system nie powinien pozwalać na rozpoczęcie przetwarzania innych zdarzeń, ale z drugiej strony powinien zaczekać, aż aktualnie trwające się zakończą.

Na pomoc przychodzą locki i semafory (konkretnie ILock i ISemaphore). Idea jest prosta, procesory zdarzeń, poza blokowaniem swoich grup, próbują zmniejszyć wartości globalnego semafora (czego nie są w stanie zrobić, gdy globalna blokada jest założona). Z drugiej strony dedykowany globalny procesor po uzyskaniu blokady czeka, aż wartości semafora wrócą do stanu początkowego (czyli wszystkie pozostałe procesory zakończą swoje przetwarzania). W tym celu zdefiniowany został interfejs GlobalLock, rozszerzający standardowy interfejs Lock o:

  • boolean isReady() – gotowość semafora,
  • boolean tryAcquirePreventionSemaphore() – próba zmniejszenia wartości semafora, o ile lock na to pozwala,
  • void releasePreventionSemaphore() – zwiększenie wartości semafora,
  • boolean isLocked() – czy założona jest globalna blokada.

Dla czytelności nie załączam pełnych źródeł implementacji GlobalLocka – co w znacznej większości jest po prostu delegowaniem wywołań do dedykowanego ILocka'a i ISemaphore'a (i tu też warto pamiętać o maksymalnych czasach blokad – szczególnie że blokują cały system).

Kolejnym krokiem jest jego użycie w procesorze zdarzeń. Z uwagi na szczególną obsługę wyróżniona została dedykowana klasa obsługująca zdarzenia globalne. Nie przeciągając – przykładowy kod z komentarzami:

Listing 2. Przykładowa implementacja rozszerzonego procesora zdarzeń

public class EventProcessor implements Runnable {
    @Override
    public void run() {
        while (running) {
            locks.lock(eventGroup, MAX_POLL_AWAIT + MAX_PROCESSING_TIME, TimeUnit.MILLISECONDS);
            try {
                if (!globalLock.tryAcquirePreventionSemaphore()) {
                    // globalne przetwarzanie z reguly trwa dluzej
                    // dobrze chwile poczekac
                    Thread.sleep(...);
                } else {
                    try {
                        // tu po staremu
                        Object event = queue.poll(MAX_POLL_AWAIT, TimeUnit.MILLISECONDS);
                        if (event != null) {
                            process(event);
                        }
                    } finally {
                        globalLock.releasePreventionSemaphore();
                    }
                }
                heartbeat();
            } catch (Exception e) {
                // log
            } finally {
                try {
                    locks.unlock(eventGroup);
                } catch (IllegalMonitorStateException e) {
                    // log
                }
            }
        }
    }
}

Listing 3. Przykładowa implementacja procesora zdarzeń globalnych

public class GlobalEventProcessor extends EventProcessor {
    @Override
    public void run() {
        while (running) {
            // dedykowana globalna grupa
            locks.lock(eventGroup, MAX_POLL_AWAIT + MAX_GLOBAL_PROCESSING_TIME, TimeUnit.MILLISECONDS);
            try {
                // logika swiadomie odwrocona
                // po co globalnie blokowac, skoro nie ma czego
                // przetwarzac?
                Object event = queue.poll(MAX_POLL_AWAIT, TimeUnit.MILLISECONDS);
                if (event != null) {
                    globalLock.lock();
                    try {
                        awaitExecutionReadiness();
                        process(event);
                    } catch (Exception e) {
                        // log
                    } finally {
                        globalLock.unlock();
                    }
                }
                heartbeat();
            } catch (Exception e) {
                // log
            } finally {
                try {
                    locks.unlock(eventGroup);
                } catch (IllegalMonitorStateException e) {
                    // log
                }
            }
        }
    }

    private void awaitExecutionReadiness() throws InterruptedException {
        // semafor moze zwolnic inna instancje aplikacji
        // wiec wait/notify nie wystarcza // z drugiej strony, po co komplikować?
        while (!globalLock.isReady()) {
            Thread.sleep(...);
        }
    }
}

Patrząc na całość z perspektywy czasu (każdy kod po jakimś czasie nadaje się do refactoringu), można by spróbować wyodrębnić abstrakcję, która wysokopoziomowo oferowała wyłącznie blokady kolejek, a na podstawie grupy zdarzenia odpowiednio weryfikowała i blokowała globalne locki i semafory. Jednak w myśl zasady od szczegółu do ogółu (albo bardziej agile’owo, MVP) – lepiej obsłużyć konkretny przypadek, a dopiero później wyodrębniać generyczne mechanizmy.

Lesson learned

Na koniec poimplementacyjne lesson learned – może się przydać w podobnych (i nie tylko) przypadkach.

Produkowanie zdarzeń oparłem o mechanizm Springowych eventów, wyzwalanych przez dedykowany EntityListener, w fazach PostPersist i PostUpdate. Hazelcast na (nie)szczęście działa szybko – druga instancja aplikacji odbierała zdarzenie znacznie szybciej niż produkująca commitowała transakcję. Skutkowało to brakiem danych przy obsłudze – bo w tym przypadku zdarzenie jest tylko notyfikacją i musi być odpowiednio wzbogacone (też dla zachowania integralności). Na szczęście Spring, poza standardowym EventListener’em, dostarcza dedykowany TransactionalEventListener – gdzie można reagować na konkretne fazy transakcji (w tym przypadku TransactionPhase.AFTER_COMMIT). W ten sposób udało się uniknąć Thread.sleep’a przy obsłudze ;)

Hazelcast - może coś jeszcze?

Tak zadane pytanie w dość oczywisty sposób podpowiada – jak najbardziej. Na szczególną uwagę zasługuje wzbogacony interfejs mapy – IMap. Poza wspomnianymi lockami na konkretny klucz możliwa jest rejestracja tzw. EntryListener’ów, które pozwalają na asynchroniczną reakcję na zdarzenie na mapie (np. dodanie czy usunięcie danego klucza). Takie podejście znacznie ułatwia pisanie reaktywnych aplikacji (ale to już temat na osobny artykuł).

Podsumowując, dobrze czasem spróbować na nowo odkryć potencjalnie znaną technologię – może pozytywnie zaskoczyć.