Skip to content

Commit

Permalink
Run throttled async summarization loop.
Browse files Browse the repository at this point in the history
  • Loading branch information
whilo committed Sep 22, 2024
1 parent c8db31b commit a5568d0
Showing 1 changed file with 70 additions and 56 deletions.
126 changes: 70 additions & 56 deletions src/ie/simm/runtimes/assistance.clj
Original file line number Diff line number Diff line change
Expand Up @@ -25,60 +25,69 @@
[clojure.test :as ct])
(:import [java.util.zip ZipEntry ZipOutputStream]))

(defn summarize [S conn conv chat]
(go-try S
(debug "=========================== SUMMARIZING ===============================")
(let [db @conn
summarization (<? S (cheap-llm (format pr/summarization conv)))
messages (->> (d/q '[:find ?d ?e
:in $ ?chat
:where
[?c :chat/id ?chat]
[?e :message/chat ?c]
[?e :message/date ?d]]
db (:id chat))
(sort-by first)
(take-last window-size)
(map second))
note-titles (extract-links summarization)
_ (debug "=========================== CREATING NOTES ===============================")
new-notes
(<? S (async/into []
(go-for S [[i note] (partition 2 (interleave (iterate inc 2) note-titles))
:let [[nid body] (first (d/q '[:find ?n ?b :in $ ?t :where [?n :note/title ?t] [(get-else $ ?n :note/body "EMPTY") ?b]] db note))
prompt (format pr/note note body #_summarization conv)
new-body (<? S (cheap-llm prompt))]
:when (not (.contains new-body "SKIP"))
:let [new-refs (extract-links new-body)
ref-ids (mapv first (d/q '[:find ?n
:in $ [?t ...]
:where
[?n :note/title ?t]]
db new-refs))]]
{:db/id (or nid (- i))
:note/title note
:note/body new-body
:note/link ref-ids
:note/summary -1})))]
(debug "=========================== STORING NOTES ===============================")
(debug "Summarization:" summarization)
(debug "summarization links" (extract-links summarization))
(d/transact conn (concat
[{:db/id -1
:conversation/summary summarization
:conversation/date (java.util.Date.)
:conversation/message messages}]
new-notes))
(def ^:const summarization-interval 60000)

(defn summarize [S peer new-msg-chan]
(go-loop-try S [{:keys [msg] {:keys [chat from text]} :msg} (<? S new-msg-chan)]
(when msg
;; poll from channel until empty
(while (async/poll! new-msg-chan))
(debug "=========================== SUMMARIZING ===============================")
(let [conn (ensure-conn peer (:id chat))
conv (conversation @conn (:id chat) window-size)
db @conn
summarization (<? S (cheap-llm (format pr/summarization conv)))
messages (->> (d/q '[:find ?d ?e
:in $ ?chat
:where
[?c :chat/id ?chat]
[?e :message/chat ?c]
[?e :message/date ?d]]
db (:id chat))
(sort-by first)
(take-last window-size)
(map second))
note-titles (extract-links summarization)
_ (debug "=========================== CREATING NOTES ===============================")
new-notes
(<? S (async/into []
(go-for S [[i note] (partition 2 (interleave (iterate inc 2) note-titles))
:let [[nid body] (first (d/q '[:find ?n ?b :in $ ?t :where [?n :note/title ?t] [(get-else $ ?n :note/body "EMPTY") ?b]] db note))
prompt (format pr/note note body #_summarization conv)
new-body (<? S (cheap-llm prompt))]
:when (not (.contains new-body "SKIP"))
:let [new-refs (extract-links new-body)
ref-ids (mapv first (d/q '[:find ?n
:in $ [?t ...]
:where
[?n :note/title ?t]]
db new-refs))]]
{:db/id (or nid (- i))
:note/title note
:note/body new-body
:note/link ref-ids
:note/summary -1})))]
(debug "=========================== STORING NOTES ===============================")
(debug "Summarization:" summarization)
(debug "summarization links" (extract-links summarization))
(d/transact conn (concat
[{:db/id -1
:conversation/summary summarization
:conversation/date (java.util.Date.)
:conversation/message messages}]
new-notes))
;; keep exports up to date
(doseq [[t b] (map (fn [{:keys [note/title note/body]}] [title body]) new-notes)]
(debug "writing note" t)
(doseq [[t b] (map (fn [{:keys [note/title note/body]}] [title body]) new-notes)]
(debug "writing note" t)
;; write to org file in chats/chat-id/title.org
(let [f (io/file (str "chats/" (:id chat) "/" t ".md"))]
(io/make-parents f)
(with-open [w (io/writer f)]
(binding [*out* w]
(println b)))))
(extract-links summarization))))
(let [f (io/file (str "chats/" (:id chat) "/" t ".md"))]
(io/make-parents f)
(with-open [w (io/writer f)]
(binding [*out* w]
(println b)))))
(extract-links summarization))
(<? S (timeout summarization-interval))
(recur (<? S new-msg-chan)))))

(defn zip-notes [chat-id]
(let [zip-file (io/file (str "chats/" chat-id ".zip"))
Expand Down Expand Up @@ -443,6 +452,10 @@
_ (tap mo pub-out)
po (pub pub-out :type)

;; summarization channel
new-msg-chan (chan 1000)
_ (sub pi :ie.simm.runtimes.telegram/message new-msg-chan)

;; TODO figure out prefix, here conflict if chats/
routes [["/download/chat/:chat-id/notes.zip" {:get (fn [{{:keys [chat-id]} :path-params}] {:status 200 :body (zip-notes chat-id)})}]
["/chats/:chat-id" {:get (partial #'chat-overview peer)}]
Expand All @@ -452,9 +465,10 @@
["/chats/:chat-id/notes/:note/delete" {:post (partial #'delete-note peer)}]]]
(swap! peer assoc-in [:http :routes :assistance] routes)
;; we will continuously interpret the messages
(go-loop-try S [m (<? S msg-ch)]
(when m
(binding [lb/*chans* [next-in pi out po]]
(binding [lb/*chans* [next-in pi out po]]
(summarize S peer new-msg-chan)
(go-loop-try S [m (<? S msg-ch)]
(when m
(let [{:keys [msg]
{:keys [chat from text]} :msg} m]
(try
Expand All @@ -478,7 +492,7 @@
[?c :chat/id ?cid]]
@conn (:id chat))
_ (debug "message count" msg-count)
_ (when (<= (mod msg-count window-size) 1)
#_(when (<= (mod msg-count window-size) 1)
(summarize S conn conv chat))]

(when (or (= (:type chat) "private")
Expand Down

0 comments on commit a5568d0

Please sign in to comment.