1 package net.osdn.gokigen.a01d.camera.ptpip.wrapper.command
3 import android.util.Log
4 import net.osdn.gokigen.a01d.camera.utils.SimpleLogDumper
5 import java.io.BufferedReader
6 import java.io.ByteArrayOutputStream
7 import java.io.DataOutputStream
8 import java.io.InputStream
9 import java.net.InetSocketAddress
10 import java.net.Socket
// Sends PTP/IP commands to the camera at ipAddress:portNumber over a TCP socket and
// hands the responses to each command's callback.
// NOTE(review): several original lines (braces, initializer header, etc.) are not visible
// in this excerpt; the comments below describe only what the visible lines show.
13 class PtpIpCommandPublisher(private val ipAddress : String, private val portNumber : Int, private val tcpNoDelay : Boolean, private val waitForever: Boolean) : IPtpIpCommandPublisher, IPtpIpCommunication
// Log tag: the simple class name.
15 private val TAG = PtpIpCommandPublisher::class.java.simpleName
// Initial value of sequenceNumber.
17 private val SEQUENCE_START_NUMBER = 1
18 private val BUFFER_SIZE = 1024 * 1024 + 16 // Receive buffer is 1 MB (plus 16 bytes)
// Fallback receive delay, used by receiveFromCamera() when the command's own delay is out of range.
20 private val COMMAND_SEND_RECEIVE_DURATION_MS = 5
// Largest receive delay accepted from a command; larger values fall back to the default above.
21 private val COMMAND_SEND_RECEIVE_DURATION_MAX = 3000
// Sleep between two polls of commandQueue in the sender thread.
22 private val COMMAND_POLL_QUEUE_MS = 5
24 private var isConnected = false
25 private var isStart = false
// Hold state used by enqueueCommand(): isHold / holdId identify the command sequence currently allowed through.
26 private var isHold = false
27 private var holdId = 0
28 private var socket : Socket? = null
// Output stream wrapped around the socket; created in the sender thread.
29 private var dos: DataOutputStream? = null
30 private var bufferedReader: BufferedReader? = null
// Sequence number embedded into outgoing packets by sendToCamera().
31 private var sequenceNumber = SEQUENCE_START_NUMBER
// Commands waiting to be issued by the sender thread.
32 private var commandQueue: Queue<IPtpIpCommand> = ArrayDeque()
// Commands deferred by enqueueCommand() because they do not match the current holdId.
33 private var holdCommandQueue: Queue<IPtpIpCommand> = ArrayDeque()
// Start with an empty hold queue.
// NOTE(review): presumably inside an init block whose other lines are not visible here — confirm.
38 holdCommandQueue.clear()
// Reports the connection state to callers.
// NOTE(review): the body is not visible in this excerpt; presumably it returns the isConnected flag — confirm.
41 override fun isConnected(): Boolean
// Sets socket options and connects the socket to ipAddress:portNumber.
// NOTE(review): socket creation, error handling, the returned Boolean and any conditional
// around the option lines are on lines not visible in this excerpt.
46 override fun connect(): Boolean
50 Log.v(TAG, " connect()")
// Apply the TCP_NODELAY setting supplied to the constructor.
52 socket?.tcpNoDelay = tcpNoDelay
55 socket?.keepAlive = false
// Arguments are (connectionTime, latency, bandwidth): latency is given the highest weight.
56 socket?.setPerformancePreferences(0, 2, 0)
57 socket?.oobInline = true
58 socket?.reuseAddress = false
59 socket?.trafficClass = 0x80
60 //socket?.setSoLinger(true, 3000);
61 //socket?.setReceiveBufferSize(2097152);
62 //socket?.setSendBufferSize(524288);
// A timeout of 0 means the connect call waits without a time limit.
64 socket?.connect(InetSocketAddress(ipAddress, portNumber), 0)
// Tears down the connection state.
// NOTE(review): only two statements are visible in this excerpt; other cleanup
// (streams, socket, flags) is presumably on the missing lines — confirm.
76 override fun disconnect()
81 bufferedReader?.close()
// Restart sequence numbering from its initial value.
91 sequenceNumber = SEQUENCE_START_NUMBER
// Body of the routine that starts the command-sending thread.
// NOTE(review): the signature line, the guard conditions and the loop header are not
// visible in this excerpt; the name start() is taken from the log message below.
103 // The command thread is already running, so return
109 Log.v(TAG, " SOCKET IS NULL. (cannot start)")
114 Log.v(TAG, " start()")
// Worker thread: wraps the socket's output stream, then takes commands from
// commandQueue and issues them, sleeping COMMAND_POLL_QUEUE_MS after each poll.
115 val thread = Thread {
118 dos = DataOutputStream(socket?.getOutputStream())
123 val command = commandQueue.poll()
// poll() returns null when the queue is empty; only a real command is issued.
124 command?.let { issueCommand(it) }
125 Thread.sleep(COMMAND_POLL_QUEUE_MS.toLong())
135 Log.v(TAG, "<<<<< IP : $ipAddress port : $portNumber >>>>>")
// Queues a command for the sender thread, honouring the hold mechanism:
// a command whose holdId matches the current holdId goes to commandQueue,
// a non-matching one is parked in holdCommandQueue.
// NOTE(review): the condition guarding this hold logic and several branch lines
// are not visible in this excerpt.
155 override fun enqueueCommand(command: IPtpIpCommand): Boolean
161 return (if (holdId == command.holdId) {
162 if (command.isRelease)
164 // Queue the command first, then release the hold
165 val ret = commandQueue.offer(command)
// Move the parked commands back into the main queue.
169 while (holdCommandQueue.size != 0)
171 val queuedCommand = holdCommandQueue.poll()
172 commandQueue.offer(queuedCommand)
173 if (queuedCommand != null && queuedCommand.isHold)
175 // A re-queued command starts a hold sequence of its own, so stop re-queuing here
177 holdId = queuedCommand.holdId
183 commandQueue.offer(command)
187 // Not part of the current hold sequence, so HOLD (park) it
188 holdCommandQueue.offer(command)
// Remember which hold sequence is now active.
194 holdId = command.holdId
196 //Log.v(TAG, "Enqueue : " + command.getId());
197 return (commandQueue.offer(command))
// Discards every command parked in holdCommandQueue.
// NOTE(review): the returned Boolean and any further state reset are on lines not visible here.
206 override fun flushHoldQueue(): Boolean
208 Log.v(TAG, " flushHoldQueue()")
209 holdCommandQueue.clear()
// Issues one command: sends up to three command bodies to the camera, then receives the response.
// NOTE(review): the retry loop, the sequence-number updates and the error handling
// are on lines not visible in this excerpt.
214 private fun issueCommand(command: IPtpIpCommand)
// Holds the result of receiveFromCamera(); it is tested below to decide on a re-send.
218 var retry_over = true
221 //Log.v(TAG, "issueCommand : " + command.getId());
222 val commandBody = command.commandBody()
223 if (commandBody != null)
225 // If a command body is present, send it (if absent, only wait for a response)
226 sendToCamera(command.dumpLog(), commandBody, command.useSequenceNumber(), command.embeddedSequenceNumberIndex())
227 val commandBody2 = command.commandBody2()
228 if (commandBody2 != null)
230 // If a second command body is present, send it right after the first
231 sendToCamera(command.dumpLog(), commandBody2, command.useSequenceNumber(), command.embeddedSequenceNumberIndex2())
233 val commandBody3 = command.commandBody3()
234 if (commandBody3 != null)
236 // If a third command body is present, send it right after as well
237 sendToCamera(command.dumpLog(), commandBody3, command.useSequenceNumber(), command.embeddedSequenceNumberIndex3())
239 if (command.isIncrementSeqNumber)
245 retry_over = receiveFromCamera(command)
// Only a command that actually sent a body can be re-sent.
246 if ((retry_over)&&(commandBody != null))
248 if (!command.isRetrySend)
252 // When the command is not re-sent, wait for the response here...
253 retry_over = receiveFromCamera(command)
257 if (!command.isIncrementSequenceNumberToRetry)
259 // Roll the sequence number back... for the re-send...
272 * カメラにコマンドを送信する(メイン部分)
// Builds the outgoing packet (4-byte length field followed by byte_array) and sends it.
//   isDumpReceiveLog      - when true, the sequence number and the packet bytes are logged
//   byte_array            - message body to send
//   useSequenceNumber     - when true, sequenceNumber is written into the packet
//   embeddedSequenceIndex - index within the packet (length field included) of the 4 sequence-number bytes
// NOTE(review): the null check on the output stream, bytes 1..3 of the length field and
// the actual write/flush are on lines not visible in this excerpt.
275 private fun sendToCamera(isDumpReceiveLog: Boolean, byte_array: ByteArray, useSequenceNumber: Boolean, embeddedSequenceIndex: Int)
281 Log.v(TAG, " DataOutputStream is null.")
285 // Build the message body: prepend a 4-byte length field
286 val sendData = ByteArray(byte_array.size + 4)
// Lowest byte of the total length (body plus the 4-byte length field).
287 sendData[0] = (byte_array.size + 4).toByte()
291 System.arraycopy(byte_array, 0, sendData, 4, byte_array.size)
292 if (useSequenceNumber)
294 // Embed the sequence number
// Written low byte first. The infix operators evaluate left to right, i.e. ((mask and n) ushr k) and 0xff.
295 sendData[embeddedSequenceIndex ] = (0x000000ff and sequenceNumber).toByte()
296 sendData[embeddedSequenceIndex + 1] = (0x0000ff00 and sequenceNumber ushr 8 and 0x000000ff).toByte()
297 sendData[embeddedSequenceIndex + 2] = (0x00ff0000 and sequenceNumber ushr 16 and 0x000000ff).toByte()
// -0x1000000 is the Int spelling of the bit mask 0xff000000.
298 sendData[embeddedSequenceIndex + 3] = (-0x1000000 and sequenceNumber ushr 24 and 0x000000ff).toByte()
299 if (isDumpReceiveLog)
301 Log.v(TAG, "----- SEQ No. : $sequenceNumber -----")
304 if (isDumpReceiveLog)
307 SimpleLogDumper.dump_bytes("SEND[" + sendData.size + "] ", sendData)
// Blocks the calling thread for delayMs milliseconds.
// NOTE(review): any exception handling around the call is on lines not visible here.
320 private fun sleep(delayMs: Int)
324 Thread.sleep(delayMs.toLong())
333 * カメラからコマンドの結果を受信する(メイン部分)
// Receives the response for a command, choosing the receive strategy from its callback:
// receive_multi() when the callback reports isReceiveMulti, receive_single() otherwise.
// Returns the Boolean produced by the chosen receive function.
336 private fun receiveFromCamera(command: IPtpIpCommand): Boolean
338 val callback = command.responseCallback()
339 var delayMs = command.receiveDelayMs()
// A negative or too-large delay is replaced by the default delay.
340 if (delayMs < 0 || delayMs > COMMAND_SEND_RECEIVE_DURATION_MAX)
342 delayMs = COMMAND_SEND_RECEIVE_DURATION_MS
345 return (if (callback != null && callback.isReceiveMulti)
347 // Pattern: report to the callback progressively as data arrives
348 receive_multi(command, delayMs)
352 receive_single(command, delayMs)
354 // Pattern: report to the callback once, after everything has been received
// Receives a whole response, accumulates it in memory and delivers it to the callback in one call.
// NOTE(review): the definition of `id`, the conditions around the early exits, the
// return values and the error handling are on lines not visible in this excerpt.
357 private fun receive_single(command: IPtpIpCommand, delayMs: Int): Boolean
359 val isDumpReceiveLog = command.dumpLog()
361 val callback = command.responseCallback()
364 val receive_message_buffer_size = BUFFER_SIZE
365 val byte_array = ByteArray(receive_message_buffer_size)
366 val inputStream = socket?.getInputStream()
367 if (inputStream == null)
369 Log.v(TAG, " InputStream is NULL... RECEIVE ABORTED.")
// No stream: report a null body to the callback.
370 receivedAllMessage(isDumpReceiveLog, id, null, callback)
374 // Wait until the first data has accumulated in the receive buffer...
375 var read_bytes = waitForReceive(inputStream, delayMs, command.maxRetryCount())
379 Log.v(TAG, " RECEIVE : RETRY OVER...")
380 if (!command.isRetrySend)
382 // When the command is not re-sent, report that there was no response
383 receivedAllMessage(isDumpReceiveLog, id, null, callback)
// Collect chunks for as long as the stream reports more available bytes.
389 val byteStream = ByteArrayOutputStream()
390 while (read_bytes > 0)
392 read_bytes = inputStream.read(byte_array, 0, receive_message_buffer_size)
395 Log.v(TAG, " RECEIVED MESSAGE FINISHED ($read_bytes)")
398 byteStream.write(byte_array, 0, read_bytes)
// Loop control: continue only if more bytes are already available.
400 read_bytes = inputStream.available()
// Post-process the accumulated bytes with cutHeader() before delivering them.
402 val outputStream = cutHeader(byteStream)
403 receivedAllMessage(isDumpReceiveLog, id, outputStream.toByteArray(), callback)
// Logs the size of a received body, optionally dumps its bytes, and passes it to the callback.
//   isDumpReceiveLog - when true and body is non-null, the bytes are dumped to the log
//   id               - identifier passed through to the callback
//   body             - received bytes, or null when nothing was received
//   callback         - receiver of the message; nothing is delivered when it is null
414 private fun receivedAllMessage(isDumpReceiveLog: Boolean, id: Int, body: ByteArray?, callback: IPtpIpCommandCallback?)
// A null body is logged as 0 bytes.
416 Log.v(TAG, "receivedAllMessage() : " + (body?.size ?: 0) + " bytes.")
417 if (isDumpReceiveLog && body != null)
420 SimpleLogDumper.dump_bytes("RECV[" + body.size + "] ", body)
422 callback?.receivedMessage(id, body)
// Receives a response in chunks, reporting each chunk to the callback through
// onReceiveProgress() and finishing with receivedMessage().
// NOTE(review): the definition of `id`, the loop headers, the retry countdown, the
// return values and the error handling are on lines not visible in this excerpt.
425 private fun receive_multi(command: IPtpIpCommand, delayMs: Int): Boolean
427 //int estimatedSize = command.estimatedReceiveDataSize();
428 var maxRetryCount = command.maxRetryCount()
430 val callback = command.responseCallback()
433 Log.v(TAG, " ===== receive_multi() =====")
434 val receive_message_buffer_size = BUFFER_SIZE
435 val byte_array = ByteArray(receive_message_buffer_size)
436 val inputStream = socket?.getInputStream()
437 if (inputStream == null)
439 Log.v(TAG, " InputStream is NULL... RECEIVE ABORTED.")
443 // Wait until the first data has accumulated in the receive buffer...
444 var read_bytes = waitForReceive(inputStream, delayMs, command.maxRetryCount())
448 Log.v(TAG, " RECEIVE : RETRY OVER...... : " + delayMs + "ms x " + command.maxRetryCount())
449 if (command.isRetrySend)
451 // Case where the request is re-sent ... if that is not possible, keep waiting for a response
// target_length: total size announced by the packet header; received_length: bytes read so far.
455 var target_length: Int
456 var received_length: Int
458 //boolean read_retry = false;
// First read: take the announced data length from the packet header.
463 read_bytes = inputStream.read(byte_array, 0, receive_message_buffer_size)
464 target_length = parseDataLength(byte_array, read_bytes)
465 received_length = read_bytes
466 if (target_length <= 0)
469 if (received_length > 0)
// Dump at most the first 64 bytes of the unexpected data.
471 SimpleLogDumper.dump_bytes("WRONG DATA : ", Arrays.copyOfRange(byte_array, 0, Math.min(received_length, 64)))
473 Log.v(TAG, " WRONG LENGTH. : $target_length READ : $received_length bytes.")
// Report the failure with a null body.
474 callback?.receivedMessage(id, null)
477 } //while (read_retry);
479 Log.v(TAG, " -=-=-=- 1st CALL : read_bytes : " + read_bytes + "(" + received_length + ") : target_length : " + target_length + " buffer SIZE : " + byte_array.size)
480 callback?.onReceiveProgress(received_length, target_length, Arrays.copyOfRange(byte_array, 0, received_length))
485 read_bytes = inputStream.available()
488 //Log.v(TAG, " WAIT is.available() ... [" + received_length + ", " + target_length + "] retry : " + maxRetryCount);
491 } // while ((read_bytes == 0)&&(maxRetryCount > 0)&&(received_length < target_length)); // ((read_bytes == 0)&&(estimatedSize > 0)&&(received_length < estimatedSize));
// Keep reading until the announced length has been received.
492 while (read_bytes >= 0 && received_length < target_length)
// Each read refills byte_array from index 0.
494 read_bytes = inputStream.read(byte_array, 0, receive_message_buffer_size)
497 Log.v(TAG, " RECEIVED MESSAGE FINISHED ($read_bytes)")
500 received_length = received_length + read_bytes
// Hand the chunk just read to the callback.
503 callback?.onReceiveProgress(received_length, target_length, Arrays.copyOfRange(byte_array, 0, read_bytes))
504 //byteStream.write(byte_array, 0, read_bytes);
// Data arrived, so the retry budget is reset.
505 maxRetryCount = command.maxRetryCount()
509 read_bytes = inputStream.available()
510 //Log.v(TAG, " is.available() read_bytes : " + read_bytes + " " + received_length + " < " + estimatedSize);
511 if (read_bytes == 0) {
512 Log.v(TAG, " WAIT is.available() ... [$received_length, $target_length] $read_bytes retry : $maxRetryCount")
515 } // while ((read_bytes == 0)&&(maxRetryCount > 0)&&(received_length < target_length)); // while ((read_bytes == 0)&&(estimatedSize > 0)&&(received_length < estimatedSize));
519 Log.v(TAG, " --- receive_multi : $id ($read_bytes) [$maxRetryCount] $receive_message_buffer_size ($received_length) ")
// NOTE(review): byte_array holds only the most recent chunk (every read starts at index 0),
// so this copy is not the concatenated payload, and copyOfRange zero-pads when
// received_length exceeds the buffer size. Presumably callers take the data from
// onReceiveProgress() — confirm.
520 callback?.receivedMessage(id, Arrays.copyOfRange(byte_array, 0, received_length))
// Extracts a data length from the packet header held in byte_array.
// The byte at offset + 4 is compared against 0x07 and 0x09; for 0x09 the length is
// assembled from bytes offset + 12 .. offset + 15, least significant byte first.
// NOTE(review): the definitions of `offset` and `lenlen`, the handling of the 0x07 case,
// the use of read_bytes and the return statement are on lines not visible in this excerpt.
529 private fun parseDataLength(byte_array: ByteArray, read_bytes: Int): Int
533 //int packetType = 0;
538 if (byte_array[offset + 4].toUByte().toInt() == 0x07)
543 if (byte_array[offset + 4].toUByte().toInt() == 0x09)
// The infix operators evaluate left to right: each term is (byte and 0xff) shl k.
545 lenlen = (byte_array[offset + 15].toUByte().toInt() and 0xff shl 24) + (byte_array[offset + 14].toUByte().toInt() and 0xff shl 16) + (byte_array[offset + 13].toUByte().toInt() and 0xff shl 8) + (byte_array[offset + 12].toUByte().toInt() and 0xff)
546 //packetType = (((int)byte_array[offset + 16]) & 0xff);
549 //Log.v(TAG, " --- parseDataLength() length: " + lenlen + " TYPE: " + packetType + " read_bytes: " + read_bytes + " offset : " + offset);
// Strips per-packet headers from a received buffer that contains several packets and
// returns the concatenated payloads; a buffer that needs no processing is returned unchanged.
// NOTE(review): the declaration of `lenlen`, the condition guarding the "malformed data"
// return and the error handling are on lines not visible in this excerpt.
558 private fun cutHeader(receivedBuffer: ByteArrayOutputStream): ByteArrayOutputStream
562 val byte_array = receivedBuffer.toByteArray()
563 val limit = byte_array.size
// Length field of the first packet: bytes 0..3, least significant byte first.
565 val len = (byte_array[3].toUByte().toInt() and 0xff shl 24) + (byte_array[2].toUByte().toInt() and 0xff shl 16) + (byte_array[1].toUByte().toInt() and 0xff shl 8) + (byte_array[0].toUByte().toInt() and 0xff)
// Packet type is the byte at index 4.
566 val packetType = byte_array[4].toUByte().toInt() and 0xff
567 if (limit == len || limit < 16384)
569 // Only one response is present, or the received data is smaller than 16 kB: return it as is.
570 return (receivedBuffer)
572 if (packetType == 0x09)
// For packet type 0x09 a length is read from bytes 12..15, least significant byte first.
574 lenlen = (byte_array[15].toUByte().toInt() and 0xff shl 24) + (byte_array[14].toUByte().toInt() and 0xff shl 16) + (byte_array[13].toUByte().toInt() and 0xff shl 8) + (byte_array[12].toUByte().toInt() and 0xff)
575 //packetType = (((int) byte_array[16]) & 0xff);
577 // Log.v(TAG, " --- RECEIVED MESSAGE : " + len + " bytes (BUFFER: " + byte_array.length + " bytes)" + " length : " + lenlen + " TYPE : " + packetType + " --- ");
580 // The data looks malformed, so do nothing
581 return receivedBuffer
583 val outputStream = ByteArrayOutputStream()
584 //outputStream.write(byte_array, 0, 20); //
585 var position = 20 // Start position, header included
// Walk the packets: read each packet's length, copy what follows its first 12 bytes, advance.
586 while (position < limit)
588 lenlen = (byte_array[position + 3].toUByte().toInt() and 0xff shl 24) + (byte_array[position + 2].toUByte().toInt() and 0xff shl 16) + (byte_array[position + 1].toUByte().toInt() and 0xff shl 8) + (byte_array[position].toUByte().toInt() and 0xff)
// Copy the packet minus its first 12 bytes, limited to what remains in the buffer.
590 val copyByte = Math.min(limit - (position + 12), lenlen - 12)
591 outputStream.write(byte_array, position + 12, copyByte)
592 position = position + lenlen
594 return (outputStream)
// NOTE(review): presumably the fallback of an error handler not visible here — confirm.
601 return (receivedBuffer)
604 private fun waitForReceive(inputStream : InputStream, delayMs: Int, retryCount : Int): Int
606 var retry_count = retryCount
607 var isLogOutput = true
611 while (read_bytes <= 0)
614 read_bytes = inputStream.available()
619 Log.v(TAG, "waitForReceive:: is.available() WAIT... : " + delayMs + "ms")
623 if (!waitForever && retry_count < 0)