適用於:
適用於 Python 的 Azure Machine Learning SDK v1
重要事項
本文提供使用 Azure Machine Learning SDK v1 的相關信息。 SDK v1 自 2025 年 3 月 31 日起已被取代。 其支援將於 2026 年 6 月 30 日結束。 您可以在該日期之前安裝並使用 SDK v1。 您使用 SDK v1 的現有工作流程將在支援終止日期後繼續運作。 不過,如果產品發生架構變更,它們可能會面臨安全性風險或重大變更。
建議您在 2026 年 6 月 30 日之前轉換至 SDK v2。 如需 SDK v2 的詳細資訊,請參閱 什麼是 Azure Machine Learning CLI 和 Python SDK v2? 和 SDK v2 參考。
本文說明如何與同事或客戶共用機器學習管線。
機器學習管線是用於機器學習工作的可重複使用工作流程。 管線的其中一個優點是可加強共同作業。 您也可以將管線進行版本管理,從而在您開發新版本的同時,讓客戶使用當前的模型。
先決條件
建立 Azure Machine Learning 工作區 以包含您的管線資源。
安裝 Azure Machine Learning SDK,或使用已安裝 SDK 的 Azure Machine Learning 計算實例來設定您的開發環境。
建立並執行機器學習管線。 符合此需求的其中一種方式是完成 教學課程:建置 Azure Machine Learning 管線以進行批次評分。 如需其他選項,請參閱 使用 Azure Machine Learning SDK 建立和執行機器學習管線。
發佈管線
在擁有執行中的管道之後,您可以發佈它,使其能以不同的輸入資料運行。 若要讓已發佈管線的 REST 端點接收參數,您必須將管線設定為使用針對會變動之參數的 PipelineParameter 物件。
若要建立管線參數,請使用具有預設值的 PipelineParameter 物件:
from azureml.pipeline.core.graph import PipelineParameter pipeline_param = PipelineParameter( name="pipeline_arg", default_value=10)將
PipelineParameter物件新增為管線中任何步驟的參數,如下所示:compareStep = PythonScriptStep( script_name="compare.py", arguments=["--comp_data1", comp_data1, "--comp_data2", comp_data2, "--output_data", out_data3, "--param1", pipeline_param], inputs=[ comp_data1, comp_data2], outputs=[out_data3], compute_target=compute_target, source_directory=project_folder)發佈此管線,此管線會在叫用時接受參數:
published_pipeline1 = pipeline_run1.publish_pipeline( name="My_Published_Pipeline", description="My Published Pipeline Description", version="1.0")發佈管道後,即可在 UI 中加以檢查。 管線標識碼 是已發佈管線的唯一標識碼。
執行已發佈的管線
所有已發佈的管線都有 REST 端點。 藉由使用管線端點,您可以從外部系統觸發管線的執行,包括非 Python 用戶端。 此端點可在批次評分和重新訓練案例中,提供「受控的可重複性」。
重要事項
如果您使用 Azure 角色型存取控制 (RBAC) 來管理管線的存取,請設定您管線場景的權限(訓練或評分)。
若要叫用先前的管線執行,您需要一個 Microsoft Entra 驗證標頭權杖。 權杖的取得程序說明位於 AzureCliAuthentication 類別參考和 Azure Machine Learning 中的驗證筆記本中。
from azureml.pipeline.core import PublishedPipeline
import requests
response = requests.post(published_pipeline1.endpoint,
headers=aad_token,
json={"ExperimentName": "My_Pipeline",
"ParameterAssignments": {"pipeline_arg": 20}})
針對 json 索引鍵,POST 要求的 ParameterAssignments 引數必須包含字典,字典中有管線參數和其值。 此外,json 參數可以包含下列鍵值:
| 鑰匙 | 描述 |
|---|---|
ExperimentName |
與端點相關聯的實驗名稱。 |
Description |
自由格式文本描述端點。 |
Tags |
可用來標記和註解請求的自由形式鍵值對。 |
DataSetDefinitionValueAssignments |
不需重新訓練即可用於更改數據集的字典。 (請參閱本文稍後的討論。 |
DataPathAssignments |
詞典,用於變更資料路徑而不需重新訓練。 (請參閱本文稍後的討論。 |
透過使用 C# 執行已發佈的管線
下列程式碼示範如何以非同步方式從 C# 呼叫管線。 部分代碼段只會顯示呼叫結構。 不會顯示完整的類別或錯誤處理。 這不是Microsoft範例的一部分。
[DataContract]
public class SubmitPipelineRunRequest
{
[DataMember]
public string ExperimentName { get; set; }
[DataMember]
public string Description { get; set; }
[DataMember(IsRequired = false)]
public IDictionary<string, string> ParameterAssignments { get; set; }
}
// ... in its own class and method ...
const string RestEndpoint = "your-pipeline-endpoint";
using (HttpClient client = new HttpClient())
{
var submitPipelineRunRequest = new SubmitPipelineRunRequest()
{
ExperimentName = "YourExperimentName",
Description = "Asynchronous C# REST api call",
ParameterAssignments = new Dictionary<string, string>
{
{
// Replace with your pipeline parameter keys and values
"your-pipeline-parameter", "default-value"
}
}
};
string auth_key = "your-auth-key";
client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", auth_key);
// Submit the job
var requestPayload = JsonConvert.SerializeObject(submitPipelineRunRequest);
var httpContent = new StringContent(requestPayload, Encoding.UTF8, "application/json");
var submitResponse = await client.PostAsync(RestEndpoint, httpContent).ConfigureAwait(false);
if (!submitResponse.IsSuccessStatusCode)
{
await WriteFailedResponse(submitResponse); // ... method not shown ...
return;
}
var result = await submitResponse.Content.ReadAsStringAsync().ConfigureAwait(false);
var obj = JObject.Parse(result);
// ... use `obj` dictionary to access results
}
藉由使用 Java 執行已發佈的管線
下列程式代碼顯示對需要驗證之管線的呼叫。 (請參閱 設定 Azure Machine Learning 資源和工作流程的驗證。如果您的管線已公開部署,則不需要產生 authKey的呼叫。 部分程式碼片段不會顯示 Java 類別和例外狀況處理的樣板。 程序代碼會使用 Optional.flatMap 將可能會傳回空白 Optional的函式鏈結在一起。 使用 flatMap 可縮短並清楚說明程式碼,但請注意,getRequestBody() 會容忍例外狀況。
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Optional;
// JSON library
import com.google.gson.Gson;
String scoringUri = "scoring-endpoint";
String tenantId = "your-tenant-id";
String clientId = "your-client-id";
String clientSecret = "your-client-secret";
String resourceManagerUrl = "https://management.azure.com";
String dataToBeScored = "{ \"ExperimentName\" : \"My_Pipeline\", \"ParameterAssignments\" : { \"pipeline_arg\" : \"20\" }}";
HttpClient client = HttpClient.newBuilder().build();
Gson gson = new Gson();
HttpRequest tokenAuthenticationRequest = tokenAuthenticationRequest(tenantId, clientId, clientSecret, resourceManagerUrl);
Optional<String> authBody = getRequestBody(client, tokenAuthenticationRequest);
Optional<String> authKey = authBody.flatMap(body -> Optional.of(gson.fromJson(body, AuthenticationBody.class).access_token));
Optional<HttpRequest> scoringRequest = authKey.flatMap(key -> Optional.of(scoringRequest(key, scoringUri, dataToBeScored)));
Optional<String> scoringResult = scoringRequest.flatMap(req -> getRequestBody(client, req));
// ... etc. (`scoringResult.orElse()`) ...
static HttpRequest tokenAuthenticationRequest(String tenantId, String clientId, String clientSecret, String resourceManagerUrl)
{
String authUrl = String.format("https://login.microsoftonline.com/%s/oauth2/token", tenantId);
String clientIdParam = String.format("client_id=%s", clientId);
String resourceParam = String.format("resource=%s", resourceManagerUrl);
String clientSecretParam = String.format("client_secret=%s", clientSecret);
String bodyString = String.format("grant_type=client_credentials&%s&%s&%s", clientIdParam, resourceParam, clientSecretParam);
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(authUrl))
.POST(HttpRequest.BodyPublishers.ofString(bodyString))
.build();
return request;
}
static HttpRequest scoringRequest(String authKey, String scoringUri, String dataToBeScored)
{
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(scoringUri))
.header("Authorization", String.format("Token %s", authKey))
.POST(HttpRequest.BodyPublishers.ofString(dataToBeScored))
.build();
return request;
}
static Optional<String> getRequestBody(HttpClient client, HttpRequest request) {
try {
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() != 200) {
System.out.println(String.format("Unexpected server response %d", response.statusCode()));
return Optional.empty();
}
return Optional.of(response.body());
}catch(Exception x)
{
System.out.println(x.toString());
return Optional.empty();
}
}
class AuthenticationBody {
String access_token;
String token_type;
int expires_in;
String scope;
String refresh_token;
String id_token;
AuthenticationBody() {}
}
在不重新訓練的情況下變更數據集和數據路徑
您可以在不同的資料集和資料路徑上進行訓練和推斷。 例如,您可能想要在較小的數據集上訓練,但在完整數據集上進行推論。 您可以使用請求的 DataSetDefinitionValueAssignments 參數中的 json 鍵來切換數據集。 您可以使用DataPathAssignments來切換資料路徑。 這兩種技術都類似:
在您的管線定義指令碼中,為資料集建立
PipelineParameter。 從DatasetConsumptionConfig中建立DataPath或PipelineParameter:tabular_dataset = Dataset.Tabular.from_delimited_files('https://dprepdata.blob.core.windows.net/demo/Titanic.csv') tabular_pipeline_param = PipelineParameter(name="tabular_ds_param", default_value=tabular_dataset) tabular_ds_consumption = DatasetConsumptionConfig("tabular_dataset", tabular_pipeline_param)在您的機器學習文稿中,使用
Run.get_context().input_datasets來存取動態指定的數據集:from azureml.core import Run input_tabular_ds = Run.get_context().input_datasets['tabular_dataset'] dataframe = input_tabular_ds.to_pandas_dataframe() # ... etc. ...請注意,機器學習腳本會存取針對
DatasetConsumptionConfig(tabular_dataset)指定的值,而不是PipelineParameter(tabular_ds_param)的值。在您的管線定義腳本中,將
DatasetConsumptionConfig設為PipelineScriptStep的參數。train_step = PythonScriptStep( name="train_step", script_name="train_with_dataset.py", arguments=["--param1", tabular_ds_consumption], inputs=[tabular_ds_consumption], compute_target=compute_target, source_directory=source_directory) pipeline = Pipeline(workspace=ws, steps=[train_step])若要在推斷 REST 呼叫中動態切換資料集,請使用
DataSetDefinitionValueAssignments:tabular_ds1 = Dataset.Tabular.from_delimited_files('path_to_training_dataset') tabular_ds2 = Dataset.Tabular.from_delimited_files('path_to_inference_dataset') ds1_id = tabular_ds1.id d22_id = tabular_ds2.id response = requests.post(rest_endpoint, headers=aad_token, json={ "ExperimentName": "MyRestPipeline", "DataSetDefinitionValueAssignments": { "tabular_ds_param": { "SavedDataSetReference": {"Id": ds1_id #or ds2_id }}}})
展示資料集和 PipelineParameter 及展示資料路徑和 PipelineParameter 的筆記本中都包含這項技術的完整範例。
建立已設定版本的管線端點
您可以建立管線端點,在其後方擁有多個已發佈的管線。 這項技術可讓您在反覆運算和更新機器學習管線時,提供固定的 REST 端點。
from azureml.pipeline.core import PipelineEndpoint
published_pipeline = PublishedPipeline.get(workspace=ws, id="My_Published_Pipeline_id")
pipeline_endpoint = PipelineEndpoint.publish(workspace=ws, name="PipelineEndpointTest",
pipeline=published_pipeline, description="Test description Notebook")
將作業提交至管線端點
您可以將作業提交至管線端點的預設版本:
pipeline_endpoint_by_name = PipelineEndpoint.get(workspace=ws, name="PipelineEndpointTest")
run_id = pipeline_endpoint_by_name.submit("PipelineEndpointExperiment")
print(run_id)
也可以將作業提交至特定版本:
run_id = pipeline_endpoint_by_name.submit("PipelineEndpointExperiment", pipeline_version="0")
print(run_id)
您可以使用 REST API 來完成相同的作業:
rest_endpoint = pipeline_endpoint_by_name.endpoint
response = requests.post(rest_endpoint,
headers=aad_token,
json={"ExperimentName": "PipelineEndpointExperiment",
"RunSource": "API",
"ParameterAssignments": {"1": "united", "2":"city"}})
在工作室中使用已發佈的管線
您也可以從工作室執行已發佈的管線:
在左側功能表中,選取 [端點]。
選取 管線端點:
選取特定管線,以執行、取用或檢閱管線端點上先前執行的結果。
停用已發佈的管線
若要從已發佈的管線清單中隱藏管線,您可以在 Studio 中或透過 SDK 加以停用:
# Get the pipeline by using its ID from Azure Machine Learning studio
p = PublishedPipeline.get(ws, id="068f4885-7088-424b-8ce2-eeb9ba5381a6")
p.disable()
您可以再次使用 p.enable() 啟用它。 如需詳細資訊,請參閱 PublishedPipeline 類別 參考。
後續步驟
- 使用 GitHub 上的這些 Jupyter Notebook 來進一步探索機器學習管線。
- 請參閱 azureml-pipelines-core 套件和 azureml-pipelines-steps 套件的 SDK 參考。
- 如需有關管線偵錯和疑難排解的秘訣,請參閱管線偵錯操作說明。