概述 内容 由于视频的算法计算和标签同步是一次性计算完成的,导致回顾时不能重新定位到需要的位置(有人经过、有声音)。因此需要支持将视频导出,同时支持将导出的视频重新导入并放回pipeline中,以便复现算法结果
需求
导出文件
有一个用户操作的入口 [分享菜单]
导出后是一个mp4完整文件/ 原始数据格式(0000001是一帧的开头,根据标识保存数据)
读取文件读成pipeline的输入
文件转换成一帧
过程 pipelineFactory
/**
 * Builds the demo pipelines used to replay exported clips through the same
 * label-extraction chain as live data.
 */
object DemoPipelineFactory {

    /**
     * Video pipeline over [Bitmap] frames.
     *
     * An inner pipeline verifies detected faces (HMSFaceVerifier); a
     * [SplitExtractor] feeds it with faces found by GoogleMLKitFaceDetector.
     * The outer pipeline also runs SSD-MobileNet object detection on the
     * whole frame.
     */
    fun createVideoPipeline(
        sink: EventSink,
        ai: IntelligenceRepository,
        logger: Logger
    ): LabelPipeline<Bitmap> {
        // Inner pipeline operates on lists of already-detected faces.
        val facePipelineBuilder = LabelPipeline.Builder<List<MarkedFace>>(logger)
        val facePipeline = facePipelineBuilder
            .sinkTo(sink)
            .addOp(HMSFaceVerifier(ai, logger))
            .build()
        // Splitter bridges Bitmap frames -> MarkedFace lists for facePipeline.
        val facePipeSplitter = SplitExtractor(facePipeline, GoogleMLKitFaceDetector(logger))
        val videoPipeBuilder = LabelPipeline.Builder<Bitmap>(logger)
        return videoPipeBuilder
            .sinkTo(sink)
            .addOp(facePipeSplitter)
            .addOp(TFLiteObjectDetector(ModelName.SSD_MOBILE_NET_V1, ai, logger))
            .build()
    }

    /**
     * Audio pipeline over raw PCM frames ([ShortArray]), classified by YAMNet.
     */
    fun createAudioPcmPipeline(
        eventSink: EventSink,
        intelligenceRepository: IntelligenceRepository,
        logger: Logger
    ): LabelPipeline<ShortArray> {
        val audioPipeBuilder = LabelPipeline.Builder<ShortArray>(logger)
        return audioPipeBuilder
            .sinkTo(eventSink)
            .addOp(YAMNetAudioClassifier(intelligenceRepository, logger))
            .build()
    }
}
LabelPipeLine
com/danale/edge/intelligence/pipe/LabelPipeline.kt
/**
 * Runs items of type [T] through an ordered chain of [LabelExtractor]s,
 * binding each extractor to an optional shared [EventSink] for label output.
 */
open class LabelPipeline<T : Any>(
    private val extractors: List<LabelExtractor<T>>,
    private val logger: Logger,
    private val providedSink: EventSink?
) : Closeable {
    companion object {
        const val TAG = "LabelPipeline"
    }

    init {
        // Bind every extractor to the shared sink up front so labels emitted
        // during process() have a destination.
        extractors.forEach {
            if (providedSink != null) {
                logger.i(TAG, "init, sink ${it.hashCode()} to ${providedSink.hashCode()}")
                it.bindEventSink(providedSink)
            }
        }
    }

    /**
     * Wraps [item] into a [LabeledObject] (with a fresh concurrent label map
     * and the given [metaData]) and pipes it through every extractor in order.
     * Each extractor's Completable finishes before the next one starts.
     */
    fun process(item: T, metaData: LabeledObject.MetaData): Single<LabeledObject<T>> {
        var single = Single.just(LabeledObject(ConcurrentHashMap(), item, metaData = metaData))
        for (op in extractors) {
            // toSingleDefault re-emits the same labeled object once the
            // extractor completes, so labels accumulate on one object.
            single = single.flatMap { op.extractLabel(it).toSingleDefault(it) }
        }
        return single
    }

    /** Fluent builder: collects extractors in insertion order plus an optional sink. */
    class Builder<T : Any>(val logger: Logger) {
        private val list = LinkedList<LabelExtractor<T>>()
        private var sink: EventSink? = null

        fun sinkTo(sink: EventSink): Builder<T> {
            this.sink = sink
            return this
        }

        fun addOp(extractor: LabelExtractor<T>): Builder<T> {
            list.addLast(extractor)
            return this
        }

        fun build(): LabelPipeline<T> {
            return LabelPipeline(list, logger, sink)
        }
    }

    /** Flushes each extractor's session state to the sink, then closes it. */
    override fun close() {
        for (op in extractors) {
            op.flushSession(providedSink)
            op.close()
        }
    }
}
/**
 * A single label-extraction step in a [LabelPipeline]. Implementations emit
 * label events through the bound sink (via EventSource) and must release
 * resources in [Closeable.close].
 */
interface LabelExtractor<T : Any> : Closeable, EventSource {

    /** Extracts labels from [item]; the Completable finishes when done. */
    fun extractLabel(item: LabeledObject<T>): Completable

    /** Optional hook to flush session-scoped state before close; no-op by default. */
    fun flushSession(sink: EventSink?) {}

    /** Canonical colon-separated label key: source:category:vendor:vendorKey. */
    fun labelKey(sourceType: String, category: String, vendor: String, vendorKey: String): String {
        return "$sourceType:$category:$vendor:$vendorKey"
    }
}
视频导出 通过ByteBuffer导出成MP4格式视频
com/danale/edge/ui/flow/VideoFlowViewModel.kt
/**
 * Attaches an IjkPlayer to [surface] and loads the clip identified by
 * [clipDri], either from an already-loaded [clipDataSource] or by fetching
 * the blob from the SDK. All player work runs on [loadPlayerExecutor].
 */
fun attachPlayer(
    surface: Surface,
    clipDri: String,
    clipDataSource: ByteBufferVideoSource? = null,
    positionResponder: ClipPositionResponder? = null
) {
    logger.d(
        TAG,
        "attachPlayer, clipDri=${Fuzzy.interval(clipDri)}, data=${clipDataSource?.hashCode()}"
    )
    positionResponder?.let { weakPositionResponder = WeakReference(it) }
    loadPlayerExecutor.submit {
        detachPlayerSync()
        if (player == null) {
            logger.i(TAG, "attachPlayer, instantiate")
            player = IjkPlayerProvider.provideIjkPlayerDefaultConfig()
        }
        player?.apply {
            try {
                setSurface(surface)
                // Listener bodies are elided in this document snippet.
                setOnPreparedListener { }
                setOnErrorListener { mediaPlayer, errArg1, errArg2 -> }
                setOnCompletionListener { }
            } catch (e: Exception) {
                logger.e(TAG, "attachPlayer, null mp", e)
            }
        }
        if (clipDataSource != null) {
            // Reuse an already-loaded buffer (body elided in this snippet).
            logger.i(TAG, "attachPlayer, use exist data")
            try {
            } catch (e: Exception) {
            }
        } else {
            logger.i(TAG, "attachPlayer, load clip ${Fuzzy.interval(clipDri)}")
            val latch = CountDownLatch(1)
            dataSubscription = sdk.getBlobWithDri(clipDri)
                .subscribeOn(Schedulers.io())
                .subscribeBy(
                    onSuccess = {
                        logger.i(TAG, "attachPlayer, blob ok")
                        try {
                            val data = ByteBufferVideoSource()
                            data.load(it)
                            preparePlayer(data)
                            // Export hook: persist the raw MP4 bytes, named by
                            // the id after "clip_" in the DRI.
                            saveBytebuffer(it, clipDri.split("clip_")[1])
                        } catch (e: Exception) {
                            // NOTE(review): exception is swallowed silently —
                            // at minimum it should be logged.
                        }
                        latch.countDown()
                    },
                    onError = {
                        // NOTE(review): as shown, onError never counts down the
                        // latch, so a failed load blocks this executor thread
                        // indefinitely — confirm against the full source.
                    }
                )
            // Blocks the executor thread until the blob callback fires.
            latch.await()
        }
    }
}
其中data.load(it)
是获取视频来源,it
是ByteBuffer格式，通过保存it
为MP4格式可以实现视频的导出,不同的视频通过clipDri
加以区分
/**
 * Returns the DCIM/Camera export directory path, ending with exactly one
 * [File.separator], creating the directory (and any missing parents) first.
 */
private fun getPath(): String {
    val dir = File(
        Environment.getExternalStorageDirectory().toString()
                + File.separator + Environment.DIRECTORY_DCIM
                + File.separator + "Camera"
    )
    if (!dir.exists()) {
        // mkdirs(): the original mkdir() fails silently when DCIM itself is missing.
        dir.mkdirs()
    }
    // The original appended File.separator to a string that already ended with
    // one, producing "Camera//"; build the path from the File instead.
    return dir.absolutePath + File.separator
}

/**
 * Writes the readable bytes of [byteBuffer] to DCIM/Camera/<fileName>.mp4,
 * overwriting any previous export of the same clip.
 */
private fun saveBytebuffer(byteBuffer: ByteBuffer, fileName: String) {
    logger.d(TAG, "attachPlayer, save bytebuffer")
    val file = File(getPath() + fileName + ".mp4")
    logger.d(TAG, "attachPlayer, file=$file")
    // Copy only [position, limit): array() dumps the entire backing array
    // (including bytes outside the payload) and throws for direct/read-only
    // buffers. duplicate() keeps the caller's position untouched.
    val data = ByteArray(byteBuffer.remaining())
    byteBuffer.duplicate().get(data)
    // use {} guarantees the stream closes even on write failure; overwrite
    // (not append) so re-exporting the same clip cannot corrupt the MP4.
    FileOutputStream(file).use { os ->
        os.write(data)
        os.flush()
    }
}
clipDri
是String格式,但直接将其存为文件名会有非法字符,[D*]dri:data_worm:kv:blob:grid/827f8b9cfbb780df63f56fe6e7bef03c:clip_f00b3dd5c79411edaa64d54fbc172c8c
通过截取拿到clip_
后的部分组成文件名
视频导入 图片导入 通过ffmpeg
将mp4视频文件分割成jpg图片文件
1 ffmpeg -i test.mp4 -r 10 -f image2 %05d.jpg
-i : 指定输入文件
-r : 帧数 10
-f : 指定格式化的格式为image2
image2后面跟着的是文件名
%05d：按正序以5位数编号（不足5位时前面补零）
将图片放入到app的文件夹中,由于手机的安全性,放入其他文件夹会读不到文件/sdcard/Android/data/com.alcidae.edgeapp/files
通过context.getExternalFilesDir(null)
获取到文件夹路径,存入每个图片的名称
/**
 * Lists the absolute paths of every .jpg frame directly under
 * <externalFilesDir>/test (subdirectories are not descended into).
 */
private fun getPics(): MutableList<String> {
    logger.d(TAG, "start getPics, ${context.getExternalFilesDir(null)}")
    val framesDir = File("${context.getExternalFilesDir(null)}/test")
    return framesDir.walk()
        .maxDepth(1)
        .filter { candidate -> candidate.isFile && candidate.extension == "jpg" }
        .map { candidate -> candidate.absolutePath }
        .toMutableList()
}
通过循环读取图片,并将图片转成bitmap输入到pipeline
/**
 * Replays exported JPEG frames through the video pipeline: each file is
 * decoded to a Bitmap and processed with its list index as the timestamp.
 */
@SuppressLint("CheckResult")
private fun pipelineFromPic(files: MutableList<String>) {
    for (i in 0 until files.size) {
        val file = File(files[i])
        val bytes = file.readBytes()
        val currBitmap = BitmapFactory.decodeByteArray(bytes, 0, bytes.size)
        // The loop index doubles as a monotonically increasing frame timestamp.
        val metaData = LabeledObject.MetaData(dataTimestamp = i.toLong())
        videoPipeline.process(currBitmap, metaData)
            .flatMap {
                val elapsedNano = it.getElapsedMillis()
                logger.d(TAG, "for, inferenceTime = $elapsedNano")
                // Free the Bitmap's pixels once extraction has finished.
                it.obj.recycle()
                return@flatMap Single.just(it)
            }
            .subscribeBy(
                onSuccess = {
                    logger.d(TAG, "for, onSuccess, i=$i, labeledObject = $it")
                },
                onError = {
                    logger.d(TAG, "for, onError, i=$i, throwable = $it")
                    close()
                },
            )
    }
    // NOTE(review): if process() completes asynchronously, closing right after
    // the loop may race with frames still in flight — confirm the pipeline
    // tolerates close() while work is pending.
    videoPipeline.close()
}
// Entry point: enumerate the exported frames, then feed them to the pipeline.
val files = getPics()
pipelineFromPic(files)
音频导入 通过ffmpeg工具将mp4文件转成pcm文件
1 2 ffmpeg -i test.mp4 test.mp3 ffmpeg -i test.mp3 -ar 48000 -ac 2 -f s16le test.pcm
/**
 * Replays an exported raw PCM file (s16le, see the ffmpeg commands above)
 * through the audio pipeline in 640-byte (320-sample) frames.
 */
@SuppressLint("CheckResult")
fun videoPipeline() {
    val file = File("${context.getExternalFilesDir(null)}/test/test.pcm")
    val bytes = file.readBytes()
    val metaData = LabeledObject.MetaData(dataTimestamp = 1L)
    logger.d(TAG, "bytes.size= ${bytes.size}")
    // 640 bytes = 320 little-endian 16-bit samples per pipeline frame.
    val step = 640
    val frameCount = bytes.size / step
    for (i in 0 until frameCount) {
        // BUG FIX: the original sliced bytes[i..i + step] — 641-byte windows
        // sliding by ONE byte per iteration. Frames must be consecutive,
        // non-overlapping step-sized chunks.
        val chunk = bytes.slice(i * step until (i + 1) * step)
        // Reassemble s16le samples: low byte first, high byte shifted up.
        val shorts = ShortArray(chunk.size / 2) {
            (chunk[it * 2].toUByte().toInt() + (chunk[it * 2 + 1].toInt() shl 8)).toShort()
        }
        Maybe.fromSingle(
            audioPipeline.process(shorts, metaData)
                .flatMap {
                    val elapsedNano = it.getElapsedMillis()
                    logger.d(TAG, "audioPipe, inferenceTime = $elapsedNano")
                    return@flatMap Single.just(it)
                }
        ).subscribeBy(
            onSuccess = {
                logger.d(TAG, "for, onSuccess, labeledObject = $it")
            },
            onError = {
                logger.d(TAG, "for, onError, throwable = $it")
                close()
            },
        )
    }
    // NOTE(review): if process() completes asynchronously, closing here can
    // race with in-flight frames — confirm close-during-work is safe.
    audioPipeline.close()
}