
Pipeline

The Client orchestrates two built-in pipelines, Index and Ask. Every step receives a ctx context.Context for cancellation, timeouts, and distributed tracing.


Index Pipeline

DocumentLoader.Load(ctx, source)
    → []Document
→ TextSplitter.Split(ctx, documents)
    → []Chunk
→ Embedder.EmbedBatch(ctx, texts)
    → [][]float64
→ VectorInserter.Insert(ctx, documents)
    → error (or nil on success)

Error handling: If any step fails, the pipeline stops and returns a wrapped error identifying which step failed. The caller can retry from the failure point.

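Provider flexibility: The embedding provider used for indexing can differ from the one used for querying. For example, use Ollama (local, free) for bulk indexing and OpenAI for queries. Query vectors are only comparable with indexed vectors when both providers produce embeddings in the same vector space (same model and dimensionality), so choose the pair accordingly.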

// Build pipeline with different embedding providers
ai := ihandai.New(
    ihandai.WithIndexEmbedding("ollama", embedding.WithModel("nomic-embed-text")),
    ihandai.WithEmbedding("openai", embedding.WithModel("text-embedding-3-small")),
    ihandai.WithVectorStore("qdrant", vectordb.WithURL("http://localhost:6333")),
)

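// Index uses the Ollama embedding provider
if err := ai.Index(ctx, documents); err != nil {
    log.Fatalf("index: %v", err)
}

// Ask uses the OpenAI embedding provider and the OpenAI LLM
resp, err := ai.Ask(ctx, "What is RAG?")
if err != nil {
    log.Fatalf("ask: %v", err)
}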

Ask Pipeline (RAG Query)

Embedder.Embed(ctx, query)
    → queryVector []float64
→ VectorSearcher.Search(ctx, queryVector, opts...)
    → []ScoredDocument
→ Reranker.Rerank(ctx, query, documents)
    → []ScoredDocument (reranked)
→ PromptBuilder.Build(ctx, template, context)
    → []Message
→ ChatCompleter.Chat(ctx, messages)
    → *Response

Error handling: Each step can fail independently. Possible failures and handling:

| Step | Failure | Handling |
|------|---------|----------|
| Embed | Network timeout, rate limit | Retry with backoff, or fall back to a secondary embedding provider |
| Search | Connection error | Retry, or return error to caller |
| Rerank | Model unavailable | Skip reranking, continue with raw search results |
| Build | Template error | Return configuration error (not recoverable) |
| Chat | Rate limit, auth error | Retry (rate limit), or surface to caller (auth) |

The pipeline wraps errors with context: "pipeline: step 3 (rerank): connection refused".


Concurrency


Custom Pipelines

Users who need custom pipelines can bypass Client.Ask() / Client.Index() and compose the interfaces directly:

// Advanced: manual pipeline with custom logic
// (assumes an enclosing function that returns an error)
embedder := ai.Embedding()
chat := ai.LLM()
store := ai.VectorStore()

vec, err := embedder.Embed(ctx, query)
if err != nil {
    return fmt.Errorf("embed: %w", err)
}
docs, err := store.Search(ctx, vec, vectordb.WithTopK(20))
if err != nil {
    return fmt.Errorf("search: %w", err)
}

// Custom filtering before LLM call
filtered := myCustomFilter(docs)

msgs := []ihandai.Message{
    {Role: "system", Content: "You are a helpful assistant."},
    {Role: "user", Content: fmt.Sprintf("Context: %v\n\nQuestion: %s", filtered, query)},
}
resp, err := chat.Chat(ctx, msgs)
if err != nil {
    return fmt.Errorf("chat: %w", err)
}

Future: Streaming Pipeline

When streaming is implemented (Phase 4+), AskStream() will return a channel:

func (c *Client) AskStream(ctx context.Context, query string) (<-chan Chunk, error)