Ich habe einen Server, der eine Protobuf-Nachricht mit einer zufälligen Koordinate in Deutschland entgegennimmt und die Antwort (den Bezirksnamen) an den Client zurückgibt. Dafür verwende ich meine eigenen Geodaten usw. Da ein einzelner Thread für die Standortverarbeitung nicht ausreicht, habe ich beschlossen, daraus eine Art Producer-Consumer-Multithreading zu machen, d. h. die Standortverarbeitung läuft auf mehreren Threads:
Connection.cc
// Worker loop executed by each consumer thread: calls Client::updateBufferSize(config)
// once, then repeatedly takes one (client, message) pair off sharedQueue, processes the
// location request and writes the response through that client.
// NOTE(review): this listing is damaged. On the two `std::cerr` lines below the text that
// followed `std::cerr` is missing, so the remainder of consumer() and the beginning of the
// server start-up function (presumably main — confirm against the real Connection.cc) are
// not present. Restore the code from the original file before building.
void consumer(GeoData &geodata, Statistic &globalStats, ClientConfig &config) {
Client::updateBufferSize(config);
while (true) {
// Directly dequeue a single message from the shared queue
auto [client, msg] = sharedQueue.dequeue();
if (!client) {
// NOTE(review): text lost after `std::cerr`; the null-client branch and the declaration
// of `resp` are not visible.
std::cerr processLocationUpdates(msg.location_request, resp.location_response);
// Send the response back to the client
// NOTE(review): writeMessage() runs on a consumer thread here, while Client declares its
// write buffer as `static thread_local`, i.e. one buffer per thread rather than per
// connection. Responses for different clients handled by the same consumer presumably
// land in the same buffer — verify in writeMessage()/flushWriteBuffer(); that would match
// the reported symptom of clients receiving responses meant for someone else.
if (!client->writeMessage(resp)) {
// NOTE(review): text lost after `std::cerr`; what follows is the tail of a socket(...)
// call that belongs to the server start-up function.
std::cerr ai_family, res->ai_socktype, res->ai_protocol);
if (sock == -1) {
perror("socket");
return 1;
}
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(int)) == -1) {
perror("setsockopt");
return 1;
}
// Bind to the address
if (bind(sock, res->ai_addr, res->ai_addrlen) == -1) {
perror("bind");
// NOTE(review): returns 0 here although the socket/setsockopt failures above return 1 —
// confirm the intended exit code. `res` is also not freed on this path.
return 0;
}
// The address list is only needed for socket()/bind(); release it before listening.
freeaddrinfo(res);
if (listen(sock, BACKLOG) == -1) {
perror("listen");
// NOTE(review): same exit-code inconsistency as the bind failure above.
return 0;
}
GeoData geodata;
if (!geodata.loadData("geodata/build/bezirke-final.geodata", nodeCapacity)) {
die("Could not load geodata");
}
// stats thread
// `thread` is declared in the part of the listing that is missing.
Statistic stats;
if (pthread_create(&thread, NULL, statistic_thread, &stats) != 0) {
die("Failed to create statistics thread");
}
// Ignore SIGPIPE so that writing to a socket whose peer has gone away reports an error
// from the write call instead of terminating the process.
signal(SIGPIPE, SIG_IGN);
// pool of consumer threads
const int numConsumers = 4; // we can play around with this parameter of num of consumers
// NOTE(review): the element type of this vector is missing in the listing (the join loop
// below suggests std::thread — confirm).
std::vector consumerThreads;
for (int i = 0; i < numConsumers; ++i) {
consumerThreads.emplace_back(consumer, std::ref(geodata), std::ref(stats), std::ref(clientConfig));
}
// Accept incoming connections and create producer threads
while (true) {
socklen_t size = sizeof(struct sockaddr_in);
struct sockaddr_in their_addr;
int client_fd = accept(sock, (struct sockaddr*)&their_addr, &size);
if (client_fd == -1) {
perror("accept");
continue; // Skip to the next connection attempt on failure
}
// Create a Client object for the new connection
// The Client is constructed on this (accepting) thread and handed to connection_thread as
// a raw pointer; presumably that thread takes ownership and deletes it — confirm.
Client* client = new Client(&geodata, client_fd, clientConfig);
client->setStatistic(&stats);
if (pthread_create(&thread, NULL, connection_thread, client) != 0) {
fprintf(stderr, "Failed to create producer thread\n");
delete client; // Cleanup client object on failure
}
}
// Wait for all consumer threads to finish (will not happen in this loop)
// The accept loop above has no break, so the join loop, close() and return below are
// unreachable as written.
for (auto& thread : consumerThreads) {
thread.join();
}
close(sock); // Close the listening socket when the server shuts down
return 0;
}
#pragma once

#include <unistd.h>

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>

#include "geodata.h"
#include "../proto/message.h"
/**
 * @brief Counters for processed messages and cache lookups.
 *
 * Each counter is a separate atomic, so it can be incremented from several
 * threads without an additional lock. The counters are independent of each
 * other: reading all three does not yield a consistent snapshot.
 *
 * NOTE(review): the template argument was missing in the listing; a 64-bit
 * unsigned counter is assumed here — confirm against the code that reads
 * these values (e.g. the statistics thread).
 */
struct Statistic {
    std::atomic<std::uint64_t> countMsgs{0};        ///< Number of messages counted.
    std::atomic<std::uint64_t> countCacheHits{0};   ///< Number of cache hits counted.
    std::atomic<std::uint64_t> countCacheMisses{0}; ///< Number of cache misses counted.
};
/**
 * @brief Structure to hold the client configuration.
 *
 * The three sizes are used by Client to size its write buffer, its raw
 * read-bytes buffer and its read buffer. All members default to 0 so that a
 * default-constructed configuration never carries indeterminate values;
 * aggregate initialization (`ClientConfig{w, rb, r}`) keeps working.
 */
struct ClientConfig {
    size_t write_buffer_size{0};      ///< Size used for the write buffer.
    size_t read_bytes_buffer_size{0}; ///< Size used for the read-bytes buffer.
    size_t read_buffer_size{0};       ///< Size used for the read buffer.
};
/**
 * @brief Thread-safe FIFO queue for multiple producers and consumers.
 *
 * All access to the underlying std::queue is serialized by one mutex; a
 * condition variable lets consumers sleep until an item is available.
 *
 * @tparam T Element type; must be movable.
 */
template <typename T>
class ThreadSafeQueue {
private:
    std::queue<T> queue;
    mutable std::mutex mutex;   // mutable so that isEmpty() can be const
    std::condition_variable cv;

public:
    /**
     * @brief Appends an item and wakes one waiting consumer.
     *
     * @param item The element to enqueue; it is moved into the queue.
     */
    void enqueue(T item) {
        {
            std::lock_guard<std::mutex> lock(mutex);
            queue.push(std::move(item));
        }
        // Notify after releasing the lock so the woken consumer does not
        // immediately block on the mutex.
        cv.notify_one();
    }

    /**
     * @brief Removes and returns the oldest item, blocking while the queue is empty.
     *
     * @return The element that was at the front of the queue.
     */
    T dequeue() {
        std::unique_lock<std::mutex> lock(mutex);
        // The predicate form re-checks after every wake-up, so spurious
        // wake-ups are harmless.
        cv.wait(lock, [this] { return !queue.empty(); });
        T item = std::move(queue.front());
        queue.pop();
        return item;
    }

    /**
     * @brief Reports whether the queue held no items at the moment of the call.
     *
     * The answer may be outdated as soon as the lock is released.
     *
     * @return true if the queue was empty, false otherwise.
     */
    bool isEmpty() const {
        std::lock_guard<std::mutex> lock(mutex);
        return queue.empty();
    }
};
/**
 * @class Client
 * @brief Represents a client connection and manages the communication between the server and client.
 *
 * The Client class handles receiving messages from the client, processing location updates,
 * and sending responses back to the client. It also uses GEOS (Geometry Engine Open Source)
 * for location-based queries and maintains statistics about message processing.
 *
 * NOTE(review): the read/write buffers and their offsets below are `static thread_local`,
 * so they exist once per thread, not once per Client. When a single thread serves several
 * connections (as the consumer threads do), all of those connections share one write
 * buffer — presumably the cause of responses reaching the wrong client; confirm in
 * writeMessage()/flushWriteBuffer().
 *
 * NOTE(review): the destructor closes the socket and releases the GEOS context, but copy
 * construction/assignment are not disabled; copying a Client would release both twice.
 */
class Client {
int client_fd; ///< The file descriptor for the client connection.
GeoData *geo; ///< A pointer to GeoData used for querying locations.
Statistic *stats = nullptr; ///< A pointer to the statistics object for tracking processed messages.
GEOSContextHandle_t geos_ctx; ///< GEOS context obtained from geo->newContext(); released in the destructor.
ClientConfig &config; ///< Reference to the configuration passed to the constructor (not owned).
// Per-thread buffers. NOTE(review): the vector element type is missing in this listing
// (read_bytes() takes a char*, so char is a plausible guess — confirm).
static thread_local std::vector write_buffer;
static thread_local std::vector read_bytes_buffer;
static thread_local std::vector read_buffer;
// Per-thread bookkeeping for the buffers above; exact meaning is defined in the .cc file.
static thread_local size_t write_buffer_offset;
static thread_local size_t read_bytes_buffer_offset;
static thread_local unsigned short write_buffer_repeat;
public:
/**
 * @brief Constructor to initialize the Client object.
 *
 * Initializes the client object with the socket file descriptor and GeoData.
 * A new GEOS context is created for geospatial operations. Any of the calling
 * thread's buffers that is still empty is resized to the configured size; buffers
 * of other threads are not touched.
 *
 * @param geo Pointer to the GeoData object used for querying location data.
 * @param socket The socket file descriptor for client communication.
 * @param config Configuration providing the buffer sizes; stored by reference.
 */
Client(GeoData *geo, int socket, ClientConfig &config) : client_fd(socket), geo(geo), config(config) {
geos_ctx = geo->newContext();
// The buffers are thread_local: only the constructing thread's instances are sized here.
if(write_buffer.empty()) {
write_buffer.resize(config.write_buffer_size);
}
if(read_bytes_buffer.empty()) {
read_bytes_buffer.resize(config.read_bytes_buffer_size);
}
if(read_buffer.empty()) {
read_buffer.resize(config.read_buffer_size);
}
}
/**
 * @brief Destructor to clean up resources.
 *
 * Calls flushWriteBuffer() (its result is ignored), then closes the client socket
 * and cleans up the GEOS context.
 */
~Client() {
flushWriteBuffer();
close(client_fd);
GEOS_finish_r(geos_ctx);
}
/**
 * @brief Set the statistics object to track message count.
 *
 * Associates a Statistic object with the client to record the number of messages processed.
 *
 * @param stats Pointer to a Statistic object.
 */
void setStatistic(Statistic *stats) { this->stats = stats; }
/**
 * @brief Reads a message from the client socket.
 *
 * Reads a `wire::ClientMessage` from the client. The message is prefixed with
 * a 4-byte length header indicating the size of the message.
 *
 * @param msg Reference to the `wire::ClientMessage` object to be populated with data.
 * @return true if the message was successfully read, false otherwise.
 */
bool readMessage(ClientMessage &msg);
/**
 * @brief Reads a specified number of bytes from the client socket into the provided buffer.
 *
 * Per the original documentation this uses a thread-local buffer to minimize repeated
 * allocations; the implementation is not part of this header.
 *
 * @param client_fd The file descriptor to read from.
 * @param buf The buffer to copy data into.
 * @param len The number of bytes to read.
 * @return true if the specified number of bytes was successfully read, false otherwise.
 */
static bool read_bytes(int client_fd, char *buf, size_t len);
/**
 * @brief Sends a response message to the client socket.
 *
 * Buffers the `wire::ClientResponse` to be sent, flushing only when the buffer is full.
 *
 * @param msg The `wire::ClientResponse` to send.
 * @return true if the message was successfully buffered or sent, false otherwise.
 */
bool writeMessage(ClientResponse &msg);
/**
 * @brief Flushes the write buffer to the client socket.
 *
 * Ensures all buffered messages are sent to the client.
 *
 * @return true if the buffer was successfully flushed, false otherwise.
 */
bool flushWriteBuffer();
/**
 * @brief Processes a location update request.
 *
 * This method processes the location data contained in the `wire::LocationUpdateRequest`,
 * performs a geospatial query based on longitude and latitude, and populates the
 * `wire::LocationUpdateResponse` with the results.
 *
 * @param msg The `wire::LocationUpdateRequest` containing the car's location data.
 * @param resp The `wire::LocationUpdateResponse` to populate with the result of the query.
 * @return true if the location update was successfully processed, false otherwise.
 */
bool processLocationUpdates(const LocationUpdateRequest &msg,
LocationUpdateResponse &resp);
/**
 * @brief Resize read and write buffers.
 *
 * This method resizes the read and write buffers. It is static and the buffers are
 * thread_local, so presumably it affects the calling thread's buffers only — confirm
 * in the implementation.
 *
 * @param config Configuration holding the write, read-bytes and read buffer sizes.
 */
static void updateBufferSize(ClientConfig &config);
/**
 * @brief Cache lookup for a location request.
 *
 * NOTE(review): implementation not visible here; presumably fills @p resp and returns
 * true when the request can be answered from a cache — confirm.
 *
 * @param msg The location request to look up.
 * @param resp The response object that may be populated.
 */
bool isCacheHit(const LocationUpdateRequest &msg, LocationUpdateResponse &resp);
/**
 * @brief Geospatial lookup for a location request.
 *
 * NOTE(review): implementation not visible here; presumably performs the actual query
 * and populates @p resp — confirm.
 *
 * @param msg The location request to resolve.
 * @param resp The response object that may be populated.
 */
bool queryLocation(const LocationUpdateRequest &msg, LocationUpdateResponse &resp);
/**
 * @brief Accessor for the client's configuration.
 *
 * @return A reference to a ClientConfig (presumably the one passed to the constructor).
 */
ClientConfig & getConfig();
};
/**
 * @brief Function to process client messages using the consumer role.
 *
 * This function is run in worker threads to handle messages from the shared queue.
 * It dequeues messages, processes location updates, and sends responses back to clients.
 *
 * @param geodata Reference to the GeoData object for geospatial queries.
 * @param stats Reference to the statistics object to track processed messages.
 * @param config Reference to the client configuration; the definition passes it to
 *               Client::updateBufferSize() before entering the processing loop.
 */
void consumer(GeoData& geodata, Statistic& stats, ClientConfig& config);
Das Problem besteht darin, dass die Antworten nicht korrekt zugestellt werden, sobald ich mehrere Client-Instanzen verwende (mehr Threads und Verbindungen auf Client-Seite). Einige Client-Instanzen haben Fehler protokolliert, z. B. dass der Client eine Antwort für ein Auto erhalten hat, das gar nicht auf eine Antwort gewartet hat.
Ich habe einen Servercode, der im Grunde eine Protobuf-Nachricht einer zufälligen Koordinate in Deutschland entgegennimmt und die Antwort (Bezirksname) an den Client zurückgibt. Ich habe dafür meine eigenen Geodaten usw. verwendet. Da die Standortverarbeitung für einen Thread nicht ausreicht, habe ich beschlossen, daraus eine Art Producer-Consumer-Multithreading zu machen, das heißt, dass die Standortverarbeitung mehrere Threads verwendet: Connection.cc [code]void consumer(GeoData &geodata, Statistic &globalStats, ClientConfig &config) { Client::updateBufferSize(config);
while (true) { // Directly dequeue a single message from the shared queue auto [client, msg] = sharedQueue.dequeue();
if (!client) { std::cerr processLocationUpdates(msg.location_request, resp.location_response);
// Send the response back to the client if (!client->writeMessage(resp)) { std::cerr ai_family, res->ai_socktype, res->ai_protocol); if (sock == -1) { perror("socket"); return 1; }
// Bind to the address if (bind(sock, res->ai_addr, res->ai_addrlen) == -1) { perror("bind"); return 0; }
freeaddrinfo(res);
if (listen(sock, BACKLOG) == -1) { perror("listen"); return 0; }
GeoData geodata; if (!geodata.loadData("geodata/build/bezirke-final.geodata", nodeCapacity)) { die("Could not load geodata"); }
// stats thread Statistic stats; if (pthread_create(&thread, NULL, statistic_thread, &stats) != 0) { die("Failed to create statistics thread"); }
signal(SIGPIPE, SIG_IGN);
// pool of consumer threads const int numConsumers = 4; // we can play around with this parameter of num of consumers std::vector consumerThreads; for (int i = 0; i < numConsumers; ++i) { consumerThreads.emplace_back(consumer, std::ref(geodata), std::ref(stats), std::ref(clientConfig)); }
// Accept incoming connections and create producer threads while (true) { socklen_t size = sizeof(struct sockaddr_in); struct sockaddr_in their_addr; int client_fd = accept(sock, (struct sockaddr*)&their_addr, &size); if (client_fd == -1) { perror("accept"); continue; // Skip to the next connection attempt on failure }
// Create a Client object for the new connection Client* client = new Client(&geodata, client_fd, clientConfig); client->setStatistic(&stats);
if (pthread_create(&thread, NULL, connection_thread, client) != 0) { fprintf(stderr, "Failed to create producer thread\n"); delete client; // Cleanup client object on failure } }
// Wait for all consumer threads to finish (will not happen in this loop) for (auto& thread : consumerThreads) { thread.join(); }
close(sock); // Close the listening socket when the server shuts down return 0; }
/** * @brief Structure to hold statistics about the number of messages processed. */ struct Statistic { std::atomic countMsgs{0}; std::atomic countCacheHits{0}; std::atomic countCacheMisses{0}; };
/** * @brief Structure to hold the client configuarations. */ struct ClientConfig { size_t write_buffer_size; size_t read_bytes_buffer_size; size_t read_buffer_size; };
/** * @brief Thread-safe queue for multiple producers and consumers. * * Provides synchronized access to a queue for multiple producers and consumers. */ template class ThreadSafeQueue { private: std::queue queue; std::mutex mutex; std::condition_variable cv;
/** * @class Client * @brief Represents a client connection and manages the communication between the server and client. * * The Client class handles receiving messages from the client, processing location updates, * and sending responses back to the client. It also uses GEOS (Geometry Engine Open Source) * for location-based queries and maintains statistics about message processing. */ class Client { int client_fd; ///< The file descriptor for the client connection. GeoData *geo; ///< A pointer to GeoData used for querying locations. Statistic *stats = nullptr; ///< A pointer to the statistics object for tracking processed messages.
public: /** * @brief Constructor to initialize the Client object. * * Initializes the client object with the socket file descriptor and GeoData. * A new GEOS context is created for geospatial operations. * * @param geo Pointer to the GeoData object used for querying location data. * @param socket The socket file descriptor for client communication. */ Client(GeoData *geo, int socket, ClientConfig &config) : client_fd(socket), geo(geo), config(config) { geos_ctx = geo->newContext();
/** * @brief Destructor to clean up resources. * * Closes the client socket and cleans up the GEOS context. */ ~Client() { flushWriteBuffer(); close(client_fd); GEOS_finish_r(geos_ctx); }
/** * @brief Set the statistics object to track message count. * * Associates a Statistic object with the client to record the number of messages processed. * * @param stats Pointer to a Statistic object. */ void setStatistic(Statistic *stats) { this->stats = stats; }
/** * @brief Reads a message from the client socket. * * Reads a `wire::ClientMessage` from the client. The message is prefixed with * a 4-byte length header indicating the size of the message. * * @param msg Reference to the `wire::ClientMessage` object to be populated with data. * @return true if the message was successfully read, false otherwise. */ bool readMessage(ClientMessage &msg);
/** * @brief Reads a specified number of bytes from the client socket into the provided buffer. * * This function now uses a thread-local buffer to minimize repeated allocations. * * @param client_fd The file descriptor to read from. * @param buffer The buffer to copy data into. * @param len The number of bytes to read. * @return true if the specified number of bytes was successfully read, false otherwise. */ static bool read_bytes(int client_fd, char *buf, size_t len);
/** * @brief Sends a response message to the client socket. * * Buffers the `wire::ClientResponse` to be sent, flushing only when the buffer is full. * * @param msg The `wire::ClientResponse` to send. * @return true if the message was successfully buffered or sent, false otherwise. */ bool writeMessage(ClientResponse &msg);
/** * @brief Flushes the write buffer to the client socket. * * Ensures all buffered messages are sent to the client. * * @return true if the buffer was successfully flushed, false otherwise. */ bool flushWriteBuffer();
/** * @brief Processes a location update request. * * This method processes the location data contained in the `wire::LocationUpdateRequest`, * performs a geospatial query based on longitude and latitude, and populates the * `wire::LocationUpdateResponse` with the results. * * @param msg The `wire::LocationUpdateRequest` containing the car's location data. * @param resp The `wire::LocationUpdateResponse` to populate with the result of the query. * @return true if the location update was successfully processed, false otherwise. */ bool processLocationUpdates(const LocationUpdateRequest &msg, LocationUpdateResponse &resp);
/** * @brief Resize read and write buffers. * * This method resizes the read and write buffers. * * @param write_size The write buffer size. * @param read_bytes_size The read bytes buffer size. * @param read_size The read buffer size. */ static void updateBufferSize(ClientConfig &config);
/** * @brief Function to process client messages using the consumer role. * * This function is run in worker threads to handle messages from the shared queue. * It dequeues messages, processes location updates, and sends responses back to clients. * * @param geodata Reference to the GeoData object for geospatial queries. * @param stats Reference to the statistics object to track processed messages. */ void consumer(GeoData& geodata, Statistic& stats, ClientConfig& config); [/code] Das Problem besteht darin, dass die Nachrichten irgendwie nicht korrekt zurückgegeben werden, wenn ich mehrere Client-Instanzen verwende (mehr Threads und Verbindungen vom Client). Einige Client-Instanzen haben Fehler protokolliert, z. B. dass der Client eine Antwort für ein Auto erhalten hat, das nicht auf eine Antwort gewartet hat.
Ich versuche, eine Java-Funktion dazu zu bringen, Eingaben entgegenzunehmen, sie zu ändern und zurückzugeben. Normalerweise würde man dafür `return varName;` oder Ähnliches schreiben, aber hier funktioniert es nicht …
Ich versuche, eine Java-Funktion dazu zu bringen, Eingaben entgegenzunehmen, sie zu ändern und zurückzugeben. Normalerweise würde man dafür `return varName;` oder Ähnliches schreiben, aber hier funktioniert es nicht …
Wie erhalte ich den richtigen Auswahlwert in PHP? Ich meine, wie stelle ich die Auswahl ein, wenn ich den Wert erhalten habe?
Ich habe versucht, diese Methoden zu verwenden, aber immer noch nicht...
Ich versuche, in Spring einen Server zu erstellen, der einen TCP-Port überwacht und Verbindungen akzeptiert.
Ich weiß, wie ich eingehende Anfragen an meinen Dienst weiterleiten kann, und er kann...