Anonymous
Warum reagiert Apache Cassandra nicht auf Startmeldungen?
Post
by Anonymous » 20 Feb 2025, 22:22
Ich möchte eine kleine Treiberanpassung für Apache Cassandra vornehmen, um zu verstehen, wie die Interaktion auf der Ebene mit niedriger Anwendung im Allgemeinen funktioniert. Ich verwende IO.NETTY dafür, aber aus irgendeinem Grund, der mir unbekannt ist, sendet der Server keine Fehlermeldung oder Autorisierungsanforderung zurück. Vielen Dank.
Code: Select all
public class NativeCQLConnection extends ChannelInboundHandlerAdapter implements Runnable
{
private final Bootstrap client;
private final NioEventLoopGroup group;
private static final String HOST = "127.0.0.1";
private static final int PORT = 9042;
private final Initializer initializer;
private final Bootstrap handler;
public NativeCQLConnection()
{
this.client = new Bootstrap();
this.group = new NioEventLoopGroup();
this.initializer = new Initializer(this, "cassandra", "cassandra");
this.handler = this.client.group(this.group).channel(NioSocketChannel.class).handler(this.initializer);
}
public static void main(String[] args)
{
new NativeCQLConnection().run();
}
@Override
public void run()
{
handler.connect(HOST, PORT);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
{
ByteBuf startupMessage = this.initializer.createStartupMessage();
ctx.writeAndFlush(startupMessage);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception
{
this.group.shutdownGracefully();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
{
cause.printStackTrace();
}
private static final class Initializer extends ChannelInitializer
{
private final ChannelHandler handler;
private final byte[] username;
private final byte[] password;
public Initializer(ChannelHandler handler, String username, String password)
{
this.handler = handler;
this.username = username.getBytes(StandardCharsets.UTF_8);
this.password = password.getBytes(StandardCharsets.UTF_8);
}
@Override
protected void initChannel(SocketChannel channel) throws Exception
{
channel.pipeline().addLast(new MessageDecoder(this));
channel.pipeline().addLast(new MessageEncoder(this));
channel.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
channel.pipeline().addLast(this.handler);
}
private static final String CQL_VERSION_OPTION = "CQL_VERSION";
private static final String CQL_VERSION = "3.0.0";
private static final String DRIVER_VERSION_OPTION = "DRIVER_VERSION";
private static final String DRIVER_NAME_OPTION = "DRIVER_NAME";
private static final String DRIVER_NAME = "Apache Cassandra Java Driver";
static final String COMPRESSION_OPTION = "COMPRESSION";
static final String NO_COMPACT_OPTION = "NO_COMPACT";
public ByteBuf createAuthResponse(ChannelHandlerContext ctx) {
byte[] initialToken = initialResponse();
ByteBuf buffer = ctx.alloc().buffer(initialToken.length);
buffer.writeByte(0x0F); // AUTH_RESPONSE opcode
buffer.writeInt(0); // Stream ID
buffer.writeInt(initialToken.length);
buffer.writeBytes(initialToken);
return buffer;
}
public ByteBuf createStartupMessage() {
ImmutableMap.Builder options = new ImmutableMap.Builder();
options.put(CQL_VERSION_OPTION, CQL_VERSION);
options.put(COMPRESSION_OPTION, "");
options.put(NO_COMPACT_OPTION, "true");
options.put(DRIVER_VERSION_OPTION, "3.12.2-SNAPSHOT");
options.put(DRIVER_NAME_OPTION, DRIVER_NAME);
ByteBuf body = Unpooled.buffer();
Writer.writeStringMap(options.build(), body);
ByteBuf buffer = Unpooled.buffer();
buffer.writeByte(0x04); // version protocol
buffer.writeByte(0x00); // flags
buffer.writeByte(0x00); // stream od
buffer.writeByte(0x01); // Opcode (STARTUP)
buffer.writeInt(body.readableBytes()); // length body
buffer.writeBytes(body); // body
return buffer;
}
public byte[] initialResponse() {
byte[] initialToken = new byte[username.length + password.length + 2];
initialToken[0] = 0;
System.arraycopy(username, 0, initialToken, 1, username.length);
initialToken[username.length + 1] = 0;
System.arraycopy(password, 0, initialToken, username.length + 2, password.length);
return initialToken;
}
}
private static final class MessageEncoder extends MessageToMessageEncoder
{
private final Initializer initializer;
public MessageEncoder(Initializer initializer)
{
this.initializer = initializer;
}
@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception {
out.add(msg.retain());
}
}
private static final class MessageDecoder extends MessageToMessageDecoder {
private final Initializer initializer;
public MessageDecoder(Initializer initializer)
{
this.initializer = initializer;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception {
System.out.println(msg.toString(StandardCharsets.UTF_8));
if (msg.readableBytes() > 0) {
byte opcodeByte = msg.readByte();
int opcode = Byte.toUnsignedInt(opcodeByte);
System.out.println("Opcode: " + opcode);
if (opcode == 0x03) { // AUTHENTICATE opcode
System.out.println("Server requires authentication");
} else if (opcode == 0x0E) { // AUTH_CHALLENGE opcode
ByteBuf authResponse = this.initializer.createAuthResponse(ctx);
ctx.writeAndFlush(authResponse);
} else {
System.out.println("Unknown opcode: " + opcode);
}
}
}
}
public static final class Writer
{
public static void writeStringMap(Map m, ByteBuf cb) {
cb.writeShort(m.size());
for (Map.Entry entry : m.entrySet()) {
writeString(entry.getKey(), cb);
writeString(entry.getValue(), cb);
}
}
public static void writeString(String str, ByteBuf cb) {
byte[] bytes = str.getBytes(CharsetUtil.UTF_8);
cb.writeShort(bytes.length);
cb.writeBytes(bytes);
}
}
}
1740086543
Anonymous
Ich möchte eine kleine Treiberanpassung für Apache Cassandra vornehmen, um zu verstehen, wie die Interaktion auf der Ebene mit niedriger Anwendung im Allgemeinen funktioniert. Ich verwende IO.NETTY dafür, aber aus irgendeinem Grund, der mir unbekannt ist, sendet der Server keine Fehlermeldung oder Autorisierungsanforderung zurück. Vielen Dank.[code]public class NativeCQLConnection extends ChannelInboundHandlerAdapter implements Runnable { private final Bootstrap client; private final NioEventLoopGroup group; private static final String HOST = "127.0.0.1"; private static final int PORT = 9042; private final Initializer initializer; private final Bootstrap handler; public NativeCQLConnection() { this.client = new Bootstrap(); this.group = new NioEventLoopGroup(); this.initializer = new Initializer(this, "cassandra", "cassandra"); this.handler = this.client.group(this.group).channel(NioSocketChannel.class).handler(this.initializer); } public static void main(String[] args) { new NativeCQLConnection().run(); } @Override public void run() { handler.connect(HOST, PORT); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf startupMessage = this.initializer.createStartupMessage(); ctx.writeAndFlush(startupMessage); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { this.group.shutdownGracefully(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); } private static final class Initializer extends ChannelInitializer { private final ChannelHandler handler; private final byte[] username; private final byte[] password; public Initializer(ChannelHandler handler, String username, String password) { this.handler = handler; this.username = username.getBytes(StandardCharsets.UTF_8); this.password = password.getBytes(StandardCharsets.UTF_8); } @Override protected void initChannel(SocketChannel channel) throws Exception { channel.pipeline().addLast(new MessageDecoder(this)); channel.pipeline().addLast(new MessageEncoder(this)); channel.pipeline().addLast(new LoggingHandler(LogLevel.INFO)); channel.pipeline().addLast(this.handler); } private static final String CQL_VERSION_OPTION = "CQL_VERSION"; private static final String CQL_VERSION = "3.0.0"; private static final String DRIVER_VERSION_OPTION = "DRIVER_VERSION"; private static final String DRIVER_NAME_OPTION = "DRIVER_NAME"; private static final String DRIVER_NAME = "Apache Cassandra Java Driver"; static final String COMPRESSION_OPTION = "COMPRESSION"; static final String NO_COMPACT_OPTION = "NO_COMPACT"; public ByteBuf createAuthResponse(ChannelHandlerContext ctx) { byte[] initialToken = initialResponse(); ByteBuf buffer = ctx.alloc().buffer(initialToken.length); buffer.writeByte(0x0F); // AUTH_RESPONSE opcode buffer.writeInt(0); // Stream ID buffer.writeInt(initialToken.length); buffer.writeBytes(initialToken); return buffer; } public ByteBuf createStartupMessage() { ImmutableMap.Builder options = new ImmutableMap.Builder(); options.put(CQL_VERSION_OPTION, CQL_VERSION); options.put(COMPRESSION_OPTION, ""); options.put(NO_COMPACT_OPTION, "true"); options.put(DRIVER_VERSION_OPTION, "3.12.2-SNAPSHOT"); options.put(DRIVER_NAME_OPTION, DRIVER_NAME); ByteBuf body = Unpooled.buffer(); Writer.writeStringMap(options.build(), body); ByteBuf buffer = Unpooled.buffer(); buffer.writeByte(0x04); // version protocol buffer.writeByte(0x00); // flags buffer.writeByte(0x00); // stream od buffer.writeByte(0x01); // Opcode (STARTUP) buffer.writeInt(body.readableBytes()); // length body buffer.writeBytes(body); // body return buffer; } public byte[] initialResponse() { byte[] initialToken = new byte[username.length + password.length + 2]; initialToken[0] = 0; System.arraycopy(username, 0, initialToken, 1, username.length); initialToken[username.length + 1] = 0; System.arraycopy(password, 0, initialToken, username.length + 2, password.length); return initialToken; } } private static final class MessageEncoder extends MessageToMessageEncoder { private final Initializer initializer; public MessageEncoder(Initializer initializer) { this.initializer = initializer; } @Override protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception { out.add(msg.retain()); } } private static final class MessageDecoder extends MessageToMessageDecoder { private final Initializer initializer; public MessageDecoder(Initializer initializer) { this.initializer = initializer; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception { System.out.println(msg.toString(StandardCharsets.UTF_8)); if (msg.readableBytes() > 0) { byte opcodeByte = msg.readByte(); int opcode = Byte.toUnsignedInt(opcodeByte); System.out.println("Opcode: " + opcode); if (opcode == 0x03) { // AUTHENTICATE opcode System.out.println("Server requires authentication"); } else if (opcode == 0x0E) { // AUTH_CHALLENGE opcode ByteBuf authResponse = this.initializer.createAuthResponse(ctx); ctx.writeAndFlush(authResponse); } else { System.out.println("Unknown opcode: " + opcode); } } } } public static final class Writer { public static void writeStringMap(Map m, ByteBuf cb) { cb.writeShort(m.size()); for (Map.Entry entry : m.entrySet()) { writeString(entry.getKey(), cb); writeString(entry.getValue(), cb); } } public static void writeString(String str, ByteBuf cb) { byte[] bytes = str.getBytes(CharsetUtil.UTF_8); cb.writeShort(bytes.length); cb.writeBytes(bytes); } } } [/code]