Databricks

In private preview

This guide explains how to create a service account (metadata access only) for Revefi on Databricks.

Step 1: Create a new Databricks access token
Revefi connects to your Databricks workspace via an access token. You can create a new service principal for Revefi and then generate an access token for that service principal, or you can use an access token for an existing user in your workspace if that's easier. Both approaches are described below.

[Option 1]: Generate an access token for a new service principal (Preferred)

  1. Create a service principal for Revefi
    Use the guide to create a service principal in your Databricks account and add it to your workspace. Save the application ID.

  2. Grant token usage to service principal in workspace
    Use the guide to grant the service principal created above permission to use access tokens.

  3. Generate an access token for service principal
    Use the guide to generate an access token for the new service principal. (A scripted alternative is sketched after this list.)
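
If you prefer to script this step instead of using the UI, the sketch below uses the Token Management REST API (POST /api/2.0/token-management/on-behalf-of/tokens) to mint a token for the service principal. It assumes you run it with a workspace admin token; the hostname, admin token, lifetime, and comment are placeholders, not values from the guide.

import requests

hostname = "<workspace-hostname>"      # e.g. dbc-xxxxxxxx-xxxx.cloud.databricks.com
admin_token = "<admin-pat>"            # a workspace admin's token
application_id = "<application-id>"    # the application ID saved in step 1 above

resp = requests.post(
    f"https://{hostname}/api/2.0/token-management/on-behalf-of/tokens",
    headers={"Authorization": f"Bearer {admin_token}"},
    json={
        "application_id": application_id,
        "comment": "Revefi service principal token",
        "lifetime_seconds": 7776000,  # placeholder: 90 days; follow your own rotation policy
    },
)
resp.raise_for_status()
print(resp.json()["token_value"])  # this is the access token to give to Revefi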

[Option 2]: Generate a personal access token for your user
1. Use the guide to generate a personal access token for an existing user.

Step 2: Enable System tables on Unity Catalog

Use the guide to enable the information_schema, access, workflow, compute, query, and billing system schemas if they are not enabled already. This requires the metastore ID, which can be found in the Databricks workspace under Catalog -> select any catalog -> Details.
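
If you prefer to enable the schemas programmatically, the sketch below calls the system schemas REST API (PUT /api/2.0/unity-catalog/metastores/{metastore_id}/systemschemas/{schema_name}) for each schema listed above; information_schema is generally available by default. The hostname, token, and metastore ID are placeholders, and the call requires the appropriate admin privileges.

import requests

hostname = "<workspace-hostname>"
token = "<admin-pat>"
metastore_id = "<metastore-id>"  # Catalog -> select any catalog -> Details

for schema in ["access", "workflow", "compute", "query", "billing"]:
    r = requests.put(
        f"https://{hostname}/api/2.0/unity-catalog/metastores/{metastore_id}/systemschemas/{schema}",
        headers={"Authorization": f"Bearer {token}"},
    )
    print(schema, r.status_code, r.text)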

Step 3: Grant Unity Catalog data permission to the service principal
Run these commands on each catalog that Revefi should have access to.

GRANT USE_CATALOG ON CATALOG <catalog_name> TO `<application_id>`;  
GRANT USE_SCHEMA ON CATALOG <catalog_name> TO `<application_id>`;  
GRANT SELECT ON CATALOG <catalog_name> TO `<application_id>`;

Revefi also needs access to the system catalog. Use the commands above to grant access to the system catalog as well. Note that access to the system catalog can only be granted by a metastore admin.
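
If you have several catalogs to cover, one option is to run the grants from a notebook. Below is a minimal sketch, assuming it runs as a user with the required privileges (a metastore admin for the system catalog) and that <application_id> is the service principal's application ID; the catalog names are placeholders.

application_id = "<application_id>"
catalogs = ["<catalog_name_1>", "<catalog_name_2>", "system"]  # replace with your catalogs

for catalog in catalogs:
    for privilege in ["USE_CATALOG", "USE_SCHEMA", "SELECT"]:
        # Same grants as the commands above, applied to each catalog in turn
        spark.sql(f"GRANT {privilege} ON CATALOG {catalog} TO `{application_id}`")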

Step 4: Create a Databricks SQL Warehouse
Use the guide to create a new SQL Warehouse for Revefi (Serverless is preferred). Use the 'Permissions' button to give the new service principal 'Can use' permissions on this warehouse.
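
If you would rather create the warehouse via the REST API, the sketch below uses POST /api/2.0/sql/warehouses with illustrative settings for a small serverless warehouse; the hostname, token, name, and sizing are assumptions to adjust. The 'Can use' grant for the service principal is still done via the Permissions button as described above.

import requests

hostname = "<workspace-hostname>"
token = "<admin-pat>"

resp = requests.post(
    f"https://{hostname}/api/2.0/sql/warehouses",
    headers={"Authorization": f"Bearer {token}"},
    json={
        "name": "revefi-warehouse",      # illustrative name
        "cluster_size": "2X-Small",
        "max_num_clusters": 1,
        "auto_stop_mins": 10,
        "enable_serverless_compute": True,
    },
)
resp.raise_for_status()
print(resp.json()["id"])  # warehouse id; its connection details are used in Step 6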

Step 5: Create Revefi jobs in your Databricks account
Since Databricks currently only provides the full job run history via APIs, you need one Databricks job in your account that periodically copies the API data into a table (within your Databricks account). Databricks expects to expose this information via a system table soon, so the job can be removed once that happens. Ensure that the job runs successfully and populates the corresponding tables, as described below.

Create a new catalog and grant access to the service principal

Use the guide to create a new catalog called revefi to hold these tables and grant the access below.

GRANT USE_CATALOG ON CATALOG revefi TO `<application_id>`;  
GRANT USE_SCHEMA ON CATALOG revefi TO `<application_id>`;  
GRANT SELECT ON CATALOG revefi TO `<application_id>`;

Create an access token for API access

Use the guide to create an access token for API access. This token is only needed by the job that copies the data; Revefi doesn't need API access directly. Also ensure that the user has ALL PRIVILEGES on the revefi catalog so the Databricks job can write data to the tables in this catalog.
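
For reference, the write grant mentioned above can be issued from a notebook as follows; <job_runner> is a placeholder for the user (or service principal) that owns and runs the copy job.

# Grant write access on the revefi catalog to the identity that runs the copy job
spark.sql("GRANT ALL PRIVILEGES ON CATALOG revefi TO `<job_runner>`")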

It is recommended to use a dedicated cluster rather than a shared cluster for this job.

Databricks Job to copy over Job Run History
Use the Python script below to set up an hourly job that copies the data from the Job Runs API into a table called revefi.default.JobRunHistory.

Replace hostname and accessToken with the corresponding values.
Pass ["{{job.start_time.timestamp_ms}}"] as a parameter to the job.
Create a trigger to schedule the job every hour. (A sketch of the corresponding Jobs API request follows the script.)

import json  
import requests  
import sys  
from functools import reduce  
from pyspark.sql import DataFrame  
from pyspark.sql import functions as F  
from pyspark.sql.types import StructType, StructField, LongType, StringType, IntegerType, BooleanType, ArrayType

schema = StructType([  
  StructField("cleanup_duration", LongType(), True),  
  StructField("creator_user_name", StringType(), True),  
  StructField("end_time", LongType(), True),  
  StructField("execution_duration", LongType(), True),  
  StructField("format", StringType(), True),  
  StructField("job_id", LongType(), True),  
  StructField("number_in_job", LongType(), True),  
  StructField("original_attempt_run_id", LongType(), True),  
  StructField("run_duration", LongType(), True),  
  StructField("run_id", LongType(), True),  
  StructField("run_name", StringType(), True),  
  StructField("run_page_url", StringType(), True),  
  StructField("run_type", StringType(), True),  
  StructField("setup_duration", LongType(), True),  
  StructField("start_time", LongType(), True),  
  StructField("state", StructType([  
         StructField("life_cycle_state", StringType(), True),  
         StructField("result_state", StringType(), True),  
         StructField("state_message", StringType(), True),  
         StructField("user_cancelled_or_timedout", BooleanType(), True),  
         StructField("queue_reason", StringType(), True)  
     ]), True),  
  StructField("trigger", StringType(), True),  
  StructField("schedule", StructType([  
         StructField("quartz_cron_expression", StringType(), True),  
         StructField("timezone_id", StringType(), True),  
         StructField("pause_status", StringType(), True)  
     ]), True),  
  StructField("tasks", ArrayType(  
    StructType([  
         StructField("task_key", StringType(), True),  
         StructField("description", StringType(), True),  
         StructField("spark_python_task", StructType([  
           StructField("python_file", StringType(), True),  
           StructField("parameters", ArrayType(StringType()), True),  
           StructField("source", StringType(), True)  
       ]), True)  
     ])  
  ), True)  
  ])

# Convert the JSON list of runs into a DataFrame with the schema above

def to_df(result_json):
    return spark.createDataFrame(result_json, schema)

hostname = "<Hostname>"  
token = "<accessToken>"

header={'Authorization': f'Bearer {token}', 'Content-Type': 'application/json'}  
url = f"https://{hostname}/api/2.1/jobs/runs/list?expand_tasks=true"

# Resume from the latest start_time already in the table; on the first run,
# backfill the last 15 days (86400000 ms per day)
start_time_from = None
if spark.sql("select 1 from revefi.information_schema.tables where table_name = 'jobrunhistory'").collect() != []:
    start_time_from = spark.sql("select max(start_time) from revefi.default.JobRunHistory").collect()[0][0]

if start_time_from is None:
    start_time_from = int(sys.argv[1]) - (86400000 * 15)
print(start_time_from)

# Pull runs from just after the last ingested start_time, up to one hour
# before this run's start time (sys.argv[1] is {{job.start_time.timestamp_ms}})
payload = {
    "start_time_from": start_time_from + 1,
    "start_time_to": int(sys.argv[1]) - 3600000,
}

print("Starting script")  
data = json.dumps(payload)  
lst_df = []  
has_next_page = True  
while has_next_page:  
 print("Calling api")  
 response = requests.get(url=url, data=data, headers=header)  
 has_next_page = response.json()['has_more']  
 if 'runs' in response.json():  
   print("Received response")  
   df = to_df(response.json()['runs'])

# Check if columns are missing in the first DataFrame

   if lst_df and len(df.columns) > len(lst_df[0].columns):  
       missing_columns = set(list(df.columns)) - set(list(lst_df[0].columns))  
       for col in missing_columns:  
           lst_df[0] = lst_df[0].withColumn(col, F.lit(None).cast("string"))

   lst_df.append(df)

 if has_next_page and 'next_page_token' in response.json():  
   print("Next page")  
   next_page_token = response.json()['next_page_token']  
   payload = {  
   "start_time_from": start_time_from + 1,  
   "start_time_to": int(sys.argv[1]) - 3600000,  
   "page_token": next_page_token  
   }  
   data = json.dumps(payload)

print("Received full response")

# Union all the page DataFrames into one and append to the history table

if lst_df:
    print("Writing data")
    final_df = reduce(DataFrame.unionAll, lst_df)
    final_df.write.mode("append").option("mergeSchema", "true").saveAsTable("revefi.default.JobRunHistory")

print("Completed script")

Step 6: Add Databricks as a connection in Revefi
Finally, you can add your Databricks connection in Revefi. On the Connections page, click 'Add connection' and select Databricks as the source. The HostName, Port, and HTTP Path fields come from the SQL Warehouse created in Step 4; the access token comes from Step 1.