Skip to content

Commit

Permalink
Fix blocked input channel, process OpenAI in parallel.
Browse files Browse the repository at this point in the history
  • Loading branch information
whilo committed Apr 12, 2024
1 parent 0035ae7 commit 8628eb4
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 63 deletions.
56 changes: 30 additions & 26 deletions src/ie/simm/runtimes/openai.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
[taoensso.timbre :refer [debug warn]]
[ie.simm.config :refer [config]]
[clojure.core.async :refer [chan pub sub]]
[superv.async :refer [S go-loop-try <? put?]]
[superv.async :refer [S go-try go-loop-try <? put?]]
[etaoin.api :as e]))

(require-python '[openai :refer [OpenAI]])
Expand All @@ -17,8 +17,8 @@

(def create (py.- (py.- (py.- client chat) completions) create))

(def window-sizes {"gpt-3.5-turbo" 16384
"gpt-4-1106-preview" 4096})
(def window-sizes {"gpt-3.5-turbo-0125" 16384
"gpt-4-turbo" 128000})

(defn chat [model text]
(if (>= (count text) (* 4 (window-sizes model)))
Expand All @@ -33,7 +33,7 @@
(py.- (first (py.- res data)) url)))

(comment
(image-gen "dall-e-2" "a dog playing in a small house")
(image-gen "dall-e-3" "a dog playing in a small house")

)

Expand All @@ -55,52 +55,56 @@
(defn openai [[S peer [in out]]]
(let [p (pub in (fn [{:keys [type]}]
(or ({:ie.simm.languages.gen-ai/cheap-llm ::gpt-35-turbo
:ie.simm.languages.gen-ai/reasoner-llm ::gpt-4-1106-preview
:ie.simm.languages.gen-ai/reasoner-llm ::gpt-4-turbo
:ie.simm.languages.gen-ai/stt-basic ::whisper-1
:ie.simm.languages.gen-ai/image-gen ::dall-e-2} type)
:ie.simm.languages.gen-ai/image-gen ::dall-e-3} type)
:unrelated)))
gpt-35-turbo (chan)
_ (sub p ::gpt-35-turbo gpt-35-turbo)

gpt-4-1106-preview (chan)
_ (sub p ::gpt-4-1106-preview gpt-4-1106-preview)
gpt-4-turbo (chan)
_ (sub p ::gpt-4-turbo gpt-4-turbo)

whisper-1 (chan)
_ (sub p ::whisper-1 whisper-1)

dall-e-2 (chan)
_ (sub p ::dall-e-2 dall-e-2)
dall-e-3 (chan)
_ (sub p ::dall-e-3 dall-e-3)

next-in (chan)
_ (sub p :unrelated next-in)]
;; TODO use async http requests for parallelism
;; TODO factor dedicated translator to LLM language
(go-loop-try S [{[m] :args :as s} (<? S gpt-35-turbo)]
(when s
(put? S out (assoc s
:type :ie.simm.languages.gen-ai/cheap-llm-reply
:response (try (chat "gpt-3.5-turbo" m) (catch Exception e e))))
(go-try S
(put? S out (assoc s
:type :ie.simm.languages.gen-ai/cheap-llm-reply
:response (try (chat "gpt-3.5-turbo-0125" m) (catch Exception e e)))))
(recur (<? S gpt-35-turbo))))

(go-loop-try S [{[m] :args :as s} (<? S gpt-4-1106-preview)]
(go-loop-try S [{[m] :args :as s} (<? S gpt-4-turbo)]
(when s
(put? S out (assoc s
:type :ie.simm.languages.gen-ai/reasoner-llm-reply
:response (try (chat "gpt-4-1106-preview" m) (catch Exception e e))))
(recur (<? S gpt-4-1106-preview))))
(go-try S
(put? S out (assoc s
:type :ie.simm.languages.gen-ai/reasoner-llm-reply
:response (try (chat "gpt-4-turbo" m) (catch Exception e e)))))
(recur (<? S gpt-4-turbo))))

(go-loop-try S [{[m] :args :as s} (<? S whisper-1)]
(when s
(put? S out (assoc s
:type :ie.simm.languages.gen-ai/stt-basic-reply
:response (try (stt "whisper-1" m) (catch Exception e e))))
(go-try S
(put? S out (assoc s
:type :ie.simm.languages.gen-ai/stt-basic-reply
:response (try (stt "whisper-1" m) (catch Exception e e)))))
(recur (<? S whisper-1))))

(go-loop-try S [{[m] :args :as s} (<? S dall-e-2)]
(go-loop-try S [{[m] :args :as s} (<? S dall-e-3)]
(when s
(put? S out (assoc s
:type :ie.simm.languages.gen-ai/image-gen-reply
:response (try (image-gen "dall-e-3" m) (catch Exception e e))))
(recur (<? S dall-e-2))))
(go-try S
(put? S out (assoc s
:type :ie.simm.languages.gen-ai/image-gen-reply
:response (try (image-gen "dall-e-3" m) (catch Exception e e)))))
(recur (<? S dall-e-3))))

[S peer [next-in out]]))
14 changes: 5 additions & 9 deletions src/ie/simm/runtimes/relational_assistance.clj
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
conn)
(catch Exception _
(d/connect cfg)))]
#_(d/transact conn default-schema)
(d/transact conn default-schema)
(swap! peer assoc-in [:conn chat-id] conn)
conn)))

Expand Down Expand Up @@ -244,9 +244,7 @@
_ (tap mo out)
pub-out (chan)
_ (tap mo pub-out)
po (pub pub-out :type)

active-tags (atom nil)]
po (pub pub-out :type)]
;; we will continuously interpret the messages
(go-loop-try S [m (<? S msg-ch)]
(when m
Expand Down Expand Up @@ -289,9 +287,7 @@
[?c :chat/id ?cid]]
@conn (:id chat))
window-size) 1)
(summarize S conn conv chat)
#_(reset! active-tags
(concat (<? S) [firstname])))
(summarize S conn conv chat))


;; 3. retrieve summaries for active tags
Expand Down Expand Up @@ -338,7 +334,7 @@
(d/transact conn (msg->txs (:result (<? S (send-text! (:id chat) (str "Removed issue: " title)))))))
(d/transact conn (msg->txs (:result (<? S (send-text! (:id chat) (str "Could not find issue: " title))))))))

_ (when (.contains reply "LIST_ISSUES")
_ (when (or (.contains reply "LIST_ISSUES") (.contains reply "DAILY"))
(debug "listing issues")
(let [issues (d/q '[:find [?t ...] :where [_ :issue/title ?t]] @conn)]
(d/transact conn (msg->txs (:result (<? S (send-text! (:id chat) (str "Issues:\n" (str/join "\n" (map #(format "* %s" %) issues))))))))))
Expand All @@ -355,7 +351,7 @@
(d/transact conn (msg->txs (:result (<? S (send-text! (:id chat) (str "Retrieved " title "\n" body)))))))
(d/transact conn (msg->txs (:result (<? S (send-text! (:id chat) (str "Could not find note: " title))))))))

_ (when (or (.contains reply "LIST_NOTES") (.contains reply "DAILY"))
_ (when (.contains reply "LIST_NOTES")
(debug "listing notes")
(let [issues (d/q '[:find [?t ...] :where [_ :note/title ?t]] @conn)]
(d/transact conn (msg->txs (:result (<? S (send-text! (:id chat) (str "Notes:\n" (str/join "\n" (map #(format "* %s" %) issues))))))))))
Expand Down
49 changes: 25 additions & 24 deletions src/ie/simm/runtimes/telegram.clj
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@
file-path (-> (t/get-file token file_id)
:result
:file_path)
rand-path (str "/tmp/" (java.util.UUID/randomUUID) ".oga")
local-path (str "downloads/telegram/" file-path ".oga")
_ (io/make-parents local-path)
_ (io/copy (:body (http/get (str "https://api.telegram.org/file/bot" token "/" file-path) {:as :byte-array}))
(io/file rand-path))]
rand-path))))
(io/file local-path))]
local-path))))

(defn server [in]
(let [telegram-routes (routes
Expand Down Expand Up @@ -85,7 +86,7 @@
send-photo (chan)
_ (sub po ::send-photo send-photo)
send-document (chan)
_ (sub po ::send-document send-document)
_ (sub po ::send-document send-document)

_ (sub po :unrelated out)]
;; this only triggers when in is closed and cleans up
Expand All @@ -97,36 +98,36 @@
(go-loop-try S [{[chat-id msg] :args :as m} (<? S send-text)]
(when m
(debug "sending telegram message:" chat-id msg)
(put? S in (assoc m
:type :ie.simm.languages.chat/send-text-reply
:response (try (t/send-text (:telegram-bot-token config) chat-id msg)
(catch Exception e
(debug "error sending telegram message:" e)
e))))
(put? S next-in (assoc m
:type :ie.simm.languages.chat/send-text-reply
:response (try (t/send-text (:telegram-bot-token config) chat-id msg)
(catch Exception e
(debug "error sending telegram message:" e)
e))))
(recur (<? S send-text))))

(go-loop-try S [{[chat-id url] :args :as m} (<? S send-photo)]
(when m
(debug "sending telegram photo:" chat-id url)
(put? S in (assoc m
:type :ie.simm.languages.chat/send-photo-reply
:response (try
(t/send-photo (:telegram-bot-token config) chat-id url)
(catch Exception e
(debug "error sending telegram photo:" e)
e))))
(put? S next-in (assoc m
:type :ie.simm.languages.chat/send-photo-reply
:response (try
(t/send-photo (:telegram-bot-token config) chat-id url)
(catch Exception e
(debug "error sending telegram photo:" e)
e))))
(recur (<? S send-photo))))

(go-loop-try S [{[chat-id url] :args :as m} (<? S send-document)]
(when m
(debug "sending telegram document:" chat-id url)
(put? S in (assoc m
:type :ie.simm.languages.chat/send-document-reply
:response (try
(t/send-document (:telegram-bot-token config) chat-id url)
(catch Exception e
(debug "error sending telegram document:" e)
e))))
(put? S next-in (assoc m
:type :ie.simm.languages.chat/send-document-reply
:response (try
(t/send-document (:telegram-bot-token config) chat-id url)
(catch Exception e
(debug "error sending telegram document:" e)
e))))
(recur (<? S send-document))))

[S peer [next-in prev-out]])))
Expand Down
31 changes: 27 additions & 4 deletions src/ie/simm/simmie.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
[clojure.core.async :refer [chan close!]]
[ie.simm.towers :refer [default debug test-tower]]
[nrepl.server :refer [start-server stop-server]]
[ie.simm.prompts :as pr])
[ie.simm.prompts :as pr]
[datahike.experimental.gc :as gc])
(:gen-class))

(defonce server (start-server :port 37888))
Expand All @@ -23,6 +24,7 @@
(def chans [in out])
(def peer (atom {}))
(sasync/restarting-supervisor (fn [S] (go-try S ((debug) [S peer chans])))
:delay (* 10 1000)
:log-fn (fn [level msg] (log/log level msg)))
(log/info "Server started.")
;; HACK to block
Expand All @@ -33,7 +35,7 @@
(close! (first chans))

;; pull above let into top level defs

(def in (chan))

(def out (chan))
Expand All @@ -52,10 +54,14 @@

(def conn ((:conn @peer) 79524334))

(require '[datahike.experimental.gc :as gc])

(gc/gc! @conn)

(require '[clojure.java.io :as io])

-4151611394 ;; Simulacion

(doseq [[t b] (d/q '[:find ?t ?b :where [?e :note/title ?t] [?e :note/body ?b]] @conn)]
(println "#" t)
(println b)
Expand All @@ -79,6 +85,23 @@
'[ie.simm.runtimes.openai :refer [chat]]
'[ie.simm.prompts :as pr])

;; parse instant from this string "2024-04-11T00:19:50.525695458Z"
(defn parse-instant [s]
(java.time.Instant/parse s))

(parse-instant "2024-04-11T00:19:50.525695458Z")

(str (java.time.Instant/now))

(java.time.Instant/parse
(chat "gpt-3.5-turbo-0125" (format "This is the time now: %s\nGiven the following message, return without comments the date time in the same format for the message. If you cannot figure the date out, return SKIP.\n%s\n\n"
(str (java.time.Instant/now))
"In a week at noon.")))

(chat "gpt-3.5-turbo-0125" (format "This is the time now: %s\nThis is the time given: %s\nRefer to the given time in words relative to now, e.g. 'Tomorrow at noon' or 'Next Monday at 2:30 pm.'.\n\n"
(str (java.time.Instant/now))
"2028-04-29T13:00:00Z"))

(def summary (chat #_"gpt-3.5-turbo" "gpt-4-1106-preview" (format pr/summarization-prompt conv)))

(def tags (distinct (extract-tags summary)))
Expand Down Expand Up @@ -120,7 +143,7 @@


;; inspect web hook

(require '[morse.api :as t]
'[ie.simm.config :refer [config]]
'[clj-http.client :as http])
Expand Down

0 comments on commit 8628eb4

Please sign in to comment.