diff --git a/backend/package.json b/backend/package.json index 4434250..1f84de9 100644 --- a/backend/package.json +++ b/backend/package.json @@ -4,7 +4,7 @@ "packageManager": "bun@1.0.27", "type": "module", "scripts": { - "build": "bun build ./src/index.ts --target=bun --outdir=dist --format=esm --external './src/services/exports/external' && cp -r src/services/exports/external dist/external/", + "build": "bun build ./src/index.ts --target=bun --outdir=dist --format=esm --external './src/external' && cp -r src/external dist/external/", "start": "bun run dist/index.js", "dev": "bun run --watch src/index.ts", "test": "jest", diff --git a/backend/src/config/admins.ts b/backend/src/config/admins.ts deleted file mode 100644 index 47ede5d..0000000 --- a/backend/src/config/admins.ts +++ /dev/null @@ -1,6 +0,0 @@ -export const ADMIN_ACCOUNTS: string[] = [ - // Add admin Twitter handles here (without @) - // Example: "TwitterDev" - "elliot_braem", - "plugrel", -]; diff --git a/backend/src/config/config.ts b/backend/src/config/config.ts index da4bc25..6cfc4c6 100644 --- a/backend/src/config/config.ts +++ b/backend/src/config/config.ts @@ -1,49 +1,5 @@ -import { AppConfig, ExportConfig } from "../types"; -import path from "path"; - -// Configure export services -const exports: ExportConfig[] = []; - -// Add Telegram export if configured -if (process.env.TELEGRAM_ENABLED === "true") { - exports.push({ - type: "telegram", - enabled: true, - module: "telegram", - botToken: process.env.TELEGRAM_BOT_TOKEN!, - channelId: process.env.TELEGRAM_CHANNEL_ID!, - }); -} - -// Add RSS export if configured -if (process.env.RSS_ENABLED === "true") { - exports.push({ - type: "rss", - enabled: true, - module: "rss", - title: process.env.RSS_TITLE || "Public Goods News", - description: - process.env.RSS_DESCRIPTION || "Latest approved public goods submissions", - feedPath: - process.env.RSS_FEED_PATH || - path.join(process.cwd(), "public", "feed.xml"), - maxItems: process.env.RSS_MAX_ITEMS - ? parseInt(process.env.RSS_MAX_ITEMS) - : 100, - }); -} - -const config: AppConfig = { - twitter: { - username: process.env.TWITTER_USERNAME!, - password: process.env.TWITTER_PASSWORD!, - email: process.env.TWITTER_EMAIL!, - }, - environment: - (process.env.NODE_ENV as "development" | "production" | "test") || - "development", - exports, -}; +import { ConfigService } from '../services/config'; +import { AppConfig } from '../types/config'; export function validateEnv() { // Validate required Twitter credentials @@ -56,20 +12,12 @@ export function validateEnv() { "Missing required Twitter credentials. Please ensure TWITTER_USERNAME, TWITTER_PASSWORD, and TWITTER_EMAIL are set in your environment variables.", ); } +} - // Validate Telegram config if enabled - if (process.env.TELEGRAM_ENABLED === "true") { - if (!process.env.TELEGRAM_BOT_TOKEN || !process.env.TELEGRAM_CHANNEL_ID) { - throw new Error( - "Telegram export is enabled but missing required configuration. Please ensure TELEGRAM_BOT_TOKEN and TELEGRAM_CHANNEL_ID are set in your environment variables.", - ); - } - } +const configService = ConfigService.getInstance(); - // Validate RSS config if enabled - if (process.env.RSS_ENABLED === "true") { - // RSS has reasonable defaults, so no validation needed - } +export function getConfig(): AppConfig { + return configService.getConfig(); } -export default config; +export default configService; diff --git a/backend/src/external/gpt-transform.ts b/backend/src/external/gpt-transform.ts new file mode 100644 index 0000000..ed25e60 --- /dev/null +++ b/backend/src/external/gpt-transform.ts @@ -0,0 +1,76 @@ +import { TransformerPlugin } from '../types/plugin'; + +interface Message { + role: 'system' | 'user' | 'assistant'; + content: string; +} + +interface OpenRouterResponse { + choices: { + message: { + content: string; + }; + }[]; +} + +export default class GPTTransformer implements TransformerPlugin { + name = 'gpt-transform'; + private prompt: string = ''; + private apiKey: string = ''; + + async initialize(config: Record): Promise { + if (!config.prompt) { + throw new Error('GPT transformer requires a prompt configuration'); + } + if (!config.apiKey) { + throw new Error('GPT transformer requires an OpenRouter API key'); + } + this.prompt = config.prompt; + this.apiKey = config.apiKey; + } + + async transform(content: string): Promise { + try { + const messages: Message[] = [ + { role: 'system', content: this.prompt }, + { role: 'user', content } + ]; + + const response = await fetch('https://openrouter.ai/api/v1/chat/completions', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Authorization': `Bearer ${this.apiKey}`, + 'HTTP-Referer': 'https://curate.fun', + 'X-Title': 'CurateDotFun' + }, + body: JSON.stringify({ + model: 'openai/gpt-3.5-turbo', // Default to GPT-3.5-turbo for cost efficiency + messages, + temperature: 0.7, + max_tokens: 1000 + }) + }); + + if (!response.ok) { + const error = await response.text(); + throw new Error(`OpenRouter API error: ${error}`); + } + + const result = await response.json() as OpenRouterResponse; + + if (!result.choices?.[0]?.message?.content) { + throw new Error('Invalid response from OpenRouter API'); + } + + return result.choices[0].message.content; + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : 'Unknown error occurred'; + throw new Error(`GPT transformation failed: ${errorMessage}`); + } + } + + async shutdown(): Promise { + // Cleanup any resources if needed + } +} diff --git a/backend/src/services/exports/external/rss.ts b/backend/src/external/rss.ts similarity index 51% rename from backend/src/services/exports/external/rss.ts rename to backend/src/external/rss.ts index 6dd0b4c..1fe8772 100644 --- a/backend/src/services/exports/external/rss.ts +++ b/backend/src/external/rss.ts @@ -1,12 +1,13 @@ -import { ExportService, RssConfig } from "../types"; -import { TwitterSubmission } from "../../../types"; import { writeFile, readFile, mkdir } from "fs/promises"; import { existsSync } from "fs"; import path from "path"; +import { DistributorPlugin } from "types/plugin"; -export class RssExportService implements ExportService { +export class RssPlugin implements DistributorPlugin { name = "rss"; - private config: RssConfig; + private title: string | null = null; + private path: string | null = null; + private maxItems: number = 100; private items: Array<{ title: string; description: string; @@ -15,18 +16,21 @@ export class RssExportService implements ExportService { guid: string; }> = []; - constructor(config: RssConfig) { - if (!config.enabled) { - throw new Error("RSS export service is not enabled"); + async initialize(config: Record): Promise { + if (!config.title || !config.path) { + throw new Error("RSS plugin requires title and path"); + } + + this.title = config.title; + this.path = config.path; + if (config.maxItems) { + this.maxItems = parseInt(config.maxItems); } - this.config = config; - } - async initialize(): Promise { try { // Load existing RSS items if file exists - if (existsSync(this.config.feedPath)) { - const content = await readFile(this.config.feedPath, "utf-8"); + if (existsSync(this.path)) { + const content = await readFile(this.path, "utf-8"); const match = content.match(/[\s\S]*?<\/item>/g); if (match) { this.items = match.map((item) => { @@ -40,49 +44,39 @@ export class RssExportService implements ExportService { }); } } - console.info("RSS export service initialized"); + console.info("RSS plugin initialized"); } catch (error) { - console.error("Failed to initialize RSS export service:", error); + console.error("Failed to initialize RSS plugin:", error); throw error; } } - async handleApprovedSubmission(submission: TwitterSubmission): Promise { - try { - const item = { - title: this.formatTitle(submission), - description: submission.content, - link: `https://twitter.com/user/status/${submission.tweetId}`, - pubDate: new Date(submission.createdAt).toUTCString(), - guid: submission.tweetId, - }; + async distribute(content: string): Promise { + if (!this.title || !this.path) { + throw new Error("RSS plugin not initialized"); + } - this.items.unshift(item); - if (this.config.maxItems) { - this.items = this.items.slice(0, this.config.maxItems); - } + const item = { + title: "New Update", + description: content, + link: "https://twitter.com/", // TODO: Update with actual link + pubDate: new Date().toUTCString(), + guid: Date.now().toString(), + }; - await this.updateFeed(); - console.info(`Exported submission ${submission.tweetId} to RSS`); - } catch (error) { - console.error("Failed to export submission to RSS:", error); - throw error; - } - } + this.items.unshift(item); + this.items = this.items.slice(0, this.maxItems); - private formatTitle(submission: TwitterSubmission): string { - const categories = submission.categories?.length - ? ` [${submission.categories.join(", ")}]` - : ""; - return `New Public Good by @${submission.username}${categories}`; + await this.updateFeed(); } private async updateFeed(): Promise { + if (!this.title || !this.path) return; + const feed = ` - ${this.config.title} - ${this.config.description} + ${this.title} https://twitter.com/ ${new Date().toUTCString()} ${this.items @@ -101,9 +95,9 @@ export class RssExportService implements ExportService { `; // Ensure directory exists - const dir = path.dirname(this.config.feedPath); + const dir = path.dirname(this.path); await mkdir(dir, { recursive: true }); - await writeFile(this.config.feedPath, feed, "utf-8"); + await writeFile(this.path, feed, "utf-8"); } private escapeXml(unsafe: string): string { diff --git a/backend/src/external/telegram.ts b/backend/src/external/telegram.ts new file mode 100644 index 0000000..eb9ebab --- /dev/null +++ b/backend/src/external/telegram.ts @@ -0,0 +1,67 @@ +import { DistributorPlugin } from "../types/plugin"; + +export class TelegramPlugin implements DistributorPlugin { + name = "telegram"; + private botToken: string | null = null; + private channelId: string | null = null; + + async initialize(config: Record): Promise { + // Validate required config + if (!config.botToken || !config.channelId) { + throw new Error("Telegram plugin requires botToken and channelId"); + } + + this.botToken = config.botToken; + this.channelId = config.channelId; + + try { + // Validate credentials + const response = await fetch( + `https://api.telegram.org/bot${this.botToken}/getChat?chat_id=${this.channelId}`, + ); + if (!response.ok) { + throw new Error("Failed to validate Telegram credentials"); + } + console.info("Telegram plugin initialized"); + } catch (error) { + console.error("Failed to initialize Telegram plugin:", error); + throw error; + } + } + + async distribute(content: string): Promise { + if (!this.botToken || !this.channelId) { + throw new Error("Telegram plugin not initialized"); + } + + const message = this.formatMessage(content); + await this.sendMessage(message); + } + + private formatMessage(content: string): string { + // TODO + return content; + } + + private async sendMessage(text: string): Promise { + const response = await fetch( + `https://api.telegram.org/bot${this.botToken}/sendMessage`, + { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ + chat_id: this.channelId, + text, + parse_mode: "HTML", + }), + }, + ); + + if (!response.ok) { + const error = await response.json(); + throw new Error(`Telegram API error: ${JSON.stringify(error)}`); + } + } +} diff --git a/backend/src/index.ts b/backend/src/index.ts index 600a56b..443bc1f 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -1,10 +1,11 @@ import { ServerWebSocket } from "bun"; import dotenv from "dotenv"; import path from "path"; -import config, { validateEnv } from "./config/config"; +import { DistributionService } from "services/distribution/distribution.service"; +import configService, { validateEnv } from "./config/config"; import { db } from "./services/db"; +import { SubmissionService } from "./services/submissions/submission.service"; import { TwitterService } from "./services/twitter/client"; -import { ExportManager } from "./services/exports/manager"; import { cleanup, failSpinner, @@ -33,11 +34,12 @@ export function broadcastUpdate(data: unknown) { export async function main() { try { - // Load environment variables - startSpinner("env", "Loading environment variables..."); + // Load environment variables and config + startSpinner("env", "Loading environment variables and config..."); dotenv.config(); validateEnv(); - succeedSpinner("env", "Environment variables loaded"); + await configService.loadConfig(); + succeedSpinner("env", "Environment variables and config loaded"); // Initialize services startSpinner("server", "Starting server..."); @@ -154,23 +156,42 @@ export async function main() { succeedSpinner("server", `Server running on port ${PORT}`); - // Initialize export service - startSpinner("export-init", "Initializing export service..."); - const exportManager = new ExportManager(); - await exportManager.initialize(config.exports); - succeedSpinner("export-init", "Export service initialized"); - - // Initialize Twitter service after server is running + // Initialize Twitter service startSpinner("twitter-init", "Initializing Twitter service..."); - const twitterService = new TwitterService(config.twitter, exportManager); + const twitterService = new TwitterService({ + username: process.env.TWITTER_USERNAME!, + password: process.env.TWITTER_PASSWORD!, + email: process.env.TWITTER_EMAIL! + }); await twitterService.initialize(); succeedSpinner("twitter-init", "Twitter service initialized"); + // Initialize distribution service + startSpinner("distribution-init", "Initializing distribution service..."); + const distributionService = new DistributionService(); + const config = configService.getConfig(); + await distributionService.initialize(config.plugins); + succeedSpinner("distribution-init", "distribution service initialized"); + + // Initialize submission service + startSpinner("submission-init", "Initializing submission service..."); + const submissionService = new SubmissionService( + twitterService, + distributionService, + config + ); + await submissionService.initialize(); + succeedSpinner("submission-init", "Submission service initialized"); + // Handle graceful shutdown process.on("SIGINT", async () => { startSpinner("shutdown", "Shutting down gracefully..."); try { - await Promise.all([twitterService.stop(), exportManager.shutdown()]); + await Promise.all([ + twitterService.stop(), + submissionService.stop(), + distributionService.shutdown() + ]); succeedSpinner("shutdown", "Shutdown complete"); process.exit(0); } catch (error) { @@ -183,19 +204,19 @@ export async function main() { logger.info("🚀 Bot is running and ready for events", { twitterEnabled: true, websocketEnabled: true, - exportsEnabled: config.exports.length > 0, + distributionsEnabled: Object.keys(config.plugins).length > 0, }); // Start checking for mentions - startSpinner("twitter-mentions", "Starting mentions check..."); - await twitterService.startMentionsCheck(); - succeedSpinner("twitter-mentions", "Mentions check started"); + startSpinner("submission-monitor", "Starting submission monitoring..."); + await submissionService.startMentionsCheck(); + succeedSpinner("submission-monitor", "Submission monitoring started"); } catch (error) { // Handle any initialization errors [ "env", "twitter-init", - "export-init", + "distribution-init", "twitter-mentions", "server", ].forEach((key) => { diff --git a/backend/src/services/config/config.service.ts b/backend/src/services/config/config.service.ts new file mode 100644 index 0000000..adc4f90 --- /dev/null +++ b/backend/src/services/config/config.service.ts @@ -0,0 +1,59 @@ +import fs from 'fs/promises'; +import path from 'path'; +import { AppConfig } from '../../types/config'; +import { hydrateConfigValues } from '../../utils/config'; + +export class ConfigService { + private static instance: ConfigService; + private config: AppConfig | null = null; + private configPath: string; + + private constructor() { + // Default to local config file path + this.configPath = path.resolve(process.cwd(), '../../curate.config.json'); + } + + public static getInstance(): ConfigService { + if (!ConfigService.instance) { + ConfigService.instance = new ConfigService(); + } + return ConfigService.instance; + } + + public async loadConfig(): Promise { + try { + // This could be replaced with an API call in the future + const configFile = await fs.readFile(this.configPath, 'utf-8'); + const parsedConfig = JSON.parse(configFile) as AppConfig; + const hydratedConfig = hydrateConfigValues(parsedConfig); + this.config = hydratedConfig; + return hydratedConfig; + } catch (error: unknown) { + const message = error instanceof Error ? error.message : String(error); + throw new Error(`Failed to load config: ${message}`); + } + } + + public getConfig(): AppConfig { + if (!this.config) { + throw new Error('Config not loaded. Call loadConfig() first.'); + } + return this.config; + } + + public setConfigPath(path: string): void { + this.configPath = path; + } + + // Switch to a different config (if saving locally, wouldn't work in fly.io container) + public async updateConfig(newConfig: AppConfig): Promise { + // saving this for later + try { + await fs.writeFile(this.configPath, JSON.stringify(newConfig, null, 2)); + this.config = newConfig; + } catch (error: unknown) { + const message = error instanceof Error ? error.message : String(error); + throw new Error(`Failed to update config: ${message}`); + } + } +} diff --git a/backend/src/services/config/index.ts b/backend/src/services/config/index.ts new file mode 100644 index 0000000..456e19e --- /dev/null +++ b/backend/src/services/config/index.ts @@ -0,0 +1 @@ +export { ConfigService } from './config.service'; diff --git a/backend/src/services/db/index.ts b/backend/src/services/db/index.ts index 26ea24c..0840642 100644 --- a/backend/src/services/db/index.ts +++ b/backend/src/services/db/index.ts @@ -1,8 +1,8 @@ import { Database } from "bun:sqlite"; import { BunSQLiteDatabase, drizzle } from "drizzle-orm/bun-sqlite"; import { join } from "node:path"; +import { Moderation, TwitterSubmission } from "types/twitter"; import { broadcastUpdate } from "../../index"; -import { Moderation, TwitterSubmission } from "../../types"; import * as queries from "./queries"; export class DatabaseService { diff --git a/backend/src/services/db/queries.ts b/backend/src/services/db/queries.ts index 548cdec..9aff87c 100644 --- a/backend/src/services/db/queries.ts +++ b/backend/src/services/db/queries.ts @@ -1,7 +1,7 @@ import { and, eq, sql } from "drizzle-orm"; import { BunSQLiteDatabase } from "drizzle-orm/bun-sqlite"; import { moderationHistory, submissionCounts, submissions } from "./schema"; -import { Moderation, TwitterSubmission } from "../../types"; +import { Moderation, TwitterSubmission } from "types/twitter"; export function saveSubmission( db: BunSQLiteDatabase, diff --git a/backend/src/services/distribution/distribution.service.ts b/backend/src/services/distribution/distribution.service.ts new file mode 100644 index 0000000..f2c59b2 --- /dev/null +++ b/backend/src/services/distribution/distribution.service.ts @@ -0,0 +1,141 @@ +import { AppConfig, PluginConfig, PluginsConfig } from "../../types/config"; +import { Plugin, PluginModule } from "../../types/plugin"; +import { logger } from "../../utils/logger"; + +export class DistributionService { + private plugins: Map = new Map(); + + async initialize(config: PluginsConfig): Promise { + // Load all plugins + for (const [name, pluginConfig] of Object.entries(config)) { + try { + await this.loadPlugin(name, pluginConfig); + } catch (error) { + logger.error(`Failed to load plugin ${name}:`, error); + } + } + } + + private async loadPlugin(name: string, config: PluginConfig): Promise { + try { + // Dynamic import of plugin from URL + const module = await import(config.url) as PluginModule; + const plugin = new module.default(); + + // Store the plugin instance + this.plugins.set(name, plugin); + + logger.info(`Successfully loaded plugin: ${name}`); + } catch (error) { + logger.error(`Error loading plugin ${name}:`, error); + throw error; + } + } + + async transformContent(pluginName: string, content: string, config: { prompt: string }): Promise { + const plugin = this.plugins.get(pluginName); + if (!plugin || !('transform' in plugin)) { + throw new Error(`Transformer plugin ${pluginName} not found or invalid`); + } + + try { + await plugin.initialize(config); + return await plugin.transform(content); + } catch (error) { + logger.error(`Error transforming content with plugin ${pluginName}:`, error); + throw error; + } + } + + async distributeContent(pluginName: string, content: string, config: Record): Promise { + const plugin = this.plugins.get(pluginName); + if (!plugin || !('distribute' in plugin)) { + throw new Error(`Distributor plugin ${pluginName} not found or invalid`); + } + + try { + await plugin.initialize(config); + await plugin.distribute(content); + } catch (error) { + logger.error(`Error distributing content with plugin ${pluginName}:`, error); + throw error; + } + } + + async processStreamOutput(feedId: string, content: string): Promise { + const config = await this.getConfig(); + const feed = config.feeds.find(f => f.id === feedId); + if (!feed?.outputs.stream?.enabled) { + return; + } + + const { transform, distribute } = feed.outputs.stream; + + // Transform content if configured + let processedContent = content; + if (transform) { + processedContent = await this.transformContent( + transform.plugin, + content, + transform.config + ); + } + + // Distribute to all configured outputs + for (const dist of distribute) { + await this.distributeContent( + dist.plugin, + processedContent, + dist.config + ); + } + } + + async processRecapOutput(feedId: string, content: string): Promise { + const config = await this.getConfig(); + const feed = config.feeds.find(f => f.id === feedId); + if (!feed?.outputs.recap?.enabled) { + return; + } + + const { transform, distribute } = feed.outputs.recap; + + // Transform content if configured + let processedContent = content; + if (transform) { + processedContent = await this.transformContent( + transform.plugin, + content, + transform.config + ); + } + + // Distribute to all configured outputs + for (const dist of distribute) { + await this.distributeContent( + dist.plugin, + processedContent, + dist.config + ); + } + } + + private async getConfig(): Promise { + const { ConfigService } = await import('../config'); + return ConfigService.getInstance().getConfig(); + } + + async shutdown(): Promise { + // Shutdown all plugins + for (const [name, plugin] of this.plugins.entries()) { + try { + if (plugin.shutdown) { + await plugin.shutdown(); + } + } catch (error) { + logger.error(`Error shutting down plugin ${name}:`, error); + } + } + this.plugins.clear(); + } +} diff --git a/backend/src/services/exports/external/telegram.ts b/backend/src/services/exports/external/telegram.ts deleted file mode 100644 index bc16481..0000000 --- a/backend/src/services/exports/external/telegram.ts +++ /dev/null @@ -1,75 +0,0 @@ -import { ExportService, TelegramConfig } from "../types"; -import { TwitterSubmission } from "../../../types"; - -export class TelegramExportService implements ExportService { - name = "telegram"; - private botToken: string; - private channelId: string; - - constructor(config: TelegramConfig) { - if (!config.enabled) { - throw new Error("Telegram export service is not enabled"); - } - this.botToken = config.botToken; - this.channelId = config.channelId; - } - - async initialize(): Promise { - try { - // Validate bot token and channel ID by making a test API call - const response = await fetch( - `https://api.telegram.org/bot${this.botToken}/getChat?chat_id=${this.channelId}`, - ); - if (!response.ok) { - throw new Error("Failed to validate Telegram credentials"); - } - console.info("Telegram export service initialized"); - } catch (error) { - console.error("Failed to initialize Telegram export service:", error); - throw error; - } - } - - async handleApprovedSubmission(submission: TwitterSubmission): Promise { - try { - const message = this.formatSubmission(submission); - await this.sendMessage(message); - console.info(`Exported submission ${submission.tweetId} to Telegram`); - } catch (error) { - console.error("Failed to export submission to Telegram:", error); - throw error; - } - } - - private formatSubmission(submission: TwitterSubmission): string { - const categories = submission.categories?.length - ? `\nCategories: ${submission.categories.join(", ")}` - : ""; - - return `🆕 New Curation\n\n${submission.content}${categories}\n\nBy @${ - submission.username - }\nSource: https://twitter.com/user/status/${submission.tweetId}`; - } - - private async sendMessage(text: string): Promise { - const response = await fetch( - `https://api.telegram.org/bot${this.botToken}/sendMessage`, - { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify({ - chat_id: this.channelId, - text, - parse_mode: "HTML", - }), - }, - ); - - if (!response.ok) { - const error = await response.json(); - throw new Error(`Telegram API error: ${JSON.stringify(error)}`); - } - } -} diff --git a/backend/src/services/exports/manager.ts b/backend/src/services/exports/manager.ts deleted file mode 100644 index 1d504a7..0000000 --- a/backend/src/services/exports/manager.ts +++ /dev/null @@ -1,64 +0,0 @@ -import { ExportService, ExportConfig } from "./types"; -import { TwitterSubmission } from "../../types"; -import { logger } from "../../utils/logger"; - -export class ExportManager { - private services: ExportService[] = []; - - async initialize(configs: ExportConfig[]): Promise { - for (const config of configs) { - if (!config.enabled) continue; - - try { - // Simple relative import - works in both dev and prod since directory structure is preserved - const module = await import(`./external/${config.module}`); - const ServiceClass = module.default || Object.values(module)[0]; - const service = new ServiceClass(config); - await service.initialize(); - this.services.push(service); - logger.info(`Initialized ${service.name} export service`); - } catch (error) { - logger.error( - `Failed to initialize ${config.type} export service:`, - error, - ); - } - } - } - - async handleApprovedSubmission(submission: TwitterSubmission): Promise { - const errors: Error[] = []; - - await Promise.all( - this.services.map(async (service) => { - try { - await service.handleApprovedSubmission(submission); - } catch (error) { - errors.push(error as Error); - logger.error(`Export error in ${service.name}:`, error); - } - }), - ); - - if (errors.length > 0) { - throw new Error( - `Export errors: ${errors.map((e) => e.message).join(", ")}`, - ); - } - } - - async shutdown(): Promise { - await Promise.all( - this.services.map(async (service) => { - try { - await service.shutdown?.(); - } catch (error) { - logger.error( - `Error shutting down ${service.name} export service:`, - error, - ); - } - }), - ); - } -} diff --git a/backend/src/services/exports/types.ts b/backend/src/services/exports/types.ts deleted file mode 100644 index 7721998..0000000 --- a/backend/src/services/exports/types.ts +++ /dev/null @@ -1,30 +0,0 @@ -import { TwitterSubmission } from "../../types"; - -export interface BaseExportConfig { - enabled: boolean; - type: string; - module: string; // Module name (e.g., 'telegram', 'rss') -} - -export interface TelegramConfig extends BaseExportConfig { - type: "telegram"; - botToken: string; - channelId: string; -} - -export interface RssConfig extends BaseExportConfig { - type: "rss"; - title: string; - description: string; - feedPath: string; - maxItems?: number; -} - -export type ExportConfig = TelegramConfig | RssConfig; - -export interface ExportService { - name: string; - initialize(): Promise; - handleApprovedSubmission(submission: TwitterSubmission): Promise; - shutdown?(): Promise; -} diff --git a/backend/src/services/submissions/submission.service.ts b/backend/src/services/submissions/submission.service.ts new file mode 100644 index 0000000..3d82480 --- /dev/null +++ b/backend/src/services/submissions/submission.service.ts @@ -0,0 +1,269 @@ +import { DistributionService } from './../distribution/distribution.service'; +import { Tweet } from "agent-twitter-client"; +import { AppConfig } from "../../types/config"; +import { TwitterService } from "../twitter/client"; +import { db } from "../db"; +import { logger } from "../../utils/logger"; +import { Moderation, TwitterSubmission } from "../../types/twitter"; +import { broadcastUpdate } from "../../index"; + +export class SubmissionService { + private checkInterval: NodeJS.Timer | null = null; + private lastCheckedTweetId: string | null = null; + private adminIdCache: Map = new Map(); + + constructor( + private readonly twitterService: TwitterService, + private readonly DistributionService: DistributionService, + private readonly config: AppConfig + ) {} + + async initialize(): Promise { + // Initialize admin cache from config + for (const feed of this.config.feeds) { + for (const handle of feed.moderation.approvers.twitter) { + try { + const userId = await this.twitterService.getUserIdByScreenName(handle); + this.adminIdCache.set(userId, handle); + logger.info(`Cached admin ID for @${handle}: ${userId}`); + } catch (error) { + logger.error(`Failed to fetch ID for admin handle @${handle}:`, error); + } + } + } + + // Load last checked tweet ID + this.lastCheckedTweetId = await this.twitterService.getLastCheckedTweetId(); + broadcastUpdate({ type: "lastTweetId", data: this.lastCheckedTweetId }); + } + + async startMentionsCheck(): Promise { + logger.info("Starting submission monitoring..."); + + // Check mentions every minute + this.checkInterval = setInterval(async () => { + try { + logger.info("Checking mentions..."); + const newTweets = await this.twitterService.fetchAllNewMentions(this.lastCheckedTweetId); + + if (newTweets.length === 0) { + logger.info("No new mentions"); + } else { + logger.info(`Found ${newTweets.length} new mentions`); + + // Process new tweets + for (const tweet of newTweets) { + if (!tweet.id) continue; + + try { + if (this.isSubmission(tweet)) { + logger.info(`Received new submission: ${tweet.id}`); + await this.handleSubmission(tweet); + } else if (this.isModeration(tweet)) { + logger.info(`Received new moderation: ${tweet.id}`); + await this.handleModeration(tweet); + } + } catch (error) { + logger.error("Error processing tweet:", error); + } + } + + // Update the last checked tweet ID + const latestTweetId = newTweets[newTweets.length - 1].id; + if (latestTweetId) { + await this.setLastCheckedTweetId(latestTweetId); + } + } + } catch (error) { + logger.error("Error checking mentions:", error); + } + }, 60000); // Check every minute + } + + async stop(): Promise { + if (this.checkInterval) { + clearInterval(this.checkInterval); + this.checkInterval = null; + } + } + + private async handleSubmission(tweet: Tweet): Promise { + const userId = tweet.userId; + if (!userId || !tweet.id) return; + + const inReplyToId = tweet.inReplyToStatusId; + if (!inReplyToId) { + logger.error(`Submission tweet ${tweet.id} is not a reply to another tweet`); + return; + } + + try { + // Get daily submission count + const dailyCount = db.getDailySubmissionCount(userId); + const maxSubmissions = this.config.global.maxSubmissionsPerUser; + + if (dailyCount >= maxSubmissions) { + await this.twitterService.replyToTweet( + tweet.id, + "You've reached your daily submission limit. Please try again tomorrow." + ); + logger.info(`User ${userId} has reached limit, replied to submission.`); + return; + } + + // Fetch original tweet + const originalTweet = await this.twitterService.getTweet(inReplyToId); + if (!originalTweet) { + logger.error(`Could not fetch original tweet ${inReplyToId}`); + return; + } + + // Create submission + const submission: TwitterSubmission = { + tweetId: originalTweet.id!, + userId: originalTweet.userId!, + username: originalTweet.username!, + content: originalTweet.text || "", + categories: tweet.hashtags || [], + description: this.extractDescription(tweet), + status: this.config.global.defaultStatus as "pending" | "approved" | "rejected", + moderationHistory: [], + createdAt: originalTweet.timeParsed?.toISOString() || new Date().toISOString(), + submittedAt: new Date().toISOString(), + }; + + // Save submission + db.saveSubmission(submission); + db.incrementDailySubmissionCount(userId); + + // Send acknowledgment + const acknowledgmentTweetId = await this.twitterService.replyToTweet( + tweet.id, + "Successfully submitted to publicgoods.news!" + ); + + if (acknowledgmentTweetId) { + db.updateSubmissionAcknowledgment(originalTweet.id!, acknowledgmentTweetId); + logger.info(`Successfully submitted. Sent reply: ${acknowledgmentTweetId}`); + } + } catch (error) { + logger.error(`Error handling submission for tweet ${tweet.id}:`, error); + } + } + + private async handleModeration(tweet: Tweet): Promise { + const userId = tweet.userId; + if (!userId || !tweet.id) return; + + if (!this.isAdmin(userId)) { + logger.info(`User ${userId} is not admin.`); + return; + } + + const inReplyToId = tweet.inReplyToStatusId; + if (!inReplyToId) return; + + const submission = db.getSubmissionByAcknowledgmentTweetId(inReplyToId); + if (!submission || submission.status !== "pending") return; + + const action = this.getModerationAction(tweet); + if (!action) return; + + const adminUsername = this.adminIdCache.get(userId); + if (!adminUsername) { + logger.error(`Could not find username for admin ID ${userId}`); + return; + } + + // Create moderation record + const moderation: Moderation = { + adminId: adminUsername, + action, + timestamp: tweet.timeParsed || new Date(), + tweetId: submission.tweetId, + categories: tweet.hashtags, + note: this.extractNote(tweet), + }; + + db.saveModerationAction(moderation); + + // Process based on action + if (action === "approve") { + await this.processApproval(tweet, submission); + } else { + await this.processRejection(tweet, submission); + } + } + + private async processApproval(tweet: Tweet, submission: TwitterSubmission): Promise { + const responseTweetId = await this.twitterService.replyToTweet( + tweet.id!, + "Your submission has been approved and will be added to the public goods news feed!" + ); + + if (responseTweetId) { + db.updateSubmissionStatus(submission.tweetId, "approved", responseTweetId); + + // Process through export manager + try { + const feed = this.config.feeds.find(f => + f.moderation.approvers.twitter.includes(this.adminIdCache.get(tweet.userId!) || '') + ); + + if (feed) { + await this.DistributionService.processStreamOutput(feed.id, submission.content); + } + } catch (error) { + logger.error("Failed to process approved submission:", error); + } + } + } + + private async processRejection(tweet: Tweet, submission: TwitterSubmission): Promise { + const responseTweetId = await this.twitterService.replyToTweet( + tweet.id!, + "Your submission has been reviewed and was not accepted for the public goods news feed." + ); + + if (responseTweetId) { + db.updateSubmissionStatus(submission.tweetId, "rejected", responseTweetId); + } + } + + private isAdmin(userId: string): boolean { + return this.adminIdCache.has(userId); + } + + private getModerationAction(tweet: Tweet): "approve" | "reject" | null { + const hashtags = tweet.hashtags?.map(tag => tag.toLowerCase()) || []; + if (hashtags.includes("approve")) return "approve"; + if (hashtags.includes("reject")) return "reject"; + return null; + } + + private isModeration(tweet: Tweet): boolean { + return this.getModerationAction(tweet) !== null; + } + + private isSubmission(tweet: Tweet): boolean { + return tweet.text?.toLowerCase().includes("!submit") || false; + } + + private extractDescription(tweet: Tweet): string | undefined { + return tweet.text + ?.replace(/!submit\s+@\w+/i, "") + .replace(/#\w+/g, "") + .trim() || undefined; + } + + private extractNote(tweet: Tweet): string | undefined { + return tweet.text + ?.replace(/#\w+/g, "") + .trim() || undefined; + } + + private async setLastCheckedTweetId(tweetId: string) { + this.lastCheckedTweetId = tweetId; + await this.twitterService.setLastCheckedTweetId(tweetId); + } +} diff --git a/backend/src/services/transformers/transformation.service.ts b/backend/src/services/transformers/transformation.service.ts new file mode 100644 index 0000000..ef7ee52 --- /dev/null +++ b/backend/src/services/transformers/transformation.service.ts @@ -0,0 +1,141 @@ +import { PluginModule, Plugin } from "types/plugin"; +import { AppConfig, PluginConfig, PluginsConfig } from "../../types/config"; +import { logger } from "../../utils/logger"; + +export class DistributionService { + private plugins: Map = new Map(); + + async initialize(config: PluginsConfig): Promise { + // Load all plugins + for (const [name, pluginConfig] of Object.entries(config)) { + try { + await this.loadPlugin(name, pluginConfig); + } catch (error) { + logger.error(`Failed to load plugin ${name}:`, error); + } + } + } + + private async loadPlugin(name: string, config: PluginConfig): Promise { + try { + // Dynamic import of plugin from URL + const module = await import(config.url) as PluginModule; + const plugin = new module.default(); + + // Store the plugin instance + this.plugins.set(name, plugin); + + logger.info(`Successfully loaded plugin: ${name}`); + } catch (error) { + logger.error(`Error loading plugin ${name}:`, error); + throw error; + } + } + + async transformContent(pluginName: string, content: string, config: { prompt: string }): Promise { + const plugin = this.plugins.get(pluginName); + if (!plugin || !('transform' in plugin)) { + throw new Error(`Transformer plugin ${pluginName} not found or invalid`); + } + + try { + await plugin.initialize(config); + return await plugin.transform(content); + } catch (error) { + logger.error(`Error transforming content with plugin ${pluginName}:`, error); + throw error; + } + } + + async distributeContent(pluginName: string, content: string, config: Record): Promise { + const plugin = this.plugins.get(pluginName); + if (!plugin || !('distribute' in plugin)) { + throw new Error(`Distributor plugin ${pluginName} not found or invalid`); + } + + try { + await plugin.initialize(config); + await plugin.distribute(content); + } catch (error) { + logger.error(`Error distributing content with plugin ${pluginName}:`, error); + throw error; + } + } + + async processStreamOutput(feedId: string, content: string): Promise { + const config = await this.getConfig(); + const feed = config.feeds.find(f => f.id === feedId); + if (!feed?.outputs.stream?.enabled) { + return; + } + + const { transform, distribute } = feed.outputs.stream; + + // Transform content if configured + let processedContent = content; + if (transform) { + processedContent = await this.transformContent( + transform.plugin, + content, + transform.config + ); + } + + // Distribute to all configured outputs + for (const dist of distribute) { + await this.distributeContent( + dist.plugin, + processedContent, + dist.config + ); + } + } + + async processRecapOutput(feedId: string, content: string): Promise { + const config = await this.getConfig(); + const feed = config.feeds.find(f => f.id === feedId); + if (!feed?.outputs.recap?.enabled) { + return; + } + + const { transform, distribute } = feed.outputs.recap; + + // Transform content if configured + let processedContent = content; + if (transform) { + processedContent = await this.transformContent( + transform.plugin, + content, + transform.config + ); + } + + // Distribute to all configured outputs + for (const dist of distribute) { + await this.distributeContent( + dist.plugin, + processedContent, + dist.config + ); + } + } + + private async getConfig(): Promise { + const { ConfigService } = await import('../config'); + return ConfigService.getInstance().getConfig(); + } + + async shutdown(): Promise { + // Shutdown all plugins + for (const [name, plugin] of this.plugins.entries()) { + try { + if (plugin.shutdown) { + await plugin.shutdown(); + } + } catch (error) { + logger.error(`Error shutting down plugin ${name}:`, error); + } + } + this.plugins.clear(); + } +} diff --git a/backend/src/services/twitter/client.ts b/backend/src/services/twitter/client.ts index d0bb054..2016487 100644 --- a/backend/src/services/twitter/client.ts +++ b/backend/src/services/twitter/client.ts @@ -1,12 +1,5 @@ import { Scraper, SearchMode, Tweet } from "agent-twitter-client"; -import { ADMIN_ACCOUNTS } from "config/admins"; -import { broadcastUpdate } from "index"; -import { - Moderation, - TwitterConfig, - TwitterSubmission, -} from "../../types/twitter"; -import { ExportManager } from "../exports/manager"; +import { logger } from "../../utils/logger"; import { TwitterCookie, cacheCookies, @@ -15,27 +8,21 @@ import { getLastCheckedTweetId, saveLastCheckedTweetId, } from "../../utils/cache"; -import { logger } from "../../utils/logger"; -import { db } from "../db"; export class TwitterService { private client: Scraper; - private readonly DAILY_SUBMISSION_LIMIT = 10; - private twitterUsername: string; - private config: TwitterConfig; - private isInitialized = false; - private checkInterval: NodeJS.Timer | null = null; private lastCheckedTweetId: string | null = null; - private configuredTweetId: string | null = null; - private adminIdCache: Map = new Map(); + private twitterUsername: string; constructor( - config: TwitterConfig, - private readonly exportManager?: ExportManager, + private readonly config: { + username: string; + password: string; + email: string; + } ) { this.client = new Scraper(); this.twitterUsername = config.username; - this.config = config; } private async setCookiesFromArray(cookiesArray: TwitterCookie[]) { @@ -50,22 +37,6 @@ export class TwitterService { await this.client.setCookies(cookieStrings); } - private async initializeAdminIds() { - for (const handle of ADMIN_ACCOUNTS) { - try { - const userId = await this.client.getUserIdByScreenName(handle); - this.adminIdCache.set(userId, handle); - logger.info(`Cached admin ID for @${handle}: ${userId}`); - } catch (error) { - logger.error(`Failed to fetch ID for admin handle @${handle}:`, error); - } - } - } - - private isAdmin(userId: string): boolean { - return this.adminIdCache.has(userId); - } - async initialize() { try { // Ensure cache directory exists @@ -77,13 +48,8 @@ export class TwitterService { await this.setCookiesFromArray(cachedCookies); } - // Load last checked tweet ID from cache if no configured ID exists - if (!this.configuredTweetId) { - this.lastCheckedTweetId = await getLastCheckedTweetId(); - broadcastUpdate({ type: "lastTweetId", data: this.lastCheckedTweetId }); - } else { - this.lastCheckedTweetId = this.configuredTweetId; - } + // Load last checked tweet ID from cache + this.lastCheckedTweetId = await getLastCheckedTweetId(); // Try to login with retries logger.info("Attempting Twitter login..."); @@ -109,10 +75,6 @@ export class TwitterService { await new Promise((resolve) => setTimeout(resolve, 2000)); } - // Initialize admin IDs after successful login (convert from handle to account id) - await this.initializeAdminIds(); - - this.isInitialized = true; logger.info("Successfully logged in to Twitter"); } catch (error) { logger.error("Failed to initialize Twitter client:", error); @@ -120,7 +82,29 @@ export class TwitterService { } } - private async fetchAllNewMentions(): Promise { + async getUserIdByScreenName(screenName: string): Promise { + return await this.client.getUserIdByScreenName(screenName); + } + + async getTweet(tweetId: string): Promise { + return await this.client.getTweet(tweetId); + } + + async replyToTweet(tweetId: string, message: string): Promise { + try { + const response = await this.client.sendTweet(message, tweetId); + const responseData = (await response.json()) as any; + // Extract tweet ID from response + const replyTweetId = + responseData?.data?.create_tweet?.tweet_results?.result?.rest_id; + return replyTweetId || null; + } catch (error) { + logger.error("Error replying to tweet:", error); + return null; + } + } + + async fetchAllNewMentions(lastCheckedId: string | null): Promise { const BATCH_SIZE = 20; let allNewTweets: Tweet[] = []; let foundOldTweet = false; @@ -140,14 +124,12 @@ export class TwitterService { ) ).tweets; - if (batch.length === 0) break; // No more tweets to fetch + if (batch.length === 0) break; - // Check if any tweet in this batch is older than or equal to our last checked ID for (const tweet of batch) { if (!tweet.id) continue; - const referenceId = this.configuredTweetId || this.lastCheckedTweetId; - if (!referenceId || BigInt(tweet.id) > BigInt(referenceId)) { + if (!lastCheckedId || BigInt(tweet.id) > BigInt(lastCheckedId)) { allNewTweets.push(tweet); } else { foundOldTweet = true; @@ -155,7 +137,7 @@ export class TwitterService { } } - if (batch.length < BATCH_SIZE) break; // Last batch was partial, no more to fetch + if (batch.length < BATCH_SIZE) break; attempts++; } catch (error) { logger.error("Error fetching mentions batch:", error); @@ -163,7 +145,6 @@ export class TwitterService { } } - // Sort all fetched tweets by ID (chronologically) return allNewTweets.sort((a, b) => { const aId = BigInt(a.id || "0"); const bId = BigInt(b.id || "0"); @@ -171,328 +152,17 @@ export class TwitterService { }); } - async startMentionsCheck() { - logger.info("Listening for mentions..."); - - // Check mentions every minute - this.checkInterval = setInterval(async () => { - if (!this.isInitialized) return; - - try { - logger.info("Checking mentions..."); - - const newTweets = await this.fetchAllNewMentions(); - - if (newTweets.length === 0) { - logger.info("No new mentions"); - } else { - logger.info(`Found ${newTweets.length} new mentions`); - - // Process new tweets - for (const tweet of newTweets) { - if (!tweet.id) continue; - - try { - if (this.isSubmission(tweet)) { - logger.info( - `Received new submission: ${this.getTweetLink(tweet.id, tweet.username)}`, - ); - await this.handleSubmission(tweet); - } else if (this.isModeration(tweet)) { - logger.info( - `Received new moderation: ${this.getTweetLink(tweet.id, tweet.username)}`, - ); - await this.handleModeration(tweet); - } - } catch (error) { - logger.error("Error processing tweet:", error); - } - } - - // Update the last checked tweet ID to the most recent one - const latestTweetId = newTweets[newTweets.length - 1].id; - if (latestTweetId) { - await this.setLastCheckedTweetId(latestTweetId); - } - } - } catch (error) { - logger.error("Error checking mentions:", error); - } - }, 60000); // Check every minute - } - - async stop() { - if (this.checkInterval) { - clearInterval(this.checkInterval); - this.checkInterval = null; - } - await this.client.logout(); - this.isInitialized = false; - } - - private async handleSubmission(tweet: Tweet): Promise { - const userId = tweet.userId; - if (!userId || !tweet.id) return; - - // Get the tweet being replied to - const inReplyToId = tweet.inReplyToStatusId; - if (!inReplyToId) { - logger.error( - `Submission tweet ${tweet.id} is not a reply to another tweet`, - ); - return; - } - - try { - // Fetch the original tweet that's being submitted - const originalTweet = await this.client.getTweet(inReplyToId); - if (!originalTweet) { - logger.error(`Could not fetch original tweet ${inReplyToId}`); - return; - } - - // Get submission count from database - const dailyCount = db.getDailySubmissionCount(userId); - - if (dailyCount >= this.DAILY_SUBMISSION_LIMIT) { - await this.replyToTweet( - tweet.id, - "You've reached your daily submission limit. Please try again tomorrow.", - ); - logger.info(`User ${userId} has reached limit, replied to submission.`); - return; - } - - // Extract curator handle from submission tweet - const submissionMatch = tweet.text?.match(/!submit\s+@(\w+)/i); - if (!submissionMatch) { - logger.error(`Invalid submission format in tweet ${tweet.id}`); - return; - } - - // Extract categories from hashtags in submission tweet (excluding command hashtags) - const categories = (tweet.hashtags || []).filter( - (tag) => !["submit", "approve", "reject"].includes(tag.toLowerCase()), - ); - - // Extract description: everything after !submit @handle that's not a hashtag - const description = - tweet.text - ?.replace(/!submit\s+@\w+/i, "") // Remove command - .replace(/#\w+/g, "") // Remove hashtags - .trim() || undefined; - - // Create submission using the original tweet's content and submission metadata - const submission: TwitterSubmission = { - tweetId: originalTweet.id!, // The tweet being submitted - userId: originalTweet.userId!, - username: originalTweet.username!, - content: originalTweet.text || "", - categories: categories, - description: description || undefined, - status: "pending", - moderationHistory: [], - createdAt: - originalTweet.timeParsed?.toISOString() || new Date().toISOString(), - submittedAt: new Date().toISOString(), - }; - - // Save submission to database - db.saveSubmission(submission); - // Increment submission count in database - db.incrementDailySubmissionCount(userId); - - // Send acknowledgment and save its ID - const acknowledgmentTweetId = await this.replyToTweet( - tweet.id, // Reply to the submission tweet - "Successfully submitted to publicgoods.news!", - ); - - if (acknowledgmentTweetId) { - db.updateSubmissionAcknowledgment( - originalTweet.id!, - acknowledgmentTweetId, - ); - logger.info( - `Successfully submitted. Sent reply: ${this.getTweetLink(acknowledgmentTweetId)}`, - ); - } else { - logger.error( - `Failed to acknowledge submission: ${this.getTweetLink(tweet.id, tweet.username)}`, - ); - } - } catch (error) { - logger.error(`Error handling submission for tweet ${tweet.id}:`, error); - } - } - - private async handleModeration(tweet: Tweet): Promise { - const userId = tweet.userId; - if (!userId || !tweet.id) return; - - // Verify admin status using cached ID - if (!this.isAdmin(userId)) { - logger.info(`User ${userId} is not admin.`); - return; // Silently ignore non-admin moderation attempts - } - - // Get the tweet this is in response to (should be our acknowledgment tweet) - const inReplyToId = tweet.inReplyToStatusId; - if (!inReplyToId) return; - - // Get submission by acknowledgment tweet ID - const submission = db.getSubmissionByAcknowledgmentTweetId(inReplyToId); - if (!submission) return; - - // Check if submission has already been moderated by any admin - if (submission.status !== "pending") { - logger.info( - `Submission ${submission.tweetId} has already been moderated, ignoring new moderation attempt.`, - ); - return; - } - - const action = this.getModerationAction(tweet); - if (!action) return; - - // Add moderation to database - const adminUsername = this.adminIdCache.get(userId); - if (!adminUsername) { - logger.error(`Could not find username for admin ID ${userId}`); - return; - } - - // Extract categories from hashtags in moderation tweet (excluding command hashtags) - const categories = (tweet.hashtags || []).filter( - (tag) => !["submit", "approve", "reject"].includes(tag.toLowerCase()), - ); - - // Extract note: everything in the tweet that's not a hashtag - const note = - tweet.text - ?.replace(/#\w+/g, "") // Remove hashtags - .trim() || undefined; - - const moderation: Moderation = { - adminId: adminUsername, - action: action, - timestamp: tweet.timeParsed || new Date(), - tweetId: submission.tweetId, // Use the original submission's tweetId - categories: categories.length > 0 ? categories : undefined, - note: note, - }; - db.saveModerationAction(moderation); - - // Process the moderation action - if (action === "approve") { - logger.info( - `Received review from Admin ${this.adminIdCache.get(userId)}, processing approval.`, - ); - await this.processApproval(tweet, submission); - } else { - logger.info( - `Received review from Admin ${this.adminIdCache.get(userId)}, processing rejection.`, - ); - await this.processRejection(tweet, submission); - } - } - - private async processApproval( - tweet: Tweet, - submission: TwitterSubmission, - ): Promise { - // TODO: Add NEAR integration here for approved submissions - const responseTweetId = await this.replyToTweet( - tweet.id!, - "Your submission has been approved and will be added to the public goods news feed!", - ); - if (responseTweetId) { - db.updateSubmissionStatus( - submission.tweetId, - "approved", - responseTweetId, - ); - - // Handle exports for approved submission - if (this.exportManager) { - try { - await this.exportManager.handleApprovedSubmission(submission); - } catch (error) { - logger.error( - "Failed to handle exports for approved submission:", - error, - ); - } - } - } - } - - private async processRejection( - tweet: Tweet, - submission: TwitterSubmission, - ): Promise { - // TODO: Add NEAR integration here for rejected submissions - const responseTweetId = await this.replyToTweet( - tweet.id!, - "Your submission has been reviewed and was not accepted for the public goods news feed.", - ); - if (responseTweetId) { - db.updateSubmissionStatus( - submission.tweetId, - "rejected", - responseTweetId, - ); - } - } - - private getModerationAction(tweet: Tweet): "approve" | "reject" | null { - const hashtags = tweet.hashtags?.map((tag) => tag.toLowerCase()) || []; - if (hashtags.includes("approve")) return "approve"; - if (hashtags.includes("reject")) return "reject"; - return null; - } - - private isModeration(tweet: Tweet): boolean { - return this.getModerationAction(tweet) !== null; - } - - private isSubmission(tweet: Tweet): boolean { - return tweet.text?.toLowerCase().includes("!submit") || false; - } - - private async replyToTweet( - tweetId: string, - message: string, - ): Promise { - try { - const response = await this.client.sendTweet(message, tweetId); - const responseData = (await response.json()) as any; - // Extract tweet ID from response - const replyTweetId = - responseData?.data?.create_tweet?.tweet_results?.result?.rest_id; - return replyTweetId || null; - } catch (error) { - logger.error("Error replying to tweet:", error); - return null; - } - } - async setLastCheckedTweetId(tweetId: string) { - this.configuredTweetId = tweetId; this.lastCheckedTweetId = tweetId; await saveLastCheckedTweetId(tweetId); - logger.info(`Last checked tweet ID configured to: ${tweetId}`); - broadcastUpdate({ type: "lastTweetId", data: tweetId }); + logger.info(`Last checked tweet ID updated to: ${tweetId}`); } getLastCheckedTweetId(): string | null { return this.lastCheckedTweetId; } - private getTweetLink( - tweetId: string, - username: string = this.twitterUsername, - ): string { - return `https://x.com/${username}/status/${tweetId}`; + async stop() { + await this.client.logout(); } } diff --git a/backend/src/types/config.ts b/backend/src/types/config.ts new file mode 100644 index 0000000..86fd920 --- /dev/null +++ b/backend/src/types/config.ts @@ -0,0 +1,58 @@ +export interface GlobalConfig { + defaultStatus: string; + maxSubmissionsPerUser: number; +} + +export interface PluginConfig { + type: 'distributor' | 'transformer'; + url: string; +} + +export interface ModerationConfig { + approvers: { + twitter: string[]; + }; + templates: { + approve: string; + reject: string; + acknowledge: string; + }; +} + +export interface TransformConfig { + plugin: string; + config: { + prompt: string; + }; +} + +export interface DistributorConfig { + plugin: string; + config: Record; +} + +export interface OutputConfig { + enabled: boolean; + schedule?: string; + transform?: TransformConfig; + distribute: DistributorConfig[]; +} + +export type PluginsConfig = Record; + +export interface FeedConfig { + id: string; + name: string; + description: string; + moderation: ModerationConfig; + outputs: { + stream?: OutputConfig; + recap?: OutputConfig; + }; +} + +export interface AppConfig { + global: GlobalConfig; + plugins: PluginsConfig; + feeds: FeedConfig[]; +} diff --git a/backend/src/types/index.ts b/backend/src/types/index.ts deleted file mode 100644 index bf18ab9..0000000 --- a/backend/src/types/index.ts +++ /dev/null @@ -1,8 +0,0 @@ -export * from "./twitter"; -export * from "../services/exports/types"; - -export interface AppConfig { - twitter: import("./twitter").TwitterConfig; - environment: "development" | "production" | "test"; - exports: import("../services/exports/types").ExportConfig[]; -} diff --git a/backend/src/types/plugin.ts b/backend/src/types/plugin.ts new file mode 100644 index 0000000..9538134 --- /dev/null +++ b/backend/src/types/plugin.ts @@ -0,0 +1,19 @@ +export interface DistributorPlugin { + name: string; + initialize(config: Record): Promise; + distribute(content: string): Promise; + shutdown?(): Promise; +} + +export interface TransformerPlugin { + name: string; + initialize(config: Record): Promise; + transform(content: string): Promise; + shutdown?(): Promise; +} + +export type Plugin = DistributorPlugin | TransformerPlugin; + +export interface PluginModule { + default: new () => Plugin; +} diff --git a/backend/src/utils/config.ts b/backend/src/utils/config.ts new file mode 100644 index 0000000..d2a1f3d --- /dev/null +++ b/backend/src/utils/config.ts @@ -0,0 +1,37 @@ +/** + * Recursively processes a config object, replacing environment variable placeholders + * with their actual values. + * + * Format: "{ENV_VAR_NAME}" will be replaced with process.env.ENV_VAR_NAME + */ +export function hydrateConfigValues>(config: T): T { + const processValue = (value: any): any => { + if (typeof value === 'string') { + // Match strings like "{SOME_ENV_VAR}" + const match = value.match(/^\{([A-Z_][A-Z0-9_]*)\}$/); + if (match) { + const envVar = match[1]; + const envValue = process.env[envVar]; + if (!envValue) { + throw new Error(`Required environment variable ${envVar} is not set`); + } + return envValue; + } + return value; + } + + if (Array.isArray(value)) { + return value.map(item => processValue(item)); + } + + if (value && typeof value === 'object') { + return Object.fromEntries( + Object.entries(value).map(([k, v]) => [k, processValue(v)]) + ); + } + + return value; + }; + + return processValue(config); +} diff --git a/bun.lockb b/bun.lockb index 6289be9..59d1be6 100755 Binary files a/bun.lockb and b/bun.lockb differ diff --git a/curate.config.json b/curate.config.json new file mode 100644 index 0000000..3d8a3e8 --- /dev/null +++ b/curate.config.json @@ -0,0 +1,205 @@ +{ + "global": { + "defaultStatus": "pending", + "maxSubmissionsPerUser": 5 + }, + "plugins": { + "@curatedotfun/telegram": { + "type": "distributor", + "url": "./external/telegram" + }, + "@curatedotfun/rss": { + "type": "distributor", + "url": "./external/rss" + }, + "@curatedotfun/gpt-transform": { + "type": "transformer", + "url": "./external/gpt-transform" + } + }, + "feeds": [ + { + "id": "grants", + "name": "Crypto Grant Wire", + "description": "Blockchain grant updates", + "moderation": { + "approvers": { + "twitter": ["plugrel", "sejal_rekhan", "arlery", "karmahq_"] + }, + "templates": { + "approve": "Approved grant update: {content}", + "reject": "Rejected grant update: {reason}", + "acknowledge": "" + } + }, + "outputs": { + "stream": { + "enabled": true, + "transform": { + "plugin": "@curatedotfun/gpt-transform", + "config": { + "prompt": "Format this grant announcement..." + } + }, + "distribute": [ + { + "plugin": "@curatedotfun/telegram", + "config": { + "botToken": "{TELEGRAM_BOT_TOKEN}", + "channelId": "{TELEGRAM_CHANNEL_ID}" + } + }, + { + "plugin": "@curatedotfun/rss", + "config": { + "title": "Crypto Grant Wire", + "path": "./public/grants.xml" + } + } + ] + }, + "recap": { + "enabled": true, + "schedule": "0 0 * * *", + "transform": { + "plugin": "@curatedotfun/gpt-transform", + "config": { + "prompt": "./prompts/grants_recap.txt" + } + }, + "distribute": [ + { + "plugin": "@curatedotfun/telegram", + "config": { + "botToken": "{TELEGRAM_RECAP_BOT_TOKEN}", + "channelId": "{TELEGRAM_RECAP_CHANNEL_ID}" + } + } + ] + } + } + }, + { + "id": "ethereum", + "name": "This Week in Ethereum", + "description": "Ethereum ecosystem updates", + "moderation": { + "approvers": { + "twitter": ["owoki"] + }, + "templates": { + "approve": "Approved Ethereum update: {content}", + "reject": "Rejected Ethereum update: {reason}", + "acknowledge": "" + } + }, + "outputs": { + "stream": { + "enabled": true, + "transform": { + "plugin": "@curatedotfun/gpt-transform", + "config": { + "prompt": "Format this Ethereum update..." + } + }, + "distribute": [ + { + "plugin": "@curatedotfun/telegram", + "config": { + "botToken": "{TELEGRAM_BOT_TOKEN}", + "channelId": "{TELEGRAM_CHANNEL_ID}" + } + }, + { + "plugin": "@curatedotfun/rss", + "config": { + "title": "This Week in Ethereum", + "path": "./public/ethereum.xml" + } + } + ] + }, + "recap": { + "enabled": true, + "schedule": "0 0 * * 0", + "transform": { + "plugin": "@curatedotfun/gpt-transform", + "config": { + "prompt": "./prompts/ethereum_weekly.txt" + } + }, + "distribute": [ + { + "plugin": "@curatedotfun/telegram", + "config": { + "botToken": "{TELEGRAM_RECAP_BOT_TOKEN}", + "channelId": "{TELEGRAM_RECAP_CHANNEL_ID}" + } + } + ] + } + } + }, + { + "id": "near", + "name": "NEARWEEK", + "description": "NEAR Protocol updates", + "moderation": { + "approvers": { + "twitter": ["peter", "plugrel", "jarednotjerry1"] + }, + "templates": { + "approve": "Approved NEAR update: {content}", + "reject": "Rejected NEAR update: {reason}", + "acknowledge": "" + } + }, + "outputs": { + "stream": { + "enabled": true, + "transform": { + "plugin": "@curatedotfun/gpt-transform", + "config": { + "prompt": "Format this NEAR update..." + } + }, + "distribute": [ + { + "plugin": "@curatedotfun/telegram", + "config": { + "botToken": "{TELEGRAM_BOT_TOKEN}", + "channelId": "{TELEGRAM_CHANNEL_ID}" + } + }, + { + "plugin": "@curatedotfun/rss", + "config": { + "title": "NEARWEEK", + "path": "./public/near.xml" + } + } + ] + }, + "recap": { + "enabled": true, + "schedule": "0 0 * * 0", + "transform": { + "plugin": "@curatedotfun/gpt-transform", + "config": { + "prompt": "./prompts/near_weekly.txt" + } + }, + "distribute": [ + { + "plugin": "@curatedotfun/telegram", + "config": { + "botToken": "{TELEGRAM_RECAP_BOT_TOKEN}", + "channelId": "{TELEGRAM_RECAP_CHANNEL_ID}" + } + } + ] + } + } + } + ] +}