Apache Airflow – custom PowerBI refresh operator

In een eerdere blogpost heb ik geschreven hoe de PowerBI API in te zetten is voor het refreshen van datasets. Later heb ik deze uitgebreid met een methode hoe de status van deze refreshes is op te vragen. Hiermee is het niet meer ‘fire-and-forget’., maar kun je netjes afvangen of de refresh succesvol is of niet. Ook kun je een beknopte failure reden terugkrijgen, echter is deze vaak niet heel indicatief is gebleken;

				
					Melding: 'ModelRefresh_ShortMessage_ProcessingError'
				
			

Deze melding is redelijk generiek. Als je het object in kwestie opent in de PowerBI service en daar de details opent, zie je een stuk meer;

				
					Data source error{"error":{"code":"DMTS_UserLostPermissionToDatasourceError",
"pbi.error":{"code":"DMTS_UserLostPermissionToDatasourceError",
"details":[{"code":"Server","detail":{"type":1,"value":"<HOST_NAME>"}},
{"code":"Database","detail":{"type":1,"value":"sdwh_prd"}},
{"code":"ConnectionType","detail":{"type":0,"value":"Sql"}}],
"exceptionCulprit":1}}} Table: <TABLE_NAME>
Cluster URI <URI>
Activity ID <ACTIVITY_ID>
Request ID <REQUEST_ID>
Time <TIME>
				
			

Dit is meer informatie dan standaard uit de `get-refresh-history` API komt.

In deze laatste iteratie heb ik alles beter laten samenwerken met de open source scheduling tool Apache Airflow in de vorm van een custom operator.

Wat is Apache Airflow?

Apache Airflow is een open-source platform voor development, scheduling en monitoring van workflows. Een workflow in deze context is in feite het triggeren van een taak in een schedule, waarbij een taak van alles kan zijn. In de terminologie van Airflow wordt een workflow / schedule ook wel DAG* genoemd.

*Directed acyclic graph

Voorbeeld Airflow DAG

Deze afbeelding toont een voorbeeld van een Airflow DAG. Elk blokje vormt een task. Een task kan bijvoorbeeld een operator gebruiken, en stuk template code om een specifieke taak met parameters uit te voeren. Ik heb een eigen operator gemaakt voor deze PowerBI refresh usecase; de PowerBIRefreshOperator.

Deze blog legt uit hoe de opgezet gemaakt is. Het geeft geen algemene uitleg over de anatomie van Apache Airflow, hiervoor verwijs ik je graag naar de algemene documentatie. Waar nodig link ik ook specifieke onderwerpen naar de bijbehorende documentatie, gelukkig is deze zeer uitgebreid!

Voorbeeldje van een DAG

Een DAG is beschikbaar op de filestorage van Airflow (folder `/dags`) als een .py (Python) bestand. Onderstaande code-snippet is een heel simpel voorbeeld genaamd “my-dag”, met daarin twee taken die sequentieel aan elkaar gekoppeld zijn; “ping” & “email”.

				
					with DAG("my-dag") as dag
    ping = SimpleHttpOperator(endpoint="http://example.com/update/")
    email = EmailOperator(to="admin@example.com", subject="Update complete")
    ping >> email:
				
			

Beide taken gebruiken een Operator (SimpleHttpOperator & EmailOperator. Deze operators zijn de feitelijke acties die een het moet uitvoeren. De Operators zijn aan te sturen met input parameters, zoals

				
					endpoint=

				
			

Meer uitleg inclusief een tutorial over Operators kun je hier vinden.

PowerBIRefreshOperator (custom operator)

Airflow maakt het mogelijk om een eigen operator te maken, die daarmee specifiek uitvoert wat je nodig hebt en past binnen je context. Hiermee flexibel en krachtig! 

“The extensibility is one of the many reasons which makes Apache Airflow powerful.”

Een custom operator kun je maken door een nieuwe class te maken, extended van de BaseOperator. Een customer operator moet verplicht een `__init__` & `execute` method override hebben. Dit wordt de constructor & executor genoemd. Dat ziet er als volgt uit;

				
					from airflow.models.baseoperator import BaseOperator

class HelloOperator(BaseOperator):
    def __init__(self, name: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.name = name

    def execute(self, context):
        message = f"Hello {self.name}"
        print(message)
        return messager
				
			

En dan vervolgens in een DAG te gebruiken als;

				
					from custom_operator.hello_operator import HelloOperator
with dag:
    hello_task = HelloOperator(task_id="sample-task", name="foo_bar")r
				
			

In deze repository vind je de code voor de PowerBIRefreshOperator. In `class PowerBIRefreshOperator(BaseOperator):` zie je diverse functies (class methods) staan die gebruikt worden, dit is niet anders dan in mijn vorige blogpost uitgelegd. Alleen zijn deze nu voorzien van self input parameters.

Constructor

De init method vertaald eigenlijk de input parameters die je aan de operator meegeeft en ‘mapped’ deze aan de parameters die je kan meegeven. In dit geval gaat het dus om inputs ‘ENV‘, ‘action‘, ‘workspace‘ en ‘object‘. Deze vier input parameters definiëren een refresh actie uniek.

				
					def __init__(self, ENV: str, action: str, workspace: str, object: str ,**kwargs) -> None
	super().__init__(**kwargs)
	self.ENV = ENV
	self.action = action
	self.workspace = workspace
	self.object = object:
				
			

Executor

				
					``
def execute(self, context):

	try:
		# Ophalen van de environment settings ENV_FILE
		
		environment_settings = self.get_environment_settings(ENV_FILE)
		authority_url = environment_settings[self.ENV][0]['authority']
		scope = environment_settings[self.ENV][0]['scope']
		sa_name = environment_settings[self.ENV][0]['sa_name']
		client_id = environment_settings[self.ENV][0]['client_id']
		
		commands = {
			"refresh_dataset": self.refresh_dataset,
			"refresh_dataset_by_names":self.refresh_dataset_by_names,
			"get_access_token": self.get_access_token_msal,
			"refresh_dataflow_by_names": self.refresh_dataflow_by_names
		}
		matched_command = commands.get(self.action)
	
		if not matched_command:
			raise Exception("Geen geldig commando gevonden.")
		matched_command(self.workspace, self.object, client_id, client_token, authority_url, scope)
	
	except Exception as exp:
	
		log.exception("An error occurred while running the PowerBIRefreshOperator")
		raise`
				
			

Als eerst worden environment settings opgehaald uit een environment file. Dit zijn parameters die je nodig hebt om mee te geven aan de API request. Hier hoort ook een token/secret bij, maar deze wil je vanzelfsprekend niet onderdeel maken van je codebase. Hier zijn diverse fancy oplossingen voor, bijvoorbeeld Azure Keyvault. Airflow heeft echter ook een eigen implementatie in de vorm van Airflow Variables, hier is via de frontend van Airflow goed mee te werken;

Airflow Managed Variables pagina

Zoals je kan zien is dit een goede plek om secrets in op te slaan. Deze worden nooit in plain-text weergegeven, mits de key naam van zo’n variable termen als ‘key’, ‘secret’, ’token’ of ‘password’ heeft. In feite beschermt Airflow ons ;-).

Operator gebruiken als task in een DAG

Onderstaande codesnippet geeft de syntax weer hoe deze operator nu toe te passen is als task in een DAG;

				
					powerbi_refresh_test = PowerBIRefreshOperator
	task_id = 'powerbi_refresh_test',
	ENV = ENV,
	action = 'refresh_dataset_by_names',
	workspace = '<naam van PowerBI workspace>',
	object = '<naam van dataset>'
)
				
			

Hier heb je de volgende imports voor nodig;

				
					from airflow.models import Variable
from operators.powerbi_refresh_operator import PowerBIRefreshOperator
				
			

Variable is nodig, omdat je `ENV` moet maken in de vorm van;

				
					ENV = Variable.get("ENV")
				
			

Deze wordt gebruikt als een van de inputs voor de operator ‘ENV’ is hier een key in de variabele beschikbaar voor Airflow. Vanzelfsprekend staan hier waardes in als ‘DEV’, ‘TST’, ‘ACC’ en ‘PRD’ voor de respectievelijke Airflow instanties.

Airflow Manage Variables - ENV Variable

DAG execution

In de repo zit ook een voorbeeld DAG `inf_powerbi_refresh_test.py`, als deze door Airflow geladen wordt zit het er zo uit;

Voorbeeld DAG execution

DAG log

Airflow houdt voor elke task execution een log bij. Voor dit voorbeeld ziet dat er dan als volgt uit;

Airflow Log voorbeeld

N.B. Enkele zaken heb ik bewust gegumd uit de logs.

Met deze custom operator heb ik het mogelijk gemaakt voor engineers in mijn team om eenvoudig objecten binnen PowerBI te refreshen. De eerste toepassing bleek toch wat omslachtig en moeilijk schaalbaar. Een aanpassing maken vergde toch veel tijd. Door deze Airflow operator is het toepassen en het uitvoeren van een change gesneden koek voor gebruikers.

Joerie Brugts – Data Engineer @ Conspect

Uitgelicht

Retail

Doormiddel van het inzichtelijk maken van datavraagstukken en het inzetten van BI-oplossingen helpen wij klanten in de retail actuele informatie uit de data te halen.

Lees meer »

Datamigratie in OutSystems

Dit blog is geschreven naar aanleiding van een succesvolle ervaring bij een van onze klanten, waarbij ongeveer 55 miljoen records succesvol zijn gemigreerd.
Meer weten??? Lees snel verder!

Lees meer »