Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filter news by existing tickers #70

Merged
merged 7 commits into from
Jan 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 46 additions & 34 deletions app.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package main

import (
"context"
"github.com/getsentry/sentry-go"
"github.com/go-co-op/gocron/v2"
. "github.com/samgozman/fin-thread/archivist"
. "github.com/samgozman/fin-thread/composer"
. "github.com/samgozman/fin-thread/jobs"
. "github.com/samgozman/fin-thread/journalist"
. "github.com/samgozman/fin-thread/publisher"
"github.com/samgozman/fin-thread/archivist"
"github.com/samgozman/fin-thread/composer"
"github.com/samgozman/fin-thread/jobs"
"github.com/samgozman/fin-thread/journalist"
"github.com/samgozman/fin-thread/publisher"
"github.com/samgozman/fin-thread/scavenger"
"log/slog"
"time"
Expand All @@ -18,50 +19,62 @@ type App struct {
}

func (a *App) start() {
publisher, err := NewTelegramPublisher(a.cnf.env.TelegramChannelID, a.cnf.env.TelegramBotToken)
telegramPublisher, err := publisher.NewTelegramPublisher(a.cnf.env.TelegramChannelID, a.cnf.env.TelegramBotToken)
if err != nil {
slog.Default().Error("[main] Error creating Telegram publisher:", err)
slog.Default().Error("[main] Error creating Telegram telegramPublisher:", err)
panic(err)
}

archivist, err := NewArchivist(a.cnf.env.PostgresDSN)
archivistEntity, err := archivist.NewArchivist(a.cnf.env.PostgresDSN)
if err != nil {
slog.Default().Error("[main] Error creating Archivist:", err)
panic(err)
}

composer := NewComposer(a.cnf.env.OpenAiToken, a.cnf.env.TogetherAIToken, a.cnf.env.GoogleGeminiToken)

marketJournalist := NewJournalist("MarketNews", []NewsProvider{
NewRssProvider("benzinga:large-cap", "https://www.benzinga.com/news/large-cap/feed"),
NewRssProvider("benzinga:mid-cap", "https://www.benzinga.com/news/mid-cap/feed"),
NewRssProvider("benzinga:m&a", "https://www.benzinga.com/news/m-a/feed"),
NewRssProvider("benzinga:buybacks", "https://www.benzinga.com/news/buybacks/feed"),
NewRssProvider("benzinga:global", "https://www.benzinga.com/news/global/feed"),
NewRssProvider("benzinga:sec", "https://www.benzinga.com/sec/feed"),
NewRssProvider("benzinga:bonds", "https://www.benzinga.com/markets/bonds/feed"),
NewRssProvider("benzinga:analyst:upgrades", "https://www.benzinga.com/analyst-ratings/upgrades/feed"),
NewRssProvider("benzinga:analyst:downgrades", "https://www.benzinga.com/analyst-ratings/downgrades/feed"),
NewRssProvider("benzinga:etfs", "https://www.benzinga.com/etfs/feed"),
composerEntity := composer.NewComposer(a.cnf.env.OpenAiToken, a.cnf.env.TogetherAIToken, a.cnf.env.GoogleGeminiToken)

marketJournalist := journalist.NewJournalist("MarketNews", []journalist.NewsProvider{
journalist.NewRssProvider("benzinga:large-cap", "https://www.benzinga.com/news/large-cap/feed"),
journalist.NewRssProvider("benzinga:mid-cap", "https://www.benzinga.com/news/mid-cap/feed"),
journalist.NewRssProvider("benzinga:m&a", "https://www.benzinga.com/news/m-a/feed"),
journalist.NewRssProvider("benzinga:buybacks", "https://www.benzinga.com/news/buybacks/feed"),
journalist.NewRssProvider("benzinga:global", "https://www.benzinga.com/news/global/feed"),
journalist.NewRssProvider("benzinga:sec", "https://www.benzinga.com/sec/feed"),
journalist.NewRssProvider("benzinga:bonds", "https://www.benzinga.com/markets/bonds/feed"),
journalist.NewRssProvider("benzinga:analyst:upgrades", "https://www.benzinga.com/analyst-ratings/upgrades/feed"),
journalist.NewRssProvider("benzinga:analyst:downgrades", "https://www.benzinga.com/analyst-ratings/downgrades/feed"),
journalist.NewRssProvider("benzinga:etfs", "https://www.benzinga.com/etfs/feed"),
}).FlagByKeys(a.cnf.suspiciousKeywords).Limit(2)

broadNews := NewJournalist("BroadNews", []NewsProvider{
NewRssProvider("finpost:news", "https://financialpost.com/feed"),
broadNews := journalist.NewJournalist("BroadNews", []journalist.NewsProvider{
journalist.NewRssProvider("finpost:news", "https://financialpost.com/feed"),
}).FlagByKeys(a.cnf.suspiciousKeywords).Limit(1)

marketJob := NewJob(composer, publisher, archivist, marketJournalist).
// get all stocks and pass as a parameter to jobs
scv := scavenger.Scavenger{}
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
stocks, err := scv.Screener.FetchAll(ctx)
if err != nil {
slog.Default().Error("[main] Error fetching stocks:", err)
panic(err)
}

marketJob := jobs.NewJob(composerEntity, telegramPublisher, archivistEntity, marketJournalist, stocks).
FetchUntil(time.Now().Add(-60 * time.Second)).
OmitSuspicious().
OmitIfAllKeysEmpty().
OmitUnlistedStocks().
RemoveClones().
ComposeText().
SaveToDB().
Publish()

broadJob := NewJob(composer, publisher, archivist, broadNews).
broadJob := jobs.NewJob(composerEntity, telegramPublisher, archivistEntity, broadNews, stocks).
FetchUntil(time.Now().Add(-4 * time.Minute)).
OmitSuspicious().
OmitEmptyMeta(MetaTickers).
OmitEmptyMeta(jobs.MetaTickers).
OmitUnlistedStocks().
RemoveClones().
ComposeText().
SaveToDB().
Expand Down Expand Up @@ -118,11 +131,10 @@ func (a *App) start() {
}

// Calendar job
scv := scavenger.Scavenger{}
calJob := NewCalendarJob(
calJob := jobs.NewCalendarJob(
scv.EconomicCalendar,
publisher,
archivist,
telegramPublisher,
archivistEntity,
"mql5-calendar",
).Publish()

Expand Down Expand Up @@ -157,10 +169,10 @@ func (a *App) start() {
}

// Before market open job
bmoJob := NewSummaryJob(
composer,
publisher,
archivist,
bmoJob := jobs.NewSummaryJob(
composerEntity,
telegramPublisher,
archivistEntity,
).Publish()
_, err = s.NewJob(
// TODO: Use holidays calendar to avoid unnecessary runs
Expand Down
27 changes: 27 additions & 0 deletions jobs/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/samgozman/fin-thread/composer"
"github.com/samgozman/fin-thread/journalist"
"github.com/samgozman/fin-thread/publisher"
"github.com/samgozman/fin-thread/scavenger/stocks"
"log/slog"
"slices"
"strings"
Expand All @@ -23,6 +24,7 @@ type Job struct {
publisher *publisher.TelegramPublisher // publisher that will publish news to the channel
archivist *archivist.Archivist // archivist that will save news to the database
journalist *journalist.Journalist // journalist that will fetch news
stocks *stocks.StockMap // stocks that will be used to filter news and compose meta (optional). TODO: use more fields from Stock struct
logger *slog.Logger // special logger for the job
options *JobOptions // job options
}
Expand All @@ -33,6 +35,7 @@ type JobOptions struct {
omitSuspicious bool // if true, will not publish suspicious articles
omitEmptyMetaKeys *omitKeyOptions // holds keys that will omit news if empty. Note: requires shouldComposeText to be true
omitIfAllKeysEmpty bool // if true, will omit articles with empty meta for all keys. Note: requires shouldComposeText to be set
omitUnlistedStocks bool // if true, will omit articles with stocks unlisted in the Job.stocks
shouldComposeText bool // if true, will compose text for the article using OpenAI. If false, will use original title and description
shouldSaveToDB bool // if true, will save all news to the database
shouldRemoveClones bool // if true, will remove duplicated news found in the DB. Note: requires shouldSaveToDB to be true
Expand All @@ -45,12 +48,14 @@ func NewJob(
publisher *publisher.TelegramPublisher,
archivist *archivist.Archivist,
journalist *journalist.Journalist,
stocks *stocks.StockMap,
) *Job {
return &Job{
composer: composer,
publisher: publisher,
archivist: archivist,
journalist: journalist,
stocks: stocks,
logger: slog.Default(),
options: &JobOptions{},
}
Expand Down Expand Up @@ -121,6 +126,12 @@ func (job *Job) Publish() *Job {
return job
}

// OmitUnlistedStocks sets the flag that will omit articles publishing with stocks unlisted in the Job.stocks.
// Note: the filter is applied at publish time and only takes effect when Job.stocks is non-nil
// and the article's composed meta contains at least one ticker.
func (job *Job) OmitUnlistedStocks() *Job {
	job.options.omitUnlistedStocks = true
	return job
}

// Run return job function that will be executed by the scheduler.
func (job *Job) Run() JobFunc {
return func() {
Expand Down Expand Up @@ -364,6 +375,8 @@ func (job *Job) saveNews(ctx context.Context, data *JobData) ([]*models.News, er
return dbNews, nil
}

// TODO: refactor publish. Split into two multiple methods.

// publish publishes the news to the channel.
func (job *Job) publish(ctx context.Context, dbNews []*models.News) ([]*models.News, error) {
for _, n := range dbNews {
Expand Down Expand Up @@ -392,6 +405,20 @@ func (job *Job) publish(ctx context.Context, dbNews []*models.News) ([]*models.N
}
}

// Skip news with unlisted stocks if needed
if job.options.omitUnlistedStocks && job.stocks != nil && len(meta.Tickers) > 0 {
skip := false
for _, t := range meta.Tickers {
if _, ok := (*job.stocks)[t]; !ok {
skip = true
break
}
}
if skip {
continue
}
}

// Omit if all keys are empty and omitIfAllKeysEmpty is set
if job.options.omitIfAllKeysEmpty {
if len(meta.Tickers) == 0 && len(meta.Markets) == 0 && len(meta.Hashtags) == 0 {
Expand Down
6 changes: 5 additions & 1 deletion scavenger/scavenger.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package scavenger

import "github.com/samgozman/fin-thread/scavenger/ecal"
import (
"github.com/samgozman/fin-thread/scavenger/ecal"
"github.com/samgozman/fin-thread/scavenger/stocks"
)

// Scavenger is the struct that fetches some custom data from defined sources.
// The Scavenger will hold all available sources and will fetch the data from them.
Expand All @@ -9,4 +12,5 @@ import "github.com/samgozman/fin-thread/scavenger/ecal"
// fetch custom unstructured data for different purposes. For example to fetch info updates or parse calendar events.
type Scavenger struct {
	EconomicCalendar *ecal.EconomicCalendar // source of economic calendar events
	Screener         *stocks.Screener       // fetches the list of available stocks (Nasdaq screener API)
}
92 changes: 92 additions & 0 deletions scavenger/stocks/stocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package stocks

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
)

// Screener is a struct to fetch all available Stocks from external API.
// It is stateless: every FetchAll call performs a fresh HTTP request.
type Screener struct{}

// FetchAll fetches all available Stocks from external API
// and returns them as a map of `ticker` -> Stock.
//
// The provided ctx controls cancellation and timeout of the HTTP call.
// Returns an error if the request cannot be built, the call fails,
// the server answers with a non-200 status, or the body is not valid JSON.
func (f *Screener) FetchAll(ctx context.Context) (*StockMap, error) {
	url := "https://api.nasdaq.com/api/screener/stocks?tableonly=true&limit=25&offset=0&download=true"
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, fmt.Errorf("error creating request to fetch stocks from nasdaq: %w", err)
	}
	req.Header.Set("accept", "application/json")
	// NOTE(review): a browser-like user agent is presumably required by the
	// Nasdaq endpoint — confirm before changing it.
	req.Header.Set("user-agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36")

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("error fetching stocks from nasdaq: %w", err)
	}
	defer func() {
		if cerr := resp.Body.Close(); cerr != nil {
			fmt.Println("error closing response body:", cerr)
		}
	}()

	// Fail fast on non-200 answers instead of trying to decode an error page.
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status code %d from nasdaq screener", resp.StatusCode)
	}

	var respParsed nasdaqScreenerResponse
	if err := json.NewDecoder(resp.Body).Decode(&respParsed); err != nil {
		return nil, fmt.Errorf("error parsing response from nasdaq: %w", err)
	}

	// Pre-size the map: one entry per returned row.
	stockMap := make(StockMap, len(respParsed.Data.Rows))
	for _, stock := range respParsed.Data.Rows {
		// replace / with . in ticker to match the format of other sources (BRK/A -> BRK.A)
		s := strings.ReplaceAll(stock.Symbol, "/", ".")
		stockMap[s] = Stock{
			Name:      stock.Name,
			MarketCap: stock.MarketCap,
			Country:   stock.Country,
			Industry:  stock.Industry,
			Sector:    stock.Sector,
		}
	}

	return &stockMap, nil
}

// Stock holds the per-ticker fields kept from the Nasdaq screener response.
type Stock struct {
	Name      string `json:"name"`
	MarketCap string `json:"marketCap"`
	Country   string `json:"country"`
	Industry  string `json:"industry"`
	Sector    string `json:"sector"`
}

// StockMap is a map of `ticker` -> Stock.
// Tickers use dots as class separators (e.g. BRK.A), see FetchAll.
type StockMap map[string]Stock

// nasdaqScreenerResponse mirrors the JSON payload of the Nasdaq screener
// endpoint. Only Symbol, Name, MarketCap, Country, Industry and Sector are
// consumed by FetchAll; the rest is kept for complete unmarshalling.
type nasdaqScreenerResponse struct {
	Data struct {
		AsOf    string `json:"asOf"`    // unnecessary, but keeping it for JSON unmarshalling
		Headers any    `json:"headers"` // unnecessary, but keeping it for JSON unmarshalling
		Rows    []struct {
			Symbol    string `json:"symbol"`
			Name      string `json:"name"`
			LastSale  string `json:"lastsale"`
			NetChange string `json:"netchange"`
			PctChange string `json:"pctchange"`
			Volume    string `json:"volume"`
			MarketCap string `json:"marketCap"`
			Country   string `json:"country"`
			IPOYear   string `json:"ipoyear"`
			Industry  string `json:"industry"`
			Sector    string `json:"sector"`
			URL       string `json:"url"`
		} `json:"rows"` // Stocks array
	} `json:"data"`
	Message string `json:"message"`
	Status  any    `json:"status"` // Status object, probably not needed for this project
}