[ ]:
# this is a hidden cell. It will not show on the documentation HTML.
import os
from vespa.package import (
    HNSW,
    Document,
    Field,
    Schema,
    FieldSet,
#    SecondPhaseRanking,
    RankProfile,
    ApplicationPackage,
    QueryProfile,
    QueryProfileType,
    QueryTypeField
)

from vespa.deployment import VespaDocker

class QuestionAnswering(ApplicationPackage):
    def __init__(self, name: str = "qa"):
        context_document = Document(
            fields=[
                Field(
                    name="questions",
                    type="array<int>",
                    indexing=["summary", "attribute"],
                ),
                Field(name="dataset", type="string", indexing=["summary", "attribute"]),
                Field(name="context_id", type="int", indexing=["summary", "attribute"]),
                Field(
                    name="text",
                    type="string",
                    indexing=["summary", "index"],
                    index="enable-bm25",
                ),
            ]
        )
        context_schema = Schema(
            name="context",
            document=context_document,
            fieldsets=[FieldSet(name="default", fields=["text"])],
            rank_profiles=[
                RankProfile(name="bm25", inherits="default", first_phase="bm25(text)"),
                RankProfile(
                    name="nativeRank",
                    inherits="default",
                    first_phase="nativeRank(text)",
                ),
            ],
        )
        sentence_document = Document(
            inherits="context",
            fields=[
                Field(
                    name="sentence_embedding",
                    type="tensor<float>(x[512])",
                    indexing=["attribute", "index"],
                    ann=HNSW(
                        distance_metric="euclidean",
                        max_links_per_node=16,
                        neighbors_to_explore_at_insert=500,
                    ),
                )
            ],
        )
        sentence_schema = Schema(
            name="sentence",
            document=sentence_document,
            fieldsets=[FieldSet(name="default", fields=["text"])],
            rank_profiles=[
                RankProfile(
                    name="semantic-similarity",
                    inherits="default",
                    first_phase="closeness(sentence_embedding)",
                ),
                RankProfile(name="bm25", inherits="default", first_phase="bm25(text)"),
                RankProfile(
                    name="bm25-semantic-similarity",
                    inherits="default",
                    first_phase="bm25(text) + closeness(sentence_embedding)",
                ),
            ],
        )
        super().__init__(
            name=name,
            schema=[context_schema, sentence_schema],
            query_profile=QueryProfile(),
            query_profile_type=QueryProfileType(
                fields=[
                    QueryTypeField(
                        name="ranking.features.query(query_embedding)",
                        type="tensor<float>(x[512])",
                    )
                ]
            ),
        )

app_package = QuestionAnswering()
vespa_docker = VespaDocker()
app = vespa_docker.deploy(application_package=app_package)
Waiting for configuration server, 0/300 seconds...
Waiting for configuration server, 5/300 seconds...
Waiting for configuration server, 10/300 seconds...
Waiting for application status, 0/300 seconds...
Waiting for application status, 5/300 seconds...
Waiting for application status, 10/300 seconds...
Waiting for application status, 15/300 seconds...
Waiting for application status, 20/300 seconds...
Waiting for application status, 25/300 seconds...
Waiting for application status, 30/300 seconds...
Finished deployment.

Exchange data with applications

Feed, get, update and delete operations

We will use the question answering (QA) app to demonstrate ways to feed data to an application. We start by downloading sample data.

[ ]:
import json, requests

sentence_data = json.loads(
    requests.get("https://data.vespa.oath.cloud/blog/qa/sample_sentence_data_100.json").text
)
list(sentence_data[0].keys())
['text', 'dataset', 'questions', 'context_id', 'sentence_embedding']

We assume that app holds a Vespa connection instance to the desired Vespa application.

Feed data

We can either feed a batch of data for convenience or feed individual data points for increased control.

Batch

We need to prepare the data as a list of dicts having the id key holding a unique id of the data point and the fields key holding a dict with the data fields.

[ ]:
batch_feed = [
    {
        "id": idx,
        "fields": sentence
    }
    for idx, sentence in enumerate(sentence_data)
]

We then feed the batch to the desired schema using the feed_batch method.

[ ]:
response = app.feed_batch(schema="sentence", batch=batch_feed)
Successful documents fed: 100/100.
Batch progress: 1/1.

Individual data points

Synchronous

Syncronously feeding individual data points is similar to batch feeding, except that you have more control when looping through your dataset.

[ ]:
response = []
for idx, sentence in enumerate(sentence_data):
    response.append(
        app.feed_data_point(schema="sentence", data_id=idx, fields=sentence)
    )

Asynchronous

app.asyncio() returns a VespaAsync instance that contains async operations such as feed_data_point. Using the async with context manager ensures that we open and close the appropriate connections required for async feeding.

[ ]:
async with app.asyncio() as async_app:
    response = await async_app.feed_data_point(
        schema="sentence",
        data_id=idx,
        fields=sentence,
    )

We can then use asyncio constructs like create_task and wait to create different types of asynchronous flows like the one below.

[ ]:
from asyncio import create_task, wait, ALL_COMPLETED

async with app.asyncio() as async_app:
    feed = []
    for idx, sentence in enumerate(sentence_data):
        feed.append(
            create_task(
                async_app.feed_data_point(
                    schema="sentence",
                    data_id=idx,
                    fields=sentence,
                )
            )
        )
    await wait(feed, return_when=ALL_COMPLETED)
    response = [x.result() for x in feed]
Note: The code above runs from a Jupyter Notebook because it already has its async event loop running in the background. You must create your event loop when running this code on an environment without one, just like any asyncio code requires.

Get data

Similarly to the examples about feeding, we can get a batch of data for convenience or get individual data points for increased control.

Batch

We need to prepare the data as a list of dicts having the id key holding a unique id of the data point. We then get the batch from the desired schema using the get_batch method.

[ ]:
batch = [{"id": idx} for idx, sentence in enumerate(sentence_data)]
response = app.get_batch(schema="sentence", batch=batch)

Individual data points

We can get individual data points synchronously or asynchronously.

Synchronous

[ ]:
response = app.get_data(schema="sentence", data_id=0)

Asynchronous

[ ]:
async with app.asyncio() as async_app:
    response = await async_app.get_data(schema="sentence",data_id=0)
Note: The code above runs from a Jupyter Notebook because it already has its async event loop running in the background. You must create your event loop when running this code on an environment without one, just like any asyncio code requires.

Update data

Similarly to the examples about feeding, we can update a batch of data for convenience or update individual data points for increased control.

Batch

We need to prepare the data as a list of dicts having the id key holding a unique id of the data point, the fields key holding a dict with the fields to be updated and an optional create key with a boolean value to indicate if a data point should be created in case it does not exist (default to False).

[ ]:
batch_update = [
    {
        "id": idx,           # data_id
        "fields": sentence,  # fields to be updated
        "create": True       # Optional. Create data point if not exist, default to False.

    }
    for idx, sentence in enumerate(sentence_data)
]

We then update the batch on the desired schema using the update_batch method.

[ ]:
response = app.update_batch(schema="sentence", batch=batch_update)

Individual data points

We can update individual data points synchronously or asynchronously.

Synchronous

[ ]:
response = app.update_data(schema="sentence", data_id=0, fields=sentence_data[0], create=True)

Asynchronous

[ ]:
async with app.asyncio() as async_app:
    response = await async_app.update_data(schema="sentence",data_id=0, fields=sentence_data[0], create=True)
Note: The code above runs from a Jupyter Notebook because it already has its async event loop running in the background. You must create your event loop when running this code on an environment without one, just like any asyncio code requires.

Delete data

Similarly to the examples about feeding, we can delete a batch of data for convenience or delete individual data points for increased control.

Batch

We need to prepare the data as a list of dicts having the id key holding a unique id of the data point. We then delete the batch from the desired schema using the delete_batch method.

[ ]:
batch = [{"id": idx} for idx, sentence in enumerate(sentence_data)]
response = app.delete_batch(schema="sentence", batch=batch)

Individual data points

We can delete individual data points synchronously or asynchronously.

Synchronous

[ ]:
response = app.delete_data(schema="sentence", data_id=0)

Asynchronous

[ ]:
async with app.asyncio() as async_app:
    response = await async_app.delete_data(schema="sentence",data_id=0)
Note: The code above runs from a Jupyter Notebook because it already has its async event loop running in the background. You must create your event loop when running this code on an environment without one, just like any asyncio code requires.
[ ]:
# this is a hidden cell. It will not show on the documentation HTML.

vespa_docker.container.stop()
vespa_docker.container.remove()