java面试

1、讲下jvm类加载机制类加载器启动类加载器

这个类将器负责将存放在<JAVA_HOME>lib目录中的,或者被-Xbootclasspath参数所指定的路径中的,并且是虚拟机识别的(按照文件名识别,如rt.jar、tools.jar,名字不符合的类库即使放在lib目录中也不会被加载)类库加载到虚拟机内存中。

加载器是Java类加载层次中最顶层的类加载器,负责加载JDK中的核心类库,即将‘lib’目录下的类库加载到虚拟机内存中,用来加载java的核心库,此类加载器并不继承于java.lang.ClassLoader,不能被java程序直接调用,代码是使用C++编写的,是虚拟机自身的一部分。

扩展类加载器

这个加载器由sun.misc.Launcher$ExtClassLoader实现,该类加载器主要负责加载Java的扩展类库,默认加载JAVA_HOME/jre/lib/ext/目录下的所有jar包,当然也可以加载由java.ext.dirs系统属性指定的jar包,用来加载java的扩展库,开发者可以直接使用这个类加载器(其是AppClassLoader的父加载器,并且Java类加载器采用了委托机制)。

D:developSoftwarejdk1.7.0_67jrelibext

D:developSoftwarejdk1.8jrelibext

应用程序类加载器

这个类加载器由sun.misc.Launcher$AppClassLoader来实现。该类加载器负责在JVM启动时,加载用户类路径(CLASSPATH)下的类库,即来自在命令java中的classpath或者java.class.path系统属性或者CLASSPATH操作系统属性所指定的JAR类包和类路径。一般我们编写的java类都是由这个类加载器加载,这个类加载器是CLassLoader中的getSystemClassLoader()方法的返回值,所以也称为系统类加载器。一般情况下该类加载就是系统默认的类加载器。

自定义类加载器

1、如果不想打破双亲委派模型,那么只需要重写java.lang.ClassLoader#findClass方法即可

2、如果想打破双亲委派模型,那么就重写整个java.lang.ClassLoader#loadClass(java.lang.String, boolean)方法

当然,我们自定义的ClassLoader不想打破双亲委派模型,所以自定义的ClassLoader继承自java.lang.ClassLoader并且只重写findClass方法。

双亲委派模型

jvm对class文件采用的是按需加载的方式,当需要使用该类时,jvm才会将它的class文件加载到内存中产生class对象。在加载类的时候,是采用的 双亲委派机制 ,即把请求交给父类处理的一种任务委派模式

(1)如果一个类加载器接收到了类加载的请求,它自己不会先去加载,会把这个请求委托给父类加载器去执行。

(2)如果父类还存在父类加载器,则继续向上委托,一直委托到启动类加载器:Bootstrap ClassLoader

(3)如果父类加载器可以完成加载任务,就返回成功结果,如果父类加载失败,就由子类自己去尝试加载,如果子类加载失败就会抛出ClassNotFoundException异常,这就是双亲委派模式

为什么使用双亲委派机制?

1、防止重复加载同一个.class。通过委托去向上面问一问,加载过了,就不用再加载一遍。保证数据安全。

2、保证核心.class不能被篡改。通过委托方式,不会去篡改核心.clas,即使篡改也不会去加载,即使加载也不会是同一个.class对象了。不同的加载器加载同一个.class也不是同一个Class对象。这样保证了Class执行安全。

类加载的三种方式Tomcat为什么要破坏双亲委派模型

因为一个Tomcat可以部署N个web应用,但是每个web应用都有自己的classloader没权限使用网络资源,互不干扰。比如web1里面有com.test.A.class ,web2里面也有com.test.A.class ,如果没打破双亲委派模型的话,那么web1加载完后,web2在加载的话会冲突。因为只有一套classloader,却出现了两个重复的类路径,所以tomcat打破了,他是线程级别的,不同web应用是不同的classloader。

热部署的场景会破坏,否则实现不了热部署。

2、class.forName和classloader.loadClass区别

都可用来对类进行加载

class.forName

class.forName()前者除了将类的.class文件加载到jvm中之外,还会对类进行解释,执行类中的static块,静态方法。

java.lang.Class

   @CallerSensitive    public static Class forName(String className)                throws ClassNotFoundException {        Class caller = Reflection.getCallerClass();        return forName0(className, true, ClassLoader.getClassLoader(caller), caller);    }    private static native Class forName0(String name, boolean initialize,                                            ClassLoader loader,                                            Class caller)        throws ClassNotFoundException;

测试:

public class Test {    public static void main(String[] args) throws ClassNotFoundException {        //静态代码块执行 ,ClassLoaderTest给静态变量赋值的静态方法执行          Class.forName("com.zengqingfa.basic.classloader.ClassLoaderTest");    }}class ClassLoaderTest {    //静态代码块    static {        System.out.println("静态代码块执行");    }    //静态变量    public static String s = getString();    private static String getString() {        System.out.println("ClassLoaderTest给静态变量赋值的静态方法执行");        return "ss";    }    public static void test() {        System.out.println("ClassLoaderTest普通静态方法执行");    }    {        System.out.println("ClassLoaderTest普通的代码块");    }    public ClassLoaderTest() {        System.out.println("ClassLoaderTest 构造方法执行");    }}

结果为:

静态代码块执行ClassLoaderTest给静态变量赋值的静态方法执行

classloader.loadClass

将.class文件加载到jvm中,不会执行static中的内容,只有在newInstance才会去执行static块

java.lang.ClassLoader

    public Class loadClass(String name) throws ClassNotFoundException {        return loadClass(name, false);    }

测试:

//静态代码块不执行ClassLoader systemClassLoader = ClassLoader.getSystemClassLoader();systemClassLoader.loadClass("com.zengqingfa.basic.classloader.ClassLoaderTest");

3、讲下你做过哪些方面的jvm调优何时进行JVM调优

遇到以下情况,就需要考虑进行JVM调优了:

JVM调优目标

调优的最终目的都是为了令应用程序使用最小的硬件消耗来承载更大的吞吐。jvm调优主要是针对垃圾收集器的收集性能优化,令运行在虚拟机上的应用能够使用更少的内存以及延迟获取更大的吞吐量。

其中,任何一个属性性能的提高,几乎都是以牺牲其他属性性能的损为代价的,不可兼得。具体根据在业务中的重要性确定。

JVM调优量化目标

下面展示了一些JVM调优的量化目标参考实例:

注意:不同应用的JVM调优量化目标是不一样的。

JVM调优的步骤

一般情况下,JVM调优可通过以下步骤进行:

调优步骤1)调优方向2)确定目标

【低延迟】还是【高吞吐量】,选择合适的回收器

3)新生代调优4)老年代调优

以 CMS 为例

-XX:CMSInitiatingOccupancyFraction=percent

5)实战调优

参数设置

如果没有设置JVM堆空间大小,JVM会根据服务器物理内存大小设置默认堆大小的值。例如,在64位的服务器端,当物理内存小于192MB时,JVM堆大小默认选为物理内存的一半;当物理内存大192MB且小于128GB时,JVM堆大小默认选为物理内存的四分之一;当物理内存大于等于128GB时,都为32GB。通常情况下,Java应用程序的会通过参数指定堆大小

默认的,新生代 ( Young ) 与老年代 ( Old ) 的比例的值为 1:2 ( 该值可以通过参数 –XX:NewRatio 来指定 ),即:新生代 ( Young ) = 1/3 的堆空间大小。老年代 ( Old ) = 2/3 的堆空间大小。其中,新生代 ( Young ) 被细分为 Eden 和 两个 Survivor 区域,这两个 Survivor 区域分别被命名为 from 和 to,以示区分。

默认的,Edem : from : to = 8 : 1 : 1 ( 可以通过参数 –XX:SurvivorRatio 来设定 ),即:Eden = 8/10 的新生代空间大小,from = to = 1/10 的新生代空间大小。

4、设置新生代和老年代比例会带来什么影响

堆大小 = 新生代 + 老年代。其中,堆的大小可以通过参数 –Xms、-Xmx 来指定

年轻代(young 区)

从年轻代空间(包括Eden和Survivor 区域)回收内存被称为 Minor GC

空间太小可能导致对象直接进入 old区 。如果old区 满了,会触发full gc。但也不能过大,过大会引起回收耗时过长,导致应用阻塞。

老年代(old 区)

从老年代GC称为Major GC

空间过小会产生old区小碎片,放不下大对象,引起频繁full gc。如果用了缓存,old区也要适当大些,同时缓存不应无限增长

进入老年代的对象FullGC

执行 Minor GC(年轻代GC) 的时候,JVM 会检查老年代中最大连续可用空间是否大于了当前新生代所有对象的总大小,如果大于,则直接执行 Minor GC(年轻代GC)(这个时候执行是没有风险的)

如果小于,JVM 会检查是否开启了空间分配担保机制,如果没有开启则直接改为执行Full GC

如果开启担保机制,则 JVM 会检查老年代中最大连续可用空间是否大于历次晋升到老年代中的平均大小,如果小于则执行改为执行Full GC

如果大于则会执行 Minor GC(年轻代GC),如果 Minor GC(年轻代GC) 执行失败则会执行 Full GC

出现Full GC的时候经常伴随至少一次的Minor GC,但不绝对。Major GC的速度一般会比Minor GC慢10倍以上

内存溢出

老年代只有在新生代对象转入及创建大对象、大数组时才会出现空间不足的现象。当执行Full GC后空间仍然不足,则会抛出如下错误:

java.lang.OutOfMemoryError: Java heap space

full GC频繁出现的原因Full GC调优办法

1:让对象在Minor GC阶段被回收、让对象在新生代多存活一段时间及不要创建过大的对象及数组

2:年轻代小对象尽量多,大对象则尽可能直接进入老年代。年轻代由于使用标记复制算法进行回收内存,速度很快

3:Eden区如果没有足够的空间时会引发一次young区的GC,通过-XX:SurvivorRatio 进行调整 Eden 和 Survivor 比例大小。少量对象的存活,适合复制算法(年轻代),大量对象存活,适合标记清理或者标记压缩(年老代)。

调优建议

5、mysql哪些操作会带来表锁,如何避免

不同的存储引擎支持不同的锁机制。比如,MyISAM和MEMORY存储引擎采用的是表级锁(table-level locking);BDB (BerkeleyDB)存储引擎采用的是页面锁(page-level locking),但也支持表级锁;InnoDB存储引擎既支持行级锁(row-level locking),也支持表级锁,但默认情况下是采用行级锁。

MySQL这3种锁的特性可大致归纳如下。开销、加锁速度、死锁、粒度、并发性能

表锁情况如何避免

在了解InnoDB锁特性后,用户可以通过设计和SQL调整等措施减少锁冲突和死锁,

包括:

6、你说下乐观锁和悲观锁的区别6.1 乐观锁1)概念

乐观锁在操作数据时非常乐观,认为别人不会同时修改数据。因此乐观锁不会上锁,只是在执行更新的时候判断一下在此期间别人是否修改了数据:如果别人修改了数据则放弃操作,否则执行操作。

2)乐观锁的实现方式

主要有两种:CAS机制和版本号机制

a)CAS

CAS操作包括了3个操作数:

CAS操作逻辑如下:如果内存位置V的值等于预期的A值,则将该位置更新为新值B,否则不进行任何操作。许多CAS的操作是自旋的:如果操作不成功,会一直重试,直到操作成功为止。

这里引出一个新的问题,既然CAS包含了Compare和Swap两个操作,它又如何保证原子性呢?答案是:CAS是由CPU支持的原子操作,其原子性是在硬件层面进行保证的。

缺点:

b)版本号机制

在数据中增加一个字段version,表示该数据的版本号,每当数据被修改,版本号加1。当某个线程查询数据时,将该数据的版本号一起查出来;当该线程更新数据时,判断当前版本号与之前读取的版本号是否一致,如果一致才进行操作。

6.2 悲观锁

悲观锁在操作数据时比较悲观,认为别人会同时修改数据。因此操作数据时直接把数据锁住,直到操作完成后才会释放锁;上锁期间其他人不能修改数据。

实现方式是加锁,加锁既可以是对代码块加锁(如Java的synchronized关键字),也可以是对数据加锁(如MySQL中的排它锁 selelct * from t for update,增删改操作)。

6.3 如何选择悲观锁和乐观锁7、mysql做了哪些调优1)sql语句及索引索引生效

索引生效的情况有下面:

索引好处索引分类及建立

单列索引:最好是常用与检索、离散性高、长度适当的列

多列索引:建立合适的联合索引没权限使用网络资源,如果经常用多个单列索引进行检索,就需要考虑联合索引,联合索引的列顺序优先将选择性高的放前面

2)表设计优化3)参数优化

set @@session.sort_buffer_size := ;//执行语句set @@session.sort_buffer_size := DEFAULT

一般来说配置修改是不需要经常变动的,做优化都是先把表结构优化和sql索引优化完成先才考虑配置优化。配置修改要慎重

4)硬件优化

我们业务上常见场景

这种时候选取能力强劲的cpu,以及能够顶得住频繁交互的网络设备。

另一种常见的业务场景是数据仓库,用来做报表统计等功能,特点是

因此选择硬件对cpu要求比较低,但是硬盘容量要大,报表统计计算时间长,也许还可以做集群部署,将统计任务拆分为多个子任务进行并行统计。

5)特定sql语句的优化6)其他7)sql优化分析步骤8、synchronized是悲观锁还是乐观锁?为什么

悲观锁

Synchronized是从悲观的角度出发:

总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这

样别人想拿这个数据就会阻塞直到它拿到锁

(共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程)。因此

Synchronized我们也将其称之为悲观锁。jdk中的ReentrantLock也是一种悲观锁。

9、说下concurrentHashMap是怎么保证并发安全的

jdk1.7 使用的是分段锁技术

最大并发度为Segement数组的长度

    static final class Segment extends ReentrantLock implements Serializable {              final V put(K key, int hash, V value, boolean onlyIfAbsent) {            HashEntry node = tryLock() ? null :                scanAndLockForPut(key, hash, value);            V oldValue;            try {                HashEntry[] tab = table;                int index = (tab.length - 1) & hash;                HashEntry first = entryAt(tab, index);                for (HashEntry e = first;;) {                    if (e != null) {                        K k;                        if ((k = e.key) == key ||                            (e.hash == hash && key.equals(k))) {                            oldValue = e.value;                            if (!onlyIfAbsent) {                                e.value = value;                                ++modCount;                            }                            break;                        }                        e = e.next;                    }                    else {                        if (node != null)                            node.setNext(first);                        else                            node = new HashEntry(hash, key, value, first);                        int c = count + 1;                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)                            rehash(node);                        else                            setEntryAt(tab, index, node);                        ++modCount;                        count = c;                        oldValue = null;                        break;                    }                }            } finally {                unlock();            }            return oldValue;        }                /**         * Scans for a node containing given key while trying to         * acquire lock, creating and returning one if not found. Upon         * return, guarantees that lock is held. UNlike in most         * methods, calls to method equals are not screened: Since         * traversal speed doesn't matter, we might as well help warm         * up the associated code and accesses as well.         *         * @return a new node if key not found, else null         */        private HashEntry scanAndLockForPut(K key, int hash, V value) {            HashEntry first = entryForHash(this, hash);            HashEntry e = first;            HashEntry node = null;            int retries = -1; // negative while locating node            while (!tryLock()) {                HashEntry f; // to recheck first below                if (retries < 0) {                    if (e == null) {                        if (node == null) // speculatively create node                            node = new HashEntry(hash, key, value, null);                        retries = 0;                    }                    else if (key.equals(e.key))                        retries = 0;                    else                        e = e.next;                }                else if (++retries > MAX_SCAN_RETRIES) {                    lock();                    break;                }                else if ((retries & 1) == 0 &&                         (f = entryForHash(this, hash)) != first) {                    e = first = f; // re-traverse if entry changed                    retries = -1;                }            }            return node;        }                /**         * Remove; match on key only if value null, else match both.         */        final V remove(Object key, int hash, Object value) {            if (!tryLock())                scanAndLock(key, hash);            V oldValue = null;            try {                HashEntry[] tab = table;                int index = (tab.length - 1) & hash;                HashEntry e = entryAt(tab, index);                HashEntry pred = null;                while (e != null) {                    K k;                    HashEntry next = e.next;                    if ((k = e.key) == key ||                        (e.hash == hash && key.equals(k))) {                        V v = e.value;                        if (value == null || value == v || value.equals(v)) {                            if (pred == null)                                setEntryAt(tab, index, next);                            else                                pred.setNext(next);                            ++modCount;                            --count;                            oldValue = v;                        }                        break;                    }                    pred = e;                    e = next;                }            } finally {                unlock();            }            return oldValue;        }   }

jdk1.8 使用synchronized+cas+violatile

锁的粒度变小了,写操作的并发性得到了极大的提升。

 /**     * Key-value entry.  This class is never exported out as a     * user-mutable Map.Entry (i.e., one supporting setValue; see     * MapEntry below), but can be used for read-only traversals used     * in bulk tasks.  Subclasses of Node with a negative hash field     * are special, and contain null keys and values (but are never     * exported).  Otherwise, keys and vals are never null.     */    static class Node implements Map.Entry {        final int hash;        final K key;        volatile V val; //volatile确保了val的内存可见性        volatile Node next;//volatile确保了next的内存可见性        ...    }

    /**     * Maps the specified key to the specified value in this table.     * Neither the key nor the value can be null.     *     * 

The value can be retrieved by calling the {@code get} method * with a key that is equal to the original key. * * @param key key with which the specified value is to be associated * @param value value to be associated with the specified key * @return the previous value associated with {@code key}, or * {@code null} if there was no mapping for {@code key} * @throws NullPointerException if the specified key or value is null */ public V put(K key, V value) { return putVal(key, value, false); } /** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node[] tab = table;;) { Node f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node(hash, key, value, null))) break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node pred = e; if ((e = e.next) == null) { pred.next = new Node(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { Node p; binCount = 2; if ((p = ((TreeBin)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; } static final boolean casTabAt(Node[] tab, int i, Node c, Node v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); }

假设此时有2个put线程,都发现此时桶为空,线程一执行casTabAt(tab,i,null,node1),此时tab[i]等于预期值null,因此会插入node1。随后线程二执行casTabAt(tba,i,null,node2),此时tab[i]不等于预期值null,插入失败。然后线程二会回到for循环开始处,重新获取tab[i]作为预期值,重复上述逻辑。

    final V putVal(K key, V value, boolean onlyIfAbsent) {        ...        for (Node[] tab = table;;) {            ...            //key定位到的hash桶为空            if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {                //cas设置tab[i]的头结点。                if (casTabAt(tab, i, null,                             new Node(hash, key, value, null)))                    break;   //设置成功,跳出for循环                //设置失败,说明tab[i]已经被另一个线程修改了。回到for循环开始处,重新判断hash桶是否为空。如何往复,直到设置成功,或者hash桶不空。            }else{               synchronized (f) {                   //               }                            }        }        ...    }

10、redis的持久化方式1)rdb

默认的持久化方式,通过快照的方式,即在指定的时间间隔内多少个key发生变化就将内存中的数据集快照写入磁盘。

在一定的间隔时间内redis发生宕机,会存在间隔时间内的数据丢失,无法做到实时持久化

#  # Save the DB on disk:保存数据库到磁盘  #  #   save    #  #   如果指定的秒数和数据库写操作次数都满足了就将数据库保存。#  #   下面是保存操作的实例:#   900秒(15分钟)内至少1个key值改变(则进行数据库保存--持久化)  #   300秒(5分钟)内至少10个key值改变(则进行数据库保存--持久化)  #   60秒(1分钟)内至少10000个key值改变(则进行数据库保存--持久化)  #  #   注释:注释掉“save”这一行配置项就可以让保存数据库功能失效。#  #   你也可以通过增加一个只有一个空字符串的配置项(如下面的实例)来去掉前面的“save”配置。#  #   save ""    save 900 1  save 300 10  save 60 10000

触发快照的时机缺点2)aof

开启了AOF之后,RDB就默认不使用了

AOF 采用日志的形式来记录每个写操作,并追加到文件中。需要记录Redis的每个写命令,步骤为:命令追加(append)、文件写入(write)和文件同步(sync)

命令追加(append)

开启AOF持久化功能后,服务器每执行一个写命令,都会把该命令以协议格式先追加到aof_buf缓存区的末尾,而不是直接写入文件,避免每次有命令都直接写入硬盘,减少硬盘IO次数

文件写入(write)和文件同步(sync)

何时把aof_buf缓冲区的内容写入保存在AOF文件中,Redis提供了多种策略

appendfsync always:将aof_buf缓冲区的所有内容写入并同步到AOF文件,每个写命令同步写入磁盘

appendfsync everysec:将aof_buf缓存区的内容写入AOF文件,每秒同步一次,该操作由一个线程专门负责

appendfsync no:将aof_buf缓存区的内容写入AOF文件,什么时候同步由操作系统来决定

appendfsync选项的默认配置为everysec,即每秒执行一次同步

Redis 重启时会根据日志文件的内容把写指令从前到后执行一次以完成数据的恢复 工作

重写(bgrewrite)

AOF持久化机制记录每个写命令,当服务重启的时候会复现AOF文件中的所有命令,会消耗太多的资源且重启很慢。因此为了避免AOF文件中的写命令太多文件太大,Redis引入了AOF的重写机制来压缩AOF文件体积。AOF文件重写是把Redis进程内的数据转化为写命令同步到新AOF文件的过程。

重写会根据重写策略或手动触发AOF重写。

重写流程:

使用

redis.conf配置文件参数

#开启AOF持久化appendonly yes #AOF文件名appendfilename "appendonly.aof" #AOF文件存储路径 与RDB是同一个参数dir "/opt/app/redis6/data" #AOF策略,一般都是选择第一种[always:每个命令都记录],[everysec:每秒记录一次],[no:看机器的心情高兴了就记录]appendfsync always#appendfsync everysec# appendfsync no  #aof文件大小比起上次重写时的大小,增长100%(配置可以大于100%)时,触发重写。[假如上次重写后大小为10MB,当AOF文件达到20MB时也会再次触发重写,以此类推]auto-aof-rewrite-percentage 100  #aof文件大小超过64MB时,触发重写auto-aof-rewrite-min-size 64mb

开启之后效果

客户端设置命令

127.0.0.1:6379> set k1 v1OK127.0.0.1:6379> set k2 v2OK127.0.0.1:6379> set k3 v3OK127.0.0.1:6379> bgsaveBackground saving started

查看aof文件

[root@dw etc]# lltotal 68-rw-r--r-- 1 root root   110 Jul 17 09:47 appendonly.aof-rw-r--r-- 1 root root   118 Jul 17 09:47 dump.rdb-rw-rw-r-- 1 root root 58350 Jul 17 09:45 redis.conf[root@dw etc]# cat appendonly.aof |grep k**2SELECT*3setk1v1*3setk2v2*3setk3v3

多次设置命令

127.0.0.1:6379> set k1 v2OK127.0.0.1:6379> set k1 v3OK127.0.0.1:6379> set k2 v1OK127.0.0.1:6379> set k2 v3OK127.0.0.1:6379> set k3 v1OK127.0.0.1:6379> set k3 v2OK127.0.0.1:6379> bgsaveBackground saving started

查看aop文件

[root@dw etc]# cat appendonly.aof*2SELECT*3setk1v1*3setk2v2*3setk3v3*3setk1v2*3setk1v3*3setk2v1*3setk2v3*3setk3v1*3setk3v2

查看当前key

127.0.0.1:6379> keys *1) "k2"2) "k3"3) "k1"

执行重写命令

127.0.0.1:6379> bgrewriteaofBackground append only file rewriting started

再次查看aof文件,文件体积已经缩小了

[root@dw etc]# lltotal 68-rw-r--r-- 1 root root   220 Jul 17 09:57 appendonly.aof-rw-r--r-- 1 root root   118 Jul 17 09:53 dump.rdb-rw-rw-r-- 1 root root 58350 Jul 17 09:45 redis.conf[root@dw etc]# lltotal 68-rw-r--r-- 1 root root   110 Jul 17 09:58 appendonly.aof-rw-r--r-- 1 root root   118 Jul 17 09:53 dump.rdb-rw-rw-r-- 1 root root 58350 Jul 17 09:45 redis.conf[root@dw etc]# cat appendonly.aof*2SELECT*3SETk2v3*3SETk3v2*3SETk1v3

3)redis 4.0支持的混合持久化(推荐使用)介绍

看了上面的RDB和AOF的介绍后,我们可以发现,使用RDB持久化会有数据丢失的风险,但是恢复速度快,而使用AOF持久化可以保证数据完整性,但恢复数据的时候会很慢。于是从Redis4之后新增了混合AOF和RDB的模式,先使用RDB进行快照存储,然后使用AOF持久化记录所有的写操作,当重写策略满足或手动触发重写的时候,将最新的数据存储为新的RDB记录。这样的话,重启服务的时候会从RDB何AOF两部分恢复数据,即保证了数据完整性,又提高了恢复的性能。

开启混合模式后,每当bgrewriteaof命令之后会在AOF文件中以RDB格式写入当前最新的数据,之后的新的写操作继续以AOF的追加形式追加写命令。当redis重启的时候,加载 aof 文件进行恢复数据:先加载 rdb 的部分再加载剩余的 aof部分。

配置及使用

/usr/local/redis/etc/redis.conf

# When rewriting the AOF file, Redis is able to use an RDB preamble in the# AOF file for faster rewrites and recoveries. When this option is turned# on the rewritten AOF file is composed of two different stanzas:##   [RDB file][AOF tail]## When loading Redis recognizes that the AOF file starts with the "REDIS"# string and loads the prefixed RDB file, and continues loading the AOF# tail.## This is currently turned off by default in order to avoid the surprise# of a format change, but will at some point be used as the default.aof-use-rdb-preamble yes

开启混合持久化模式后,重写之后的aof文件里和rdb一样存储二进制的 快照数据,继续往redis中进行写操作,后续操作在aof中仍然是以命令的方式追加。因此重写后aof文件由两部分组成,一部分是类似rdb的二进制快照,另一部分是追加的命令文本:

操作命令

127.0.0.1:6379> set k1 v1OK127.0.0.1:6379> set k1 v2OK127.0.0.1:6379> set k1 v3OK127.0.0.1:6379> bgrewriteaofBackground append only file rewriting started

查看aof文件

[root@dw etc]# cat appendonly.aof*2SELECT*3SETk2v3*3SETk3v2*3SETk1v3[root@dw etc]# cat appendonly.aofREDIS0008▒      redis-ver4.0.8▒redis-bits▒@▒ctime▒z=▒`used-mem▒X▒▒aof-preamble▒▒▒k1v3k3v2k2v3▒>▒▒=▒}▒▒

添加命令执行bgsave命令

127.0.0.1:6379> set k6 v6OK127.0.0.1:6379> set k7 v7OK127.0.0.1:6379> bgsaveBackground saving started

查看aof文件

[root@dw etc]# cat appendonly.aofREDIS0008▒      redis-ver4.0.8▒redis-bits▒@▒ctime▒N>▒`used-mem▒P▒▒aof-preamble▒▒▒k2v3k5v5k3v2k1v3k4v4▒J▒▒ߨ:*2SELECT*3setk6v6*3setk7v7

11、说下缓存穿透有哪几种场景

缓存穿透:缓存和数据库都不存在

大概可分为两类,一类是查询数据库不存在的记录,另外一类是缓存到期。正常情况下,查询不存在的数据记录时,请求直接打到数据库,并且因为没有数据,所有不会回填缓存。下次请求过来,还是重复这个过程,导致缓存穿透。

12、缓存穿透你们是怎么解决的1)缓存空对象

说在请求过来的时候,首先查询缓存,缓存里面没有数据,然后查询数据库,数据库里面也没有数据,这个时候需要回填缓存,把空值回填到缓存,这样的话,再有查询过来的时候就会命中缓存,解决缓存穿透问题。

由于空值并不是我们想要的业务数据,所以不能长时间的占用缓存,在回填空值的时候,设置一个比较短的过期时间。但是这种方法也有缺点,当大量请求数据都不存在的情况下,会占用很多缓存

2)布隆过滤器

是一种空间利用率较高的概率型数据结构,用来测试一个元素是否在集合中。但是存在一定可能,导致结果误判。即元素不在集合中,查询结果却返回元素在集合中。

当一个元素被加入集合时,通过K个散列函数将这个元素映射成一个位数组中的K个点,把它们置为1。检索时,我们只要看看这些点是不是都是1就(大约)知道集合中有没有它了:如果这些点有任何一个0,则被检元素一定不存在;如果都是1,则被检元素很可能存在

缺点13、在key失效后,穿透到DB,你怎么解决缓存击穿(某个热点key)

是指缓存中没有但数据库中有的数据(一般是缓存时间到期),这时由于并发用户特别多,同时读缓存没读到数据,又同时去数据库去取数据,引起数据库压力瞬间增大,造成过大压力

解决缓存雪崩(大量key同时失效)

Redis中大量的key几乎同时过期,然后大量并发查询穿过redis击打到底层数据库上,此时数据库层的负载压力会骤增

解决

14、如何保证缓存数据库数据一致性1)写时更新

当我们往数据库写数据的时候我们去更新缓存,包括先更新缓存再更新数据库和先更新数据库再更新缓存。

存在问题

存在A(缓存)B(缓存)B(数据库)A(数据库)这种情况,会存在脏数据

2)写时删除,读时更新

当我们往数据库写数据的时候我们直接删除缓存,然后其他请求读数据的时候更新缓存。包括先删除缓存再更新数据和先更新数据库再删除缓存。

先删除缓存在更新数据库先更新数据库再删除缓存3)其他15、spring boot自动装配是怎么实现的

SpringBoot 定义了一套接口规范,这套规范规定:SpringBoot 在启动时会扫描外部引用 jar 包中的META-INF/spring.factories文件,将文件中配置的类型信息加载到 Spring 容器(此处涉及到 JVM 类加载机制与 Spring 的容器知识),并执行类中定义的各种操作。对于外部 jar 来说,只需要按照 SpringBoot 定义的标准,就能将自己的功能装载进 SpringBoot。

@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Documented@Inherited@SpringBootConfiguration@EnableAutoConfiguration@ComponentScan(excludeFilters = { @Filter(type = FilterType.CUSTOM, classes = TypeExcludeFilter.class),@Filter(type = FilterType.CUSTOM, classes = AutoConfigurationExcludeFilter.class) })public @interface SpringBootApplication {    ... }

注解1:org.springframework.boot.SpringBootConfiguration

允许在上下文中注册额外的 bean 或导入其他配置类

@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Documented@Configurationpublic @interface SpringBootConfiguration {    ...}

注解2:org.springframework.context.annotation.ComponentScan

扫描被@Component (@Service,@Controller)注解的 bean,注解默认会扫描启动类所在的包下所有的类 ,可以自定义不扫描某些 bean。如下图所示,容器中将排除TypeExcludeFilter和AutoConfigurationExcludeFilter

@Retention(RetentionPolicy.RUNTIME)@Target(ElementType.TYPE)@Documented@Repeatable(ComponentScans.class)public @interface ComponentScan {    ...}

注解3:org.springframework.boot.autoconfigure.EnableAutoConfiguration

@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Documented@Inherited@AutoConfigurationPackage@Import(AutoConfigurationImportSelector.class)public @interface EnableAutoConfiguration {    ...}

注解31:org.springframework.boot.autoconfigure.AutoConfigurationPackage

@Import(AutoConfigurationPackages.Registrar.class)

@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Documented@Inherited@Import(AutoConfigurationPackages.Registrar.class)public @interface AutoConfigurationPackage {    ...    }

注解32:@Import(AutoConfigurationImportSelector.class)

org.springframework.boot.autoconfigure.AutoConfigurationImportSelector

   public String[] selectImports(AnnotationMetadata annotationMetadata) {        if (!this.isEnabled(annotationMetadata)) {            return NO_IMPORTS;        } else {            AutoConfigurationImportSelector.AutoConfigurationEntry autoConfigurationEntry = this.getAutoConfigurationEntry(annotationMetadata);            return StringUtils.toStringArray(autoConfigurationEntry.getConfigurations());        }    }

获取需要自动装配的所有配置类,读取META-INF/spring.factories

org.springframework.core.io.support.SpringFactoriesLoader#loadSpringFactories

/** * Load the fully qualified class names of factory implementations of the * given type from {@value #FACTORIES_RESOURCE_LOCATION}, using the given * class loader. * @param factoryType the interface or abstract class representing the factory * @param classLoader the ClassLoader to use for loading resources; can be * {@code null} to use the default * @throws IllegalArgumentException if an error occurs while loading factory names * @see #loadFactories */public static List loadFactoryNames(Class factoryType, @Nullable ClassLoader classLoader) {String factoryTypeName = factoryType.getName();return loadSpringFactories(classLoader).getOrDefault(factoryTypeName, Collections.emptyList());}private static Map<String, List> loadSpringFactories(@Nullable ClassLoader classLoader) {MultiValueMap result = cache.get(classLoader);if (result != null) {return result;}try {Enumeration urls = (classLoader != null ?classLoader.getResources(FACTORIES_RESOURCE_LOCATION) :ClassLoader.getSystemResources(FACTORIES_RESOURCE_LOCATION));result = new LinkedMultiValueMap();while (urls.hasMoreElements()) {URL url = urls.nextElement();UrlResource resource = new UrlResource(url);Properties properties = PropertiesLoaderUtils.loadProperties(resource);for (Map.Entry entry : properties.entrySet()) {String factoryTypeName = ((String) entry.getKey()).trim();for (String factoryImplementationName : StringUtils.commaDelimitedListToStringArray((String) entry.getValue())) {result.add(factoryTypeName, factoryImplementationName.trim());}}}cache.put(classLoader, result);return result;}catch (IOException ex) {throw new IllegalArgumentException("Unable to load factories from location [" +FACTORIES_RESOURCE_LOCATION + "]", ex);}}

不光是这个依赖下的META-INF/spring.factories被读取到,所有 Spring Boot Starter 下的META-INF/spring.factories都会被读取到。

并不是所有的自动配置类都会生效,@ConditionalOnXXX 中的所有条件都满足,该类才会生效。

16、如果要你写一个启动器,你觉得应该从那些点去实现1)引入 SpringBoot 自动化配置依赖

  org.springframework.boot  spring-boot-autoconfigure  1.5.9.RELEASE

2)创建自动化配置类

这个相当于就是一个普通的 Java 配置类,可以在这里创建 Bean,并可获得与 application.properties 属性文件相对应的属性类的 Bean。

// 相当于一个普通的 java 配置类@Configuration// 当 HelloworldService 在类路径的条件下@ConditionalOnClass({HelloworldService.class})// 将 application.properties 的相关的属性字段与该类一一对应,并生成 Bean@EnableConfigurationProperties(HelloworldProperties.class)public class HelloworldAutoConfiguration {  // 注入属性类  @Autowired  private HelloworldProperties hellowordProperties;  @Bean  // 当容器没有这个 Bean 的时候才创建这个 Bean  @ConditionalOnMissingBean(HelloworldService.class)  public HelloworldService helloworldService() {    HelloworldService helloworldService = new HelloworldService();    helloworldService.setWords(hellowordProperties.getWords());    return helloworldService;  }}

3)在 META-INF 目录下创建 spring.factories

SpringBoot 自动化配置最终就是要扫描 META-INF/spring.factories 来加载项目的自动化配置类

org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.xxx.starters.helloworld.HelloworldAutoConfiguration

17、动态代理模式和抽象工厂模式的区别1)动态代理

提供了对目标对象额外的访问方式,即通过代理对象访问目标对象,这样可以在不修改原目标对象的前提下,提供额外的功能操作,扩展目标对象的功能。

2)抽象工厂

抽象工厂模式提供了一种方式,可以将一组具有同一主题的单独的工厂封装起来。在正常使用中,客户端程序需要创建抽象工厂的具体实现,然后使用抽象工厂作为接口来创建这一主题的具体对象。

18、策略模式在你们实际开发中是用到了哪个场景?

仓库审核审批通过回调

19、rpc框架异步,你怎么去实现,异步有哪些好处?实现1.Future/Promise

比较常用的有 JDK8 之前的 Future,通过添加 Listener 来做异步回调,JDK8 之后通常使用 CompletableFuture,它支持各种复杂的异步处理策略,例如自定义线程池、多个异步操作的编排、有返回值和无返回值异步、多个异步操作的级联操作等。

2.线程池 +RxJava:

最经典的实现就是 Netflix 开源的 Hystrix 框架,使用 HystrixCommand(创建线程池)做一层异步封装,将同步调用封装成异步调用,利用 RxJava API,通过订阅的方式对结果做异步处理

好处

1、异步流程可以立即给调用方返回初步的结果。

2、异步流程可以延迟给调用方最终的结果数据,在此期间可以做更多额外的工作,例如结果记录等等。

3、异步流程在执行的过程中,可以释放占用的线程等资源,避免阻塞,等到结果产生再重新获取线程处理。

4、异步流程可以等多次调用的结果出来后,再统一返回一次结果集合,提高响应效率。

20、你熟悉哪些脚本语言?21、怎么查看线程的堆栈信息?用哪个命令。

通过使用jps 命令获取需要监控的进程的pid,然后使用jstack pid 命令查看线程的堆栈信息。通过jstack 命令可以获取当前进程的所有线程信息。每个线程堆中信息中,都可以查看到线程ID、线程的状态(wait、sleep、running 等状态)、是否持有锁信息等。

22、如何排查线上死锁问题?23、jdump命令是用来干嘛的?

Java虚拟机的运行时快照。将Java虚拟机运行时的状态和信息保存到文件。

堆Dump,包含线程Dump,并包含所有堆对象的状态。二进制格式。

线程Dump,包含所有线程的运行状态。纯文本格式。线程栈是瞬时记录,一般都需要结合程序的日志进行跟踪问题。

24、你们这份工作最大的挑战在哪里?

答:高并发吧,我们这边每天的单量在几百万,所以对性能要求会挺高的(截取片段)

25、你做过的最满意的项目是啥?

库存中心

26、消息队列怎么保证不丢失?怎么保证顺序为何消息会丢失?

要想保证消息只被消费一次,那么首先就得要保证消息不丢失。我们先来看看,消息从被写入消息队列,到被消费完成,这整个链路上会有哪些地方可能会导致消息丢失?我们不难看出,其实主要有三个地方:

保证不丢失生产阶段

消息队列通常使用确认机制,来保证消息可靠传递:当你代码调用发送消息的方法,消息队列的客户端会把消息发送到Broker,Broker接受到消息会返回客户端一个确认。只要Producer收到了Broker的确认响应,就可以保证消息在生产阶段不会丢失。有些消息队列在长时间没收到发送的确认响应后,会自动重试,如果重试再失败,就会一返回值或者异常方式返回给客户端。所以在编写发送消息的代码,需要正确处理消息发送返回值或者异常,保证这个阶段消息不丢失。

存储阶段

如果对消息可靠性要求非常高,可以通过配置Broker参数来避免因为宕机丢消息。对于单个节点Broker,需要配置Broker参数,在收到消息后,将消息写入磁盘再给Producer返回确认响应。如果是Broker集群,需要将Broker集群配置成:至少两个以上节点收到消息,再给客户端发送确认响应。同步双写

消费阶段

消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递。Consumer收到消息后,需在执行消费逻辑后在发送确认消息。

保证顺序1)保证消息发送到同一个队列

org.apache.rocketmq.client.producer.MessageQueueSelector

public interface MessageQueueSelector {        MessageQueue select(final List mqs, final Message msg, final Object arg);}

mqs:该Topic下所有可选的MessageQueue

msg:待发送的消息

arg:发送消息时传递的参数 可以实现MessageQueueSelector接口,在select方法中自定义选择哪个MessageQueue

public SendResult send(Message msg, MessageQueueSelector selector, Object arg)       throws MQClientException, RemotingException, MQBrokerException, InterruptedException {       return this.defaultMQProducerImpl.send(msg, selector, arg);}

举例:选择算法

public class OrderMessageQueueSelector implements MessageQueueSelector{   @Override   public MessageQueue select(List mqs, Message msg, Object arg) {       //选择以参数arg为索引的MessageQueue       Integer id = (Integer) arg;       int index = id % mqs.size();       return mqs.get(index);   }}

发送消息

@Slf4j@Servicepublic class OrderMessageProducer {   @Value("${spring.rocketmq.namesrvAddr}")   private String namesrvAddr;   private static final DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");   private static final String[] ORDER_MESSAGES = {"下单","结算","支付","完成"};   @PostConstruct   public void sendMessage() {       try {           //设置namesrv           producer.setNamesrvAddr(namesrvAddr);           //启动Producer           producer.start();           System.err.println("Order Message Producer Start...");           //创建3组消息,每组消息发往同一个Queue,保证消息的局部有序性           String tags = "Tags";           OrderMessageQueueSelector orderMessageQueueSelector = new OrderMessageQueueSelector();           //注:要实现顺序消费,必须同步发送消息           for (int i = 0;i < 3;i++){               String orderId = "" + (i + 1);               for (int j = 0,size = ORDER_MESSAGES.length;j < size;j++){                   String message = "Order-" + orderId + "-" + ORDER_MESSAGES[j];                   String keys = message;                   byte[] messageBody = message.getBytes(RemotingHelper.DEFAULT_CHARSET);                   Message mqMsg = new Message(RocketMQConstant.TEST_TOPIC_NAME, tags, keys, messageBody);                   producer.send(mqMsg, orderMessageQueueSelector,i);               }           }       } catch (Exception e) {           log.error("Message Producer: Send Message Error ", e);       }   }}

使用DefaultMQProducer的send()方法,指定MessageQueueSelector和参数,Broker将会将逻辑上需要保证顺序性的消息发往同一队列。

注:想要实现顺序消费,发送方式必须为同步发送,异步发送无法保证消息的发送顺序!

2)消费端使用顺序消费

org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService

/** * A MessageListenerConcurrently object is used to receive asynchronously delivered messages orderly.one queue,one * thread */public interface MessageListenerOrderly extends MessageListener {    /**     * It is not recommend to throw exception,rather than returning ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT     * if consumption failure     *     * @param msgs msgs.size() >= 1
DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here * @return The consume status */ ConsumeOrderlyStatus consumeMessage(final List msgs, final ConsumeOrderlyContext context);}

27、假如有一个表,有a,b两个字段,我怎么保证我查a,查b, 查 a b都可以走索引,这个你怎么建索引

以最左前缀原则匹配:

建立b索引+建立ab索引

28、你用的redis是哪个版本的?

内存型数据库

1. 版本特性演化 2.x -> 3.x -> 4.x -> 5.x (只选核心重点提示)

版本

特性(部分关键点)

2.6.x

1.支持lua脚本

2.支持新命令dump以及restore

3.内存优化及存储优

4.提供了BITCOUNT与BITOP,前者支持位值count,后者支持了位操作

2.8.x

1.增量主从复制。

2.哨兵模式生产可用

3.可以通过config set命令设置maxclients。

4.config rewrite命令可以将config set持久化到Redis配置文件中。

本文到此结束,希望对大家有所帮助!

关于作者:

生活百科常识网