Pular para o conteúdo principal

monitoramento transmissão estructurada queries on Databricks

Databricks fornece monitoramento integrado para aplicações de transmissão estruturada por meio do site Spark UI sob a transmissão tab.

Distinguir as consultas de transmissão estruturada no Spark UI

Forneça à sua transmissão um nome de consulta exclusivo adicionando .queryName(<query-name>) ao seu código writeStream para distinguir facilmente quais métricas pertencem a qual transmissão no site Spark UI.

Empurrar a transmissão estructurada métricas para o serviço externo

As transmissões métricas podem ser enviadas para serviços externos para casos de uso de alertas ou painéis de controle usando a interface Query Listener de transmissão do site Apache Spark. Em Databricks Runtime 11.3 LTS e acima, StreamingQueryListener está disponível em Python e Scala.

important

As seguintes limitações se aplicam às cargas de trabalho que usam os modos de acesso compute habilitados pelo Unity Catalog:

  • StreamingQueryListener requer Databricks Runtime 15.1 ou acima para usar credenciais ou interagir com objetos gerenciados por Unity Catalog em compute com modo de acesso dedicado.
  • StreamingQueryListener requer o Databricks Runtime 16.1 ou o acima para cargas de trabalho do Scala configuradas com o modo de acesso padrão (antigo modo de acesso compartilhado).
nota

A latência de processamento com ouvintes pode afetar significativamente as velocidades de processamento de consultas. É aconselhável limitar a lógica de processamento nesses ouvintes e optar por escrever em sistemas de resposta rápida, como o Kafka, para obter eficiência.

O código a seguir fornece exemplos básicos da sintaxe para implementar um ouvinte:

Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}

/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}

/**
* Called when the query is idle and waiting for new data to process.
*/
def onQueryIdle(event: QueryProgressEvent): Unit = {}

/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Definição de métricas observáveis na transmissão estruturada

As métricas observáveis são denominadas funções agregadas arbitrárias que podem ser definidas em uma consulta (DataFrame). Assim que a execução de um DataFrame atinge um ponto de conclusão (ou seja, termina uma consulta de lotes ou atinge uma época de transmissão), é emitido um evento nomeado que contém as métricas dos dados processados desde o último ponto de conclusão.

O senhor pode observar essas métricas anexando um ouvinte à sessão do Spark. O ouvinte depende do modo de execução:

  • modo lotes : Use QueryExecutionListener.

    QueryExecutionListener é chamado quando a consulta é concluída. Acesse as métricas usando o mapa QueryExecution.observedMetrics.

  • transmissão ou microbatch : Use StreamingQueryListener.

    StreamingQueryListener é chamado quando a consulta de transmissão completa uma época. Acesse as métricas usando o mapa StreamingQueryProgress.observedMetrics. Databricks não é compatível com o modo de disparo continuous para transmissão.

Por exemplo:

Scala
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})

Mapa Unity Catalog, Delta Lake, e identificadores de tabela de transmissão estruturada métricas

As transmissões estruturadas métricas usam o campo reservoirId em vários lugares para a identidade exclusiva de uma tabela Delta usada como fonte para uma consulta de transmissão.

O campo reservoirId mapeia o identificador exclusivo armazenado pela tabela Delta no log de transações Delta. Esse ID não é mapeado para o valor tableId atribuído pelo Unity Catalog e exibido no Catalog Explorer.

Use a seguinte sintaxe para revisar o identificador de tabela de uma tabela Delta. Isso funciona para tabelas gerenciais Unity Catalog, tabelas externas Unity Catalog e todas as tabelas Hive metastore Delta :

SQL
DESCRIBE DETAIL <table-name>

O campo id exibido nos resultados é o identificador que mapeia para o reservoirId na transmissão métrica.

Métricas do objeto StreamingQueryListener

Campos

Descrição

id

Um ID de consulta exclusivo que persiste nas reinicializações.

runId

Um ID de consulta que é exclusivo para cada início/reinício. Consulte streamingQuery.runid().

name

O nome da consulta especificado pelo usuário. O nome é nulo se nenhum nome for especificado.

timestamp

O timestamp para a execução do microbatch.

batchId

Uma ID exclusiva para os lotes atuais de dados que estão sendo processados. No caso de novas tentativas após uma falha, um determinado ID de lote pode ser executado mais de uma vez. Da mesma forma, quando não há dados a serem processados, o ID do lote não é incrementado.

batchDuration

A duração do processamento de um lote de operações, em milissegundos.

numInputRows

O número agregado (em todas as fontes) de registros processados em um acionador.

inputRowsPerSecond

A taxa agregada (em todas as fontes) de dados recebidos.

processedRowsPerSecond

A taxa agregada (em todas as fontes) na qual o Spark está processando dados.

StreamingQueryListener também define os seguintes campos que contêm objetos que o senhor pode examinar para obter métricas do cliente e detalhes do progresso da origem:

Campos

Descrição

durationMs

Tipo: ju.Map[String, JLong]. Veja o objeto DurationMs.

eventTime

Tipo: ju.Map[String, String]. Consulte o objeto EventTime.

stateOperators

Tipo: Array[StateOperatorProgress]. Consulte o objeto StateOperators.

sources

Tipo: Array[SourceProgress]. Veja o objeto de fontes.

sink

Tipo: SinkProgress. Veja o objeto coletor.

observedMetrics

Tipo: ju.Map[String, Row]. Funções agregadas arbitrárias nomeadas que podem ser definidas em um DataFrame/query (como df.observe).

Objeto DurationMS

Tipo de objeto : ju.Map[String, JLong]

informações sobre o tempo necessário para concluir os vários estágios do processo de execução do microbatch.

Campos

Descrição

durationMs.addBatch

O tempo necessário para executar o microbatch. Isso exclui o tempo que o Spark leva para planejar o microbatch.

durationMs.getBatch

O tempo necessário para recuperar os metadados sobre os deslocamentos da fonte.

durationMs.latestOffset

A última compensação consumida pelo microlote. Esse objeto de progresso se refere ao tempo gasto para recuperar a última compensação das fontes.

durationMs.queryPlanning

O tempo gasto para gerar o plano de execução.

durationMs.triggerExecution

O tempo necessário para planejar e executar o microbatch.

durationMs.walCommit

O tempo necessário para commit os novos offsets disponíveis.

durationMs.commitBatch

O tempo necessário para commit os dados gravados no sink durante addBatch. Presente apenas para sinks que suportam commit.

durationMs.commitOffsets

O tempo necessário para commit os lotes para o commit log.

objeto EventTime

Tipo de objeto : ju.Map[String, String]

informações sobre o valor de tempo do evento visto nos dados que estão sendo processados no microbatch. Esses dados são usados pela marca d'água para descobrir como cortar o estado para processar agregações com estado definidas no Job de transmissão estruturada.

Campos

Descrição

eventTime.avg

O tempo médio do evento visto nesse gatilho.

eventTime.max

O tempo máximo do evento visto nesse gatilho.

eventTime.min

O tempo mínimo do evento visto nesse gatilho.

eventTime.watermark

O valor da marca d'água usada nesse gatilho.

objeto StateOperators

Tipo de objeto : Array[StateOperatorProgress] O objeto stateOperators contém informações sobre as operações com estado definidas no Job de transmissão estruturada e as agregações que são produzidas a partir delas.

Para obter mais detalhes sobre os operadores de estado de transmissão, consulte O que é transmissão com estado?

Campos

Descrição

stateOperators.operatorName

O nome do operador com estado ao qual as métricas se referem, como symmetricHashJoin, dedupe, ou stateStoreSave.

stateOperators.numRowsTotal

O número total de linhas no estado como resultado de um operador ou agregação com estado.

stateOperators.numRowsUpdated

O número total de linhas atualizadas no estado como resultado de um operador ou agregação com estado.

stateOperators.allUpdatesTimeMs

Atualmente, essa métrica não é mensurável pelo site Spark e está planejada para ser removida em atualizações futuras.

stateOperators.numRowsRemoved

O número total de linhas removidas do estado como resultado de um operador ou agregação com estado.

stateOperators.allRemovalsTimeMs

Atualmente, essa métrica não é mensurável pelo site Spark e está planejada para ser removida em atualizações futuras.

stateOperators.commitTimeMs

O tempo necessário para commit todas as atualizações (colocações e remoções) e retornar uma nova versão.

stateOperators.memoryUsedBytes

Memória utilizada pelo armazenamento do estado.

stateOperators.numRowsDroppedByWatermark

O número de linhas consideradas tardias demais para serem incluídas em uma agregação com estado. Somente agregações de transmissão : O número de linhas descartadas após a agregação (não as linhas de entrada brutas). Esse número não é preciso, mas fornece uma indicação de que dados atrasados estão sendo descartados.

stateOperators.numShufflePartitions

O número de partições aleatórias para esse operador com estado.

stateOperators.numStateStoreInstances

A instância real de armazenamento do estado que o operador inicializou e manteve. Para muitos operadores com estado, isso é o mesmo que o número de partições. Entretanto, a junção transmissão-transmissão inicializa quatro instâncias de armazenamento do estado por partição.

stateOperators.customMetrics

Consulte StateOperators.customMetrics neste tópico para obter mais detalhes.

Objeto StateOperatorProgress.CustomMetrics

Tipo de objeto : ju.Map[String, JLong]

StateOperatorProgress tem um campo, customMetrics, que contém as métricas específicas do recurso que o senhor está usando ao reunir essas métricas.

Recurso

Descrição

RocksDB armazenamento do estado

métricas para RocksDB armazenamento do estado.

HDFS armazenamento do estado

métricas para HDFS armazenamento do estado.

transmissão deduplicação

métricas para deduplicação de linhas.

transmissão agregação

métricas para agregação de linhas.

transmissão join operador

métricas para transmissão join operador.

transformWithState

métricas para a operadora transformWithState.

RocksDB armazenamento do estado custom métricas

informações coletadas em RocksDB capturando métricas sobre seu desempenho e operações com relação aos valores de estado que mantém para o Job de transmissão estruturada. Para obter mais informações, consulte Configure RocksDB armazenamento do estado em Databricks.

Campos

Descrição

customMetrics.rocksdbBytesCopied

O número de bytes copiados, conforme monitorado pelo Gerenciador de arquivos do RocksDB.

customMetrics.rocksdbCommitCheckpointLatency

O tempo em milissegundos para tirar um Snapshot do site nativo RocksDB e gravá-lo em um diretório local.

customMetrics.rocksdbCompactLatency

O tempo em milissegundos de compactação (opcional) durante o ponto de verificação commit.

customMetrics.rocksdbCommitCompactLatency

O tempo de compactação durante o commit, em milissegundos.

customMetrics.rocksdbCommitFileSyncLatencyMs

O tempo, em milissegundos, de sincronização do Snapshot nativo do RocksDB com o armazenamento externo (o local do ponto de verificação).

customMetrics.rocksdbCommitFlushLatency

O tempo, em milissegundos, de descarga das alterações do RocksDB na memória para o disco local.

customMetrics.rocksdbCommitPauseLatency

O tempo, em milissegundos, que interrompe os threads em segundo plano worker como parte do ponto de verificação commit, como, por exemplo, para compactação.

customMetrics.rocksdbCommitWriteBatchLatency

O tempo em milissegundos para aplicar as gravações em etapas na estrutura na memória (WriteBatch) ao RocksDB nativo.

customMetrics.rocksdbFilesCopied

O número de arquivos copiados, conforme monitorado pelo RocksDB File Manager.

customMetrics.rocksdbFilesReused

O número de arquivos reutilizados, conforme monitorado pelo Gerenciador de arquivos do RocksDB.

customMetrics.rocksdbGetCount

O número de chamadas para get (não inclui gets de WriteBatch - lotes na memória usados para armazenar gravações).

customMetrics.rocksdbGetLatency

O tempo médio em nanossegundos para a chamada RocksDB::Get nativa subjacente.

customMetrics.rocksdbReadBlockCacheHitCount

A contagem de acessos ao cache do cache de blocos no RocksDB.

customMetrics.rocksdbReadBlockCacheMissCount

A contagem dos erros de cache de bloco no RocksDB.

customMetrics.rocksdbSstFileSize

O tamanho de todos os arquivos Static Sorted Table (SST) na instância do RocksDB.

customMetrics.rocksdbTotalBytesRead

O número de bytes não compactados lidos pelas operações do site get.

customMetrics.rocksdbTotalBytesWritten

O número total de bytes não compactados gravados pelas operações do site put.

customMetrics.rocksdbTotalBytesReadThroughIterator

O número total de bytes de dados não compactados lidos usando um iterador. Algumas operações com estado (por exemplo, processamento de tempo limite em FlatMapGroupsWithState e marca d'água) exigem a leitura de dados no Databricks por meio de um iterador.

customMetrics.rocksdbTotalBytesReadByCompaction

O número de bytes que o processo de compactação lê do disco.

customMetrics.rocksdbTotalBytesWrittenByCompaction

O número total de bytes que o processo de compactação grava no disco.

customMetrics.rocksdbTotalCompactionLatencyMs

O tempo em milissegundos para as compactações do RocksDB, incluindo as compactações em segundo plano e a compactação opcional iniciada durante o commit.

customMetrics.rocksdbTotalFlushLatencyMs

O tempo total de descarga, incluindo a descarga de fundo. As operações de descarga são processos pelos quais o site MemTable é descarregado no armazenamento quando está cheio. MemTables são o primeiro nível em que os dados são armazenados no RocksDB.

customMetrics.rocksdbZipFileBytesUncompressed

O tamanho em bytes dos arquivos zip não compactados, conforme relatado pelo Gerenciador de arquivos. O File Manager gerencia a utilização e a exclusão do espaço em disco do arquivo SST físico.

customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name>

A versão mais recente do RocksDB Snapshot salva no local do ponto de verificação. Um valor de "-1" indica que nenhum Snapshot foi salvo. Como o Snapshot é específico para cada instância de armazenamento do estado, essa métrica se aplica a um determinado ID de partição e nome de armazenamento do estado.

customMetrics.rocksdbPutLatency

A latência total da chamada put.

customMetrics.rocksdbPutCount

O número de chamadas de venda.

customMetrics.rocksdbWriterStallLatencyMs

O gravador espera um tempo até que a compactação ou a descarga terminem.

customMetrics.rocksdbTotalBytesWrittenByFlush

O total de bytes escritos por flush

customMetrics.rocksdbPinnedBlocksMemoryUsage

O uso de memória para blocos de pinos

customMetrics.rocksdbNumInternalColFamiliesKeys

O número de chaves internas para famílias de colunas internas

customMetrics.rocksdbNumExternalColumnFamilies

O número de famílias de colunas externas

customMetrics.rocksdbNumInternalColumnFamilies

O número de famílias de colunas internas

HDFS armazenamento do estado custom métricas

informações coletadas sobre HDFS armazenamento do estado comportamentos e operações do provedor.

Campos

Descrição

customMetrics.stateOnCurrentVersionSizeBytes

O tamanho estimado do estado somente na versão atual.

customMetrics.loadedMapCacheHitCount

A contagem de acessos de cache nos estados armazenados em cache no provedor.

customMetrics.loadedMapCacheMissCount

A contagem de falhas de cache nos estados armazenados em cache no provedor.

customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name>

A última versão de upload do Snapshot para uma instância específica de armazenamento do estado.

Deduplicação de métricas personalizadas

informações coletadas sobre comportamentos e operações de deduplicação.

Campos

Descrição

customMetrics.numDroppedDuplicateRows

O número de linhas duplicadas diminuiu.

customMetrics.numRowsReadDuringEviction

O número de linhas estaduais lidas durante o despejo estadual.

Agregação de métricas personalizadas

informações coletadas sobre comportamentos e operações de agregação.

Campos

Descrição

customMetrics.numRowsReadDuringEviction

O número de linhas estaduais lidas durante o despejo estadual.

transmissão join métricas personalizadas

informações coletadas sobre a transmissão join comportamentos e operações.

Campos

Descrição

customMetrics.skippedNullValueCount

O número de valores null ignorados, quando spark.sql.streaming.stateStore.skipNullsForStreamStreamJoins.enabled está definido como true.

TransformWithState métricas personalizadas

informações coletadas sobre comportamentos e operações do transformWithState (TWS). Para obter mais detalhes sobre transformWithState, consulte Criar um aplicativo personalizado com estado.

Campos

Descrição

customMetrics.initialStateProcessingTimeMs

Número de milissegundos necessários para processar todo o estado inicial.

customMetrics.numValueStateVars

Número de variáveis de estado de valor. Também presente em transformWithStateInPandas.

customMetrics.numListStateVars

Número de variáveis de estado da lista. Também presente em transformWithStateInPandas.

customMetrics.numMapStateVars

Número de variáveis de estado do mapa. Também presente em transformWithStateInPandas.

customMetrics.numDeletedStateVars

Número de variáveis de estado excluídas. Também presente em transformWithStateInPandas.

customMetrics.timerProcessingTimeMs

Número de milissegundos necessários para processar todos os temporizadores

customMetrics.numRegisteredTimers

Número de temporizadores registrados. Também presente em transformWithStateInPandas.

customMetrics.numDeletedTimers

Número de temporizadores excluídos. Também presente em transformWithStateInPandas.

customMetrics.numExpiredTimers

Número de temporizadores expirados. Também presente em transformWithStateInPandas.

customMetrics.numValueStateWithTTLVars

Número de variáveis de estado de valor com TTL. Também presente em transformWithStateInPandas.

customMetrics.numListStateWithTTLVars

Número de variáveis de estado da lista com TTL. Também presente em transformWithStateInPandas.

customMetrics.numMapStateWithTTLVars

Número de variáveis de estado do mapa com TTL. Também presente em transformWithStateInPandas.

customMetrics.numValuesRemovedDueToTTLExpiry

Número de valores removidos devido à expiração do TTL. Também presente em transformWithStateInPandas.

customMetrics.numValuesIncrementallyRemovedDueToTTLExpiry

Número de valores removidos incrementalmente devido à expiração do TTL.

objeto de fontes

Tipo de objeto : Array[SourceProgress]

O objeto sources contém informações e métricas para transmissão da fonte de dados.

Campos

Descrição

description

Uma descrição detalhada da tabela de transmissão fonte de dados.

startOffset

O número de deslocamento inicial na tabela de fontes de dados em que o trabalho de transmissão começa.

endOffset

A última compensação processada pelo microlote.

latestOffset

A última compensação processada pelo microlote.

numInputRows

O número de linhas de entrada processadas a partir dessa fonte.

inputRowsPerSecond

A taxa, em segundos, na qual os dados estão chegando para processamento dessa fonte.

processedRowsPerSecond

A taxa na qual o Spark está processando dados dessa fonte.

metrics

Tipo: ju.Map[String, String]. Contém métricas personalizadas para uma fonte de dados específica.

A Databricks fornece a seguinte implementação de objeto de fontes:

nota

Para campos definidos no formato sources.<startOffset / endOffset / latestOffset>.* (ou alguma variação), interprete-o como um dos (até esses) 3 campos possíveis, todos contendo o campo filho indicado:

  • sources.startOffset.<child-field>
  • sources.endOffset.<child-field>
  • sources.latestOffset.<child-field>

Delta Lake objeto de fontes

Definições para métricas personalizadas usadas para Delta tabela de transmissão fonte de dados.

Campos

Descrição

sources.description

A descrição da fonte da qual a consulta de transmissão está sendo lida. Por exemplo: “DeltaSource[table]”.

sources.<startOffset / endOffset>.sourceVersion

A versão da serialização com a qual esse deslocamento é codificado.

sources.<startOffset / endOffset>.reservoirId

O ID da tabela que está sendo lida. Isso é usado para detectar configurações incorretas ao reiniciar uma consulta. Consulte os identificadores de tabela Map Unity Catalog, Delta Lake e transmissão estruturada métrica.

sources.<startOffset / endOffset>.reservoirVersion

A versão da tabela que está sendo processada no momento.

sources.<startOffset / endOffset>.index

O índice na sequência de AddFiles nesta versão. Isso é usado para dividir um commit grande em vários lotes. Esse índice é criado classificando em modificationTimestamp e path.

sources.<startOffset / endOffset>.isStartingVersion

Identifica se o deslocamento atual marca o início de uma nova consulta de transmissão em vez do processamento de alterações que ocorreram depois que os dados iniciais foram processados. Ao iniciar uma nova consulta, todos os dados presentes na tabela no início são processados primeiro e, em seguida, todos os novos dados que chegarem.

sources.<startOffset / endOffset / latestOffset>.eventTimeMillis

Hora do evento registrada para ordenar a hora do evento. A hora do evento dos dados iniciais do Snapshot que estão pendentes para serem processados. Usado ao processar um Snapshot inicial com ordem de tempo de evento.

sources.latestOffset

A última compensação processada pela consulta de microbatch.

sources.numInputRows

O número de linhas de entrada processadas a partir dessa fonte.

sources.inputRowsPerSecond

A taxa na qual os dados estão chegando para processamento dessa fonte.

sources.processedRowsPerSecond

A taxa na qual o Spark está processando dados dessa fonte.

sources.metrics.numBytesOutstanding

O tamanho combinado dos arquivos pendentes (arquivos monitorados pelo RocksDB). Esse é o backlog métricas para Delta e Auto Loader como fonte de transmissão.

sources.metrics.numFilesOutstanding

O número de arquivos pendentes a serem processados. Esse é o backlog métricas para Delta e Auto Loader como fonte de transmissão.

Objeto de fontes do Apache Kafka

Definições para métricas personalizadas usadas para Apache Kafka transmissão fonte de dados.

Campos

Descrição

sources.description

Uma descrição detalhada da fonte do Kafka, especificando o tópico exato do Kafka que está sendo lido. Por exemplo: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”.

sources.startOffset

O número de deslocamento inicial dentro do tópico Kafka no qual o trabalho de transmissão começa.

sources.endOffset

A última compensação processada pelo microlote. Isso pode ser igual a latestOffset para uma execução contínua de microbatch.

sources.latestOffset

A última compensação calculada pelo microlote. O processo de microbatching pode não processar todas as compensações quando há limitação, o que resulta na diferença de endOffset e latestOffset.

sources.numInputRows

O número de linhas de entrada processadas a partir dessa fonte.

sources.inputRowsPerSecond

A taxa na qual os dados estão chegando para processamento dessa fonte.

sources.processedRowsPerSecond

A taxa na qual o Spark está processando dados dessa fonte.

sources.metrics.avgOffsetsBehindLatest

O número médio de deslocamentos em que a consulta de transmissão está atrás do último deslocamento disponível entre todos os tópicos inscritos.

sources.metrics.estimatedTotalBytesBehindLatest

O número estimado de bytes que o processo de consulta não consumiu dos tópicos inscritos.

sources.metrics.maxOffsetsBehindLatest

O número máximo de deslocamentos que a consulta de transmissão está atrás do último deslocamento disponível entre todos os tópicos inscritos.

sources.metrics.minOffsetsBehindLatest

O número mínimo de deslocamentos em que a consulta de transmissão está atrás do último deslocamento disponível entre todos os tópicos inscritos.

Objeto de fontes do AWS Kinesis

Definições para métricas personalizadas usadas para AWS Kinesis transmissão fonte de dados.

Campos

Descrição

sources.description

A descrição da fonte Kinesis, especificando a transmissão Kinesis exata da qual a consulta de transmissão está lendo. Por exemplo: “KinesisV2[stream]”.

sources.metrics.avgMsBehindLatest

O número médio de milissegundos em que um consumidor está atrasado em relação ao início de uma transmissão.

sources.metrics.maxMsBehindLatest

O número máximo de milissegundos que um consumidor ficou atrasado em relação ao início de uma transmissão.

sources.metrics.minMsBehindLatest

O número mínimo de milissegundos em que um consumidor está atrasado em relação ao início de uma transmissão.

sources.metrics.totalPrefetchedBytes

O número de bytes restantes para processar. Esse é o backlog métricas para Kinesis como uma fonte.

sources.<startOffset / endOffset / latestOffset>(index).shard.stream

Nome do site Kinesis transmissão.

sources.<startOffset / endOffset / latestOffset>(index).shard.shardId

ID do fragmento de transmissão Kinesis.

sources.<startOffset / endOffset / latestOffset>(index).firstSeqNum

O primeiro número de sequência dos registros em um fragmento Kinesis que foram consumidos em um determinado lote.

sources.<startOffset / endOffset / latestOffset>(index).lastSeqNum

O último número de sequência dos registros consumidos de um fragmento Kinesis em um determinado lote.

sources.<startOffset / endOffset / latestOffset>(index).closed

Se o fragmento Kinesis tiver sido fechado pela transmissão Kinesis.

sources.<startOffset / endOffset / latestOffset>(index).msBehindLatest

O tempo aproximado em que a consulta de transmissão está atrasada em relação aos dados mais recentes na transmissão Kinesis.

sources.<startOffset / endOffset / latestOffset>(index).lastRecordSeqNum

O número de sequência do último registro consumido e é usado para verificação de perda de dados. Observe que lastRecordSeqNum pode ser diferente de endSeqNum para leituras EFO.

sources.metrics.mode

O modo de consumidor usado para executar a consulta de transmissão. Pode ser Polling ou EFO. modo.

sources.metrics.numStreams

O número de Kinesis transmissões processadas nesse microbatch.

sources.metrics.numTotalShards

O número total de fragmentos processados nesse microlote.

sources.metrics.numClosedShards

O número de fragmentos fechados processados nesse microlote.

sources.metrics.numProcessedBytes

O número de bytes processados nesse microlote.

sources.metrics.numProcessedRecords

O número de registros processados nesse microlote.

sources.metrics.numRegisteredConsumers

O número de consumidores registrados usados no modo EFO.

Para obter mais informações, consulte Quais métricas o site Kinesis reporta?

Auto Loader fontes métricas

Definições para métricas personalizadas usadas para Auto Loader transmissão fonte de dados.

Campos

Descrição

sources.<startOffset / endOffset / latestOffset>.seqNum

A posição atual na sequência de arquivos que estão sendo processados na ordem em que os arquivos foram descobertos.

sources.<startOffset / endOffset / latestOffset>.sourceVersion

A versão de implementação da fonte CloudFiles.

sources.<startOffset / endOffset / latestOffset>.lastBackfillStartTimeMs

O tempo de início das operações de backfill mais recentes.

sources.<startOffset / endOffset / latestOffset>.lastBackfillFinishTimeMs

O horário de término das operações de backfill mais recentes.

sources.<startOffset / endOffset / latestOffset>.lastInputPath

O último caminho de entrada da transmissão fornecido pelo usuário antes de a transmissão ser reiniciada.

sources.metrics.numFilesOutstanding

O número de arquivos na lista de pendências

sources.metrics.numBytesOutstanding

O tamanho (bytes) dos arquivos na lista de pendências

sources.metrics.approximateQueueSize

O tamanho aproximado da fila de mensagens. Somente quando a opção cloudfiles.useNotifications estiver habilitada.

PubSub fontes métricas

Definições para métricas personalizadas usadas para a fonte de dados de transmissão do PubSub. Para obter mais detalhes sobre o monitoramento de fontes de transmissão PubSub, consulte monitoramento de transmissão métricas.

Campos

Descrição

sources.<startOffset / endOffset / latestOffset>.sourceVersion

A versão de implementação com a qual esse deslocamento está codificado.

sources.<startOffset / endOffset / latestOffset>.seqNum

O número de sequência persistente que está sendo processado.

sources.<startOffset / endOffset / latestOffset>.fetchEpoch

A maior época de busca que está sendo processada.

sources.metrics.numRecordsReadyToProcess

O número de registros disponíveis para processamento no backlog atual.

sources.metrics.sizeOfRecordsReadyToProcess

O tamanho total, em bytes, dos dados não processados na lista de pendências atual.

sources.metrics.numDuplicatesSinceStreamStart

A contagem total de registros duplicados processados pela transmissão desde o seu início.

Fontes de pulsar métricas

Definições para métricas personalizadas usadas para Pulsar transmissão fonte de dados.

Campos

Descrição

sources.metrics.numInputRows

O número de linhas processadas nas microlotes atuais.

sources.metrics.numInputBytes

O número total de bytes processados nas micro-lotes atuais.

objeto de sumidouro

Tipo de objeto : SinkProgress

Campos

Descrição

sink.description

A descrição do coletor, detalhando a implementação específica do coletor que está sendo usada.

sink.numOutputRows

O número de linhas de saída. Diferentes tipos de coletor podem ter comportamentos ou restrições diferentes para os valores. Veja os tipos específicos suportados

sink.metrics

ju.Map[String, String] de métricas de sink.

Atualmente, a Databricks oferece duas implementações específicas do objeto sink:

Tipo de pia

Detalhes

Mesa Delta

Consulte Objeto Delta sink.

Tópico do Apache Kafka

Consulte o objeto sink do Kafka.

O campo sink.metrics se comporta da mesma forma para as duas variantes do objeto sink.

Delta Lake objeto de pia

Campos

Descrição

sink.description

A descrição do Delta sink, detalhando a implementação específica do Delta sink que está sendo usada. Por exemplo: “DeltaSink[table]”.

sink.numOutputRows

O número de linhas é sempre -1 porque o Spark não pode inferir linhas de saída para sinks DSv1, que é a classificação do sink Delta Lake.

Objeto de sink do Apache Kafka

Campos

Descrição

sink.description

A descrição do sink Kafka para o qual a consulta de transmissão está sendo gravada, detalhando a implementação específica do sink Kafka que está sendo usada. Por exemplo: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”.

sink.numOutputRows

O número de linhas que foram gravadas na tabela ou coletor de saída como parte do microbatch. Em algumas situações, esse valor pode ser “-1” e geralmente pode ser interpretado como “desconhecido”.

Exemplos

Exemplo de evento StreamingQueryListener de Kafka para Kafka

Python
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_default" : 1370,
"SnapshotLastUploaded.partition_1_default" : 1370,
"SnapshotLastUploaded.partition_2_default" : 1362,
"SnapshotLastUploaded.partition_3_default" : 1370,
"SnapshotLastUploaded.partition_4_default" : 1356,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_default" : 1360,
"SnapshotLastUploaded.partition_1_default" : 1360,
"SnapshotLastUploaded.partition_2_default" : 1352,
"SnapshotLastUploaded.partition_3_default" : 1360,
"SnapshotLastUploaded.partition_4_default" : 1346,
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_left-keyToNumValues" : 1310,
"SnapshotLastUploaded.partition_1_left-keyWithIndexToValue" : 1318,
"SnapshotLastUploaded.partition_2_left-keyToNumValues" : 1305,
"SnapshotLastUploaded.partition_2_right-keyWithIndexToValue" : 1306,
"SnapshotLastUploaded.partition_4_left-keyWithIndexToValue" : 1310,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}

Exemplo de evento StreamingQueryListener de Delta Lake para Delta Lake

Python
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/[email protected]/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}

Exemplo de evento Kinesis-to-Delta Lake StreamingQueryListener

Python
{
"id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
"runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
"name" : null,
"timestamp" : "2024-05-14T02:09:20.846Z",
"batchId" : 0,
"batchDuration" : 59322,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"durationMs" : {
"addBatch" : 5397,
"commitBatch" : 4429,
"commitOffsets" : 211,
"getBatch" : 5,
"latestOffset" : 21998,
"queryPlanning" : 12128,
"triggerExecution" : 59313,
"walCommit" : 220
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
"startOffset" : null,
"endOffset" : [ {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000000"
},
"firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
"lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
}, {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000001"
},
"firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
"lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
} ],
"latestOffset" : null,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"metrics" : {
"avgMsBehindLatest" : "0.0",
"maxMsBehindLatest" : "0",
"minMsBehindLatest" : "0",
"mode" : "efo",
"numClosedShards" : "0",
"numProcessedBytes" : "30",
"numProcessedRecords" : "18",
"numRegisteredConsumers" : "1",
"numStreams" : "1",
"numTotalShards" : "2",
"totalPrefetchedBytes" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
"numOutputRows" : -1
}
}

Exemplo de evento StreamingQueryListener do Kafka+Delta Lake para o Delta Lake

Python
{
"id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"name" : null,
"timestamp" : "2024-05-15T21:57:50.782Z",
"batchId" : 0,
"batchDuration" : 3601,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 5.55401277422938,
"durationMs" : {
"addBatch" : 1544,
"commitBatch" : 686,
"commitOffsets" : 27,
"getBatch" : 12,
"latestOffset" : 577,
"queryPlanning" : 105,
"triggerExecution" : 3600,
"walCommit" : 34
},
"stateOperators" : [ {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 20,
"numRowsUpdated" : 20,
"allUpdatesTimeMs" : 473,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 277,
"memoryUsedBytes" : 13120,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 5,
"numStateStoreInstances" : 20,
"customMetrics" : {
"loadedMapCacheHitCount" : 0,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 5280
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic-1]]",
"startOffset" : null,
"endOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"latestOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"startOffset" : null,
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
"reservoirVersion" : 1,
"index" : -1,
"isStartingVersion" : false
},
"latestOffset" : null,
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
"numOutputRows" : -1
}
}

Exemplo de fonte de taxa para Delta Lake evento StreamingQueryListener

Python
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}