Skip to content

Commit

Permalink
add rabbitMq, executeSellDecision
Browse files Browse the repository at this point in the history
Signed-off-by: MarcoMandar <malicemandar@gmail.com>
  • Loading branch information
MarcoMandar committed Nov 21, 2024
1 parent e6c3456 commit e240a18
Show file tree
Hide file tree
Showing 4 changed files with 257 additions and 96 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"node": ">=22"
},
"dependencies": {
"amqplib": "^0.10.4",
"ollama-ai-provider": "^0.16.1",
"optional": "^0.1.4",
"sharp": "^0.33.5"
Expand Down
300 changes: 206 additions & 94 deletions packages/plugin-solana/src/providers/simulationSellingService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import { TokenProvider } from "./token.ts";
import { settings } from "@ai16z/eliza";
import { IAgentRuntime, Memory, Provider, State } from "@ai16z/eliza";
import { WalletProvider } from "./wallet.ts";
import * as amqp from "amqplib";

interface sellDetails {
interface SellDetails {
sell_amount: number;
sell_recommender_id: string | null;
}
Expand All @@ -26,6 +27,12 @@ export class simulationSellingService {
private MAX_DECAY_DAYS = 30;
private backend: string;
private backendToken: string;
private amqpConnection: amqp.Connection;
private amqpChannel: amqp.Channel;
private sonarBe: string;
private sonarBeToken: string;

private runningProcesses: Set<string> = new Set();

constructor(
runtime: IAgentRuntime,
Expand All @@ -45,141 +52,245 @@ export class simulationSellingService {
);
this.backend = runtime.getSetting("BACKEND_URL");
this.backendToken = runtime.getSetting("BACKEND_TOKEN");
this.initializeRabbitMQ(runtime.getSetting("AMQP_URL"));
this.sonarBe = runtime.getSetting("SONAR_BE");
this.sonarBeToken = runtime.getSetting("SONAR_BE_TOKEN");
}

public async startService() {
// starting the service
console.log("Starting SellingService...");
await this.scanAndSell();
/**
* Initializes the RabbitMQ connection and starts consuming messages.
* @param amqpUrl The RabbitMQ server URL.
*/
private async initializeRabbitMQ(amqpUrl: string) {
try {
this.amqpConnection = await amqp.connect(amqpUrl);
this.amqpChannel = await this.amqpConnection.createChannel();
console.log("Connected to RabbitMQ");
// Start consuming messages
this.consumeMessages();
} catch (error) {
console.error("Failed to connect to RabbitMQ:", error);
}
}

private async scanAndSell() {
// scanning recommendations and selling
console.log("Scanning for token performances...");
const tokenPerformances =
await this.trustScoreDb.getAllTokenPerformancesWithBalance();

const sellDecisions = this.decideWhenToSell(tokenPerformances);

// Execute sells
await this.executeSells(sellDecisions);

// Perform stop loss checks
await this.performStopLoss(tokenPerformances);
/**
* Sets up the consumer for the specified RabbitMQ queue.
*/
private async consumeMessages() {
const queue = "process_eliza_simulation";
await this.amqpChannel.assertQueue(queue, { durable: true });
this.amqpChannel.consume(
queue,
(msg) => {
if (msg !== null) {
const content = msg.content.toString();
this.processMessage(content);
this.amqpChannel.ack(msg);
}
},
{ noAck: false }
);
console.log(`Listening for messages on queue: ${queue}`);
}

private decideWhenToSell(
tokenPerformances: TokenPerformance[]
): SellDecision[] {
// To Do: logic when to sell and how much
console.log("Deciding when to sell and how much...");
const decisions: SellDecision[] = [];

tokenPerformances.forEach(async (performance) => {
const tokenProvider = new TokenProvider(
performance.tokenAddress,
this.walletProvider
);
const sellAmount = await this.amountToSell(
performance.tokenAddress,
tokenProvider
/**
* Processes incoming messages from RabbitMQ.
* @param message The message content as a string.
*/
private async processMessage(message: string) {
try {
const { tokenAddress, amount, sell_recommender_id } =
JSON.parse(message);
console.log(
`Received message for token ${tokenAddress} to sell ${amount}`
);
const amountToSell = sellAmount.sellAmount;
decisions.push({ tokenPerformance: performance, amountToSell });
});

return decisions;
}

async amountToSell(tokenAddress: string, tokenProvider: TokenProvider) {
// To Do: Implement logic to decide how much to sell
//placeholder
const processedData: ProcessedTokenData =
await tokenProvider.getProcessedTokenData();
const prices = await this.walletProvider.fetchPrices(null);
const solPrice = prices.solana.usd;
const tokenBalance = this.trustScoreDb.getTokenBalance(tokenAddress);
const decision: SellDecision = {
tokenPerformance:
await this.trustScoreDb.getTokenPerformance(tokenAddress),
amountToSell: amount,
sell_recommender_id: sell_recommender_id,
};

const sellAmount = tokenBalance * 0.1;
const sellSol = sellAmount / parseFloat(solPrice);
const sellValueUsd = sellAmount * processedData.tradeData.price;
// Execute the sell
await this.executeSellDecision(decision);

return { sellAmount, sellSol, sellValueUsd };
// Remove from running processes after completion
this.runningProcesses.delete(tokenAddress);
} catch (error) {
console.error("Error processing message:", error);
}
}

private async executeSells(decisions: SellDecision[]) {
console.log("Executing sell orders...");
for (const decision of decisions) {
/**
* Executes a single sell decision.
* @param decision The sell decision containing token performance and amount to sell.
*/
private async executeSellDecision(decision: SellDecision) {
const { tokenPerformance, amountToSell, sell_recommender_id } =
decision;
const tokenAddress = tokenPerformance.tokenAddress;

try {
console.log(
`Selling ${decision.amountToSell} of token ${decision.tokenPerformance.tokenSymbol}`
`Executing sell for token ${tokenPerformance.tokenSymbol}: ${amountToSell}`
);
// update the sell details
const sellDetails = {
sell_amount: decision.amountToSell,
sell_recommender_id: null,

// Update the sell details
const sellDetails: SellDetails = {
sell_amount: amountToSell,
sell_recommender_id: sell_recommender_id, // Adjust if necessary
};
const sellTimeStamp = new Date().toISOString();
const tokenProvider = new TokenProvider(
decision.tokenPerformance.tokenAddress,
tokenAddress,
this.walletProvider
);

// Update sell details in the database
const sellDetailsData = await this.updateSellDetails(
decision.tokenPerformance.tokenAddress,
decision.tokenPerformance.recommenderId,
tokenAddress,
tokenPerformance.recommenderId,
sellTimeStamp,
sellDetails,
true,
true, // isSimulation
tokenProvider
);

console.log("Sell order executed successfully", sellDetailsData);

// check if balance is zero and remove token from running processes
const balance = this.trustScoreDb.getTokenBalance(tokenAddress);
if (balance === 0) {
this.runningProcesses.delete(tokenAddress);
}
// stop the process in the sonar backend
await this.stopProcessInTheSonarBackend(tokenAddress);
} catch (error) {
console.error(
`Error executing sell for token ${tokenAddress}:`,
error
);
}
}

private async performStopLoss(tokenPerformances: TokenPerformance[]) {
console.log("Performing stop loss checks...");
// To Do: Implement stop loss logic
// check if the token has dropped by more than 50% in the last 24 hours
for (const performance of tokenPerformances) {
public async startService() {
// starting the service
console.log("Starting SellingService...");
await this.startListeners();
}

private async startListeners() {
// scanning recommendations and selling
console.log("Scanning for token performances...");
const tokenPerformances =
await this.trustScoreDb.getAllTokenPerformancesWithBalance();

await this.processTokenPerformances(tokenPerformances);
}

private processTokenPerformances(tokenPerformances: TokenPerformance[]) {
// To Do: logic when to sell and how much
console.log("Deciding when to sell and how much...");
const runningProcesses = this.runningProcesses;
// remove running processes from tokenPerformances
tokenPerformances = tokenPerformances.filter(
(tp) => !runningProcesses.has(tp.tokenAddress)
);

// start the process in the sonar backend
tokenPerformances.forEach(async (tokenPerformance) => {
const tokenProvider = new TokenProvider(
performance.tokenAddress,
tokenPerformance.tokenAddress,
this.walletProvider
);
const processedData: ProcessedTokenData =
await tokenProvider.getProcessedTokenData();
if (processedData.tradeData.trade_24h_change_percent < -50) {
const sellAmount = performance.balance;
const sellSol = sellAmount / 100;
const sellValueUsd = sellAmount * processedData.tradeData.price;
const sellDetails = {
sell_amount: sellAmount,
sell_recommender_id: null,
};
const sellTimeStamp = new Date().toISOString();
const sellDetailsData = await this.updateSellDetails(
performance.tokenAddress,
performance.recommenderId,
sellTimeStamp,
sellDetails,
true,
tokenProvider
const shouldTrade = await tokenProvider.shouldTradeToken();
if (shouldTrade) {
const balance = tokenPerformance.balance;
const sell_recommender_id = tokenPerformance.recommenderId;
const tokenAddress = tokenPerformance.tokenAddress;
const process = await this.startProcessInTheSonarBackend(
tokenAddress,
balance,
sell_recommender_id
);
console.log(
"Stop loss triggered. Sell order executed successfully",
sellDetailsData
if (process) {
this.runningProcesses.add(tokenAddress);
}
}
});
}

private async startProcessInTheSonarBackend(
tokenAddress: string,
balance: number,
sell_recommender_id: string
) {
try {
const message = JSON.stringify({
tokenAddress,
balance,
sell_recommender_id,
});
const response = await fetch(
`${this.sonarBe}/api/simulation/sell`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${this.sonarBeToken}`,
},
body: message,
}
);

if (!response.ok) {
console.error(
`Failed to send message to process token ${tokenAddress}`
);
return;
}

const result = await response.json();
console.log("Received response:", result);
console.log(`Sent message to process token ${tokenAddress}`);

return result;
} catch (error) {
console.error(
`Error sending message to process token ${tokenAddress}:`,
error
);
return null;
}
}

private stopProcessInTheSonarBackend(tokenAddress: string) {
try {
return fetch(
`${this.sonarBe}/api/simulation/sell/${tokenAddress}`,
{
method: "GET",
headers: {
Authorization: `Bearer ${this.sonarBeToken}`,
},
}
);
} catch (error) {
console.error(
`Error stopping process for token ${tokenAddress}:`,
error
);
}
}

async updateSellDetails(
tokenAddress: string,
recommenderId: string,
sellTimeStamp: string,
sellDetails: sellDetails,
sellDetails: SellDetails,
isSimulation: boolean,
tokenProvider: TokenProvider
) {
// To Do: Change the logic after codex update
const recommender =
await this.trustScoreDb.getOrCreateRecommenderWithTelegramId(
recommenderId
Expand Down Expand Up @@ -278,7 +389,7 @@ export class simulationSellingService {
tokenAddress: string,
recommenderId: string,
username: string,
data: sellDetails,
data: SellDetails,
balanceLeft: number,
retries = 3,
delayMs = 2000
Expand Down Expand Up @@ -325,4 +436,5 @@ export class simulationSellingService {
interface SellDecision {
tokenPerformance: TokenPerformance;
amountToSell: number;
sell_recommender_id: string | null;
}
Loading

0 comments on commit e240a18

Please sign in to comment.