Skip to content

Commit

Permalink
Merge pull request #608 from mirage/bounds-stream
Browse files Browse the repository at this point in the history
Try to bounds the stream between the receiver and the PACK decoder
  • Loading branch information
dinosaure authored Dec 23, 2022
2 parents 203e963 + ffabb51 commit 627ae72
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 39 deletions.
12 changes: 7 additions & 5 deletions src/not-so-smart/fetch.ml
Original file line number Diff line number Diff line change
Expand Up @@ -113,19 +113,21 @@ struct
>>= function
| `Close -> return []
| `Continue res ->
let pack ctx =
let recv_pack ctx =
let open Smart in
let side_band =
Smart.Context.is_cap_shared ctx `Side_band
|| Smart.Context.is_cap_shared ctx `Side_band_64k
in
recv ctx (recv_pack ~side_band ~push_stdout ~push_stderr pack)
recv ctx (recv_pack ~push_stdout ~push_stderr side_band)
in
if res < 0 then Log.warn (fun m -> m "No common commits");
let rec go () =
Log.debug (fun m -> m "Read PACK file.");
Smart_flow.run sched fail io flow (pack ctx) |> prj
>>= fun continue -> if continue then go () else return ()
Smart_flow.run sched fail io flow (recv_pack ctx) |> prj >>= function
| `End_of_transmission -> return ()
| `Payload (str, off, len) -> pack (str, off, len) >>= go
| `Stdout -> go ()
| `Stderr -> go ()
in
Log.debug (fun m -> m "Start to download PACK file.");
go () >>= fun () -> return (List.combine refs uids)
Expand Down
2 changes: 1 addition & 1 deletion src/not-so-smart/fetch.mli
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ module Make
(Uid.t, Uid.t * int ref * int64, 'g) store ->
(Uid.t, _, Uid.t * int ref * int64, 'g, Scheduler.t) access ->
configuration ->
(string * int * int -> unit) ->
(string * int * int -> unit IO.t) ->
(Ref.t * Uid.t) list IO.t
end
21 changes: 8 additions & 13 deletions src/not-so-smart/protocol.ml
Original file line number Diff line number Diff line change
Expand Up @@ -574,37 +574,32 @@ module Decoder = struct
let junk_pack_without_sideband (decoder : decoder) =
decoder.pos <- decoder.max

let decode_pack ?(side_band = false) ~push_pack ~push_stdout ~push_stderr
decoder =
let decode_pack ?(side_band = false) ~push_stdout ~push_stderr decoder =
let with_side_band decoder =
let v = peek_pkt ~trim:false decoder in
match String.Sub.head v with
| Some '\001' ->
let off = String.Sub.start_pos v + 1 in
let len = String.Sub.stop_pos v - off in
let buf = String.Sub.base_string v in
push_pack (buf, off, len);
let str = String.Sub.to_string (String.Sub.tail v) in
junk_pkt decoder;
return true decoder
return (`Payload (str, 0, String.length str)) decoder
| Some '\002' ->
let tail = String.Sub.to_string (String.Sub.tail v) (* copy *) in
push_stdout tail;
junk_pkt decoder;
return true decoder
return `Stdout decoder
| Some '\003' ->
let tail = String.Sub.to_string (String.Sub.tail v) (* copy *) in
push_stderr tail;
junk_pkt decoder;
return true decoder
return `Stderr decoder
| Some _ -> fail decoder (`Invalid_side_band (String.Sub.to_string v))
| None -> return false decoder
| None -> return `End_of_transmission decoder
in
let end_of_pack decoder () = return false decoder in
let end_of_pack decoder () = return `End_of_transmission decoder in
let without_side_band decoder =
let buf, off, len = peek_pack_without_sideband decoder in
push_pack (buf, off, len);
junk_pack_without_sideband decoder;
return true decoder
return (`Payload (buf, off, len)) decoder
in
if side_band then prompt_pkt ~strict:true with_side_band decoder
else prompt_pack_without_sideband without_side_band end_of_pack decoder
Expand Down
8 changes: 6 additions & 2 deletions src/not-so-smart/protocol.mli
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,15 @@ module Decoder : sig

val decode_pack :
?side_band:bool ->
push_pack:(string * int * int -> unit) ->
push_stdout:(string -> unit) ->
push_stderr:(string -> unit) ->
decoder ->
(bool, [> error ]) state
( [ `Payload of string * int * int
| `End_of_transmission
| `Stdout
| `Stderr ],
[> error ] )
state

val decode_negotiation : decoder -> (string Negotiation.t, [> error ]) state
val decode_shallows : decoder -> (string Shallow.t list, [> error ]) state
Expand Down
16 changes: 9 additions & 7 deletions src/not-so-smart/smart.ml
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@ module Witness = struct
| Commands : (string, string) Commands.t option recv
| Recv_pack : {
side_band : bool;
push_pack : string * int * int -> unit;
push_stdout : string -> unit;
push_stderr : string -> unit;
}
-> bool recv
-> [ `Payload of string * int * int
| `End_of_transmission
| `Stdout
| `Stderr ]
recv
| Ack : string Negotiation.t recv
| Flush : unit recv
| Shallows : string Shallow.t list recv
Expand Down Expand Up @@ -93,8 +96,8 @@ module Value = struct
| Advertised_refs -> decode_advertised_refs decoder
| Result -> decode_result decoder
| Commands -> decode_commands decoder
| Recv_pack { side_band; push_pack; push_stdout; push_stderr } ->
decode_pack ~side_band ~push_pack ~push_stdout ~push_stderr decoder
| Recv_pack { side_band; push_stdout; push_stderr } ->
decode_pack ~side_band ~push_stdout ~push_stderr decoder
| Ack -> decode_negotiation decoder
| Status sideband -> decode_status ~sideband decoder
| Flush -> decode_flush decoder
Expand Down Expand Up @@ -138,9 +141,8 @@ let negotiation_done = Done
let negotiation_result = Result
let commands : _ send = Commands

let recv_pack ?(side_band = false) ?(push_stdout = ignore)
?(push_stderr = ignore) push_pack =
Recv_pack { side_band; push_pack; push_stdout; push_stderr }
let recv_pack ?(push_stdout = ignore) ?(push_stderr = ignore) side_band =
Recv_pack { side_band; push_stdout; push_stderr }

let recv_flush : _ recv = Flush
let status sideband = Status sideband
Expand Down
6 changes: 3 additions & 3 deletions src/not-so-smart/smart.mli
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,11 @@ val advertised_refs : (string, string) Advertised_refs.t recv
val negotiation_result : string Result.t recv

val recv_pack :
?side_band:bool ->
?push_stdout:(string -> unit) ->
?push_stderr:(string -> unit) ->
(string * int * int -> unit) ->
bool recv
bool ->
[ `Payload of string * int * int | `End_of_transmission | `Stdout | `Stderr ]
recv

val recv_flush : unit recv
val recv_commands : (string, string) Commands.t option recv
Expand Down
17 changes: 9 additions & 8 deletions src/not-so-smart/smart_git.ml
Original file line number Diff line number Diff line change
Expand Up @@ -371,10 +371,10 @@ struct
let v = String.sub payload off len in
pack (Some (v, 0, len)))
(fun refs ->
pack None;
pack None >>= fun () ->
Mimic.close flow >>= fun () -> Lwt.return_ok refs)
@@ fun exn ->
pack None;
pack None >>= fun () ->
Mimic.close flow >>= fun () -> Lwt.fail exn

let default_capabilities =
Expand Down Expand Up @@ -420,22 +420,23 @@ struct
in
Mimic.replace git_http_headers headers ctx

let fetch ?(push_stdout = ignore) ?(push_stderr = ignore) ?threads ~ctx
(access, light_load, heavy_load) store edn ?(version = `V1)
let fetch ?(push_stdout = ignore) ?(push_stderr = ignore) ?(bounds = 10)
?threads ~ctx (access, light_load, heavy_load) store edn ?(version = `V1)
?(capabilities = default_capabilities) ?deepen want t_pck t_idx ~src ~dst
~idx =
let open Rresult in
let open Lwt.Infix in
let hostname = edn.Endpoint.hostname in
let path = edn.Endpoint.path in
let stream, pusher = Lwt_stream.create () in
let stream, emitter = Lwt_stream.create_bounded bounds in
let pusher_with_logging = function
| Some (_, _, len) as v ->
| Some (str, off, len) ->
Log.debug (fun m -> m "Download %d byte(s) of the PACK file." len);
pusher v
emitter#push (str, off, len)
| None ->
Log.debug (fun m -> m "End of pack.");
pusher None
emitter#close;
Lwt.return_unit
in
let stream () = Lwt_stream.get stream in
let ctx = Mimic.add git_capabilities `Rd (Endpoint.to_ctx edn ctx) in
Expand Down
1 change: 1 addition & 0 deletions src/not-so-smart/smart_git_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ module type SMART_GIT = sig
val fetch :
?push_stdout:(string -> unit) ->
?push_stderr:(string -> unit) ->
?bounds:int ->
?threads:int ->
ctx:Mimic.ctx ->
(Uid.t, _, Uid.t * int ref * int64, 'g, Scheduler.t) Sigs.access
Expand Down

0 comments on commit 627ae72

Please sign in to comment.