Skip to content

Commit

Permalink
implement bbolt and snatching for cross-seed
Browse files Browse the repository at this point in the history
  • Loading branch information
KyleSanderson authored Aug 14, 2023
1 parent 4f63425 commit 6db1d72
Show file tree
Hide file tree
Showing 3 changed files with 252 additions and 11 deletions.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ require (
github.com/autobrr/go-qbittorrent v1.3.3
github.com/avast/retry-go v3.0.0+incompatible
github.com/go-chi/chi/v5 v5.0.10
github.com/kylesanderson/go-jackett v0.0.0-20230813045210-a5f3ff7507b3
github.com/kylesanderson/go-jackett v0.0.0-20230814005711-2f756d567dc1
github.com/moistari/rls v0.5.9
github.com/pkg/errors v0.9.1
github.com/ricochet2200/go-disk-usage/du v0.0.0-20210707232629-ac9918953285
go.etcd.io/bbolt v1.3.7
)

require (
golang.org/x/net v0.14.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/text v0.12.0 // indirect
)
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ github.com/kylesanderson/go-jackett v0.0.0-20230813044141-fc968115486d h1:5VQ9at
github.com/kylesanderson/go-jackett v0.0.0-20230813044141-fc968115486d/go.mod h1:o805kiTZcYvSoF1ImxwxvU+VOmK/kvRVRLI49VHXORs=
github.com/kylesanderson/go-jackett v0.0.0-20230813045210-a5f3ff7507b3 h1:1r+qr6UnnsEZF1RIl8Yo/NQEB+A93Yd28+St1ltUh9w=
github.com/kylesanderson/go-jackett v0.0.0-20230813045210-a5f3ff7507b3/go.mod h1:o805kiTZcYvSoF1ImxwxvU+VOmK/kvRVRLI49VHXORs=
github.com/kylesanderson/go-jackett v0.0.0-20230814005711-2f756d567dc1 h1:Ckem/rr4qEvuy250fV3Y+V6dVQMnCXpjjS5QUsKEOIk=
github.com/kylesanderson/go-jackett v0.0.0-20230814005711-2f756d567dc1/go.mod h1:o805kiTZcYvSoF1ImxwxvU+VOmK/kvRVRLI49VHXORs=
github.com/moistari/rls v0.5.9 h1:peRGW+1/HJDUZ76s0v2ukcBLCBUs4/Qf3TKOzRjOOco=
github.com/moistari/rls v0.5.9/go.mod h1:/3P63JjNkaf1MNBoS2tSXqGeqee6l4je+Krakp4ob7c=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand All @@ -27,12 +29,16 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
go.etcd.io/bbolt v1.3.7 h1:j+zJOnnEjF/kyHlDDgGnVL/AIqIJPq8UoB2GSNfkUfQ=
go.etcd.io/bbolt v1.3.7/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw=
golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50=
golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA=
golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14=
golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4=
golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc=
Expand Down
253 changes: 243 additions & 10 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"bytes"
"context"
"encoding/base64"
"encoding/binary"
"encoding/json"
"fmt"
"net/http"
Expand All @@ -45,6 +46,7 @@ import (
"github.com/moistari/rls"
"github.com/pkg/errors"
du "github.com/ricochet2200/go-disk-usage/du"
bolt "go.etcd.io/bbolt"
)

type Entry struct {
Expand Down Expand Up @@ -74,10 +76,13 @@ type timeentry struct {
sync.Mutex
}

var db *bolt.DB
var clientmap sync.Map
var torrentmap sync.Map

func main() {
initDatabase()

r := chi.NewRouter()

r.Use(middleware.RequestID)
Expand Down Expand Up @@ -1607,6 +1612,44 @@ func handleExpression(w http.ResponseWriter, r *http.Request) {
http.Error(w, fmt.Sprintf("Processed: %d\n", len(hashes)), 200)
}

func initDatabase() {
var err error
db, err = bolt.Open("/config/upgraderr.db", 0600, nil)
if err != nil {
fmt.Printf("WARNING: Unable to open Torznab database on /config. %q\n", err)
db, err = bolt.Open("upgraderr.db", 0600, nil)
if err != nil {
db, err = bolt.Open("/tmp/upgraderr.db", 0600, nil)
if err != nil {
fmt.Printf("WARNING: Unable to open Torznab database /tmp. %q\n", err)
}
}
}

if db == nil {
return
}

if err := db.Update(func(tx *bolt.Tx) error {
if _, err := tx.CreateBucketIfNotExists([]byte("enclosures")); err != nil {
return err
}
if _, err := tx.CreateBucketIfNotExists([]byte("titles")); err != nil {
return err
}
if _, err := tx.CreateBucketIfNotExists([]byte("torrents")); err != nil {
return err
}
if _, err := tx.CreateBucketIfNotExists([]byte("queries")); err != nil {
return err
}

return nil
}); err != nil {
fmt.Printf("Unable to create bucket: %q\n", err)
}
}

type torznabCrossSearch struct {
APIKey string
JackettHost string
Expand All @@ -1615,6 +1658,11 @@ type torznabCrossSearch struct {
}

func handleTorznabCrossSearch(w http.ResponseWriter, r *http.Request) {
if db == nil {
http.Error(w, fmt.Sprintf("You have a configuration error, unable to create a database on the filesystem"), 480)
return
}

var req torznabCrossSearch
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), 470)
Expand Down Expand Up @@ -1652,7 +1700,6 @@ func handleTorznabCrossSearch(w http.ResponseWriter, r *http.Request) {
}

processlist := make(map[string]string)

regexseason := regexp.MustCompile("(S\\d+)")
nt := time.Now().Unix()
for _, e := range mp.e {
Expand Down Expand Up @@ -1699,11 +1746,38 @@ func handleTorznabCrossSearch(w http.ResponseWriter, r *http.Request) {
}

regexadult := regexp.MustCompile("(XXX)")
failmap := make(map[string]uint)
var faillock sync.RWMutex
var wg sync.WaitGroup
for k, v := range processlist {
fmt.Printf("Searching: %q\n", v)
r := mp.d[v]
adult := regexadult.MatchString(v)
cat := ""
for _, indexer := range indexers.Indexer {
faillock.RLock()
if num := failmap[indexer.ID]; num > 3 {
faillock.RUnlock()
continue
}
faillock.RUnlock()

if err := db.Update(func(tx *bolt.Tx) error {
for _, bucket := range []*bolt.Bucket{
tx.Bucket([]byte("enclosures")),
tx.Bucket([]byte("titles")),
tx.Bucket([]byte("torrents")),
tx.Bucket([]byte("queries"))} {
if _, err := bucket.CreateBucketIfNotExists([]byte(indexer.ID)); err != nil {
return err
}
}

return nil
}); err != nil {
fmt.Printf("%q: Failed to create initial indexer buckets: %q\n", indexer.ID, err)
}

cat := ""
if adult {
for _, cl := range indexer.Caps.Categories.Category {
id, _ := strconv.Atoi(cl.ID)
Expand Down Expand Up @@ -1738,16 +1812,175 @@ func handleTorznabCrossSearch(w http.ResponseWriter, r *http.Request) {
cat = "7000"
}

res, err := jc.GetTorrents(indexer.ID, map[string]string{"q": k, "cat": cat})
if err != nil {
fmt.Printf("Fatal acquisition: %q\n", err)
continue
}
wg.Add(1)
go func(id string, m map[string]string) {
defer wg.Done()

for _, ch := range res.Channel.Item {
fmt.Printf("%q | %q\n", ch.Title, ch.Guid)
}
if err := db.View(func(tx *bolt.Tx) error {
pb := tx.Bucket([]byte("queries"))
if pb == nil {
fmt.Printf("No queries bucket %q\n", m["cat"]+m["q"])
return nil
}

b := pb.Bucket([]byte(id))
if b == nil {
return nil
}

stamp := b.Get([]byte(m["cat"] + m["q"]))
if stamp == nil {
return nil
}

if nt-720 < int64(binary.LittleEndian.Uint64(stamp)) {
return fmt.Errorf("cache found for %q", m["cat"]+m["q"])
}

return nil
}); err != nil {
fmt.Printf("%q: %q Skipping result.\n", id, err)
return
}

res, err := jc.GetTorrents(id, m)
if err != nil {
fmt.Printf("%q: Fatal acquisition: %q\n", id, err)
faillock.Lock()
i := failmap[id]
failmap[id] = (i + 1)
faillock.Unlock()
return
}

faillock.Lock()
failmap[id] = 0
faillock.Unlock()

if err := db.Update(func(tx *bolt.Tx) error {
{
tb := tx.Bucket([]byte("titles"))
if tb == nil {
return fmt.Errorf("titles: Failed to find bucket")
}

b := tb.Bucket([]byte(id))
if b == nil {
return fmt.Errorf("%q: Failed to find title bucket", id)
}

eb := tx.Bucket([]byte("enclosures"))
if eb == nil {
return fmt.Errorf("enclosures: Failed to find bucket")
}

c := eb.Bucket([]byte(id))
if c == nil {
return fmt.Errorf("%q: Failed to find enclosure bucket", id)
}

for _, ch := range res.Channel.Item {
if err := b.Put([]byte(ch.Title), []byte(ch.Guid)); err != nil {
return err
}

if err := c.Put([]byte(ch.Guid), []byte(ch.Enclosure.URL)); err != nil {
return err
}
}
}
{
pb := tx.Bucket([]byte("queries"))
if pb == nil {
return fmt.Errorf("queries: Failed to find bucket")
}

b := pb.Bucket([]byte(id))
if b == nil {
return fmt.Errorf("%q: Failed to find queries bucket", id)
}

if err := b.Put([]byte(m["cat"]+m["q"]), binary.LittleEndian.AppendUint64(nil, uint64(nt))); err != nil {
return err
}
}

return nil
}); err != nil {
fmt.Printf("%q: Failed to commit database transaction: %q\n", id, err)
}
}(indexer.ID, map[string]string{"q": k, "cat": cat})
}

wg.Wait()
break
}

if err := db.Update(func(tx *bolt.Tx) error {
titb := tx.Bucket([]byte("titles"))
if titb == nil {
return fmt.Errorf("missing parent titles bucket")
}

eb := tx.Bucket([]byte("enclosures"))
if eb == nil {
return fmt.Errorf("missing parent enclosures bucket")
}

torb := tx.Bucket([]byte("torrents"))
if torb == nil {
return fmt.Errorf("missing torrents enclosures bucket")
}

drm := mp.d
titb.ForEachBucket(func(k []byte) error {
ibc := titb.Bucket(k)
ebc := eb.Bucket(k)
tbc := torb.Bucket(k)

ibc.ForEach(func(kc, v []byte) error {
r, ok := drm[string(kc)]
if !ok {
r = rls.ParseString(string(kc))
drm[string(kc)] = r
}

ent, ok := mp.e[getFormattedTitle(r)]
if !ok {
return nil
}

for _, e := range ent {
if rls.Compare(r, e.r) != 0 {
continue
}

torrentbinary := tbc.Get(v)
if torrentbinary == nil {
enclosure := ebc.Get(v)
if enclosure == nil {
continue
}

torrentbinary, err = jc.GetEnclosure(string(enclosure))
if err != nil {
fmt.Printf("%q: error snatching %q: %q\n", k, kc, err)
continue
}

tbc.Put(v, torrentbinary)
}

req.Torrent = []byte(base64.RawStdEncoding.EncodeToString(torrentbinary))

}

return nil
})
return nil
})
return nil
}); err != nil {
}

http.Error(w, fmt.Sprintf("Processed: %d\n", len(processlist)), 200)
Expand Down

0 comments on commit 6db1d72

Please sign in to comment.