Overslaan en naar de inhoud gaan

đź“… Schrijf je in voor de Masterclass Power BI report design op 30 oktober

Annelijn
23-6-2023 - 5 min

Van een data warehouse met T-SQL naar een Lake house met PySpark

Een start met PySpark in Azure Databricks vanuit een Data Engineering rol

Dit jaar ben ik begonnen bij een nieuwe klant. Bij deze klant is de ELT-code geschreven in PySpark binnen Azure Databricks. Azure Data Factory gebruiken wij voor de orkestratie van databewerkingen.

Ik had nog geen ervaring met PySpark of met Azure Databricks. Ik had wel al enkele cursussen gevolgd van Python via DataCamp, maar toch merkte ik al snel dat PySpark net even anders werkt.

Zo worden er bijvoorbeeld andere libraries gebruikt en is de syntax soms net even anders. PySpark kan je zien als een dialect van Python. Als je al ervaring hebt met Python, dan zul je ongetwijfeld ook snel aan de gang kunnen gaan met PySpark.

Ik heb via Advancing Analytics een Spark cursus gevolgd om de onder andere de basis van PySpark te leren. Deze was erg waardevol voor mij, omdat er goed wordt uitgelicht wat de best practices zijn bij bepaalde scenario’s op het gebied van Data & Analytics. Als je al een achtergrond hebt in SQL zul je merken dat PySpark snel onder de knie krijgt.

In deze blog geef ik wat tips om de basis van PySpark en Azure Databricks beter te begrijpen.

Azure Databricks – Workspaces & Repos

In Databricks zijn er verschillende manieren en plekken waar je je code kan schrijven en opslaan. Code schrijf je in Notebooks. Dit kan je doen vanuit een:

  • Workspace
  • Repo

Workspace

Een Workspace (werkruimte) biedt een geĂŻntegreerde notebook-omgeving waarin je code kunt schrijven in verschillende talen zoals PySpark, Scala en SQL. Met de werkruimte kun je eenvoudig notebooks maken, bewerken en delen met je teamleden.

Repo

Databricks ondersteunt ook de samenwerking door middel van versiebeheer, zodat je wijzigingen in de code kunt bijhouden en kunt terugkeren naar eerdere versies indien nodig. Je werkt dan vanuit een Repo

Databricks workspace screenshot 1

Vanuit een Repo maak je een een eigen branch (vertakking) aan, waardoor er een vertakking ontstaat van de main vertakking.

Vanuit deze vertakking kan je nieuwe code ontwikkelen of bestaande code wijzigen. Als je code af is, dan kan je dit direct committen in je vertakking en pushen naar DevOps.

Pull requests

In DevOps maak je vervolgens je pull request aan (dit kan niet vanuit Databricks). Zodra je wijzigingen zijn uitgerold naar de betreffende omgeving staat je code in de Workspace van die omgeving. Dit is de live locatie vanwaar de notebooks worden aangeroepen vanuit Azure Data Factory. In DevOps kan dit bijvoorbeeld met behulp van Azure Pipelines.

Medaillion architecture

Bij mijn klant hebben we gekozen om onze Notebooks in een hoofdmap “Notebooks” te plaatsen met vervolgens daarin in verschillende sub mappen, zodat duidelijk is wat waar staat. Zoals je kunt zien maken wij gebruik van de Medaillon Architecture (voor meer info zie What is a Medallion Architecture?).

In de map Libraries staan Notebooks met logica die in meerdere notebooks worden aangeroepen, zodat je deze code niet steeds opnieuw voluit hoeft uit te schrijven. Denk hierbij voorbeeld aan een merge statement die je in meerdere tabellen gebruikt.

De Workspace is dus waar de code Live staat, het is niet de bedoeling dat je hier direct wijzigingen in aan brengt. Wel heb je in de Workspace je persoonlijke map staan waar je ad-hoc kan werken. Handig als je bijvoorbeeld de datakwaliteit wilt controleren of als je een ad-hoc-analyses moet doen. Zoals je hieronder kan zien heb ik verschillende notebooks aangemaakt, allemaal met een verschillend doeleinden.

Databricks workspace screenshot 2

De basis, werken met DataFrames – net geen tabel

Als je met PySpark gaat werken dan zul je regelmatig DataFrames maken. Een DataFrame klinkt als een tabel maar dat is het niet. Eigenlijk is een DataFrame gewoonweg een query. In de allereerste DataFrame lees je de data met behulp van het commando “spark.read.load…”

Daarna breid je de DataFrame, ofwel je query, steeds meer uit door transformaties en bewerkingen te doen, bijvoorbeeld door het toevoegen van kolommen.

Maar dan is het dus nog steeds een query. Steeds als je bewerkingen toevoegt aan een DataFrame, ken je deze toe aan een nieuwe DataFrame. Je kan ervoor kiezen om dezelfde DataFrame naam aan te houden, daarmee overschrijf je dus in feite je bestaande DataFrame.

Een voorbeeld, in onderstaand voorbeeld lees ik een test.csv bestand in en wijs deze toe aan een DataFrame genaamd “df”. Vervolgens voeg ik een extra kolom toe; “ColumnB” en vul deze met een default waarde “B” en wijs deze opnieuw toe aan een DataFrame genaamd “df”.

Als je nu het commando display(df) uitvoert zie je resultaat terug inclusief de extra toegevoegde kolom, ofwel de laatste versie van “df”.

pyspark query voorbeeld 1

Maar, stel ik wijs het DataFrame met de extra kolom “B” toe aan een nieuw DataFrame “dfB” toe, zoals hieronder, dan zie je de extra toegevoegde kolom alleen als je “dfB” opvraagt. Als je het eerdere DataFrame “df” opvraagt, dan is dit zonder de toegevoegde kolom.

pyspark query voorbeeld 2

Je zal merken dat wanneer je je DataFrame blijft uitbreiden, door steeds weer nieuwe bewerkingen toe te voegen en toe te wijzen aan nieuwe DataFrames, dat het uitvoeren van deze stukken code erg snel is. Dat komt omdat de bewerkingen nog niet worden uitgevoerd, je breidt immers alleen je query uit en zegt tegen Spark dat hij dit moet onthouden. Je vraagt nog niet het resultaat op.

Eigenlijk is het dus één grote query maar dan in stukjes opgedeeld, wat je code een stuk overzichtelijker maakt. En pas als je het resultaat van alle voorgaande bewerkingen wilt zien, door middel van display(dataframe), dan zal je ook zien dat het wat meer tijd kost omdat dan pas alle bewerkingen worden uitgevoerd wat Spark heeft onthouden. Alle code daarvoor is simpelweg een update van de query, je zegt dan tegen Spark “breid mijn query hiermee uit, maar vóer het nog niet uit”.

Je kan dit vergelijken door het maken van een T-SQL View in Microsoft SQL Server. Bij het maken van zo een View verwerk je alle benodigde logica. Het opslaan van deze view gaat erg snel (net zoals bewerkingen toevoegen aan een DataFrame). Maar zodra je een Select statement uitvoert op je view, dan pas wordt alle logica in de view uitgevoerd en zal dit doorgaans wat langer duren.

Pas nadat je een dataframe hebt weggeschreven naar bijvoorbeeld Delta , dan pas is het daadwerkelijk een tabel.

Welke taal te gebruiken, to SQL or not to SQL

Je kan verschillende talen gebruiken in Azure Databricks; SQL, PySpark, Scala en R.

Maar welke taal moet je nou eigenlijk gebruiken? Het antwoord daarop is dat dat niets uitmaakt. Onder de motorkap vertaalt Spark het naar dezelfde taken. Een taal is niet sneller dan de andere, dit is dus een persoonlijke voorkeur. Wat wel verschilt zijn de beschikbare functies binnen een specifieke taal.

Tot nu toe is mijn voorkeur om alle logica in PySpark te doen. Binnen een SQL-omgeving zoals Microsoft SQL Server gebruik je daar Stored Procedures voor, maar dat kan niet in Databricks. Je kan wel de code van een Stored Procedure in een notebook zetten, maar naar mijn mening is de code om databewerkingen te doen in PySpark overzichtelijker door onder andere het gebruik van DataFrames.

Voor analyses en controle van de datakwaliteit op de tabellen in gebruik ik het liefste SQL omdat ik dat makkelijker vind voor dat soort bevragingen, omdat dat meestal adhoc vragen zijn en ik heb met SQL dan toch wel nog de meeste ervaring. Dat kan ook prima met PySpark, Scala of R.

Parquet versus Delta

Delta

In Databricks kan je onder andere data wegschrijven naar Delta. Je kan ook data wegschrijven naar een Parquet tabel. Wat is dan het verschil? Het antwoord is vrij simpel; Delta is een uitbreiding van het Parquet-bestandsformaat met extra functionaliteit zoals transacties, gegevensbeheer en flexibele schema-evolutie. Delta introduceert feitelijk dus het concept van tabellen, maar dan binnen een data lake, zoals wij deze in de SQL-wereld al langer kennen.

Parquet

Parquet is een columnar-bestandsformaat dat is geoptimaliseerd voor efficiënte opslag en verwerking van grote datasets. Het comprimeert gegevens en slaat ze op in een columnar-indeling, waardoor het lezen en schrijven van gegevens efficiënter wordt. Parquet bestanden staan fysiek in een storage container (data lake).

Je kan in de Azure portal naar deze map en de Parquet bestanden navigeren, maar helaas kan je uit zo'n individueel bestand niet opmaken wat voor data er precies in zit of uit wat voor structuur de tabellen bestaan. Dit is dus een beetje een black box. Je zal dus altijd een vertaler/tool nodig hebben om de data te kunnen inzien. Belangrijk om te onthouden is dat als je een query uitvoert op een Delta tabel, dat je de query uitvoert op de map en niet op een specifiek bestand. Spark kijkt dan zelf welke parquet bestanden benodigd zijn voor het weergeven van het juiste resultaat.

Je kan ook een partitionering meegeven bij het wegschrijven van de data naar een Delta tabel. Er worden dan automatisch sub mappen gemaakt binnen de tabel-map. Dit is handig als je weet dat de data bijvoorbeeld vaak gefilterd wordt op jaartal. Als je dan in de partitionBy() clausule meegeeft op welke kolom er moet worden gepartitioneerd (bijvoorbeeld “Year”), dan maakt Spark voor jou de sub mappen aan en schrijft de bestanden naar de bijbehorende mappen weg. Bij het lezen van de data worden dan niet álle mappen met daarin de bijbehorende bestanden gelezen, maar alleen in het de sub map met het betreffende jaartal.

Tot slot

Natuurlijk is er nog veel meer te vertellen over PySpark en Databricks. Leuk ook dat deze technologieën zich blijven ontwikkelen, dus we raken nooit uitgeleerd :-) Ik hoop dat ik jullie met deze tips een beetje wegwijs heb kunnen maken in PySpark en in Databricks! Heb jij zelf ook nog handige tips die je graag wilt delen? Laat me dit dan weten via LinkedIn.

Over de schrijver

Annelijn
LinkedIn