Databricks
In private preview
This guide explains how to create a service account (metadata access only) for Revefi on Databricks.
Step 1: Create a new Databricks access token
Revefi connects to your Databricks workspace using an access token. You can create a new service principal for Revefi and generate an access token for it, or, if that is easier, use an access token for an existing user in your workspace. Both approaches are described below.
[Option 1]: Generate access token for a new service principal (Preferred)
- Create a service principal for Revefi
  Use the guide to create a service principal in your Databricks account and add it to your workspace. Save the application ID.
- Grant token usage to the service principal in the workspace
  Use the guide to grant the above service principal permission to use access tokens.
- Generate an access token for the service principal
  Use the guide to generate an access token for the new service principal.
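For readers who prefer to script the last step, here is a minimal sketch of requesting a token on behalf of the service principal. It assumes the Databricks Token Management endpoint `/api/2.0/token-management/on-behalf-of/tokens` is available in your workspace and that you hold a workspace admin token; the function names, the 90-day lifetime, and the comment string are illustrative choices, not part of the Revefi guide.

```python
import json
import urllib.request


def build_token_request(hostname, admin_token, application_id,
                        lifetime_seconds=7776000,
                        comment="Revefi metadata access"):
    """Build (but do not send) the on-behalf-of token request.

    hostname, admin_token and application_id are placeholders you supply;
    lifetime_seconds defaults to 90 days purely as an example.
    """
    url = f"https://{hostname}/api/2.0/token-management/on-behalf-of/tokens"
    body = {
        "application_id": application_id,
        "lifetime_seconds": lifetime_seconds,
        "comment": comment,
    }
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        method="POST",
        headers={
            "Authorization": f"Bearer {admin_token}",
            "Content-Type": "application/json",
        },
    )


def create_token(hostname, admin_token, application_id):
    """Send the request and return the new token value (assumed response field)."""
    request = build_token_request(hostname, admin_token, application_id)
    with urllib.request.urlopen(request, timeout=30) as response:
        return json.load(response)["token_value"]
```

Calling `create_token("<Hostname>", "<adminToken>", "<application_id>")` would return the token to paste into Revefi in Step 6. Check the Databricks documentation for your cloud before relying on this endpoint.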
[Option 2]: Generate personal access token for your user
1. Use the guide to generate a personal access token for an existing user.
Step 2: Enable System tables on Unity Catalog
Use the guide to enable the information_schema, access, workflow, compute, query, and billing system schemas if they are not already enabled. This requires the metastore ID, which can be found in the Databricks workspace under Catalog -> select any catalog -> Details.
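The enablement step can be scripted along these lines. This is a sketch only: it assumes the Unity Catalog system schemas REST endpoint (`PUT .../unity-catalog/metastores/<metastore_id>/systemschemas/<schema>`) under API version 2.0, which you should confirm against the Databricks guide. The helper names are hypothetical, and the schema list simply mirrors the names given above.

```python
import urllib.error
import urllib.request

# Schema names exactly as listed in this guide.
SYSTEM_SCHEMAS = ["information_schema", "access", "workflow",
                  "compute", "query", "billing"]


def system_schema_url(hostname, metastore_id, schema_name):
    """Return the (assumed) enablement URL for one system schema."""
    return (f"https://{hostname}/api/2.0/unity-catalog/metastores/"
            f"{metastore_id}/systemschemas/{schema_name}")


def enable_system_schemas(hostname, token, metastore_id, schemas=SYSTEM_SCHEMAS):
    """Send one PUT per schema and return {schema_name: HTTP status code}."""
    statuses = {}
    for schema_name in schemas:
        request = urllib.request.Request(
            system_schema_url(hostname, metastore_id, schema_name),
            method="PUT",
            headers={"Authorization": f"Bearer {token}"},
        )
        try:
            with urllib.request.urlopen(request, timeout=30) as response:
                statuses[schema_name] = response.status
        except urllib.error.HTTPError as error:
            # Keep going so one failure does not hide the others.
            statuses[schema_name] = error.code
    return statuses
```

The returned dictionary makes it easy to spot which schemas still need attention, for example because they were already enabled or the caller lacks permission.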
Step 3: Grant Unity Catalog data permission to the service principal
Run these commands on each catalog that Revefi should have access to.
GRANT USE_CATALOG ON CATALOG <catalog_name> TO `<application_id>`;
GRANT USE_SCHEMA ON CATALOG <catalog_name> TO `<application_id>`;
GRANT SELECT ON CATALOG <catalog_name> TO `<application_id>`;
Revefi also needs access to the system catalog. Use the above commands to grant access to the system catalog as well. Note that access to the system catalog can only be granted by a metastore admin.
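When several catalogs are involved, generating the statements avoids copy-paste mistakes. The sketch below prints the same three grants shown above for each catalog, including system; the function name and the example catalog name `sales` are hypothetical.

```python
def build_grants(catalogs, principal):
    """Return the three read-only GRANT statements for each catalog."""
    privileges = ["USE_CATALOG", "USE_SCHEMA", "SELECT"]
    statements = []
    for catalog in catalogs:
        for privilege in privileges:
            statements.append(
                f"GRANT {privilege} ON CATALOG {catalog} TO `{principal}`;"
            )
    return statements


# "sales" is a made-up catalog name; replace <application_id> with the
# application ID saved in Step 1.
for statement in build_grants(["sales", "system"], "<application_id>"):
    print(statement)
```

The printed statements can then be run in a SQL editor; the ones for the system catalog must be run by a metastore admin.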
Step 4: Create a Databricks SQL Warehouse
Use the guide to create a new SQL Warehouse for Revefi (Serverless is preferred). Then use the 'Permissions' button to give the new service principal 'Can use' permission on this warehouse.
Step 5: Create Revefi Jobs in your Databricks account
Databricks currently exposes the full job run history only through its APIs, so one Databricks job must run in your account to periodically copy the API data into a table (within your Databricks account). Databricks expects to expose this information through system tables soon, after which the job can be removed. Ensure that the job runs successfully and populates the corresponding tables as described below.
Create a new catalog and grant full access to the service principal
Use the guide to create a new catalog called revefi for the two tables and grant the access below.
GRANT USE_CATALOG ON CATALOG revefi TO `<application_id>`;
GRANT USE_SCHEMA ON CATALOG revefi TO `<application_id>`;
GRANT SELECT ON CATALOG revefi TO `<application_id>`;
Create an access token for API access
Use the guide to create an access token for API access. This is only needed by the job that copies the data; Revefi doesn't need API access directly. Also ensure that the user has all_privileges on the revefi catalog so the Databricks job can write data to the tables in this catalog.
It is recommended to use a dedicated cluster, not a shared cluster, for this job.
Databricks Job to copy over Job Run History
Use the Python script below to set up an hourly job that copies the data from the Job Run API into a table called revefi.default.JobRunHistory.
- Replace hostname and accessToken with the corresponding values
- Pass ["{{job.start_time.timestamp_ms}}"] as a parameter to the job
- Create a trigger to schedule the job every hour
import json
import requests
import sys
from functools import reduce
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, LongType, StringType, BooleanType, ArrayType

# Reuse the active Spark session (a no-op where `spark` is already defined)
spark = SparkSession.builder.getOrCreate()

# Schema of one entry in the "runs" array returned by the Job Run API
schema = StructType([
    StructField("cleanup_duration", LongType(), True),
    StructField("creator_user_name", StringType(), True),
    StructField("end_time", LongType(), True),
    StructField("execution_duration", LongType(), True),
    StructField("format", StringType(), True),
    StructField("job_id", LongType(), True),
    StructField("number_in_job", LongType(), True),
    StructField("original_attempt_run_id", LongType(), True),
    StructField("run_duration", LongType(), True),
    StructField("run_id", LongType(), True),
    StructField("run_name", StringType(), True),
    StructField("run_page_url", StringType(), True),
    StructField("run_type", StringType(), True),
    StructField("setup_duration", LongType(), True),
    StructField("start_time", LongType(), True),
    StructField("state", StructType([
        StructField("life_cycle_state", StringType(), True),
        StructField("result_state", StringType(), True),
        StructField("state_message", StringType(), True),
        StructField("user_cancelled_or_timedout", BooleanType(), True),
        StructField("queue_reason", StringType(), True)
    ]), True),
    StructField("trigger", StringType(), True),
    StructField("schedule", StructType([
        StructField("quartz_cron_expression", StringType(), True),
        StructField("timezone_id", StringType(), True),
        StructField("pause_status", StringType(), True)
    ]), True),
    StructField("tasks", ArrayType(
        StructType([
            StructField("task_key", StringType(), True),
            StructField("description", StringType(), True),
            StructField("spark_python_task", StructType([
                StructField("python_file", StringType(), True),
                StructField("parameters", ArrayType(StringType()), True),
                StructField("source", StringType(), True)
            ]), True)
        ])
    ), True)
])

# json to dataframe
def to_df(result_json):
    return spark.createDataFrame(result_json, schema)

hostname = "<Hostname>"
token = "<accessToken>"
header = {'Authorization': f'Bearer {token}', 'Content-Type': 'application/json'}
url = f"https://{hostname}/api/2.1/jobs/runs/list?expand_tasks=true"

# Job start time in epoch milliseconds, passed as the first job parameter
job_start_ms = int(sys.argv[1])

# Resume from the newest run already stored; otherwise backfill the last 15 days
start_time_from = None
if spark.sql("select 1 from revefi.information_schema.tables where table_name = 'jobrunhistory'").collect() != []:
    start_time_from = spark.sql("select max(start_time) from revefi.default.JobRunHistory").collect()[0][0]
if start_time_from is None:
    start_time_from = job_start_ms - (86400000 * 15)
print(start_time_from)

# Fetch runs that started after the last stored run and at least one hour ago
payload = {
    "start_time_from": start_time_from + 1,
    "start_time_to": job_start_ms - 3600000
}
print("Starting script")
data = json.dumps(payload)
lst_df = []
has_next_page = True
while has_next_page:
    print("Calling api")
    response = requests.get(url=url, data=data, headers=header)
    response.raise_for_status()  # fail loudly on an HTTP error
    result = response.json()
    next_page_token = result.get('next_page_token')
    # Stop when the API reports no more pages or gives no token to continue with
    has_next_page = result.get('has_more', False) and next_page_token is not None
    if 'runs' in result:
        print("Received response")
        df = to_df(result['runs'])
        # Check if columns are missing in the first DataFrame
        if lst_df and len(df.columns) > len(lst_df[0].columns):
            missing_columns = set(df.columns) - set(lst_df[0].columns)
            for col in missing_columns:
                lst_df[0] = lst_df[0].withColumn(col, F.lit(None).cast("string"))
        lst_df.append(df)
    if has_next_page:
        print("Next page")
        payload["page_token"] = next_page_token
        data = json.dumps(payload)
print("Received full response")

# Union all the DataFrames into one and append it to the table
if lst_df:
    print("Writing data")
    final_df = reduce(DataFrame.unionAll, lst_df)
    final_df.write.mode("append").option("mergeSchema", "true").saveAsTable("revefi.default.JobRunHistory")
print("Completed script")
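The time window the script requests on each run can be illustrated with a small pure function. This sketch mirrors the arithmetic in the script above (a 15-day backfill on the first run, resuming from the newest stored run afterwards, and excluding the most recent hour); the function and constant names are illustrative and do not appear in the script.

```python
MS_PER_HOUR = 3_600_000
MS_PER_DAY = 86_400_000


def run_window(job_start_ms, last_start_time=None, backfill_days=15):
    """Return (start_time_from, start_time_to) in epoch milliseconds.

    job_start_ms    -- value of {{job.start_time.timestamp_ms}}
    last_start_time -- max(start_time) already in the table, or None
    """
    if last_start_time is None:
        # First run: go back a fixed number of days.
        lower = job_start_ms - backfill_days * MS_PER_DAY
    else:
        # Later runs: continue from the newest run already copied.
        lower = last_start_time
    # +1 ms avoids re-fetching the newest stored run; the upper bound
    # leaves out runs that started within the last hour.
    return lower + 1, job_start_ms - MS_PER_HOUR


print(run_window(1_700_000_000_000))
# (1698704000001, 1699996400000)
print(run_window(1_700_000_000_000, last_start_time=1_699_990_000_000))
# (1699990000001, 1699996400000)
```

Because the upper bound trails the job start by one hour, a run that starts in the final hour before the job is picked up by the next hourly execution rather than being skipped.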
Step 6: Add Databricks as a connection in Revefi
Finally, add your Databricks workspace as a connection in Revefi. On the connections page, click 'Add connection' and select Databricks as the source. The HostName, Port, and HTTP Path fields come from the SQL Warehouse created in Step 4. The access token comes from Step 1.
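Before entering these values in Revefi, it can help to confirm that they work. The sketch below assumes the separately installed `databricks-sql-connector` package (not mentioned in this guide) and runs a trivial query against the warehouse; the helper names are hypothetical, and the port is not passed in this sketch.

```python
def connection_params(hostname, http_path, access_token):
    """Validate the three values and return them as connector keyword arguments."""
    values = {
        "server_hostname": hostname,
        "http_path": http_path,
        "access_token": access_token,
    }
    for name, value in values.items():
        if not value:
            raise ValueError(f"{name} must not be empty")
    return values


def check_connection(params):
    """Run SELECT 1 on the warehouse; raises if the values are wrong."""
    # Imported here so the helper above works without the package installed.
    from databricks import sql  # pip install databricks-sql-connector

    with sql.connect(**params) as connection:
        with connection.cursor() as cursor:
            cursor.execute("SELECT 1")
            return cursor.fetchall()
```

For example, `check_connection(connection_params("<Hostname>", "<HTTP Path>", "<accessToken>"))` should return a single row if the warehouse permissions from Step 4 and the token from Step 1 are set up correctly.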