diff --git a/src/main/kotlin/com/open592/fileserver/collections/UniqueQueue.kt b/src/main/kotlin/com/open592/fileserver/collections/UniqueQueue.kt index 12d17eb..222086d 100644 --- a/src/main/kotlin/com/open592/fileserver/collections/UniqueQueue.kt +++ b/src/main/kotlin/com/open592/fileserver/collections/UniqueQueue.kt @@ -17,6 +17,10 @@ class UniqueQueue { return false } + operator fun plusAssign(value: T) { + add(value) + } + fun removeFirstOrNull(): T? { val value = queue.removeFirstOrNull() diff --git a/src/main/kotlin/com/open592/fileserver/net/NetworkChannelInitializer.kt b/src/main/kotlin/com/open592/fileserver/net/NetworkChannelInitializer.kt index 0293851..0ead7ba 100644 --- a/src/main/kotlin/com/open592/fileserver/net/NetworkChannelInitializer.kt +++ b/src/main/kotlin/com/open592/fileserver/net/NetworkChannelInitializer.kt @@ -1,22 +1,43 @@ package com.open592.fileserver.net +import com.github.michaelbull.logging.InlineLogger +import com.open592.fileserver.protocol.inbound.Js5InboundChannelHandler +import com.open592.fileserver.protocol.inbound.Js5InboundMessageDecoder +import com.open592.fileserver.protocol.outbound.Js5OutboundGroupMessageEncoder +import com.open592.fileserver.protocol.outbound.Js5OutboundStatusMessageEncoder import io.netty.channel.Channel +import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelInitializer import io.netty.handler.timeout.IdleStateHandler +import jakarta.inject.Inject +import jakarta.inject.Provider import jakarta.inject.Singleton import java.util.concurrent.TimeUnit @Singleton -class NetworkChannelInitializer : ChannelInitializer() { +class NetworkChannelInitializer +@Inject +constructor(private val js5InboundChannelHandler: Provider) : + ChannelInitializer() { override fun initChannel(channel: Channel) { channel .pipeline() .addLast( IdleStateHandler( - true, TIMEOUT_SECONDS, TIMEOUT_SECONDS, TIMEOUT_SECONDS, TimeUnit.SECONDS)) + true, TIMEOUT_SECONDS, TIMEOUT_SECONDS, TIMEOUT_SECONDS, TimeUnit.SECONDS), + Js5InboundMessageDecoder(), + Js5OutboundStatusMessageEncoder(), + Js5OutboundGroupMessageEncoder(), + js5InboundChannelHandler.get(), + ) + } + + override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { + logger.error { "Caught exception: ${cause.message}" } } private companion object { private const val TIMEOUT_SECONDS: Long = 30 + private val logger = InlineLogger() } } diff --git a/src/main/kotlin/com/open592/fileserver/net/js5/Js5Client.kt b/src/main/kotlin/com/open592/fileserver/net/js5/Js5Client.kt new file mode 100644 index 0000000..37f9e49 --- /dev/null +++ b/src/main/kotlin/com/open592/fileserver/net/js5/Js5Client.kt @@ -0,0 +1,44 @@ +package com.open592.fileserver.net.js5 + +import com.open592.fileserver.protocol.inbound.Js5InboundMessage +import io.netty.channel.ChannelHandlerContext + +class Js5Client(val ctx: ChannelHandlerContext) { + private val urgent = ArrayDeque() + private val prefetch = ArrayDeque() + + fun push(request: Js5InboundMessage.RequestGroup) { + if (request.isPrefetch) { + prefetch += request + } else { + urgent += request + prefetch -= request + } + } + + fun pop(): Js5InboundMessage.RequestGroup? { + val request = urgent.removeFirstOrNull() + + if (request != null) { + return request + } + + return prefetch.removeFirstOrNull() + } + + fun isNotFull(): Boolean { + return urgent.size < MAX_QUEUE_SIZE && prefetch.size < MAX_QUEUE_SIZE + } + + fun isNotEmpty(): Boolean { + return urgent.isNotEmpty() || prefetch.isNotEmpty() + } + + fun isReady(): Boolean { + return ctx.channel().isWritable && isNotEmpty() + } + + private companion object { + private const val MAX_QUEUE_SIZE = 20 + } +} diff --git a/src/main/kotlin/com/open592/fileserver/net/js5/Js5Service.kt b/src/main/kotlin/com/open592/fileserver/net/js5/Js5Service.kt new file mode 100644 index 0000000..c8e8f22 --- /dev/null +++ b/src/main/kotlin/com/open592/fileserver/net/js5/Js5Service.kt @@ -0,0 +1,142 @@ +package com.open592.fileserver.net.js5 + +import com.displee.cache.CacheLibrary +import com.displee.compress.CompressionType +import com.displee.compress.compress +import com.displee.compress.type.EmptyCompressor +import com.github.michaelbull.logging.InlineLogger +import com.google.common.util.concurrent.AbstractExecutionThreadService +import com.open592.fileserver.buffer.use +import com.open592.fileserver.collections.UniqueQueue +import com.open592.fileserver.protocol.inbound.Js5InboundMessage +import com.open592.fileserver.protocol.outbound.Js5OutboundGroupMessage +import io.netty.buffer.ByteBufAllocator +import jakarta.inject.Inject +import jakarta.inject.Singleton + +@Singleton +class Js5Service +@Inject +constructor( + private val allocator: ByteBufAllocator, + private val cacheLibrary: CacheLibrary, +) : AbstractExecutionThreadService() { + private val lock = Object() + private val clients = UniqueQueue() + + override fun run() { + while (true) { + var client: Js5Client + var request: Js5InboundMessage.RequestGroup + + synchronized(lock) { + while (true) { + if (!isRunning) { + return + } + + val next = clients.removeFirstOrNull() + + if (next == null) { + lock.wait() + continue + } + + client = next + request = client.pop() ?: continue + + break + } + } + + serve(client, request) + } + } + + private fun serve(client: Js5Client, request: Js5InboundMessage.RequestGroup) { + val ctx = client.ctx + + if (!ctx.channel().isActive) { + return + } + + val buf = + if (request.archive == ARCHIVE_SET && request.group == ARCHIVE_SET) { + allocator.buffer().use { buffer -> + val masterIndex = + cacheLibrary.generateUkeys(false).compress(CompressionType.NONE, EmptyCompressor) + + buffer.writeBytes(masterIndex) + buffer.retain() + } + } else { + allocator.buffer().use { buffer -> + val archiveSector = + if (request.group == 255) { + cacheLibrary.index255?.readArchiveSector(request.archive) + } else { + cacheLibrary.index(request.group).readArchiveSector(request.archive) + } ?: return + + buffer.writeBytes(archiveSector.data) + buffer.retain() + } + } + + val response = + Js5OutboundGroupMessage(request.isPrefetch, request.archive, request.group, data = buf) + + ctx.writeAndFlush(response, ctx.voidPromise()) + + synchronized(lock) { + if (client.isReady()) { + clients.add(client) + } + + if (client.isNotFull()) { + ctx.read() + } + } + } + + fun push(client: Js5Client, request: Js5InboundMessage.RequestGroup) { + synchronized(lock) { + client.push(request) + + if (client.isReady()) { + clients.add(client) + + lock.notifyAll() + } + + if (client.isNotFull()) { + client.ctx.read() + } + } + } + + fun readIfNotFull(client: Js5Client) { + synchronized(lock) { + if (client.isNotFull()) { + client.ctx.read() + } + } + } + + fun notifyIfNotEmpty(client: Js5Client) { + synchronized(lock) { + if (client.isNotEmpty()) { + lock.notifyAll() + } + } + } + + override fun triggerShutdown() { + synchronized(lock) { lock.notifyAll() } + } + + private companion object { + private const val ARCHIVE_SET = 255 + private val logger = InlineLogger() + } +} diff --git a/src/main/kotlin/com/open592/fileserver/protocol/inbound/Js5InboundChannelHandler.kt b/src/main/kotlin/com/open592/fileserver/protocol/inbound/Js5InboundChannelHandler.kt new file mode 100644 index 0000000..6b46bbd --- /dev/null +++ b/src/main/kotlin/com/open592/fileserver/protocol/inbound/Js5InboundChannelHandler.kt @@ -0,0 +1,71 @@ +package com.open592.fileserver.protocol.inbound + +import com.github.michaelbull.logging.InlineLogger +import com.open592.fileserver.configuration.ServerConfiguration +import com.open592.fileserver.net.js5.Js5Client +import com.open592.fileserver.net.js5.Js5Service +import com.open592.fileserver.protocol.outbound.Js5OutboundStatusMessage +import io.netty.channel.ChannelHandlerContext +import io.netty.channel.SimpleChannelInboundHandler +import io.netty.handler.timeout.IdleStateEvent +import jakarta.inject.Inject + +class Js5InboundChannelHandler +@Inject +constructor( + private val service: Js5Service, + private val serverConfiguration: ServerConfiguration, +) : SimpleChannelInboundHandler(Js5InboundMessage::class.java) { + private lateinit var client: Js5Client + + override fun handlerAdded(ctx: ChannelHandlerContext) { + client = Js5Client(ctx.read()) + } + + override fun channelRead0(ctx: ChannelHandlerContext, message: Js5InboundMessage) { + when (message) { + is Js5InboundMessage.InitializeJs5RemoteConnection -> + handleInitializeJs5RemoteConnection(ctx, message) + is Js5InboundMessage.RequestGroup -> service.push(client, message) + is Js5InboundMessage.ExchangeObfuscationKey -> handleExchangeObfuscationKey(message) + is Js5InboundMessage.RequestConnectionDisconnect -> ctx.close() + else -> Unit + } + } + + private fun handleInitializeJs5RemoteConnection( + ctx: ChannelHandlerContext, + message: Js5InboundMessage.InitializeJs5RemoteConnection + ) { + if (message.build != serverConfiguration.getBuildNumber()) { + ctx.write(Js5OutboundStatusMessage.ClientIsOutOfDate) + } else { + ctx.write(Js5OutboundStatusMessage.Ok) + } + } + + private fun handleExchangeObfuscationKey(message: Js5InboundMessage.ExchangeObfuscationKey) { + logger.info { "Handle Exchange Obfuscation Key with value = ${message.key}" } + } + + override fun channelReadComplete(ctx: ChannelHandlerContext) { + service.readIfNotFull(client) + ctx.flush() + } + + override fun channelWritabilityChanged(ctx: ChannelHandlerContext) { + if (ctx.channel().isWritable) { + service.notifyIfNotEmpty(client) + } + } + + override fun userEventTriggered(ctx: ChannelHandlerContext, event: Any) { + if (event is IdleStateEvent) { + ctx.close() + } + } + + private companion object { + private val logger = InlineLogger() + } +} diff --git a/src/main/kotlin/com/open592/fileserver/protocol/inbound/Js5InboundMessage.kt b/src/main/kotlin/com/open592/fileserver/protocol/inbound/Js5InboundMessage.kt new file mode 100644 index 0000000..42c5c23 --- /dev/null +++ b/src/main/kotlin/com/open592/fileserver/protocol/inbound/Js5InboundMessage.kt @@ -0,0 +1,18 @@ +package com.open592.fileserver.protocol.inbound + +sealed class Js5InboundMessage { + data class InitializeJs5RemoteConnection(val build: Int) : Js5InboundMessage() + + data class RequestGroup(val archive: Int, val group: Int, val isPrefetch: Boolean) : + Js5InboundMessage() + + object InformUserIsLoggedIn : Js5InboundMessage() + + object InformUserIsLoggedOut : Js5InboundMessage() + + object InformClientIsReady : Js5InboundMessage() + + object RequestConnectionDisconnect : Js5InboundMessage() + + data class ExchangeObfuscationKey(val key: Int) : Js5InboundMessage() +} diff --git a/src/main/kotlin/com/open592/fileserver/protocol/inbound/Js5InboundMessageDecoder.kt b/src/main/kotlin/com/open592/fileserver/protocol/inbound/Js5InboundMessageDecoder.kt new file mode 100644 index 0000000..f1f1ed1 --- /dev/null +++ b/src/main/kotlin/com/open592/fileserver/protocol/inbound/Js5InboundMessageDecoder.kt @@ -0,0 +1,97 @@ +package com.open592.fileserver.protocol.inbound + +import io.netty.buffer.ByteBuf +import io.netty.channel.ChannelHandlerContext +import io.netty.handler.codec.ByteToMessageDecoder +import io.netty.handler.codec.DecoderException + +class Js5InboundMessageDecoder : ByteToMessageDecoder() { + override fun decode(ctx: ChannelHandlerContext, input: ByteBuf, output: MutableList) { + if (input.readableBytes() < 4) { + return + } + + val opcode = input.readUnsignedByte().toInt() + val message = decodeOpcode(input, opcode) + + output += message + } + + private fun decodeOpcode(input: ByteBuf, opcode: Int): Js5InboundMessage { + return when (opcode) { + 0 -> decodeRequestGroupPacket(input, isPrefetch = true) + 1 -> decodeRequestGroupPacket(input, isPrefetch = false) + 2 -> decodeInformUserIsLoggedInPacket(input) + 3 -> decodeInformUserIsLoggedOutPacket(input) + 4 -> decodeExchangeObfuscationKeyPacket(input) + 6 -> decodeInformClientIsReadyPacket(input) + 7 -> decodeRequestConnectionDisconnectPacket(input) + 15 -> decodeInitializeJs5RemoteConnectionPacket(input) + else -> throw DecoderException("Unknown Js5 inbound message opcode: $opcode") + } + } + + private fun decodeRequestGroupPacket( + input: ByteBuf, + isPrefetch: Boolean + ): Js5InboundMessage.RequestGroup { + val archive = input.readUnsignedByte().toInt() + val group = input.readUnsignedShort().toInt() + + return Js5InboundMessage.RequestGroup(archive, group, isPrefetch) + } + + private fun decodeInformUserIsLoggedInPacket( + input: ByteBuf + ): Js5InboundMessage.InformUserIsLoggedIn { + // Skip padding bytes + input.skipBytes(3) + + return Js5InboundMessage.InformUserIsLoggedIn + } + + private fun decodeInformUserIsLoggedOutPacket( + input: ByteBuf + ): Js5InboundMessage.InformUserIsLoggedOut { + // Skip padding bytes + input.skipBytes(3) + + return Js5InboundMessage.InformUserIsLoggedOut + } + + private fun decodeRequestConnectionDisconnectPacket( + input: ByteBuf + ): Js5InboundMessage.RequestConnectionDisconnect { + // Skip padding bytes + input.skipBytes(3) + + return Js5InboundMessage.RequestConnectionDisconnect + } + + private fun decodeExchangeObfuscationKeyPacket( + input: ByteBuf + ): Js5InboundMessage.ExchangeObfuscationKey { + val key = input.readUnsignedByte().toInt() + + return Js5InboundMessage.ExchangeObfuscationKey(key) + } + + private fun decodeInformClientIsReadyPacket( + input: ByteBuf + ): Js5InboundMessage.InformClientIsReady { + // Skip padding bytes + // NOTE: The client usually sends along padding bytes with `0` as the value, + // but for this message it's using `p3(3)`. + input.skipBytes(3) + + return Js5InboundMessage.InformClientIsReady + } + + private fun decodeInitializeJs5RemoteConnectionPacket( + input: ByteBuf + ): Js5InboundMessage.InitializeJs5RemoteConnection { + val build = input.readInt() + + return Js5InboundMessage.InitializeJs5RemoteConnection(build) + } +} diff --git a/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundGroupMessage.kt b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundGroupMessage.kt new file mode 100644 index 0000000..a51191a --- /dev/null +++ b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundGroupMessage.kt @@ -0,0 +1,10 @@ +package com.open592.fileserver.protocol.outbound + +import io.netty.buffer.ByteBuf + +data class Js5OutboundGroupMessage( + val isPrefetch: Boolean, + val archive: Int, + val group: Int, + val data: ByteBuf +) : Js5OutboundMessage diff --git a/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundGroupMessageEncoder.kt b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundGroupMessageEncoder.kt new file mode 100644 index 0000000..a962ced --- /dev/null +++ b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundGroupMessageEncoder.kt @@ -0,0 +1,55 @@ +package com.open592.fileserver.protocol.outbound + +import io.netty.buffer.ByteBuf +import io.netty.channel.ChannelHandler +import io.netty.channel.ChannelHandlerContext +import io.netty.handler.codec.EncoderException +import io.netty.handler.codec.MessageToByteEncoder +import kotlin.math.min + +@ChannelHandler.Sharable +class Js5OutboundGroupMessageEncoder : + MessageToByteEncoder(Js5OutboundGroupMessage::class.java) { + override fun encode( + ctx: ChannelHandlerContext, + message: Js5OutboundGroupMessage, + output: ByteBuf + ) { + output.writeByte(message.archive) + output.writeShort(message.group) + + if (!message.data.isReadable) { + throw EncoderException("Missing compression byte") + } + + var compression = message.data.readUnsignedByte().toInt() + + if (message.isPrefetch) { + compression = compression or 0x80 + } + + output.writeByte(compression) + + output.writeBytes(message.data, min(message.data.readableBytes(), 508)) + + while (message.data.isReadable) { + output.writeByte(0xFF) + output.writeBytes(message.data, min(message.data.readableBytes(), 511)) + } + } + + override fun allocateBuffer( + ctx: ChannelHandlerContext, + message: Js5OutboundGroupMessage, + preferDirect: Boolean + ): ByteBuf { + val dataLength = message.data.readableBytes() + val bufferLength = 2 + dataLength + (512 + dataLength) / 511 + + return if (preferDirect) { + ctx.alloc().ioBuffer(bufferLength, bufferLength) + } else { + ctx.alloc().heapBuffer(bufferLength, bufferLength) + } + } +} diff --git a/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundMessage.kt b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundMessage.kt new file mode 100644 index 0000000..26dde23 --- /dev/null +++ b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundMessage.kt @@ -0,0 +1,3 @@ +package com.open592.fileserver.protocol.outbound + +interface Js5OutboundMessage diff --git a/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundStatusMessage.kt b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundStatusMessage.kt new file mode 100644 index 0000000..292dcaf --- /dev/null +++ b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundStatusMessage.kt @@ -0,0 +1,11 @@ +package com.open592.fileserver.protocol.outbound + +sealed class Js5OutboundStatusMessage(val opcode: Int) : Js5OutboundMessage { + object Ok : Js5OutboundStatusMessage(opcode = 0) + + object ClientIsOutOfDate : Js5OutboundStatusMessage(opcode = 6) + + object ServerIsFull : Js5OutboundStatusMessage(opcode = 7) + + object IpIsLimited : Js5OutboundStatusMessage(opcode = 9) +} diff --git a/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundStatusMessageEncoder.kt b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundStatusMessageEncoder.kt new file mode 100644 index 0000000..7dfee83 --- /dev/null +++ b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundStatusMessageEncoder.kt @@ -0,0 +1,32 @@ +package com.open592.fileserver.protocol.outbound + +import io.netty.buffer.ByteBuf +import io.netty.channel.ChannelHandlerContext +import io.netty.handler.codec.MessageToByteEncoder + +class Js5OutboundStatusMessageEncoder : + MessageToByteEncoder(Js5OutboundStatusMessage::class.java) { + override fun encode( + ctx: ChannelHandlerContext, + message: Js5OutboundStatusMessage, + output: ByteBuf + ) { + output.writeByte(message.opcode) + } + + override fun allocateBuffer( + ctx: ChannelHandlerContext, + msg: Js5OutboundStatusMessage, + preferDirect: Boolean + ): ByteBuf { + return if (preferDirect) { + ctx.alloc().ioBuffer(BUFFER_LENGTH, BUFFER_LENGTH) + } else { + ctx.alloc().heapBuffer(BUFFER_LENGTH, BUFFER_LENGTH) + } + } + + private companion object { + private const val BUFFER_LENGTH = 4 + } +} diff --git a/src/main/kotlin/com/open592/fileserver/server/FileServer.kt b/src/main/kotlin/com/open592/fileserver/server/FileServer.kt index 8bdcd2c..47e9a86 100644 --- a/src/main/kotlin/com/open592/fileserver/server/FileServer.kt +++ b/src/main/kotlin/com/open592/fileserver/server/FileServer.kt @@ -3,7 +3,6 @@ package com.open592.fileserver.server import com.github.michaelbull.logging.InlineLogger import com.google.common.util.concurrent.Service import com.google.common.util.concurrent.ServiceManager -import com.open592.fileserver.configuration.ServerConfiguration import jakarta.inject.Inject import jakarta.inject.Singleton @@ -12,7 +11,6 @@ class FileServer @Inject constructor( services: Set, - private val serverConfiguration: ServerConfiguration, ) { private val serviceManager = ServiceManager(services) diff --git a/src/main/kotlin/com/open592/fileserver/server/FileServerModule.kt b/src/main/kotlin/com/open592/fileserver/server/FileServerModule.kt index 978c666..47752cf 100644 --- a/src/main/kotlin/com/open592/fileserver/server/FileServerModule.kt +++ b/src/main/kotlin/com/open592/fileserver/server/FileServerModule.kt @@ -7,6 +7,7 @@ import com.open592.fileserver.buffer.BufferModule import com.open592.fileserver.cache.CacheModule import com.open592.fileserver.configuration.ServerConfigurationModule import com.open592.fileserver.net.NetworkService +import com.open592.fileserver.net.js5.Js5Service object FileServerModule : AbstractModule() { override fun configure() { @@ -16,5 +17,6 @@ object FileServerModule : AbstractModule() { val binder = Multibinder.newSetBinder(binder(), Service::class.java) binder.addBinding().to(NetworkService::class.java) + binder.addBinding().to(Js5Service::class.java) } }