Gerenciar recursos de computação e dados do Qiskit Serverless
Versões dos pacotes
O código nesta página foi desenvolvido usando os seguintes requisitos. Recomendamos usar estas versões ou mais recentes.
qiskit[all]~=2.0.0
qiskit-ibm-runtime~=0.37.0
qiskit-serverless~=0.22.0
Com o Qiskit Serverless, você pode gerenciar computação e dados em todo o seu padrão Qiskit, incluindo CPUs, QPUs e outros aceleradores de computação.
Definir status detalhados
As cargas de trabalho Serverless passam por várias etapas em um fluxo de trabalho. Por padrão, os seguintes status podem ser visualizados com job.status():
QUEUED: a carga de trabalho está na fila para recursos clássicosINITIALIZING: a carga de trabalho está sendo configuradaRUNNING: a carga de trabalho está sendo executada nos recursos clássicosDONE: a carga de trabalho foi concluída com sucesso
Você também pode definir status personalizados que descrevem melhor a etapa específica do fluxo de trabalho, conforme mostrado a seguir.
# Added by doQumentation — required packages for this notebook
!pip install -q qiskit qiskit-ibm-runtime qiskit-serverless
# This cell is hidden from users, it just creates a new folder
from pathlib import Path
Path("./source_files").mkdir(exist_ok=True)
%%writefile ./source_files/status_example.py
from qiskit_serverless import update_status, Job
## If your function has a mapping stage, particularly application functions, you can set the status to "RUNNING: MAPPING" as follows:
update_status(Job.MAPPING)
## While handling transpilation, error suppression, and so forth, you can set the status to "RUNNING: OPTIMIZING_FOR_HARDWARE":
update_status(Job.OPTIMIZING_HARDWARE)
## After you submit jobs to Qiskit Runtime, the underlying quantum job will be queued. You can set status to "RUNNING: WAITING_FOR_QPU":
update_status(Job.WAITING_QPU)
## When the Qiskit Runtime job starts running on the QPU, set the following status "RUNNING: EXECUTING_QPU":
update_status(Job.EXECUTING_QPU)
## Once QPU is completed and post-processing has begun, set the status "RUNNING: POST_PROCESSING":
update_status(Job.POST_PROCESSING)
Writing ./source_files/status_example.py
Após a conclusão bem-sucedida desta carga de trabalho (com save_result()), este status será atualizado para DONE automaticamente.
Fluxos de trabalho paralelos
Para tarefas clássicas que podem ser paralelizadas, use o decorator @distribute_task para definir os requisitos de computação necessários para executar uma tarefa. Comece relembrando o exemplo transpile_remote.py do tópico Escreva seu primeiro programa Qiskit Serverless com o código a seguir.
O código a seguir requer que você já tenha salvo suas credenciais.
%%writefile ./source_files/transpile_remote.py
from qiskit.transpiler import generate_preset_pass_manager
from qiskit_ibm_runtime import QiskitRuntimeService
from qiskit_serverless import distribute_task
service = QiskitRuntimeService()
@distribute_task(target={"cpu": 1})
def transpile_remote(circuit, optimization_level, backend):
"""Transpiles an abstract circuit (or list of circuits) into an ISA circuit for a given backend."""
pass_manager = generate_preset_pass_manager(
optimization_level=optimization_level,
backend=service.backend(backend)
)
isa_circuit = pass_manager.run(circuit)
return isa_circuit
Writing ./source_files/transpile_remote.py
Neste exemplo, você decorou a função transpile_remote() com @distribute_task(target={"cpu": 1}). Quando executado, isso cria uma tarefa worker paralela assíncrona com um único núcleo de CPU e retorna com uma referência para rastrear o worker. Para buscar o resultado, passe a referência para a função get(). Podemos usar isso para executar várias tarefas paralelas:
%%writefile --append ./source_files/transpile_remote.py
from time import time
from qiskit_serverless import get, get_arguments, save_result, update_status, Job
# Get arguments
arguments = get_arguments()
circuit = arguments.get("circuit")
optimization_level = arguments.get("optimization_level")
backend = arguments.get("backend")
Appending to ./source_files/transpile_remote.py
%%writefile --append ./source_files/transpile_remote.py
# Start distributed transpilation
update_status(Job.OPTIMIZING_HARDWARE)
start_time = time()
transpile_worker_references = [
transpile_remote(circuit, optimization_level, backend)
for circuit in arguments.get("circuit_list")
]
transpiled_circuits = get(transpile_worker_references)
end_time = time()
Appending to ./source_files/transpile_remote.py
%%writefile --append ./source_files/transpile_remote.py
# Save result, with metadata
result = {
"circuits": transpiled_circuits,
"metadata": {
"resource_usage": {
"RUNNING: OPTIMIZING_FOR_HARDWARE": {
"CPU_TIME": end_time - start_time,
"QPU_TIME": 0,
},
}
},
}
save_result(result)
Appending to ./source_files/transpile_remote.py
# This cell is hidden from users.
# It uploads the serverless program and checks it runs.
def test_serverless_job(title, entrypoint):
# Import in function to stop them interfering with user-facing code
from qiskit.circuit.random import random_circuit
from qiskit_serverless import IBMServerlessClient, QiskitFunction
import time
import uuid
title += "_" + uuid.uuid4().hex[:8]
serverless = IBMServerlessClient()
transpile_remote_demo = QiskitFunction(
title=title,
entrypoint=entrypoint,
working_dir="./source_files/",
)
serverless.upload(transpile_remote_demo)
job = serverless.get(title).run(
circuit=random_circuit(3, 3),
circuit_list=[random_circuit(3, 3) for _ in range(3)],
backend="ibm_torino",
optimization_level=1,
)
for retry in range(25):
time.sleep(5)
status = job.status()
if status == "DONE":
print("Job completed successfully")
return
if status not in [
"QUEUED",
"INITIALIZING",
"RUNNING",
"RUNNING: OPTIMIZING_FOR_HARDWARE",
"DONE",
]:
raise Exception(
f"Unexpected job status '{status}'.\nHere's the logs:\n"
+ job.logs()
)
print(f"Waiting for job (status '{status}')")
raise Exception("Job did not complete in time")
test_serverless_job(
title="transpile_remote_serverless_test", entrypoint="transpile_remote.py"
)
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'RUNNING')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Job completed successfully
Explorar diferentes configurações de tarefas
Você pode alocar CPU, GPU e memória de forma flexível para suas tarefas via @distribute_task(). Para o Qiskit Serverless na plataforma IBM Quantum®, cada programa é equipado com 16 núcleos de CPU e 32 GB de RAM, que podem ser alocados dinamicamente conforme necessário.
Os núcleos de CPU podem ser alocados como núcleos completos ou até mesmo em alocações fracionárias, conforme mostrado a seguir.
A memória é alocada em número de bytes. Lembre que há 1024 bytes em um kilobyte, 1024 kilobytes em um megabyte e 1024 megabytes em um gigabyte. Para alocar 2 GB de memória para seu worker, você precisa alocar "mem": 2 * 1024 * 1024 * 1024.
%%writefile --append ./source_files/transpile_remote.py
@distribute_task(target={
"cpu": 16,
"mem": 2 * 1024 * 1024 * 1024
})
def transpile_remote(circuit, optimization_level, backend):
return None
Appending to ./source_files/transpile_remote.py
# This cell is hidden from users.
# It checks the distributed program works.
test_serverless_job(
title="transpile_remote_serverless_test", entrypoint="transpile_remote.py"
)
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'RUNNING')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Job completed successfully
Gerenciar dados em seu programa
O Qiskit Serverless permite que você gerencie arquivos no diretório /data em todos os seus programas. Isso inclui algumas limitações:
- Somente arquivos
tareh5são suportados atualmente - Este é apenas um armazenamento
/dataplano e não pode ter subdiretórios/data/folder/
O exemplo a seguir mostra como fazer upload de arquivos. Certifique-se de ter se autenticado no Qiskit Serverless com sua conta IBM Quantum (consulte Implantar na plataforma IBM Quantum para instruções).
import tarfile
from qiskit_serverless import IBMServerlessClient
# Create a tar
filename = "transpile_demo.tar"
file = tarfile.open(filename, "w")
file.add("./source_files/transpile_remote.py")
file.close()
# Get a reference to a QiskitFunction
serverless = IBMServerlessClient()
transpile_remote_demo = next(
program
for program in serverless.list()
if program.title == "transpile_remote_serverless"
)
# Upload the tar to Serverless data directory
serverless.file_upload(file=filename, function=transpile_remote_demo)
'{"message":"/usr/src/app/media/5e1f442128cdf60018496a04/transpile_demo.tar"}'
Em seguida, você pode listar todos os arquivos no seu diretório data. Esses dados são acessíveis a todos os programas.
serverless.files(function=transpile_remote_demo)
['classifier_name.pkl.tar', 'output.json.tar', 'transpile_demo.tar']
Isso pode ser feito a partir de um programa usando file_download() para baixar o arquivo para o ambiente do programa e descompactando o tar.
%%writefile ./source_files/extract_tarfile.py
import tarfile
from qiskit_serverless import IBMServerlessClient
serverless = IBMServerlessClient(token="<YOUR_API_KEY>") # Use the 44-character API_KEY you created and saved from the IBM Quantum Platform Home dashboard
files = serverless.files()
demo_file = files[0]
downloaded_tar = serverless.file_download(demo_file)
with tarfile.open(downloaded_tar, 'r') as tar:
tar.extractall()
Neste ponto, seu programa pode interagir com os arquivos como faria em um experimento local. file_upload(), file_download() e file_delete() podem ser chamados a partir do seu experimento local ou do seu programa carregado, para um gerenciamento de dados consistente e flexível.
Próximos passos
- Veja um exemplo completo que migra código existente para o Qiskit Serverless.
- Leia um artigo em que pesquisadores usaram o Qiskit Serverless e computação quântica centrada em supercomputadores para explorar química quântica.