improved processor code

This commit is contained in:
Nikita 2024-12-11 01:31:30 +03:00
parent 08d2859f1c
commit 0aede9a01a
3 changed files with 169 additions and 260 deletions

View file

@ -1,3 +1,5 @@
import { isEqual } from 'lodash';
/**
* Styles
*/
@ -26,16 +28,13 @@ const AIResponse = memo(
}
const handleResize = () => {
// Smooth scroll to bottom of response.
const { scrollHeight, clientHeight } = popupContent;
// Only auto-scroll for shorter contents.
const shouldScroll = scrollHeight - clientHeight < 1000;
if (shouldScroll) {
popupContent.scrollTo({
top: scrollHeight,
behavior: 'smooth',
behavior: 'instant',
});
}
};
@ -80,11 +79,9 @@ const AIResponse = memo(
);
},
(prevProps, nextProps) => {
// Compare blocks length and loading state
return (
prevProps.response?.length === nextProps.response?.length &&
prevProps.loading === nextProps.loading &&
prevProps.progress?.blocksCount === nextProps.progress?.blocksCount
isEqual(prevProps.response, nextProps.response) &&
prevProps.loading === nextProps.loading
);
}
);

View file

@ -1,15 +1,14 @@
import { createBlock, getBlockType } from '@wordpress/blocks';
import { createBlock } from '@wordpress/blocks';
export default class BlocksStreamProcessor {
constructor(dispatch) {
this.dispatch = dispatch;
this.buffer = '';
this.contentBuffer = '';
this.parsedBlocksCount = 0;
this.blocks = [];
this.decoder = new TextDecoder();
this.lastUpdate = Date.now();
this.isJsonStarted = false;
this.jsonBuffer = '';
this.lastDispatchedBlocks = null;
this.CONFIG = {
UPDATE_INTERVAL: 50,
@ -21,15 +20,13 @@ export default class BlocksStreamProcessor {
try {
while (true) {
const { value, done } = await reader.read();
if (done) break;
if (done) {
break;
}
await this.processChunk(value);
}
// Try to parse any remaining content
if (this.contentBuffer) {
await this.tryParseJson(this.contentBuffer, true);
}
await this.sendCompletion();
} catch (error) {
this.handleError(error);
}
@ -37,195 +34,184 @@ export default class BlocksStreamProcessor {
async processChunk(value) {
const text = this.decoder.decode(value, { stream: true });
const lines = text.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
try {
const data = JSON.parse(line.slice(6));
if (data.content) {
await this.processContent(data.content);
if (!line.startsWith('data: ')) continue;
try {
const dataContent = line.slice(6);
const data = JSON.parse(dataContent);
// Handle completion signal
if (data.done === true) {
// If we have any remaining content, parse and dispatch it
if (this.jsonBuffer) {
await this.parseAndDispatchBlocks(
this.jsonBuffer,
true
);
} else {
// If no remaining content, dispatch the last known blocks as final
await this.dispatchBlocks(
this.lastDispatchedBlocks || [],
true
);
}
} catch (e) {
// Skip invalid JSON lines
// Reset only processing state
this.isJsonStarted = false;
this.jsonBuffer = '';
continue;
}
if (!data.content) continue;
// Accumulate content
this.contentBuffer += data.content;
if (!this.isJsonStarted) {
if (this.contentBuffer.includes('```json')) {
this.isJsonStarted = true;
const parts = this.contentBuffer.split('```json');
this.jsonBuffer = parts[1] || '';
}
} else if (data.content.includes('```')) {
const endIndex = data.content.indexOf('```');
this.jsonBuffer += data.content.substring(0, endIndex);
await this.parseAndDispatchBlocks(this.jsonBuffer, true);
this.isJsonStarted = false;
this.jsonBuffer = '';
} else {
this.jsonBuffer += data.content;
await this.tryParseIncomplete(this.jsonBuffer);
}
} catch (e) {
// console.log('Error processing line:', e);
}
}
}
async processContent(content) {
// Detect start of JSON
if (!this.isJsonStarted) {
this.buffer += content;
if (this.buffer.includes('```') && this.buffer.includes('json')) {
this.isJsonStarted = true;
this.buffer = '';
this.contentBuffer = '';
}
}
// Detect end of JSON
else if (content.includes('```')) {
this.isJsonStarted = false;
await this.tryParseJson(this.contentBuffer, true);
}
// Accumulate JSON content
else if (this.isJsonStarted) {
this.contentBuffer += content;
await this.tryParseJson(this.contentBuffer);
}
}
async tryParseJson(jsonString, isFinal = false) {
if (!jsonString.trim()) {
return;
}
async tryParseIncomplete(jsonContent) {
try {
// Clean up the JSON string
const cleanJson = jsonString
.replace(/\n/g, '')
.replace(/\s+/g, ' ')
.trim();
const completedJson = this.balanceJsonStructure(jsonContent);
const parsed = JSON.parse(completedJson);
// Only proceed if we have a starting array
if (!cleanJson.startsWith('[')) {
return;
}
if (Array.isArray(parsed) && parsed.length > 0) {
const blocks = parsed
.map((block) => this.transformToBlock(block))
.filter(Boolean);
// Find complete blocks by matching brackets
const blocks = [];
let depth = 0;
let currentBlock = '';
let inBlock = false;
for (let i = 0; i < cleanJson.length; i++) {
const char = cleanJson[i];
if (char === '{') {
if (depth === 0) {
inBlock = true;
}
depth++;
if (blocks.length > 0) {
// Reuse dispatchBlocks for incomplete content
await this.dispatchBlocks(blocks, false);
}
if (inBlock) {
currentBlock += char;
}
if (char === '}') {
depth--;
if (depth === 0 && inBlock) {
// We found a complete block
try {
const block = JSON.parse(currentBlock);
if (block.name && block.attributes) {
blocks.push(block);
}
} catch (e) {
// eslint-disable-next-line no-console
console.error('Block parse error:', e);
}
currentBlock = '';
inBlock = false;
}
}
}
// Only process if we found new blocks
if (blocks.length > this.parsedBlocksCount) {
const validBlocks = this.validateAndTransformBlocks(blocks);
this.parsedBlocksCount = blocks.length;
await this.updateBlocks(validBlocks);
}
} catch (e) {
if (isFinal) {
// eslint-disable-next-line no-console
console.error('JSON parse error:', e);
// console.log('Error in tryParseIncomplete:', e);
}
}
balanceJsonStructure(partial) {
let openBrackets = 0;
let openBraces = 0;
// Count opening and closing brackets
for (const char of partial) {
if (char === '[') openBrackets++;
if (char === ']') openBrackets--;
if (char === '{') openBraces++;
if (char === '}') openBraces--;
}
// Complete the structure
let completed = partial;
while (openBraces > 0) {
completed += '}';
openBraces--;
}
while (openBrackets > 0) {
completed += ']';
openBrackets--;
}
return completed;
}
async parseAndDispatchBlocks(jsonContent, isFinal = false) {
try {
const blocks = JSON.parse(jsonContent);
const transformedBlocks = Array.isArray(blocks)
? blocks
.map((block) => this.transformToBlock(block))
.filter(Boolean)
: [this.transformToBlock(blocks)].filter(Boolean);
if (transformedBlocks.length > 0) {
await this.dispatchBlocks(transformedBlocks, isFinal);
}
} catch (e) {
// console.log('Error parsing complete JSON:', e);
if (!isFinal) {
await this.tryParseIncomplete(jsonContent);
}
}
}
validateAndTransformBlocks(blocks) {
return blocks.map((block) => {
try {
// Validate block structure
if (!block?.name || !block?.attributes) {
// eslint-disable-next-line no-console
console.error('Invalid block structure:', block);
return createBlock('core/paragraph', {
content: 'Invalid block content',
});
}
transformToBlock(blockData) {
if (!blockData?.name) return null;
// Verify block type exists
const blockType = getBlockType(block.name);
if (!blockType) {
// eslint-disable-next-line no-console
console.error('Unknown block type:', block.name);
return createBlock('core/paragraph', {
content: `Unknown block type: ${block.name}`,
});
}
try {
const innerBlocks = Array.isArray(blockData.innerBlocks)
? blockData.innerBlocks
.map((inner) => this.transformToBlock(inner))
.filter(Boolean)
: [];
// Handle innerBlocks recursively
const innerBlocks = Array.isArray(block.innerBlocks)
? this.validateAndTransformBlocks(block.innerBlocks)
: [];
// Create block with validated data
return createBlock(block.name, block.attributes, innerBlocks);
} catch (error) {
// eslint-disable-next-line no-console
console.error('Block validation error:', error, block);
return createBlock('core/paragraph', {
content: 'Error processing block',
});
}
});
return createBlock(
blockData.name,
blockData.attributes || {},
innerBlocks
);
} catch (error) {
// console.log('Error transforming block:', error);
return null;
}
}
async updateBlocks(validBlocks) {
async dispatchBlocks(blocks, isFinal = false) {
const now = Date.now();
if (now - this.lastUpdate >= this.CONFIG.UPDATE_INTERVAL) {
this.blocks = validBlocks;
if (now - this.lastUpdate >= this.CONFIG.UPDATE_INTERVAL || isFinal) {
// Store the last dispatched blocks
this.lastDispatchedBlocks = blocks;
// Single dispatch point for both streaming and completion
this.dispatch({
type: 'REQUEST_AI_CHUNK',
type: isFinal ? 'REQUEST_AI_SUCCESS' : 'REQUEST_AI_CHUNK',
payload: {
response: validBlocks,
response: blocks,
progress: {
charsProcessed: this.contentBuffer.length,
blocksCount: validBlocks.length,
isComplete: false,
blocksCount: blocks.length,
isComplete: isFinal,
},
},
});
this.lastUpdate = now;
await new Promise((resolve) =>
setTimeout(resolve, this.CONFIG.CHUNK_DELAY)
);
if (!isFinal) {
await new Promise((resolve) =>
setTimeout(resolve, this.CONFIG.CHUNK_DELAY)
);
}
}
}
async sendCompletion() {
this.dispatch({
type: 'REQUEST_AI_SUCCESS',
payload: {
response: this.blocks,
progress: {
charsProcessed: this.contentBuffer.length,
blocksCount: this.blocks.length,
isComplete: true,
},
},
});
}
handleError(error) {
// eslint-disable-next-line no-console
console.error('Stream processor error:', error);
// console.error('Stream processor error:', error);
this.dispatch({
type: 'REQUEST_AI_ERROR',
payload: error.message,

View file

@ -84,108 +84,34 @@ export function reset() {
};
}
/**
* Process stream data from the API
*
* @param {ReadableStream} reader Stream reader
* @return {Function} Redux-style action function
*/
export function processStream(reader) {
return async ({ dispatch }) => {
const decoder = new TextDecoder();
let buffer = '';
let responseText = '';
// Create artificial delay for smoother updates
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
try {
while (true) {
const { value, done } = await reader.read();
if (done) break;
// Process smaller chunks.
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
if (line.startsWith('data: ')) {
try {
const data = JSON.parse(line.slice(6));
if (data.error) {
dispatch({
type: 'REQUEST_AI_ERROR',
payload: data.message,
});
return;
}
if (data.done) {
dispatch({
type: 'REQUEST_AI_SUCCESS',
payload: responseText,
});
return;
}
if (data.content) {
responseText += data.content;
dispatch({
type: 'REQUEST_AI_CHUNK',
payload: responseText,
});
// Add small delay between chunks for smoother appearance.
await delay(50);
}
} catch (error) {
dispatch({
type: 'REQUEST_AI_ERROR',
payload: __(
'Failed to parse stream data',
'mind'
),
});
}
}
}
}
} catch (error) {
dispatch({
type: 'REQUEST_AI_ERROR',
payload: error.message,
});
}
};
}
export function requestAI() {
return async ({ dispatch, select }) => {
if (!isConnected) {
dispatch(setError(__('Not connected', 'mind')));
return;
}
const loading = select.getLoading();
if (loading) {
if (select.getLoading()) {
return;
}
dispatch({ type: 'REQUEST_AI_PENDING' });
const context = select.getContext();
const data = { request: select.getInput() };
if (context === 'selected-blocks') {
data.context = getSelectedBlocksContent();
}
try {
dispatch({ type: 'REQUEST_AI_PENDING' });
// Prepare request data
const data = {
request: select.getInput(),
};
// Add context if needed
if (select.getContext() === 'selected-blocks') {
data.context = getSelectedBlocksContent();
}
// Initialize stream processor
const streamProcessor = new BlocksStreamProcessor(dispatch);
// Make API request
const response = await apiFetch({
path: '/mind/v1/request_ai',
method: 'POST',