Documentation is a critical, if tedious, part of every project. It is essential both for reviewing existing developments and for recording new ones. When the Power BI API was first released, I attempted something similar: I wanted to know how to use the API to obtain an inventory of a tenant (Power BI Template – Prathy’s Blog…). Now I am working towards the same goal, but with my current favourite functionality, Fabric Notebooks.
In this blog post, I will discuss using Semantic Link and Semantic Link Labs to get an overview of specified workspaces and their contents from a Fabric Notebook. This is just one way of doing it; plenty of blogs discuss the various things you can do with Semantic Link. I also want to use this post to document what I have learned. I like how I can create a Lakehouse from the notebook and have Delta tables created automatically as needed.
Let’s begin. First things first, install all necessary libraries. I am using both Semantic Link and Semantic Link Labs.
! pip install semantic-link
! pip install semantic-link-labs
Also, import all required modules.
import sempy.fabric as fabric
import sempy_labs
from sempy_labs import migration, report, directlake
from sempy_labs import lakehouse as lake
from sempy_labs.tom import connect_semantic_model
from pyspark.sql.functions import current_timestamp
import pandas as pd
The next step is to define a Lakehouse name, check whether that Lakehouse already exists and, if not, create it. The notebook also mounts the new or existing Lakehouse so that all the data can be written to it.
# Define Lakehouse name and description. This will be the LH where all documentation is saved
LH_Name = "LH_Fabric_Documentation"
LH_desc = "Lakehouse for Fabric Documentation"
# Get current workspace details
current_workspace_id = fabric.get_workspace_id()
current_workspace_name = fabric.resolve_workspace_name(current_workspace_id)
print(f'Current workspace ID: {current_workspace_id}')
print(f'Current workspace name: {current_workspace_name}')
# List existing lakehouses and create the Lakehouse only if it does not exist yet
lakehouse_list = sempy_labs.list_lakehouses()
if LH_Name in lakehouse_list['Lakehouse Name'].values:
    print("Lakehouse already exists")
else:
    # Create a new Lakehouse
    mssparkutils.lakehouse.create(name=LH_Name, description=LH_desc, workspaceId=current_workspace_id)
    print("Lakehouse created successfully")
# Mount the Lakehouse for direct file system access
lakehouse = mssparkutils.lakehouse.get(LH_Name)
mssparkutils.fs.mount(lakehouse.get("properties").get("abfsPath"), f"/{LH_Name}")
# Retrieve and store local and ABFS paths of the mounted Lakehouse
local_path = mssparkutils.fs.getMountPath(f"/{LH_Name}")
lh_abfs_path = lakehouse.get("properties").get("abfsPath")
print(f'Local path: {local_path}')
print(f'ABFS path: {lh_abfs_path}')
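The check-then-create step above is what makes the notebook safe to rerun. As a minimal sketch of that pattern outside Fabric, the helper below takes the existing names and a create callback. `ensure_lakehouse` and its `create` parameter are hypothetical names used only for illustration; in the notebook their roles are played by `sempy_labs.list_lakehouses()` and `mssparkutils.lakehouse.create`.

```python
from typing import Callable, Iterable


def ensure_lakehouse(name: str, existing_names: Iterable[str], create: Callable[[str], None]) -> bool:
    """Call `create` only when `name` is not already present.

    Returns True if a lakehouse was created, False if it already existed.
    """
    if name in set(existing_names):
        return False
    create(name)
    return True


# Stand-in for the real create call (hypothetical), so the sketch runs anywhere
created = []
first_run = ensure_lakehouse("LH_Fabric_Documentation", [], created.append)
second_run = ensure_lakehouse("LH_Fabric_Documentation", created, created.append)
print(first_run, second_run, created)
```

Running it twice creates the Lakehouse once; the second call sees the name and does nothing.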
Next, I list the workspaces I want to document. I am not supplying a list here, so the notebook falls back to the current workspace, but this is where I would add workspace IDs if I wanted to cover more than one.
# Initialize the list of workspaces; if it is empty, populate it with the current workspace ID
list_of_workspaces = []
if not list_of_workspaces:
    list_of_workspaces.append(fabric.get_workspace_id())
print("Number of workspaces: ", len(list_of_workspaces))
print(list_of_workspaces)
After obtaining the items, I strip spaces and special characters from the column names to prevent errors in the Lakehouse. I then add a ‘load time’ column (LoadDate) so that it is easy to track, from the SQL Endpoint view, when a row was inserted. I also create a separate table of the distinct Fabric item types, using the following approach.
# Iterate through each workspace (the list holds workspace IDs)
for workspace_id in list_of_workspaces:
    # Retrieve all items from the workspace
    Fabric_all_items = fabric.list_items(workspace=workspace_id)
    # Remove spaces and special characters from column names for LH compatibility
    Fabric_all_items.columns = Fabric_all_items.columns.str.replace('[^a-zA-Z0-9]', '', regex=True)
    # Create a Spark DataFrame from the fabric items and save it to the Lakehouse
    sparkdf = spark.createDataFrame(Fabric_all_items)
    sparkdf = sparkdf.withColumn("LoadDate", current_timestamp())
    Table_Name = "R_fabric_all_items"
    sparkdf.write.format("delta").option("mergeSchema", "true").mode("append").save(f"{lh_abfs_path}/Tables/{Table_Name}")
    print(Table_Name, "created at :", f"{lh_abfs_path}/Tables/{Table_Name}")
    # Extract distinct item types into a one-column pandas DataFrame so Spark can infer the schema
    Fabric_all_items_type = pd.DataFrame({'Type': Fabric_all_items['Type'].unique()})
    # Create a Spark DataFrame for item types and save it to the Lakehouse
    spark_df_Fabric_all_items_type = spark.createDataFrame(Fabric_all_items_type)
    spark_df_Fabric_all_items_type = spark_df_Fabric_all_items_type.withColumn("LoadDate", current_timestamp())
    Table_Name = "R_fabric_item_types"
    spark_df_Fabric_all_items_type.write.format("delta").option("mergeSchema", "true").mode("append").save(f"{lh_abfs_path}/Tables/{Table_Name}")
    print(Table_Name, "created at :", f"{lh_abfs_path}/Tables/{Table_Name}")
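To make the column clean-up above concrete, here is a small standalone sketch of the same regular expression using only the standard library. The column names are sample values chosen for illustration and `clean_column_name` is a hypothetical helper, not part of Semantic Link.

```python
import re


def clean_column_name(name: str) -> str:
    """Drop every character that is not an ASCII letter or digit."""
    return re.sub(r"[^a-zA-Z0-9]", "", name)


# Sample column names (illustrative only)
columns = ["Id", "Display Name", "Description", "Type", "Workspace Id"]
cleaned = [clean_column_name(c) for c in columns]
print(cleaned)
```

Because the pattern already removes spaces, a second pass for spaces is not needed. One thing to watch for is that two different source names can collapse to the same cleaned name (for example "Item Id" and "Item-Id"), which would produce duplicate columns.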
For this post, I am focusing only on Semantic Models; I cover how I got the Lakehouse information in the video at the end of the post. To retrieve more detail, I filter the Fabric items down to Semantic Models, loop through each model, gather its objects, and append the data to a Delta table. I also create an extra table for Semantic Model object types and include a LoadDate timestamp in all tables. When filtering, I exclude the Report Usage Metrics Model because I couldn’t access it.
from delta.tables import DeltaTable
import pandas as pd
from pyspark.sql.functions import lit, current_timestamp
# Collect the objects of every semantic model so the distinct object types can be derived afterwards
list_semantic_model_objects = []
# Keep only items of type 'SemanticModel', excluding the Report Usage Metrics Model
df_semantic_models = Fabric_all_items[(Fabric_all_items['Type'] == 'SemanticModel') & (Fabric_all_items['DisplayName'] != 'Report Usage Metrics Model')]
sdf_df_semantic_models = spark.createDataFrame(df_semantic_models)
sdf_df_semantic_models = sdf_df_semantic_models.withColumn("LoadDate", current_timestamp())
table_name = "R_semantic_models"
sdf_df_semantic_models.write.format("delta").option("mergeSchema", "true").mode("append").save(f"{lh_abfs_path}/Tables/{table_name}")
# Iterate through each semantic model, retrieve its objects, and append them to the Delta table
for _, row in df_semantic_models.iterrows():
    dataset_name = row['DisplayName']
    try:
        semantic_model_objects = sempy_labs.list_semantic_model_objects(dataset_name, current_workspace_name)
    except Exception as e:
        print(f"Error fetching semantic model objects for {dataset_name}: {e}")
        continue
    if not semantic_model_objects.empty:
        df_semantic_model_objects = pd.DataFrame(semantic_model_objects)
        # Remove spaces and special characters from column names for LH compatibility
        df_semantic_model_objects.columns = df_semantic_model_objects.columns.str.replace('[^a-zA-Z0-9]', '', regex=True)
        list_semantic_model_objects.append(df_semantic_model_objects)
        spark_df_semantic_models = spark.createDataFrame(df_semantic_model_objects)
        spark_df_semantic_models = spark_df_semantic_models.withColumn("LoadDate", current_timestamp())
        spark_df_semantic_models = spark_df_semantic_models.withColumn("WorkspaceName", lit(current_workspace_name))
        spark_df_semantic_models = spark_df_semantic_models.withColumn("DatasetName", lit(dataset_name))
        table_name = "R_semantic_model_objects"
        spark_df_semantic_models.write.format("delta").option("mergeSchema", "true").mode("append").save(f"{lh_abfs_path}/Tables/{table_name}")
        print(f"{table_name} created/updated at: {lh_abfs_path}/Tables/{table_name} for {dataset_name}")
# Extract and save distinct object types across all semantic models (skipped if none were read)
if list_semantic_model_objects:
    object_types = pd.concat(list_semantic_model_objects)['ObjectType'].unique()
    # Wrap the types in a one-column pandas DataFrame so Spark can infer the schema
    spark_df_semantic_model_objects_type = spark.createDataFrame(pd.DataFrame({'ObjectType': object_types}))
    spark_df_semantic_model_objects_type = spark_df_semantic_model_objects_type.withColumn("LoadDate", current_timestamp())
    table_name = "R_semantic_model_object_types"
    spark_df_semantic_model_objects_type.write.format("delta").option("mergeSchema", "true").mode("append").save(f"{lh_abfs_path}/Tables/{table_name}")
    print(f"{table_name} created at: {lh_abfs_path}/Tables/{table_name}")
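The object-type table is meant to hold the distinct types across every model that was scanned, not just one of them. A minimal plain-Python sketch of that idea is below; the model names and rows are made-up sample data and `distinct_object_types` is a hypothetical helper.

```python
def distinct_object_types(objects_by_model: dict) -> list:
    """Return the sorted distinct ObjectType values across all models."""
    types = set()
    for rows in objects_by_model.values():
        for row in rows:
            types.add(row["ObjectType"])
    return sorted(types)


# Made-up sample: model name -> list of object rows
sample = {
    "Sales": [
        {"ObjectName": "Orders", "ObjectType": "Table"},
        {"ObjectName": "Total", "ObjectType": "Measure"},
    ],
    "Finance": [
        {"ObjectName": "Ledger", "ObjectType": "Table"},
        {"ObjectName": "Year", "ObjectType": "Column"},
    ],
}
object_types = distinct_object_types(sample)
print(object_types)
```

The point of collecting across models first is that a type which appears only in an earlier model still ends up in the lookup table.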
After this, when I go to my workspace, I should see my Lakehouse with the tables underneath it. With a quick semantic model and a Power BI report on top, I can see more details of all the semantic models in my workspace.
To take it further, I would run Vertipaq Analyzer next. I cover how I work with Vertipaq Analyzer and how I gather details about Lakehouses in the following YouTube video: 🎥 https://youtu.be/x4OXHSk5Zpo. You can also access the notebook here: 👩🏾‍💻 Document-MS-Fabric/NB_Document_current_workspace (1).ipynb at main · prathyusha-kamasani/Document-MS-Fabric (github.com)
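Because every run appends to the tables, a report usually wants only the most recent load. The sketch below shows one way to pick that out using the LoadDate column, in plain Python so it runs anywhere. The rows, the workspace IDs and the `latest_load_per_key` helper are all hypothetical; in a real model the same filter would more likely be written in SQL or DAX.

```python
from datetime import datetime


def latest_load_per_key(rows: list, key: str) -> list:
    """For each value of `key`, keep only the rows with the newest LoadDate."""
    newest = {}
    for row in rows:
        k = row[key]
        if k not in newest or row["LoadDate"] > newest[k]:
            newest[k] = row["LoadDate"]
    return [row for row in rows if row["LoadDate"] == newest[row[key]]]


# Made-up rows: two loads for workspace "ws-a", one load for workspace "ws-b"
rows = [
    {"WorkspaceId": "ws-a", "DisplayName": "Sales", "LoadDate": datetime(2024, 5, 1, 9, 0)},
    {"WorkspaceId": "ws-a", "DisplayName": "Sales", "LoadDate": datetime(2024, 5, 2, 9, 0)},
    {"WorkspaceId": "ws-a", "DisplayName": "Finance", "LoadDate": datetime(2024, 5, 2, 9, 0)},
    {"WorkspaceId": "ws-b", "DisplayName": "HR", "LoadDate": datetime(2024, 5, 1, 9, 0)},
]
current = latest_load_per_key(rows, "WorkspaceId")
print([row["DisplayName"] for row in current])
```

Grouping by a key matters here because each workspace (or each model) is written separately and so gets its own timestamp.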
I hope it will be helpful to someone out there. I certainly enjoyed it and learned a lot while developing this code. And of course, special thanks to Michel for always helping 🙂
Until next time 🙂