[ ]:
# this is a hidden cell. It will not show on the documentation HTML.
import os
from vespa.package import (
    HNSW,
    Document,
    Field,
    Schema,
    FieldSet,
    # SecondPhaseRanking,
    RankProfile,
    ApplicationPackage,
    QueryProfile,
    QueryProfileType,
    QueryTypeField,
)
from vespa.deployment import VespaDocker
class QuestionAnswering(ApplicationPackage):
    def __init__(self, name: str = "qa"):
        context_document = Document(
            fields=[
                Field(
                    name="questions",
                    type="array<int>",
                    indexing=["summary", "attribute"],
                ),
                Field(name="dataset", type="string", indexing=["summary", "attribute"]),
                Field(name="context_id", type="int", indexing=["summary", "attribute"]),
                Field(
                    name="text",
                    type="string",
                    indexing=["summary", "index"],
                    index="enable-bm25",
                ),
            ]
        )
        context_schema = Schema(
            name="context",
            document=context_document,
            fieldsets=[FieldSet(name="default", fields=["text"])],
            rank_profiles=[
                RankProfile(name="bm25", inherits="default", first_phase="bm25(text)"),
                RankProfile(
                    name="nativeRank",
                    inherits="default",
                    first_phase="nativeRank(text)",
                ),
            ],
        )
        sentence_document = Document(
            inherits="context",
            fields=[
                Field(
                    name="sentence_embedding",
                    type="tensor<float>(x[512])",
                    indexing=["attribute", "index"],
                    ann=HNSW(
                        distance_metric="euclidean",
                        max_links_per_node=16,
                        neighbors_to_explore_at_insert=500,
                    ),
                )
            ],
        )
        sentence_schema = Schema(
            name="sentence",
            document=sentence_document,
            fieldsets=[FieldSet(name="default", fields=["text"])],
            rank_profiles=[
                RankProfile(
                    name="semantic-similarity",
                    inherits="default",
                    first_phase="closeness(sentence_embedding)",
                ),
                RankProfile(name="bm25", inherits="default", first_phase="bm25(text)"),
                RankProfile(
                    name="bm25-semantic-similarity",
                    inherits="default",
                    first_phase="bm25(text) + closeness(sentence_embedding)",
                ),
            ],
        )
        super().__init__(
            name=name,
            schema=[context_schema, sentence_schema],
            query_profile=QueryProfile(),
            query_profile_type=QueryProfileType(
                fields=[
                    QueryTypeField(
                        name="ranking.features.query(query_embedding)",
                        type="tensor<float>(x[512])",
                    )
                ]
            ),
        )
app_package = QuestionAnswering()
vespa_docker = VespaDocker()
app = vespa_docker.deploy(application_package=app_package)
Waiting for configuration server, 0/300 seconds...
Waiting for configuration server, 5/300 seconds...
Waiting for configuration server, 10/300 seconds...
Waiting for application status, 0/300 seconds...
Waiting for application status, 5/300 seconds...
Waiting for application status, 10/300 seconds...
Waiting for application status, 15/300 seconds...
Waiting for application status, 20/300 seconds...
Waiting for application status, 25/300 seconds...
Waiting for application status, 30/300 seconds...
Finished deployment.
Exchange data with applications
Feed, get, update and delete operations
We will use the question answering (QA) app to demonstrate ways to feed data to an application. We start by downloading sample data.
[ ]:
import json, requests
sentence_data = json.loads(
    requests.get("https://data.vespa.oath.cloud/blog/qa/sample_sentence_data_100.json").text
)
list(sentence_data[0].keys())
['text', 'dataset', 'questions', 'context_id', 'sentence_embedding']
We assume that app holds a Vespa connection instance to the desired Vespa application.
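If app does not come from the Docker deployment above, you can connect to an already running Vespa application directly. A minimal sketch, assuming the endpoint URL and port of your application (both values below are placeholders):
[ ]:
from vespa.application import Vespa

# Placeholder endpoint; point this at your running Vespa application.
app = Vespa(url="http://localhost", port=8080)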
Feed data
We can either feed a batch of data for convenience or feed individual data points for increased control.
Batch
We need to prepare the data as a list of dicts, where the id key holds a unique id for the data point and the fields key holds a dict with the data fields.
[ ]:
batch_feed = [
    {
        "id": idx,
        "fields": sentence
    }
    for idx, sentence in enumerate(sentence_data)
]
We then feed the batch to the desired schema using the feed_batch method.
[ ]:
response = app.feed_batch(schema="sentence", batch=batch_feed)
Successful documents fed: 100/100.
Batch progress: 1/1.
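Besides the progress messages printed above, the returned list of responses can be inspected programmatically. A minimal sketch, assuming each element behaves like a pyvespa VespaResponse with a status_code attribute (attribute names may vary across pyvespa versions):
[ ]:
# Count feed operations that returned HTTP 200 (assumes a status_code attribute on each response).
ok = sum(1 for r in response if r.status_code == 200)
print(f"{ok}/{len(response)} documents fed successfully")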
Individual data points
Synchronous
Synchronously feeding individual data points is similar to batch feeding, except that you have more control when looping through your dataset.
[ ]:
response = []
for idx, sentence in enumerate(sentence_data):
    response.append(
        app.feed_data_point(schema="sentence", data_id=idx, fields=sentence)
    )
Asynchronous
app.asyncio() returns a VespaAsync instance that contains async operations such as feed_data_point. Using the async with context manager ensures that we open and close the appropriate connections required for async feeding.
[ ]:
async with app.asyncio() as async_app:
    response = await async_app.feed_data_point(
        schema="sentence",
        data_id=idx,
        fields=sentence,
    )
We can then use asyncio constructs like create_task and wait to create different types of asynchronous flows, like the one below.
[ ]:
from asyncio import create_task, wait, ALL_COMPLETED

async with app.asyncio() as async_app:
    feed = []
    for idx, sentence in enumerate(sentence_data):
        feed.append(
            create_task(
                async_app.feed_data_point(
                    schema="sentence",
                    data_id=idx,
                    fields=sentence,
                )
            )
        )
    await wait(feed, return_when=ALL_COMPLETED)
    response = [x.result() for x in feed]
Get data
As with feeding, we can get a batch of data for convenience or get individual data points for increased control.
Batch
We need to prepare the data as a list of dicts, where the id key holds a unique id for the data point. We then get the batch from the desired schema using the get_batch method.
[ ]:
batch = [{"id": idx} for idx, sentence in enumerate(sentence_data)]
response = app.get_batch(schema="sentence", batch=batch)
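The result is a list of responses, one per requested document. A minimal sketch of reading a field back from the first response, assuming the response exposes the raw Document API JSON, which keeps the document content under a fields object:
[ ]:
# Inspect the text field of the first retrieved document
# (assumes response objects expose the Document API JSON via a json attribute).
first_doc = response[0].json
print(first_doc["fields"]["text"])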
Individual data points
We can get individual data points synchronously or asynchronously.
Synchronous
[ ]:
response = app.get_data(schema="sentence", data_id=0)
Asynchronous
[ ]:
async with app.asyncio() as async_app:
    response = await async_app.get_data(schema="sentence", data_id=0)
Update data
As with feeding, we can update a batch of data for convenience or update individual data points for increased control.
Batch
We need to prepare the data as a list of dicts, where the id key holds a unique id for the data point, the fields key holds a dict with the fields to be updated, and an optional create key holds a boolean indicating whether the data point should be created if it does not exist (defaults to False).
[ ]:
batch_update = [
    {
        "id": idx,           # data_id
        "fields": sentence,  # fields to be updated
        "create": True       # optional; create the data point if it does not exist (defaults to False)
    }
    for idx, sentence in enumerate(sentence_data)
]
We then update the batch on the desired schema using the update_batch method.
[ ]:
response = app.update_batch(schema="sentence", batch=batch_update)
Individual data points
We can update individual data points synchronously or asynchronously.
Synchronous
[ ]:
response = app.update_data(schema="sentence", data_id=0, fields=sentence_data[0], create=True)
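Only the fields present in the fields dict are changed, so the same call can be used for a partial update. A minimal sketch that touches just the dataset field of an existing document (the value shown is only an example):
[ ]:
# Partial update: only the dataset field is modified, the other fields keep their current values.
response = app.update_data(schema="sentence", data_id=0, fields={"dataset": "example-dataset"})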
Asynchronous
[ ]:
async with app.asyncio() as async_app:
    response = await async_app.update_data(schema="sentence", data_id=0, fields=sentence_data[0], create=True)
Delete data
As with feeding, we can delete a batch of data for convenience or delete individual data points for increased control.
Batch
We need to prepare the data as a list of dicts, where the id key holds a unique id for the data point. We then delete the batch from the desired schema using the delete_batch method.
[ ]:
batch = [{"id": idx} for idx, sentence in enumerate(sentence_data)]
response = app.delete_batch(schema="sentence", batch=batch)
Individual data points
We can delete individual data points synchronously or asynchronously.
Synchronous
[ ]:
response = app.delete_data(schema="sentence", data_id=0)
Asynchronous
[ ]:
async with app.asyncio() as async_app:
    response = await async_app.delete_data(schema="sentence", data_id=0)
[ ]:
# this is a hidden cell. It will not show on the documentation HTML.
vespa_docker.container.stop()
vespa_docker.container.remove()