Erstellen eines MsQuic/Nghttp3-WebTransport-Servers (C++)

Programme in C++. Entwicklerforum
Anonymous
 Erstellen eines MsQuic/Nghttp3-WebTransport-Servers

Post by Anonymous »

Die Idee ist, dass meine Windows-App QUIC-Daten mit einem Browser austauscht.
Beim Erstellen eines Servers mit msquic und nghttp3 verstehe ich nicht, warum Chrome Folgendes meldet:

Verbindung zu https://host.com:4433/ konnte nicht hergestellt werden: net::ERR_METHOD_NOT_SUPPORTED („Diesen Fehler verstehen“)
(index):20 Fail: WebTransportError: Opening handshake failed.

Mit curl funktioniert es dagegen:

Code: Select all

curl --http3 -vvv https://host.com:4433/
Hier ist vereinfachter Code mit msquic und nghttp3:

Code: Select all

struct TOT_BUFFER : public QUIC_BUFFER
{
std::vector data;
void Load()
{
Length = (uint32_t)data.size();
Buffer = data.data();
}
TOT_BUFFER(QUIC_BUFFER* b = 0)
{
if (!b)
return;
Length = b->Length;
data.resize(Length);
memcpy(data.data(), b->Buffer, Length);
Buffer = data.data();
Load();
}
};
struct TOT_BUFFERS
{
std::vector buffers;
};

class Server;
class WebTransportSession;
/// Wrapper around one MsQuic stream handle that belongs to a WebTransportSession.
class QuicStream
{
public:
    HQUIC h = nullptr;                 // MsQuic stream handle
    long long id = 0;                  // QUIC stream id once known
    int Remote = 0;                    // set to 1 for streams started by the peer
    int Type = 0;                      // 0->Control 1->QPACK Encoder 2->QPACK Decoder 3->Datagram 4->Unidirectional App
    WebTransportSession* s = nullptr;  // owning session

    QuicStream(WebTransportSession* ss)
        : s(ss)
    {
    }

    /// MsQuic entry point: `Context` is the QuicStream registered for the
    /// stream; the event is forwarded to that instance.
    static QUIC_STATUS
    QUIC_API StreamCallback(
        _In_ HQUIC Stream,
        _In_opt_ void* Context,
        _Inout_ QUIC_STREAM_EVENT* Event
    )
    {
        auto* self = static_cast<QuicStream*>(Context);
        return self ? self->StreamCallback2(Stream, Event)
                    : QUIC_STATUS_INVALID_STATE;
    }

    /// Per-instance stream event handler (defined out of line).
    QUIC_STATUS
    StreamCallback2(
        _In_ HQUIC Stream,
        _Inout_ QUIC_STREAM_EVENT* Event
    );
};

class WebTransportSession
{
public:
const QUIC_API_TABLE* qt = 0;
HQUIC Connection = 0;
HQUIC Config = 0;
Server* srv = 0;

nghttp3_conn* Http3 = 0;
std::vector Streams; // 0->Control
bool Bound = false;
std::recursive_mutex mtx;

QUIC_STATUS FlushX()
{
for (;;)
{
wchar_t log[1000] = {};
nghttp3_vec vec[16] = {};
nghttp3_ssize nvecs_produced = 0;

int64_t stream_id = 0;
int fin = 0;
auto rv = nghttp3_conn_writev_stream(Http3, &stream_id, &fin, vec, 16);
nvecs_produced = rv; // Ο rv είναι το πλήθος των vecs
if (rv < 0)
return QUIC_STATUS_INTERNAL_ERROR;

if (stream_id id == stream_id)
{
h = strm->h;
break;
}
}
if (h == 0 && nvecs_produced > 0)
return QUIC_STATUS_INTERNAL_ERROR;

size_t tot_sent = 0;
if (nvecs_produced > 0) {

TOT_BUFFERS* tot = new TOT_BUFFERS();
for (int i = 0; i < nvecs_produced; ++i) {
TOT_BUFFER  b;
b.data.resize(vec[i].len);
memcpy(b.data.data(), vec[i].base, vec[i].len);
tot->buffers.emplace_back(b);
tot_sent += vec[i].len;

}
for (auto& t : tot->buffers)
t.Load();

swprintf_s(log, 1000, L"FlushX: stream_id=%lld, nvecs_produced=%lld, tot=%zi, fin=%d\n", stream_id, nvecs_produced, tot_sent, fin);
OutputDebugStringW(log);

auto flags = QUIC_SEND_FLAG_NONE;
if (fin)
flags |= QUIC_SEND_FLAG_FIN;
auto qs = qt->StreamSend(
h,
tot->buffers.data(),
(uint32_t)tot->buffers.size(),
flags,
(void*)tot
);
if (QUIC_FAILED(qs))
return QUIC_STATUS_INTERNAL_ERROR;
}

if (stream_id >= 0 &&  (tot_sent || fin))
{
rv = nghttp3_conn_add_write_offset(Http3, stream_id, tot_sent);
if (rv != 0)
return QUIC_STATUS_INTERNAL_ERROR;
}
if (stream_id < 0)
break;
}
return QUIC_STATUS_SUCCESS;
}

WebTransportSession(const QUIC_API_TABLE* _qt, HQUIC conn, HQUIC config,Server* ts)
: qt(_qt), Connection(conn), Config(config), srv(ts)
{
}

void Cleanup() {
if (Http3) {
nghttp3_conn_del(Http3);
Http3 = nullptr;
}
qt->ConnectionClose(Connection);
}

static int OnHeadersReceived(nghttp3_conn* conn, int64_t stream_id,
int32_t token, nghttp3_rcbuf* name,
nghttp3_rcbuf* value, uint8_t flags,
void* conn_user_data,
void* stream_user_data)
{

// Submit headers to your application logic here
WebTransportSession* session = (WebTransportSession*)conn_user_data;

nghttp3_nv resp[] = {
{(uint8_t*)":status", (uint8_t*)"200", 7, 3, NGHTTP3_NV_FLAG_NONE}
};

int rv = nghttp3_conn_submit_response(session->Http3, stream_id, resp, 1, nullptr);
session->FlushX();
return rv;
}

static int OnDataReceived(nghttp3_conn* conn, int64_t stream_id,
const uint8_t* data, size_t datalen,
void* conn_user_data, void* stream_user_data)
{
return 0;
}

static int OnSettingsReceived(nghttp3_conn* conn,
const nghttp3_settings* settings,
void* conn_user_data)
{
if (!conn || !settings || !conn_user_data)
return -5;

WebTransportSession* session = (WebTransportSession*)conn_user_data;
// Process settings as needed

return 0;
}

static int OnBeginHeaders(nghttp3_conn* conn, int64_t stream_id,
void* conn_user_data,
void* stream_user_data)
{
return 0;
}
static int OnEndStream(nghttp3_conn* conn, int64_t stream_id,
void* conn_user_data, void* stream_user_data)
{
return 0;
}

static int OnEndHeaders(nghttp3_conn* conn, int64_t stream_id,
int fin, void* conn_user_data,
void* stream_user_data)
{
return 0;
}

static int OnRecvOrigin(nghttp3_conn* conn, const uint8_t* origin,
size_t originlen, void* conn_user_data)
{
return 0;
}

static int OnAcked(nghttp3_conn* conn, int64_t stream_id,
uint64_t datalen, void* conn_user_data,
void* stream_user_data)
{
return 0;
}

static int OnResetStream(nghttp3_conn* conn, int64_t stream_id,
uint64_t app_error_code,
void* conn_user_data,
void* stream_user_data)
{
return 0;
}

QUIC_STATUS InitializeHttp3()
{
nghttp3_callbacks callbacks = {};
callbacks.recv_header = OnHeadersReceived;
callbacks.recv_data = OnDataReceived;
callbacks.recv_settings = OnSettingsReceived;
callbacks.begin_headers = OnBeginHeaders;
callbacks.end_stream = OnEndStream;
callbacks.end_headers = OnEndHeaders;
callbacks.recv_origin = OnRecvOrigin;
callbacks.acked_stream_data = OnAcked;
callbacks.reset_stream = OnResetStream;

nghttp3_settings settings = {};
nghttp3_settings_default(&settings);
settings.enable_connect_protocol = 1;
settings.h3_datagram = 1;
settings.qpack_blocked_streams = 100;
settings.max_field_section_size = 65536;

int rv = nghttp3_conn_server_new(&Http3, &callbacks, &settings, 0, (void*)this);
if (rv != 0)
return QUIC_STATUS_INTERNAL_ERROR;

return QUIC_STATUS_SUCCESS;
}

QUIC_STATUS CreateMyStreams()
{
// Open the control stream
for (int i = 0; i < 3;  i++)
{
HQUIC ControlStreamID = 0;
auto strm = std::make_shared(this);
auto qs = qt->StreamOpen(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, &QuicStream::StreamCallback, strm.get(), &ControlStreamID);
if (QUIC_FAILED(qs)) return qs;
qs = qt->StreamReceiveSetEnabled(ControlStreamID, TRUE);
strm->h = ControlStreamID;
strm->Type = i; // 0->Control 1->QPACK Encoder 2->QPACK Decoder
qs = qt->StreamStart(ControlStreamID, QUIC_STREAM_START_FLAG_NONE);

if (QUIC_FAILED(qs)) return qs;
Streams.push_back(strm);
}
return QUIC_STATUS_SUCCESS;
}

QUIC_STATUS QUIC_API ConnectionCallback2(
HQUIC Connection,
QUIC_CONNECTION_EVENT* Event
)
{
if (Event->Type == QUIC_CONNECTION_EVENT_DATAGRAM_STATE_CHANGED)
{
auto& ev2 = Event->DATAGRAM_STATE_CHANGED;
ev2;
return QUIC_STATUS_SUCCESS;
}
if (Event->Type == QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT)
{
auto Error = Event->SHUTDOWN_INITIATED_BY_TRANSPORT.Status;
// Τύπωσε το Error για να δεις τον λόγο (π.χ. TLS error)
printf("QUIC Connection Failed.  Error Status: 0x%X\n", Error);
return QUIC_STATUS_SUCCESS;
}
if (Event->Type == QUIC_CONNECTION_EVENT_CONNECTED)
{
// Connection is established
InitializeHttp3();

// My streams
if (Streams.size() == 0)
CreateMyStreams();

return QUIC_STATUS_SUCCESS;
}
if (Event->Type == QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED)
{
HQUIC Stream = Event->PEER_STREAM_STARTED.Stream;
// Find ID
int64_t stream_id = 0;
uint32_t bl = 8;
qt->GetParam(
Stream,
QUIC_PARAM_STREAM_ID,
&bl,
&stream_id
);

// enable auto-delivery
auto qs = qt->StreamReceiveSetEnabled(Stream, TRUE);

// Set stream callback here if needed
for (auto& s : Streams)
{
if (s->h == Stream)
return QUIC_STATUS_SUCCESS;
}
// Here, we create a new QuicStream instance to handle this stream
auto nn = std::make_shared(this);
nn->h = Stream;
nn->id = stream_id;
nn->Remote = 1;

Streams.push_back(nn);
qt->SetCallbackHandler(
Stream,
&QuicStream::StreamCallback,
nn.get()
);

return qs;
}

if (Event->Type == QUIC_CONNECTION_EVENT_DATAGRAM_SEND_STATE_CHANGED)
{
auto& recv = Event->DATAGRAM_SEND_STATE_CHANGED;
recv;
TOT_BUFFER* buf = (TOT_BUFFER*)recv.ClientContext;
if (buf && recv.State == QUIC_DATAGRAM_SEND_SENT)
delete buf;

return QUIC_STATUS_SUCCESS;
}

if (Event->Type == QUIC_CONNECTION_EVENT_DATAGRAM_RECEIVED)
{
auto& recv = Event->DATAGRAM_RECEIVED;
recv;

/*          // Send test reply with same data
TOT_BUFFER* buf = new TOT_BUFFER;
buf->data.resize(recv.Buffer->Length);
memcpy(buf->data.data(), recv.Buffer->Buffer, recv.Buffer->Length);
buf->Load();

// Send a reply
auto qs = qt->DatagramSend(
Connection,
buf,
1,
QUIC_SEND_FLAG_NONE,
buf
);
*/
return QUIC_STATUS_SUCCESS;
}
if (Event->Type == QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE)
{
// Cleanup after connection shutdown
return QUIC_STATUS_SUCCESS;
}
return QUIC_STATUS_SUCCESS;
}

static
QUIC_STATUS QUIC_API ConnectionCallback(
HQUIC Connection,
void* Context,
QUIC_CONNECTION_EVENT* Event
)
{
WebTransportSession* session = (WebTransportSession*)Context;
return session->ConnectionCallback2(Connection, Event);
}

};

/// Per-stream MsQuic event handler.
///
/// - RECEIVE: feeds the received buffers to nghttp3, then flushes whatever
///   nghttp3 wants to write in response.
/// - START_COMPLETE: records the stream id; once the first three session
///   streams all have ids, binds them as control/QPACK streams (once).
/// - SEND_COMPLETE: reports the completed byte count to nghttp3, frees the
///   send context and flushes again.
///
/// @return QUIC_STATUS_SUCCESS, the FlushX() result, or
///         QUIC_STATUS_INTERNAL_ERROR when nghttp3 rejects the input.
QUIC_STATUS
QuicStream::StreamCallback2(
    _In_ HQUIC Stream,
    _Inout_ QUIC_STREAM_EVENT* Event
)
{
    if (Event->Type == QUIC_STREAM_EVENT_RECEIVE)
    {
        const bool finished = (Event->RECEIVE.Flags & QUIC_RECEIVE_FLAG_FIN) != 0;
        const uint32_t count = Event->RECEIVE.BufferCount;

        // The FIN flag belongs to the event as a whole, so it may only be
        // passed on together with the LAST buffer; a FIN-only event (no
        // buffers) is forwarded as an empty read.
        const uint32_t rounds = (count == 0 && finished) ? 1 : count;
        for (uint32_t i = 0; i < rounds; ++i)
        {
            const uint8_t* bytes = nullptr;
            size_t length = 0;
            if (i < count)
            {
                const QUIC_BUFFER* Buffer = &Event->RECEIVE.Buffers[i];
                bytes = (const uint8_t*)Buffer->Buffer;
                length = Buffer->Length;
            }
            const int fin = (finished && i + 1 == rounds) ? 1 : 0;

            // Pass it to nghttp3
            auto rv = nghttp3_conn_read_stream(
                s->Http3,
                (int64_t)id,
                bytes,
                length, fin);
            if (rv < 0)
            {
                // Error - close connection
                s->qt->ConnectionClose(s->Connection);
                return QUIC_STATUS_INTERNAL_ERROR;
            }
        }

        return s->FlushX();
    }
    if (Event->Type == QUIC_STREAM_EVENT_START_COMPLETE)
    {
        // A stream that failed to start has no usable id.
        if (QUIC_FAILED(Event->START_COMPLETE.Status))
            return QUIC_STATUS_SUCCESS;

        id = Event->START_COMPLETE.ID;
        std::lock_guard<std::recursive_mutex> lock(s->mtx);
        // An id of 0 is used as "not started yet" for the three local streams.
        const bool all_ready = (s->Streams.size() >= 3 &&
            s->Streams[0]->id != 0 &&
            s->Streams[1]->id != 0 &&
            s->Streams[2]->id != 0);
        if (all_ready && !s->Bound)
        {
            s->Bound = true;
            int rv = nghttp3_conn_bind_control_stream(s->Http3, s->Streams[0]->id);
            if (rv < 0)
                return QUIC_STATUS_INTERNAL_ERROR;
            rv = nghttp3_conn_bind_qpack_streams(s->Http3, s->Streams[1]->id, s->Streams[2]->id);
            if (rv < 0)
                return QUIC_STATUS_INTERNAL_ERROR;
            // Flush what nghttp3 queued for the freshly bound streams.
            return s->FlushX();
        }
        return QUIC_STATUS_SUCCESS;
    }
    if (Event->Type == QUIC_STREAM_EVENT_SEND_COMPLETE)
    {
        TOT_BUFFERS* tot = (TOT_BUFFERS*)Event->SEND_COMPLETE.ClientContext;

        // Tell nghttp3 that these bytes were sent.
        uint64_t total_bytes_size = 0;
        if (tot)
        {
            for (auto& t : tot->buffers)
                total_bytes_size += t.Length;
        }
        int rv = nghttp3_conn_add_ack_offset(s->Http3, id, total_bytes_size);
        delete tot;
        if (rv != 0) {
            // On failure, close the connection.
            s->qt->ConnectionClose(s->Connection);
            return QUIC_STATUS_INTERNAL_ERROR;
        }
        nop();

        // Go again
        return s->FlushX();
    }
    return QUIC_STATUS_SUCCESS;
}

class Server
{
public:

int HttpPort = 58001;
int QuicPort = 4433;
const QUIC_API_TABLE* qt = {};
HQUIC hRegistration = nullptr;
HQUIC hConfiguration = nullptr;
HQUIC hListener = nullptr;
HRESULT hr = 0;
QUIC_STATUS qs = 0;
std::shared_ptr s1thread;
XSOCKET webs;

Server(int httpPort = 58001, int quicPort = 4433)
: HttpPort(httpPort), QuicPort(quicPort)
{
}

HRESULT CertificatePrepare()
{
EnsureDHTP();
if (!dhtp)
return E_FAIL;
auto ce = dhtp->RequestCertificate();
if (!std::get(ce))
return E_FAIL;
return S_OK;
}

HRESULT QuicPrepare()
{
hr = MsQuicOpen2(&qt);
if (FAILED(hr))
return hr;
QUIC_REGISTRATION_CONFIG config = {};
config.AppName = "TPMad";
config.ExecutionProfile = QUIC_EXECUTION_PROFILE_LOW_LATENCY;
qs = qt->RegistrationOpen(&config, &hRegistration);
if (FAILED(qs))
return qs;

QUIC_SETTINGS settings{};
settings.IsSet.PeerBidiStreamCount = TRUE;
settings.PeerBidiStreamCount = 16;

settings.IsSet.PeerUnidiStreamCount = TRUE;
settings.PeerUnidiStreamCount = 16;

settings.IsSet.IdleTimeoutMs = TRUE;
settings.IdleTimeoutMs = 60000;

settings.IsSet.DatagramReceiveEnabled = TRUE;
settings.DatagramReceiveEnabled = TRUE;

const char* ALPN_HTTP3 = "h3";
const QUIC_BUFFER AlpnBuffers[] = {
{(uint32_t)strlen(ALPN_HTTP3), (uint8_t*)ALPN_HTTP3}
};
qs = qt->ConfigurationOpen(
hRegistration,
AlpnBuffers,
ARRAYSIZE(AlpnBuffers),
&settings,
sizeof(settings),
nullptr,
&hConfiguration);

if (FAILED(qs))
return qs;
auto cs = dhtp->RequestCertificate();  // returns std::tuple
QUIC_CREDENTIAL_CONFIG credConfig = {};
credConfig.Type = QUIC_CREDENTIAL_TYPE_CERTIFICATE_CONTEXT;
credConfig.Flags = QUIC_CREDENTIAL_FLAG_NONE;
credConfig.CertificateContext = (QUIC_CERTIFICATE*)std::get(cs);

/*
auto pfx_data = dhtp->pfx_received;
credConfig.Type = QUIC_CREDENTIAL_TYPE_CERTIFICATE_PKCS12;
credConfig.CertificatePkcs12->Asn1Blob = (uint8_t*)pfx_data.data();
credConfig.CertificatePkcs12->Asn1BlobLength = (uint32_t)pfx_data.size();
credConfig.CertificatePkcs12->PrivateKeyPassword = "12345678";
*/

qs = qt->ConfigurationLoadCredential(hConfiguration, &credConfig);
if (FAILED(qs))
return qs;
qs = qt->ListenerOpen(hRegistration,
[](_In_ HQUIC Listener,
_In_opt_ void* Context,
_Inout_ QUIC_LISTENER_EVENT* Event
) -> QUIC_STATUS
{
auto pThis = (Server*)Context;
return pThis->ListenerCallback(Event);
},
this, &hListener);
if (FAILED(qs))
return qs;

QUIC_ADDR LocalAddress = {};
QuicAddrSetFamily(&LocalAddress, QUIC_ADDRESS_FAMILY_INET); // IPv4
QuicAddrSetPort(&LocalAddress, QuicPort);

qs = qt->ListenerStart(
hListener,
AlpnBuffers,
ARRAYSIZE(AlpnBuffers),
&LocalAddress
);

if (FAILED(qs))
return qs;

return hr;
}

const char* raw_html = (const char*)u8R"(


const SERVER_URL = "https://lan1.users.turbo-play.com:%i/jack";
let transport = null;

async function startWebTransport() {
console.log("Trying WebTransport...");

try {
transport = new WebTransport(SERVER_URL);

await transport.ready;
console.log("OK!");

// 3. Ξεκινάμε τις ροές δεδομένων
handleDataStreams(transport);

} catch (e) {
console.error("Fail:", e);
}
}

startWebTransport(); // Κάλεσε το για να ξεκινήσει
async function handleDataStreams(transport) {
// ----------------------------------------------------
// A. ΛΗΨΗ DATAGRAMS (Από C++ Server προς Browser)
// ----------------------------------------------------
const reader = transport.datagrams.readable.getReader();

// Η συνάρτηση DataServe του C++ σου στέλνει data εδώ!
receiveDatagrams(reader);

// ----------------------------------------------------
// B. ΑΠΟΣΤΟΛΗ DATAGRAMS (Από Browser προς C++ Server)
// ----------------------------------------------------
// Αν θες να στείλεις δεδομένα πίσω (π.χ. Client audio)
const writer = transport.datagrams.writable.getWriter();

// Παράδειγμα: Στέλνει ένα μήνυμα "Hi"
const dataToSend = new TextEncoder().encode("Hello from Browser");
await writer.write(dataToSend);
console.log("Έστειλε Datagram στον Server.");
}

async function receiveDatagrams(reader) {
try {
while (true) {
const { value, done } = await reader.read();
if (done) break;

// Το value είναι ένα Uint8Array.
// value[0] = Channel ID (0, 1, 2, ...)
// value.slice(1) = Raw PCM Float32 data

const channelId = value[0];
const pcmBytes = value.slice(1);

// Μετατροπή των bytes σε Float32Array για το Web Audio API
const floatData = new Float32Array(pcmBytes.buffer, pcmBytes.byteOffset, pcmBytes.byteLength / 4);

// console.log(`Λήψη ${floatData.length} samples για κανάλι ${channelId}.`);
// Εδώ τροφοδοτείς τον Web Audio API κόμβο σου (π.χ.  AudioWorklet)
}
} catch (e) {
}
}


)";

void HttpServerAThread(SOCKET y)
{
XSOCKET Y = y;
Y.SetSSL(true, 0);
auto c = dhtp->RequestCertificate();
Y.SetExtCert(std::get(c));
if (Y.Server() != 0)
return;
char buffer[4096] = {};
for (;;)
{
int r = Y.receive(buffer, 4096);
if (r == 0 || r == -1)
break;
std::string req(buffer, r);
std::string response = "HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nContent-Length: ";
std::vector raw_html_filled;
raw_html_filled.resize(100000);
sprintf_s(raw_html_filled.data(), raw_html_filled.size(), raw_html, QuicPort);
response += std::to_string(strlen(raw_html_filled.data()));
response += "\r\nConnection: close\r\n\r\n";
response += raw_html_filled.data();
Y.transmit(response.data(), (int)response.size(), true);
break;
}

}

void HttpServerStart()
{
s1thread = std::make_shared(&Server::HttpServerStartThread, this);
}

void HttpServerStartThread()
{
webs.Create(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
if (!webs.BindAndListen(HttpPort))
{
webs.CloseIf();
return;
}

for (;;)
{
SOCKET y = webs.Accept();
if (y == INVALID_SOCKET || y == 0)
break;
std::thread t(&Server::HttpServerAThread, this, y);
t.detach();
}
}

std::vector Sessions;

QUIC_STATUS ListenerCallback(QUIC_LISTENER_EVENT* Event)
{
if (Event->Type == QUIC_LISTENER_EVENT_NEW_CONNECTION) {

HQUIC NewConnection = Event->NEW_CONNECTION.Connection;
HRESULT qs;

qs = qt->ConnectionSetConfiguration(NewConnection, hConfiguration);
if (FAILED(qs)) {
qt->ConnectionClose(NewConnection);
return qs;
}

WebTransportSession* Session = new WebTransportSession(qt, NewConnection, hConfiguration,this);
Sessions.push_back(Session);
qt->SetCallbackHandler(
NewConnection,
(void*)&WebTransportSession::ConnectionCallback,
(void*)Session
);
return QUIC_STATUS_SUCCESS;

}

return QUIC_STATUS_NOT_SUPPORTED;

}

void QuicEnd()
{
if (hListener)
{
qt->ListenerClose(hListener);
hListener = 0;
}
if (hConfiguration)
{
qt->ConfigurationClose(hConfiguration);
hConfiguration = 0;
}
if (hRegistration)
{
qt->RegistrationClose(hRegistration);
hRegistration = 0;
}
if (qt)
MsQuicClose(qt);
qt = 0;
}

void Off()
{
webs.CloseIf();
if (s1thread)
{
s1thread->join();
s1thread = 0;
}
QuicEnd();

}

~Server()
{
Off();
}

};

/// Manual test entry point: prepares the certificate, starts the HTTPS and
/// QUIC servers and keeps them alive until the message box is dismissed.
/// @return the first failing HRESULT, or S_OK.
STDAPI Test1()
{
    Server s;

    // Without a certificate neither server can work, so stop right away
    // instead of continuing and reporting success.
    HRESULT hr = s.CertificatePrepare();
    if (FAILED(hr))
        return hr;

    s.HttpServerStart();

    hr = s.QuicPrepare();
    if (FAILED(hr))
        return hr; // ~Server() stops the HTTPS thread again

    // Blocks until the box is closed; the servers run in the meantime.
    MessageBox(0, 0, 0, 0);
    //  s.Sessions[0]->FlushX();
    //MessageBox(0, 0, 0, 0);
    return S_OK;
}
Alle Callbacks werden erfolgreich aufgerufen, und auch die gesamte Stream-Erstellung gelingt. Mit curl erhalte ich einen „Received Header“. In Chrome und Firefox bleibt WebTransport jedoch hängen.
Offensichtlich übersehe ich etwas. Stellen msquic oder nghttp3 WebTransport automatisch bereit, oder fehlt mir noch ein Teil der Implementierung?
Es ist komplex, aber vielleicht ist jemand schon denselben Weg gegangen.

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post