'use strict';

/**
 * Keeps an Azure AI Search index in sync with an Azure Blob Storage container.
 *
 * Event Grid blob events drive the sync: PutBlob indexes the blob's extracted
 * text, DeleteBlob removes the matching search document. Supported formats:
 * PDF, TXT, DOCX, XLSX, CSV. Image blobs are indexed with empty content so a
 * downstream OCR enrichment can fill it in.
 */

const { app } = require('@azure/functions');
const { BlobServiceClient, StorageSharedKeyCredential } = require('@azure/storage-blob');
const { SearchClient, AzureKeyCredential } = require('@azure/search-documents');
const pdf = require('pdf-parse');
const mammoth = require('mammoth'); // DOCX text extraction
const xlsx = require('xlsx'); // Excel files
const csvParser = require('csv-parser'); // CSV files
const path = require('path');
const fs = require('fs');
const { Readable } = require('stream');

// Connection settings are read once at cold start from config.json next to
// this file. NOTE(review): secrets in a JSON file on disk — consider app
// settings / Key Vault.
const configPath = path.join(__dirname, 'config.json');
const config = JSON.parse(fs.readFileSync(configPath, 'utf8'));

const accountName = config.blobAccountName;
const accountKey = config.blobAccountKey;
const containerName = config.blobContainerName;
const searchServiceName = config.searchServiceName;
const indexName = config.searchIndexName;
const apiKey = config.searchApiKey;

const blobServiceClient = new BlobServiceClient(
  `https://${accountName}.blob.core.windows.net`,
  new StorageSharedKeyCredential(accountName, accountKey)
);
const containerClient = blobServiceClient.getContainerClient(containerName);

const searchClient = new SearchClient(
  `https://${searchServiceName}.search.windows.net/`,
  indexName,
  new AzureKeyCredential(apiKey)
);

const imageExtensions = ['.jpg', '.jpeg', '.png', '.bmp', '.gif'];

// Fallback MIME types for blobs uploaded as application/octet-stream.
const extensionContentTypes = {
  '.pdf': 'application/pdf',
  '.txt': 'text/plain',
  '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
  '.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
  '.csv': 'text/csv',
};

/**
 * Builds the search document key for a blob name.
 *
 * Azure AI Search document keys may contain only letters, digits,
 * underscores, dashes and equal signs, so the name is encoded as URL-safe
 * base64: standard base64 emits '+' and '/', which the service rejects.
 * NOTE(review): documents indexed earlier with standard-base64 keys will not
 * be matched by keys produced here.
 *
 * @param {string} blobName - The blob name (may include a folder path).
 * @returns {string} A key that is valid for Azure AI Search.
 */
function encodeDocumentKey(blobName) {
  return Buffer.from(blobName).toString('base64url');
}

/**
 * Extracts the blob name (including any virtual-directory path) from a blob
 * URL, undoing percent-encoding. The previous substring-after-last-slash
 * approach dropped folder prefixes and kept percent-escapes.
 *
 * @param {string} blobUrl - Full blob URL from the event payload.
 * @returns {string} The decoded blob name relative to the container.
 */
function blobNameFromUrl(blobUrl) {
  const { pathname } = new URL(blobUrl);
  // pathname is "/<container>/<blob path>"; keep everything after the container.
  const encoded = pathname.split('/').slice(2).join('/');
  return decodeURIComponent(encoded);
}

/**
 * Resolves the effective content type for a blob. Blobs stored as generic
 * application/octet-stream are mapped by file extension.
 *
 * @param {string} blobName - Used for its extension.
 * @param {string} contentType - Content type from the blob's properties.
 * @returns {string|null} A usable content type, or null when unsupported.
 */
function resolveContentType(blobName, contentType) {
  if (contentType !== 'application/octet-stream') {
    return contentType;
  }
  const extension = path.extname(blobName).toLowerCase();
  if (extensionContentTypes[extension]) {
    return extensionContentTypes[extension];
  }
  if (imageExtensions.includes(extension)) {
    return 'image/*';
  }
  console.log(`Unsupported file extension: ${extension}`);
  return null;
}

/**
 * Extracts searchable text from a downloaded blob stream.
 *
 * @param {string} contentType - Resolved content type of the blob.
 * @param {NodeJS.ReadableStream} body - The blob download stream.
 * @returns {Promise<string|null>} Extracted text, '' for images (OCR handles
 *   those downstream), or null for content types this function cannot process.
 */
async function extractContent(contentType, body) {
  switch (contentType) {
    case 'application/pdf': {
      const pdfData = await pdf(await streamToBuffer(body));
      return pdfData.text;
    }
    case 'text/plain':
      return streamToString(body);
    case 'application/vnd.openxmlformats-officedocument.wordprocessingml.document': {
      const result = await mammoth.extractRawText({ buffer: await streamToBuffer(body) });
      return result.value;
    }
    case 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': {
      const workbook = xlsx.read(await streamToBuffer(body), { type: 'buffer' });
      // One CSV rendering per sheet, newline-terminated, matching the
      // original accumulation order.
      return workbook.SheetNames
        .map((sheetName) => xlsx.utils.sheet_to_csv(workbook.Sheets[sheetName]) + '\n')
        .join('');
    }
    case 'text/csv':
      return csvToString(await streamToBuffer(body));
    default:
      if (contentType.startsWith('image/')) {
        return ''; // Let OCR handle image processing.
      }
      console.log(`Unsupported content type: ${contentType}`);
      return null;
  }
}

/**
 * Downloads a blob, extracts its text and upserts a document into the search
 * index. Unsupported blobs are skipped with a log entry; errors are logged,
 * never rethrown.
 *
 * @param {string} blobName - Name of the blob to index.
 */
async function indexBlob(blobName) {
  try {
    console.log(`Indexing blob: ${blobName}`);
    const blobClient = containerClient.getBlobClient(blobName);
    const downloadResponse = await blobClient.download();
    const encodedName = encodeDocumentKey(blobName);
    console.log(`encodedName: ${encodedName}`);

    const properties = await blobClient.getProperties();
    const metadata = properties.metadata || {};
    // Bug fix: read through the guarded `metadata` object — the original
    // dereferenced properties.metadata.uniqueId, which throws when the blob
    // has no metadata.
    console.log(`uniqueId: ${metadata.uniqueId}`);

    const contentType = resolveContentType(blobName, properties.contentType || '');
    console.log('content type:', contentType);
    if (!contentType) {
      return; // Unsupported extension.
    }

    const blobContent = await extractContent(contentType, downloadResponse.readableStreamBody);
    if (blobContent === null) {
      return; // Unsupported content type.
    }

    // Field names mirror the Azure blob-indexer metadata schema.
    const document = {
      id: encodedName,
      content: blobContent, // Empty for images; OCR enrichment fills it in.
      metadata_storage_content_type: properties.contentType || null,
      metadata_storage_size: properties.contentLength || null,
      metadata_storage_last_modified: properties.lastModified
        ? new Date(properties.lastModified).toISOString()
        : null,
      metadata_storage_content_md5: properties.contentMD5
        ? Buffer.from(properties.contentMD5).toString('base64')
        : null,
      metadata_storage_name: blobName,
      metadata_storage_path: blobClient.url,
      metadata_storage_file_extension: path.extname(blobName),
      metadata_content_type: properties.contentType || null,
      metadata_language: null,
      metadata_author: null,
      metadata_creation_date: properties.creationTime
        ? new Date(properties.creationTime).toISOString()
        : null,
    };

    await searchClient.uploadDocuments([document]);
    console.log(`Document "${document.id}" has been indexed`);
  } catch (error) {
    console.error('Error indexing document:', error);
    if (error.name === 'RestError') {
      console.error('RestError message:', error.message);
      console.error('RestError details:', error.response && error.response.body);
    }
  }
}

// Event Grid trigger. In the v4 programming model the handler receives the
// event first and the InvocationContext second; the original swapped the two
// and read the payload through a v3-style `triggerMetadata` property.
app.eventGrid('process-event-grid', {
  handler: async (event, context) => {
    try {
      context.log(`Event received: ${JSON.stringify(event)}`);
      context.log(`eventType: ${event.eventType}`);

      const blobUrl = event.data?.url;
      context.log(`blobUrl: ${blobUrl}`);
      if (!blobUrl) {
        context.error("Event data does not contain 'url':", event);
        return;
      }

      const blobapi = event.data?.api;
      context.log(`blobapi: ${blobapi}`);
      const blobName = blobNameFromUrl(blobUrl);

      if (blobapi === 'PutBlob') {
        context.log(`Blob created: ${blobName}`);
        await indexBlob(blobName);
      } else if (blobapi === 'DeleteBlob') {
        context.log(`Blob deleted: ${blobName}`);
        await deleteDocument(blobName);
      } else {
        context.log(`Unhandled blobapi type: ${blobapi}`);
      }
    } catch (error) {
      context.error(`Error processing event: ${error}`, event);
    }
  },
});

/**
 * Collects a readable stream into a single Buffer.
 *
 * @param {NodeJS.ReadableStream} readableStream
 * @returns {Promise<Buffer>}
 */
function streamToBuffer(readableStream) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    readableStream.on('data', (data) => {
      chunks.push(data instanceof Buffer ? data : Buffer.from(data));
    });
    readableStream.on('end', () => resolve(Buffer.concat(chunks)));
    readableStream.on('error', reject);
  });
}

/**
 * Decodes a readable stream as UTF-8 text. Decoding once after concatenation
 * avoids corrupting multi-byte characters split across chunk boundaries (the
 * original called toString() per chunk).
 *
 * @param {NodeJS.ReadableStream} readableStream
 * @returns {Promise<string>}
 */
async function streamToString(readableStream) {
  const buffer = await streamToBuffer(readableStream);
  return buffer.toString('utf8');
}

/**
 * Renders CSV bytes as newline-joined rows of comma-joined values. The header
 * row is preserved (csv-parser consumes it; the original silently dropped it
 * from the indexed text).
 * NOTE(review): values containing commas/newlines are not re-quoted — fine
 * for search text, not for round-tripping CSV.
 *
 * @param {Buffer} buffer - Raw CSV file contents.
 * @returns {Promise<string>}
 */
function csvToString(buffer) {
  return new Promise((resolve, reject) => {
    const lines = [];
    bufferToStream(buffer)
      .pipe(csvParser())
      .on('headers', (headers) => lines.push(headers.join(',')))
      .on('data', (row) => lines.push(Object.values(row).join(',')))
      .on('end', () => resolve(lines.join('\n')))
      .on('error', reject);
  });
}

/**
 * Wraps a Buffer in a Readable stream. The original used
 * `new require('stream').Readable()`, which parses as
 * `(new require('stream')).Readable()` and never implemented _read();
 * Readable.from is the supported API.
 *
 * @param {Buffer} buffer
 * @returns {Readable}
 */
function bufferToStream(buffer) {
  return Readable.from(buffer);
}

/**
 * Removes the search document corresponding to a deleted blob. Errors are
 * logged, never rethrown, to match indexBlob's error policy.
 *
 * @param {string} blobName - Name of the deleted blob.
 */
async function deleteDocument(blobName) {
  try {
    console.log(`Deleting document: ${blobName}`);
    const encodedName = encodeDocumentKey(blobName);
    console.log(`encodedName: ${encodedName}`);
    await searchClient.deleteDocuments([{ id: encodedName }]);
    console.log(`Document "${encodedName}" has been deleted from the index`);
  } catch (error) {
    console.error('Error deleting document:', error);
    if (error.name === 'RestError') {
      console.error('RestError message:', error.message);
      console.error('RestError details:', error.response && error.response.body);
    }
  }
}

module.exports = app;