in

Registrar temas compactados en Apache Kafka

1wKnOLt56gvutDFzfyjNzJg

Registrar temas compactos en Apache Kafka

Seyed Morteza Mousavi

12 de abril de 2019·8 min de lectura

Cuando comencé a leer la documentación de Kafka, aunque el tema de logaritmo compacto parecía un concepto simple, no tenía claro cómo internamente Kafka mantiene los estados de ellos en el sistema de archivos. Este mes tuve algo de tiempo para leer más sobre esta función y quiero compartir mi entendimiento con ustedes.

TL; DR

En este artículo, describiré los temas de registro compactado en Kafka. A continuación, le mostraré cómo Kafka mantiene internamente los estados de estos temas en el sistema de archivos.

Prerrequisitos

Asumo que tu ya están familiarizados con los conceptos básicos de Apache Kafka como intermediario, tema, partición, consumidor y productor. Además, si desea ejecutar los comandos de muestra, debe ejecutar un corredor de Kafka y un servidor Zookeeper.

¿Qué es un registro de temas compactos?

La documentación de Kafka dice:

La compactación de troncos es un mecanismo para proporcionar una retención por registro de grano más fino, en lugar de la retención basada en el tiempo de grano más grueso. La idea es eliminar de forma selectiva los registros en los que tenemos una actualización más reciente con la misma clave principal. De esta forma, se garantiza que el registro tendrá al menos el último estado de cada clave.

Para simplificar esta descripción, Kafka elimina los registros antiguos cuando hay una versión más nueva con la misma clave en el registro de partición. Como ejemplo, considere la siguiente partición de un tema de registro compactado llamado último precio del producto:

1*wKnOLt56gvutDFzfyjNzJg

Como ves al principio hay dos registros con clave p3. Pero debido a que es un tema de registro compactado, Kafka elimina el registro anterior en un hilo de fondo (más sobre esto en las próximas secciones). Ahora suponga que tenemos un productor que envía nuevos registros a esta partición. El productor produce 3 discos con las claves p6, p5, p5 respectivamente:

Una vez más, un hilo de fondo dentro del corredor de Kafka elimina los registros más antiguos con las claves p5 y p6. Tenga en cuenta que el tronco compactado se compone de dos partes: una cola y una cabeza. Kafka se asegura de que todos los registros dentro de la parte de la cola tengan una clave única porque la sección de la cola se escanea en el ciclo anterior del proceso de limpieza. Pero la sección de cabecera puede tener valores duplicados.

Ahora que aprendimos lo que es el tema de logaritmo compactado, es hora de crearlos usando kafka-topics herramienta.

Crear un tema de registro compactado

Cree un tema compactado (describiré todas las configuraciones en detalle):

kafka-topics --create --zookeeper zookeeper:2181 --topic latest-product-price --replication-factor 1 --partitions 1 --config "cleanup.policy=compact" --config "delete.retention.ms=100"  --config "segment.ms=100" --config "min.cleanable.dirty.ratio=0.01"

Producir algunos registros:

kafka-console-producer --broker-list localhost:9092 --topic latest-product-price --property parse.key=true --property key.separator=:
>p3:10$
>p5:7$
>p3:11$
>p6:25$
>p6:12$
>p5:14$
>p5:17$

Tenga en cuenta que en el comando anterior separé la clave y el valor por :. Ahora consuma el tema:

kafka-console-consumer --bootstrap-server localhost:9092 --topic latest-product-price --property  print.key=true --property key.separator=: --from-beginning
p3:11$
p6:12$
p5:14$
p5:17$

Como ve, se eliminan los registros con claves duplicadas. El registro p5: 14 $ no se elimina, por lo que veremos el motivo cuando describa el proceso de limpieza. Pero primero debemos observar cómo Kafka almacena internamente los mensajes.

Segmentos

El registro de partición es una abstracción que nos permite consumir fácilmente mensajes ordenados dentro de la partición, sin preocuparnos por el almacenamiento interno de Kafka. En realidad, sin embargo, el corredor de Kafka divide el registro de partición en segmentos. Los segmentos son archivos almacenados en el sistema de archivos (dentro del directorio de datos y en el directorio de la partición), cuyo nombre termina con .log. En la imagen de abajo, un registro de partición se divide en 3 segmentos:

1*tXOSydkHJTEWw lOQaupQA

Como puede ver, tenemos un registro de partición que contiene 7 registros que residen en 3 archivos de segmento separados. El primer desplazamiento de un segmento se llama desplazamiento base del segmento. El nombre del archivo de segmento siempre es igual a su valor de desplazamiento base.

El último segmento de la partición se llama segmento activo. Solo el segmento activo de un registro puede recibir los mensajes recién producidos. Veremos cómo se comporta Kafka con el segmento activo en el proceso de limpieza de un tronco compactado.

Volviendo a nuestro ejemplo, podemos ver archivos de segmento de nuestra partición de tema con el siguiente comando (asumiendo que su directorio de datos de Kafka es /var/lib/kafka/data):

ls /var/lib/kafka/data/latest-product-price-0/
00000000000000000000.index 00000000000000000006.log
00000000000000000000.log 00000000000000000006.snapshot
00000000000000000000.timeindex 00000000000000000006.timeindex
00000000000000000005.snapshot leader-epoch-checkpoint
00000000000000000006.index

00000000000000000000.log y 00000000000000000006.log son segmentos de esta partición y 00000000000000000006.log es el segmento activo.

¿Cuándo crea Kafka un nuevo segmento? Una opción es configurar segment.bytes (el valor predeterminado es 1GB) config durante la creación del tema. Cuando el tamaño de su segmento supere este valor, Kafka creará un nuevo segmento. Otra opción es configurar segment.ms como viste antes. Con esta opción, cuando Kafka recibe una solicitud de producción, verificará que el segmento activo sea anterior a segment.ms valor. Si es más antiguo, creará un nuevo segmento. En nuestro comando, establecemos segment.ms=100 para asegurarse de que cada 100 milisegundos se cree un nuevo segmento.

Un punto interesante es que cuando estableces segment.ms=100 probablemente tendrá segmentos más pequeños. Después del proceso de limpieza (consulte la siguiente sección), el corredor de Kafka fusionará los segmentos no activos y creará un segmento grande a partir de ellos.

Para obtener más información sobre los segmentos y el almacenamiento interno de Kafka, puede leer los artículos Cómo funcionan los componentes internos del almacenamiento de Kafka y Introducción práctica a los componentes internos del almacenamiento de Kafka.

Proceso de limpieza

Durante el inicio, el corredor de Kafka crea una serie de hilos más limpios, responsable de limpiar los registros compactados (el número de estos subprocesos se puede configurar a través de log.cleaner.threads config). El hilo más limpio, constantemente intentará encontrar el más sucio inicie sesión en el corredor y luego intente limpiarlo. Para cada registro, calcula el proporción de suciedad como a continuación:

dirty ratio = the number of bytes in the head / total number of bytes in the log(tail + head)

El hilo más limpio elige el tronco con el mayor proporción de suciedad. Este registro se llama el registro más sucio y si su valor es mayor que min.cleanable.dirty.ratio config, se limpiará. De lo contrario, el hilo más limpio se bloqueará durante varios milisegundos (configurable con log.cleaner.backoff.ms).

Después de encontrar el registro más sucio, queremos encontrar la parte del registro que se puede limpiar. Tenga en cuenta que una parte del registro no se puede limpiar y no se analizará:

  • Todos los registros dentro del segmento activo. Es por eso que todavía vemos un récord de p5: 14 $ duplicado en nuestro consumidor.
  • Si pones min.compaction.lag.ms config mayor que 0, no se limpiará ningún segmento que tenga un registro con una marca de tiempo más reciente que esta configuración. Estos segmentos no serán escaneados para compactación.

Ahora sabemos qué registros vamos a compactar. Desde el primer registro del registro hasta el primer registro que no se puede limpiar. Para simplificar este artículo, asumimos que todos los registros del cabezal se pueden limpiar.

Tenga en cuenta que sabemos que cada registro en la sección final de un registro tiene una clave única porque los duplicados se eliminaron en la última limpieza. Solo es posible que tengamos algunos registros en la sección de cabecera cuyas claves no sean únicas en el registro. Para encontrar registros duplicados más rápidamente, Kafka crea un mapa de registros en la sección principal. Volviendo a nuestro ejemplo, la estructura del mapa de compensación es algo como esto:

1*mm0FKL BaasuIukWl 1FRQ

Como ve, Kafka crea una estructura llamada mapa de compensación que para cada tecla en la sección del cabezal tiene su correspondiente desplazamiento. Si tenemos duplicados en la cabeza, Kafka usa el desplazamiento más nuevo. En la imagen de arriba, el registro con la tecla p6 está en el desplazamiento 5 y el desplazamiento más nuevo de p5 es 7. Ahora, el hilo más limpio verifica cada registro en el registro y lo elimina si hay algún registro con la misma clave dentro del mapa de desplazamiento y su desplazamiento es diferente de la entrada en el mapa (no queremos eliminar los registros más recientes).

Durante el proceso de limpieza de un registro compactado, no solo se eliminarán los mensajes duplicados, sino que Kafka también elimina los registros que tienen un valor nulo. Estos registros se llaman lápida sepulcral. Puede retrasar la eliminación de ellos configurando delete.retention.ms config. Al establecer esta configuración, Kafka verifica la marca de tiempo de modificación del segmento que contiene este registro y si la hora de modificación es menor que el valor de configuración, el registro se conservará.

Ahora el tronco quedó limpio. Después de este proceso de limpieza, ¡tenemos una cola nueva y una cabeza nueva! El último desplazamiento que se escanea para su limpieza (en nuestro ejemplo, el último registro en la cabeza anterior) es el último desplazamiento de la cola nueva.

Kafka mantiene el desplazamiento de inicio del nuevo encabezado en un archivo llamado cleaner-offset-checkpoint en la raíz del directorio de datos. Este archivo se utiliza para el próximo ciclo de limpieza del registro. Podemos ver nuestro archivo de puntos de control de temas:

cat /var/lib/kafka/data/cleaner-offset-checkpoint
0
1
latest-product-price 0 6

Como ves, hay tres líneas. La primera línea es la versión del archivo (creo que por compatibilidad con versiones anteriores), la segunda línea tiene el valor 1 que muestra cuántas líneas seguirán esta línea (solo una línea), y la última línea contiene el nombre del tema de registro compactado, el número de partición y el desplazamiento del cabezal de esta partición.

Conclusión

En este artículo, le mostré qué es el tema de la compactación de registros, cómo se almacenan y cómo Kafka los limpia periódicamente. Al final, quiero señalar que la compactación de registros es excelente para los escenarios de almacenamiento en caché en los que solo desea mantener el valor más reciente para cada registro casi en tiempo real. Suponga que desea construir su caché en el inicio de su aplicación. Puede simplemente leer su tema compactado y construir su caché y debido a que Kafka lee los mensajes secuencialmente, es mucho más rápido que calentar su caché usando una base de datos SQL.

Puede leer más sobre esta técnica en el artículo de Martin Kleppmann Volviendo la base de datos de adentro hacia afuera. También puede encontrar útil mi artículo anterior Beat Cache Invalidation en ASP.NET Core Using Kafka y Debezium, que es una implementación de esta técnica.

Referencias

https://github.com/apache/kafka/
https://thehoard.blog/how-kafkas-storage-internals-work-3a29b02e026
https://medium.com/@durgaswaroop/a-practical-introduction-to-kafka-storage-internals-d5b544f6925f
https://kafka.apache.org/documentation/

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

94G3CTNBJB57pq7eg7VhcN 1200 80

Los mejores juegos de PC 2021: los mejores juegos de PC en este momento

Jeffrey O. Henley | Biografia ejecutiva