Created
October 16, 2020 08:37
-
-
Save dinosaure/ac2a5956a16370ddcf965a1980f5c9a0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
open Lwt.Infix | |
(** [x >>? f] waits for the promise [x]. When it resolves to [Ok v] the
    computation continues with [f v]; when it resolves to [Error e] the
    result is [Error e] and [f] is never called. *)
let ( >>? ) x f =
  Lwt.bind x (function
    | Ok value -> f value
    | Error err -> Lwt.return (Error err))
(** [getline queue] extracts the first complete line buffered in [queue].

    Returns [Some line] (without its terminating ['\n']) and removes the line
    together with the newline from [queue], or [None] when [queue] holds no
    ['\n'] yet, in which case [queue] is left untouched. *)
let getline queue =
  (* Index of the FIRST element satisfying [predicate], if any. Once a match
     has been recorded, later matches are ignored; otherwise several buffered
     lines would be returned glued together as a single one. *)
  let find_first ~predicate queue =
    let pos = ref 0 and res = ref (-1) in
    Ke.Rke.iter
      (fun chr ->
        if !res < 0 && predicate chr then res := !pos ;
        incr pos)
      queue ;
    if !res < 0 then None else Some !res in
  let blit src src_off dst dst_off len =
    Bigstringaf.blit_to_bytes src ~src_off dst ~dst_off ~len in
  match find_first ~predicate:(( = ) '\n') queue with
  | Some pos ->
    let tmp = Bytes.create pos in
    (* Copy the [pos] bytes preceding the newline, then drop them and the
       newline itself from the queue. *)
    Ke.Rke.N.keep_exn queue ~blit ~length:Bytes.length ~off:0 ~len:pos tmp ;
    Ke.Rke.N.shift_exn queue (pos + 1) ;
    (* [tmp] is not used after this point, so the unsafe conversion does not
       expose a string that could later be mutated. *)
    Some (Bytes.unsafe_to_string tmp)
  | None -> None
(** [getline queue flow] resolves to [Ok (`Line l)] as soon as [queue]
    contains a complete line, reading from [flow] into [queue] for as long as
    it does not. Resolves to [Ok `End_of_flow] when [flow] reports the end of
    the stream before a full line is available, and to [Error _] when
    receiving fails. Shadows the one-argument [getline] defined above, which
    it uses to inspect [queue]. *)
let getline queue flow =
  let chunk = Cstruct.create 0x1000 in
  let blit src src_off dst dst_off len =
    Bigstringaf.blit (Cstruct.to_bigarray src) ~src_off dst ~dst_off ~len in
  let rec loop () =
    match getline queue with
    | Some line -> Lwt.return (Ok (`Line line))
    | None ->
      Conduit_lwt.recv flow chunk >>? fun input ->
      (match input with
       | `End_of_flow -> Lwt.return (Ok `End_of_flow)
       | `Input len ->
         (* Append the [len] bytes just received, then look for a line
            again. *)
         Ke.Rke.N.push queue ~blit ~length:Cstruct.len ~off:0 ~len chunk ;
         loop ()) in
  loop ()
(** [client ~resolvers edn] resolves [edn] into a flow using [resolvers], then
    repeatedly reads a line from [stdin], sends it (newline-terminated) over
    the flow and waits for a one-line reply.

    The loop continues while the reply is ["pong"]; any other reply, the end
    of the flow, or the end of [stdin] closes the flow. Any error from
    resolving, sending or receiving is returned as [Error _].

    NOTE(review): [input_line stdin] is a blocking call made from inside the
    Lwt computation. *)
let client ~resolvers edn =
  Conduit_lwt.resolve resolvers edn >>? fun flow ->
  let queue = Ke.Rke.create ~capacity:0x1000 Bigarray.char in
  let rec go () =
    match input_line stdin with
    | line -> (
      Conduit_lwt.send flow (Cstruct.of_string (line ^ "\n")) >>? fun _ ->
      Fmt.pr "> %s.\n%!" line ;
      getline queue flow >>? function
      | `Line ("pong" as reply) ->
        (* Print what was received, not the line that was just sent. *)
        Fmt.pr "< %s.\n%!" reply ;
        go ()
      | `Line reply ->
        Fmt.pr "< %s.\n%!" reply ;
        Conduit_lwt.close flow
      | `End_of_flow -> Conduit_lwt.close flow)
    | exception End_of_file -> Conduit_lwt.close flow in
  go ()
(** TLS client configuration used for the ["pgs"] scheme.

    The authenticator ignores both the host and the certificate chain and
    always answers [Ok None], so it never rejects a peer certificate.
    NOTE(review): presumably meant for local testing only; confirm before
    using against a real server. *)
let tls_config =
  let authenticator ~host:_ _ = Ok None in
  Tls.Config.client ~authenticator ()
let () =
  (* Runs once at module initialisation. NOTE(review): presumably sets up the
     Mirage_crypto random number generator needed by TLS; confirm. *)
  Mirage_crypto_rng_unix.initialize ()
(** [conduit_of_uri uri] builds the pair [(resolvers, endpoint)] used to
    connect to the server designated by [uri].

    The endpoint is the host of [uri] (["localhost"] when absent). The
    resolvers depend on the scheme:
    - ["pg"]: plain TCP, port of [uri] or 8080;
    - ["pgs"]: TLS over TCP with [tls_config], port of [uri] or 4343;
    - no scheme: both, TLS registered with [~priority:10] (default port 4343)
      and plain TCP (default port 8080). An explicit port in [uri] is used
      for both.

    @raise Invalid_argument on any other scheme. *)
let conduit_of_uri uri =
  let edn =
    Uri.host_with_default ~default:"localhost" uri |> Conduit_lwt.Endpoint.v
  in
  (* Port given in [uri], or [default] when there is none. *)
  let port_or default = Option.value ~default (Uri.port uri) in
  let resolvers =
    match Uri.scheme uri with
    | Some "pg" ->
      Conduit.empty
      |> Conduit_lwt.add Conduit_lwt.TCP.protocol
           (Conduit_lwt.TCP.resolve ~port:(port_or 8080))
    | Some "pgs" ->
      Conduit.empty
      |> Conduit_lwt.add Conduit_lwt_tls.TCP.protocol
           (Conduit_lwt_tls.TCP.resolve ~port:(port_or 4343)
              ~config:tls_config)
    | None ->
      Conduit.empty
      |> Conduit_lwt.add ~priority:10 Conduit_lwt_tls.TCP.protocol
           (Conduit_lwt_tls.TCP.resolve ~port:(port_or 4343)
              ~config:tls_config)
      |> Conduit_lwt.add Conduit_lwt.TCP.protocol
           (Conduit_lwt.TCP.resolve ~port:(port_or 8080))
    | Some scheme -> Fmt.invalid_arg "Invalid scheme: %s" scheme in
  (resolvers, edn)
(** [fiber ~uri] runs the client against the server designated by [uri].
    The resulting promise resolves to [()] on success and is rejected with
    [Failure] carrying the pretty-printed Conduit error otherwise. *)
let fiber ~uri =
  let resolvers, edn = conduit_of_uri uri in
  client ~resolvers edn >|= function
  | Error err -> Fmt.failwith "%a" Conduit_lwt.pp_error err
  | Ok () -> ()
(* Entry point: the first command-line argument is the URI of the server to
   connect to. Without it, print a usage message and exit with a non-zero
   status instead of failing with an out-of-bounds array access. *)
let () =
  if Array.length Sys.argv < 2 then begin
    Fmt.epr "usage: %s <uri>\n%!" Sys.executable_name ;
    exit 2
  end ;
  let uri = Uri.of_string Sys.argv.(1) in
  Lwt_main.run (fiber ~uri)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
open Lwt.Infix | |
(** [x >>? f] waits for the promise [x]. When it resolves to [Ok v] the
    computation continues with [f v]; when it resolves to [Error e] the
    result is [Error e] and [f] is never called. *)
let ( >>? ) x f =
  Lwt.bind x (function
    | Ok value -> f value
    | Error err -> Lwt.return (Error err))
(** [getline queue] extracts the first complete line buffered in [queue].

    Returns [Some line] (without its terminating ['\n']) and removes the line
    together with the newline from [queue], or [None] when [queue] holds no
    ['\n'] yet, in which case [queue] is left untouched. *)
let getline queue =
  (* Index of the FIRST element satisfying [predicate], if any. Once a match
     has been recorded, later matches are ignored; otherwise several buffered
     lines would be returned glued together as a single one. *)
  let find_first ~predicate queue =
    let pos = ref 0 and res = ref (-1) in
    Ke.Rke.iter
      (fun chr ->
        if !res < 0 && predicate chr then res := !pos ;
        incr pos)
      queue ;
    if !res < 0 then None else Some !res in
  let blit src src_off dst dst_off len =
    Bigstringaf.blit_to_bytes src ~src_off dst ~dst_off ~len in
  match find_first ~predicate:(( = ) '\n') queue with
  | Some pos ->
    let tmp = Bytes.create pos in
    (* Copy the [pos] bytes preceding the newline, then drop them and the
       newline itself from the queue. *)
    Ke.Rke.N.keep_exn queue ~blit ~length:Bytes.length ~off:0 ~len:pos tmp ;
    Ke.Rke.N.shift_exn queue (pos + 1) ;
    (* [tmp] is not used after this point, so the unsafe conversion does not
       expose a string that could later be mutated. *)
    Some (Bytes.unsafe_to_string tmp)
  | None -> None
(** [getline queue flow] resolves to [Ok (`Line l)] as soon as [queue]
    contains a complete line, reading from [flow] into [queue] for as long as
    it does not. Resolves to [Ok `End_of_flow] when [flow] reports the end of
    the stream before a full line is available, and to [Error _] when
    receiving fails. Shadows the one-argument [getline] defined above, which
    it uses to inspect [queue]. *)
let getline queue flow =
  let chunk = Cstruct.create 0x1000 in
  let blit src src_off dst dst_off len =
    Bigstringaf.blit (Cstruct.to_bigarray src) ~src_off dst ~dst_off ~len in
  let rec loop () =
    match getline queue with
    | Some line -> Lwt.return (Ok (`Line line))
    | None ->
      Conduit_lwt.recv flow chunk >>? fun input ->
      (match input with
       | `End_of_flow -> Lwt.return (Ok `End_of_flow)
       | `Input len ->
         (* Append the [len] bytes just received, then look for a line
            again. *)
         Ke.Rke.N.push queue ~blit ~length:Cstruct.len ~off:0 ~len chunk ;
         loop ()) in
  loop ()
(** [handler flow] serves one connection, line by line:
    - ["ping"] is answered with ["pong"] and the loop continues;
    - ["pong"] is answered with ["ping"] and the loop continues;
    - any other line is echoed back and the flow is closed;
    - the end of the flow or a receive error closes the flow.

    If sending a reply fails, the flow is closed before the error is reported,
    so a failed connection is not left open. A send or close error rejects the
    resulting promise with [Failure] carrying the pretty-printed Conduit
    error. *)
let handler flow =
  let queue = Ke.Rke.create ~capacity:0x1000 Bigarray.char in
  (* Send [msg] on [flow]. On failure, close [flow] (ignoring the outcome of
     the close) and return the original send error. *)
  let send msg =
    Conduit_lwt.send flow (Cstruct.of_string msg) >>= function
    | Ok _ -> Lwt.return (Ok ())
    | Error err ->
      Conduit_lwt.close flow >>= fun (_ : (unit, _) result) ->
      Lwt.return (Error err) in
  let rec go () =
    getline queue flow >>= function
    | Ok `End_of_flow | Error _ -> Conduit_lwt.close flow
    | Ok (`Line "ping") -> send "pong\n" >>? go
    | Ok (`Line "pong") -> send "ping\n" >>? go
    | Ok (`Line line) ->
      send (line ^ "\n") >>? fun () -> Conduit_lwt.close flow in
  go () >>= function
  | Error err -> Fmt.failwith "%a" Conduit_lwt.pp_error err
  | Ok () -> Lwt.return_unit
(** [server cfg ~protocol ~service] calls [Conduit_lwt.serve] with [service]
    and [cfg], running [handler] on every incoming flow after packing it with
    [protocol]. Returns whatever [Conduit_lwt.serve] returns. *)
let server cfg ~protocol ~service =
  let on_flow raw = handler (Conduit_lwt.pack protocol raw) in
  Conduit_lwt.serve ~handler:on_flow ~service cfg
let () =
  (* Runs once at module initialisation. NOTE(review): presumably sets up the
     Mirage_crypto random number generator needed by TLS; confirm. *)
  Mirage_crypto_rng_unix.initialize ()
(** [load_file filename] reads the whole content of [filename] and returns it
    as a [Cstruct.t].

    The channel is closed whether or not reading succeeds.

    @raise Sys_error if the file cannot be opened.
    @raise End_of_file if fewer bytes than the reported channel length can be
    read. *)
let load_file filename =
  let ic = open_in filename in
  Fun.protect
    ~finally:(fun () -> close_in_noerr ic)
    (fun () ->
      let ln = in_channel_length ic in
      let rs = Bytes.create ln in
      really_input ic rs 0 ln ;
      Cstruct.of_bytes rs)
(** [config cert key] builds a TLS server configuration from the PEM files at
    paths [cert] (one or more certificates) and [key] (a private key). Only
    an RSA key is accepted.

    @raise Failure with ["Invalid key or certificate"] when either file fails
    to decode or the key is not an RSA key. *)
let config cert key =
  let cert_pem = load_file cert in
  let key_pem = load_file key in
  let decoded =
    ( X509.Certificate.decode_pem_multiple cert_pem,
      X509.Private_key.decode_pem key_pem ) in
  match decoded with
  | Ok chain, Ok (`RSA priv) ->
    Tls.Config.server ~certificates:(`Single (chain, priv)) ()
  | _ -> failwith "Invalid key or certificate"
(** [fiber ~uri] starts the server described by [uri] and returns the promise
    obtained by running it.

    The listening address is the host of [uri] (["127.0.0.1"] when absent),
    which must be a textual IP address since it goes through
    [Unix.inet_addr_of_string]. The scheme selects the transport:
    - none or ["pg"]: plain TCP, port of [uri] or 8080;
    - ["pgs"]: TLS over TCP, port of [uri] or 4343, using the files
      ["server.pem"] and ["server.key"].

    @raise Invalid_argument on any other scheme. *)
let fiber ~uri =
  let host = Uri.host_with_default ~default:"127.0.0.1" uri in
  (* Port given in [uri], or [default] when there is none. *)
  let port_or default = Option.value ~default (Uri.port uri) in
  let tcp_cfg port =
    let sockaddr = Unix.ADDR_INET (Unix.inet_addr_of_string host, port) in
    { Conduit_lwt.TCP.sockaddr; capacity = 40 } in
  let _always, run =
    match Uri.scheme uri with
    | None | Some "pg" ->
      server
        (tcp_cfg (port_or 8080))
        ~protocol:Conduit_lwt.TCP.protocol ~service:Conduit_lwt.TCP.service
    | Some "pgs" ->
      let tls = config "server.pem" "server.key" in
      server
        (tcp_cfg (port_or 4343), tls)
        ~protocol:Conduit_lwt_tls.TCP.protocol
        ~service:Conduit_lwt_tls.TCP.service
    | Some scheme -> Fmt.invalid_arg "Invalid scheme: %s" scheme in
  run ()
(* Entry point: the first command-line argument is the URI describing where
   and how the server listens. Without it, print a usage message and exit
   with a non-zero status instead of failing with an out-of-bounds array
   access. *)
let () =
  if Array.length Sys.argv < 2 then begin
    Fmt.epr "usage: %s <uri>\n%!" Sys.executable_name ;
    exit 2
  end ;
  let uri = Uri.of_string Sys.argv.(1) in
  Lwt_main.run (fiber ~uri)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment