Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ class UniqueQueue<T> {
return false
}

operator fun plusAssign(value: T) {
add(value)
}

fun removeFirstOrNull(): T? {
val value = queue.removeFirstOrNull()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,43 @@
package com.open592.fileserver.net

import com.github.michaelbull.logging.InlineLogger
import com.open592.fileserver.protocol.inbound.Js5InboundChannelHandler
import com.open592.fileserver.protocol.inbound.Js5InboundMessageDecoder
import com.open592.fileserver.protocol.outbound.Js5OutboundGroupMessageEncoder
import com.open592.fileserver.protocol.outbound.Js5OutboundStatusMessageEncoder
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInitializer
import io.netty.handler.timeout.IdleStateHandler
import jakarta.inject.Inject
import jakarta.inject.Provider
import jakarta.inject.Singleton
import java.util.concurrent.TimeUnit

@Singleton
class NetworkChannelInitializer : ChannelInitializer<Channel>() {
class NetworkChannelInitializer
@Inject
constructor(private val js5InboundChannelHandler: Provider<Js5InboundChannelHandler>) :
ChannelInitializer<Channel>() {
override fun initChannel(channel: Channel) {
channel
.pipeline()
.addLast(
IdleStateHandler(
true, TIMEOUT_SECONDS, TIMEOUT_SECONDS, TIMEOUT_SECONDS, TimeUnit.SECONDS))
true, TIMEOUT_SECONDS, TIMEOUT_SECONDS, TIMEOUT_SECONDS, TimeUnit.SECONDS),
Js5InboundMessageDecoder(),
Js5OutboundStatusMessageEncoder(),
Js5OutboundGroupMessageEncoder(),
js5InboundChannelHandler.get(),
)
}

override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
logger.error { "Caught exception: ${cause.message}" }
}

private companion object {
private const val TIMEOUT_SECONDS: Long = 30
private val logger = InlineLogger()
}
}
44 changes: 44 additions & 0 deletions src/main/kotlin/com/open592/fileserver/net/js5/Js5Client.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.open592.fileserver.net.js5

import com.open592.fileserver.protocol.inbound.Js5InboundMessage
import io.netty.channel.ChannelHandlerContext

class Js5Client(val ctx: ChannelHandlerContext) {
private val urgent = ArrayDeque<Js5InboundMessage.RequestGroup>()
private val prefetch = ArrayDeque<Js5InboundMessage.RequestGroup>()

fun push(request: Js5InboundMessage.RequestGroup) {
if (request.isPrefetch) {
prefetch += request
} else {
urgent += request
prefetch -= request
}
}

fun pop(): Js5InboundMessage.RequestGroup? {
val request = urgent.removeFirstOrNull()

if (request != null) {
return request
}

return prefetch.removeFirstOrNull()
}

fun isNotFull(): Boolean {
return urgent.size < MAX_QUEUE_SIZE && prefetch.size < MAX_QUEUE_SIZE
}

fun isNotEmpty(): Boolean {
return urgent.isNotEmpty() || prefetch.isNotEmpty()
}

fun isReady(): Boolean {
return ctx.channel().isWritable && isNotEmpty()
}

private companion object {
private const val MAX_QUEUE_SIZE = 20
}
}
142 changes: 142 additions & 0 deletions src/main/kotlin/com/open592/fileserver/net/js5/Js5Service.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package com.open592.fileserver.net.js5

import com.displee.cache.CacheLibrary
import com.displee.compress.CompressionType
import com.displee.compress.compress
import com.displee.compress.type.EmptyCompressor
import com.github.michaelbull.logging.InlineLogger
import com.google.common.util.concurrent.AbstractExecutionThreadService
import com.open592.fileserver.buffer.use
import com.open592.fileserver.collections.UniqueQueue
import com.open592.fileserver.protocol.inbound.Js5InboundMessage
import com.open592.fileserver.protocol.outbound.Js5OutboundGroupMessage
import io.netty.buffer.ByteBufAllocator
import jakarta.inject.Inject
import jakarta.inject.Singleton

@Singleton
class Js5Service
@Inject
constructor(
private val allocator: ByteBufAllocator,
private val cacheLibrary: CacheLibrary,
) : AbstractExecutionThreadService() {
private val lock = Object()
private val clients = UniqueQueue<Js5Client>()

override fun run() {
while (true) {
var client: Js5Client
var request: Js5InboundMessage.RequestGroup

synchronized(lock) {
while (true) {
if (!isRunning) {
return
}

val next = clients.removeFirstOrNull()

if (next == null) {
lock.wait()
continue
}

client = next
request = client.pop() ?: continue

break
}
}

serve(client, request)
}
}

private fun serve(client: Js5Client, request: Js5InboundMessage.RequestGroup) {
val ctx = client.ctx

if (!ctx.channel().isActive) {
return
}

val buf =
if (request.archive == ARCHIVE_SET && request.group == ARCHIVE_SET) {
allocator.buffer().use { buffer ->
val masterIndex =
cacheLibrary.generateUkeys(false).compress(CompressionType.NONE, EmptyCompressor)

buffer.writeBytes(masterIndex)
buffer.retain()
}
} else {
allocator.buffer().use { buffer ->
val archiveSector =
if (request.group == 255) {
cacheLibrary.index255?.readArchiveSector(request.archive)
} else {
cacheLibrary.index(request.group).readArchiveSector(request.archive)
} ?: return

buffer.writeBytes(archiveSector.data)
buffer.retain()
}
}

val response =
Js5OutboundGroupMessage(request.isPrefetch, request.archive, request.group, data = buf)

ctx.writeAndFlush(response, ctx.voidPromise())

synchronized(lock) {
if (client.isReady()) {
clients.add(client)
}

if (client.isNotFull()) {
ctx.read()
}
}
}

fun push(client: Js5Client, request: Js5InboundMessage.RequestGroup) {
synchronized(lock) {
client.push(request)

if (client.isReady()) {
clients.add(client)

lock.notifyAll()
}

if (client.isNotFull()) {
client.ctx.read()
}
}
}

fun readIfNotFull(client: Js5Client) {
synchronized(lock) {
if (client.isNotFull()) {
client.ctx.read()
}
}
}

fun notifyIfNotEmpty(client: Js5Client) {
synchronized(lock) {
if (client.isNotEmpty()) {
lock.notifyAll()
}
}
}

override fun triggerShutdown() {
synchronized(lock) { lock.notifyAll() }
}

private companion object {
private const val ARCHIVE_SET = 255
private val logger = InlineLogger()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.open592.fileserver.protocol.inbound

import com.github.michaelbull.logging.InlineLogger
import com.open592.fileserver.configuration.ServerConfiguration
import com.open592.fileserver.net.js5.Js5Client
import com.open592.fileserver.net.js5.Js5Service
import com.open592.fileserver.protocol.outbound.Js5OutboundStatusMessage
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.timeout.IdleStateEvent
import jakarta.inject.Inject

class Js5InboundChannelHandler
@Inject
constructor(
private val service: Js5Service,
private val serverConfiguration: ServerConfiguration,
) : SimpleChannelInboundHandler<Js5InboundMessage>(Js5InboundMessage::class.java) {
private lateinit var client: Js5Client

override fun handlerAdded(ctx: ChannelHandlerContext) {
client = Js5Client(ctx.read())
}

override fun channelRead0(ctx: ChannelHandlerContext, message: Js5InboundMessage) {
when (message) {
is Js5InboundMessage.InitializeJs5RemoteConnection ->
handleInitializeJs5RemoteConnection(ctx, message)
is Js5InboundMessage.RequestGroup -> service.push(client, message)
is Js5InboundMessage.ExchangeObfuscationKey -> handleExchangeObfuscationKey(message)
is Js5InboundMessage.RequestConnectionDisconnect -> ctx.close()
else -> Unit
}
}

private fun handleInitializeJs5RemoteConnection(
ctx: ChannelHandlerContext,
message: Js5InboundMessage.InitializeJs5RemoteConnection
) {
if (message.build != serverConfiguration.getBuildNumber()) {
ctx.write(Js5OutboundStatusMessage.ClientIsOutOfDate)
} else {
ctx.write(Js5OutboundStatusMessage.Ok)
}
}

private fun handleExchangeObfuscationKey(message: Js5InboundMessage.ExchangeObfuscationKey) {
logger.info { "Handle Exchange Obfuscation Key with value = ${message.key}" }
}

override fun channelReadComplete(ctx: ChannelHandlerContext) {
service.readIfNotFull(client)
ctx.flush()
}

override fun channelWritabilityChanged(ctx: ChannelHandlerContext) {
if (ctx.channel().isWritable) {
service.notifyIfNotEmpty(client)
}
}

override fun userEventTriggered(ctx: ChannelHandlerContext, event: Any) {
if (event is IdleStateEvent) {
ctx.close()
}
}

private companion object {
private val logger = InlineLogger()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.open592.fileserver.protocol.inbound

sealed class Js5InboundMessage {
data class InitializeJs5RemoteConnection(val build: Int) : Js5InboundMessage()

data class RequestGroup(val archive: Int, val group: Int, val isPrefetch: Boolean) :
Js5InboundMessage()

object InformUserIsLoggedIn : Js5InboundMessage()

object InformUserIsLoggedOut : Js5InboundMessage()

object InformClientIsReady : Js5InboundMessage()

object RequestConnectionDisconnect : Js5InboundMessage()

data class ExchangeObfuscationKey(val key: Int) : Js5InboundMessage()
}
Loading