Warum reagiert Apache Cassandra nicht auf Startmeldungen?Java

Java-Forum
Anonymous
 Warum reagiert Apache Cassandra nicht auf Startmeldungen?

Post by Anonymous »

Ich möchte eine kleine Treiberanpassung für Apache Cassandra vornehmen, um zu verstehen, wie die Interaktion auf der Ebene mit niedriger Anwendung im Allgemeinen funktioniert. Ich verwende IO.NETTY dafür, aber aus irgendeinem Grund, der mir unbekannt ist, sendet der Server keine Fehlermeldung oder Autorisierungsanforderung zurück. Vielen Dank.

Code: Select all

public class NativeCQLConnection extends ChannelInboundHandlerAdapter implements Runnable
{
private final Bootstrap client;
private final NioEventLoopGroup group;

private static final String HOST = "127.0.0.1";
private static final int PORT = 9042;
private final Initializer initializer;
private final Bootstrap handler;

public NativeCQLConnection()
{
this.client = new Bootstrap();
this.group = new NioEventLoopGroup();
this.initializer = new Initializer(this, "cassandra", "cassandra");

this.handler = this.client.group(this.group).channel(NioSocketChannel.class).handler(this.initializer);
}

public static void main(String[] args)
{
new NativeCQLConnection().run();
}

@Override
public void run()
{
handler.connect(HOST, PORT);
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
{
ByteBuf startupMessage = this.initializer.createStartupMessage();
ctx.writeAndFlush(startupMessage);
}

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception
{
this.group.shutdownGracefully();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
{
cause.printStackTrace();
}

private static final class Initializer extends ChannelInitializer
{
private final ChannelHandler handler;
private final byte[] username;
private final byte[] password;

public Initializer(ChannelHandler handler, String username, String password)
{
this.handler = handler;
this.username = username.getBytes(StandardCharsets.UTF_8);
this.password = password.getBytes(StandardCharsets.UTF_8);
}

@Override
protected void initChannel(SocketChannel channel) throws Exception
{
channel.pipeline().addLast(new MessageDecoder(this));
channel.pipeline().addLast(new MessageEncoder(this));

channel.pipeline().addLast(new LoggingHandler(LogLevel.INFO));

channel.pipeline().addLast(this.handler);
}

private static final String CQL_VERSION_OPTION = "CQL_VERSION";
private static final String CQL_VERSION = "3.0.0";
private static final String DRIVER_VERSION_OPTION = "DRIVER_VERSION";
private static final String DRIVER_NAME_OPTION = "DRIVER_NAME";
private static final String DRIVER_NAME = "Apache Cassandra Java Driver";

static final String COMPRESSION_OPTION = "COMPRESSION";
static final String NO_COMPACT_OPTION = "NO_COMPACT";

public ByteBuf createAuthResponse(ChannelHandlerContext ctx) {
byte[] initialToken = initialResponse();
ByteBuf buffer = ctx.alloc().buffer(initialToken.length);
buffer.writeByte(0x0F); // AUTH_RESPONSE opcode
buffer.writeInt(0); // Stream ID
buffer.writeInt(initialToken.length);
buffer.writeBytes(initialToken);
return buffer;
}

public ByteBuf createStartupMessage() {
ImmutableMap.Builder options = new ImmutableMap.Builder();
options.put(CQL_VERSION_OPTION, CQL_VERSION);
options.put(COMPRESSION_OPTION, "");
options.put(NO_COMPACT_OPTION, "true");
options.put(DRIVER_VERSION_OPTION, "3.12.2-SNAPSHOT");
options.put(DRIVER_NAME_OPTION, DRIVER_NAME);

ByteBuf body = Unpooled.buffer();
Writer.writeStringMap(options.build(), body);

ByteBuf buffer = Unpooled.buffer();
buffer.writeByte(0x04); // version protocol
buffer.writeByte(0x00); // flags
buffer.writeByte(0x00); // stream od
buffer.writeByte(0x01); // Opcode (STARTUP)
buffer.writeInt(body.readableBytes()); // length body
buffer.writeBytes(body);  // body

return buffer;
}

public byte[] initialResponse() {
byte[] initialToken = new byte[username.length + password.length + 2];
initialToken[0] = 0;
System.arraycopy(username, 0, initialToken, 1, username.length);
initialToken[username.length + 1] = 0;
System.arraycopy(password, 0, initialToken, username.length + 2, password.length);
return initialToken;
}
}

private static final class MessageEncoder extends MessageToMessageEncoder
{
private final Initializer initializer;

public MessageEncoder(Initializer initializer)
{
this.initializer = initializer;
}

@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception {
out.add(msg.retain());
}
}

private static final class MessageDecoder extends MessageToMessageDecoder {

private final Initializer initializer;

public MessageDecoder(Initializer initializer)
{
this.initializer = initializer;
}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception {
System.out.println(msg.toString(StandardCharsets.UTF_8));

if (msg.readableBytes() > 0) {
byte opcodeByte = msg.readByte();
int opcode = Byte.toUnsignedInt(opcodeByte);
System.out.println("Opcode: " + opcode);

if (opcode == 0x03) { // AUTHENTICATE opcode
System.out.println("Server requires authentication");
} else if (opcode == 0x0E) { // AUTH_CHALLENGE opcode
ByteBuf authResponse = this.initializer.createAuthResponse(ctx);
ctx.writeAndFlush(authResponse);
} else {
System.out.println("Unknown opcode: " + opcode);
}
}
}
}

public static final class Writer
{
public static void writeStringMap(Map m, ByteBuf cb) {
cb.writeShort(m.size());
for (Map.Entry entry : m.entrySet()) {
writeString(entry.getKey(), cb);
writeString(entry.getValue(), cb);
}
}

public static void writeString(String str, ByteBuf cb) {
byte[] bytes = str.getBytes(CharsetUtil.UTF_8);
cb.writeShort(bytes.length);
cb.writeBytes(bytes);
}
}
}

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post