Fabric REST API — Data pipelines
[Authors: Sean Mirabile and Aitor Murguzur]
The initial blog post in the Fabric REST API series focused on the OneLake shortcuts API. This post outlines the various Fabric Data Factory data pipelines REST API endpoints and how they can be utilized.
Data pipeline REST API usage examples
Use the following instructions to test usage examples for specific data pipeline public APIs and verify the results.
+--------+------------------------+--------------------------------------+
| Method | Action | Description |
+--------+------------------------+--------------------------------------+
| POST | Create | Creates a data pipeline |
| DELETE | Delete | Deletes a data pipeline |
| GET | Get item | Gets the metadata of a pipeline |
| POST | Get item definition | Gets the content of a pipeline |
| GET | List | Lists all data pipelines |
| PATCH | Update item | Updates the metadata of a pipeline |
| POST | Update item definition | Updates the content of a pipeline |
| POST | Run instance | Runs data pipeline instance |
| GET | Cancel instance | Cancels data pipeline instance |
| POST | Get instance | Gets data pipeline instance |
+--------+------------------------+--------------------------------------+
Prerequisites
These are the prerequisites to make the examples work:
- Create a Fabric workspace if you don’t have one.
After creating the workspace, first run the code below in a notebook.
sc.addPyFile('https://gist.githubusercontent.com/murggu/09d7befcb157011c340c51cb5d4af42f/raw/b4bdb054c110a6811b184a2f993f9f7a23ecda45/invoke_fabric_api.py')
from invoke_fabric_api import *
workspace_id = mssparkutils.runtime.context['currentWorkspaceId']
item_type = "DataPipeline"
Note: if you find any issues importing the invoke_fabric_api.py
, go here > click on Raw > replace the URL.
Create data pipeline
You can create a data pipeline with or without an item definition (i.e., the content or payload of the data pipeline).
- Create a data pipeline without an item definition: in this case, only the data pipeline name, type, and description (optional) are specified.
# without item definition
method = "post"
uri = f"workspaces/{workspace_id}/items"
payload = {
"displayName": "pip_rest_api_1",
"type": item_type,
"description": "fabric rest api 101"
}
invoke_fabric_api_request(method, uri, payload)
∟ Response 201: metadata is returned.
- Create a data pipeline with an item definition: you can create a data pipeline with a base64 encoded item definition payload. Use base64 to encode and decode your json. If you have an existing data pipeline, you can get the pipeline definition from the Fabric UI: open a data pipeline > View > View JSON code. The example below creates an empty pipeline.
# with item definition (payload)
method = "post"
uri = f"workspaces/{workspace_id}/items"
payload = {
"displayName": "pip_rest_api_2",
"type": item_type,
"definition": {
"parts": [
{
"path": "pipeline-content.json",
"payload": "ewogICAgIm5hbWUiOiAicGlwZWxpbmUxIiwKICAgICJvYmplY3RJZCI6ICI0YWRlZjRlMC1hYWU5LTQ2ZmEtOTE1My0xYTdhNjBkMzJiMzkiLAogICAgInByb3BlcnRpZXMiOiB7CiAgICAgICAgImFjdGl2aXRpZXMiOiBbXSwKICAgICAgICAiYW5ub3RhdGlvbnMiOiBbXSwKICAgICAgICAibGFzdE1vZGlmaWVkQnlPYmplY3RJZCI6ICIzOTVkNDcyMi1lMDM0LTRmZTUtYTk1Yy01MjVkNWIwYWRlZWIiLAogICAgICAgICJsYXN0UHVibGlzaFRpbWUiOiAiMjAyNC0wNC0xMFQxMTozNTo1NFoiCiAgICB9Cn0=",
"payloadType": "InlineBase64"
}
]
}
}
invoke_fabric_api_request(method, uri, payload)
∟ Response 201: only metadata is returned, payload is not included in the response.
Delete data pipeline
Use the following endpoint to delete a data pipeline:
item_id = "<data_pipeline_id>"
method = "delete"
uri = f"workspaces/{workspace_id}/items/{item_id}"
invoke_fabric_api_request(method, uri)
∟ Response 200: no body is returned. After deleting a pipeline, you can list pipelines to verify results.
Get data pipeline
To obtain data pipeline metadata and content, follow these steps:
- Get metadata (get item): returns properties of a data pipeline — id, type, displayName, description and workspaceId.
item_id = "<data_pipeline_id>"
method = "get"
uri = f"workspaces/{workspace_id}/items/{item_id}"
invoke_fabric_api_request(method, uri)
∟ Response 200: the response the data pipeline metadata.
- Get content (get item definition): use the following to get the data pipeline content, i.e. base64 encoded payload.
item_id = "<data_pipeline_id>"
method = "post"
uri = f"workspaces/{workspace_id}/items/{item_id}/getDefinition"
invoke_fabric_api_request(method, uri)
∟ Response 200: the response only includes the item definition part, with the base64 encoded payload.
List data pipelines
To list all data pipelines in a workspace, use:
method = "get"
uri = f"workspaces/{workspace_id}/items?type={item_type}"
invoke_fabric_api_request(method, uri)
Update data pipeline
- Update metadata: to update the metadata, obtain a data pipeline ID. In the example below, both the data pipeline name and description are changed.
item_id = "<data_pipeline_id>"
method = "patch"
uri = f"workspaces/{workspace_id}/items/{item_id}"
payload = {
"displayName": "pip_rest_api_2_updated",
"type": item_type,
"description": "fabric rest api 101 updated"
}
invoke_fabric_api_request(method, uri, payload)
∟ Response 200: response includes metadata.
- Update content: you can also modify the data pipeline content by updating the payload. Updating the item’s definition or payload does not affect its name or sensitivity label. The following example illustrates updating an empty data pipeline with a notebook activity.
item_id = "<data_pipeline_id>"
method = "post"
uri = f"workspaces/{workspace_id}/items/{item_id}/updateDefinition"
payload = {
"displayName": "pip_rest_api_2",
"type": item_type,
"definition": {
"parts": [
{
"path": "pipeline-content.json",
"payload": "ewogICAgIm5hbWUiOiAicGlwX3Jlc3RfYXBpXzJfdXBkYXRlZCIsCiAgICAib2JqZWN0SWQiOiAiMmU4MGU2MTMtYjAzNi00YjBjLWI2ODgtOTNiMDNiNzk4NDIzIiwKICAgICJwcm9wZXJ0aWVzIjogewogICAgICAgICJkZXNjcmlwdGlvbiI6ICJmYWJyaWMgcmVzdCBhcGkgMTAxIHVwZGF0ZWQiLAogICAgICAgICJhY3Rpdml0aWVzIjogWwogICAgICAgICAgICB7CiAgICAgICAgICAgICAgICAibmFtZSI6ICJuYl9wYXJhbXNfdGVzdCIsCiAgICAgICAgICAgICAgICAidHlwZSI6ICJUcmlkZW50Tm90ZWJvb2siLAogICAgICAgICAgICAgICAgImRlcGVuZHNPbiI6IFtdLAogICAgICAgICAgICAgICAgInBvbGljeSI6IHsKICAgICAgICAgICAgICAgICAgICAidGltZW91dCI6ICIwLjEyOjAwOjAwIiwKICAgICAgICAgICAgICAgICAgICAicmV0cnkiOiAwLAogICAgICAgICAgICAgICAgICAgICJyZXRyeUludGVydmFsSW5TZWNvbmRzIjogMzAsCiAgICAgICAgICAgICAgICAgICAgInNlY3VyZU91dHB1dCI6IGZhbHNlLAogICAgICAgICAgICAgICAgICAgICJzZWN1cmVJbnB1dCI6IGZhbHNlCiAgICAgICAgICAgICAgICB9LAogICAgICAgICAgICAgICAgInR5cGVQcm9wZXJ0aWVzIjogewogICAgICAgICAgICAgICAgICAgICJub3RlYm9va0lkIjogImNmMzllN2U1LTI4YzAtNDY2ZS04NWFlLWMzMjk0ODM3MmMyNyIsCiAgICAgICAgICAgICAgICAgICAgIndvcmtzcGFjZUlkIjogImRjMjQ3MTBkLTU5OGEtNDRkMi1iYWFkLTgzNWM5N2JjOWNjZCIsCiAgICAgICAgICAgICAgICAgICAgInBhcmFtZXRlcnMiOiB7CiAgICAgICAgICAgICAgICAgICAgICAgICJwYXJhbTEiOiB7CiAgICAgICAgICAgICAgICAgICAgICAgICAgICAidmFsdWUiOiB7CiAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgInZhbHVlIjogIkBwaXBlbGluZSgpLnBhcmFtZXRlcnMucGFyYW0xIiwKICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAidHlwZSI6ICJFeHByZXNzaW9uIgogICAgICAgICAgICAgICAgICAgICAgICAgICAgfSwKICAgICAgICAgICAgICAgICAgICAgICAgICAgICJ0eXBlIjogInN0cmluZyIKICAgICAgICAgICAgICAgICAgICAgICAgfQogICAgICAgICAgICAgICAgICAgIH0KICAgICAgICAgICAgICAgIH0KICAgICAgICAgICAgfQogICAgICAgIF0sCiAgICAgICAgInBhcmFtZXRlcnMiOiB7CiAgICAgICAgICAgICJwYXJhbTEiOiB7CiAgICAgICAgICAgICAgICAidHlwZSI6ICJzdHJpbmciLAogICAgICAgICAgICAgICAgImRlZmF1bHRWYWx1ZSI6ICIwMDIiCiAgICAgICAgICAgIH0KICAgICAgICB9LAogICAgICAgICJhbm5vdGF0aW9ucyI6IFtdLAogICAgICAgICJsYXN0TW9kaWZpZWRCeU9iamVjdElkIjogIjM5NWQ0NzIyLWUwMzQtNGZlNS1hOTVjLTUyNWQ1YjBhZGVlYiIsCiAgICAgICAgImxhc3RQdWJsaXNoVGltZSI6ICIyMDI0LTA0LTE3VDEyOjA3OjI1WiIKICAgIH0KfQ==",
"payloadType": "InlineBase64"
}
]
},
"description": "fabric rest api 101"
}
invoke_fabric_api_request(method, uri, payload)
∟ Response 200: there is no body returned at the moment. You can get item definition to see updated payload.
Run data pipeline instance
To run a data pipeline, you can use the Job Scheduler API. Fabric supports passing parameters in the request body to parametrize the data pipeline run. In the example below, we’re running a data pipeline with a notebook activity and using a parameter named param1
, which is passed to the notebook (using a toggle parameter cell).
item_id = "<data_pipeline_id>"
job_type = "Pipeline"
method = "post"
uri = f"workspaces/{workspace_id}/items/{item_id}/jobs/instances?jobType={job_type}"
payload = {
"executionData": {
"parameters": {
"param1": "101"
}
}
}
invoke_fabric_api_request(method, uri, payload)
∟ Response 202: at the moment, there is no body returned. Instead, the job ID is returned in the headers, accessible through the “Location” property. This allows for easy retrieval and tracking of job status.
Get data pipeline instance
To retrieve the status of a data pipeline instance, use:
item_id = "<data_pipeline_id>"
job_instance_id = "<job_instance_id>"
method = "get"
uri = f"workspaces/{workspace_id}/items/{item_id}/jobs/instances/{job_instance_id}"
invoke_fabric_api_request(method, uri)
∟ Response 200: the result does not show the output of of data pipeline run for now, e.g. input/output.
Cancel data pipeline instance
To cancel a data pipeline instance run, use:
item_id = "<data_pipeline_id>"
job_instance_id = "<job_instance_id>"
method = "post"
uri = f"workspaces/{workspace_id}/items/{item_id}/jobs/instances/{job_instance_id}/cancel"
invoke_fabric_api_request(method, uri)
∟ Response 202: no body is returned, the headers include “Location” property. After cancelling you can check the status either by calling get data pipeline instance or check the Run > View run history in the Fabric UI.
Considerations
- Data pipeline API rate limit is 10 requests per minute, per operation, per user.
- At present, scheduling a data pipeline or managing data pipeline connections via the REST API is not supported.
- You can also use data pipeline item specific endpoints
{workspaceId}/dataPipelines/{dataPipelineId}
.