Human-in-the-Loop with Locks
Kodosumi provides a powerful Human-in-the-Loop (HTIL) mechanism through its lock system, allowing automated workflows to pause and request human intervention when needed. This document explains how to implement HTIL services using locks and leases.
kodosumi-examples repository on HITL.Overview
The HTIL system consists of three main components:
- Entry Points - Initial form inputs that launch workflow executions.
- Locks - Points where execution pauses and waits for human input
- Leases - Handlers that process human input and continue execution
Basic Implementation
1. Entry Point with Form Input
First, create an entry point that collects initial user input:
from kodosumi.core import ServeAPI, Launch
from kodosumi.service.inputs.forms import Model, InputText, Checkbox, Submit, Cancel
from fastapi import Request
app = ServeAPI()
# Define the form model for the entry point
form_model = Model(
InputText(label="Name", name="name", placeholder="Enter your name"),
Checkbox(label="Active", name="active", option="ACTIVE", value=False),
Submit("Submit"),
Cancel("Cancel"),
)
@app.enter(
"/",
model=form_model,
summary="HTIL Example",
description="Example workflow with human intervention",
)
async def post(inputs: dict, request: Request) -> Launch:
"""Launch the workflow with user inputs."""
return Launch(request, "my_module:runner", inputs=inputs)2. Runner Function with Lock
The runner function executes the workflow and can request human intervention:
from kodosumi.core import Tracer
async def runner(inputs: dict, tracer: Tracer):
# Process initial inputs
await tracer.debug("Processing user inputs")
# Request human intervention
result = await tracer.lock("approval-lock", data={
"hello": "from runner",
"inputs": inputs
})
# Continue processing after human approval
return {"final_result": result}3. Lock Handler
Define what the human sees when the lock is triggered:
from kodosumi.service.inputs.forms import Markdown
@app.lock("approval-lock")
async def approval_lock(data: dict):
return Model(
Markdown(f"# Approval Required\n\nReceived data: {data['hello']}"),
Checkbox(label="Approve", name="approve", option="APPROVE", value=False),
InputText(label="Comments", name="comments", placeholder="Optional comments"),
Submit("Approve"),
Cancel("Reject"),
)4. Lease Handler
Process the human input and continue execution:
@app.lease("approval-lock")
async def approval_lease(inputs: dict):
if not inputs.get("approve"):
raise InputsError(approve="Approval is required to continue")
return {
"approved": True,
"comments": inputs.get("comments", ""),
"timestamp": "2024-01-01T12:00:00Z"
}Lock vs Lease Decorators
@app.lock(lock_id)
- Purpose: Defines the user interface shown when execution pauses
- Parameters: Receives the data passed from
tracer.lock() - Return: A
Modelobject defining the form elements - When called: When the lock is first triggered
@app.lease(lock_id)
- Purpose: Processes human input and continues execution
- Parameters: Receives the form data submitted by the human
- Return: Data that will be passed back to the runner function
- When called: When the human submits the lock form
Missing Lease Handler
Complete Example
Here's a complete HTIL service implementation:
from kodosumi.core import ServeAPI, Launch, Tracer
from kodosumi.service.inputs.forms import Model, InputText, Checkbox, Submit, Cancel, Markdown
from kodosumi.service.inputs.errors import InputsError
from fastapi import Request
app = ServeAPI()
# Entry point form
entry_form = Model(
InputText(label="Document Name", name="doc_name", placeholder="Enter document name"),
InputText(label="Author", name="author", placeholder="Enter author name"),
Submit("Start Review"),
Cancel("Cancel"),
)
@app.enter(
"/review",
model=entry_form,
summary="Document Review Workflow",
description="Review workflow with human approval steps",
)
async def start_review(inputs: dict, request: Request) -> Launch:
return Launch(request, "document_service:review_runner", inputs=inputs)
# Runner function
async def review_runner(inputs: dict, tracer: Tracer):
await tracer.debug(f"Starting review for document: {inputs['doc_name']}")
# First approval - content review
content_approval = await tracer.lock("content-review", data={
"document": inputs['doc_name'],
"author": inputs['author'],
"stage": "content"
})
# Second approval - final review
final_approval = await tracer.lock("final-review", data={
"document": inputs['doc_name'],
"content_approved": content_approval,
"stage": "final"
})
return {
"document": inputs['doc_name'],
"content_approval": content_approval,
"final_approval": final_approval,
"status": "completed"
}
# Content review lock
@app.lock("content-review")
async def content_review_lock(data: dict):
return Model(
Markdown(f"# Content Review Required\n\n**Document**: {data['document']}\n**Author**: {data['author']}"),
Checkbox(label="Content is accurate", name="accurate", option="ACCURATE", value=False),
Checkbox(label="Grammar is correct", name="grammar", option="GRAMMAR", value=False),
InputText(label="Review Comments", name="comments", placeholder="Enter review comments"),
Submit("Approve Content"),
Cancel("Request Changes"),
)
# Content review lease
@app.lease("content-review")
async def content_review_lease(inputs: dict):
if not inputs.get("accurate") or not inputs.get("grammar"):
raise InputsError(
accurate="Content accuracy must be confirmed",
grammar="Grammar must be verified"
)
return {
"approved": True,
"comments": inputs.get("comments", ""),
"reviewer": "human_reviewer"
}
# Final review lock
@app.lock("final-review")
async def final_review_lock(data: dict):
return Model(
Markdown(f"# Final Review\n\n**Document**: {data['document']}\n**Content Approved**: {data['content_approved']['approved']}"),
Checkbox(label="Ready for publication", name="publish", option="PUBLISH", value=False),
InputText(label="Final Comments", name="final_comments", placeholder="Final approval comments"),
Submit("Publish"),
Cancel("Hold"),
)
# Final review lease
@app.lease("final-review")
async def final_review_lease(inputs: dict):
if not inputs.get("publish"):
raise InputsError(publish="Publication approval is required")
return {
"published": True,
"comments": inputs.get("final_comments", ""),
"publisher": "human_publisher"
}API Workflow
Step 1: Get Input Schema
Request:
GET /inputs/-/localhost/8125/-/reviewResponse:
{
"elements": [
{
"type": "text",
"name": "doc_name",
"label": "Document Name",
"value": null,
"required": false,
"placeholder": "Enter document name",
"size": null,
"pattern": null
},
{
"type": "text",
"name": "author",
"label": "Author",
"value": null,
"required": false,
"placeholder": "Enter author name",
"size": null,
"pattern": null
},
{
"type": "submit",
"text": "Start Review"
},
{
"type": "cancel",
"text": "Cancel"
}
]
}Step 2: Launch Workflow Execution
Request:
POST /-/localhost/8125/-/review
Content-Type: application/x-www-form-urlencoded
doc_name=My%20Document&author=John%20DoeResponse:
{
"result": "abc123-def456-ghi789"
}Step 3: Check Execution Status
Request:
GET /outputs/status/abc123-def456-ghi789Response:
{
"status": "awaiting",
"locks": ["content-review-xyz789"]
}Step 4: Get Lock Input Schema
Request:
GET /lock/abc123-def456-ghi789/content-review-xyz789Response:
[
{
"type": "markdown",
"text": "# Content Review Required\n\n**Document**: My Document\n**Author**: John Doe"
},
{
"type": "boolean",
"name": "accurate",
"label": "Content is accurate",
"value": false,
"option": "ACCURATE"
},
{
"type": "boolean",
"name": "grammar",
"label": "Grammar is correct",
"value": false,
"option": "GRAMMAR"
},
{
"type": "text",
"name": "comments",
"label": "Review Comments",
"value": null,
"required": false,
"placeholder": "Enter review comments",
"size": null,
"pattern": null
},
{
"type": "submit",
"text": "Approve Content"
},
{
"type": "cancel",
"text": "Request Changes"
}
]Step 5: Submit Lock Data
Request:
POST /lock/abc123-def456-ghi789/content-review-xyz789
Content-Type: application/json
{
"accurate": true,
"grammar": true,
"comments": "Content looks good, ready for final review"
}Response:
{
"result": {
"approved": true,
"comments": "Content looks good, ready for final review",
"reviewer": "human_reviewer"
}
}Step 6: Check Final Status
Request:
GET /outputs/status/abc123-def456-ghi789Response:
{
"status": "awaiting",
"locks": ["final-review-abc456"]
}Continue the process for subsequent locks until the workflow completes.
Error Handling
Validation Errors
If the lease handler raises an InputsError, the lock remains active:
@app.lease("approval-lock")
async def approval_lease(inputs: dict):
error = InputsError()
if not inputs.get("approve"):
error.add(approve="Approval is required")
if not inputs.get("comments"):
error.add(comments="Comments are required")
if error.has_errors():
raise error
return {"approved": True}Response for validation errors:
{
"errors": {
"approve": ["Approval is required"],
"comments": ["Comments are required"],
"_global_": []
}
}Timeout Handling
Locks can have timeouts:
# In runner function
result = await tracer.lock("timeout-lock", data={...}, timeout=300) # 5 minutesIf the lock times out, the execution fails with a timeout error. The default timeout is 3 hours.

