Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: set a 1MB event limit #2062

Merged
merged 8 commits into from
Apr 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 19 additions & 29 deletions src/clj/athens/self_hosted/clients.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,14 @@
[athens.common-events :as common-events]
[athens.common-events.schema :as schema]
[athens.common.logging :as log]
[cognitect.transit :as transit]
[org.httpkit.server :as http])
(:import
(java.io
ByteArrayInputStream
ByteArrayOutputStream)))
[org.httpkit.server :as http]))


;; Internal state
;; channel -> session info
(defonce clients (atom {}))


(defn ->transit
[data]
(let [out (ByteArrayOutputStream. 4096)
writer (transit/writer out :json)]
(transit/write writer data)
(.toString out)))


(defn <-transit
[transit-str]
(let [in (ByteArrayInputStream. (.getBytes transit-str))
reader (transit/reader in :json)]
(transit/read reader)))


;; Client management API

(defn get-client-session
Expand Down Expand Up @@ -71,14 +51,24 @@
valid-server-event? (schema/valid-server-event? data)]
(if (or valid-event-response?
valid-server-event?)
(let [type (common-events/find-event-or-atomic-op-type data)
status (:event/status data)]
(log/debug "Sending to username:" username
", event-id:" (:event/id data)
(if type
(str ", type: " type)
(str ", status: " status)))
(http/send! channel (->transit data)))
(let [type (common-events/find-event-or-atomic-op-type data)
status (:event/status data)
serialized-event (common-events/serialize data)
errors (when-not (common-events/ignore-serialized-event-validation? data)
(common-events/validate-serialized-event serialized-event))]
(if errors
(log/error "Not sending invalid event to username:" username
", event-id:" (:event/id data)
", type:" (common-events/find-event-or-atomic-op-type data)
", invalid serialized event:"
"event-response take:" (str errors))
(do
(log/debug "Sending to username:" username
", event-id:" (:event/id data)
(if type
(str ", type: " type)
(str ", status: " status)))
(http/send! channel serialized-event))))
;; TODO internal failure mode, collect in reporting
(log/error "Not sending invalid event to username:" username
", event-id:" (:event/id data)
Expand Down
19 changes: 17 additions & 2 deletions src/clj/athens/self_hosted/components/web.clj
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,28 @@
(fn receive-handler
[channel msg]
(let [username (clients/get-client-username channel)
data (clients/<-transit msg)]
(if-not (schema/valid-event? data)
data (common-events/deserialize msg)
errors (common-events/validate-serialized-event msg)]
(cond
;; TODO: we should be able to validate the serialized event without deserializing it, but
;; we also need to build a rejected event to pass back to the client, and to do that we need
;; the :event/id, which we only get after deserializing the event.
;; I think this means that we should separate the :event/id from the payload itself.
errors
(do
(log/warn "username:" username "Invalid serialized event received, errors:" errors)
(clients/send! channel (common-events/build-event-rejected (:event/id data)
(str "Invalid serialized event")
errors)))

(not (schema/valid-event? data))
(let [explanation (schema/explain-event data)]
(log/warn "username:" username "Invalid event received, explanation:" explanation)
(clients/send! channel (common-events/build-event-rejected (:event/id data)
(str "Invalid event: " (pr-str data))
explanation)))

:else
(let [{:event/keys [id type]} data]
(log/info "Received valid event" "username:" username ", event-id:" id ", type:" (common-events/find-event-or-atomic-op-type data))
(let [{:event/keys [status]
Expand Down
80 changes: 79 additions & 1 deletion src/cljc/athens/common_events.cljc
Original file line number Diff line number Diff line change
@@ -1,7 +1,85 @@
(ns athens.common-events
"Event as Verbs executed on Knowledge Graph"
(:require
[athens.common.utils :as utils]))
[athens.common.utils :as utils]
[cognitect.transit :as transit]
#?(:cljs [com.cognitect.transit.types :as ty]))
#?(:clj
(:import
(java.io
ByteArrayInputStream
ByteArrayOutputStream))))


;; Limits

;; Fluree default max size over websocket is ~2mb.
;; There doesn't seem to be a max for nginx
;; https://serverfault.com/questions/1034906/can-nginx-limit-incoming-websocket-message-size
;; Was able to transmit 500mb over websocket from the server to client.
;; Let's settle on a nice sensible 1MB limit for now.
(def max-event-size-in-bytes (* 1 1000 1000))


(defn valid-serialized-event?
[serialized-event]
(< (count serialized-event) max-event-size-in-bytes))


(defn validate-serialized-event
[serialized-event]
(when-not (valid-serialized-event? serialized-event)
(ex-info "Serialized event is larger than 1 MB" {})))


(defn ignore-serialized-event-validation?
[event]
(-> event :event/type
;; db-dump is sending the whole database and can (easily) go over max-event-size-in-bytes.
;; Only real solution for this is to break down the db-dump into smaller pieces,
;; possibly transitioning to partial loading by default in the future.
#{:datascript/db-dump}))


;; serialization and limits

;; Really shouldn't need these UUID and datom-reader, but we still send datoms via db-dump.
#?(:cljs
;; see https://github.com/cognitect/transit-cljs/issues/41#issuecomment-503287258
(extend-type ty/UUID IUUID))


(def ^:private datom-reader
(transit/read-handler
(fn [[e a v tx added]]
{:e e
:a a
:v v
:tx tx
:added added})))


(def serialization-type :json)
(def serialization-opts {:handlers {:datom datom-reader}})


(defn serialize
[event]
#?(:cljs (-> (transit/writer serialization-type)
(transit/write event))
:clj (let [out (ByteArrayOutputStream. 4096)
writer (transit/writer out serialization-type)]
(transit/write writer event)
(.toString out))))


(defn deserialize
[serialized-event]
#?(:cljs (-> (transit/reader serialization-type serialization-opts)
(transit/read serialized-event))
:clj (let [in (ByteArrayInputStream. (.getBytes serialized-event))
reader (transit/reader in serialization-type serialization-opts)]
(transit/read reader))))


;; building events
Expand Down
5 changes: 4 additions & 1 deletion src/cljs/athens/effects.cljs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,10 @@
;; valid event let's send it
(do
(log/debug "Sending event:" (pr-str event))
(client/send! event))
(let [ret (client/send! event)]
(when (= :rejected (:result ret))
(rf/dispatch [:remote/reject-forwarded-event event])
(log/warn "Tried to send invalid event. Error:" (pr-str (:reason ret))))))
(let [explanation (-> schema/event
(m/explain event)
(me/humanize))]
Expand Down
42 changes: 14 additions & 28 deletions src/cljs/athens/self_hosted/client.cljs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,10 @@
[athens.common-events.graph.atomic :as atomic-graph-ops]
[athens.common-events.schema :as schema]
[athens.common.logging :as log]
[cognitect.transit :as transit]
[com.cognitect.transit.types :as ty]
[com.stuartsierra.component :as component]
[re-frame.core :as rf]))


(extend-type ty/UUID IUUID)


(defonce ^:private ws-connection (atom nil))


Expand Down Expand Up @@ -102,8 +97,14 @@
(log/debug "event-id:" (pr-str (:event/id data))
", type:" (pr-str (:event/type data))
"WSClient sending to server")
(.send connection (transit/write (transit/writer :json) data))
{:result :sent})
(let [serialized-event (common-events/serialize data)
errors (common-events/validate-serialized-event serialized-event)]
(if errors
(do (log/warn "Tried sending invalid serialized event:" (pr-str errors))
{:result :rejected
:reason :invalid-event-schema})
(do (.send connection serialized-event)
{:result :sent}))))
(do
(log/warn "event-id:" (pr-str (:event/id data))
", type:" (pr-str (:event/type data))
Expand Down Expand Up @@ -277,29 +278,14 @@
"WSClient Received invalid server event, explanation:" (pr-str (schema/explain-server-event packet)))))


(def ^:private datom-reader
(transit/read-handler
(fn [[e a v tx added]]
{:e e
:a a
:v v
:tx tx
:added added})))


(defn- message-handler
[event]
(let [packet (->> event
.-data
(transit/read
(transit/reader
:json
{:handlers
{:datom datom-reader}})))]

(if (schema/valid-event-response? packet)
(awaited-response-handler packet)
(server-event-handler packet))))
(if-let [errors (common-events/validate-serialized-event (.-data event))]
(log/warn "Received invalid serialized event:" (pr-str errors))
(let [packet (->> event .-data common-events/deserialize)]
(if (schema/valid-event-response? packet)
(awaited-response-handler packet)
(server-event-handler packet)))))


(defn- remove-listeners!
Expand Down