| const express = require('express'); |
| const fetch = require('node-fetch'); |
| const cors = require('cors'); |
| const rateLimit = require('express-rate-limit'); |
| const helmet = require('helmet'); |
| require('dotenv').config(); |
|
|
// Express application instance that every middleware and route below is registered on.
const app = express();

// True only when the PRODUCTION_MODE environment variable is exactly the string 'true'.
// The logging and masking helpers in this file read this flag.
const PRODUCTION_MODE = process.env.PRODUCTION_MODE === 'true';
|
|
/**
 * Prints a message with console.log.
 * Messages whose level is 'debug' are dropped while PRODUCTION_MODE is on.
 *
 * @param {string} message - Text to print.
 * @param {string} [level='info'] - Severity tag; only 'debug' is treated specially.
 */
function log(message, level = 'info') {
  const suppressed = level === 'debug' && PRODUCTION_MODE;
  if (!suppressed) {
    console.log(message);
  }
}
|
|
/**
 * Prints a message with console.log only while PRODUCTION_MODE is off.
 *
 * @param {string} message - Text to print.
 */
function logSensitive(message) {
  if (PRODUCTION_MODE) {
    return;
  }
  console.log(message);
}
|
|
| |
/**
 * Hides most of an IP address for log output when PRODUCTION_MODE is on.
 *
 * A dotted value with exactly four parts keeps its first two parts; any other
 * string becomes 'masked'. Outside production the value is returned untouched.
 *
 * @param {string|undefined|null} ip - Address to mask; callers pass `req.ip`,
 *   which may be undefined.
 * @returns {string|undefined|null} Masked address in production, `ip` otherwise.
 */
function maskIP(ip) {
  if (!PRODUCTION_MODE) {
    return ip;
  }
  // A missing address must not crash the logging path that calls this helper.
  if (typeof ip !== 'string') {
    return 'masked';
  }
  const parts = ip.split('.');
  return parts.length === 4 ? `${parts[0]}.${parts[1]}.***.**` : 'masked';
}
|
|
/**
 * Shortens an Origin header value for log output when PRODUCTION_MODE is on.
 *
 * Falsy values and the placeholder 'no-origin' are returned unchanged. A value
 * that parses as a URL keeps its protocol and the first three characters of
 * its hostname; a value that does not parse becomes 'masked'.
 *
 * @param {string|undefined} origin - Origin header value or placeholder.
 * @returns {string|undefined} Masked origin in production, `origin` otherwise.
 */
function maskOrigin(origin) {
  const shouldMask = PRODUCTION_MODE && origin && origin !== 'no-origin';
  if (!shouldMask) {
    return origin;
  }

  try {
    const { protocol, hostname } = new URL(origin);
    return `${protocol}//${hostname.substring(0, 3)}***`;
  } catch {
    return 'masked';
  }
}
|
|
// helmet middleware with its contentSecurityPolicy and crossOriginEmbedderPolicy options switched off.
app.use(
  helmet({
    contentSecurityPolicy: false,
    crossOriginEmbedderPolicy: false,
  }),
);

// Express 'trust proxy' setting: trust one proxy hop when deriving req.ip.
app.set('trust proxy', 1);

// Parse JSON request bodies, capped at 1mb.
app.use(express.json({ limit: '1mb' }));
|
|
| |
// API keys read from the comma-separated API_KEYS environment variable.
// Each entry is trimmed; an unset or empty variable yields an empty list.
const API_KEYS = (() => {
  const raw = process.env.API_KEYS;
  if (!raw) {
    return [];
  }
  return raw.split(',').map((key) => key.trim());
})();
|
|
/**
 * Decides whether a request is authorized, from its Origin or x-api-key header.
 *
 * The Origin header takes precedence: when it is present the API key is not
 * consulted. An empty `allowedOrigins` list accepts every origin, and an empty
 * `API_KEYS` list accepts any request that carries an API key header.
 *
 * @param {{ headers: Object }} req - Incoming request.
 * @returns {{ valid: boolean, source: string }} Verdict plus a label describing
 *   what was checked. In production the origin itself is replaced by a fixed label.
 */
function authenticateRequest(req) {
  const { origin, 'x-api-key': apiKey } = req.headers;

  if (origin) {
    if (allowedOrigins.length === 0) {
      return { valid: true, source: 'open-mode' };
    }
    const source = PRODUCTION_MODE ? 'authorized-origin' : origin;
    return { valid: allowedOrigins.includes(origin), source };
  }

  if (!apiKey) {
    return { valid: false, source: 'unauthorized' };
  }
  if (API_KEYS.length === 0) {
    return { valid: true, source: 'no-keys-configured' };
  }
  return { valid: API_KEYS.includes(apiKey), source: 'api-key' };
}
|
|
| |
// Origins read from the comma-separated ALLOWED_ORIGINS environment variable.
// Each entry is trimmed; an unset or empty variable yields an empty list,
// which the origin checks in this file treat as "allow every origin".
const allowedOrigins = (() => {
  const raw = process.env.ALLOWED_ORIGINS;
  if (!raw) {
    return [];
  }
  return raw.split(',').map((entry) => entry.trim());
})();
|
|
// CORS policy: requests without an Origin header, any origin when no allow-list
// is configured, and origins on the allow-list are accepted. Anything else is
// logged (masked) and rejected with the 'Not allowed by CORS' error.
app.use(
  cors({
    credentials: true,
    origin(origin, callback) {
      const allowListDisabled = allowedOrigins.length === 0;
      if (!origin || allowListDisabled || allowedOrigins.includes(origin)) {
        callback(null, true);
        return;
      }

      log(`[Security] Blocked origin: ${maskOrigin(origin)}`);
      callback(new Error('Not allowed by CORS'));
    },
  }),
);
|
|
// Error middleware that turns the CORS rejection into a 403 response and
// passes every other error on to the next error handler.
app.use((err, req, res, next) => {
  const rejectedByCors = err.message === 'Not allowed by CORS';
  if (!rejectedByCors) {
    next(err);
    return;
  }
  res.status(403).json({ error: 'Access denied' });
});
|
|
| |
// Authorization gate. '/', '/health' and any path starting with '/test-' skip
// the check; every other request must pass authenticateRequest or gets a 401.
app.use((req, res, next) => {
  const { path } = req;
  const isPublicPath = path === '/' || path === '/health' || path.startsWith('/test-');
  if (isPublicPath) {
    return next();
  }

  const { valid, source } = authenticateRequest(req);
  if (valid) {
    log(`[Auth] Request authorized from: ${source}`);
    next();
    return;
  }

  log(`[Security] Blocked unauthorized request from ${maskIP(req.ip)}`);
  return res.status(401).json({
    error: 'Unauthorized',
    message: 'Valid origin or API key required',
  });
});
|
|
| |
// Rate limiter: at most 100 requests per 15-minute window per client key.
const limiter = rateLimit({
  windowMs: 15 * 60 * 1000,
  max: 100,
  message: { error: 'Too many requests' },
  // The key is the client address with any port suffix removed. Only the two
  // unambiguous forms are stripped ("a.b.c.d:port" and "[ipv6]:port"), so a bare
  // IPv6 address is never truncated and distinct clients never share a bucket.
  keyGenerator: (req) => {
    const socketAddress = req.socket ? req.socket.remoteAddress : undefined;
    const ip = req.ip || socketAddress || 'unknown';
    return ip
      .replace(/^(\d{1,3}(?:\.\d{1,3}){3}):\d+$/, '$1')
      .replace(/^\[([^\]]+)\](?::\d+)?$/, '$1');
  },
});
app.use(limiter);
|
|
| |
// Access log: one line per request with timestamp, client IP, method, path,
// origin and whether an API key header was sent. In production the IP, origin
// and path are masked; the API key value itself is never printed.
app.use((req, res, next) => {
  const strippedIp = (req.ip || 'unknown').replace(/:\d+[^:]*$/, '');
  const ip = maskIP(strippedIp);
  const origin = maskOrigin(req.headers.origin || 'no-origin');
  const apiKey = req.headers['x-api-key'] ? '***' : 'none';

  let path = req.path;
  if (PRODUCTION_MODE) {
    // Keep only the leading path segments and hide the rest.
    path = req.path.split('/').slice(0, 4).join('/') + '/***';
  }

  const timestamp = new Date().toISOString();
  log(`[${timestamp}] ${ip} -> ${req.method} ${path} | Origin: ${origin} | Key: ${apiKey}`);
  next();
});
|
|
| |
// Per-client request counters for the current day, keyed by client IP.
const dailyUsage = new Map();
// Date string (local time, from Date#toDateString) of the day the counters belong to.
let lastResetDate = new Date().toDateString();

/**
 * Clears all daily counters once the calendar date differs from the date of
 * the last reset; does nothing on the same day.
 */
function checkDailyReset() {
  const today = new Date().toDateString();
  if (today === lastResetDate) {
    return;
  }
  dailyUsage.clear();
  lastResetDate = today;
  log('[System] Daily usage counters reset');
}

// Hourly check so counters are reset even when no request triggers it.
setInterval(checkDailyReset, 60 * 60 * 1000);
|
|
// Daily quota: each client may send at most 200 POST requests per day to paths
// containing '/prediction/'. Other requests pass through untouched.
app.use((req, res, next) => {
  const isPrediction = req.method === 'POST' && req.path.includes('/prediction/');
  if (!isPrediction) {
    return next();
  }

  checkDailyReset();

  // Strip a port suffix only in the two unambiguous forms ("a.b.c.d:port" and
  // "[ipv6]:port") so a bare IPv6 address is never truncated into a shared key.
  const ip = (req.ip || 'unknown')
    .replace(/^(\d{1,3}(?:\.\d{1,3}){3}):\d+$/, '$1')
    .replace(/^\[([^\]]+)\](?::\d+)?$/, '$1');
  const count = dailyUsage.get(ip) || 0;

  if (count >= 200) {
    return res.status(429).json({
      error: 'Daily limit reached',
      message: 'You have reached your daily usage limit. Try again tomorrow.',
    });
  }

  dailyUsage.set(ip, count + 1);

  if (dailyUsage.size > 10000) {
    log('[System] Daily usage map too large, clearing oldest entries', 'debug');
    // A Map iterates in insertion order, so the first keys are the oldest.
    // Deleting during iteration is safe and avoids copying the whole map.
    let removed = 0;
    for (const key of dailyUsage.keys()) {
      if (removed >= 1000) {
        break;
      }
      dailyUsage.delete(key);
      removed += 1;
    }
  }

  return next();
});
|
|
| |
// Bot filter for POST requests: a request whose User-Agent contains one of the
// listed tool names is rejected with 403 unless it carries a configured API key.
app.use((req, res, next) => {
  if (req.method !== 'POST') {
    return next();
  }

  const presentedKey = req.headers['x-api-key'];
  if (presentedKey && API_KEYS.includes(presentedKey)) {
    return next();
  }

  const userAgent = (req.headers['user-agent'] || '').toLowerCase();
  const suspiciousBots = ['python-requests', 'curl/', 'wget/', 'scrapy', 'crawler'];
  const matchedBot = suspiciousBots.find((marker) => userAgent.includes(marker));

  if (matchedBot === undefined) {
    next();
    return;
  }

  log(`[Security] Blocked bot from ${maskIP(req.ip)}`);
  return res.status(403).json({
    error: 'Automated access detected',
    message: 'This service is for web browsers only.',
  });
});
|
|
| |
/**
 * Parses the FLOWISE_INSTANCES configuration value.
 *
 * The value must be JSON for a non-empty array. Anything else (invalid JSON,
 * a non-array value, an empty array) is logged and replaced by an empty array,
 * so later code can always rely on array semantics such as `.length`.
 * The raw value is never logged because it may contain instance keys.
 *
 * @param {string|undefined} raw - Raw environment value.
 * @returns {Array} Parsed instance list, or [] when the value is unusable.
 */
function parseInstances(raw) {
  let parsed;
  try {
    parsed = JSON.parse(raw || '[]');
  } catch {
    log('CRITICAL ERROR: Could not parse FLOWISE_INSTANCES JSON');
    return [];
  }

  if (!Array.isArray(parsed) || parsed.length === 0) {
    log('ERROR: FLOWISE_INSTANCES must be a non-empty array');
    return [];
  }

  log(`[System] Loaded ${parsed.length} instances`);
  return parsed;
}

// Upstream instances; other code in this file reads `url` and `key` from each entry.
let INSTANCES = parseInstances(process.env.FLOWISE_INSTANCES);
|
|
| |
// Cache of resolved chatflows, keyed by "<instanceNum>-<botName>".
const flowCache = new Map();

// Every 10 minutes, drop cache entries whose timestamp is more than 10 minutes old.
setInterval(() => {
  const cutoff = Date.now() - 10 * 60 * 1000;
  flowCache.forEach((entry, key) => {
    if (entry.timestamp && entry.timestamp < cutoff) {
      flowCache.delete(key);
    }
  });
}, 10 * 60 * 1000);
|
|
| |
/**
 * Calls `fetch` and rejects with Error('Request timeout') when no response
 * arrives within `timeout` milliseconds.
 *
 * On timeout the underlying request is aborted through an AbortController
 * rather than left running. The timer is always cleared once the call settles,
 * so a completed request leaves no pending timer behind. The timeout covers
 * only the wait for the response object; reading the body afterwards is not
 * limited, so streamed bodies are unaffected.
 *
 * @param {string} url - Request URL.
 * @param {Object} [options] - Options passed to `fetch`. A caller-supplied
 *   `signal` is still honored: aborting it aborts the request.
 * @param {number} [timeout=10000] - Maximum wait in milliseconds.
 * @returns {Promise<Object>} The response returned by `fetch`.
 * @throws {Error} 'Request timeout' on timeout, or whatever `fetch` rejects with.
 */
async function fetchWithTimeout(url, options, timeout = 10000) {
  const controller = new AbortController();
  const forwardAbort = () => controller.abort();

  const callerSignal = options ? options.signal : undefined;
  if (callerSignal) {
    if (callerSignal.aborted) {
      controller.abort();
    } else {
      callerSignal.addEventListener('abort', forwardAbort, { once: true });
    }
  }

  let timer;
  const timedOut = new Promise((_, reject) => {
    timer = setTimeout(() => {
      // Reject first so callers see 'Request timeout' rather than the abort error.
      reject(new Error('Request timeout'));
      controller.abort();
    }, timeout);
  });

  try {
    return await Promise.race([
      fetch(url, { ...options, signal: controller.signal }),
      timedOut,
    ]);
  } finally {
    clearTimeout(timer);
    if (callerSignal) {
      callerSignal.removeEventListener('abort', forwardAbort);
    }
  }
}
|
|
| |
/**
 * Resolves a bot name to the chatflow id of a configured instance.
 *
 * A cache entry younger than five minutes is returned without any network
 * call. Otherwise the instance's chatflow list is fetched, and the first flow
 * whose lower-cased name (whitespace runs replaced by '-') equals `botName`
 * is cached and returned.
 *
 * @param {number} instanceNum - 1-based index into INSTANCES.
 * @param {string} botName - Slug to look up; compared against lower-cased names.
 * @returns {Promise<{id: *, instance: Object}>} Chatflow id and the instance entry.
 * @throws {Error} When the instance number is out of range, the upstream call
 *   is not ok, the upstream payload is not an array, or no flow matches.
 */
async function resolveChatflowId(instanceNum, botName) {
  const cacheKey = `${instanceNum}-${botName}`;

  const hit = flowCache.get(cacheKey);
  const isFresh = hit && hit.timestamp && Date.now() - hit.timestamp < 5 * 60 * 1000;
  if (isFresh) {
    return { id: hit.id, instance: hit.instance };
  }

  const outOfRange = isNaN(instanceNum) || instanceNum < 1 || instanceNum > INSTANCES.length;
  if (outOfRange) {
    throw new Error(`Instance ${instanceNum} does not exist. Valid: 1-${INSTANCES.length}`);
  }

  const instance = INSTANCES[instanceNum - 1];
  logSensitive(`[System] Looking up '${botName}' in instance ${instanceNum}...`);

  // Only send an Authorization header when the instance has a non-empty key.
  const hasKey = instance.key && instance.key.length > 0;
  const headers = hasKey ? { Authorization: `Bearer ${instance.key}` } : {};

  const listUrl = `${instance.url}/api/v1/chatflows`;
  const response = await fetchWithTimeout(listUrl, { headers }, 10000);
  if (!response.ok) {
    throw new Error(`Instance ${instanceNum} returned status ${response.status}`);
  }

  const flows = await response.json();
  if (!Array.isArray(flows)) {
    throw new Error(`Instance ${instanceNum} returned invalid response`);
  }

  const toSlug = (name) => name.toLowerCase().replace(/\s+/g, '-');
  const match = flows.find((flow) => flow.name && toSlug(flow.name) === botName);
  if (!match || !match.id) {
    throw new Error(`Bot '${botName}' not found in instance ${instanceNum}`);
  }

  const { id } = match;
  flowCache.set(cacheKey, { id, instance, timestamp: Date.now() });

  logSensitive(`[System] Found '${botName}' -> ${id}`);

  return { id, instance };
}
|
|
| |
/**
 * Relays an upstream SSE body to the client response.
 *
 * Chunks are written through as they arrive. An idle watchdog ends the
 * exchange when no chunk has arrived for `idleTimeoutMs`; it stays armed for
 * the whole stream, so a stall after the first chunk is detected too. When the
 * watchdog fires or the client disconnects, the upstream body is destroyed.
 * Once the exchange is finished, late upstream events are ignored and the
 * client response is never written to again.
 *
 * @param {{ body: Object }} flowiseResponse - Upstream response whose `body`
 *   emits 'data', 'end' and 'error' events.
 * @param {Object} clientRes - Express response to relay into.
 * @param {{ idleTimeoutMs?: number, checkIntervalMs?: number }} [options] -
 *   Watchdog tuning; defaults are 45000 ms idle limit checked every 5000 ms.
 * @returns {Promise<void>} Resolves once the handlers are attached, not when
 *   the stream ends.
 */
async function handleStreamingResponse(flowiseResponse, clientRes, options = {}) {
  const { idleTimeoutMs = 45000, checkIntervalMs = 5000 } = options;
  const upstream = flowiseResponse.body;

  clientRes.setHeader('Content-Type', 'text/event-stream');
  clientRes.setHeader('Cache-Control', 'no-cache');
  clientRes.setHeader('Connection', 'keep-alive');
  clientRes.setHeader('X-Accel-Buffering', 'no');

  log('[Streaming] Forwarding SSE stream...');

  let dataReceived = false;
  let finished = false;
  let lastDataTime = Date.now();
  let totalBytes = 0;
  let watchdog = null;

  // Marks the exchange as over, stops the watchdog and optionally tears down upstream.
  const finish = (destroyUpstream) => {
    finished = true;
    clearInterval(watchdog);
    if (destroyUpstream && typeof upstream.destroy === 'function') {
      upstream.destroy();
    }
  };

  watchdog = setInterval(() => {
    const idleMs = Date.now() - lastDataTime;
    if (finished || idleMs <= idleTimeoutMs) {
      return;
    }

    log(`[Streaming] Timeout - no data for ${(idleMs / 1000).toFixed(1)}s`);
    finish(true);

    if (dataReceived) {
      clientRes.end();
      return;
    }

    // Nothing has been written yet, so a regular JSON error can still be sent.
    log('[Streaming] Stream completed with NO data received!');
    clientRes.status(504).json({
      error: 'Gateway timeout',
      message: `No response from chatbot within ${idleTimeoutMs / 1000} seconds`,
    });
  }, checkIntervalMs);

  upstream.on('data', (chunk) => {
    if (finished) {
      return;
    }
    // Only refresh the idle clock here; the watchdog must keep running.
    dataReceived = true;
    lastDataTime = Date.now();
    totalBytes += chunk.length;

    logSensitive(`[Streaming] Received chunk: ${chunk.length} bytes (total: ${totalBytes})`);
    clientRes.write(chunk);
  });

  upstream.on('end', () => {
    if (finished) {
      return;
    }
    finish(false);

    if (dataReceived) {
      log(`[Streaming] Stream completed - ${totalBytes} bytes`);
    } else {
      log('[Streaming] Stream completed but NO data received!');
    }

    clientRes.end();
  });

  upstream.on('error', (err) => {
    if (finished) {
      return;
    }
    finish(false);
    log('[Streaming Error]');
    logSensitive(`[Streaming Error] ${err && err.message ? err.message : err}`);

    if (dataReceived) {
      clientRes.write(`\n\nevent: error\ndata: {"error": "Stream interrupted"}\n\n`);
      clientRes.end();
    } else {
      clientRes.status(500).json({ error: 'Stream failed to start' });
    }
  });

  // Client went away: stop pulling from upstream instead of streaming into a closed socket.
  clientRes.on('close', () => {
    if (!finished) {
      finish(true);
    }
  });
}
|
|
| |
// POST /api/v1/prediction/:instanceNum/:botName
// Validates the question, resolves the bot to a chatflow id, forwards the JSON
// body to that instance and relays either its SSE stream or its JSON reply.
app.post('/api/v1/prediction/:instanceNum/:botName', async (req, res) => {
  try {
    // Explicit radix so a value such as "0x1" is not read as hexadecimal.
    const instanceNum = Number.parseInt(req.params.instanceNum, 10);
    const botName = req.params.botName.toLowerCase().substring(0, 100);
    // Guard against a missing body (e.g. no JSON content type) so validation answers with 400.
    const body = req.body || {};

    if (!body.question || typeof body.question !== 'string') {
      return res.status(400).json({
        error: 'Invalid request',
        message: 'Question must be a non-empty string.',
      });
    }

    if (body.question.length > 2000) {
      return res.status(400).json({
        error: 'Message too long',
        message: 'Please keep messages under 2000 characters.',
      });
    }

    const { id, instance } = await resolveChatflowId(instanceNum, botName);

    const headers = { 'Content-Type': 'application/json' };
    if (instance.key && instance.key.length > 0) {
      headers['Authorization'] = `Bearer ${instance.key}`;
    }

    const startTime = Date.now();
    logSensitive(`[Timing] Calling Flowise at ${new Date().toISOString()}`);

    const response = await fetchWithTimeout(
      `${instance.url}/api/v1/prediction/${id}`,
      {
        method: 'POST',
        headers,
        body: JSON.stringify(body),
      },
      60000,
    );

    const duration = Date.now() - startTime;
    log(`[Timing] Response received in ${(duration / 1000).toFixed(1)}s`);

    if (!response.ok) {
      const errorText = await response.text();
      logSensitive(`[Error] Instance returned ${response.status}: ${errorText.substring(0, 100)}`);
      return res.status(response.status).json({
        error: 'Flowise instance error',
        message: 'The chatbot instance returned an error.',
      });
    }

    const contentType = response.headers.get('content-type') || '';

    if (contentType.includes('text/event-stream')) {
      log('[Streaming] Detected SSE response');
      // Awaited so a failure while wiring up the stream reaches the catch below.
      await handleStreamingResponse(response, res);
      return;
    }

    log('[Non-streaming] Parsing JSON response');
    const text = await response.text();

    try {
      const data = JSON.parse(text);
      res.status(200).json(data);
    } catch (e) {
      log('[Error] Invalid JSON response');
      res.status(500).json({ error: 'Invalid response from Flowise' });
    }
  } catch (error) {
    log(`[Error] ${error.message}`);
    if (res.headersSent) {
      return;
    }
    // Plain Errors are the ones raised by this file's own helpers and are safe
    // to show. Library errors (network failures, bad URLs) can embed internal
    // instance addresses, so the client gets a generic message instead.
    const message = error.name === 'Error' ? error.message : 'Upstream request failed';
    res.status(500).json({
      error: 'Request failed',
      message,
    });
  }
});
|
|
| |
// GET /api/v1/public-chatbotConfig/:instanceNum/:botName
// Resolves the bot to a chatflow id and relays that chatflow's public config.
app.get('/api/v1/public-chatbotConfig/:instanceNum/:botName', async (req, res) => {
  try {
    // Explicit radix so a value such as "0x1" is not read as hexadecimal.
    const instanceNum = Number.parseInt(req.params.instanceNum, 10);
    const botName = req.params.botName.toLowerCase().substring(0, 100);

    const { id, instance } = await resolveChatflowId(instanceNum, botName);

    const headers = {};
    if (instance.key && instance.key.length > 0) {
      headers['Authorization'] = `Bearer ${instance.key}`;
    }

    const response = await fetchWithTimeout(
      `${instance.url}/api/v1/public-chatbotConfig/${id}`,
      { headers },
      10000,
    );

    if (!response.ok) {
      return res.status(response.status).json({ error: 'Config not available' });
    }

    const data = await response.json();
    res.status(200).json(data);
  } catch (error) {
    log('[Error] Config request failed');
    logSensitive(`[Error] ${error.message}`);
    // Plain Errors are the ones raised by this file's own helpers and are safe
    // to show. Library errors can embed internal instance addresses, so the
    // client gets a generic message instead.
    const message = error.name === 'Error' ? error.message : 'Config not available';
    res.status(404).json({ error: message });
  }
});
|
|
| |
// GET /api/v1/chatflows-streaming/:instanceNum/:botName
// Relays the upstream answer on whether the chatflow streams. Any failure
// (lookup, network, non-ok status) is answered with { isStreaming: false }.
app.get('/api/v1/chatflows-streaming/:instanceNum/:botName', async (req, res) => {
  const notStreaming = { isStreaming: false };

  try {
    // Explicit radix so a value such as "0x1" is not read as hexadecimal.
    const instanceNum = Number.parseInt(req.params.instanceNum, 10);
    const botName = req.params.botName.toLowerCase().substring(0, 100);

    const { id, instance } = await resolveChatflowId(instanceNum, botName);

    const headers = {};
    if (instance.key && instance.key.length > 0) {
      headers['Authorization'] = `Bearer ${instance.key}`;
    }

    const response = await fetchWithTimeout(
      `${instance.url}/api/v1/chatflows-streaming/${id}`,
      { headers },
      10000,
    );

    if (!response.ok) {
      return res.status(200).json(notStreaming);
    }

    const data = await response.json();
    res.status(200).json(data);
  } catch (error) {
    log('[Error] Streaming check failed', 'debug');
    res.status(200).json(notStreaming);
  }
});
|
|
| |
// Development-only SSE smoke test: emits five numbered events, one every 500 ms.
if (!PRODUCTION_MODE) {
  app.get('/test-stream', (req, res) => {
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');

    let count = 0;
    const interval = setInterval(() => {
      count++;
      res.write(`data: {"message": "Test ${count}"}\n\n`);

      if (count >= 5) {
        clearInterval(interval);
        res.end();
      }
    }, 500);

    // Stop emitting as soon as the client disconnects instead of writing to a closed socket.
    req.on('close', () => clearInterval(interval));
  });
}
|
|
| |
// Liveness banner.
app.get('/', (req, res) => {
  res.send('Federated Proxy Active');
});

// Health report: instance count, cache size, number of tracked client IPs,
// process uptime and the production flag.
app.get('/health', (req, res) => {
  const report = {
    status: 'healthy',
    instances: INSTANCES.length,
    cached_bots: flowCache.size,
    daily_active_ips: dailyUsage.size,
    uptime: process.uptime(),
    production_mode: PRODUCTION_MODE,
  };
  res.json(report);
});
|
|
| |
// Fallback for requests that matched no route.
app.use((req, res) => {
  res.status(404).json({ error: 'Route not found' });
});

// Final error handler.
app.use((err, req, res, next) => {
  // Once a response has started, only Express's default handler can finish it safely.
  if (res.headersSent) {
    return next(err);
  }

  // Errors that carry a 4xx status (e.g. malformed or oversized JSON bodies
  // rejected by the body parser) are client mistakes, not server failures.
  const status = err ? err.status || err.statusCode : undefined;
  if (Number.isInteger(status) && status >= 400 && status < 500) {
    log(`[Error] Rejected request with status ${status}`);
    return res.status(status).json({ error: 'Invalid request' });
  }

  log('[Error] Unhandled error');
  // Details only outside production, matching the rest of the file's logging policy.
  logSensitive(err && err.stack ? err.stack : String(err));
  return res.status(500).json({ error: 'Internal server error' });
});
|
|
| |
// Start listening on port 7860 on all IPv4 interfaces and print a startup summary.
const server = app.listen(7860, '0.0.0.0', () => {
  const banner = [
    '===== Federated Proxy Started =====',
    `Port: 7860`,
    `Mode: ${PRODUCTION_MODE ? 'PRODUCTION' : 'DEVELOPMENT'}`,
    `Instances: ${INSTANCES.length}`,
    `Allowed Origins: ${allowedOrigins.length || 'Open'}`,
    `API Keys: ${API_KEYS.length || 'None'}`,
    '====================================',
  ];
  for (const line of banner) {
    log(line);
  }
});
|
|
// Set once the first termination signal has been handled.
let isShuttingDown = false;

/**
 * Stops accepting connections and exits once the server has closed.
 *
 * server.close() waits for open connections, and long-lived SSE or keep-alive
 * connections can hold it open indefinitely. A 10-second fallback timer
 * therefore forces the exit; it is unref'ed so it never keeps the process
 * alive on its own.
 */
function shutdownGracefully() {
  if (isShuttingDown) {
    return;
  }
  isShuttingDown = true;

  log('[System] Shutting down gracefully...');
  server.close(() => {
    log('[System] Server closed');
    process.exit(0);
  });

  setTimeout(() => {
    log('[System] Shutdown timed out, forcing exit');
    process.exit(1);
  }, 10000).unref();
}

process.on('SIGTERM', shutdownGracefully);
process.on('SIGINT', shutdownGracefully);
|
|