Implementing logrotate for kafka

Hi,

Yes, we also need to implement log rotation if we want to keep Kafka under control. My solution uses Puppet, as you probably expected. After taking a look at the documentation for the log4j properties, I think I have a configuration figured out. It should look like the following ERB template:

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.kafkaAppender=org.apache.log4j.RollingFileAppender
#log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
#log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
log4j.appender.kafkaAppender.MaxFileSize=<%= @filesize %>
log4j.appender.kafkaAppender.MaxBackupIndex=<%= @backupindex %>
log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.stateChangeAppender=org.apache.log4j.RollingFileAppender
#log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
#log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
log4j.appender.stateChangeAppender.MaxFileSize=<%= @filesize %>
log4j.appender.stateChangeAppender.MaxBackupIndex=<%= @backupindex %>
log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.requestAppender=org.apache.log4j.RollingFileAppender
#log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
#log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
log4j.appender.requestAppender.MaxFileSize=<%= @filesize %>
log4j.appender.requestAppender.MaxBackupIndex=<%= @backupindex %>
log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.cleanerAppender=org.apache.log4j.RollingFileAppender
#log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
#log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
log4j.appender.cleanerAppender.MaxFileSize=<%= @filesize %>
log4j.appender.cleanerAppender.MaxBackupIndex=<%= @backupindex %>
log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.controllerAppender=org.apache.log4j.RollingFileAppender
#log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
#log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
log4j.appender.controllerAppender.MaxFileSize=<%= @filesize %>
log4j.appender.controllerAppender.MaxBackupIndex=<%= @backupindex %>
log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.authorizerAppender=org.apache.log4j.RollingFileAppender
#log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
#log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log
log4j.appender.authorizerAppender.MaxFileSize=<%= @filesize %>
log4j.appender.authorizerAppender.MaxBackupIndex=<%= @backupindex %>
log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

# Turn on all our debugging info
#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
#log4j.logger.kafka.perf=DEBUG, kafkaAppender
#log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
log4j.logger.kafka=INFO, kafkaAppender

log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
log4j.additivity.kafka.network.RequestChannel$=false

#log4j.logger.kafka.network.Processor=TRACE, requestAppender
#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
#log4j.additivity.kafka.server.KafkaApis=false
log4j.logger.kafka.request.logger=WARN, requestAppender
log4j.additivity.kafka.request.logger=false

log4j.logger.kafka.controller=TRACE, controllerAppender
log4j.additivity.kafka.controller=false

log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
log4j.additivity.kafka.log.LogCleaner=false

log4j.logger.state.change.logger=TRACE, stateChangeAppender
log4j.additivity.state.change.logger=false

#Change this to debug to get the actual audit log for authorizer.
log4j.logger.kafka.authorizer.logger=WARN, authorizerAppender
log4j.additivity.kafka.authorizer.logger=false

Compared with the stock file, it switches each appender from DailyRollingFileAppender to RollingFileAppender and adds the two most important values required for working log rotation: MaxFileSize and MaxBackupIndex. The first sets the maximum size a log file may reach before it is rolled over, and the second sets how many rolled-over files of each type are kept.
To use it, you need to deploy the template from a class, which in my view should be constructed as follows:

class profiles::kafkalogrotate {

  $filesize    = hiera('kafkalogrotate::size', '50MB')
  $backupindex = hiera('kafkalogrotate::backupindex', 10)
  validate_string($filesize)
  validate_integer($backupindex)

  file { 'adding_log4j':
    path    => '/opt/kafka/config/log4j.properties',
    content => template("${module_name}/log4j.properties.erb"),
    replace => true,
    owner   => 'kafka',
    group   => 'kafka',
    mode    => '0644',
  }
}

I set the path this way because we generally install Kafka in /opt/kafka, but that can be changed. In our case an immediate restart of the Kafka brokers is not needed, so I just included the class in my Kafka manifests.
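If you would rather have the brokers pick up the new settings as soon as the file changes, the file resource can notify the Kafka service. The following is only a sketch under assumptions: the roles::kafka_broker class name is made up for illustration, and it assumes a Service['kafka'] resource is already declared somewhere else in your catalog (if it is not, the catalog will fail to compile).

```puppet
# Hypothetical wrapper class; only profiles::kafkalogrotate and the
# 'adding_log4j' file resource come from the class shown above.
class roles::kafka_broker {
  include profiles::kafkalogrotate

  # Assumption: Service['kafka'] is declared elsewhere in the catalog.
  # Restart the broker whenever the rendered log4j.properties changes.
  File['adding_log4j'] ~> Service['kafka']
}
```

Keep in mind that this restarts every broker on the Puppet run in which the file changes, so leaving the relationship out and restarting by hand, as I did, is the gentler option for a production cluster.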

That is all. If I think of anything to add to what I wrote, I will post it.

Cheers!