This example demonstrates advanced AI data processing with Upstash Workflow. The workflow downloads a large dataset, processes it in chunks using OpenAI's GPT-4 model, aggregates the results, and generates a report.
```typescript
import { serve } from "@upstash/workflow/nextjs"
import {
  downloadData,
  aggregateResults,
  generateReport,
  sendReport,
  getDatasetUrl,
  splitIntoChunks,
} from "./utils"

type OpenAiResponse = {
  choices: { message: { role: string; content: string } }[]
}

export const { POST } = serve<{ datasetId: string; userId: string }>(
  async (context) => {
    const request = context.requestPayload

    // Step 1: Download the dataset
    const datasetUrl = await context.run("get-dataset-url", async () => {
      return await getDatasetUrl(request.datasetId)
    })

    // HTTP request with a much longer timeout (up to 2 hours)
    const { body: dataset } = await context.call("download-dataset", {
      url: datasetUrl,
      method: "GET",
    })

    // Step 2: Process data in chunks using OpenAI
    const chunkSize = 1000
    const chunks = splitIntoChunks(dataset, chunkSize)
    const processedChunks: string[] = []

    for (let i = 0; i < chunks.length; i++) {
      const { body: processedChunk } = await context.api.openai.call<OpenAiResponse>(
        `process-chunk-${i}`,
        {
          token: process.env.OPENAI_API_KEY!,
          operation: "chat.completions.create",
          body: {
            model: "gpt-4",
            messages: [
              {
                role: "system",
                content:
                  "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.",
              },
              {
                role: "user",
                content: `Analyze this data chunk: ${JSON.stringify(chunks[i])}`,
              },
            ],
            max_completion_tokens: 150,
          },
        }
      )

      processedChunks.push(processedChunk.choices[0].message.content!)

      // Every 10 chunks, aggregate intermediate results
      if (i % 10 === 9 || i === chunks.length - 1) {
        await context.run(`aggregate-results-${i}`, async () => {
          await aggregateResults(processedChunks)
          processedChunks.length = 0
        })
      }
    }

    // Step 3: Generate and send the data report
    const report = await context.run("generate-report", async () => {
      return await generateReport(request.datasetId)
    })

    await context.run("send-report", async () => {
      await sendReport(report, request.userId)
    })
  }
)
```
Note that we use `context.call` for the download: it makes HTTP requests that can run far longer than your serverless execution limit would normally allow, because the request is executed by QStash rather than inside your function.
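For reference, here is the download step in isolation. The `retries` option and the status check are illustrative additions rather than part of the original example, so verify the `context.call` options supported by your `@upstash/workflow` version:

```typescript
// Sketch: the download step on its own. `retries` and the status check
// are illustrative additions, not part of the original example.
const { status, body: dataset } = await context.call("download-dataset", {
  url: datasetUrl, // resolved in the earlier "get-dataset-url" step
  method: "GET",
  retries: 3, // assumed option: retry transient failures before giving up
})

if (status !== 200) {
  throw new Error(`Dataset download failed with status ${status}`)
}
```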
We split the dataset into chunks and process each one using OpenAI’s GPT-4 model:
```typescript
for (let i = 0; i < chunks.length; i++) {
  const { body: processedChunk } = await context.api.openai.call<OpenAiResponse>(
    `process-chunk-${i}`,
    {
      token: process.env.OPENAI_API_KEY!,
      operation: "chat.completions.create",
      body: {
        model: "gpt-4",
        messages: [
          {
            role: "system",
            content:
              "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.",
          },
          {
            role: "user",
            content: `Analyze this data chunk: ${JSON.stringify(chunks[i])}`,
          },
        ],
        max_completion_tokens: 150,
      },
    }
  )
}
```
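The helpers imported from `./utils` are not shown in this example. As a rough sketch, `splitIntoChunks` might look like the following, assuming the downloaded dataset is (or parses to) an array of records; the signature here is an assumption:

```typescript
// Hypothetical implementation of the splitIntoChunks helper from ./utils.
// Assumes the dataset is an array; slices it into fixed-size chunks.
export function splitIntoChunks<T>(data: T[], chunkSize: number): T[][] {
  const chunks: T[][] = []
  for (let i = 0; i < data.length; i += chunkSize) {
    chunks.push(data.slice(i, i + chunkSize))
  }
  return chunks
}
```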
- **Non-blocking HTTP calls:** We use `context.call` for API requests so they don't consume the endpoint's execution time (great for optimizing serverless cost).
- **Long-running tasks:** The dataset download can take up to 2 hours, though it is realistically limited by your function's available memory.
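The aggregation helper is also left to `./utils`. As a purely illustrative sketch, `aggregateResults` could persist each batch of intermediate summaries so the in-memory buffer can be cleared after every tenth chunk; the Redis storage below is an assumption, since the example does not specify a backend:

```typescript
// Hypothetical implementation of aggregateResults from ./utils.
// The example does not specify where intermediate results go; this
// sketch appends them to a Redis list as one illustrative option.
import { Redis } from "@upstash/redis"

const redis = Redis.fromEnv()

export async function aggregateResults(processedChunks: string[]): Promise<void> {
  if (processedChunks.length === 0) return
  // Push the batch of chunk summaries so a later step can read them all.
  await redis.rpush("chunk-summaries", ...processedChunks)
}
```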