Skip to content

Commit

Permalink
add threading
Browse files Browse the repository at this point in the history
  • Loading branch information
onetonfoot committed Mar 31, 2023
1 parent 96e9eb8 commit bc3443d
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 74 deletions.
14 changes: 10 additions & 4 deletions Manifest.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# This file is machine-generated - editing it directly is not advised

julia_version = "1.9.0-beta4"
julia_version = "1.9.0-rc1"
manifest_format = "2.0"
project_hash = "b87badb4c3d50c7fc1e8092b66ffc39303ff5ec3"
project_hash = "785d003a7fe0d5a0bbce7a64c1600efa9c5ebbe2"

[[deps.AbstractTrees]]
git-tree-sha1 = "faa260e4cb5aba097a73fab382dd4b5819d8ec8c"
Expand Down Expand Up @@ -234,7 +234,13 @@ version = "1.1.7"
[[deps.MbedTLS_jll]]
deps = ["Artifacts", "Libdl"]
uuid = "c8ffd9c3-330d-5841-b78e-0817d7145fa1"
version = "2.28.0+0"
version = "2.28.2+0"

[[deps.Mkcert]]
deps = ["Pkg", "Test"]
git-tree-sha1 = "f774d7c0a1fa6530808a1497b3ac794edea88814"
uuid = "f2e023e7-06c5-4a31-9859-21762dffa823"
version = "0.1.1"

[[deps.Mmap]]
uuid = "a63ad114-7e13-5084-954f-fe012c677804"
Expand All @@ -260,7 +266,7 @@ version = "1.2.0"
[[deps.OpenBLAS_jll]]
deps = ["Artifacts", "CompilerSupportLibraries_jll", "Libdl"]
uuid = "4536629a-c528-5b80-bd46-f80d51c5b363"
version = "0.3.21+0"
version = "0.3.21+4"

[[deps.OpenSSL]]
deps = ["BitFlags", "Dates", "MozillaCACerts_jll", "OpenSSL_jll", "Sockets"]
Expand Down
3 changes: 2 additions & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ InteractiveUtils = "b77e0a4c-d291-57a0-90e8-8db25a27a240"
JET = "c3a54625-cd67-489e-a8e7-0a5a0ff4e31b"
JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1"
MacroTools = "1914dd2f-81c6-5fcd-8719-6d5c9610ff09"
Mkcert = "f2e023e7-06c5-4a31-9859-21762dffa823"
NamedTupleTools = "d9ec5142-1e00-5aa0-9d6a-321866360f50"
OrderedCollections = "bac558e1-5e72-5ebc-8fee-abe8a469f55d"
Parsers = "69de0a69-1ddd-5017-9359-2bf0b02dc9f0"
Expand All @@ -36,7 +37,6 @@ CodeInfoTools = "0.3"
DataStructures = "0.18"
ExproniconLite = "0.7.1"
FilePaths = "0.8"
PkgVersion = "0.3"
FilePathsBase = "0.9"
HTTP = "1"
JET = "0.7"
Expand All @@ -45,6 +45,7 @@ MacroTools = "0.5.10"
NamedTupleTools = "0.14"
OrderedCollections = "1"
Parsers = "2.3"
PkgVersion = "0.3"
Revise = "3.3"
SnoopPrecompile = "1.0"
StructTypes = "1.10"
Expand Down
19 changes: 19 additions & 0 deletions benchmark/server.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using Bonsai, JSON3

const app = App()

function index(stream)
Bonsai.write(stream, Body("ok"))
end

app.get["/"] = index

# TODO: make some plots with Makie

function benchmark(nconnections, total_reqs)
# bombardier -c 1000 -n 100000 http://localhost:7517 -o json
end



start(app, port=7517)
32 changes: 11 additions & 21 deletions src/app.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ using OrderedCollections
using PkgVersion
using Term
using Term: hstack
using Base.Threads

const VERSION = PkgVersion.@Version

Expand All @@ -17,6 +18,8 @@ Base.@kwdef mutable struct App
inet_addr::Union{InetAddr,Nothing} = InetAddr(ip"0.0.0.0", 8081)
server::Union{TCPServer,Nothing} = nothing

current_connections::Int = 0
connection_limit::Int = typemax(Int)
# LittleDict is ordered dict that is fast to iterate over for less than 50 elements
_middleware::LittleDict{Tuple{HttpMethod,String},Array{Middleware}} = LittleDict() #

Expand Down Expand Up @@ -45,30 +48,17 @@ function (app::App)(stream)
request.body = Base.read(stream)
closeread(stream)
end
handler, middleware = gethandlers(app, request)

try
handler, middleware = gethandlers(app, request)

if ismissing(handler) || isnothing(handler)
push!(middleware, Middleware((stream, next) -> throw(NoHandler(stream))))
else
push!(middleware, Middleware((stream, next) -> handler(stream)))
end

combine_middleware(middleware)(stream)

catch e
if e isa NoHandler
@warn e
else
@error "Unhandled Error" e = e
end
finally
request.response.request = request
startwrite(stream)
Base.write(stream, request.response.body)
if ismissing(handler) || isnothing(handler)
push!(middleware, Middleware((stream, next) -> throw(NoHandler(stream))))
else
push!(middleware, Middleware((stream, next) -> handler(stream)))
end

combine_middleware(middleware)(stream)
startwrite(stream)
Base.write(stream, request.response.body)
end


Expand Down
87 changes: 45 additions & 42 deletions src/precompile/precompile_Bonsai.jl
Original file line number Diff line number Diff line change
Expand Up @@ -54,69 +54,72 @@ end

function _precompile_()
ccall(:jl_generating_output, Cint, ()) == 1 || return nothing
Base.precompile(Tuple{typeof(write),Stream,Headers{NamedTuple{(:header,), Tuple{String}}}}) # time: 2.2988083
Base.precompile(Tuple{typeof(write),Response,Body{NamedTuple{(:x,), Tuple{Int64}}}}) # time: 2.1992393
Base.precompile(Tuple{typeof(write),Stream,Headers{NamedTuple{(:header,),Tuple{String}}}}) # time: 2.2988083
Base.precompile(Tuple{typeof(write),Response,Body{NamedTuple{(:x,),Tuple{Int64}}}}) # time: 2.1992393
Base.precompile(Tuple{typeof(headerize),Symbol}) # time: 2.164023
Base.precompile(Tuple{typeof(write),Response,Headers{NamedTuple{(:content_type,), Tuple{String}}}}) # time: 2.1166892
Base.precompile(Tuple{typeof(write),Response,Headers{NamedTuple{(:header,), Tuple{String}}}}) # time: 2.0835173
Base.precompile(Tuple{typeof(read),HTTP.Streams.Stream{A<:HTTP.Messages.Request, B<:Core.IO},Query}) # time: 0.96762437
Base.precompile(Tuple{typeof(write),Response,Headers{NamedTuple{(:content_type,),Tuple{String}}}}) # time: 2.1166892
Base.precompile(Tuple{typeof(write),Response,Headers{NamedTuple{(:header,),Tuple{String}}}}) # time: 2.0835173
Base.precompile(Tuple{typeof(read),HTTP.Streams.Stream{A<:HTTP.Messages.Request,B<:Core.IO},Query}) # time: 0.96762437
Base.precompile(Tuple{typeof(handler_writes),Any}) # time: 0.69667375
Base.precompile(Tuple{typeof(Core.kwcall),NamedTuple{(:id, :color), Tuple{DataType, DataType}},typeof(kw_constructor),Type}) # time: 0.11900312
Base.precompile(Tuple{typeof(Core.kwcall),NamedTuple{(:id, :color),Tuple{DataType,DataType}},typeof(kw_constructor),Type}) # time: 0.11900312
Base.precompile(Tuple{typeof(getmiddleware),App,Request}) # time: 0.10577414
Base.precompile(Tuple{typeof(write),Stream,Body{NamedTuple{(:x,), Tuple{Int64}}}}) # time: 0.09205892
Base.precompile(Tuple{typeof(write),Stream,Body{NamedTuple{(:x,),Tuple{Int64}}}}) # time: 0.09205892
Base.precompile(Tuple{typeof(write),Response,Body{String},Status{201}}) # time: 0.06865481
Base.precompile(Tuple{typeof(read),Dict{Any, Any},DataType}) # time: 0.031720627
Base.precompile(Tuple{typeof(Core.kwcall),NamedTuple{(:header,), Tuple{String}},Type{Headers}}) # time: 0.02950376
Base.precompile(Tuple{typeof(construct_error),DataType,JSON3.Object{Vector{UInt8}, Vector{UInt64}}}) # time: 0.02322583
Base.precompile(Tuple{typeof(Core.kwcall),NamedTuple{(:x,), Tuple{Int64}},Type{Body}}) # time: 0.022858528
Base.precompile(Tuple{typeof(Core.kwcall),NamedTuple{(:color,), Tuple{DataType}},Type{Query}}) # time: 0.021564905
Base.precompile(Tuple{typeof(read),Dict{Any,Any},DataType}) # time: 0.031720627
Base.precompile(Tuple{typeof(Core.kwcall),NamedTuple{(:header,),Tuple{String}},Type{Headers}}) # time: 0.02950376
Base.precompile(Tuple{typeof(construct_error),DataType,JSON3.Object{Vector{UInt8},Vector{UInt64}}}) # time: 0.02322583
Base.precompile(Tuple{typeof(Core.kwcall),NamedTuple{(:x,),Tuple{Int64}},Type{Body}}) # time: 0.022858528
Base.precompile(Tuple{typeof(Core.kwcall),NamedTuple{(:color,),Tuple{DataType}},Type{Query}}) # time: 0.021564905
Base.precompile(Tuple{typeof(setindex!),CreateMiddleware,Vector{Function},String}) # time: 0.01607091
Base.precompile(Tuple{typeof(read),Request,Route{NamedTuple{(:id,), Tuple{Int64}}}}) # time: 0.016059436
Base.precompile(Tuple{typeof(read),Request,Headers{NamedTuple{(:x_next,), Tuple{String}}}}) # time: 0.015223521
Base.precompile(Tuple{typeof(read),Request,Route{NamedTuple{(:id,),Tuple{Int64}}}}) # time: 0.016059436
Base.precompile(Tuple{typeof(read),Request,Headers{NamedTuple{(:x_next,),Tuple{String}}}}) # time: 0.015223521
Base.precompile(Tuple{typeof(write),Response,Body{PosixPath}}) # time: 0.014884812
Base.precompile(Tuple{typeof(read),Request,Query{NamedTuple{(:y,), Tuple{Union{Nothing, String}}}}}) # time: 0.014723092
Base.precompile(Tuple{typeof(Core.kwcall),NamedTuple{(:id,), Tuple{DataType}},Type{Route}}) # time: 0.014646934
Base.precompile(Tuple{typeof(read),Request,Route{NamedTuple{(:x,), Tuple{String}}}}) # time: 0.014077245
Base.precompile(Tuple{typeof(read),Request,Query{NamedTuple{(:y,),Tuple{Union{Nothing,String}}}}}) # time: 0.014723092
Base.precompile(Tuple{typeof(Core.kwcall),NamedTuple{(:id,),Tuple{DataType}},Type{Route}}) # time: 0.014646934
Base.precompile(Tuple{typeof(read),Request,Route{NamedTuple{(:x,),Tuple{String}}}}) # time: 0.014077245
isdefined(Bonsai, Symbol("#_field_name#5")) && Base.precompile(Tuple{getfield(Bonsai, Symbol("#_field_name#5")),Expr}) # time: 0.011492283
Base.precompile(Tuple{typeof(Core.kwcall),NamedTuple{(:x, :y), Tuple{Int64, Float64}},Type{Body}}) # time: 0.01000522
Base.precompile(Tuple{typeof(Core.kwcall),NamedTuple{(:x, :y), Tuple{DataType, DataType}},Type{Body}}) # time: 0.008596082
Base.precompile(Tuple{typeof(Core.kwcall),NamedTuple{(:x, :y),Tuple{Int64,Float64}},Type{Body}}) # time: 0.01000522
Base.precompile(Tuple{typeof(Core.kwcall),NamedTuple{(:x, :y),Tuple{DataType,DataType}},Type{Body}}) # time: 0.008596082
Base.precompile(Tuple{typeof(handler_reads),Any}) # time: 0.00729835
Base.precompile(Tuple{typeof(read),NamedTuple{(:a, :b), Tuple{Int64, Float64}},DataType}) # time: 0.006895304
Base.precompile(Tuple{typeof(Core.kwcall),NamedTuple{(:error, :message), Tuple{String, String}},Type{Body}}) # time: 0.006244926
Base.precompile(Tuple{typeof(Core.kwcall),NamedTuple{(:content_type,), Tuple{String}},Type{Headers}}) # time: 0.005384445
Base.precompile(Tuple{typeof(Core.kwcall),NamedTuple{(:y,), Tuple{Union}},Type{Query}}) # time: 0.005269121
Base.precompile(Tuple{typeof(read),NamedTuple{(:a, :b),Tuple{Int64,Float64}},DataType}) # time: 0.006895304
Base.precompile(Tuple{typeof(Core.kwcall),NamedTuple{(:error, :message),Tuple{String,String}},Type{Body}}) # time: 0.006244926
Base.precompile(Tuple{typeof(Core.kwcall),NamedTuple{(:content_type,),Tuple{String}},Type{Headers}}) # time: 0.005384445
Base.precompile(Tuple{typeof(Core.kwcall),NamedTuple{(:y,),Tuple{Union}},Type{Query}}) # time: 0.005269121
Base.precompile(Tuple{typeof(spliturl),String}) # time: 0.005232004
Base.precompile(Tuple{typeof(Core.kwcall),NamedTuple{(:x,), Tuple{DataType}},Type{Route}}) # time: 0.00513121
Base.precompile(Tuple{typeof(Core.kwcall),NamedTuple{(:x_next,), Tuple{DataType}},Type{Headers}}) # time: 0.004608333
Base.precompile(Tuple{typeof(Core.kwcall),NamedTuple{(:x,), Tuple{DataType}},Type{Query}}) # time: 0.003575695
Base.precompile(Tuple{typeof(convert_numbers!),Dict{Any, Any},Type}) # time: 0.003463286
Base.precompile(Tuple{typeof(Core.kwcall),NamedTuple{(:x,),Tuple{DataType}},Type{Route}}) # time: 0.00513121
Base.precompile(Tuple{typeof(Core.kwcall),NamedTuple{(:x_next,),Tuple{DataType}},Type{Headers}}) # time: 0.004608333
Base.precompile(Tuple{typeof(Core.kwcall),NamedTuple{(:x,),Tuple{DataType}},Type{Query}}) # time: 0.003575695
Base.precompile(Tuple{typeof(convert_numbers!),Dict{Any,Any},Type}) # time: 0.003463286
Base.precompile(Tuple{typeof(gethandlers),App,Request}) # time: 0.003460286
isdefined(Bonsai, Symbol("#_field_kw#6")) && Base.precompile(Tuple{getfield(Bonsai, Symbol("#_field_kw#6")),Any}) # time: 0.00333146
Base.precompile(Tuple{Type{Status},Int64}) # time: 0.003187899
Base.precompile(Tuple{typeof(write),Stream,Body{String},Status{201}}) # time: 0.003142989
Base.precompile(Tuple{typeof(combine_middleware),Vector{Function}}) # time: 0.002716805
Base.precompile(Tuple{typeof(getindex),CreateMiddleware,String}) # time: 0.002696335
Base.precompile(Tuple{typeof(gethandler),App,Request}) # time: 0.002680445
Base.precompile(Tuple{typeof(convert_numbers!),Dict{Symbol, Any},Type}) # time: 0.002601102
Base.precompile(Tuple{typeof(convert_numbers!),Dict{Symbol,Any},Type}) # time: 0.002601102
Base.precompile(Tuple{typeof(combine_middleware),Vector{Any}}) # time: 0.002584136
Base.precompile(Tuple{typeof(read),HTTP.Streams.Stream{A<:HTTP.Messages.Request, B<:Core.IO},Route}) # time: 0.002568127
Base.precompile(Tuple{typeof(read),HTTP.Streams.Stream{A<:HTTP.Messages.Request,B<:Core.IO},Route}) # time: 0.002568127
Base.precompile(Tuple{Type{Query},DataType}) # time: 0.002217689
Base.precompile(Tuple{typeof(write),Response,Status{201}}) # time: 0.002194738
Base.precompile(Tuple{Type{Headers},DataType}) # time: 0.002117839
Base.precompile(Tuple{typeof(setindex!),CreateMiddleware,Function,String}) # time: 0.00199732
Base.precompile(Tuple{typeof(construct_error),DataType,JSON3.Object{Base.CodeUnits{UInt8, String}, Vector{UInt64}}}) # time: 0.001952182
Base.precompile(Tuple{Type{Route},Type{NamedTuple{(:id,), Tuple{Int64}}},Nothing}) # time: 0.001862422
Base.precompile(Tuple{Type{Query},Type{NamedTuple{(:y,), Tuple{Union{Nothing, String}}}},Nothing}) # time: 0.001759281
Base.precompile(Tuple{Type{Route},Type{NamedTuple{(:id, :color), Tuple{Int64, String}}},Nothing}) # time: 0.001602885
let fbody = try __lookup_kwbody__(which(kw_constructor, (Type{Route},))) catch missing end
if !ismissing(fbody)
precompile(fbody, (Base.Pairs{Symbol, DataType, Tuple{Symbol, Symbol}, NamedTuple{(:id, :color), Tuple{DataType, DataType}}},typeof(kw_constructor),Type{Route},))
end
end # time: 0.001553336
Base.precompile(Tuple{Type{Headers},Type{NamedTuple{(:x_next,), Tuple{String}}},Nothing}) # time: 0.001450776
Base.precompile(Tuple{Type{Route},Type{NamedTuple{(:x,), Tuple{String}}},Nothing}) # time: 0.001449037
Base.precompile(Tuple{Type{Body},Type{NamedTuple{(:x, :y), Tuple{String, Float64}}},Nothing}) # time: 0.001426166
Base.precompile(Tuple{Type{Route},Type{NamedTuple{(:x,), Tuple{Int64}}},Nothing}) # time: 0.001367136
Base.precompile(Tuple{typeof(construct_error),DataType,JSON3.Object{Base.CodeUnits{UInt8,String},Vector{UInt64}}}) # time: 0.001952182
Base.precompile(Tuple{Type{Route},Type{NamedTuple{(:id,),Tuple{Int64}}},Nothing}) # time: 0.001862422
Base.precompile(Tuple{Type{Query},Type{NamedTuple{(:y,),Tuple{Union{Nothing,String}}}},Nothing}) # time: 0.001759281
Base.precompile(Tuple{Type{Route},Type{NamedTuple{(:id, :color),Tuple{Int64,String}}},Nothing}) # time: 0.001602885
let fbody = try
__lookup_kwbody__(which(kw_constructor, (Type{Route},)))
catch missing
end
if !ismissing(fbody)
precompile(fbody, (Base.Pairs{Symbol,DataType,Tuple{Symbol,Symbol},NamedTuple{(:id, :color),Tuple{DataType,DataType}}}, typeof(kw_constructor), Type{Route},))
end
end # time: 0.001553336
Base.precompile(Tuple{Type{Headers},Type{NamedTuple{(:x_next,),Tuple{String}}},Nothing}) # time: 0.001450776
Base.precompile(Tuple{Type{Route},Type{NamedTuple{(:x,),Tuple{String}}},Nothing}) # time: 0.001449037
Base.precompile(Tuple{Type{Body},Type{NamedTuple{(:x, :y),Tuple{String,Float64}}},Nothing}) # time: 0.001426166
Base.precompile(Tuple{Type{Route},Type{NamedTuple{(:x,),Tuple{Int64}}},Nothing}) # time: 0.001367136
Base.precompile(Tuple{typeof(get_default),Expr}) # time: 0.001248698
isdefined(Bonsai, Symbol("#_field_kw#6")) && Base.precompile(Tuple{getfield(Bonsai, Symbol("#_field_kw#6")),Expr}) # time: 0.001074699
Base.precompile(Tuple{Type{Body},PosixPath}) # time: 0.001045749
Expand Down
67 changes: 61 additions & 6 deletions src/server.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,78 @@ export start, stop
# using Base.Threads: Atomic
# using Base: check_channel_state

struct WebRequest
stream::HTTP.Stream
done::Threads.Event
end

struct HandlerQueue
queue::Channel{WebRequest}
count::Threads.Atomic{Int}
shutdown::Threads.Atomic{Bool}
function HandlerQueue(queuesize=1024)
new(Channel{WebRequest}(queuesize), Threads.Atomic{Int}(0), Threads.Atomic{Bool}(false))
end
end

function handle_error(stream, e)
if e isa NoHandler
@warn e
else
@error "Unhandled Error" e = (e, catch_backtrace())
end
request::Request = stream.message
request.response.request = request
HTTP.setstatus(stream, 500)
startwrite(stream)
Base.write(stream, request.response.body)
end

function respond(stream::HTTP.Stream, app::App)
try
app(stream)
catch e
handle_error(stream, e)
end
end

function start(
app::App;
host=ip"0.0.0.0",
port=8081,
kwargs...)

queue = HandlerQueue()

function streamhandler(stream::HTTP.Stream)
try
app.current_connections += 1
if app.current_connections < app.connection_limit
t = Threads.@spawn respond(stream, app)
wait(t)
else
@warn "Dropping connection..."
HTTP.setstatus(stream, 529)
Base.write(stream, "Server overloaded.")
end
catch e
# should never hit this code
handle_error(stream, e)
finally
app.current_connections -= 1
end
end

# https://github.com/JuliaLang/julia/issues/46635

errormonitor(
# can't use thread @spawn otherwise this
# breaks revise
# can't use thread @spawn otherwise this breaks revise, why I don't know
@async while isopen(app.cancel_token)
addr = Sockets.InetAddr(host, port)
tcp_server = Sockets.listen(addr)
app.server = tcp_server

http_server = HTTP.serve!(
# doesn't seem to make a difference
app,
streamhandler,
host, port; server=tcp_server, stream=true, kwargs...
)

Expand All @@ -50,10 +102,12 @@ function start(
sleep(0.1)
end
)

@info "Server running $(host):$(port), hold Ctrl+C to stop"

try
# https://github.com/JuliaLang/julia/issues/45055
# seems to be broken on 1.9
wait(app.cancel_token)
catch e
if e isa InterruptException
Expand All @@ -62,6 +116,7 @@ function start(
end
finally
stop(app)
queue.shutdown[] = true
notify(Revise.revision_event)
end
end
Expand Down

0 comments on commit bc3443d

Please sign in to comment.