Kodosumi provides a Human-in-the-Loop (HITL) mechanism through its lock system, which lets automated workflows pause and request human intervention when needed. This document explains how to implement HITL services using locks and leases.
See also the kodosumi-examples repository on HITL.

Overview

The HITL system consists of three main components:
  1. Entry Points - Initial form inputs that launch workflow executions
  2. Locks - Points where execution pauses and waits for human input
  3. Leases - Handlers that process human input and continue execution
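The interplay of the three components can be sketched with the standard library alone. The `ToyTracer` class below is a hypothetical stand-in written for illustration; it is not Kodosumi's `Tracer` and makes no claim about how Kodosumi implements locks internally. It only shows the control flow: the runner suspends at a lock, and the value produced by a lease handler is what the runner receives when it resumes.

```python
import asyncio


class ToyTracer:
    """Hypothetical stand-in for a tracer (not Kodosumi's Tracer)."""

    def __init__(self):
        self.pending = {}  # lock name -> Future awaiting human input

    async def lock(self, name, data=None):
        # Suspend the caller until someone resolves the future for this lock.
        future = asyncio.get_running_loop().create_future()
        self.pending[name] = future
        return await future

    def submit(self, name, lease_handler, form_inputs):
        # The lease handler turns raw form input into the runner's result.
        self.pending.pop(name).set_result(lease_handler(form_inputs))


async def runner(inputs, tracer):
    decision = await tracer.lock("approval-lock", data={"inputs": inputs})
    return {"name": inputs["name"], "decision": decision}


def approval_lease(form_inputs):
    return {"approved": bool(form_inputs.get("approve"))}


async def demo():
    tracer = ToyTracer()
    task = asyncio.create_task(runner({"name": "Ada"}, tracer))
    await asyncio.sleep(0)  # give the runner a chance to reach the lock
    assert "approval-lock" in tracer.pending  # execution is paused here
    tracer.submit("approval-lock", approval_lease, {"approve": True})
    return await task


result = asyncio.run(demo())
```

The runner never sees the raw form data in this sketch; it only sees what the lease handler returned, which mirrors the division of labour between locks and leases described above.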

Basic Implementation

1. Entry Point with Form Input

First, create an entry point that collects initial user input:
from kodosumi.core import ServeAPI, Launch
from kodosumi.service.inputs.forms import Model, InputText, Checkbox, Submit, Cancel
from fastapi import Request

app = ServeAPI()

# Define the form model for the entry point
form_model = Model(
    InputText(label="Name", name="name", placeholder="Enter your name"),
    Checkbox(label="Active", name="active", option="ACTIVE", value=False),
    Submit("Submit"),
    Cancel("Cancel"),
)

@app.enter(
    "/",
    model=form_model,
    summary="HITL Example",
    description="Example workflow with human intervention",
)
async def post(inputs: dict, request: Request) -> Launch:
    """Launch the workflow with user inputs."""
    return Launch(request, "my_module:runner", inputs=inputs)

2. Runner Function with Lock

The runner function executes the workflow and can request human intervention:
from kodosumi.core import Tracer

async def runner(inputs: dict, tracer: Tracer):
    # Process initial inputs
    await tracer.debug("Processing user inputs")
    
    # Request human intervention
    result = await tracer.lock("approval-lock", data={
        "hello": "from runner",
        "inputs": inputs
    })
    
    # Continue processing after human approval
    return {"final_result": result}

3. Lock Handler

Define what the human sees when the lock is triggered:
from kodosumi.service.inputs.forms import Markdown

@app.lock("approval-lock")
async def approval_lock(data: dict):
    return Model(
        Markdown(f"# Approval Required\n\nReceived data: {data['hello']}"),
        Checkbox(label="Approve", name="approve", option="APPROVE", value=False),
        InputText(label="Comments", name="comments", placeholder="Optional comments"),
        Submit("Approve"),
        Cancel("Reject"),
    )

4. Lease Handler

Process the human input and continue execution:
from kodosumi.service.inputs.errors import InputsError

@app.lease("approval-lock")
async def approval_lease(inputs: dict):
    if not inputs.get("approve"):
        raise InputsError(approve="Approval is required to continue")
    
    return {
        "approved": True,
        "comments": inputs.get("comments", ""),
        "timestamp": "2024-01-01T12:00:00Z"
    }

Lock vs Lease Decorators

@app.lock(lock_id)

  • Purpose: Defines the user interface shown when execution pauses
  • Parameters: Receives the data passed from tracer.lock()
  • Return: A Model object defining the form elements
  • When called: When the lock is first triggered

@app.lease(lock_id)

  • Purpose: Processes human input and continues execution
  • Parameters: Receives the form data submitted by the human
  • Return: Data that will be passed back to the runner function
  • When called: When the human submits the lock form
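One way to picture how the two decorators relate is a registry keyed by the lock identifier. The `ToyRegistry` class below is purely hypothetical and assumes nothing about `ServeAPI` internals; it only illustrates that a lock handler and a lease handler are paired because they share the same `lock_id`.

```python
class ToyRegistry:
    """Hypothetical registry; Kodosumi's ServeAPI internals may differ."""

    def __init__(self):
        self.locks = {}   # lock_id -> function that builds the form
        self.leases = {}  # lock_id -> function that processes the submission

    def lock(self, lock_id):
        def decorator(func):
            self.locks[lock_id] = func
            return func
        return decorator

    def lease(self, lock_id):
        def decorator(func):
            self.leases[lock_id] = func
            return func
        return decorator

    def render(self, lock_id, data):
        # Called when the lock is triggered: data comes from the runner.
        return self.locks[lock_id](data)

    def resolve(self, lock_id, inputs):
        # Called when the human submits: the result goes back to the runner.
        return self.leases[lock_id](inputs)


app = ToyRegistry()


@app.lock("approval-lock")
def approval_form(data):
    return [f"Approve request from {data['hello']}?"]


@app.lease("approval-lock")
def approval_result(inputs):
    return {"approved": bool(inputs.get("approve"))}


form = app.render("approval-lock", {"hello": "runner"})
outcome = app.resolve("approval-lock", {"approve": True})
```

Here `form` stands in for the `Model` a real lock handler would return, and `outcome` for the data passed back to the runner.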

Missing Lease Handler

Complete Example

Here's a complete HITL service implementation:
from kodosumi.core import ServeAPI, Launch, Tracer
from kodosumi.service.inputs.forms import Model, InputText, Checkbox, Submit, Cancel, Markdown
from kodosumi.service.inputs.errors import InputsError
from fastapi import Request

app = ServeAPI()

# Entry point form
entry_form = Model(
    InputText(label="Document Name", name="doc_name", placeholder="Enter document name"),
    InputText(label="Author", name="author", placeholder="Enter author name"),
    Submit("Start Review"),
    Cancel("Cancel"),
)

@app.enter(
    "/review",
    model=entry_form,
    summary="Document Review Workflow",
    description="Review workflow with human approval steps",
)
async def start_review(inputs: dict, request: Request) -> Launch:
    return Launch(request, "document_service:review_runner", inputs=inputs)

# Runner function
async def review_runner(inputs: dict, tracer: Tracer):
    await tracer.debug(f"Starting review for document: {inputs['doc_name']}")
    
    # First approval - content review
    content_approval = await tracer.lock("content-review", data={
        "document": inputs['doc_name'],
        "author": inputs['author'],
        "stage": "content"
    })
    
    # Second approval - final review
    final_approval = await tracer.lock("final-review", data={
        "document": inputs['doc_name'],
        "content_approved": content_approval,
        "stage": "final"
    })
    
    return {
        "document": inputs['doc_name'],
        "content_approval": content_approval,
        "final_approval": final_approval,
        "status": "completed"
    }

# Content review lock
@app.lock("content-review")
async def content_review_lock(data: dict):
    return Model(
        Markdown(f"# Content Review Required\n\n**Document**: {data['document']}\n**Author**: {data['author']}"),
        Checkbox(label="Content is accurate", name="accurate", option="ACCURATE", value=False),
        Checkbox(label="Grammar is correct", name="grammar", option="GRAMMAR", value=False),
        InputText(label="Review Comments", name="comments", placeholder="Enter review comments"),
        Submit("Approve Content"),
        Cancel("Request Changes"),
    )

# Content review lease
@app.lease("content-review")
async def content_review_lease(inputs: dict):
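    # Report only the checks that actually failed
    error = InputsError()
    if not inputs.get("accurate"):
        error.add(accurate="Content accuracy must be confirmed")
    if not inputs.get("grammar"):
        error.add(grammar="Grammar must be verified")
    if error.has_errors():
        raise error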
    
    return {
        "approved": True,
        "comments": inputs.get("comments", ""),
        "reviewer": "human_reviewer"
    }

# Final review lock
@app.lock("final-review")
async def final_review_lock(data: dict):
    return Model(
        Markdown(f"# Final Review\n\n**Document**: {data['document']}\n**Content Approved**: {data['content_approved']['approved']}"),
        Checkbox(label="Ready for publication", name="publish", option="PUBLISH", value=False),
        InputText(label="Final Comments", name="final_comments", placeholder="Final approval comments"),
        Submit("Publish"),
        Cancel("Hold"),
    )

# Final review lease
@app.lease("final-review")
async def final_review_lease(inputs: dict):
    if not inputs.get("publish"):
        raise InputsError(publish="Publication approval is required")
    
    return {
        "published": True,
        "comments": inputs.get("final_comments", ""),
        "publisher": "human_publisher"
    }

API Workflow

Step 1: Get Input Schema

Request:
GET /inputs/-/localhost/8125/-/review
Response:
{
  "elements": [
    {
      "type": "text",
      "name": "doc_name",
      "label": "Document Name",
      "value": null,
      "required": false,
      "placeholder": "Enter document name",
      "size": null,
      "pattern": null
    },
    {
      "type": "text",
      "name": "author",
      "label": "Author",
      "value": null,
      "required": false,
      "placeholder": "Enter author name",
      "size": null,
      "pattern": null
    },
    {
      "type": "submit",
      "text": "Start Review"
    },
    {
      "type": "cancel",
      "text": "Cancel"
    }
  ]
}

Step 2: Launch Workflow Execution

Request:
POST /-/localhost/8125/-/review
Content-Type: application/x-www-form-urlencoded

doc_name=My%20Document&author=John%20Doe
Response:
{
  "result": "abc123-def456-ghi789"
}

Step 3: Check Execution Status

Request:
GET /outputs/status/abc123-def456-ghi789
Response:
{
  "status": "awaiting",
  "locks": ["content-review-xyz789"]
}

Step 4: Get Lock Input Schema

Request:
GET /lock/abc123-def456-ghi789/content-review-xyz789
Response:
[
  {
    "type": "markdown",
    "text": "# Content Review Required\n\n**Document**: My Document\n**Author**: John Doe"
  },
  {
    "type": "boolean",
    "name": "accurate",
    "label": "Content is accurate",
    "value": false,
    "option": "ACCURATE"
  },
  {
    "type": "boolean",
    "name": "grammar",
    "label": "Grammar is correct",
    "value": false,
    "option": "GRAMMAR"
  },
  {
    "type": "text",
    "name": "comments",
    "label": "Review Comments",
    "value": null,
    "required": false,
    "placeholder": "Enter review comments",
    "size": null,
    "pattern": null
  },
  {
    "type": "submit",
    "text": "Approve Content"
  },
  {
    "type": "cancel",
    "text": "Request Changes"
  }
]

Step 5: Submit Lock Data

Request:
POST /lock/abc123-def456-ghi789/content-review-xyz789
Content-Type: application/json

{
  "accurate": true,
  "grammar": true,
  "comments": "Content looks good, ready for final review"
}
Response:
{
  "result": {
    "approved": true,
    "comments": "Content looks good, ready for final review",
    "reviewer": "human_reviewer"
  }
}

Step 6: Check Status for the Next Lock

Request:
GET /outputs/status/abc123-def456-ghi789
Response:
{
  "status": "awaiting",
  "locks": ["final-review-abc456"]
}
Repeat steps 4 to 6 for each subsequent lock until the workflow completes.
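The polling and submission steps can be sketched as a small client loop. The example below is a sketch under assumptions: `drive`, `FakeServer`, and `decide` are hypothetical names, the transport is injected as plain callables so that no HTTP library or running server is needed, and `"finished"` is only a placeholder for whatever non-awaiting status the real service reports. The two paths are the ones shown in the requests above.

```python
def status_path(fid):
    return f"/outputs/status/{fid}"


def lock_path(fid, lid):
    return f"/lock/{fid}/{lid}"


def drive(fid, get, post, decide, max_rounds=10):
    """Answer pending locks until the execution stops awaiting input.

    get(path) and post(path, payload) are caller-supplied transports;
    decide(lid, schema) returns the payload to submit for one lock.
    """
    submitted = []
    for _ in range(max_rounds):
        status = get(status_path(fid))
        locks = status.get("locks", [])
        if status.get("status") != "awaiting" or not locks:
            return status.get("status"), submitted
        for lid in locks:
            schema = get(lock_path(fid, lid))  # step 4: fetch the lock form
            post(lock_path(fid, lid), decide(lid, schema))  # step 5: submit
            submitted.append(lid)
    raise RuntimeError("execution still awaiting input after max_rounds")


class FakeServer:
    """In-memory stand-in for the service, exposing one lock at a time."""

    def __init__(self, fid, lock_ids):
        self.fid = fid
        self.queue = list(lock_ids)
        self.posts = []

    def get(self, path):
        if path == status_path(self.fid):
            if self.queue:
                return {"status": "awaiting", "locks": [self.queue[0]]}
            return {"status": "finished", "locks": []}  # placeholder status
        return []  # lock schema, ignored by this demo

    def post(self, path, payload):
        assert path == lock_path(self.fid, self.queue[0])
        self.posts.append((self.queue.pop(0), payload))
        return {"result": payload}


def decide(lid, schema):
    # Hypothetical policy: pick the payload by lock name.
    if lid.startswith("content-review"):
        return {"accurate": True, "grammar": True}
    return {"publish": True}


fid = "abc123-def456-ghi789"
server = FakeServer(fid, ["content-review-xyz789", "final-review-abc456"])
final_status, submitted = drive(fid, server.get, server.post, decide)
```

In a real client, `get` and `post` would wrap an HTTP library of your choice, and a delay between polling rounds would usually be appropriate.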

Error Handling

Validation Errors

If the lease handler raises an InputsError, the lock remains active:
@app.lease("approval-lock")
async def approval_lease(inputs: dict):
    error = InputsError()
    if not inputs.get("approve"):
        error.add(approve="Approval is required")
    if not inputs.get("comments"):
        error.add(comments="Comments are required")
    
    if error.has_errors():
        raise error
    
    return {"approved": True}
Response for validation errors:
{
  "errors": {
    "approve": ["Approval is required"],
    "comments": ["Comments are required"],
    "_global_": []
  }
}
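A client that receives such a response may want to show the messages next to the corresponding fields. The helper below is a minimal sketch; `flatten_errors` is a hypothetical name, and the code assumes only the response shape shown above.

```python
def flatten_errors(payload):
    """Return (field, message) pairs from an errors payload, in payload order."""
    pairs = []
    for field, messages in payload.get("errors", {}).items():
        for message in messages:
            pairs.append((field, message))
    return pairs


response = {
    "errors": {
        "approve": ["Approval is required"],
        "comments": ["Comments are required"],
        "_global_": [],
    }
}

pairs = flatten_errors(response)
has_errors = bool(pairs)
```

Fields with an empty message list, such as `_global_` here, contribute nothing to the result.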

Timeout Handling

Locks can have timeouts:
# In runner function
result = await tracer.lock("timeout-lock", data={...}, timeout=300)  # 5 minutes
If the lock times out, the execution fails with a timeout error. The default timeout is 3 hours.
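The general idea of waiting for input with a deadline can be illustrated with `asyncio.wait_for` from the standard library. This is a sketch of the concept only: it does not show how Kodosumi enforces lock timeouts, and whether `tracer.lock` surfaces a timeout as an exception the runner can catch is not stated above. The helper name `wait_for_human` is hypothetical.

```python
import asyncio


async def wait_for_human(future, timeout):
    """Wait for a human answer, giving up after `timeout` seconds."""
    try:
        return await asyncio.wait_for(future, timeout=timeout)
    except asyncio.TimeoutError:
        # No answer arrived in time; report that instead of a result.
        return {"timed_out": True}


async def demo():
    loop = asyncio.get_running_loop()

    never_answered = loop.create_future()
    late = await wait_for_human(never_answered, timeout=0.01)

    answered = loop.create_future()
    answered.set_result({"approved": True})
    on_time = await wait_for_human(answered, timeout=0.01)

    return late, on_time


late, on_time = asyncio.run(demo())
```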