Dobry, zły i brzydki – Parę słów o Pandas UDF w Apache Spark
02.11.2020 | Krzysztof Domogała
Wstęp
Apache Spark to potężne narzędzie open-source do przetwarzania dużych zbiorów danych. Zasada działania opiera się o wykonywanie rozproszonych operacji w pamięci komputerów, pracujących jako jeden klaster obliczeniowy. Może to być architektura oparta o Mesosa, Hadoopa lub (od niedawna) Kubernetesa. Dzięki swojej szybkości oraz oferowanym możliwościom zyskał ogromną popularność i stał się jednym z podstawowych narzędzi Big Data. Dodatkowym atutem Sparka jest jego rozbudowane API dostępne nie tylko w Scali (w którym jest natywnie napisany) ale również w Pythonie, Javie czy R. Samo API pomiędzy językami różni się niewiele, co pozwala teoretycznie prototypować aplikację w jednym języku, a implementować produkcyjnie w drugim.
Rysunek 1. Czy jesteś w stanie rozpoznać w jakich językach napisano powyższe przykłady?
Czy Spark to tylko ETLe, przetwarzania i inne złożone transformacje danych? Nie.
Machine Learning spotyka Big Data
Pewną już oczywistością w dzisiejszych czasach jest ścisłe powiązanie dużych zbiorów danych z nauczaniem maszynowym. Często bez odpowiednio dużego (lub różnorodnego) zbioru danych nie jesteśmy w stanie wytrenować satysfakcjonującego modelu. Z myślą o takich potrzebach twórcy Sparka stworzyli komponent MLlib, który udostępnia wybrane algorytmy Machine Learning.
Rysunek 2. Na cały framework Sparka składa się kilka komponentów (źródło: spark.apache.org).
Niestety, nie zawsze algorytmy natywnie dostępne w Sparku są wystarczające do rozwiązania naszego problemu. Innym razem chcemy dokonać tylko predykcji przy użyciu już wytrenowanego modelu (pochodzącego np. z biblioteki Scikit-learn). Możemy oczywiście próbować pobrać wszystkie dane przy pomocy metody `.toPandas()`, a następnie potrzebne operacje wykonać lokalnie, ale mija się to z celem używania Sparka, gdyż tracimy cały zysk wynikający z rozproszonego przetwarzania.
Część potrzeb można zaadresować korzystając z Daska, dzięki któremu możemy również rozpraszać swoje przetwarzania i wykonywać je w pamięci. Niestety łatwość korzystania musimy czasem przypłacić trudnymi do zaadresowania wyciekami pamięci (https://github.com/dask/distributed/issues/3096) lub (jak w przypadku Dask YARN) różnymi innymi bolączkami takimi jak gorsza integracja z HDFS czy Hive.
Iskrzący wąż
Do niedawna łączenie kodu Pythonowego ze Sparkiem pozostawiało wiele do życzenia. Owszem, API RDD pozwalało na definiowanie własnych przetwarzań, ale znacznym kosztem czasu przetwarzania. Częściowo ten problem rozwiązywało API SQL (tzw. Spark SQL), które wprowadziło obiekty DataFrame, których przetwarzanie jest optymalizowane przez specjalny optymalizator kosztowy (https://databricks.com/glossary/catalyst-optimizer).
Rysunek 3. Różnica w czasie przetwarzania pomiędzy RDD a DataFrame (DF) w przypadku języka Python oraz Scala (Źródło: https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html).
Czy to oznaczało koniec problemów? Niestety nie, w sytuacji, gdy niezbędne było wykonanie operacji niedostępnej natywnie w Spark SQL, wtedy konieczne było definiowanie własnej transformacji jako UDF (User Defined Function) lub powrót do RDD. Oznaczało to niestety wzrost czasu przetwarzania. Pewnym obejściem tego problemu było pisanie własnych rozszerzeń w Scali i następnie wywoływanie ich w Pythonie ale ilu użytkowników Pythona zna biegle Scalę?
Przyczyna całego problemu leży w architekturze PySparka. Jak pokazuje schemat (Rysunek 4), każdy Spark Worker powołany przez Driver inicjalizuje instancje Pythona, z którymi się komunikuje poprzez Py4J. Generuje to narzut związany z serializajcą oraz deserializacją danych. Z tego względu łączenie Pythona ze Sparkiem nie było rekomendowane przez społeczność Sparka jako dobre rozwiązanie dla bardzo dużych zbiorów danych.
Rysunek 4. Architektura PySparka (źródło: https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals).
Strzała przybywa na ratunek
Twórcy Sparka na prośbę społeczności podjęli próbę zaadresowania tego problemu poprzez modyfikację sposobu komunikacji Sparka z Pythonem. Pomocny okazał się projekt Apache Arrow, którego celem jest stworzenie ustandaryzowanego, niezależnego od języka programowania, formatu reprezentacji danych w pamięci (więcej na temat projektu można znaleźć na oficjalnej stronie projektu: https://arrow.apache.org/). Dzięki takiemu rozwiązaniu, Spark oraz Python mogą ze sobą się komunikować poprzez obiekty, które nie wymagają kosztownej serializacji oraz deserializacji.
Nowe rozwiązanie jest dostępne tylko w ramach API Spark SQL i jest ściśle powiązane z obiektami spark.sql.DataFrame. Nowy sposób definiowania własnych transformacji został nazwany jako Pandas UDF, ponieważ po stronie Pythona opiera się na obiektach pochodzących z popularnej biblioteki Pandas.
Na papierze wszystko wygląda dobrze, ale czy faktycznie tak jest? Jak pokazują wykresy (Rysunek 5) zysk jest tym większym, im bardziej złożony jest problem.
Rysunek 5. Porównanie szybkości działania różnych funkcji w dwóch implementacjach: zwykły UDF vs Pandas UDF (Źródło: https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html).
Szybkość to nie jedyna różnica między Pandas UDF a klasycznym UDF. Dodatkowo Pandas UDF występuje w dwóch formach: Scalar oraz Grouped Map.
Pandas UDF Scalar ma zastosowanie identyczne jak standardowy UDF – służy do wykonania operacji w celu uzyskania nowej kolumny poprzez wykonanie operacji na innych kolumnach. Liczba powstałych wierszy musi być taka sama jak w przypadku danych wejściowych. A co się dzieje pod maską? Wyobraźmy sobie, że nasz spark.sql.DataFrame składa się z 3 partycji (Rysunek 6), utworzonych przez nas ze względu na jakiś warunek np. kategorię sklepu. Ze względu na specyfikę danych, powstałe partycje są różnego rozmiaru. Dane są przekazywane do Pythona w paczkach, których maksymalna wielkość jest określona przez parametr spark.sql.execution.arrow.maxRecordsPerBatch. O ile paczki nie podlegają dodatkowej operacji shuffle, o tyle mogą zawierać rekordy z kilku partycji na raz, ponieważ Spark zawsze stara się wysyłać paczki o maksymalnej wielkości. Dlatego, chcąc np. aplikować współczynnik zależny od rodzaju sklepu, należy rodzaj sklepu weryfikować dla każdego wiersza osobno.
Rysunek 6. Schemat ilustrujący zazębianie się partycji podczas używania Pandas UDF – Scalar.
Pandas Grouped Map daje nam możliwość wykonania zdefiniowanej przez nas operacji na danych pogrupowanych metodą .groupBy(). Dzięki temu możemy wykonać operacje na wszystkich danych tej samej kategorii. Co ciekawe, tym razem nie zwracamy pojedynczej kolumny lecz cały DataFrame. Dzięki temu użytkownik ma pełną swobodę co do liczby kolumn oraz wierszy. Takie rozwiązanie było wcześniej niedostępne w API Spark SQL i jedyną alternatywą było użycie API RDD.
Rysunek 7. Grouped Map gwarantuje nam, że w danej paczce będą dane z tylko jednej partycji.
Jak korzystać z Pandas UDF?
Teoria, teorią, ale zobaczmy jak w praktyce korzysta się z Pandas UDF. Na początek, potrzebujemy dodatkowej zależności w postaci biblioteki pyArrow, która musi być dostępna na wszystkich node’ach. Możemy to zrobić ręcznie instalując bibliotekę na każdej maszynie lub wysyłając conda environment podczas inicjalizacji Sparka. Oczywiście nic nie stoi na przeszkodzie, aby w fazie eksperymentowania robić wszystko lokalnie.
Mając przygotowane środowisko możemy przystąpić do pisania kodu. Każdy z rodzajów Pandas UDF charakteryzuje się pewnymi niuansami, które postaram się wyjaśnić.
Scalar
Ten typ jest koncepcyjnie najprostszy oraz najłatwiejszy do użycia, ponieważ jak już wspomniano, jest podobny do zwykłego UDF.
Załóżmy, że chcielibyśmy dokonać konwersji współrzędnych geograficznych na Open Location Code (https://en.wikipedia.org/wiki/Open_Location_Code) przy użyciu biblioteki autorstwa Google (https://github.com/google/open-location-code/). W przypadku zwykłego UDF nasza definicja funkcji wyglądałaby mniej więcej tak:
W przypadku Pandas UDF musimy nieco więcej napisać:
Jak widać sposób wywołania obu UDF jest identyczny - jako parametr przekazujemy nazwy kolumn. Wewnątrz funkcji wartości są przekazywane jako pandas.Series, którymi możemy swobodnie operować wewnątrz funkcji. Jak widać, tego typu funkcję można spokojnie prototypować lokalnie, poza Sparkiem.
Jak w naszym przypadku wyszło porównanie szybkości pokazuje poniższy wykres (Rysunek 8.).
Rysunek 8. Porównanie szybkości transformacji szerokości geograficznej do Open Location Code dla 200 tys. rekordów.
Grouped Map
Jak już wcześniej wspomniałem, ten typ Pandas UDF przyda się wszędzie tam, gdzie do wykonania operacji potrzebujemy wszystkich danych. Może to być wyliczenie złożonej miary, wyrysowanie wykresu, a nawet wytrenowanie modelu! Grouped Map zawsze będziemy używać poprzez metodę .apply() dostępną po wywołaniu metody .groupBy()
Przyjmijmy, że mamy listę klientów. Każdy klient ma tylko jeden wpis, a każdy wpis zawiera m.in. pole z całkowitą liczbą wystawionych recenzji oraz pole ze średnią oceną ze wszystkich opinii klienta. Następnie, grupujemy klientów ze względu na średnią ocenę (zaokrągloną do liczby całkowitej), żeby w kolejnym kroku dla każdej takiej grupy obliczyć liczebność oraz wygenerować histogram przedstawiający rozkład liczby recenzji. Jak to zrobić jedną funkcją?
Na początku musimy zdefiniować schemat nowej struktury, ponieważ zwracanym obiektem nie jest tym razem pojedynczy pandas.Series lecz pandas.DataFrame, który następnie staje się pyspark.sql.DataFrame. W omawianym przykładzie planujemy zwrócić średnią ocenę (nasze pole grupujące) oraz liczbę użytkowników:
Mając już ustalony typ, możemy zdefiniować funkcję, która będzie się wykonywała na workerach:
Jak widać, tym razem do zdefiniowania użyto dekoratora – kwestią indywidualną jest w jaki sposób definiujemy Pandas UDF. Nasza funkcja jako argument przyjmuje DataFrame typu pandasowego i również taki DataFrame zwraca. Dzięki temu, ponownie jak w przypadku Pandas UDF typu Scalar, możemy naszą funkcję testować lokalnie przed użyciem w Sparku.
Jak widać, wewnątrz funkcji generujemy i zapisujemy na dysku wykres oraz zliczamy liczbę użytkowników. Uwagę może przykuć wysłanie zapisanego wykresu na HDFS. Musimy to zrobić, ponieważ Spark uruchomiony w trybie rozproszonym powołuje workery na różnych maszynach, do których najczęściej nie mamy bezpośredniego dostępu. Dodatkowo, wszelkie lokalne pliki stworzone podczas przetwarzania ulegają usunięciu wraz z zakończeniem się aplikacji Sparkowej. Z tego względu musimy we własnym zakresie zadbać o to, żeby wygenerowane pliki znalazły się w dostępnym dla nas miejscu – w tym przypadku jest to HDFS, ale może to być dowolny inny zasób. Tego problemu oczywiście nie mamy, gdy rezultaty zwracamy w ramach DataFrame.
Użycie napisanej funkcji sprowadza się do prostego wywołania:
Sky is the limit?
Niestety korzystając z Pandas UDF należy zwrócić uwagę na kilka aspektów technicznych. Po pierwsze, jeżeli używamy biblioteki PyArrow >= 0.15.0 musimy pamiętać o ustawieniu zmiennej środowiskowej ARROW_PRE_0_15_IPC_FORMAT=1. Najlepiej zrobić to poprzez konfigurację samego Sparka:
Drugą, bardziej problematyczną kwestią jest ograniczenie rozmiaru paczki danych do 2 GB, które do niedawna było obecne w bibliotece Arrow https://issues.apache.org/jira/browse/ARROW-4890. Największe konsekwencje tego ograniczenia widoczne są podczas używania Grouped Map, ponieważ wtedy dochodzi do przesłania dużych paczek danych. Niestety tym problemem obarczone są wszystkie (na moment pisania) wersje Sparka.
Praktyczne zastosowanie
Pandas UDF znalazł swoje praktyczne zastosowanie w ING jako metoda do zrównoleglenia treningu kilkudziesięciu modeli dla różnych kategorii danych (Rysunek 9). Każdy executor poprzez gridsearch znajduje najbardziej optymalny model, którego parametry są zwracane jako DataFrame. Wytrenowane modele są zapisywane na HDFS. Oprócz tego, Pandas UDF wykorzystywany jest do uzyskiwania predykcji, rysowania wykresów oraz wykonywania zaawansowanych operacji na kolumnach.
Rysunek 9. Schemat przedstawiający architekturę wykorzystywaną w ING do produkcyjnego trenowania kilkudziesięciu modeli w jednym czasie na wielu maszynach.
Apache Spark to niewątpliwie użyteczne narzędzie, które w ING jest częścią istotnych procesów oraz należy do zbioru podstawowych narzędzi Data Scientistów. Dlatego z niecierpliwością oczekujemy wersji 3.0, żeby móc w boju przetestować nie tylko nowości, ale również poprawki i usprawnienia już istniejących funkcjonalności.