Category: puppet

  • Puppet gems install workaround after TLS 1.0 switchoff

    Hi,

    It seems that since rubygems.org switched off the TLS 1.0 protocol, there is an issue with installing custom gems on the Puppet server.

    If you run puppetserver gem environment you will probably see the following output:

    /opt/puppetlabs/bin/puppetserver gem environment
    RubyGems Environment:
      - RUBYGEMS VERSION: 2.4.8
      - RUBY VERSION: 1.9.3 (2015-06-10 patchlevel 551) [java]
      - INSTALLATION DIRECTORY: /opt/puppetlabs/server/data/puppetserver/jruby-gems
      - RUBY EXECUTABLE: java -jar /opt/puppetlabs/server/apps/puppetserver/puppet-server-release.jar
      - EXECUTABLE DIRECTORY: /opt/puppetlabs/server/data/puppetserver/jruby-gems/bin
      - SPEC CACHE DIRECTORY: /root/.gem/specs
      - SYSTEM CONFIGURATION DIRECTORY: file:/opt/puppetlabs/server/apps/puppetserver/puppet-server-release.jar!/META-INF/jruby.home/etc
      - RUBYGEMS PLATFORMS:
        - ruby
        - universal-java-1.7
      - GEM PATHS:
         - /opt/puppetlabs/server/data/puppetserver/jruby-gems
         - /root/.gem/jruby/1.9
         - file:/opt/puppetlabs/server/apps/puppetserver/puppet-server-release.jar!/META-INF/jruby.home/lib/ruby/gems/shared
      - GEM CONFIGURATION:
         - :update_sources => true
         - :verbose => true
         - :backtrace => false
         - :bulk_threshold => 1000
         - "install" => "--no-rdoc --no-ri --env-shebang"
         - "update" => "--no-rdoc --no-ri --env-shebang"
      - REMOTE SOURCES:
         - https://rubygems.org/
      - SHELL PATH:
         - /usr/local/sbin
         - /usr/local/bin
         - /usr/sbin
         - /usr/bin
         - /sbin
         - /bin
         - /usr/games
         - /usr/local/games
         - /opt/puppetlabs/bin
    

    Also, if you try to install a gem, you will receive the following error:

    /opt/puppetlabs/bin/puppetserver gem install toml-rb
    ERROR:  Could not find a valid gem 'toml-rb' (>= 0), here is why:
              Unable to download data from https://rubygems.org/ - Received fatal alert: protocol_version (https://api.rubygems.org/specs.4.8.gz)
    

    A short but unsafe fix for this is:

    /opt/puppetlabs/bin/puppetserver gem install --source "http://rubygems.org/" toml-rb
    Fetching: toml-rb-1.1.1.gem (100%)
    Successfully installed toml-rb-1.1.1
    WARNING:  Unable to pull data from 'https://rubygems.org/': Received fatal alert: protocol_version (https://api.rubygems.org/specs.4.8.gz)
    1 gem installed
    

    It’s not that elegant, but it does the trick. You can also include this in a Puppet exec block.
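
    For reference, here is a minimal sketch of such an exec block; the resource name and the unless check are my own additions, so adapt them to your setup:

    exec { 'install_toml_rb_gem':
      command  => '/opt/puppetlabs/bin/puppetserver gem install --source "http://rubygems.org/" toml-rb',
      # the check uses a pipe, so run it through a shell
      provider => shell,
      # only fall back to the plain-http source when the gem is not already present
      unless   => '/opt/puppetlabs/bin/puppetserver gem list | grep -q toml-rb',
      path     => ['/bin', '/usr/bin'],
    }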

    Cheers

  • Error 127, not related to Puppet or Golang

    Hi,

    Something from my experience playing with Golang and Puppet code this morning.
    I wrote a very simple script to restart a service, which you can find here.
    Today I wanted to put it on the machine and run it with Puppet, so I wrote a very small class that looked like this:

    class profiles_test::updatekafka {

      package { 'golang':
        ensure => installed,
        name   => 'golang',
      }

      file { '/root/servicerestart.go':
        source  => 'puppet:///modules/profiles_test/servicerestart.go',
        mode    => '0644',
        replace => true,
      }

      exec { 'go build servicerestart.go':
        cwd     => '/root',
        creates => '/root/servicerestart',
        path    => ['/usr/bin', '/usr/sbin'],
      } ->
      exec { '/root/servicerestart':
        cwd    => '/root',
        path   => ['/usr/bin', '/usr/sbin', '/root'],
        onlyif => 'ps -u kafka',
      }
    }
    

    On execution, surprise, it kept throwing this feedback:

    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/File[/root/check_cluster_state.go]/ensure: defined content as '{md5}0817dbf82b74072e125f8b71ee914150'
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: panic: exit status 127
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: 
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: goroutine 1 [running]:
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: runtime.panic(0x4c2100, 0xc2100000b8)
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: 	/usr/lib/go/src/pkg/runtime/panic.c:266 +0xb6
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: main.check(0x7f45b08ed150, 0xc2100000b8)
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: 	/root/servicerestart.go:94 +0x4f
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: main.RunCommand(0x4ea7f0, 0x2c, 0x7f4500000000, 0x409928, 0x5d9f38)
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: 	/root/servicerestart.go:87 +0x112
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: main.GetBrokerList(0x7f45b08ecf00, 0xc21003fa00, 0xe)
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: 	/root/servicerestart.go:66 +0x3c
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: main.GetStatus(0xc21000a170)
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: 	/root/servicerestart.go:33 +0x1e
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: main.StopBroker()
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: 	/root/servicerestart.go:39 +0x21
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: main.main()
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: 	/root/servicerestart.go:15 +0x1e
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: exit status 2
    08:19:44 Error: go run servicerestart.go returned 1 instead of one of [0]

    The secret is in panic: exit status 127, and it turns out it is related neither to Golang nor to Puppet, but to the shell.
    In my script the way to get the broker list is:

    output1 := RunCommand("echo dump | nc localhost 2181 | grep brokers")

    and the error is caused by not having the required binaries in your path. For example, if you run

    whereis echo
    echo: /bin/echo /usr/share/man/man1/echo.1.gz

    So the right way to write the exec block is actually:

    exec { '/root/servicerestart':
      cwd    => '/root',
      path   => ['/usr/bin', '/usr/sbin', '/root', '/bin', '/sbin'],
      onlyif => 'ps -u kafka',
    }

    And then it will work.

    Cheers!

  • Multiple classes block declaration in hiera will not work

    Morning,

    Do not declare the classes key multiple times in hiera like this:

    ---
    classes:
      - profiles::datadogagent
      - profiles::updatekafka
    
    kafka::security: true
    kafka::security_default: true
    kafka::heap_size: 2048
    classes:
     - profiles::pybackuplogs
     - profiles::group_coordinator
    

    Class updatekafka will not be executed: duplicate keys in a YAML document are not merged, so the second classes block simply overrides the first one.

    The structure should look like:

    ---
    classes:
      - profiles::datadogagent
      - profiles::updatekafka
      - profiles::pybackuplogs
      - profiles::group_coordinator
    kafka::security: true
    kafka::security_default: true
    kafka::heap_size: 2048
    
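    As a side note, the reason a single merged classes array matters is that the manifest usually consumes it in one go; here is a minimal sketch of the common pattern, assuming classes are included from site.pp this way (the manifest side is not shown in our setup):

    # common site.pp idiom for a hiera 3 style setup: read the merged 'classes'
    # array from hiera and include every class listed in it
    hiera_include('classes')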

    Cheers!

  • Log rotate for Kafka Garbage collect without restart

    Morning,

    If you have an Apache Kafka version below 1.0.0 and you don’t have garbage collect log rotation, the difference in the JVM options is shown here:

    with rotation:

    -Xloggc:/opt/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M

    without rotation:

    -Xloggc:/opt/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps

    One option is to modify the startup parameters so that the rotation flags are included in the process, but this also involves restarting the services.

    However, you can also use the logrotate daemon with the following configuration; here is the block that you need to add in hiera:

    logrotate::rule:
      'kafka_gc':
        path: '/opt/kafka/logs/kafkaServer-gc.log'
        copytruncate: true
        rotate_every: 'day'
        compress: true
        missingok: true
        su: true
        su_owner: 'kafka'
        su_group: 'kafka'
        ifempty: false
        size: '50M'
        maxsize: '50M'
        rotate: 5
    

    Or, if you want to write it in a class, it should look like this:

    $version = lookup('kafka::version')
    if ($_role =~ /\Akafka/) and ($version != '1.0.0') {
      logrotate::rule { 'kafkagc_logs':
        path         => '/opt/kafka/logs/kafkaServer-gc.log',
        copytruncate => true,
        rotate       => 5,
        rotate_every => 'daily',
        missingok    => true,
        ifempty      => false,
        su           => true,
        su_owner     => 'kafka',
        su_group     => 'kafka',
        size         => '50M',
        maxsize      => '50M',
      }
    }
    

    Cheers!

  • Observer functionality for puppet zookeeper module

    Morning,

    I know it’s been some time since I last posted, but I didn’t have the time to play that much. Today I want to share with you a use case in which we needed to modify the module used for the deployment of Zookeeper in order to also include the observer role.

    The link that describes how this should be activated from version 3.3.0 is located here: https://zookeeper.apache.org/doc/trunk/zookeeperObservers.html

    For the deployment we are using the module https://github.com/wikimedia/puppet-zookeeper

    It’s not a nice module, trust me, I know, but since we did not want to start the development process from scratch and impact the infrastructure that is already deployed, we had to cope with the situation by changing what we had.

    The main idea in our case is that since the number of Zookeeper members taking part in the election process needs to be 2n+1 in order for the quorum mechanism to work, deploying an even number of machines was pretty tricky; to fix this, the extra Zookeeper instances beyond that requirement should be set as observers.

    A Zookeeper observer is a node that is not included in the election process and just receives the updates from the cluster.

    My vision is that the best approach for delivery is to activate it in Hiera with a zookeeper::observer parameter per host.

    We can start by including it in the defaults.pp file as follows:

    $observer = hiera('zookeeper::observer', false)

    The zoo.cfg file deployed for the configuration is written from the init.pp file, so we need to add the parameter here as well:

    $observer = $::zookeeper::defaults::observer
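
    To make the intent clearer, here is a hypothetical sketch of what that change amounts to in init.pp; the surrounding structure and the inherits clause are placeholders, not the exact wikimedia module code:

    class zookeeper (
      # ...the existing module parameters stay untouched...
      $observer = $::zookeeper::defaults::observer
    ) inherits zookeeper::defaults {
      # rest of the class unchanged; $observer is now visible to the templates
    }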

    OK, now how do we share the status of each node across the required domain? We will need to use another module, https://github.com/WhatsARanjit/puppet-share_data, and include something like this in our code:

    share_data { $::fqdn:
      data  => [ $::fqdn, $observer ],
      label => 'role',
    }
    $obsrole = share_data::retrieve('role')
    

    This guarantees that all servers have the observer flag available and can use it in the ERB template.

    Jumping to the last component of this config, we need to modify the template to include the observer role.

    How do we do that? Basically by rewriting the server information in this format:

    <% if @hosts
     @hosts.sort_by { |name, id| id }.each do |host_id| -%>
    server.<%= host_id[1] %>=<%= host_id[0] %>:2182:2183<% @obsrole.each do |item| if (item[0] == host_id[0]) && item[1] -%>:observer<% end -%><% end -%> 
    <% end -%>
    <% end -%>
    

    Straightforward: this compares the values from the two lists and, if the flag is true, adds the observer configuration.
    One last part needs to be added, and that is:

    <% if @observer == true -%>
    peerType=observer
    <% end -%>
    

    And you are done: if you add zookeeper::observer: true to your YAML file, Puppet should rewrite the file and restart the Zookeeper service.
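
    For completeness, here is a rough sketch of how the rendered template is usually tied to the service so that a change triggers the restart; the file path, template name, ownership and service name are assumptions for illustration, not the exact resources of the wikimedia module:

    file { '/etc/zookeeper/conf/zoo.cfg':
      content => template('zookeeper/zoo.cfg.erb'),
      owner   => 'zookeeper',
      group   => 'zookeeper',
      mode    => '0644',
      notify  => Service['zookeeper'],  # restart Zookeeper when the rendered config changes
    }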

    Cheers

  • Kafka limits implementation using puppet

    Morning,

    I keep my promise and provide you with the two simple blocks that are needed to implement the limits we discussed in the article http://log-it.tech/2017/10/16/ubuntu-change-ulimit-kafka-not-ignore/

    For the limits module you can use:
    https://forge.puppet.com/puppetlabs/limits

    As for the actual Puppet implementation, I decided not to restart the service immediately. That being said, it’s dead simple to do:

    file_line { 'add_pamd_record':
      path => '/etc/pam.d/common-session',
      line => 'session required pam_limits.so',
    }

    limits::fragment {
      '*/soft/nofile':
        value => '100000';
      '*/hard/nofile':
        value => '100000';
      'kafka/soft/nofile':
        value => '100000';
      'kafka/hard/nofile':
        value => '100000';
    }
    

    This is all you need.
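    Should you ever want the broker to pick up the new limits right away, notify is a regular metaparameter, so any of the fragments above could be written like this instead; a small sketch, assuming the broker's service resource is named 'kafka':

    limits::fragment { 'kafka/hard/nofile':
      value  => '100000',
      notify => Service['kafka'],  # assumes a Service['kafka'] resource exists elsewhere in the catalog
    }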

    Cheers

  • Kafka implementation using puppet at IMWorld Bucharest 2017

    Hi,

    I recently gave a presentation on how to deploy Kafka using Puppet and what you need as a minimum in order to have success in production.
    Here is the presentation:

    Hope it is useful.

    Cheers!

    Update:

    There is also an official version from IMWorld which you can find here:

    And also the article on medium.com that describes it in more technical detail:

    https://medium.com/@sorin.tudor/messaging-kafka-implementation-using-puppet-5438a0ed275d

  • Eyaml hiera configuration for puppet, as promised

    Morning,

    We also managed to configure the hiera backend in order to have the eyaml module active. It is related to the following past article: http://log-it.tech/2017/05/29/install-eyaml-module-on-puppet-master/. So in hiera.yaml you basically need to add the following configuration before the hierarchy:

    :backends:
      - eyaml
      - yaml
      - puppetdb
    

    and

    :eyaml:
        :datadir: /etc/puppetlabs/hieradata
        :pkcs7_private_key: /etc/puppetlabs/puppet/eyaml/private_key.pkcs7.pem
        :pkcs7_public_key:  /etc/puppetlabs/puppet/eyaml/public_key.pkcs7.pem 
        :extension: 'yaml'
    

    at the bottom. After this is done, the most essential part is creating the required symlinks so that the backend is enabled.
    This can be done easily with a bash script like:

    #!/bin/bash
    ln -s /opt/puppetlabs/puppet/lib/ruby/gems/2.1.0/gems/hiera-eyaml-2.1.0/lib/hiera/backend/eyaml /opt/puppetlabs/puppet/lib/ruby/vendor_ruby/hiera/backend/eyaml
    ln -s /opt/puppetlabs/puppet/lib/ruby/gems/2.1.0/gems/hiera-eyaml-2.1.0/lib/hiera/backend/eyaml_backend.rb /opt/puppetlabs/puppet/lib/ruby/vendor_ruby/hiera/backend/eyaml_backend.rb
    ln -s /opt/puppetlabs/puppet/lib/ruby/gems/2.1.0/gems/hiera-eyaml-2.1.0/lib/hiera/backend/eyaml.rb /opt/puppetlabs/puppet/lib/ruby/vendor_ruby/hiera/backend/eyaml.rb
    ln -s /opt/puppetlabs/puppet/lib/ruby/gems/2.1.0/gems/highline-1.6.21/lib/highline /opt/puppetlabs/puppet/lib/ruby/vendor_ruby/highline/
    ln -s /opt/puppetlabs/puppet/lib/ruby/gems/2.1.0/gems/highline-1.6.21/lib/highline.rb /opt/puppetlabs/puppet/lib/ruby/vendor_ruby/highline.rb

    After this is done, a puppetdb and puppetserver restart is advised, and you can try testing it by putting a string in hiera and seeing if a notice prints the required output. Something like:

    profiles::test::teststring: '[string generated with eyaml encrypt -s 'test']'

    and then creating a small class like:

    
    class profiles::test {
      $teststring = hiera('profiles::test::teststring')
      notify { "${teststring}": }
    }

    That should be most of what you need in order to do this. Hope it works! 🙂

    Cheers!

  • Implementing logrotate for kafka

    Hi,

    Yes, we will also need to implement logrotate if we want to keep Kafka under control. My solution was with Puppet, as you probably expected. After I took a look at the documentation related to log4j properties, I think I had a configuration figured out; it should look like the following ERB template:

    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    log4j.rootLogger=INFO, stdout
    
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
    
    log4j.appender.kafkaAppender=org.apache.log4j.RollingFileAppender
    #log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
    #log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
    log4j.appender.kafkaAppender.MaxFileSize=<%= @filesize %>
    log4j.appender.kafkaAppender.MaxBackupIndex=<%= @backupindex %>
    log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
    
    log4j.appender.stateChangeAppender=org.apache.log4j.RollingFileAppender
    #log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
    #log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
    log4j.appender.stateChangeAppender.MaxFileSize=<%= @filesize %>
    log4j.appender.stateChangeAppender.MaxBackupIndex=<%= @backupindex %>
    log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
    
    log4j.appender.requestAppender=org.apache.log4j.RollingFileAppender
    #log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
    #log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
    log4j.appender.requestAppender.MaxFileSize=<%= @filesize%>
    log4j.appender.requestAppender.MaxBackupIndex=<%= @backupindex %>
    log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
    
    log4j.appender.cleanerAppender=org.apache.log4j.RollingFileAppender
    #log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
    #log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
    log4j.appender.cleanerAppender.MaxFileSize=<%= @filesize %>
    log4j.appender.cleanerAppender.MaxBackupIndex=<%= @backupindex %>
    log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
    
    log4j.appender.controllerAppender=org.apache.log4j.RollingFileAppender
    #log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
    #log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
    log4j.appender.controllerAppender.MaxFileSize=<%= @filesize %>
    log4j.appender.controllerAppender.MaxBackupIndex=<%= @backupindex %>
    log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
    
    log4j.appender.authorizerAppender=org.apache.log4j.RollingFileAppender
    #log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
    #log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log
    log4j.appender.authorizerAppender.MaxFileSize=<%= @filesize %>
    log4j.appender.authorizerAppender.MaxBackupIndex=<%= @backupindex %>
    log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
    
    # Turn on all our debugging info
    #log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
    #log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
    #log4j.logger.kafka.perf=DEBUG, kafkaAppender
    #log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
    #log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
    log4j.logger.kafka=INFO, kafkaAppender
    
    log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
    log4j.additivity.kafka.network.RequestChannel$=false
    
    #log4j.logger.kafka.network.Processor=TRACE, requestAppender
    #log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
    #log4j.additivity.kafka.server.KafkaApis=false
    log4j.logger.kafka.request.logger=WARN, requestAppender
    log4j.additivity.kafka.request.logger=false
    
    log4j.logger.kafka.controller=TRACE, controllerAppender
    log4j.additivity.kafka.controller=false
    
    log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
    log4j.additivity.kafka.log.LogCleaner=false
    
    log4j.logger.state.change.logger=TRACE, stateChangeAppender
    log4j.additivity.state.change.logger=false
    
    #Change this to debug to get the actual audit log for authorizer.
    log4j.logger.kafka.authorizer.logger=WARN, authorizerAppender
    log4j.additivity.kafka.authorizer.logger=false
    
    

    It just adds the two most important values required for working log rotation: MaxFileSize and MaxBackupIndex. The first sets the maximum size of a file and the second the number of files of each type to be kept.
    In order to use it you need to put it in a class that in my view is constructed as follows:

    class profiles::kafkalogrotate {

      $filesize    = hiera('kafkalogrotate::size', '50MB')
      $backupindex = hiera('kafkalogrotate::backupindex', 10)
      validate_string($filesize)
      validate_integer($backupindex)

      file { 'adding_log4j':
        path    => '/opt/kafka/config/log4j.properties',
        content => template("${module_name}/log4j.properties.erb"),
        replace => true,
        owner   => 'kafka',
        group   => 'kafka',
        mode    => '0644',
      }
    }
    

    I put that path because we generally install Kafka in /opt/kafka, but it can be changed. In our case a restart of the Kafka brokers is not needed immediately, so I just called the class from my Kafka manifests.

    That is all; if I think of something additional to what I wrote, I will post it.

    Cheers!

  • Securing kafka-manager endpoints with iptables rules behind traefik

    Hi,

    One extra addition to my traefik balancing article from http://log-it.tech/2017/08/19/puppet-implementation-traefik-load-balancer-kafka-manager/ is that even though we now have the balancing capability, we still need to restrict access to the unsecured endpoint. I wanted all the code to be deployable on all of the nodes. Taking this into consideration, our issue with the firewall rules can easily be solved by using the puppetlabs module https://github.com/puppetlabs/puppetlabs-firewall, and the code that I included looks like this:

    $hosts_count = $kafka_hosts.count

    package { 'iptables-persistent':
      name   => 'iptables-persistent',
      ensure => installed,
    }

    resources { 'firewall':
      purge => true,
    }

    $kafka_hosts.each | Integer $index, String $host | {
      firewall { "10${index} adding tcp rule kafka manager node ${index}":
        proto       => 'tcp',
        dport       => 9000,
        source      => $host,
        destination => $fqdn,
        action      => 'accept',
      }
    }

    firewall { "10${hosts_count} dropping rest of kafka manager calls":
      proto       => 'tcp',
      dport       => 9000,
      destination => $fqdn,
      action      => 'drop',
    }

    This should add rules that allow traffic on port 9000 only from the Kafka hosts that have kafka-manager installed.
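
    For reference, $kafka_hosts is just the array of hosts that run kafka-manager; in our case it comes from Hiera, roughly like the hypothetical lookup below (the key name is an assumption for illustration):

    # hypothetical lookup; the Hiera key holds the list of hosts allowed to reach port 9000
    $kafka_hosts = lookup('profiles::kafkamanager::kafka_hosts', Array[String], 'first', [])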
    Cheers