Skip to content

Commit

Permalink
fix: set a 1MB event limit
Browse files Browse the repository at this point in the history
  • Loading branch information
filipesilva committed Mar 3, 2022
1 parent 579bd0d commit 2f26baa
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 61 deletions.
47 changes: 18 additions & 29 deletions src/clj/athens/self_hosted/clients.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,14 @@
[athens.common-events :as common-events]
[athens.common-events.schema :as schema]
[athens.common.logging :as log]
[cognitect.transit :as transit]
[org.httpkit.server :as http])
(:import
(java.io
ByteArrayInputStream
ByteArrayOutputStream)))
[org.httpkit.server :as http]))


;; Internal state
;; channel -> session info
(defonce clients (atom {}))


(defn ->transit
[data]
(let [out (ByteArrayOutputStream. 4096)
writer (transit/writer out :json)]
(transit/write writer data)
(.toString out)))


(defn <-transit
[transit-str]
(let [in (ByteArrayInputStream. (.getBytes transit-str))
reader (transit/reader in :json)]
(transit/read reader)))


;; Client management API

(defn get-client-session
Expand Down Expand Up @@ -71,14 +51,23 @@
valid-server-event? (schema/valid-server-event? data)]
(if (or valid-event-response?
valid-server-event?)
(let [type (common-events/find-event-or-atomic-op-type data)
status (:event/status data)]
(log/debug "Sending to username:" username
", event-id:" (:event/id data)
(if type
(str ", type: " type)
(str ", status: " status)))
(http/send! channel (->transit data)))
(let [type (common-events/find-event-or-atomic-op-type data)
status (:event/status data)
serialized-event (common-events/serialize data)
errors (common-events/validate-serialized-event serialized-event)]
(if errors
(log/error "Not sending invalid event to username:" username
", event-id:" (:event/id data)
", type:" (common-events/find-event-or-atomic-op-type data)
", invalid serialized event:"
"event-response take:" (str errors))
(do
(log/debug "Sending to username:" username
", event-id:" (:event/id data)
(if type
(str ", type: " type)
(str ", status: " status)))
(http/send! channel serialized-event))))
;; TODO internal failure mode, collect in reporting
(log/error "Not sending invalid event to username:" username
", event-id:" (:event/id data)
Expand Down
19 changes: 17 additions & 2 deletions src/clj/athens/self_hosted/components/web.clj
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,28 @@
(fn receive-handler
[channel msg]
(let [username (clients/get-client-username channel)
data (clients/<-transit msg)]
(if-not (schema/valid-event? data)
data (common-events/deserialize msg)
errors (common-events/validate-serialized-event msg)]
(cond
;; TODO: we should be able to validate the serialized event without deserializing it, but
;; we also need to build a rejected event to pass back to the client, and to do that we need
;; the :event/id, which we only get after deserializing the event.
;; I think this means that we should separate the :event/id from the payload itself.
errors
(do
(log/warn "username:" username "Invalid serialized event received, errors:" errors)
(clients/send! channel (common-events/build-event-rejected (:event/id data)
(str "Invalid serialized event")
errors)))

(not (schema/valid-event? data))
(let [explanation (schema/explain-event data)]
(log/warn "username:" username "Invalid event received, explanation:" explanation)
(clients/send! channel (common-events/build-event-rejected (:event/id data)
(str "Invalid event: " (pr-str data))
explanation)))

:else
(let [{:event/keys [id type]} data]
(log/info "Received valid event" "username:" username ", event-id:" id ", type:" (common-events/find-event-or-atomic-op-type data))
(let [{:event/keys [status]
Expand Down
60 changes: 59 additions & 1 deletion src/cljc/athens/common_events.cljc
Original file line number Diff line number Diff line change
@@ -1,7 +1,65 @@
(ns athens.common-events
"Event as Verbs executed on Knowledge Graph"
(:require
[athens.common.utils :as utils]))
[athens.common.utils :as utils]
[cognitect.transit :as transit]
[com.cognitect.transit.types :as ty])
#?(:clj
(:import
(java.io
ByteArrayInputStream
ByteArrayOutputStream))))


;; Limits

(def max-event-size-in-bytes (* 1 1000 1000)) ; 1 MB

(defn valid-serialized-event?
[serialized-event]
(< (count serialized-event) max-event-size-in-bytes))


(defn validate-serialized-event
[serialized-event]
(when-not (valid-serialized-event? serialized-event)
(ex-info "Serialized event is larger than 10 MB" {})))


;; serialization and limits

(def serialization-type :json)


(defn serialize
[event]
(let [writer #?(:cljs (transit/writer serialization-type)
:clj (transit/writer (ByteArrayOutputStream. 4096)
serialization-type))]
(transit/write writer event)))


;; Really shouldn't need these two, but we still send datoms via db-dump.
(extend-type ty/UUID IUUID)


(def ^:private datom-reader
(transit/read-handler
(fn [[e a v tx added]]
{:e e
:a a
:v v
:tx tx
:added added})))


(defn deserialize
[serialized-event]
(let [opts {:handlers {:datom datom-reader}}
reader #?(:cljs (transit/reader serialization-type opts)
:clj (transit/reader (ByteArrayInputStream. (.getBytes serialized-event))
serialization-type opts))]
(transit/read reader serialized-event)))


;; building events
Expand Down
5 changes: 4 additions & 1 deletion src/cljs/athens/effects.cljs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,10 @@
;; valid event let's send it
(do
(log/debug "Sending event:" (pr-str event))
(client/send! event))
(let [ret (client/send! event)]
(when (= :rejected (:result ret))
(rf/dispatch [:remote/reject-forwarded-event event])
(log/warn "Tried to send invalid event. Error:" (pr-str (:reason ret))))))
(let [explanation (-> schema/event
(m/explain event)
(me/humanize))]
Expand Down
42 changes: 14 additions & 28 deletions src/cljs/athens/self_hosted/client.cljs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,10 @@
[athens.common-events.graph.atomic :as atomic-graph-ops]
[athens.common-events.schema :as schema]
[athens.common.logging :as log]
[cognitect.transit :as transit]
[com.cognitect.transit.types :as ty]
[com.stuartsierra.component :as component]
[re-frame.core :as rf]))


(extend-type ty/UUID IUUID)


(defonce ^:private ws-connection (atom nil))


Expand Down Expand Up @@ -102,8 +97,14 @@
(log/debug "event-id:" (pr-str (:event/id data))
", type:" (pr-str (:event/type data))
"WSClient sending to server")
(.send connection (transit/write (transit/writer :json) data))
{:result :sent})
(let [serialized-event (common-events/serialize data)
errors (common-events/validate-serialized-event serialized-event)]
(if errors
(do (log/warn "Received invalid serialized event:" (pr-str errors))
{:result :rejected
:reason :invalid-event-schema})
(do (.send connection serialized-event)
{:result :sent}))))
(do
(log/warn "event-id:" (pr-str (:event/id data))
", type:" (pr-str (:event/type data))
Expand Down Expand Up @@ -277,29 +278,14 @@
"WSClient Received invalid server event, explanation:" (pr-str (schema/explain-server-event packet)))))


(def ^:private datom-reader
(transit/read-handler
(fn [[e a v tx added]]
{:e e
:a a
:v v
:tx tx
:added added})))


(defn- message-handler
[event]
(let [packet (->> event
.-data
(transit/read
(transit/reader
:json
{:handlers
{:datom datom-reader}})))]

(if (schema/valid-event-response? packet)
(awaited-response-handler packet)
(server-event-handler packet))))
(if-let [errors (common-events/validate-serialized-event event)]
(log/warn "Received invalid serialized event:" (pr-str errors))
(let [packet (->> event .-data common-events/deserialize)]
(if (schema/valid-event-response? packet)
(awaited-response-handler packet)
(server-event-handler packet)))))


(defn- remove-listeners!
Expand Down

0 comments on commit 2f26baa

Please sign in to comment.