This documentation describes the implementation of an Upload App, which demonstrates synchronous and asynchronous file operations using SyncFileSystem and AsyncFileSystem. The app shows how Ray Remote Execution can distribute file processing across multiple nodes.

Overview

The Upload App is an example of:
  • Forms with File Upload: Using InputFiles for multiple file uploads
  • Ray Remote Execution: Parallel processing of files across distributed nodes
  • AsyncFileSystem vs SyncFileSystem: Demonstration of the differences between synchronous and asynchronous file system operations
  • File Download/Upload across Nodes: Ray Remote Functions enable file operations across different nodes

Step-by-Step Implementation

1. Imports and Setup

import asyncio, os, re, time
from pathlib import Path
from tempfile import mkdtemp
from traceback import format_exc

import fastapi
import uvicorn
from pptx import Presentation
from ray import remote, serve

from kodosumi.core import ServeAPI, forms as F, Tracer, Launch
from kodosumi.response import Markdown

app = ServeAPI()
Explanation:
  • ray.remote: Decorator for Remote Functions
  • ray.serve: For service deployment
  • kodosumi.core: Core components for Kodosumi apps
  • pptx.Presentation: For PowerPoint file processing (example only)

2. Ray Remote Function for File Processing

@remote
def process_file(file: str, tracer: Tracer, ignore_errors: bool = True):
    fs = tracer.fs_sync()  # SyncFileSystem for Remote Functions
    tempfile = next(fs.download(file))
    tracer.debug_sync(f"start processing `{file}`")
    start_time = time.time()
    
    try:
        # File processing (here: PowerPoint to text)
        prs = Presentation(tempfile)
        text_runs = [
            run.text for slide in prs.slides 
            for shape in slide.shapes if shape.has_text_frame
            for paragraph in shape.text_frame.paragraphs
            for run in paragraph.runs
        ]
        
        # Create local markdown result file
        tempdir = mkdtemp()
        md_file = Path(tempdir) / Path(file).with_suffix(".md").name
        content = re.sub(r"\s+", " ", " ".join(text_runs))
        
        with md_file.open("w") as f:
            f.write(content)
        
        # Upload file to the flow execution
        fs.upload(str(md_file))
        # Remove the local result file
        os.remove(md_file)
        
        return {
            "source": file,
            "target": md_file.name,
            "length": len(text_runs),
            "words": len(content.split()),
            "runtime": time.time() - start_time,
            "error": None
        }
    except Exception as e:
        if not ignore_errors:
            raise RuntimeError(f"error processing `{file}`: {format_exc()}")
        return {
            "source": file,
            "target": None,
            "length": 0,
            "words": 0,
            "runtime": time.time() - start_time,
            "error": str(e)
        }
    finally:
        # Remove the downloaded file on the node
        os.remove(tempfile)
        fs.close()
Key Points:
  • @remote: Makes the function a Ray Remote Function
  • tracer.fs_sync(): Uses SyncFileSystem since Ray Remote Functions are always synchronous
  • fs.download(file): Downloads file from Kodosumi File System
  • fs.upload(str(md_file)): Uploads processed file
  • Error Handling: when ignore_errors is set, failures are reported as an error entry in the result dict; otherwise the exception is re-raised
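The target-name derivation and whitespace normalization inside process_file can be exercised in isolation. This is a standalone sketch of the relevant lines from the function above; the file path and text runs are made-up examples:

```python
import re
from pathlib import Path
from tempfile import mkdtemp

file = "in/deck.pptx"  # example path, as it would be returned by fs.ls("in")
text_runs = ["Parallel", "text\nprocessing", "  with   Ray"]

# The markdown result keeps the source file name, with a .md suffix
tempdir = mkdtemp()
md_file = Path(tempdir) / Path(file).with_suffix(".md").name

# Collapse every run of whitespace into a single space
content = re.sub(r"\s+", " ", " ".join(text_runs))
md_file.write_text(content)
```

Deriving the target name from the source name (rather than generating a fresh one) keeps the uploaded result traceable back to its input file.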

3. Main Function with AsyncFileSystem

async def run(inputs: dict, tracer: Tracer):
    afs = await tracer.fs()  # AsyncFileSystem for main function
    files = await afs.ls("in")  # List all uploaded files in `/in` folder
    await tracer.markdown("### File Processing Started")
    
    # Display file links
    file_links = [
        f"* [{f['path']}](/files/{tracer.fid}/{f['path']})" for f in files]
    await tracer.markdown("\n".join(file_links))
    
    # Parallel processing with Ray
    futures = [
        process_file.remote(
            f['path'], tracer, ignore_errors=inputs['ignore_errors'])
        for f in files
    ]
    
    await afs.close()
    results = await asyncio.gather(*futures)
    
    # Summarize results
    output = ["### File Processing Completed"]
    for r in results:
        err = r['error']
        link = f"**ERROR:** {err}" if err else f"[markdown](/files/{tracer.fid}/out/{r['target']})"
        output.append(
            f"* {r['source']} with {r['length']} text runs in {r['runtime']:.2f}s - {link}"
        )
    
    return Markdown("\n".join(output))
Key Points:
  • await tracer.fs(): Uses AsyncFileSystem for asynchronous operations
  • await afs.ls("in"): Asynchronous file listing
  • process_file.remote(): Starts Remote Functions
  • asyncio.gather(*futures): Awaits all Remote Functions; Ray ObjectRefs are awaitable, so they can be gathered directly
  • await afs.close(): Properly closes AsyncFileSystem

4. Form Definition

model = F.Model(
    F.Markdown("""
    # Parallel Text Processing
    
    This application demonstrates the use of:
               
    * Forms with text input and file upload
    * `InputFiles` for file processing
    * Ray for parallel processing
    """),
    F.Break(),
    F.HR(),
    F.InputFiles(
        label="Upload Files", 
        name="files", 
        multiple=True, 
        directory=False, 
        required=False
    ),
    F.HR(),
    F.Submit("Start"),
    F.Cancel("Cancel"),
    F.Checkbox(
        name="ignore_errors",
        option="ignore errors",
        value=True
    ),
)
Form Elements:
  • F.InputFiles: Multiple file upload
  • F.Checkbox: Option to ignore errors
  • F.Submit/F.Cancel: Action buttons

5. Endpoint Registration

@app.enter(
    path="/", 
    model=model,
    summary="Text Processing with Ray",
    description="Demonstrates forms, InputFiles and Ray for parallel file processing.",
    version="0.1.0",
    author="example@kodosumi.com",
    organization="Kodosumi Examples",
    tags=["Test"]
)
async def enter(request: fastapi.Request, inputs: dict):
    return Launch(request, "kodosumi_examples.upload.app:run", inputs=inputs)

6. Ray Serve Deployment

@serve.deployment
@serve.ingress(app)
class TextProcessor: pass

fast_app = TextProcessor.bind()  # type: ignore

if __name__ == "__main__":
    uvicorn.run(
        "kodosumi_examples.upload.app:app", 
        host="0.0.0.0", 
        port=8013, 
        reload=True
    )

AsyncFileSystem vs SyncFileSystem

AsyncFileSystem

  • Usage: In asynchronous functions (async def)
  • Methods: await fs.ls(), await fs.upload(), await fs.close()
  • Advantages: Non-blocking; I/O waits overlap with other coroutines on the event loop
  • Example: Main function run()

SyncFileSystem

  • Usage: In synchronous functions (Ray Remote Functions)
  • Methods: fs.ls(), fs.upload(), fs.close()
  • Advantages: No event loop required; works inside Ray Remote Functions, which execute synchronously
  • Example: process_file() Remote Function
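The practical difference can be illustrated with plain asyncio. This is a generic sketch, not the Kodosumi API: awaited I/O operations overlap on the event loop, whereas synchronous calls would run back to back.

```python
import asyncio
import time

async def fetch(name: str) -> str:
    await asyncio.sleep(0.1)  # stands in for one async I/O operation
    return name

async def main() -> list[str]:
    # All three waits run concurrently on the event loop,
    # so the total runtime is about 0.1s instead of 0.3s
    return await asyncio.gather(fetch("a"), fetch("b"), fetch("c"))

start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
```

This is why the main function run() benefits from AsyncFileSystem: listing, logging, and markdown updates can interleave instead of blocking each other.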

Ray Remote Execution

Why Ray Remote Functions?

  1. Distributed Processing: Files can be processed on different nodes
  2. Scalability: Automatic load balancing
  3. Fault Tolerance: Error handling at node level
  4. Resource Management: Efficient use of CPU and memory

Remote Function Lifecycle

  1. File Upload → Kodosumi File System
  2. AsyncFileSystem.ls() → Get file list
  3. process_file.remote() → Start Remote Function
  4. SyncFileSystem.download() → Download file to remote node
  5. File Processing → PowerPoint to text
  6. SyncFileSystem.upload() → Upload result
  7. Result Return → To main function
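The fan-out/fan-in shape of steps 3 to 7 can be mimicked with the standard library. Here concurrent.futures stands in for Ray (the real app uses process_file.remote and asyncio.gather); the function name and result fields mirror the example above:

```python
from concurrent.futures import ThreadPoolExecutor

def process_file(path: str) -> dict:
    # Stand-in for the Ray remote function: in the real app this
    # downloads, converts, and uploads before reporting the result
    target = path.rsplit("/", 1)[-1].rsplit(".", 1)[0] + ".md"
    return {"source": path, "target": target, "error": None}

files = ["in/a.pptx", "in/b.pptx"]
with ThreadPoolExecutor() as pool:
    futures = [pool.submit(process_file, f) for f in files]  # fan out
    results = [f.result() for f in futures]                  # fan in
```

With Ray, the same pattern gains distribution: each submitted task may land on a different node, and gathering the futures collects results back into the main function.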

Running the Upload Example

This section provides step-by-step instructions for running the Upload App example from the kodosumi-examples repository.

Prerequisites

  1. Python Environment: Python 3.12 or higher
  2. Ray Cluster: Running Ray cluster (local or distributed)
  3. Kodosumi: Installed and configured
  4. Dependencies: Required packages for the example

Setup Instructions

1. Clone the Examples Repository

git clone https://github.com/masumi-network/kodosumi-examples.git
cd kodosumi-examples

2. Install Dependencies

# Install the example package
pip install -e .

# Install additional dependencies for the upload example
pip install python-pptx  # For PowerPoint file processing

3. Start Ray Cluster

# Start Ray head node
ray start --head

# Verify Ray is running
ray status

4. Deploy the Upload Example

Create an application configuration file data/config/upload_example.yaml and a Ray Serve configuration file data/config/config.yaml:
# upload_example.yaml
name: upload_example
route_prefix: /upload
import_path: kodosumi_examples.upload.app:fast_app

# config.yaml
proxy_location: EveryNode

http_options:
  host: 127.0.0.1
  port: 8001

grpc_options:
  port: 9001
  grpc_servicer_functions: []

logging_config:
  encoding: TEXT
  log_level: DEBUG
  logs_dir: null
  enable_access_log: true
Deploy the Ray Serve configuration together with the application configuration:
# Deploy using Kodosumi CLI
koco deploy --run --file ./data/config/config.yaml

5. Start Kodosumi Services

# Start the spooler daemon
koco spool

# Start the panel web interface
koco serve --register http://localhost:8001/-/routes

6. Access the Application

Open your web browser and navigate to http://localhost:3370/.